forked from googleapis/python-spanner-django
/
connection.py
262 lines (203 loc) · 7.91 KB
/
connection.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
# Copyright 2020 Google LLC
#
# Use of this source code is governed by a BSD-style
# license that can be found in the LICENSE file or at
# https://developers.google.com/open-source/licenses/bsd
"""Cloud Spanner DB connection object."""
from collections import namedtuple
import warnings
from google.cloud import spanner_v1
from .cursor import Cursor
from .exceptions import InterfaceError
AUTOCOMMIT_MODE_WARNING = "This method is non-operational in autocommit mode"
ColumnDetails = namedtuple("column_details", ["null_ok", "spanner_type"])
class Connection:
"""Representation of a connection to a Cloud Spanner database.
You most likely don't need to instantiate `Connection` objects
directly, use the `connect` module function instead.
:type instance: :class:`~google.cloud.spanner_v1.instance.Instance`
:param instance: Cloud Spanner instance to connect to.
:type database: :class:`~google.cloud.spanner_v1.database.Database`
:param database: Cloud Spanner database to connect to.
"""
def __init__(self, instance, database):
self._instance = instance
self._database = database
self._ddl_statements = []
self._transaction = None
self._session = None
self.is_closed = False
self._autocommit = False
@property
def autocommit(self):
"""Autocommit mode flag for this connection.
:rtype: bool
:returns: Autocommit mode flag value.
"""
return self._autocommit
@autocommit.setter
def autocommit(self, value):
"""Change this connection autocommit mode.
:type value: bool
:param value: New autocommit mode state.
"""
if value and not self._autocommit:
self.commit()
self._autocommit = value
@property
def database(self):
"""Database to which this connection relates.
:rtype: :class:`~google.cloud.spanner_v1.database.Database`
:returns: The related database object.
"""
return self._database
@property
def instance(self):
"""Instance to which this connection relates.
:rtype: :class:`~google.cloud.spanner_v1.instance.Instance`
:returns: The related instance object.
"""
return self._instance
def _session_checkout(self):
"""Get a Cloud Spanner session from the pool.
If there is already a session associated with
this connection, it'll be used instead.
:rtype: :class:`google.cloud.spanner_v1.session.Session`
:returns: Cloud Spanner session object ready to use.
"""
if not self._session:
self._session = self.database._pool.get()
return self._session
def _release_session(self):
"""Release the currently used Spanner session.
The session will be returned into the sessions pool.
"""
self.database._pool.put(self._session)
self._session = None
def transaction_checkout(self):
"""Get a Cloud Spanner transaction.
Begin a new transaction, if there is no transaction in
this connection yet. Return the begun one otherwise.
The method is non operational in autocommit mode.
:rtype: :class:`google.cloud.spanner_v1.transaction.Transaction`
:returns: A Cloud Spanner transaction object, ready to use.
"""
if not self.autocommit:
if (
not self._transaction
or self._transaction.committed
or self._transaction.rolled_back
):
self._transaction = self._session_checkout().transaction()
self._transaction.begin()
return self._transaction
def cursor(self):
self._raise_if_closed()
return Cursor(self)
def _raise_if_closed(self):
"""Raise an exception if this connection is closed.
Helper to check the connection state before
running a SQL/DDL/DML query.
:raises: :class:`InterfaceError` if this connection is closed.
"""
if self.is_closed:
raise InterfaceError("connection is already closed")
def __handle_update_ddl(self, ddl_statements):
"""
Run the list of Data Definition Language (DDL) statements on the underlying
database. Each DDL statement MUST NOT contain a semicolon.
Args:
ddl_statements: a list of DDL statements, each without a semicolon.
Returns:
google.api_core.operation.Operation.result()
"""
self._raise_if_closed()
# Synchronously wait on the operation's completion.
return self.database.update_ddl(ddl_statements).result()
def read_snapshot(self):
self._raise_if_closed()
return self.database.snapshot()
def in_transaction(self, fn, *args, **kwargs):
self._raise_if_closed()
return self.database.run_in_transaction(fn, *args, **kwargs)
def append_ddl_statement(self, ddl_statement):
self._raise_if_closed()
self._ddl_statements.append(ddl_statement)
def run_prior_DDL_statements(self):
self._raise_if_closed()
if not self._ddl_statements:
return
ddl_statements = self._ddl_statements
self._ddl_statements = []
return self.__handle_update_ddl(ddl_statements)
def list_tables(self):
return self.run_sql_in_snapshot(
"""
SELECT
t.table_name
FROM
information_schema.tables AS t
WHERE
t.table_catalog = '' and t.table_schema = ''
"""
)
def run_sql_in_snapshot(self, sql, params=None, param_types=None):
# Some SQL e.g. for INFORMATION_SCHEMA cannot be run in read-write transactions
# hence this method exists to circumvent that limit.
self.run_prior_DDL_statements()
with self.database.snapshot() as snapshot:
res = snapshot.execute_sql(
sql, params=params, param_types=param_types
)
return list(res)
def get_table_column_schema(self, table_name):
rows = self.run_sql_in_snapshot(
"""SELECT
COLUMN_NAME, IS_NULLABLE, SPANNER_TYPE
FROM
INFORMATION_SCHEMA.COLUMNS
WHERE
TABLE_SCHEMA = ''
AND
TABLE_NAME = @table_name""",
params={"table_name": table_name},
param_types={"table_name": spanner_v1.param_types.STRING},
)
column_details = {}
for column_name, is_nullable, spanner_type in rows:
column_details[column_name] = ColumnDetails(
null_ok=is_nullable == "YES", spanner_type=spanner_type
)
return column_details
def close(self):
"""Close this connection.
The connection will be unusable from this point forward. If the
connection has an active transaction, it will be rolled back.
"""
if (
self._transaction
and not self._transaction.committed
and not self._transaction.rolled_back
):
self._transaction.rollback()
self.database._pool.clear()
self.is_closed = True
def commit(self):
"""Commit all the pending transactions."""
if self.autocommit:
warnings.warn(AUTOCOMMIT_MODE_WARNING, UserWarning, stacklevel=2)
elif self._transaction:
self._transaction.commit()
self._release_session()
def rollback(self):
"""Rollback all the pending transactions."""
if self.autocommit:
warnings.warn(AUTOCOMMIT_MODE_WARNING, UserWarning, stacklevel=2)
elif self._transaction:
self._transaction.rollback()
self._release_session()
def __enter__(self):
return self
def __exit__(self, etype, value, traceback):
self.commit()
self.close()