Fix/glue crawler trailing space failure #202

Merged
11 commits merged on Apr 14, 2021
buildstockbatch/postprocessing.py (13 changes: 11 additions & 2 deletions)
@@ -396,7 +396,7 @@ def upload_results(aws_conf, output_dir, results_dir):
     for files in parquet_dir.rglob('*.parquet'):
         all_files.append(files.relative_to(parquet_dir))
 
-    s3_prefix = aws_conf.get('s3', {}).get('prefix', None)
+    s3_prefix = aws_conf.get('s3', {}).get('prefix', '').rstrip('/')
     s3_bucket = aws_conf.get('s3', {}).get('bucket', None)
     if not (s3_prefix and s3_bucket):
         logger.error("YAML file missing postprocessing:aws:s3:prefix and/or bucket entry.")
@@ -431,10 +431,19 @@ def create_athena_tables(aws_conf, tbl_prefix, s3_bucket, s3_prefix):
     max_crawling_time = aws_conf.get('athena', {}).get('max_crawling_time', 600)
     assert db_name, "athena:database_name not supplied"
 
+    # Check that there are files in the s3 bucket before creating and running glue crawler
+    s3 = boto3.resource('s3')
+    bucket = s3.Bucket(s3_bucket)
+    s3_path = f's3://{s3_bucket}/{s3_prefix}'
+    n_existing_files = len(list(bucket.objects.filter(Prefix=s3_prefix)))
+    if n_existing_files == 0:
+        logger.warning(f"There are no files in {s3_path}, Athena tables will not be created as intended")
+        return
+
     glueClient = boto3.client('glue', region_name=region_name)
     crawlTarget = {
         'S3Targets': [{
-            'Path': f's3://{s3_bucket}/{s3_prefix}',
+            'Path': s3_path,
             'Exclusions': []
         }]
     }
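As a quick aside on the two changes in this hunk (illustrative only, not part of the diff; the bucket and prefix values below are made up), the prefix normalization and the precomputed s3_path work together roughly like this:

    # Illustrative only; 'my-bucket' and 'runs/2021/' are hypothetical values.
    aws_conf = {'s3': {'bucket': 'my-bucket', 'prefix': 'runs/2021/'}}  # note the trailing slash

    s3_prefix = aws_conf.get('s3', {}).get('prefix', '').rstrip('/')    # 'runs/2021'
    s3_bucket = aws_conf.get('s3', {}).get('bucket', None)
    s3_path = f's3://{s3_bucket}/{s3_prefix}'                           # 's3://my-bucket/runs/2021'
    # Without rstrip('/') the crawl target path would end in '/', which is the
    # trailing-character case the glue crawler was choking on.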
buildstockbatch/test/test_base.py (9 changes: 5 additions & 4 deletions)
@@ -240,7 +240,7 @@ def test_combine_files(basic_residential_project_file):


 @patch('buildstockbatch.postprocessing.boto3')
-def test_upload_files(mocked_s3, basic_residential_project_file):
+def test_upload_files(mocked_boto3, basic_residential_project_file):
     s3_bucket = 'test_bucket'
     s3_prefix = 'test_prefix'
     db_name = 'test_db_name'
@@ -265,7 +265,8 @@ def test_upload_files(mocked_s3, basic_residential_project_file):
     }
     mocked_glueclient = MagicMock()
     mocked_glueclient.get_crawler = MagicMock(return_value={'Crawler': {'State': 'READY'}})
-    mocked_s3.client = MagicMock(return_value=mocked_glueclient)
+    mocked_boto3.client = MagicMock(return_value=mocked_glueclient)
+    mocked_boto3.resource().Bucket().objects.filter.side_effect = [[], ['a', 'b', 'c']]
     project_filename, results_dir = basic_residential_project_file(upload_config)
     with patch.object(BuildStockBatchBase, 'weather_dir', None), \
         patch.object(BuildStockBatchBase, 'output_dir', results_dir), \
@@ -278,7 +279,7 @@ def test_upload_files(mocked_s3, basic_residential_project_file):
     files_uploaded = []
     crawler_created = False
     crawler_started = False
-    for call in mocked_s3.mock_calls + mocked_s3.client().mock_calls:
+    for call in mocked_boto3.mock_calls[2:] + mocked_boto3.client().mock_calls:
         call_function = call[0].split('.')[-1]  # 0 is for the function name
         if call_function == 'resource':
             assert call[1][0] in ['s3']  # call[1] is for the positional arguments
@@ -289,7 +290,7 @@ def test_upload_files(mocked_s3, basic_residential_project_file):
             destination_path = call[1][1]
             files_uploaded.append((source_file_path, destination_path))
         if call_function == 'create_crawler':
-            crawler_para = call[2]  # 2 is for the keyboard arguments
+            crawler_para = call[2]  # 2 is for the keyword arguments
             crawler_created = True
             assert crawler_para['DatabaseName'] == upload_config['postprocessing']['aws']['athena']['database_name']
             assert crawler_para['Role'] == upload_config['postprocessing']['aws']['athena']['glue_service_role']
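A note on the mocked boto3 change above (this sketch is illustrative and not part of the diff): assigning a list to side_effect makes successive calls to the same mock return successive items, so the test can hand back a different objects.filter() result on each call:

    from unittest.mock import MagicMock

    mocked_boto3 = MagicMock()
    mocked_boto3.resource().Bucket().objects.filter.side_effect = [[], ['a', 'b', 'c']]

    # The first filter() call returns an empty listing, the second returns three keys;
    # which production check consumes which call depends on the code under test.
    print(list(mocked_boto3.resource().Bucket().objects.filter(Prefix='p')))  # []
    print(list(mocked_boto3.resource().Bucket().objects.filter(Prefix='p')))  # ['a', 'b', 'c']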
docs/changelog/changelog_dev.rst (9 changes: 9 additions & 0 deletions)
@@ -51,3 +51,12 @@ Development Changelog

         Use a map of dask delayed function to combine parquets instead of a giant dask df to avoid memory issues.
         Default to 85GB memory nodes in eagle with single process and single thread in each node to avoid memory issues.
+
+    .. change::
+        :tags: postprocessing
+        :pullreq: 202
+        :tickets: 159
+
+        The glue crawler was failing when there was a trailing ``/`` character.
+        This fixes that as well as checks to make sure files were uploaded
+        before running the crawler.
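For the second part of that changelog entry, the guard added in postprocessing.py boils down to something like the sketch below (a standalone, simplified illustration, not the actual buildstockbatch API):

    import boto3

    def prefix_has_files(s3_bucket, s3_prefix):
        """Return True if at least one object exists under s3://{bucket}/{prefix}."""
        bucket = boto3.resource('s3').Bucket(s3_bucket)
        return any(True for _ in bucket.objects.filter(Prefix=s3_prefix))

    # create_athena_tables() only creates and starts the glue crawler when a check
    # like this passes; otherwise it logs a warning and returns early.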