Bug 1685769 - Add cloud monitoring export script and jobs #1679
@@ -0,0 +1 @@
"""Monitoring."""
@@ -0,0 +1,216 @@
"""Export GCP monitoring metric data to BigQuery."""
import datetime
import os
import time
from typing import Dict, List

from google.cloud import bigquery, monitoring
from google.cloud.exceptions import NotFound


def get_time_series(
    client: monitoring.MetricServiceClient,
    target_project: str,
    filter_string: str,
    aggregation: monitoring.Aggregation,
    start_time: datetime.datetime,
    end_time: datetime.datetime,
) -> List[Dict]:
    """Get monitoring time series data."""
    interval = monitoring.TimeInterval(
        {
            "start_time": {"seconds": int(start_time.timestamp())},
            "end_time": {"seconds": int(end_time.timestamp())},
        }
    )

    results = client.list_time_series(
        monitoring.ListTimeSeriesRequest(
            {
                "name": f"projects/{target_project}",
                "filter": filter_string,
                "interval": interval,
                "view": monitoring.ListTimeSeriesRequest.TimeSeriesView.FULL,
                "aggregation": aggregation,
            }
        )
    )

    data_points = []
    for result in results:
        data_points.extend(
            [
                {
                    "value": point.value.double_value,
                    "timestamp": point.interval.start_time,
                    **result.resource.labels,
                    **result.metric.labels,
                }
                for point in result.points
            ]
        )

    return data_points


def write_to_bq(
    client: bigquery.Client,
    target_table: bigquery.TableReference,
    data_points: List[Dict],
    overwrite: bool,
) -> bigquery.LoadJob:
"""Convert list of dict to dataframe and load into Bigquery.""" | ||
    load_config = bigquery.LoadJobConfig(
        write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE
        if overwrite
        else bigquery.WriteDisposition.WRITE_APPEND,
        create_disposition=bigquery.CreateDisposition.CREATE_IF_NEEDED,
        schema_update_options=None
        if overwrite
        else bigquery.SchemaUpdateOption.ALLOW_FIELD_ADDITION,
        time_partitioning=bigquery.TimePartitioning(field="timestamp"),
        autodetect=True,
    )
    # stringify timestamp so it's serializable
    data_points_copy = []
    for data_point in data_points:
        new_data_point = data_point.copy()
        new_data_point["timestamp"] = str(data_point["timestamp"])
        data_points_copy.append(new_data_point)
    load_response = client.load_table_from_json(
        data_points_copy,
        target_table,
        job_config=load_config,
    )
    return load_response


def filter_existing_data(
    time_series_data: List[Dict],
    client: bigquery.Client,
    target_table: bigquery.TableReference,
    start_time: datetime.datetime,
    end_time: datetime.datetime,
) -> List[Dict]:
    """Remove data points that have a matching timestamp in the target table."""
    query_string = f"""
        SELECT DISTINCT(timestamp) AS timestamp
        FROM {target_table.dataset_id}.{target_table.table_id}
        WHERE timestamp BETWEEN '{str(start_time)}' AND '{str(end_time)}'
    """
    try:
        results = client.query(query_string).result()
    except NotFound:
        results = []
    existing_timestamps = {row.timestamp for row in results}

    if len(existing_timestamps) == 0:
        return time_series_data

    return [
        data_point
        for data_point in time_series_data
        if data_point["timestamp"] not in existing_timestamps
    ]


# TODO: support additional metric filters and aggregation parameters
def export_metrics(
    monitoring_project: str,
    dst_project: str,
    dst_dataset: str,
    dst_table: str,
    metric: str,
    execution_time: datetime.datetime,
    time_offset: int = 0,
    interval_hours: int = 1,
    overwrite: bool = False,
    aggregator: monitoring.Aggregation.Reducer = monitoring.Aggregation.Reducer.REDUCE_NONE,
    aligner: monitoring.Aggregation.Aligner = monitoring.Aggregation.Aligner.ALIGN_MEAN,
    alignment_period: int = 300,
):
    """
    Fetch the given metric from Cloud Monitoring and write the results to the designated table.

    :param monitoring_project: Project containing the monitoring workspace
    :param dst_project: Project to write results to
    :param dst_dataset: BigQuery dataset to write results to
    :param dst_table: BigQuery table to write results to
    :param metric: Metric and resource identifier
        (e.g. pubsub.googleapis.com/topic/send_request_count)
    :param execution_time: End of the time interval to export
    :param time_offset: Number of hours to offset the interval to account for delayed metrics
    :param interval_hours: Number of hours to export, ending at execution time
    :param overwrite: Overwrite the destination table (default is append)
    :param aggregator: Cloud Monitoring series aggregator
    :param aligner: Cloud Monitoring series aligner
    :param alignment_period: Seconds between each point in the time series
    """
    monitoring_client = monitoring.MetricServiceClient()

    bq_client = bigquery.Client(dst_project)

    # Force timezone to be UTC so intervals match exported metrics
    os.environ["TZ"] = "UTC"
    time.tzset()

    # Larger intervals cause run time to increase superlinearly and use a lot of memory
    max_interval_width = 24
    for interval_offset in range(0, interval_hours, max_interval_width):
        offset = time_offset + interval_offset
        end_time = execution_time - datetime.timedelta(hours=offset)
        start_time = execution_time - datetime.timedelta(
            hours=min(offset + max_interval_width, time_offset + interval_hours)
        )
        print(f"Fetching values for {start_time} to {end_time}")

        metric_filter = f'metric.type = "{metric}"'

        aggregation = monitoring.Aggregation(
            {
                "alignment_period": {"seconds": alignment_period},
                "cross_series_reducer": aggregator,
                "per_series_aligner": aligner,
            }
        )

        time_series_data = get_time_series(
            monitoring_client,
            monitoring_project,
            metric_filter,
            aggregation,
            start_time,
            end_time,
        )

        print(
            f"Retrieved {len(time_series_data)} data points for"
            f" {start_time} to {end_time}"
        )

        target_dataset = bigquery.DatasetReference(dst_project, dst_dataset)
        target_table = bigquery.table.TableReference(target_dataset, dst_table)

        # get existing timestamps in destination table to avoid overlap
        if not overwrite and len(time_series_data) > 0:
            time_series_data = filter_existing_data(
                time_series_data,
                bq_client,
                target_table,
                start_time,
                end_time,
            )

        if len(time_series_data) == 0:
            print(f"No new data points found for interval {start_time} to {end_time}")
            continue

        response = write_to_bq(
            bq_client,
            target_table,
            time_series_data,
            overwrite=overwrite and interval_offset == 0,
        )
        response.result()

        print(f"Wrote to {dst_project}.{dst_dataset}.{dst_table}")
@@ -331,3 +331,14 @@ bqetl_desktop_platform:
    ]
    retries: 2
    retry_delay: 30m

bqetl_cloud_monitoring_export:
  schedule_interval: 0 * * * *
Review comment: I believe this is equivalent to

Reply: This is hourly but yes
  default_args:
    owner: "bewu@mozilla.com"
    start_date: '2021-01-01'
    end_date: '2021-01-01'
    email:
      ['bewu@mozilla.com']
    retries: 2
    retry_delay: 15m
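Regarding the schedule_interval question in the thread above: "0 * * * *" fires at minute 0 of every hour, the same schedule as Airflow's @hourly preset. A quick way to sanity-check a cron expression, sketched here with the croniter library (an assumption for illustration; it is not used by this change):

```python
# Hypothetical sanity check, not part of this PR: confirm "0 * * * *" fires hourly.
import datetime
from croniter import croniter  # assumed to be installed; it is a dependency of Airflow

it = croniter("0 * * * *", datetime.datetime(2021, 1, 1, 0, 30))
print(it.get_next(datetime.datetime))  # 2021-01-01 01:00:00
print(it.get_next(datetime.datetime))  # 2021-01-01 02:00:00
```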
@@ -0,0 +1,81 @@
# Generated via https://github.com/mozilla/bigquery-etl/blob/master/bigquery_etl/query_scheduling/generate_airflow_dags.py

from airflow import DAG
from airflow.operators.sensors import ExternalTaskSensor
import datetime
from utils.gcp import bigquery_etl_query, gke_command

default_args = {
    "owner": "bewu@mozilla.com",
    "start_date": datetime.datetime(2021, 1, 1, 0, 0),
    "end_date": datetime.datetime(2021, 1, 1, 0, 0),
    "email": ["bewu@mozilla.com"],
    "depends_on_past": False,
    "retry_delay": datetime.timedelta(seconds=900),
    "email_on_failure": True,
    "email_on_retry": True,
    "retries": 2,
}

with DAG(
    "bqetl_cloud_monitoring_export",
    default_args=default_args,
    schedule_interval="0 * * * *",
) as dag:

    monitoring__dataflow_user_counters__v1 = gke_command(
        task_id="monitoring__dataflow_user_counters__v1",
        command=[
            "python",
            "sql/moz-fx-data-shared-prod/monitoring/dataflow_user_counters_v1/query.py",
        ]
        + [
            "--execution-time",
            "{{ ts }}",
            "--interval-hours",
            "1",
            "--time-offset",
            "1",
        ],
        docker_image="mozilla/bigquery-etl:latest",
        owner="bewu@mozilla.com",
        email=["bewu@mozilla.com"],
    )

    monitoring__kubernetes_not_coerced_to_int__v1 = gke_command(
        task_id="monitoring__kubernetes_not_coerced_to_int__v1",
        command=[
            "python",
            "sql/moz-fx-data-shared-prod/monitoring/kubernetes_not_coerced_to_int_v1/query.py",
        ]
        + [
            "--execution-time",
            "{{ ts }}",
            "--interval-hours",
            "1",
            "--time-offset",
            "1",
        ],
        docker_image="mozilla/bigquery-etl:latest",
        owner="bewu@mozilla.com",
        email=["bewu@mozilla.com"],
    )

    monitoring__pubsub_subscription_oldest_unacked_msg__v1 = gke_command(
        task_id="monitoring__pubsub_subscription_oldest_unacked_msg__v1",
        command=[
            "python",
            "sql/moz-fx-data-shared-prod/monitoring/pubsub_subscription_oldest_unacked_msg_v1/query.py",
        ]
        + [
            "--execution-time",
            "{{ ts }}",
            "--interval-hours",
            "1",
            "--time-offset",
            "1",
        ],
        docker_image="mozilla/bigquery-etl:latest",
        owner="bewu@mozilla.com",
        email=["bewu@mozilla.com"],
    )
Review comment: My inclination would be to create these monitoring tables with hourly partitioning, and then have these scripts atomically overwrite the target partition (specifying the destination table as mytable$2021011201, etc.), avoiding the need for filtering logic like this.

We could achieve some simplification by having all this machinery assume that it's operating on one whole hour at a time. I may well be missing some nuance, though, so definitely open to pushback.
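For illustration, a minimal sketch of that suggestion, assuming the destination tables are created with hourly time partitioning on `timestamp`; the helper name and the partition-decorator handling below are not part of this PR:

```python
"""Hypothetical sketch (not part of this PR): atomically replace one hourly partition."""
import datetime
from typing import Dict, List

from google.cloud import bigquery


def overwrite_hour_partition(
    client: bigquery.Client,
    table: str,  # e.g. "moz-fx-data-shared-prod.monitoring.dataflow_user_counters_v1"
    hour: datetime.datetime,
    data_points: List[Dict],
) -> bigquery.LoadJob:
    """Load rows into exactly one hourly partition, replacing its previous contents."""
    # A "$YYYYMMDDHH" decorator targets a single partition of an HOUR-partitioned
    # table, so WRITE_TRUNCATE replaces only that hour rather than the whole table.
    partition = f"{table}${hour:%Y%m%d%H}"
    job_config = bigquery.LoadJobConfig(
        write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,
        # Only relevant if the load ends up creating the table.
        time_partitioning=bigquery.TimePartitioning(
            type_=bigquery.TimePartitioningType.HOUR, field="timestamp"
        ),
        autodetect=True,
    )
    return client.load_table_from_json(data_points, partition, job_config=job_config)
```

With the DAG passing whole-hour execution times, this approach could make `filter_existing_data` and the append/truncate switching in `write_to_bq` unnecessary, since reruns would simply replace the same partition.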