diff --git a/.pylintrc b/.pylintrc index deeb75a2..f90a82a0 100644 --- a/.pylintrc +++ b/.pylintrc @@ -28,7 +28,7 @@ disable= [FORMAT] max-line-length=125 -max-module-lines=1000 +max-module-lines=1100 [REPORTS] output-format=text diff --git a/README.rst b/README.rst index 00cfc82b..8394d1de 100644 --- a/README.rst +++ b/README.rst @@ -690,6 +690,16 @@ tablespaces. Note that the ``local-tar`` backup mode can not be used on replica servers prior to PostgreSQL 9.6 unless the pgespresso extension is installed. +When using ``delta`` mode, only changed files are uploaded into the storage. +On every backup snapshot of the data files is taken, this results in a manifest file, +describing the hashes of all the files needed to be backed up. +New hashes are uploaded to the storage and used together with complementary +manifest from control file for restoration. +In order to properly assess the efficiency of ``delta`` mode in comparison with +``local-tar``, one can use ``local-tar-delta-stats`` mode, which behaves the same as +``local-tar``, but also collects the metrics as if it was ``delta`` mode. It can help +in decision making of switching to ``delta`` mode. + ``basebackup_threads`` (default ``1``) How many threads to use for tar, compress and encrypt tasks. Only applies for diff --git a/pghoard/basebackup.py b/pghoard/basebackup.py index 0ac4084a..6f0db3bb 100644 --- a/pghoard/basebackup.py +++ b/pghoard/basebackup.py @@ -5,6 +5,7 @@ See LICENSE for details """ import datetime +import hashlib import io import logging import os @@ -18,7 +19,7 @@ from queue import Empty, Queue from tempfile import NamedTemporaryFile from threading import Thread -from typing import Optional +from typing import Dict, Optional import psycopg2 @@ -29,11 +30,12 @@ from . import common, version, wal from .basebackup_delta import DeltaBaseBackup from .common import ( - BackupFailure, BaseBackupFormat, BaseBackupMode, connection_string_using_pgpass, + BackupFailure, BaseBackupFormat, BaseBackupMode, connection_string_using_pgpass, extract_pghoard_bb_v2_metadata, replication_connection_string_and_slot_using_pgpass, set_stream_nonblocking, set_subprocess_stdout_and_stderr_nonblocking, terminate_subprocess ) from .patchedtarfile import tarfile +from .rohmu.delta.common import EMBEDDED_FILE_SIZE BASEBACKUP_NAME = "pghoard_base_backup" EMPTY_DIRS = [ @@ -82,6 +84,26 @@ def from_config(config) -> "CompressionData": return CompressionData(algorithm=algorithm, level=level) +class HashFile: + def __init__(self, *, path): + self._file = open(path, "rb") + self.hash = hashlib.blake2s() + + def __enter__(self): + return self + + def __exit__(self, t, v, tb): + self._file.close() + + def read(self, n=None): + data = self._file.read(n) + self.hash.update(data) + return data + + def __getattr__(self, attr): + return getattr(self._file, attr) + + class PGBaseBackup(Thread): def __init__( self, @@ -126,6 +148,8 @@ def run(self): self.run_local_tar_basebackup() elif basebackup_mode == BaseBackupMode.delta: self.run_local_tar_basebackup(delta=True) + elif basebackup_mode == BaseBackupMode.local_tar_delta_stats: + self.run_local_tar_basebackup(with_delta_stats=True) elif basebackup_mode == BaseBackupMode.pipe: self.run_piped_basebackup() else: @@ -409,7 +433,7 @@ def get_control_entries_for_tar(self, *, metadata, pg_control, backup_label): ti.mtime = mtime yield ti, None, False - def write_files_to_tar(self, *, files, tar): + def write_files_to_tar(self, *, files, tar, delta_stats=None): for archive_path, local_path, missing_ok in files: if not self.running: raise BackupFailure("thread termination requested") @@ -419,7 +443,18 @@ def write_files_to_tar(self, *, files, tar): continue try: - tar.add(local_path, arcname=archive_path, recursive=False) + if delta_stats is None: + tar.add(local_path, arcname=archive_path, recursive=False) + else: + if os.path.isdir(local_path): + tar.add(local_path, arcname=archive_path, recursive=False) + else: + with HashFile(path=local_path) as fileobj: + ti = tar.gettarinfo(name=local_path, arcname=archive_path) + tar.addfile(ti, fileobj=fileobj) + if ti.size > EMBEDDED_FILE_SIZE: + # Tiny files are not uploaded separately, they are embed into the manifest, so skip them + delta_stats[fileobj.hash.hexdigest()] = ti.size except (FileNotFoundError if missing_ok else NoException): self.log.warning("File %r went away while writing to tar, ignoring", local_path) @@ -508,7 +543,15 @@ def compression_data(self) -> CompressionData: return CompressionData.from_config(self.config) def tar_one_file( - self, *, temp_dir, chunk_path, files_to_backup, callback_queue, filetype="basebackup_chunk", extra_metadata=None + self, + *, + temp_dir, + chunk_path, + files_to_backup, + callback_queue, + filetype="basebackup_chunk", + extra_metadata=None, + delta_stats=None ): start_time = time.monotonic() @@ -522,7 +565,7 @@ def tar_one_file( fileobj=raw_output_obj ) as output_obj: with tarfile.TarFile(fileobj=output_obj, mode="w") as output_tar: - self.write_files_to_tar(files=files_to_backup, tar=output_tar) + self.write_files_to_tar(files=files_to_backup, tar=output_tar, delta_stats=delta_stats) input_size = output_obj.tell() @@ -585,13 +628,14 @@ def wait_for_chunk_transfer_to_complete(self, chunk_count, upload_results, chunk ) return False - def handle_single_chunk(self, *, chunk_callback_queue, chunk_path, chunks, index, temp_dir): + def handle_single_chunk(self, *, chunk_callback_queue, chunk_path, chunks, index, temp_dir, delta_stats=None): one_chunk_files = chunks[index] chunk_name, input_size, result_size = self.tar_one_file( callback_queue=chunk_callback_queue, chunk_path=chunk_path, temp_dir=temp_dir, files_to_backup=one_chunk_files, + delta_stats=delta_stats, ) self.log.info( "Queued backup chunk %r for transfer, chunks on disk (including partial): %r, current: %r, total chunks: %r", @@ -604,7 +648,9 @@ def handle_single_chunk(self, *, chunk_callback_queue, chunk_path, chunks, index "files": [chunk[0] for chunk in one_chunk_files] } - def create_and_upload_chunks(self, chunks, data_file_format, temp_base_dir): + def create_and_upload_chunks( + self, chunks, data_file_format, temp_base_dir, delta_stats: Optional[Dict[str, int]] = None + ): start_time = time.monotonic() chunk_files = [] upload_results = [] @@ -633,6 +679,7 @@ def create_and_upload_chunks(self, chunks, data_file_format, temp_base_dir): chunks=chunks, index=i, temp_dir=temp_base_dir, + delta_stats=delta_stats, ) pending_compress_and_encrypt_tasks.append(task) self.chunks_on_disk += 1 @@ -650,7 +697,31 @@ def create_and_upload_chunks(self, chunks, data_file_format, temp_base_dir): return chunk_files - def run_local_tar_basebackup(self, delta=False): + def fetch_all_data_files_hashes(self): + hashes: Dict[str, int] = {} + + for backup in self.get_remote_basebackups_info(self.site): + if backup["metadata"].get("format") != BaseBackupFormat.v2: + continue + + key = os.path.join(self.site_config["prefix"], "basebackup", backup["name"]) + bmeta_compressed = self.storage.get_contents_to_string(key)[0] + + with rohmufile.file_reader( + fileobj=io.BytesIO(bmeta_compressed), + metadata=backup["metadata"], + key_lookup=lambda key_id: self.site_config["encryption_keys"][key_id]["private"] + ) as input_obj: + meta = extract_pghoard_bb_v2_metadata(input_obj) + + if "delta_stats" not in meta: + continue + + hashes.update(meta["delta_stats"]["hashes"]) + + return hashes + + def run_local_tar_basebackup(self, delta=False, with_delta_stats=False): control_files_metadata_extra = {} pgdata = self.site_config["pg_data_directory"] if not os.path.isdir(pgdata): @@ -756,13 +827,53 @@ def run_local_tar_basebackup(self, delta=False): pgdata=pgdata, tablespaces=tablespaces, target_chunk_size=target_chunk_size ) chunks_count = len(chunks) + + delta_stats: Optional[Dict[str, int]] = None + if with_delta_stats: + delta_stats = {} + # Tar up the chunks and submit them for upload; note that we start from chunk 1 here; chunk 0 # is reserved for special files and metadata and will be generated last. - chunk_files = self.create_and_upload_chunks(chunks, data_file_format, temp_base_dir) + chunk_files = self.create_and_upload_chunks( + chunks, data_file_format, temp_base_dir, delta_stats=delta_stats + ) total_size_plain = sum(item["input_size"] for item in chunk_files) total_size_enc = sum(item["result_size"] for item in chunk_files) + if with_delta_stats: + control_files_metadata_extra["delta_stats"] = {"hashes": delta_stats} + + existing_hashes = self.fetch_all_data_files_hashes() + new_hashes = {k: delta_stats[k] for k in set(delta_stats).difference(set(existing_hashes))} + + planned_upload_size = sum(new_hashes.values()) + planned_upload_count = len(new_hashes) + + if existing_hashes: + # Send ratio metrics for every backup except for the first one + planned_total_size = sum(delta_stats.values()) + planned_total_count = len(delta_stats) + if planned_total_count: + self.metrics.gauge( + "pghoard.planned_delta_backup_changed_data_files_ratio", + planned_upload_count / planned_total_count + ) + if planned_total_size: + self.metrics.gauge( + "pghoard.planned_delta_backup_changed_data_size_ratio", + planned_upload_size / planned_total_size + ) + self.metrics.gauge( + "pghoard.planned_delta_backup_remained_data_size_raw", + planned_total_size - planned_upload_size, + ) + + self.metrics.increase("pghoard.planned_delta_backup_total_size", inc_value=planned_upload_size) + self.metrics.gauge("pghoard.planned_delta_backup_upload_size", planned_upload_size) + self.metrics.increase("pghoard.planned_delta_backup_total_files", inc_value=planned_upload_count) + self.metrics.gauge("pghoard.planned_delta_backup_upload_files", planned_upload_count) + control_files_metadata_extra["chunks"] = chunk_files # Everything is now tarred up, grab the latest pg_control and stop the backup process diff --git a/pghoard/common.py b/pghoard/common.py index 342168ad..feddcc42 100644 --- a/pghoard/common.py +++ b/pghoard/common.py @@ -42,6 +42,7 @@ class BaseBackupMode(StrEnum): basic = "basic" delta = "delta" local_tar = "local-tar" + local_tar_delta_stats = "local-tar-delta-stats" pipe = "pipe" diff --git a/requirements.dev.txt b/requirements.dev.txt index e912a96a..51f889c8 100644 --- a/requirements.dev.txt +++ b/requirements.dev.txt @@ -1,6 +1,6 @@ # Use pip for build requirements to harmonize between OS versions mock -pylint>=2.4.3 +pylint>=2.4.3,<=2.7.2 pylint-quotes pytest pytest-mock diff --git a/test/test_basebackup.py b/test/test_basebackup.py index 480a8896..312e0240 100644 --- a/test/test_basebackup.py +++ b/test/test_basebackup.py @@ -362,6 +362,9 @@ def test_basebackups_basic_lzma(self, capsys, db, pghoard_lzma, tmpdir): def test_basebackups_delta(self, capsys, db, pghoard, tmpdir): self._test_basebackups(capsys, db, pghoard, tmpdir, BaseBackupMode.delta) + def test_basebackups_local_tar_with_delta_stats(self, capsys, db, pghoard, tmpdir): + self._test_basebackups(capsys, db, pghoard, tmpdir, BaseBackupMode.local_tar_delta_stats) + def test_basebackups_local_tar_nonexclusive(self, capsys, db, pghoard, tmpdir): if db.pgver < "9.6": pytest.skip("PostgreSQL 9.6+ required for non-exclusive backups")