Feature: Support spinning a single metaflow step (Rebased) #2506


Open · talsperre wants to merge 4 commits into master

Conversation

talsperre (Collaborator):

To test spin on a new flow you can do the following:

Simple case:

python <flow_name.py> --environment=conda spin <step_name>

Pass in specific pathspec:

python runtime_dag_flow.py --environment=conda spin --spin-pathspec RuntimeDAGFlow/13/step_c/275232971

Pass in custom artifacts via module (a sketch of such a module appears at the end of this description):

python runtime_dag_flow.py spin --spin-pathspec RuntimeDAGFlow/13/step_d/275233082 --artifacts-module ./my_artifacts.py

Skip decorators (including the whitelisted ones):

python complex_dag_flow.py --environment=conda spin step_d --skip-decorators

Use with Runner API:

    with Runner('complex_dag_flow.py', environment="conda").spin(
        step_name,
        spin_pathspec="<Some Val>",
        artifacts_module='./artifacts/complex_dag_step_d.py',
    ) as spin:
        print("-" * 50)
        print(f"Running test for step: step_a")
        spin_task = spin.task
        print(f"my_output: {spin_task['my_output']}")
        assert spin_task['my_output'].data == [-1]

See the tests for more examples of how to use this command.
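
For reference, a hypothetical my_artifacts.py for the --artifacts-module example above. The exact contract that read_artifacts_module expects is not shown in this PR, so the module-level-names convention below is an assumption:

    # my_artifacts.py -- hypothetical artifacts module. Assumed convention:
    # artifacts are exposed as module-level names that override the
    # corresponding artifacts of the original task.

    my_output = [-1]          # overrides the 'my_output' artifact
    some_parameter = "debug"  # any other artifact the spun step should see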

@@ -250,6 +254,70 @@ def init_task(self):
"""
self.save_metadata({self.METADATA_ATTEMPT_SUFFIX: {"time": time.time()}})

@only_if_not_done
@require_mode("w")
def transfer_artifacts(self, other_datastore, names=None):

Collaborator:
minor nit: can we add types here for the args, i.e. other_datastore and names?
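
For illustration, a rough sketch of an annotated signature; the concrete types are assumptions inferred from the surrounding datastore code, not taken from this diff:

    from typing import Iterable, Optional

    class TaskDataStore:  # stand-in for the real class, illustration only
        def transfer_artifacts(
            self,
            other_datastore: "TaskDataStore",  # assumed: another TaskDataStore
            names: Optional[Iterable[str]] = None,  # None means "all artifacts"
        ) -> None:
            """Copy the named artifacts from other_datastore into this one."""
            ...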

@@ -65,6 +65,9 @@ def save_blobs(self, blob_iter, raw=False, len_hint=0):
Whether to save the bytes directly or process them, by default False

Collaborator:
can we add "default False" for raw instead of "optional"?
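
i.e. something along these lines in the docstring (numpydoc style, assuming that is the convention used in the rest of the file):

    raw : bool, default False
        Whether to save the bytes directly or process them.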

@@ -265,3 +374,11 @@ def load_data(self, keys, force_raw=False):
"""
for key, blob in self.ca_store.load_blobs(keys, force_raw=force_raw):
yield key, blob


class MetadataCache(object):

Collaborator:
can this be made an abstract class instead, with the methods raising NotImplementedError?
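
A minimal sketch of the abstract-base-class variant; the method names below are placeholders, since MetadataCache's actual methods are not visible in this hunk:

    from abc import ABC, abstractmethod

    class MetadataCache(ABC):
        @abstractmethod
        def load(self, key):
            ...

        @abstractmethod
        def store(self, key, value):
            ...

One benefit over raising NotImplementedError: an incomplete subclass fails at instantiation time rather than at first call.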

@@ -178,6 +179,45 @@ def resolve_identity():
return "%s:%s" % (identity_type, identity_value)


def get_latest_task_pathspec(flow_name: str, step_name: str) -> (str, str):

Collaborator:
I guess this returns a Task instance, so the function can be renamed since it doesn't just return the pathspec
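
e.g. roughly (the new name is only a suggestion):

    from metaflow import Task

    def get_latest_task(flow_name: str, step_name: str) -> Task:
        """Return the latest Task for the step, not just its pathspec."""
        ...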

import importlib.util

try:
spec = importlib.util.spec_from_file_location("artifacts_module", file_path)

Collaborator:
"artifacts_module" can be replaced with actual name of the file such as

os.path.splitext(os.path.basename(file_path))[0]
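
Putting the suggestion together, a sketch of the loader using standard importlib machinery (the helper name is made up):

    import importlib.util
    import os

    def load_module_from_path(file_path):
        # Name the module after the file itself instead of a fixed string.
        module_name = os.path.splitext(os.path.basename(file_path))[0]
        spec = importlib.util.spec_from_file_location(module_name, file_path)
        module = importlib.util.module_from_spec(spec)
        spec.loader.exec_module(module)
        return module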

if self.orig_flow_datastore:
# We filter only the whitelisted decorators in case of spin step.
decorators = [
deco for deco in decorators if deco.name in whitelist_decorators

Collaborator:
maybe an additional check that whitelist_decorators is not None or empty would be helpful here?
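
i.e. roughly:

    if self.orig_flow_datastore and whitelist_decorators:
        # Filter only the whitelisted decorators when spinning a step.
        decorators = [
            deco for deco in decorators if deco.name in whitelist_decorators
        ]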

@@ -63,8 +65,8 @@ def __init__(self, cache_dir=None, max_size=None):
# when querying for sizes of artifacts. Once we have queried for the size
# of one artifact in a TaskDatastore, caching this means that any
# queries on that same TaskDatastore will be quick (since we already
# have all the metadata)
self._task_metadata_caches = OrderedDict()
# have all the metadata). We keep track of this in a file so it persists

Collaborator:
I wonder if the same should be done for self._store_caches, which is currently an OrderedDict?

# Get the parent steps
steps = []
for node_name, attributes in graph_info["steps"].items():
if step_name in attributes["next"]:

Collaborator:
I wonder if in_funcs can be used here somehow...


yield from self._iter_matching_tasks(steps, "foreach-execution-path", pattern)
metadata_key = "foreach-execution-path"

Collaborator:
this can be declared much earlier and then re-used, instead of current_path = metadata_dict.get("foreach-execution-path")
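
i.e. roughly:

    metadata_key = "foreach-execution-path"
    current_path = metadata_dict.get(metadata_key)
    ...
    yield from self._iter_matching_tasks(steps, metadata_key, pattern)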

target_depth = current_depth - 1
pattern = ",".join(current_path.split(",")[:target_depth])

metadata_key = "foreach-execution-path"

Collaborator:
same comment as before about declaring earlier


self._step_func = step_func

# Verify whether the use has provided step-name or spin-pathspec

Collaborator:
nit: the use --> the user

default=True,
show_default=True,
help="Whether to persist the artifacts in the spun step. If set to False, "
"the artifacts will notbe persisted and will not be available in the spun step's "

Collaborator:
nit: notbe --> not be

echo = echo_always

if opt_namespace is not None:
namespace(opt_namespace or None)

Collaborator:
we already checked for None in the if statement, so we can avoid opt_namespace or None, I guess
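
i.e. simply:

    if opt_namespace is not None:
        namespace(opt_namespace)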

spin_artifacts = read_artifacts_module(artifacts_module) if artifacts_module else {}
from_start("SpinStep: read artifacts module")

ds_type, ds_root = orig_flow_datastore.split("@")

Collaborator:
can this be None too? because that's the default value when defining this arg
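
e.g. a defensive sketch (what the right fallback is remains an open question):

    if orig_flow_datastore:
        ds_type, ds_root = orig_flow_datastore.split("@")
    else:
        ds_type, ds_root = None, None  # or fall back to the configured default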

) -> None:
"""
Create a new ExecutingTask -- this should not be done by the user directly but
instead user Runner.spin()

Collaborator:
nit: user --> use

) -> None:
"""
Create a new ExecutingRun -- this should not be done by the user directly but
instead user Runner.run()

Collaborator:
same nit: user --> use

"""

def __init__(
self, runner: "Runner", command_obj: CommandManager, task_obj: "metaflow.Task"

Collaborator:
can import Task directly (from metaflow import Task) and use that as the type instead of "metaflow.Task", to be consistent with how it's done for ExecutingRun


# Set the correct metadata from the runner_attribute file corresponding to this run.
metadata_for_flow = content.get("metadata")
from metaflow import Task

Collaborator:
can be moved to the top, and thus the type can also be changed to Task instead of "metaflow.Task" in the __init__ of ExecutingTask
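
i.e. roughly (sketch; the surrounding class is abbreviated):

    from metaflow import Task  # imported once at module top

    class ExecutingTask:
        def __init__(self, runner, command_obj, task_obj: Task) -> None:
            self.runner = runner
            self.command_obj = command_obj
            self.task = task_obj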

@@ -57,7 +63,6 @@ def __init__(
"""
self.runner = runner
self.command_obj = command_obj
self.run = run_obj

def __enter__(self) -> "ExecutingRun":

Collaborator:
shouldn't the return type now be ExecutingProcess instead?


Collaborator:
same for other functions such as async def wait


Collaborator:
maybe some functions specific to ExecutingRun (if there are any) will have to be moved to the new subclass; would be good to check
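
A rough sketch of the hierarchy these comments point toward; everything beyond the ExecutingRun name is assumed:

    class ExecutingProcess:
        """Assumed common base class for anything the Runner executes."""

        def __enter__(self) -> "ExecutingProcess":
            return self

        def __exit__(self, exc_type, exc_value, tb) -> None:
            pass

        async def wait(self) -> "ExecutingProcess":
            # run/task-agnostic waiting would live here
            return self

    class ExecutingRun(ExecutingProcess):
        pass  # run-specific helpers stay on this subclass

    class ExecutingTask(ExecutingProcess):
        pass  # task-specific helpers (e.g. a .task accessor) stay here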

# Set the correct metadata from the runner_attribute file corresponding to this run.
metadata_for_flow = content.get("metadata")

from metaflow import Task

Collaborator:
same comment, can be imported at the top
