(enqueue message (mailbox-queue mailbox))
(signal-semaphore (mailbox-semaphore mailbox))))
-;;; TODO: TIMEOUT argument.
-(defun receive-message (mailbox &key)
- "Removes the oldest message from MAILBOX and returns it as the
-primary value. If MAILBOX is empty waits until a message arrives."
+(defun receive-message (mailbox &key timeout)
+ "Removes the oldest message from MAILBOX and returns it as the primary
+value, and a secondary value of T. If MAILBOX is empty waits until a message
+arrives.
+
+If TIMEOUT is provided, and no message arrives within the specified interval,
+returns primary and secondary value of NIL."
(tagbody
;; Disable interrupts for keeping semaphore count in sync with
;; #msgs in the mailbox.
(sb-sys:without-interrupts
(sb-sys:allow-with-interrupts
- (wait-on-semaphore (mailbox-semaphore mailbox)))
+ (or (wait-on-semaphore (mailbox-semaphore mailbox) :timeout timeout)
+ (return-from receive-message (values nil nil))))
(multiple-value-bind (value ok) (dequeue (mailbox-queue mailbox))
(if ok
- (return-from receive-message value)
+ (return-from receive-message (values value t))
(go :error))))
:error
(sb-int:bug "Mailbox ~S empty after WAIT-ON-SEMAPHORE."
(3 nil (#\1 #\2 #\3) nil)
(0 t nil t))
-;;; FIXME: Several tests disabled on Darwin due to hangs. Something not right
-;;; with mailboxes -- or possibly semaphores -- there.
+(deftest mailbox-timeouts
+ (let* ((mbox (make-mailbox))
+ (writers (loop for i from 1 upto 20
+ collect (make-thread
+ (lambda (x)
+ (loop repeat 50
+ do (send-message mbox x)
+ (sleep 0.001)))
+ :arguments i)))
+ (readers (loop repeat 10
+ collect (make-thread
+ (lambda ()
+ (loop while (receive-message mbox :timeout 0.1)
+ count t))))))
+ (mapc #'join-thread writers)
+ (apply #'+ (mapcar #'join-thread readers)))
+ 1000)
+
+;;; FIXME: Several tests disabled on Darwin and SunOS due to hangs.
+;;;
+;;; On Darwin at least the issues don't seem to have anything to do with
+;;; mailboxes per-se, but are rather related to our usage of signal-unsafe
+;;; pthread functions inside signal handlers.
#+(and sb-thread (not (or darwin sunos)))
(progn
"
#!-sb-thread (declare (ignore queue timeout))
(assert mutex)
- #!-sb-thread (error "Not supported in unithread builds.")
+ #!-sb-thread
+ (wait-for nil :timeout timeout) ; Yeah...
#!+sb-thread
(let ((me *current-thread*))
(barrier (:read))
(when (and (eq :timeout status) deadlinep)
(let ((got-it (%try-mutex mutex me)))
(allow-with-interrupts
- (signal-deadline))
- (cond (got-it
- (return-from condition-wait t))
- (t
- (setf (values to-sec to-usec stop-sec stop-usec deadlinep)
- (decode-timeout timeout))))))
+ (signal-deadline)
+ (cond (got-it
+ (return-from condition-wait t))
+ (t
+ ;; The deadline may have changed.
+ (setf (values to-sec to-usec stop-sec stop-usec deadlinep)
+ (decode-timeout timeout))
+ (setf status :ok))))))
;; Re-acquire the mutex for normal return.
- (unless (or (%try-mutex mutex me)
- (allow-with-interrupts
- (%wait-for-mutex mutex me timeout
- to-sec to-usec
- stop-sec stop-usec deadlinep)))
+ (when (eq :ok status)
+ (unless (or (%try-mutex mutex me)
+ (allow-with-interrupts
+ (%wait-for-mutex mutex me timeout
+ to-sec to-usec
+ stop-sec stop-usec deadlinep)))
+ (setf status :timeout)))))
+ (or (eq :ok status)
+ (unless (eq :timeout status)
;; The only case we return normally without re-acquiring the
;; mutex is when there is a :TIMEOUT that runs out.
- (aver (and timeout (not deadlinep)))
- (return-from condition-wait nil)))))))
- t)
+ (bug "CONDITION-WAIT: invalid status on normal return: ~S" status)))))))
(defun condition-notify (queue &optional (n 1))
#!+sb-doc
"Create a semaphore with the supplied COUNT and NAME."
(%make-semaphore name count))
-(defun wait-on-semaphore (semaphore)
+(defun wait-on-semaphore (semaphore &key timeout)
#!+sb-doc
- "Decrement the count of SEMAPHORE if the count would not be
-negative. Else blocks until the semaphore can be decremented."
+ "Decrement the count of SEMAPHORE if the count would not be negative. Else
+blocks until the semaphore can be decremented. Returns T on success.
+
+If TIMEOUT is given, it is the maximum number of seconds to wait. If the count
+cannot be decremented in that time, returns NIL without decrementing the
+count."
;; A more direct implementation based directly on futexes should be
;; possible.
;;
;; We need to disable interrupts so that we don't forget to
;; decrement the waitcount (which would happen if an asynch
;; interrupt should catch us on our way out from the loop.)
+ ;;
+ ;; FIXME: No timeout on initial mutex acquisition.
(with-system-mutex ((semaphore-mutex semaphore) :allow-with-interrupts t)
;; Quick check: is it positive? If not, enter the wait loop.
(let ((count (semaphore-%count semaphore)))
;; at most one increment per thread waiting on the semaphore.
(sb!ext:atomic-incf (semaphore-waitcount semaphore))
(loop until (plusp (setf count (semaphore-%count semaphore)))
- do (condition-wait (semaphore-queue semaphore)
- (semaphore-mutex semaphore)))
+ do (or (condition-wait (semaphore-queue semaphore)
+ (semaphore-mutex semaphore)
+ :timeout timeout)
+ (return-from wait-on-semaphore nil)))
(setf (semaphore-%count semaphore) (1- count)))
;; Need to use ATOMIC-DECF instead of DECF, as CONDITION-WAIT
;; may unwind without the lock being held due to timeouts.
- (sb!ext:atomic-decf (semaphore-waitcount semaphore)))))))
+ (sb!ext:atomic-decf (semaphore-waitcount semaphore))))))
+ t)
(defun try-semaphore (semaphore &optional (n 1))
#!+sb-doc
(sb-ext:wait-for nil :timeout 10)
(error "oops"))
(sb-sys:deadline-timeout () :deadline)))))
+
+(with-test (:name (:condition-wait :timeout :one-thread))
+ (let ((mutex (make-mutex))
+ (waitqueue (make-waitqueue)))
+ (assert (not (with-mutex (mutex)
+ (condition-wait waitqueue mutex :timeout 0.01))))))
+
+(with-test (:name (:condition-wait :timeout :many-threads)
+ :skipped-on '(not :sb-thread))
+ (let* ((mutex (make-mutex))
+ (waitqueue (make-waitqueue))
+ (sem (make-semaphore))
+ (data nil)
+ (workers
+ (loop repeat 100
+ collect (make-thread
+ (lambda ()
+ (wait-on-semaphore sem)
+ (block thread
+ (with-mutex (mutex)
+ (loop until data
+ do (or (condition-wait waitqueue mutex :timeout 0.01)
+ (return-from thread nil)))
+ (assert (eq t (pop data)))
+ t)))))))
+ (loop repeat 50
+ do (with-mutex (mutex)
+ (push t data)
+ (condition-notify waitqueue)))
+ (signal-semaphore sem 100)
+ (let ((ok (count-if #'join-thread workers)))
+ (unless (eql 50 ok)
+ (error "Wanted 50, got ~S" ok)))))
+
+(with-test (:name (:wait-on-semaphore :timeout :one-thread))
+ (let ((sem (make-semaphore))
+ (n 0))
+ (signal-semaphore sem 10)
+ (loop repeat 100
+ do (when (wait-on-semaphore sem :timeout 0.001)
+ (incf n)))
+ (assert (= n 10))))
+
+(with-test (:name (:wait-on-semaphore :timeout :many-threads)
+ :skipped-on '(not :sb-thread))
+ (let* ((sem (make-semaphore))
+ (threads
+ (progn
+ (signal-semaphore sem 10)
+ (loop repeat 100
+ collect (make-thread
+ (lambda ()
+ (sleep (random 0.02))
+ (wait-on-semaphore sem :timeout 0.01)))))))
+ (loop repeat 5
+ do (signal-semaphore sem 2))
+ (let ((ok (count-if #'join-thread threads)))
+ (unless (eql 20 ok)
+ (error "Wanted 20, got ~S" ok)))))