Merge pull request #212 from NREL/memory_fix
memory_issue_fix
nmerket committed Mar 22, 2021
2 parents 311fe8b + ec365a6 commit 840630d
Showing 10 changed files with 76 additions and 40 deletions.
4 changes: 1 addition & 3 deletions .circleci/config.yml
@@ -2,14 +2,13 @@ version: 2
jobs:
build:
docker:
- image: continuumio/miniconda3
- image: cimg/python:3.8
steps:
- setup_remote_docker
- checkout
- run:
name: Install buildstock
command: |
conda install -c conda-forge -y -q scipy numpy "pandas>=1.0.0,!=1.0.4" "pyarrow>=0.14.1" dask joblib pyyaml
pip install .[dev] --progress-bar off
- run:
name: Run PyTest
@@ -35,7 +34,6 @@ jobs:
name: Build documentation
when: always
command: |
apt-get install make
cd docs
make html
mkdir /tmp/docs
6 changes: 5 additions & 1 deletion buildstockbatch/base.py
@@ -784,4 +784,8 @@ def process_results(self, skip_combine=False, force_upload=False):
if 'athena' in aws_conf:
postprocessing.create_athena_tables(aws_conf, os.path.basename(self.output_dir), s3_bucket, s3_prefix)

postprocessing.remove_intermediate_files(fs, self.results_dir)
if not self.cfg['eagle'].get('postprocessing', {}).get('keep_intermediate_files', False):
logger.info("Removing intermediate files.")
postprocessing.remove_intermediate_files(fs, self.results_dir)
else:
logger.info("Skipped removing intermediate files.")
9 changes: 5 additions & 4 deletions buildstockbatch/eagle.py
@@ -494,7 +494,8 @@ def queue_post_processing(self, after_jobids=[], upload_only=False, hipri=False)
# Configuration values
account = self.cfg['eagle']['account']
walltime = self.cfg['eagle'].get('postprocessing', {}).get('time', '1:30:00')

memory = self.cfg['eagle'].get('postprocessing', {}).get('node_memory_mb', 85248)
print(f"Submitting job to {memory}MB memory nodes.")
# Throw an error if the files already exist.
if not upload_only:
for subdir in ('parquet', 'results_csvs'):
@@ -518,19 +519,20 @@ def queue_post_processing(self, after_jobids=[], upload_only=False, hipri=False)
env['MY_CONDA_ENV'] = os.environ['CONDA_PREFIX']
env['OUT_DIR'] = self.output_dir
env['UPLOADONLY'] = str(upload_only)
env['MEMORY'] = str(memory)
here = os.path.dirname(os.path.abspath(__file__))
eagle_post_sh = os.path.join(here, 'eagle_postprocessing.sh')

args = [
'sbatch',
'--account={}'.format(account),
'--time={}'.format(walltime),
'--export=PROJECTFILE,MY_CONDA_ENV,OUT_DIR,UPLOADONLY',
'--export=PROJECTFILE,MY_CONDA_ENV,OUT_DIR,UPLOADONLY,MEMORY',
'--job-name=bstkpost',
'--output=postprocessing.out',
'--nodes=1',
':',
'--mem=180000',
'--mem={}'.format(memory),
'--output=dask_workers.out',
'--nodes={}'.format(self.cfg['eagle'].get('postprocessing', {}).get('n_workers', 2)),
eagle_post_sh
@@ -599,7 +601,6 @@ def user_cli(argv=sys.argv[1:]):
'''
This is the user entry point for running buildstockbatch on Eagle
'''

# set up logging, currently based on within-this-file hard-coded config
logging.config.dictConfig(logging_config)

4 changes: 2 additions & 2 deletions buildstockbatch/eagle_postprocessing.sh
@@ -1,5 +1,4 @@
#!/bin/bash

echo "begin eagle_postprocessing.sh"

echo "Job ID: $SLURM_JOB_ID"
@@ -11,6 +10,7 @@ source activate "$MY_CONDA_ENV"
export POSTPROCESS=1

echo "UPLOADONLY: ${UPLOADONLY}"
echo "MEMORY: ${MEMORY}"

SCHEDULER_FILE=$OUT_DIR/dask_scheduler.json

@@ -22,6 +22,6 @@ echo $SLURM_JOB_NODELIST_PACK_GROUP_1
pdsh -w $SLURM_JOB_NODELIST_PACK_GROUP_1 "free -h"

$MY_CONDA_ENV/bin/dask-scheduler --scheduler-file $SCHEDULER_FILE &> $OUT_DIR/dask_scheduler.out &
pdsh -w $SLURM_JOB_NODELIST_PACK_GROUP_1 "$MY_CONDA_ENV/bin/dask-worker --scheduler-file $SCHEDULER_FILE --local-directory /tmp/scratch/dask --nprocs 9" &> $OUT_DIR/dask_workers.out &
pdsh -w $SLURM_JOB_NODELIST_PACK_GROUP_1 "$MY_CONDA_ENV/bin/dask-worker --scheduler-file $SCHEDULER_FILE --local-directory /tmp/scratch/dask --nprocs 1 --nthreads 1 --memory-limit ${MEMORY}MB" &> $OUT_DIR/dask_workers.out &

time python -u -m buildstockbatch.eagle "$PROJECTFILE"
57 changes: 35 additions & 22 deletions buildstockbatch/postprocessing.py
@@ -9,10 +9,9 @@
:copyright: (c) 2018 by The Alliance for Sustainable Energy
:license: BSD-3
"""

import boto3
import dask.bag as db
import dask.dataframe as dd
from dask.distributed import performance_report
import dask
import datetime as dt
from fsspec.implementations.local import LocalFileSystem
@@ -34,7 +33,7 @@

logger = logging.getLogger(__name__)

MAX_PARQUET_MEMORY = 1e9 # maximum size of the parquet file in memory when combining multiple parquets
MAX_PARQUET_MEMORY = 4000 # maximum size (MB) of the parquet file in memory when combining multiple parquets


def read_data_point_out_json(fs, reporting_measures, filename):
@@ -227,8 +226,12 @@ def read_enduse_timeseries_parquet(fs, filename, all_cols):
return df[all_cols]


def read_and_concat_enduse_timeseries_parquet(fs, filenames, all_cols):
return pd.concat(read_enduse_timeseries_parquet(fs, filename, all_cols) for filename in filenames)
def read_and_concat_enduse_timeseries_parquet(fs, all_cols, output_dir, filenames, group_id):
dfs = [read_enduse_timeseries_parquet(fs, filename, all_cols) for filename in filenames]
grouped_df = pd.concat(dfs)
grouped_df.set_index('building_id', inplace=True)
grouped_df.to_parquet(output_dir + f'group{group_id}.parquet')
del grouped_df


def combine_results(fs, results_dir, cfg, do_timeseries=True):
@@ -257,6 +260,7 @@ def combine_results(fs, results_dir, cfg, do_timeseries=True):
fs.makedirs(dr)

# Results "CSV"
logger.info("Creating results_df.")
results_job_json_glob = f'{sim_output_dir}/results_job*.json.gz'
results_jsons = fs.glob(results_job_json_glob)
results_json_job_ids = [int(re.search(r'results_job(\d+)\.json\.gz', x).group(1)) for x in results_jsons]
@@ -277,6 +281,7 @@
if do_timeseries:

# Look at all the parquet files to see what columns are in all of them.
logger.info("Collecting all the columns in timeseries parquet files.")
ts_filenames = fs.glob(f'{ts_in_dir}/up*/bldg*.parquet')
all_ts_cols = db.from_sequence(ts_filenames, partition_size=100).map(partial(get_cols, fs)).\
fold(lambda x, y: set(x).union(y)).compute()
@@ -325,37 +330,45 @@
# Get the names of the timseries file for each simulation in this upgrade
ts_filenames = fs.glob(f'{ts_in_dir}/up{upgrade_id:02d}/bldg*.parquet')

if not ts_filenames:
logger.info(f"There are no timeseries files for upgrade{upgrade_id}.")
continue

# Calculate the mean and estimate the total memory usage
read_ts_parquet = partial(read_enduse_timeseries_parquet, fs, all_cols=all_ts_cols_sorted)
get_ts_mem_usage_d = dask.delayed(lambda x: read_ts_parquet(x).memory_usage(deep=True).sum())
sample_size = min(len(ts_filenames), 36 * 3)
mean_mem = np.mean(dask.compute(map(get_ts_mem_usage_d, random.sample(ts_filenames, sample_size)))[0])
total_mem = mean_mem * len(ts_filenames)
total_mem = mean_mem * len(ts_filenames) / 1e6 # total_mem in MB

# Determine how many files should be in each partition and group the files
npartitions = math.ceil(total_mem / MAX_PARQUET_MEMORY) # 1 GB per partition
parquet_memory = int(cfg['eagle'].get('postprocessing', {}).get('parquet_memory_mb', MAX_PARQUET_MEMORY))
logger.info(f"Max parquet memory: {parquet_memory} MB")
npartitions = math.ceil(total_mem / parquet_memory)
npartitions = min(len(ts_filenames), npartitions) # cannot have less than one file per partition
ts_files_in_each_partition = np.array_split(ts_filenames, npartitions)

# Read the timeseries into a dask dataframe
read_and_concat_ts_pq_d = dask.delayed(
partial(read_and_concat_enduse_timeseries_parquet, fs, all_cols=all_ts_cols_sorted)
)
ts_df = dd.from_delayed(map(read_and_concat_ts_pq_d, ts_files_in_each_partition))
ts_df = ts_df.set_index('building_id', sorted=True)

# Write out new dask timeseries dataframe.
logger.info(f"Combining about {len(ts_files_in_each_partition[0])} parquets together."
f" Creating {npartitions} groups.")
if isinstance(fs, LocalFileSystem):
ts_out_loc = f"{ts_dir}/upgrade={upgrade_id}"
ts_out_loc = f"{ts_dir}/upgrade={upgrade_id}/"
else:
assert isinstance(fs, S3FileSystem)
ts_out_loc = f"s3://{ts_dir}/upgrade={upgrade_id}"
logger.info(f'Writing {ts_out_loc}')
ts_df.to_parquet(
ts_out_loc,
engine='pyarrow',
flavor='spark'
ts_out_loc = f"s3://{ts_dir}/upgrade={upgrade_id}/"

fs.makedirs(ts_out_loc)
logger.info(f'Created directory {ts_out_loc} for writing.')

# Read the timeseries into a dask dataframe
read_and_concat_ts_pq_d = dask.delayed(
# fs, all_cols, output_dir, filenames, group_id
partial(read_and_concat_enduse_timeseries_parquet, fs, all_ts_cols_sorted, ts_out_loc)
)
group_ids = list(range(npartitions))
with performance_report(filename=f'dask_combine_report{upgrade_id}.html'):
dask.compute(map(read_and_concat_ts_pq_d, ts_files_in_each_partition, group_ids))

logger.info(f"Finished combining and saving timeseries for upgrade{upgrade_id}.")


def remove_intermediate_files(fs, results_dir):
3 changes: 3 additions & 0 deletions buildstockbatch/schemas/v0.2.yaml
@@ -50,6 +50,9 @@ hpc-spec:
hpc-postprocessing-spec:
time: int(required=True)
n_workers: int(required=False)
node_memory_mb: enum(85248, 180224, 751616, required=False)
parquet_memory_mb: int(required=False)
keep_intermediate_files: bool(required=False)

sampling-spec:
time: int(required=True)
12 changes: 7 additions & 5 deletions buildstockbatch/test/test_base.py
@@ -12,7 +12,7 @@
import shutil
import tempfile
from unittest.mock import patch, MagicMock

import buildstockbatch
from buildstockbatch.base import BuildStockBatchBase
from buildstockbatch.postprocessing import write_dataframe_as_parquet

@@ -21,6 +21,8 @@

OUTPUT_FOLDER_NAME = 'output'

buildstockbatch.postprocessing.performance_report = MagicMock()


def test_reference_scenario(basic_residential_project_file):
# verify that the reference_scenario get's added to the upgrade file
@@ -304,13 +306,13 @@ def test_upload_files(mocked_s3, basic_residential_project_file):
assert (source_file_path, s3_file_path) in files_uploaded
files_uploaded.remove((source_file_path, s3_file_path))

s3_file_path = s3_path + 'timeseries/upgrade=0/part.0.parquet'
source_file_path = os.path.join(source_path, 'timeseries', 'upgrade=0', 'part.0.parquet')
s3_file_path = s3_path + 'timeseries/upgrade=0/group0.parquet'
source_file_path = os.path.join(source_path, 'timeseries', 'upgrade=0', 'group0.parquet')
assert (source_file_path, s3_file_path) in files_uploaded
files_uploaded.remove((source_file_path, s3_file_path))

s3_file_path = s3_path + 'timeseries/upgrade=1/part.0.parquet'
source_file_path = os.path.join(source_path, 'timeseries', 'upgrade=1', 'part.0.parquet')
s3_file_path = s3_path + 'timeseries/upgrade=1/group0.parquet'
source_file_path = os.path.join(source_path, 'timeseries', 'upgrade=1', 'group0.parquet')
assert (source_file_path, s3_file_path) in files_uploaded
files_uploaded.remove((source_file_path, s3_file_path))

5 changes: 3 additions & 2 deletions buildstockbatch/test/test_postprocessing.py
@@ -7,10 +7,11 @@
import tarfile
import pytest
import shutil

from buildstockbatch import postprocessing
from buildstockbatch.base import BuildStockBatchBase
from unittest.mock import patch
from unittest.mock import patch, MagicMock

postprocessing.performance_report = MagicMock()


def test_report_additional_results_csv_columns(basic_residential_project_file):
8 changes: 8 additions & 0 deletions docs/changelog/changelog_dev.rst
@@ -14,3 +14,11 @@ Development Changelog
This is an example change. Please copy and paste it - for valid tags please refer to ``conf.py`` in the docs
directory. ``pullreq`` should be set to the appropriate pull request number and ``tickets`` to any related
github issues. These will be automatically linked in the documentation.

.. change::
:tags: postprocessing
:pullreq: 212
:tickets:

Combine the timeseries parquets by mapping a dask delayed function over groups of files instead of building one
giant dask dataframe, to avoid memory issues. Default to the 85 GB memory nodes on Eagle, with a single worker
process and a single thread on each node, to further reduce memory pressure.
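For illustration, here is a minimal, self-contained sketch of that map-of-delayed pattern. The combine_group helper, the fake per-building files, and the directory names are hypothetical stand-ins for read_and_concat_enduse_timeseries_parquet and the real bldg*.parquet outputs; this is not the repository's own code.

import os
import tempfile

import dask
import pandas as pd


def combine_group(paths, group_id, output_dir):
    # Read one group of per-building parquet files and write a single combined file,
    # so only one group's worth of data is ever held in memory at a time.
    df = pd.concat(pd.read_parquet(p) for p in paths)
    df = df.set_index('building_id')
    df.to_parquet(os.path.join(output_dir, f'group{group_id}.parquet'))


with tempfile.TemporaryDirectory() as tmp:
    # Fake per-building timeseries files standing in for the real bldg*.parquet outputs.
    paths = []
    for bldg in range(4):
        p = os.path.join(tmp, f'bldg{bldg}.parquet')
        pd.DataFrame({'building_id': bldg, 'kwh': [1.0, 2.0]}).to_parquet(p)
        paths.append(p)

    # Two groups of two files each; dask runs one delayed task per group.
    groups = [paths[:2], paths[2:]]
    tasks = [dask.delayed(combine_group)(g, i, tmp) for i, g in enumerate(groups)]
    dask.compute(*tasks)

Because each group is written out independently, peak memory per worker is bounded by the size of one group rather than by the full upgrade's timeseries.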
8 changes: 7 additions & 1 deletion docs/project_defn.rst
@@ -312,7 +312,13 @@ the Eagle supercomputer.
* ``postprocessing``: Eagle configuration for the postprocessing step

* ``time``: Maximum time in minutes to allocate postprocessing job
* ``n_workers``: Number of eagle workers to parallelize the postprocessing job into
* ``n_workers``: Number of Eagle worker nodes to parallelize the postprocessing job across. The maximum supported is 32.
* ``node_memory_mb``: The memory (in MB) to request for each Eagle node used for postprocessing. Valid values are
  85248, 180224, and 751616. Default is 85248.
* ``parquet_memory_mb``: The maximum size (in MB) of the combined parquet files held in memory. Default is 4000.
* ``keep_intermediate_files``: Set this to true to keep the postprocessing intermediate files (for debugging or other
  exploratory purposes). The intermediate files include the results_job*.json.gz files and each building's
  timeseries parquet files. Default is false.
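As a point of reference, a hypothetical ``eagle`` postprocessing block exercising these options might look like the following. The values are illustrative only, and other required ``eagle`` keys (such as the allocation account) are omitted.

eagle:
  postprocessing:
    time: 90                   # minutes allocated to the postprocessing job
    n_workers: 4               # up to 32 worker nodes
    node_memory_mb: 180224     # one of 85248, 180224, 751616
    parquet_memory_mb: 4000
    keep_intermediate_files: true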

.. _aws-config:

