Skip to content

Commit

Permalink
Merge pull request #4657 from activepieces/fix/infinite-checking
Browse files Browse the repository at this point in the history
fix: infinite checking for run status
  • Loading branch information
abuaboud committed May 9, 2024
2 parents 2d3feb3 + 6a6e183 commit 7ffac90
Show file tree
Hide file tree
Showing 13 changed files with 108 additions and 83 deletions.
56 changes: 28 additions & 28 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Expand Up @@ -160,7 +160,7 @@
"openai": "4.17.5",
"papaparse": "5.4.1",
"pdf-parse": "1.1.1",
"pdf-text-reader": "5.0.0",
"pdf-text-reader": "4.1.0",
"pg": "8.11.3",
"pickleparser": "0.1.0",
"pino-loki": "2.1.3",
Expand Down
2 changes: 1 addition & 1 deletion packages/pieces/community/pdf/package.json
Expand Up @@ -3,7 +3,7 @@
"version": "0.0.1",
"dependencies": {
"@activepieces/pieces-framework": "*",
"pdf-text-reader": "5.0.0",
"pdf-text-reader": "4.1.0",
"tslib": "2.6.2"
},
"main": "./src/index.js",
Expand Down
Expand Up @@ -9,7 +9,8 @@ import { slack } from '@activepieces/piece-slack'
import { square } from '@activepieces/piece-square'
import { Piece } from '@activepieces/pieces-framework'
import { logger, rejectedPromiseHandler } from '@activepieces/server-shared'
import { ActivepiecesError, ALL_PRINCIPAL_TYPES,
import {
ActivepiecesError, ALL_PRINCIPAL_TYPES,
ErrorCode,
EventPayload,
isNil,
Expand Down
Expand Up @@ -199,7 +199,7 @@ export const flowRunService = {
projectId: flowRunToResume.projectId,
flowVersionId: flowRunToResume.flowVersionId,
synchronousHandlerId: returnHandlerId(pauseMetadata, requestId),
hookType: HookType.BEFORE_LOG,
hookType: HookType.AFTER_LOG,
executionType,
environment: RunEnvironment.PRODUCTION,
})
Expand Down
21 changes: 11 additions & 10 deletions packages/server/api/src/app/flows/flow.module.ts
@@ -1,4 +1,3 @@
import EventEmitter from 'events'
import { FastifyPluginAsyncTypebox } from '@fastify/type-provider-typebox'
import { accessTokenManager } from '../authentication/lib/access-token-manager'
import { websocketService } from '../websockets/websockets.service'
Expand All @@ -10,7 +9,7 @@ import { folderController } from './folder/folder.controller'
import { stepRunService } from './step-run/step-run-service'
import { testTriggerController } from './test-trigger/test-trigger-controller'
import { logger } from '@activepieces/server-shared'
import { CreateStepRunRequestBody, FlowRun, isFlowStateTerminal, StepRunResponse, TestFlowRunRequestBody, WebsocketClientEvent, WebsocketServerEvent } from '@activepieces/shared'
import { CreateStepRunRequestBody, isFlowStateTerminal, StepRunResponse, TestFlowRunRequestBody, WebsocketClientEvent, WebsocketServerEvent } from '@activepieces/shared'

export const flowModule: FastifyPluginAsyncTypebox = async (app) => {
await app.register(flowVersionController, { prefix: '/v1/flows' })
Expand All @@ -19,24 +18,26 @@ export const flowModule: FastifyPluginAsyncTypebox = async (app) => {
await app.register(testTriggerController, { prefix: '/v1/test-trigger' })
websocketService.addListener(WebsocketServerEvent.TEST_FLOW_RUN, (socket) => {
return async (data: TestFlowRunRequestBody) => {
const eventEmitter = new EventEmitter()
const principal = await accessTokenManager.extractPrincipal(socket.handshake.auth.token)
const flowRun = await flowRunService.test({
projectId: principal.projectId,
flowVersionId: data.flowVersionId,
})

socket.emit(WebsocketClientEvent.TEST_FLOW_RUN_STARTED, flowRun)

eventEmitter.on(WebsocketClientEvent.TEST_FLOW_RUN_PROGRESS, (flowRunResponse: FlowRun) => {
if (isFlowStateTerminal(flowRunResponse.status)) {
eventEmitter.removeAllListeners()
const eventEmitter = engineResponseWatcher.listen(flowRun.id)
eventEmitter.on(async (data) => {
const flowRun = await flowRunService.getOneOrThrow({
id: data.requestId,
projectId: principal.projectId,
})

if (isFlowStateTerminal(flowRun.status)) {
engineResponseWatcher.removeListener(flowRun.id)
}
socket.emit(WebsocketClientEvent.TEST_FLOW_RUN_PROGRESS, flowRunResponse)
socket.emit(WebsocketClientEvent.TEST_FLOW_RUN_PROGRESS, flowRun)
})

await engineResponseWatcher.listenAndEmit(flowRun.id, eventEmitter, flowRunService.getOneOrThrow({ id: flowRun.id, projectId: principal.projectId }))

}
})
websocketService.addListener(WebsocketServerEvent.TEST_STEP_RUN, (socket) => {
Expand Down
2 changes: 1 addition & 1 deletion packages/server/api/src/app/webhooks/webhook-controller.ts
Expand Up @@ -104,7 +104,7 @@ async function handleWebhook({ request, flowId, async, simulate }: { request: Fa
headers: {},
}
}
return engineResponseWatcher.listen(requestId, true)
return engineResponseWatcher.oneTimeListener(requestId, true)
}

async function convertRequest(request: FastifyRequest): Promise<EventPayload> {
Expand Down
1 change: 0 additions & 1 deletion packages/server/api/src/app/webhooks/webhook-service.ts
Expand Up @@ -69,7 +69,6 @@ export const webhookService = {
logger.info(
`[WebhookService#callback] flowInstance not found or not enabled ignoring the webhook, flowId=${flow.id}`,
)

throw new ActivepiecesError({
code: ErrorCode.FLOW_NOT_FOUND,
params: {
Expand Down
Expand Up @@ -3,7 +3,7 @@ import { flowService } from '../../../flows/flow/flow.service'
import { webhookService } from '../../../webhooks/webhook-service'
import { EngineHttpResponse, engineResponseWatcher } from '../engine-response-watcher'
import { WebhookJobData } from '../job-data'
import { isNil } from '@activepieces/shared'
import { FlowStatus, isNil } from '@activepieces/shared'

export const webhookConsumer = {
async consumeWebhook(data: WebhookJobData): Promise<void> {
Expand All @@ -17,6 +17,15 @@ export const webhookConsumer = {
})
return
}
const isPublishedAndEnabled = flow.status !== FlowStatus.ENABLED || isNil(flow.publishedVersionId)
if (isPublishedAndEnabled) {
await stopAndReply(data, {
status: StatusCodes.NOT_FOUND,
body: {},
headers: {},
})
return
}
const handshakeResponse = await webhookService.handshake({
flow,
payload,
Expand Down Expand Up @@ -51,7 +60,7 @@ export const webhookConsumer = {
return
}
const firstRun = runs[0]
const response = await engineResponseWatcher.listen(firstRun.id, true)
const response = await engineResponseWatcher.oneTimeListener(firstRun.id, true)
await stopAndReply(data, response)
},

Expand Down
@@ -1,19 +1,18 @@
import { EventEmitter } from 'events'
import { logger } from '@sentry/utils'
import { StatusCodes } from 'http-status-codes'
import { pubSub } from '../../helper/pubsub'
import { system, SystemProp } from '@activepieces/server-shared'
import { apId, FlowRunStatus, WebsocketClientEvent } from '@activepieces/shared'
import { system, SystemProp, TypedEventEmitter } from '@activepieces/server-shared'
import { apId } from '@activepieces/shared'

const listeners = new Map<string, (flowResponse: EngineHttpResponse) => void>()
const listeners = new Map<string, (flowResponse: EngineResponseWithId) => void>()

export type EngineHttpResponse = {
status: number
body: unknown
headers: Record<string, string>
}

type EngineResponseWithId = {
export type EngineResponseWithId = {
requestId: string
httpResponse: EngineHttpResponse
}
Expand All @@ -37,50 +36,47 @@ export const engineResponseWatcher = {
const parsedMessasge: EngineResponseWithId = JSON.parse(message)
const listener = listeners.get(parsedMessasge.requestId)
if (listener) {
listener(parsedMessasge.httpResponse)
listeners.delete(parsedMessasge.requestId)
listener(parsedMessasge)
}
logger.info(
`[engineWatcher#init] message=${parsedMessasge.requestId}`,
)
},
)
},
async listenAndEmit(requestId: string, event: EventEmitter, driver: Promise<any>): Promise<void> {
logger.info(`[engineWatcher#listenAndEmit] requestId=${requestId}`)

const listenStatus = async () => {
const finalFlowRun = await driver
event.emit(WebsocketClientEvent.TEST_FLOW_RUN_PROGRESS, finalFlowRun)
if (finalFlowRun.status !== FlowRunStatus.SUCCEEDED) {
await listenStatus()
}
}

await listenStatus()
listen(requestId: string): TypedEventEmitter<EngineResponseWithId> {
const eventEmitter = new TypedEventEmitter<EngineResponseWithId>()
logger.error('FUCK')
listeners.set(requestId, (data) => {
logger.error('ASH ' + data.requestId)
eventEmitter.emit(data)
})
return eventEmitter
},
async listen(requestId: string, timeoutRequest: boolean): Promise<EngineHttpResponse> {
async oneTimeListener(requestId: string, timeoutRequest: boolean): Promise<EngineHttpResponse> {
logger.info(`[engineWatcher#listen] requestId=${requestId}`)
return new Promise((resolve) => {
const defaultResponse: EngineHttpResponse = {
status: StatusCodes.NO_CONTENT,
body: {},
headers: {},
}
const responseHandler = (flowResponse: EngineHttpResponse) => {
clearTimeout(timeout)
resolve(flowResponse)
}
let timeout: NodeJS.Timeout
if (!timeoutRequest) {
listeners.set(requestId, resolve)
}
else {
if (timeoutRequest) {
const defaultResponse: EngineHttpResponse = {
status: StatusCodes.NO_CONTENT,
body: {},
headers: {},
}
timeout = setTimeout(() => {
this.removeListener(requestId)
resolve(defaultResponse)
}, WEBHOOK_TIMEOUT_MS)
listeners.set(requestId, responseHandler)

}
const responseHandler = (flowResponse: EngineResponseWithId) => {
if (timeout) {
clearTimeout(timeout)
}
this.removeListener(requestId)
resolve(flowResponse.httpResponse)
}
listeners.set(requestId, responseHandler)
})
},
async publish(
Expand Down
3 changes: 2 additions & 1 deletion packages/server/shared/package.json
Expand Up @@ -6,7 +6,8 @@
"@sentry/node": "7.64.0",
"pino": "8.18.0",
"pino-loki": "2.1.3",
"@activepieces/shared": "*"
"@activepieces/shared": "*",
"events": "3.3.0"
},
"type": "commonjs",
"main": "./src/index.js",
Expand Down
5 changes: 3 additions & 2 deletions packages/server/shared/src/index.ts
@@ -1,9 +1,10 @@
export * from './lib/exception-handler'
export * from './lib/typed-event-emitter'
export * from './lib/semaphore'
export * from './lib/file-compressor'
export * from './lib/file-system'
export * from './lib/package-manager'
export * from './lib/system/system'
export * from './lib/system/system-prop'
export * from './lib/promise-handler'
export * from './lib/logger'
export * from './lib/exception-handler'
export * from './lib/logger'

0 comments on commit 7ffac90

Please sign in to comment.