Air travel is one of the most popular modes of transportation, but it doesn’t come without the risk of flight delays from weather, technical malfunctions, or other causes. These delays, while often outside the airline’s control, can be frustrating for passengers who just want stress-free travel. Accurate and timely communication of flight delays, gate changes, and other status updates is essential for maintaining customer satisfaction. To achieve this, airlines need a robust data platform that can handle flight status updates in real time.

In this post, we will show how DeltaStream can be used to easily set up streaming pipelines that process real-time airline data. As a fully managed service powered by Apache Flink, DeltaStream gives users the capabilities of Flink without the complexities of running and scaling Flink jobs. In fact, Flink is just an implementation detail: users simply write SQL queries to set up their stream processing pipelines. For our example, we will transform flight status updates to create an always up-to-date view of current flight statuses.

Raw Source Data for Airline Flight Status

For our example, we have two raw source streams:

  • flights: flight information, including scheduled departure and arrival times.
  • flight_updates: updates to flight status, including a new departure time.

In DeltaStream, we can create Relations to represent these data:

  CREATE CHANGELOG flights (
    flight_id VARCHAR,
    event_ts TIMESTAMP,
    origin VARCHAR,
    destination VARCHAR,
    scheduled_dep TIMESTAMP,
    scheduled_arr TIMESTAMP,
    PRIMARY KEY (flight_id)
  ) WITH (
    'topic' = 'flights',
    'value.format' = 'json',
    'timestamp' = 'event_ts'
  );

  CREATE STREAM flight_updates (
    flight_id VARCHAR,
    event_ts TIMESTAMP,
    updated_departure TIMESTAMP,
    "description" VARCHAR
  ) WITH (
    'topic' = 'flight_updates',
    'value.format' = 'json',
    'timestamp' = 'event_ts'
  );

Use Case: Create an Always Up-to-Date View of Current Flight Status

To get the latest complete flight status information, we need to enrich our flight_updates Stream by joining it with our flights Changelog. The result of this join is a stream of flight updates that includes the latest departure and arrival times. We can achieve this with the following query:

  CREATE STREAM enriched_flight_updates AS
  SELECT
    u.flight_id,
    u.event_ts,
    f.origin,
    f.destination,
    -- DS_TOEPOCH yields epoch milliseconds, so dividing the difference by 60000 gives the delay in minutes
    (DS_TOEPOCH(u.updated_departure) - DS_TOEPOCH(f.scheduled_dep)) / 60000 AS mins_delayed,
    u.updated_departure AS current_departure,
    -- shift the scheduled arrival by the same delay to estimate the new arrival time
    CAST(TO_TIMESTAMP_LTZ(
      (
        DS_TOEPOCH(f.scheduled_arr) +
        (DS_TOEPOCH(u.updated_departure) - DS_TOEPOCH(f.scheduled_dep))
      ), 3) AS TIMESTAMP
    ) AS current_arrival,
    u."description"
  FROM
    flight_updates u
    JOIN flights f ON u.flight_id = f.flight_id;

We eventually want to materialize these status updates into a view, using flight_id as the primary key. To do so, we can define a Changelog that is backed by the same data as the enriched_flight_updates Stream. Notice in the WITH clause of the following statement that the topic parameter is set to enriched_flight_updates.

  CREATE CHANGELOG enriched_flight_updates_log (
    flight_id VARCHAR,
    event_ts TIMESTAMP,
    origin VARCHAR,
    destination VARCHAR,
    mins_delayed BIGINT,
    current_departure TIMESTAMP,
    current_arrival TIMESTAMP,
    "description" VARCHAR,
    PRIMARY KEY (flight_id)
  ) WITH (
    'topic' = 'enriched_flight_updates',
    'value.format' = 'json',
    'timestamp' = 'event_ts'
  );

Now the enriched_flight_updates_log Changelog contains every flight update, enriched with the flight information that the original flight_updates Stream was missing. However, it only contains updates: flights that have no delays or updates never appear. To fix this, we can write an INSERT INTO query that generates a status event for every flight in the flights Changelog, ensuring that enriched_flight_updates_log captures the status of all flights.

  INSERT INTO enriched_flight_updates_log
  SELECT
    flight_id,
    event_ts,
    origin,
    destination,
    -- flights without updates start out with zero delay, their scheduled times, and no description
    CAST(0 AS BIGINT) AS mins_delayed,
    scheduled_dep AS current_departure,
    scheduled_arr AS current_arrival,
    CAST(NULL AS VARCHAR) AS "description"
  FROM
    flights;

Finally, we can materialize our enriched_flight_updates_log Changelog into a materialized view that users can query at any time for the most up-to-date information. Since the enriched_flight_updates_log Changelog has flight_id as its primary key, the view is maintained in upsert mode on that key: each incoming event updates the existing row for its flight_id, if one exists, and inserts a new row otherwise. If we had instead created the materialized view from the enriched_flight_updates Stream, it would be maintained in append mode, where every event becomes a new row in the view.

  CREATE MATERIALIZED VIEW flight_status_view AS
  SELECT
    *
  FROM
    enriched_flight_updates_log;

After creating our materialized view, we can query it at any moment and get the latest flight statuses. The view can be queried directly from the DeltaStream console or CLI, as well as programmatically through the DeltaStream REST API. Let’s look at some sample input and output data below.
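But first, as a quick illustration of programmatic access, a client application could fetch the status of a single flight with a filtered query against the view. The lookup below is a hypothetical example that uses one of the flight IDs from the sample input:

  -- hypothetical point lookup for a single flight's latest status
  SELECT flight_id, mins_delayed, current_departure, current_arrival, "description"
  FROM flight_status_view
  WHERE flight_id = 'Flight_1';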

Input for flights:

  1. {"flight_id": "Flight_1", "event_ts": "2024-03-28 10:12:13.489", "origin": "LAX", "destination": "YVR", "scheduled_dep": "2024-05-29 10:33:00", "scheduled_arr": "2024-05-29 13:37:00"}
  2. {"flight_id": "Flight_2", "event_ts": "2024-04-11 11:58:56.489", "origin": "JFK", "destination": "SFO", "scheduled_dep": "2024-05-29 12:30:00", "scheduled_arr": "2024-05-29 19:10:00"}
  3. {"flight_id": "Flight_3", "event_ts": "2024-04-23 10:12:13.489", "origin": "SIN", "destination": "NRT", "scheduled_dep": "2024-05-30 09:25:00", "scheduled_arr": "2024-05-30 17:30:00"}
  4. {"flight_id": "Flight_4", "event_ts": "2024-05-30 15:52:13.837", "origin": "AUS", "destination": "ORD", "scheduled_dep": "2024-07-20 09:15:00", "scheduled_arr": "2024-07-20 11:30:00"}

Input for flight_updates:

  1. {"flight_id": "Flight_1", "event_ts": "2024-05-28 15:52:13.837", "updated_departure": "2024-05-29 12:30:00", "description": "Thunderstorms" }
  2. {"flight_id": "Flight_2", "event_ts": "2024-05-29 12:30:13.837", "updated_departure": "2024-05-29 13:30:00", "description": "Waiting for connecting passengers" }
  3. {"flight_id": "Flight_1", "event_ts": "2024-05-29 12:52:13.837", "updated_departure": "2024-05-29 13:30:00", "description": "More Thunderstorms" }
  4. {"flight_id": "Flight_3", "event_ts": "2024-05-30 06:52:13.837", "updated_departure": "2024-05-30 12:30:00", "description": "Mechanical delays" }

Query flight_status_view:

  SELECT * FROM flight_status_view ORDER BY flight_id;

Output results:

  flight_id | event_ts                 | origin | destination | mins_delayed | current_departure    | current_arrival      | description
  ----------+--------------------------+--------+-------------+--------------+----------------------+----------------------+-----------------------------------
  Flight_1  | 2024-05-29T12:52:13.837Z | LAX    | YVR         |          177 | 2024-05-29T13:30:00Z | 2024-05-29T16:34:00Z | More Thunderstorms
  Flight_2  | 2024-05-29T12:30:13.837Z | JFK    | SFO         |           60 | 2024-05-29T13:30:00Z | 2024-05-29T20:10:00Z | Waiting for connecting passengers
  Flight_3  | 2024-05-30T06:52:13.837Z | SIN    | NRT         |          185 | 2024-05-30T12:30:00Z | 2024-05-30T20:35:00Z | Mechanical delays
  Flight_4  | 2024-05-30T15:52:13.837Z | AUS    | ORD         |            0 | 2024-07-20T09:15:00Z | 2024-07-20T11:30:00Z | <nil>
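
A couple of details in this output are worth noting. Flight_1 received two updates in the sample input, but because the view is maintained in upsert mode on flight_id, it appears as a single row reflecting only the latest update (“More Thunderstorms”). The delay arithmetic also checks out: Flight_1’s departure moved from the scheduled 10:33 to 13:30, a difference of 177 minutes, and shifting the scheduled 13:37 arrival by those same 177 minutes gives the 16:34 current_arrival shown above. Flight_4, which never received an update, shows the zero-delay row produced by the INSERT INTO query.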

Conclusion

When it comes to air travel, travelers expect airlines to communicate flight delays and status changes effectively. In this post, we demonstrated how airlines can set up real-time data pipelines to process and join flight and flight-update data with just a few simple SQL queries in DeltaStream. As a fully managed, serverless service, DeltaStream enables its users to easily build powerful real-time applications without the operational overhead of running Flink themselves.

If you want to learn more about DeltaStream, sign up for our free trial or reach out to us.