Keep individual timeseries files #228

Merged
merged 7 commits on May 4, 2021

Changes from 6 commits

3 changes: 2 additions & 1 deletion buildstockbatch/aws/s3_assets/bsb_post.py
@@ -57,7 +57,8 @@ def do_postprocessing(s3_bucket, s3_bucket_prefix):
f'{s3_bucket_prefix}/results/parquet'
)

remove_intermediate_files(fs, results_s3_loc)
keep_individual_timeseries = cfg.get('postprocessing', {}).get('keep_individual_timeseries', False)
remove_intermediate_files(fs, results_s3_loc, keep_individual_timeseries)


if __name__ == '__main__':
12 changes: 3 additions & 9 deletions buildstockbatch/base.py
@@ -268,10 +268,7 @@ def validate_project_schema(project_file):
@staticmethod
def validate_misc_constraints(project_file):
# validate other miscellaneous constraints
cfg = get_project_configuration(project_file)

if cfg.get('postprocessing', {}).get('aggregate_timeseries', False):
logger.warning('aggregate_timeseries has been deprecated and will be removed in a future version.')
cfg = get_project_configuration(project_file) # noqa F841

return True

@@ -557,8 +554,5 @@ def process_results(self, skip_combine=False, force_upload=False):
if 'athena' in aws_conf:
postprocessing.create_athena_tables(aws_conf, os.path.basename(self.output_dir), s3_bucket, s3_prefix)

if not self.cfg.get('eagle', {}).get('postprocessing', {}).get('keep_intermediate_files', False):
logger.info("Removing intermediate files.")
postprocessing.remove_intermediate_files(fs, self.results_dir)
else:
logger.info("Skipped removing intermediate files.")
keep_individual_timeseries = self.cfg.get('postprocessing', {}).get('keep_individual_timeseries', False)
postprocessing.remove_intermediate_files(fs, self.results_dir, keep_individual_timeseries)
9 changes: 5 additions & 4 deletions buildstockbatch/postprocessing.py
@@ -365,15 +365,16 @@ def combine_results(fs, results_dir, cfg, do_timeseries=True):
logger.info(f"Finished combining and saving timeseries for upgrade{upgrade_id}.")


def remove_intermediate_files(fs, results_dir):
def remove_intermediate_files(fs, results_dir, keep_individual_timeseries=False):
# Remove aggregated files to save space
sim_output_dir = f'{results_dir}/simulation_output'
ts_in_dir = f'{sim_output_dir}/timeseries'
results_job_json_glob = f'{sim_output_dir}/results_job*.json.gz'
logger.info('Removing temporary files')
fs.rm(ts_in_dir, recursive=True)
logger.info('Removing results_job*.json.gz')
for filename in fs.glob(results_job_json_glob):
Contributor:

@nmerket In the past this function would run even if there was an uncaught error in the metadata results aggregation, leading to the entire analysis having to be rerun. Is it possible still for this code to be reached if that dask cluster job doesn't complete successfully?

Member Author:

It's possible. I didn't do anything in this PR to mitigate that possibility. In general, this should run only after the results.csv aggregation is complete. Is there a way you could provide a minimum verifiable example of this issue?

Member Author:

And the scope of this PR is to leave the timeseries files on disk, not the results jsons.

fs.rm(filename)
if not keep_individual_timeseries:
ts_in_dir = f'{sim_output_dir}/timeseries'
fs.rm(ts_in_dir, recursive=True)


def upload_results(aws_conf, output_dir, results_dir):
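As an aside for reviewers, a minimal sketch of exercising the updated helper outside of ``process_results()``. The filesystem object and results path below are illustrative, not taken from this PR; real callers pass in whatever fsspec filesystem the run used (local disk or S3).

```python
# Minimal usage sketch (not part of this PR): calling the updated helper with a
# local fsspec filesystem. The results path is hypothetical.
from fsspec.implementations.local import LocalFileSystem

from buildstockbatch import postprocessing

fs = LocalFileSystem()
results_dir = '/projects/my_project/results'  # hypothetical path

# With keep_individual_timeseries=True the per-building timeseries parquet files
# are left in place, while the results_job*.json.gz intermediates are removed.
postprocessing.remove_intermediate_files(fs, results_dir, keep_individual_timeseries=True)
```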
3 changes: 1 addition & 2 deletions buildstockbatch/schemas/v0.3.yaml
@@ -48,7 +48,6 @@ hpc-postprocessing-spec:
n_workers: int(required=False)
node_memory_mb: enum(85248, 180224, 751616, required=False)
parquet_memory_mb: int(required=False)
keep_intermediate_files: bool(required=False)

sampler-spec:
type: str(required=True)
@@ -106,7 +105,7 @@ cost-spec:

postprocessing-spec:
aws: include('aws-postprocessing-spec', required=False)
aggregate_timeseries: bool(required=False)
keep_individual_timeseries: bool(required=False)

aws-postprocessing-spec:
region_name: str(required=False)
14 changes: 2 additions & 12 deletions buildstockbatch/test/test_base.py
@@ -62,12 +62,7 @@ def test_combine_files_flexible(basic_residential_project_file, mocker):
# test_results/results_csvs need to be updated with new data *if* columns were indeed supposed to be added/
# removed/renamed.

post_process_config = {
'postprocessing': {
'aggregate_timeseries': True
}
}
project_filename, results_dir = basic_residential_project_file(post_process_config)
project_filename, results_dir = basic_residential_project_file()

mocker.patch.object(BuildStockBatchBase, 'weather_dir', None)
get_dask_client_mock = mocker.patch.object(BuildStockBatchBase, 'get_dask_client')
@@ -178,12 +173,7 @@ def test_downselect_integer_options(basic_residential_project_file, mocker):

def test_combine_files(basic_residential_project_file):

post_process_config = {
'postprocessing': {
'aggregate_timeseries': True
}
}
project_filename, results_dir = basic_residential_project_file(post_process_config)
project_filename, results_dir = basic_residential_project_file()

with patch.object(BuildStockBatchBase, 'weather_dir', None), \
patch.object(BuildStockBatchBase, 'get_dask_client') as get_dask_client_mock, \
29 changes: 23 additions & 6 deletions buildstockbatch/test/test_postprocessing.py
@@ -80,16 +80,33 @@ def test_large_parquet_combine(basic_residential_project_file):
# Test a simulated scenario where the individual timeseries parquet are larger than the max memory per partition
# allocated for the parquet file combining.

post_process_config = {
'postprocessing': {
'aggregate_timeseries': True
}
}
project_filename, results_dir = basic_residential_project_file(post_process_config)
project_filename, results_dir = basic_residential_project_file()

with patch.object(BuildStockBatchBase, 'weather_dir', None), \
patch.object(BuildStockBatchBase, 'get_dask_client'), \
patch.object(BuildStockBatchBase, 'results_dir', results_dir),\
patch.object(postprocessing, 'MAX_PARQUET_MEMORY', 1e6): # set the max memory to just 1MB
bsb = BuildStockBatchBase(project_filename)
bsb.process_results() # this would raise exception if the postprocessing could not handle the situation


@pytest.mark.parametrize('keep_individual_timeseries', [True, False])
def test_keep_individual_timeseries(keep_individual_timeseries, basic_residential_project_file, mocker):
project_filename, results_dir = basic_residential_project_file({
'postprocessing': {
'keep_individual_timeseries': keep_individual_timeseries
}
})

mocker.patch.object(BuildStockBatchBase, 'weather_dir', None)
mocker.patch.object(BuildStockBatchBase, 'get_dask_client')
mocker.patch.object(BuildStockBatchBase, 'results_dir', results_dir)
bsb = BuildStockBatchBase(project_filename)
bsb.process_results()

results_path = pathlib.Path(results_dir)
simout_path = results_path / 'simulation_output'
assert len(list(simout_path.glob('results_job*.json.gz'))) == 0

ts_path = simout_path / 'timeseries'
assert ts_path.exists() == keep_individual_timeseries
10 changes: 10 additions & 0 deletions docs/changelog/changelog_dev.rst
@@ -82,3 +82,13 @@ Development Changelog
:tickets:

Fix for create_eagle_env.sh not creating environment.

.. change::
:tags: postprocessing
:pullreq: 228
:tickets: 182

Moves the ``eagle.postprocessing.keep_intermediate_files`` to
``postprocessing.keep_individual_timeseries`` and changes behavior to
keep only the timeseries parquet files. Also, removes the deprecated
``aggregate_timeseries`` key as that aggregation always happens.
88 changes: 57 additions & 31 deletions docs/changelog/migration_0_20.rst
@@ -169,37 +169,6 @@ New Spec:
reporting_frequency: Hourly
include_enduse_subcategories: true

Commercial Workflow Generator Hard-Coded Measures
--------------------------------------------------

The commercial workflow generator has changed to remove most of the hard-coded
reporting measures, allowing them to be added to the config file as-needed.
This should avoid the need to create custom BuildStockBatch environments
for each project that needs to add/remove/modify reporting measures.

Old hard-coded reporting measures:

- SimulationOutputReport
- OpenStudio Results (measure_dir_name: f8e23017-894d-4bdf-977f-37e3961e6f42)
- TimeseriesCSVExport
- comstock_sensitivity_reports
- qoi_report
- la_100_qaqc (if include_qaqc = true in config)
- simulation_settings_check (if include_qaqc = true in config)

New hard-coded reporting measures:

- SimulationOutputReport (reports annual totals in results.csv)
- TimeseriesCSVExport (generates timeseries results at Timestep frequency)

Two other hard-coded model measures were removed from the workflow. These will
be added to the workflow via the options-lookup.tsv in ComStock instead.

Removed hard-coded model measures:

- add_blinds_to_selected_windows
- set_space_type_load_subcategories

Reporting Measures in Workflows
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

@@ -231,6 +200,37 @@ New Spec:
- measure_dir_name: ReportingMeasure2


Commercial Workflow Generator Hard-Coded Measures
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

The commercial workflow generator has changed to remove most of the hard-coded
reporting measures, allowing them to be added to the config file as-needed.
This should avoid the need to create custom BuildStockBatch environments
for each project that needs to add/remove/modify reporting measures.

Old hard-coded reporting measures:

- ``SimulationOutputReport``
- OpenStudio Results (measure_dir_name: ``f8e23017-894d-4bdf-977f-37e3961e6f42``)
- ``TimeseriesCSVExport``
- ``comstock_sensitivity_reports``
- ``qoi_report``
- ``la_100_qaqc`` (if include_qaqc = true in config)
- ``simulation_settings_check`` (if include_qaqc = true in config)

New hard-coded reporting measures:

- ``SimulationOutputReport`` (reports annual totals in results.csv)
- ``TimeseriesCSVExport`` (generates timeseries results at Timestep frequency)

Two other hard-coded model measures were removed from the workflow. These will
be added to the workflow via the options-lookup.tsv in ComStock instead.

Removed hard-coded model measures:

- ``add_blinds_to_selected_windows``
- ``set_space_type_load_subcategories``

AWS EMR Configuration Name Changes
----------------------------------

@@ -246,3 +246,29 @@ renamed the following keys under ``aws.emr``:
+----------------------+-----------------------+
| slave_instance_count | worker_instance_count |
+----------------------+-----------------------+


Keep Individual Timeseries
--------------------------

For some applications it is helpful to keep the timeseries parquet files for
each simulation. Normally, they are aggregated into fewer, larger files. There
was a key introduced in v0.19.1 that enabled this. We moved it to a new
place in the config file.

Old Spec:

.. code-block:: yaml

schema_version: 0.2
eagle:
postprocessing:
keep_intermediate_files: true # default false if omitted

New Spec:

.. code-block:: yaml

schema_version: '0.3'
postprocessing:
keep_individual_timeseries: true # default false if omitted
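
When the option is enabled, the per-building parquet files remain under
``simulation_output/timeseries`` in the results directory (this is what the new
test in this PR checks). A minimal sketch, with a hypothetical results path, of
reading them afterwards:

.. code-block:: python

    # Illustrative only: reading the per-building files that remain on disk when
    # keep_individual_timeseries is true. The example results path is hypothetical.
    import pathlib

    import pandas as pd

    results_dir = pathlib.Path('/projects/my_project/results')  # hypothetical
    ts_dir = results_dir / 'simulation_output' / 'timeseries'

    for parquet_file in sorted(ts_dir.rglob('*.parquet')):
        df = pd.read_parquet(parquet_file)
        print(parquet_file.name, df.shape)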
16 changes: 9 additions & 7 deletions docs/project_defn.rst
@@ -148,9 +148,6 @@ the Eagle supercomputer.
* ``node_memory_mb``: The memory (in MB) to request for eagle node for postprocessing. The valid values are
85248, 180224 and 751616. Default is 85248.
* ``parquet_memory_mb``: The size (in MB) of the combined parquet file in memory. Default is 40000.
* ``keep_intermediate_files``: Set this to true if you want to keep postprocessing intermediate files (for debugging
or other explorative purpose). The intermediate files contain results_job*.json.gz
files and individual building's timeseries parquet files. Default is false.

.. _aws-config:

@@ -222,10 +219,11 @@ follows:
fewer larger parquet files that are better suited for querying using big data
analysis tools.

For ResStock runs with the ResidentialScheduleGenerator, the generated schedules
are horizontally concatenated with the time series files before aggregation,
making sure the schedule values are properly lined up with the timestamps in the
`same way that EnergyPlus handles ScheduleFiles
<https://github.com/NREL/resstock/issues/469#issuecomment-697849076>`_.

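As a rough illustration (not the actual buildstockbatch postprocessing code), the
horizontal concatenation described above might look something like the sketch
below. The file names are hypothetical, and aligning purely by row position is a
simplification of the EnergyPlus ScheduleFile timestamp handling referenced in the
linked issue.

.. code-block:: python

    # Conceptual sketch of pairing a generated schedule with a building's
    # timeseries output so schedule values line up with the timeseries rows.
    import pandas as pd

    ts = pd.read_parquet('bldg0000001_timeseries.parquet')  # hypothetical file
    sched = pd.read_csv('bldg0000001_schedules.csv')         # hypothetical file

    # Assumes both tables cover the same simulation period at the same reporting
    # frequency, so each schedule row pairs with the matching timeseries row.
    sched.index = ts.index
    combined = pd.concat([ts, sched], axis=1)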

Uploading to AWS Athena
@@ -255,6 +253,10 @@ The configuration options for postprocessing and AWS upload are:

* ``postprocessing``: postprocessing configuration

* ``keep_individual_timeseries``: For some use cases it is useful to keep
the timeseries output for each simulation as a separate parquet file.
Setting this option to ``true`` allows that. Default is ``false``.

* ``aws``: configuration related to uploading to and managing data in amazon web services. For this to work, please
`configure aws. <https://boto3.amazonaws.com/v1/documentation/api/latest/guide/quickstart.html#configuration>`_
Including this key will cause your datasets to be uploaded to AWS, omitting it will cause them not to be uploaded.
1 change: 0 additions & 1 deletion project_resstock_national.yml
@@ -59,7 +59,6 @@ aws:
notifications_email: user@nrel.gov

postprocessing:
aggregate_timeseries: true
aws:
region_name: 'us-west-2'
s3: