From 6985fbc86744b24956beeaabf1ff746a81588599 Mon Sep 17 00:00:00 2001 From: Alexander Olekhnovich Date: Thu, 18 Mar 2021 14:30:14 +0100 Subject: [PATCH 1/2] Add support for delta basebackups Delta backups are based on file hashes. The idea is to create a snapshot of the data files, which produces a manifest file describing the hashes of all the files that need to be backed up. New hashes are uploaded to the storage and, together with the complementary manifest from the control files, are used for restoration. To handle the case where a hash changes during the upload, the file is first prepared for upload under a temporary name and only then uploaded under the name of its real hash. --- .gitignore | 1 + .pylintrc | 4 + Makefile | 2 +- pghoard.spec | 2 +- pghoard/basebackup.py | 162 ++++++--- pghoard/basebackup_delta.py | 325 ++++++++++++++++++ pghoard/common.py | 35 +- pghoard/config.py | 1 + pghoard/pghoard.py | 105 +++++- pghoard/restore.py | 224 +++++++++--- pghoard/rohmu/dates.py | 4 + pghoard/rohmu/delta/__init__.py | 1 + pghoard/rohmu/delta/common.py | 282 +++++++++++++++ pghoard/rohmu/delta/snapshot.py | 238 +++++++++++++ pghoard/transfer.py | 7 +- requirements.txt | 1 + setup.py | 1 + ...60b08d80a634c421b8381df25f31fbed5e8a8c8d8b | Bin 0 -> 1252 bytes ...634c421b8381df25f31fbed5e8a8c8d8b.metadata | 1 + ...86e02bd8414a9f3a484869f2b96ed7c62f3c4eb088 | Bin 0 -> 439 bytes ...14a9f3a484869f2b96ed7c62f3c4eb088.metadata | 1 + ...ad513f103380c16896093a17868fc909aeda393559 | Bin 0 -> 1237 bytes ...380c16896093a17868fc909aeda393559.metadata | 1 + test/basebackup_delta/config.json | 23 ++ test/conftest.py | 28 ++ test/test_basebackup.py | 34 +- test/test_pghoard.py | 130 ++++++- test/test_restore.py | 62 +++- test/test_rohmu_delta.py | 80 +++++ 29 files changed, 1626 insertions(+), 129 deletions(-) create mode 100644 pghoard/basebackup_delta.py create mode 100644 pghoard/rohmu/delta/__init__.py create mode 100644 pghoard/rohmu/delta/common.py create mode 100644 pghoard/rohmu/delta/snapshot.py create mode 100644 test/basebackup_delta/chunks/0af668268d0fe14c6e269760b08d80a634c421b8381df25f31fbed5e8a8c8d8b create mode 100644 test/basebackup_delta/chunks/0af668268d0fe14c6e269760b08d80a634c421b8381df25f31fbed5e8a8c8d8b.metadata create mode 100644 test/basebackup_delta/chunks/4b65df4d0857bbbcb22aa086e02bd8414a9f3a484869f2b96ed7c62f3c4eb088 create mode 100644 test/basebackup_delta/chunks/4b65df4d0857bbbcb22aa086e02bd8414a9f3a484869f2b96ed7c62f3c4eb088.metadata create mode 100644 test/basebackup_delta/chunks/fc61c91430dcb345001306ad513f103380c16896093a17868fc909aeda393559 create mode 100644 test/basebackup_delta/chunks/fc61c91430dcb345001306ad513f103380c16896093a17868fc909aeda393559.metadata create mode 100644 test/basebackup_delta/config.json create mode 100644 test/test_rohmu_delta.py diff --git a/.gitignore b/.gitignore index 86de2f55..da3014de 100644 --- a/.gitignore +++ b/.gitignore @@ -18,3 +18,4 @@ __pycache__/ /venv/ /.venv/ *.orig +/pghoard-rpm-src/ diff --git a/.pylintrc b/.pylintrc index 81017922..deeb75a2 100644 --- a/.pylintrc +++ b/.pylintrc @@ -28,7 +28,11 @@ disable= [FORMAT] max-line-length=125 +max-module-lines=1000 [REPORTS] output-format=text reports=no + +[TYPECHECK] +extension-pkg-whitelist=pydantic diff --git a/Makefile b/Makefile index 02a33ce6..b46472a8 100644 --- a/Makefile +++ b/Makefile @@ -1,4 +1,4 @@ -short_ver = 2.1.1 +short_ver = $(shell git describe --abbrev=0) long_ver = $(shell git describe --long 2>/dev/null || echo $(short_ver)-0-unknown-g`git describe
--always`) generated = pghoard/version.py diff --git a/pghoard.spec b/pghoard.spec index 63fee15a..b3c2f762 100644 --- a/pghoard.spec +++ b/pghoard.spec @@ -7,7 +7,7 @@ License: ASL 2.0 Source0: pghoard-rpm-src.tar Requires: systemd Requires: python3-botocore, python3-cryptography >= 0.8, python3-dateutil -Requires: python3-psycopg2, python3-requests, python3-snappy +Requires: python3-psycopg2, python3-requests, python3-snappy, python3-zstandard, python3-pydantic, Conflicts: pgespresso92 < 1.2, pgespresso93 < 1.2, pgespresso94 < 1.2, pgespresso95 < 1.2 BuildRequires: python3-flake8, python3-pytest, python3-pylint, python3-devel, golang diff --git a/pghoard/basebackup.py b/pghoard/basebackup.py index 4efed7c4..9aeb16e4 100644 --- a/pghoard/basebackup.py +++ b/pghoard/basebackup.py @@ -14,9 +14,11 @@ import subprocess import time from concurrent.futures import ThreadPoolExecutor +from dataclasses import dataclass from queue import Empty, Queue from tempfile import NamedTemporaryFile from threading import Thread +from typing import Optional import psycopg2 @@ -25,8 +27,10 @@ # pylint: disable=superfluous-parens from . import common, version, wal +from .basebackup_delta import DeltaBaseBackup from .common import ( - connection_string_using_pgpass, replication_connection_string_and_slot_using_pgpass, set_stream_nonblocking, + BackupFailure, BaseBackupFormat, BaseBackupMode, connection_string_using_pgpass, + replication_connection_string_and_slot_using_pgpass, set_stream_nonblocking, set_subprocess_stdout_and_stderr_nonblocking, terminate_subprocess ) from .patchedtarfile import tarfile @@ -46,14 +50,38 @@ ] -class BackupFailure(Exception): - """Backup failed - post a failure to callback_queue and allow the thread to terminate""" - - class NoException(BaseException): """Exception that's never raised, used in conditional except blocks""" +@dataclass(frozen=True) +class EncryptionData: + encryption_key_id: Optional[str] + rsa_public_key: Optional[str] + + @staticmethod + def from_site_config(site_config) -> "EncryptionData": + encryption_key_id = site_config["encryption_key_id"] + if encryption_key_id: + rsa_public_key = site_config["encryption_keys"][encryption_key_id]["public"] + else: + rsa_public_key = None + + return EncryptionData(encryption_key_id=encryption_key_id, rsa_public_key=rsa_public_key) + + +@dataclass(frozen=True) +class CompressionData: + algorithm: str + level: int + + @staticmethod + def from_config(config) -> "CompressionData": + algorithm = config["compression"]["algorithm"] + level = config["compression"]["level"] + return CompressionData(algorithm=algorithm, level=level) + + class PGBaseBackup(Thread): def __init__( self, @@ -63,10 +91,12 @@ def __init__( basebackup_path, compression_queue, metrics, + storage, transfer_queue=None, callback_queue=None, pg_version_server=None, - metadata=None + metadata=None, + get_remote_basebackups_info=None ): super().__init__() self.log = logging.getLogger("PGBaseBackup") @@ -84,15 +114,19 @@ def __init__( self.pid = None self.pg_version_server = pg_version_server self.latest_activity = datetime.datetime.utcnow() + self.storage = storage + self.get_remote_basebackups_info = get_remote_basebackups_info def run(self): try: - basebackup_mode = self.config["backup_sites"][self.site]["basebackup_mode"] - if basebackup_mode == "basic": + basebackup_mode = self.site_config["basebackup_mode"] + if basebackup_mode == BaseBackupMode.basic: self.run_basic_basebackup() - elif basebackup_mode == "local-tar": + elif basebackup_mode == 
BaseBackupMode.local_tar: self.run_local_tar_basebackup() - elif basebackup_mode == "pipe": + elif basebackup_mode == BaseBackupMode.delta: + self.run_local_tar_basebackup(delta=True) + elif basebackup_mode == BaseBackupMode.pipe: self.run_piped_basebackup() else: raise errors.InvalidConfigurationError("Unsupported basebackup_mode {!r}".format(basebackup_mode)) @@ -129,7 +163,7 @@ def get_paths_for_backup(basebackup_path): def get_command_line(self, output_name): command = [ - self.config["backup_sites"][self.site]["pg_basebackup_path"], + self.site_config["pg_basebackup_path"], "--format", "tar", "--label", @@ -139,7 +173,7 @@ def get_command_line(self, output_name): output_name, ] - if self.config["backup_sites"][self.site]["active_backup_mode"] == "standalone_hot_backup": + if self.site_config["active_backup_mode"] == "standalone_hot_backup": if self.pg_version_server >= 100000: command.extend(["--wal-method=fetch"]) else: @@ -169,9 +203,9 @@ def check_command_success(self, proc, output_file): def basebackup_compression_pipe(self, proc, basebackup_path): rsa_public_key = None - encryption_key_id = self.config["backup_sites"][self.site]["encryption_key_id"] + encryption_key_id = self.site_config["encryption_key_id"] if encryption_key_id: - rsa_public_key = self.config["backup_sites"][self.site]["encryption_keys"][encryption_key_id]["public"] + rsa_public_key = self.site_config["encryption_keys"][encryption_key_id]["public"] compression_algorithm = self.config["compression"]["algorithm"] compression_level = self.config["compression"]["level"] self.log.debug("Compressing basebackup directly to file: %r", basebackup_path) @@ -461,25 +495,30 @@ def add_entry(archive_path, local_path, *, missing_ok): yield from add_directory(archive_path, local_path, missing_ok=False) yield archive_path, local_path, False, "leave" + @property + def site_config(self): + return self.config["backup_sites"][self.site] + + @property + def encryption_data(self) -> EncryptionData: + return EncryptionData.from_site_config(self.site_config) + + @property + def compression_data(self) -> CompressionData: + return CompressionData.from_config(self.config) + def tar_one_file( self, *, temp_dir, chunk_path, files_to_backup, callback_queue, filetype="basebackup_chunk", extra_metadata=None ): start_time = time.monotonic() - site_config = self.config["backup_sites"][self.site] - encryption_key_id = site_config["encryption_key_id"] - if encryption_key_id: - rsa_public_key = site_config["encryption_keys"][encryption_key_id]["public"] - else: - rsa_public_key = None - with NamedTemporaryFile(dir=temp_dir, prefix=os.path.basename(chunk_path), suffix=".tmp") as raw_output_obj: # pylint: disable=bad-continuation with rohmufile.file_writer( - compression_algorithm=self.config["compression"]["algorithm"], - compression_level=self.config["compression"]["level"], - compression_threads=site_config["basebackup_compression_threads"], - rsa_public_key=rsa_public_key, + compression_algorithm=self.compression_data.algorithm, + compression_level=self.compression_data.level, + compression_threads=self.site_config["basebackup_compression_threads"], + rsa_public_key=self.encryption_data.rsa_public_key, fileobj=raw_output_obj ) as output_obj: with tarfile.TarFile(fileobj=output_obj, mode="w") as output_tar: @@ -492,7 +531,7 @@ def tar_one_file( os.link(raw_output_obj.name, chunk_path) rohmufile.log_compression_result( - encrypted=bool(encryption_key_id), + encrypted=bool(self.encryption_data.encryption_key_id), elapsed=time.monotonic() - 
start_time, original_size=input_size, result_size=result_size, @@ -505,16 +544,16 @@ def tar_one_file( "pghoard.compressed_size_ratio", size_ratio, tags={ - "algorithm": self.config["compression"]["algorithm"], + "algorithm": self.compression_data.algorithm, "site": self.site, "type": "basebackup", } ) metadata = { - "compression-algorithm": self.config["compression"]["algorithm"], - "encryption-key-id": encryption_key_id, - "format": "pghoard-bb-v2", + "compression-algorithm": self.compression_data.algorithm, + "encryption-key-id": self.encryption_data.encryption_key_id, + "format": BaseBackupFormat.v2, "original-file-size": input_size, "host": socket.gethostname(), } @@ -573,9 +612,8 @@ def create_and_upload_chunks(self, chunks, data_file_format, temp_base_dir): self.chunks_on_disk = 0 i = 0 - site_config = self.config["backup_sites"][self.site] - max_chunks_on_disk = site_config["basebackup_chunks_in_progress"] - threads = site_config["basebackup_threads"] + max_chunks_on_disk = self.site_config["basebackup_chunks_in_progress"] + threads = self.site_config["basebackup_threads"] with ThreadPoolExecutor(max_workers=threads) as tpe: pending_compress_and_encrypt_tasks = [] while i < len(chunks): @@ -612,8 +650,9 @@ def create_and_upload_chunks(self, chunks, data_file_format, temp_base_dir): return chunk_files - def run_local_tar_basebackup(self): - pgdata = self.config["backup_sites"][self.site]["pg_data_directory"] + def run_local_tar_basebackup(self, delta=False): + control_files_metadata_extra = {} + pgdata = self.site_config["pg_data_directory"] if not os.path.isdir(pgdata): raise errors.InvalidConfigurationError("pg_data_directory {!r} does not exist".format(pgdata)) @@ -622,7 +661,7 @@ def run_local_tar_basebackup(self): data_file_format = "{}/{}.{{0:08d}}.pghoard".format(compressed_base, os.path.basename(compressed_base)).format # Default to 2GB chunks of uncompressed data - target_chunk_size = self.config["backup_sites"][self.site]["basebackup_chunk_size"] + target_chunk_size = self.site_config["basebackup_chunk_size"] self.log.debug("Connecting to database to start backup process") connection_string = connection_string_using_pgpass(self.connection_info) @@ -686,13 +725,45 @@ def run_local_tar_basebackup(self): self.log.info("Starting to backup %r and %r tablespaces to %r", pgdata, len(tablespaces), compressed_base) start_time = time.monotonic() - total_file_count, chunks = self.find_and_split_files_to_backup( - pgdata=pgdata, tablespaces=tablespaces, target_chunk_size=target_chunk_size - ) + if delta: + delta_backup = DeltaBaseBackup( + storage=self.storage, + site=self.site, + site_config=self.site_config, + transfer_queue=self.transfer_queue, + metrics=self.metrics, + encryption_data=self.encryption_data, + compression_data=self.compression_data, + get_remote_basebackups_info=self.get_remote_basebackups_info, + parallel=self.site_config["basebackup_threads"], + temp_base_dir=temp_base_dir, + compressed_base=compressed_base + ) + total_size_plain, total_size_enc, manifest, total_file_count = delta_backup.run( + pgdata=pgdata, + src_iterate_func=lambda: ( + item[1] + for item in self.find_files_to_backup(pgdata=pgdata, tablespaces=tablespaces) + if not item[1].endswith(".pem") # Exclude such files like "dh1024.pem" + ), + ) + + chunks_count = total_file_count + control_files_metadata_extra["manifest"] = manifest.jsondict() + self.metadata["format"] = BaseBackupFormat.delta_v1 + else: + total_file_count, chunks = self.find_and_split_files_to_backup( + pgdata=pgdata, 
tablespaces=tablespaces, target_chunk_size=target_chunk_size + ) + chunks_count = len(chunks) + # Tar up the chunks and submit them for upload; note that we start from chunk 1 here; chunk 0 + # is reserved for special files and metadata and will be generated last. + chunk_files = self.create_and_upload_chunks(chunks, data_file_format, temp_base_dir) - # Tar up the chunks and submit them for upload; note that we start from chunk 1 here; chunk 0 - # is reserved for special files and metadata and will be generated last. - chunk_files = self.create_and_upload_chunks(chunks, data_file_format, temp_base_dir) + total_size_plain = sum(item["input_size"] for item in chunk_files) + total_size_enc = sum(item["result_size"] for item in chunk_files) + + control_files_metadata_extra["chunks"] = chunk_files # Everything is now tarred up, grab the latest pg_control and stop the backup process with open(os.path.join(pgdata, "global", "pg_control"), "rb") as fp: @@ -709,12 +780,9 @@ def run_local_tar_basebackup(self): db_conn.commit() backup_stopped = True - total_size_plain = sum(item["input_size"] for item in chunk_files) - total_size_enc = sum(item["result_size"] for item in chunk_files) - self.log.info( "Basebackup generation finished, %r files, %r chunks, " - "%r byte input, %r byte output, took %r seconds, waiting to upload", total_file_count, len(chunk_files), + "%r byte input, %r byte output, took %r seconds, waiting to upload", total_file_count, chunks_count, total_size_plain, total_size_enc, time.monotonic() - start_time ) @@ -740,13 +808,13 @@ def run_local_tar_basebackup(self): "backup_end_wal_segment": backup_end_wal_segment, "backup_start_time": backup_start_time, "backup_start_wal_segment": backup_start_wal_segment, - "chunks": chunk_files, "pgdata": pgdata, "pghoard_object": "basebackup", "pghoard_version": version.__version__, "tablespaces": tablespaces, "host": socket.gethostname(), } + metadata.update(control_files_metadata_extra) control_files = list( self.get_control_entries_for_tar( metadata=metadata, diff --git a/pghoard/basebackup_delta.py b/pghoard/basebackup_delta.py new file mode 100644 index 00000000..c5922540 --- /dev/null +++ b/pghoard/basebackup_delta.py @@ -0,0 +1,325 @@ +# Copyright (c) 2021 Aiven, Helsinki, Finland. 
https://aiven.io/ +import hashlib +import io +import logging +import os +import shutil +import socket +import threading +import time +import uuid +from contextlib import suppress +from multiprocessing.dummy import Pool +from queue import Empty, Queue +from tempfile import NamedTemporaryFile +from typing import Callable, Dict + +from pghoard.common import (BackupFailure, BaseBackupFormat, extract_pghoard_delta_v1_metadata) +from pghoard.rohmu import rohmufile +from pghoard.rohmu.dates import now +from pghoard.rohmu.delta.common import (BackupManifest, SnapshotFile, SnapshotHash, SnapshotResult, SnapshotUploadResult) +from pghoard.rohmu.delta.snapshot import Snapshotter +from pghoard.rohmu.errors import FileNotFoundFromStorageError + + +class DeltaBaseBackup: + def __init__( + self, *, storage, site, site_config, transfer_queue, metrics, encryption_data, compression_data, + get_remote_basebackups_info, parallel, temp_base_dir, compressed_base + ): + self.log = logging.getLogger("DeltaBaseBackup") + self.storage = storage + self.site = site + self.site_config = site_config + self.transfer_queue = transfer_queue + self.metrics = metrics + self.encryption_data = encryption_data + self.compression_data = compression_data + self.get_remote_basebackups_info = get_remote_basebackups_info + self.parallel = parallel + self.temp_base_dir = temp_base_dir + self.compressed_base = compressed_base + self.submitted_hashes_lock = threading.Lock() + self.submitted_hashes = set() + self.tracked_snapshot_files: Dict[str, SnapshotFile] = self._list_existing_files() + + def _snapshot(self, snapshotter) -> SnapshotResult: + snapshotter.snapshot(reuse_old_snapshotfiles=False) + snapshot_result = SnapshotResult() + snapshot_result.state = snapshotter.get_snapshot_state() + snapshot_result.hashes = [ + SnapshotHash(hexdigest=ssfile.hexdigest, size=ssfile.file_size) + for ssfile in snapshot_result.state.files + if ssfile.hexdigest + ] + snapshot_result.files = len(snapshot_result.state.files) + snapshot_result.total_size = sum(ssfile.file_size for ssfile in snapshot_result.state.files) + snapshot_result.end = now() + + self.log.debug("snapshot result: %r", snapshot_result.json()) + + return snapshot_result + + def _upload_hexdigest_from_file(self, file_obj, relative_path, callback_queue): + """Upload file into an object with a temporary name, after upload is finished, it will be copied to a new name + with a final/real hash""" + temp_object_name = f"temp_{uuid.uuid4()}" + return self._delta_upload_hexdigest( + callback_queue=callback_queue, + chunk_path=f"{self.compressed_base}/{temp_object_name}", + temp_dir=self.temp_base_dir, + file_obj=file_obj, + relative_path=relative_path + ) + + def _list_existing_files(self): + """Iterate through all manifest files and fetch information about hash files""" + all_snapshot_files: Dict[str, SnapshotFile] = {} + for backup in self.get_remote_basebackups_info(self.site): + if backup["metadata"].get("format") != BaseBackupFormat.delta_v1: + continue + + key = os.path.join(self.site_config["prefix"], "basebackup", backup["name"]) + bmeta_compressed = self.storage.get_contents_to_string(key)[0] + + with rohmufile.file_reader( + fileobj=io.BytesIO(bmeta_compressed), + metadata=backup["metadata"], + key_lookup=lambda key_id: self.site_config["encryption_keys"][key_id]["private"] + ) as input_obj: + meta = extract_pghoard_delta_v1_metadata(input_obj) + + manifest = meta["manifest"] + snapshot_result = manifest["snapshot_result"] + backup_state = snapshot_result["state"] + files = 
backup_state["files"] + for delta_file in files: + snapshot_file = SnapshotFile.parse_obj(delta_file) + if snapshot_file.hexdigest: + all_snapshot_files[snapshot_file.hexdigest] = snapshot_file + + return all_snapshot_files + + def _delta_upload_hexdigest(self, *, temp_dir, chunk_path, file_obj, callback_queue, relative_path): + skip_upload = False + start_time = time.monotonic() + + input_size = file_obj.seek(0, os.SEEK_END) + file_obj.seek(0) + + result_hash = hashlib.blake2s() + + def update_hash(data): + result_hash.update(data) + + with NamedTemporaryFile(dir=temp_dir, prefix=os.path.basename(chunk_path), suffix=".tmp") as raw_output_obj: + rohmufile.write_file( + input_obj=file_obj, + output_obj=raw_output_obj, + compression_algorithm=self.compression_data.algorithm, + compression_level=self.compression_data.level, + rsa_public_key=self.encryption_data.rsa_public_key, + log_func=self.log.info, + data_callback=update_hash + ) + result_size = raw_output_obj.tell() + raw_output_obj.seek(0) + + result_digest = result_hash.hexdigest() + + with self.submitted_hashes_lock: + if result_digest in self.submitted_hashes: + # file with the same hash was already submitted + self.log.debug( + "Skip uploading file %r, file with the same was hash already submitted for uploading", relative_path + ) + skip_upload = True + return input_size, result_size, result_digest, skip_upload + else: + self.submitted_hashes.add(result_digest) + + os.link(raw_output_obj.name, chunk_path) + + rohmufile.log_compression_result( + encrypted=bool(self.encryption_data.encryption_key_id), + elapsed=time.monotonic() - start_time, + original_size=input_size, + result_size=result_size, + source_name="$PGDATA delta basebackup file", + log_func=self.log.info, + ) + + if input_size: + size_ratio = result_size / input_size + self.metrics.gauge( + "pghoard.compressed_size_ratio", + size_ratio, + tags={ + "algorithm": self.compression_data.algorithm, + "site": self.site, + "type": "basebackup_delta", + } + ) + + metadata = { + "compression-algorithm": self.compression_data.algorithm, + "encryption-key-id": self.encryption_data.encryption_key_id, + "format": BaseBackupFormat.delta_v1, + "original-file-size": input_size, + "host": socket.gethostname(), + } + + self.transfer_queue.put({ + "callback_queue": callback_queue, + "file_size": result_size, + "filetype": "basebackup_delta", + "local_path": chunk_path, + "metadata": metadata, + "site": self.site, + "type": "UPLOAD", + "delta": { + "hexdigest": result_digest, + }, + }) + + return input_size, result_size, result_digest, skip_upload + + def _upload_files(self, callback_queue, todo, snapshotter): + todo_hexdigests = set(hash.hexdigest for hash in todo) + + # Keep track on the new hashes submitted with the current upload, so we can clean them up in case of error + new_submitted_hashes = set() + + def _submit_files_in_thread(hexdigest): + files = snapshotter.hexdigest_to_snapshotfiles.get(hexdigest, []) + for snapshotfile in files: + path = snapshotter.dst / snapshotfile.relative_path + if not path.is_file(): + self.log.error("%s disappeared post-snapshot", path) + return False + try: + with snapshotfile.open_for_reading(snapshotter.dst) as f: + size, stored_size, new_hash, skip_upload = self._upload_hexdigest_from_file( + file_obj=f, relative_path=snapshotfile.relative_path, callback_queue=callback_queue + ) + if new_hash not in self.tracked_snapshot_files: + new_submitted_hashes.add(new_hash) + snapshotter.update_snapshot_file_data( + relative_path=snapshotfile.relative_path, + 
hexdigest=new_hash, + file_size=size, + stored_file_size=stored_size + ) + if not skip_upload: + # Every thread is waiting for the transfer to finish, so we don't load disk too much, + # only `self.parallel` files in parallel + while True: + try: + callback_queue.get(timeout=3.0) + break + except Empty: + continue + except Exception: # pylint: disable=broad-except + # Report failure - whole step will be retried later + self.log.exception("Exception uploading %r", path) + return False + + return True + + sorted_todo_hexdigests = sorted( + todo_hexdigests, key=lambda hexdigest: -snapshotter.hexdigest_to_snapshotfiles[hexdigest][0].file_size + ) + iterable_as_list = list(sorted_todo_hexdigests) + with Pool(self.parallel) as p: + for hexdigest, res in zip(iterable_as_list, p.imap(_submit_files_in_thread, iterable_as_list)): + if not res: + self.log.error( + "Error while processing digest for upload %r, waiting for workers pool to shutdown", hexdigest + ) + + p.terminate() + p.join() + + self.log.info("Cleaning up already uploaded new backup files") + + for new_hash in new_submitted_hashes: + key = os.path.join(self.site_config["prefix"], "basebackup_delta", new_hash) + self.log.info("Removing object from the storage: %r", key) + try: + self.storage.delete_key(key) + except FileNotFoundFromStorageError: + self.log.warning("Object with key %r does not exist, skipping") + + raise BackupFailure("Error while uploading backup files") + + self.log.info("All basebackup files were uploaded successfully") + + def _delta_upload(self, snapshot_result: SnapshotResult, snapshotter: Snapshotter, start_time_utc): + callback_queue = Queue() + + # Determine which digests already exist and which need to be uploaded, also restore the backup size of re-used + # files from manifests + snapshot_hashes = set(snapshot_result.hashes) + uploaded_hashes = set() + for snapshot_file in snapshot_result.state.files: + if snapshot_file.hexdigest in self.tracked_snapshot_files: + snapshot_file_from_manifest = self.tracked_snapshot_files[snapshot_file.hexdigest] + uploaded_hashes.add( + SnapshotHash( + hexdigest=snapshot_file_from_manifest.hexdigest, size=snapshot_file_from_manifest.file_size + ) + ) + + todo = snapshot_hashes.difference(uploaded_hashes) + todo_count = len(todo) + + self.log.info("Submitting hashes for upload: %r, total hashes in the snapshot: %r", todo_count, len(snapshot_hashes)) + + self._upload_files(callback_queue=callback_queue, todo=todo, snapshotter=snapshotter) + + total_stored_size = 0 + total_size = 0 + + snapshot_result.state = snapshotter.get_snapshot_state() + for snapshot_file in snapshot_result.state.files: + total_size += snapshot_file.file_size + if snapshot_file.hexdigest: + if not snapshot_file.stored_file_size: + # Patch existing files with stored_file_size from existing manifest files (we can not have it otherwise) + snapshot_file.stored_file_size = self.tracked_snapshot_files[snapshot_file.hexdigest].stored_file_size + total_stored_size += snapshot_file.stored_file_size + elif snapshot_file.content_b64: + # Include embed files size into the total size as well + total_stored_size += snapshot_file.file_size + + manifest = BackupManifest( + start=start_time_utc, + snapshot_result=snapshot_result, + upload_result=SnapshotUploadResult(total_size=total_size, total_stored_size=total_stored_size) + ) + + return manifest, total_size, total_stored_size + + def run(self, pgdata: str, src_iterate_func: Callable): + # NOTE: Hard links work only in the same FS, therefore using hopefully the same FS in 
PG home folder + delta_dir = os.path.join(os.path.dirname(pgdata), "basebackup_delta") + with suppress(FileExistsError): + os.makedirs(delta_dir) + + snapshotter = Snapshotter( + src=pgdata, dst=delta_dir, globs=["**/*"], parallel=self.parallel, src_iterate_func=src_iterate_func + ) + + with snapshotter.lock: + start_time_utc = now() + + snapshot_result = self._snapshot(snapshotter) + + manifest, total_size, total_stored_size = self._delta_upload( + snapshot_result=snapshot_result, snapshotter=snapshotter, start_time_utc=start_time_utc + ) + self.log.debug("manifest %r", manifest) + + shutil.rmtree(delta_dir) + + return total_size, total_stored_size, manifest, snapshot_result.files diff --git a/pghoard/common.py b/pghoard/common.py index de9f24a3..342168ad 100644 --- a/pghoard/common.py +++ b/pghoard/common.py @@ -5,6 +5,7 @@ See LICENSE for details """ import datetime +import enum import fcntl import json import logging @@ -24,6 +25,26 @@ LOG = logging.getLogger("pghoard.common") +class StrEnum(str, enum.Enum): + def __str__(self): + return str(self.value) + + +@enum.unique +class BaseBackupFormat(StrEnum): + v1 = "pghoard-bb-v1" + v2 = "pghoard-bb-v2" + delta_v1 = "pghoard-delta-v1" + + +@enum.unique +class BaseBackupMode(StrEnum): + basic = "basic" + delta = "delta" + local_tar = "local-tar" + pipe = "pipe" + + def create_pgpass_file(connection_string_or_info): """Look up password from the given object which can be a dict or a string and write a possible password in a pgpass file; @@ -211,7 +232,7 @@ def delete_alert_file(config, filename): os.unlink(filepath) -def extract_pghoard_bb_v2_metadata(fileobj): +def _extract_metadata(fileobj): # | in mode to use tarfile's internal stream buffer manager, currently required because our SnappyFile # interface doesn't do proper buffering for reads with tarfile.open(fileobj=fileobj, mode="r|", bufsize=IO_BLOCK_SIZE) as tar: @@ -223,6 +244,14 @@ def extract_pghoard_bb_v2_metadata(fileobj): raise Exception(".pghoard_tar_metadata.json not found") +def extract_pghoard_bb_v2_metadata(fileobj): + return _extract_metadata(fileobj) + + +def extract_pghoard_delta_v1_metadata(fileobj): + return _extract_metadata(fileobj) + + def get_pg_wal_directory(config): if LooseVersion(config["pg_data_directory_version"]) >= "10": return os.path.join(config["pg_data_directory"], "pg_wal") @@ -256,3 +285,7 @@ def increase_pipe_capacity(*pipes): break except PermissionError: pass + + +class BackupFailure(Exception): + """Backup failed - post a failure to callback_queue and allow the thread to terminate""" diff --git a/pghoard/config.py b/pghoard/config.py index 40e40451..153a03fc 100644 --- a/pghoard/config.py +++ b/pghoard/config.py @@ -85,6 +85,7 @@ def set_and_check_config_defaults(config, *, check_commands=True, check_pgdata=T site_config.setdefault("basebackup_compression_threads", 0) site_config.setdefault("basebackup_count", 2) site_config.setdefault("basebackup_count_min", 2) + site_config.setdefault("basebackup_delta_mode_max_retries", 10) site_config.setdefault("basebackup_interval_hours", 24) # NOTE: stream_compression removed from documentation after 1.6.0 release site_config.setdefault("basebackup_mode", "pipe" if site_config.get("stream_compression") else "basic") diff --git a/pghoard/pghoard.py b/pghoard/pghoard.py index 631ae856..82c75ba8 100644 --- a/pghoard/pghoard.py +++ b/pghoard/pghoard.py @@ -19,20 +19,23 @@ import sys import time from contextlib import closing +from dataclasses import dataclass from queue import Empty, Queue +from typing import 
Dict import psycopg2 from pghoard import config, logutil, metrics, version, wal from pghoard.basebackup import PGBaseBackup from pghoard.common import ( - create_alert_file, extract_pghoard_bb_v2_metadata, get_object_storage_config, - replication_connection_string_and_slot_using_pgpass, write_json_file + BaseBackupFormat, BaseBackupMode, create_alert_file, extract_pghoard_bb_v2_metadata, extract_pghoard_delta_v1_metadata, + get_object_storage_config, replication_connection_string_and_slot_using_pgpass, write_json_file ) from pghoard.compressor import CompressorThread from pghoard.receivexlog import PGReceiveXLog from pghoard.rohmu import dates, get_transfer, rohmufile from pghoard.rohmu.compat import suppress +from pghoard.rohmu.dates import now as utc_now from pghoard.rohmu.errors import (FileNotFoundFromStorageError, InvalidConfigurationError) from pghoard.rohmu.inotify import InotifyWatcher from pghoard.transfer import TransferAgent @@ -45,6 +48,12 @@ WALReceiver = None +@dataclass +class DeltaBaseBackupFailureInfo: + last_failed_time: datetime + retries: int = 0 + + class PGHoard: def __init__(self, config_path): self.metrics = None @@ -117,6 +126,7 @@ def __init__(self, config_path): logutil.notify_systemd("READY=1") self.log.info("pghoard initialized, own_hostname: %r, cwd: %r", socket.gethostname(), os.getcwd()) + self.delta_backup_failures: Dict[str, DeltaBaseBackupFailureInfo] = {} def check_pg_versions_ok(self, site, pg_version_server, command): if pg_version_server is None: @@ -154,7 +164,9 @@ def create_basebackup(self, site, connection_info, basebackup_path, callback_que callback_queue=callback_queue, pg_version_server=pg_version_server, metrics=self.metrics, + storage=self.get_or_create_site_storage(site=site), metadata=metadata, + get_remote_basebackups_info=self.get_remote_basebackups_info ) thread.start() self.basebackups[site] = thread @@ -219,8 +231,11 @@ def start_walreceiver(self, site, chosen_backup_node, last_flushed_lsn): thread.start() self.walreceivers[site] = thread + def _get_site_prefix(self, site): + return self.config["backup_sites"][site]["prefix"] + def create_backup_site_paths(self, site): - site_path = os.path.join(self.config["backup_location"], self.config["backup_sites"][site]["prefix"]) + site_path = os.path.join(self.config["backup_location"], self._get_site_prefix(site)) xlog_path = os.path.join(site_path, "xlog") basebackup_path = os.path.join(site_path, "basebackup") @@ -249,9 +264,7 @@ def delete_remote_wal_before(self, wal_segment, site, pg_version): if seg == 0 and log == 0: break seg, log = wal.get_previous_wal_on_same_timeline(seg, log, pg_version) - wal_path = os.path.join( - self.config["backup_sites"][site]["prefix"], "xlog", wal.name_for_tli_log_seg(tli, log, seg) - ) + wal_path = os.path.join(self._get_site_prefix(site), "xlog", wal.name_for_tli_log_seg(tli, log, seg)) self.log.debug("Deleting wal_file: %r", wal_path) try: storage.delete_key(wal_path) @@ -274,13 +287,47 @@ def delete_remote_wal_before(self, wal_segment, site, pg_version): self.log.exception("Problem deleting: %r", wal_path) self.metrics.unexpected_exception(ex, where="delete_remote_wal_before") - def delete_remote_basebackup(self, site, basebackup, metadata): + def _get_delta_basebackup_files(self, site, storage, metadata, basebackup_name_to_delete, backups_to_keep): + all_hexdigests = set() + keep_hexdigests = set() + + basebackup_data_files = list() + for backup_name in [basebackup_name_to_delete] + [back["name"] for back in backups_to_keep]: + delta_backup_key = 
os.path.join(self._get_site_prefix(site), "basebackup", backup_name) + bmeta_compressed = storage.get_contents_to_string(delta_backup_key)[0] + with rohmufile.file_reader( + fileobj=io.BytesIO(bmeta_compressed), + metadata=metadata, + key_lookup=config.key_lookup_for_site(self.config, site) + ) as input_obj: + meta = extract_pghoard_delta_v1_metadata(input_obj) + + manifest = meta["manifest"] + snapshot_result = manifest["snapshot_result"] + backup_state = snapshot_result["state"] + files = backup_state["files"] + + backup_hexdigests = set(delta_file["hexdigest"] for delta_file in files if delta_file["hexdigest"]) + all_hexdigests |= backup_hexdigests + + if backup_name != basebackup_name_to_delete: + # Keep data file in case if there is still a reference from other backups + keep_hexdigests |= backup_hexdigests + + # Remove unreferenced files + extra_hexdigests = set(all_hexdigests).difference(keep_hexdigests) + for hexdigest in extra_hexdigests: + basebackup_data_files.append(os.path.join(self._get_site_prefix(site), "basebackup_delta", hexdigest)) + + return basebackup_data_files + + def delete_remote_basebackup(self, site, basebackup, metadata, basebackups): start_time = time.monotonic() storage = self.site_transfers.get(site) - main_backup_key = os.path.join(self.config["backup_sites"][site]["prefix"], "basebackup", basebackup) + main_backup_key = os.path.join(self._get_site_prefix(site), "basebackup", basebackup) basebackup_data_files = [main_backup_key] - if metadata.get("format") == "pghoard-bb-v2": + if metadata.get("format") == BaseBackupFormat.v2: bmeta_compressed = storage.get_contents_to_string(main_backup_key)[0] with rohmufile.file_reader( fileobj=io.BytesIO(bmeta_compressed), @@ -292,11 +339,13 @@ def delete_remote_basebackup(self, site, basebackup, metadata): for chunk in bmeta["chunks"]: basebackup_data_files.append( os.path.join( - self.config["backup_sites"][site]["prefix"], + self._get_site_prefix(site), "basebackup_chunk", chunk["chunk_filename"], ) ) + elif metadata.get("format") == BaseBackupFormat.delta_v1: + basebackup_data_files.extend(self._get_delta_basebackup_files(site, storage, metadata, basebackup, basebackups)) self.log.debug("Deleting basebackup datafiles: %r", ", ".join(basebackup_data_files)) for obj_key in basebackup_data_files: @@ -312,13 +361,16 @@ def delete_remote_basebackup(self, site, basebackup, metadata): time.monotonic() - start_time ) - def get_remote_basebackups_info(self, site): + def get_or_create_site_storage(self, site): storage = self.site_transfers.get(site) if not storage: storage_config = get_object_storage_config(self.config, site) storage = get_transfer(storage_config) self.site_transfers[site] = storage + return storage + def get_remote_basebackups_info(self, site): + storage = self.get_or_create_site_storage(site=site) site_config = self.config["backup_sites"][site] results = storage.list_path(os.path.join(site_config["prefix"], "basebackup")) for entry in results: @@ -404,7 +456,9 @@ def refresh_backup_list_and_delete_old(self, site): if last_wal_segment_still_needed: self.delete_remote_wal_before(last_wal_segment_still_needed, site, pg_version) - self.delete_remote_basebackup(site, basebackup_to_be_deleted["name"], basebackup_to_be_deleted["metadata"]) + self.delete_remote_basebackup( + site, basebackup_to_be_deleted["name"], basebackup_to_be_deleted["metadata"], basebackups=basebackups + ) self.state["backup_sites"][site]["basebackups"] = basebackups def get_normalized_backup_time(self, site_config, *, now=None): @@ -553,6 
+607,18 @@ def handle_site(self, site, site_config): if site in self.basebackups: try: result = self.basebackups_callbacks[site].get(block=False) + if result["success"]: + # No matter which mode, if succeeded reset the counter + self.delta_backup_failures.pop(site, None) + elif site_config["basebackup_mode"] == BaseBackupMode.delta: + last_failed_time = utc_now() + if site not in self.delta_backup_failures: + self.delta_backup_failures[site] = DeltaBaseBackupFailureInfo( + retries=0, last_failed_time=last_failed_time + ) + else: + self.delta_backup_failures[site].retries += 1 + self.delta_backup_failures[site].last_failed_time = last_failed_time except Empty: # previous basebackup (or its compression and upload) still in progress return @@ -566,6 +632,21 @@ def handle_site(self, site, site_config): metadata = self.get_new_backup_details(site=site, site_config=site_config) if metadata and not os.path.exists(self.config["maintenance_mode_file"]): + if site in self.delta_backup_failures: + retries = self.delta_backup_failures[site].retries + if retries > site_config["basebackup_delta_mode_max_retries"]: + self.log.info("Giving up backup after exceeding max retries: %r", retries) + return + else: + # Start from ~2 min with cap of one hour + retry_interval = min(2 ** (retries + 7), 60 * 60) + if utc_now( + ) >= self.delta_backup_failures[site].last_failed_time + datetime.timedelta(seconds=retry_interval): + self.log.info("Re-trying delta basebackup") + else: + self.log.info("Waiting for backoff time before re-trying new delta backup due to previous failures") + return + self.basebackups_callbacks[site] = Queue() self.create_basebackup(site, chosen_backup_node, basebackup_path, self.basebackups_callbacks[site], metadata) diff --git a/pghoard/restore.py b/pghoard/restore.py index 9d549b6d..07bf72e7 100644 --- a/pghoard/restore.py +++ b/pghoard/restore.py @@ -4,8 +4,11 @@ Copyright (c) 2016 Ohmu Ltd See LICENSE for details """ +import abc import argparse +import base64 import datetime +import enum import errno import io import logging @@ -23,10 +26,12 @@ from distutils.version import \ LooseVersion # pylint: disable=no-name-in-module,import-error from threading import RLock +from typing import Dict, List, Optional from psycopg2.extensions import adapt from requests import Session +from pghoard.common import BaseBackupFormat, StrEnum from pghoard.rohmu import compat, dates, get_transfer, rohmufile from pghoard.rohmu.errors import Error, InvalidConfigurationError @@ -38,6 +43,50 @@ class RestoreError(Error): """Restore error""" +@enum.unique +class FileInfoType(StrEnum): + regular = "regular" + delta = "delta" + + +class FileInfo(abc.ABC): + def __init__(self, size, new_name, metadata, file_type): + self.size: int = size + self.id = str(uuid.uuid4()) + self.new_name = new_name + self.metadata = metadata + self.file_type = file_type + + +class FilePathInfo(FileInfo): + def __init__( + self, + name: str, + size: int, + new_name: Optional[str] = None, + metadata: Optional[Dict] = None, + file_type: Optional[FileInfoType] = FileInfoType.regular + ): + self.name: str = name + super().__init__(size=size, new_name=new_name, metadata=metadata, file_type=file_type) + + def __repr__(self): + return f"{self.name} (size={self.size}, file_type={self.file_type})" + + +class FileDataInfo(FileInfo): + def __init__( + self, + data: bytes, + size: int, + new_name: Optional[str] = None, + metadata: Optional[Dict] = None, + file_type: Optional[FileInfoType] = FileInfoType.regular + ): + self.data: bytes = data + 
super().__init__(size=size, new_name=new_name, metadata=metadata, file_type=file_type) + + def create_signal_file(file_path): """Just ensure the file exists""" with open(file_path, "w"): @@ -240,10 +289,13 @@ def list_basebackups_http(self, arg): self.storage = HTTPRestore(arg.host, arg.port, arg.site) self.storage.show_basebackup_list(verbose=arg.verbose) + def _get_site_prefix(self, site): + return self.config["backup_sites"][site]["prefix"] + def _get_object_storage(self, site, pgdata): storage_config = common.get_object_storage_config(self.config, site) storage = get_transfer(storage_config) - return ObjectStore(storage, self.config["backup_sites"][site]["prefix"], site, pgdata) + return ObjectStore(storage, self._get_site_prefix(site), site, pgdata) def list_basebackups(self, arg): """List basebackups from an object store""" @@ -330,6 +382,51 @@ def _find_nearest_basebackup(self, recovery_target_time=None): print("\nSelecting {!r} for restore".format(selected["name"])) return selected + def _get_delta_basebackup_data(self, site, metadata, basebackup_name): + basebackup_data_files = [] + bmeta_compressed = self.storage.get_file_bytes(basebackup_name) + with rohmufile.file_reader( + fileobj=io.BytesIO(bmeta_compressed), + metadata=metadata, + key_lookup=config.key_lookup_for_site(self.config, site) + ) as input_obj: + bmeta = common.extract_pghoard_delta_v1_metadata(input_obj) + self.log.debug("Delta backup metadata: %r", bmeta) + + delta_objects_path = os.path.join(self._get_site_prefix(site), "basebackup_delta") + + manifest = bmeta["manifest"] + snapshot_result = manifest["snapshot_result"] + backup_state = snapshot_result["state"] + files = backup_state["files"] + empty_dirs = backup_state["empty_dirs"] + tablespaces = bmeta["tablespaces"] + for delta_file in files: + if delta_file["hexdigest"]: + basebackup_data_files.append( + FilePathInfo( + name=os.path.join(delta_objects_path, delta_file["hexdigest"]), + size=delta_file["stored_file_size"], + new_name=delta_file["relative_path"], + file_type=FileInfoType.delta + ) + ) + elif delta_file["content_b64"] is not None: + # Restore embed files + basebackup_data_files.append( + FileDataInfo( + data=base64.b64decode(delta_file["content_b64"]), + metadata=metadata, + size=delta_file["file_size"], + new_name=delta_file["relative_path"], + file_type=FileInfoType.delta + ) + ) + + basebackup_data_files.append(FileDataInfo(data=bmeta_compressed, metadata=metadata, size=0)) + + return tablespaces, basebackup_data_files, empty_dirs + def _get_basebackup( self, pgdata, @@ -386,7 +483,9 @@ def _get_basebackup( "$PGDATA target directory {!r} exists, is not empty and --overwrite not specified, aborting.".format(pgdata) ) - if metadata.get("format") == "pghoard-bb-v2": + empty_dirs = None + basebackup_data_files: List[FileInfo] = [] + if metadata.get("format") == BaseBackupFormat.v2: # "Backup file" is a metadata object, fetch it to get more information bmeta_compressed = self.storage.get_file_bytes(basebackup["name"]) with rohmufile.file_reader( @@ -398,14 +497,16 @@ def _get_basebackup( self.log.debug("Backup metadata: %r", bmeta) tablespaces = bmeta["tablespaces"] - basebackup_data_files = [[ - os.path.join(self.config["backup_sites"][site]["prefix"], "basebackup_chunk", chunk["chunk_filename"]), - chunk["result_size"], - ] for chunk in bmeta["chunks"]] + basebackup_data_files = [ + FilePathInfo( + name=os.path.join(self._get_site_prefix(site), "basebackup_chunk", chunk["chunk_filename"]), + size=chunk["result_size"] + ) for chunk in 
bmeta["chunks"] + ] # We need the files from the main basebackup file too - basebackup_data_files.append([(bmeta_compressed, metadata), 0]) + basebackup_data_files.append(FileDataInfo(data=bmeta_compressed, metadata=metadata, size=0)) - elif metadata.get("format") == "pghoard-bb-v1": + elif metadata.get("format") == BaseBackupFormat.v1: # Tablespace information stored in object store metadata, look it up tsmetare = re.compile("^tablespace-name-([0-9]+)$") for kw, value in metadata.items(): @@ -420,11 +521,15 @@ def _get_basebackup( "path": tspath, } - basebackup_data_files = [[basebackup["name"], basebackup["size"]]] + basebackup_data_files = [FilePathInfo(name=basebackup["name"], size=basebackup["size"])] + elif metadata.get("format") == BaseBackupFormat.delta_v1: + tablespaces, basebackup_data_files, empty_dirs = self._get_delta_basebackup_data( + site, metadata, basebackup["name"] + ) else: # Object is a raw (encrypted, compressed) basebackup - basebackup_data_files = [[basebackup["name"], basebackup["size"]]] + basebackup_data_files = [FilePathInfo(name=basebackup["name"], size=basebackup["size"])] if tablespace_base_dir and not os.path.exists(tablespace_base_dir) and not overwrite: # we just care that the dir exists, but we're OK if there are other objects there @@ -470,6 +575,11 @@ def _get_basebackup( for dirname in dirs_to_create: os.makedirs(dirname) os.chmod(dirname, 0o700) + if empty_dirs: + for rel_path in empty_dirs: + dirname = os.path.join(pgdata, rel_path) + os.makedirs(dirname, exist_ok=True) + os.chmod(dirname, 0o700) fetcher = BasebackupFetcher( app_config=self.config, @@ -515,12 +625,12 @@ def run(self, args=None): return 1 -class BasebackupFetcher(): - def __init__(self, *, app_config, debug, site, pgdata, tablespaces, data_files, status_output_file=None): +class BasebackupFetcher: + def __init__(self, *, app_config, debug, site, pgdata, tablespaces, data_files: List[FileInfo], status_output_file=None): self.log = logging.getLogger(self.__class__.__name__) self.completed_jobs = set() self.config = app_config - self.data_files = [{"fn_or_data": item[0], "id": str(uuid.uuid4()), "size": item[1]} for item in data_files] + self.data_files = data_files self.debug = debug self.download_progress_per_file = {} self.errors = 0 @@ -593,10 +703,10 @@ def _process_count(self): return min(self.config["restore_process_count"], len(self.data_files)) def _setup_progress_tracking(self, manager): - self.total_download_size = sum(item["size"] for item in self.data_files) - initial_progress = [[item["fn_or_data"], item["size"] if item["id"] in self.completed_jobs else 0] + self.total_download_size = sum(item.size for item in self.data_files) + initial_progress = [[item.name, item.size if item.id in self.completed_jobs else 0] for item in self.data_files - if not isinstance(item["fn_or_data"], tuple)] + if isinstance(item, FilePathInfo)] self.download_progress_per_file = manager.dict(initial_progress) def current_progress(self): @@ -645,20 +755,20 @@ def jobs_in_progress(self): def _queue_jobs(self, pool): for item in self.data_files: with self.lock: - if item["id"] in self.completed_jobs or item["id"] in self.pending_jobs: + if item.id in self.completed_jobs or item.id in self.pending_jobs: continue - self.pending_jobs.add(item["id"]) - self._queue_job(pool, item["id"], item["fn_or_data"], item["size"]) + self.pending_jobs.add(item.id) + self._queue_job(pool, item) - def _queue_job(self, pool, key, data_file, data_file_size): + def _queue_job(self, pool, file_info: FileInfo): + key = 
file_info.id pool.apply_async( _fetch_and_process_chunk, [], { "app_config": self.config, "debug": self.debug, - "data_file": data_file, - "data_file_size": data_file_size, + "file_info": file_info, "download_progress_per_file": self.download_progress_per_file, "site": self.site, "pgdata": self.pgdata, @@ -705,18 +815,17 @@ def dict(self, *args, **kwargs): def _fetch_and_process_chunk( - *, app_config, debug, data_file, data_file_size, download_progress_per_file, site, pgdata, tablespaces + *, app_config, debug, file_info: FileInfo, download_progress_per_file, site, pgdata, tablespaces ): logutil.configure_logging(level=logging.DEBUG if debug else logging.INFO) - fetcher = ChunkFetcher(app_config, data_file, data_file_size, download_progress_per_file, site, pgdata, tablespaces) + fetcher = ChunkFetcher(app_config, file_info, download_progress_per_file, site, pgdata, tablespaces) fetcher.process_chunk() class ChunkFetcher: - def __init__(self, app_config, data_file, data_file_size, download_progress_per_file, site, pgdata, tablespaces): + def __init__(self, app_config, file_info: FileInfo, download_progress_per_file, site, pgdata, tablespaces): self.config = app_config - self.data_file = data_file - self.data_file_size = data_file_size + self.file_info = file_info self.download_progress_per_file = download_progress_per_file self.log = logging.getLogger(self.__class__.__name__) self.pgdata = pgdata @@ -728,30 +837,51 @@ def _create_transfer(self): return get_transfer(object_storage_config) def _progress_callback(self, current_pos, expected_max): - self.download_progress_per_file[self.data_file] = self.data_file_size * (current_pos / expected_max) + assert isinstance(self.file_info, FilePathInfo) + self.download_progress_per_file[self.file_info.name] = self.file_info.size * (current_pos / expected_max) def process_chunk(self): - self.log.debug("Processing one chunk: %r", self.data_file) - if isinstance(self.data_file, tuple): - data, metadata = self.data_file - src = io.BytesIO(data) - self._fetch_and_extract_one_backup(metadata, len(data), lambda sink: shutil.copyfileobj(src, sink)) - else: + self.log.info("Processing one chunk: %r", self.file_info) + + if self.file_info.file_type not in {FileInfoType.regular, FileInfoType.delta}: + raise NotImplementedError(f"Unknown FileInfoType {self.file_info.file_type}") + + if isinstance(self.file_info, FileDataInfo): + if self.file_info.file_type == FileInfoType.regular: + src = io.BytesIO(self.file_info.data) + self._fetch_and_extract_one_backup( + self.file_info.metadata, len(self.file_info.data), lambda sink: shutil.copyfileobj(src, sink) + ) + elif self.file_info.file_type == FileInfoType.delta: + assert self.file_info.new_name + with open(os.path.join(self.pgdata, self.file_info.new_name), "wb") as out_f: + out_f.write(self.file_info.data) + + elif isinstance(self.file_info, FilePathInfo): transfer = self._create_transfer() - metadata = transfer.get_metadata_for_key(self.data_file) + metadata = transfer.get_metadata_for_key(self.file_info.name) def fetch_fn(sink): - transfer.get_contents_to_fileobj(self.data_file, sink, progress_callback=self._progress_callback) + transfer.get_contents_to_fileobj(self.file_info.name, sink, progress_callback=self._progress_callback) - self._fetch_and_extract_one_backup(metadata, self.data_file_size, fetch_fn) + if self.file_info.file_type == FileInfoType.regular: + self._fetch_and_extract_one_backup(metadata, self.file_info.size, fetch_fn) + elif self.file_info.file_type == FileInfoType.delta: + assert 
self.file_info.new_name + self._fetch_delta_file(metadata, fetch_fn) + + else: + raise NotImplementedError(f"Unknown FileInfo {self.file_info.__class__.__name__}") def _build_tar_args(self, metadata): base_args = [self.config["tar_executable"], "-xf", "-", "-C", self.pgdata] file_format = metadata.get("format") if not file_format: return base_args - elif file_format in {"pghoard-bb-v1", "pghoard-bb-v2"}: + elif file_format in {BaseBackupFormat.v1, BaseBackupFormat.v2, BaseBackupFormat.delta_v1}: extra_args = ["--exclude", ".pghoard_tar_metadata.json", "--transform", "s,^pgdata/,,"] + if file_format == BaseBackupFormat.delta_v1: + extra_args += ["--exclude", ".manifest.json"] if self.tablespaces: extra_args.append("--absolute-names") for tsname, settings in self.tablespaces.items(): @@ -765,6 +895,20 @@ def _build_tar_args(self, metadata): else: raise RestoreError("Unrecognized basebackup format {!r}".format(file_format)) + def _fetch_delta_file(self, metadata, fetch_fn): + with open(os.path.join(self.pgdata, self.file_info.new_name), "wb") as target_file: + sink = rohmufile.create_sink_pipeline( + output=target_file, + file_size=self.file_info.size, + metadata=metadata, + key_lookup=config.key_lookup_for_site(self.config, self.site), + ) + fetch_fn(sink) + + self.log.info( + "Processing of delta file completed successfully: %r -> %r", self.file_info.name, self.file_info.new_name + ) + def _fetch_and_extract_one_backup(self, metadata, file_size, fetch_fn): with subprocess.Popen( self._build_tar_args(metadata), @@ -793,7 +937,7 @@ def _fetch_and_extract_one_backup(self, metadata, file_size, fetch_fn): tar.stdin = None output = tar.stderr.read() exit_code = tar.wait() - file_name = "" if isinstance(self.data_file, tuple) else self.data_file + file_name = "" if isinstance(self.file_info, FileDataInfo) else self.file_info.name if exit_code != 0: raise Exception("tar exited with code {!r} for file {!r}, output: {!r}".format(exit_code, file_name, output)) self.log.info("Processing of %r completed successfully", file_name) @@ -845,6 +989,8 @@ def main(): restore = Restore() return restore.run() except (InvalidConfigurationError, RestoreError) as ex: + import traceback + traceback.print_exc() print("FATAL: {}: {}".format(ex.__class__.__name__, ex)) return 1 diff --git a/pghoard/rohmu/dates.py b/pghoard/rohmu/dates.py index 05490e41..f85e1a22 100644 --- a/pghoard/rohmu/dates.py +++ b/pghoard/rohmu/dates.py @@ -39,3 +39,7 @@ def parse_timestamp(ts, *, with_tz=True, assume_local=False): tz = dateutil.tz.tzlocal() if assume_local else datetime.timezone.utc return dt.replace(tzinfo=tz) + + +def now(): + return datetime.datetime.now(datetime.timezone.utc) diff --git a/pghoard/rohmu/delta/__init__.py b/pghoard/rohmu/delta/__init__.py new file mode 100644 index 00000000..956777b8 --- /dev/null +++ b/pghoard/rohmu/delta/__init__.py @@ -0,0 +1 @@ +# Copyright (c) 2021 Aiven, Helsinki, Finland. https://aiven.io/ diff --git a/pghoard/rohmu/delta/common.py b/pghoard/rohmu/delta/common.py new file mode 100644 index 00000000..a1fd979b --- /dev/null +++ b/pghoard/rohmu/delta/common.py @@ -0,0 +1,282 @@ +# Copyright (c) 2021 Aiven, Helsinki, Finland. 
https://aiven.io/ +import functools +import hashlib +import json as _json +import logging +import math +import os +from datetime import datetime +from multiprocessing.dummy import Pool +from pathlib import Path +from typing import List, Optional + +from pydantic import BaseModel, Field + +from pghoard.rohmu.dates import now + +_hash = hashlib.blake2s +_log_1_1 = math.log(1.1) + +# Hexdigest is 32 bytes, so something orders of magnitude more at least +EMBEDDED_FILE_SIZE = 150 + +logger = logging.getLogger(__name__) + + +def hash_hexdigest_readable(f, *, read_buffer=1_000_000): + h = _hash() + while True: + data = f.read(read_buffer) + if not data: + break + h.update(data) + return h.hexdigest() + + +def increase_worth_reporting(value, new_value=None, *, total=None): + """ Make reporting sparser and sparser as values grow larger + - report every 1.1**N or so + - if we know total, report every percent + """ + if new_value is None: + new_value = value + value = new_value - 1 + if total is not None: + if new_value == total or total <= 100: + return True + old_percent = 100 * value // total + new_percent = 100 * new_value // total + return old_percent != new_percent + if value <= 10 or new_value <= 10: + return True + old_exp = int(math.log(value) / _log_1_1) + new_exp = int(math.log(new_value) / _log_1_1) + return old_exp != new_exp + + +class DeltaModel(BaseModel): + class Config: + # As we're keen to both export and decode json, just using + # enum values for encode/decode is much saner than the default + # enumname.value (it is also slightly less safe but oh well) + use_enum_values = True + + # Extra values should be errors, as they are most likely typos + # which lead to grief when not detected. However, if we ever + # start deprecating some old fields and not wanting to parse + # them, this might need to be revisited. + extra = "forbid" + + # Validate field default values too + validate_all = True + + # Validate also assignments + # validate_assignment = True + # TBD: Figure out why this doesn't work in some unit tests; + # possibly the tests themselves are broken + + def jsondict(self, **kw): + # By default, + # + # .json() returns json string. + # .dict() returns Python dict, but it has things that are not + # json serializable. + # + # We provide json seralizable dict (super inefficiently) here. + # + # This is mostly used for test code so that should be fine + return _json.loads(self.json(**kw)) + + +class SizeLimitedFile: + def __init__(self, *, path, file_size): + self._f = open(path, "rb") + self._file_size = file_size + self.tell = self._f.tell + + def __enter__(self): + return self + + def __exit__(self, t, v, tb): + self._f.close() + + def read(self, n=None): + can_read = max(0, self._file_size - self._f.tell()) + if n is None: + n = can_read + n = min(can_read, n) + return self._f.read(n) + + def seek(self, ofs, whence=0): + if whence == os.SEEK_END: + ofs += self._file_size + whence = os.SEEK_SET + return self._f.seek(ofs, whence) + + +class SnapshotHash(DeltaModel): + """ + This class represents something that is to be stored in the object storage. + + size is provided mainly to allow for even loading of nodes in case + same hexdigest is available from multiple nodes. 
+ + """ + hexdigest: str + size: int + + def __eq__(self, other): + if isinstance(other, SnapshotHash): + return self.hexdigest == other.hexdigest + return False + + def __hash__(self): + # hexdigests should be unique, regardless of size + return hash(self.hexdigest) + + +@functools.total_ordering +class SnapshotFile(DeltaModel): + relative_path: Path + file_size: int + stored_file_size: int + mtime_ns: int + hexdigest: str = "" + content_b64: Optional[str] + + def __lt__(self, o): + # In our use case, paths uniquely identify files we care about + return self.relative_path < o.relative_path + + def equals_excluding_mtime(self, o): + return self.copy(update={"mtime_ns": 0}) == o.copy(update={"mtime_ns": 0}) + + def open_for_reading(self, root_path): + return SizeLimitedFile(path=root_path / self.relative_path, file_size=self.file_size) + + +class SnapshotState(DeltaModel): + root_globs: List[str] + files: List[SnapshotFile] + empty_dirs: List[Path] + + +class SnapshotResult(DeltaModel): + # when was the operation started ( / done ) + start: datetime = Field(default_factory=now) + end: Optional[datetime] + # + # should be passed opaquely to restore + state: Optional[SnapshotState] + # + # Summary data for manifest use + files: int = 0 + total_size: int = 0 + + # populated only if state is available + hashes: Optional[List[SnapshotHash]] + + +class SnapshotUploadResult(DeltaModel): + total_size: int = 0 + total_stored_size: int = 0 + + +class BackupManifest(DeltaModel): + start: datetime + end: datetime = Field(default_factory=now) + + # Filesystem snapshot contents of the backup + snapshot_result: SnapshotResult + + # What did the upload return (mostly for statistics) + upload_result: SnapshotUploadResult + + +class Progress(DeltaModel): + """ JSON-encodable progress meter of sorts """ + handled: int = 0 + failed: int = 0 + total: int = 0 + final: bool = False + + def __repr__(self): + finished = ", finished" if self.final else "" + return f"{self.handled}/{self.total} handled, {self.failed} failures{finished}" + + def start(self, n): + " Optional 'first' step, just for logic handling state (e.g. 
no progress object reuse desired) " + assert not self.total + logger.debug("start") + self.add_total(n) + + def add_total(self, n): + if not n: + return + old_total = self.total + self.total += n + if increase_worth_reporting(old_total, self.total): + logger.debug("add_total %r -> %r", n, self) + assert not self.final + + def add_fail(self, n=1, *, info="add_fail"): + assert n > 0 + old_failed = self.failed + self.failed += n + if increase_worth_reporting(old_failed, self.failed): + logger.debug("%s %r -> %r", info, n, self) + assert not self.final + + def add_success(self, n=1, *, info="add_success"): + assert n > 0 + old_handled = self.handled + self.handled += n + assert self.handled <= self.total + if increase_worth_reporting(old_handled, self.handled, total=self.total): + logger.debug("%s %r -> %r", info, n, self) + assert not self.final + + def download_success(self, size): + self.add_success(size, info="download_success") + + def upload_success(self, hexdigest): + self.add_success(info=f"upload_success {hexdigest}") + + def upload_missing(self, hexdigest): + self.add_fail(info=f"upload_missing {hexdigest}") + + def upload_failure(self, hexdigest): + self.add_fail(info=f"upload_failure {hexdigest}") + + def done(self): + assert self.total is not None and self.handled <= self.total + assert not self.final + self.final = True + logger.debug("done %r", self) + + @property + def finished_successfully(self): + return self.final and not self.failed and self.handled == self.total + + @property + def finished_failed(self): + return self.final and not self.finished_successfully + + @classmethod + def merge(cls, progresses): + p = cls() + for progress in progresses: + p.handled += progress.handled + p.failed += progress.failed + p.total += progress.total + p.final = all(progress.final for progress in progresses) + return p + + +def parallel_map_to(*, fun, iterable, result_callback, n=None) -> bool: + iterable_as_list = list(iterable) + with Pool(n) as p: + for map_in, map_out in zip(iterable_as_list, p.imap(fun, iterable_as_list)): + if not result_callback(map_in=map_in, map_out=map_out): + return False + return True diff --git a/pghoard/rohmu/delta/snapshot.py b/pghoard/rohmu/delta/snapshot.py new file mode 100644 index 00000000..a3e53976 --- /dev/null +++ b/pghoard/rohmu/delta/snapshot.py @@ -0,0 +1,238 @@ +# Copyright (c) 2021 Aiven, Helsinki, Finland. https://aiven.io/ +import base64 +import logging +import os +import threading +from pathlib import Path +from typing import Callable, Optional + +from pghoard.rohmu.delta.common import ( + EMBEDDED_FILE_SIZE, Progress, SnapshotFile, SnapshotHash, SnapshotState, hash_hexdigest_readable, + increase_worth_reporting, parallel_map_to +) + +logger = logging.getLogger(__name__) + + +class Snapshotter: + """Snapshotter keeps track of files on disk, and their hashes. + + The hash on disk MAY change, which may require subsequent + incremential snapshot and-or ignoring the files which have changed. + + The output to outside is just root object's hash, as well as list + of other hashes which correspond to files referred to within the + file list contained in root object. + + Note that any call to public API MUST be made with + snapshotter.lock held. 
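
As a usage sketch of the locking convention described above (this is not in the patch; the temporary directories, the glob and the sample file are made up), every public Snapshotter call is made while holding snapshotter.lock:

    import tempfile
    from pathlib import Path

    from pghoard.rohmu.delta.common import Progress
    from pghoard.rohmu.delta.snapshot import Snapshotter

    src = Path(tempfile.mkdtemp())
    dst = Path(tempfile.mkdtemp())
    (src / "base").mkdir()
    (src / "base" / "12345").write_bytes(b"x" * 1000)

    snapshotter = Snapshotter(src=src, dst=dst, globs=["**/*"], parallel=1)
    with snapshotter.lock:
        changes = snapshotter.snapshot(progress=Progress())
        state = snapshotter.get_snapshot_state()
        hashes = snapshotter.get_snapshot_hashes()
    # changes counts created dirs, linked files and hashed files; state and
    # hashes feed the manifest and the upload step respectively.
    print(changes, len(state.files), len(hashes))
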
This is because Snapshotter is process-wide + utility that is shared across operations, possibly used from + multiple threads, and the single-operation-only mode of operation + is not exactly flawless (the 'new operation can be started with + old running' is intentional feature but new operation should + eventually replace the old). The lock itself might not need to be + built-in to Snapshotter, but having it there enables asserting its + state during public API calls. + """ + def __init__(self, *, src, dst, globs, src_iterate_func: Optional[Callable] = None, parallel=1): + assert globs + self.src = Path(src) + self.dst = Path(dst) + self.globs = globs + self.src_iterate_func = src_iterate_func + self.relative_path_to_snapshotfile = {} + self.hexdigest_to_snapshotfiles = {} + self.parallel = parallel + self.lock = threading.Lock() + self.empty_dirs = [] + + def _list_files(self, basepath: Path): + result_files = set() + for glob in self.globs: + for path in basepath.glob(glob): + if not path.is_file() or path.is_symlink(): + continue + relpath = path.relative_to(basepath) + result_files.add(relpath) + + return sorted(result_files) + + def _list_dirs_and_files(self, basepath: Path): + files = self._list_files(basepath) + dirs = {p.parent for p in files} + return sorted(dirs), files + + def _add_snapshotfile(self, snapshotfile: SnapshotFile): + old_snapshotfile = self.relative_path_to_snapshotfile.get(snapshotfile.relative_path, None) + if old_snapshotfile: + self._remove_snapshotfile(old_snapshotfile) + self.relative_path_to_snapshotfile[snapshotfile.relative_path] = snapshotfile + if snapshotfile.hexdigest: + self.hexdigest_to_snapshotfiles.setdefault(snapshotfile.hexdigest, []).append(snapshotfile) + + def _remove_snapshotfile(self, snapshotfile: SnapshotFile): + assert self.relative_path_to_snapshotfile[snapshotfile.relative_path] == snapshotfile + del self.relative_path_to_snapshotfile[snapshotfile.relative_path] + if snapshotfile.hexdigest: + self.hexdigest_to_snapshotfiles[snapshotfile.hexdigest].remove(snapshotfile) + + def _snapshotfile_from_path(self, relative_path): + src_path = self.src / relative_path + st = src_path.stat() + return SnapshotFile(relative_path=relative_path, mtime_ns=st.st_mtime_ns, file_size=st.st_size, stored_file_size=0) + + def _gen_snapshot_hashes(self, relative_paths, reuse_old_snapshotfiles): + same = 0 + lost = 0 + for relative_path in relative_paths: + old_snapshotfile = self.relative_path_to_snapshotfile.get(relative_path) + try: + snapshotfile = self._snapshotfile_from_path(relative_path) + except FileNotFoundError: + lost += 1 + if increase_worth_reporting(lost): + logger.debug("#%d. lost - %s disappeared before stat, ignoring", lost, self.src / relative_path) + continue + if reuse_old_snapshotfiles and old_snapshotfile: + snapshotfile.hexdigest = old_snapshotfile.hexdigest + snapshotfile.content_b64 = old_snapshotfile.content_b64 + if old_snapshotfile == snapshotfile: + same += 1 + if increase_worth_reporting(same): + logger.debug("#%d. 
same - %r in %s is same", same, old_snapshotfile, relative_path) + continue + yield snapshotfile + + def get_snapshot_hashes(self): + assert self.lock.locked() + return [ + SnapshotHash(hexdigest=dig, size=sf[0].file_size) for dig, sf in self.hexdigest_to_snapshotfiles.items() if sf + ] + + def get_snapshot_state(self): + assert self.lock.locked() + return SnapshotState( + root_globs=self.globs, files=sorted(self.relative_path_to_snapshotfile.values()), empty_dirs=self.empty_dirs + ) + + def update_snapshot_file_data(self, *, relative_path, hexdigest, file_size, stored_file_size): + snapshotfile = self.relative_path_to_snapshotfile[relative_path] + snapshotfile.hexdigest = hexdigest + snapshotfile.file_size = file_size + snapshotfile.stored_file_size = stored_file_size + + def _snapshot_create_missing_directories(self, *, src_dirs, dst_dirs): + changes = 0 + for i, relative_dir in enumerate(set(src_dirs).difference(dst_dirs), 1): + dst_path = self.dst / relative_dir + dst_path.mkdir(parents=True, exist_ok=True) + if increase_worth_reporting(i): + logger.debug("#%d. new directory: %r", i, relative_dir) + changes += 1 + return changes + + def _snapshot_remove_extra_files(self, *, src_files, dst_files): + changes = 0 + for i, relative_path in enumerate(set(dst_files).difference(src_files), 1): + dst_path = self.dst / relative_path + snapshotfile = self.relative_path_to_snapshotfile.get(relative_path) + if snapshotfile: + self._remove_snapshotfile(snapshotfile) + dst_path.unlink() + if increase_worth_reporting(i): + logger.debug("#%d. extra file: %r", i, relative_path) + changes += 1 + return changes + + def _snapshot_add_missing_files(self, *, src_files, dst_files): + existing = 0 + disappeared = 0 + changes = 0 + for i, relative_path in enumerate(set(src_files).difference(dst_files), 1): + src_path = self.src / relative_path + dst_path = self.dst / relative_path + try: + os.link(src=src_path, dst=dst_path, follow_symlinks=False) + except FileExistsError: + # This happens only if snapshot is started twice at + # same time. While it is technically speaking upstream + # error, we rather handle it here than leave + # exceptions not handled. + existing += 1 + if increase_worth_reporting(existing): + logger.debug("#%d. %s already existed, ignoring", existing, src_path) + continue + except FileNotFoundError: + disappeared += 1 + if increase_worth_reporting(disappeared): + logger.debug("#%d. %s disappeared before linking, ignoring", disappeared, src_path) + continue + if increase_worth_reporting(i - disappeared): + logger.debug("#%d. 
new file: %r", i - disappeared, relative_path) + changes += 1 + return changes + + def snapshot(self, *, progress: Optional[Progress] = None, reuse_old_snapshotfiles=True): + assert self.lock.locked() + + if progress is None: + progress = Progress() + progress.start(3) + + if self.src_iterate_func: + src_dirs = set() + src_files = set() + for item in self.src_iterate_func(): + path = Path(item) + if path.is_file() and not path.is_symlink(): + src_files.add(path.relative_to(self.src)) + elif path.is_dir(): + src_dirs.add(path.relative_to(self.src)) + + src_dirs = sorted(src_dirs | {p.parent for p in src_files}) + src_files = sorted(src_files) + else: + src_dirs, src_files = self._list_dirs_and_files(self.src) + + dst_dirs, dst_files = self._list_dirs_and_files(self.dst) + + # Create missing directories + changes = self._snapshot_create_missing_directories(src_dirs=src_dirs, dst_dirs=dst_dirs) + progress.add_success() + + # Remove extra files + changes += self._snapshot_remove_extra_files(src_files=src_files, dst_files=dst_files) + progress.add_success() + + # Add missing files + changes += self._snapshot_add_missing_files(src_files=src_files, dst_files=dst_files) + progress.add_success() + + # We COULD also remove extra directories, but it is not + # probably really worth it and due to ignored files it + # actually might not even work. + + # Then, create/update corresponding snapshotfile objects (old + # ones were already removed) + dst_dirs, dst_files = self._list_dirs_and_files(self.dst) + self.empty_dirs = src_dirs + snapshotfiles = list(self._gen_snapshot_hashes(dst_files, reuse_old_snapshotfiles)) + progress.add_total(len(snapshotfiles)) + + def _cb(snapshotfile): + # src may or may not be present; dst is present as it is in snapshot + with snapshotfile.open_for_reading(self.dst) as f: + if snapshotfile.file_size <= EMBEDDED_FILE_SIZE: + snapshotfile.content_b64 = base64.b64encode(f.read()).decode() + else: + snapshotfile.hexdigest = hash_hexdigest_readable(f) + return snapshotfile + + def _result_cb(*, map_in, map_out): + self._add_snapshotfile(map_out) + progress.add_success() + return True + + changes += len(snapshotfiles) + parallel_map_to(iterable=snapshotfiles, fun=_cb, result_callback=_result_cb, n=self.parallel) + return changes diff --git a/pghoard/transfer.py b/pghoard/transfer.py index 1ee728d8..a32c66cb 100644 --- a/pghoard/transfer.py +++ b/pghoard/transfer.py @@ -51,6 +51,7 @@ def defaults(): return { "basebackup": EMPTY.copy(), "basebackup_chunk": EMPTY.copy(), + "basebackup_delta": EMPTY.copy(), "timeline": EMPTY.copy(), "xlog": EMPTY.copy(), } @@ -76,6 +77,8 @@ def form_key_path(file_to_transfer): name_parts = file_to_transfer["local_path"].split("/") if file_to_transfer["filetype"] == "basebackup_chunk": name = os.path.join(name_parts[-2], name_parts[-1]) + elif file_to_transfer["filetype"] == "basebackup_delta": + name = file_to_transfer["delta"]["hexdigest"] else: name = name_parts[-1] return os.path.join(file_to_transfer["prefix"], file_to_transfer["filetype"], name) @@ -139,7 +142,7 @@ def run(self): if oper == "upload": if filetype == "xlog": self.state[site][oper]["xlog"]["xlogs_since_basebackup"] += 1 - elif filetype == "basebackup": + elif filetype in {"basebackup", "basebackup_delta"}: # reset corresponding xlog stats at basebackup self.state[site][oper]["xlog"]["xlogs_since_basebackup"] = 0 @@ -243,7 +246,7 @@ def handle_upload(self, site, key, file_to_transfer): else: # Basebackups may be multipart uploads, depending on the driver. 
# Swift needs to know about this so it can do possible cleanups. - multipart = file_to_transfer["filetype"] in {"basebackup", "basebackup_chunk"} + multipart = file_to_transfer["filetype"] in {"basebackup", "basebackup_chunk", "basebackup_delta"} try: self.log.info("Uploading file to object store: src=%r dst=%r", file_to_transfer["local_path"], key) storage.store_file_from_disk( diff --git a/requirements.txt b/requirements.txt index bed0bf82..de37b6fd 100644 --- a/requirements.txt +++ b/requirements.txt @@ -5,6 +5,7 @@ google-api-python-client httplib2 oauth2client psycopg2 +pydantic python-dateutil python-snappy python-systemd diff --git a/setup.py b/setup.py index 5de6b3fb..36918f41 100644 --- a/setup.py +++ b/setup.py @@ -20,6 +20,7 @@ install_requires=[ "cryptography", "psycopg2 >= 2.0.0", + "pydantic", "python-dateutil", "python-snappy >= 0.5", "requests >= 1.2.0", diff --git a/test/basebackup_delta/chunks/0af668268d0fe14c6e269760b08d80a634c421b8381df25f31fbed5e8a8c8d8b b/test/basebackup_delta/chunks/0af668268d0fe14c6e269760b08d80a634c421b8381df25f31fbed5e8a8c8d8b new file mode 100644 index 0000000000000000000000000000000000000000..51f60f4694942ce16eef9c86918b804d619dc063 GIT binary patch literal 1252 zcmeH{&ubGw6vyAQN!{pf&D5su%A^?~2YYBpp~O8zSh1id=}Bvq+DS9rEZvMI~MC$rA_8f7Akt7|Oya)@Gsmkq^ zfM%6)dT2%}^lqKTm*_05QW-~`E{GYb#!)^f>d#s}aEHPum$7%ha}0AfNh9O^U|qVW zU)BAgR^E5U5t|oCI4E=|jkiI&iV8_XqttHYsK&uU*HR`~zQb0;d8+IfrMH(pGuNG_ zbvu#n?IACdt8^H9AG{dRr3MxBi<~{Et-}<$H@~s8d5&_@$X>KvdCS};QkX?a8Y^$L KgJ{C1_oJ183_gk1qKBc1wmnv*RL5~Gcd5SFfuao z=rJgP> znkn-P>px}|;?XytVr-`268yx-pz%6_i3w 0 + ss1 = self.get_snapshot_state() + assert self.snapshot(progress=Progress()) == 0 + ss2 = self.get_snapshot_state() + print("ss1", ss1) + print("ss2", ss2) + assert ss1 == ss2 + + +@pytest.fixture(name="snapshotter") +def fixture_snapshotter(tmpdir): + src = Path(tmpdir) / "src" + src.mkdir() + dst = Path(tmpdir) / "dst" + dst.mkdir() + yield SnapshotterWithDefaults(src=src, dst=dst, globs=["*"], parallel=1) diff --git a/test/test_basebackup.py b/test/test_basebackup.py index 6ec8ca9a..480a8896 100644 --- a/test/test_basebackup.py +++ b/test/test_basebackup.py @@ -18,6 +18,7 @@ from pghoard import common, metrics, pgutil from pghoard.basebackup import PGBaseBackup +from pghoard.common import BaseBackupMode from pghoard.restore import Restore, RestoreError from pghoard.rohmu import get_transfer from pghoard.rohmu.compat import makedirs @@ -50,6 +51,7 @@ def test_parse_backup_label(self, tmpdir): connection_info=None, basebackup_path=None, compression_queue=None, + storage=None, transfer_queue=None, metrics=metrics.Metrics(statsd={}) ) @@ -80,6 +82,7 @@ def create_test_files(): connection_info=None, basebackup_path=None, compression_queue=None, + storage=None, transfer_queue=None, metrics=metrics.Metrics(statsd={}) ) @@ -174,6 +177,7 @@ def test_find_and_split_files_to_backup(self, tmpdir): connection_info=None, basebackup_path=None, compression_queue=None, + storage=None, transfer_queue=None, metrics=metrics.Metrics(statsd={}) ) @@ -240,7 +244,7 @@ def _test_create_basebackup(self, capsys, db, pghoard, mode, replica=False, acti assert "pg-version" in out assert "start-wal-segment" in out - if mode == "local-tar": + if mode in {BaseBackupMode.local_tar, BaseBackupMode.delta}: assert "end-time" in out if replica is False: assert "end-wal-segment" in out @@ -255,7 +259,7 @@ def _test_create_basebackup(self, capsys, db, pghoard, mode, replica=False, acti assert backup["metadata"]["backup-reason"] == "scheduled" assert 
backup["metadata"]["backup-decision-time"] == now.isoformat() assert backup["metadata"]["normalized-backup-time"] == now.isoformat() - if mode == "local-tar": + if mode in {BaseBackupMode.local_tar, BaseBackupMode.delta}: if replica is False: assert "end-wal-segment" in backup["metadata"] assert "end-time" in backup["metadata"] @@ -313,6 +317,7 @@ def _test_restore_basebackup(self, db, pghoard, tmpdir, active_backup_mode="arch connection_info=None, basebackup_path=None, compression_queue=None, + storage=storage, transfer_queue=None, metrics=metrics.Metrics(statsd={}) ) @@ -341,28 +346,31 @@ def _test_basebackups(self, capsys, db, pghoard, tmpdir, mode, *, replica=False) self._test_restore_basebackup(db, pghoard, tmpdir) def test_basic_standalone_hot_backups(self, capsys, db, pghoard, tmpdir): - self._test_create_basebackup(capsys, db, pghoard, "basic", False, "standalone_hot_backup") + self._test_create_basebackup(capsys, db, pghoard, BaseBackupMode.basic, False, "standalone_hot_backup") self._test_restore_basebackup(db, pghoard, tmpdir, "standalone_hot_backup") def test_pipe_standalone_hot_backups(self, capsys, db, pghoard, tmpdir): - self._test_create_basebackup(capsys, db, pghoard, "pipe", False, "standalone_hot_backup") + self._test_create_basebackup(capsys, db, pghoard, BaseBackupMode.pipe, False, "standalone_hot_backup") self._test_restore_basebackup(db, pghoard, tmpdir, "standalone_hot_backup") def test_basebackups_basic(self, capsys, db, pghoard, tmpdir): - self._test_basebackups(capsys, db, pghoard, tmpdir, "basic") + self._test_basebackups(capsys, db, pghoard, tmpdir, BaseBackupMode.basic) def test_basebackups_basic_lzma(self, capsys, db, pghoard_lzma, tmpdir): - self._test_basebackups(capsys, db, pghoard_lzma, tmpdir, "basic") + self._test_basebackups(capsys, db, pghoard_lzma, tmpdir, BaseBackupMode.basic) + + def test_basebackups_delta(self, capsys, db, pghoard, tmpdir): + self._test_basebackups(capsys, db, pghoard, tmpdir, BaseBackupMode.delta) def test_basebackups_local_tar_nonexclusive(self, capsys, db, pghoard, tmpdir): if db.pgver < "9.6": pytest.skip("PostgreSQL 9.6+ required for non-exclusive backups") - self._test_basebackups(capsys, db, pghoard, tmpdir, "local-tar") + self._test_basebackups(capsys, db, pghoard, tmpdir, BaseBackupMode.local_tar) def test_basebackups_local_tar_legacy(self, capsys, db, pghoard, tmpdir): if db.pgver >= "9.6": pytest.skip("PostgreSQL < 9.6 required for exclusive backup tests") - self._test_basebackups(capsys, db, pghoard, tmpdir, "local-tar") + self._test_basebackups(capsys, db, pghoard, tmpdir, BaseBackupMode.local_tar) def test_basebackups_local_tar_exclusive_conflict(self, capsys, db, pghoard, tmpdir): if db.pgver >= "9.6": @@ -375,7 +383,7 @@ def test_basebackups_local_tar_exclusive_conflict(self, capsys, db, pghoard, tmp cursor = conn.cursor() cursor.execute("SELECT pg_start_backup('conflicting')") # pylint: disable=used-before-assignment need_stop = True - self._test_basebackups(capsys, db, pghoard, tmpdir, "local-tar") + self._test_basebackups(capsys, db, pghoard, tmpdir, BaseBackupMode.local_tar) need_stop = False finally: if need_stop: @@ -394,14 +402,14 @@ def test_basebackups_local_tar_pgespresso(self, capsys, db, pghoard, tmpdir): pytest.skip("pgespresso not available") try: cursor.execute("CREATE EXTENSION pgespresso") - self._test_basebackups(capsys, db, pghoard, tmpdir, "local-tar") + self._test_basebackups(capsys, db, pghoard, tmpdir, BaseBackupMode.local_tar) finally: cursor.execute("DROP EXTENSION pgespresso") def 
test_basebackups_replica_local_tar_nonexclusive(self, capsys, recovery_db, pghoard, tmpdir): if recovery_db.pgver < "9.6": pytest.skip("PostgreSQL 9.6+ required for non-exclusive backups") - self._test_basebackups(capsys, recovery_db, pghoard, tmpdir, "local-tar", replica=True) + self._test_basebackups(capsys, recovery_db, pghoard, tmpdir, BaseBackupMode.local_tar, replica=True) def test_basebackups_replica_local_tar_pgespresso(self, capsys, recovery_db, pghoard, tmpdir): conn_str = pgutil.create_connection_string(recovery_db.user) @@ -411,10 +419,10 @@ def test_basebackups_replica_local_tar_pgespresso(self, capsys, recovery_db, pgh cursor.execute("SELECT 1 FROM pg_available_extensions WHERE name = 'pgespresso' AND default_version >= '1.2'") if not cursor.fetchone(): pytest.skip("pgespresso not available") - self._test_basebackups(capsys, recovery_db, pghoard, tmpdir, "local-tar", replica=True) + self._test_basebackups(capsys, recovery_db, pghoard, tmpdir, BaseBackupMode.local_tar, replica=True) def test_basebackups_pipe(self, capsys, db, pghoard, tmpdir): - self._test_basebackups(capsys, db, pghoard, tmpdir, "pipe") + self._test_basebackups(capsys, db, pghoard, tmpdir, BaseBackupMode.pipe) def test_basebackups_tablespaces(self, capsys, db, pghoard, tmpdir): # Create a test tablespace for this instance, but make sure we drop it at the end of the test as the diff --git a/test/test_pghoard.py b/test/test_pghoard.py index 14ed88be..aab62b4c 100644 --- a/test/test_pghoard.py +++ b/test/test_pghoard.py @@ -5,18 +5,23 @@ See LICENSE for details """ import datetime +import io import json import os +import tarfile import time +from pathlib import Path from unittest.mock import Mock, patch import psycopg2 -from pghoard.common import (create_alert_file, delete_alert_file, write_json_file) +from pghoard import common +from pghoard.common import (BaseBackupFormat, create_alert_file, delete_alert_file, write_json_file) from pghoard.pghoard import PGHoard from pghoard.pgutil import create_connection_string - # pylint: disable=attribute-defined-outside-init +from pghoard.rohmu import rohmufile + from .base import PGHoardTestCase @@ -340,6 +345,118 @@ def write_backup_and_wal_files(what): assert len(basebackups) == 1 assert len(os.listdir(wal_storage_path)) == 1 + def test_local_refresh_backup_list_and_delete_old_delta_format(self): + basebackup_storage_path = os.path.join(self.local_storage_dir, "basebackup") + basebackup_delta_path = os.path.join(self.local_storage_dir, "basebackup_delta") + + os.makedirs(basebackup_storage_path) + os.makedirs(basebackup_delta_path) + + self.pghoard.set_state_defaults(self.test_site) + assert self.pghoard.get_remote_basebackups_info(self.test_site) == [] + + def write_backup_files(what): + for bb, bb_data in what.items(): + wal_start, hexdigests = bb_data + if bb: + bb_path = os.path.join(basebackup_storage_path, bb) + date_parts = [int(part) for part in bb.replace("_", "-").split("-")] + start_time = datetime.datetime(*date_parts, tzinfo=datetime.timezone.utc) + + metadata = { + "manifest": { + "snapshot_result": { + "state": { + "files": [{ + "relative_path": h, + "hexdigest": h + } for h in hexdigests] + } + } + } + } + mtime = time.time() + blob = io.BytesIO(common.json_encode(metadata, binary=True)) + ti = tarfile.TarInfo(name=".pghoard_tar_metadata.json") + ti.size = len(blob.getbuffer()) + ti.mtime = mtime + + with open(bb_path, "wb") as fp: + with rohmufile.file_writer( + compression_algorithm="snappy", compression_level=0, fileobj=fp + ) as output_obj: + with 
tarfile.TarFile(fileobj=output_obj, mode="w") as tar: + tar.addfile(ti, blob) + input_size = output_obj.tell() + + for h in hexdigests: + with open(Path(basebackup_delta_path) / h, "w") as digest_file, \ + open((Path(basebackup_delta_path) / (h + ".metadata")), "w") as digest_meta_file: + json.dump({}, digest_file) + json.dump({}, digest_meta_file) + + with open(bb_path + ".metadata", "w") as fp: + json.dump({ + "start-wal-segment": wal_start, + "start-time": start_time.isoformat(), + "format": BaseBackupFormat.delta_v1, + "compression-algorithm": "snappy", + "original-file-size": input_size + }, fp) + + backups_and_delta = { + "2015-08-25_0": ( + "000000010000000A000000AA", [ + "214967296374cae6f099e19910b33a0893f0abc62f50601baa2875ab055cd27b", + "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855" + ] + ), + "2015-08-25_1": [ + "000000020000000A000000AB", ["214967296374cae6f099e19910b33a0893f0abc62f50601baa2875ab055cd27b"] + ], + "2015-08-25_2": [ + "000000030000000A000000AC", ["214967296374cae6f099e19910b33a0893f0abc62f50601baa2875ab055cd27b"] + ], + "2015-08-25_3": [ + "000000040000000B00000003", + [ + "214967296374cae6f099e19910b33a0893f0abc62f50601baa2875ab055cd27b", + "4b65df4d0857bbbcb22aa086e02bd8414a9f3a484869f2b96ed7c62f3c4eb088" + ] + ], + } + write_backup_files(backups_and_delta) + basebackups = self.pghoard.get_remote_basebackups_info(self.test_site) + assert len(basebackups) == 4 + self.pghoard.refresh_backup_list_and_delete_old(self.test_site) + basebackups = self.pghoard.get_remote_basebackups_info(self.test_site) + assert len(basebackups) == 1 + + left_delta_files = [p for p in os.listdir(basebackup_delta_path) if not p.endswith(".metadata")] + assert sorted(left_delta_files) == [ + "214967296374cae6f099e19910b33a0893f0abc62f50601baa2875ab055cd27b", + "4b65df4d0857bbbcb22aa086e02bd8414a9f3a484869f2b96ed7c62f3c4eb088" + ] + + new_delta_data = { + "2015-08-25_4": ( + "000000040000000B00000004", [ + "fc61c91430dcb345001306ad513f103380c16896093a17868fc909aeda393559", + ] + ) + } + write_backup_files(new_delta_data) + basebackups = self.pghoard.get_remote_basebackups_info(self.test_site) + assert len(basebackups) == 2 + self.pghoard.refresh_backup_list_and_delete_old(self.test_site) + basebackups = self.pghoard.get_remote_basebackups_info(self.test_site) + assert len(basebackups) == 1 + + left_delta_files = [p for p in os.listdir(basebackup_delta_path) if not p.endswith(".metadata")] + assert sorted(left_delta_files) == [ + "fc61c91430dcb345001306ad513f103380c16896093a17868fc909aeda393559", + ] + def test_alert_files(self): alert_file_path = os.path.join(self.config["alert_file_dir"], "test_alert") create_alert_file(self.pghoard.config, "test_alert") @@ -443,14 +560,17 @@ def test_pause_on_disk_full(self, db, pghoard_separate_volume): os.makedirs(wal_directory, exist_ok=True) pghoard.receivexlog_listener(pghoard.test_site, db.user, wal_directory) - # Create 20 new WAL segments in very quick succession. Our volume for incoming WALs is only 100 + # Create 15 new WAL segments in very quick succession. Our volume for incoming WALs is only 100 # MiB so if logic for automatically suspending pg_receive(xlog|wal) wasn't working the volume # would certainly fill up and the files couldn't be processed. Now this should work fine. 
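
The cleanup behaviour exercised by this test hinges on the manifest embedded in each backup: a delta chunk may only be deleted once no remaining backup references its hexdigest. Below is a self-contained sketch of reading those references back out of the .pghoard_tar_metadata.json member, using the same metadata layout the test writes; in production the tar is additionally compressed and possibly encrypted via rohmufile before storage.

    import io
    import json
    import tarfile

    metadata = {
        "manifest": {
            "snapshot_result": {
                "state": {
                    "files": [
                        {"relative_path": "base/1234",
                         "hexdigest": "0af668268d0fe14c6e269760b08d80a634c421b8381df25f31fbed5e8a8c8d8b"},
                        # small files are embedded and carry no digest
                        {"relative_path": "PG_VERSION", "hexdigest": ""},
                    ]
                }
            }
        }
    }

    # Write a minimal stand-in for a delta basebackup tar ...
    buf = io.BytesIO()
    with tarfile.TarFile(fileobj=buf, mode="w") as tar:
        blob = io.BytesIO(json.dumps(metadata).encode())
        ti = tarfile.TarInfo(name=".pghoard_tar_metadata.json")
        ti.size = len(blob.getbuffer())
        tar.addfile(ti, blob)

    # ... and collect the delta hexdigests it still references.
    buf.seek(0)
    with tarfile.TarFile(fileobj=buf, mode="r") as tar:
        parsed = json.load(tar.extractfile(".pghoard_tar_metadata.json"))
    referenced = {
        f["hexdigest"]
        for f in parsed["manifest"]["snapshot_result"]["state"]["files"]
        if f["hexdigest"]
    }
    print(sorted(referenced))
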
for _ in range(16): + # Note: do not combine two function call in one select, PG executes it differently and + # sometimes looks like it generates less WAL files than we wanted + cursor.execute("SELECT txid_current()") if conn.server_version >= 100000: - cursor.execute("SELECT txid_current(), pg_switch_wal()") + cursor.execute("SELECT pg_switch_wal()") else: - cursor.execute("SELECT txid_current(), pg_switch_xlog()") + cursor.execute("SELECT pg_switch_xlog()") start = time.monotonic() site = "test_pause_on_disk_full" diff --git a/test/test_restore.py b/test/test_restore.py index 6d02bf52..41a4e9e0 100644 --- a/test/test_restore.py +++ b/test/test_restore.py @@ -19,7 +19,9 @@ import pytest from pghoard.common import write_json_file -from pghoard.restore import (BasebackupFetcher, ChunkFetcher, Restore, RestoreError, create_recovery_conf) +from pghoard.restore import ( + BasebackupFetcher, ChunkFetcher, FileDataInfo, FileInfoType, FilePathInfo, Restore, RestoreError, create_recovery_conf +) from .base import PGHoardTestCase @@ -166,7 +168,11 @@ def test_progress_tracking_and_error_handling(self): status_output_file = os.path.join(test_output_file_tmp, "pghoard-restore-status.json") pgdata = "/tmp/test_restore" tablespaces = {"foo": {"oid": 1234, "path": "/tmp/test_restore2"}} - data_files = [("bar1", 1000), ("bar2", 2000), ((b"baz", {}), 0)] + data_files = [ + FilePathInfo(name="bar1", size=1000), + FilePathInfo(name="bar2", size=2000), + FileDataInfo(data=b"baz", metadata={}, size=0) + ] fetcher = BasebackupFetcher( app_config=config, data_files=data_files, @@ -201,17 +207,17 @@ def sleep_mock(sleep_time): assert fetcher.current_progress() == (0, 0) assert fetcher.jobs_in_progress() is True progress_dict["bar1"] = 1000 - fetcher.job_completed(fetcher.data_files[0]["id"]) + fetcher.job_completed(fetcher.data_files[0].id) elif call[0] == 1: assert fetcher.current_progress() == (1000, 1000 / 3000) assert fetcher.jobs_in_progress() is True progress_dict["bar2"] = 1000 - fetcher.job_failed(fetcher.data_files[1]["id"], Exception("test exception")) + fetcher.job_failed(fetcher.data_files[1].id, Exception("test exception")) check_status_output_file(expected_progress=1000 / 3000) elif call[0] == 2: assert fetcher.current_progress() == (2000, 2000 / 3000) assert fetcher.jobs_in_progress() is True - fetcher.job_completed(fetcher.data_files[2]["id"]) + fetcher.job_completed(fetcher.data_files[2].id) check_status_output_file(expected_progress=2000 / 3000) elif call[0] == 3: assert False @@ -314,6 +320,36 @@ def _fetch_and_extract_one_backup(self, metadata, file_size, fetch_fn): with pytest.raises(RestoreError): fetcher.fetch_all() + def test_restore_from_delta_files(self): + for tar in ["tar", "pghoard/gnutaremu.py"]: + self.run_restore_test( + "basebackup_delta", + tar, + self.delta, + files=[ + "0af668268d0fe14c6e269760b08d80a634c421b8381df25f31fbed5e8a8c8d8b", + "4b65df4d0857bbbcb22aa086e02bd8414a9f3a484869f2b96ed7c62f3c4eb088", + "fc61c91430dcb345001306ad513f103380c16896093a17868fc909aeda393559", + ], + file_type=FileInfoType.delta, + ) + + def delta(self, fetcher, restore_dir): + fetcher.fetch_all() + + self.check_sha256( + os.path.join(restore_dir, "0af668268d0fe14c6e269760b08d80a634c421b8381df25f31fbed5e8a8c8d8b"), + "24f3f08b786494bdd8d1393fdf47eafe3aa4b3f51720e23b62dae812e54f6cc7" + ) + self.check_sha256( + os.path.join(restore_dir, "4b65df4d0857bbbcb22aa086e02bd8414a9f3a484869f2b96ed7c62f3c4eb088"), + "960f11a4bb45060ac3c69551e4e99d9d713e98e2968f450b8abac37dcfed86e7" + ) + self.check_sha256( + 
os.path.join(restore_dir, "fc61c91430dcb345001306ad513f103380c16896093a17868fc909aeda393559"), + "8acdc937fca22a496215056ed3960bff6d3319b9c45f3050e8edfc09d7085c27" + ) + def test_tablespaces(self): def rm_tablespace_paths(): shutil.rmtree("/tmp/nsd5b2b8e4978847ef9b3056b7e01c51a8", ignore_errors=True) @@ -375,10 +411,20 @@ def tablespaces(self, fetcher, restore_dir): "84e3bda6f1abdd0fb0aff4bc6587ea07b9d8b61c1a0d6bdc4d16d339a761717f" ) - def run_restore_test(self, path, tar_executable, logic, tablespaces=None, files=None): + def run_restore_test(self, path, tar_executable, logic, tablespaces=None, files=None, file_type=FileInfoType.regular): chunk_dir = os.path.join("test", path, "chunks") - files = [fn for fn in os.listdir(chunk_dir) if ".metadata" not in fn and (not files or fn in files)] - files = [(fn, os.stat(os.path.join(chunk_dir, fn)).st_size) for fn in files] + files_names = [fn for fn in os.listdir(chunk_dir) if ".metadata" not in fn and (not files or fn in files)] + if file_type == FileInfoType.delta: + files = [ + FilePathInfo(name=fn, size=os.stat(os.path.join(chunk_dir, fn)).st_size, file_type=file_type, new_name=fn) + for fn in files_names + ] + else: + files = [ + FilePathInfo(name=fn, size=os.stat(os.path.join(chunk_dir, fn)).st_size, file_type=file_type) + for fn in files_names + ] + with open(os.path.join("test", path, "config.json"), "r") as f: config = json.loads(f.read()) restore_dir = mkdtemp(prefix=self.__class__.__name__) diff --git a/test/test_rohmu_delta.py b/test/test_rohmu_delta.py new file mode 100644 index 00000000..e10a7fc3 --- /dev/null +++ b/test/test_rohmu_delta.py @@ -0,0 +1,80 @@ +# Copyright (c) 2021 Aiven, Helsinki, Finland. https://aiven.io/ +import os + +import pytest + +from pghoard.rohmu.delta.common import Progress, SnapshotHash + + +@pytest.mark.timeout(2) +def test_snapshot(snapshotter): + with snapshotter.lock: + # Start with empty + assert snapshotter.snapshot(progress=Progress()) == 0 + src = snapshotter.src + dst = snapshotter.dst + assert not (dst / "foo").is_file() + + # Create files in src, run snapshot + snapshotter.create_4foobar() + ss2 = snapshotter.get_snapshot_state() + + assert (dst / "foo").is_file() + assert (dst / "foo").read_text() == "foobar" + assert (dst / "foo2").read_text() == "foobar" + + hashes = snapshotter.get_snapshot_hashes() + assert len(hashes) == 1 + assert hashes == [ + SnapshotHash(hexdigest="c6479bce75c9a573ba073af83191c280721170793da6e9e9480201de94ab0654", size=900) + ] + + while True: + (src / "foo").write_text("barfoo") # same length + if snapshotter.snapshot(progress=Progress()) > 0: + # Sometimes fails on first iteration(s) due to same mtime + # (inaccurate timestamps) + break + ss3 = snapshotter.get_snapshot_state() + assert ss2 != ss3 + assert snapshotter.snapshot(progress=Progress()) == 0 + assert (dst / "foo").is_file() + assert (dst / "foo").read_text() == "barfoo" + + # Remove file from src, run snapshot + for filename in ["foo", "foo2", "foobig", "foobig2"]: + (src / filename).unlink() + assert snapshotter.snapshot(progress=Progress()) > 0 + assert snapshotter.snapshot(progress=Progress()) == 0 + assert not (dst / filename).is_file() + + # Now shouldn't have any data hashes + hashes_empty = snapshotter.get_snapshot_hashes() + assert not hashes_empty + + with pytest.raises(AssertionError): + snapshotter.snapshot(progress=Progress()) + + with pytest.raises(AssertionError): + snapshotter.get_snapshot_state() + + with pytest.raises(AssertionError): + snapshotter.get_snapshot_hashes() + + 
+@pytest.mark.parametrize("test", [(os, "link", 1, 1), (None, "_snapshotfile_from_path", 3, 0)]) +def test_snapshot_error_filenotfound(snapshotter, mocker, test): + (obj, fun, exp_progress_1, exp_progress_2) = test + + def _not_really_found(*a, **kw): + raise FileNotFoundError + + obj = obj or snapshotter + mocker.patch.object(obj, fun, new=_not_really_found) + (snapshotter.src / "foo").write_text("foobar") + (snapshotter.src / "bar").write_text("foobar") + with snapshotter.lock: + progress = Progress() + assert snapshotter.snapshot(progress=progress) == exp_progress_1 + progress = Progress() + assert snapshotter.snapshot(progress=progress) == exp_progress_2 From 366467efd8329cac92ca42fa99d6aa332e06a857 Mon Sep 17 00:00:00 2001 From: Alexander Olekhnovich Date: Fri, 9 Apr 2021 16:42:40 +0200 Subject: [PATCH 2/2] Add metrics for delta base backups * Ratio between new files count/size uploaded and total backup size * total size/count for the backup, upload size/count * amount of data remained for current backup (not uploaded) * backup time for different modes --- pghoard/basebackup.py | 9 ++++++-- pghoard/basebackup_delta.py | 41 +++++++++++++++++++++++++++++++------ 2 files changed, 42 insertions(+), 8 deletions(-) diff --git a/pghoard/basebackup.py b/pghoard/basebackup.py index 9aeb16e4..0ac4084a 100644 --- a/pghoard/basebackup.py +++ b/pghoard/basebackup.py @@ -780,11 +780,16 @@ def run_local_tar_basebackup(self, delta=False): db_conn.commit() backup_stopped = True + backup_time = time.monotonic() - start_time + self.metrics.gauge( + "pghoard.backup_time_{}".format(self.site_config["basebackup_mode"]), + backup_time, + ) + self.log.info( "Basebackup generation finished, %r files, %r chunks, " "%r byte input, %r byte output, took %r seconds, waiting to upload", total_file_count, chunks_count, - total_size_plain, total_size_enc, - time.monotonic() - start_time + total_size_plain, total_size_enc, backup_time ) finally: diff --git a/pghoard/basebackup_delta.py b/pghoard/basebackup_delta.py index c5922540..04e2c7df 100644 --- a/pghoard/basebackup_delta.py +++ b/pghoard/basebackup_delta.py @@ -188,7 +188,7 @@ def _upload_files(self, callback_queue, todo, snapshotter): todo_hexdigests = set(hash.hexdigest for hash in todo) # Keep track on the new hashes submitted with the current upload, so we can clean them up in case of error - new_submitted_hashes = set() + new_submitted_hashes = dict() def _submit_files_in_thread(hexdigest): files = snapshotter.hexdigest_to_snapshotfiles.get(hexdigest, []) @@ -203,7 +203,7 @@ def _submit_files_in_thread(hexdigest): file_obj=f, relative_path=snapshotfile.relative_path, callback_queue=callback_queue ) if new_hash not in self.tracked_snapshot_files: - new_submitted_hashes.add(new_hash) + new_submitted_hashes[new_hash] = stored_size snapshotter.update_snapshot_file_data( relative_path=snapshotfile.relative_path, hexdigest=new_hash, @@ -252,46 +252,75 @@ def _submit_files_in_thread(hexdigest): raise BackupFailure("Error while uploading backup files") + uploaded_size = sum(new_submitted_hashes.values()) + uploaded_count = len(new_submitted_hashes) + + self.metrics.increase("pghoard.delta_backup_total_size", inc_value=uploaded_size) + self.metrics.gauge("pghoard.delta_backup_upload_size", uploaded_size) + self.metrics.increase("pghoard.delta_backup_total_files", inc_value=uploaded_count) + self.metrics.gauge("pghoard.delta_backup_upload_files", uploaded_count) + self.log.info("All basebackup files were uploaded successfully") + return uploaded_count, uploaded_size 
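
The metrics listed in the second commit message reduce to simple arithmetic over what _upload_files() returns and what the snapshot already tracks; the ratio gauges themselves are emitted in _delta_upload() just below. A rough sketch with invented numbers (only the metric names come from the patch):

    # Invented example: 3 changed data files re-uploaded out of 120 tracked,
    # 60 MiB uploaded out of ~4 GiB of stored backup data.
    uploaded_count, total_digests_count = 3, 120
    uploaded_size, total_digests_stored_size = 60 * 2**20, 4 * 2**30

    gauges = {
        "pghoard.delta_backup_upload_files": uploaded_count,
        "pghoard.delta_backup_upload_size": uploaded_size,
        # the lower these two ratios, the more effective delta mode is
        "pghoard.delta_backup_changed_data_files_ratio": uploaded_count / total_digests_count,
        "pghoard.delta_backup_changed_data_size_ratio": uploaded_size / total_digests_stored_size,
        # data that did not need to be re-uploaded for this backup
        "pghoard.delta_backup_remained_data_size": total_digests_stored_size - uploaded_size,
    }
    for name, value in gauges.items():
        print(name, value)
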
+ def _delta_upload(self, snapshot_result: SnapshotResult, snapshotter: Snapshotter, start_time_utc): callback_queue = Queue() # Determine which digests already exist and which need to be uploaded, also restore the backup size of re-used # files from manifests snapshot_hashes = set(snapshot_result.hashes) - uploaded_hashes = set() + already_uploaded_hashes = set() for snapshot_file in snapshot_result.state.files: if snapshot_file.hexdigest in self.tracked_snapshot_files: snapshot_file_from_manifest = self.tracked_snapshot_files[snapshot_file.hexdigest] - uploaded_hashes.add( + already_uploaded_hashes.add( SnapshotHash( hexdigest=snapshot_file_from_manifest.hexdigest, size=snapshot_file_from_manifest.file_size ) ) - todo = snapshot_hashes.difference(uploaded_hashes) + todo = snapshot_hashes.difference(already_uploaded_hashes) todo_count = len(todo) self.log.info("Submitting hashes for upload: %r, total hashes in the snapshot: %r", todo_count, len(snapshot_hashes)) - self._upload_files(callback_queue=callback_queue, todo=todo, snapshotter=snapshotter) + uploaded_count, uploaded_size = self._upload_files(callback_queue=callback_queue, todo=todo, snapshotter=snapshotter) total_stored_size = 0 total_size = 0 + total_digests_count = 0 + total_digests_stored_size = 0 snapshot_result.state = snapshotter.get_snapshot_state() for snapshot_file in snapshot_result.state.files: total_size += snapshot_file.file_size if snapshot_file.hexdigest: + total_digests_count += 1 if not snapshot_file.stored_file_size: # Patch existing files with stored_file_size from existing manifest files (we can not have it otherwise) snapshot_file.stored_file_size = self.tracked_snapshot_files[snapshot_file.hexdigest].stored_file_size total_stored_size += snapshot_file.stored_file_size + total_digests_stored_size += snapshot_file.stored_file_size elif snapshot_file.content_b64: # Include embed files size into the total size as well total_stored_size += snapshot_file.file_size + if already_uploaded_hashes: + # Collect these metrics for all delta backups, except the first one + # The lower the number of those metrics, the more efficient delta backups are + if total_digests_count: + self.metrics.gauge("pghoard.delta_backup_changed_data_files_ratio", uploaded_count / total_digests_count) + if total_digests_stored_size: + self.metrics.gauge( + "pghoard.delta_backup_changed_data_size_ratio", + uploaded_size / total_digests_stored_size, + ) + self.metrics.gauge( + "pghoard.delta_backup_remained_data_size", + total_digests_stored_size - uploaded_size, + ) + manifest = BackupManifest( start=start_time_utc, snapshot_result=snapshot_result,