Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add support for long-running operations with rest transport. #1094

Merged
merged 8 commits into from Nov 25, 2021
30 changes: 29 additions & 1 deletion gapic/schema/api.py
Expand Up @@ -27,11 +27,14 @@
from types import MappingProxyType

from google.api_core import exceptions
from google.api import http_pb2 # type: ignore
from google.api import resource_pb2 # type: ignore
from google.api import service_pb2 # type: ignore
from google.gapic.metadata import gapic_metadata_pb2 # type: ignore
from google.longrunning import operations_pb2 # type: ignore
from google.protobuf import descriptor_pb2
from google.protobuf.json_format import MessageToJson
from google.protobuf.json_format import ParseDict

import grpc # type: ignore

Expand Down Expand Up @@ -226,6 +229,7 @@ class API:
"""
naming: api_naming.Naming
all_protos: Mapping[str, Proto]
service_yaml_config: service_pb2.Service
subpackage_view: Tuple[str, ...] = dataclasses.field(default_factory=tuple)

@classmethod
Expand Down Expand Up @@ -318,8 +322,14 @@ def disambiguate_keyword_fname(
for name, proto in pre_protos.items()
}

# Parse the google.api.Service proto from the service_yaml data.
service_yaml_config = service_pb2.Service()
ParseDict(opts.service_yaml_config, service_yaml_config)

# Done; return the API.
return cls(naming=naming, all_protos=protos)
return cls(naming=naming,
all_protos=protos,
service_yaml_config=service_yaml_config)

@cached_property
def enums(self) -> Mapping[str, wrappers.EnumType]:
Expand Down Expand Up @@ -374,6 +384,24 @@ def services(self) -> Mapping[str, wrappers.Service]:
*[p.services for p in self.protos.values()],
)

@cached_property
def http_options(self) -> Mapping[str, Sequence[wrappers.HttpRule]]:
"""Return a map of API-wide http rules."""

def make_http_options(rule: http_pb2.HttpRule
) -> Sequence[wrappers.HttpRule]:
http_options = [rule] + list(rule.additional_bindings)
opt_gen = (wrappers.HttpRule.try_parse_http_rule(http_rule)
for http_rule in http_options)
return [rule for rule in opt_gen if rule]

result: Mapping[str, Sequence[http_pb2.HttpRule]] = {
rule.selector: make_http_options(rule)
for rule in self.service_yaml_config.http.rules
}

return result

@cached_property
def subpackages(self) -> Mapping[str, 'API']:
"""Return a map of all subpackages, if any.
Expand Down
Expand Up @@ -10,6 +10,7 @@ from google.api_core import path_template
from google.api_core import gapic_v1
{% if service.has_lro %}
from google.api_core import operations_v1
from google.protobuf import json_format
{% endif %}
from requests import __version__ as requests_version
from typing import Callable, Dict, Optional, Sequence, Tuple, Union
Expand All @@ -25,10 +26,6 @@ except AttributeError: # pragma: NO COVER
{% block content %}


{% if service.has_lro %}
{% endif %}


