06 Jun 2024


Real-time Airline Data Pipeline for On-time Flight Status with SQL

Air travel is one of the most popular modes of transportation, but it doesn’t come without the risk of flight delays caused by weather, technical malfunctions, or other issues. These delays, while usually out of the airline’s control, can be frustrating for passengers who simply want stress-free travel. Accurate and timely communication of flight delays, gate changes, and other status updates is essential for maintaining customer satisfaction. To achieve this, airlines need a robust data platform that can handle flight status updates in real time.

In this post, we will show how DeltaStream can be used to easily set up streaming pipelines that process real-time airline data. As a fully managed service powered by Apache Flink, DeltaStream gives users the capabilities of Flink without the complexities of running and scaling Flink jobs. In fact, Flink is just an implementation detail: users simply write SQL queries to set up their stream processing pipelines. For our example, we will transform flight status updates to create an always up-to-date view of current flight statuses.

Raw Source Data for Airline Flight Status

For our example, we have two raw source streams:

  • flights: flight information, including scheduled departure and arrival times.
  • flight_updates: updates to flight status, including a new departure time.

In DeltaStream, we can create Relations to represent these data:

  CREATE CHANGELOG flights (
    flight_id VARCHAR,
    event_ts TIMESTAMP,
    origin VARCHAR,
    destination VARCHAR,
    scheduled_dep TIMESTAMP,
    scheduled_arr TIMESTAMP,
    PRIMARY KEY (flight_id)
  ) WITH (
    'topic' = 'flights',
    'value.format' = 'json',
    'timestamp' = 'event_ts'
  );

  CREATE STREAM flight_updates (
    flight_id VARCHAR,
    event_ts TIMESTAMP,
    updated_departure TIMESTAMP,
    "description" VARCHAR
  ) WITH (
    'topic' = 'flight_updates',
    'value.format' = 'json',
    'timestamp' = 'event_ts'
  );

Use Case: Create an Always Up-to-Date View of Current Flight Status

To get the latest complete flight status information, we need to enrich our flight_updates Stream by joining it with our flights Changelog. The result of this join will be a stream of flight updates that includes the latest departure and arrival times. We can achieve this with the following query:

  CREATE STREAM enriched_flight_updates AS
  SELECT
    u.flight_id,
    u.event_ts,
    f.origin,
    f.destination,
    (DS_TOEPOCH(u.updated_departure) - DS_TOEPOCH(f.scheduled_dep)) / 60000 AS mins_delayed,
    u.updated_departure AS current_departure,
    CAST(TO_TIMESTAMP_LTZ(
      (
        DS_TOEPOCH(f.scheduled_arr) +
        (DS_TOEPOCH(u.updated_departure) - DS_TOEPOCH(f.scheduled_dep))
      ), 3) AS TIMESTAMP
    ) AS current_arrival,
    u."description"
  FROM
    flight_updates u
    JOIN flights f ON u.flight_id = f.flight_id;

We want to eventually materialize these update statuses into a view, using flight_id as the primary key. So, we can define a Changelog that is backed by the same data as the enriched_flight_updates Stream. Notice in the WITH clause of the following statement that the topic parameter is set to enriched_flight_updates.

  CREATE CHANGELOG enriched_flight_updates_log (
    flight_id VARCHAR,
    event_ts TIMESTAMP,
    origin VARCHAR,
    destination VARCHAR,
    mins_delayed BIGINT,
    current_departure TIMESTAMP,
    current_arrival TIMESTAMP,
    "description" VARCHAR,
    PRIMARY KEY (flight_id)
  ) WITH (
    'topic' = 'enriched_flight_updates',
    'value.format' = 'json',
    'timestamp' = 'event_ts'
  );

Now, the enriched_flight_updates_log Changelog will contain all flight updates, enriched with the flight information that the original flight_updates Stream was missing. However, only updates are currently included; there are no events for flights that have no delays or updates. To fix this, we can write an INSERT INTO query that generates status events from the flights Changelog. This ensures that our enriched_flight_updates_log Changelog captures the statuses of all flights.

  INSERT INTO enriched_flight_updates_log
  SELECT
    flight_id,
    event_ts,
    origin,
    destination,
    CAST(0 AS BIGINT) AS mins_delayed,
    scheduled_dep AS current_departure,
    scheduled_arr AS current_arrival,
    CAST(NULL AS VARCHAR) AS "description"
  FROM
    flights;

Finally, we can materialize our enriched_flight_updates_log into a materialized view that users can query at any time and get the most up-to-date information. Since our enriched_flight_updates_log Changelog has the primary key of flight_id, the view will be updated with UPSERT mode on the flight_id key. If we had instead created our materialized view from the enriched_flight_updates Stream, then the view would be created in append mode where each event is a new row in the view. Using upsert mode, we can update the existing row, if any, based on the Changelog’s primary key.

  CREATE MATERIALIZED VIEW flight_status_view AS
  SELECT
    *
  FROM
    enriched_flight_updates_log;

After creating our materialized view, we can query it at any moment and get the latest flight statuses. The view can be queried directly from the DeltaStream console or CLI, as well as through the DeltaStream REST API for other applications to access programmatically. Let’s look at some sample input and output data below.

Input for flights:

  {"flight_id": "Flight_1", "event_ts": "2024-03-28 10:12:13.489", "origin": "LAX", "destination": "YVR", "scheduled_dep": "2024-05-29 10:33:00", "scheduled_arr": "2024-05-29 13:37:00"}
  {"flight_id": "Flight_2", "event_ts": "2024-04-11 11:58:56.489", "origin": "JFK", "destination": "SFO", "scheduled_dep": "2024-05-29 12:30:00", "scheduled_arr": "2024-05-29 19:10:00"}
  {"flight_id": "Flight_3", "event_ts": "2024-04-23 10:12:13.489", "origin": "SIN", "destination": "NRT", "scheduled_dep": "2024-05-30 09:25:00", "scheduled_arr": "2024-05-30 17:30:00"}
  {"flight_id": "Flight_4", "event_ts": "2024-05-30 15:52:13.837", "origin": "AUS", "destination": "ORD", "scheduled_dep": "2024-07-20 09:15:00", "scheduled_arr": "2024-07-20 11:30:00"}

Input for flight_updates:

  {"flight_id": "Flight_1", "event_ts": "2024-05-28 15:52:13.837", "updated_departure": "2024-05-29 12:30:00", "description": "Thunderstorms" }
  {"flight_id": "Flight_2", "event_ts": "2024-05-29 12:30:13.837", "updated_departure": "2024-05-29 13:30:00", "description": "Waiting for connecting passengers" }
  {"flight_id": "Flight_1", "event_ts": "2024-05-29 12:52:13.837", "updated_departure": "2024-05-29 13:30:00", "description": "More Thunderstorms" }
  {"flight_id": "Flight_3", "event_ts": "2024-05-30 06:52:13.837", "updated_departure": "2024-05-30 12:30:00", "description": "Mechanical delays" }

Query flight_status_view:

  SELECT * FROM flight_status_view ORDER BY flight_id;

Output results:

  flight_id | event_ts                 | origin | destination | mins_delayed | current_departure    | current_arrival      | description
  ----------+--------------------------+--------+-------------+--------------+----------------------+----------------------+-----------------------------------
  Flight_1  | 2024-05-29T12:52:13.837Z | LAX    | YVR         | 177          | 2024-05-29T13:30:00Z | 2024-05-29T16:34:00Z | More Thunderstorms
  Flight_2  | 2024-05-29T12:30:13.837Z | JFK    | SFO         | 60           | 2024-05-29T13:30:00Z | 2024-05-29T20:10:00Z | Waiting for connecting passengers
  Flight_3  | 2024-05-30T06:52:13.837Z | SIN    | NRT         | 185          | 2024-05-30T12:30:00Z | 2024-05-30T20:35:00Z | Mechanical delays
  Flight_4  | 2024-05-30T15:52:13.837Z | AUS    | ORD         | 0            | 2024-07-20T09:15:00Z | 2024-07-20T11:30:00Z | <nil>


When it comes to air travel, travelers have an expectation that airlines will communicate flight delays and flight status effectively. In this post, we demonstrated a simple use case of how airlines can set up real-time data pipelines to process and join flight and flight updates data, with just a few simple SQL queries using DeltaStream. As a fully managed and serverless service, DeltaStream enables its users to easily create powerful real-time applications without any of the overhead.

If you want to learn more about DeltaStream, sign up for our free trial or reach out to us.

27 Mar 2024


Maximizing Performance: Processing Real-time Online Gaming Data

The gaming industry has seen immense growth in the past decade. According to an analysis by EY, the global market for gaming was $193.4b in 2021, up from $93.6b in 2016, and the market is expected to continue growing to an estimated $210.7b in 2025.

With billions of people playing video games, gaming companies need to ensure that their data platforms can handle the demands and requirements of this massive amount of data. Online gaming, which makes up a large portion of the gaming market, often has to handle and act on millions of real-time events per second. Consider all the real-time player interactions, chatrooms, leaderboards, and telemetry data that are part of modern online games. For this reason, game developers need a real-time streaming and stream processing platform that can seamlessly scale, process, and govern these data.

This is where DeltaStream comes in – to manage and process all of the streaming data in your organization. In this blog post, we’ll cover how DeltaStream can help game developers with two use cases:

  • Keeping leaderboards up to date
  • Temporarily banning players who leave games early

Connecting a Streaming Store

Although DeltaStream can source real-time data from many different data stores such as Kafka, Kinesis, PostgreSQL (as CDC data), and others, for our use cases we’ll be using RedPanda. As mentioned in an article on gaming from RedPanda, RedPanda is a cost-effective, easily scalable, and very performant alternative to using Kafka. These attributes make it a great streaming storage option for real-time gaming data.

Since RedPanda is compatible with Kafka’s APIs, users can add RedPanda as a Store in DeltaStream with the following statement:
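As a sketch, the statement might look like the following, where the Store name, region, endpoint, and credential values are all placeholders, and the exact property names may differ (check the DeltaStream documentation for the supported Store parameters):

```sql
CREATE STORE redpanda_store WITH (
  'type' = KAFKA,                             -- RedPanda speaks the Kafka protocol
  'access_region' = 'AWS us-east-1',          -- placeholder region
  'uris' = 'redpanda.example.com:9092',       -- placeholder broker endpoint
  'kafka.sasl.hash_function' = PLAIN,
  'kafka.sasl.username' = '<username>',       -- placeholder credentials
  'kafka.sasl.password' = '<password>'
);
```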

Keeping Leaderboards Up to Date with SQL and Materialized Views

Let’s assume we have a topic called “game_results” in our RedPanda Store. We can think of the events in this topic to be the results of playing some game. So, every time a player finishes a game, a new record is logged into the topic which includes the timestamp of the end of the game, the player ID, and whether or not they won the game. This topic contains records that look like the following:
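For illustration, a record in this topic might look like the following, where the field names and values are assumptions for this example:

```json
{"event_ts": "2024-03-01 10:23:45", "player_id": "Player_12", "game_id": "Game_503", "won": true}
```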

We can define a DeltaStream Stream that is backed by this topic with the following query:
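A sketch of that statement, assuming the hypothetical event_ts, player_id, game_id, and won fields from the sample record described above:

```sql
CREATE STREAM game_results (
  event_ts TIMESTAMP,
  player_id VARCHAR,
  game_id VARCHAR,
  won BOOLEAN
) WITH (
  'topic' = 'game_results',
  'value.format' = 'json',
  'timestamp' = 'event_ts'
);
```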

Next, let’s create a Materialized View to keep track for each player how many games they have completed and how many games they have won:
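One way to express this, assuming a game_results Stream with illustrative player_id and won fields (the view and column names here are our own choices, not the original post's):

```sql
CREATE MATERIALIZED VIEW player_game_stats AS
SELECT
  player_id,
  COUNT(*) AS total_games,                         -- games completed per player
  SUM(CASE WHEN won THEN 1 ELSE 0 END) AS total_wins  -- games won per player
FROM
  game_results
GROUP BY
  player_id;
```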

By creating this Materialized View, DeltaStream launches a Flink job behind the scenes which continuously ingests from the “game_results” topic and updates the view with the latest data. In the world of online gaming, thousands of games could be finished every minute, and as these games are completed, the view will stay up to date with the latest high scores.

Next, a leaderboard can be generated by querying the Materialized View. For example, the following query will return the 10 players with the most wins:
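Assuming the Materialized View is named player_game_stats with a total_wins column (illustrative names), such a query could look like:

```sql
SELECT
  player_id,
  total_wins
FROM player_game_stats
ORDER BY total_wins DESC
LIMIT 10;
```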

If we wanted to find out which 10 players have the highest win/loss ratio, then we can run the following query:
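Again assuming illustrative total_games and total_wins columns on a player_game_stats view, a win/loss ratio query might be sketched as:

```sql
SELECT
  player_id,
  CAST(total_wins AS DOUBLE) / (total_games - total_wins) AS win_loss_ratio
FROM player_game_stats
WHERE total_games > total_wins  -- avoid dividing by zero for undefeated players
ORDER BY win_loss_ratio DESC
LIMIT 10;
```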

Temporarily Ban Players for Leaving Games Early with Applications

Although most people enjoy online gaming, sometimes our competitive nature can bring out the worst in us. It’s not uncommon for players to leave games early, a phenomenon commonly known as “rage quitting.” For team-based competitive online games, however, rage quitters can be detrimental to maintaining a fun and balanced game, as the teammates of the rage quitter have to deal with the consequences of playing a player down. To deal with this, game developers often add a timeout for players who repeatedly quit games early to dissuade this behavior.

For this use case, we want to detect when a player has quit 2 of their last 4 games. Let’s assume that there is a topic called “player_game_actions” in our RedPanda Store. Below is an example of a record in this topic:
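A hypothetical record, with field names assumed for this example:

```json
{"event_ts": "2024-03-01 10:23:45", "player_id": "Player_12", "game_lobby_id": "Lobby_77", "action": "QUIT"}
```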

The action field here describes the interaction between the player and the game lobby. Possible values include JOIN, QUIT, COMPLETE. We can define a Stream backed by this topic:
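A sketch of that Stream definition, assuming the hypothetical fields from the sample record described above (the action column is quoted in case it collides with a reserved word):

```sql
CREATE STREAM player_game_actions (
  event_ts TIMESTAMP,
  player_id VARCHAR,
  game_lobby_id VARCHAR,
  "action" VARCHAR  -- one of JOIN, QUIT, COMPLETE
) WITH (
  'topic' = 'player_game_actions',
  'value.format' = 'json',
  'timestamp' = 'event_ts'
);
```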

Now, let’s process this Stream of data to find out which players have left 2 of their last 4 games. While we could solve this problem with a query containing deeply nested subqueries, we’ll use DeltaStream’s latest Application feature for this example:


Use DeltaStream to Unlock Real-time Gaming Analytics

Real-time interactions are one of the core components of online gaming, and as the industry continues to grow, it becomes increasingly necessary for these gaming companies to find cost-effective, scalable, and low-latency data solutions. As a fully-managed solution powered by Apache Flink, DeltaStream is an easy-to-use, scalable, and resilient system for all real-time stream processing workloads.

In the examples in this post, we built solutions using DeltaStream to process real-time gaming data, ingesting from storage systems such as RedPanda or Confluent Cloud. In the first use case, we used Materialized Views to build a real-time user-driven solution that keeps track of each player’s wins and losses. In the second use case, we built a real-time event-driven solution to detect when a player is being unsportsmanlike, so that downstream backend services can decide how to act on these players with minimal latency. As a system that can do Streaming Analytics and act as a Streaming Database, DeltaStream is a system built for all stream processing workloads. If you want to learn more about how DeltaStream can help unlock real-time insights for your gaming data, reach out to us or get a trial.

28 Feb 2024


Up-to-date Driver Ratings for Rideshare Apps

Markets like e-commerce, the gig economy, and local businesses all rely on ratings to give an accurate representation of how good a product or service is. Ratings are not only useful for consumers, but they’re also useful for the companies managing these goods and services. While the use of ratings doesn’t typically require low latencies, there are still cases where having an up-to-date rating can unlock opportunities. For example, in a rideshare app, a company may want to block a driver whose rating has fallen below a certain threshold from taking more trips, to prevent riders from taking rides with unsafe or unpleasant drivers. Without an up-to-date rating, the systems that validate whether a driver is eligible to drive more passengers are working with outdated data and cannot accurately determine if a driver should be allowed to drive.

In this post, we’ll showcase how DeltaStream can be used to keep ratings up to date to solve this rideshare use case.

Connect a Store and Create a Stream

Let’s assume we have a Kafka cluster with a topic called “driver_ratings” which contains the data that we’ll be sourcing from, and we’ve already defined a DeltaStream Store for this Kafka cluster (see tutorial for how to create a Store in DeltaStream) called “kafka_store.” We can create a Stream to represent this data with the following statement in DeltaStream:

  CREATE STREAM driver_ratings (
    event_ts TIMESTAMP,
    driver_id VARCHAR,
    rider_id VARCHAR,
    rating DOUBLE
  ) WITH (
    'store' = 'kafka_store',
    'topic' = 'driver_ratings',
    'value.format' = 'json',
    'timestamp' = 'event_ts',
    'timestamp.format' = 'iso8601'
  );

Real-Time Driver Ratings

For our use case, we want to keep an up-to-date rating for drivers, as well as an up-to-date count of how many reviews a driver has. These values will help downstream applications determine if a driver should be suspended. If a driver has a certain number of rides and their rating is below some threshold, then they should be suspended.

Notice in the schema of the data, from the Stream definition in the setup, that there are driver_id and rating fields. The driver_id field specifies which driver is being rated, and the rating field specifies the rating that the driver received for a trip. To determine a driver’s rating, we need to keep an up-to-date average rating across all of that driver’s rides. We can do this in SQL by grouping by the driver_id field and using the AVG function to calculate the average. Similarly, to find the number of reviews, we can use the COUNT function. These results will be persisted to a materialized view so that data consumers can easily query the view for the latest driver ratings.


  CREATE MATERIALIZED VIEW avg_driver_ratings AS
  SELECT
    driver_id,
    AVG(rating) AS avg_driver_rating,
    COUNT(*) AS num_reviews
  FROM
    driver_ratings
  GROUP BY
    driver_id;

Since the above query performs an aggregation grouping by the driver_id field, the result has a primary key of driver_id. This creates a materialized view in UPSERT mode, such that there is always one row per driver_id that reflects that driver’s current ratings.

By submitting the query, DeltaStream launches a long-lived continuous job in the background which constantly reads from the driver_ratings topic, computes the latest averages and counts, then updates the materialized view. This way, as new ratings arrive in the source topic, the materialized view is updated immediately.

Users can use DeltaStream’s Web App, CLI, or REST API to query the materialized view. Using one of these methods, downstream data consumers, such as the team responsible for driver suspensions at the rideshare company, can query the materialized view for the latest results. For example, we can query the materialized view for driver IDs with a rating below 4 and at least 15 rides.

Query 1 against avg_driver_ratings materialized view:

  SELECT
    *
  FROM
    avg_driver_ratings
  WHERE
    avg_driver_rating < 4
    AND num_reviews >= 15;

  driver_id | avg_driver_rating  | num_reviews
  ----------+--------------------+-------------
  Driver_5  | 3.75               | 16
  Driver_1  | 3.8823529411764706 | 17

Let’s also run a query to select all of our rows in our materialized view to see what the full result set looks like.

Query 2 against avg_driver_ratings materialized view:

  SELECT * FROM avg_driver_ratings ORDER BY avg_driver_rating;

  driver_id | avg_driver_rating  | num_reviews
  ----------+--------------------+-------------
  Driver_6  | 3.5714285714285716 | 14
  Driver_5  | 3.75               | 16
  Driver_1  | 3.8823529411764706 | 17
  Driver_3  | 4.111111111111111  | 18
  Driver_7  | 4.166666666666667  | 18
  Driver_2  | 4.166666666666667  | 18
  Driver_9  | 4.208333333333333  | 24
  Driver_4  | 4.222222222222222  | 9
  Driver_8  | 4.25               | 16

Wrapping Up

In this post, we showcased how DeltaStream can keep average ratings up to date in real time. While calculating average reviews is typically a batch processing job, recreating this use case with stream processing opens opportunities for features and data products built on real-time data. Although we focused on ridesharing in this example, ratings are used in plenty of different contexts, and the same pipeline can be used to keep those ratings up to date in DeltaStream.

DeltaStream is the platform to unify, process, and govern all of your streaming data. If you want to learn more about DeltaStream’s materialized views or other capabilities, come schedule a demo with us or sign up for a free trial.

21 Feb 2024


Stream Processing for Blockchain Data

Cryptocurrencies, smart contracts, NFTs, and Web3 have infiltrated mainstream media as the newest hot tech (other than generative AI of course). These technologies are backed by blockchains, which are distributed ledgers that rely on cryptography and decentralization to make secure transactions. In this blog post, we’ll be building a stream processing application using DeltaStream to inspect the gas fees associated with Ethereum transactions.

Ethereum is one of the most popular cryptocurrencies. Using Ethereum, users can set up wallets, transfer ether between accounts, interact with smart contracts, and more. With over a million transactions occurring on the Ethereum network every day (see the latest Ethereum usage statistics), there is a lot of real-time data getting processed by the network. However, off-chain analytics can also play a role in extracting insights from the blockchain or the metadata associated with it.

For this use case, we are going to be doing real-time analysis from the transactions data persisted to Ethereum’s blockchain. Any time an Ethereum user wants to perform an action in the network, whether it’s running a smart contract or simply transferring ether from their wallet to another wallet, a transaction needs to be created. The user will send this transaction to what are called “validators” who will persist this transaction to the blockchain. Once a transaction is part of a block on the blockchain, that transaction is completed as blocks are generally irreversible. However, each block on the blockchain has a gas limit, which essentially caps how many transactions can be in a particular block. Each transaction requires a certain amount of gas – a simple transfer of ether from one wallet to another costs 21,000 gas for example. Running complex smart contracts will require more gas and will fill up a block’s gas limit more quickly. This means that not every transaction automatically gets persisted to the blockchain as validators pick which set of transactions they want to include in the next block (read more about gas fees in Ethereum).

After the Ethereum Improvement Proposal (EIP) 1559 upgrade was added to Ethereum, the gas fee structure changed to include priority fees. To make your own transaction more attractive for validators to pick, a priority fee can be attached to the transaction. This priority fee is a tip to the validator for writing your transaction to a block, so the larger the priority fee, the more likely a validator will include the transaction. The question we want to help answer in this post is: what should we set the priority fee to be?

Using real Ethereum transactions data from Infura.io, we want to look at what transactions are being persisted to Ethereum’s blockchain in real-time and get a sense of how big the priority fees are for these transactions. So, in the following sections you’ll see how we can create an application to analyze gas fees in real time using DeltaStream. As an Ethereum user, these insights will be valuable for setting reasonable priority fees on transactions without overpaying.

Setup and Assumptions for Analyzing Real-Time Data

Using Infura.io’s APIs, we are able to get Ethereum’s entire blockchain block data. We wrote a small program to wait for new blocks, get the transactions in each block, and write these transactions as JSON-formatted payloads to Kafka. We’ll be using these transactions as the source data for our use case. However, there are some assumptions that we’ll be making to simplify our use case. These assumptions are listed out below for completeness:

  • We are ignoring legacy transactions that are not type 2 (read more about Ethereum’s transaction types). Type 2 transactions are those that adhere to the post EIP-1559 specifications and use priority fees. Since the EIP-1559 upgrade is backwards compatible, users can still send transactions using the old specifications, but we are ignoring these transactions to simplify our use case.
  • For our transactions data, we are enriching each transaction payload with the respective block’s timestamp and the transaction’s hash. These fields are not part of the transaction message itself.
  • Users set the maxPriorityFeePerGas and the maxFeePerGas when making a transaction. While it’s not always the case, for simplicity we will assume that (maxPriorityFeePerGas + baseFeePerGas) <= maxFeePerGas, so that the tip a validator receives is accurately represented by maxPriorityFeePerGas.
  • Occasionally, transactions with maxPriorityFeePerGas set to 0 or very low values will make it onto the blockchain. The reason validators choose these transactions is likely because users have bribed the validators (learn more about bribes in Ethereum in this research paper). As you’ll see later in the setup, we are going to filter out any transactions with maxPriorityFeePerGas <= 100.

Let’s get on with the setup of our use case. In DeltaStream, after setting up Kafka as a Store, we can create a Stream that is backed by the transactions data in our Kafka topic. The Stream that we create is metadata that informs DeltaStream how to deserialize the records in the Kafka topic. The following CREATE STREAM statement creates a new Stream called eth_txns that is backed by the ethereum_transactions topic.

  CREATE STREAM eth_txns (
    "txn_hash" VARCHAR,
    "block_ts" BIGINT,
    "blockNumber" BIGINT,
    "gas" BIGINT,
    "maxFeePerGas" BIGINT,
    "maxPriorityFeePerGas" BIGINT,
    "value" BIGINT
  ) WITH (
    'topic' = 'ethereum_transactions',
    'value.format' = 'json',
    'timestamp' = 'block_ts'
  );

Now that we have our eth_txns source Stream defined, we first need to filter out transactions that don’t fit our assumptions. We can write a CREATE STREAM AS SELECT (CSAS) query that will continuously ingest data from the eth_txns Stream, filter out records that don’t meet our criteria, and sink the resulting records to a new Stream backed by a new Kafka topic. In the following query, note in the WHERE clause that we only keep transactions with maxPriorityFeePerGas > 100 (filtering out transactions likely chosen due to bribes) and with maxPriorityFeePerGas < maxFeePerGas (filtering out transactions whose priority fee is not accurately represented by maxPriorityFeePerGas).

CSAS query to create eth_txns_filtered Stream:

  CREATE STREAM eth_txns_filtered AS
  SELECT
    "txn_hash",
    "block_ts",
    "blockNumber",
    "maxPriorityFeePerGas",
    "gas"
  FROM eth_txns WITH ('source.deserialization.error.handling'='IGNORE')
  WHERE "maxPriorityFeePerGas" > 100 AND "maxPriorityFeePerGas" < "maxFeePerGas";

Analyzing Ethereum’s Blockchain Transaction Gas Fee Data

When forming the next block, Ethereum’s validators basically get to select whichever transactions they want from a pool of pending transactions. Since new blocks are only added every several seconds and blocks have a gas limit, we can expect validators to choose transactions that are in their best financial interest and choose the transactions with the highest priority fees per gas. So, we’ll analyze the maxPriorityFeePerGas field over time as transactions flow in to get a sense of what priority fees are currently being accepted.

The following query is a CREATE CHANGELOG AS SELECT (CCAS) query that calculates the moving average, min, max, and standard deviation of priority fees over a 2-minute window that advances every 15 seconds.

CCAS query to create eth_txns_priority_fee_analysis Changelog:

  CREATE CHANGELOG eth_txns_priority_fee_analysis AS
  SELECT
    window_start,
    window_end,
    COUNT(*) AS txns_cnt,
    MIN("maxPriorityFeePerGas") AS min_priority_fee,
    MAX("maxPriorityFeePerGas") AS max_priority_fee,
    AVG("maxPriorityFeePerGas") AS avg_priority_fee,
    STDDEV_SAMP("maxPriorityFeePerGas") AS priority_fee_stddev
  FROM HOP(eth_txns_filtered, SIZE 2 MINUTES, ADVANCE BY 15 SECONDS)
  GROUP BY window_start, window_end;

Let’s see some result records in the eth_txns_priority_fee_analysis topic:

  {
    "window_start": "2023-12-18 21:14:45",
    "window_end": "2023-12-18 21:16:45",
    "txns_cnt": 368,
    "min_priority_fee": 50000000,
    "max_priority_fee": 32250000000,
    "avg_priority_fee": 859790456,
    "priority_fee_stddev": 97003259
  }
  {
    "window_start": "2023-12-18 21:15:00",
    "window_end": "2023-12-18 21:17:00",
    "txns_cnt": 514,
    "min_priority_fee": 50000000,
    "max_priority_fee": 219087884691,
    "avg_priority_fee": 1951491416,
    "priority_fee_stddev": 79308531
  }

Using these results, users can be better informed when setting priority fees for their own transactions. For example, if a transaction is more urgent, they can set the priority fee to a value greater than the average. Similarly, if users want to save money and don’t mind waiting longer for their transactions to make it onto the blockchain, they can choose a priority fee that is less than the average. These results are also useful for follow-on use cases, such as tracking priority fees over a period of time. DeltaStream’s pattern recognition capabilities also allow users to track patterns in the priority fees. For example, users could set up a pattern recognition query to detect when priority fees stop trending upwards or when priority fees experience a sudden drop-off.
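As a sketch of that last idea, a pattern recognition query over the per-window aggregates could flag a sudden drop in the average priority fee. This example assumes DeltaStream exposes SQL's MATCH_RECOGNIZE clause as in Flink SQL; the pattern variables, measures, and the 50% threshold are illustrative choices, not from the original post:

```sql
SELECT *
FROM eth_txns_priority_fee_analysis
  MATCH_RECOGNIZE (
    ORDER BY window_start
    MEASURES
      A.avg_priority_fee AS fee_before_drop,
      B.avg_priority_fee AS fee_after_drop
    ONE ROW PER MATCH
    AFTER MATCH SKIP PAST LAST ROW
    PATTERN (A B)  -- any window A followed by a window B meeting the DEFINE condition
    DEFINE
      B AS B.avg_priority_fee < A.avg_priority_fee * 0.5  -- fee fell by more than half
  );
```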

Intersecting Web3 and Stream Processing

In this blog post, we put together a real-time streaming analytics pipeline to analyze Ethereum’s gas fees. With DeltaStream’s easy-to-use platform, we were able to solve the use case and deploy our pipeline within minutes, using only a few simple SQL queries. Although this is an entry-level example, it illustrates a use case at the intersection of these two emerging technologies.

If you are interested in learning more about DeltaStream, schedule a demo with us or sign up for a free trial.

13 Feb 2024


Stream Processing for IoT Data

The Internet of Things (IoT) refers to sensors and other devices that share and exchange data over a network. IoT has been on the rise for years and only seems to continue in its growing popularity with other technological advances, such as 5G cellular networks and more “smart” devices. From tracking patient health to monitoring agriculture, the applications for IoT are plentiful and diverse. Other sectors where IoT are used include security, transportation, home automation, and manufacturing.

Oracle defines Big Data as “data that contains greater variety, arriving in increasing volumes and with more velocity.” This definition is often summarized with the 3 Vs – volume, velocity, and variety. IoT data certainly matches this description, as numerous sensors and devices can emit large volumes of varied data at high velocity.

A platform capable of processing IoT data needs to be scalable in order to keep up with the volume of Big Data. It’s very common for IoT applications to have new sensors added over time. Consider a drone fleet for package deliveries as an example – you may start off with 10 or 20 drones, but as demand for deliveries increases, the size of your drone fleet can grow by orders of magnitude. The underlying systems processing this data need to be able to scale horizontally to match the increase in data volume.

Many IoT use cases such as tracking patient health and monitoring security feeds require low latency insights. Sensors and devices providing real-time data often need to be acted on in real-time as well. For this reason, streaming and stream processing technologies have become increasingly popular and perhaps essential for solving these use cases. Streaming storage technologies such as Apache Kafka, Amazon Kinesis, and RedPanda can meet the low latency data transportation requirements of IoT. On the stream processing side, technologies such as Apache Flink and managed solutions such as DeltaStream can provide low latency streaming analytics.

IoT data can also come in various types and structures, since different sensors can have different data formats. Take a smart home, for example: the cameras will likely send very different data from a light or a thermometer, yet all of these sensors relate to the same smart home. It’s important for a data platform handling IoT use cases to be able to join across different data sets and handle any variations in data structure, format, or type.

DeltaStream as a Streaming Analytics Platform and a Streaming Database

DeltaStream is a platform to unify, process, and govern streaming data. DeltaStream sits as the compute and governance layer on top of streaming storage systems such as Kafka. Powered by Apache Flink, DeltaStream is a fully managed solution that can process streaming data with very low latencies.

In this blog post, we’ll cover two examples to show how DeltaStream can solve real-time IoT use cases. In the first use case, we’ll use DeltaStream’s Materialized Views to build a real-time, request-driven application. For the second use case, we’ll use DeltaStream to power real-time event-driven pipelines.

Use Case Setup: Transportation Sensor Data

For simplicity, both use cases will use the same source data. Let’s assume that our data is available in Apache Kafka and represents updates and sensor information for a truck fleet. We’ll first define Relations for the data in 2 Kafka topics.

The first Relation represents truck information. This includes an identifier for the truck, the speed of the truck, which thermometer is in the truck, and a timestamp for this update event represented as epoch milliseconds. Later on, we will use this event timestamp field to perform a join with data from other sensors. Since we expect regular truck information updates, we’ll define a Stream for this data.

Create truck_info Stream:

  1. CREATE STREAM truck_info (
  2. event_ts BIGINT,
  3. truck_id INT,
  4. speed_kmph INT,
  5. thermometer VARCHAR
  6. ) WITH (
  7. 'topic' = 'truck_info', 'value.format' = 'json', 'timestamp' = 'event_ts'
  8. );

The second Relation represents a thermometer sensor’s readings. The fields include an identifier for the thermometer, the temperature reading, and a timestamp for when the temperature was taken that is represented as epoch milliseconds. Later on, the event timestamp will be used when joining with the truck_info Stream. We will define a Changelog for this data using sensor_id as the primary key.

Create temperature_sensor Changelog:

  1. CREATE CHANGELOG temperature_sensor (
  2. "time" BIGINT,
  3. temperature_c INTEGER,
  4. sensor_id VARCHAR,
  5. PRIMARY KEY (sensor_id)
  6. ) WITH (
  7. 'topic' = 'temperature_sensor', 'value.format' = 'json', 'timestamp' = 'time'
  8. );

Using the Relations we have just defined, we want to find out what the latest temperature readings are in each truck. We can achieve this by using a temporal join to enrich our truck_info updates with the latest temperature readings from the temperature_sensor Changelog. The result of this join will be a Stream of enriched truck information updates with the latest temperature readings in the truck. The following SQL statement will launch a long-lived continuous query that will continually join these two Relations and write the results to a new Stream that is backed by a new Kafka topic.

Create truck_info_enriched Stream using CSAS:

  1. CREATE STREAM truck_info_enriched AS
  2. SELECT
  3. truck_info.event_ts,
  4. truck_info.truck_id,
  5. truck_info.speed_kmph,
  6. temp.sensor_id AS thermometer,
  7. temp.temperature_c
  8. FROM truck_info
  9. JOIN temperature_sensor temp
  10. ON truck_info.thermometer = temp.sensor_id;
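Outside of DeltaStream, the semantics of this temporal join can be sketched in a few lines of Python: for each truck update, look up the matching sensor’s latest temperature reading at or before the truck event’s timestamp. The sample events and values below are hypothetical, and this batch-style sketch only approximates what the continuous query does incrementally.

```python
from bisect import bisect_right

# Hypothetical sample events; field names mirror the Relations above.
truck_events = [
    {"event_ts": 1000, "truck_id": 7, "speed_kmph": 82, "thermometer": "t-1"},
    {"event_ts": 2000, "truck_id": 7, "speed_kmph": 85, "thermometer": "t-1"},
]
temperature_events = [
    {"time": 900, "sensor_id": "t-1", "temperature_c": 4},
    {"time": 1500, "sensor_id": "t-1", "temperature_c": 6},
]

def temporal_join(trucks, temps):
    """For each truck event, attach the latest temperature reading
    (per sensor) at or before the truck event's timestamp."""
    by_sensor = {}
    for r in sorted(temps, key=lambda r: r["time"]):
        by_sensor.setdefault(r["sensor_id"], []).append(r)
    out = []
    for t in trucks:
        readings = by_sensor.get(t["thermometer"], [])
        times = [r["time"] for r in readings]
        i = bisect_right(times, t["event_ts"]) - 1
        if i >= 0:  # only emit when a matching reading exists
            out.append({**t, "temperature_c": readings[i]["temperature_c"]})
    return out

enriched = temporal_join(truck_events, temperature_events)
# first truck event pairs with the 4°C reading, second with the 6°C reading
```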

While a truck fleet in a real-world environment will likely have many more sensors, such as cameras, humidity sensors, and others, we’ll keep this use case simple and just use a thermometer as the additional sensor. However, users could continue to enrich their truck information events with joins for each additional sensor data feed.

Use Case Part 1: Powering a real-time dashboard

Monitoring and health metrics are essential for managing a truck fleet. Being able to check on the status of particular trucks, and to see at a glance that the fleet is operating normally, can provide peace of mind for the truck fleet manager. This is where a real-time dashboard can be helpful: it keeps the latest metrics on the status of the truck fleet readily available.

So for our first use case, we’ll use Materialized Views to power a real-time dashboard. By materializing our truck_info_enriched Stream into a queryable view, we can build charts that can query the view and get the latest truck information. We’ll build the Materialized View in two steps. First we’ll define a new Changelog that mirrors the truck_info_enriched Stream, then we’ll create a Materialized View from this Changelog.

Create truck_info_enriched_changelog Changelog:

  1. CREATE CHANGELOG truck_info_enriched_changelog (
  2. event_ts BIGINT,
  3. truck_id INT,
  4. speed_kmph INT,
  5. thermometer VARCHAR,
  6. temperature_c INTEGER,
  7. PRIMARY KEY (truck_id)
  8. ) WITH (
  9. 'topic' = 'truck_info_enriched',
  10. 'value.format' = 'json'
  11. );

Create truck_info_mview Materialized View using CVAS:

  1. CREATE MATERIALIZED VIEW truck_info_mview AS
  2. SELECT * FROM truck_info_enriched_changelog;

Note that we could have created this Materialized View sourcing from the truck_info_enriched Stream, but if we created the Materialized View from the Stream, then each event would be a new row in the Materialized View (append mode). Instead we are building the Materialized View from a Changelog so that each event will add a new row or update an existing one based on the Changelog’s primary key (upsert mode). For our example, we only need to know the current status of each truck, so building the Materialized View with upsert mode better suits our use case.
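To make the append-mode vs. upsert-mode distinction concrete, here is a small Python sketch (with made-up truck updates) of how the same event stream materializes differently under each mode:

```python
def materialize_append(events):
    """Append mode: every incoming event becomes a new row."""
    return [dict(e) for e in events]

def materialize_upsert(events, key):
    """Upsert mode: each event inserts a row for a new key or
    replaces the existing row for that key."""
    table = {}
    for e in events:
        table[e[key]] = dict(e)
    return list(table.values())

# Hypothetical enriched truck updates; truck 1 reports twice.
updates = [
    {"truck_id": 1, "speed_kmph": 70, "temperature_c": 5},
    {"truck_id": 2, "speed_kmph": 95, "temperature_c": 8},
    {"truck_id": 1, "speed_kmph": 72, "temperature_c": 6},
]

append_rows = materialize_append(updates)               # 3 rows: one per event
upsert_rows = materialize_upsert(updates, "truck_id")   # 2 rows: latest per truck
```

Because the dashboard only cares about each truck’s current state, the upsert-mode result (one row per truck, reflecting the latest update) is the behavior we want from the Materialized View.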

A continuous query will power this Materialized View, constantly ingesting records from the truck_info_enriched Stream and sinking the results to truck_info_mview. Then, we can write queries to SELECT from the Materialized View. A dashboard can easily be built that simply queries this Materialized View to get the latest statuses for trucks. Here are some example queries that might be helpful when building a dashboard for the truck fleet.

Query to get truck IDs with the highest 10 temperatures:

  1. SELECT truck_id, temperature_c
  2. FROM truck_info_mview
  3. ORDER BY temperature_c DESC
  4. LIMIT 10;

Query to get all information about a truck:

  1. SELECT *
  2. FROM truck_info_mview
  3. WHERE truck_id = 3;

Query to count the number of trucks that are speeding:

  1. SELECT COUNT(truck_id) AS num_speeding_trucks
  2. FROM truck_info_mview
  3. WHERE speed_kmph > 90;

Use Case Part 2: Building a real-time alerting pipeline

While it’s great to be able to pull real-time metrics for our truck fleet (Use Case Part 1), there are also situations where we may want the truck update events themselves to trigger actions. In our example, we’ve included thermometers as one of the sensors in each of our delivery trucks. Groceries, medicines, and some chemicals need to be delivered in refrigerated trucks. If the trucks aren’t able to stay within a desired temperature range, it could cause the items inside to go bad or degrade. This can be quite serious, especially for medicines and hazardous materials that can have a direct impact on people’s health.

For our second use case, we want to build out a streaming analytics pipeline to power an alerting service. We can use a CSAS to perform real-time stateful transformations on our data set, then sink the results into a new Stream backed by a Kafka topic. Then the sink topic will contain alertable events that the truck fleet company can feed into their alerting system or other backend systems. Let’s stick to our refrigeration example and write a query that detects if a truck’s temperature exceeds a certain threshold.

Create overheated_trucks Stream using CSAS:

  1. CREATE STREAM overheated_trucks AS
  2. SELECT * FROM truck_info_enriched WHERE temperature_c > 10;

Submitting this CSAS will launch a long-lived continuous query that ingests from the truck_info_enriched Stream, filters for only events where the truck’s temperature is greater than 10 degrees Celsius, and sinks the results to a new Stream called overheated_trucks. Downstream, the truck fleet company can ingest these records and send alerts to the correct teams or use these records to trigger actions in other backend systems.
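The filtering logic itself is a stateless threshold check; a rough Python equivalent of what this query does per event (with hypothetical records) looks like this:

```python
TEMP_THRESHOLD_C = 10  # matches the WHERE clause in the query above

def overheated(events, threshold=TEMP_THRESHOLD_C):
    """Stateless filter mirroring the CSAS: pass through only events
    whose temperature exceeds the threshold."""
    for e in events:
        if e["temperature_c"] > threshold:
            yield e

# Hypothetical enriched truck events.
events = [
    {"truck_id": 1, "temperature_c": 4},
    {"truck_id": 2, "temperature_c": 12},
]
alerts = list(overheated(events))  # only truck 2 is overheated
```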

Processing IoT Data with DeltaStream

IoT data can be challenging to process due to the high volume of data, the inherent real-time requirements of many IoT applications, and the distributed nature of collecting data from many different sources. While we often treat IoT use cases as their own category, they really span many sectors and use cases. That’s why using a real-time streaming platform, such as DeltaStream, that is able to keep up with the processing demands of IoT and can serve as both a streaming database and streaming analytics platform is essential.

If you want to learn more about how DeltaStream can help your business, schedule a demo with us. We also have a free trial available if you want to try out DeltaStream yourself.

06 Feb 2024

Min Read

Data Governance for Teams with RBAC

In our previous blog post, Streaming Data Governance and DeltaStream, we discussed the importance of Data Unification and Data Governance in a stream processing data platform. In particular, we highlighted two tools DeltaStream exposes to help users govern their streaming data: the Streaming Catalog and Role-Based Access Control (RBAC) over the Streaming Catalog. In this post, we’ll go over an example use case to show how the Streaming Catalog and RBAC work in DeltaStream.

For our use case, let’s assume we are a company that needs to do real-time analytics for ads and marketing reports. In our setup, we have the following:

  • 3 streaming Stores – ”CC_kafka” (Confluent Cloud), “kafka_store”, “kinesis_store”
  • 2 Teams – ”Reports Team” and “Ads Team”
  • 1 Organization administrator

Notice in our setup that there are 3 different Stores: 2 Kafka Stores and 1 Kinesis Store. The data in these Stores do not belong to a single team; in fact, each team may be responsible for data in multiple Stores. For instance, our “Ads Team” needs read and write access to one topic from each of the Stores (when we say “topic,” we are referring to the topics in Kafka and the streams in Kinesis).

The goal of this use case is twofold. First, to unify and organize the streaming data from the 3 Stores so that the organization of the data aligns with the team structure. Second, to set up roles for each of the teams so that users belonging to those teams can easily be granted the appropriate access to the resources that pertain to their team.

Below is a visualization of our use case.

The Administrator Role

The administrator will have access to the sysadmin and useradmin built-in roles. These, along with the securityadmin and orgadmin roles, are special roles in DeltaStream with powerful privileges that should only be given to a handful of people in an organization. To solve our use case, our administrator will first assume the useradmin role to create the appropriate roles that specific team members will be granted access to. Then, the administrator needs to use the sysadmin role to set up the Streaming Catalog and define Stores for our data sources, as well as grant the appropriate permissions for the roles created by the useradmin.


The useradmin role has privileges to manage users and roles within the Organization. The administrator will assume the useradmin role to create new custom roles for our “Reports Team” and “Ads Team.”

We can switch to the useradmin role using the USE ROLE command before we start creating custom roles.

  1. USE ROLE useradmin;

Following the best practices for creating custom roles, we will build out a role hierarchy where sysadmin is the top-most role. The below diagram illustrates the hierarchy of roles.

The following statements create roles to match the diagram in Figure 2:

  1. CREATE ROLE "MarketingRole" WITH (IN ROLE (sysadmin));
  2. CREATE ROLE "ContentRole" WITH (IN ROLE ("MarketingRole"));
  3. CREATE ROLE "ReportsRole" WITH (IN ROLE ("MarketingRole"));
  4. CREATE ROLE "AdsRole" WITH (IN ROLE (sysadmin));
  5. CREATE ROLE "TrafficRole" WITH (IN ROLE ("AdsRole"));

Although we’ve created the roles, we haven’t actually assigned them any permissions. We’ll do this using the sysadmin role in the next section.

To invite our team members to our Organization, we can use the INVITE USER command. The following statement invites a new user on the “Ads” team and grants them the new “AdsRole” role.

  1. INVITE USER 'ads_[email protected]' WITH ('roles'=("AdsRole"), 'default'="AdsRole");

Similarly, we can invite a new user on the “Reports” team and assign the “ReportsRole” role to them.

  1. INVITE USER 'reports_[email protected]' WITH ('roles'=("ReportsRole"), 'default'="ReportsRole");


The sysadmin role has privileges to create, manage, and drop objects. As the administrator, we’ll be using this role to do the following:

  1. Add the connectivity to our data storage systems (i.e., Kafka and Kinesis) by creating Stores
  2. Set up the Databases and Schemas in the Streaming Catalog to provide the organizational framework for step 3
  3. Define Relations for the topics in our Stores and assign them to the correct Database and Schema
  4. Grant access to these Databases and Schemas to the appropriate roles

Before we begin, let’s ensure that we are using the sysadmin role.

  1. USE ROLE sysadmin;

First, we’ll define the Stores for our data. Since we can’t share our real Kafka or Kinesis connection configurations, the below SQL statement is a template for the CREATE STORE statement (CREATE STORE documentation).

  1. CREATE STORE kafka_store WITH (
  2. 'type' = KAFKA, 'access_region' = "AWS us-east-1",
  3. 'kafka.sasl.hash_function' = PLAIN,
  4. 'kafka.sasl.password' = '',
  5. 'kafka.sasl.username' = '',
  6. 'uris' = ''
  7. );

The next step is to create Databases and Schemas for our Streaming Catalog. As you can see in Figure 1 above, there will be two Databases – ”Marketing” and “Ads”. Within the “Marketing” Database, there exists a “Content” Schema and a “Reports” Schema. Within the “Ads” Database, there exists a single “Traffic” Schema.

  1. CREATE DATABASE "Marketing";
  2. CREATE SCHEMA "Content" IN DATABASE "Marketing";
  3. CREATE SCHEMA "Reports" IN DATABASE "Marketing";
  4. CREATE DATABASE "Ads";
  5. CREATE SCHEMA "Traffic" IN DATABASE "Ads";

Now that we have the namespaces in our Streaming Catalog set up, we can move on to our third task of defining Relations backed by the topics in our Stores to populate the Streaming Catalog. As you can see in Figure 1 above, there are many topics that exist in our Stores, and thus many Relations that need to be written. For the sake of brevity, we’ll just provide one example statement for CREATE STREAM (tutorial on creating Relations).

  1. CREATE STREAM "Marketing"."Reports".reports_data (
  2. col0 BIGINT, col1 VARCHAR, col2 VARCHAR
  3. ) WITH (
  4. 'store' = 'cc_kafka', 'topic' = 'reporting',
  5. 'value.format' = 'json'
  6. );

This CREATE STREAM statement is creating a Stream called “reports_data” in the “Reports” Schema, which is in the “Marketing” Database. This Stream has three fields, simply called “col0”, “col1”, and “col2”, and is backed by the topic “reporting” in the “cc_kafka” Store. Similar CREATE STREAM or CREATE CHANGELOG statements can be created for the other topics in the same Store or other Stores.

For our fourth task, we must now grant the custom roles, which were created by the useradmin in the previous section, access to the Databases and Schemas. Based on the diagram in Figure 1, the following statements will grant privileges to the correct data objects corresponding to the appropriate roles. The USAGE privilege is similar to read, and the CREATE privilege is similar to write.

  1. GRANT USAGE, CREATE ON DATABASE "Marketing" TO ROLE "MarketingRole";
  2. GRANT USAGE, CREATE ON SCHEMA "Marketing"."Content" TO ROLE "ContentRole";
  3. GRANT USAGE, CREATE ON SCHEMA "Marketing"."Reports" TO ROLE "ReportsRole";
  4. GRANT USAGE, CREATE ON DATABASE "Ads" TO ROLE "AdsRole";
  5. GRANT USAGE, CREATE ON SCHEMA "Ads"."Traffic" TO ROLE "TrafficRole";
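To illustrate how privileges flow through this role hierarchy, here is a simplified Python sketch of the inheritance model: a role’s effective privileges are its own grants plus the grants of every role beneath it in the tree. The grant sets below are illustrative stand-ins, not DeltaStream’s actual privilege model.

```python
# Simplified sketch of role inheritance. Each entry is a (object type, name)
# pair standing in for a USAGE/CREATE grant from the statements above.
GRANTS = {
    "MarketingRole": {("Database", "Marketing")},
    "ContentRole":   {("Schema", "Marketing.Content")},
    "ReportsRole":   {("Schema", "Marketing.Reports")},
    "AdsRole":       {("Database", "Ads")},
    "TrafficRole":   {("Schema", "Ads.Traffic")},
}

# parent -> children, mirroring CREATE ROLE ... WITH (IN ROLE (...)):
# a role created "IN ROLE X" is granted to X, so X inherits its privileges.
CHILDREN = {
    "sysadmin": ["MarketingRole", "AdsRole"],
    "MarketingRole": ["ContentRole", "ReportsRole"],
    "AdsRole": ["TrafficRole"],
}

def effective_privileges(role):
    """A role holds its own grants plus everything granted to roles below it."""
    privs = set(GRANTS.get(role, set()))
    for child in CHILDREN.get(role, []):
        privs |= effective_privileges(child)
    return privs
```

Under this model, "MarketingRole" can reach both Marketing Schemas, "AdsRole" cannot see anything under Marketing, and sysadmin (the top-most role) inherits everything.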

Member of the Reports Team Role

As a new user on the “Reports” team, after accepting the invitation that the useradmin sent, I should expect the following:

  1. Access to the “ReportsRole” only
  2. Access to the “Marketing” Database and the “Reports” Schema

By listing our roles in the DeltaStream CLI, we can see which role is currently being used:

  1. /# LIST ROLES;
  2. Name | Current | Created at
  3. ----------------+---------+-----------------------
  4. ContentRole | | 2024-01-05T04:57:30Z
  5. ReportsRole | ✓ | 2024-01-05T04:57:30Z
  6. MarketingRole | | 2024-01-05T04:57:30Z

We can also describe the “ReportsRole” role to see its granted roles and privileges:

  1. /# DESCRIBE ROLE "ReportsRole";
  2. Name | Created at
  3. ------------+-----------------------
  4. ReportsRole | 2024-01-05T04:57:30Z
  5. Granted Roles
  6. Name
  7. ----------
  8. public
  9. Granted Privileges
  10. Type | Target | ID/Name | Grant option
  11. -------+----------+-----------+---------------
  12. Usage | Database | Marketing |
  13. Usage | Schema | Reports |
  14. Create | Schema | Reports |

Finally, we can list the Databases and Schemas to see that we indeed have access to the “Marketing” Database and the “Reports” Schema. Note that the “Ads” Database is not visible, because only the “AdsRole” role and any roles that inherit from the “AdsRole” have access to that Database.

  1. /# LIST DATABASES;
  2. Name | Default | Owner | Created at | Updated at
  3. ----------+---------+----------+----------------------+-----------------------
  4. Marketing | | sysadmin | 2024-01-04T23:12:15Z | 2024-01-04T23:12:15Z
  5. /# LIST SCHEMAS IN DATABASE "Marketing";
  6. Name | Default | Owner | Created at | Updated at
  7. --------+---------+----------+----------------------+-----------------------
  8. Reports | | sysadmin | 2024-01-04T23:12:15Z | 2024-01-04T23:12:15Z


RBAC is one of DeltaStream’s core features, managing access to the different data objects in DeltaStream. In this example, we showed how roles can be created to match an organization’s team structure; granting permissions to a role then effectively grants them to an entire team. While we focused on RBAC in the context of DeltaStream’s Streaming Catalog (giving access to Databases and Schemas in particular), RBAC can also be applied to other data assets such as Stores, Descriptors, and Queries.

If you want to learn more about DeltaStream’s RBAC, or try it for yourself, get a free trial.

13 Dec 2023

Min Read

Detecting Suspicious Login Activity with Stream Processing

Cybersecurity is challenging, but it’s one of the most important components of any digital business. Cyberattacks can disrupt your application and put your users in harm’s way. Successful cyberattacks can result in identity and credit card theft, which have a very tangible effect on people’s lives and reputations. With regulations such as the General Data Protection Regulation (GDPR), businesses can even be fined for lackluster cybersecurity (e.g. Cathay Pacific was fined £500k by the UK’s ICO over a data breach disclosed in 2018).

One of the most popular tools for cybersecurity is stream processing. For most cyber threats, responsiveness is crucial to prevent or minimize the impact of an attack. In this post, we’ll show how DeltaStream can be used to quickly identify suspicious login activity from a stream of login events. By identifying suspicious activity quickly, follow-up actions such as freezing accounts, sending notifications to account owners, and involving security teams can happen right away.

Setting up your Data Stream

We’ll assume that a Kafka Store has already been set up with a topic called login_attempts. The records in this topic contain failed login events. Before we get to our use case, we need to set up a Stream that is backed by this topic. We’ll use this Stream later on as the source data for our use case.

CREATE STREAM DDL to create the login_attempts Stream:

Cybersecurity Use Case: Detecting Suspicious User Login Activity

For our use case, we want to determine if a user is attempting to gain access to accounts they are not authorized to use. One common way attackers will try to gain access to accounts is by writing scripts or having bots attempt to log in to different accounts using commonly used passphrases. We can use our stream of login data to detect these malicious users. Based on our source Stream, we have fields ip_address and user_agent which can identify a particular user. The account_id field represents the account that a user is trying to log in to. If a particular user attempts to log in to 3 unique accounts in the span of 1 minute, then we want to flag this user as being suspicious. The following query does this by utilizing OVER Aggregation and writing the results into a new Stream called suspicious_user_login_activity.

Create Stream As Select Query

CSAS to create suspicious_user_login_activity Stream:

In this query’s subquery, an OVER aggregation is used to count the number of unique accounts that a particular user has attempted to log in to. The outer query then filters for results where the projected field of the aggregation, num_distinct_accounts_user_login_attempted, is equal to 3. Thus, the output of the entire query contains the IP address and user agent information for suspicious users who have attempted to log in to 3 different accounts within a 1 minute window. The resulting event stream can then be ingested by downstream applications for further review or actions.
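To illustrate the logic of this OVER aggregation outside of SQL, here is a Python sketch of the sliding one-minute distinct-account count, using made-up login events. The field names (ip_address, user_agent, account_id) come from the description above; the window mechanics are a simplified approximation of the continuous query.

```python
from collections import deque

WINDOW_MS = 60_000   # 1-minute window, per the query description
THRESHOLD = 3        # unique accounts before a user is flagged

def flag_suspicious(events):
    """For each login event, count distinct accounts the same user
    (ip_address + user_agent) touched in the past minute, and emit
    the user when the count reaches exactly the threshold."""
    history = {}  # user -> deque of (ts, account_id)
    flagged = []
    for e in sorted(events, key=lambda e: e["ts"]):
        user = (e["ip_address"], e["user_agent"])
        window = history.setdefault(user, deque())
        window.append((e["ts"], e["account_id"]))
        # evict events older than the window
        while window and window[0][0] < e["ts"] - WINDOW_MS:
            window.popleft()
        if len({acct for _, acct in window}) == THRESHOLD:
            flagged.append(user)
    return flagged

# Hypothetical events: one bot-like user hits 3 accounts within a minute.
events = [
    {"ts": 0,      "ip_address": "10.0.0.1", "user_agent": "bot", "account_id": "a"},
    {"ts": 10_000, "ip_address": "10.0.0.1", "user_agent": "bot", "account_id": "b"},
    {"ts": 20_000, "ip_address": "10.0.0.1", "user_agent": "bot", "account_id": "c"},
]
suspicious = flag_suspicious(events)
```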

By submitting this SQL statement, a long-lived continuous query will be launched in the background. This continuous query will constantly ingest from the source Stream as new records arrive, process the data, then write the results to the sink Stream instantaneously. Any downstream applications reading from this sink Stream will then be able to act on these suspicious users right away.

Create Stream As Select Query Results

To get a better understanding of how the CSAS query behaves, we can inspect some records from our source login_attempts Stream and our results suspicious_user_login_activity Stream.

Records in the source Stream login_attempts:

Records in the sink Stream suspicious_user_login_activity:

In the results Stream, there is a record for a Windows user who tried to log in to 3 different accounts. Inspecting the source Stream, we can see that records 3 through 5 are associated with that output. Records 1, 2, and 6 are all from the same Android user, but that user only attempted to log in to 2 unique accounts, so no output record is produced since we don’t deem this activity suspicious.

The Power of Stream Processing and Cybersecurity

Streaming and stream processing capabilities are incredibly helpful for tackling cybersecurity challenges. Having systems and processes that act on events with minimal latency can be the difference between a successful or unsuccessful cyber attack. In this post, we showcased one example of how DeltaStream users can set up and deploy a streaming analytics pipeline to detect cyber threats as they’re happening. While this example is relatively simple, DeltaStream’s rich SQL feature set is capable of handling much more complex queries to support all kinds of cybersecurity use cases.

DeltaStream is the platform to unify, process, and govern all of your streaming data. If you want to learn more about DeltaStream, sign up for a free trial or schedule a demo with us.

20 Nov 2023

Min Read

Analyzing Real-Time NYC Bus Data with DeltaStream

Most of us who have lived in a big city have had some experience taking public transportation. While it’s extremely satisfying when everything works out, I know I’m not alone in my frustrations when subway time estimates aren’t accurate or buses just never show up. Standing there and waiting for a bus or train can be very stressful as you begin to wonder if the train has already left, if this is going to impact your transfer to another bus, or if you’ll make it to your destination on time (especially stressful if you’re trying to catch a plane). One way that Google is playing a part in improving some of these challenges is with the General Transit Feed Specification (GTFS) and its real-time counterpart, GTFS Realtime. GTFS helps bring standardization to transit feeds by providing a framework for transportation companies to submit feeds and for developers to write applications to process these feeds.

In this blog post, we’ll be showcasing how you can use DeltaStream to process New York City’s real-time bus feed, which adopts the GTFS realtime specification, to identify buses that are becoming increasingly delayed in real time.

Setting up DeltaStream 

To set up our use case, first we need to load the bus data into Kafka:

  1. Sign up for an MTA BusTime Developer Key
  2. Have Kafka or a Kafka-compatible cluster available, such as Confluent Cloud or RedPanda (both offer free trials)
  3. Clone this github repository, then follow the instructions to build and run a Java program that polls the bus feed and forwards the events into your Kafka cluster

DeltaStream for real-time processing

Now that we have our bus feed data in Kafka, we can use DeltaStream to process the data. If you are new to DeltaStream and don’t have an account, you can sign up for a free trial here.

Connect to and inspect source data

First, add your Kafka cluster as a Store in DeltaStream. Adding the Store defines the connectivity between DeltaStream and your storage layer, in this case a Kafka cluster. You can choose any name you want for your Store, but for this use case let’s assume the Store is called “kafka_store”. From here, we can inspect the topics by printing them. The two topics we’ll be using for our example are the “nyc_bus_trip_updates” and “nyc_bus_vehicle_positions” topics.

Print the nyc_bus_trip_updates topic:

  1. db.public/kafka_store# PRINT TOPIC nyc_bus_trip_updates;
  2. {
  3. "trip": {
  4. "tripId": "FP_D3-Weekday-SDon-103300_B13_6",
  5. "startDate": "20231030",
  6. "routeId": "Q55",
  7. "directionId": 1
  8. },
  9. "stopTimeUpdate": [{
  10. "stopSequence": 0,
  11. "arrival": {
  12. "time": "1698701100"
  13. },
  14. "departure": {
  15. "time": "1698701100"
  16. },
  17. "stopId": "504354"
  18. }, {
  19. "stopSequence": 1,
  20. "arrival": {
  21. "time": "1698701144"
  22. },
  23. "departure": {
  24. "time": "1698701144"
  25. },
  26. "stopId": "504434"
  27. }, …],
  28. "vehicle": {
  29. "id": "MTA NYCT_1234"
  30. },
  31. "timestamp": "1698699097",
  32. "delay": 385
  33. }

Print the nyc_bus_vehicle_positions topic:

  1. db.public/kafka_store# PRINT TOPIC nyc_bus_vehicle_positions;
  2. {
  3. "trip": {
  4. "tripId": "FP_D3-Weekday-SDon-103300_B13_6",
  5. "startDate": "20231030",
  6. "routeId": "Q55",
  7. "directionId": 1
  8. },
  9. "position": {
  10. "latitude": 40.69352,
  11. "longitude": -73.990486,
  12. "bearing": 63.434948
  13. },
  14. "timestamp": "1698700533",
  15. "stopId": "504434",
  16. "vehicle": {
  17. "id": "MTA NYCT_1234"
  18. }
  19. }

We can define Streams for nyc_bus_trip_updates and for nyc_bus_vehicle_positions with the following queries.

DDL to create nyc_bus_trip_updates Stream:

  1. CREATE STREAM nyc_bus_trip_updates (
  2. trip STRUCT <
  3. "tripId" VARCHAR,
  4. "startDate" VARCHAR,
  5. "routeId" VARCHAR,
  6. "directionId" TINYINT >,
  7. "stopTimeUpdate" ARRAY <
  8. STRUCT <
  9. "stopSequence" INTEGER,
  10. "arrival" STRUCT <
  11. "time" BIGINT >,
  12. "departure" STRUCT <
  13. "time" BIGINT >,
  14. "stopId" INTEGER >>,
  15. vehicle STRUCT <
  16. id VARCHAR >,
  17. "timestamp" BIGINT,
  18. delay INTEGER
  19. ) WITH ('topic' = 'nyc_bus_trip_updates', 'value.format'='JSON');

DDL to create nyc_bus_vehicle_positions Stream:

  1. CREATE STREAM nyc_bus_vehicle_positions (
  2. trip STRUCT <
  3. "tripId" VARCHAR,
  4. "startDate" VARCHAR,
  5. "routeId" VARCHAR,
  6. "directionId" TINYINT >,
  7. "position" STRUCT <
  8. "latitude" DOUBLE,
  9. "longitude" DOUBLE,
  10. "bearing" DOUBLE>,
  11. vehicle STRUCT <
  12. id VARCHAR >,
  13. "timestamp" BIGINT,
  14. "stopId" INTEGER
  15. ) WITH ('topic' = 'nyc_bus_vehicle_positions', 'value.format'='JSON');

Notice that both feeds have a field called trip which represents a particular trip a bus is taking. We’ll be using this field to join these Streams later on.

Also, since the timestamp field is given as epoch seconds, let’s make our data easier to read by defining new Streams that convert these fields to timestamps. We can do this with the following CREATE STREAM AS SELECT (CSAS) queries:

CSAS to create trip_updates:

  1. CREATE STREAM trip_updates AS
  2. SELECT
  3. trip,
  4. "stopTimeUpdate",
  5. vehicle,
  6. CAST(FROM_UNIXTIME("timestamp") AS TIMESTAMP) AS ts,
  7. "timestamp" AS epoch_secs,
  8. delay
  9. FROM
  10. nyc_bus_trip_updates;

CSAS to create vehicle_positions:

  1. CREATE STREAM vehicle_positions AS
  2. SELECT
  3. trip,
  4. "position",
  5. vehicle,
  6. CAST(FROM_UNIXTIME("timestamp") AS TIMESTAMP) AS ts,
  7. "stopId"
  8. FROM
  9. nyc_bus_vehicle_positions;
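As a quick sanity check of the conversion these queries perform, here is a Python equivalent of turning epoch seconds into a readable timestamp (assuming a UTC session timezone, which matches the sample records shown later):

```python
from datetime import datetime, timezone

def from_unixtime(epoch_secs: int) -> str:
    """Convert epoch seconds to a 'YYYY-MM-DD HH:MM:SS' UTC timestamp,
    mirroring the FROM_UNIXTIME conversion used in the CSAS queries."""
    dt = datetime.fromtimestamp(epoch_secs, tz=timezone.utc)
    return dt.strftime("%Y-%m-%d %H:%M:%S")

print(from_unixtime(1698701100))  # -> 2023-10-30 21:25:00
```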

Detecting transportation delays

You may have noticed in the nyc_bus_trip_updates topic that there is a field called delay. This field represents the number of seconds that a bus is currently delayed from its original route. Reporting this data is really helpful, as it provides transparency to transit-goers on how late they’re going to be or how long they may need to wait for the bus. However, what’s not always clear is if delays are increasing. For our use case, this is exactly what we want to detect. Once we detect which bus trips are becoming increasingly delayed, we also want to provide additional information about where the bus is and where the bus has recently been so that city officials and bus planners can see where delays may be occurring.

Processing real-time bus data

For our use case, we’ll be splitting up the processing into two queries.

In the first query, we will analyze the trip_updates Stream to find trips where delays are significantly increasing. We consider three consecutive trip updates that each grow in delay by 30 seconds to be significant, so we can write a pattern recognition query to detect trips that match our requirements. Those trips will then be written to a Stream to be used as the input for our second query.

Query 1:

  1. CREATE STREAM trips_delay_increasing AS
  2. SELECT
  3. trip,
  4. vehicle,
  5. start_delay,
  6. end_delay,
  7. start_ts,
  8. end_ts,
  9. CAST(FROM_UNIXTIME((start_epoch_secs + end_epoch_secs) / 2) AS TIMESTAMP) AS avg_ts
  10. FROM trip_updates
  11. MATCH_RECOGNIZE (
  12. PARTITION BY trip
  13. ORDER BY "ts"
  14. MEASURES
  15. C.row_timestamp AS row_timestamp,
  16. C.row_key AS row_key,
  17. C.row_metadata AS row_metadata,
  18. C.vehicle AS vehicle,
  19. A.delay AS start_delay,
  20. C.delay AS end_delay,
  21. A.ts AS start_ts,
  22. C.ts AS end_ts,
  23. A.epoch_secs AS start_epoch_secs,
  24. C.epoch_secs AS end_epoch_secs
  25. ONE ROW PER MATCH
  26. AFTER MATCH SKIP PAST LAST ROW
  27. PATTERN (A B C)
  28. DEFINE
  29. A AS delay > 0,
  30. B AS delay > A.delay + 30,
  31. C AS delay > B.delay + 30
  32. ) AS MR WITH ('timestamp'='ts')
  33. QUERY WITH ('state.ttl.millis'='3600000');

Pattern recognition query to find bus trips that are increasing in delay
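The DEFINE conditions can be mimicked in plain Python to see which update sequences match. This sketch scans a single trip’s updates with made-up delay values (partitioning by trip and the query’s after-match skip semantics are omitted for brevity):

```python
def increasing_delay_trips(updates, step=30):
    """Flag a match when three consecutive updates start from a positive
    delay and each grow in delay by more than `step` seconds,
    mirroring PATTERN (A B C) with the DEFINE conditions above."""
    matches = []
    for i in range(len(updates) - 2):
        a, b, c = updates[i], updates[i + 1], updates[i + 2]
        if (a["delay"] > 0
                and b["delay"] > a["delay"] + step
                and c["delay"] > b["delay"] + step):
            matches.append({"start_delay": a["delay"], "end_delay": c["delay"]})
    return matches

# Delay values taken from the sample trip_updates records shown below.
updates = [{"delay": 1689}, {"delay": 1839}, {"delay": 2049}]
matches = increasing_delay_trips(updates)  # one match: 1689 -> 2049
```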

In the second query, we will join the output of our first query with the vehicle_positions Stream on the trip field. When joining two Streams, we need to specify a WITHIN interval as part of the join condition (these kinds of joins are called Interval Joins). For our query, we’ll specify the timestamp field to be avg_ts, the middle point in our increasing delay interval that we identified from the first query. We’ll also use 3 minutes for our WITHIN interval, meaning positions for a trip with a timestamp 3 minutes before and 3 minutes after avg_ts will satisfy the join condition. The resulting records of this query will represent the positions of buses that are part of delayed trips.

Query 2:

CREATE STREAM delayed_trip_positions AS
SELECT
  t.trip,
  t.vehicle,
  t.start_delay,
  t.end_delay,
  t.start_ts,
  t.end_ts,
  p."position",
  p.ts AS position_ts
FROM
  trips_delay_increasing t WITH ('timestamp'='avg_ts')
  JOIN vehicle_positions p WITH ('timestamp'='ts')
  WITHIN 3 MINUTES
  ON t.trip = p.trip;

Interval join query to join the bus trips that are growing in delay with bus locations

By submitting these queries, we have launched long-lived DeltaStream jobs that continually read from their sources, transform the data, and write to their sinks. So, as bus data arrives in our Kafka topics, we can expect processing to happen immediately and results to arrive nearly instantaneously.

Inspecting real-time results

Let’s inspect the contents of these Streams to see how our queries behaved.

Data in our source trip_updates Stream:

SELECT * FROM trip_updates WITH ('starting.position'='earliest');
| {"trip":{"tripId":"CS_D3-Weekday-SDon-124200_Q28_655","startDate":"20231031","routeId":"Q28","directionId":1},"stopTimeUpdate":[...],"vehicle":{"id":"MTA NYCT_8865"},"ts":"2023-11-01 01:39:01","epoch_secs":1698802741,"delay":1689}
| {"trip":{"tripId":"CS_D3-Weekday-SDon-124200_Q28_655","startDate":"20231031","routeId":"Q28","directionId":1},"stopTimeUpdate":[...],"vehicle":{"id":"MTA NYCT_8865"},"ts":"2023-11-01 01:41:31","epoch_secs":1698802891,"delay":1839}
| {"trip":{"tripId":"CS_D3-Weekday-SDon-124200_Q28_655","startDate":"20231031","routeId":"Q28","directionId":1},"stopTimeUpdate":[...],"vehicle":{"id":"MTA NYCT_8865"},"ts":"2023-11-01 01:45:01","epoch_secs":1698803101,"delay":2049}

Data in our source vehicle_positions Stream:

SELECT * FROM vehicle_positions WITH ('starting.position'='earliest');
| {"trip":{"tripId":"CS_D3-Weekday-SDon-124200_Q28_655","startDate":"20231031","routeId":"Q28","directionId":1},"position":{"latitude":40.76075,"longitude":-73.8282,"bearing":13.835851},"vehicle":{"id":"MTA NYCT_8865"},"ts":"2023-11-01 01:39:31","stopId":505121}
| {"trip":{"tripId":"CS_D3-Weekday-SDon-124200_Q28_655","startDate":"20231031","routeId":"Q28","directionId":1},"position":{"latitude":40.76072,"longitude":-73.82832,"bearing":13.835851},"vehicle":{"id":"MTA NYCT_8865"},"ts":"2023-11-01 01:42:31","stopId":505121}
| {"trip":{"tripId":"CS_D3-Weekday-SDon-124200_Q28_655","startDate":"20231031","routeId":"Q28","directionId":1},"position":{"latitude":40.76073,"longitude":-73.828285,"bearing":13.835851},"vehicle":{"id":"MTA NYCT_8865"},"ts":"2023-11-01 01:45:01","stopId":505121}

The results of Query 1 in our trips_delay_increasing Stream:

SELECT * FROM trips_delay_increasing WITH ('starting.position'='earliest');
| {"trip":{"tripId":"CS_D3-Weekday-SDon-124200_Q28_655","startDate":"20231031","routeId":"Q28","directionId":1},"vehicle":{"id":"MTA NYCT_8865"},"start_delay":1689,"end_delay":2049,"start_ts":"2023-11-01 01:39:01","end_ts":"2023-11-01 01:45:01","avg_ts":"2023-11-01 01:42:01"}

The results of Query 2 in our delayed_trip_positions Stream:

SELECT * FROM delayed_trip_positions WITH ('starting.position'='earliest');
| {"trip":{"tripId":"CS_D3-Weekday-SDon-124200_Q28_655","startDate":"20231031","routeId":"Q28","directionId":1},"vehicle":{"id":"MTA NYCT_8865"},"start_delay":1689,"end_delay":2049,"start_ts":"2023-11-01 01:39:01","end_ts":"2023-11-01 01:45:01","position":{"latitude":40.76075,"longitude":-73.8282,"bearing":13.835851},"position_ts":"2023-11-01 01:39:31"}
| {"trip":{"tripId":"CS_D3-Weekday-SDon-124200_Q28_655","startDate":"20231031","routeId":"Q28","directionId":1},"vehicle":{"id":"MTA NYCT_8865"},"start_delay":1689,"end_delay":2049,"start_ts":"2023-11-01 01:39:01","end_ts":"2023-11-01 01:45:01","position":{"latitude":40.76072,"longitude":-73.82832,"bearing":13.835851},"position_ts":"2023-11-01 01:42:31"}
| {"trip":{"tripId":"CS_D3-Weekday-SDon-124200_Q28_655","startDate":"20231031","routeId":"Q28","directionId":1},"vehicle":{"id":"MTA NYCT_8865"},"start_delay":1689,"end_delay":2049,"start_ts":"2023-11-01 01:39:01","end_ts":"2023-11-01 01:45:01","position":{"latitude":40.76073,"longitude":-73.828285,"bearing":13.835851},"position_ts":"2023-11-01 01:45:01"}

In the results above, we can see the trip with a tripId of CS_D3-Weekday-SDon-124200_Q28_655 with increasing delays in a short period of time. Our first query identifies that this trip’s delay is growing, and outputs a record for this trip. Our second query ingests that record and finds the vehicle positions at the time of the delay.

By plotting the positions seen in our results for delayed_trip_positions above, we get the following map:

There must be some traffic on 39th Avenue!

In this example, we’ve highlighted two ways real-time processing can help provide a better experience to public transit users:

  1. Having real-time data about growing delays can help provide more accurate time estimates on bus arrival times
  2. Insights into locations where bus delays grow can help city planners better understand and improve traffic patterns in their cities


The GTFS real-time data feeds are great for building real-time transit applications. However, real-time computations that are complex or require stateful operations can be difficult to build. In this blog post, we showcased how you can build stateful real-time jobs on top of this data feed in minutes using DeltaStream. Further, as a fully managed serverless product, DeltaStream handles all of the operational overhead of running long-lived stream processing jobs.

If you want to learn more about DeltaStream or try it for yourself, you can request a demo or join our free trial.

18 Jul 2023

Min Read

Create Stream Processing Pipelines with Superblocks and DeltaStream

In the previous installments of the DeltaStream 101 blog series, we’ve covered the basics of creating stores and defining different types of relations, such as Streams, Changelogs, and Materialized Views, on top of the data records in streaming stores. In this blog post, we look into how we can quickly create an application to explore and visualize real-time changes in our data by integrating DeltaStream with a third-party data visualization platform such as Superblocks.

Let’s assume we are creating a live dashboard for a delivery company to let them track their vehicles and drivers in real-time. Each driver is identified via a unique identifier. Similarly, one can refer to a given vehicle using its unique ID. The company’s vehicles are equipped with sensors and GPS that keep sending various information about the vehicles to a configured streaming store, in this case an Apache Kafka cluster. The company is interested in tracking its drivers and vehicles, at any given time, via a live feed on a map, using either a vehicle’s or driver’s ID. The map shows the latest location of a vehicle based on its most recent speed and geographical location.

First, let’s take a look at an example of a “navigation record”, in the JSON format, which captures information about a driver, his/her vehicle along with its latest reported location in our system. A stream of such records, collected from different vehicles, is continuously ingested into a topic, called “navigation”, in a Kafka store.

{
  "driver": {
    "id": "38200",
    "fullname": "James Smith",
    "license_id": "TX42191S",
    "car": {
      "id": "21820700",
      "model": "Ford",
      "plate": "2HTY910",
      "is_electric": false
    }
  },
  "route": [
    {
      "location": {
        "latitude": 38.083128,
        "longitude": -121.472887
      },
      "speed": 64
    },
    {
      "location": {
        "latitude": 38.339525,
        "longitude": -123.253794
      },
      "speed": 72
    }
  ]
}

Sample navigation record in JSON

DSQL statements to access and query the data

As the first step, we need to create a “store” in DeltaStream to access the data in the Kafka topic that collects navigation records. We also assume we have already defined our database and schema. Our previous blog post covers details on how one can define them.

In order to query the data, we create a stream, called “navigation”. In DeltaStream, we use the “STRUCT” data type to define records with nesting, and the “ARRAY” data type is used to define an ordered collection of data items of the same type.  As you can see in the DDL statement, shown in Statement 1 below, the navigation stream has two columns: ‘driver’ and ‘route’. The driver column’s data type is a struct, whose fields capture information on a driver’s ID, fullname, license_id and his/her car. The car field is defined as a nested struct, inside the driver’s struct, and shows various information about the driver’s car such as its id, model, etc. The route column lists a sequence of data items which report the latest location and speed of the driver’s car. Therefore, the data type for the route column is defined as an array of structs where each struct has two fields: location and speed. The location field is a nested struct containing latitude and longitude values, collected by the vehicle’s GPS, and the speed field is defined as integer.

CREATE STREAM navigation (
  driver STRUCT<id VARCHAR, fullname VARCHAR, license_id VARCHAR, car STRUCT<id VARCHAR, model VARCHAR, plate VARCHAR, is_electric BOOLEAN>>,
  route ARRAY<STRUCT<location STRUCT<latitude DOUBLE, longitude DOUBLE>, speed INTEGER>>
) WITH ('topic'='navigation', 'value.format'='json');

Statement 1. DDL to define navigation stream.

Now that we can access the data in Kafka, using the navigation stream we just created, we run a CSAS statement to extract the columns and nested fields that are relevant to our application. We are going to query the latest navigation information about a driver or car, using their IDs. Hence, we need to select the driver’s ID and his/her car’s ID from the driver column. We also select the driver’s name to show it on the dashboard. We pick the first element of the route column to show the latest location and speed of the car on the map. We unnest location coordinates and show them as separate columns along with the speed in the new stream. As you can see in Statement 2, in DeltaStream the `->` operator is used to access fields of a struct. Moreover, given that arrays are one-indexed (i.e., the first element of an array is at index 1), route[1] is fetching the very first element from a given route array.

CREATE STREAM flat_data AS
SELECT
  driver->id AS driver_id,
  driver->fullname AS driver_name,
  driver->car->id AS car_id,
  route[1]->location->latitude AS latitude,
  route[1]->location->longitude AS longitude,
  route[1]->speed AS speed
FROM navigation;

Statement 2. CSAS to define flat_data stream
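For intuition, the same flattening can be sketched in plain Python over one navigation record parsed as a dict (illustrative only; field names follow the DDL, and SQL's one-indexed route[1] becomes Python's zero-indexed route[0]):

```python
def flatten_navigation(rec):
    """Flatten one nested navigation record the way the CSAS statement does:
    struct access (->) becomes dict access, and the first route element is
    projected into top-level latitude/longitude/speed columns."""
    first_leg = rec["route"][0]  # route[1] in one-indexed SQL
    return {
        "driver_id": rec["driver"]["id"],
        "driver_name": rec["driver"]["fullname"],
        "car_id": rec["driver"]["car"]["id"],
        "latitude": first_leg["location"]["latitude"],
        "longitude": first_leg["location"]["longitude"],
        "speed": first_leg["speed"],
    }
```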

For a given driver or car ID, we can run a query on the flat_data stream and get the latest relevant data we need to show on the map. We are going to use Superblocks to visualize the location of the queried drivers or cars. Currently, Superblocks’ available APIs do not let us directly send the result of our query to update the map. We can achieve this by creating a materialized view on top of the flat_data stream. Moreover, given that we are only interested in the most recent location of a car or a driver when showing it on the map, we need to make sure our materialized view ingests the data in “upsert” mode. This way, if a new record arrives for an existing driver or car ID, it overwrites the current record and updates the materialized view. We can use a changelog in DeltaStream to interpret the records in a given topic in upsert mode. You can use the DDL in Statement 3 to define such a changelog. We define the primary key for the changelog as a composite key, using the driver_id and car_id columns.

CREATE CHANGELOG navigation_logs (
  driver_id VARCHAR,
  car_id VARCHAR,
  driver_name VARCHAR,
  latitude DOUBLE,
  longitude DOUBLE,
  speed INTEGER,
  PRIMARY KEY (driver_id, car_id)
) WITH ('topic'='flat_data', 'value.format'='json');

Statement 3. DDL to define navigation_logs Changelog
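To make the upsert semantics concrete, here is a small Python sketch of how a changelog-backed view keeps only the latest record per primary key (an illustration of the behavior, not DeltaStream internals):

```python
def apply_upserts(records):
    """Materialize the latest record per (driver_id, car_id) primary key."""
    view = {}
    for r in records:
        # A newer record for an existing key overwrites the older one.
        view[(r["driver_id"], r["car_id"])] = r
    return view
```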

Our final step to prepare the data for Superblocks is creating a materialized view, called “navigation_view”, by selecting all records from the navigation_logs changelog defined above. Now, for a given driver or car ID, we can run a simple filter query on navigation_view to fetch the latest location coordinates and speed of the queried driver or car. This query’s result is directly usable by Superblocks to update the map on our dashboard.

CREATE MATERIALIZED VIEW navigation_view AS
SELECT * FROM navigation_logs;

Statement 4. Statement to define navigation_view Materialized View

Visualize the data using Superblocks

Now, let’s use Superblocks to visualize the location of drivers and cars on a map, in real time. We can achieve this by creating an application in Superblocks which fetches the latest location of a driver or car from the navigation_view Materialized View we defined above.

Generate API Token in DeltaStream

DeltaStream uses API tokens to authenticate third-party applications and let them run queries and access the results securely. To generate an API token, on your DeltaStream home page, click your avatar icon on the main navigation bar, and under your profile select the “API Token” tab. Pick a name for the new token and DeltaStream will generate it for you. Let’s call our new API token “SuperblocksToken”. You won’t be able to access the content of a generated token once you exit this page; therefore, make sure you download the new token and save it in a safe place for future reference.

Create a Superblocks Application

The next step is creating a Superblocks application and connecting it to DeltaStream. Our new application receives the driver and car IDs as inputs from the user, generates a DSQL query, and submits it to DeltaStream to fetch the latest location of the car from the Materialized View. It then shows this location on a map. Log in to your Superblocks account and select the “new application” option to create one.

The first step is defining the input fields. Use the “Components” panel on the left to create two input boxes, named “Driver” and “Car”, and give them proper labels and placeholders. Make sure both fields are set as required.

The next step is creating the “Submit” button for the application. The Submit button calls DeltaStream’s REST API to run new queries and get their results. It puts the DeltaStream API token, generated before, in the header of requests to authenticate for secure access. For this purpose, add a new button to the application and set its click handler to be a new REST API. The API should be configured as below to connect to DeltaStream:

  • Method: POST
  • URL: https://api.deltastream.io/run-statement
  • Headers: Authorization: Bearer <YOUR-API-TOKEN>
  • Body Content Type: Form
  • Set Form Data as follows:
    • roleName: sysadmin
    • storeName: <YOUR-STORE-NAME>
    • databaseName: <YOUR-DATABASE-NAME>
    • schemaName: <YOUR-SCHEMA-NAME>
    • statement: SELECT * FROM navigation_view WHERE driver_id = '{{Input1.value}}' AND car_id = '{{Input2.value}}';

You can check your organization ID in your DeltaStream account’s Home page. Click on your avatar icon and find it under the “organizations” tab.

As you can see, the “statement” defined in the REST API configuration above is the DSQL query generated from the input values for the Driver and Car IDs. Superblocks generates this query each time the “Submit” button is clicked and sends it to the configured DeltaStream endpoint. Go ahead and set valid input values for the Driver and Car IDs and submit a request. Once the query runs successfully, DeltaStream returns its results wrapped in a list. To show the latest location of the driver, we only need the very first element in that list, so we define a function to pick that element from the returned results. Click the plus button on the left panel and select the “Javascript Function” option. This opens a new panel with a placeholder for the function’s code. Set the function code to: “return Step1.output[0].data;”.
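For reference, an equivalent request can also be built programmatically. This Python sketch mirrors the configuration above (endpoint and form fields as described in this post; verify them against the DeltaStream API documentation for your environment) and only constructs the request without sending it:

```python
from urllib.parse import urlencode
from urllib.request import Request

def build_run_statement_request(api_token, store, database, schema, driver_id, car_id):
    """Build the form-encoded POST for the run-statement endpoint described above."""
    statement = (
        "SELECT * FROM navigation_view "
        f"WHERE driver_id = '{driver_id}' AND car_id = '{car_id}';"
    )
    body = urlencode({
        "roleName": "sysadmin",
        "storeName": store,
        "databaseName": database,
        "schemaName": schema,
        "statement": statement,
    }).encode()
    return Request(
        "https://api.deltastream.io/run-statement",
        data=body,
        headers={"Authorization": f"Bearer {api_token}"},
        method="POST",
    )
```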

The very last step is creating a map to show the fetched location on it. For this purpose, select the “Map” option from the component panel and configure it as:

  • Initial location: {{API1.response[0]}}
  • Default markers: {{API1.response}}

Now, once you put valid values for the driver and car ids in the input field boxes and submit a request, the latest location of the car is marked on the map.

22 May 2023

Min Read

Denormalizing Distributed Datasets in Real-Time

While a distributed data mesh empowers teams in a company to securely build modern applications as they reduce data dependency, it also poses challenges for non-product teams. Certain teams within a company may require access to anonymous and denormalized data to further grow the business. In this post, we will take a look at how such teams can use DeltaStream to capture the data they need to do their work, while the data owners control the security of their data.

Training Machine Learning Models

For the purpose of this exercise, let’s assume a Machine Learning team needs access to anonymous user data for building models to reduce fraud in a financial institution based on frequency and location of payments made to an account. This team stores their data in a topic in an Apache Kafka cluster that is declared as a Store in DeltaStream:

mldb.product/mlai_msk# LIST STORES;
     Name     |  Kind  | Access Region | Metadata |  Owner   |      Created at      |      Updated at
---------------+--------+---------------+----------+----------+----------------------+-----------------------
  mlai_msk     | Kafka  | AWS us-east-1 | {}       | sysadmin | 2023-01-12T20:38:16Z | 2023-01-12T20:38:16Z

and we already have access to the payments made by `payerid` over time:

CREATE STREAM payments_log (
  paymenttime BIGINT,
  payerid VARCHAR,
  accountid VARCHAR,
  paymentid VARCHAR
) WITH ('topic'='topic_name', 'value.format'='JSON');

DDL 1: `payments_log` definition for the `payments_log` Kafka topic

`DDL 1` defines the running log of each payment through the product, created using the `CREATE STREAM` statement. The `payments_log` references the `accountid` that is the recipient of each payment, and the `paymentid` that includes extra payment information.

In addition to frequency of payments made to a specific `accountid`, we also need to take into account the location that payments are being made from so the training model can better detect anomalies over time. We will expand on this in the next section.

Sharing Anonymous User Data

As the stream of payments is provided in the `payments_log` Stream above, we need to securely denormalize the `payerid` field to also include where payments are coming from, without exposing users’ sensitive information. This can be done by the team that owns the additional payer information, identified by a `userid` and described by the following Changelog in the `userdb.product` Schema:

CREATE CHANGELOG userdb.product.users_log (
  registertime BIGINT,
  userid VARCHAR,
  regionid VARCHAR,
  contactinfo STRUCT<email VARCHAR, phone VARCHAR, city VARCHAR, country VARCHAR>,
  PRIMARY KEY(userid)
) WITH ('topic'='users', 'value.format'='json');

For simplicity, let’s assume all payers are registered as users of the product. At this point, only the users team has access to the `userdb` Database, hence `users_log` is not accessible by the Machine Learning team, for data security reasons. The users team also has usage permissions on the `payments_log` Stream, so they can read from and write to it.

Using the following query, we can provide the anonymous user information to the Machine Learning team in real time:

CREATE STREAM payments_location AS
SELECT
  p.paymenttime AS paytime,
  u.registertime AS payer_register_time,
  u.regionid AS region,
  contactinfo->city AS payment_city,
  contactinfo->country AS payment_country,
  p.accountid AS payee,
  p.paymentid AS paymentid
FROM payments_log p
JOIN users_log u ON u.userid = p.payerid;

Query 1: Enrich payments with anonymous payer location info with a temporal join on `users_log.userid`

In `Query 1`, we look up the payer, represented by `payerid`, in the `users_log` Changelog identified by `userid`. While doing so, we omit `userid`, `contactinfo.email`, and `contactinfo.phone`, as they were identified as Personally Identifiable Information (PII) by the users team, preventing this data from leaking outside of the `userdb` Database.
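The PII boundary can be made explicit with a small Python sketch of the same projection (illustrative record shapes based on the DDLs above):

```python
PII_FIELDS = {"userid", "email", "phone"}

def anonymize_join(payment, user):
    """Join one payment with its payer's profile while projecting away
    the PII columns, mirroring the column list of Query 1."""
    row = {
        "paytime": payment["paymenttime"],
        "payer_register_time": user["registertime"],
        "region": user["regionid"],
        "payment_city": user["contactinfo"]["city"],
        "payment_country": user["contactinfo"]["country"],
        "payee": payment["accountid"],
        "paymentid": payment["paymentid"],
    }
    assert not PII_FIELDS & row.keys()  # guard: no PII column leaks through
    return row
```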

As a result of `Query 1`, a new `payments_location` Stream is created that provides the location information for each payment made to an account in addition to the existing payment information:

CREATE STREAM payments_location (
  paytime BIGINT,
  payer_register_time BIGINT,
  region VARCHAR,
  payment_city VARCHAR,
  payment_country VARCHAR,
  payee VARCHAR,
  paymentid VARCHAR
) WITH ('topic'='topicname', 'value.format'='json');

DDL 2: Underlying DDL for the denormalized `payments_location` in `Query 1`

`DDL 2` statement reveals how `payments_location` Stream was created when `Query 1` was launched.

Model Training with Real-Time Data

Now, let’s assume that additional payment information can be provided by the `paymentid` field, and by inspecting the `payments` Stream, the `chargeinfo` structure can be very useful to our fraud detection model:

CREATE STREAM payments (
  id VARCHAR,
  chargeinfo STRUCT<vcc VARCHAR, amount FLOAT, type VARCHAR>,
  payer VARCHAR,
  payee VARCHAR,
  paymenttime BIGINT
) WITH ('topic'='topicname', 'value.format'='json');

Using the `payments` DDL, the following query can be created to continuously provide the additional charge information to the ML team:

CREATE STREAM payments_full AS
SELECT
  pl.paytime AS paytime,
  pl.payer_register_time AS payer_register_time,
  pl.region AS region,
  pl.payment_city AS payment_city,
  pl.payment_country AS payment_country,
  pl.payee AS payee,
  p.chargeinfo AS charge
FROM payments_location pl
JOIN payments p ON p.id = pl.paymentid;

Query 2: Denormalize payment ID into charge information

In `Query 2`, we directly replaced the `paymentid` reference with the charge information to allow the model training pipeline to get the full picture for finding payment anomalies that may be occurring within our product. As a result, the `payments_full` Stream is created as such:

CREATE STREAM payments_full (
  paytime BIGINT,
  payer_register_time BIGINT,
  region VARCHAR,
  payment_city VARCHAR,
  payment_country VARCHAR,
  payee VARCHAR,
  charge STRUCT<vcc VARCHAR, amount FLOAT, type VARCHAR>
) WITH ('topic'='topicname', 'value.format'='json');

In addition to providing the right information to the model training pipeline, the pipeline receives this information in real time, so the model can evolve faster over time, positively impacting the business.

What’s next?

In this post, we looked at some of the techniques that can be used alongside modern products today to securely denormalize data that may be useful to other teams within the company without access to the original data. While this may be an oversimplification of the scenario, we have extensive support for different data types and data processing operations that fit endless production cases. Please refer to our developer documentation to learn more about how your scenario can be simplified using DeltaStream.

If you are using streaming storage systems such as Apache Kafka (Confluent Cloud, AWS MSK, Redpanda or any other Apache Kafka) or AWS Kinesis, you should check out DeltaStream as the platform for processing, organizing and securing your streaming data. You can schedule a demo where you can see all these capabilities in the context of a real world streaming application.

