Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Subscription support #294

Open
wants to merge 32 commits into
base: main
Choose a base branch
from

Conversation

matthieu4294967296moineau
Copy link

@matthieu4294967296moineau matthieu4294967296moineau commented Sep 4, 2023

Added support for graphQL subscriptions.

  • Using graphql.NewClientUsingWebSocket(), the returned graphql.WebSocketClient will be able to subscribe to graphQL endpoints.
  • Implementation does not depend on a specific websocket package, but it is similar to github.com/gorilla/websocket (this package is also used to create a client in the tests).

I have:

  • Written a clear PR title and description (above)
  • Signed the Khan Academy CLA
  • Added tests covering my changes, if applicable
  • Included a link to the issue fixed, if applicable
  • Included documentation, for new features
  • Added an entry to the changelog

@StevenACoffman
Copy link
Member

Hi! In addition to your CEO, you also need to sign the CLA as yourself, since you are the one contributing the code. Thanks!

@matthieu4294967296moineau
Copy link
Author

Hi! In addition to your CEO, you also need to sign the CLA as yourself, since you are the one contributing the code. Thanks!

Done !

Copy link
Collaborator

@benjaminjkraft benjaminjkraft left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wow, thanks so much for adding this! On the whole it looks great, I think almost everything is integrated in the right places and in particular I'm very happy with how minimal the changes to generate are, which I hope will keep this very maintainable.

I made a bunch of small comments and things to tweak, including some probably-foolish questions since I haven't worked with GraphQL subscriptions much. I didn't really get a chance to review the core websocket logic yet, but I figured I'd post everything else now. If anyone else has more experience with websockets a review of those parts would be very helpful!

One other thing is I think this will need some documentation, for now the best place is probably to add something in FAQ.md as well as of course under features in the changelog!

@@ -1,7 +1,14 @@
// The query or mutation executed by {{.Name}}.
// The query, mutation or subscription executed by {{.Name}}.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// The query, mutation or subscription executed by {{.Name}}.
// The {{.Type}} executed by {{.Name}}.

(not new, but may as well improve)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

Comment on lines 5 to 11
{{if eq .Type "subscription"}}
{{if eq .Doc ""}}
// {{.Name}}
{{end -}}
//
// To close the connection, use [graphql.Client.CloseWebSocket()]
{{end -}}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we do this in Go? I feel it gets confusing when we mix both.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

@@ -15,7 +22,7 @@ func {{.Name}}(
{{.GraphQLName}} {{.GoType.Reference}},
{{end -}}
{{end -}}
) (*{{.ResponseName}}, {{if .Config.Extensions -}}map[string]interface{},{{end}} error) {
) ({{if eq .Type "subscription"}}dataChan_ chan {{.Name}}WsResponse, errChan_ chan error,{{else}}data_ *{{.ResponseName}}, {{if .Config.Extensions -}}ext_ map[string]interface{},{{end}}{{end}} err error) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
) ({{if eq .Type "subscription"}}dataChan_ chan {{.Name}}WsResponse, errChan_ chan error,{{else}}data_ *{{.ResponseName}}, {{if .Config.Extensions -}}ext_ map[string]interface{},{{end}}{{end}} err error) {
) ({{if eq .Type "subscription"}}dataChan_ chan {{.Name}}WsResponse, errChan_ chan error,{{else}}data_ *{{.ResponseName}}, {{if .Config.Extensions -}}ext_ map[string]interface{},{{end}}{{end}} err_ error) {

(marginally safer, in case someone has a var named err, I think, although I much hope they don't)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

Comment on lines 47 to 48
dataChan_ = make(chan {{.Name}}WsResponse, 1)
respChan_ := make(chan json.RawMessage, 1)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is buffer of 1 here standard? are we gonna need to configure it?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I used 99designs code as a model for the buffered channel.
I am not sure whether this is standard or if it is their implementation.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, after having read more of this, I think we shouldn't have buffers in any of the channels unless there's a clear reason. It's not clear to me whether it's the caller's obligation to call ReadMessage "regularly" (i.e. is it okay for an application blocked elsewhere to just not even try to read the next message for a while) -- but a buffer of 1 doesn't really help with that; the only way is for the caller to actually read from their channel regularly. Having a buffer just seems likely to make concurrency bugs harder to find, because there's a bit of extra slop in the system.

This also interacts with having multiple subscriptions on one connection. If you do, you have to be good about reading from both, since a pending message on one will block the other. A buffer of 1 papers over that for 2 subscriptions, but again, that just makes the bug harder to find. (Well, I guess it all depends if the buffer is per-subscription or per-connection.)

(BTW, the caller can configure the gorilla/websocket client's internal buffer size, as well as of course spawn their own goroutine to buffer the output from dataChan. It wouldn't surprise me if it's eventually useful for us to do the latter, but I'd prefer to have some use cases before we do, because it's not clear to me whether the buffer should go on respChan or dataChan and errChan. I think it's just respChan? But this gets back to the per-connection or per-subscription question.)

Errors error `json:"errors"`
}

func {{.Name}}ForwardData(dataChan_ chan {{.Name}}WsResponse, respChan_ chan json.RawMessage, errChan_ chan error) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't this take in and check the ctx? Or does the websocket client do that and then pass down to the errChan_?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes the webSocket client sends errors on the errChan_

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, poking at gorilla/websocket I think it doesn't check the ctx. That means we need to -- something like the suggestions in this issue: gorilla/websocket#733

@@ -126,3 +136,24 @@ func newRoundtripGetClient(t *testing.T, endpoint string) graphql.Client {
t: t,
}
}

