Fix copy_files support with Parsl #1942

Merged · 19 commits · May 30, 2024
63 changes: 32 additions & 31 deletions docs/user/wflow_engine/executors.md
@@ -379,6 +379,7 @@ If you haven't done so already:
```python
import parsl
from parsl.config import Config
from parsl.dataflow.dependency_resolvers import DEEP_DEPENDENCY_RESOLVER
from parsl.executors import HighThroughputExecutor
from parsl.launchers import SrunLauncher
from parsl.providers import SlurmProvider
@@ -394,24 +395,25 @@ If you haven't done so already:
env_vars = f"export OMP_NUM_THREADS={cores_per_job},1" # (1)!

config = Config(
strategy="htex_auto_scale", # (2)!
dependency_resolver=DEEP_DEPENDENCY_RESOLVER, # (2)!
strategy="htex_auto_scale", # (3)!
executors=[
HighThroughputExecutor(
label="quacc_parsl", # (3)!
max_workers_per_node=cores_per_node, # (4)!
cores_per_worker=cores_per_job, # (5)!
label="quacc_parsl", # (4)!
max_workers_per_node=cores_per_node, # (5)!
cores_per_worker=cores_per_job, # (6)!
provider=SlurmProvider(
account=account,
qos="debug",
constraint="cpu",
worker_init=f"source ~/.bashrc && conda activate quacc && {env_vars}", # (6)!
walltime="00:10:00", # (7)!
nodes_per_block=nodes_per_allocation, # (8)!
init_blocks=0, # (9)!
min_blocks=min_allocations, # (10)!
max_blocks=max_allocations, # (11)!
launcher=SrunLauncher(), # (12)!
cmd_timeout=60, # (13)!
worker_init=f"source ~/.bashrc && conda activate quacc && {env_vars}", # (7)!
walltime="00:10:00", # (8)!
nodes_per_block=nodes_per_allocation, # (9)!
init_blocks=0, # (10)!
min_blocks=min_allocations, # (11)!
max_blocks=max_allocations, # (12)!
launcher=SrunLauncher(), # (13)!
cmd_timeout=60, # (14)!
),
)
],
@@ -423,29 +425,31 @@

1. Since we are running single-core jobs, we need to set the `OMP_NUM_THREADS` environment variable to "1,1" according to the [TBLite documentation](https://tblite.readthedocs.io/en/latest/tutorial/parallel.html#running-tblite-in-parallel).

2. Unique to the `HighThroughputExecutor`, this `strategy` will automatically scale the number of active blocks (i.e. Slurm allocations) up or down based on the number of jobs remaining. We set `max_blocks=1` here so it can't scale up beyond 1 Slurm job, but it can scale down from 1 to 0 since `min_blocks=0`. By setting `init_blocks=0`, no Slurm allocation will be requested until jobs are launched.
2. This opt-in dependency resolver lets Parsl resolve futures nested inside data structures (such as the `copy_files` dictionary), which quacc needs for all of its features to be supported; see the sketch after this list.

3. This is just an arbitrary label for file I/O.
3. Unique to the `HighThroughputExecutor`, this `strategy` will automatically scale the number of active blocks (i.e. Slurm allocations) up or down based on the number of jobs remaining. We set `max_blocks=1` here so it can't scale up beyond 1 Slurm job, but it can scale down from 1 to 0 since `min_blocks=0`. By setting `init_blocks=0`, no Slurm allocation will be requested until jobs are launched.

4. The maximum number of running jobs per node. If you are running a non-MPI job, this value will generally be the number of physical cores per node (this example). Perlmutter has 128 physical CPU cores, so we have set a value of 128 here.
4. This is just an arbitrary label for file I/O.

5. The number of cores per job. We are running single-core jobs in this example.
5. The maximum number of running jobs per node. If you are running a non-MPI job, this value will generally be the number of physical cores per node (this example). Perlmutter has 128 physical CPU cores, so we have set a value of 128 here.

6. Any commands to run before carrying out any of the Parsl jobs. This is useful for setting environment variables, activating a given Conda environment, and loading modules.
6. The number of cores per job. We are running single-core jobs in this example.

7. The walltime for each block (i.e. Slurm allocation).
7. Any commands to run before carrying out any of the Parsl jobs. This is useful for setting environment variables, activating a given Conda environment, and loading modules.

8. The number of nodes that each block (i.e. Slurm allocation) should allocate.
8. The walltime for each block (i.e. Slurm allocation).

9. Sets the number of blocks (e.g. Slurm allocations) to provision during initialization of the workflow. We set this to a value of 0 so that there isn't a running Slurm job before any jobs have been submitted to Parsl.
9. The number of nodes that each block (i.e. Slurm allocation) should allocate.

10. Sets the minimum number of blocks (e.g. Slurm allocations) to maintain during [elastic resource management](https://parsl.readthedocs.io/en/stable/userguide/execution.html#elasticity). We set this to 0 so that Slurm jobs aren't running when there are no remaining jobs.
10. Sets the number of blocks (e.g. Slurm allocations) to provision during initialization of the workflow. We set this to a value of 0 so that there isn't a running Slurm job before any jobs have been submitted to Parsl.

11. Sets the maximum number of active blocks (e.g. Slurm allocations) during [elastic resource management](https://parsl.readthedocs.io/en/stable/userguide/execution.html#elasticity). We set this to 1 here, but it can be increased to have multiple Slurm jobs running simultaneously. Raising `max_blocks` to a larger value will allow the "htex_auto_scale" strategy to upscale resources as needed.
11. Sets the minimum number of blocks (e.g. Slurm allocations) to maintain during [elastic resource management](https://parsl.readthedocs.io/en/stable/userguide/execution.html#elasticity). We set this to 0 so that Slurm jobs aren't running when there are no remaining jobs.

12. The type of Launcher to use. `SrunLauncher()` will distribute jobs across the cores and nodes of the Slurm allocation. It should not be used for `PythonApp`s that themselves call MPI, which should use `SimpleLauncher()` instead.
12. Sets the maximum number of active blocks (e.g. Slurm allocations) during [elastic resource management](https://parsl.readthedocs.io/en/stable/userguide/execution.html#elasticity). We set this to 1 here, but it can be increased to have multiple Slurm jobs running simultaneously. Raising `max_blocks` to a larger value will allow the "htex_auto_scale" strategy to upscale resources as needed.

13. The maximum time to wait (in seconds) for the job scheduler info to be retrieved/sent.
13. The type of Launcher to use. `SrunLauncher()` will distribute jobs across the cores and nodes of the Slurm allocation. It should not be used for `PythonApp`s that themselves call MPI, which should use `SimpleLauncher()` instead.

14. The maximum time to wait (in seconds) for the job scheduler info to be retrieved/sent.

14. This will tidy up the Parsl logging to match the same log level as in quacc (`INFO` by default).
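
    As referenced in note 2, here is a minimal sketch of why `DEEP_DEPENDENCY_RESOLVER` matters: `copy_files` receives a future (`result1["dir_name"]`) nested inside a dictionary, which Parsl only waits on when the deep resolver is enabled. This mirrors the `test_copy_files` test added later in this PR and assumes the config above has already been loaded with `parsl.load(config)`; the flow name is illustrative.

    ```python
    from ase.build import bulk

    from quacc import flow
    from quacc.recipes.emt.core import relax_job


    @flow
    def copy_files_flow(atoms):
        # Under the Parsl engine, each @job call returns an AppFuture
        result1 = relax_job(atoms)
        # result1["dir_name"] is a future nested inside the copy_files dict;
        # DEEP_DEPENDENCY_RESOLVER resolves it before launching the second job
        return relax_job(result1["atoms"], copy_files={result1["dir_name"]: "opt.*"})


    print(copy_files_flow(bulk("Cu")).result()["atoms"])
    ```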

@@ -782,6 +786,7 @@ First, prepare your `QUACC_VASP_PP_PATH` environment variable in the `~/.bashrc`
```python
import parsl
from parsl.config import Config
from parsl.dataflow.dependency_resolvers import DEEP_DEPENDENCY_RESOLVER
from parsl.executors import HighThroughputExecutor
from parsl.launchers import SimpleLauncher
from parsl.providers import SlurmProvider
@@ -794,14 +799,13 @@ First, prepare your `QUACC_VASP_PP_PATH` environment variable in the `~/.bashrc`
vasp_parallel_cmd = (
f"srun -N {nodes_per_job} --ntasks-per-node={cores_per_node} --cpu_bind=cores"
)
min_allocations = 0
max_allocations = 1

config = Config(
dependency_resolver=DEEP_DEPENDENCY_RESOLVER,
strategy="htex_auto_scale",
executors=[
HighThroughputExecutor(
label="quacc_parsl",
label="quacc_mpi_parsl",
max_workers_per_node=nodes_per_allocation // nodes_per_job, # (1)!
cores_per_worker=1e-6, # (2)!
provider=SlurmProvider(
@@ -811,9 +815,6 @@
worker_init=f"source ~/.bashrc && conda activate quacc && module load vasp/6.4.1-cpu && export QUACC_VASP_PARALLEL_CMD='{vasp_parallel_cmd}'",
walltime="00:10:00",
nodes_per_block=nodes_per_allocation,
init_blocks=0,
min_blocks=min_allocations,
max_blocks=max_allocations,
launcher=SimpleLauncher(), # (3)!
cmd_timeout=60,
),
@@ -825,7 +826,7 @@ First, prepare your `QUACC_VASP_PP_PATH` environment variable in the `~/.bashrc`
parsl.load(config)
```

1. Unlike the prior example, here `max_workers_per_node` is defining the maximum number of concurrent jobs in total and not the maximum number of jobs run per node.
1. Unlike the prior example, here `max_workers_per_node` defines the maximum number of concurrent MPI jobs to run per allocation (see the usage sketch after these notes).

2. This is recommended in the Parsl manual for jobs that spawn MPI processes.
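
A short usage sketch under the config above (hypothetical; it assumes VASP and the quacc VASP recipes are available on the cluster as described earlier): each decorated job becomes a Parsl app, and `SimpleLauncher` leaves MPI placement to the `srun` command stored in `QUACC_VASP_PARALLEL_CMD`.

```python
from ase.build import bulk

from quacc.recipes.vasp.core import relax_job

# Each call is dispatched as a Parsl app; with nodes_per_allocation // nodes_per_job
# workers per block, that many VASP jobs can run concurrently in one Slurm allocation.
future = relax_job(bulk("Cu"))
print(future.result()["dir_name"])
```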

2 changes: 1 addition & 1 deletion pyproject.toml
@@ -50,7 +50,7 @@ jobflow = ["jobflow[fireworks]>=0.1.14", "jobflow-remote>=0.1.0"]
mlp = ["matgl>=1.0.0", "chgnet>=0.3.3", "mace-torch>=0.3.3", "torch-dftd>=0.4.0"]
mp = ["atomate2>=0.0.14"]
newtonnet = ["newtonnet>=1.1"]
parsl = ["parsl[monitoring]>=2023.10.23; platform_system!='Windows'"]
parsl = ["parsl[monitoring]>=2024.5.27; platform_system!='Windows'"]
phonons = ["phonopy>=2.20.0", "seekpath>=2.1.0"]
prefect = ["prefect[dask]>=2.19.0", "dask-jobqueue>=0.8.2"]
redun = ["redun>=0.16.2"]
5 changes: 4 additions & 1 deletion tests/parsl/conftest.py
@@ -4,6 +4,9 @@
from pathlib import Path
from shutil import rmtree

from parsl.config import Config
from parsl.dataflow.dependency_resolvers import DEEP_DEPENDENCY_RESOLVER

TEST_RESULTS_DIR = Path(__file__).parent / "_test_results"
TEST_SCRATCH_DIR = Path(__file__).parent / "_test_scratch"
TEST_RUNINFO = Path(__file__).parent / "runinfo"
@@ -17,7 +20,7 @@ def pytest_sessionstart():
import os

if parsl:
parsl.load()
parsl.load(Config(dependency_resolver=DEEP_DEPENDENCY_RESOLVER))
file_dir = Path(__file__).parent
os.environ["QUACC_CONFIG_FILE"] = str(file_dir / "quacc.yaml")
os.environ["QUACC_RESULTS_DIR"] = str(TEST_RESULTS_DIR)
18 changes: 9 additions & 9 deletions tests/parsl/test_emt_recipes.py
@@ -6,7 +6,7 @@

from ase.build import bulk

from quacc import job
from quacc import flow, job
from quacc.recipes.emt.core import relax_job # skipcq: PYL-C0412
from quacc.recipes.emt.slabs import bulk_to_slabs_flow # skipcq: PYL-C0412

@@ -27,16 +27,16 @@ def test_functools(tmp_path, monkeypatch, job_decorators):
assert result[-1]["parameters_opt"]["fmax"] == 0.1


# def test_copy_files(tmp_path, monkeypatch):
# monkeypatch.chdir(tmp_path)
# atoms = bulk("Cu")
def test_copy_files(tmp_path, monkeypatch):
monkeypatch.chdir(tmp_path)
atoms = bulk("Cu")

# @flow
# def myflow(atoms):
# result1 = relax_job(atoms)
# return relax_job(result1["atoms"], copy_files={result1["dir_name"]: "opt.*"})
@flow
def myflow(atoms):
result1 = relax_job(atoms)
return relax_job(result1["atoms"], copy_files={result1["dir_name"]: "opt.*"})

# assert "atoms" in myflow(atoms).result()
assert "atoms" in myflow(atoms).result()


def test_phonon_flow(tmp_path, monkeypatch):