Overview

Events in Sample workflows allow you to trigger workflows and control their execution based on external or internal events. This creates a reactive workflow system where workflows can respond to changes in your application state.
Events functionality requires samplehc >= 0.3.0 and workflows-py >= 0.1.18.

Emitting Events

You can emit events from your workflow functions using the SampleHC client. Events consist of a name and an optional payload.

Basic Event Emission

from client_manager import SampleHealthcareClient

# get_order, get_client, and get_bill_type are application-defined helpers (not shown)
def emit_order_updated(ctx: WorkflowRunContext):
    """Emit an order-updated event carrying the order ID and bill type."""
    order = get_order(ctx)
    client = get_client(ctx)

    client.v2.events.emit(
        name="order-updated",
        payload={
            "order_id": order["order_id"],
            "bill_type": get_bill_type(ctx)
        },
    )

Event Structure

Events have the following structure:
  • name: A string identifier for the event type (e.g., “order-updated”, “order-created”)
  • payload: Any JSON-serializable data relevant to the event
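Because the payload must be JSON-serializable, it can help to check it before emitting. The following is a minimal sketch; `check_event` is a hypothetical helper written for illustration and is not part of the SampleHC client.

```python
import json
from datetime import datetime, timezone


def check_event(name: str, payload: dict) -> str:
    """Return the payload as a JSON string, or raise ValueError if it cannot be serialized.

    Hypothetical pre-flight helper; not part of the SampleHC client.
    """
    if not isinstance(name, str) or not name:
        raise ValueError("event name must be a non-empty string")
    try:
        return json.dumps(payload)
    except (TypeError, ValueError) as exc:
        raise ValueError(f"payload for {name!r} is not JSON-serializable: {exc}") from exc


updated_at = datetime(2025, 1, 15, 10, 30, tzinfo=timezone.utc)

# A datetime object is not JSON-serializable; convert it to an ISO 8601 string first.
encoded = check_event(
    "order-updated",
    {"order_id": "ord_123", "updated_at": updated_at.isoformat()},
)
```

Passing the raw `updated_at` object instead of its ISO string would raise `ValueError` here, before any event is sent.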

Idempotency Keys

You can attach an idempotency_key when emitting an event. The key is stored with the event and shown alongside it in the event history, which makes it easy to trace which emit produced which workflow runs — and to spot duplicate emissions from retried emitters.
client.v2.events.emit(
    name="order-updated",
    payload={"order_id": order["order_id"]},
    idempotency_key=f"order-updated-{order['order_id']}-{order['updated_at']}",
)
The idempotency key does not currently prevent duplicate processing — two emits with the same key both trigger listening workflows. If you need exactly-once behavior, track processed records in your database as shown in Emitter Workflows.
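Since duplicate emits both trigger listeners, one option is to deduplicate on the processing side by recording each key before doing any work. The sketch below assumes a SQLite table named `processed_events`; the table name, the choice of SQLite, and the `claim_key` helper are illustrative assumptions, not part of Sample.

```python
import sqlite3


def claim_key(conn: sqlite3.Connection, idempotency_key: str) -> bool:
    """Return True the first time a key is seen, False on every later attempt."""
    cursor = conn.execute(
        "INSERT OR IGNORE INTO processed_events (idempotency_key) VALUES (?)",
        (idempotency_key,),
    )
    conn.commit()
    # rowcount is 1 when the row was inserted and 0 when the key already existed
    return cursor.rowcount == 1


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE processed_events (idempotency_key TEXT PRIMARY KEY)")

key = "order-updated-ord_123-2025-01-15T10:30:00"
first = claim_key(conn, key)   # first delivery: do the work
second = claim_key(conn, key)  # duplicate delivery: skip
```

The primary key constraint makes the check and the insert a single atomic statement, so two concurrent runs cannot both claim the same key.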

Listening to Events

Workflows can listen to events using two mechanisms: start_on and cancel_on.

Starting Workflows on Events

Use start_on to automatically start a workflow when specific events are received:
from workflows_py.workflow import Workflow

workflow = Workflow(
    get_display_col_values={"Order ID": lambda ctx: ctx.get_start_data()["order_id"]},
    start_on={
        "order-created": lambda ctx: create_insurance_verification_start_data_from_start_on_ctx(ctx),
        "order-updated": lambda ctx: create_insurance_verification_start_data_from_start_on_ctx(ctx),
    },
)

Start Handler Return Values

The start_on handler function can return:
  • A dict with an optional start_data key: Starts the workflow with the provided start data
  • None: Does not start the workflow
def handle_order_created(ctx):
    # Start the workflow with custom data
    return {
        "start_data": {
            "order_id": ctx.event.payload["order_id"],
            "priority": "high"
        }
    }

def handle_low_priority_order(ctx):
    # Don't start workflow for low priority orders
    if ctx.event.payload.get("priority") == "low":
        return None

    # Start workflow for other priorities
    return {"start_data": {"order_id": ctx.event.payload["order_id"]}}

# These handlers would be used in your workflow configuration like this:
workflow = Workflow(
    get_display_col_values={"Order ID": lambda ctx: ctx.get_start_data()["order_id"]},
    start_on={
        "order-created": handle_order_created,
        "order-updated": handle_low_priority_order,
    },
)
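Start handlers are plain functions, so they can be exercised without a running workflow. The sketch below uses `SimpleNamespace` as a stand-in for the context object; `fake_ctx` and `start_for_order` are hypothetical names, and only `ctx.event.name` and `ctx.event.payload` are modeled.

```python
from types import SimpleNamespace


def start_for_order(ctx):
    """Start handler sketch: skip events without an order_id or with low priority."""
    payload = ctx.event.payload or {}
    order_id = payload.get("order_id")
    if order_id is None or payload.get("priority") == "low":
        return None  # do not start a run
    return {"start_data": {"order_id": order_id, "trigger": ctx.event.name}}


def fake_ctx(name, payload):
    """Stand-in for the context object; only the attributes used above are modeled."""
    return SimpleNamespace(event=SimpleNamespace(name=name, payload=payload))


started = start_for_order(fake_ctx("order-created", {"order_id": "ord_1"}))
skipped = start_for_order(fake_ctx("order-updated", {"order_id": "ord_2", "priority": "low"}))
```

Using `payload.get("order_id")` instead of `payload["order_id"]` means a malformed event returns `None` rather than raising `KeyError` inside the handler.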

Canceling Workflows on Events

Use cancel_on to automatically cancel running workflows when specific events are received:
workflow = Workflow(
    # ... other configuration
    cancel_on={
        "order-updated": lambda ctx: ctx.get_start_data()["order_id"] == ctx.event.payload.get("order_id"),
    },
)

Cancel Handler Return Values

The cancel_on handler function should return:
  • True: Cancel the workflow run
  • False: Keep the workflow running
def should_cancel_on_order_update(ctx):
    # Cancel if this workflow is processing the same order
    workflow_order_id = ctx.get_start_data()["order_id"]
    event_order_id = ctx.event.payload.get("order_id")

    return workflow_order_id == event_order_id

Event Context

Each event handler receives a context object. What it exposes depends on the handler type:

StartOnContext

Available in start_on handlers:
def handle_start_event(ctx: StartOnContext):
    # Access the event that triggered this handler
    event_name = ctx.event.name
    event_payload = ctx.event.payload

    # Access authentication token (for API calls)
    backend_token = ctx.backend_token

    # Return start data or None
    return {"start_data": {"triggered_by": event_name}}

CancelOnContext

Available in cancel_on handlers:
def handle_cancel_event(ctx: CancelOnContext):
    # Access the event that triggered this handler
    event_name = ctx.event.name
    event_payload = ctx.event.payload

    # Access the workflow's original start data
    start_data = ctx.get_start_data()

    # Return boolean decision
    return should_cancel_workflow(start_data, event_payload)

Common Patterns

Order Processing Workflow

workflow = Workflow(
    get_display_col_values={
        "Order ID": lambda ctx: ctx.get_start_data()["order_id"],
        "Status": lambda ctx: ctx.get_start_data().get("status", "pending")
    },
    start_on={
        "order-created": lambda ctx: {
            "start_data": {
                "order_id": ctx.event.payload["order_id"],
                "status": "processing"
            }
        }
    },
    cancel_on={
        "order-cancelled": lambda ctx: (
            ctx.get_start_data()["order_id"] == ctx.event.payload["order_id"]
        )
    }
)

Multi-Event Workflow

workflow = Workflow(
    start_on={
        "patient-admitted": lambda ctx: create_admission_workflow_data(ctx),
        "insurance-verified": lambda ctx: create_insurance_workflow_data(ctx),
        "urgent-case": lambda ctx: create_urgent_workflow_data(ctx),
    },
    cancel_on={
        "patient-discharged": lambda ctx: (
            ctx.get_start_data()["patient_id"] == ctx.event.payload["patient_id"]
        ),
        "case-closed": lambda ctx: (
            ctx.get_start_data()["case_id"] == ctx.event.payload["case_id"]
        ),
    }
)
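When a workflow can be started by several events, its start data may not contain every key that a cancel handler reads, and indexing with `[...]` would then raise `KeyError`. The sketch below shows one way to build tolerant cancel handlers; `same_entity` and `fake_cancel_ctx` are hypothetical helpers, and the assumption that start data differs per start event is ours, since the `create_*_workflow_data` helpers are not shown.

```python
from types import SimpleNamespace


def same_entity(key):
    """Build a cancel_on handler that matches start data and event payload on `key`."""
    def handler(ctx) -> bool:
        start_value = (ctx.get_start_data() or {}).get(key)
        event_value = (ctx.event.payload or {}).get(key)
        # A missing key on either side never cancels the run
        return start_value is not None and start_value == event_value
    return handler


cancel_on = {
    "patient-discharged": same_entity("patient_id"),
    "case-closed": same_entity("case_id"),
}


def fake_cancel_ctx(start_data, payload):
    """Stand-in context with only the members the handlers read."""
    return SimpleNamespace(
        get_start_data=lambda: start_data,
        event=SimpleNamespace(payload=payload),
    )


run = {"patient_id": "pat_1"}  # example start data that has no case_id
```

With this shape, a `case-closed` event leaves the example run untouched instead of failing the handler.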

Emitter Workflows

A common way to structure high-volume automations is to split them into an emitter workflow and a processor workflow:
  • The emitter runs on a cron schedule, queries a system of record for new work (for example, new orders), and emits one event per record.
  • The processor declares start_on for that event, so each record gets its own workflow run — with its own task list entries, column values, and retry behavior.
To avoid emitting the same record twice, track processed records in a database table and only insert the tracking row after the event is emitted successfully:
# Emitter workflow (runs hourly)
def emit_new_orders(ctx):
    client = SampleHealthcareClient()

    # Select records that have not been emitted yet
    new_orders = get_orders_not_yet_processed()

    for order in new_orders:
        client.v2.events.emit(
            name="order-ready-for-review",
            payload={"order_id": order["order_id"]},
        )
        # Record the emit only after it succeeds, so failures are retried
        # on the next scheduled run
        mark_order_processed(order["order_id"])

workflow = Workflow(cron_schedule="0 * * * *")
workflow.then(Step("emit-new-orders", emit_new_orders))

# Processor workflow (one run per order)
workflow = Workflow(
    get_display_col_values={
        "Order ID": lambda ctx: ctx.get_start_data().get("order_id", ""),
    },
    start_on={
        "order-ready-for-review": lambda ctx: {
            "start_data": {"order_id": ctx.event.payload["order_id"]}
        },
    },
)
Keep event payloads small — pass identifiers and have the processor workflow look up full records itself. This keeps events readable and avoids stale data when the processor runs later than the emitter.
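The emit-then-mark ordering can be checked in isolation by injecting the emit function. The sketch below is an illustration under assumptions: `emit_pending` and `flaky_emit` are hypothetical, an in-memory set stands in for the tracking table, and unlike a bare loop this version continues past a failed emit instead of aborting the run.

```python
def emit_pending(orders, emit, processed_ids):
    """Emit one event per unprocessed order; mark an order only after its emit succeeds."""
    failed = []
    for order in orders:
        order_id = order["order_id"]
        if order_id in processed_ids:
            continue
        try:
            emit(name="order-ready-for-review", payload={"order_id": order_id})
        except Exception:
            # Leave the order unmarked so the next scheduled run retries it
            failed.append(order_id)
            continue
        processed_ids.add(order_id)
    return failed


sent = []


def flaky_emit(name, payload):
    """Test double standing in for client.v2.events.emit; fails for one order."""
    if payload["order_id"] == "ord_2":
        raise RuntimeError("simulated network error")
    sent.append(payload["order_id"])


processed = {"ord_0"}
orders = [{"order_id": "ord_0"}, {"order_id": "ord_1"}, {"order_id": "ord_2"}]
failed = emit_pending(orders, flaky_emit, processed)
```

Note that this ordering gives at-least-once delivery: if the emit succeeds but recording the order fails, the next run emits the same order again, so the processor should tolerate duplicates.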

Best Practices

Event Naming

  • Use descriptive, consistent event names (e.g., “order-created”, “patient-admitted”)
  • Include entity type and action (e.g., “user-registered”, “document-processed”)
  • Use kebab-case for event names
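A naming convention is easier to keep when it is checked in code. The sketch below validates kebab-case names for custom events; `is_valid_custom_event_name` is a hypothetical helper, and system events such as sample:email:received use colons, so they are deliberately out of scope.

```python
import re

# Lowercase words separated by single hyphens, e.g. "order-created"
KEBAB_CASE = re.compile(r"[a-z0-9]+(?:-[a-z0-9]+)*")


def is_valid_custom_event_name(name: str) -> bool:
    """Check that a custom event name is kebab-case (system "sample:" events are out of scope)."""
    return KEBAB_CASE.fullmatch(name) is not None
```

A check like this could run in a unit test over every key used in `start_on` and `cancel_on`.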

Payload Design

  • Include essential identifiers in the payload (IDs, timestamps)
  • Keep payloads lightweight - avoid large objects
  • Include context needed for decision making
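One way to keep payloads lightweight is to measure their encoded size before emitting. In the sketch below, `MAX_PAYLOAD_BYTES` is an arbitrary budget chosen for illustration, not a documented Sample limit, and both helpers are hypothetical.

```python
import json

MAX_PAYLOAD_BYTES = 4096  # hypothetical budget, not a documented Sample limit


def payload_size_bytes(payload: dict) -> int:
    """Size of the payload once encoded as compact UTF-8 JSON."""
    return len(json.dumps(payload, separators=(",", ":")).encode("utf-8"))


def is_lightweight(payload: dict) -> bool:
    """True when the encoded payload fits within the budget."""
    return payload_size_bytes(payload) <= MAX_PAYLOAD_BYTES


small = {"order_id": "ord_123"}
large = {"order_id": "ord_123", "document_text": "x" * 10_000}
```

Here `small` passes the check, while `large` embeds document text that the processor could instead look up by ID.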

Error Handling

import logging

logger = logging.getLogger(__name__)

def safe_event_handler(ctx):
    try:
        # Your event handling logic (process_event is application-defined)
        return {"start_data": process_event(ctx.event)}
    except Exception:
        # Log the error with its traceback and skip starting the workflow
        logger.exception("Error processing event")
        return None  # Don't start workflow on error

Performance Considerations

  • Keep event handlers lightweight and fast
  • Avoid heavy computations in start_on and cancel_on handlers
  • Use specific event matching to avoid unnecessary workflow starts/cancellations

Sample System Emitted Events

Sample automatically emits certain events when specific actions occur in your system. These events are prefixed with sample: and can be used to trigger workflows automatically.

Email

All email-related events are emitted under the sample:email:* pattern.

received

When an email is sent to your sample mailbox (your_organization@start.onsample.com), Sample automatically emits a sample:email:received event.
Event Name: sample:email:received
Payload Structure:
{
  "subject": "Email subject line",
  "body": "Plain text email body",
  "html": "HTML email body (if available)",
  "from": ["sender@example.com"],
  "to": ["your_organization@start.onsample.com"],
  "attachments": [
    {
      "fileName": "document.pdf",
      "id": "file-metadata-id"
    }
  ]
}
Example Usage:
from workflows_py.workflow import Workflow

workflow = Workflow(
    get_display_col_values={
        "Subject": lambda ctx: ctx.get_start_data()["subject"],
        "From": lambda ctx: ", ".join(ctx.get_start_data()["from"])
    },
    start_on={
        "sample:email:received": lambda ctx: {
            "start_data": {
                "subject": ctx.event.payload["subject"],
                "body": ctx.event.payload["body"],
                "from": ctx.event.payload["from"],
                "attachments": ctx.event.payload["attachments"]
            }
        }
    }
)
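A start handler often needs only part of the email payload. The sketch below pulls out the file metadata IDs of PDF attachments; `pdf_attachment_ids` is a hypothetical helper, and it assumes the attachments field may be missing or empty, which the payload example above does not confirm.

```python
def pdf_attachment_ids(payload: dict) -> list:
    """Return the file metadata IDs of attachments whose file name ends in .pdf."""
    return [
        attachment["id"]
        for attachment in payload.get("attachments") or []
        if attachment.get("fileName", "").lower().endswith(".pdf")
    ]


example_payload = {
    "subject": "Referral",
    "from": ["sender@example.com"],
    "attachments": [
        {"fileName": "document.PDF", "id": "file-1"},
        {"fileName": "logo.png", "id": "file-2"},
    ],
}
```

A handler could return `None` when this list is empty, so emails without PDFs never start a run.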

Kno2

All Kno2-related events are emitted under the sample:kno2:* pattern when messages are received through your Kno2 integration.

message-receive

When a message (such as a fax) is received through your Kno2 connection, Sample automatically emits a sample:kno2:${connection.slug}:message-receive event, where ${connection.slug} corresponds to the specific Kno2 connection that received the message.
Event Name: sample:kno2:${connection.slug}:message-receive
Payload Structure:
{
  "message": {
    "id": "msg_abc123def456",
    "body": null,
    "type": "Intake",
    "isNew": false,
    "origin": "eFax",
    "status": "Received",
    "isDraft": false,
    "patient": {
      "fullName": "John Doe",
      "lastName": "Doe",
      "firstName": "John",
      "middleName": null,
      "birthDate": null,
      "gender": null,
      "patientId": null,
      "patientIds": [],
      "telephone": null,
      "streetAddress1": null,
      "streetAddress2": null,
      "city": null,
      "state": null,
      "postalCode": null,
      "country": null,
      "telecom": []
    },
    "subject": "Medical Records Request",
    "isUrgent": false,
    "priority": "NotUrgent",
    "toAddress": "555-123-4567",
    "fromAddress": "555-987-6543",
    "properties": {
      "faxPageCount": "4",
      "originalFaxPageCount": "4"
    },
    "sourceType": "Fax",
    "attachments": [
      {
        "id": "att_xyz789",
        "key": "attachment-key-123",
        "fileName": "medical_records.pdf",
        "mimeType": "application/pdf",
        "messageId": "msg_abc123def456",
        "sizeInBytes": 207212,
        "documentType": "",
        "attachmentMeta": {
          "documentDate": "2025-01-15T10:30:00.000",
          "documentType": "",
          "confidentiality": "Normal"
        }
      }
    ],
    "createdDate": "2025-01-15T10:30:00.000",
    "messageDate": "2025-01-15T10:29:45.000",
    "messageType": "default",
    "patientName": "John Doe",
    "organizationId": "org_sample123"
  },
  "attachments": [
    {
      "id": "fmd_abc123",
      "fileName": "medical_records.pdf"
    }
  ]
}
Key Fields:
  • message.id: Unique identifier for the message
  • message.subject: Subject line of the fax/message
  • message.toAddress: Recipient fax number
  • message.fromAddress: Sender fax number
  • message.sourceType: Type of message (e.g., “Fax”)
  • message.attachments: Array of attachment metadata from Kno2
  • attachments: Array of Sample file metadata entries (id and fileName) for the downloaded attachments
Example Usage:
from workflows_py.workflow import Workflow

workflow = Workflow(
    get_display_col_values={
        "From": lambda ctx: ctx.get_start_data()["message"]["fromAddress"],
        "Subject": lambda ctx: ctx.get_start_data()["message"]["subject"],
        "Pages": lambda ctx: ctx.get_start_data()["message"]["properties"]["faxPageCount"]
    },
    start_on={
        "sample:kno2:production:message-receive": lambda ctx: {
            "start_data": {
                "message": ctx.event.payload["message"],
                "attachments": ctx.event.payload["attachments"],
                "from_fax": ctx.event.payload["message"]["fromAddress"],
                "to_fax": ctx.event.payload["message"]["toAddress"]
            }
        }
    }
)

# Process the received fax attachments
def process_kno2_fax(ctx: WorkflowRunContext):
    attachments = ctx.get_start_data()["attachments"]

    for attachment in attachments:
        # Process each attachment using its file metadata ID
        file_id = attachment["id"]
        file_name = attachment["fileName"]
        # Your processing logic here

# To listen to messages from multiple Kno2 connections:
workflow = Workflow(
    start_on={
        "sample:kno2:production:message-receive": lambda ctx: process_kno2_message(ctx),
        "sample:kno2:staging:message-receive": lambda ctx: process_kno2_message(ctx),
    }
)
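With several connections, the event names can be generated from the slugs rather than typed by hand. The sketch below is illustrative: `kno2_event_name`, `fax_page_count`, and `start_from_fax` are hypothetical helpers, and the slugs are the ones used in the examples above. It also converts faxPageCount, which appears as a string in the sample payload, to an integer.

```python
def kno2_event_name(slug: str) -> str:
    """Build the event name for one Kno2 connection slug."""
    return f"sample:kno2:{slug}:message-receive"


def fax_page_count(message: dict) -> int:
    """Read faxPageCount, which arrives as a string in the sample payload; 0 if absent."""
    raw = (message.get("properties") or {}).get("faxPageCount")
    return int(raw) if raw is not None else 0


def start_from_fax(ctx):
    """Shared start handler sketch for every connection."""
    message = ctx.event.payload["message"]
    return {"start_data": {"message_id": message["id"], "pages": fax_page_count(message)}}


# One start_on entry per connection, all sharing the same handler
start_on = {kno2_event_name(slug): start_from_fax for slug in ("production", "staging")}
```

The resulting `start_on` dict can be passed to `Workflow(...)` in place of the hand-written mapping.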

Troubleshooting

Common Issues

Workflow not starting on events:
  • Verify event names match exactly between emit and listen
  • Check that handler functions return appropriate values
  • Ensure version requirements are met
Unexpected workflow cancellations:
  • Review cancel_on handler logic
  • Check event payload structure
  • Verify cancellation conditions are specific enough
Missing event data:
  • Ensure event payload contains required fields
  • Check that get_start_data() returns expected structure
  • Verify event emission includes necessary context
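Event name mismatches are often a single character apart. The sketch below uses Python's `difflib` to flag listened-for names that nearly match an emitted name; `suggest_listener` is a hypothetical debugging helper and the 0.8 cutoff is an arbitrary choice.

```python
import difflib


def suggest_listener(emitted_name: str, listened_names: list) -> list:
    """Return listened-for names that closely resemble an emitted name with no exact match."""
    if emitted_name in listened_names:
        return []  # exact match, nothing to fix
    return difflib.get_close_matches(emitted_name, listened_names, n=3, cutoff=0.8)


listened = ["order-created", "order-updated", "patient-admitted"]
suggestions = suggest_listener("order_updated", listened)
```

In this example the emitter used an underscore where the workflow listens for a hyphen, so the helper points at the intended name.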