From 2a07883cf259db2dccaf286bb91bd3ed58740db6 Mon Sep 17 00:00:00 2001 From: Connor Adams Date: Mon, 29 Jun 2020 11:17:11 -0400 Subject: [PATCH 1/7] feat: add optional span creation with OpenTelemetry --- docs/index.rst | 1 + docs/opentelemetry-tracing.rst | 36 ++++ .../spanner_v1/_opentelemetry_tracing.py | 65 +++++++ google/cloud/spanner_v1/batch.py | 15 +- google/cloud/spanner_v1/session.py | 23 ++- google/cloud/spanner_v1/snapshot.py | 83 ++++++--- google/cloud/spanner_v1/transaction.py | 70 +++++--- noxfile.py | 15 +- tests/_helpers.py | 35 ++++ tests/system/test_system.py | 169 +++++++++++++++++- tests/unit/test__opentelemetry_tracing.py | 139 ++++++++++++++ tests/unit/test_batch.py | 27 ++- tests/unit/test_session.py | 83 ++++++++- tests/unit/test_snapshot.py | 143 ++++++++++++++- tests/unit/test_transaction.py | 66 ++++++- 15 files changed, 876 insertions(+), 94 deletions(-) create mode 100644 docs/opentelemetry-tracing.rst create mode 100644 google/cloud/spanner_v1/_opentelemetry_tracing.py create mode 100644 tests/_helpers.py create mode 100644 tests/unit/test__opentelemetry_tracing.py diff --git a/docs/index.rst b/docs/index.rst index 64c5c65c7f..cabf56157c 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -23,6 +23,7 @@ API Documentation api-reference advanced-session-pool-topics + opentelemetry-tracing Changelog --------- diff --git a/docs/opentelemetry-tracing.rst b/docs/opentelemetry-tracing.rst new file mode 100644 index 0000000000..f85f8c1833 --- /dev/null +++ b/docs/opentelemetry-tracing.rst @@ -0,0 +1,36 @@ +Tracing with OpenTelemetry +================================== +Python-spanner uses `OpenTelemetry `_ to automatically generates traces providing insight on calls to Cloud Spanner. +For information on the benefits and utility of tracing, see the `Cloud Trace docs `_. + +To take advantage of these traces, we first need to install opentelemetry: + +.. code-block:: sh + + pip install opentelemetry-api opentelemetry-sdk opentelemetry-instrumentation + +We also need to tell OpenTelemetry which exporter to use. For example, to export python-spanner traces to `Cloud Tracing `_, add the following lines to your application: + +.. code:: python + + from opentelemetry import trace + from opentelemetry.sdk.trace import TracerProvider + from opentelemetry.trace.sampling import ProbabilitySampler + from opentelemetry.exporter.cloud_trace import CloudTraceSpanExporter + # BatchExportSpanProcessor exports spans to Cloud Trace + # in a seperate thread to not block on the main thread + from opentelemetry.sdk.trace.export import BatchExportSpanProcessor + + # create and export one trace every 1000 requests + sampler = ProbabilitySampler(1/1000) + # Uses the default tracer provider + trace.set_tracer_provider(TracerProvider(sampler=sampler)) + trace.get_tracer_provider().add_span_processor( + # initialize the cloud tracing exporter + BatchExportSpanProcessor(CloudTraceSpanExporter()) + ) + +Generated spanner traces should now be available on `Cloud Trace `_. + +Tracing is most effective when many libraries are instrumented to provide insight over the entire lifespan of a request. +For a list of libraries that can be instrumented, see the `OpenTelemetry Integrations` section of the `OpenTelemetry Python docs `_ diff --git a/google/cloud/spanner_v1/_opentelemetry_tracing.py b/google/cloud/spanner_v1/_opentelemetry_tracing.py new file mode 100644 index 0000000000..5c1a487012 --- /dev/null +++ b/google/cloud/spanner_v1/_opentelemetry_tracing.py @@ -0,0 +1,65 @@ +# Copyright 2016 Google LLC All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Manages OpenTelemetry trace creation and handling""" + +from contextlib import contextmanager + +from google.api_core.exceptions import GoogleAPICallError +from google.cloud.spanner_v1.gapic import spanner_client + +try: + from opentelemetry import trace + from opentelemetry.trace.status import Status, StatusCanonicalCode + from opentelemetry.instrumentation.utils import http_status_to_canonical_code + + HAS_OPENTELEMETRY_INSTALLED = True +except ImportError: + HAS_OPENTELEMETRY_INSTALLED = False + + +@contextmanager +def trace_call(name, session, extra_attributes=None): + if not HAS_OPENTELEMETRY_INSTALLED: + # empty context manager. users will have to check if the generated value is None or a span + yield None + return + + tracer = trace.get_tracer(__name__) + + # base attributes that we know for every trace created + attributes = { + "db.type": "spanner", + "db.url": spanner_client.SpannerClient.SERVICE_ADDRESS, + "db.instance": session._database.name, + "net.host.name": spanner_client.SpannerClient.SERVICE_ADDRESS, + } + + if extra_attributes: + attributes.update(extra_attributes) + + with tracer.start_as_current_span( + name, kind=trace.SpanKind.CLIENT, attributes=attributes + ) as span: + try: + yield span + except GoogleAPICallError as error: + if error.code is not None: + span.set_status(Status(http_status_to_canonical_code(error.code))) + elif error.grpc_status_code is not None: + span.set_status( + # OpenTelemetry's StatusCanonicalCode maps 1-1 with grpc status codes + Status(StatusCanonicalCode(error.grpc_status_code.value[0])) + ) + raise diff --git a/google/cloud/spanner_v1/batch.py b/google/cloud/spanner_v1/batch.py index e62763d7fd..7ab394b215 100644 --- a/google/cloud/spanner_v1/batch.py +++ b/google/cloud/spanner_v1/batch.py @@ -22,6 +22,7 @@ from google.cloud.spanner_v1._helpers import _SessionWrapper from google.cloud.spanner_v1._helpers import _make_list_value_pbs from google.cloud.spanner_v1._helpers import _metadata_with_prefix +from google.cloud.spanner_v1._opentelemetry_tracing import trace_call # pylint: enable=ungrouped-imports @@ -147,12 +148,14 @@ def commit(self): api = database.spanner_api metadata = _metadata_with_prefix(database.name) txn_options = TransactionOptions(read_write=TransactionOptions.ReadWrite()) - response = api.commit( - self._session.name, - mutations=self._mutations, - single_use_transaction=txn_options, - metadata=metadata, - ) + trace_attributes = {"num_mutations": len(self._mutations)} + with trace_call("CloudSpanner.Commit", self._session, trace_attributes): + response = api.commit( + self._session.name, + mutations=self._mutations, + single_use_transaction=txn_options, + metadata=metadata, + ) self.committed = _pb_timestamp_to_datetime(response.commit_timestamp) return self.committed diff --git a/google/cloud/spanner_v1/session.py b/google/cloud/spanner_v1/session.py index a84aaa7c6d..d05930389b 100644 --- a/google/cloud/spanner_v1/session.py +++ b/google/cloud/spanner_v1/session.py @@ -26,6 +26,7 @@ from google.cloud.spanner_v1.batch import Batch from google.cloud.spanner_v1.snapshot import Snapshot from google.cloud.spanner_v1.transaction import Transaction +from google.cloud.spanner_v1._opentelemetry_tracing import trace_call import random # pylint: enable=ungrouped-imports @@ -114,7 +115,11 @@ def create(self): kw = {} if self._labels: kw = {"session": {"labels": self._labels}} - session_pb = api.create_session(self._database.name, metadata=metadata, **kw) + + with trace_call("CloudSpanner.CreateSession", self, self._labels): + session_pb = api.create_session( + self._database.name, metadata=metadata, **kw + ) self._session_id = session_pb.name.split("/")[-1] def exists(self): @@ -130,10 +135,14 @@ def exists(self): return False api = self._database.spanner_api metadata = _metadata_with_prefix(self._database.name) - try: - api.get_session(self.name, metadata=metadata) - except NotFound: - return False + + with trace_call("CloudSpanner.GetSession", self) as span: + try: + api.get_session(self.name, metadata=metadata) + span.set_attribute("session_found", True) + except NotFound: + span.set_attribute("session_found", False) + return False return True @@ -150,8 +159,8 @@ def delete(self): raise ValueError("Session ID not set by back-end") api = self._database.spanner_api metadata = _metadata_with_prefix(self._database.name) - - api.delete_session(self.name, metadata=metadata) + with trace_call("CloudSpanner.DeleteSession", self): + api.delete_session(self.name, metadata=metadata) def ping(self): """Ping the session to keep it alive by executing "SELECT 1". diff --git a/google/cloud/spanner_v1/snapshot.py b/google/cloud/spanner_v1/snapshot.py index dcb6e32d88..56e3ebaedc 100644 --- a/google/cloud/spanner_v1/snapshot.py +++ b/google/cloud/spanner_v1/snapshot.py @@ -30,9 +30,10 @@ from google.cloud.spanner_v1._helpers import _SessionWrapper from google.cloud.spanner_v1.streamed import StreamedResultSet from google.cloud.spanner_v1.types import PartitionOptions +from google.cloud.spanner_v1._opentelemetry_tracing import trace_call -def _restart_on_unavailable(restart): +def _restart_on_unavailable(restart, trace_name=None, session=None, attributes=None): """Restart iteration after :exc:`.ServiceUnavailable`. :type restart: callable @@ -40,7 +41,11 @@ def _restart_on_unavailable(restart): """ resume_token = b"" item_buffer = [] - iterator = restart() + if trace_name and session: + with trace_call(trace_name, session, attributes): + iterator = restart() + else: + iterator = restart() while True: try: for item in iterator: @@ -50,7 +55,11 @@ def _restart_on_unavailable(restart): break except ServiceUnavailable: del item_buffer[:] - iterator = restart(resume_token=resume_token) + if trace_name and session: + with trace_call(trace_name, session, attributes): + iterator = restart(resume_token=resume_token) + else: + iterator = restart(resume_token=resume_token) continue if len(item_buffer) == 0: @@ -143,7 +152,10 @@ def read(self, table, columns, keyset, index="", limit=0, partition=None): metadata=metadata, ) - iterator = _restart_on_unavailable(restart) + trace_attributes = {"table_id": table, "columns": columns} + iterator = _restart_on_unavailable( + restart, "CloudSpanner.ReadOnlyTransaction", self._session, trace_attributes + ) self._read_request_count += 1 @@ -243,7 +255,13 @@ def execute_sql( timeout=timeout, ) - iterator = _restart_on_unavailable(restart) + trace_attributes = {"db.statement": sql} + iterator = _restart_on_unavailable( + restart, + "CloudSpanner.ReadWriteTransaction", + self._session, + trace_attributes, + ) self._read_request_count += 1 self._execute_sql_count += 1 @@ -309,16 +327,20 @@ def partition_read( partition_size_bytes=partition_size_bytes, max_partitions=max_partitions ) - response = api.partition_read( - session=self._session.name, - table=table, - columns=columns, - key_set=keyset._to_pb(), - transaction=transaction, - index=index, - partition_options=partition_options, - metadata=metadata, - ) + trace_attributes = {"table_id": table, "columns": columns} + with trace_call( + "CloudSpanner.PartitionReadOnlyTransaction", self._session, trace_attributes + ): + response = api.partition_read( + session=self._session.name, + table=table, + columns=columns, + key_set=keyset._to_pb(), + transaction=transaction, + index=index, + partition_options=partition_options, + metadata=metadata, + ) return [partition.partition_token for partition in response.partitions] @@ -385,15 +407,21 @@ def partition_query( partition_size_bytes=partition_size_bytes, max_partitions=max_partitions ) - response = api.partition_query( - session=self._session.name, - sql=sql, - transaction=transaction, - params=params_pb, - param_types=param_types, - partition_options=partition_options, - metadata=metadata, - ) + trace_attributes = {"db.statement": sql} + with trace_call( + "CloudSpanner.PartitionReadWriteTransaction", + self._session, + trace_attributes, + ): + response = api.partition_query( + session=self._session.name, + sql=sql, + transaction=transaction, + params=params_pb, + param_types=param_types, + partition_options=partition_options, + metadata=metadata, + ) return [partition.partition_token for partition in response.partitions] @@ -515,8 +543,9 @@ def begin(self): api = database.spanner_api metadata = _metadata_with_prefix(database.name) txn_selector = self._make_txn_selector() - response = api.begin_transaction( - self._session.name, txn_selector.begin, metadata=metadata - ) + with trace_call("CloudSpanner.BeginTransaction", self._session): + response = api.begin_transaction( + self._session.name, txn_selector.begin, metadata=metadata + ) self._transaction_id = response.id return self._transaction_id diff --git a/google/cloud/spanner_v1/transaction.py b/google/cloud/spanner_v1/transaction.py index 3c1abc7326..bbf676f2ce 100644 --- a/google/cloud/spanner_v1/transaction.py +++ b/google/cloud/spanner_v1/transaction.py @@ -26,6 +26,7 @@ from google.cloud.spanner_v1.proto.transaction_pb2 import TransactionOptions from google.cloud.spanner_v1.snapshot import _SnapshotBase from google.cloud.spanner_v1.batch import _BatchBase +from google.cloud.spanner_v1._opentelemetry_tracing import trace_call class Transaction(_SnapshotBase, _BatchBase): @@ -95,9 +96,10 @@ def begin(self): api = database.spanner_api metadata = _metadata_with_prefix(database.name) txn_options = TransactionOptions(read_write=TransactionOptions.ReadWrite()) - response = api.begin_transaction( - self._session.name, txn_options, metadata=metadata - ) + with trace_call("CloudSpanner.BeginTransaction", self._session): + response = api.begin_transaction( + self._session.name, txn_options, metadata=metadata + ) self._transaction_id = response.id return self._transaction_id @@ -107,7 +109,8 @@ def rollback(self): database = self._session._database api = database.spanner_api metadata = _metadata_with_prefix(database.name) - api.rollback(self._session.name, self._transaction_id, metadata=metadata) + with trace_call("CloudSpanner.Rollback", self._session): + api.rollback(self._session.name, self._transaction_id, metadata=metadata) self.rolled_back = True del self._session._transaction @@ -123,12 +126,14 @@ def commit(self): database = self._session._database api = database.spanner_api metadata = _metadata_with_prefix(database.name) - response = api.commit( - self._session.name, - mutations=self._mutations, - transaction_id=self._transaction_id, - metadata=metadata, - ) + trace_attributes = {"num_mutations": len(self._mutations)} + with trace_call("CloudSpanner.Commit", self._session, trace_attributes): + response = api.commit( + self._session.name, + mutations=self._mutations, + transaction_id=self._transaction_id, + metadata=metadata, + ) self.committed = _pb_timestamp_to_datetime(response.commit_timestamp) del self._session._transaction return self.committed @@ -212,17 +217,21 @@ def execute_update( default_query_options = database._instance._client._query_options query_options = _merge_query_options(default_query_options, query_options) - response = api.execute_sql( - self._session.name, - dml, - transaction=transaction, - params=params_pb, - param_types=param_types, - query_mode=query_mode, - query_options=query_options, - seqno=seqno, - metadata=metadata, - ) + trace_attributes = {"db.statement": dml} + with trace_call( + "CloudSpanner.ReadWriteTransaction", self._session, trace_attributes + ): + response = api.execute_sql( + self._session.name, + dml, + transaction=transaction, + params=params_pb, + param_types=param_types, + query_mode=query_mode, + query_options=query_options, + seqno=seqno, + metadata=metadata, + ) return response.stats.row_count_exact def batch_update(self, statements): @@ -268,13 +277,18 @@ def batch_update(self, statements): self._execute_sql_count + 1, ) - response = api.execute_batch_dml( - session=self._session.name, - transaction=transaction, - statements=parsed, - seqno=seqno, - metadata=metadata, - ) + trace_attributes = { + # Get just the queries from the DML statement batch + "db.statement": [statement[0] for statement in statements] + } + with trace_call("CloudSpanner.DMLTransaction", self._session, trace_attributes): + response = api.execute_batch_dml( + session=self._session.name, + transaction=transaction, + statements=parsed, + seqno=seqno, + metadata=metadata, + ) row_counts = [ result_set.stats.row_count_exact for result_set in response.result_sets ] diff --git a/noxfile.py b/noxfile.py index ee0e4c8b78..aa6f2ef7e8 100644 --- a/noxfile.py +++ b/noxfile.py @@ -66,6 +66,12 @@ def lint_setup_py(session): def default(session): # Install all test dependencies, then install this package in-place. session.install("mock", "pytest", "pytest-cov") + + # Install opentelemetry dependencies + session.install( + "opentelemetry-api", "opentelemetry-sdk", "opentelemetry-instrumentation" + ) + session.install("-e", ".") # Run py.test against the unit tests. @@ -83,13 +89,13 @@ def default(session): ) -@nox.session(python=["2.7", "3.5", "3.6", "3.7", "3.8"]) +@nox.session(python=["3.5", "3.6", "3.7", "3.8"]) def unit(session): """Run the unit test suite.""" default(session) -@nox.session(python=["2.7", "3.7"]) +@nox.session(python="3.7") def system(session): """Run the system test suite.""" system_test_path = os.path.join("tests", "system.py") @@ -115,6 +121,11 @@ def system(session): # virtualenv's dist-packages. session.install("mock", "pytest") + # Install opentelemetry dependencies + session.install( + "opentelemetry-api", "opentelemetry-sdk", "opentelemetry-instrumentation" + ) + session.install("-e", ".") session.install("-e", "test_utils/") diff --git a/tests/_helpers.py b/tests/_helpers.py new file mode 100644 index 0000000000..f0cf298a2b --- /dev/null +++ b/tests/_helpers.py @@ -0,0 +1,35 @@ +import unittest +from opentelemetry import trace as trace_api +from opentelemetry.trace.status import StatusCanonicalCode + +from opentelemetry.sdk.trace import TracerProvider, export +from opentelemetry.sdk.trace.export.in_memory_span_exporter import InMemorySpanExporter + + +class OpenTelemetryBase(unittest.TestCase): + def setUp(self): + self.original_tracer_provider = trace_api.get_tracer_provider() + self.tracer_provider = TracerProvider() + self.memory_exporter = InMemorySpanExporter() + span_processor = export.SimpleExportSpanProcessor(self.memory_exporter) + self.tracer_provider.add_span_processor(span_processor) + trace_api.set_tracer_provider(self.tracer_provider) + + def tearDown(self): + trace_api.set_tracer_provider(self.original_tracer_provider) + + def assertNoSpans(self): + span_list = self.memory_exporter.get_finished_spans() + self.assertEqual(len(span_list), 0) + + def assertSpanAttributes( + self, name, status=StatusCanonicalCode.OK, attributes=None, span=None + ): + if not span: + span_list = self.memory_exporter.get_finished_spans() + self.assertEqual(len(span_list), 1) + span = span_list[0] + print(status, attributes, span.status, span.attributes) + self.assertEqual(span.name, name) + self.assertEqual(span.status.canonical_code, status) + self.assertEqual(span.attributes, attributes) diff --git a/tests/system/test_system.py b/tests/system/test_system.py index 9fde7db0c3..27f55d5308 100644 --- a/tests/system/test_system.py +++ b/tests/system/test_system.py @@ -52,6 +52,7 @@ from test_utils.retry import RetryResult from test_utils.system import unique_resource_id from tests._fixtures import DDL_STATEMENTS +from tests._helpers import OpenTelemetryBase CREATE_INSTANCE = os.getenv("GOOGLE_CLOUD_TESTS_CREATE_SPANNER_INSTANCE") is not None @@ -67,6 +68,12 @@ COUNTERS_TABLE = "counters" COUNTERS_COLUMNS = ("name", "value") +BASE_ATTRIBUTES = { + "db.type": "spanner", + "db.url": "spanner.googleapis.com:443", + "net.host.name": "spanner.googleapis.com:443", +} + _STATUS_CODE_TO_GRPC_STATUS_CODE = { member.value[0]: member for member in grpc.StatusCode } @@ -726,7 +733,7 @@ def test_list_backups(self): NANO_TIME = DatetimeWithNanoseconds(1995, 8, 31, nanosecond=987654321) POS_INF = float("+inf") NEG_INF = float("-inf") -OTHER_NAN, = struct.unpack(" Date: Wed, 15 Jul 2020 15:56:43 -0400 Subject: [PATCH 2/7] bring back support for python2.7 --- .../spanner_v1/_opentelemetry_tracing.py | 2 +- google/cloud/spanner_v1/session.py | 6 +- noxfile.py | 22 +- tests/_helpers.py | 54 +++-- tests/system/test_system.py | 6 +- tests/unit/test__opentelemetry_tracing.py | 216 +++++++++--------- tests/unit/test_batch.py | 3 +- tests/unit/test_session.py | 17 +- tests/unit/test_snapshot.py | 55 +++-- tests/unit/test_transaction.py | 3 +- 10 files changed, 200 insertions(+), 184 deletions(-) diff --git a/google/cloud/spanner_v1/_opentelemetry_tracing.py b/google/cloud/spanner_v1/_opentelemetry_tracing.py index 5c1a487012..86a9fb7c51 100644 --- a/google/cloud/spanner_v1/_opentelemetry_tracing.py +++ b/google/cloud/spanner_v1/_opentelemetry_tracing.py @@ -1,4 +1,4 @@ -# Copyright 2016 Google LLC All rights reserved. +# Copyright 2020 Google LLC All rights reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. diff --git a/google/cloud/spanner_v1/session.py b/google/cloud/spanner_v1/session.py index d05930389b..b3a1b7e6d8 100644 --- a/google/cloud/spanner_v1/session.py +++ b/google/cloud/spanner_v1/session.py @@ -139,9 +139,11 @@ def exists(self): with trace_call("CloudSpanner.GetSession", self) as span: try: api.get_session(self.name, metadata=metadata) - span.set_attribute("session_found", True) + if span: + span.set_attribute("session_found", True) except NotFound: - span.set_attribute("session_found", False) + if span: + span.set_attribute("session_found", False) return False return True diff --git a/noxfile.py b/noxfile.py index aa6f2ef7e8..91de61a9de 100644 --- a/noxfile.py +++ b/noxfile.py @@ -67,10 +67,11 @@ def default(session): # Install all test dependencies, then install this package in-place. session.install("mock", "pytest", "pytest-cov") - # Install opentelemetry dependencies - session.install( - "opentelemetry-api", "opentelemetry-sdk", "opentelemetry-instrumentation" - ) + # Install opentelemetry dependencies if python3+ + if session.python != "2.7": + session.install( + "opentelemetry-api", "opentelemetry-sdk", "opentelemetry-instrumentation" + ) session.install("-e", ".") @@ -89,13 +90,13 @@ def default(session): ) -@nox.session(python=["3.5", "3.6", "3.7", "3.8"]) +@nox.session(python=["2.7", "3.5", "3.6", "3.7", "3.8"]) def unit(session): """Run the unit test suite.""" default(session) -@nox.session(python="3.7") +@nox.session(python=["2.7", "3.7"]) def system(session): """Run the system test suite.""" system_test_path = os.path.join("tests", "system.py") @@ -121,10 +122,11 @@ def system(session): # virtualenv's dist-packages. session.install("mock", "pytest") - # Install opentelemetry dependencies - session.install( - "opentelemetry-api", "opentelemetry-sdk", "opentelemetry-instrumentation" - ) + # Install opentelemetry dependencies if not 2.7 + if session.python != "2.7": + session.install( + "opentelemetry-api", "opentelemetry-sdk", "opentelemetry-instrumentation" + ) session.install("-e", ".") session.install("-e", "test_utils/") diff --git a/tests/_helpers.py b/tests/_helpers.py index f0cf298a2b..2b013d8108 100644 --- a/tests/_helpers.py +++ b/tests/_helpers.py @@ -1,35 +1,47 @@ import unittest -from opentelemetry import trace as trace_api -from opentelemetry.trace.status import StatusCanonicalCode +from unittest import mock -from opentelemetry.sdk.trace import TracerProvider, export -from opentelemetry.sdk.trace.export.in_memory_span_exporter import InMemorySpanExporter +try: + from opentelemetry import trace as trace_api + from opentelemetry.trace.status import StatusCanonicalCode + from opentelemetry.sdk.trace import TracerProvider, export + from opentelemetry.sdk.trace.export.in_memory_span_exporter import InMemorySpanExporter + + HAS_OPENTELEMETRY_INSTALLED = True +except ImportError: + HAS_OPENTELEMETRY_INSTALLED = False + + StatusCanonicalCode = mock.Mock() class OpenTelemetryBase(unittest.TestCase): def setUp(self): - self.original_tracer_provider = trace_api.get_tracer_provider() - self.tracer_provider = TracerProvider() - self.memory_exporter = InMemorySpanExporter() - span_processor = export.SimpleExportSpanProcessor(self.memory_exporter) - self.tracer_provider.add_span_processor(span_processor) - trace_api.set_tracer_provider(self.tracer_provider) + if HAS_OPENTELEMETRY_INSTALLED: + self.original_tracer_provider = trace_api.get_tracer_provider() + self.tracer_provider = TracerProvider() + self.memory_exporter = InMemorySpanExporter() + span_processor = export.SimpleExportSpanProcessor(self.memory_exporter) + self.tracer_provider.add_span_processor(span_processor) + trace_api.set_tracer_provider(self.tracer_provider) def tearDown(self): - trace_api.set_tracer_provider(self.original_tracer_provider) + if HAS_OPENTELEMETRY_INSTALLED: + trace_api.set_tracer_provider(self.original_tracer_provider) def assertNoSpans(self): - span_list = self.memory_exporter.get_finished_spans() - self.assertEqual(len(span_list), 0) + if HAS_OPENTELEMETRY_INSTALLED: + span_list = self.memory_exporter.get_finished_spans() + self.assertEqual(len(span_list), 0) def assertSpanAttributes( self, name, status=StatusCanonicalCode.OK, attributes=None, span=None ): - if not span: - span_list = self.memory_exporter.get_finished_spans() - self.assertEqual(len(span_list), 1) - span = span_list[0] - print(status, attributes, span.status, span.attributes) - self.assertEqual(span.name, name) - self.assertEqual(span.status.canonical_code, status) - self.assertEqual(span.attributes, attributes) + if HAS_OPENTELEMETRY_INSTALLED: + if not span: + span_list = self.memory_exporter.get_finished_spans() + self.assertEqual(len(span_list), 1) + span = span_list[0] + + self.assertEqual(span.name, name) + self.assertEqual(span.status.canonical_code, status) + self.assertEqual(span.attributes, attributes) diff --git a/tests/system/test_system.py b/tests/system/test_system.py index 27f55d5308..4855e5a784 100644 --- a/tests/system/test_system.py +++ b/tests/system/test_system.py @@ -1332,8 +1332,10 @@ def test_transaction_batch_update_wo_statements(self): transaction.batch_update([]) def test_transaction_batch_update_w_parent_span(self): - import sys - from opentelemetry import trace + try: + from opentelemetry import trace + except ImportError: + return tracer = trace.get_tracer(__name__) diff --git a/tests/unit/test__opentelemetry_tracing.py b/tests/unit/test__opentelemetry_tracing.py index 83a0cf2221..85d27a3553 100644 --- a/tests/unit/test__opentelemetry_tracing.py +++ b/tests/unit/test__opentelemetry_tracing.py @@ -3,27 +3,18 @@ import unittest import sys -from opentelemetry import trace as trace_api -from opentelemetry.trace.status import StatusCanonicalCode -from opentelemetry.sdk.trace import TracerProvider, export -from opentelemetry.sdk.trace.export.in_memory_span_exporter import InMemorySpanExporter +try: + from opentelemetry import trace as trace_api + from opentelemetry.trace.status import StatusCanonicalCode + from opentelemetry.sdk.trace import TracerProvider, export + from opentelemetry.sdk.trace.export.in_memory_span_exporter import InMemorySpanExporter +except ImportError: + pass from google.api_core.exceptions import GoogleAPICallError from google.cloud.spanner_v1 import _opentelemetry_tracing - -class OpenTelemetryBase(unittest.TestCase): - def setUp(self): - self.original_tracer_provider = trace_api.get_tracer_provider() - self.tracer_provider = TracerProvider() - self.memory_exporter = InMemorySpanExporter() - span_processor = export.SimpleExportSpanProcessor(self.memory_exporter) - self.tracer_provider.add_span_processor(span_processor) - trace_api.set_tracer_provider(self.tracer_provider) - - def tearDown(self): - trace_api.set_tracer_provider(self.original_tracer_provider) - +from tests._helpers import OpenTelemetryBase, HAS_OPENTELEMETRY_INSTALLED def _make_rpc_error(error_cls, trailing_metadata=None): import grpc @@ -38,102 +29,103 @@ def _make_session(): return mock.Mock(autospec=Session, instance=True) +# Skip all of these tests if we don't have OpenTelemetry +if HAS_OPENTELEMETRY_INSTALLED: + class TestNoTracing(unittest.TestCase): + def setUp(self): + self._temp_opentelemetry = sys.modules["opentelemetry"] + + sys.modules["opentelemetry"] = None + importlib.reload(_opentelemetry_tracing) + + def tearDown(self): + sys.modules["opentelemetry"] = self._temp_opentelemetry + importlib.reload(_opentelemetry_tracing) + + def test_no_trace_call(self): + with _opentelemetry_tracing.trace_call("Test", _make_session()) as no_span: + self.assertIsNone(no_span) + + + class TestTracing(OpenTelemetryBase): + def test_trace_call(self): + extra_attributes = { + "attribute1": "value1", + # Since our database is mocked, we have to override the db.instance parameter so it is a string + "db.instance": "database_name", + } + + expected_attributes = { + "db.type": "spanner", + "db.url": "spanner.googleapis.com:443", + "net.host.name": "spanner.googleapis.com:443", + } + expected_attributes.update(extra_attributes) -class TestNoTracing(unittest.TestCase): - def setUp(self): - self._temp_opentelemetry = sys.modules["opentelemetry"] - - sys.modules["opentelemetry"] = None - importlib.reload(_opentelemetry_tracing) - - def tearDown(self): - sys.modules["opentelemetry"] = self._temp_opentelemetry - importlib.reload(_opentelemetry_tracing) - - def test_no_trace_call(self): - with _opentelemetry_tracing.trace_call("Test", _make_session()) as no_span: - self.assertIsNone(no_span) - - -class TestTracing(OpenTelemetryBase): - def test_trace_call(self): - extra_attributes = { - "attribute1": "value1", - # Since our database is mocked, we have to override the db.instance parameter so it is a string - "db.instance": "database_name", - } - - expected_attributes = { - "db.type": "spanner", - "db.url": "spanner.googleapis.com:443", - "net.host.name": "spanner.googleapis.com:443", - } - expected_attributes.update(extra_attributes) - - with _opentelemetry_tracing.trace_call( - "CloudSpanner.Test", _make_session(), extra_attributes - ) as span: - span.set_attribute("after_setup_attribute", 1) - - expected_attributes["after_setup_attribute"] = 1 - - span_list = self.memory_exporter.get_finished_spans() - self.assertEqual(len(span_list), 1) - span = span_list[0] - self.assertEqual(span.kind, trace_api.SpanKind.CLIENT) - self.assertEqual(span.attributes, expected_attributes) - self.assertEqual(span.name, "CloudSpanner.Test") - self.assertEqual( - span.status.canonical_code, trace_api.status.StatusCanonicalCode.OK - ) - - def test_trace_error(self): - extra_attributes = {"db.instance": "database_name"} - - expected_attributes = { - "db.type": "spanner", - "db.url": "spanner.googleapis.com:443", - "net.host.name": "spanner.googleapis.com:443", - } - expected_attributes.update(extra_attributes) - - with self.assertRaises(GoogleAPICallError): - with _opentelemetry_tracing.trace_call( - "CloudSpanner.Test", _make_session(), extra_attributes - ) as span: - from google.api_core.exceptions import InvalidArgument - - raise _make_rpc_error(InvalidArgument) - - span_list = self.memory_exporter.get_finished_spans() - self.assertEqual(len(span_list), 1) - span = span_list[0] - self.assertEqual(span.kind, trace_api.SpanKind.CLIENT) - self.assertEqual(span.attributes, expected_attributes) - self.assertEqual(span.name, "CloudSpanner.Test") - self.assertEqual( - span.status.canonical_code, StatusCanonicalCode.INVALID_ARGUMENT - ) - - def test_trace_grpc_error(self): - extra_attributes = {"db.instance": "database_name"} - - expected_attributes = { - "db.type": "spanner", - "db.url": "spanner.googleapis.com:443", - "net.host.name": "spanner.googleapis.com:443", - } - expected_attributes.update(extra_attributes) - - with self.assertRaises(GoogleAPICallError): with _opentelemetry_tracing.trace_call( "CloudSpanner.Test", _make_session(), extra_attributes ) as span: - from google.api_core.exceptions import DataLoss - - raise _make_rpc_error(DataLoss) - - span_list = self.memory_exporter.get_finished_spans() - self.assertEqual(len(span_list), 1) - span = span_list[0] - self.assertEqual(span.status.canonical_code, StatusCanonicalCode.DATA_LOSS) + span.set_attribute("after_setup_attribute", 1) + + expected_attributes["after_setup_attribute"] = 1 + + span_list = self.memory_exporter.get_finished_spans() + self.assertEqual(len(span_list), 1) + span = span_list[0] + self.assertEqual(span.kind, trace_api.SpanKind.CLIENT) + self.assertEqual(span.attributes, expected_attributes) + self.assertEqual(span.name, "CloudSpanner.Test") + self.assertEqual( + span.status.canonical_code, trace_api.status.StatusCanonicalCode.OK + ) + + def test_trace_error(self): + extra_attributes = {"db.instance": "database_name"} + + expected_attributes = { + "db.type": "spanner", + "db.url": "spanner.googleapis.com:443", + "net.host.name": "spanner.googleapis.com:443", + } + expected_attributes.update(extra_attributes) + + with self.assertRaises(GoogleAPICallError): + with _opentelemetry_tracing.trace_call( + "CloudSpanner.Test", _make_session(), extra_attributes + ) as span: + from google.api_core.exceptions import InvalidArgument + + raise _make_rpc_error(InvalidArgument) + + span_list = self.memory_exporter.get_finished_spans() + self.assertEqual(len(span_list), 1) + span = span_list[0] + self.assertEqual(span.kind, trace_api.SpanKind.CLIENT) + self.assertEqual(span.attributes, expected_attributes) + self.assertEqual(span.name, "CloudSpanner.Test") + self.assertEqual( + span.status.canonical_code, StatusCanonicalCode.INVALID_ARGUMENT + ) + + def test_trace_grpc_error(self): + extra_attributes = {"db.instance": "database_name"} + + expected_attributes = { + "db.type": "spanner", + "db.url": "spanner.googleapis.com:443", + "net.host.name": "spanner.googleapis.com:443", + } + expected_attributes.update(extra_attributes) + + with self.assertRaises(GoogleAPICallError): + with _opentelemetry_tracing.trace_call( + "CloudSpanner.Test", _make_session(), extra_attributes + ) as span: + from google.api_core.exceptions import DataLoss + + raise _make_rpc_error(DataLoss) + + span_list = self.memory_exporter.get_finished_spans() + self.assertEqual(len(span_list), 1) + span = span_list[0] + self.assertEqual(span.status.canonical_code, StatusCanonicalCode.DATA_LOSS) diff --git a/tests/unit/test_batch.py b/tests/unit/test_batch.py index 291b8e4c32..9b831f4906 100644 --- a/tests/unit/test_batch.py +++ b/tests/unit/test_batch.py @@ -14,8 +14,7 @@ import unittest -from tests._helpers import OpenTelemetryBase -from opentelemetry.trace.status import StatusCanonicalCode +from tests._helpers import OpenTelemetryBase, StatusCanonicalCode TABLE_NAME = "citizens" COLUMNS = ["email", "first_name", "last_name", "age"] diff --git a/tests/unit/test_session.py b/tests/unit/test_session.py index ff0118206b..17b2ce4688 100644 --- a/tests/unit/test_session.py +++ b/tests/unit/test_session.py @@ -15,8 +15,7 @@ import google.api_core.gapic_v1.method import mock -from tests._helpers import OpenTelemetryBase -from opentelemetry.trace.status import StatusCanonicalCode +from tests._helpers import OpenTelemetryBase, StatusCanonicalCode, HAS_OPENTELEMETRY_INSTALLED def _make_rpc_error(error_cls, trailing_metadata=None): @@ -1109,7 +1108,12 @@ def _time(_results=[1, 1.5]): return _results.pop(0) with mock.patch("time.time", _time): - with mock.patch("opentelemetry.util.time", _ConstantTime()): + if HAS_OPENTELEMETRY_INSTALLED: + with mock.patch("opentelemetry.util.time", _ConstantTime()): + with mock.patch("time.sleep") as sleep_mock: + with self.assertRaises(Aborted): + session.run_in_transaction(unit_of_work, "abc", timeout_secs=1) + else: with mock.patch("time.sleep") as sleep_mock: with self.assertRaises(Aborted): session.run_in_transaction(unit_of_work, "abc", timeout_secs=1) @@ -1172,7 +1176,12 @@ def _time(_results=[1, 2, 4, 8]): return _results.pop(0) with mock.patch("time.time", _time): - with mock.patch("opentelemetry.util.time", _ConstantTime()): + if HAS_OPENTELEMETRY_INSTALLED: + with mock.patch("opentelemetry.util.time", _ConstantTime()): + with mock.patch("time.sleep") as sleep_mock: + with self.assertRaises(Aborted): + session.run_in_transaction(unit_of_work, timeout_secs=8) + else: with mock.patch("time.sleep") as sleep_mock: with self.assertRaises(Aborted): session.run_in_transaction(unit_of_work, timeout_secs=8) diff --git a/tests/unit/test_snapshot.py b/tests/unit/test_snapshot.py index 4e1dd80f48..17e10c7a9e 100644 --- a/tests/unit/test_snapshot.py +++ b/tests/unit/test_snapshot.py @@ -15,8 +15,7 @@ import google.api_core.gapic_v1.method import mock -from opentelemetry.trace.status import StatusCanonicalCode -from tests._helpers import OpenTelemetryBase +from tests._helpers import OpenTelemetryBase, StatusCanonicalCode, HAS_OPENTELEMETRY_INSTALLED TABLE_NAME = "citizens" COLUMNS = ["email", "first_name", "last_name", "age"] @@ -133,33 +132,34 @@ def test_iteration_w_span_creation(self): self.assertSpanAttributes(name, attributes=dict(BASE_ATTRIBUTES, test_att=1)) def test_iteration_w_multiple_span_creation(self): - FIRST = (self._make_item(0), self._make_item(1, resume_token=RESUME_TOKEN)) - SECOND = (self._make_item(2),) # discarded after 503 - LAST = (self._make_item(3),) - before = _MockIterator(*(FIRST + SECOND), fail_after=True) - after = _MockIterator(*LAST) - restart = mock.Mock(spec=[], side_effect=[before, after]) - name = "TestSpan" - resumable = self._call_fut(restart, name, _Session(_Database())) - self.assertEqual(list(resumable), list(FIRST + LAST)) - self.assertEqual( - restart.mock_calls, [mock.call(), mock.call(resume_token=RESUME_TOKEN)] - ) - - span_list = self.memory_exporter.get_finished_spans() - self.assertEqual(len(span_list), 2) - for span in span_list: - self.assertEqual(span.name, name) + if HAS_OPENTELEMETRY_INSTALLED: + FIRST = (self._make_item(0), self._make_item(1, resume_token=RESUME_TOKEN)) + SECOND = (self._make_item(2),) # discarded after 503 + LAST = (self._make_item(3),) + before = _MockIterator(*(FIRST + SECOND), fail_after=True) + after = _MockIterator(*LAST) + restart = mock.Mock(spec=[], side_effect=[before, after]) + name = "TestSpan" + resumable = self._call_fut(restart, name, _Session(_Database())) + self.assertEqual(list(resumable), list(FIRST + LAST)) self.assertEqual( - span.attributes, - { - "db.type": "spanner", - "db.url": "spanner.googleapis.com:443", - "db.instance": "testing", - "net.host.name": "spanner.googleapis.com:443", - }, + restart.mock_calls, [mock.call(), mock.call(resume_token=RESUME_TOKEN)] ) + span_list = self.memory_exporter.get_finished_spans() + self.assertEqual(len(span_list), 2) + for span in span_list: + self.assertEqual(span.name, name) + self.assertEqual( + span.attributes, + { + "db.type": "spanner", + "db.url": "spanner.googleapis.com:443", + "db.instance": "testing", + "net.host.name": "spanner.googleapis.com:443", + }, + ) + class Test_SnapshotBase(OpenTelemetryBase): @@ -215,8 +215,7 @@ def test_ctor(self): self.assertIs(base._session, session) self.assertEqual(base._execute_sql_count, 0) - span_list = self.memory_exporter.get_finished_spans() - self.assertEqual(len(span_list), 0) + self.assertNoSpans() def test__make_txn_selector_virtual(self): session = _Session() diff --git a/tests/unit/test_transaction.py b/tests/unit/test_transaction.py index 939242e6bf..e2ac7c2eec 100644 --- a/tests/unit/test_transaction.py +++ b/tests/unit/test_transaction.py @@ -14,8 +14,7 @@ import mock -from tests._helpers import OpenTelemetryBase -from opentelemetry.trace.status import StatusCanonicalCode +from tests._helpers import OpenTelemetryBase, StatusCanonicalCode TABLE_NAME = "citizens" COLUMNS = ["email", "first_name", "last_name", "age"] From bee0e58b31449bce3de286050cd97ee4ec178085 Mon Sep 17 00:00:00 2001 From: Connor Adams Date: Wed, 15 Jul 2020 17:13:44 -0400 Subject: [PATCH 3/7] address comments --- google/cloud/spanner_v1/_opentelemetry_tracing.py | 2 +- google/cloud/spanner_v1/snapshot.py | 10 ++-------- google/cloud/spanner_v1/transaction.py | 2 +- tests/_helpers.py | 5 ++++- tests/unit/test__opentelemetry_tracing.py | 10 ++++------ tests/unit/test_session.py | 12 +++++++++--- tests/unit/test_snapshot.py | 6 +++++- 7 files changed, 26 insertions(+), 21 deletions(-) diff --git a/google/cloud/spanner_v1/_opentelemetry_tracing.py b/google/cloud/spanner_v1/_opentelemetry_tracing.py index 86a9fb7c51..93357eda98 100644 --- a/google/cloud/spanner_v1/_opentelemetry_tracing.py +++ b/google/cloud/spanner_v1/_opentelemetry_tracing.py @@ -31,7 +31,7 @@ @contextmanager def trace_call(name, session, extra_attributes=None): - if not HAS_OPENTELEMETRY_INSTALLED: + if not HAS_OPENTELEMETRY_INSTALLED or not session: # empty context manager. users will have to check if the generated value is None or a span yield None return diff --git a/google/cloud/spanner_v1/snapshot.py b/google/cloud/spanner_v1/snapshot.py index 56e3ebaedc..0b5ee1d894 100644 --- a/google/cloud/spanner_v1/snapshot.py +++ b/google/cloud/spanner_v1/snapshot.py @@ -41,10 +41,7 @@ def _restart_on_unavailable(restart, trace_name=None, session=None, attributes=N """ resume_token = b"" item_buffer = [] - if trace_name and session: - with trace_call(trace_name, session, attributes): - iterator = restart() - else: + with trace_call(trace_name, session, attributes): iterator = restart() while True: try: @@ -55,10 +52,7 @@ def _restart_on_unavailable(restart, trace_name=None, session=None, attributes=N break except ServiceUnavailable: del item_buffer[:] - if trace_name and session: - with trace_call(trace_name, session, attributes): - iterator = restart(resume_token=resume_token) - else: + with trace_call(trace_name, session, attributes): iterator = restart(resume_token=resume_token) continue diff --git a/google/cloud/spanner_v1/transaction.py b/google/cloud/spanner_v1/transaction.py index bbf676f2ce..80da63a3fd 100644 --- a/google/cloud/spanner_v1/transaction.py +++ b/google/cloud/spanner_v1/transaction.py @@ -279,7 +279,7 @@ def batch_update(self, statements): trace_attributes = { # Get just the queries from the DML statement batch - "db.statement": [statement[0] for statement in statements] + "db.statement": [statement["sql"] for statement in parsed] } with trace_call("CloudSpanner.DMLTransaction", self._session, trace_attributes): response = api.execute_batch_dml( diff --git a/tests/_helpers.py b/tests/_helpers.py index 2b013d8108..c4f90d8653 100644 --- a/tests/_helpers.py +++ b/tests/_helpers.py @@ -6,7 +6,9 @@ from opentelemetry.trace.status import StatusCanonicalCode from opentelemetry.sdk.trace import TracerProvider, export - from opentelemetry.sdk.trace.export.in_memory_span_exporter import InMemorySpanExporter + from opentelemetry.sdk.trace.export.in_memory_span_exporter import ( + InMemorySpanExporter, + ) HAS_OPENTELEMETRY_INSTALLED = True except ImportError: @@ -14,6 +16,7 @@ StatusCanonicalCode = mock.Mock() + class OpenTelemetryBase(unittest.TestCase): def setUp(self): if HAS_OPENTELEMETRY_INSTALLED: diff --git a/tests/unit/test__opentelemetry_tracing.py b/tests/unit/test__opentelemetry_tracing.py index 85d27a3553..8e26468dfe 100644 --- a/tests/unit/test__opentelemetry_tracing.py +++ b/tests/unit/test__opentelemetry_tracing.py @@ -6,8 +6,6 @@ try: from opentelemetry import trace as trace_api from opentelemetry.trace.status import StatusCanonicalCode - from opentelemetry.sdk.trace import TracerProvider, export - from opentelemetry.sdk.trace.export.in_memory_span_exporter import InMemorySpanExporter except ImportError: pass @@ -16,6 +14,7 @@ from tests._helpers import OpenTelemetryBase, HAS_OPENTELEMETRY_INSTALLED + def _make_rpc_error(error_cls, trailing_metadata=None): import grpc @@ -29,8 +28,10 @@ def _make_session(): return mock.Mock(autospec=Session, instance=True) + # Skip all of these tests if we don't have OpenTelemetry if HAS_OPENTELEMETRY_INSTALLED: + class TestNoTracing(unittest.TestCase): def setUp(self): self._temp_opentelemetry = sys.modules["opentelemetry"] @@ -46,7 +47,6 @@ def test_no_trace_call(self): with _opentelemetry_tracing.trace_call("Test", _make_session()) as no_span: self.assertIsNone(no_span) - class TestTracing(OpenTelemetryBase): def test_trace_call(self): extra_attributes = { @@ -75,9 +75,7 @@ def test_trace_call(self): self.assertEqual(span.kind, trace_api.SpanKind.CLIENT) self.assertEqual(span.attributes, expected_attributes) self.assertEqual(span.name, "CloudSpanner.Test") - self.assertEqual( - span.status.canonical_code, trace_api.status.StatusCanonicalCode.OK - ) + self.assertEqual(span.status.canonical_code, StatusCanonicalCode.OK) def test_trace_error(self): extra_attributes = {"db.instance": "database_name"} diff --git a/tests/unit/test_session.py b/tests/unit/test_session.py index 17b2ce4688..e95b9e1a06 100644 --- a/tests/unit/test_session.py +++ b/tests/unit/test_session.py @@ -15,7 +15,11 @@ import google.api_core.gapic_v1.method import mock -from tests._helpers import OpenTelemetryBase, StatusCanonicalCode, HAS_OPENTELEMETRY_INSTALLED +from tests._helpers import ( + OpenTelemetryBase, + StatusCanonicalCode, + HAS_OPENTELEMETRY_INSTALLED, +) def _make_rpc_error(error_cls, trailing_metadata=None): @@ -43,7 +47,7 @@ class TestSession(OpenTelemetryBase): BASE_ATTRIBUTES = { "db.type": "spanner", "db.url": "spanner.googleapis.com:443", - "db.instance": "projects/project-id/instances/instance-id/databases/database-id", + "db.instance": DATABASE_NAME, "net.host.name": "spanner.googleapis.com:443", } @@ -1112,7 +1116,9 @@ def _time(_results=[1, 1.5]): with mock.patch("opentelemetry.util.time", _ConstantTime()): with mock.patch("time.sleep") as sleep_mock: with self.assertRaises(Aborted): - session.run_in_transaction(unit_of_work, "abc", timeout_secs=1) + session.run_in_transaction( + unit_of_work, "abc", timeout_secs=1 + ) else: with mock.patch("time.sleep") as sleep_mock: with self.assertRaises(Aborted): diff --git a/tests/unit/test_snapshot.py b/tests/unit/test_snapshot.py index 17e10c7a9e..5c53ee6a0e 100644 --- a/tests/unit/test_snapshot.py +++ b/tests/unit/test_snapshot.py @@ -15,7 +15,11 @@ import google.api_core.gapic_v1.method import mock -from tests._helpers import OpenTelemetryBase, StatusCanonicalCode, HAS_OPENTELEMETRY_INSTALLED +from tests._helpers import ( + OpenTelemetryBase, + StatusCanonicalCode, + HAS_OPENTELEMETRY_INSTALLED, +) TABLE_NAME = "citizens" COLUMNS = ["email", "first_name", "last_name", "age"] From 027e0b1bc94efd8225921a88a713f62e25552542 Mon Sep 17 00:00:00 2001 From: Connor Adams Date: Thu, 16 Jul 2020 10:37:30 -0400 Subject: [PATCH 4/7] fix 2.7 tests --- tests/_helpers.py | 2 +- tests/system/test_system.py | 209 +++++++++++++++++++----------------- 2 files changed, 113 insertions(+), 98 deletions(-) diff --git a/tests/_helpers.py b/tests/_helpers.py index c4f90d8653..6ebc4bb374 100644 --- a/tests/_helpers.py +++ b/tests/_helpers.py @@ -1,5 +1,5 @@ import unittest -from unittest import mock +import mock try: from opentelemetry import trace as trace_api diff --git a/tests/system/test_system.py b/tests/system/test_system.py index 4855e5a784..07702f25a6 100644 --- a/tests/system/test_system.py +++ b/tests/system/test_system.py @@ -52,7 +52,7 @@ from test_utils.retry import RetryResult from test_utils.system import unique_resource_id from tests._fixtures import DDL_STATEMENTS -from tests._helpers import OpenTelemetryBase +from tests._helpers import OpenTelemetryBase, HAS_OPENTELEMETRY_INSTALLED CREATE_INSTANCE = os.getenv("GOOGLE_CLOUD_TESTS_CREATE_SPANNER_INSTANCE") is not None @@ -805,11 +805,11 @@ def tearDownClass(cls): cls._db.drop() def setUp(self): - super().setUp() + super(TestSessionAPI, self).setUp() self.to_delete = [] def tearDown(self): - super().tearDown() + super(TestSessionAPI, self).tearDown() for doomed in self.to_delete: doomed.delete() @@ -835,39 +835,45 @@ def test_batch_insert_then_read(self): rows = list(snapshot.read(self.TABLE, self.COLUMNS, self.ALL)) self._check_rows_data(rows) - span_list = self.memory_exporter.get_finished_spans() - self.assertEqual(len(span_list), 4) - self.assertSpanAttributes( - "CloudSpanner.GetSession", - attributes=dict( - BASE_ATTRIBUTES, **{"db.instance": self._db.name}, session_found=True - ), - span=span_list[0], - ) - self.assertSpanAttributes( - "CloudSpanner.Commit", - attributes=dict( - BASE_ATTRIBUTES, **{"db.instance": self._db.name}, num_mutations=2 - ), - span=span_list[1], - ) - self.assertSpanAttributes( - "CloudSpanner.GetSession", - attributes=dict( - BASE_ATTRIBUTES, **{"db.instance": self._db.name}, session_found=True - ), - span=span_list[2], - ) - self.assertSpanAttributes( - "CloudSpanner.ReadOnlyTransaction", - attributes=dict( - BASE_ATTRIBUTES, - **{"db.instance": self._db.name}, - columns=self.COLUMNS, - table_id=self.TABLE - ), - span=span_list[3], - ) + if HAS_OPENTELEMETRY_INSTALLED: + span_list = self.memory_exporter.get_finished_spans() + self.assertEqual(len(span_list), 4) + self.assertSpanAttributes( + "CloudSpanner.GetSession", + attributes=dict( + BASE_ATTRIBUTES, + **{"db.instance": self._db.name, "session_found": True} + ), + span=span_list[0], + ) + self.assertSpanAttributes( + "CloudSpanner.Commit", + attributes=dict( + BASE_ATTRIBUTES, + **{"db.instance": self._db.name, "num_mutations": 2} + ), + span=span_list[1], + ) + self.assertSpanAttributes( + "CloudSpanner.GetSession", + attributes=dict( + BASE_ATTRIBUTES, + **{"db.instance": self._db.name, "session_found": True} + ), + span=span_list[2], + ) + self.assertSpanAttributes( + "CloudSpanner.ReadOnlyTransaction", + attributes=dict( + BASE_ATTRIBUTES, + **{ + "db.instance": self._db.name, + "columns": self.COLUMNS, + "table_id": self.TABLE, + } + ), + span=span_list[3], + ) def test_batch_insert_then_read_string_array_of_string(self): TABLE = "string_plus_array_of_string" @@ -967,67 +973,76 @@ def test_transaction_read_and_insert_then_rollback(self): rows = list(session.read(self.TABLE, self.COLUMNS, self.ALL)) self.assertEqual(rows, []) - span_list = self.memory_exporter.get_finished_spans() - self.assertEqual(len(span_list), 8) - self.assertSpanAttributes( - "CloudSpanner.CreateSession", - attributes=dict(BASE_ATTRIBUTES, **{"db.instance": self._db.name}), - span=span_list[0], - ) - self.assertSpanAttributes( - "CloudSpanner.GetSession", - attributes=dict( - BASE_ATTRIBUTES, **{"db.instance": self._db.name}, session_found=True - ), - span=span_list[1], - ) - self.assertSpanAttributes( - "CloudSpanner.Commit", - attributes=dict( - BASE_ATTRIBUTES, **{"db.instance": self._db.name}, num_mutations=1 - ), - span=span_list[2], - ) - self.assertSpanAttributes( - "CloudSpanner.BeginTransaction", - attributes=dict(BASE_ATTRIBUTES, **{"db.instance": self._db.name}), - span=span_list[3], - ) - self.assertSpanAttributes( - "CloudSpanner.ReadOnlyTransaction", - attributes=dict( - BASE_ATTRIBUTES, - **{"db.instance": self._db.name}, - table_id=self.TABLE, - columns=self.COLUMNS - ), - span=span_list[4], - ) - self.assertSpanAttributes( - "CloudSpanner.ReadOnlyTransaction", - attributes=dict( - BASE_ATTRIBUTES, - **{"db.instance": self._db.name}, - table_id=self.TABLE, - columns=self.COLUMNS - ), - span=span_list[5], - ) - self.assertSpanAttributes( - "CloudSpanner.Rollback", - attributes=dict(BASE_ATTRIBUTES, **{"db.instance": self._db.name}), - span=span_list[6], - ) - self.assertSpanAttributes( - "CloudSpanner.ReadOnlyTransaction", - attributes=dict( - BASE_ATTRIBUTES, - **{"db.instance": self._db.name}, - table_id=self.TABLE, - columns=self.COLUMNS - ), - span=span_list[7], - ) + if HAS_OPENTELEMETRY_INSTALLED: + span_list = self.memory_exporter.get_finished_spans() + self.assertEqual(len(span_list), 8) + self.assertSpanAttributes( + "CloudSpanner.CreateSession", + attributes=dict(BASE_ATTRIBUTES, **{"db.instance": self._db.name}), + span=span_list[0], + ) + self.assertSpanAttributes( + "CloudSpanner.GetSession", + attributes=dict( + BASE_ATTRIBUTES, + **{"db.instance": self._db.name, "session_found": True} + ), + span=span_list[1], + ) + self.assertSpanAttributes( + "CloudSpanner.Commit", + attributes=dict( + BASE_ATTRIBUTES, + **{"db.instance": self._db.name, "num_mutations": 1} + ), + span=span_list[2], + ) + self.assertSpanAttributes( + "CloudSpanner.BeginTransaction", + attributes=dict(BASE_ATTRIBUTES, **{"db.instance": self._db.name}), + span=span_list[3], + ) + self.assertSpanAttributes( + "CloudSpanner.ReadOnlyTransaction", + attributes=dict( + BASE_ATTRIBUTES, + **{ + "db.instance": self._db.name, + "table_id": self.TABLE, + "columns": self.COLUMNS, + } + ), + span=span_list[4], + ) + self.assertSpanAttributes( + "CloudSpanner.ReadOnlyTransaction", + attributes=dict( + BASE_ATTRIBUTES, + **{ + "db.instance": self._db.name, + "table_id": self.TABLE, + "columns": self.COLUMNS, + } + ), + span=span_list[5], + ) + self.assertSpanAttributes( + "CloudSpanner.Rollback", + attributes=dict(BASE_ATTRIBUTES, **{"db.instance": self._db.name}), + span=span_list[6], + ) + self.assertSpanAttributes( + "CloudSpanner.ReadOnlyTransaction", + attributes=dict( + BASE_ATTRIBUTES, + **{ + "db.instance": self._db.name, + "table_id": self.TABLE, + "columns": self.COLUMNS, + } + ), + span=span_list[7], + ) def _transaction_read_then_raise(self, transaction): rows = list(transaction.read(self.TABLE, self.COLUMNS, self.ALL)) From de4b7e7c4ecb22458f57c430c12ac468337b2c78 Mon Sep 17 00:00:00 2001 From: Connor Adams Date: Mon, 20 Jul 2020 10:53:47 -0400 Subject: [PATCH 5/7] nit fixes --- docs/opentelemetry-tracing.rst | 6 +++--- google/cloud/spanner_v1/_opentelemetry_tracing.py | 4 ++-- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/docs/opentelemetry-tracing.rst b/docs/opentelemetry-tracing.rst index f85f8c1833..4ac9f7d6c9 100644 --- a/docs/opentelemetry-tracing.rst +++ b/docs/opentelemetry-tracing.rst @@ -21,12 +21,12 @@ We also need to tell OpenTelemetry which exporter to use. For example, to export # in a seperate thread to not block on the main thread from opentelemetry.sdk.trace.export import BatchExportSpanProcessor - # create and export one trace every 1000 requests + # Create and export one trace every 1000 requests sampler = ProbabilitySampler(1/1000) - # Uses the default tracer provider + # Use the default tracer provider trace.set_tracer_provider(TracerProvider(sampler=sampler)) trace.get_tracer_provider().add_span_processor( - # initialize the cloud tracing exporter + # Initialize the cloud tracing exporter BatchExportSpanProcessor(CloudTraceSpanExporter()) ) diff --git a/google/cloud/spanner_v1/_opentelemetry_tracing.py b/google/cloud/spanner_v1/_opentelemetry_tracing.py index 93357eda98..60e68598e9 100644 --- a/google/cloud/spanner_v1/_opentelemetry_tracing.py +++ b/google/cloud/spanner_v1/_opentelemetry_tracing.py @@ -32,13 +32,13 @@ @contextmanager def trace_call(name, session, extra_attributes=None): if not HAS_OPENTELEMETRY_INSTALLED or not session: - # empty context manager. users will have to check if the generated value is None or a span + # Empty context manager. Users will have to check if the generated value is None or a span yield None return tracer = trace.get_tracer(__name__) - # base attributes that we know for every trace created + # Set base attributes that we know for every trace created attributes = { "db.type": "spanner", "db.url": spanner_client.SpannerClient.SERVICE_ADDRESS, From 190f6a78d0964647dfcbb46a26400be0ee3ed032 Mon Sep 17 00:00:00 2001 From: Connor Adams Date: Tue, 21 Jul 2020 14:26:58 -0400 Subject: [PATCH 6/7] db.statement join with ; --- google/cloud/spanner_v1/transaction.py | 2 +- tests/system/test_system.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/google/cloud/spanner_v1/transaction.py b/google/cloud/spanner_v1/transaction.py index 80da63a3fd..40116a9bbb 100644 --- a/google/cloud/spanner_v1/transaction.py +++ b/google/cloud/spanner_v1/transaction.py @@ -279,7 +279,7 @@ def batch_update(self, statements): trace_attributes = { # Get just the queries from the DML statement batch - "db.statement": [statement["sql"] for statement in parsed] + "db.statement": ";".join([statement["sql"] for statement in parsed]) } with trace_call("CloudSpanner.DMLTransaction", self._session, trace_attributes): response = api.execute_batch_dml( diff --git a/tests/system/test_system.py b/tests/system/test_system.py index 07702f25a6..7779769c8f 100644 --- a/tests/system/test_system.py +++ b/tests/system/test_system.py @@ -2513,7 +2513,7 @@ def test_execute_sql_returning_transfinite_floats(self): ) ) self.assertEqual(len(rows), 1) - (float_array,) = rows[0] + float_array = rows[0][0] self.assertEqual(float_array[0], float("-inf")) self.assertEqual(float_array[1], float("+inf")) # NaNs cannot be searched for by equality. From e78e78004ec4e426ff69368f1046cbd7b11e662a Mon Sep 17 00:00:00 2001 From: larkee <31196561+larkee@users.noreply.github.com> Date: Tue, 28 Jul 2020 14:58:38 +1200 Subject: [PATCH 7/7] Update docs/opentelemetry-tracing.rst --- docs/opentelemetry-tracing.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/opentelemetry-tracing.rst b/docs/opentelemetry-tracing.rst index 4ac9f7d6c9..8906db43b6 100644 --- a/docs/opentelemetry-tracing.rst +++ b/docs/opentelemetry-tracing.rst @@ -1,6 +1,6 @@ Tracing with OpenTelemetry ================================== -Python-spanner uses `OpenTelemetry `_ to automatically generates traces providing insight on calls to Cloud Spanner. +This library uses `OpenTelemetry `_ to automatically generate traces providing insight on calls to Cloud Spanner. For information on the benefits and utility of tracing, see the `Cloud Trace docs `_. To take advantage of these traces, we first need to install opentelemetry: