参考文档的这一部分涵盖了对反应式堆栈 WebSocket 消息传递的支持。
WebSocket 协议 RFC 6455 提供了一种标准化的方法来建立客户端和服务器之间通过单个 TCP 连接的全双工、双向通信通道。 这是一个与HTTP不同的TCP协议,但旨在通过 HTTP 工作,使用端口 80 和 443,并允许重新使用现有的防火墙规则。
WebSocket 交互以 HTTP 请求开始,使用 HTTP Upgrade
首部进行升级,或者在这种情况下,切换到 WebSocket 协议。 以下示例显示了这样的交互:
GET /spring-websocket-portfolio/portfolio HTTP/1.1
Host: localhost:8080
Upgrade: websocket ①
Connection: Upgrade ②
Sec-WebSocket-Key: Uc9l9TMkWGbHFD2qnFHltg==
Sec-WebSocket-Protocol: v10.stomp, v11.stomp
Sec-WebSocket-Version: 13
Origin: http://localhost:8080
① Upgrade
头
② 使用Upgrade
连接
并非200响应状态码,支持WebSocket协议的服务端返回类似输出,如下:
HTTP/1.1 101 Switching Protocols ①
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Accept: 1qVdfYHU9hPOl4JYYNXF623Gzn0=
Sec-WebSocket-Protocol: v10.stomp
① 协议切换
成功握手后,HTTP 升级请求底层的 TCP 套接字对客户端和客户端都保持打开状态继续发送和接收消息。
对 WebSockets 工作原理的完整介绍超出了本文档的范围。 请参阅 RFC 6455,WebSocket 章节HTML5 或互联网上的介绍和教程。
请注意,如果 WebSocket 服务器在 Web 服务器(例如 nginx)后面运行,则你可能需要将其配置为允许WebSocket 升级请求通过并到达 WebSocket 服务器。 同样,如果应用程序在云环境中运行,请检查与 WebSocket 支持相关的云提供商的说明。
尽管 WebSocket 被设计为与 HTTP 兼容并以 HTTP 请求开始,但是这两种协议导致了非常不同的架构和应用程序编程模型。
在 HTTP 和 REST 中,一个应用程序被建模为多个 URL。为了与应用程序交互,客户端访问这些 URL,请求-响应风格。 服务器根据 HTTP URL、方法和首部将请求路由到适当的处理程序。
相比之下,在 WebSockets 中,通常只有一个 URL 用于初始连接。随后,所有消息在同一个 TCP 连接上流动。 这指向一个完全不同的异步、事件驱动、消息传递的架构。
WebSocket 也是一种低级传输协议,与 HTTP 不同,它不为消息的内容规定任何语义。 这意味着除非客户端和服务器就消息语义达成一致,否则无法路由或处理消息。
WebSocket 客户端和服务器可以协商使用更高级别的消息传递协议(例如,STOMP),通过HTTP 握手请求中的 Sec-WebSocket-Protocol
首部。 如果不使用这种模式,他们需要使用他们自己的会话协议。
WebSockets 可以使网页具有动态性和交互性。但是,在很多情况下,Ajax 和 HTTP 流的组合或长轮询可以提供简单有效的解决方案。
例如,新闻、邮件和社交提要需要动态更新,但每隔几分钟更新可能完全没问题。 另一方面,协作、游戏和金融应用程序需要更接近实时。
延迟本身并不是决定性因素。 如果消息量比较小(比如监控网络失败)HTTP 流或轮询可以提供有效的解决方案。 低延迟、高频率和高容量是使用 WebSocket 的最佳案例。
还要记住,在 Internet 上,不受您控制的限制性代理可能会阻止 WebSocket交互,要么是因为它们未配置为传递 Upgrade
标头,要么是因为它们关闭了看起来空闲的长连接。 这意味着在防火墙内对内部应用程序使用 WebSocket 是一种比面向公众的应用程序更直接的决定。
Spring Framework 提供了一个 WebSocket API,你可以使用它来编写客户端和服务器端应用程序来处理WebSocket 消息。
要创建 WebSocket 服务器,你可以先创建一个 WebSocketHandler
。 以下示例显示了如何执行此操作:
Java.
import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.reactive.socket.WebSocketSession;
public class MyWebSocketHandler implements WebSocketHandler {
@Override
public Mono<Void> handle(WebSocketSession session) {
// ...
}
}
Kotlin.
import org.springframework.web.reactive.socket.WebSocketHandler
import org.springframework.web.reactive.socket.WebSocketSession
class MyWebSocketHandler : WebSocketHandler {
override fun handle(session: WebSocketSession): Mono<Void> {
// ...
}
}
然后你把它映射到一个URL:
Java.
@Configuration
class WebConfig {
@Bean
public HandlerMapping handlerMapping() {
Map<String, WebSocketHandler> map = new HashMap<>();
map.put("/path", new MyWebSocketHandler());
int order = -1; // before annotated controllers
return new SimpleUrlHandlerMapping(map, order);
}
}
Kotlin.
@Configuration
class WebConfig {
@Bean
fun handlerMapping(): HandlerMapping {
val map = mapOf("/path" to MyWebSocketHandler())
val order = -1 // before annotated controllers
return SimpleUrlHandlerMapping(map, order)
}
}
如果使用 WebFlux Config 这样就可以了。 否则如果不使用 WebFlux ,你需要声明一个 WebSocketHandlerAdapter
,如下所示:
Java.
@Configuration
class WebConfig {
// ...
@Bean
public WebSocketHandlerAdapter handlerAdapter() {
return new WebSocketHandlerAdapter();
}
}
Kotlin.
@Configuration
class WebConfig {
// ...
@Bean
fun handlerAdapter() = WebSocketHandlerAdapter()
}
WebSocketHandler
的 handle
方法接受 WebSocketSession
并返回 Mono<Void>
以指示会话的应用程序处理何时完成。 会话通过两个流处理,一个用于入站消息,另一个用于出站消息。 下表描述了处理流的两种方法:
WebSocketSession 方法 |
描述 |
---|---|
Flux<WebSocketMessage> receive() |
提供对入站消息流的访问,并在连接关闭时完成。 |
Mono<Void> send(Publisher<WebSocketMessage>) |
获取传出消息的源,写入消息,并返回一个 Mono<Void> ,在源完成并写入完成时完成。 |
WebSocketHandler
必须将入站和出站流组合成一个统一的流,并返回一个反映该流完成的 Mono<Void>
。 根据应用程序要求,统一的流将在以下情况下完成:
-
入站或出站消息流完成。
-
入站流完成(即连接关闭),而出站流是无限的。
-
在选定的点,通过
WebSocketSession
的close
方法。
当入站和出站消息流组合在一起时,无需检查连接是否打开,因为 Reactive Streams 会给出指示。 入站流接收完成或错误信号,出站流接收取消信号。
一个handler最基本的实现是处理入站流。 下面的示例展示了一个实现:
Java.
class ExampleHandler implements WebSocketHandler {
@Override
public Mono<Void> handle(WebSocketSession session) {
return session.receive() ①
.doOnNext(message -> {
// ... ②
})
.concatMap(message -> {
// ... ③
})
.then(); ④
}
}
① 访问入站消息流。
② 针对每一个消息做一些事情。
③ 使用消息内容做一些嵌套的异步操作。
④ 返回一个 Mono<Void>
在接收完成的时候完成。
Kotlin.
class ExampleHandler : WebSocketHandler {
override fun handle(session: WebSocketSession): Mono<Void> {
return session.receive() ①
.doOnNext {
// ... ②
}
.concatMap {
// ... ③
}
.then() ④
}
}
① 访问入站消息流。
② 针对每一个消息做一些事情。
③ 使用消息内容做一些嵌套的异步操作。
④ 返回一个 Mono<Void>
在接收完成的时候完成。
提示
对于嵌套的异步操作,你需要在使用池化数据缓冲区的底层服务器(例如 Netty)上调用
message.retain()
。 否则,数据缓冲区可能会在你有机会读取数据之前被释放。 有关更多背景信息,请参阅 数据缓冲区和编解码器。
以下实现结合了入站和出站流:
Java.
class ExampleHandler implements WebSocketHandler {
@Override
public Mono<Void> handle(WebSocketSession session) {
Flux<WebSocketMessage> output = session.receive() ①
.doOnNext(message -> {
// ...
})
.concatMap(message -> {
// ...
})
.map(value -> session.textMessage("Echo " + value)); ②
return session.send(output); ③
}
}
① 处理入站消息流。
② 创建出站消息,产生一个组合的流。
③ 返回一个Mono<Void>
直到我们停止接收消息。
Kotlin.
class ExampleHandler : WebSocketHandler {
override fun handle(session: WebSocketSession): Mono<Void> {
val output = session.receive() ①
.doOnNext {
// ...
}
.concatMap {
// ...
}
.map { session.textMessage("Echo $it") } ②
return session.send(output) ③
}
}
① 处理入站消息流。
② 创建出站消息,产生一个组合的流。
③ 返回一个Mono<Void>
直到我们停止接收消息。
入站和出站流可以是独立的,并且仅在完成时进行join,如以下示例所示:
Java.
class ExampleHandler implements WebSocketHandler {
@Override
public Mono<Void> handle(WebSocketSession session) {
Mono<Void> input = session.receive() ①
.doOnNext(message -> {
// ...
})
.concatMap(message -> {
// ...
})
.then();
Flux<String> source = ... ;
Mono<Void> output = session.send(source.map(session::textMessage)); ②
return Mono.zip(input, output).then(); ③
}
}
① 处理入站消息流。
② 发送出站消息。
③ 将两个流进行join操作并返回一个 Mono<Void>
,在任一流结束时完成。
Kotlin.
class ExampleHandler : WebSocketHandler {
override fun handle(session: WebSocketSession): Mono<Void> {
val input = session.receive() ①
.doOnNext {
// ...
}
.concatMap {
// ...
}
.then()
val source: Flux<String> = ...
val output = session.send(source.map(session::textMessage)) ②
return Mono.zip(input, output).then() ③
}
}
① 处理入站消息流。
② 发送出站消息。
③ 将两个流进行join操作并返回一个 Mono<Void>
,在任一流结束时完成。
DataBuffer
是 WebFlux 中字节缓冲区的表示。 参考指南的 Spring Core 部分 Data Buffers and Codecs 有更多相关内容。 需要理解的关键点是,在一些像 Netty 这样的服务器上,字节缓冲区被池化和引用计数,并且必须在消耗时释放以避免内存泄漏。
在 Netty 上运行时,如果应用程序希望保留输入数据缓冲区以确保它们不被释放,则必须使用 DataBufferUtils.retain(dataBuffer)
,然后在消耗缓冲区时使用 DataBufferUtils.release(dataBuffer)
。
WebSocketHandlerAdapter
委托给一个 WebSocketService
。 默认情况下,这是一个 HandshakeWebSocketService
的实例,它对 WebSocket 请求执行基本检查,然后执行 RequestUpgradeStrategy
。 目前, Reactor Netty、Tomcat、Jetty 和 Undertow 内置支持这些功能。
HandshakeWebSocketService
暴露了一个 sessionAttributePredicate
属性,该属性允许设置一个 Predicate<String>
以从 WebSession
中提取属性并将它们插入到 WebSocketSession
的属性中。
每个服务器的 RequestUpgradeStrategy
暴露了特定于底层 WebSocket 服务引擎的配置。 使用 WebFlux Java 配置时,您可以自定义某些属性,如 WebFlux 配置 的相应部分所展示的,否则如果不使用 WebFlux 配置,可以使用以下代码的方式来配置:
Java.
@Configuration
class WebConfig {
@Bean
public WebSocketHandlerAdapter handlerAdapter() {
return new WebSocketHandlerAdapter(webSocketService());
}
@Bean
public WebSocketService webSocketService() {
TomcatRequestUpgradeStrategy strategy = new TomcatRequestUpgradeStrategy();
strategy.setMaxSessionIdleTimeout(0L);
return new HandshakeWebSocketService(strategy);
}
}
Kotlin.
@Configuration
class WebConfig {
@Bean
fun handlerAdapter() =
WebSocketHandlerAdapter(webSocketService())
@Bean
fun webSocketService(): WebSocketService {
val strategy = TomcatRequestUpgradeStrategy().apply {
setMaxSessionIdleTimeout(0L)
}
return HandshakeWebSocketService(strategy)
}
}
检查你的服务器的升级策略以查看可用选项。 目前,只有 Tomcat 和 Jetty 暴露了这些选项。
配置 CORS 并限制对 WebSocket 端点的访问的最简单方法是让你的 WebSocketHandler
实现 CorsConfigurationSource
并返回一个包含允许的来源、首部和其他详细信息的 CorsConfiguration
。 如果你不能这样做,你还可以在 SimpleUrlHandler
上设置 corsConfigurations
属性以通过 URL 模式指定 CORS 设置。 如果两者都被指定,它们将通过使用 CorsConfiguration
上的 combine
方法进行组合。
Spring WebFlux 提供了一个 WebSocketClient
抽象,其中包含 Reactor Netty、Tomcat、Jetty、Undertow 和标准 Java(即 JSR-356)的实现。
注意
Tomcat 客户端实际上是标准 Java 客户端的扩展,在
WebSocketSession
处理时添加了一些额外的功能,以利用 Tomcat 特定的 API 来暂停接收消息以实现回压机制。
要启动 WebSocket 会话,您可以创建客户端的实例并使用其 execute
方法:
Java.
WebSocketClient client = new ReactorNettyWebSocketClient();
URI url = new URI("ws://localhost:8080/path");
client.execute(url, session ->
session.receive()
.doOnNext(System.out::println)
.then());
Kotlin.
val client = ReactorNettyWebSocketClient()
val url = URI("ws://localhost:8080/path")
client.execute(url) { session ->
session.receive()
.doOnNext(::println)
.then()
}
一些客户端,比如 Jetty,实现了 Lifecycle
接口,需要先停止再启动才能使用。 所有客户端都有与底层 WebSocket 客户端配置相关的构造函数选项。