21 Nov 2022

DeltaStream 101 Part 4 – Data Serialization Formats

So far in our DeltaStream 101 blog series, we’ve covered the basics of DeltaStream as well as a couple of case studies centered around creating materialized views and processing data from different streaming stores like Apache Kafka and AWS Kinesis. If you’ve missed any of the previous posts, check them out below:

You may recall from previous DeltaStream 101 posts that users create DeltaStream stores on top of streaming storage systems like Apache Kafka and AWS Kinesis. After creating a store, the data that lies within it ultimately needs to be read and deserialized from bytes for processing. The most popular data formats are JSON, Protocol Buffers (Protobuf), and Apache Avro, all of which are supported by DeltaStream. Each of these formats requires its own serialization/deserialization logic, which can add complexity when you have to work with several of them at once. With DeltaStream, however, these complexities are handled behind the scenes, creating a streamlined development process for the user.

In this blog post, we will explore how DeltaStream seamlessly integrates with different data serialization formats, and walk through an example use case.

Imagine that you work for an eCommerce website and you have two streams of data – one that represents transactional data and another that represents customer information. The transactional data is encoded using Protobuf and the customer data is encoded using Avro. The goal is to build a real-time analytics dashboard of key performance indicators so your business can visualize the most up-to-date information about how your products are doing in the marketplace. Let’s also assume that the service that creates the dashboard expects the data to come in JSON format. In our analytics dashboard, we want the following metrics:

  • Revenue Metric: real-time dollar sum of all transactions per hour
  • Geographic Traffic Metric: real-time count of transactions from customers by state per hour

Diagram 1: Overview of SQL pipelines
Query 1 aggregates revenue from transactions,
Query 2 enriches transactions by joining them with customers,
Query 3 aggregates transactions from transactions_enriched by state

Set Up Data Formats: Descriptors and Schema Registries

In our use case we have two streams, the Transactions stream in Protobuf format, and the Customers stream in Avro format. We’ll first cover how to set these streams up as sources for DeltaStream.

Transactions Stream

Below in Code 1a, you’ll see an example of a record from the transactions stream. A new record is created every time a customer buys an item. The record itself contains a "tx_time" timestamp of when the transaction occurred, a "tx_id" that is unique per transaction, fields describing which item was purchased and for how much, and a "customer_id" that identifies which customer the transaction belongs to. Code 1b shows the Protobuf message used to create the Protobuf descriptor that serializes and deserializes these transactions.

  {
    "tx_time": 1667260111428,
    "tx_id": "fe92644b-973a-4b65-ae4c-4b4eed23b5a0",
    "item_id": "Item_7",
    "price": 11,
    "quantity": 1,
    "customer_id": "Customer_9"
  }

Code 1a: An example transactions record

  syntax = "proto3";

  message Transactions {
    int64 tx_time = 1;
    string tx_id = 2;
    string item_id = 3;
    int32 price = 4;
    int32 quantity = 5;
    string customer_id = 6;
  }

Code 1b: Protobuf message for transactions

For the Transactions stream, you can upload the Protobuf descriptor as a DeltaStream descriptor. A DeltaStream descriptor is an object that holds the resources a streaming SQL query needs to serialize and deserialize data, such as a Protobuf descriptor. After creating the DeltaStream descriptor, you can attach it to the relevant DeltaStream topic.

  CREATE DESCRIPTOR_SOURCE pb WITH (
    'file' = '/path/to/protos/transactions_value.proto'
  );

  SHOW DESCRIPTORS;

  # Name
  # -------------------------
  # pb.transactions_value

Code 1c: Commands to create and show DeltaStream descriptors

Now, let’s observe the contents of our Protobuf topic before and after we attach a descriptor:

  PRINT transactions;

  # |�����0$fe92644b-973a-4b65-ae4c-4b4eed23b5a0Item_8%H�2A(2 Customer_7
  # |�����0$b6c7bba9-753d-46ec-85e3-32856df574faItem_4%���@(2 Customer_4

Code 1d: Print DeltaStream topic before attaching descriptor

  UPDATE TOPIC transactions WITH (
    'value.descriptor' = pb.transactions_value
  );

Code 1e: Command to update DeltaStream topic with descriptor

  PRINT transactions;

  # | {"eventTime":"1667324708830","txId":"fe92644b-973a-4b65-ae4c-4b4eed23b5a0","itemId":"Item_9","price":8,"quantity":3,"customerId":"Customer_2"}
  # | {"eventTime":"1667324709830","txId":"b6c7bba9-753d-46ec-85e3-32856df574fa","itemId":"Item_4","price":13,"quantity":4,"customerId":"Customer_7"}

Code 1f: Print DeltaStream topic after attaching descriptor

Notice how in Code 1d, before the DeltaStream descriptor was attached, the contents of the topic are indiscernible bytes. After the DeltaStream descriptor is attached to the topic, the contents are properly deserialized, as shown in Code 1f.

Finally, we can define a stream from this topic as shown in DDL 1:

  CREATE STREAM transactions (
    tx_time BIGINT, tx_id VARCHAR, item_id VARCHAR,
    price INTEGER, quantity INTEGER, customer_id VARCHAR
  ) WITH (
    'topic' = 'transactions', 'value.format' = 'PROTOBUF',
    'timestamp' = 'tx_time'
  );

DDL 1: A DDL statement for operating on the transactions records

Customers Stream

Below in Code 2a, you’ll see an example of a record from the customers stream. The data in this stream describes information about a particular customer, and a new record is created each time a customer’s information is updated. In each record, there is an "update_time" field with a timestamp of when the update occurred, an "id" that maps to a unique customer, the customer’s name, and the up-to-date address for the customer. Code 2b shows the Avro schema used to serialize and deserialize the records in the customers stream.

  {
    "update_time": 1667260173792,
    "id": "Customer_1",
    "name": "Jill",
    "address": {
      "state": "AZ",
      "city": "Tucson",
      "zipcode": "85721"
    }
  }

Code 2a: An example customers record

  {
    "fields": [
      {
        "name": "update_time",
        "type": {
          "format_as_time": "unix_long",
          "type": "long"
        }
      },
      {
        "name": "id",
        "type": {
          "type": "string"
        }
      },
      {
        "name": "name",
        "type": {
          "type": "string"
        }
      },
      {
        "name": "address",
        "type": {
          "type": "record",
          "name": "addressUSRecord",
          "fields": [
            {
              "name": "state",
              "type": "string"
            },
            {
              "name": "city",
              "type": "string"
            },
            {
              "name": "zipcode",
              "type": "string"
            }
          ]
        }
      }
    ],
    "name": "customers",
    "namespace": "deltastream",
    "type": "record"
  }

Code 2b: Avro schema for customers

For serialization of Avro records, it’s common to use a schema registry for storing Avro schemas. DeltaStream makes it easy to integrate with an external schema registry. You can import a schema registry by providing a name, the type of the schema registry, and any required connectivity-related configuration. The imported schema registry is then attached to a store, so any data from that store can be serialized and deserialized using the schemas in the configured registry. Note that even though a schema registry is attached at the store level, it operates at the topic level. This means a store can contain both topics whose data formats require the schema registry and topics that don’t, such as topics carrying JSON data whose schema lives outside the registry. The schema registry is simply used for the topics that require it and ignored for the others. Currently, we require a schema registry if your data is serialized with Avro.

  CREATE SCHEMA_REGISTRY sr WITH (
    'type' = CONFLUENT_CLOUD, 'availability_zone' = 'us-east-1',
    'uris' = 'https://abcd-efghi.us-east-2.aws.confluent.cloud',
    'confluent_cloud.key' = 'fake_key',
    'confluent_cloud.secret' = 'fake_secret'
  );

Code 2c: Command to create DeltaStream schema registry

  PRINT customers;

  # ��Customer_5 | �����ΆaCustomer_Jane
  # cstateNcitySanta Fezipcode
  # 87505
  # ��Customer_4 | �����ΆaCustomer_Jill
  # stateCcity
  # Irvinezipcode
  # 92612

Code 2d: Print DeltaStream topic before attaching schema registry

  UPDATE STORE kafkastore
  WITH ('schema_registry.name' = sr);

Code 2e: Command to update DeltaStream store with schema registry

  PRINT customers;

  # {"id":"Customer_1"} | {"update_time":1667335058024,"id":"Customer_1","name":"Jane","address":{"zipcode":"92612","city":"Irvine","state":"CA"}}
  # {"id":"Customer_5"} | {"update_time":1667335059489,"id":"Customer_5","name":"Jane","address":{"zipcode":"87505","city":"Santa Fe","state":"NM"}}

Code 2f: Print DeltaStream topic after attaching schema registry

Similar to how we needed the DeltaStream descriptor to deserialize data in transactions, the schema registry must be attached to the store to properly deserialize data in customers. In Code 2d, before the schema registry is added to the store, the records in the customers topic are indiscernible. After updating the store with the schema registry, we can see the contents are properly deserialized, as shown in Code 2f.

Since the customers data is really keyed data, where information is updated per customer “id”, it makes sense to create a changelog on this topic. We can define that changelog and specify “id” as the primary key as shown in DDL 2:

  CREATE CHANGELOG customers (
    update_time BIGINT,
    id VARCHAR,
    "name" VARCHAR,
    address STRUCT < "state" VARCHAR, city VARCHAR, zipcode VARCHAR >,
    PRIMARY KEY(id)
  ) WITH (
    'topic' = 'customers', 'value.format' = 'AVRO',
    'timestamp' = 'update_time'
  );

DDL 2: A changelog DDL to capture the latest customer information from the customers topic

Revenue Metric: real-time dollar sum of all transactions per hour

For our hourly dollar sum revenue metric, we need to perform a windowed aggregation on the transactions data. We’ve already created and attached a descriptor for this topic and defined a stream relation on top of it. From there, in Query 1, we can aggregate the hourly dollar sum of all transactions with a short SQL query:

  CREATE STREAM hourly_revenue WITH ('value.format' = 'json') AS
  SELECT
    window_start,
    window_end,
    SUM(price * quantity) AS revenue
  FROM
    tumble(transactions, SIZE 1 hour)
  GROUP BY
    window_start,
    window_end;

Query 1: Aggregation of hourly revenue from transactions stream

By default, a new stream created from an existing stream inherits the properties of the source stream. However, in this query we specify 'value.format'='json' in the WITH clause, which instructs the output stream to serialize its records as JSON.
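
For contrast, here is a minimal sketch of the same aggregation without that override; assuming the default property inheritance just described, the resulting stream would keep the source’s Protobuf format (the hourly_revenue_pb name is hypothetical):

  -- With no 'value.format' in the WITH clause, the new stream is assumed to
  -- inherit its serialization format (Protobuf) from the transactions source.
  CREATE STREAM hourly_revenue_pb AS
  SELECT
    window_start,
    window_end,
    SUM(price * quantity) AS revenue
  FROM
    tumble(transactions, SIZE 1 hour)
  GROUP BY
    window_start,
    window_end;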

We can inspect the results of the new stream using an interactive query, which prints the results to our console:

  SELECT * FROM hourly_revenue;

  # | {"window_start":"2022-11-01T21:18:40","window_end":"2022-11-01T22:18:40","revenue":1459}
  # | {"window_start":"2022-11-01T21:18:50","window_end":"2022-11-01T22:18:50","revenue":2232}

Geographic Traffic Metric: real-time count of transactions from customers by state per hour

The customers changelog defined earlier provides the information we need to compute geographic traffic for customers using our eCommerce website, but we also need the transactions stream to generate the number of transactions per state.

We can achieve this by joining the transactions stream with the customers information. The persistent SQL statement in Query 2 shows how to enrich the transactions stream as intended:

  CREATE STREAM transactions_enriched WITH (
    'value.format' = 'json', 'timestamp' = 'tx_time'
  ) AS
  SELECT
    transactions.tx_time,
    transactions.tx_id,
    transactions.item_id,
    transactions.price,
    transactions.quantity,
    transactions.customer_id,
    customers.name AS "name",
    customers.address
  FROM
    transactions
  JOIN customers ON transactions.customer_id = customers.id;

Query 2: Enrich transaction records with customers information for the metrics by state

Running a simple interactive query to inspect the results, we can see the enriched stream includes the customer address information with the transaction information:

  SELECT * FROM transactions_enriched;

  # | {"tx_time":1667433311829,"tx_id":"fe92644b-973a-4b65-ae4c-4b4eed23b5a0","item_id":"Item_5","price":7,"quantity":3,"customer_id":"Customer_1","name":"Jill","address":{"state":"CA","city":"San Mateo","zipcode":"94401"}}

Using Query 2, we were able to join transactions in Protobuf format with customers in Avro format, and write the result to transactions_enriched in JSON format without worrying about what the format requirements are for each of the source or destination relations. Now that we have the transactions_enriched stream, we can perform a simple aggregation to produce our desired metric:

  CREATE STREAM hourly_tx_count_by_state AS
  SELECT
    window_start,
    window_end,
    count(tx_id) AS tx_count,
    address -> state AS "state"
  FROM
    tumble(transactions_enriched, SIZE 1 hour)
  GROUP BY
    window_start,
    window_end,
    address -> state;

Query 3: Aggregation of hourly count of unique transactions by state

Inspecting the records in the hourly_tx_count_by_state relation, we can see aggregated transaction counts broken down by state and time window:

  SELECT * FROM hourly_tx_count_by_state;

  # | {"window_start":"2022-11-02T22:58:40","window_end":"2022-11-02T23:58:40","tx_count":512,"state":"AZ"}
  # | {"window_start":"2022-11-02T22:58:40","window_end":"2022-11-02T23:58:40","tx_count":330,"state":"NM"}
  # | {"window_start":"2022-11-02T22:58:40","window_end":"2022-11-02T23:58:40","tx_count":956,"state":"CA"}

Conclusion

In this post, we demonstrated how DeltaStream makes it easy to work with different serialization formats, whether you need to attach descriptors that describe your data or link your schema registry. The example above demonstrated how a user can set up pipelines in minutes to transform data from one format to another, or to join data available in different formats. DeltaStream eliminates the complexity of managing streaming applications and dealing with complicated serialization/deserialization logic, so the user can focus on what matters most: writing easy-to-understand SQL queries and generating valuable data for real-time insights or features.

Expect more blog posts in the coming weeks as we showcase more of DeltaStream’s capabilities for a variety of use cases. Meanwhile, if you want to try this yourself, you can request a demo.

01 Nov 2022

DeltaStream 101 Part 3 – Enriching Apache Kafka Topics with Amazon Kinesis Data Streams

In Part 1 of our DeltaStream 101 series, we uncovered how DeltaStream connects to your existing streaming storage, Apache Kafka or Amazon Kinesis, using the DeltaStream Store. In this part of the series, we’re going to expand on that concept and use a real-life example of how you can enrich, filter, and aggregate your data across different streaming stores to simplify your product’s data needs.

As you may remember, we created a clicks stream backed by a clicks topic in an Apache Kafka cluster, and ran an aggregate query to count the number of clicks per URL and device type. In this post, we’re going to enrich the clicks stream with user data. The user data comes from an Amazon Kinesis data stream, on which we will declare a users changelog in DeltaStream. A changelog represents a stream of upserts or deletions to our users’ information, which allows the resulting enriched stream(s) to include the registered user’s information as well. Using the enriched user clicks, we’re going to aggregate the number of clicks per URL and region.

This pipeline is demonstrated in Diagram 1:

Diagram 1: Query 1 enriches the clicks stream in Apache Kafka with the users changelog in Amazon Kinesis, and Query 2 aggregates user clicks per region.

Accessing the Enrichment Information

First, we need to set up a store to access our Amazon Kinesis data streams:  

  $ cat ./kinesis.properties
  'kinesis.access_key_id'='[AWS access key ID]'
  'kinesis.secret_access_key'='[AWS secret access key]'

The following statement creates a store named prod_kinesis with the provided configurations:

  CREATE STORE prod_kinesis
  WITH (
    'type' = KINESIS,
    'availability_zone'='us-east-2',
    'uris'='https://kinesis.us-east-2.amazonaws.com:443',
    'config_file'='./kinesis.properties'
  );

Once we declare the prod_kinesis store, as with any DeltaStream store, we can inspect the Kinesis data stream that holds our user information, users, by printing it as a topic:

  PRINT TOPIC users;

Printing the users topic shows the following content:

  [
    {
      "registertime": 1665780360439,
      "name": "Edna Hook",
      "email": "[email protected]",
      "userid": "User_4",
      "regionid": "Region_6",
      "gender": "OTHER",
      "interests": [
        "News",
        "Movies"
      ],
      "contactinfo": {
        "phone": "6503349999",
        "city": "San Mateo",
        "state": "CA",
        "zipcode": "94403"
      }
    },
    {
      "registertime": 1665780361439,
      "name": "Shaan Gough",
      "email": "[email protected]",
      "userid": "User_6",
      "regionid": "Region_9",
      "gender": "OTHER",
      "interests": [
        "Game",
        "Sport"
      ],
      "contactinfo": {
        "phone": "6503889999",
        "city": "Palo Alto",
        "state": "CA",
        "zipcode": "94301"
      }
    }
  ]

Using the values in the data stream, we can create a changelog with the following Data Definition Language (DDL) statement. Note that for the newly declared changelog we’re using the same DeltaStream database and schema, clickstream_db.public, that we declared in Part 1 of this series:

  CREATE CHANGELOG users (
    registertime BIGINT,
    name VARCHAR,
    email VARCHAR,
    userid VARCHAR,
    regionid VARCHAR,
    gender VARCHAR,
    interests ARRAY<VARCHAR>,
    contactinfo STRUCT<phone VARCHAR, city VARCHAR, "state" VARCHAR, zipcode VARCHAR>,
    PRIMARY KEY(userid)
  )
  WITH ('store'='prod_kinesis', 'topic'='users', 'value.format'='json');

Every CHANGELOG defines a PRIMARY KEY, which identifies the entity whose changes the changelog captures.

Enriching the Clicks

Let’s now use our user information to enrich the click events in the clicks stream, and publish the results back into the prod_kafka store from our previous post in this series:

  CREATE STREAM user_clicks
  WITH ('store'='prod_kafka')
  AS SELECT
    u.registertime AS user_registertime,
    u.userid AS uid,
    u.regionid AS user_regionid,
    u.gender AS user_gender,
    u.interests AS user_interests,
    c.event_time AS click_time,
    c.device_id AS device_type,
    c.url AS click_url,
    c.ip AS click_location
  FROM clicks c
  JOIN users u ON c.userid = u.userid;

Query 1: Enriching product clicks with users information so we can expand the clicks report with region

Using just a single persistent SQL statement, Query 1, we were able to:

  • Enrich the click events by joining the clicks and users relations on the userid column from Kafka and Kinesis, respectively.
  • Project only the non-PII data from the enriched clicks stream, since we don’t want the sensitive user data to leave our Kinesis store.
  • Write back the result of the enrichment into the prod_kafka store, creating a new user_clicks stream backed by a Kafka topic configured the same as the underlying topic for the clicks stream.

Since we’re joining a stream with a changelog, a temporal join is implied. In other words, click events are enriched with the correct version of the user information, updating the resulting user_clicks stream and any other downstream streams with the latest user information.

We can inspect the result of the temporal join between clicks and users using the following query:

  SELECT * FROM user_clicks;

Showing the following records in the user_clicks stream:

  [
    {
      "user_registertime": 1665780360439,
      "uid": "User_4",
      "user_regionid": "Region_6",
      "user_gender": "OTHER",
      "user_interests": [
        "News",
        "Movies"
      ],
      "click_time": 1497014222380,
      "device_type": "mobile",
      "click_url": "./home",
      "click_location": "12.12.12.12"
    },
    {
      "user_registertime": 1665780361439,
      "uid": "User_6",
      "user_regionid": "Region_9",
      "user_gender": "OTHER",
      "user_interests": [
        "Game",
        "Sport"
      ],
      "click_time": 1497014222385,
      "device_type": "desktop",
      "click_url": "./home",
      "click_location": "12.12.12.12"
    }
  ]

Clicks per URL and Region

We can now create a new persistent SQL statement, Query 2, to continuously aggregate user_clicks and count the number of clicks per URL per region, and publish the result back into our Kafka store, prod_kafka, under a new url_region_click_count relation, which is a changelog:

  CREATE CHANGELOG url_region_click_count
  AS SELECT
    click_url,
    user_regionid,
    count(*) AS url_region_click_count
  FROM user_clicks
  GROUP BY click_url, user_regionid;

Query 2: Aggregating number of clicks per URL and region
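
As with user_clicks, we could sanity-check this aggregation with an interactive query before pointing a dashboard or another downstream consumer at it. A minimal sketch, following the same interactive-query pattern used above (output omitted):

  SELECT * FROM url_region_click_count;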

User Experience and Beyond

In this post, we looked at a case study where we enriched, transformed, and aggregated data from multiple streaming stores, namely Apache Kafka and Amazon Kinesis. We built a pipeline that was up and running in seconds, without writing streaming applications that could take much longer to develop and would require ongoing maintenance. This is just a simple example of how DeltaStream makes it possible for developers to implement complex streaming applications.

21 Sep 2022

DeltaStream 101 Part 2 – Always Up-to-date Materialized Views for Kafka and Kinesis

If you recall in DeltaStream 101 Part 1, we introduced DeltaStream, a serverless stream processing platform to manage, secure, and process all your streams in the cloud, and walked through a simple clickstream analytics use case.

In this post, we will continue to build on that base. Here, we’ll walk through how you can build materialized views that are continuously updated based on the results of the streaming queries in our previous post, and serve the results of those views to a user on a web page. The event streams that we start from can live in popular event streaming storage platforms such as Apache Kafka, Confluent Cloud, Amazon MSK, or Amazon Kinesis.

This is a sample of a web page built with materialized views in DeltaStream to serve user statistics to a visitor.

Before we dive in, what is a materialized view? In short, a materialized view is the result of a query, stored as a table. Sounds simple enough. But in a database built for streaming data, queries must produce the most up-to-date results in a real-time manner whenever called, so creating and updating a materialized view becomes more complex. Fortunately, DeltaStream takes care of all of these concerns under the hood, serving results with sub-second latency. For the DeltaStream user, everything behind creating materialized views for streaming data looks like familiar SQL. The following figure shows how continuous queries in DeltaStream can build materialized views from a stream of events in Apache Kafka.

Materialized View #1: Number of times a URL has been visited

Let’s take a look at our first materialized view. Here, we are using the queries from our previous post to create a materialized view that represents the number of times every url has been visited. While this looks like standard SQL, the fact is that if a user visited that URL only half a second ago, the visit will already be reflected in our materialized view.

  CREATE MATERIALIZED VIEW url_visit_count AS
  SELECT
    url,
    count(*) AS url_visit_count
  FROM
    clicks_dev
  GROUP BY
    url;

Once we create the materialized view in DeltaStream, we can query it for the latest result the same way we would query a table in a relational database. For instance, the following query returns the number of times a url with address “./home” has been visited:

  SELECT
    *
  FROM
    url_visit_count
  WHERE
    url = './home';

With another query, we could find the url with the most views. This can be computed easily using the following query on the materialized view. Note that, again, since our materialized view is continuously updated as click events are received, the result of this query will be the accurate real-time value.

  SELECT
    url,
    url_visit_count
  FROM
    url_visit_count
  ORDER BY
    url_visit_count DESC
  LIMIT 1;

Materialized View #2: Number of visits for each URL via every device type

Now we will build another materialized view. This time we want to build a view to store the number of visits a url has on different devices. If you recall from our previous post, we had filtered out events with a desktop device ID and computed the number of events per url and device ID using a continuous query. Here is how we turn that into a materialized view that DeltaStream updates with very low latency.

  CREATE MATERIALIZED VIEW url_device_visit_count AS
  SELECT
    url,
    device_id,
    count(*) AS url_device_visit_count
  FROM
    clicks_dev
  GROUP BY
    url, device_id;
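
Once created, this view can be queried just like the first one. As a sketch, here is how we might look up the number of visits the './home' url has received from mobile devices (the filter values are illustrative only):

  SELECT
    url,
    device_id,
    url_device_visit_count
  FROM
    url_device_visit_count
  WHERE
    url = './home' AND device_id = 'mobile';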

Materialized View #3: Number of times a user visited a url on each device

And finally, we want to create a materialized view that we can query to know how many times a user visited the website on each device. For example, we may want to see how many times a given user visited various urls in the website on laptops, mobile devices, and tablets. Below is the SQL we can use to create this materialized view in DeltaStream.

  CREATE MATERIALIZED VIEW user_device_visit_count AS
  SELECT
    user_id,
    device_id,
    count(*) AS user_device_visit_count
  FROM
    clicks_dev
  GROUP BY
    user_id, device_id;

Similar to the examples above, once we create the materialized view in DeltaStream, it is ready for querying. Here is an example of querying the last materialized view we created to get the number of visits the user with user_id 'User_9' made from mobile devices.

  SELECT
    user_id,
    user_device_visit_count
  FROM
    user_device_visit_count
  WHERE
    user_id = 'User_9' AND device_id = 'mobile';

I hope you enjoyed these examples of how you can use DeltaStream to go from raw event streams to materialized views that serve the latest, accurate results to a web page. In future posts, we’ll cover more capabilities for building, managing, and securing real-time applications and pipelines. In the meantime, if you want to try this yourself, please request a demo.

22 Aug 2022

DeltaStream 101

DeltaStream is a serverless stream processing platform to manage, secure, and process all your streams in the cloud. One of the main goals of DeltaStream is to make stream processing fast and easy by providing a familiar SQL interface for building real-time streaming applications, pipelines, and materialized views, and by eliminating the complexity of running infrastructure for such applications.

This is the first blog post in a series where we will show some of the capabilities and features of the DeltaStream platform through real-world use cases. In this post, we will use a simple clickstream analytics use case.

Imagine we have a stream of click events continuously appended to a topic named ‘clicks’ in our production Kafka cluster. The following is a sample click event in JSON format:

  {
    "event_time": 1658710466005,
    "device_id": "mobile",
    "user_id": "User_16",
    "url": "./home",
    "ip": "12.12.12.12"
  }

Let’s assume we have a new project to build where we will compute different metrics over the click events in real time. We prefer to build our project on a separate Kafka cluster since we don’t want to make any changes or write any data into the production Kafka cluster while developing our new application. Also let’s assume our Kafka clusters are on Confluent Cloud and DeltaStream can access these clusters through their public endpoints (in future posts we will show how to configure private-link for such connectivity). And finally, we want to have the results in protobuf format.

DeltaStream exposes a REST API with GraphQL through which users can interact with the service. We provide a web-based client along with a CLI client; however, users can also interact with the service directly through the API. In this blog we will use the DeltaStream CLI, and we assume we have already logged into the service.

For our clickstream project we will:

  • Replicate the clicks events from the production Kafka cluster into the development Kafka cluster with the following changes:
    • Convert the event_time to timestamp type
    • Drop the ip field
    • Filter out events with a desktop device id
    • Convert the format to protobuf
  • Compute the number of events per url using a continuous query
  • Compute the number of events per url and device id using a continuous query

The following figure depicts a high-level overview of what we plan to accomplish. Query 1 performs the first item above, while Query 2 and Query 3 perform the second and third bullet points, respectively.

Create stores

The first step to accessing your data in a streaming storage service, such as a Kafka cluster, is to declare stores in DeltaStream. This can be done using the create store statement. In our project, we have two Kafka clusters, so we will declare two stores in DeltaStream. A store in DeltaStream is an abstraction that represents a streaming storage service such as an Apache Kafka cluster or AWS Kinesis. Note that DeltaStream does not create a new Kafka cluster in this case; it simply defines a store for an existing cluster. Once you define a store, you will be able to explore its contents and inspect the data stored there.

The following statements declare our production Kafka cluster. A store has a name along with the metadata that will be used to access the store.

  $ cat ./confluent_cloud.properties
  'kafka.sasl.hash_function'=PLAIN
  'kafka.sasl.username'='[cluster API key]'
  'kafka.sasl.password'='[cluster API secret]'

  <no-db>/<no-store>$ create store prod_kafka WITH (
    'type' = KAFKA,
    'availability_zone'='us-east-1',
    'uris'='pkcxxxxxx.gcp.confluent.cloud:9092',
    'config_file'='./confluent_cloud.properties'
  );

We also need to declare a store for our development Kafka cluster.

  <no-db>/prod_kafka$ create store dev_kafka WITH (
    'type' = KAFKA,
    'availability_zone'='us-east-1',
    'uris'='pkcxxxxxx.gcp.confluent.cloud:9092',
    'config_file'='./confluent_cloud_dev.properties'
  );

Now that we have declared our stores, we can inspect them. In the case of Kafka stores, for instance, we can list topics, create and delete topics with the desired partitions and replication factors, and print the content of topics. Note that we can only perform these operations on a given store if the credentials we provided while declaring the store grant enough permissions for them.

As an example, the following listing shows how we can list the topics in the production Kafka cluster and print the content of the clicks topic.

  <no-db>/prod_kafka$ SHOW TOPICS;
  Topic name
  --------------
  clicks
  pageviews
  userid
  <no-db>/prod_kafka$ PRINT TOPIC clicks;
  | {"event_time":1497014222380,"device_id":"mobile","user_id":"User_16","url":"./home","ip":"12.12.12.12"}
  | {"event_time":1497014222385,"device_id":"desktop","user_id":"User_18","url":"./home","ip":"12.12.12.12"}
  | {"event_time":1497014222390,"device_id":"mobile","user_id":"User_1","url":"./home","ip":"12.12.12.12"}

Once we have our stores declared and tested, we can go to the next step where we will use the relational capabilities of DeltaStream to build our clickstream analysis application.

Create databases and streams

Similar to other relational databases, DeltaStream uses databases and schemas to organize relational entities such as streams. The first step in using the relational capabilities of DeltaStream is to create a database. For our clickstream analysis application, we create a new database using the following statement.

  CREATE DATABASE clickstream_db;

Similar to other relational databases, DeltaStream creates a default schema named ‘public’ when a database is created. Once we create the first database in DeltaStream, it becomes the default database. Now we can create a stream for our source topic, which is in our production Kafka cluster. The following DDL statement declares a new stream over the clicks topic in the prod_kafka store.

  CREATE STREAM clicks (
    event_time BIGINT,
    device_id VARCHAR,
    user_id VARCHAR,
    url VARCHAR,
    ip VARCHAR
  )
  WITH (
    'store' = 'prod_kafka',
    'topic' = 'clicks',
    'value.format' = 'JSON'
  );

Queries

Once we declare a stream over a topic we will be able to build our application by writing continuous queries to process the data in real time.

The first step is to transform and replicate the clicks data from the production Kafka cluster into the development Kafka cluster. In DeltaStream, this can easily be done with a simple query like the following.

  CREATE STREAM clicks_dev
  WITH (
    'store' = 'dev_kafka',
    'value.format' = 'protobuf'
  ) AS
  SELECT
    toTimestamp(event_time) AS event_timestamp,
    device_id,
    user_id,
    url
  FROM clicks
  WHERE device_id <> 'desktop';

The above query creates a new stream backed by a topic named clicks_dev in the dev_kafka cluster. It continuously reads the click events from the clicks stream in the production Kafka cluster, applies the transformations, projection, and filtering, and writes the results into the clicks_dev stream.
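
To verify that the pipeline is running, we could print the backing topic in the development cluster the same way we printed clicks earlier. This is only a sketch, and since clicks_dev is protobuf-encoded, the raw records will not be as readable as the JSON ones above:

  PRINT TOPIC clicks_dev;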

Now that we have clicks_dev, we can write aggregate queries and build their results in the dev_kafka cluster. The first query creates a CHANGELOG that continuously computes and updates the number of events per url.

  CREATE CHANGELOG click_count_per_url AS
  SELECT
    url,
    count(*) AS url_visit_count
  FROM clicks_dev
  GROUP BY url;

Finally, the following query computes the number of visits per url and per device.

  CREATE CHANGELOG click_count_per_url_per_device AS
  SELECT
    url,
    device_id,
    count(*) AS url_device_visit_count
  FROM clicks_dev
  GROUP BY url, device_id;

In this blog post, we showed how you can build a simple clickstream analytics application using DeltaStream. We showed DeltaStream’s capabilities for reading from and writing into different streaming data stores, and how easily you can build stream processing applications with a few SQL statements. This is the first blog post in a series where we will show some of the capabilities and features of the DeltaStream platform through real-world use cases. In future posts we will show more capabilities of DeltaStream in building, managing, and securing real-time applications and pipelines. In the meantime, if you want to try this yourself, please request a demo.
