Crowd linux /services folder merge #1934
Draft
Changes proposed ✍️
What
🤖[deprecated] Generated by Copilot at 9a84aea
This pull request adds features, fixes bugs, and improves performance in the automations worker and data sink worker services. It adds support for checking creation timestamps, custom workflow IDs, and reuse policies for automations, and for delaying and retrying results, soft-deleting activities, and measuring metrics in the data sink worker. It also migrates to `pnpm` as the package manager and updates the code style and logic of several methods and queries. It modifies the interfaces and queries in `automation.repo.ts`, `types.ts`, `automation.service.ts`, `newActivityAutomations.ts`, `newMemberAutomations.ts`, `dataSink.repo.ts`, `activity.repo.ts`, `dataSink.service.ts`, `activity.service.ts`, `data.repo.ts`, `activity.data.ts`, and `dataSink.data.ts`; adds new classes and functions in `package.json`, `processOldResults.ts`, `main.ts`, `index.ts`, `default.json`, and `index.ts`; and fixes a bug in `triggerAutomation` and `member.repo.ts`.
Why
How
- Migrate to `pnpm` as the package manager and update the `dev:local` and `dev` scripts in the `package.json` files (link, link)
- Add a `createdAt` field to the `IRelevantAutomationData` interface and the `getRelevantAutomations` query in `automation.repo.ts` to store and fetch the creation timestamp of each automation (link, link)
- Add a `joinedAt` field to the `IMemberData` interface in `types.ts` to store the timestamp of when a member joined a community (link)
- Add a `timestamp` field to the `IActivityData` interface in `types.ts` to store the timestamp of when an activity was created (link)
- Add a `shouldProcess` variable and `if` conditions to the `processNewMemberAutomation` and `processNewActivityAutomation` methods in `automation.service.ts` to check the creation timestamps of members, activities, and automations, and to log a warning and skip processing if the member or activity was created before the automation (link, link, link, link, link, link, link, link, link, link)
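A minimal sketch of the timestamp guard described above; the interface shapes are abbreviated assumptions, not the actual definitions:

```typescript
// Hypothetical shapes, reduced to the fields the guard needs.
interface IRelevantAutomationData {
  id: string
  createdAt: string // ISO timestamp of when the automation was created
}

interface IMemberData {
  id: string
  joinedAt: string // ISO timestamp of when the member joined
}

function shouldProcessMember(member: IMemberData, automation: IRelevantAutomationData): boolean {
  // Skip (and let the caller log a warning) when the member predates the automation.
  return new Date(member.joinedAt) >= new Date(automation.createdAt)
}
```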
- Add the `WorkflowIdReusePolicy` and `workflowInfo` imports to the `newActivityAutomations.ts` and `newMemberAutomations.ts` files, and use them to declare an `info` variable and pass the `workflowId`, `workflowIdReusePolicy`, `retry`, and `searchAttributes` options to the `executeChild` calls in the `processNewActivityAutomation` and `processNewMemberAutomation` functions, so that the child workflow uses a custom workflow ID and reuse policy, retries with the same maximum attempts as the parent workflow, and carries the tenant ID as a search attribute (link, link, link, link, link, link)
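A hedged sketch of the `executeChild` options described above, assembled as a plain object. The option names mirror Temporal's TypeScript SDK; the ID format, reuse-policy value, and search-attribute key are illustrative assumptions:

```typescript
// Shape mirroring the child-workflow options the bullet describes (assumed, not verbatim).
interface ChildWorkflowOptions {
  workflowId: string
  workflowIdReusePolicy: string
  retry: { maximumAttempts: number }
  searchAttributes: Record<string, string[]>
}

function buildChildOptions(
  parentWorkflowId: string, // from workflowInfo() in the parent workflow
  parentMaximumAttempts: number, // reuse the parent's retry budget
  tenantId: string,
  automationId: string,
): ChildWorkflowOptions {
  return {
    // A deterministic child ID derived from the parent's, so re-runs reuse it.
    workflowId: `${parentWorkflowId}/automation/${automationId}`,
    workflowIdReusePolicy: 'WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE',
    retry: { maximumAttempts: parentMaximumAttempts },
    // Tenant ID exposed as a search attribute for visibility queries.
    searchAttributes: { TenantId: [tenantId] },
  }
}
```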
- Add the `@crowd/telemetry` dependency to the `package.json` file in the `data_sink_worker` folder, and use the `telemetry` import to measure and increment metrics for the data sink worker in the `dataSink.service.ts` file (link, link, link, link)
- Add a `maxStreamRetries` option to the `worker` section in the `default.json` file, and an `IWorkerConfig` interface and a `WORKER_SETTINGS` function to the `index.ts` file in the `conf` folder, to allow configuring worker settings such as the maximum number of stream retries (link, link)
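A sketch of what the settings accessor could look like; the config loader here is a stand-in for however `default.json` is actually read, and the default value is invented:

```typescript
// Worker settings surfaced from configuration (shape assumed from the bullet above).
interface IWorkerConfig {
  maxStreamRetries: number
}

// Stand-in for the parsed "worker" section of default.json.
const loadedConfig: { worker: IWorkerConfig } = {
  worker: { maxStreamRetries: 5 },
}

const WORKER_SETTINGS = (): IWorkerConfig => loadedConfig.worker
```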
- Add the `DataSinkWorkerEmitter` import to the `main.ts`, `index.ts` (in the `queue` folder), and `dataSink.service.ts` files; declare, initialize, and pass a `dataSinkWorkerEmitter` variable to the `WorkerQueueReceiver` constructor, the `processOldResultsJob` function, and the `DataSinkService` constructor; and handle the `DataSinkWorkerQueueMessageType.CHECK_RESULTS` message type in the `handleMessage` method, so the data sink worker emitter can trigger result processing for delayed results (link, link, link, link, link, link, link, link, link, link)
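A hedged sketch of the `CHECK_RESULTS` routing; the enum values and return strings are illustrative, not the actual message handling code:

```typescript
// Assumed message types, reduced to the two relevant here.
enum DataSinkWorkerQueueMessageType {
  PROCESS_RESULT = 'process_result',
  CHECK_RESULTS = 'check_results',
}

type QueueMessage = { type: DataSinkWorkerQueueMessageType; resultId?: string }

function routeMessage(message: QueueMessage): string {
  switch (message.type) {
    case DataSinkWorkerQueueMessageType.CHECK_RESULTS:
      // Emitted periodically so delayed results get picked back up.
      return 'trigger delayed-result processing'
    case DataSinkWorkerQueueMessageType.PROCESS_RESULT:
      return `process result ${message.resultId}`
    default:
      throw new Error('unknown message type')
  }
}
```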
- Add `retries` and `delayedUntil` fields to the `IResultData` interface in the `dataSink.data.ts` file, and `getResult`, `resetResults`, `delayResult`, and `getDelayedResults` methods to the `dataSink.repo.ts` file, to store, fetch, update, and query the retry count and the delay timestamp of results (link, link, link, link, link)
- Add a `resultInfo` parameter to the `triggerResultError` method in the `dataSink.service.ts` file, and use it to pass the result information to the error handler and the `markResultError` and `delayResult` calls, and to check the retry count and delay the result if it is below the maximum number of stream retries (link, link, link, link)
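The retry-or-fail decision in the two bullets above can be sketched as follows; the field names come from the bullets, while the backoff formula is an illustrative assumption:

```typescript
// Result shape, reduced to the retry/delay fields (assumed from the bullets above).
interface IResultData {
  id: string
  retries: number
  delayedUntil: Date | null
}

function decideOnFailure(
  result: IResultData,
  maxStreamRetries: number,
  now: Date,
): { action: 'delay'; until: Date } | { action: 'error' } {
  if (result.retries < maxStreamRetries) {
    // Back off longer on each retry before the result is picked up again.
    const delaySeconds = Math.pow(2, result.retries) * 60
    return { action: 'delay', until: new Date(now.getTime() + delaySeconds * 1000) }
  }
  // Retry budget exhausted: mark the result as errored.
  return { action: 'error' }
}
```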
- Add a `deletedAt` field to the `IDbActivity` interface in the `activity.data.ts` file and to the `findExisting` method and query in the `activity.repo.ts` file, to support soft deletion of activities and to fetch the deletion timestamp of an existing activity (link, link, link, link)
- Add an `if` condition and a `log.trace` call to the `processActivity` method in the `activity.service.ts` file, to skip processing if the existing activity is deleted (link)
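The soft-delete guard from the two bullets above amounts to a small check; the interface shape here is an assumption:

```typescript
// Activity row, reduced to the soft-deletion field (assumed shape).
interface IDbActivity {
  id: string
  deletedAt: string | null // soft-deletion timestamp, null while active
}

function shouldSkipActivity(existing: IDbActivity | null): boolean {
  // Only skip when the activity exists and has been soft-deleted.
  return existing !== null && existing.deletedAt !== null
}
```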
- Change the `workflowId` option to use the `TemporalWorkflowId.NEW_MEMBER_AUTOMATION` enum value in the `processNewMemberAutomation` method in the `activity.service.ts` file, and add the `TemporalWorkflowId` import to the file, to use a custom enum for the Temporal workflow IDs (link, link)
- Change the `findByEmail` method in the `member.repo.ts` file to use the `@>` operator instead of the `= ANY` operator, so the query can use the index on the `emails` array column (link)
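A sketch of the rewritten lookup: `emails @> array[...]` is a containment check that a GIN index on the array column can serve, whereas `= ANY(emails)` cannot use it. The exact query text and the `$(email)` placeholder style (pg-promise) are assumptions:

```typescript
// Before (cannot use the array index):  where $(email) = any(m.emails)
// After (index-assisted containment):   where m.emails @> array[$(email)]
const findByEmailQuery = `
  select m.id
  from members m
  where m.emails @> array[$(email)]
  limit 1
`
```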
- Change the `MAX_CONCURRENT_PROMISES` and `MAX_RESULTS_TO_LOAD` constants from 50 and 200 to 2 and 10, respectively, in the `processOldResults.ts` file, to reduce the load on the database and the Temporal server when processing old results (link)
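A hedged sketch of how these two constants bound the work per pass, assuming a simple slice-and-batch loop rather than the actual job code:

```typescript
// Lowered limits from the bullet above.
const MAX_CONCURRENT_PROMISES = 2
const MAX_RESULTS_TO_LOAD = 10

async function processOldResultsPass<T>(
  resultsToProcess: T[],
  handle: (result: T) => Promise<void>,
): Promise<number> {
  // At most MAX_RESULTS_TO_LOAD results are taken per pass.
  const loaded = resultsToProcess.slice(0, MAX_RESULTS_TO_LOAD)
  for (let start = 0; start < loaded.length; start += MAX_CONCURRENT_PROMISES) {
    // At most MAX_CONCURRENT_PROMISES handlers are in flight at once.
    await Promise.all(loaded.slice(start, start + MAX_CONCURRENT_PROMISES).map(handle))
  }
  return loaded.length
}
```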
- Add the `loadChildTables` parameter to the `if` condition in the `getActivities` method in `data.repo.ts` to check whether the engagement score should be calculated for each activity, avoiding unnecessary queries (link)
- Remove the `throw err` statement from the `catch` block in the `triggerAutomation` method in `automation.service.ts`, to avoid throwing errors that are already handled by the `triggerResultError` method (link)
- Remove the `log.error` call from the `catch` block in the `processOldResultsJob` function in the `processOldResults.ts` file, to avoid logging the same error twice, since the `triggerResultError` method already logs it (link)
- Remove the `i` and `batchLength` variables from the `processOldResultsJob` function in the `processOldResults.ts` file, and use `resultsToProcess.length` directly (link, link, link)
- Remove the `currentIndex`, `i`, and `batchLength` variables from the `processOldResultsJob` function in the `processOldResults.ts` file, to avoid unnecessary logging and variables (link)
- Replace the `process` variable in the `processNewMemberAutomation` and `processNewActivityAutomation` methods in `automation.service.ts` with the `shouldProcess` variable, to use a more descriptive name (link, link, link, link, link, link, link)
- Add an `&& activity.body` condition to the `if` condition in the `processNewActivityAutomation` method in `automation.service.ts` to check that the activity has a body before performing keyword matching, avoiding errors when the body is null or undefined (link)
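A minimal sketch of the null-safe keyword match the bullet above describes; the function and type names are illustrative:

```typescript
// Activity shape, reduced to the field the guard protects.
interface ActivityLike {
  body?: string | null
}

function matchesKeywords(activity: ActivityLike, keywords: string[]): boolean {
  // Without a body there is nothing to match against.
  if (!activity.body) return false
  const body = activity.body.toLowerCase()
  return keywords.some((keyword) => body.includes(keyword.toLowerCase()))
}
```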
- Add an `if` condition to the `processActivity` method in the `activity.service.ts` file, to avoid triggering the member sync for a null or undefined member ID (link)

Checklist ✅
- Label appropriately with `Feature`, `Improvement`, or `Bug`.