27 Apr 2023
On-time Delivery Shipments using interval Left Join
On-time deliveries can make or break a business. When orders are dropped, it can be extremely frustrating for customers and can tarnish the brand of the supplier. However, the reality is that on-time shipping is challenging and mistakes are bound to happen. With DeltaStream, you can easily build an application in minutes to detect unshipped orders in real time. This way, missing orders can be caught as soon as possible, instead of customers calling to complain that they haven’t received their goods.
In DeltaStream 101, we demonstrated how to add DeltaStream Stores, create Streams, and write queries. In this blog post, we will use these core concepts to demonstrate how a DeltaStream user can easily join two Streams, `Orders` and `Shipments`, to ensure that delivery orders are shipped on time. If a specific order is not shipped within 30 minutes, that order will be flagged to indicate that it needs a closer look.
Kafka Store Topics for E-commerce
Assume we have created a Kafka Store called `KafkaStore` with two topics, `Orders` and `Shipments`. We can observe the contents of each of these topics by printing them:
A record in the `Orders` topic
A record in the `Shipments` topic
When an item is ordered, an event is logged in the `Orders` topic with an `orderID` that is unique for that order. When that item is shipped, a corresponding event is logged in the `Shipments` topic that has the same `orderID` value in its payload. The `orderID` field is used to associate an order with its shipment and will be used later on for the join condition in our join query. Suppose for this use case that we expect every order to be shipped within 30 minutes. If an order is not shipped within that time frame, it indicates that something is wrong and someone should be notified.
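The on-time rule described above can be illustrated with a minimal Python sketch. This is not DeltaStream code; the function name `shipped_on_time` and the constant `SHIPPING_DEADLINE` are hypothetical, and the sketch assumes ISO-8601 timestamps and that a shipment counts as on time only if it occurs at or after the order time and no more than 30 minutes later.

```python
from datetime import datetime, timedelta
from typing import Optional

# The 30-minute expectation from the use case above.
SHIPPING_DEADLINE = timedelta(minutes=30)


def shipped_on_time(order_ts: str, shipment_ts: Optional[str]) -> bool:
    """Return True if a shipment exists and falls within 30 minutes after the order."""
    if shipment_ts is None:
        # No shipment event was ever logged for this orderID.
        return False
    ordered_at = datetime.fromisoformat(order_ts)
    shipped_at = datetime.fromisoformat(shipment_ts)
    return timedelta(0) <= shipped_at - ordered_at <= SHIPPING_DEADLINE


# Shipped about 15 minutes after the order: on time.
print(shipped_on_time("2023-03-02T13:53:22", "2023-03-02T14:08:43"))
# Shipped 45 minutes after the order: late.
print(shipped_on_time("2023-03-02T14:00:17", "2023-03-02T14:45:17"))
# Never shipped.
print(shipped_on_time("2023-03-02T14:12:44", None))
```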
Define Relations, Creating Data Streams
As a first step, let’s create Streams for our `Orders` and `Shipments` topics that we will later use as sources for our queries. We will give the Streams the same names as the topics that back them.
DDL Statement to create the `Orders` Stream
DDL Statement to create the `Shipments` Stream
The Data Definition Language (DDL) statements above describe the columns and column types of the data we are creating each Stream on. This metadata, along with the `value.format` provided in the statement’s `WITH` clause, allows us to properly deserialize the data coming from the physical Kafka topics. Also in the `WITH` clause, we specify a column for the `timestamp` property. The `timestamp` property signals which value should be used as the event time in our queries; if it is left unspecified, the record’s timestamp is used by default (i.e. the Kafka record’s timestamp in this case). Finally, we specify the `topic` property, which associates each Stream with the physical Kafka topic whose name matches the property’s value. If `topic` isn’t provided, the topic matching the name of the Stream is used. So in our first DDL statement, for example, specifying a `topic` value of `'Orders'` is redundant, but it is included for clarity.
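The two defaulting rules described above (event time and topic name) can be sketched in plain Python. This is only an illustration of the described behavior, not DeltaStream’s implementation: the helpers `resolve_topic` and `event_time` are hypothetical, the record value is assumed to be JSON, and timestamps in the payload are assumed to be UTC.

```python
import json
from datetime import datetime, timezone
from typing import Dict, Optional


def resolve_topic(stream_name: str, properties: Dict[str, str]) -> str:
    """Use the 'topic' property if given, else fall back to the Stream's name."""
    return properties.get("topic", stream_name)


def event_time(raw_value: bytes, kafka_timestamp_ms: int,
               timestamp_column: Optional[str] = None) -> datetime:
    """Pick the event time for one record.

    If a timestamp column is configured, read it from the payload;
    otherwise fall back to the Kafka record's own timestamp.
    """
    if timestamp_column is not None:
        record = json.loads(raw_value)
        # Assumption: payload timestamps are ISO-8601 strings in UTC.
        parsed = datetime.fromisoformat(record[timestamp_column])
        return parsed.replace(tzinfo=timezone.utc)
    return datetime.fromtimestamp(kafka_timestamp_ms / 1000, tz=timezone.utc)


value = b'{"order_timestamp": "2023-03-02T13:53:22", "orderID": "Order_1"}'
print(resolve_topic("Orders", {}))                     # falls back to the Stream name
print(event_time(value, 0, "order_timestamp"))         # taken from the payload column
print(event_time(value, 1677765202000))                # taken from the Kafka record
```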
Joining Data Streams with the interval Left Join
Now that we have created our source Streams, the next step is to create an output Stream that joins the orders with the shipments so we can see which orders were shipped on time and which orders weren’t. We can achieve this with a `CREATE STREAM AS SELECT` query (CSAS), where we create a new Stream `OrdersShipmentsJoin` by writing a query that selects fields from our sources. In our query, we will use an interval `LEFT JOIN` to join `Orders` with `Shipments` so that we can access fields from both Streams. The interval in this case is 30 minutes, as defined in the problem statement. Note that we are using a `LEFT JOIN` so that the output includes all `Orders` records, regardless of whether there is a matching `Shipments` record to join with.
CREATE STREAM "OrdersShipmentsJoin" AS SELECT * FROM "Orders" o LEFT JOIN "Shipments" s WITHIN 30 minutes ON o."orderID" = s."orderID";
CSAS 1: query that joins orders with their corresponding shipments within 30 minutes of the order’s event time
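To make the matching rule concrete, here is a small batch illustration of an interval left join in Python. It is a sketch under stated assumptions, not how the streaming engine works internally: a real interval join runs continuously over unbounded input, whereas this version sees all records at once. The function `interval_left_join` is hypothetical, the interval is assumed to be symmetric around the order’s event time, and the `o_orderID`/`s_orderID` output names mirror the sample output shown later in this post. Only the fields needed for the join are included.

```python
from datetime import datetime, timedelta
from typing import Dict, List

WINDOW = timedelta(minutes=30)  # the WITHIN 30 minutes interval


def interval_left_join(orders: List[dict], shipments: List[dict]) -> List[dict]:
    """Batch illustration of an interval LEFT JOIN on orderID."""
    shipments_by_order: Dict[str, List[dict]] = {}
    for shipment in shipments:
        shipments_by_order.setdefault(shipment["orderID"], []).append(shipment)

    joined = []
    for order in orders:
        ordered_at = datetime.fromisoformat(order["order_timestamp"])
        matches = [
            s for s in shipments_by_order.get(order["orderID"], [])
            # Assumption: the interval is symmetric around the order's event time.
            if abs(datetime.fromisoformat(s["shipment_timestamp"]) - ordered_at) <= WINDOW
        ]
        # LEFT JOIN: an order with no match still yields one row, padded with None.
        for s in matches or [None]:
            joined.append({
                "order_timestamp": order["order_timestamp"],
                "o_orderID": order["orderID"],
                "shipment_timestamp": s["shipment_timestamp"] if s else None,
                "s_orderID": s["orderID"] if s else None,
            })
    return joined


orders = [
    {"order_timestamp": "2023-03-02T13:53:22", "orderID": "Order_1"},
    {"order_timestamp": "2023-03-02T14:00:17", "orderID": "Order_2"},
    {"order_timestamp": "2023-03-02T14:12:44", "orderID": "Order_3"},
]
shipments = [
    {"shipment_timestamp": "2023-03-02T14:08:43", "orderID": "Order_1"},
    {"shipment_timestamp": "2023-03-02T14:45:17", "orderID": "Order_2"},
]

for row in interval_left_join(orders, shipments):
    print(row)
```

In this run, `Order_1` joins with its shipment, `Order_2` has a shipment but it falls outside the 30-minute interval, and `Order_3` has no shipment at all, so the last two rows carry `None` on the shipment side.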
Filter for Orders with Late or Missing Shipments using SQL
For this use case, we are specifically interested in `Orders` records that didn’t have a matching `Shipments` record within 30 minutes of their event times. We can filter for this condition using a `WHERE` clause on the `OrdersShipmentsJoin` Stream that we’ve just created, and output the results to another Stream called `OrdersNotShipped`.
CSAS 2: query that filters out any orders that were properly shipped so we are left with a Stream of orders that didn’t ship or didn’t ship on time
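The filter-and-project step can be illustrated in Python as follows. This is a sketch of the logic only, not the CSAS query itself: the generator `orders_not_shipped` is hypothetical, it assumes that an unmatched order carries a null `s_orderID` after the left join, and the output field names follow the sample `OrdersNotShipped` records shown later in this post.

```python
from typing import Iterable, Iterator


def orders_not_shipped(joined_rows: Iterable[dict]) -> Iterator[dict]:
    """Keep only joined rows with no matching shipment and project a few fields."""
    for row in joined_rows:
        # A null shipment side means no shipment matched within the interval.
        if row["s_orderID"] is None:
            yield {
                "order_timestamp": row["order_timestamp"],
                "orderID": row["o_orderID"],
                "customerID": row["customerID"],
                "itemID": row["itemID"],
                "Destination_zipcode": row["destination_address"]["zipcode"],
            }


joined_rows = [
    {
        "order_timestamp": "2023-03-02T13:53:22", "o_orderID": "Order_1",
        "customerID": "customer_32", "itemID": "7740485A-836A-447F-96F9-DBBD00E92387",
        "destination_address": {"zipcode": "94500"},
        "shipment_timestamp": "2023-03-02T14:08:43", "s_orderID": "Order_1",
    },
    {
        "order_timestamp": "2023-03-02T14:12:44", "o_orderID": "Order_3",
        "customerID": "customer_192", "itemID": "98E090E3-756B-4A06-8E4E-15DF9026295E",
        "destination_address": {"zipcode": "43215"},
        "shipment_timestamp": None, "s_orderID": None,
    },
]

for row in orders_not_shipped(joined_rows):
    print(row)
```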
The new `OrdersNotShipped` Stream that we’ve created is essentially a real-time log of all orders that did not ship on time. Other fields such as `customerID`, `itemID`, and `destination_zipcode` are provided in the payload to make it easier to track what went wrong with the shipment and inform the customer if needed. Let’s observe how this query behaves with some example inputs and outputs.
Sample Input Records in `Orders` Topic:
{ "order_timestamp": "2023-03-02T13:53:22", "orderID": "Order_1", "customerID": "customer_32", "price": "12.31", "itemID": "7740485A-836A-447F-96F9-DBBD00E92387", "destination_address": { "number": 2555, "street": "University", "city": "Palo Alto", "state": "CA", "zipcode": "94500" } }
{ "order_timestamp": "2023-03-02T14:00:17", "orderID": "Order_2", "customerID": "customer_851", "price": "189.12", "itemID": "80D2C068-CDFA-431C-9A33-079792A8D95C", "destination_address": { "number": 1660, "street": "Johnson RD NW", "city": "Atlanta", "state": "GA", "zipcode": "30318" } }
{ "order_timestamp": "2023-03-02T14:12:44", "orderID": "Order_3", "customerID": "customer_192", "price": "562.11", "itemID": "98E090E3-756B-4A06-8E4E-15DF9026295E", "destination_address": { "number": 400, "street": "Whittier St", "city": "Columbus", "state": "OH", "zipcode": "43215" } }
{ "order_timestamp": "2023-03-02T15:13:29", "orderID": "Order_4", "customerID": "customer_621", "price": "298.37", "itemID": "7F38AA51-C018-41E5-8EBD-57B77DEFB2D1", "destination_address": { "number": 2050, "street": "River Rd", "city": "Louisville", "state": "KY", "zipcode": "40206" } }
Sample Input Records in `Shipments` Topic:
{ "shipment_timestamp": "2023-03-02T14:08:43", "orderID": "Order_1", "shipment_center_ID": 7 }
{ "shipment_timestamp": "2023-03-02T14:45:17", "orderID": "Order_2", "shipment_center_ID": 3 }
{ "shipment_timestamp": "2023-03-02T15:20:10", "orderID": "Order_4", "shipment_center_ID": 3 }
Sample Output Records in `OrdersShipmentsJoin` Topic:
{ "order_timestamp": "2023-03-02T13:53:22", "o_orderID": "Order_1", "customerID": "customer_32", "price": 12.31, "itemID": "7740485A-836A-447F-96F9-DBBD00E92387", "destination_address": { "number": 2555, "street": "University", "city": "Palo Alto", "state": "CA", "zipcode": "94500" }, "shipment_timestamp": "2023-03-02T14:08:43", "s_orderID": "Order_1", "shipment_center_ID": 7 }
{ "order_timestamp": "2023-03-02T15:13:29", "o_orderID": "Order_4", "customerID": "customer_621", "price": 298.37, "itemID": "7F38AA51-C018-41E5-8EBD-57B77DEFB2D1", "destination_address": { "number": 2050, "street": "River Rd", "city": "Louisville", "state": "KY", "zipcode": "40206" }, "shipment_timestamp": "2023-03-02T15:20:10", "s_orderID": "Order_4", "shipment_center_ID": 3 }
{ "order_timestamp": "2023-03-02T14:00:17", "o_orderID": "Order_2", "customerID": "customer_851", "price": 189.12, "itemID": "80D2C068-CDFA-431C-9A33-079792A8D95C", "destination_address": { "number": 1660, "street": "Johnson RD NW", "city": "Atlanta", "state": "GA", "zipcode": "30318" }, "shipment_timestamp": null, "s_orderID": null, "shipment_center_ID": null }
{ "order_timestamp": "2023-03-02T14:12:44", "o_orderID": "Order_3", "customerID": "customer_192", "price": 562.11, "itemID": "98E090E3-756B-4A06-8E4E-15DF9026295E", "destination_address": { "number": 400, "street": "Whittier St", "city": "Columbus", "state": "OH", "zipcode": "43215" }, "shipment_timestamp": null, "s_orderID": null, "shipment_center_ID": null }
Sample Output Records in `OrdersNotShipped` Topic:
{ "order_timestamp": "2023-03-02T14:00:17", "orderID": "Order_2", "customerID": "customer_851", "itemID": "80D2C068-CDFA-431C-9A33-079792A8D95C", "Destination_zipcode": "30318" }
{ "order_timestamp": "2023-03-02T14:12:44", "orderID": "Order_3", "customerID": "customer_192", "itemID": "98E090E3-756B-4A06-8E4E-15DF9026295E", "Destination_zipcode": "43215" }
Conclusion
In this blog post, we’ve demonstrated how, with DeltaStream, you can build a real-time application in minutes using easy-to-understand SQL queries. Specifically, we determined whether an order was shipped on time by using a continuous interval join. Further, by using a left outer join, we were able to filter for unmatched results: orders that didn’t have a matching shipment. This kind of application is invaluable for setting up downstream alerting so the right team can be notified right away when an order isn’t shipped or isn’t shipped on time, which indicates a problem with the order or with the shipping facility.
Expect more blog posts in the coming weeks as we showcase more of DeltaStream’s capabilities for a variety of use cases. Meanwhile, if you want to try this yourself, you can request a demo.