Skip to content

Latest commit

 

History

History
465 lines (323 loc) · 17 KB

3.WebSockets.md

File metadata and controls

465 lines (323 loc) · 17 KB

参考文档的这一部分涵盖了对反应式堆栈 WebSocket 消息传递的支持。

3.1. 介绍WebSocket

WebSocket 协议 RFC 6455 提供了一种标准化的方法来建立客户端和服务器之间通过单个 TCP 连接的全双工、双向通信通道。 这是一个与HTTP不同的TCP协议,但旨在通过 HTTP 工作,使用端口 80 和 443,并允许重新使用现有的防火墙规则。

WebSocket 交互以 HTTP 请求开始,使用 HTTP Upgrade 首部进行升级,或者在这种情况下,切换到 WebSocket 协议。 以下示例显示了这样的交互:

GET /spring-websocket-portfolio/portfolio HTTP/1.1
Host: localhost:8080
Upgrade: websocket ①
Connection: Upgrade ②
Sec-WebSocket-Key: Uc9l9TMkWGbHFD2qnFHltg==
Sec-WebSocket-Protocol: v10.stomp, v11.stomp
Sec-WebSocket-Version: 13
Origin: http://localhost:8080

Upgrade

② 使用Upgrade连接

并非200响应状态码,支持WebSocket协议的服务端返回类似输出,如下:

HTTP/1.1 101 Switching Protocols ①
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Accept: 1qVdfYHU9hPOl4JYYNXF623Gzn0=
Sec-WebSocket-Protocol: v10.stomp

① 协议切换

成功握手后,HTTP 升级请求底层的 TCP 套接字对客户端和客户端都保持打开状态继续发送和接收消息。

对 WebSockets 工作原理的完整介绍超出了本文档的范围。 请参阅 RFC 6455,WebSocket 章节HTML5 或互联网上的介绍和教程。

请注意,如果 WebSocket 服务器在 Web 服务器(例如 nginx)后面运行,则你可能需要将其配置为允许WebSocket 升级请求通过并到达 WebSocket 服务器。 同样,如果应用程序在云环境中运行,请检查与 WebSocket 支持相关的云提供商的说明。

3.1.1 HTTP Versus WebSocket

尽管 WebSocket 被设计为与 HTTP 兼容并以 HTTP 请求开始,但是这两种协议导致了非常不同的架构和应用程序编程模型。

在 HTTP 和 REST 中,一个应用程序被建模为多个 URL。为了与应用程序交互,客户端访问这些 URL,请求-响应风格。 服务器根据 HTTP URL、方法和首部将请求路由到适当的处理程序。

相比之下,在 WebSockets 中,通常只有一个 URL 用于初始连接。随后,所有消息在同一个 TCP 连接上流动。 这指向一个完全不同的异步、事件驱动、消息传递的架构。

WebSocket 也是一种低级传输协议,与 HTTP 不同,它不为消息的内容规定任何语义。 这意味着除非客户端和服务器就消息语义达成一致,否则无法路由或处理消息。

WebSocket 客户端和服务器可以协商使用更高级别的消息传递协议(例如,STOMP),通过HTTP 握手请求中的 Sec-WebSocket-Protocol 首部。 如果不使用这种模式,他们需要使用他们自己的会话协议。

3.1.2. 什么时候使用WebSockets

WebSockets 可以使网页具有动态性和交互性。但是,在很多情况下,Ajax 和 HTTP 流的组合或长轮询可以提供简单有效的解决方案。

例如,新闻、邮件和社交提要需要动态更新,但每隔几分钟更新可能完全没问题。 另一方面,协作、游戏和金融应用程序需要更接近实时。

延迟本身并不是决定性因素。 如果消息量比较小(比如监控网络失败)HTTP 流或轮询可以提供有效的解决方案。 低延迟、高频率和高容量是使用 WebSocket 的最佳案例。

还要记住,在 Internet 上,不受您控制的限制性代理可能会阻止 WebSocket交互,要么是因为它们未配置为传递 Upgrade 标头,要么是因为它们关闭了看起来空闲的长连接。 这意味着在防火墙内对内部应用程序使用 WebSocket 是一种比面向公众的应用程序更直接的决定。

3.2. WebSocket API

Same as in the Servlet stack

Spring Framework 提供了一个 WebSocket API,你可以使用它来编写客户端和服务器端应用程序来处理WebSocket 消息。

3.2.1. Server

Same as in the Servlet stack

要创建 WebSocket 服务器,你可以先创建一个 WebSocketHandler。 以下示例显示了如何执行此操作:

Java.

import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.reactive.socket.WebSocketSession;

public class MyWebSocketHandler implements WebSocketHandler {

    @Override
    public Mono<Void> handle(WebSocketSession session) {
        // ...
    }
}

Kotlin.

import org.springframework.web.reactive.socket.WebSocketHandler
import org.springframework.web.reactive.socket.WebSocketSession

class MyWebSocketHandler : WebSocketHandler {

    override fun handle(session: WebSocketSession): Mono<Void> {
        // ...
    }
}

然后你把它映射到一个URL:

Java.

@Configuration
class WebConfig {

