Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We鈥檒l occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add proof of concept for pull mechanism #6270

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
9 changes: 7 additions & 2 deletions src/borg/archiver.py
Original file line number Diff line number Diff line change
Expand Up @@ -157,10 +157,11 @@ def wrapper(self, args, **kwargs):
make_parent_dirs = getattr(args, 'make_parent_dirs', False)
if argument(args, fake) ^ invert_fake:
return method(self, args, repository=None, **kwargs)
elif location.proto == 'ssh':

elif location.proto == 'ssh' or location.proto == 'serve':
repository = RemoteRepository(location.omit_archive(), create=create, exclusive=argument(args, exclusive),
lock_wait=self.lock_wait, lock=lock, append_only=append_only,
make_parent_dirs=make_parent_dirs, args=args)
make_parent_dirs=make_parent_dirs, args=args, serve=(location.proto == 'serve'))
else:
repository = Repository(location.path, create=create, exclusive=argument(args, exclusive),
lock_wait=self.lock_wait, lock=lock, append_only=append_only,
Expand Down Expand Up @@ -269,6 +270,7 @@ def do_serve(self, args):
restrict_to_repositories=args.restrict_to_repositories,
append_only=args.append_only,
storage_quota=args.storage_quota,
pull_command=args.pull_command
).serve()
return EXIT_SUCCESS

Expand Down Expand Up @@ -4630,6 +4632,9 @@ def define_borg_mount(parser):
'When a new repository is initialized, sets the storage quota on the new '
'repository as well. Default: no quota.')

subparser.add_argument('--pull-command', metavar='cmd', dest='pull_command',
help='command to use for pulling from a borg server started in serve:// mode')
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

isn't that confusing?

the source of the data is always where the borg client runs and the destination of the data is where borg serve(r) runs (which feeds the data into the repo).

so the data is either pushed from client to server (what we ever did) or pulled from the client to the server.

but we never are "pulling from a borg server" (I guess you meant that the pulling action is executed on the server, but that is not really clear).

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same applies for the PR text (which in the end needs to be refactored and moved into the documentation).

isn't is simpler/less confusing if we just define borg server == where "borg serve" runs (and usually where repo is located) and borg client == where the to-be-backed-up files are located == where the stuff is encrypted/compressed/etc.?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the PR top-post text is confusing the hell out of me. is it just me? please re-read / re-write.


# borg umount
umount_epilog = process_epilog("""
This command un-mounts a FUSE filesystem that was mounted with ``borg mount``.
Expand Down
10 changes: 10 additions & 0 deletions src/borg/helpers/parseformat.py
Original file line number Diff line number Diff line change
Expand Up @@ -348,6 +348,10 @@ class Location:
(?P<host>([^:/]+|\[[0-9a-fA-F:.]+\]))(?::(?P<port>\d+))? # host or host:port or [ipv6] or [ipv6]:port
""" + abs_path_re + optional_archive_re, re.VERBOSE) # path or path::archive

serve_re = re.compile(r"""
(?P<proto>serve):// # serve://
""" + abs_path_re + optional_archive_re, re.VERBOSE) # path or path::archive

file_re = re.compile(r"""
(?P<proto>file):// # file://
""" + file_path_re + optional_archive_re, re.VERBOSE) # servername/path, path or path::archive
Expand Down Expand Up @@ -427,6 +431,12 @@ def normpath_special(p):
self.path = normpath_special(m.group('path'))
self.archive = m.group('archive')
return True
m = self.serve_re.match(text)
if m:
self.proto = m.group('proto')
self.path = normpath_special(m.group('path'))
self.archive = m.group('archive')
return True
m = self.file_re.match(text)
if m:
self.proto = m.group('proto')
Expand Down
65 changes: 45 additions & 20 deletions src/borg/remote.py
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ class RepositoryServer: # pragma: no cover
'inject_exception',
)

