
Commit

feat(dswx-s1): additional logging
Refs #787
chrisjrd committed Apr 9, 2024
1 parent 31689f9 commit b9cd904
Showing 4 changed files with 20 additions and 9 deletions.
14 changes: 8 additions & 6 deletions data_subscriber/cmr.py
@@ -158,7 +158,9 @@ async def async_query_cmr(args, token, cmr, settings, timerange, now: datetime,
logger.info(f"Querying CMR. {request_url=} {params=}")

product_granules = await _async_request_search_cmr_granules(args, request_url, [params])
logger.info(f"Found {len(product_granules)} granules")
search_results_count = len(product_granules)
logger.info(f"Found {search_results_count} granules")
logger.info([(granule["granule_id"], granule["revision_id"]) for granule in product_granules])

# Filter out granules with revision-id greater than max allowed
least_revised_granules = []
@@ -175,16 +177,16 @@ async def async_query_cmr(args, token, cmr, settings, timerange, now: datetime,
)

product_granules = least_revised_granules
logger.info(f"Filtered to {len(product_granules)} granules after least "
f"revision check")
if len(product_granules) != search_results_count:
logger.info(f"Filtered to {len(product_granules)} granules after least revision check")

if args.collection in settings["SHORTNAME_FILTERS"]:
product_granules = [granule for granule in product_granules
if _match_identifier(settings, args, granule)]

if not silent:
logger.info(f"Filtered to {len(product_granules)} total granules "
f"after shortname filter check")
if len(product_granules) != search_results_count:
logger.info(f"Filtered to {len(product_granules)} total granules after shortname filter check")
logger.info([(granule["granule_id"], granule["revision_id"]) for granule in product_granules])

for granule in product_granules:
granule["filtered_urls"] = _filter_granules(granule, args)
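Note: a minimal, self-contained sketch of the conditional-logging pattern introduced in cmr.py above. The granule dicts and the max_revision filter are hypothetical stand-ins for the CMR search results and the revision/shortname filters; the point is that the "Filtered to ..." message is only emitted when a filter actually changed the count captured in search_results_count.

    import logging

    logger = logging.getLogger(__name__)
    logging.basicConfig(level=logging.INFO)

    # Hypothetical search results standing in for CMR granules.
    product_granules = [
        {"granule_id": "G1", "revision_id": 1},
        {"granule_id": "G2", "revision_id": 3},
    ]

    search_results_count = len(product_granules)
    logger.info(f"Found {search_results_count} granules")
    logger.info([(g["granule_id"], g["revision_id"]) for g in product_granules])

    # Hypothetical filter: keep granules at or below a maximum revision id.
    max_revision = 2
    product_granules = [g for g in product_granules if g["revision_id"] <= max_revision]

    # Log only when the filter actually removed something, as in the change above.
    if len(product_granules) != search_results_count:
        logger.info(f"Filtered to {len(product_granules)} granules after revision check")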
1 change: 1 addition & 0 deletions data_subscriber/daac_data_subscriber.py
@@ -243,6 +243,7 @@ async def run_rtc_download(args, token, es_conn, netloc, username, password, job
logger.info(f"{args.dry_run=}. Skipping job submission. Producing mock job ID")
results = [uuid.uuid4()]
else:
logger.info(f"Submitting batches for DSWx-S1 job: {list(uploaded_batch_id_to_s3paths_map)}")
job_submission_tasks = submit_dswx_s1_job_submissions_tasks(uploaded_batch_id_to_s3paths_map, args_for_job_submitter, settings)
results = await asyncio.gather(*job_submission_tasks, return_exceptions=True)

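Note: the new DSWx-S1 submission log relies on list() over a dict yielding only its keys, so just the batch IDs are logged rather than every S3 path. A small sketch with a hypothetical map:

    # Hypothetical batch-id -> S3 paths map, mirroring uploaded_batch_id_to_s3paths_map.
    uploaded_batch_id_to_s3paths_map = {
        "batch-001": ["s3://bucket/a.tif", "s3://bucket/b.tif"],
        "batch-002": ["s3://bucket/c.tif"],
    }

    # list() over a dict yields its keys, keeping the log line compact.
    print(f"Submitting batches for DSWx-S1 job: {list(uploaded_batch_id_to_s3paths_map)}")
    # -> Submitting batches for DSWx-S1 job: ['batch-001', 'batch-002']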
13 changes: 10 additions & 3 deletions data_subscriber/rtc/evaluator.py
@@ -30,6 +30,7 @@ def main(
*args,
**kwargs
):
logger.info(f"{coverage_target=}")
# query GRQ catalog
grq_es = es_conn_util.get_es_connection(logger)

@@ -74,12 +75,16 @@
rtc_product_ids = product_id_to_product_files_map.keys()

coverage_result_set_id_to_product_sets_map = evaluate_rtc_products(rtc_product_ids, coverage_target)
+ coverage_results_short = {coverage_group: list(id_to_sets) for coverage_group, id_to_sets in coverage_result_set_id_to_product_sets_map.items()}
+ logger.info(f"{coverage_results_short=}")

logger.info("Converting coverage results to evaluator results")
mgrs = mbc_client.cached_load_mgrs_burst_db(filter_land=True)
for coverage_group, id_to_sets in coverage_result_set_id_to_product_sets_map.items():
if coverage_group == -1:
logger.info("Skipping results that don't meet the target coverage")
logger.info(f"Skipping results that don't meet the target coverage ({coverage_target=}), {coverage_group=}: {list(id_to_sets)}")
continue
logger.info(f"Results that meet the target coverage ({coverage_target=}), {coverage_group=}: {list(id_to_sets)}")
mgrs_set_id_to_product_sets_docs_map = join_product_file_docs(id_to_sets, product_id_to_product_files_map)
for mgrs_set_id, product_sets_docs in mgrs_set_id_to_product_sets_docs_map.items():
for product_set_docs in product_sets_docs:
@@ -91,6 +96,7 @@ main(
"coverage_group": coverage_group,
"product_set": product_set_docs
})

# not native-id flow, grace period does not apply
# if 100% coverage target set, grace period does not apply and sets have been handled already above
if mgrs_set_id_acquisition_ts_cycle_indexes or coverage_target == 100:
@@ -115,13 +121,14 @@ main(
for product_doc in chain.from_iterable(rtc_granule_id_to_product_docs_map.values())
}
max_retrieval_dt = max(*retrieval_dts) if len(retrieval_dts) > 1 else first(retrieval_dts)
+ grace_period_minutes_remaining = timedelta(minutes=required_min_age_minutes_for_partial_burstsets) - (datetime.now() - max_retrieval_dt)
if datetime.now() - max_retrieval_dt < timedelta(minutes=required_min_age_minutes_for_partial_burstsets):
# burst set meets target, but not old enough. continue to ignore
logger.info(f"Target covered burst still within grace period. Will not process at this time. {mgrs_set_id=}, {i=}")
logger.info(f"Target covered burst still within grace period ({grace_period_minutes_remaining=}). Will not process at this time. {mgrs_set_id=}, {i=}")
product_burstset_index_to_skip_processing.add(i)
else:
# burst set meets target, and old enough. process
logger.info(f"Target covered burst set aged out of grace period. Will process at this time. {mgrs_set_id=}, {i=}")
logger.info(f"Target covered burst set aged out of grace period ({grace_period_minutes_remaining=}). Will process at this time. {mgrs_set_id=}, {i=}")
pass
for i in sorted(product_burstset_index_to_skip_processing, reverse=True):
logger.info(f"Removing target covered burst still within grace period. {mgrs_set_id=}, {i=}")
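Note: grace_period_minutes_remaining added above is a timedelta (despite the name) and becomes zero or negative once the burst set has aged out, which is what the two updated log lines surface. A minimal sketch with hypothetical ages and threshold:

    from datetime import datetime, timedelta

    # Hypothetical values standing in for the evaluator's inputs.
    required_min_age_minutes_for_partial_burstsets = 30
    max_retrieval_dt = datetime.now() - timedelta(minutes=12)  # newest product retrieved 12 minutes ago

    grace_period_minutes_remaining = (
        timedelta(minutes=required_min_age_minutes_for_partial_burstsets)
        - (datetime.now() - max_retrieval_dt)
    )

    if datetime.now() - max_retrieval_dt < timedelta(minutes=required_min_age_minutes_for_partial_burstsets):
        print(f"Still within grace period ({grace_period_minutes_remaining=})")  # positive remainder
    else:
        print(f"Aged out of grace period ({grace_period_minutes_remaining=})")   # zero or negative remainder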
1 change: 1 addition & 0 deletions data_subscriber/rtc/rtc_query.py
@@ -166,6 +166,7 @@ async def run_query(self, args, token, es_conn: RTCProductCatalog, cmr, job_id,
return

results = []
logger.info(f"Submitting batches for RTC download job: {list(batch_id_to_products_map)}")
for batch_id, products_map in batch_id_to_products_map.items():
job_submission_tasks = submit_rtc_download_job_submissions_tasks({batch_id: products_map}, args, settings)
results_batch = await asyncio.gather(*job_submission_tasks, return_exceptions=True)
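Note: both job-submission sites touched above (DSWx-S1 in daac_data_subscriber.py and the RTC download loop here) collect their submission tasks with asyncio.gather(..., return_exceptions=True), so a failed submission comes back as an exception object instead of cancelling the whole batch. A minimal sketch with a hypothetical submit_batch coroutine:

    import asyncio

    async def submit_batch(batch_id: str) -> str:
        # Hypothetical submitter: fails for one batch to show how exceptions are surfaced.
        if batch_id == "bad-batch":
            raise RuntimeError(f"submission failed for {batch_id}")
        return f"job-for-{batch_id}"

    async def main() -> None:
        batch_ids = ["t001", "bad-batch", "t002"]
        tasks = [submit_batch(b) for b in batch_ids]
        results = await asyncio.gather(*tasks, return_exceptions=True)
        for batch_id, result in zip(batch_ids, results):
            if isinstance(result, Exception):
                print(f"{batch_id}: failed with {result!r}")
            else:
                print(f"{batch_id}: submitted as {result}")

    asyncio.run(main())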
