Use distributed fixture in tests #3070

Draft: wants to merge 2 commits into base: master
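For context: the per-backend tests below (gloo, nccl, multi-node and XLA variants) are collapsed into single test functions that take a `distributed` fixture and read rank and device from `ignite.distributed` (`idist`). The fixture itself lives in conftest.py and is not shown in this excerpt; the following is only a minimal sketch of what such a fixture could look like, assuming the rendezvous environment (MASTER_ADDR, MASTER_PORT, RANK, WORLD_SIZE) is already provided by the launcher, e.g. torchrun.

    # Hypothetical conftest.py sketch, not the fixture added by this PR.
    # It initializes the requested native backend around each test and
    # relies on the launcher to provide the rendezvous environment variables.
    import pytest
    import torch

    import ignite.distributed as idist


    @pytest.fixture(
        params=[
            "gloo",
            pytest.param(
                "nccl",
                marks=pytest.mark.skipif(torch.cuda.device_count() < 1, reason="Skip if no GPU"),
            ),
        ]
    )
    def distributed(request):
        # Set up the backend for the duration of one test, then tear it down.
        idist.initialize(request.param)
        yield request.param
        idist.finalize()

With a fixture like this, backend selection and skip conditions live in one place instead of being repeated on every test.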
37 changes: 2 additions & 35 deletions tests/ignite/contrib/engines/test_common.py
@@ -642,21 +642,9 @@ def test_setup_neptune_logging(dirname):
     npt_logger.close()
 
 
-@pytest.mark.distributed
-@pytest.mark.skipif(not idist.has_native_dist_support, reason="Skip if no native dist support")
-@pytest.mark.skipif(torch.cuda.device_count() < 1, reason="Skip if no GPU")
-def test_distrib_nccl_gpu(dirname, distributed_context_single_node_nccl):
-    local_rank = distributed_context_single_node_nccl["local_rank"]
-    device = idist.device()
-    _test_setup_common_training_handlers(dirname, device, rank=local_rank, local_rank=local_rank, distributed=True)
-    test_add_early_stopping_by_val_score()
-
-
-@pytest.mark.distributed
-@pytest.mark.skipif(not idist.has_native_dist_support, reason="Skip if no native dist support")
-def test_distrib_gloo_cpu_or_gpu(dirname, distributed_context_single_node_gloo):
+def test_distrib_training_handlers(distributed, dirname):
+    local_rank = idist.get_local_rank()
     device = idist.device()
-    local_rank = distributed_context_single_node_gloo["local_rank"]
     _test_setup_common_training_handlers(dirname, device, rank=local_rank, local_rank=local_rank, distributed=True)
     _test_setup_common_training_handlers(
         dirname, device, rank=local_rank, local_rank=local_rank, distributed=True, lr_scheduler="ignite|LRScheduler"
@@ -665,24 +653,3 @@ def test_distrib_gloo_cpu_or_gpu(dirname, distributed_context_single_node_gloo):
         dirname, device, rank=local_rank, local_rank=local_rank, distributed=True, lr_scheduler="ignite"
     )
     test_add_early_stopping_by_val_score()
-
-
-@pytest.mark.multinode_distributed
-@pytest.mark.skipif(not idist.has_native_dist_support, reason="Skip if no native dist support")
-@pytest.mark.skipif("MULTINODE_DISTRIB" not in os.environ, reason="Skip if not multi-node distributed")
-def test_multinode_distrib_gloo_cpu_or_gpu(dirname, distributed_context_multi_node_gloo):
-    device = idist.device()
-    rank = distributed_context_multi_node_gloo["rank"]
-    _test_setup_common_training_handlers(dirname, device, rank=rank)
-    test_add_early_stopping_by_val_score()
-
-
-@pytest.mark.multinode_distributed
-@pytest.mark.skipif(not idist.has_native_dist_support, reason="Skip if no native dist support")
-@pytest.mark.skipif("GPU_MULTINODE_DISTRIB" not in os.environ, reason="Skip if not multi-node distributed")
-def test_multinode_distrib_nccl_gpu(dirname, distributed_context_multi_node_nccl):
-    local_rank = distributed_context_multi_node_nccl["local_rank"]
-    rank = distributed_context_multi_node_nccl["rank"]
-    device = idist.device()
-    _test_setup_common_training_handlers(dirname, device, rank=rank, local_rank=local_rank, distributed=True)
-    test_add_early_stopping_by_val_score()
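Note that the consolidated test above no longer unpacks `local_rank` from a backend-specific context dict; it queries the `idist` helpers, which resolve against whatever distributed configuration is currently active (and fall back to serial defaults when none is). A standalone illustration of those helpers, separate from the diff:

    # Illustration only: the idist helpers used by the rewritten test.
    # Without an initialized process group they return the serial defaults
    # (rank 0, local rank 0, world size 1, CPU or GPU device).
    import ignite.distributed as idist

    if __name__ == "__main__":
        print("rank:", idist.get_rank())
        print("local rank:", idist.get_local_rank())
        print("world size:", idist.get_world_size())
        print("device:", idist.device())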
48 changes: 3 additions & 45 deletions tests/ignite/contrib/handlers/test_clearml_logger.py
@@ -907,7 +907,9 @@ def forward(self, x):
         return self.net(x)
 
 
-def _test_save_model_optimizer_lr_scheduler_with_state_dict(device, on_zero_rank=False):
+@pytest.mark.parametrize("on_zero_rank", [True, False])
+def test_distrib_save_model_optimizer_lr_scheduler_with_state_dict(distributed, on_zero_rank):
+    device = idist.device()
     if idist.get_rank() == 0:
         clearml.Task.current_task = MagicMock(spec=clearml.Task)
         clearml.binding.frameworks.WeightsFileHandler.create_output_model = MagicMock()
@@ -991,47 +993,3 @@ def update_fn(engine, batch):
         lr_scheduler_value = lr_scheduler_state_dict[key]
         loaded_lr_scheduler_value = loaded_lr_scheduler_state_dict[key]
         assert lr_scheduler_value == loaded_lr_scheduler_value
-
-
-@pytest.mark.distributed
-@pytest.mark.skipif(not idist.has_native_dist_support, reason="Skip if no native dist support")
-def test_distrib_gloo_cpu_or_gpu(distributed_context_single_node_gloo):
-    device = idist.device()
-    _test_save_model_optimizer_lr_scheduler_with_state_dict(device)
-    _test_save_model_optimizer_lr_scheduler_with_state_dict(device, on_zero_rank=True)
-
-
-@pytest.mark.distributed
-@pytest.mark.skipif(not idist.has_native_dist_support, reason="Skip if no native dist support")
-@pytest.mark.skipif(torch.cuda.device_count() < 1, reason="Skip if no GPU")
-def test_distrib_nccl_gpu(distributed_context_single_node_nccl):
-    device = idist.device()
-    _test_save_model_optimizer_lr_scheduler_with_state_dict(device)
-    _test_save_model_optimizer_lr_scheduler_with_state_dict(device, on_zero_rank=True)
-
-
-@pytest.mark.tpu
-@pytest.mark.skipif("NUM_TPU_WORKERS" in os.environ, reason="Skip if NUM_TPU_WORKERS is in env vars")
-@pytest.mark.skipif(not idist.has_xla_support, reason="Not on TPU device")
-def test_distrib_single_device_xla():
-    device = idist.device()
-    assert "xla" in device.type
-    _test_save_model_optimizer_lr_scheduler_with_state_dict(device)
-
-
-def _test_save_model_optimizer_lr_scheduler_with_state_dict_xla_nprocs(index):
-    device = idist.device()
-    _test_save_model_optimizer_lr_scheduler_with_state_dict(device)
-
-    import time
-
-    # hack to have all proc properly sync:
-    time.sleep(1)
-
-
-@pytest.mark.tpu
-@pytest.mark.skipif("NUM_TPU_WORKERS" not in os.environ, reason="Skip if NUM_TPU_WORKERS is in env vars")
-@pytest.mark.skipif(not idist.has_xla_support, reason="Not on TPU device")
-def test_distrib_single_device_xla_nprocs(xmp_executor):
-    n = int(os.environ["NUM_TPU_WORKERS"])
-    xmp_executor(_test_save_model_optimizer_lr_scheduler_with_state_dict_xla_nprocs, args=(), nprocs=n)
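In the clearml changes above, the two explicit calls with `on_zero_rank=False` and `on_zero_rank=True` are replaced by `@pytest.mark.parametrize`, so each value runs as its own test case. A self-contained sketch of the same pattern with a stand-in function (names are hypothetical):

    # Illustration of the parametrize pattern; save_checkpoint is a stand-in,
    # not a function from the ignite test suite.
    import pytest


    def save_checkpoint(on_zero_rank: bool) -> str:
        return "rank0-only" if on_zero_rank else "all-ranks"


    @pytest.mark.parametrize("on_zero_rank", [True, False])
    def test_save_checkpoint(on_zero_rank):
        expected = "rank0-only" if on_zero_rank else "all-ranks"
        assert save_checkpoint(on_zero_rank) == expected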