Skip to content

Commit

Permalink
Merge pull request #114 from amzn/thread_processing_options
Browse files Browse the repository at this point in the history
Add option to process requests on the cooperative or custom thread pool
  • Loading branch information
tachyonics committed Jan 16, 2023
2 parents cf94338 + 386a6e1 commit 497d912
Show file tree
Hide file tree
Showing 11 changed files with 201 additions and 123 deletions.
8 changes: 4 additions & 4 deletions .github/workflows/swift.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,11 @@ jobs:
name: Swift ${{ matrix.swift }} on ${{ matrix.os }}
strategy:
matrix:
os: [ubuntu-20.04, ubuntu-18.04]
swift: ["5.7"]
os: [ubuntu-22.04, ubuntu-20.04]
swift: ["5.7.2"]
runs-on: ${{ matrix.os }}
steps:
- uses: swift-actions/setup-swift@v1.18.0
- uses: swift-actions/setup-swift@v1.21.0
with:
swift-version: ${{ matrix.swift }}
- uses: actions/checkout@v2
Expand All @@ -31,7 +31,7 @@ jobs:
swift: ["5.6.3", "5.5.3"]
runs-on: ${{ matrix.os }}
steps:
- uses: swift-actions/setup-swift@v1.18.0
- uses: swift-actions/setup-swift@v1.21.0
with:
swift-version: ${{ matrix.swift }}
- uses: actions/checkout@v2
Expand Down
14 changes: 4 additions & 10 deletions Sources/SmokeOperations/OperationHandler.swift
Original file line number Diff line number Diff line change
Expand Up @@ -221,18 +221,12 @@ public struct OperationHandler<ContextType, RequestHeadType, InvocationReporting
// To avoid retaining the original body `Data` object, `body` should not be referenced in this
// invocation.
if ignoreInvocationStrategy {
inputDecodeResult.handle(
requestHead: requestHead,
context: context,
responseHandler: responseHandler,
operationDelegate: operationDelegate)
inputDecodeResult.handle(requestHead: requestHead, context: context,
responseHandler: responseHandler, operationDelegate: operationDelegate)
} else {
invocationStrategy.invoke {
inputDecodeResult.handle(
requestHead: requestHead,
context: context,
responseHandler: responseHandler,
operationDelegate: operationDelegate)
inputDecodeResult.handle(requestHead: requestHead, context: context,
responseHandler: responseHandler, operationDelegate: operationDelegate)
}
}
}
Expand Down
64 changes: 24 additions & 40 deletions Sources/SmokeOperations/SmokeOperationReporting.swift
Original file line number Diff line number Diff line change
Expand Up @@ -48,32 +48,36 @@ public struct SmokeOperationReporting {
configuration: SmokeReportingConfiguration<OperationIdentifer>) {
let operationName = request.description

func getCounter(metricName: String) -> Counter {
let counterDimensions = [(namespaceDimension, serverName),
(operationNameDimension, operationName),
(metricNameDimension, metricName)]
return Counter(label: "\(serverName).\(operationName).\(metricName)",
dimensions: counterDimensions)
}

func getTimer(metricName: String) -> Timer {
let timerDimensions = [(namespaceDimension, serverName),
(operationNameDimension, operationName),
(metricNameDimension, metricName)]
return Timer(label: "\(serverName).\(operationName).\(metricName)",
dimensions: timerDimensions)
}

if configuration.reportSuccessForRequest(request) {
let successCounterDimensions = [(namespaceDimension, serverName),
(operationNameDimension, operationName),
(metricNameDimension, successCountMetric)]
successCounter = Counter(label: "\(serverName).\(operationName).\(successCountMetric)",
dimensions: successCounterDimensions)
successCounter = getCounter(metricName: successCountMetric)
} else {
successCounter = nil
}

if configuration.reportFailure5XXForRequest(request) {
let failure5XXCounterDimensions = [(namespaceDimension, serverName),
(operationNameDimension, operationName),
(metricNameDimension, failure5XXCountMetric)]
failure5XXCounter = Counter(label: "\(serverName).\(operationName).\(failure5XXCountMetric)",
dimensions: failure5XXCounterDimensions)
failure5XXCounter = getCounter(metricName: failure5XXCountMetric)
} else {
failure5XXCounter = nil
}

if configuration.reportFailure4XXForRequest(request) {
let failure4XXCounterDimensions = [(namespaceDimension, serverName),
(operationNameDimension, operationName),
(metricNameDimension, failure4XXCountMetric)]
failure4XXCounter = Counter(label: "\(serverName).\(operationName).\(failure4XXCountMetric)",
dimensions: failure4XXCounterDimensions)
failure4XXCounter = getCounter(metricName: failure4XXCountMetric)
} else {
failure4XXCounter = nil
}
Expand All @@ -82,11 +86,7 @@ public struct SmokeOperationReporting {
let specificFailureStatusesToReport = configuration.specificFailureStatusesToReport {
let countersWithStatusCodes: [(UInt, Counter)] = specificFailureStatusesToReport.map { statusCode in
let metricName = String(format: specificFailureStatusCountMetricFormat, statusCode)
let specificFailureStatusDimensions = [(namespaceDimension, serverName),
(operationNameDimension, operationName),
(metricNameDimension, metricName)]
let specificFailureStatusCounter = Counter(label: "\(serverName).\(operationName).\(metricName)",
dimensions: specificFailureStatusDimensions)
let specificFailureStatusCounter = getCounter(metricName: metricName)
return (statusCode, specificFailureStatusCounter)
}
specificFailureStatusCounters = Dictionary(uniqueKeysWithValues: countersWithStatusCodes)
Expand All @@ -95,41 +95,25 @@ public struct SmokeOperationReporting {
}

if configuration.reportLatencyForRequest(request) {
let latencyTimeDimensions = [(namespaceDimension, serverName),
(operationNameDimension, operationName),
(metricNameDimension, latencyTimeMetric)]
latencyTimer = Metrics.Timer(label: "\(serverName).\(operationName).\(latencyTimeMetric)",
dimensions: latencyTimeDimensions)
latencyTimer = getTimer(metricName: latencyTimeMetric)
} else {
latencyTimer = nil
}

if configuration.reportServiceLatencyForRequest(request) {
let serviceLatencyTimeDimensions = [(namespaceDimension, serverName),
(operationNameDimension, operationName),
(metricNameDimension, serviceLatencyTimeMetric)]
serviceLatencyTimer = Metrics.Timer(label: "\(serverName).\(operationName).\(serviceLatencyTimeMetric)",
dimensions: serviceLatencyTimeDimensions)
serviceLatencyTimer = getTimer(metricName: serviceLatencyTimeMetric)
} else {
serviceLatencyTimer = nil
}

if configuration.reportOutwardsServiceCallLatencySumForRequest(request) {
let serviceLatencyTimeDimensions = [(namespaceDimension, serverName),
(operationNameDimension, operationName),
(metricNameDimension, outwardsServiceCallLatencySumMetric)]
outwardsServiceCallLatencySumTimer = Metrics.Timer(label: "\(serverName).\(operationName).\(outwardsServiceCallLatencySumMetric)",
dimensions: serviceLatencyTimeDimensions)
outwardsServiceCallLatencySumTimer = getTimer(metricName: outwardsServiceCallLatencySumMetric)
} else {
outwardsServiceCallLatencySumTimer = nil
}

if configuration.reportOutwardsServiceCallRetryWaitLatencySumForRequest(request) {
let serviceLatencyTimeDimensions = [(namespaceDimension, serverName),
(operationNameDimension, operationName),
(metricNameDimension, outwardsServiceCallRetryWaitSumMetric)]
outwardsServiceCallRetryWaitSumTimer = Metrics.Timer(label: "\(serverName).\(operationName).\(outwardsServiceCallRetryWaitSumMetric)",
dimensions: serviceLatencyTimeDimensions)
outwardsServiceCallRetryWaitSumTimer = getTimer(metricName: outwardsServiceCallRetryWaitSumMetric)
} else {
outwardsServiceCallRetryWaitSumTimer = nil
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,15 +33,23 @@ internal struct JSONErrorEncoder: ErrorEncoder {
}
}

public enum ResponseExecutor {
case existingThread
case eventLoop
case dispatchQueue
}

/**
Struct conforming to the OperationDelegate protocol that handles operations from HTTP1 requests with JSON encoded
request and response payloads.
*/
public struct GenericJSONPayloadHTTP1OperationDelegate<ResponseHandlerType: HTTP1ResponseHandler,
InvocationReportingType: InvocationReporting>: HTTP1OperationDelegate
where ResponseHandlerType.InvocationContext == SmokeInvocationContext<InvocationReportingType> {
public init() {

internal let responseExecutor: ResponseExecutor

public init(responseExecutor: ResponseExecutor = .eventLoop) {
self.responseExecutor = responseExecutor
}

public func decorateLoggerForAnonymousRequest(requestLogger: inout Logger) {
Expand Down Expand Up @@ -114,18 +122,29 @@ public struct GenericJSONPayloadHTTP1OperationDelegate<ResponseHandlerType: HTTP
}

public func handleResponseForOperation<OutputType>(
requestHead: SmokeHTTP1RequestHead, output: OutputType,
responseHandler: ResponseHandlerType,
invocationContext: SmokeInvocationContext<InvocationReportingType>) where OutputType: OperationHTTP1OutputProtocol {
// encode the response within the event loop of the server to limit the number of response
// `Data` objects that exist at single time to the number of threads in the event loop
responseHandler.executeInEventLoop(invocationContext: invocationContext) {
self.handleResponseForOperationInEventLoop(requestHead: requestHead, output: output, responseHandler: responseHandler,
invocationContext: invocationContext)
requestHead: SmokeHTTP1RequestHead, output: OutputType,
responseHandler: ResponseHandlerType,
invocationContext: SmokeInvocationContext<InvocationReportingType>) where OutputType: OperationHTTP1OutputProtocol {
switch self.responseExecutor {
case .existingThread:
self.handleResponseForOperationOnDesiredThreadPool(requestHead: requestHead, output: output, responseHandler: responseHandler,
invocationContext: invocationContext)
case .dispatchQueue:
DispatchQueue.global().async {
self.handleResponseForOperationOnDesiredThreadPool(requestHead: requestHead, output: output, responseHandler: responseHandler,
invocationContext: invocationContext)
}
case .eventLoop:
// encode the response within the event loop of the server to limit the number of response
// `Data` objects that exist at single time to the number of threads in the event loop
responseHandler.executeInEventLoop(invocationContext: invocationContext) {
self.handleResponseForOperationOnDesiredThreadPool(requestHead: requestHead, output: output, responseHandler: responseHandler,
invocationContext: invocationContext)
}
}
}

private func handleResponseForOperationInEventLoop<OutputType>(
private func handleResponseForOperationOnDesiredThreadPool<OutputType>(
requestHead: SmokeHTTP1RequestHead, output: OutputType,
responseHandler: ResponseHandlerType,
invocationContext: SmokeInvocationContext<InvocationReportingType>) where OutputType: OperationHTTP1OutputProtocol {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ public protocol SmokeAsyncPerInvocationContextInitializer {
var handlerSelectorProvider: (() -> SelectorType) { get }
var operationsInitializer: ((inout SelectorType) -> Void) { get }

var defaultOperationDelegate: SelectorType.DefaultOperationDelegateType { get }
var serverName: String { get }
var invocationStrategy: InvocationStrategy { get }
var defaultLogger: Logger { get }
Expand Down

0 comments on commit 497d912

Please sign in to comment.