def __init__(self, restrict_to_paths, restrict_to_repositories, append_only, storage_quota):
def __init__(self, restrict_to_paths, restrict_to_repositories, append_only, storage_quota, pull_command=None):
self.repository = None
self.restrict_to_paths = restrict_to_paths
self.restrict_to_repositories = restrict_to_repositories
Expand All @@ -175,6 +175,7 @@ def __init__(self, restrict_to_paths, restrict_to_repositories, append_only, sto
self.append_only = append_only
self.storage_quota = storage_quota
self.client_version = parse_version('1.0.8') # fallback version if client is too old to send version information
self.pull_command = pull_command

def positional_to_named(self, method, argv):
"""Translate from positional protocol to named protocol."""
Expand All @@ -194,13 +195,27 @@ def filter_args(self, f, kwargs):
return {name: kwargs[name] for name in kwargs if name in known}

def serve(self):
stdin_fd = sys.stdin.fileno()
stdout_fd = sys.stdout.fileno()
stderr_fd = sys.stdout.fileno()
os.set_blocking(stdin_fd, False)
os.set_blocking(stdout_fd, True)
os.set_blocking(stderr_fd, True)

if self.pull_command:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you please reverse this, so that what we had in the past (push) is in the if-block and the new stuff is generally in the else-block?

self.p = Popen(shlex.split(self.pull_command), bufsize=0, stdin=PIPE, stdout=PIPE, stderr=PIPE)

stdin_fd = self.p.stdout.fileno()
stdout_fd = self.p.stdin.fileno()
stderr_fd = sys.stderr.fileno()
os.set_blocking(stdin_fd, False)
os.set_blocking(stdout_fd, False)
os.set_blocking(stderr_fd, False)

else:
stdin_fd = sys.stdin.fileno()
stdout_fd = sys.stdout.fileno()
stderr_fd = sys.stdout.fileno()
os.set_blocking(stdin_fd, False)
os.set_blocking(stdout_fd, True)
os.set_blocking(stderr_fd, True)

unpacker = get_limited_unpacker('server')

while True:
r, w, es = select.select([stdin_fd], [], [], 10)
if r:
Expand Down Expand Up @@ -530,7 +545,7 @@ def required_version(self):
dictFormat = False # outside of __init__ for testing of legacy free protocol

def __init__(self, location, create=False, exclusive=False, lock_wait=None, lock=True, append_only=False,
make_parent_dirs=False, args=None):
make_parent_dirs=False, args=None, serve=False):
self.location = self._location = location
self.preload_ids = []
self.msgid = 0
Expand All @@ -552,18 +567,28 @@ def __init__(self, location, create=False, exclusive=False, lock_wait=None, lock
testing = location.host == '__testsuite__'
# when testing, we invoke and talk to a borg process directly (no ssh).
# when not testing, we invoke the system-installed ssh binary to talk to a remote borg.
env = prepare_subprocess_env(system=not testing)
borg_cmd = self.borg_cmd(args, testing)
if not testing:
borg_cmd = self.ssh_cmd(location) + borg_cmd
logger.debug('SSH command line: %s', borg_cmd)
self.p = Popen(borg_cmd, bufsize=0, stdin=PIPE, stdout=PIPE, stderr=PIPE, env=env)
self.stdin_fd = self.p.stdin.fileno()
self.stdout_fd = self.p.stdout.fileno()
self.stderr_fd = self.p.stderr.fileno()
os.set_blocking(self.stdin_fd, False)
os.set_blocking(self.stdout_fd, False)
os.set_blocking(self.stderr_fd, False)
if serve:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

also reverse blocks please.

self.stdin_fd = sys.stdout.fileno()
self.stdout_fd = sys.stdin.fileno()
self.stderr_fd = sys.stderr.fileno()

os.set_blocking(self.stdin_fd, True)
os.set_blocking(self.stdout_fd, False)

else:
env = prepare_subprocess_env(system=not testing)
borg_cmd = self.borg_cmd(args, testing)
if not testing:
borg_cmd = self.ssh_cmd(location) + borg_cmd
logger.debug('SSH command line: %s', borg_cmd)
self.p = Popen(borg_cmd, bufsize=0, stdin=PIPE, stdout=PIPE, stderr=PIPE, env=env)
self.stdin_fd = self.p.stdin.fileno()
self.stdout_fd = self.p.stdout.fileno()
self.stderr_fd = self.p.stderr.fileno()
os.set_blocking(self.stdin_fd, False)
os.set_blocking(self.stdout_fd, False)
os.set_blocking(self.stderr_fd, False)

self.r_fds = [self.stdout_fd, self.stderr_fd]
self.x_fds = [self.stdin_fd, self.stdout_fd, self.stderr_fd]

Expand Down