Fix make-array transforms.
[sbcl.git] / contrib / sb-concurrency / mailbox.lisp
1 ;;;; Lock-free mailbox implementation using SB-QUEUE.
2 ;;;;
3 ;;;; Written by Nikodemus Siivola for SBCL.
4 ;;;; Extended by Tobias C Rittweiler.
5 ;;;;
6 ;;;; This software is part of the SBCL system. See the README file for
7 ;;;; more information.
8 ;;;;
9 ;;;; This software is derived from the CMU CL system, which was written at
10 ;;;; Carnegie Mellon University and released into the public domain. The
11 ;;;; software is in the public domain and is provided with absolutely no
12 ;;;; warranty. See the COPYING and CREDITS files for more information.
13
14 (in-package :sb-concurrency)
15
16 ;; TODO: type and values decls
17
18 (defstruct (mailbox (:constructor %make-mailbox (queue semaphore name))
19                     (:copier nil)
20                     (:predicate mailboxp))
21   "Mailbox aka message queue.
22
23 SEND-MESSAGE adds a message to the mailbox, RECEIVE-MESSAGE waits till
24 a message becomes available, whereas RECEIVE-MESSAGE-NO-HANG is a non-blocking
25 variant, and RECEIVE-PENDING-MESSAGES empties the entire mailbox in one go.
26
27 Messages can be arbitrary objects"
28   (queue (missing-arg) :type queue)
29   (semaphore (missing-arg) :type semaphore)
30   (name nil))
31
32 (setf (documentation 'mailboxp 'function)
33       "Returns true if argument is a MAILBOX, NIL otherwise."
34       (documentation 'mailbox-name 'function)
35       "Name of a MAILBOX. SETFable.")
36
37 (defun make-mailbox (&key name initial-contents)
38   "Returns a new MAILBOX with messages in INITIAL-CONTENTS enqueued."
39   (flet ((genname (thing name)
40            (format nil "~:[Mailbox ~A~;~A for mailbox ~S~]" name thing name)))
41     (%make-mailbox (make-queue
42                     :name (genname "Queue" name)
43                     :initial-contents initial-contents)
44                    (make-semaphore
45                     :name (genname "Semaphore" name)
46                     :count (length initial-contents))
47                    name)))
48
49 (defmethod print-object ((mailbox mailbox) stream)
50   (print-unreadable-object (mailbox stream :type t :identity t)
51     (format stream "~@[~S ~](~D msgs pending)"
52             (mailbox-name mailbox)
53             (mailbox-count mailbox)))
54   mailbox)
55
56 (defun mailbox-count (mailbox)
57   "Returns the number of messages currently in the mailbox."
58   (semaphore-count (mailbox-semaphore mailbox)))
59
60 (defun mailbox-empty-p (mailbox)
61   "Returns true if MAILBOX is currently empty, NIL otherwise."
62   (zerop (mailbox-count mailbox)))
63
64 (defun list-mailbox-messages (mailbox)
65   "Returns a fresh list containing all the messages in the
66 mailbox. Does not remove messages from the mailbox."
67   (list-queue-contents (mailbox-queue mailbox)))
68
69 (defun send-message (mailbox message)
70   "Adds a MESSAGE to MAILBOX. Message can be any object."
71   (sb-sys:without-interrupts
72     (enqueue message (mailbox-queue mailbox))
73     (signal-semaphore (mailbox-semaphore mailbox))))
74
75 (defun receive-message (mailbox &key timeout)
76   "Removes the oldest message from MAILBOX and returns it as the primary
77 value, and a secondary value of T. If MAILBOX is empty waits until a message
78 arrives.
79
80 If TIMEOUT is provided, and no message arrives within the specified interval,
81 returns primary and secondary value of NIL."
82   (tagbody
83      ;; Disable interrupts for keeping semaphore count in sync with
84      ;; #msgs in the mailbox.
85      (sb-sys:without-interrupts
86        (sb-sys:allow-with-interrupts
87          (or (wait-on-semaphore (mailbox-semaphore mailbox) :timeout timeout)
88              (return-from receive-message (values nil nil))))
89        (multiple-value-bind (value ok) (dequeue (mailbox-queue mailbox))
90          (if ok
91              (return-from receive-message (values value t))
92              (go :error))))
93    :error
94      (sb-int:bug "Mailbox ~S empty after WAIT-ON-SEMAPHORE."
95                  mailbox)))
96
97 (defun receive-message-no-hang (mailbox)
98   "The non-blocking variant of RECEIVE-MESSAGE. Returns two values,
99 the message removed from MAILBOX, and a flag specifying whether a
100 message could be received."
101   (prog ((semaphore (mailbox-semaphore mailbox))
102          (queue     (mailbox-queue mailbox)))
103      ;; Disable interrupts, v.s.
104      (sb-sys:without-interrupts
105        (unless (sb-sys:allow-with-interrupts
106                  (sb-thread::try-semaphore semaphore))
107          (return (values nil nil)))
108        (multiple-value-bind (value ok) (dequeue queue)
109          (if ok
110              (return (values value t))
111              (go :error))))
112    :error
113      (sb-int:bug "Mailbox ~S empty after successfull TRY-SEMAPHORE."
114                  mailbox)))
115
116 (defun receive-pending-messages (mailbox &optional n)
117   "Removes and returns all (or at most N) currently pending messages
118 from MAILBOX, or returns NIL if no messages are pending.
119
120 Note: Concurrent threads may be snarfing messages during the run of
121 this function, so even though X,Y appear right next to each other in
122 the result, does not necessarily mean that Y was the message sent
123 right after X."
124   (prog* ((msgs  '())
125           (sem   (mailbox-semaphore mailbox))
126           (queue (mailbox-queue mailbox))
127           (avail (mailbox-count mailbox))
128           (count (if n (min n avail) avail)))
129      (when (zerop count)
130        (go :finish))
131      ;; Disable interrupts, v.s.
132      (sb-sys:without-interrupts
133        (unless (sb-sys:allow-with-interrupts
134                  (sb-thread::try-semaphore sem count))
135          (go :slow-path))
136        ;; Safe because QUEUE is private; other threads may be snarfing
137        ;; messages under our feet, though, hence the out of order bit
138        ;; in the docstring. Same for the slow path.
139        (loop
140          (multiple-value-bind (msg ok) (dequeue queue)
141            (unless ok (go :error))
142            (push msg msgs)
143            (when (zerop (decf count))
144              (go :finish)))))
145    ;; This is the slow path as RECEIVE-MESSAGE-NO-HANG will have to
146    ;; lock the semaphore's mutex again and again.
147    :slow-path
148      ;; No need for disabling interrupts because we never leave the
149      ;; mailbox in an inconsistent state here.
150      (loop
151        (multiple-value-bind (msg ok)
152            (receive-message-no-hang mailbox)
153          (unless ok (go :finish))
154          (push msg msgs)
155          (when (zerop (decf count))
156            (go :finish))))
157    :finish
158        (return (nreverse msgs))
159    :error
160        (sb-int:bug "Mailbox ~S empty after successfull TRY-SEMAPHORE."
161                    mailbox)))