Allow synchronous win32 I/O to be interrupted, too
authorDavid Lichteblau <david@lichteblau.com>
Fri, 5 Oct 2012 20:10:53 +0000 (22:10 +0200)
committerDavid Lichteblau <david@lichteblau.com>
Fri, 2 Nov 2012 12:23:19 +0000 (13:23 +0100)
... if and only if running on a version of Windows new enough to
support doing so.  Two scenarios come to mind where synchronous (i.e.
non-overlapped) I/O might matter:

  - There is one kind of HANDLE which is never overlapped: Unnamed
    pipes.  Unlike named pipes, the feature added by this commit is
    our only option of interrupting I/O on the former.

  - User code might pass in a HANDLE through MAKE-FD-STREAM without
    the right flag set.  In principle, non-interruptibily of such a
    HANDLE is a bug in said user code, but it doesn't hurt to deal
    with these correctly as a side benefit.  (The only Windows
    releases which support re-opening of a HANDLE with the right
    flag also have the functions needed by this commit.)

One downside for users might be an element of surprise, in that the
same SBCL binary will exhibit the presence or lack of features,
respectively, when started on recent Windows or old Windows.  However,
the advantages of offering the feature seem to me to outweigh that
disadvantage.

Thanks to Anton Kovalenko.

src/compiler/generic/objdef.lisp
src/runtime/print.c
src/runtime/safepoint.c
src/runtime/thread.c
src/runtime/win32-os.c
src/runtime/win32-os.h
tests/threads.impure.lisp

index 35e7cb5..b4de2df 100644 (file)
   #!+(and sb-safepoint x86) (selfptr :c-type "struct thread *")
   #!+sb-safepoint (csp-around-foreign-call :c-type "lispobj *")
   #!+sb-safepoint (pc-around-foreign-call :c-type "lispobj *")
   #!+(and sb-safepoint x86) (selfptr :c-type "struct thread *")
   #!+sb-safepoint (csp-around-foreign-call :c-type "lispobj *")
   #!+sb-safepoint (pc-around-foreign-call :c-type "lispobj *")
+  #!+win32 (synchronous-io-handle-and-flag :c-type "HANDLE" :length 1)
   ;; KLUDGE: On alpha, until STEPPING we have been lucky and the 32
   ;; bit slots came in pairs. However the C compiler will align
   ;; interrupt_contexts on a double word boundary. This logic should
   ;; KLUDGE: On alpha, until STEPPING we have been lucky and the 32
   ;; bit slots came in pairs. However the C compiler will align
   ;; interrupt_contexts on a double word boundary. This logic should
