From 09b5af8702e50ad1c16604ebfd5969a6ad785885 Mon Sep 17 00:00:00 2001 From: Yaqi Date: Tue, 26 Oct 2021 17:20:36 -0700 Subject: [PATCH 1/8] feat: update PipelineJob to accept protobuf value --- .../cloud/aiplatform/utils/pipeline_utils.py | 54 +++-- tests/unit/aiplatform/test_pipeline_jobs.py | 221 ++++++++++-------- 2 files changed, 152 insertions(+), 123 deletions(-) diff --git a/google/cloud/aiplatform/utils/pipeline_utils.py b/google/cloud/aiplatform/utils/pipeline_utils.py index bc531d2b12..39c178f165 100644 --- a/google/cloud/aiplatform/utils/pipeline_utils.py +++ b/google/cloud/aiplatform/utils/pipeline_utils.py @@ -64,9 +64,13 @@ def from_job_spec_json( .get("inputDefinitions", {}) .get("parameters", {}) ) - parameter_types = {k: v["type"] for k, v in parameter_input_definitions.items()} + # 'type' is deprecated in IR and change to 'parameterType'. + parameter_types = { + k: v.get("type") or v.get("parameterType") + for k, v in parameter_input_definitions.items() + } - pipeline_root = runtime_config_spec.get("gcs_output_directory") + pipeline_root = runtime_config_spec.get("gcsOutputDirectory") parameter_values = _parse_runtime_parameters(runtime_config_spec) return cls(pipeline_root, parameter_types, parameter_values) @@ -108,7 +112,7 @@ def build(self) -> Dict[str, Any]: "compile time, or when calling the service." ) return { - "gcs_output_directory": self._pipeline_root, + "gcsOutputDirectory": self._pipeline_root, "parameters": { k: self._get_vertex_value(k, v) for k, v in self._parameter_values.items() @@ -117,14 +121,14 @@ def build(self) -> Dict[str, Any]: } def _get_vertex_value( - self, name: str, value: Union[int, float, str] + self, name: str, value: Union[int, float, str, bool, list, dict] ) -> Dict[str, Any]: """Converts primitive values into Vertex pipeline Value proto message. Args: name (str): Required. The name of the pipeline parameter. - value (Union[int, float, str]): + value (Union[int, float, str, bool, list, dict]): Required. The value of the pipeline parameter. Returns: @@ -150,6 +154,16 @@ def _get_vertex_value( result["doubleValue"] = value elif self._parameter_types[name] == "STRING": result["stringValue"] = value + elif self._parameter_types[name] == "BOOLEAN": + result["boolValue"] = value + elif self._parameter_types[name] == "NUMBER_DOUBLE": + result["numberValue"] = value + elif self._parameter_types[name] == "NUMBER_INTEGER": + result["numberValue"] = value + elif self._parameter_types[name] == "LIST": + result["listValue"] = value + elif self._parameter_types[name] == "STRUCT": + result["structValue"] = value else: raise TypeError("Got unknown type of value: {}".format(value)) @@ -164,19 +178,17 @@ def _parse_runtime_parameters( Raises: TypeError: if the parameter type is not one of 'INT', 'DOUBLE', 'STRING'. """ - runtime_parameters = runtime_config_spec.get("parameters") - if not runtime_parameters: - return None - - result = {} - for name, value in runtime_parameters.items(): - if "intValue" in value: - result[name] = int(value["intValue"]) - elif "doubleValue" in value: - result[name] = float(value["doubleValue"]) - elif "stringValue" in value: - result[name] = value["stringValue"] - else: - raise TypeError("Got unknown type of value: {}".format(value)) - - return result + # 'parameters' are deprecated in IR and changed to 'parameterValues'. + if runtime_config_spec.get("parameters") is not None: + result = {} + for name, value in runtime_config_spec.get("parameters").items(): + if "intValue" in value: + result[name] = int(value["intValue"]) + elif "doubleValue" in value: + result[name] = float(value["doubleValue"]) + elif "stringValue" in value: + result[name] = value["stringValue"] + else: + raise TypeError("Got unknown type of value: {}".format(value)) + return result + return runtime_config_spec.get("parameterValues") diff --git a/tests/unit/aiplatform/test_pipeline_jobs.py b/tests/unit/aiplatform/test_pipeline_jobs.py index d6580de24d..89e167886f 100644 --- a/tests/unit/aiplatform/test_pipeline_jobs.py +++ b/tests/unit/aiplatform/test_pipeline_jobs.py @@ -52,17 +52,65 @@ _TEST_PIPELINE_JOB_NAME = f"projects/{_TEST_PROJECT}/locations/{_TEST_LOCATION}/pipelineJobs/{_TEST_PIPELINE_JOB_ID}" -_TEST_PIPELINE_PARAMETER_VALUES = {"name_param": "hello"} +_TEST_PIPELINE_PARAMETER_VALUES = {"string_param": "hello"} +_TEST_PIPELINE_SPEC_LEGACY = { + "pipelineInfo": {"name": "my-pipeline"}, + "root": { + "dag": {"tasks": {}}, + "inputDefinitions": {"parameters": {"string_param": {"type": "STRING"}}}, + }, + "components": {}, +} _TEST_PIPELINE_SPEC = { "pipelineInfo": {"name": "my-pipeline"}, "root": { "dag": {"tasks": {}}, - "inputDefinitions": {"parameters": {"name_param": {"type": "STRING"}}}, + "inputDefinitions": { + "parameters": { + "string_param": {"parameterType": "STRING"}, + # uncomment when GAPIC library change for protobufValue is in + # "bool_param": { + # "parameterType": "BOOLEAN" + # }, + # "double_param": { + # "parameterType": "NUMBER_DOUBLE" + # }, + # "int_param": { + # "parameterType": "NUMBER_INTEGER" + # }, + # "list_int_param": { + # "parameterType": "LIST" + # }, + # "list_string_param": { + # "parameterType": "LIST" + # }, + # "struct_param": { + # "parameterType": "STRUCT" + # } + } + }, }, "components": {}, } -_TEST_PIPELINE_JOB = { + +_TEST_PIPELINE_JOB_LEGACY = { "runtimeConfig": {}, + "pipelineSpec": _TEST_PIPELINE_SPEC_LEGACY, +} + +_TEST_PIPELINE_JOB = { + "runtimeConfig": { + "parameterValues": { + "string_param": "lorem ipsum", + # uncomment when GAPIC library change for protobufValue is in + # "bool_param": True, + # "double_param": 12.34, + # "int_param": 5678, + # "list_int_param": [123, 456, 789], + # "list_string_param": ["lorem", "ipsum"], + # "struct_param": { "key1": 12345, "key2": 67890} + }, + }, "pipelineSpec": _TEST_PIPELINE_SPEC, } @@ -176,23 +224,10 @@ def mock_pipeline_service_list(): @pytest.fixture -def mock_load_pipeline_job_json(): - with patch.object(storage.Blob, "download_as_bytes") as mock_load_pipeline_job_json: - mock_load_pipeline_job_json.return_value = json.dumps( - _TEST_PIPELINE_JOB - ).encode() - yield mock_load_pipeline_job_json - - -@pytest.fixture -def mock_load_pipeline_spec_json(): - with patch.object( - storage.Blob, "download_as_bytes" - ) as mock_load_pipeline_spec_json: - mock_load_pipeline_spec_json.return_value = json.dumps( - _TEST_PIPELINE_SPEC - ).encode() - yield mock_load_pipeline_spec_json +def mock_load_json(job_spec_json): + with patch.object(storage.Blob, "download_as_bytes") as mock_load_json: + mock_load_json.return_value = json.dumps(job_spec_json).encode() + yield mock_load_json class TestPipelineJob: @@ -213,10 +248,23 @@ def setup_method(self): def teardown_method(self): initializer.global_pool.shutdown(wait=True) - @pytest.mark.usefixtures("mock_load_pipeline_job_json") + @pytest.mark.parametrize( + "job_spec_json", + [ + _TEST_PIPELINE_SPEC, + _TEST_PIPELINE_JOB, + _TEST_PIPELINE_SPEC_LEGACY, + _TEST_PIPELINE_JOB_LEGACY, + ], + ) @pytest.mark.parametrize("sync", [True, False]) - def test_run_call_pipeline_service_pipeline_job_create( - self, mock_pipeline_service_create, mock_pipeline_service_get, sync, + def test_run_call_pipeline_service_create( + self, + mock_pipeline_service_create, + mock_pipeline_service_get, + job_spec_json, + mock_load_json, + sync, ): aiplatform.init( project=_TEST_PROJECT, @@ -242,77 +290,20 @@ def test_run_call_pipeline_service_pipeline_job_create( expected_runtime_config_dict = { "gcs_output_directory": _TEST_GCS_BUCKET_NAME, - "parameters": {"name_param": {"stringValue": "hello"}}, + "parameters": {"string_param": {"stringValue": "hello"}}, } runtime_config = gca_pipeline_job_v1beta1.PipelineJob.RuntimeConfig()._pb json_format.ParseDict(expected_runtime_config_dict, runtime_config) - # Construct expected request - expected_gapic_pipeline_job = gca_pipeline_job_v1beta1.PipelineJob( - display_name=_TEST_PIPELINE_JOB_DISPLAY_NAME, - pipeline_spec={ - "components": {}, - "pipelineInfo": _TEST_PIPELINE_JOB["pipelineSpec"]["pipelineInfo"], - "root": _TEST_PIPELINE_JOB["pipelineSpec"]["root"], - }, - runtime_config=runtime_config, - service_account=_TEST_SERVICE_ACCOUNT, - network=_TEST_NETWORK, - ) - - mock_pipeline_service_create.assert_called_once_with( - parent=_TEST_PARENT, - pipeline_job=expected_gapic_pipeline_job, - pipeline_job_id=_TEST_PIPELINE_JOB_ID, - ) - - mock_pipeline_service_get.assert_called_with(name=_TEST_PIPELINE_JOB_NAME) - - assert job._gca_resource == make_pipeline_job( - gca_pipeline_state_v1beta1.PipelineState.PIPELINE_STATE_SUCCEEDED - ) - - @pytest.mark.usefixtures("mock_load_pipeline_spec_json") - @pytest.mark.parametrize("sync", [True, False]) - def test_run_call_pipeline_service_pipeline_spec_create( - self, mock_pipeline_service_create, mock_pipeline_service_get, sync, - ): - aiplatform.init( - project=_TEST_PROJECT, - staging_bucket=_TEST_GCS_BUCKET_NAME, - location=_TEST_LOCATION, - credentials=_TEST_CREDENTIALS, - ) - - job = pipeline_jobs.PipelineJob( - display_name=_TEST_PIPELINE_JOB_DISPLAY_NAME, - template_path=_TEST_TEMPLATE_PATH, - job_id=_TEST_PIPELINE_JOB_ID, - parameter_values=_TEST_PIPELINE_PARAMETER_VALUES, - enable_caching=True, - ) - - job.run( - service_account=_TEST_SERVICE_ACCOUNT, network=_TEST_NETWORK, sync=sync, - ) - - if not sync: - job.wait() - - expected_runtime_config_dict = { - "gcs_output_directory": _TEST_GCS_BUCKET_NAME, - "parameters": {"name_param": {"stringValue": "hello"}}, - } - runtime_config = gca_pipeline_job_v1beta1.PipelineJob.RuntimeConfig()._pb - json_format.ParseDict(expected_runtime_config_dict, runtime_config) + pipeline_spec = job_spec_json.get("pipelineSpec") or job_spec_json # Construct expected request expected_gapic_pipeline_job = gca_pipeline_job_v1beta1.PipelineJob( display_name=_TEST_PIPELINE_JOB_DISPLAY_NAME, pipeline_spec={ "components": {}, - "pipelineInfo": _TEST_PIPELINE_JOB["pipelineSpec"]["pipelineInfo"], - "root": _TEST_PIPELINE_JOB["pipelineSpec"]["root"], + "pipelineInfo": pipeline_spec["pipelineInfo"], + "root": pipeline_spec["root"], }, runtime_config=runtime_config, service_account=_TEST_SERVICE_ACCOUNT, @@ -340,13 +331,18 @@ def test_get_pipeline_job(self, mock_pipeline_service_get): assert isinstance(job, pipeline_jobs.PipelineJob) @pytest.mark.usefixtures( - "mock_pipeline_service_create", - "mock_pipeline_service_get", - "mock_load_pipeline_job_json", + "mock_pipeline_service_create", "mock_pipeline_service_get", ) - def test_cancel_pipeline_job( - self, mock_pipeline_service_cancel, - ): + @pytest.mark.parametrize( + "job_spec_json", + [ + _TEST_PIPELINE_SPEC, + _TEST_PIPELINE_JOB, + _TEST_PIPELINE_SPEC_LEGACY, + _TEST_PIPELINE_JOB_LEGACY, + ], + ) + def test_cancel_pipeline_job(self, mock_pipeline_service_cancel, mock_load_json): aiplatform.init( project=_TEST_PROJECT, staging_bucket=_TEST_GCS_BUCKET_NAME, @@ -367,11 +363,18 @@ def test_cancel_pipeline_job( ) @pytest.mark.usefixtures( - "mock_pipeline_service_create", - "mock_pipeline_service_get", - "mock_load_pipeline_job_json", + "mock_pipeline_service_create", "mock_pipeline_service_get", + ) + @pytest.mark.parametrize( + "job_spec_json", + [ + _TEST_PIPELINE_SPEC, + _TEST_PIPELINE_JOB, + _TEST_PIPELINE_SPEC_LEGACY, + _TEST_PIPELINE_JOB_LEGACY, + ], ) - def test_list_pipeline_job(self, mock_pipeline_service_list): + def test_list_pipeline_job(self, mock_pipeline_service_list, mock_load_json): aiplatform.init( project=_TEST_PROJECT, staging_bucket=_TEST_GCS_BUCKET_NAME, @@ -392,12 +395,19 @@ def test_list_pipeline_job(self, mock_pipeline_service_list): ) @pytest.mark.usefixtures( - "mock_pipeline_service_create", - "mock_pipeline_service_get", - "mock_load_pipeline_job_json", + "mock_pipeline_service_create", "mock_pipeline_service_get", + ) + @pytest.mark.parametrize( + "job_spec_json", + [ + _TEST_PIPELINE_SPEC, + _TEST_PIPELINE_JOB, + _TEST_PIPELINE_SPEC_LEGACY, + _TEST_PIPELINE_JOB_LEGACY, + ], ) def test_cancel_pipeline_job_without_running( - self, mock_pipeline_service_cancel, + self, mock_pipeline_service_cancel, mock_load_json, ): aiplatform.init( project=_TEST_PROJECT, @@ -417,12 +427,19 @@ def test_cancel_pipeline_job_without_running( assert e.match(regexp=r"PipelineJob resource has not been created") @pytest.mark.usefixtures( - "mock_pipeline_service_create", - "mock_pipeline_service_get_with_fail", - "mock_load_pipeline_job_json", + "mock_pipeline_service_create", "mock_pipeline_service_get_with_fail", + ) + @pytest.mark.parametrize( + "job_spec_json", + [ + _TEST_PIPELINE_SPEC, + _TEST_PIPELINE_JOB, + _TEST_PIPELINE_SPEC_LEGACY, + _TEST_PIPELINE_JOB_LEGACY, + ], ) @pytest.mark.parametrize("sync", [True, False]) - def test_pipeline_failure_raises(self, sync): + def test_pipeline_failure_raises(self, mock_load_json, sync): aiplatform.init( project=_TEST_PROJECT, staging_bucket=_TEST_GCS_BUCKET_NAME, From 3d17409e54e5d28f9132f90562bdec20b040b544 Mon Sep 17 00:00:00 2001 From: Yaqi Date: Tue, 26 Oct 2021 17:25:18 -0700 Subject: [PATCH 2/8] fix tests --- tests/unit/aiplatform/test_pipeline_jobs.py | 2 +- tests/unit/aiplatform/test_utils.py | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/unit/aiplatform/test_pipeline_jobs.py b/tests/unit/aiplatform/test_pipeline_jobs.py index 89e167886f..77a2856f60 100644 --- a/tests/unit/aiplatform/test_pipeline_jobs.py +++ b/tests/unit/aiplatform/test_pipeline_jobs.py @@ -289,7 +289,7 @@ def test_run_call_pipeline_service_create( job.wait() expected_runtime_config_dict = { - "gcs_output_directory": _TEST_GCS_BUCKET_NAME, + "gcsOutputDirectory": _TEST_GCS_BUCKET_NAME, "parameters": {"string_param": {"stringValue": "hello"}}, } runtime_config = gca_pipeline_job_v1beta1.PipelineJob.RuntimeConfig()._pb diff --git a/tests/unit/aiplatform/test_utils.py b/tests/unit/aiplatform/test_utils.py index 418014ee45..34bba22819 100644 --- a/tests/unit/aiplatform/test_utils.py +++ b/tests/unit/aiplatform/test_utils.py @@ -378,7 +378,7 @@ class TestPipelineUtils: } }, "runtimeConfig": { - "gcs_output_directory": "path/to/my/root", + "gcsOutputDirectory": "path/to/my/root", "parameters": { "string_param": {"stringValue": "test-string"}, "int_param": {"intValue": 42}, @@ -444,7 +444,7 @@ def test_pipeline_utils_runtime_config_builder_with_merge_updates(self): actual_runtime_config = my_builder.build() expected_runtime_config = { - "gcs_output_directory": "path/to/my/new/root", + "gcsOutputDirectory": "path/to/my/new/root", "parameters": { "string_param": {"stringValue": "test-string"}, "int_param": {"intValue": 888}, From 31d2ba71820ca852f7c632b63a3225abc7a1230b Mon Sep 17 00:00:00 2001 From: Yaqi Date: Wed, 27 Oct 2021 11:25:53 -0700 Subject: [PATCH 3/8] address comments --- google/cloud/aiplatform/utils/pipeline_utils.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/google/cloud/aiplatform/utils/pipeline_utils.py b/google/cloud/aiplatform/utils/pipeline_utils.py index 39c178f165..a840150527 100644 --- a/google/cloud/aiplatform/utils/pipeline_utils.py +++ b/google/cloud/aiplatform/utils/pipeline_utils.py @@ -66,7 +66,7 @@ def from_job_spec_json( ) # 'type' is deprecated in IR and change to 'parameterType'. parameter_types = { - k: v.get("type") or v.get("parameterType") + k: v.get("parameterType") or v.get("type") for k, v in parameter_input_definitions.items() } @@ -179,6 +179,9 @@ def _parse_runtime_parameters( TypeError: if the parameter type is not one of 'INT', 'DOUBLE', 'STRING'. """ # 'parameters' are deprecated in IR and changed to 'parameterValues'. + if runtime_config_spec.get("parameterValues") is not None: + return runtime_config_spec.get("parameterValues") + if runtime_config_spec.get("parameters") is not None: result = {} for name, value in runtime_config_spec.get("parameters").items(): @@ -191,4 +194,3 @@ def _parse_runtime_parameters( else: raise TypeError("Got unknown type of value: {}".format(value)) return result - return runtime_config_spec.get("parameterValues") From 15ad9c330d170465cd6e483892eea2c5a83ea703 Mon Sep 17 00:00:00 2001 From: Yaqi Date: Tue, 2 Nov 2021 15:49:04 -0700 Subject: [PATCH 4/8] fix: update Pipeline Job parameter values according to schema_version --- .../cloud/aiplatform/utils/pipeline_utils.py | 23 +++++--- tests/unit/aiplatform/test_pipeline_jobs.py | 58 +++++++++---------- 2 files changed, 44 insertions(+), 37 deletions(-) diff --git a/google/cloud/aiplatform/utils/pipeline_utils.py b/google/cloud/aiplatform/utils/pipeline_utils.py index a840150527..11c4c2b7fa 100644 --- a/google/cloud/aiplatform/utils/pipeline_utils.py +++ b/google/cloud/aiplatform/utils/pipeline_utils.py @@ -17,7 +17,7 @@ import copy import json from typing import Any, Dict, Mapping, Optional, Union - +import packaging.version class PipelineRuntimeConfigBuilder(object): """Pipeline RuntimeConfig builder. @@ -30,6 +30,7 @@ def __init__( pipeline_root: str, parameter_types: Mapping[str, str], parameter_values: Optional[Dict[str, Any]] = None, + schema_version: Optional[str] = None, ): """Creates a PipelineRuntimeConfigBuilder object. @@ -40,10 +41,13 @@ def __init__( Required. The mapping from pipeline parameter name to its type. parameter_values (Dict[str, Any]): Optional. The mapping from runtime parameter name to its value. + schema_version (str): + Optional. Schema version of the IR. This field determines the fields supported in current version of IR. """ self._pipeline_root = pipeline_root self._parameter_types = parameter_types self._parameter_values = copy.deepcopy(parameter_values or {}) + self._schema_version = schema_version or '2.0.0' @classmethod def from_job_spec_json( @@ -59,11 +63,10 @@ def from_job_spec_json( A PipelineRuntimeConfigBuilder object. """ runtime_config_spec = job_spec["runtimeConfig"] - parameter_input_definitions = ( - job_spec["pipelineSpec"]["root"] - .get("inputDefinitions", {}) - .get("parameters", {}) - ) + input_definitions = job_spec["pipelineSpec"]["root"].get("inputDefinitions") or {} + parameter_input_definitions = input_definitions.get("parameter_values") or input_definitions.get("parameters") or {} + schema_version = job_spec.get('schemaVersion') + # 'type' is deprecated in IR and change to 'parameterType'. parameter_types = { k: v.get("parameterType") or v.get("type") @@ -72,7 +75,7 @@ def from_job_spec_json( pipeline_root = runtime_config_spec.get("gcsOutputDirectory") parameter_values = _parse_runtime_parameters(runtime_config_spec) - return cls(pipeline_root, parameter_types, parameter_values) + return cls(pipeline_root, parameter_types, parameter_values, schema_version) def update_pipeline_root(self, pipeline_root: Optional[str]) -> None: """Updates pipeline_root value. @@ -111,9 +114,13 @@ def build(self) -> Dict[str, Any]: "Pipeline root must be specified, either during " "compile time, or when calling the service." ) + if packaging.version.parse(self._schema_version) >= packaging.version.parse("2.1.0"): + parameter_values_key = 'parameter_values' + else: + parameter_values_key = 'parameters' return { "gcsOutputDirectory": self._pipeline_root, - "parameters": { + parameter_values_key: { k: self._get_vertex_value(k, v) for k, v in self._parameter_values.items() if v is not None diff --git a/tests/unit/aiplatform/test_pipeline_jobs.py b/tests/unit/aiplatform/test_pipeline_jobs.py index 00e39caf26..55063d7850 100644 --- a/tests/unit/aiplatform/test_pipeline_jobs.py +++ b/tests/unit/aiplatform/test_pipeline_jobs.py @@ -60,6 +60,7 @@ "dag": {"tasks": {}}, "inputDefinitions": {"parameters": {"string_param": {"type": "STRING"}}}, }, + "schema_version": "2.0.0", "components": {}, } _TEST_PIPELINE_SPEC = { @@ -67,30 +68,30 @@ "root": { "dag": {"tasks": {}}, "inputDefinitions": { - "parameters": { + "parameter_values": { "string_param": {"parameterType": "STRING"}, - # uncomment when GAPIC library change for protobufValue is in - # "bool_param": { - # "parameterType": "BOOLEAN" - # }, - # "double_param": { - # "parameterType": "NUMBER_DOUBLE" - # }, - # "int_param": { - # "parameterType": "NUMBER_INTEGER" - # }, - # "list_int_param": { - # "parameterType": "LIST" - # }, - # "list_string_param": { - # "parameterType": "LIST" - # }, - # "struct_param": { - # "parameterType": "STRUCT" - # } + "bool_param": { + "parameterType": "BOOLEAN" + }, + "double_param": { + "parameterType": "NUMBER_DOUBLE" + }, + "int_param": { + "parameterType": "NUMBER_INTEGER" + }, + "list_int_param": { + "parameterType": "LIST" + }, + "list_string_param": { + "parameterType": "LIST" + }, + "struct_param": { + "parameterType": "STRUCT" + } } }, }, + "schema_version":"2.1.0", "components": {}, } @@ -103,13 +104,12 @@ "runtimeConfig": { "parameterValues": { "string_param": "lorem ipsum", - # uncomment when GAPIC library change for protobufValue is in - # "bool_param": True, - # "double_param": 12.34, - # "int_param": 5678, - # "list_int_param": [123, 456, 789], - # "list_string_param": ["lorem", "ipsum"], - # "struct_param": { "key1": 12345, "key2": 67890} + "bool_param": True, + "double_param": 12.34, + "int_param": 5678, + "list_int_param": [123, 456, 789], + "list_string_param": ["lorem", "ipsum"], + "struct_param": { "key1": 12345, "key2": 67890} }, }, "pipelineSpec": _TEST_PIPELINE_SPEC, @@ -291,7 +291,7 @@ def test_run_call_pipeline_service_create( expected_runtime_config_dict = { "gcsOutputDirectory": _TEST_GCS_BUCKET_NAME, - "parameters": {"string_param": {"stringValue": "hello"}}, + "parameter_values": {"string_param": {"stringValue": "hello"}}, } runtime_config = gca_pipeline_job_v1beta1.PipelineJob.RuntimeConfig()._pb json_format.ParseDict(expected_runtime_config_dict, runtime_config) @@ -360,7 +360,7 @@ def test_submit_call_pipeline_service_pipeline_job_create( expected_runtime_config_dict = { "gcs_output_directory": _TEST_GCS_BUCKET_NAME, - "parameters": {"string_param": {"stringValue": "hello"}}, + "parameter_values": {"string_param": {"stringValue": "hello"}}, } runtime_config = gca_pipeline_job_v1beta1.PipelineJob.RuntimeConfig()._pb json_format.ParseDict(expected_runtime_config_dict, runtime_config) From 8f50d38a73c1c599913773623478ef1bb2d132bb Mon Sep 17 00:00:00 2001 From: Yaqi Date: Wed, 3 Nov 2021 15:35:22 -0700 Subject: [PATCH 5/8] fix test --- .../cloud/aiplatform/utils/pipeline_utils.py | 33 ++- tests/unit/aiplatform/test_pipeline_jobs.py | 232 ++++++++++++++---- 2 files changed, 206 insertions(+), 59 deletions(-) diff --git a/google/cloud/aiplatform/utils/pipeline_utils.py b/google/cloud/aiplatform/utils/pipeline_utils.py index 11c4c2b7fa..f4d6a674cf 100644 --- a/google/cloud/aiplatform/utils/pipeline_utils.py +++ b/google/cloud/aiplatform/utils/pipeline_utils.py @@ -19,6 +19,7 @@ from typing import Any, Dict, Mapping, Optional, Union import packaging.version + class PipelineRuntimeConfigBuilder(object): """Pipeline RuntimeConfig builder. @@ -47,7 +48,7 @@ def __init__( self._pipeline_root = pipeline_root self._parameter_types = parameter_types self._parameter_values = copy.deepcopy(parameter_values or {}) - self._schema_version = schema_version or '2.0.0' + self._schema_version = schema_version or "2.0.0" @classmethod def from_job_spec_json( @@ -63,9 +64,15 @@ def from_job_spec_json( A PipelineRuntimeConfigBuilder object. """ runtime_config_spec = job_spec["runtimeConfig"] - input_definitions = job_spec["pipelineSpec"]["root"].get("inputDefinitions") or {} - parameter_input_definitions = input_definitions.get("parameter_values") or input_definitions.get("parameters") or {} - schema_version = job_spec.get('schemaVersion') + input_definitions = ( + job_spec["pipelineSpec"]["root"].get("inputDefinitions") or {} + ) + parameter_input_definitions = ( + input_definitions.get("parameterValues") + or input_definitions.get("parameters") + or {} + ) + schema_version = job_spec["pipelineSpec"].get("schemaVersion") # 'type' is deprecated in IR and change to 'parameterType'. parameter_types = { @@ -98,9 +105,12 @@ def update_runtime_parameters( """ if parameter_values: parameters = dict(parameter_values) - for k, v in parameter_values.items(): - if isinstance(v, (dict, list, bool)): - parameters[k] = json.dumps(v) + if packaging.version.parse(self._schema_version) <= packaging.version.parse( + "2.0.0" + ): + for k, v in parameter_values.items(): + if isinstance(v, (dict, list, bool)): + parameters[k] = json.dumps(v) self._parameter_values.update(parameters) def build(self) -> Dict[str, Any]: @@ -114,10 +124,12 @@ def build(self) -> Dict[str, Any]: "Pipeline root must be specified, either during " "compile time, or when calling the service." ) - if packaging.version.parse(self._schema_version) >= packaging.version.parse("2.1.0"): - parameter_values_key = 'parameter_values' + if packaging.version.parse(self._schema_version) > packaging.version.parse( + "2.0.0" + ): + parameter_values_key = "parameterValues" else: - parameter_values_key = 'parameters' + parameter_values_key = "parameters" return { "gcsOutputDirectory": self._pipeline_root, parameter_values_key: { @@ -173,7 +185,6 @@ def _get_vertex_value( result["structValue"] = value else: raise TypeError("Got unknown type of value: {}".format(value)) - return result diff --git a/tests/unit/aiplatform/test_pipeline_jobs.py b/tests/unit/aiplatform/test_pipeline_jobs.py index 8aa05cbca2..e2ba8c62bb 100644 --- a/tests/unit/aiplatform/test_pipeline_jobs.py +++ b/tests/unit/aiplatform/test_pipeline_jobs.py @@ -53,14 +53,24 @@ _TEST_PIPELINE_JOB_NAME = f"projects/{_TEST_PROJECT}/locations/{_TEST_LOCATION}/pipelineJobs/{_TEST_PIPELINE_JOB_ID}" -_TEST_PIPELINE_PARAMETER_VALUES = {"string_param": "hello"} +_TEST_PIPELINE_PARAMETER_VALUES_LEGACY = {"string_param": "hello"} +_TEST_PIPELINE_PARAMETER_VALUES = { + "string_param": "hello world", + "bool_param": True, + "double_param": 12.34, + "int_param": 5678, + "list_int_param": [123, 456, 789], + "list_string_param": ["lorem", "ipsum"], + "struct_param": {"key1": 12345, "key2": 67890}, +} + _TEST_PIPELINE_SPEC_LEGACY = { "pipelineInfo": {"name": "my-pipeline"}, "root": { "dag": {"tasks": {}}, "inputDefinitions": {"parameters": {"string_param": {"type": "STRING"}}}, }, - "schema_version": "2.0.0", + "schemaVersion": "2.0.0", "components": {}, } _TEST_PIPELINE_SPEC = { @@ -68,30 +78,18 @@ "root": { "dag": {"tasks": {}}, "inputDefinitions": { - "parameter_values": { + "parameterValues": { "string_param": {"parameterType": "STRING"}, - "bool_param": { - "parameterType": "BOOLEAN" - }, - "double_param": { - "parameterType": "NUMBER_DOUBLE" - }, - "int_param": { - "parameterType": "NUMBER_INTEGER" - }, - "list_int_param": { - "parameterType": "LIST" - }, - "list_string_param": { - "parameterType": "LIST" - }, - "struct_param": { - "parameterType": "STRUCT" - } + "bool_param": {"parameterType": "BOOLEAN"}, + "double_param": {"parameterType": "NUMBER_DOUBLE"}, + "int_param": {"parameterType": "NUMBER_INTEGER"}, + "list_int_param": {"parameterType": "LIST"}, + "list_string_param": {"parameterType": "LIST"}, + "struct_param": {"parameterType": "STRUCT"}, } }, }, - "schema_version":"2.1.0", + "schemaVersion": "2.1.0", "components": {}, } @@ -99,7 +97,6 @@ "runtimeConfig": {}, "pipelineSpec": _TEST_PIPELINE_SPEC_LEGACY, } - _TEST_PIPELINE_JOB = { "runtimeConfig": { "parameterValues": { @@ -109,7 +106,7 @@ "int_param": 5678, "list_int_param": [123, 456, 789], "list_string_param": ["lorem", "ipsum"], - "struct_param": { "key1": 12345, "key2": 67890} + "struct_param": {"key1": 12345, "key2": 67890}, }, }, "pipelineSpec": _TEST_PIPELINE_SPEC, @@ -250,13 +247,7 @@ def teardown_method(self): initializer.global_pool.shutdown(wait=True) @pytest.mark.parametrize( - "job_spec_json", - [ - _TEST_PIPELINE_SPEC, - _TEST_PIPELINE_JOB, - _TEST_PIPELINE_SPEC_LEGACY, - _TEST_PIPELINE_JOB_LEGACY, - ], + "job_spec_json", [_TEST_PIPELINE_SPEC, _TEST_PIPELINE_JOB], ) @pytest.mark.parametrize("sync", [True, False]) def test_run_call_pipeline_service_create( @@ -291,7 +282,15 @@ def test_run_call_pipeline_service_create( expected_runtime_config_dict = { "gcsOutputDirectory": _TEST_GCS_BUCKET_NAME, - "parameter_values": {"string_param": {"stringValue": "hello"}}, + "parameterValues": { + "bool_param": {"boolValue": True}, + "double_param": {"numberValue": 12.34}, + "int_param": {"numberValue": 5678}, + "list_int_param": {"listValue": [123, 456, 789]}, + "list_string_param": {"listValue": ["lorem", "ipsum"]}, + "struct_param": {"structValue": {"key1": 12345, "key2": 67890}}, + "string_param": {"stringValue": "hello world"}, + }, } runtime_config = gca_pipeline_job_v1.PipelineJob.RuntimeConfig()._pb json_format.ParseDict(expected_runtime_config_dict, runtime_config) @@ -305,6 +304,7 @@ def test_run_call_pipeline_service_create( "components": {}, "pipelineInfo": pipeline_spec["pipelineInfo"], "root": pipeline_spec["root"], + "schemaVersion": "2.1.0", }, runtime_config=runtime_config, service_account=_TEST_SERVICE_ACCOUNT, @@ -326,13 +326,78 @@ def test_run_call_pipeline_service_create( ) @pytest.mark.parametrize( - "job_spec_json", - [ - _TEST_PIPELINE_SPEC, - _TEST_PIPELINE_JOB, - _TEST_PIPELINE_SPEC_LEGACY, - _TEST_PIPELINE_JOB_LEGACY, - ], + "job_spec_json", [_TEST_PIPELINE_SPEC_LEGACY, _TEST_PIPELINE_JOB_LEGACY], + ) + @pytest.mark.parametrize("sync", [True, False]) + def test_run_call_pipeline_service_create_legacy( + self, + mock_pipeline_service_create, + mock_pipeline_service_get, + job_spec_json, + mock_load_json, + sync, + ): + aiplatform.init( + project=_TEST_PROJECT, + staging_bucket=_TEST_GCS_BUCKET_NAME, + location=_TEST_LOCATION, + credentials=_TEST_CREDENTIALS, + ) + + job = pipeline_jobs.PipelineJob( + display_name=_TEST_PIPELINE_JOB_DISPLAY_NAME, + template_path=_TEST_TEMPLATE_PATH, + job_id=_TEST_PIPELINE_JOB_ID, + parameter_values=_TEST_PIPELINE_PARAMETER_VALUES_LEGACY, + enable_caching=True, + ) + + job.run( + service_account=_TEST_SERVICE_ACCOUNT, network=_TEST_NETWORK, sync=sync, + ) + + if not sync: + job.wait() + + expected_runtime_config_dict = { + "gcsOutputDirectory": _TEST_GCS_BUCKET_NAME, + "parameters": {"string_param": {"stringValue": "hello"}}, + } + runtime_config = gca_pipeline_job_v1.PipelineJob.RuntimeConfig()._pb + json_format.ParseDict(expected_runtime_config_dict, runtime_config) + + pipeline_spec = job_spec_json.get("pipelineSpec") or job_spec_json + + # Construct expected request + expected_gapic_pipeline_job = gca_pipeline_job_v1.PipelineJob( + display_name=_TEST_PIPELINE_JOB_DISPLAY_NAME, + pipeline_spec={ + "components": {}, + "pipelineInfo": pipeline_spec["pipelineInfo"], + "root": pipeline_spec["root"], + "schemaVersion": "2.0.0", + }, + runtime_config=runtime_config, + service_account=_TEST_SERVICE_ACCOUNT, + network=_TEST_NETWORK, + ) + + mock_pipeline_service_create.assert_called_once_with( + parent=_TEST_PARENT, + pipeline_job=expected_gapic_pipeline_job, + pipeline_job_id=_TEST_PIPELINE_JOB_ID, + ) + + mock_pipeline_service_get.assert_called_with( + name=_TEST_PIPELINE_JOB_NAME, retry=base._DEFAULT_RETRY + ) + + assert job._gca_resource == make_pipeline_job( + gca_pipeline_state_v1.PipelineState.PIPELINE_STATE_SUCCEEDED + ) + + @pytest.mark.parametrize( + "job_spec_json", [_TEST_PIPELINE_SPEC, _TEST_PIPELINE_JOB], ) def test_submit_call_pipeline_service_pipeline_job_create( self, @@ -359,8 +424,84 @@ def test_submit_call_pipeline_service_pipeline_job_create( job.submit(service_account=_TEST_SERVICE_ACCOUNT, network=_TEST_NETWORK) expected_runtime_config_dict = { - "gcs_output_directory": _TEST_GCS_BUCKET_NAME, - "parameter_values": {"string_param": {"stringValue": "hello"}}, + "gcsOutputDirectory": _TEST_GCS_BUCKET_NAME, + "parameterValues": { + "bool_param": {"boolValue": True}, + "double_param": {"numberValue": 12.34}, + "int_param": {"numberValue": 5678}, + "list_int_param": {"listValue": [123, 456, 789]}, + "list_string_param": {"listValue": ["lorem", "ipsum"]}, + "struct_param": {"structValue": {"key1": 12345, "key2": 67890}}, + "string_param": {"stringValue": "hello world"}, + }, + } + runtime_config = gca_pipeline_job_v1.PipelineJob.RuntimeConfig()._pb + json_format.ParseDict(expected_runtime_config_dict, runtime_config) + + pipeline_spec = job_spec_json.get("pipelineSpec") or job_spec_json + + # Construct expected request + expected_gapic_pipeline_job = gca_pipeline_job_v1.PipelineJob( + display_name=_TEST_PIPELINE_JOB_DISPLAY_NAME, + pipeline_spec={ + "components": {}, + "pipelineInfo": pipeline_spec["pipelineInfo"], + "root": pipeline_spec["root"], + "schemaVersion": "2.1.0", + }, + runtime_config=runtime_config, + service_account=_TEST_SERVICE_ACCOUNT, + network=_TEST_NETWORK, + ) + + mock_pipeline_service_create.assert_called_once_with( + parent=_TEST_PARENT, + pipeline_job=expected_gapic_pipeline_job, + pipeline_job_id=_TEST_PIPELINE_JOB_ID, + ) + + assert not mock_pipeline_service_get.called + + job.wait() + + mock_pipeline_service_get.assert_called_with( + name=_TEST_PIPELINE_JOB_NAME, retry=base._DEFAULT_RETRY + ) + + assert job._gca_resource == make_pipeline_job( + gca_pipeline_state_v1.PipelineState.PIPELINE_STATE_SUCCEEDED + ) + + @pytest.mark.parametrize( + "job_spec_json", [_TEST_PIPELINE_SPEC_LEGACY, _TEST_PIPELINE_JOB_LEGACY], + ) + def test_submit_call_pipeline_service_pipeline_job_create_legacy( + self, + mock_pipeline_service_create, + mock_pipeline_service_get, + job_spec_json, + mock_load_json, + ): + aiplatform.init( + project=_TEST_PROJECT, + staging_bucket=_TEST_GCS_BUCKET_NAME, + location=_TEST_LOCATION, + credentials=_TEST_CREDENTIALS, + ) + + job = pipeline_jobs.PipelineJob( + display_name=_TEST_PIPELINE_JOB_DISPLAY_NAME, + template_path=_TEST_TEMPLATE_PATH, + job_id=_TEST_PIPELINE_JOB_ID, + parameter_values=_TEST_PIPELINE_PARAMETER_VALUES_LEGACY, + enable_caching=True, + ) + + job.submit(service_account=_TEST_SERVICE_ACCOUNT, network=_TEST_NETWORK) + + expected_runtime_config_dict = { + "parameters": {"string_param": {"stringValue": "hello"}}, + "gcsOutputDirectory": _TEST_GCS_BUCKET_NAME, } runtime_config = gca_pipeline_job_v1.PipelineJob.RuntimeConfig()._pb json_format.ParseDict(expected_runtime_config_dict, runtime_config) @@ -374,6 +515,7 @@ def test_submit_call_pipeline_service_pipeline_job_create( "components": {}, "pipelineInfo": pipeline_spec["pipelineInfo"], "root": pipeline_spec["root"], + "schemaVersion": "2.0.0", }, runtime_config=runtime_config, service_account=_TEST_SERVICE_ACCOUNT, @@ -508,13 +650,7 @@ def test_cancel_pipeline_job_without_running( "mock_pipeline_service_create", "mock_pipeline_service_get_with_fail", ) @pytest.mark.parametrize( - "job_spec_json", - [ - _TEST_PIPELINE_SPEC, - _TEST_PIPELINE_JOB, - _TEST_PIPELINE_SPEC_LEGACY, - _TEST_PIPELINE_JOB_LEGACY, - ], + "job_spec_json", [_TEST_PIPELINE_SPEC, _TEST_PIPELINE_JOB], ) @pytest.mark.parametrize("sync", [True, False]) def test_pipeline_failure_raises(self, mock_load_json, sync): From 3581bf189b62deef42599b3156a856350717fae2 Mon Sep 17 00:00:00 2001 From: Yaqi Date: Thu, 4 Nov 2021 11:37:48 -0700 Subject: [PATCH 6/8] fix key to parameters --- .../cloud/aiplatform/utils/pipeline_utils.py | 56 ++++++++----------- tests/unit/aiplatform/test_pipeline_jobs.py | 34 ++--------- 2 files changed, 28 insertions(+), 62 deletions(-) diff --git a/google/cloud/aiplatform/utils/pipeline_utils.py b/google/cloud/aiplatform/utils/pipeline_utils.py index f4d6a674cf..506dbdff26 100644 --- a/google/cloud/aiplatform/utils/pipeline_utils.py +++ b/google/cloud/aiplatform/utils/pipeline_utils.py @@ -29,26 +29,26 @@ class PipelineRuntimeConfigBuilder(object): def __init__( self, pipeline_root: str, + schema_version: str, parameter_types: Mapping[str, str], parameter_values: Optional[Dict[str, Any]] = None, - schema_version: Optional[str] = None, ): """Creates a PipelineRuntimeConfigBuilder object. Args: pipeline_root (str): Required. The root of the pipeline outputs. + schema_version (str): + Required. Schema version of the IR. This field determines the fields supported in current version of IR. parameter_types (Mapping[str, str]): Required. The mapping from pipeline parameter name to its type. parameter_values (Dict[str, Any]): Optional. The mapping from runtime parameter name to its value. - schema_version (str): - Optional. Schema version of the IR. This field determines the fields supported in current version of IR. """ self._pipeline_root = pipeline_root + self._schema_version = schema_version self._parameter_types = parameter_types self._parameter_values = copy.deepcopy(parameter_values or {}) - self._schema_version = schema_version or "2.0.0" @classmethod def from_job_spec_json( @@ -64,15 +64,12 @@ def from_job_spec_json( A PipelineRuntimeConfigBuilder object. """ runtime_config_spec = job_spec["runtimeConfig"] - input_definitions = ( - job_spec["pipelineSpec"]["root"].get("inputDefinitions") or {} - ) parameter_input_definitions = ( - input_definitions.get("parameterValues") - or input_definitions.get("parameters") - or {} + job_spec["pipelineSpec"]["root"] + .get("inputDefinitions", {}) + .get("parameters", {}) ) - schema_version = job_spec["pipelineSpec"].get("schemaVersion") + schema_version = job_spec["pipelineSpec"]["schemaVersion"] # 'type' is deprecated in IR and change to 'parameterType'. parameter_types = { @@ -82,7 +79,7 @@ def from_job_spec_json( pipeline_root = runtime_config_spec.get("gcsOutputDirectory") parameter_values = _parse_runtime_parameters(runtime_config_spec) - return cls(pipeline_root, parameter_types, parameter_values, schema_version) + return cls(pipeline_root, schema_version, parameter_types, parameter_values) def update_pipeline_root(self, pipeline_root: Optional[str]) -> None: """Updates pipeline_root value. @@ -141,7 +138,7 @@ def build(self) -> Dict[str, Any]: def _get_vertex_value( self, name: str, value: Union[int, float, str, bool, list, dict] - ) -> Dict[str, Any]: + ) -> Union[Dict[str, Any], int, float, str, bool, list, dict]: """Converts primitive values into Vertex pipeline Value proto message. Args: @@ -166,26 +163,21 @@ def _get_vertex_value( "pipeline job input definitions.".format(name) ) - result = {} - if self._parameter_types[name] == "INT": - result["intValue"] = value - elif self._parameter_types[name] == "DOUBLE": - result["doubleValue"] = value - elif self._parameter_types[name] == "STRING": - result["stringValue"] = value - elif self._parameter_types[name] == "BOOLEAN": - result["boolValue"] = value - elif self._parameter_types[name] == "NUMBER_DOUBLE": - result["numberValue"] = value - elif self._parameter_types[name] == "NUMBER_INTEGER": - result["numberValue"] = value - elif self._parameter_types[name] == "LIST": - result["listValue"] = value - elif self._parameter_types[name] == "STRUCT": - result["structValue"] = value + if packaging.version.parse(self._schema_version) <= packaging.version.parse( + "2.0.0" + ): + result = {} + if self._parameter_types[name] == "INT": + result["intValue"] = value + elif self._parameter_types[name] == "DOUBLE": + result["doubleValue"] = value + elif self._parameter_types[name] == "STRING": + result["stringValue"] = value + else: + raise TypeError("Got unknown type of value: {}".format(value)) + return result else: - raise TypeError("Got unknown type of value: {}".format(value)) - return result + return value def _parse_runtime_parameters( diff --git a/tests/unit/aiplatform/test_pipeline_jobs.py b/tests/unit/aiplatform/test_pipeline_jobs.py index e2ba8c62bb..81f14a7ead 100644 --- a/tests/unit/aiplatform/test_pipeline_jobs.py +++ b/tests/unit/aiplatform/test_pipeline_jobs.py @@ -78,7 +78,7 @@ "root": { "dag": {"tasks": {}}, "inputDefinitions": { - "parameterValues": { + "parameters": { "string_param": {"parameterType": "STRING"}, "bool_param": {"parameterType": "BOOLEAN"}, "double_param": {"parameterType": "NUMBER_DOUBLE"}, @@ -98,17 +98,7 @@ "pipelineSpec": _TEST_PIPELINE_SPEC_LEGACY, } _TEST_PIPELINE_JOB = { - "runtimeConfig": { - "parameterValues": { - "string_param": "lorem ipsum", - "bool_param": True, - "double_param": 12.34, - "int_param": 5678, - "list_int_param": [123, 456, 789], - "list_string_param": ["lorem", "ipsum"], - "struct_param": {"key1": 12345, "key2": 67890}, - }, - }, + "runtimeConfig": {"parameterValues": _TEST_PIPELINE_PARAMETER_VALUES}, "pipelineSpec": _TEST_PIPELINE_SPEC, } @@ -282,15 +272,7 @@ def test_run_call_pipeline_service_create( expected_runtime_config_dict = { "gcsOutputDirectory": _TEST_GCS_BUCKET_NAME, - "parameterValues": { - "bool_param": {"boolValue": True}, - "double_param": {"numberValue": 12.34}, - "int_param": {"numberValue": 5678}, - "list_int_param": {"listValue": [123, 456, 789]}, - "list_string_param": {"listValue": ["lorem", "ipsum"]}, - "struct_param": {"structValue": {"key1": 12345, "key2": 67890}}, - "string_param": {"stringValue": "hello world"}, - }, + "parameterValues": _TEST_PIPELINE_PARAMETER_VALUES, } runtime_config = gca_pipeline_job_v1.PipelineJob.RuntimeConfig()._pb json_format.ParseDict(expected_runtime_config_dict, runtime_config) @@ -425,15 +407,7 @@ def test_submit_call_pipeline_service_pipeline_job_create( expected_runtime_config_dict = { "gcsOutputDirectory": _TEST_GCS_BUCKET_NAME, - "parameterValues": { - "bool_param": {"boolValue": True}, - "double_param": {"numberValue": 12.34}, - "int_param": {"numberValue": 5678}, - "list_int_param": {"listValue": [123, 456, 789]}, - "list_string_param": {"listValue": ["lorem", "ipsum"]}, - "struct_param": {"structValue": {"key1": 12345, "key2": 67890}}, - "string_param": {"stringValue": "hello world"}, - }, + "parameterValues": _TEST_PIPELINE_PARAMETER_VALUES, } runtime_config = gca_pipeline_job_v1.PipelineJob.RuntimeConfig()._pb json_format.ParseDict(expected_runtime_config_dict, runtime_config) From a88a618b054623a615853b877f0e0d9577d40cc4 Mon Sep 17 00:00:00 2001 From: Yaqi Date: Mon, 8 Nov 2021 12:30:34 -0800 Subject: [PATCH 7/8] fix format' --- tests/unit/aiplatform/test_utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/unit/aiplatform/test_utils.py b/tests/unit/aiplatform/test_utils.py index cb4976f07d..928b01a889 100644 --- a/tests/unit/aiplatform/test_utils.py +++ b/tests/unit/aiplatform/test_utils.py @@ -376,7 +376,7 @@ class TestPipelineUtils: } } }, - "schemaVersion":"2.0.0", + "schemaVersion": "2.0.0", }, "runtimeConfig": { "gcsOutputDirectory": "path/to/my/root", From df9c883734169920f15ec80efcd966846bfcef89 Mon Sep 17 00:00:00 2001 From: Yaqi Date: Mon, 15 Nov 2021 11:29:43 -0800 Subject: [PATCH 8/8] address comments --- google/cloud/aiplatform/utils/pipeline_utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/google/cloud/aiplatform/utils/pipeline_utils.py b/google/cloud/aiplatform/utils/pipeline_utils.py index 506dbdff26..5a09a98f18 100644 --- a/google/cloud/aiplatform/utils/pipeline_utils.py +++ b/google/cloud/aiplatform/utils/pipeline_utils.py @@ -138,7 +138,7 @@ def build(self) -> Dict[str, Any]: def _get_vertex_value( self, name: str, value: Union[int, float, str, bool, list, dict] - ) -> Union[Dict[str, Any], int, float, str, bool, list, dict]: + ) -> Union[int, float, str, bool, list, dict]: """Converts primitive values into Vertex pipeline Value proto message. Args: