From 01cd0f6edd0055e488b8f27b368c18bc83c8fed6 Mon Sep 17 00:00:00 2001 From: Crystal Magloire Date: Mon, 5 Feb 2024 15:02:01 -0500 Subject: [PATCH] Diagnostics channel support --- docs/docs/api/DiagnosticsChannel.md | 57 +++++ lib/core/diagnostics.js | 79 ++++++- lib/web/fetch/index.js | 233 ++++++++++++-------- test/node-test/debug.js | 5 +- test/node-test/diagnostics-channel/fetch.js | 106 +++++++++ 5 files changed, 390 insertions(+), 90 deletions(-) create mode 100644 test/node-test/diagnostics-channel/fetch.js diff --git a/docs/docs/api/DiagnosticsChannel.md b/docs/docs/api/DiagnosticsChannel.md index 099c072f6c6..9fae2c40c77 100644 --- a/docs/docs/api/DiagnosticsChannel.md +++ b/docs/docs/api/DiagnosticsChannel.md @@ -202,3 +202,60 @@ diagnosticsChannel.channel('undici:websocket:pong').subscribe(({ payload }) => { console.log(payload) }) ``` +The below channels collectively act as [`tracingChannel.tracePromise`](https://nodejs.org/api/diagnostics_channel.html#tracingchanneltracepromisefn-context-thisarg-args) on `fetch`. So all of them will publish the arguments passed to `fetch`. + +## `tracing:undici:fetch:start` + +This message is published when `fetch` is called, and will publish the arguments passed to `fetch`. + +```js +import diagnosticsChannel from 'diagnostics_channel' +diagnosticsChannel.channel('tracing:undici:fetch:start').subscribe(({ input, init }) => { + console.log('input', input) + console.log('init', init) +}) +``` + +## `tracing:undici:fetch:end` + +This message is published at the end of `fetch`'s execution, and will publish any `error` from the synchronous part of `fetch`. Since `fetch` is asynchronous, this should be empty. This channel will publish the same values as `undici:fetch:start`, but we are including it to track when `fetch` finishes execution and to be consistent with [`TracingChannel`](https://nodejs.org/api/diagnostics_channel.html#class-tracingchannel). +```js +import diagnosticsChannel from 'diagnostics_channel' +diagnosticsChannel.channel('tracing:undici:fetch:end').subscribe(({ input, init, error }) => { + console.log('input', input) + console.log('init', init) + console.log('error', error) // should be empty +}) +``` +## `tracing:undici:fetch:asyncStart` +This message is published after `fetch` resolves or rejects. If `fetch` resolves, it publishes the response in `result`. If it rejects, it publishes the error in `error`. +```js +import diagnosticsChannel from 'diagnostics_channel' +diagnosticsChannel.channel('tracing:undici:fetch:asyncStart').subscribe(({ input, init, result, error }) => { + console.log('input', input) + console.log('init', init) + console.log('response', result) + console.log('error', error) +}) +``` +## `tracing:undici:fetch:asyncEnd` +This channel gets published the same values as and at the same time as `tracing:undici:fetch:asyncStart` in the case of [`tracingChannel.tracePromise`](https://nodejs.org/api/diagnostics_channel.html#tracingchanneltracepromisefn-context-thisarg-args) +```js +import diagnosticsChannel from 'diagnostics_channel' +diagnosticsChannel.channel('tracing:undici:fetch:asyncEnd').subscribe(({ input, init, result, error }) => { + console.log('input', input) + console.log('init', init) + console.log('response', result) + console.log('error', error) +}) +``` +## `tracing:undici:fetch:error` +This message is published when an error is thrown or promise rejects while calling `fetch`. +```js +import diagnosticsChannel from 'diagnostics_channel' +diagnosticsChannel.channel('tracing:undici:fetch:error').subscribe(({ input, init, error }) => { + console.log('input', input) + console.log('init', init) + console.log('error', error) +}) +``` \ No newline at end of file diff --git a/lib/core/diagnostics.js b/lib/core/diagnostics.js index e1af3db6112..1f7ff8e8995 100644 --- a/lib/core/diagnostics.js +++ b/lib/core/diagnostics.js @@ -6,6 +6,12 @@ const undiciDebugLog = util.debuglog('undici') const fetchDebuglog = util.debuglog('fetch') const websocketDebuglog = util.debuglog('websocket') let isClientSet = false +let tracingChannel + +if (diagnosticsChannel.tracingChannel) { + tracingChannel = diagnosticsChannel.tracingChannel('undici:fetch') +} + const channels = { // Client beforeConnect: diagnosticsChannel.channel('undici:client:beforeConnect'), @@ -23,7 +29,9 @@ const channels = { close: diagnosticsChannel.channel('undici:websocket:close'), socketError: diagnosticsChannel.channel('undici:websocket:socket_error'), ping: diagnosticsChannel.channel('undici:websocket:ping'), - pong: diagnosticsChannel.channel('undici:websocket:pong') + pong: diagnosticsChannel.channel('undici:websocket:pong'), + // Fetch channels + tracingChannel } if (undiciDebugLog.enabled || fetchDebuglog.enabled) { @@ -114,6 +122,75 @@ if (undiciDebugLog.enabled || fetchDebuglog.enabled) { isClientSet = true } +// Track fetch requests +if (fetchDebuglog.enabled && diagnosticsChannel.tracingChannel) { + const debuglog = fetchDebuglog + + tracingChannel.start.subscribe(evt => { + const { + input + } = evt + debuglog( + 'fetch has started request to %s', + input + ) + }) + + tracingChannel.end.subscribe(evt => { + const { + input + } = evt + debuglog( + 'fetch has received response from %s', + input + ) + }) + + tracingChannel.asyncStart.subscribe(evt => { + const { + input, + result, + error + } = evt + if (result && error) { + debuglog( + 'fetch has received response for %s - HTTP %d, error is %s', + input, + result.status, + error.message + ) + } else if (result) { + debuglog( + 'fetch has received response for %s - HTTP %d', + input, + result.status + ) + } else if (error) { + debuglog( + 'fetch has errored for %s - %s', + input, + error.message + ) + } else { + debuglog( + 'fetch has started request to %s', + input + ) + } + }) + + tracingChannel.error.subscribe(evt => { + const { + error + } = evt + debuglog( + 'fetch error event received response %s', + error.message + ) + }) + isClientSet = true +} + if (websocketDebuglog.enabled) { if (!isClientSet) { const debuglog = undiciDebugLog.enabled ? undiciDebugLog : websocketDebuglog diff --git a/lib/web/fetch/index.js b/lib/web/fetch/index.js index 82ccb26865c..03b0362bba4 100644 --- a/lib/web/fetch/index.js +++ b/lib/web/fetch/index.js @@ -70,6 +70,8 @@ const defaultUserAgent = typeof __UNDICI_IS_NODE__ !== 'undefined' || typeof esb ? 'node' : 'undici' +const channels = require('../../core/diagnostics.js').channels.tracingChannel + /** @type {import('buffer').resolveObjectURL} */ let resolveObjectURL @@ -120,12 +122,66 @@ class Fetch extends EE { } } -// https://fetch.spec.whatwg.org/#fetch-method -function fetch (input, init = undefined) { - webidl.argumentLengthCheck(arguments, 1, 'globalThis.fetch') +// This will publish all diagnostic events only when we have subscribers. +function ifSubscribersRunStores (request, input, init, callback) { + const hasSubscribers = subscribersCheck() - // 1. Let p be a new promise. - const p = createDeferredPromise() + if (hasSubscribers) { + const context = { request, input, init, result: null, error: null } + + return channels.start.runStores(context, () => { + try { + return callback(createInstrumentedDeferredPromise(context)) + } catch (e) { + context.error = e + channels.error.publish(context) + throw e + } finally { + channels.end.publish(context) + } + }) + } else { + return callback(createDeferredPromise()) + } +} + +// subscribersCheck will be called at the beginning of the fetch call +// and will check if we have subscribers +function subscribersCheck () { + return channels && (channels.start.hasSubscribers || + channels.end.hasSubscribers || + channels.asyncStart.hasSubscribers || + channels.asyncEnd.hasSubscribers || + channels.error.hasSubscribers) +} + +function createInstrumentedDeferredPromise (context) { + let res + let rej + const promise = new Promise((resolve, reject) => { + res = function (result) { + context.result = result + channels.asyncStart.runStores(context, () => { + resolve.apply(this, arguments) + channels.asyncEnd.publish(context) + }) + } + rej = function (error) { + context.error = error + channels.error.publish(context) + channels.asyncStart.runStores(context, () => { + reject.apply(this, arguments) + channels.asyncEnd.publish(context) + }) + } + }) + + return { promise, resolve: res, reject: rej } +} + +// https://fetch.spec.whatwg.org/#fetch-method +function fetch (input, init = {}) { + webidl.argumentLengthCheck(arguments, 1, { header: 'globalThis.fetch' }) // 2. Let requestObject be the result of invoking the initial value of // Request as constructor with input and init as arguments. If this throws @@ -135,114 +191,115 @@ function fetch (input, init = undefined) { try { requestObject = new Request(input, init) } catch (e) { - p.reject(e) - return p.promise + return Promise.reject(e) } - // 3. Let request be requestObject’s request. - const request = requestObject[kState] + return ifSubscribersRunStores(requestObject, input, init, p => { + // 3. Let request be requestObject’s request. + const request = requestObject[kState] - // 4. If requestObject’s signal’s aborted flag is set, then: - if (requestObject.signal.aborted) { - // 1. Abort the fetch() call with p, request, null, and - // requestObject’s signal’s abort reason. - abortFetch(p, request, null, requestObject.signal.reason) + // 4. If requestObject’s signal’s aborted flag is set, then: + if (requestObject.signal.aborted) { + // 1. Abort the fetch() call with p, request, null, and + // requestObject’s signal’s abort reason. + abortFetch(p, request, null, requestObject.signal.reason) - // 2. Return p. - return p.promise - } + // 2. Return p. + return p.promise + } - // 5. Let globalObject be request’s client’s global object. - const globalObject = request.client.globalObject + // 5. Let globalObject be request’s client’s global object. + const globalObject = request.client.globalObject - // 6. If globalObject is a ServiceWorkerGlobalScope object, then set - // request’s service-workers mode to "none". - if (globalObject?.constructor?.name === 'ServiceWorkerGlobalScope') { - request.serviceWorkers = 'none' - } + // 6. If globalObject is a ServiceWorkerGlobalScope object, then set + // request’s service-workers mode to "none". + if (globalObject?.constructor?.name === 'ServiceWorkerGlobalScope') { + request.serviceWorkers = 'none' + } - // 7. Let responseObject be null. - let responseObject = null + // 7. Let responseObject be null. + let responseObject = null - // 8. Let relevantRealm be this’s relevant Realm. + // 8. Let relevantRealm be this’s relevant Realm. + const relevantRealm = null - // 9. Let locallyAborted be false. - let locallyAborted = false + // 9. Let locallyAborted be false. + let locallyAborted = false - // 10. Let controller be null. - let controller = null + // 10. Let controller be null. + let controller = null - // 11. Add the following abort steps to requestObject’s signal: - addAbortListener( - requestObject.signal, - () => { - // 1. Set locallyAborted to true. - locallyAborted = true + // 11. Add the following abort steps to requestObject’s signal: + addAbortListener( + requestObject.signal, + () => { + // 1. Set locallyAborted to true. + locallyAborted = true - // 2. Assert: controller is non-null. - assert(controller != null) + // 2. Assert: controller is non-null. + assert(controller != null) - // 3. Abort controller with requestObject’s signal’s abort reason. - controller.abort(requestObject.signal.reason) + // 3. Abort controller with requestObject’s signal’s abort reason. + controller.abort(requestObject.signal.reason) - // 4. Abort the fetch() call with p, request, responseObject, - // and requestObject’s signal’s abort reason. - abortFetch(p, request, responseObject, requestObject.signal.reason) - } - ) + // 4. Abort the fetch() call with p, request, responseObject, + // and requestObject’s signal’s abort reason. + abortFetch(p, request, responseObject, requestObject.signal.reason) + } + ) - // 12. Let handleFetchDone given response response be to finalize and - // report timing with response, globalObject, and "fetch". - const handleFetchDone = (response) => - finalizeAndReportTiming(response, 'fetch') + // 12. Let handleFetchDone given response response be to finalize and + // report timing with response, globalObject, and "fetch". + const handleFetchDone = (response) => + finalizeAndReportTiming(response, 'fetch') - // 13. Set controller to the result of calling fetch given request, - // with processResponseEndOfBody set to handleFetchDone, and processResponse - // given response being these substeps: + // 13. Set controller to the result of calling fetch given request, + // with processResponseEndOfBody set to handleFetchDone, and processResponse + // given response being these substeps: - const processResponse = (response) => { - // 1. If locallyAborted is true, terminate these substeps. - if (locallyAborted) { - return - } + const processResponse = (response) => { + // 1. If locallyAborted is true, terminate these substeps. + if (locallyAborted) { + return + } - // 2. If response’s aborted flag is set, then: - if (response.aborted) { - // 1. Let deserializedError be the result of deserialize a serialized - // abort reason given controller’s serialized abort reason and - // relevantRealm. + // 2. If response’s aborted flag is set, then: + if (response.aborted) { + // 1. Let deserializedError be the result of deserialize a serialized + // abort reason given controller’s serialized abort reason and + // relevantRealm. - // 2. Abort the fetch() call with p, request, responseObject, and - // deserializedError. + // 2. Abort the fetch() call with p, request, responseObject, and + // deserializedError. - abortFetch(p, request, responseObject, controller.serializedAbortReason) - return - } + abortFetch(p, request, responseObject, controller.serializedAbortReason) + return + } - // 3. If response is a network error, then reject p with a TypeError - // and terminate these substeps. - if (response.type === 'error') { - p.reject(new TypeError('fetch failed', { cause: response.error })) - return - } + // 3. If response is a network error, then reject p with a TypeError + // and terminate these substeps. + if (response.type === 'error') { + p.reject(new TypeError('fetch failed', { cause: response.error })) + return + } - // 4. Set responseObject to the result of creating a Response object, - // given response, "immutable", and relevantRealm. - responseObject = fromInnerResponse(response, 'immutable') + // 4. Set responseObject to the result of creating a Response object, + // given response, "immutable", and relevantRealm. + responseObject = fromInnerResponse(response, 'immutable', relevantRealm) - // 5. Resolve p with responseObject. - p.resolve(responseObject) - } + // 5. Resolve p with responseObject. + p.resolve(responseObject) + } - controller = fetching({ - request, - processResponseEndOfBody: handleFetchDone, - processResponse, - dispatcher: requestObject[kDispatcher] // undici + controller = fetching({ + request, + processResponseEndOfBody: handleFetchDone, + processResponse, + dispatcher: requestObject[kDispatcher] // undici + }) + // 14. Return p. + return p.promise }) - - // 14. Return p. - return p.promise } // https://fetch.spec.whatwg.org/#finalize-and-report-timing diff --git a/test/node-test/debug.js b/test/node-test/debug.js index d7c462f57ae..f26d1cb9213 100644 --- a/test/node-test/debug.js +++ b/test/node-test/debug.js @@ -48,7 +48,7 @@ test('debug#websocket', { skip: !process.versions.icu }, async t => { }) test('debug#fetch', async t => { - const assert = tspl(t, { plan: 7 }) + const assert = tspl(t, { plan: 10 }) const child = spawn( process.execPath, [join(__dirname, '../fixtures/fetch.js')], @@ -58,11 +58,14 @@ test('debug#fetch', async t => { ) const chunks = [] const assertions = [ + /(FETCH [0-9]+:) (fetch has started)/, /(FETCH [0-9]+:) (connecting to)/, + /(FETCH [0-9]+:) (fetch has received)/, /(FETCH [0-9]+:) (connected to)/, /(FETCH [0-9]+:) (sending request)/, /(FETCH [0-9]+:) (received response)/, /(FETCH [0-9]+:) (trailers received)/, + /(FETCH [0-9]+:) (fetch has received)/, /^$/ ] diff --git a/test/node-test/diagnostics-channel/fetch.js b/test/node-test/diagnostics-channel/fetch.js new file mode 100644 index 00000000000..4987f15bfac --- /dev/null +++ b/test/node-test/diagnostics-channel/fetch.js @@ -0,0 +1,106 @@ +'use strict' + +const { tspl } = require('@matteo.collina/tspl') +const { describe, test, before, after } = require('node:test') +const { fetch } = require('../../..') + +let diagnosticsChannel +let skip = false +try { + diagnosticsChannel = require('node:diagnostics_channel') +} catch { + skip = true +} + +const { createServer } = require('http') + +describe('diagnosticsChannel for fetch', { skip }, () => { + let server + before(() => { + server = createServer((req, res) => { + res.setHeader('Content-Type', 'text/plain') + res.setHeader('trailer', 'foo') + res.write('hello') + res.addTrailers({ + foo: 'oof' + }) + res.end() + }) + }) + + after(() => { server.close() }) + + test('fetch', async t => { + t = tspl(t, { plan: 17 }) + + let startCalled = 0 + diagnosticsChannel.channel('tracing:undici:fetch:start').subscribe(({ input, init, result, error }) => { + startCalled += 1 + if (input.redirect) { + t.strictEqual(input, 'badrequest') + t.deepStrictEqual(init, { redirect: 'error' }) + } else { + t.strictEqual(input, `http://localhost:${server.address().port}`) + t.deepStrictEqual(init, {}) + } + }) + + let endCalled = 0 + diagnosticsChannel.channel('tracing:undici:fetch:end').subscribe(({ input, init, result, error }) => { + endCalled += 1 + if (init.redirect) { + t.strictEqual(input, 'badrequest') + t.deepStrictEqual(init, { redirect: 'error' }) + } else { + t.strictEqual(input, `http://localhost:${server.address().port}`) + t.deepStrictEqual(init, {}) + } + t.strictEqual(result, null) + }) + + let asyncStartCalled = 0 + diagnosticsChannel.channel('tracing:undici:fetch:asyncStart').subscribe(({ input, init, result, error }) => { + asyncStartCalled += 1 + if (init.redirect) { + t.strictEqual(input, 'badrequest') + t.deepStrictEqual(init, { redirect: 'error' }) + } else { + t.strictEqual(input, `http://localhost:${server.address().port}`) + t.deepStrictEqual(init, {}) + t.ok(result) + } + }) + + let asyncEndCalled = 0 + diagnosticsChannel.channel('tracing:undici:fetch:asyncEnd').subscribe(async ({ input, init, result, error }) => { + asyncEndCalled += 1 + if (init.redirect) { + t.strictEqual(input, 'badrequest') + t.deepStrictEqual(init, { redirect: 'error' }) + t.strictEqual(result, null) + t.ok(error) + t.strictEqual(error.cause.code, 'ERR_INVALID_URL') + } else { + t.strictEqual(input, `http://localhost:${server.address().port}`) + t.deepStrictEqual(init, {}) + t.ok(result) + t.strictEqual(result.status, 200) + t.strictEqual(error, null) + } + }) + + server.listen(0, async () => { + await fetch(`http://localhost:${server.address().port}`) + try { + await fetch('badrequest', { redirect: 'error' }) + } catch (e) { } + server.close() + t.strictEqual(startCalled, 1) + t.strictEqual(endCalled, 1) + t.strictEqual(asyncStartCalled, 1) + t.strictEqual(asyncEndCalled, 1) + }) + + await t.completed + }) +})