
feat: adding submit_job samples (#88)
Adds a submit_job sample (with a matching test) and updates the quickstart sample to use submit_job_as_operation instead of polling for job completion by hand.
bradmiro committed Sep 16, 2020
1 parent f2e66c7 commit e7379b5
Showing 4 changed files with 159 additions and 55 deletions.
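The core change to the quickstart is that job submission now goes through a long-running operation instead of a hand-rolled polling loop. A condensed sketch of the before/after, assuming project_id, region, and the job dict are set up as in the diff below:

from google.cloud import dataproc_v1 as dataproc

job_client = dataproc.JobControllerClient(
    client_options={"api_endpoint": f"{region}-dataproc.googleapis.com:443"}
)

# Before: submit_job() returned the Job immediately, and the quickstart then
# polled get_job() until the job reached DONE, ERROR or CANCELLED:
#     submitted = job_client.submit_job(
#         request={"project_id": project_id, "region": region, "job": job}
#     )
#     ... poll job_client.get_job(...) using submitted.reference.job_id ...

# After: submit_job_as_operation() returns a long-running operation whose
# result() blocks until the job finishes and returns the completed Job.
operation = job_client.submit_job_as_operation(
    request={"project_id": project_id, "region": region, "job": job}
)
finished_job = operation.result()
print(finished_job.driver_output_resource_uri)  # gs:// URI of the driver output
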
65 changes: 12 additions & 53 deletions samples/snippets/quickstart/quickstart.py
@@ -27,7 +27,7 @@
"""

import argparse
import time
import re

from google.cloud import dataproc_v1 as dataproc
from google.cloud import storage
@@ -68,64 +68,23 @@ def quickstart(project_id, region, cluster_name, job_file_path):
"pyspark_job": {"main_python_file_uri": job_file_path},
}

job_response = job_client.submit_job(
operation = job_client.submit_job_as_operation(
request={"project_id": project_id, "region": region, "job": job}
)
job_id = job_response.reference.job_id
response = operation.result()

print('Submitted job "{}".'.format(job_id))
# Dataproc job output gets saved to the Google Cloud Storage bucket
# allocated to the job. Use a regex to obtain the bucket and blob info.
matches = re.match("gs://(.*?)/(.*)", response.driver_output_resource_uri)

# Terminal states for a job.
terminal_states = {
dataproc.JobStatus.State.ERROR,
dataproc.JobStatus.State.CANCELLED,
dataproc.JobStatus.State.DONE,
}

# Create a timeout such that the job gets cancelled if not in a
# terminal state after a fixed period of time.
timeout_seconds = 600
time_start = time.time()

# Wait for the job to complete.
while job_response.status.state not in terminal_states:
if time.time() > time_start + timeout_seconds:
job_client.cancel_job(
request={"project_id": project_id, "region": region, "job_id": job_id}
)
print(
"Job {} timed out after threshold of {} seconds.".format(
job_id, timeout_seconds
)
)

# Poll for job termination once a second.
time.sleep(1)
job_response = job_client.get_job(
request={"project_id": project_id, "region": region, "job_id": job_id}
)

# Cloud Dataproc job output gets saved to a GCS bucket allocated to it.
cluster_info = cluster_client.get_cluster(
request={
"project_id": project_id,
"region": region,
"cluster_name": cluster_name,
}
)
output = (
storage.Client()
.get_bucket(matches.group(1))
.blob(f"{matches.group(2)}.000000000")
.download_as_string()
)

storage_client = storage.Client()
bucket = storage_client.get_bucket(cluster_info.config.config_bucket)
output_blob = "google-cloud-dataproc-metainfo/{}/jobs/{}/driveroutput.000000000".format(
cluster_info.cluster_uuid, job_id
)
output = bucket.blob(output_blob).download_as_string()

print(
"Job {} finished with state {}:\n{}".format(
job_id, job_response.status.state.name, output
)
)
print(f"Job finished successfully: {output}")

# Delete the cluster once the job has terminated.
operation = cluster_client.delete_cluster(
(remaining lines of quickstart.py not shown)
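One thing the new flow drops is the old 600-second cancel-on-timeout guard. If an upper bound on the wait is still wanted, the returned operation accepts a timeout directly; a minimal sketch, not part of this commit, assuming the operation object from the updated quickstart above:

import concurrent.futures

try:
    response = operation.result(timeout=600)  # seconds to wait for the job
except concurrent.futures.TimeoutError:
    # The job itself keeps running on the cluster; timing out the client-side
    # wait does not cancel it. Cancelling would still need a separate
    # job_client.cancel_job() request, which is not shown here.
    raise
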
3 changes: 1 addition & 2 deletions samples/snippets/quickstart/quickstart_test.py
@@ -74,6 +74,5 @@ def test_quickstart(capsys):
out, _ = capsys.readouterr()

assert "Cluster created successfully" in out
assert "Submitted job" in out
assert "finished with state DONE:" in out
assert "Job finished successfully" in out
assert "successfully deleted" in out
80 changes: 80 additions & 0 deletions samples/snippets/submit_job.py
@@ -0,0 +1,80 @@
#!/usr/bin/env python

# Copyright 2020 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# This sample walks a user through submitting a Spark job using the Dataproc
# client library.

# Usage:
# python submit_job.py <PROJECT_ID> <REGION> <CLUSTER_NAME>

# [START dataproc_submit_job]
import re
# [END dataproc_submit_job]
import sys
# [START dataproc_submit_job]

from google.cloud import dataproc_v1 as dataproc
from google.cloud import storage


def submit_job(project_id, region, cluster_name):
# Create the job client.
job_client = dataproc.JobControllerClient(client_options={
'api_endpoint': '{}-dataproc.googleapis.com:443'.format(region)
})

# Create the job config. The example jar is a local file:// path on the
# cluster; 'jar_file_uris' entries can also be Google Cloud Storage URLs.
job = {
'placement': {
'cluster_name': cluster_name
},
'spark_job': {
'main_class': 'org.apache.spark.examples.SparkPi',
'jar_file_uris': ['file:///usr/lib/spark/examples/jars/spark-examples.jar'],
'args': ['1000']
}
}

operation = job_client.submit_job_as_operation(
request={"project_id": project_id, "region": region, "job": job}
)
response = operation.result()

# Dataproc job output gets saved to the Google Cloud Storage bucket
# allocated to the job. Use a regex to obtain the bucket and blob info.
matches = re.match("gs://(.*?)/(.*)", response.driver_output_resource_uri)

output = (
storage.Client()
.get_bucket(matches.group(1))
.blob(f"{matches.group(2)}.000000000")
.download_as_string()
)

print(f"Job finished successfully: {output}")
# [END dataproc_submit_job]


if __name__ == "__main__":
if len(sys.argv) < 4:
sys.exit('python submit_job.py project_id region cluster_name')

project_id = sys.argv[1]
region = sys.argv[2]
cluster_name = sys.argv[3]
submit_job(project_id, region, cluster_name)
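A note on the output handling above: driver_output_resource_uri is a gs:// prefix, and Dataproc writes the driver output under it in sequentially numbered chunks (driveroutput.000000000, driveroutput.000000001, ...), which is why the sample appends ".000000000" to read the first chunk. A small sketch of the parsing; the bucket and path here are made up, but follow the layout the old quickstart assumed:

import re

# Hypothetical driver_output_resource_uri value.
uri = "gs://example-staging-bucket/google-cloud-dataproc-metainfo/1234-abcd/jobs/example-job/driveroutput"

matches = re.match("gs://(.*?)/(.*)", uri)
bucket_name = matches.group(1)  # "example-staging-bucket"
blob_prefix = matches.group(2)  # "google-cloud-dataproc-metainfo/.../driveroutput"

# The sample downloads only the first chunk; a longer-running job may also
# produce blob_prefix + ".000000001", ".000000002", and so on.
first_chunk = blob_prefix + ".000000000"
print(bucket_name, first_chunk)

Jobs with more output could instead list everything under the prefix, for example with storage.Client().list_blobs(bucket_name, prefix=blob_prefix), and concatenate the chunks.
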
66 changes: 66 additions & 0 deletions samples/snippets/submit_job_test.py
@@ -0,0 +1,66 @@
# Copyright 2020 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import os
import uuid

from google.cloud import dataproc_v1 as dataproc
import pytest

import submit_job


PROJECT_ID = os.environ['GOOGLE_CLOUD_PROJECT']
REGION = 'us-central1'
CLUSTER_NAME = 'py-sj-test-{}'.format(str(uuid.uuid4()))
CLUSTER = {
'project_id': PROJECT_ID,
'cluster_name': CLUSTER_NAME,
'config': {
'master_config': {
'num_instances': 1,
'machine_type_uri': 'n1-standard-1'
},
'worker_config': {
'num_instances': 2,
'machine_type_uri': 'n1-standard-1'
}
}
}


@pytest.fixture(autouse=True)
def setup_teardown():
cluster_client = dataproc.ClusterControllerClient(client_options={
'api_endpoint': '{}-dataproc.googleapis.com:443'.format(REGION)
})

# Create the cluster.
operation = cluster_client.create_cluster(
request={"project_id": PROJECT_ID, "region": REGION, "cluster": CLUSTER}
)
operation.result()

yield

cluster_client.delete_cluster(request={
"project_id": PROJECT_ID, "region": REGION, "cluster_name": CLUSTER_NAME
})


def test_submit_job(capsys):
submit_job.submit_job(PROJECT_ID, REGION, CLUSTER_NAME)
out, _ = capsys.readouterr()

assert 'Job finished successfully' in out
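One observation on the setup_teardown fixture above: cluster creation waits on its operation with operation.result(), but the teardown's delete_cluster call does not. If the test environment needs the cluster fully gone before moving on (or wants delete failures surfaced), one option is to wait on that operation as well; a sketch, not part of the commit:

# In the fixture teardown, after yield:
operation = cluster_client.delete_cluster(request={
    "project_id": PROJECT_ID, "region": REGION, "cluster_name": CLUSTER_NAME
})
operation.result()  # block until the cluster is actually deleted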
