refactor: remove new flow_datastore method and combine functionality with existing one.
saikonen committed May 3, 2024
1 parent a05d4cd commit e9c988b
Showing 2 changed files with 37 additions and 135 deletions.
156 changes: 28 additions & 128 deletions metaflow/datastore/flow_datastore.py
@@ -68,124 +68,13 @@ def datastore_root(self):
         return self._storage_impl.datastore_root

-    def get_latest_task_datastores(
-        self, run_id=None, steps=None, pathspecs=None, allow_not_done=False, mode="r"
-    ):
-        """
-        Return a list of TaskDataStore for a subset of the tasks.
-        We filter the list based on `steps` if non-None.
-        Alternatively, `pathspecs` can contain the exact list of pathspec(s)
-        (run_id/step_name/task_id) that should be filtered.
-        Note: When `pathspecs` is specified, we expect strict consistency and
-        not eventual consistency in contrast to other modes.
-        Parameters
-        ----------
-        run_id : str, optional
-            Run ID to get the tasks from. If not specified, use pathspecs,
-            by default None
-        steps : List[str] , optional
-            Steps to get the tasks from. If run_id is specified, this
-            must also be specified, by default None
-        pathspecs : List[str], optional
-            Full task specs (run_id/step_name/task_id). Can be used instead of
-            specifying run_id and steps, by default None
-        allow_not_done : bool, optional
-            If True, returns the latest attempt of a task even if that attempt
-            wasn't marked as done, by default False
-        mode : str, default "r"
-            Mode to initialize the returned TaskDataStores in.
-        Returns
-        -------
-        List[TaskDataStore]
-            Task datastores for all the tasks specified.
-        """
-        task_urls = []
-        # Note: When `pathspecs` is specified, we avoid the potentially
-        # eventually consistent `list_content` operation, and directly construct
-        # the task_urls list.
-        if pathspecs:
-            task_urls = [
-                self._storage_impl.path_join(self.flow_name, pathspec)
-                for pathspec in pathspecs
-            ]
-        else:
-            run_prefix = self._storage_impl.path_join(self.flow_name, run_id)
-            if steps:
-                step_urls = [
-                    self._storage_impl.path_join(run_prefix, step) for step in steps
-                ]
-            else:
-                step_urls = [
-                    step.path
-                    for step in self._storage_impl.list_content([run_prefix])
-                    if step.is_file is False
-                ]
-            task_urls = [
-                task.path
-                for task in self._storage_impl.list_content(step_urls)
-                if task.is_file is False
-            ]
-        urls = []
-        for task_url in task_urls:
-            for attempt in range(metaflow_config.MAX_ATTEMPTS):
-                for suffix in [
-                    TaskDataStore.METADATA_DATA_SUFFIX,
-                    TaskDataStore.METADATA_ATTEMPT_SUFFIX,
-                    TaskDataStore.METADATA_DONE_SUFFIX,
-                ]:
-                    urls.append(
-                        self._storage_impl.path_join(
-                            task_url,
-                            TaskDataStore.metadata_name_for_attempt(suffix, attempt),
-                        )
-                    )
-
-        latest_started_attempts = {}
-        done_attempts = set()
-        data_objs = {}
-        with self._storage_impl.load_bytes(urls) as get_results:
-            for key, path, meta in get_results:
-                if path is not None:
-                    _, run, step, task, fname = self._storage_impl.path_split(key)
-                    attempt, fname = TaskDataStore.parse_attempt_metadata(fname)
-                    attempt = int(attempt)
-                    if fname == TaskDataStore.METADATA_DONE_SUFFIX:
-                        done_attempts.add((run, step, task, attempt))
-                    elif fname == TaskDataStore.METADATA_ATTEMPT_SUFFIX:
-                        latest_started_attempts[(run, step, task)] = max(
-                            latest_started_attempts.get((run, step, task), 0), attempt
-                        )
-                    elif fname == TaskDataStore.METADATA_DATA_SUFFIX:
-                        # This somewhat breaks the abstraction since we are using
-                        # load_bytes directly instead of load_metadata
-                        with open(path, encoding="utf-8") as f:
-                            data_objs[(run, step, task, attempt)] = json.load(f)
-        # We now figure out the latest attempt that started *and* finished.
-        # Note that if an attempt started but didn't finish, we do *NOT* return
-        # the previous attempt
-        latest_started_attempts = set(
-            (run, step, task, attempt)
-            for (run, step, task), attempt in latest_started_attempts.items()
-        )
-        if allow_not_done:
-            latest_to_fetch = latest_started_attempts
-        else:
-            latest_to_fetch = latest_started_attempts & done_attempts
-        latest_to_fetch = [
-            (v[0], v[1], v[2], v[3], data_objs.get(v), mode, allow_not_done)
-            for v in latest_to_fetch
-        ]
-        return list(itertools.starmap(self.get_task_datastore, latest_to_fetch))
-
-    def get_task_datastores(
+    def get_latest_task_datastores(
         self,
         run_id=None,
         steps=None,
         pathspecs=None,
         allow_not_done=False,
         attempt=None,
         include_prior=False,
         mode="r",
     ):
         """
@@ -209,17 +98,19 @@ def get_task_datastores(
             Full task specs (run_id/step_name/task_id). Can be used instead of
             specifying run_id and steps, by default None
         allow_not_done : bool, optional
-            If True, returns the latest or specified attempt of a task even if that attempt
+            If True, returns the latest attempt of a task even if that attempt
             wasn't marked as done, by default False
-        attempt: int, optional
-            Attempt number of the tasks to return
+        attempt : int, optional
+            Attempt number of the tasks to return. Overrides returning the latest TaskDataStore.
+        include_prior : boolean, default False
+            Flag to determine if previous attempts TaskDataStores should be returned along the latest one
         mode : str, default "r"
             Mode to initialize the returned TaskDataStores in.
         Returns
         -------
         List[TaskDataStore]
-            Task datastores for all the attempts of the tasks specified.
+            Task datastores for all the tasks specified.
         """
@@ -250,11 +141,10 @@ def get_task_datastores(
         urls = []
         for task_url in task_urls:
             # parse content urls for specific attempt only, or for all attempts in max range
-            attempt_range = (
-                [attempt]
-                if attempt is not None
-                else range(metaflow_config.MAX_ATTEMPTS)
-            )
+            attempt_range = range(metaflow_config.MAX_ATTEMPTS)
+            if attempt is not None:
+                attempt_range = range(attempt + 1) if include_prior else [attempt]
+
             for attempt in attempt_range:
                 for suffix in [
                     TaskDataStore.METADATA_DATA_SUFFIX,
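The replacement collapses attempt selection into one rule: scan all attempt slots by default, or narrow to a single attempt, optionally widened to everything up to and including it. A self-contained sketch of just that rule; the MAX_ATTEMPTS value is illustrative, as Metaflow reads it from metaflow_config:

MAX_ATTEMPTS = 6  # illustrative; the real value comes from metaflow_config

def resolve_attempt_range(attempt=None, include_prior=False):
    # No attempt requested: probe every possible attempt slot.
    if attempt is None:
        return range(MAX_ATTEMPTS)
    # Specific attempt, optionally including everything before it.
    return range(attempt + 1) if include_prior else [attempt]

assert list(resolve_attempt_range()) == [0, 1, 2, 3, 4, 5]
assert list(resolve_attempt_range(attempt=2)) == [2]
assert list(resolve_attempt_range(attempt=2, include_prior=True)) == [0, 1, 2]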
@@ -267,6 +157,7 @@
                             TaskDataStore.metadata_name_for_attempt(suffix, attempt),
                         )
                     )
+
         latest_started_attempts = {}
         done_attempts = set()
         data_objs = {}
@@ -287,21 +178,30 @@ def get_task_datastores(
                         # load_bytes directly instead of load_metadata
                         with open(path, encoding="utf-8") as f:
                             data_objs[(run, step, task, attempt)] = json.load(f)
+
         # We now figure out the latest attempt that started *and* finished.
         # Note that if an attempt started but didn't finish, we do *NOT* return
         # the previous attempt
         latest_started_attempts = set(
             (run, step, task, attempt)
             for (run, step, task), attempt in latest_started_attempts.items()
         )
         if allow_not_done:
-            to_fetch = done_attempts.union(latest_started_attempts)
+            latest_to_fetch = (
+                done_attempts.union(latest_started_attempts)
+                if include_prior
+                else latest_started_attempts
+            )
         else:
-            to_fetch = done_attempts
-
-        to_fetch = [
+            latest_to_fetch = (
+                done_attempts
+                if include_prior
+                else (latest_started_attempts & done_attempts)
+            )
+        latest_to_fetch = [
             (v[0], v[1], v[2], v[3], data_objs.get(v), mode, allow_not_done)
-            for v in to_fetch
+            for v in latest_to_fetch
         ]
-        return list(itertools.starmap(self.get_task_datastore, to_fetch))
+        return list(itertools.starmap(self.get_task_datastore, latest_to_fetch))

     def get_task_datastore(
         self,
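The set arithmetic at the tail of the method is where the two former code paths actually meet. A toy reproduction of the allow_not_done and include_prior combinations, with invented attempt data:

# Toy data: attempt 0 finished (wrote its DONE marker); attempt 1 started
# afterwards but never completed. Keys are (run, step, task, attempt).
done_attempts = {("1234", "start", "91", 0)}
latest_started_attempts = {("1234", "start", "91", 1)}

def select_attempts(allow_not_done, include_prior):
    if allow_not_done:
        return (
            done_attempts.union(latest_started_attempts)
            if include_prior
            else latest_started_attempts
        )
    return (
        done_attempts
        if include_prior
        else latest_started_attempts & done_attempts
    )

# Default: only the latest attempt, and only if it finished. Attempt 1 did
# not, and the code deliberately does not fall back to attempt 0.
assert select_attempts(allow_not_done=False, include_prior=False) == set()
# Latest attempt regardless of completion:
assert select_attempts(allow_not_done=True, include_prior=False) == {("1234", "start", "91", 1)}
# All finished attempts:
assert select_attempts(allow_not_done=False, include_prior=True) == {("1234", "start", "91", 0)}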
16 changes: 9 additions & 7 deletions metaflow/plugins/logs_cli.py
@@ -271,10 +271,6 @@ def scrub(
     latest=None,
     include_not_done=None,
 ):
-    if latest is not None and attempt is not None:
-        raise CommandException(
-            "Please only specify one of the options --latest/--all or --attempt, not both."
-        )
     types = set()
     if stdout:
         types.add("stdout")
@@ -303,30 +299,36 @@ def scrub(
     if task_id:
         if latest:
             ds_list = obj.flow_datastore.get_latest_task_datastores(
-                pathspecs=[input_path], mode="d", allow_not_done=include_not_done
+                pathspecs=[input_path],
+                attempt=attempt,
+                mode="d",
+                allow_not_done=include_not_done,
             )
         else:
-            ds_list = obj.flow_datastore.get_task_datastores(
+            ds_list = obj.flow_datastore.get_latest_task_datastores(
                 pathspecs=[input_path],
                 attempt=attempt,
                 mode="d",
                 allow_not_done=include_not_done,
+                include_prior=True,
             )
     else:
         if latest:
             ds_list = obj.flow_datastore.get_latest_task_datastores(
                 run_id=run_id,
                 steps=[step_name],
+                attempt=attempt,
                 mode="d",
                 allow_not_done=include_not_done,
             )
         else:
-            ds_list = obj.flow_datastore.get_task_datastores(
+            ds_list = obj.flow_datastore.get_latest_task_datastores(
                 run_id=run_id,
                 steps=[step_name],
                 attempt=attempt,
                 mode="d",
                 allow_not_done=include_not_done,
+                include_prior=True,
             )

     if ds_list:
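With every branch above now funneling into get_latest_task_datastores, the only behavioral split left in scrub is whether prior attempts are included. A hypothetical condensation; this helper does not exist in the commit, and the argument names simply mirror the diff:

def datastores_for_scrub(flow_datastore, latest, attempt, include_not_done, **selectors):
    # selectors is either pathspecs=[input_path] or run_id=... plus steps=[...]
    return flow_datastore.get_latest_task_datastores(
        attempt=attempt,
        mode="d",
        allow_not_done=include_not_done,
        include_prior=not latest,  # --latest scrubs only the newest attempt
        **selectors,
    )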