forked from googleapis/python-spanner-django
/
connection.py
328 lines (253 loc) · 10.7 KB
/
connection.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
# Copyright 2020 Google LLC
#
# Use of this source code is governed by a BSD-style
# license that can be found in the LICENSE file or at
# https://developers.google.com/open-source/licenses/bsd
"""DB-API Connection for the Google Cloud Spanner."""
import warnings
from google.api_core.gapic_v1.client_info import ClientInfo
from google.cloud import spanner_v1 as spanner
from google.cloud.spanner_dbapi.checksum import _compare_checksums
from google.cloud.spanner_dbapi.checksum import ResultsChecksum
from google.cloud.spanner_dbapi.cursor import Cursor
from google.cloud.spanner_dbapi.exceptions import InterfaceError
from google.cloud.spanner_dbapi.version import DEFAULT_USER_AGENT
from google.cloud.spanner_dbapi.version import PY_VERSION
# Warning text emitted by Connection.commit() and Connection.rollback()
# when they are called while the connection is in autocommit mode.
AUTOCOMMIT_MODE_WARNING = "This method is non-operational in autocommit mode"
class Connection:
    """Representation of a DB-API connection to a Cloud Spanner database.

    You most likely don't need to instantiate `Connection` objects
    directly, use the `connect` module function instead.

    :type instance: :class:`~google.cloud.spanner_v1.instance.Instance`
    :param instance: Cloud Spanner instance to connect to.

    :type database: :class:`~google.cloud.spanner_v1.database.Database`
    :param database: The database to which the connection is linked.
    """

    def __init__(self, instance, database):
        self._instance = instance
        self._database = database
        # DDL statements queued until run_prior_DDL_statements() is called.
        self._ddl_statements = []

        self._transaction = None
        self._session = None
        # SQL statements, which were executed
        # within the current transaction
        self._statements = []

        self.is_closed = False
        self._autocommit = False

    @property
    def autocommit(self):
        """Autocommit mode flag for this connection.

        :rtype: bool
        :returns: Autocommit mode flag value.
        """
        return self._autocommit

    @autocommit.setter
    def autocommit(self, value):
        """Change this connection autocommit mode. Setting this value to True
        while a transaction is active will commit the current transaction.

        :type value: bool
        :param value: New autocommit mode state.
        """
        # Commit *before* flipping the flag: commit() only warns (and does
        # nothing else) once the connection is in autocommit mode.
        if value and not self._autocommit:
            self.commit()

        self._autocommit = value

    @property
    def database(self):
        """Database to which this connection relates.

        :rtype: :class:`~google.cloud.spanner_v1.database.Database`
        :returns: The related database object.
        """
        return self._database

    @property
    def instance(self):
        """Instance to which this connection relates.

        :rtype: :class:`~google.cloud.spanner_v1.instance.Instance`
        :returns: The related instance object.
        """
        return self._instance

    def _has_active_transaction(self):
        """Tell whether a begun, not yet finished transaction exists.

        :rtype: bool
        :returns: True if this connection holds a transaction which is
                  neither committed nor rolled back, False otherwise.
        """
        transaction = self._transaction
        return bool(
            transaction
            and not transaction.committed
            and not transaction.rolled_back
        )

    def _session_checkout(self):
        """Get a Cloud Spanner session from the pool.

        If there is already a session associated with
        this connection, it'll be used instead.

        :rtype: :class:`google.cloud.spanner_v1.session.Session`
        :returns: Cloud Spanner session object ready to use.
        """
        if not self._session:
            self._session = self.database._pool.get()

        return self._session

    def _release_session(self):
        """Release the currently used Spanner session.

        The session will be returned into the sessions pool. Does
        nothing when no session is checked out, so that ``None`` is
        never put into the pool.
        """
        if self._session is None:
            return

        self.database._pool.put(self._session)
        self._session = None

    def retry_transaction(self):
        """Retry the aborted transaction.

        All the statements executed in the transaction
        will be re-executed. Results checksums of the original
        statements and the retried ones will be compared.

        :raises: :class:`google.api_core.exceptions.Aborted`
            If results checksum of the retried statement is
            not equal to the checksum of the original one.
        """
        # Replay a snapshot of the log, so the loop is bounded even if
        # the log itself is modified while statements are re-executed.
        statements = list(self._statements)
        last_index = len(statements) - 1

        for index, statement in enumerate(statements):
            res_iter, retried_checksum = self.run_statement(
                statement, retried=True
            )
            if index != last_index:
                # a completed statement: consume all of its results
                for res in res_iter:
                    retried_checksum.consume_result(res)
            else:
                # the failed statement: stream up to the failed result
                # or to the end of the results. A single iterator is
                # reused, so rows buffered by one step are not lost.
                rows = iter(res_iter)
                while len(retried_checksum) < len(statement["checksum"]):
                    try:
                        res = next(rows)
                    except StopIteration:
                        break
                    retried_checksum.consume_result(res)

            _compare_checksums(statement["checksum"], retried_checksum)

    def transaction_checkout(self):
        """Get a Cloud Spanner transaction.

        Begin a new transaction, if there is no transaction in
        this connection yet. Return the begun one otherwise.

        The method is non operational in autocommit mode.

        :rtype: :class:`google.cloud.spanner_v1.transaction.Transaction`
        :returns: A Cloud Spanner transaction object, ready to use.
                  ``None`` in autocommit mode.
        """
        if not self.autocommit:
            if not self._has_active_transaction():
                self._transaction = self._session_checkout().transaction()
                self._transaction.begin()

            return self._transaction

    def _raise_if_closed(self):
        """Helper to check the connection state before running a query.
        Raises an exception if this connection is closed.

        :raises: :class:`InterfaceError`: if this connection is closed.
        """
        if self.is_closed:
            raise InterfaceError("connection is already closed")

    def close(self):
        """Closes this connection.

        The connection will be unusable from this point forward. If the
        connection has an active transaction, it will be rolled back.
        A session still checked out by this connection is returned
        into the sessions pool.
        """
        if self._has_active_transaction():
            self._transaction.rollback()

        # Mirror rollback(): do not keep the session out of the pool
        # once the connection is not going to use it anymore.
        self._release_session()
        self.is_closed = True

    def commit(self):
        """Commits any pending transaction to the database.

        This method is non-operational in autocommit mode. It is a
        no-op if there is no active (begun and not yet committed or
        rolled back) transaction.
        """
        if self._autocommit:
            warnings.warn(AUTOCOMMIT_MODE_WARNING, UserWarning, stacklevel=2)
        elif self._has_active_transaction():
            self._transaction.commit()
            self._release_session()
            self._statements = []

    def rollback(self):
        """Rolls back any pending transaction.

        This is a no-op if there is no active transaction or if the connection
        is in autocommit mode.
        """
        if self._autocommit:
            warnings.warn(AUTOCOMMIT_MODE_WARNING, UserWarning, stacklevel=2)
        elif self._has_active_transaction():
            self._transaction.rollback()
            self._release_session()
            self._statements = []

    def cursor(self):
        """Factory to create a DB-API Cursor.

        :raises: :class:`InterfaceError`: if this connection is closed.
        """
        self._raise_if_closed()

        return Cursor(self)

    def run_prior_DDL_statements(self):
        """Run the DDL statements queued on this connection, if any.

        The queue is emptied before the statements are sent.

        :returns: The result of the schema update operation, or ``None``
                  if there were no queued statements.

        :raises: :class:`InterfaceError`: if this connection is closed.
        """
        self._raise_if_closed()

        if self._ddl_statements:
            ddl_statements = self._ddl_statements
            self._ddl_statements = []

            return self.database.update_ddl(ddl_statements).result()

    def run_statement(self, statement, retried=False):
        """Run single SQL statement in begun transaction.

        This method is never used in autocommit mode. In
        !autocommit mode however it remembers every executed
        SQL statement with its parameters.

        :type statement: :class:`dict`
        :param statement: SQL statement to execute. The keys ``sql``,
                          ``params``, ``param_types`` and ``checksum``
                          are read.

        :type retried: bool
        :param retried: (Optional) True when the statement is re-executed
                        as a part of a transaction retry. Such a statement
                        is already remembered, so it is not recorded again,
                        and a fresh checksum is returned for it.

        :rtype: :class:`google.cloud.spanner_v1.streamed.StreamedResultSet`,
                :class:`google.cloud.spanner_dbapi.checksum.ResultsChecksum`
        :returns: Streamed result set of the statement and a
                  checksum of this statement results.
        """
        transaction = self.transaction_checkout()
        # Recording a replayed statement again would make the log grow
        # while retry_transaction() is walking through it.
        if not retried:
            self._statements.append(statement)

        return (
            transaction.execute_sql(
                statement["sql"],
                statement["params"],
                param_types=statement["param_types"],
            ),
            ResultsChecksum() if retried else statement["checksum"],
        )

    def __enter__(self):
        return self

    def __exit__(self, etype, value, traceback):
        # NOTE(review): the pending transaction is committed even when the
        # `with` body raised; kept as is, callers may rely on it.
        try:
            self.commit()
        finally:
            # close the connection even if the commit failed
            self.close()
def connect(
    instance_id,
    database_id,
    project=None,
    credentials=None,
    pool=None,
    user_agent=None,
):
    """Open a DB-API connection to a Google Cloud Spanner database.

    :type instance_id: str
    :param instance_id: The ID of the instance to connect to.

    :type database_id: str
    :param database_id: The ID of the database to connect to.

    :type project: str
    :param project: (Optional) The ID of the project which owns the
                    instances, tables and data. If not provided, will
                    attempt to determine from the environment.

    :type credentials: :class:`~google.auth.credentials.Credentials`
    :param credentials: (Optional) The authorization credentials to attach to
                        requests. These credentials identify this application
                        to the service. If none are specified, the client will
                        attempt to ascertain the credentials from the
                        environment.

    :type pool: Concrete subclass of
                :class:`~google.cloud.spanner_v1.pool.AbstractSessionPool`.
    :param pool: (Optional). Session pool to be used by database.

    :type user_agent: str
    :param user_agent: (Optional) User agent to be used with this connection's
                       requests. A falsy value selects the package default.

    :rtype: :class:`google.cloud.spanner_dbapi.connection.Connection`
    :returns: Connection object associated with the given Google Cloud Spanner
              resource.

    :raises: :class:`ValueError` in case of given instance/database
             doesn't exist.
    """
    agent = user_agent if user_agent else DEFAULT_USER_AGENT
    info = ClientInfo(user_agent=agent, python_version=PY_VERSION)
    spanner_client = spanner.Client(
        project=project, credentials=credentials, client_info=info
    )

    # Both lookups are validated up front, so a wrong ID fails here
    # rather than on the first query.
    target_instance = spanner_client.instance(instance_id)
    if not target_instance.exists():
        raise ValueError("instance '%s' does not exist." % instance_id)

    target_database = target_instance.database(database_id, pool=pool)
    if not target_database.exists():
        raise ValueError("database '%s' does not exist." % database_id)

    return Connection(target_instance, target_database)