Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

InputStream as passed a body gets closed by different thread when client closes connection prematurely #535

Open
mikroskeem opened this issue Feb 16, 2020 · 3 comments

Comments

@mikroskeem
Copy link

mikroskeem commented Feb 16, 2020

Reproduction is pretty simple:

  1. Get yourself an InputStream (in my case, one from Minio SDK when getting an object from the storage - which originates from OkHttp)
  2. Pass it into response body {:body my-is}
  3. Make client close connection prematurely (uh-oh, binary file being downloaded with curl into the terminal - by default curl does not allow that)
  4. Aleph closes InputStream in different thread

I run into following exception (OkHttp, or rather okio which backs said stream does not support that use-case):

{host 127.0.0.1:7000, user-agent curl/7.68.0, accept */*}
read3		Thread[manifold-wait-6,5,main]
available	Thread[manifold-wait-6,5,main]
read3		Thread[manifold-wait-6,5,main]
available	Thread[manifold-wait-6,5,main]
read3		Thread[manifold-wait-6,5,main]
read3		Thread[manifold-wait-6,5,main]
available	Thread[manifold-wait-6,5,main]
read3		Thread[manifold-wait-6,5,main]
close		Thread[aleph-netty-server-event-pool-23,5,main]
Feb 16, 2020 8:45:12 PM manifold.utils invoke
SEVERE: error in invoke-callbacks
java.lang.IllegalStateException: Unbalanced enter/exit
	at okio.AsyncTimeout.enter(AsyncTimeout.java:73)
	at okio.AsyncTimeout$2.read(AsyncTimeout.java:235)
	at okio.RealBufferedSource.read(RealBufferedSource.java:51)
	at okhttp3.internal.http1.Http1Codec$AbstractSource.read(Http1Codec.java:374)
	at okhttp3.internal.http1.Http1Codec$FixedLengthSource.read(Http1Codec.java:418)
	at okhttp3.internal.Util.skipAll(Util.java:204)
	at okhttp3.internal.Util.discard(Util.java:186)
	at okhttp3.internal.http1.Http1Codec$FixedLengthSource.close(Http1Codec.java:435)
	at okio.RealBufferedSource.close(RealBufferedSource.java:476)
	at okio.RealBufferedSource$1.close(RealBufferedSource.java:460)
	at java.base/jdk.internal.reflect.GeneratedMethodAccessor14.invoke(Unknown Source)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at clojure.lang.Reflector.invokeMatchingMethod(Reflector.java:167)
	at clojure.lang.Reflector.invokeNoArgInstanceMember(Reflector.java:438)
	at cu.resourcepack_server.core$fn__28582$fn__28595$fn__28605.invoke(form-init15695820198716781853.clj:36)
	at cu.resourcepack_server.core.proxy$java.io.InputStream$ff19274a.close(Unknown Source)
	at aleph.http.core$send_streaming_body$fn__15872.invoke(core.clj:324)
	at manifold.utils$invoke_callbacks$fn__1143.invoke(utils.clj:69)
	at manifold.utils$invoke_callbacks.invokeStatic(utils.clj:68)
	at manifold.utils$invoke_callbacks.invoke(utils.clj:65)
	at aleph.netty.ChannelSink.markClosed(netty.clj:344)
	at aleph.netty.ChannelSink.close(netty.clj:352)
	at aleph.netty$sink$fn__15376.invoke(netty.clj:407)
	at manifold.deferred$eval1788$chain_SINGLEQUOTE____1809.invoke(deferred.clj:749)
	at manifold.deferred$eval1788$subscribe__1789$fn__1794.invoke(deferred.clj:715)
	at manifold.deferred.Listener.onSuccess(deferred.clj:219)
	at manifold.deferred.Deferred$fn__1634.invoke(deferred.clj:398)
	at manifold.deferred.Deferred.success(deferred.clj:398)
	at manifold.deferred$success_BANG_.invokeStatic(deferred.clj:243)
	at manifold.deferred$success_BANG_.invoke(deferred.clj:240)
	at aleph.netty$wrap_future$reify__15320.operationComplete(netty.clj:218)
	at io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:511)
	at io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:504)
	at io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:483)
	at io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:424)
	at io.netty.util.concurrent.DefaultPromise.trySuccess(DefaultPromise.java:103)
	at io.netty.channel.DefaultChannelPromise.trySuccess(DefaultChannelPromise.java:84)
	at io.netty.channel.AbstractChannel$CloseFuture.setClosed(AbstractChannel.java:1152)
	at io.netty.channel.AbstractChannel$AbstractUnsafe.doClose0(AbstractChannel.java:768)
	at io.netty.channel.AbstractChannel$AbstractUnsafe.close(AbstractChannel.java:744)
	at io.netty.channel.AbstractChannel$AbstractUnsafe.close(AbstractChannel.java:615)
	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.closeOnRead(AbstractNioByteChannel.java:105)
	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:171)
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:677)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:612)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:529)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:491)
	at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:905)
	at manifold.executor$thread_factory$reify__1009$f__1010.invoke(executor.clj:47)
	at clojure.lang.AFn.run(AFn.java:22)
	at java.base/java.lang.Thread.run(Thread.java:834)

available	Thread[manifold-wait-6,5,main]
read3		Thread[manifold-wait-6,5,main]
close		Thread[manifold-wait-6,5,main]
close		Thread[manifold-wait-6,5,main]

Body + proxy class which delegates to the original stream (for debugging):

           {:status 200
            :body (proxy [java.io.InputStream] []
                    (read
                      ([]
                       (a/put! log-ch (str "read0\t\t" (Thread/currentThread)))
                       (.read stream))
                      ([^bytes b]
                       (a/put! log-ch (str "read1\t\t" (Thread/currentThread)))
                       (.read stream b))
                      ([^bytes b off len]
                       (a/put! log-ch (str "read3\t\t" (Thread/currentThread)))
                       (.read stream b off len)))
                    (available []
                      (a/put! log-ch (str "available\t" (Thread/currentThread)))
                      (.available stream))                    
                    (skip [^long l]
                      (a/put! log-ch (str "skip\t\t" (Thread/currentThread)))
                      (.skip stream l))
                    (close []
                      (a/put! log-ch (str "close\t\t" (Thread/currentThread)))
                      (.close stream)))
            :headers {"Content-Type" "application/zip"}}

Also simple logging snippet which works ok with the test scenario:

(def log-ch
  (a/chan))

(a/go-loop [m (a/<! log-ch)]
  (when m
    (println m)
    (recur (a/<! log-ch))))

I think that this is not right

@kachayev
Copy link
Collaborator

Similar issue was described in #454. It's unclear if Netty provides a good way of dealing with the situation like this, at least I didn't find any (the issue in Netty repo is open since 2017).

@mikroskeem
Copy link
Author

So it starts from the Netty... bummer.

@kachayev
Copy link
Collaborator

There are multiple things happening here. As far as InputStream has inherently blocking API, the code that fills-in data from network and the code that reads it have to run on different threads. I think an exception thrown on async thread pool could be captured and propagated back to InputStream thread to be re-thrown there... but it would require some jiggling. I'm testing a few improvements to error handling, will see if I can cover this specific use case properly. Thanks for the report!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants