From 9f1903c072936fb15f03f93182c57beb4621b39d Mon Sep 17 00:00:00 2001 From: Nikodemus Siivola Date: Thu, 17 Nov 2011 14:11:18 +0200 Subject: [PATCH] semaphore notification objects --- doc/manual/threading.texinfo | 16 +++++-- package-data-list.lisp-expr | 7 ++- src/code/target-thread.lisp | 106 ++++++++++++++++++++++++++++++++---------- tests/threads.pure.lisp | 46 ++++++++++++++++++ 4 files changed, 145 insertions(+), 30 deletions(-) diff --git a/doc/manual/threading.texinfo b/doc/manual/threading.texinfo index 610941d..21d2756 100644 --- a/doc/manual/threading.texinfo +++ b/doc/manual/threading.texinfo @@ -162,16 +162,22 @@ if you want a bounded wait. @comment node-name, next, previous, up @section Semaphores -described here should be considered -experimental, subject to API changes without notice. +Semaphores are among other things useful for keeping track of a +countable resource, eg. messages in a queue, and sleep when the +resource is exhausted. @include struct-sb-thread-semaphore.texinfo @include fun-sb-thread-make-semaphore.texinfo -@include fun-sb-thread-semaphore-count.texinfo -@include fun-sb-thread-semaphore-name.texinfo @include fun-sb-thread-signal-semaphore.texinfo -@include fun-sb-thread-try-semaphore.texinfo @include fun-sb-thread-wait-on-semaphore.texinfo +@include fun-sb-thread-try-semaphore.texinfo +@include fun-sb-thread-semaphore-count.texinfo +@include fun-sb-thread-semaphore-name.texinfo + +@include struct-sb-thread-semaphore-notification.texinfo +@include fun-sb-thread-make-semaphore-notification.texinfo +@include fun-sb-thread-semaphore-notification-status.texinfo +@include fun-sb-thread-clear-semaphore-notification.texinfo @node Waitqueue/condition variables @comment node-name, next, previous, up diff --git a/package-data-list.lisp-expr b/package-data-list.lisp-expr index 60b14e9..e27b6e8 100644 --- a/package-data-list.lisp-expr +++ b/package-data-list.lisp-expr @@ -2022,7 +2022,12 @@ is a good idea, but see SB-SYS re. blurring of boundaries." "SEMAPHORE-COUNT" "SIGNAL-SEMAPHORE" "TRY-SEMAPHORE" - "WAIT-ON-SEMAPHORE")) + "WAIT-ON-SEMAPHORE" + ;; Semaphore notification objects + "CLEAR-SEMAPHORE-NOTIFICATION" + "MAKE-SEMAPHORE-NOTIFICATION" + "SEMAPHORE-NOTIFICATION" + "SEMAPHORE-NOTIFICATION-STATUS")) #s(sb-cold:package-data :name "SB!LOOP" diff --git a/src/code/target-thread.lisp b/src/code/target-thread.lisp index 1bff9ad..4798bf4 100644 --- a/src/code/target-thread.lisp +++ b/src/code/target-thread.lisp @@ -912,8 +912,38 @@ future." (setf (fdocumentation 'semaphore-name 'function) "The name of the semaphore INSTANCE. Setfable.") +(defstruct (semaphore-notification (:constructor make-semaphore-notification ()) + (:copier nil)) + #!+sb-doc + "Semaphore notification object. Can be passed to WAIT-ON-SEMAPHORE and +TRY-SEMAPHORE as the :NOTIFICATION argument. Consequences are undefined if +multiple threads are using the same notification object in parallel." + (%status nil :type boolean)) + +(setf (fdocumentation 'make-semaphore-notification 'function) + "Constructor for SEMAPHORE-NOTIFICATION objects. SEMAPHORE-NOTIFICATION-STATUS +is initially NIL.") + +(declaim (inline semaphore-notification-status)) +(defun semaphore-notification-status (semaphore-notification) + #!+sb-doc + "Returns T if a WAIT-ON-SEMAPHORE or TRY-SEMAPHORE using +SEMAPHORE-NOTICATION has succeeded since the notification object was created +or cleared." + (barrier (:read)) + (semaphore-notification-%status semaphore-notification)) + +(declaim (inline clear-semaphore-notification)) +(defun clear-semaphore-notification (semaphore-notification) + #!+sb-doc + "Resets the SEMAPHORE-NOTIFICATION object for use with another call to +WAIT-ON-SEMAPHORE or TRY-SEMAPHORE." + (barrier (:write) + (setf (semaphore-notification-%status semaphore-notification) nil))) + (declaim (inline semaphore-count)) (defun semaphore-count (instance) + #!+sb-doc "Returns the current count of the semaphore INSTANCE." (barrier (:read)) (semaphore-%count instance)) @@ -923,14 +953,23 @@ future." "Create a semaphore with the supplied COUNT and NAME." (%make-semaphore name count)) -(defun wait-on-semaphore (semaphore &key timeout) +(defun wait-on-semaphore (semaphore &key timeout notification) #!+sb-doc "Decrement the count of SEMAPHORE if the count would not be negative. Else blocks until the semaphore can be decremented. Returns T on success. If TIMEOUT is given, it is the maximum number of seconds to wait. If the count cannot be decremented in that time, returns NIL without decrementing the -count." +count. + +If NOTIFICATION is given, it must be a SEMAPHORE-NOTIFICATION object whose +SEMAPHORE-NOTIFICATION-STATUS is NIL. If WAIT-ON-SEMAPHORE succeeds and +decrements the count, the status is set to T." + (when (and notification (semaphore-notification-status notification)) + (with-simple-restart (continue "Clear notification status and continue.") + (error "~@" + 'wait-on-semaphore semaphore)) + (clear-semaphore-notification notification)) ;; A more direct implementation based directly on futexes should be ;; possible. ;; @@ -942,36 +981,55 @@ count." (with-system-mutex ((semaphore-mutex semaphore) :allow-with-interrupts t) ;; Quick check: is it positive? If not, enter the wait loop. (let ((count (semaphore-%count semaphore))) - (if (plusp count) - (setf (semaphore-%count semaphore) (1- count)) - (unwind-protect - (progn - ;; Need to use ATOMIC-INCF despite the lock, because on our - ;; way out from here we might not be locked anymore -- so - ;; another thread might be tweaking this in parallel using - ;; ATOMIC-DECF. No danger over overflow, since there it - ;; at most one increment per thread waiting on the semaphore. - (sb!ext:atomic-incf (semaphore-waitcount semaphore)) - (loop until (plusp (setf count (semaphore-%count semaphore))) - do (or (condition-wait (semaphore-queue semaphore) - (semaphore-mutex semaphore) - :timeout timeout) - (return-from wait-on-semaphore nil))) - (setf (semaphore-%count semaphore) (1- count))) - ;; Need to use ATOMIC-DECF instead of DECF, as CONDITION-WAIT - ;; may unwind without the lock being held due to timeouts. - (sb!ext:atomic-decf (semaphore-waitcount semaphore)))))) + (cond ((plusp count) + (setf (semaphore-%count semaphore) (1- count)) + (when notification + (setf (semaphore-notification-%status notification) t))) + (t + (unwind-protect + (progn + ;; Need to use ATOMIC-INCF despite the lock, because on our + ;; way out from here we might not be locked anymore -- so + ;; another thread might be tweaking this in parallel using + ;; ATOMIC-DECF. No danger over overflow, since there it + ;; at most one increment per thread waiting on the semaphore. + (sb!ext:atomic-incf (semaphore-waitcount semaphore)) + (loop until (plusp (setf count (semaphore-%count semaphore))) + do (or (condition-wait (semaphore-queue semaphore) + (semaphore-mutex semaphore) + :timeout timeout) + (return-from wait-on-semaphore nil))) + (setf (semaphore-%count semaphore) (1- count)) + (when notification + (setf (semaphore-notification-%status notification) t))) + ;; Need to use ATOMIC-DECF as we may unwind without the lock + ;; being held! + (sb!ext:atomic-decf (semaphore-waitcount semaphore))))))) t) -(defun try-semaphore (semaphore &optional (n 1)) +(defun try-semaphore (semaphore &optional (n 1) notification) #!+sb-doc "Try to decrement the count of SEMAPHORE by N. If the count were to -become negative, punt and return NIL, otherwise return true." +become negative, punt and return NIL, otherwise return true. + +If NOTIFICATION is given it must be a semaphore notification object +with SEMAPHORE-NOTIFICATION-STATUS of NIL. If the count is decremented, +the status is set to T." (declare (type (integer 1) n)) + (when (and notification (semaphore-notification-status notification)) + (with-simple-restart (continue "Clear notification status and continue.") + (error "~@" + 'try-semaphore semaphore)) + (clear-semaphore-notification notification)) (with-system-mutex ((semaphore-mutex semaphore) :allow-with-interrupts t) (let ((new-count (- (semaphore-%count semaphore) n))) (when (not (minusp new-count)) - (setf (semaphore-%count semaphore) new-count))))) + (setf (semaphore-%count semaphore) new-count) + (when notification + (setf (semaphore-notifiction-%status notification) t)) + ;; FIXME: We don't actually document this -- should we just + ;; return T, or document new count as the return? + new-count)))) (defun signal-semaphore (semaphore &optional (n 1)) #!+sb-doc diff --git a/tests/threads.pure.lisp b/tests/threads.pure.lisp index 3d6d119..fb21822 100644 --- a/tests/threads.pure.lisp +++ b/tests/threads.pure.lisp @@ -495,3 +495,49 @@ (join-thread (make-thread (lambda () (sleep 10))) :timeout 0.01 :default cookie))))) + +(with-test (:name :semaphore-notification + :skipped-on '(not :sb-thread)) + (let ((sem (make-semaphore)) + (ok nil) + (n 0)) + (flet ((critical () + (let ((note (make-semaphore-notification))) + (sb-sys:without-interrupts + (unwind-protect + (progn + (sb-sys:with-local-interrupts + (wait-on-semaphore sem :notification note) + (sleep (random 0.1))) + (incf n)) + ;; Re-increment on exit if we decremented it. + (when (semaphore-notification-status note) + (signal-semaphore sem)) + ;; KLUDGE: Prevent interrupts after this point from + ;; unwinding us, so that we can reason about the counts. + (sb-thread::block-deferrable-signals)))))) + (let* ((threads (loop for i from 1 upto 100 + collect (make-thread #'critical :name (format nil "T~A" i)))) + (safe nil) + (unsafe nil) + (interruptor (make-thread (lambda () + (loop until ok) + (let (x) + (dolist (thread threads) + (cond (x + (push thread unsafe) + (sleep (random 0.1)) + (ignore-errors + (terminate-thread thread))) + (t + (push thread safe))) + (setf x (not x)))))))) + (signal-semaphore sem) + (setf ok t) + (join-thread interruptor) + (mapc #'join-thread safe) + (let ((k (count-if (lambda (th) + (join-thread th :default nil)) + unsafe))) + (assert (= n (+ k (length safe)))) + (assert unsafe)))))) -- 1.7.10.4