Skip to content

Commit

Permalink
fix: update to support the open-telemetry status code spec change (#358)
Browse files Browse the repository at this point in the history
* fix: for opentelemetry status code spec change

* fix: corrected open telemetry tests to work with latest open telemetry specs

* fix: correct open telemetry tests status code

* fix: open telemetry schema related changes and fixes for tests to work with in memory exporter

* fix: variable name correction for ot_exporter

* fix: correct variable name from memeory_exporter to ot_exporter

* fix: remove patch for opentelemetry.util.time with _constant_time as it was not used

* refactor: correct opentelemetry.util.time to opentelemetry.util._time

* ci: update packages for open telemetry

* refactor: increased version of open telemetry as per new specs

* fix: changed opentelemetry dependency version

* updated constraints file with opentelemetry-instrumentation >= 0.20b0

* fix: added ot_exporter clear call after reload to clear out the exporter memeory

* fix: removed repeated constraints for different versions of python
  • Loading branch information
vi3k6i5 committed Jun 11, 2021
1 parent 6598dea commit 0f894f1
Show file tree
Hide file tree
Showing 10 changed files with 94 additions and 71 deletions.
13 changes: 4 additions & 9 deletions google/cloud/spanner_v1/_opentelemetry_tracing.py
Expand Up @@ -21,8 +21,7 @@

try:
from opentelemetry import trace
from opentelemetry.trace.status import Status, StatusCanonicalCode
from opentelemetry.instrumentation.utils import http_status_to_canonical_code
from opentelemetry.trace.status import Status, StatusCode

HAS_OPENTELEMETRY_INSTALLED = True
except ImportError:
Expand Down Expand Up @@ -53,13 +52,9 @@ def trace_call(name, session, extra_attributes=None):
name, kind=trace.SpanKind.CLIENT, attributes=attributes
) as span:
try:
span.set_status(Status(StatusCode.OK))
yield span
except GoogleAPICallError as error:
if error.code is not None:
span.set_status(Status(http_status_to_canonical_code(error.code)))
elif error.grpc_status_code is not None:
span.set_status(
# OpenTelemetry's StatusCanonicalCode maps 1-1 with grpc status codes
Status(StatusCanonicalCode(error.grpc_status_code.value[0]))
)
span.set_status(Status(StatusCode.ERROR))
span.record_exception(error)
raise
6 changes: 3 additions & 3 deletions setup.py
Expand Up @@ -37,9 +37,9 @@
]
extras = {
"tracing": [
"opentelemetry-api >= 0.11b0",
"opentelemetry-sdk >= 0.11b0",
"opentelemetry-instrumentation >= 0.11b0",
"opentelemetry-api >= 1.1.0",
"opentelemetry-sdk >= 1.1.0",
"opentelemetry-instrumentation >= 0.20b0",
],
"libcst": "libcst >= 0.2.5",
}
Expand Down
6 changes: 3 additions & 3 deletions testing/constraints-3.6.txt
Expand Up @@ -11,6 +11,6 @@ grpc-google-iam-v1==0.12.3
libcst==0.2.5
proto-plus==1.13.0
sqlparse==0.3.0
opentelemetry-api==0.11b0
opentelemetry-sdk==0.11b0
opentelemetry-instrumentation==0.11b0
opentelemetry-api==1.1.0
opentelemetry-sdk==1.1.0
opentelemetry-instrumentation==0.20b0
57 changes: 40 additions & 17 deletions tests/_helpers.py
Expand Up @@ -2,49 +2,72 @@
import mock

try:
from opentelemetry import trace as trace_api
from opentelemetry.trace.status import StatusCanonicalCode

from opentelemetry.sdk.trace import TracerProvider, export
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import SimpleSpanProcessor
from opentelemetry.sdk.trace.export.in_memory_span_exporter import (
InMemorySpanExporter,
)
from opentelemetry.trace.status import StatusCode

