-;;;; Lock-free FIFO queues, from "An Optimistic Approach to Lock-Free FIFO
-;;;; Queues" by Edya Ladan-Mozes and Nir Shavit.
-;;;;
-;;;; Written by Nikodemus Siivola for SBCL.
+;;;; Written by James M. Lawrence for SBCL.
+;;;; API and docstrings by Nikodemus Siivola.
;;;;
;;;; This software is part of the SBCL system. See the README file for
;;;; more information.
;;;; software is in the public domain and is provided with absolutely no
;;;; warranty. See the COPYING and CREDITS files for more information.
+;;; Singly-linked queue with compare-and-swap operations.
+;;;
+;;; The following invariants hold except during updates:
+;;;
+;;; (car (queue-head queue)) == +dummy+
+;;;
+;;; (cdr (queue-tail queue)) == nil
+;;;
+;;; If the queue is empty, (queue-head queue) == (queue-tail queue).
+;;;
+;;; If the queue is non-empty, (cadr (queue-head queue)) is the next
+;;; value to be dequeued and (car (queue-tail queue)) is the most
+;;; recently enqueued value.
+;;;
+;;; The CDR of a discarded node is set to +DEAD-END+. This flag must
+;;; be checked at each traversal.
+
(in-package :sb-concurrency)
(defconstant +dummy+ '.dummy.)
-
-(declaim (inline make-node))
-(defstruct node
- value
- (prev nil :type (or null node))
- (next nil :type (or null node)))
+(defconstant +dead-end+ '.dead-end.)
(declaim (inline %make-queue))
(defstruct (queue (:constructor %make-queue (head tail name))
"Lock-free thread safe FIFO queue.
Use ENQUEUE to add objects to the queue, and DEQUEUE to remove them."
- (head (error "No HEAD.") :type node)
- (tail (error "No TAIL.") :type node)
+ (head (error "No HEAD.") :type cons)
+ (tail (error "No TAIL.") :type cons)
(name nil))
(setf (documentation 'queuep 'function)
(defun make-queue (&key name initial-contents)
"Returns a new QUEUE with NAME and contents of the INITIAL-CONTENTS
sequence enqueued."
- (let* ((dummy (make-node :value +dummy+))
+ (let* ((dummy (cons +dummy+ nil))
(queue (%make-queue dummy dummy name)))
(flet ((enc-1 (x)
(enqueue x queue)))
(defun enqueue (value queue)
"Adds VALUE to the end of QUEUE. Returns VALUE."
- (let ((node (make-node :value value)))
- (loop for tail = (queue-tail queue)
- do (setf (node-next node) tail)
- (when (eq tail (sb-ext:compare-and-swap (queue-tail queue) tail node))
- (setf (node-prev tail) node)
- (return value)))))
+ ;; Attempt CAS, repeat upon failure. Upon success update QUEUE-TAIL.
+ (declare (optimize speed))
+ (let ((new (cons value nil)))
+ (loop (when (eq nil (sb-ext:compare-and-swap (cdr (queue-tail queue))
+ nil new))
+ (setf (queue-tail queue) new)
+ (return value)))))
(defun dequeue (queue)
"Retrieves the oldest value in QUEUE and returns it as the primary value,
and T as secondary value. If the queue is empty, returns NIL as both primary
and secondary value."
- (tagbody
- :continue
- (let* ((head (queue-head queue))
- (tail (queue-tail queue))
- (first-node-prev (node-prev head))
- (val (node-value head)))
- (barrier (:read))
- (when (eq head (queue-head queue))
- (cond ((not (eq val +dummy+))
- (if (eq tail head)
- (let ((dummy (make-node :value +dummy+ :next tail)))
- (when (eq tail (sb-ext:compare-and-swap (queue-tail queue)
- tail dummy))
- (setf (node-prev head) dummy))
- (go :continue))
- (when (null first-node-prev)
- (fixList queue tail head)
- (go :continue)))
- (when (eq head (sb-ext:compare-and-swap (queue-head queue)
- head first-node-prev))
- ;; These assignment is not present in the paper, but are
- ;; equivalent to the free(head.ptr) call there.
- ;;
- ;; First we unlink the HEAD from the queue -- the code in
- ;; the paper leaves the dangling pointer in place.
- ;;
- ;; Then we NIL out the slots in HEAD to help the GC,
- ;; otherwise conservativism might lead to massive chains of
- ;; nodes being retained.
- (setf (node-next first-node-prev) nil
- (node-prev head) nil
- (node-next head) nil
- (node-value head) nil)
- (return-from dequeue (values val t))))
- ((eq tail head)
- (return-from dequeue (values nil nil)))
- ((null first-node-prev)
- (fixList queue tail head)
- (go :continue))
- (t
- (sb-ext:compare-and-swap (queue-head queue)
- head first-node-prev)))))
- (go :continue)))
+ ;; Attempt to CAS QUEUE-HEAD with the next node, repeat upon
+ ;; failure. Upon success, clear the discarded node and set the CAR
+ ;; of QUEUE-HEAD to +DUMMY+.
+ (declare (optimize speed))
+ (loop (let* ((head (queue-head queue))
+ (next (cdr head)))
+ ;; NEXT could be +DEAD-END+, whereupon we try again.
+ (typecase next
+ (null (return (values nil nil)))
+ (cons (when (eq head (sb-ext:compare-and-swap (queue-head queue)
+ head next))
+ (let ((value (car next)))
+ ;; Clear the CDR, otherwise the conservative GC could
+ ;; hoard long lists. (car head) is always +dummy+.
+ (setf (cdr head) +dead-end+
+ (car next) +dummy+)
+ (return (values value t)))))))))
-(defun fixlist (queue tail head)
- (let ((current tail))
- (loop while (and (eq head (queue-head queue)) (not (eq current head)))
- do (let ((next (node-next current)))
- (when (not next)
- (return-from fixlist nil))
- (let ((nextNodePrev (node-prev next)))
- (when (not (eq nextNodePrev current))
- (setf (node-prev next) current))
- (setf current next))))))
+(defun try-walk-queue (fun queue)
+ (let ((node (queue-head queue)))
+ (loop
+ (let ((value (car node)))
+ (unless (eq value +dummy+)
+ (funcall fun value)))
+ (setf node (cdr node))
+ (cond ((eq node +dead-end+)
+ (return nil))
+ ((null node)
+ (return t))))))
(defun list-queue-contents (queue)
"Returns the contents of QUEUE as a list without removing them from the
QUEUE. Mainly useful for manual examination of queue state, as the list
may be out of date by the time it is returned."
- (let (all)
- (labels ((walk (node)
- ;; Since NEXT pointers are always right, traversing from tail
- ;; to head is safe.
- (let ((value (node-value node))
- (next (node-next node)))
- (unless (eq +dummy+ value)
- (push value all))
- (when next
- (walk next)))))
- (walk (queue-tail queue)))
- all))
+ (tagbody
+ :retry
+ (collect ((result))
+ (unless (try-walk-queue (lambda (elem) (result elem)) queue)
+ (go :retry))
+ (return-from list-queue-contents (result)))))
(defun queue-count (queue)
"Returns the number of objects in QUEUE. Mainly useful for manual
examination of queue state, and in PRINT-OBJECT methods: inefficient as it
must walk the entire queue."
- (let ((n 0))
- (declare (unsigned-byte n))
- (labels ((walk (node)
- (let ((value (node-value node))
- (next (node-next node)))
- (unless (eq +dummy+ value)
- (incf n))
- (when next
- (walk next)))))
- (walk (queue-tail queue))
- n)))
+ (tagbody
+ :retry
+ (let ((count 0))
+ (unless (try-walk-queue (lambda (elem)
+ (declare (ignore elem))
+ (incf count))
+ queue)
+ (go :retry))
+ (return-from queue-count count))))
(defun queue-empty-p (queue)
"Returns T if QUEUE is empty, NIL otherwise."
- (let* ((head (queue-head queue))
- (tail (queue-tail queue))
- (val (node-value head)))
- (and (eq head tail) (eq val +dummy+))))
+ (null (cdr (queue-head queue))))