KeyDecodeError when group_by changes key type #543

Open
tapple opened this issue Mar 6, 2020 · 2 comments · May be fixed by #746

tapple commented Mar 6, 2020

`stream.group_by` does not appear to set its topic's key type properly when the new key type differs from the source stream's key type. Here's an example program that demonstrates the problem by re-keying a stream from an `int` key to a `str` key:

```python
import faust

app = faust.App('example', broker='kafka://')


class StringModel(faust.Record):
    string_value: str


int_topic = app.topic(
    'int_topic',
    key_type=int,
    value_type=StringModel,
    internal=True,
)


@app.agent(int_topic)
async def int_agent(int_topic):
    async for k, v in int_topic.group_by(StringModel.string_value).items():
        print(f'{k}: {v}')


@app.task
async def example_sender(app):
    await int_topic.send(key='5', value=StringModel(string_value="five"))


if __name__ == '__main__':
    app.main()
```

Run the program with `python bugdemo.py worker`.

Running the program raises a `KeyDecodeError`, which it should not. A workaround is to explicitly pass `group_by` a topic whose key type matches the new key.

Full traceback

```
Traceback (most recent call last):
  File "/Users/tapple/.virtualenvs/deepminer/lib/python3.6/site-packages/faust/serializers/registry.py", line 56, in loads_key
    return cast(K, self._prepare_payload(typ, payload))
  File "/Users/tapple/.virtualenvs/deepminer/lib/python3.6/site-packages/faust/serializers/registry.py", line 111, in _prepare_payload
    return int(want_str(value))
ValueError: invalid literal for int() with base 10: 'five'

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/Users/tapple/.virtualenvs/deepminer/lib/python3.6/site-packages/faust/agents/agent.py", line 646, in _execute_actor
    await coro
  File "/Users/tapple/cabbage/parsers/common/deepminer/async2/bugdemo.py", line 20, in int_agent
    async for k, v in int_topic.group_by(StringModel.string_value).items():
  File "/Users/tapple/.virtualenvs/deepminer/lib/python3.6/site-packages/faust/streams.py", line 290, in items
    async for event in self.events():
  File "/Users/tapple/.virtualenvs/deepminer/lib/python3.6/site-packages/faust/streams.py", line 299, in events
    async for _ in self:  # noqa: F841
  File "/Users/tapple/.virtualenvs/deepminer/lib/python3.6/site-packages/faust/streams.py", line 768, in _c_aiter
    value, sensor_state = await it.next()  # noqa: B305
  File "faust/_cython/streams.pyx", line 87, in next
  File "/Users/tapple/.virtualenvs/deepminer/lib/python3.6/site-packages/faust/channels.py", line 502, in __anext__
    return await self.queue.get()
  File "/Users/tapple/.virtualenvs/deepminer/lib/python3.6/site-packages/mode/utils/queues.py", line 129, in get
    return await super().get()
  File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/asyncio/queues.py", line 167, in get
    yield from getter
  File "faust/transport/_cython/conductor.pyx", line 54, in faust.transport._cython.conductor.ConductorHandler.__call__
  File "/Users/tapple/.virtualenvs/deepminer/lib/python3.6/site-packages/faust/serializers/schemas.py", line 137, in decode
    k: K = schema_loads_key(app, message, loads=loads_key)
  File "/Users/tapple/.virtualenvs/deepminer/lib/python3.6/site-packages/faust/serializers/schemas.py", line 75, in loads_key
    serializer=serializer or self.key_serializer,
  File "/Users/tapple/.virtualenvs/deepminer/lib/python3.6/site-packages/faust/serializers/registry.py", line 61, in loads_key
    sys.exc_info()[2]) from exc
  File "/Users/tapple/.virtualenvs/deepminer/lib/python3.6/site-packages/faust/serializers/registry.py", line 56, in loads_key
    return cast(K, self._prepare_payload(typ, payload))
  File "/Users/tapple/.virtualenvs/deepminer/lib/python3.6/site-packages/faust/serializers/registry.py", line 111, in _prepare_payload
    return int(want_str(value))
faust.exceptions.KeyDecodeError: invalid literal for int() with base 10: 'five'
```

Versions

  • Python version 3.6.4
  • Faust version 1.10.2
  • Operating system MacOS 10.15.3
  • Kafka version 2.4.0
  • RocksDB version (if applicable)
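The traceback points at the cause: the topic that `group_by` repartitions through still decodes keys as `int` (the failing call is `int(want_str(value))` on `'five'`), so it appears to inherit `key_type=int` from `int_topic`. The stdlib-only sketch below models that behaviour under simplifying assumptions. `SketchTopic`, its `decode_key`/`derive` methods and the `int_topic-repartition` name are hypothetical stand-ins, not faust's real classes or topic naming. It shows the inherited `int` key type failing on the new key, and a `str`-keyed topic (the workaround) succeeding.

```python
# Hypothetical, simplified model of the failure in this issue. These are NOT
# faust's real classes; they only mimic how a typed topic decodes raw keys.
from dataclasses import dataclass, replace
from typing import Optional, Type


@dataclass(frozen=True)
class SketchTopic:
    name: str
    key_type: Optional[Type] = None

    def decode_key(self, payload: bytes):
        # Mirrors the failing step in the traceback: int(want_str(value)).
        text = payload.decode()
        if self.key_type is int:
            return int(text)
        return text

    def derive(self, **overrides) -> "SketchTopic":
        # A derived topic copies every setting unless explicitly overridden.
        return replace(self, **overrides)


source = SketchTopic('int_topic', key_type=int)

# Bug: the (hypothetically named) repartition topic keeps key_type=int ...
repartition = source.derive(name='int_topic-repartition')
try:
    repartition.decode_key(b'five')  # ... so the new str key cannot decode.
except ValueError as exc:
    print(f'decode failed: {exc}')

# Workaround from this issue: route through a topic whose key_type is str.
fixed = source.derive(name='str_topic', key_type=str)
print(fixed.decode_key(b'five'))
```

Running it prints `decode failed: invalid literal for int() with base 10: 'five'` followed by `five`, matching the `ValueError` at the top of the traceback.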
tapple commented Mar 6, 2020

Here's the workaround, which works: pass `group_by` an explicitly declared `str`-keyed topic via `topic=`.

```python
import faust

app = faust.App('example', broker='kafka://')


class StringModel(faust.Record):
    string_value: str


int_topic = app.topic(
    'int_topic',
    key_type=int,
    value_type=StringModel,
    internal=True,
)


# Topic for the re-keyed stream; its key_type matches the new (str) key.
str_topic = app.topic(
    'str_topic',
    key_type=str,
    value_type=StringModel,
    internal=True,
)


@app.agent(int_topic)
async def int_agent(int_topic):
    # Repartition through str_topic instead of the topic group_by derives.
    async for k, v in int_topic.group_by(
            StringModel.string_value, topic=str_topic).items():
        print(f'{k}: {v}')


@app.task
async def example_sender(app):
    await int_topic.send(key='5', value=StringModel(string_value="five"))


if __name__ == '__main__':
    app.main()
```

pscottdevos pushed a commit to pscottdevos/faust that referenced this issue Dec 26, 2021
robinhood#543

Passes key_type to derive if key has "type" attribute.
pscottdevos linked a pull request Dec 26, 2021 that will close this issue

pscottdevos commented

I have submitted a PR that fixes this issue: #746

pscottdevos pushed a commit to pscottdevos/faust that referenced this issue Dec 27, 2021
robinhood#543

Passes key_type to derive if key has "type" attribute.
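The commit message ("Passes key_type to derive if key has "type" attribute") suggests the shape of the fix in #746. Here is a minimal, stdlib-only sketch of that idea, assuming the `group_by` key is either a model field exposing a `type` attribute (like `StringModel.string_value`) or a plain callable. `SketchTopic`, `SketchField` and `derive_group_by_topic` are hypothetical stand-ins, not faust's API or the PR's actual code.

```python
# Hypothetical sketch of the commit's idea: if the group_by key exposes a
# ``type`` attribute, pass it as key_type when deriving the repartition topic.
# Names are illustrative only, not faust's actual implementation.
from dataclasses import dataclass, replace
from typing import Any, Dict, Optional, Type


@dataclass(frozen=True)
class SketchTopic:
    name: str
    key_type: Optional[Type] = None

    def derive(self, **overrides: Any) -> "SketchTopic":
        # Copy every setting, replacing only the overridden ones.
        return replace(self, **overrides)


@dataclass(frozen=True)
class SketchField:
    # Stand-in for a model field descriptor such as StringModel.string_value.
    name: str
    type: Type


def derive_group_by_topic(source: SketchTopic, key: Any) -> SketchTopic:
    overrides: Dict[str, Any] = {'name': f'{source.name}-repartition'}
    key_type = getattr(key, 'type', None)
    if key_type is not None:
        # The new key's declared type replaces the inherited key_type.
        overrides['key_type'] = key_type
    return source.derive(**overrides)


source = SketchTopic('int_topic', key_type=int)
by_field = derive_group_by_topic(source, SketchField('string_value', str))
by_callable = derive_group_by_topic(source, lambda event: event)

print(by_field.key_type)     # <class 'str'>
print(by_callable.key_type)  # <class 'int'>
```

Note the limit of this approach: a plain function has no `type` attribute, so under this sketch a callable key would still inherit the source key type, and passing an explicit `topic=` would still be the way to control it.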