ba4f651a602f4a9ed54cadaa6af14d3de41e54f3
[sbcl.git] / contrib / sb-concurrency / mailbox.lisp
1 ;;;; Lock-free mailbox implementation using SB-QUEUE.
2 ;;;;
3 ;;;; Written by Nikodemus Siivola for SBCL.
4 ;;;; Extended by Tobias C Rittweiler.
5 ;;;;
6 ;;;; This software is part of the SBCL system. See the README file for
7 ;;;; more information.
8 ;;;;
9 ;;;; This software is derived from the CMU CL system, which was written at
10 ;;;; Carnegie Mellon University and released into the public domain. The
11 ;;;; software is in the public domain and is provided with absolutely no
12 ;;;; warranty. See the COPYING and CREDITS files for more information.
13
14 (in-package :sb-concurrency)
15
16 ;; TODO: type and values decls
17
18 (defstruct (mailbox (:constructor %make-mailbox (queue semaphore name))
19                     (:copier nil)
20                     (:predicate mailboxp))
21   "Mailbox aka message queue."
22   (queue (missing-arg) :type queue)
23   (semaphore (missing-arg) :type semaphore)
24   (name nil))
25
26 (setf (documentation 'mailboxp 'function)
27       "Returns true if argument is a MAILBOX, NIL otherwise."
28       (documentation 'mailbox-name 'function)
29       "Name of a MAILBOX. SETFable.")
30
31 (defun make-mailbox (&key name initial-contents)
32   "Returns a new MAILBOX with messages in INITIAL-CONTENTS enqueued."
33   (flet ((genname (thing name)
34            (format nil "~:[Mailbox ~A~;~A for mailbox ~S~]" name thing name)))
35     (%make-mailbox (make-queue
36                     :name (genname "Queue" name)
37                     :initial-contents initial-contents)
38                    (make-semaphore
39                     :name (genname "Semaphore" name)
40                     :count (length initial-contents))
41                    name)))
42
43 (defmethod print-object ((mailbox mailbox) stream)
44   (print-unreadable-object (mailbox stream :type t :identity t)
45     (format stream "~@[~S ~](~D msgs pending)"
46             (mailbox-name mailbox)
47             (mailbox-count mailbox)))
48   mailbox)
49
50 (defun mailbox-count (mailbox)
51   "Returns the number of messages currently in the mailbox."
52   (semaphore-count (mailbox-semaphore mailbox)))
53
54 (defun mailbox-empty-p (mailbox)
55   "Returns true if MAILBOX is currently empty, NIL otherwise."
56   (zerop (mailbox-count mailbox)))
57
58 (defun list-mailbox-messages (mailbox)
59   "Returns a fresh list containing all the messages in the
60 mailbox. Does not remove messages from the mailbox."
61   (list-queue-contents (mailbox-queue mailbox)))
62
63 (defun send-message (mailbox message)
64   "Adds a MESSAGE to MAILBOX. Message can be any object."
65   (sb-sys:without-interrupts
66     (enqueue message (mailbox-queue mailbox))
67     (signal-semaphore (mailbox-semaphore mailbox))))
68
69 ;;; TODO: TIMEOUT argument.
70 (defun receive-message (mailbox &key)
71   "Removes the oldest message from MAILBOX and returns it as the
72 primary value. If MAILBOX is empty waits until a message arrives."
73   (tagbody
74      ;; Disable interrupts for keeping semaphore count in sync with
75      ;; #msgs in the mailbox.
76      (sb-sys:without-interrupts
77        (sb-sys:allow-with-interrupts
78          (wait-on-semaphore (mailbox-semaphore mailbox)))
79        (multiple-value-bind (value ok) (dequeue (mailbox-queue mailbox))
80          (if ok
81              (return-from receive-message value)
82              (go :error))))
83    :error
84      (sb-int:bug "Mailbox ~S empty after WAIT-ON-SEMAPHORE."
85                  mailbox)))
86
87 (defun receive-message-no-hang (mailbox)
88   "The non-blocking variant of RECEIVE-MESSAGE. Returns two values,
89 the message removed from MAILBOX, and a flag specifying whether a
90 message could be received."
91   (prog ((semaphore (mailbox-semaphore mailbox))
92          (queue     (mailbox-queue mailbox)))
93      ;; Disable interrupts, v.s.
94      (sb-sys:without-interrupts
95        (unless (sb-sys:allow-with-interrupts
96                  (sb-thread::try-semaphore semaphore))
97          (return (values nil nil)))
98        (multiple-value-bind (value ok) (dequeue queue)
99          (if ok
100              (return (values value t))
101              (go :error))))
102    :error
103      (sb-int:bug "Mailbox ~S empty after successfull TRY-SEMAPHORE."
104                  mailbox)))
105
106 (defun receive-pending-messages (mailbox &optional n)
107   "Removes and returns all (or at most N) currently pending messages
108 from MAILBOX, or returns NIL if no messages are pending.
109
110 Note: Concurrent threads may be snarfing messages during the run of
111 this function, so even though X,Y appear right next to each other in
112 the result, does not necessarily mean that Y was the message sent
113 right after X."
114   (prog* ((msgs  '())
115           (sem   (mailbox-semaphore mailbox))
116           (queue (mailbox-queue mailbox))
117           (avail (mailbox-count mailbox))
118           (count (if n (min n avail) avail)))
119      (when (zerop count)
120        (go :finish))
121      ;; Disable interrupts, v.s.
122      (sb-sys:without-interrupts
123        (unless (sb-sys:allow-with-interrupts
124                  (sb-thread::try-semaphore sem count))
125          (go :slow-path))
126        ;; Safe because QUEUE is private; other threads may be snarfing
127        ;; messages under our feet, though, hence the out of order bit
128        ;; in the docstring. Same for the slow path.
129        (loop
130          (multiple-value-bind (msg ok) (dequeue queue)
131            (unless ok (go :error))
132            (push msg msgs)
133            (when (zerop (decf count))
134              (go :finish)))))
135    ;; This is the slow path as RECEIVE-MESSAGE-NO-HANG will have to
136    ;; lock the semaphore's mutex again and again.
137    :slow-path
138      ;; No need for disabling interrupts because we never leave the
139      ;; mailbox in an inconsistent state here.
140      (loop
141        (multiple-value-bind (msg ok)
142            (receive-message-no-hang mailbox)
143          (unless ok (go :finish))
144          (push msg msgs)
145          (when (zerop (decf count))
146            (go :finish))))
147    :finish
148        (return (nreverse msgs))
149    :error
150        (sb-int:bug "Mailbox ~S empty after successfull TRY-SEMAPHORE."
151                    mailbox)))