Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

remove references to unused FileLockingConfig and PatchedFileLock #8737

Merged
merged 6 commits into from
Apr 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 0 additions & 1 deletion packages/syft/src/syft/store/dict_document_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,6 @@ class DictStoreConfig(StoreConfig):
The config used for store locking. Available options:
* NoLockingConfig: no locking, ideal for single-thread stores.
* ThreadingLockingConfig: threading-based locking, ideal for same-process in-memory stores.
* FileLockingConfig: file based locking, ideal for same-device different-processes/threads stores.
Defaults to ThreadingLockingConfig.
"""

Expand Down
1 change: 0 additions & 1 deletion packages/syft/src/syft/store/document_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -797,7 +797,6 @@ class StoreConfig(SyftBaseObject):
The config used for store locking. Available options:
* NoLockingConfig: no locking, ideal for single-thread stores.
* ThreadingLockingConfig: threading-based locking, ideal for same-process in-memory stores.
* FileLockingConfig: file based locking, ideal for same-device different-processes/threads stores.
Defaults to NoLockingConfig.
"""

Expand Down
167 changes: 0 additions & 167 deletions packages/syft/src/syft/store/locks.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
# third party
from pydantic import BaseModel
from sherlock.lock import BaseLock
from sherlock.lock import FileLock

# relative
from ..serde.serializable import serializable
Expand Down Expand Up @@ -63,13 +62,6 @@ class ThreadingLockingConfig(LockingConfig):
pass


@serializable()
class FileLockingConfig(LockingConfig):
"""File locking policy"""

client_path: Path | None = None


class ThreadingLock(BaseLock):
"""
Threading-based Lock. Used to provide the same API as the rest of the locks.
Expand Down Expand Up @@ -135,159 +127,6 @@ def _renew(self) -> bool:
return True


class PatchedFileLock(FileLock):
"""
Implementation of lock with the file system as the backend for synchronization.
This version patches for the `FileLock._expiry_time` crash(https://github.com/py-sherlock/sherlock/issues/71)

`sherlock.FileLock` might not work as expected for Python threads.
It uses re-entrant OS locks, meaning that multiple Python threads could acquire the lock at the same time.
For different processes/OS threads, the file lock will work as expected.
We need to patch the lock to handle Python threads too.

"""

def __init__(self, *args: Any, **kwargs: Any) -> None:
self._lock_file_enabled = True
try:
super().__init__(*args, **kwargs)
except BaseException as e:
print(f"Failed to create a file lock = {e}. Using memory-lock only")
self._lock_file_enabled = False

self._lock_py_thread = ThreadingLock(*args, **kwargs)

def _expiry_time(self) -> str:
if self.expire is not None:
expiry_time = self._now() + datetime.timedelta(seconds=self.expire)
else:
expiry_time = datetime.datetime.max.replace(
tzinfo=datetime.timezone.utc
).astimezone(datetime.timezone.utc)
return expiry_time.isoformat()

def _thread_safe_cbk(self, cbk: Callable) -> bool:
# Acquire lock at Python level(if-needed)
locked = self._lock_py_thread._acquire()
if not locked:
return False

try:
result = cbk()
except BaseException as e:
print(e)
result = False

self._lock_py_thread._release()
return result

def _acquire(self) -> bool:
return self._thread_safe_cbk(self._acquire_file_lock)

def _release(self) -> bool:
res = self._thread_safe_cbk(self._release_file_lock)
return res

def _acquire_file_lock(self) -> bool:
if not self._lock_file_enabled:
return True

owner = str(uuid.uuid4())

# Acquire lock at OS level
with self._lock_file:
if self._data_file.exists():
for _retry in range(10):
try:
data = json.loads(self._data_file.read_text())
break
except BaseException:
time.sleep(0.1)
if _retry == 9:
pass

now = self._now()
has_expired = self._has_expired(data, now)
if owner != data["owner"]:
if not has_expired:
# Someone else holds the lock.
return False
else:
# Lock is available for us to take.
data = {"owner": owner, "expiry_time": self._expiry_time()}
else:
# Same owner so do not set or modify Lease.
return False
else:
data = {"owner": owner, "expiry_time": self._expiry_time()}

# Write new data back to file.
self._data_file.touch()
self._data_file.write_text(json.dumps(data))

# We succeeded in writing to the file so we now hold the lock.
self._owner: str | None = owner

return True

@property
def _locked(self) -> bool:
if self._lock_py_thread.locked():
return True

if not self._lock_file_enabled:
return False

if not self._data_file.exists():
# File doesn't exist so can't be locked.
return False

with self._lock_file:
data = None
for _retry in range(10):
try:
data = json.loads(self._data_file.read_text())
break
except BaseException:
time.sleep(0.1)

if data is None:
return False

if self._has_expired(data, self._now()):
# File exists but has expired.
return False

# Lease exists and has not expired.
return True

def _release_file_lock(self) -> None:
if not self._lock_file_enabled:
return

if self._owner is None:
return

if not self._data_file.exists():
return

with self._lock_file:
data = None
for _retry in range(10):
try:
data = json.loads(self._data_file.read_text())
break
except BaseException:
time.sleep(0.1)

if data is None:
return

if self._owner == data["owner"]:
self._data_file.unlink()
self._owner = None


class SyftLock(BaseLock):
"""
Syft Lock implementations.
Expand Down Expand Up @@ -320,12 +159,6 @@ def __init__(self, config: LockingConfig):
self.passthrough = True
elif isinstance(config, ThreadingLockingConfig):
self._lock = ThreadingLock(**base_params)
elif isinstance(config, FileLockingConfig):
client: Path | None = config.client_path
self._lock = PatchedFileLock(
**base_params,
client=client,
)
else:
raise ValueError("Unsupported config type")

Expand Down
1 change: 0 additions & 1 deletion packages/syft/src/syft/store/mongo_document_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -995,7 +995,6 @@ class MongoStoreConfig(StoreConfig):
The config used for store locking. Available options:
* NoLockingConfig: no locking, ideal for single-thread stores.
* ThreadingLockingConfig: threading-based locking, ideal for same-process in-memory stores.
* FileLockingConfig: file based locking, ideal for same-device different-processes/threads stores.
Defaults to NoLockingConfig.
"""

Expand Down
3 changes: 1 addition & 2 deletions packages/syft/src/syft/store/sqlite_document_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -458,8 +458,7 @@ class SQLiteStoreConfig(StoreConfig):
The config used for store locking. Available options:
* NoLockingConfig: no locking, ideal for single-thread stores.
* ThreadingLockingConfig: threading-based locking, ideal for same-process in-memory stores.
* FileLockingConfig: file based locking, ideal for same-device different-processes/threads stores.
Defaults to FileLockingConfig.
Defaults to NoLockingConfig.
"""

client_config: SQLiteStoreClientConfig
Expand Down
56 changes: 0 additions & 56 deletions packages/syft/tests/syft/locks_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
import pytest

# syft absolute
from syft.store.locks import FileLockingConfig
from syft.store.locks import LockingConfig
from syft.store.locks import NoLockingConfig
from syft.store.locks import SyftLock
Expand All @@ -35,18 +34,11 @@ def locks_threading_config(request):
yield ThreadingLockingConfig(**def_params)


@pytest.fixture(scope="function")
def locks_file_config():
def_params["lock_name"] = token_hex(8)
yield FileLockingConfig(**def_params)


@pytest.mark.parametrize(
"config",
[
pytest.lazy_fixture("locks_nop_config"),
pytest.lazy_fixture("locks_threading_config"),
pytest.lazy_fixture("locks_file_config"),
],
)
def test_sanity(config: LockingConfig):
Expand Down Expand Up @@ -80,7 +72,6 @@ def test_acquire_nop(config: LockingConfig):
"config",
[
pytest.lazy_fixture("locks_threading_config"),
pytest.lazy_fixture("locks_file_config"),
],
)
@pytest.mark.flaky(reruns=3, reruns_delay=3)
Expand All @@ -107,7 +98,6 @@ def test_acquire_release(config: LockingConfig):
"config",
[
pytest.lazy_fixture("locks_threading_config"),
pytest.lazy_fixture("locks_file_config"),
],
)
@pytest.mark.flaky(reruns=3, reruns_delay=3)
Expand All @@ -123,7 +113,6 @@ def test_acquire_release_with(config: LockingConfig):
"config",
[
pytest.lazy_fixture("locks_threading_config"),
pytest.lazy_fixture("locks_file_config"),
],
)
def test_acquire_expire(config: LockingConfig):
Expand All @@ -150,7 +139,6 @@ def test_acquire_expire(config: LockingConfig):
"config",
[
pytest.lazy_fixture("locks_threading_config"),
pytest.lazy_fixture("locks_file_config"),
],
)
@pytest.mark.flaky(reruns=3, reruns_delay=3)
Expand All @@ -173,7 +161,6 @@ def test_acquire_double_aqcuire_timeout_fail(config: LockingConfig):
"config",
[
pytest.lazy_fixture("locks_threading_config"),
pytest.lazy_fixture("locks_file_config"),
],
)
@pytest.mark.flaky(reruns=3, reruns_delay=3)
Expand All @@ -198,7 +185,6 @@ def test_acquire_double_aqcuire_timeout_ok(config: LockingConfig):
"config",
[
pytest.lazy_fixture("locks_threading_config"),
pytest.lazy_fixture("locks_file_config"),
],
)
@pytest.mark.flaky(reruns=3, reruns_delay=3)
Expand All @@ -223,7 +209,6 @@ def test_acquire_double_aqcuire_nonblocking(config: LockingConfig):
"config",
[
pytest.lazy_fixture("locks_threading_config"),
pytest.lazy_fixture("locks_file_config"),
],
)
@pytest.mark.flaky(reruns=3, reruns_delay=3)
Expand All @@ -249,7 +234,6 @@ def test_acquire_double_aqcuire_retry_interval(config: LockingConfig):
"config",
[
pytest.lazy_fixture("locks_threading_config"),
pytest.lazy_fixture("locks_file_config"),
],
)
@pytest.mark.flaky(reruns=3, reruns_delay=3)
Expand All @@ -266,7 +250,6 @@ def test_acquire_double_release(config: LockingConfig):
"config",
[
pytest.lazy_fixture("locks_threading_config"),
pytest.lazy_fixture("locks_file_config"),
],
)
@pytest.mark.flaky(reruns=3, reruns_delay=3)
Expand All @@ -288,7 +271,6 @@ def test_acquire_same_name_diff_namespace(config: LockingConfig):
"config",
[
pytest.lazy_fixture("locks_threading_config"),
pytest.lazy_fixture("locks_file_config"),
],
)
def test_locks_parallel_multithreading(config: LockingConfig) -> None:
Expand Down Expand Up @@ -341,41 +323,3 @@ def _kv_cbk(tid: int) -> None:
stored = int(f.read())

assert stored == thread_cnt * repeats


# @pytest.mark.skip(reason="Joblib is flaky")
# @pytest.mark.parametrize(
# "config",
# [
# pytest.lazy_fixture("locks_file_config"),
# ],
# )
# def test_parallel_joblib(
# config: LockingConfig,
# ) -> None:
# thread_cnt = 3
# repeats = 5

# temp_dir = Path(tempfile.TemporaryDirectory().name)
# temp_dir.mkdir(parents=True, exist_ok=True)
# temp_file = temp_dir / "dbg.txt"
# if temp_file.exists():
# temp_file.unlink()

# with open(temp_file, "w") as f:
# f.write("0")

# def _kv_cbk(tid: int) -> None:
# for _idx in range(repeats):
# with SyftLock(config):
# with open(temp_file) as f:
# prev = int(f.read())
# with open(temp_file, "w") as f:
# f.write(str(prev + 1))

# Parallel(n_jobs=thread_cnt)(delayed(_kv_cbk)(idx) for idx in range(thread_cnt))

# with open(temp_file) as f:
# stored = int(f.read())

# assert stored == thread_cnt * repeats