Conditional branching lets your workflow make different choices based on data. Use .if_condition(), .elif_branch(), and .else_branch() to create decision logic.
Copy
from workflows_py.workflow import Step, Workflowdef check_number(ctx):number = ctx.get_start_data()["value"]return {"number": number}def handle_positive(ctx):return {"result": "Number is positive"}def handle_negative(ctx):return {"result": "Number is negative or zero"}workflow = Workflow()workflow.then(Step("check-number", check_number)).if_condition(condition=lambda ctx: ctx.get_step_result("check-number")["number"] > 0,branch_fn=lambda w: w.then(Step("positive-path", handle_positive))).else_branch(branch_fn=lambda w: w.then(Step("negative-path", handle_negative)))
Make sure your condition functions handle missing or invalid data gracefully.
Loops process a list of items by running the same workflow logic for each item. Use ctx.scope["item"] to access the current item and ctx.scope["i"] to get the index (0-based).
You can configure automatic retries for individual steps using the options parameter on Step.
Copy
from workflows_py.workflow import Step, Workflowdef unstable_operation(ctx): # Your step logic that may raise return {"ok": True}workflow = Workflow()workflow.then( Step( "unstable-operation", unstable_operation, options={ "num_retries": 3, # retry up to 3 times on failure "backoff": 1000, # start with 1000ms delay; doubles each retry }, ))
num_retries: Number of retry attempts after the initial failure. Default is 0 (no retries).
backoff: Initial delay in milliseconds before the first retry. The delay doubles after each failed attempt (exponential backoff). Default is 1000ms. Must be >= 0.
Partial config: You may specify only one key:
If you set only num_retries, the backoff defaults to 1000ms.
If you set only backoff, retries will not happen unless num_retries > 0.
You can define custom error handling logic for individual steps using the on_failure option. This handler is called when a step fails after all retry attempts have been exhausted.
Copy
from workflows_py.workflow import Step, Workflowdef process_payment(ctx): # This step might fail raise Exception("Payment gateway unavailable")def handle_payment_failure(ctx, error): # Custom error handling logic print(f"Payment failed: {error}") # You could notify admins, log to external service, etc.workflow = Workflow()workflow.then( Step( "process-payment", process_payment, options={ "on_failure": handle_payment_failure, }, ))
The on_failure handler receives two arguments:
ctx: The WorkflowRunContext with access to get_step_result(), get_start_data(), etc.
error: The error information (may be None in some cases)
Combine on_failure with num_retries to first attempt automatic recovery,
then execute custom logic if all retries fail.
The on_failure handler must accept exactly two arguments: (ctx, error). A
handler with only one argument will raise a TypeError.