/
test__opentelemetry_tracing.py
148 lines (114 loc) · 5.46 KB
/
test__opentelemetry_tracing.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
import importlib
import mock
import unittest
import sys
try:
from opentelemetry import trace as trace_api
from opentelemetry.trace.status import StatusCode
except ImportError:
pass
from google.api_core.exceptions import GoogleAPICallError
from google.cloud.spanner_v1 import _opentelemetry_tracing
from tests._helpers import OpenTelemetryBase, HAS_OPENTELEMETRY_INSTALLED
def _make_rpc_error(error_cls, trailing_metadata=None):
    """Build an ``error_cls`` instance that wraps a mocked ``grpc.Call``.

    Args:
        error_cls: Exception class to instantiate; it is called as
            ``error_cls("error", errors=(<mocked call>,))``.
        trailing_metadata: Value the mocked call's ``trailing_metadata()``
            returns. Defaults to ``None``.

    Returns:
        The newly constructed ``error_cls`` instance.
    """
    # Imported lazily so that merely importing this module does not need grpc.
    import grpc

    mocked_call = mock.create_autospec(grpc.Call, instance=True)
    mocked_call.trailing_metadata.return_value = trailing_metadata
    wrapped_errors = (mocked_call,)
    return error_cls("error", errors=wrapped_errors)
def _make_session():
    """Return a ``mock.Mock`` used as a stand-in for a Spanner ``Session``.

    NOTE(review): ``mock.Mock`` treats unrecognised keyword arguments as
    attributes to set on the mock, so ``autospec`` and ``instance`` below end
    up as plain attributes rather than enabling autospeccing the way
    ``mock.create_autospec`` would. The resulting object therefore accepts
    any attribute access -- confirm whether that is intended before changing.
    """
    from google.cloud.spanner_v1.session import Session

    mock_kwargs = {"autospec": Session, "instance": True}
    return mock.Mock(**mock_kwargs)
# Skip all of these tests if we don't have OpenTelemetry
if HAS_OPENTELEMETRY_INSTALLED:

    class TestNoTracing(unittest.TestCase):
        """Exercise ``trace_call`` while ``import opentelemetry`` is forced to fail."""

        def setUp(self):
            self._temp_opentelemetry = sys.modules["opentelemetry"]
            # Register the restore step *before* touching ``sys.modules``:
            # cleanups still run when ``setUp`` itself raises, whereas
            # ``tearDown`` would be skipped and the ``None`` entry would leak
            # into every later test.
            self.addCleanup(self._restore_opentelemetry)
            # A ``None`` entry in ``sys.modules`` makes importing that name
            # raise ImportError, so the reload below re-executes the tracing
            # module as if the package were not importable.
            sys.modules["opentelemetry"] = None
            importlib.reload(_opentelemetry_tracing)

        def _restore_opentelemetry(self):
            """Put the real ``opentelemetry`` module back and reload the tracing module."""
            sys.modules["opentelemetry"] = self._temp_opentelemetry
            importlib.reload(_opentelemetry_tracing)

        def test_no_trace_call(self):
            """``trace_call`` yields ``None`` when ``opentelemetry`` cannot be imported."""
            with _opentelemetry_tracing.trace_call("Test", _make_session()) as no_span:
                self.assertIsNone(no_span)

    class TestTracing(OpenTelemetryBase):
        """Check the spans recorded by ``trace_call`` on success and on errors."""

        def _get_single_finished_span(self):
            """Assert that exactly one span was exported and return it."""
            span_list = self.ot_exporter.get_finished_spans()
            self.assertEqual(len(span_list), 1)
            return span_list[0]

        def test_trace_call(self):
            """A successful call records name, kind, attributes and an OK status."""
            extra_attributes = {
                "attribute1": "value1",
                # Since our database is mocked, we have to override the db.instance parameter so it is a string
                "db.instance": "database_name",
            }

            expected_attributes = {
                "db.type": "spanner",
                "db.url": "spanner.googleapis.com",
                "net.host.name": "spanner.googleapis.com",
            }
            expected_attributes.update(extra_attributes)

            with _opentelemetry_tracing.trace_call(
                "CloudSpanner.Test", _make_session(), extra_attributes
            ) as span:
                # Attributes set inside the context must also be exported.
                span.set_attribute("after_setup_attribute", 1)

            expected_attributes["after_setup_attribute"] = 1

            span = self._get_single_finished_span()
            self.assertEqual(span.kind, trace_api.SpanKind.CLIENT)
            # Compare as a plain dict, consistently with test_trace_error.
            self.assertEqual(dict(span.attributes), expected_attributes)
            self.assertEqual(span.name, "CloudSpanner.Test")
            self.assertEqual(span.status.status_code, StatusCode.OK)

        def test_trace_error(self):
            """An API error wrapping a mocked grpc call is re-raised and marks the span ERROR."""
            from google.api_core.exceptions import InvalidArgument

            extra_attributes = {"db.instance": "database_name"}

            expected_attributes = {
                "db.type": "spanner",
                "db.url": "spanner.googleapis.com",
                "net.host.name": "spanner.googleapis.com",
            }
            expected_attributes.update(extra_attributes)

            with self.assertRaises(GoogleAPICallError):
                with _opentelemetry_tracing.trace_call(
                    "CloudSpanner.Test", _make_session(), extra_attributes
                ):
                    raise _make_rpc_error(InvalidArgument)

            span = self._get_single_finished_span()
            self.assertEqual(span.kind, trace_api.SpanKind.CLIENT)
            self.assertEqual(dict(span.attributes), expected_attributes)
            self.assertEqual(span.name, "CloudSpanner.Test")
            self.assertEqual(span.status.status_code, StatusCode.ERROR)

        def test_trace_grpc_error(self):
            """A ``DataLoss`` raised inside the context is re-raised and marks the span ERROR."""
            from google.api_core.exceptions import DataLoss

            extra_attributes = {"db.instance": "database_name"}

            with self.assertRaises(GoogleAPICallError):
                with _opentelemetry_tracing.trace_call(
                    "CloudSpanner.Test", _make_session(), extra_attributes
                ):
                    raise DataLoss("error")

            # Only the status is checked here; span attributes are verified
            # in test_trace_call and test_trace_error.
            span = self._get_single_finished_span()
            self.assertEqual(span.status.status_code, StatusCode.ERROR)

        def test_trace_codeless_error(self):
            """A bare ``GoogleAPICallError`` is re-raised and marks the span ERROR."""
            extra_attributes = {"db.instance": "database_name"}

            with self.assertRaises(GoogleAPICallError):
                with _opentelemetry_tracing.trace_call(
                    "CloudSpanner.Test", _make_session(), extra_attributes
                ):
                    raise GoogleAPICallError("error")

            # Only the status is checked here; span attributes are verified
            # in test_trace_call and test_trace_error.
            span = self._get_single_finished_span()
            self.assertEqual(span.status.status_code, StatusCode.ERROR)