Control Flow Patterns
The workflows library provides four essential control flow patterns to structure your automation logic.
Sequential
Sequential execution runs steps one after another in order. Use the .then()
method to chain steps together.
from workflows_py.workflow import Step, Workflow
def step_one ( ctx ):
data = ctx.get_start_data()
return { "message" : f "Hello { data[ 'name' ] } " , "count" : 1 }
def step_two ( ctx ):
result = ctx.get_step_result( "step-1" )
return { "message" : result[ "message" ] + "!" , "count" : result[ "count" ] + 1 }
def step_three ( ctx ):
result = ctx.get_step_result( "step-2" )
return { "final_message" : result[ "message" ], "total_count" : result[ "count" ] + 1 }
workflow = Workflow()
workflow.then(
Step( "step-1" , step_one)
).then(
Step( "step-2" , step_two)
).then(
Step( "step-3" , step_three)
)
Each step can access the results of previous steps using ctx.get_step_result("step-id")
.
Conditionals
Conditional branching lets your workflow make different choices based on data. Use .if_condition()
, .elif_branch()
, and .else_branch()
to create decision logic.
Simple If/Else
Multiple Conditions
Using the result from a conditional
from workflows_py.workflow import Step, Workflow
def check_number ( ctx ):
number = ctx.get_start_data()[ "value" ]
return { "number" : number}
def handle_positive ( ctx ):
return { "result" : "Number is positive" }
def handle_negative ( ctx ):
return { "result" : "Number is negative or zero" }
workflow = Workflow()
workflow.then(
Step( "check-number" , check_number)
).if_condition(
condition = lambda ctx : ctx.get_step_result( "check-number" )[ "number" ] > 0 ,
branch_fn = lambda w : w.then(
Step( "positive-path" , handle_positive)
)
).else_branch(
branch_fn = lambda w : w.then(
Step( "negative-path" , handle_negative)
)
)
Make sure your condition functions handle missing or invalid data gracefully.
Loops
Loops process a list of items by running the same workflow logic for each item. Use ctx.scope["item"]
to access the current item and ctx.scope["i"]
to get the index (0-based).
Simple List Processing
Processing Objects
from workflows_py.workflow import Step, Workflow
def get_numbers ( ctx ):
return [ 10 , 20 , 30 , 40 , 50 ]
def process_number ( ctx ):
# Access the current item and its index
current_number = ctx.scope[ "item" ]
current_index = ctx.scope[ "i" ]
return {
"index" : current_index,
"original" : current_number,
"doubled" : current_number * 2
}
def log_result ( ctx ):
current_number = ctx.scope[ "item" ]
result = ctx.get_step_result( "process-number" )
return {
"message" : f "Item { result[ 'index' ] } : { result[ 'original' ] } -> { result[ 'doubled' ] } "
}
workflow = Workflow()
workflow.then(
Step( "get-numbers" , get_numbers)
).loop(
"process-numbers" ,
get_items = lambda ctx : ctx.get_step_result( "get-numbers" ),
branch_fn = lambda w : w.then(
Step( "process-number" , process_number)
).then(
Step( "log-result" , log_result)
)
)
Loop iterations run in parallel by default.
Parallel
Parallel execution runs multiple workflow branches at the same time. All branches must complete before the workflow continues to the next step.
from workflows_py.workflow import Step, Workflow
def fetch_user_data ( ctx ):
user_id = ctx.get_start_data()[ "user_id" ]
return { "name" : "John" , "email" : "john@example.com" }
def fetch_user_orders ( ctx ):
user_id = ctx.get_start_data()[ "user_id" ]
return { "orders" : [{ "id" : 1 , "total" : 50 }, { "id" : 2 , "total" : 75 }]}
def fetch_user_preferences ( ctx ):
user_id = ctx.get_start_data()[ "user_id" ]
return { "theme" : "dark" , "notifications" : True }
def combine_results ( ctx ):
user_data = ctx.get_step_result( "fetch-user-data" )
orders = ctx.get_step_result( "fetch-user-orders" )
preferences = ctx.get_step_result( "fetch-user-preferences" )
return {
"user" : user_data,
"orders" : orders[ "orders" ],
"preferences" : preferences,
"total_orders" : len (orders[ "orders" ])
}
workflow = Workflow()
workflow.parallel(
"gather-user-info" ,
branches_fn = [
lambda w : w.then(Step( "fetch-user-data" , fetch_user_data)),
lambda w : w.then(Step( "fetch-user-orders" , fetch_user_orders)),
lambda w : w.then(Step( "fetch-user-preferences" , fetch_user_preferences)),
]
).then(
Step( "combine-results" , combine_results)
)
Parallel branches are perfect for independent tasks that can run simultaneously.
Combining Control Flow Patterns
You can combine different control flow patterns to create more complex workflows:
from workflows_py.workflow import Step, Workflow
def process_orders ( ctx ):
return [
{ "id" : 1 , "priority" : "high" , "items" : [ "A" , "B" ]},
{ "id" : 2 , "priority" : "low" , "items" : [ "C" ]},
{ "id" : 3 , "priority" : "high" , "items" : [ "D" , "E" , "F" ]}
]
def process_high_priority ( ctx ):
order = ctx.scope[ "item" ]
return { "order_id" : order[ "id" ], "expedited" : True }
def process_low_priority ( ctx ):
order = ctx.scope[ "item" ]
return { "order_id" : order[ "id" ], "expedited" : False }
def send_notification ( ctx ):
order = ctx.scope[ "item" ]
return { "notification_sent" : True , "order_id" : order[ "id" ]}
workflow = Workflow()
workflow.then(
Step( "get-orders" , process_orders)
).loop(
"process-each-order" ,
get_items = lambda ctx : ctx.get_step_result( "get-orders" ),
branch_fn = lambda w : w.if_condition(
condition = lambda ctx : ctx.scope[ "item" ][ "priority" ] == "high" ,
branch_fn = lambda w : w.then(
Step( "high-priority" , process_high_priority)
)
).else_branch(
branch_fn = lambda w : w.then(
Step( "low-priority" , process_low_priority)
)
).then(
Step( "notify" , send_notification)
)
)
Mix and match control flow patterns to handle complex business logic.
Responses are generated using AI and may contain mistakes.