Merge pull request #8737 from OpenMined/aziz/remove_file_lock_config
remove references to unused `FileLockingConfig` and `PatchedFileLock`
koenvanderveen committed Apr 29, 2024
2 parents 41f71da + 8318315, commit 2c8521d
Showing 7 changed files with 1 addition and 254 deletions.
packages/syft/src/syft/store/dict_document_store.py (1 change: 0 additions & 1 deletion)
@@ -98,7 +98,6 @@ class DictStoreConfig(StoreConfig):
The config used for store locking. Available options:
* NoLockingConfig: no locking, ideal for single-thread stores.
* ThreadingLockingConfig: threading-based locking, ideal for same-process in-memory stores.
* FileLockingConfig: file based locking, ideal for same-device different-processes/threads stores.
Defaults to ThreadingLockingConfig.
"""

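The docstring above now lists only the two remaining policies. For reference, a minimal sketch of selecting the thread-based policy for the in-memory dict store follows; the `locking_config` field name and the defaults of the other DictStoreConfig fields are assumptions, since they are not shown in this diff.

```python
# Minimal sketch (assumptions noted): pick ThreadingLockingConfig for the
# in-memory dict store now that FileLockingConfig is gone. The
# `locking_config` field name is assumed from StoreConfig; other fields
# are left at their (assumed) defaults.
from syft.store.dict_document_store import DictStoreConfig
from syft.store.locks import ThreadingLockingConfig

store_config = DictStoreConfig(
    locking_config=ThreadingLockingConfig(lock_name="dict_store_lock"),
)
```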
packages/syft/src/syft/store/document_store.py (1 change: 0 additions & 1 deletion)
@@ -797,7 +797,6 @@ class StoreConfig(SyftBaseObject):
The config used for store locking. Available options:
* NoLockingConfig: no locking, ideal for single-thread stores.
* ThreadingLockingConfig: threading-based locking, ideal for same-process in-memory stores.
* FileLockingConfig: file based locking, ideal for same-device different-processes/threads stores.
Defaults to NoLockingConfig.
"""

packages/syft/src/syft/store/locks.py (167 changes: 0 additions & 167 deletions)
@@ -12,7 +12,6 @@
# third party
from pydantic import BaseModel
from sherlock.lock import BaseLock
from sherlock.lock import FileLock

# relative
from ..serde.serializable import serializable
@@ -63,13 +62,6 @@ class ThreadingLockingConfig(LockingConfig):
pass


@serializable()
class FileLockingConfig(LockingConfig):
"""File locking policy"""

client_path: Path | None = None


class ThreadingLock(BaseLock):
"""
Threading-based Lock. Used to provide the same API as the rest of the locks.
@@ -135,159 +127,6 @@ def _renew(self) -> bool:
return True


class PatchedFileLock(FileLock):
"""
Implementation of lock with the file system as the backend for synchronization.
This version patches for the `FileLock._expiry_time` crash(https://github.com/py-sherlock/sherlock/issues/71)
`sherlock.FileLock` might not work as expected for Python threads.
It uses re-entrant OS locks, meaning that multiple Python threads could acquire the lock at the same time.
For different processes/OS threads, the file lock will work as expected.
We need to patch the lock to handle Python threads too.
"""

def __init__(self, *args: Any, **kwargs: Any) -> None:
self._lock_file_enabled = True
try:
super().__init__(*args, **kwargs)
except BaseException as e:
print(f"Failed to create a file lock = {e}. Using memory-lock only")
self._lock_file_enabled = False

self._lock_py_thread = ThreadingLock(*args, **kwargs)

def _expiry_time(self) -> str:
if self.expire is not None:
expiry_time = self._now() + datetime.timedelta(seconds=self.expire)
else:
expiry_time = datetime.datetime.max.replace(
tzinfo=datetime.timezone.utc
).astimezone(datetime.timezone.utc)
return expiry_time.isoformat()

def _thread_safe_cbk(self, cbk: Callable) -> bool:
# Acquire lock at Python level(if-needed)
locked = self._lock_py_thread._acquire()
if not locked:
return False

try:
result = cbk()
except BaseException as e:
print(e)
result = False

self._lock_py_thread._release()
return result

def _acquire(self) -> bool:
return self._thread_safe_cbk(self._acquire_file_lock)

def _release(self) -> bool:
res = self._thread_safe_cbk(self._release_file_lock)
return res

def _acquire_file_lock(self) -> bool:
if not self._lock_file_enabled:
return True

owner = str(uuid.uuid4())

# Acquire lock at OS level
with self._lock_file:
if self._data_file.exists():
for _retry in range(10):
try:
data = json.loads(self._data_file.read_text())
break
except BaseException:
time.sleep(0.1)
if _retry == 9:
pass

now = self._now()
has_expired = self._has_expired(data, now)
if owner != data["owner"]:
if not has_expired:
# Someone else holds the lock.
return False
else:
# Lock is available for us to take.
data = {"owner": owner, "expiry_time": self._expiry_time()}
else:
# Same owner so do not set or modify Lease.
return False
else:
data = {"owner": owner, "expiry_time": self._expiry_time()}

# Write new data back to file.
self._data_file.touch()
self._data_file.write_text(json.dumps(data))

# We succeeded in writing to the file so we now hold the lock.
self._owner: str | None = owner

return True

@property
def _locked(self) -> bool:
if self._lock_py_thread.locked():
return True

if not self._lock_file_enabled:
return False

if not self._data_file.exists():
# File doesn't exist so can't be locked.
return False

with self._lock_file:
data = None
for _retry in range(10):
try:
data = json.loads(self._data_file.read_text())
break
except BaseException:
time.sleep(0.1)

if data is None:
return False

if self._has_expired(data, self._now()):
# File exists but has expired.
return False

# Lease exists and has not expired.
return True

def _release_file_lock(self) -> None:
if not self._lock_file_enabled:
return

if self._owner is None:
return

if not self._data_file.exists():
return

with self._lock_file:
data = None
for _retry in range(10):
try:
data = json.loads(self._data_file.read_text())
break
except BaseException:
time.sleep(0.1)

if data is None:
return

if self._owner == data["owner"]:
self._data_file.unlink()
self._owner = None


class SyftLock(BaseLock):
"""
Syft Lock implementations.
@@ -320,12 +159,6 @@ def __init__(self, config: LockingConfig):
self.passthrough = True
elif isinstance(config, ThreadingLockingConfig):
self._lock = ThreadingLock(**base_params)
elif isinstance(config, FileLockingConfig):
client: Path | None = config.client_path
self._lock = PatchedFileLock(
**base_params,
client=client,
)
else:
raise ValueError("Unsupported config type")

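After this change SyftLock dispatches only on NoLockingConfig (passthrough) and ThreadingLockingConfig. A short usage sketch follows, modeled on the test suite further down: `lock_name` appears in the tests, while the `blocking` keyword on `acquire` is assumed from sherlock's BaseLock API.

```python
# Hedged sketch of SyftLock usage with the two remaining configs,
# modeled on locks_test.py (the context-manager form appears there).
from secrets import token_hex

from syft.store.locks import NoLockingConfig
from syft.store.locks import SyftLock
from syft.store.locks import ThreadingLockingConfig

# Passthrough: acquire/release are effectively no-ops.
noop_lock = SyftLock(NoLockingConfig(lock_name=token_hex(8)))

# Thread-based lock for same-process, in-memory stores.
lock = SyftLock(ThreadingLockingConfig(lock_name=token_hex(8)))
if lock.acquire(blocking=True):  # `blocking` kwarg assumed from sherlock's BaseLock
    try:
        ...  # critical section
    finally:
        lock.release()

# Or, as in the tests, use the lock as a context manager.
with SyftLock(ThreadingLockingConfig(lock_name=token_hex(8))):
    ...  # critical section
```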
packages/syft/src/syft/store/mongo_document_store.py (1 change: 0 additions & 1 deletion)
@@ -995,7 +995,6 @@ class MongoStoreConfig(StoreConfig):
The config used for store locking. Available options:
* NoLockingConfig: no locking, ideal for single-thread stores.
* ThreadingLockingConfig: threading-based locking, ideal for same-process in-memory stores.
* FileLockingConfig: file based locking, ideal for same-device different-processes/threads stores.
Defaults to NoLockingConfig.
"""

packages/syft/src/syft/store/sqlite_document_store.py (3 changes: 1 addition & 2 deletions)
@@ -458,8 +458,7 @@ class SQLiteStoreConfig(StoreConfig):
The config used for store locking. Available options:
* NoLockingConfig: no locking, ideal for single-thread stores.
* ThreadingLockingConfig: threading-based locking, ideal for same-process in-memory stores.
* FileLockingConfig: file based locking, ideal for same-device different-processes/threads stores.
Defaults to FileLockingConfig.
Defaults to NoLockingConfig.
"""

client_config: SQLiteStoreClientConfig
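The SQLite store previously defaulted to file-based locking and now defaults to no locking, so multi-threaded, single-process deployments that relied on the old default can opt in explicitly. A hedged sketch follows; the `locking_config` field name and the SQLiteStoreClientConfig defaults are assumptions not shown in this diff.

```python
# Hedged sketch: SQLiteStoreConfig now defaults to NoLockingConfig, so
# same-process synchronization must be requested explicitly. The
# `locking_config` field name and SQLiteStoreClientConfig() defaults
# are assumed here.
from syft.store.locks import ThreadingLockingConfig
from syft.store.sqlite_document_store import SQLiteStoreClientConfig
from syft.store.sqlite_document_store import SQLiteStoreConfig

store_config = SQLiteStoreConfig(
    client_config=SQLiteStoreClientConfig(),
    locking_config=ThreadingLockingConfig(lock_name="sqlite_store_lock"),
)
```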
packages/syft/tests/syft/locks_test.py (56 changes: 0 additions & 56 deletions)
@@ -9,7 +9,6 @@
import pytest

# syft absolute
from syft.store.locks import FileLockingConfig
from syft.store.locks import LockingConfig
from syft.store.locks import NoLockingConfig
from syft.store.locks import SyftLock
@@ -35,18 +34,11 @@ def locks_threading_config(request):
yield ThreadingLockingConfig(**def_params)


@pytest.fixture(scope="function")
def locks_file_config():
def_params["lock_name"] = token_hex(8)
yield FileLockingConfig(**def_params)


@pytest.mark.parametrize(
"config",
[
pytest.lazy_fixture("locks_nop_config"),
pytest.lazy_fixture("locks_threading_config"),
pytest.lazy_fixture("locks_file_config"),
],
)
def test_sanity(config: LockingConfig):
@@ -80,7 +72,6 @@ def test_acquire_nop(config: LockingConfig):
"config",
[
pytest.lazy_fixture("locks_threading_config"),
pytest.lazy_fixture("locks_file_config"),
],
)
@pytest.mark.flaky(reruns=3, reruns_delay=3)
@@ -107,7 +98,6 @@ def test_acquire_release(config: LockingConfig):
"config",
[
pytest.lazy_fixture("locks_threading_config"),
pytest.lazy_fixture("locks_file_config"),
],
)
@pytest.mark.flaky(reruns=3, reruns_delay=3)
@@ -123,7 +113,6 @@ def test_acquire_release_with(config: LockingConfig):
"config",
[
pytest.lazy_fixture("locks_threading_config"),
pytest.lazy_fixture("locks_file_config"),
],
)
def test_acquire_expire(config: LockingConfig):
@@ -150,7 +139,6 @@ def test_acquire_expire(config: LockingConfig):
"config",
[
pytest.lazy_fixture("locks_threading_config"),
pytest.lazy_fixture("locks_file_config"),
],
)
@pytest.mark.flaky(reruns=3, reruns_delay=3)
@@ -173,7 +161,6 @@ def test_acquire_double_aqcuire_timeout_fail(config: LockingConfig):
"config",
[
pytest.lazy_fixture("locks_threading_config"),
pytest.lazy_fixture("locks_file_config"),
],
)
@pytest.mark.flaky(reruns=3, reruns_delay=3)
@@ -198,7 +185,6 @@ def test_acquire_double_aqcuire_timeout_ok(config: LockingConfig):
"config",
[
pytest.lazy_fixture("locks_threading_config"),
pytest.lazy_fixture("locks_file_config"),
],
)
@pytest.mark.flaky(reruns=3, reruns_delay=3)
@@ -223,7 +209,6 @@ def test_acquire_double_aqcuire_nonblocking(config: LockingConfig):
"config",
[
pytest.lazy_fixture("locks_threading_config"),
pytest.lazy_fixture("locks_file_config"),
],
)
@pytest.mark.flaky(reruns=3, reruns_delay=3)
@@ -249,7 +234,6 @@ def test_acquire_double_aqcuire_retry_interval(config: LockingConfig):
"config",
[
pytest.lazy_fixture("locks_threading_config"),
pytest.lazy_fixture("locks_file_config"),
],
)
@pytest.mark.flaky(reruns=3, reruns_delay=3)
@@ -266,7 +250,6 @@ def test_acquire_double_release(config: LockingConfig):
"config",
[
pytest.lazy_fixture("locks_threading_config"),
pytest.lazy_fixture("locks_file_config"),
],
)
@pytest.mark.flaky(reruns=3, reruns_delay=3)
@@ -288,7 +271,6 @@ def test_acquire_same_name_diff_namespace(config: LockingConfig):
"config",
[
pytest.lazy_fixture("locks_threading_config"),
pytest.lazy_fixture("locks_file_config"),
],
)
def test_locks_parallel_multithreading(config: LockingConfig) -> None:
@@ -341,41 +323,3 @@ def _kv_cbk(tid: int) -> None:
stored = int(f.read())

assert stored == thread_cnt * repeats


# @pytest.mark.skip(reason="Joblib is flaky")
# @pytest.mark.parametrize(
# "config",
# [
# pytest.lazy_fixture("locks_file_config"),
# ],
# )
# def test_parallel_joblib(
# config: LockingConfig,
# ) -> None:
# thread_cnt = 3
# repeats = 5

# temp_dir = Path(tempfile.TemporaryDirectory().name)
# temp_dir.mkdir(parents=True, exist_ok=True)
# temp_file = temp_dir / "dbg.txt"
# if temp_file.exists():
# temp_file.unlink()

# with open(temp_file, "w") as f:
# f.write("0")

# def _kv_cbk(tid: int) -> None:
# for _idx in range(repeats):
# with SyftLock(config):
# with open(temp_file) as f:
# prev = int(f.read())
# with open(temp_file, "w") as f:
# f.write(str(prev + 1))

# Parallel(n_jobs=thread_cnt)(delayed(_kv_cbk)(idx) for idx in range(thread_cnt))

# with open(temp_file) as f:
# stored = int(f.read())

# assert stored == thread_cnt * repeats
