Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We鈥檒l occasionally send you account related emails.

Already on GitHub? Sign in to your account

implement new streamwriter api #291

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
116 changes: 108 additions & 8 deletions Sources/AsyncHTTPClient/HTTPHandler.swift
Original file line number Diff line number Diff line change
Expand Up @@ -32,32 +32,92 @@ extension HTTPClient {
///
/// - parameters:
/// - closure: function that will be called to write actual bytes to the channel.
@available(*, deprecated, message: "StreamWriter is deprecated, please use StreamWriter2")
public init(closure: @escaping (IOData) -> EventLoopFuture<Void>) {
self.closure = closure
}

// This is needed so we don't have build warnings in the client itself
init(internalClosure: @escaping (IOData) -> EventLoopFuture<Void>) {
self.closure = internalClosure
}

/// Write data to server.
///
/// - parameters:
/// - data: `IOData` to write.
@available(*, deprecated, message: "StreamWriter is deprecated, please use StreamWriter2")
public func write(_ data: IOData) -> EventLoopFuture<Void> {
return self.closure(data)
}
}

public struct StreamWriter2 {
public let allocator: ByteBufferAllocator
let onChunk: (IOData) -> EventLoopFuture<Void>
let onComplete: EventLoopPromise<Void>
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a reason we're writing this as a callback-taking type rather than just holding onto the task object?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

and "forward" all write requests to it's channel? that might be a good idea, will try that, thanks!


public init(allocator: ByteBufferAllocator, onChunk: @escaping (IOData) -> EventLoopFuture<Void>, onComplete: EventLoopPromise<Void>) {
self.allocator = allocator
self.onChunk = onChunk
self.onComplete = onComplete
}

public func write(_ buffer: ByteBuffer) -> EventLoopFuture<Void> {
return self.onChunk(.byteBuffer(buffer))
}

public func write(_ data: IOData) -> EventLoopFuture<Void> {
return self.onChunk(data)
}

public func write(_ buffer: ByteBuffer, promise: EventLoopPromise<Void>?) {
self.onChunk(.byteBuffer(buffer)).cascade(to: promise)
}

public func write(_ data: IOData, promise: EventLoopPromise<Void>?) {
self.onChunk(data).cascade(to: promise)
}

public func end() {
self.onComplete.succeed(())
}

public func fail(_ error: Error) {
self.onComplete.fail(error)
}
}

/// Body size. Request validation will be failed with `HTTPClientErrors.contentLengthMissing` if nil,
/// unless `Trasfer-Encoding: chunked` header is set.
public var length: Int?
/// Body chunk provider.
public var stream: (StreamWriter) -> EventLoopFuture<Void>
var stream2: ((StreamWriter2) -> Void)?

@available(*, deprecated, message: "StreamWriter is deprecated, please use StreamWriter2")
init(length: Int?, stream: @escaping (StreamWriter) -> EventLoopFuture<Void>) {
self.length = length
self.stream = stream
self.stream2 = nil
}

init(length: Int?, stream: @escaping (StreamWriter2) -> Void) {
self.length = length
self.stream = { _ in
preconditionFailure("stream writer 2 was called")
}
self.stream2 = stream
}

