Replace get_num_cpu from resdata.util

eivindjahren committed Apr 5, 2024
1 parent 0ce54b5 commit c13ab33
Showing 4 changed files with 245 additions and 30 deletions.
228 changes: 226 additions & 2 deletions src/ert/config/_get_num_cpu.py
@@ -1,9 +1,233 @@
 from __future__ import annotations
 
-from typing import Optional
+import logging
+from typing import Iterator, Optional, TypeVar, overload
 
 from resdata.rd_util import get_num_cpu
 
+from .parsing import ConfigWarning
+
+logger = logging.getLogger(__name__)
+
 
 def get_num_cpu_from_data_file(data_file: str) -> Optional[int]:
-    return get_num_cpu(data_file)
+    try:
+        new_value = _new_get_num_cpu_from_data_file(data_file)
+    except Exception as err:
+        new_value = None
+        logger.info(f"New get_num_cpu errored: {err}")
+
+    old_value = get_num_cpu(data_file)
+    new_value = 1 if new_value is None else new_value
+
+    if old_value != new_value:
+        logger.info(
+            f"New get_num_cpu gave different result: {old_value} != {new_value}"
+        )
+
+    return old_value
+
+
+def _new_get_num_cpu_from_data_file(data_file: str) -> Optional[int]:
+    """Reads the number of cpus required from the reservoir simulator .data file.
+
+    Works similarly to resdata.util.get_num_cpu.
+
+    This does not attempt to parse the .data file completely, although
+    that could be done using opm.io.Parser. The file format is context
+    sensitive and contains ~2000 keywords, many of which need to be parsed
+    in a unique way or change the context.
+
+    Instead we keep backwards compatibility with the files that we can
+    parse using the following heuristic method:
+
+    1. The first word on any line is the keyword;
+    2. A line is separated into words by splitting
+       on spaces, quotations and comments (see _split_line);
+    3. A sequence is consecutive words ended by "/".
+       The PARALLEL keyword is followed by one sequence.
+       The SLAVES keyword is followed by several sequences, and ends with a single "/".
+    4. Keywords that are not "PARALLEL" or "SLAVES" are ignored, except TITLE, where
+       the next two lines have to be skipped.
+
+    To disambiguate what is correct behavior, the following implementation using
+    opm.io shows how this is interpreted by opm flow:
+
+    .. code-block:: python
+
+        from __future__ import annotations
+
+        from typing import Any, Optional, Tuple
+
+        import opm.io
+
+        from .parsing import ConfigWarning
+
+        OPMIOPARSER_RECOVERY: list[Tuple[str, Any]] = [
+            ("PARSE_EXTRA_DATA", opm.io.action.ignore),
+            ("PARSE_EXTRA_RECORDS", opm.io.action.ignore),
+            ("PARSE_INVALID_KEYWORD_COMBINATION", opm.io.action.ignore),
+            ("PARSE_MISSING_DIMS_KEYWORD", opm.io.action.ignore),
+            ("PARSE_MISSING_INCLUDE", opm.io.action.ignore),
+            ("PARSE_MISSING_SECTIONS", opm.io.action.ignore),
+            ("PARSE_RANDOM_SLASH", opm.io.action.ignore),
+            ("PARSE_RANDOM_TEXT", opm.io.action.ignore),
+            ("PARSE_UNKNOWN_KEYWORD", opm.io.action.ignore),
+            ("SUMMARY_UNKNOWN_GROUP", opm.io.action.ignore),
+            ("UNSUPPORTED_*", opm.io.action.ignore),
+        ]
+
+        def get_num_cpu_from_data_file(data_file: str) -> Optional[int]:
+            try:
+                parsecontext = opm.io.ParseContext(OPMIOPARSER_RECOVERY)
+                deck = opm.io.Parser().parse(data_file, parsecontext)
+                for _, kword in enumerate(deck):
+                    if kword.name in ["PARALLEL"]:
+                        return kword[0][0].get_int(0)
+                    if kword.name in ["SLAVES"]:
+                        num_cpu = 1
+                        for rec in kword:
+                            num_cpu += rec.get_int(1)
+                        return num_cpu
+            except Exception as err:
+                ConfigWarning.ert_context_warn(
+                    f"Failed to read NUM_CPU from {data_file}: {err}",
+                    data_file,
+                )
+            return None
+    """
+    try:
+        with open(data_file, "r") as file:
+            return _get_num_cpu(iter(file), data_file)
+    except OSError as err:
+        ConfigWarning.ert_context_warn(
+            f"Failed to read from DATA_FILE {data_file}: {err}", data_file
+        )
+    return None
+
+
+def _get_num_cpu(
+    lines_iter: Iterator[str], data_file_name: Optional[str] = None
+) -> Optional[int]:
+    """Handles reading the lines in the data file and returns the num_cpu
+
+    TITLE keyword requires skipping two non-empty lines
+
+    >>> _get_num_cpu(iter(["TITLE", "", "", "PARALLEL", "3 / -- skipped", "PARALLEL", "4 /"]))
+    4
+
+    PARALLEL takes precedence even when SLAVES comes first:
+
+    >>> _get_num_cpu(iter(["SLAVES", "/", "PARALLEL", "10 /"]))
+    10
+    """
+    parser = _Parser(lines_iter)
+    try:
+        slaves_num_cpu = None
+        while (words := parser.next_line(None)) is not None:
+            if not words:
+                continue
+            keyword = next(words, None)
+            keyword = keyword[0 : min(len(keyword), 8)] if keyword is not None else None
+            if keyword == "TITLE":
+                # Skip two non-blank lines following a TITLE
+                for _ in range(2):
+                    line: list[str] = []
+                    while line == []:
+                        nline = parser.next_line(None)
+                        if nline is None:
+                            break
+                        line = list(nline)
+            if keyword == "PARALLEL":
+                while (word := next(words, None)) is None:
+                    words = parser.next_line(None)
+                    if words is None:
+                        return None
+                if word is not None:
+                    return int(word)
+                else:
+                    return None
+            if keyword == "SLAVES" and slaves_num_cpu is None:
+                slaves_num_cpu = 1
+                while (line_iter := parser.next_line(None)) is not None:
+                    parameters = list(line_iter)
+                    if not parameters:
+                        continue
+                    if parameters[0] == "/":
+                        break
+                    if len(parameters) != 6:
+                        slaves_num_cpu += 1
+                    else:
+                        slaves_num_cpu += int(parameters[4])
+    except Exception as err:
+        ConfigWarning.ert_context_warn(
+            f"Failed to read NUM_CPU from {data_file_name} Line {parser.line_number}: {err}",
+            data_file_name if data_file_name else "",
+        )
+
+    return slaves_num_cpu
+
+
+T = TypeVar("T")
+
+
+class _Parser:
+    def __init__(self, line_iterator: Iterator[str]) -> None:
+        self._line_iterator = line_iterator
+        self.line_number = 1
+
+    @overload
+    def next_line(self) -> Iterator[str]: ...
+
+    @overload
+    def next_line(self, __default: T) -> Iterator[str] | T: ...
+
+    def next_line(self, *args: T) -> Iterator[str] | T:
+        self.line_number += 1
+        words = next(self._line_iterator, *args)
+        if isinstance(words, str):
+            return _split_line(words)
+        return words
+
+
+def _split_line(line: str) -> Iterator[str]:
+    """
+    Splits a keyword line inside a .data file, i.e. the values of a
+    'simple' keyword become tokens:
+
+    >>> list(_split_line("3 1.0 3*4 PORO 3*INC 'HELLO WORLD ' 3*'NAME'"))
+    ['3', '1.0', '3*4', 'PORO', '3*INC', 'HELLO WORLD ', '3*', 'NAME']
+    """
+    value = ""
+    inside_str = None
+    for char in line:
+        if char == "'":
+            # end of str
+            if inside_str:
+                yield value
+                value = ""
+                inside_str = False
+            # start of str
+            else:
+                if value != "":
+                    yield value
+                value = ""
+                inside_str = char
+        elif inside_str:
+            value += char
+        elif value and value[-1] == "-" and char == "-":
+            # a comment
+            value = value[0:-1]
+            break
+        elif char.isspace():
+            # delimiting space
+            if value:
+                yield value
+            value = ""
+        else:
+            value += char
+    if value:
+        yield value
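
The top-level function above keeps the resdata result as the source of truth and only logs when the new pure-Python parser disagrees, so the replacement can be validated before switching over. A minimal usage sketch of the parsing heuristic follows (illustrative only, not part of the commit; the import path is inferred from the file location src/ert/config/_get_num_cpu.py, and _get_num_cpu and _split_line are private helpers):

from ert.config._get_num_cpu import _get_num_cpu, _split_line

# PARALLEL: the first word of the sequence after the keyword is the CPU count.
assert _get_num_cpu(iter(["PARALLEL", "4 DISTRIBUTED /"])) == 4

# SLAVES: 1 for the master, plus the fifth parameter of each six-token
# record, plus 1 for each record with any other token count.
assert _get_num_cpu(iter(["SLAVES", "'S1' 'B' '*' 'f' 4 /", "'S2' 'B' '*' 'g' /", "/"])) == 6

# Comment-only lines tokenize to nothing and are skipped.
assert list(_split_line("-- just a comment")) == []
assert _get_num_cpu(iter(["PARALLEL", "-- not the value", "8 /"])) == 8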
41 changes: 16 additions & 25 deletions tests/unit_tests/config/test_forward_model_data_to_json.py
@@ -16,6 +16,11 @@
 from ert.substitution_list import SubstitutionList
 
 
+@pytest.fixture()
+def context():
+    return SubstitutionList({"<RUNPATH>": "./"})
+
+
 @pytest.fixture
 def joblist():
     result = [
@@ -284,10 +289,9 @@ def verify_json_dump(joblist, config, selected_jobs, run_id):
 
 
 @pytest.mark.usefixtures("use_tmpdir")
-def test_config_path_and_file():
+def test_config_path_and_file(context):
     run_id = "test_config_path_and_file_in_jobs_json"
 
-    context = SubstitutionList.from_dict({"DEFINE": [["<RUNPATH>", "./"]]})
     jobs_json = ErtConfig(
         forward_model_list=set_up_forward_model([]),
         substitution_list=context,
@@ -302,10 +306,9 @@ def test_config_path_and_file():
 
 
 @pytest.mark.usefixtures("use_tmpdir")
-def test_no_jobs():
+def test_no_jobs(context):
     run_id = "test_no_jobs_id"
 
-    context = SubstitutionList.from_dict({"DEFINE": [["<RUNPATH>", "./"]]})
     data = ErtConfig(
         forward_model_list=set_up_forward_model([]),
         substitution_list=context,
@@ -318,7 +321,7 @@ def test_no_jobs():
 
 
 @pytest.mark.usefixtures("use_tmpdir")
-def test_transfer_arg_types():
+def test_transfer_arg_types(context):
     with open("FWD_MODEL", "w", encoding="utf-8") as f:
         f.write("EXECUTABLE ls\n")
         f.write("MIN_ARG 2\n")
@@ -335,7 +338,6 @@ def test_transfer_arg_types():
     job = ForwardModel.from_config_file("FWD_MODEL")
     run_id = "test_no_jobs_id"
 
-    context = SubstitutionList.from_dict({"DEFINE": [["<RUNPATH>", "./"]]})
     config = ErtConfig(
         forward_model_list=[job], substitution_list=context
     ).forward_model_data_to_json(run_id)
@@ -354,22 +356,19 @@
 
 
 @pytest.mark.usefixtures("use_tmpdir")
-def test_one_job(joblist):
+def test_one_job(joblist, context):
     for i, job in enumerate(joblist):
         run_id = "test_one_job"
 
-        context = SubstitutionList.from_dict({"DEFINE": [["<RUNPATH>", "./"]]})
         data = ErtConfig(
             forward_model_list=set_up_forward_model([job]),
             substitution_list=context,
         ).forward_model_data_to_json(run_id)
         verify_json_dump(joblist, data, [i], run_id)
 
 
-def run_all(joblist):
+def run_all(joblist, context):
     run_id = "run_all"
-
-    context = SubstitutionList.from_dict({"DEFINE": [["<RUNPATH>", "./"]]})
     data = ErtConfig(
         forward_model_list=set_up_forward_model(joblist),
         substitution_list=context,
@@ -379,17 +378,12 @@
 
 
 @pytest.mark.usefixtures("use_tmpdir")
-def test_all_jobs(joblist):
-    run_all(joblist)
+def test_all_jobs(joblist, context):
+    run_all(joblist, context)
 
 
 @pytest.mark.usefixtures("use_tmpdir")
-def test_name_none(joblist):
-    run_all(joblist)
-
-
-@pytest.mark.usefixtures("use_tmpdir")
-def test_various_null_fields(joblist):
+def test_various_null_fields(joblist, context):
     for key in [
         "target_file",
         "error_file",
@@ -402,14 +396,12 @@ def test_various_null_fields(joblist):
         "stdin",
     ]:
         joblist[0][key] = None
-    run_all(joblist)
+    run_all(joblist, context)
 
 
 @pytest.mark.usefixtures("use_tmpdir")
-def test_status_file(joblist):
+def test_status_file(joblist, context):
     run_id = "test_no_jobs_id"
-
-    context = SubstitutionList.from_dict({"DEFINE": [["<RUNPATH>", "./"]]})
     with open(JOBS_FILE, "w", encoding="utf-8") as fp:
         json.dump(
             ErtConfig(
@@ -447,12 +439,11 @@ def test_status_file(joblist):
 
 
 @pytest.mark.usefixtures("use_tmpdir")
-def test_that_values_with_brackets_are_ommitted(caplog, joblist):
+def test_that_values_with_brackets_are_ommitted(caplog, joblist, context):
     forward_model_list: List[ForwardModel] = set_up_forward_model(joblist)
     forward_model_list[0].environment["ENV_VAR"] = "<SOME_BRACKETS>"
     run_id = "test_no_jobs_id"
 
-    context = SubstitutionList.from_dict({"DEFINE": [["<RUNPATH>", "./"]]})
     data = ErtConfig(
         forward_model_list=forward_model_list, substitution_list=context
    ).forward_model_data_to_json(run_id)
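
The test changes above are a mechanical refactor: the SubstitutionList.from_dict({"DEFINE": [["<RUNPATH>", "./"]]}) construction repeated across tests is replaced by the shared context fixture built with the equivalent direct constructor. A self-contained sketch of the fixture-injection pattern being used (a plain dict stands in for SubstitutionList here, purely for illustration):

import pytest


@pytest.fixture()
def context():
    # Stand-in for SubstitutionList({"<RUNPATH>": "./"}).
    return {"<RUNPATH>": "./"}


def test_uses_shared_context(context):
    # pytest matches the parameter name to the fixture and injects its value.
    assert context["<RUNPATH>"] == "./"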
4 changes: 1 addition & 3 deletions tests/unit_tests/job_runner/test_jobmanager.py
@@ -245,9 +245,7 @@ def test_default_env_variables_available_inside_job_context():
         json.dump(
             ErtConfig(
                 forward_model_list=[job],
-                substitution_list=SubstitutionList.from_dict(
-                    {"DEFINE": [["<RUNPATH>", "./"]]}
-                ),
+                substitution_list=SubstitutionList({"<RUNPATH>": "./"}),
             ).forward_model_data_to_json(
                 "run_id",
             ),
2 changes: 2 additions & 0 deletions tests/unit_tests/test_run_path_creation.py
@@ -307,6 +307,8 @@ def test_that_sampling_prior_makes_initialized_fs(storage):
             -- comment
             -- comment with slash / "
             'upper' 'base' '*' 'data_file' 4 /
+
+            -- Line above left intentionally blank
             'lower' 'base' '*' 'data_file_lower' /
             /"""
         ),
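
The added blank line and comment exercise the new parser's handling of blank lines inside a SLAVES section. A worked check (illustrative, not part of the commit; import path inferred as above): the six-token record contributes its fifth parameter (4), the five-token record contributes 1, and the master adds 1, giving 6:

from ert.config._get_num_cpu import _get_num_cpu

lines = [
    "SLAVES",
    "-- comment",
    '-- comment with slash / "',
    "'upper' 'base' '*' 'data_file' 4 /",
    "",
    "-- Line above left intentionally blank",
    "'lower' 'base' '*' 'data_file_lower' /",
    "/",
]
assert _get_num_cpu(iter(lines)) == 6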
