(defstruct counter
(ref 0 :type sb-vm:word))
+(defun receiver-distribution (n-receivers)
+ (let* ((aux (floor n-receivers 2))
+ (n-recv-msg (- n-receivers aux))
+ (n-recv-pend-msgs (floor aux 3))
+ (n-recv-msg-n-h (- aux n-recv-pend-msgs)))
+ (values n-recv-msg
+ n-recv-msg-n-h
+ n-recv-pend-msgs)))
+
(defun test-mailbox-producers-consumers
- (&key n-senders n-receivers n-messages mailbox interruptor)
- (let* ((cnt (make-counter))
- (mbox (or mailbox (make-mailbox)))
- (senders
- (make-threads n-senders "SENDER"
- #'(lambda ()
- (dotimes (i n-messages)
- (send-message mbox i)
- (sleep (random 0.001))))))
- (receivers
- ;; We have three groups of receivers, one using
- ;; RECEIVE-MESSAGE, one RECEIVE-MESSAGE-NO-HANG, and another
- ;; one RECEIVE-PENDING-MESSAGES.
- (let* ((aux (floor n-receivers 2))
- (n-recv-msg (- n-receivers aux))
- (n-recv-pend-msgs (floor aux 3))
- (n-recv-msg-n-h (- aux n-recv-pend-msgs)))
- (append
- (make-threads n-recv-msg "RECV-MSG"
- #'(lambda ()
- (sleep (random 0.001))
- (handler-case
- (loop
- (sb-sys:with-deadline (:seconds 1.0)
- (let ((msg (receive-message mbox)))
- (sb-ext:atomic-incf (counter-ref cnt))
- (unless (< -1 msg n-messages)
- (hang)))))
- (sb-ext:timeout ()))))
- (make-threads n-recv-pend-msgs "RECV-PEND-MSGS"
- #'(lambda ()
- (sleep (random 0.001))
- (dotimes (i 10)
- (thread-yield)
- (let ((msgs (receive-pending-messages mbox (random 5))))
- (mapc #'(lambda (msg)
- (sb-ext:atomic-incf (counter-ref cnt))
- (unless (< -1 msg n-messages)
- (hang)))
- msgs)))))
- (make-threads n-recv-msg-n-h "RECV-MSG-NO-HANG"
- #'(lambda ()
- (sleep (random 0.001))
- (dotimes (i 30)
- (thread-yield)
- (multiple-value-bind (msg ok)
- (receive-message-no-hang mbox)
- (when ok
- (sb-ext:atomic-incf (counter-ref cnt))
- (unless (< -1 msg n-messages)
- (hang))))))))))
- (threads (append receivers senders)))
- (when interruptor (funcall interruptor threads))
- (mapc #'timed-join-thread threads)
- (values mbox (counter-ref cnt) (* n-senders n-messages))))
+ (&key n-senders n-receivers n-messages interruptor)
+ (let ((mbox (make-mailbox))
+ (counter (make-counter))
+ (+sleep+ 0.0001)
+ (+fin-token+ :finish) ; end token for receivers to stop
+ (+blksize+ 5)) ; "block size" for RECEIVE-PENDING-MESSAGES
+ (multiple-value-bind (n-recv-msg
+ n-recv-msg-n-h
+ n-recv-pend-msgs)
+ ;; We have three groups of receivers, one using
+ ;; RECEIVE-MESSAGE, one RECEIVE-MESSAGE-NO-HANG, and
+ ;; another one RECEIVE-PENDING-MESSAGES.
+ (receiver-distribution n-receivers)
+ (let ((senders
+ (make-threads n-senders "SENDER"
+ #'(lambda ()
+ (dotimes (i n-messages t)
+ (send-message mbox i)
+ (sleep (random +sleep+))))))
+ (receivers
+ (flet ((process-msg (msg out)
+ (cond
+ ((eq msg +fin-token+)
+ (funcall out t))
+ ((not (< -1 msg n-messages))
+ (funcall out nil))
+ (t
+ (sb-ext:atomic-incf (counter-ref counter))))))
+ (append
+ (make-threads n-recv-msg "RECV-MSG"
+ #'(lambda ()
+ (sleep (random +sleep+))
+ (loop (process-msg (receive-message mbox)
+ #'(lambda (x) (return x))))))
+ (make-threads n-recv-pend-msgs "RECV-PEND-MSGS"
+ #'(lambda ()
+ (loop
+ (sleep (random +sleep+))
+ (mapc #'(lambda (msg)
+ (process-msg msg #'(lambda (x) (return x))))
+ (receive-pending-messages mbox +blksize+)))))
+ (make-threads n-recv-msg-n-h "RECV-MSG-NO-HANG"
+ #'(lambda ()
+ (loop
+ (sleep (random +sleep+))
+ (multiple-value-bind (msg ok)
+ (receive-message-no-hang mbox)
+ (when ok
+ (process-msg msg #'(lambda (x)
+ (return x))))))))))))
+
+ (when interruptor
+ (funcall interruptor (append receivers senders)))
+ (let ((garbage 0)
+ (errors 0)
+ (timeouts 0))
+ (flet ((wait-for (threads)
+ (mapc #'(lambda (thread)
+ (ecase (timed-join-thread thread)
+ ((t))
+ ((nil) (incf garbage))
+ ((:aborted) (incf errors))
+ ((:timeout) (incf timeouts)
+ (kill-thread thread))))
+ threads)))
+ ;; First wait until all messages are propagating.
+ (wait-for senders)
+ ;; Senders are finished, inform and wait for the
+ ;; receivers.
+ (loop repeat (+ n-recv-msg
+ n-recv-msg-n-h
+ (* n-recv-pend-msgs +blksize+))
+ ;; The number computed above is an upper bound; if
+ ;; we send as many FINs as that, we can be sure that
+ ;; every receiver must have got at least one FIN.
+ do (send-message mbox +fin-token+))
+ (wait-for receivers)
+ ;; We may in fact have sent too many FINs, so make sure
+ ;; it's only FINs in the mailbox now.
+ (mapc #'(lambda (msg) (unless (eq msg +fin-token+)
+ (incf garbage)))
+ (list-mailbox-messages mbox))
+ (values `(:received . ,(counter-ref counter))
+ `(:garbage . ,garbage)
+ `(:errors . ,errors)
+ `(:timeouts . ,timeouts))))))))
+
(deftest mailbox.single-producer-single-consumer
- (multiple-value-bind (mbox received total)
- (test-mailbox-producers-consumers :n-senders 1
- :n-receivers 1
- :n-messages 10000)
- (values
- (= received total)
- (mailbox-count mbox)
- (list-mailbox-messages mbox)))
- t
- 0
- nil)
+ (test-mailbox-producers-consumers :n-senders 1
+ :n-receivers 1
+ :n-messages 10000)
+ (:received . 10000)
+ (:garbage . 0)
+ (:errors . 0)
+ (:timeouts . 0))
(deftest mailbox.single-producer-multiple-consumers
- (multiple-value-bind (mbox received total)
- (test-mailbox-producers-consumers :n-senders 1
- :n-receivers 100
- :n-messages 10000)
- (values
- (= received total)
- (mailbox-count mbox)
- (list-mailbox-messages mbox)))
- t
- 0
- nil)
+ (test-mailbox-producers-consumers :n-senders 1
+ :n-receivers 100
+ :n-messages 10000)
+ (:received . 10000)
+ (:garbage . 0)
+ (:errors . 0)
+ (:timeouts . 0))
(deftest mailbox.multiple-producers-single-consumer
- (multiple-value-bind (mbox received total)
- (test-mailbox-producers-consumers :n-senders 100
- :n-receivers 10
- :n-messages 1000)
- (values
- (= received total)
- (mailbox-count mbox)
- (list-mailbox-messages mbox)))
- t
- 0
- nil)
+ (test-mailbox-producers-consumers :n-senders 100
+ :n-receivers 1
+ :n-messages 100)
+ (:received . 10000)
+ (:garbage . 0)
+ (:errors . 0)
+ (:timeouts . 0))
(deftest mailbox.multiple-producers-multiple-consumers
- (multiple-value-bind (mbox received total)
- (test-mailbox-producers-consumers :n-senders 100
- :n-receivers 100
- :n-messages 1000)
- (values
- (= received total)
- (mailbox-count mbox)
- (list-mailbox-messages mbox)))
- t
- 0
- nil)
+ (test-mailbox-producers-consumers :n-senders 100
+ :n-receivers 100
+ :n-messages 10000)
+ (:received . 1000000)
+ (:garbage . 0)
+ (:errors . 0)
+ (:timeouts . 0))
(deftest mailbox.interrupts-safety.1
- (multiple-value-bind (mbox received total)
+ (multiple-value-bind (received garbage errors timeouts)
(test-mailbox-producers-consumers
:n-senders 100
:n-receivers 100
:n-messages 1000
- :interruptor #'(lambda (threads)
- (let ((n (length threads)))
- ;; 99 so even in the unlikely case that only
- ;; receivers (or only senders) are shot
- ;; dead, there's still one that survives to
- ;; properly end the test.
- (loop repeat 99 do
- (kill-thread (nth (random n) threads))))))
+ :interruptor #'(lambda (threads &aux (n (length threads)))
+ ;; 99 so even in the unlikely case that only
+ ;; receivers (or only senders) are shot
+ ;; dead, there's still one that survives to
+ ;; properly end the test.
+ (loop repeat 99
+ for victim = (nth (random n) threads)
+ do (kill-thread victim)
+ (sleep (random 0.0001)))))
(values
;; We may have killed a receiver before it got to incrementing
;; the counter.
- (<= received total)
- (mailbox-count mbox)
- (list-mailbox-messages mbox)))
- t
- 0
- nil)
+ (if (<= (cdr received) 1000000)
+ `(:received . :ok)
+ received)
+ garbage
+ ;; we may have gotten errors due to our killing spree.
+ timeouts))
+ (:received . :ok)
+ (:garbage . 0)
+ (:timeouts . 0))
) ; #+sb-thread (progn ...
\ No newline at end of file