
jetstream pull_subscribe() not reconnecting after server reboot #504

Open
martinsson opened this issue Oct 9, 2023 · 2 comments
Labels
defect Suspected defect such as a bug or regression

Comments

@martinsson

What version were you using?

v2.9.6

What environment was the server running in?

docker

Is this defect reproducible?

When the NATS server becomes unavailable for a short while, any pull subscription is lost (i.e. it stops receiving messages), whereas push subscribers work as expected.

Reproduce

Run the following script. Reboot the NATS server. Publish messages on the subject testnats.reconnect. Nothing is received.

import asyncio

import nats

SUBJECT = "testnats.reconnect"
CLUSTER_ADDRESS = "nats://localhost:4222"


async def run(loop):
    async def disconnected_cb():
        print("Disconnected!")

    async def reconnected_cb():
        print("Reconnected!")

    async def error_cb(e):
        print("Error:", e)

    nc = await nats.connect(
        servers=[CLUSTER_ADDRESS],
        error_cb=error_cb,
        disconnected_cb=disconnected_cb,
        reconnected_cb=reconnected_cb,
        max_reconnect_attempts=-1,
    )

    js = nc.jetstream()

    async def message_handler(msg):
        subject = msg.subject
        reply = msg.reply
        data = msg.data.decode()
        print(
            "Received a message on '{subject} {reply}': {data}".format(
                subject=subject, reply=reply, data=data
            )
        )
        await msg.ack()


    # await js.subscribe(SUBJECT, cb=message_handler)
    subscription = await js.pull_subscribe(SUBJECT, "durable_name", "TEST_STREAM")
    while True:
        try:
            messages = await subscription.fetch(1, 10000000)  # effectively "wait forever"
            for message in messages:
                await message_handler(message)
        except Exception as e:
            print(e)


if __name__ == "__main__":
    loop = asyncio.new_event_loop()
    loop.create_task(run(loop))
    loop.run_forever()

It works as expected if I replace the pull_subscribe logic with the commented-out js.subscribe() line.
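
For reference, the working push-based variant just replaces the whole fetch loop with that commented-out line (same js and message_handler as above):

# Push-based variant that keeps receiving after the server restarts.
await js.subscribe(SUBJECT, cb=message_handler)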

Given the capability you are leveraging, describe your expectation?

Pull subscribers should reconnect after reboot

Given the expectation, what is the defect you are observing?

Pull subscribers are not resubscribing after the server restarts.

@martinsson martinsson added the defect Suspected defect such as a bug or regression label Oct 9, 2023
@charbonnierg
Contributor

charbonnierg commented Oct 9, 2023

I tried to create a new test under tests/test_js.py:PullSubscribeTest and ran it:

    @async_long_test
    async def test_pull_subscribe_reconnect(self):
        srv = self.server_pool[0]
        nc = NATS()
        await nc.connect()
        js = nc.jetstream()

        await js.add_stream(name="TEST1", subjects=["foo.1", "bar"])

        for i in range(10):
            ack = await js.publish("foo.1", f"Hello from NATS!".encode())
            assert ack.stream == "TEST1"
            assert ack.seq == i + 1

        consumer = await js.pull_subscribe("foo.1", "dur")
        for i in range(10):
            # Test fails if we uncomment the following lines
            # srv.stop()
            # srv.start()
            msg, *rest = await consumer.fetch(1, timeout=10)
            assert msg.data == b"Hello from NATS!"
            assert msg.metadata.stream == "TEST1"
            await msg.ack()

This test passes, but if I uncomment the srv.stop() and srv.start() lines the test does not pass.

@wallyqs does this test correctly describe the auto-reconnect behavior we should expect from nats-py?

Edit

The following test passes successfully (the call to .fetch() is wrapped in a while True loop with a lower timeout):

    @async_long_test
    async def test_pull_subscribe_reconnect(self):
        srv = self.server_pool[0]
        nc = NATS()
        await nc.connect(
            max_reconnect_attempts=-1,
            allow_reconnect=True,
            reconnect_time_wait=1,
        )
        js = nc.jetstream()

        await js.add_stream(name="TEST1", subjects=["foo.1", "bar"])

        for i in range(3):
            ack = await js.publish("foo.1", "Hello from NATS!".encode())
            assert ack.stream == "TEST1"
            assert ack.seq == i + 1
        consumer = await js.pull_subscribe("foo.1", "dur")
        for i in range(3):
            srv.stop()
            srv.start()
            while True:
                try:
                    msg, *rest = await consumer.fetch(1, timeout=0.5)
                except TimeoutError:
                    continue
                break
            assert msg.data == b"Hello from NATS!"
            assert msg.metadata.stream == "TEST1"
            await msg.ack()

It seems that using a big or infinite timeout leads to the failure in the first test: the fetch request is never sent to the server, so the fetch will never succeed regardless of the timeout value.

For now, it seems preferable to use a lower timeout, and retry the fetch operation.
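
Applied to the script from this issue, that workaround could look roughly like this (just a sketch; the 2 s fetch timeout and 1 s backoff are arbitrary values, and it reuses js, SUBJECT and message_handler from the script):

    # Workaround sketch: bound each fetch and retry, instead of one huge timeout.
    from nats.errors import TimeoutError as NatsTimeoutError

    subscription = await js.pull_subscribe(SUBJECT, "durable_name", "TEST_STREAM")
    while True:
        try:
            messages = await subscription.fetch(1, timeout=2)
        except NatsTimeoutError:
            # Either no message was available within 2s, or the fetch request
            # was lost while the client was disconnected; just fetch again.
            continue
        except Exception as e:
            print(e)
            await asyncio.sleep(1)  # small backoff before retrying
            continue
        for message in messages:
            await message_handler(message)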

I'd say that the following lines are "responsible" for this behavior (nats/js/client.py:L828):

            await self._nc.publish(
                self._nms,
                json.dumps(next_req).encode(),
                self._deliver,
            )
            # Wait for the response or raise timeout.
            msg = await self._sub.next_msg(timeout)

This publish operation never fails, whether or not a message is actually sent to NATS, so we end up waiting for the whole timeout (at least that seems to be the case for the code sample provided in this issue).

I'm wondering whether it is normal that the NATS client does not send the publish once it has reconnected, though. Naively, I would expect the message to stay in the pending queue and be sent on reconnect, so that NATS would publish it and the coroutine waiting on the subscription queue (e.g., PullSubscription._sub.next_msg) would still receive the response before the timeout, but that does not seem to be the case. Maybe that's because the flusher loop does not know that the server is disconnected and hits a BrokenPipeError while writing to the transport, so this message is lost 🤨?
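
As a small illustration of that buffering behavior (assuming the nc from the script above, with reconnection enabled, while the server is briefly down):

    # This publish does not raise: the bytes are buffered client-side
    # (bounded by the pending_size connect option), so a "successful"
    # publish says nothing about delivery to the server.
    await nc.publish("testnats.reconnect", b"queued while disconnected")
    print("publish returned, nc.is_connected =", nc.is_connected)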

It may be worth knowing that setting the connect option pending_size=0 prevents the call to publish from silently buffering: the error nats.errors.OutboundBufferLimitError: nats: outbound buffer limit exceeded is raised at L828 instead. This way, the client never waits on a fetch whose request was not published.

This last test illustrates what is happening very well (run it with pytest tests/test_js.py -k test_pull_subscribe_reconnect -s to see the captured stderr in the console):

    @async_long_test
    async def test_pull_subscribe_reconnect(self):
        srv = self.server_pool[0]
        nc = NATS()
        await nc.connect(
            max_reconnect_attempts=-1,
            allow_reconnect=True,
            reconnect_time_wait=1,
            pending_size=0,
        )
        js = nc.jetstream()

        await js.add_stream(name="TEST1", subjects=["foo.1", "bar"])

        for i in range(3):
            ack = await js.publish("foo.1", "Hello from NATS!".encode())
            assert ack.stream == "TEST1"
            assert ack.seq == i + 1
        consumer = await js.pull_subscribe("foo.1", "dur")
        for i in range(3):
            srv.stop()
            srv.start()
            await asyncio.sleep(0)
            while True:
                try:
                    msg, *rest = await consumer.fetch(1, timeout=0.5)
                except TimeoutError:
                    if nc.is_connected:
                        print("TIMEOUT - Retrying immediately", file=sys.stderr)
                    else:
                        print(
                            "TIMEOUT - Retrying in 250ms (currently disconnected)",
                            file=sys.stderr,
                        )
                        await asyncio.sleep(0.250)
                except OutboundBufferLimitError:
                    print("NOT CONNECTED - Retrying in 250ms", file=sys.stderr)
                    await asyncio.sleep(0.250)
                else:
                    print("OK - Got message", file=sys.stderr)
                    break
            assert msg.data == b"Hello from NATS!"
            assert msg.metadata.stream == "TEST1"
            await msg.ack()

Captured Output:

[DEBUG] Server listening on port 4222 started.
[DEBUG] Server listening on 4222 will stop.
[DEBUG] Server listening on 4222 was stopped.
[DEBUG] Server listening on port 4222 started.
TIMEOUT - Retrying in 250ms (currently disconnected)
NOT CONNECTED - Retrying in 250ms
NOT CONNECTED - Retrying in 250ms
OK - Got message
[DEBUG] Server listening on 4222 will stop.
[DEBUG] Server listening on 4222 was stopped.
[DEBUG] Server listening on port 4222 started.
TIMEOUT - Retrying in 250ms (currently disconnected)
NOT CONNECTED - Retrying in 250ms
OK - Got message
[DEBUG] Server listening on 4222 will stop.
[DEBUG] Server listening on 4222 was stopped.
[DEBUG] Server listening on port 4222 started.
TIMEOUT - Retrying in 250ms (currently disconnected)
NOT CONNECTED - Retrying in 250ms
NOT CONNECTED - Retrying in 250ms
OK - Got message
[DEBUG] Server listening on 4222 will stop.
[DEBUG] Server listening on 4222 was stopped.

Second edit

I'm pretty sure that the request for the next message is not published due to an error encountered when flushing (broken pipe). When an error occurs within the flusher task, the pending buffer is lost. This has a much broader impact than just pull subscriptions, but if we change the _flusher method to something like this:

              try:
                  if self._pending_data_size > 0:
-                     self._transport.writelines(self._pending[:])
+                     pending = self._pending[:]
                      self._pending = []
                      self._pending_data_size = 0
+                     self._transport.writelines(pending)
-                     await self._transport.drain()
+                     try:
+                         await self._transport.drain()
+                     except BaseException:
+                         if pending:
+                             self._pending = pending + self._pending
+                             self._pending_data_size += len(pending)
+                         raise
              except OSError as e:
+                 if pending:
+                     self._pending_data_size += len(pending)
+                     self._pending = pending + self._pending

then messages are indeed published on reconnect and the test passes. However, the asyncio.StreamWriter documentation (the transport used in the code samples in this issue) states that:

This is a flow control method that interacts with the underlying IO write buffer. When the size of the buffer reaches the high watermark, drain() blocks until the size of the buffer is drained down to the low watermark and writing can be resumed. When there is nothing to wait for, the drain() returns immediately.

I don't really understand what this means 😅 but I take it as "the caller doesn't know how much of the pending data has been written", so I guess the code above may send messages twice if an error occurred after some data was effectively written but before drain() completed? In the end, it seems that users should always consider the possibility that a publish is never received by the server.

Regarding the specific case of PullSubscription.fetch(), it seems that it's up to users to use small timeouts with retries instead of None, because it will always be possible for a message to be considered published by the client but never received by the server.

On the nats-py side, it may be worth changing the PullSubscription.fetch() signature to not accept None as a timeout value, because it will wait forever if the request failed to be published.
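
As a rough sketch of that suggestion (hypothetical code, not the current nats-py implementation):

    # Hypothetical guard inside PullSubscription.fetch(); nats-py does not
    # currently reject timeout=None.
    async def fetch(self, batch: int = 1, timeout: float = 5.0):
        if timeout is None:
            raise ValueError(
                "fetch() requires a finite timeout: with timeout=None the call "
                "can wait forever if the underlying request is never delivered"
            )
        ...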

@tkeller-moxe

tkeller-moxe commented Dec 11, 2023

I can second this bug; it hit us in production when we last did an update.
We managed to catch it quickly and reboot, but I would expect nats-py to properly handle server reboots!

Some notes:
Latest nats-py version
NATS server 2.9.24
On my end this is possibly affecting pull subscribe as well as request/reply and standard subscriptions.
