Control Flow Patterns

The workflows library provides four essential control flow patterns to structure your automation logic.

Sequential

Sequential execution runs steps one after another in order. Use the .then() method to chain steps together.

from workflows_py.workflow import Step, Workflow

def step_one(ctx):
    data = ctx.get_start_data()
    return {"message": f"Hello {data['name']}", "count": 1}

def step_two(ctx):
    result = ctx.get_step_result("step-1")
    return {"message": result["message"] + "!", "count": result["count"] + 1}

def step_three(ctx):
    result = ctx.get_step_result("step-2")
    return {"final_message": result["message"], "total_count": result["count"] + 1}

workflow = Workflow()
workflow.then(
    Step("step-1", step_one)
).then(
    Step("step-2", step_two)
).then(
    Step("step-3", step_three)
)

Each step can access the results of previous steps using ctx.get_step_result("step-id").

Conditionals

Conditional branching lets your workflow make different choices based on data. Use .if_condition(), .elif_branch(), and .else_branch() to create decision logic.

from workflows_py.workflow import Step, Workflow

def check_number(ctx):
    number = ctx.get_start_data()["value"]
    return {"number": number}

def handle_positive(ctx):
    return {"result": "Number is positive"}

def handle_negative(ctx):
    return {"result": "Number is negative or zero"}

workflow = Workflow()
workflow.then(
    Step("check-number", check_number)
).if_condition(
    condition=lambda ctx: ctx.get_step_result("check-number")["number"] > 0,
    branch_fn=lambda w: w.then(
        Step("positive-path", handle_positive)
    )
).else_branch(
    branch_fn=lambda w: w.then(
        Step("negative-path", handle_negative)
    )
)

Make sure your condition functions handle missing or invalid data gracefully.

Loops

Loops process a list of items by running the same workflow logic for each item. Use ctx.scope["item"] to access the current item and ctx.scope["i"] to get the index (0-based).

from workflows_py.workflow import Step, Workflow

def get_numbers(ctx):
    return [10, 20, 30, 40, 50]

def process_number(ctx):
    # Access the current item and its index
    current_number = ctx.scope["item"]
    current_index = ctx.scope["i"]
    
    return {
        "index": current_index,
        "original": current_number,
        "doubled": current_number * 2
    }

def log_result(ctx):
    current_number = ctx.scope["item"]
    result = ctx.get_step_result("process-number")
    
    return {
        "message": f"Item {result['index']}: {result['original']} -> {result['doubled']}"
    }

workflow = Workflow()
workflow.then(
    Step("get-numbers", get_numbers)
).loop(
    "process-numbers",
    get_items=lambda ctx: ctx.get_step_result("get-numbers"),
    branch_fn=lambda w: w.then(
        Step("process-number", process_number)
    ).then(
        Step("log-result", log_result)
    )
)

Loop iterations run in parallel by default.

Parallel

Parallel execution runs multiple workflow branches at the same time. All branches must complete before the workflow continues to the next step.

from workflows_py.workflow import Step, Workflow

def fetch_user_data(ctx):
    user_id = ctx.get_start_data()["user_id"]
    return {"name": "John", "email": "john@example.com"}

def fetch_user_orders(ctx):
    user_id = ctx.get_start_data()["user_id"]
    return {"orders": [{"id": 1, "total": 50}, {"id": 2, "total": 75}]}

def fetch_user_preferences(ctx):
    user_id = ctx.get_start_data()["user_id"]
    return {"theme": "dark", "notifications": True}

def combine_results(ctx):
    user_data = ctx.get_step_result("fetch-user-data")
    orders = ctx.get_step_result("fetch-user-orders")
    preferences = ctx.get_step_result("fetch-user-preferences")
    
    return {
        "user": user_data,
        "orders": orders["orders"],
        "preferences": preferences,
        "total_orders": len(orders["orders"])
    }

workflow = Workflow()
workflow.parallel(
    "gather-user-info",
    branches_fn=[
        lambda w: w.then(Step("fetch-user-data", fetch_user_data)),
        lambda w: w.then(Step("fetch-user-orders", fetch_user_orders)),
        lambda w: w.then(Step("fetch-user-preferences", fetch_user_preferences)),
    ]
).then(
    Step("combine-results", combine_results)
)

Parallel branches are perfect for independent tasks that can run simultaneously.

Combining Control Flow Patterns

You can combine different control flow patterns to create more complex workflows:

from workflows_py.workflow import Step, Workflow

def process_orders(ctx):
    return [
        {"id": 1, "priority": "high", "items": ["A", "B"]},
        {"id": 2, "priority": "low", "items": ["C"]},
        {"id": 3, "priority": "high", "items": ["D", "E", "F"]}
    ]

def process_high_priority(ctx):
    order = ctx.scope["item"]
    return {"order_id": order["id"], "expedited": True}

def process_low_priority(ctx):
    order = ctx.scope["item"]
    return {"order_id": order["id"], "expedited": False}

def send_notification(ctx):
    order = ctx.scope["item"]
    return {"notification_sent": True, "order_id": order["id"]}

workflow = Workflow()
workflow.then(
    Step("get-orders", process_orders)
).loop(
    "process-each-order",
    get_items=lambda ctx: ctx.get_step_result("get-orders"),
    branch_fn=lambda w: w.if_condition(
        condition=lambda ctx: ctx.scope["item"]["priority"] == "high",
        branch_fn=lambda w: w.then(
            Step("high-priority", process_high_priority)
        )
    ).else_branch(
        branch_fn=lambda w: w.then(
            Step("low-priority", process_low_priority)
        )
    ).then(
        Step("notify", send_notification)
    )
)

Mix and match control flow patterns to handle complex business logic.