index fc3f235..cf47f55 100644 (file)
@@ -145,9 +145,8 @@ vodxprint_fun(const char *fmt, va_list args)
 {
 #ifdef LISP_FEATURE_WIN32
     DWORD lastError = GetLastError();
 {
 #ifdef LISP_FEATURE_WIN32
     DWORD lastError = GetLastError();
-#else
-    int original_errno = errno;
 #endif
 #endif
+    int original_errno = errno;
 
     QSHOW_BLOCK;
 
 
     QSHOW_BLOCK;
 
@@ -184,9 +183,8 @@ vodxprint_fun(const char *fmt, va_list args)
 
 #ifdef LISP_FEATURE_WIN32
     SetLastError(lastError);
 
 #ifdef LISP_FEATURE_WIN32
     SetLastError(lastError);
-#else
-    errno = original_errno;
 #endif
 #endif
+    errno = original_errno;
 }
 
 /* Translate the rather awkward syntax
 }
 
 /* Translate the rather awkward syntax
index 08b56d7..d94a61e 100644 (file)
@@ -831,6 +831,7 @@ void
 wake_thread_io(struct thread * thread)
 {
     SetEvent(thread->private_events.events[1]);
 wake_thread_io(struct thread * thread)
 {
     SetEvent(thread->private_events.events[1]);
+    win32_maybe_interrupt_io(thread);
 }
 
 void
 }
 
 void
index cc9eebd..4f0f618 100644 (file)
@@ -674,6 +674,7 @@ create_thread_struct(lispobj initial_function) {
            sizeof(th->private_events.events[0]); ++i) {
       th->private_events.events[i] = CreateEvent(NULL,FALSE,FALSE,NULL);
     }
            sizeof(th->private_events.events[0]); ++i) {
       th->private_events.events[i] = CreateEvent(NULL,FALSE,FALSE,NULL);
     }
+    th->synchronous_io_handle_and_flag = 0;
 #endif
     th->stepping = NIL;
     return th;
 #endif
     th->stepping = NIL;
     return th;
index 5aa33f7..c43cfed 100644 (file)
@@ -378,6 +378,28 @@ void os_preinit()
 
 int os_number_of_processors = 1;
 
 
 int os_number_of_processors = 1;
 
+BOOL WINAPI CancelIoEx(HANDLE handle, LPOVERLAPPED overlapped);
+typeof(CancelIoEx) *ptr_CancelIoEx;
+BOOL WINAPI CancelSynchronousIo(HANDLE threadHandle);
+typeof(CancelSynchronousIo) *ptr_CancelSynchronousIo;
+
+#define RESOLVE(hmodule,fn)                     \
+    do {                                        \
+        ptr_##fn = (typeof(ptr_##fn))           \
+            GetProcAddress(hmodule,#fn);        \
+    } while (0)
+
+static void resolve_optional_imports()
+{
+    HMODULE kernel32 = GetModuleHandleA("kernel32");
+    if (kernel32) {
+        RESOLVE(kernel32,CancelIoEx);
+        RESOLVE(kernel32,CancelSynchronousIo);
+    }
+}
+
+#undef RESOLVE
+
 void os_init(char *argv[], char *envp[])
 {
     SYSTEM_INFO system_info;
 void os_init(char *argv[], char *envp[])
 {
     SYSTEM_INFO system_info;
@@ -390,6 +412,8 @@ void os_init(char *argv[], char *envp[])
     os_number_of_processors = system_info.dwNumberOfProcessors;
 
     base_seh_frame = get_seh_frame();
     os_number_of_processors = system_info.dwNumberOfProcessors;
 
     base_seh_frame = get_seh_frame();
+
+    resolve_optional_imports();
 }
 
 static inline boolean local_thread_stack_address_p(os_vm_address_t address)
 }
 
 static inline boolean local_thread_stack_address_p(os_vm_address_t address)
@@ -1103,6 +1127,118 @@ console_handle_p(HANDLE handle)
         ((((int)(intptr_t)handle)&3)==3);
 }
 
         ((((int)(intptr_t)handle)&3)==3);
 }
 
+/* Atomically mark current thread as (probably) doing synchronous I/O
+ * on handle, if no cancellation is requested yet (and return TRUE),
+ * otherwise clear thread's I/O cancellation flag and return false.
+ */
+static
+boolean io_begin_interruptible(HANDLE handle)
+{
+    /* No point in doing it unless OS supports cancellation from other
+     * threads */
+    if (!ptr_CancelIoEx)
+        return 1;
+
+    if (!__sync_bool_compare_and_swap(&this_thread->synchronous_io_handle_and_flag,
+                                      0, handle)) {
+        ResetEvent(this_thread->private_events.events[0]);
+        this_thread->synchronous_io_handle_and_flag = 0;
+        return 0;
+    }
+    return 1;
+}
+
+/* Unmark current thread as (probably) doing synchronous I/O; if an
+ * I/O cancellation was requested, postpone it until next
+ * io_begin_interruptible */
+static void
+io_end_interruptible(HANDLE handle)
+{
+    if (!ptr_CancelIoEx)
+        return;
+    __sync_bool_compare_and_swap(&this_thread->synchronous_io_handle_and_flag,
+                                 handle, 0);
+}
+
+boolean
+win32_maybe_interrupt_io(void* thread)
+{
+    struct thread *th = thread;
+    boolean done = 0;
+    /* Kludge. (?)
+     *
+     * ICBW about all of this.  But it seems to me that this procedure is
+     * a race condition.  In theory.  One that is hard produce (I can't
+     * come up with a test case that exploits it), and might only be a bug
+     * if users are doing weird things with I/O, possibly from FFI.  But a
+     * race is a race, so shouldn't this function and io_end_interruptible
+     * cooperate more?
+     *
+     * Here's my thinking:
+     *
+     * A.. <interruptee thread>
+     *     ... stuffs its handle into its structure.
+     * B.. <interrupter thread>
+     *     ... calls us to wake the thread, finds the handle.
+     *     But just before we actually call CancelSynchronousIo/CancelIoEx,
+     *     something weird happens in the scheduler and the system is
+     *     so extremely busy that the interrupter doesn't get scheduled
+     *     for a while, giving the interruptee lots of time to continue.
+     * A.. Didn't actually have to block, calls io_end_interruptible (in
+     *     which the handle flag already invalid, but it doesn't care
+     *     about that and still continues).
+     *     ... Proceeds to do unrelated I/O, e.g. goes into FFI code
+     *     (possible, because the CSP page hasn't been armed yet), which
+     *     does I/O from a C library, completely unrelated to SBCL's
+     *     routines.
+     * B.. The scheduler gives us time for the interrupter again.
+     *     We call CancelSynchronousIo/CancelIoEx.
+     * A.. Interruptee gets an expected error in unrelated I/O during FFI.
+     *     Interruptee's C code is unhappy and dies.
+     *
+     * Note that CancelSynchronousIo and CancelIoEx have a rather different
+     * effect here.  In the normal (CancelIoEx) case, we only ever kill
+     * I/O on the file handle in question.  I think we could ask users
+     * to please not both use Lisp streams (unix-read/write) _and_ FFI code
+     * on the same file handle in quick succession.
+     *
+     * CancelSynchronousIo seems more dangerous though.  Here we interrupt
+     * I/O on any other handle, even ones we're not actually responsible for,
+     * because this functions deals with the thread handle, not the file
+     * handle.
+     *
+     * Options:
+     *  - Use mutexes.  Somewhere, somehow.  Presumably one mutex per
+     *    target thread, acquired around win32_maybe_interrupt_io and
+     *    io_end_interruptible.  (That's one mutex use per I/O
+     *    operation, but I can't imagine that compared to our FFI overhead
+     *    that's much of a problem.)
+     *  - In io_end_interruptible, detect that the flag has been
+     *    invalidated, and in that case, do something clever (what?) to
+     *    wait for the imminent gc_stop_the_world, which implicitly tells
+     *    us that win32_maybe_interrupt_io must have exited.  Except if
+     *    some _third_ thread is also beginning to call interrupt-thread
+     *    and wake_thread at the same time...?
+     *  - Revert the whole CancelSynchronousIo business after all.
+     *  - I'm wrong and everything is OK already.
+     */
+    if (ptr_CancelIoEx) {
+        HANDLE h = (HANDLE)
+            InterlockedExchangePointer((volatile LPVOID *)
+                                       &th->synchronous_io_handle_and_flag,
+                                       (LPVOID)INVALID_HANDLE_VALUE);
+        if (h && (h!=INVALID_HANDLE_VALUE)) {
+            if (ptr_CancelSynchronousIo) {
+                pthread_mutex_lock(&th->os_thread->fiber_lock);
+                done = ptr_CancelSynchronousIo(th->os_thread->fiber_group->handle);
+                pthread_mutex_unlock(&th->os_thread->fiber_lock);
+            }
+            return (!!done)|(!!ptr_CancelIoEx(h,NULL));
+        }
+    }
+    return 0;
+}
+
 static const LARGE_INTEGER zero_large_offset = {.QuadPart = 0LL};
 
 int
 static const LARGE_INTEGER zero_large_offset = {.QuadPart = 0LL};
 
 int
@@ -1133,12 +1269,23 @@ win32_unix_write(FDTYPE fd, void * buf, int count)
         overlapped.Offset = 0;
         overlapped.OffsetHigh = 0;
     }
         overlapped.Offset = 0;
         overlapped.OffsetHigh = 0;
     }
+    if (!io_begin_interruptible(handle)) {
+        errno = EINTR;
+        return -1;
+    }
     ok = WriteFile(handle, buf, count, &written_bytes, &overlapped);
     ok = WriteFile(handle, buf, count, &written_bytes, &overlapped);
+    io_end_interruptible(handle);
 
     if (ok) {
         goto done_something;
     } else {
 
     if (ok) {
         goto done_something;
     } else {
-        if (GetLastError()!=ERROR_IO_PENDING) {
+        DWORD errorCode = GetLastError();
+        if (errorCode==ERROR_OPERATION_ABORTED) {
+            GetOverlappedResult(handle,&overlapped,&written_bytes,FALSE);
+            errno = EINTR;
+            return -1;
+        }
+        if (errorCode!=ERROR_IO_PENDING) {
             errno = EIO;
             return -1;
         } else {
             errno = EIO;
             return -1;
         } else {
@@ -1184,13 +1331,8 @@ win32_unix_read(FDTYPE fd, void * buf, int count)
 
     handle = (HANDLE)maybe_get_osfhandle(fd);
 
 
     handle = (HANDLE)maybe_get_osfhandle(fd);
 
-    if (console_handle_p(handle)) {
-        /* 1. Console is a singleton.
-           2. The only way to cancel console handle I/O is to close it.
-        */
     if (console_handle_p(handle))
         return read(fd, buf, count);
     if (console_handle_p(handle))
         return read(fd, buf, count);
-    }
     overlapped.hEvent = self->private_events.events[0];
     /* If it has a position, we won't try overlapped */
     seekable = SetFilePointerEx(handle,
     overlapped.hEvent = self->private_events.events[0];
     /* If it has a position, we won't try overlapped */
     seekable = SetFilePointerEx(handle,
@@ -1204,7 +1346,12 @@ win32_unix_read(FDTYPE fd, void * buf, int count)
         overlapped.Offset = 0;
         overlapped.OffsetHigh = 0;
     }
         overlapped.Offset = 0;
         overlapped.OffsetHigh = 0;
     }
+    if (!io_begin_interruptible(handle)) {
+        errno = EINTR;
+        return -1;
+    }
     ok = ReadFile(handle,buf,count,&read_bytes, &overlapped);
     ok = ReadFile(handle,buf,count,&read_bytes, &overlapped);
+    io_end_interruptible(handle);
     if (ok) {
         /* immediately */
         goto done_something;
     if (ok) {
         /* immediately */
         goto done_something;
@@ -1216,6 +1363,11 @@ win32_unix_read(FDTYPE fd, void * buf, int count)
             read_bytes = 0;
             goto done_something;
         }
             read_bytes = 0;
             goto done_something;
         }
+        if (errorCode==ERROR_OPERATION_ABORTED) {
+            GetOverlappedResult(handle,&overlapped,&read_bytes,FALSE);
+            errno = EINTR;
+            return -1;
+        }
         if (errorCode!=ERROR_IO_PENDING) {
             /* is it some _real_ error? */
             errno = EIO;
         if (errorCode!=ERROR_IO_PENDING) {
             /* is it some _real_ error? */
             errno = EIO;
index 9fca92e..acc8b10 100644 (file)
@@ -83,6 +83,8 @@ char *dirname(char *path);
 
 void os_invalidate_free(os_vm_address_t addr, os_vm_size_t len);
 
 
 void os_invalidate_free(os_vm_address_t addr, os_vm_size_t len);
 
+boolean win32_maybe_interrupt_io(void* thread);
+
 #define bcopy(src,dest,n) memmove(dest,src,n)
 
 struct thread;
 #define bcopy(src,dest,n) memmove(dest,src,n)
 
 struct thread;
index 18835aa..0e3848b 100644 (file)
           (funcall get lock)
           (funcall release lock)
           (assert (eq t (funcall with lock))))))))
           (funcall get lock)
           (funcall release lock)
           (assert (eq t (funcall with lock))))))))
