Skip to content

Commit

Permalink
simplify parser and serializer
Browse files Browse the repository at this point in the history
  • Loading branch information
paulofaria committed May 15, 2017
1 parent fd308a2 commit fdd830d
Show file tree
Hide file tree
Showing 5 changed files with 59 additions and 59 deletions.
4 changes: 3 additions & 1 deletion Sources/Example/main.swift
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ import HTTP

let router = BasicRouter { root in
root.get { request in
return Response(status: .ok)
return Response(status: .ok, headers: ["Transfer-Encoding": "chunked"]) { stream in
try stream.write("hello world", deadline: 1.second.fromNow())
}
}
}

Expand Down
35 changes: 8 additions & 27 deletions Sources/HTTP/Parser/RequestParser.swift
Original file line number Diff line number Diff line change
Expand Up @@ -205,8 +205,6 @@ public final class RequestParser {

private var requests: [Request] = []

private var body: (Request) throws -> Void = { _ in }

public init(stream: ReadableStream, bufferSize: Int = 2048) {
self.stream = stream
self.bufferSize = bufferSize
Expand Down Expand Up @@ -238,21 +236,13 @@ public final class RequestParser {
buffer.deallocate()
}

public func parse(timeout: Duration, body: @escaping (Request) throws -> Void) throws {
self.body = body

public func parse(deadline: Deadline) throws -> Request {
while true {
do {
try read(deadline: timeout.fromNow())
} catch VeniceError.deadlineReached {
continue
} catch SystemError.brokenPipe {
break
} catch SystemError.connectionResetByPeer {
break
} catch SystemError.socketIsNotConnected {
break
guard requests.isEmpty else {
return requests.removeFirst()
}

try read(deadline: deadline)
}
}

Expand All @@ -263,14 +253,10 @@ public final class RequestParser {
try stream.close()
}

let requests = try parse(read)

for request in requests {
try body(request)
}
try parse(read)
}

private func parse(_ buffer: UnsafeRawBufferPointer) throws -> [Request] {
private func parse(_ buffer: UnsafeRawBufferPointer) throws {
let final = buffer.isEmpty
let needsMessage: Bool

Expand Down Expand Up @@ -298,14 +284,9 @@ public final class RequestParser {
throw RequestParserError(parser.http_errno)
}

let parsed = requests
requests = []

guard !parsed.isEmpty || !needsMessage else {
guard !requests.isEmpty || !needsMessage else {
throw RequestParserError(HPE_INVALID_EOF_STATE.rawValue)
}

return parsed
}

fileprivate func process(state newState: State, data: UnsafeRawBufferPointer? = nil) -> Int32 {
Expand Down
2 changes: 0 additions & 2 deletions Sources/HTTP/Router/Router.swift
Original file line number Diff line number Diff line change
Expand Up @@ -121,5 +121,3 @@ open class BasicRouter {
}
}
}


25 changes: 11 additions & 14 deletions Sources/HTTP/Serializer/ResponseSerializer.swift
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,21 @@ public final class ResponseSerializer {
buffer.deallocate()
}

public func serialize(_ response: Response, timeout: Duration) throws {
let deadline = timeout.fromNow()

public func serialize(_ response: Response, deadline: Deadline) throws -> Bool {
try writeHeaders(for: response, deadline: deadline)

if let contentLength = response.contentLength {
try writeBody(for: response, contentLength: contentLength, deadline: deadline)
} else if response.isChunkEncoded {
try writeChunkedBody(for: response, deadline: deadline)
} else {
try writeBody(for: response, deadline: deadline)
return true
}

if response.isChunkEncoded {
try writeChunkEncodedBody(for: response, deadline: deadline)
return true
}

try write(to: stream, body: response.body, deadline: deadline)
return false
}

@inline(__always)
Expand Down Expand Up @@ -75,18 +78,12 @@ public final class ResponseSerializer {
}

@inline(__always)
private func writeChunkedBody(for response: Response, deadline: Deadline) throws {
private func writeChunkEncodedBody(for response: Response, deadline: Deadline) throws {
let bodyStream = ResponseBodyStream(stream, mode: .chunkedEncoding)
try write(to: bodyStream, body: response.body, deadline: deadline)
try stream.write("0\r\n\r\n", deadline: deadline)
}

@inline(__always)
private func writeBody(for response: Response, deadline: Deadline) throws {
try write(to: stream, body: response.body, deadline: deadline)
try stream.close()
}

@inline(__always)
private func write(to writableStream: WritableStream, body: Body, deadline: Deadline) throws {
switch body {
Expand Down
52 changes: 37 additions & 15 deletions Sources/HTTP/Server/Server.swift
Original file line number Diff line number Diff line change
Expand Up @@ -5,46 +5,60 @@ import Venice
public typealias Respond = (Request) -> Response

public final class Server {
/// Server buffer size
public let bufferSize: Int
/// Parser buffer size
public let parserBufferSize: Int

/// Serializer buffer size
public let serializerBufferSize: Int

/// Parse timeout
public let parseTimeout: Duration

/// Serialization timeout
public let serializeTimeout: Duration

/// Close connection timeout
public let closeConnectionTimeout: Duration

private let logger: Logger
private let group = Coroutine.Group()
private let respond: Respond

/// Creates a new HTTP server
public init(
bufferSize: Int = 4096,
parserBufferSize: Int = 4096,
serializerBufferSize: Int = 4096,
parseTimeout: Duration = 5.minutes,
serializeTimeout: Duration = 5.minutes,
closeConnectionTimeout: Duration = 1.minute,
logAppenders: [LogAppender] = [defaultAppender],
respond: @escaping Respond
) {
self.bufferSize = bufferSize
self.parserBufferSize = parserBufferSize
self.serializerBufferSize = serializerBufferSize
self.parseTimeout = parseTimeout
self.serializeTimeout = serializeTimeout
self.closeConnectionTimeout = closeConnectionTimeout
self.logger = Logger(name: "HTTP server", appenders: logAppenders)
self.respond = respond
}

/// Creates a new HTTP server
public convenience init(
bufferSize: Int = 4096,
parserBufferSize: Int = 4096,
serializerBufferSize: Int = 4096,
parseTimeout: Duration = 5.minutes,
serializeTimeout: Duration = 5.minutes,
closeConnectionTimeout: Duration = 1.minute,
logAppenders: [LogAppender] = [defaultAppender],
router: BasicRouter
) {
self.init(
bufferSize: bufferSize,
parserBufferSize: parserBufferSize,
serializerBufferSize: serializerBufferSize,
parseTimeout: parseTimeout,
serializeTimeout: serializeTimeout,
closeConnectionTimeout: closeConnectionTimeout,
logAppenders: logAppenders,
respond: router.respond
)
Expand Down Expand Up @@ -113,7 +127,7 @@ public final class Server {
}

private static var defaultAppender: LogAppender {
return StandardOutputAppender(name: "HTTP server", levels: [.error, .info])
return StandardOutputAppender(name: "HTTP server", levels: [.error, .info, .debug])
}

private static var defaultHeader: String {
Expand All @@ -139,7 +153,7 @@ public final class Server {
private func accept(_ host: Host) throws {
let stream = try host.accept(deadline: .never)

try group.addCoroutine {
try group.addCoroutine { [unowned self] in
do {
try self.process(stream)
} catch SystemError.brokenPipe {
Expand All @@ -156,20 +170,28 @@ public final class Server {

@inline(__always)
private func process(_ stream: DuplexStream) throws {
let parser = RequestParser(stream: stream, bufferSize: bufferSize)
let serializer = ResponseSerializer(stream: stream, bufferSize: bufferSize)
let parser = RequestParser(stream: stream, bufferSize: parserBufferSize)
let serializer = ResponseSerializer(stream: stream, bufferSize: serializerBufferSize)

try parser.parse(timeout: parseTimeout) { request in
let response = self.respond(request)
try serializer.serialize(response, timeout: self.serializeTimeout)
while true {
let request = try parser.parse(deadline: parseTimeout.fromNow())
let response = respond(request)
let keepAlive = try serializer.serialize(response, deadline: serializeTimeout.fromNow())

guard keepAlive else {
try stream.done(deadline: closeConnectionTimeout.fromNow())
break
}

if let upgrade = response.upgradeConnection {
try upgrade(request, stream)
try stream.done(deadline: self.serializeTimeout.fromNow())
try stream.done(deadline: closeConnectionTimeout.fromNow())
break
}

if !request.isKeepAlive {
try stream.done(deadline: self.serializeTimeout.fromNow())
try stream.done(deadline: closeConnectionTimeout.fromNow())
break
}
}
}
Expand Down

0 comments on commit fdd830d

Please sign in to comment.