Apply correct schema for all upgrades #271
Merged
Commits (7)
5e9f063  Correct schema for upgrades (rajeee)
f3925f2  Flake8 fix (rajeee)
11396d9  Change the case of schema columns to match df case (rajeee)
8382b52  Merge branch 'develop' into postprocessing_fix (rajeee)
6efe48c  pyarrow schema reading fix (rajeee)
8404da3  Style fix (rajeee)
6659280  Update change log (rajeee)
@@ -154,8 +154,8 @@ def read_simulation_outputs(fs, reporting_measures, sim_dir, upgrade_id, buildin
     return dpout


-def write_dataframe_as_parquet(df, fs, filename):
-    tbl = pa.Table.from_pandas(df, preserve_index=False)
+def write_dataframe_as_parquet(df, fs, filename, schema=None):
+    tbl = pa.Table.from_pandas(df, schema=schema, preserve_index=False)
     with fs.open(filename, 'wb') as f:
         parquet.write_table(tbl, f)
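
The reason for the new optional `schema` argument, shown as a minimal standalone sketch (the column names below are invented for illustration): pyarrow infers the `null` type for a column that has no data, and an explicit schema overrides that inference so every file carries a usable dtype.

```python
import pandas as pd
import pyarrow as pa

# A column with no data is inferred as the null type.
df = pd.DataFrame({'building_id': [1, 2], 'option_cost': [None, None]})
inferred = pa.Table.from_pandas(df, preserve_index=False)
print(inferred.schema.field('option_cost').type)  # null

# Passing an explicit schema forces the intended dtype instead.
explicit = pa.schema([('building_id', pa.int64()), ('option_cost', pa.float64())])
tbl = pa.Table.from_pandas(df, schema=explicit, preserve_index=False)
print(tbl.schema.field('option_cost').type)  # double
```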
@@ -220,17 +220,37 @@ def get_cols(fs, filename):
     return set(schema.names)


-def read_results_json(fs, filename):
+def read_results_json(fs, filename, all_cols=None):
     with fs.open(filename, 'rb') as f1:
         with gzip.open(f1, 'rt', encoding='utf-8') as f2:
             dpouts = json.load(f2)
     df = pd.DataFrame(dpouts)
     df['job_id'] = int(re.search(r'results_job(\d+)\.json\.gz', filename).group(1))
+    if all_cols is not None:
+        for missing_col in set(all_cols).difference(df.columns.values):
+            df[missing_col] = None
+    # Sorting is needed to ensure all dfs have same column order. Dask will fail otherwise.
+    df = df.reindex(sorted(df.columns), axis=1)
     return df


+def get_schema_dict(fs, filename):
+    df = read_results_json(fs, filename)
+    df = df.replace('', np.nan)  # required to make pa correctly infer the dtypes
+    sch = pa.Schema.from_pandas(df)
+    sch_dict = {name: type for name, type in zip(sch.names, sch.types)}
+    return sch_dict
+
+
+def merge_schema_dicts(dict1, dict2):
+    new_dict = dict(dict1)
+    for col, dtype2 in dict2.items():
+        dtype1 = new_dict.get(col)
+        if col not in new_dict or dtype1 == pa.null():
+            new_dict[col] = dtype2
+    return new_dict
+
+
 def read_enduse_timeseries_parquet(fs, filename, all_cols):
     with fs.open(filename, 'rb') as f:
         df = pd.read_parquet(f, engine='pyarrow')
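
A small standalone sketch of the padding and sorting that `read_results_json` now does when `all_cols` is passed (column names invented): every per-job dataframe gets the full column set in the same order, which is what lets dask build one dataframe from the delayed frames without column mismatches.

```python
import pandas as pd

all_cols = ['building_id', 'completed_status', 'option_cost']

# This job's results are missing 'option_cost' entirely.
df = pd.DataFrame({'building_id': [1, 2], 'completed_status': ['Success', 'Success']})

# Add the missing columns as None, then sort so every partition has the same order.
for missing_col in set(all_cols).difference(df.columns.values):
    df[missing_col] = None
df = df.reindex(sorted(df.columns), axis=1)

print(list(df.columns))  # ['building_id', 'completed_status', 'option_cost']
```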
@@ -249,6 +269,29 @@ def read_and_concat_enduse_timeseries_parquet(fs, all_cols, output_dir, filename
     del grouped_df


+def get_null_cols(df):
+    sch = pa.Schema.from_pandas(df)
+    null_cols = []
+    for col, dtype in zip(sch.names, sch.types):
+        if dtype == pa.null():
+            null_cols.append(col)
+    return null_cols
+
+
+def correct_schema(cur_schema_dict, df):
+    sch = pa.Schema.from_pandas(df)
+    sch_dict = {name: type for name, type in zip(sch.names, sch.types)}
+    unresolved = []
+    for col, dtype in sch_dict.items():
+        if dtype == pa.null():
+            if col in cur_schema_dict:
+                indx = sch.get_field_index(col)
+                sch = sch.set(indx, pa.field(col, cur_schema_dict.get(col)))
+            else:
+                unresolved.append(col)
+    return sch, unresolved
+
+
 def combine_results(fs, results_dir, cfg, do_timeseries=True):
     """Combine the results of the batch simulations.
@@ -276,25 +319,38 @@ def combine_results(fs, results_dir, cfg, do_timeseries=True):

     # Results "CSV"
     results_json_files = fs.glob(f'{sim_output_dir}/results_job*.json.gz')
-    if results_json_files:
-        delayed_results_dfs = [dask.delayed(read_results_json)(fs, x) for x in results_json_files]
-        results_df = dd.from_delayed(delayed_results_dfs, verify_meta=False)
-    else:
+    if not results_json_files:
         raise ValueError("No simulation results found to post-process.")

+    logger.info("Collecting all the columns and datatypes in results_job*.json.gz parquet files.")
+    all_schema_dict = db.from_sequence(results_json_files).map(partial(get_schema_dict, fs)).\
+        fold(lambda x, y: merge_schema_dicts(x, y)).compute()
+    logger.info(f"Got {len(all_schema_dict)} columns")
+    all_results_cols = list(all_schema_dict.keys())
+    all_schema_dict = {to_camelcase(key): value for key, value in all_schema_dict.items()}
+    logger.info(f"Got this schema: {all_schema_dict}\n")
+    delayed_results_dfs = [dask.delayed(partial(read_results_json, fs, all_cols=all_results_cols))(x)
+                           for x in results_json_files]
+    results_df = dd.from_delayed(delayed_results_dfs, verify_meta=False)

     if do_timeseries:
         # Look at all the parquet files to see what columns are in all of them.
         logger.info("Collecting all the columns in timeseries parquet files.")
         ts_filenames = fs.glob(f'{ts_in_dir}/up*/bldg*.parquet')
-        all_ts_cols = db.from_sequence(ts_filenames, partition_size=100).map(partial(get_cols, fs)).\
-            fold(lambda x, y: x.union(y)).compute()
-
-        # Sort the columns
-        all_ts_cols_sorted = ['building_id'] + sorted(x for x in all_ts_cols if x.startswith('time'))
-        all_ts_cols.difference_update(all_ts_cols_sorted)
-        all_ts_cols_sorted.extend(sorted(x for x in all_ts_cols if not x.endswith(']')))
-        all_ts_cols.difference_update(all_ts_cols_sorted)
-        all_ts_cols_sorted.extend(sorted(all_ts_cols))
+        if ts_filenames:
+            all_ts_cols = db.from_sequence(ts_filenames, partition_size=100).map(partial(get_cols, fs)).\
+                fold(lambda x, y: x.union(y)).compute()
+
+            # Sort the columns
+            all_ts_cols_sorted = ['building_id'] + sorted(x for x in all_ts_cols if x.startswith('time'))
+            all_ts_cols.difference_update(all_ts_cols_sorted)
+            all_ts_cols_sorted.extend(sorted(x for x in all_ts_cols if not x.endswith(']')))
+            all_ts_cols.difference_update(all_ts_cols_sorted)
+            all_ts_cols_sorted.extend(sorted(all_ts_cols))
+            logger.info(f"Got {len(all_ts_cols_sorted)} columns")
+        else:
+            logger.info("There are no timeseries files.")
+            do_timeseries = False

     results_df_groups = results_df.groupby('upgrade')
     upgrade_start = 1 if cfg['baseline'].get('skip_sims', False) else 0
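
For reference, a standalone sketch of the dask.bag map/fold reduction used above, with in-memory dicts standing in for the per-file schemas (the contents are invented; in the real code each dict comes from get_schema_dict on one results_job*.json.gz file):

```python
import dask.bag as db
import pyarrow as pa

# One schema dict per results file; 'option_cost' has no data in the first file.
per_file_schemas = [
    {'building_id': pa.int64(), 'option_cost': pa.null()},
    {'building_id': pa.int64(), 'option_cost': pa.float64()},
]

def merge(d1, d2):
    # Same rule as merge_schema_dicts: keep the first non-null type seen for each column.
    out = dict(d1)
    for col, dtype in d2.items():
        if col not in out or out[col] == pa.null():
            out[col] = dtype
    return out

all_schema_dict = db.from_sequence(per_file_schemas).fold(merge).compute()
print(all_schema_dict['option_cost'])  # double
```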
@@ -303,20 +359,28 @@ def combine_results(fs, results_dir, cfg, do_timeseries=True):
         logger.info(f"Processing upgrade {upgrade_id}. ")
         df = dask.compute(results_df_groups.get_group(upgrade_id))[0]
         logger.info(f"Obtained results_df for {upgrade_id} with {len(df)} datapoints. ")
-        df.sort_index(inplace=True)
         df.rename(columns=to_camelcase, inplace=True)
         df = clean_up_results_df(df, cfg, keep_upgrade_id=True)

-        del df['upgrade']
-        df.set_index('building_id', inplace=True)
+        schema = None
         if upgrade_id > 0:
             # Remove building characteristics for upgrade scenarios.
             cols_to_keep = list(
                 filter(lambda x: not x.startswith('build_existing_model.'), df.columns)
             )
             df = df[cols_to_keep]
+            df = df.copy()
+        del df['upgrade']
+        df.set_index('building_id', inplace=True)
+        df.sort_index(inplace=True)
+
+        null_cols = get_null_cols(df)
+        # If a column's datatype is null (happens when it has no data in this upgrade), try to
+        # determine its datatype from the other upgrades.
+        if null_cols:
+            logger.info(f"Upgrade {upgrade_id} has null cols: {null_cols}")
+            schema, unresolved = correct_schema(all_schema_dict, df)
+            if unresolved:
+                logger.info(f"The types for {unresolved} columns couldn't be determined.")
+            else:
+                logger.info("All columns were successfully assigned a datatype based on other upgrades.")
         # Write CSV
         csv_filename = f"{results_csvs_dir}/results_up{upgrade_id:02d}.csv.gz"
         logger.info(f'Writing {csv_filename}')
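
A quick standalone illustration of what get_null_cols detects in an upgrade's dataframe (column names invented): a column the upgrade never populates shows up with the pyarrow null type, which is what triggers the schema correction above. Note that all_schema_dict was converted to camelcase keys earlier because the per-upgrade dataframe is renamed with to_camelcase before this lookup.

```python
import pandas as pd
import pyarrow as pa

df = pd.DataFrame({
    'building_id': [1, 2],
    'option_cost': [None, None],              # no data in this upgrade -> null type
    'completed_status': ['Success', 'Fail'],
})

sch = pa.Schema.from_pandas(df, preserve_index=False)
null_cols = [col for col, dtype in zip(sch.names, sch.types) if dtype == pa.null()]
print(null_cols)  # ['option_cost']
```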
@@ -331,10 +395,13 @@ def combine_results(fs, results_dir, cfg, do_timeseries=True):
         results_parquet_dir = f"{parquet_dir}/upgrades/upgrade={upgrade_id}"
         if not fs.exists(results_parquet_dir):
             fs.makedirs(results_parquet_dir)
+        parquet_filename = f"{results_parquet_dir}/results_up{upgrade_id:02d}.parquet"
+        logger.info(f'Writing {parquet_filename}')
         write_dataframe_as_parquet(
             df.reset_index(),
             fs,
-            f"{results_parquet_dir}/results_up{upgrade_id:02d}.parquet"
+            parquet_filename,
+            schema=schema
         )

         if do_timeseries:
Conversation

Reviewer, on the line logger.info(f"Got this schema: {all_schema_dict}\n"):
Seems unnecessary to print this out. Maybe helpful for debugging, but now that it's working you're probably good to remove this.

rajeee (author):
I debated keeping it or removing it, but I kept it because more information (up to a limit!) is better than too little for debugging, and postprocessing.out is used mostly for debugging, so maybe it's fine to leave. Most users probably don't need to read postprocessing.out unless they can't find their table in Athena.