diff --git a/samples/snippets/create_cluster.py b/samples/snippets/create_cluster.py
index f4fee7d8..633b59e8 100644
--- a/samples/snippets/create_cluster.py
+++ b/samples/snippets/create_cluster.py
@@ -29,12 +29,12 @@
 
 def create_cluster(project_id, region, cluster_name):
     """This sample walks a user through creating a Cloud Dataproc cluster
-        using the Python client library.
+    using the Python client library.
 
-        Args:
-            project_id (string): Project to use for creating resources.
-            region (string): Region where the resources should live.
-            cluster_name (string): Name to use for creating a cluster.
+    Args:
+        project_id (string): Project to use for creating resources.
+        region (string): Region where the resources should live.
+        cluster_name (string): Name to use for creating a cluster.
     """
 
     # Create a client with the endpoint set to the desired cluster region.
diff --git a/samples/snippets/instantiate_inline_workflow_template.py b/samples/snippets/instantiate_inline_workflow_template.py
index b3a40d13..cbb1a218 100644
--- a/samples/snippets/instantiate_inline_workflow_template.py
+++ b/samples/snippets/instantiate_inline_workflow_template.py
@@ -27,11 +27,11 @@
 
 def instantiate_inline_workflow_template(project_id, region):
     """This sample walks a user through submitting a workflow
-        for a Cloud Dataproc using the Python client library.
+    for a Cloud Dataproc using the Python client library.
 
-        Args:
-            project_id (string): Project to use for running the workflow.
-            region (string): Region where the workflow resources should live.
+    Args:
+        project_id (string): Project to use for running the workflow.
+        region (string): Region where the workflow resources should live.
     """
 
     # Create a client with the endpoint set to the desired region.
diff --git a/samples/snippets/list_clusters.py b/samples/snippets/list_clusters.py
index 916f1a54..3ecb8aeb 100644
--- a/samples/snippets/list_clusters.py
+++ b/samples/snippets/list_clusters.py
@@ -49,8 +49,10 @@ def main(project_id, region):
     else:
         # Use a regional gRPC endpoint. See:
         # https://cloud.google.com/dataproc/docs/concepts/regional-endpoints
-        client_transport = cluster_controller_grpc_transport.ClusterControllerGrpcTransport(
-            address="{}-dataproc.googleapis.com:443".format(region)
+        client_transport = (
+            cluster_controller_grpc_transport.ClusterControllerGrpcTransport(
+                address="{}-dataproc.googleapis.com:443".format(region)
+            )
         )
         dataproc_cluster_client = dataproc_v1.ClusterControllerClient(client_transport)
 
diff --git a/samples/snippets/submit_job.py b/samples/snippets/submit_job.py
index b70348c3..d7761b73 100644
--- a/samples/snippets/submit_job.py
+++ b/samples/snippets/submit_job.py
@@ -23,8 +23,10 @@
 
 # [START dataproc_submit_job]
 import re
+
 # [END dataproc_submit_job]
 import sys
+
 # [START dataproc_submit_job]
 
 from google.cloud import dataproc_v1 as dataproc
@@ -33,21 +35,19 @@
 
 def submit_job(project_id, region, cluster_name):
     # Create the job client.
-    job_client = dataproc.JobControllerClient(client_options={
-        'api_endpoint': '{}-dataproc.googleapis.com:443'.format(region)
-    })
+    job_client = dataproc.JobControllerClient(
+        client_options={"api_endpoint": "{}-dataproc.googleapis.com:443".format(region)}
+    )
 
     # Create the job config. 'main_jar_file_uri' can also be a
     # Google Cloud Storage URL.
     job = {
-        'placement': {
-            'cluster_name': cluster_name
+        "placement": {"cluster_name": cluster_name},
+        "spark_job": {
+            "main_class": "org.apache.spark.examples.SparkPi",
+            "jar_file_uris": ["file:///usr/lib/spark/examples/jars/spark-examples.jar"],
+            "args": ["1000"],
         },
-        'spark_job': {
-            'main_class': 'org.apache.spark.examples.SparkPi',
-            'jar_file_uris': ['file:///usr/lib/spark/examples/jars/spark-examples.jar'],
-            'args': ['1000']
-        }
     }
 
     operation = job_client.submit_job_as_operation(
@@ -67,12 +67,14 @@ def submit_job(project_id, region, cluster_name):
     )
 
     print(f"Job finished successfully: {output}")
+
+
 # [END dataproc_submit_job]
 
 
 if __name__ == "__main__":
     if len(sys.argv) < 3:
-        sys.exit('python submit_job.py project_id region cluster_name')
+        sys.exit("python submit_job.py project_id region cluster_name")
 
     project_id = sys.argv[1]
     region = sys.argv[2]
diff --git a/samples/snippets/submit_job_test.py b/samples/snippets/submit_job_test.py
index 326b38d5..6827916f 100644
--- a/samples/snippets/submit_job_test.py
+++ b/samples/snippets/submit_job_test.py
@@ -21,30 +21,24 @@
 
 import submit_job
 
-PROJECT_ID = os.environ['GOOGLE_CLOUD_PROJECT']
-REGION = 'us-central1'
-CLUSTER_NAME = 'py-sj-test-{}'.format(str(uuid.uuid4()))
+PROJECT_ID = os.environ["GOOGLE_CLOUD_PROJECT"]
+REGION = "us-central1"
+CLUSTER_NAME = "py-sj-test-{}".format(str(uuid.uuid4()))
 CLUSTER = {
-    'project_id': PROJECT_ID,
-    'cluster_name': CLUSTER_NAME,
-    'config': {
-        'master_config': {
-            'num_instances': 1,
-            'machine_type_uri': 'n1-standard-2'
-        },
-        'worker_config': {
-            'num_instances': 2,
-            'machine_type_uri': 'n1-standard-2'
-        }
-    }
+    "project_id": PROJECT_ID,
+    "cluster_name": CLUSTER_NAME,
+    "config": {
+        "master_config": {"num_instances": 1, "machine_type_uri": "n1-standard-2"},
+        "worker_config": {"num_instances": 2, "machine_type_uri": "n1-standard-2"},
+    },
 }
 
 
 @pytest.fixture(autouse=True)
 def setup_teardown():
-    cluster_client = dataproc.ClusterControllerClient(client_options={
-        'api_endpoint': '{}-dataproc.googleapis.com:443'.format(REGION)
-    })
+    cluster_client = dataproc.ClusterControllerClient(
+        client_options={"api_endpoint": "{}-dataproc.googleapis.com:443".format(REGION)}
+    )
 
     # Create the cluster.
     operation = cluster_client.create_cluster(
@@ -54,13 +48,17 @@ def setup_teardown():
 
     yield
 
-    cluster_client.delete_cluster(request={
-        "project_id": PROJECT_ID, "region": REGION, "cluster_name": CLUSTER_NAME
-    })
+    cluster_client.delete_cluster(
+        request={
+            "project_id": PROJECT_ID,
+            "region": REGION,
+            "cluster_name": CLUSTER_NAME,
+        }
+    )
 
 
 def test_submit_job(capsys):
     submit_job.submit_job(PROJECT_ID, REGION, CLUSTER_NAME)
 
     out, _ = capsys.readouterr()
-    assert 'Job finished successfully' in out
+    assert "Job finished successfully" in out
diff --git a/samples/snippets/submit_job_to_cluster.py b/samples/snippets/submit_job_to_cluster.py
index 35d329c5..68a547c4 100644
--- a/samples/snippets/submit_job_to_cluster.py
+++ b/samples/snippets/submit_job_to_cluster.py
@@ -77,8 +77,10 @@ def download_output(project, cluster_id, output_bucket, job_id):
     print("Downloading output file.")
     client = storage.Client(project=project)
     bucket = client.get_bucket(output_bucket)
-    output_blob = "google-cloud-dataproc-metainfo/{}/jobs/{}/driveroutput.000000000".format(
-        cluster_id, job_id
+    output_blob = (
+        "google-cloud-dataproc-metainfo/{}/jobs/{}/driveroutput.000000000".format(
+            cluster_id, job_id
+        )
     )
     return bucket.blob(output_blob).download_as_string()
 
@@ -230,8 +232,10 @@ def main(
     region = get_region_from_zone(zone)
     # Use a regional gRPC endpoint. See:
     # https://cloud.google.com/dataproc/docs/concepts/regional-endpoints
-    client_transport = cluster_controller_grpc_transport.ClusterControllerGrpcTransport(
-        address="{}-dataproc.googleapis.com:443".format(region)
+    client_transport = (
+        cluster_controller_grpc_transport.ClusterControllerGrpcTransport(
+            address="{}-dataproc.googleapis.com:443".format(region)
+        )
     )
     job_transport = job_controller_grpc_transport.JobControllerGrpcTransport(
         address="{}-dataproc.googleapis.com:443".format(region)
diff --git a/samples/snippets/update_cluster.py b/samples/snippets/update_cluster.py
new file mode 100644
index 00000000..f4520224
--- /dev/null
+++ b/samples/snippets/update_cluster.py
@@ -0,0 +1,79 @@
+# Copyright 2021 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# This sample walks a user through updating the number of worker nodes in a
+# Cloud Dataproc cluster using the Dataproc client library.
+
+# Usage:
+#     python update_cluster.py project_id region cluster_name new_num_instances
+
+import sys
+
+# [START dataproc_update_cluster]
+from google.cloud import dataproc_v1 as dataproc
+
+
+def update_cluster(project_id, region, cluster_name, new_num_instances):
+    """This sample walks a user through updating a Cloud Dataproc cluster
+    using the Python client library.
+
+    Args:
+        project_id (str): Project the cluster belongs to.
+        region (str): Region where the cluster lives.
+        cluster_name (str): Name of the cluster to update.
+        new_num_instances (int): Number of worker nodes the cluster should have.
+    """
+
+    # Create a client with the endpoint set to the desired cluster region.
+    client = dataproc.ClusterControllerClient(
+        client_options={"api_endpoint": f"{region}-dataproc.googleapis.com:443"}
+    )
+
+    # Get the cluster you wish to update.
+    cluster = client.get_cluster(
+        project_id=project_id, region=region, cluster_name=cluster_name
+    )
+
+    # Only the fields named in this mask are read from `cluster` when updating.
+    mask = {"paths": ["config.worker_config.num_instances"]}
+
+    # Set the new number of worker nodes on the local cluster config.
+    cluster.config.worker_config.num_instances = new_num_instances
+
+    # Update the cluster.
+    operation = client.update_cluster(
+        project_id=project_id,
+        region=region,
+        cluster=cluster,
+        cluster_name=cluster_name,
+        update_mask=mask,
+    )
+
+    # Output a success message.
+    updated_cluster = operation.result()
+    print(f"Cluster was updated successfully: {updated_cluster.cluster_name}")
+
+
+# [END dataproc_update_cluster]
+
+
+if __name__ == "__main__":
+    if len(sys.argv) < 5:
+        sys.exit("python update_cluster.py project_id region cluster_name new_num_instances")
+
+    project_id = sys.argv[1]
+    region = sys.argv[2]
+    cluster_name = sys.argv[3]
+    new_num_instances = int(sys.argv[4])
+    update_cluster(project_id, region, cluster_name, new_num_instances)
diff --git a/samples/snippets/update_cluster_test.py b/samples/snippets/update_cluster_test.py
new file mode 100644
index 00000000..9c608ac4
--- /dev/null
+++ b/samples/snippets/update_cluster_test.py
@@ -0,0 +1,80 @@
+# Copyright 2021 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# This test exercises the update_cluster sample, which updates the number of
+# worker nodes in a Cloud Dataproc cluster using the client library.
+
+
+import os
+import uuid
+
+from google.cloud.dataproc_v1.services.cluster_controller.client import (
+    ClusterControllerClient,
+)
+import pytest
+
+import update_cluster
+
+
+PROJECT_ID = os.environ["GOOGLE_CLOUD_PROJECT"]
+REGION = "us-central1"
+CLUSTER_NAME = f"py-cc-test-{str(uuid.uuid4())}"
+NEW_NUM_INSTANCES = 5
+CLUSTER = {
+    "project_id": PROJECT_ID,
+    "cluster_name": CLUSTER_NAME,
+    "config": {
+        "master_config": {"num_instances": 1, "machine_type_uri": "n1-standard-2"},
+        "worker_config": {"num_instances": 2, "machine_type_uri": "n1-standard-2"},
+    },
+}
+
+
+@pytest.fixture(autouse=True)
+def setup_teardown(cluster_client):
+    # Create the cluster.
+    operation = cluster_client.create_cluster(
+        request={"project_id": PROJECT_ID, "region": REGION, "cluster": CLUSTER}
+    )
+    operation.result()
+
+    yield
+
+    cluster_client.delete_cluster(
+        request={
+            "project_id": PROJECT_ID,
+            "region": REGION,
+            "cluster_name": CLUSTER_NAME,
+        }
+    )
+
+
+@pytest.fixture
+def cluster_client():
+    cluster_client = ClusterControllerClient(
+        client_options={"api_endpoint": "{}-dataproc.googleapis.com:443".format(REGION)}
+    )
+    return cluster_client
+
+
+def test_update_cluster(capsys, cluster_client: ClusterControllerClient):
+    # Run the sample, then fetch the cluster to verify the new worker count.
+    update_cluster.update_cluster(PROJECT_ID, REGION, CLUSTER_NAME, NEW_NUM_INSTANCES)
+    new_num_cluster = cluster_client.get_cluster(
+        project_id=PROJECT_ID, region=REGION, cluster_name=CLUSTER_NAME
+    )
+
+    out, _ = capsys.readouterr()
+    assert CLUSTER_NAME in out
+    assert new_num_cluster.config.worker_config.num_instances == NEW_NUM_INSTANCES
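
A note for context on the update_mask used by the new update_cluster sample: the value passed to update_cluster is a google.protobuf.FieldMask (a dict is coerced to one), and its paths field is a repeated string of field paths. Only the fields listed there are read from the cluster argument; everything else on the running cluster is left untouched. A minimal sketch of building the same mask explicitly, with names that are illustrative rather than part of this change:

    from google.protobuf import field_mask_pb2

    # Restrict the update to the worker count; all other cluster fields are ignored.
    mask = field_mask_pb2.FieldMask(paths=["config.worker_config.num_instances"])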