Skip to content

Commit

Permalink
feat: add optional span creation with OpenTelemetry
Browse files Browse the repository at this point in the history
  • Loading branch information
cnnradams committed Jul 2, 2020
1 parent 1e178c5 commit 039ff4f
Show file tree
Hide file tree
Showing 15 changed files with 864 additions and 89 deletions.
1 change: 1 addition & 0 deletions docs/index.rst
Expand Up @@ -23,6 +23,7 @@ API Documentation

api-reference
advanced-session-pool-topics
opentelemetry-tracing

Changelog
---------
Expand Down
36 changes: 36 additions & 0 deletions docs/opentelemetry-tracing.rst
@@ -0,0 +1,36 @@
Tracing with OpenTelemetry
==================================
Python-spanner uses `OpenTelemetry <https://opentelemetry.io/>`_ to automatically generates traces providing insight on calls to Cloud Spanner.
For information on the benefits and utility of tracing, see the `Cloud Trace docs <https://cloud.google.com/trace/docs/overview>`_.

To take advantage of these traces, we first need to install opentelemetry:

.. code-block:: sh
pip install opentelemetry-api opentelemetry-sdk opentelemetry-instrumentation
We also need to tell OpenTelemetry which exporter to use. For example, to export python-spanner traces to `Cloud Tracing <https://cloud.google.com/trace>`_, add the following lines to your application:

.. code:: python
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.trace.sampling import ProbabilitySampler
from opentelemetry.exporter.cloud_trace import CloudTraceSpanExporter
# BatchExportSpanProcessor exports spans to Cloud Trace
# in a seperate thread to not block on the main thread
from opentelemetry.sdk.trace.export import BatchExportSpanProcessor
# create and export one trace every 1000 requests
sampler = ProbabilitySampler(1/1000)
# Uses the default tracer provider
trace.set_tracer_provider(TracerProvider(sampler=sampler))
trace.get_tracer_provider().add_span_processor(
# initialize the cloud tracing exporter
BatchExportSpanProcessor(CloudTraceSpanExporter())
)
Generated spanner traces should now be available on `Cloud Trace <https://console.cloud.google.com/traces>`_.

Tracing is most effective when many libraries are instrumented to provide insight over the entire lifespan of a request.
For a list of libraries that can be instrumented, see the `OpenTelemetry Integrations` section of the `OpenTelemetry Python docs <https://opentelemetry-python.readthedocs.io/en/stable/>`_
65 changes: 65 additions & 0 deletions google/cloud/spanner_v1/_opentelemetry_tracing.py
@@ -0,0 +1,65 @@
# Copyright 2016 Google LLC All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Manages OpenTelemetry trace creation and handling"""

from contextlib import contextmanager

from google.api_core.exceptions import GoogleAPICallError
from google.cloud.spanner_v1.gapic import spanner_client

try:
from opentelemetry import trace
from opentelemetry.trace.status import Status, StatusCanonicalCode
from opentelemetry.instrumentation.utils import http_status_to_canonical_code

HAS_OPENTELEMETRY_INSTALLED = True
except (ImportError, ModuleNotFoundError):
HAS_OPENTELEMETRY_INSTALLED = False


@contextmanager
def trace_call(name, session, extra_attributes=None):
if not HAS_OPENTELEMETRY_INSTALLED:
# empty context manager. users will have to check if the generated value is None or a span
yield None
return

tracer = trace.get_tracer(__name__)

# base attributes that we know for every trace created
attributes = {
"db.type": "spanner",
"db.url": spanner_client.SpannerClient.SERVICE_ADDRESS,
"db.instance": session._database.name,
"net.host.name": spanner_client.SpannerClient.SERVICE_ADDRESS,
}

if extra_attributes:
attributes.update(extra_attributes)

with tracer.start_as_current_span(
name, kind=trace.SpanKind.CLIENT, attributes=attributes
) as span:
try:
yield span
except GoogleAPICallError as error:
if error.code is not None:
span.set_status(Status(http_status_to_canonical_code(error.code)))
elif error.grpc_status_code is not None:
span.set_status(
# OpenTelemetry's StatusCanonicalCode maps 1-1 with grpc status codes
Status(StatusCanonicalCode(error.grpc_status_code.value[0]))
)
raise
15 changes: 9 additions & 6 deletions google/cloud/spanner_v1/batch.py
Expand Up @@ -22,6 +22,7 @@
from google.cloud.spanner_v1._helpers import _SessionWrapper
from google.cloud.spanner_v1._helpers import _make_list_value_pbs
from google.cloud.spanner_v1._helpers import _metadata_with_prefix
from google.cloud.spanner_v1._opentelemetry_tracing import trace_call

# pylint: enable=ungrouped-imports

Expand Down Expand Up @@ -147,12 +148,14 @@ def commit(self):
api = database.spanner_api
metadata = _metadata_with_prefix(database.name)
txn_options = TransactionOptions(read_write=TransactionOptions.ReadWrite())
response = api.commit(
self._session.name,
mutations=self._mutations,
single_use_transaction=txn_options,
metadata=metadata,
)
trace_attributes = {"num_mutations": len(self._mutations)}
with trace_call("CloudSpanner.Commit", self._session, trace_attributes):
response = api.commit(
self._session.name,
mutations=self._mutations,
single_use_transaction=txn_options,
metadata=metadata,
)
self.committed = _pb_timestamp_to_datetime(response.commit_timestamp)
return self.committed

Expand Down
23 changes: 16 additions & 7 deletions google/cloud/spanner_v1/session.py
Expand Up @@ -26,6 +26,7 @@
from google.cloud.spanner_v1.batch import Batch
from google.cloud.spanner_v1.snapshot import Snapshot
from google.cloud.spanner_v1.transaction import Transaction
from google.cloud.spanner_v1._opentelemetry_tracing import trace_call
import random

