Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

initial runner api #1732

Open
wants to merge 47 commits into
base: master
Choose a base branch
from
Open

initial runner api #1732

wants to merge 47 commits into from

Conversation

madhur-ob
Copy link
Collaborator

@madhur-ob madhur-ob commented Feb 15, 2024

The Usage looks something like this:

from metaflow import metaflow_runner

params = [{'alpha': 40}, {'alpha': 20}, {'alpha': 10}, {'alpha': 30}]
with metaflow_runner.Pool('../try.py', num_processes=2) as pool:
    results = pool.map(params)
    for i in results:
        print(i)

where the contents of try.py for the example are:

import time
from metaflow import FlowSpec, Parameter, step

class ParameterFlow(FlowSpec):
    alpha = Parameter('alpha',
                      help='Time to Sleep',
                      default=1)

    @step
    def start(self):
        print('alpha is %f' % self.alpha)
        print(f"Let me sleep for {self.alpha} seconds")
        time.sleep(self.alpha)
        self.next(self.end)

    @step
    def end(self):
        print('alpha is still %f' % self.alpha)

if __name__ == '__main__':
    ParameterFlow()

In the above example, the runs are executed 2 at a time. The order of them being completed is:

  • run that sleeps for 20 seconds
  • run that sleeps for 10 seconds
  • run that sleeps for 40 seconds
  • run that sleeps for 30 seconds

The total running time is 60 seconds. This is as follows:

  • 40 seconds that includes
    • run for 20 seconds
    • run for 10 seconds
    • 10 seconds of the run for 30 seconds
  • last 20 seconds of the run for 30 seconds

Assumptions:

  • Considering the use-case of Controlling and not Authoring metaflow runs... at least for now...
  • Only accept a file which contains the flow, and not an instance of the class which is derived from FlowSpec
  • Multiple concurrent runs can be spin off with the API

Things that need to be discussed

  • What other stuff should the API accept? (Eg: more environment variables, other options such as --with, etc.)
  • This is only the API for Pool, what about just Run?
  • What should a successful run return? Currently, we return the status code...
    • If the run is supposed to produce some data / artefact, how can we pass that further?
      Eg: this API is used as a step in some flow to produce an artefact which is consumed and used by another step
  • Error handling / Crashing aspects...

@romain-intel
Copy link
Contributor

I'm not going to comment on the implementation (which is fairly straightforward at this time anyways) because I want to align on some UX things first.

