X-Git-Url: http://repo.macrolet.net/gitweb/?a=blobdiff_plain;f=src%2Fcode%2Ffd-stream.lisp;h=9a7ce726876ad137238f1f3de54f326afc626026;hb=6a0601ab48635465ad3400c290e5cfbca28e5367;hp=374d10bee741db199582187a56236e9300ea78f8;hpb=34652b637f023fb24cf76df53e6a1936e94ce9ec;p=sbcl.git diff --git a/src/code/fd-stream.lisp b/src/code/fd-stream.lisp index 374d10b..9a7ce72 100644 --- a/src/code/fd-stream.lisp +++ b/src/code/fd-stream.lisp @@ -16,12 +16,29 @@ ;;;; Streams hold BUFFER objects, which contain a SAP, size of the ;;;; memory area the SAP stands for (LENGTH bytes), and HEAD and TAIL ;;;; indexes which delimit the "valid", or "active" area of the -;;;; memory. +;;;; memory. HEAD is inclusive, TAIL is exclusive. ;;;; ;;;; Buffers get allocated lazily, and are recycled by returning them ;;;; to the *AVAILABLE-BUFFERS* list. Every buffer has it's own ;;;; finalizer, to take care of releasing the SAP memory when a stream ;;;; is not properly closed. +;;;; +;;;; The code aims to provide a limited form of thread and interrupt +;;;; safety: parallel writes and reads may lose output or input, cause +;;;; interleaved IO, etc -- but they should not corrupt memory. The +;;;; key to doing this is to read buffer state once, and update the +;;;; state based on the read state: +;;;; +;;;; (let ((tail (buffer-tail buffer))) +;;;; ... +;;;; (setf (buffer-tail buffer) (+ tail n))) +;;;; +;;;; NOT +;;;; +;;;; (let ((tail (buffer-tail buffer))) +;;;; ... +;;;; (incf (buffer-tail buffer) n)) +;;;; (declaim (inline buffer-sap buffer-length buffer-head buffer-tail (setf buffer-head) (setf buffer-tail))) @@ -65,8 +82,11 @@ (without-interrupts (let* ((sap (allocate-system-memory size)) (buffer (%make-buffer sap size))) + (when (zerop (sap-int sap)) + (error "Could not allocate ~D bytes for buffer." size)) (finalize buffer (lambda () - (deallocate-system-memory sap size))) + (deallocate-system-memory sap size)) + :dont-save t) buffer))) (defun get-buffer () @@ -97,7 +117,7 @@ (let ((ibuf (fd-stream-ibuf fd-stream)) (obuf (fd-stream-obuf fd-stream)) (queue (loop for item in (fd-stream-output-queue fd-stream) - when (bufferp item) + when (buffer-p item) collect (reset-buffer item)))) (when ibuf (push (reset-buffer ibuf) queue)) @@ -181,16 +201,19 @@ (error ":END before :START!")) (when (> end start) ;; Copy bytes from THING to buffers. - (flet ((copy-to-buffer (buffer offset count) - (declare (buffer buffer) (index offset count)) + (flet ((copy-to-buffer (buffer tail count) + (declare (buffer buffer) (index tail count)) (aver (plusp count)) (let ((sap (buffer-sap buffer))) (etypecase thing (system-area-pointer - (system-area-ub8-copy thing start sap offset count)) + (system-area-ub8-copy thing start sap tail count)) ((simple-unboxed-array (*)) - (copy-ub8-to-system-area thing start sap offset count)))) - (incf (buffer-tail buffer) count) + (copy-ub8-to-system-area thing start sap tail count)))) + ;; Not INCF! If another thread has moved tail from under + ;; us, we don't want to accidentally increment tail + ;; beyond buffer-length. + (setf (buffer-tail buffer) (+ count tail)) (incf start count))) (tagbody ;; First copy is special: the buffer may already contain @@ -202,12 +225,13 @@ (copy-to-buffer obuf tail (min space (- end start))) (go :more-output-p))) :flush-and-fill - ;; Later copies always have an empty buffer, since they are freshly - ;; flushed. + ;; Later copies should always have an empty buffer, since + ;; they are freshly flushed, but if another thread is + ;; stomping on the same buffer that might not be the case. (let* ((obuf (flush-output-buffer stream)) - (offset (buffer-tail obuf))) - (aver (zerop offset)) - (copy-to-buffer obuf offset (min (buffer-length obuf) (- end start)))) + (tail (buffer-tail obuf)) + (space (- (buffer-length obuf) tail))) + (copy-to-buffer obuf tail (min space (- end start)))) :more-output-p (when (> end start) (go :flush-and-fill)))))) @@ -237,20 +261,24 @@ (synchronize-stream-output stream) (let ((length (- tail head))) (multiple-value-bind (count errno) - (sb!unix:unix-write (fd-stream-fd stream) (buffer-sap obuf) head length) + (sb!unix:unix-write (fd-stream-fd stream) (buffer-sap obuf) + head length) (cond ((eql count length) ;; Complete write -- we can use the same buffer. (reset-buffer obuf)) (count ;; Partial write -- update buffer status and queue. - (incf (buffer-head obuf) count) + ;; Do not use INCF! Another thread might have moved + ;; head... + (setf (buffer-head obuf) (+ count head)) (%queue-and-replace-output-buffer stream)) #!-win32 ((eql errno sb!unix:ewouldblock) ;; Blocking, queue. (%queue-and-replace-output-buffer stream)) (t - (simple-stream-perror "Couldn't write to ~s" stream errno))))))))))) + (simple-stream-perror "Couldn't write to ~s" + stream errno))))))))))) ;;; Helper for FLUSH-OUTPUT-BUFFER -- returns the new buffer. (defun %queue-and-replace-output-buffer (stream) @@ -286,8 +314,10 @@ (head (buffer-head buffer)) (length (- (buffer-tail buffer) head))) (declare (index head length)) + (aver (>= length 0)) (multiple-value-bind (count errno) - (sb!unix:unix-write (fd-stream-fd stream) (buffer-sap buffer) head length) + (sb!unix:unix-write (fd-stream-fd stream) (buffer-sap buffer) + head length) (cond ((eql count length) ;; Complete write, see if we can do another right ;; away, or remove the handler if we're done. @@ -303,7 +333,8 @@ (count ;; Partial write. Update buffer status and requeue. (aver (< count length)) - (incf (buffer-head buffer) (or count 0)) + ;; Do not use INCF! Another thread might have moved head. + (setf (buffer-head buffer) (+ head count)) (push buffer (fd-stream-output-queue stream))) (not-first-p ;; We tried to do multiple writes, and finally our @@ -315,8 +346,9 @@ (simple-stream-perror "Couldn't write to ~S." stream errno) #!-win32 (if (= errno sb!unix:ewouldblock) - (bug "Unexpected blocking write in WRITE-OUTPUT-FROM-QUEUE.") - (simple-stream-perror "Couldn't write to ~S" stream errno)))))))) + (bug "Unexpected blocking in WRITE-OUTPUT-FROM-QUEUE.") + (simple-stream-perror "Couldn't write to ~S" + stream errno)))))))) nil) ;;; Try to write THING directly to STREAM without buffering, if @@ -454,29 +486,30 @@ (defun fd-stream-output-finished-p (stream) (let ((obuf (fd-stream-obuf stream))) (or (not obuf) - (and (zerop (buffer-tail obuf))) - (not (fd-stream-output-queue stream))))) + (and (zerop (buffer-tail obuf)) + (not (fd-stream-output-queue stream)))))) (defmacro output-wrapper/variable-width ((stream size buffering restart) &body body) (let ((stream-var (gensym "STREAM"))) `(let* ((,stream-var ,stream) (obuf (fd-stream-obuf ,stream-var)) + (tail (buffer-tail obuf)) (size ,size)) ,(unless (eq (car buffering) :none) - `(when (< (buffer-length obuf) - (+ (buffer-tail obuf) size)) - (setf obuf (flush-output-buffer ,stream-var)))) + `(when (<= (buffer-length obuf) (+ tail size)) + (setf obuf (flush-output-buffer ,stream-var) + tail (buffer-tail obuf)))) ,(unless (eq (car buffering) :none) ;; FIXME: Why this here? Doesn't seem necessary. `(synchronize-stream-output ,stream-var)) ,(if restart `(catch 'output-nothing ,@body - (incf (buffer-tail obuf) size)) + (setf (buffer-tail obuf) (+ tail size))) `(progn ,@body - (incf (buffer-tail obuf) size))) + (setf (buffer-tail obuf) (+ tail size)))) ,(ecase (car buffering) (:none `(flush-output-buffer ,stream-var)) @@ -489,21 +522,22 @@ (defmacro output-wrapper ((stream size buffering restart) &body body) (let ((stream-var (gensym "STREAM"))) `(let* ((,stream-var ,stream) - (obuf (fd-stream-obuf ,stream-var))) + (obuf (fd-stream-obuf ,stream-var)) + (tail (buffer-tail obuf))) ,(unless (eq (car buffering) :none) - `(when (< (buffer-length obuf) - (+ (buffer-tail obuf) ,size)) - (setf obuf (flush-output-buffer ,stream-var)))) + `(when (<= (buffer-length obuf) (+ tail ,size)) + (setf obuf (flush-output-buffer ,stream-var) + tail (buffer-tail obuf)))) ;; FIXME: Why this here? Doesn't seem necessary. ,(unless (eq (car buffering) :none) `(synchronize-stream-output ,stream-var)) ,(if restart `(catch 'output-nothing ,@body - (incf (buffer-tail obuf) ,size)) + (setf (buffer-tail obuf) (+ tail ,size))) `(progn ,@body - (incf (buffer-tail obuf) ,size))) + (setf (buffer-tail obuf) (+ tail ,size)))) ,(ecase (car buffering) (:none `(flush-output-buffer ,stream-var)) @@ -575,7 +609,7 @@ (if (eql byte #\Newline) (setf (fd-stream-char-pos stream) 0) (incf (fd-stream-char-pos stream))) - (setf (sap-ref-8 (buffer-sap obuf) (buffer-tail obuf)) + (setf (sap-ref-8 (buffer-sap obuf) tail) (char-code byte))) (def-output-routines ("OUTPUT-UNSIGNED-BYTE-~A-BUFFERED" @@ -583,7 +617,7 @@ nil (:none (unsigned-byte 8)) (:full (unsigned-byte 8))) - (setf (sap-ref-8 (buffer-sap obuf) (buffer-tail obuf)) + (setf (sap-ref-8 (buffer-sap obuf) tail) byte)) (def-output-routines ("OUTPUT-SIGNED-BYTE-~A-BUFFERED" @@ -591,7 +625,7 @@ nil (:none (signed-byte 8)) (:full (signed-byte 8))) - (setf (signed-sap-ref-8 (buffer-sap obuf) (buffer-tail obuf)) + (setf (signed-sap-ref-8 (buffer-sap obuf) tail) byte)) (def-output-routines ("OUTPUT-UNSIGNED-SHORT-~A-BUFFERED" @@ -599,7 +633,7 @@ nil (:none (unsigned-byte 16)) (:full (unsigned-byte 16))) - (setf (sap-ref-16 (buffer-sap obuf) (buffer-tail obuf)) + (setf (sap-ref-16 (buffer-sap obuf) tail) byte)) (def-output-routines ("OUTPUT-SIGNED-SHORT-~A-BUFFERED" @@ -607,7 +641,7 @@ nil (:none (signed-byte 16)) (:full (signed-byte 16))) - (setf (signed-sap-ref-16 (buffer-sap obuf) (buffer-tail obuf)) + (setf (signed-sap-ref-16 (buffer-sap obuf) tail) byte)) (def-output-routines ("OUTPUT-UNSIGNED-LONG-~A-BUFFERED" @@ -615,7 +649,7 @@ nil (:none (unsigned-byte 32)) (:full (unsigned-byte 32))) - (setf (sap-ref-32 (buffer-sap obuf) (buffer-tail obuf)) + (setf (sap-ref-32 (buffer-sap obuf) tail) byte)) (def-output-routines ("OUTPUT-SIGNED-LONG-~A-BUFFERED" @@ -623,7 +657,7 @@ nil (:none (signed-byte 32)) (:full (signed-byte 32))) - (setf (signed-sap-ref-32 (buffer-sap obuf) (buffer-tail obuf)) + (setf (signed-sap-ref-32 (buffer-sap obuf) tail) byte)) #+#.(cl:if (cl:= sb!vm:n-word-bits 64) '(and) '(or)) @@ -633,14 +667,14 @@ nil (:none (unsigned-byte 64)) (:full (unsigned-byte 64))) - (setf (sap-ref-64 (buffer-sap obuf) (buffer-tail obuf)) + (setf (sap-ref-64 (buffer-sap obuf) tail) byte)) (def-output-routines ("OUTPUT-SIGNED-LONG-LONG-~A-BUFFERED" 8 nil (:none (signed-byte 64)) (:full (signed-byte 64))) - (setf (signed-sap-ref-64 (buffer-sap obuf) (buffer-tail obuf)) + (setf (signed-sap-ref-64 (buffer-sap obuf) tail) byte))) ;;; the routine to use to output a string. If the stream is @@ -737,14 +771,14 @@ (output-wrapper (stream (/ i 8) (:none) nil) (loop for j from 0 below (/ i 8) do (setf (sap-ref-8 (buffer-sap obuf) - (+ j (buffer-tail obuf))) + (+ j tail)) (ldb (byte 8 (- i 8 (* j 8))) byte)))))) (:full (lambda (stream byte) (output-wrapper (stream (/ i 8) (:full) nil) (loop for j from 0 below (/ i 8) do (setf (sap-ref-8 (buffer-sap obuf) - (+ j (buffer-tail obuf))) + (+ j tail)) (ldb (byte 8 (- i 8 (* j 8))) byte))))))) `(unsigned-byte ,i) (/ i 8)))) @@ -758,14 +792,14 @@ (output-wrapper (stream (/ i 8) (:none) nil) (loop for j from 0 below (/ i 8) do (setf (sap-ref-8 (buffer-sap obuf) - (+ j (buffer-tail obuf))) + (+ j tail)) (ldb (byte 8 (- i 8 (* j 8))) byte)))))) (:full (lambda (stream byte) (output-wrapper (stream (/ i 8) (:full) nil) (loop for j from 0 below (/ i 8) do (setf (sap-ref-8 (buffer-sap obuf) - (+ j (buffer-tail obuf))) + (+ j tail)) (ldb (byte 8 (- i 8 (* j 8))) byte))))))) `(signed-byte ,i) (/ i 8))))) @@ -860,7 +894,6 @@ (buffer-head ibuf) head tail n (buffer-tail ibuf) tail))))) - (setf (fd-stream-listen stream) nil) (setf (values count errno) (sb!unix:unix-read fd (sap+ sap tail) (- length tail))) @@ -876,8 +909,8 @@ (/show0 "THROWing EOF-INPUT-CATCHER") (throw 'eof-input-catcher nil)) (t - ;; Success! - (incf (buffer-tail ibuf) count)))))) + ;; Success! (Do not use INCF, for sake of other threads.) + (setf (buffer-tail ibuf) (+ count tail))))))) count)) ;;; Make sure there are at least BYTES number of bytes in the input @@ -1201,7 +1234,7 @@ (declare (type index start end)) (synchronize-stream-output stream) (unless (<= 0 start end (length string)) - (signal-bounding-indices-bad-error string start end)) + (sequence-bounding-indices-bad-error string start end)) (do () ((= end start)) (let ((obuf (fd-stream-obuf stream))) @@ -1378,7 +1411,7 @@ (declare (type index start end)) (synchronize-stream-output stream) (unless (<= 0 start end (length string)) - (signal-bounding-indices-bad-error string start end)) + (sequence-bounding-indices-bad string start end)) (do () ((= end start)) (let ((obuf (fd-stream-obuf stream))) @@ -2262,7 +2295,8 @@ (sb!unix:unix-close fd) #!+sb-show (format *terminal-io* "** closed file descriptor ~W **~%" - fd)))) + fd)) + :dont-save t)) stream)) ;;; Pick a name to use for the backup file for the :IF-EXISTS @@ -2496,9 +2530,19 @@ (setf *trace-output* *standard-output*) (values)) +(defun stream-deinit () + ;; Unbind to make sure we're not accidently dealing with it + ;; before we're ready (or after we think it's been deinitialized). + (with-available-buffers-lock () + (without-package-locks + (makunbound '*available-buffers*)))) + ;;; This is called whenever a saved core is restarted. -(defun stream-reinit () - (setf *available-buffers* nil) +(defun stream-reinit (&optional init-buffers-p) + (when init-buffers-p + (with-available-buffers-lock () + (aver (not (boundp '*available-buffers*))) + (setf *available-buffers* nil))) (with-output-to-string (*error-output*) (setf *stdin* (make-fd-stream 0 :name "standard input" :input t :buffering :line