From 664183ecaad8723e1450a204a010303760c08fb9 Mon Sep 17 00:00:00 2001 From: Joannis Orlandos Date: Sat, 9 Mar 2024 22:53:31 +0100 Subject: [PATCH 1/3] Create an AsyncStream based OTel Logger w/ HB2 example --- .../Sources/ServerExample/ServerExample.swift | 39 ++-- .../Server/docker/otel-collector-config.yaml | 11 +- .../Logging/OTLPGRPCLogExporter.swift | 175 ++++++++++++++++++ .../OTLPGRPCLogExporterConfiguration.swift | 70 +++++++ .../Logging/Exporting/OTelLogExporter.swift | 40 ++++ Sources/OTel/Logging/OTLPLogDataModel.swift | 22 +++ Sources/OTel/Logging/OTelLogger.swift | 87 +++++++++ 7 files changed, 428 insertions(+), 16 deletions(-) create mode 100644 Sources/OTLPGRPC/Logging/OTLPGRPCLogExporter.swift create mode 100644 Sources/OTLPGRPC/Logging/OTLPGRPCLogExporterConfiguration.swift create mode 100644 Sources/OTel/Logging/Exporting/OTelLogExporter.swift create mode 100644 Sources/OTel/Logging/OTLPLogDataModel.swift create mode 100644 Sources/OTel/Logging/OTelLogger.swift diff --git a/Examples/Server/Sources/ServerExample/ServerExample.swift b/Examples/Server/Sources/ServerExample/ServerExample.swift index f8090e14..bf5356ea 100644 --- a/Examples/Server/Sources/ServerExample/ServerExample.swift +++ b/Examples/Server/Sources/ServerExample/ServerExample.swift @@ -16,18 +16,14 @@ import Logging import Metrics @_spi(Metrics) import OTel @_spi(Metrics) import OTLPGRPC +@_spi(Logging) import OTel +@_spi(Logging) import OTLPGRPC +import ServiceLifecycle import Tracing @main enum ServerMiddlewareExample { static func main() async throws { - // Bootstrap the logging backend with the OTel metadata provider which includes span IDs in logging messages. - LoggingSystem.bootstrap { label in - var handler = StreamLogHandler.standardError(label: label, metadataProvider: .otel) - handler.logLevel = .trace - return handler - } - // Configure OTel resource detection to automatically apply helpful attributes to events. let environment = OTelEnvironment.detected() let resourceDetection = OTelResourceDetection(detectors: [ @@ -51,14 +47,28 @@ enum ServerMiddlewareExample { ) MetricsSystem.bootstrap(OTLPMetricsFactory(registry: registry)) + // Bootstrap the logging backend with the OTel metadata provider which includes span IDs in logging messages. + let logExporter = try OTLPGRPCLogExporter( + configuration: .init(environment: environment) + ) + + let logger = OTelStreamingLogger( + resource: resource, + exporter: logExporter, + logLevel: .trace + ) + LoggingSystem.bootstrap { label in + return logger + } + // Bootstrap the tracing backend to export traces periodically in OTLP/gRPC. - let exporter = try OTLPGRPCSpanExporter(configuration: .init(environment: environment)) - let processor = OTelBatchSpanProcessor(exporter: exporter, configuration: .init(environment: environment)) + let traceExporter = try OTLPGRPCSpanExporter(configuration: .init(environment: environment)) + let traceProcessor = OTelBatchSpanProcessor(exporter: traceExporter, configuration: .init(environment: environment)) let tracer = OTelTracer( idGenerator: OTelRandomIDGenerator(), sampler: OTelConstantSampler(isOn: true), propagator: OTelW3CPropagator(), - processor: processor, + processor: traceProcessor, environment: environment, resource: resource ) @@ -69,11 +79,14 @@ enum ServerMiddlewareExample { router.middlewares.add(HBTracingMiddleware()) router.middlewares.add(HBMetricsMiddleware()) router.middlewares.add(HBLogRequestsMiddleware(.info)) - router.get("hello") { _, _ in "hello" } + router.get("hello") { _, context in + context.logger.info("Someone visited me, at last!") + return "hello" + } var app = HBApplication(router: router) - // Add the tracer lifecycle service to the HTTP server service group and start the application. - app.addServices(metrics, tracer) + // Add the logger, metrics and tracer lifecycle services to the HTTP server service group and start the application. + app.addServices(logger, metrics, tracer) try await app.runService() } } diff --git a/Examples/Server/docker/otel-collector-config.yaml b/Examples/Server/docker/otel-collector-config.yaml index c4bc9c6f..153274eb 100644 --- a/Examples/Server/docker/otel-collector-config.yaml +++ b/Examples/Server/docker/otel-collector-config.yaml @@ -5,8 +5,8 @@ receivers: endpoint: "otel-collector:4317" exporters: - debug: # Data sources: traces, metrics, logs - verbosity: detailed + logging: + loglevel: debug prometheus: # Data sources: metrics endpoint: "otel-collector:7070" @@ -24,6 +24,11 @@ service: exporters: [prometheus, debug] traces: receivers: [otlp] - exporters: [otlp/jaeger, debug] + exporters: [otlp/jaeger] + + logs: + receivers: [otlp] + processors: [] + exporters: [logging] # yaml-language-server: $schema=https://raw.githubusercontent.com/srikanthccv/otelcol-jsonschema/main/schema.json diff --git a/Sources/OTLPGRPC/Logging/OTLPGRPCLogExporter.swift b/Sources/OTLPGRPC/Logging/OTLPGRPCLogExporter.swift new file mode 100644 index 00000000..ed2ea6fe --- /dev/null +++ b/Sources/OTLPGRPC/Logging/OTLPGRPCLogExporter.swift @@ -0,0 +1,175 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Swift OTel open source project +// +// Copyright (c) 2024 Moritz Lang and the Swift OTel project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +import GRPC +import Logging +import NIO +import NIOHPACK +import NIOSSL +@_spi(Logging) import OTel +@_spi(Logging) import OTLPCore + +/// Exports logs to an OTel collector using OTLP/gRPC. +@_spi(Logging) +public final class OTLPGRPCLogExporter: OTelLogExporter { + private let configuration: OTLPGRPCLogExporterConfiguration + private let connection: ClientConnection + private let client: Opentelemetry_Proto_Collector_Logs_V1_LogsServiceAsyncClient + private let logger = Logger(label: String(describing: OTLPGRPCLogExporter.self)) + + public init( + configuration: OTLPGRPCLogExporterConfiguration, + group: EventLoopGroup = MultiThreadedEventLoopGroup.singleton, + requestLogger: Logger = ._otelDisabled, + backgroundActivityLogger: Logger = ._otelDisabled + ) { + self.configuration = configuration + + var connectionConfiguration = ClientConnection.Configuration.default( + target: .host(configuration.endpoint.host, port: configuration.endpoint.port), + eventLoopGroup: group + ) + + if configuration.endpoint.isInsecure { + logger.debug("Using insecure connection.", metadata: [ + "host": "\(configuration.endpoint.host)", + "port": "\(configuration.endpoint.port)", + ]) + } + + // TODO: Support OTEL_EXPORTER_OTLP_CERTIFICATE + // TODO: Support OTEL_EXPORTER_OTLP_CLIENT_KEY + // TODO: Support OTEL_EXPORTER_OTLP_CLIENT_CERTIFICATE + + var headers = configuration.headers + if !headers.isEmpty { + logger.trace("Configured custom request headers.", metadata: [ + "keys": .array(headers.map { "\($0.name)" }), + ]) + } + headers.replaceOrAdd(name: "user-agent", value: "OTel-OTLP-Exporter-Swift/\(OTelLibrary.version)") + + connectionConfiguration.backgroundActivityLogger = backgroundActivityLogger + connection = ClientConnection(configuration: connectionConfiguration) + + client = Opentelemetry_Proto_Collector_Logs_V1_LogsServiceAsyncClient( + channel: connection, + defaultCallOptions: .init(customMetadata: headers, logger: requestLogger) + ) + } + + public func export(_ batch: some Collection & Sendable) async throws { + if case .shutdown = connection.connectivity.state { + throw OTelLogExporterAlreadyShutDownError() + } + + guard !batch.isEmpty else { return } + + let request = Opentelemetry_Proto_Collector_Logs_V1_ExportLogsServiceRequest.with { request in + request.resourceLogs = [ + Opentelemetry_Proto_Logs_V1_ResourceLogs.with { resourceLog in + resourceLog.scopeLogs = [ + Opentelemetry_Proto_Logs_V1_ScopeLogs.with { scopeLog in + scopeLog.logRecords = batch.map { log in + Opentelemetry_Proto_Logs_V1_LogRecord.with { logRecord in + logRecord.timeUnixNano = log.timeNanosecondsSinceEpoch + logRecord.observedTimeUnixNano = log.timeNanosecondsSinceEpoch + logRecord.severityNumber = switch log.level { + case .trace: .trace + case .debug: .debug + case .info: .info + case .notice: .info4 + case .warning: .warn + case .error: .error + case .critical: .fatal + } + logRecord.severityText = switch log.level { + case .trace: "TRACE" + case .debug: "DEUG" + case .info: "INFO" + case .notice: "NOTICE" + case .warning: "WARNING" + case .error: "ERROR" + case .critical: "CRITICAL" + } + if let metadata = log.metadata { + logRecord.attributes = .init(metadata) + } + logRecord.body = .with { body in + body.stringValue = log.body + } + } + } + } + ] + } + ] + } + + _ = try await client.export(request) + } + + public func forceFlush() async throws {} + + public func shutdown() async { + let promise = connection.eventLoop.makePromise(of: Void.self) + + connection.closeGracefully(deadline: .now() + .milliseconds(500), promise: promise) + + try? await promise.futureResult.get() + } +} + +@_spi(Logging) +extension [Opentelemetry_Proto_Common_V1_KeyValue] { + package init(_ metadata: Logger.Metadata) { + self = metadata.map { key, value in + return .with { attribute in + attribute.key = key + attribute.value = .init(value) + } + } + } +} + +@_spi(Logging) +extension Opentelemetry_Proto_Common_V1_KeyValueList { + package init(_ metadata: Logger.Metadata) { + self = .with { keyValueList in + keyValueList.values = .init(metadata) + } + } +} + +@_spi(Logging) +extension Opentelemetry_Proto_Common_V1_AnyValue { + package init(_ value: Logger.Metadata.Value) { + self = .with { attributeValue in + attributeValue.value = switch value { + case .string(let string): .stringValue(string) + case .stringConvertible(let stringConvertible): .stringValue(stringConvertible.description) + case .dictionary(let metadata): .kvlistValue(.init(metadata)) + case .array(let values): .arrayValue(.init(values)) + } + } + } +} + +@_spi(Logging) +extension Opentelemetry_Proto_Common_V1_ArrayValue { + package init(_ values: [Logger.Metadata.Value]) { + self = .with { valueList in + valueList.values = values.map(Opentelemetry_Proto_Common_V1_AnyValue.init) + } + } +} diff --git a/Sources/OTLPGRPC/Logging/OTLPGRPCLogExporterConfiguration.swift b/Sources/OTLPGRPC/Logging/OTLPGRPCLogExporterConfiguration.swift new file mode 100644 index 00000000..277ed2c4 --- /dev/null +++ b/Sources/OTLPGRPC/Logging/OTLPGRPCLogExporterConfiguration.swift @@ -0,0 +1,70 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Swift OTel open source project +// +// Copyright (c) 2024 Moritz Lang and the Swift OTel project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +import NIOHPACK +import OTel + +@_spi(Logging) +public struct OTLPGRPCLogExporterConfiguration: Sendable { + let endpoint: OTLPGRPCEndpoint + let headers: HPACKHeaders + + /// Create a configuration for an ``OTLPGRPCMetricExporter``. + /// + /// - Parameters: + /// - environment: The environment variables. + /// - endpoint: An optional endpoint string that takes precedence over any environment values. Defaults to `localhost:4317` if `nil`. + /// - shouldUseAnInsecureConnection: Whether to use an insecure connection in the absence of a scheme inside an endpoint configuration value. + /// - headers: Optional headers that take precedence over any headers configured via environment values. + public init( + environment: OTelEnvironment, + endpoint: String? = nil, + shouldUseAnInsecureConnection: Bool? = nil, + headers: HPACKHeaders? = nil + ) throws { + let shouldUseAnInsecureConnection = try environment.value( + programmaticOverride: shouldUseAnInsecureConnection, + signalSpecificKey: "OTEL_EXPORTER_OTLP_LOGGING_INSECURE", + sharedKey: "OTEL_EXPORTER_OTLP_INSECURE" + ) ?? false + + let programmaticEndpoint: OTLPGRPCEndpoint? = try { + guard let endpoint else { return nil } + return try OTLPGRPCEndpoint(urlString: endpoint, isInsecure: shouldUseAnInsecureConnection) + }() + + self.endpoint = try environment.value( + programmaticOverride: programmaticEndpoint, + signalSpecificKey: "OTEL_EXPORTER_OTLP_LOGGING_ENDPOINT", + sharedKey: "OTEL_EXPORTER_OTLP_ENDPOINT", + transformValue: { value in + do { + return try OTLPGRPCEndpoint(urlString: value, isInsecure: shouldUseAnInsecureConnection) + } catch { + // TODO: Log + return nil + } + } + ) ?? .default + + self.headers = try environment.value( + programmaticOverride: headers, + signalSpecificKey: "OTEL_EXPORTER_OTLP_LOGGING_HEADERS", + sharedKey: "OTEL_EXPORTER_OTLP_HEADERS", + transformValue: { value in + guard let keyValuePairs = OTelEnvironment.headers(parsingValue: value) else { return nil } + return HPACKHeaders(keyValuePairs) + } + ) ?? [:] + } +} diff --git a/Sources/OTel/Logging/Exporting/OTelLogExporter.swift b/Sources/OTel/Logging/Exporting/OTelLogExporter.swift new file mode 100644 index 00000000..274dd597 --- /dev/null +++ b/Sources/OTel/Logging/Exporting/OTelLogExporter.swift @@ -0,0 +1,40 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Swift OTel open source project +// +// Copyright (c) 2024 Moritz Lang and the Swift OTel project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +/// A span exporter receives batches of processed spans to export them, e.g. by sending them over the network. +/// +/// [OpenTelemetry specification: Span exporter](https://github.com/open-telemetry/opentelemetry-specification/blob/v1.20.0/specification/trace/sdk.md#span-exporter) +@_spi(Logging) +public protocol OTelLogExporter: Sendable { + /// Export the given batch of spans. + /// + /// - Parameter batch: A batch of spans to export. + func export(_ batch: some Collection & Sendable) async throws + + /// Force the span exporter to export any previously received spans as soon as possible. + func forceFlush() async throws + + /// Shut down the span exporter. + ///a + /// This method gives exporters a chance to wrap up existing work such as finishing in-flight exports while not allowing new ones anymore. + /// Once this method returns, the exporter is to be considered shut down and further invocations of ``export(_:)`` + /// are expected to fail. + func shutdown() async +} + +/// An error indicating that a given exporter has already been shut down while receiving an additional batch of spans to export. +@_spi(Logging) +public struct OTelLogExporterAlreadyShutDownError: Error { + /// Initialize the error. + public init() {} +} diff --git a/Sources/OTel/Logging/OTLPLogDataModel.swift b/Sources/OTel/Logging/OTLPLogDataModel.swift new file mode 100644 index 00000000..7d40c53a --- /dev/null +++ b/Sources/OTel/Logging/OTLPLogDataModel.swift @@ -0,0 +1,22 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Swift OTel open source project +// +// Copyright (c) 2024 Moritz Lang and the Swift OTel project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +import Logging + +@_spi(Logging) +public struct OTelLog: Equatable, Sendable { + public let body: String + public let level: Logger.Level + public let metadata: Logger.Metadata? + public let timeNanosecondsSinceEpoch: UInt64 +} diff --git a/Sources/OTel/Logging/OTelLogger.swift b/Sources/OTel/Logging/OTelLogger.swift new file mode 100644 index 00000000..7f2ece35 --- /dev/null +++ b/Sources/OTel/Logging/OTelLogger.swift @@ -0,0 +1,87 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Swift OTel open source project +// +// Copyright (c) 2024 Moritz Lang and the Swift OTel project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +import AsyncAlgorithms +import Logging +import NIOConcurrencyHelpers +import ServiceLifecycle +import Logging +import Tracing + +@globalActor fileprivate actor OTelLoggingActor { + static let shared = OTelLoggingActor() +} + +@_spi(Logging) +@available(macOS 14, *) +public final class OTelStreamingLogger: Service, Sendable, LogHandler { + private let exporter: OTelLogExporter + var resource: OTelResource + private let logMessages: AsyncStream + private let logMessagesContinuation: AsyncStream.Continuation + public var metadata: Logging.Logger.Metadata + public var logLevel: Logging.Logger.Level + + public init( + resource: OTelResource, + exporter: OTelLogExporter, + logLevel: Logger.Level, + metadata: Logger.Metadata = [:] + ) { + self.resource = resource + self.exporter = exporter + self.logLevel = logLevel + self.metadata = metadata + (self.logMessages, self.logMessagesContinuation) = AsyncStream.makeStream(bufferingPolicy: .unbounded) + } + + public subscript(metadataKey key: String) -> Logging.Logger.Metadata.Value? { + get { metadata[key] } + set { metadata[key] = newValue } + } + + public func run() async throws { + await withDiscardingTaskGroup { taskGroup in + for await message in logMessages.cancelOnGracefulShutdown() { + taskGroup.addTask { + do { + try await self.exporter.export([message]) + } catch { + // TODO: Do we report this? What do we do? + } + } + } + } + } + + public func log( + level: Logger.Level, + message: Logger.Message, + metadata: Logger.Metadata?, + source: String, + file: String, + function: String, + line: UInt + ) { + let instant = DefaultTracerClock().now + + let message = OTelLog( + body: message.description, + level: level, + metadata: metadata, + timeNanosecondsSinceEpoch: instant.nanosecondsSinceEpoch + ) + + logMessagesContinuation.yield(message) + } +} From 1f969cd04909fa92fd0218085debeeb48573d53a Mon Sep 17 00:00:00 2001 From: Joannis Orlandos Date: Mon, 25 Mar 2024 11:38:17 +0100 Subject: [PATCH 2/3] Add LogProcessors, and replace OTelLogger with OTelLogHandler --- ...{OTelLogger.swift => OTelLogHandler.swift} | 35 +--- .../Batch/OTelBatchLogProcessor.swift | 153 ++++++++++++++++++ .../OTelBatchLogProcessorConfiguration.swift | 82 ++++++++++ .../Logging/Processing/OTelLogProcessor.swift | 31 ++++ .../OTelMultiplexLogProcessor.swift | 69 ++++++++ .../Processing/OTelNoOpLogProcessor.swift | 40 +++++ .../Processing/OTelSimpleLogProcessor.swift | 46 ++++++ .../Processing/OTelSpanProcessor.swift | 2 +- 8 files changed, 427 insertions(+), 31 deletions(-) rename Sources/OTel/Logging/{OTelLogger.swift => OTelLogHandler.swift} (56%) create mode 100644 Sources/OTel/Logging/Processing/Batch/OTelBatchLogProcessor.swift create mode 100644 Sources/OTel/Logging/Processing/Batch/OTelBatchLogProcessorConfiguration.swift create mode 100644 Sources/OTel/Logging/Processing/OTelLogProcessor.swift create mode 100644 Sources/OTel/Logging/Processing/OTelMultiplexLogProcessor.swift create mode 100644 Sources/OTel/Logging/Processing/OTelNoOpLogProcessor.swift create mode 100644 Sources/OTel/Logging/Processing/OTelSimpleLogProcessor.swift diff --git a/Sources/OTel/Logging/OTelLogger.swift b/Sources/OTel/Logging/OTelLogHandler.swift similarity index 56% rename from Sources/OTel/Logging/OTelLogger.swift rename to Sources/OTel/Logging/OTelLogHandler.swift index 7f2ece35..d6df9981 100644 --- a/Sources/OTel/Logging/OTelLogger.swift +++ b/Sources/OTel/Logging/OTelLogHandler.swift @@ -18,31 +18,20 @@ import ServiceLifecycle import Logging import Tracing -@globalActor fileprivate actor OTelLoggingActor { - static let shared = OTelLoggingActor() -} - @_spi(Logging) -@available(macOS 14, *) -public final class OTelStreamingLogger: Service, Sendable, LogHandler { - private let exporter: OTelLogExporter - var resource: OTelResource - private let logMessages: AsyncStream - private let logMessagesContinuation: AsyncStream.Continuation +public struct OTelLogHandler: Sendable, LogHandler { public var metadata: Logging.Logger.Metadata public var logLevel: Logging.Logger.Level + private let processor: any OTelLogProcessor public init( - resource: OTelResource, - exporter: OTelLogExporter, + processor: any OTelLogProcessor, logLevel: Logger.Level, metadata: Logger.Metadata = [:] ) { - self.resource = resource - self.exporter = exporter + self.processor = processor self.logLevel = logLevel self.metadata = metadata - (self.logMessages, self.logMessagesContinuation) = AsyncStream.makeStream(bufferingPolicy: .unbounded) } public subscript(metadataKey key: String) -> Logging.Logger.Metadata.Value? { @@ -50,20 +39,6 @@ public final class OTelStreamingLogger: Service, Sendable, LogHandler { set { metadata[key] = newValue } } - public func run() async throws { - await withDiscardingTaskGroup { taskGroup in - for await message in logMessages.cancelOnGracefulShutdown() { - taskGroup.addTask { - do { - try await self.exporter.export([message]) - } catch { - // TODO: Do we report this? What do we do? - } - } - } - } - } - public func log( level: Logger.Level, message: Logger.Message, @@ -82,6 +57,6 @@ public final class OTelStreamingLogger: Service, Sendable, LogHandler { timeNanosecondsSinceEpoch: instant.nanosecondsSinceEpoch ) - logMessagesContinuation.yield(message) + processor.onLog(message) } } diff --git a/Sources/OTel/Logging/Processing/Batch/OTelBatchLogProcessor.swift b/Sources/OTel/Logging/Processing/Batch/OTelBatchLogProcessor.swift new file mode 100644 index 00000000..9cd41259 --- /dev/null +++ b/Sources/OTel/Logging/Processing/Batch/OTelBatchLogProcessor.swift @@ -0,0 +1,153 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Swift OTel open source project +// +// Copyright (c) 2024 Moritz Lang and the Swift OTel project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +import AsyncAlgorithms +import DequeModule +import Logging +import ServiceLifecycle + +/// A log processor that batches logs and forwards them to a configured exporter. +/// +/// [OpenTelemetry Specification: Batching processor](https://github.com/open-telemetry/opentelemetry-specification/blob/v1.20.0/specification/logs/sdk.md#batching-processor) +@_spi(Logging) +public actor OTelBatchLogProcessor: + OTelLogProcessor, + Service, + CustomStringConvertible +where Clock.Duration == Duration +{ + public nonisolated let description = "OTelBatchLogProcessor" + + internal /* for testing */ private(set) var buffer: Deque + + private let exporter: Exporter + private let configuration: OTelBatchLogProcessorConfiguration + private let clock: Clock + private let logStream: AsyncStream + private let logContinuation: AsyncStream.Continuation + private let explicitTickStream: AsyncStream + private let explicitTick: AsyncStream.Continuation + + @_spi(Testing) + public init(exporter: Exporter, configuration: OTelBatchLogProcessorConfiguration, clock: Clock) { + self.exporter = exporter + self.configuration = configuration + self.clock = clock + + buffer = Deque(minimumCapacity: Int(configuration.maximumQueueSize)) + (explicitTickStream, explicitTick) = AsyncStream.makeStream() + (logStream, logContinuation) = AsyncStream.makeStream() + } + + nonisolated public func onLog(_ log: OTelLog) { + logContinuation.yield(log) + } + + private func _onLog(_ log: OTelLog) { + buffer.append(log) + + if self.buffer.count == self.configuration.maximumQueueSize { + self.explicitTick.yield() + } + } + + public func run() async throws { + let timerSequence = AsyncTimerSequence(interval: configuration.scheduleDelay, clock: clock).map { _ in } + let mergedSequence = merge(timerSequence, explicitTickStream).cancelOnGracefulShutdown() + + await withThrowingTaskGroup(of: Void.self) { taskGroup in + taskGroup.addTask { + for await log in self.logStream { + await self._onLog(log) + } + } + + taskGroup.addTask { + for try await _ in mergedSequence where !(await self.buffer.isEmpty) { + await self.tick() + } + } + + try? await taskGroup.next() + taskGroup.cancelAll() + } + + try? await forceFlush() + await exporter.shutdown() + } + + public func forceFlush() async throws { + let chunkSize = Int(configuration.maximumExportBatchSize) + let batches = stride(from: 0, to: buffer.count, by: chunkSize).map { + buffer[$0 ..< min($0 + Int(configuration.maximumExportBatchSize), buffer.count)] + } + + if !buffer.isEmpty { + buffer.removeAll() + + await withThrowingTaskGroup(of: Void.self) { group in + for batch in batches { + group.addTask { await self.export(batch) } + } + + group.addTask { + try await Task.sleep(for: self.configuration.exportTimeout, clock: self.clock) + throw CancellationError() + } + + try? await group.next() + group.cancelAll() + } + } + + try await exporter.forceFlush() + } + + private func tick() async { + let batch = buffer.prefix(Int(configuration.maximumExportBatchSize)) + buffer.removeFirst(batch.count) + + await withThrowingTaskGroup(of: Void.self) { group in + group.addTask { await self.export(batch) } + group.addTask { + try await Task.sleep(for: self.configuration.exportTimeout, clock: self.clock) + throw CancellationError() + } + + try? await group.next() + group.cancelAll() + } + } + + private func export(_ batch: some Collection & Sendable) async { + do { + try await exporter.export(batch) + } catch is CancellationError { + // No-op + } catch { + // TODO: Should we emit this error somewhere? + } + } +} + +@_spi(Logging) +extension OTelBatchLogProcessor where Clock == ContinuousClock { + /// Create a batch log processor exporting log batches via the given log exporter. + /// + /// - Parameters: + /// - exporter: The log exporter to receive batched logs to export. + /// - configuration: Further configuration parameters to tweak the batching behavior. + public init(exporter: Exporter, configuration: OTelBatchLogProcessorConfiguration) { + self.init(exporter: exporter, configuration: configuration, clock: .continuous) + } +} diff --git a/Sources/OTel/Logging/Processing/Batch/OTelBatchLogProcessorConfiguration.swift b/Sources/OTel/Logging/Processing/Batch/OTelBatchLogProcessorConfiguration.swift new file mode 100644 index 00000000..65718492 --- /dev/null +++ b/Sources/OTel/Logging/Processing/Batch/OTelBatchLogProcessorConfiguration.swift @@ -0,0 +1,82 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Swift OTel open source project +// +// Copyright (c) 2024 Moritz Lang and the Swift OTel project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +/// The configuration options for an ``OTelBatchLogProcessor``. +@_spi(Logging) +public struct OTelBatchLogProcessorConfiguration: Sendable { + /// The maximum queue size. + /// + /// - Warning: After this size is reached log will be dropped. + public var maximumQueueSize: UInt + + /// The maximum delay between two consecutive log exports. + public var scheduleDelay: Duration + + /// The maximum batch size of each export. + /// + /// - Note: If the queue reaches this size, a batch will be exported even if ``scheduleDelay`` has not elapsed. + public var maximumExportBatchSize: UInt + + /// The duration a single export can run until it is cancelled. + public var exportTimeout: Duration + + /// Create a batch log processor configuration. + /// + /// - Parameters: + /// - environment: The environment variables. + /// - maximumQueueSize: A maximum queue size used even if `OTEL_BSP_MAX_QUEUE_SIZE` is set. Defaults to `2048` if both are `nil`. + /// - scheduleDelay: A schedule delay used even if `OTEL_BSP_SCHEDULE_DELAY` is set. Defaults to `5` seconds if both are `nil`. + /// - maximumExportBatchSize: A maximum export batch size used even if `OTEL_BSP_MAX_EXPORT_BATCH_SIZE` is set. Defaults to `512` if both are `nil`. + /// - exportTimeout: An export timeout used even if `OTEL_BSP_EXPORT_TIMEOUT` is set. Defaults to `30` seconds if both are `nil`. + public init( + environment: OTelEnvironment, + maximumQueueSize: UInt? = nil, + scheduleDelay: Duration? = nil, + maximumExportBatchSize: UInt? = nil, + exportTimeout: Duration? = nil + ) { + self.maximumQueueSize = environment.requiredValue( + programmaticOverride: maximumQueueSize, + key: "OTEL_BSP_MAX_QUEUE_SIZE", + defaultValue: 2048, + transformValue: UInt.init + ) + + self.scheduleDelay = environment.requiredValue( + programmaticOverride: scheduleDelay, + key: "OTEL_BSP_SCHEDULE_DELAY", + defaultValue: .seconds(5), + transformValue: { + guard let milliseconds = UInt($0) else { return nil } + return Duration.milliseconds(milliseconds) + } + ) + + self.maximumExportBatchSize = environment.requiredValue( + programmaticOverride: maximumExportBatchSize, + key: "OTEL_BSP_MAX_EXPORT_BATCH_SIZE", + defaultValue: 512, + transformValue: UInt.init + ) + + self.exportTimeout = environment.requiredValue( + programmaticOverride: exportTimeout, + key: "OTEL_BSP_EXPORT_TIMEOUT", + defaultValue: .seconds(30), + transformValue: { + guard let milliseconds = UInt($0) else { return nil } + return Duration.milliseconds(milliseconds) + } + ) + } +} diff --git a/Sources/OTel/Logging/Processing/OTelLogProcessor.swift b/Sources/OTel/Logging/Processing/OTelLogProcessor.swift new file mode 100644 index 00000000..45e26730 --- /dev/null +++ b/Sources/OTel/Logging/Processing/OTelLogProcessor.swift @@ -0,0 +1,31 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Swift OTel open source project +// +// Copyright (c) 2024 Moritz Lang and the Swift OTel project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +import ServiceLifecycle +import ServiceContextModule + +/// Log processors allow for processing logs throughout their lifetime via ``onStart(_:parentContext:)`` and ``onEnd(_:)`` calls. +/// Usually, log processors will forward logs to a configurable ``OTelLogExporter``. +/// +/// [OpenTelemetry specification: LogRecord processor](https://github.com/open-telemetry/opentelemetry-specification/blob/v1.20.0/specification/logs/sdk.md#logrecordprocessor) +/// +/// ### Implementation Notes +/// +/// On shutdown, processors forwarding logs to an ``OTelLogExporter`` MUST shutdown that exporter. +@_spi(Logging) +public protocol OTelLogProcessor: Service & Sendable { + func onLog(_ log: OTelLog) + + /// Force log processors that batch logs to flush immediately. + func forceFlush() async throws +} diff --git a/Sources/OTel/Logging/Processing/OTelMultiplexLogProcessor.swift b/Sources/OTel/Logging/Processing/OTelMultiplexLogProcessor.swift new file mode 100644 index 00000000..003d44c8 --- /dev/null +++ b/Sources/OTel/Logging/Processing/OTelMultiplexLogProcessor.swift @@ -0,0 +1,69 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Swift OTel open source project +// +// Copyright (c) 2024 Moritz Lang and the Swift OTel project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +import ServiceContextModule +import ServiceLifecycle + +/// A pseudo-``OTelLogProcessor`` that may be used to process using multiple other ``OTelLogProcessor``s. +@_spi(Logging) +public actor OTelMultiplexLogProcessor: OTelLogProcessor { + private let processors: [any OTelLogProcessor] + private let shutdownStream: AsyncStream + private let shutdownContinuation: AsyncStream.Continuation + + /// Create an ``OTelMultiplexLogProcessor``. + /// + /// - Parameter processors: An array of ``OTelLogProcessor``s, each of which will be invoked on log events + /// Processors are called sequentially and the order of this array defines the order in which they're being called. + public init(processors: [any OTelLogProcessor]) { + self.processors = processors + (shutdownStream, shutdownContinuation) = AsyncStream.makeStream() + } + + public func run() async throws { + await withThrowingTaskGroup(of: Void.self) { group in + group.addTask { + var shutdowns = self.shutdownStream.makeAsyncIterator() + await shutdowns.next() + throw CancellationError() + } + + for processor in processors { + group.addTask { try await processor.run() } + } + + await withGracefulShutdownHandler { + try? await group.next() + group.cancelAll() + } onGracefulShutdown: { + self.shutdownContinuation.yield() + } + } + } + + nonisolated public func onLog(_ log: OTelLog) { + for processor in processors { + processor.onLog(log) + } + } + + public func forceFlush() async throws { + try await withThrowingTaskGroup(of: Void.self) { group in + for processor in processors { + group.addTask { try await processor.forceFlush() } + } + + try await group.waitForAll() + } + } +} diff --git a/Sources/OTel/Logging/Processing/OTelNoOpLogProcessor.swift b/Sources/OTel/Logging/Processing/OTelNoOpLogProcessor.swift new file mode 100644 index 00000000..d0c6e900 --- /dev/null +++ b/Sources/OTel/Logging/Processing/OTelNoOpLogProcessor.swift @@ -0,0 +1,40 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Swift OTel open source project +// +// Copyright (c) 2024 Moritz Lang and the Swift OTel project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +import ServiceContextModule + +/// A span processor that ignores all operations, used when no spans should be processed. +@_spi(Logging) +public struct OTelNoOpLogProcessor: OTelLogProcessor, CustomStringConvertible { + public let description = "OTelNoOpSpanProcessor" + + private let stream: AsyncStream + private let continuation: AsyncStream.Continuation + + /// Initialize a no-op span processor. + public init() { + (stream, continuation) = AsyncStream.makeStream() + } + + public func run() async { + for await _ in stream.cancelOnGracefulShutdown() {} + } + + public func onLog(_ log: OTelLog) { + // no-op + } + + public func forceFlush() async throws { + // no-op + } +} diff --git a/Sources/OTel/Logging/Processing/OTelSimpleLogProcessor.swift b/Sources/OTel/Logging/Processing/OTelSimpleLogProcessor.swift new file mode 100644 index 00000000..bda0c1bd --- /dev/null +++ b/Sources/OTel/Logging/Processing/OTelSimpleLogProcessor.swift @@ -0,0 +1,46 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Swift OTel open source project +// +// Copyright (c) 2023 Moritz Lang and the Swift OTel project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +@_spi(Logging) +public struct OTelSimpleLogProcessor: OTelLogProcessor { + private let exporter: Exporter + private let stream: AsyncStream + private let continuation: AsyncStream.Continuation + + public init(exporter: Exporter) { + self.exporter = exporter + (stream, continuation) = AsyncStream.makeStream() + } + + public func run() async throws { + for try await log in stream.cancelOnGracefulShutdown() { + do { + try await exporter.export([log]) + } catch { + // simple log processor does not attempt retries + } + } + } + + public func onLog(_ log: OTelLog) { + continuation.yield(log) + } + + public func forceFlush() async throws { + try await exporter.forceFlush() + } + + public func shutdown() async throws { + await exporter.shutdown() + } +} diff --git a/Sources/OTel/Tracing/Processing/OTelSpanProcessor.swift b/Sources/OTel/Tracing/Processing/OTelSpanProcessor.swift index 7f5b7ea0..cd7520bc 100644 --- a/Sources/OTel/Tracing/Processing/OTelSpanProcessor.swift +++ b/Sources/OTel/Tracing/Processing/OTelSpanProcessor.swift @@ -14,7 +14,7 @@ import ServiceContextModule import ServiceLifecycle -/// Span processor allow for processing spans throught their lifetime via ``onStart(_:parentContext:)`` and ``onEnd(_:)`` calls. +/// Span processors allow for processing spans throughout their lifetime via ``onStart(_:parentContext:)`` and ``onEnd(_:)`` calls. /// Usually, span processors will forward ended spans to a configurable ``OTelSpanExporter``. /// /// [OpenTelemetry specification: Span processor](https://github.com/open-telemetry/opentelemetry-specification/blob/v1.20.0/specification/trace/sdk.md#span-processor) From fd24bc8549e7a2e00331b3ba76ccd2000d331f21 Mon Sep 17 00:00:00 2001 From: Joannis Orlandos Date: Mon, 25 Mar 2024 14:10:15 +0100 Subject: [PATCH 3/3] Rename 'OTelLog' to 'OTelLogEntry' --- ...r.swift => OTLPGRPCLogEntryExporter.swift} | 12 ++++----- ...LPGRPCLogEntryExporterConfiguration.swift} | 4 +-- ...orter.swift => OTelLogEntryExporter.swift} | 6 ++--- Sources/OTel/Logging/OTLPLogDataModel.swift | 2 +- Sources/OTel/Logging/OTelLogHandler.swift | 6 ++--- ...swift => OTelBatchLogEntryProcessor.swift} | 26 +++++++++---------- ...BatchLogEntryProcessorConfiguration.swift} | 4 +-- ...ssor.swift => OTelLogEntryProcessor.swift} | 8 +++--- ...t => OTelMultiplexLogEntryProcessor.swift} | 14 +++++----- ....swift => OTelNoOpLogEntryProcessor.swift} | 4 +-- ...wift => OTelSimpleLogEntryProcessor.swift} | 8 +++--- 11 files changed, 47 insertions(+), 47 deletions(-) rename Sources/OTLPGRPC/Logging/{OTLPGRPCLogExporter.swift => OTLPGRPCLogEntryExporter.swift} (94%) rename Sources/OTLPGRPC/Logging/{OTLPGRPCLogExporterConfiguration.swift => OTLPGRPCLogEntryExporterConfiguration.swift} (95%) rename Sources/OTel/Logging/Exporting/{OTelLogExporter.swift => OTelLogEntryExporter.swift} (88%) rename Sources/OTel/Logging/Processing/Batch/{OTelBatchLogProcessor.swift => OTelBatchLogEntryProcessor.swift} (83%) rename Sources/OTel/Logging/Processing/Batch/{OTelBatchLogProcessorConfiguration.swift => OTelBatchLogEntryProcessorConfiguration.swift} (95%) rename Sources/OTel/Logging/Processing/{OTelLogProcessor.swift => OTelLogEntryProcessor.swift} (81%) rename Sources/OTel/Logging/Processing/{OTelMultiplexLogProcessor.swift => OTelMultiplexLogEntryProcessor.swift} (78%) rename Sources/OTel/Logging/Processing/{OTelNoOpLogProcessor.swift => OTelNoOpLogEntryProcessor.swift} (88%) rename Sources/OTel/Logging/Processing/{OTelSimpleLogProcessor.swift => OTelSimpleLogEntryProcessor.swift} (81%) diff --git a/Sources/OTLPGRPC/Logging/OTLPGRPCLogExporter.swift b/Sources/OTLPGRPC/Logging/OTLPGRPCLogEntryExporter.swift similarity index 94% rename from Sources/OTLPGRPC/Logging/OTLPGRPCLogExporter.swift rename to Sources/OTLPGRPC/Logging/OTLPGRPCLogEntryExporter.swift index ed2ea6fe..863b43aa 100644 --- a/Sources/OTLPGRPC/Logging/OTLPGRPCLogExporter.swift +++ b/Sources/OTLPGRPC/Logging/OTLPGRPCLogEntryExporter.swift @@ -21,14 +21,14 @@ import NIOSSL /// Exports logs to an OTel collector using OTLP/gRPC. @_spi(Logging) -public final class OTLPGRPCLogExporter: OTelLogExporter { - private let configuration: OTLPGRPCLogExporterConfiguration +public final class OTLPGRPCLogEntryExporter: OTelLogEntryExporter { + private let configuration: OTLPGRPCLogEntryExporterConfiguration private let connection: ClientConnection private let client: Opentelemetry_Proto_Collector_Logs_V1_LogsServiceAsyncClient - private let logger = Logger(label: String(describing: OTLPGRPCLogExporter.self)) + private let logger = Logger(label: String(describing: OTLPGRPCLogEntryExporter.self)) public init( - configuration: OTLPGRPCLogExporterConfiguration, + configuration: OTLPGRPCLogEntryExporterConfiguration, group: EventLoopGroup = MultiThreadedEventLoopGroup.singleton, requestLogger: Logger = ._otelDisabled, backgroundActivityLogger: Logger = ._otelDisabled @@ -68,9 +68,9 @@ public final class OTLPGRPCLogExporter: OTelLogExporter { ) } - public func export(_ batch: some Collection & Sendable) async throws { + public func export(_ batch: some Collection & Sendable) async throws { if case .shutdown = connection.connectivity.state { - throw OTelLogExporterAlreadyShutDownError() + throw OTelLogEntryExporterAlreadyShutDownError() } guard !batch.isEmpty else { return } diff --git a/Sources/OTLPGRPC/Logging/OTLPGRPCLogExporterConfiguration.swift b/Sources/OTLPGRPC/Logging/OTLPGRPCLogEntryExporterConfiguration.swift similarity index 95% rename from Sources/OTLPGRPC/Logging/OTLPGRPCLogExporterConfiguration.swift rename to Sources/OTLPGRPC/Logging/OTLPGRPCLogEntryExporterConfiguration.swift index 277ed2c4..ae1e3919 100644 --- a/Sources/OTLPGRPC/Logging/OTLPGRPCLogExporterConfiguration.swift +++ b/Sources/OTLPGRPC/Logging/OTLPGRPCLogEntryExporterConfiguration.swift @@ -15,11 +15,11 @@ import NIOHPACK import OTel @_spi(Logging) -public struct OTLPGRPCLogExporterConfiguration: Sendable { +public struct OTLPGRPCLogEntryExporterConfiguration: Sendable { let endpoint: OTLPGRPCEndpoint let headers: HPACKHeaders - /// Create a configuration for an ``OTLPGRPCMetricExporter``. + /// Create a configuration for an ``OTLPGRPCLogEntryExporter``. /// /// - Parameters: /// - environment: The environment variables. diff --git a/Sources/OTel/Logging/Exporting/OTelLogExporter.swift b/Sources/OTel/Logging/Exporting/OTelLogEntryExporter.swift similarity index 88% rename from Sources/OTel/Logging/Exporting/OTelLogExporter.swift rename to Sources/OTel/Logging/Exporting/OTelLogEntryExporter.swift index 274dd597..e8d4d39b 100644 --- a/Sources/OTel/Logging/Exporting/OTelLogExporter.swift +++ b/Sources/OTel/Logging/Exporting/OTelLogEntryExporter.swift @@ -15,11 +15,11 @@ /// /// [OpenTelemetry specification: Span exporter](https://github.com/open-telemetry/opentelemetry-specification/blob/v1.20.0/specification/trace/sdk.md#span-exporter) @_spi(Logging) -public protocol OTelLogExporter: Sendable { +public protocol OTelLogEntryExporter: Sendable { /// Export the given batch of spans. /// /// - Parameter batch: A batch of spans to export. - func export(_ batch: some Collection & Sendable) async throws + func export(_ batch: some Collection & Sendable) async throws /// Force the span exporter to export any previously received spans as soon as possible. func forceFlush() async throws @@ -34,7 +34,7 @@ public protocol OTelLogExporter: Sendable { /// An error indicating that a given exporter has already been shut down while receiving an additional batch of spans to export. @_spi(Logging) -public struct OTelLogExporterAlreadyShutDownError: Error { +public struct OTelLogEntryExporterAlreadyShutDownError: Error { /// Initialize the error. public init() {} } diff --git a/Sources/OTel/Logging/OTLPLogDataModel.swift b/Sources/OTel/Logging/OTLPLogDataModel.swift index 7d40c53a..7e8fb6c1 100644 --- a/Sources/OTel/Logging/OTLPLogDataModel.swift +++ b/Sources/OTel/Logging/OTLPLogDataModel.swift @@ -14,7 +14,7 @@ import Logging @_spi(Logging) -public struct OTelLog: Equatable, Sendable { +public struct OTelLogEntry: Equatable, Sendable { public let body: String public let level: Logger.Level public let metadata: Logger.Metadata? diff --git a/Sources/OTel/Logging/OTelLogHandler.swift b/Sources/OTel/Logging/OTelLogHandler.swift index d6df9981..0d6c299d 100644 --- a/Sources/OTel/Logging/OTelLogHandler.swift +++ b/Sources/OTel/Logging/OTelLogHandler.swift @@ -22,10 +22,10 @@ import Tracing public struct OTelLogHandler: Sendable, LogHandler { public var metadata: Logging.Logger.Metadata public var logLevel: Logging.Logger.Level - private let processor: any OTelLogProcessor + private let processor: any OTelLogEntryProcessor public init( - processor: any OTelLogProcessor, + processor: any OTelLogEntryProcessor, logLevel: Logger.Level, metadata: Logger.Metadata = [:] ) { @@ -50,7 +50,7 @@ public struct OTelLogHandler: Sendable, LogHandler { ) { let instant = DefaultTracerClock().now - let message = OTelLog( + let message = OTelLogEntry( body: message.description, level: level, metadata: metadata, diff --git a/Sources/OTel/Logging/Processing/Batch/OTelBatchLogProcessor.swift b/Sources/OTel/Logging/Processing/Batch/OTelBatchLogEntryProcessor.swift similarity index 83% rename from Sources/OTel/Logging/Processing/Batch/OTelBatchLogProcessor.swift rename to Sources/OTel/Logging/Processing/Batch/OTelBatchLogEntryProcessor.swift index 9cd41259..6972c6c3 100644 --- a/Sources/OTel/Logging/Processing/Batch/OTelBatchLogProcessor.swift +++ b/Sources/OTel/Logging/Processing/Batch/OTelBatchLogEntryProcessor.swift @@ -20,26 +20,26 @@ import ServiceLifecycle /// /// [OpenTelemetry Specification: Batching processor](https://github.com/open-telemetry/opentelemetry-specification/blob/v1.20.0/specification/logs/sdk.md#batching-processor) @_spi(Logging) -public actor OTelBatchLogProcessor: - OTelLogProcessor, +public actor OTelBatchLogEntryProcessor: + OTelLogEntryProcessor, Service, CustomStringConvertible where Clock.Duration == Duration { - public nonisolated let description = "OTelBatchLogProcessor" + public nonisolated let description = "OTelBatchLogEntryProcessor" - internal /* for testing */ private(set) var buffer: Deque + internal /* for testing */ private(set) var buffer: Deque private let exporter: Exporter - private let configuration: OTelBatchLogProcessorConfiguration + private let configuration: OTelBatchLogEntryProcessorConfiguration private let clock: Clock - private let logStream: AsyncStream - private let logContinuation: AsyncStream.Continuation + private let logStream: AsyncStream + private let logContinuation: AsyncStream.Continuation private let explicitTickStream: AsyncStream private let explicitTick: AsyncStream.Continuation @_spi(Testing) - public init(exporter: Exporter, configuration: OTelBatchLogProcessorConfiguration, clock: Clock) { + public init(exporter: Exporter, configuration: OTelBatchLogEntryProcessorConfiguration, clock: Clock) { self.exporter = exporter self.configuration = configuration self.clock = clock @@ -49,11 +49,11 @@ where Clock.Duration == Duration (logStream, logContinuation) = AsyncStream.makeStream() } - nonisolated public func onLog(_ log: OTelLog) { + nonisolated public func onLog(_ log: OTelLogEntry) { logContinuation.yield(log) } - private func _onLog(_ log: OTelLog) { + private func _onLog(_ log: OTelLogEntry) { buffer.append(log) if self.buffer.count == self.configuration.maximumQueueSize { @@ -129,7 +129,7 @@ where Clock.Duration == Duration } } - private func export(_ batch: some Collection & Sendable) async { + private func export(_ batch: some Collection & Sendable) async { do { try await exporter.export(batch) } catch is CancellationError { @@ -141,13 +141,13 @@ where Clock.Duration == Duration } @_spi(Logging) -extension OTelBatchLogProcessor where Clock == ContinuousClock { +extension OTelBatchLogEntryProcessor where Clock == ContinuousClock { /// Create a batch log processor exporting log batches via the given log exporter. /// /// - Parameters: /// - exporter: The log exporter to receive batched logs to export. /// - configuration: Further configuration parameters to tweak the batching behavior. - public init(exporter: Exporter, configuration: OTelBatchLogProcessorConfiguration) { + public init(exporter: Exporter, configuration: OTelBatchLogEntryProcessorConfiguration) { self.init(exporter: exporter, configuration: configuration, clock: .continuous) } } diff --git a/Sources/OTel/Logging/Processing/Batch/OTelBatchLogProcessorConfiguration.swift b/Sources/OTel/Logging/Processing/Batch/OTelBatchLogEntryProcessorConfiguration.swift similarity index 95% rename from Sources/OTel/Logging/Processing/Batch/OTelBatchLogProcessorConfiguration.swift rename to Sources/OTel/Logging/Processing/Batch/OTelBatchLogEntryProcessorConfiguration.swift index 65718492..59435b60 100644 --- a/Sources/OTel/Logging/Processing/Batch/OTelBatchLogProcessorConfiguration.swift +++ b/Sources/OTel/Logging/Processing/Batch/OTelBatchLogEntryProcessorConfiguration.swift @@ -11,9 +11,9 @@ // //===----------------------------------------------------------------------===// -/// The configuration options for an ``OTelBatchLogProcessor``. +/// The configuration options for an ``OTelBatchLogEntryProcessor``. @_spi(Logging) -public struct OTelBatchLogProcessorConfiguration: Sendable { +public struct OTelBatchLogEntryProcessorConfiguration: Sendable { /// The maximum queue size. /// /// - Warning: After this size is reached log will be dropped. diff --git a/Sources/OTel/Logging/Processing/OTelLogProcessor.swift b/Sources/OTel/Logging/Processing/OTelLogEntryProcessor.swift similarity index 81% rename from Sources/OTel/Logging/Processing/OTelLogProcessor.swift rename to Sources/OTel/Logging/Processing/OTelLogEntryProcessor.swift index 45e26730..e789899e 100644 --- a/Sources/OTel/Logging/Processing/OTelLogProcessor.swift +++ b/Sources/OTel/Logging/Processing/OTelLogEntryProcessor.swift @@ -15,16 +15,16 @@ import ServiceLifecycle import ServiceContextModule /// Log processors allow for processing logs throughout their lifetime via ``onStart(_:parentContext:)`` and ``onEnd(_:)`` calls. -/// Usually, log processors will forward logs to a configurable ``OTelLogExporter``. +/// Usually, log processors will forward logs to a configurable ``OTelLogEntryExporter``. /// /// [OpenTelemetry specification: LogRecord processor](https://github.com/open-telemetry/opentelemetry-specification/blob/v1.20.0/specification/logs/sdk.md#logrecordprocessor) /// /// ### Implementation Notes /// -/// On shutdown, processors forwarding logs to an ``OTelLogExporter`` MUST shutdown that exporter. +/// On shutdown, processors forwarding logs to an ``OTelLogEntryExporter`` MUST shutdown that exporter. @_spi(Logging) -public protocol OTelLogProcessor: Service & Sendable { - func onLog(_ log: OTelLog) +public protocol OTelLogEntryProcessor: Service & Sendable { + func onLog(_ log: OTelLogEntry) /// Force log processors that batch logs to flush immediately. func forceFlush() async throws diff --git a/Sources/OTel/Logging/Processing/OTelMultiplexLogProcessor.swift b/Sources/OTel/Logging/Processing/OTelMultiplexLogEntryProcessor.swift similarity index 78% rename from Sources/OTel/Logging/Processing/OTelMultiplexLogProcessor.swift rename to Sources/OTel/Logging/Processing/OTelMultiplexLogEntryProcessor.swift index 003d44c8..0d714d09 100644 --- a/Sources/OTel/Logging/Processing/OTelMultiplexLogProcessor.swift +++ b/Sources/OTel/Logging/Processing/OTelMultiplexLogEntryProcessor.swift @@ -14,18 +14,18 @@ import ServiceContextModule import ServiceLifecycle -/// A pseudo-``OTelLogProcessor`` that may be used to process using multiple other ``OTelLogProcessor``s. +/// A pseudo-``OTelLogEntryProcessor`` that may be used to process using multiple other ``OTelLogEntryProcessor``s. @_spi(Logging) -public actor OTelMultiplexLogProcessor: OTelLogProcessor { - private let processors: [any OTelLogProcessor] +public actor OTelMultiplexLogEntryProcessor: OTelLogEntryProcessor { + private let processors: [any OTelLogEntryProcessor] private let shutdownStream: AsyncStream private let shutdownContinuation: AsyncStream.Continuation - /// Create an ``OTelMultiplexLogProcessor``. + /// Create an ``OTelMultiplexLogEntryProcessor``. /// - /// - Parameter processors: An array of ``OTelLogProcessor``s, each of which will be invoked on log events + /// - Parameter processors: An array of ``OTelLogEntryProcessor``s, each of which will be invoked on log events /// Processors are called sequentially and the order of this array defines the order in which they're being called. - public init(processors: [any OTelLogProcessor]) { + public init(processors: [any OTelLogEntryProcessor]) { self.processors = processors (shutdownStream, shutdownContinuation) = AsyncStream.makeStream() } @@ -51,7 +51,7 @@ public actor OTelMultiplexLogProcessor: OTelLogProcessor { } } - nonisolated public func onLog(_ log: OTelLog) { + nonisolated public func onLog(_ log: OTelLogEntry) { for processor in processors { processor.onLog(log) } diff --git a/Sources/OTel/Logging/Processing/OTelNoOpLogProcessor.swift b/Sources/OTel/Logging/Processing/OTelNoOpLogEntryProcessor.swift similarity index 88% rename from Sources/OTel/Logging/Processing/OTelNoOpLogProcessor.swift rename to Sources/OTel/Logging/Processing/OTelNoOpLogEntryProcessor.swift index d0c6e900..61df0f15 100644 --- a/Sources/OTel/Logging/Processing/OTelNoOpLogProcessor.swift +++ b/Sources/OTel/Logging/Processing/OTelNoOpLogEntryProcessor.swift @@ -15,7 +15,7 @@ import ServiceContextModule /// A span processor that ignores all operations, used when no spans should be processed. @_spi(Logging) -public struct OTelNoOpLogProcessor: OTelLogProcessor, CustomStringConvertible { +public struct OTelNoOpLogEntryProcessor: OTelLogEntryProcessor, CustomStringConvertible { public let description = "OTelNoOpSpanProcessor" private let stream: AsyncStream @@ -30,7 +30,7 @@ public struct OTelNoOpLogProcessor: OTelLogProcessor, CustomStringConvertible { for await _ in stream.cancelOnGracefulShutdown() {} } - public func onLog(_ log: OTelLog) { + public func onLog(_ log: OTelLogEntry) { // no-op } diff --git a/Sources/OTel/Logging/Processing/OTelSimpleLogProcessor.swift b/Sources/OTel/Logging/Processing/OTelSimpleLogEntryProcessor.swift similarity index 81% rename from Sources/OTel/Logging/Processing/OTelSimpleLogProcessor.swift rename to Sources/OTel/Logging/Processing/OTelSimpleLogEntryProcessor.swift index bda0c1bd..bd682855 100644 --- a/Sources/OTel/Logging/Processing/OTelSimpleLogProcessor.swift +++ b/Sources/OTel/Logging/Processing/OTelSimpleLogEntryProcessor.swift @@ -12,10 +12,10 @@ //===----------------------------------------------------------------------===// @_spi(Logging) -public struct OTelSimpleLogProcessor: OTelLogProcessor { +public struct OTelSimpleLogEntryProcessor: OTelLogEntryProcessor { private let exporter: Exporter - private let stream: AsyncStream - private let continuation: AsyncStream.Continuation + private let stream: AsyncStream + private let continuation: AsyncStream.Continuation public init(exporter: Exporter) { self.exporter = exporter @@ -32,7 +32,7 @@ public struct OTelSimpleLogProcessor: OTelLogProcesso } } - public func onLog(_ log: OTelLog) { + public func onLog(_ log: OTelLogEntry) { continuation.yield(log) }