Skip to content

Commit

Permalink
Merge pull request #4651 from activepieces/fix/continue-flow-on-pause
Browse files Browse the repository at this point in the history
fix: continue flow on pause
  • Loading branch information
abuaboud committed May 9, 2024
2 parents 43d71c7 + facba8e commit 8f00e13
Show file tree
Hide file tree
Showing 8 changed files with 69 additions and 7 deletions.
29 changes: 28 additions & 1 deletion packages/server/api/src/app/flows/flow-run/flow-run-service.ts
Expand Up @@ -78,6 +78,31 @@ async function updateFlowRunToLatestFlowVersionId(
})
}

function returnHandlerId(pauseMetadata: PauseMetadata | undefined, requestId: string | undefined): string {
const handlerId = engineResponseWatcher.getHandlerId()
if (isNil(pauseMetadata)) {
return handlerId
}

if (pauseMetadata.type === PauseType.WEBHOOK && requestId === pauseMetadata.requestId && pauseMetadata.handlerId) {
return pauseMetadata.handlerId
}
else {
return handlerId
}
}

function modifyPauseMetadata(pauseMetadata: PauseMetadata): PauseMetadata {
if (pauseMetadata.type === PauseType.WEBHOOK) {
return {
...pauseMetadata,
handlerId: engineResponseWatcher.getHandlerId(),
}
}

return pauseMetadata
}

