From f1b5fa8b8c4017aa979dd72513efdb3e1f4b4505 Mon Sep 17 00:00:00 2001 From: Rajendra Adhikari Date: Fri, 12 Mar 2021 12:19:50 -0700 Subject: [PATCH 01/10] memory_issue_fix --- buildstockbatch/eagle.py | 6 +-- buildstockbatch/eagle_postprocessing.sh | 3 +- buildstockbatch/postprocessing.py | 52 ++++++++++++++++--------- buildstockbatch/schemas/v0.2.yaml | 1 + 4 files changed, 38 insertions(+), 24 deletions(-) diff --git a/buildstockbatch/eagle.py b/buildstockbatch/eagle.py index 320cee14..ea7bd9d6 100644 --- a/buildstockbatch/eagle.py +++ b/buildstockbatch/eagle.py @@ -494,7 +494,8 @@ def queue_post_processing(self, after_jobids=[], upload_only=False, hipri=False) # Configuration values account = self.cfg['eagle']['account'] walltime = self.cfg['eagle'].get('postprocessing', {}).get('time', '1:30:00') - + memory = self.cfg['eagle'].get('postprocessing', {}).get('memory', 85248) + print(f"Submitting job to {memory}MB memory nodes.") # Throw an error if the files already exist. if not upload_only: for subdir in ('parquet', 'results_csvs'): @@ -530,7 +531,7 @@ def queue_post_processing(self, after_jobids=[], upload_only=False, hipri=False) '--output=postprocessing.out', '--nodes=1', ':', - '--mem=180000', + '--mem={}'.format(memory), '--output=dask_workers.out', '--nodes={}'.format(self.cfg['eagle'].get('postprocessing', {}).get('n_workers', 2)), eagle_post_sh @@ -599,7 +600,6 @@ def user_cli(argv=sys.argv[1:]): ''' This is the user entry point for running buildstockbatch on Eagle ''' - # set up logging, currently based on within-this-file hard-coded config logging.config.dictConfig(logging_config) diff --git a/buildstockbatch/eagle_postprocessing.sh b/buildstockbatch/eagle_postprocessing.sh index 1d8f2576..6c78b53b 100644 --- a/buildstockbatch/eagle_postprocessing.sh +++ b/buildstockbatch/eagle_postprocessing.sh @@ -1,5 +1,4 @@ #!/bin/bash - echo "begin eagle_postprocessing.sh" echo "Job ID: $SLURM_JOB_ID" @@ -22,6 +21,6 @@ echo $SLURM_JOB_NODELIST_PACK_GROUP_1 pdsh -w $SLURM_JOB_NODELIST_PACK_GROUP_1 "free -h" $MY_CONDA_ENV/bin/dask-scheduler --scheduler-file $SCHEDULER_FILE &> $OUT_DIR/dask_scheduler.out & -pdsh -w $SLURM_JOB_NODELIST_PACK_GROUP_1 "$MY_CONDA_ENV/bin/dask-worker --scheduler-file $SCHEDULER_FILE --local-directory /tmp/scratch/dask --nprocs 9" &> $OUT_DIR/dask_workers.out & +pdsh -w $SLURM_JOB_NODELIST_PACK_GROUP_1 "$MY_CONDA_ENV/bin/dask-worker --scheduler-file $SCHEDULER_FILE --local-directory /tmp/scratch/dask --nprocs 1 --nthreads 1 --memory-limit 84GB" &> $OUT_DIR/dask_workers.out & time python -u -m buildstockbatch.eagle "$PROJECTFILE" diff --git a/buildstockbatch/postprocessing.py b/buildstockbatch/postprocessing.py index 95f0c503..b1bef917 100644 --- a/buildstockbatch/postprocessing.py +++ b/buildstockbatch/postprocessing.py @@ -9,10 +9,11 @@ :copyright: (c) 2018 by The Alliance for Sustainable Energy :license: BSD-3 """ - +import traceback import boto3 import dask.bag as db import dask.dataframe as dd +from dask.distributed import performance_report import dask import datetime as dt from fsspec.implementations.local import LocalFileSystem @@ -34,7 +35,7 @@ logger = logging.getLogger(__name__) -MAX_PARQUET_MEMORY = 1e9 # maximum size of the parquet file in memory when combining multiple parquets +MAX_PARQUET_MEMORY = 4e9 # maximum size of the parquet file in memory when combining multiple parquets def read_data_point_out_json(fs, reporting_measures, filename): @@ -227,8 +228,16 @@ def read_enduse_timeseries_parquet(fs, filename, all_cols): return df[all_cols] -def 
read_and_concat_enduse_timeseries_parquet(fs, filenames, all_cols): - return pd.concat(read_enduse_timeseries_parquet(fs, filename, all_cols) for filename in filenames) +def read_and_concat_enduse_timeseries_parquet(fs, all_cols, output_dir, filenames, group_id): + try: + dfs = [read_enduse_timeseries_parquet(fs, filename, all_cols) for filename in filenames] + grouped_df = pd.concat(dfs) + grouped_df.set_index('building_id', inplace=True) + grouped_df.to_parquet(output_dir + f'group{group_id}.parquet') + return ("success", group_id, len(grouped_df)) + del grouped_df + except Exception as exp: + return ("fail", group_id, f"Exp: {traceback.format_exc()}") def combine_results(fs, results_dir, cfg, do_timeseries=True): @@ -336,26 +345,31 @@ def combine_results(fs, results_dir, cfg, do_timeseries=True): npartitions = math.ceil(total_mem / MAX_PARQUET_MEMORY) # 1 GB per partition npartitions = min(len(ts_filenames), npartitions) # cannot have less than one file per partition ts_files_in_each_partition = np.array_split(ts_filenames, npartitions) + N = len(ts_files_in_each_partition[0]) + group_ids = list(range(npartitions)) - # Read the timeseries into a dask dataframe - read_and_concat_ts_pq_d = dask.delayed( - partial(read_and_concat_enduse_timeseries_parquet, fs, all_cols=all_ts_cols_sorted) - ) - ts_df = dd.from_delayed(map(read_and_concat_ts_pq_d, ts_files_in_each_partition)) - ts_df = ts_df.set_index('building_id', sorted=True) - - # Write out new dask timeseries dataframe. + logger.info(f"Combining about {N} parquets together. Creating {npartitions} groups.") if isinstance(fs, LocalFileSystem): - ts_out_loc = f"{ts_dir}/upgrade={upgrade_id}" + ts_out_loc = f"{ts_dir}/upgrade={upgrade_id}/" else: assert isinstance(fs, S3FileSystem) - ts_out_loc = f"s3://{ts_dir}/upgrade={upgrade_id}" - logger.info(f'Writing {ts_out_loc}') - ts_df.to_parquet( - ts_out_loc, - engine='pyarrow', - flavor='spark' + ts_out_loc = f"s3://{ts_dir}/upgrade={upgrade_id}/" + + fs.makedirs(ts_out_loc) + logger.info(f'Created directory {ts_out_loc} for writing.') + # Read the timeseries into a dask dataframe + read_and_concat_ts_pq_d = dask.delayed( + # fs, all_cols, output_dir, filenames, group_id + partial(read_and_concat_enduse_timeseries_parquet, fs, all_ts_cols_sorted, ts_out_loc) ) + group_ids = list(range(npartitions)) + with performance_report(filename='dask_combine_report.html'): + results = dask.compute(map(read_and_concat_ts_pq_d, ts_files_in_each_partition, group_ids))[0] + + logger.info(f"Finished combining and saving. First 10 results: {results[:10]}") + failed_results = [r for r in results if r[0] == 'fail'] + if failed_results: + logger.info(f"{len(failed_results)} groupings have failed. 
First 10 failed results: {results[:10]}") def remove_intermediate_files(fs, results_dir): diff --git a/buildstockbatch/schemas/v0.2.yaml b/buildstockbatch/schemas/v0.2.yaml index a3745d69..03c5c3c4 100644 --- a/buildstockbatch/schemas/v0.2.yaml +++ b/buildstockbatch/schemas/v0.2.yaml @@ -50,6 +50,7 @@ hpc-spec: hpc-postprocessing-spec: time: int(required=True) n_workers: int(required=False) + memory: enum(85248, 180224, 751616, required=False) sampling-spec: time: int(required=True) From 634773da50584b9d85d16503362b9f73fbd67216 Mon Sep 17 00:00:00 2001 From: Rajendra Adhikari Date: Sun, 14 Mar 2021 22:53:41 -0600 Subject: [PATCH 02/10] Pass through postprocessing node memory size --- buildstockbatch/eagle.py | 3 ++- buildstockbatch/eagle_postprocessing.sh | 3 ++- buildstockbatch/postprocessing.py | 26 +++++++++++++++---------- buildstockbatch/schemas/v0.2.yaml | 3 ++- 4 files changed, 22 insertions(+), 13 deletions(-) diff --git a/buildstockbatch/eagle.py b/buildstockbatch/eagle.py index ea7bd9d6..74f3ac67 100644 --- a/buildstockbatch/eagle.py +++ b/buildstockbatch/eagle.py @@ -519,6 +519,7 @@ def queue_post_processing(self, after_jobids=[], upload_only=False, hipri=False) env['MY_CONDA_ENV'] = os.environ['CONDA_PREFIX'] env['OUT_DIR'] = self.output_dir env['UPLOADONLY'] = str(upload_only) + env['MEMORY'] = str(memory) here = os.path.dirname(os.path.abspath(__file__)) eagle_post_sh = os.path.join(here, 'eagle_postprocessing.sh') @@ -526,7 +527,7 @@ def queue_post_processing(self, after_jobids=[], upload_only=False, hipri=False) 'sbatch', '--account={}'.format(account), '--time={}'.format(walltime), - '--export=PROJECTFILE,MY_CONDA_ENV,OUT_DIR,UPLOADONLY', + '--export=PROJECTFILE,MY_CONDA_ENV,OUT_DIR,UPLOADONLY,MEMORY', '--job-name=bstkpost', '--output=postprocessing.out', '--nodes=1', diff --git a/buildstockbatch/eagle_postprocessing.sh b/buildstockbatch/eagle_postprocessing.sh index 6c78b53b..ffee2689 100644 --- a/buildstockbatch/eagle_postprocessing.sh +++ b/buildstockbatch/eagle_postprocessing.sh @@ -10,6 +10,7 @@ source activate "$MY_CONDA_ENV" export POSTPROCESS=1 echo "UPLOADONLY: ${UPLOADONLY}" +echo "MEMORY: ${MEMORY}" SCHEDULER_FILE=$OUT_DIR/dask_scheduler.json @@ -21,6 +22,6 @@ echo $SLURM_JOB_NODELIST_PACK_GROUP_1 pdsh -w $SLURM_JOB_NODELIST_PACK_GROUP_1 "free -h" $MY_CONDA_ENV/bin/dask-scheduler --scheduler-file $SCHEDULER_FILE &> $OUT_DIR/dask_scheduler.out & -pdsh -w $SLURM_JOB_NODELIST_PACK_GROUP_1 "$MY_CONDA_ENV/bin/dask-worker --scheduler-file $SCHEDULER_FILE --local-directory /tmp/scratch/dask --nprocs 1 --nthreads 1 --memory-limit 84GB" &> $OUT_DIR/dask_workers.out & +pdsh -w $SLURM_JOB_NODELIST_PACK_GROUP_1 "$MY_CONDA_ENV/bin/dask-worker --scheduler-file $SCHEDULER_FILE --local-directory /tmp/scratch/dask --nprocs 1 --nthreads 1 --memory-limit ${MEMORY}MB" &> $OUT_DIR/dask_workers.out & time python -u -m buildstockbatch.eagle "$PROJECTFILE" diff --git a/buildstockbatch/postprocessing.py b/buildstockbatch/postprocessing.py index b1bef917..dc97462e 100644 --- a/buildstockbatch/postprocessing.py +++ b/buildstockbatch/postprocessing.py @@ -35,7 +35,7 @@ logger = logging.getLogger(__name__) -MAX_PARQUET_MEMORY = 4e9 # maximum size of the parquet file in memory when combining multiple parquets +MAX_PARQUET_MEMORY = 4000 # maximum size (MB) of the parquet file in memory when combining multiple parquets def read_data_point_out_json(fs, reporting_measures, filename): @@ -219,13 +219,16 @@ def read_results_json(fs, filename): def read_enduse_timeseries_parquet(fs, filename, 
all_cols): - with fs.open(filename, 'rb') as f: - df = pd.read_parquet(f, engine='pyarrow') - building_id = int(re.search(r'bldg(\d+).parquet', filename).group(1)) - df['building_id'] = building_id - for col in set(all_cols).difference(df.columns.values): - df[col] = np.nan - return df[all_cols] + try: + with fs.open(filename, 'rb') as f: + df = pd.read_parquet(f, engine='pyarrow') + building_id = int(re.search(r'bldg(\d+).parquet', filename).group(1)) + df['building_id'] = building_id + for col in set(all_cols).difference(df.columns.values): + df[col] = np.nan + return df[all_cols] + except: + return pd.DataFrame(columns=[all_cols]+['building_id']) def read_and_concat_enduse_timeseries_parquet(fs, all_cols, output_dir, filenames, group_id): @@ -339,10 +342,12 @@ def combine_results(fs, results_dir, cfg, do_timeseries=True): get_ts_mem_usage_d = dask.delayed(lambda x: read_ts_parquet(x).memory_usage(deep=True).sum()) sample_size = min(len(ts_filenames), 36 * 3) mean_mem = np.mean(dask.compute(map(get_ts_mem_usage_d, random.sample(ts_filenames, sample_size)))[0]) - total_mem = mean_mem * len(ts_filenames) + total_mem = mean_mem * len(ts_filenames) / 1e6 # total_mem in MB # Determine how many files should be in each partition and group the files - npartitions = math.ceil(total_mem / MAX_PARQUET_MEMORY) # 1 GB per partition + parquet_memory = int(cfg['eagle'].get('postprocessing', {}).get('parquet_memory_mb', MAX_PARQUET_MEMORY)) + logger.info(f"Max parquet memory: {parquet_memory} MB") + npartitions = math.ceil(total_mem / parquet_memory) # 1 GB per partition npartitions = min(len(ts_filenames), npartitions) # cannot have less than one file per partition ts_files_in_each_partition = np.array_split(ts_filenames, npartitions) N = len(ts_files_in_each_partition[0]) @@ -374,6 +379,7 @@ def combine_results(fs, results_dir, cfg, do_timeseries=True): def remove_intermediate_files(fs, results_dir): # Remove aggregated files to save space + return sim_output_dir = f'{results_dir}/simulation_output' ts_in_dir = f'{sim_output_dir}/timeseries' results_job_json_glob = f'{sim_output_dir}/results_job*.json.gz' diff --git a/buildstockbatch/schemas/v0.2.yaml b/buildstockbatch/schemas/v0.2.yaml index 03c5c3c4..034fe678 100644 --- a/buildstockbatch/schemas/v0.2.yaml +++ b/buildstockbatch/schemas/v0.2.yaml @@ -50,7 +50,8 @@ hpc-spec: hpc-postprocessing-spec: time: int(required=True) n_workers: int(required=False) - memory: enum(85248, 180224, 751616, required=False) + node_memory_mb: enum(85248, 180224, 751616, required=False) + parquet_memory_mb: int(required=False) sampling-spec: time: int(required=True) From f8e955808c36bc528808293fe5fa21471cae7b9b Mon Sep 17 00:00:00 2001 From: Rajendra Adhikari Date: Mon, 15 Mar 2021 11:49:43 -0600 Subject: [PATCH 03/10] Flake8 fixes and add yaml setting for keeping intermediate files --- buildstockbatch/base.py | 6 +++++- buildstockbatch/postprocessing.py | 10 ++++------ buildstockbatch/schemas/v0.2.yaml | 1 + 3 files changed, 10 insertions(+), 7 deletions(-) diff --git a/buildstockbatch/base.py b/buildstockbatch/base.py index ade82507..3eb37e9a 100644 --- a/buildstockbatch/base.py +++ b/buildstockbatch/base.py @@ -784,4 +784,8 @@ def process_results(self, skip_combine=False, force_upload=False): if 'athena' in aws_conf: postprocessing.create_athena_tables(aws_conf, os.path.basename(self.output_dir), s3_bucket, s3_prefix) - postprocessing.remove_intermediate_files(fs, self.results_dir) + if not self.cfg['eagle'].get('postprocessing', {}).get('keep_intermediate_files', 
False): + logger.info("Removing intermediate files.") + postprocessing.remove_intermediate_files(fs, self.results_dir) + else: + logger.info("Skipped removing intermediate files.") diff --git a/buildstockbatch/postprocessing.py b/buildstockbatch/postprocessing.py index dc97462e..67562e5e 100644 --- a/buildstockbatch/postprocessing.py +++ b/buildstockbatch/postprocessing.py @@ -12,7 +12,6 @@ import traceback import boto3 import dask.bag as db -import dask.dataframe as dd from dask.distributed import performance_report import dask import datetime as dt @@ -227,7 +226,7 @@ def read_enduse_timeseries_parquet(fs, filename, all_cols): for col in set(all_cols).difference(df.columns.values): df[col] = np.nan return df[all_cols] - except: + except Exception: return pd.DataFrame(columns=[all_cols]+['building_id']) @@ -239,7 +238,7 @@ def read_and_concat_enduse_timeseries_parquet(fs, all_cols, output_dir, filename grouped_df.to_parquet(output_dir + f'group{group_id}.parquet') return ("success", group_id, len(grouped_df)) del grouped_df - except Exception as exp: + except Exception: return ("fail", group_id, f"Exp: {traceback.format_exc()}") @@ -342,12 +341,12 @@ def combine_results(fs, results_dir, cfg, do_timeseries=True): get_ts_mem_usage_d = dask.delayed(lambda x: read_ts_parquet(x).memory_usage(deep=True).sum()) sample_size = min(len(ts_filenames), 36 * 3) mean_mem = np.mean(dask.compute(map(get_ts_mem_usage_d, random.sample(ts_filenames, sample_size)))[0]) - total_mem = mean_mem * len(ts_filenames) / 1e6 # total_mem in MB + total_mem = mean_mem * len(ts_filenames) / 1e6 # total_mem in MB # Determine how many files should be in each partition and group the files parquet_memory = int(cfg['eagle'].get('postprocessing', {}).get('parquet_memory_mb', MAX_PARQUET_MEMORY)) logger.info(f"Max parquet memory: {parquet_memory} MB") - npartitions = math.ceil(total_mem / parquet_memory) # 1 GB per partition + npartitions = math.ceil(total_mem / parquet_memory) npartitions = min(len(ts_filenames), npartitions) # cannot have less than one file per partition ts_files_in_each_partition = np.array_split(ts_filenames, npartitions) N = len(ts_files_in_each_partition[0]) @@ -379,7 +378,6 @@ def combine_results(fs, results_dir, cfg, do_timeseries=True): def remove_intermediate_files(fs, results_dir): # Remove aggregated files to save space - return sim_output_dir = f'{results_dir}/simulation_output' ts_in_dir = f'{sim_output_dir}/timeseries' results_job_json_glob = f'{sim_output_dir}/results_job*.json.gz' diff --git a/buildstockbatch/schemas/v0.2.yaml b/buildstockbatch/schemas/v0.2.yaml index 034fe678..14865fc9 100644 --- a/buildstockbatch/schemas/v0.2.yaml +++ b/buildstockbatch/schemas/v0.2.yaml @@ -52,6 +52,7 @@ hpc-postprocessing-spec: n_workers: int(required=False) node_memory_mb: enum(85248, 180224, 751616, required=False) parquet_memory_mb: int(required=False) + keep_intermediate_files: bool(required=False) sampling-spec: time: int(required=True) From 6b0665f7282f96e6d58c1e416eabe2aacc5de65a Mon Sep 17 00:00:00 2001 From: Rajendra Adhikari Date: Mon, 15 Mar 2021 12:31:38 -0600 Subject: [PATCH 04/10] Modify tests --- buildstockbatch/test/test_base.py | 12 +++++++----- buildstockbatch/test/test_postprocessing.py | 6 ++++-- 2 files changed, 11 insertions(+), 7 deletions(-) diff --git a/buildstockbatch/test/test_base.py b/buildstockbatch/test/test_base.py index bba55b70..a798f0e2 100644 --- a/buildstockbatch/test/test_base.py +++ b/buildstockbatch/test/test_base.py @@ -12,7 +12,7 @@ import shutil import tempfile 
from unittest.mock import patch, MagicMock - +import buildstockbatch from buildstockbatch.base import BuildStockBatchBase from buildstockbatch.postprocessing import write_dataframe_as_parquet @@ -21,6 +21,8 @@ OUTPUT_FOLDER_NAME = 'output' +buildstockbatch.postprocessing.performance_report = MagicMock() + def test_reference_scenario(basic_residential_project_file): # verify that the reference_scenario get's added to the upgrade file @@ -304,13 +306,13 @@ def test_upload_files(mocked_s3, basic_residential_project_file): assert (source_file_path, s3_file_path) in files_uploaded files_uploaded.remove((source_file_path, s3_file_path)) - s3_file_path = s3_path + 'timeseries/upgrade=0/part.0.parquet' - source_file_path = os.path.join(source_path, 'timeseries', 'upgrade=0', 'part.0.parquet') + s3_file_path = s3_path + 'timeseries/upgrade=0/group0.parquet' + source_file_path = os.path.join(source_path, 'timeseries', 'upgrade=0', 'group0.parquet') assert (source_file_path, s3_file_path) in files_uploaded files_uploaded.remove((source_file_path, s3_file_path)) - s3_file_path = s3_path + 'timeseries/upgrade=1/part.0.parquet' - source_file_path = os.path.join(source_path, 'timeseries', 'upgrade=1', 'part.0.parquet') + s3_file_path = s3_path + 'timeseries/upgrade=1/group0.parquet' + source_file_path = os.path.join(source_path, 'timeseries', 'upgrade=1', 'group0.parquet') assert (source_file_path, s3_file_path) in files_uploaded files_uploaded.remove((source_file_path, s3_file_path)) diff --git a/buildstockbatch/test/test_postprocessing.py b/buildstockbatch/test/test_postprocessing.py index 5a2c131b..9b5daa91 100644 --- a/buildstockbatch/test/test_postprocessing.py +++ b/buildstockbatch/test/test_postprocessing.py @@ -7,10 +7,12 @@ import tarfile import pytest import shutil - +import buildstockbatch from buildstockbatch import postprocessing from buildstockbatch.base import BuildStockBatchBase -from unittest.mock import patch +from unittest.mock import patch, MagicMock + +postprocessing.performance_report = MagicMock() def test_report_additional_results_csv_columns(basic_residential_project_file): From 749346bc1ca5993019c46c74e1340e8338c9f4df Mon Sep 17 00:00:00 2001 From: Noel Merket Date: Tue, 9 Mar 2021 13:45:15 -0700 Subject: [PATCH 05/10] switching back to the circleci python image and installing all deps with only pip --- .circleci/config.yml | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/.circleci/config.yml b/.circleci/config.yml index 3083249f..a1858a9d 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -2,14 +2,13 @@ version: 2 jobs: build: docker: - - image: continuumio/miniconda3 + - image: cimg/python:3.8 steps: - setup_remote_docker - checkout - run: name: Install buildstock command: | - conda install -c conda-forge -y -q scipy numpy "pandas>=1.0.0,!=1.0.4" "pyarrow>=0.14.1" dask joblib pyyaml pip install .[dev] --progress-bar off - run: name: Run PyTest @@ -35,7 +34,6 @@ jobs: name: Build documentation when: always command: | - apt-get install make cd docs make html mkdir /tmp/docs From 72f50e890fe351b6dfc0bbb16106b2b06b58f064 Mon Sep 17 00:00:00 2001 From: Rajendra Adhikari Date: Mon, 15 Mar 2021 14:56:50 -0600 Subject: [PATCH 06/10] remove unused import --- buildstockbatch/test/test_postprocessing.py | 1 - 1 file changed, 1 deletion(-) diff --git a/buildstockbatch/test/test_postprocessing.py b/buildstockbatch/test/test_postprocessing.py index 9b5daa91..ef85119b 100644 --- a/buildstockbatch/test/test_postprocessing.py +++ 
b/buildstockbatch/test/test_postprocessing.py @@ -7,7 +7,6 @@ import tarfile import pytest import shutil -import buildstockbatch from buildstockbatch import postprocessing from buildstockbatch.base import BuildStockBatchBase from unittest.mock import patch, MagicMock From befbcb5c034f7977fadd3f827cd9e7e91fab7799 Mon Sep 17 00:00:00 2001 From: Rajendra Adhikari Date: Tue, 16 Mar 2021 12:14:38 -0600 Subject: [PATCH 07/10] Rename 'memory' to 'node_memory_mb' Co-authored-by: Noel Merket --- buildstockbatch/eagle.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/buildstockbatch/eagle.py b/buildstockbatch/eagle.py index 74f3ac67..2f445ce6 100644 --- a/buildstockbatch/eagle.py +++ b/buildstockbatch/eagle.py @@ -494,7 +494,7 @@ def queue_post_processing(self, after_jobids=[], upload_only=False, hipri=False) # Configuration values account = self.cfg['eagle']['account'] walltime = self.cfg['eagle'].get('postprocessing', {}).get('time', '1:30:00') - memory = self.cfg['eagle'].get('postprocessing', {}).get('memory', 85248) + memory = self.cfg['eagle'].get('postprocessing', {}).get('node_memory_mb', 85248) print(f"Submitting job to {memory}MB memory nodes.") # Throw an error if the files already exist. if not upload_only: From c7e05463bb582d2b8103ec198a01be5f5a4fc610 Mon Sep 17 00:00:00 2001 From: Rajendra Adhikari Date: Tue, 16 Mar 2021 17:02:39 -0600 Subject: [PATCH 08/10] Remove the broad exception and update the documentation --- buildstockbatch/postprocessing.py | 47 ++++++++++++++----------------- docs/project_defn.rst | 8 +++++- 2 files changed, 28 insertions(+), 27 deletions(-) diff --git a/buildstockbatch/postprocessing.py b/buildstockbatch/postprocessing.py index 67562e5e..a082a2f2 100644 --- a/buildstockbatch/postprocessing.py +++ b/buildstockbatch/postprocessing.py @@ -9,7 +9,6 @@ :copyright: (c) 2018 by The Alliance for Sustainable Energy :license: BSD-3 """ -import traceback import boto3 import dask.bag as db from dask.distributed import performance_report @@ -218,28 +217,21 @@ def read_results_json(fs, filename): def read_enduse_timeseries_parquet(fs, filename, all_cols): - try: - with fs.open(filename, 'rb') as f: - df = pd.read_parquet(f, engine='pyarrow') - building_id = int(re.search(r'bldg(\d+).parquet', filename).group(1)) - df['building_id'] = building_id - for col in set(all_cols).difference(df.columns.values): - df[col] = np.nan - return df[all_cols] - except Exception: - return pd.DataFrame(columns=[all_cols]+['building_id']) + with fs.open(filename, 'rb') as f: + df = pd.read_parquet(f, engine='pyarrow') + building_id = int(re.search(r'bldg(\d+).parquet', filename).group(1)) + df['building_id'] = building_id + for col in set(all_cols).difference(df.columns.values): + df[col] = np.nan + return df[all_cols] def read_and_concat_enduse_timeseries_parquet(fs, all_cols, output_dir, filenames, group_id): - try: - dfs = [read_enduse_timeseries_parquet(fs, filename, all_cols) for filename in filenames] - grouped_df = pd.concat(dfs) - grouped_df.set_index('building_id', inplace=True) - grouped_df.to_parquet(output_dir + f'group{group_id}.parquet') - return ("success", group_id, len(grouped_df)) - del grouped_df - except Exception: - return ("fail", group_id, f"Exp: {traceback.format_exc()}") + dfs = [read_enduse_timeseries_parquet(fs, filename, all_cols) for filename in filenames] + grouped_df = pd.concat(dfs) + grouped_df.set_index('building_id', inplace=True) + grouped_df.to_parquet(output_dir + f'group{group_id}.parquet') + del grouped_df def combine_results(fs, 
results_dir, cfg, do_timeseries=True):
@@ -268,6 +260,7 @@ def combine_results(fs, results_dir, cfg, do_timeseries=True):
         fs.makedirs(dr)
 
     # Results "CSV"
+    logger.info("Creating results_df.")
     results_job_json_glob = f'{sim_output_dir}/results_job*.json.gz'
     results_jsons = fs.glob(results_job_json_glob)
     results_json_job_ids = [int(re.search(r'results_job(\d+)\.json\.gz', x).group(1)) for x in results_jsons]
@@ -288,6 +281,7 @@ def combine_results(fs, results_dir, cfg, do_timeseries=True):
 
     if do_timeseries:
         # Look at all the parquet files to see what columns are in all of them.
+        logger.info("Collecting all the columns in timeseries parquet files.")
         ts_filenames = fs.glob(f'{ts_in_dir}/up*/bldg*.parquet')
         all_ts_cols = db.from_sequence(ts_filenames, partition_size=100).map(partial(get_cols, fs)).\
             fold(lambda x, y: set(x).union(y)).compute()
@@ -336,6 +330,10 @@ def combine_results(fs, results_dir, cfg, do_timeseries=True):
         # Get the names of the timseries file for each simulation in this upgrade
         ts_filenames = fs.glob(f'{ts_in_dir}/up{upgrade_id:02d}/bldg*.parquet')
 
+        if not ts_filenames:
+            logger.info(f"There are no timeseries files for upgrade{upgrade_id}.")
+            continue
+
         # Calculate the mean and estimate the total memory usage
         read_ts_parquet = partial(read_enduse_timeseries_parquet, fs, all_cols=all_ts_cols_sorted)
         get_ts_mem_usage_d = dask.delayed(lambda x: read_ts_parquet(x).memory_usage(deep=True).sum())
@@ -367,13 +365,10 @@ def combine_results(fs, results_dir, cfg, do_timeseries=True):
             partial(read_and_concat_enduse_timeseries_parquet, fs, all_ts_cols_sorted, ts_out_loc)
         )
         group_ids = list(range(npartitions))
-        with performance_report(filename='dask_combine_report.html'):
-            results = dask.compute(map(read_and_concat_ts_pq_d, ts_files_in_each_partition, group_ids))[0]
+        with performance_report(filename=f'dask_combine_report{upgrade_id}.html'):
+            dask.compute(map(read_and_concat_ts_pq_d, ts_files_in_each_partition, group_ids))[0]
 
-        logger.info(f"Finished combining and saving. First 10 results: {results[:10]}")
-        failed_results = [r for r in results if r[0] == 'fail']
-        if failed_results:
-            logger.info(f"{len(failed_results)} groupings have failed. First 10 failed results: {results[:10]}")
+        logger.info(f"Finished combining and saving timeseries for upgrade{upgrade_id}.")
 
 
 def remove_intermediate_files(fs, results_dir):
diff --git a/docs/project_defn.rst b/docs/project_defn.rst
index cf7af043..9afb4ae7 100644
--- a/docs/project_defn.rst
+++ b/docs/project_defn.rst
@@ -312,7 +312,13 @@ the Eagle supercomputer.
   * ``postprocessing``: Eagle configuration for the postprocessing step
 
     * ``time``: Maximum time in minutes to allocate postprocessing job
-    * ``n_workers``: Number of eagle workers to parallelize the postprocessing job into
+    * ``n_workers``: Number of Eagle workers to parallelize the postprocessing job into. The maximum supported is 32.
+    * ``node_memory_mb``: The memory (in MB) to request for each Eagle postprocessing node. The valid values are
+      85248, 180224 and 751616. Default is 85248.
+    * ``parquet_memory_mb``: The maximum size (in MB) of each combined parquet file in memory. Default is 4000.
+    * ``keep_intermediate_files``: Set this to true to keep the postprocessing intermediate files (for debugging
+      or other exploratory purposes). The intermediate files are the results_job*.json.gz
+      files and the individual buildings' timeseries parquet files. Default is false.
 
 .. _aws-config:

From 564e089c848598c4bc519229ed1a7c31d56bae26 Mon Sep 17 00:00:00 2001
From: Rajendra Adhikari
Date: Tue, 16 Mar 2021 17:05:10 -0600
Subject: [PATCH 09/10] Style fix

---
 buildstockbatch/postprocessing.py | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/buildstockbatch/postprocessing.py b/buildstockbatch/postprocessing.py
index a082a2f2..b0ed2011 100644
--- a/buildstockbatch/postprocessing.py
+++ b/buildstockbatch/postprocessing.py
@@ -347,10 +347,9 @@ def combine_results(fs, results_dir, cfg, do_timeseries=True):
         npartitions = math.ceil(total_mem / parquet_memory)
         npartitions = min(len(ts_filenames), npartitions)  # cannot have less than one file per partition
         ts_files_in_each_partition = np.array_split(ts_filenames, npartitions)
-        N = len(ts_files_in_each_partition[0])
-        group_ids = list(range(npartitions))
 
-        logger.info(f"Combining about {N} parquets together. Creating {npartitions} groups.")
+        logger.info(f"Combining about {len(ts_files_in_each_partition[0])} parquets together."
+                    f" Creating {npartitions} groups.")
         if isinstance(fs, LocalFileSystem):
             ts_out_loc = f"{ts_dir}/upgrade={upgrade_id}/"
         else:
@@ -359,6 +358,7 @@ def combine_results(fs, results_dir, cfg, do_timeseries=True):
 
         fs.makedirs(ts_out_loc)
         logger.info(f'Created directory {ts_out_loc} for writing.')
+
         # Read the timeseries into a dask dataframe
         read_and_concat_ts_pq_d = dask.delayed(
             # fs, all_cols, output_dir, filenames, group_id
@@ -366,7 +366,7 @@ def combine_results(fs, results_dir, cfg, do_timeseries=True):
         )
         group_ids = list(range(npartitions))
         with performance_report(filename=f'dask_combine_report{upgrade_id}.html'):
-            dask.compute(map(read_and_concat_ts_pq_d, ts_files_in_each_partition, group_ids))[0]
+            dask.compute(map(read_and_concat_ts_pq_d, ts_files_in_each_partition, group_ids))
 
         logger.info(f"Finished combining and saving timeseries for upgrade{upgrade_id}.")
 

From ec365a6b58580b0361bc2c4d798fa42f0cd18c30 Mon Sep 17 00:00:00 2001
From: Rajendra Adhikari
Date: Tue, 16 Mar 2021 17:37:08 -0600
Subject: [PATCH 10/10] Update changelog for v0.19_patch

---
 docs/changelog/changelog_dev.rst | 8 ++++++++
 1 file changed, 8 insertions(+)

diff --git a/docs/changelog/changelog_dev.rst b/docs/changelog/changelog_dev.rst
index 75eeab3d..d40e5efd 100644
--- a/docs/changelog/changelog_dev.rst
+++ b/docs/changelog/changelog_dev.rst
@@ -14,3 +14,11 @@ Development Changelog
         This is an example change. Please copy and paste it - for valid tags please refer to ``conf.py`` in the
         docs directory. ``pullreq`` should be set to the appropriate pull request number and ``tickets`` to any
         related github issues. These will be automatically linked in the documentation.
+
+    .. change::
+        :tags: postprocessing
+        :pullreq: 212
+        :tickets:
+
+        Combine timeseries parquet files by mapping a dask delayed function over groups of files instead of building one giant dask dataframe.
+        Default to the 85 GB memory nodes on Eagle, with a single worker process and a single thread on each node, to avoid memory issues.
\ No newline at end of file
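
To illustrate the core idea behind this series — combining per-building parquet files by mapping a dask delayed function over bounded groups of files rather than materializing one large dask dataframe — here is a minimal, self-contained sketch. It is not code from buildstockbatch: the names (``combine_group``, ``file_groups``) and the synthetic data are hypothetical, it omits the filesystem abstraction and column alignment that the real ``read_and_concat_enduse_timeseries_parquet`` handles, and it assumes pandas, numpy, dask and pyarrow are installed::

    import os
    import tempfile

    import dask
    import numpy as np
    import pandas as pd


    def combine_group(filenames, group_id, output_dir):
        # Read one bounded group of per-building parquet files, concatenate them,
        # and write a single combined file named like the patches do (group<N>.parquet).
        dfs = [pd.read_parquet(f) for f in filenames]
        grouped = pd.concat(dfs).set_index('building_id')
        grouped.to_parquet(os.path.join(output_dir, f'group{group_id}.parquet'))
        return 'success', group_id, len(grouped)


    if __name__ == '__main__':
        with tempfile.TemporaryDirectory() as tmp:
            # Fabricate a few tiny per-building timeseries files purely for illustration.
            filenames = []
            for bldg_id in range(6):
                path = os.path.join(tmp, f'bldg{bldg_id:07d}.parquet')
                pd.DataFrame({'building_id': bldg_id, 'kwh': np.random.rand(4)}).to_parquet(path)
                filenames.append(path)

            # Split the files into groups and map a delayed function over the groups.
            # Each task holds only one group in memory at a time, bounding peak usage.
            file_groups = np.array_split(filenames, 2)
            combine_delayed = dask.delayed(combine_group)
            tasks = [combine_delayed(list(group), i, tmp) for i, group in enumerate(file_groups)]
            print(dask.compute(*tasks))

Because each delayed task reads, concatenates, and writes only one group before releasing it, peak memory stays near the size of a single group per worker, which is what lets the series default to the standard 85 GB Eagle nodes with a single-process, single-thread dask worker on each node.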