Skip to content

Commit

Permalink
Merge pull request #1318 from opendatacube/transaction_api
Browse files Browse the repository at this point in the history
Database Transaction API - as per EP07.
  • Loading branch information
SpacemanPaul committed Sep 29, 2022
2 parents 5f1af9b + 6701dad commit 324481e
Show file tree
Hide file tree
Showing 32 changed files with 849 additions and 315 deletions.
3 changes: 2 additions & 1 deletion datacube/drivers/postgis/__init__.py
Expand Up @@ -9,5 +9,6 @@
"""

from ._connections import PostGisDb
from ._api import PostgisDbAPI

__all__ = ['PostGisDb']
__all__ = ['PostGisDb', 'PostgisDbAPI']
6 changes: 6 additions & 0 deletions datacube/drivers/postgis/_api.py
Expand Up @@ -182,6 +182,12 @@ def __init__(self, parentdb, connection):
def in_transaction(self):
return self._connection.in_transaction()

def begin(self):
self._connection.execute(text('BEGIN'))

def commit(self):
self._connection.execute(text('COMMIT'))

def rollback(self):
self._connection.execute(text('ROLLBACK'))

Expand Down
29 changes: 3 additions & 26 deletions datacube/drivers/postgis/_connections.py
Expand Up @@ -19,7 +19,7 @@
from contextlib import contextmanager
from typing import Any, Callable, Iterable, Mapping, Optional, Union, Type

from sqlalchemy import event, create_engine, text
from sqlalchemy import event, create_engine
from sqlalchemy.engine import Engine
from sqlalchemy.engine.url import URL as EngineUrl # noqa: N811

Expand Down Expand Up @@ -245,7 +245,7 @@ def spatial_indexes(self, refresh=False) -> Iterable[CRS]:
return list(self.spindexes.keys())

@contextmanager
def connect(self):
def _connect(self):
"""
Borrow a connection from the pool.
Expand All @@ -258,35 +258,12 @@ def connect(self):
The connection can raise errors if not following this advice ("server closed the connection unexpectedly"),
as some servers will aggressively close idle connections (eg. DEA's NCI servers). It also prevents the
connection from being reused while borrowed.
"""
with self._engine.connect() as connection:
yield _api.PostgisDbAPI(self, connection)
connection.close()

@contextmanager
def begin(self):
"""
Start a transaction.
Returns an instance that will maintain a single connection in a transaction.
Call commit() or rollback() to complete the transaction or use a context manager:
with db.begin() as trans:
trans.insert_dataset(...)
(Don't share an instance between threads)
:rtype: PostgresDBAPI
Low level context manager, use <index_resource>._db_connection instead
"""
with self._engine.connect() as connection:
connection.execute(text('BEGIN'))
try:
yield _api.PostgisDbAPI(self, connection)
connection.execute(text('COMMIT'))
except Exception: # pylint: disable=broad-except
connection.execute(text('ROLLBACK'))
raise
finally:
connection.close()

Expand Down
3 changes: 2 additions & 1 deletion datacube/drivers/postgres/__init__.py
Expand Up @@ -9,5 +9,6 @@
"""

from ._connections import PostgresDb
from ._api import PostgresDbAPI

__all__ = ['PostgresDb']
__all__ = ['PostgresDb', 'PostgresDbAPI']
6 changes: 6 additions & 0 deletions datacube/drivers/postgres/_api.py
Expand Up @@ -182,9 +182,15 @@ def __init__(self, connection):
def in_transaction(self):
return self._connection.in_transaction()

def begin(self):
self._connection.execute(text('BEGIN'))

def rollback(self):
self._connection.execute(text('ROLLBACK'))

def commit(self):
self._connection.execute(text('COMMIT'))

def execute(self, command):
return self._connection.execute(command)

Expand Down
29 changes: 3 additions & 26 deletions datacube/drivers/postgres/_connections.py
Expand Up @@ -19,7 +19,7 @@
from contextlib import contextmanager
from typing import Callable, Optional, Union

from sqlalchemy import event, create_engine, text
from sqlalchemy import event, create_engine
from sqlalchemy.engine import Engine
from sqlalchemy.engine.url import URL as EngineUrl # noqa: N811

Expand Down Expand Up @@ -207,7 +207,7 @@ def init(self, with_permissions=True):
return is_new

@contextmanager
def connect(self):
def _connect(self):
"""
Borrow a connection from the pool.
Expand All @@ -220,35 +220,12 @@ def connect(self):
The connection can raise errors if not following this advice ("server closed the connection unexpectedly"),
as some servers will aggressively close idle connections (eg. DEA's NCI servers). It also prevents the
connection from being reused while borrowed.
"""
with self._engine.connect() as connection:
yield _api.PostgresDbAPI(connection)
connection.close()

@contextmanager
def begin(self):
"""
Start a transaction.
Returns an instance that will maintain a single connection in a transaction.
Call commit() or rollback() to complete the transaction or use a context manager:
with db.begin() as trans:
trans.insert_dataset(...)
(Don't share an instance between threads)
:rtype: PostgresDBAPI
Low level context manager, use <index_resource>._db_connection instead
"""
with self._engine.connect() as connection:
connection.execute(text('BEGIN'))
try:
yield _api.PostgresDbAPI(connection)
connection.execute(text('COMMIT'))
except Exception: # pylint: disable=broad-except
connection.execute(text('ROLLBACK'))
raise
finally:
connection.close()

Expand Down

0 comments on commit 324481e

Please sign in to comment.