Skip to content

Commit

Permalink
[Executor] Add timeout task to async scheduler to achieve timeout fea…
Browse files Browse the repository at this point in the history
…ture. (#3176)

# Description
Improve the timeout handling of async scheduler.

Before:
The async nodes would be keep running until the process is killed.

After:
The async function would got an asyncio.CanceledError, then the line
raises LineExecutionTimeoutError.

This pull request introduces changes in the
`src/promptflow-core/promptflow/executor/_async_nodes_scheduler.py` and
`src/promptflow-core/promptflow/executor/flow_executor.py` files to
handle line execution timeouts. The `Optional` type has been imported
and a `timeout_seconds` parameter has been added to the `execute` and
`_execute_with_thread_pool` methods. A timeout mechanism has been
implemented in `_execute_with_thread_pool` to cancel tasks that exceed
the specified timeout. A `cancel_tasks` method has also been added to
cancel tasks and yield control to the event loop for cleanup. The
`LineExecutionTimeoutError` exception is raised when a timeout occurs.

Changes to handle line execution timeouts:

*
[`src/promptflow-core/promptflow/executor/_async_nodes_scheduler.py`](diffhunk://#diff-cd6772e1603e398a564a4d2768ee2f7ffdf7c5e3e0bb7cda5bf19e560f143901L14-R14):
Imported `Optional` type and added `timeout_seconds` parameter to
`execute` and `_execute_with_thread_pool` methods. Implemented timeout
mechanism in `_execute_with_thread_pool` and added `cancel_tasks` method
to cancel tasks and yield control to the event loop for cleanup. Raised
`LineExecutionTimeoutError` when timeout occurs.
[[1]](diffhunk://#diff-cd6772e1603e398a564a4d2768ee2f7ffdf7c5e3e0bb7cda5bf19e560f143901L14-R14)
[[2]](diffhunk://#diff-cd6772e1603e398a564a4d2768ee2f7ffdf7c5e3e0bb7cda5bf19e560f143901L23-R23)
[[3]](diffhunk://#diff-cd6772e1603e398a564a4d2768ee2f7ffdf7c5e3e0bb7cda5bf19e560f143901R47)
[[4]](diffhunk://#diff-cd6772e1603e398a564a4d2768ee2f7ffdf7c5e3e0bb7cda5bf19e560f143901L78-R79)
[[5]](diffhunk://#diff-cd6772e1603e398a564a4d2768ee2f7ffdf7c5e3e0bb7cda5bf19e560f143901R92-R106)
[[6]](diffhunk://#diff-cd6772e1603e398a564a4d2768ee2f7ffdf7c5e3e0bb7cda5bf19e560f143901R116-R124)
*
[`src/promptflow-core/promptflow/executor/flow_executor.py`](diffhunk://#diff-bec06607cb28fd791b8ed11bb488979344ca342be5f1c67ba6dd663d5e12240fL1200-R1200):
Passed `self._line_timeout_sec` as `timeout_seconds` to
`scheduler.execute` method.

# All Promptflow Contribution checklist:
- [ ] **The pull request does not introduce [breaking changes].**
- [ ] **CHANGELOG is updated for new features, bug fixes or other
significant changes.**
- [ ] **I have read the [contribution guidelines](../CONTRIBUTING.md).**
- [ ] **Create an issue and link to the pull request to get dedicated
review from promptflow team. Learn more: [suggested
workflow](../CONTRIBUTING.md#suggested-workflow).**

## General Guidelines and Best Practices
- [ ] Title of the pull request is clear and informative.
- [ ] There are a small number of commits, each of which have an
informative message. This means that previously merged commits do not
appear in the history of the PR. For more information on cleaning up the
commits in your PR, [see this
page](https://github.com/Azure/azure-powershell/blob/master/documentation/development-docs/cleaning-up-commits.md).

### Testing Guidelines
- [ ] Pull request includes test coverage for the included changes.

---------

Co-authored-by: Heyi <[email protected]>
  • Loading branch information
thy09 and Heyi committed May 13, 2024
1 parent c11438f commit 4489ca7
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 5 deletions.
26 changes: 22 additions & 4 deletions src/promptflow-core/promptflow/executor/_async_nodes_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
import traceback
from asyncio import Task
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Dict, List, Tuple
from typing import Any, Dict, List, Optional, Tuple

from promptflow._core.flow_execution_context import FlowExecutionContext
from promptflow._core.tools_manager import ToolsManager
Expand All @@ -20,7 +20,7 @@
from promptflow._utils.utils import extract_user_frame_summaries, set_context, try_get_long_running_logging_interval
from promptflow.contracts.flow import Node
from promptflow.executor._dag_manager import DAGManager
from promptflow.executor._errors import NoNodeExecutedError
from promptflow.executor._errors import LineExecutionTimeoutError, NoNodeExecutedError

PF_ASYNC_NODE_SCHEDULER_EXECUTE_TASK_NAME = "_pf_async_nodes_scheduler.execute"
DEFAULT_TASK_LOGGING_INTERVAL = 60
Expand All @@ -44,6 +44,7 @@ async def execute(
nodes: List[Node],
inputs: Dict[str, Any],
context: FlowExecutionContext,
timeout_seconds: Optional[int] = None,
) -> Tuple[dict, dict]:
# Semaphore should be created in the loop, otherwise it will not work.
loop = asyncio.get_running_loop()
Expand Down Expand Up @@ -75,7 +76,7 @@ async def execute(
# Then the event loop will wait for all tasks to be completed before raising the cancellation error.
# See reference: https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.Executor
try:
outputs = await self._execute_with_thread_pool(executor, nodes, inputs, context)
outputs = await self._execute_with_thread_pool(executor, nodes, inputs, context, timeout_seconds)
except asyncio.CancelledError:
await self.cancel()
raise
Expand All @@ -88,12 +89,21 @@ async def _execute_with_thread_pool(
nodes: List[Node],
inputs: Dict[str, Any],
context: FlowExecutionContext,
timeout_seconds: Optional[int] = None,
) -> Tuple[dict, dict]:
flow_logger.info(f"Start to run {len(nodes)} nodes with the current event loop.")
start_time = time.time()
dag_manager = DAGManager(nodes, inputs)
task2nodes = self._execute_nodes(dag_manager, context, executor)
while not dag_manager.completed():
task2nodes = await self._wait_and_complete_nodes(task2nodes, dag_manager)
remaining_timeout = None if timeout_seconds is None else timeout_seconds - (time.time() - start_time)
task = asyncio.create_task(self._wait_and_complete_nodes(task2nodes, dag_manager))
try:
task2nodes = await asyncio.wait_for(task, remaining_timeout)
except asyncio.TimeoutError:
flow_logger.warning(f"Line execution timeout after {timeout_seconds} seconds.")
await self.cancel_tasks(task2nodes.keys())
raise LineExecutionTimeoutError(context._line_number, timeout_seconds)
submitted_tasks2nodes = self._execute_nodes(dag_manager, context, executor)
task2nodes.update(submitted_tasks2nodes)
# Set the event to notify the monitor thread to exit
Expand All @@ -103,6 +113,14 @@ async def _execute_with_thread_pool(
dag_manager.completed_nodes_outputs[node] = None
return dag_manager.completed_nodes_outputs, dag_manager.bypassed_nodes

async def cancel_tasks(self, tasks: List[asyncio.Task]):
for task in tasks:
if not task.done():
task.cancel()
cancel_timeout = 1
# Wait at most 1 second for the tasks to cleanup
await asyncio.wait(tasks, return_when=asyncio.ALL_COMPLETED, timeout=cancel_timeout)

async def _wait_and_complete_nodes(self, task2nodes: Dict[Task, Node], dag_manager: DAGManager) -> Dict[Task, Node]:
if not task2nodes:
raise NoNodeExecutedError("No nodes are ready for execution, but the flow is not completed.")
Expand Down
2 changes: 1 addition & 1 deletion src/promptflow-core/promptflow/executor/flow_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -1208,7 +1208,7 @@ async def _traverse_nodes_async(self, inputs, context: FlowExecutionContext) ->
batch_nodes = [node for node in self._flow.nodes if not node.aggregation]
flow_logger.info("Start executing nodes in async mode.")
scheduler = AsyncNodesScheduler(self._tools_manager, self._node_concurrency)
nodes_outputs, bypassed_nodes = await scheduler.execute(batch_nodes, inputs, context)
nodes_outputs, bypassed_nodes = await scheduler.execute(batch_nodes, inputs, context, self._line_timeout_sec)
outputs = self._extract_outputs(nodes_outputs, bypassed_nodes, inputs)
return outputs, nodes_outputs

Expand Down

0 comments on commit 4489ca7

Please sign in to comment.