bug: Complete event not sent for SSE Subscriptions #1435
Labels
bug
Something isn't working
Comments
This was referenced Mar 30, 2023
@yogeshdengle If you would like a Java implementation and can't upgrade to Spring Boot 3 straight away, you can use this REST controller and remove the DGS SSE package. (FYI, this was my testing code, so it probably has some unnecessary imports, logs, etc.)
package com.example.demo.services;
import com.example.demo.models.GraphqlServerSentEvent;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.netflix.graphql.dgs.DgsQueryExecutor;
import com.netflix.graphql.types.subscription.Error;
import com.netflix.graphql.types.subscription.QueryPayload;
import com.netflix.graphql.types.subscription.SSEDataPayload;
import graphql.ExecutionResult;
import graphql.InvalidSyntaxError;
import graphql.language.Document;
import graphql.language.OperationDefinition;
import graphql.parser.InvalidSyntaxException;
import graphql.parser.Parser;
import graphql.validation.ValidationError;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.http.MediaType;
import org.springframework.http.codec.ServerSentEvent;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.server.ServerErrorException;
import org.springframework.web.server.ServerWebInputException;
import org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;
import reactor.core.scheduler.Schedulers;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.UUID;
import java.util.stream.Collectors;
@RestController
public class DgsSSEControllerOriginal {
DgsQueryExecutor dgsQueryExecutor;
@Value("${dgs.graphql.sse.pollPeriod:12000}")
Long pollPeriod;
private Logger logger = LoggerFactory.getLogger(getClass().getName());
private final ObjectMapper mapper = new ObjectMapper();
private static final String NEXT_EVENT = "next";
private static final String COMPLETE_EVENT = "complete";
@Autowired
public DgsSSEControllerOriginal(DgsQueryExecutor dgsQueryExecutor) {
this.dgsQueryExecutor = dgsQueryExecutor;
}
@PostMapping(path = "${dgs.graphql.sse.path:/subscriptions}", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<ServerSentEvent<String>> handlePost(@RequestBody String body) {
try {
return handleSubscription(body);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
private Flux<ServerSentEvent<String>> handleSubscription(String query) throws IOException {
QueryPayload queryPayload;
try {
queryPayload = mapper.readValue(query, QueryPayload.class);
} catch (Exception ex) {
throw new ServerWebInputException("Error parsing query: " + ex.getMessage());
}
if (!isSubscriptionQuery(queryPayload.getQuery())) {
throw new ServerWebInputException("Invalid query. operation type is not a subscription");
}
ExecutionResult executionResult = dgsQueryExecutor.execute(queryPayload.getQuery(), queryPayload.getVariables());
if (!executionResult.getErrors().isEmpty()) {
String errorMessage;
if (executionResult.getErrors().stream().anyMatch(error -> error instanceof ValidationError || error instanceof InvalidSyntaxError)) {
errorMessage = "Subscription query failed to validate: " + executionResult.getErrors().stream().map(Object::toString).collect(Collectors.joining());
} else {
errorMessage = "Error executing subscription query: " + executionResult.getErrors().stream().map(Object::toString).collect(Collectors.joining());
}
logger.error(errorMessage);
throw new ServerWebInputException(errorMessage);
}
Publisher<ExecutionResult> publisher;
try {
publisher = executionResult.getData();
} catch (ClassCastException exc) {
logger.error("Invalid return type for subscription datafetcher. A subscription datafetcher must return a Publisher<ExecutionResult>. The query was {}", query, exc);
throw new ServerErrorException("Invalid return type for subscription datafetcher. Was a non-subscription query sent to the subscription endpoint?", exc);
}
String subscriptionId = queryPayload.getKey().isEmpty() ? UUID.randomUUID().toString() : queryPayload.getKey();
Flux<ServerSentEvent<String>> resultPublisher = Flux.from(publisher)
.map(it -> {
SSEDataPayload payload = new SSEDataPayload(it.getData(), it.getErrors(), subscriptionId, "SUBSCRIPTION_DATA");
try {
return ServerSentEvent.builder(mapper.writeValueAsString(payload)).id(UUID.randomUUID().toString()).event(NEXT_EVENT).build();
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
})
.onErrorResume(exc -> {
logger.warn("An exception occurred on subscription {}", subscriptionId, exc);
String errorMessage = exc.getMessage() != null ? exc.getMessage() : "An exception occurred";
SSEDataPayload payload = new SSEDataPayload(null, Collections.singletonList(new Error(errorMessage)), subscriptionId, "SUBSCRIPTION_DATA");
try {
return Flux.just(
ServerSentEvent.builder(mapper.writeValueAsString(payload)).id(UUID.randomUUID().toString()).event(NEXT_EVENT).build()
);
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
});
Sinks.Many<ServerSentEvent<String>> sink = Sinks.many().unicast().onBackpressureBuffer();
List<Disposable> disposables = new ArrayList<>();
Disposable dis = resultPublisher.doOnNext(sink::tryEmitNext)
.doFinally(signalType -> {
sink.tryEmitNext(ServerSentEvent.builder("").id(UUID.randomUUID().toString()).event(COMPLETE_EVENT).build());
sink.tryEmitComplete();
})
.subscribeOn(Schedulers.boundedElastic())
.subscribe();
disposables.add(dis);
if (pollPeriod != 0) {
Disposable poller = Flux.interval(Duration.ZERO, Duration.ofMillis(pollPeriod))
.map(l -> {
sink.tryEmitNext(ServerSentEvent.builder("").data(null).comment("").build());
return l;
})
.subscribeOn(Schedulers.boundedElastic())
.subscribe();
disposables.add(poller);
}
return sink.asFlux().doOnNext(it -> logger.info("sending data: {}", it)).doFinally((it) -> disposables.forEach(Disposable::dispose));
}
private boolean isSubscriptionQuery(String query) {
Document document;
try {
document = new Parser().parseDocument(query);
} catch (InvalidSyntaxException exc) {
return false;
}
List<OperationDefinition> definitions = document.getDefinitionsOfType(OperationDefinition.class);
return !definitions.isEmpty() && definitions.stream().allMatch(def -> def.getOperation() == OperationDefinition.Operation.SUBSCRIPTION);
}
}
Thank you. We are planning a Spring Boot 3 upgrade and will use it from there.
Expected behavior
For SSE subscriptions, the server should send a 'complete' event once the event stream finishes.
Currently, once the Publisher passed to the DGS framework as part of the SSE subscription finishes, nothing on the subscription indicates that the server is done sending data.
Actual behavior
The graphql-sse library (https://github.com/enisdenjo/graphql-sse), which is commonly used in the GraphQL ecosystem, expects a 'complete' event to indicate that the server is done sending events. DGS does not send it.
The spec is outlined here:
https://github.com/graphql/graphql-over-http/blob/d51ae80d62b5fd8802a3383793f01bdf306e8290/rfcs/GraphQLOverSSE.md#complete-event-1
Without it, the library retries the subscription because it thinks it was disconnected from the server. The library uses the "complete" event to tell a clean closure of the connection apart from an unintended one.
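To make the retry behavior concrete, here is a minimal sketch of the decision a client has to make when the stream ends. It is not the graphql-sse implementation; the class and method names (`SseCloseCheck`, `closedCleanly`, `shouldRetry`) are hypothetical, and it only looks at the `event:` field of the lines received before the connection closed.

```java
import java.util.List;

public class SseCloseCheck {

    /** Returns true if any received line is an "event:" field naming the "complete" event. */
    public static boolean closedCleanly(List<String> lines) {
        for (String line : lines) {
            if (line.startsWith("event:")) {
                String name = line.substring("event:".length()).trim();
                if (name.equals("complete")) {
                    return true;
                }
            }
        }
        return false;
    }

    /** Assumption: a client retries only when the stream ended without a "complete" event. */
    public static boolean shouldRetry(List<String> lines) {
        return !closedCleanly(lines);
    }

    public static void main(String[] args) {
        // Stream that simply ends after its last "next" event (the behavior reported here).
        List<String> withoutComplete = List.of("event: next", "data: {\"n\":1}", "");
        // Stream that ends with an explicit "complete" event (the expected behavior).
        List<String> withComplete =
                List.of("event: next", "data: {\"n\":1}", "", "event: complete", "data:", "");

        System.out.println(shouldRetry(withoutComplete)); // true
        System.out.println(shouldRetry(withComplete));    // false
    }
}
```

With the first stream the client cannot distinguish a finished publisher from a dropped connection, which is why it reconnects.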
Steps to reproduce
Create a subscription whose publisher emits just 3 events. The expectation is that a "complete" event follows the third event.
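As a rough illustration of that expectation, the sketch below builds the event stream a client would hope to receive for a three-item publisher: three "next" events followed by one "complete" event. The class name `ExpectedSseStream`, the `count` payload field, and the bare `data:` line on the complete event are assumptions for illustration; `id:` fields are omitted for brevity.

```java
import java.util.List;
import java.util.stream.Collectors;

public class ExpectedSseStream {

    /** Frames one SSE event: an "event:" field, a "data:" field, then a blank line. */
    public static String frame(String event, String data) {
        // Assumption: an event without a payload is written with an empty "data:" field.
        String dataLine = data.isEmpty() ? "data:" : "data: " + data;
        return "event: " + event + "\n" + dataLine + "\n\n";
    }

    /** One "next" event per payload, followed by a single "complete" event. */
    public static String expectedStream(List<String> payloads) {
        String nextEvents = payloads.stream()
                .map(payload -> frame("next", payload))
                .collect(Collectors.joining());
        return nextEvents + frame("complete", "");
    }

    public static void main(String[] args) {
        // Hypothetical payloads standing in for three subscription results.
        List<String> payloads = List.of(
                "{\"data\":{\"count\":1}}",
                "{\"data\":{\"count\":2}}",
                "{\"data\":{\"count\":3}}");
        System.out.print(expectedStream(payloads));
    }
}
```

Comparing this against the raw response of the subscription endpoint shows the gap: the three "next" events arrive, but the final "complete" event does not.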