Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tests with higher level chaining api #1816

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
19 changes: 13 additions & 6 deletions metaflow/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from . import metaflow_version
from . import namespace
from .metaflow_current import current
from .client.core import get_metadata
from .cli_args import cli_args
from .tagging_util import validate_tags
from .util import (
Expand Down Expand Up @@ -558,11 +559,11 @@ def common_run_options(func):
help="Write the ID of this run to the file specified.",
)
@click.option(
"--pathspec-file",
"--runner-attribute-file",
default=None,
show_default=True,
type=str,
help="Write the pathspec of this run to the file specified.",
help="Write the metadata and pathspec of this run to the file specified. Used internally for Metaflow's Runner API.",
)
@wraps(func)
def wrapper(*args, **kwargs):
Expand Down Expand Up @@ -622,7 +623,7 @@ def resume(
decospecs=None,
run_id_file=None,
resume_identifier=None,
pathspec_file=None,
runner_attribute_file=None,
):
before_run(obj, tags, decospecs + obj.environment.decospecs())

Expand Down Expand Up @@ -679,10 +680,13 @@ def resume(
resume_identifier=resume_identifier,
)
write_file(run_id_file, runtime.run_id)
write_file(pathspec_file, "/".join((obj.flow.name, runtime.run_id)))
runtime.print_workflow_info()

runtime.persist_constants()
write_file(
runner_attribute_file,
"%s:%s" % (get_metadata(), "/".join((obj.flow.name, runtime.run_id))),
)
if clone_only:
runtime.clone_original_run()
else:
Expand Down Expand Up @@ -713,7 +717,7 @@ def run(
max_log_size=None,
decospecs=None,
run_id_file=None,
pathspec_file=None,
runner_attribute_file=None,
user_namespace=None,
**kwargs
):
Expand All @@ -738,11 +742,14 @@ def run(
)
write_latest_run_id(obj, runtime.run_id)
write_file(run_id_file, runtime.run_id)
write_file(pathspec_file, "/".join((obj.flow.name, runtime.run_id)))

obj.flow._set_constants(obj.graph, kwargs)
runtime.print_workflow_info()
runtime.persist_constants()
write_file(
runner_attribute_file,
"%s:%s" % (get_metadata(), "/".join((obj.flow.name, runtime.run_id))),
)
runtime.execute()


Expand Down
65 changes: 47 additions & 18 deletions metaflow/runner/metaflow_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,15 @@
import time
import tempfile
from typing import Dict, Iterator, Optional, Tuple
from metaflow import Run
from metaflow import Run, metadata
from .subprocess_manager import SubprocessManager, CommandManager


def clear_and_set_os_environ(env: Dict):
    """Replace the current process environment with exactly the mapping *env*.

    Every pre-existing variable is removed first, so after the call
    ``os.environ`` contains only the keys/values from *env*.
    """
    os.environ.clear()
    for name, value in env.items():
        os.environ[name] = value


def read_from_file_when_ready(file_path: str, timeout: float = 5):
start_time = time.time()
with open(file_path, "r", encoding="utf-8") as file_pointer:
Expand Down Expand Up @@ -227,7 +232,8 @@ def __init__(
from metaflow.runner.click_api import MetaflowAPI

self.flow_file = flow_file
self.env_vars = os.environ.copy()
self.old_env = os.environ.copy()
self.env_vars = self.old_env.copy()
self.env_vars.update(env or {})
if profile:
self.env_vars["METAFLOW_PROFILE"] = profile
Expand All @@ -241,9 +247,22 @@ def __enter__(self) -> "Runner":
async def __aenter__(self) -> "Runner":
return self

def __get_executing_run(self, tfp_pathspec, command_obj):

def __get_executing_run(self, tfp_runner_attribute, command_obj):
        # When two 'Runner' executions are done sequentially, i.e. one after the other,
        # the second run would otherwise inherit the first run's previously set
        # metadata and environment variables.

# It is thus necessary to set them to correct values before we return
# the Run object.
try:
pathspec = read_from_file_when_ready(tfp_pathspec.name, timeout=10)
# Set the environment variables to what they were before the run executed.
clear_and_set_os_environ(self.old_env)

# Set the correct metadata from the runner_attribute file corresponding to this run.
content = read_from_file_when_ready(tfp_runner_attribute.name, timeout=10)
metadata_for_flow, pathspec = content.split(":", maxsplit=1)
metadata(metadata_for_flow)
run_object = Run(pathspec, _namespace_check=False)
return ExecutingRun(self, command_obj, run_object)
except TimeoutError as e:
Expand Down Expand Up @@ -280,17 +299,19 @@ def run(self, show_output: bool = False, **kwargs) -> ExecutingRun:
ExecutingRun object for this run.
"""
with tempfile.TemporaryDirectory() as temp_dir:
tfp_pathspec = tempfile.NamedTemporaryFile(dir=temp_dir, delete=False)
tfp_runner_attribute = tempfile.NamedTemporaryFile(
dir=temp_dir, delete=False
)
command = self.api(**self.top_level_kwargs).run(
pathspec_file=tfp_pathspec.name, **kwargs
runner_attribute_file=tfp_runner_attribute.name, **kwargs
)

pid = self.spm.run_command(
[sys.executable, *command], env=self.env_vars, show_output=show_output
)
command_obj = self.spm.get(pid)

return self.__get_executing_run(tfp_pathspec, command_obj)
return self.__get_executing_run(tfp_runner_attribute, command_obj)

def resume(self, show_output: bool = False, **kwargs):
"""
Expand All @@ -315,17 +336,19 @@ def resume(self, show_output: bool = False, **kwargs):
ExecutingRun object for this resumed run.
"""
with tempfile.TemporaryDirectory() as temp_dir:
tfp_pathspec = tempfile.NamedTemporaryFile(dir=temp_dir, delete=False)
tfp_runner_attribute = tempfile.NamedTemporaryFile(
dir=temp_dir, delete=False
)
command = self.api(**self.top_level_kwargs).resume(
pathspec_file=tfp_pathspec.name, **kwargs
runner_attribute_file=tfp_runner_attribute.name, **kwargs
)

pid = self.spm.run_command(
[sys.executable, *command], env=self.env_vars, show_output=show_output
)
command_obj = self.spm.get(pid)

return self.__get_executing_run(tfp_pathspec, command_obj)
return self.__get_executing_run(tfp_runner_attribute, command_obj)

async def async_run(self, **kwargs) -> ExecutingRun:
"""
Expand All @@ -344,17 +367,20 @@ async def async_run(self, **kwargs) -> ExecutingRun:
ExecutingRun object for this run.
"""
with tempfile.TemporaryDirectory() as temp_dir:
tfp_pathspec = tempfile.NamedTemporaryFile(dir=temp_dir, delete=False)
tfp_runner_attribute = tempfile.NamedTemporaryFile(
dir=temp_dir, delete=False
)
command = self.api(**self.top_level_kwargs).run(
pathspec_file=tfp_pathspec.name, **kwargs
runner_attribute_file=tfp_runner_attribute.name, **kwargs
)

pid = await self.spm.async_run_command(
[sys.executable, *command], env=self.env_vars
[sys.executable, *command],
env=self.env_vars,
)
command_obj = self.spm.get(pid)

return self.__get_executing_run(tfp_pathspec, command_obj)
return self.__get_executing_run(tfp_runner_attribute, command_obj)

async def async_resume(self, **kwargs):
"""
Expand All @@ -373,17 +399,20 @@ async def async_resume(self, **kwargs):
ExecutingRun object for this resumed run.
"""
with tempfile.TemporaryDirectory() as temp_dir:
tfp_pathspec = tempfile.NamedTemporaryFile(dir=temp_dir, delete=False)
tfp_runner_attribute = tempfile.NamedTemporaryFile(
dir=temp_dir, delete=False
)
command = self.api(**self.top_level_kwargs).resume(
pathspec_file=tfp_pathspec.name, **kwargs
runner_attribute_file=tfp_runner_attribute.name, **kwargs
)

pid = await self.spm.async_run_command(
[sys.executable, *command], env=self.env_vars
[sys.executable, *command],
env=self.env_vars,
)
command_obj = self.spm.get(pid)

return self.__get_executing_run(tfp_pathspec, command_obj)
return self.__get_executing_run(tfp_runner_attribute, command_obj)

def __exit__(self, exc_type, exc_value, traceback):
self.spm.cleanup()
Expand Down
5 changes: 1 addition & 4 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,7 @@
[console_scripts]
metaflow=metaflow.cmd.main_cli:start
""",
install_requires=[
"requests",
"boto3",
],
install_requires=["requests", "boto3"],
extras_require={
"stubs": ["metaflow-stubs==%s" % version],
},
Expand Down
66 changes: 36 additions & 30 deletions test/core/contexts.json
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,19 @@
"--quiet"
],
"run_options": [
"--max-workers", "50",
"--max-num-splits", "10000",
"--tag", "\u523a\u8eab means sashimi",
"--tag", "multiple tags should be ok"
"--max-workers=50",
"--max-num-splits=10000",
"--tag=\u523a\u8eab means sashimi",
"--tag=multiple tags should be ok"
],
"checks": [ "python3-cli", "python3-metadata"],
"disabled_tests": [
"LargeArtifactTest",
"S3FailureTest",
"CardComponentRefreshTest",
"CardWithRefreshTest"
]
],
"executors": ["cli", "api"]
},
{
"name": "python3-all-local-cards-realtime",
Expand All @@ -51,16 +52,17 @@
"--quiet"
],
"run_options": [
"--max-workers", "50",
"--max-num-splits", "10000",
"--tag", "\u523a\u8eab means sashimi",
"--tag", "multiple tags should be ok"
"--max-workers=50",
"--max-num-splits=10000",
"--tag=\u523a\u8eab means sashimi",
"--tag=multiple tags should be ok"
],
"checks": [ "python3-cli", "python3-metadata"],
"enabled_tests": [
"CardComponentRefreshTest",
"CardWithRefreshTest"
]
],
"executors": ["cli", "api"]
},
{
"name": "python3-all-local-azure-storage",
Expand All @@ -81,16 +83,17 @@
"--quiet"
],
"run_options": [
"--max-workers", "50",
"--max-num-splits", "10000",
"--tag", "\u523a\u8eab means sashimi",
"--tag", "multiple tags should be ok"
"--max-workers=50",
"--max-num-splits=10000",
"--tag=\u523a\u8eab means sashimi",
"--tag=multiple tags should be ok"
],
"checks": [ "python3-cli", "python3-metadata"],
"disabled_tests": [
"LargeArtifactTest",
"S3FailureTest"
]
],
"executors": ["cli", "api"]
},
{
"name": "dev-local",
Expand All @@ -111,15 +114,16 @@
"--quiet"
],
"run_options": [
"--max-workers", "50",
"--max-num-splits", "10000",
"--tag", "\u523a\u8eab means sashimi",
"--tag", "multiple tags should be ok"
"--max-workers=50",
"--max-num-splits=10000",
"--tag=\u523a\u8eab means sashimi",
"--tag=multiple tags should be ok"
],
"checks": ["python3-cli", "python3-metadata"],
"disabled_tests": [
"S3FailureTest"
]
],
"executors": ["cli", "api"]
},
{
"name": "python3-batch",
Expand All @@ -140,10 +144,10 @@
"METAFLOW_DEFAULT_METADATA": "service"
},
"run_options": [
"--max-workers", "50",
"--max-num-splits", "10000",
"--tag", "\u523a\u8eab means sashimi",
"--tag", "multiple tags should be ok"
"--max-workers=50",
"--max-num-splits=10000",
"--tag=\u523a\u8eab means sashimi",
"--tag=multiple tags should be ok"
],
"checks": ["python3-cli", "python3-metadata"],
"disabled_tests": [
Expand All @@ -156,7 +160,8 @@
"TimeoutDecoratorTest",
"CardExtensionsImportTest",
"RunIdFileTest"
]
],
"executors": ["cli", "api"]
},
{
"name": "python3-k8s",
Expand All @@ -177,10 +182,10 @@
"METAFLOW_DEFAULT_METADATA": "service"
},
"run_options": [
"--max-workers", "50",
"--max-num-splits", "10000",
"--tag", "\u523a\u8eab means sashimi",
"--tag", "multiple tags should be ok"
"--max-workers=50",
"--max-num-splits=10000",
"--tag=\u523a\u8eab means sashimi",
"--tag=multiple tags should be ok"
],
"checks": ["python3-cli", "python3-metadata"],
"disabled_tests": [
Expand All @@ -193,7 +198,8 @@
"TimeoutDecoratorTest",
"CardExtensionsImportTest",
"RunIdFileTest"
]
],
"executors": ["cli", "api"]
}
],
"checks": {
Expand Down