timeouts on semaphores and mailboxes, fix timeouts on condition variables
authorNikodemus Siivola <nikodemus@random-state.net>
Thu, 10 Nov 2011 09:27:15 +0000 (11:27 +0200)
committerNikodemus Siivola <nikodemus@random-state.net>
Thu, 10 Nov 2011 12:12:59 +0000 (14:12 +0200)
  * Accidentally put in the version of condition variable timeouts that just
    looked like a spurious wakeup instead of returning NIL without grabbing
    the mutex. Ooops -- fixed that.

  * The issue with mailbox tests on Darwin at least appears to be related
    to our usage of pthread functions inside signal handlers.

contrib/sb-concurrency/mailbox.lisp
contrib/sb-concurrency/tests/test-mailbox.lisp
src/code/target-thread.lisp
tests/threads.pure.lisp

index ba4f651..c4acea5 100644 (file)
@@ -66,19 +66,23 @@ mailbox. Does not remove messages from the mailbox."
     (enqueue message (mailbox-queue mailbox))
     (signal-semaphore (mailbox-semaphore mailbox))))
 
-;;; TODO: TIMEOUT argument.
-(defun receive-message (mailbox &key)
-  "Removes the oldest message from MAILBOX and returns it as the
-primary value. If MAILBOX is empty waits until a message arrives."
+(defun receive-message (mailbox &key timeout)
+  "Removes the oldest message from MAILBOX and returns it as the primary
+value, and a secondary value of T. If MAILBOX is empty waits until a message
+arrives.
+
+If TIMEOUT is provided, and no message arrives within the specified interval,
+returns primary and secondary value of NIL."
   (tagbody
      ;; Disable interrupts for keeping semaphore count in sync with
      ;; #msgs in the mailbox.
      (sb-sys:without-interrupts
        (sb-sys:allow-with-interrupts
-         (wait-on-semaphore (mailbox-semaphore mailbox)))
+         (or (wait-on-semaphore (mailbox-semaphore mailbox) :timeout timeout)
+             (return-from receive-message (values nil nil))))
        (multiple-value-bind (value ok) (dequeue (mailbox-queue mailbox))
          (if ok
-             (return-from receive-message value)
+             (return-from receive-message (values value t))
              (go :error))))
    :error
      (sb-int:bug "Mailbox ~S empty after WAIT-ON-SEMAPHORE."
index b93b344..db4b221 100644 (file)
   (3 nil (#\1 #\2 #\3) nil)
   (0 t nil t))
 
-;;; FIXME: Several tests disabled on Darwin due to hangs. Something not right
-;;; with mailboxes -- or possibly semaphores -- there.
+(deftest mailbox-timeouts
+    (let* ((mbox (make-mailbox))
+           (writers (loop for i from 1 upto 20
+                          collect (make-thread
+                                   (lambda (x)
+                                     (loop repeat 50
+                                           do (send-message mbox x)
+                                              (sleep 0.001)))
+                                   :arguments i)))
+           (readers (loop repeat 10
+                          collect (make-thread
+                                   (lambda ()
+                                     (loop while (receive-message mbox :timeout 0.1)
+                                           count t))))))
+      (mapc #'join-thread writers)
+      (apply #'+ (mapcar #'join-thread readers)))
+  1000)
+
+;;; FIXME: Several tests disabled on Darwin and SunOS due to hangs.
+;;;
+;;; On Darwin at least the issues don't seem to have anything to do with
+;;; mailboxes per-se, but are rather related to our usage of signal-unsafe
+;;; pthread functions inside signal handlers.
 #+(and sb-thread (not (or darwin sunos)))
 (progn
 
index 4b566f6..0361ec5 100644 (file)
@@ -793,7 +793,8 @@ around the call, checking the the associated data:
 "
   #!-sb-thread (declare (ignore queue timeout))
   (assert mutex)
-  #!-sb-thread (error "Not supported in unithread builds.")
+  #!-sb-thread
+  (wait-for nil :timeout timeout) ; Yeah...
   #!+sb-thread
   (let ((me *current-thread*))
     (barrier (:read))
@@ -861,23 +862,27 @@ around the call, checking the the associated data:
             (when (and (eq :timeout status) deadlinep)
               (let ((got-it (%try-mutex mutex me)))
                 (allow-with-interrupts
-                  (signal-deadline))
-                (cond (got-it
-                       (return-from condition-wait t))
-                      (t
-                       (setf (values to-sec to-usec stop-sec stop-usec deadlinep)
-                             (decode-timeout timeout))))))
+                  (signal-deadline)
+                  (cond (got-it
+                         (return-from condition-wait t))
+                        (t
+                         ;; The deadline may have changed.
+                         (setf (values to-sec to-usec stop-sec stop-usec deadlinep)
+                               (decode-timeout timeout))
+                         (setf status :ok))))))
             ;; Re-acquire the mutex for normal return.
-            (unless (or (%try-mutex mutex me)
-                        (allow-with-interrupts
-                          (%wait-for-mutex mutex me timeout
-                                           to-sec to-usec
-                                           stop-sec stop-usec deadlinep)))
+            (when (eq :ok status)
+              (unless (or (%try-mutex mutex me)
+                          (allow-with-interrupts
+                            (%wait-for-mutex mutex me timeout
+                                             to-sec to-usec
+                                             stop-sec stop-usec deadlinep)))
+                (setf status :timeout)))))
+        (or (eq :ok status)
+            (unless (eq :timeout status)
               ;; The only case we return normally without re-acquiring the
               ;; mutex is when there is a :TIMEOUT that runs out.
-              (aver (and timeout (not deadlinep)))
-              (return-from condition-wait nil)))))))
-  t)
+              (bug "CONDITION-WAIT: invalid status on normal return: ~S" status)))))))
 
 (defun condition-notify (queue &optional (n 1))
   #!+sb-doc
@@ -951,16 +956,22 @@ future."
   "Create a semaphore with the supplied COUNT and NAME."
   (%make-semaphore name count))
 
