Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

bug: Complete event not sent for SSE Subscriptions #1435

Open
yogeshdengle opened this issue Feb 23, 2023 · 2 comments
Open

bug: Complete event not sent for SSE Subscriptions #1435

yogeshdengle opened this issue Feb 23, 2023 · 2 comments
Labels
bug Something isn't working

Comments

@yogeshdengle
Copy link

yogeshdengle commented Feb 23, 2023

Expected behavior

For SSE Subscriptions on the complete of the events the server sends a 'complete' event.
Currently once the Publisher sent to the DGS framework as part of the SSE Subscription finishes there is no indication on the subscription that the server is done sending the data.

Actual behavior

the graphql-sse library (https://github.com/enisdenjo/graphql-sse) which is very commonly used in the graphql ecosystem expects a 'complete' event to be sent to indicate that the server is done sending events.
The spec is outlined here:
https://github.com/graphql/graphql-over-http/blob/d51ae80d62b5fd8802a3383793f01bdf306e8290/rfcs/GraphQLOverSSE.md#complete-event-1

This causes the library to retry the subscription as it think it disconnected from the server. The "complete" event is used by the library to understand that this is a clean closure of the connection and not something unintended.

Steps to reproduce

Create a subscription that has a publisher that just sends out 3 events. Expectation is that after the 3 events we get a "complete" event.

@Ancient-Dragon
Copy link
Contributor

Ancient-Dragon commented Apr 13, 2023

@yogeshdengle if you would like a java implementation (and can't upgrade to spring-boot 3 straight away you can just use this rest controller and remove the dgs sse package. (FYI this was my testing code so probably has some unnecessary imports and logs etc)

package com.example.demo.services;

import com.example.demo.models.GraphqlServerSentEvent;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.netflix.graphql.dgs.DgsQueryExecutor;
import com.netflix.graphql.types.subscription.Error;
import com.netflix.graphql.types.subscription.QueryPayload;
import com.netflix.graphql.types.subscription.SSEDataPayload;
import graphql.ExecutionResult;
import graphql.InvalidSyntaxError;
import graphql.language.Document;
import graphql.language.OperationDefinition;
import graphql.parser.InvalidSyntaxException;
import graphql.parser.Parser;
import graphql.validation.ValidationError;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.http.MediaType;
import org.springframework.http.codec.ServerSentEvent;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.server.ServerErrorException;
import org.springframework.web.server.ServerWebInputException;
import org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;
import reactor.core.scheduler.Schedulers;

import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.UUID;
import java.util.stream.Collectors;

@RestController
public class DgsSSEControllerOriginal {

    DgsQueryExecutor dgsQueryExecutor;
    @Value("${dgs.graphql.sse.pollPeriod:12000}")
    Long pollPeriod;
    private Logger logger = LoggerFactory.getLogger(getClass().getName());
    private final ObjectMapper mapper = new ObjectMapper();
    private static final String NEXT_EVENT = "next";
    private static final String COMPLETE_EVENT = "complete";

    @Autowired
    public DgsSSEControllerOriginal(DgsQueryExecutor dgsQueryExecutor) {
        this.dgsQueryExecutor = dgsQueryExecutor;
    }

    @PostMapping(path = "${dgs.graphql.sse.path:/subscriptions}", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<ServerSentEvent<String>> handlePost(@RequestBody String body) {
        try {
            return handleSubscription(body);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    private Flux<ServerSentEvent<String>> handleSubscription(String query) throws IOException {
        QueryPayload queryPayload;
        try {
            queryPayload = mapper.readValue(query, QueryPayload.class);
        } catch (Exception ex) {
            throw new ServerWebInputException("Error parsing query: " + ex.getMessage());
        }

        if (!isSubscriptionQuery(queryPayload.getQuery())) {
            throw new ServerWebInputException("Invalid query. operation type is not a subscription");
        }

        ExecutionResult executionResult = dgsQueryExecutor.execute(queryPayload.getQuery(), queryPayload.getVariables());
        if (!executionResult.getErrors().isEmpty()) {
            String errorMessage;
            if (executionResult.getErrors().stream().anyMatch(error -> error instanceof ValidationError || error instanceof InvalidSyntaxError)) {
                errorMessage = "Subscription query failed to validate: " + executionResult.getErrors().stream().map(Object::toString).collect(Collectors.joining());
            } else {
                errorMessage = "Error executing subscription query: " + executionResult.getErrors().stream().map(Object::toString).collect(Collectors.joining());
            }
            logger.error(errorMessage);
            throw new ServerWebInputException(errorMessage);
        }

        Publisher<ExecutionResult> publisher;
        try {
            publisher = executionResult.getData();
        } catch (ClassCastException exc) {
            logger.error("Invalid return type for subscription datafetcher. A subscription datafetcher must return a Publisher<ExecutionResult>. The query was {}", query, exc);
            throw new ServerErrorException("Invalid return type for subscription datafetcher. Was a non-subscription query send to the subscription endpoint?", exc);
        }

        String subscriptionId = queryPayload.getKey().isEmpty() ? UUID.randomUUID().toString() : queryPayload.getKey();
        Flux<ServerSentEvent<String>> resultPublisher = Flux.from(publisher)
                .map(it -> {
                    SSEDataPayload payload = new SSEDataPayload(it.getData(), it.getErrors(), subscriptionId, "SUBSCRIPTION_DATA");
                    try {
                        return ServerSentEvent.builder(mapper.writeValueAsString(payload)).id(UUID.randomUUID().toString()).event(NEXT_EVENT).build();
                    } catch (JsonProcessingException e) {
                        throw new RuntimeException(e);
                    }
                })
                .onErrorResume(exc -> {
                    logger.warn("An exception occurred on subscription {}", subscriptionId, exc);
                    String errorMessage = exc.getMessage() != null ? exc.getMessage() : "An exception occurred";
                    SSEDataPayload payload = new SSEDataPayload(null, Collections.singletonList(new Error(errorMessage)), subscriptionId, "SUBSCRIPTION_DATA");
                    try {
                        return Flux.just(
                                ServerSentEvent.builder(mapper.writeValueAsString(payload)).id(UUID.randomUUID().toString()).event(NEXT_EVENT).build()
                        );
                    } catch (JsonProcessingException e) {
                        throw new RuntimeException(e);
                    }
                });

        Sinks.Many<ServerSentEvent<String>> sink = Sinks.many().unicast().onBackpressureBuffer();
        List<Disposable> disposables = new ArrayList<>();
        Disposable dis = resultPublisher.doOnNext(sink::tryEmitNext)
                .doFinally(signalType -> {
                    sink.tryEmitNext(ServerSentEvent.builder("").id(UUID.randomUUID().toString()).event(COMPLETE_EVENT).build());
                    sink.tryEmitComplete();
                })
                .subscribeOn(Schedulers.boundedElastic())
                .subscribe();
        disposables.add(dis);
        if (pollPeriod != 0) {
            Disposable poller = Flux.interval(Duration.ZERO, Duration.ofMillis(pollPeriod))
                    .map(l -> {
                        sink.tryEmitNext(ServerSentEvent.builder("").data(null).comment("").build());
                        return l;
                    })
                    .subscribeOn(Schedulers.boundedElastic())
                    .subscribe();
            disposables.add(poller);
        }
        return sink.asFlux().doOnNext(it -> logger.info("sending data: {}", it)).doFinally((it) -> disposables.forEach(Disposable::dispose));
    }

    private boolean isSubscriptionQuery(String query) {
        Document document;
        try {
            document = new Parser().parseDocument(query);
        } catch (InvalidSyntaxException exc) {
            return false;
        }
        List<OperationDefinition> definitions = document.getDefinitionsOfType(OperationDefinition.class);
        return !definitions.isEmpty() && definitions.stream().allMatch(def -> def.getOperation() == OperationDefinition.Operation.SUBSCRIPTION);
    }

}

@yogeshdengle
Copy link
Author

Thank you, we are planning a Spring boot 3 upgrade and will use it from there.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
bug Something isn't working
Projects
None yet
Development

No branches or pull requests

2 participants