Merge patch #213

Merged (16 commits, Mar 23, 2021)
6 changes: 5 additions & 1 deletion buildstockbatch/base.py
@@ -551,4 +551,8 @@ def process_results(self, skip_combine=False, force_upload=False):
if 'athena' in aws_conf:
postprocessing.create_athena_tables(aws_conf, os.path.basename(self.output_dir), s3_bucket, s3_prefix)

postprocessing.remove_intermediate_files(fs, self.results_dir)
if not self.cfg['eagle'].get('postprocessing', {}).get('keep_intermediate_files', False):
logger.info("Removing intermediate files.")
postprocessing.remove_intermediate_files(fs, self.results_dir)
else:
logger.info("Skipped removing intermediate files.")
9 changes: 5 additions & 4 deletions buildstockbatch/eagle.py
@@ -451,7 +451,8 @@ def queue_post_processing(self, after_jobids=[], upload_only=False, hipri=False)
# Configuration values
account = self.cfg['eagle']['account']
walltime = self.cfg['eagle'].get('postprocessing', {}).get('time', '1:30:00')

memory = self.cfg['eagle'].get('postprocessing', {}).get('node_memory_mb', 85248)
print(f"Submitting job to {memory}MB memory nodes.")
# Throw an error if the files already exist.
if not upload_only:
for subdir in ('parquet', 'results_csvs'):
@@ -475,19 +476,20 @@ def queue_post_processing(self, after_jobids=[], upload_only=False, hipri=False)
env['MY_CONDA_ENV'] = os.environ['CONDA_PREFIX']
env['OUT_DIR'] = self.output_dir
env['UPLOADONLY'] = str(upload_only)
env['MEMORY'] = str(memory)
here = os.path.dirname(os.path.abspath(__file__))
eagle_post_sh = os.path.join(here, 'eagle_postprocessing.sh')

args = [
'sbatch',
'--account={}'.format(account),
'--time={}'.format(walltime),
'--export=PROJECTFILE,MY_CONDA_ENV,OUT_DIR,UPLOADONLY',
'--export=PROJECTFILE,MY_CONDA_ENV,OUT_DIR,UPLOADONLY,MEMORY',
'--job-name=bstkpost',
'--output=postprocessing.out',
'--nodes=1',
':',
'--mem=180000',
'--mem={}'.format(memory),
'--output=dask_workers.out',
'--nodes={}'.format(self.cfg['eagle'].get('postprocessing', {}).get('n_workers', 2)),
eagle_post_sh
@@ -556,7 +558,6 @@ def user_cli(argv=sys.argv[1:]):
'''
This is the user entry point for running buildstockbatch on Eagle
'''

# set up logging, currently based on within-this-file hard-coded config
logging.config.dictConfig(logging_config)

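The ':' in the argument list above splits the sbatch call into a heterogeneous job: component 0 is the single driver node that runs the postprocessing entry point, and component 1 is the group of dask worker nodes, which now receives the configurable --mem value and has MEMORY forwarded through --export. A condensed, hypothetical sketch of that submission pattern (account, walltime, and the rest of the environment plumbing omitted; not the literal queue_post_processing code):

import os
import subprocess

memory = 85248    # eagle.postprocessing.node_memory_mb, default from the schema
n_workers = 2     # eagle.postprocessing.n_workers default

env = dict(os.environ)
env['MEMORY'] = str(memory)   # --export only forwards variables already set in the environment

args = [
    'sbatch',
    '--export=PROJECTFILE,MY_CONDA_ENV,OUT_DIR,UPLOADONLY,MEMORY',
    '--job-name=bstkpost',
    '--output=postprocessing.out',
    '--nodes=1',
    ':',                               # heterogeneous-job separator
    '--mem={}'.format(memory),         # worker component gets the configured node memory
    '--output=dask_workers.out',
    '--nodes={}'.format(n_workers),
    'eagle_postprocessing.sh',
]
subprocess.run(args, env=env, check=True)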
4 changes: 2 additions & 2 deletions buildstockbatch/eagle_postprocessing.sh
@@ -1,5 +1,4 @@
#!/bin/bash

echo "begin eagle_postprocessing.sh"

echo "Job ID: $SLURM_JOB_ID"
@@ -11,6 +10,7 @@ source activate "$MY_CONDA_ENV"
export POSTPROCESS=1

echo "UPLOADONLY: ${UPLOADONLY}"
echo "MEMORY: ${MEMORY}"

SCHEDULER_FILE=$OUT_DIR/dask_scheduler.json

@@ -22,6 +22,6 @@ echo $SLURM_JOB_NODELIST_PACK_GROUP_1
pdsh -w $SLURM_JOB_NODELIST_PACK_GROUP_1 "free -h"

$MY_CONDA_ENV/bin/dask-scheduler --scheduler-file $SCHEDULER_FILE &> $OUT_DIR/dask_scheduler.out &
pdsh -w $SLURM_JOB_NODELIST_PACK_GROUP_1 "$MY_CONDA_ENV/bin/dask-worker --scheduler-file $SCHEDULER_FILE --local-directory /tmp/scratch/dask --nprocs 9" &> $OUT_DIR/dask_workers.out &
pdsh -w $SLURM_JOB_NODELIST_PACK_GROUP_1 "$MY_CONDA_ENV/bin/dask-worker --scheduler-file $SCHEDULER_FILE --local-directory /tmp/scratch/dask --nprocs 1 --nthreads 1 --memory-limit ${MEMORY}MB" &> $OUT_DIR/dask_workers.out &

time python -u -m buildstockbatch.eagle "$PROJECTFILE"
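The worker line above now starts one single-threaded dask worker per node with an explicit --memory-limit, instead of nine worker processes per node sharing a hard-coded allocation. For reference, a minimal sketch of how a Python step can attach to this cluster through the scheduler file written by dask-scheduler (the path is illustrative, not the literal buildstockbatch.eagle code):

from dask.distributed import Client

# Connect to the running scheduler via the JSON file it wrote to $OUT_DIR.
client = Client(scheduler_file='/scratch/out_dir/dask_scheduler.json')
print(client)   # should report n_workers workers, each with 1 thread and a ${MEMORY} MB limit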
57 changes: 35 additions & 22 deletions buildstockbatch/postprocessing.py
@@ -9,10 +9,9 @@
:copyright: (c) 2018 by The Alliance for Sustainable Energy
:license: BSD-3
"""

import boto3
import dask.bag as db
import dask.dataframe as dd
from dask.distributed import performance_report
import dask
import datetime as dt
from fsspec.implementations.local import LocalFileSystem
@@ -34,7 +33,7 @@

logger = logging.getLogger(__name__)

MAX_PARQUET_MEMORY = 1e9 # maximum size of the parquet file in memory when combining multiple parquets
MAX_PARQUET_MEMORY = 4000 # maximum size (MB) of the parquet file in memory when combining multiple parquets


def read_data_point_out_json(fs, reporting_measures, filename):
@@ -227,8 +226,12 @@ def read_enduse_timeseries_parquet(fs, filename, all_cols):
return df[all_cols]


def read_and_concat_enduse_timeseries_parquet(fs, filenames, all_cols):
return pd.concat(read_enduse_timeseries_parquet(fs, filename, all_cols) for filename in filenames)
def read_and_concat_enduse_timeseries_parquet(fs, all_cols, output_dir, filenames, group_id):
dfs = [read_enduse_timeseries_parquet(fs, filename, all_cols) for filename in filenames]
grouped_df = pd.concat(dfs)
grouped_df.set_index('building_id', inplace=True)
grouped_df.to_parquet(output_dir + f'group{group_id}.parquet')
del grouped_df


def combine_results(fs, results_dir, cfg, do_timeseries=True):
@@ -257,6 +260,7 @@ def combine_results(fs, results_dir, cfg, do_timeseries=True):
fs.makedirs(dr)

# Results "CSV"
logger.info("Creating results_df.")
results_job_json_glob = f'{sim_output_dir}/results_job*.json.gz'
results_jsons = fs.glob(results_job_json_glob)
results_json_job_ids = [int(re.search(r'results_job(\d+)\.json\.gz', x).group(1)) for x in results_jsons]
@@ -277,6 +281,7 @@
if do_timeseries:

# Look at all the parquet files to see what columns are in all of them.
logger.info("Collecting all the columns in timeseries parquet files.")
ts_filenames = fs.glob(f'{ts_in_dir}/up*/bldg*.parquet')
all_ts_cols = db.from_sequence(ts_filenames, partition_size=100).map(partial(get_cols, fs)).\
fold(lambda x, y: set(x).union(y)).compute()
@@ -325,37 +330,45 @@
# Get the names of the timseries file for each simulation in this upgrade
ts_filenames = fs.glob(f'{ts_in_dir}/up{upgrade_id:02d}/bldg*.parquet')

if not ts_filenames:
logger.info(f"There are no timeseries files for upgrade{upgrade_id}.")
continue

# Calculate the mean and estimate the total memory usage
read_ts_parquet = partial(read_enduse_timeseries_parquet, fs, all_cols=all_ts_cols_sorted)
get_ts_mem_usage_d = dask.delayed(lambda x: read_ts_parquet(x).memory_usage(deep=True).sum())
sample_size = min(len(ts_filenames), 36 * 3)
mean_mem = np.mean(dask.compute(map(get_ts_mem_usage_d, random.sample(ts_filenames, sample_size)))[0])
total_mem = mean_mem * len(ts_filenames)
total_mem = mean_mem * len(ts_filenames) / 1e6 # total_mem in MB

# Determine how many files should be in each partition and group the files
npartitions = math.ceil(total_mem / MAX_PARQUET_MEMORY) # 1 GB per partition
parquet_memory = int(cfg['eagle'].get('postprocessing', {}).get('parquet_memory_mb', MAX_PARQUET_MEMORY))
logger.info(f"Max parquet memory: {parquet_memory} MB")
npartitions = math.ceil(total_mem / parquet_memory)
npartitions = min(len(ts_filenames), npartitions) # cannot have less than one file per partition
ts_files_in_each_partition = np.array_split(ts_filenames, npartitions)

# Read the timeseries into a dask dataframe
read_and_concat_ts_pq_d = dask.delayed(
partial(read_and_concat_enduse_timeseries_parquet, fs, all_cols=all_ts_cols_sorted)
)
ts_df = dd.from_delayed(map(read_and_concat_ts_pq_d, ts_files_in_each_partition))
ts_df = ts_df.set_index('building_id', sorted=True)

# Write out new dask timeseries dataframe.
logger.info(f"Combining about {len(ts_files_in_each_partition[0])} parquets together."
f" Creating {npartitions} groups.")
if isinstance(fs, LocalFileSystem):
ts_out_loc = f"{ts_dir}/upgrade={upgrade_id}"
ts_out_loc = f"{ts_dir}/upgrade={upgrade_id}/"
else:
assert isinstance(fs, S3FileSystem)
ts_out_loc = f"s3://{ts_dir}/upgrade={upgrade_id}"
logger.info(f'Writing {ts_out_loc}')
ts_df.to_parquet(
ts_out_loc,
engine='pyarrow',
flavor='spark'
ts_out_loc = f"s3://{ts_dir}/upgrade={upgrade_id}/"

fs.makedirs(ts_out_loc)
logger.info(f'Created directory {ts_out_loc} for writing.')

# Read the timeseries into a dask dataframe
read_and_concat_ts_pq_d = dask.delayed(
# fs, all_cols, output_dir, filenames, group_id
partial(read_and_concat_enduse_timeseries_parquet, fs, all_ts_cols_sorted, ts_out_loc)
)
group_ids = list(range(npartitions))
with performance_report(filename=f'dask_combine_report{upgrade_id}.html'):
dask.compute(map(read_and_concat_ts_pq_d, ts_files_in_each_partition, group_ids))

logger.info(f"Finished combining and saving timeseries for upgrade{upgrade_id}.")


def remove_intermediate_files(fs, results_dir):
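Taken together, the rewritten combine step replaces the dd.from_delayed / set_index / to_parquet dataframe pipeline with a plain map of delayed tasks, each of which concatenates one group of per-building files and writes a single group{id}.parquet. A condensed sketch of the pattern, assuming fs, all_ts_cols_sorted, ts_filenames, total_mem, and ts_out_loc are already defined as in combine_results():

import math
from functools import partial

import dask
import numpy as np
from dask.distributed import performance_report

parquet_memory = 4000    # MB; MAX_PARQUET_MEMORY unless eagle.postprocessing.parquet_memory_mb overrides it
npartitions = min(len(ts_filenames), math.ceil(total_mem / parquet_memory))
groups = np.array_split(ts_filenames, npartitions)

combine = dask.delayed(
    partial(read_and_concat_enduse_timeseries_parquet, fs, all_ts_cols_sorted, ts_out_loc)
)
with performance_report(filename='dask_combine_report.html'):
    # One task per group; each writes its own parquet file instead of feeding a big dask dataframe.
    dask.compute([combine(filenames, group_id) for group_id, filenames in enumerate(groups)])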
3 changes: 3 additions & 0 deletions buildstockbatch/schemas/v0.2.yaml
@@ -50,6 +50,9 @@ hpc-spec:
hpc-postprocessing-spec:
time: int(required=True)
n_workers: int(required=False)
node_memory_mb: enum(85248, 180224, 751616, required=False)
parquet_memory_mb: int(required=False)
keep_intermediate_files: bool(required=False)

sampling-spec:
time: int(required=True)
3 changes: 3 additions & 0 deletions buildstockbatch/schemas/v0.3.yaml
@@ -46,6 +46,9 @@ hpc-spec:
hpc-postprocessing-spec:
time: int(required=True)
n_workers: int(required=False)
node_memory_mb: enum(85248, 180224, 751616, required=False)
parquet_memory_mb: int(required=False)
keep_intermediate_files: bool(required=False)

sampler-spec:
type: str(required=True)
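Both schema versions gain the same three optional keys, with node_memory_mb restricted to the three Eagle node sizes. A hypothetical standalone check of that constraint with yamale (the content= keyword and exact calls are an assumption, not taken from this PR):

import yamale

# Assumed yamale API (make_schema/make_data with content=); adjust to the installed version.
schema = yamale.make_schema(content="""
node_memory_mb: enum(85248, 180224, 751616, required=False)
parquet_memory_mb: int(required=False)
keep_intermediate_files: bool(required=False)
""")
data = yamale.make_data(content="node_memory_mb: 180224\nkeep_intermediate_files: true\n")
yamale.validate(schema, data)   # would raise for an unsupported value such as node_memory_mb: 123456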
11 changes: 7 additions & 4 deletions buildstockbatch/test/test_base.py
@@ -15,6 +15,7 @@
from unittest.mock import patch, MagicMock, PropertyMock
import yaml

import buildstockbatch
from buildstockbatch.base import BuildStockBatchBase
from buildstockbatch.exc import ValidationError
from buildstockbatch.postprocessing import write_dataframe_as_parquet
@@ -25,6 +26,8 @@

OUTPUT_FOLDER_NAME = 'output'

buildstockbatch.postprocessing.performance_report = MagicMock()


def test_reference_scenario(basic_residential_project_file):
# verify that the reference_scenario get's added to the upgrade file
@@ -316,13 +319,13 @@ def test_upload_files(mocked_s3, basic_residential_project_file):
assert (source_file_path, s3_file_path) in files_uploaded
files_uploaded.remove((source_file_path, s3_file_path))

s3_file_path = s3_path + 'timeseries/upgrade=0/part.0.parquet'
source_file_path = os.path.join(source_path, 'timeseries', 'upgrade=0', 'part.0.parquet')
s3_file_path = s3_path + 'timeseries/upgrade=0/group0.parquet'
source_file_path = os.path.join(source_path, 'timeseries', 'upgrade=0', 'group0.parquet')
assert (source_file_path, s3_file_path) in files_uploaded
files_uploaded.remove((source_file_path, s3_file_path))

s3_file_path = s3_path + 'timeseries/upgrade=1/part.0.parquet'
source_file_path = os.path.join(source_path, 'timeseries', 'upgrade=1', 'part.0.parquet')
s3_file_path = s3_path + 'timeseries/upgrade=1/group0.parquet'
source_file_path = os.path.join(source_path, 'timeseries', 'upgrade=1', 'group0.parquet')
assert (source_file_path, s3_file_path) in files_uploaded
files_uploaded.remove((source_file_path, s3_file_path))

4 changes: 3 additions & 1 deletion buildstockbatch/test/test_postprocessing.py
@@ -7,11 +7,13 @@
import tarfile
import pytest
import shutil
from unittest.mock import patch, MagicMock

from buildstockbatch import postprocessing
from buildstockbatch.base import BuildStockBatchBase
from buildstockbatch.utils import get_project_configuration
from unittest.mock import patch

postprocessing.performance_report = MagicMock()


def test_report_additional_results_csv_columns(basic_residential_project_file):
15 changes: 15 additions & 0 deletions docs/changelog/changelog_0_19_1.rst
@@ -0,0 +1,15 @@
================
0.19.1 Changelog
================

.. changelog::
:version: 0.19.1
:released: March 22, 2021

.. change::
:tags: postprocessing
:pullreq: 212
:tickets:

Use a map of dask delayed functions to combine parquets instead of one giant dask dataframe, to avoid memory issues.
Default to 85 GB memory nodes on Eagle, with a single dask worker process and a single thread per node, also to avoid memory issues.
8 changes: 8 additions & 0 deletions docs/changelog/changelog_dev.rst
@@ -43,3 +43,11 @@ Development Changelog
:tickets: 196

Fixing issue where the postprocessing fails when a building simulation crashes in buildstockbatch.

.. change::
:tags: postprocessing
:pullreq: 212
:tickets:

Use a map of dask delayed functions to combine parquets instead of one giant dask dataframe, to avoid memory issues.
Default to 85 GB memory nodes on Eagle, with a single dask worker process and a single thread per node, also to avoid memory issues.
1 change: 1 addition & 0 deletions docs/changelog/index.rst
@@ -19,6 +19,7 @@ Change logs
.. toctree::
:titlesonly:

changelog_0_19_1
changelog_0_19
changelog_0_18

8 changes: 7 additions & 1 deletion docs/project_defn.rst
@@ -144,7 +144,13 @@ the Eagle supercomputer.
* ``postprocessing``: Eagle configuration for the postprocessing step

* ``time``: Maximum time in minutes to allocate postprocessing job
* ``n_workers``: Number of eagle workers to parallelize the postprocessing job into
* ``n_workers``: Number of Eagle worker nodes to parallelize the postprocessing job across. The maximum supported is 32.
* ``node_memory_mb``: The memory (in MB) to request for the Eagle nodes used for postprocessing. The valid values
  are 85248, 180224, and 751616. The default is 85248.
* ``parquet_memory_mb``: The approximate maximum size (in MB) of each combined parquet file in memory. The default is 4000.
* ``keep_intermediate_files``: Set this to true to keep the postprocessing intermediate files (for debugging or
  other exploratory purposes). The intermediate files include the results_job*.json.gz files and the individual
  buildings' timeseries parquet files. The default is false.

.. _aws-config:

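To tie these options back to the code changes above, a minimal sketch of how the parsed project file surfaces them (the cfg dict mirrors what the YAML loader produces; the values shown are illustrative defaults, not taken verbatim from this PR):

# Parsed equivalent of the project file's eagle section (illustrative).
cfg = {
    'eagle': {
        'postprocessing': {
            'n_workers': 2,
            'node_memory_mb': 85248,            # one of 85248, 180224, 751616
            'parquet_memory_mb': 4000,
            'keep_intermediate_files': False,
        }
    }
}

pp = cfg['eagle'].get('postprocessing', {})
memory = pp.get('node_memory_mb', 85248)             # sbatch --mem for the dask worker nodes
n_workers = pp.get('n_workers', 2)                   # number of dask worker nodes
parquet_memory = pp.get('parquet_memory_mb', 4000)   # partition-size target in combine_results
keep = pp.get('keep_intermediate_files', False)      # gate in process_results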