Skip to content

Commit

Permalink
Add min parameter to await
Browse files Browse the repository at this point in the history
  • Loading branch information
e5l committed May 13, 2024
1 parent 6aa8c78 commit 31b0425
Show file tree
Hide file tree
Showing 7 changed files with 26 additions and 18 deletions.
11 changes: 8 additions & 3 deletions ktor-io/api/ktor-io.api
Expand Up @@ -7,7 +7,7 @@ public final class io/ktor/utils/io/ByteChannel : io/ktor/utils/io/BufferedByteW
public fun <init> ()V
public fun <init> (Z)V
public synthetic fun <init> (ZILkotlin/jvm/internal/DefaultConstructorMarker;)V
public fun awaitContent (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public fun awaitContent (ILkotlin/coroutines/Continuation;)Ljava/lang/Object;
public fun cancel (Ljava/lang/Throwable;)V
public fun close ()V
public fun flush (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
Expand Down Expand Up @@ -43,7 +43,7 @@ public final class io/ktor/utils/io/ByteChannelUtilsKt {

public abstract interface class io/ktor/utils/io/ByteReadChannel {
public static final field Companion Lio/ktor/utils/io/ByteReadChannel$Companion;
public abstract fun awaitContent (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public abstract fun awaitContent (ILkotlin/coroutines/Continuation;)Ljava/lang/Object;
public abstract fun cancel (Ljava/lang/Throwable;)V
public abstract fun getClosedCause ()Ljava/lang/Throwable;
public abstract fun getReadBuffer ()Lkotlinx/io/Source;
Expand All @@ -54,6 +54,10 @@ public final class io/ktor/utils/io/ByteReadChannel$Companion {
public final fun getEmpty ()Lio/ktor/utils/io/ByteReadChannel;
}

public final class io/ktor/utils/io/ByteReadChannel$DefaultImpls {
public static synthetic fun awaitContent$default (Lio/ktor/utils/io/ByteReadChannel;ILkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object;
}

public final class io/ktor/utils/io/ByteReadChannelKt {
public static final fun cancel (Lio/ktor/utils/io/ByteReadChannel;)V
}
Expand Down Expand Up @@ -85,6 +89,7 @@ public final class io/ktor/utils/io/ByteReadChannelOperationsKt {
public static final fun readShort (Lio/ktor/utils/io/ByteReadChannel;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun readUTF8Line (Lio/ktor/utils/io/ByteReadChannel;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun readUTF8LineTo (Lio/ktor/utils/io/ByteReadChannel;Ljava/lang/Appendable;ILkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static synthetic fun readUTF8LineTo$default (Lio/ktor/utils/io/ByteReadChannel;Ljava/lang/Appendable;ILkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object;
public static final fun reader (Lkotlinx/coroutines/CoroutineScope;Lkotlin/coroutines/CoroutineContext;Lio/ktor/utils/io/ByteChannel;Lkotlin/jvm/functions/Function2;)Lio/ktor/utils/io/ReaderJob;
public static final fun reader (Lkotlinx/coroutines/CoroutineScope;Lkotlin/coroutines/CoroutineContext;ZLkotlin/jvm/functions/Function2;)Lio/ktor/utils/io/ReaderJob;
public static synthetic fun reader$default (Lkotlinx/coroutines/CoroutineScope;Lkotlin/coroutines/CoroutineContext;ZLkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lio/ktor/utils/io/ReaderJob;
Expand Down Expand Up @@ -173,7 +178,7 @@ public abstract interface class io/ktor/utils/io/ChannelJob {

public final class io/ktor/utils/io/CountedByteReadChannel : io/ktor/utils/io/ByteReadChannel {
public fun <init> (Lio/ktor/utils/io/ByteReadChannel;)V
public fun awaitContent (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public fun awaitContent (ILkotlin/coroutines/Continuation;)Ljava/lang/Object;
public fun cancel (Ljava/lang/Throwable;)V
public fun getClosedCause ()Ljava/lang/Throwable;
public final fun getDelegate ()Lio/ktor/utils/io/ByteReadChannel;
Expand Down
4 changes: 2 additions & 2 deletions ktor-io/common/src/io/ktor/utils/io/ByteChannel.kt
Expand Up @@ -58,11 +58,11 @@ public class ByteChannel(public val autoFlush: Boolean = false) : ByteReadChanne
get() = (closedCause != null) || (isClosedForWrite && flushBufferSize == 0 && _readBuffer.exhausted())

@OptIn(InternalAPI::class)
override suspend fun awaitContent(): Boolean {
override suspend fun awaitContent(min: Int): Boolean {
rethrowCloseCauseIfNeeded()

slot.sleepWhile {
flushBufferSize == 0 && _readBuffer.size == 0L && !isClosedForRead
flushBufferSize + _readBuffer.size < min && !isClosedForRead
}
if (_readBuffer.size < CHANNEL_MAX_SIZE) moveFlushToReadBuffer()
return _closedCause.value == null
Expand Down
8 changes: 4 additions & 4 deletions ktor-io/common/src/io/ktor/utils/io/ByteReadChannel.kt
Expand Up @@ -22,12 +22,12 @@ public interface ByteReadChannel {
public val readBuffer: Source

/**
* Suspend the channel until it has bytes or gets closed. Throws exception if the channel was closed with an error.
* If there are bytes available in the channel, this function returns immediately.
* Suspend the channel until it has [min] bytes or gets closed. Throws exception if the channel was closed with an
* error. If there are bytes available in the channel, this function returns immediately.
*
* @return return `false` eof is reached, otherwise `true`.
*/
public suspend fun awaitContent(): Boolean
public suspend fun awaitContent(min: Int = 1): Boolean

public fun cancel(cause: Throwable?)

Expand All @@ -41,7 +41,7 @@ public interface ByteReadChannel {
@InternalAPI
override val readBuffer: Source = Buffer()

override suspend fun awaitContent(): Boolean = false
override suspend fun awaitContent(min: Int): Boolean = false

override fun cancel(cause: Throwable?) {
}
Expand Down
4 changes: 2 additions & 2 deletions ktor-io/common/src/io/ktor/utils/io/CountedByteReadChannel.kt
Expand Up @@ -33,9 +33,9 @@ public class CountedByteReadChannel(public val delegate: ByteReadChannel) : Byte
override val readBuffer: Source
get() = delegate.readBuffer

override suspend fun awaitContent(): Boolean {
override suspend fun awaitContent(min: Int): Boolean {
val before = delegate.availableForRead
val result = delegate.awaitContent()
val result = delegate.awaitContent(min)
count += (initial - before).toLong()
initial = delegate.availableForRead
return result
Expand Down
Expand Up @@ -28,7 +28,7 @@ internal class SourceByteReadChannel(private val source: Source) : ByteReadChann
return source
}

override suspend fun awaitContent(): Boolean {
override suspend fun awaitContent(min: Int): Boolean {
closedCause?.let { throw it }
return false
}
Expand Down
2 changes: 1 addition & 1 deletion ktor-io/jvm/src/io/ktor/utils/io/LookAheadSession.kt
Expand Up @@ -36,7 +36,7 @@ public class LookAheadSuspendSession(private val channel: ByteReadChannel) {
@OptIn(InternalAPI::class)
public suspend fun awaitAtLeast(min: Int): Boolean {
if (channel.readBuffer.remaining >= min) return true
channel.awaitContent()
channel.awaitContent(min)
return channel.readBuffer.remaining >= min
}

Expand Down
13 changes: 8 additions & 5 deletions ktor-io/jvm/src/io/ktor/utils/io/jvm/javaio/Reading.kt
Expand Up @@ -54,14 +54,17 @@ internal class RawSourceChannel(
override val readBuffer: Source
get() = buffer

override suspend fun awaitContent(): Boolean {
override suspend fun awaitContent(min: Int): Boolean {
if (closedToken != null) return true

withContext(coroutineContext) {
val result = try {
source.readAtMostTo(buffer, Long.MAX_VALUE)
} catch (cause: EOFException) {
-1L
var result = 0L
while (buffer.size < min && result >= 0) {
result = try {
source.readAtMostTo(buffer, Long.MAX_VALUE)
} catch (cause: EOFException) {
-1L
}
}

if (result == -1L) {
Expand Down

0 comments on commit 31b0425

Please sign in to comment.