1.0.41.6: threads: Insert barriers as appear to be required.
[sbcl.git] / contrib / sb-concurrency / queue.lisp
1 ;;;; Lock-free FIFO queues, from "An Optimistic Approach to Lock-Free FIFO
2 ;;;; Queues" by Edya Ladan-Mozes and Nir Shavit.
3 ;;;;
4 ;;;; Written by Nikodemus Siivola for SBCL.
5 ;;;;
6 ;;;; This software is part of the SBCL system. See the README file for
7 ;;;; more information.
8 ;;;;
9 ;;;; This software is derived from the CMU CL system, which was written at
10 ;;;; Carnegie Mellon University and released into the public domain. The
11 ;;;; software is in the public domain and is provided with absolutely no
12 ;;;; warranty. See the COPYING and CREDITS files for more information.
13
14 (in-package :sb-concurrency)
15
16 (defconstant +dummy+ '.dummy.)
17
18 (declaim (inline make-node))
19 (defstruct node
20   value
21   (prev nil :type (or null node))
22   (next nil :type (or null node)))
23
24 (declaim (inline %make-queue))
25 (defstruct (queue (:constructor %make-queue (head tail name))
26                   (:copier nil)
27                   (:predicate queuep))
28   "Lock-free thread safe queue."
29   (head (error "No HEAD.") :type node)
30   (tail (error "No TAIL.") :type node)
31   (name nil))
32
33 (setf (documentation 'queuep 'function)
34       "Returns true if argument is a QUEUE, NIL otherwise."
35       (documentation 'queue-name 'function)
36       "Name of a QUEUE. Can be assingned to using SETF. Queue names
37 can be arbitrary printable objects, and need not be unique.")
38
39 (defun make-queue (&key name initial-contents)
40   "Returns a new QUEUE with NAME and contents of the INITIAL-CONTENTS
41 sequence enqueued."
42   (let* ((dummy (make-node :value +dummy+))
43          (queue (%make-queue dummy dummy name)))
44     (flet ((enc-1 (x)
45              (enqueue x queue)))
46       (declare (dynamic-extent #'enc-1))
47       (map nil #'enc-1 initial-contents))
48     queue))
49
50 (defun enqueue (value queue)
51   "Adds VALUE to the end of QUEUE. Returns VALUE."
52   (let ((node (make-node :value value)))
53     (loop for tail = (queue-tail queue)
54           do (setf (node-next node) tail)
55              (when (eq tail (sb-ext:compare-and-swap (queue-tail queue) tail node))
56                (setf (node-prev tail) node)
57                (return value)))))
58
59 (defun dequeue (queue)
60   "Retrieves the oldest value in QUEUE and returns it as the primary value,
61 and T as secondary value. If the queue is empty, returns NIL as both primary
62 and secondary value."
63   (tagbody
64    :continue
65      (let* ((head (queue-head queue))
66             (tail (queue-tail queue))
67             (first-node-prev (node-prev head))
68             (val (node-value head)))
69        (barrier (:read))
70        (when (eq head (queue-head queue))
71          (cond ((not (eq val +dummy+))
72                 (if (eq tail head)
73                     (let ((dummy (make-node :value +dummy+ :next tail)))
74                       (when (eq tail (sb-ext:compare-and-swap (queue-tail queue)
75                                                               tail dummy))
76                         (setf (node-prev head) dummy))
77                       (go :continue))
78                     (when (null first-node-prev)
79                       (fixList queue tail head)
80                       (go :continue)))
81                 (when (eq head (sb-ext:compare-and-swap (queue-head queue)
82                                                         head first-node-prev))
83                   ;; This assignment is not present in the paper, but is
84                   ;; equivalent to the free(head.ptr) call there: it unlinks
85                   ;; the HEAD from the queue -- the code in the paper leaves
86                   ;; the dangling pointer in place.
87                   (setf (node-next first-node-prev) nil)
88                   (return-from dequeue (values val t))))
89                ((eq tail head)
90                 (return-from dequeue (values nil nil)))
91                ((null first-node-prev)
92                 (fixList queue tail head)
93                 (go :continue))
94                (t
95                 (sb-ext:compare-and-swap (queue-head queue)
96                                          head first-node-prev)))))
97      (go :continue)))
98
99 (defun fixlist (queue tail head)
100   (let ((current tail))
101     (loop while (and (eq head (queue-head queue)) (not (eq current head)))
102           do (let ((next (node-next current)))
103                (when (not next)
104                  (return-from fixlist nil))
105                (let ((nextNodePrev (node-prev next)))
106                  (when (not (eq nextNodePrev current))
107                    (setf (node-prev next) current))
108                  (setf current next))))))
109
110 (defun list-queue-contents (queue)
111   "Returns the contents of QUEUE as a list without removing them from the
112 QUEUE. Mainly useful for manual examination of queue state."
113   (let (all)
114     (labels ((walk (node)
115                ;; Since NEXT pointers are always right, traversing from tail
116                ;; to head is safe.
117                (let ((value (node-value node))
118                      (next (node-next node)))
119                  (unless (eq +dummy+ value)
120                    (push value all))
121                  (when next
122                    (walk next)))))
123       (walk (queue-tail queue)))
124     all))
125
126 (defun queue-count (queue)
127   "Returns the number of objects in QUEUE. Mainly useful for manual
128 examination of queue state, and in PRINT-OBJECT methods: inefficient as it
129 walks the entire queue."
130   (let ((n 0))
131     (declare (unsigned-byte n))
132     (labels ((walk (node)
133                (let ((value (node-value node))
134                      (next (node-next node)))
135                  (unless (eq +dummy+ value)
136                    (incf n))
137                  (when next
138                    (walk next)))))
139       (walk (queue-tail queue))
140       n)))
141
142 (defun queue-empty-p (queue)
143   "Returns T if QUEUE is empty, NIL otherwise."
144   (let* ((head (queue-head queue))
145          (tail (queue-tail queue))
146          (val (node-value head)))
147     (and (eq head tail) (eq val +dummy+))))