Have you ever spent time in the hospital or had biometric tests done? Those devices generate and store a lot of information that should prompt some action or process. We often look for anomalies to trigger alerts, but that data has many more uses. You might wear a heart monitor to record information ahead of a possible procedure, or take part in a sleep study that records your vitals to identify a problem. These devices keep getting better at collecting actionable information and improving our lives.
DeltaStream provides powerful capabilities for processing data in Apache Kafka topics and writing it to various destinations, including Apache Iceberg. Both the AWS Glue and Apache Polaris (incubating) catalogs are supported; for this example, we'll use Polaris. If you are unfamiliar with DeltaStream, this short interactive demo will walk you through it. With that out of the way, let's look at our setup.

Use Case: Aggregating IoT Data to Iceberg

For this example, we'll use a stream of simulated data containing a timestamp, a patient ID, and assorted vital signs; the full schema is described in the solution section. The data stream will be aggregated into a new table written to Iceberg, which will serve as our source of truth.

Our data consists of the following:

  • Inbound health_data Kafka topic with the following data:
    • 100 patients
    • Sensor data emitted every 10 minutes for 6 months starting June 1, 2024
    • Sensor data fields are randomly generated within normal bounds for each field
  • Outbound: aggregated patient heart rate data written to Iceberg

Our queries do the following:

  • Aggregate the hourly average heart rate per patient from Kafka and write to Iceberg
  • Find the top 3 patients with the highest hourly average heart rate for each month

Setup and Solution

First, let’s define the data we are working with. Our Kafka topic health_data looks like this:

  {
    "key": NULL,
    "value": {
      "event_timestamp": 17260295530,
      "patientId": "Patient-1",
      "heartRate": 74,
      "bodyTemperature": 99.3,
      "bloodPressureSystolic": 94,
      "bloodPressureDiastolic": 68,
      "respiratoryRate": 16,
      "oxygenLevel": 96,
      "bodyPosition": "Standing"
    }
  }

That means we need to create a DeltaStream object of type Stream. We do this with a CREATE STREAM statement:

  CREATE STREAM health_data (
    event_timestamp BIGINT,
    "patientId" STRING,
    "heartRate" INT,
    "bodyTemperature" DOUBLE,
    "bloodPressureSystolic" INT,
    "bloodPressureDiastolic" INT,
    "respiratoryRate" INT,
    "oxygenLevel" INT,
    "bodyPosition" STRING
  )
  WITH (
    'starting.position' = 'earliest',
    'topic' = 'health_metrics_sg',
    'value.format' = 'JSON'
  );
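
With the stream defined, a quick interactive query is an easy way to confirm records are arriving as expected. This is purely an illustrative sanity check; run it from a DeltaStream workspace and stop it once you have seen a few rows:

  SELECT event_timestamp, "patientId", "heartRate"
  FROM health_data;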

Our next step is to create an Iceberg table to hold our hourly average heart rate data. The following CTAS does that for us, and this is where the magic happens. We can aggregate our data in flight before we land it in Iceberg, which dramatically reduces latency, removes the need for a separate downstream transformation job and its compute costs, and shrinks the stored footprint. If you are unfamiliar with configuring Iceberg in DeltaStream, this short interactive demo can walk you through it.

  CREATE TABLE avg_heart_rate WITH (
    'iceberg.rest.catalog.namespace.name' = 'sgns',
    'iceberg.rest.catalog.table.name' = 'avg_heart_rate'
  )
  AS SELECT
    window_start, window_end, "patientId", AVG("heartRate") AS avg_hr
  FROM
    TUMBLE(health_data, SIZE 1 HOUR)
    WITH ('timestamp' = 'event_timestamp', 'starting.position' = 'earliest')
  GROUP BY
    window_start, window_end, "patientId";
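
Once the CTAS query is running, a quick lookup against the new table (illustrative; adjust the patient ID to match your data) confirms that aggregated rows are landing in Iceberg:

  SELECT window_start, window_end, "patientId", avg_hr
  FROM avg_heart_rate
  WHERE "patientId" = 'Patient-1'
  ORDER BY window_start;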

Our resulting data looks like this:

  {
    "window_start": "2024-08-22T23:00",
    "window_end": "2024-08-23T00:00",
    "patientId": "Patient-1",
    "avg_hr": 83
  }

Now that we have an Iceberg table with our patient ID, the hourly average heart rate, and the time window, we can look at the patients with the highest heart rate in a given month. DeltaStream lets us immediately run analytics on the data it just processed from Kafka to Iceberg, without leaving the DeltaStream environment or doing any additional configuration, using only familiar SQL.

  WITH ranked_heart_rates AS (
    SELECT
      "patientId",
      avg_hr,
      window_start,
      window_end,
      MONTH(window_start) AS MONTH,
      YEAR(window_start) AS YEAR,
      ROW_NUMBER() OVER (
        PARTITION BY YEAR(window_start), MONTH(window_start)
        ORDER BY avg_hr DESC
        ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
      ) AS rank
    FROM
      avg_heart_rate
    WHERE
      MONTH(window_start) = MONTH(window_end)
      AND YEAR(window_start) = YEAR(window_end)
  )
  SELECT
    "patientId",
    avg_hr,
    MONTH,
    YEAR
  FROM
    ranked_heart_rates
  WHERE
    rank <= 3
  ORDER BY
    YEAR, MONTH, rank;

This query can look daunting, but it’s not a lot of code for some powerful results. Let’s break it down.

Step 1: Creating a Common Table Expression (CTE)

The query begins with a CTE named ranked_heart_rates. This temporary result set performs the initial data preparation and ranking.
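
As a smaller, hypothetical illustration of the same pattern, the CTE below names an intermediate result (August readings only) that the outer query then treats like a table:

  WITH august_rates AS (
    SELECT "patientId", avg_hr
    FROM avg_heart_rate
    WHERE MONTH(window_start) = 8
  )
  SELECT "patientId", MAX(avg_hr) AS max_avg_hr
  FROM august_rates
  GROUP BY "patientId";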

Step 2: Extracting Time Components

We extract the month and year from the window_start timestamp, creating separate columns for easier grouping and sorting using DeltaStream Temporal Functions:

  MONTH(window_start) AS MONTH,
  YEAR(window_start) AS YEAR,
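
As a quick illustration, the sketch below applies the same functions over the table we just created; for the sample window shown earlier (2024-08-22T23:00), they return 8 and 2024:

  SELECT
    window_start,
    MONTH(window_start) AS event_month,
    YEAR(window_start) AS event_year
  FROM avg_heart_rate;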

Step 3: Ranking Records with Window Functions

The heart of this query uses the ROW_NUMBER() window function to assign a rank to each patient's heart rate within its month-year group:

  ROW_NUMBER() OVER (
    PARTITION BY YEAR(window_start), MONTH(window_start)
    ORDER BY avg_hr DESC
    ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
  ) AS rank

This function does the following (a simplified standalone version appears after this list):

  • PARTITION BY divides the data into month-year segments
  • ORDER BY avg_hr DESC ranks records with highest heart rates first
  • The window frame specification ensures proper calculation across the partition
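
To see the ranking step in isolation, the sketch below runs the same window function directly against avg_heart_rate, without the CTE or the top-3 filter (illustrative only):

  SELECT
    "patientId",
    avg_hr,
    ROW_NUMBER() OVER (
      PARTITION BY YEAR(window_start), MONTH(window_start)
      ORDER BY avg_hr DESC
      ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
    ) AS rank
  FROM avg_heart_rate;

Within each year-month partition, the row with the highest avg_hr gets rank 1, the next highest rank 2, and so on; the numbering restarts at 1 when a new month begins.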

Step 4: Filtering for Valid Time Windows

We only include records where the window's start and end timestamps fall within the same calendar month and year. For example, the hourly window running from 23:00 on December 31 to 00:00 on January 1 spans two months (and two years) and is excluded. Our sample data is all in 2024, but this also accounts for crossing the year boundary:

  WHERE
    MONTH(window_start) = MONTH(window_end)
    AND YEAR(window_start) = YEAR(window_end)

Step 5: Selecting the Top 3 Per Month

Finally, the main query filters the ranked results to include only the top 3 entries per month and orders them chronologically:

  SELECT "patientId", avg_hr, MONTH, YEAR
  FROM ranked_heart_rates
  WHERE rank <= 3
  ORDER BY YEAR, MONTH, rank;

Processing Medical Device Data

We just walked through a scenario for processing medical device data arriving via a Kafka topic: consolidating hourly average data per patient and writing it to Iceberg. We then immediately queried the data in Iceberg from DeltaStream to find the patients with the highest average heart rate each month. A visualization tool could be pointed at that same Iceberg data, further taking advantage of open table formats. Notably, we performed a “shift-left” with our processing: we aggregated the data in flight and then landed it in Iceberg, reducing latency, avoiding additional storage and compute costs, and preparing the data for use in a single place.