-(defun wait-on-semaphore (semaphore)
+(defun wait-on-semaphore (semaphore &key timeout)
   #!+sb-doc
-  "Decrement the count of SEMAPHORE if the count would not be
-negative. Else blocks until the semaphore can be decremented."
+  "Decrement the count of SEMAPHORE if the count would not be negative. Else
+blocks until the semaphore can be decremented. Returns T on success.
+
+If TIMEOUT is given, it is the maximum number of seconds to wait. If the count
+cannot be decremented in that time, returns NIL without decrementing the
+count."
   ;; A more direct implementation based directly on futexes should be
   ;; possible.
   ;;
   ;; We need to disable interrupts so that we don't forget to
   ;; decrement the waitcount (which would happen if an asynch
   ;; interrupt should catch us on our way out from the loop.)
+  ;;
+  ;; FIXME: No timeout on initial mutex acquisition.
   (with-system-mutex ((semaphore-mutex semaphore) :allow-with-interrupts t)
     ;; Quick check: is it positive? If not, enter the wait loop.
     (let ((count (semaphore-%count semaphore)))
@@ -975,12 +986,15 @@ negative. Else blocks until the semaphore can be decremented."
                  ;; at most one increment per thread waiting on the semaphore.
                  (sb!ext:atomic-incf (semaphore-waitcount semaphore))
                  (loop until (plusp (setf count (semaphore-%count semaphore)))
-                       do (condition-wait (semaphore-queue semaphore)
-                                          (semaphore-mutex semaphore)))
+                       do (or (condition-wait (semaphore-queue semaphore)
+                                              (semaphore-mutex semaphore)
+                                              :timeout timeout)
+                              (return-from wait-on-semaphore nil)))
                  (setf (semaphore-%count semaphore) (1- count)))
             ;; Need to use ATOMIC-DECF instead of DECF, as CONDITION-WAIT
             ;; may unwind without the lock being held due to timeouts.
-            (sb!ext:atomic-decf (semaphore-waitcount semaphore)))))))
+            (sb!ext:atomic-decf (semaphore-waitcount semaphore))))))
+  t)
 
 (defun try-semaphore (semaphore &optional (n 1))
   #!+sb-doc
index 0816e51..d8ae4c4 100644 (file)
                     (sb-ext:wait-for nil :timeout 10)
                     (error "oops"))
                 (sb-sys:deadline-timeout () :deadline)))))
+
+(with-test (:name (:condition-wait :timeout :one-thread))
+  (let ((mutex (make-mutex))
+        (waitqueue (make-waitqueue)))
+    (assert (not (with-mutex (mutex)
+                   (condition-wait waitqueue mutex :timeout 0.01))))))
+
+(with-test (:name (:condition-wait :timeout :many-threads)
+            :skipped-on '(not :sb-thread))
+  (let* ((mutex (make-mutex))
+         (waitqueue (make-waitqueue))
+         (sem (make-semaphore))
+         (data nil)
+         (workers
+           (loop repeat 100
+                 collect (make-thread
+                          (lambda ()
+                            (wait-on-semaphore sem)
+                            (block thread
+                              (with-mutex (mutex)
+                                (loop until data
+                                      do (or (condition-wait waitqueue mutex :timeout 0.01)
+                                             (return-from thread nil)))
+                                (assert (eq t (pop data)))
+                                t)))))))
+    (loop repeat 50
+          do (with-mutex (mutex)
+               (push t data)
+               (condition-notify waitqueue)))
+    (signal-semaphore sem 100)
+    (let ((ok (count-if #'join-thread workers)))
+      (unless (eql 50 ok)
+        (error "Wanted 50, got ~S" ok)))))
+
+(with-test (:name (:wait-on-semaphore :timeout :one-thread))
+  (let ((sem (make-semaphore))
+        (n 0))
+    (signal-semaphore sem 10)
+    (loop repeat 100
+          do (when (wait-on-semaphore sem :timeout 0.001)
+               (incf n)))
+    (assert (= n 10))))
+
+(with-test (:name (:wait-on-semaphore :timeout :many-threads)
+            :skipped-on '(not :sb-thread))
+  (let* ((sem (make-semaphore))
+         (threads
+           (progn
+             (signal-semaphore sem 10)
+             (loop repeat 100
+                   collect (make-thread
+                            (lambda ()
+                              (sleep (random 0.02))
+                              (wait-on-semaphore sem :timeout 0.01)))))))
+    (loop repeat 5
+          do (signal-semaphore sem 2))
+    (let ((ok (count-if #'join-thread threads)))
+      (unless (eql 20 ok)
+        (error "Wanted 20, got ~S" ok)))))