Traces of thoughts


Metaflow Deep Dive (1) - Static Analysis


Introduction

Metaflow is an open-source Python framework developed by Netflix for building fast and scalable data science and ML workflows. It offers a concise set of intuitive yet powerful APIs, enabling users to focus on expressing their workflow topology.

If you are new to Metaflow, here are some excellent tutorials to get started.


Why I Wrote This

I recently joined Netflix to work on Metaflow, and I’m keen to learn everything about it. Since I have had little exposure to functional programming and metaprogramming in Python, digesting the source code has been a rewarding learning experience.

In this series of articles, I’ll walk through the major implementation details of Metaflow. These writings will serve as my notes and, hopefully, as a reference for you.

Entering the Playground

Environment Setup

With an editor/IDE that comes with handy code navigation and debugger support, you can easily follow along with my progress in this series.

A Sample Workflow

Let’s start with a simple workflow, namely branch_flow.py:

from metaflow import FlowSpec, step


class BranchFlow(FlowSpec):

    @step
    def start(self):
        self.next(self.a, self.b)

    @step
    def a(self):
        self.x = 1
        self.next(self.join)

    @step
    def b(self):
        self.x = 2
        self.next(self.join)

    @step
    def join(self, inputs):
        print('a is %s' % inputs.a.x)
        print('b is %s' % inputs.b.x)
        print('total is %d' % sum(input.x for input in inputs))
        self.next(self.end)

    @step
    def end(self):
        pass


if __name__ == '__main__':
    BranchFlow()

A couple of things to note here:

Types of a Step

A step belongs to one of the following types:

Linear

A linear step contains one parent step and one child step.

Fan-Out

A fan-out step has one parent step but two or more child steps, or one child step to be executed multiple times.

Join (Fan-In)

A join step closes an earlier fan-out. Its parents are the branches (or the repeated tasks) that the fan-out spawned, and it has one child step. The join step runs only after all of those parent tasks have completed, and it processes the results gathered from them.
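
To make the three categories concrete, here is a small sketch that labels the steps of BranchFlow purely from its edges. The BRANCH_FLOW adjacency dict, the classify_topology helper, and the "terminal" label for end are hypothetical illustrations, not Metaflow code. Note that topology alone cannot distinguish a foreach (one child executed many times) from a linear step, because that distinction is not visible in the edges.

```python
# Hypothetical adjacency list for BranchFlow: step name -> child steps.
BRANCH_FLOW = {
    "start": ["a", "b"],
    "a": ["join"],
    "b": ["join"],
    "join": ["end"],
    "end": [],
}


def classify_topology(graph):
    """Label each step as linear, fan-out, or join from its edges alone."""
    # Invert the child lists to find each step's parents.
    parents = {name: [] for name in graph}
    for name, children in graph.items():
        for child in children:
            parents[child].append(name)

    kinds = {}
    for name, children in graph.items():
        if len(parents[name]) > 1:
            kinds[name] = "join"       # several parents converge here
        elif len(children) > 1:
            kinds[name] = "fan-out"    # several children branch out
        elif len(children) == 1:
            kinds[name] = "linear"     # exactly one child
        else:
            kinds[name] = "terminal"   # the sketch's own label for `end`
    return kinds


print(classify_topology(BRANCH_FLOW))
# {'start': 'fan-out', 'a': 'linear', 'b': 'linear', 'join': 'join', 'end': 'terminal'}
```

Metaflow itself does not classify steps this way. As DAGNode._parse shows later, its classification is syntactic and based on each step's self.next call and signature.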

The @step Decorator

Before we move on, let’s just inspect the @step decorator:

# metaflow/decorators.py

def step(f):
    """
    Marks a method in a FlowSpec as a Metaflow Step. Note that this
    decorator needs to be placed as close to the method as possible (ie:
    before other decorators).
    """
    f.is_step = True
    f.decorators = []
    f.config_decorators = []
    f.wrappers = []
    f.name = f.__name__
    return f

It marks a method as a step by adding an is_step attribute so that Metaflow can recognize it later. It also initializes empty lists for decorators, config_decorators (for config-based decorators), and wrappers (for user-defined step wrapper decorators); the last two are newer additions to Metaflow’s decorator system. Finally, it records the method name in name and returns the original function itself, without wrapping it.
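
The docstring's advice to place @step closest to the method follows from Python's decorator order. Decorators are applied bottom-up, so any decorator stacked above @step receives a function that already carries is_step and the empty lists it can append to. The sketch below illustrates this with a hypothetical retry decorator that records itself on the step. It is not Metaflow's real @retry, which is implemented quite differently.

```python
def step(f):
    """Trimmed copy of Metaflow's @step: tag the function and return it as-is."""
    f.is_step = True
    f.decorators = []
    f.config_decorators = []
    f.wrappers = []
    f.name = f.__name__
    return f


def retry(times=3):
    """Hypothetical step decorator that appends itself to f.decorators."""
    def decorate(f):
        # Only works if @step has already run, i.e. @step sits *below* this one.
        if not getattr(f, "is_step", False):
            raise TypeError(f"@retry must be placed above @step on {f.__name__}")
        f.decorators.append(("retry", {"times": times}))
        return f
    return decorate


class Demo:
    @retry(times=2)   # applied second
    @step             # applied first
    def start(self):
        pass


print(Demo.start.is_step, Demo.start.decorators)
# True [('retry', {'times': 2})]

try:
    @step
    @retry()          # applied first, before is_step exists
    def bad(self):
        pass
except TypeError as err:
    order_error = str(err)
    print(order_error)  # @retry must be placed above @step on bad
```

Because @step returns the function unchanged, the tagged attributes are also visible through bound methods. For example, Demo().start.name is "start".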

Lifecycle of a Metaflow Run

Running the BranchFlow example with python3 branch_flow.py run produces output similar to the following:

Metaflow 2.19.19 executing BranchFlow for user:lizhaoliu
Validating your flow...
    The graph looks good!
Running pylint...
    Pylint is happy!
2026-02-15 09:24:11.264 Workflow starting (run-id 1740798651260512):
2026-02-15 09:24:11.269 [1740798651260512/start/1 (pid 47309)] Task is starting.
2026-02-15 09:24:11.793 [1740798651260512/start/1 (pid 47309)] Task finished successfully.
2026-02-15 09:24:11.796 [1740798651260512/a/2 (pid 47312)] Task is starting.
2026-02-15 09:24:11.798 [1740798651260512/b/3 (pid 47313)] Task is starting.
2026-02-15 09:24:12.428 [1740798651260512/b/3 (pid 47313)] Task finished successfully.
2026-02-15 09:24:12.442 [1740798651260512/a/2 (pid 47312)] Task finished successfully.
2026-02-15 09:24:12.445 [1740798651260512/join/4 (pid 47318)] Task is starting.
2026-02-15 09:24:12.900 [1740798651260512/join/4 (pid 47318)] a is 1
2026-02-15 09:24:12.900 [1740798651260512/join/4 (pid 47318)] b is 2
2026-02-15 09:24:12.954 [1740798651260512/join/4 (pid 47318)] total is 3
2026-02-15 09:24:12.955 [1740798651260512/join/4 (pid 47318)] Task finished successfully.
2026-02-15 09:24:12.958 [1740798651260512/end/5 (pid 47321)] Task is starting.
2026-02-15 09:24:13.428 [1740798651260512/end/5 (pid 47321)] Task finished successfully.
2026-02-15 09:24:13.428 Done!

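We can see that Metaflow did the following things, in order:

  1. Validated the flow graph ("The graph looks good!").
  2. Ran pylint against the flow file.
  3. Started a new run with its own run ID and executed the start step.
  4. Executed a and b concurrently as separate processes (note the different pids), since start fans out to both.
  5. Executed join once both a and b had finished, printing the values gathered from them.
  6. Executed end, then reported Done!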

A more detailed workflow lifecycle diagram can be found here in the Metaflow docs archive.

Entry Point

A user-defined workflow class must inherit from FlowSpec.

# metaflow/flowspec.py

class FlowSpec(metaclass=FlowSpecMeta):

    def __init__(self, use_cli=True):
        """
        Construct a FlowSpec

        Parameters
        ----------
        use_cli : bool, default True
            Set to True if the flow is invoked from __main__ or the command line
        """

        self.name = self.__class__.__name__

        self._datastore = None
        self._transition = None
        self._cached_input = {}

        if use_cli:
            with parameters.flow_context(self.__class__) as _:
                from . import cli
                cli.main(self)

A few things to note compared to earlier versions of Metaflow:

  1. FlowSpec now uses a FlowSpecMeta metaclass, which handles graph construction (_graph) and step discovery (_steps) at class creation time rather than in __init__. This enables features like flow inheritance and config-based decorators.
  2. parameters.flow_context sets up parameter context before the CLI is imported.
  3. cli.main(self) starts the workflow.
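
To see why a metaclass helps, here is a minimal sketch of step discovery at class creation time. MiniFlowMeta and MiniFlowSpec are hypothetical names, and this is not how FlowSpecMeta is actually written (it also builds _graph, for one). The sketch only shows the mechanism. A metaclass's __init__ runs once per class statement, and dir() includes inherited attributes, so a subclass automatically picks up steps defined in its base flow.

```python
def step(f):
    # Minimal stand-in for Metaflow's @step: just tag the function.
    f.is_step = True
    return f


class MiniFlowMeta(type):
    """Hypothetical metaclass that records step names when a class is defined."""

    def __init__(cls, name, bases, namespace):
        super().__init__(name, bases, namespace)
        # dir() walks the MRO, so steps inherited from base flows are included.
        cls._steps = sorted(
            attr
            for attr in dir(cls)
            if callable(getattr(cls, attr))
            and getattr(getattr(cls, attr), "is_step", False)
        )


class MiniFlowSpec(metaclass=MiniFlowMeta):
    pass


class BaseFlow(MiniFlowSpec):
    @step
    def start(self):
        self.next(self.end)


class ChildFlow(BaseFlow):
    @step
    def end(self):
        pass


print(BaseFlow._steps)   # ['start']
print(ChildFlow._steps)  # ['end', 'start']
```

No instance was created here. Each class's step list exists as soon as its class statement finishes executing, which is what allows this work to move out of __init__.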

Now before diving into the execution part, let’s look into the FlowGraph class.

FlowGraph

FlowGraph is the class that represents a workflow DAG. It also provides methods for inspecting the graph statically.

# metaflow/graph.py

class FlowGraph(object):
    def __init__(self, flow):
        self.name = flow.__name__
        self.nodes = self._create_nodes(flow)
        self.doc = deindent_docstring(flow.__doc__)
        self.sorted_nodes = []
        self._traverse_graph()
        self._postprocess()

The constructor takes a workflow class as an argument, and it does the following things:

  1. self._create_nodes(flow) creates graph nodes and edges based on steps defined in the workflow.
  2. self._traverse_graph() traverses the graph to gather additional information (parents, fan-out, etc.). It also populates sorted_nodes with the topological order of step names.
  3. self._postprocess() performs the remaining postprocessing, which marks the steps nested inside a foreach.

Node Creation

DAG nodes are created by the _create_nodes method. It inspects the workflow class, discovers all methods marked as steps, and parses the source code of each one individually with the ast (Abstract Syntax Tree) module. It then returns a dictionary that maps step names to DAG nodes.

# metaflow/graph.py

class FlowGraph(object):
    def _create_nodes(self, flow):
        nodes = {}
        for element in dir(flow):
            func = getattr(flow, element)
            if callable(func) and hasattr(func, "is_step"):
                source_file = inspect.getsourcefile(func)
                source_lines, lineno = inspect.getsourcelines(func)
                source_code = deindent_docstring("".join(source_lines))
                function_ast = ast.parse(source_code).body[0]
                node = DAGNode(
                    function_ast,
                    func.decorators,
                    func.wrappers,
                    func.config_decorators,
                    func.__doc__,
                    source_file,
                    lineno,
                )
                nodes[element] = node
        return nodes

Unlike earlier versions of Metaflow, which parsed the entire module AST and used a StepVisitor to walk the class definition, the current implementation iterates over the flow class attributes directly. For each callable with an is_step attribute (set by the @step decorator), it:

  1. Uses inspect.getsourcelines to get the function’s source code and line number.
  2. Parses only that function into an AST.
  3. Constructs a DAGNode with the function AST, its decorators, wrappers, config decorators, docstring, source file, and line number.

This per-function approach is cleaner and supports features like flow inheritance, where steps may come from different source files.
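
One detail here is easy to miss: why deindent_docstring must run before ast.parse. inspect.getsourcelines returns a method exactly as it appears in the file, still indented inside the class body, and ast.parse rejects code that begins with an indent. The sketch below substitutes textwrap.dedent for Metaflow's deindent_docstring. It hard-codes the lines that getsourcelines would return for start, assuming branch_flow.py matches the listing above so that @step sits on line 6. It also shows how DAGNode turns the two line numbers into func_lineno.

```python
import ast
import textwrap

# What inspect.getsourcelines(BranchFlow.start) would return, assuming
# branch_flow.py matches the listing above: the decorated method, still
# indented, plus the file line number of its first line ("@step").
source_lines = [
    "    @step\n",
    "    def start(self):\n",
    "        self.next(self.a, self.b)\n",
]
lineno = 6

source_code = "".join(source_lines)
try:
    ast.parse(source_code)
except IndentationError as err:
    print("raw source fails:", err.msg)  # unexpected indent

# Dedent first (textwrap.dedent stands in for deindent_docstring here).
function_ast = ast.parse(textwrap.dedent(source_code)).body[0]

print(function_ast.name)                            # start
print([d.id for d in function_ast.decorator_list])  # ['step']
print(len(function_ast.args.args))                  # 1 (just self)

# On Python 3.8+, func_ast.lineno is relative to the snippet and points at the
# `def` line (line 2, after the decorator), so DAGNode shifts it into file
# coordinates.
func_lineno = lineno + function_ast.lineno - 1
print(func_lineno)                                  # 7
```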

(Optional) More on AST

If you are interested in knowing more about AST, you can read about it here, and here.

Building a DAGNode

As mentioned earlier, a DAGNode instance encapsulates the metadata of a step.

# metaflow/graph.py

# some fields are omitted in the constructor here.
class DAGNode(object):
    def __init__(self, func_ast, decos, wrappers, config_decorators, doc, source_file, lineno):
        self.name = func_ast.name
        self.source_file = source_file
        self.func_lineno = lineno + func_ast.lineno - 1
        self.decorators = decos
        self.wrappers = wrappers
        self.config_decorators = config_decorators
        self.doc = deindent_docstring(doc)
        self.parallel_step = any(getattr(deco, "IS_PARALLEL", False) for deco in decos)

        # these attributes are populated by _parse
        self.type = None  # Type of this step.
        self.out_funcs = []  # Outbound/child step(s) (called in self.next).
        self.num_args = 0  # Number of arguments, including `self`.
        self.switch_cases = {}  # For split-switch: maps case values to step names.
        self.condition = None  # For split-switch: the condition variable name.
        self.foreach_param = None
        self.num_parallel = 0
        self.parallel_foreach = False
        self._parse(func_ast, lineno)  # Parses the function AST and updates the metadata.

        # these attributes are populated by _traverse_graph
        self.in_funcs = set()  # Inbound/parent step(s).
        self.split_parents = []  # Parent steps that have started unclosed fan-out till current step.
        self.split_branches = []  # Branches originating from each split parent.
        self.matching_join = None  # The join (fan-in) step down the graph that matches current split (fan-out).
        # populated by _postprocess
        self.is_inside_foreach = False

Notable differences from earlier versions:

  1. The constructor now accepts wrappers, config_decorators, source_file, and lineno — supporting the richer decorator system and per-function source tracking.
  2. parallel_step checks if any decorator has IS_PARALLEL attribute (for the @parallel decorator).
  3. New fields switch_cases, condition, split_branches support the split-switch step type (conditional routing).
  4. self._parse(func_ast, lineno) is invoked to determine the step type and out_funcs (child steps).

# metaflow/graph.py

class DAGNode(object):
    def _parse(self, func_ast, lineno):
        self.num_args = len(func_ast.args.args)
        tail = func_ast.body[-1]

        # end doesn't need a transition
        if self.name == "end":
            # TYPE: end
            self.type = "end"

        # ensure that the tail is an expression (a call of `self.next` is expected)
        if not isinstance(tail, ast.Expr):
            return

        # determine the type of self.next transition
        try:
            if not self._expr_str(tail.value.func) == "self.next":
                return

            self.has_tail_next = True
            self.invalid_tail_next = True
            self.tail_next_lineno = lineno + tail.lineno - 1

            # Check if first argument is a dictionary (switch case)
            if (
                len(tail.value.args) == 1
                and isinstance(tail.value.args[0], ast.Dict)
                and any(k.arg == "condition" for k in tail.value.keywords)
            ):
                switch_cases = self._parse_switch_dict(tail.value.args[0])
                # ... extract condition name from keywords ...
                if switch_cases and condition_name:
                    # TYPE: split-switch
                    self.type = "split-switch"
                    self.condition = condition_name
                    self.switch_cases = switch_cases
                    self.out_funcs = list(switch_cases.values())
                    self.invalid_tail_next = False
                    return
            else:
                self.out_funcs = [e.attr for e in tail.value.args]

            # Keyword arguments in `self.next` call.
            keywords = dict(
                (k.arg, getattr(k.value, "s", None)) for k in tail.value.keywords
            )
            if len(keywords) == 1:
                if "foreach" in keywords:
                    # TYPE: foreach
                    self.type = "foreach"
                    if len(self.out_funcs) == 1:
                        self.foreach_param = keywords["foreach"]
                        self.invalid_tail_next = False
                elif "num_parallel" in keywords:
                    self.type = "foreach"
                    self.parallel_foreach = True
                    if len(self.out_funcs) == 1:
                        self.num_parallel = keywords["num_parallel"]
                        self.invalid_tail_next = False
            elif len(keywords) == 0:
                if len(self.out_funcs) > 1:
                    # TYPE: split
                    self.type = "split"
                    self.invalid_tail_next = False
                elif len(self.out_funcs) == 1:
                    if self.name == "start":
                        # TYPE: start
                        self.type = "start"
                    elif self.num_args > 1:
                        # TYPE: join
                        self.type = "join"
                    else:
                        # TYPE: linear
                        self.type = "linear"
                    self.invalid_tail_next = False
        except AttributeError:
            return

Key takeaways from _parse(..):

  1. The last statement in a step must call self.next (except for end step).
  2. At most one keyword argument is allowed in a self.next call.
  3. A step is one of these types (internally):
    • start: the step named start, when it transitions to a single step. A start step that fans out, as in BranchFlow, is typed split, because the split check runs first.
    • linear: single step in self.next call. This step must take no arguments except for self.
    • join: single step in self.next call. It must take at least one argument besides self (the inputs).
    • split: two or more steps in self.next call, all of which are executed. (Previously named split-and.)
    • split-switch: dictionary-based conditional routing. The first argument to self.next is a dictionary mapping case values to steps, and a condition keyword argument is also passed. This type replaces the old split-or and is more powerful. It supports an arbitrary number of branches (not just two) and can even route back to the same step for recursive patterns.
    • foreach (num_parallel): single step in self.next call. The number of executions is determined at runtime by the length of the artifact named in the foreach keyword argument (or by the value of num_parallel).
    • end: the final step named end, which needs no self.next call.
  4. split and foreach are fan-out, whereas join is fan-in. split-switch also has several outgoing edges, but only one branch is taken per execution, and _traverse_graph accordingly does not push it onto split_parents.
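
The rules above can be condensed into a short, self-contained sketch. classify_step is a hypothetical, simplified re-implementation for illustration only. It skips DAGNode's bookkeeping (invalid_tail_next, line numbers, foreach_param, num_parallel), assumes switch dictionaries use literal keys, and uses ast.unparse (Python 3.9+) where Metaflow calls its own _expr_str helper.

```python
import ast
import textwrap


def classify_step(name, source):
    """Return (step_type, out_funcs) for a step's source, mimicking DAGNode._parse."""
    func_ast = ast.parse(textwrap.dedent(source)).body[0]
    num_args = len(func_ast.args.args)  # includes `self`
    if name == "end":
        return "end", []  # the end step needs no transition

    tail = func_ast.body[-1]
    # The last statement must be an expression that calls self.next(...).
    if not (isinstance(tail, ast.Expr) and isinstance(tail.value, ast.Call)):
        return None, []
    call = tail.value
    if ast.unparse(call.func) != "self.next":
        return None, []
    keywords = {k.arg: k.value for k in call.keywords}

    # split-switch: self.next({"x": self.a, ...}, condition="var")
    if (
        len(call.args) == 1
        and isinstance(call.args[0], ast.Dict)
        and "condition" in keywords
    ):
        return "split-switch", [v.attr for v in call.args[0].values]

    # Every positional argument should look like self.<step_name>.
    if not all(isinstance(a, ast.Attribute) for a in call.args):
        return None, []
    out_funcs = [a.attr for a in call.args]

    if len(keywords) == 1 and ("foreach" in keywords or "num_parallel" in keywords):
        return "foreach", out_funcs  # DAGNode only marks this valid with one target
    if not keywords and len(out_funcs) > 1:
        return "split", out_funcs  # checked before the start special case
    if not keywords and len(out_funcs) == 1:
        if name == "start":
            return "start", out_funcs
        return ("join" if num_args > 1 else "linear"), out_funcs
    return None, out_funcs


samples = {
    "fan-out start": ("start", "def start(self):\n    self.next(self.a, self.b)\n"),
    "plain start": ("start", "def start(self):\n    self.next(self.a)\n"),
    "linear": ("a", "def a(self):\n    self.x = 1\n    self.next(self.join)\n"),
    "join": ("join", "def join(self, inputs):\n    self.next(self.end)\n"),
    "foreach": ("fan", "def fan(self):\n    self.items = [1, 2]\n"
                       "    self.next(self.work, foreach='items')\n"),
    "switch": ("route", "def route(self):\n"
                        "    self.next({'hi': self.high, 'lo': self.low}, condition='level')\n"),
    "end": ("end", "def end(self):\n    pass\n"),
}
results = {label: classify_step(*spec) for label, spec in samples.items()}
for label, result in results.items():
    print(f"{label:14} -> {result}")
```

The fan-out start case deserves attention. Because the split check runs before the start check, BranchFlow's start step is typed split internally, not start.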

Graph Traversal

Now that a DAG is successfully built, Metaflow traverses it to update other essential fields.

# metaflow/graph.py

class FlowGraph(object):
    def _traverse_graph(self):
        def traverse(node, seen, split_parents, split_branches):
            add_split_branch = False
            try:
                self.sorted_nodes.remove(node.name)
            except ValueError:
                pass
            self.sorted_nodes.append(node.name)
            if node.type in ("split", "foreach"):
                node.split_parents = split_parents
                node.split_branches = split_branches
                add_split_branch = True
                split_parents = split_parents + [node.name]
            elif node.type == "split-switch":
                node.split_parents = split_parents
                node.split_branches = split_branches
            elif node.type == "join":
                # ignore joins without splits
                if split_parents:
                    self[split_parents[-1]].matching_join = node.name
                    node.split_parents = split_parents
                    node.split_branches = split_branches[:-1]
                    split_parents = split_parents[:-1]
                    split_branches = split_branches[:-1]
            else:
                node.split_parents = split_parents
                node.split_branches = split_branches

            for n in node.out_funcs:
                # graph may contain loops - ignore them
                if n not in seen:
                    # graph may contain unknown transitions - ignore them
                    if n in self:
                        child = self[n]
                        child.in_funcs.add(node.name)
                        traverse(
                            child,
                            seen + [n],
                            split_parents,
                            split_branches + ([n] if add_split_branch else []),
                        )

        if "start" in self:
            traverse(self["start"], [], [], [])

        # fix the order of in_funcs
        for node in self.nodes.values():
            node.in_funcs = sorted(node.in_funcs)

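The traversal is performed in a depth-first search (DFS) fashion, beginning from the start step. Because seen only records the current path, a step reachable through several paths is visited once per path. This is also how sorted_nodes becomes a topological ordering of step names. Every visit removes the step and re-appends it, so it ends up after all of its ancestors.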
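
Here is a condensed sketch of the same DFS on plain dictionaries, run against BranchFlow. traverse_graph is a hypothetical helper that mirrors _traverse_graph but drops split_branches and operates on a types/out_funcs pair of dicts instead of DAGNode objects. The step types are the internal ones that _parse would assign, so start is split because it fans out.

```python
def traverse_graph(types, out_funcs):
    """Simplified sketch of FlowGraph._traverse_graph (split_branches omitted)."""
    sorted_nodes = []
    in_funcs = {name: set() for name in types}
    split_parents = {}
    matching_join = {}

    def visit(name, seen, parents):
        # Re-append on every visit so a node ends up after all its ancestors.
        if name in sorted_nodes:
            sorted_nodes.remove(name)
        sorted_nodes.append(name)

        split_parents[name] = parents
        if types[name] in ("split", "foreach"):
            parents = parents + [name]          # open a new fan-out
        elif types[name] == "join" and parents:
            matching_join[parents[-1]] = name   # close the innermost fan-out
            parents = parents[:-1]

        for child in out_funcs[name]:
            # `seen` holds the current path only: skip loops and unknown steps.
            if child not in seen and child in types:
                in_funcs[child].add(name)
                visit(child, seen + [child], parents)

    visit("start", [], [])
    sorted_in_funcs = {n: sorted(p) for n, p in in_funcs.items()}
    return sorted_nodes, sorted_in_funcs, split_parents, matching_join


BRANCH_TYPES = {"start": "split", "a": "linear", "b": "linear", "join": "join", "end": "end"}
BRANCH_OUT = {"start": ["a", "b"], "a": ["join"], "b": ["join"], "join": ["end"], "end": []}

order, parents_of, split_parents, joins = traverse_graph(BRANCH_TYPES, BRANCH_OUT)
print(order)                  # ['start', 'a', 'b', 'join', 'end']
print(parents_of["join"])     # ['a', 'b']
print(split_parents["join"])  # ['start']
print(joins)                  # {'start': 'join'}
```

In this run, join and end are each visited twice (once via a and once via b), and the second visit moves them behind b in the final order.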

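There are a few fields that are updated each time a node/step is visited:

  1. sorted_nodes: the step is moved to the end of the list.
  2. split_parents and split_branches: the stack of enclosing fan-outs that are still open, and the branch taken out of each. A split or foreach pushes itself onto the stack for its children, and a join pops the innermost entry.
  3. matching_join: when a join is reached while a fan-out is open, it is recorded as the matching join of the innermost split parent.
  4. in_funcs: the current step is added to each child's parent set. The sets are converted to sorted lists once the traversal finishes.

Loops and transitions to unknown steps are simply skipped, as the comments in the code note.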

Graph Postprocess

The postprocessing marks whether each node is nested inside a foreach fan-out that has not yet been closed by its matching join.

# metaflow/graph.py

class FlowGraph(object):
    def _postprocess(self):
        # any node who has a foreach as any of its split parents
        # has is_inside_foreach=True *unless* all of those foreaches
        # are joined by the node
        for node in self.nodes.values():
            foreaches = [
                p for p in node.split_parents if self.nodes[p].type == "foreach"
            ]
            if [f for f in foreaches if self.nodes[f].matching_join != node.name]:
                node.is_inside_foreach = True

Lint Checks

The lint module (metaflow/lint.py) contains a set of validity checks that are run against a DAG. Reading these checks is a good way to learn the rules Metaflow enforces on a flow. Should any check fail, Metaflow raises an exception that tells you what the error is and where it occurs.
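
The general shape of such a rule set can be sketched as a registry of check functions, each of which raises on the first violation it finds. Everything in this sketch is a hypothetical illustration rather than the code in metaflow/lint.py: LintError, the check decorator, the dict-based graph, and the two checks. The two rules are modeled on the requirements that a flow define start and end, and that self.next only name steps that exist.

```python
class LintError(Exception):
    """Hypothetical exception carrying the offending line number."""

    def __init__(self, msg, lineno=None):
        super().__init__(msg if lineno is None else f"line {lineno}: {msg}")


CHECKS = []


def check(func):
    """Register a lint check; checks run in registration order."""
    CHECKS.append(func)
    return func


@check
def check_basic_steps(graph):
    for required in ("start", "end"):
        if required not in graph:
            raise LintError(f"Add a step named '{required}'")


@check
def check_unknown_transitions(graph):
    for name, node in graph.items():
        for target in node["out_funcs"]:
            if target not in graph:
                raise LintError(
                    f"Step '{name}' transitions to an unknown step '{target}'",
                    node["lineno"],
                )


def run_checks(graph):
    for func in CHECKS:
        func(graph)  # the first failing check stops validation


GOOD = {
    "start": {"out_funcs": ["end"], "lineno": 6},
    "end": {"out_funcs": [], "lineno": 10},
}
BROKEN = {
    "start": {"out_funcs": ["finish"], "lineno": 6},
    "end": {"out_funcs": [], "lineno": 10},
}

run_checks(GOOD)  # passes silently
try:
    run_checks(BROKEN)
except LintError as err:
    lint_message = str(err)
    print(lint_message)  # line 6: Step 'start' transitions to an unknown step 'finish'
```

Registering checks through a decorator keeps each rule in its own small function and makes the evaluation order explicit.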

There are 18 checks in total, summarized below:

Closing Thoughts

It has been a pleasure to look under the hood of Metaflow. A few observations:

  1. A join step is identified (in the DAGNode._parse method) by taking at least one argument besides self, not by the number of parent steps it has. For example, if a step has multiple parent steps but takes only self as an argument, it is NOT a join.
  2. The constraint that a join step’s parents must all have the same fan-out step exists because it simplifies synchronization of execution and data flow. The runtime needs to know exactly which fan-out a join is closing so it can correctly gather all parallel results.
  3. Since the original version of this article (based on Metaflow 2.4.7), the codebase has evolved significantly:
    • The FlowSpecMeta metaclass now handles graph construction at class definition time, enabling flow inheritance and config-based decorators.
    • _create_nodes no longer uses module-level AST parsing with StepVisitor; instead, it inspects each step function individually via inspect.getsourcelines.
    • The old split-and and split-or types have been replaced by split and split-switch respectively, with split-switch supporting dictionary-based conditional routing and even recursive step patterns.
    • A dedicated start type was added (previously just treated as linear).
    • Three new lint checks were added: check_switch_splits, check_join_followed_by_parallel_step, and check_ambiguous_joins.

That’s all for this post! I hope you enjoyed reading it. In the next post, I will discuss the details of workflow execution.