    @Bean
    public HandlerMapping handlerMapping() {
        Map<String, WebSocketHandler> map = new HashMap<>();
        map.put("/path", new MyWebSocketHandler());
        int order = -1; // before annotated controllers

        return new SimpleUrlHandlerMapping(map, order);
    }
}

Kotlin.

@Configuration
class WebConfig {

    @Bean
    fun handlerMapping(): HandlerMapping {
        val map = mapOf("/path" to MyWebSocketHandler())
        val order = -1 // before annotated controllers

        return SimpleUrlHandlerMapping(map, order)
    }
}

如果使用 WebFlux Config 这样就可以了。 否则如果不使用 WebFlux ,你需要声明一个 WebSocketHandlerAdapter,如下所示:

Java.

@Configuration
class WebConfig {

    // ...

    @Bean
    public WebSocketHandlerAdapter handlerAdapter() {
        return new WebSocketHandlerAdapter();
    }
}

Kotlin.

@Configuration
class WebConfig {

    // ...

    @Bean
    fun handlerAdapter() =  WebSocketHandlerAdapter()
}

3.2.2. WebSocketHandler

WebSocketHandlerhandle 方法接受 WebSocketSession 并返回 Mono<Void> 以指示会话的应用程序处理何时完成。 会话通过两个流处理,一个用于入站消息,另一个用于出站消息。 下表描述了处理流的两种方法:

WebSocketSession 方法 描述
Flux<WebSocketMessage> receive() 提供对入站消息流的访问,并在连接关闭时完成。
Mono<Void> send(Publisher<WebSocketMessage>) 获取传出消息的源,写入消息,并返回一个 Mono<Void>,在源完成并写入完成时完成。

WebSocketHandler 必须将入站和出站流组合成一个统一的流,并返回一个反映该流完成的 Mono<Void>。 根据应用程序要求,统一的流将在以下情况下完成:

  • 入站或出站消息流完成。

  • 入站流完成(即连接关闭),而出站流是无限的。

  • 在选定的点,通过 WebSocketSessionclose 方法。

当入站和出站消息流组合在一起时,无需检查连接是否打开,因为 Reactive Streams 会给出指示。 入站流接收完成或错误信号,出站流接收取消信号。

一个handler最基本的实现是处理入站流。 下面的示例展示了一个实现:

Java.

class ExampleHandler implements WebSocketHandler {

    @Override
    public Mono<Void> handle(WebSocketSession session) {
        return session.receive()            ①
                .doOnNext(message -> {
                    // ...                  ②
                })
                .concatMap(message -> {
                    // ...                  ③
                })
                .then();                    ④
    }
}

① 访问入站消息流。

② 针对每一个消息做一些事情。

③ 使用消息内容做一些嵌套的异步操作。

④ 返回一个 Mono<Void> 在接收完成的时候完成。

Kotlin.

class ExampleHandler : WebSocketHandler {

    override fun handle(session: WebSocketSession): Mono<Void> {
        return session.receive()          ①  
                .doOnNext {
                    // ...                ②  
                }
                .concatMap {
                    // ...                ③   
                }
                .then()                   ④ 
    }
}

① 访问入站消息流。

② 针对每一个消息做一些事情。

③ 使用消息内容做一些嵌套的异步操作。

④ 返回一个 Mono<Void> 在接收完成的时候完成。

提示

对于嵌套的异步操作,你需要在使用池化数据缓冲区的底层服务器(例如 Netty)上调用 message.retain()。 否则,数据缓冲区可能会在你有机会读取数据之前被释放。 有关更多背景信息,请参阅 数据缓冲区和编解码器

以下实现结合了入站和出站流:

Java.

class ExampleHandler implements WebSocketHandler {

    @Override
    public Mono<Void> handle(WebSocketSession session) {

        Flux<WebSocketMessage> output = session.receive()               ①
                .doOnNext(message -> {
                    // ...
                })
                .concatMap(message -> {
                    // ...
                })
                .map(value -> session.textMessage("Echo " + value));    ②

        return session.send(output);                                    ③
    }
}

① 处理入站消息流。

② 创建出站消息,产生一个组合的流。

③ 返回一个Mono<Void>直到我们停止接收消息。

Kotlin.

class ExampleHandler : WebSocketHandler {

    override fun handle(session: WebSocketSession): Mono<Void> {

        val output = session.receive()                      ①
                .doOnNext {
                    // ...
                }
                .concatMap {
                    // ...
                }
                .map { session.textMessage("Echo $it") }    ②

        return session.send(output)                         ③
    }
}

① 处理入站消息流。

② 创建出站消息,产生一个组合的流。

③ 返回一个Mono<Void>直到我们停止接收消息。

入站和出站流可以是独立的,并且仅在完成时进行join,如以下示例所示:

Java.

class ExampleHandler implements WebSocketHandler {

    @Override
    public Mono<Void> handle(WebSocketSession session) {

        Mono<Void> input = session.receive()                                    ①
                .doOnNext(message -> {
                    // ...
                })
                .concatMap(message -> {
                    // ...
                })
                .then();

        Flux<String> source = ... ;
        Mono<Void> output = session.send(source.map(session::textMessage));     ②

        return Mono.zip(input, output).then();                                  ③
    }
}

