sb-concurrency: add Allegro-style gate objects
authorNikodemus Siivola <nikodemus@random-state.net>
Mon, 14 Nov 2011 10:17:27 +0000 (12:17 +0200)
committerNikodemus Siivola <nikodemus@random-state.net>
Mon, 14 Nov 2011 10:58:22 +0000 (12:58 +0200)
contrib/sb-concurrency/gate.lisp [new file with mode: 0644]
contrib/sb-concurrency/mailbox.lisp
contrib/sb-concurrency/package.lisp
contrib/sb-concurrency/queue.lisp
contrib/sb-concurrency/sb-concurrency.asd
contrib/sb-concurrency/sb-concurrency.texinfo
contrib/sb-concurrency/tests/test-gate.lisp [new file with mode: 0644]
contrib/sb-concurrency/tests/test-utils.lisp

diff --git a/contrib/sb-concurrency/gate.lisp b/contrib/sb-concurrency/gate.lisp
new file mode 100644 (file)
index 0000000..9628d57
--- /dev/null
@@ -0,0 +1,90 @@
+;;;; -*-  Lisp -*-
+;;;;
+;;;; This software is part of the SBCL system. See the README file for
+;;;; more information.
+;;;;
+;;;; This software is derived from the CMU CL system, which was
+;;;; written at Carnegie Mellon University and released into the
+;;;; public domain. The software is in the public domain and is
+;;;; provided with absolutely no warranty. See the COPYING and CREDITS
+;;;; files for more information.
+
+(in-package :sb-concurrency)
+
+;;;; FIXME: On Linux a direct futex-based implementation would be more
+;;;; efficient.
+
+(defstruct (gate (:constructor %make-gate)
+                 (:copier nil)
+                 (:predicate gatep))
+  "GATE type. Gates are syncronization constructs suitable for making
+multiple threads wait for single event before proceeding.
+
+Use WAIT-ON-GATE to wait for a gate to open, OPEN-GATE to open one,
+and CLOSE-GATE to close an open gate. GATE-OPEN-P can be used to test
+the state of a gate without blocking."
+  (mutex (missing-arg) :type mutex)
+  (queue (missing-arg) :type waitqueue)
+  (state :closed :type (member :open :closed))
+  (name  nil :type (or null simple-string)))
+
+(setf (documentation 'gatep 'function)
+      "Returns true if the argument is a GATE."
+      (documentation 'gate-name 'function)
+      "Name of a GATE. SETFable.")
+
+(defmethod print-object ((gate gate) stream)
+  (print-unreadable-object (gate stream :type t :identity t)
+    (format stream "~@[~S ~]~((~A)~)"
+            (gate-name gate)
+            (gate-state gate))))
+
+(defun make-gate (&key name open)
+  "Makes a new gate. Gate will be initially open if OPEN is true, and closed if OPEN
+is NIL (the default.) NAME, if provided, is the name of the gate, used when printing
+the gate."
+  (flet ((generate-name (thing)
+           (when name
+             (format nil "gate ~S's ~A" name thing))))
+    (%make-gate
+     :name name
+     :mutex (make-mutex :name (generate-name "lock"))
+     :queue (make-waitqueue :name (generate-name "condition variable"))
+     :state (if open :open :closed))))
+
+(defun open-gate (gate)
+  "Opens GATE. Returns T if the gate was previously closed, and NIL
+if the gate was already open."
+  (declare (gate gate))
+  (let (closed)
+    (with-mutex ((gate-mutex gate))
+      (setf closed (eq :closed (gate-state gate))
+            (gate-state gate) :open)
+      (condition-broadcast (gate-queue gate)))
+    closed))
+
+(defun close-gate (gate)
+  "Closes GATE. Returns T if the gate was previously open, and NIL
+if the gate was already closed."
+  (declare (gate gate))
+  (let (open)
+    (with-mutex ((gate-mutex gate))
+      (setf open (eq :open (gate-state gate))
+            (gate-state gate) :closed))
+    open))
+
+(defun wait-on-gate (gate &key timeout)
+  "Waits for GATE to open, or TIMEOUT seconds to pass. Returns T
+if the gate was opened in time, and NIL otherwise."
+  (declare (gate gate))
+  (with-mutex ((gate-mutex gate))
+    (loop until (eq :open (gate-state gate))
+          do (or (condition-wait (gate-queue gate) (gate-mutex gate)
+                              :timeout timeout)
+                 (return-from wait-on-gate nil))))
+  t)
+
+(defun gate-open-p (gate)
+  "Returns true if GATE is open."
+  (declare (gate gate))
+  (eq :open (gate-state gate)))
index c4acea5..dfa6c8f 100644 (file)
 (defstruct (mailbox (:constructor %make-mailbox (queue semaphore name))
                     (:copier nil)
                     (:predicate mailboxp))
-  "Mailbox aka message queue."
+  "Mailbox aka message queue.
+
+SEND-MESSAGE adds a message to the mailbox, RECEIVE-MESSAGE waits till
+a message becomes available, whereas RECEIVE-MESSAGE-NO-HANG is a non-blocking
+variant, and RECEIVE-PENDING-MESSAGES empties the entire mailbox in one go.
+
+Messages can be arbitrary objects"
   (queue (missing-arg) :type queue)
   (semaphore (missing-arg) :type semaphore)
   (name nil))
index a9124b6..7e6c6bd 100644 (file)
@@ -1,5 +1,5 @@
 (defpackage :sb-concurrency
-  (:use :cl :sb-thread)
+  (:use :cl :sb-thread :sb-int)
   (:export
    ;; MAILBOX
    "LIST-MAILBOX-MESSAGES"
    "QUEUE-EMPTY-P"
    "QUEUE-NAME"
    "QUEUEP"
+
+   ;; GATE
+   "CLOSE-GATE"
+   "GATE"
+   "GATE-NAME"
+   "GATE-OPEN-P"
+   "GATEP"
+   "MAKE-GATE"
+   "OPEN-GATE"
+   "WAIT-ON-GATE"
    ))
\ No newline at end of file
index 5fa7007..bf0bc98 100644 (file)
@@ -25,7 +25,9 @@
 (defstruct (queue (:constructor %make-queue (head tail name))
                   (:copier nil)
                   (:predicate queuep))
-  "Lock-free thread safe queue."
+  "Lock-free thread safe FIFO queue.
+
+Use ENQUEUE to add objects to the queue, and DEQUEUE to remove them."
   (head (error "No HEAD.") :type node)
   (tail (error "No TAIL.") :type node)
   (name nil))
@@ -117,7 +119,8 @@ and secondary value."
 
 (defun list-queue-contents (queue)
   "Returns the contents of QUEUE as a list without removing them from the
-QUEUE. Mainly useful for manual examination of queue state."
+QUEUE. Mainly useful for manual examination of queue state, as the list
+may be out of date by the time it is returned."
   (let (all)
     (labels ((walk (node)
                ;; Since NEXT pointers are always right, traversing from tail
@@ -134,7 +137,7 @@ QUEUE. Mainly useful for manual examination of queue state."
 (defun queue-count (queue)
   "Returns the number of objects in QUEUE. Mainly useful for manual
 examination of queue state, and in PRINT-OBJECT methods: inefficient as it
-walks the entire queue."
+must walk the entire queue."
   (let ((n 0))
     (declare (unsigned-byte n))
     (labels ((walk (node)
index 4246d5b..b692132 100644 (file)
@@ -14,7 +14,8 @@
 (asdf:defsystem :sb-concurrency
   :components ((:file "package")
                (:file "queue"    :depends-on ("package"))
-               (:file "mailbox"  :depends-on ("package" "queue"))))
+               (:file "mailbox"  :depends-on ("package" "queue"))
+               (:file "gate"     :depends-on ("package"))))
 
 (asdf:defsystem :sb-concurrency-tests
   :depends-on (:sb-concurrency :sb-rt)
@@ -24,7 +25,8 @@
     ((:file "package")
      (:file "test-utils"   :depends-on ("package"))
      (:file "test-queue"   :depends-on ("package" "test-utils"))
-     (:file "test-mailbox" :depends-on ("package" "test-utils"))))))
+     (:file "test-mailbox" :depends-on ("package" "test-utils"))
+     (:file "test-gate"    :depends-on ("package" "test-utils"))))))
 
 (defmethod asdf:perform :after ((o asdf:load-op)
                                 (c (eql (asdf:find-system :sb-concurrency))))
index f81223c..e61538e 100644 (file)
@@ -22,15 +22,6 @@ Before SBCL 1.0.38, this implementation resided in its own contrib
 (@pxref{sb-queue}) which is still provided for backwards-compatibility
 but which has since been deprecated.
 
-@sp 1
-@unnumberedsubsubsec Synopsis:
-
-@code{enqueue} can be used to add objects to a queue, and
-@code{dequeue} retrieves items from a queue in FIFO order.
-
-@sp 1
-@unnumberedsubsubsec Dictionary:
-
 @include struct-sb-concurrency-queue.texinfo
 
 @include fun-sb-concurrency-dequeue.texinfo
@@ -48,23 +39,10 @@ but which has since been deprecated.
 
 @code{sb-concurrency:mailbox} is a lock-free message queue where one
 or multiple ends can send messages to one or multiple receivers. The
-difference to @ref{Section sb-concurrency:queue} is that the receiving
+difference to @ref{Section sb-concurrency:queue, queues} is that the receiving
 end may block until a message arrives.
 @*@*
-The implementation is based on the Queue implementation above
-(@pxref{Structure sb-concurrency:queue}.)
-
-@sp 1
-@unnumberedsubsubsec Synopsis:
-@code{send-message} can be used to send a message to a mailbox, and
-@code{receive-message} retrieves a message from a mailbox, or blocks
-until a new message arrives. @code{receive-message-no-hang} is the
-non-blocking variant.
-@*@*
-Messages can be any object.
-
-@sp 1
-@unnumberedsubsubsec Dictionary:
+Built on top of the @ref{Structure sb-concurrency:queue, queue} implementation.
 
 @include struct-sb-concurrency-mailbox.texinfo
 
@@ -78,3 +56,21 @@ Messages can be any object.
 @include fun-sb-concurrency-receive-message-no-hang.texinfo
 @include fun-sb-concurrency-receive-pending-messages.texinfo
 @include fun-sb-concurrency-send-message.texinfo
+
+@page
+@anchor{Section sb-concurrency:gate}
+@subsection Gates
+@cindex Gate
+
+@code{sb-concurrency:gate} is a synchronization object suitable for when
+multiple threads must wait for a single event before proceeding.
+
+@include struct-sb-concurrency-gate.texinfo
+
+@include fun-sb-concurrency-close-gate.texinfo
+@include fun-sb-concurrency-gate-name.texinfo
+@include fun-sb-concurrency-gate-open-p.texinfo
+@include fun-sb-concurrency-gatep.texinfo
+@include fun-sb-concurrency-make-gate.texinfo
+@include fun-sb-concurrency-open-gate.texinfo
+@include fun-sb-concurrency-wait-on-gate.texinfo
diff --git a/contrib/sb-concurrency/tests/test-gate.lisp b/contrib/sb-concurrency/tests/test-gate.lisp
new file mode 100644 (file)
index 0000000..a25a07e
--- /dev/null
@@ -0,0 +1,124 @@
+;;;; -*-  Lisp -*-
+;;;;
+;;;; This software is part of the SBCL system. See the README file for
+;;;; more information.
+;;;;
+;;;; This software is derived from the CMU CL system, which was
+;;;; written at Carnegie Mellon University and released into the
+;;;; public domain. The software is in the public domain and is
+;;;; provided with absolutely no warranty. See the COPYING and CREDITS
+;;;; files for more information.
+
+(in-package :sb-concurrency-test)
+
+;;; Create threads waiting until a gate is opened, then open that
+;;; gate and assure that all waiters were waked up. Also make sure
+;;; that interrupting a thread waiting on a gate doesn't make it
+;;; cross the gate if it is closed.
+(deftest gate.1
+    (let* ((gate (make-gate))
+           (marks (make-array 100 :initial-element nil))
+           (threads (loop for i from 0 below (length marks)
+                          collect (make-thread (lambda (n)
+                                                 (wait-on-gate gate)
+                                                 (setf (aref marks n) (cons n (aref marks n))))
+                                               :arguments i)))
+           (int-gate (make-gate)))
+      (sleep 1)
+      (interrupt-thread (car threads) (lambda ()
+                                        (unwind-protect
+                                             (when (gate-open-p gate)
+                                               (sb-ext:quit))
+                                          (open-gate int-gate))))
+      (wait-on-gate int-gate)
+      (assert (every #'null marks))
+      (open-gate gate)
+      (mapc #'join-thread threads)
+      (dotimes (i (length marks))
+        (assert (equal (list i) (aref marks i))))
+      t)
+  t)
+
+;;; Assure that CLOSE-GATE can close a gate while other threads are operating
+;;; through that gate. In particular, assure that no operation is performed
+;;; once the gate is closed.
+(deftest gate.2
+    (let* ((gate (make-gate))
+           (cont (make-gate))
+           (marks (make-array 100 :initial-element nil))
+           (threads (loop for i from 0 below (length marks)
+                          collect (make-thread (lambda (n)
+                                                 (wait-on-gate gate)
+                                                 (when (oddp n)
+                                                   (sleep 1.0))
+                                                 (wait-on-gate gate)
+                                                 (setf (aref marks n) (cons n (aref marks n))))
+                                               :arguments i))))
+      (open-gate gate)
+      (sleep 0.5)
+      (close-gate gate)
+      (let (odds evens)
+        (loop while threads
+              do (push (pop threads) evens)
+                 (push (pop threads) odds))
+        (mapc #'join-thread evens)
+        (loop for i from 0 below (length marks)
+              do (if (oddp i)
+                     (assert (not (aref marks i)))
+                     (assert (equal (list i) (aref marks i)))))
+        (open-gate gate)
+        (mapc #'join-thread odds)
+        (loop for i from 0 below (length marks)
+              do (when (oddp i)
+                   (assert (equal (list i) (aref marks i)))))
+        t))
+  t)
+
+;;; Assures that WAIT-ON-GATE can be interrupted by deadlines.
+(deftest gate-deadline.1
+    (let* ((gate (make-gate))
+           (waiter (make-thread (lambda ()
+                                  (block nil
+                                    (handler-bind ((sb-sys:deadline-timeout
+                                                     #'(lambda (c)
+                                                         (return :deadline))))
+                                      (sb-sys:with-deadline (:seconds 0.1)
+                                        (wait-on-gate gate))))))))
+      (join-thread waiter))
+  :deadline)
+
+;;; Assure that WAIT-ON-GATE can be interrupted by deadlines, and resumed from
+;;; the deadline handler.
+(deftest gate-deadline.1
+    (let* ((gate (make-gate))
+           (ready (make-gate))
+           (cancel nil)
+           (waiter (make-thread (lambda ()
+                                  (block nil
+                                    (handler-bind ((sb-sys:deadline-timeout
+                                                     #'(lambda (c)
+                                                         (setf cancel t)
+                                                         (sb-sys:cancel-deadline c))))
+                                      (sb-sys:with-deadline (:seconds 0.1)
+                                        (open-gate ready)
+                                        (wait-on-gate gate))))))))
+      (wait-on-gate ready)
+      (sleep 1.0)
+      (open-gate gate)
+      (values (join-thread waiter) cancel))
+  t t)
+
+(deftest gate-timeout.1
+    (let* ((gate (make-gate))
+           (waiter (make-thread (lambda ()
+                                  (wait-on-gate gate :timeout 0.1)))))
+      (join-thread waiter))
+  nil)
+
+(deftest gate-timeout.2
+    (let* ((gate (make-gate))
+           (waiter (make-thread (lambda ()
+                                  (open-gate gate)
+                                  (wait-on-gate gate :timeout 0.1)))))
+      (join-thread waiter))
+  t)
index bc21ebc..a6e4414 100644 (file)
@@ -1,3 +1,14 @@
+;;;; -*-  Lisp -*-
+;;;;
+;;;; This software is part of the SBCL system. See the README file for
+;;;; more information.
+;;;;
+;;;; This software is derived from the CMU CL system, which was
+;;;; written at Carnegie Mellon University and released into the
+;;;; public domain. The software is in the public domain and is
+;;;; provided with absolutely no warranty. See the COPYING and CREDITS
+;;;; files for more information.
+
 (in-package :sb-concurrency-test)
 
 #+sb-thread
@@ -23,4 +34,4 @@
     (ignore-errors
       (terminate-thread thread))))
 
-) ;; #+sb-thread (progn ...
\ No newline at end of file
+) ;; #+sb-thread (progn ...