Skip to content

Commit

Permalink
[SOAR-0010] Support for JSON Lines, JSON Sequence, and Server-sent Ev…
Browse files Browse the repository at this point in the history
…ents (#495)

### Motivation

Check out the proposal and leave feedback here or on the Swift forums.


https://github.com/czechboy0/swift-openapi-generator/blob/hd-soar-0010/Sources/swift-openapi-generator/Documentation.docc/Proposals/SOAR-0010.md

### Modifications

Added the proposal.

### Result

N/A

### Test Plan

N/A
  • Loading branch information
czechboy0 committed Jan 8, 2024
1 parent d6b0149 commit b18bece
Show file tree
Hide file tree
Showing 2 changed files with 372 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,3 +51,4 @@ If you have any questions, tag [Honza Dvorsky](https://github.com/czechboy0) or
- <doc:SOAR-0007>
- <doc:SOAR-0008>
- <doc:SOAR-0009>
- <doc:SOAR-0010>
Original file line number Diff line number Diff line change
@@ -0,0 +1,371 @@
# SOAR-0010: Support for JSON Lines, JSON Sequence, and Server-sent Events

Introduce streaming encoders and decoders for JSON Lines, JSON Sequence, and Server-sent Events for as a convenience API.

## Overview

- Proposal: SOAR-0010
- Author(s): [Honza Dvorsky](https://github.com/czechboy0)
- Status: **Implemented (1.2.0)**
- Issue: [apple/swift-openapi-generator#416](https://github.com/apple/swift-openapi-generator/issues/416)
- Implementation:
- [apple/swift-openapi-runtime#91](https://github.com/apple/swift-openapi-runtime/pull/91)
- [apple/swift-openapi-generator#494](https://github.com/apple/swift-openapi-generator/pull/494)
- Affected components:
- generator (examples and docs only)
- runtime (streaming encoders and decoders)
- Related links:
- [JSON Lines](https://jsonlines.org)
- [JSON Sequence](https://datatracker.ietf.org/doc/html/rfc7464)
- [Server-sent Events](https://html.spec.whatwg.org/multipage/server-sent-events.html#server-sent-events)

### Introduction

Add streaming encoders and decoders for these three event stream formats to the runtime library, allowing adopters to easily produce and consume event streams, both on the client and server.

### Motivation

While the OpenAPI specification is optimized for HTTP APIs that send a single request value, and receive a single response value, there are many use cases in which developers want to stream values over time.

A simple example of streaming "values" is a file transfer, which can be thought of as a stream of byte chunks that represent the contents of the file. Another is multipart content, streaming individual parts over time. Both of these are already supported by Swift OpenAPI Generator, as of version 0.3.0 and 1.0.0-alpha.1, respectively.

Another popular use case for streaming is to send JSON-encoded events over time, usually (but not exclusively), from the server to the client.

- The [Kubernetes API](https://kubernetes.io/docs/reference/) uses [JSON Lines](https://jsonlines.org) to stream updates to resources from its control plane.
- The [OpenAI API](https://platform.openai.com/docs/api-reference/streaming) uses [Server-sent Events](https://html.spec.whatwg.org/multipage/server-sent-events.html#server-sent-events) to stream text snippets from ChatGPT.
- I couldn't find a popular service using JSON Sequence, but unlike JSON Lines, it's well-defined in [RFC7464](https://datatracker.ietf.org/doc/html/rfc7464), and also used around the industry.

The flow starts with the client initiating an HTTP request to the server, and the server responding with an HTTP response head, and then the server starting to stream the response body, which contains the delimited events, processed over time by the client.

This lightweight solution has the advantage of being a plain HTTP request/response pair, without requiring a custom protocol to either replace HTTP, or sit on top of it. This makes intermediaries, such as proxies, still able to pass data through without being aware of the streaming nature of the HTTP body.

### Proposed solution

Since the OpenAPI specification does not explicitly mention event streaming, it's up to tools, such as Swift OpenAPI Generator, to provide additional conveniences.

This proposed solution consists of two parts:
1. Add streaming encoders and decoders for the three event stream formats to the runtime library, represented as an `AsyncSequence` that converts elements between byte chunks and parsed events.
2. Provide examples for how adopters can then chain those sequences on the `HTTPBody` values they either produce or consume, in their code. No extra code would be generated.

Generally, the three event stream formats are associated with the following content types:
- JSON Lines: `application/jsonl`, `application/x-ndjson`
- JSON Sequence: `application/json-seq`
- Server-sent Events: `text/event-stream`

The generated code would continue to only vend the raw sequence of byte chunks (`HTTPBody`), and adopters could optionally chain the encoding/decoding sequence on it. For example, an OpenAPI document with a JSON Lines stream of `Greeting` values could contain the following:

```yaml
paths:
/greetings:
get:
operationId: getGreetingsStream
responses:
'200':
content:
application/jsonl:
schema:
$ref: '#/components/schemas/Greeting'
components:
schemas:
Greeting:
type: object
properties:
...
```

The important part is the `application/jsonl` (JSON Lines) content type (not to be confused with a plain `application/json` content type), and the event schema in `#/components/schemas`.

> Tip: For end-to-end working examples, check out the [pull request](https://github.com/apple/swift-openapi-generator/pull/494) adding example packages for all the formats.
#### Consuming event streams

As a consumer of such a body in Swift (usually on the client), you'd use one of the proposed methods, here `asDecodedJSONLines(of:decoder:)` to get a stream that parses the individual JSON lines and decodes each JSON object as a value of `Components.Schemas.Greeting`.

Then, you can read the stream, for example in a `for try await` loop.

```swift
let response = try await client.getGreetingsStream()
let httpBody = try response.ok.body.application_jsonl
let greetingStream = httpBody.asDecodedJSONLines(of: Components.Schemas.Greeting.self)
for try await greeting in greetingStream {
print("Got greeting: \(greeting.message)")
}
```

#### Producing event streams

As a producer of such a body, start with a root async sequence, for example an `AsyncStream`, and submit events to it.

```swift
let (stream, continuation) = AsyncStream<Components.Schemas.Greeting>.makeStream()
// Pass the continuation to another task that calls
// `continuation.yield(...)` with events, and `continuation.finish()`
// at the end.

let httpBody = HTTPBody(
stream.asEncodedJSONLines(),
length: .unknown,
iterationBehavior: .single
)
// Provide `httpBody` to the response, for example.
return .ok(.init(body: .application_jsonl(httpBody)))
```

### Detailed design

The rest of this section contains the Swift interface of the new API for the runtime library.

```swift
/// A sequence that parses arbitrary byte chunks into lines using the JSON Lines format.
public struct JSONLinesDeserializationSequence<Upstream> : Sendable where Upstream : Sendable, Upstream : AsyncSequence, Upstream.Element == ArraySlice<UInt8> {

/// Creates a new sequence.
/// - Parameter upstream: The upstream sequence of arbitrary byte chunks.
public init(upstream: Upstream)
}

extension JSONLinesDeserializationSequence : AsyncSequence {
public typealias Element = ArraySlice<UInt8>
public struct Iterator<UpstreamIterator> : AsyncIteratorProtocol where UpstreamIterator : AsyncIteratorProtocol, UpstreamIterator.Element == ArraySlice<UInt8> {
public mutating func next() async throws -> ArraySlice<UInt8>?
}
public func makeAsyncIterator() -> Iterator<Upstream.AsyncIterator>
}

extension AsyncSequence where Self.Element == ArraySlice<UInt8> {

/// Returns another sequence that decodes each JSON Lines event as the provided type using the provided decoder.
/// - Parameters:
/// - eventType: The type to decode the JSON event into.
/// - decoder: The JSON decoder to use.
/// - Returns: A sequence that provides the decoded JSON events.
public func asDecodedJSONLines<Event>(of eventType: Event.Type = Event.self, decoder: JSONDecoder = .init()) -> AsyncThrowingMapSequence<JSONLinesDeserializationSequence<Self>, Event> where Self : Sendable, Event : Decodable
}

/// A sequence that serializes lines by concatenating them using the JSON Lines format.
public struct JSONLinesSerializationSequence<Upstream> : Sendable where Upstream : Sendable, Upstream : AsyncSequence, Upstream.Element == ArraySlice<UInt8> {

/// Creates a new sequence.
/// - Parameter upstream: The upstream sequence of lines.
public init(upstream: Upstream)
}

extension JSONLinesSerializationSequence : AsyncSequence {
public typealias Element = ArraySlice<UInt8>
public struct Iterator<UpstreamIterator> : AsyncIteratorProtocol where UpstreamIterator : AsyncIteratorProtocol, UpstreamIterator.Element == ArraySlice<UInt8> {
public mutating func next() async throws -> ArraySlice<UInt8>?
}
public func makeAsyncIterator() -> Iterator<Upstream.AsyncIterator>
}

extension AsyncSequence where Self.Element : Encodable {

/// Returns another sequence that encodes the events using the provided encoder into JSON Lines.
/// - Parameter encoder: The JSON encoder to use.
/// - Returns: A sequence that provides the serialized JSON Lines.
public func asEncodedJSONLines(encoder: JSONEncoder = {
let encoder = JSONEncoder()
encoder.outputFormatting = [.sortedKeys, .withoutEscapingSlashes]
return encoder
}()) -> JSONLinesSerializationSequence<AsyncThrowingMapSequence<Self, ArraySlice<UInt8>>>
}

/// A sequence that parses arbitrary byte chunks into lines using the JSON Sequence format.
public struct JSONSequenceDeserializationSequence<Upstream> : Sendable where Upstream : Sendable, Upstream : AsyncSequence, Upstream.Element == ArraySlice<UInt8> {

/// Creates a new sequence.
/// - Parameter upstream: The upstream sequence of arbitrary byte chunks.
public init(upstream: Upstream)
}

extension JSONSequenceDeserializationSequence : AsyncSequence {
public typealias Element = ArraySlice<UInt8>
public struct Iterator<UpstreamIterator> : AsyncIteratorProtocol where UpstreamIterator : AsyncIteratorProtocol, UpstreamIterator.Element == ArraySlice<UInt8> {
public mutating func next() async throws -> ArraySlice<UInt8>?
}
public func makeAsyncIterator() -> Iterator<Upstream.AsyncIterator>
}

extension AsyncSequence where Self.Element == ArraySlice<UInt8> {

/// Returns another sequence that decodes each JSON Sequence event as the provided type using the provided decoder.
/// - Parameters:
/// - eventType: The type to decode the JSON event into.
/// - decoder: The JSON decoder to use.
/// - Returns: A sequence that provides the decoded JSON events.
public func asDecodedJSONSequence<Event>(of eventType: Event.Type = Event.self, decoder: JSONDecoder = .init()) -> AsyncThrowingMapSequence<JSONSequenceDeserializationSequence<Self>, Event> where Self : Sendable, Event : Decodable
}

/// A sequence that serializes lines by concatenating them using the JSON Sequence format.
public struct JSONSequenceSerializationSequence<Upstream> : Sendable where Upstream : Sendable, Upstream : AsyncSequence, Upstream.Element == ArraySlice<UInt8> {

/// Creates a new sequence.
/// - Parameter upstream: The upstream sequence of lines.
public init(upstream: Upstream)
}

extension JSONSequenceSerializationSequence : AsyncSequence {
public typealias Element = ArraySlice<UInt8>
public struct Iterator<UpstreamIterator> : AsyncIteratorProtocol where UpstreamIterator : AsyncIteratorProtocol, UpstreamIterator.Element == ArraySlice<UInt8> {
public mutating func next() async throws -> ArraySlice<UInt8>?
}
public func makeAsyncIterator() -> Iterator<Upstream.AsyncIterator>
}

extension AsyncSequence where Self.Element : Encodable {

/// Returns another sequence that encodes the events using the provided encoder into a JSON Sequence.
/// - Parameter encoder: The JSON encoder to use.
/// - Returns: A sequence that provides the serialized JSON Sequence.
public func asEncodedJSONSequence(encoder: JSONEncoder = {
let encoder = JSONEncoder()
encoder.outputFormatting = [.sortedKeys, .withoutEscapingSlashes]
return encoder
}()) -> JSONSequenceSerializationSequence<AsyncThrowingMapSequence<Self, ArraySlice<UInt8>>>
}

/// An event sent by the server that has a JSON payload in the data field.
///
/// https://html.spec.whatwg.org/multipage/server-sent-events.html#event-stream-interpretation
public struct ServerSentEventWithJSONData<JSONDataType> : Sendable, Hashable where JSONDataType : Hashable, JSONDataType : Sendable {

/// A type of the event, helps inform how to interpret the data.
public var event: String?

/// The payload of the event.
public var data: JSONDataType?

/// A unique identifier of the event, can be used to resume an interrupted stream by
/// making a new request with the `Last-Event-ID` header field set to this value.
///
/// https://html.spec.whatwg.org/multipage/server-sent-events.html#the-last-event-id-header
public var id: String?

/// The amount of time, in milliseconds, the client should wait before reconnecting in case
/// of an interruption.
///
/// https://html.spec.whatwg.org/multipage/server-sent-events.html#the-eventsource-interface
public var retry: Int64?

/// Creates a new event.
/// - Parameters:
/// - event: A type of the event, helps inform how to interpret the data.
/// - data: The payload of the event.
/// - id: A unique identifier of the event.
/// - retry: The amount of time, in milliseconds, to wait before retrying.
public init(event: String? = nil, data: JSONDataType? = nil, id: String? = nil, retry: Int64? = nil)
}

/// An event sent by the server.
///
/// https://html.spec.whatwg.org/multipage/server-sent-events.html#event-stream-interpretation
public struct ServerSentEvent : Sendable, Hashable {

/// A unique identifier of the event, can be used to resume an interrupted stream by
/// making a new request with the `Last-Event-ID` header field set to this value.
///
/// https://html.spec.whatwg.org/multipage/server-sent-events.html#the-last-event-id-header
public var id: String?

/// A type of the event, helps inform how to interpret the data.
public var event: String?

/// The payload of the event.
public var data: String?

/// The amount of time, in milliseconds, the client should wait before reconnecting in case
/// of an interruption.
///
/// https://html.spec.whatwg.org/multipage/server-sent-events.html#the-eventsource-interface
public var retry: Int64?

/// Creates a new event.
/// - Parameters:
/// - id: A unique identifier of the event.
/// - event: A type of the event, helps inform how to interpret the data.
/// - data: The payload of the event.
/// - retry: The amount of time, in milliseconds, to wait before retrying.
public init(id: String? = nil, event: String? = nil, data: String? = nil, retry: Int64? = nil)
}

/// A sequence that parses arbitrary byte chunks into events using the Server-sent Events format.
///
/// https://html.spec.whatwg.org/multipage/server-sent-events.html#server-sent-events
public struct ServerSentEventsDeserializationSequence<Upstream> : Sendable where Upstream : Sendable, Upstream : AsyncSequence, Upstream.Element == ArraySlice<UInt8> {

/// Creates a new sequence.
/// - Parameter upstream: The upstream sequence of arbitrary byte chunks.
public init(upstream: Upstream)
}

extension ServerSentEventsDeserializationSequence : AsyncSequence {
public typealias Element = ServerSentEvent
public struct Iterator<UpstreamIterator> : AsyncIteratorProtocol where UpstreamIterator : AsyncIteratorProtocol, UpstreamIterator.Element == ArraySlice<UInt8> {
public mutating func next() async throws -> ServerSentEvent?
}
public func makeAsyncIterator() -> Iterator<Upstream.AsyncIterator>
}

extension AsyncSequence where Self.Element == ArraySlice<UInt8> {

/// Returns another sequence that decodes each event's data as the provided type using the provided decoder.
///
/// Use this method if the event's `data` field is not JSON, or if you don't want to parse it using `asDecodedServerSentEventsWithJSONData`.
/// - Returns: A sequence that provides the events.
public func asDecodedServerSentEvents() -> ServerSentEventsDeserializationSequence<ServerSentEventsLineDeserializationSequence<Self>>

/// Returns another sequence that decodes each event's data as the provided type using the provided decoder.
///
/// Use this method if the event's `data` field is JSON.
/// - Parameters:
/// - dataType: The type to decode the JSON data into.
/// - decoder: The JSON decoder to use.
/// - Returns: A sequence that provides the events with the decoded JSON data.
public func asDecodedServerSentEventsWithJSONData<JSONDataType>(of dataType: JSONDataType.Type = JSONDataType.self, decoder: JSONDecoder = .init()) -> AsyncThrowingMapSequence<ServerSentEventsDeserializationSequence<ServerSentEventsLineDeserializationSequence<Self>>, ServerSentEventWithJSONData<JSONDataType>> where JSONDataType : Decodable
}

/// A sequence that serializes Server-sent Events.
public struct ServerSentEventsSerializationSequence<Upstream> : Sendable where Upstream : Sendable, Upstream : AsyncSequence, Upstream.Element == ServerSentEvent {

/// Creates a new sequence.
/// - Parameter upstream: The upstream sequence of events.
public init(upstream: Upstream)
}

extension ServerSentEventsSerializationSequence : AsyncSequence where Upstream.Element == ServerSentEvent {
public typealias Element = ArraySlice<UInt8>
public struct Iterator<UpstreamIterator> : AsyncIteratorProtocol where UpstreamIterator : AsyncIteratorProtocol, Upstream.Element == ServerSentEvent, UpstreamIterator.Element == ServerSentEvent {
public mutating func next() async throws -> ArraySlice<UInt8>?
}
public func makeAsyncIterator() -> Iterator<Upstream.AsyncIterator>
}

extension AsyncSequence {

/// Returns another sequence that encodes Server-sent Events with generic data in the data field.
/// - Returns: A sequence that provides the serialized Server-sent Events.
public func asEncodedServerSentEvents() -> ServerSentEventsSerializationSequence<Self> where Self : Sendable, Self.Element == ServerSentEvent

/// Returns another sequence that encodes Server-sent Events that have a JSON value in the data field.
/// - Parameter encoder: The JSON encoder to use.
/// - Returns: A sequence that provides the serialized Server-sent Events.
public func asEncodedServerSentEventsWithJSONData<JSONDataType>(encoder: JSONEncoder = {
let encoder = JSONEncoder()
encoder.outputFormatting = [.sortedKeys, .withoutEscapingSlashes]
return encoder
}()) -> ServerSentEventsSerializationSequence<AsyncThrowingMapSequence<Self, ServerSentEvent>> where JSONDataType : Encodable, Self.Element == ServerSentEventWithJSONData<JSONDataType>
}
```

### API stability

Additive changes to the runtime library, no API changes to the generator or other components.

### Future directions

We could add additional event stream formats, if they become popular and well-defined in the industry.

### Alternatives considered

- Not doing anything - this would require adopters to write these encoders and decoders by hand, which is time-consuming, error prone, and duplicates effort across the ecosystem.
- Generating special types for these streams - this was rejected because it would force the adopter to parse the event stream, even if they instead wanted to forward it as raw data elsewhere. Since these event streams formats are not part of OpenAPI, it felt like a too strong of a limitation, which is why these conveniences are opt-in.

0 comments on commit b18bece

Please sign in to comment.