diff --git a/bigquery_etl/monitoring/__init__.py b/bigquery_etl/monitoring/__init__.py new file mode 100644 index 00000000000..ccf64277f44 --- /dev/null +++ b/bigquery_etl/monitoring/__init__.py @@ -0,0 +1 @@ +"""Monitoring.""" diff --git a/bigquery_etl/monitoring/export_metrics.py b/bigquery_etl/monitoring/export_metrics.py new file mode 100644 index 00000000000..98f1551c336 --- /dev/null +++ b/bigquery_etl/monitoring/export_metrics.py @@ -0,0 +1,216 @@ +"""Export GCP monitoring metric data to BigQuery.""" +import datetime +import os +import time +from typing import Dict, List + +from google.cloud import bigquery, monitoring +from google.cloud.exceptions import NotFound + + +def get_time_series( + client: monitoring.MetricServiceClient, + target_project: str, + filter_string: str, + aggregation: monitoring.Aggregation, + start_time: datetime.datetime, + end_time: datetime.datetime, +) -> List[Dict]: + """Get monitoring time series data.""" + interval = monitoring.TimeInterval( + { + "start_time": {"seconds": int(start_time.timestamp())}, + "end_time": {"seconds": int(end_time.timestamp())}, + } + ) + + results = client.list_time_series( + monitoring.ListTimeSeriesRequest( + { + "name": f"projects/{target_project}", + "filter": filter_string, + "interval": interval, + "view": monitoring.ListTimeSeriesRequest.TimeSeriesView.FULL, + "aggregation": aggregation, + } + ) + ) + + data_points = [] + for result in results: + data_points.extend( + [ + { + "value": point.value.double_value, + "timestamp": point.interval.start_time, + **result.resource.labels, + **result.metric.labels, + } + for point in result.points + ] + ) + + return data_points + + +def write_to_bq( + client: bigquery.Client, + target_table: bigquery.TableReference, + data_points: List[Dict], + overwrite: bool, +) -> bigquery.LoadJob: + """Convert list of dict to dataframe and load into Bigquery.""" + load_config = bigquery.LoadJobConfig( + write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE + if overwrite + else bigquery.WriteDisposition.WRITE_APPEND, + create_disposition=bigquery.CreateDisposition.CREATE_IF_NEEDED, + schema_update_options=None + if overwrite + else bigquery.SchemaUpdateOption.ALLOW_FIELD_ADDITION, + time_partitioning=bigquery.TimePartitioning(field="timestamp"), + autodetect=True, + ) + # stringify timestamp so it's serializable + data_points_copy = [] + for data_point in data_points: + new_data_point = data_point.copy() + new_data_point["timestamp"] = str(data_point["timestamp"]) + data_points_copy.append(new_data_point) + load_response = client.load_table_from_json( + data_points_copy, + target_table, + job_config=load_config, + ) + return load_response + + +def filter_existing_data( + time_series_data: List[Dict], + client: bigquery.Client, + target_table: bigquery.TableReference, + start_time: datetime.datetime, + end_time: datetime.datetime, +) -> List[Dict]: + """Remove data points that have a matching timestamp in the target table.""" + query_string = f""" + SELECT DISTINCT(timestamp) AS timestamp + FROM {target_table.dataset_id}.{target_table.table_id} + WHERE timestamp BETWEEN '{str(start_time)}' AND '{str(end_time)}' + """ + try: + results = client.query(query_string).result() + except NotFound: + results = [] + existing_timestamps = {row.timestamp for row in results} + + if len(existing_timestamps) == 0: + return time_series_data + + return [ + data_point + for data_point in time_series_data + if data_point["timestamp"] not in existing_timestamps + ] + + +# TODO: support additional metric 
filters and aggregation parameters +def export_metrics( + monitoring_project: str, + dst_project: str, + dst_dataset: str, + dst_table: str, + metric: str, + execution_time: datetime.datetime, + time_offset: int = 0, + interval_hours: int = 1, + overwrite: bool = False, + aggregator: monitoring.Aggregation.Reducer = monitoring.Aggregation.Reducer.REDUCE_NONE, + aligner: monitoring.Aggregation.Aligner = monitoring.Aggregation.Aligner.ALIGN_MEAN, + alignment_period: int = 300, +): + """ + Fetch given metric from cloud monitoring and write results to designated table. + + :param monitoring_project: Project containing the monitoring workspace + :param dst_project: Project to write results to + :param dst_dataset: Bigquery dataset to write results to + :param dst_table: Bigquery table to write results to + :param metric: Metric and resource identifier + (e.g. pubsub.googleapis.com/topic/send_request_count) + :param execution_time: End of the time interval to export + :param time_offset: Number of hours to offset interval to account for delayed metrics + :param interval_hours: Number of hours to export, ending at execution time + :param overwrite: Overwrite destination table (default is append) + :param aggregator: Cloud monitoring series aggregator + :param aligner: Cloud monitoring series aligner + :param alignment_period: Seconds between each point in the time series + """ + monitoring_client = monitoring.MetricServiceClient() + + bq_client = bigquery.Client(dst_project) + + # Force timezone to be UTC so intervals match exported metrics + os.environ["TZ"] = "UTC" + time.tzset() + + # Larger intervals cause run time to increase superlinearly and takes a lot of memory + max_interval_width = 24 + for interval_offset in range(0, interval_hours, max_interval_width): + offset = time_offset + interval_offset + end_time = execution_time - datetime.timedelta(hours=offset) + start_time = execution_time - datetime.timedelta( + hours=min(offset + max_interval_width, time_offset + interval_hours) + ) + print(f"Fetching values for {start_time} to {end_time}") + + metric_filter = f'metric.type = "{metric}"' + + aggregation = monitoring.Aggregation( + { + "alignment_period": {"seconds": alignment_period}, + "cross_series_reducer": aggregator, + "per_series_aligner": aligner, + } + ) + + time_series_data = get_time_series( + monitoring_client, + monitoring_project, + metric_filter, + aggregation, + start_time, + end_time, + ) + + print( + f"Retrieved {len(time_series_data)} data points for" + f" {start_time} to {end_time}" + ) + + target_dataset = bigquery.DatasetReference(dst_project, dst_dataset) + target_table = bigquery.table.TableReference(target_dataset, dst_table) + + # get existing timestamps in destination table to avoid overlap + if not overwrite and len(time_series_data) > 0: + time_series_data = filter_existing_data( + time_series_data, + bq_client, + target_table, + start_time, + end_time, + ) + + if len(time_series_data) == 0: + print(f"No new data points found for interval {start_time} to {end_time}") + continue + + response = write_to_bq( + bq_client, + target_table, + time_series_data, + overwrite=overwrite and interval_offset == 0, + ) + response.result() + + print(f"Wrote to {dst_project}.{dst_dataset}.{dst_table}") diff --git a/dags.yaml b/dags.yaml index 125d26e56a8..9606af1d6e3 100644 --- a/dags.yaml +++ b/dags.yaml @@ -331,3 +331,14 @@ bqetl_desktop_platform: ] retries: 2 retry_delay: 30m + +bqetl_cloud_monitoring_export: + schedule_interval: 0 * * * * + default_args: + owner: 
"bewu@mozilla.com" + start_date: '2021-01-01' + end_date: '2021-01-01' + email: + ['bewu@mozilla.com'] + retries: 2 + retry_delay: 15m diff --git a/dags/bqetl_cloud_monitoring_export.py b/dags/bqetl_cloud_monitoring_export.py new file mode 100644 index 00000000000..6f84b0c3f2b --- /dev/null +++ b/dags/bqetl_cloud_monitoring_export.py @@ -0,0 +1,81 @@ +# Generated via https://github.com/mozilla/bigquery-etl/blob/master/bigquery_etl/query_scheduling/generate_airflow_dags.py + +from airflow import DAG +from airflow.operators.sensors import ExternalTaskSensor +import datetime +from utils.gcp import bigquery_etl_query, gke_command + +default_args = { + "owner": "bewu@mozilla.com", + "start_date": datetime.datetime(2021, 1, 1, 0, 0), + "end_date": datetime.datetime(2021, 1, 1, 0, 0), + "email": ["bewu@mozilla.com"], + "depends_on_past": False, + "retry_delay": datetime.timedelta(seconds=900), + "email_on_failure": True, + "email_on_retry": True, + "retries": 2, +} + +with DAG( + "bqetl_cloud_monitoring_export", + default_args=default_args, + schedule_interval="0 * * * *", +) as dag: + + monitoring__dataflow_user_counters__v1 = gke_command( + task_id="monitoring__dataflow_user_counters__v1", + command=[ + "python", + "sql/moz-fx-data-shared-prod/monitoring/dataflow_user_counters_v1/query.py", + ] + + [ + "--execution-time", + "{{ ts }}", + "--interval-hours", + "1", + "--time-offset", + "1", + ], + docker_image="mozilla/bigquery-etl:latest", + owner="bewu@mozilla.com", + email=["bewu@mozilla.com"], + ) + + monitoring__kubernetes_not_coerced_to_int__v1 = gke_command( + task_id="monitoring__kubernetes_not_coerced_to_int__v1", + command=[ + "python", + "sql/moz-fx-data-shared-prod/monitoring/kubernetes_not_coerced_to_int_v1/query.py", + ] + + [ + "--execution-time", + "{{ ts }}", + "--interval-hours", + "1", + "--time-offset", + "1", + ], + docker_image="mozilla/bigquery-etl:latest", + owner="bewu@mozilla.com", + email=["bewu@mozilla.com"], + ) + + monitoring__pubsub_subscription_oldest_unacked_msg__v1 = gke_command( + task_id="monitoring__pubsub_subscription_oldest_unacked_msg__v1", + command=[ + "python", + "sql/moz-fx-data-shared-prod/monitoring/pubsub_subscription_oldest_unacked_msg_v1/query.py", + ] + + [ + "--execution-time", + "{{ ts }}", + "--interval-hours", + "1", + "--time-offset", + "1", + ], + docker_image="mozilla/bigquery-etl:latest", + owner="bewu@mozilla.com", + email=["bewu@mozilla.com"], + ) diff --git a/requirements.in b/requirements.in index 35339e9c4e9..179fefc5306 100644 --- a/requirements.in +++ b/requirements.in @@ -1,5 +1,6 @@ gcloud==0.18.3 google-cloud-bigquery==2.6.1 +google-cloud-monitoring==2.0.0 google-cloud-storage==1.35.0 Jinja2==2.11.2 pytest-black==0.3.12 diff --git a/requirements.txt b/requirements.txt index 78289c738ef..efadf63a792 100644 --- a/requirements.txt +++ b/requirements.txt @@ -121,7 +121,7 @@ gitpython==3.1.12 \ google-api-core[grpc]==1.23.0 \ --hash=sha256:1bb3c485c38eacded8d685b1759968f6cf47dd9432922d34edb90359eaa391e2 \ --hash=sha256:94d8c707d358d8d9e8b0045c42be20efb58433d308bd92cf748511c7825569c8 \ - # via google-cloud-bigquery, google-cloud-core + # via google-cloud-bigquery, google-cloud-core, google-cloud-monitoring google-auth==1.22.0 \ --hash=sha256:a73e6fb6d232ed1293ef9a5301e6f8aada7880d19c65d7f63e130dc50ec05593 \ --hash=sha256:e86e72142d939a8d90a772947268aacc127ab7a1d1d6f3e0fecca7a8d74d8257 \ @@ -134,6 +134,10 @@ google-cloud-core==1.4.1 \ --hash=sha256:4c9e457fcfc026fdde2e492228f04417d4c717fb0f29f070122fb0ab89e34ebd \ 
--hash=sha256:613e56f164b6bee487dd34f606083a0130f66f42f7b10f99730afdf1630df507 \ # via google-cloud-bigquery, google-cloud-storage +google-cloud-monitoring==2.0.0 \ + --hash=sha256:1debfa046ab9518d46b68712c03d86d0ddb11d1aad428aed62c6465752f2201f \ + --hash=sha256:8c83b4248b6576ed133a38c67d9aefed55ecea2c78b31ff4f340617f1a813a56 \ + # via -r requirements.in google-cloud-storage==1.35.0 \ --hash=sha256:555c0db2f88f3419f123bf9c621d7fd92f7c9e4f8b11f08eda57facacba16a9e \ --hash=sha256:7b48b74683dafec5da315c7b0861ab168c208e04c763aa5db7ea8d7ecd8b6c5d \ @@ -227,6 +231,10 @@ jsonschema==3.2.0 \ --hash=sha256:4e5b3cf8216f577bee9ce139cbe72eca3ea4f292ec60928ff24758ce626cd163 \ --hash=sha256:c8a85b28d377cc7737e46e2d9f2b4f44ee3c0e1deac6bf46ddefc7187d30797a \ # via -r requirements.in, mozilla-schema-generator +libcst==0.3.16 \ + --hash=sha256:2c9e40245b8cb49b5219c76b36fe7037effa7594b9e6d5a092be99f8083d2415 \ + --hash=sha256:99c200004b6e845642eea7a433844d144994767f9ed50705171720b76d28cf7e \ + # via google-cloud-monitoring markupsafe==1.1.1 \ --hash=sha256:00bc623926325b26bb9605ae9eae8a215691f33cae5df11ca5424f06f2d1f473 \ --hash=sha256:09027a7803a62ca78792ad89403b1b7a73a01c8cb65909cd876f7fcebd79b161 \ @@ -292,7 +300,7 @@ multidict==4.7.6 \ mypy-extensions==0.4.3 \ --hash=sha256:090fedd75945a69ae91ce1303b5824f428daf5a028d2f6ab8a299250a846f15d \ --hash=sha256:2d82818f5bb3e369420cb3c4060a7970edba416647068eb4c5343488a6c604a8 \ - # via black, mypy + # via black, mypy, typing-inspect mypy==0.782 \ --hash=sha256:2c6cde8aa3426c1682d35190b59b71f661237d74b053822ea3d748e2c9578a7c \ --hash=sha256:3fdda71c067d3ddfb21da4b80e2686b71e9e5c72cca65fa216d207a358827f86 \ @@ -378,7 +386,7 @@ pluggy==0.13.1 \ # via pytest proto-plus==1.10.0 \ --hash=sha256:899f2bdc081197e683e3e29d05cd75861efbdd3de883bfb8b78165d74fb587f1 \ - # via google-cloud-bigquery + # via google-cloud-bigquery, google-cloud-monitoring protobuf==3.13.0 \ --hash=sha256:0bba42f439bf45c0f600c3c5993666fcb88e8441d011fad80a11df6f324eef33 \ --hash=sha256:1e834076dfef9e585815757a2c7e4560c7ccc5962b9d09f831214c693a91b463 \ @@ -472,24 +480,31 @@ pyyaml==5.3.1 \ --hash=sha256:06a0d7ba600ce0b2d2fe2e78453a470b5a6e000a985dd4a4e54e436cc36b0e97 \ --hash=sha256:240097ff019d7c70a4922b6869d8a86407758333f02203e0fc6ff79c5dcede76 \ --hash=sha256:4f4b913ca1a7319b33cfb1369e91e50354d6f07a135f3b901aca02aa95940bd2 \ + --hash=sha256:6034f55dab5fea9e53f436aa68fa3ace2634918e8b5994d82f3621c04ff5ed2e \ --hash=sha256:69f00dca373f240f842b2931fb2c7e14ddbacd1397d57157a9b005a6a9942648 \ --hash=sha256:73f099454b799e05e5ab51423c7bcf361c58d3206fa7b0d555426b1f4d9a3eaf \ --hash=sha256:74809a57b329d6cc0fdccee6318f44b9b8649961fa73144a98735b0aaf029f1f \ --hash=sha256:7739fc0fa8205b3ee8808aea45e968bc90082c10aef6ea95e855e10abf4a37b2 \ --hash=sha256:95f71d2af0ff4227885f7a6605c37fd53d3a106fcab511b8860ecca9fcf400ee \ + --hash=sha256:ad9c67312c84def58f3c04504727ca879cb0013b2517c85a9a253f0cb6380c0a \ --hash=sha256:b8eac752c5e14d3eca0e6dd9199cd627518cb5ec06add0de9d32baeee6fe645d \ --hash=sha256:cc8955cfbfc7a115fa81d85284ee61147059a753344bc51098f3ccd69b0d7e0c \ --hash=sha256:d13155f591e6fcc1ec3b30685d50bf0711574e2c0dfffd7644babf8b5102ca1a \ - # via -r requirements.in, mozilla-schema-generator, yamllint + # via -r requirements.in, libcst, mozilla-schema-generator, yamllint regex==2020.9.27 \ --hash=sha256:088afc8c63e7bd187a3c70a94b9e50ab3f17e1d3f52a32750b5b77dbe99ef5ef \ --hash=sha256:1fe0a41437bbd06063aa184c34804efa886bcc128222e9916310c92cd54c3b4c \ + 
--hash=sha256:3d20024a70b97b4f9546696cbf2fd30bae5f42229fbddf8661261b1eaff0deb7 \ --hash=sha256:41bb65f54bba392643557e617316d0d899ed5b4946dccee1cb6696152b29844b \ --hash=sha256:4318d56bccfe7d43e5addb272406ade7a2274da4b70eb15922a071c58ab0108c \ --hash=sha256:4707f3695b34335afdfb09be3802c87fa0bc27030471dbc082f815f23688bc63 \ + --hash=sha256:49f23ebd5ac073765ecbcf046edc10d63dcab2f4ae2bce160982cb30df0c0302 \ --hash=sha256:5533a959a1748a5c042a6da71fe9267a908e21eded7a4f373efd23a2cbdb0ecc \ + --hash=sha256:5d892a4f1c999834eaa3c32bc9e8b976c5825116cde553928c4c8e7e48ebda67 \ --hash=sha256:5f18875ac23d9aa2f060838e8b79093e8bb2313dbaaa9f54c6d8e52a5df097be \ --hash=sha256:60b0e9e6dc45683e569ec37c55ac20c582973841927a85f2d8a7d20ee80216ab \ + --hash=sha256:816064fc915796ea1f26966163f6845de5af78923dfcecf6551e095f00983650 \ + --hash=sha256:84cada8effefe9a9f53f9b0d2ba9b7b6f5edf8d2155f9fdbe34616e06ececf81 \ --hash=sha256:84e9407db1b2eb368b7ecc283121b5e592c9aaedbe8c78b1a2f1102eb2e21d19 \ --hash=sha256:8d69cef61fa50c8133382e61fd97439de1ae623fe943578e477e76a9d9471637 \ --hash=sha256:9a02d0ae31d35e1ec12a4ea4d4cca990800f66a917d0fb997b20fbc13f5321fc \ @@ -497,6 +512,7 @@ regex==2020.9.27 \ --hash=sha256:a6f32aea4260dfe0e55dc9733ea162ea38f0ea86aa7d0f77b15beac5bf7b369d \ --hash=sha256:ae91972f8ac958039920ef6e8769277c084971a142ce2b660691793ae44aae6b \ --hash=sha256:c570f6fa14b9c4c8a4924aaad354652366577b4f98213cf76305067144f7b100 \ + --hash=sha256:c9443124c67b1515e4fe0bb0aa18df640965e1030f468a2a5dc2589b26d130ad \ --hash=sha256:d23a18037313714fb3bb5a94434d3151ee4300bae631894b1ac08111abeaa4a3 \ --hash=sha256:eaf548d117b6737df379fdd53bdde4f08870e66d7ea653e230477f071f861121 \ --hash=sha256:ebbe29186a3d9b0c591e71b7393f1ae08c83cb2d8e517d2a822b8f7ec99dfd8b \ @@ -542,31 +558,45 @@ toml==0.10.1 \ typed-ast==1.4.1 \ --hash=sha256:0666aa36131496aed8f7be0410ff974562ab7eeac11ef351def9ea6fa28f6355 \ --hash=sha256:0c2c07682d61a629b68433afb159376e24e5b2fd4641d35424e462169c0a7919 \ + --hash=sha256:0d8110d78a5736e16e26213114a38ca35cb15b6515d535413b090bd50951556d \ --hash=sha256:249862707802d40f7f29f6e1aad8d84b5aa9e44552d2cc17384b209f091276aa \ --hash=sha256:24995c843eb0ad11a4527b026b4dde3da70e1f2d8806c99b7b4a7cf491612652 \ --hash=sha256:269151951236b0f9a6f04015a9004084a5ab0d5f19b57de779f908621e7d8b75 \ + --hash=sha256:3742b32cf1c6ef124d57f95be609c473d7ec4c14d0090e5a5e05a15269fb4d0c \ --hash=sha256:4083861b0aa07990b619bd7ddc365eb7fa4b817e99cf5f8d9cf21a42780f6e01 \ --hash=sha256:498b0f36cc7054c1fead3d7fc59d2150f4d5c6c56ba7fb150c013fbc683a8d2d \ --hash=sha256:4e3e5da80ccbebfff202a67bf900d081906c358ccc3d5e3c8aea42fdfdfd51c1 \ --hash=sha256:6daac9731f172c2a22ade6ed0c00197ee7cc1221aa84cfdf9c31defeb059a907 \ --hash=sha256:715ff2f2df46121071622063fc7543d9b1fd19ebfc4f5c8895af64a77a8c852c \ --hash=sha256:73d785a950fc82dd2a25897d525d003f6378d1cb23ab305578394694202a58c3 \ + --hash=sha256:7e4c9d7658aaa1fc80018593abdf8598bf91325af6af5cce4ce7c73bc45ea53d \ --hash=sha256:8c8aaad94455178e3187ab22c8b01a3837f8ee50e09cf31f1ba129eb293ec30b \ --hash=sha256:8ce678dbaf790dbdb3eba24056d5364fb45944f33553dd5869b7580cdbb83614 \ + --hash=sha256:92c325624e304ebf0e025d1224b77dd4e6393f18aab8d829b5b7e04afe9b7a2c \ --hash=sha256:aaee9905aee35ba5905cfb3c62f3e83b3bec7b39413f0a7f19be4e547ea01ebb \ + --hash=sha256:b52ccf7cfe4ce2a1064b18594381bccf4179c2ecf7f513134ec2f993dd4ab395 \ --hash=sha256:bcd3b13b56ea479b3650b82cabd6b5343a625b0ced5429e4ccad28a8973f301b \ --hash=sha256:c9e348e02e4d2b4a8b2eedb48210430658df6951fa484e59de33ff773fbd4b41 \ 
--hash=sha256:d205b1b46085271b4e15f670058ce182bd1199e56b317bf2ec004b6a44f911f6 \ --hash=sha256:d43943ef777f9a1c42bf4e552ba23ac77a6351de620aa9acf64ad54933ad4d34 \ --hash=sha256:d5d33e9e7af3b34a40dc05f498939f0ebf187f07c385fd58d591c533ad8562fe \ + --hash=sha256:d648b8e3bf2fe648745c8ffcee3db3ff903d0817a01a12dd6a6ea7a8f4889072 \ + --hash=sha256:f208eb7aff048f6bea9586e61af041ddf7f9ade7caed625742af423f6bae3298 \ + --hash=sha256:fac11badff8313e23717f3dada86a15389d0708275bddf766cca67a84ead3e91 \ --hash=sha256:fc0fea399acb12edbf8a628ba8d2312f583bdbdb3335635db062fa98cf71fca4 \ + --hash=sha256:fcf135e17cc74dbfbc05894ebca928ffeb23d9790b3167a674921db19082401f \ --hash=sha256:fe460b922ec15dd205595c9b5b99e2f056fd98ae8f9f56b888e7a17dc2b757e7 \ # via black, mypy typing-extensions==3.7.4.3 \ --hash=sha256:7cb407020f00f7bfc3cb3e7881628838e69d8f3fcab2f64742a5e76b2f841918 \ --hash=sha256:99d4073b617d30288f569d3f13d2bd7548c3a7e4c8de87db09a9d29bb3a4a60c \ --hash=sha256:dafc7639cde7f1b6e1acc0f457842a83e722ccca8eef5270af2d74792619a89f \ - # via black, mypy + # via black, libcst, mypy, typing-inspect +typing-inspect==0.6.0 \ + --hash=sha256:3b98390df4d999a28cf5b35d8b333425af5da2ece8a4ea9e98f71e7591347b4f \ + --hash=sha256:8f1b1dd25908dbfd81d3bebc218011531e7ab614ba6e5bf7826d887c834afab7 \ + --hash=sha256:de08f50a22955ddec353876df7b2545994d6df08a2f45d54ac8c05e530372ca0 \ + # via libcst typing==3.7.4.3 \ --hash=sha256:1187fb9c82fd670d10aa07bbb6cfcfe4bdda42d6fab8d5134f04e8c4d0b71cc9 \ --hash=sha256:283d868f5071ab9ad873e5e52268d611e851c870a2ba354193026f2dfb29d8b5 \ @@ -627,7 +657,7 @@ pip==20.3.3 \ --hash=sha256:79c1ac8a9dccbec8752761cb5a2df833224263ca661477a2a9ed03ddf4e0e3ba \ --hash=sha256:fab098c8a1758295dd9f57413c199f23571e8fde6cc39c22c78c961b4ac6286d \ # via pip-tools -setuptools==51.1.1 \ - --hash=sha256:0b43d1e0e0ac1467185581c2ceaf86b5c1a1bc408f8f6407687b0856302d1850 \ - --hash=sha256:6d119767443a0f770bab9738b86ce9c0a699a7759ff4f61af583ee73d2e528a0 \ +setuptools==51.1.2 \ + --hash=sha256:4fa149145ba5dcd4aaa89912ec92393a31170eaf17fe0268b1429538bad1f85a \ + --hash=sha256:67d8af2fc9f33e48f8f4387321700a79b27090a5cff154e5ce1a8c72c2eea54f \ # via google-api-core, google-auth, jsonschema, protobuf, yamllint diff --git a/sql/moz-fx-data-shared-prod/monitoring/dataflow_user_counters_v1/metadata.yaml b/sql/moz-fx-data-shared-prod/monitoring/dataflow_user_counters_v1/metadata.yaml new file mode 100644 index 00000000000..a7501b83f1a --- /dev/null +++ b/sql/moz-fx-data-shared-prod/monitoring/dataflow_user_counters_v1/metadata.yaml @@ -0,0 +1,17 @@ +--- +friendly_name: Dataflow User Counters +description: > + Exported dataflow user counters from GCP monitoring + (dataflow.googleapis.com/job/user_counter) +owners: + - bewu@mozilla.com +labels: + schedule: hourly + incremental: true +scheduling: + dag_name: bqetl_cloud_monitoring_export + arguments: [ + "--execution-time", "{{ ts }}", + "--interval-hours", "1", + "--time-offset", "1" + ] diff --git a/sql/moz-fx-data-shared-prod/monitoring/dataflow_user_counters_v1/query.py b/sql/moz-fx-data-shared-prod/monitoring/dataflow_user_counters_v1/query.py new file mode 100644 index 00000000000..9ba78a7268f --- /dev/null +++ b/sql/moz-fx-data-shared-prod/monitoring/dataflow_user_counters_v1/query.py @@ -0,0 +1,31 @@ +import datetime + +import click +from google.cloud import monitoring + +from bigquery_etl.monitoring import export_metrics + + +@click.command() +@click.option("--execution-time", type=datetime.datetime.fromisoformat, required=True) +@click.option("--interval-hours", 
type=int, required=True) +@click.option("--time-offset", type=int, required=True) +def main(execution_time, interval_hours, time_offset): + export_metrics.export_metrics( + monitoring_project="moz-fx-data-ingesti-prod-579d", + dst_project="moz-fx-data-shared-prod", + dst_dataset="monitoring", + dst_table="dataflow_user_counters_v1", + metric="dataflow.googleapis.com/job/user_counter", + execution_time=execution_time, + interval_hours=interval_hours, + time_offset=time_offset, + aggregator=monitoring.Aggregation.Reducer.REDUCE_NONE, + aligner=monitoring.Aggregation.Aligner.ALIGN_MEAN, + overwrite=False, + alignment_period=300, + ) + + +if __name__ == "__main__": + main() diff --git a/sql/moz-fx-data-shared-prod/monitoring/kubernetes_not_coerced_to_int_v1/metadata.yaml b/sql/moz-fx-data-shared-prod/monitoring/kubernetes_not_coerced_to_int_v1/metadata.yaml new file mode 100644 index 00000000000..1c42bfcb8e6 --- /dev/null +++ b/sql/moz-fx-data-shared-prod/monitoring/kubernetes_not_coerced_to_int_v1/metadata.yaml @@ -0,0 +1,17 @@ +--- +friendly_name: Kubernetes Not Coerced to int +description: > + Exported kubernetes coerced_to_int from GCP monitoring + (custom.googleapis.com/opencensus/not_coerced_to_int) +owners: + - bewu@mozilla.com +labels: + schedule: hourly + incremental: true +scheduling: + dag_name: bqetl_cloud_monitoring_export + arguments: [ + "--execution-time", "{{ ts }}", + "--interval-hours", "1", + "--time-offset", "1" + ] diff --git a/sql/moz-fx-data-shared-prod/monitoring/kubernetes_not_coerced_to_int_v1/query.py b/sql/moz-fx-data-shared-prod/monitoring/kubernetes_not_coerced_to_int_v1/query.py new file mode 100644 index 00000000000..0314b7a91ef --- /dev/null +++ b/sql/moz-fx-data-shared-prod/monitoring/kubernetes_not_coerced_to_int_v1/query.py @@ -0,0 +1,31 @@ +import datetime + +import click +from google.cloud import monitoring + +from bigquery_etl.monitoring import export_metrics + + +@click.command() +@click.option("--execution-time", type=datetime.datetime.fromisoformat, required=True) +@click.option("--interval-hours", type=int, required=True) +@click.option("--time-offset", type=int, required=True) +def main(execution_time, interval_hours, time_offset): + export_metrics.export_metrics( + monitoring_project="moz-fx-data-ingesti-prod-579d", + dst_project="moz-fx-data-shared-prod", + dst_dataset="monitoring", + dst_table="kubernetes_not_coerced_to_int_v1", + metric="custom.googleapis.com/opencensus/not_coerced_to_int", + execution_time=execution_time, + interval_hours=interval_hours, + time_offset=time_offset, + aggregator=monitoring.Aggregation.Reducer.REDUCE_NONE, + aligner=monitoring.Aggregation.Aligner.ALIGN_RATE, + overwrite=False, + alignment_period=300, + ) + + +if __name__ == "__main__": + main() diff --git a/sql/moz-fx-data-shared-prod/monitoring/pubsub_subscription_oldest_unacked_msg_v1/metadata.yaml b/sql/moz-fx-data-shared-prod/monitoring/pubsub_subscription_oldest_unacked_msg_v1/metadata.yaml new file mode 100644 index 00000000000..8cfb784de20 --- /dev/null +++ b/sql/moz-fx-data-shared-prod/monitoring/pubsub_subscription_oldest_unacked_msg_v1/metadata.yaml @@ -0,0 +1,17 @@ +--- +friendly_name: Pubsub Subscription Oldest Unacked Message +description: > + Exported pubsub oldest unacked message from GCP monitoring + (pubsub.googleapis.com/subscription/oldest_unacked_message_age) +owners: + - bewu@mozilla.com +labels: + schedule: hourly + incremental: true +scheduling: + dag_name: bqetl_cloud_monitoring_export + arguments: [ + "--execution-time", "{{ ts }}", + 
"--interval-hours", "1", + "--time-offset", "1" + ] diff --git a/sql/moz-fx-data-shared-prod/monitoring/pubsub_subscription_oldest_unacked_msg_v1/query.py b/sql/moz-fx-data-shared-prod/monitoring/pubsub_subscription_oldest_unacked_msg_v1/query.py new file mode 100644 index 00000000000..87e50f662fb --- /dev/null +++ b/sql/moz-fx-data-shared-prod/monitoring/pubsub_subscription_oldest_unacked_msg_v1/query.py @@ -0,0 +1,31 @@ +import datetime + +import click +from google.cloud import monitoring + +from bigquery_etl.monitoring import export_metrics + + +@click.command() +@click.option("--execution-time", type=datetime.datetime.fromisoformat, required=True) +@click.option("--interval-hours", type=int, required=True) +@click.option("--time-offset", type=int, required=True) +def main(execution_time, interval_hours, time_offset): + export_metrics.export_metrics( + monitoring_project="moz-fx-data-ingesti-prod-579d", + dst_project="moz-fx-data-shared-prod", + dst_dataset="monitoring", + dst_table="pubsub_subscription_oldest_unacked_msg_v1", + metric="pubsub.googleapis.com/subscription/oldest_unacked_message_age", + execution_time=execution_time, + interval_hours=interval_hours, + time_offset=time_offset, + aggregator=monitoring.Aggregation.Reducer.REDUCE_NONE, + aligner=monitoring.Aggregation.Aligner.ALIGN_MEAN, + overwrite=False, + alignment_period=300, + ) + + +if __name__ == "__main__": + main() diff --git a/tests/monitoring/test_monitoring_metrics_export.py b/tests/monitoring/test_monitoring_metrics_export.py new file mode 100644 index 00000000000..01f2b02c10e --- /dev/null +++ b/tests/monitoring/test_monitoring_metrics_export.py @@ -0,0 +1,292 @@ +import datetime +from unittest.mock import ANY, MagicMock + +import pytest +from google.cloud import bigquery, monitoring +from google.cloud.exceptions import NotFound + +from bigquery_etl.monitoring import export_metrics + + +class TestExportMetrics: + @pytest.fixture + def test_table(self): + return bigquery.TableReference( + bigquery.DatasetReference("project", "dataset"), "table" + ) + + # Obfuscated real data + @pytest.fixture + def test_data(self): + return [ + monitoring.TimeSeries( + { + "metric": { + "type": "pubsub.googleapis.com/subscription/oldest_unacked_message_age", # noqa E502 + "labels": {"test_key": "test_value"}, + }, + "resource": { + "type": "pubsub_subscription", + "labels": { + "project_id": "test-project-1", + "subscription_id": "subscription", + }, + }, + "metric_kind": "GAUGE", + "value_type": "DOUBLE", + "points": [ + { + "interval": { + "start_time": {"seconds": 1609512900}, + "end_time": {"seconds": 1609512900}, + }, + "value": {"double_value": 2.0}, + }, + { + "interval": { + "start_time": {"seconds": 1609512600}, + "end_time": {"seconds": 1609512600}, + }, + "value": {"double_value": 1.0}, + }, + ], + } + ), + monitoring.TimeSeries( + { + "metric": { + "type": "pubsub.googleapis.com/subscription/oldest_unacked_message_age", # noqa E502 + }, + "resource": { + "type": "pubsub_subscription", + "labels": { + "project_id": "test-project-2", + "subscription_id": "subscription", + }, + }, + "metric_kind": "GAUGE", + "value_type": "DOUBLE", + "points": [ + { + "interval": { + "start_time": {"seconds": 1609513200}, + "end_time": {"seconds": 1609513200}, + }, + "value": {"double_value": 4.0}, + }, + { + "interval": { + "start_time": {"seconds": 1609512900}, + "end_time": {"seconds": 1609512900}, + }, + "value": {"double_value": 3.0}, + }, + ], + } + ), + ] + + def test_get_time_series_parsing(self, test_data): + """Get time series 
should correctly transform TimeSeries objects to dict.""" + mock_client = MagicMock() + mock_client.list_time_series.return_value = test_data + results = export_metrics.get_time_series( + mock_client, + "project-1", + "", + monitoring.Aggregation(), + datetime.datetime(2021, 1, 1), + datetime.datetime(2020, 1, 2), + ) + expected = [ + { + "timestamp": datetime.datetime( + 2021, 1, 1, 14, 50, tzinfo=datetime.timezone.utc + ), + "value": 1, + "project_id": "test-project-1", + "subscription_id": "subscription", + "test_key": "test_value", + }, + { + "timestamp": datetime.datetime( + 2021, 1, 1, 14, 55, tzinfo=datetime.timezone.utc + ), + "value": 2, + "project_id": "test-project-1", + "subscription_id": "subscription", + "test_key": "test_value", + }, + { + "timestamp": datetime.datetime( + 2021, 1, 1, 14, 55, tzinfo=datetime.timezone.utc + ), + "value": 3, + "project_id": "test-project-2", + "subscription_id": "subscription", + }, + { + "timestamp": datetime.datetime( + 2021, 1, 1, 15, 0, tzinfo=datetime.timezone.utc + ), + "value": 4, + "project_id": "test-project-2", + "subscription_id": "subscription", + }, + ] + assert sorted(results, key=lambda x: (x["timestamp"], x["value"])) == expected + + def test_write_to_bq_overwrite_config(self, test_table): + """Write to bq should use correct config when overwriting data.""" + mock_client = MagicMock() + export_metrics.write_to_bq(mock_client, test_table, [], overwrite=True) + + job_config = mock_client.load_table_from_json.call_args.kwargs["job_config"] + + assert ( + job_config.create_disposition == bigquery.CreateDisposition.CREATE_IF_NEEDED + ) + assert job_config.schema_update_options is None + assert job_config.write_disposition == bigquery.WriteDisposition.WRITE_TRUNCATE + + def test_write_to_bq_append_config(self, test_table): + """Write to bq should use correct config when appending data.""" + mock_client = MagicMock() + export_metrics.write_to_bq(mock_client, test_table, [], overwrite=False) + + job_config = mock_client.load_table_from_json.call_args.kwargs["job_config"] + + assert ( + job_config.create_disposition + ), bigquery.CreateDisposition.CREATE_IF_NEEDED + assert ( + job_config.schema_update_options + ), bigquery.SchemaUpdateOption.ALLOW_FIELD_ADDITION + assert job_config.write_disposition, bigquery.WriteDisposition.WRITE_APPEND + + def test_write_to_bq_stringify_timestamp(self): + """Write to bq should convert timestamp values to strings.""" + mock_client = MagicMock() + + data_points = [ + { + "timestamp": datetime.datetime( + 2021, 1, 1, 14, 50, tzinfo=datetime.timezone.utc + ), + "value": 1, + }, + { + "timestamp": datetime.datetime( + 2021, 1, 1, 14, 55, tzinfo=datetime.timezone.utc + ), + "value": 2, + }, + ] + expected = [ + { + "timestamp": "2021-01-01 14:50:00+00:00", + "value": 1, + }, + { + "timestamp": "2021-01-01 14:55:00+00:00", + "value": 2, + }, + ] + + export_metrics.write_to_bq( + mock_client, + bigquery.TableReference( + bigquery.DatasetReference("project", "dataset"), "table" + ), + data_points, + overwrite=False, + ) + + mock_client.load_table_from_json.assert_called_once_with( + expected, + ANY, + job_config=ANY, + ) + with pytest.raises(AssertionError): + mock_client.load_table_from_json.assert_called_once_with( + data_points, + ANY, + job_config=ANY, + ) + + def test_filter_existing_data_table_not_found(self, test_table): + """Filter should return all data when target table is not found.""" + mock_client = MagicMock() + mock_query = MagicMock() + mock_query.result.side_effect = NotFound("") + 
mock_client.query.return_value = mock_query + + data_points = [ + {"timestamp": datetime.datetime(2021, 1, 1, 1), "value": 1}, + {"timestamp": datetime.datetime(2021, 1, 1, 2), "value": 2}, + {"timestamp": datetime.datetime(2021, 1, 1, 3), "value": 3}, + ] + results = export_metrics.filter_existing_data( + data_points, + mock_client, + test_table, + start_time=datetime.datetime(2021, 1, 1, 0), + end_time=datetime.datetime(2021, 1, 1, 10), + ) + assert results == data_points + + def test_filter_existing_data_no_existing(self, test_table): + """Filter should return all data when there is no existing data.""" + mock_client = MagicMock() + mock_query = MagicMock() + mock_query.result.return_value = [] + mock_client.query.return_value = mock_query + + data_points = [ + {"timestamp": datetime.datetime(2021, 1, 1, 1), "value": 1}, + {"timestamp": datetime.datetime(2021, 1, 1, 2), "value": 2}, + {"timestamp": datetime.datetime(2021, 1, 1, 3), "value": 3}, + ] + results = export_metrics.filter_existing_data( + data_points, + mock_client, + test_table, + start_time=datetime.datetime(2021, 1, 1, 0), + end_time=datetime.datetime(2021, 1, 1, 10), + ) + assert results == data_points + + def test_filter_existing_data_some_existing(self, test_table): + """Filter should remove data points that have a matching timestamp.""" + + def mock_row(timestamp): + row = MagicMock() + row.timestamp = timestamp + return row + + mock_client = MagicMock() + mock_query = MagicMock() + mock_query.result.return_value = [ + mock_row(datetime.datetime(2021, 1, 1, 2)), + mock_row(datetime.datetime(2021, 1, 1, 3)), + mock_row(datetime.datetime(2021, 1, 1, 4)), + ] + mock_client.query.return_value = mock_query + + data_points = [ + {"timestamp": datetime.datetime(2021, 1, 1, 1), "value": 1}, + {"timestamp": datetime.datetime(2021, 1, 1, 2), "value": 2}, + {"timestamp": datetime.datetime(2021, 1, 1, 3), "value": 3}, + ] + expected = [ + {"timestamp": datetime.datetime(2021, 1, 1, 1), "value": 1}, + ] + + results = export_metrics.filter_existing_data( + data_points, + mock_client, + test_table, + start_time=datetime.datetime(2021, 1, 1, 0), + end_time=datetime.datetime(2021, 1, 1, 10), + ) + assert results == expected
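
The sketches that follow are usage notes for the exporter introduced in this change; none of them are part of the diff itself, and any value not taken from the files above (timestamps, backfill widths, placeholder dates) is illustrative only.

The generated DAG drives each query.py hourly with --execution-time {{ ts }}, --interval-hours 1 and --time-offset 1, but export_metrics.export_metrics can also be called directly for a manual backfill. The call below mirrors pubsub_subscription_oldest_unacked_msg_v1/query.py, widened to a 24-hour interval (the execution time and interval width are made up for the example):

import datetime

from google.cloud import monitoring

from bigquery_etl.monitoring import export_metrics

# Backfill one day of oldest-unacked-message-age data ending one hour before
# the chosen execution time. overwrite=False appends and relies on
# filter_existing_data() to skip timestamps already present in the table.
export_metrics.export_metrics(
    monitoring_project="moz-fx-data-ingesti-prod-579d",
    dst_project="moz-fx-data-shared-prod",
    dst_dataset="monitoring",
    dst_table="pubsub_subscription_oldest_unacked_msg_v1",
    metric="pubsub.googleapis.com/subscription/oldest_unacked_message_age",
    execution_time=datetime.datetime(2021, 1, 2, 0, 0),  # illustrative
    interval_hours=24,  # illustrative backfill width
    time_offset=1,  # same one-hour delay the DAG uses
    overwrite=False,
    aggregator=monitoring.Aggregation.Reducer.REDUCE_NONE,
    aligner=monitoring.Aggregation.Aligner.ALIGN_MEAN,
    alignment_period=300,
)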
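
export_metrics() splits the requested range into chunks of at most 24 hours (max_interval_width) because wide list_time_series requests grow superlinearly in run time and memory. The helper below is not part of the module; it re-implements the loop's window arithmetic so the (start_time, end_time) pairs a given invocation will fetch can be checked in isolation:

import datetime
from typing import Iterator, Tuple

MAX_INTERVAL_WIDTH = 24  # hours, mirrors max_interval_width in export_metrics()


def iter_export_windows(
    execution_time: datetime.datetime,
    time_offset: int,
    interval_hours: int,
) -> Iterator[Tuple[datetime.datetime, datetime.datetime]]:
    """Yield the (start_time, end_time) pairs export_metrics() iterates over."""
    for interval_offset in range(0, interval_hours, MAX_INTERVAL_WIDTH):
        offset = time_offset + interval_offset
        end_time = execution_time - datetime.timedelta(hours=offset)
        start_time = execution_time - datetime.timedelta(
            hours=min(offset + MAX_INTERVAL_WIDTH, time_offset + interval_hours)
        )
        yield start_time, end_time


# A 30-hour export offset by 1 hour is fetched in two chunks:
#   (execution_time - 25h, execution_time - 1h)
#   (execution_time - 31h, execution_time - 25h)
for start, end in iter_export_windows(
    datetime.datetime(2021, 1, 2, 0, 0), time_offset=1, interval_hours=30
):
    print(start, end)

Note that when overwrite=True only the first chunk (interval_offset == 0) truncates the destination table; the remaining chunks append, so a multi-chunk overwrite replaces the table once and then accumulates the rest of the range.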
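
The module pins TZ=UTC and calls time.tzset() before building the TimeInterval because get_time_series() converts start_time/end_time to epoch seconds with datetime.timestamp(), and for naive datetimes (which is what --execution-time parses to when the value carries no UTC offset) that conversion depends on the process timezone. The standalone, Unix-only demonstration below is not part of the change; it only shows the offset that the TZ pin avoids:

import datetime
import os
import time

naive = datetime.datetime(2021, 1, 1, 0, 0)  # e.g. fromisoformat("2021-01-01T00:00:00")

os.environ["TZ"] = "America/Los_Angeles"
time.tzset()  # Unix-only, same call export_metrics() uses
pacific_epoch = int(naive.timestamp())

os.environ["TZ"] = "UTC"
time.tzset()
utc_epoch = int(naive.timestamp())

# Interpreted as Pacific time (UTC-8 in January), the same wall-clock value maps
# to an epoch 8 hours later than the UTC interpretation.
assert pacific_epoch - utc_epoch == 8 * 3600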
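
One behavior of filter_existing_data() worth keeping in mind: deduplication is keyed on timestamp alone (SELECT DISTINCT(timestamp)), not on (timestamp, labels), so once any row exists for an alignment-period boundary, every new point at that boundary is dropped regardless of which subscription, job or other label it belongs to. The toy example below runs the same set-membership test in plain Python, with no BigQuery involved:

import datetime

# Timestamps already present in the destination table.
existing_timestamps = {
    datetime.datetime(2021, 1, 1, 2),
    datetime.datetime(2021, 1, 1, 3),
}

time_series_data = [
    {"timestamp": datetime.datetime(2021, 1, 1, 1), "value": 1.0, "subscription_id": "a"},
    {"timestamp": datetime.datetime(2021, 1, 1, 2), "value": 2.0, "subscription_id": "a"},
    # Dropped as well, even though subscription "b" has no rows at 02:00 yet.
    {"timestamp": datetime.datetime(2021, 1, 1, 2), "value": 5.0, "subscription_id": "b"},
]

new_points = [
    point
    for point in time_series_data
    if point["timestamp"] not in existing_timestamps
]
assert new_points == [time_series_data[0]]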
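
Because write_to_bq() loads with autodetect=True and partitions on timestamp, each destination table ends up with a TIMESTAMP column named timestamp, a FLOAT column named value, and one STRING column per resource or metric label returned by the Monitoring API. The read-back query below assumes the pubsub table carries the project_id and subscription_id labels seen in the test fixtures (the actual label columns depend on what the API returns), and the date filter is illustrative:

from google.cloud import bigquery

client = bigquery.Client("moz-fx-data-shared-prod")

# Filtering on DATE(timestamp) keeps the scan to a single partition.
sql = """
SELECT
  timestamp,
  subscription_id,
  value
FROM
  `moz-fx-data-shared-prod.monitoring.pubsub_subscription_oldest_unacked_msg_v1`
WHERE
  DATE(timestamp) = "2021-01-01"
ORDER BY
  timestamp
"""

for row in client.query(sql).result():
    print(row.timestamp, row.subscription_id, row.value)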