Overview
Events in Sample workflows allow you to trigger workflows and control their execution based on external or internal events. This creates a reactive workflow system where workflows can respond to changes in your application state.
Events functionality requires samplehc >= 0.3.0 and workflows-py >= 0.1.18.
Emitting Events
You can emit events from your workflow functions using the SampleHC client. Events consist of a name and an optional payload.
Basic Event Emission
from client_manager import SampleHealthcareClient

def emit_order_updated(ctx: WorkflowRunContext):
    """Handle the update action - triggers eligibility if status is new"""
    order = get_order(ctx)
    client = get_client(ctx)
    client.v2.events.emit(
        name="order-updated",
        payload={
            "order_id": order["order_id"],
            "bill_type": get_bill_type(ctx)
        },
    )
Event Structure
Events have the following structure:
- name: A string identifier for the event type (e.g., “order-updated”, “order-created”)
- payload: Any JSON-serializable data relevant to the event
Idempotency Keys
You can attach an idempotency_key when emitting an event. The key is stored
with the event and shown alongside it in the event history, which makes it easy
to trace which emit produced which workflow runs — and to spot duplicate
emissions from retried emitters.
client.v2.events.emit(
    name="order-updated",
    payload={"order_id": order["order_id"]},
    idempotency_key=f"order-updated-{order['order_id']}-{order['updated_at']}",
)
The idempotency key does not currently prevent duplicate processing: two
emits with the same key both trigger listening workflows. If you need
exactly-once behavior, track processed records in your database as shown in
Emitter Workflows.
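One way to keep keys stable across retries is to derive them from the record itself rather than from the time of the emit. The sketch below is only an illustration: build_idempotency_key and find_duplicate_keys are hypothetical helpers, not part of samplehc, and the list of keys stands in for whatever you read from the event history.

```python
from collections import Counter
from typing import List


def build_idempotency_key(event_name: str, record_id: str, version: str) -> str:
    """Derive a deterministic key from the event name, record ID, and record version."""
    return f"{event_name}-{record_id}-{version}"


def find_duplicate_keys(keys: List[str]) -> List[str]:
    """Return the keys that appear more than once, in first-seen order."""
    counts = Counter(keys)
    return [key for key in dict.fromkeys(keys) if counts[key] > 1]


# A retried emitter produces the same key for the same record version.
first = build_idempotency_key("order-updated", "ord_1", "2025-01-15T10:30:00")
retry = build_idempotency_key("order-updated", "ord_1", "2025-01-15T10:30:00")
other = build_idempotency_key("order-updated", "ord_2", "2025-01-15T11:00:00")

# Keys collected from the event history (hypothetical data)
duplicates = find_duplicate_keys([first, other, retry])
```

Because the key depends only on the record, a repeated key in the history points to a repeated emit for the same record version rather than to a new change.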
Listening to Events
Workflows can listen to events using two mechanisms: start_on and cancel_on.
Starting Workflows on Events
Use start_on to automatically start a workflow when specific events are received:
from workflows_py.workflow import Workflow

workflow = Workflow(
    get_display_col_values={"Order ID": lambda ctx: ctx.get_start_data()["order_id"]},
    start_on={
        "order-created": lambda ctx: create_insurance_verification_start_data_from_start_on_ctx(ctx),
        "order-updated": lambda ctx: create_insurance_verification_start_data_from_start_on_ctx(ctx),
    },
)
Start Handler Return Values
The start_on handler function can return:
- An object with an optional start_data field: Starts the workflow with the provided start data
- None: Does not start the workflow
def handle_order_created(ctx):
    # Start the workflow with custom data
    return {
        "start_data": {
            "order_id": ctx.event.payload["order_id"],
            "priority": "high"
        }
    }

def handle_low_priority_order(ctx):
    # Don't start workflow for low priority orders
    if ctx.event.payload.get("priority") == "low":
        return None
    # Start workflow for other priorities
    return {"start_data": {"order_id": ctx.event.payload["order_id"]}}

# These handlers would be used in your workflow configuration like this:
workflow = Workflow(
    get_display_col_values={"Order ID": lambda ctx: ctx.get_start_data()["order_id"]},
    start_on={
        "order-created": handle_order_created,
        "order-updated": handle_low_priority_order,
    },
)
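Because a start_on handler is a plain function of its context, you can check its return values locally before deploying. The sketch below assumes only what the examples above show, namely that the context exposes event.name and event.payload. make_start_ctx is a hypothetical test double built with SimpleNamespace, not the real StartOnContext class.

```python
from types import SimpleNamespace


def make_start_ctx(name, payload):
    """Build a stand-in for the start_on context (test double, not the real class)."""
    return SimpleNamespace(event=SimpleNamespace(name=name, payload=payload))


def handle_low_priority_order(ctx):
    # Same logic as the handler above: skip low priority orders
    if ctx.event.payload.get("priority") == "low":
        return None
    return {"start_data": {"order_id": ctx.event.payload["order_id"]}}


# A low priority order should not start a run
skipped = handle_low_priority_order(
    make_start_ctx("order-updated", {"order_id": "ord_1", "priority": "low"})
)

# Any other priority should start a run with the order ID as start data
started = handle_low_priority_order(
    make_start_ctx("order-updated", {"order_id": "ord_2", "priority": "high"})
)
```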
Canceling Workflows on Events
Use cancel_on to automatically cancel running workflows when specific events are received:
workflow = Workflow(
    # ... other configuration
    cancel_on={
        "order-updated": lambda ctx: ctx.get_start_data()["order_id"] == ctx.event.payload.get("order_id"),
    },
)
Cancel Handler Return Values
The cancel_on handler function should return:
- True: Cancel the workflow run
- False: Keep the workflow running
def should_cancel_on_order_update(ctx):
    # Cancel if this workflow is processing the same order
    workflow_order_id = ctx.get_start_data()["order_id"]
    event_order_id = ctx.event.payload.get("order_id")
    return workflow_order_id == event_order_id
Event Context
When handling events, your handler receives a context object. What it provides depends on the handler type:
StartOnContext
Available in start_on handlers:
def handle_start_event(ctx: StartOnContext):
    # Access the event that triggered this handler
    event_name = ctx.event.name
    event_payload = ctx.event.payload
    # Access authentication token (for API calls)
    backend_token = ctx.backend_token
    # Return start data or None
    return {"start_data": {"triggered_by": event_name}}
CancelOnContext
Available in cancel_on handlers:
def handle_cancel_event(ctx: CancelOnContext):
    # Access the event that triggered this handler
    event_name = ctx.event.name
    event_payload = ctx.event.payload
    # Access the workflow's original start data
    start_data = ctx.get_start_data()
    # Return boolean decision
    return should_cancel_workflow(start_data, event_payload)
Common Patterns
Order Processing Workflow
workflow = Workflow(
    get_display_col_values={
        "Order ID": lambda ctx: ctx.get_start_data()["order_id"],
        "Status": lambda ctx: ctx.get_start_data().get("status", "pending")
    },
    start_on={
        "order-created": lambda ctx: {
            "start_data": {
                "order_id": ctx.event.payload["order_id"],
                "status": "processing"
            }
        }
    },
    cancel_on={
        "order-cancelled": lambda ctx: (
            ctx.get_start_data()["order_id"] == ctx.event.payload["order_id"]
        )
    }
)
Multi-Event Workflow
workflow = Workflow(
    start_on={
        "patient-admitted": lambda ctx: create_admission_workflow_data(ctx),
        "insurance-verified": lambda ctx: create_insurance_workflow_data(ctx),
        "urgent-case": lambda ctx: create_urgent_workflow_data(ctx),
    },
    cancel_on={
        "patient-discharged": lambda ctx: (
            ctx.get_start_data()["patient_id"] == ctx.event.payload["patient_id"]
        ),
        "case-closed": lambda ctx: (
            ctx.get_start_data()["case_id"] == ctx.event.payload["case_id"]
        ),
    }
)
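When runs are started by several different events, their start data may not all carry the same fields, and a direct ["patient_id"] lookup in a cancel_on handler would then raise a KeyError. One possible way to handle this is a small factory that builds the matching handlers and treats a missing field as "do not cancel". cancel_when_same is a hypothetical helper, and the context below is a SimpleNamespace stand-in rather than the real CancelOnContext.

```python
from types import SimpleNamespace


def cancel_when_same(field):
    """Return a cancel_on handler that matches start data and payload on `field`."""
    def handler(ctx) -> bool:
        start_value = ctx.get_start_data().get(field)
        event_value = ctx.event.payload.get(field)
        # A missing value on either side never cancels the run
        return start_value is not None and start_value == event_value
    return handler


cancel_on = {
    "patient-discharged": cancel_when_same("patient_id"),
    "case-closed": cancel_when_same("case_id"),
}

# Stand-in context for a local check: the event carries only a case_id
ctx = SimpleNamespace(
    event=SimpleNamespace(name="case-closed", payload={"case_id": "case_9"}),
    get_start_data=lambda: {"patient_id": "pat_1", "case_id": "case_9"},
)
case_closed_matches = cancel_on["case-closed"](ctx)
patient_discharged_matches = cancel_on["patient-discharged"](ctx)
```

The resulting dictionary can be passed as the cancel_on argument in place of the inline lambdas.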
Emitter Workflows
A common way to structure high-volume automations is to split them into an
emitter workflow and a processor workflow:
- The emitter runs on a cron schedule, queries a system of record for new
  work (for example, new orders), and emits one event per record.
- The processor declares start_on for that event, so each record gets its
  own workflow run, with its own task list entries, column values, and retry
  behavior.
To avoid emitting the same record twice, track processed records in a
database table and only insert the tracking row after the
event is emitted successfully:
# Emitter workflow (runs hourly)
def emit_new_orders(ctx):
    client = SampleHealthcareClient()
    # Select records that have not been emitted yet
    new_orders = get_orders_not_yet_processed()
    for order in new_orders:
        client.v2.events.emit(
            name="order-ready-for-review",
            payload={"order_id": order["order_id"]},
        )
        # Record the emit only after it succeeds, so failures are retried
        # on the next scheduled run
        mark_order_processed(order["order_id"])

workflow = Workflow(cron_schedule="0 * * * *")
workflow.then(Step("emit-new-orders", emit_new_orders))

# Processor workflow (one run per order)
workflow = Workflow(
    get_display_col_values={
        "Order ID": lambda ctx: ctx.get_start_data().get("order_id", ""),
    },
    start_on={
        "order-ready-for-review": lambda ctx: {
            "start_data": {"order_id": ctx.event.payload["order_id"]}
        },
    },
)
Keep event payloads small — pass identifiers and have the processor workflow
look up full records itself. This keeps events readable and avoids stale data
when the processor runs later than the emitter.
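The emit-then-record ordering can be exercised locally with an in-memory SQLite table. This is a minimal sketch under stated assumptions: the emitted_orders table, the emit parameter, and flaky_emit are all hypothetical, and flaky_emit merely stands in for client.v2.events.emit so the retry behavior can be observed without a real client.

```python
import sqlite3


def emit_new_orders(conn, orders, emit):
    """Emit one event per unprocessed order; record it only after a successful emit."""
    conn.execute("CREATE TABLE IF NOT EXISTS emitted_orders (order_id TEXT PRIMARY KEY)")
    already = {row[0] for row in conn.execute("SELECT order_id FROM emitted_orders")}
    emitted = []
    for order in orders:
        order_id = order["order_id"]
        if order_id in already:
            continue  # Tracked by an earlier run
        try:
            emit(name="order-ready-for-review", payload={"order_id": order_id})
        except Exception:
            continue  # No tracking row is written, so the next run retries this order
        conn.execute("INSERT INTO emitted_orders (order_id) VALUES (?)", (order_id,))
        conn.commit()
        emitted.append(order_id)
    return emitted


calls = []


def flaky_emit(name, payload):
    """Stand-in emitter that fails on the first attempt for ord_2."""
    calls.append(payload["order_id"])
    if payload["order_id"] == "ord_2" and calls.count("ord_2") == 1:
        raise RuntimeError("temporary failure")


conn = sqlite3.connect(":memory:")
orders = [{"order_id": "ord_1"}, {"order_id": "ord_2"}]
first_run = emit_new_orders(conn, orders, flaky_emit)   # ord_2 fails and is not tracked
second_run = emit_new_orders(conn, orders, flaky_emit)  # only ord_2 is retried
```

Note that this ordering gives at-least-once delivery: if the emit succeeds but the insert fails, the record is emitted again on the next run, so the processor should tolerate an occasional repeat. In a real emitter you would likely also log the swallowed exception.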
Best Practices
Event Naming
- Use descriptive, consistent event names (e.g., “order-created”, “patient-admitted”)
- Include entity type and action (e.g., “user-registered”, “document-processed”)
- Use kebab-case for event names
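A naming convention is easier to keep if it is checked in code, for example in a unit test over the names your workflows emit. The following is a sketch only: is_valid_event_name is a hypothetical helper, and the pattern covers custom event names, not the colon-separated sample: system events.

```python
import re

# Lowercase words or digits separated by single hyphens, e.g. "order-created"
KEBAB_CASE = re.compile(r"^[a-z0-9]+(?:-[a-z0-9]+)*$")


def is_valid_event_name(name: str) -> bool:
    """Check a custom event name against the kebab-case convention."""
    return bool(KEBAB_CASE.match(name))


names = ["order-created", "OrderCreated", "order_created", "order--created"]
invalid = [name for name in names if not is_valid_event_name(name)]
```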
Payload Design
- Include essential identifiers in the payload (IDs, timestamps)
- Keep payloads lightweight - avoid large objects
- Include context needed for decision making
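These guidelines can be enforced with a small check before emitting. In the sketch below, check_payload is a hypothetical helper and MAX_PAYLOAD_BYTES is an arbitrary budget chosen for illustration, not a documented platform limit.

```python
import json

MAX_PAYLOAD_BYTES = 4096  # Assumed budget for illustration; not a documented limit


def check_payload(payload: dict) -> int:
    """Return the encoded size in bytes, or raise if the payload is unsuitable."""
    try:
        encoded = json.dumps(payload).encode("utf-8")
    except (TypeError, ValueError) as exc:
        raise ValueError(f"Payload is not JSON-serializable: {exc}") from exc
    if len(encoded) > MAX_PAYLOAD_BYTES:
        raise ValueError(f"Payload is {len(encoded)} bytes; pass identifiers instead")
    return len(encoded)


# An identifier-only payload passes the check
size = check_payload({"order_id": "ord_1"})
```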
Error Handling
def safe_event_handler(ctx):
    try:
        # Your event handling logic
        return {"start_data": process_event(ctx.event)}
    except Exception as e:
        # Log error and decide whether to start workflow
        logger.error(f"Error processing event: {e}")
        return None  # Don't start workflow on error
- Keep event handlers lightweight and fast
- Avoid heavy computations in start_on and cancel_on handlers
- Use specific event matching to avoid unnecessary workflow starts/cancellations
Sample System Emitted Events
Sample automatically emits certain events when specific actions occur in your system. These events are prefixed with sample: and can be used to trigger workflows automatically.
Email
All email-related events are emitted under the sample:email:* pattern.
received
When an email is sent to your sample mailbox (your_organization@start.onsample.com), Sample automatically emits a sample:email:received event.
Event Name: sample:email:received
Payload Structure:
{
  "subject": "Email subject line",
  "body": "Plain text email body",
  "html": "HTML email body (if available)",
  "from": ["sender@example.com"],
  "to": ["your_organization@start.onsample.com"],
  "attachments": [
    {
      "fileName": "document.pdf",
      "id": "file-metadata-id"
    }
  ]
}
Example Usage:
from workflows_py.workflow import Workflow

workflow = Workflow(
    get_display_col_values={
        "Subject": lambda ctx: ctx.get_start_data()["subject"],
        "From": lambda ctx: ", ".join(ctx.get_start_data()["from"])
    },
    start_on={
        "sample:email:received": lambda ctx: {
            "start_data": {
                "subject": ctx.event.payload["subject"],
                "body": ctx.event.payload["body"],
                "from": ctx.event.payload["from"],
                "attachments": ctx.event.payload["attachments"]
            }
        }
    }
)
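Since every email sent to the mailbox emits this event, a handler will often want to return None for messages it should ignore. The sketch below filters on sender domain and on the presence of a PDF attachment, using only the payload fields documented above. ALLOWED_DOMAINS and handle_email_received are hypothetical, and the code assumes the "from" entries are bare addresses as in the example payload.

```python
from types import SimpleNamespace

ALLOWED_DOMAINS = {"example.com"}  # Hypothetical allowlist


def handle_email_received(ctx):
    """Start a run only for allowed senders whose email carries a PDF."""
    payload = ctx.event.payload
    senders = payload.get("from", [])
    if not any(s.rsplit("@", 1)[-1].lower() in ALLOWED_DOMAINS for s in senders):
        return None  # Unknown sender: do not start a workflow
    pdfs = [
        a for a in payload.get("attachments", [])
        if a["fileName"].lower().endswith(".pdf")
    ]
    if not pdfs:
        return None  # Nothing to process
    return {"start_data": {"subject": payload["subject"], "from": senders, "attachments": pdfs}}


# Local check with a stand-in context shaped like the documented payload
ctx = SimpleNamespace(event=SimpleNamespace(
    name="sample:email:received",
    payload={
        "subject": "Referral",
        "body": "See attached",
        "from": ["sender@example.com"],
        "to": ["your_organization@start.onsample.com"],
        "attachments": [{"fileName": "document.pdf", "id": "file-metadata-id"}],
    },
))
result = handle_email_received(ctx)
```

The function can be registered under "sample:email:received" in start_on in place of the inline lambda.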
Kno2
All Kno2-related events are emitted under the sample:kno2:* pattern when messages are received through your Kno2 integration.
message-receive
When a message (such as a fax) is received through your Kno2 connection, Sample automatically emits a sample:kno2:${connection.slug}:message-receive event, where ${connection.slug} corresponds to the specific Kno2 connection that received the message.
Event Name: sample:kno2:${connection.slug}:message-receive
Payload Structure:
{
  "message": {
    "id": "msg_abc123def456",
    "body": null,
    "type": "Intake",
    "isNew": false,
    "origin": "eFax",
    "status": "Received",
    "isDraft": false,
    "patient": {
      "fullName": "John Doe",
      "lastName": "Doe",
      "firstName": "John",
      "middleName": null,
      "birthDate": null,
      "gender": null,
      "patientId": null,
      "patientIds": [],
      "telephone": null,
      "streetAddress1": null,
      "streetAddress2": null,
      "city": null,
      "state": null,
      "postalCode": null,
      "country": null,
      "telecom": []
    },
    "subject": "Medical Records Request",
    "isUrgent": false,
    "priority": "NotUrgent",
    "toAddress": "555-123-4567",
    "fromAddress": "555-987-6543",
    "properties": {
      "faxPageCount": "4",
      "originalFaxPageCount": "4"
    },
    "sourceType": "Fax",
    "attachments": [
      {
        "id": "att_xyz789",
        "key": "attachment-key-123",
        "fileName": "medical_records.pdf",
        "mimeType": "application/pdf",
        "messageId": "msg_abc123def456",
        "sizeInBytes": 207212,
        "documentType": "",
        "attachmentMeta": {
          "documentDate": "2025-01-15T10:30:00.000",
          "documentType": "",
          "confidentiality": "Normal"
        }
      }
    ],
    "createdDate": "2025-01-15T10:30:00.000",
    "messageDate": "2025-01-15T10:29:45.000",
    "messageType": "default",
    "patientName": "John Doe",
    "organizationId": "org_sample123"
  },
  "attachments": [
    {
      "id": "fmd_abc123",
      "fileName": "medical_records.pdf"
    }
  ]
}
Key Fields:
- message.id: Unique identifier for the message
- message.subject: Subject line of the fax/message
- message.toAddress: Recipient fax number
- message.fromAddress: Sender fax number
- message.sourceType: Type of message (e.g., “Fax”)
- message.attachments: Array of attachment metadata from Kno2
- attachments: Array of Sample file metadata IDs for downloaded attachments
Example Usage:
from workflows_py.workflow import Workflow

workflow = Workflow(
    get_display_col_values={
        "From": lambda ctx: ctx.get_start_data()["message"]["fromAddress"],
        "Subject": lambda ctx: ctx.get_start_data()["message"]["subject"],
        "Pages": lambda ctx: ctx.get_start_data()["message"]["properties"]["faxPageCount"]
    },
    start_on={
        "sample:kno2:production:message-receive": lambda ctx: {
            "start_data": {
                "message": ctx.event.payload["message"],
                "attachments": ctx.event.payload["attachments"],
                "from_fax": ctx.event.payload["message"]["fromAddress"],
                "to_fax": ctx.event.payload["message"]["toAddress"]
            }
        }
    }
)

# Process the received fax attachments
def process_kno2_fax(ctx: WorkflowRunContext):
    attachments = ctx.get_start_data()["attachments"]
    for attachment in attachments:
        # Process each attachment using its file metadata ID
        file_id = attachment["id"]
        file_name = attachment["fileName"]
        # Your processing logic here

# To listen to messages from multiple Kno2 connections:
workflow = Workflow(
    start_on={
        "sample:kno2:production:message-receive": lambda ctx: process_kno2_message(ctx),
        "sample:kno2:staging:message-receive": lambda ctx: process_kno2_message(ctx),
    }
)
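Because the event name embeds the connection slug, the start_on mapping for several connections can be generated instead of written out by hand. This is a minimal sketch: kno2_message_event and start_on_for_connections are hypothetical helpers, the slugs are the ones used in the example above, and process_kno2_message is a placeholder handler.

```python
def kno2_message_event(slug: str) -> str:
    """Build the message-receive event name for a Kno2 connection slug."""
    return f"sample:kno2:{slug}:message-receive"


def start_on_for_connections(slugs, handler):
    """Map the message-receive event of each connection to the same handler."""
    return {kno2_message_event(slug): handler for slug in slugs}


def process_kno2_message(ctx):
    # Placeholder handler: forward the Sample file metadata entries
    return {"start_data": {"attachments": ctx.event.payload["attachments"]}}


start_on = start_on_for_connections(["production", "staging"], process_kno2_message)
```

The resulting dictionary can be passed as the start_on argument of Workflow.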
Troubleshooting
Common Issues
Workflow not starting on events:
- Verify event names match exactly between emit and listen
- Check that handler functions return appropriate values
- Ensure version requirements are met
Unexpected workflow cancellations:
- Review cancel_on handler logic
- Check event payload structure
- Verify cancellation conditions are specific enough
Missing event data:
- Ensure event payload contains required fields
- Check that get_start_data() returns expected structure
- Verify event emission includes necessary context
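Name mismatches between emit and listen are often small typos, such as an underscore instead of a hyphen. A quick local check along the following lines can surface them. diagnose_event_name is a hypothetical helper, and the list of listened names is something you would collect from your own start_on and cancel_on dictionaries.

```python
import difflib
from typing import List


def diagnose_event_name(emitted: str, listened: List[str]) -> str:
    """Report whether an emitted name has a listener, suggesting near matches."""
    if emitted in listened:
        return f"'{emitted}' has a listener"
    # Look for a listened name that is almost identical to the emitted one
    close = difflib.get_close_matches(emitted, listened, n=1, cutoff=0.8)
    if close:
        return f"'{emitted}' has no listener; did you mean '{close[0]}'?"
    return f"'{emitted}' has no listener"


listened_names = ["order-created", "order-updated"]
report = diagnose_event_name("order_created", listened_names)
```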