memory_issue_fix #212

Merged (10 commits, Mar 22, 2021)
4 changes: 1 addition & 3 deletions .circleci/config.yml
@@ -2,14 +2,13 @@ version: 2
jobs:
build:
docker:
- image: continuumio/miniconda3
- image: cimg/python:3.8
steps:
- setup_remote_docker
- checkout
- run:
name: Install buildstock
command: |
conda install -c conda-forge -y -q scipy numpy "pandas>=1.0.0,!=1.0.4" "pyarrow>=0.14.1" dask joblib pyyaml
pip install .[dev] --progress-bar off
- run:
name: Run PyTest
@@ -35,7 +34,6 @@ jobs:
name: Build documentation
when: always
command: |
apt-get install make
cd docs
make html
mkdir /tmp/docs
6 changes: 5 additions & 1 deletion buildstockbatch/base.py
@@ -784,4 +784,8 @@ def process_results(self, skip_combine=False, force_upload=False):
if 'athena' in aws_conf:
postprocessing.create_athena_tables(aws_conf, os.path.basename(self.output_dir), s3_bucket, s3_prefix)

postprocessing.remove_intermediate_files(fs, self.results_dir)
if not self.cfg['eagle'].get('postprocessing', {}).get('keep_intermediate_files', False):
logger.info("Removing intermediate files.")
postprocessing.remove_intermediate_files(fs, self.results_dir)
else:
logger.info("Skipped removing intermediate files.")
9 changes: 5 additions & 4 deletions buildstockbatch/eagle.py
@@ -494,7 +494,8 @@ def queue_post_processing(self, after_jobids=[], upload_only=False, hipri=False)
# Configuration values
account = self.cfg['eagle']['account']
walltime = self.cfg['eagle'].get('postprocessing', {}).get('time', '1:30:00')

memory = self.cfg['eagle'].get('postprocessing', {}).get('memory', 85248)
print(f"Submitting job to {memory}MB memory nodes.")
# Throw an error if the files already exist.
if not upload_only:
for subdir in ('parquet', 'results_csvs'):
@@ -518,19 +519,20 @@ def queue_post_processing(self, after_jobids=[], upload_only=False, hipri=False)
env['MY_CONDA_ENV'] = os.environ['CONDA_PREFIX']
env['OUT_DIR'] = self.output_dir
env['UPLOADONLY'] = str(upload_only)
env['MEMORY'] = str(memory)
here = os.path.dirname(os.path.abspath(__file__))
eagle_post_sh = os.path.join(here, 'eagle_postprocessing.sh')

args = [
'sbatch',
'--account={}'.format(account),
'--time={}'.format(walltime),
'--export=PROJECTFILE,MY_CONDA_ENV,OUT_DIR,UPLOADONLY',
'--export=PROJECTFILE,MY_CONDA_ENV,OUT_DIR,UPLOADONLY,MEMORY',
'--job-name=bstkpost',
'--output=postprocessing.out',
'--nodes=1',
':',
'--mem=180000',
'--mem={}'.format(memory),
'--output=dask_workers.out',
'--nodes={}'.format(self.cfg['eagle'].get('postprocessing', {}).get('n_workers', 2)),
eagle_post_sh
@@ -599,7 +601,6 @@ def user_cli(argv=sys.argv[1:]):
'''
This is the user entry point for running buildstockbatch on Eagle
'''

# set up logging, currently based on within-this-file hard-coded config
logging.config.dictConfig(logging_config)

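For readability, here is a hedged sketch of what the args list in queue_post_processing above resolves to, with hypothetical values (account 'myproj', the default walltime, the default 85248 MB memory, and two workers); the ':' splits the request into two heterogeneous Slurm job components:

args = [
    'sbatch',
    '--account=myproj',
    '--time=1:30:00',
    '--export=PROJECTFILE,MY_CONDA_ENV,OUT_DIR,UPLOADONLY,MEMORY',
    '--job-name=bstkpost',
    '--output=postprocessing.out',
    '--nodes=1',           # first component: runs eagle_postprocessing.sh and the dask scheduler
    ':',                   # separator between the two heterogeneous job components
    '--mem=85248',         # second component: per-node memory for the dask worker nodes
    '--output=dask_workers.out',
    '--nodes=2',
    'eagle_postprocessing.sh',
]

The second component's nodes are the ones eagle_postprocessing.sh addresses as $SLURM_JOB_NODELIST_PACK_GROUP_1 below.
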
4 changes: 2 additions & 2 deletions buildstockbatch/eagle_postprocessing.sh
@@ -1,5 +1,4 @@
#!/bin/bash

echo "begin eagle_postprocessing.sh"

echo "Job ID: $SLURM_JOB_ID"
@@ -11,6 +10,7 @@ source activate "$MY_CONDA_ENV"
export POSTPROCESS=1

echo "UPLOADONLY: ${UPLOADONLY}"
echo "MEMORY: ${MEMORY}"

SCHEDULER_FILE=$OUT_DIR/dask_scheduler.json

@@ -22,6 +22,6 @@ echo $SLURM_JOB_NODELIST_PACK_GROUP_1
pdsh -w $SLURM_JOB_NODELIST_PACK_GROUP_1 "free -h"

$MY_CONDA_ENV/bin/dask-scheduler --scheduler-file $SCHEDULER_FILE &> $OUT_DIR/dask_scheduler.out &
pdsh -w $SLURM_JOB_NODELIST_PACK_GROUP_1 "$MY_CONDA_ENV/bin/dask-worker --scheduler-file $SCHEDULER_FILE --local-directory /tmp/scratch/dask --nprocs 9" &> $OUT_DIR/dask_workers.out &
pdsh -w $SLURM_JOB_NODELIST_PACK_GROUP_1 "$MY_CONDA_ENV/bin/dask-worker --scheduler-file $SCHEDULER_FILE --local-directory /tmp/scratch/dask --nprocs 1 --nthreads 1 --memory-limit ${MEMORY}MB" &> $OUT_DIR/dask_workers.out &

time python -u -m buildstockbatch.eagle "$PROJECTFILE"
80 changes: 49 additions & 31 deletions buildstockbatch/postprocessing.py
@@ -9,10 +9,10 @@
:copyright: (c) 2018 by The Alliance for Sustainable Energy
:license: BSD-3
"""

import traceback
import boto3
import dask.bag as db
import dask.dataframe as dd
from dask.distributed import performance_report
import dask
import datetime as dt
from fsspec.implementations.local import LocalFileSystem
@@ -34,7 +34,7 @@

logger = logging.getLogger(__name__)

MAX_PARQUET_MEMORY = 1e9 # maximum size of the parquet file in memory when combining multiple parquets
MAX_PARQUET_MEMORY = 4000 # maximum size (MB) of the parquet file in memory when combining multiple parquets


def read_data_point_out_json(fs, reporting_measures, filename):
@@ -218,17 +218,28 @@ def read_results_json(fs, filename):


def read_enduse_timeseries_parquet(fs, filename, all_cols):
with fs.open(filename, 'rb') as f:
df = pd.read_parquet(f, engine='pyarrow')
building_id = int(re.search(r'bldg(\d+).parquet', filename).group(1))
df['building_id'] = building_id
for col in set(all_cols).difference(df.columns.values):
df[col] = np.nan
return df[all_cols]


def read_and_concat_enduse_timeseries_parquet(fs, filenames, all_cols):
return pd.concat(read_enduse_timeseries_parquet(fs, filename, all_cols) for filename in filenames)
try:
with fs.open(filename, 'rb') as f:
df = pd.read_parquet(f, engine='pyarrow')
building_id = int(re.search(r'bldg(\d+).parquet', filename).group(1))
df['building_id'] = building_id
for col in set(all_cols).difference(df.columns.values):
df[col] = np.nan
return df[all_cols]
except Exception:
return pd.DataFrame(columns=[all_cols]+['building_id'])

Member:
This is a pretty broad except block. It looks like it just returns an empty dataframe with the appropriate columns if anything goes wrong. Intentionally creating these kinds of silent errors is troubling because it means that there could be major problems with reading some of the parquet files and the user will not know and think everything is fine.

Contributor Author:
Yeah, agreed it could mask errors. But I think a few times there was an issue reading a handful of parquet files that resulted in crashing everything, so I thought it might be a good idea to assemble whatever can be assembled. Do you think it would be okay to have this try-except if we only wrap the read_parquet call?

Member:
I figured that's why you had this here. I don't know if it functionally matters much to only wrap the read_parquet call rather than do this. My primary concern is that this will silently omit data the user is expecting. That could mess up results. Right now at least the user knows something went wrong because they don't get their results. Could we put some sort of error message in the postprocessing (or other relevant) log file saying, "building_id = x, upgrade = y failed to load parquet"? They still might miss that, but it would be documented somewhere. It would be better than what we have now, which is an obtuse error with very little to go on to find the problem.

Contributor Author (@rajeee), Mar 16, 2021:
@nmerket I started working on the solution you proposed of logging the buildings whose timeseries we failed to read, but then realized we would have to do it in multiple places. If there is any corrupted parquet, it would fail here first:

all_ts_cols = db.from_sequence(ts_filenames, partition_size=100).map(partial(get_cols, fs)).\

A corrupted parquet (or a failure to read from disk) is something I have seen only a couple of times, when Eagle itself was having issues, so it may not be worthwhile to account for it. Even if we do need to handle it, maybe not in this PR. So I have removed all such handling.
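
For reference, a minimal sketch (not what was merged) of the narrower handling discussed above: wrap only the read_parquet call and log which building failed, using the logger, re, np, and pd already imported in postprocessing.py:

def read_enduse_timeseries_parquet(fs, filename, all_cols):
    building_id = int(re.search(r'bldg(\d+).parquet', filename).group(1))
    try:
        with fs.open(filename, 'rb') as f:
            df = pd.read_parquet(f, engine='pyarrow')
    except Exception:
        # Surface the failure in the postprocessing log instead of silently dropping the building.
        logger.warning(f"building_id = {building_id} failed to load parquet: {filename}")
        return pd.DataFrame(columns=all_cols)
    df['building_id'] = building_id
    for col in set(all_cols).difference(df.columns.values):
        df[col] = np.nan
    return df[all_cols]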



def read_and_concat_enduse_timeseries_parquet(fs, all_cols, output_dir, filenames, group_id):
try:
dfs = [read_enduse_timeseries_parquet(fs, filename, all_cols) for filename in filenames]
grouped_df = pd.concat(dfs)
grouped_df.set_index('building_id', inplace=True)
grouped_df.to_parquet(output_dir + f'group{group_id}.parquet')
return ("success", group_id, len(grouped_df))
del grouped_df
except Exception:
return ("fail", group_id, f"Exp: {traceback.format_exc()}")


def combine_results(fs, results_dir, cfg, do_timeseries=True):
@@ -330,32 +341,39 @@ def combine_results(fs, results_dir, cfg, do_timeseries=True):
get_ts_mem_usage_d = dask.delayed(lambda x: read_ts_parquet(x).memory_usage(deep=True).sum())
sample_size = min(len(ts_filenames), 36 * 3)
mean_mem = np.mean(dask.compute(map(get_ts_mem_usage_d, random.sample(ts_filenames, sample_size)))[0])
total_mem = mean_mem * len(ts_filenames)
total_mem = mean_mem * len(ts_filenames) / 1e6 # total_mem in MB

# Determine how many files should be in each partition and group the files
npartitions = math.ceil(total_mem / MAX_PARQUET_MEMORY) # 1 GB per partition
parquet_memory = int(cfg['eagle'].get('postprocessing', {}).get('parquet_memory_mb', MAX_PARQUET_MEMORY))
logger.info(f"Max parquet memory: {parquet_memory} MB")
npartitions = math.ceil(total_mem / parquet_memory)
npartitions = min(len(ts_filenames), npartitions) # cannot have less than one file per partition
ts_files_in_each_partition = np.array_split(ts_filenames, npartitions)
N = len(ts_files_in_each_partition[0])

Member:
Minor thing, but usually a variable written in ALL_CAPS is understood to be a constant in Python, and CamelCase is a class. This looks to be a regular variable, which would usually be lower_snake_case. It's also not clear from the variable name what this is; a more descriptive name would be helpful.

Contributor Author:
point taken :). Thanks. I have a propensity to use big N for counts of any sort. I will change the name.

group_ids = list(range(npartitions))

# Read the timeseries into a dask dataframe
read_and_concat_ts_pq_d = dask.delayed(
partial(read_and_concat_enduse_timeseries_parquet, fs, all_cols=all_ts_cols_sorted)
)
ts_df = dd.from_delayed(map(read_and_concat_ts_pq_d, ts_files_in_each_partition))
ts_df = ts_df.set_index('building_id', sorted=True)

# Write out new dask timeseries dataframe.
logger.info(f"Combining about {N} parquets together. Creating {npartitions} groups.")
if isinstance(fs, LocalFileSystem):
ts_out_loc = f"{ts_dir}/upgrade={upgrade_id}"
ts_out_loc = f"{ts_dir}/upgrade={upgrade_id}/"
else:
assert isinstance(fs, S3FileSystem)
ts_out_loc = f"s3://{ts_dir}/upgrade={upgrade_id}"
logger.info(f'Writing {ts_out_loc}')
ts_df.to_parquet(
ts_out_loc,
engine='pyarrow',
flavor='spark'
ts_out_loc = f"s3://{ts_dir}/upgrade={upgrade_id}/"

fs.makedirs(ts_out_loc)
logger.info(f'Created directory {ts_out_loc} for writing.')
# Read the timeseries into a dask dataframe
read_and_concat_ts_pq_d = dask.delayed(
# fs, all_cols, output_dir, filenames, group_id
partial(read_and_concat_enduse_timeseries_parquet, fs, all_ts_cols_sorted, ts_out_loc)
)
group_ids = list(range(npartitions))
with performance_report(filename='dask_combine_report.html'):
results = dask.compute(map(read_and_concat_ts_pq_d, ts_files_in_each_partition, group_ids))[0]

logger.info(f"Finished combining and saving. First 10 results: {results[:10]}")
failed_results = [r for r in results if r[0] == 'fail']
if failed_results:
logger.info(f"{len(failed_results)} groupings have failed. First 10 failed results: {results[:10]}")


def remove_intermediate_files(fs, results_dir):
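
A rough worked example of the grouping arithmetic above, with purely illustrative numbers (10,000 timeseries parquets averaging about 2 MB each in memory, combined under the 4000 MB MAX_PARQUET_MEMORY default):

import math

mean_mem = 2e6                                        # bytes per file, from the sampled reads
n_files = 10_000
total_mem = mean_mem * n_files / 1e6                  # 20,000 MB across all files
parquet_memory = 4000                                 # MB; overridable via parquet_memory_mb in the config
npartitions = math.ceil(total_mem / parquet_memory)   # 5 groups
files_per_group = math.ceil(n_files / npartitions)    # about 2,000 files concatenated into each group parquet

Each group is then written directly to group{group_id}.parquet by its delayed task rather than being assembled into one large dask dataframe first.
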
3 changes: 3 additions & 0 deletions buildstockbatch/schemas/v0.2.yaml
@@ -50,6 +50,9 @@ hpc-spec:
hpc-postprocessing-spec:
time: int(required=True)
n_workers: int(required=False)
node_memory_mb: enum(85248, 180224, 751616, required=False)
parquet_memory_mb: int(required=False)
keep_intermediate_files: bool(required=False)

sampling-spec:
time: int(required=True)
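As a hedged illustration of the three new keys validated above (values hypothetical), an eagle postprocessing block in a project file parses into something like the dict below, which the lookups in eagle.py and base.py then read; all three new keys are optional, and the code falls back to its defaults when they are omitted:

cfg = {
    'eagle': {
        'postprocessing': {
            'time': 90,                       # required by the schema
            'n_workers': 4,
            'node_memory_mb': 180224,         # must be one of the allowed node sizes
            'parquet_memory_mb': 6000,        # target in-memory size of each combined parquet, in MB
            'keep_intermediate_files': True,  # skip remove_intermediate_files() after postprocessing
        }
    }
}
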
12 changes: 7 additions & 5 deletions buildstockbatch/test/test_base.py
@@ -12,7 +12,7 @@
import shutil
import tempfile
from unittest.mock import patch, MagicMock

import buildstockbatch
from buildstockbatch.base import BuildStockBatchBase
from buildstockbatch.postprocessing import write_dataframe_as_parquet

@@ -21,6 +21,8 @@

OUTPUT_FOLDER_NAME = 'output'

buildstockbatch.postprocessing.performance_report = MagicMock()


def test_reference_scenario(basic_residential_project_file):
# verify that the reference_scenario get's added to the upgrade file
@@ -304,13 +306,13 @@ def test_upload_files(mocked_s3, basic_residential_project_file):
assert (source_file_path, s3_file_path) in files_uploaded
files_uploaded.remove((source_file_path, s3_file_path))

s3_file_path = s3_path + 'timeseries/upgrade=0/part.0.parquet'
source_file_path = os.path.join(source_path, 'timeseries', 'upgrade=0', 'part.0.parquet')
s3_file_path = s3_path + 'timeseries/upgrade=0/group0.parquet'
source_file_path = os.path.join(source_path, 'timeseries', 'upgrade=0', 'group0.parquet')
assert (source_file_path, s3_file_path) in files_uploaded
files_uploaded.remove((source_file_path, s3_file_path))

s3_file_path = s3_path + 'timeseries/upgrade=1/part.0.parquet'
source_file_path = os.path.join(source_path, 'timeseries', 'upgrade=1', 'part.0.parquet')
s3_file_path = s3_path + 'timeseries/upgrade=1/group0.parquet'
source_file_path = os.path.join(source_path, 'timeseries', 'upgrade=1', 'group0.parquet')
assert (source_file_path, s3_file_path) in files_uploaded
files_uploaded.remove((source_file_path, s3_file_path))

5 changes: 3 additions & 2 deletions buildstockbatch/test/test_postprocessing.py
@@ -7,10 +7,11 @@
import tarfile
import pytest
import shutil

from buildstockbatch import postprocessing
from buildstockbatch.base import BuildStockBatchBase
from unittest.mock import patch
from unittest.mock import patch, MagicMock

postprocessing.performance_report = MagicMock()


def test_report_additional_results_csv_columns(basic_residential_project_file):