
Add UAT option to validator script and document in README #779

Merged · 4 commits · May 2, 2024

Changes from all commits
5 changes: 3 additions & 2 deletions in report/dswx-s1-validator/README.md

````diff
@@ -58,15 +58,16 @@ This guide provides a quick way to get started with the script.
 2. Optionally, use the `--file` argument to specify a file with granule IDs.
 3. Optionally, use the `--threshold` argument to specify a threshold percentage to filter MGRS Tile Set coverages by.
 4. Optionally, use the `--timestamp` argument to specify the type of timestamp to query CMR with. Example values: `TEMPORAL|PRODUCTION|REVISION|CREATED`. Default value is `TEMPORAL`. See [CMR documentation](https://cmr.earthdata.nasa.gov/search/site/docs/search/api.html) for details.
-5. Optionally, use the `--verbose` argument to get detailed information like a list of matching bursts and granule IDs.
+5. Optionally, use the `--endpoint` argument to specify the CMR endpoint venue. Accepted values are `OPS|UAT`, with `OPS` set as the default value.
+6. Optionally, use the `--verbose` argument to get detailed information like a list of matching bursts and granule IDs.

 ### Usage Examples

 * Retrieve a list of MGRS Tile Set IDs for RTC burst processing over a given time range on CMR, and filter the results to show only MGRS Tile Sets with coverage greater than or equal to 50%.

 ```
 $ python dswx_s1_validator.py --start "2023-12-05T01:00:00Z" --end "2023-12-05T03:59:59Z" --db MGRS_tile_collection_v0.2.sqlite --threshold 50
-Querying CMR for time range 2023-12-05T01:00:00Z to 2023-12-05T03:59:59Z.
+Querying CMR for time range 2023-12-05T01:00:00 to 2023-12-05T03:59:59.
````
> **Collaborator:** I think we should keep the Z, as that denotes times are expected to be in UTC.

> **Author:** I get a `ValueError` when including the Z: `ValueError: Invalid isoformat string: '2023-12-05T01:00:00Z'`
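The `ValueError` is Python-version dependent: `datetime.fromisoformat` only accepts a trailing `Z` designator on Python 3.11 and later. A minimal sketch of a version-independent workaround, assuming parsing stays on `fromisoformat`:

```python
from datetime import datetime, timezone

ts = '2023-12-05T01:00:00Z'
# Python <= 3.10: datetime.fromisoformat(ts) raises
#   ValueError: Invalid isoformat string: '2023-12-05T01:00:00Z'
# Mapping 'Z' to the equivalent '+00:00' offset parses on all versions:
parsed = datetime.fromisoformat(ts.replace('Z', '+00:00'))
assert parsed.tzinfo == timezone.utc
print(parsed.isoformat())  # 2023-12-05T01:00:00+00:00
```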

```diff
 Querying CMR for 2316 granules.

 Fetching granules: 100%|███████████████████████████████████████| 2316/2316 [00:01<00:00, 1660.76it/s]
```
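The README diff does not add a UAT example; exercising the new flag against the same query would look like this (a sketch, output omitted):

```
$ python dswx_s1_validator.py --start "2023-12-05T01:00:00Z" --end "2023-12-05T03:59:59Z" --db MGRS_tile_collection_v0.2.sqlite --threshold 50 --endpoint UAT
```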
98 changes: 90 additions & 8 deletions in report/dswx-s1-validator/dswx_s1_validator.py

```diff
@@ -90,11 +90,16 @@ def parallel_fetch(url, params, page_num, page_size, downloaded_batches, total_b
     params['page_size'] = page_size

     try:
+        logging.debug(f"Fetching {url} with {params}")
         batch_granules = fetch_with_backoff(url, params)
-        return batch_granules
+        logging.debug(f"Fetch success: {len(batch_granules)} batch granules downloaded.")
     except Exception as e:
         logging.error(f"Failed to fetch granules for page {page_num}: {e}")
         batch_granules = []
+    finally:
+        with downloaded_batches.get_lock():  # Safely increment the count
+            downloaded_batches.value += 1
+    return batch_granules

 def get_burst_id(granule_id):
     """
```
```diff
@@ -105,7 +110,7 @@ def get_burst_id(granule_id):
     """
     burst_id = ''
     if granule_id:
-        match = re.search(r'_T(\d+)-(\d+)-([A-Z]+\d+)_\d+T\d+Z_\d+T\d+Z_S1A_\d+_v\d+\.\d+', granule_id)
+        match = re.search(r'_T(\d+)-(\d+)-([A-Z]+\d+)_\d+T\d+Z_\d+T\d+Z_S1[AB]_\d+_v\d+\.\d+', granule_id)
         if match:
             t_number = match.group(1)
             orbit_number = match.group(2)
```
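The change here widens the mission suffix from `S1A` to `S1[AB]`, so Sentinel-1B granules also match. A quick check with illustrative (not real) granule IDs:

```python
import re

pattern = r'_T(\d+)-(\d+)-([A-Z]+\d+)_\d+T\d+Z_\d+T\d+Z_S1[AB]_\d+_v\d+\.\d+'
# Illustrative granule IDs, not real products: the same burst from S1A and S1B
ids = [
    "OPERA_L2_RTC-S1_T168-359429-IW1_20231205T010000Z_20231206T010000Z_S1A_30_v1.0",
    "OPERA_L2_RTC-S1_T168-359429-IW1_20231205T010000Z_20231206T010000Z_S1B_30_v1.0",
]
for granule_id in ids:
    m = re.search(pattern, granule_id)
    print(bool(m), m.groups() if m else None)
# Both print: True ('168', '359429', 'IW1'); the old S1A-only pattern
# would have silently skipped the S1B granule.
```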
```diff
@@ -236,13 +241,14 @@ def get_burst_ids_from_query(start, end, timestamp, db):
 if __name__ == '__main__':
     # Create an argument parser
     parser = argparse.ArgumentParser(description="CMR Query with Temporal Range and SQLite DB Access")
-    parser.add_argument("--timestamp", metavar="TEMPORAL|REVISION|PRODUCTION|CREATED", default="temporal", required=False, help="Use temporal, revision, or production time in start / end time granule query to CMR. Ex. --timestamp revision. Default is 'TEMPORAL'")
+    parser.add_argument("--timestamp", required=False, default='TEMPORAL', metavar="TEMPORAL|REVISION|PRODUCTION|CREATED", help="Use temporal, revision, or production time in start / end time granule query to CMR. Ex. --timestamp revision")
     parser.add_argument("--start", required=False, help="Temporal start time (ISO 8601 format)")
     parser.add_argument("--end", required=False, help="Temporal end time (ISO 8601 format)")
     parser.add_argument("--db", required=True, help="Path to the SQLite database file")
     parser.add_argument("--file", required=False, help="Optional file path containing granule IDs")
     parser.add_argument("--threshold", required=False, help="Completion threshold minimum to filter results by (percentage format - leave out the % sign)")
     parser.add_argument("--verbose", action='store_true', help="Verbose and detailed output")
+    parser.add_argument("--endpoint", required=False, choices=['UAT', 'OPS'], default='OPS', help='CMR endpoint venue')

     # Parse the command-line arguments
     args = parser.parse_args()
```
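A quick sanity check of the new flag's behavior; `choices` makes argparse reject anything other than the two declared venues:

```python
import argparse

parser = argparse.ArgumentParser()
parser.add_argument("--endpoint", required=False, choices=['UAT', 'OPS'], default='OPS', help='CMR endpoint venue')

print(parser.parse_args([]).endpoint)                     # OPS (the default)
print(parser.parse_args(['--endpoint', 'UAT']).endpoint)  # UAT
# parser.parse_args(['--endpoint', 'PROD'])  # exits: invalid choice: 'PROD'
```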
```diff
@@ -254,11 +260,87 @@ def get_burst_ids_from_query(start, end, timestamp, db):
         burst_ids = get_burst_ids_from_file(filename=args.file)

     else:
-        burst_ids = get_burst_ids_from_query(start = args.start,
-                                             end = args.end,
-                                             timestamp = args.timestamp,
-                                             db = args.db,
-                                             )
+        # Ensure start and end times are provided
+        if not args.start or not args.end:
+            raise ValueError("Start and end times are required if no file input is provided.")
+
+        # Base URL for granule searches
+        base_url = "https://cmr.earthdata.nasa.gov/search/granules.umm_json"
+        if args.endpoint == 'UAT':
+            base_url = "https://cmr.uat.earthdata.nasa.gov/search/granules.umm_json"
+        params = {
+            'provider': 'ASF',
+            'ShortName[]': 'OPERA_L2_RTC-S1_V1'
+        }
+
+        # Set CMR param to ignore granule searches prior to a certain date
+        start_datetime = datetime.fromisoformat(args.start)
+        temporal_start_datetime = start_datetime - timedelta(days=30)  # 30 days by default design - check with PCM team
+        params['temporal'] = f"{temporal_start_datetime.isoformat()}"
+
+        # Set time query type for CMR
+        if args.timestamp.lower() == "production":
+            params['production_date'] = f"{args.start},{args.end}"
+        elif args.timestamp.lower() == "revision":
+            params['revision_date'] = f"{args.start},{args.end}"
+        elif args.timestamp.lower() == "created":
+            params['created_at'] = f"{args.start},{args.end}"
+        else:  # default time query type if not provided or set to temporal
+            params['temporal'] = f"{args.start},{args.end}"
```
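For a `--timestamp revision` run with the README's example range, the `params` dict built above would come out as below (a sketch; note that `temporal` keeps only the padded lower bound, while the selected date filter carries the actual range):

```python
# Sketch of params for:
#   --start "2023-12-05T01:00:00" --end "2023-12-05T03:59:59" --timestamp revision
params = {
    'provider': 'ASF',
    'ShortName[]': 'OPERA_L2_RTC-S1_V1',
    'temporal': '2023-11-05T01:00:00',  # start minus the 30-day padding
    'revision_date': '2023-12-05T01:00:00,2023-12-05T03:59:59',
}
```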

```diff
+        # Construct the URL for the total granules query
+        total_granules = get_total_granules(base_url, params)
+        print(f"Total granules: {total_granules}")
+        print(f"Querying CMR for time range {args.start} to {args.end}.")
+
+        # Exit with error code if no granules to process
+        if (total_granules == 0):
+            print(f"Error: no granules to process.")
+            sys.exit(1)
+
+        # Optimize page_size and number of workers based on total_granules
+        page_size = min(1000, total_granules)
+        num_workers = min(5, (total_granules + page_size - 1) // page_size)
```
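Worked through with the 2316-granule query from the README example, the sizing comes out as:

```python
total_granules = 2316
page_size = min(1000, total_granules)                                # 1000
num_workers = min(5, (total_granules + page_size - 1) // page_size)  # min(5, 3) = 3
```

Note that the executor below hardcodes `max_workers=5`, so the computed `num_workers` is not actually passed in.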

```diff
+        # Initialize progress bar
+        tqdm.tqdm._instances.clear()  # Clear any existing tqdm instances
+        print()
+
+        # Main loop to fetch granules, update progress bar, and extract burst_ids
+        with tqdm.tqdm(total=total_granules, desc="Fetching granules", position=0) as pbar_global:
+            downloaded_batches = multiprocessing.Value('i', 0)  # For counting downloaded batches
+            total_batches = (total_granules + page_size - 1) // page_size
+
+            with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
+                # futures = [executor.submit(parallel_fetch, base_url, params, page_num, page_size, downloaded_batches, total_batches) for page_num in range(1, total_batches + 1)]
+                futures = []
+                for page_num in range(1, total_batches + 1):
+                    future = executor.submit(parallel_fetch, base_url, params, page_num, page_size, downloaded_batches, total_batches)
+                    futures.append(future)
+                    random_delay = random.uniform(0, 0.1)
+                    time.sleep(random_delay)  # Stagger the submission of function calls for CMR optimization
+                    logging.debug(f"Scheduled granule fetch for batch {page_num}")
+
+                for future in concurrent.futures.as_completed(futures):
+                    granules = future.result()
+                    pbar_global.update(len(granules))
+
+                    # RegEx for extracting burst IDs from granule IDs
+                    pattern = r'_T(\d+)-(\d+)-([A-Z]+\d+)_\d+T\d+Z_\d+T\d+Z_S1A_\d+_v\d+\.\d+'
+                    for granule in granules:
+                        granule_id = granule.get("umm").get("GranuleUR")
+                        burst_id = get_burst_id(granule_id)
+                        if (burst_id):
+                            burst_ids[burst_id] = granule_id
+                        else:
+                            print(f"\nWarning: Could not extract burst ID from malformed granule ID {granule_id}.")
+        print("\nGranule fetching complete.")
+
+        # Integrity check for total granules
+        total_downloaded = sum(len(future.result()) for future in futures)
+        if total_downloaded != total_granules:
+            print(f"\nError: Expected {total_granules} granules, but downloaded {total_downloaded}. Try running again after some delay.")
+            sys.exit(1)

     # Connect to the MGRS Tile Set SQLITE database
     conn = sqlite3.connect(args.db)
```