Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

memory_issue_fix #212

Merged
merged 10 commits into from Mar 22, 2021
Merged
4 changes: 1 addition & 3 deletions .circleci/config.yml
Expand Up @@ -2,14 +2,13 @@ version: 2
jobs:
build:
docker:
- image: continuumio/miniconda3
- image: cimg/python:3.8
steps:
- setup_remote_docker
- checkout
- run:
name: Install buildstock
command: |
conda install -c conda-forge -y -q scipy numpy "pandas>=1.0.0,!=1.0.4" "pyarrow>=0.14.1" dask joblib pyyaml
pip install .[dev] --progress-bar off
- run:
name: Run PyTest
Expand All @@ -35,7 +34,6 @@ jobs:
name: Build documentation
when: always
command: |
apt-get install make
cd docs
make html
mkdir /tmp/docs
Expand Down
6 changes: 5 additions & 1 deletion buildstockbatch/base.py
Expand Up @@ -784,4 +784,8 @@ def process_results(self, skip_combine=False, force_upload=False):
if 'athena' in aws_conf:
postprocessing.create_athena_tables(aws_conf, os.path.basename(self.output_dir), s3_bucket, s3_prefix)

postprocessing.remove_intermediate_files(fs, self.results_dir)
if not self.cfg['eagle'].get('postprocessing', {}).get('keep_intermediate_files', False):
logger.info("Removing intermediate files.")
postprocessing.remove_intermediate_files(fs, self.results_dir)
else:
logger.info("Skipped removing intermediate files.")
9 changes: 5 additions & 4 deletions buildstockbatch/eagle.py
Expand Up @@ -494,7 +494,8 @@ def queue_post_processing(self, after_jobids=[], upload_only=False, hipri=False)
# Configuration values
account = self.cfg['eagle']['account']
walltime = self.cfg['eagle'].get('postprocessing', {}).get('time', '1:30:00')

memory = self.cfg['eagle'].get('postprocessing', {}).get('node_memory_mb', 85248)
print(f"Submitting job to {memory}MB memory nodes.")
# Throw an error if the files already exist.
if not upload_only:
for subdir in ('parquet', 'results_csvs'):
Expand All @@ -518,19 +519,20 @@ def queue_post_processing(self, after_jobids=[], upload_only=False, hipri=False)
env['MY_CONDA_ENV'] = os.environ['CONDA_PREFIX']
env['OUT_DIR'] = self.output_dir
env['UPLOADONLY'] = str(upload_only)
env['MEMORY'] = str(memory)
here = os.path.dirname(os.path.abspath(__file__))
eagle_post_sh = os.path.join(here, 'eagle_postprocessing.sh')

