Skip upgrades without ts #230

Merged 4 commits on May 4, 2021
5 changes: 5 additions & 0 deletions buildstockbatch/localdocker.py
@@ -11,6 +11,7 @@
"""

import argparse
from dask.distributed import Client, LocalCluster
import docker
import functools
from fsspec.implementations.local import LocalFileSystem
@@ -217,6 +218,10 @@ def results_dir(self):
            os.makedirs(results_dir)
        return results_dir

    def get_dask_client(self):
        cluster = LocalCluster(local_directory=os.path.join(self.results_dir, 'dask-tmp'))
        return Client(cluster)


@log_error_details()
def main():
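A note on the new helper: get_dask_client() starts a local dask cluster whose scratch space lives under the run's results directory. Below is a minimal standalone sketch of the same pattern outside of buildstockbatch; the results_dir path here is a placeholder, not a buildstockbatch default.

import os

from dask.distributed import Client, LocalCluster

if __name__ == '__main__':
    # Placeholder results directory for illustration; the real helper uses self.results_dir.
    results_dir = os.path.abspath('localResults')
    os.makedirs(os.path.join(results_dir, 'dask-tmp'), exist_ok=True)

    # Keep dask worker spill/temp files next to the run's results, mirroring get_dask_client().
    cluster = LocalCluster(local_directory=os.path.join(results_dir, 'dask-tmp'))
    client = Client(cluster)
    print(client)

    client.close()
    cluster.close()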
11 changes: 8 additions & 3 deletions buildstockbatch/postprocessing.py
@@ -29,6 +29,7 @@
import random
import re
from s3fs import S3FileSystem
import tempfile
import time

logger = logging.getLogger(__name__)
@@ -325,7 +326,7 @@ def combine_results(fs, results_dir, cfg, do_timeseries=True):
        ts_filenames = fs.glob(f'{ts_in_dir}/up{upgrade_id:02d}/bldg*.parquet')

        if not ts_filenames:
            logger.info(f"There are no timeseries files for upgrade{upgrade_id}.")
            logger.warning(f"There are no timeseries files for upgrade{upgrade_id}.")
            continue

        # Calculate the mean and estimate the total memory usage
@@ -359,8 +360,12 @@ def combine_results(fs, results_dir, cfg, do_timeseries=True):
            partial(read_and_concat_enduse_timeseries_parquet, fs, all_ts_cols_sorted, ts_out_loc)
        )
        group_ids = list(range(npartitions))
        with performance_report(filename=f'dask_combine_report{upgrade_id}.html'):
            dask.compute(map(read_and_concat_ts_pq_d, ts_files_in_each_partition, group_ids))
        with tempfile.TemporaryDirectory() as tmpdir:
            tmpfilepath = Path(tmpdir, 'dask-report.html')
            with performance_report(filename=str(tmpfilepath)):
                dask.compute(map(read_and_concat_ts_pq_d, ts_files_in_each_partition, group_ids))
            if tmpfilepath.exists():
                fs.put_file(str(tmpfilepath), f'{results_dir}/dask_combine_report{upgrade_id}.html')

        logger.info(f"Finished combining and saving timeseries for upgrade{upgrade_id}.")

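The postprocessing change above writes the dask performance report to a local temporary file and then copies it into the results location through the fs abstraction, so the report lands next to the other outputs even when the results filesystem is remote (postprocessing already imports S3FileSystem). A self-contained sketch of that pattern follows; the workload is a throwaway dask array computation, and the destination filesystem and paths are placeholders.

import tempfile
from pathlib import Path

import dask.array as da
from dask.distributed import Client, performance_report
from fsspec.implementations.local import LocalFileSystem

if __name__ == '__main__':
    client = Client()  # performance_report requires an active distributed client
    fs = LocalFileSystem()  # stand-in for the run's results filesystem (could be S3FileSystem)
    results_dir = tempfile.mkdtemp()  # stand-in for the real results_dir

    with tempfile.TemporaryDirectory() as tmpdir:
        tmpfilepath = Path(tmpdir, 'dask-report.html')
        with performance_report(filename=str(tmpfilepath)):
            da.random.random((2000, 2000)).mean().compute()  # placeholder workload
        # Only copy the report if dask actually wrote it.
        if tmpfilepath.exists():
            fs.put_file(str(tmpfilepath), f'{results_dir}/dask_combine_report.html')

    client.close()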
22 changes: 22 additions & 0 deletions buildstockbatch/test/test_postprocessing.py
@@ -1,6 +1,8 @@
from fsspec.implementations.local import LocalFileSystem
import gzip
import json
import logging
import os
import pandas as pd
import pathlib
import re
@@ -110,3 +112,23 @@ def test_keep_individual_timeseries(keep_individual_timeseries, basic_residentia

    ts_path = simout_path / 'timeseries'
    assert ts_path.exists() == keep_individual_timeseries


def test_upgrade_missing_ts(basic_residential_project_file, mocker, caplog):
    caplog.set_level(logging.WARNING, logger='buildstockbatch.postprocessing')

    project_filename, results_dir = basic_residential_project_file()
    results_path = pathlib.Path(results_dir)
    for filename in (results_path / 'simulation_output' / 'timeseries' / 'up01').glob('*.parquet'):
        os.remove(filename)

    mocker.patch.object(BuildStockBatchBase, 'weather_dir', None)
    mocker.patch.object(BuildStockBatchBase, 'get_dask_client')
    mocker.patch.object(BuildStockBatchBase, 'results_dir', results_dir)
    bsb = BuildStockBatchBase(project_filename)
    bsb.process_results()

    assert len(caplog.records) == 1
    record = caplog.records[0]
    assert record.levelname == 'WARNING'
    assert record.message == 'There are no timeseries files for upgrade1.'
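The new test relies on two pytest fixtures: mocker (from pytest-mock) to stub out the parts of BuildStockBatchBase that aren't under test, and caplog to capture log records. For readers unfamiliar with caplog, here is a minimal self-contained example of the same capture-and-assert pattern; the logger name and helper function are made up for illustration.

import logging

logger = logging.getLogger('example.module')


def emit_warning():
    logger.warning('There are no timeseries files for upgrade1.')


def test_emit_warning(caplog):
    # Capture WARNING and above from the logger under test only.
    caplog.set_level(logging.WARNING, logger='example.module')
    emit_warning()
    assert len(caplog.records) == 1
    assert caplog.records[0].levelname == 'WARNING'
    assert caplog.records[0].message == 'There are no timeseries files for upgrade1.'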
10 changes: 10 additions & 0 deletions docs/changelog/changelog_dev.rst
@@ -100,3 +100,13 @@ Development Changelog

        Modifies the docs to specify that the ``eagle.postprocessing.n_workers``
        key determines how many Eagle nodes are used for postprocessing and
        notes the default of 2.

    .. change::
        :tags: postprocessing, bugfix
        :pullreq: 230
        :tickets: 199

        Previously, postprocessing would fail if an upgrade scenario didn't
        have any timeseries simulation output. Now it skips that upgrade and
        logs a warning message. The fix itself landed earlier; this change
        adds tests for it.
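Condensed, the behavior this entry describes is the guard added to combine_results above: if an upgrade scenario has no timeseries parquet files, log a warning and move on instead of raising. A simplified sketch, assuming an fsspec-style filesystem object and the up{NN}/bldg*.parquet layout shown in the diff (the function name here is illustrative, not the real API):

import logging

logger = logging.getLogger(__name__)


def combine_upgrades(fs, ts_in_dir, upgrade_ids):
    for upgrade_id in upgrade_ids:
        ts_filenames = fs.glob(f'{ts_in_dir}/up{upgrade_id:02d}/bldg*.parquet')
        if not ts_filenames:
            # No timeseries output for this upgrade scenario: warn and skip it
            # rather than failing the whole postprocessing run.
            logger.warning(f"There are no timeseries files for upgrade{upgrade_id}.")
            continue
        ...  # combine and write this upgrade's timeseries as usual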