Apply correct schema for all upgrades #271

Merged 7 commits on Mar 17, 2022
113 changes: 90 additions & 23 deletions buildstockbatch/postprocessing.py
@@ -154,8 +154,8 @@ def read_simulation_outputs(fs, reporting_measures, sim_dir, upgrade_id, buildin
return dpout


def write_dataframe_as_parquet(df, fs, filename):
tbl = pa.Table.from_pandas(df, preserve_index=False)
def write_dataframe_as_parquet(df, fs, filename, schema=None):
tbl = pa.Table.from_pandas(df, schema=schema, preserve_index=False)
with fs.open(filename, 'wb') as f:
parquet.write_table(tbl, f)
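
For context, a minimal standalone sketch (not part of the diff) of what the new schema argument buys: pyarrow uses the supplied dtype for a column even when every value in it is null, instead of inferring pa.null(). The column names and output path below are made up.

import pandas as pd
import pyarrow as pa
from pyarrow import parquet

# A column that happens to be entirely null in this (hypothetical) upgrade.
df = pd.DataFrame({'building_id': [1, 2], 'upgrade_cost_usd': [None, None]})
# Without an explicit schema, pyarrow would infer pa.null() for 'upgrade_cost_usd'.
schema = pa.schema([('building_id', pa.int64()), ('upgrade_cost_usd', pa.float64())])
tbl = pa.Table.from_pandas(df, schema=schema, preserve_index=False)
parquet.write_table(tbl, 'results_up01.parquet')  # written to the current directory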

@@ -220,17 +220,37 @@ def get_cols(fs, filename):
return set(schema.names)


def read_results_json(fs, filename):
def read_results_json(fs, filename, all_cols=None):
with fs.open(filename, 'rb') as f1:
with gzip.open(f1, 'rt', encoding='utf-8') as f2:
dpouts = json.load(f2)
df = pd.DataFrame(dpouts)
df['job_id'] = int(re.search(r'results_job(\d+)\.json\.gz', filename).group(1))
if all_cols is not None:
for missing_col in set(all_cols).difference(df.columns.values):
df[missing_col] = None
# Sorting is needed to ensure all dfs have the same column order. Dask will fail otherwise.
df = df.reindex(sorted(df.columns), axis=1)
return df
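
As a small illustration (not from the PR itself) of the all_cols backfill: every job's dataframe ends up with the same, alphabetically ordered column set, so dask can concatenate them without metadata mismatches. The column names are hypothetical.

import pandas as pd

all_cols = ['building_id', 'fuel_use_electricity_kwh', 'upgrade_cost_usd']  # union of columns across all jobs
df = pd.DataFrame({'building_id': [1, 2], 'fuel_use_electricity_kwh': [3500.0, 4200.0]})
for missing_col in set(all_cols).difference(df.columns.values):
    df[missing_col] = None  # columns this job never reported become all-null
df = df.reindex(sorted(df.columns), axis=1)  # same column order for every job
print(df.columns.tolist())  # ['building_id', 'fuel_use_electricity_kwh', 'upgrade_cost_usd']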


def get_schema_dict(fs, filename):
df = read_results_json(fs, filename)
df = df.replace('', np.nan) # required to make pa correctly infer the dtypes
sch = pa.Schema.from_pandas(df)
sch_dict = {name: type for name, type in zip(sch.names, sch.types)}
return sch_dict


def merge_schema_dicts(dict1, dict2):
new_dict = dict(dict1)
for col, dtype2 in dict2.items():
dtype1 = new_dict.get(col)
if col not in new_dict or dtype1 == pa.null():
new_dict[col] = dtype2
return new_dict
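
A quick hypothetical example of how this merge resolves a pa.null() dtype: the first upgrade that reports a concrete dtype for a column wins. The function is restated here so the snippet runs on its own; the column names are invented.

import pyarrow as pa

def merge_schema_dicts(dict1, dict2):
    # Same logic as the function above: keep dict1's dtype unless it is missing or null.
    new_dict = dict(dict1)
    for col, dtype2 in dict2.items():
        dtype1 = new_dict.get(col)
        if col not in new_dict or dtype1 == pa.null():
            new_dict[col] = dtype2
    return new_dict

schema_job1 = {'building_id': pa.int64(), 'upgrade_cost_usd': pa.null()}     # column was all-null in this job
schema_job2 = {'building_id': pa.int64(), 'upgrade_cost_usd': pa.float64()}  # column had real values here
merged = merge_schema_dicts(schema_job1, schema_job2)
assert merged['upgrade_cost_usd'] == pa.float64()  # the concrete dtype replaces the null one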


def read_enduse_timeseries_parquet(fs, filename, all_cols):
with fs.open(filename, 'rb') as f:
df = pd.read_parquet(f, engine='pyarrow')
@@ -249,6 +269,29 @@ def read_and_concat_enduse_timeseries_parquet(fs, all_cols, output_dir, filename
del grouped_df


def get_null_cols(df):
sch = pa.Schema.from_pandas(df)
null_cols = []
for col, dtype in zip(sch.names, sch.types):
if dtype == pa.null():
null_cols.append(col)
return null_cols


def correct_schema(cur_schema_dict, df):
sch = pa.Schema.from_pandas(df)
sch_dict = {name: type for name, type in zip(sch.names, sch.types)}
unresolved = []
for col, dtype in sch_dict.items():
if dtype == pa.null():
if col in cur_schema_dict:
indx = sch.get_field_index(col)
sch = sch.set(indx, pa.field(col, cur_schema_dict.get(col)))
else:
unresolved.append(col)
return sch, unresolved
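
A hypothetical walk-through of correct_schema on an upgrade whose dataframe contains an all-null column, using a schema dictionary merged from the other upgrades. The function is restated so the snippet is self-contained; the column names are made up.

import pandas as pd
import pyarrow as pa

def correct_schema(cur_schema_dict, df):
    # Same logic as the function above: swap null dtypes for the dtypes seen in other upgrades.
    sch = pa.Schema.from_pandas(df)
    sch_dict = {name: dtype for name, dtype in zip(sch.names, sch.types)}
    unresolved = []
    for col, dtype in sch_dict.items():
        if dtype == pa.null():
            if col in cur_schema_dict:
                indx = sch.get_field_index(col)
                sch = sch.set(indx, pa.field(col, cur_schema_dict[col]))
            else:
                unresolved.append(col)
    return sch, unresolved

all_schema_dict = {'building_id': pa.int64(), 'upgrade_cost_usd': pa.float64()}  # merged across upgrades
df = pd.DataFrame({'building_id': [1, 2], 'upgrade_cost_usd': [None, None]})     # all-null in this upgrade
schema, unresolved = correct_schema(all_schema_dict, df)
print(schema.field('upgrade_cost_usd').type)  # double
print(unresolved)                             # []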


def combine_results(fs, results_dir, cfg, do_timeseries=True):
"""Combine the results of the batch simulations.

@@ -276,25 +319,38 @@ def combine_results(fs, results_dir, cfg, do_timeseries=True):

# Results "CSV"
results_json_files = fs.glob(f'{sim_output_dir}/results_job*.json.gz')
if results_json_files:
delayed_results_dfs = [dask.delayed(read_results_json)(fs, x) for x in results_json_files]
results_df = dd.from_delayed(delayed_results_dfs, verify_meta=False)
else:
if not results_json_files:
raise ValueError("No simulation results found to post-process.")

logger.info("Collecting all the columns and datatypes in results_job*.json.gz parquet files.")
all_schema_dict = db.from_sequence(results_json_files).map(partial(get_schema_dict, fs)).\
fold(lambda x, y: merge_schema_dicts(x, y)).compute()
logger.info(f"Got {len(all_schema_dict)} columns")
all_results_cols = list(all_schema_dict.keys())
all_schema_dict = {to_camelcase(key): value for key, value in all_schema_dict.items()}
logger.info(f"Got this schema: {all_schema_dict}\n")
Member:

Seems unnecessary to print this out. Maybe helpful for debugging, but now that it's working you're probably good to remove this.

Contributor Author:

I debated keeping it or removing it, but I kept it because more information (up to a limit!) is better than too little for debugging, and postprocessing.out is mostly used for debugging, so maybe it's fine to leave. Most users probably don't need to read postprocessing.out unless they can't find their table in Athena.

Member:

Suggested change (remove this line):
logger.info(f"Got this schema: {all_schema_dict}\n")

delayed_results_dfs = [dask.delayed(partial(read_results_json, fs, all_cols=all_results_cols))(x)
for x in results_json_files]
results_df = dd.from_delayed(delayed_results_dfs, verify_meta=False)

if do_timeseries:
# Look at all the parquet files to see what columns are in all of them.
logger.info("Collecting all the columns in timeseries parquet files.")
ts_filenames = fs.glob(f'{ts_in_dir}/up*/bldg*.parquet')
all_ts_cols = db.from_sequence(ts_filenames, partition_size=100).map(partial(get_cols, fs)).\
fold(lambda x, y: x.union(y)).compute()

# Sort the columns
all_ts_cols_sorted = ['building_id'] + sorted(x for x in all_ts_cols if x.startswith('time'))
all_ts_cols.difference_update(all_ts_cols_sorted)
all_ts_cols_sorted.extend(sorted(x for x in all_ts_cols if not x.endswith(']')))
all_ts_cols.difference_update(all_ts_cols_sorted)
all_ts_cols_sorted.extend(sorted(all_ts_cols))
if ts_filenames:
all_ts_cols = db.from_sequence(ts_filenames, partition_size=100).map(partial(get_cols, fs)).\
fold(lambda x, y: x.union(y)).compute()

# Sort the columns
all_ts_cols_sorted = ['building_id'] + sorted(x for x in all_ts_cols if x.startswith('time'))
all_ts_cols.difference_update(all_ts_cols_sorted)
all_ts_cols_sorted.extend(sorted(x for x in all_ts_cols if not x.endswith(']')))
all_ts_cols.difference_update(all_ts_cols_sorted)
all_ts_cols_sorted.extend(sorted(all_ts_cols))
logger.info(f"Got {len(all_ts_cols_sorted)} columns")
else:
logger.info("There are no timeseries files.")
do_timeseries = False

results_df_groups = results_df.groupby('upgrade')
upgrade_start = 1 if cfg['baseline'].get('skip_sims', False) else 0
@@ -303,20 +359,28 @@ def combine_results(fs, results_dir, cfg, do_timeseries=True):
logger.info(f"Processing upgrade {upgrade_id}. ")
df = dask.compute(results_df_groups.get_group(upgrade_id))[0]
logger.info(f"Obtained results_df for {upgrade_id} with {len(df)} datapoints. ")
df.sort_index(inplace=True)
df.rename(columns=to_camelcase, inplace=True)
df = clean_up_results_df(df, cfg, keep_upgrade_id=True)

del df['upgrade']
df.set_index('building_id', inplace=True)
schema = None
if upgrade_id > 0:
# Remove building characteristics for upgrade scenarios.
cols_to_keep = list(
filter(lambda x: not x.startswith('build_existing_model.'), df.columns)
)
df = df[cols_to_keep]
df = df.copy()
del df['upgrade']
df.set_index('building_id', inplace=True)
df.sort_index(inplace=True)

null_cols = get_null_cols(df)
# If a column's datatype is null (which happens when it has no data in this upgrade), try to
# determine its datatype from the schemas of the other upgrades.
if null_cols:
logger.info(f"Upgrade {upgrade_id} has null cols: {null_cols}")
schema, unresolved = correct_schema(all_schema_dict, df)
if unresolved:
logger.info(f"The types for {unresolved} columns couldn't be determined.")
else:
logger.info("All columns were successfully assigned a datatype based on other upgrades.")
# Write CSV
csv_filename = f"{results_csvs_dir}/results_up{upgrade_id:02d}.csv.gz"
logger.info(f'Writing {csv_filename}')
@@ -331,10 +395,13 @@ def combine_results(fs, results_dir, cfg, do_timeseries=True):
results_parquet_dir = f"{parquet_dir}/upgrades/upgrade={upgrade_id}"
if not fs.exists(results_parquet_dir):
fs.makedirs(results_parquet_dir)
parquet_filename = f"{results_parquet_dir}/results_up{upgrade_id:02d}.parquet"
logger.info(f'Writing {parquet_filename}')
write_dataframe_as_parquet(
df.reset_index(),
fs,
f"{results_parquet_dir}/results_up{upgrade_id:02d}.parquet"
parquet_filename,
schema=schema
)

if do_timeseries:
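For context, a condensed sketch of how the pieces added in this PR fit together in combine_results: map a per-file schema extraction over a dask bag, then fold the per-job schema dictionaries into one merged schema. It assumes buildstockbatch is importable and uses a placeholder local directory and glob pattern; the real code runs against the batch's simulation output folder on whichever filesystem fs points to.

from functools import partial

import dask.bag as db
from fsspec.implementations.local import LocalFileSystem

from buildstockbatch.postprocessing import get_schema_dict, merge_schema_dicts

fs = LocalFileSystem()
results_json_files = fs.glob('simulation_output/results_job*.json.gz')  # placeholder path
# Fold the per-job schema dicts into a single merged schema dictionary.
all_schema_dict = (
    db.from_sequence(results_json_files)
    .map(partial(get_schema_dict, fs))
    .fold(merge_schema_dicts)
    .compute()
)
print(f"Got {len(all_schema_dict)} columns")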
8 changes: 8 additions & 0 deletions docs/changelog/changelog_dev.rst
@@ -28,3 +28,11 @@ Development Changelog
:tickets:

Add ``residential_hpxml`` workflow generator.

.. change::
:tags: bugfix
:pullreq: 271
:tickets:

Postprocessing now correctly handles an assortment of upgrades with overlapping sets of columns, where a
column may be entirely missing in some upgrades and populated in others.