Module 3 · Build Phase 2

Real-Time Streaming Ingest

Stand up a Fabric Eventhouse, define KQL tables for three live data streams (hospital vitals, hospital movement, Metra trains), wire each Event Hub through an Eventstream, and watch live data start flowing.

75 minutes
🎯 Goal: 3 streams → 1 Eventhouse
🛠 Eventhouse · Eventstream · KQL

Learning Objectives

The Three Streams You'll Wire Up

Stream | Source (Event Hub) | KQL table | Volume | Domain
🏥 Hospital Vitals | medicalvitals | HospitalVitals | ~1 evt/8s × 15 patients | Health
🏥 Hospital Movement | medicalmovement | HospitalMovement | ~1 evt/min | Health
🚆 Metra Trains | metrotrain | TrainTelemetry | ~1 evt/3s × 3 trains | Transit
Figure 3.1 - Module 3 fan-in pattern: three Event Hubs → three Eventstreams → one Eventhouse with three KQL tables.

Step 1 - Create Your Eventhouse

  1. Create the Eventhouse

    From your workspace home, click + New item → search "Eventhouse" → pick Eventhouse.

    Name it eh_urbanpulse_rti and click Create.

    Fabric provisions the Eventhouse and a child KQL Database with the same name. You land on the Eventhouse System Overview page.

    📷
    SCREENSHOT NEEDED
    module-3/00-create-eventhouse.png
    "New Eventhouse" dialog with name eh_urbanpulse_rti filled in
    Eventhouse vs. KQL Database. The Eventhouse is the container; the KQL Database is the queryable surface. One Eventhouse can host many KQL Databases - we only need one for this lab.
  2. Open the KQL Database

    Inside the Eventhouse, click the KQL Database tile (same name). This is where all your KQL tables will live.

    📷
    SCREENSHOT NEEDED
    module-3/01-eventhouse-overview.png
    Eventhouse System Overview with the KQL Database tile
  3. Enable OneLake availability

    By default, KQL data lives in the Eventhouse's hot storage tier only. Turning on OneLake availability mirrors every KQL table out as Delta in OneLake automatically - no copy job, no pipeline. That's what lets your Direct Lake semantic model (Module 6), Fabric Ontology (Module 7), and Data Agents (Modules 8-9) read the same streaming data without bespoke ETL.

    Inside the KQL Database, click OneLake on the top toolbar → choose Yes from the dropdown. In the Enable OneLake Availability dialog, leave Apply to existing tables ticked and click Enable. The OneLake tile in the right-hand Database details panel will flip from Disabled to Enabled.

    📷
    SCREENSHOT NEEDED
    module-3/02-onelake-availability.png
    KQL Database details pane with the OneLake availability toggle switched On
    One stream, two surfaces. After this toggle, every record Eventstream writes to a KQL table is also written to OneLake as Delta. The Eventhouse batches these writes, so expect a delay of minutes rather than seconds. The hot path stays sub-second for the Real-Time Dashboard, and the warm path is queryable from the Lakehouse SQL endpoint, Power BI Direct Lake, or any other OneLake consumer.
  4. Open a query window

    Click the Explore your data button (or the database name in the left tree → Query). You'll write all the KQL in this query editor.

Step 2 - Create All Three KQL Tables

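Run the following KQL in the query editor. Together, the commands do three things:
- Create all three tables.
- Enable streaming ingestion.
- Register the JSON ingestion mappings that Eventstream will reference.

The editor runs the command under the cursor, and commands are separated by blank lines. Run each command in turn.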

Why explicit mappings? Eventstream's no-code path can also infer the schema. For production-grade ingest, though, you want named mappings so that schema drift in the source doesn't silently break ingestion. This is the same pattern the hackathon repo uses.

2a - Hospital Vitals + Hospital Movement

KQL
// =========================================================
// HOSPITAL VITALS - patient telemetry
// =========================================================
.create table HospitalVitals (
    patient_id: string,
    age: int,
    gender: string,
    diagnosis_code: string,
    condition: string,
    heart_rate: int,
    bp_systolic: int,
    bp_diastolic: int,
    temperature_f: real,
    spo2: int,
    respiratory_rate: int,
    timestamp: datetime
)

.alter table HospitalVitals policy streamingingestion enable

.create table HospitalVitals ingestion json mapping 'HospitalVitals_mapping'
'[{"column":"patient_id","path":"$.patient_id","datatype":"string"},'
 '{"column":"age","path":"$.age","datatype":"int"},'
 '{"column":"gender","path":"$.gender","datatype":"string"},'
 '{"column":"diagnosis_code","path":"$.diagnosis_code","datatype":"string"},'
 '{"column":"condition","path":"$.condition","datatype":"string"},'
 '{"column":"heart_rate","path":"$.heart_rate","datatype":"int"},'
 '{"column":"bp_systolic","path":"$.bp_systolic","datatype":"int"},'
 '{"column":"bp_diastolic","path":"$.bp_diastolic","datatype":"int"},'
 '{"column":"temperature_f","path":"$.temperature_f","datatype":"real"},'
 '{"column":"spo2","path":"$.spo2","datatype":"int"},'
 '{"column":"respiratory_rate","path":"$.respiratory_rate","datatype":"int"},'
 '{"column":"timestamp","path":"$.timestamp","datatype":"datetime"}]'

// =========================================================
// HOSPITAL MOVEMENT - admit / transfer / discharge events
// =========================================================
.create table HospitalMovement (
    patient_id: string,
    age: int,
    gender: string,
    diagnosis_code: string,
    diagnosis_desc: string,
    event_type: string,
    from_location: string,
    to_location: string,
    floor: string,
    timestamp: datetime
)

.alter table HospitalMovement policy streamingingestion enable

.create table HospitalMovement ingestion json mapping 'HospitalMovement_mapping'
'[{"column":"patient_id","path":"$.patient_id","datatype":"string"},'
 '{"column":"age","path":"$.age","datatype":"int"},'
 '{"column":"gender","path":"$.gender","datatype":"string"},'
 '{"column":"diagnosis_code","path":"$.diagnosis_code","datatype":"string"},'
 '{"column":"diagnosis_desc","path":"$.diagnosis_desc","datatype":"string"},'
 '{"column":"event_type","path":"$.event_type","datatype":"string"},'
 '{"column":"from_location","path":"$.from_location","datatype":"string"},'
 '{"column":"to_location","path":"$.to_location","datatype":"string"},'
 '{"column":"floor","path":"$.floor","datatype":"string"},'
 '{"column":"timestamp","path":"$.timestamp","datatype":"datetime"}]'
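Optionally, you can smoke-test a mapping before any Eventstream is wired up. Ingest one hand-written JSON record through it and check the result.

The sketch below does this for HospitalVitals. A few things to note:
- The record values, including the TEST-0001 patient ID, are made up for illustration.
- It assumes inline ingestion is permitted on your database.
- Run the three commands one at a time.
- Run them only while the table is still empty, because the last command clears all data in the table.

```kql
// 1. Push one synthetic record through the named mapping (illustrative values).
.ingest inline into table HospitalVitals with (format="json", ingestionMappingReference="HospitalVitals_mapping") <|
{"patient_id":"TEST-0001","age":42,"gender":"F","diagnosis_code":"TEST","condition":"smoke-test","heart_rate":72,"bp_systolic":118,"bp_diastolic":76,"temperature_f":98.6,"spo2":98,"respiratory_rate":14,"timestamp":"2024-01-01T00:00:00Z"}

// 2. Every column should be populated; an empty column points at a bad path in the mapping.
HospitalVitals
| where patient_id == "TEST-0001"

// 3. Remove the synthetic row before Eventstream starts writing real data.
.clear table HospitalVitals data
```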

2b - Metra Train Telemetry

KQL
// =========================================================
// TRAIN TELEMETRY - Red / Blue / Green lines
// =========================================================
.create table TrainTelemetry (
    trainId: string,
    line: string,
    lat: real,
    lon: real,
    speed: real,
    status: string,
    timestamp: datetime
)

.alter table TrainTelemetry policy streamingingestion enable

.create table TrainTelemetry ingestion json mapping 'TrainTelemetry_mapping'
'[{"column":"trainId","path":"$.trainId","datatype":"string"},'
 '{"column":"line","path":"$.line","datatype":"string"},'
 '{"column":"lat","path":"$.lat","datatype":"real"},'
 '{"column":"lon","path":"$.lon","datatype":"real"},'
 '{"column":"speed","path":"$.speed","datatype":"real"},'
 '{"column":"status","path":"$.status","datatype":"string"},'
 '{"column":"timestamp","path":"$.timestamp","datatype":"datetime"}]'
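You may need to re-run Step 2, for example after fixing a typo. If so, the plain `.create` commands may fail because the table or mapping already exists.

A re-runnable variant, sketched here for TrainTelemetry, makes two swaps:
- `.create-merge table` creates the table, or only adds missing columns if it already exists.
- `.create-or-alter ... ingestion json mapping` creates or replaces the named mapping.

The same substitution should work for the other two tables.

```kql
// Re-runnable variant of 2b.
// .create-merge: creates the table if missing, otherwise only adds new columns.
.create-merge table TrainTelemetry (
    trainId: string,
    line: string,
    lat: real,
    lon: real,
    speed: real,
    status: string,
    timestamp: datetime
)

// Enabling the streaming policy again is harmless.
.alter table TrainTelemetry policy streamingingestion enable

// .create-or-alter: creates the mapping, or replaces it if it already exists.
.create-or-alter table TrainTelemetry ingestion json mapping 'TrainTelemetry_mapping'
'[{"column":"trainId","path":"$.trainId","datatype":"string"},'
 '{"column":"line","path":"$.line","datatype":"string"},'
 '{"column":"lat","path":"$.lat","datatype":"real"},'
 '{"column":"lon","path":"$.lon","datatype":"real"},'
 '{"column":"speed","path":"$.speed","datatype":"real"},'
 '{"column":"status","path":"$.status","datatype":"string"},'
 '{"column":"timestamp","path":"$.timestamp","datatype":"datetime"}]'
```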
KQL
// =========================================================
// VERIFY - list all three tables
// =========================================================
.show tables

Validation

.show tables should return three rows: HospitalVitals, HospitalMovement, TrainTelemetry. All empty for now.
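Beyond `.show tables`, a few read-only commands confirm the rest of Step 2:
- Each table carries its named JSON mapping.
- Streaming ingestion is switched on. The Policy column of the streaming ingestion check should report IsEnabled as true.

Run each command on its own, and repeat the policy check for HospitalMovement and TrainTelemetry.

```kql
// Named JSON mappings registered on each table (expect <Table>_mapping).
.show table HospitalVitals ingestion json mappings

.show table HospitalMovement ingestion json mappings

.show table TrainTelemetry ingestion json mappings

// Streaming ingestion policy for one table; repeat for the other two.
.show table HospitalVitals policy streamingingestion
```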

Step 3 - Build Three Eventstreams

Producers are running centrally. The three Event Hubs (medicalvitals, medicalmovement, metrotrain) are already receiving live JSON. As soon as you publish each Eventstream and activate its destination, the matching KQL table starts filling.

You'll build one Eventstream per data domain - three in total. Each Eventstream owns a single Azure Event Hubs source and a single KQL destination using Direct ingestion mode. The pattern is identical for all three; you'll walk through es_hospitalvitals end to end below, then repeat the same six sub-steps with the values from the table in 3b.

Eventstream | Event Hub source | KQL table | Mapping
es_hospitalvitals | medicalvitals | HospitalVitals | HospitalVitals_mapping
es_hospitalmovement | medicalmovement | HospitalMovement | HospitalMovement_mapping
es_traintelemetry | metrotrain | TrainTelemetry | TrainTelemetry_mapping

3a - Walk through es_hospitalvitals end to end

  1. Create the Eventstream

    From your workspace, click + New item → Eventstream. Enter the name es_hospitalvitals and click Create.

    Fabric provisions the item and drops you onto an empty canvas with a placeholder source node and an internal stream named es_hospitalvitals-stream in the middle.

  2. Add the Event Hub source

    Click the placeholder source node → External source → Azure Event Hubs → Add. In the source pane on the right:

    1. Under Cloud connection, click + Create new connection.
    2. Fill in the connection fields using the values issued by your coach for this lab room (Event Hub namespace, hub name, listen-only SAS connection string), plus the per-Eventstream values below. See Module 0 · Setup & Environment Check for context.
      • Event Hub: medicalvitals
      • Connection name: cn-medicalvitals
    3. Click Connect, then back in the source pane confirm Consumer group = $Default and Data format = Json.
    4. (Optional) Rename the canvas node from azure-event-hub to src-medical-vitals.
    5. Click Add.

    The Connection settings pane asks for the Event Hub namespace and the Event Hub. The namespace is the same for all three streams; only the Event Hub name changes:

    Stream | Event Hub namespace | Event Hub
    Hospital Vitals | rtidemo | medicalvitals
    Hospital Movement | rtidemo | medicalmovement
    Metra Trains | rtidemo | metrotrain

    The source node turns green and shows Active within ~30 seconds.

    !
    Always create a new connection per Eventstream. The sendListen SAS policy is hub-scoped, so a key issued for one Event Hub will not authenticate against another. Reusing one connection across Eventstreams produces an InvalidSignature error on the second source you wire up.
  3. Verify the live preview

    With the source node selected, open the Data preview tab in the bottom pane. JSON events should populate within ~10 seconds. If the pane stays empty, the producer for this hub is not running, or the connection settings are wrong - fix the source before continuing.

  4. Add the KQL destination

    On the canvas, click the source node → + Add destination → Eventhouse. In the destination pane on the right, fill in:

    • Data ingestion mode: Direct ingestion
    • Destination name: dest-hospital-vitals
    • Workspace: your lab workspace
    • Eventhouse: eh_urbanpulse_rti
    • KQL Database: eh_urbanpulse_rti

    Click Save. The destination tile lands on the canvas in an Unconfigured state - this is expected.

    No table picker yet. Direct ingestion mode does not ask for the destination table or mapping in this dialog. The notice "Activation for KQL database direct ingestion mode requires further actions after publish" is expected. You'll bind the destination to its KQL table in sub-step 6.
    📷
    SCREENSHOT NEEDED
    module-3/02-eventstream-source-destination.png
    Eventstream canvas with the Event Hub source connected through the internal stream to the Unconfigured Eventhouse destination tile
  5. Publish the Eventstream

    Click Publish on the toolbar. Fabric commits the topology and exits edit mode. The source tile remains Active, and the destination tile now shows a Configure banner because the table binding is still missing.

  6. Activate the destination - bind table and mapping

    On the published canvas, click Configure on the destination tile. In the Activate dialog, fill in:

    • Destination table: HospitalVitals
    • Input data format: JSON
    • Existing mapping: HospitalVitals_mapping

    Click Add. Within ~30 seconds the destination flips to Active and rows start landing in HospitalVitals.

    📷
    SCREENSHOT NEEDED
    module-3/03-eventstream-activate-destination.png
    Activate dialog for the Direct ingestion destination, with HospitalVitals selected as Destination table and HospitalVitals_mapping selected as Existing mapping
    !
    If the destination stays in Failed or rows never appear, the most common cause is a typo in the mapping name. Confirm the destination is bound to HospitalVitals_mapping exactly - mapping names are case-sensitive.
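A few KQL checks help once the destination reports Active, or when it fails to:
- The first confirms that rows are arriving and shows how fresh the newest one is.
- The second lists the mapping names that actually exist on the table.
- The last two list any recent ingestion errors recorded for the table.

Which failure view holds the error depends on whether Direct ingestion uses queued or streaming ingestion behind the scenes, so the sketch queries both. Treat it as a starting point rather than an exhaustive diagnostic.

```kql
// Row count and newest event; re-run after a minute and the count should grow.
HospitalVitals
| summarize rows = count(), latest_event = max(timestamp)

// Mapping names actually registered on the table (names are case-sensitive).
.show table HospitalVitals ingestion json mappings

// Recent queued-ingestion failures for this table.
.show ingestion failures
| where Table == "HospitalVitals"
| top 10 by FailedOn desc

// Streaming-ingestion failures for this table, if any.
.show streamingingestion failures
| where Table == "HospitalVitals"
```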

3b - Repeat for the other two Eventstreams

Run the same six sub-steps twice more, once for each row below. Create a brand-new connection in sub-step 2 every time:

Eventstream | Source name | Event Hub | Connection name | Destination name | Table | Mapping
es_hospitalmovement | src-medical-movement | medicalmovement | cn-medicalmovement | dest-hospital-movement | HospitalMovement | HospitalMovement_mapping
es_traintelemetry | src-metra-trains | metrotrain | cn-metrotrain | dest-train-telemetry | TrainTelemetry | TrainTelemetry_mapping
📷
SCREENSHOT NEEDED
module-3/04-three-eventstreams-list.png
Workspace list view showing all three Eventstreams (es_hospitalvitals, es_hospitalmovement, es_traintelemetry) published with their sources and destinations Active

Step 4 - Verify End-to-End with KQL

Switch back to the eh_urbanpulse_rti Eventhouse and open the KQL queryset editor. Run each block below on its own. Each should return ten recent rows with timestamps from the last minute or so.

KQL · vitals
HospitalVitals
| top 10 by timestamp desc
KQL · movement
HospitalMovement
| top 10 by timestamp desc
KQL · trains
TrainTelemetry
| top 10 by timestamp desc

Then run the row-count check across all three tables at once:

KQL · row counts
// Row counts in last 5 minutes - all three should have > 0
union withsource=source_table HospitalVitals, HospitalMovement, TrainTelemetry
| where timestamp > ago(5m)
| summarize events = count() by source_table
| order by events desc
📷
SCREENSHOT NEEDED
module-3/03-kql-results.png
KQL query editor showing the union row-count query with non-zero counts for all three tables
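The row counts prove that data is arriving. A rough latency check shows how long each event takes to land.

The sketch compares the producer's `timestamp` with `ingestion_time()`, which returns when the row was ingested. Two caveats:
- It relies on the table's IngestionTime policy, which is enabled by default.
- It assumes producer clocks are close to UTC, so any clock skew shows up as lag.

Repeat it with the other two table names.

```kql
// Seconds from producer timestamp to ingestion, over the last 15 minutes.
HospitalVitals
| where timestamp > ago(15m)
| extend lag_s = (ingestion_time() - timestamp) / 1s
| summarize events = count(), avg_lag_s = avg(lag_s), max_lag_s = max(lag_s)
```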

Validation

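  • All three tables return rows from the last 5 minutes.
  • Hospital Vitals has the highest volume (15 patients × 1 event every 8 s ≈ 1.9 events/s, roughly 560 rows in 5 minutes); Hospital Movement is the smallest (~1 event/min, about 5 rows).
  • Train Telemetry sits in between (3 trains × 1 event every 3 s = 1 event/s, about 300 rows in 5 minutes), with new rows arriving steadily every few seconds.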
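To compare the streams against the nominal rates in the volume table at the top of this module, average over a ten-minute window. At those rates you would expect:
- Hospital Vitals: roughly 112 events/min (15 × 60 / 8).
- Metra Trains: 60 events/min (3 × 60 / 3).
- Hospital Movement: about 1 event/min.

Real numbers will wander around these if the producers jitter or restart.

```kql
// Average events per minute per stream over the last 10 minutes.
union withsource=source_table HospitalVitals, HospitalMovement, TrainTelemetry
| where timestamp > ago(10m)
| summarize events_per_min = count() / 10.0 by source_table
| order by events_per_min desc
```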

Bonus - When to Use KQL Update Policies

What is an update policy?

A KQL update policy is a server-side rule attached to a destination table. Whenever rows are ingested into a source table, the policy runs a KQL function over the new batch and writes the output into the destination table. This happens as part of the same ingestion operation, and in the same transaction if the policy is marked transactional. The result is a second table that stays in lock-step with the first one without any external scheduler, pipeline, or notebook.

Why use one?

Eventstream gives you a no-code, JSON-passthrough path: bytes in Event Hub land verbatim in a KQL table. That's perfect for raw telemetry, but real workloads usually need more:

All four are write-time transformations: cheaper than running them on every read, and the derived table is queryable the instant a row lands.

How does it work?

Three KQL objects make up the pattern:

  1. A destination table with the shape you want consumers to see.
  2. A KQL function that reads from the source table and returns rows that match the destination's schema exactly.
  3. An update policy bound to the destination table that names the source table and the function. Once enabled, every ingest batch into the source automatically triggers the function and appends its output to the destination.

The function only sees the new batch (not the full history), so cost stays bounded as the source table grows.

Walk-through: derive a VitalsAlerts table from HospitalVitals

To make this concrete, the snippet below adds a VitalsAlerts table that contains only critical readings, with a derived severity column. Run it in the same KQL queryset you used in Step 4. The Eventstream you built in Step 3 keeps writing to HospitalVitals; the update policy makes VitalsAlerts populate automatically as rows flow in.

Optional - Add a VitalsAlerts derived table (skip if short on time)
KQL · 1. destination table
// Only critical readings land here. Same datetime column name
// as the source so the update policy passes through cleanly.
.create table VitalsAlerts (
    patient_id: string,
    heart_rate: int,
    spo2: int,
    temperature_f: real,
    severity: string,
    timestamp: datetime
)
KQL · 2. transform function
// Filters to critical readings and tags each row with a severity.
// Output schema MUST match VitalsAlerts column-for-column.
.create-or-alter function VitalsAlerts_TransformFn() {
    HospitalVitals
    | where heart_rate > 120 or heart_rate < 50
         or spo2 < 90
         or temperature_f > 101.5
    | extend severity = case(
        spo2 < 88 or heart_rate > 140, "critical",
        "warning")
    | project patient_id, heart_rate, spo2, temperature_f, severity, timestamp
}
KQL · 3. bind the policy
// Runs the function on every ingest batch into HospitalVitals
// and appends matching rows into VitalsAlerts.
.alter table VitalsAlerts policy update
@'[{"Source":"HospitalVitals","Query":"VitalsAlerts_TransformFn()","IsEnabled":true,"PropagateIngestionProperties":true}]'
KQL · 4. validate
// Should return a small, growing set of critical readings.
VitalsAlerts
| where timestamp > ago(5m)
| summarize alerts = count() by severity
| order by alerts desc
Update policies run on the ingest path itself, so VitalsAlerts is updated batch by batch as HospitalVitals fills, with no scheduler in between. Combine that with a Real-Time Dashboard tile (Module 5) for a true sub-minute alerting pipeline.
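The policy only sees new ingest batches. Readings that reached HospitalVitals before you bound it are therefore not copied into VitalsAlerts.

Called directly (outside a policy), the function reads the whole source table, so a one-off `.set-or-append` can backfill that history. The cutoff below is a heuristic:
- It keeps only readings older than the earliest alert the policy has already written, to avoid duplicating those.
- Events that arrive out of order around that boundary could still be missed or doubled.

Run it once only.

```kql
// One-off backfill of alerts for readings ingested before the policy existed.
// If VitalsAlerts is still empty, cutoff is null and every matching reading is copied.
.set-or-append VitalsAlerts <|
    let cutoff = toscalar(VitalsAlerts | summarize min(timestamp));
    VitalsAlerts_TransformFn()
    | where isnull(cutoff) or timestamp < cutoff
```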
!
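Schema must match exactly. The function's project list has to match the destination table's columns by name, type, and order, and `.alter table ... policy update` rejects a query whose output doesn't match. If the schema drifts later, the policy fails at ingest time:
- With the default IsTransactional=false, the source rows still land but the derived rows are dropped.
- With IsTransactional=true, every ingest into the source fails until you fix or disable the policy.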
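Two read-only commands are enough to confirm the policy is bound and to see whether it is failing at ingest time. The OriginatesFromUpdatePolicy filter isolates failures raised while running update policies, as opposed to failures in the Eventstream ingest itself.

```kql
// The bound policy: Source, Query, IsEnabled, IsTransactional, ...
.show table VitalsAlerts policy update

// Recent ingestion failures raised while running update policies.
.show ingestion failures
| where OriginatesFromUpdatePolicy == true
| top 10 by FailedOn desc
```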

What You Just Built

Module 4 curates these streaming tables together with the Lakehouse tables before Module 5 builds the Real-Time Dashboard on top.

References