Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Script executor temporal worker & similar identities merger workflows #2391

Merged
merged 21 commits into from
May 13, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
19,339 changes: 8,631 additions & 10,708 deletions pnpm-lock.yaml

Large diffs are not rendered by default.

4 changes: 4 additions & 0 deletions scripts/builders/script-executor-worker.env
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
DOCKERFILE="./services/docker/Dockerfile.script_executor_worker"
CONTEXT="../"
REPO="crowddotdev/script-executor-worker"
SERVICES="script-executor-worker"
18 changes: 18 additions & 0 deletions scripts/services/docker/Dockerfile.script_executor_worker
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
FROM node:20-alpine as builder

WORKDIR /usr/crowd/app
RUN corepack enable

COPY ./pnpm-workspace.yaml ./pnpm-lock.yaml ./
COPY ./services ./services
RUN pnpm i --frozen-lockfile

FROM node:20-bookworm-slim as runner

WORKDIR /usr/crowd/app
RUN corepack enable && apt update && apt install -y ca-certificates --no-install-recommends && rm -rf /var/lib/apt/lists/*

COPY --from=builder /usr/crowd/app/node_modules ./node_modules
COPY --from=builder /usr/crowd/app/services/libs ./services/libs
COPY --from=builder /usr/crowd/app/services/archetypes/ ./services/archetypes
COPY --from=builder /usr/crowd/app/services/apps/script_executor_worker/ ./services/apps/script_executor_worker
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
**/.git
**/node_modules
**/venv*
**/.webpack
**/.serverless
**/.cubestore
**/.env
**/.env.*
**/.idea
**/.vscode
**/dist
backend/server-config/
backend/util/
backend/src/serverless/microservices/python
.vscode/
.github/
frontend/
scripts/
.flake8
*.md
Makefile
backend/
32 changes: 32 additions & 0 deletions services/apps/script_executor_worker/package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
{
"name": "@crowd/script-executor-worker",
"scripts": {
"start": "CROWD_TEMPORAL_TASKQUEUE=script-executor SERVICE=script-executor-worker tsx src/main.ts",
"start:debug:local": "set -a && . ../../../backend/.env.dist.local && . ../../../backend/.env.override.local && set +a && CROWD_TEMPORAL_TASKQUEUE=script-executor SERVICE=script-executor-worker LOG_LEVEL=trace tsx --inspect=0.0.0.0:9232 src/main.ts",
"start:debug": "CROWD_TEMPORAL_TASKQUEUE=script-executor SERVICE=script-executor-worker LOG_LEVEL=trace tsx --inspect=0.0.0.0:9232 src/main.ts",
"dev:local": "nodemon --watch src --watch ../../libs --ext ts --exec pnpm run start:debug:local",
"dev": "nodemon --watch src --watch ../../libs --ext ts --exec pnpm run start:debug",
"lint": "npx eslint --ext .ts src --max-warnings=0",
"format": "npx prettier --write \"src/**/*.ts\"",
"format-check": "npx prettier --check .",
"tsc-check": "tsc --noEmit"
},
"dependencies": {
"@crowd/archetype-standard": "workspace:*",
"@crowd/archetype-worker": "workspace:*",
"@crowd/cubejs": "workspace:*",
"@crowd/types": "workspace:*",
"@crowd/redis": "workspace:*",
"@crowd/logging": "workspace:*",
"@crowd/data-access-layer": "workspace:*",
"@crowd/feature-flags": "workspace:*",
"@temporalio/workflow": "~1.8.6",
"moment": "~2.29.4",
"tsx": "^4.7.1",
"typescript": "^5.2.2"
},
"devDependencies": {
"@types/node": "^20.8.2",
"nodemon": "^3.0.1"
}
}
12 changes: 12 additions & 0 deletions services/apps/script_executor_worker/src/activities.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
import {
findMembersWithSameVerifiedEmailsInDifferentPlatforms,
findMembersWithSamePlatformIdentitiesDifferentCapitalization,
} from './activities/merge-members-with-similar-identities'

import { mergeMembers } from './activities/common'

