(assert mutex)
#!-sb-thread (error "Not supported in unithread builds.")
#!+sb-thread
- (let ((owner (mutex-%owner mutex)))
+ (let ((me *current-thread*))
+ (assert (eq me (mutex-%owner mutex)))
(/show0 "CONDITION-WAITing")
#!+sb-lutex
- (progn
- ;; FIXME: This doesn't look interrupt safe!
- (setf (mutex-%owner mutex) nil)
- (with-lutex-address (queue-lutex-address (waitqueue-lutex queue))
- (with-lutex-address (mutex-lutex-address (mutex-lutex mutex))
- (%lutex-wait queue-lutex-address mutex-lutex-address)))
- (setf (mutex-%owner mutex) owner))
+ ;; Need to disable interrupts so that we don't miss setting the owner on
+ ;; our way out. (pthread_cond_wait handles the actual re-acquisition.)
+ (without-interrupts
+ (unwind-protect
+ (progn
+ (setf (mutex-%owner mutex) nil)
+ (with-lutex-address (queue-lutex-address (waitqueue-lutex queue))
+ (with-lutex-address (mutex-lutex-address (mutex-lutex mutex))
+ (with-local-interrupts
+ (%lutex-wait queue-lutex-address mutex-lutex-address)))))
+ (setf (mutex-%owner mutex) me)))
#!-sb-lutex
- (unwind-protect
- (let ((me *current-thread*))
- ;; FIXME: should we do something to ensure that the result
- ;; of this setf is visible to all CPUs?
- (setf (waitqueue-data queue) me)
- (release-mutex mutex)
- ;; Now we go to sleep using futex-wait. If anyone else
- ;; manages to grab MUTEX and call CONDITION-NOTIFY during
- ;; this comment, it will change queue->data, and so
- ;; futex-wait returns immediately instead of sleeping.
- ;; Ergo, no lost wakeup. We may get spurious wakeups,
- ;; but that's ok.
- (multiple-value-bind (to-sec to-usec) (decode-timeout nil)
- (when (= 1 (with-pinned-objects (queue me)
- (futex-wait (waitqueue-data-address queue)
- (get-lisp-obj-address me)
- (or to-sec -1) ;; our way if saying "no timeout"
- (or to-usec 0))))
- (signal-deadline))))
- ;; If we are interrupted while waiting, we should do these things
- ;; before returning. Ideally, in the case of an unhandled signal,
- ;; we should do them before entering the debugger, but this is
- ;; better than nothing.
- (get-mutex mutex owner))))
+ ;; Need to disable interrupts so that we don't miss grabbing the mutex
+ ;; on our way out.
+ (without-interrupts
+ (unwind-protect
+ (let ((me *current-thread*))
+ ;; FIXME: should we do something to ensure that the result
+ ;; of this setf is visible to all CPUs?
+ (setf (waitqueue-data queue) me)
+ (release-mutex mutex)
+ ;; Now we go to sleep using futex-wait. If anyone else
+ ;; manages to grab MUTEX and call CONDITION-NOTIFY during
+ ;; this comment, it will change queue->data, and so
+ ;; futex-wait returns immediately instead of sleeping.
+ ;; Ergo, no lost wakeup. We may get spurious wakeups,
+ ;; but that's ok.
+ (multiple-value-bind (to-sec to-usec) (decode-timeout nil)
+ (when (= 1 (with-pinned-objects (queue me)
+ (allow-with-interrupts
+ (futex-wait (waitqueue-data-address queue)
+ (get-lisp-obj-address me)
+ (or to-sec -1) ;; our way if saying "no timeout"
+ (or to-usec 0)))))
+ (signal-deadline))))
+ ;; If we are interrupted while waiting, we should do these things
+ ;; before returning. Ideally, in the case of an unhandled signal,
+ ;; we should do them before entering the debugger, but this is
+ ;; better than nothing.
+ (get-mutex mutex)))))
(defun condition-notify (queue &optional (n 1))
#!+sb-doc
future."
(name nil :type (or null simple-string))
(%count 0 :type (integer 0))
+ (waitcount 0 :type (integer 0))
(mutex (make-mutex))
(queue (make-waitqueue)))
#!+sb-doc
"Decrement the count of SEMAPHORE if the count would not be
negative. Else blocks until the semaphore can be decremented."
- ;; a more direct implementation based directly on futexes should be
- ;; possible
- (with-mutex ((semaphore-mutex semaphore))
- (loop until (> (semaphore-%count semaphore) 0)
- do (condition-wait (semaphore-queue semaphore) (semaphore-mutex semaphore))
- finally (decf (semaphore-%count semaphore)))))
+ ;; A more direct implementation based directly on futexes should be
+ ;; possible.
+ ;;
+ ;; We need to disable interrupts so that we don't forget to decrement the
+ ;; waitcount (which would happen if an asynch interrupt should catch us on
+ ;; our way out from the loop.)
+ (with-system-mutex ((semaphore-mutex semaphore) :allow-with-interrupts t)
+ ;; Quick check: is it positive? If not, enter the wait loop.
+ (let ((count (semaphore-%count semaphore)))
+ (if (plusp count)
+ (setf (semaphore-%count semaphore) (1- count))
+ (unwind-protect
+ (progn
+ (incf (semaphore-waitcount semaphore))
+ (loop until (plusp (setf count (semaphore-%count semaphore)))
+ do (condition-wait (semaphore-queue semaphore) (semaphore-mutex semaphore)))
+ (setf (semaphore-%count semaphore) (1- count)))
+ (decf (semaphore-waitcount semaphore)))))))
(defun signal-semaphore (semaphore &optional (n 1))
#!+sb-doc
"Increment the count of SEMAPHORE by N. If there are threads waiting
on this semaphore, then N of them is woken up."
(declare (type (integer 1) n))
- (with-mutex ((semaphore-mutex semaphore))
- (when (= n (incf (semaphore-%count semaphore) n))
- (condition-notify (semaphore-queue semaphore) n))))
+ ;; Need to disable interrupts so that we don't lose a wakeup after we have
+ ;; incremented the count.
+ (with-system-mutex ((semaphore-mutex semaphore))
+ (let ((waitcount (semaphore-waitcount semaphore))
+ (count (incf (semaphore-%count semaphore) n)))
+ (when (plusp waitcount)
+ (condition-notify (semaphore-queue semaphore) (min waitcount count))))))
;;;; job control, independent listeners
(setf oops t))))))))
(mapcar #'sb-thread:join-thread threads)
(assert (not oops))))
+
+#+sb-thread
+(with-test (:name :semaphore-multiple-waiters)
+ (let ((semaphore (make-semaphore :name "test sem")))
+ (labels ((make-readers (n i)
+ (values
+ (loop for r from 0 below n
+ collect
+ (let ((r r))
+ (sb-thread:make-thread (lambda ()
+ (let ((sem semaphore))
+ (dotimes (s i)
+ (sb-thread:wait-on-semaphore sem))))
+ :name "reader")))
+ (* n i)))
+ (make-writers (n readers i)
+ (let ((j (* readers i)))
+ (multiple-value-bind (k rem) (truncate j n)
+ (values
+ (let ((writers
+ (loop for w from 0 below n
+ collect
+ (let ((w w))
+ (sb-thread:make-thread (lambda ()
+ (let ((sem semaphore))
+ (dotimes (s k)
+ (sb-thread:signal-semaphore sem))))
+ :name "writer")))))
+ (assert (zerop rem))
+ writers)
+ (+ rem (* n k))))))
+ (test (r w n)
+ (multiple-value-bind (readers x) (make-readers r n)
+ (assert (= (length readers) r))
+ (multiple-value-bind (writers y) (make-writers w r n)
+ (assert (= (length writers) w))
+ (assert (= x y))
+ (mapc #'sb-thread:join-thread writers)
+ (mapc #'sb-thread:join-thread readers)
+ (assert (zerop (sb-thread:semaphore-count semaphore)))
+ (values)))))
+ (assert
+ (eq :ok
+ (handler-case
+ (sb-ext:with-timeout 10
+ (test 1 1 100)
+ (test 2 2 10000)
+ (test 4 2 10000)
+ (test 4 2 10000)
+ (test 10 10 10000)
+ (test 10 1 10000)
+ :ok)
+ (sb-ext:timeout ()
+ :timeout)))))))
+