These are my initial thoughts on this but feel free to disagree with anything:

  • I think for the initial scope, we can restrict to basically the run command (so python myflow.py ... run ...). We should, however, try to design a UX that is then extensible to other command line options (resume, dump, step-functions, etc.) that are "flow based" and we can then later also take care of the "metaflow xyz" command line (although that one can probably be addressed in a different way so I am excluding it from too much consideration for now.
  • It doesn't feel like the current UX would extend to these other options. It is not so much a UX wrapping the "run" command but more an abstraction even on top of that that takes care of parallel executions of flows. In other words, I think the concept of having something like with Pool(...) is great, it should probably be built on top of a more direct overlay on the "run" CLI.
  • One aspect that I feel may be important to capture is the hierarchical nature of the command line (or at least it would be closer to what click does which means it may be easier to map functionality. For example, one option could be the following:
mf_cli = MetaflowCli(flow_file="myflow.py", environment="conda")
mf_cli.run(max_workers=5, alpha=123)

Not saying something exactly like this but this more cleanly maps to what we have for click and doesn't feel too un-natural (you have a Cli object -- or call it something else -- and from there you can run with various options exactly like you would on the command line). We can separately argue if we want to infer "environment" for example but that's a separate conversation :). Anyways, hopefully this shows the idea of having a multi level approach that maps more closely to click and can support any arbitrary options. It could be a very thin layer and should be reusable for various commands.

  • I also think that mf_cli.run should return immediately something akin to a future but also akin to the client's Run object. We could (with some modifications) actually return a Run object and you could do something like this:
my_run = mf_cli.run(max_workers=5, alpha=123)
while not my_run.finished
  ...

(note this is probably not a good idea since it can loop forever but you get the idea). We should also seriously revisit some of these status things (we had a long conversation in the past about this but it may become more useful as part of this as well -- for example, we can actually return a better "finished" value here because we know if the subprocess dies/goes away and so it may not run forever :)).
You could also get the log, etc. We could consider adding other functions to list things like "in process steps" for example or what not. Basically it's an object that has value during execution as well as after. If we want to use a recent analogy, it's like a "dynamic card" (it updates through the execution and is then frozen at the end of execution.

  • With something like that, it then becomes quite easy to write a Pool abstraction on top of it by having the async workers do something akin to what is above and just loop around until it is finished. As discussed, Pool should also return Run objects.

@madhur-ob
Copy link
Collaborator Author

madhur-ob commented Feb 21, 2024

The usage looks something like this, we can of-course pass blocking as True if we wanna wait...

with metaflow_runner.Runner('../try.py',  metadata="local") as runner:
    result = runner.run(blocking=False, alpha=5, tags=["abc", "def"], max_workers=5)
    print(result)
    while not result.finished:
        print("waiting for run to finish...")
        time.sleep(0.1)

or perhaps...

mf = metaflow_runner.Runner('../try.py', metadata="local")
result = mf.run(blocking=False, alpha=5, tags=["abc", "def"], max_workers=5)
print(result)
while not result.finished:
    print("waiting for run to finish...")
    time.sleep(0.1)

All this is built on top of a low-level chaining API that is dynamically constructed while traversing through the CLI command tree... Please see the following for usage:

from metaflow.click_api import MetaflowAPI
from metaflow.cli import start
    
api = MetaflowAPI.from_cli("../try.py", start)

command = api(metadata="local").run(
    tags=["abc", "def"],
    decospecs=["kubernetes"],
    max_workers=5,
    alpha=3,
    myfile="path/to/file",
)
print(command)

command = (
    api(metadata="local")
    .kubernetes()
    .step(
        step_name="process",
        code_package_sha="some_sha",
        code_package_url="some_url",
    )
)
print(command)

command = api().tag().add(tags=["abc", "def"])
print(command)

command = getattr(api(decospecs=["retry"]), "argo-workflows")().create()
print(command)

@madhur-ob
Copy link
Collaborator Author

Pushed out a subprocess manager that allows us to tail logs via streaming them into a queue first...
The higher level runner API uses this subprocess manager...

An example of how the higher level UX is as follows:

import asyncio
from metaflow import metaflow_runner

async def main():
    with metaflow_runner.Runner('../try.py',  metadata="local") as runner:
        result = await runner.run(blocking=False, alpha=5, tags=["abc", "def"], max_workers=5)
        print(result) # this is the Run object
        print(result.finished) # this should be False
        await runner.tail_logs(stream="stdout")
        print(result.finished) # this should be True

        # the below can also be done...

        # while not result.finished:
        #     print("waiting for run to finish...")
        #     `asyncio.sleep(0.1)` or perhaps `time.sleep(0.1)`

if __name__ == "__main__":
    asyncio.run(main())

the output looks like:

Run('ParameterFlow/1710272507865687')
False
2024-03-13 01:11:47.872
Workflow starting (run-id 1710272507865687):
2024-03-13 01:11:47.878 [1710272507865687/start/1 (pid 85339)] Task is starting.
2024-03-13 01:11:48.081 [1710272507865687/start/1 (pid 85339)] alpha is 5.000000
2024-03-13 01:11:53.112 [1710272507865687/start/1 (pid 85339)] Let me sleep for 5 seconds
2024-03-13 01:11:53.114
[1710272507865687/start/1 (pid 85339)] Task finished successfully.
2024-03-13 01:11:53.121 [1710272507865687/end/2 (pid 85381)] Task is starting.
2024-03-13 01:11:53.359 [1710272507865687/end/2 (pid 85381)] alpha is still 5.000000
2024-03-13 01:11:53.393 [1710272507865687/end/2 (pid 85381)] Task finished successfully.
2024-03-13 01:11:53.394 Done!
True

Copy link
Contributor

@romain-intel romain-intel left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just looked at the process manager for now.

metaflow/cli.py Outdated Show resolved Hide resolved
metaflow/subprocess_manager.py Outdated Show resolved Hide resolved
metaflow/subprocess_manager.py Outdated Show resolved Hide resolved
metaflow/subprocess_manager.py Show resolved Hide resolved
metaflow/subprocess_manager.py Outdated Show resolved Hide resolved
metaflow/subprocess_manager.py Outdated Show resolved Hide resolved
metaflow/subprocess_manager.py Outdated Show resolved Hide resolved
metaflow/subprocess_manager.py Show resolved Hide resolved
metaflow/click_api.py Show resolved Hide resolved
metaflow/click_api.py Outdated Show resolved Hide resolved
metaflow/click_api.py Outdated Show resolved Hide resolved
metaflow/click_api.py Outdated Show resolved Hide resolved
metaflow/click_api.py Show resolved Hide resolved
metaflow/click_api.py Show resolved Hide resolved
@madhur-ob
Copy link
Collaborator Author

The latest usage looks like this:

import time
import asyncio
from metaflow import metaflow_runner

async def main():
    async with metaflow_runner.Runner('../try.py',  metadata="local") as runner:
        # returns immediately
        result = await runner.run(alpha=5, tags=["abc", "def"], max_workers=5)

        # can access properties of metaflow's Run object..
        print(result.finished)
        while not result.finished:
            print("waiting for run to finish...")
            time.sleep(1)
        print(result.successful)

        # can access properties of the CommandManager as well..
        print(result.log_files)

        await result.wait()
        await result.wait(timeout=2)
        await result.wait(stream="stderr")
        await result.wait(stream="stderr", timeout=3)

        async for _, line in result.stream_logs(stream="stdout"):
            print(line)

if __name__ == "__main__":
    asyncio.run(main())

Copy link
Contributor

@romain-intel romain-intel left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Initial comments. WIll need a bit more time but looks promising :)

