How to set workers message treatment concurrency ? #77

Open
hashbulla opened this issue Feb 16, 2022 · 5 comments

Comments

@hashbulla

hashbulla commented Feb 16, 2022

Description

Hey guys, thanks for the good work on this project, it rocks.

However, I have not been able to control my workers' message-processing concurrency.
In Python you can pass the --concurrency flag to the celery worker command. How can I achieve that with celery-node?

Ideally I would set my workers' message-processing concurrency to 1. I am using k8s and want full control over scaling, and not being able to set concurrency to an arbitrary value works against that.

Thanks !

@actumn
Owner

actumn commented Feb 16, 2022

Hi, PoiraudVictor!
Thank you for your interest in the celery.node project.

I'm not sure I fully understand, but celery.node has no setting to control worker_threads or the number of promises.
The only place that controls the number of promises is shown below, and it is hard-coded to 1.

public subscribe(queue: string, callback: Function): Promise<any[]> {
  const promiseCount = 1;
  return this.isReady().then(() => {
    for (let index = 0; index < promiseCount; index += 1) {
      this.channels.push(
        new Promise(resolve => this.receive(index, resolve, queue, callback))
      );
    }
    return Promise.all(this.channels);
  });
}

Sorry for not being of much help.
It would help if you could describe your environment and code in more detail.
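
To illustrate the idea behind a fixed promiseCount, here is a minimal sketch of how a fixed number of receive loops can bound concurrency. It is not celery.node code: runLoops, peakConcurrency, and the in-memory queue are hypothetical stand-ins, and the sketch assumes each loop waits for the handler to finish before taking the next message. Whether celery.node's own receive does that is not shown in the snippet above.

```typescript
// Hypothetical in-memory stand-in for a broker queue; not part of celery.node.
async function runLoops<T>(
  queue: T[],
  loopCount: number,
  handler: (item: T) => Promise<void>
): Promise<void> {
  // Each loop takes one item at a time and waits for the handler
  // to finish before taking the next one.
  const loop = async (): Promise<void> => {
    for (let item = queue.shift(); item !== undefined; item = queue.shift()) {
      await handler(item);
    }
  };
  // Start loopCount loops and wait until the queue is drained.
  await Promise.all(Array.from({ length: loopCount }, () => loop()));
}

// Measures the peak number of handlers running at the same time.
async function peakConcurrency(loopCount: number): Promise<number> {
  let inFlight = 0;
  let peak = 0;
  const handler = async (): Promise<void> => {
    inFlight += 1;
    peak = Math.max(peak, inFlight);
    await new Promise<void>(resolve => setTimeout(resolve, 5));
    inFlight -= 1;
  };
  await runLoops(["m0", "m1", "m2", "m3"], loopCount, handler);
  return peak;
}
```

Under these assumptions, peakConcurrency(1) never sees more than one handler in flight, while a larger loop count allows that many handlers to overlap.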

@hashbulla
Author

hashbulla commented Feb 16, 2022

Thanks a lot, actumn, for your quick answer.

I was probably unclear in my first message.

I have two services: the first (service A) sends messages to the second (service B). Service B listens on a queue (q0) for messages from service A and, once it has finished processing a message, sends it to a new queue (q1). However, if A sends a message m1 to q0 while B is still processing a previous message m0, B processes m0 and m1 in parallel.
I want to process them in series, not in parallel, so B should finish processing m0 before it starts on m1.

How can I achieve this with celery-node? In classic Python Celery, I would pass --concurrency=1 to my celery worker <...> command.

Here is the code for service B:

import { createClient as createPublisher, createWorker as createConsumer } from "celery-node";

const { CELERY_BROKER, CELERY_BACKEND, Q0, Q1 } = process.env;

// Worker that consumes messages from service A on Q0.
const celeryConsumer: CeleryConsumer = createConsumer(
  CELERY_BROKER,
  CELERY_BACKEND,
  Q0
);
// Client that publishes follow-up tasks to Q1.
const celeryPublisher: CeleryPublisher = createPublisher(
  CELERY_BROKER,
  CELERY_BACKEND,
  Q1
);

celeryConsumer.register("some.task", async (message: CustomMessageType) => {
  <...message-processing-code...>;
  const task: Task = celeryPublisher.createTask("some.other.task");
  task.applyAsync([someArgs]);
});
await celeryConsumer.start();

I run my code locally on both macOS Monterey v12.1 and in an Ubuntu container.
Node: v17.4.0
TypeScript: v4.5.5
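
The serial-processing requirement described above (finish m0 before starting m1) can be approximated in user code by chaining handler calls. The following is a minimal sketch, not a celery-node feature: serialize, processMessage, and demo are hypothetical names, and the sketch only orders the handler bodies inside one process. It does not change how or when the broker delivers or acknowledges messages, so it is not equivalent to --concurrency=1.

```typescript
type AsyncHandler<A extends unknown[], R> = (...args: A) => Promise<R>;

// Wraps an async handler so that calls run strictly one after another.
function serialize<A extends unknown[], R>(handler: AsyncHandler<A, R>): AsyncHandler<A, R> {
  // Tail of the chain: settles when the most recently queued call finishes.
  let tail: Promise<unknown> = Promise.resolve();
  return (...args: A): Promise<R> => {
    // Start this call only after the previous one has settled.
    const run = tail.then(() => handler(...args));
    // Swallow errors on the chain itself so one failure does not block
    // later calls; the caller still sees the rejection through `run`.
    tail = run.catch(() => undefined);
    return run;
  };
}

// Demo with a hypothetical handler that records start/end order.
const events: string[] = [];
const sleep = (ms: number) => new Promise<void>(resolve => setTimeout(resolve, ms));

const processMessage = serialize(async (name: string, ms: number): Promise<string> => {
  events.push(`start ${name}`);
  await sleep(ms);
  events.push(`end ${name}`);
  return name;
});

async function demo(): Promise<string[]> {
  // m0 is slower than m1, yet m1 must not start before m0 ends.
  await Promise.all([processMessage("m0", 30), processMessage("m1", 5)]);
  return events; // ["start m0", "end m0", "start m1", "end m1"]
}
```

With the code above, the wrapped handler would be passed to register in place of the original one, for example celeryConsumer.register("some.task", serialize(handler)).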

@akhilacubrio

Were you able to solve this?

@imdark

imdark commented Dec 19, 2022

I implemented solo execution in my fork, but only for Redis right now. I will try to add more options next week:
https://github.com/imdark/celery.node

@shoham112233

shoham112233 commented Jun 11, 2023

I came across this issue and have a solution that may be helpful for RabbitMQ.

import * as amqplib from "amqplib";
const message = require('celery-node/dist/kombu/message');

// Number of tasks currently being processed.
// Assumption: the original snippet never declares or updates this counter,
// so it is declared here and maintained around the callback below.
let runningTasks = 0;

class AMQPMessage extends message.Message {
    constructor(payload: any) {
        super(payload.content, payload.properties.contentType, payload.properties.contentEncoding, payload.properties, payload.properties.headers);
    }
}

// `worker` is the object returned by celery.createWorker.
worker.broker.subscribe = (queue: string, callback: any) => {
    return worker.broker.channel
        .then((ch: amqplib.Channel) =>
            // Make sure the queue exists before consuming from it.
            ch.assertQueue(queue, {
                durable: true,
                autoDelete: false,
                exclusive: false,
                // nowait: false,
                arguments: null
            }).then(() => ch)
        )
        .then((ch: amqplib.Channel) =>
            ch.consume(queue, (rawMsg: any) => {
                if (runningTasks !== 0) {
                    // A task is already running: reject the message and requeue it.
                    ch.reject(rawMsg, true);
                    return;
                }
                // Nothing is running: accept the message and process it.
                ch.ack(rawMsg);
                // Only application/json is supported as content type.
                if (rawMsg.properties.contentType !== "application/json") {
                    throw new Error(`unsupported content type ${rawMsg.properties.contentType}`);
                }
                // Only utf-8 is supported as content encoding.
                if (rawMsg.properties.contentEncoding !== "utf-8") {
                    throw new Error(`unsupported content encoding ${rawMsg.properties.contentEncoding}`);
                }
                runningTasks += 1;
                // Assumption: callback returns a promise that settles when the task
                // is done; if it does not, the counter is released almost immediately.
                const done = () => { runningTasks -= 1; };
                Promise.resolve(callback(new AMQPMessage(rawMsg))).then(done, done);
            })
        );
};

I don't think it's the best solution, but if you apply it right after your const worker = celery.createWorker call, it should do the trick.
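
The accept-or-requeue logic above depends on a counter of running tasks. Here is a minimal, broker-free sketch of how such a counter could be kept, assuming the task handler returns a promise. AckChannel, createSingleFlightConsumer, and the fake channel are hypothetical names used only for illustration; they are not part of celery-node or amqplib.

```typescript
// Minimal stand-in for the two channel methods the gate needs (hypothetical).
interface AckChannel<M> {
  ack(msg: M): void;
  reject(msg: M, requeue: boolean): void;
}

// Returns a consumer callback that accepts a message only when nothing is in flight.
function createSingleFlightConsumer<M>(
  ch: AckChannel<M>,
  handler: (msg: M) => Promise<unknown>
): (msg: M) => void {
  let runningTasks = 0;
  return (msg: M): void => {
    if (runningTasks > 0) {
      ch.reject(msg, true); // busy: hand the message back to the queue
      return;
    }
    runningTasks += 1;
    ch.ack(msg);
    // Release the slot when the handler settles, whether it resolved or rejected.
    const done = () => { runningTasks -= 1; };
    handler(msg).then(done, done);
  };
}

// Demo with a fake channel that records what happened to each message.
const acked: string[] = [];
const requeued: string[] = [];
const fakeChannel: AckChannel<string> = {
  ack: msg => { acked.push(msg); },
  reject: (msg, requeue) => { if (requeue) requeued.push(msg); },
};

// The fake handler stays pending until finishCurrent() is called.
let finishCurrent: () => void = () => {};
const consume = createSingleFlightConsumer(fakeChannel, () =>
  new Promise<void>(resolve => { finishCurrent = resolve; })
);

consume("m0"); // accepted: nothing is running
consume("m1"); // requeued: m0 is still in flight
```

One caveat with this design: a requeued message may be redelivered right away, so the consumer can spin while a long task runs. With RabbitMQ, a commonly used alternative is amqplib's channel.prefetch(1) combined with acknowledging each message only after its task has finished.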
