Skip to content

Commit

Permalink
fix QUIC
Browse files Browse the repository at this point in the history
  • Loading branch information
whyoleg committed Apr 18, 2024
1 parent cb43720 commit 104f31c
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 7 deletions.
Expand Up @@ -81,7 +81,7 @@ private class NettyQuicConnectionInboundHandler(
// Note: QUIC streams could be received unordered, so f.e we could receive first stream with id 4 and then with id 0
override fun channelRead(ctx: ChannelHandlerContext, msg: Any) {
msg as QuicStreamChannel
val state = NettyQuicStreamState(false)
val state = NettyQuicStreamState(null)
if (inbound.trySend(state.wrapStream(msg)).isSuccess) {
msg.pipeline().addLast(NettyQuicStreamInitializer(streamsContext, state, isClient))
}
Expand All @@ -103,10 +103,17 @@ private class NettyQuicConnection(
private val streamsContext: CoroutineContext,
private val isClient: Boolean,
) : RSocketMultiplexedConnection {
private val startMarker = Job()
// we need to `hack` only first stream created for client - stream where frames with streamId=0 will be sent
private val first = AtomicBoolean(isClient)
override suspend fun createStream(): RSocketMultiplexedConnection.Stream {
val state = NettyQuicStreamState(first.getAndSet(false))
val startMarker = if (first.getAndSet(false)) {
startMarker
} else {
startMarker.join()
null
}
val state = NettyQuicStreamState(startMarker)
val stream = try {
channel.createStream(
QuicStreamType.BIDIRECTIONAL,
Expand Down
Expand Up @@ -28,16 +28,14 @@ import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.channels.Channel

// TODO: first is a hack to initiate first stream because of buffering
// TODO: first stream is a hack to initiate first stream because of buffering
// quic streams could be received unordered by server, so f.e we could receive first stream with id 4 and then with id 0
// for this, we disable buffering for first client stream, so that first frame will be sent first
// this will affect performance for this stream, so we need to do something else here.
@RSocketTransportApi
internal class NettyQuicStreamState(first: Boolean) {
internal class NettyQuicStreamState(val startMarker: CompletableJob?) {
val closeMarker: CompletableJob = Job()
val outbound = channelForCloseable<ByteReadPacket>(
if (first) Channel.RENDEZVOUS else Channel.BUFFERED
)
val outbound = channelForCloseable<ByteReadPacket>(Channel.BUFFERED)
val inbound = channelForCloseable<ByteReadPacket>(Channel.UNLIMITED)

fun wrapStream(stream: QuicStreamChannel): RSocketMultiplexedConnection.Stream =
Expand Down Expand Up @@ -68,6 +66,7 @@ internal class NettyQuicStreamHandler(
channel.flush()
// await writing to respect transport backpressure
lastWriteFuture.awaitFuture()
state.startMarker?.complete()
}
} finally {
withContext(NonCancellable) {
Expand Down

0 comments on commit 104f31c

Please sign in to comment.