WebSockets Support: Tyrus Integration

Santiago Pericasgeertsen edited this page Oct 25, 2019 · 17 revisions

Introduction

This document describes a prototype integration of Tyrus and Helidon.

What is Tyrus?

Tyrus is the reference implementation of JSR 356: Java API for WebSocket. It provides both a client and a server API to communicate using WebSockets for Java. In addition, the JSR defines an SPI to integrate implementations with HTTP containers/connectors. This document focuses on the server API part of JSR 356.

A WebSocket connection starts as a normal HTTP connection that is subsequently upgraded to use WebSockets, essentially turning a request/response HTTP connection into a full-duplex, message-based channel over the same TCP connection. JSR 356 provides an API not only to send and receive messages over a WebSocket connection but also to intercept an HTTP upgrade request and potentially alter its outcome. All JSR implementations provide a default upgrade handler.
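As an illustration of one step that any upgrade handler has to perform, the sketch below computes the Sec-WebSocket-Accept response header from the client's Sec-WebSocket-Key as specified by RFC 6455. The class and method names are hypothetical and are not part of Tyrus or Helidon; the key and expected value are the sample pair given in the RFC.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Base64;

// Hypothetical helper, not part of Tyrus or Helidon.
public class HandshakeAccept {
    // Fixed GUID that RFC 6455 appends to the client's key.
    private static final String WS_GUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";

    /** Returns Base64(SHA-1(key + GUID)), the value of Sec-WebSocket-Accept. */
    static String acceptKey(String secWebSocketKey) {
        try {
            MessageDigest sha1 = MessageDigest.getInstance("SHA-1");
            byte[] digest = sha1.digest(
                    (secWebSocketKey + WS_GUID).getBytes(StandardCharsets.US_ASCII));
            return Base64.getEncoder().encodeToString(digest);
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("SHA-1 is not available", e);
        }
    }

    public static void main(String[] args) {
        // Sample key from RFC 6455; prints s3pPLMBiTxaQ9kYGzzhZRbK+xOo=
        System.out.println(acceptKey("dGhlIHNhbXBsZSBub25jZQ=="));
    }
}
```

A client accepts the upgrade only if the server's 101 response carries this value, which is why the handshake cannot be answered by a component that never sees the request headers.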

Doesn't Netty already support WebSockets?

Yes, Netty has support for WebSockets. However, this support overlaps with what Tyrus provides. For example, both Netty's handlers and Tyrus support protocol upgrades, and both handle codecs for different types of messages.

In order to support JSR 356 in Helidon, we need to limit Netty's involvement in handling connections to a minimum and let Tyrus do its work. In particular, we need protocol upgrades to be handled by Tyrus (or the application handler) and for Netty to simply (and opaquely!) relay WebSocket messages to connection peers.

Connection Upgrades

A connection upgrade starts as a regular HTTP request with some special headers. As stated above, this request must be forwarded to Tyrus so that it can participate in the upgrade process: this is accomplished by setting up the regular Netty pipeline without any special handlers for WebSockets at this time.
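The "special headers" are those required by RFC 6455 for the opening handshake. The following is a minimal sketch of how such a request could be recognized; it is not the check performed by Tyrus or Helidon, the class and method names are hypothetical, and it simplifies the Upgrade header to a single protocol value.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper, not part of Tyrus or Helidon.
public class UpgradeRequestCheck {

    /** Returns true if the request looks like a WebSocket opening handshake. */
    static boolean isWebSocketUpgrade(String method, Map<String, String> headers) {
        // HTTP header names are case-insensitive.
        Map<String, String> h = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
        h.putAll(headers);

        // Connection may carry several comma-separated tokens.
        boolean connectionHasUpgrade = false;
        for (String token : h.getOrDefault("Connection", "").split(",")) {
            if (token.trim().equalsIgnoreCase("upgrade")) {
                connectionHasUpgrade = true;
            }
        }
        return "GET".equals(method)
                && h.getOrDefault("Upgrade", "").equalsIgnoreCase("websocket")
                && connectionHasUpgrade
                && h.containsKey("Sec-WebSocket-Key")
                && "13".equals(h.get("Sec-WebSocket-Version"));
    }
}
```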

The response to this initial upgrade request needs to be intercepted by Helidon in order to detect a successful upgrade, in which case, Netty's pipeline needs to be modified so that WebSocket messages can be relayed over the now upgraded connection. In particular, HTTP handlers in Netty's pipeline need to be replaced by handlers capable of delivering WebSocket messages.
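The interception step can be sketched as follows, with the pipeline modeled as a plain list of handler names so that the example runs without Netty. The handler names and the "http-" naming convention are assumptions made for illustration and do not correspond to actual Netty or Helidon handlers.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical model of the pipeline change; handler names are invented.
public class PipelineSwapSketch {

    /** A successful upgrade is a 101 response that names the websocket protocol. */
    static boolean isSuccessfulUpgrade(int status, Map<String, String> headers) {
        if (status != 101) {
            return false;
        }
        for (Map.Entry<String, String> e : headers.entrySet()) {
            if (e.getKey().equalsIgnoreCase("Upgrade")
                    && e.getValue().equalsIgnoreCase("websocket")) {
                return true;
            }
        }
        return false;
    }

    /** Returns the pipeline to use after the response has been inspected. */
    static List<String> afterResponse(List<String> pipeline, int status,
                                      Map<String, String> headers) {
        if (!isSuccessfulUpgrade(status, headers)) {
            return pipeline; // still plain HTTP, leave everything in place
        }
        List<String> upgraded = new ArrayList<>();
        for (String handler : pipeline) {
            if (!handler.startsWith("http-")) { // drop the HTTP handlers
                upgraded.add(handler);
            }
        }
        // Add handlers that only frame and relay WebSocket messages.
        upgraded.add("websocket-frame-decoder");
        upgraded.add("websocket-frame-encoder");
        upgraded.add("websocket-relay");
        return upgraded;
    }
}
```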

Relaying Messages

After a connection is upgraded, WebSocket messages flow over a full-duplex channel. These messages are intended to be processed by Tyrus and must not be interpreted by Netty. Once the Netty pipeline has been updated accordingly, Helidon's implementation of the Tyrus SPI can register a Flow.Publisher for server-to-client messages and a Flow.Subscriber for client-to-server ones. This is the responsibility of the TyrusSupport class, which plays a role akin to that of JerseySupport.

Any messages received by the registered Flow.Subscriber are forwarded to the SPI's ReadHandler for processing; conversely, any messages written to the SPI's Writer are forwarded to the registered Flow.Publisher for delivery.
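The client-to-server direction can be sketched with the JDK's own Flow types. In the sketch below, a SubmissionPublisher stands in for the connection and a plain Consumer stands in for the SPI's ReadHandler; both substitutions, and all class names, are assumptions made so that the example runs without Helidon or Tyrus.

```java
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical sketch; not the TyrusSupport implementation.
public class RelaySketch {

    /** Forwards each received item to a handler, requesting one item at a time. */
    static final class ForwardingSubscriber<T> implements Flow.Subscriber<T> {
        private final Consumer<T> readHandler;
        private final CountDownLatch done = new CountDownLatch(1);
        private Flow.Subscription subscription;

        ForwardingSubscriber(Consumer<T> readHandler) {
            this.readHandler = readHandler;
        }

        @Override public void onSubscribe(Flow.Subscription s) {
            subscription = s;
            s.request(1);
        }

        @Override public void onNext(T item) {
            readHandler.accept(item);   // hand the message over for processing
            subscription.request(1);    // then ask for the next one
        }

        @Override public void onError(Throwable t) { done.countDown(); }

        @Override public void onComplete() { done.countDown(); }
    }

    /** Publishes the messages as bytes and returns what the handler received. */
    static List<String> relay(List<String> messages) {
        List<String> received = new CopyOnWriteArrayList<>();
        ForwardingSubscriber<byte[]> subscriber = new ForwardingSubscriber<>(
                bytes -> received.add(new String(bytes, StandardCharsets.UTF_8)));
        // close() signals onComplete once pending items have been delivered.
        try (SubmissionPublisher<byte[]> connection = new SubmissionPublisher<>()) {
            connection.subscribe(subscriber);
            for (String m : messages) {
                connection.submit(m.getBytes(StandardCharsets.UTF_8));
            }
        }
        try {
            subscriber.done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return List.copyOf(received);
    }
}
```

Requesting one item at a time keeps back-pressure on the connection: a new message is pulled only after the handler has finished with the previous one.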

Helidon DataChunks

Publishers and subscribers in Helidon use the type DataChunk. In order to support the CompletionHandler used by the Tyrus SPI, DataChunk needs to be extended with a write listener. This listener must invoke the completion handler when the data has been successfully written to the connection or when an error occurs. Each message sent from the server provides an instance of CompletionHandler; in fact, Tyrus is allowed to re-use its byte buffers only after the handler has been called.

Outline of changes to DataChunk:

new DataChunk() {
    private boolean isReleased = false;
    private CompletableFuture<DataChunk> writeFuture;      // write listener

    // ...

    @Override
    public void writeFuture(CompletableFuture<DataChunk> writeFuture) {
        this.writeFuture = writeFuture;
    }

    @Override
    public Optional<CompletableFuture<DataChunk>> writeFuture() {
        return Optional.ofNullable(writeFuture);
    }
};
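The sketch below shows one way the write future could connect a chunk to a completion handler. Chunk and Handler are hypothetical stand-ins for DataChunk and CompletionHandler, reduced to the methods needed here, and onWritten/onWriteFailed represent whatever the connection layer does once a write finishes; none of these names come from Helidon or Tyrus.

```java
import java.nio.ByteBuffer;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch; Chunk and Handler are simplified stand-ins.
public class WriteListenerSketch {

    /** Stand-in for the completion handler supplied with each message. */
    interface Handler<T> {
        void completed(T result);
        void failed(Throwable error);
    }

    /** Stand-in for a chunk that carries an optional write future. */
    static final class Chunk {
        private final ByteBuffer data;
        private CompletableFuture<Chunk> writeFuture;

        Chunk(ByteBuffer data) { this.data = data; }

        ByteBuffer data() { return data; }

        void writeFuture(CompletableFuture<Chunk> writeFuture) {
            this.writeFuture = writeFuture;
        }

        Optional<CompletableFuture<Chunk>> writeFuture() {
            return Optional.ofNullable(writeFuture);
        }
    }

    /** Wraps a buffer in a chunk whose write future notifies the handler. */
    static Chunk wrap(ByteBuffer buffer, Handler<ByteBuffer> handler) {
        Chunk chunk = new Chunk(buffer);
        CompletableFuture<Chunk> future = new CompletableFuture<>();
        future.whenComplete((written, error) -> {
            if (error != null) {
                handler.failed(error);
            } else {
                handler.completed(written.data()); // buffer may now be re-used
            }
        });
        chunk.writeFuture(future);
        return chunk;
    }

    /** Called by the connection layer after the bytes have been written. */
    static void onWritten(Chunk chunk) {
        chunk.writeFuture().ifPresent(f -> f.complete(chunk));
    }

    /** Called by the connection layer if the write fails. */
    static void onWriteFailed(Chunk chunk, Throwable error) {
        chunk.writeFuture().ifPresent(f -> f.completeExceptionally(error));
    }
}
```

Chunks without a write future are unaffected, which is why the accessor returns an Optional.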

Sample WebSocket Handler

webServer = WebServer.create(builder.build(),
        Routing.builder().register("/tyrus",
                TyrusSupport.builder().register(EchoEndpoint.class).build()));

@ServerEndpoint(
        value = "/echo",
        encoders = { UppercaseCodec.class },
        decoders = { UppercaseCodec.class },
        configurator = ServerConfigurator.class
)
public class EchoEndpoint {
    private static final Logger LOGGER = Logger.getLogger(
        EchoEndpoint.class.getName());

    @OnOpen
    public void onOpen(Session session) throws IOException {
        LOGGER.info("OnOpen called");
    }

    @OnMessage
    public void echo(Session session, String message) throws IOException {
        LOGGER.info("OnMessage called with '" + message + "'");
        session.getBasicRemote().sendText(message);
    }

    @OnError
    public void onError(Throwable t) {
        LOGGER.info("OnError called");
    }

    @OnClose
    public void onClose() {
        LOGGER.info("OnClose called");
    }
}
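To exercise the endpoint from Java rather than from a browser, a client along the following lines could be used. It relies only on the JDK's java.net.http.WebSocket API (Java 11 or later). The sketch assumes that the routing path "/tyrus" and the endpoint path "/echo" combine into "/tyrus/echo"; the host and port are supplied on the command line because the prototype's configuration is not shown here. The class name is hypothetical.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.WebSocket;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;

// Hypothetical client; assumes the endpoint is reachable at /tyrus/echo.
public class EchoClientSketch {

    /** Builds the WebSocket URI from the routing path and the endpoint path. */
    static URI endpointUri(String host, int port) {
        return URI.create("ws://" + host + ":" + port + "/tyrus/echo");
    }

    public static void main(String[] args) throws Exception {
        if (args.length < 2) {
            System.out.println("usage: EchoClientSketch <host> <port>");
            return;
        }
        CompletableFuture<String> reply = new CompletableFuture<>();

        WebSocket.Listener listener = new WebSocket.Listener() {
            private final StringBuilder text = new StringBuilder();

            @Override
            public CompletionStage<?> onText(WebSocket ws, CharSequence data, boolean last) {
                text.append(data);          // a message may arrive in parts
                if (last) {
                    reply.complete(text.toString());
                }
                ws.request(1);              // ask for the next message
                return null;                // data may be reclaimed immediately
            }

            @Override
            public void onError(WebSocket ws, Throwable error) {
                reply.completeExceptionally(error);
            }
        };

        WebSocket ws = HttpClient.newHttpClient().newWebSocketBuilder()
                .buildAsync(endpointUri(args[0], Integer.parseInt(args[1])), listener)
                .get(5, TimeUnit.SECONDS);
        ws.sendText("hello", true);
        System.out.println("Reply: " + reply.get(5, TimeUnit.SECONDS));
        ws.sendClose(WebSocket.NORMAL_CLOSURE, "done").get(5, TimeUnit.SECONDS);
    }
}
```

The reply depends on what UppercaseCodec and ServerConfigurator do, and neither is shown on this page, so no expected output is given.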

Sample Run with Helidon Prototype and Chrome

Unit Test in Java

https://github.com/spericas/helidon/blob/tyrus/webserver/tyrus/src/test/java/io/helidon/webserver/tyrus/TyrusSupportTest.java

Not Covered in Prototype

  • Sessions

  • WebSocket client API/SPI

  • Tracing

  • Security and TLS

Prototype Location