Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: work around segfault with >100 jobs in google life sciences backend #1451

Merged
merged 7 commits into from Mar 4, 2022
Merged
19 changes: 12 additions & 7 deletions snakemake/executors/google_lifesciences.py
Expand Up @@ -22,7 +22,8 @@
from snakemake.executors import ClusterExecutor, sleep
from snakemake.common import get_container_image, get_file_hash
from snakemake.resources import DefaultResources

import httplib2
import google_auth_httplib2
cademirch marked this conversation as resolved.
Show resolved Hide resolved
# https://github.com/googleapis/google-api-python-client/issues/299#issuecomment-343255309
logging.getLogger("googleapiclient.discovery_cache").setLevel(logging.ERROR)

Expand Down Expand Up @@ -148,6 +149,7 @@ def _get_services(self):
# Credentials must be exported to environment
try:
creds = GoogleCredentials.get_application_default()
self.creds = creds
except ApplicationDefaultCredentialsError as ex:
log_verbose_traceback(ex)
raise ex
Expand Down Expand Up @@ -885,7 +887,7 @@ def _job_was_successful(self, status):

return success

def _retry_request(self, request, timeout=2, attempts=3):
def _retry_request(self, request, http=None, timeout=2, attempts=3):
"""The Google Python API client frequently has BrokenPipe errors. This
function takes a request, and executes it up to number of retry,
each time with a 2* increase in timeout.
Expand All @@ -897,24 +899,27 @@ def _retry_request(self, request, timeout=2, attempts=3):
attempts: remaining attempts, throw error when hit 0
"""
import googleapiclient

import google.auth
credentials, project_id = google.auth.default(scopes=["https://www.googleapis.com/auth/cloud-platform"])
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@vsoch do we authenticate this way in the other places as well? Or can there maybe be an auth object for the entire life science executor object instead creating a special one here in the retry?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We go about it different ways, but I suspect under the hood since both are using the default application credentials (exported to the environment as GOOGLE_APPLICATION_CREDENTIALS) the methods are similar. We use the discovery API clients to authenticate clients that are attached to the entire class, e.g., here

def _get_services(self):
"""use the Google Discovery Build to generate API clients
for Life Sciences, and use the google storage python client
for storage.
"""
from googleapiclient.discovery import build as discovery_build
from oauth2client.client import (
GoogleCredentials,
ApplicationDefaultCredentialsError,
)
from google.cloud import storage
# Credentials must be exported to environment
try:
creds = GoogleCredentials.get_application_default()
except ApplicationDefaultCredentialsError as ex:
log_verbose_traceback(ex)
raise ex
# Discovery clients for Google Cloud Storage and Life Sciences API
self._storage_cli = discovery_build(
"storage", "v1", credentials=creds, cache_discovery=False
)
self._compute_cli = discovery_build(
"compute", "v1", credentials=creds, cache_discovery=False
)
self._api = discovery_build(
"lifesciences", "v2beta", credentials=creds, cache_discovery=False
)
self._bucket_service = storage.Client()
. And if you look into the source code, the googleapiclient discovery has that same "google.auth.default" function here which will call that same auth function via here. Notably build takes an http object here and given that we don't provide one and it's generated,I think it might be saved on the returned service objects here so I think what I would do is compare the object there (likely directly or its credentials) to the one created above. If they are comparable, arguably we could grab this object from one of the service objects generated from the discovery build API. The discovery build APIs are going to be different to have scopes for their respective endpoints, vs the one above which is all of google cloud. We could also try the reverse - creating one credential object and then seeing if we can pass to the services, although we'd want to double check how the scopes line up. And if neither of those ideas work, we could minimally have this call in the same function so they are in the same place!

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The request object is generated ultimately from one of those services - either directly or as a result of doing like pipelines.run() so I'm curious why the request object isn't coming with its own http already (or maybe some of them are but not consistently?) E.g., pipelines.run() here is the one that was originally giving us trouble:

operation = pipelines.run(parent=self.location, body=body)
and maybe @moschetti knows what using these endpoints and generated from a discovery endpoint, e.g, here doesn't generate a request with a comparable http object that we are generating later in retry request?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In my first attempt I tried passing the credentials created here:

creds = GoogleCredentials.get_application_default()

To google_auth_httplib2.AuthorizedHttp in _retry_request, but it did not like those credentials giving this error: AttributeError: '_JWTAccessCredentials' object has no attribute 'before_request' which led me to this issue, where I found to use google.auth for the credentials.

if http is None:
http = google_auth_httplib2.AuthorizedHttp(credentials=credentials, http=httplib2.Http())
cademirch marked this conversation as resolved.
Show resolved Hide resolved
try:
return request.execute()
return request.execute(http=http)
except BrokenPipeError as ex:
if attempts > 0:
time.sleep(timeout)
return self._retry_request(request, timeout * 2, attempts - 1)
return self._retry_request(request, http=http, timeout=timeout * 2, attempts=attempts - 1)
raise ex
except googleapiclient.errors.HttpError as ex:
if attempts > 0:
time.sleep(timeout)
return self._retry_request(request, timeout * 2, attempts - 1)
return self._retry_request(request, http=http, timeout=timeout * 2, attempts=attempts - 1)
log_verbose_traceback(ex)
raise ex
except Exception as ex:
if attempts > 0:
time.sleep(timeout)
return self._retry_request(request, timeout * 2, attempts - 1)
return self._retry_request(request, http=http, timeout=timeout * 2, attempts=attempts - 1)
log_verbose_traceback(ex)
raise ex

Expand Down