timeouts on semaphores and mailboxes, fix timeouts on condition variables
[sbcl.git] / contrib / sb-concurrency / mailbox.lisp
1 ;;;; Lock-free mailbox implementation using SB-QUEUE.
2 ;;;;
3 ;;;; Written by Nikodemus Siivola for SBCL.
4 ;;;; Extended by Tobias C Rittweiler.
5 ;;;;
6 ;;;; This software is part of the SBCL system. See the README file for
7 ;;;; more information.
8 ;;;;
9 ;;;; This software is derived from the CMU CL system, which was written at
10 ;;;; Carnegie Mellon University and released into the public domain. The
11 ;;;; software is in the public domain and is provided with absolutely no
12 ;;;; warranty. See the COPYING and CREDITS files for more information.
13
14 (in-package :sb-concurrency)
15
16 ;; TODO: type and values decls
17
18 (defstruct (mailbox (:constructor %make-mailbox (queue semaphore name))
19                     (:copier nil)
20                     (:predicate mailboxp))
21   "Mailbox aka message queue."
22   (queue (missing-arg) :type queue)
23   (semaphore (missing-arg) :type semaphore)
24   (name nil))
25
26 (setf (documentation 'mailboxp 'function)
27       "Returns true if argument is a MAILBOX, NIL otherwise."
28       (documentation 'mailbox-name 'function)
29       "Name of a MAILBOX. SETFable.")
30
31 (defun make-mailbox (&key name initial-contents)
32   "Returns a new MAILBOX with messages in INITIAL-CONTENTS enqueued."
33   (flet ((genname (thing name)
34            (format nil "~:[Mailbox ~A~;~A for mailbox ~S~]" name thing name)))
35     (%make-mailbox (make-queue
36                     :name (genname "Queue" name)
37                     :initial-contents initial-contents)
38                    (make-semaphore
39                     :name (genname "Semaphore" name)
40                     :count (length initial-contents))
41                    name)))
42
43 (defmethod print-object ((mailbox mailbox) stream)
44   (print-unreadable-object (mailbox stream :type t :identity t)
45     (format stream "~@[~S ~](~D msgs pending)"
46             (mailbox-name mailbox)
47             (mailbox-count mailbox)))
48   mailbox)
49
50 (defun mailbox-count (mailbox)
51   "Returns the number of messages currently in the mailbox."
52   (semaphore-count (mailbox-semaphore mailbox)))
53
54 (defun mailbox-empty-p (mailbox)
55   "Returns true if MAILBOX is currently empty, NIL otherwise."
56   (zerop (mailbox-count mailbox)))
57
58 (defun list-mailbox-messages (mailbox)
59   "Returns a fresh list containing all the messages in the
60 mailbox. Does not remove messages from the mailbox."
61   (list-queue-contents (mailbox-queue mailbox)))
62
63 (defun send-message (mailbox message)
64   "Adds a MESSAGE to MAILBOX. Message can be any object."
65   (sb-sys:without-interrupts
66     (enqueue message (mailbox-queue mailbox))
67     (signal-semaphore (mailbox-semaphore mailbox))))
68
69 (defun receive-message (mailbox &key timeout)
70   "Removes the oldest message from MAILBOX and returns it as the primary
71 value, and a secondary value of T. If MAILBOX is empty waits until a message
72 arrives.
73
74 If TIMEOUT is provided, and no message arrives within the specified interval,
75 returns primary and secondary value of NIL."
76   (tagbody
77      ;; Disable interrupts for keeping semaphore count in sync with
78      ;; #msgs in the mailbox.
79      (sb-sys:without-interrupts
80        (sb-sys:allow-with-interrupts
81          (or (wait-on-semaphore (mailbox-semaphore mailbox) :timeout timeout)
82              (return-from receive-message (values nil nil))))
83        (multiple-value-bind (value ok) (dequeue (mailbox-queue mailbox))
84          (if ok
85              (return-from receive-message (values value t))
86              (go :error))))
87    :error
88      (sb-int:bug "Mailbox ~S empty after WAIT-ON-SEMAPHORE."
89                  mailbox)))
90
91 (defun receive-message-no-hang (mailbox)
92   "The non-blocking variant of RECEIVE-MESSAGE. Returns two values,
93 the message removed from MAILBOX, and a flag specifying whether a
94 message could be received."
95   (prog ((semaphore (mailbox-semaphore mailbox))
96          (queue     (mailbox-queue mailbox)))
97      ;; Disable interrupts, v.s.
98      (sb-sys:without-interrupts
99        (unless (sb-sys:allow-with-interrupts
100                  (sb-thread::try-semaphore semaphore))
101          (return (values nil nil)))
102        (multiple-value-bind (value ok) (dequeue queue)
103          (if ok
104              (return (values value t))
105              (go :error))))
106    :error
107      (sb-int:bug "Mailbox ~S empty after successfull TRY-SEMAPHORE."
108                  mailbox)))
109
110 (defun receive-pending-messages (mailbox &optional n)
111   "Removes and returns all (or at most N) currently pending messages
112 from MAILBOX, or returns NIL if no messages are pending.
113
114 Note: Concurrent threads may be snarfing messages during the run of
115 this function, so even though X,Y appear right next to each other in
116 the result, does not necessarily mean that Y was the message sent
117 right after X."
118   (prog* ((msgs  '())
119           (sem   (mailbox-semaphore mailbox))
120           (queue (mailbox-queue mailbox))
121           (avail (mailbox-count mailbox))
122           (count (if n (min n avail) avail)))
123      (when (zerop count)
124        (go :finish))
125      ;; Disable interrupts, v.s.
126      (sb-sys:without-interrupts
127        (unless (sb-sys:allow-with-interrupts
128                  (sb-thread::try-semaphore sem count))
129          (go :slow-path))
130        ;; Safe because QUEUE is private; other threads may be snarfing
131        ;; messages under our feet, though, hence the out of order bit
132        ;; in the docstring. Same for the slow path.
133        (loop
134          (multiple-value-bind (msg ok) (dequeue queue)
135            (unless ok (go :error))
136            (push msg msgs)
137            (when (zerop (decf count))
138              (go :finish)))))
139    ;; This is the slow path as RECEIVE-MESSAGE-NO-HANG will have to
140    ;; lock the semaphore's mutex again and again.
141    :slow-path
142      ;; No need for disabling interrupts because we never leave the
143      ;; mailbox in an inconsistent state here.
144      (loop
145        (multiple-value-bind (msg ok)
146            (receive-message-no-hang mailbox)
147          (unless ok (go :finish))
148          (push msg msgs)
149          (when (zerop (decf count))
150            (go :finish))))
151    :finish
152        (return (nreverse msgs))
153    :error
154        (sb-int:bug "Mailbox ~S empty after successfull TRY-SEMAPHORE."
155                    mailbox)))