Skip to content

Commit

Permalink
Merge pull request #264 from JupiterOne/INT-10529-improv11
Browse files Browse the repository at this point in the history
execute group->users relationship steps sequentially, remove timeout
  • Loading branch information
RonaldEAM committed Apr 10, 2024
2 parents 4bba1a6 + 9bf1620 commit 7199586
Show file tree
Hide file tree
Showing 2 changed files with 71 additions and 107 deletions.
9 changes: 4 additions & 5 deletions src/client.ts
Expand Up @@ -50,6 +50,9 @@ export class APIClient extends BaseAPIClient {
super({
baseUrl: config.oktaOrgUrl,
logger,
retryOptions: {
timeout: 0,
},
rateLimitThrottling: {
threshold: rateLimitThreshold,
resetMode: 'datetime_epoch_s',
Expand All @@ -71,11 +74,7 @@ export class APIClient extends BaseAPIClient {

private getHandleErrorFn(): RetryOptions['handleError'] {
return async (err, context, logger) => {
if (
['ECONNRESET', 'ETIMEDOUT'].some(
(code) => err.code === code || err.message.includes(code),
)
) {
if (err.code === 'ETIMEDOUT') {
return;
}

Expand Down
169 changes: 67 additions & 102 deletions src/steps/groups.ts
@@ -1,13 +1,13 @@
import {
Entity,
IntegrationLogger,
IntegrationStep,
IntegrationStepExecutionContext,
Relationship,
IntegrationWarnEventName,
getRawData,
} from '@jupiterone/integration-sdk-core';
import pMap from 'p-map';

import { APIClient, createAPIClient } from '../client';
import { createAPIClient } from '../client';
import { ExecutionConfig, IntegrationConfig } from '../config';
import { createAccountGroupRelationship } from '../converters';
import {
Expand Down Expand Up @@ -96,18 +96,17 @@ function getGroupStatsCollector() {
stats,
collectStats: (stats: GroupStats, group: Group) => {
stats.totalGroupsCollected++;
const isUserGroup =
group.type === 'OKTA_GROUP' || group.type === 'BUILT_IN';
if (isUserGroup) {
stats.userGroupsCollected++;
} else {
const isAppUserGroup = group.type === 'APP_GROUP';
if (isAppUserGroup) {
stats.appUserGroupsCollected++;
} else {
stats.userGroupsCollected++;
}
if (group._embedded?.stats?.usersCount) {
if (isUserGroup) {
stats.userGroupsCollectedWithUsers++;
} else {
if (isAppUserGroup) {
stats.appUserGroupsCollectedWithUsers++;
} else {
stats.userGroupsCollectedWithUsers++;
}
}
},
Expand Down Expand Up @@ -155,60 +154,17 @@ async function buildGroupEntityToUserRelationships(
requestedGroups: 0,
relationshipsCreated: 0,
};

const processGroupEntities = async (groupEntities: Entity[]) => {
const usersForGroupEntities = await collectUsersForGroupEntities(
apiClient,
groupEntities,
);
stats.requestedGroups += groupEntities.length;

let relationships: Relationship[] = [];

for (const { groupEntity, userKeys } of usersForGroupEntities) {
for (const userKey of userKeys) {
if (jobState.hasKey(userKey)) {
relationships.push(createGroupUserRelationship(groupEntity, userKey));
stats.relationshipsCreated++;
} else {
logger.warn(
{ groupId: groupEntity.id as string, userId: userKey },
'[SKIP] User not found in job state, could not build relationship to group',
);
}

if (relationships.length >= 500) {
await jobState.addRelationships(relationships);
if (logGroupMetrics) {
logger.info({ stats }, `[${groupEntityType}] Added relationships`);
}
relationships = [];
}
}
}

// Add any remaining relationships
if (relationships.length) {
await jobState.addRelationships(relationships);
if (logGroupMetrics) {
logger.info({ stats }, `[${groupEntityType}] Added relationships`);
}
}
};

const skippedGroups: string[] = [];
let everyoneGroupEntity: Entity | undefined;
try {
let entitiesBuffer: Entity[] = [];

const processBufferedEntities = async () => {
if (entitiesBuffer.length) await processGroupEntities(entitiesBuffer);
entitiesBuffer = [];
};
const statsLogger = createStatsLogger(stats, logger, logGroupMetrics);

try {
await jobState.iterateEntities(
{ _type: groupEntityType },
async (groupEntity) => {
stats.processedGroups++;
statsLogger(`[${groupEntityType}] Processed groups`);

const rawGroup = getRawData(groupEntity) as Group;
if ('_embedded' in rawGroup && !rawGroup._embedded?.stats?.usersCount) {
return;
Expand All @@ -226,30 +182,45 @@ async function buildGroupEntityToUserRelationships(
return;
}

entitiesBuffer.push(groupEntity);

if (entitiesBuffer.length >= 1000) {
await processBufferedEntities();
if (logGroupMetrics) {
logger.info({ stats }, `[${groupEntityType}] Processed groups`);
const groupId = groupEntity.id as string;
try {
await apiClient.iterateUsersForGroup(groupId, async (user) => {
if (!user.id) {
return;
}
const userKey = user.id;
if (jobState.hasKey(userKey)) {
await jobState.addRelationship(
createGroupUserRelationship(groupEntity, userKey),
);
stats.relationshipsCreated++;
statsLogger(`[${groupEntityType}] Added relationships`);
} else {
logger.warn(
{ groupId: groupEntity.id as string, userId: userKey },
'[SKIP] User not found in job state, could not build relationship to group',
);
}
});
} catch (err) {
if (err.code === 'ECONNRESET') {
logger.warn(`ECONNRESET error for group ${groupId}. Skipping.`);
skippedGroups.push(groupId);
} else {
throw err;
}
} finally {
stats.requestedGroups++;
}
},
);

if (entitiesBuffer.length) {
await processBufferedEntities();
if (logGroupMetrics) {
logger.info({ stats }, `[${groupEntityType}] Processed groups`);
}
}
} catch (err) {
logger.error({ err }, 'Failed to build group to user relationships');
throw err;
}

if (everyoneGroupEntity) {
logger.info('Adding all users to the Everyone group');
logger.info('Adding all users to the "Everyone" group');
await jobState.iterateEntities(
{ _type: Entities.USER._type },
async (userEntity) => {
Expand All @@ -262,38 +233,32 @@ async function buildGroupEntityToUserRelationships(
},
);
}

if (skippedGroups.length) {
logger.publishWarnEvent({
name: IntegrationWarnEventName.IncompleteData,
description: `Skipped groups due to ECONNRESET error: ${JSON.stringify(skippedGroups)}`,
});
}
}

async function collectUsersForGroupEntities(
apiClient: APIClient,
groupEntities: Entity[],
/**
* Create a throttled stats logger.
*
* The returned function takes a log message and calls
* `logger.info({ stats }, message)` only when BOTH of these hold:
*   - at least five minutes have elapsed since the last emitted log (or since
*     this logger was created, if nothing has been logged yet), and
*   - `logGroupMetrics` is truthy.
* Otherwise the call is a no-op, so it is cheap to invoke on every iteration.
*
* @param stats Object that is logged as-is under the `stats` key. The same
* reference is captured once, so each log shows its contents at log time.
* @param logger Logger whose `info` method receives the stats and message.
* @param logGroupMetrics When falsy, the returned function never logs.
* @returns A function `(message: string) => void` that logs at most once per
* five-minute window.
*/
function createStatsLogger(
stats: any,
logger: IntegrationLogger,
logGroupMetrics: boolean | undefined,
) {
return await pMap(
groupEntities,
async (groupEntity) => {
const groupId = groupEntity.id as string;
const userKeys: string[] = [];

await apiClient.iterateUsersForGroup(groupId, async (user) => {
if (!user.id) {
return;
}
userKeys.push(user.id);
return Promise.resolve();
});

return { groupEntity, userKeys };
},
{
/**
* We throttle requests when the x-rate-limit-remaining header drops below
* 5. Previously, our concurrency here was 10, which meant that even if a
* request was throttled, there could be 9 additional requests coming right
* on its tail - which will result in throttling.
*/
concurrency: 4,
},
);
// Throttle window in milliseconds (5 min * 60 s * 1000 ms).
const FIVE_MINUTES = 5 * 60 * 1000;
// Starts at creation time, so the first log is emitted no sooner than five
// minutes after this logger is created.
let lastLogTime = Date.now();
return (message: string) => {
const now = Date.now();
// NOTE(review): `now` is captured above but the comparison reads
// Date.now() a second time; using `now` here would keep both values equal.
if (Date.now() - lastLogTime >= FIVE_MINUTES && logGroupMetrics) {
logger.info({ stats }, message);
// Only advance the window when a log was actually emitted.
lastLogTime = now;
}
};
}

export const groupSteps: IntegrationStep<IntegrationConfig>[] = [
Expand Down

0 comments on commit 7199586

Please sign in to comment.