From e7379b5ab45a0c1e5b6944330c3e8ae4faa115e8 Mon Sep 17 00:00:00 2001
From: Brad Miro
Date: Wed, 16 Sep 2020 15:53:38 -0400
Subject: [PATCH] feat: adding submit_job samples (#88)

Adding submit_job samples and updating quickstart samples.
---
 samples/snippets/quickstart/quickstart.py      | 65 +++------------
 .../snippets/quickstart/quickstart_test.py     |  3 +-
 samples/snippets/submit_job.py                 | 80 +++++++++++++++++++
 samples/snippets/submit_job_test.py            | 66 +++++++++++++++
 4 files changed, 159 insertions(+), 55 deletions(-)
 create mode 100644 samples/snippets/submit_job.py
 create mode 100644 samples/snippets/submit_job_test.py

diff --git a/samples/snippets/quickstart/quickstart.py b/samples/snippets/quickstart/quickstart.py
index 68f0bdf4..45e7f348 100644
--- a/samples/snippets/quickstart/quickstart.py
+++ b/samples/snippets/quickstart/quickstart.py
@@ -27,7 +27,7 @@
 """

 import argparse
-import time
+import re

 from google.cloud import dataproc_v1 as dataproc
 from google.cloud import storage
@@ -68,64 +68,23 @@ def quickstart(project_id, region, cluster_name, job_file_path):
         "pyspark_job": {"main_python_file_uri": job_file_path},
     }

-    job_response = job_client.submit_job(
+    operation = job_client.submit_job_as_operation(
         request={"project_id": project_id, "region": region, "job": job}
     )
-    job_id = job_response.reference.job_id
+    response = operation.result()

-    print('Submitted job "{}".'.format(job_id))
+    # Dataproc job output gets saved to the Google Cloud Storage bucket
+    # allocated to the job. Use a regex to obtain the bucket and blob info.
+    matches = re.match("gs://(.*?)/(.*)", response.driver_output_resource_uri)

-    # Termimal states for a job.
-    terminal_states = {
-        dataproc.JobStatus.State.ERROR,
-        dataproc.JobStatus.State.CANCELLED,
-        dataproc.JobStatus.State.DONE,
-    }
-
-    # Create a timeout such that the job gets cancelled if not in a
-    # terminal state after a fixed period of time.
-    timeout_seconds = 600
-    time_start = time.time()
-
-    # Wait for the job to complete.
-    while job_response.status.state not in terminal_states:
-        if time.time() > time_start + timeout_seconds:
-            job_client.cancel_job(
-                request={"project_id": project_id, "region": region, "job_id": job_id}
-            )
-            print(
-                "Job {} timed out after threshold of {} seconds.".format(
-                    job_id, timeout_seconds
-                )
-            )
-
-        # Poll for job termination once a second.
-        time.sleep(1)
-        job_response = job_client.get_job(
-            request={"project_id": project_id, "region": region, "job_id": job_id}
-        )
-
-    # Cloud Dataproc job output gets saved to a GCS bucket allocated to it.
-    cluster_info = cluster_client.get_cluster(
-        request={
-            "project_id": project_id,
-            "region": region,
-            "cluster_name": cluster_name,
-        }
+    output = (
+        storage.Client()
+        .get_bucket(matches.group(1))
+        .blob(f"{matches.group(2)}.000000000")
+        .download_as_string()
     )

-    storage_client = storage.Client()
-    bucket = storage_client.get_bucket(cluster_info.config.config_bucket)
-    output_blob = "google-cloud-dataproc-metainfo/{}/jobs/{}/driveroutput.000000000".format(
-        cluster_info.cluster_uuid, job_id
-    )
-    output = bucket.blob(output_blob).download_as_string()
-
-    print(
-        "Job {} finished with state {}:\n{}".format(
-            job_id, job_response.status.state.name, output
-        )
-    )
+    print(f"Job finished successfully: {output}")

     # Delete the cluster once the job has terminated.
     operation = cluster_client.delete_cluster(
diff --git a/samples/snippets/quickstart/quickstart_test.py b/samples/snippets/quickstart/quickstart_test.py
index 9ea46cd1..38ae8b89 100644
--- a/samples/snippets/quickstart/quickstart_test.py
+++ b/samples/snippets/quickstart/quickstart_test.py
@@ -74,6 +74,5 @@ def test_quickstart(capsys):
     out, _ = capsys.readouterr()

     assert "Cluster created successfully" in out
-    assert "Submitted job" in out
-    assert "finished with state DONE:" in out
+    assert "Job finished successfully" in out
     assert "successfully deleted" in out
diff --git a/samples/snippets/submit_job.py b/samples/snippets/submit_job.py
new file mode 100644
index 00000000..b70348c3
--- /dev/null
+++ b/samples/snippets/submit_job.py
@@ -0,0 +1,80 @@
+#!/usr/bin/env python
+
+# Copyright 2020 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# This sample walks a user through submitting a Spark job using the Dataproc
+# client library.
+
+# Usage:
+#     python submit_job.py --project_id <PROJECT_ID> --region <REGION> \
+#         --cluster_name <CLUSTER_NAME>
+
+# [START dataproc_submit_job]
+import re
+# [END dataproc_submit_job]
+import sys
+# [START dataproc_submit_job]
+
+from google.cloud import dataproc_v1 as dataproc
+from google.cloud import storage
+
+
+def submit_job(project_id, region, cluster_name):
+    # Create the job client.
+    job_client = dataproc.JobControllerClient(client_options={
+        'api_endpoint': '{}-dataproc.googleapis.com:443'.format(region)
+    })
+
+    # Create the job config. 'main_jar_file_uri' can also be a
+    # Google Cloud Storage URL.
+    job = {
+        'placement': {
+            'cluster_name': cluster_name
+        },
+        'spark_job': {
+            'main_class': 'org.apache.spark.examples.SparkPi',
+            'jar_file_uris': ['file:///usr/lib/spark/examples/jars/spark-examples.jar'],
+            'args': ['1000']
+        }
+    }
+
+    operation = job_client.submit_job_as_operation(
+        request={"project_id": project_id, "region": region, "job": job}
+    )
+    response = operation.result()
+
+    # Dataproc job output gets saved to the Google Cloud Storage bucket
+    # allocated to the job. Use a regex to obtain the bucket and blob info.
+    matches = re.match("gs://(.*?)/(.*)", response.driver_output_resource_uri)
+
+    output = (
+        storage.Client()
+        .get_bucket(matches.group(1))
+        .blob(f"{matches.group(2)}.000000000")
+        .download_as_string()
+    )
+
+    print(f"Job finished successfully: {output}")
+# [END dataproc_submit_job]
+
+
+if __name__ == "__main__":
+    if len(sys.argv) < 3:
+        sys.exit('python submit_job.py project_id region cluster_name')
+
+    project_id = sys.argv[1]
+    region = sys.argv[2]
+    cluster_name = sys.argv[3]
+    submit_job(project_id, region, cluster_name)
diff --git a/samples/snippets/submit_job_test.py b/samples/snippets/submit_job_test.py
new file mode 100644
index 00000000..0cad6248
--- /dev/null
+++ b/samples/snippets/submit_job_test.py
@@ -0,0 +1,66 @@
+# Copyright 2020 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import os
+import uuid
+
+from google.cloud import dataproc_v1 as dataproc
+import pytest
+
+import submit_job
+
+
+PROJECT_ID = os.environ['GOOGLE_CLOUD_PROJECT']
+REGION = 'us-central1'
+CLUSTER_NAME = 'py-sj-test-{}'.format(str(uuid.uuid4()))
+CLUSTER = {
+    'project_id': PROJECT_ID,
+    'cluster_name': CLUSTER_NAME,
+    'config': {
+        'master_config': {
+            'num_instances': 1,
+            'machine_type_uri': 'n1-standard-1'
+        },
+        'worker_config': {
+            'num_instances': 2,
+            'machine_type_uri': 'n1-standard-1'
+        }
+    }
+}
+
+
+@pytest.fixture(autouse=True)
+def setup_teardown():
+    cluster_client = dataproc.ClusterControllerClient(client_options={
+        'api_endpoint': '{}-dataproc.googleapis.com:443'.format(REGION)
+    })
+
+    # Create the cluster.
+    operation = cluster_client.create_cluster(
+        request={"project_id": PROJECT_ID, "region": REGION, "cluster": CLUSTER}
+    )
+    operation.result()
+
+    yield
+
+    cluster_client.delete_cluster(request={
+        "project_id": PROJECT_ID, "region": REGION, "cluster_name": CLUSTER_NAME
+    })
+
+
+def test_submit_job(capsys):
+    submit_job.submit_job(PROJECT_ID, REGION, CLUSTER_NAME)
+    out, _ = capsys.readouterr()
+
+    assert 'Job finished successfully' in out
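
Note for reviewers: the old quickstart bounded the wait at 600 seconds by polling get_job and cancelling the job on timeout, while the new submit_job_as_operation flow simply blocks in operation.result() until the job reaches a terminal state. If a caller still wants a bound, the operation future accepts a timeout. The sketch below is illustrative only and is not part of this patch; the function name and the 600-second default are placeholders, and the timeout/TimeoutError behavior is assumed from the standard google-api-core operation future that submit_job_as_operation returns.

import concurrent.futures

from google.cloud import dataproc_v1 as dataproc


def submit_job_with_timeout(project_id, region, cluster_name, timeout_seconds=600):
    # Same submit call as in submit_job.py above, but with a bounded wait.
    job_client = dataproc.JobControllerClient(client_options={
        'api_endpoint': '{}-dataproc.googleapis.com:443'.format(region)
    })
    job = {
        'placement': {'cluster_name': cluster_name},
        'spark_job': {
            'main_class': 'org.apache.spark.examples.SparkPi',
            'jar_file_uris': ['file:///usr/lib/spark/examples/jars/spark-examples.jar'],
            'args': ['1000']
        }
    }
    operation = job_client.submit_job_as_operation(
        request={"project_id": project_id, "region": region, "job": job}
    )
    try:
        # result(timeout=...) is assumed to raise concurrent.futures.TimeoutError
        # if the job has not reached a terminal state within the given time.
        return operation.result(timeout=timeout_seconds)
    except concurrent.futures.TimeoutError:
        print('Job did not finish within {} seconds.'.format(timeout_seconds))
        raise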