(char-pos nil :type (or unsigned-byte null))
;; T if input is waiting on FD. :EOF if we hit EOF.
(listen nil :type (member nil t :eof))
+ ;; T if serve-event is allowed when this stream blocks
+ (serve-events nil :type boolean)
;; the input buffer
(instead (make-array 0 :element-type 'character :adjustable t :fill-pointer t) :type (array character (*)))
(aver (< head tail))
(%queue-and-replace-output-buffer stream))
(t
- ;; Try a non-blocking write, queue whatever is left over.
+ ;; Try a non-blocking write, if SERVE-EVENT is allowed, queue
+ ;; whatever is left over. Otherwise wait until we can write.
(aver (< head tail))
(synchronize-stream-output stream)
- (let ((length (- tail head)))
- (multiple-value-bind (count errno)
- (sb!unix:unix-write (fd-stream-fd stream) (buffer-sap obuf)
- head length)
- (cond ((eql count length)
- ;; Complete write -- we can use the same buffer.
- (reset-buffer obuf))
- (count
- ;; Partial write -- update buffer status and queue.
- ;; Do not use INCF! Another thread might have moved
- ;; head...
- (setf (buffer-head obuf) (+ count head))
- (%queue-and-replace-output-buffer stream))
- #!-win32
- ((eql errno sb!unix:ewouldblock)
- ;; Blocking, queue.
- (%queue-and-replace-output-buffer stream))
- (t
- (simple-stream-perror "Couldn't write to ~s"
- stream errno)))))))))))
+ (loop
+ (let ((length (- tail head)))
+ (multiple-value-bind (count errno)
+ (sb!unix:unix-write (fd-stream-fd stream) (buffer-sap obuf)
+ head length)
+ (flet ((queue-or-wait ()
+ (if (fd-stream-serve-events stream)
+ (return (%queue-and-replace-output-buffer stream))
+ (or (wait-until-fd-usable (fd-stream-fd stream) :output
+ (fd-stream-timeout stream)
+ nil)
+ (signal-timeout 'io-timeout
+ :stream stream
+ :direction :output
+ :seconds (fd-stream-timeout stream))))))
+ (cond ((eql count length)
+ ;; Complete write -- we can use the same buffer.
+ (return (reset-buffer obuf)))
+ (count
+ ;; Partial write -- update buffer status and
+ ;; queue or wait. Do not use INCF! Another
+ ;; thread might have moved head...
+ (setf (buffer-head obuf) (+ count head))
+ (queue-or-wait))
+ #!-win32
+ ((eql errno sb!unix:ewouldblock)
+ ;; Blocking, queue or wair.
+ (queue-or-wait))
+ (t
+ (simple-stream-perror "Couldn't write to ~s"
+ stream errno)))))))))))))
;;; Helper for FLUSH-OUTPUT-BUFFER -- returns the new buffer.
(defun %queue-and-replace-output-buffer (stream)
+ (aver (fd-stream-serve-events stream))
(let ((queue (fd-stream-output-queue stream))
(later (list (or (fd-stream-obuf stream) (bug "Missing obuf."))))
(new (get-buffer)))
;;; This is called by the FD-HANDLER for the stream when output is
;;; possible.
(defun write-output-from-queue (stream)
+ (aver (fd-stream-serve-events stream))
(synchronize-stream-output stream)
(let (not-first-p)
(tagbody
(errno 0)
(count 0))
(tagbody
- ;; Check for blocking input before touching the stream, as if
- ;; we happen to wait we are liable to be interrupted, and the
- ;; interrupt handler may use the same stream.
- (if (sysread-may-block-p stream)
+ ;; Check for blocking input before touching the stream if we are to
+ ;; serve events: if the FD is blocking, we don't want to hang on the
+ ;; write if we are to serve events or notice timeouts.
+ (if (and (or (fd-stream-serve-events stream)
+ (fd-stream-timeout stream)
+ *deadline*)
+ (sysread-may-block-p stream))
(go :wait-for-input)
(go :main))
;; These (:CLOSED-FLAME and :READ-ERROR) tags are here so what
:wait-for-input
;; This tag is here so we can unwind outside the WITHOUT-INTERRUPTS
;; to wait for input if read tells us EWOULDBLOCK.
- (unless (wait-until-fd-usable fd :input (fd-stream-timeout stream))
- (signal-timeout 'io-timeout :stream stream :direction :read
+ (unless (wait-until-fd-usable fd :input (fd-stream-timeout stream)
+ (fd-stream-serve-events stream))
+ (signal-timeout 'io-timeout
+ :stream stream
+ :direction :input
:seconds (fd-stream-timeout stream)))
:main
;; Since the read should not block, we'll disable the
;; resulting thunk is stack-allocatable.
((lambda (return-reason)
(ecase return-reason
- ((nil)) ; fast path normal cases
+ ((nil)) ; fast path normal cases
((:wait-for-input) (go :wait-for-input))
((:closed-flame) (go :closed-flame))
((:read-error) (go :read-error))))
(flush-output-buffer stream)
(do ()
((null (fd-stream-output-queue stream)))
+ (aver (fd-stream-serve-events stream))
(serve-all-events)))
(defun fd-stream-get-file-position (stream)
;;; FILE is the name of the file (will be returned by PATHNAME).
;;;
;;; NAME is used to identify the stream when printed.
+;;;
+;;; If SERVE-EVENTS is true, SERVE-EVENT machinery is used to
+;;; handle blocking IO on the stream.
(defun make-fd-stream (fd
&key
(input nil input-p)
(element-type 'base-char)
(buffering :full)
(external-format :default)
+ serve-events
timeout
file
original
:external-format external-format
:bivalent-p (eq element-type :default)
:char-size (external-format-char-size external-format)
+ :serve-events serve-events
:timeout
(if timeout
(coerce timeout 'single-float)
:delete-original delete-original
:pathname pathname
:dual-channel-p nil
+ :serve-events nil
:input-buffer-p t
:auto-close t))
(:probe
(setf *stdin*
(make-fd-stream 0 :name "standard input" :input t :buffering :line
:element-type :default
+ :serve-events t
:external-format (stdstream-external-format nil)))
(setf *stdout*
(make-fd-stream 1 :name "standard output" :output t :buffering :line