Skip to content

Commit

Permalink
fix(accountability): record file size of RTC data downloaded
Browse files Browse the repository at this point in the history
Refs #705
  • Loading branch information
chrisjrd committed Mar 6, 2024
1 parent dde40df commit b6fc692
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 4 deletions.
13 changes: 9 additions & 4 deletions data_subscriber/asf_rtc_download.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from requests.exceptions import HTTPError

from data_subscriber.download import DaacDownload
from data_subscriber.rtc.rtc_catalog import RTCProductCatalog
from data_subscriber.url import _to_urls, _to_https_urls, _rtc_url_to_chunk_id

logger = logging.getLogger(__name__)
Expand All @@ -27,7 +28,7 @@ class AsfDaacRtcDownload(DaacDownload):
def perform_download(
self,
session: requests.Session,
es_conn,
es_conn: RTCProductCatalog,
downloads: list[dict],
args,
token,
Expand All @@ -41,19 +42,23 @@ def perform_download(

product_to_product_filepaths_map = defaultdict(set)
num_downloads = len(downloads)
download_id_to_downloads_map = {download["id"]: download for download in downloads}
with concurrent.futures.ThreadPoolExecutor(max_workers=min(8, os.cpu_count() + 4)) as executor:
futures = [
executor.submit(self.perform_download_single, download, token, args, download_counter, num_downloads)
for download_counter, download in enumerate(downloads, start=1)
]
list_product_id_product_filepath = [future.result() for future in concurrent.futures.as_completed(futures)]
for product_id_product_filepath in list_product_id_product_filepath:
for product_id, product_filepath in product_id_product_filepath:
for product_id, product_filepath, download_id, filesize in product_id_product_filepath:
product_to_product_filepaths_map[product_id].add(product_filepath)
if not download_id_to_downloads_map[download_id].get("filesize"):
download_id_to_downloads_map[download_id]["filesize"] = 0
download_id_to_downloads_map[download_id]["filesize"] += filesize

for download in downloads:
logger.info(f"Marking as downloaded. {download['id']=}")
es_conn.mark_product_as_downloaded(download['id'], job_id)
es_conn.mark_product_as_downloaded(download['id'], job_id, download_id_to_downloads_map[download["id"]]["filesize"])

logger.info(f"downloaded {len(product_to_product_filepaths_map)} products")
return product_to_product_filepaths_map
Expand Down Expand Up @@ -91,7 +96,7 @@ def perform_download_single(self, download, token, args, download_counter, num_d
)
logger.info(f"{product_filepath=}")

list_product_id_product_filepath.append((product_id, product_filepath))
list_product_id_product_filepath.append((product_id, product_filepath, download["id"], os.path.getsize(product_filepath)))
return list_product_id_product_filepath

def download_asf_product(self, product_url, token: str, target_dirpath: Path):
Expand Down
22 changes: 22 additions & 0 deletions data_subscriber/rtc/rtc_catalog.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,28 @@ def filter_catalog_by_sets(self, mgrs_set_id_acquisition_ts_cycle_indexes):
logging.info(f"Found {len(es_docs)=}")
return self.filter_query_result(es_docs)

def mark_product_as_downloaded(self, url, job_id, filesize):
    """Upsert the catalog document for *url*, flagging it as downloaded.

    The document id is the final path segment of the product URL. The
    update records the download timestamp, the id of the job that did the
    download, and the downloaded file size (under ``metadata.FileSize``).

    :param url: product URL whose trailing segment is the catalog doc id
    :param job_id: identifier of the job that performed the download
    :param filesize: total bytes downloaded for this product
    """
    doc_id = url.split("/")[-1]
    target_index = self._get_index_name_for(_id=doc_id, default=self.generate_es_index_name())

    # doc_as_upsert ensures a document is created if one does not exist yet.
    updated_fields = {
        "downloaded": True,
        "download_datetime": datetime.now(),
        "download_job_id": job_id,
        "metadata": {
            "FileSize": filesize
        }
    }
    result = self.es.update_document(
        id=doc_id,
        body={
            "doc_as_upsert": True,
            "doc": updated_fields
        },
        index=target_index
    )

    self.logger.info(f"Document updated: {result}")

def mark_products_as_download_job_submitted(self, batch_id_to_products_map: dict):
operations = []
mgrs = mgrs_bursts_collection_db_client.cached_load_mgrs_burst_db(filter_land=True)
Expand Down

0 comments on commit b6fc692

Please sign in to comment.