diff --git a/.circleci/config.yml b/.circleci/config.yml
index 3083249f..a1858a9d 100644
--- a/.circleci/config.yml
+++ b/.circleci/config.yml
@@ -2,14 +2,13 @@ version: 2
 jobs:
   build:
     docker:
-      - image: continuumio/miniconda3
+      - image: cimg/python:3.8
     steps:
       - setup_remote_docker
       - checkout
       - run:
           name: Install buildstock
           command: |
-            conda install -c conda-forge -y -q scipy numpy "pandas>=1.0.0,!=1.0.4" "pyarrow>=0.14.1" dask joblib pyyaml
             pip install .[dev] --progress-bar off
       - run:
           name: Run PyTest
@@ -35,7 +34,6 @@ jobs:
           name: Build documentation
           when: always
           command: |
-            apt-get install make
             cd docs
             make html
             mkdir /tmp/docs
diff --git a/buildstockbatch/base.py b/buildstockbatch/base.py
index ade82507..3eb37e9a 100644
--- a/buildstockbatch/base.py
+++ b/buildstockbatch/base.py
@@ -784,4 +784,8 @@ def process_results(self, skip_combine=False, force_upload=False):
             if 'athena' in aws_conf:
                 postprocessing.create_athena_tables(aws_conf, os.path.basename(self.output_dir),
                                                     s3_bucket, s3_prefix)
-        postprocessing.remove_intermediate_files(fs, self.results_dir)
+        if not self.cfg['eagle'].get('postprocessing', {}).get('keep_intermediate_files', False):
+            logger.info("Removing intermediate files.")
+            postprocessing.remove_intermediate_files(fs, self.results_dir)
+        else:
+            logger.info("Skipped removing intermediate files.")
diff --git a/buildstockbatch/eagle.py b/buildstockbatch/eagle.py
index 320cee14..2f445ce6 100644
--- a/buildstockbatch/eagle.py
+++ b/buildstockbatch/eagle.py
@@ -494,7 +494,8 @@ def queue_post_processing(self, after_jobids=[], upload_only=False, hipri=False)
         # Configuration values
         account = self.cfg['eagle']['account']
         walltime = self.cfg['eagle'].get('postprocessing', {}).get('time', '1:30:00')
-
+        memory = self.cfg['eagle'].get('postprocessing', {}).get('node_memory_mb', 85248)
+        print(f"Submitting job to {memory}MB memory nodes.")
         # Throw an error if the files already exist.
         if not upload_only:
             for subdir in ('parquet', 'results_csvs'):
@@ -518,6 +519,7 @@ def queue_post_processing(self, after_jobids=[], upload_only=False, hipri=False)
         env['MY_CONDA_ENV'] = os.environ['CONDA_PREFIX']
         env['OUT_DIR'] = self.output_dir
         env['UPLOADONLY'] = str(upload_only)
+        env['MEMORY'] = str(memory)
 
         here = os.path.dirname(os.path.abspath(__file__))
         eagle_post_sh = os.path.join(here, 'eagle_postprocessing.sh')
@@ -525,12 +527,12 @@ def queue_post_processing(self, after_jobids=[], upload_only=False, hipri=False)
             'sbatch',
             '--account={}'.format(account),
             '--time={}'.format(walltime),
-            '--export=PROJECTFILE,MY_CONDA_ENV,OUT_DIR,UPLOADONLY',
+            '--export=PROJECTFILE,MY_CONDA_ENV,OUT_DIR,UPLOADONLY,MEMORY',
             '--job-name=bstkpost',
             '--output=postprocessing.out',
             '--nodes=1',
             ':',
-            '--mem=180000',
+            '--mem={}'.format(memory),
             '--output=dask_workers.out',
             '--nodes={}'.format(self.cfg['eagle'].get('postprocessing', {}).get('n_workers', 2)),
             eagle_post_sh
@@ -599,7 +601,6 @@ def user_cli(argv=sys.argv[1:]):
     '''
     This is the user entry point for running buildstockbatch on Eagle
     '''
-
     # set up logging, currently based on within-this-file hard-coded config
     logging.config.dictConfig(logging_config)
 
diff --git a/buildstockbatch/eagle_postprocessing.sh b/buildstockbatch/eagle_postprocessing.sh
index 1d8f2576..ffee2689 100644
--- a/buildstockbatch/eagle_postprocessing.sh
+++ b/buildstockbatch/eagle_postprocessing.sh
@@ -1,5 +1,4 @@
 #!/bin/bash
-
 echo "begin eagle_postprocessing.sh"
 
 echo "Job ID: $SLURM_JOB_ID"
@@ -11,6 +10,7 @@ source activate "$MY_CONDA_ENV"
 export POSTPROCESS=1
 
 echo "UPLOADONLY: ${UPLOADONLY}"
+echo "MEMORY: ${MEMORY}"
 
 SCHEDULER_FILE=$OUT_DIR/dask_scheduler.json
 
@@ -22,6 +22,6 @@ echo $SLURM_JOB_NODELIST_PACK_GROUP_1
 pdsh -w $SLURM_JOB_NODELIST_PACK_GROUP_1 "free -h"
 
 $MY_CONDA_ENV/bin/dask-scheduler --scheduler-file $SCHEDULER_FILE &> $OUT_DIR/dask_scheduler.out &
-pdsh -w $SLURM_JOB_NODELIST_PACK_GROUP_1 "$MY_CONDA_ENV/bin/dask-worker --scheduler-file $SCHEDULER_FILE --local-directory /tmp/scratch/dask --nprocs 9" &> $OUT_DIR/dask_workers.out &
+pdsh -w $SLURM_JOB_NODELIST_PACK_GROUP_1 "$MY_CONDA_ENV/bin/dask-worker --scheduler-file $SCHEDULER_FILE --local-directory /tmp/scratch/dask --nprocs 1 --nthreads 1 --memory-limit ${MEMORY}MB" &> $OUT_DIR/dask_workers.out &
 
 time python -u -m buildstockbatch.eagle "$PROJECTFILE"
diff --git a/buildstockbatch/postprocessing.py b/buildstockbatch/postprocessing.py
index 95f0c503..b0ed2011 100644
--- a/buildstockbatch/postprocessing.py
+++ b/buildstockbatch/postprocessing.py
@@ -9,10 +9,9 @@
 :copyright: (c) 2018 by The Alliance for Sustainable Energy
 :license: BSD-3
 """
-
 import boto3
 import dask.bag as db
-import dask.dataframe as dd
+from dask.distributed import performance_report
 import dask
 import datetime as dt
 from fsspec.implementations.local import LocalFileSystem
@@ -34,7 +33,7 @@
 
 logger = logging.getLogger(__name__)
 
-MAX_PARQUET_MEMORY = 1e9  # maximum size of the parquet file in memory when combining multiple parquets
+MAX_PARQUET_MEMORY = 4000  # maximum size (MB) of the parquet file in memory when combining multiple parquets
 
 
 def read_data_point_out_json(fs, reporting_measures, filename):
@@ -227,8 +226,12 @@ def read_enduse_timeseries_parquet(fs, filename, all_cols):
     return df[all_cols]
 
 
-def read_and_concat_enduse_timeseries_parquet(fs, filenames, all_cols):
-    return pd.concat(read_enduse_timeseries_parquet(fs, filename, all_cols) for filename in filenames)
+def read_and_concat_enduse_timeseries_parquet(fs, all_cols, output_dir, filenames, group_id):
+    dfs = [read_enduse_timeseries_parquet(fs, filename, all_cols) for filename in filenames]
+    grouped_df = pd.concat(dfs)
+    grouped_df.set_index('building_id', inplace=True)
+    grouped_df.to_parquet(output_dir + f'group{group_id}.parquet')
+    del grouped_df
 
 
 def combine_results(fs, results_dir, cfg, do_timeseries=True):
@@ -257,6 +260,7 @@ def combine_results(fs, results_dir, cfg, do_timeseries=True):
         fs.makedirs(dr)
 
     # Results "CSV"
+    logger.info("Creating results_df.")
     results_job_json_glob = f'{sim_output_dir}/results_job*.json.gz'
     results_jsons = fs.glob(results_job_json_glob)
     results_json_job_ids = [int(re.search(r'results_job(\d+)\.json\.gz', x).group(1)) for x in results_jsons]
@@ -277,6 +281,7 @@ def combine_results(fs, results_dir, cfg, do_timeseries=True):
 
     if do_timeseries:
         # Look at all the parquet files to see what columns are in all of them.
+        logger.info("Collecting all the columns in timeseries parquet files.")
         ts_filenames = fs.glob(f'{ts_in_dir}/up*/bldg*.parquet')
         all_ts_cols = db.from_sequence(ts_filenames, partition_size=100).map(partial(get_cols, fs)).\
             fold(lambda x, y: set(x).union(y)).compute()
@@ -325,37 +330,45 @@ def combine_results(fs, results_dir, cfg, do_timeseries=True):
             # Get the names of the timseries file for each simulation in this upgrade
             ts_filenames = fs.glob(f'{ts_in_dir}/up{upgrade_id:02d}/bldg*.parquet')
 
+            if not ts_filenames:
+                logger.info(f"There are no timeseries files for upgrade{upgrade_id}.")
+                continue
+
             # Calculate the mean and estimate the total memory usage
             read_ts_parquet = partial(read_enduse_timeseries_parquet, fs, all_cols=all_ts_cols_sorted)
             get_ts_mem_usage_d = dask.delayed(lambda x: read_ts_parquet(x).memory_usage(deep=True).sum())
             sample_size = min(len(ts_filenames), 36 * 3)
             mean_mem = np.mean(dask.compute(map(get_ts_mem_usage_d, random.sample(ts_filenames, sample_size)))[0])
-            total_mem = mean_mem * len(ts_filenames)
+            total_mem = mean_mem * len(ts_filenames) / 1e6  # total_mem in MB
 
             # Determine how many files should be in each partition and group the files
-            npartitions = math.ceil(total_mem / MAX_PARQUET_MEMORY)  # 1 GB per partition
+            parquet_memory = int(cfg['eagle'].get('postprocessing', {}).get('parquet_memory_mb', MAX_PARQUET_MEMORY))
+            logger.info(f"Max parquet memory: {parquet_memory} MB")
+            npartitions = math.ceil(total_mem / parquet_memory)
             npartitions = min(len(ts_filenames), npartitions)  # cannot have less than one file per partition
             ts_files_in_each_partition = np.array_split(ts_filenames, npartitions)
 
-            # Read the timeseries into a dask dataframe
-            read_and_concat_ts_pq_d = dask.delayed(
-                partial(read_and_concat_enduse_timeseries_parquet, fs, all_cols=all_ts_cols_sorted)
-            )
-            ts_df = dd.from_delayed(map(read_and_concat_ts_pq_d, ts_files_in_each_partition))
-            ts_df = ts_df.set_index('building_id', sorted=True)
-
-            # Write out new dask timeseries dataframe.
+            logger.info(f"Combining about {len(ts_files_in_each_partition[0])} parquets together."
+ f" Creating {npartitions} groups.") if isinstance(fs, LocalFileSystem): - ts_out_loc = f"{ts_dir}/upgrade={upgrade_id}" + ts_out_loc = f"{ts_dir}/upgrade={upgrade_id}/" else: assert isinstance(fs, S3FileSystem) - ts_out_loc = f"s3://{ts_dir}/upgrade={upgrade_id}" - logger.info(f'Writing {ts_out_loc}') - ts_df.to_parquet( - ts_out_loc, - engine='pyarrow', - flavor='spark' + ts_out_loc = f"s3://{ts_dir}/upgrade={upgrade_id}/" + + fs.makedirs(ts_out_loc) + logger.info(f'Created directory {ts_out_loc} for writing.') + + # Read the timeseries into a dask dataframe + read_and_concat_ts_pq_d = dask.delayed( + # fs, all_cols, output_dir, filenames, group_id + partial(read_and_concat_enduse_timeseries_parquet, fs, all_ts_cols_sorted, ts_out_loc) ) + group_ids = list(range(npartitions)) + with performance_report(filename=f'dask_combine_report{upgrade_id}.html'): + dask.compute(map(read_and_concat_ts_pq_d, ts_files_in_each_partition, group_ids)) + + logger.info(f"Finished combining and saving timeseries for upgrade{upgrade_id}.") def remove_intermediate_files(fs, results_dir): diff --git a/buildstockbatch/schemas/v0.2.yaml b/buildstockbatch/schemas/v0.2.yaml index a3745d69..14865fc9 100644 --- a/buildstockbatch/schemas/v0.2.yaml +++ b/buildstockbatch/schemas/v0.2.yaml @@ -50,6 +50,9 @@ hpc-spec: hpc-postprocessing-spec: time: int(required=True) n_workers: int(required=False) + node_memory_mb: enum(85248, 180224, 751616, required=False) + parquet_memory_mb: int(required=False) + keep_intermediate_files: bool(required=False) sampling-spec: time: int(required=True) diff --git a/buildstockbatch/test/test_base.py b/buildstockbatch/test/test_base.py index bba55b70..a798f0e2 100644 --- a/buildstockbatch/test/test_base.py +++ b/buildstockbatch/test/test_base.py @@ -12,7 +12,7 @@ import shutil import tempfile from unittest.mock import patch, MagicMock - +import buildstockbatch from buildstockbatch.base import BuildStockBatchBase from buildstockbatch.postprocessing import write_dataframe_as_parquet @@ -21,6 +21,8 @@ OUTPUT_FOLDER_NAME = 'output' +buildstockbatch.postprocessing.performance_report = MagicMock() + def test_reference_scenario(basic_residential_project_file): # verify that the reference_scenario get's added to the upgrade file @@ -304,13 +306,13 @@ def test_upload_files(mocked_s3, basic_residential_project_file): assert (source_file_path, s3_file_path) in files_uploaded files_uploaded.remove((source_file_path, s3_file_path)) - s3_file_path = s3_path + 'timeseries/upgrade=0/part.0.parquet' - source_file_path = os.path.join(source_path, 'timeseries', 'upgrade=0', 'part.0.parquet') + s3_file_path = s3_path + 'timeseries/upgrade=0/group0.parquet' + source_file_path = os.path.join(source_path, 'timeseries', 'upgrade=0', 'group0.parquet') assert (source_file_path, s3_file_path) in files_uploaded files_uploaded.remove((source_file_path, s3_file_path)) - s3_file_path = s3_path + 'timeseries/upgrade=1/part.0.parquet' - source_file_path = os.path.join(source_path, 'timeseries', 'upgrade=1', 'part.0.parquet') + s3_file_path = s3_path + 'timeseries/upgrade=1/group0.parquet' + source_file_path = os.path.join(source_path, 'timeseries', 'upgrade=1', 'group0.parquet') assert (source_file_path, s3_file_path) in files_uploaded files_uploaded.remove((source_file_path, s3_file_path)) diff --git a/buildstockbatch/test/test_postprocessing.py b/buildstockbatch/test/test_postprocessing.py index 5a2c131b..ef85119b 100644 --- a/buildstockbatch/test/test_postprocessing.py +++ b/buildstockbatch/test/test_postprocessing.py 
@@ -7,10 +7,11 @@
 import tarfile
 import pytest
 import shutil
-
 from buildstockbatch import postprocessing
 from buildstockbatch.base import BuildStockBatchBase
-from unittest.mock import patch
+from unittest.mock import patch, MagicMock
+
+postprocessing.performance_report = MagicMock()
 
 
 def test_report_additional_results_csv_columns(basic_residential_project_file):
diff --git a/docs/changelog/changelog_dev.rst b/docs/changelog/changelog_dev.rst
index 75eeab3d..d40e5efd 100644
--- a/docs/changelog/changelog_dev.rst
+++ b/docs/changelog/changelog_dev.rst
@@ -14,3 +14,11 @@ Development Changelog
         This is an example change. Please copy and paste it - for valid tags please refer to ``conf.py`` in the docs
         directory. ``pullreq`` should be set to the appropriate pull request number and ``tickets`` to any related
         github issues. These will be automatically linked in the documentation.
+
+    .. change::
+        :tags: postprocessing
+        :pullreq: 212
+        :tickets:
+
+        Use a map of dask delayed functions to combine parquets instead of a giant dask dataframe to avoid memory issues.
+        Default to 85 GB memory nodes on Eagle, with a single process and a single thread on each worker node.
\ No newline at end of file
diff --git a/docs/project_defn.rst b/docs/project_defn.rst
index cf7af043..9afb4ae7 100644
--- a/docs/project_defn.rst
+++ b/docs/project_defn.rst
@@ -312,7 +312,13 @@ the Eagle supercomputer.
 * ``postprocessing``: Eagle configuration for the postprocessing step
 
   * ``time``: Maximum time in minutes to allocate postprocessing job
-  * ``n_workers``: Number of eagle workers to parallelize the postprocessing job into
+  * ``n_workers``: Number of Eagle worker nodes to parallelize the postprocessing job across. The maximum supported is 32.
+  * ``node_memory_mb``: The memory (in MB) to request for each Eagle node used for postprocessing. The valid values
+    are 85248, 180224, and 751616. Default is 85248.
+  * ``parquet_memory_mb``: The target size (in MB) of each combined parquet file in memory. Default is 4000.
+  * ``keep_intermediate_files``: Set this to true to keep the postprocessing intermediate files (for debugging
+    or other exploratory purposes). The intermediate files include the results_job*.json.gz files and each
+    building's individual timeseries parquet files. Default is false.
 
 
 .. _aws-config:
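For reference, a minimal sketch of how the new ``eagle.postprocessing`` options introduced by this patch might look in a project file. The ``account`` value and the specific numbers below are illustrative placeholders; only the option names, the allowed ``node_memory_mb`` choices, and the stated defaults come from the schema and documentation changes above.

    eagle:
      account: my_allocation           # placeholder Eagle allocation name
      postprocessing:
        time: 90                       # minutes to allocate to the postprocessing job
        n_workers: 4                   # dask worker nodes (default 2, max 32)
        node_memory_mb: 180224         # one of 85248, 180224, 751616 (default 85248)
        parquet_memory_mb: 4000        # target in-memory size (MB) of each combined parquet file (default 4000)
        keep_intermediate_files: true  # keep results_job*.json.gz and per-building timeseries parquet files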