export const flowRunService = {
async list({
projectId,
Expand Down Expand Up @@ -173,6 +198,8 @@ export const flowRunService = {
flowRunId: flowRunToResume.id,
projectId: flowRunToResume.projectId,
flowVersionId: flowRunToResume.flowVersionId,
synchronousHandlerId: returnHandlerId(pauseMetadata, requestId),
hookType: HookType.BEFORE_LOG,
executionType,
environment: RunEnvironment.PRODUCTION,
})
Expand Down Expand Up @@ -292,7 +319,7 @@ export const flowRunService = {
status: FlowRunStatus.PAUSED,
logsFileId: logFileId,
// eslint-disable-next-line @typescript-eslint/no-explicit-any
pauseMetadata: pauseMetadata as any,
pauseMetadata: modifyPauseMetadata(pauseMetadata) as any,
})

const flowRun = await flowRunRepo.findOneByOrFail({ id: flowRunId })
Expand Down
17 changes: 14 additions & 3 deletions packages/server/api/src/app/flows/flow.module.ts
@@ -1,3 +1,4 @@
import EventEmitter from 'events'
import { FastifyPluginAsyncTypebox } from '@fastify/type-provider-typebox'
import { accessTokenManager } from '../authentication/lib/access-token-manager'
import { websocketService } from '../websockets/websockets.service'
Expand All @@ -9,7 +10,7 @@ import { folderController } from './folder/folder.controller'
import { stepRunService } from './step-run/step-run-service'
import { testTriggerController } from './test-trigger/test-trigger-controller'
import { logger } from '@activepieces/server-shared'
import { CreateStepRunRequestBody, StepRunResponse, TestFlowRunRequestBody, WebsocketClientEvent, WebsocketServerEvent } from '@activepieces/shared'
import { CreateStepRunRequestBody, FlowRun, isFlowStateTerminal, StepRunResponse, TestFlowRunRequestBody, WebsocketClientEvent, WebsocketServerEvent } from '@activepieces/shared'

export const flowModule: FastifyPluginAsyncTypebox = async (app) => {
await app.register(flowVersionController, { prefix: '/v1/flows' })
Expand All @@ -18,14 +19,24 @@ export const flowModule: FastifyPluginAsyncTypebox = async (app) => {
await app.register(testTriggerController, { prefix: '/v1/test-trigger' })
websocketService.addListener(WebsocketServerEvent.TEST_FLOW_RUN, (socket) => {
return async (data: TestFlowRunRequestBody) => {
const eventEmitter = new EventEmitter()
const principal = await accessTokenManager.extractPrincipal(socket.handshake.auth.token)
const flowRun = await flowRunService.test({
projectId: principal.projectId,
flowVersionId: data.flowVersionId,
})

socket.emit(WebsocketClientEvent.TEST_FLOW_RUN_STARTED, flowRun)
await engineResponseWatcher.listen(flowRun.id, false)
socket.emit(WebsocketClientEvent.TEST_FLOW_RUN_FINISHED, flowRun)

eventEmitter.on(WebsocketClientEvent.TEST_FLOW_RUN_PROGRESS, (flowRunResponse: FlowRun) => {
if (isFlowStateTerminal(flowRunResponse.status)) {
eventEmitter.removeAllListeners()
engineResponseWatcher.removeListener(flowRun.id)
}
socket.emit(WebsocketClientEvent.TEST_FLOW_RUN_PROGRESS, flowRunResponse)
})

await engineResponseWatcher.listenAndEmit(flowRun.id, eventEmitter, flowRunService.getOneOrThrow({ id: flowRun.id, projectId: principal.projectId }))
}
})
websocketService.addListener(WebsocketServerEvent.TEST_STEP_RUN, (socket) => {
Expand Down
@@ -1,8 +1,9 @@
import { EventEmitter } from 'events'
import { logger } from '@sentry/utils'
import { StatusCodes } from 'http-status-codes'
import { pubSub } from '../../helper/pubsub'
import { system, SystemProp } from '@activepieces/server-shared'
import { apId } from '@activepieces/shared'
import { apId, FlowRunStatus, WebsocketClientEvent } from '@activepieces/shared'

const listeners = new Map<string, (flowResponse: EngineHttpResponse) => void>()

Expand All @@ -24,6 +25,9 @@ export const engineResponseWatcher = {
getHandlerId(): string {
return HANDLER_ID
},
removeListener(requestId: string): void {
listeners.delete(requestId)
},
async init(): Promise<void> {
logger.info('[engineWatcher#init] Initializing engine run watcher')

Expand All @@ -42,6 +46,19 @@ export const engineResponseWatcher = {
},
)
},
async listenAndEmit(requestId: string, event: EventEmitter, driver: Promise<any>): Promise<void> {
logger.info(`[engineWatcher#listenAndEmit] requestId=${requestId}`)

const listenStatus = async () => {
const finalFlowRun = await driver
event.emit(WebsocketClientEvent.TEST_FLOW_RUN_PROGRESS, finalFlowRun)
if (finalFlowRun.status !== FlowRunStatus.SUCCEEDED) {
await listenStatus()
}
}

await listenStatus()
},
async listen(requestId: string, timeoutRequest: boolean): Promise<EngineHttpResponse> {
logger.info(`[engineWatcher#listen] requestId=${requestId}`)
return new Promise((resolve) => {
Expand Down
1 change: 1 addition & 0 deletions packages/shared/src/index.ts
Expand Up @@ -60,6 +60,7 @@ export { DelayPauseMetadata, PauseMetadata, WebhookPauseMetadata } from './lib/f
export * from './lib/federated-authn'
export { STORE_KEY_MAX_LENGTH } from './lib/store-entry/store-entry'
export { RetryFlowRequestBody } from './lib/flow-run/test-flow-run-request'
export * from './lib/flow-run/flow-status'
export * from './lib/flows/dto/flow-template-request'
// Look at https://github.com/sinclairzx81/typebox/issues/350
TypeSystem.ExactOptionalPropertyTypes = false
Expand Down
Expand Up @@ -29,6 +29,7 @@ export const WebhookPauseMetadata = Type.Object({
type: Type.Literal(PauseType.WEBHOOK),
requestId: Type.String(),
response: Type.Unknown(),
handlerId: Type.Optional(Type.String({})),
})
export type WebhookPauseMetadata = Static<typeof WebhookPauseMetadata>

Expand Down
5 changes: 5 additions & 0 deletions packages/shared/src/lib/flow-run/flow-status.ts
@@ -0,0 +1,5 @@
import { FlowRunStatus } from './execution/flow-execution'

export const isFlowStateTerminal = (status: FlowRunStatus): boolean => {
return status === FlowRunStatus.SUCCEEDED || status === FlowRunStatus.FAILED || status === FlowRunStatus.INTERNAL_ERROR || status === FlowRunStatus.QUOTA_EXCEEDED
}
2 changes: 1 addition & 1 deletion packages/shared/src/lib/websocket/index.ts
Expand Up @@ -2,7 +2,7 @@

export enum WebsocketClientEvent {
TEST_FLOW_RUN_STARTED = 'TEST_FLOW_RUN_STARTED',
TEST_FLOW_RUN_FINISHED = 'TEST_FLOW_RUN_FINISHED',
TEST_FLOW_RUN_PROGRESS = 'TEST_FLOW_RUN_PROGRESS',
GENERATE_CODE_FINISHED = 'GENERATE_CODE_FINIISHED',
TEST_STEP_FINISHED = 'TEST_STEP_FINISHED',
}
Expand Down
Expand Up @@ -112,7 +112,7 @@ export class TestFlowWidgetComponent implements OnInit {
);

this.testResult$ = this.websockService.socket
.fromEvent<FlowRun>(WebsocketClientEvent.TEST_FLOW_RUN_FINISHED)
.fromEvent<FlowRun>(WebsocketClientEvent.TEST_FLOW_RUN_PROGRESS)
.pipe(
switchMap((flowRun) => {
return this.instanceRunService.get(flowRun.id);
Expand Down

0 comments on commit 8f00e13

Please sign in to comment.