Stand up a Fabric Eventhouse, define KQL tables for three live data streams (hospital vitals, hospital movement, Metra trains), wire each Event Hub through an Eventstream, and watch live data start flowing.
| Stream | Source (Event Hub) | KQL table | Volume | Domain |
|---|---|---|---|---|
| 🏥 Hospital Vitals | medicalvitals |
HospitalVitals |
~1 evt/8s × 15 patients | Health |
| 🏥 Hospital Movement | medicalmovement |
HospitalMovement |
~1 evt/min | Health |
| 🚆 Metra Trains | metrotrain |
TrainTelemetry |
~1 evt/3s × 3 trains | Transit |
From your workspace home, click + New item → search "Eventhouse" → pick Eventhouse.
Name it: eh_urbanpulse_rti → Create.
Fabric provisions the Eventhouse and a child KQL Database with the same name. You land on the Eventhouse System Overview page.
eh_urbanpulse_rti filled inInside the Eventhouse, click the KQL Database tile (same name). This is where all your KQL tables will live.
By default, KQL data lives in the Eventhouse's hot storage tier only. Turning on OneLake availability mirrors every KQL table out as Delta in OneLake automatically - no copy job, no pipeline. That's what lets your Direct Lake semantic model (Module 6), Fabric Ontology (Module 7), and Data Agents (Modules 8-9) read the same streaming data without bespoke ETL.
Inside the KQL Database, click OneLake on the top toolbar → choose Yes from the dropdown. In the Enable OneLake Availability dialog, leave Apply to existing tables ticked and click Enable. The OneLake tile in the right-hand Database details panel will flip from Disabled to Enabled.
Click the Explore your data button (or the database name in the left tree → Query). You'll write all the KQL in this query editor.
Run the following KQL block in the query editor. It creates all three tables, enables streaming ingestion, and registers JSON ingestion mappings (which Eventstream will reference).
// =========================================================
// HOSPITAL VITALS - patient telemetry
// =========================================================
.create table HospitalVitals (
patient_id: string,
age: int,
gender: string,
diagnosis_code: string,
condition: string,
heart_rate: int,
bp_systolic: int,
bp_diastolic: int,
temperature_f: real,
spo2: int,
respiratory_rate: int,
timestamp: datetime
)
.alter table HospitalVitals policy streamingingestion enable
.create table HospitalVitals ingestion json mapping 'HospitalVitals_mapping'
'[{"column":"patient_id","path":"$.patient_id","datatype":"string"},'
'{"column":"age","path":"$.age","datatype":"int"},'
'{"column":"gender","path":"$.gender","datatype":"string"},'
'{"column":"diagnosis_code","path":"$.diagnosis_code","datatype":"string"},'
'{"column":"condition","path":"$.condition","datatype":"string"},'
'{"column":"heart_rate","path":"$.heart_rate","datatype":"int"},'
'{"column":"bp_systolic","path":"$.bp_systolic","datatype":"int"},'
'{"column":"bp_diastolic","path":"$.bp_diastolic","datatype":"int"},'
'{"column":"temperature_f","path":"$.temperature_f","datatype":"real"},'
'{"column":"spo2","path":"$.spo2","datatype":"int"},'
'{"column":"respiratory_rate","path":"$.respiratory_rate","datatype":"int"},'
'{"column":"timestamp","path":"$.timestamp","datatype":"datetime"}]'
// =========================================================
// HOSPITAL MOVEMENT - admit / transfer / discharge events
// =========================================================
.create table HospitalMovement (
patient_id: string,
age: int,
gender: string,
diagnosis_code: string,
diagnosis_desc: string,
event_type: string,
from_location: string,
to_location: string,
floor: string,
timestamp: datetime
)
.alter table HospitalMovement policy streamingingestion enable
.create table HospitalMovement ingestion json mapping 'HospitalMovement_mapping'
'[{"column":"patient_id","path":"$.patient_id","datatype":"string"},'
'{"column":"age","path":"$.age","datatype":"int"},'
'{"column":"gender","path":"$.gender","datatype":"string"},'
'{"column":"diagnosis_code","path":"$.diagnosis_code","datatype":"string"},'
'{"column":"diagnosis_desc","path":"$.diagnosis_desc","datatype":"string"},'
'{"column":"event_type","path":"$.event_type","datatype":"string"},'
'{"column":"from_location","path":"$.from_location","datatype":"string"},'
'{"column":"to_location","path":"$.to_location","datatype":"string"},'
'{"column":"floor","path":"$.floor","datatype":"string"},'
'{"column":"timestamp","path":"$.timestamp","datatype":"datetime"}]'
// =========================================================
// TRAIN TELEMETRY - Red / Blue / Green lines
// =========================================================
.create table TrainTelemetry (
trainId: string,
line: string,
lat: real,
lon: real,
speed: real,
status: string,
timestamp: datetime
)
.alter table TrainTelemetry policy streamingingestion enable
.create table TrainTelemetry ingestion json mapping 'TrainTelemetry_mapping'
'[{"column":"trainId","path":"$.trainId","datatype":"string"},'
'{"column":"line","path":"$.line","datatype":"string"},'
'{"column":"lat","path":"$.lat","datatype":"real"},'
'{"column":"lon","path":"$.lon","datatype":"real"},'
'{"column":"speed","path":"$.speed","datatype":"real"},'
'{"column":"status","path":"$.status","datatype":"string"},'
'{"column":"timestamp","path":"$.timestamp","datatype":"datetime"}]'
// =========================================================
// VERIFY - list all three tables
// =========================================================
.show tables
.show tables should return three rows: HospitalVitals, HospitalMovement, TrainTelemetry. All empty for now.
medicalvitals, medicalmovement, metrotrain) are already receiving live JSON. As soon as you publish each Eventstream and activate its destination, the matching KQL table starts filling.
You'll build one Eventstream per data domain - three in total. Each Eventstream owns a single Azure Event Hubs source and a single KQL destination using Direct ingestion mode. The pattern is identical for all three; you'll walk through es_hospitalvitals end to end below, then repeat the same six sub-steps with the values from the table in 3b.
| Eventstream | Event Hub source | KQL table | Mapping |
|---|---|---|---|
es_hospitalvitals | medicalvitals | HospitalVitals | HospitalVitals_mapping |
es_hospitalmovement | medicalmovement | HospitalMovement | HospitalMovement_mapping |
es_traintelemetry | metrotrain | TrainTelemetry | TrainTelemetry_mapping |
es_hospitalvitals end to endFrom your workspace, click + New item → Eventstream. Enter the name es_hospitalvitals and click Create.
Fabric provisions the item and drops you onto an empty canvas with a placeholder source node and an internal stream named es_hospitalvitals-stream in the middle.
Click the placeholder source node → External source → Azure Event Hubs → Add. In the source pane on the right:
medicalvitalscn-medicalvitals$Default and Data format = Json.azure-event-hub to src-medical-vitals.The Connection settings pane asks for the Event Hub namespace and the Event Hub. The namespace is the same for all three streams; only the Event Hub name changes:
| Stream | Event Hub namespace | Event Hub |
|---|---|---|
| Hospital Vitals | rtidemo | medicalvitals |
| Hospital Movement | rtidemo | medicalmovement |
| Metro Trains | rtidemo | metrotrain |
The source node turns green and shows Active within ~30 seconds.
sendListen SAS policy is hub-scoped, so a key issued for one Event Hub will not authenticate against another. Reusing one connection across Eventstreams produces an InvalidSignature error on the second source you wire up.
With the source node selected, open the Data preview tab in the bottom pane. JSON events should populate within ~10 seconds. If the pane stays empty, the producer for this hub is not running, or the connection settings are wrong - fix the source before continuing.
On the canvas, click the source node → + Add destination → Eventhouse. In the destination pane on the right, fill in:
Direct ingestiondest-hospital-vitalseh_urbanpulse_rtieh_urbanpulse_rtiClick Save. The destination tile lands on the canvas in an Unconfigured state - this is expected.
Click Publish on the toolbar. Fabric commits the topology and exits edit mode. The source tile remains Active, and the destination tile now shows a Configure banner because the table binding is still missing.
On the published canvas, click Configure on the destination tile. In the Activate dialog, fill in:
HospitalVitalsJSONHospitalVitals_mappingClick Add. Within ~30 seconds the destination flips to Active and rows start landing in HospitalVitals.
HospitalVitals_mapping exactly - mapping names are case-sensitive.
Run the same six sub-steps twice more, once for each row below. Create a brand-new connection in sub-step 2 every time:
| Eventstream | Source name | Event Hub | Connection name | Destination name | Table | Mapping |
|---|---|---|---|---|---|---|
es_hospitalmovement | src-medical-movement | medicalmovement | cn-medicalmovement | dest-hospital-movement | HospitalMovement | HospitalMovement_mapping |
es_traintelemetry | src-metra-trains | metrotrain | cn-metrotrain | dest-train-telemetry | TrainTelemetry | TrainTelemetry_mapping |
Switch back to the eh_urbanpulse_rti Eventhouse and open the KQL queryset editor. Run each block below on its own. Each should return ten recent rows with timestamps from the last minute or so.
HospitalVitals
| take 10
HospitalMovement
| take 10
TrainTelemetry
| take 10
Then run the row-count check across all three tables at once:
// Row counts in last 5 minutes - all three should have > 0
union withsource=table HospitalVitals, HospitalMovement, TrainTelemetry
| where timestamp > ago(5m)
| summarize events = count() by table
| order by events desc
A KQL update policy is a server-side rule attached to a destination table. Whenever rows are ingested into a source table, the policy runs a KQL function over the new batch and writes the function's output into the destination table - in the same transaction as the original ingest. The result is a second table that stays in lock-step with the first one without any external scheduler, pipeline, or notebook.
Eventstream gives you a no-code, JSON-passthrough path: bytes in Event Hub land verbatim in a KQL table. That's perfect for raw telemetry, but real workloads usually need more:
severity, region, or tenant_id column derived from the payload at write time, so consumers don't have to recompute it on every query.All four are write-time transformations: cheaper than running them on every read, and the derived table is queryable the instant a row lands.
Three KQL objects make up the pattern:
The function only sees the new batch (not the full history), so cost stays bounded as the source table grows.
VitalsAlerts table from HospitalVitalsTo make this concrete, the snippet below adds a VitalsAlerts table that contains only critical readings, with a derived severity column. Run it in the same KQL queryset you used in Step 4. The Eventstream you built in Step 3 keeps writing to HospitalVitals; the update policy makes VitalsAlerts populate automatically as rows flow in.
VitalsAlerts derived table (skip if short on time)// Only critical readings land here. Same datetime column name
// as the source so the update policy passes through cleanly.
.create table VitalsAlerts (
patient_id: string,
heart_rate: int,
spo2: int,
temperature_f: real,
severity: string,
timestamp: datetime
)
// Filters to critical readings and tags each row with a severity.
// Output schema MUST match VitalsAlerts column-for-column.
.create-or-alter function VitalsAlerts_TransformFn() {
HospitalVitals
| where heart_rate > 120 or heart_rate < 50
or spo2 < 90
or temperature_f > 101.5
| extend severity = case(
spo2 < 88 or heart_rate > 140, "critical",
"warning")
| project patient_id, heart_rate, spo2, temperature_f, severity, timestamp
}
// Runs the function on every ingest batch into HospitalVitals
// and appends matching rows into VitalsAlerts.
.alter table VitalsAlerts policy update
@'[{"Source":"HospitalVitals","Query":"VitalsAlerts_TransformFn()","IsEnabled":true,"PropagateIngestionProperties":true}]'
// Should return a small, growing set of critical readings.
VitalsAlerts
| where timestamp > ago(5m)
| summarize alerts = count() by severity
| order by alerts desc
project list has to match the destination table's columns by name, type, and order. A mismatch makes every ingest into the source fail until you fix or disable the policy.
Module 4 curates these and the Lakehouse tables before Module 5 builds the Real-Time Dashboard on top.