diff --git a/pghoard/pghoard.py b/pghoard/pghoard.py
index 3e5b9c75..13c439d7 100644
--- a/pghoard/pghoard.py
+++ b/pghoard/pghoard.py
@@ -601,11 +601,13 @@ def startup_walk_for_missed_files(self):
                 with open(metadata_path, "r") as fp:
                     metadata = json.load(fp)
 
+                file_type = FileType.Wal if is_xlog else FileType.Timeline
+
                 transfer_event = UploadEvent(
-                    file_type=FileType.Wal,
+                    file_type=file_type,
                     backup_site_name=site,
                     file_size=os.path.getsize(full_path),
-                    file_path=FileTypePrefixes[FileType.Wal] / filename,
+                    file_path=FileTypePrefixes[file_type] / filename,
                     source_data=Path(full_path),
                     callback_queue=None,
                     metadata=metadata
diff --git a/test/test_pghoard.py b/test/test_pghoard.py
index 8cc20bc5..3bd5a101 100644
--- a/test/test_pghoard.py
+++ b/test/test_pghoard.py
@@ -529,27 +529,34 @@ def test_startup_walk_for_missed_uncompressed_files(self):
         assert self.pghoard.compression_queue.qsize() == 2
         assert self.pghoard.transfer_queue.qsize() == 0
 
-    def test_startup_walk_for_missed_uncompressed_files_timeline(self):
+    @pytest.mark.parametrize(
+        "file_type, file_name", [(FileType.Wal, "000000010000000000000004"), (FileType.Timeline, "00000002.history")]
+    )
+    def test_startup_walk_for_missed_uncompressed_file_type(self, file_type: FileType, file_name: str):
         compressed_wal_path, _ = self.pghoard.create_backup_site_paths(self.test_site)
         uncompressed_wal_path = compressed_wal_path + "_incoming"
-        with open(os.path.join(uncompressed_wal_path, "00000002.history"), "wb") as fp:
+        with open(os.path.join(uncompressed_wal_path, file_name), "wb") as fp:
             fp.write(b"foo")
         self.pghoard.startup_walk_for_missed_files()
         assert self.pghoard.compression_queue.qsize() == 1
         assert self.pghoard.transfer_queue.qsize() == 0
         compress_event = self.pghoard.compression_queue.get(timeout=1.0)
-        assert compress_event.file_type == FileType.Timeline
+        assert compress_event.file_type == file_type
 
-    def test_startup_walk_for_missed_uncompressed_files_wal(self):
+    @pytest.mark.parametrize(
+        "file_type, file_name", [(FileType.Wal, "000000010000000000000005"), (FileType.Timeline, "00000003.history")]
+    )
+    def test_startup_walk_for_missed_compressed_file_type(self, file_type: FileType, file_name: str):
         compressed_wal_path, _ = self.pghoard.create_backup_site_paths(self.test_site)
-        uncompressed_wal_path = compressed_wal_path + "_incoming"
-        with open(os.path.join(uncompressed_wal_path, "000000010000000000000004"), "wb") as fp:
+        with open(os.path.join(compressed_wal_path, file_name), "wb") as fp:
             fp.write(b"foo")
+        with open(os.path.join(compressed_wal_path, f"{file_name}.metadata"), "wb") as fp:
+            fp.write(b"{}")
         self.pghoard.startup_walk_for_missed_files()
-        assert self.pghoard.compression_queue.qsize() == 1
-        assert self.pghoard.transfer_queue.qsize() == 0
-        compress_event = self.pghoard.compression_queue.get(timeout=1.0)
-        assert compress_event.file_type == FileType.Wal
+        assert self.pghoard.compression_queue.qsize() == 0
+        assert self.pghoard.transfer_queue.qsize() == 1
+        upload_event = self.pghoard.transfer_queue.get(timeout=1.0)
+        assert upload_event.file_type == file_type
 
 
 class TestPGHoardWithPG:
@@ -597,8 +604,6 @@ def test_pause_on_disk_full(self, db, pghoard_separate_volume, caplog):
         # MiB so if logic for automatically suspending pg_receive(xlog|wal) wasn't working the volume
         # would certainly fill up and the files couldn't be processed. Now this should work fine.
         for _ in range(16):
-            # Note: do not combine two function call in one select, PG executes it differently and
-            # sometimes looks like it generates less WAL files than we wanted
             switch_wal(conn)
         conn.close()
@@ -625,6 +630,10 @@ def test_surviving_pg_receivewal_hickup(self, db, pghoard):
         if pghoard.receivexlogs[pghoard.test_site].is_alive():
             pghoard.receivexlogs[pghoard.test_site].join()
         del pghoard.receivexlogs[pghoard.test_site]
+        # Stopping the thread is not enough: a killed receiver can leave incomplete partial files
+        # around. pghoard is capable of cleaning those up, but needs to be restarted to do so; for
+        # this test it should be OK to call startup_walk_for_missed_files directly so it handles the cleanup.
+        pghoard.startup_walk_for_missed_files()
 
         n_xlogs = pghoard.transfer_agent_state[pghoard.test_site]["upload"]["xlog"]["xlogs_since_basebackup"]
diff --git a/test/util.py b/test/util.py
index 39939647..7c49e3a9 100644
--- a/test/util.py
+++ b/test/util.py
@@ -24,9 +24,16 @@ def switch_wal(connection):
     cur = connection.cursor()
     # Force allocating a XID, otherwise if there was no activity we will
     # stay on the same WAL
+    # Note: do not combine the two function calls into one SELECT; PG executes that differently and
+    # it can then look like fewer WAL files were generated than we wanted
     cur.execute("SELECT txid_current()")
     if connection.server_version >= 100000:
         cur.execute("SELECT pg_switch_wal()")
     else:
         cur.execute("SELECT pg_switch_xlog()")
+    # This should fix flaky tests that expect a specific number of WAL files which never arrive:
+    # quite often the last WAL file is not finalized by the walreceiver unless there is some extra
+    # activity after the switch. The bug should be fixed in PG 15:
+    # https://github.com/postgres/postgres/commit/596ba75cb11173a528c6b6ec0142a282e42b69ec
+    cur.execute("SELECT txid_current()")
     cur.close()
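
For reference, a minimal sketch of the dispatch rules the reworked tests pin down. The helper names below are hypothetical, not pghoard's actual implementation: on startup, uncompressed files found in the "_incoming" directory go to the compression queue, already-compressed files with a ".metadata" sidecar go to the transfer queue, and in both cases the file type is derived from the file name instead of being hard-coded to FileType.Wal.

    # Hypothetical stand-in for the routing inside startup_walk_for_missed_files;
    # only the dispatch rules mirror the behaviour asserted by the tests above.
    from pathlib import Path

    def classify_and_route(path: Path, incoming: bool) -> tuple:
        # Timeline history files are named like "00000002.history"; WAL segments
        # are 24 hex digits, e.g. "000000010000000000000004".
        file_type = "timeline" if path.name.endswith(".history") else "wal"
        queue = "compression_queue" if incoming else "transfer_queue"
        return file_type, queue

    assert classify_and_route(Path("000000010000000000000004"), incoming=True) == ("wal", "compression_queue")
    assert classify_and_route(Path("00000002.history"), incoming=False) == ("timeline", "transfer_queue")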