Skip to content

Commit

Permalink
Merge pull request #266 from JupiterOne/INT-10529-improv13
Browse files Browse the repository at this point in the history
re-enqueue tasks when rate limit is reached
  • Loading branch information
RonaldEAM committed Apr 16, 2024
2 parents ae40055 + 71073cc commit ccb526e
Show file tree
Hide file tree
Showing 5 changed files with 223 additions and 25 deletions.
1 change: 1 addition & 0 deletions package.json
Expand Up @@ -34,6 +34,7 @@
},
"dependencies": {
"@jupiterone/integration-sdk-http-client": "^12.4.0",
"@lifeomic/attempt": "^3.1.0",
"bottleneck": "^2.19.5",
"lodash": "^4.17.21",
"lodash.startcase": "^4.4.0",
Expand Down
195 changes: 178 additions & 17 deletions src/client.ts
@@ -1,5 +1,7 @@
import {
IntegrationError,
IntegrationLogger,
IntegrationProviderAPIError,
IntegrationProviderAuthenticationError,
} from '@jupiterone/integration-sdk-core';
import { IntegrationConfig } from './config';
Expand All @@ -22,18 +24,21 @@ import parse from 'parse-link-header';
import {
BaseAPIClient,
RetryOptions,
fatalRequestError,
isRetryableRequest,
retryableRequestError,
} from '@jupiterone/integration-sdk-http-client';
import Bottleneck from 'bottleneck';
import { retry } from '@lifeomic/attempt';
import { Response } from 'node-fetch';
import { QueueTasksState } from './types/queue';
import { setTimeout } from 'node:timers/promises';

export type ResourceIteratee<T> = (each: T) => Promise<void> | void;

const NINETY_DAYS_AGO = 90 * 24 * 60 * 60 * 1000;
const DEFAULT_RATE_LIMIT_THRESHOLD = 0.5;

async function sleep(ms: number) {
return new Promise((resolve) => setTimeout(resolve, ms));
}

/**
* An APIClient maintains authentication state and provides an interface to
* third party data APIs.
Expand Down Expand Up @@ -76,7 +81,11 @@ export class APIClient extends BaseAPIClient {

private getHandleErrorFn(): RetryOptions['handleError'] {
return async (err, context, logger) => {
if (err.code === 'ETIMEDOUT') {
if (
['ECONNRESET', 'ETIMEDOUT'].some(
(code) => err.code === code || err.message.includes(code),
)
) {
return;
}

Expand All @@ -93,14 +102,11 @@ export class APIClient extends BaseAPIClient {
err.cause.headers?.['date'] &&
err.cause.headers?.['x-rate-limit-reset']
) {
// Determine wait time by getting the delta X-Rate-Limit-Reset and the Date header
// Add 1 second to account for sub second differences between the clocks that create these headers
const nowDate = new Date(err.cause.headers['date'] as string);
const retryDate = new Date(
parseInt(err.cause.headers['x-rate-limit-reset'] as string, 10) *
1000,
const headers = err.cause.headers;
retryAfter = this.getDelayUntilReset(
headers.get('date') as string,
headers.get('x-rate-limit-reset') as string,
);
retryAfter = retryDate.getTime() - nowDate.getTime() + 1000;
}

logger.warn(
Expand All @@ -110,7 +116,7 @@ export class APIClient extends BaseAPIClient {
},
'Received a rate limit error. Waiting before retrying.',
);
await sleep(retryAfter);
await setTimeout(retryAfter);
}
};
}
Expand Down Expand Up @@ -306,7 +312,7 @@ export class APIClient extends BaseAPIClient {
groupId: string,
iteratee: ResourceIteratee<OktaUser>,
limiter: Bottleneck,
tasksState: { error: any },
tasksState: QueueTasksState,
): void {
const initialUrl = `/api/v1/groups/${groupId}/users?limit=1000`;
const iteratePages = async (url: string) => {
Expand All @@ -317,9 +323,12 @@ export class APIClient extends BaseAPIClient {
}
let nextUrl: string | undefined;
try {
nextUrl = await this.requestPage(url, iteratee);
nextUrl = await this.requestPage(url, iteratee, tasksState);
} catch (err) {
if (err.status === 404) {
if (err.code === 'RATE_LIMIT_REACHED') {
// Retry this task after the rate limit is reset
void limiter.schedule(() => iteratePages(url));
} else if (err.status === 404) {
//ignore it. It's probably a group that got deleted between steps
} else {
err.groupId = groupId;
Expand All @@ -337,8 +346,9 @@ export class APIClient extends BaseAPIClient {
private async requestPage<T>(
url: string,
iteratee: ResourceIteratee<T>,
tasksState: QueueTasksState,
): Promise<string | undefined> {
const response = await this.retryableRequest(url);
const response = await this.retryableQueueRequest(url, tasksState);
const data = await response.json();
for (const item of data) {
await iteratee(item);
Expand All @@ -357,6 +367,157 @@ export class APIClient extends BaseAPIClient {
return parsedLink.next.url;
}

private async retryableQueueRequest(
endpoint: string,
tasksState: QueueTasksState,
) {
return retry(
async () => {
return this.withQueueRateLimiting(async () => {
if (tasksState.rateLimitReached) {
// throw error to re-enqueue this task until rate limit is reset
const error = new IntegrationError({
code: 'RATE_LIMIT_REACHED',
message: 'Rate limit reached',
});
throw error;
}
let response: Response | undefined;
try {
response = await this.request(endpoint);
} catch (err) {
this.logger.error(
{ code: err.code, err, endpoint },
'Error sending request',
);
throw err;
}

if (response.ok) {
return response;
}

let error: IntegrationProviderAPIError | undefined;
const requestErrorParams = {
endpoint,
response,
logger: this.logger,
logErrorBody: this.logErrorBody,
};
if (isRetryableRequest(response.status)) {
error = await retryableRequestError(requestErrorParams);
} else {
error = await fatalRequestError(requestErrorParams);
}
for await (const _chunk of response.body) {
// force consumption of body to avoid memory leaks
// https://github.com/node-fetch/node-fetch/issues/83
}
throw error;
}, tasksState);
},
{
maxAttempts: this.retryOptions.maxAttempts,
delay: this.retryOptions.delay,
factor: this.retryOptions.factor,
handleError: async (err, context) => {
if (err.code === 'ETIMEDOUT') {
return;
}

if (!err.retryable) {
// can't retry this? just abort
context.abort();
return;
}

if (err.status === 429) {
let retryAfter = 60_000;
if (
err.cause &&
err.cause.headers?.['date'] &&
err.cause.headers?.['x-rate-limit-reset']
) {
const headers = err.cause.headers;
retryAfter = this.getDelayUntilReset(
headers.get('date') as string,
headers.get('x-rate-limit-reset') as string,
);
}

this.logger.warn(
{
retryAfter,
endpoint: err.endpoint,
},
'Received a rate limit error. Waiting before retrying.',
);

tasksState.rateLimitReached = true;
await setTimeout(retryAfter);
tasksState.rateLimitReached = false;
}
},
},
);
}

private async withQueueRateLimiting(
fn: () => Promise<Response>,
tasksState: QueueTasksState,
): Promise<Response> {
const response = await fn();
const { headers } = response;
if (
!headers.has('x-rate-limit-limit') ||
!headers.has('x-rate-limit-remaining') ||
!headers.has('x-rate-limit-reset')
) {
return response;
}
const limit = parseInt(headers.get('x-rate-limit-limit') as string, 10);
const remaining = parseInt(
headers.get('x-rate-limit-remaining') as string,
10,
);
const rateLimitConsumed = limit - remaining;
const shouldThrottleRequests =
rateLimitConsumed / limit > this.rateLimitThrottling!.threshold;

if (shouldThrottleRequests) {
const timeToSleepInMs = this.getDelayUntilReset(
headers.get('date')!,
headers.get('x-rate-limit-reset')!,
);
this.logger.warn(
{
endpoint: response.url,
limit,
remaining,
timeToSleepInMs,
},
`Exceeded ${this.rateLimitThrottling!.threshold * 100}% of rate limit. Sleeping until x-rate-limit-reset.`,
);
tasksState.rateLimitReached = true;
await setTimeout(timeToSleepInMs);
tasksState.rateLimitReached = false;
}
return response;
}

/**
* Determine wait time by getting the delta X-Rate-Limit-Reset and the Date header
* Add 1 second to account for sub second differences between the clocks that create these headers
*/
private getDelayUntilReset(
nowTimestamp: string,
resetTimestamp: string,
): number {
const nowDate = new Date(nowTimestamp);
const retryDate = new Date(parseInt(resetTimestamp, 10) * 1000);
return retryDate.getTime() - nowDate.getTime() + 1000;
}

/**
* Iterates each Multi-Factor Authentication device assigned to a given user.
*
Expand Down
43 changes: 35 additions & 8 deletions src/steps/groups.ts
Expand Up @@ -27,6 +27,7 @@ import {
import { Group } from '@okta/okta-sdk-nodejs';
import Bottleneck from 'bottleneck';
import { chunk } from 'lodash';
import { QueueTasksState } from '../types/queue';

export async function fetchGroups({
instance,
Expand Down Expand Up @@ -213,6 +214,8 @@ async function buildGroupEntityToUserRelationships(
if (!groupIds.length) {
return;
}
const BATCH_SIZE = 500;
const ONE_MINUTE_IN_MS = 60_000;
const { instance, logger, jobState, executionConfig } = context;
const { logGroupMetrics } = executionConfig;

Expand All @@ -233,14 +236,15 @@ async function buildGroupEntityToUserRelationships(
);
const limiter = new Bottleneck({
maxConcurrent,
minTime: Math.floor(60_000 / maxConcurrent), // space requests evenly over 1 minute.
minTime: Math.floor(ONE_MINUTE_IN_MS / maxConcurrent), // space requests evenly over 1 minute.
reservoir: maxConcurrent,
reservoirRefreshAmount: maxConcurrent,
reservoirRefreshInterval: 60 * 1_000, // refresh every minute.
reservoirRefreshInterval: ONE_MINUTE_IN_MS, // refresh every minute.
});

const tasksState: { error: any } = {
const tasksState: QueueTasksState = {
error: undefined,
rateLimitReached: false,
};
limiter.on('failed', (err) => {
if (err.code === 'ECONNRESET') {
Expand All @@ -252,6 +256,14 @@ async function buildGroupEntityToUserRelationships(
} else {
if (!tasksState.error) {
tasksState.error = err;
// After the first error, reset the limiter to allow all remaining tasks to finish immediately.
limiter.updateSettings({
reservoir: null,
maxConcurrent: null,
minTime: 0,
reservoirRefreshAmount: null,
reservoirRefreshInterval: null,
});
}
}
});
Expand All @@ -266,7 +278,15 @@ async function buildGroupEntityToUserRelationships(
});
};

const groupIdBatches = chunk(groupIds, 1000);
// Log queue state every 5 minutes
const debugLimiterIntervalId = setInterval(
() => {
logger.info(`[${groupEntityType}] ${JSON.stringify(limiter.counts())}`);
},
5 * 60 * 1_000,
);

const groupIdBatches = chunk(groupIds, BATCH_SIZE);
try {
for (const groupIdBatch of groupIdBatches) {
for (const groupId of groupIdBatch) {
Expand All @@ -279,11 +299,15 @@ async function buildGroupEntityToUserRelationships(
}
const userKey = user.id;
if (jobState.hasKey(userKey)) {
await jobState.addRelationship(
createGroupUserRelationship(groupId, userKey),
const relationship = createGroupUserRelationship(
groupId,
userKey,
);
stats.relationshipsCreated++;
statsLogger(`[${groupEntityType}] Added relationships`);
if (!jobState.hasKey(relationship._key)) {
await jobState.addRelationship(relationship);
stats.relationshipsCreated++;
statsLogger(`[${groupEntityType}] Added relationships`);
}
} else {
logger.warn(
{ groupId, userId: userKey },
Expand Down Expand Up @@ -311,6 +335,9 @@ async function buildGroupEntityToUserRelationships(
} catch (err) {
logger.error({ err }, 'Failed to build group to user relationships');
throw err;
} finally {
clearInterval(debugLimiterIntervalId);
limiter.removeAllListeners();
}

if (skippedGroups.length) {
Expand Down
4 changes: 4 additions & 0 deletions src/types/queue.ts
@@ -0,0 +1,4 @@
/**
 * Shared, mutable state for tasks scheduled on a Bottleneck queue, allowing
 * concurrent tasks to coordinate error propagation and rate-limit back-off.
 */
export interface QueueTasksState {
  // First error observed by any task (undefined until one fails); presumably
  // checked by the remaining tasks so they can bail early — confirm in client.
  error: any;
  // True while a task is sleeping out a provider rate limit; other tasks see
  // this flag and re-enqueue themselves instead of issuing requests.
  rateLimitReached: boolean;
}
5 changes: 5 additions & 0 deletions yarn.lock
Expand Up @@ -1344,6 +1344,11 @@
resolved "https://registry.yarnpkg.com/@lifeomic/attempt/-/attempt-3.0.3.tgz#e742a5b85eb673e2f1746b0f39cb932cbc6145bb"
integrity sha512-GlM2AbzrErd/TmLL3E8hAHmb5Q7VhDJp35vIbyPVA5Rz55LZuRr8pwL3qrwwkVNo05gMX1J44gURKb4MHQZo7w==

"@lifeomic/attempt@^3.1.0":
version "3.1.0"
resolved "https://registry.yarnpkg.com/@lifeomic/attempt/-/attempt-3.1.0.tgz#7fc703559177b81a008b9d263e3d9a001d11d08a"
integrity sha512-QZqem4QuAnAyzfz+Gj5/+SLxqwCAw2qmt7732ZXodr6VDWGeYLG6w1i/vYLa55JQM9wRuBKLmXmiZ2P0LtE5rw==

"@nodelib/fs.scandir@2.1.5":
version "2.1.5"
resolved "https://registry.yarnpkg.com/@nodelib/fs.scandir/-/fs.scandir-2.1.5.tgz#7619c2eb21b25483f6d167548b4cfd5a7488c3d5"
Expand Down

0 comments on commit ccb526e

Please sign in to comment.