
Commit

Merge pull request #779 from nasa/issue-703-r3-calval
add uat option to validator script and document in readme
riverma committed May 2, 2024
2 parents cde8484 + 0e8cb23 commit a2f0999
Showing 2 changed files with 93 additions and 10 deletions.
5 changes: 3 additions & 2 deletions report/dswx-s1-validator/README.md
@@ -58,15 +58,16 @@ This guide provides a quick way to get started with the script.
2. Optionally, use the `--file` argument to specify a file with granule IDs.
3. Optionally, use the `--threshold` argument to specify a threshold percentage to filter MGRS Tile Set coverages by.
4. Optionally, use the `--timestamp` argument to specify the type of timestamp to query CMR with. Example values: `TEMPORAL|PRODUCTION|REVISION|CREATED`. Default value is `TEMPORAL`. See [CMR documentation](https://cmr.earthdata.nasa.gov/search/site/docs/search/api.html) for details.
5. Optionally, use the `--endpoint` argument to specify the CMR endpoint venue. Accepted values are `OPS|UAT`, with `OPS` set as the default value.
6. Optionally, use the `--verbose` argument to get detailed information like a list of matching bursts and granule IDs.

### Usage Examples

* Retrieve a list of MGRS Tile Set IDs for RTC bursts processed within a given time range on CMR, and filter the results to show only MGRS Tile Sets with coverage greater than or equal to 50%.

```
$ python dswx_s1_validator.py --start "2023-12-05T01:00:00Z" --end "2023-12-05T03:59:59Z" --db MGRS_tile_collection_v0.2.sqlite --threshold 50
Querying CMR for time range 2023-12-05T01:00:00 to 2023-12-05T03:59:59.
Querying CMR for 2316 granules.
Fetching granules: 100%|███████████████████████████████████████| 2316/2316 [00:01<00:00, 1660.76it/s]
```
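
* Run the same query against the UAT venue of CMR using the new `--endpoint` argument. This invocation is illustrative; UAT hosts different holdings than OPS, so expect different granule counts.

```
$ python dswx_s1_validator.py --start "2023-12-05T01:00:00Z" --end "2023-12-05T03:59:59Z" --db MGRS_tile_collection_v0.2.sqlite --endpoint UAT
```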
98 changes: 90 additions & 8 deletions report/dswx-s1-validator/dswx_s1_validator.py
@@ -90,11 +90,16 @@ def parallel_fetch(url, params, page_num, page_size, downloaded_batches, total_batches):
    params['page_size'] = page_size

    try:
        logging.debug(f"Fetching {url} with {params}")
        batch_granules = fetch_with_backoff(url, params)
        logging.debug(f"Fetch success: {len(batch_granules)} batch granules downloaded.")
    except Exception as e:
        logging.error(f"Failed to fetch granules for page {page_num}: {e}")
        batch_granules = []
    finally:
        with downloaded_batches.get_lock():  # Safely increment the count
            downloaded_batches.value += 1
        return batch_granules
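Note: `fetch_with_backoff` is called above but its definition is collapsed out of this diff. For orientation, here is a minimal sketch of what such a helper might look like — the `requests` usage, retry count, and delay schedule are assumptions, not the repository's actual implementation:

```
import logging
import random
import time

import requests


def fetch_with_backoff(url, params, max_retries=5, base_delay=1.0):
    """Hypothetical sketch: GET one page of CMR results, retrying with exponential backoff."""
    for attempt in range(max_retries):
        try:
            response = requests.get(url, params=params, timeout=60)
            response.raise_for_status()
            # CMR's umm_json responses list granules under "items"
            return response.json().get("items", [])
        except requests.RequestException as exc:
            if attempt == max_retries - 1:
                raise
            # Exponential backoff with jitter so workers don't retry CMR in lockstep
            delay = base_delay * (2 ** attempt) + random.uniform(0, 0.5)
            logging.warning(f"Attempt {attempt + 1} failed ({exc}); retrying in {delay:.1f}s")
            time.sleep(delay)
```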

def get_burst_id(granule_id):
"""
@@ -105,7 +110,7 @@ def get_burst_id(granule_id):
"""
burst_id = ''
if granule_id:
match = re.search(r'_T(\d+)-(\d+)-([A-Z]+\d+)_\d+T\d+Z_\d+T\d+Z_S1A_\d+_v\d+\.\d+', granule_id)
match = re.search(r'_T(\d+)-(\d+)-([A-Z]+\d+)_\d+T\d+Z_\d+T\d+Z_S1[AB]_\d+_v\d+\.\d+', granule_id)
if match:
t_number = match.group(1)
orbit_number = match.group(2)
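The regex change widens `S1A` to `S1[AB]`, so burst IDs are now extracted from both Sentinel-1A and Sentinel-1B granules. A quick standalone check — the granule ID below is fabricated to fit the OPERA RTC-S1 naming scheme, not taken from CMR:

```
import re

pattern = r'_T(\d+)-(\d+)-([A-Z]+\d+)_\d+T\d+Z_\d+T\d+Z_S1[AB]_\d+_v\d+\.\d+'
granule_id = "OPERA_L2_RTC-S1_T123-456-IW1_20231205T010203Z_20231206T020304Z_S1B_30_v1.0"

match = re.search(pattern, granule_id)
if match:
    print(match.groups())  # prints ('123', '456', 'IW1')
```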
@@ -236,13 +241,14 @@ def get_burst_ids_from_query(start, end, timestamp, db):
if __name__ == '__main__':
    # Create an argument parser
    parser = argparse.ArgumentParser(description="CMR Query with Temporal Range and SQLite DB Access")
    parser.add_argument("--timestamp", required=False, default='TEMPORAL', metavar="TEMPORAL|REVISION|PRODUCTION|CREATED", help="Use temporal, revision, production, or created time in the start/end granule query to CMR. Ex. --timestamp revision")
    parser.add_argument("--start", required=False, help="Temporal start time (ISO 8601 format)")
    parser.add_argument("--end", required=False, help="Temporal end time (ISO 8601 format)")
    parser.add_argument("--db", required=True, help="Path to the SQLite database file")
    parser.add_argument("--file", required=False, help="Optional file path containing granule IDs")
    parser.add_argument("--threshold", required=False, help="Completion threshold minimum to filter results by (percentage format - leave out the % sign)")
    parser.add_argument("--verbose", action='store_true', help="Verbose and detailed output")
    parser.add_argument("--endpoint", required=False, choices=['UAT', 'OPS'], default='OPS', help='CMR endpoint venue')

    # Parse the command-line arguments
    args = parser.parse_args()
@@ -254,11 +260,87 @@ def get_burst_ids_from_query(start, end, timestamp, db):
        burst_ids = get_burst_ids_from_file(filename=args.file)

    else:
        # Ensure start and end times are provided
        if not args.start or not args.end:
            raise ValueError("Start and end times are required if no file input is provided.")

        # Base URL for granule searches
        base_url = "https://cmr.earthdata.nasa.gov/search/granules.umm_json"
        if args.endpoint == 'UAT':
            base_url = "https://cmr.uat.earthdata.nasa.gov/search/granules.umm_json"
        params = {
            'provider': 'ASF',
            'ShortName[]': 'OPERA_L2_RTC-S1_V1'
        }
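        # With the OPS default, these params produce a query along the lines of
        # (hypothetical URL encoding):
        #   https://cmr.earthdata.nasa.gov/search/granules.umm_json?provider=ASF&ShortName[]=OPERA_L2_RTC-S1_V1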

        # Set CMR param to ignore granule searches prior to a certain date
        start_datetime = datetime.fromisoformat(args.start)
        temporal_start_datetime = start_datetime - timedelta(days=30)  # 30 days by default design - check with PCM team
        params['temporal'] = temporal_start_datetime.isoformat()

        # Set time query type for CMR
        if args.timestamp.lower() == "production":
            params['production_date'] = f"{args.start},{args.end}"
        elif args.timestamp.lower() == "revision":
            params['revision_date'] = f"{args.start},{args.end}"
        elif args.timestamp.lower() == "created":
            params['created_at'] = f"{args.start},{args.end}"
        else:  # default time query type if not provided or set to temporal
            params['temporal'] = f"{args.start},{args.end}"
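        # For example (hypothetical values): --timestamp revision with
        # --start 2023-12-05T01:00:00Z --end 2023-12-05T03:59:59Z sends
        # revision_date = "2023-12-05T01:00:00Z,2023-12-05T03:59:59Z" to CMR,
        # while the 30-day 'temporal' lookback set above stays in place.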

        # Query CMR for the total number of matching granules
        total_granules = get_total_granules(base_url, params)
        print(f"Total granules: {total_granules}")
        print(f"Querying CMR for time range {args.start} to {args.end}.")

        # Exit with error code if no granules to process
        if total_granules == 0:
            print("Error: no granules to process.")
            sys.exit(1)

        # Optimize page_size and number of workers based on total_granules
        page_size = min(1000, total_granules)
        num_workers = min(5, (total_granules + page_size - 1) // page_size)
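        # e.g., the README example's 2316 granules give page_size = 1000 and
        # num_workers = min(5, ceil(2316 / 1000)) = 3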

        # Initialize progress bar
        tqdm.tqdm._instances.clear()  # Clear any existing tqdm instances
        print()

        # Main loop to fetch granules, update progress bar, and extract burst_ids
        with tqdm.tqdm(total=total_granules, desc="Fetching granules", position=0) as pbar_global:
            downloaded_batches = multiprocessing.Value('i', 0)  # For counting downloaded batches
            total_batches = (total_granules + page_size - 1) // page_size

            with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
                futures = []
                for page_num in range(1, total_batches + 1):
                    future = executor.submit(parallel_fetch, base_url, params, page_num, page_size, downloaded_batches, total_batches)
                    futures.append(future)
                    random_delay = random.uniform(0, 0.1)
                    time.sleep(random_delay)  # Stagger the submission of function calls for CMR optimization
                    logging.debug(f"Scheduled granule fetch for batch {page_num}")

                for future in concurrent.futures.as_completed(futures):
                    granules = future.result()
                    pbar_global.update(len(granules))

                    # Extract burst IDs from the returned granule IDs via get_burst_id()
                    for granule in granules:
                        granule_id = granule.get("umm").get("GranuleUR")
                        burst_id = get_burst_id(granule_id)
                        if burst_id:
                            burst_ids[burst_id] = granule_id
                        else:
                            print(f"\nWarning: Could not extract burst ID from malformed granule ID {granule_id}.")
print("\nGranule fetching complete.")

# Integrity check for total granules
total_downloaded = sum(len(future.result()) for future in futures)
if total_downloaded != total_granules:
print(f"\nError: Expected {total_granules} granules, but downloaded {total_downloaded}. Try running again after some delay.")
sys.exit(1)

# Connect to the MGRS Tile Set SQLITE database
conn = sqlite3.connect(args.db)
