Fix copy_files support with Parsl (#1942)
## Summary of Changes

Closes #1776.

Requires:
- Parsl/parsl#3111

---------

Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
Andrew-S-Rosen and pre-commit-ci[bot] committed May 30, 2024
1 parent 0a1ec93 commit dcbb582
Showing 4 changed files with 46 additions and 42 deletions.
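The user-facing change is that outputs of quacc `@job` functions can now be passed to `copy_files` when Parsl is the workflow engine: the `copy_files` dict holds Parsl futures (e.g. `result1["dir_name"]`), and the opt-in `DEEP_DEPENDENCY_RESOLVER` required from Parsl/parsl#3111 is what lets Parsl resolve dependencies nested inside such data structures. The sketch below is not part of the diff; it mirrors the test added in `tests/parsl/test_emt_recipes.py` and assumes quacc is installed with its Parsl extra and configured with `WORKFLOW_ENGINE: parsl`.

```python
import parsl
from ase.build import bulk
from parsl.config import Config
from parsl.dataflow.dependency_resolvers import DEEP_DEPENDENCY_RESOLVER

from quacc import flow
from quacc.recipes.emt.core import relax_job

# Opt in to deep dependency resolution so that futures buried inside the
# copy_files dict are discovered and awaited before the second job starts.
parsl.load(Config(dependency_resolver=DEEP_DEPENDENCY_RESOLVER))


@flow
def myflow(atoms):
    result1 = relax_job(atoms)  # returns a Parsl future under the hood
    return relax_job(result1["atoms"], copy_files={result1["dir_name"]: "opt.*"})


print(myflow(bulk("Cu")).result())  # blocks until both relaxations finish
```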
docs/user/wflow_engine/executors.md (32 additions, 31 deletions)
@@ -379,6 +379,7 @@ If you haven't done so already:
```python
import parsl
from parsl.config import Config
+ from parsl.dataflow.dependency_resolvers import DEEP_DEPENDENCY_RESOLVER
from parsl.executors import HighThroughputExecutor
from parsl.launchers import SrunLauncher
from parsl.providers import SlurmProvider
@@ -394,24 +395,25 @@ If you haven't done so already:
env_vars = f"export OMP_NUM_THREADS={cores_per_job},1" # (1)!

config = Config(
- strategy="htex_auto_scale", # (2)!
+ dependency_resolver=DEEP_DEPENDENCY_RESOLVER, # (2)!
+ strategy="htex_auto_scale", # (3)!
executors=[
HighThroughputExecutor(
- label="quacc_parsl", # (3)!
- max_workers_per_node=cores_per_node, # (4)!
- cores_per_worker=cores_per_job, # (5)!
+ label="quacc_parsl", # (4)!
+ max_workers_per_node=cores_per_node, # (5)!
+ cores_per_worker=cores_per_job, # (6)!
provider=SlurmProvider(
account=account,
qos="debug",
constraint="cpu",
- worker_init=f"source ~/.bashrc && conda activate quacc && {env_vars}", # (6)!
- walltime="00:10:00", # (7)!
- nodes_per_block=nodes_per_allocation, # (8)!
- init_blocks=0, # (9)!
- min_blocks=min_allocations, # (10)!
- max_blocks=max_allocations, # (11)!
- launcher=SrunLauncher(), # (12)!
- cmd_timeout=60, # (13)!
+ worker_init=f"source ~/.bashrc && conda activate quacc && {env_vars}", # (7)!
+ walltime="00:10:00", # (8)!
+ nodes_per_block=nodes_per_allocation, # (9)!
+ init_blocks=0, # (10)!
+ min_blocks=min_allocations, # (11)!
+ max_blocks=max_allocations, # (12)!
+ launcher=SrunLauncher(), # (13)!
+ cmd_timeout=60, # (14)!
),
)
],
@@ -423,29 +425,31 @@

1. Since we are running single-core jobs, we need to set the `OMP_NUM_THREADS` environment variable to "1,1" according to the [TBLite documentation](https://tblite.readthedocs.io/en/latest/tutorial/parallel.html#running-tblite-in-parallel).

- 2. Unique to the `HighThroughputExecutor`, this `strategy` will automatically scale the number of active blocks (i.e. Slurm allocations) up or down based on the number of jobs remaining. We set `max_blocks=1` here so it can't scale up beyond 1 Slurm job, but it can scale down from 1 to 0 since `min_blocks=0`. By setting `init_blocks=0`, no Slurm allocation will be requested until jobs are launched.
+ 2. This is an opt-in feature needed when using Parsl to ensure all features in quacc are supported.

- 3. This is just an arbitrary label for file I/O.
+ 3. Unique to the `HighThroughputExecutor`, this `strategy` will automatically scale the number of active blocks (i.e. Slurm allocations) up or down based on the number of jobs remaining. We set `max_blocks=1` here so it can't scale up beyond 1 Slurm job, but it can scale down from 1 to 0 since `min_blocks=0`. By setting `init_blocks=0`, no Slurm allocation will be requested until jobs are launched.

- 4. The maximum number of running jobs per node. If you are running a non-MPI job, this value will generally be the number of physical cores per node (this example). Perlmutter has 128 physical CPU cores, so we have set a value of 128 here.
+ 4. This is just an arbitrary label for file I/O.

- 5. The number of cores per job. We are running single-core jobs in this example.
+ 5. The maximum number of running jobs per node. If you are running a non-MPI job, this value will generally be the number of physical cores per node (this example). Perlmutter has 128 physical CPU cores, so we have set a value of 128 here.

- 6. Any commands to run before carrying out any of the Parsl jobs. This is useful for setting environment variables, activating a given Conda environment, and loading modules.
+ 6. The number of cores per job. We are running single-core jobs in this example.

- 7. The walltime for each block (i.e. Slurm allocation).
+ 7. Any commands to run before carrying out any of the Parsl jobs. This is useful for setting environment variables, activating a given Conda environment, and loading modules.

- 8. The number of nodes that each block (i.e. Slurm allocation) should allocate.
+ 8. The walltime for each block (i.e. Slurm allocation).

- 9. Sets the number of blocks (e.g. Slurm allocations) to provision during initialization of the workflow. We set this to a value of 0 so that there isn't a running Slurm job before any jobs have been submitted to Parsl.
+ 9. The number of nodes that each block (i.e. Slurm allocation) should allocate.

- 10. Sets the minimum number of blocks (e.g. Slurm allocations) to maintain during [elastic resource management](https://parsl.readthedocs.io/en/stable/userguide/execution.html#elasticity). We set this to 0 so that Slurm jobs aren't running when there are no remaining jobs.
+ 10. Sets the number of blocks (e.g. Slurm allocations) to provision during initialization of the workflow. We set this to a value of 0 so that there isn't a running Slurm job before any jobs have been submitted to Parsl.

- 11. Sets the maximum number of active blocks (e.g. Slurm allocations) during [elastic resource management](https://parsl.readthedocs.io/en/stable/userguide/execution.html#elasticity). We set this to 1 here, but it can be increased to have multiple Slurm jobs running simultaneously. Raising `max_blocks` to a larger value will allow the "htex_auto_scale" strategy to upscale resources as needed.
+ 11. Sets the minimum number of blocks (e.g. Slurm allocations) to maintain during [elastic resource management](https://parsl.readthedocs.io/en/stable/userguide/execution.html#elasticity). We set this to 0 so that Slurm jobs aren't running when there are no remaining jobs.

- 12. The type of Launcher to use. `SrunLauncher()` will distribute jobs across the cores and nodes of the Slurm allocation. It should not be used for `PythonApp`s that themselves call MPI, which should use `SimpleLauncher()` instead.
+ 12. Sets the maximum number of active blocks (e.g. Slurm allocations) during [elastic resource management](https://parsl.readthedocs.io/en/stable/userguide/execution.html#elasticity). We set this to 1 here, but it can be increased to have multiple Slurm jobs running simultaneously. Raising `max_blocks` to a larger value will allow the "htex_auto_scale" strategy to upscale resources as needed.

- 13. The maximum time to wait (in seconds) for the job scheduler info to be retrieved/sent.
+ 13. The type of Launcher to use. `SrunLauncher()` will distribute jobs across the cores and nodes of the Slurm allocation. It should not be used for `PythonApp`s that themselves call MPI, which should use `SimpleLauncher()` instead.

+ 14. The maximum time to wait (in seconds) for the job scheduler info to be retrieved/sent.

14. This will tidy up the Parsl logging to match the same log level as in quacc (`INFO` by default).
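A minimal usage sketch to go with the annotations above (separate from the diff itself): once `parsl.load(config)` has been called with this configuration, quacc `@job` functions dispatch as Parsl apps and return futures, and the `htex_auto_scale` strategy provisions and retires Slurm blocks as the backlog of futures grows and drains. The choice of `relax_job` from the EMT recipes and the batch size of 4 are arbitrary illustrations.

```python
from ase.build import bulk

from quacc.recipes.emt.core import relax_job

# Each call returns immediately with a future; the work is queued on the executor.
futures = [relax_job(bulk("Cu")) for _ in range(4)]

# Waiting on the futures lets blocks (Slurm allocations) scale up while work is
# pending and back down to zero once the queue is empty.
results = [future.result() for future in futures]
print(len(results))
```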

@@ -782,6 +786,7 @@ First, prepare your `QUACC_VASP_PP_PATH` environment variable in the `~/.bashrc`
```python
import parsl
from parsl.config import Config
+ from parsl.dataflow.dependency_resolvers import DEEP_DEPENDENCY_RESOLVER
from parsl.executors import HighThroughputExecutor
from parsl.launchers import SimpleLauncher
from parsl.providers import SlurmProvider
@@ -794,14 +799,13 @@ First, prepare your `QUACC_VASP_PP_PATH` environment variable in the `~/.bashrc`
vasp_parallel_cmd = (
f"srun -N {nodes_per_job} --ntasks-per-node={cores_per_node} --cpu_bind=cores"
)
- min_allocations = 0
- max_allocations = 1

config = Config(
+ dependency_resolver=DEEP_DEPENDENCY_RESOLVER,
strategy="htex_auto_scale",
executors=[
HighThroughputExecutor(
- label="quacc_parsl",
+ label="quacc_mpi_parsl",
max_workers_per_node=nodes_per_allocation // nodes_per_job, # (1)!
cores_per_worker=1e-6, # (2)!
provider=SlurmProvider(
@@ -811,9 +815,6 @@ First, prepare your `QUACC_VASP_PP_PATH` environment variable in the `~/.bashrc`
worker_init=f"source ~/.bashrc && conda activate quacc && module load vasp/6.4.1-cpu && export QUACC_VASP_PARALLEL_CMD='{vasp_parallel_cmd}'",
walltime="00:10:00",
nodes_per_block=nodes_per_allocation,
- init_blocks=0,
- min_blocks=min_allocations,
- max_blocks=max_allocations,
launcher=SimpleLauncher(), # (3)!
cmd_timeout=60,
),
@@ -825,7 +826,7 @@ First, prepare your `QUACC_VASP_PP_PATH` environment variable in the `~/.bashrc`
parsl.load(config)
```

- 1. Unlike the prior example, here `max_workers_per_node` is defining the maximum number of concurrent jobs in total and not the maximum number of jobs run per node.
+ 1. Unlike the prior example, here `max_workers_per_node` is defining the maximum number of concurrent MPI jobs to run per allocation.

2. This is recommended in the Parsl manual for jobs that spawn MPI processes.
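To make the first annotation concrete, here is a small worked sketch with hypothetical numbers (the real `nodes_per_allocation`, `nodes_per_job`, and `cores_per_node` are defined in the collapsed part of this example): `max_workers_per_node` caps how many MPI-parallel VASP jobs run concurrently inside one Slurm allocation, and each job is launched through the `srun` command exported as `QUACC_VASP_PARALLEL_CMD`.

```python
# Hypothetical values, for illustration only.
nodes_per_allocation = 4  # nodes requested per Slurm allocation (block)
nodes_per_job = 2         # nodes each VASP job spans
cores_per_node = 128      # physical cores per Perlmutter CPU node

# Maximum number of VASP jobs running at once within one allocation:
max_workers_per_node = nodes_per_allocation // nodes_per_job  # -> 2

# Each of those jobs is launched via the srun command assembled above:
vasp_parallel_cmd = (
    f"srun -N {nodes_per_job} --ntasks-per-node={cores_per_node} --cpu_bind=cores"
)
print(max_workers_per_node, vasp_parallel_cmd)
```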

pyproject.toml (1 addition, 1 deletion)
@@ -50,7 +50,7 @@ jobflow = ["jobflow[fireworks]>=0.1.14", "jobflow-remote>=0.1.0"]
mlp = ["matgl>=1.0.0", "chgnet>=0.3.3", "mace-torch>=0.3.3", "torch-dftd>=0.4.0"]
mp = ["atomate2>=0.0.14"]
newtonnet = ["newtonnet>=1.1"]
- parsl = ["parsl[monitoring]>=2023.10.23; platform_system!='Windows'"]
+ parsl = ["parsl[monitoring]>=2024.5.27; platform_system!='Windows'"]
phonons = ["phonopy>=2.20.0", "seekpath>=2.1.0"]
prefect = ["prefect[dask]>=2.19.0", "dask-jobqueue>=0.8.2"]
redun = ["redun>=0.16.2"]
tests/parsl/conftest.py (4 additions, 1 deletion)
@@ -4,6 +4,9 @@
from pathlib import Path
from shutil import rmtree

+ from parsl.config import Config
+ from parsl.dataflow.dependency_resolvers import DEEP_DEPENDENCY_RESOLVER

TEST_RESULTS_DIR = Path(__file__).parent / "_test_results"
TEST_SCRATCH_DIR = Path(__file__).parent / "_test_scratch"
TEST_RUNINFO = Path(__file__).parent / "runinfo"
@@ -17,7 +20,7 @@ def pytest_sessionstart():
import os

if parsl:
- parsl.load()
+ parsl.load(Config(dependency_resolver=DEEP_DEPENDENCY_RESOLVER))
file_dir = Path(__file__).parent
os.environ["QUACC_CONFIG_FILE"] = str(file_dir / "quacc.yaml")
os.environ["QUACC_RESULTS_DIR"] = str(TEST_RESULTS_DIR)
tests/parsl/test_emt_recipes.py (9 additions, 9 deletions)
@@ -6,7 +6,7 @@

from ase.build import bulk

- from quacc import job
+ from quacc import flow, job
from quacc.recipes.emt.core import relax_job # skipcq: PYL-C0412
from quacc.recipes.emt.slabs import bulk_to_slabs_flow # skipcq: PYL-C0412

@@ -27,16 +27,16 @@ def test_functools(tmp_path, monkeypatch, job_decorators):
assert result[-1]["parameters_opt"]["fmax"] == 0.1


- # def test_copy_files(tmp_path, monkeypatch):
- # monkeypatch.chdir(tmp_path)
- # atoms = bulk("Cu")
+ def test_copy_files(tmp_path, monkeypatch):
+     monkeypatch.chdir(tmp_path)
+     atoms = bulk("Cu")

- # @flow
- # def myflow(atoms):
- # result1 = relax_job(atoms)
- # return relax_job(result1["atoms"], copy_files={result1["dir_name"]: "opt.*"})
+     @flow
+     def myflow(atoms):
+         result1 = relax_job(atoms)
+         return relax_job(result1["atoms"], copy_files={result1["dir_name"]: "opt.*"})

- # assert "atoms" in myflow(atoms).result()
+     assert "atoms" in myflow(atoms).result()


def test_phonon_flow(tmp_path, monkeypatch):

