/
fauxdbi.py
98 lines (75 loc) · 2.74 KB
/
fauxdbi.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
import google.api_core.exceptions
import google.cloud.bigquery.schema
import google.cloud.bigquery.table
import contextlib
import sqlite3
class Connection:
connection = None
def __init__(self, client=None, bqstorage_client=None):
# share a single connection:
if self.connection is None:
self.__class__.connection = sqlite3.connect(":memory:")
self._client = FauxClient(client, self.connection)
def cursor(self):
return Cursor(self.connection)
def commit(self):
pass
def rollback(self):
pass
def close(self):
self.connection.close()
class Cursor:
arraysize = 1
def __init__(self, connection):
self.connection = connection
self.cursor = connection.cursor()
def execute(self, operation, parameters=None):
if parameters:
parameters = {
name: "null" if value is None else repr(value)
for name, value in parameters.items()
}
operation %= parameters
self.cursor.execute(operation, parameters)
self.description = self.cursor.description
self.rowcount = self.cursor.rowcount
def executemany(self, operation, parameters_list):
for parameters in parameters_list:
self.execute(operation, parameters)
def close(self):
self.cursor.close()
def fetchone(self):
return self.cursor.fetchone()
def fetchmany(self, size=None):
self.cursor.fetchmany(size or self.arraysize)
def fetchall(self):
return self.cursor.fetchall()
def setinputsizes(self, sizes):
pass
def setoutputsize(self, size, column=None):
pass
class FauxClient:
def __init__(self, client, connection):
self._client = client
self.project = client.project
self.connection = connection
def get_table(self, table_ref):
table_name = table_ref.table_id
with contextlib.closing(self.connection.cursor()) as cursor:
cursor.execute(
f"select name from sqlite_master"
f" where type='table' and name='{table_name}'"
)
if list(cursor):
cursor.execute("PRAGMA table_info('{table_name}')")
schema = [
google.cloud.bigquery.schema.SchemaField(
name=name,
field_type=type_,
mode="REQUIRED" if notnull else "NULLABLE",
)
for cid, name, type_, notnull, dflt_value, pk in cursor
]
return google.cloud.bigquery.table.Table(table_ref, schema)
else:
raise google.api_core.exceptions.NotFound(table_ref)