Merge pull request #271 from NREL/postprocessing_fix
Apply correct schema for all upgrades
nmerket committed Mar 17, 2022
2 parents df7cab3 + 6659280 commit 608279d
Showing 2 changed files with 98 additions and 23 deletions.
113 changes: 90 additions & 23 deletions buildstockbatch/postprocessing.py
@@ -154,8 +154,8 @@ def read_simulation_outputs(fs, reporting_measures, sim_dir, upgrade_id, buildin
return dpout


def write_dataframe_as_parquet(df, fs, filename):
tbl = pa.Table.from_pandas(df, preserve_index=False)
def write_dataframe_as_parquet(df, fs, filename, schema=None):
tbl = pa.Table.from_pandas(df, schema=schema, preserve_index=False)
with fs.open(filename, 'wb') as f:
parquet.write_table(tbl, f)
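
The new schema argument exists so callers can pin column types that pyarrow cannot infer, for example when a column has no data at all for an upgrade. A minimal sketch of the difference, assuming pandas and pyarrow are installed; the column names here are hypothetical:

import pandas as pd
import pyarrow as pa

# 'upgrade_cost' has no data at all, so pandas holds it as an all-None object column.
df = pd.DataFrame({'building_id': [1, 2], 'upgrade_cost': [None, None]})

# Without an explicit schema, pyarrow infers a null type for the empty column.
inferred = pa.Table.from_pandas(df, preserve_index=False)
print(inferred.schema.field('upgrade_cost').type)  # null

# Passing a schema forces the intended type instead.
schema = pa.schema([('building_id', pa.int64()), ('upgrade_cost', pa.float64())])
forced = pa.Table.from_pandas(df, schema=schema, preserve_index=False)
print(forced.schema.field('upgrade_cost').type)  # double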

@@ -220,17 +220,37 @@ def get_cols(fs, filename):
return set(schema.names)


def read_results_json(fs, filename):
def read_results_json(fs, filename, all_cols=None):
with fs.open(filename, 'rb') as f1:
with gzip.open(f1, 'rt', encoding='utf-8') as f2:
dpouts = json.load(f2)
df = pd.DataFrame(dpouts)
df['job_id'] = int(re.search(r'results_job(\d+)\.json\.gz', filename).group(1))
if all_cols is not None:
for missing_col in set(all_cols).difference(df.columns.values):
df[missing_col] = None
# Sorting is needed to ensure all dfs have the same column order. Dask will fail otherwise.
df = df.reindex(sorted(df.columns), axis=1)
return df


def get_schema_dict(fs, filename):
df = read_results_json(fs, filename)
df = df.replace('', np.nan) # required to make pa correctly infer the dtypes
sch = pa.Schema.from_pandas(df)
sch_dict = {name: type for name, type in zip(sch.names, sch.types)}
return sch_dict


def merge_schema_dicts(dict1, dict2):
new_dict = dict(dict1)
for col, dtype2 in dict2.items():
dtype1 = new_dict.get(col)
if col not in new_dict or dtype1 == pa.null():
new_dict[col] = dtype2
return new_dict
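
Together, get_schema_dict and merge_schema_dicts reduce the per-job results into one column-to-type mapping in which any concrete type beats a null type. A small sketch of the merge step, using hypothetical column names and assuming the functions above are in scope:

import pyarrow as pa

# Job 1 never produced data for 'upgrade_cost', so its type was inferred as null.
schema_job1 = {'building_id': pa.int64(), 'upgrade_cost': pa.null()}
# Job 2 has real values for that column, plus an extra column job 1 never saw.
schema_job2 = {'building_id': pa.int64(), 'upgrade_cost': pa.float64(), 'option_applied': pa.string()}

merged = merge_schema_dicts(schema_job1, schema_job2)
# merged == {'building_id': int64, 'upgrade_cost': double, 'option_applied': string}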


def read_enduse_timeseries_parquet(fs, filename, all_cols):
with fs.open(filename, 'rb') as f:
df = pd.read_parquet(f, engine='pyarrow')
@@ -249,6 +269,29 @@ def read_and_concat_enduse_timeseries_parquet(fs, all_cols, output_dir, filename
del grouped_df


def get_null_cols(df):
sch = pa.Schema.from_pandas(df)
null_cols = []
for col, dtype in zip(sch.names, sch.types):
if dtype == pa.null():
null_cols.append(col)
return null_cols


def correct_schema(cur_schema_dict, df):
sch = pa.Schema.from_pandas(df)
sch_dict = {name: type for name, type in zip(sch.names, sch.types)}
unresolved = []
for col, dtype in sch_dict.items():
if dtype == pa.null():
if col in cur_schema_dict:
indx = sch.get_field_index(col)
sch = sch.set(indx, pa.field(col, cur_schema_dict.get(col)))
else:
unresolved.append(col)
return sch, unresolved
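
correct_schema then patches a single upgrade's inferred schema using that merged mapping and reports the columns it still cannot type. A hypothetical usage sketch, assuming the functions above are in scope:

import pandas as pd
import pyarrow as pa

# Merged across every results_job*.json.gz file (see get_schema_dict / merge_schema_dicts above).
all_schema_dict = {'upgrade_cost': pa.float64()}

# In this particular upgrade both columns are empty, so pyarrow infers null for each.
df = pd.DataFrame({'upgrade_cost': [None, None], 'mystery_col': [None, None]})

schema, unresolved = correct_schema(all_schema_dict, df)
print(schema.field('upgrade_cost').type)  # double, taken from the merged schema
print(unresolved)                         # ['mystery_col'] -- null everywhere, so left unresolved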


def combine_results(fs, results_dir, cfg, do_timeseries=True):
"""Combine the results of the batch simulations.
@@ -276,25 +319,38 @@ def combine_results(fs, results_dir, cfg, do_timeseries=True):

# Results "CSV"
results_json_files = fs.glob(f'{sim_output_dir}/results_job*.json.gz')
if results_json_files:
delayed_results_dfs = [dask.delayed(read_results_json)(fs, x) for x in results_json_files]
results_df = dd.from_delayed(delayed_results_dfs, verify_meta=False)
else:
if not results_json_files:
raise ValueError("No simulation results found to post-process.")

logger.info("Collecting all the columns and datatypes in results_job*.json.gz parquet files.")
all_schema_dict = db.from_sequence(results_json_files).map(partial(get_schema_dict, fs)).\
fold(lambda x, y: merge_schema_dicts(x, y)).compute()
logger.info(f"Got {len(all_schema_dict)} columns")
all_results_cols = list(all_schema_dict.keys())
all_schema_dict = {to_camelcase(key): value for key, value in all_schema_dict.items()}
logger.info(f"Got this schema: {all_schema_dict}\n")
delayed_results_dfs = [dask.delayed(partial(read_results_json, fs, all_cols=all_results_cols))(x)
for x in results_json_files]
results_df = dd.from_delayed(delayed_results_dfs, verify_meta=False)

if do_timeseries:
# Look at all the parquet files to see what columns are in all of them.
logger.info("Collecting all the columns in timeseries parquet files.")
ts_filenames = fs.glob(f'{ts_in_dir}/up*/bldg*.parquet')
all_ts_cols = db.from_sequence(ts_filenames, partition_size=100).map(partial(get_cols, fs)).\
fold(lambda x, y: x.union(y)).compute()

# Sort the columns
all_ts_cols_sorted = ['building_id'] + sorted(x for x in all_ts_cols if x.startswith('time'))
all_ts_cols.difference_update(all_ts_cols_sorted)
all_ts_cols_sorted.extend(sorted(x for x in all_ts_cols if not x.endswith(']')))
all_ts_cols.difference_update(all_ts_cols_sorted)
all_ts_cols_sorted.extend(sorted(all_ts_cols))
if ts_filenames:
all_ts_cols = db.from_sequence(ts_filenames, partition_size=100).map(partial(get_cols, fs)).\
fold(lambda x, y: x.union(y)).compute()

# Sort the columns
all_ts_cols_sorted = ['building_id'] + sorted(x for x in all_ts_cols if x.startswith('time'))
all_ts_cols.difference_update(all_ts_cols_sorted)
all_ts_cols_sorted.extend(sorted(x for x in all_ts_cols if not x.endswith(']')))
all_ts_cols.difference_update(all_ts_cols_sorted)
all_ts_cols_sorted.extend(sorted(all_ts_cols))
logger.info(f"Got {len(all_ts_cols_sorted)} columns")
else:
logger.info("There are no timeseries files.")
do_timeseries = False

results_df_groups = results_df.groupby('upgrade')
upgrade_start = 1 if cfg['baseline'].get('skip_sims', False) else 0
@@ -303,20 +359,28 @@ def combine_results(fs, results_dir, cfg, do_timeseries=True):
logger.info(f"Processing upgrade {upgrade_id}. ")
df = dask.compute(results_df_groups.get_group(upgrade_id))[0]
logger.info(f"Obtained results_df for {upgrade_id} with {len(df)} datapoints. ")
df.sort_index(inplace=True)
df.rename(columns=to_camelcase, inplace=True)
df = clean_up_results_df(df, cfg, keep_upgrade_id=True)

del df['upgrade']
df.set_index('building_id', inplace=True)
schema = None
if upgrade_id > 0:
# Remove building characteristics for upgrade scenarios.
cols_to_keep = list(
filter(lambda x: not x.startswith('build_existing_model.'), df.columns)
)
df = df[cols_to_keep]
df = df.copy()
del df['upgrade']
df.set_index('building_id', inplace=True)
df.sort_index(inplace=True)

null_cols = get_null_cols(df)
# If a column's datatype is null (happens when the column has no data in this upgrade), try to
# determine its datatype from the schemas of the other upgrades.
if null_cols:
logger.info(f"Upgrade {upgrade_id} has null cols: {null_cols}")
schema, unresolved = correct_schema(all_schema_dict, df)
if unresolved:
logger.info(f"The types for {unresolved} columns couldn't be determined.")
else:
logger.info("All columns were successfully assigned a datatype based on other upgrades.")
# Write CSV
csv_filename = f"{results_csvs_dir}/results_up{upgrade_id:02d}.csv.gz"
logger.info(f'Writing {csv_filename}')
@@ -331,10 +395,13 @@ def combine_results(fs, results_dir, cfg, do_timeseries=True):
results_parquet_dir = f"{parquet_dir}/upgrades/upgrade={upgrade_id}"
if not fs.exists(results_parquet_dir):
fs.makedirs(results_parquet_dir)
parquet_filename = f"{results_parquet_dir}/results_up{upgrade_id:02d}.parquet"
logger.info(f'Writing {parquet_filename}')
write_dataframe_as_parquet(
df.reset_index(),
fs,
f"{results_parquet_dir}/results_up{upgrade_id:02d}.parquet"
parquet_filename,
schema=schema
)

if do_timeseries:
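
End to end, combine_results builds the merged schema with a dask.bag map/fold over every results_job*.json.gz file and passes the per-upgrade corrected schema to write_dataframe_as_parquet. A condensed sketch of that pipeline on a local filesystem; the paths are hypothetical and the helper functions are assumed to be importable from buildstockbatch.postprocessing:

from functools import partial

import dask.bag as db
from fsspec.implementations.local import LocalFileSystem

fs = LocalFileSystem()
results_json_files = fs.glob('simulation_output/results_job*.json.gz')

# One {column: pyarrow type} mapping per job file, folded together so that a concrete type
# from any job wins over a null type inferred from a job with no data for that column.
all_schema_dict = (
    db.from_sequence(results_json_files)
    .map(partial(get_schema_dict, fs))
    .fold(merge_schema_dicts)
    .compute()
)
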
8 changes: 8 additions & 0 deletions docs/changelog/changelog_dev.rst
@@ -28,3 +28,11 @@ Development Changelog
:tickets:

Add ``residential_hpxml`` workflow generator.

.. change::
:tags: bugfix
:pullreq: 271
:tickets:

Postprocessing now correctly handles an assortment of upgrades with overlapping sets of columns
containing missing and non-missing values.
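
A rough illustration of the failure mode this fixes, with made-up values: if one upgrade has only missing values for a column, writing it without an explicit schema yields a null-typed parquet column that can conflict with the double-typed column written for another upgrade when the upgrade=* partitions are later read as one dataset. Applying a schema derived from all upgrades keeps the files consistent:

import pandas as pd
import pyarrow as pa
from pyarrow import parquet

up01 = pd.DataFrame({'building_id': [1, 2], 'upgrade_cost': [None, None]})      # all missing
up02 = pd.DataFrame({'building_id': [1, 2], 'upgrade_cost': [1200.0, 950.0]})   # has data

# Writing both with the same schema gives 'upgrade_cost' the double type in both files.
shared = pa.schema([('building_id', pa.int64()), ('upgrade_cost', pa.float64())])
for upgrade_id, df in [(1, up01), (2, up02)]:
    tbl = pa.Table.from_pandas(df, schema=shared, preserve_index=False)
    parquet.write_table(tbl, f'results_up{upgrade_id:02d}.parquet')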
