diff --git a/.gitignore b/.gitignore index 86de2f55..da3014de 100644 --- a/.gitignore +++ b/.gitignore @@ -18,3 +18,4 @@ __pycache__/ /venv/ /.venv/ *.orig +/pghoard-rpm-src/ diff --git a/.pylintrc b/.pylintrc index 81017922..deeb75a2 100644 --- a/.pylintrc +++ b/.pylintrc @@ -28,7 +28,11 @@ disable= [FORMAT] max-line-length=125 +max-module-lines=1000 [REPORTS] output-format=text reports=no + +[TYPECHECK] +extension-pkg-whitelist=pydantic diff --git a/Makefile b/Makefile index 02a33ce6..b46472a8 100644 --- a/Makefile +++ b/Makefile @@ -1,4 +1,4 @@ -short_ver = 2.1.1 +short_ver = $(shell git describe --abbrev=0) long_ver = $(shell git describe --long 2>/dev/null || echo $(short_ver)-0-unknown-g`git describe --always`) generated = pghoard/version.py diff --git a/pghoard.spec b/pghoard.spec index 63fee15a..b3c2f762 100644 --- a/pghoard.spec +++ b/pghoard.spec @@ -7,7 +7,7 @@ License: ASL 2.0 Source0: pghoard-rpm-src.tar Requires: systemd Requires: python3-botocore, python3-cryptography >= 0.8, python3-dateutil -Requires: python3-psycopg2, python3-requests, python3-snappy +Requires: python3-psycopg2, python3-requests, python3-snappy, python3-zstandard, python3-pydantic Conflicts: pgespresso92 < 1.2, pgespresso93 < 1.2, pgespresso94 < 1.2, pgespresso95 < 1.2 BuildRequires: python3-flake8, python3-pytest, python3-pylint, python3-devel, golang diff --git a/pghoard/basebackup.py b/pghoard/basebackup.py index 4efed7c4..0ac4084a 100644 --- a/pghoard/basebackup.py +++ b/pghoard/basebackup.py @@ -14,9 +14,11 @@ import subprocess import time from concurrent.futures import ThreadPoolExecutor +from dataclasses import dataclass from queue import Empty, Queue from tempfile import NamedTemporaryFile from threading import Thread +from typing import Optional import psycopg2 @@ -25,8 +27,10 @@ # pylint: disable=superfluous-parens from . 
import common, version, wal +from .basebackup_delta import DeltaBaseBackup from .common import ( - connection_string_using_pgpass, replication_connection_string_and_slot_using_pgpass, set_stream_nonblocking, + BackupFailure, BaseBackupFormat, BaseBackupMode, connection_string_using_pgpass, + replication_connection_string_and_slot_using_pgpass, set_stream_nonblocking, set_subprocess_stdout_and_stderr_nonblocking, terminate_subprocess ) from .patchedtarfile import tarfile @@ -46,14 +50,38 @@ ] -class BackupFailure(Exception): - """Backup failed - post a failure to callback_queue and allow the thread to terminate""" - - class NoException(BaseException): """Exception that's never raised, used in conditional except blocks""" +@dataclass(frozen=True) +class EncryptionData: + encryption_key_id: Optional[str] + rsa_public_key: Optional[str] + + @staticmethod + def from_site_config(site_config) -> "EncryptionData": + encryption_key_id = site_config["encryption_key_id"] + if encryption_key_id: + rsa_public_key = site_config["encryption_keys"][encryption_key_id]["public"] + else: + rsa_public_key = None + + return EncryptionData(encryption_key_id=encryption_key_id, rsa_public_key=rsa_public_key) + + +@dataclass(frozen=True) +class CompressionData: + algorithm: str + level: int + + @staticmethod + def from_config(config) -> "CompressionData": + algorithm = config["compression"]["algorithm"] + level = config["compression"]["level"] + return CompressionData(algorithm=algorithm, level=level) + + class PGBaseBackup(Thread): def __init__( self, @@ -63,10 +91,12 @@ def __init__( basebackup_path, compression_queue, metrics, + storage, transfer_queue=None, callback_queue=None, pg_version_server=None, - metadata=None + metadata=None, + get_remote_basebackups_info=None ): super().__init__() self.log = logging.getLogger("PGBaseBackup") @@ -84,15 +114,19 @@ def __init__( self.pid = None self.pg_version_server = pg_version_server self.latest_activity = datetime.datetime.utcnow() + self.storage = storage + self.get_remote_basebackups_info = get_remote_basebackups_info def run(self): try: - basebackup_mode = self.config["backup_sites"][self.site]["basebackup_mode"] - if basebackup_mode == "basic": + basebackup_mode = self.site_config["basebackup_mode"] + if basebackup_mode == BaseBackupMode.basic: self.run_basic_basebackup() - elif basebackup_mode == "local-tar": + elif basebackup_mode == BaseBackupMode.local_tar: self.run_local_tar_basebackup() - elif basebackup_mode == "pipe": + elif basebackup_mode == BaseBackupMode.delta: + self.run_local_tar_basebackup(delta=True) + elif basebackup_mode == BaseBackupMode.pipe: self.run_piped_basebackup() else: raise errors.InvalidConfigurationError("Unsupported basebackup_mode {!r}".format(basebackup_mode)) @@ -129,7 +163,7 @@ def get_paths_for_backup(basebackup_path): def get_command_line(self, output_name): command = [ - self.config["backup_sites"][self.site]["pg_basebackup_path"], + self.site_config["pg_basebackup_path"], "--format", "tar", "--label", @@ -139,7 +173,7 @@ def get_command_line(self, output_name): output_name, ] - if self.config["backup_sites"][self.site]["active_backup_mode"] == "standalone_hot_backup": + if self.site_config["active_backup_mode"] == "standalone_hot_backup": if self.pg_version_server >= 100000: command.extend(["--wal-method=fetch"]) else: @@ -169,9 +203,9 @@ def check_command_success(self, proc, output_file): def basebackup_compression_pipe(self, proc, basebackup_path): rsa_public_key = None - encryption_key_id = 
self.config["backup_sites"][self.site]["encryption_key_id"] + encryption_key_id = self.site_config["encryption_key_id"] if encryption_key_id: - rsa_public_key = self.config["backup_sites"][self.site]["encryption_keys"][encryption_key_id]["public"] + rsa_public_key = self.site_config["encryption_keys"][encryption_key_id]["public"] compression_algorithm = self.config["compression"]["algorithm"] compression_level = self.config["compression"]["level"] self.log.debug("Compressing basebackup directly to file: %r", basebackup_path) @@ -461,25 +495,30 @@ def add_entry(archive_path, local_path, *, missing_ok): yield from add_directory(archive_path, local_path, missing_ok=False) yield archive_path, local_path, False, "leave" + @property + def site_config(self): + return self.config["backup_sites"][self.site] + + @property + def encryption_data(self) -> EncryptionData: + return EncryptionData.from_site_config(self.site_config) + + @property + def compression_data(self) -> CompressionData: + return CompressionData.from_config(self.config) + def tar_one_file( self, *, temp_dir, chunk_path, files_to_backup, callback_queue, filetype="basebackup_chunk", extra_metadata=None ): start_time = time.monotonic() - site_config = self.config["backup_sites"][self.site] - encryption_key_id = site_config["encryption_key_id"] - if encryption_key_id: - rsa_public_key = site_config["encryption_keys"][encryption_key_id]["public"] - else: - rsa_public_key = None - with NamedTemporaryFile(dir=temp_dir, prefix=os.path.basename(chunk_path), suffix=".tmp") as raw_output_obj: # pylint: disable=bad-continuation with rohmufile.file_writer( - compression_algorithm=self.config["compression"]["algorithm"], - compression_level=self.config["compression"]["level"], - compression_threads=site_config["basebackup_compression_threads"], - rsa_public_key=rsa_public_key, + compression_algorithm=self.compression_data.algorithm, + compression_level=self.compression_data.level, + compression_threads=self.site_config["basebackup_compression_threads"], + rsa_public_key=self.encryption_data.rsa_public_key, fileobj=raw_output_obj ) as output_obj: with tarfile.TarFile(fileobj=output_obj, mode="w") as output_tar: @@ -492,7 +531,7 @@ def tar_one_file( os.link(raw_output_obj.name, chunk_path) rohmufile.log_compression_result( - encrypted=bool(encryption_key_id), + encrypted=bool(self.encryption_data.encryption_key_id), elapsed=time.monotonic() - start_time, original_size=input_size, result_size=result_size, @@ -505,16 +544,16 @@ def tar_one_file( "pghoard.compressed_size_ratio", size_ratio, tags={ - "algorithm": self.config["compression"]["algorithm"], + "algorithm": self.compression_data.algorithm, "site": self.site, "type": "basebackup", } ) metadata = { - "compression-algorithm": self.config["compression"]["algorithm"], - "encryption-key-id": encryption_key_id, - "format": "pghoard-bb-v2", + "compression-algorithm": self.compression_data.algorithm, + "encryption-key-id": self.encryption_data.encryption_key_id, + "format": BaseBackupFormat.v2, "original-file-size": input_size, "host": socket.gethostname(), } @@ -573,9 +612,8 @@ def create_and_upload_chunks(self, chunks, data_file_format, temp_base_dir): self.chunks_on_disk = 0 i = 0 - site_config = self.config["backup_sites"][self.site] - max_chunks_on_disk = site_config["basebackup_chunks_in_progress"] - threads = site_config["basebackup_threads"] + max_chunks_on_disk = self.site_config["basebackup_chunks_in_progress"] + threads = self.site_config["basebackup_threads"] with 
ThreadPoolExecutor(max_workers=threads) as tpe: pending_compress_and_encrypt_tasks = [] while i < len(chunks): @@ -612,8 +650,9 @@ def create_and_upload_chunks(self, chunks, data_file_format, temp_base_dir): return chunk_files - def run_local_tar_basebackup(self): - pgdata = self.config["backup_sites"][self.site]["pg_data_directory"] + def run_local_tar_basebackup(self, delta=False): + control_files_metadata_extra = {} + pgdata = self.site_config["pg_data_directory"] if not os.path.isdir(pgdata): raise errors.InvalidConfigurationError("pg_data_directory {!r} does not exist".format(pgdata)) @@ -622,7 +661,7 @@ def run_local_tar_basebackup(self): data_file_format = "{}/{}.{{0:08d}}.pghoard".format(compressed_base, os.path.basename(compressed_base)).format # Default to 2GB chunks of uncompressed data - target_chunk_size = self.config["backup_sites"][self.site]["basebackup_chunk_size"] + target_chunk_size = self.site_config["basebackup_chunk_size"] self.log.debug("Connecting to database to start backup process") connection_string = connection_string_using_pgpass(self.connection_info) @@ -686,13 +725,45 @@ def run_local_tar_basebackup(self): self.log.info("Starting to backup %r and %r tablespaces to %r", pgdata, len(tablespaces), compressed_base) start_time = time.monotonic() - total_file_count, chunks = self.find_and_split_files_to_backup( - pgdata=pgdata, tablespaces=tablespaces, target_chunk_size=target_chunk_size - ) + if delta: + delta_backup = DeltaBaseBackup( + storage=self.storage, + site=self.site, + site_config=self.site_config, + transfer_queue=self.transfer_queue, + metrics=self.metrics, + encryption_data=self.encryption_data, + compression_data=self.compression_data, + get_remote_basebackups_info=self.get_remote_basebackups_info, + parallel=self.site_config["basebackup_threads"], + temp_base_dir=temp_base_dir, + compressed_base=compressed_base + ) + total_size_plain, total_size_enc, manifest, total_file_count = delta_backup.run( + pgdata=pgdata, + src_iterate_func=lambda: ( + item[1] + for item in self.find_files_to_backup(pgdata=pgdata, tablespaces=tablespaces) + if not item[1].endswith(".pem") # Exclude such files like "dh1024.pem" + ), + ) + + chunks_count = total_file_count + control_files_metadata_extra["manifest"] = manifest.jsondict() + self.metadata["format"] = BaseBackupFormat.delta_v1 + else: + total_file_count, chunks = self.find_and_split_files_to_backup( + pgdata=pgdata, tablespaces=tablespaces, target_chunk_size=target_chunk_size + ) + chunks_count = len(chunks) + # Tar up the chunks and submit them for upload; note that we start from chunk 1 here; chunk 0 + # is reserved for special files and metadata and will be generated last. + chunk_files = self.create_and_upload_chunks(chunks, data_file_format, temp_base_dir) - # Tar up the chunks and submit them for upload; note that we start from chunk 1 here; chunk 0 - # is reserved for special files and metadata and will be generated last. 
- chunk_files = self.create_and_upload_chunks(chunks, data_file_format, temp_base_dir) + total_size_plain = sum(item["input_size"] for item in chunk_files) + total_size_enc = sum(item["result_size"] for item in chunk_files) + + control_files_metadata_extra["chunks"] = chunk_files # Everything is now tarred up, grab the latest pg_control and stop the backup process with open(os.path.join(pgdata, "global", "pg_control"), "rb") as fp: @@ -709,14 +780,16 @@ def run_local_tar_basebackup(self): db_conn.commit() backup_stopped = True - total_size_plain = sum(item["input_size"] for item in chunk_files) - total_size_enc = sum(item["result_size"] for item in chunk_files) + backup_time = time.monotonic() - start_time + self.metrics.gauge( + "pghoard.backup_time_{}".format(self.site_config["basebackup_mode"]), + backup_time, + ) self.log.info( "Basebackup generation finished, %r files, %r chunks, " - "%r byte input, %r byte output, took %r seconds, waiting to upload", total_file_count, len(chunk_files), - total_size_plain, total_size_enc, - time.monotonic() - start_time + "%r byte input, %r byte output, took %r seconds, waiting to upload", total_file_count, chunks_count, + total_size_plain, total_size_enc, backup_time ) finally: @@ -740,13 +813,13 @@ def run_local_tar_basebackup(self): "backup_end_wal_segment": backup_end_wal_segment, "backup_start_time": backup_start_time, "backup_start_wal_segment": backup_start_wal_segment, - "chunks": chunk_files, "pgdata": pgdata, "pghoard_object": "basebackup", "pghoard_version": version.__version__, "tablespaces": tablespaces, "host": socket.gethostname(), } + metadata.update(control_files_metadata_extra) control_files = list( self.get_control_entries_for_tar( metadata=metadata, diff --git a/pghoard/basebackup_delta.py b/pghoard/basebackup_delta.py new file mode 100644 index 00000000..04e2c7df --- /dev/null +++ b/pghoard/basebackup_delta.py @@ -0,0 +1,354 @@ +# Copyright (c) 2021 Aiven, Helsinki, Finland. 
https://aiven.io/ +import hashlib +import io +import logging +import os +import shutil +import socket +import threading +import time +import uuid +from contextlib import suppress +from multiprocessing.dummy import Pool +from queue import Empty, Queue +from tempfile import NamedTemporaryFile +from typing import Callable, Dict + +from pghoard.common import (BackupFailure, BaseBackupFormat, extract_pghoard_delta_v1_metadata) +from pghoard.rohmu import rohmufile +from pghoard.rohmu.dates import now +from pghoard.rohmu.delta.common import (BackupManifest, SnapshotFile, SnapshotHash, SnapshotResult, SnapshotUploadResult) +from pghoard.rohmu.delta.snapshot import Snapshotter +from pghoard.rohmu.errors import FileNotFoundFromStorageError + + +class DeltaBaseBackup: + def __init__( + self, *, storage, site, site_config, transfer_queue, metrics, encryption_data, compression_data, + get_remote_basebackups_info, parallel, temp_base_dir, compressed_base + ): + self.log = logging.getLogger("DeltaBaseBackup") + self.storage = storage + self.site = site + self.site_config = site_config + self.transfer_queue = transfer_queue + self.metrics = metrics + self.encryption_data = encryption_data + self.compression_data = compression_data + self.get_remote_basebackups_info = get_remote_basebackups_info + self.parallel = parallel + self.temp_base_dir = temp_base_dir + self.compressed_base = compressed_base + self.submitted_hashes_lock = threading.Lock() + self.submitted_hashes = set() + self.tracked_snapshot_files: Dict[str, SnapshotFile] = self._list_existing_files() + + def _snapshot(self, snapshotter) -> SnapshotResult: + snapshotter.snapshot(reuse_old_snapshotfiles=False) + snapshot_result = SnapshotResult() + snapshot_result.state = snapshotter.get_snapshot_state() + snapshot_result.hashes = [ + SnapshotHash(hexdigest=ssfile.hexdigest, size=ssfile.file_size) + for ssfile in snapshot_result.state.files + if ssfile.hexdigest + ] + snapshot_result.files = len(snapshot_result.state.files) + snapshot_result.total_size = sum(ssfile.file_size for ssfile in snapshot_result.state.files) + snapshot_result.end = now() + + self.log.debug("snapshot result: %r", snapshot_result.json()) + + return snapshot_result + + def _upload_hexdigest_from_file(self, file_obj, relative_path, callback_queue): + """Upload file into an object with a temporary name, after upload is finished, it will be copied to a new name + with a final/real hash""" + temp_object_name = f"temp_{uuid.uuid4()}" + return self._delta_upload_hexdigest( + callback_queue=callback_queue, + chunk_path=f"{self.compressed_base}/{temp_object_name}", + temp_dir=self.temp_base_dir, + file_obj=file_obj, + relative_path=relative_path + ) + + def _list_existing_files(self): + """Iterate through all manifest files and fetch information about hash files""" + all_snapshot_files: Dict[str, SnapshotFile] = {} + for backup in self.get_remote_basebackups_info(self.site): + if backup["metadata"].get("format") != BaseBackupFormat.delta_v1: + continue + + key = os.path.join(self.site_config["prefix"], "basebackup", backup["name"]) + bmeta_compressed = self.storage.get_contents_to_string(key)[0] + + with rohmufile.file_reader( + fileobj=io.BytesIO(bmeta_compressed), + metadata=backup["metadata"], + key_lookup=lambda key_id: self.site_config["encryption_keys"][key_id]["private"] + ) as input_obj: + meta = extract_pghoard_delta_v1_metadata(input_obj) + + manifest = meta["manifest"] + snapshot_result = manifest["snapshot_result"] + backup_state = snapshot_result["state"] + files = 
backup_state["files"] + for delta_file in files: + snapshot_file = SnapshotFile.parse_obj(delta_file) + if snapshot_file.hexdigest: + all_snapshot_files[snapshot_file.hexdigest] = snapshot_file + + return all_snapshot_files + + def _delta_upload_hexdigest(self, *, temp_dir, chunk_path, file_obj, callback_queue, relative_path): + skip_upload = False + start_time = time.monotonic() + + input_size = file_obj.seek(0, os.SEEK_END) + file_obj.seek(0) + + result_hash = hashlib.blake2s() + + def update_hash(data): + result_hash.update(data) + + with NamedTemporaryFile(dir=temp_dir, prefix=os.path.basename(chunk_path), suffix=".tmp") as raw_output_obj: + rohmufile.write_file( + input_obj=file_obj, + output_obj=raw_output_obj, + compression_algorithm=self.compression_data.algorithm, + compression_level=self.compression_data.level, + rsa_public_key=self.encryption_data.rsa_public_key, + log_func=self.log.info, + data_callback=update_hash + ) + result_size = raw_output_obj.tell() + raw_output_obj.seek(0) + + result_digest = result_hash.hexdigest() + + with self.submitted_hashes_lock: + if result_digest in self.submitted_hashes: + # file with the same hash was already submitted + self.log.debug( + "Skip uploading file %r, file with the same hash was already submitted for uploading", relative_path + ) + skip_upload = True + return input_size, result_size, result_digest, skip_upload + else: + self.submitted_hashes.add(result_digest) + + os.link(raw_output_obj.name, chunk_path) + + rohmufile.log_compression_result( + encrypted=bool(self.encryption_data.encryption_key_id), + elapsed=time.monotonic() - start_time, + original_size=input_size, + result_size=result_size, + source_name="$PGDATA delta basebackup file", + log_func=self.log.info, + ) + + if input_size: + size_ratio = result_size / input_size + self.metrics.gauge( + "pghoard.compressed_size_ratio", + size_ratio, + tags={ + "algorithm": self.compression_data.algorithm, + "site": self.site, + "type": "basebackup_delta", + } + ) + + metadata = { + "compression-algorithm": self.compression_data.algorithm, + "encryption-key-id": self.encryption_data.encryption_key_id, + "format": BaseBackupFormat.delta_v1, + "original-file-size": input_size, + "host": socket.gethostname(), + } + + self.transfer_queue.put({ + "callback_queue": callback_queue, + "file_size": result_size, + "filetype": "basebackup_delta", + "local_path": chunk_path, + "metadata": metadata, + "site": self.site, + "type": "UPLOAD", + "delta": { + "hexdigest": result_digest, + }, + }) + + return input_size, result_size, result_digest, skip_upload + + def _upload_files(self, callback_queue, todo, snapshotter): + todo_hexdigests = set(hash.hexdigest for hash in todo) + + # Keep track of the new hashes submitted with the current upload, so we can clean them up in case of error + new_submitted_hashes = dict() + + def _submit_files_in_thread(hexdigest): + files = snapshotter.hexdigest_to_snapshotfiles.get(hexdigest, []) + for snapshotfile in files: + path = snapshotter.dst / snapshotfile.relative_path + if not path.is_file(): + self.log.error("%s disappeared post-snapshot", path) + return False + try: + with snapshotfile.open_for_reading(snapshotter.dst) as f: + size, stored_size, new_hash, skip_upload = self._upload_hexdigest_from_file( + file_obj=f, relative_path=snapshotfile.relative_path, callback_queue=callback_queue + ) + if new_hash not in self.tracked_snapshot_files: + new_submitted_hashes[new_hash] = stored_size + snapshotter.update_snapshot_file_data( 
relative_path=snapshotfile.relative_path, + hexdigest=new_hash, + file_size=size, + stored_file_size=stored_size + ) + if not skip_upload: + # Every thread is waiting for the transfer to finish, so we don't load disk too much, + # only `self.parallel` files in parallel + while True: + try: + callback_queue.get(timeout=3.0) + break + except Empty: + continue + except Exception: # pylint: disable=broad-except + # Report failure - whole step will be retried later + self.log.exception("Exception uploading %r", path) + return False + + return True + + sorted_todo_hexdigests = sorted( + todo_hexdigests, key=lambda hexdigest: -snapshotter.hexdigest_to_snapshotfiles[hexdigest][0].file_size + ) + iterable_as_list = list(sorted_todo_hexdigests) + with Pool(self.parallel) as p: + for hexdigest, res in zip(iterable_as_list, p.imap(_submit_files_in_thread, iterable_as_list)): + if not res: + self.log.error( + "Error while processing digest for upload %r, waiting for the worker pool to shut down", hexdigest + ) + + p.terminate() + p.join() + + self.log.info("Cleaning up already uploaded new backup files") + + for new_hash in new_submitted_hashes: + key = os.path.join(self.site_config["prefix"], "basebackup_delta", new_hash) + self.log.info("Removing object from the storage: %r", key) + try: + self.storage.delete_key(key) + except FileNotFoundFromStorageError: + self.log.warning("Object with key %r does not exist, skipping", key) + + raise BackupFailure("Error while uploading backup files") + + uploaded_size = sum(new_submitted_hashes.values()) + uploaded_count = len(new_submitted_hashes) + + self.metrics.increase("pghoard.delta_backup_total_size", inc_value=uploaded_size) + self.metrics.gauge("pghoard.delta_backup_upload_size", uploaded_size) + self.metrics.increase("pghoard.delta_backup_total_files", inc_value=uploaded_count) + self.metrics.gauge("pghoard.delta_backup_upload_files", uploaded_count) + + self.log.info("All basebackup files were uploaded successfully") + + return uploaded_count, uploaded_size + + def _delta_upload(self, snapshot_result: SnapshotResult, snapshotter: Snapshotter, start_time_utc): + callback_queue = Queue() + + # Determine which digests already exist and which need to be uploaded; also restore the backup size of re-used + # files from manifests + snapshot_hashes = set(snapshot_result.hashes) + already_uploaded_hashes = set() + for snapshot_file in snapshot_result.state.files: + if snapshot_file.hexdigest in self.tracked_snapshot_files: + snapshot_file_from_manifest = self.tracked_snapshot_files[snapshot_file.hexdigest] + already_uploaded_hashes.add( + SnapshotHash( + hexdigest=snapshot_file_from_manifest.hexdigest, size=snapshot_file_from_manifest.file_size + ) + ) + + todo = snapshot_hashes.difference(already_uploaded_hashes) + todo_count = len(todo) + + self.log.info("Submitting hashes for upload: %r, total hashes in the snapshot: %r", todo_count, len(snapshot_hashes)) + + uploaded_count, uploaded_size = self._upload_files(callback_queue=callback_queue, todo=todo, snapshotter=snapshotter) + + total_stored_size = 0 + total_size = 0 + total_digests_count = 0 + total_digests_stored_size = 0 + + snapshot_result.state = snapshotter.get_snapshot_state() + for snapshot_file in snapshot_result.state.files: + total_size += snapshot_file.file_size + if snapshot_file.hexdigest: + total_digests_count += 1 + if not snapshot_file.stored_file_size: + # Patch existing files with stored_file_size from existing manifest files (we cannot have it otherwise) + snapshot_file.stored_file_size = 
self.tracked_snapshot_files[snapshot_file.hexdigest].stored_file_size + total_stored_size += snapshot_file.stored_file_size + total_digests_stored_size += snapshot_file.stored_file_size + elif snapshot_file.content_b64: + # Include embed files size into the total size as well + total_stored_size += snapshot_file.file_size + + if already_uploaded_hashes: + # Collect these metrics for all delta backups, except the first one + # The lower the number of those metrics, the more efficient delta backups are + if total_digests_count: + self.metrics.gauge("pghoard.delta_backup_changed_data_files_ratio", uploaded_count / total_digests_count) + if total_digests_stored_size: + self.metrics.gauge( + "pghoard.delta_backup_changed_data_size_ratio", + uploaded_size / total_digests_stored_size, + ) + self.metrics.gauge( + "pghoard.delta_backup_remained_data_size", + total_digests_stored_size - uploaded_size, + ) + + manifest = BackupManifest( + start=start_time_utc, + snapshot_result=snapshot_result, + upload_result=SnapshotUploadResult(total_size=total_size, total_stored_size=total_stored_size) + ) + + return manifest, total_size, total_stored_size + + def run(self, pgdata: str, src_iterate_func: Callable): + # NOTE: Hard links work only in the same FS, therefore using hopefully the same FS in PG home folder + delta_dir = os.path.join(os.path.dirname(pgdata), "basebackup_delta") + with suppress(FileExistsError): + os.makedirs(delta_dir) + + snapshotter = Snapshotter( + src=pgdata, dst=delta_dir, globs=["**/*"], parallel=self.parallel, src_iterate_func=src_iterate_func + ) + + with snapshotter.lock: + start_time_utc = now() + + snapshot_result = self._snapshot(snapshotter) + + manifest, total_size, total_stored_size = self._delta_upload( + snapshot_result=snapshot_result, snapshotter=snapshotter, start_time_utc=start_time_utc + ) + self.log.debug("manifest %r", manifest) + + shutil.rmtree(delta_dir) + + return total_size, total_stored_size, manifest, snapshot_result.files diff --git a/pghoard/common.py b/pghoard/common.py index de9f24a3..342168ad 100644 --- a/pghoard/common.py +++ b/pghoard/common.py @@ -5,6 +5,7 @@ See LICENSE for details """ import datetime +import enum import fcntl import json import logging @@ -24,6 +25,26 @@ LOG = logging.getLogger("pghoard.common") +class StrEnum(str, enum.Enum): + def __str__(self): + return str(self.value) + + +@enum.unique +class BaseBackupFormat(StrEnum): + v1 = "pghoard-bb-v1" + v2 = "pghoard-bb-v2" + delta_v1 = "pghoard-delta-v1" + + +@enum.unique +class BaseBackupMode(StrEnum): + basic = "basic" + delta = "delta" + local_tar = "local-tar" + pipe = "pipe" + + def create_pgpass_file(connection_string_or_info): """Look up password from the given object which can be a dict or a string and write a possible password in a pgpass file; @@ -211,7 +232,7 @@ def delete_alert_file(config, filename): os.unlink(filepath) -def extract_pghoard_bb_v2_metadata(fileobj): +def _extract_metadata(fileobj): # | in mode to use tarfile's internal stream buffer manager, currently required because our SnappyFile # interface doesn't do proper buffering for reads with tarfile.open(fileobj=fileobj, mode="r|", bufsize=IO_BLOCK_SIZE) as tar: @@ -223,6 +244,14 @@ def extract_pghoard_bb_v2_metadata(fileobj): raise Exception(".pghoard_tar_metadata.json not found") +def extract_pghoard_bb_v2_metadata(fileobj): + return _extract_metadata(fileobj) + + +def extract_pghoard_delta_v1_metadata(fileobj): + return _extract_metadata(fileobj) + + def get_pg_wal_directory(config): if 
LooseVersion(config["pg_data_directory_version"]) >= "10": return os.path.join(config["pg_data_directory"], "pg_wal") @@ -256,3 +285,7 @@ def increase_pipe_capacity(*pipes): break except PermissionError: pass + + +class BackupFailure(Exception): + """Backup failed - post a failure to callback_queue and allow the thread to terminate""" diff --git a/pghoard/config.py b/pghoard/config.py index 40e40451..153a03fc 100644 --- a/pghoard/config.py +++ b/pghoard/config.py @@ -85,6 +85,7 @@ def set_and_check_config_defaults(config, *, check_commands=True, check_pgdata=T site_config.setdefault("basebackup_compression_threads", 0) site_config.setdefault("basebackup_count", 2) site_config.setdefault("basebackup_count_min", 2) + site_config.setdefault("basebackup_delta_mode_max_retries", 10) site_config.setdefault("basebackup_interval_hours", 24) # NOTE: stream_compression removed from documentation after 1.6.0 release site_config.setdefault("basebackup_mode", "pipe" if site_config.get("stream_compression") else "basic") diff --git a/pghoard/pghoard.py b/pghoard/pghoard.py index 2f004af1..a02cd042 100644 --- a/pghoard/pghoard.py +++ b/pghoard/pghoard.py @@ -19,20 +19,23 @@ import sys import time from contextlib import closing +from dataclasses import dataclass from queue import Empty, Queue +from typing import Dict import psycopg2 from pghoard import config, logutil, metrics, version, wal from pghoard.basebackup import PGBaseBackup from pghoard.common import ( - create_alert_file, extract_pghoard_bb_v2_metadata, get_object_storage_config, - replication_connection_string_and_slot_using_pgpass, write_json_file + BaseBackupFormat, BaseBackupMode, create_alert_file, extract_pghoard_bb_v2_metadata, extract_pghoard_delta_v1_metadata, + get_object_storage_config, replication_connection_string_and_slot_using_pgpass, write_json_file ) from pghoard.compressor import CompressorThread from pghoard.receivexlog import PGReceiveXLog from pghoard.rohmu import dates, get_transfer, rohmufile from pghoard.rohmu.compat import suppress +from pghoard.rohmu.dates import now as utc_now from pghoard.rohmu.errors import (FileNotFoundFromStorageError, InvalidConfigurationError) from pghoard.rohmu.inotify import InotifyWatcher from pghoard.transfer import TransferAgent @@ -45,6 +48,12 @@ WALReceiver = None +@dataclass +class DeltaBaseBackupFailureInfo: + last_failed_time: datetime + retries: int = 0 + + class PGHoard: def __init__(self, config_path): self.metrics = None @@ -117,6 +126,7 @@ def __init__(self, config_path): logutil.notify_systemd("READY=1") self.log.info("pghoard initialized, own_hostname: %r, cwd: %r", socket.gethostname(), os.getcwd()) + self.delta_backup_failures: Dict[str, DeltaBaseBackupFailureInfo] = {} def check_pg_versions_ok(self, site, pg_version_server, command): if pg_version_server is None: @@ -154,7 +164,9 @@ def create_basebackup(self, site, connection_info, basebackup_path, callback_que callback_queue=callback_queue, pg_version_server=pg_version_server, metrics=self.metrics, + storage=self.get_or_create_site_storage(site=site), metadata=metadata, + get_remote_basebackups_info=self.get_remote_basebackups_info ) thread.start() self.basebackups[site] = thread @@ -219,8 +231,11 @@ def start_walreceiver(self, site, chosen_backup_node, last_flushed_lsn): thread.start() self.walreceivers[site] = thread + def _get_site_prefix(self, site): + return self.config["backup_sites"][site]["prefix"] + def create_backup_site_paths(self, site): - site_path = os.path.join(self.config["backup_location"], 
self.config["backup_sites"][site]["prefix"]) + site_path = os.path.join(self.config["backup_location"], self._get_site_prefix(site)) xlog_path = os.path.join(site_path, "xlog") basebackup_path = os.path.join(site_path, "basebackup") @@ -249,9 +264,7 @@ def delete_remote_wal_before(self, wal_segment, site, pg_version): if seg == 0 and log == 0: break seg, log = wal.get_previous_wal_on_same_timeline(seg, log, pg_version) - wal_path = os.path.join( - self.config["backup_sites"][site]["prefix"], "xlog", wal.name_for_tli_log_seg(tli, log, seg) - ) + wal_path = os.path.join(self._get_site_prefix(site), "xlog", wal.name_for_tli_log_seg(tli, log, seg)) self.log.debug("Deleting wal_file: %r", wal_path) try: storage.delete_key(wal_path) @@ -274,13 +287,47 @@ def delete_remote_wal_before(self, wal_segment, site, pg_version): self.log.exception("Problem deleting: %r", wal_path) self.metrics.unexpected_exception(ex, where="delete_remote_wal_before") - def delete_remote_basebackup(self, site, basebackup, metadata): + def _get_delta_basebackup_files(self, site, storage, metadata, basebackup_name_to_delete, backups_to_keep): + all_hexdigests = set() + keep_hexdigests = set() + + basebackup_data_files = list() + for backup_name in [basebackup_name_to_delete] + [back["name"] for back in backups_to_keep]: + delta_backup_key = os.path.join(self._get_site_prefix(site), "basebackup", backup_name) + bmeta_compressed = storage.get_contents_to_string(delta_backup_key)[0] + with rohmufile.file_reader( + fileobj=io.BytesIO(bmeta_compressed), + metadata=metadata, + key_lookup=config.key_lookup_for_site(self.config, site) + ) as input_obj: + meta = extract_pghoard_delta_v1_metadata(input_obj) + + manifest = meta["manifest"] + snapshot_result = manifest["snapshot_result"] + backup_state = snapshot_result["state"] + files = backup_state["files"] + + backup_hexdigests = set(delta_file["hexdigest"] for delta_file in files if delta_file["hexdigest"]) + all_hexdigests |= backup_hexdigests + + if backup_name != basebackup_name_to_delete: + # Keep data file in case if there is still a reference from other backups + keep_hexdigests |= backup_hexdigests + + # Remove unreferenced files + extra_hexdigests = set(all_hexdigests).difference(keep_hexdigests) + for hexdigest in extra_hexdigests: + basebackup_data_files.append(os.path.join(self._get_site_prefix(site), "basebackup_delta", hexdigest)) + + return basebackup_data_files + + def delete_remote_basebackup(self, site, basebackup, metadata, basebackups): start_time = time.monotonic() storage = self.site_transfers.get(site) - main_backup_key = os.path.join(self.config["backup_sites"][site]["prefix"], "basebackup", basebackup) + main_backup_key = os.path.join(self._get_site_prefix(site), "basebackup", basebackup) basebackup_data_files = [main_backup_key] - if metadata.get("format") == "pghoard-bb-v2": + if metadata.get("format") == BaseBackupFormat.v2: bmeta_compressed = storage.get_contents_to_string(main_backup_key)[0] with rohmufile.file_reader( fileobj=io.BytesIO(bmeta_compressed), @@ -292,11 +339,13 @@ def delete_remote_basebackup(self, site, basebackup, metadata): for chunk in bmeta["chunks"]: basebackup_data_files.append( os.path.join( - self.config["backup_sites"][site]["prefix"], + self._get_site_prefix(site), "basebackup_chunk", chunk["chunk_filename"], ) ) + elif metadata.get("format") == BaseBackupFormat.delta_v1: + basebackup_data_files.extend(self._get_delta_basebackup_files(site, storage, metadata, basebackup, basebackups)) self.log.debug("Deleting basebackup 
datafiles: %r", ", ".join(basebackup_data_files)) for obj_key in basebackup_data_files: @@ -312,13 +361,16 @@ def delete_remote_basebackup(self, site, basebackup, metadata): time.monotonic() - start_time ) - def get_remote_basebackups_info(self, site): + def get_or_create_site_storage(self, site): storage = self.site_transfers.get(site) if not storage: storage_config = get_object_storage_config(self.config, site) storage = get_transfer(storage_config) self.site_transfers[site] = storage + return storage + def get_remote_basebackups_info(self, site): + storage = self.get_or_create_site_storage(site=site) site_config = self.config["backup_sites"][site] results = storage.list_path(os.path.join(site_config["prefix"], "basebackup")) for entry in results: @@ -404,7 +456,9 @@ def refresh_backup_list_and_delete_old(self, site): if last_wal_segment_still_needed: self.delete_remote_wal_before(last_wal_segment_still_needed, site, pg_version) - self.delete_remote_basebackup(site, basebackup_to_be_deleted["name"], basebackup_to_be_deleted["metadata"]) + self.delete_remote_basebackup( + site, basebackup_to_be_deleted["name"], basebackup_to_be_deleted["metadata"], basebackups=basebackups + ) self.state["backup_sites"][site]["basebackups"] = basebackups def get_normalized_backup_time(self, site_config, *, now=None): @@ -553,6 +607,18 @@ def handle_site(self, site, site_config): if site in self.basebackups: try: result = self.basebackups_callbacks[site].get(block=False) + if result["success"]: + # No matter which mode, if succeeded reset the counter + self.delta_backup_failures.pop(site, None) + elif site_config["basebackup_mode"] == BaseBackupMode.delta: + last_failed_time = utc_now() + if site not in self.delta_backup_failures: + self.delta_backup_failures[site] = DeltaBaseBackupFailureInfo( + retries=0, last_failed_time=last_failed_time + ) + else: + self.delta_backup_failures[site].retries += 1 + self.delta_backup_failures[site].last_failed_time = last_failed_time except Empty: # previous basebackup (or its compression and upload) still in progress return @@ -566,6 +632,21 @@ def handle_site(self, site, site_config): metadata = self.get_new_backup_details(site=site, site_config=site_config) if metadata and not os.path.exists(self.config["maintenance_mode_file"]): + if site in self.delta_backup_failures: + retries = self.delta_backup_failures[site].retries + if retries > site_config["basebackup_delta_mode_max_retries"]: + self.log.info("Giving up backup after exceeding max retries: %r", retries) + return + else: + # Start from ~2 min with cap of one hour + retry_interval = min(2 ** (retries + 7), 60 * 60) + if utc_now( + ) >= self.delta_backup_failures[site].last_failed_time + datetime.timedelta(seconds=retry_interval): + self.log.info("Re-trying delta basebackup") + else: + self.log.info("Waiting for backoff time before re-trying new delta backup due to previous failures") + return + self.basebackups_callbacks[site] = Queue() self.create_basebackup(site, chosen_backup_node, basebackup_path, self.basebackups_callbacks[site], metadata) diff --git a/pghoard/restore.py b/pghoard/restore.py index 9d549b6d..07bf72e7 100644 --- a/pghoard/restore.py +++ b/pghoard/restore.py @@ -4,8 +4,11 @@ Copyright (c) 2016 Ohmu Ltd See LICENSE for details """ +import abc import argparse +import base64 import datetime +import enum import errno import io import logging @@ -23,10 +26,12 @@ from distutils.version import \ LooseVersion # pylint: disable=no-name-in-module,import-error from threading import RLock +from typing 
import Dict, List, Optional from psycopg2.extensions import adapt from requests import Session +from pghoard.common import BaseBackupFormat, StrEnum from pghoard.rohmu import compat, dates, get_transfer, rohmufile from pghoard.rohmu.errors import Error, InvalidConfigurationError @@ -38,6 +43,50 @@ class RestoreError(Error): """Restore error""" +@enum.unique +class FileInfoType(StrEnum): + regular = "regular" + delta = "delta" + + +class FileInfo(abc.ABC): + def __init__(self, size, new_name, metadata, file_type): + self.size: int = size + self.id = str(uuid.uuid4()) + self.new_name = new_name + self.metadata = metadata + self.file_type = file_type + + +class FilePathInfo(FileInfo): + def __init__( + self, + name: str, + size: int, + new_name: Optional[str] = None, + metadata: Optional[Dict] = None, + file_type: Optional[FileInfoType] = FileInfoType.regular + ): + self.name: str = name + super().__init__(size=size, new_name=new_name, metadata=metadata, file_type=file_type) + + def __repr__(self): + return f"{self.name} (size={self.size}, file_type={self.file_type})" + + +class FileDataInfo(FileInfo): + def __init__( + self, + data: bytes, + size: int, + new_name: Optional[str] = None, + metadata: Optional[Dict] = None, + file_type: Optional[FileInfoType] = FileInfoType.regular + ): + self.data: bytes = data + super().__init__(size=size, new_name=new_name, metadata=metadata, file_type=file_type) + + def create_signal_file(file_path): """Just ensure the file exists""" with open(file_path, "w"): @@ -240,10 +289,13 @@ def list_basebackups_http(self, arg): self.storage = HTTPRestore(arg.host, arg.port, arg.site) self.storage.show_basebackup_list(verbose=arg.verbose) + def _get_site_prefix(self, site): + return self.config["backup_sites"][site]["prefix"] + def _get_object_storage(self, site, pgdata): storage_config = common.get_object_storage_config(self.config, site) storage = get_transfer(storage_config) - return ObjectStore(storage, self.config["backup_sites"][site]["prefix"], site, pgdata) + return ObjectStore(storage, self._get_site_prefix(site), site, pgdata) def list_basebackups(self, arg): """List basebackups from an object store""" @@ -330,6 +382,51 @@ def _find_nearest_basebackup(self, recovery_target_time=None): print("\nSelecting {!r} for restore".format(selected["name"])) return selected + def _get_delta_basebackup_data(self, site, metadata, basebackup_name): + basebackup_data_files = [] + bmeta_compressed = self.storage.get_file_bytes(basebackup_name) + with rohmufile.file_reader( + fileobj=io.BytesIO(bmeta_compressed), + metadata=metadata, + key_lookup=config.key_lookup_for_site(self.config, site) + ) as input_obj: + bmeta = common.extract_pghoard_delta_v1_metadata(input_obj) + self.log.debug("Delta backup metadata: %r", bmeta) + + delta_objects_path = os.path.join(self._get_site_prefix(site), "basebackup_delta") + + manifest = bmeta["manifest"] + snapshot_result = manifest["snapshot_result"] + backup_state = snapshot_result["state"] + files = backup_state["files"] + empty_dirs = backup_state["empty_dirs"] + tablespaces = bmeta["tablespaces"] + for delta_file in files: + if delta_file["hexdigest"]: + basebackup_data_files.append( + FilePathInfo( + name=os.path.join(delta_objects_path, delta_file["hexdigest"]), + size=delta_file["stored_file_size"], + new_name=delta_file["relative_path"], + file_type=FileInfoType.delta + ) + ) + elif delta_file["content_b64"] is not None: + # Restore embed files + basebackup_data_files.append( + FileDataInfo( + 
data=base64.b64decode(delta_file["content_b64"]), + metadata=metadata, + size=delta_file["file_size"], + new_name=delta_file["relative_path"], + file_type=FileInfoType.delta + ) + ) + + basebackup_data_files.append(FileDataInfo(data=bmeta_compressed, metadata=metadata, size=0)) + + return tablespaces, basebackup_data_files, empty_dirs + def _get_basebackup( self, pgdata, @@ -386,7 +483,9 @@ def _get_basebackup( "$PGDATA target directory {!r} exists, is not empty and --overwrite not specified, aborting.".format(pgdata) ) - if metadata.get("format") == "pghoard-bb-v2": + empty_dirs = None + basebackup_data_files: List[FileInfo] = [] + if metadata.get("format") == BaseBackupFormat.v2: # "Backup file" is a metadata object, fetch it to get more information bmeta_compressed = self.storage.get_file_bytes(basebackup["name"]) with rohmufile.file_reader( @@ -398,14 +497,16 @@ def _get_basebackup( self.log.debug("Backup metadata: %r", bmeta) tablespaces = bmeta["tablespaces"] - basebackup_data_files = [[ - os.path.join(self.config["backup_sites"][site]["prefix"], "basebackup_chunk", chunk["chunk_filename"]), - chunk["result_size"], - ] for chunk in bmeta["chunks"]] + basebackup_data_files = [ + FilePathInfo( + name=os.path.join(self._get_site_prefix(site), "basebackup_chunk", chunk["chunk_filename"]), + size=chunk["result_size"] + ) for chunk in bmeta["chunks"] + ] # We need the files from the main basebackup file too - basebackup_data_files.append([(bmeta_compressed, metadata), 0]) + basebackup_data_files.append(FileDataInfo(data=bmeta_compressed, metadata=metadata, size=0)) - elif metadata.get("format") == "pghoard-bb-v1": + elif metadata.get("format") == BaseBackupFormat.v1: # Tablespace information stored in object store metadata, look it up tsmetare = re.compile("^tablespace-name-([0-9]+)$") for kw, value in metadata.items(): @@ -420,11 +521,15 @@ def _get_basebackup( "path": tspath, } - basebackup_data_files = [[basebackup["name"], basebackup["size"]]] + basebackup_data_files = [FilePathInfo(name=basebackup["name"], size=basebackup["size"])] + elif metadata.get("format") == BaseBackupFormat.delta_v1: + tablespaces, basebackup_data_files, empty_dirs = self._get_delta_basebackup_data( + site, metadata, basebackup["name"] + ) else: # Object is a raw (encrypted, compressed) basebackup - basebackup_data_files = [[basebackup["name"], basebackup["size"]]] + basebackup_data_files = [FilePathInfo(name=basebackup["name"], size=basebackup["size"])] if tablespace_base_dir and not os.path.exists(tablespace_base_dir) and not overwrite: # we just care that the dir exists, but we're OK if there are other objects there @@ -470,6 +575,11 @@ def _get_basebackup( for dirname in dirs_to_create: os.makedirs(dirname) os.chmod(dirname, 0o700) + if empty_dirs: + for rel_path in empty_dirs: + dirname = os.path.join(pgdata, rel_path) + os.makedirs(dirname, exist_ok=True) + os.chmod(dirname, 0o700) fetcher = BasebackupFetcher( app_config=self.config, @@ -515,12 +625,12 @@ def run(self, args=None): return 1 -class BasebackupFetcher(): - def __init__(self, *, app_config, debug, site, pgdata, tablespaces, data_files, status_output_file=None): +class BasebackupFetcher: + def __init__(self, *, app_config, debug, site, pgdata, tablespaces, data_files: List[FileInfo], status_output_file=None): self.log = logging.getLogger(self.__class__.__name__) self.completed_jobs = set() self.config = app_config - self.data_files = [{"fn_or_data": item[0], "id": str(uuid.uuid4()), "size": item[1]} for item in data_files] + self.data_files = 
data_files self.debug = debug self.download_progress_per_file = {} self.errors = 0 @@ -593,10 +703,10 @@ def _process_count(self): return min(self.config["restore_process_count"], len(self.data_files)) def _setup_progress_tracking(self, manager): - self.total_download_size = sum(item["size"] for item in self.data_files) - initial_progress = [[item["fn_or_data"], item["size"] if item["id"] in self.completed_jobs else 0] + self.total_download_size = sum(item.size for item in self.data_files) + initial_progress = [[item.name, item.size if item.id in self.completed_jobs else 0] for item in self.data_files - if not isinstance(item["fn_or_data"], tuple)] + if isinstance(item, FilePathInfo)] self.download_progress_per_file = manager.dict(initial_progress) def current_progress(self): @@ -645,20 +755,20 @@ def jobs_in_progress(self): def _queue_jobs(self, pool): for item in self.data_files: with self.lock: - if item["id"] in self.completed_jobs or item["id"] in self.pending_jobs: + if item.id in self.completed_jobs or item.id in self.pending_jobs: continue - self.pending_jobs.add(item["id"]) - self._queue_job(pool, item["id"], item["fn_or_data"], item["size"]) + self.pending_jobs.add(item.id) + self._queue_job(pool, item) - def _queue_job(self, pool, key, data_file, data_file_size): + def _queue_job(self, pool, file_info: FileInfo): + key = file_info.id pool.apply_async( _fetch_and_process_chunk, [], { "app_config": self.config, "debug": self.debug, - "data_file": data_file, - "data_file_size": data_file_size, + "file_info": file_info, "download_progress_per_file": self.download_progress_per_file, "site": self.site, "pgdata": self.pgdata, @@ -705,18 +815,17 @@ def dict(self, *args, **kwargs): def _fetch_and_process_chunk( - *, app_config, debug, data_file, data_file_size, download_progress_per_file, site, pgdata, tablespaces + *, app_config, debug, file_info: FileInfo, download_progress_per_file, site, pgdata, tablespaces ): logutil.configure_logging(level=logging.DEBUG if debug else logging.INFO) - fetcher = ChunkFetcher(app_config, data_file, data_file_size, download_progress_per_file, site, pgdata, tablespaces) + fetcher = ChunkFetcher(app_config, file_info, download_progress_per_file, site, pgdata, tablespaces) fetcher.process_chunk() class ChunkFetcher: - def __init__(self, app_config, data_file, data_file_size, download_progress_per_file, site, pgdata, tablespaces): + def __init__(self, app_config, file_info: FileInfo, download_progress_per_file, site, pgdata, tablespaces): self.config = app_config - self.data_file = data_file - self.data_file_size = data_file_size + self.file_info = file_info self.download_progress_per_file = download_progress_per_file self.log = logging.getLogger(self.__class__.__name__) self.pgdata = pgdata @@ -728,30 +837,51 @@ def _create_transfer(self): return get_transfer(object_storage_config) def _progress_callback(self, current_pos, expected_max): - self.download_progress_per_file[self.data_file] = self.data_file_size * (current_pos / expected_max) + assert isinstance(self.file_info, FilePathInfo) + self.download_progress_per_file[self.file_info.name] = self.file_info.size * (current_pos / expected_max) def process_chunk(self): - self.log.debug("Processing one chunk: %r", self.data_file) - if isinstance(self.data_file, tuple): - data, metadata = self.data_file - src = io.BytesIO(data) - self._fetch_and_extract_one_backup(metadata, len(data), lambda sink: shutil.copyfileobj(src, sink)) - else: + self.log.info("Processing one chunk: %r", self.file_info) + + if 
self.file_info.file_type not in {FileInfoType.regular, FileInfoType.delta}: + raise NotImplementedError(f"Unknown FileInfoType {self.file_info.file_type}") + + if isinstance(self.file_info, FileDataInfo): + if self.file_info.file_type == FileInfoType.regular: + src = io.BytesIO(self.file_info.data) + self._fetch_and_extract_one_backup( + self.file_info.metadata, len(self.file_info.data), lambda sink: shutil.copyfileobj(src, sink) + ) + elif self.file_info.file_type == FileInfoType.delta: + assert self.file_info.new_name + with open(os.path.join(self.pgdata, self.file_info.new_name), "wb") as out_f: + out_f.write(self.file_info.data) + + elif isinstance(self.file_info, FilePathInfo): transfer = self._create_transfer() - metadata = transfer.get_metadata_for_key(self.data_file) + metadata = transfer.get_metadata_for_key(self.file_info.name) def fetch_fn(sink): - transfer.get_contents_to_fileobj(self.data_file, sink, progress_callback=self._progress_callback) + transfer.get_contents_to_fileobj(self.file_info.name, sink, progress_callback=self._progress_callback) - self._fetch_and_extract_one_backup(metadata, self.data_file_size, fetch_fn) + if self.file_info.file_type == FileInfoType.regular: + self._fetch_and_extract_one_backup(metadata, self.file_info.size, fetch_fn) + elif self.file_info.file_type == FileInfoType.delta: + assert self.file_info.new_name + self._fetch_delta_file(metadata, fetch_fn) + + else: + raise NotImplementedError(f"Unknown FileInfo {self.file_info.__class__.__name__}") def _build_tar_args(self, metadata): base_args = [self.config["tar_executable"], "-xf", "-", "-C", self.pgdata] file_format = metadata.get("format") if not file_format: return base_args - elif file_format in {"pghoard-bb-v1", "pghoard-bb-v2"}: + elif file_format in {BaseBackupFormat.v1, BaseBackupFormat.v2, BaseBackupFormat.delta_v1}: extra_args = ["--exclude", ".pghoard_tar_metadata.json", "--transform", "s,^pgdata/,,"] + if file_format == BaseBackupFormat.delta_v1: + extra_args += ["--exclude", ".manifest.json"] if self.tablespaces: extra_args.append("--absolute-names") for tsname, settings in self.tablespaces.items(): @@ -765,6 +895,20 @@ def _build_tar_args(self, metadata): else: raise RestoreError("Unrecognized basebackup format {!r}".format(file_format)) + def _fetch_delta_file(self, metadata, fetch_fn): + with open(os.path.join(self.pgdata, self.file_info.new_name), "wb") as target_file: + sink = rohmufile.create_sink_pipeline( + output=target_file, + file_size=self.file_info.size, + metadata=metadata, + key_lookup=config.key_lookup_for_site(self.config, self.site), + ) + fetch_fn(sink) + + self.log.info( + "Processing of delta file completed successfully: %r -> %r", self.file_info.name, self.file_info.new_name + ) + def _fetch_and_extract_one_backup(self, metadata, file_size, fetch_fn): with subprocess.Popen( self._build_tar_args(metadata), @@ -793,7 +937,7 @@ def _fetch_and_extract_one_backup(self, metadata, file_size, fetch_fn): tar.stdin = None output = tar.stderr.read() exit_code = tar.wait() - file_name = "" if isinstance(self.data_file, tuple) else self.data_file + file_name = "" if isinstance(self.file_info, FileDataInfo) else self.file_info.name if exit_code != 0: raise Exception("tar exited with code {!r} for file {!r}, output: {!r}".format(exit_code, file_name, output)) self.log.info("Processing of %r completed successfully", file_name) @@ -845,6 +989,8 @@ def main(): restore = Restore() return restore.run() except (InvalidConfigurationError, RestoreError) as ex: + import traceback + 
traceback.print_exc() print("FATAL: {}: {}".format(ex.__class__.__name__, ex)) return 1 diff --git a/pghoard/rohmu/dates.py b/pghoard/rohmu/dates.py index 05490e41..f85e1a22 100644 --- a/pghoard/rohmu/dates.py +++ b/pghoard/rohmu/dates.py @@ -39,3 +39,7 @@ def parse_timestamp(ts, *, with_tz=True, assume_local=False): tz = dateutil.tz.tzlocal() if assume_local else datetime.timezone.utc return dt.replace(tzinfo=tz) + + +def now(): + return datetime.datetime.now(datetime.timezone.utc) diff --git a/pghoard/rohmu/delta/__init__.py b/pghoard/rohmu/delta/__init__.py new file mode 100644 index 00000000..956777b8 --- /dev/null +++ b/pghoard/rohmu/delta/__init__.py @@ -0,0 +1 @@ +# Copyright (c) 2021 Aiven, Helsinki, Finland. https://aiven.io/ diff --git a/pghoard/rohmu/delta/common.py b/pghoard/rohmu/delta/common.py new file mode 100644 index 00000000..a1fd979b --- /dev/null +++ b/pghoard/rohmu/delta/common.py @@ -0,0 +1,282 @@ +# Copyright (c) 2021 Aiven, Helsinki, Finland. https://aiven.io/ +import functools +import hashlib +import json as _json +import logging +import math +import os +from datetime import datetime +from multiprocessing.dummy import Pool +from pathlib import Path +from typing import List, Optional + +from pydantic import BaseModel, Field + +from pghoard.rohmu.dates import now + +_hash = hashlib.blake2s +_log_1_1 = math.log(1.1) + +# Hexdigest is 32 bytes, so something orders of magnitude more at least +EMBEDDED_FILE_SIZE = 150 + +logger = logging.getLogger(__name__) + + +def hash_hexdigest_readable(f, *, read_buffer=1_000_000): + h = _hash() + while True: + data = f.read(read_buffer) + if not data: + break + h.update(data) + return h.hexdigest() + + +def increase_worth_reporting(value, new_value=None, *, total=None): + """ Make reporting sparser and sparser as values grow larger + - report every 1.1**N or so + - if we know total, report every percent + """ + if new_value is None: + new_value = value + value = new_value - 1 + if total is not None: + if new_value == total or total <= 100: + return True + old_percent = 100 * value // total + new_percent = 100 * new_value // total + return old_percent != new_percent + if value <= 10 or new_value <= 10: + return True + old_exp = int(math.log(value) / _log_1_1) + new_exp = int(math.log(new_value) / _log_1_1) + return old_exp != new_exp + + +class DeltaModel(BaseModel): + class Config: + # As we're keen to both export and decode json, just using + # enum values for encode/decode is much saner than the default + # enumname.value (it is also slightly less safe but oh well) + use_enum_values = True + + # Extra values should be errors, as they are most likely typos + # which lead to grief when not detected. However, if we ever + # start deprecating some old fields and not wanting to parse + # them, this might need to be revisited. + extra = "forbid" + + # Validate field default values too + validate_all = True + + # Validate also assignments + # validate_assignment = True + # TBD: Figure out why this doesn't work in some unit tests; + # possibly the tests themselves are broken + + def jsondict(self, **kw): + # By default, + # + # .json() returns json string. + # .dict() returns Python dict, but it has things that are not + # json serializable. + # + # We provide json seralizable dict (super inefficiently) here. 
+ # + # This is mostly used for test code so that should be fine + return _json.loads(self.json(**kw)) + + +class SizeLimitedFile: + def __init__(self, *, path, file_size): + self._f = open(path, "rb") + self._file_size = file_size + self.tell = self._f.tell + + def __enter__(self): + return self + + def __exit__(self, t, v, tb): + self._f.close() + + def read(self, n=None): + can_read = max(0, self._file_size - self._f.tell()) + if n is None: + n = can_read + n = min(can_read, n) + return self._f.read(n) + + def seek(self, ofs, whence=0): + if whence == os.SEEK_END: + ofs += self._file_size + whence = os.SEEK_SET + return self._f.seek(ofs, whence) + + +class SnapshotHash(DeltaModel): + """ + This class represents something that is to be stored in the object storage. + + size is provided mainly to allow for even loading of nodes in case + same hexdigest is available from multiple nodes. + + """ + hexdigest: str + size: int + + def __eq__(self, other): + if isinstance(other, SnapshotHash): + return self.hexdigest == other.hexdigest + return False + + def __hash__(self): + # hexdigests should be unique, regardless of size + return hash(self.hexdigest) + + +@functools.total_ordering +class SnapshotFile(DeltaModel): + relative_path: Path + file_size: int + stored_file_size: int + mtime_ns: int + hexdigest: str = "" + content_b64: Optional[str] + + def __lt__(self, o): + # In our use case, paths uniquely identify files we care about + return self.relative_path < o.relative_path + + def equals_excluding_mtime(self, o): + return self.copy(update={"mtime_ns": 0}) == o.copy(update={"mtime_ns": 0}) + + def open_for_reading(self, root_path): + return SizeLimitedFile(path=root_path / self.relative_path, file_size=self.file_size) + + +class SnapshotState(DeltaModel): + root_globs: List[str] + files: List[SnapshotFile] + empty_dirs: List[Path] + + +class SnapshotResult(DeltaModel): + # when was the operation started ( / done ) + start: datetime = Field(default_factory=now) + end: Optional[datetime] + # + # should be passed opaquely to restore + state: Optional[SnapshotState] + # + # Summary data for manifest use + files: int = 0 + total_size: int = 0 + + # populated only if state is available + hashes: Optional[List[SnapshotHash]] + + +class SnapshotUploadResult(DeltaModel): + total_size: int = 0 + total_stored_size: int = 0 + + +class BackupManifest(DeltaModel): + start: datetime + end: datetime = Field(default_factory=now) + + # Filesystem snapshot contents of the backup + snapshot_result: SnapshotResult + + # What did the upload return (mostly for statistics) + upload_result: SnapshotUploadResult + + +class Progress(DeltaModel): + """ JSON-encodable progress meter of sorts """ + handled: int = 0 + failed: int = 0 + total: int = 0 + final: bool = False + + def __repr__(self): + finished = ", finished" if self.final else "" + return f"{self.handled}/{self.total} handled, {self.failed} failures{finished}" + + def start(self, n): + " Optional 'first' step, just for logic handling state (e.g. 
no progress object reuse desired) " + assert not self.total + logger.debug("start") + self.add_total(n) + + def add_total(self, n): + if not n: + return + old_total = self.total + self.total += n + if increase_worth_reporting(old_total, self.total): + logger.debug("add_total %r -> %r", n, self) + assert not self.final + + def add_fail(self, n=1, *, info="add_fail"): + assert n > 0 + old_failed = self.failed + self.failed += n + if increase_worth_reporting(old_failed, self.failed): + logger.debug("%s %r -> %r", info, n, self) + assert not self.final + + def add_success(self, n=1, *, info="add_success"): + assert n > 0 + old_handled = self.handled + self.handled += n + assert self.handled <= self.total + if increase_worth_reporting(old_handled, self.handled, total=self.total): + logger.debug("%s %r -> %r", info, n, self) + assert not self.final + + def download_success(self, size): + self.add_success(size, info="download_success") + + def upload_success(self, hexdigest): + self.add_success(info=f"upload_success {hexdigest}") + + def upload_missing(self, hexdigest): + self.add_fail(info=f"upload_missing {hexdigest}") + + def upload_failure(self, hexdigest): + self.add_fail(info=f"upload_failure {hexdigest}") + + def done(self): + assert self.total is not None and self.handled <= self.total + assert not self.final + self.final = True + logger.debug("done %r", self) + + @property + def finished_successfully(self): + return self.final and not self.failed and self.handled == self.total + + @property + def finished_failed(self): + return self.final and not self.finished_successfully + + @classmethod + def merge(cls, progresses): + p = cls() + for progress in progresses: + p.handled += progress.handled + p.failed += progress.failed + p.total += progress.total + p.final = all(progress.final for progress in progresses) + return p + + +def parallel_map_to(*, fun, iterable, result_callback, n=None) -> bool: + iterable_as_list = list(iterable) + with Pool(n) as p: + for map_in, map_out in zip(iterable_as_list, p.imap(fun, iterable_as_list)): + if not result_callback(map_in=map_in, map_out=map_out): + return False + return True diff --git a/pghoard/rohmu/delta/snapshot.py b/pghoard/rohmu/delta/snapshot.py new file mode 100644 index 00000000..a3e53976 --- /dev/null +++ b/pghoard/rohmu/delta/snapshot.py @@ -0,0 +1,238 @@ +# Copyright (c) 2021 Aiven, Helsinki, Finland. https://aiven.io/ +import base64 +import logging +import os +import threading +from pathlib import Path +from typing import Callable, Optional + +from pghoard.rohmu.delta.common import ( + EMBEDDED_FILE_SIZE, Progress, SnapshotFile, SnapshotHash, SnapshotState, hash_hexdigest_readable, + increase_worth_reporting, parallel_map_to +) + +logger = logging.getLogger(__name__) + + +class Snapshotter: + """Snapshotter keeps track of files on disk, and their hashes. + + The hash on disk MAY change, which may require subsequent + incremential snapshot and-or ignoring the files which have changed. + + The output to outside is just root object's hash, as well as list + of other hashes which correspond to files referred to within the + file list contained in root object. + + Note that any call to public API MUST be made with + snapshotter.lock held. 
This is because Snapshotter is process-wide + utility that is shared across operations, possibly used from + multiple threads, and the single-operation-only mode of operation + is not exactly flawless (the 'new operation can be started with + old running' is intentional feature but new operation should + eventually replace the old). The lock itself might not need to be + built-in to Snapshotter, but having it there enables asserting its + state during public API calls. + """ + def __init__(self, *, src, dst, globs, src_iterate_func: Optional[Callable] = None, parallel=1): + assert globs + self.src = Path(src) + self.dst = Path(dst) + self.globs = globs + self.src_iterate_func = src_iterate_func + self.relative_path_to_snapshotfile = {} + self.hexdigest_to_snapshotfiles = {} + self.parallel = parallel + self.lock = threading.Lock() + self.empty_dirs = [] + + def _list_files(self, basepath: Path): + result_files = set() + for glob in self.globs: + for path in basepath.glob(glob): + if not path.is_file() or path.is_symlink(): + continue + relpath = path.relative_to(basepath) + result_files.add(relpath) + + return sorted(result_files) + + def _list_dirs_and_files(self, basepath: Path): + files = self._list_files(basepath) + dirs = {p.parent for p in files} + return sorted(dirs), files + + def _add_snapshotfile(self, snapshotfile: SnapshotFile): + old_snapshotfile = self.relative_path_to_snapshotfile.get(snapshotfile.relative_path, None) + if old_snapshotfile: + self._remove_snapshotfile(old_snapshotfile) + self.relative_path_to_snapshotfile[snapshotfile.relative_path] = snapshotfile + if snapshotfile.hexdigest: + self.hexdigest_to_snapshotfiles.setdefault(snapshotfile.hexdigest, []).append(snapshotfile) + + def _remove_snapshotfile(self, snapshotfile: SnapshotFile): + assert self.relative_path_to_snapshotfile[snapshotfile.relative_path] == snapshotfile + del self.relative_path_to_snapshotfile[snapshotfile.relative_path] + if snapshotfile.hexdigest: + self.hexdigest_to_snapshotfiles[snapshotfile.hexdigest].remove(snapshotfile) + + def _snapshotfile_from_path(self, relative_path): + src_path = self.src / relative_path + st = src_path.stat() + return SnapshotFile(relative_path=relative_path, mtime_ns=st.st_mtime_ns, file_size=st.st_size, stored_file_size=0) + + def _gen_snapshot_hashes(self, relative_paths, reuse_old_snapshotfiles): + same = 0 + lost = 0 + for relative_path in relative_paths: + old_snapshotfile = self.relative_path_to_snapshotfile.get(relative_path) + try: + snapshotfile = self._snapshotfile_from_path(relative_path) + except FileNotFoundError: + lost += 1 + if increase_worth_reporting(lost): + logger.debug("#%d. lost - %s disappeared before stat, ignoring", lost, self.src / relative_path) + continue + if reuse_old_snapshotfiles and old_snapshotfile: + snapshotfile.hexdigest = old_snapshotfile.hexdigest + snapshotfile.content_b64 = old_snapshotfile.content_b64 + if old_snapshotfile == snapshotfile: + same += 1 + if increase_worth_reporting(same): + logger.debug("#%d. 
same - %r in %s is same", same, old_snapshotfile, relative_path) + continue + yield snapshotfile + + def get_snapshot_hashes(self): + assert self.lock.locked() + return [ + SnapshotHash(hexdigest=dig, size=sf[0].file_size) for dig, sf in self.hexdigest_to_snapshotfiles.items() if sf + ] + + def get_snapshot_state(self): + assert self.lock.locked() + return SnapshotState( + root_globs=self.globs, files=sorted(self.relative_path_to_snapshotfile.values()), empty_dirs=self.empty_dirs + ) + + def update_snapshot_file_data(self, *, relative_path, hexdigest, file_size, stored_file_size): + snapshotfile = self.relative_path_to_snapshotfile[relative_path] + snapshotfile.hexdigest = hexdigest + snapshotfile.file_size = file_size + snapshotfile.stored_file_size = stored_file_size + + def _snapshot_create_missing_directories(self, *, src_dirs, dst_dirs): + changes = 0 + for i, relative_dir in enumerate(set(src_dirs).difference(dst_dirs), 1): + dst_path = self.dst / relative_dir + dst_path.mkdir(parents=True, exist_ok=True) + if increase_worth_reporting(i): + logger.debug("#%d. new directory: %r", i, relative_dir) + changes += 1 + return changes + + def _snapshot_remove_extra_files(self, *, src_files, dst_files): + changes = 0 + for i, relative_path in enumerate(set(dst_files).difference(src_files), 1): + dst_path = self.dst / relative_path + snapshotfile = self.relative_path_to_snapshotfile.get(relative_path) + if snapshotfile: + self._remove_snapshotfile(snapshotfile) + dst_path.unlink() + if increase_worth_reporting(i): + logger.debug("#%d. extra file: %r", i, relative_path) + changes += 1 + return changes + + def _snapshot_add_missing_files(self, *, src_files, dst_files): + existing = 0 + disappeared = 0 + changes = 0 + for i, relative_path in enumerate(set(src_files).difference(dst_files), 1): + src_path = self.src / relative_path + dst_path = self.dst / relative_path + try: + os.link(src=src_path, dst=dst_path, follow_symlinks=False) + except FileExistsError: + # This happens only if snapshot is started twice at + # same time. While it is technically speaking upstream + # error, we rather handle it here than leave + # exceptions not handled. + existing += 1 + if increase_worth_reporting(existing): + logger.debug("#%d. %s already existed, ignoring", existing, src_path) + continue + except FileNotFoundError: + disappeared += 1 + if increase_worth_reporting(disappeared): + logger.debug("#%d. %s disappeared before linking, ignoring", disappeared, src_path) + continue + if increase_worth_reporting(i - disappeared): + logger.debug("#%d. 
new file: %r", i - disappeared, relative_path) + changes += 1 + return changes + + def snapshot(self, *, progress: Optional[Progress] = None, reuse_old_snapshotfiles=True): + assert self.lock.locked() + + if progress is None: + progress = Progress() + progress.start(3) + + if self.src_iterate_func: + src_dirs = set() + src_files = set() + for item in self.src_iterate_func(): + path = Path(item) + if path.is_file() and not path.is_symlink(): + src_files.add(path.relative_to(self.src)) + elif path.is_dir(): + src_dirs.add(path.relative_to(self.src)) + + src_dirs = sorted(src_dirs | {p.parent for p in src_files}) + src_files = sorted(src_files) + else: + src_dirs, src_files = self._list_dirs_and_files(self.src) + + dst_dirs, dst_files = self._list_dirs_and_files(self.dst) + + # Create missing directories + changes = self._snapshot_create_missing_directories(src_dirs=src_dirs, dst_dirs=dst_dirs) + progress.add_success() + + # Remove extra files + changes += self._snapshot_remove_extra_files(src_files=src_files, dst_files=dst_files) + progress.add_success() + + # Add missing files + changes += self._snapshot_add_missing_files(src_files=src_files, dst_files=dst_files) + progress.add_success() + + # We COULD also remove extra directories, but it is not + # probably really worth it and due to ignored files it + # actually might not even work. + + # Then, create/update corresponding snapshotfile objects (old + # ones were already removed) + dst_dirs, dst_files = self._list_dirs_and_files(self.dst) + self.empty_dirs = src_dirs + snapshotfiles = list(self._gen_snapshot_hashes(dst_files, reuse_old_snapshotfiles)) + progress.add_total(len(snapshotfiles)) + + def _cb(snapshotfile): + # src may or may not be present; dst is present as it is in snapshot + with snapshotfile.open_for_reading(self.dst) as f: + if snapshotfile.file_size <= EMBEDDED_FILE_SIZE: + snapshotfile.content_b64 = base64.b64encode(f.read()).decode() + else: + snapshotfile.hexdigest = hash_hexdigest_readable(f) + return snapshotfile + + def _result_cb(*, map_in, map_out): + self._add_snapshotfile(map_out) + progress.add_success() + return True + + changes += len(snapshotfiles) + parallel_map_to(iterable=snapshotfiles, fun=_cb, result_callback=_result_cb, n=self.parallel) + return changes diff --git a/pghoard/transfer.py b/pghoard/transfer.py index 1ee728d8..a32c66cb 100644 --- a/pghoard/transfer.py +++ b/pghoard/transfer.py @@ -51,6 +51,7 @@ def defaults(): return { "basebackup": EMPTY.copy(), "basebackup_chunk": EMPTY.copy(), + "basebackup_delta": EMPTY.copy(), "timeline": EMPTY.copy(), "xlog": EMPTY.copy(), } @@ -76,6 +77,8 @@ def form_key_path(file_to_transfer): name_parts = file_to_transfer["local_path"].split("/") if file_to_transfer["filetype"] == "basebackup_chunk": name = os.path.join(name_parts[-2], name_parts[-1]) + elif file_to_transfer["filetype"] == "basebackup_delta": + name = file_to_transfer["delta"]["hexdigest"] else: name = name_parts[-1] return os.path.join(file_to_transfer["prefix"], file_to_transfer["filetype"], name) @@ -139,7 +142,7 @@ def run(self): if oper == "upload": if filetype == "xlog": self.state[site][oper]["xlog"]["xlogs_since_basebackup"] += 1 - elif filetype == "basebackup": + elif filetype in {"basebackup", "basebackup_delta"}: # reset corresponding xlog stats at basebackup self.state[site][oper]["xlog"]["xlogs_since_basebackup"] = 0 @@ -243,7 +246,7 @@ def handle_upload(self, site, key, file_to_transfer): else: # Basebackups may be multipart uploads, depending on the driver. 
# Swift needs to know about this so it can do possible cleanups. - multipart = file_to_transfer["filetype"] in {"basebackup", "basebackup_chunk"} + multipart = file_to_transfer["filetype"] in {"basebackup", "basebackup_chunk", "basebackup_delta"} try: self.log.info("Uploading file to object store: src=%r dst=%r", file_to_transfer["local_path"], key) storage.store_file_from_disk( diff --git a/requirements.txt b/requirements.txt index bed0bf82..de37b6fd 100644 --- a/requirements.txt +++ b/requirements.txt @@ -5,6 +5,7 @@ google-api-python-client httplib2 oauth2client psycopg2 +pydantic python-dateutil python-snappy python-systemd diff --git a/setup.py b/setup.py index 5de6b3fb..36918f41 100644 --- a/setup.py +++ b/setup.py @@ -20,6 +20,7 @@ install_requires=[ "cryptography", "psycopg2 >= 2.0.0", + "pydantic", "python-dateutil", "python-snappy >= 0.5", "requests >= 1.2.0", diff --git a/test/basebackup_delta/chunks/0af668268d0fe14c6e269760b08d80a634c421b8381df25f31fbed5e8a8c8d8b b/test/basebackup_delta/chunks/0af668268d0fe14c6e269760b08d80a634c421b8381df25f31fbed5e8a8c8d8b new file mode 100644 index 00000000..51f60f46 Binary files /dev/null and b/test/basebackup_delta/chunks/0af668268d0fe14c6e269760b08d80a634c421b8381df25f31fbed5e8a8c8d8b differ diff --git a/test/basebackup_delta/chunks/0af668268d0fe14c6e269760b08d80a634c421b8381df25f31fbed5e8a8c8d8b.metadata b/test/basebackup_delta/chunks/0af668268d0fe14c6e269760b08d80a634c421b8381df25f31fbed5e8a8c8d8b.metadata new file mode 100644 index 00000000..3cf8a38d --- /dev/null +++ b/test/basebackup_delta/chunks/0af668268d0fe14c6e269760b08d80a634c421b8381df25f31fbed5e8a8c8d8b.metadata @@ -0,0 +1 @@ +{"compression-algorithm": "snappy", "format": "pghoard-delta-v1", "original-file-size": "16384"} diff --git a/test/basebackup_delta/chunks/4b65df4d0857bbbcb22aa086e02bd8414a9f3a484869f2b96ed7c62f3c4eb088 b/test/basebackup_delta/chunks/4b65df4d0857bbbcb22aa086e02bd8414a9f3a484869f2b96ed7c62f3c4eb088 new file mode 100644 index 00000000..19cd2819 Binary files /dev/null and b/test/basebackup_delta/chunks/4b65df4d0857bbbcb22aa086e02bd8414a9f3a484869f2b96ed7c62f3c4eb088 differ diff --git a/test/basebackup_delta/chunks/4b65df4d0857bbbcb22aa086e02bd8414a9f3a484869f2b96ed7c62f3c4eb088.metadata b/test/basebackup_delta/chunks/4b65df4d0857bbbcb22aa086e02bd8414a9f3a484869f2b96ed7c62f3c4eb088.metadata new file mode 100644 index 00000000..086318b3 --- /dev/null +++ b/test/basebackup_delta/chunks/4b65df4d0857bbbcb22aa086e02bd8414a9f3a484869f2b96ed7c62f3c4eb088.metadata @@ -0,0 +1 @@ +{"compression-algorithm": "snappy", "format": "pghoard-delta-v1", "original-file-size": "8192"} diff --git a/test/basebackup_delta/chunks/fc61c91430dcb345001306ad513f103380c16896093a17868fc909aeda393559 b/test/basebackup_delta/chunks/fc61c91430dcb345001306ad513f103380c16896093a17868fc909aeda393559 new file mode 100644 index 00000000..4ca21d76 Binary files /dev/null and b/test/basebackup_delta/chunks/fc61c91430dcb345001306ad513f103380c16896093a17868fc909aeda393559 differ diff --git a/test/basebackup_delta/chunks/fc61c91430dcb345001306ad513f103380c16896093a17868fc909aeda393559.metadata b/test/basebackup_delta/chunks/fc61c91430dcb345001306ad513f103380c16896093a17868fc909aeda393559.metadata new file mode 100644 index 00000000..3c289d7c --- /dev/null +++ b/test/basebackup_delta/chunks/fc61c91430dcb345001306ad513f103380c16896093a17868fc909aeda393559.metadata @@ -0,0 +1 @@ +{"compression-algorithm": "snappy", "format": "pghoard-delta-v1", "original-file-size": "24576"} diff --git 
a/test/basebackup_delta/config.json b/test/basebackup_delta/config.json new file mode 100644 index 00000000..833d29b5 --- /dev/null +++ b/test/basebackup_delta/config.json @@ -0,0 +1,23 @@ +{ + "backup_sites": { + "f73f56ee-6b9f-4ce0-b7aa-a170d58da833": { + "encryption_key_id": "5ba999de817c49a682ffed124abf9a2e", + "encryption_keys": { + "5ba999de817c49a682ffed124abf9a2e": { + "private": "-----BEGIN PRIVATE KEY-----\nMIIG/AIBADANBgkqhkiG9w0BAQEFAASCBuYwggbiAgEAAoIBgQC/e0jNVBCB8pxK\nwPmJUlus6q+mKQ9QD0esP/TzBZ6TwKiMlMukwh0FCah88UTf/9VNDEvgzFhrcbEc\n0O0ZKywSHNFOEq6onm3QWRqnMvXZLlTyhIBZRuLB8Vt3WH3Atv6BwbLRgFuT6Rfx\nopOGptmNQsOMT4z9lB2n2JiTBJsg7+iAfw6ZltuBSIjC8/5flcmYkkTQFEHUJ3RY\nOTjlqTY7y8J464qvXgQIUE/kCx7np4pdvWc3Zf9l1hgv5Ol/escpr8Mo2Cg5Qhjg\ntLzB4emx8dfnoV1oc77F4XEFj5SzBZqgKy/pV5yy8UtJ6NDBEwG4cK5kXXFyu31q\nc+XYzgD2SdmG3Fqvy5Ikwj8Sx82zrwBWEM8etEy7CniyZi7+Sr9G4NfFvBG8OOQ2\nXGMfnl8doEFYRtVdjU0o3VSn37ASCW8XsHgd/Zobu6k9sMKdU1iNrEjkiSGQtlbD\nsq8GMuq8saSbPyRUNQO7mFQC2F6K49KGgxpn4d65G0wQMMnbq4sCAwEAAQKCAYAK\nYYOr5g/TC7UfdGDS6g0gTcTiDD3RSFLJato7xqU3O22n2XVE5GUwXbqts2LZhgQp\nXi5K7KkqggppFoaUI7wK61cJlYe0iopHjl0cjW24rYNbdoWC0Y3/l7cuvDRtGz6n\nCDpKk1vjo/JxXjADT85hkyoI1FM/eCU3cU2sQsaqPXdsZ/cBqqUR2D3Z2+KBihxY\n0i063q5G8zCii8+i286d5UkQxyxIn582WCxMn7G4O2QL+vW6kiQLgFTlW9Kw35YO\nfbM04zUmpvZpjCJuqLDw/2x7/sJn17vzQ+LsdBz/JXaLEidkI4Tr4caWqSjJZVBK\nb5LzeOrqCBcsppXUhN+4yvqCAtK0iUCLbFwPuqqmv3Ly2GpWQBPEgozdaXZOOGzc\ne7M5xCDH5bQRR/3e6Md9fHl/ATKD5eSun+7dyjNq8FJHeUAEKS2okd5oFmTD4mkZ\nxV1GTHTLzUy0qeyiUoGCxnCnS8q9rx2Os4j4/Y+aQI6xzUWYK+Zb57o9w7kIirEC\ngcEA8BHiDzTM4HgmKJ5kPkB38JYkoI0hFzofesvvKg8Nx4hEg6KBL3t7kJPqdJgm\ntZHwhg3EwzKIRCY+wHCpYy0ouX/MAfKjWBI9uAYhTB5nbJEqxFNo18uK40PoWRMt\nNSralCrRcIb5z/Kl5WTsXN6DHJFTDwsGHF41LWrnCXRNM/XfzJvE7X0VlcjbL/A9\n0cxfEuTb7k2xhJNBbizCvdw92mmDemFxq9PO63966nnOIjwaX3m4G1yg3nsUGeTY\nVmpZAoHBAMwwBaj9x29Cxjn1Dy4rF5GiNErO5JY9mZnroa4I43zbYuXww5mWTpHs\nORntM1XtKt2E5KeW9fBUdJPWh4epnh2cyFmodSxaEk35wfnEGnMPlwHTgZvUE7kz\n6VXqDEGK4mYmzof6edbpg+tejx8SsK6Pwrt1Moj9evGs+I38oK2IRJchIS7Ur+qj\njLS13Z1mxLmAquDsmzAVgEL13rgUsBsiV+ghEEpvPS8gb9yZmpTiKD07JKml18VK\n06FsF9VAgwKBwBT/O2phD9pCJ/Q9hj77nIHqX+G69j3103MGCzD+iBH/lR5+RBZH\nEpOenE7+T8Rps0PGSINaFBkBz1M9h1MpS/qNduZktmypi8RgpODnd9xDBh6NvQnC\nv68I7XV8++M+kEeNRyw0Yf0SF/hsT1AAFi+VdlJGgI5SnwwN4Y8uIOJ+ish2h07O\nNekX3DPhK0cCPP6GDcZV/US/LGXafF3muXI08E7v3uVMbTijubhwVtsfrp7TIosi\nGt/am/N31IQaYQKBwAmUmz9hoPDsfiaMBAlThkiUBsYXzQvrmgBp2O00h4/9LzfA\nwzy6m7cnEUrRIV5/wUohiST/5UxAejPRlgxcfgm/qHrkd5L8Ku2zsVFJzT/m1FwG\nk4c/PSmscN9SGv8cSCEo4vnoW70kucbaafa4Rsf6ANYQ2q0oz5L1XbgzyUo7IZTB\nvi/XVOW6hMiZ2+sdvk9B5UKmd2WbLKh3ptqWRekQBHXkz0He1E0YxYbhQiqILgEp\nfD/lgylDqIhjbP7ZhwKBwFYIBmEYEs/+ixHkHGbzE4tP+VhgzQ7we1+xyqN7Z69e\nf7StEuWWIZ5Os9JpdhZmN/9qb4kzL+Pb0vELuNHLwv08MnJ792wA9dPeCuYLKKiK\nUBuBwnslBy69tCRSXQ8ltH3kX67lxCu28hDRg+oN83bEczPJ2I/x7BKDrWNYZvrq\nJw7Ijp5l5gLqtdb7eOI8mSs5F7yaUKR6yVhl3ifc6bQUmmGmmElobpCpcGqAmXSj\noAeJCZug3MWS2fxhluthxQ==\n-----END PRIVATE KEY-----\n", + "public": "-----BEGIN PUBLIC KEY-----\nMIIBojANBgkqhkiG9w0BAQEFAAOCAY8AMIIBigKCAYEAv3tIzVQQgfKcSsD5iVJb\nrOqvpikPUA9HrD/08wWek8CojJTLpMIdBQmofPFE3//VTQxL4MxYa3GxHNDtGSss\nEhzRThKuqJ5t0FkapzL12S5U8oSAWUbiwfFbd1h9wLb+gcGy0YBbk+kX8aKThqbZ\njULDjE+M/ZQdp9iYkwSbIO/ogH8OmZbbgUiIwvP+X5XJmJJE0BRB1Cd0WDk45ak2\nO8vCeOuKr14ECFBP5Ase56eKXb1nN2X/ZdYYL+Tpf3rHKa/DKNgoOUIY4LS8weHp\nsfHX56FdaHO+xeFxBY+UswWaoCsv6VecsvFLSejQwRMBuHCuZF1xcrt9anPl2M4A\n9knZhtxar8uSJMI/EsfNs68AVhDPHrRMuwp4smYu/kq/RuDXxbwRvDjkNlxjH55f\nHaBBWEbVXY1NKN1Up9+wEglvF7B4Hf2aG7upPbDCnVNYjaxI5IkhkLZWw7KvBjLq\nvLGkmz8kVDUDu5hUAtheiuPShoMaZ+HeuRtMEDDJ26uLAgMBAAE=\n-----END 
PUBLIC KEY-----\n" + } + }, + "object_storage": { + "directory": "test/basebackup_delta/chunks", + "storage_type": "local" + }, + "prefix": "1052a492-1a01-459d-a126-9db8518724c0/f73f56ee-6b9f-4ce0-b7aa-a170d58da833" + } + }, + "compression": { + "algorithm": "snappy" + }, + "log_level": "INFO", + "restore_process_count": 2 +} diff --git a/test/conftest.py b/test/conftest.py index 55cdaa2f..f10f5cd0 100644 --- a/test/conftest.py +++ b/test/conftest.py @@ -15,6 +15,7 @@ import tempfile import time from distutils.version import LooseVersion +from pathlib import Path from unittest import SkipTest import psycopg2 @@ -25,6 +26,8 @@ from pghoard import logutil, pgutil from pghoard.pghoard import PGHoard from pghoard.rohmu.compat import suppress +from pghoard.rohmu.delta.common import EMBEDDED_FILE_SIZE, Progress +from pghoard.rohmu.delta.snapshot import Snapshotter from pghoard.rohmu.snappyfile import snappy logutil.configure_logging() @@ -337,3 +340,28 @@ def pghoard_metrics(db, tmpdir, request): # pylint: disable=redefined-outer-nam }, } yield from pghoard_base(db, tmpdir, request, metrics_cfg=metrics_cfg) + + +class SnapshotterWithDefaults(Snapshotter): + def create_4foobar(self): + (self.src / "foo").write_text("foobar") + (self.src / "foo2").write_text("foobar") + (self.src / "foobig").write_text("foobar" * EMBEDDED_FILE_SIZE) + (self.src / "foobig2").write_text("foobar" * EMBEDDED_FILE_SIZE) + progress = Progress() + assert self.snapshot(progress=progress) > 0 + ss1 = self.get_snapshot_state() + assert self.snapshot(progress=Progress()) == 0 + ss2 = self.get_snapshot_state() + print("ss1", ss1) + print("ss2", ss2) + assert ss1 == ss2 + + +@pytest.fixture(name="snapshotter") +def fixture_snapshotter(tmpdir): + src = Path(tmpdir) / "src" + src.mkdir() + dst = Path(tmpdir) / "dst" + dst.mkdir() + yield SnapshotterWithDefaults(src=src, dst=dst, globs=["*"], parallel=1) diff --git a/test/test_basebackup.py b/test/test_basebackup.py index 6ec8ca9a..480a8896 100644 --- a/test/test_basebackup.py +++ b/test/test_basebackup.py @@ -18,6 +18,7 @@ from pghoard import common, metrics, pgutil from pghoard.basebackup import PGBaseBackup +from pghoard.common import BaseBackupMode from pghoard.restore import Restore, RestoreError from pghoard.rohmu import get_transfer from pghoard.rohmu.compat import makedirs @@ -50,6 +51,7 @@ def test_parse_backup_label(self, tmpdir): connection_info=None, basebackup_path=None, compression_queue=None, + storage=None, transfer_queue=None, metrics=metrics.Metrics(statsd={}) ) @@ -80,6 +82,7 @@ def create_test_files(): connection_info=None, basebackup_path=None, compression_queue=None, + storage=None, transfer_queue=None, metrics=metrics.Metrics(statsd={}) ) @@ -174,6 +177,7 @@ def test_find_and_split_files_to_backup(self, tmpdir): connection_info=None, basebackup_path=None, compression_queue=None, + storage=None, transfer_queue=None, metrics=metrics.Metrics(statsd={}) ) @@ -240,7 +244,7 @@ def _test_create_basebackup(self, capsys, db, pghoard, mode, replica=False, acti assert "pg-version" in out assert "start-wal-segment" in out - if mode == "local-tar": + if mode in {BaseBackupMode.local_tar, BaseBackupMode.delta}: assert "end-time" in out if replica is False: assert "end-wal-segment" in out @@ -255,7 +259,7 @@ def _test_create_basebackup(self, capsys, db, pghoard, mode, replica=False, acti assert backup["metadata"]["backup-reason"] == "scheduled" assert backup["metadata"]["backup-decision-time"] == now.isoformat() assert backup["metadata"]["normalized-backup-time"] == 
now.isoformat() - if mode == "local-tar": + if mode in {BaseBackupMode.local_tar, BaseBackupMode.delta}: if replica is False: assert "end-wal-segment" in backup["metadata"] assert "end-time" in backup["metadata"] @@ -313,6 +317,7 @@ def _test_restore_basebackup(self, db, pghoard, tmpdir, active_backup_mode="arch connection_info=None, basebackup_path=None, compression_queue=None, + storage=storage, transfer_queue=None, metrics=metrics.Metrics(statsd={}) ) @@ -341,28 +346,31 @@ def _test_basebackups(self, capsys, db, pghoard, tmpdir, mode, *, replica=False) self._test_restore_basebackup(db, pghoard, tmpdir) def test_basic_standalone_hot_backups(self, capsys, db, pghoard, tmpdir): - self._test_create_basebackup(capsys, db, pghoard, "basic", False, "standalone_hot_backup") + self._test_create_basebackup(capsys, db, pghoard, BaseBackupMode.basic, False, "standalone_hot_backup") self._test_restore_basebackup(db, pghoard, tmpdir, "standalone_hot_backup") def test_pipe_standalone_hot_backups(self, capsys, db, pghoard, tmpdir): - self._test_create_basebackup(capsys, db, pghoard, "pipe", False, "standalone_hot_backup") + self._test_create_basebackup(capsys, db, pghoard, BaseBackupMode.pipe, False, "standalone_hot_backup") self._test_restore_basebackup(db, pghoard, tmpdir, "standalone_hot_backup") def test_basebackups_basic(self, capsys, db, pghoard, tmpdir): - self._test_basebackups(capsys, db, pghoard, tmpdir, "basic") + self._test_basebackups(capsys, db, pghoard, tmpdir, BaseBackupMode.basic) def test_basebackups_basic_lzma(self, capsys, db, pghoard_lzma, tmpdir): - self._test_basebackups(capsys, db, pghoard_lzma, tmpdir, "basic") + self._test_basebackups(capsys, db, pghoard_lzma, tmpdir, BaseBackupMode.basic) + + def test_basebackups_delta(self, capsys, db, pghoard, tmpdir): + self._test_basebackups(capsys, db, pghoard, tmpdir, BaseBackupMode.delta) def test_basebackups_local_tar_nonexclusive(self, capsys, db, pghoard, tmpdir): if db.pgver < "9.6": pytest.skip("PostgreSQL 9.6+ required for non-exclusive backups") - self._test_basebackups(capsys, db, pghoard, tmpdir, "local-tar") + self._test_basebackups(capsys, db, pghoard, tmpdir, BaseBackupMode.local_tar) def test_basebackups_local_tar_legacy(self, capsys, db, pghoard, tmpdir): if db.pgver >= "9.6": pytest.skip("PostgreSQL < 9.6 required for exclusive backup tests") - self._test_basebackups(capsys, db, pghoard, tmpdir, "local-tar") + self._test_basebackups(capsys, db, pghoard, tmpdir, BaseBackupMode.local_tar) def test_basebackups_local_tar_exclusive_conflict(self, capsys, db, pghoard, tmpdir): if db.pgver >= "9.6": @@ -375,7 +383,7 @@ def test_basebackups_local_tar_exclusive_conflict(self, capsys, db, pghoard, tmp cursor = conn.cursor() cursor.execute("SELECT pg_start_backup('conflicting')") # pylint: disable=used-before-assignment need_stop = True - self._test_basebackups(capsys, db, pghoard, tmpdir, "local-tar") + self._test_basebackups(capsys, db, pghoard, tmpdir, BaseBackupMode.local_tar) need_stop = False finally: if need_stop: @@ -394,14 +402,14 @@ def test_basebackups_local_tar_pgespresso(self, capsys, db, pghoard, tmpdir): pytest.skip("pgespresso not available") try: cursor.execute("CREATE EXTENSION pgespresso") - self._test_basebackups(capsys, db, pghoard, tmpdir, "local-tar") + self._test_basebackups(capsys, db, pghoard, tmpdir, BaseBackupMode.local_tar) finally: cursor.execute("DROP EXTENSION pgespresso") def test_basebackups_replica_local_tar_nonexclusive(self, capsys, recovery_db, pghoard, tmpdir): if recovery_db.pgver < "9.6": 
pytest.skip("PostgreSQL 9.6+ required for non-exclusive backups") - self._test_basebackups(capsys, recovery_db, pghoard, tmpdir, "local-tar", replica=True) + self._test_basebackups(capsys, recovery_db, pghoard, tmpdir, BaseBackupMode.local_tar, replica=True) def test_basebackups_replica_local_tar_pgespresso(self, capsys, recovery_db, pghoard, tmpdir): conn_str = pgutil.create_connection_string(recovery_db.user) @@ -411,10 +419,10 @@ def test_basebackups_replica_local_tar_pgespresso(self, capsys, recovery_db, pgh cursor.execute("SELECT 1 FROM pg_available_extensions WHERE name = 'pgespresso' AND default_version >= '1.2'") if not cursor.fetchone(): pytest.skip("pgespresso not available") - self._test_basebackups(capsys, recovery_db, pghoard, tmpdir, "local-tar", replica=True) + self._test_basebackups(capsys, recovery_db, pghoard, tmpdir, BaseBackupMode.local_tar, replica=True) def test_basebackups_pipe(self, capsys, db, pghoard, tmpdir): - self._test_basebackups(capsys, db, pghoard, tmpdir, "pipe") + self._test_basebackups(capsys, db, pghoard, tmpdir, BaseBackupMode.pipe) def test_basebackups_tablespaces(self, capsys, db, pghoard, tmpdir): # Create a test tablespace for this instance, but make sure we drop it at the end of the test as the diff --git a/test/test_pghoard.py b/test/test_pghoard.py index 14ed88be..aab62b4c 100644 --- a/test/test_pghoard.py +++ b/test/test_pghoard.py @@ -5,18 +5,23 @@ See LICENSE for details """ import datetime +import io import json import os +import tarfile import time +from pathlib import Path from unittest.mock import Mock, patch import psycopg2 -from pghoard.common import (create_alert_file, delete_alert_file, write_json_file) +from pghoard import common +from pghoard.common import (BaseBackupFormat, create_alert_file, delete_alert_file, write_json_file) from pghoard.pghoard import PGHoard from pghoard.pgutil import create_connection_string - # pylint: disable=attribute-defined-outside-init +from pghoard.rohmu import rohmufile + from .base import PGHoardTestCase @@ -340,6 +345,118 @@ def write_backup_and_wal_files(what): assert len(basebackups) == 1 assert len(os.listdir(wal_storage_path)) == 1 + def test_local_refresh_backup_list_and_delete_old_delta_format(self): + basebackup_storage_path = os.path.join(self.local_storage_dir, "basebackup") + basebackup_delta_path = os.path.join(self.local_storage_dir, "basebackup_delta") + + os.makedirs(basebackup_storage_path) + os.makedirs(basebackup_delta_path) + + self.pghoard.set_state_defaults(self.test_site) + assert self.pghoard.get_remote_basebackups_info(self.test_site) == [] + + def write_backup_files(what): + for bb, bb_data in what.items(): + wal_start, hexdigests = bb_data + if bb: + bb_path = os.path.join(basebackup_storage_path, bb) + date_parts = [int(part) for part in bb.replace("_", "-").split("-")] + start_time = datetime.datetime(*date_parts, tzinfo=datetime.timezone.utc) + + metadata = { + "manifest": { + "snapshot_result": { + "state": { + "files": [{ + "relative_path": h, + "hexdigest": h + } for h in hexdigests] + } + } + } + } + mtime = time.time() + blob = io.BytesIO(common.json_encode(metadata, binary=True)) + ti = tarfile.TarInfo(name=".pghoard_tar_metadata.json") + ti.size = len(blob.getbuffer()) + ti.mtime = mtime + + with open(bb_path, "wb") as fp: + with rohmufile.file_writer( + compression_algorithm="snappy", compression_level=0, fileobj=fp + ) as output_obj: + with tarfile.TarFile(fileobj=output_obj, mode="w") as tar: + tar.addfile(ti, blob) + input_size = output_obj.tell() + + for h in 
hexdigests: + with open(Path(basebackup_delta_path) / h, "w") as digest_file, \ + open((Path(basebackup_delta_path) / (h + ".metadata")), "w") as digest_meta_file: + json.dump({}, digest_file) + json.dump({}, digest_meta_file) + + with open(bb_path + ".metadata", "w") as fp: + json.dump({ + "start-wal-segment": wal_start, + "start-time": start_time.isoformat(), + "format": BaseBackupFormat.delta_v1, + "compression-algorithm": "snappy", + "original-file-size": input_size + }, fp) + + backups_and_delta = { + "2015-08-25_0": ( + "000000010000000A000000AA", [ + "214967296374cae6f099e19910b33a0893f0abc62f50601baa2875ab055cd27b", + "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855" + ] + ), + "2015-08-25_1": [ + "000000020000000A000000AB", ["214967296374cae6f099e19910b33a0893f0abc62f50601baa2875ab055cd27b"] + ], + "2015-08-25_2": [ + "000000030000000A000000AC", ["214967296374cae6f099e19910b33a0893f0abc62f50601baa2875ab055cd27b"] + ], + "2015-08-25_3": [ + "000000040000000B00000003", + [ + "214967296374cae6f099e19910b33a0893f0abc62f50601baa2875ab055cd27b", + "4b65df4d0857bbbcb22aa086e02bd8414a9f3a484869f2b96ed7c62f3c4eb088" + ] + ], + } + write_backup_files(backups_and_delta) + basebackups = self.pghoard.get_remote_basebackups_info(self.test_site) + assert len(basebackups) == 4 + self.pghoard.refresh_backup_list_and_delete_old(self.test_site) + basebackups = self.pghoard.get_remote_basebackups_info(self.test_site) + assert len(basebackups) == 1 + + left_delta_files = [p for p in os.listdir(basebackup_delta_path) if not p.endswith(".metadata")] + assert sorted(left_delta_files) == [ + "214967296374cae6f099e19910b33a0893f0abc62f50601baa2875ab055cd27b", + "4b65df4d0857bbbcb22aa086e02bd8414a9f3a484869f2b96ed7c62f3c4eb088" + ] + + new_delta_data = { + "2015-08-25_4": ( + "000000040000000B00000004", [ + "fc61c91430dcb345001306ad513f103380c16896093a17868fc909aeda393559", + ] + ) + } + write_backup_files(new_delta_data) + basebackups = self.pghoard.get_remote_basebackups_info(self.test_site) + assert len(basebackups) == 2 + self.pghoard.refresh_backup_list_and_delete_old(self.test_site) + basebackups = self.pghoard.get_remote_basebackups_info(self.test_site) + assert len(basebackups) == 1 + + left_delta_files = [p for p in os.listdir(basebackup_delta_path) if not p.endswith(".metadata")] + assert sorted(left_delta_files) == [ + "fc61c91430dcb345001306ad513f103380c16896093a17868fc909aeda393559", + ] + def test_alert_files(self): alert_file_path = os.path.join(self.config["alert_file_dir"], "test_alert") create_alert_file(self.pghoard.config, "test_alert") @@ -443,14 +560,17 @@ def test_pause_on_disk_full(self, db, pghoard_separate_volume): os.makedirs(wal_directory, exist_ok=True) pghoard.receivexlog_listener(pghoard.test_site, db.user, wal_directory) - # Create 20 new WAL segments in very quick succession. Our volume for incoming WALs is only 100 + # Create 15 new WAL segments in very quick succession. Our volume for incoming WALs is only 100 # MiB so if logic for automatically suspending pg_receive(xlog|wal) wasn't working the volume # would certainly fill up and the files couldn't be processed. Now this should work fine. 
for _ in range(16): + # Note: do not combine two function call in one select, PG executes it differently and + # sometimes looks like it generates less WAL files than we wanted + cursor.execute("SELECT txid_current()") if conn.server_version >= 100000: - cursor.execute("SELECT txid_current(), pg_switch_wal()") + cursor.execute("SELECT pg_switch_wal()") else: - cursor.execute("SELECT txid_current(), pg_switch_xlog()") + cursor.execute("SELECT pg_switch_xlog()") start = time.monotonic() site = "test_pause_on_disk_full" diff --git a/test/test_restore.py b/test/test_restore.py index 6d02bf52..41a4e9e0 100644 --- a/test/test_restore.py +++ b/test/test_restore.py @@ -19,7 +19,9 @@ import pytest from pghoard.common import write_json_file -from pghoard.restore import (BasebackupFetcher, ChunkFetcher, Restore, RestoreError, create_recovery_conf) +from pghoard.restore import ( + BasebackupFetcher, ChunkFetcher, FileDataInfo, FileInfoType, FilePathInfo, Restore, RestoreError, create_recovery_conf +) from .base import PGHoardTestCase @@ -166,7 +168,11 @@ def test_progress_tracking_and_error_handling(self): status_output_file = os.path.join(test_output_file_tmp, "pghoard-restore-status.json") pgdata = "/tmp/test_restore" tablespaces = {"foo": {"oid": 1234, "path": "/tmp/test_restore2"}} - data_files = [("bar1", 1000), ("bar2", 2000), ((b"baz", {}), 0)] + data_files = [ + FilePathInfo(name="bar1", size=1000), + FilePathInfo(name="bar2", size=2000), + FileDataInfo(data=b"baz", metadata={}, size=0) + ] fetcher = BasebackupFetcher( app_config=config, data_files=data_files, @@ -201,17 +207,17 @@ def sleep_mock(sleep_time): assert fetcher.current_progress() == (0, 0) assert fetcher.jobs_in_progress() is True progress_dict["bar1"] = 1000 - fetcher.job_completed(fetcher.data_files[0]["id"]) + fetcher.job_completed(fetcher.data_files[0].id) elif call[0] == 1: assert fetcher.current_progress() == (1000, 1000 / 3000) assert fetcher.jobs_in_progress() is True progress_dict["bar2"] = 1000 - fetcher.job_failed(fetcher.data_files[1]["id"], Exception("test exception")) + fetcher.job_failed(fetcher.data_files[1].id, Exception("test exception")) check_status_output_file(expected_progress=1000 / 3000) elif call[0] == 2: assert fetcher.current_progress() == (2000, 2000 / 3000) assert fetcher.jobs_in_progress() is True - fetcher.job_completed(fetcher.data_files[2]["id"]) + fetcher.job_completed(fetcher.data_files[2].id) check_status_output_file(expected_progress=2000 / 3000) elif call[0] == 3: assert False @@ -314,6 +320,36 @@ def _fetch_and_extract_one_backup(self, metadata, file_size, fetch_fn): with pytest.raises(RestoreError): fetcher.fetch_all() + def test_restore_from_delta_files(self): + for tar in ["tar", "pghoard/gnutaremu.py"]: + self.run_restore_test( + "basebackup_delta", + tar, + self.delta, + files=[ + "0af668268d0fe14c6e269760b08d80a634c421b8381df25f31fbed5e8a8c8d8b", + "4b65df4d0857bbbcb22aa086e02bd8414a9f3a484869f2b96ed7c62f3c4eb088", + "fc61c91430dcb345001306ad513f103380c16896093a17868fc909aeda393559", + ], + file_type=FileInfoType.delta, + ) + + def delta(self, fetcher, restore_dir): + fetcher.fetch_all() + + self.check_sha256( + os.path.join(restore_dir, "0af668268d0fe14c6e269760b08d80a634c421b8381df25f31fbed5e8a8c8d8b"), + "24f3f08b786494bdd8d1393fdf47eafe3aa4b3f51720e23b62dae812e54f6cc7" + ) + self.check_sha256( + os.path.join(restore_dir, "4b65df4d0857bbbcb22aa086e02bd8414a9f3a484869f2b96ed7c62f3c4eb088"), + "960f11a4bb45060ac3c69551e4e99d9d713e98e2968f450b8abac37dcfed86e7" + ) + self.check_sha256( + 
os.path.join(restore_dir, "fc61c91430dcb345001306ad513f103380c16896093a17868fc909aeda393559"), + "8acdc937fca22a496215056ed3960bff6d3319b9c45f3050e8edfc09d7085c27" + ) + def test_tablespaces(self): def rm_tablespace_paths(): shutil.rmtree("/tmp/nsd5b2b8e4978847ef9b3056b7e01c51a8", ignore_errors=True) @@ -375,10 +411,20 @@ def tablespaces(self, fetcher, restore_dir): "84e3bda6f1abdd0fb0aff4bc6587ea07b9d8b61c1a0d6bdc4d16d339a761717f" ) - def run_restore_test(self, path, tar_executable, logic, tablespaces=None, files=None): + def run_restore_test(self, path, tar_executable, logic, tablespaces=None, files=None, file_type=FileInfoType.regular): chunk_dir = os.path.join("test", path, "chunks") - files = [fn for fn in os.listdir(chunk_dir) if ".metadata" not in fn and (not files or fn in files)] - files = [(fn, os.stat(os.path.join(chunk_dir, fn)).st_size) for fn in files] + files_names = [fn for fn in os.listdir(chunk_dir) if ".metadata" not in fn and (not files or fn in files)] + if file_type == FileInfoType.delta: + files = [ + FilePathInfo(name=fn, size=os.stat(os.path.join(chunk_dir, fn)).st_size, file_type=file_type, new_name=fn) + for fn in files_names + ] + else: + files = [ + FilePathInfo(name=fn, size=os.stat(os.path.join(chunk_dir, fn)).st_size, file_type=file_type) + for fn in files_names + ] + with open(os.path.join("test", path, "config.json"), "r") as f: config = json.loads(f.read()) restore_dir = mkdtemp(prefix=self.__class__.__name__) diff --git a/test/test_rohmu_delta.py b/test/test_rohmu_delta.py new file mode 100644 index 00000000..e10a7fc3 --- /dev/null +++ b/test/test_rohmu_delta.py @@ -0,0 +1,80 @@ +# Copyright (c) 2021 Aiven, Helsinki, Finland. https://aiven.io/ +import os + +import pytest + +from pghoard.rohmu.delta.common import Progress, SnapshotHash + + +@pytest.mark.timeout(2) +def test_snapshot(snapshotter): + with snapshotter.lock: + # Start with empty + assert snapshotter.snapshot(progress=Progress()) == 0 + src = snapshotter.src + dst = snapshotter.dst + assert not (dst / "foo").is_file() + + # Create files in src, run snapshot + snapshotter.create_4foobar() + ss2 = snapshotter.get_snapshot_state() + + assert (dst / "foo").is_file() + assert (dst / "foo").read_text() == "foobar" + assert (dst / "foo2").read_text() == "foobar" + + hashes = snapshotter.get_snapshot_hashes() + assert len(hashes) == 1 + assert hashes == [ + SnapshotHash(hexdigest="c6479bce75c9a573ba073af83191c280721170793da6e9e9480201de94ab0654", size=900) + ] + + while True: + (src / "foo").write_text("barfoo") # same length + if snapshotter.snapshot(progress=Progress()) > 0: + # Sometimes fails on first iteration(s) due to same mtime + # (inaccurate timestamps) + break + ss3 = snapshotter.get_snapshot_state() + assert ss2 != ss3 + assert snapshotter.snapshot(progress=Progress()) == 0 + assert (dst / "foo").is_file() + assert (dst / "foo").read_text() == "barfoo" + + # Remove file from src, run snapshot + for filename in ["foo", "foo2", "foobig", "foobig2"]: + (src / filename).unlink() + assert snapshotter.snapshot(progress=Progress()) > 0 + assert snapshotter.snapshot(progress=Progress()) == 0 + assert not (dst / filename).is_file() + + # Now shouldn't have any data hashes + hashes_empty = snapshotter.get_snapshot_hashes() + assert not hashes_empty + + with pytest.raises(AssertionError): + snapshotter.snapshot(progress=Progress()) + + with pytest.raises(AssertionError): + snapshotter.get_snapshot_state() + + with pytest.raises(AssertionError): + snapshotter.get_snapshot_hashes() + + 
+@pytest.mark.parametrize("test", [(os, "link", 1, 1), (None, "_snapshotfile_from_path", 3, 0)]) +def test_snapshot_error_filenotfound(snapshotter, mocker, test): + (obj, fun, exp_progress_1, exp_progress_2) = test + + def _not_really_found(*a, **kw): + raise FileNotFoundError + + obj = obj or snapshotter + mocker.patch.object(obj, fun, new=_not_really_found) + (snapshotter.src / "foo").write_text("foobar") + (snapshotter.src / "bar").write_text("foobar") + with snapshotter.lock: + progress = Progress() + assert snapshotter.snapshot(progress=progress) == exp_progress_1 + progress = Progress() + assert snapshotter.snapshot(progress=progress) == exp_progress_2
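
For reviewers unfamiliar with the new rohmu delta modules, the following is a minimal sketch (not part of the patch) of how the Snapshotter and Progress classes introduced above are meant to fit together; it mirrors the SnapshotterWithDefaults fixture in test/conftest.py and the assertions in test/test_rohmu_delta.py. The scratch directories and file names are invented for illustration only, and error handling is omitted:

    import tempfile
    from pathlib import Path

    from pghoard.rohmu.delta.common import EMBEDDED_FILE_SIZE, Progress
    from pghoard.rohmu.delta.snapshot import Snapshotter

    # Invented scratch directories; in pghoard these come from the backup site's
    # working directories.
    src = Path(tempfile.mkdtemp(prefix="delta-src-"))
    dst = Path(tempfile.mkdtemp(prefix="delta-dst-"))

    # One file small enough to be embedded (base64) directly in the snapshot state
    # and one large enough to be hashed and referenced by its hexdigest.
    (src / "small").write_text("x" * 10)
    (src / "large").write_text("y" * (EMBEDDED_FILE_SIZE + 1))

    snapshotter = Snapshotter(src=src, dst=dst, globs=["*"], parallel=1)

    # All public Snapshotter calls must be made with the lock held.
    with snapshotter.lock:
        changes = snapshotter.snapshot(progress=Progress())
        assert changes > 0

        state = snapshotter.get_snapshot_state()    # JSON-serializable via state.jsondict()
        hashes = snapshotter.get_snapshot_hashes()  # only "large" yields a SnapshotHash
        assert len(state.files) == 2
        assert len(hashes) == 1

        # Unchanged files are skipped on the next pass, so an immediate
        # re-snapshot reports zero changes.
        assert snapshotter.snapshot(progress=Progress()) == 0

The design point worth noting here is that snapshot() hard-links the source files into dst, so hashing (and any later upload keyed by hexdigest) reads from a stable view even while the originals keep changing; files modified afterwards simply show up as changes on the next incremental snapshot.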