From: Nikodemus Siivola Date: Thu, 10 Nov 2011 09:27:15 +0000 (+0200) Subject: timeouts on semaphores and mailboxes, fix timeouts on condition variables X-Git-Url: http://repo.macrolet.net/gitweb/?a=commitdiff_plain;h=1ecff2d1bc56850bf2f262a56402df4683fc57d9;p=sbcl.git timeouts on semaphores and mailboxes, fix timeouts on condition variables * Accidentally put in the version of condition variable timeouts that just looked like a spurious wakeup instead of returning NIL without grabbing the mutex. Ooops -- fixed that. * The issue with mailbox tests on Darwin at least appears to be related to our usage of pthread functions inside signal handlers. --- diff --git a/contrib/sb-concurrency/mailbox.lisp b/contrib/sb-concurrency/mailbox.lisp index ba4f651..c4acea5 100644 --- a/contrib/sb-concurrency/mailbox.lisp +++ b/contrib/sb-concurrency/mailbox.lisp @@ -66,19 +66,23 @@ mailbox. Does not remove messages from the mailbox." (enqueue message (mailbox-queue mailbox)) (signal-semaphore (mailbox-semaphore mailbox)))) -;;; TODO: TIMEOUT argument. -(defun receive-message (mailbox &key) - "Removes the oldest message from MAILBOX and returns it as the -primary value. If MAILBOX is empty waits until a message arrives." +(defun receive-message (mailbox &key timeout) + "Removes the oldest message from MAILBOX and returns it as the primary +value, and a secondary value of T. If MAILBOX is empty waits until a message +arrives. + +If TIMEOUT is provided, and no message arrives within the specified interval, +returns primary and secondary value of NIL." (tagbody ;; Disable interrupts for keeping semaphore count in sync with ;; #msgs in the mailbox. (sb-sys:without-interrupts (sb-sys:allow-with-interrupts - (wait-on-semaphore (mailbox-semaphore mailbox))) + (or (wait-on-semaphore (mailbox-semaphore mailbox) :timeout timeout) + (return-from receive-message (values nil nil)))) (multiple-value-bind (value ok) (dequeue (mailbox-queue mailbox)) (if ok - (return-from receive-message value) + (return-from receive-message (values value t)) (go :error)))) :error (sb-int:bug "Mailbox ~S empty after WAIT-ON-SEMAPHORE." diff --git a/contrib/sb-concurrency/tests/test-mailbox.lisp b/contrib/sb-concurrency/tests/test-mailbox.lisp index b93b344..db4b221 100644 --- a/contrib/sb-concurrency/tests/test-mailbox.lisp +++ b/contrib/sb-concurrency/tests/test-mailbox.lisp @@ -38,8 +38,29 @@ (3 nil (#\1 #\2 #\3) nil) (0 t nil t)) -;;; FIXME: Several tests disabled on Darwin due to hangs. Something not right -;;; with mailboxes -- or possibly semaphores -- there. +(deftest mailbox-timeouts + (let* ((mbox (make-mailbox)) + (writers (loop for i from 1 upto 20 + collect (make-thread + (lambda (x) + (loop repeat 50 + do (send-message mbox x) + (sleep 0.001))) + :arguments i))) + (readers (loop repeat 10 + collect (make-thread + (lambda () + (loop while (receive-message mbox :timeout 0.1) + count t)))))) + (mapc #'join-thread writers) + (apply #'+ (mapcar #'join-thread readers))) + 1000) + +;;; FIXME: Several tests disabled on Darwin and SunOS due to hangs. +;;; +;;; On Darwin at least the issues don't seem to have anything to do with +;;; mailboxes per-se, but are rather related to our usage of signal-unsafe +;;; pthread functions inside signal handlers. #+(and sb-thread (not (or darwin sunos))) (progn diff --git a/src/code/target-thread.lisp b/src/code/target-thread.lisp index 4b566f6..0361ec5 100644 --- a/src/code/target-thread.lisp +++ b/src/code/target-thread.lisp @@ -793,7 +793,8 @@ around the call, checking the the associated data: " #!-sb-thread (declare (ignore queue timeout)) (assert mutex) - #!-sb-thread (error "Not supported in unithread builds.") + #!-sb-thread + (wait-for nil :timeout timeout) ; Yeah... #!+sb-thread (let ((me *current-thread*)) (barrier (:read)) @@ -861,23 +862,27 @@ around the call, checking the the associated data: (when (and (eq :timeout status) deadlinep) (let ((got-it (%try-mutex mutex me))) (allow-with-interrupts - (signal-deadline)) - (cond (got-it - (return-from condition-wait t)) - (t - (setf (values to-sec to-usec stop-sec stop-usec deadlinep) - (decode-timeout timeout)))))) + (signal-deadline) + (cond (got-it + (return-from condition-wait t)) + (t + ;; The deadline may have changed. + (setf (values to-sec to-usec stop-sec stop-usec deadlinep) + (decode-timeout timeout)) + (setf status :ok)))))) ;; Re-acquire the mutex for normal return. - (unless (or (%try-mutex mutex me) - (allow-with-interrupts - (%wait-for-mutex mutex me timeout - to-sec to-usec - stop-sec stop-usec deadlinep))) + (when (eq :ok status) + (unless (or (%try-mutex mutex me) + (allow-with-interrupts + (%wait-for-mutex mutex me timeout + to-sec to-usec + stop-sec stop-usec deadlinep))) + (setf status :timeout))))) + (or (eq :ok status) + (unless (eq :timeout status) ;; The only case we return normally without re-acquiring the ;; mutex is when there is a :TIMEOUT that runs out. - (aver (and timeout (not deadlinep))) - (return-from condition-wait nil))))))) - t) + (bug "CONDITION-WAIT: invalid status on normal return: ~S" status))))))) (defun condition-notify (queue &optional (n 1)) #!+sb-doc @@ -951,16 +956,22 @@ future." "Create a semaphore with the supplied COUNT and NAME." (%make-semaphore name count)) -(defun wait-on-semaphore (semaphore) +(defun wait-on-semaphore (semaphore &key timeout) #!+sb-doc - "Decrement the count of SEMAPHORE if the count would not be -negative. Else blocks until the semaphore can be decremented." + "Decrement the count of SEMAPHORE if the count would not be negative. Else +blocks until the semaphore can be decremented. Returns T on success. + +If TIMEOUT is given, it is the maximum number of seconds to wait. If the count +cannot be decremented in that time, returns NIL without decrementing the +count." ;; A more direct implementation based directly on futexes should be ;; possible. ;; ;; We need to disable interrupts so that we don't forget to ;; decrement the waitcount (which would happen if an asynch ;; interrupt should catch us on our way out from the loop.) + ;; + ;; FIXME: No timeout on initial mutex acquisition. (with-system-mutex ((semaphore-mutex semaphore) :allow-with-interrupts t) ;; Quick check: is it positive? If not, enter the wait loop. (let ((count (semaphore-%count semaphore))) @@ -975,12 +986,15 @@ negative. Else blocks until the semaphore can be decremented." ;; at most one increment per thread waiting on the semaphore. (sb!ext:atomic-incf (semaphore-waitcount semaphore)) (loop until (plusp (setf count (semaphore-%count semaphore))) - do (condition-wait (semaphore-queue semaphore) - (semaphore-mutex semaphore))) + do (or (condition-wait (semaphore-queue semaphore) + (semaphore-mutex semaphore) + :timeout timeout) + (return-from wait-on-semaphore nil))) (setf (semaphore-%count semaphore) (1- count))) ;; Need to use ATOMIC-DECF instead of DECF, as CONDITION-WAIT ;; may unwind without the lock being held due to timeouts. - (sb!ext:atomic-decf (semaphore-waitcount semaphore))))))) + (sb!ext:atomic-decf (semaphore-waitcount semaphore)))))) + t) (defun try-semaphore (semaphore &optional (n 1)) #!+sb-doc diff --git a/tests/threads.pure.lisp b/tests/threads.pure.lisp index 0816e51..d8ae4c4 100644 --- a/tests/threads.pure.lisp +++ b/tests/threads.pure.lisp @@ -524,3 +524,62 @@ (sb-ext:wait-for nil :timeout 10) (error "oops")) (sb-sys:deadline-timeout () :deadline))))) + +(with-test (:name (:condition-wait :timeout :one-thread)) + (let ((mutex (make-mutex)) + (waitqueue (make-waitqueue))) + (assert (not (with-mutex (mutex) + (condition-wait waitqueue mutex :timeout 0.01)))))) + +(with-test (:name (:condition-wait :timeout :many-threads) + :skipped-on '(not :sb-thread)) + (let* ((mutex (make-mutex)) + (waitqueue (make-waitqueue)) + (sem (make-semaphore)) + (data nil) + (workers + (loop repeat 100 + collect (make-thread + (lambda () + (wait-on-semaphore sem) + (block thread + (with-mutex (mutex) + (loop until data + do (or (condition-wait waitqueue mutex :timeout 0.01) + (return-from thread nil))) + (assert (eq t (pop data))) + t))))))) + (loop repeat 50 + do (with-mutex (mutex) + (push t data) + (condition-notify waitqueue))) + (signal-semaphore sem 100) + (let ((ok (count-if #'join-thread workers))) + (unless (eql 50 ok) + (error "Wanted 50, got ~S" ok))))) + +(with-test (:name (:wait-on-semaphore :timeout :one-thread)) + (let ((sem (make-semaphore)) + (n 0)) + (signal-semaphore sem 10) + (loop repeat 100 + do (when (wait-on-semaphore sem :timeout 0.001) + (incf n))) + (assert (= n 10)))) + +(with-test (:name (:wait-on-semaphore :timeout :many-threads) + :skipped-on '(not :sb-thread)) + (let* ((sem (make-semaphore)) + (threads + (progn + (signal-semaphore sem 10) + (loop repeat 100 + collect (make-thread + (lambda () + (sleep (random 0.02)) + (wait-on-semaphore sem :timeout 0.01))))))) + (loop repeat 5 + do (signal-semaphore sem 2)) + (let ((ok (count-if #'join-thread threads))) + (unless (eql 20 ok) + (error "Wanted 20, got ~S" ok)))))