export {
findMembersWithSameVerifiedEmailsInDifferentPlatforms,
findMembersWithSamePlatformIdentitiesDifferentCapitalization,
mergeMembers,
}
23 changes: 23 additions & 0 deletions services/apps/script_executor_worker/src/activities/common.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
export async function mergeMembers(
primaryMemberId: string,
secondaryMemberId: string,
tenantId: string,
): Promise<void> {
const res = await fetch(
`${process.env['CROWD_API_SERVICE_URL']}/tenant/${tenantId}/member/${primaryMemberId}/merge`,
{
method: 'PUT',
headers: {
Authorization: `Bearer ${process.env['CROWD_API_SERVICE_USER_TOKEN']}`,
'Content-Type': 'application/json',
},
body: JSON.stringify({
memberToMerge: secondaryMemberId,
}),
},
)

if (res.status !== 200) {
throw new Error(`Failed to merge member ${primaryMemberId} with ${secondaryMemberId}!`)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
import { svc } from '../../main'
import MemberRepository from '@crowd/data-access-layer/src/old/apps/script_executor_worker/member.repo'
import { ISimilarMember } from '@crowd/data-access-layer/src/old/apps/script_executor_worker/types'
export async function findMembersWithSameVerifiedEmailsInDifferentPlatforms(
tenantId: string,
limit: number,
afterHash?: number,
): Promise<ISimilarMember[]> {
let rows: ISimilarMember[] = []

try {
const memberRepo = new MemberRepository(svc.postgres.reader.connection(), svc.log)
rows = await memberRepo.findMembersWithSameVerifiedEmailsInDifferentPlatforms(
tenantId,
limit,
afterHash,
)
} catch (err) {
throw new Error(err)
}

return rows
}

export async function findMembersWithSamePlatformIdentitiesDifferentCapitalization(
tenantId: string,
platform: string,
limit: number,
afterHash?: number,
): Promise<ISimilarMember[]> {
let rows: ISimilarMember[] = []

try {
const memberRepo = new MemberRepository(svc.postgres.reader.connection(), svc.log)
rows = await memberRepo.findMembersWithSameGithubIdentitiesDifferentCapitalization(
tenantId,
platform,
limit,
afterHash,
)
} catch (err) {
throw new Error(err)
}

return rows
}
39 changes: 39 additions & 0 deletions services/apps/script_executor_worker/src/main.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
import { Config } from '@crowd/archetype-standard'
import { ServiceWorker, Options } from '@crowd/archetype-worker'

const config: Config = {
envvars: [],
producer: {
enabled: false,
},
temporal: {
enabled: true,
},
redis: {
enabled: true,
},
}

const options: Options = {
maxTaskQueueActivitiesPerSecond: process.env['CROWD_TEMPORAL_TASKQUEUE_CACHE_MAX_ACTIVITIES']
? Number(process.env['CROWD_TEMPORAL_TASKQUEUE_CACHE_MAX_ACTIVITIES'])
: undefined,
maxConcurrentActivityTaskExecutions: process.env[
'CROWD_TEMPORAL_TASKQUEUE_CACHE_CONCURRENT_ACTIVITIES'
]
? Number(process.env['CROWD_TEMPORAL_TASKQUEUE_CACHE_CONCURRENT_ACTIVITIES'])
: undefined,
postgres: {
enabled: true,
},
opensearch: {
enabled: false,
},
}

export const svc = new ServiceWorker(config, options)

setImmediate(async () => {
await svc.init()
await svc.start()
})
10 changes: 10 additions & 0 deletions services/apps/script_executor_worker/src/types.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
export interface IFindAndMergeMembersWithSameVerifiedEmailsInDifferentPlatformsArgs {
tenantId: string
afterHash?: number
}

export interface IFindAndMergeMembersWithSameIdentitiesDifferentCapitalizationInPlatformArgs {
tenantId: string
platform: string
afterHash?: number
}
7 changes: 7 additions & 0 deletions services/apps/script_executor_worker/src/workflows.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
import { findAndMergeMembersWithSamePlatformIdentitiesDifferentCapitalization } from './workflows/findAndMergeMembersWithSamePlatformIdentitiesDifferentCapitalization'
import { findAndMergeMembersWithSameVerifiedEmailsInDifferentPlatforms } from './workflows/findAndMergeMembersWithSameVerifiedEmailsInDifferentPlatforms'

export {
findAndMergeMembersWithSameVerifiedEmailsInDifferentPlatforms,
findAndMergeMembersWithSamePlatformIdentitiesDifferentCapitalization,
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
import { continueAsNew, proxyActivities } from '@temporalio/workflow'

import * as activities from '../activities/merge-members-with-similar-identities'
import * as commonActivities from '../activities/common'

import { IFindAndMergeMembersWithSameIdentitiesDifferentCapitalizationInPlatformArgs } from '../types'

const activity = proxyActivities<typeof activities>({
startToCloseTimeout: '3 minute',
retry: { maximumAttempts: 3 },
})

const common = proxyActivities<typeof commonActivities>({
startToCloseTimeout: '3 minute',
retry: { maximumAttempts: 3 },
})

export async function findAndMergeMembersWithSamePlatformIdentitiesDifferentCapitalization(
args: IFindAndMergeMembersWithSameIdentitiesDifferentCapitalizationInPlatformArgs,
): Promise<void> {
const PROCESS_MEMBERS_PER_RUN = 10

const mergeableMemberCouples =
await activity.findMembersWithSamePlatformIdentitiesDifferentCapitalization(
args.tenantId,
args.platform,
PROCESS_MEMBERS_PER_RUN,
args.afterHash || undefined,
)

if (mergeableMemberCouples.length === 0) {
console.log(`Finished processing!`)
return
}

for (const couple of mergeableMemberCouples) {
console.log(
`Merging ${couple.secondaryMemberId} [${couple.secondaryMemberIdentityValue}] into ${couple.primaryMemberId} [${couple.primaryMemberIdentityValue}]! `,
)
await common.mergeMembers(couple.primaryMemberId, couple.secondaryMemberId, args.tenantId)
}

await continueAsNew<typeof findAndMergeMembersWithSamePlatformIdentitiesDifferentCapitalization>({
tenantId: args.tenantId,
platform: args.platform,
afterHash: mergeableMemberCouples[mergeableMemberCouples.length - 1]?.hash,
})
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
import { continueAsNew, proxyActivities } from '@temporalio/workflow'

import * as activities from '../activities/merge-members-with-similar-identities'
import * as commonActivities from '../activities/common'
import { IFindAndMergeMembersWithSameVerifiedEmailsInDifferentPlatformsArgs } from '../types'

const activity = proxyActivities<typeof activities>({
startToCloseTimeout: '3 minute',
retry: { maximumAttempts: 3 },
})

const common = proxyActivities<typeof commonActivities>({
startToCloseTimeout: '3 minute',
retry: { maximumAttempts: 3 },
})

export async function findAndMergeMembersWithSameVerifiedEmailsInDifferentPlatforms(
args: IFindAndMergeMembersWithSameVerifiedEmailsInDifferentPlatformsArgs,
): Promise<void> {
const PROCESS_MEMBERS_PER_RUN = 10

const mergeableMemberCouples =
await activity.findMembersWithSameVerifiedEmailsInDifferentPlatforms(
args.tenantId,
PROCESS_MEMBERS_PER_RUN,
args.afterHash || undefined,
)

if (mergeableMemberCouples.length === 0) {
console.log(`Finished processing!`)
return
}

for (const couple of mergeableMemberCouples) {
console.log(
`Merging ${couple.secondaryMemberId} [${couple.secondaryMemberIdentityValue}] into ${couple.primaryMemberId} [${couple.primaryMemberIdentityValue}]! `,
)
await common.mergeMembers(couple.primaryMemberId, couple.secondaryMemberId, args.tenantId)
}

await continueAsNew<typeof findAndMergeMembersWithSameVerifiedEmailsInDifferentPlatforms>({
tenantId: args.tenantId,
afterHash: mergeableMemberCouples[mergeableMemberCouples.length - 1]?.hash,
})
}
12 changes: 12 additions & 0 deletions services/apps/script_executor_worker/tsconfig.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
{
"compilerOptions": {
"target": "es2017",
"module": "Node16",
"lib": ["es6", "es7", "es2017", "es2017.object", "es2015.promise", "ES2021.String"],
"skipLibCheck": true,
"moduleResolution": "node16",
"experimentalDecorators": true,
"esModuleInterop": true
},
"include": ["src/**/*"]
}