Overview

Events let you start and cancel Sample workflows in response to things that happen inside or outside your application. This makes workflows reactive: they can respond to changes in your application state as those changes occur.
Events functionality requires samplehc >= 0.3.0 and workflows-py >= 0.1.18.

Emitting Events

You can emit events from your workflow functions using the SampleHC client. Events consist of a name and an optional payload.

Basic Event Emission

from client_manager import SampleHealthcareClient

# get_order, get_client, and get_bill_type are assumed to be helpers
# defined elsewhere in your application (not shown here).
def emit_order_updated(ctx: WorkflowRunContext):
    """Emit an "order-updated" event carrying the order ID and bill type."""
    order = get_order(ctx)
    client = get_client(ctx)

    client.v2.events.emit(
        name="order-updated",
        payload={
            "order_id": order["order_id"],
            "bill_type": get_bill_type(ctx),
        },
    )

Event Structure

Events have the following structure:
  • name: A string identifier for the event type (e.g., “order-updated”, “order-created”)
  • payload: Any JSON-serializable data relevant to the event
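
Because the payload must be JSON-serializable, it can help to check an event locally before emitting it. The following is a minimal sketch, not part of samplehc: build_event is a hypothetical helper, and the order ID and bill type values are purely illustrative.

```python
import json
from typing import Any, Optional


def build_event(name: str, payload: Optional[Any] = None) -> dict:
    """Hypothetical helper (not part of samplehc): validate an event locally.

    Raises ValueError for an empty name and TypeError if the payload
    cannot be serialized to JSON.
    """
    if not isinstance(name, str) or not name:
        raise ValueError("event name must be a non-empty string")
    # json.dumps raises TypeError for values such as sets or datetime objects.
    json.dumps(payload)
    return {"name": name, "payload": payload}


# Illustrative values only.
event = build_event("order-updated", {"order_id": "ord_123", "bill_type": "131"})

try:
    build_event("order-updated", {"tags": {1, 2}})  # a set is not JSON-serializable
    rejected = False
except TypeError:
    rejected = True
```

The validated dict has the same two fields that client.v2.events.emit takes in the example above, so it could be passed along as name=event["name"], payload=event["payload"].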

Listening to Events

Workflows can listen to events using two mechanisms: start_on and cancel_on.

Starting Workflows on Events

Use start_on to automatically start a workflow when specific events are received:

from workflows_py.workflow import Workflow

workflow = Workflow(
    get_display_col_values={"Order ID": lambda ctx: ctx.get_start_data()["order_id"]},
    start_on={
        "order-created": lambda ctx: create_insurance_verification_start_data_from_start_on_ctx(ctx),
        "order-updated": lambda ctx: create_insurance_verification_start_data_from_start_on_ctx(ctx),
    },
)

Start Handler Return Values

The start_on handler function can return:
  • A dict with an optional start_data key: Starts the workflow with the provided start data
  • None: Does not start the workflow

def handle_order_created(ctx):
    # Start the workflow with custom data
    return {
        "start_data": {
            "order_id": ctx.event.payload["order_id"],
            "priority": "high"
        }
    }

def handle_low_priority_order(ctx):
    # Don't start workflow for low priority orders
    if ctx.event.payload.get("priority") == "low":
        return None
    
    # Start workflow for other priorities
    return {"start_data": {"order_id": ctx.event.payload["order_id"]}}

# These handlers would be used in your workflow configuration like this:
workflow = Workflow(
    get_display_col_values={"Order ID": lambda ctx: ctx.get_start_data()["order_id"]},
    start_on={
        "order-created": handle_order_created,
        "order-updated": handle_low_priority_order,
    },
)
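
Start handlers only read ctx.event, so you can exercise them locally before deploying. The sketch below uses types.SimpleNamespace as a stand-in for the real context. fake_start_ctx is a hypothetical test helper, not a workflows-py API, and the real context may carry more fields.

```python
from types import SimpleNamespace


def fake_start_ctx(name: str, payload: dict) -> SimpleNamespace:
    """Test-only stand-in for the context passed to start_on handlers."""
    return SimpleNamespace(event=SimpleNamespace(name=name, payload=payload))


def handle_low_priority_order(ctx):
    # Same logic as the handler above: skip low priority orders.
    if ctx.event.payload.get("priority") == "low":
        return None
    return {"start_data": {"order_id": ctx.event.payload["order_id"]}}


skipped = handle_low_priority_order(
    fake_start_ctx("order-updated", {"order_id": "ord_1", "priority": "low"})
)
started = handle_low_priority_order(
    fake_start_ctx("order-updated", {"order_id": "ord_2", "priority": "high"})
)
print(skipped)  # None
print(started)  # {'start_data': {'order_id': 'ord_2'}}
```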

Canceling Workflows on Events

Use cancel_on to automatically cancel running workflows when specific events are received:

workflow = Workflow(
    # ... other configuration
    cancel_on={
        "order-updated": lambda ctx: ctx.get_start_data()["order_id"] == ctx.event.payload.get("order_id"),
    },
)

Cancel Handler Return Values

The cancel_on handler function should return:
  • True: Cancel the workflow run
  • False: Keep the workflow running

def should_cancel_on_order_update(ctx):
    # Cancel if this workflow is processing the same order
    workflow_order_id = ctx.get_start_data()["order_id"]
    event_order_id = ctx.event.payload.get("order_id")
    
    return workflow_order_id == event_order_id
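
One edge case is worth guarding against. If both the start data and the event payload are read with .get() and neither contains an order_id, the comparison becomes None == None and the run is cancelled. The sketch below is one way to require both IDs to be present. fake_cancel_ctx is a hypothetical test stand-in, not the real CancelOnContext.

```python
from types import SimpleNamespace


def should_cancel_on_order_update(ctx) -> bool:
    """Cancel only when both IDs are present and equal."""
    workflow_order_id = ctx.get_start_data().get("order_id")
    event_order_id = ctx.event.payload.get("order_id")
    if workflow_order_id is None or event_order_id is None:
        return False
    return workflow_order_id == event_order_id


def fake_cancel_ctx(start_data: dict, payload: dict) -> SimpleNamespace:
    """Test-only stand-in for the context passed to cancel_on handlers."""
    return SimpleNamespace(
        event=SimpleNamespace(name="order-updated", payload=payload),
        get_start_data=lambda: start_data,
    )


same_order = should_cancel_on_order_update(
    fake_cancel_ctx({"order_id": "ord_1"}, {"order_id": "ord_1"})
)
other_order = should_cancel_on_order_update(
    fake_cancel_ctx({"order_id": "ord_1"}, {"order_id": "ord_2"})
)
missing_ids = should_cancel_on_order_update(fake_cancel_ctx({}, {}))
```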

Event Context

Each handler receives a context object. Its contents depend on whether the handler is registered under start_on or cancel_on:

StartOnContext

Available in start_on handlers:

def handle_start_event(ctx: StartOnContext):
    # Access the event that triggered this handler
    event_name = ctx.event.name
    event_payload = ctx.event.payload
    
    # Access authentication token (for API calls)
    backend_token = ctx.backend_token
    
    # Return start data or None
    return {"start_data": {"triggered_by": event_name}}

CancelOnContext

Available in cancel_on handlers:

def handle_cancel_event(ctx: CancelOnContext):
    # Access the event that triggered this handler
    event_name = ctx.event.name
    event_payload = ctx.event.payload
    
    # Access the workflow's original start data
    start_data = ctx.get_start_data()
    
    # Return boolean decision
    return should_cancel_workflow(start_data, event_payload)

Common Patterns

Order Processing Workflow

workflow = Workflow(
    get_display_col_values={
        "Order ID": lambda ctx: ctx.get_start_data()["order_id"],
        "Status": lambda ctx: ctx.get_start_data().get("status", "pending")
    },
    start_on={
        "order-created": lambda ctx: {
            "start_data": {
                "order_id": ctx.event.payload["order_id"],
                "status": "processing"
            }
        }
    },
    cancel_on={
        "order-cancelled": lambda ctx: (
            ctx.get_start_data()["order_id"] == ctx.event.payload["order_id"]
        )
    }
)

Multi-Event Workflow

workflow = Workflow(
    start_on={
        "patient-admitted": lambda ctx: create_admission_workflow_data(ctx),
        "insurance-verified": lambda ctx: create_insurance_workflow_data(ctx),
        "urgent-case": lambda ctx: create_urgent_workflow_data(ctx),
    },
    cancel_on={
        "patient-discharged": lambda ctx: (
            ctx.get_start_data()["patient_id"] == ctx.event.payload["patient_id"]
        ),
        "case-closed": lambda ctx: (
            ctx.get_start_data()["case_id"] == ctx.event.payload["case_id"]
        ),
    }
)
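
When several cancel_on entries compare one field of the start data against the same field of the event payload, a small factory can remove the repetition. It also tolerates runs whose start data lacks that field, for example a run started by patient-admitted that has no case_id. This is a sketch: same_field is a hypothetical helper, not part of workflows-py, and the SimpleNamespace objects are test-only stand-ins for the real context.

```python
from types import SimpleNamespace
from typing import Callable


def same_field(field: str) -> Callable[[object], bool]:
    """Hypothetical helper: build a cancel_on handler that matches on one field."""
    def handler(ctx) -> bool:
        run_value = ctx.get_start_data().get(field)
        event_value = ctx.event.payload.get(field)
        # A missing value on the run side never counts as a match.
        return run_value is not None and run_value == event_value
    return handler


cancel_on = {
    "patient-discharged": same_field("patient_id"),
    "case-closed": same_field("case_id"),
}

# A run started without a case_id is not cancelled by "case-closed".
no_case_ctx = SimpleNamespace(
    event=SimpleNamespace(name="case-closed", payload={"case_id": "case_9"}),
    get_start_data=lambda: {"patient_id": "pat_1"},
)
# A run for the same patient is cancelled by "patient-discharged".
discharge_ctx = SimpleNamespace(
    event=SimpleNamespace(name="patient-discharged", payload={"patient_id": "pat_1"}),
    get_start_data=lambda: {"patient_id": "pat_1"},
)
```

The resulting dict could be passed as the cancel_on argument in place of the inline lambdas.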

Best Practices

Event Naming

  • Use descriptive, consistent event names (e.g., “order-created”, “patient-admitted”)
  • Include entity type and action (e.g., “user-registered”, “document-processed”)
  • Use kebab-case for event names
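
If you want to enforce the kebab-case convention in code review or tests, a regular expression is enough. This is a minimal sketch. is_kebab_case is a hypothetical helper, and nothing in Sample is stated to reject names that fail it.

```python
import re

# Lowercase alphanumeric words separated by single hyphens, e.g. "order-created".
KEBAB_CASE = re.compile(r"[a-z0-9]+(?:-[a-z0-9]+)*")


def is_kebab_case(name: str) -> bool:
    """Return True if name follows the kebab-case convention."""
    return KEBAB_CASE.fullmatch(name) is not None
```

System events such as sample:email:received use colon-separated segments, so this pattern deliberately does not match them. Apply it only to the names of events you emit yourself.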

Payload Design

  • Include essential identifiers in the payload (IDs, timestamps)
  • Keep payloads lightweight; avoid large objects
  • Include context needed for decision making
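
The points above can be sketched as a small payload builder that copies only identifiers and decision-making context from a larger record, adds a timestamp, and checks the serialized size. build_order_payload and MAX_PAYLOAD_BYTES are hypothetical. The size budget in particular is an arbitrary illustration, not a documented Sample limit.

```python
import json
from datetime import datetime, timezone

MAX_PAYLOAD_BYTES = 4096  # Hypothetical budget, not a documented Sample limit.


def build_order_payload(order: dict) -> dict:
    """Keep only identifiers and decision-making context, plus a timestamp."""
    payload = {
        "order_id": order["order_id"],
        "status": order.get("status"),
        "emitted_at": datetime.now(timezone.utc).isoformat(),
    }
    size = len(json.dumps(payload).encode("utf-8"))
    if size > MAX_PAYLOAD_BYTES:
        raise ValueError(
            f"payload is {size} bytes, over the {MAX_PAYLOAD_BYTES} byte budget"
        )
    return payload


# The bulky "documents" field stays out of the event payload.
order = {"order_id": "ord_123", "status": "new", "documents": ["scan"] * 1000}
payload = build_order_payload(order)
```

A handler that needs the full record can look it up by order_id instead of receiving it in the event.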

Error Handling

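import logging

logger = logging.getLogger(__name__)

def safe_event_handler(ctx):
    try:
        # Your event handling logic (process_event is a placeholder for it)
        return {"start_data": process_event(ctx.event)}
    except Exception:
        # Log the error with its traceback, then decide whether to start the workflow
        logger.exception("Error processing event")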
        return None  # Don't start workflow on error
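
If several start handlers need the same protection, the try/except can be factored into a decorator. This is a sketch under the assumption that returning None is the behavior you want on any failure. skip_on_error is a hypothetical name, and the SimpleNamespace objects are test-only stand-ins for the real context.

```python
import functools
import logging
from types import SimpleNamespace

logger = logging.getLogger(__name__)


def skip_on_error(handler):
    """Wrap a start_on handler so that any exception results in None (no start)."""
    @functools.wraps(handler)
    def wrapper(ctx):
        try:
            return handler(ctx)
        except Exception:
            logger.exception("Error in start_on handler %s", handler.__name__)
            return None
    return wrapper


@skip_on_error
def handle_order_created(ctx):
    # Raises KeyError if the payload has no order_id.
    return {"start_data": {"order_id": ctx.event.payload["order_id"]}}


ok_ctx = SimpleNamespace(
    event=SimpleNamespace(name="order-created", payload={"order_id": "ord_1"})
)
bad_ctx = SimpleNamespace(event=SimpleNamespace(name="order-created", payload={}))
```

With this in place, handle_order_created(bad_ctx) logs the KeyError and returns None instead of raising.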

Performance Considerations

  • Keep event handlers lightweight and fast
  • Avoid heavy computations in start_on and cancel_on handlers
  • Use specific event matching to avoid unnecessary workflow starts/cancellations

Sample System Emitted Events

Sample automatically emits certain events when specific actions occur in your system. These events are prefixed with sample: and can be used to trigger workflows automatically.

Email

All email-related events are emitted under the sample:email:* pattern.

received

When an email is sent to your sample mailbox (your_organization@start.onsample.com), Sample automatically emits a sample:email:received event.

Event Name: sample:email:received

Payload Structure:

{
  "subject": "Email subject line",
  "body": "Plain text email body",
  "html": "HTML email body (if available)",
  "from": ["sender@example.com"],
  "to": ["your_organization@start.onsample.com"],
  "attachments": [
    {
      "fileName": "document.pdf",
      "id": "file-metadata-id"
    }
  ]
}

Example Usage:

from workflows_py.workflow import Workflow

workflow = Workflow(
    get_display_col_values={
        "Subject": lambda ctx: ctx.get_start_data()["subject"],
        "From": lambda ctx: ", ".join(ctx.get_start_data()["from"])
    },
    start_on={
        "sample:email:received": lambda ctx: {
            "start_data": {
                "subject": ctx.event.payload["subject"],
                "body": ctx.event.payload["body"],
                "from": ctx.event.payload["from"],
                "attachments": ctx.event.payload["attachments"]
            }
        }
    }
)
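
A start handler can also filter incoming emails rather than starting a run for every message. The sketch below starts a workflow only when the email carries at least one PDF attachment, using the payload fields shown above. start_on_pdf_email is a hypothetical handler name, and the SimpleNamespace context is a test-only stand-in.

```python
from types import SimpleNamespace


def start_on_pdf_email(ctx):
    """Start only for emails that carry at least one PDF attachment."""
    payload = ctx.event.payload
    pdfs = [
        attachment
        for attachment in payload.get("attachments", [])
        if attachment.get("fileName", "").lower().endswith(".pdf")
    ]
    if not pdfs:
        return None  # No PDF attached: do not start the workflow
    return {
        "start_data": {
            "subject": payload.get("subject", ""),
            "from": payload.get("from", []),
            "attachments": pdfs,
        }
    }


email_ctx = SimpleNamespace(
    event=SimpleNamespace(
        name="sample:email:received",
        payload={
            "subject": "Referral",
            "from": ["sender@example.com"],
            "attachments": [
                {"fileName": "document.pdf", "id": "file-metadata-id"},
                {"fileName": "logo.png", "id": "another-file-metadata-id"},
            ],
        },
    )
)
result = start_on_pdf_email(email_ctx)
```

The handler would be registered under the "sample:email:received" key of start_on, in the same way as the lambda above.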

Kno2

All Kno2-related events are emitted under the sample:kno2:* pattern when messages are received through your Kno2 integration.

message-receive

When a message (such as a fax) is received through your Kno2 connection, Sample automatically emits a sample:kno2:${connection.slug}:message-receive event, where ${connection.slug} corresponds to the specific Kno2 connection that received the message.

Event Name: sample:kno2:${connection.slug}:message-receive

Payload Structure:

{
  "message": {
    "id": "msg_abc123def456",
    "body": null,
    "type": "Intake",
    "isNew": false,
    "origin": "eFax",
    "status": "Received",
    "isDraft": false,
    "patient": {
      "fullName": "John Doe",
      "lastName": "Doe",
      "firstName": "John",
      "middleName": null,
      "birthDate": null,
      "gender": null,
      "patientId": null,
      "patientIds": [],
      "telephone": null,
      "streetAddress1": null,
      "streetAddress2": null,
      "city": null,
      "state": null,
      "postalCode": null,
      "country": null,
      "telecom": []
    },
    "subject": "Medical Records Request",
    "isUrgent": false,
    "priority": "NotUrgent",
    "toAddress": "555-123-4567",
    "fromAddress": "555-987-6543",
    "properties": {
      "faxPageCount": "4",
      "originalFaxPageCount": "4"
    },
    "sourceType": "Fax",
    "attachments": [
      {
        "id": "att_xyz789",
        "key": "attachment-key-123",
        "fileName": "medical_records.pdf",
        "mimeType": "application/pdf",
        "messageId": "msg_abc123def456",
        "sizeInBytes": 207212,
        "documentType": "",
        "attachmentMeta": {
          "documentDate": "2025-01-15T10:30:00.000",
          "documentType": "",
          "confidentiality": "Normal"
        }
      }
    ],
    "createdDate": "2025-01-15T10:30:00.000",
    "messageDate": "2025-01-15T10:29:45.000",
    "messageType": "default",
    "patientName": "John Doe",
    "organizationId": "org_sample123"
  },
  "attachments": [
    {
      "id": "fmd_abc123",
      "fileName": "medical_records.pdf"
    }
  ]
}

Key Fields:
  • message.id: Unique identifier for the message
  • message.subject: Subject line of the fax/message
  • message.toAddress: Recipient fax number
  • message.fromAddress: Sender fax number
  • message.sourceType: Type of message (e.g., “Fax”)
  • message.attachments: Array of attachment metadata from Kno2
  • attachments: Array of Sample file metadata objects (id and fileName) for the downloaded attachments

Example Usage:

from workflows_py.workflow import Workflow

workflow = Workflow(
    get_display_col_values={
        "From": lambda ctx: ctx.get_start_data()["message"]["fromAddress"],
        "Subject": lambda ctx: ctx.get_start_data()["message"]["subject"],
        "Pages": lambda ctx: ctx.get_start_data()["message"]["properties"]["faxPageCount"]
    },
    start_on={
        "sample:kno2:production:message-receive": lambda ctx: {
            "start_data": {
                "message": ctx.event.payload["message"],
                "attachments": ctx.event.payload["attachments"],
                "from_fax": ctx.event.payload["message"]["fromAddress"],
                "to_fax": ctx.event.payload["message"]["toAddress"]
            }
        }
    }
)

# Process the received fax attachments
def process_kno2_fax(ctx: WorkflowRunContext):
    attachments = ctx.get_start_data()["attachments"]
    
    for attachment in attachments:
        # Process each attachment using its file metadata ID
        file_id = attachment["id"]
        file_name = attachment["fileName"]
        # Your processing logic here

# To listen to messages from multiple Kno2 connections:
workflow = Workflow(
    start_on={
        "sample:kno2:production:message-receive": lambda ctx: process_kno2_message(ctx),
        "sample:kno2:staging:message-receive": lambda ctx: process_kno2_message(ctx),
    }
)
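
Because the event name embeds the connection slug, the start_on mapping for several connections can be generated instead of written out by hand. The sketch below also converts faxPageCount, which is a string in the payload shown above, to an integer. kno2_message_event and start_from_kno2 are hypothetical helpers, and "production" and "staging" are the example slugs used above.

```python
from types import SimpleNamespace


def kno2_message_event(slug: str) -> str:
    """Build the message-receive event name for a Kno2 connection slug."""
    return f"sample:kno2:{slug}:message-receive"


def start_from_kno2(ctx):
    """Start handler shared by every Kno2 connection."""
    message = ctx.event.payload["message"]
    properties = message.get("properties") or {}
    return {
        "start_data": {
            "message": message,
            "attachments": ctx.event.payload["attachments"],
            # faxPageCount is a string in the payload; default to 0 if absent.
            "page_count": int(properties.get("faxPageCount") or 0),
        }
    }


start_on = {
    kno2_message_event(slug): start_from_kno2
    for slug in ("production", "staging")
}

# Test-only stand-in for the real start context.
fax_ctx = SimpleNamespace(
    event=SimpleNamespace(
        name=kno2_message_event("production"),
        payload={
            "message": {"fromAddress": "555-987-6543", "properties": {"faxPageCount": "4"}},
            "attachments": [{"id": "fmd_abc123", "fileName": "medical_records.pdf"}],
        },
    )
)
fax_start = start_from_kno2(fax_ctx)
```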

Troubleshooting

Common Issues

Workflow not starting on events:
  • Verify event names match exactly between emit and listen
  • Check that handler functions return appropriate values
  • Ensure version requirements are met

Unexpected workflow cancellations:
  • Review cancel_on handler logic
  • Check event payload structure
  • Verify cancellation conditions are specific enough

Missing event data:
  • Ensure event payload contains required fields
  • Check that get_start_data() returns expected structure
  • Verify event emission includes necessary context
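
Name mismatches between emit and listen are easy to miss by eye. A quick way to spot them is to compare the two sets of names and suggest near matches with difflib from the standard library. This is a sketch: find_unmatched is a hypothetical helper, and you would collect the two sets of names from your own code.

```python
import difflib


def find_unmatched(emitted: set, listened: set) -> dict:
    """Map each listened-for name that is never emitted to similar emitted names."""
    report = {}
    for name in sorted(listened - emitted):
        report[name] = difflib.get_close_matches(
            name, sorted(emitted), n=3, cutoff=0.8
        )
    return report


emitted = {"order-created", "order-updated"}
listened = {"order-created", "order_updated", "patient-admitted"}
report = find_unmatched(emitted, listened)
print(report)
```

Here order_updated is flagged with order-updated as a likely intended name, while patient-admitted is reported with no suggestions, meaning nothing similar is emitted at all.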