From 013f11fb55a5820083bd7f8323029834d77e21b3 Mon Sep 17 00:00:00 2001 From: Connor Adams Date: Mon, 29 Jun 2020 11:17:11 -0400 Subject: [PATCH] Tracing with OpenTelemetry --- docs/index.rst | 1 + docs/opentelemetry-tracing.rst | 36 ++++ .../spanner_v1/_opentelemetry_tracing.py | 65 +++++++ google/cloud/spanner_v1/batch.py | 15 +- google/cloud/spanner_v1/session.py | 23 ++- google/cloud/spanner_v1/snapshot.py | 83 ++++++--- google/cloud/spanner_v1/transaction.py | 70 +++++--- noxfile.py | 17 +- tests/_helpers.py | 35 ++++ tests/system/test_system.py | 169 +++++++++++++++++- tests/unit/test__opentelemetry_tracing.py | 139 ++++++++++++++ tests/unit/test_batch.py | 27 ++- tests/unit/test_session.py | 64 ++++++- tests/unit/test_snapshot.py | 143 ++++++++++++++- tests/unit/test_transaction.py | 66 ++++++- 15 files changed, 864 insertions(+), 89 deletions(-) create mode 100644 docs/opentelemetry-tracing.rst create mode 100644 google/cloud/spanner_v1/_opentelemetry_tracing.py create mode 100644 tests/_helpers.py create mode 100644 tests/unit/test__opentelemetry_tracing.py diff --git a/docs/index.rst b/docs/index.rst index 64c5c65c7fa..cabf56157c9 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -23,6 +23,7 @@ API Documentation api-reference advanced-session-pool-topics + opentelemetry-tracing Changelog --------- diff --git a/docs/opentelemetry-tracing.rst b/docs/opentelemetry-tracing.rst new file mode 100644 index 00000000000..f85f8c1833a --- /dev/null +++ b/docs/opentelemetry-tracing.rst @@ -0,0 +1,36 @@ +Tracing with OpenTelemetry +================================== +Python-spanner uses `OpenTelemetry `_ to automatically generates traces providing insight on calls to Cloud Spanner. +For information on the benefits and utility of tracing, see the `Cloud Trace docs `_. + +To take advantage of these traces, we first need to install opentelemetry: + +.. code-block:: sh + + pip install opentelemetry-api opentelemetry-sdk opentelemetry-instrumentation + +We also need to tell OpenTelemetry which exporter to use. For example, to export python-spanner traces to `Cloud Tracing `_, add the following lines to your application: + +.. code:: python + + from opentelemetry import trace + from opentelemetry.sdk.trace import TracerProvider + from opentelemetry.trace.sampling import ProbabilitySampler + from opentelemetry.exporter.cloud_trace import CloudTraceSpanExporter + # BatchExportSpanProcessor exports spans to Cloud Trace + # in a seperate thread to not block on the main thread + from opentelemetry.sdk.trace.export import BatchExportSpanProcessor + + # create and export one trace every 1000 requests + sampler = ProbabilitySampler(1/1000) + # Uses the default tracer provider + trace.set_tracer_provider(TracerProvider(sampler=sampler)) + trace.get_tracer_provider().add_span_processor( + # initialize the cloud tracing exporter + BatchExportSpanProcessor(CloudTraceSpanExporter()) + ) + +Generated spanner traces should now be available on `Cloud Trace `_. + +Tracing is most effective when many libraries are instrumented to provide insight over the entire lifespan of a request. +For a list of libraries that can be instrumented, see the `OpenTelemetry Integrations` section of the `OpenTelemetry Python docs `_ diff --git a/google/cloud/spanner_v1/_opentelemetry_tracing.py b/google/cloud/spanner_v1/_opentelemetry_tracing.py new file mode 100644 index 00000000000..4d9cf46248a --- /dev/null +++ b/google/cloud/spanner_v1/_opentelemetry_tracing.py @@ -0,0 +1,65 @@ +# Copyright 2016 Google LLC All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Manages OpenTelemetry trace creation and handling""" + +from contextlib import contextmanager + +from google.api_core.exceptions import GoogleAPICallError +from google.cloud.spanner_v1.gapic import spanner_client + +try: + from opentelemetry import trace + from opentelemetry.trace.status import Status, StatusCanonicalCode + from opentelemetry.instrumentation.utils import http_status_to_canonical_code + + HAS_OPENTELEMETRY_INSTALLED = True +except (ImportError, ModuleNotFoundError): + HAS_OPENTELEMETRY_INSTALLED = False + + +@contextmanager +def trace_call(name, session, extra_attributes=None): + if not HAS_OPENTELEMETRY_INSTALLED: + # empty context manager. users will have to check if the generated value is None or a span + yield None + return + + tracer = trace.get_tracer(__name__) + + # base attributes that we know for every trace created + attributes = { + "db.type": "spanner", + "db.url": spanner_client.SpannerClient.SERVICE_ADDRESS, + "db.instance": session._database.name, + "net.host.name": spanner_client.SpannerClient.SERVICE_ADDRESS, + } + + if extra_attributes: + attributes.update(extra_attributes) + + with tracer.start_as_current_span( + name, kind=trace.SpanKind.CLIENT, attributes=attributes + ) as span: + try: + yield span + except GoogleAPICallError as error: + if error.code is not None: + span.set_status(Status(http_status_to_canonical_code(error.code))) + elif error.grpc_status_code is not None: + span.set_status( + # OpenTelemetry's StatusCanonicalCode maps 1-1 with grpc status codes + Status(StatusCanonicalCode(error.grpc_status_code.value[0])) + ) + raise diff --git a/google/cloud/spanner_v1/batch.py b/google/cloud/spanner_v1/batch.py index e62763d7fd7..7ab394b2157 100644 --- a/google/cloud/spanner_v1/batch.py +++ b/google/cloud/spanner_v1/batch.py @@ -22,6 +22,7 @@ from google.cloud.spanner_v1._helpers import _SessionWrapper from google.cloud.spanner_v1._helpers import _make_list_value_pbs from google.cloud.spanner_v1._helpers import _metadata_with_prefix +from google.cloud.spanner_v1._opentelemetry_tracing import trace_call # pylint: enable=ungrouped-imports @@ -147,12 +148,14 @@ def commit(self): api = database.spanner_api metadata = _metadata_with_prefix(database.name) txn_options = TransactionOptions(read_write=TransactionOptions.ReadWrite()) - response = api.commit( - self._session.name, - mutations=self._mutations, - single_use_transaction=txn_options, - metadata=metadata, - ) + trace_attributes = {"num_mutations": len(self._mutations)} + with trace_call("CloudSpanner.Commit", self._session, trace_attributes): + response = api.commit( + self._session.name, + mutations=self._mutations, + single_use_transaction=txn_options, + metadata=metadata, + ) self.committed = _pb_timestamp_to_datetime(response.commit_timestamp) return self.committed diff --git a/google/cloud/spanner_v1/session.py b/google/cloud/spanner_v1/session.py index a84aaa7c6d9..d05930389b7 100644 --- a/google/cloud/spanner_v1/session.py +++ b/google/cloud/spanner_v1/session.py @@ -26,6 +26,7 @@ from google.cloud.spanner_v1.batch import Batch from google.cloud.spanner_v1.snapshot import Snapshot from google.cloud.spanner_v1.transaction import Transaction +from google.cloud.spanner_v1._opentelemetry_tracing import trace_call import random # pylint: enable=ungrouped-imports @@ -114,7 +115,11 @@ def create(self): kw = {} if self._labels: kw = {"session": {"labels": self._labels}} - session_pb = api.create_session(self._database.name, metadata=metadata, **kw) + + with trace_call("CloudSpanner.CreateSession", self, self._labels): + session_pb = api.create_session( + self._database.name, metadata=metadata, **kw + ) self._session_id = session_pb.name.split("/")[-1] def exists(self): @@ -130,10 +135,14 @@ def exists(self): return False api = self._database.spanner_api metadata = _metadata_with_prefix(self._database.name) - try: - api.get_session(self.name, metadata=metadata) - except NotFound: - return False + + with trace_call("CloudSpanner.GetSession", self) as span: + try: + api.get_session(self.name, metadata=metadata) + span.set_attribute("session_found", True) + except NotFound: + span.set_attribute("session_found", False) + return False return True @@ -150,8 +159,8 @@ def delete(self): raise ValueError("Session ID not set by back-end") api = self._database.spanner_api metadata = _metadata_with_prefix(self._database.name) - - api.delete_session(self.name, metadata=metadata) + with trace_call("CloudSpanner.DeleteSession", self): + api.delete_session(self.name, metadata=metadata) def ping(self): """Ping the session to keep it alive by executing "SELECT 1". diff --git a/google/cloud/spanner_v1/snapshot.py b/google/cloud/spanner_v1/snapshot.py index f7b9f07f8fa..04028537a18 100644 --- a/google/cloud/spanner_v1/snapshot.py +++ b/google/cloud/spanner_v1/snapshot.py @@ -30,9 +30,10 @@ from google.cloud.spanner_v1._helpers import _SessionWrapper from google.cloud.spanner_v1.streamed import StreamedResultSet from google.cloud.spanner_v1.types import PartitionOptions +from google.cloud.spanner_v1._opentelemetry_tracing import trace_call -def _restart_on_unavailable(restart): +def _restart_on_unavailable(restart, trace_name=None, session=None, attributes=None): """Restart iteration after :exc:`.ServiceUnavailable`. :type restart: callable @@ -40,7 +41,11 @@ def _restart_on_unavailable(restart): """ resume_token = b"" item_buffer = [] - iterator = restart() + if trace_name and session: + with trace_call(trace_name, session, attributes): + iterator = restart() + else: + iterator = restart() while True: try: for item in iterator: @@ -50,7 +55,11 @@ def _restart_on_unavailable(restart): break except ServiceUnavailable: del item_buffer[:] - iterator = restart(resume_token=resume_token) + if trace_name and session: + with trace_call(trace_name, session, attributes): + iterator = restart(resume_token=resume_token) + else: + iterator = restart(resume_token=resume_token) continue if len(item_buffer) == 0: @@ -143,7 +152,10 @@ def read(self, table, columns, keyset, index="", limit=0, partition=None): metadata=metadata, ) - iterator = _restart_on_unavailable(restart) + trace_attributes = {"table_id": table, "columns": columns} + iterator = _restart_on_unavailable( + restart, "CloudSpanner.ReadOnlyTransaction", self._session, trace_attributes + ) self._read_request_count += 1 @@ -243,7 +255,13 @@ def execute_sql( timeout=timeout, ) - iterator = _restart_on_unavailable(restart) + trace_attributes = {"db.statement": sql} + iterator = _restart_on_unavailable( + restart, + "CloudSpanner.ReadWriteTransaction", + self._session, + trace_attributes, + ) self._read_request_count += 1 self._execute_sql_count += 1 @@ -309,16 +327,20 @@ def partition_read( partition_size_bytes=partition_size_bytes, max_partitions=max_partitions ) - response = api.partition_read( - session=self._session.name, - table=table, - columns=columns, - key_set=keyset._to_pb(), - transaction=transaction, - index=index, - partition_options=partition_options, - metadata=metadata, - ) + trace_attributes = {"table_id": table, "columns": columns} + with trace_call( + "CloudSpanner.PartitionReadOnlyTransaction", self._session, trace_attributes + ): + response = api.partition_read( + session=self._session.name, + table=table, + columns=columns, + key_set=keyset._to_pb(), + transaction=transaction, + index=index, + partition_options=partition_options, + metadata=metadata, + ) return [partition.partition_token for partition in response.partitions] @@ -385,15 +407,21 @@ def partition_query( partition_size_bytes=partition_size_bytes, max_partitions=max_partitions ) - response = api.partition_query( - session=self._session.name, - sql=sql, - transaction=transaction, - params=params_pb, - param_types=param_types, - partition_options=partition_options, - metadata=metadata, - ) + trace_attributes = {"db.statement": sql} + with trace_call( + "CloudSpanner.PartitionReadWriteTransaction", + self._session, + trace_attributes, + ): + response = api.partition_query( + session=self._session.name, + sql=sql, + transaction=transaction, + params=params_pb, + param_types=param_types, + partition_options=partition_options, + metadata=metadata, + ) return [partition.partition_token for partition in response.partitions] @@ -515,8 +543,9 @@ def begin(self): api = database.spanner_api metadata = _metadata_with_prefix(database.name) txn_selector = self._make_txn_selector() - response = api.begin_transaction( - self._session.name, txn_selector.begin, metadata=metadata - ) + with trace_call("CloudSpanner.BeginTransaction", self._session): + response = api.begin_transaction( + self._session.name, txn_selector.begin, metadata=metadata + ) self._transaction_id = response.id return self._transaction_id diff --git a/google/cloud/spanner_v1/transaction.py b/google/cloud/spanner_v1/transaction.py index 3c1abc73269..bbf676f2cee 100644 --- a/google/cloud/spanner_v1/transaction.py +++ b/google/cloud/spanner_v1/transaction.py @@ -26,6 +26,7 @@ from google.cloud.spanner_v1.proto.transaction_pb2 import TransactionOptions from google.cloud.spanner_v1.snapshot import _SnapshotBase from google.cloud.spanner_v1.batch import _BatchBase +from google.cloud.spanner_v1._opentelemetry_tracing import trace_call class Transaction(_SnapshotBase, _BatchBase): @@ -95,9 +96,10 @@ def begin(self): api = database.spanner_api metadata = _metadata_with_prefix(database.name) txn_options = TransactionOptions(read_write=TransactionOptions.ReadWrite()) - response = api.begin_transaction( - self._session.name, txn_options, metadata=metadata - ) + with trace_call("CloudSpanner.BeginTransaction", self._session): + response = api.begin_transaction( + self._session.name, txn_options, metadata=metadata + ) self._transaction_id = response.id return self._transaction_id @@ -107,7 +109,8 @@ def rollback(self): database = self._session._database api = database.spanner_api metadata = _metadata_with_prefix(database.name) - api.rollback(self._session.name, self._transaction_id, metadata=metadata) + with trace_call("CloudSpanner.Rollback", self._session): + api.rollback(self._session.name, self._transaction_id, metadata=metadata) self.rolled_back = True del self._session._transaction @@ -123,12 +126,14 @@ def commit(self): database = self._session._database api = database.spanner_api metadata = _metadata_with_prefix(database.name) - response = api.commit( - self._session.name, - mutations=self._mutations, - transaction_id=self._transaction_id, - metadata=metadata, - ) + trace_attributes = {"num_mutations": len(self._mutations)} + with trace_call("CloudSpanner.Commit", self._session, trace_attributes): + response = api.commit( + self._session.name, + mutations=self._mutations, + transaction_id=self._transaction_id, + metadata=metadata, + ) self.committed = _pb_timestamp_to_datetime(response.commit_timestamp) del self._session._transaction return self.committed @@ -212,17 +217,21 @@ def execute_update( default_query_options = database._instance._client._query_options query_options = _merge_query_options(default_query_options, query_options) - response = api.execute_sql( - self._session.name, - dml, - transaction=transaction, - params=params_pb, - param_types=param_types, - query_mode=query_mode, - query_options=query_options, - seqno=seqno, - metadata=metadata, - ) + trace_attributes = {"db.statement": dml} + with trace_call( + "CloudSpanner.ReadWriteTransaction", self._session, trace_attributes + ): + response = api.execute_sql( + self._session.name, + dml, + transaction=transaction, + params=params_pb, + param_types=param_types, + query_mode=query_mode, + query_options=query_options, + seqno=seqno, + metadata=metadata, + ) return response.stats.row_count_exact def batch_update(self, statements): @@ -268,13 +277,18 @@ def batch_update(self, statements): self._execute_sql_count + 1, ) - response = api.execute_batch_dml( - session=self._session.name, - transaction=transaction, - statements=parsed, - seqno=seqno, - metadata=metadata, - ) + trace_attributes = { + # Get just the queries from the DML statement batch + "db.statement": [statement[0] for statement in statements] + } + with trace_call("CloudSpanner.DMLTransaction", self._session, trace_attributes): + response = api.execute_batch_dml( + session=self._session.name, + transaction=transaction, + statements=parsed, + seqno=seqno, + metadata=metadata, + ) row_counts = [ result_set.stats.row_count_exact for result_set in response.result_sets ] diff --git a/noxfile.py b/noxfile.py index ee0e4c8b78a..3e49796d401 100644 --- a/noxfile.py +++ b/noxfile.py @@ -66,8 +66,14 @@ def lint_setup_py(session): def default(session): # Install all test dependencies, then install this package in-place. session.install("mock", "pytest", "pytest-cov") - session.install("-e", ".") + # Install opentelemetry dependencies + session.install( + "opentelemetry-api", "opentelemetry-sdk", "opentelemetry-instrumentation" + ) + + session.install("-e", ".") + # Run py.test against the unit tests. session.run( "py.test", @@ -83,13 +89,13 @@ def default(session): ) -@nox.session(python=["2.7", "3.5", "3.6", "3.7", "3.8"]) +@nox.session(python=["3.5", "3.6", "3.7", "3.8"]) def unit(session): """Run the unit test suite.""" default(session) -@nox.session(python=["2.7", "3.7"]) +@nox.session(python="3.7") def system(session): """Run the system test suite.""" system_test_path = os.path.join("tests", "system.py") @@ -115,6 +121,11 @@ def system(session): # virtualenv's dist-packages. session.install("mock", "pytest") + # Install opentelemetry dependencies + session.install( + "opentelemetry-api", "opentelemetry-sdk", "opentelemetry-instrumentation" + ) + session.install("-e", ".") session.install("-e", "test_utils/") diff --git a/tests/_helpers.py b/tests/_helpers.py new file mode 100644 index 00000000000..f0cf298a2b1 --- /dev/null +++ b/tests/_helpers.py @@ -0,0 +1,35 @@ +import unittest +from opentelemetry import trace as trace_api +from opentelemetry.trace.status import StatusCanonicalCode + +from opentelemetry.sdk.trace import TracerProvider, export +from opentelemetry.sdk.trace.export.in_memory_span_exporter import InMemorySpanExporter + + +class OpenTelemetryBase(unittest.TestCase): + def setUp(self): + self.original_tracer_provider = trace_api.get_tracer_provider() + self.tracer_provider = TracerProvider() + self.memory_exporter = InMemorySpanExporter() + span_processor = export.SimpleExportSpanProcessor(self.memory_exporter) + self.tracer_provider.add_span_processor(span_processor) + trace_api.set_tracer_provider(self.tracer_provider) + + def tearDown(self): + trace_api.set_tracer_provider(self.original_tracer_provider) + + def assertNoSpans(self): + span_list = self.memory_exporter.get_finished_spans() + self.assertEqual(len(span_list), 0) + + def assertSpanAttributes( + self, name, status=StatusCanonicalCode.OK, attributes=None, span=None + ): + if not span: + span_list = self.memory_exporter.get_finished_spans() + self.assertEqual(len(span_list), 1) + span = span_list[0] + print(status, attributes, span.status, span.attributes) + self.assertEqual(span.name, name) + self.assertEqual(span.status.canonical_code, status) + self.assertEqual(span.attributes, attributes) diff --git a/tests/system/test_system.py b/tests/system/test_system.py index 9fde7db0c3e..27f55d53085 100644 --- a/tests/system/test_system.py +++ b/tests/system/test_system.py @@ -52,6 +52,7 @@ from test_utils.retry import RetryResult from test_utils.system import unique_resource_id from tests._fixtures import DDL_STATEMENTS +from tests._helpers import OpenTelemetryBase CREATE_INSTANCE = os.getenv("GOOGLE_CLOUD_TESTS_CREATE_SPANNER_INSTANCE") is not None @@ -67,6 +68,12 @@ COUNTERS_TABLE = "counters" COUNTERS_COLUMNS = ("name", "value") +BASE_ATTRIBUTES = { + "db.type": "spanner", + "db.url": "spanner.googleapis.com:443", + "net.host.name": "spanner.googleapis.com:443", +} + _STATUS_CODE_TO_GRPC_STATUS_CODE = { member.value[0]: member for member in grpc.StatusCode } @@ -726,7 +733,7 @@ def test_list_backups(self): NANO_TIME = DatetimeWithNanoseconds(1995, 8, 31, nanosecond=987654321) POS_INF = float("+inf") NEG_INF = float("-inf") -OTHER_NAN, = struct.unpack("