From 2fc05cab03a2c7f8462b234b02d43bc7581ba845 Mon Sep 17 00:00:00 2001
From: Yaqi Ji
Date: Thu, 28 Oct 2021 16:15:18 -0700
Subject: [PATCH] feat: support new protobuf value param types for Pipeline
 Job client (#797)

* feat: update PipelineJob to accept protobuf value

* fix tests

* address comments
---
 .../cloud/aiplatform/utils/pipeline_utils.py |  54 ++--
 tests/unit/aiplatform/test_pipeline_jobs.py  | 251 ++++++++++--------
 tests/unit/aiplatform/test_utils.py          |   4 +-
 3 files changed, 176 insertions(+), 133 deletions(-)

diff --git a/google/cloud/aiplatform/utils/pipeline_utils.py b/google/cloud/aiplatform/utils/pipeline_utils.py
index bc531d2b12..a840150527 100644
--- a/google/cloud/aiplatform/utils/pipeline_utils.py
+++ b/google/cloud/aiplatform/utils/pipeline_utils.py
@@ -64,9 +64,13 @@ def from_job_spec_json(
             .get("inputDefinitions", {})
             .get("parameters", {})
         )
-        parameter_types = {k: v["type"] for k, v in parameter_input_definitions.items()}
+        # 'type' is deprecated in IR and changed to 'parameterType'.
+        parameter_types = {
+            k: v.get("parameterType") or v.get("type")
+            for k, v in parameter_input_definitions.items()
+        }
 
-        pipeline_root = runtime_config_spec.get("gcs_output_directory")
+        pipeline_root = runtime_config_spec.get("gcsOutputDirectory")
         parameter_values = _parse_runtime_parameters(runtime_config_spec)
         return cls(pipeline_root, parameter_types, parameter_values)
 
@@ -108,7 +112,7 @@ def build(self) -> Dict[str, Any]:
                 "compile time, or when calling the service."
             )
         return {
-            "gcs_output_directory": self._pipeline_root,
+            "gcsOutputDirectory": self._pipeline_root,
             "parameters": {
                 k: self._get_vertex_value(k, v)
                 for k, v in self._parameter_values.items()
@@ -117,14 +121,14 @@ def build(self) -> Dict[str, Any]:
             },
         }
 
     def _get_vertex_value(
-        self, name: str, value: Union[int, float, str]
+        self, name: str, value: Union[int, float, str, bool, list, dict]
     ) -> Dict[str, Any]:
         """Converts primitive values into Vertex pipeline Value proto message.
 
         Args:
             name (str):
                 Required. The name of the pipeline parameter.
-            value (Union[int, float, str]):
+            value (Union[int, float, str, bool, list, dict]):
                 Required. The value of the pipeline parameter.
 
         Returns:
@@ -150,6 +154,16 @@ def _get_vertex_value(
             result["doubleValue"] = value
         elif self._parameter_types[name] == "STRING":
             result["stringValue"] = value
+        elif self._parameter_types[name] == "BOOLEAN":
+            result["boolValue"] = value
+        elif self._parameter_types[name] == "NUMBER_DOUBLE":
+            result["numberValue"] = value
+        elif self._parameter_types[name] == "NUMBER_INTEGER":
+            result["numberValue"] = value
+        elif self._parameter_types[name] == "LIST":
+            result["listValue"] = value
+        elif self._parameter_types[name] == "STRUCT":
+            result["structValue"] = value
         else:
             raise TypeError("Got unknown type of value: {}".format(value))
 
@@ -164,19 +178,19 @@ def _parse_runtime_parameters(
     Raises:
         TypeError: if the parameter type is not one of 'INT', 'DOUBLE',
         'STRING'.
     """
-    runtime_parameters = runtime_config_spec.get("parameters")
-    if not runtime_parameters:
-        return None
-
-    result = {}
-    for name, value in runtime_parameters.items():
-        if "intValue" in value:
-            result[name] = int(value["intValue"])
-        elif "doubleValue" in value:
-            result[name] = float(value["doubleValue"])
-        elif "stringValue" in value:
-            result[name] = value["stringValue"]
-        else:
-            raise TypeError("Got unknown type of value: {}".format(value))
+    # 'parameters' is deprecated in IR and changed to 'parameterValues'.
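
[Illustrative aside, not part of the patch: the new branches in _get_vertex_value amount to a small dispatch table from IR parameter types to google.protobuf.Value field names. A minimal, runnable sketch of that mapping; the helper name and table are invented for illustration, but the type-to-field pairs are taken from the hunk above.]

    # Sketch only: mirrors the if/elif chain added to _get_vertex_value.
    _FIELD_BY_TYPE = {
        "INT": "intValue",
        "DOUBLE": "doubleValue",
        "STRING": "stringValue",
        "BOOLEAN": "boolValue",
        "NUMBER_DOUBLE": "numberValue",
        "NUMBER_INTEGER": "numberValue",
        "LIST": "listValue",
        "STRUCT": "structValue",
    }

    def to_vertex_value(param_type, value):
        # Unknown types fail the same way the patched code does.
        if param_type not in _FIELD_BY_TYPE:
            raise TypeError("Got unknown type of value: {}".format(value))
        return {_FIELD_BY_TYPE[param_type]: value}

    assert to_vertex_value("NUMBER_INTEGER", 5678) == {"numberValue": 5678}
    assert to_vertex_value("LIST", [123, 456]) == {"listValue": [123, 456]}
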
+    if runtime_config_spec.get("parameterValues") is not None:
+        return runtime_config_spec.get("parameterValues")
 
-    return result
+    if runtime_config_spec.get("parameters") is not None:
+        result = {}
+        for name, value in runtime_config_spec.get("parameters").items():
+            if "intValue" in value:
+                result[name] = int(value["intValue"])
+            elif "doubleValue" in value:
+                result[name] = float(value["doubleValue"])
+            elif "stringValue" in value:
+                result[name] = value["stringValue"]
+            else:
+                raise TypeError("Got unknown type of value: {}".format(value))
+        return result
diff --git a/tests/unit/aiplatform/test_pipeline_jobs.py b/tests/unit/aiplatform/test_pipeline_jobs.py
index 098aef2570..00e39caf26 100644
--- a/tests/unit/aiplatform/test_pipeline_jobs.py
+++ b/tests/unit/aiplatform/test_pipeline_jobs.py
@@ -53,17 +53,65 @@
 _TEST_PIPELINE_JOB_NAME = f"projects/{_TEST_PROJECT}/locations/{_TEST_LOCATION}/pipelineJobs/{_TEST_PIPELINE_JOB_ID}"
 
-_TEST_PIPELINE_PARAMETER_VALUES = {"name_param": "hello"}
+_TEST_PIPELINE_PARAMETER_VALUES = {"string_param": "hello"}
+_TEST_PIPELINE_SPEC_LEGACY = {
+    "pipelineInfo": {"name": "my-pipeline"},
+    "root": {
+        "dag": {"tasks": {}},
+        "inputDefinitions": {"parameters": {"string_param": {"type": "STRING"}}},
+    },
+    "components": {},
+}
 _TEST_PIPELINE_SPEC = {
     "pipelineInfo": {"name": "my-pipeline"},
     "root": {
         "dag": {"tasks": {}},
-        "inputDefinitions": {"parameters": {"name_param": {"type": "STRING"}}},
+        "inputDefinitions": {
+            "parameters": {
+                "string_param": {"parameterType": "STRING"},
+                # uncomment when the GAPIC library change for protobufValue is in
+                # "bool_param": {
+                #     "parameterType": "BOOLEAN"
+                # },
+                # "double_param": {
+                #     "parameterType": "NUMBER_DOUBLE"
+                # },
+                # "int_param": {
+                #     "parameterType": "NUMBER_INTEGER"
+                # },
+                # "list_int_param": {
+                #     "parameterType": "LIST"
+                # },
+                # "list_string_param": {
+                #     "parameterType": "LIST"
+                # },
+                # "struct_param": {
+                #     "parameterType": "STRUCT"
+                # }
+            }
+        },
     },
     "components": {},
 }
-_TEST_PIPELINE_JOB = {
+
+_TEST_PIPELINE_JOB_LEGACY = {
     "runtimeConfig": {},
+    "pipelineSpec": _TEST_PIPELINE_SPEC_LEGACY,
+}
+
+_TEST_PIPELINE_JOB = {
+    "runtimeConfig": {
+        "parameterValues": {
+            "string_param": "lorem ipsum",
+            # uncomment when the GAPIC library change for protobufValue is in
+            # "bool_param": True,
+            # "double_param": 12.34,
+            # "int_param": 5678,
+            # "list_int_param": [123, 456, 789],
+            # "list_string_param": ["lorem", "ipsum"],
+            # "struct_param": { "key1": 12345, "key2": 67890}
+        },
+    },
     "pipelineSpec": _TEST_PIPELINE_SPEC,
 }
 
@@ -177,23 +225,10 @@ def mock_pipeline_service_list():
 
 
 @pytest.fixture
-def mock_load_pipeline_job_json():
-    with patch.object(storage.Blob, "download_as_bytes") as mock_load_pipeline_job_json:
-        mock_load_pipeline_job_json.return_value = json.dumps(
-            _TEST_PIPELINE_JOB
-        ).encode()
-        yield mock_load_pipeline_job_json
-
-
-@pytest.fixture
-def mock_load_pipeline_spec_json():
-    with patch.object(
-        storage.Blob, "download_as_bytes"
-    ) as mock_load_pipeline_spec_json:
-        mock_load_pipeline_spec_json.return_value = json.dumps(
-            _TEST_PIPELINE_SPEC
-        ).encode()
-        yield mock_load_pipeline_spec_json
+def mock_load_json(job_spec_json):
+    with patch.object(storage.Blob, "download_as_bytes") as mock_load_json:
+        mock_load_json.return_value = json.dumps(job_spec_json).encode()
+        yield mock_load_json
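
[Illustrative aside, not part of the patch: the consolidated fixture above relies on pytest resolving a directly parametrized test argument (job_spec_json) for a fixture that requests it, which is how one mock serves all four spec variants. A minimal standalone sketch of the pattern, assuming pytest and google-cloud-storage are installed; names here are invented for illustration.]

    import json
    from unittest import mock

    import pytest

    @pytest.fixture
    def mock_load_json(job_spec_json):
        # `job_spec_json` is supplied by @pytest.mark.parametrize on the test.
        with mock.patch("google.cloud.storage.Blob.download_as_bytes") as m:
            m.return_value = json.dumps(job_spec_json).encode()
            yield m

    @pytest.mark.parametrize("job_spec_json", [{"a": 1}, {"b": 2}])
    def test_mock_serves_each_variant(mock_load_json, job_spec_json):
        # The mocked download returns whichever variant this run was given.
        assert json.loads(mock_load_json.return_value) == job_spec_json
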
 class TestPipelineJob:
     def setup_method(self):
         reload(initializer)
         reload(aiplatform)
 
     def teardown_method(self):
         initializer.global_pool.shutdown(wait=True)
 
-    @pytest.mark.usefixtures("mock_load_pipeline_job_json")
+    @pytest.mark.parametrize(
+        "job_spec_json",
+        [
+            _TEST_PIPELINE_SPEC,
+            _TEST_PIPELINE_JOB,
+            _TEST_PIPELINE_SPEC_LEGACY,
+            _TEST_PIPELINE_JOB_LEGACY,
+        ],
+    )
     @pytest.mark.parametrize("sync", [True, False])
-    def test_run_call_pipeline_service_pipeline_job_create(
-        self, mock_pipeline_service_create, mock_pipeline_service_get, sync,
+    def test_run_call_pipeline_service_create(
+        self,
+        mock_pipeline_service_create,
+        mock_pipeline_service_get,
+        job_spec_json,
+        mock_load_json,
+        sync,
     ):
         aiplatform.init(
             project=_TEST_PROJECT,
             staging_bucket=_TEST_GCS_BUCKET_NAME,
             location=_TEST_LOCATION,
             credentials=_TEST_CREDENTIALS,
         )
 
         job = pipeline_jobs.PipelineJob(
             display_name=_TEST_PIPELINE_JOB_DISPLAY_NAME,
             template_path=_TEST_TEMPLATE_PATH,
             job_id=_TEST_PIPELINE_JOB_ID,
             parameter_values=_TEST_PIPELINE_PARAMETER_VALUES,
             enable_caching=True,
         )
 
         job.run(
             service_account=_TEST_SERVICE_ACCOUNT, network=_TEST_NETWORK, sync=sync,
         )
 
         if not sync:
             job.wait()
 
         expected_runtime_config_dict = {
-            "gcs_output_directory": _TEST_GCS_BUCKET_NAME,
-            "parameters": {"name_param": {"stringValue": "hello"}},
+            "gcsOutputDirectory": _TEST_GCS_BUCKET_NAME,
+            "parameters": {"string_param": {"stringValue": "hello"}},
         }
         runtime_config = gca_pipeline_job_v1beta1.PipelineJob.RuntimeConfig()._pb
         json_format.ParseDict(expected_runtime_config_dict, runtime_config)
 
+        pipeline_spec = job_spec_json.get("pipelineSpec") or job_spec_json
+
         # Construct expected request
         expected_gapic_pipeline_job = gca_pipeline_job_v1beta1.PipelineJob(
             display_name=_TEST_PIPELINE_JOB_DISPLAY_NAME,
             pipeline_spec={
                 "components": {},
-                "pipelineInfo": _TEST_PIPELINE_JOB["pipelineSpec"]["pipelineInfo"],
-                "root": _TEST_PIPELINE_JOB["pipelineSpec"]["root"],
+                "pipelineInfo": pipeline_spec["pipelineInfo"],
+                "root": pipeline_spec["root"],
             },
             runtime_config=runtime_config,
             service_account=_TEST_SERVICE_ACCOUNT,
             network=_TEST_NETWORK,
         )
 
         mock_pipeline_service_create.assert_called_once_with(
             parent=_TEST_PARENT,
             pipeline_job=expected_gapic_pipeline_job,
             pipeline_job_id=_TEST_PIPELINE_JOB_ID,
         )
 
         mock_pipeline_service_get.assert_called_with(
             name=_TEST_PIPELINE_JOB_NAME, retry=base._DEFAULT_RETRY
         )
 
         assert job._gca_resource == make_pipeline_job(
             gca_pipeline_state_v1beta1.PipelineState.PIPELINE_STATE_SUCCEEDED
         )
 
-    @pytest.mark.usefixtures("mock_load_pipeline_job_json")
+    @pytest.mark.parametrize(
+        "job_spec_json",
+        [
+            _TEST_PIPELINE_SPEC,
+            _TEST_PIPELINE_JOB,
+            _TEST_PIPELINE_SPEC_LEGACY,
+            _TEST_PIPELINE_JOB_LEGACY,
+        ],
+    )
     def test_submit_call_pipeline_service_pipeline_job_create(
-        self, mock_pipeline_service_create, mock_pipeline_service_get
+        self,
+        mock_pipeline_service_create,
+        mock_pipeline_service_get,
+        job_spec_json,
+        mock_load_json,
     ):
         aiplatform.init(
             project=_TEST_PROJECT,
             staging_bucket=_TEST_GCS_BUCKET_NAME,
             location=_TEST_LOCATION,
             credentials=_TEST_CREDENTIALS,
         )
 
         expected_runtime_config_dict = {
             "gcs_output_directory": _TEST_GCS_BUCKET_NAME,
-            "parameters": {"name_param": {"stringValue": "hello"}},
+            "parameters": {"string_param": {"stringValue": "hello"}},
         }
         runtime_config = gca_pipeline_job_v1beta1.PipelineJob.RuntimeConfig()._pb
         json_format.ParseDict(expected_runtime_config_dict, runtime_config)
 
+        pipeline_spec = job_spec_json.get("pipelineSpec") or job_spec_json
+
         # Construct expected request
         expected_gapic_pipeline_job = gca_pipeline_job_v1beta1.PipelineJob(
             display_name=_TEST_PIPELINE_JOB_DISPLAY_NAME,
             pipeline_spec={
                 "components": {},
-                "pipelineInfo": _TEST_PIPELINE_JOB["pipelineSpec"]["pipelineInfo"],
-                "root": _TEST_PIPELINE_JOB["pipelineSpec"]["root"],
+                "pipelineInfo": pipeline_spec["pipelineInfo"],
+                "root": pipeline_spec["root"],
             },
             runtime_config=runtime_config,
             service_account=_TEST_SERVICE_ACCOUNT,
             network=_TEST_NETWORK,
         )
 
         mock_pipeline_service_create.assert_called_once_with(
             parent=_TEST_PARENT,
             pipeline_job=expected_gapic_pipeline_job,
             pipeline_job_id=_TEST_PIPELINE_JOB_ID,
         )
 
         assert job._gca_resource == make_pipeline_job(
             gca_pipeline_state_v1beta1.PipelineState.PIPELINE_STATE_SUCCEEDED
         )
 
-    @pytest.mark.usefixtures("mock_load_pipeline_spec_json")
-    @pytest.mark.parametrize("sync", [True, False])
-    def test_run_call_pipeline_service_pipeline_spec_create(
-        self, mock_pipeline_service_create, mock_pipeline_service_get, sync,
-    ):
-        aiplatform.init(
-            project=_TEST_PROJECT,
-            staging_bucket=_TEST_GCS_BUCKET_NAME,
-            location=_TEST_LOCATION,
-            credentials=_TEST_CREDENTIALS,
-        )
-
-        job = pipeline_jobs.PipelineJob(
-            display_name=_TEST_PIPELINE_JOB_DISPLAY_NAME,
-            template_path=_TEST_TEMPLATE_PATH,
-            job_id=_TEST_PIPELINE_JOB_ID,
-            parameter_values=_TEST_PIPELINE_PARAMETER_VALUES,
-            enable_caching=True,
-        )
-
-        job.run(
-            service_account=_TEST_SERVICE_ACCOUNT, network=_TEST_NETWORK, sync=sync,
-        )
-
-        if not sync:
-            job.wait()
-
-        expected_runtime_config_dict = {
-            "gcs_output_directory": _TEST_GCS_BUCKET_NAME,
-            "parameters": {"name_param": {"stringValue": "hello"}},
-        }
-        runtime_config = gca_pipeline_job_v1beta1.PipelineJob.RuntimeConfig()._pb
-        json_format.ParseDict(expected_runtime_config_dict, runtime_config)
-
-        # Construct expected request
-        expected_gapic_pipeline_job = gca_pipeline_job_v1beta1.PipelineJob(
-            display_name=_TEST_PIPELINE_JOB_DISPLAY_NAME,
-            pipeline_spec={
-                "components": {},
-                "pipelineInfo": _TEST_PIPELINE_JOB["pipelineSpec"]["pipelineInfo"],
-                "root": _TEST_PIPELINE_JOB["pipelineSpec"]["root"],
-            },
-            runtime_config=runtime_config,
-            service_account=_TEST_SERVICE_ACCOUNT,
-            network=_TEST_NETWORK,
-        )
-
-        mock_pipeline_service_create.assert_called_once_with(
-            parent=_TEST_PARENT,
-            pipeline_job=expected_gapic_pipeline_job,
-            pipeline_job_id=_TEST_PIPELINE_JOB_ID,
-        )
-
-        mock_pipeline_service_get.assert_called_with(
-            name=_TEST_PIPELINE_JOB_NAME, retry=base._DEFAULT_RETRY
-        )
-
-        assert job._gca_resource == make_pipeline_job(
-            gca_pipeline_state_v1beta1.PipelineState.PIPELINE_STATE_SUCCEEDED
-        )
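
[Illustrative aside, not part of the patch: the expected-request assertions above mix two spellings — the run test uses "gcsOutputDirectory" while the submit test keeps "gcs_output_directory" — and both still pass because json_format.ParseDict accepts either a proto field's original name or its JSON (lowerCamelCase) name. A standalone sketch; the import path is an assumption following the aliases this test module uses.]

    from google.protobuf import json_format
    from google.cloud.aiplatform_v1beta1.types import (
        pipeline_job as gca_pipeline_job_v1beta1,
    )

    camel = gca_pipeline_job_v1beta1.PipelineJob.RuntimeConfig()._pb
    snake = gca_pipeline_job_v1beta1.PipelineJob.RuntimeConfig()._pb
    # Both spellings parse into the same gcs_output_directory field.
    json_format.ParseDict({"gcsOutputDirectory": "gs://bucket"}, camel)
    json_format.ParseDict({"gcs_output_directory": "gs://bucket"}, snake)
    assert camel == snake
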
     @pytest.mark.usefixtures("mock_pipeline_service_get")
     def test_get_pipeline_job(self, mock_pipeline_service_get):
         aiplatform.init(project=_TEST_PROJECT)
         job = pipeline_jobs.PipelineJob.get(resource_name=_TEST_PIPELINE_JOB_ID)
 
         assert isinstance(job, pipeline_jobs.PipelineJob)
 
     @pytest.mark.usefixtures(
-        "mock_pipeline_service_create",
-        "mock_pipeline_service_get",
-        "mock_load_pipeline_job_json",
+        "mock_pipeline_service_create", "mock_pipeline_service_get",
     )
-    def test_cancel_pipeline_job(
-        self, mock_pipeline_service_cancel,
-    ):
+    @pytest.mark.parametrize(
+        "job_spec_json",
+        [
+            _TEST_PIPELINE_SPEC,
+            _TEST_PIPELINE_JOB,
+            _TEST_PIPELINE_SPEC_LEGACY,
+            _TEST_PIPELINE_JOB_LEGACY,
+        ],
+    )
+    def test_cancel_pipeline_job(self, mock_pipeline_service_cancel, mock_load_json):
         aiplatform.init(
             project=_TEST_PROJECT,
             staging_bucket=_TEST_GCS_BUCKET_NAME,
         )
 
         mock_pipeline_service_cancel.assert_called_once_with(
             name=_TEST_PIPELINE_JOB_NAME
         )
 
     @pytest.mark.usefixtures(
-        "mock_pipeline_service_create",
-        "mock_pipeline_service_get",
-        "mock_load_pipeline_job_json",
+        "mock_pipeline_service_create", "mock_pipeline_service_get",
+    )
+    @pytest.mark.parametrize(
+        "job_spec_json",
+        [
+            _TEST_PIPELINE_SPEC,
+            _TEST_PIPELINE_JOB,
+            _TEST_PIPELINE_SPEC_LEGACY,
+            _TEST_PIPELINE_JOB_LEGACY,
+        ],
     )
-    def test_list_pipeline_job(self, mock_pipeline_service_list):
+    def test_list_pipeline_job(self, mock_pipeline_service_list, mock_load_json):
         aiplatform.init(
             project=_TEST_PROJECT,
             staging_bucket=_TEST_GCS_BUCKET_NAME,
         )
 
     @pytest.mark.usefixtures(
-        "mock_pipeline_service_create",
-        "mock_pipeline_service_get",
-        "mock_load_pipeline_job_json",
+        "mock_pipeline_service_create", "mock_pipeline_service_get",
+    )
+    @pytest.mark.parametrize(
+        "job_spec_json",
+        [
+            _TEST_PIPELINE_SPEC,
+            _TEST_PIPELINE_JOB,
+            _TEST_PIPELINE_SPEC_LEGACY,
+            _TEST_PIPELINE_JOB_LEGACY,
+        ],
     )
     def test_cancel_pipeline_job_without_running(
-        self, mock_pipeline_service_cancel,
+        self, mock_pipeline_service_cancel, mock_load_json,
     ):
         aiplatform.init(
             project=_TEST_PROJECT,
             staging_bucket=_TEST_GCS_BUCKET_NAME,
         )
 
         assert e.match(regexp=r"PipelineJob resource has not been created")
 
     @pytest.mark.usefixtures(
-        "mock_pipeline_service_create",
-        "mock_pipeline_service_get_with_fail",
-        "mock_load_pipeline_job_json",
+        "mock_pipeline_service_create", "mock_pipeline_service_get_with_fail",
+    )
+    @pytest.mark.parametrize(
+        "job_spec_json",
+        [
+            _TEST_PIPELINE_SPEC,
+            _TEST_PIPELINE_JOB,
+            _TEST_PIPELINE_SPEC_LEGACY,
+            _TEST_PIPELINE_JOB_LEGACY,
+        ],
     )
     @pytest.mark.parametrize("sync", [True, False])
-    def test_pipeline_failure_raises(self, sync):
+    def test_pipeline_failure_raises(self, mock_load_json, sync):
         aiplatform.init(
             project=_TEST_PROJECT,
             staging_bucket=_TEST_GCS_BUCKET_NAME,
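
[Illustrative aside, not part of the patch: the four parametrized specs above exercise the two runtime-config shapes that the new _parse_runtime_parameters accepts. The inputs below are invented for illustration; the expected results follow directly from the pipeline_utils.py hunk.]

    # New IR: 'parameterValues' holds plain JSON values and is returned as-is.
    new_style = {"parameterValues": {"string_param": "hello", "list_param": [1, 2]}}

    # Legacy IR: 'parameters' wraps each value in a typed single-key dict.
    legacy = {
        "parameters": {
            "string_param": {"stringValue": "hello"},
            "int_param": {"intValue": "42"},
        }
    }

    # Per the patched parser:
    #   new_style -> {"string_param": "hello", "list_param": [1, 2]}
    #   legacy    -> {"string_param": "hello", "int_param": 42}  (intValue cast via int())
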
diff --git a/tests/unit/aiplatform/test_utils.py b/tests/unit/aiplatform/test_utils.py
index 418014ee45..34bba22819 100644
--- a/tests/unit/aiplatform/test_utils.py
+++ b/tests/unit/aiplatform/test_utils.py
@@ -378,7 +378,7 @@ class TestPipelineUtils:
             }
         },
         "runtimeConfig": {
-            "gcs_output_directory": "path/to/my/root",
+            "gcsOutputDirectory": "path/to/my/root",
             "parameters": {
                 "string_param": {"stringValue": "test-string"},
                 "int_param": {"intValue": 42},
@@ -444,7 +444,7 @@ def test_pipeline_utils_runtime_config_builder_with_merge_updates(self):
         actual_runtime_config = my_builder.build()
 
         expected_runtime_config = {
-            "gcs_output_directory": "path/to/my/new/root",
+            "gcsOutputDirectory": "path/to/my/new/root",
             "parameters": {
                 "string_param": {"stringValue": "test-string"},
                 "int_param": {"intValue": 888},
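
[Illustrative aside, not part of the patch: taken together, these changes let callers pass the richer parameter types straight through parameter_values. A hedged end-to-end sketch — project, bucket, and template path are placeholders, and the commented entries stay pending the GAPIC protobufValue change, mirroring the tests above.]

    from google.cloud import aiplatform
    from google.cloud.aiplatform import pipeline_jobs

    aiplatform.init(project="my-project", staging_bucket="gs://my-bucket")

    job = pipeline_jobs.PipelineJob(
        display_name="my-pipeline",
        template_path="gs://my-bucket/pipeline_spec.json",
        parameter_values={
            "string_param": "hello",
            # pending the GAPIC protobufValue change:
            # "bool_param": True,
            # "list_int_param": [123, 456, 789],
            # "struct_param": {"key1": 12345},
        },
        enable_caching=True,
    )
    job.run()
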