;;;; Streams hold BUFFER objects, which contain a SAP, size of the
;;;; memory area the SAP stands for (LENGTH bytes), and HEAD and TAIL
;;;; indexes which delimit the "valid", or "active" area of the
-;;;; memory.
+;;;; memory. HEAD is inclusive, TAIL is exclusive.
;;;;
;;;; Buffers get allocated lazily, and are recycled by returning them
;;;; to the *AVAILABLE-BUFFERS* list. Every buffer has it's own
;;;; finalizer, to take care of releasing the SAP memory when a stream
;;;; is not properly closed.
+;;;;
+;;;; The code aims to provide a limited form of thread and interrupt
+;;;; safety: parallel writes and reads may lose output or input, cause
+;;;; interleaved IO, etc -- but they should not corrupt memory. The
+;;;; key to doing this is to read buffer state once, and update the
+;;;; state based on the read state:
+;;;;
+;;;; (let ((tail (buffer-tail buffer)))
+;;;; ...
+;;;; (setf (buffer-tail buffer) (+ tail n)))
+;;;;
+;;;; NOT
+;;;;
+;;;; (let ((tail (buffer-tail buffer)))
+;;;; ...
+;;;; (incf (buffer-tail buffer) n))
+;;;;
(declaim (inline buffer-sap buffer-length buffer-head buffer-tail
(setf buffer-head) (setf buffer-tail)))
(without-interrupts
(let* ((sap (allocate-system-memory size))
(buffer (%make-buffer sap size)))
+ (when (zerop (sap-int sap))
+ (error "Could not allocate ~D bytes for buffer." size))
(finalize buffer (lambda ()
- (deallocate-system-memory sap size)))
+ (deallocate-system-memory sap size))
+ :dont-save t)
buffer)))
(defun get-buffer ()
(error ":END before :START!"))
(when (> end start)
;; Copy bytes from THING to buffers.
- (flet ((copy-to-buffer (buffer offset count)
- (declare (buffer buffer) (index offset count))
+ (flet ((copy-to-buffer (buffer tail count)
+ (declare (buffer buffer) (index tail count))
(aver (plusp count))
(let ((sap (buffer-sap buffer)))
(etypecase thing
(system-area-pointer
- (system-area-ub8-copy thing start sap offset count))
+ (system-area-ub8-copy thing start sap tail count))
((simple-unboxed-array (*))
- (copy-ub8-to-system-area thing start sap offset count))))
- (incf (buffer-tail buffer) count)
+ (copy-ub8-to-system-area thing start sap tail count))))
+ ;; Not INCF! If another thread has moved tail from under
+ ;; us, we don't want to accidentally increment tail
+ ;; beyond buffer-length.
+ (setf (buffer-tail buffer) (+ count tail))
(incf start count)))
(tagbody
;; First copy is special: the buffer may already contain
(copy-to-buffer obuf tail (min space (- end start)))
(go :more-output-p)))
:flush-and-fill
- ;; Later copies always have an empty buffer, since they are freshly
- ;; flushed.
+ ;; Later copies should always have an empty buffer, since
+ ;; they are freshly flushed, but if another thread is
+ ;; stomping on the same buffer that might not be the case.
(let* ((obuf (flush-output-buffer stream))
- (offset (buffer-tail obuf)))
- (aver (zerop offset))
- (copy-to-buffer obuf offset (min (buffer-length obuf) (- end start))))
+ (tail (buffer-tail obuf))
+ (space (- (buffer-length obuf) tail)))
+ (copy-to-buffer obuf tail (min space (- end start))))
:more-output-p
(when (> end start)
(go :flush-and-fill))))))
(synchronize-stream-output stream)
(let ((length (- tail head)))
(multiple-value-bind (count errno)
- (sb!unix:unix-write (fd-stream-fd stream) (buffer-sap obuf) head length)
+ (sb!unix:unix-write (fd-stream-fd stream) (buffer-sap obuf)
+ head length)
(cond ((eql count length)
;; Complete write -- we can use the same buffer.
(reset-buffer obuf))
(count
;; Partial write -- update buffer status and queue.
- (incf (buffer-head obuf) count)
+ ;; Do not use INCF! Another thread might have moved
+ ;; head...
+ (setf (buffer-head obuf) (+ count head))
(%queue-and-replace-output-buffer stream))
#!-win32
((eql errno sb!unix:ewouldblock)
;; Blocking, queue.
(%queue-and-replace-output-buffer stream))
(t
- (simple-stream-perror "Couldn't write to ~s" stream errno)))))))))))
+ (simple-stream-perror "Couldn't write to ~s"
+ stream errno)))))))))))
;;; Helper for FLUSH-OUTPUT-BUFFER -- returns the new buffer.
(defun %queue-and-replace-output-buffer (stream)
(head (buffer-head buffer))
(length (- (buffer-tail buffer) head)))
(declare (index head length))
+ (aver (>= length 0))
(multiple-value-bind (count errno)
- (sb!unix:unix-write (fd-stream-fd stream) (buffer-sap buffer) head length)
+ (sb!unix:unix-write (fd-stream-fd stream) (buffer-sap buffer)
+ head length)
(cond ((eql count length)
;; Complete write, see if we can do another right
;; away, or remove the handler if we're done.
(count
;; Partial write. Update buffer status and requeue.
(aver (< count length))
- (incf (buffer-head buffer) (or count 0))
+ ;; Do not use INCF! Another thread might have moved head.
+ (setf (buffer-head buffer) (+ head count))
(push buffer (fd-stream-output-queue stream)))
(not-first-p
;; We tried to do multiple writes, and finally our
(simple-stream-perror "Couldn't write to ~S." stream errno)
#!-win32
(if (= errno sb!unix:ewouldblock)
- (bug "Unexpected blocking write in WRITE-OUTPUT-FROM-QUEUE.")
- (simple-stream-perror "Couldn't write to ~S" stream errno))))))))
+ (bug "Unexpected blocking in WRITE-OUTPUT-FROM-QUEUE.")
+ (simple-stream-perror "Couldn't write to ~S"
+ stream errno))))))))
nil)
;;; Try to write THING directly to STREAM without buffering, if
(defun fd-stream-output-finished-p (stream)
(let ((obuf (fd-stream-obuf stream)))
(or (not obuf)
- (and (zerop (buffer-tail obuf)))
- (not (fd-stream-output-queue stream)))))
+ (and (zerop (buffer-tail obuf))
+ (not (fd-stream-output-queue stream))))))
(defmacro output-wrapper/variable-width ((stream size buffering restart)
&body body)
(let ((stream-var (gensym "STREAM")))
`(let* ((,stream-var ,stream)
(obuf (fd-stream-obuf ,stream-var))
+ (tail (buffer-tail obuf))
(size ,size))
,(unless (eq (car buffering) :none)
- `(when (< (buffer-length obuf)
- (+ (buffer-tail obuf) size))
- (setf obuf (flush-output-buffer ,stream-var))))
+ `(when (<= (buffer-length obuf) (+ tail size))
+ (setf obuf (flush-output-buffer ,stream-var)
+ tail (buffer-tail obuf))))
,(unless (eq (car buffering) :none)
;; FIXME: Why this here? Doesn't seem necessary.
`(synchronize-stream-output ,stream-var))
,(if restart
`(catch 'output-nothing
,@body
- (incf (buffer-tail obuf) size))
+ (setf (buffer-tail obuf) (+ tail size)))
`(progn
,@body
- (incf (buffer-tail obuf) size)))
+ (setf (buffer-tail obuf) (+ tail size))))
,(ecase (car buffering)
(:none
`(flush-output-buffer ,stream-var))
(defmacro output-wrapper ((stream size buffering restart) &body body)
(let ((stream-var (gensym "STREAM")))
`(let* ((,stream-var ,stream)
- (obuf (fd-stream-obuf ,stream-var)))
+ (obuf (fd-stream-obuf ,stream-var))
+ (tail (buffer-tail obuf)))
,(unless (eq (car buffering) :none)
- `(when (< (buffer-length obuf)
- (+ (buffer-tail obuf) ,size))
- (setf obuf (flush-output-buffer ,stream-var))))
+ `(when (<= (buffer-length obuf) (+ tail ,size))
+ (setf obuf (flush-output-buffer ,stream-var)
+ tail (buffer-tail obuf))))
;; FIXME: Why this here? Doesn't seem necessary.
,(unless (eq (car buffering) :none)
`(synchronize-stream-output ,stream-var))
,(if restart
`(catch 'output-nothing
,@body
- (incf (buffer-tail obuf) ,size))
+ (setf (buffer-tail obuf) (+ tail ,size)))
`(progn
,@body
- (incf (buffer-tail obuf) ,size)))
+ (setf (buffer-tail obuf) (+ tail ,size))))
,(ecase (car buffering)
(:none
`(flush-output-buffer ,stream-var))
(if (eql byte #\Newline)
(setf (fd-stream-char-pos stream) 0)
(incf (fd-stream-char-pos stream)))
- (setf (sap-ref-8 (buffer-sap obuf) (buffer-tail obuf))
+ (setf (sap-ref-8 (buffer-sap obuf) tail)
(char-code byte)))
(def-output-routines ("OUTPUT-UNSIGNED-BYTE-~A-BUFFERED"
nil
(:none (unsigned-byte 8))
(:full (unsigned-byte 8)))
- (setf (sap-ref-8 (buffer-sap obuf) (buffer-tail obuf))
+ (setf (sap-ref-8 (buffer-sap obuf) tail)
byte))
(def-output-routines ("OUTPUT-SIGNED-BYTE-~A-BUFFERED"
nil
(:none (signed-byte 8))
(:full (signed-byte 8)))
- (setf (signed-sap-ref-8 (buffer-sap obuf) (buffer-tail obuf))
+ (setf (signed-sap-ref-8 (buffer-sap obuf) tail)
byte))
(def-output-routines ("OUTPUT-UNSIGNED-SHORT-~A-BUFFERED"
nil
(:none (unsigned-byte 16))
(:full (unsigned-byte 16)))
- (setf (sap-ref-16 (buffer-sap obuf) (buffer-tail obuf))
+ (setf (sap-ref-16 (buffer-sap obuf) tail)
byte))
(def-output-routines ("OUTPUT-SIGNED-SHORT-~A-BUFFERED"
nil
(:none (signed-byte 16))
(:full (signed-byte 16)))
- (setf (signed-sap-ref-16 (buffer-sap obuf) (buffer-tail obuf))
+ (setf (signed-sap-ref-16 (buffer-sap obuf) tail)
byte))
(def-output-routines ("OUTPUT-UNSIGNED-LONG-~A-BUFFERED"
nil
(:none (unsigned-byte 32))
(:full (unsigned-byte 32)))
- (setf (sap-ref-32 (buffer-sap obuf) (buffer-tail obuf))
+ (setf (sap-ref-32 (buffer-sap obuf) tail)
byte))
(def-output-routines ("OUTPUT-SIGNED-LONG-~A-BUFFERED"
nil
(:none (signed-byte 32))
(:full (signed-byte 32)))
- (setf (signed-sap-ref-32 (buffer-sap obuf) (buffer-tail obuf))
+ (setf (signed-sap-ref-32 (buffer-sap obuf) tail)
byte))
#+#.(cl:if (cl:= sb!vm:n-word-bits 64) '(and) '(or))
nil
(:none (unsigned-byte 64))
(:full (unsigned-byte 64)))
- (setf (sap-ref-64 (buffer-sap obuf) (buffer-tail obuf))
+ (setf (sap-ref-64 (buffer-sap obuf) tail)
byte))
(def-output-routines ("OUTPUT-SIGNED-LONG-LONG-~A-BUFFERED"
8
nil
(:none (signed-byte 64))
(:full (signed-byte 64)))
- (setf (signed-sap-ref-64 (buffer-sap obuf) (buffer-tail obuf))
+ (setf (signed-sap-ref-64 (buffer-sap obuf) tail)
byte)))
;;; the routine to use to output a string. If the stream is
(output-wrapper (stream (/ i 8) (:none) nil)
(loop for j from 0 below (/ i 8)
do (setf (sap-ref-8 (buffer-sap obuf)
- (+ j (buffer-tail obuf)))
+ (+ j tail))
(ldb (byte 8 (- i 8 (* j 8))) byte))))))
(:full
(lambda (stream byte)
(output-wrapper (stream (/ i 8) (:full) nil)
(loop for j from 0 below (/ i 8)
do (setf (sap-ref-8 (buffer-sap obuf)
- (+ j (buffer-tail obuf)))
+ (+ j tail))
(ldb (byte 8 (- i 8 (* j 8))) byte)))))))
`(unsigned-byte ,i)
(/ i 8))))
(output-wrapper (stream (/ i 8) (:none) nil)
(loop for j from 0 below (/ i 8)
do (setf (sap-ref-8 (buffer-sap obuf)
- (+ j (buffer-tail obuf)))
+ (+ j tail))
(ldb (byte 8 (- i 8 (* j 8))) byte))))))
(:full
(lambda (stream byte)
(output-wrapper (stream (/ i 8) (:full) nil)
(loop for j from 0 below (/ i 8)
do (setf (sap-ref-8 (buffer-sap obuf)
- (+ j (buffer-tail obuf)))
+ (+ j tail))
(ldb (byte 8 (- i 8 (* j 8))) byte)))))))
`(signed-byte ,i)
(/ i 8)))))
(buffer-head ibuf) head
tail n
(buffer-tail ibuf) tail)))))
-
(setf (fd-stream-listen stream) nil)
(setf (values count errno)
(sb!unix:unix-read fd (sap+ sap tail) (- length tail)))
(/show0 "THROWing EOF-INPUT-CATCHER")
(throw 'eof-input-catcher nil))
(t
- ;; Success!
- (incf (buffer-tail ibuf) count))))))
+ ;; Success! (Do not use INCF, for sake of other threads.)
+ (setf (buffer-tail ibuf) (+ count tail)))))))
count))
;;; Make sure there are at least BYTES number of bytes in the input
(sb!unix:unix-close fd)
#!+sb-show
(format *terminal-io* "** closed file descriptor ~W **~%"
- fd))))
+ fd))
+ :dont-save t))
stream))
;;; Pick a name to use for the backup file for the :IF-EXISTS
(setf *trace-output* *standard-output*)
(values))
+(defun stream-deinit ()
+ ;; Unbind to make sure we're not accidently dealing with it
+ ;; before we're ready (or after we think it's been deinitialized).
+ (with-available-buffers-lock ()
+ (without-package-locks
+ (makunbound '*available-buffers*))))
+
;;; This is called whenever a saved core is restarted.
-(defun stream-reinit ()
- (setf *available-buffers* nil)
+(defun stream-reinit (&optional init-buffers-p)
+ (when init-buffers-p
+ (with-available-buffers-lock ()
+ (aver (not (boundp '*available-buffers*)))
+ (setf *available-buffers* nil)))
(with-output-to-string (*error-output*)
(setf *stdin*
(make-fd-stream 0 :name "standard input" :input t :buffering :line