/// Create and stream body using `ByteBuffer`.
///
/// - parameters:
/// - buffer: Body `ByteBuffer` representation.
public static func byteBuffer(_ buffer: ByteBuffer) -> Body {
return Body(length: buffer.readableBytes) { writer in
writer.write(.byteBuffer(buffer))
return Body(length: buffer.readableBytes) { (writer: StreamWriter2) in
writer.write(.byteBuffer(buffer), promise: nil)
writer.end()
}
}

Expand All @@ -67,17 +127,30 @@ extension HTTPClient {
/// - length: Body size. Request validation will be failed with `HTTPClientErrors.contentLengthMissing` if nil,
/// unless `Transfer-Encoding: chunked` header is set.
/// - stream: Body chunk provider.
@available(*, deprecated, message: "StreamWriter is deprecated, please use StreamWriter2 instead")
public static func stream(length: Int? = nil, _ stream: @escaping (StreamWriter) -> EventLoopFuture<Void>) -> Body {
return Body(length: length, stream: stream)
}

/// Create and stream body using `StreamWriter`.
///
/// - parameters:
/// - length: Body size. Request validation will be failed with `HTTPClientErrors.contentLengthMissing` if nil,
/// unless `Transfer-Encoding: chunked` header is set.
/// - stream: Body chunk provider.
public static func stream2(length: Int? = nil, _ stream: @escaping (StreamWriter2) -> Void) -> Body {
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I we add method with the same name but different type we will break compilation, because some calls could become ambiguous 馃槩

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Given that there is discussion about possibly breaking API in the nearish future anyway, we may find we want to delay this until then to resolve this problem.

return Body(length: length, stream: stream)
}

/// Create and stream body using `Data`.
///
/// - parameters:
/// - data: Body `Data` representation.
public static func data(_ data: Data) -> Body {
return Body(length: data.count) { writer in
writer.write(.byteBuffer(ByteBuffer(bytes: data)))
return Body(length: data.count) { (writer: StreamWriter2) in
let buffer = writer.allocator.buffer(data: data)
writer.write(.byteBuffer(buffer), promise: nil)
writer.end()
}
}

Expand All @@ -86,8 +159,10 @@ extension HTTPClient {
/// - parameters:
/// - string: Body `String` representation.
public static func string(_ string: String) -> Body {
return Body(length: string.utf8.count) { writer in
writer.write(.byteBuffer(ByteBuffer(string: string)))
return Body(length: string.utf8.count) { (writer: StreamWriter2) in
let buffer = writer.allocator.buffer(string: string)
writer.write(.byteBuffer(buffer), promise: nil)
writer.end()
}
}
}
Expand Down Expand Up @@ -874,7 +949,32 @@ extension TaskHandler: ChannelDuplexHandler {
let channel = context.channel

func doIt() -> EventLoopFuture<Void> {
return body.stream(HTTPClient.Body.StreamWriter { part in
if let stream2 = body.stream2 {
let completion = channel.eventLoop.makePromise(of: Void.self)
stream2(HTTPClient.Body.StreamWriter2(allocator: channel.allocator, onChunk: { part in
let promise = self.task.eventLoop.makePromise(of: Void.self)
// All writes have to be switched to the channel EL if channel and task ELs differ
if channel.eventLoop.inEventLoop {
self.writeBodyPart(context: context, part: part, promise: promise)
} else {
channel.eventLoop.execute {
self.writeBodyPart(context: context, part: part, promise: promise)
}
}

promise.futureResult.whenFailure { error in
completion.fail(error)
}

return promise.futureResult.map {
self.callOutToDelegateFireAndForget(value: part, self.delegate.didSendRequestPart)
}
}, onComplete: completion))

return completion.futureResult
}

return body.stream(HTTPClient.Body.StreamWriter(internalClosure: { part in
let promise = self.task.eventLoop.makePromise(of: Void.self)
// All writes have to be switched to the channel EL if channel and task ELs differ
if channel.eventLoop.inEventLoop {
Expand All @@ -888,7 +988,7 @@ extension TaskHandler: ChannelDuplexHandler {
return promise.futureResult.map {
self.callOutToDelegateFireAndForget(value: part, self.delegate.didSendRequestPart)
}
})
}))
}

// Callout to the user to start body streaming should be on task EL
Expand Down
57 changes: 31 additions & 26 deletions Tests/AsyncHTTPClientTests/HTTPClientInternalTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -169,17 +169,19 @@ class HTTPClientInternalTests: XCTestCase {
XCTAssertNoThrow(try httpBin.shutdown())
}

let body: HTTPClient.Body = .stream(length: 50) { writer in
let body: HTTPClient.Body = .stream2(length: 50) { writer in
do {
var request = try Request(url: "http://localhost:\(httpBin.port)/events/10/1")
request.headers.add(name: "Accept", value: "text/event-stream")

let delegate = HTTPClientCopyingDelegate { part in
writer.write(.byteBuffer(part))
writer.write(part)
}
httpClient.execute(request: request, delegate: delegate).futureResult.whenComplete { _ in
writer.end()
}
return httpClient.execute(request: request, delegate: delegate).futureResult
} catch {
return httpClient.eventLoopGroup.next().makeFailedFuture(error)
writer.fail(error)
}
}

Expand All @@ -198,23 +200,25 @@ class HTTPClientInternalTests: XCTestCase {
XCTAssertNoThrow(try httpBin.shutdown())
}

var body: HTTPClient.Body = .stream(length: 50) { _ in
httpClient.eventLoopGroup.next().makeFailedFuture(HTTPClientError.invalidProxyResponse)
var body: HTTPClient.Body = .stream2(length: 50) { writer in
writer.fail(HTTPClientError.invalidProxyResponse)
}

XCTAssertThrowsError(try httpClient.post(url: "http://localhost:\(httpBin.port)/post", body: body).wait())

body = .stream(length: 50) { _ in
body = .stream2(length: 50) { writer in
do {
var request = try Request(url: "http://localhost:\(httpBin.port)/events/10/1")
request.headers.add(name: "Accept", value: "text/event-stream")

let delegate = HTTPClientCopyingDelegate { _ in
httpClient.eventLoopGroup.next().makeFailedFuture(HTTPClientError.invalidProxyResponse)
}
return httpClient.execute(request: request, delegate: delegate).futureResult
httpClient.execute(request: request, delegate: delegate).futureResult.whenComplete { _ in
writer.end()
}
} catch {
return httpClient.eventLoopGroup.next().makeFailedFuture(error)
writer.fail(error)
}
}

Expand Down Expand Up @@ -432,11 +436,11 @@ class HTTPClientInternalTests: XCTestCase {
XCTAssertNoThrow(try httpClient.syncShutdown(requiresCleanClose: true))
}

let body: HTTPClient.Body = .stream(length: 8) { writer in
let buffer = ByteBuffer(string: "1234")
return writer.write(.byteBuffer(buffer)).flatMap {
let buffer = ByteBuffer(string: "4321")
return writer.write(.byteBuffer(buffer))
let body: HTTPClient.Body = .stream2(length: 8) { writer in
writer.write(writer.allocator.buffer(string: "1234")).whenComplete { _ in
writer.write(writer.allocator.buffer(string: "4321")).whenComplete { _ in
writer.end()
}
}
}

Expand Down Expand Up @@ -885,13 +889,13 @@ class HTTPClientInternalTests: XCTestCase {
let el2 = group.next()
XCTAssert(el1 !== el2)

let body: HTTPClient.Body = .stream(length: 8) { writer in
let body: HTTPClient.Body = .stream2(length: 8) { writer in
XCTAssert(el1.inEventLoop)
let buffer = ByteBuffer(string: "1234")
return writer.write(.byteBuffer(buffer)).flatMap {
return writer.write(writer.allocator.buffer(string: "1234")).whenComplete { _ in
XCTAssert(el1.inEventLoop)
let buffer = ByteBuffer(string: "4321")
return writer.write(.byteBuffer(buffer))
writer.write(writer.allocator.buffer(string: "4321")).whenComplete { _ in
writer.end()
}
}
}
let request = try HTTPClient.Request(url: "http://localhost:\(httpBin.port)/post", method: .POST, body: body)
Expand Down Expand Up @@ -921,17 +925,17 @@ class HTTPClientInternalTests: XCTestCase {
XCTAssert(el1 !== el2)

let taskPromise = group.next().makePromise(of: HTTPClient.Task<HTTPClient.Response>.self)
let body: HTTPClient.Body = .stream(length: 8) { writer in
let body: HTTPClient.Body = .stream2(length: 8) { writer in
XCTAssert(el1.inEventLoop)
let buffer = ByteBuffer(string: "1234")
return writer.write(.byteBuffer(buffer)).flatMap {
writer.write(writer.allocator.buffer(string: "1234")).whenComplete { _ in
XCTAssert(el1.inEventLoop)
let buffer = ByteBuffer(string: "4321")
return taskPromise.futureResult.map { (task: HTTPClient.Task<HTTPClient.Response>) -> Void in
XCTAssertNotNil(task.connection)
XCTAssert(task.connection?.channel.eventLoop === el2)
}.flatMap {
writer.write(.byteBuffer(buffer))
writer.write(writer.allocator.buffer(string: "4321"))
}.whenComplete { _ in
writer.end()
}
}
}
Expand Down Expand Up @@ -1070,8 +1074,9 @@ class HTTPClientInternalTests: XCTestCase {
try channel.pipeline.addHandler(handler).wait()

var request = try Request(url: "http://localhost:8080/post")
request.body = .stream(length: 1) { writer in
writer.write(.byteBuffer(ByteBuffer(string: "1234")))
request.body = .stream2(length: 1) { writer in
writer.write(writer.allocator.buffer(string: "1234"), promise: nil)
writer.end()
}

XCTAssertThrowsError(try channel.writeOutbound(request))
Expand Down