From 9cbef67c0d16955fc77a555e7ad251d93e206524 Mon Sep 17 00:00:00 2001 From: Nikodemus Siivola Date: Fri, 19 Sep 2008 14:55:52 +0000 Subject: [PATCH] 1.0.20.10: semaphore and condition variable fixes * Keep track of waiters on semaphores, so we know when a wakeup is needed. * Make semaphores and condition variables interrupt-safe. * Check that the current thread owns the mutex in CONDITION-WAIT. --- NEWS | 6 +++ src/code/target-thread.lisp | 108 +++++++++++++++++++++++++++---------------- tests/threads.pure.lisp | 55 ++++++++++++++++++++++ version.lisp-expr | 2 +- 4 files changed, 129 insertions(+), 42 deletions(-) diff --git a/NEWS b/NEWS index 86ccfc0..7a162e2 100644 --- a/NEWS +++ b/NEWS @@ -5,6 +5,12 @@ average 45% less pages pinned (measured from SBCL self build). * bug fix: SB-EXT:COMPARE-AND-SWAP on SYMBOL-VALUE can no longer mutate constant symbols or violate declaimed type of the symbol. + * bug fix: SB-THREAD:SIGNAL-SEMAPHORE could fail to wake up threads + sleeping on the semaphore when it was heavily contended. + * bug fix: semaphores and condition variables were not interrupt + safe. + * bug fix: SB-THREAD:CONDITION-WAIT doesn't allow waits on mutexes + owned by other threads anymore. changes in sbcl-1.0.20 relative to 1.0.19: * minor incompatible change: OPTIMIZE qualities diff --git a/src/code/target-thread.lisp b/src/code/target-thread.lisp index 27e686c..c534ca0 100644 --- a/src/code/target-thread.lisp +++ b/src/code/target-thread.lisp @@ -387,41 +387,50 @@ time we reacquire MUTEX and return to the caller." (assert mutex) #!-sb-thread (error "Not supported in unithread builds.") #!+sb-thread - (let ((owner (mutex-%owner mutex))) + (let ((me *current-thread*)) + (assert (eq me (mutex-%owner mutex))) (/show0 "CONDITION-WAITing") #!+sb-lutex - (progn - ;; FIXME: This doesn't look interrupt safe! 
- (setf (mutex-%owner mutex) nil) - (with-lutex-address (queue-lutex-address (waitqueue-lutex queue)) - (with-lutex-address (mutex-lutex-address (mutex-lutex mutex)) - (%lutex-wait queue-lutex-address mutex-lutex-address))) - (setf (mutex-%owner mutex) owner)) + ;; Need to disable interrupts so that we don't miss setting the owner on + ;; our way out. (pthread_cond_wait handles the actual re-acquisition.) + (without-interrupts + (unwind-protect + (progn + (setf (mutex-%owner mutex) nil) + (with-lutex-address (queue-lutex-address (waitqueue-lutex queue)) + (with-lutex-address (mutex-lutex-address (mutex-lutex mutex)) + (with-local-interrupts + (%lutex-wait queue-lutex-address mutex-lutex-address))))) + (setf (mutex-%owner mutex) me))) #!-sb-lutex - (unwind-protect - (let ((me *current-thread*)) - ;; FIXME: should we do something to ensure that the result - ;; of this setf is visible to all CPUs? - (setf (waitqueue-data queue) me) - (release-mutex mutex) - ;; Now we go to sleep using futex-wait. If anyone else - ;; manages to grab MUTEX and call CONDITION-NOTIFY during - ;; this comment, it will change queue->data, and so - ;; futex-wait returns immediately instead of sleeping. - ;; Ergo, no lost wakeup. We may get spurious wakeups, - ;; but that's ok. - (multiple-value-bind (to-sec to-usec) (decode-timeout nil) - (when (= 1 (with-pinned-objects (queue me) - (futex-wait (waitqueue-data-address queue) - (get-lisp-obj-address me) - (or to-sec -1) ;; our way if saying "no timeout" - (or to-usec 0)))) - (signal-deadline)))) - ;; If we are interrupted while waiting, we should do these things - ;; before returning. Ideally, in the case of an unhandled signal, - ;; we should do them before entering the debugger, but this is - ;; better than nothing. - (get-mutex mutex owner)))) + ;; Need to disable interrupts so that we don't miss grabbing the mutex + ;; on our way out. 
+ (without-interrupts + (unwind-protect + (let ((me *current-thread*)) + ;; FIXME: should we do something to ensure that the result + ;; of this setf is visible to all CPUs? + (setf (waitqueue-data queue) me) + (release-mutex mutex) + ;; Now we go to sleep using futex-wait. If anyone else + ;; manages to grab MUTEX and call CONDITION-NOTIFY during + ;; this comment, it will change queue->data, and so + ;; futex-wait returns immediately instead of sleeping. + ;; Ergo, no lost wakeup. We may get spurious wakeups, + ;; but that's ok. + (multiple-value-bind (to-sec to-usec) (decode-timeout nil) + (when (= 1 (with-pinned-objects (queue me) + (allow-with-interrupts + (futex-wait (waitqueue-data-address queue) + (get-lisp-obj-address me) + (or to-sec -1) ;; our way of saying "no timeout" + (or to-usec 0))))) + (signal-deadline)))) + ;; If we are interrupted while waiting, we should do these things + ;; before returning. Ideally, in the case of an unhandled signal, + ;; we should do them before entering the debugger, but this is + ;; better than nothing. + (get-mutex mutex))))) (defun condition-notify (queue &optional (n 1)) #!+sb-doc @@ -466,6 +475,7 @@ should be considered an implementation detail, and may change in the future." (name nil :type (or null simple-string)) (%count 0 :type (integer 0)) + (waitcount 0 :type (integer 0)) (mutex (make-mutex)) (queue (make-waitqueue))) @@ -486,21 +496,37 @@ future." #!+sb-doc "Decrement the count of SEMAPHORE if the count would not be negative. Else blocks until the semaphore can be decremented." - ;; a more direct implementation based directly on futexes should be - ;; possible - (with-mutex ((semaphore-mutex semaphore)) - (loop until (> (semaphore-%count semaphore) 0) - do (condition-wait (semaphore-queue semaphore) (semaphore-mutex semaphore)) - finally (decf (semaphore-%count semaphore))))) + ;; A more direct implementation based directly on futexes should be + ;; possible. 
+ ;; + ;; We need to disable interrupts so that we don't forget to decrement the + ;; waitcount (which would happen if an asynch interrupt should catch us on + ;; our way out from the loop.) + (with-system-mutex ((semaphore-mutex semaphore) :allow-with-interrupts t) + ;; Quick check: is it positive? If not, enter the wait loop. + (let ((count (semaphore-%count semaphore))) + (if (plusp count) + (setf (semaphore-%count semaphore) (1- count)) + (unwind-protect + (progn + (incf (semaphore-waitcount semaphore)) + (loop until (plusp (setf count (semaphore-%count semaphore))) + do (condition-wait (semaphore-queue semaphore) (semaphore-mutex semaphore))) + (setf (semaphore-%count semaphore) (1- count))) + (decf (semaphore-waitcount semaphore))))))) (defun signal-semaphore (semaphore &optional (n 1)) #!+sb-doc "Increment the count of SEMAPHORE by N. If there are threads waiting on this semaphore, then N of them is woken up." (declare (type (integer 1) n)) - (with-mutex ((semaphore-mutex semaphore)) - (when (= n (incf (semaphore-%count semaphore) n)) - (condition-notify (semaphore-queue semaphore) n)))) + ;; Need to disable interrupts so that we don't lose a wakeup after we have + ;; incremented the count. 
+ (with-system-mutex ((semaphore-mutex semaphore)) + (let ((waitcount (semaphore-waitcount semaphore)) + (count (incf (semaphore-%count semaphore) n))) + (when (plusp waitcount) + (condition-notify (semaphore-queue semaphore) (min waitcount count)))))) ;;;; job control, independent listeners diff --git a/tests/threads.pure.lisp b/tests/threads.pure.lisp index b10820a..8187b51 100644 --- a/tests/threads.pure.lisp +++ b/tests/threads.pure.lisp @@ -84,3 +84,58 @@ (setf oops t)))))))) (mapcar #'sb-thread:join-thread threads) (assert (not oops)))) + +#+sb-thread +(with-test (:name :semaphore-multiple-waiters) + (let ((semaphore (make-semaphore :name "test sem"))) + (labels ((make-readers (n i) + (values + (loop for r from 0 below n + collect + (let ((r r)) + (sb-thread:make-thread (lambda () + (let ((sem semaphore)) + (dotimes (s i) + (sb-thread:wait-on-semaphore sem)))) + :name "reader"))) + (* n i))) + (make-writers (n readers i) + (let ((j (* readers i))) + (multiple-value-bind (k rem) (truncate j n) + (values + (let ((writers + (loop for w from 0 below n + collect + (let ((w w)) + (sb-thread:make-thread (lambda () + (let ((sem semaphore)) + (dotimes (s k) + (sb-thread:signal-semaphore sem)))) + :name "writer"))))) + (assert (zerop rem)) + writers) + (+ rem (* n k)))))) + (test (r w n) + (multiple-value-bind (readers x) (make-readers r n) + (assert (= (length readers) r)) + (multiple-value-bind (writers y) (make-writers w r n) + (assert (= (length writers) w)) + (assert (= x y)) + (mapc #'sb-thread:join-thread writers) + (mapc #'sb-thread:join-thread readers) + (assert (zerop (sb-thread:semaphore-count semaphore))) + (values))))) + (assert + (eq :ok + (handler-case + (sb-ext:with-timeout 10 + (test 1 1 100) + (test 2 2 10000) + (test 4 2 10000) + (test 4 2 10000) + (test 10 10 10000) + (test 10 1 10000) + :ok) + (sb-ext:timeout () + :timeout))))))) + diff --git a/version.lisp-expr b/version.lisp-expr index c08e1e7..dd67df4 100644 --- a/version.lisp-expr +++ 
b/version.lisp-expr @@ -17,4 +17,4 @@ ;;; checkins which aren't released. (And occasionally for internal ;;; versions, especially for internal versions off the main CVS ;;; branch, it gets hairier, e.g. "0.pre7.14.flaky4.13".) -"1.0.20.9" +"1.0.20.10" -- 1.7.10.4