
google-cloud-pubsub SDK message pull is not working with SOCKS5 proxy #943

Open
rkumarkhaniya-crest opened this issue Jun 30, 2023 · 3 comments
Labels
api: pubsub Issues related to the googleapis/python-pubsub API. type: feature request ‘Nice-to-have’ improvement, new feature or different behavior or design.

Comments

@rkumarkhaniya-crest

We are building a Pub/Sub integration that pulls messages through a proxy using the Pub/Sub SDK. It works with an HTTP proxy (HTTP_PROXY), but it does not work with a SOCKS5 proxy when the VM has no direct internet connectivity.

Environment details

  • OS type and version: Ubuntu 20.04.6 LTS (Focal Fossa)
  • Python version: 3.8
  • pip version: 3.8
  • google-cloud-pubsub version: 2.17.1

Steps to reproduce

Run the code example below. It fails when the VM's direct internet access is turned off; the same script works when an HTTP proxy is configured via HTTP_PROXY.

  1. Create a virtualenv
  2. pip install google-cloud-pubsub
  3. Turn off direct internet access on the VM
  4. Execute the script

Code example

import os

from google.oauth2 import service_account
from google.api_core import retry
from google.cloud import pubsub_v1

NO_PROXY = "localhost,127.0.0.1,0.0.0.0,localaddress"

# TODO: create a creds.json file in the same directory as this script,
# containing the Google service account credentials JSON.
proxy_uri = "socks5h://<host/ip>:<port>"
# proxy_uri = "http://<host_ip>:<port>"
project_id = "<project_name>"
subscription_id = "<pubsub_subscription_name>"



os.environ["no_proxy"] = NO_PROXY
os.environ["NO_PROXY"] = NO_PROXY
os.environ["http_proxy"] = proxy_uri
os.environ["HTTP_PROXY"] = proxy_uri 
os.environ["https_proxy"] = proxy_uri 
os.environ["HTTPS_PROXY"] = proxy_uri 

def get_credentials():
    dir_name = os.path.dirname(os.path.abspath(__file__))
    creds_path = f"{dir_name}/creds.json"        
    credentials = service_account.Credentials.from_service_account_file(creds_path)
    return credentials

def synchronous_pull(project_id, subscription_id):
    subscriber = pubsub_v1.SubscriberClient(credentials=get_credentials())
    subscription_path = subscriber.subscription_path(project_id, subscription_id)
        
    NUM_MESSAGES = 10
    
    with subscriber:
        while True:
            print("Started receiving message...")
            response = subscriber.pull(
                request={"subscription": subscription_path, "max_messages": NUM_MESSAGES},
                retry=retry.Retry(deadline=300),
            )

            if len(response.received_messages) == 0:
                print("No messages to process.")
                break

            ack_ids = []
            for received_message in response.received_messages:
                print(f"Message: {received_message.message}")
                ack_ids.append(received_message.ack_id)

            subscriber.acknowledge(
                request={"subscription": subscription_path, "ack_ids": ack_ids}
            )

            print(
                f"Received and acknowledged {len(response.received_messages)} messages from {subscription_path}."
            )

if __name__ == "__main__":
    synchronous_pull(project_id, subscription_id)

Stack trace

E0630 08:03:56.145507323 2560525 http_proxy.cc:119]                    'socks5h' scheme not supported in proxy URI

We could not find any documentation on which proxy types this SDK supports.
Thanks!
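The log line above comes from gRPC's HTTP proxy handling (http_proxy.cc), which rejects the socks5h scheme, so the proxy setting apparently never takes effect. A minimal sketch of a hypothetical helper, set_grpc_proxy_env, that fails fast instead of silently exporting a proxy URI gRPC cannot use. It assumes, based only on that error message, that gRPC accepts http:// proxy URIs and nothing else; the set of accepted schemes is an assumption, not documented behavior.

```python
import os
from urllib.parse import urlsplit

# Assumption: gRPC's proxy support only accepts the "http" scheme, as the
# "'socks5h' scheme not supported in proxy URI" log line suggests.
GRPC_SUPPORTED_PROXY_SCHEMES = {"http"}


def set_grpc_proxy_env(proxy_uri: str, no_proxy: str = "localhost,127.0.0.1") -> None:
    """Hypothetical helper: export proxy env vars only if gRPC can use them."""
    scheme = urlsplit(proxy_uri).scheme.lower()
    if scheme not in GRPC_SUPPORTED_PROXY_SCHEMES:
        # Refuse early rather than letting gRPC log an error and skip the proxy.
        raise ValueError(
            f"gRPC cannot use a {scheme!r} proxy URI; use an HTTP CONNECT proxy instead"
        )
    # Same variable names the original script sets.
    for name in ("http_proxy", "HTTP_PROXY", "https_proxy", "HTTPS_PROXY"):
        os.environ[name] = proxy_uri
    for name in ("no_proxy", "NO_PROXY"):
        os.environ[name] = no_proxy
```

With the socks5h:// URI from the script above, the helper raises ValueError before the client is created; with an http:// URI it sets the same environment variables as before. It does not add SOCKS5 support, it only makes the unsupported case visible.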

@product-auto-label product-auto-label bot added the api: pubsub Issues related to the googleapis/python-pubsub API. label Jun 30, 2023
@acocuzzo acocuzzo added type: bug Error or flaw in code with unintended results or allowing sub-optimal usage patterns. type: question Request for information or clarification. Not an issue. and removed type: bug Error or flaw in code with unintended results or allowing sub-optimal usage patterns. labels Jul 6, 2023
@acocuzzo
Contributor

acocuzzo commented Jul 7, 2023

Can you try passing an httplib2.Http object with the proxy_info parameter, as described in this similar issue? googleapis/google-api-python-client#569 (comment)

@acocuzzo acocuzzo assigned acocuzzo and liuyunnnn and unassigned acocuzzo Jul 7, 2023
@rkumarkhaniya-crest
Author

rkumarkhaniya-crest commented Jul 10, 2023

@acocuzzo I tried the following script, but it does not work:

  • The Credentials object returned by "service_account.Credentials.from_service_account_file(creds_path)" has no authorize method.
    Error: AttributeError: 'Credentials' object has no attribute 'authorize'

  • "pubsub_v1.SubscriberClient" does not accept an http argument. Error: TypeError: __init__() got an unexpected keyword argument 'http'

import os
import httplib2

from google.oauth2 import service_account
from google.api_core import retry
from google.cloud import pubsub_v1

proxy_host = 'proxy-ip-host'
proxy_port = 'proxy-port'
proxy_user = None
proxy_pass = None

project_id = "<project_name>"
subscription_id = "<pubsub_subscription_name>"

def get_credentials():
    dir_name = os.path.dirname(os.path.abspath(__file__))
    creds_path = f"{dir_name}/creds.json"        
    credentials = service_account.Credentials.from_service_account_file(creds_path)
    return credentials

def synchronous_pull(project_id, subscription_id):
    proxy_info = httplib2.ProxyInfo(
        proxy_type=httplib2.socks.PROXY_TYPE_SOCKS5,
        proxy_host=proxy_host,
        proxy_port=proxy_port,
        proxy_user=proxy_user,
        proxy_pass=proxy_pass
    )    
    http_client = httplib2.Http(proxy_info=proxy_info)
    
    credentials = get_credentials()
    http_client = credentials.authorize(http_client)

    subscriber = pubsub_v1.SubscriberClient(credentials=credentials, http=http_client)
    subscription_path = subscriber.subscription_path(project_id, subscription_id)
        
    NUM_MESSAGES = 10
    
    with subscriber:
        while True:
            print("Started receiving message...")
            response = subscriber.pull(
                request={"subscription": subscription_path, "max_messages": NUM_MESSAGES},
                retry=retry.Retry(deadline=300),
            )

            if len(response.received_messages) == 0:
                print("No messages to process.")
                break

            ack_ids = []
            for received_message in response.received_messages:
                print(f"Message: {received_message.message}")
                ack_ids.append(received_message.ack_id)

            subscriber.acknowledge(
                request={"subscription": subscription_path, "ack_ids": ack_ids}
            )

            print(
                f"Received and acknowledged {len(response.received_messages)} messages from {subscription_path}."
            )

if __name__ == "__main__":
    synchronous_pull(project_id, subscription_id)

@acocuzzo
Contributor

acocuzzo commented Jul 13, 2023

If you are trying the method used in googleapis/google-api-python-client#569 (comment), I believe the "authorize" method requires the oauth2client.service_account package:
https://oauth2client.readthedocs.io/en/latest/source/oauth2client.service_account.html

However, I don't think that credentials object can be passed to the normal SubscriberClient (imported via "from google.cloud import pubsub_v1"). I'm going to convert this issue to a feature request (FR) to allow passing an http_proxy argument to this client.

As a mitigation, one option is to use the underlying auto-generated client, which can be imported via "from google import pubsub_v1" and used like so:

from google import pubsub_v1

def sample_pull():
    # Create a client
    client = pubsub_v1.SubscriberClient()

    # Initialize request argument(s)
    request = pubsub_v1.PullRequest(
        subscription="subscription_value",
        max_messages=1277,
    )

    # Make the request
    response = client.pull(request=request)

    # Handle the response
    print(response)

Specifically, you can pass this SubscriberClient a "transport" argument of type Optional[Union[str, SubscriberTransport]].

The transport parameter can be set to a SubscriberGrpcTransport, which takes a "channel" parameter of type grpc.Channel:

    transports.SubscriberGrpcTransport(
            channel: Optional[grpc.Channel])

Please note that the "credentials" parameter is ignored when the "channel" argument is present.

This grpc.Channel object can be constructed with the SubscriberGrpcTransport.create_channel method, which accepts a "credentials" argument as well as the keyword argument "options".

"options" is an optional list of key-value pairs (channel arguments in the gRPC Core runtime) used to configure the channel.

The "grpc.http_proxy" key can be included in this list of options; all options are described here:
https://github.com/grpc/grpc/blob/v1.56.x/include/grpc/impl/grpc_types.h#L444C46-L444C46
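Putting the steps above together, a minimal sketch, assuming an HTTP CONNECT proxy at a placeholder address and a service account key file such as creds.json. Since gRPC rejected the socks5h scheme in the original report, this likely helps only with an HTTP proxy, not a pure SOCKS5 one. proxy_channel_options, make_proxied_subscriber, PROXY_URI, and PUBSUB_TARGET are hypothetical names; the google-cloud-pubsub imports are deferred so the options helper can be used on its own.

```python
from typing import List, Tuple

# Hypothetical placeholders; "grpc.http_proxy" is assumed to expect an
# http:// proxy URI, not socks5:// or socks5h://.
PROXY_URI = "http://proxy.example.com:3128"
PUBSUB_TARGET = "pubsub.googleapis.com:443"


def proxy_channel_options(proxy_uri: str) -> List[Tuple[str, str]]:
    """Build the gRPC channel arguments that route traffic through a proxy."""
    if not proxy_uri.startswith("http://"):
        raise ValueError("grpc.http_proxy is assumed to need an http:// proxy URI")
    return [("grpc.http_proxy", proxy_uri)]


def make_proxied_subscriber(creds_path: str, proxy_uri: str = PROXY_URI):
    """Create the auto-generated SubscriberClient over a proxied gRPC channel."""
    # Deferred imports: these come from google-cloud-pubsub and google-auth.
    from google import pubsub_v1
    from google.oauth2 import service_account
    from google.pubsub_v1.services.subscriber.transports import SubscriberGrpcTransport

    credentials = service_account.Credentials.from_service_account_file(creds_path)
    # The channel carries the credentials, because the transport ignores its
    # own "credentials" argument when a channel is supplied.
    channel = SubscriberGrpcTransport.create_channel(
        PUBSUB_TARGET,
        credentials=credentials,
        options=proxy_channel_options(proxy_uri),
    )
    transport = SubscriberGrpcTransport(channel=channel)
    return pubsub_v1.SubscriberClient(transport=transport)
```

The returned client should be usable in place of subscriber in the pull/acknowledge loop from the original script, since the auto-generated pull and acknowledge methods also accept request dicts. Passing the proxy as a channel argument, rather than through environment variables, keeps the setting scoped to this one channel.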

@acocuzzo acocuzzo added type: feature request ‘Nice-to-have’ improvement, new feature or different behavior or design. and removed type: question Request for information or clarification. Not an issue. labels Jul 13, 2023
@liuyunnnn liuyunnnn assigned pradn and unassigned liuyunnnn Oct 24, 2023
Projects
None yet
Development

No branches or pull requests

5 participants