① 处理入站消息流。

② 发送出站消息。

③ 将两个流进行join操作并返回一个 Mono<Void>,在任一流结束时完成。

Kotlin.

class ExampleHandler : WebSocketHandler {

    override fun handle(session: WebSocketSession): Mono<Void> {

        val input = session.receive()                                   ①
                .doOnNext {
                    // ...
                }
                .concatMap {
                    // ...
                }
                .then()

        val source: Flux<String> = ...
        val output = session.send(source.map(session::textMessage))     ②

        return Mono.zip(input, output).then()                           ③
    }
}

① 处理入站消息流。

② 发送出站消息。

③ 将两个流进行join操作并返回一个 Mono<Void>,在任一流结束时完成。

3.2.3. DataBuffer

DataBuffer 是 WebFlux 中字节缓冲区的表示。 参考指南的 Spring Core 部分 Data Buffers and Codecs 有更多相关内容。 需要理解的关键点是,在一些像 Netty 这样的服务器上,字节缓冲区被池化和引用计数,并且必须在消耗时释放以避免内存泄漏。

在 Netty 上运行时,如果应用程序希望保留输入数据缓冲区以确保它们不被释放,则必须使用 DataBufferUtils.retain(dataBuffer) ,然后在消耗缓冲区时使用 DataBufferUtils.release(dataBuffer)

3.2.4. Handshake

Same as in the Servlet stack

WebSocketHandlerAdapter 委托给一个 WebSocketService。 默认情况下,这是一个 HandshakeWebSocketService 的实例,它对 WebSocket 请求执行基本检查,然后执行 RequestUpgradeStrategy。 目前, Reactor Netty、Tomcat、Jetty 和 Undertow 内置支持这些功能。

HandshakeWebSocketService 暴露了一个 sessionAttributePredicate 属性,该属性允许设置一个 Predicate<String> 以从 WebSession 中提取属性并将它们插入到 WebSocketSession 的属性中。

3.2.5. Server Configation

Same as in the Servlet stack

每个服务器的 RequestUpgradeStrategy 暴露了特定于底层 WebSocket 服务引擎的配置。 使用 WebFlux Java 配置时,您可以自定义某些属性,如 WebFlux 配置 的相应部分所展示的,否则如果不使用 WebFlux 配置,可以使用以下代码的方式来配置:

Java.

@Configuration
class WebConfig {

    @Bean
    public WebSocketHandlerAdapter handlerAdapter() {
        return new WebSocketHandlerAdapter(webSocketService());
    }

    @Bean
    public WebSocketService webSocketService() {
        TomcatRequestUpgradeStrategy strategy = new TomcatRequestUpgradeStrategy();
        strategy.setMaxSessionIdleTimeout(0L);
        return new HandshakeWebSocketService(strategy);
    }
}

Kotlin.

@Configuration
class WebConfig {

    @Bean
    fun handlerAdapter() =
            WebSocketHandlerAdapter(webSocketService())

    @Bean
    fun webSocketService(): WebSocketService {
        val strategy = TomcatRequestUpgradeStrategy().apply {
            setMaxSessionIdleTimeout(0L)
        }
        return HandshakeWebSocketService(strategy)
    }
}

检查你的服务器的升级策略以查看可用选项。 目前,只有 Tomcat 和 Jetty 暴露了这些选项。

3.2.6. CORS

Same as in the Servlet stack

配置 CORS 并限制对 WebSocket 端点的访问的最简单方法是让你的 WebSocketHandler 实现 CorsConfigurationSource 并返回一个包含允许的来源、首部和其他详细信息的 CorsConfiguration 。 如果你不能这样做,你还可以在 SimpleUrlHandler 上设置 corsConfigurations 属性以通过 URL 模式指定 CORS 设置。 如果两者都被指定,它们将通过使用 CorsConfiguration 上的 combine 方法进行组合。

3.2.7. Client

Spring WebFlux 提供了一个 WebSocketClient 抽象,其中包含 Reactor Netty、Tomcat、Jetty、Undertow 和标准 Java(即 JSR-356)的实现。

注意

Tomcat 客户端实际上是标准 Java 客户端的扩展,在 WebSocketSession 处理时添加了一些额外的功能,以利用 Tomcat 特定的 API 来暂停接收消息以实现回压机制。

要启动 WebSocket 会话,您可以创建客户端的实例并使用其 execute 方法:

Java.

WebSocketClient client = new ReactorNettyWebSocketClient();

URI url = new URI("ws://localhost:8080/path");
client.execute(url, session ->
        session.receive()
                .doOnNext(System.out::println)
                .then());

Kotlin.

val client = ReactorNettyWebSocketClient()

        val url = URI("ws://localhost:8080/path")
        client.execute(url) { session ->
            session.receive()
                    .doOnNext(::println)
            .then()
        }

一些客户端,比如 Jetty,实现了 Lifecycle接口,需要先停止再启动才能使用。 所有客户端都有与底层 WebSocket 客户端配置相关的构造函数选项。