Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
KyleGoyette committed May 2, 2024
1 parent 2d2b25e commit 8659dd6
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -61,12 +61,13 @@ async def test_reconcile_max_concurrency(mocked_test_manager_reconile):


@pytest.mark.asyncio
async def test_reconcile_clear_unowned_item(mocked_test_manager_reconile):
async def test_reconcile_not_clear_unowned_item(mocked_test_manager_reconile):
mocked_test_manager_reconile.max_concurrency = 0
mocked_test_manager_reconile.active_runs = {"not-test-id": MagicMock()}
mocked_test_manager_reconile.release_item = MagicMock()
await mocked_test_manager_reconile.reconcile()
mocked_test_manager_reconile.release_item.assert_called_once_with("not-test-id")
# local process controller doesn't clear unowned items
assert mocked_test_manager_reconile.release_item.call_count == 0


@pytest.fixture
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import asyncio
from unittest.mock import MagicMock

import pytest
Expand All @@ -10,31 +11,34 @@ async def __call__(self, *args, **kwargs):


@pytest.fixture
def scheduler_controller(controller_config, jobset):
def scheduler_controller(controller_config, jobset, mocker):
mock_scheduler_manager = AsyncMock()
mock_scheduler_manager.active_runs = MagicMock()
mock_scheduler_manager.active_runs.return_value = {}
mock_scheduler_manager.launch_scheduler_item = AsyncMock()

mock_queue = AsyncMock()
mock_queue.get.return_value = MagicMock()
scheduler_manager = SchedulerController(
scheduler_controller = SchedulerController(
mock_scheduler_manager, 1, mock_queue, MagicMock()
)
return scheduler_manager
return scheduler_controller


@pytest.mark.asyncio
async def test_scheduler_controller_poll(scheduler_controller):
await scheduler_controller.poll()
assert scheduler_controller._scheduler_jobs_queue.get.called_once()
assert scheduler_controller._controller.launch_scheduler_item.called_once()
await asyncio.sleep(1)
assert scheduler_controller._scheduler_jobs_queue.get.call_count == 1
assert scheduler_controller._manager.launch_scheduler_item.call_count == 1


@pytest.mark.asyncio
async def test_scheduler_controller_poll_max_jobs(scheduler_controller, capsys):
scheduler_controller._scheduler_jobs_queue.get.return_value = MagicMock()
scheduler_controller._controller.active_runs = {1: MagicMock()}
scheduler_controller._manager.active_runs = {1: MagicMock()}
await scheduler_controller.poll()
assert scheduler_controller._controller.launch_scheduler_item.call_count == 0
await asyncio.sleep(1)
assert scheduler_controller._manager.launch_scheduler_item.call_count == 0
captured = capsys.readouterr()
assert "Agent already running the maximum number" in captured.err
10 changes: 5 additions & 5 deletions wandb/sdk/launch/agent2/controllers/scheduler_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,12 @@ async def scheduler_process_controller(
class SchedulerController:
def __init__(
self,
controller: LocalProcessManager,
manager: LocalProcessManager,
max_schedulers: int,
scheduler_jobs_queue: "asyncio.Queue[JobWithQueue]",
logger: logging.Logger,
):
self._controller = controller
self._manager = manager
self._scheduler_jobs_queue = scheduler_jobs_queue
self._logger = logger
self._max_schedulers = max_schedulers
Expand All @@ -61,13 +61,12 @@ async def poll(self):
"this value use `max_schedulers` key in the agent config"
)
return
asyncio.create_task(self._controller.launch_scheduler_item(job))
self._scheduler_jobs_queue.task_done()
asyncio.create_task(self._manager.launch_scheduler_item(job))
self._logger.info(f"Launched scheduler job: {job}")

@property
def active_runs(self):
return self._controller.active_runs
return self._manager.active_runs


class SchedulerManager(LocalProcessManager):
Expand All @@ -92,6 +91,7 @@ async def ack_run_queue_item(self, queue_item: str, run_id: str):

async def launch_scheduler_item(self, item: JobWithQueue) -> Optional[str]:
self.logger.info(f"Launching item: {item}")
print("LAUNCHING SCHEDULER")
project = self._populate_project(item)
project.fetch_and_validate_project()

Check warning on line 96 in wandb/sdk/launch/agent2/controllers/scheduler_controller.py

View check run for this annotation

Codecov / codecov/patch

wandb/sdk/launch/agent2/controllers/scheduler_controller.py#L93-L96

Added lines #L93 - L96 were not covered by tests

Expand Down

0 comments on commit 8659dd6

Please sign in to comment.