type MyDialer struct {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thinking out loud: would it be useful to test against multiple clients here? e.g. x/net/websocket or nhooyr.io/websocket or whatever is up there on grank.io. Not a big deal if there are issues but would help prove out if the interface can actually support different clients.

(If x/net/websocket works out, we could also use that in the example (with a comment suggesting others) and keep the main go.mod a bit smaller.)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tested with x/net/websocket and it works, I will add to the doc both ways: with x/net/websocket and with gorilla/websocket.
The package gorilla/websocket was in the go.mod because it was the client I used for the tests.
But now I can change and use x/net/websocket in the tests if you prefer, even if it adds golang.org/x/net as direct dependency, and golang.org/x/sys as indirect.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, examples are enough!

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you add this to example_test.go to make sure it stays up to date?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will replace the example with documentation, so no need for this now.

)

// Close codes defined in RFC 6455, section 11.7.
const (
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can these, and below, be private? since hopefully the caller gets them from their client anyway.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

Comment on lines +12 to +19
webSocketMethod = "websocket"
webSocketTypeConnInit = "connection_init"
webSocketTypeConnAck = "connection_ack"
webSocketTypeSubscribe = "subscribe"
webSocketTypeNext = "next"
webSocketTypeError = "error"
webSocketTypeComplete = "complete"
websocketConnAckTimeOut = time.Second * 30
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

silly question, are these GraphQL things or websocket things or what? is there some spec or documentation we can link to that lists them? (are they things a client might need to configure in the future?)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

        webSocketMethod         = "websocket"

this is just an identifier I chose to identify the websocket method from the http.MethodGet and http.MethodPost

	webSocketTypeConnInit   = "connection_init"
	webSocketTypeConnAck    = "connection_ack"
	webSocketTypeSubscribe  = "subscribe"
	webSocketTypeNext       = "next"
	webSocketTypeError      = "error"
	webSocketTypeComplete   = "complete"

these are graphQL constants, they are defined here

        websocketConnAckTimeOut = time.Second * 30

this is arbitrary, it could be added to a webSocket client configuration

Comment on lines 32 to 45
key := os.Getenv("GITHUB_TOKEN")
if key == "" {
err = fmt.Errorf("must set GITHUB_TOKEN=<github token>")
return
}

dialer := websocket.DefaultDialer
headers := http.Header{}
headers.Add("Authorization", "bearer "+key)

graphqlClient := graphql.NewClientUsingWebSocket(
"wss://api.github.com/graphql",
&MyDialer{Dialer: dialer},
headers,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wait, does the real GitHub API even have subscriptions? If not, I don't think this will work, right? We could either include a gqlgen server with the example, or just skip the full example in the repo entirely and provide the relevant snippets in the documentation. (Eventually maybe we should have many examples each with their own gqlgen server.)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added the example thinking we could add a subscription on the server side.
But for now I will just replace it with documentation.

Copy link
Author

@matthieu4294967296moineau matthieu4294967296moineau left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I answered to all the comments so far.
Either I directly pushed the fix/update, or I answered the comment when not sure what decision should be made.

What do you think of the way errors are returned ?
It might be a bit confusing to have 3 possibilities to receive errors:

  • respChan: where api errors are returned in the response
  • errChan: where webSocket errors are sent
  • err: initialization error

Copy link
Collaborator

@benjaminjkraft benjaminjkraft left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for all the replies! I am learning so much about websockets and graphql-ws, and now I have a bunch more questions (sorry!).

The most important question, I realized -- shouldn't the client open a single websocket for all subscriptions rather than one per subscription? I don't know how many subscriptions the average client will have but whenever it's more than one I bet the server prefers that. It looks like that's what graphql-ws (used by apollo-client and thus the defacto standard) does.

Two ways to do this, I guess: we could do it at init (or as a separate method the caller is supposed to call first), or do it lazily. Probably lazily is less surprising? (Again that's what graphql-ws does.) Either way, we'll need to keep some plumbing to direct the right messages to the right channels based on their IDs. And we'll probably need separate close-subscription and close-client methods. (Or do we need close-client? I feel like http clients don't tend to come with those, even when they keep a connection pool. (Ok, looks like http.Transport has a CloseIdleConnections, which is semi-equivalent. Maybe a good idea to be safe.)

Or, we don't necessarily need to implement that now. But at a bare minimum I would like to make sure all the API we're adding has what we need. Maybe it mostly already does, but at least we should rename it to Subscribe since it won't necessarily dial, and figure out the semantics of the close methods.

.golangci.yml Outdated
@@ -49,6 +49,7 @@ linters-settings:
- golang.org/x/tools
- gopkg.in/yaml.v2
- github.com/alexflint/go-arg
- github.com/gorilla/websocket
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great! In that case I think you can revert the changes to this file

@@ -26,6 +26,7 @@ When releasing a new version:

- The new `optional: generic` allows using a generic type to represent optionality. See the [documentation](genqlient.yaml) for details.
- For schemas with enum values that differ only in casing, it's now possible to disable smart-casing in genqlient.yaml; see the [documentation](genqlient.yaml) for `casing` for details.
- genqlient now supports subscriptions
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add a link to the docs?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

var gqlResp graphql.Response
var wsResp {{.Name}}WsResponse
for {
jsonRaw, more_ := <-respChan_
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

doesn't really matter but no need for the underscores here (since no argument/variable names are user-controlled)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

if headers == nil {
headers = http.Header{}
}
headers.Add("Sec-WebSocket-Protocol", "graphql-transport-ws")
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's mention that this is the graphql-transport-ws protocol in the docs + changelog, just in case someone is still using one of the other less common ones.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As suggested here,
I replaced this with

if headers.Get("Sec-WebSocket-Protocol") == "" {
        headers.Add("Sec-WebSocket-Protocol", "graphql-transport-ws")
}

so we can define the protocol in the headers.

I added this to the doc + changelog.

@@ -126,3 +136,24 @@ func newRoundtripGetClient(t *testing.T, endpoint string) graphql.Client {
t: t,
}
}

type MyDialer struct {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, examples are enough!


// formatCloseMessage formats closeCode and text as a WebSocket close message.
// An empty message is returned for code CloseNoStatusReceived.
func formatCloseMessage(closeCode int, text string) []byte {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like text is unused? When would it be used? (Not a big deal either way since it's private.)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It can be used to send any custom message when closing the connection. But yes, for now, it is unused.
Tell me if you want it removed.

Comment on lines 47 to 48
dataChan_ = make(chan {{.Name}}WsResponse, 1)
respChan_ := make(chan json.RawMessage, 1)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, after having read more of this, I think we shouldn't have buffers in any of the channels unless there's a clear reason. It's not clear to me whether it's the caller's obligation to call ReadMessage "regularly" (i.e. is it okay for an application blocked elsewhere to just not even try to read the next message for a while) -- but a buffer of 1 doesn't really help with that; the only way is for the caller to actually read from their channel regularly. Having a buffer just seems likely to make concurrency bugs harder to find, because there's a bit of extra slop in the system.

This also interacts with having multiple subscriptions on one connection. If you do, you have to be good about reading from both, since a pending message on one will block the other. A buffer of 1 papers over that for 2 subscriptions, but again, that just makes the bug harder to find. (Well, I guess it all depends if the buffer is per-subscription or per-connection.)

(BTW, the caller can configure the gorilla/websocket client's internal buffer size, as well as of course spawn their own goroutine to buffer the output from dataChan. It wouldn't surprise me if it's eventually useful for us to do the latter, but I'd prefer to have some use cases before we do, because it's not clear to me whether the buffer should go on respChan or dataChan and errChan. I think it's just respChan? But this gets back to the per-connection or per-subscription question.)

Errors error `json:"errors"`
}

func {{.Name}}ForwardData(dataChan_ chan {{.Name}}WsResponse, respChan_ chan json.RawMessage, errChan_ chan error) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, poking at gorilla/websocket I think it doesn't check the ctx. That means we need to -- something like the suggestions in this issue: gorilla/websocket#733

return err
}

go w.listenWebSocket(respChan)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So I guess we end up with two goroutines per subscription, one in the graphql.Client to wait on the websocket client and forward to respChan, and then one from the generated code that waits on respChan and forwards to dataChan and errChan. Can we avoid that? (Or at least, can we set up the interface so it's possible to avoid in the future, even if it's not worth the effort now?)

Comment on lines 50 to 52
connInit := webSocketSendMessage{
Type: webSocketTypeConnInit,
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So in our case we need to provide bearer token in connection_init message:

{
  "type": "connection_init",
  "payload": {
    "Authorization": "Bearer token"
  }
}

Would it be possible to provide extension point for this ?

if headers == nil {
headers = http.Header{}
}
headers.Add("Sec-WebSocket-Protocol", "graphql-transport-ws")
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you need to force graphql-transport-ws sub-protocol? We are using graphql-ws in our case. Had to override this header in DialContext to make it work.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

change made here

@matthieu4294967296moineau
Copy link
Author

According to your comments, I modified the code so that it is now possible to have several subscriptions using the same webSocket connection.
There is also now only 1 go routine that runs to listen the webSocket messages instead of 2 before.
I also made the minor changes listed in the previous comments.

@StevenACoffman StevenACoffman removed their request for review October 12, 2023 18:39
Copy link
Collaborator

@benjaminjkraft benjaminjkraft left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for all the updates, and sorry for the huge delay in reviewing here! Finding the time to review bigger PRs has been tricky for me.

I think this is quite close -- most of my comments are on documentation plus a couple race conditions that are probably easy-enough to fix. (I'm not super worried about getting the implementation totally perfect there as long as the interface is workable.)

One high-level thing to look at and document in the interface is: who is supposed to close the data/error channels? Ideally we can avoid any race conditions like the unsubscribe issue, since pushing those things to callers gets pretty annoying.

@@ -91,7 +92,7 @@ issues:
# Ok to use fmt.Print in the examples, and in the CLI entrypoint.
- linters:
- forbidigo
path: ^example/|^generate/main\.go$
path: ^example/|^exampleSubscription/|^generate/main\.go$
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you can revert these now

@@ -26,6 +26,8 @@ When releasing a new version:

- The new `optional: generic` allows using a generic type to represent optionality. See the [documentation](genqlient.yaml) for details.
- For schemas with enum values that differ only in casing, it's now possible to disable smart-casing in genqlient.yaml; see the [documentation](genqlient.yaml) for `casing` for details.
- genqlient now supports subscriptions; the websocket protocol is by default `graphql-transport-ws` but can be set to another value.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You could put this at the top if you want :-)

Comment on lines +65 to +70
err := json.Unmarshal(jsonRawMsg, &gqlResp)
if err != nil {
return err
}
if len(gqlResp.Errors) == 0 {
err = json.Unmarshal(jsonRawMsg, &wsResp)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need the double-unmarshal here? If we just need to "peek" at the errors, we shouldn't unmarshal into an (inline?) struct with just errors, but even that requires parsing the entire JSON. Can we unmarshal straight into wsResp and just look at errors first, or does that cause some other trouble?

Comment on lines +363 to +365
if err != nil {
w.errChan <- err
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This feels like it should return the error rather than sending on some channel which the caller may no longer be listening on. Or is there a reason not to do it that way?

Comment on lines +49 to +50
// errChan is a channel on which are sent the errors of webSocket
// communication.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is only protocol-level errors, which we can't send to specific subscriptions because we don't even know which they go with, right?

Comment on lines +64 to +65
// interfaceChan is a channel used to send the data that arrives via the
// webSocket connection.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you document what type this would be? (pseudo-types or plain english is fine)

Edit: I guess really the answer is, it's the thing you pass to the forwardDataFunc, so you can just say that.

Comment on lines +53 to +57
StartWebSocket(ctx context.Context) (errChan chan error, err error)

// CloseWebSocket must close the webSocket connection. If no connection was
// started, CloseWebSocket is a no-op
CloseWebSocket()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we could probably just call these Start and Close -- latter is semi-standard too

type client struct {
httpClient Doer
endpoint string
method string
}

type webSocketClient struct {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: probably clearer to move this + methods below to websocket.go (you can keep the interfaces here or not, either way)

if err != nil {
return err
}
w.subscriptions.Delete(subscriptionID)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Then conversely here, if we receive more messages for this sub that were already inflight when we sent the complete, we'll drop them. I think probably we want to keep the subscription ID around (maybe clean up the channel and just no-op). Then we can just discard those instead of erroring.

(Or, we could discard without error on unknown subscription ID. I don't like the idea of dropping errors like that but it's hard to know what the caller would do with them; at best they're useful to find a bug in genqlient or in the server. But probably better to just make that unlikely; we can always make the error Is-able if it becomes an issue with some servers.)

if err != nil {
return "", err
}
w.subscriptions.Create(subscriptionID, interfaceChan, forwardDataFunc)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can end up in trouble (unknown subscription ID error + dropped message) if the first message from the websocket comes in before we add the subscription to the map. I think it suffices to add the subscription to the map first (then remove it on error).

@HaraldNordgren
Copy link

HaraldNordgren commented Apr 12, 2024

@matthieu4294967296moineau Any updates here? It would be great to have this available! 🤗

@its-a-feature
Copy link

Any progress here? This seems super cool and I love the functionality so far, but I need subscriptions to be possible before I can fully use it :(

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

6 participants