This repository has been archived by the owner on Nov 29, 2023. It is now read-only.

docs: update cluster sample #218

Merged
merged 19 commits into from Aug 12, 2021

Changes from 5 commits
10 changes: 5 additions & 5 deletions samples/snippets/create_cluster.py
@@ -29,12 +29,12 @@

 def create_cluster(project_id, region, cluster_name):
     """This sample walks a user through creating a Cloud Dataproc cluster
-        using the Python client library.
+    using the Python client library.

-        Args:
-            project_id (string): Project to use for creating resources.
-            region (string): Region where the resources should live.
-            cluster_name (string): Name to use for creating a cluster.
+    Args:
+        project_id (string): Project to use for creating resources.
+        region (string): Region where the resources should live.
+        cluster_name (string): Name to use for creating a cluster.
     """

     # Create a client with the endpoint set to the desired cluster region.
8 changes: 4 additions & 4 deletions samples/snippets/instantiate_inline_workflow_template.py
@@ -27,11 +27,11 @@

 def instantiate_inline_workflow_template(project_id, region):
     """This sample walks a user through submitting a workflow
-        for a Cloud Dataproc using the Python client library.
+    for a Cloud Dataproc using the Python client library.

-        Args:
-            project_id (string): Project to use for running the workflow.
-            region (string): Region where the workflow resources should live.
+    Args:
+        project_id (string): Project to use for running the workflow.
+        region (string): Region where the workflow resources should live.
     """

     # Create a client with the endpoint set to the desired region.
6 changes: 4 additions & 2 deletions samples/snippets/list_clusters.py
@@ -49,8 +49,10 @@ def main(project_id, region):
     else:
         # Use a regional gRPC endpoint. See:
         # https://cloud.google.com/dataproc/docs/concepts/regional-endpoints
-        client_transport = cluster_controller_grpc_transport.ClusterControllerGrpcTransport(
-            address="{}-dataproc.googleapis.com:443".format(region)
+        client_transport = (
+            cluster_controller_grpc_transport.ClusterControllerGrpcTransport(
+                address="{}-dataproc.googleapis.com:443".format(region)
+            )
         )
     dataproc_cluster_client = dataproc_v1.ClusterControllerClient(client_transport)
Contributor: No need to block this PR on this, but there's an easier way to set regional endpoints now through google.api_core.client_options. See this speech sample: https://cloud.google.com/speech-to-text/docs/endpoints
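For reference, a minimal sketch of that approach (not this sample's current code; assumes google-cloud-dataproc v2 and a `region` variable in scope):

# Set a regional endpoint via client_options instead of building
# a gRPC transport by hand.
from google.api_core.client_options import ClientOptions
from google.cloud import dataproc_v1

options = ClientOptions(api_endpoint=f"{region}-dataproc.googleapis.com:443")
client = dataproc_v1.ClusterControllerClient(client_options=options)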


24 changes: 13 additions & 11 deletions samples/snippets/submit_job.py
@@ -23,8 +23,10 @@

 # [START dataproc_submit_job]
 import re
+
 # [END dataproc_submit_job]
 import sys
+
 # [START dataproc_submit_job]

 from google.cloud import dataproc_v1 as dataproc
@@ -33,21 +35,19 @@

 def submit_job(project_id, region, cluster_name):
     # Create the job client.
-    job_client = dataproc.JobControllerClient(client_options={
-        'api_endpoint': '{}-dataproc.googleapis.com:443'.format(region)
-    })
+    job_client = dataproc.JobControllerClient(
+        client_options={"api_endpoint": "{}-dataproc.googleapis.com:443".format(region)}
+    )

     # Create the job config. 'main_jar_file_uri' can also be a
     # Google Cloud Storage URL.
     job = {
-        'placement': {
-            'cluster_name': cluster_name
-        },
-        'spark_job': {
-            'main_class': 'org.apache.spark.examples.SparkPi',
-            'jar_file_uris': ['file:///usr/lib/spark/examples/jars/spark-examples.jar'],
-            'args': ['1000']
-        }
+        "placement": {"cluster_name": cluster_name},
+        "spark_job": {
+            "main_class": "org.apache.spark.examples.SparkPi",
+            "jar_file_uris": ["file:///usr/lib/spark/examples/jars/spark-examples.jar"],
+            "args": ["1000"],
+        },
     }

     operation = job_client.submit_job_as_operation(
@@ -67,12 +67,14 @@ def submit_job(project_id, region, cluster_name):
     )

     print(f"Job finished successfully: {output}")
+
 # [END dataproc_submit_job]


 if __name__ == "__main__":
     if len(sys.argv) < 3:
-        sys.exit('python submit_job.py project_id region cluster_name')
+        sys.exit("python submit_job.py project_id region cluster_name")

     project_id = sys.argv[1]
     region = sys.argv[2]
42 changes: 20 additions & 22 deletions samples/snippets/submit_job_test.py
@@ -21,30 +21,24 @@

 import submit_job


-PROJECT_ID = os.environ['GOOGLE_CLOUD_PROJECT']
-REGION = 'us-central1'
-CLUSTER_NAME = 'py-sj-test-{}'.format(str(uuid.uuid4()))
+PROJECT_ID = os.environ["GOOGLE_CLOUD_PROJECT"]
+REGION = "us-central1"
+CLUSTER_NAME = "py-sj-test-{}".format(str(uuid.uuid4()))
 CLUSTER = {
-    'project_id': PROJECT_ID,
-    'cluster_name': CLUSTER_NAME,
-    'config': {
-        'master_config': {
-            'num_instances': 1,
-            'machine_type_uri': 'n1-standard-2'
-        },
-        'worker_config': {
-            'num_instances': 2,
-            'machine_type_uri': 'n1-standard-2'
-        }
-    }
+    "project_id": PROJECT_ID,
+    "cluster_name": CLUSTER_NAME,
+    "config": {
+        "master_config": {"num_instances": 1, "machine_type_uri": "n1-standard-2"},
+        "worker_config": {"num_instances": 2, "machine_type_uri": "n1-standard-2"},
+    },
 }


 @pytest.fixture(autouse=True)
 def setup_teardown():
-    cluster_client = dataproc.ClusterControllerClient(client_options={
-        'api_endpoint': '{}-dataproc.googleapis.com:443'.format(REGION)
-    })
+    cluster_client = dataproc.ClusterControllerClient(
+        client_options={"api_endpoint": "{}-dataproc.googleapis.com:443".format(REGION)}
+    )

     # Create the cluster.
     operation = cluster_client.create_cluster(
@@ -54,13 +48,17 @@ def setup_teardown():

     yield

-    cluster_client.delete_cluster(request={
-        "project_id": PROJECT_ID, "region": REGION, "cluster_name": CLUSTER_NAME
-    })
+    cluster_client.delete_cluster(
+        request={
+            "project_id": PROJECT_ID,
+            "region": REGION,
+            "cluster_name": CLUSTER_NAME,
+        }
+    )


 def test_submit_job(capsys):
     submit_job.submit_job(PROJECT_ID, REGION, CLUSTER_NAME)
     out, _ = capsys.readouterr()

-    assert 'Job finished successfully' in out
+    assert "Job finished successfully" in out
12 changes: 8 additions & 4 deletions samples/snippets/submit_job_to_cluster.py
@@ -77,8 +77,10 @@ def download_output(project, cluster_id, output_bucket, job_id):
print("Downloading output file.")
client = storage.Client(project=project)
bucket = client.get_bucket(output_bucket)
output_blob = "google-cloud-dataproc-metainfo/{}/jobs/{}/driveroutput.000000000".format(
cluster_id, job_id
output_blob = (
"google-cloud-dataproc-metainfo/{}/jobs/{}/driveroutput.000000000".format(
cluster_id, job_id
)
)
return bucket.blob(output_blob).download_as_string()

@@ -230,8 +232,10 @@ def main(
     region = get_region_from_zone(zone)
     # Use a regional gRPC endpoint. See:
     # https://cloud.google.com/dataproc/docs/concepts/regional-endpoints
-    client_transport = cluster_controller_grpc_transport.ClusterControllerGrpcTransport(
-        address="{}-dataproc.googleapis.com:443".format(region)
+    client_transport = (
+        cluster_controller_grpc_transport.ClusterControllerGrpcTransport(
+            address="{}-dataproc.googleapis.com:443".format(region)
+        )
     )
     job_transport = job_controller_grpc_transport.JobControllerGrpcTransport(
         address="{}-dataproc.googleapis.com:443".format(region)
67 changes: 67 additions & 0 deletions samples/snippets/update_cluster.py
@@ -0,0 +1,67 @@
#!/usr/bin/env python
# Licensed under the Apache License, Version 2.0 (the "License");
Contributor: Double check the license is correct here. You can take it from here and replace the year with 2021.

# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# This sample walks a user through updating a Cloud Dataproc cluster using
# the Python client library.
#
# This script can be run on its own:
# python update_cluster.py ${PROJECT_ID} ${REGION} ${CLUSTER_NAME} ${NEW_NUM_INSTANCES}

import sys

# [START dataproc_update_cluster]
from google.cloud import dataproc_v1 as dataproc


def update_cluster(project_id, region, cluster_name, new_num_instances):
"""This sample walks a user through updating a Cloud Dataproc cluster
using the Python client library.

    Args:
        project_id (string): Project to use for creating resources.
        region (string): Region where the resources should live.
        cluster_name (string): Name of the cluster to update.
        new_num_instances (int): New number of worker instances for the cluster.
"""
# Create a client with the endpoint set to the desired cluster region.
client = dataproc.ClusterControllerClient(
client_options={"api_endpoint": f"{region}-dataproc.googleapis.com:443"}
)
# Get cluster you wish to update.
Contributor: This may be a stylistic thing but I would recommend a line break before each new comment.

cluster = client.get_cluster(
project_id=project_id, region=region, cluster_name=cluster_name
)
    # Update the number of worker instances
Contributor: Line break, see above.

mask = {"paths": {"config.worker_config.num_instances": str(new_num_instances)}}
# Update cluster config
Contributor: Line break, see above.

cluster.config.worker_config.num_instances = new_num_instances
# Update cluster
Contributor: Line break, see above.

operation = client.update_cluster(
project_id=project_id,
region=region,
cluster=cluster,
cluster_name=cluster_name,
update_mask=mask,
)
# Output a success message.
Contributor: Line break, see above.

updated_cluster = operation.result()
print(f"Cluster was updated successfully: {updated_cluster.cluster_name}")


Contributor: Think you're missing the END region tag here.
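For reference, the closing tag would pair with the START tag above and sit after the success message:

    print(f"Cluster was updated successfully: {updated_cluster.cluster_name}")
# [END dataproc_update_cluster]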

if __name__ == "__main__":
if len(sys.argv) < 5:
sys.exit("python update_cluster.py project_id region cluster_name")

project_id = sys.argv[1]
region = sys.argv[2]
cluster_name = sys.argv[3]
new_num_instances = sys.argv[4]
update_cluster(project_id, region, cluster_name)
56 changes: 56 additions & 0 deletions samples/snippets/update_cluster_test.py
@@ -0,0 +1,56 @@
# Copyright 2019 Google LLC
Contributor: Suggested change:
-# Copyright 2019 Google LLC
+# Copyright 2021 Google LLC

Contributor: Change this to 2021.

#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import os
import uuid

from google.cloud import dataproc_v1 as dataproc
import pytest

import create_cluster
import update_cluster


PROJECT_ID = os.environ["GOOGLE_CLOUD_PROJECT"]
REGION = "us-central1"
CLUSTER_NAME = "py-cc-test-{}".format(str(uuid.uuid4()))
NEW_NUM_INSTANCES = 5


@pytest.fixture(autouse=True)
def teardown():
yield

cluster_client = dataproc.ClusterControllerClient(
client_options={"api_endpoint": f"{REGION}-dataproc.googleapis.com:443"}
)
    # Delete the cluster via the client library
operation = cluster_client.delete_cluster(
request={
"project_id": PROJECT_ID,
"region": REGION,
"cluster_name": CLUSTER_NAME,
}
)
# Wait for cluster to delete
operation.result()


def test_update_cluster(capsys):
# Wrapper function for client library function
create_cluster.create_cluster(PROJECT_ID, REGION, CLUSTER_NAME)
Contributor: I would recommend implementing the logic for creating the cluster as a part of the setup, similar to here.

At least for Python, I typically tried not to make samples co-dependent on each other.
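A sketch of what that setup/teardown fixture might look like (hypothetical; assumes a CLUSTER config dict like the one defined in submit_job_test.py above):

@pytest.fixture(autouse=True)
def setup_teardown():
    cluster_client = dataproc.ClusterControllerClient(
        client_options={"api_endpoint": f"{REGION}-dataproc.googleapis.com:443"}
    )

    # Create the cluster before the test runs.
    operation = cluster_client.create_cluster(
        request={"project_id": PROJECT_ID, "region": REGION, "cluster": CLUSTER}
    )
    operation.result()

    yield

    # Delete the cluster once the test finishes.
    operation = cluster_client.delete_cluster(
        request={
            "project_id": PROJECT_ID,
            "region": REGION,
            "cluster_name": CLUSTER_NAME,
        }
    )
    operation.result()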

Contributor Author: this is done, just waiting on unblocking get_cluster to commit

update_cluster.update_cluster(PROJECT_ID, REGION, CLUSTER_NAME, NEW_NUM_INSTANCES)

out, _ = capsys.readouterr()
assert CLUSTER_NAME in out
Contributor: For the test, I would instead suggest a get_cluster call and confirm that the number of instances matches NEW_NUM_INSTANCES.
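That check might look roughly like this (a sketch using the constants defined above):

cluster_client = dataproc.ClusterControllerClient(
    client_options={"api_endpoint": f"{REGION}-dataproc.googleapis.com:443"}
)
cluster = cluster_client.get_cluster(
    project_id=PROJECT_ID, region=REGION, cluster_name=CLUSTER_NAME
)
# The updated worker count should match what update_cluster requested.
assert cluster.config.worker_config.num_instances == NEW_NUM_INSTANCES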

Contributor Author: I've been calling dataproc.ClusterControllerClient.get_cluster(PROJECT_ID).config to try and get the worker_config, but it doesn't let me get any deeper into the cluster object than config. I'm not sure how to get in there currently.

Contributor: Hmmm, the following ended up working for me when pointed to a cluster that I have with two workers. Did you also provide the region / cluster_name to get_cluster?

from google.cloud import dataproc

PROJECT_ID = "my-project"
REGION = "my-region"
CLUSTER_NAME = "my-cluster"

client = dataproc.ClusterControllerClient(
    client_options={"api_endpoint": "{}-dataproc.googleapis.com:443".format(REGION)}
)

cluster = client.get_cluster(
    project_id=PROJECT_ID, region=REGION, cluster_name=CLUSTER_NAME
)

print(cluster.config.worker_config.num_instances)  # should print an integer

Contributor Author: @bradmiro I haven't been able to get it working in the context of a test. My recent commit has three options I've tried: creating a fixture, creating a static cluster, and creating the cluster in the test. I still can't get in deeper than config. (This may be a Pythonic learning curve thing.)

Contributor Author: Actually does seem to be working in the current commit in CI!

Contributor Author: final commit ready for review!