trace.set_tracer_provider(TracerProvider())

HAS_OPENTELEMETRY_INSTALLED = True
except ImportError:
HAS_OPENTELEMETRY_INSTALLED = False

StatusCanonicalCode = mock.Mock()
StatusCode = mock.Mock()

_TEST_OT_EXPORTER = None
_TEST_OT_PROVIDER_INITIALIZED = False


def get_test_ot_exporter():
global _TEST_OT_EXPORTER

if _TEST_OT_EXPORTER is None:
_TEST_OT_EXPORTER = InMemorySpanExporter()
return _TEST_OT_EXPORTER


def use_test_ot_exporter():
global _TEST_OT_PROVIDER_INITIALIZED

if _TEST_OT_PROVIDER_INITIALIZED:
return

provider = trace.get_tracer_provider()
if not hasattr(provider, "add_span_processor"):
return
provider.add_span_processor(SimpleSpanProcessor(get_test_ot_exporter()))
_TEST_OT_PROVIDER_INITIALIZED = True


class OpenTelemetryBase(unittest.TestCase):
def setUp(self):
@classmethod
def setUpClass(cls):
if HAS_OPENTELEMETRY_INSTALLED:
self.original_tracer_provider = trace_api.get_tracer_provider()
self.tracer_provider = TracerProvider()
self.memory_exporter = InMemorySpanExporter()
span_processor = export.SimpleExportSpanProcessor(self.memory_exporter)
self.tracer_provider.add_span_processor(span_processor)
trace_api.set_tracer_provider(self.tracer_provider)
use_test_ot_exporter()
cls.ot_exporter = get_test_ot_exporter()

def tearDown(self):
if HAS_OPENTELEMETRY_INSTALLED:
trace_api.set_tracer_provider(self.original_tracer_provider)
self.ot_exporter.clear()

def assertNoSpans(self):
if HAS_OPENTELEMETRY_INSTALLED:
span_list = self.memory_exporter.get_finished_spans()
span_list = self.ot_exporter.get_finished_spans()
self.assertEqual(len(span_list), 0)

def assertSpanAttributes(
self, name, status=StatusCanonicalCode.OK, attributes=None, span=None
self, name, status=StatusCode.OK, attributes=None, span=None
):
if HAS_OPENTELEMETRY_INSTALLED:
if not span:
span_list = self.memory_exporter.get_finished_spans()
span_list = self.ot_exporter.get_finished_spans()
self.assertEqual(len(span_list), 1)
span = span_list[0]

self.assertEqual(span.name, name)
self.assertEqual(span.status.canonical_code, status)
self.assertEqual(span.status.status_code, status)
self.assertEqual(dict(span.attributes), attributes)
13 changes: 10 additions & 3 deletions tests/system/test_system.py
Expand Up @@ -1165,6 +1165,8 @@ class TestSessionAPI(OpenTelemetryBase, _TestData):

