Client configured with deflate=true loses connection on Tornado 6 #258

Open
ayamnikovx opened this issue Oct 17, 2021 · 3 comments

@ayamnikovx

Versions

  • nsqd v1.2.1 (built w/go1.16.7)
  • Python 3.9.7
  • pynsq 0.9.0
  • tornado 6.1

Issue

Reader repeatedly loses its connection to nsqd on large payloads when running on tornado 6 with the deflate option enabled. The same happens to Writer, but much less often. I believe there is a bug somewhere in DeflateSocket.
The issue is not reproducible with tornado 5 or with deflate disabled.

Steps to reproduce

Running nsqd:

docker run --name lookupd -d -p 4160:4160 -p 4161:4161 nsqio/nsq /nsqlookupd
docker run --name nsqd -p 4150:4150 -p 4151:4151 nsqio/nsq /nsqd --broadcast-address=172.17.0.2 --lookupd-tcp-address=172.17.0.2:4160

Preparing virtual environment:

virtualenv -p python3.9 venv
. venv/bin/activate
pip install pynsq==0.9.0 tornado==6.1.0

Running the program (you may need to restart the script a few times to hit the issue):

python main.py
# Output:
Received 1 messages
ERROR:nsq.client:[127.0.0.1:4150:test:test] ERROR: ConnectionClosedError('Stream is closed')
WARNING:nsq.reader:[127.0.0.1:4150:test:test] connection closed

main.py:

from nsq import Reader, Writer
import tornado.ioloop
from tornado import gen
import json


# crafting some non-trivial payload
DATA = json.dumps({str(x): x for x in range(10000)}).encode()
print(f'DATA size: {len(DATA)}')
NSQD_HOST = '127.0.0.1'
NSQD_PORT = 4150
USE_DEFLATE = True


@gen.coroutine
def producer():
    writer = Writer(
        nsqd_tcp_addresses=[f'{NSQD_HOST}:{NSQD_PORT}'],
        deflate=USE_DEFLATE,
    )

    # waiting for connection
    while not writer.conns:
        yield gen.sleep(.1)

    # filling the queue
    for _ in range(10):
        writer.pub('test', DATA)

    cnt = 0
    while True:
        writer.pub('test', DATA)
        cnt += 1
        print(f'Sent {cnt} messages')
        yield gen.sleep(1)


@gen.coroutine
def consumer():
    # waiting for the queue to fill
    yield gen.sleep(1)

    cnt = 0

    def handler(msg):
        # declare the enclosing counter before touching anything else
        nonlocal cnt
        msg.finish()
        cnt += 1
        print(f'Received {cnt} messages')

    _ = Reader(
        topic='test',
        channel='test',
        message_handler=handler,
        nsqd_tcp_addresses=[f'{NSQD_HOST}:{NSQD_PORT}'],
        deflate=USE_DEFLATE,
    )

    while True:
        yield gen.sleep(1)


if __name__ == '__main__':
    loop = tornado.ioloop.IOLoop.current()
    loop.add_callback(producer)
    loop.add_callback(consumer)
    try:
        loop.start()
    except KeyboardInterrupt:
        pass
@ploxiln ploxiln added the bug label Oct 19, 2021
@mreiferson
Member

I can confirm that this does indeed fail as described.

@ploxiln
Member

ploxiln commented Oct 24, 2021

This may be because the DeflateSocket recv_into() method is a bit hacky: it may stash additional data for the next read while the raw underlying socket has already been fully read, so the socket does not indicate readiness to the ioloop.

https://github.com/nsqio/pynsq/pull/232/files#diff-f72976af2d2f01c6df1f82088ee1ef75b85384a2da757593fd505bdc25306758R27-R36
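
To illustrate the suspected mechanism, here is a minimal sketch. It is not pynsq's code: StashingDeflateSocket and demo() are hypothetical names made up for this example, and the wrapper only imitates the "stash leftover decompressed bytes" behaviour described above. It sends one compressed payload over a local socket pair, reads a small slice through the wrapper, and then asks select() whether the raw socket is readable.

```python
import select
import socket
import zlib


class StashingDeflateSocket:
    """Hypothetical stand-in for a deflate-wrapping socket (NOT pynsq's class)."""

    def __init__(self, sock):
        self._sock = sock
        # raw deflate stream, no zlib header
        self._decompressor = zlib.decompressobj(-zlib.MAX_WBITS)
        self._stash = b''  # decompressed bytes not yet handed to the caller

    def fileno(self):
        # an event loop would poll this descriptor, i.e. the RAW socket
        return self._sock.fileno()

    def pending(self):
        return len(self._stash)

    def recv(self, size):
        if not self._stash:
            chunk = self._sock.recv(65536)
            self._stash = self._decompressor.decompress(chunk)
        # hand out at most `size` bytes, keep the rest for the next call
        data, self._stash = self._stash[:size], self._stash[size:]
        return data


def demo():
    a, b = socket.socketpair()
    try:
        payload = b'x' * 100_000  # compresses to a very small wire size
        compressor = zlib.compressobj(wbits=-zlib.MAX_WBITS)
        wire = compressor.compress(payload) + compressor.flush(zlib.Z_SYNC_FLUSH)
        a.sendall(wire)

        reader = StashingDeflateSocket(b)
        first = reader.recv(4096)  # one read drains the raw socket entirely

        # readiness is judged on the raw socket, which is now empty
        readable, _, _ = select.select([b], [], [], 0.1)
        return len(wire), len(first), reader.pending(), bool(readable)
    finally:
        a.close()
        b.close()


if __name__ == '__main__':
    wire_len, first_len, pending, raw_readable = demo()
    print(f'wire bytes: {wire_len}, returned: {first_len}, '
          f'stashed: {pending}, raw socket readable: {raw_readable}')
```

In this sketch the raw socket reports no readiness while the wrapper still holds stashed bytes, so a loop that waits for a read event before calling recv() again would never see the rest of the message. Whether this is exactly what happens inside pynsq with tornado 6 is an assumption based on the comment above, not something this example proves.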

@gospider007

The problem remains
