Conditional branching lets your workflow make different choices based on data. Use .if_condition(), .elif_branch(), and .else_branch() to create decision logic.
```python
from workflows_py.workflow import Step, Workflow


def check_number(ctx):
    number = ctx.get_start_data()["value"]
    return {"number": number}


def handle_positive(ctx):
    return {"result": "Number is positive"}


def handle_negative(ctx):
    return {"result": "Number is negative or zero"}


workflow = Workflow()
workflow.then(Step("check-number", check_number)).if_condition(
    condition=lambda ctx: ctx.get_step_result("check-number")["number"] > 0,
    branch_fn=lambda w: w.then(Step("positive-path", handle_positive)),
).else_branch(
    branch_fn=lambda w: w.then(Step("negative-path", handle_negative))
)
```
Make sure your condition functions handle missing or invalid data gracefully.
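One way to write such a condition is sketched below. It assumes that a missing step result comes back as None or as a dict without the expected key, which this page does not specify. StubContext is a hypothetical stand-in for the real run context, included only so the snippet runs on its own.

```python
class StubContext:
    """Hypothetical stand-in for the real run context, for illustration only."""

    def __init__(self, step_results):
        self._step_results = step_results

    def get_step_result(self, step_name):
        # Assumption: a missing step result comes back as None.
        return self._step_results.get(step_name)


def is_positive(ctx):
    """Return True only when check-number produced a real number above zero."""
    result = ctx.get_step_result("check-number")
    if not isinstance(result, dict):
        return False
    number = result.get("number")
    # bool is a subclass of int, so exclude it explicitly.
    if isinstance(number, bool) or not isinstance(number, (int, float)):
        return False
    return number > 0


print(is_positive(StubContext({"check-number": {"number": 5}})))
print(is_positive(StubContext({"check-number": {"number": "5"}})))
print(is_positive(StubContext({})))
```

Passing is_positive as the condition argument instead of the inline lambda would send missing or non-numeric values down the else branch rather than raising inside the condition.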
Loops process a list of items by running the same workflow logic for each item. Use ctx.scope["item"] to access the current item and ctx.scope["i"] to get the index (0-based).
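The sketch below shows only the body of a step that runs inside a loop, reading the two scope keys described above. The workflow method that registers the loop is not shown on this page, so it is left out here. StubLoopContext is a hypothetical stand-in that imitates the scope mapping so the example can run without the workflow runtime.

```python
class StubLoopContext:
    """Hypothetical stand-in exposing only the scope mapping described above."""

    def __init__(self, item, index):
        self.scope = {"item": item, "i": index}


def label_item(ctx):
    item = ctx.scope["item"]  # current element of the list
    index = ctx.scope["i"]    # 0-based position of that element
    # Add 1 so the label is human-friendly (first item is "1.").
    return {"label": f"{index + 1}. {item}"}


# Imitate what a loop would do: call the same step once per item.
items = ["apple", "banana", "cherry"]
labels = [
    label_item(StubLoopContext(item, i))["label"]
    for i, item in enumerate(items)
]
print(labels)
```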
You can configure automatic retries for individual steps using the options parameter on Step.
```python
from workflows_py.workflow import Step, Workflow


def unstable_operation(ctx):
    # Your step logic that may raise
    return {"ok": True}


workflow = Workflow()
workflow.then(
    Step(
        "unstable-operation",
        unstable_operation,
        options={
            "num_retries": 3,  # retry up to 3 times on failure
            "backoff": 1000,  # start with 1000ms delay; doubles each retry
        },
    )
)
```
num_retries: Number of retry attempts after the initial failure. Default is 0 (no retries).
backoff: Initial delay in milliseconds before the first retry. The delay doubles after each failed attempt (exponential backoff). Default is 1000ms. Must be >= 0.
Partial config: You may specify only one key:
If you set only num_retries, the backoff defaults to 1000ms.
If you set only backoff, retries will not happen unless num_retries > 0.
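To make the doubling rule concrete, the following sketch computes the delay before each retry from the two options. The helper retry_delays_ms is hypothetical and not part of the library. It assumes plain doubling with no jitter or upper cap, since none is mentioned above.

```python
def retry_delays_ms(num_retries=0, backoff=1000):
    """Return the wait (in ms) before each retry, using the documented defaults."""
    if num_retries < 0:
        raise ValueError("num_retries must be >= 0")
    if backoff < 0:
        raise ValueError("backoff must be >= 0")
    # First retry waits `backoff`; every later retry waits twice as long.
    return [backoff * 2 ** attempt for attempt in range(num_retries)]


print(retry_delays_ms(num_retries=3, backoff=1000))  # [1000, 2000, 4000]
print(retry_delays_ms(num_retries=2))                # [1000, 2000]
print(retry_delays_ms(backoff=500))                  # []
```

The last call illustrates the partial-config rule: setting only backoff produces no retries, because num_retries stays at its default of 0.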
You can define custom error handling logic for individual steps using the on_failure option. This handler is called when a step fails after all retry attempts have been exhausted.
```python
from workflows_py.workflow import Step, Workflow


def process_payment(ctx):
    # This step might fail
    raise Exception("Payment gateway unavailable")


def handle_payment_failure(ctx, error):
    # Custom error handling logic
    print(f"Payment failed: {error}")
    # You could notify admins, log to external service, etc.


workflow = Workflow()
workflow.then(
    Step(
        "process-payment",
        process_payment,
        options={
            "on_failure": handle_payment_failure,
        },
    )
)
```
The on_failure handler receives two arguments:
ctx: The WorkflowRunContext with access to get_step_result(), get_start_data(), etc.
error: The error information (may be None in some cases)
Combine on_failure with num_retries to first attempt automatic recovery,
then execute custom logic if all retries fail.
The on_failure handler must accept exactly two arguments: (ctx, error). A
handler with only one argument will raise a TypeError.
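A minimal sketch of an options dictionary that combines retries with a failure handler follows. The helper describe_error and the name payment_step_options are illustrative, not part of the library. The handler guards against error being None, as noted above.

```python
def describe_error(error):
    """Build a log-friendly message even when no error details are passed."""
    if error is None:
        return "step failed (no error details provided)"
    return f"step failed: {error}"


def handle_payment_failure(ctx, error):
    # Accepts exactly (ctx, error); runs only after all retries are exhausted.
    print(describe_error(error))


# Intended for Step("process-payment", process_payment, options=payment_step_options)
payment_step_options = {
    "num_retries": 2,  # two automatic retries after the initial failure
    "backoff": 500,    # wait 500ms, then 1000ms
    "on_failure": handle_payment_failure,
}
```

With these options the step would be attempted up to three times in total before handle_payment_failure runs.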
You can override which task Sample opens next after a step completes by using
the next_task option. This is useful when a workflow fans out into multiple
manual tasks and you want to send the user to a specific one, such as the most
recently created task in the workflow run or the latest task in the task list.
```python
from workflows_py.workflow import Step, Workflow


def build_review_tasks(ctx):
    # Your normal step logic here
    return {"ok": True}


def route_to_latest_task(ctx):
    if not ctx.workflow_run_id:
        return None

    # Replace this with the SQL helper your workflow already uses,
    # for example sample_client.execute_sql_query(...).
    rows = query_tasks(
        """
        select id
        from sample.tasks
        where workflow_run_id = %(workflow_run_id)s
          and status = 'suspended'
        order by created_at desc
        limit 1
        """,
        {"workflow_run_id": ctx.workflow_run_id},
    )
    return rows[0]["id"] if rows else None


workflow = Workflow()
workflow.then(
    Step(
        "build-review-tasks",
        build_review_tasks,
        options={
            "next_task": route_to_latest_task,
        },
    )
)
```
In practice, query_tasks(...) should wrap whatever SQL access your workflow
already uses, such as sample_client.execute_sql_query(...).

The next_task handler receives the normal WorkflowRunContext and should
return one of:
A task ID like "tsk_123" to route to that task
None to fall back to Sample’s default next-task routing
next_task is a routing override only. It does not change the workflow graph
or which step executes next. The workflow still advances normally. This only
controls which task ID is returned to the UI or API after the current step
completes.
For “send me to the latest task in this run” behavior, query tasks using
ctx.workflow_run_id. The next_task callback is evaluated after the
workflow advances, so downstream suspended tasks already exist when this
lookup runs.
You can return a task ID from another workflow run. If the task ID is
invalid, outside your org, or does not point to a suspended screen task,
Sample falls back to the default routing behavior.
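Because returning None falls back to default routing, a next_task handler can be wrapped so that a failed lookup returns None as well. The wrapper below is a hypothetical sketch, not part of the library. How Sample treats an exception raised inside a next_task handler is not stated here, so swallowing it is an assumption made for illustration.

```python
def with_default_routing(lookup):
    """Wrap a task lookup so that any failure yields None (default routing)."""

    def next_task(ctx):
        try:
            task_id = lookup(ctx)
        except Exception:
            # Assumption: falling back is preferable to failing the routing step.
            return None
        # Only pass through non-empty string IDs such as "tsk_123".
        return task_id if isinstance(task_id, str) and task_id else None

    return next_task


def broken_lookup(ctx):
    raise RuntimeError("database unavailable")


print(with_default_routing(lambda ctx: "tsk_123")(None))
print(with_default_routing(broken_lookup)(None))
```

The wrapped function could then be supplied as the next_task option in place of the raw lookup.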