Skip to content

Commit

Permalink
Fix readUtf8Line with multiple buffers
Browse files Browse the repository at this point in the history
  • Loading branch information
e5l committed May 8, 2024
1 parent ec01ba7 commit 9218e53
Show file tree
Hide file tree
Showing 2 changed files with 128 additions and 42 deletions.
106 changes: 64 additions & 42 deletions ktor-io/common/src/io/ktor/utils/io/ByteReadChannelOperations.kt
Expand Up @@ -117,26 +117,9 @@ public suspend fun ByteReadChannel.copyAndClose(channel: ByteWriteChannel, limit

@OptIn(InternalAPI::class)
public suspend fun ByteReadChannel.readUTF8Line(): String? {
var lfIndex = readBuffer.indexOf('\n'.code.toByte())
if (lfIndex >= 0) {
return readBuffer.readLine()
}

val tmp = Buffer()

while (lfIndex < 0 && !isClosedForRead) {
readBuffer.transferTo(tmp)
awaitContent()

lfIndex = readBuffer.indexOf('\n'.code.toByte())
}

if (tmp.exhausted() && readBuffer.exhausted()) return null

return buildString {
if (!tmp.exhausted()) append(tmp.readLine())
if (!readBuffer.exhausted()) append(readBuffer.readLine())
}
val result = StringBuilder()
val completed = readUTF8LineTo(result)
return if (!completed) null else result.toString()
}

@OptIn(InternalAPI::class)
Expand Down Expand Up @@ -357,34 +340,73 @@ public suspend fun ByteReadChannel.discard(max: Long = Long.MAX_VALUE): Long {
return max - remaining
}

@OptIn(InternalAPI::class)
public suspend fun ByteReadChannel.readUTF8LineTo(out: Appendable, max: Int): Boolean {
if (isClosedForRead) return false
/**
* Reads a line of UTF-8 characters to the specified [out] buffer.
* It recognizes CR, LF and CRLF as a line delimiter.
*
* @param out the buffer to write the line to
* @param max the maximum number of characters to read
*
* @return `true` if a new line separator was found or max bytes appended. `false` if no new line separator and no bytes read.
*/
@OptIn(InternalAPI::class, InternalIoApi::class)
public suspend fun ByteReadChannel.readUTF8LineTo(out: Appendable, max: Int = Int.MAX_VALUE): Boolean {
if (readBuffer.exhausted()) awaitContent()
if (isClosedForRead) return false

var cr = readBuffer.indexOf('\r'.code.toByte())
var lf = readBuffer.indexOf('\n'.code.toByte())
var searchEnd = readBuffer.remaining
while (cr < 0 && lf < 0 && readBuffer.remaining < max && awaitContent()) {
cr = readBuffer.indexOf('\r'.code.toByte(), searchEnd)
lf = readBuffer.indexOf('\n'.code.toByte(), searchEnd)
searchEnd = readBuffer.remaining
}
var consumed = 0
while (!isClosedForRead) {
awaitContent()

val cr = readBuffer.indexOf('\r'.code.toByte())
val lf = readBuffer.indexOf('\n'.code.toByte())

// No new line separator
if (cr == -1L && lf == -1L) {
if (max == Int.MAX_VALUE) {
val value = readBuffer.readString()
out.append(value)
} else {
val count = minOf(max - consumed, readBuffer.remaining.toInt())
consumed += count
out.append(readBuffer.readString(count.toLong()))

if (consumed == max) return true
}

continue
}

// CRLF fully in buffer
if (cr >=0 && lf == cr + 1) {
val count = if (max != Int.MAX_VALUE) cr else minOf(max - consumed, cr.toInt()).toLong()
out.append(readBuffer.readString(count))
if (count == cr) readBuffer.discard(2)
return true
}

// CR in buffer before LF
if (cr >= 0 && (lf == -1L || cr < lf)) {
val count = if (max != Int.MAX_VALUE) cr else minOf(max - consumed, cr.toInt()).toLong()
out.append(readBuffer.readString(count))
if (count == cr) readBuffer.discard(1)

if (cr < 0 && lf < 0) {
val count = minOf(max.toLong(), readBuffer.remaining)
out.append(readBuffer.readString(count))
} else {
val eol = when {
cr >= 0 && lf >= 0 -> minOf(cr, lf)
cr >= 0 -> cr
else -> lf
// Check if LF follows CR after awaiting
if (readBuffer.exhausted()) awaitContent()
if (readBuffer.buffer[0] == '\n'.code.toByte()) {
readBuffer.discard(1)
}

return true
}

out.append(readBuffer.readString(eol))
readBuffer.readByte()
if (eol == cr && lf == cr + 1) readBuffer.readByte()
// LF in buffer before CR
if (lf >= 0) {
val count = if (max != Int.MAX_VALUE) lf else minOf(max - consumed, lf.toInt()).toLong()
out.append(readBuffer.readString(count))
if (count == lf) readBuffer.discard(1)
return true
}
}

return true
Expand Down
64 changes: 64 additions & 0 deletions ktor-io/common/test/ReadUtf8LineTest.kt
@@ -0,0 +1,64 @@
import io.ktor.test.dispatcher.*
import io.ktor.utils.io.*
import kotlinx.coroutines.*
import kotlin.test.*

/*
* Copyright 2014-2024 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license.
*/

class ReadUtf8LineTest {

@Test
fun testReadUtf8LineWithLongLineWithLimit() = testSuspend {
val lineSize = 1024
val line = "A".repeat(lineSize)

val channel = writer {
repeat(10) {
channel.writeStringUtf8(line)
}
}.channel

val builder = StringBuilder()
channel.readUTF8LineTo(builder, 8 * 1024)
assertEquals(8 * 1024, builder.length)
assertEquals("A".repeat(8 * 1024), builder.toString())
}

@Test
fun testNewLineAfterFlush() = testSuspend {
val channel = writer {
channel.writeStringUtf8("4\r")
channel.flush()
delay(100)
channel.writeStringUtf8("\n2\r\n")
}.channel

val buffer = StringBuilder()
channel.readUTF8LineTo(buffer, 1024)
assertEquals("4", buffer.toString())
buffer.clear()

channel.readUTF8LineTo(buffer, 1024)
assertEquals("2", buffer.toString())
}

@Test
fun testFlushBeforeNewLine() = testSuspend {
val channel = writer {
channel.writeStringUtf8("4")
channel.flush()
delay(100)
channel.writeStringUtf8("\r\n2\r\n")
}.channel

val buffer = StringBuilder()
channel.readUTF8LineTo(buffer, 1024)
assertEquals("4", buffer.toString())
buffer.clear()

channel.readUTF8LineTo(buffer, 1024)
assertEquals("2", buffer.toString())
}
}

0 comments on commit 9218e53

Please sign in to comment.