+
+(with-test (:name :interrupt-io-unnamed-pipe)
+  (let (result)
+    (labels
+        ((reader (fd)
+           (let ((stream (sb-sys:make-fd-stream fd
+                                                :element-type :default
+                                                :serve-events nil)))
+             (time
+              (let ((ok (handler-case
+                            (catch 'stop
+                              (progn
+                                (read-char stream)
+                                (sleep 0.1)
+                                (sleep 0.1)
+                                (sleep 0.1)))
+                          (error (c)
+                            c))))
+                (setf result ok)
+                (progn
+                  (format *trace-output* "~&=> ~A~%" ok)
+                  (force-output *trace-output*))))
+             (sleep 2)
+             (ignore-errors (close stream))))
+
+         (writer ()
+           (multiple-value-bind (read write)
+               (sb-unix:unix-pipe)
+             (let* ((reader (sb-thread:make-thread (lambda () (reader read))))
+                    (stream (sb-sys:make-fd-stream write
+                                                   :output t
+                                                   :element-type :default
+                                                   :serve-events nil))
+                    (ok :ok))
+               (sleep 1)
+               (sb-thread:interrupt-thread reader (lambda ()
+                                                    (print :throwing)
+                                                    (force-output)
+                                                    (throw 'stop ok)))
+               (sleep 1)
+               (setf ok :not-ok)
+               (write-char #\x stream)
+               (close stream)
+               (sb-thread:join-thread reader)))))
+      (writer))
+    (assert (eq result :ok))))