# pylint: enable=ungrouped-imports
Expand Down Expand Up @@ -114,7 +115,11 @@ def create(self):
kw = {}
if self._labels:
kw = {"session": {"labels": self._labels}}
session_pb = api.create_session(self._database.name, metadata=metadata, **kw)

with trace_call("CloudSpanner.CreateSession", self, self._labels):
session_pb = api.create_session(
self._database.name, metadata=metadata, **kw
)
self._session_id = session_pb.name.split("/")[-1]

def exists(self):
Expand All @@ -130,10 +135,14 @@ def exists(self):
return False
api = self._database.spanner_api
metadata = _metadata_with_prefix(self._database.name)
try:
api.get_session(self.name, metadata=metadata)
except NotFound:
return False

with trace_call("CloudSpanner.GetSession", self) as span:
try:
api.get_session(self.name, metadata=metadata)
span.set_attribute("session_found", True)
except NotFound:
span.set_attribute("session_found", False)
return False

return True

Expand All @@ -150,8 +159,8 @@ def delete(self):
raise ValueError("Session ID not set by back-end")
api = self._database.spanner_api
metadata = _metadata_with_prefix(self._database.name)

api.delete_session(self.name, metadata=metadata)
with trace_call("CloudSpanner.DeleteSession", self):
api.delete_session(self.name, metadata=metadata)

def ping(self):
"""Ping the session to keep it alive by executing "SELECT 1".
Expand Down
83 changes: 56 additions & 27 deletions google/cloud/spanner_v1/snapshot.py
Expand Up @@ -30,17 +30,22 @@
from google.cloud.spanner_v1._helpers import _SessionWrapper
from google.cloud.spanner_v1.streamed import StreamedResultSet
from google.cloud.spanner_v1.types import PartitionOptions
from google.cloud.spanner_v1._opentelemetry_tracing import trace_call


def _restart_on_unavailable(restart):
def _restart_on_unavailable(restart, trace_name=None, session=None, attributes=None):
"""Restart iteration after :exc:`.ServiceUnavailable`.
:type restart: callable
:param restart: curried function returning iterator
"""
resume_token = b""
item_buffer = []
iterator = restart()
if trace_name and session:
with trace_call(trace_name, session, attributes):
iterator = restart()
else:
iterator = restart()
while True:
try:
for item in iterator:
Expand All @@ -50,7 +55,11 @@ def _restart_on_unavailable(restart):
break
except ServiceUnavailable:
del item_buffer[:]
iterator = restart(resume_token=resume_token)
if trace_name and session:
with trace_call(trace_name, session, attributes):
iterator = restart(resume_token=resume_token)
else:
iterator = restart(resume_token=resume_token)
continue

if len(item_buffer) == 0:
Expand Down Expand Up @@ -143,7 +152,10 @@ def read(self, table, columns, keyset, index="", limit=0, partition=None):
metadata=metadata,
)

iterator = _restart_on_unavailable(restart)
trace_attributes = {"table_id": table, "columns": columns}
iterator = _restart_on_unavailable(
restart, "CloudSpanner.ReadOnlyTransaction", self._session, trace_attributes
)

self._read_request_count += 1

Expand Down Expand Up @@ -243,7 +255,13 @@ def execute_sql(
timeout=timeout,
)

iterator = _restart_on_unavailable(restart)
trace_attributes = {"db.statement": sql}
iterator = _restart_on_unavailable(
restart,
"CloudSpanner.ReadWriteTransaction",
self._session,
trace_attributes,
)

self._read_request_count += 1
self._execute_sql_count += 1
Expand Down Expand Up @@ -309,16 +327,20 @@ def partition_read(
partition_size_bytes=partition_size_bytes, max_partitions=max_partitions
)

response = api.partition_read(
session=self._session.name,
table=table,
columns=columns,
key_set=keyset._to_pb(),
transaction=transaction,
index=index,
partition_options=partition_options,
metadata=metadata,
)
trace_attributes = {"table_id": table, "columns": columns}
with trace_call(
"CloudSpanner.PartitionReadOnlyTransaction", self._session, trace_attributes
):
response = api.partition_read(
session=self._session.name,
table=table,
columns=columns,
key_set=keyset._to_pb(),
transaction=transaction,
index=index,
partition_options=partition_options,
metadata=metadata,
)

return [partition.partition_token for partition in response.partitions]

Expand Down Expand Up @@ -385,15 +407,21 @@ def partition_query(
partition_size_bytes=partition_size_bytes, max_partitions=max_partitions
)

response = api.partition_query(
session=self._session.name,
sql=sql,
transaction=transaction,
params=params_pb,
param_types=param_types,
partition_options=partition_options,
metadata=metadata,
)
trace_attributes = {"db.statement": sql}
with trace_call(
"CloudSpanner.PartitionReadWriteTransaction",
self._session,
trace_attributes,
):
response = api.partition_query(
session=self._session.name,
sql=sql,
transaction=transaction,
params=params_pb,
param_types=param_types,
partition_options=partition_options,
metadata=metadata,
)

return [partition.partition_token for partition in response.partitions]

Expand Down Expand Up @@ -515,8 +543,9 @@ def begin(self):
api = database.spanner_api
metadata = _metadata_with_prefix(database.name)
txn_selector = self._make_txn_selector()
response = api.begin_transaction(
self._session.name, txn_selector.begin, metadata=metadata
)
with trace_call("CloudSpanner.BeginTransaction", self._session):
response = api.begin_transaction(
self._session.name, txn_selector.begin, metadata=metadata
)
self._transaction_id = response.id
return self._transaction_id

0 comments on commit 039ff4f

Please sign in to comment.