metaflow/subprocess_manager.py Outdated Show resolved Hide resolved
metaflow/subprocess_manager.py Show resolved Hide resolved
metaflow/subprocess_manager.py Outdated Show resolved Hide resolved
else:
line = await asyncio.wait_for(f.readline(), timeout_per_line)
except asyncio.TimeoutError as e:
raise LogReadTimeoutError(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we could also return the position and None?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So we would return:

  • the actual line
  • an empty line if EOF or just an actual empty line
  • None if a timeout.

I don't know if we want to distinguish the cases in 2.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can discuss for more clarity...

metaflow/subprocess_manager.py Outdated Show resolved Hide resolved
self.run_obj = run_obj

def __getattr__(self, name: str):
if hasattr(self.run_obj, name):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

need to think a bit more about this. We may want a more explicit control to make sure we expose what we want. It's also a bit harder to document this way.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can discuss more in the meeting...

@madhur-ob
Copy link
Collaborator Author

New Higher Level UX:

import time
import asyncio
from metaflow import metaflow_runner

if __name__ == "__main__":

    # sync version that calls run() directly
    with metaflow_runner.Runner('../try.py', metadata="local").run(alpha=5, tags=["abc", "def"], max_workers=5) as result:
        print(result.run.finished)
        print(result.stdout)

    # sync version using runner
    with metaflow_runner.Runner('../try.py', metadata="local") as runner:
        result = runner.run(alpha=5, tags=["abc", "def"], max_workers=5)
        print(result.run.finished)
        print(result.stdout)

    # async version that calls async_run() directly
    async def async_run_directly():
        with await metaflow_runner.Runner('../try.py', metadata="local").async_run(alpha=5, tags=["abc", "def"], max_workers=5) as result:
            print(result.run.data)
            print(result.run.finished)

            print(result.stdout)
            time.sleep(3)
            print(result.stdout)

            print((await result.wait()).stdout)
            # OR we can also do:
            #    await result.wait()
            #    print(result.stdout)

            while not result.run.finished:
                print("waiting for run to finish...")
                time.sleep(1)
            print(result.run.successful)

            print((await result.wait(timeout=3, stream="stdout")).run.data)
            print(result.run.finished)

            await result.wait(timeout=3, stream="stdout")

            async for _, line in result.stream_logs(stream="stdout"):
                print(line)

    asyncio.run(async_run_directly())

    # async version using runner
    async def async_runner():
        async with metaflow_runner.Runner('../try.py', metadata="local") as runner:
            result = await runner.async_run(alpha=5, tags=["abc", "def"], max_workers=5)

            print(result.run.data)
            print(result.run.finished)

            print(result.stdout)
            time.sleep(3)
            print(result.stdout)

            print((await result.wait()).stdout)
            # OR we can also do:
            #    await result.wait()
            #    print(result.stdout)

            while not result.run.finished:
                print("waiting for run to finish...")
                time.sleep(1)
            print(result.run.successful)

            print((await result.wait(timeout=3, stream="stdout")).run.data)
            print(result.run.finished)

            await result.wait(timeout=3, stream="stdout")

            async for _, line in result.stream_logs(stream="stdout"):
                print(line)

    asyncio.run(async_runner())

@madhur-ob madhur-ob changed the title initial runner map api initial runner api May 2, 2024
Copy link
Contributor

@romain-intel romain-intel left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I haven't checked all the logic in click_api.py but if it works for the runner, it should be good enough for now. We can improve as we go along and expose more of it.

metaflow/_vendor/vendor_any.txt Outdated Show resolved Hide resolved
metaflow/_vendor/vendor_v3_8.txt Outdated Show resolved Hide resolved
metaflow/metaflow_runner.py Show resolved Hide resolved
metaflow/metaflow_runner.py Show resolved Hide resolved
metaflow/metaflow_runner.py Show resolved Hide resolved
metaflow/subprocess_manager.py Outdated Show resolved Hide resolved
metaflow/subprocess_manager.py Outdated Show resolved Hide resolved
metaflow/subprocess_manager.py Outdated Show resolved Hide resolved
metaflow/click_api.py Outdated Show resolved Hide resolved
metaflow/click_api.py Outdated Show resolved Hide resolved
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

2 participants