Pub/Sub 0.32.0 | Exception "google.api_core.exceptions.Unknown: None Stream removed" #4989

Closed
PicardParis opened this issue Mar 5, 2018 · 14 comments
Assignees
Labels
api: pubsub Issues related to the Pub/Sub API. priority: p1 Important issue which blocks shipping the next release. Will be fixed prior to next release. type: bug Error or flaw in code with unintended results or allowing sub-optimal usage patterns.
Projects

Comments

@PicardParis

  • Context

    google-cloud-pubsub (0.32.0)
    Python 2.7.13
    Win10
  • Subscriber console | Fails w/ exception after a random time (from 15 min to 3 h)

    > python pubsub_subscribe.py
    Subscribed to messages on projects/PROJECT_ID/subscriptions/TOPIC
    Waiting for future...
    Messages received: 0 + 300
    Messages received: 300 + 3000
    Traceback (most recent call last):
      File ".\pubsub_subscribe.py", line 58, in <module>
        subscription.future.result()
      File "C:\Python27\lib\site-packages\google\cloud\pubsub_v1\futures.py", line 114, in result
        raise err
    google.api_core.exceptions.Unknown: None Stream removed
  • Publisher console | Completes as expected

    > python pubsub_publish.py
    Messages sent: 0 + 300
    Messages sent: 300 + 3000
    Messages sent: 3300 + 30000
    Messages sent: 33300 + 30
    Messages sent: 33330 + 5
    Messages sent: 33335 + 3
    Messages sent: 33338 + 2
    Messages sent: 33340 + 30000
    Messages sent: 63340 + 3000
    Messages sent: 66340 + 300
    Total messages sent: 66640
  • Initialization

    • GCP project GOOGLE_CLOUD_PROJECT
    • Pub/Sub API enabled
    • Existing topic PUBSUB_TOPIC
    • Environment variables GOOGLE_CLOUD_PROJECT & PUBSUB_TOPIC
  • Subscriber sample code <pubsub_subscribe.py>

    import os
    from socket import gethostname
    from time import sleep
    from google.cloud import pubsub
    from google.api_core.exceptions import AlreadyExists
    
    PROJECT_ID = os.getenv('GOOGLE_CLOUD_PROJECT')
    TOPIC = os.getenv('PUBSUB_TOPIC')
    
    def on_pubsub_message(msg):
        # These cases are not handled:
        # - A message may be received more than once
        # - A message may be received out of order
        cmd = msg.data
        msg.ack()
        # print('.. msg: %s' % cmd)
    
        global msg_received_total
        global msg_received_loop
        global subscription
    
        if cmd == b'stop':  # msg.data is a byte string
            print("Received 'stop'")
            msg_received_total += msg_received_loop
            subscription.future.set_result(True)
            return
    
        msg_received_loop += 1
        if cmd == b'loop_end':  # msg.data is a byte string
            print('Messages received: %d + %d' % (
                msg_received_total, msg_received_loop))
            msg_received_total += msg_received_loop
            msg_received_loop = 0
    
    msg_received_total = 0
    msg_received_loop = 0
    
    subscriber = pubsub.SubscriberClient()
    topic_path = subscriber.topic_path(PROJECT_ID, TOPIC)
    subsc_path = subscriber.subscription_path(PROJECT_ID, gethostname())
    
    try:
        subscriber.create_subscription(subsc_path, topic_path)
    except AlreadyExists:
        pass
    
    subscription = subscriber.subscribe(subsc_path, on_pubsub_message)
    print('Subscribed to messages on %s' % subsc_path)
    
    print('Waiting for future...')
    subscription.future.result()
    
    print('Total messages received: %d' % msg_received_total)
  • Publisher sample code <pubsub_publish.py>

    import os
    from time import sleep
    from google.cloud import pubsub
    
    PROJECT_ID = os.getenv('GOOGLE_CLOUD_PROJECT')
    TOPIC = os.getenv('PUBSUB_TOPIC')
    
    PARAMS = [
        # (number_of_messages, seconds_after_each_message)
        (300, 1),  # 5 min
        (3000, 0.1),  # 5 min
        (30000, 0.01),  # 5 min
        (30, 10),  # 5 min
        (5, 60),  # 5 min
        (3, 600),  # 30 min
        (2, 3600),  # 120 min
        (30000, 0.01),  # 5 min
        (3000, 0.1),  # 5 min
        (300, 1),  # 5 min
    ]
    
    publisher = pubsub.PublisherClient()
    topic_path = publisher.topic_path(PROJECT_ID, TOPIC)
    
    msg_sent = 0
    for messages, delay in PARAMS:
        for i in xrange(1, messages):
            publisher.publish(topic_path, 'msg#{}'.format(i).encode('utf-8'))
            sleep(delay)
        # Keep last message to send a 'loop_end' command
        sleep(1)
        publisher.publish(topic_path, b'loop_end')
        sleep(1)
        print('Messages sent: %d + %d' % (msg_sent, messages))
        msg_sent += messages
    
    # publisher.publish(topic_path, b'stop')
    print('Total messages sent: %d' % msg_sent)
@theacodes
Contributor

@PicardParis is this on Raspberry Pi?

@theacodes theacodes self-assigned this Mar 5, 2018
@theacodes theacodes added type: bug Error or flaw in code with unintended results or allowing sub-optimal usage patterns. api: pubsub Issues related to the Pub/Sub API. priority: p1 Important issue which blocks shipping the next release. Will be fixed prior to next release. labels Mar 5, 2018
@PicardParis
Author

So far, with version 0.32.0, I have only reproduced this exact issue on Win10, but I previously had the same issues on both environments. I'm currently updating a Raspberry Pi and will run the same subscriber code...

@jeanbza
Member

jeanbza commented Mar 6, 2018

Hi @PicardParis. Could you provide your pip list?

@PicardParis
Author

I haven't been getting consistent results between Windows & Raspbian (didn't get the exception so far) and didn't keep the original environments. Here is the log extract I kept on Windows:

> pip install --upgrade google-cloud-pubsub
...
Successfully installed google-cloud-pubsub-0.32.0 psutil-5.4.3

Right now, on the Pi, if I query outdated modules, I get this

> pip list -o --format=columns
Package         Version Latest Type
--------------- ------- ------ -----
google-api-core 0.1.4   1.0.0  wheel

Might it be the issue?

I'm changing the sample code to detect potentially missed msgs and will run new tests.

@jeanbza
Member

jeanbza commented Mar 7, 2018

Could you post the version of grpc you're using here? Thanks for helping debug!
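For readers gathering the same information, here is a minimal sketch that reports the installed versions of the packages named in this thread. It assumes Python 3.8+ (for `importlib.metadata`), whereas the original report used Python 2.7, where `pip list` is the practical route. The helper name `installed_versions` and the `PACKAGES` tuple are illustrative, not part of any Google library.

```python
from importlib import metadata

# Package names taken from this thread; adjust the tuple as needed.
PACKAGES = (
    "google-cloud-pubsub",
    "google-api-core",
    "grpcio",
    "grpc-google-iam-v1",
)


def installed_versions(names=PACKAGES):
    """Return {package: version string, or None if not installed}."""
    versions = {}
    for name in names:
        try:
            versions[name] = metadata.version(name)
        except metadata.PackageNotFoundError:
            versions[name] = None
    return versions


if __name__ == "__main__":
    for name, version in installed_versions().items():
        print("%s (%s)" % (name, version or "not installed"))
```

Pasting the printed lines into the issue gives maintainers the same details as the `pip list` output requested above.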

@jeanbza
Member

jeanbza commented Mar 7, 2018

Also just to confirm, you're getting errors on the raspberry pi but not on windows?

@chemelnucfin chemelnucfin added this to To Do in P1 Bugs Mar 7, 2018
@PicardParis
Author

grpc info

grpc-google-iam-v1 (0.11.4)
grpcio (1.10.0)

Tests so far

| Date | Publisher | Subscriber | Subscriber Power | Subscriber Network | Result | Repro# |
|---|---|---|---|---|---|---|
| 2018-03-05 | Win10 | =Publisher | AC | Ethernet | google.api_core.exceptions | x2 |
| 2018-03-07 | Win10 | Raspbian | AC | Wi-Fi | no exception but doubt on # msgs received | x1 |
| 2018-03-07 | Win10 | Raspbian | AC | Ethernet | no exception but doubt on # msgs received | x1 |
| 2018-03-07 | Win10 | =Publisher | Battery | Wi-Fi | OK | x1 |

Tests are not consistent. No conclusion for the time being.

@jeanbza
Member

jeanbza commented Mar 7, 2018

Thanks. I'll see if I can snag a Windows laptop and repro. Let me know if you find out anything else, and again, thanks for helping debug.

@PicardParis
Author

PicardParis commented Mar 7, 2018

Update

  • Could repro the issue with a cleaner/shorter test code (see below)

  • Issue reproduced randomly (some tests OK, others fail) with

    • subscriber on Windows
    • publisher on Windows laptop, or
    • publisher on Raspberry too
  • Issue not reproduced with subscriber on Raspberry

  • Issue might be related to the network, will try on different network

  • Subscriber sample code <pubsub_subscribe.py>

    import os
    import re
    import threading
    from socket import gethostname
    from time import sleep
    from google.cloud import pubsub
    from google.api_core.exceptions import AlreadyExists
    
    PROJECT_ID = os.getenv('GOOGLE_CLOUD_PROJECT')
    TOPIC = os.getenv('PUBSUB_TOPIC')
    MESSAGE_MAX = 20000
    
    def on_pubsub_message(msg):
        # Messages may be received (1) more than once, (2) out of order
        cmd = msg.data
        msg.ack()
    
        with thread_lock:
            global messages
            global subscription
    
            match = re.match(b'^(msg)([0-9]+)', cmd)
            if match:
                message_index = int(match.group(2))
                messages[message_index] += 1
                return
            match = re.match(b'^(status)([0-9]+)', cmd)
            if match:
                message_count = int(match.group(2))
                print_status(message_count)
                return
            if cmd == b'stop':  # msg.data is a byte string
                print('Stopping...')
                subscription.future.set_result(True)
                return
    
    def print_status(message_count):
        global messages
        stats = [0, 0, 0]
        for i in xrange(message_count):
            cnt = messages[i]
            stats[min(cnt, 2)] += 1
        print('Stats: Rcvd1[%d] Rcvd0[%d] RcvdN[%d]' %
              (stats[1], stats[0], stats[2]))
    
    subscriber = pubsub.SubscriberClient()
    topic_path = subscriber.topic_path(PROJECT_ID, TOPIC)
    subsc_path = subscriber.subscription_path(PROJECT_ID, gethostname())
    try:
        subscriber.create_subscription(subsc_path, topic_path)
    except AlreadyExists:
        pass
    
    thread_lock = threading.Lock()
    messages = bytearray(MESSAGE_MAX)
    subscription = subscriber.subscribe(subsc_path, on_pubsub_message)
    print('Subscribed to messages on %s' % subsc_path)
    
    subscription.future.result()
    sleep(1)
  • Subscriber succeeding

    > python pubsub_subscribe.py
    Subscribed to messages on projects/...
    Stats: Rcvd1[100] Rcvd0[0] RcvdN[0]
    Stats: Rcvd1[1100] Rcvd0[0] RcvdN[0]
    Stats: Rcvd1[10100] Rcvd0[0] RcvdN[0]
    Stats: Rcvd1[10150] Rcvd0[0] RcvdN[0]
    Stats: Rcvd1[10550] Rcvd0[0] RcvdN[0]
    Stats: Rcvd1[15000] Rcvd0[0] RcvdN[0]
    Stopping...
  • Subscriber failing

    > python pubsub_subscribe.py
    Subscribed to messages on projects/...
    Stats: Rcvd1[100] Rcvd0[0] RcvdN[0]
    Stats: Rcvd1[1100] Rcvd0[0] RcvdN[0]
    Stats: Rcvd1[10100] Rcvd0[0] RcvdN[0]
    Traceback (most recent call last):
      File "pubsub_subscribe.py", line 68, in <module>
        subscription.future.result()
      File "C:\Python27\lib\site-packages\google\cloud\pubsub_v1\futures.py", line 114, in result
        raise err
    google.api_core.exceptions.Unknown: None Stream removed
  • Publisher sample code <pubsub_publish.py>

    import os
    from time import sleep
    from google.cloud import pubsub
    
    PROJECT_ID = os.getenv('GOOGLE_CLOUD_PROJECT')
    TOPIC = os.getenv('PUBSUB_TOPIC')
    PARAMS = [
        # (number_of_messages, seconds_after_each_message)
        (100, 1),  # 1.7 min
        (1000, 0.1),  # 1.7 min
        (9000, 0.01),  # 1.5 min
        (50, 10),  # 8.3 min
        (400, 1),  # 6.7 min
        (4450, 0.1),  # 7.4 min
        # Total (15000, 27.3 min)
    ]
    
    publisher = pubsub.PublisherClient()
    topic_path = publisher.topic_path(PROJECT_ID, TOPIC)
    
    msg_idx = 0
    for messages, delay in PARAMS:
        for _ in xrange(messages):
            publisher.publish(topic_path, 'msg{}'.format(msg_idx).encode())
            msg_idx += 1
            sleep(delay)
        sleep(1)
        print('Messages sent: %d' % (msg_idx))
        publisher.publish(topic_path, 'status{}'.format(msg_idx).encode())
        sleep(1)
    
    sleep(1)
    publisher.publish(topic_path, b'stop')
  • Publisher log

    > python pubsub_publish.py
    Messages sent: 100
    Messages sent: 1100
    Messages sent: 10100
    Messages sent: 10150
    Messages sent: 10550
    Messages sent: 15000

@jeanbza
Member

jeanbza commented Mar 9, 2018

Hi @PicardParis. Thanks for this repro, much appreciated. I wasn't able to get a Windows laptop yesterday, and am out of the office today. However, we have two Raspberry Pis on rush order and should have a Windows laptop on Monday, so hopefully I can start digging into this soon. Sorry for the delay; I hope we can figure out what's going on soon!

@jeanbza
Member

jeanbza commented Mar 9, 2018

In the meantime, @PicardParis, if you could try running your app with GRPC_VERBOSITY=debug and GRPC_TRACE=all, I'd really love to see a log of what's happening (gist.github.com or whatever works).
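One way to apply those two settings without touching the shell is to set them at the very top of the script. This is a minimal sketch, assuming the variables need to be in place before the gRPC library is first imported; the helper name `enable_grpc_tracing` is illustrative only.

```python
import os


def enable_grpc_tracing(env):
    """Set the two gRPC debug variables mentioned above in the given mapping."""
    env["GRPC_VERBOSITY"] = "debug"
    env["GRPC_TRACE"] = "all"
    return env


# Assumption: do this before anything imports grpc, so the settings are
# already present when the library initializes.
enable_grpc_tracing(os.environ)

# Only now import the client, for example:
# from google.cloud import pubsub
```

Setting the variables in the console before launching `python pubsub_subscribe.py` has the same effect and avoids editing the script. Expect very verbose output on stderr, so redirecting it to a file is advisable.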

@PicardParis
Author

The error could be reproduced with traces. See gist.

@theacodes
Contributor

Okay, after some discussion, the proper thing for us to do here is to retry on UNKNOWN status.
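To illustrate the general idea of retrying on an UNKNOWN status, here is a self-contained sketch. It is not the library's actual fix. The `Unknown` class below is a local stand-in for `google.api_core.exceptions.Unknown` so the example runs without any Google package installed, and `run_with_retry`, `open_stream`, and the backoff parameters are hypothetical names chosen for this example.

```python
import time


class Unknown(Exception):
    """Local stand-in for google.api_core.exceptions.Unknown (assumption)."""


def run_with_retry(open_stream, max_attempts=5, base_delay=0.1,
                   sleep=time.sleep):
    """Call open_stream(); if it raises Unknown, wait and call it again.

    The delay doubles after each failed attempt. The last failure is
    re-raised once max_attempts has been reached.
    """
    for attempt in range(1, max_attempts + 1):
        try:
            return open_stream()
        except Unknown:
            if attempt == max_attempts:
                raise
            sleep(base_delay * 2 ** (attempt - 1))
```

In the subscriber script from this thread, `open_stream` could be a small function that calls `subscriber.subscribe(...)` and then blocks on `subscription.future.result()`, so that a "Stream removed" error reopens the subscription instead of ending the process. Any other exception type still propagates immediately.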

@patrickbeekman

patrickbeekman commented Apr 21, 2020

I was seeing the same error messages, but they were caused by a different bug in the latest google-api-core==1.17.0 (see googleapis/python-api-core#25). Downgrading to 1.16.0 resolved the issue for me. I hope this helps people who land here in the future.

Projects
No open projects
P1 Bugs
  
Done

4 participants