Skip to content

Commit

Permalink
Add pipeline_start_post_processing to Orchestrator env
Browse files Browse the repository at this point in the history
PiperOrigin-RevId: 620324534
  • Loading branch information
kmonte authored and tfx-copybara committed Apr 2, 2024
1 parent b8b9423 commit 0d09a76
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 0 deletions.
13 changes: 13 additions & 0 deletions tfx/orchestration/experimental/core/env.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,16 @@ def set_health_status(self, status: status_lib.Status) -> None:
def check_if_can_orchestrate(self, pipeline: pipeline_pb2.Pipeline) -> None:
"""Check if this orchestrator is capable of orchestrating the pipeline."""

@abc.abstractmethod
def pipeline_start_post_process(self, pipeline: pipeline_pb2.Pipeline):
"""Method for processing a pipeline at the end of its initialization, before it starts running.
This *will* mutate the provided IR in-place.
Args:
pipeline: The pipeline IR to process.
"""


class _DefaultEnv(Env):
"""Default environment."""
Expand Down Expand Up @@ -104,6 +114,9 @@ def set_health_status(self, status: status_lib.Status) -> None:
def check_if_can_orchestrate(self, pipeline: pipeline_pb2.Pipeline) -> None:
pass

def pipeline_start_post_process(self, pipeline: pipeline_pb2.Pipeline):
pass


_ENV = _DefaultEnv()

Expand Down
4 changes: 4 additions & 0 deletions tfx/orchestration/experimental/core/env_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import tensorflow as tf
from tfx.orchestration.experimental.core import env
from tfx.orchestration.experimental.core import test_utils
from tfx.proto.orchestration import pipeline_pb2
from tfx.utils import status as status_lib


Expand Down Expand Up @@ -45,6 +46,9 @@ def set_health_status(self, status: status_lib.Status) -> None:
def check_if_can_orchestrate(self, pipeline) -> None:
raise NotImplementedError()

def pipeline_start_post_process(self, pipeline: pipeline_pb2.Pipeline):
raise NotImplementedError()


class EnvTest(test_utils.TfxTest):

Expand Down

0 comments on commit 0d09a76

Please sign in to comment.