Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Bugfix][Executor] Fix the issue of duplicate line numbers in the chat group run scenario #3145

Merged
merged 33 commits into from
May 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
eaf08b3
Add is_chat_group_run
PeiwenGaoMS May 8, 2024
1fd90f3
Add is_chat_group_run to ChatGroupOrchestrator
PeiwenGaoMS May 8, 2024
a1107a1
Add is_chat_group_run to SingleLinePythonExecutorProxy
PeiwenGaoMS May 8, 2024
997b144
Put real_line_number to input_queue
PeiwenGaoMS May 8, 2024
a18c491
Add some comments to _process_line_number and _get_line_number
PeiwenGaoMS May 8, 2024
a706037
Add comments and rename _retrieve_line_number
PeiwenGaoMS May 8, 2024
4bb7592
Update comments
PeiwenGaoMS May 8, 2024
b685cd0
Update origin_line_number
PeiwenGaoMS May 8, 2024
86b59a5
Use request_id as the key of result dict
PeiwenGaoMS May 8, 2024
fc33b42
Remove parameter is_chat_group_run
PeiwenGaoMS May 8, 2024
4ae830c
Update type of result dict
PeiwenGaoMS May 8, 2024
825e296
Change get to pop when getting line result from result dict
PeiwenGaoMS May 8, 2024
b60ce5d
Merge branch 'main' of https://github.com/microsoft/promptflow into d…
PeiwenGaoMS May 8, 2024
48d8c09
Merge branch 'main' into devs/peiwen/fix_chatgroup_run
May 9, 2024
bf75c70
update recording
May 9, 2024
ef4d6c5
update record again
May 9, 2024
8414f23
Merge branch 'main' into devs/peiwen/fix_chatgroup_run
PeiwenGaoMS May 10, 2024
afbaa7c
Merge branch 'main' of https://github.com/microsoft/promptflow into d…
chw-microsoft May 10, 2024
99e696b
Merge branch 'main' into devs/peiwen/fix_chatgroup_run
PeiwenGaoMS May 10, 2024
b2de23b
update shelve files
chw-microsoft May 10, 2024
b968450
revert changes
chw-microsoft May 10, 2024
e0e8ff5
update shelve file
chw-microsoft May 10, 2024
d06ec6e
Merge branch 'main' into devs/peiwen/fix_chatgroup_run
PeiwenGaoMS May 11, 2024
086c569
Merge branch 'main' into devs/peiwen/fix_chatgroup_run
PeiwenGaoMS May 11, 2024
3abdc21
update the shelve with recording
chw-microsoft May 11, 2024
47922fe
Merge branch 'devs/peiwen/fix_chatgroup_run' of https://github.com/mi…
chw-microsoft May 11, 2024
b233cf7
Merge branch 'main' into devs/peiwen/fix_chatgroup_run
PeiwenGaoMS May 11, 2024
5a5a34c
Merge branch 'main' into devs/peiwen/fix_chatgroup_run
PeiwenGaoMS May 11, 2024
f5463cb
update the recording for test_chat_group_batch_run
chw-microsoft May 13, 2024
c19e277
Merge branch 'main' into devs/peiwen/fix_chatgroup_run
PeiwenGaoMS May 13, 2024
36cd794
Merge branch 'main' into devs/peiwen/fix_chatgroup_run
PeiwenGaoMS May 13, 2024
88fbbf1
Remove recording for now
PeiwenGaoMS May 13, 2024
9b639b8
Revert recording change
PeiwenGaoMS May 13, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import signal
import sys
import threading
import uuid
from contextlib import nullcontext
from datetime import datetime
from functools import partial
Expand Down Expand Up @@ -118,7 +119,7 @@ def __init__(
self._serialize_multimedia_during_execution = serialize_multimedia_during_execution

# Initialize the results dictionary that stores line results.
self._result_dict: Dict[int, LineResult] = {}
self._result_dict: Dict[str, LineResult] = {}

# Initialize some fields from flow_executor and construct flow_create_kwargs
self._flow_id = flow_executor._flow_id
Expand Down Expand Up @@ -250,11 +251,12 @@ def close(self):

async def submit(self, run_id: str, line_number: int, inputs: dict):
"""Submit a line execution request to the process pool and return the line result."""
self._task_queue.put((run_id, line_number, inputs))
request_id = get_request_id()
self._task_queue.put((request_id, run_id, line_number, inputs))
start_time = datetime.utcnow()
line_result = None
while not self._line_timeout_expired(start_time, buffer_sec=20) and not line_result:
line_result = self._result_dict.get(line_number, None)
line_result = self._result_dict.pop(request_id, None)
# Check monitor status every 1 second
self._monitor_thread_pool_status()
await asyncio.sleep(1)
Expand All @@ -279,6 +281,7 @@ async def run(self, batch_inputs) -> List[LineResult]:
for index, inputs in batch_inputs:
self._task_queue.put(
(
get_request_id(),
self._run_id,
index,
inputs,
Expand Down Expand Up @@ -322,14 +325,14 @@ async def run(self, batch_inputs) -> List[LineResult]:
# Set the timeout flag to True and log the warning.
self._is_timeout = True
bulk_logger.warning(f"The batch run timed out, with {len(self._result_dict)} line results processed.")
return [self._result_dict[key] for key in sorted(self._result_dict)]
return [line_result for line_result in sorted(self._result_dict.values(), key=lambda item: item.run_info.index)]

# region monitor thread target function

def _monitor_workers_and_process_tasks_in_thread(
self,
task_queue: Queue,
result_dict: Dict[int, LineResult],
result_dict: Dict[str, LineResult],
index: int,
input_queue: Queue,
output_queue: Queue,
Expand Down Expand Up @@ -364,7 +367,7 @@ def _monitor_workers_and_process_tasks_in_thread(
terminated = True
else:
# If the task is a line execution request, put the request into the input queue.
run_id, line_number, inputs = data
request_id, run_id, line_number, inputs = data
args = (run_id, line_number, inputs, line_timeout_sec)
input_queue.put(args)

Expand All @@ -388,7 +391,7 @@ def _monitor_workers_and_process_tasks_in_thread(
# Handle output queue message.
message = self._handle_output_queue_messages(output_queue)
if isinstance(message, LineResult):
result_dict[line_number] = message
result_dict[request_id] = message
completed = True
break
if isinstance(message, NodeRunInfo):
Expand Down Expand Up @@ -438,7 +441,7 @@ def _monitor_workers_and_process_tasks_in_thread(
ex,
returned_node_run_infos,
)
result_dict[line_number] = result
result_dict[request_id] = result

self._completed_idx[line_number] = format_current_process_info(process_name, process_id, line_number)
log_process_status(process_name, process_id, line_number, is_failed=True)
Expand Down Expand Up @@ -842,4 +845,12 @@ def format_current_process_info(process_name, pid, line_number: int):
return f"Process name({process_name})-Process id({pid})-Line number({line_number})"


def get_request_id() -> str:
    """
    Generate a unique identifier for a single line-execution request.

    Every input submitted to the line process pool is treated as one
    request; the returned id is used as the key for result_dict so that
    duplicate line numbers (e.g. in a chat group run) cannot collide.
    """
    # uuid4 is random, so ids are unique across concurrent submissions.
    fresh_id = uuid.uuid4()
    return str(fresh_id)


# endregion
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ def __init__(self, name: str, output_path: Path):
self.flow = None


@pytest.mark.usefixtures("use_secrets_config_file", "dev_connections", "recording_injection")
@pytest.mark.usefixtures("use_secrets_config_file", "dev_connections")
@pytest.mark.e2etest
class TestBatch:
def test_batch_storage(self, dev_connections):
Expand Down