Skip to content

Commit

Permalink
Merge pull request #270 from JupiterOne/fix-hung-queue
Browse files Browse the repository at this point in the history
throw error when queue stops processing; handle 403 error on fetch roles
  • Loading branch information
RonaldEAM committed May 3, 2024
2 parents 9bf26b5 + 0994c96 commit 343e3c9
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 28 deletions.
3 changes: 0 additions & 3 deletions src/client.ts
Expand Up @@ -57,9 +57,6 @@ export class APIClient extends BaseAPIClient {
super({
baseUrl: config.oktaOrgUrl,
logger,
retryOptions: {
timeout: 0,
},
rateLimitThrottling: {
threshold: rateLimitThreshold,
resetMode: 'datetime_epoch_s',
Expand Down
21 changes: 17 additions & 4 deletions src/steps/roles.ts
Expand Up @@ -3,6 +3,7 @@ import {
createIntegrationEntity,
IntegrationStep,
IntegrationStepExecutionContext,
IntegrationWarnEventName,
parseTimePropertyValue,
RelationshipClass,
} from '@jupiterone/integration-sdk-core';
Expand Down Expand Up @@ -125,8 +126,14 @@ export async function fetchRoles({
);
await flushBatch();
} catch (err) {
logger.error({ err }, 'Failed to fetch user roles');
throw err;
if (err.status === 403) {
logger.publishWarnEvent({
name: IntegrationWarnEventName.MissingPermission,
description: 'The API key does not have access to fetch user roles.',
});
} else {
throw err;
}
}

try {
Expand All @@ -146,8 +153,14 @@ export async function fetchRoles({
);
await flushBatch();
} catch (err) {
logger.error({ err }, 'Failed to fetch group roles');
throw err;
if (err.status === 403) {
logger.publishWarnEvent({
name: IntegrationWarnEventName.MissingPermission,
description: 'The API key does not have access to fetch group roles.',
});
} else {
throw err;
}
}
}

Expand Down
67 changes: 46 additions & 21 deletions src/util/withConcurrentQueue.ts
@@ -1,7 +1,12 @@
import { IntegrationLogger } from '@jupiterone/integration-sdk-core';
import {
IntegrationError,
IntegrationLogger,
} from '@jupiterone/integration-sdk-core';
import Bottleneck from 'bottleneck';
import { QueueTasksState } from '../types/queue';

// Five minutes expressed in milliseconds. Used both as the period of the queue
// watchdog timer and as the inactivity threshold after which the queue is
// treated as stalled.
const FIVE_MINUTES = 5 * 60 * 1_000;

/**
* Executes a function with managed concurrency using a rate-limiting strategy.
* This function sets up a `Bottleneck` limiter to control task execution, allowing
Expand Down Expand Up @@ -66,6 +71,16 @@ export async function withConcurrentQueue(
reservoirRefreshInterval: ONE_MINUTE_IN_MS, // refresh every minute.
});

// Lifts the throttling settings on `limiter` (reservoir, concurrency cap,
// minimum spacing between jobs, and reservoir refresh) so that the remaining
// tasks are allowed to finish immediately instead of waiting on rate limits.
// NOTE(review): relies on Bottleneck treating `null` as "no limit" for these
// settings — confirm against the Bottleneck documentation.
const resetLimiter = () => {
limiter.updateSettings({
reservoir: null,
maxConcurrent: null,
minTime: 0,
reservoirRefreshAmount: null,
reservoirRefreshInterval: null,
});
};

const tasksState: QueueTasksState = {
error: undefined,
rateLimitReached: false,
Expand All @@ -82,13 +97,7 @@ export async function withConcurrentQueue(
if (!tasksState.error) {
tasksState.error = err;
// After the first error, reset the limiter to allow all remaining tasks to finish immediately.
limiter.updateSettings({
reservoir: null,
maxConcurrent: null,
minTime: 0,
reservoirRefreshAmount: null,
reservoirRefreshInterval: null,
});
resetLimiter();
}
});

Expand All @@ -102,17 +111,35 @@ export async function withConcurrentQueue(
});
};

let debugLimiterIntervalId: NodeJS.Timeout | undefined;
if (options.logQueueState) {
debugLimiterIntervalId = setInterval(
() => {
options.logger?.info(
`${options.logPrefix ? `${options.logPrefix} ` : ''}${JSON.stringify(limiter.counts())}`,
);
},
5 * 60 * 1_000, // Log every 5 minutes
);
// Stall watchdog, part 1: remember when the limiter last emitted one of the
// job lifecycle events listed in `states`.
let lastStateChangeTime = Date.now();
const states = ['received', 'queued', 'scheduled', 'executing', 'done'];
for (const state of states) {
// Any of these events counts as progress and restarts the stall clock.
// NOTE(review): the @ts-ignore below presumably exists because `state` is
// typed as a plain string rather than one of the event-name literals that
// `limiter.on` accepts — confirm.
// eslint-disable-next-line @typescript-eslint/ban-ts-comment
// @ts-ignore
limiter.on(state, () => {
lastStateChangeTime = Date.now();
});
}
// Stall watchdog, part 2: every FIVE_MINUTES, optionally log the limiter's
// counts and then check whether any lifecycle event has fired recently.
// NOTE(review): the check only runs on this tick, so a stall can go unnoticed
// for up to roughly two intervals (~10 minutes) after the last event.
const limiterIntervalId = setInterval(() => {
const queueState = JSON.stringify(limiter.counts());
// Queue-state logging is opt-in via `options.logQueueState`.
if (options.logQueueState) {
options.logger?.info(
`${options.logPrefix ? `${options.logPrefix} ` : ''}${queueState}`,
);
}
// No lifecycle event for at least FIVE_MINUTES: treat the queue as hung.
if (Date.now() - lastStateChangeTime >= FIVE_MINUTES) {
options.logger?.error(
{ queueState },
'Queue has been in the same state for more than 5 minutes.',
);
// Store the timeout on the shared task state.
// NOTE(review): this assignment is unconditional, so it replaces any error
// that was already stored in `tasksState.error` — confirm that is intended.
tasksState.error = new IntegrationError({
code: 'QUEUE_STATE_CHANGE_TIMEOUT',
message: `Queue has been in the same state for more than 5 minutes.`,
});
// Remove throttling so remaining tasks are not held back by the limiter.
resetLimiter();
// NOTE(review): `resolveIdlePromise` is declared outside this view;
// presumably it releases a caller that is waiting for the queue to become
// idle — confirm. The timer is not cleared here, so this branch can run
// again on later ticks if no further events are emitted.
resolveIdlePromise?.();
}
}, FIVE_MINUTES);

try {
await fn(limiter, tasksState, async () => {
Expand All @@ -124,9 +151,7 @@ export async function withConcurrentQueue(
}
});
} finally {
if (debugLimiterIntervalId) {
clearInterval(debugLimiterIntervalId);
}
clearInterval(limiterIntervalId);
limiter.removeAllListeners();
}
}

0 comments on commit 343e3c9

Please sign in to comment.