Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Branching during Flow execution #2436

Open
vicpara opened this issue Feb 20, 2024 · 1 comment
Open

Branching during Flow execution #2436

vicpara opened this issue Feb 20, 2024 · 1 comment

Comments

@vicpara
Copy link

vicpara commented Feb 20, 2024

I am interested in knowing if this type of flow execution is possible. This is not necessarily a feature request. I'm using BullMQ with NodeJS.

In a given flow, the parent-children dependency tree is given at flow creation time.

Let's say that during the execution of a child task there is a boolean result 'answer'.
There are two chained strands of tasks that depend on this child task result 'answer'. Is it possible to run one branch and ignore the other depending on 'answer' value? Say for 'true' run one branch and for 'false' run the other?

Currently I'm designing a mechanism that relies on failing the first child of the non-executable branch and using 'ignoreDependencyOnFailure' when the branch results merge back together.
I was wondering if there are other primitives or if there's a more idiomatic way of achieving this.

@shaunakv1
Copy link

shaunakv1 commented Apr 17, 2024

@vicpara We have a similar use case and we use what we are calling a Orchestrator Worker pattern.

You have Orchestrator Queue. The workers in this queue can span Child Flow Tree with the parent being the main orchestrator worker.

When your Child Flow Tree finishes, your parent orchestrator worker executes. Here you can make a decision to span another flow tree based on previous tree's result.

You can design quite complex flow patterns this way.

Here's a code sample ( not complete, just an example)

class FlowOrchestrator {
    static async startProcess(jobDefination) {
        const orchestratorFlow = new FlowProducer({ connection });
        
        const childJobs = [{
            name: ``,
            opts: {},
            queueName: CHILD_QUEUE1,
            data: jobDefination,
        },
        {
                name: ``,
                opts: {},
                queueName: CHILD_QUEUE1,
                data: jobDefination,
        }];

        const orchestratorJob = {
            name: `${ORCHESTRATOR_QUEUE}_child1`,
            opts: {},
            queueName: ORCHESTRATOR_QUEUE,
            data:{},
            children: childJobs
        }

        await orchestratorFlow.add(childJobs);
    }

    static async startStep2(jobDefination) {
        const orchestratorFlow = new FlowProducer({ connection });

        const childJobs = [{
            name: ``,
            opts: {},
            queueName: CHILD_QUEUE2,
            data: jobDefination,
        },
        {
            name: ``,
            opts: {},
            queueName: CHILD_QUEUE2,
            data: jobDefination,
        }];

        const orchestratorJob = {
            name: `${ORCHESTRATOR_QUEUE}_child2`,
            opts: {},
            queueName: ORCHESTRATOR_QUEUE,
            data: {},
            children: childJobs
        }

        await orchestratorFlow.add(childJobs);
    }

    /**
     * BullMQ Orchestrator Worker Function
     */
    async start() {
        this.orchestratorWorker = new Worker(ORCHESTRATOR_QUEUE, async job => {
            const jobData = job.data;
            const currentJob = await job.asJSON()
            
            // In thise steps, we can now check return of the previous flow, 
            // and then decide to start the next step or not
            // or select some other step based on your logic
            if (currentJob.name.includes('_child1')) {
                await FlowOrchestrator.startStep2(job);
            }
            if (currentJob.name.includes('_child2')) {
                await FlowOrchestrator.startStep3(job)
            }
            return true;

        }, {
            connection,
            concurrency: 1
        });
    }
}

Note that in the steps, you can spwan more nested flow producers and chain them in a way that they always return to your main orchestrator. You can design very complicated workflows and decision trees this way. Another good thing about this is you can maintain the data flow between your steps using the main orchestrator.

Hope it helps.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants