Move more shared code into DockerBatchBase #422

Merged · 3 commits merged on Dec 8, 2023
Changes from 1 commit
123 changes: 2 additions & 121 deletions buildstockbatch/aws/aws.py
@@ -14,8 +14,6 @@
import base64
import boto3
from botocore.exceptions import ClientError
import csv
from fsspec.implementations.local import LocalFileSystem
import gzip
from joblib import Parallel, delayed
import json
@@ -24,16 +22,13 @@
import pathlib
import random
from s3fs import S3FileSystem
import shutil
import subprocess
import tarfile
import tempfile
import re
import time
import io
import zipfile

from buildstockbatch import postprocessing
from buildstockbatch.aws.awsbase import AwsJobBase
from buildstockbatch.base import ValidationError
from buildstockbatch.cloud.docker_base import DockerBatchBase
@@ -1717,37 +1712,7 @@ def run_job(cls, job_id, bucket, prefix, job_name, region):
weather_dir = sim_dir / "weather"
os.makedirs(weather_dir, exist_ok=True)

# Make a lookup of which parameter points to the weather file from options_lookup.tsv
with open(sim_dir / "lib" / "resources" / "options_lookup.tsv", "r", encoding="utf-8") as f:
tsv_reader = csv.reader(f, delimiter="\t")
next(tsv_reader) # skip headers
param_name = None
epws_by_option = {}
for row in tsv_reader:
row_has_epw = [x.endswith(".epw") for x in row[2:]]
if sum(row_has_epw):
if row[0] != param_name and param_name is not None:
raise RuntimeError(
"The epw files are specified in options_lookup.tsv under more than one parameter type: "
f"{param_name}, {row[0]}"
)
epw_filename = row[row_has_epw.index(True) + 2].split("=")[1]
param_name = row[0]
option_name = row[1]
epws_by_option[option_name] = epw_filename

# Look through the buildstock.csv to find the appropriate location and epw
epws_to_download = set()
building_ids = [x[0] for x in jobs_d["batch"]]
with open(
sim_dir / "lib" / "housing_characteristics" / "buildstock.csv",
"r",
encoding="utf-8",
) as f:
csv_reader = csv.DictReader(f)
for row in csv_reader:
if int(row["Building"]) in building_ids:
epws_to_download.add(epws_by_option[row[param_name]])
epws_to_download = cls.get_epws_to_download(sim_dir, jobs_d)

# Download the epws needed for these simulations
for epw_filename in epws_to_download:
@@ -1757,92 +1722,8 @@ def run_job(cls, job_id, bucket, prefix, job_name, region):
with open(weather_dir / epw_filename, "wb") as f_out:
logger.debug("Extracting {}".format(epw_filename))
f_out.write(gzip.decompress(f_gz.getvalue()))
asset_dirs = os.listdir(sim_dir)

fs = S3FileSystem()
local_fs = LocalFileSystem()
reporting_measures = cls.get_reporting_measures(cfg)
dpouts = []
simulation_output_tar_filename = sim_dir.parent / "simulation_outputs.tar.gz"
with tarfile.open(str(simulation_output_tar_filename), "w:gz") as simout_tar:
for building_id, upgrade_idx in jobs_d["batch"]:
upgrade_id = 0 if upgrade_idx is None else upgrade_idx + 1
sim_id = f"bldg{building_id:07d}up{upgrade_id:02d}"

# Create OSW
osw = cls.create_osw(cfg, jobs_d["n_datapoints"], sim_id, building_id, upgrade_idx)
with open(os.path.join(sim_dir, "in.osw"), "w") as f:
json.dump(osw, f, indent=4)

# Run Simulation
with open(sim_dir / "os_stdout.log", "w") as f_out:
try:
logger.debug("Running {}".format(sim_id))
subprocess.run(
["openstudio", "run", "-w", "in.osw"],
check=True,
stdout=f_out,
stderr=subprocess.STDOUT,
cwd=str(sim_dir),
)
except subprocess.CalledProcessError:
logger.debug(f"Simulation failed: see {sim_id}/os_stdout.log")

# Clean Up simulation directory
cls.cleanup_sim_dir(
sim_dir,
fs,
f"{bucket}/{prefix}/results/simulation_output/timeseries",
upgrade_id,
building_id,
)

# Read data_point_out.json
dpout = postprocessing.read_simulation_outputs(
local_fs, reporting_measures, str(sim_dir), upgrade_id, building_id
)
dpouts.append(dpout)

# Add the rest of the simulation outputs to the tar archive
logger.info("Archiving simulation outputs")
for dirpath, dirnames, filenames in os.walk(sim_dir):
if dirpath == str(sim_dir):
for dirname in set(dirnames).intersection(asset_dirs):
dirnames.remove(dirname)
for filename in filenames:
abspath = os.path.join(dirpath, filename)
relpath = os.path.relpath(abspath, sim_dir)
simout_tar.add(abspath, os.path.join(sim_id, relpath))

# Clear directory for next simulation
logger.debug("Clearing out simulation directory")
for item in set(os.listdir(sim_dir)).difference(asset_dirs):
if os.path.isdir(item):
shutil.rmtree(item)
elif os.path.isfile(item):
os.remove(item)

# Upload simulation outputs tarfile to s3
fs.put(
str(simulation_output_tar_filename),
f"{bucket}/{prefix}/results/simulation_output/simulations_job{job_id}.tar.gz",
)

# Upload aggregated dpouts as a json file
with fs.open(
f"{bucket}/{prefix}/results/simulation_output/results_job{job_id}.json.gz",
"wb",
) as f1:
with gzip.open(f1, "wt", encoding="utf-8") as f2:
json.dump(dpouts, f2)

# Remove files (it helps docker if we don't leave a bunch of files laying around)
os.remove(simulation_output_tar_filename)
for item in os.listdir(sim_dir):
if os.path.isdir(item):
shutil.rmtree(item)
elif os.path.isfile(item):
os.remove(item)
cls.run_simulations(cfg, job_id, jobs_d, sim_dir, S3FileSystem(), bucket, prefix)


@log_error_details()
151 changes: 151 additions & 0 deletions buildstockbatch/cloud/docker_base.py
@@ -9,7 +9,10 @@
:license: BSD-3
"""
import collections
import csv
import docker
from fsspec.implementations.local import LocalFileSystem
import gzip
import itertools
from joblib import Parallel, delayed
import json
@@ -19,9 +22,11 @@
import pathlib
import random
import shutil
import subprocess
import tarfile
import time

from buildstockbatch import postprocessing
from buildstockbatch.base import BuildStockBatchBase
from buildstockbatch.utils import ContainerRuntime, calc_hash_for_file, compress_file, read_csv

@@ -183,3 +188,149 @@ def prep_batches(self, tmppath):
os.makedirs(tmppath / "results" / "simulation_output")

return (job_count, unique_epws)

@classmethod
def get_epws_to_download(cls, sim_dir, jobs_d):
"""
Gets the set of EPW weather filenames required for a single batch of simulations.

:param sim_dir: Path to the directory where job files are stored
:param jobs_d: Contents of a single job JSON file; contains the list of buildings to simulate in this job.

:returns: Set of epw filenames needed for this batch of simulations.
"""
# Make a lookup of which parameter points to the weather file from options_lookup.tsv
with open(sim_dir / "lib" / "resources" / "options_lookup.tsv", "r", encoding="utf-8") as f:
tsv_reader = csv.reader(f, delimiter="\t")
next(tsv_reader) # skip headers
param_name = None
epws_by_option = {}
for row in tsv_reader:
row_has_epw = [x.endswith(".epw") for x in row[2:]]
if sum(row_has_epw):
if row[0] != param_name and param_name is not None:
raise RuntimeError(
"The epw files are specified in options_lookup.tsv under more than one parameter type: "
f"{param_name}, {row[0]}"
)
epw_filename = row[row_has_epw.index(True) + 2].split("=")[1]
param_name = row[0]
option_name = row[1]
epws_by_option[option_name] = epw_filename

# Look through the buildstock.csv to find the appropriate location and epw
epws_to_download = set()
building_ids = [x[0] for x in jobs_d["batch"]]
with open(
sim_dir / "lib" / "housing_characteristics" / "buildstock.csv",
"r",
encoding="utf-8",
) as f:
csv_reader = csv.DictReader(f)
for row in csv_reader:
if int(row["Building"]) in building_ids:
epws_to_download.add(epws_by_option[row[param_name]])

return epws_to_download
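
Note: a minimal sketch of how this new helper is exercised. The `jobs_d` shape (a `"batch"` list of `(building_id, upgrade_idx)` pairs plus `"n_datapoints"`) follows the code above; the concrete path and values are illustrative only, not taken from this PR.

```python
import pathlib

from buildstockbatch.cloud.docker_base import DockerBatchBase

# sim_dir must already contain the unpacked assets this method reads:
#   lib/resources/options_lookup.tsv
#   lib/housing_characteristics/buildstock.csv
sim_dir = pathlib.Path("/var/simdata/openstudio")  # illustrative path

jobs_d = {
    "n_datapoints": 100,
    # (building_id, upgrade_idx) pairs; upgrade_idx is None for the baseline run
    "batch": [(1, None), (2, None), (3, 0)],
}

# Returns the set of EPW filenames referenced by those buildings' rows in
# buildstock.csv, resolved through options_lookup.tsv.
epws_to_download = DockerBatchBase.get_epws_to_download(sim_dir, jobs_d)
```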

@classmethod
def run_simulations(cls, cfg, job_id, jobs_d, sim_dir, fs, bucket, prefix):
"""
Run one batch of simulations.

Runs the simulations, writes outputs to the provided storage bucket, and cleans up intermediate files.

:param cfg: Project config contents.
:param job_id: Index of this job.
:param jobs_d: Contents of a single job JSON file; contains the list of buildings to simulate in this job.
:param sim_dir: Path to the (local) directory where job files are stored.
:param fs: Filesystem to use when writing outputs to the storage bucket.
:param bucket: Name of the storage bucket to upload results to.
:param prefix: File prefix to use when writing to storage bucket.
"""
local_fs = LocalFileSystem()
reporting_measures = cls.get_reporting_measures(cfg)
dpouts = []
simulation_output_tar_filename = sim_dir.parent / "simulation_outputs.tar.gz"
asset_dirs = os.listdir(sim_dir)
ts_output_dir = (f"{bucket}/{prefix}/results/simulation_output/timeseries",)

with tarfile.open(str(simulation_output_tar_filename), "w:gz") as simout_tar:
for building_id, upgrade_idx in jobs_d["batch"]:
upgrade_id = 0 if upgrade_idx is None else upgrade_idx + 1
sim_id = f"bldg{building_id:07d}up{upgrade_id:02d}"

# Create OSW
osw = cls.create_osw(cfg, jobs_d["n_datapoints"], sim_id, building_id, upgrade_idx)
with open(os.path.join(sim_dir, "in.osw"), "w") as f:
json.dump(osw, f, indent=4)

# Run Simulation
with open(sim_dir / "os_stdout.log", "w") as f_out:
try:
logger.debug("Running {}".format(sim_id))
subprocess.run(
["openstudio", "run", "-w", "in.osw"],
check=True,
stdout=f_out,
stderr=subprocess.STDOUT,
cwd=str(sim_dir),
)
except subprocess.CalledProcessError:
logger.debug(f"Simulation failed: see {sim_id}/os_stdout.log")

# Clean Up simulation directory
cls.cleanup_sim_dir(
sim_dir,
fs,
ts_output_dir,
upgrade_id,
building_id,
)

# Read data_point_out.json
dpout = postprocessing.read_simulation_outputs(
local_fs, reporting_measures, str(sim_dir), upgrade_id, building_id
)
dpouts.append(dpout)

# Add the rest of the simulation outputs to the tar archive
logger.info("Archiving simulation outputs")
for dirpath, dirnames, filenames in os.walk(sim_dir):
if dirpath == str(sim_dir):
for dirname in set(dirnames).intersection(asset_dirs):
dirnames.remove(dirname)
for filename in filenames:
abspath = os.path.join(dirpath, filename)
relpath = os.path.relpath(abspath, sim_dir)
simout_tar.add(abspath, os.path.join(sim_id, relpath))

# Clear directory for next simulation
logger.debug("Clearing out simulation directory")
for item in set(os.listdir(sim_dir)).difference(asset_dirs):
if os.path.isdir(item):
shutil.rmtree(item)
elif os.path.isfile(item):
os.remove(item)

# Upload simulation outputs tarfile to s3
fs.put(
str(simulation_output_tar_filename),
f"{bucket}/{prefix}/results/simulation_output/simulations_job{job_id}.tar.gz",
)

# Upload aggregated dpouts as a json file
with fs.open(
f"{bucket}/{prefix}/results/simulation_output/results_job{job_id}.json.gz",
"wb",
) as f1:
with gzip.open(f1, "wt", encoding="utf-8") as f2:
json.dump(dpouts, f2)

# Remove files (it helps docker if we don't leave a bunch of files laying around)
os.remove(simulation_output_tar_filename)
for item in os.listdir(sim_dir):
if os.path.isdir(item):
shutil.rmtree(item)
elif os.path.isfile(item):
os.remove(item)
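
Note: with both helpers on `DockerBatchBase`, a provider-specific subclass only has to stage the job files and weather data and then delegate, as `aws.py` now does above. A rough sketch of that call pattern, assuming S3 as the output store; the subclass name and the body of `run_job` are placeholders, not part of this PR:

```python
from s3fs import S3FileSystem

from buildstockbatch.cloud.docker_base import DockerBatchBase


class MyCloudBatch(DockerBatchBase):
    """Hypothetical provider-specific subclass, standing in for aws.AwsBatch."""

    @classmethod
    def run_job(cls, job_id, cfg, jobs_d, sim_dir, bucket, prefix):
        # Decide which weather files this batch needs...
        epws_to_download = cls.get_epws_to_download(sim_dir, jobs_d)
        for epw_filename in epws_to_download:
            ...  # provider-specific download into sim_dir / "weather", as in aws.py above

        # ...then run every (building_id, upgrade_idx) pair in jobs_d["batch"],
        # uploading the tarred outputs and aggregated dpouts to the bucket.
        cls.run_simulations(cfg, job_id, jobs_d, sim_dir, S3FileSystem(), bucket, prefix)
```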