
Commit

no-op
PiperOrigin-RevId: 625128214
tfx-copybara committed May 7, 2024
1 parent 2937ac6 commit 96f1985
Showing 124 changed files with 943 additions and 7,352 deletions.
2 changes: 0 additions & 2 deletions RELEASE.md
@@ -73,7 +73,6 @@
can now lead to (justified) type checking errors that were previously
hidden due to `C` being of type `Any`.
* `ph.to_list()` was renamed to `ph.make_list()` for consistency.
-* Support KFP pipeline spec 2.1.0 version schema


### For Pipeline Authors
@@ -102,7 +101,6 @@
| `tensorflow-decision-forests` | `>=1.0.1,<1.9` | `>=1.0.1,<2` | |
| `tensorflow-hub` | `>=0.9.0,<0.14` | `>=0.15.0,<0.16` | |
| `tensorflow-serving` | `>=1.15,!=2.0.*,!=2.1.*,!=2.2.*,!=2.3.*,!=2.4.*,!=2.5.*,!=2.6.*,!=2.7.*,!=2.8.*,<3` | `>=2.15,<2.16` | |
-| `kfp-pipeline-spec` | `kfp-pipeline-spec>=0.1.10,<0.2` | `>0.1.13,<0.2` | |

## Documentation Updates

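For orientation, the `ph.to_list()` → `ph.make_list()` rename noted in the release notes above can be illustrated with a short sketch. This is not part of the commit; it assumes a TFX release that contains the rename, and the placeholder expression is a made-up example:

```python
# Hypothetical usage of the renamed placeholder helper.
from tfx.dsl.placeholder import placeholder as ph

# Previously spelled ph.to_list([...]); after the rename:
steps_flag = ph.make_list(['--num-steps', ph.exec_property('num_steps')])
```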
4 changes: 2 additions & 2 deletions tfx/dependencies.py
@@ -145,7 +145,7 @@ def make_extra_packages_kfp():
return [
# TODO(b/304892416): Migrate from KFP SDK v1 to v2.
'kfp>=1.8.14,<2',
-      'kfp-pipeline-spec>0.1.13,<0.2',
+      'kfp-pipeline-spec>=0.1.10,<0.2',
]


@@ -163,7 +163,7 @@ def make_extra_packages_docker_image():
return [
# TODO(b/304892416): Migrate from KFP SDK v1 to v2.
'kfp>=1.8.14,<2',
-      'kfp-pipeline-spec>0.1.13,<0.2',
+      'kfp-pipeline-spec>=0.1.10,<0.2',
'mmh>=2.2,<3',
'python-snappy>=0.5,<0.6',
# Required for tfx/examples/penguin/penguin_utils_cloud_tuner.py
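For context (not part of the diff), the two pins behave differently at the 0.1.x boundary: the removed exclusive pin required 0.1.14+, while the restored inclusive pin re-admits releases back to 0.1.10. A quick check with the standard `packaging` library:

```python
# Compares the exclusive pin removed above with the inclusive pin restored.
from packaging.specifiers import SpecifierSet
from packaging.version import Version

removed_pin = SpecifierSet('>0.1.13,<0.2')    # required 0.1.14 or newer
restored_pin = SpecifierSet('>=0.1.10,<0.2')  # admits 0.1.10 through 0.1.x

print(Version('0.1.13') in removed_pin)   # False: 0.1.13 itself excluded
print(Version('0.1.13') in restored_pin)  # True
print(Version('0.1.10') in restored_pin)  # True: older releases admitted again
```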
24 changes: 10 additions & 14 deletions tfx/examples/penguin/penguin_pipeline_kubeflow_e2e_test.py
@@ -15,7 +15,6 @@

import os

-from absl.testing import parameterized
import tensorflow as tf
from tfx.dsl.io import fileio
from tfx.examples.penguin import penguin_pipeline_kubeflow
@@ -24,9 +23,8 @@
from tfx.utils import io_utils


-class PenguinPipelineKubeflowV2Test(
-    base_test_case.BaseKubeflowV2Test, parameterized.TestCase
-):
+class PenguinPipelineKubeflowV2Test(base_test_case.BaseKubeflowV2Test):
+
def setUp(self):
super().setUp()
penguin_examples_dir = os.path.join(self._REPO_BASE, 'tfx', 'examples',
@@ -43,11 +41,7 @@ def setUp(self):
io_utils.copy_file(
penguin_test_schema_file, self._penguin_schema_file, overwrite=True)

-  @parameterized.named_parameters(
-      dict(testcase_name='use_pipeline_spec_2_1', use_pipeline_spec_2_1=True),
-      dict(testcase_name='use_pipeline_spec_2_0', use_pipeline_spec_2_1=False),
-  )
-  def testEndToEndPipelineRun(self, use_pipeline_spec_2_1):
+  def testEndToEndPipelineRun(self):
"""E2E test for pipeline with runtime parameter."""
pipeline_name = 'kubeflow-v2-e2e-test-{}'.format(self._test_id)
kubeflow_pipeline = penguin_pipeline_kubeflow.create_pipeline(
@@ -72,11 +66,13 @@ def testEndToEndPipelineRun(self, use_pipeline_spec_2_1):
self._run_pipeline(
pipeline=kubeflow_pipeline,
parameter_values={
-            'train-args': '{"num_steps": 100}',
-            'eval-args': '{"num_steps": 50}',
-        },
-        use_pipeline_spec_2_1=use_pipeline_spec_2_1,
-    )
+            'train-args': {
+                'num_steps': 100
+            },
+            'eval-args': {
+                'num_steps': 50
+            }
+        })
self.assertTrue(fileio.exists(self._serving_model_dir))


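The deleted decorator comes from absl-py's `parameterized` module: each `dict` expands one test method into a separately named test case, so removing it leaves a single unparameterized test, as in the diff above. A minimal standalone sketch (made-up test names, not TFX code):

```python
# Each dict becomes one generated test case named after testcase_name.
from absl.testing import parameterized


class SpecVersionFlagTest(parameterized.TestCase):

  @parameterized.named_parameters(
      dict(testcase_name='use_pipeline_spec_2_1', use_pipeline_spec_2_1=True),
      dict(testcase_name='use_pipeline_spec_2_0', use_pipeline_spec_2_1=False),
  )
  def test_flag_is_boolean(self, use_pipeline_spec_2_1):
    self.assertIsInstance(use_pipeline_spec_2_1, bool)
```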
30 changes: 30 additions & 0 deletions tfx/orchestration/datahub_utils.py
@@ -0,0 +1,30 @@
# Copyright 2024 Google LLC. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Utils to log Tflex/MLMD entities to Datahub."""
from typing import Optional

from tfx.orchestration.experimental.core import task as task_lib
from tfx.utils import typing_utils

from ml_metadata.proto import metadata_store_pb2


def log_node_execution(
execution: metadata_store_pb2.Execution,
task: Optional[task_lib.ExecNodeTask] = None,
output_artifacts: Optional[typing_utils.ArtifactMultiMap] = None,
):
"""Logs a Tflex node execution and its input/output artifacts."""
del execution, task, output_artifacts
return
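A possible call site for this new hook (hypothetical, not in this commit): an orchestrator could invoke it after publishing an execution to MLMD. As the body above shows, the default implementation simply discards its arguments:

```python
# Hypothetical invocation of the no-op logging hook defined above.
from ml_metadata.proto import metadata_store_pb2
from tfx.orchestration import datahub_utils

execution = metadata_store_pb2.Execution(id=42)
datahub_utils.log_node_execution(execution)  # no-op: returns None
```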
23 changes: 0 additions & 23 deletions tfx/orchestration/experimental/core/env.py
@@ -93,18 +93,6 @@ def prepare_orchestrator_for_pipeline_run(
pipeline: The pipeline IR to prepare for.
"""

-  @abc.abstractmethod
-  def create_pipeline_run(
-      self,
-      owner: str,
-      pipeline_name: str,
-      execution: metadata_store_pb2.Execution,
-      pipeline: pipeline_pb2.Pipeline,
-      pipeline_run_metadata: Optional[str] = None,
-      base_pipeline_run_id: Optional[str] = None,
-  ) -> None:
-    """Creates a (sub-)pipeline run."""
-
@abc.abstractmethod
def update_pipeline_run_status(
self,
@@ -161,17 +149,6 @@ def prepare_orchestrator_for_pipeline_run(
):
pass

-  def create_pipeline_run(
-      self,
-      owner: str,
-      pipeline_name: str,
-      execution: metadata_store_pb2.Execution,
-      pipeline: pipeline_pb2.Pipeline,
-      pipeline_run_metadata: Optional[str] = None,
-      base_pipeline_run_id: Optional[str] = None,
-  ) -> None:
-    pass
-
def update_pipeline_run_status(
self,
owner: str,
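Because `create_pipeline_run` was declared with `@abc.abstractmethod`, every concrete `Env` subclass had to override it; deleting the abstract method therefore means deleting the overrides as well, including the test stub in env_test.py below. A generic sketch of the mechanism (plain Python, not TFX code):

```python
# abc refuses to instantiate a subclass that misses any remaining
# abstract method, which is why base and overrides change together.
import abc


class Base(abc.ABC):

  @abc.abstractmethod
  def required(self) -> None:
    """Subclasses cannot be instantiated until this is overridden."""


class Impl(Base):

  def required(self) -> None:
    pass


Impl()  # OK only because every abstract method is implemented.
```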
11 changes: 0 additions & 11 deletions tfx/orchestration/experimental/core/env_test.py
@@ -60,17 +60,6 @@ def prepare_orchestrator_for_pipeline_run(
):
raise NotImplementedError()

-  def create_pipeline_run(
-      self,
-      owner: str,
-      pipeline_name: str,
-      execution: metadata_store_pb2.Execution,
-      pipeline: pipeline_pb2.Pipeline,
-      pipeline_run_metadata: Optional[str] = None,
-      base_pipeline_run_id: Optional[str] = None,
-  ):
-    raise NotImplementedError()
-
def update_pipeline_run_status(
self,
owner: str,
6 changes: 4 additions & 2 deletions tfx/orchestration/experimental/core/post_execution_utils.py
@@ -92,7 +92,8 @@ def _update_state(
execution_id=task.execution_id,
contexts=task.contexts,
output_artifacts=task.output_artifacts,
-        executor_output=executor_output)
+        executor_output=executor_output,
+        task=task)
garbage_collection.run_garbage_collection_for_node(mlmd_handle,
task.node_uid,
task.get_node())
@@ -125,7 +126,8 @@ def _update_state(
mlmd_handle,
execution_id=task.execution_id,
contexts=task.contexts,
-        output_artifacts=output_artifacts)
+        output_artifacts=output_artifacts,
+        task=task)
elif isinstance(result.output, ts.ResolverNodeOutput):
resolved_input_artifacts = result.output.resolved_input_artifacts
# TODO(b/262040844): Instead of directly using the context manager here, we
121 changes: 32 additions & 89 deletions tfx/orchestration/kubeflow/v2/compiler_utils.py
@@ -73,8 +73,36 @@
_YAML_DOUBLE_TYPE = 'double'


-def build_parameter_type_spec_legacy(
-    value: Union[types.Property, data_types.RuntimeParameter],
+def build_runtime_parameter_spec(
+    parameters: List[data_types.RuntimeParameter]
+) -> Dict[str, pipeline_pb2.PipelineSpec.RuntimeParameter]:
+  """Converts RuntimeParameters to mapping from names to proto messages."""
+
+  def to_message(parameter: data_types.RuntimeParameter):
+    """Converts a RuntimeParameter to RuntimeParameter message."""
+    result = pipeline_pb2.PipelineSpec.RuntimeParameter()
+    # 1. Map the RuntimeParameter type to an enum in the proto definition.
+    if parameter.ptype == int or parameter.ptype == bool:
+      result.type = pipeline_pb2.PrimitiveType.INT
+    elif parameter.ptype == float:
+      result.type = pipeline_pb2.PrimitiveType.DOUBLE
+    elif parameter.ptype == str:
+      result.type = pipeline_pb2.PrimitiveType.STRING
+    else:
+      raise TypeError(
+          'Unknown parameter type: {} found in parameter: {}'.format(
+              parameter.ptype, parameter.name))
+    # 2. Convert its default value.
+    default = value_converter(parameter.default)
+    if default is not None:
+      result.default_value.CopyFrom(default.constant_value)
+    return result
+
+  return {param.name: to_message(param) for param in parameters}
+
+
+def build_parameter_type_spec(
+    value: Union[types.Property, data_types.RuntimeParameter]
) -> pipeline_pb2.ComponentInputsSpec.ParameterSpec:
"""Extracts the artifact type info into ComponentInputsSpec.ParameterSpec."""
is_runtime_param = isinstance(value, data_types.RuntimeParameter)
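A sketch of how the restored `build_runtime_parameter_spec` might be called; the `num_steps` parameter is a made-up example, and the behavior is inferred from the code in this hunk rather than verified against the repository:

```python
# Builds the name -> PipelineSpec.RuntimeParameter mapping for one
# int-typed parameter with a default value.
from tfx.orchestration import data_types
from tfx.orchestration.kubeflow.v2 import compiler_utils

num_steps = data_types.RuntimeParameter(name='num_steps', ptype=int, default=100)
specs = compiler_utils.build_runtime_parameter_spec([num_steps])
# specs['num_steps'].type is PrimitiveType.INT, and its default_value
# holds the constant 100 converted by value_converter above.
```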
@@ -92,25 +120,6 @@ def build_parameter_type_spec_legacy(
return result


-def build_parameter_type_spec(
-    value: Union[types.Property, data_types.RuntimeParameter],
-) -> pipeline_pb2.ComponentInputsSpec.ParameterSpec:
-  """Extracts the artifact type info into ComponentInputsSpec.ParameterSpec."""
-  is_runtime_param = isinstance(value, data_types.RuntimeParameter)
-  result = pipeline_pb2.ComponentInputsSpec.ParameterSpec()
-  if isinstance(value, int) or (is_runtime_param and value.ptype == int):
-    result.parameter_type = pipeline_pb2.ParameterType.NUMBER_INTEGER
-  elif isinstance(value, float) or (is_runtime_param and value.ptype == float):
-    result.parameter_type = pipeline_pb2.ParameterType.NUMBER_DOUBLE
-  elif isinstance(value, str) or (is_runtime_param and value.ptype == str):
-    result.parameter_type = pipeline_pb2.ParameterType.STRING
-  else:
-    # By default, unrecognized object will be json dumped, hence is string type.
-    # For example, resolver class.
-    result.parameter_type = pipeline_pb2.ParameterType.STRING
-  return result
-
-
def _validate_properties_schema(
instance_schema: str,
properties: Optional[Mapping[str, artifact.PropertyType]] = None):
@@ -219,9 +228,8 @@ def pack_artifact_properties(artifact_instance: artifact.Artifact):
return struct_proto


-def value_converter_legacy(
-    tfx_value: Any,
-) -> Optional[pipeline_pb2.ValueOrRuntimeParameter]:
+def value_converter(
+    tfx_value: Any) -> Optional[pipeline_pb2.ValueOrRuntimeParameter]:
"""Converts TFX/MLMD values into Kubeflow pipeline ValueOrRuntimeParameter."""
if tfx_value is None:
return None
@@ -258,53 +266,6 @@ def value_converter_legacy(
return result


-def value_converter(
-    tfx_value: Any,
-) -> Optional[pipeline_pb2.ValueOrRuntimeParameter]:
-  """Converts TFX/MLMD values into Kubeflow pipeline ValueOrRuntimeParameter."""
-  if tfx_value is None:
-    return None
-
-  result = pipeline_pb2.ValueOrRuntimeParameter()
-  if isinstance(tfx_value, (int, float, str)):
-    result.constant.CopyFrom(get_google_value(tfx_value))
-  elif isinstance(tfx_value, (Dict, List)):
-    result.constant.CopyFrom(
-        struct_pb2.Value(string_value=json.dumps(tfx_value))
-    )
-  elif isinstance(tfx_value, data_types.RuntimeParameter):
-    # Attach the runtime parameter to the context.
-    parameter_utils.attach_parameter(tfx_value)
-    result.runtime_parameter = tfx_value.name
-  elif isinstance(tfx_value, metadata_store_pb2.Value):
-    if tfx_value.WhichOneof('value') == 'int_value':
-      result.constant.CopyFrom(
-          struct_pb2.Value(number_value=tfx_value.int_value)
-      )
-    elif tfx_value.WhichOneof('value') == 'double_value':
-      result.constant.CopyFrom(
-          struct_pb2.Value(number_value=tfx_value.double_value)
-      )
-    elif tfx_value.WhichOneof('value') == 'string_value':
-      result.constant.CopyFrom(
-          struct_pb2.Value(string_value=tfx_value.string_value)
-      )
-  elif isinstance(tfx_value, message.Message):
-    result.constant.CopyFrom(
-        struct_pb2.Value(
-            string_value=json_format.MessageToJson(
-                message=tfx_value, sort_keys=True
-            )
-        )
-    )
-  else:
-    # By default will attempt to encode the object using json_utils.dumps.
-    result.constant.CopyFrom(
-        struct_pb2.Value(string_value=json_utils.dumps(tfx_value))
-    )
-  return result
-
-
def get_kubeflow_value(
tfx_value: Union[int, float, str]) -> Optional[pipeline_pb2.Value]:
"""Converts TFX/MLMD values into Kubeflow pipeline Value proto message."""
@@ -324,24 +285,6 @@ def get_kubeflow_value(
return result


-def get_google_value(
-    tfx_value: Union[int, float, str],
-) -> Optional[struct_pb2.Value]:
-  """Converts TFX/MLMD values into Kubeflow pipeline Value proto message."""
-  if tfx_value is None:
-    return None
-
-  result = struct_pb2.Value()
-  if isinstance(tfx_value, int) or isinstance(tfx_value, float):
-    result.number_value = tfx_value
-  elif isinstance(tfx_value, str):
-    result.string_value = tfx_value
-  else:
-    raise TypeError('Got unknown type of value: {}'.format(tfx_value))
-
-  return result
-
-
def get_mlmd_value(
kubeflow_value: pipeline_pb2.Value) -> metadata_store_pb2.Value:
"""Converts Kubeflow pipeline Value pb message to MLMD Value."""
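With `get_google_value` and the struct-based `value_converter` removed, value plumbing in this file reverts to the legacy `pipeline_pb2.Value` helpers that remain. An assumed round trip through the surviving helpers (behavior inferred from this diff; the body of `get_mlmd_value` is not shown here):

```python
# Assumed behavior: a Python int becomes a legacy pipeline_pb2.Value with
# int_value set; get_mlmd_value is expected to map it back to an MLMD Value.
from tfx.orchestration.kubeflow.v2 import compiler_utils

kf_value = compiler_utils.get_kubeflow_value(42)
mlmd_value = compiler_utils.get_mlmd_value(kf_value)
assert mlmd_value.int_value == 42
```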
27 changes: 3 additions & 24 deletions tfx/orchestration/kubeflow/v2/compiler_utils_test.py
@@ -135,7 +135,7 @@ def testCustomArtifactSchemaMismatchFails(self):
_MY_BAD_ARTIFACT_SCHEMA_WITH_PROPERTIES,
_MyArtifactWithProperty.PROPERTIES)

-  def testBuildParameterTypeSpecLegacy(self):
+  def testBuildParameterTypeSpec(self):
type_enum = pipeline_pb2.PrimitiveType.PrimitiveTypeEnum
testdata = {
42: type_enum.INT,
@@ -147,29 +147,8 @@ def testBuildParameterTypeSpecLegacy(self):
}
for value, expected_type_enum in testdata.items():
self.assertEqual(
-          compiler_utils.build_parameter_type_spec_legacy(value).type,
-          expected_type_enum,
-      )
-
-  def testBuildParameterTypeSpec(self):
-    type_enum = pipeline_pb2.ParameterType.ParameterTypeEnum
-    testdata = {
-        42: type_enum.NUMBER_INTEGER,
-        42.1: type_enum.NUMBER_DOUBLE,
-        '42': type_enum.STRING,
-        data_types.RuntimeParameter(
-            name='_', ptype=int
-        ): type_enum.NUMBER_INTEGER,
-        data_types.RuntimeParameter(
-            name='_', ptype=float
-        ): type_enum.NUMBER_DOUBLE,
-        data_types.RuntimeParameter(name='_', ptype=str): type_enum.STRING,
-    }
-    for value, expected_type_enum in testdata.items():
-      self.assertEqual(
-          compiler_utils.build_parameter_type_spec(value).parameter_type,
-          expected_type_enum,
-      )
+          compiler_utils.build_parameter_type_spec(value).type,
+          expected_type_enum)

def testBuildOutputParameterSpecValueArtifact(self):
param = pipeline_pb2.ParameterType
