From e20bf696a0722f349744152e3cac2600d3073d03 Mon Sep 17 00:00:00 2001 From: Gabor Melis Date: Wed, 31 Aug 2005 15:14:26 +0000 Subject: [PATCH] 0.9.4.20: * added trivial implementation of semaphores, not exported for the time being --- src/code/target-thread.lisp | 62 +++++++++++++++++++++++++++++++++---------- tests/threads.impure.lisp | 60 +++++++++++++++++++++++++++++++++++++++++ version.lisp-expr | 2 +- 3 files changed, 109 insertions(+), 15 deletions(-) diff --git a/src/code/target-thread.lisp b/src/code/target-thread.lisp index 63e282d..19e293a 100644 --- a/src/code/target-thread.lisp +++ b/src/code/target-thread.lisp @@ -276,12 +276,14 @@ time we reacquire MUTEX and return to the caller." ;; better than nothing. (get-mutex mutex value)))) -(defun condition-notify (queue) +(defun condition-notify (queue &optional (n 1)) #!+sb-doc - "Notify one of the threads waiting on QUEUE." - #!-sb-thread (declare (ignore queue)) + "Notify N threads waiting on QUEUE." + #!-sb-thread (declare (ignore queue n)) #!-sb-thread (error "Not supported in unithread builds.") #!+sb-thread + (declare (type (and fixnum (integer 1)) n)) + #!+sb-thread (let ((me *current-thread*)) ;; no problem if >1 thread notifies during the comment in ;; condition-wait: as long as the value in queue-data isn't the @@ -289,17 +291,50 @@ time we reacquire MUTEX and return to the caller." ;; XXX we should do something to ensure that the result of this setf ;; is visible to all CPUs (setf (waitqueue-data queue) me) - (futex-wake (waitqueue-data-address queue) 1))) + (futex-wake (waitqueue-data-address queue) n))) (defun condition-broadcast (queue) #!+sb-doc "Notify all threads waiting on QUEUE." - #!-sb-thread (declare (ignore queue)) - #!-sb-thread (error "Not supported in unithread builds.") - #!+sb-thread - (let ((me *current-thread*)) - (setf (waitqueue-data queue) me) - (futex-wake (waitqueue-data-address queue) (ash 1 30)))) + (condition-notify queue most-positive-fixnum)) + +;;;; semaphores + +(defstruct (semaphore (:constructor %make-semaphore)) + #!+sb-doc + "Semaphore type." + (name nil :type (or null simple-string)) + (count 0 :type (integer 0)) + (mutex (make-mutex)) + (queue (make-waitqueue))) + +(defun make-semaphore (&key name (count 0)) + #!+sb-doc + "Create a semaphore with the supplied COUNT." + (%make-semaphore :name name :count count)) + +(setf (sb!kernel:fdocumentation 'semaphore-name 'function) + "The name of the semaphore. Setfable.") + +(defun wait-on-semaphore (sem) + #!+sb-doc + "Decrement the count of SEM if the count would not be negative. Else +block until the semaphore can be decremented." + ;; a more direct implementation based directly on futexes should be + ;; possible + (with-mutex ((semaphore-mutex sem)) + (loop until (> (semaphore-count sem) 0) + do (condition-wait (semaphore-queue sem) (semaphore-mutex sem)) + finally (decf (semaphore-count sem))))) + +(defun signal-semaphore (sem &optional (n 1)) + #!+sb-doc + "Increment the count of SEM by N. If there are threads waiting on +this semaphore, then N of them is woken up." + (declare (type (and fixnum (integer 1)) n)) + (with-mutex ((semaphore-mutex sem)) + (when (= n (incf (semaphore-count sem) n)) + (condition-notify (semaphore-queue sem) n)))) ;;;; job control, independent listeners @@ -464,7 +499,7 @@ returns the thread exits." #!-sb-thread (error "Not supported in unithread builds.") #!+sb-thread (let* ((thread (%make-thread :name name)) - (setup-p nil) + (setup-sem (make-semaphore :name "Thread setup semaphore")) (real-function (coerce function 'function)) (thread-sap ;; don't let the child inherit *CURRENT-THREAD* because that @@ -473,8 +508,6 @@ returns the thread exits." (%create-thread (sb!kernel:get-lisp-obj-address (lambda () - ;; FIXME: use semaphores? - (loop until setup-p) ;; in time we'll move some of the binding presently done in C ;; here too (let ((*current-thread* thread) @@ -482,6 +515,7 @@ returns the thread exits." (sb!kernel::*handler-clusters* nil) (sb!kernel::*condition-restarts* nil) (sb!impl::*descriptor-handlers* nil)) ; serve-event + (wait-on-semaphore setup-sem) ;; can't use handling-end-of-the-world, because that flushes ;; output streams, and we don't necessarily have any (or we ;; could be sharing them) @@ -515,7 +549,7 @@ returns the thread exits." (push thread *all-threads*)) (with-session-lock (*session*) (push thread (session-threads *session*))) - (setq setup-p t) + (signal-semaphore setup-sem) (sb!impl::finalize thread (lambda () (reap-dead-thread thread-sap))) thread)) diff --git a/tests/threads.impure.lisp b/tests/threads.impure.lisp index 3e1c97c..f9bec10 100644 --- a/tests/threads.impure.lisp +++ b/tests/threads.impure.lisp @@ -13,6 +13,8 @@ (in-package "SB-THREAD") ; this is white-box testing, really +(use-package :test-util) + (defun wait-for-threads (threads) (loop while (some #'sb-thread:thread-alive-p threads) do (sleep 0.01))) @@ -183,6 +185,64 @@ (format t "contention ~A ~A~%" kid1 kid2) (wait-for-threads (list kid1 kid2))))) +;;; semaphores + +(defmacro raises-timeout-p (&body body) + `(handler-case (progn (progn ,@body) nil) + (sb-ext:timeout () t))) + +(with-test (:name (:semaphore :wait-forever)) + (let ((sem (make-semaphore :count 0))) + (assert (raises-timeout-p + (sb-ext:with-timeout 0.1 + (wait-on-semaphore sem)))))) + +(with-test (:name (:semaphore :initial-count)) + (let ((sem (make-semaphore :count 1))) + (sb-ext:with-timeout 0.1 + (wait-on-semaphore sem)))) + +(with-test (:name (:semaphore :wait-then-signal)) + (let ((sem (make-semaphore)) + (signalled-p nil)) + (make-thread (lambda () + (sleep 0.1) + (setq signalled-p t) + (signal-semaphore sem))) + (wait-on-semaphore sem) + (assert signalled-p))) + +(with-test (:name (:semaphore :signal-then-wait)) + (let ((sem (make-semaphore)) + (signalled-p nil)) + (make-thread (lambda () + (signal-semaphore sem) + (setq signalled-p t))) + (loop until signalled-p) + (wait-on-semaphore sem) + (assert signalled-p))) + +(with-test (:name (:semaphore :multiple-signals)) + (let* ((sem (make-semaphore :count 5)) + (threads (loop repeat 20 + collect (make-thread (lambda () + (wait-on-semaphore sem)))))) + (flet ((count-live-threads () + (count-if #'thread-alive-p threads))) + (sleep 0.5) + (assert (= 15 (count-live-threads))) + (signal-semaphore sem 10) + (sleep 0.5) + (assert (= 5 (count-live-threads))) + (signal-semaphore sem 3) + (sleep 0.5) + (assert (= 2 (count-live-threads))) + (signal-semaphore sem 4) + (sleep 0.5) + (assert (= 0 (count-live-threads)))))) + +(format t "~&semaphore tests done~%") + (defun test-interrupt (function-to-interrupt &optional quit-p) (let ((child (make-thread function-to-interrupt))) ;;(format t "gdb ./src/runtime/sbcl ~A~%attach ~A~%" child child) diff --git a/version.lisp-expr b/version.lisp-expr index 3025ea7..782d259 100644 --- a/version.lisp-expr +++ b/version.lisp-expr @@ -17,4 +17,4 @@ ;;; checkins which aren't released. (And occasionally for internal ;;; versions, especially for internal versions off the main CVS ;;; branch, it gets hairier, e.g. "0.pre7.14.flaky4.13".) -"0.9.4.19" +"0.9.4.20" -- 1.7.10.4