@classmethod
def setUpClass(cls):
# Call SetUpClass from parent (OpenTelemetryBase)
super(TestSessionAPI, cls).setUpClass()
pool = BurstyPool(labels={"testcase": "session_api"})
ddl_statements = EMULATOR_DDL_STATEMENTS if USE_EMULATOR else DDL_STATEMENTS
cls._db = Config.INSTANCE.database(
Expand All @@ -1187,6 +1189,8 @@ def tearDown(self):
super(TestSessionAPI, self).tearDown()
for doomed in self.to_delete:
doomed.delete()
if HAS_OPENTELEMETRY_INSTALLED:
self.ot_exporter.clear() # Clear any ot spans from above step.

def test_session_crud(self):
retry_true = RetryResult(operator.truth)
Expand All @@ -1211,7 +1215,7 @@ def test_batch_insert_then_read(self):
self._check_rows_data(rows)

if HAS_OPENTELEMETRY_INSTALLED:
span_list = self.memory_exporter.get_finished_spans()
span_list = self.ot_exporter.get_finished_spans()
self.assertEqual(len(span_list), 4)
self.assertSpanAttributes(
"CloudSpanner.GetSession",
Expand Down Expand Up @@ -1355,7 +1359,7 @@ def test_transaction_read_and_insert_then_rollback(self):
self.assertEqual(rows, [])

if HAS_OPENTELEMETRY_INSTALLED:
span_list = self.memory_exporter.get_finished_spans()
span_list = self.ot_exporter.get_finished_spans()
self.assertEqual(len(span_list), 8)
self.assertSpanAttributes(
"CloudSpanner.CreateSession",
Expand Down Expand Up @@ -1736,6 +1740,9 @@ def test_transaction_batch_update_w_parent_span(self):
retry = RetryInstanceState(_has_all_ddl)
retry(self._db.reload)()

if HAS_OPENTELEMETRY_INSTALLED:
self.ot_exporter.clear() # Clear any ot spans from above steps.

session = self._db.session()
session.create()
self.to_delete.append(session)
Expand Down Expand Up @@ -1768,7 +1775,7 @@ def unit_of_work(transaction, self):
with tracer.start_as_current_span("Test Span"):
session.run_in_transaction(unit_of_work, self)

span_list = self.memory_exporter.get_finished_spans()
span_list = self.ot_exporter.get_finished_spans()
self.assertEqual(len(span_list), 6)
self.assertEqual(
list(map(lambda span: span.name, span_list)),
Expand Down
20 changes: 9 additions & 11 deletions tests/unit/test__opentelemetry_tracing.py
Expand Up @@ -5,7 +5,7 @@

try:
from opentelemetry import trace as trace_api
from opentelemetry.trace.status import StatusCanonicalCode
from opentelemetry.trace.status import StatusCode
except ImportError:
pass

Expand Down Expand Up @@ -69,13 +69,13 @@ def test_trace_call(self):

expected_attributes["after_setup_attribute"] = 1

span_list = self.memory_exporter.get_finished_spans()
span_list = self.ot_exporter.get_finished_spans()
self.assertEqual(len(span_list), 1)
span = span_list[0]
self.assertEqual(span.kind, trace_api.SpanKind.CLIENT)
self.assertEqual(span.attributes, expected_attributes)
self.assertEqual(span.name, "CloudSpanner.Test")
self.assertEqual(span.status.canonical_code, StatusCanonicalCode.OK)
self.assertEqual(span.status.status_code, StatusCode.OK)

def test_trace_error(self):
extra_attributes = {"db.instance": "database_name"}
Expand All @@ -95,15 +95,13 @@ def test_trace_error(self):

raise _make_rpc_error(InvalidArgument)

span_list = self.memory_exporter.get_finished_spans()
span_list = self.ot_exporter.get_finished_spans()
self.assertEqual(len(span_list), 1)
span = span_list[0]
self.assertEqual(span.kind, trace_api.SpanKind.CLIENT)
self.assertEqual(dict(span.attributes), expected_attributes)
self.assertEqual(span.name, "CloudSpanner.Test")
self.assertEqual(
span.status.canonical_code, StatusCanonicalCode.INVALID_ARGUMENT
)
self.assertEqual(span.status.status_code, StatusCode.ERROR)

def test_trace_grpc_error(self):
extra_attributes = {"db.instance": "database_name"}
Expand All @@ -123,10 +121,10 @@ def test_trace_grpc_error(self):

raise DataLoss("error")

span_list = self.memory_exporter.get_finished_spans()
span_list = self.ot_exporter.get_finished_spans()
self.assertEqual(len(span_list), 1)
span = span_list[0]
self.assertEqual(span.status.canonical_code, StatusCanonicalCode.DATA_LOSS)
self.assertEqual(span.status.status_code, StatusCode.ERROR)

def test_trace_codeless_error(self):
extra_attributes = {"db.instance": "database_name"}
Expand All @@ -144,7 +142,7 @@ def test_trace_codeless_error(self):
) as span:
raise GoogleAPICallError("error")

span_list = self.memory_exporter.get_finished_spans()
span_list = self.ot_exporter.get_finished_spans()
self.assertEqual(len(span_list), 1)
span = span_list[0]
self.assertEqual(span.status.canonical_code, StatusCanonicalCode.UNKNOWN)
self.assertEqual(span.status.status_code, StatusCode.ERROR)
4 changes: 2 additions & 2 deletions tests/unit/test_batch.py
Expand Up @@ -14,7 +14,7 @@


import unittest
from tests._helpers import OpenTelemetryBase, StatusCanonicalCode
from tests._helpers import OpenTelemetryBase, StatusCode

TABLE_NAME = "citizens"
COLUMNS = ["email", "first_name", "last_name", "age"]
Expand Down Expand Up @@ -207,7 +207,7 @@ def test_commit_grpc_error(self):

self.assertSpanAttributes(
"CloudSpanner.Commit",
status=StatusCanonicalCode.UNKNOWN,
status=StatusCode.ERROR,
attributes=dict(BASE_ATTRIBUTES, num_mutations=1),
)

Expand Down
14 changes: 7 additions & 7 deletions tests/unit/test_session.py
Expand Up @@ -17,7 +17,7 @@
import mock
from tests._helpers import (
OpenTelemetryBase,
StatusCanonicalCode,
StatusCode,
HAS_OPENTELEMETRY_INSTALLED,
)

Expand Down Expand Up @@ -192,7 +192,7 @@ def test_create_error(self):

self.assertSpanAttributes(
"CloudSpanner.CreateSession",
status=StatusCanonicalCode.UNKNOWN,
status=StatusCode.ERROR,
attributes=TestSession.BASE_ATTRIBUTES,
)

Expand Down Expand Up @@ -311,7 +311,7 @@ def test_exists_error(self):

self.assertSpanAttributes(
"CloudSpanner.GetSession",
status=StatusCanonicalCode.UNKNOWN,
status=StatusCode.ERROR,
attributes=TestSession.BASE_ATTRIBUTES,
)

Expand Down Expand Up @@ -427,7 +427,7 @@ def test_delete_miss(self):

self.assertSpanAttributes(
"CloudSpanner.DeleteSession",
status=StatusCanonicalCode.NOT_FOUND,
status=StatusCode.ERROR,
attributes=TestSession.BASE_ATTRIBUTES,
)

Expand All @@ -451,7 +451,7 @@ def test_delete_error(self):

self.assertSpanAttributes(
"CloudSpanner.DeleteSession",
status=StatusCanonicalCode.UNKNOWN,
status=StatusCode.ERROR,
attributes=TestSession.BASE_ATTRIBUTES,
)

Expand Down Expand Up @@ -1190,7 +1190,7 @@ def _time(_results=[1, 1.5]):

with mock.patch("time.time", _time):
if HAS_OPENTELEMETRY_INSTALLED:
with mock.patch("opentelemetry.util.time", _ConstantTime()):
with mock.patch("opentelemetry.util._time", _ConstantTime()):
with mock.patch("time.sleep") as sleep_mock:
with self.assertRaises(Aborted):
session.run_in_transaction(
Expand Down Expand Up @@ -1263,7 +1263,7 @@ def _time(_results=[1, 2, 4, 8]):

with mock.patch("time.time", _time):
if HAS_OPENTELEMETRY_INSTALLED:
with mock.patch("opentelemetry.util.time", _ConstantTime()):
with mock.patch("opentelemetry.util._time", _ConstantTime()):
with mock.patch("time.sleep") as sleep_mock:
with self.assertRaises(Aborted):
session.run_in_transaction(unit_of_work, timeout_secs=8)
Expand Down

0 comments on commit 0f894f1

Please sign in to comment.