diff --git a/pghoard/fetcher.py b/pghoard/fetcher.py index e1fea876..6a08dc48 100644 --- a/pghoard/fetcher.py +++ b/pghoard/fetcher.py @@ -36,7 +36,7 @@ def fetch_file(self, site, key, target_path): self.last_activity = time.monotonic() self._start_process() if self.mp_manager: - self.task_queue.put((site, key, target_path)) + self.task_queue.put((self.config, site, key, target_path)) result = self.result_queue.get() if result is None: # Should only happen if the process is terminated while we're waiting for @@ -70,9 +70,7 @@ def _start_process(self): return self.result_queue = self.mp_manager.Queue() self.task_queue = self.mp_manager.Queue() - self.process = multiprocessing.Process( - target=_remote_file_fetch_loop, args=(self.config, self.task_queue, self.result_queue) - ) + self.process = multiprocessing.Process(target=_remote_file_fetch_loop, args=(self.task_queue, self.result_queue)) self.process.start() @@ -101,18 +99,26 @@ def fetch(self, site, key, target_path): raise -def _remote_file_fetch_loop(app_config, task_queue, result_queue): +def _remote_file_fetch_loop(task_queue, result_queue): transfers = {} + obj_storage_configs = {} while True: task = task_queue.get() if not task: return try: - site, key, target_path = task + app_config, site, key, target_path = task + obj_storage_config = get_object_storage_config(app_config, site) transfer = transfers.get(site) - if not transfer: - transfer = get_transfer(get_object_storage_config(app_config, site)) + + # even if we got a transfer for the site + # we should check if there was a change on the site's storage config, in such case + # we must get the correct transfer + if not transfer or obj_storage_configs.get(site, {}) != obj_storage_config: + transfer = get_transfer(obj_storage_config) transfers[site] = transfer + obj_storage_configs[site] = obj_storage_config + file_size, metadata = FileFetcher(app_config, transfer).fetch(site, key, target_path) result_queue.put((task, file_size, metadata)) except Exception as e: # pylint: disable=broad-except