
async invocations #16

Open · lispyclouds opened this issue Jan 20, 2020 · 12 comments
Labels: enhancement (New feature or request)

Comments

@lispyclouds
Collaborator

It would be nice to have an async version of invoke. It's possible now by returning the response as a stream, but it could be styled after aws-api to return a channel.
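For illustration, a rough sketch of what that aws-api style could look like, assuming a hypothetical invoke-async that returns a core.async channel delivering the response (no such fn exists in the lib yet):

(require '[clojure.core.async :as async]
         '[clojure.pprint :as pprint])

;; invoke-async is hypothetical here; it only illustrates the aws-api
;; style of returning a core.async channel instead of blocking.
(let [res-chan (invoke-async images {:op     :ImageList
                                     :params {:digests true}})]
  (async/go
    (pprint/pprint (async/<! res-chan))))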

@lispyclouds added the enhancement (New feature or request) label on Jan 20, 2020
@lispyclouds changed the title from "aysnc invocations" to "async invocations" on Jan 20, 2020
@bharendt
Contributor

We also need an async version of invoke, mainly to be able to cancel a stale or long-running request. I added it in this branch and could also add the ability to pass a channel. Currently it is possible to pass a function which is called with the response:

(invoke images {:op :ImageList
                :async #(clojure.pprint/pprint %)
                :params {:digests true}})

and in the async case invoke returns an okhttp3.Call object which can be cancelled:

(let [call (docker/invoke events {:op    :SystemEvents
                                  :async #(try
                                           (stream-response ^InputStream %)
                                           (catch IOException e :request-canceled))                                             
                                  :as    :stream})]
  (Thread/sleep 3000)
  (.cancel call))

It would also be easy to allow channels and to push just the response (or stream) onto the channel, but it might be even better to push the data itself when it is requested as a stream. This would remove the need to always add one's own "read stream" implementation, and it would allow passing a transducer to the channel which already decodes the data, e.g. parses the frame headers of an attached output stream or the JSON events of a docker event stream, e.g.:

(let [chan (async/chan buf-size (clj-docker-client.core/output-stream-transducer))]
  (invoke containers {:op :ContainerAttach
                      :async chan
                      :params {:id container-id
                               :stream true
                               :stdout true
                               :stderr true}})
  (while ...
    (let [{:keys [stdout-type stdout-data]} (async/<!! chan)])))

This would require passing the channel as an option instead of returning it, and it would still allow cancelling the request at any time.

@lispyclouds
Collaborator Author

Thanks a lot for looking into this @bharendt , much appreciated! 😄

This looks pretty neat. I had one small suggestion to make it a bit more explicit by calling the param async-fn. Just async feels more like a flag to me. What do you think?

In the second case of passing the data to the channel, I was having doubts about how we should chunk the data. Should we do a readLine or read raw bytes, or something else? Hence I was preferring to ask for the reader impl. But yes, I'm open to ideas! 😄

@lispyclouds
Collaborator Author

Also, in case of :ContainerAttach the invoke would be :as :socket. This would not work with the chan approach, right? As we have to expose the raw hijacked socket outside to allow for external bidirectional I/O.

@bharendt
Contributor

@lispyclouds Thanks for your feedback. Being more explicit with the option sounds reasonable. Should we then allow async-fn and async-chan as optional arguments?

How to chunk the data when passing it directly to the channel depends on the kind of stream. This would be handled by the specific transducer, which you would need to set explicitly on the channel that you pass. E.g. for an attached output stream, the header contains the length of the frame, so this transducer would read the header, then read up to that length and pass it to the channel. In case of an event stream, the events are separated by newlines, so the transducer for an event-stream channel would read up to a newline and then maybe pass the decoded JSON event. If no transducer is set on the channel, we would just pass as much as a .read returns to the channel.
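For illustration, a rough sketch of that frame reading, assuming Docker's attach framing (an 8-byte header whose first byte is the stream type and whose last four bytes are the big-endian payload length); read-frame is hypothetical and not part of clj-docker-client:

(import '[java.io DataInputStream EOFException InputStream]
        '[java.nio ByteBuffer])

;; Sketch: read one frame from a multiplexed attach stream.
;; Returns {:stream-type ... :data ...} or nil at end of stream.
(defn read-frame
  [^InputStream in]
  (let [din    (DataInputStream. in)
        header (byte-array 8)]
    (try
      (.readFully din header 0 8)
      (let [stream-type (get {0 :stdin 1 :stdout 2 :stderr}
                             (long (aget header 0))
                             :unknown)
            len         (.getInt (ByteBuffer/wrap header 4 4))
            payload     (byte-array len)]
        (.readFully din payload 0 len)
        {:stream-type stream-type
         :data        (String. payload "UTF-8")})
      (catch EOFException _ nil))))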

If you pass :as :socket to a :ContainerAttach request, this socket would be passed as the argument to async-fn, or pushed onto the channel if we add that. Encoding/decoding using a transducer would not be possible in that case.

@lispyclouds
Collaborator Author

async-fn and async-chan seem good to me!

When a :socket is requested, does it make sense to have it as an async call? I'm trying to think of a case where this would be valid, as the control is completely with the consumer now. The exposed socket itself becomes the chan, right?

@lispyclouds
Collaborator Author

lispyclouds commented Feb 26, 2020

I'm totally on board with the transducer flow btw 😁

@bharendt
Contributor

I'm totally on board with the transducer flow btw

👍 I think it would simplify access to the API quite a lot, because the (default) decoding (and even reading) could then already be provided by clj-docker-client through some default transducers, maybe also for synchronous calls.

When a :socket is requested, does it make sense to have it as an async call?

I think it would make sense. The (synchronous) call (which returns the socket) could block, e.g. because of a network delay, and that blocking could block or overload the go thread pool during that time (if it is called from there). Default network timeouts are very high.

The exposed socket itself becomes the chan right?

No, the socket would then be passed/pushed as a value onto the channel, similar to passing it as an argument to async-fn. This would still allow writing to that socket once the actual request has finished, without blocking until then.
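For illustration, a sketch of that flow, assuming the :async-chan option discussed above gets added (the option name and the java.net.Socket type are assumptions here):

(require '[clojure.core.async :as async])

;; :async-chan is the proposed, not yet existing, option.
(let [chan (async/chan 1)]
  (invoke containers {:op         :ContainerAttach
                      :params     {:id     container-id
                                   :stream true
                                   :stdin  true
                                   :stdout true}
                      :as         :socket
                      :async-chan chan})
  ;; take the socket off the channel once the request has finished
  (when-let [^java.net.Socket socket (async/<!! chan)]
    ;; the hijacked socket is now free for bidirectional I/O,
    ;; e.g. writing to the container's stdin
    (.write (.getOutputStream socket) (.getBytes "ls\n"))))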

@lispyclouds
Collaborator Author

lispyclouds commented Feb 26, 2020

Seems good to me.

About the default transducers, I'd prefer to have them as examples in the docs, like log streaming, rather than provide some utils which people may use. That way the control/opinions stay external and this lib stays purpose-built. What do you say?
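For example, a docs-style log-streaming sketch where the reader loop stays with the consumer (op and params here follow the Docker Engine API's ContainerLogs operation; treat the exact keys as an assumption):

(require '[clojure.java.io :as io])

;; Read the :as :stream response line by line; the loop is the
;; consumer's own code, not a util provided by the lib.
(let [log-stream (invoke containers {:op     :ContainerLogs
                                     :params {:id     container-id
                                              :follow true
                                              :stdout true}
                                     :as     :stream})]
  (with-open [rdr (io/reader log-stream)]
    (doseq [line (line-seq rdr)]
      (println line))))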

Yes, for the socket I meant it behaves like a chan.

I assume there's an incoming PR for this 😄; when that's in, I'll spend some good time documenting this behaviour.

@lispyclouds
Collaborator Author

lispyclouds commented Feb 26, 2020

Also, the thing to note is that when a socket is exposed, the connection is still technically open after control has returned to the caller.

@bharendt
Contributor

True, the same connection can't be used for other calls during that time. But this also applies to long-running API calls, like streaming docker events. Maybe it is an option to use a connection pool which automatically uses a new connection if the current one is still busy?

@lispyclouds
Collaborator Author

Yeah, that needs some thought on our side. It's a limitation of using UNIX sockets as a transport. See #20
Some hammock time thinking about data multiplexing may be needed here, as OkHttp is kind of tricked into thinking this is a TCP socket where this could work.

@mikroskeem

I'm personally wrapping everything into manifold deferreds/streams currently for convenience; maybe we could consider using that?

Many have problems with using non-"standard" library stuff, however. Using core.async would already make things a lot more convenient 😄
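For illustration, a sketch of that wrapping, assuming the manifold library is available (d/future runs the blocking invoke on another thread and returns a deferred):

(require '[manifold.deferred :as d])

;; Wrap a blocking invoke in a deferred and chain a callback onto the result.
(def image-list-d
  (d/future
    (invoke images {:op :ImageList})))

(d/chain image-list-d
         (fn [image-list]
           (println "got" (count image-list) "images")))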
