Kafka group.id to manage the number of workers Bytewax Docker #377

Open
rafael-ariascalles opened this issue Jan 18, 2024 · 3 comments
Labels
needs triage New issue, needs triage

Comments

@rafael-ariascalles

Bug Description

I am building a containerized application of Bytewax and Kafka.

I expect to manage the parallelism with group.id: the topic has 10 partitions, so I should be able to run 10 replicas of the worker.

from bytewax import operators as op
from bytewax.connectors.kafka import operators as kop
from bytewax.connectors.stdio import StdOutSink
from bytewax.dataflow import Dataflow

STREAM_BROKER = "kafka-bootstrap:9092"
TOPIC = "my-topic"
GROUP_ID = "A"

def get_flow():
    flow = Dataflow("processor-kafka")
    inp = kop.input("in", flow, brokers=[STREAM_BROKER], topics=[TOPIC], add_config={"group.id": GROUP_ID})
    op.output("out", inp.oks, StdOutSink())
    return flow

I expected each worker to return different results, but all of them return the same values, which suggests they are not using the consumer group.

While tracking this down, I found this message:

%4|1705620172.932|CONFWARN|rdkafka#producer-1| [thrd:app]: Configuration property group.id is a consumer property and will be ignored by this producer instance

If I am setting up an input (consumer), why am I receiving this message? Is this the right approach to using Bytewax with Kafka? Isn't kop.input a consumer?

Python version (python -V)

Python 3.11

Bytewax version (pip list | grep bytewax)

bytewax 0.18.1

Operating System version (uname -morp)

FROM bytewax/bytewax:0.18.1-python3.11

Relevant log output

%4|1705620172.932|CONFWARN|rdkafka#producer-1| [thrd:app]: Configuration property group.id is a consumer property and will be ignored by this producer instance

Steps to Reproduce

  1. Create a Kafka cluster
  2. Run docker-compose with the Bytewax Docker image and --scale=3
  3. Look at the logs
    (To reproduce the problem exactly, the environment has to be set up.)
@github-actions github-actions bot added the needs triage New issue, needs triage label Jan 18, 2024
@Psykopear
Contributor

Psykopear commented Jan 19, 2024

Hi rafael.

The connector does not let Kafka handle offsets through the group.id mechanism by default, because we track the offset of each partition internally. Bytewax does this so that, after a restart, the dataflow can resume from the latest known checkpoint and process every message at least once.
If you don't care about recovery, you can set group.id and enable auto.commit. But that means that when your dataflow restarts, you lose all the messages that were received but not processed, and with them the at-least-once semantics the operator normally offers.
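
As a rough illustration of that opt-in, the extra consumer settings could be collected in a dictionary and passed through add_config, the same parameter used in the dataflow from the issue. This is only a sketch: the helper name consumer_group_config is hypothetical, and enable.auto.commit is assumed to be the librdkafka property that "auto.commit" refers to.

```python
def consumer_group_config(group_id: str) -> dict[str, str]:
    """Build extra librdkafka settings that hand offset tracking to Kafka.

    Hypothetical helper, not part of Bytewax. Using these settings gives up
    the checkpoint-based recovery described above.
    """
    return {
        "group.id": group_id,
        # Assumption: this is the property meant by "auto.commit".
        "enable.auto.commit": "true",
    }


config = consumer_group_config("A")
print(config)

# Assumed usage, mirroring the dataflow from the issue:
# inp = kop.input("in", flow, brokers=[STREAM_BROKER], topics=[TOPIC],
#                 add_config=config)
```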

The nice thing is that you don't need group.id to achieve the level of parallelism you want: the connector already creates a Bytewax input partition for each partition in each topic. So if you have a single topic with 10 partitions, you can run the dataflow with 10 workers (or 10 processes), and each one will handle a single Kafka partition.
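
To make the worker-count arithmetic concrete, here is a small stand-alone sketch of how 10 partitions could spread across different numbers of workers. The round-robin rule and the function name assign_partitions are assumptions for illustration only; Bytewax's actual assignment logic may differ.

```python
from collections import defaultdict


def assign_partitions(num_partitions: int, num_workers: int) -> dict[int, list[int]]:
    """Spread partition indices over workers round-robin (illustrative only)."""
    assignment: dict[int, list[int]] = defaultdict(list)
    for partition in range(num_partitions):
        assignment[partition % num_workers].append(partition)
    return dict(assignment)


# 10 partitions, 10 workers: one partition per worker.
one_each = assign_partitions(10, 10)
# 10 partitions, 3 workers: some workers read more than one partition.
uneven = assign_partitions(10, 3)
print(one_each)
print(uneven)
```

With fewer workers than partitions, every partition is still read, but some workers carry more than one.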

The warning comes from the fact that we pass the same config used for the consumer to an AdminClient, which we use when bootstrapping the dataflow to retrieve the list of partitions for each topic (see here). That client is treated as a "producer", so you get the warning, but you can safely ignore it.

@rafael-ariascalles
Author

Thanks for the response, @Psykopear.

If I understand correctly, I will have to modify the entrypoint.sh used by the image bytewax/bytewax:0.18.1-python3.11:

#!/bin/sh

cd $BYTEWAX_WORKDIR
. /venv/bin/activate
python -m bytewax.run -w$NUM_WORKERS $BYTEWAX_PYTHON_FILE_PATH

echo 'Process ended.'

if [ "$BYTEWAX_KEEP_CONTAINER_ALIVE" = true ]
then
    echo 'Keeping container alive...';
    while :; do sleep 1; done
fi

NUM_WORKERS is the known number of partitions of a given topic.

So in a given Dockerfile I will just need:

FROM bytewax/bytewax:0.18.1-python3.11
ENV NUM_WORKERS=10
COPY entrypoint.sh /bytewax/entrypoint.sh

@whoahbot
Contributor

whoahbot commented Jan 22, 2024

Hi @rafael-ariascalles,

You shouldn't need to modify the entrypoint, as most of these configuration options already accept environment variables.

As an example, you can set the number of workers per process using the environment variable BYTEWAX_WORKERS_PER_PROCESS.
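
For anyone launching the dataflow from a script rather than the image's entrypoint, the same idea might look like the sketch below. It only builds the command and environment and does not start anything. The helper name launch_spec and the file name dataflow.py are hypothetical, and it assumes bytewax.run picks up BYTEWAX_WORKERS_PER_PROCESS as described above, so no -w flag is passed.

```python
import os
import sys


def launch_spec(flow_path: str, workers: int) -> tuple[list[str], dict[str, str]]:
    """Return the command and environment for starting a dataflow.

    Hypothetical helper; relies on the environment variable instead of -w.
    """
    env = dict(os.environ)
    env["BYTEWAX_WORKERS_PER_PROCESS"] = str(workers)
    cmd = [sys.executable, "-m", "bytewax.run", flow_path]
    return cmd, env


cmd, env = launch_spec("dataflow.py", 10)
print(cmd[1:], env["BYTEWAX_WORKERS_PER_PROCESS"])

# To actually start it (requires bytewax to be installed):
# import subprocess
# subprocess.run(cmd, env=env, check=True)
```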
