From e034d6a8d034a3f8ca755bf89fae850f6387c505 Mon Sep 17 00:00:00 2001 From: Nikodemus Siivola Date: Mon, 14 Nov 2011 12:17:27 +0200 Subject: [PATCH] sb-concurrency: add Allegro-style gate objects --- contrib/sb-concurrency/gate.lisp | 90 ++++++++++++++++++ contrib/sb-concurrency/mailbox.lisp | 8 +- contrib/sb-concurrency/package.lisp | 12 ++- contrib/sb-concurrency/queue.lisp | 9 +- contrib/sb-concurrency/sb-concurrency.asd | 6 +- contrib/sb-concurrency/sb-concurrency.texinfo | 44 ++++----- contrib/sb-concurrency/tests/test-gate.lisp | 124 +++++++++++++++++++++++++ contrib/sb-concurrency/tests/test-utils.lisp | 13 ++- 8 files changed, 274 insertions(+), 32 deletions(-) create mode 100644 contrib/sb-concurrency/gate.lisp create mode 100644 contrib/sb-concurrency/tests/test-gate.lisp diff --git a/contrib/sb-concurrency/gate.lisp b/contrib/sb-concurrency/gate.lisp new file mode 100644 index 0000000..9628d57 --- /dev/null +++ b/contrib/sb-concurrency/gate.lisp @@ -0,0 +1,90 @@ +;;;; -*- Lisp -*- +;;;; +;;;; This software is part of the SBCL system. See the README file for +;;;; more information. +;;;; +;;;; This software is derived from the CMU CL system, which was +;;;; written at Carnegie Mellon University and released into the +;;;; public domain. The software is in the public domain and is +;;;; provided with absolutely no warranty. See the COPYING and CREDITS +;;;; files for more information. + +(in-package :sb-concurrency) + +;;;; FIXME: On Linux a direct futex-based implementation would be more +;;;; efficient. + +(defstruct (gate (:constructor %make-gate) + (:copier nil) + (:predicate gatep)) + "GATE type. Gates are syncronization constructs suitable for making +multiple threads wait for single event before proceeding. + +Use WAIT-ON-GATE to wait for a gate to open, OPEN-GATE to open one, +and CLOSE-GATE to close an open gate. GATE-OPEN-P can be used to test +the state of a gate without blocking." + (mutex (missing-arg) :type mutex) + (queue (missing-arg) :type waitqueue) + (state :closed :type (member :open :closed)) + (name nil :type (or null simple-string))) + +(setf (documentation 'gatep 'function) + "Returns true if the argument is a GATE." + (documentation 'gate-name 'function) + "Name of a GATE. SETFable.") + +(defmethod print-object ((gate gate) stream) + (print-unreadable-object (gate stream :type t :identity t) + (format stream "~@[~S ~]~((~A)~)" + (gate-name gate) + (gate-state gate)))) + +(defun make-gate (&key name open) + "Makes a new gate. Gate will be initially open if OPEN is true, and closed if OPEN +is NIL (the default.) NAME, if provided, is the name of the gate, used when printing +the gate." + (flet ((generate-name (thing) + (when name + (format nil "gate ~S's ~A" name thing)))) + (%make-gate + :name name + :mutex (make-mutex :name (generate-name "lock")) + :queue (make-waitqueue :name (generate-name "condition variable")) + :state (if open :open :closed)))) + +(defun open-gate (gate) + "Opens GATE. Returns T if the gate was previously closed, and NIL +if the gate was already open." + (declare (gate gate)) + (let (closed) + (with-mutex ((gate-mutex gate)) + (setf closed (eq :closed (gate-state gate)) + (gate-state gate) :open) + (condition-broadcast (gate-queue gate))) + closed)) + +(defun close-gate (gate) + "Closes GATE. Returns T if the gate was previously open, and NIL +if the gate was already closed." + (declare (gate gate)) + (let (open) + (with-mutex ((gate-mutex gate)) + (setf open (eq :open (gate-state gate)) + (gate-state gate) :closed)) + open)) + +(defun wait-on-gate (gate &key timeout) + "Waits for GATE to open, or TIMEOUT seconds to pass. Returns T +if the gate was opened in time, and NIL otherwise." + (declare (gate gate)) + (with-mutex ((gate-mutex gate)) + (loop until (eq :open (gate-state gate)) + do (or (condition-wait (gate-queue gate) (gate-mutex gate) + :timeout timeout) + (return-from wait-on-gate nil)))) + t) + +(defun gate-open-p (gate) + "Returns true if GATE is open." + (declare (gate gate)) + (eq :open (gate-state gate))) diff --git a/contrib/sb-concurrency/mailbox.lisp b/contrib/sb-concurrency/mailbox.lisp index c4acea5..dfa6c8f 100644 --- a/contrib/sb-concurrency/mailbox.lisp +++ b/contrib/sb-concurrency/mailbox.lisp @@ -18,7 +18,13 @@ (defstruct (mailbox (:constructor %make-mailbox (queue semaphore name)) (:copier nil) (:predicate mailboxp)) - "Mailbox aka message queue." + "Mailbox aka message queue. + +SEND-MESSAGE adds a message to the mailbox, RECEIVE-MESSAGE waits till +a message becomes available, whereas RECEIVE-MESSAGE-NO-HANG is a non-blocking +variant, and RECEIVE-PENDING-MESSAGES empties the entire mailbox in one go. + +Messages can be arbitrary objects" (queue (missing-arg) :type queue) (semaphore (missing-arg) :type semaphore) (name nil)) diff --git a/contrib/sb-concurrency/package.lisp b/contrib/sb-concurrency/package.lisp index a9124b6..7e6c6bd 100644 --- a/contrib/sb-concurrency/package.lisp +++ b/contrib/sb-concurrency/package.lisp @@ -1,5 +1,5 @@ (defpackage :sb-concurrency - (:use :cl :sb-thread) + (:use :cl :sb-thread :sb-int) (:export ;; MAILBOX "LIST-MAILBOX-MESSAGES" @@ -24,4 +24,14 @@ "QUEUE-EMPTY-P" "QUEUE-NAME" "QUEUEP" + + ;; GATE + "CLOSE-GATE" + "GATE" + "GATE-NAME" + "GATE-OPEN-P" + "GATEP" + "MAKE-GATE" + "OPEN-GATE" + "WAIT-ON-GATE" )) \ No newline at end of file diff --git a/contrib/sb-concurrency/queue.lisp b/contrib/sb-concurrency/queue.lisp index 5fa7007..bf0bc98 100644 --- a/contrib/sb-concurrency/queue.lisp +++ b/contrib/sb-concurrency/queue.lisp @@ -25,7 +25,9 @@ (defstruct (queue (:constructor %make-queue (head tail name)) (:copier nil) (:predicate queuep)) - "Lock-free thread safe queue." + "Lock-free thread safe FIFO queue. + +Use ENQUEUE to add objects to the queue, and DEQUEUE to remove them." (head (error "No HEAD.") :type node) (tail (error "No TAIL.") :type node) (name nil)) @@ -117,7 +119,8 @@ and secondary value." (defun list-queue-contents (queue) "Returns the contents of QUEUE as a list without removing them from the -QUEUE. Mainly useful for manual examination of queue state." +QUEUE. Mainly useful for manual examination of queue state, as the list +may be out of date by the time it is returned." (let (all) (labels ((walk (node) ;; Since NEXT pointers are always right, traversing from tail @@ -134,7 +137,7 @@ QUEUE. Mainly useful for manual examination of queue state." (defun queue-count (queue) "Returns the number of objects in QUEUE. Mainly useful for manual examination of queue state, and in PRINT-OBJECT methods: inefficient as it -walks the entire queue." +must walk the entire queue." (let ((n 0)) (declare (unsigned-byte n)) (labels ((walk (node) diff --git a/contrib/sb-concurrency/sb-concurrency.asd b/contrib/sb-concurrency/sb-concurrency.asd index 4246d5b..b692132 100644 --- a/contrib/sb-concurrency/sb-concurrency.asd +++ b/contrib/sb-concurrency/sb-concurrency.asd @@ -14,7 +14,8 @@ (asdf:defsystem :sb-concurrency :components ((:file "package") (:file "queue" :depends-on ("package")) - (:file "mailbox" :depends-on ("package" "queue")))) + (:file "mailbox" :depends-on ("package" "queue")) + (:file "gate" :depends-on ("package")))) (asdf:defsystem :sb-concurrency-tests :depends-on (:sb-concurrency :sb-rt) @@ -24,7 +25,8 @@ ((:file "package") (:file "test-utils" :depends-on ("package")) (:file "test-queue" :depends-on ("package" "test-utils")) - (:file "test-mailbox" :depends-on ("package" "test-utils")))))) + (:file "test-mailbox" :depends-on ("package" "test-utils")) + (:file "test-gate" :depends-on ("package" "test-utils")))))) (defmethod asdf:perform :after ((o asdf:load-op) (c (eql (asdf:find-system :sb-concurrency)))) diff --git a/contrib/sb-concurrency/sb-concurrency.texinfo b/contrib/sb-concurrency/sb-concurrency.texinfo index f81223c..e61538e 100644 --- a/contrib/sb-concurrency/sb-concurrency.texinfo +++ b/contrib/sb-concurrency/sb-concurrency.texinfo @@ -22,15 +22,6 @@ Before SBCL 1.0.38, this implementation resided in its own contrib (@pxref{sb-queue}) which is still provided for backwards-compatibility but which has since been deprecated. -@sp 1 -@unnumberedsubsubsec Synopsis: - -@code{enqueue} can be used to add objects to a queue, and -@code{dequeue} retrieves items from a queue in FIFO order. - -@sp 1 -@unnumberedsubsubsec Dictionary: - @include struct-sb-concurrency-queue.texinfo @include fun-sb-concurrency-dequeue.texinfo @@ -48,23 +39,10 @@ but which has since been deprecated. @code{sb-concurrency:mailbox} is a lock-free message queue where one or multiple ends can send messages to one or multiple receivers. The -difference to @ref{Section sb-concurrency:queue} is that the receiving +difference to @ref{Section sb-concurrency:queue, queues} is that the receiving end may block until a message arrives. @*@* -The implementation is based on the Queue implementation above -(@pxref{Structure sb-concurrency:queue}.) - -@sp 1 -@unnumberedsubsubsec Synopsis: -@code{send-message} can be used to send a message to a mailbox, and -@code{receive-message} retrieves a message from a mailbox, or blocks -until a new message arrives. @code{receive-message-no-hang} is the -non-blocking variant. -@*@* -Messages can be any object. - -@sp 1 -@unnumberedsubsubsec Dictionary: +Built on top of the @ref{Structure sb-concurrency:queue, queue} implementation. @include struct-sb-concurrency-mailbox.texinfo @@ -78,3 +56,21 @@ Messages can be any object. @include fun-sb-concurrency-receive-message-no-hang.texinfo @include fun-sb-concurrency-receive-pending-messages.texinfo @include fun-sb-concurrency-send-message.texinfo + +@page +@anchor{Section sb-concurrency:gate} +@subsection Gates +@cindex Gate + +@code{sb-concurrency:gate} is a synchronization object suitable for when +multiple threads must wait for a single event before proceeding. + +@include struct-sb-concurrency-gate.texinfo + +@include fun-sb-concurrency-close-gate.texinfo +@include fun-sb-concurrency-gate-name.texinfo +@include fun-sb-concurrency-gate-open-p.texinfo +@include fun-sb-concurrency-gatep.texinfo +@include fun-sb-concurrency-make-gate.texinfo +@include fun-sb-concurrency-open-gate.texinfo +@include fun-sb-concurrency-wait-on-gate.texinfo diff --git a/contrib/sb-concurrency/tests/test-gate.lisp b/contrib/sb-concurrency/tests/test-gate.lisp new file mode 100644 index 0000000..a25a07e --- /dev/null +++ b/contrib/sb-concurrency/tests/test-gate.lisp @@ -0,0 +1,124 @@ +;;;; -*- Lisp -*- +;;;; +;;;; This software is part of the SBCL system. See the README file for +;;;; more information. +;;;; +;;;; This software is derived from the CMU CL system, which was +;;;; written at Carnegie Mellon University and released into the +;;;; public domain. The software is in the public domain and is +;;;; provided with absolutely no warranty. See the COPYING and CREDITS +;;;; files for more information. + +(in-package :sb-concurrency-test) + +;;; Create threads waiting until a gate is opened, then open that +;;; gate and assure that all waiters were waked up. Also make sure +;;; that interrupting a thread waiting on a gate doesn't make it +;;; cross the gate if it is closed. +(deftest gate.1 + (let* ((gate (make-gate)) + (marks (make-array 100 :initial-element nil)) + (threads (loop for i from 0 below (length marks) + collect (make-thread (lambda (n) + (wait-on-gate gate) + (setf (aref marks n) (cons n (aref marks n)))) + :arguments i))) + (int-gate (make-gate))) + (sleep 1) + (interrupt-thread (car threads) (lambda () + (unwind-protect + (when (gate-open-p gate) + (sb-ext:quit)) + (open-gate int-gate)))) + (wait-on-gate int-gate) + (assert (every #'null marks)) + (open-gate gate) + (mapc #'join-thread threads) + (dotimes (i (length marks)) + (assert (equal (list i) (aref marks i)))) + t) + t) + +;;; Assure that CLOSE-GATE can close a gate while other threads are operating +;;; through that gate. In particular, assure that no operation is performed +;;; once the gate is closed. +(deftest gate.2 + (let* ((gate (make-gate)) + (cont (make-gate)) + (marks (make-array 100 :initial-element nil)) + (threads (loop for i from 0 below (length marks) + collect (make-thread (lambda (n) + (wait-on-gate gate) + (when (oddp n) + (sleep 1.0)) + (wait-on-gate gate) + (setf (aref marks n) (cons n (aref marks n)))) + :arguments i)))) + (open-gate gate) + (sleep 0.5) + (close-gate gate) + (let (odds evens) + (loop while threads + do (push (pop threads) evens) + (push (pop threads) odds)) + (mapc #'join-thread evens) + (loop for i from 0 below (length marks) + do (if (oddp i) + (assert (not (aref marks i))) + (assert (equal (list i) (aref marks i))))) + (open-gate gate) + (mapc #'join-thread odds) + (loop for i from 0 below (length marks) + do (when (oddp i) + (assert (equal (list i) (aref marks i))))) + t)) + t) + +;;; Assures that WAIT-ON-GATE can be interrupted by deadlines. +(deftest gate-deadline.1 + (let* ((gate (make-gate)) + (waiter (make-thread (lambda () + (block nil + (handler-bind ((sb-sys:deadline-timeout + #'(lambda (c) + (return :deadline)))) + (sb-sys:with-deadline (:seconds 0.1) + (wait-on-gate gate)))))))) + (join-thread waiter)) + :deadline) + +;;; Assure that WAIT-ON-GATE can be interrupted by deadlines, and resumed from +;;; the deadline handler. +(deftest gate-deadline.1 + (let* ((gate (make-gate)) + (ready (make-gate)) + (cancel nil) + (waiter (make-thread (lambda () + (block nil + (handler-bind ((sb-sys:deadline-timeout + #'(lambda (c) + (setf cancel t) + (sb-sys:cancel-deadline c)))) + (sb-sys:with-deadline (:seconds 0.1) + (open-gate ready) + (wait-on-gate gate)))))))) + (wait-on-gate ready) + (sleep 1.0) + (open-gate gate) + (values (join-thread waiter) cancel)) + t t) + +(deftest gate-timeout.1 + (let* ((gate (make-gate)) + (waiter (make-thread (lambda () + (wait-on-gate gate :timeout 0.1))))) + (join-thread waiter)) + nil) + +(deftest gate-timeout.2 + (let* ((gate (make-gate)) + (waiter (make-thread (lambda () + (open-gate gate) + (wait-on-gate gate :timeout 0.1))))) + (join-thread waiter)) + t) diff --git a/contrib/sb-concurrency/tests/test-utils.lisp b/contrib/sb-concurrency/tests/test-utils.lisp index bc21ebc..a6e4414 100644 --- a/contrib/sb-concurrency/tests/test-utils.lisp +++ b/contrib/sb-concurrency/tests/test-utils.lisp @@ -1,3 +1,14 @@ +;;;; -*- Lisp -*- +;;;; +;;;; This software is part of the SBCL system. See the README file for +;;;; more information. +;;;; +;;;; This software is derived from the CMU CL system, which was +;;;; written at Carnegie Mellon University and released into the +;;;; public domain. The software is in the public domain and is +;;;; provided with absolutely no warranty. See the COPYING and CREDITS +;;;; files for more information. + (in-package :sb-concurrency-test) #+sb-thread @@ -23,4 +34,4 @@ (ignore-errors (terminate-thread thread)))) -) ;; #+sb-thread (progn ... \ No newline at end of file +) ;; #+sb-thread (progn ... -- 1.7.10.4