Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GlobalTable not working when using multiple instances #751

Open
2 tasks done
Hamdiovish opened this issue Mar 9, 2022 · 0 comments
Open
2 tasks done

GlobalTable not working when using multiple instances #751

Hamdiovish opened this issue Mar 9, 2022 · 0 comments

Comments

@Hamdiovish
Copy link

Checklist

  • I have included information about relevant versions
  • I have verified that the issue persists when using the master branch of Faust.

Steps to reproduce

Running the following app in 2 instances, then by pushing a message to one partition, the instance listening to the related partition detect the message and update the GlobalTable, whilst the other instance will not update its version of the GlobalTable and keep showing nothing.

import faust
from faust.cli import option

app = faust.App(
    'hello-world',
    broker='kafka://localhost:29092',
    key_serializer='raw',
    value_serializer='raw',
	store="rocksdb://", 
	topic_disable_leader=True,
    )

greetings_topic = app.topic('greetings-topic',partitions=2,key_type=str, value_type=str,internal=True)
greetings_table = app.GlobalTable('greetings-table',partitions=2,key_type=str, value_type=str,default=str)

@app.agent(greetings_topic)
async def greet(greetings):
	"""run: faust -A app worker"""
	async for event in greetings.events():
		k = event.key
		v = event.value
		print(f"processing: {k}: {v} on partition: {event.message}")
		greetings_table[k]=v

@app.command(
	option('--k', type=str, default='a',help='The key.'),
	option('--v', type=str, default='0',help='The value.'))
async def sim_greeting(self, k, v, p):
	"""simulate: faust -A app01 sim-greeting --k a --v 0"""
	await greetings_topic.send(key=k,value=v)

@app.timer(interval=1.0)
async def plot(app):
	for k in greetings_table:
		print(f'[{k}:{greetings_table[k]}]')

Expected behavior

All instances should detect GlobalTable updates.

Actual behavior

The GlobalTable is not synced across instances.

Versions

  • Python version: 3.7.12
  • Faust version: v1.10.3
  • Operating system: Ubuntu 18
  • Kafka version: 2.8-IV1
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant