Refactor docker_base to use inversion of control #421

Merged
merged 4 commits on Jan 5, 2024
Changes from all commits
61 changes: 22 additions & 39 deletions buildstockbatch/aws/aws.py
@@ -23,7 +23,6 @@
import random
from s3fs import S3FileSystem
import tarfile
import tempfile
import re
import time
import io
@@ -1574,47 +1573,31 @@ def clean(self):
sns_env = AwsSNS(self.job_identifier, self.cfg["aws"], self.boto3_session)
sns_env.clean()

def run_batch(self):
"""
Run a batch of simulations using AWS Batch
This will
- perform the sampling
- package and upload the assets, including weather
- kick off a batch simulation on AWS
"""
# Compress and upload assets to S3
with tempfile.TemporaryDirectory(prefix="bsb_") as tmpdir, tempfile.TemporaryDirectory(
prefix="bsb_"
) as tmp_weather_dir:
self._weather_dir = tmp_weather_dir
tmppath = pathlib.Path(tmpdir)

array_size, unique_epws = self.prep_batches(tmppath)

logger.debug("Uploading files to S3")
upload_directory_to_s3(
tmppath,
self.cfg["aws"]["s3"]["bucket"],
self.cfg["aws"]["s3"]["prefix"],
)

# Copy the non-unique weather files on S3
epws_to_copy = []
for epws in unique_epws.values():
# The first in the list is already up there, copy the rest
for filename in epws[1:]:
epws_to_copy.append(
(
f"{self.cfg['aws']['s3']['prefix']}/weather/{epws[0]}.gz",
f"{self.cfg['aws']['s3']['prefix']}/weather/{filename}.gz",
)
)
def upload_batch_files_to_cloud(self, tmppath):
"""Implements :func:`DockerBase.upload_batch_files_to_cloud`"""
logger.debug("Uploading Batch files to S3")
upload_directory_to_s3(
tmppath,
self.cfg["aws"]["s3"]["bucket"],
self.cfg["aws"]["s3"]["prefix"],
)

def copy_files_at_cloud(self, files_to_copy):
"""Implements :func:`DockerBase.copy_files_at_cloud`"""
logger.debug("Copying weather files on S3")
bucket = self.cfg["aws"]["s3"]["bucket"]
Parallel(n_jobs=-1, verbose=9)(delayed(copy_s3_file)(bucket, src, bucket, dest) for src, dest in epws_to_copy)
Parallel(n_jobs=-1, verbose=9)(
delayed(copy_s3_file)(
bucket,
f"{self.cfg['aws']['s3']['prefix']}/weather/{src}",
bucket,
f"{self.cfg['aws']['s3']['prefix']}/weather/{dest}",
)
for src, dest in files_to_copy
)

def start_batch_job(self, batch_info):
"""Implements :func:`DockerBase.start_batch_job`"""
# Create the output directories
fs = S3FileSystem()
for upgrade_id in range(len(self.cfg.get("upgrades", [])) + 1):
@@ -1670,7 +1653,7 @@ def run_batch(self):
batch_env.create_emr_cluster_function()

# start job
batch_env.start_state_machine_execution(array_size)
batch_env.start_state_machine_execution(batch_info.job_count)

logger.info("Batch job submitted. Check your email to subscribe to notifications.")
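As a quick illustration of the new contract: the pairs handed to copy_files_at_cloud are bare gzipped EPW filenames, which the AWS implementation above expands into full S3 keys under the configured bucket and prefix. A minimal sketch (the bucket and prefix values here are made up, and print is a stand-in for copy_s3_file):

s3_cfg = {"bucket": "my-bucket", "prefix": "my-run"}  # hypothetical values

# One duplicate weather file: G2601390.epw hashed identically to G2601210.epw, so only
# G2601210.epw.gz was uploaded and the other is recreated by a cloud-side copy.
files_to_copy = [("G2601210.epw.gz", "G2601390.epw.gz")]

for src, dest in files_to_copy:
    # Mirrors copy_files_at_cloud() above: prepend the prefix and the weather/ subdirectory.
    src_key = f"{s3_cfg['prefix']}/weather/{src}"
    dest_key = f"{s3_cfg['prefix']}/weather/{dest}"
    print(f"copy s3://{s3_cfg['bucket']}/{src_key} -> s3://{s3_cfg['bucket']}/{dest_key}")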

242 changes: 185 additions & 57 deletions buildstockbatch/cloud/docker_base.py
@@ -10,6 +10,7 @@
"""
import collections
import csv
from dataclasses import dataclass
import docker
from fsspec.implementations.local import LocalFileSystem
import gzip
@@ -24,6 +25,7 @@
import shutil
import subprocess
import tarfile
import tempfile
import time

from buildstockbatch import postprocessing
@@ -36,6 +38,19 @@
class DockerBatchBase(BuildStockBatchBase):
"""Base class for implementations that run in Docker containers."""

@dataclass
class BatchInfo:
"""Information about the Batch jobs to be run."""

# The total number of simulations that will be run.
n_sims: int

# The total number of simulations that each job will run.
n_sims_per_job: int

# The number of jobs the samples were split into.
job_count: int

CONTAINER_RUNTIME = ContainerRuntime.DOCKER
MAX_JOB_COUNT = 10000

@@ -61,75 +76,171 @@ def docker_image(self):
def weather_dir(self):
return self._weather_dir

def prep_batches(self, tmppath):
def upload_batch_files_to_cloud(self, tmppath):
"""Upload all files in ``tmppath`` to the cloud (where they will be used by the batch
jobs).
"""
Prepare batches of samples to be uploaded and run in the cloud.
raise NotImplementedError

This will:
- Perform the sampling
- Split the samples into (at most) self.batch_array_size batches
- Collect and package and the required assets, including weather
files, and write them to tmppath.
def copy_files_at_cloud(self, files_to_copy):
"""Copy files from-cloud-to-cloud storage. This is used to avoid using bandwidth to upload
duplicate files.
self.weather_dir must exist before calling this method. This is where weather files are stored temporarily.
:param files_to_copy: a list of (source, destination) pairs, where the source is a file
already uploaded to cloud storage and the destination is the filename to copy it to. Both
are relative to the ``weather/`` directory uploaded from ``tmppath`` (so the implementation
should prepend the bucket name and prefix where they were uploaded to by
``upload_batch_files_to_cloud``).
"""
raise NotImplementedError

:param tmppath: Path to a temporary directory where files should be collected before uploading.
def start_batch_job(self, batch_info):
"""Create and start the Batch job on the cloud.
:returns: (job_count, unique_epws), where
job_count: The number of jobs the samples were split into.
unique_epws: A dictionary mapping from the hash of weather files to a list of filenames
with that hashed value. Only the first in each list is written to tmppath, so
this can be used to recreate the other files later.
Files used by the batch job will have been prepared and uploaded (by
:func:`DockerBatchBase.run_batch`, which is what calls this).
:param batch_info: A :class:`DockerBatchBase.BatchInfo` containing information about the job.
"""
# Generate buildstock.csv
buildstock_csv_filename = self.sampler.run_sampling()
raise NotImplementedError

self._get_weather_files()
logger.debug("Creating assets tarfile")
with tarfile.open(tmppath / "assets.tar.gz", "x:gz") as tar_f:
project_path = pathlib.Path(self.project_dir)
buildstock_path = pathlib.Path(self.buildstock_dir)
tar_f.add(buildstock_path / "measures", "measures")
if os.path.exists(buildstock_path / "resources/hpxml-measures"):
tar_f.add(
buildstock_path / "resources/hpxml-measures",
"resources/hpxml-measures",
)
tar_f.add(buildstock_path / "resources", "lib/resources")
tar_f.add(
project_path / "housing_characteristics",
"lib/housing_characteristics",
)
def run_batch(self):
"""Prepare and start a Batch job on the cloud to run simulations.
This does all the cloud-agnostic prep (such as preparing weather files, assets, and the job
definition), delegates to the implementation to upload those files to the cloud (using
:func:`upload_batch_files_to_cloud` and :func:`copy_files_at_cloud`), and then calls the
implementation's :func:`start_batch_job` to actually create and start the batch job.
"""
with tempfile.TemporaryDirectory(prefix="bsb_") as tmpdir:
tmppath = pathlib.Path(tmpdir)
epws_to_copy, batch_info = self._run_batch_prep(tmppath)

# Copy all the files to cloud storage
logger.info("Uploading files for batch...")
self.upload_batch_files_to_cloud(tmppath)

logger.info("Copying duplicate weather files...")
self.copy_files_at_cloud(epws_to_copy)

self.start_batch_job(batch_info)

def _run_batch_prep(self, tmppath):
"""Do preparation for running the Batch jobs on the cloud, including producing and uploading
files to the cloud that the batch jobs will use.
This includes:
- Weather files (:func:`_prep_weather_files_for_batch`)
- Sampling, and splitting the samples into (at most) ``self.batch_array_size`` batches,
and bundling other assets needed for running simulations (:func:`_prep_jobs_for_batch`)
Those functions place their files to be uploaded into ``tmppath``, and then this will upload
them to the cloud using (:func:`upload_batch_files_to_cloud`).
Duplicate weather files will have been excluded from ``tmppath``, and this will use
(:func:`copy_files_at_cloud`) to copy those files from-cloud-to-cloud (instead of uploading
them).
``self.weather_dir`` must exist before calling this method. This is where weather files are
stored temporarily.
This takes ``tmppath`` (rather than managing itself) for testability (so test can manage and
inspect the contents of the tmppath).
:returns: DockerBatchBase.BatchInfo
"""

# Weather files
weather_path = tmppath / "weather"
os.makedirs(weather_path)

# Determine the unique weather files
epw_filenames = list(filter(lambda x: x.endswith(".epw"), os.listdir(self.weather_dir)))
logger.debug("Calculating hashes for weather files")
epw_hashes = Parallel(n_jobs=-1, verbose=9)(
delayed(calc_hash_for_file)(pathlib.Path(self.weather_dir) / epw_filename) for epw_filename in epw_filenames
)
unique_epws = collections.defaultdict(list)
for epw_filename, epw_hash in zip(epw_filenames, epw_hashes):
unique_epws[epw_hash].append(epw_filename)

# Compress unique weather files
logger.debug("Compressing weather files")
Parallel(n_jobs=-1, verbose=9)(
delayed(compress_file)(
pathlib.Path(self.weather_dir) / x[0],
str(weather_path / x[0]) + ".gz",
)
for x in unique_epws.values()
)
logger.info("Prepping weather files...")
epws_to_copy = self._prep_weather_files_for_batch(tmppath)

logger.debug("Writing project configuration for upload")
# Project configuration
logger.info("Writing project configuration for upload")
with open(tmppath / "config.json", "wt", encoding="utf-8") as f:
json.dump(self.cfg, f)

# Collect simulations to queue
batch_info = self._prep_jobs_for_batch(tmppath)

return (epws_to_copy, batch_info)

def _prep_weather_files_for_batch(self, tmppath):
"""Downloads, if necessary, and extracts weather files to ``self._weather_dir``.
Because there may be duplicate weather files, this also identifies duplicates to avoid
redundant compression work and bytes uploaded to the cloud.
It will put unique files in the ``tmppath`` (in the 'weather' subdir) which will get
uploaded to the cloud along with other batch files. It will also return a list of
duplicates. This will allow the duplicates to be quickly recreated on the cloud via copying
from-cloud-to-cloud.
:param tmppath: Unique weather files (compressed) will be copied into a 'weather' subdir
of this path.
:returns: an array of tuples where the first value is the filename of a file that will be
uploaded to cloud storage (because it's in the ``tmppath``), and the second value is the
filename that the first should be copied to.
For example, ``[("G2601210.epw.gz", "G2601390.epw.gz")]``.
"""
with tempfile.TemporaryDirectory(prefix="bsb_") as tmp_weather_in_dir:
self._weather_dir = tmp_weather_in_dir

# Downloads, if necessary, and extracts weather files to ``self._weather_dir``
self._get_weather_files()

# Determine the unique weather files
epw_filenames = list(filter(lambda x: x.endswith(".epw"), os.listdir(self.weather_dir)))
logger.info("Calculating hashes for weather files")
epw_hashes = Parallel(n_jobs=-1, verbose=9)(
delayed(calc_hash_for_file)(pathlib.Path(self.weather_dir) / epw_filename)
for epw_filename in epw_filenames
)
# keep track of unique EPWs that may have dupes, and to compress and upload to cloud
unique_epws = collections.defaultdict(list)
# keep track of duplicates of the unique EPWs to copy (from cloud-to-cloud)
epws_to_copy = []
for epw_filename, epw_hash in zip(epw_filenames, epw_hashes):
if bool(unique_epws[epw_hash]):
# not the first file with this hash (it's a duplicate). add to ``epws_to_copy``
epws_to_copy.append((unique_epws[epw_hash][0] + ".gz", epw_filename + ".gz"))
unique_epws[epw_hash].append(epw_filename)

# Compress unique weather files and save to ``tmp_weather_out_path``, which will get
# uploaded to cloud storage
logger.info("Compressing unique weather files")
tmp_weather_out_path = tmppath / "weather"
os.makedirs(tmp_weather_out_path)
Parallel(n_jobs=-1, verbose=9)(
delayed(compress_file)(
pathlib.Path(self.weather_dir) / x[0],
str(tmp_weather_out_path / x[0]) + ".gz",
)
for x in unique_epws.values()
)

# Calculate and print savings of duplicate files
total_count = 0
dupe_count = 0
dupe_bytes = 0
for epws in unique_epws.values():
count = len(epws)
total_count += count
if count > 1:
dupe_count += count - 1
# bytes saved = size of this unique file times the number of duplicates of it
dupe_bytes += os.path.getsize(str(tmp_weather_out_path / epws[0]) + ".gz") * (count - 1)
logger.info(
f"Identified {dupe_count:,} duplicate weather files "
f"({len(unique_epws):,} unique, {total_count:,} total); "
f"saved from uploading {(dupe_bytes / 1024 / 1024):,.1f} MiB"
)
return epws_to_copy

def _prep_jobs_for_batch(self, tmppath):
"""Splits simulations into batches, and prepares asset files needed to run them."""
# Run sampling - generates buildstock.csv
buildstock_csv_filename = self.sampler.run_sampling()

df = read_csv(buildstock_csv_filename, index_col=0, dtype=str)
self.validate_buildstock_csv(self.project_filename, df)
building_ids = df.index.tolist()
@@ -185,9 +296,26 @@ def prep_batches(self, tmppath):
logger.debug("Done compressing job jsons using gz {:.1f} seconds".format(tick))
shutil.rmtree(jobs_dir)

os.makedirs(tmppath / "results" / "simulation_output")
# Bundle together assets used when running OpenStudio simulations.
# Note: The housing_characteristics directory includes buildstock.csv
# generated by `run_sampling`.
logger.debug("Creating assets tarfile")
with tarfile.open(tmppath / "assets.tar.gz", "x:gz") as tar_f:
project_path = pathlib.Path(self.project_dir)
buildstock_path = pathlib.Path(self.buildstock_dir)
tar_f.add(buildstock_path / "measures", "measures")
if os.path.exists(buildstock_path / "resources/hpxml-measures"):
tar_f.add(
buildstock_path / "resources/hpxml-measures",
"resources/hpxml-measures",
)
tar_f.add(buildstock_path / "resources", "lib/resources")
tar_f.add(
project_path / "housing_characteristics",
"lib/housing_characteristics",
)

return (job_count, unique_epws)
return DockerBatchBase.BatchInfo(n_sims=n_sims, n_sims_per_job=n_sims_per_job, job_count=job_count)

@classmethod
def get_epws_to_download(cls, sim_dir, jobs_d):
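To make the inversion-of-control contract concrete, here is a minimal sketch of what a future cloud implementation would supply: the base class's run_batch() drives the whole flow, and the subclass only fills in the three hooks. MyCloudBatch and its print() placeholders are hypothetical, standing in for calls to a real provider SDK; this is not part of the PR.

from buildstockbatch.cloud.docker_base import DockerBatchBase


class MyCloudBatch(DockerBatchBase):
    """Hypothetical provider-specific implementation, for illustration only."""

    def upload_batch_files_to_cloud(self, tmppath):
        """Implements :func:`DockerBatchBase.upload_batch_files_to_cloud`"""
        # Upload everything staged in tmppath to the provider's object storage.
        print(f"uploading {tmppath} to object storage")

    def copy_files_at_cloud(self, files_to_copy):
        """Implements :func:`DockerBatchBase.copy_files_at_cloud`"""
        # Recreate duplicate weather files with server-side copies instead of re-uploading them.
        for src, dest in files_to_copy:
            print(f"copy weather/{src} -> weather/{dest}")

    def start_batch_job(self, batch_info):
        """Implements :func:`DockerBatchBase.start_batch_job`"""
        # Submit an array job: batch_info.job_count tasks, each running
        # batch_info.n_sims_per_job simulations (batch_info.n_sims total).
        print(f"submitting {batch_info.job_count} jobs")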