{# TODO(yon-mg): re-add python_import/ python_modules from removed diff/current grpc template code #}
{% filter sort_lines %}
{% for method in service.methods.values() %}
Expand Down Expand Up @@ -134,31 +131,41 @@ class {{service.name}}RestTransport({{service.name}}Transport):
This property caches on the instance; repeated calls return the same
client.
"""
# Sanity check: Only create a new client if we do not already have one.
# Only create a new client if we do not already have one.
if self._operations_client is None:
from google.api_core import grpc_helpers

self._operations_client = operations_v1.OperationsClient(
grpc_helpers.create_channel(
self._host,
http_options = {
{% for selector, rules in api.http_options.items() %}
{% if selector.startswith('google.longrunning.Operations') %}
'{{ selector }}': [
{% for rule in rules %}
{
'method': '{{ rule.method }}',
'uri': '{{ rule.uri }}',
{% if rule.body %}
'body': '{{ rule.body }}',
{% endif %}
},
{% endfor %}{# rules #}
],
{% endif %}{# longrunning.Operations #}
{% endfor %}{# http_options #}
}

rest_transport = operations_v1.OperationsRestTransport(
host=self._host,
credentials=self._credentials,
default_scopes=cls.AUTH_SCOPES,
scopes=self._scopes,
default_host=cls.DEFAULT_HOST,
options=[
("grpc.max_send_message_length", -1),
("grpc.max_receive_message_length", -1),
],
)
)
http_options=http_options)

self._operations_client = operations_v1.AbstractOperationsClient(transport=rest_transport)

# Return the client from cache.
return self._operations_client


{% endif %}
{% endif %}{# service.has_lro #}
{% for method in service.methods.values() %}
{%- if method.http_options and not method.lro and not (method.server_streaming or method.client_streaming) %}
{%- if method.http_options and not (method.server_streaming or method.client_streaming) %}
def _{{method.name | snake_case}}(self,
request: {{method.input.ident}}, *,
retry: OptionalRetry=gapic_v1.method.DEFAULT,
Expand Down Expand Up @@ -279,11 +286,17 @@ class {{service.name}}RestTransport({{service.name}}Transport):
{% if not method.void %}

# Return the response
{% if method.lro %}
return_op = operations_pb2.Operation()
json_format.Parse(response.content, return_op, ignore_unknown_fields=True)
return return_op
{% else %}
return {{method.output.ident}}.from_json(
response.content,
ignore_unknown_fields=True
)
{% endif %}
{% endif %}
{% else %}

def _{{method.name | snake_case}}(self,
Expand All @@ -296,10 +309,6 @@ class {{service.name}}RestTransport({{service.name}}Transport):

raise RuntimeError(
"Cannot define a method without a valid 'google.api.http' annotation.")
{%- elif method.lro %}

raise NotImplementedError(
"LRO over REST is not yet defined for python client.")
{%- elif method.server_streaming or method.client_streaming %}

raise NotImplementedError(
Expand Down
Expand Up @@ -36,8 +36,10 @@ from google.api_core import grpc_helpers_async
from google.api_core import path_template
{% if service.has_lro %}
from google.api_core import future
from google.api_core import operation
from google.api_core import operations_v1
from google.longrunning import operations_pb2
from google.protobuf import json_format
{% endif %}
from google.api_core import gapic_v1
{% for method in service.methods.values() %}
Expand Down Expand Up @@ -1119,8 +1121,8 @@ def test_{{ method_name }}_raw_page_lro():

{% for method in service.methods.values() if 'rest' in opts.transport and
method.http_options %}{% with method_name = method.name|snake_case + "_unary" if method.operation_service else method.name|snake_case %}
{# TODO(kbandes): remove this if condition when lro and streaming are supported. #}
{% if not method.lro and not (method.server_streaming or method.client_streaming) %}
{# TODO(kbandes): remove this if condition when streaming is supported in rest. #}
{% if not (method.server_streaming or method.client_streaming) %}
def test_{{ method_name }}_rest(transport: str = 'rest', request_type={{ method.input.ident }}):
client = {{ service.client_name }}(
credentials=ga_credentials.AnonymousCredentials(),
Expand Down Expand Up @@ -1167,11 +1169,13 @@ def test_{{ method_name }}_rest(transport: str = 'rest', request_type={{ method.
# Wrap the value into a proper Response obj
response_value = Response()
response_value.status_code = 200
{% if method.void %}
{% if method.void %}
json_return_value = ''
{% else %}
{% elif method.lro %}
json_return_value = json_format.MessageToJson(return_value)
{% else %}
json_return_value = {{ method.output.ident }}.to_json(return_value)
{% endif %}
{% endif %}
response_value._content = json_return_value.encode('UTF-8')
req.return_value = response_value
{% if method.client_streaming %}
Expand All @@ -1188,6 +1192,8 @@ def test_{{ method_name }}_rest(transport: str = 'rest', request_type={{ method.
# Establish that the response is the type that we expect.
{% if method.void %}
assert response is None
{% elif method.lro %}
assert response.operation.name == "operations/spam"
{% else %}
assert isinstance(response, {{ method.client_output.ident }})
{% for field in method.output.fields.values() | rejectattr('message') %}
Expand Down Expand Up @@ -1264,11 +1270,13 @@ def test_{{ method_name }}_rest_flattened(transport: str = 'rest'):
# Wrap the value into a proper Response obj
response_value = Response()
response_value.status_code = 200
{% if method.void %}
{% if method.void %}
json_return_value = ''
{% else %}
{% elif method.lro %}
json_return_value = json_format.MessageToJson(return_value)
{% else %}
json_return_value = {{ method.output.ident }}.to_json(return_value)
{% endif %}
{% endif %}

response_value._content = json_return_value.encode('UTF-8')
req.return_value = response_value
Expand Down Expand Up @@ -1453,6 +1461,7 @@ def test_{{ method_name }}_rest_error():
client.{{ method_name }}({})

{%- endif %}

{% endif %}{% endwith %}{# method_name #}

{% endfor -%} {#- method in methods for rest #}
Expand Down
17 changes: 16 additions & 1 deletion gapic/utils/options.py
Expand Up @@ -20,6 +20,7 @@
import json
import os
import warnings
import yaml

from gapic.samplegen_utils import utils as samplegen_utils

Expand All @@ -45,6 +46,8 @@ class Options:
metadata: bool = False
# TODO(yon-mg): should there be an enum for transport type?
transport: List[str] = dataclasses.field(default_factory=lambda: [])
service_yaml_config: Dict[str, Any] = dataclasses.field(
default_factory=dict)

# Class constants
PYTHON_GAPIC_PREFIX: str = 'python-gapic-'
Expand All @@ -54,6 +57,7 @@ class Options:
'metadata', # generate GAPIC metadata JSON file
'old-naming', # TODO(dovs): Come up with a better comment
'retry-config', # takes a path
'service-yaml', # takes a path
'samples', # output dir
'autogen-snippets', # produce auto-generated snippets
# transport type(s) delineated by '+' (i.e. grpc, rest, custom.[something], etc?)
Expand Down Expand Up @@ -129,6 +133,16 @@ def tweak_path(p):
with open(retry_paths[-1]) as f:
retry_cfg = json.load(f)

service_yaml_config = {}
service_yaml_paths = opts.pop('service-yaml', None)
if service_yaml_paths:
# Just use the last file specified.
with open(service_yaml_paths[-1]) as f:
service_yaml_config = yaml.load(f, Loader=yaml.Loader)
# The yaml service files typically have this field,
# but it is not a field in the gogle.api.Service proto.
service_yaml_config.pop('type', None)

# Build the options instance.
sample_paths = opts.pop('samples', [])

Expand All @@ -150,7 +164,8 @@ def tweak_path(p):
add_iam_methods=bool(opts.pop('add-iam-methods', False)),
metadata=bool(opts.pop('metadata', False)),
# transport should include desired transports delimited by '+', e.g. transport='grpc+rest'
transport=opts.pop('transport', ['grpc'])[0].split('+')
transport=opts.pop('transport', ['grpc'])[0].split('+'),
service_yaml_config=service_yaml_config,
)

# Note: if we ever need to recursively check directories for sample
Expand Down
3 changes: 3 additions & 0 deletions rules_python_gapic/py_gapic.bzl
Expand Up @@ -21,6 +21,7 @@ def py_gapic_library(
plugin_args = None,
opt_args = None,
metadata = True,
service_yaml = None,
**kwargs):
# srcjar_target_name = "%s_srcjar" % name
srcjar_target_name = name
Expand All @@ -35,6 +36,8 @@ def py_gapic_library(
file_args = {}
if grpc_service_config:
file_args[grpc_service_config] = "retry-config"
if service_yaml:
file_args[service_yaml] = "service-yaml"

proto_custom_library(
name = srcjar_target_name,
Expand Down
Expand Up @@ -29,6 +29,7 @@
from google.api_core import gapic_v1
from google.api_core import grpc_helpers
from google.api_core import grpc_helpers_async
from google.api_core import operation
from google.api_core import operation_async # type: ignore
from google.api_core import operations_v1
from google.api_core import path_template
Expand All @@ -44,6 +45,7 @@
from google.oauth2 import service_account
from google.protobuf import duration_pb2 # type: ignore
from google.protobuf import field_mask_pb2 # type: ignore
from google.protobuf import json_format
from google.protobuf import timestamp_pb2 # type: ignore
from google.type import expr_pb2 # type: ignore
import google.auth
Expand Down
Expand Up @@ -29,6 +29,7 @@
from google.api_core import gapic_v1
from google.api_core import grpc_helpers
from google.api_core import grpc_helpers_async
from google.api_core import operation
from google.api_core import operation_async # type: ignore
from google.api_core import operations_v1
from google.api_core import path_template
Expand All @@ -42,6 +43,7 @@
from google.longrunning import operations_pb2
from google.oauth2 import service_account
from google.protobuf import field_mask_pb2 # type: ignore
from google.protobuf import json_format
from google.protobuf import timestamp_pb2 # type: ignore
import google.auth

Expand Down
13 changes: 11 additions & 2 deletions tests/unit/generator/test_generator.py
Expand Up @@ -19,6 +19,7 @@
import jinja2
import pytest

from google.api import service_pb2
from google.protobuf import descriptor_pb2
from google.protobuf.compiler.plugin_pb2 import CodeGeneratorResponse

Expand Down Expand Up @@ -767,9 +768,17 @@ def make_proto(
).proto


def make_api(*protos, naming: naming.Naming = None, **kwargs) -> api.API:
def make_api(
*protos,
naming: naming.Naming = None,
service_yaml_config: service_pb2.Service = None,
**kwargs
) -> api.API:
return api.API(
naming=naming or make_naming(), all_protos={i.name: i for i in protos}, **kwargs
naming=naming or make_naming(),
service_yaml_config=service_yaml_config or service_pb2.Service(),
all_protos={i.name: i for i in protos},
**kwargs
)


Expand Down