args = [
'sbatch',
'--account={}'.format(account),
'--time={}'.format(walltime),
'--export=PROJECTFILE,MY_CONDA_ENV,OUT_DIR,UPLOADONLY',
'--export=PROJECTFILE,MY_CONDA_ENV,OUT_DIR,UPLOADONLY,MEMORY',
'--job-name=bstkpost',
'--output=postprocessing.out',
'--nodes=1',
':',
'--mem=180000',
'--mem={}'.format(memory),
'--output=dask_workers.out',
'--nodes={}'.format(self.cfg['eagle'].get('postprocessing', {}).get('n_workers', 2)),
eagle_post_sh
Expand Down Expand Up @@ -599,7 +601,6 @@ def user_cli(argv=sys.argv[1:]):
'''
This is the user entry point for running buildstockbatch on Eagle
'''

# set up logging, currently based on within-this-file hard-coded config
logging.config.dictConfig(logging_config)

Expand Down
4 changes: 2 additions & 2 deletions buildstockbatch/eagle_postprocessing.sh
@@ -1,5 +1,4 @@
#!/bin/bash

echo "begin eagle_postprocessing.sh"

echo "Job ID: $SLURM_JOB_ID"
Expand All @@ -11,6 +10,7 @@ source activate "$MY_CONDA_ENV"
export POSTPROCESS=1

echo "UPLOADONLY: ${UPLOADONLY}"
echo "MEMORY: ${MEMORY}"

SCHEDULER_FILE=$OUT_DIR/dask_scheduler.json

Expand All @@ -22,6 +22,6 @@ echo $SLURM_JOB_NODELIST_PACK_GROUP_1
pdsh -w $SLURM_JOB_NODELIST_PACK_GROUP_1 "free -h"

$MY_CONDA_ENV/bin/dask-scheduler --scheduler-file $SCHEDULER_FILE &> $OUT_DIR/dask_scheduler.out &
pdsh -w $SLURM_JOB_NODELIST_PACK_GROUP_1 "$MY_CONDA_ENV/bin/dask-worker --scheduler-file $SCHEDULER_FILE --local-directory /tmp/scratch/dask --nprocs 9" &> $OUT_DIR/dask_workers.out &
pdsh -w $SLURM_JOB_NODELIST_PACK_GROUP_1 "$MY_CONDA_ENV/bin/dask-worker --scheduler-file $SCHEDULER_FILE --local-directory /tmp/scratch/dask --nprocs 1 --nthreads 1 --memory-limit ${MEMORY}MB" &> $OUT_DIR/dask_workers.out &

time python -u -m buildstockbatch.eagle "$PROJECTFILE"
57 changes: 35 additions & 22 deletions buildstockbatch/postprocessing.py
Expand Up @@ -9,10 +9,9 @@
:copyright: (c) 2018 by The Alliance for Sustainable Energy
:license: BSD-3
"""

import boto3
import dask.bag as db
import dask.dataframe as dd
from dask.distributed import performance_report
import dask
import datetime as dt
from fsspec.implementations.local import LocalFileSystem
Expand All @@ -34,7 +33,7 @@

logger = logging.getLogger(__name__)

MAX_PARQUET_MEMORY = 1e9 # maximum size of the parquet file in memory when combining multiple parquets
MAX_PARQUET_MEMORY = 4000 # maximum size (MB) of the parquet file in memory when combining multiple parquets


def read_data_point_out_json(fs, reporting_measures, filename):
Expand Down Expand Up @@ -227,8 +226,12 @@ def read_enduse_timeseries_parquet(fs, filename, all_cols):
return df[all_cols]


def read_and_concat_enduse_timeseries_parquet(fs, filenames, all_cols):
return pd.concat(read_enduse_timeseries_parquet(fs, filename, all_cols) for filename in filenames)
def read_and_concat_enduse_timeseries_parquet(fs, all_cols, output_dir, filenames, group_id):
dfs = [read_enduse_timeseries_parquet(fs, filename, all_cols) for filename in filenames]
grouped_df = pd.concat(dfs)
grouped_df.set_index('building_id', inplace=True)
grouped_df.to_parquet(output_dir + f'group{group_id}.parquet')
del grouped_df


def combine_results(fs, results_dir, cfg, do_timeseries=True):
Expand Down Expand Up @@ -257,6 +260,7 @@ def combine_results(fs, results_dir, cfg, do_timeseries=True):
fs.makedirs(dr)

# Results "CSV"
logger.info("Creating results_df.")
results_job_json_glob = f'{sim_output_dir}/results_job*.json.gz'
results_jsons = fs.glob(results_job_json_glob)
results_json_job_ids = [int(re.search(r'results_job(\d+)\.json\.gz', x).group(1)) for x in results_jsons]
Expand All @@ -277,6 +281,7 @@ def combine_results(fs, results_dir, cfg, do_timeseries=True):
if do_timeseries:

# Look at all the parquet files to see what columns are in all of them.
logger.info("Collecting all the columns in timeseries parquet files.")
ts_filenames = fs.glob(f'{ts_in_dir}/up*/bldg*.parquet')
all_ts_cols = db.from_sequence(ts_filenames, partition_size=100).map(partial(get_cols, fs)).\
fold(lambda x, y: set(x).union(y)).compute()
Expand Down Expand Up @@ -325,37 +330,45 @@ def combine_results(fs, results_dir, cfg, do_timeseries=True):
# Get the names of the timseries file for each simulation in this upgrade
ts_filenames = fs.glob(f'{ts_in_dir}/up{upgrade_id:02d}/bldg*.parquet')

if not ts_filenames:
logger.info(f"There are no timeseries files for upgrade{upgrade_id}.")
continue

# Calculate the mean and estimate the total memory usage
read_ts_parquet = partial(read_enduse_timeseries_parquet, fs, all_cols=all_ts_cols_sorted)
get_ts_mem_usage_d = dask.delayed(lambda x: read_ts_parquet(x).memory_usage(deep=True).sum())
sample_size = min(len(ts_filenames), 36 * 3)
mean_mem = np.mean(dask.compute(map(get_ts_mem_usage_d, random.sample(ts_filenames, sample_size)))[0])
total_mem = mean_mem * len(ts_filenames)
total_mem = mean_mem * len(ts_filenames) / 1e6 # total_mem in MB

# Determine how many files should be in each partition and group the files
npartitions = math.ceil(total_mem / MAX_PARQUET_MEMORY) # 1 GB per partition
parquet_memory = int(cfg['eagle'].get('postprocessing', {}).get('parquet_memory_mb', MAX_PARQUET_MEMORY))
logger.info(f"Max parquet memory: {parquet_memory} MB")
npartitions = math.ceil(total_mem / parquet_memory)
npartitions = min(len(ts_filenames), npartitions) # cannot have less than one file per partition
ts_files_in_each_partition = np.array_split(ts_filenames, npartitions)

# Read the timeseries into a dask dataframe
read_and_concat_ts_pq_d = dask.delayed(
partial(read_and_concat_enduse_timeseries_parquet, fs, all_cols=all_ts_cols_sorted)
)
ts_df = dd.from_delayed(map(read_and_concat_ts_pq_d, ts_files_in_each_partition))
ts_df = ts_df.set_index('building_id', sorted=True)

# Write out new dask timeseries dataframe.
logger.info(f"Combining about {len(ts_files_in_each_partition[0])} parquets together."
f" Creating {npartitions} groups.")
if isinstance(fs, LocalFileSystem):
ts_out_loc = f"{ts_dir}/upgrade={upgrade_id}"
ts_out_loc = f"{ts_dir}/upgrade={upgrade_id}/"
else:
assert isinstance(fs, S3FileSystem)
ts_out_loc = f"s3://{ts_dir}/upgrade={upgrade_id}"
logger.info(f'Writing {ts_out_loc}')
ts_df.to_parquet(
ts_out_loc,
engine='pyarrow',
flavor='spark'
ts_out_loc = f"s3://{ts_dir}/upgrade={upgrade_id}/"

fs.makedirs(ts_out_loc)
logger.info(f'Created directory {ts_out_loc} for writing.')

# Read the timeseries into a dask dataframe
read_and_concat_ts_pq_d = dask.delayed(
# fs, all_cols, output_dir, filenames, group_id
partial(read_and_concat_enduse_timeseries_parquet, fs, all_ts_cols_sorted, ts_out_loc)
)
group_ids = list(range(npartitions))
with performance_report(filename=f'dask_combine_report{upgrade_id}.html'):
dask.compute(map(read_and_concat_ts_pq_d, ts_files_in_each_partition, group_ids))

logger.info(f"Finished combining and saving timeseries for upgrade{upgrade_id}.")


def remove_intermediate_files(fs, results_dir):
Expand Down
3 changes: 3 additions & 0 deletions buildstockbatch/schemas/v0.2.yaml
Expand Up @@ -50,6 +50,9 @@ hpc-spec:
hpc-postprocessing-spec:
time: int(required=True)
n_workers: int(required=False)
node_memory_mb: enum(85248, 180224, 751616, required=False)
parquet_memory_mb: int(required=False)
keep_intermediate_files: bool(required=False)

sampling-spec:
time: int(required=True)
Expand Down
12 changes: 7 additions & 5 deletions buildstockbatch/test/test_base.py
Expand Up @@ -12,7 +12,7 @@
import shutil
import tempfile
from unittest.mock import patch, MagicMock

import buildstockbatch
from buildstockbatch.base import BuildStockBatchBase
from buildstockbatch.postprocessing import write_dataframe_as_parquet

Expand All @@ -21,6 +21,8 @@

OUTPUT_FOLDER_NAME = 'output'

buildstockbatch.postprocessing.performance_report = MagicMock()


def test_reference_scenario(basic_residential_project_file):
# verify that the reference_scenario get's added to the upgrade file
Expand Down Expand Up @@ -304,13 +306,13 @@ def test_upload_files(mocked_s3, basic_residential_project_file):
assert (source_file_path, s3_file_path) in files_uploaded
files_uploaded.remove((source_file_path, s3_file_path))

s3_file_path = s3_path + 'timeseries/upgrade=0/part.0.parquet'
source_file_path = os.path.join(source_path, 'timeseries', 'upgrade=0', 'part.0.parquet')
s3_file_path = s3_path + 'timeseries/upgrade=0/group0.parquet'
source_file_path = os.path.join(source_path, 'timeseries', 'upgrade=0', 'group0.parquet')
assert (source_file_path, s3_file_path) in files_uploaded
files_uploaded.remove((source_file_path, s3_file_path))

s3_file_path = s3_path + 'timeseries/upgrade=1/part.0.parquet'
source_file_path = os.path.join(source_path, 'timeseries', 'upgrade=1', 'part.0.parquet')
s3_file_path = s3_path + 'timeseries/upgrade=1/group0.parquet'
source_file_path = os.path.join(source_path, 'timeseries', 'upgrade=1', 'group0.parquet')
assert (source_file_path, s3_file_path) in files_uploaded
files_uploaded.remove((source_file_path, s3_file_path))

Expand Down
5 changes: 3 additions & 2 deletions buildstockbatch/test/test_postprocessing.py
Expand Up @@ -7,10 +7,11 @@
import tarfile
import pytest
import shutil

from buildstockbatch import postprocessing
from buildstockbatch.base import BuildStockBatchBase
from unittest.mock import patch
from unittest.mock import patch, MagicMock

postprocessing.performance_report = MagicMock()


def test_report_additional_results_csv_columns(basic_residential_project_file):
Expand Down
8 changes: 8 additions & 0 deletions docs/changelog/changelog_dev.rst
Expand Up @@ -14,3 +14,11 @@ Development Changelog
This is an example change. Please copy and paste it - for valid tags please refer to ``conf.py`` in the docs
directory. ``pullreq`` should be set to the appropriate pull request number and ``tickets`` to any related
github issues. These will be automatically linked in the documentation.

.. change::
:tags: postprocessing
:pullreq: 212
:tickets:

Use a map of dask delayed function to combine parquets instead of a giant dask df to avoid memory issues.
Default to 85GB memory nodes in eagle with single process and single thread in each node to avoid memory issues.
8 changes: 7 additions & 1 deletion docs/project_defn.rst
Expand Up @@ -312,7 +312,13 @@ the Eagle supercomputer.
* ``postprocessing``: Eagle configuration for the postprocessing step

* ``time``: Maximum time in minutes to allocate postprocessing job
* ``n_workers``: Number of eagle workers to parallelize the postprocessing job into
* ``n_workers``: Number of eagle workers to parallelize the postprocessing job into. Max supported is 32.
* ``node_memory_mb``: The memory (in MB) to request for eagle node for postprocessing. The valid values are
85248, 180224 and 751616. Default is 85248.
* ``parquet_memory_mb``: The size (in MB) of the combined parquet file in memory. Default is 40000.
* ``keep_intermediate_files``: Set this to true if you want to keep postprocessing intermediate files (for debugging
or other explorative purpose). The intermediate files contain results_job*.json.gz
files and individual building's timeseries parquet files. Default is false.

.. _aws-config:

Expand Down