semaphore notification objects
authorNikodemus Siivola <nikodemus@random-state.net>
Thu, 17 Nov 2011 12:11:18 +0000 (14:11 +0200)
committerNikodemus Siivola <nikodemus@random-state.net>
Thu, 17 Nov 2011 17:08:58 +0000 (19:08 +0200)
doc/manual/threading.texinfo
package-data-list.lisp-expr
src/code/target-thread.lisp
tests/threads.pure.lisp

index 610941d..21d2756 100644 (file)
@@ -162,16 +162,22 @@ if you want a bounded wait.
 @comment  node-name,  next,  previous,  up
 @section Semaphores
 
-described here should be considered
-experimental, subject to API changes without notice.
+Semaphores are among other things useful for keeping track of a
+countable resource, eg. messages in a queue, and sleep when the
+resource is exhausted.
 
 @include struct-sb-thread-semaphore.texinfo
 @include fun-sb-thread-make-semaphore.texinfo
-@include fun-sb-thread-semaphore-count.texinfo
-@include fun-sb-thread-semaphore-name.texinfo
 @include fun-sb-thread-signal-semaphore.texinfo
-@include fun-sb-thread-try-semaphore.texinfo
 @include fun-sb-thread-wait-on-semaphore.texinfo
+@include fun-sb-thread-try-semaphore.texinfo
+@include fun-sb-thread-semaphore-count.texinfo
+@include fun-sb-thread-semaphore-name.texinfo
+
+@include struct-sb-thread-semaphore-notification.texinfo
+@include fun-sb-thread-make-semaphore-notification.texinfo
+@include fun-sb-thread-semaphore-notification-status.texinfo
+@include fun-sb-thread-clear-semaphore-notification.texinfo
 
 @node Waitqueue/condition variables
 @comment  node-name,  next,  previous,  up
index 60b14e9..e27b6e8 100644 (file)
@@ -2022,7 +2022,12 @@ is a good idea, but see SB-SYS re. blurring of boundaries."
                "SEMAPHORE-COUNT"
                "SIGNAL-SEMAPHORE"
                "TRY-SEMAPHORE"
-               "WAIT-ON-SEMAPHORE"))
+               "WAIT-ON-SEMAPHORE"
+               ;; Semaphore notification objects
+               "CLEAR-SEMAPHORE-NOTIFICATION"
+               "MAKE-SEMAPHORE-NOTIFICATION"
+               "SEMAPHORE-NOTIFICATION"
+               "SEMAPHORE-NOTIFICATION-STATUS"))
 
    #s(sb-cold:package-data
       :name "SB!LOOP"
index 1bff9ad..4798bf4 100644 (file)
@@ -912,8 +912,38 @@ future."
 (setf (fdocumentation 'semaphore-name 'function)
       "The name of the semaphore INSTANCE. Setfable.")
 
+(defstruct (semaphore-notification (:constructor make-semaphore-notification ())
+                                   (:copier nil))
+  #!+sb-doc
+  "Semaphore notification object. Can be passed to WAIT-ON-SEMAPHORE and
+TRY-SEMAPHORE as the :NOTIFICATION argument. Consequences are undefined if
+multiple threads are using the same notification object in parallel."
+  (%status nil :type boolean))
+
+(setf (fdocumentation 'make-semaphore-notification 'function)
+      "Constructor for SEMAPHORE-NOTIFICATION objects. SEMAPHORE-NOTIFICATION-STATUS
+is initially NIL.")
+
+(declaim (inline semaphore-notification-status))
+(defun semaphore-notification-status (semaphore-notification)
+  #!+sb-doc
+  "Returns T if a WAIT-ON-SEMAPHORE or TRY-SEMAPHORE using
+SEMAPHORE-NOTICATION has succeeded since the notification object was created
+or cleared."
+  (barrier (:read))
+  (semaphore-notification-%status semaphore-notification))
+
+(declaim (inline clear-semaphore-notification))
+(defun clear-semaphore-notification (semaphore-notification)
+  #!+sb-doc
+  "Resets the SEMAPHORE-NOTIFICATION object for use with another call to
+WAIT-ON-SEMAPHORE or TRY-SEMAPHORE."
+  (barrier (:write)
+    (setf (semaphore-notification-%status semaphore-notification) nil)))
+
 (declaim (inline semaphore-count))
 (defun semaphore-count (instance)
+  #!+sb-doc
   "Returns the current count of the semaphore INSTANCE."
   (barrier (:read))
   (semaphore-%count instance))
@@ -923,14 +953,23 @@ future."
   "Create a semaphore with the supplied COUNT and NAME."
   (%make-semaphore name count))
 
-(defun wait-on-semaphore (semaphore &key timeout)
+(defun wait-on-semaphore (semaphore &key timeout notification)
   #!+sb-doc
   "Decrement the count of SEMAPHORE if the count would not be negative. Else
 blocks until the semaphore can be decremented. Returns T on success.
 
 If TIMEOUT is given, it is the maximum number of seconds to wait. If the count
 cannot be decremented in that time, returns NIL without decrementing the
-count."
+count.
+
+If NOTIFICATION is given, it must be a SEMAPHORE-NOTIFICATION object whose
+SEMAPHORE-NOTIFICATION-STATUS is NIL. If WAIT-ON-SEMAPHORE succeeds and
+decrements the count, the status is set to T."
+  (when (and notification (semaphore-notification-status notification))
+    (with-simple-restart (continue "Clear notification status and continue.")
+      (error "~@<Semaphore notification object status not cleared on entry to ~S on ~S.~:@>"
+             'wait-on-semaphore semaphore))
+    (clear-semaphore-notification notification))
   ;; A more direct implementation based directly on futexes should be
   ;; possible.
   ;;
@@ -942,36 +981,55 @@ count."
   (with-system-mutex ((semaphore-mutex semaphore) :allow-with-interrupts t)
     ;; Quick check: is it positive? If not, enter the wait loop.
     (let ((count (semaphore-%count semaphore)))
-      (if (plusp count)
-          (setf (semaphore-%count semaphore) (1- count))
-          (unwind-protect
-               (progn
-                 ;; Need to use ATOMIC-INCF despite the lock, because on our
-                 ;; way out from here we might not be locked anymore -- so
-                 ;; another thread might be tweaking this in parallel using
-                 ;; ATOMIC-DECF. No danger over overflow, since there it
-                 ;; at most one increment per thread waiting on the semaphore.
-                 (sb!ext:atomic-incf (semaphore-waitcount semaphore))
-                 (loop until (plusp (setf count (semaphore-%count semaphore)))
-                       do (or (condition-wait (semaphore-queue semaphore)
-                                              (semaphore-mutex semaphore)
-                                              :timeout timeout)
-                              (return-from wait-on-semaphore nil)))
-                 (setf (semaphore-%count semaphore) (1- count)))
-            ;; Need to use ATOMIC-DECF instead of DECF, as CONDITION-WAIT
-            ;; may unwind without the lock being held due to timeouts.
-            (sb!ext:atomic-decf (semaphore-waitcount semaphore))))))
+      (cond ((plusp count)
+             (setf (semaphore-%count semaphore) (1- count))
+             (when notification
+               (setf (semaphore-notification-%status notification) t)))
+            (t
+             (unwind-protect
+                  (progn
+                    ;; Need to use ATOMIC-INCF despite the lock, because on our
+                    ;; way out from here we might not be locked anymore -- so
+                    ;; another thread might be tweaking this in parallel using
+                    ;; ATOMIC-DECF. No danger over overflow, since there it
+                    ;; at most one increment per thread waiting on the semaphore.
+                    (sb!ext:atomic-incf (semaphore-waitcount semaphore))
+                    (loop until (plusp (setf count (semaphore-%count semaphore)))
+                          do (or (condition-wait (semaphore-queue semaphore)
+                                                 (semaphore-mutex semaphore)
+                                                 :timeout timeout)
+                                 (return-from wait-on-semaphore nil)))
+                    (setf (semaphore-%count semaphore) (1- count))
+                    (when notification
+                      (setf (semaphore-notification-%status notification) t)))
+               ;; Need to use ATOMIC-DECF as we may unwind without the lock
+               ;; being held!
+               (sb!ext:atomic-decf (semaphore-waitcount semaphore)))))))
   t)
 
-(defun try-semaphore (semaphore &optional (n 1))
+(defun try-semaphore (semaphore &optional (n 1) notification)
   #!+sb-doc
   "Try to decrement the count of SEMAPHORE by N. If the count were to
-become negative, punt and return NIL, otherwise return true."
+become negative, punt and return NIL, otherwise return true.
+
+If NOTIFICATION is given it must be a semaphore notification object
+with SEMAPHORE-NOTIFICATION-STATUS of NIL. If the count is decremented,
+the status is set to T."
   (declare (type (integer 1) n))
+  (when (and notification (semaphore-notification-status notification))
+    (with-simple-restart (continue "Clear notification status and continue.")
+      (error "~@<Semaphore notification object status not cleared on entry to ~S on ~S.~:@>"
+             'try-semaphore semaphore))
+    (clear-semaphore-notification notification))
   (with-system-mutex ((semaphore-mutex semaphore) :allow-with-interrupts t)
     (let ((new-count (- (semaphore-%count semaphore) n)))
       (when (not (minusp new-count))
-        (setf (semaphore-%count semaphore) new-count)))))
+        (setf (semaphore-%count semaphore) new-count)
+        (when notification
+          (setf (semaphore-notifiction-%status notification) t))
+        ;; FIXME: We don't actually document this -- should we just
+        ;; return T, or document new count as the return?
+        new-count))))
 
 (defun signal-semaphore (semaphore &optional (n 1))
   #!+sb-doc
index 3d6d119..fb21822 100644 (file)
                 (join-thread (make-thread (lambda () (sleep 10)))
                              :timeout 0.01
                              :default cookie)))))
