--- /dev/null
+;;;; -*- Lisp -*-
+;;;;
+;;;; This software is part of the SBCL system. See the README file for
+;;;; more information.
+;;;;
+;;;; This software is derived from the CMU CL system, which was
+;;;; written at Carnegie Mellon University and released into the
+;;;; public domain. The software is in the public domain and is
+;;;; provided with absolutely no warranty. See the COPYING and CREDITS
+;;;; files for more information.
+
+(in-package :sb-concurrency)
+
+;;;; FIXME: On Linux a direct futex-based implementation would be more
+;;;; efficient.
+
+(defstruct (gate (:constructor %make-gate)
+ (:copier nil)
+ (:predicate gatep))
+ "GATE type. Gates are syncronization constructs suitable for making
+multiple threads wait for single event before proceeding.
+
+Use WAIT-ON-GATE to wait for a gate to open, OPEN-GATE to open one,
+and CLOSE-GATE to close an open gate. GATE-OPEN-P can be used to test
+the state of a gate without blocking."
+ (mutex (missing-arg) :type mutex)
+ (queue (missing-arg) :type waitqueue)
+ (state :closed :type (member :open :closed))
+ (name nil :type (or null simple-string)))
+
+(setf (documentation 'gatep 'function)
+ "Returns true if the argument is a GATE."
+ (documentation 'gate-name 'function)
+ "Name of a GATE. SETFable.")
+
+(defmethod print-object ((gate gate) stream)
+ (print-unreadable-object (gate stream :type t :identity t)
+ (format stream "~@[~S ~]~((~A)~)"
+ (gate-name gate)
+ (gate-state gate))))
+
+(defun make-gate (&key name open)
+ "Makes a new gate. Gate will be initially open if OPEN is true, and closed if OPEN
+is NIL (the default.) NAME, if provided, is the name of the gate, used when printing
+the gate."
+ (flet ((generate-name (thing)
+ (when name
+ (format nil "gate ~S's ~A" name thing))))
+ (%make-gate
+ :name name
+ :mutex (make-mutex :name (generate-name "lock"))
+ :queue (make-waitqueue :name (generate-name "condition variable"))
+ :state (if open :open :closed))))
+
+(defun open-gate (gate)
+ "Opens GATE. Returns T if the gate was previously closed, and NIL
+if the gate was already open."
+ (declare (gate gate))
+ (let (closed)
+ (with-mutex ((gate-mutex gate))
+ (setf closed (eq :closed (gate-state gate))
+ (gate-state gate) :open)
+ (condition-broadcast (gate-queue gate)))
+ closed))
+
+(defun close-gate (gate)
+ "Closes GATE. Returns T if the gate was previously open, and NIL
+if the gate was already closed."
+ (declare (gate gate))
+ (let (open)
+ (with-mutex ((gate-mutex gate))
+ (setf open (eq :open (gate-state gate))
+ (gate-state gate) :closed))
+ open))
+
+(defun wait-on-gate (gate &key timeout)
+ "Waits for GATE to open, or TIMEOUT seconds to pass. Returns T
+if the gate was opened in time, and NIL otherwise."
+ (declare (gate gate))
+ (with-mutex ((gate-mutex gate))
+ (loop until (eq :open (gate-state gate))
+ do (or (condition-wait (gate-queue gate) (gate-mutex gate)
+ :timeout timeout)
+ (return-from wait-on-gate nil))))
+ t)
+
+(defun gate-open-p (gate)
+ "Returns true if GATE is open."
+ (declare (gate gate))
+ (eq :open (gate-state gate)))
(defstruct (mailbox (:constructor %make-mailbox (queue semaphore name))
(:copier nil)
(:predicate mailboxp))
- "Mailbox aka message queue."
+ "Mailbox aka message queue.
+
+SEND-MESSAGE adds a message to the mailbox, RECEIVE-MESSAGE waits till
+a message becomes available, whereas RECEIVE-MESSAGE-NO-HANG is a non-blocking
+variant, and RECEIVE-PENDING-MESSAGES empties the entire mailbox in one go.
+
+Messages can be arbitrary objects"
(queue (missing-arg) :type queue)
(semaphore (missing-arg) :type semaphore)
(name nil))
(defpackage :sb-concurrency
- (:use :cl :sb-thread)
+ (:use :cl :sb-thread :sb-int)
(:export
;; MAILBOX
"LIST-MAILBOX-MESSAGES"
"QUEUE-EMPTY-P"
"QUEUE-NAME"
"QUEUEP"
+
+ ;; GATE
+ "CLOSE-GATE"
+ "GATE"
+ "GATE-NAME"
+ "GATE-OPEN-P"
+ "GATEP"
+ "MAKE-GATE"
+ "OPEN-GATE"
+ "WAIT-ON-GATE"
))
\ No newline at end of file
(defstruct (queue (:constructor %make-queue (head tail name))
(:copier nil)
(:predicate queuep))
- "Lock-free thread safe queue."
+ "Lock-free thread safe FIFO queue.
+
+Use ENQUEUE to add objects to the queue, and DEQUEUE to remove them."
(head (error "No HEAD.") :type node)
(tail (error "No TAIL.") :type node)
(name nil))
(defun list-queue-contents (queue)
"Returns the contents of QUEUE as a list without removing them from the
-QUEUE. Mainly useful for manual examination of queue state."
+QUEUE. Mainly useful for manual examination of queue state, as the list
+may be out of date by the time it is returned."
(let (all)
(labels ((walk (node)
;; Since NEXT pointers are always right, traversing from tail
(defun queue-count (queue)
"Returns the number of objects in QUEUE. Mainly useful for manual
examination of queue state, and in PRINT-OBJECT methods: inefficient as it
-walks the entire queue."
+must walk the entire queue."
(let ((n 0))
(declare (unsigned-byte n))
(labels ((walk (node)
(asdf:defsystem :sb-concurrency
:components ((:file "package")
(:file "queue" :depends-on ("package"))
- (:file "mailbox" :depends-on ("package" "queue"))))
+ (:file "mailbox" :depends-on ("package" "queue"))
+ (:file "gate" :depends-on ("package"))))
(asdf:defsystem :sb-concurrency-tests
:depends-on (:sb-concurrency :sb-rt)
((:file "package")
(:file "test-utils" :depends-on ("package"))
(:file "test-queue" :depends-on ("package" "test-utils"))
- (:file "test-mailbox" :depends-on ("package" "test-utils"))))))
+ (:file "test-mailbox" :depends-on ("package" "test-utils"))
+ (:file "test-gate" :depends-on ("package" "test-utils"))))))
(defmethod asdf:perform :after ((o asdf:load-op)
(c (eql (asdf:find-system :sb-concurrency))))
(@pxref{sb-queue}) which is still provided for backwards-compatibility
but which has since been deprecated.
-@sp 1
-@unnumberedsubsubsec Synopsis:
-
-@code{enqueue} can be used to add objects to a queue, and
-@code{dequeue} retrieves items from a queue in FIFO order.
-
-@sp 1
-@unnumberedsubsubsec Dictionary:
-
@include struct-sb-concurrency-queue.texinfo
@include fun-sb-concurrency-dequeue.texinfo
@code{sb-concurrency:mailbox} is a lock-free message queue where one
or multiple ends can send messages to one or multiple receivers. The
-difference to @ref{Section sb-concurrency:queue} is that the receiving
+difference to @ref{Section sb-concurrency:queue, queues} is that the receiving
end may block until a message arrives.
@*@*
-The implementation is based on the Queue implementation above
-(@pxref{Structure sb-concurrency:queue}.)
-
-@sp 1
-@unnumberedsubsubsec Synopsis:
-@code{send-message} can be used to send a message to a mailbox, and
-@code{receive-message} retrieves a message from a mailbox, or blocks
-until a new message arrives. @code{receive-message-no-hang} is the
-non-blocking variant.
-@*@*
-Messages can be any object.
-
-@sp 1
-@unnumberedsubsubsec Dictionary:
+Built on top of the @ref{Structure sb-concurrency:queue, queue} implementation.
@include struct-sb-concurrency-mailbox.texinfo
@include fun-sb-concurrency-receive-message-no-hang.texinfo
@include fun-sb-concurrency-receive-pending-messages.texinfo
@include fun-sb-concurrency-send-message.texinfo
+
+@page
+@anchor{Section sb-concurrency:gate}
+@subsection Gates
+@cindex Gate
+
+@code{sb-concurrency:gate} is a synchronization object suitable for when
+multiple threads must wait for a single event before proceeding.
+
+@include struct-sb-concurrency-gate.texinfo
+
+@include fun-sb-concurrency-close-gate.texinfo
+@include fun-sb-concurrency-gate-name.texinfo
+@include fun-sb-concurrency-gate-open-p.texinfo
+@include fun-sb-concurrency-gatep.texinfo
+@include fun-sb-concurrency-make-gate.texinfo
+@include fun-sb-concurrency-open-gate.texinfo
+@include fun-sb-concurrency-wait-on-gate.texinfo
--- /dev/null
+;;;; -*- Lisp -*-
+;;;;
+;;;; This software is part of the SBCL system. See the README file for
+;;;; more information.
+;;;;
+;;;; This software is derived from the CMU CL system, which was
+;;;; written at Carnegie Mellon University and released into the
+;;;; public domain. The software is in the public domain and is
+;;;; provided with absolutely no warranty. See the COPYING and CREDITS
+;;;; files for more information.
+
+(in-package :sb-concurrency-test)
+
+;;; Create threads waiting until a gate is opened, then open that
+;;; gate and assure that all waiters were waked up. Also make sure
+;;; that interrupting a thread waiting on a gate doesn't make it
+;;; cross the gate if it is closed.
+(deftest gate.1
+ (let* ((gate (make-gate))
+ (marks (make-array 100 :initial-element nil))
+ (threads (loop for i from 0 below (length marks)
+ collect (make-thread (lambda (n)
+ (wait-on-gate gate)
+ (setf (aref marks n) (cons n (aref marks n))))
+ :arguments i)))
+ (int-gate (make-gate)))
+ (sleep 1)
+ (interrupt-thread (car threads) (lambda ()
+ (unwind-protect
+ (when (gate-open-p gate)
+ (sb-ext:quit))
+ (open-gate int-gate))))
+ (wait-on-gate int-gate)
+ (assert (every #'null marks))
+ (open-gate gate)
+ (mapc #'join-thread threads)
+ (dotimes (i (length marks))
+ (assert (equal (list i) (aref marks i))))
+ t)
+ t)
+
+;;; Assure that CLOSE-GATE can close a gate while other threads are operating
+;;; through that gate. In particular, assure that no operation is performed
+;;; once the gate is closed.
+(deftest gate.2
+ (let* ((gate (make-gate))
+ (cont (make-gate))
+ (marks (make-array 100 :initial-element nil))
+ (threads (loop for i from 0 below (length marks)
+ collect (make-thread (lambda (n)
+ (wait-on-gate gate)
+ (when (oddp n)
+ (sleep 1.0))
+ (wait-on-gate gate)
+ (setf (aref marks n) (cons n (aref marks n))))
+ :arguments i))))
+ (open-gate gate)
+ (sleep 0.5)
+ (close-gate gate)
+ (let (odds evens)
+ (loop while threads
+ do (push (pop threads) evens)
+ (push (pop threads) odds))
+ (mapc #'join-thread evens)
+ (loop for i from 0 below (length marks)
+ do (if (oddp i)
+ (assert (not (aref marks i)))
+ (assert (equal (list i) (aref marks i)))))
+ (open-gate gate)
+ (mapc #'join-thread odds)
+ (loop for i from 0 below (length marks)
+ do (when (oddp i)
+ (assert (equal (list i) (aref marks i)))))
+ t))
+ t)
+
+;;; Assures that WAIT-ON-GATE can be interrupted by deadlines.
+(deftest gate-deadline.1
+ (let* ((gate (make-gate))
+ (waiter (make-thread (lambda ()
+ (block nil
+ (handler-bind ((sb-sys:deadline-timeout
+ #'(lambda (c)
+ (return :deadline))))
+ (sb-sys:with-deadline (:seconds 0.1)
+ (wait-on-gate gate))))))))
+ (join-thread waiter))
+ :deadline)
+
+;;; Assure that WAIT-ON-GATE can be interrupted by deadlines, and resumed from
+;;; the deadline handler.
+(deftest gate-deadline.1
+ (let* ((gate (make-gate))
+ (ready (make-gate))
+ (cancel nil)
+ (waiter (make-thread (lambda ()
+ (block nil
+ (handler-bind ((sb-sys:deadline-timeout
+ #'(lambda (c)
+ (setf cancel t)
+ (sb-sys:cancel-deadline c))))
+ (sb-sys:with-deadline (:seconds 0.1)
+ (open-gate ready)
+ (wait-on-gate gate))))))))
+ (wait-on-gate ready)
+ (sleep 1.0)
+ (open-gate gate)
+ (values (join-thread waiter) cancel))
+ t t)
+
+(deftest gate-timeout.1
+ (let* ((gate (make-gate))
+ (waiter (make-thread (lambda ()
+ (wait-on-gate gate :timeout 0.1)))))
+ (join-thread waiter))
+ nil)
+
+(deftest gate-timeout.2
+ (let* ((gate (make-gate))
+ (waiter (make-thread (lambda ()
+ (open-gate gate)
+ (wait-on-gate gate :timeout 0.1)))))
+ (join-thread waiter))
+ t)
+;;;; -*- Lisp -*-
+;;;;
+;;;; This software is part of the SBCL system. See the README file for
+;;;; more information.
+;;;;
+;;;; This software is derived from the CMU CL system, which was
+;;;; written at Carnegie Mellon University and released into the
+;;;; public domain. The software is in the public domain and is
+;;;; provided with absolutely no warranty. See the COPYING and CREDITS
+;;;; files for more information.
+
(in-package :sb-concurrency-test)
#+sb-thread
(ignore-errors
(terminate-thread thread))))
-) ;; #+sb-thread (progn ...
\ No newline at end of file
+) ;; #+sb-thread (progn ...