In Part 1 of our DeltaStream 101 series, we uncovered how DeltaStream connects to your existing streaming storage, Apache Kafka or Amazon Kinesis, using the DeltaStream Store. In this part of the series, we’re going to expand on that concept and use a real-life example of how you can enrich, filter, and aggregate your data between different streaming stores to simplify your product’s data needs.

As you may remember, we created a clicks stream backed by a clicks topic in an Apache Kafka cluster, and ran an aggregate query to count the number of clicks per URL and device type. In this post, we’re going to enrich the clicks stream with user data. The user data will come from an Amazon Kinesis data stream, on which we will declare a users changelog in DeltaStream. A changelog represents a stream of upserts or deletions to our users’ information. This allows the resulting enriched stream(s) to include registered user information as well. Using the enriched user clicks, we’re going to aggregate the number of clicks per URL and region.

This pipeline is demonstrated in Diagram 1:

Diagram 1: Query 1 enriches the clicks stream in Apache Kafka with the users changelog in Amazon Kinesis, and Query 2 aggregates user clicks per region.

Accessing the Enrichment Information

First, we need to set up a store to access our Amazon Kinesis data streams:  

  cat ./
  'kinesis.access_key_id'='[AWS access key ID]'
  'kinesis.secret_access_key'='[AWS secret access key]'

The following statement creates a store named prod_kinesis with the provided configurations:

  CREATE STORE prod_kinesis
  WITH (
    'type' = KINESIS,
    'availability_zone'='us-east-2',
    'uris'='',
    'config_file'='./'
  );

Once we declare the prod_kinesis store, as with any DeltaStream store, we can inspect our Kinesis data stream, users, that holds our user information by printing it as a topic:

  PRINT TOPIC users;

Printing the users topic shows its content as follows:

  [
    {
      "registertime": 1665780360439,
      "name": "Edna Hook",
      "email": "[email protected]",
      "userid": "User_4",
      "regionid": "Region_6",
      "gender": "OTHER",
      "interests": [
        "News",
        "Movies"
      ],
      "contactinfo": {
        "phone": "6503349999",
        "city": "San Mateo",
        "state": "CA",
        "zipcode": "94403"
      }
    },
    {
      "registertime": 1665780361439,
      "name": "Shaan Gough",
      "email": "[email protected]",
      "userid": "User_6",
      "regionid": "Region_9",
      "gender": "OTHER",
      "interests": [
        "Game",
        "Sport"
      ],
      "contactinfo": {
        "phone": "6503889999",
        "city": "Palo Alto",
        "state": "CA",
        "zipcode": "94301"
      }
    }
  ]

Using the values in the data stream, we can create a changelog with the following Data Definition Language (DDL) statement. Note that the newly declared changelog uses the same DeltaStream Database and Schema, clickstream_db.public, that we declared in Part 1 of this series:

  CREATE CHANGELOG users (
    registertime BIGINT,
    name VARCHAR,
    email VARCHAR,
    userid VARCHAR,
    regionid VARCHAR,
    gender VARCHAR,
    interests ARRAY<VARCHAR>,
    contactinfo STRUCT<phone VARCHAR, city VARCHAR, "state" VARCHAR, zipcode VARCHAR>,
    PRIMARY KEY(userid)
  )
  WITH ( 'store'='prod_kinesis', 'topic'='users', 'value.format'='json');

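Every CHANGELOG defines a PRIMARY KEY, which provides the context for its changes: each record is interpreted as an upsert or deletion for the entity identified by that key. Here, the key is userid.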
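To make the role of the primary key more concrete, here is a minimal Python sketch of how a changelog can be read as the latest state per key. It is a conceptual model only, not how DeltaStream is implemented. The apply_changelog helper, the use of None as a deletion marker, and the Region_2 update are all assumptions made for illustration.

```python
from typing import Dict, Iterable, Optional, Tuple

# Hypothetical record shape: (primary key, new value or None for a deletion).
ChangeRecord = Tuple[str, Optional[dict]]


def apply_changelog(records: Iterable[ChangeRecord]) -> Dict[str, dict]:
    """Fold a sequence of changelog records into the latest state per key."""
    state: Dict[str, dict] = {}
    for key, value in records:
        if value is None:
            # Assumed deletion marker: remove the key if it is present.
            state.pop(key, None)
        else:
            # Upsert: insert a new key or overwrite the previous version.
            state[key] = value
    return state


# Illustrative data only; the later records are not from the users topic.
changes = [
    ("User_4", {"regionid": "Region_6"}),
    ("User_6", {"regionid": "Region_9"}),
    ("User_4", {"regionid": "Region_2"}),  # later update for the same userid
    ("User_6", None),                      # deletion of User_6
]
latest_users = apply_changelog(changes)
print(latest_users)
```

Without a key such as userid, there would be no way to tell that the third record replaces the first rather than describing a different user.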

Enriching the Clicks

Let’s now use our user information to enrich the click events in the clicks stream, and publish the results back into the prod_kafka store from our previous post in this series:

  CREATE STREAM user_clicks
  WITH ('store'='prod_kafka')
  AS SELECT
    u.registertime AS user_registertime,
    u.userid AS uid,
    u.regionid AS user_regionid,
    u.gender AS user_gender,
    u.interests AS user_interests,
    c.event_time AS click_time,
    c.device_id AS device_type,
    c.url AS click_url,
    c.ip AS click_location
  FROM clicks c
  JOIN users u ON c.userid = u.userid;

Query 1: Enriching product clicks with user information so that the clicks report can be expanded with region

Using just a single persistent SQL statement, Query 1, we were able to:

  • Enrich the click events by joining the clicks and users relations on the userid column from Kafka and Kinesis, respectively.
  • Project only the non-PII data from the enriched clicks stream, since we don’t want the sensitive user data to leave our Kinesis store.
  • Write the result of the enrichment back into the prod_kafka store, creating a new user_clicks stream backed by a Kafka topic that is configured the same as the underlying topic for the clicks stream.

Since we’re joining a stream with a changelog, a temporal join is implied. In other words, each click event is enriched with the correct version of the user information for that click, so the resulting user_clicks stream and any other downstream streams pick up the latest user information as it changes.
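The idea of a temporal join can be illustrated with a small Python sketch. It replays user updates and clicks in timestamp order and enriches each click with whichever user version is current at that moment. This is a simplified model under stated assumptions, not DeltaStream's implementation: the event tuples, the temporal_join helper, the integer timestamps, and the sample data are all hypothetical.

```python
from typing import Dict, List, Tuple

# Hypothetical event shapes, replayed in timestamp order:
#   ("user",  ts, userid, regionid) -> upsert in the users changelog
#   ("click", ts, userid, url)      -> event in the clicks stream
Event = Tuple[str, int, str, str]


def temporal_join(events: List[Event]) -> List[dict]:
    """Enrich each click with the user version known at the click's time."""
    users: Dict[str, str] = {}   # userid -> current regionid
    enriched: List[dict] = []
    for kind, ts, userid, payload in sorted(events, key=lambda e: e[1]):
        if kind == "user":
            users[userid] = payload          # newer version replaces older
        elif userid in users:                # inner join: skip unknown users
            enriched.append({
                "uid": userid,
                "user_regionid": users[userid],
                "click_url": payload,
                "click_time": ts,
            })
    return enriched


events = [
    ("user", 1, "User_4", "Region_6"),
    ("click", 2, "User_4", "./home"),
    ("user", 3, "User_4", "Region_2"),   # the user moves to another region
    ("click", 4, "User_4", "./cart"),
    ("click", 5, "User_9", "./home"),    # no matching user, so not emitted
]
user_clicks = temporal_join(events)
for row in user_clicks:
    print(row)
```

In this sketch the first click is attributed to Region_6 and the second to Region_2, because the user record changed between the two clicks.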

We can inspect the result of the temporal join between clicks and users using the following query:

  SELECT * FROM user_clicks;

This shows the following records in the user_clicks stream:

  [
    {
      "user_registertime": 1665780360439,
      "uid": "User_4",
      "user_regionid": "Region_6",
      "user_gender": "OTHER",
      "user_interests": [
        "News",
        "Movies"
      ],
      "click_time": 1497014222380,
      "device_type": "mobile",
      "click_url": "./home",
      "click_location": ""
    },
    {
      "user_registertime": 1665780361439,
      "uid": "User_6",
      "user_regionid": "Region_9",
      "user_gender": "OTHER",
      "user_interests": [
        "Game",
        "Sport"
      ],
      "click_time": 1497014222385,
      "device_type": "desktop",
      "click_url": "./home",
      "click_location": ""
    }
  ]

Clicks per URL and Region

We can now create a new persistent SQL statement, Query 2, to continuously aggregate user_clicks and count the number of clicks per URL per region, and publish the result back into our Kafka store, prod_kafka, under a new url_region_click_count relation, which is a changelog:

  CREATE CHANGELOG url_region_click_count
  AS SELECT
    click_url,
    user_regionid,
    count(*) AS url_region_click_count
  FROM user_clicks
  GROUP BY click_url, user_regionid;

Query 2: Aggregating number of clicks per URL and region
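To see why the result of a continuous GROUP BY is naturally a changelog, consider this minimal Python sketch. Each incoming click updates the running count for its (click_url, user_regionid) group and emits the new count as an upsert for that key. The count_clicks helper and the sample records are assumptions for illustration, and the sketch does not reflect how DeltaStream executes the query.

```python
from collections import defaultdict
from typing import Dict, Iterable, Iterator, Tuple

# The grouping key mirrors GROUP BY click_url, user_regionid.
Key = Tuple[str, str]


def count_clicks(user_clicks: Iterable[dict]) -> Iterator[Tuple[Key, int]]:
    """Yield an updated count for a group every time a click arrives."""
    counts: Dict[Key, int] = defaultdict(int)
    for click in user_clicks:
        key = (click["click_url"], click["user_regionid"])
        counts[key] += 1
        # Each yielded pair acts as an upsert for its key.
        yield key, counts[key]


# Illustrative input records shaped like rows of user_clicks.
clicks = [
    {"click_url": "./home", "user_regionid": "Region_6"},
    {"click_url": "./home", "user_regionid": "Region_9"},
    {"click_url": "./home", "user_regionid": "Region_6"},
]
updates = list(count_clicks(clicks))
for key, count in updates:
    print(key, count)
```

The third update overwrites the first one for the same key, which is the upsert behavior a changelog describes.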

User Experience and Beyond

In this post, we looked at a case study where we enriched, transformed, and aggregated data from multiple streaming stores, namely Apache Kafka and Amazon Kinesis. We built a pipeline that was up and running in seconds, without the need to write streaming applications that could take much longer to develop and require ongoing maintenance. This is just a simple example of how DeltaStream makes it possible for developers to implement complex streaming applications.