+
+(with-test (:name :semaphore-notification
+            :skipped-on '(not :sb-thread))
+  (let ((sem (make-semaphore))
+        (ok nil)
+        (n 0))
+    (flet ((critical ()
+             (let ((note (make-semaphore-notification)))
+               (sb-sys:without-interrupts
+                 (unwind-protect
+                      (progn
+                        (sb-sys:with-local-interrupts
+                          (wait-on-semaphore sem :notification note)
+                          (sleep (random 0.1)))
+                        (incf n))
+                   ;; Re-increment on exit if we decremented it.
+                   (when (semaphore-notification-status note)
+                     (signal-semaphore sem))
+                   ;; KLUDGE: Prevent interrupts after this point from
+                   ;; unwinding us, so that we can reason about the counts.
+                   (sb-thread::block-deferrable-signals))))))
+      (let* ((threads (loop for i from 1 upto 100
+                            collect (make-thread #'critical :name (format nil "T~A" i))))
+             (safe nil)
+             (unsafe nil)
+             (interruptor (make-thread (lambda ()
+                                         (loop until ok)
+                                         (let (x)
+                                           (dolist (thread threads)
+                                             (cond (x
+                                                    (push thread unsafe)
+                                                    (sleep (random 0.1))
+                                                    (ignore-errors
+                                                     (terminate-thread thread)))
+                                                   (t
+                                                    (push thread safe)))
+                                             (setf x (not x))))))))
+        (signal-semaphore sem)
+        (setf ok t)
+        (join-thread interruptor)
+        (mapc #'join-thread safe)
+        (let ((k (count-if (lambda (th)
+                             (join-thread th :default nil))
+                           unsafe)))
+          (assert (= n (+ k (length safe))))
+          (assert unsafe))))))