diff --git a/google/cloud/aiplatform/utils/pipeline_utils.py b/google/cloud/aiplatform/utils/pipeline_utils.py
index a840150527..5a09a98f18 100644
--- a/google/cloud/aiplatform/utils/pipeline_utils.py
+++ b/google/cloud/aiplatform/utils/pipeline_utils.py
@@ -17,6 +17,7 @@
 import copy
 import json
 from typing import Any, Dict, Mapping, Optional, Union
+import packaging.version
 
 
 class PipelineRuntimeConfigBuilder(object):
@@ -28,6 +29,7 @@ class PipelineRuntimeConfigBuilder(object):
     def __init__(
         self,
         pipeline_root: str,
+        schema_version: str,
         parameter_types: Mapping[str, str],
         parameter_values: Optional[Dict[str, Any]] = None,
     ):
@@ -36,12 +38,15 @@ def __init__(
         Args:
             pipeline_root (str):
                 Required. The root of the pipeline outputs.
+            schema_version (str):
+                Required. Schema version of the IR. This field determines the fields supported in the current version of the IR.
             parameter_types (Mapping[str, str]):
                 Required. The mapping from pipeline parameter name to its type.
             parameter_values (Dict[str, Any]):
                 Optional. The mapping from runtime parameter name to its value.
         """
         self._pipeline_root = pipeline_root
+        self._schema_version = schema_version
         self._parameter_types = parameter_types
         self._parameter_values = copy.deepcopy(parameter_values or {})
@@ -64,6 +69,8 @@ def from_job_spec_json(
             .get("inputDefinitions", {})
             .get("parameters", {})
         )
+        schema_version = job_spec["pipelineSpec"]["schemaVersion"]
+
         # 'type' is deprecated in IR and change to 'parameterType'.
         parameter_types = {
             k: v.get("parameterType") or v.get("type")
@@ -72,7 +79,7 @@ def from_job_spec_json(
         pipeline_root = runtime_config_spec.get("gcsOutputDirectory")
         parameter_values = _parse_runtime_parameters(runtime_config_spec)
 
-        return cls(pipeline_root, parameter_types, parameter_values)
+        return cls(pipeline_root, schema_version, parameter_types, parameter_values)
 
     def update_pipeline_root(self, pipeline_root: Optional[str]) -> None:
         """Updates pipeline_root value.
@@ -95,9 +102,12 @@ def update_runtime_parameters(
         """
         if parameter_values:
             parameters = dict(parameter_values)
-            for k, v in parameter_values.items():
-                if isinstance(v, (dict, list, bool)):
-                    parameters[k] = json.dumps(v)
+            if packaging.version.parse(self._schema_version) <= packaging.version.parse(
+                "2.0.0"
+            ):
+                for k, v in parameter_values.items():
+                    if isinstance(v, (dict, list, bool)):
+                        parameters[k] = json.dumps(v)
             self._parameter_values.update(parameters)
 
     def build(self) -> Dict[str, Any]:
@@ -111,9 +121,15 @@ def build(self) -> Dict[str, Any]:
                 "Pipeline root must be specified, either during "
                 "compile time, or when calling the service."
             )
+        if packaging.version.parse(self._schema_version) > packaging.version.parse(
+            "2.0.0"
+        ):
+            parameter_values_key = "parameterValues"
+        else:
+            parameter_values_key = "parameters"
         return {
             "gcsOutputDirectory": self._pipeline_root,
-            "parameters": {
+            parameter_values_key: {
                 k: self._get_vertex_value(k, v)
                 for k, v in self._parameter_values.items()
                 if v is not None
@@ -122,7 +138,7 @@ def build(self) -> Dict[str, Any]:
 
     def _get_vertex_value(
         self, name: str, value: Union[int, float, str, bool, list, dict]
-    ) -> Dict[str, Any]:
+    ) -> Union[int, float, str, bool, list, dict]:
        """Converts primitive values into Vertex pipeline Value proto message.
 
        Args:
@@ -147,27 +163,21 @@ def _get_vertex_value(
                 "pipeline job input definitions.".format(name)
             )
 
-        result = {}
-        if self._parameter_types[name] == "INT":
-            result["intValue"] = value
-        elif self._parameter_types[name] == "DOUBLE":
-            result["doubleValue"] = value
-        elif self._parameter_types[name] == "STRING":
-            result["stringValue"] = value
-        elif self._parameter_types[name] == "BOOLEAN":
-            result["boolValue"] = value
-        elif self._parameter_types[name] == "NUMBER_DOUBLE":
-            result["numberValue"] = value
-        elif self._parameter_types[name] == "NUMBER_INTEGER":
-            result["numberValue"] = value
-        elif self._parameter_types[name] == "LIST":
-            result["listValue"] = value
-        elif self._parameter_types[name] == "STRUCT":
-            result["structValue"] = value
+        if packaging.version.parse(self._schema_version) <= packaging.version.parse(
+            "2.0.0"
+        ):
+            result = {}
+            if self._parameter_types[name] == "INT":
+                result["intValue"] = value
+            elif self._parameter_types[name] == "DOUBLE":
+                result["doubleValue"] = value
+            elif self._parameter_types[name] == "STRING":
+                result["stringValue"] = value
+            else:
+                raise TypeError("Got unknown type of value: {}".format(value))
+            return result
         else:
-            raise TypeError("Got unknown type of value: {}".format(value))
-
-        return result
+            return value
 
 
 def _parse_runtime_parameters(
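For review context, here is a minimal sketch of how the updated builder behaves on each side of the 2.0.0 schema cutoff. The bucket path and parameter names are illustrative stand-ins, not fixtures from this PR:

```python
from google.cloud.aiplatform.utils import pipeline_utils

# Schema > 2.0.0: values pass through unchanged under "parameterValues".
builder = pipeline_utils.PipelineRuntimeConfigBuilder(
    pipeline_root="gs://my-bucket/pipeline_root",
    schema_version="2.1.0",
    parameter_types={"struct_param": "STRUCT"},
    parameter_values={"struct_param": {"key1": 12345}},
)
print(builder.build())
# {'gcsOutputDirectory': 'gs://my-bucket/pipeline_root',
#  'parameterValues': {'struct_param': {'key1': 12345}}}

# Schema <= 2.0.0: values are wrapped in typed Value dicts under "parameters".
legacy_builder = pipeline_utils.PipelineRuntimeConfigBuilder(
    pipeline_root="gs://my-bucket/pipeline_root",
    schema_version="2.0.0",
    parameter_types={"string_param": "STRING"},
    parameter_values={"string_param": "hello"},
)
print(legacy_builder.build())
# {'gcsOutputDirectory': 'gs://my-bucket/pipeline_root',
#  'parameters': {'string_param': {'stringValue': 'hello'}}}
```

The cutoff mirrors the two runtime-config shapes the change supports: schema 2.0.0 jobs still expect typed Value messages under "parameters", while schema 2.1.0+ jobs take plain JSON-compatible values under "parameterValues".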
"int_param": 5678, - # "list_int_param": [123, 456, 789], - # "list_string_param": ["lorem", "ipsum"], - # "struct_param": { "key1": 12345, "key2": 67890} - }, - }, + "runtimeConfig": {"parameterValues": _TEST_PIPELINE_PARAMETER_VALUES}, "pipelineSpec": _TEST_PIPELINE_SPEC, } @@ -250,13 +237,7 @@ def teardown_method(self): initializer.global_pool.shutdown(wait=True) @pytest.mark.parametrize( - "job_spec_json", - [ - _TEST_PIPELINE_SPEC, - _TEST_PIPELINE_JOB, - _TEST_PIPELINE_SPEC_LEGACY, - _TEST_PIPELINE_JOB_LEGACY, - ], + "job_spec_json", [_TEST_PIPELINE_SPEC, _TEST_PIPELINE_JOB], ) @pytest.mark.parametrize("sync", [True, False]) def test_run_call_pipeline_service_create( @@ -286,6 +267,77 @@ def test_run_call_pipeline_service_create( service_account=_TEST_SERVICE_ACCOUNT, network=_TEST_NETWORK, sync=sync, ) + if not sync: + job.wait() + + expected_runtime_config_dict = { + "gcsOutputDirectory": _TEST_GCS_BUCKET_NAME, + "parameterValues": _TEST_PIPELINE_PARAMETER_VALUES, + } + runtime_config = gca_pipeline_job_v1.PipelineJob.RuntimeConfig()._pb + json_format.ParseDict(expected_runtime_config_dict, runtime_config) + + pipeline_spec = job_spec_json.get("pipelineSpec") or job_spec_json + + # Construct expected request + expected_gapic_pipeline_job = gca_pipeline_job_v1.PipelineJob( + display_name=_TEST_PIPELINE_JOB_DISPLAY_NAME, + pipeline_spec={ + "components": {}, + "pipelineInfo": pipeline_spec["pipelineInfo"], + "root": pipeline_spec["root"], + "schemaVersion": "2.1.0", + }, + runtime_config=runtime_config, + service_account=_TEST_SERVICE_ACCOUNT, + network=_TEST_NETWORK, + ) + + mock_pipeline_service_create.assert_called_once_with( + parent=_TEST_PARENT, + pipeline_job=expected_gapic_pipeline_job, + pipeline_job_id=_TEST_PIPELINE_JOB_ID, + ) + + mock_pipeline_service_get.assert_called_with( + name=_TEST_PIPELINE_JOB_NAME, retry=base._DEFAULT_RETRY + ) + + assert job._gca_resource == make_pipeline_job( + gca_pipeline_state_v1.PipelineState.PIPELINE_STATE_SUCCEEDED + ) + + @pytest.mark.parametrize( + "job_spec_json", [_TEST_PIPELINE_SPEC_LEGACY, _TEST_PIPELINE_JOB_LEGACY], + ) + @pytest.mark.parametrize("sync", [True, False]) + def test_run_call_pipeline_service_create_legacy( + self, + mock_pipeline_service_create, + mock_pipeline_service_get, + job_spec_json, + mock_load_json, + sync, + ): + aiplatform.init( + project=_TEST_PROJECT, + staging_bucket=_TEST_GCS_BUCKET_NAME, + location=_TEST_LOCATION, + credentials=_TEST_CREDENTIALS, + ) + + job = pipeline_jobs.PipelineJob( + display_name=_TEST_PIPELINE_JOB_DISPLAY_NAME, + template_path=_TEST_TEMPLATE_PATH, + job_id=_TEST_PIPELINE_JOB_ID, + parameter_values=_TEST_PIPELINE_PARAMETER_VALUES_LEGACY, + enable_caching=True, + ) + + job.run( + service_account=_TEST_SERVICE_ACCOUNT, network=_TEST_NETWORK, sync=sync, + ) + if not sync: job.wait() @@ -305,6 +357,7 @@ def test_run_call_pipeline_service_create( "components": {}, "pipelineInfo": pipeline_spec["pipelineInfo"], "root": pipeline_spec["root"], + "schemaVersion": "2.0.0", }, runtime_config=runtime_config, service_account=_TEST_SERVICE_ACCOUNT, @@ -326,13 +379,7 @@ def test_run_call_pipeline_service_create( ) @pytest.mark.parametrize( - "job_spec_json", - [ - _TEST_PIPELINE_SPEC, - _TEST_PIPELINE_JOB, - _TEST_PIPELINE_SPEC_LEGACY, - _TEST_PIPELINE_JOB_LEGACY, - ], + "job_spec_json", [_TEST_PIPELINE_SPEC, _TEST_PIPELINE_JOB], ) def test_submit_call_pipeline_service_pipeline_job_create( self, @@ -359,8 +406,76 @@ def test_submit_call_pipeline_service_pipeline_job_create( 
job.submit(service_account=_TEST_SERVICE_ACCOUNT, network=_TEST_NETWORK) expected_runtime_config_dict = { - "gcs_output_directory": _TEST_GCS_BUCKET_NAME, + "gcsOutputDirectory": _TEST_GCS_BUCKET_NAME, + "parameterValues": _TEST_PIPELINE_PARAMETER_VALUES, + } + runtime_config = gca_pipeline_job_v1.PipelineJob.RuntimeConfig()._pb + json_format.ParseDict(expected_runtime_config_dict, runtime_config) + + pipeline_spec = job_spec_json.get("pipelineSpec") or job_spec_json + + # Construct expected request + expected_gapic_pipeline_job = gca_pipeline_job_v1.PipelineJob( + display_name=_TEST_PIPELINE_JOB_DISPLAY_NAME, + pipeline_spec={ + "components": {}, + "pipelineInfo": pipeline_spec["pipelineInfo"], + "root": pipeline_spec["root"], + "schemaVersion": "2.1.0", + }, + runtime_config=runtime_config, + service_account=_TEST_SERVICE_ACCOUNT, + network=_TEST_NETWORK, + ) + + mock_pipeline_service_create.assert_called_once_with( + parent=_TEST_PARENT, + pipeline_job=expected_gapic_pipeline_job, + pipeline_job_id=_TEST_PIPELINE_JOB_ID, + ) + + assert not mock_pipeline_service_get.called + + job.wait() + + mock_pipeline_service_get.assert_called_with( + name=_TEST_PIPELINE_JOB_NAME, retry=base._DEFAULT_RETRY + ) + + assert job._gca_resource == make_pipeline_job( + gca_pipeline_state_v1.PipelineState.PIPELINE_STATE_SUCCEEDED + ) + + @pytest.mark.parametrize( + "job_spec_json", [_TEST_PIPELINE_SPEC_LEGACY, _TEST_PIPELINE_JOB_LEGACY], + ) + def test_submit_call_pipeline_service_pipeline_job_create_legacy( + self, + mock_pipeline_service_create, + mock_pipeline_service_get, + job_spec_json, + mock_load_json, + ): + aiplatform.init( + project=_TEST_PROJECT, + staging_bucket=_TEST_GCS_BUCKET_NAME, + location=_TEST_LOCATION, + credentials=_TEST_CREDENTIALS, + ) + + job = pipeline_jobs.PipelineJob( + display_name=_TEST_PIPELINE_JOB_DISPLAY_NAME, + template_path=_TEST_TEMPLATE_PATH, + job_id=_TEST_PIPELINE_JOB_ID, + parameter_values=_TEST_PIPELINE_PARAMETER_VALUES_LEGACY, + enable_caching=True, + ) + + job.submit(service_account=_TEST_SERVICE_ACCOUNT, network=_TEST_NETWORK) + + expected_runtime_config_dict = { "parameters": {"string_param": {"stringValue": "hello"}}, + "gcsOutputDirectory": _TEST_GCS_BUCKET_NAME, } runtime_config = gca_pipeline_job_v1.PipelineJob.RuntimeConfig()._pb json_format.ParseDict(expected_runtime_config_dict, runtime_config) @@ -374,6 +489,7 @@ def test_submit_call_pipeline_service_pipeline_job_create( "components": {}, "pipelineInfo": pipeline_spec["pipelineInfo"], "root": pipeline_spec["root"], + "schemaVersion": "2.0.0", }, runtime_config=runtime_config, service_account=_TEST_SERVICE_ACCOUNT, @@ -508,13 +624,7 @@ def test_cancel_pipeline_job_without_running( "mock_pipeline_service_create", "mock_pipeline_service_get_with_fail", ) @pytest.mark.parametrize( - "job_spec_json", - [ - _TEST_PIPELINE_SPEC, - _TEST_PIPELINE_JOB, - _TEST_PIPELINE_SPEC_LEGACY, - _TEST_PIPELINE_JOB_LEGACY, - ], + "job_spec_json", [_TEST_PIPELINE_SPEC, _TEST_PIPELINE_JOB], ) @pytest.mark.parametrize("sync", [True, False]) def test_pipeline_failure_raises(self, mock_load_json, sync): diff --git a/tests/unit/aiplatform/test_utils.py b/tests/unit/aiplatform/test_utils.py index 34bba22819..928b01a889 100644 --- a/tests/unit/aiplatform/test_utils.py +++ b/tests/unit/aiplatform/test_utils.py @@ -375,7 +375,8 @@ class TestPipelineUtils: "list_param": {"type": "STRING"}, } } - } + }, + "schemaVersion": "2.0.0", }, "runtimeConfig": { "gcsOutputDirectory": "path/to/my/root", @@ -390,6 +391,7 @@ class TestPipelineUtils: 
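The expected runtime configs in these tests rely on json_format.ParseDict populating RuntimeConfig.parameter_values (a map of google.protobuf.Value) directly from plain JSON values once schemaVersion is above 2.0.0. A standalone sketch of that round trip, assuming the standard v1 GAPIC module path (the test file's actual imports are not shown in this diff):

```python
from google.protobuf import json_format
from google.cloud.aiplatform_v1.types import pipeline_job as gca_pipeline_job_v1

expected_runtime_config_dict = {
    "gcsOutputDirectory": "gs://my-bucket/pipeline_root",  # hypothetical path
    # With schemaVersion > 2.0.0, bools, lists, and structs parse as-is.
    "parameterValues": {"bool_param": True, "list_int_param": [123, 456, 789]},
}
# ._pb exposes the raw protobuf message underneath the proto-plus wrapper,
# which is what json_format.ParseDict expects.
runtime_config = gca_pipeline_job_v1.PipelineJob.RuntimeConfig()._pb
json_format.ParseDict(expected_runtime_config_dict, runtime_config)
print(runtime_config.parameter_values["bool_param"])  # bool_value: true
```

On the legacy path (schema 2.0.0), the same dict must instead wrap each value in a typed Value message under "parameters", e.g. {"string_param": {"stringValue": "hello"}}, which is exactly what the legacy tests above assert.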
diff --git a/tests/unit/aiplatform/test_utils.py b/tests/unit/aiplatform/test_utils.py
index 34bba22819..928b01a889 100644
--- a/tests/unit/aiplatform/test_utils.py
+++ b/tests/unit/aiplatform/test_utils.py
@@ -375,7 +375,8 @@ class TestPipelineUtils:
                         "list_param": {"type": "STRING"},
                     }
                 }
-            }
+            },
+            "schemaVersion": "2.0.0",
         },
         "runtimeConfig": {
             "gcsOutputDirectory": "path/to/my/root",
@@ -390,6 +391,7 @@ class TestPipelineUtils:
     def test_pipeline_utils_runtime_config_builder_from_values(self):
         my_builder = pipeline_utils.PipelineRuntimeConfigBuilder(
             pipeline_root="path/to/my/root",
+            schema_version="2.0.0",
             parameter_types={
                 "string_param": "STRING",
                 "int_param": "INT",
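Taken together with the fixture change above, from_job_spec_json now threads schemaVersion from the compiled spec into the builder. A rough end-to-end sketch under the new schema; this job spec is a hypothetical minimal example, not the fixture in this test file:

```python
from google.cloud.aiplatform.utils import pipeline_utils

job_spec = {
    "pipelineSpec": {
        "schemaVersion": "2.1.0",
        "root": {
            "inputDefinitions": {
                "parameters": {"string_param": {"parameterType": "STRING"}}
            }
        },
    },
    "runtimeConfig": {"gcsOutputDirectory": "gs://my-bucket/pipeline_root"},
}

# schemaVersion is read from pipelineSpec, so callers never pass it by hand.
my_builder = pipeline_utils.PipelineRuntimeConfigBuilder.from_job_spec_json(job_spec)
my_builder.update_runtime_parameters({"string_param": "hello world"})
print(my_builder.build())
# {'gcsOutputDirectory': 'gs://my-bucket/pipeline_root',
#  'parameterValues': {'string_param': 'hello world'}}
```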