Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow borg with-lock children to grab locks on same repository. #4120

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
36 changes: 25 additions & 11 deletions src/borg/archiver.py
Original file line number Diff line number Diff line change
Expand Up @@ -1530,18 +1530,24 @@ def do_with_lock(self, args, repository):
# we can only do this for local repositories (with .io), though:
if hasattr(repository, 'io'):
repository.io.close_segment()
# we need to commit the "no change" operation we did to the manifest
# because it created a new segment file in the repository. if we would
# roll back, the same file would be later used otherwise (for other content).
# that would be bad if somebody uses rsync with ignore-existing (or
# any other mechanism relying on existing segment data not changing).
# see issue #1867.
repository.commit(compact=False)
if args.shared_lock:
try:
repository.lock.downgrade()
except AttributeError:
# this is only for local repositories
self.print_error('Running borg with-lock using shared locks is supported only for local repositories.')
return self.exit_code
env = prepare_subprocess_env(system=True)
try:
# we exit with the return code we get from the subprocess
return subprocess.call([args.command] + args.args, env=env)
finally:
# we need to commit the "no change" operation we did to the manifest
# because it created a new segment file in the repository. if we would
# roll back, the same file would be later used otherwise (for other content).
# that would be bad if somebody uses rsync with ignore-existing (or
# any other mechanism relying on existing segment data not changing).
# see issue #1867.
repository.commit(compact=False)
os.setpgid(0, 0)
# we exit with the return code we get from the subprocess
return subprocess.call([args.command] + args.args, env=env)

@with_repository(manifest=False, exclusive=True)
def do_compact(self, args, repository):
Expand Down Expand Up @@ -3788,6 +3794,12 @@ def define_archive_filters_group(subparser, *, sort_by=True, first_last=True):
for its termination, release the lock and return the user command's return
code as borg's return code.

It is possible to relax the lock with --shared-lock, so that you can run borg commands
within borg with-lock. This is only supported for local repositories, as the locking
protocol cannot account for concurrent access between two or more hosts.
This is useful if you want to check a repository state before running some maintenance
operations without race conditions or Time Of Check/Time Of Use problems.

.. note::

If you copy a repository with the lock held, the lock will be present in
Expand All @@ -3801,6 +3813,8 @@ def define_archive_filters_group(subparser, *, sort_by=True, first_last=True):
formatter_class=argparse.RawDescriptionHelpFormatter,
help='run user command with lock held')
subparser.set_defaults(func=self.do_with_lock)
subparser.add_argument('--shared-lock', dest='shared_lock', action='store_true',
help='use a shared lock rather than an exclusive one')
subparser.add_argument('location', metavar='REPOSITORY',
type=location_validator(archive=False),
help='repository to lock')
Expand Down
11 changes: 8 additions & 3 deletions src/borg/locking.py
Original file line number Diff line number Diff line change
Expand Up @@ -320,12 +320,13 @@ class Lock:
This makes sure the lock is released again if the block is left, no
matter how (e.g. if an exception occurred).
"""
def __init__(self, path, exclusive=False, sleep=None, timeout=None, id=None, kill_stale_locks=False):
def __init__(self, path, exclusive=False, sleep=None, timeout=None, id=None, kill_stale_locks=False, pgid=None):
self.path = path
self.is_exclusive = exclusive
self.sleep = sleep
self.timeout = timeout
self.id = id or platform.get_process_id()
self.pgid = pgid or id or platform.get_process_group()
# globally keeping track of shared and exclusive lockers:
self._roster = LockRoster(path + '.roster', id=id, kill_stale_locks=kill_stale_locks)
# an exclusive lock, used for:
Expand Down Expand Up @@ -364,8 +365,12 @@ def _wait_for_readers_finishing(self, remove, sleep):
try:
if remove is not None:
self._roster.modify(remove, REMOVE)
if len(self._roster.get(SHARED)) == 0:
return # we are the only one and we keep the lock!
# We keep the lock if we are the only one, or if it is held only by a reader
# whose pid equals our pgid.
# A process can only set its own and its children pgids, and only to the target's pid
# or a process' pgid from the same session (see setpgid(2))
if self._roster.get(SHARED).issubset((self.pgid,)):
return # we are the only one, or we are in the shared group, and we keep the lock!
# restore the roster state as before (undo the roster change):
if remove is not None:
self._roster.modify(remove, ADD)
Expand Down
4 changes: 2 additions & 2 deletions src/borg/platform/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,12 @@
from .base import set_flags, get_flags
from .base import SaveFile, SyncFile, sync_dir, fdatasync, safe_fadvise
from .base import swidth, API_VERSION
from .base import process_alive, get_process_id, local_pid_alive, fqdn, hostname, hostid
from .base import process_alive, get_process_id, get_process_group, local_pid_alive, fqdn, hostname, hostid

OS_API_VERSION = API_VERSION

if not sys.platform.startswith(('win32', )):
from .posix import process_alive, local_pid_alive
from .posix import process_alive, local_pid_alive, get_process_group
# posix swidth implementation works for: linux, freebsd, darwin, openindiana, cygwin
from .posix import swidth

Expand Down
7 changes: 7 additions & 0 deletions src/borg/platform/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,13 @@ def get_process_id():
return hostid, pid, thread_id


def get_process_group():
"""
Return group tuple (hostname, pgid, thread_id) for 'us'.
"""
raise NotImplementedError


def process_alive(host, pid, thread):
"""
Check if the (host, pid, thread_id) combination corresponds to a potentially alive process.
Expand Down
12 changes: 12 additions & 0 deletions src/borg/platform/posix.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -61,3 +61,15 @@ def local_pid_alive(pid):
return False
# Any other error (eg. permissions) means that the process ID refers to a live process.
return True


def get_process_group():
"""
Return group id (hostname, pgid, thread_id) from identification tuple for 'us'.
This always returns the current pgid, which might be different from before, e.g. if os.setpgid() was used.
"""
from . import hostid

thread_id = 0
pgid = os.getpgid(0)
return hostid, pgid, thread_id
18 changes: 16 additions & 2 deletions src/borg/testsuite/locking.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,16 +108,30 @@ def test_exclusive(self, lockpath):
assert len(lock._roster.get(EXCLUSIVE)) == 1
assert not lock._roster.empty(SHARED, EXCLUSIVE)

def test_exclusive_pgid_shared(self, lockpath):
lock1 = Lock(lockpath, exclusive=False, id=ID1).acquire()
with Lock(lockpath, exclusive=True, id=ID2, timeout=1, pgid=ID1):
assert len(lock1._roster.get(SHARED)) == 1
assert len(lock1._roster.get(EXCLUSIVE)) == 1
lock1.release()

def test_exclusive_pgid_distinct(self, lockpath):
lock1 = Lock(lockpath, exclusive=False, id=ID1).acquire()
with pytest.raises(LockTimeout):
with Lock(lockpath, exclusive=True, id=ID2, timeout=1):
pass
lock1.release()

def test_upgrade(self, lockpath):
with Lock(lockpath, exclusive=False) as lock:
with Lock(lockpath, exclusive=False, pgid=ID1) as lock:
lock.upgrade()
lock.upgrade() # NOP
assert len(lock._roster.get(SHARED)) == 0
assert len(lock._roster.get(EXCLUSIVE)) == 1
assert not lock._roster.empty(SHARED, EXCLUSIVE)

def test_downgrade(self, lockpath):
with Lock(lockpath, exclusive=True) as lock:
with Lock(lockpath, exclusive=True, pgid=ID1) as lock:
lock.downgrade()
lock.downgrade() # NOP
assert len(lock._roster.get(SHARED)) == 1
Expand Down