Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ordered consumer does not reset in case of missing heartbeats #510

Open
Kazmirchuk opened this issue Oct 21, 2023 · 0 comments
Open

Ordered consumer does not reset in case of missing heartbeats #510

Kazmirchuk opened this issue Oct 21, 2023 · 0 comments
Labels
defect Suspected defect such as a bug or regression

Comments

@Kazmirchuk
Copy link

Observed behavior

Consider the following code:

import logging
import asyncio
import nats
from nats.js.api import ConsumerConfig, StreamConfig, StorageType
import sys

logging.basicConfig(stream=sys.stdout, level=logging.INFO)

async def consume():
    nc = await nats.connect() 
    js = nc.jetstream()
    await js.add_stream(StreamConfig(name="MY_STREAM", subjects=["test.*"], storage=StorageType.MEMORY))
    subject = "test.1"
    for m in ['1', '2', '3']:
        await js.publish(subject=subject, payload=m.encode())

    async def cb(msg):
        logging.info(f'Got {msg}')
    sub = await js.subscribe(
        subject,
        cb=cb,
        ordered_consumer=True,
        idle_heartbeat=0.5
    )
    info = await sub.consumer_info()
    logging.info(f"SUB: {info.name}")
    await js.delete_consumer("MY_STREAM", info.name)
    await asyncio.sleep(3)  # now the consumer should reset due to missing HB
    try:
        info = await sub.consumer_info()
        logging.info(f"SUB: {info.name}")  # should print a new name
    except nats.js.errors.Error as e:
        logging.exception(e)

    await js.delete_stream("MY_STREAM")
    await nc.close()


async def main():
    consumer_task = asyncio.create_task(consume())
    try:
        await asyncio.wait([consumer_task], return_when=asyncio.FIRST_COMPLETED)
    except Exception as e:
        logging.exception(e)


asyncio.run(main())

produces the following output:

C:\playground\nats.py\.venv\Scripts\python.exe C:\playground\nats.py\examples\bar.py 
INFO:root:Got Msg(_client=<nats client v2.4.0>, subject='test.1', reply='$JS.ACK.MY_STREAM.KnjEVTb4.1.1.1.1697887152782136300.2', data=b'1', headers=None, _metadata=None, _ackd=False, _sid=2)
INFO:root:Got Msg(_client=<nats client v2.4.0>, subject='test.1', reply='$JS.ACK.MY_STREAM.KnjEVTb4.1.2.2.1697887152782136300.1', data=b'2', headers=None, _metadata=None, _ackd=False, _sid=2)
INFO:root:Got Msg(_client=<nats client v2.4.0>, subject='test.1', reply='$JS.ACK.MY_STREAM.KnjEVTb4.1.3.3.1697887152782675900.0', data=b'3', headers=None, _metadata=None, _ackd=False, _sid=2)
INFO:root:SUB: KnjEVTb4
ERROR:root:nats: NotFoundError: code=404 err_code=10014 description='consumer not found'
Traceback (most recent call last):
  File "C:\playground\nats.py\examples\bar.py", line 38, in consume
    info = await sub.consumer_info()
           ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\playground\nats.py\nats\js\client.py", line 674, in consumer_info
    info = await self._js._jsm.consumer_info(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\playground\nats.py\nats\js\manager.py", line 163, in consumer_info
    resp = await self._api_request(
           ^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\playground\nats.py\nats\js\manager.py", line 377, in _api_request
    raise APIError.from_error(resp['error'])
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\playground\nats.py\nats\js\errors.py", line 86, in from_error
    raise NotFoundError(**err)
nats.js.errors.NotFoundError: nats: NotFoundError: code=404 err_code=10014 description='consumer not found'

Process finished with exit code 0

Expected behavior

The ordered consumer detects missing HB and re-creates the push consumer in NATS; second call to sub.consumer_info() returns updated info. No errors are raised.

Server and client version

NATS Server 2.10.3
nats.py 2.4.0

Host environment

No response

Steps to reproduce

No response

@Kazmirchuk Kazmirchuk added the defect Suspected defect such as a bug or regression label Oct 21, 2023
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
defect Suspected defect such as a bug or regression
Projects
None yet
Development

No branches or pull requests

1 participant