From 5437660eef78e7fe481e5fd7776a6dd429402e0d Mon Sep 17 00:00:00 2001
From: Robert LaThanh
Date: Wed, 29 Nov 2023 16:31:40 -0800
Subject: [PATCH 1/3] Refactor docker_base to use inversion-of-control

---
 buildstockbatch/aws/aws.py                |  61 +++---
 buildstockbatch/cloud/docker_base.py      | 226 ++++++++++++++++++-----
 buildstockbatch/test/test_docker_base.py  |  31 ++--
 3 files changed, 217 insertions(+), 101 deletions(-)

diff --git a/buildstockbatch/aws/aws.py b/buildstockbatch/aws/aws.py
index 55f95a2c..6e4c1a7f 100644
--- a/buildstockbatch/aws/aws.py
+++ b/buildstockbatch/aws/aws.py
@@ -27,7 +27,6 @@
 import shutil
 import subprocess
 import tarfile
-import tempfile
 import re
 import time
 import io
@@ -1579,47 +1578,31 @@ def clean(self):
         sns_env = AwsSNS(self.job_identifier, self.cfg["aws"], self.boto3_session)
         sns_env.clean()

-    def run_batch(self):
-        """
-        Run a batch of simulations using AWS Batch
-
-        This will
-          - perform the sampling
-          - package and upload the assets, including weather
-          - kick off a batch simulation on AWS
-        """
-        # Compress and upload assets to S3
-        with tempfile.TemporaryDirectory(prefix="bsb_") as tmpdir, tempfile.TemporaryDirectory(
-            prefix="bsb_"
-        ) as tmp_weather_dir:
-            self._weather_dir = tmp_weather_dir
-            tmppath = pathlib.Path(tmpdir)
-
-            array_size, unique_epws = self.prep_batches(tmppath)
-
-            logger.debug("Uploading files to S3")
-            upload_directory_to_s3(
-                tmppath,
-                self.cfg["aws"]["s3"]["bucket"],
-                self.cfg["aws"]["s3"]["prefix"],
-            )
-
-            # Copy the non-unique weather files on S3
-            epws_to_copy = []
-            for epws in unique_epws.values():
-                # The first in the list is already up there, copy the rest
-                for filename in epws[1:]:
-                    epws_to_copy.append(
-                        (
-                            f"{self.cfg['aws']['s3']['prefix']}/weather/{epws[0]}.gz",
-                            f"{self.cfg['aws']['s3']['prefix']}/weather/{filename}.gz",
-                        )
-                    )
+    def upload_batch_files_to_cloud(self, tmppath):
+        """Implements :func:`DockerBatchBase.upload_batch_files_to_cloud`."""
+        logger.debug("Uploading Batch files to S3")
+        upload_directory_to_s3(
+            tmppath,
+            self.cfg["aws"]["s3"]["bucket"],
+            self.cfg["aws"]["s3"]["prefix"],
+        )

+    def copy_files_at_cloud(self, files_to_copy):
+        """Implements :func:`DockerBatchBase.copy_files_at_cloud`."""
         logger.debug("Copying weather files on S3")
         bucket = self.cfg["aws"]["s3"]["bucket"]
-        Parallel(n_jobs=-1, verbose=9)(delayed(copy_s3_file)(bucket, src, bucket, dest) for src, dest in epws_to_copy)
+        Parallel(n_jobs=-1, verbose=9)(
+            delayed(copy_s3_file)(
+                bucket,
+                f"{self.cfg['aws']['s3']['prefix']}/weather/{src}",
+                bucket,
+                f"{self.cfg['aws']['s3']['prefix']}/weather/{dest}",
+            )
+            for src, dest in files_to_copy
+        )

+    def start_batch_job(self, batch_info):
+        """Implements :func:`DockerBatchBase.start_batch_job`."""
         # Create the output directories
         fs = S3FileSystem()
         for upgrade_id in range(len(self.cfg.get("upgrades", [])) + 1):
@@ -1675,7 +1658,7 @@ def run_batch(self):
             batch_env.create_emr_cluster_function()

         # start job
-        batch_env.start_state_machine_execution(array_size)
+        batch_env.start_state_machine_execution(batch_info.job_count)

         logger.info("Batch job submitted. Check your email to subscribe to notifications.")
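
With the AWS side reduced to these three hooks, a new cloud backend only has to provide
uploads, cloud-to-cloud copies, and job kickoff; the shared run_batch in docker_base.py
drives everything else. A rough sketch of the contract (the EchoDockerBatch class and its
print-only bodies are hypothetical, for illustration only, not part of this PR):

    # Hypothetical minimal implementation of the three hooks.
    from buildstockbatch.cloud.docker_base import DockerBatchBase


    class EchoDockerBatch(DockerBatchBase):
        def upload_batch_files_to_cloud(self, tmppath):
            # Push everything staged under tmppath (weather/, assets.tar.gz,
            # config.json, jobs.tar.gz) to this backend's storage.
            print(f"would upload contents of {tmppath}")

        def copy_files_at_cloud(self, files_to_copy):
            # Recreate duplicate weather files with cloud-to-cloud copies,
            # prepending this backend's bucket/prefix to each filename.
            for src, dest in files_to_copy:
                print(f"would copy {src} -> {dest}")

        def start_batch_job(self, batch_info):
            # Start batch_info.job_count jobs, each of which runs
            # batch_info.n_sims_per_job simulations.
            print(f"would start {batch_info.job_count} jobs")
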
diff --git a/buildstockbatch/cloud/docker_base.py b/buildstockbatch/cloud/docker_base.py
index 2551c45f..e67bc3a1 100644
--- a/buildstockbatch/cloud/docker_base.py
+++ b/buildstockbatch/cloud/docker_base.py
@@ -10,6 +10,7 @@
 """
 import collections
 import docker
+from dataclasses import dataclass
 import itertools
 from joblib import Parallel, delayed
 import json
@@ -20,6 +21,7 @@
 import random
 import shutil
 import tarfile
+import tempfile
 import time

 from buildstockbatch.base import BuildStockBatchBase
@@ -31,6 +33,19 @@
 class DockerBatchBase(BuildStockBatchBase):
     """Base class for implementations that run in Docker containers."""

+    @dataclass
+    class BatchInfo:
+        """Information about the Batch jobs to be run."""
+
+        # The total number of simulations that will be run.
+        n_sims: int
+
+        # The number of simulations that each job will run.
+        n_sims_per_job: int
+
+        # The number of jobs the samples were split into.
+        job_count: int
+
     CONTAINER_RUNTIME = ContainerRuntime.DOCKER
     MAX_JOB_COUNT = 10000

@@ -56,30 +71,171 @@ def docker_image(self):
     def weather_dir(self):
         return self._weather_dir

-    def prep_batches(self, tmppath):
+    def upload_batch_files_to_cloud(self, tmppath):
+        """Upload all files in ``tmppath`` to the cloud, where they will be used by the batch
+        jobs.
         """
-        Prepare batches of samples to be uploaded and run in the cloud.
+        raise NotImplementedError

-        This will:
-            - Perform the sampling
-            - Split the samples into (at most) self.batch_array_size batches
-            - Collect and package and the required assets, including weather
-              files, and write them to tmppath.
+    def copy_files_at_cloud(self, files_to_copy):
+        """Copy files within cloud storage (cloud-to-cloud). This is used to avoid spending
+        bandwidth re-uploading duplicate files.

-        self.weather_dir must exist before calling this method. This is where weather files are stored temporarily.
+        :param files_to_copy: a list of (source, destination) tuples, where the source is the
+            filename of a file already uploaded to the cloud, and the destination is the
+            filename to copy it to. Both are relative to the location where
+            ``upload_batch_files_to_cloud`` uploaded the contents of ``tmppath`` (so the
+            implementation should prepend the bucket name and prefix used there).
+        """
+        raise NotImplementedError
+
+    def start_batch_job(self, batch_info):
+        """Create and start the Batch job on the cloud.

-        :param tmppath: Path to a temporary directory where files should be collected before uploading.
+        Files used by the batch job will have been prepared and uploaded (by
+        :func:`DockerBatchBase.run_batch`, which is what runs this).

-        :returns: (job_count, unique_epws), where
-            job_count: The number of jobs the samples were split into.
-            unique_epws: A dictionary mapping from the hash of weather files to a list of filenames
-                with that hashed value. Only the first in each list is written to tmppath, so
-                this can be used to recreate the other files later.
+        :param batch_info: A :class:`DockerBatchBase.BatchInfo` containing information about the
+            job.
         """
-        # Generate buildstock.csv
-        buildstock_csv_filename = self.sampler.run_sampling()
+        raise NotImplementedError
+
+    def run_batch(self):
+        """Prepare and start a Batch job on the cloud to run simulations.
+
+        This does all the cloud-agnostic prep (such as preparing weather files, assets, and job
+        definitions), then delegates to the implementation to upload those files to the cloud
+        (using :func:`upload_batch_files_to_cloud` and :func:`copy_files_at_cloud`), and finally
+        calls the implementation's :func:`start_batch_job` to create and start the batch job.
+        """
+        with tempfile.TemporaryDirectory(prefix="bsb_") as tmpdir:
+            tmppath = pathlib.Path(tmpdir)
+            epws_to_copy, batch_info = self._run_batch_prep(tmppath)
+
+            # Copy all the files to cloud storage
+            logger.info("Uploading files for batch...")
+            self.upload_batch_files_to_cloud(tmppath)
+
+            logger.info("Copying duplicate weather files...")
+            self.copy_files_at_cloud(epws_to_copy)
+
+        self.start_batch_job(batch_info)
+
+    def _run_batch_prep(self, tmppath):
+        """Prepare to run the Batch jobs on the cloud, including producing and uploading the
+        files that the batch jobs will use.
+
+        This includes:
+        - Weather files (:func:`_prep_weather_files_for_batch`)
+        - Assets (:func:`_prep_assets_for_batch`)
+        - Sampling, and splitting the samples into (at most) ``self.batch_array_size`` batches
+          (:func:`_prep_jobs_for_batch`)
+
+        Those functions place their files to be uploaded into ``tmppath``, and then this will upload
+        them to the cloud using :func:`upload_batch_files_to_cloud`.
+
+        Duplicate weather files will have been excluded from ``tmppath``, and this will use
+        :func:`copy_files_at_cloud` to copy those files cloud-to-cloud (instead of uploading
+        them).
+
+        ``self.weather_dir`` must exist before calling this method. This is where weather files
+        are stored temporarily.
+
+        This takes ``tmppath`` (rather than managing it itself) for testability, so tests can
+        manage and inspect its contents.
+
+        :returns: a tuple of (epws_to_copy, batch_info), where ``epws_to_copy`` is the list of
+            duplicate weather files for :func:`copy_files_at_cloud` to recreate, and
+            ``batch_info`` is a :class:`DockerBatchBase.BatchInfo`.
+        """
+
+        # Weather files
+        logger.info("Prepping weather files...")
+        epws_to_copy = self._prep_weather_files_for_batch(tmppath)
+
+        # Assets
+        self._prep_assets_for_batch(tmppath)
+
+        # Project configuration
+        logger.info("Writing project configuration for upload")
+        with open(tmppath / "config.json", "wt", encoding="utf-8") as f:
+            json.dump(self.cfg, f)

-        self._get_weather_files()
+        # Collect simulations to queue
+        batch_info = self._prep_jobs_for_batch(tmppath)
+
+        return (epws_to_copy, batch_info)
+
+    def _prep_weather_files_for_batch(self, tmppath):
+        """Downloads, if necessary, and extracts weather files to ``self._weather_dir``.
+
+        Because there may be duplicate weather files, this also identifies duplicates to avoid
+        redundant compression work and bytes uploaded to the cloud.
+
+        It will put unique files in ``tmppath`` (in the 'weather' subdir), which will get
+        uploaded to the cloud along with other batch files. It will also return a list of
+        duplicates. This allows the duplicates to be quickly recreated on the cloud via
+        cloud-to-cloud copies.
+
+        :param tmppath: Unique weather files (compressed) will be copied into a 'weather' subdir
+            of this path.
+
+        :returns: a list of tuples, where the first value is the filename of a file that will be
+            uploaded to cloud storage (because it's in ``tmppath``), and the second value is the
+            filename that the first should be copied to.
+            For example, ``[("G2601210.epw.gz", "G2601390.epw.gz")]``.
+ """ + with tempfile.TemporaryDirectory(prefix="bsb_") as tmp_weather_in_dir: + self._weather_dir = tmp_weather_in_dir + + # Downloads, if necessary, and extracts weather files to ``self._weather_dir`` + self._get_weather_files() + + # Determine the unique weather files + epw_filenames = list(filter(lambda x: x.endswith(".epw"), os.listdir(self.weather_dir))) + logger.info("Calculating hashes for weather files") + epw_hashes = Parallel(n_jobs=-1, verbose=9)( + delayed(calc_hash_for_file)(pathlib.Path(self.weather_dir) / epw_filename) + for epw_filename in epw_filenames + ) + # keep track of unique EPWs that may have dupes, and to compress and upload to cloud + unique_epws = collections.defaultdict(list) + # keep track of duplicates of the unique EPWs to copy (from cloud-to-cloud) + epws_to_copy = [] + for epw_filename, epw_hash in zip(epw_filenames, epw_hashes): + if bool(unique_epws[epw_hash]): + # not the first file with this hash (it's a duplicate). add to ``epws_to_copy`` + epws_to_copy.append((unique_epws[epw_hash][0] + ".gz", epw_filename + ".gz")) + unique_epws[epw_hash].append(epw_filename) + + # Compress unique weather files and save to ``tmp_weather_out_path``, which will get + # uploaded to cloud storage + logger.info("Compressing unique weather files") + tmp_weather_out_path = tmppath / "weather" + os.makedirs(tmp_weather_out_path) + Parallel(n_jobs=-1, verbose=9)( + delayed(compress_file)( + pathlib.Path(self.weather_dir) / x[0], + str(tmp_weather_out_path / x[0]) + ".gz", + ) + for x in unique_epws.values() + ) + + # Calculate and print savings of duplicate files + total_count = 0 + dupe_count = 0 + dupe_bytes = 0 + for epws in unique_epws.values(): + count = len(epws) + total_count += count + if count > 1: + dupe_count += count - 1 + bytes = os.path.getsize(str(tmp_weather_out_path / epws[0]) + ".gz") * dupe_count + dupe_bytes = bytes * (count - 1) + logger.info( + f"Identified {dupe_count:,} duplicate weather files " + f"({len(unique_epws):,} unique, {total_count:,} total); " + f"saved from uploading {(dupe_bytes / 1024 / 1024):,.1f} MiB" + ) + return epws_to_copy + + def _prep_assets_for_batch(self, tmppath): logger.debug("Creating assets tarfile") with tarfile.open(tmppath / "assets.tar.gz", "x:gz") as tar_f: project_path = pathlib.Path(self.project_dir) @@ -96,35 +252,9 @@ def prep_batches(self, tmppath): "lib/housing_characteristics", ) - # Weather files - weather_path = tmppath / "weather" - os.makedirs(weather_path) - - # Determine the unique weather files - epw_filenames = list(filter(lambda x: x.endswith(".epw"), os.listdir(self.weather_dir))) - logger.debug("Calculating hashes for weather files") - epw_hashes = Parallel(n_jobs=-1, verbose=9)( - delayed(calc_hash_for_file)(pathlib.Path(self.weather_dir) / epw_filename) for epw_filename in epw_filenames - ) - unique_epws = collections.defaultdict(list) - for epw_filename, epw_hash in zip(epw_filenames, epw_hashes): - unique_epws[epw_hash].append(epw_filename) - - # Compress unique weather files - logger.debug("Compressing weather files") - Parallel(n_jobs=-1, verbose=9)( - delayed(compress_file)( - pathlib.Path(self.weather_dir) / x[0], - str(weather_path / x[0]) + ".gz", - ) - for x in unique_epws.values() - ) - - logger.debug("Writing project configuration for upload") - with open(tmppath / "config.json", "wt", encoding="utf-8") as f: - json.dump(self.cfg, f) - - # Collect simulations to queue + def _prep_jobs_for_batch(self, tmppath): + # Generate buildstock.csv + buildstock_csv_filename = 
diff --git a/buildstockbatch/test/test_docker_base.py b/buildstockbatch/test/test_docker_base.py
index be1a394c..1be262db 100644
--- a/buildstockbatch/test/test_docker_base.py
+++ b/buildstockbatch/test/test_docker_base.py
@@ -14,7 +14,7 @@


 @docker_available
-def test_prep_batches(basic_residential_project_file, mocker):
+def test_run_batch_prep(basic_residential_project_file, mocker):
     """Test that samples are created and bundled into batches correctly."""
     project_filename, results_dir = basic_residential_project_file()

@@ -29,24 +29,29 @@ def test_prep_batches(basic_residential_project_file, mocker):
     dbb.batch_array_size = 3
     DockerBatchBase.validate_project = MagicMock(return_value=True)

-    with tempfile.TemporaryDirectory(prefix="bsb_") as tmpdir, tempfile.TemporaryDirectory(
-        prefix="bsb_"
-    ) as tmp_weather_dir:
-        dbb._weather_dir = tmp_weather_dir
+    with tempfile.TemporaryDirectory(prefix="bsb_") as tmpdir:
         tmppath = pathlib.Path(tmpdir)
-
-        job_count, unique_epws = dbb.prep_batches(tmppath)
+        epws_to_copy, batch_info = dbb._run_batch_prep(tmppath)
         sampler_mock.run_sampling.assert_called_once()

-        # Of the three test weather files, two are identical
-        assert sorted([sorted(i) for i in unique_epws.values()]) == [
-            ["G2500210.epw"],
-            ["G2601210.epw", "G2601390.epw"],
-        ]
+        # There are three weather files:
+        #   * "G2500210.epw" is unique; check for it (gz'd) in tmppath.
+        #   * "G2601210.epw" and "G2601390.epw" are dupes. One should be in
+        #     tmppath; the other should be recreated from it per ``epws_to_copy``.
+        assert os.path.isfile(tmppath / "weather" / "G2500210.epw.gz")
+        assert os.path.isfile(tmppath / "weather" / "G2601210.epw.gz") or os.path.isfile(
+            tmppath / "weather" / "G2601390.epw.gz"
+        )
+        src, dest = epws_to_copy[0]
+        assert src in ("G2601210.epw.gz", "G2601390.epw.gz")
+        assert dest in ("G2601210.epw.gz", "G2601390.epw.gz")
+        assert src != dest

         # Three job files should be created, with 10 total simulations, split
         # into batches of 4, 4, and 2 simulations.
-        assert job_count == 3
+        assert batch_info.n_sims == 10
+        assert batch_info.n_sims_per_job == 4
+        assert batch_info.job_count == 3
         jobs_file_path = tmppath / "jobs.tar.gz"
         with tarfile.open(jobs_file_path, "r") as tar_f:
             all_job_files = ["jobs", "jobs/job00000.json", "jobs/job00001.json", "jobs/job00002.json"]

From 10ed80fd931609c166918d62e1ef641aa576c511 Mon Sep 17 00:00:00 2001
From: Robert LaThanh
Date: Mon, 4 Dec 2023 14:08:59 -0800
Subject: [PATCH 2/3] Add changelog entry

---
 docs/changelog/changelog_dev.rst | 9 +++++++++
 1 file changed, 9 insertions(+)

diff --git a/docs/changelog/changelog_dev.rst b/docs/changelog/changelog_dev.rst
index 75eeab3d..8ee3bfaf 100644
--- a/docs/changelog/changelog_dev.rst
+++ b/docs/changelog/changelog_dev.rst
@@ -14,3 +14,12 @@ Development Changelog
         This is an example change. Please copy and paste it - for valid tags please
         refer to ``conf.py`` in the docs directory.
         ``pullreq`` should be set to the appropriate pull request number and ``tickets`` to any
         related github issues. These will be automatically linked in the documentation.
+
+    .. change::
+        :tags: general
+        :pullreq: 421
+
+        Refactor docker_base to use inversion of control so that it can more easily ensure
+        consistency between the various implementations (a GCP implementation is to come). This
+        also teases apart the several batch prep steps (weather files, assets, and jobs) into
+        their own methods so that each can be more easily understood, shared, and maintained.

From a131f3236872f356fb758ae2ecee7a4c49a20496 Mon Sep 17 00:00:00 2001
From: Natalie Weires
Date: Wed, 6 Dec 2023 15:14:10 -0500
Subject: [PATCH 3/3] Run sampling before uploading assets (#41)

---
 buildstockbatch/cloud/docker_base.py | 48 ++++++++++++++--------------
 1 file changed, 24 insertions(+), 24 deletions(-)

diff --git a/buildstockbatch/cloud/docker_base.py b/buildstockbatch/cloud/docker_base.py
index e67bc3a1..fb76417e 100644
--- a/buildstockbatch/cloud/docker_base.py
+++ b/buildstockbatch/cloud/docker_base.py
@@ -125,9 +125,8 @@ def _run_batch_prep(self, tmppath):

         This includes:
         - Weather files (:func:`_prep_weather_files_for_batch`)
-        - Assets (:func:`_prep_assets_for_batch`)
-        - Sampling, and splitting the samples into (at most) ``self.batch_array_size`` batches
-          (:func:`_prep_jobs_for_batch`)
+        - Sampling, and splitting the samples into (at most) ``self.batch_array_size`` batches,
+          and bundling other assets needed for running simulations (:func:`_prep_jobs_for_batch`)

         Those functions place their files to be uploaded into ``tmppath``, and then this will upload
         them to the cloud using :func:`upload_batch_files_to_cloud`.
@@ -149,9 +148,6 @@ def _run_batch_prep(self, tmppath):
         logger.info("Prepping weather files...")
         epws_to_copy = self._prep_weather_files_for_batch(tmppath)

-        # Assets
-        self._prep_assets_for_batch(tmppath)
-
         # Project configuration
         logger.info("Writing project configuration for upload")
         with open(tmppath / "config.json", "wt", encoding="utf-8") as f:
@@ -235,26 +231,11 @@ def _prep_weather_files_for_batch(self, tmppath):
         )
         return epws_to_copy

-    def _prep_assets_for_batch(self, tmppath):
-        logger.debug("Creating assets tarfile")
-        with tarfile.open(tmppath / "assets.tar.gz", "x:gz") as tar_f:
-            project_path = pathlib.Path(self.project_dir)
-            buildstock_path = pathlib.Path(self.buildstock_dir)
-            tar_f.add(buildstock_path / "measures", "measures")
-            if os.path.exists(buildstock_path / "resources/hpxml-measures"):
-                tar_f.add(
-                    buildstock_path / "resources/hpxml-measures",
-                    "resources/hpxml-measures",
-                )
-            tar_f.add(buildstock_path / "resources", "lib/resources")
-            tar_f.add(
-                project_path / "housing_characteristics",
-                "lib/housing_characteristics",
-            )
-
     def _prep_jobs_for_batch(self, tmppath):
-        # Generate buildstock.csv
+        """Split the simulations into batches, and prepare the asset files needed to run them."""
+        # Run sampling - generates buildstock.csv
         buildstock_csv_filename = self.sampler.run_sampling()
+
         df = read_csv(buildstock_csv_filename, index_col=0, dtype=str)
         self.validate_buildstock_csv(self.project_filename, df)
         building_ids = df.index.tolist()
@@ -310,4 +291,23 @@ def _prep_jobs_for_batch(self, tmppath):
             logger.debug("Done compressing job jsons using gz {:.1f} seconds".format(tick))

         shutil.rmtree(jobs_dir)
+        # Bundle together assets used when running OpenStudio simulations.
+        # Note: The housing_characteristics directory includes buildstock.csv
+        # generated by `run_sampling`.
+ logger.debug("Creating assets tarfile") + with tarfile.open(tmppath / "assets.tar.gz", "x:gz") as tar_f: + project_path = pathlib.Path(self.project_dir) + buildstock_path = pathlib.Path(self.buildstock_dir) + tar_f.add(buildstock_path / "measures", "measures") + if os.path.exists(buildstock_path / "resources/hpxml-measures"): + tar_f.add( + buildstock_path / "resources/hpxml-measures", + "resources/hpxml-measures", + ) + tar_f.add(buildstock_path / "resources", "lib/resources") + tar_f.add( + project_path / "housing_characteristics", + "lib/housing_characteristics", + ) + return DockerBatchBase.BatchInfo(n_sims=n_sims, n_sims_per_job=n_sims_per_job, job_count=job_count)
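
For reference, the numbers asserted in the updated test follow from the batch-splitting
arithmetic in _prep_jobs_for_batch: 10 simulations spread across at most
batch_array_size = 3 jobs means ceil(10 / 3) = 4 simulations per job, which fills three
jobs with 4, 4, and 2 simulations. A simplified sketch of that arithmetic
(split_into_jobs is an illustrative name; the real method also pairs each sampled
building with its upgrades before splitting):

    import math

    def split_into_jobs(n_sims, batch_array_size):
        """Return (n_sims_per_job, job_count) for a batch run."""
        n_sims_per_job = math.ceil(n_sims / batch_array_size)
        job_count = math.ceil(n_sims / n_sims_per_job)
        return n_sims_per_job, job_count

    assert split_into_jobs(10, 3) == (4, 3)  # jobs of 4, 4, and 2 simulations
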