bf0bc9887714427c2097eee48bab0a9e3a103831
[sbcl.git] / contrib / sb-concurrency / queue.lisp
1 ;;;; Lock-free FIFO queues, from "An Optimistic Approach to Lock-Free FIFO
2 ;;;; Queues" by Edya Ladan-Mozes and Nir Shavit.
3 ;;;;
4 ;;;; Written by Nikodemus Siivola for SBCL.
5 ;;;;
6 ;;;; This software is part of the SBCL system. See the README file for
7 ;;;; more information.
8 ;;;;
9 ;;;; This software is derived from the CMU CL system, which was written at
10 ;;;; Carnegie Mellon University and released into the public domain. The
11 ;;;; software is in the public domain and is provided with absolutely no
12 ;;;; warranty. See the COPYING and CREDITS files for more information.
13
14 (in-package :sb-concurrency)
15
16 (defconstant +dummy+ '.dummy.)
17
18 (declaim (inline make-node))
19 (defstruct node
20   value
21   (prev nil :type (or null node))
22   (next nil :type (or null node)))
23
24 (declaim (inline %make-queue))
25 (defstruct (queue (:constructor %make-queue (head tail name))
26                   (:copier nil)
27                   (:predicate queuep))
28   "Lock-free thread safe FIFO queue.
29
30 Use ENQUEUE to add objects to the queue, and DEQUEUE to remove them."
31   (head (error "No HEAD.") :type node)
32   (tail (error "No TAIL.") :type node)
33   (name nil))
34
35 (setf (documentation 'queuep 'function)
36       "Returns true if argument is a QUEUE, NIL otherwise."
37       (documentation 'queue-name 'function)
38       "Name of a QUEUE. Can be assingned to using SETF. Queue names
39 can be arbitrary printable objects, and need not be unique.")
40
41 (defun make-queue (&key name initial-contents)
42   "Returns a new QUEUE with NAME and contents of the INITIAL-CONTENTS
43 sequence enqueued."
44   (let* ((dummy (make-node :value +dummy+))
45          (queue (%make-queue dummy dummy name)))
46     (flet ((enc-1 (x)
47              (enqueue x queue)))
48       (declare (dynamic-extent #'enc-1))
49       (map nil #'enc-1 initial-contents))
50     queue))
51
52 (defun enqueue (value queue)
53   "Adds VALUE to the end of QUEUE. Returns VALUE."
54   (let ((node (make-node :value value)))
55     (loop for tail = (queue-tail queue)
56           do (setf (node-next node) tail)
57              (when (eq tail (sb-ext:compare-and-swap (queue-tail queue) tail node))
58                (setf (node-prev tail) node)
59                (return value)))))
60
61 (defun dequeue (queue)
62   "Retrieves the oldest value in QUEUE and returns it as the primary value,
63 and T as secondary value. If the queue is empty, returns NIL as both primary
64 and secondary value."
65   (tagbody
66    :continue
67      (let* ((head (queue-head queue))
68             (tail (queue-tail queue))
69             (first-node-prev (node-prev head))
70             (val (node-value head)))
71        (barrier (:read))
72        (when (eq head (queue-head queue))
73          (cond ((not (eq val +dummy+))
74                 (if (eq tail head)
75                     (let ((dummy (make-node :value +dummy+ :next tail)))
76                       (when (eq tail (sb-ext:compare-and-swap (queue-tail queue)
77                                                               tail dummy))
78                         (setf (node-prev head) dummy))
79                       (go :continue))
80                     (when (null first-node-prev)
81                       (fixList queue tail head)
82                       (go :continue)))
83                 (when (eq head (sb-ext:compare-and-swap (queue-head queue)
84                                                         head first-node-prev))
85                   ;; These assignment is not present in the paper, but are
86                   ;; equivalent to the free(head.ptr) call there.
87                   ;;
88                   ;; First we unlink the HEAD from the queue -- the code in
89                   ;; the paper leaves the dangling pointer in place.
90                   ;;
91                   ;; Then we NIL out the slots in HEAD to help the GC,
92                   ;; otherwise conservativism might lead to massive chains of
93                   ;; nodes being retained.
94                   (setf (node-next first-node-prev) nil
95                         (node-prev head) nil
96                         (node-next head) nil
97                         (node-value head) nil)
98                   (return-from dequeue (values val t))))
99                ((eq tail head)
100                 (return-from dequeue (values nil nil)))
101                ((null first-node-prev)
102                 (fixList queue tail head)
103                 (go :continue))
104                (t
105                 (sb-ext:compare-and-swap (queue-head queue)
106                                          head first-node-prev)))))
107      (go :continue)))
108
109 (defun fixlist (queue tail head)
110   (let ((current tail))
111     (loop while (and (eq head (queue-head queue)) (not (eq current head)))
112           do (let ((next (node-next current)))
113                (when (not next)
114                  (return-from fixlist nil))
115                (let ((nextNodePrev (node-prev next)))
116                  (when (not (eq nextNodePrev current))
117                    (setf (node-prev next) current))
118                  (setf current next))))))
119
120 (defun list-queue-contents (queue)
121   "Returns the contents of QUEUE as a list without removing them from the
122 QUEUE. Mainly useful for manual examination of queue state, as the list
123 may be out of date by the time it is returned."
124   (let (all)
125     (labels ((walk (node)
126                ;; Since NEXT pointers are always right, traversing from tail
127                ;; to head is safe.
128                (let ((value (node-value node))
129                      (next (node-next node)))
130                  (unless (eq +dummy+ value)
131                    (push value all))
132                  (when next
133                    (walk next)))))
134       (walk (queue-tail queue)))
135     all))
136
137 (defun queue-count (queue)
138   "Returns the number of objects in QUEUE. Mainly useful for manual
139 examination of queue state, and in PRINT-OBJECT methods: inefficient as it
140 must walk the entire queue."
141   (let ((n 0))
142     (declare (unsigned-byte n))
143     (labels ((walk (node)
144                (let ((value (node-value node))
145                      (next (node-next node)))
146                  (unless (eq +dummy+ value)
147                    (incf n))
148                  (when next
149                    (walk next)))))
150       (walk (queue-tail queue))
151       n)))
152
153 (defun queue-empty-p (queue)
154   "Returns T if QUEUE is empty, NIL otherwise."
155   (let* ((head (queue-head queue))
156          (tail (queue-tail queue))
157          (val (node-value head)))
158     (and (eq head tail) (eq val +dummy+))))