02 Jul 2024

A Guide to RBAC vs ABAC vs ACL

Access control is necessary for data platforms to securely share data. For users to confidently share their data resources with the intended parties, access control should be easy to understand and scalable, especially as more data objects and users are added. Without a sensible access control model, users run a higher risk of inadvertently sharing data objects with the wrong parties or failing to notice incorrect permissions. Choosing the right access control model depends heavily on the use case, so it’s important to understand the benefits and drawbacks of popular options.

In this post, we’ll cover three different access control models: access control lists (ACL), role-based access control (RBAC), and attribute-based access control (ABAC). We’ll discuss what they are, their pros and cons, and what to consider when choosing an access control model.

Access Control List (ACL)

An ACL is a list of permissions for a particular resource and is the simplest of the access control models that we’ll cover. When a user attempts an action on a resource, such as a read or write, the ACL associated with that resource is used to allow or deny the attempt. To add or remove permissions for a resource, an entry in the ACL is added or deleted. ACLs are simple to understand and implement; however, they can be difficult to manage when there are many users and resources, as these lists can grow quickly.

To illustrate how ACLs work, let’s consider an example of a university with professors, teaching assistants, and students:

  • Students are able to submit assignments and view their grades
  • Teaching assistants are able to grade assignments
  • Professors are able to grade assignments and view student grades

As you can see from the diagram, each individual is given specific permissions for what they’re able to do. If another student were to join, the ACL would need to be updated to grant the new student the privilege to submit assignments and view their grades.
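
To make the bookkeeping concrete, here is a rough sketch of what these ACL-style, per-user permissions could look like if the university stored its data in a SQL database. The table names, user names, and privilege choices are purely illustrative; the point is that every user and resource pair needs its own entry:

  -- ACL-style access: every user is granted access to every resource directly
  -- (hypothetical tables and users, standard SQL GRANT syntax)
  GRANT INSERT ON assignments TO alice;        -- student: submit assignments
  GRANT SELECT ON grades      TO alice;        -- student: view their grades
  GRANT UPDATE ON assignments TO tom;          -- teaching assistant: grade assignments
  GRANT UPDATE ON assignments TO prof_smith;   -- professor: grade assignments
  GRANT SELECT ON grades      TO prof_smith;   -- professor: view student grades

  -- every new student means repeating the per-user entries
  GRANT INSERT ON assignments TO bob;
  GRANT SELECT ON grades      TO bob;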

Pros:

  • Simple and easy to understand: User privileges for a particular resource are stated plainly in a list.
  • Allows for fine-grained access control to resources: ACLs typically allow different types of access to be defined (e.g., read, write, share).

Cons:

  • Does not scale well: As more users, user groups, and resources are added, access must be individually specified in ACLs each time.
  • Low visibility into a user’s permissions: Checking a particular user’s privileges requires a lookup in every ACL in the organization.
  • Error-prone when used at scale: When ACLs are used at scale, it can be cumbersome to add the proper permissions for users, or detect if a user has been given permissions they shouldn’t have. The difficulty in managing ACLs at scale makes it more likely that errors will occur.

Role-based Access Control (RBAC)

RBAC manages permissions with roles, where roles act as an intermediary between users and resources. In this model, users are assigned a set of roles, and roles are given permissions on resources. This model works well when there are clear groups of users who need the same set of privileges and permissions. Compared to ACLs where every permission needs to be explicitly defined, RBAC scales well with new users and resources. New users can be assigned their relevant roles and adopt all the privileges associated with those roles. Similarly, permissions for new resources can be added to existing roles and users with those roles will automatically inherit the permissions for the new resource.

Using the example from earlier, we can see how RBAC might be applied to a university setting:

  • Students are able to submit assignments and view their grades
  • Teaching assistants are able to grade assignments
  • Professors are able to grade assignments and view student grades

As we can see, the relationships in this diagram are simpler than in the diagram with ACLs. Instead of specifying direct access to resources, users are assigned roles which have privileges on resources. If a new student were to join the class, they would just need to be assigned the student role, and all the permissions they need would be inherited through that role.
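
Here is a rough sketch of the same setup expressed with roles, again with hypothetical names and using PostgreSQL-style syntax. Privileges are granted to roles once, and users simply receive the appropriate role:

  -- RBAC: privileges are attached to roles, users are assigned roles
  -- (hypothetical tables, roles, and users)
  CREATE ROLE student;
  CREATE ROLE teaching_assistant;
  CREATE ROLE professor;

  GRANT INSERT ON assignments TO student;            -- submit assignments
  GRANT SELECT ON grades      TO student;            -- view their grades
  GRANT UPDATE ON assignments TO teaching_assistant; -- grade assignments
  GRANT UPDATE ON assignments TO professor;          -- grade assignments
  GRANT SELECT ON grades      TO professor;          -- view student grades

  -- onboarding a new student is a single statement
  GRANT student TO bob;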

Pros:

  • Easy-to-manage policy enforcement: Updating a privilege for a role automatically applies to all users with that role, making it easier to enforce policies at scale.
  • Scalable: New users can be granted the roles that apply to them and inherit all the privileges associated with those roles. As new resources are created, access to them can be granted to existing roles, or additional roles can easily be created.
  • Better security and compliance: RBAC ensures that users only have access to the roles relevant to them and, by extension, only the privileges given to those roles. This results in users having only the necessary permissions and reduces the risk of unauthorized access.
  • Widely adopted: RBAC has been around for decades and is used in many popular databases and data products, including PostgreSQL, MySQL, MongoDB, and Snowflake.

Cons:

  • Role explosion: While RBAC is generally quite scalable, role explosion can occur when group privileges are not clearly differentiated and too many roles get created, at which point RBAC becomes difficult to manage. Organizations should define and enforce best practices for creating roles to avoid role explosion.
  • Limited flexibility: For use cases where the privileges of roles are very dynamic, RBAC can feel rigid. For instance, if an organization restructures its teams, new roles may need to be created and existing roles may need their permissions changed. The process of safely adding and removing permissions from roles, cleaning up deprecated roles, and restructuring the role hierarchy can be cumbersome, slow down productivity, and result in tech debt.

Attribute-based Access Control (ABAC)

ABAC gates access to resources based on attributes, as opposed to users or roles. Attributes, such as who the user is, what action they’re trying to perform, which environment they are performing the action in, and what resource they are trying to perform the action on, are all considered when deciding whether or not access should be permitted. Rules are set up such that access is only allowed when conditions, determined by attributes, are met. For example, a rule can be set up such that a teaching assistant can only view grades if they’re in the grading room and it’s between 4:00 pm and 8:00 pm.

Let’s see how ABAC might be applied to the university example:

In this diagram, we can see how the ABAC policy works for a student who is trying to submit their assignment. For a student to submit their assignment under this policy, the student needs to have specific attributes, such as being enrolled and not being suspended. There are also contextual constraints, such as the submission needing to be before the deadline. If all of the conditions in the policy are satisfied, then the student can successfully submit their assignment.
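
There is no single standard syntax for ABAC policies, but as one way to illustrate condition-based rules, the submission policy above could be approximated with PostgreSQL row-level security, where access is granted only when checks against user and contextual attributes pass. The tables, columns, and attribute names below are hypothetical, and a dedicated policy engine would normally express such rules outside the database:

  -- Hypothetical condition-based policy using PostgreSQL row-level security
  ALTER TABLE submissions ENABLE ROW LEVEL SECURITY;

  CREATE POLICY student_submission ON submissions
      FOR INSERT
      WITH CHECK (
          -- user attributes: the submitting student is enrolled and not suspended
          EXISTS (
              SELECT 1 FROM students s
              WHERE s.student_id = submissions.student_id
                AND s.enrolled
                AND NOT s.suspended
          )
          -- contextual constraint: the submission is made before the deadline
          AND now() <= deadline
      );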

Pros:

  • Highly scalable: New rules and attributes can easily be added as business needs evolve. As resources evolve, administrators can simply assign attributes to the resource, as opposed to creating a new role or changing an existing one.
  • Flexible custom policies: Rules are highly customizable, enabling administrators to easily set up access policies based on context, such as time of day and location.
  • Attributes to ensure compliance with data regulations: Administrators can add attributes to sensitive resources, such as labels for personally identifiable information (PII) or HIPAA-regulated healthcare information. This makes it easier to set up rules that ensure data privacy and compliance with the relevant regulations.

Cons:

  • Complex to implement and maintain: Attributes and policies need to be carefully defined and governed. The initial design and assignment of attributes for users and resources can be a time-consuming and complex process. Then, continuing to maintain the attributes and access policies as business needs and applications change can require significant time and effort.
  • Difficult to assess risk exposure: Although it’s generally beneficial to be able to create highly customizable access policies, it can make it difficult to audit and assess risk exposure. For instance, understanding the full access a particular user has can be difficult since policies can be complex and contingent on context-specific conditions.

Choosing an Access Control Model

When it comes to choosing an access control model, users should consider how their organization may scale in the future, who will be responsible for maintaining the access control system, and if their needs actually require going with a more complex model. If there are a limited number of users and resources, ACLs may be the best approach as they are simple to understand and implement. If access policies need to be highly customized and dynamic, then ABAC may be a better approach. For something more scalable than ACLs but without the complexity of ABAC, then RBAC is probably sufficient. Organizations may also find that a hybrid approach of these models best serves their needs, such as RBAC and ABAC together.

At DeltaStream, we’ve taken the approach of adding RBAC to our platform. DeltaStream is a real-time stream processing platform that allows users to share, process, and govern their streaming data. In the data streaming space, Apache Kafka has been one of the leading open source projects for building streaming data pipelines and storing real-time events. However, access control with Kafka is managed through ACLs, and as the number of topics and users grows, managing these ACLs has been a pain point for Kafka users. As a data streaming platform that can connect to any streaming data source, DeltaStream allows users to manage and govern their streaming resources with RBAC. RBAC strikes the balance of improving on the scalability issues of ACLs without overcomplicating access control.

If you’re interested in discussing access control or learning more about DeltaStream, feel free to reach out or get a free trial.

23 May 2024

Workload Isolation: Everything You Need to Know

In cloud computing, workload isolation is critical for providing efficiency and security when running business workloads. Workload isolation is the practice of separating computing tasks into their own resources and/or infrastructure. By providing physical and logical separations, one compromised workload or resource cannot impact the others. This offers security and performance benefits and may be necessary to comply with regulatory requirements for certain applications.

Benefits of Workload Isolation

  • Security: By isolating workloads, organizations can reduce the ‘blast radius’ of security breaches. For instance, if an attacker were able to compromise the workload in one environment, workload isolation would protect the other workloads because they are being run in different environments. This helps to minimize, contain, and resolve potential security issues.
  • Performance: Isolated workloads can operate without interference from other tasks, ensuring that resources are dedicated and performance is optimized for each specific task. By isolating workloads, task performance becomes more predictable as tasks don’t need to compete for shared resources, making it easier to provide service level agreements (SLAs). Without workload isolation, a sudden spike in resource utilization for one task could negatively impact the performance of other tasks running on the same resources.
  • Compliance: Workload isolation simplifies compliance with various regulations by clearly defining boundaries between different data sets and processing activities.

Achieving workload isolation

Workload isolation can take many different forms and can be achieved with different approaches. When thinking about workload isolation, it is best to consider the multiple ways your workloads can be isolated, and to take a combined approach.

  • Resource Governance: Resource Governance is the ability to specify boundaries and limits for computing task resources. Popular container orchestration systems, such as Kubernetes, allow users to set resource limits on their services and workloads. Containerizing and limiting the resources for specific tasks removes the “noisy neighbor” problem, where one task can starve other tasks by consuming all of the resources.
  • Governance and Access Control: Providing access controls on data sets and compute environments ensures that only necessary individuals and services can access specific workloads. Most data systems have some form of access control that can be defined, whether that is in the form of an access control list (ACL), role-based access control (RBAC), or attribute-based access control (ABAC). Defining access control for users is essential to protect against unauthorized access.
  • Network Level Isolation: Network isolation aims to create distinct boundaries within a network, creating subnetworks with limited access between them. This practice improves security by limiting access to particular environments and helps ensure that an attacker cannot affect workloads on different subnetworks.

Workload isolation for Streaming Resources with DeltaStream

DeltaStream is a stream processing platform that is fully managed and serverless, allowing users to easily govern and process their streaming data from sources such as Apache Kafka or AWS Kinesis. As a security-minded stream processing solution, DeltaStream’s workload isolation plays a significant role in ensuring that computational queries are secure and performant. Below are some ways DeltaStream provides workload isolation:

  • Each Query Runs in its Own Environment: Powered by Apache Flink, each DeltaStream query runs in its own Flink cluster with its own dedicated resources and network. This ensures that users’ data is the only data being processed in a particular environment, minimizing the risk of sensitive data leakage. It also boosts performance, as each query can be scaled and tuned independently.
  • Multiple Deployment Options: DeltaStream offers various deployment options, including dedicated deployment and private SaaS deployment (also known as bring your own cloud or BYOC), catering to security-sensitive users. With the dedicated deployment option, a DeltaStream data plane runs in a cloud account dedicated to a single organization. In the private SaaS deployment option, a DeltaStream data plane operates within an organization’s cloud account. These options provide users with an additional level of assurance that their data is confined to a non-shared network — in the case of private SaaS, the data never leaves the user’s own network.
  • Role-based Access Control (RBAC): Access to queries and data objects within the DeltaStream Catalog is managed through DeltaStream’s RBAC. This gives users an easy-to-use and scalable system for properly governing and restricting access to their streaming data and workloads.

Workload isolation is essential for maintaining security and compliance in cloud products, with the added benefit of protecting workload performance. At DeltaStream, we have designed a stream processing platform that fully embraces workload isolation. If you’re interested in giving it a try, sign up for a free trial or contact us for a demo.

24 Apr 2024

Prepare Data for ClickHouse Using Apache Flink

ClickHouse and Apache Flink are two powerful tools used for high-performance data querying and real-time data processing. By using these tools together, businesses can significantly improve the efficiency of their data pipelines, enabling data teams to get insights into their datasets more quickly.

ClickHouse is a fast and resource efficient column-based database management system (DBMS). It specializes in online analytical processing (OLAP) and can handle many queries with minimal latency. With ClickPipes, users who have streaming data, such as data in Apache Kafka, can easily and efficiently build ClickHouse tables from their Kafka topics.

Apache Flink is a stream processing framework that allows users to perform stateful computations over their real-time data. It is fast, scalable, and has become an industry standard for event time stream processing. As a system with a rich connector ecosystem, Flink also integrates easily with Apache Kafka.

Typical Flink and ClickHouse Architecture

ClickHouse and Flink have been used together across the industry at companies like GoldSky, InstaCart, Lyft, and others. The typical infrastructure is as follows:

  1. Data from user product interactions, backend services, or database events via CDC are produced to a streaming data storage system (e.g. Kafka, Kinesis, Pulsar).
  2. Data in streaming storage is ingested by Flink, where it can be cleaned, filtered, aggregated, or otherwise sampled down.
  3. Flink produces the data back to the streaming storage where it is then loaded into ClickHouse via ClickPipes.
  4. Data scientists and data engineers can query ClickHouse tables for the latest data and take advantage of ClickHouse’s high-performance querying capabilities.

How Flink Adds More Value to ClickHouse

You may be wondering why Flink is needed in this architecture. Since ClickPipes already enable users to load data from streaming stores directly into ClickHouse, why not just skip Flink altogether?

The answer is that although ClickHouse is a highly optimized DBMS for querying data, performing queries such as aggregations over large data sets still forces the ClickHouse query engine to bring the relevant columns of every entry into memory to perform the aggregation, which can affect query latency. In this ClickHouse blog, they listed that the following query took 15 seconds to complete:

  SELECT
      project,
      sum(hits) AS h
  FROM wikistat
  WHERE date(time) = '2015-05-01'
  GROUP BY project
  ORDER BY h DESC
  LIMIT 10

One feature that ClickHouse has to reduce latencies for commonly run queries is Materialized Views (ClickHouse docs on creating Views). In their blog, they first created a materialized view to compute the result, then ran the same query against the materialized view. The result was computed in 3ms as opposed to 15s.
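
As a rough sketch of that idea (not the exact view from the ClickHouse blog; the view name, engine, and column choices are assumptions), a materialized view that pre-aggregates hits per project and day might look like the following, after which the original query only needs to scan the much smaller pre-aggregated table:

  -- Hypothetical pre-aggregation of the wikistat data per project and day
  CREATE MATERIALIZED VIEW wikistat_daily
  ENGINE = SummingMergeTree
  ORDER BY (date, project)
  POPULATE AS
  SELECT
      toDate(time) AS date,
      project,
      sum(hits) AS hits
  FROM wikistat
  GROUP BY date, project;

  -- the same top-10 question now scans far fewer rows
  SELECT project, sum(hits) AS h
  FROM wikistat_daily
  WHERE date = '2015-05-01'
  GROUP BY project
  ORDER BY h DESC
  LIMIT 10;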

Users who load their raw streaming data directly into a ClickHouse table can utilize materialized views to transform and prepare the data for consumption. However, these views need to be maintained by ClickHouse, and this overhead can add up, especially if many views are created. Having too many materialized views and putting too much computational load onto ClickHouse can lead to performance degradation, resulting in lower write throughput and increased query latencies.

Introducing a stream processing engine, such as Flink, lets users transform and prepare streaming data before loading it into ClickHouse. This helps alleviate pressure from ClickHouse and allows users to take advantage of the features that come with Flink. For instance, ClickHouse is known to struggle with queries that include joins. By using Flink, datasets can be joined and transformed in Flink before being loaded into ClickHouse. This way, instead of resources being diverted into data preparation queries, ClickHouse can focus on serving high-volume OLAP queries, which it excels at. Since Flink is built to be able to efficiently handle large and complex stream processing workloads, offloading complex computations from ClickHouse to Flink ultimately makes data available more quickly and reduces computational expenses.
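
As a hedged sketch of what that preparation step could look like in Flink SQL, the continuous windowed aggregation below rolls raw events up before they ever reach ClickHouse. The source and sink tables are hypothetical Kafka-backed tables assumed to be declared elsewhere, with event_time defined as a time attribute:

  -- Continuously roll raw events up into 1-minute buckets before loading into ClickHouse
  INSERT INTO page_hits_1m
  SELECT
      project,
      TUMBLE_START(event_time, INTERVAL '1' MINUTE) AS window_start,
      SUM(hits) AS hits
  FROM wikistat_raw
  WHERE project IS NOT NULL
  GROUP BY
      project,
      TUMBLE(event_time, INTERVAL '1' MINUTE);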

Building with Cloud Products

There are many benefits to utilizing this architecture for real-time analytics, but the reality for many companies is that the systems involved require too many resources to maintain and operate. This is the classic build vs buy dilemma. If your company does decide to go with the buy route, here are the cloud offerings we recommend for the 3 main components of this architecture:

  1. Streaming Storage: For Kafka compatible solutions, Confluent Cloud, RedPanda, Amazon MSK, and WarpStream are all viable options with different tradeoffs. Other streaming storage solutions include Amazon Kinesis and StreamNative for managed Pulsar among others.
  2. Stream Processing: DeltaStream is a great serverless solution to handle stream processing workloads. Powered by Apache Flink, DeltaStream users can benefit from the capabilities of Flink without having to worry about the complexity of learning, managing, and deploying Flink themselves.
  3. ClickHouse: ClickHouse Cloud is a serverless ClickHouse solution that is simple to set up, reliable, and has an intuitive SQL-based user interface.

Conclusion

In this post, we discussed a popular architecture involving Kafka, Flink, and ClickHouse that many companies have been adopting across the industry. These systems work together to enable high-performance analytics for real-time data. In particular, we touched on how Flink complements both Kafka and ClickHouse in this architecture.

If you’re looking for a cloud-based stream processing solution, DeltaStream is a serverless platform that is powerful, intuitive, and easy to set up. Stay tuned for our next blog post as we cover a use case using this architecture, with DeltaStream in place of Flink. Meanwhile, if you want to give DeltaStream a try yourself, you can sign up for a free trial.

17 Apr 2024

Data Warehouse vs Data Lake vs Data Lakehouse: What’s the difference?

As data technologies continue to advance, modern companies are ingesting, storing, and processing more data than ever before in order to make the most informed business decisions. While relational databases may have been enough for the data demands of 25 years ago, the continual increase in data operations has led to the emergence of new data technologies to support the era of big data. These days, there are a host of cloud products for data teams to choose from, many of which describe themselves as data warehouses, data lakes, or data lakehouses. With such similar names, it can be difficult to understand what vendors mean by each of these terms. In this post, we’ll break down what these terms mean, then discuss how real-time data streaming plays a role in the big data landscape.

What is a Data Warehouse?

A data warehouse is a storage and processing hub, primarily intended for generating reports and performing historical analysis. Data stored in a data warehouse is structured and well-defined, allowing the warehouse to perform fast, efficient analysis on its datasets. Data from relational databases, streaming storage systems, backend systems, and other sources is loaded into the data warehouse through ETL (extract, transform, load) processes, where it is cleaned and otherwise transformed to match the data integrity requirements expected by the warehouse. Most data warehouses allow users to access data through SQL clients, business intelligence (BI) tools, or other analytical tools.

Data warehouses are a great choice for organizations that primarily need to do historical data analytics and reporting on structured data. However, the ETL process adds complexity to the ingestion of data into the data warehouse and the requirements for structured data can make the system limiting for some use cases. Popular data warehouse vendors include Snowflake, Amazon Redshift, Google BigQuery, and Oracle Autonomous Data Warehouse.

What is a Data Lake?

A data lake is a massive storage system designed to store both structured and unstructured data at any scale. Similar to data warehouses, data lakes can ingest data from many different sources. However, data lakes are designed to be flexible so that users are able to store their raw data as-is, without needing to clean, reformat, or restructure the data first. By utilizing cheap object data storage and accommodating a wide range of data formats, data lakes make it easy for developers to simply store their data. This ultimately results in organizations accumulating large repositories of data that can be used to power use cases such as machine learning analytics, aggregations on large datasets, and exploring patterns in data from different data sources. One of the challenges of working with data lakes, however, is that downstream tasks need to make sense of differently formatted data to perform analysis on them. Further, if poorly maintained, data quality can very easily become an issue in data lakes. Tools like Apache Hadoop and Apache Spark are popular for doing analysis with a data lake, as these tools allow developers to write custom logic to make sense of different kinds of data, but they require more expertise to work with which limits the set of people who can feasibly work with the data lake.

Data lakes are a good choice for organizations that have a lot of data they need to store, accommodating both structured and unstructured data, but analyzing and maintaining the data lake can be a challenge. Data lakes are commonly built on cheap cloud storage solutions such as AWS S3, Azure Data Lake Storage, and Google Cloud Storage.

What is a Data Lakehouse?

Data lakehouses merge the features of data warehouses and data lakes into a single system, hence the name. As data warehouses began adding more features found in data lakes, and as data lakes began adding more features found in data warehouses, the distinction between the two concepts became somewhat blurred. Before data lakehouses, organizations would typically need both a data lake for storage and a data warehouse for processing, but this setup could end up causing data teams a lot of overhead, as data from one location would often need to be processed or duplicated to the other location for data engineers to perform complete analyses. By merging the two concepts into a single system, data lakehouses aim to remove these silos and get the benefits of both worlds. Similar to data lakes, storing data in a data lakehouse is still cheap, scalable, and flexible, but metadata layers are also provided to enforce things like schemas and data validation where necessary. This allows the data lakehouse to still be performant for querying and analytics, like data warehouses are.

Since data is typically loaded into a data lakehouse in its raw format, it’s common for a medallion architecture to be used. The medallion architecture describes a series of queries or processing steps that transform raw data (bronze) into filtered and cleaned data (silver), and finally into business-ready aggregated results (gold), where the gold set of data can be easily queried for BI purposes.
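
As a simplified sketch in generic SQL with hypothetical table names, the medallion layers often amount to a chain of queries, each reading from the previous layer:

  -- bronze -> silver: drop malformed events and normalize fields
  CREATE TABLE silver_orders AS
  SELECT
      CAST(order_id AS BIGINT) AS order_id,
      LOWER(country)           AS country,
      amount
  FROM bronze_orders
  WHERE order_id IS NOT NULL;

  -- silver -> gold: business-ready aggregate for BI dashboards
  CREATE TABLE gold_revenue_by_country AS
  SELECT country, SUM(amount) AS total_revenue
  FROM silver_orders
  GROUP BY country;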

While the actual distinctions of what makes a system a data lakehouse instead of a data lake or data warehouse are somewhat nuanced, popular cloud vendors that have data lakehouse capabilities include Databricks Lakehouse Platform, Snowflake, Amazon Redshift Spectrum, and Google Cloud BigLake. While data lakehouses can handle a wide range of use cases, they can be complex to manage and still require skilled data experts to extract their full benefits.

Impacts of Real-time Streaming Data

As big data technologies continue to evolve, there has been an increasing demand for real-time data products. Users are becoming more accustomed to getting results instantly, and in order to support these use cases, companies have been adopting streaming technologies such as Apache Kafka and Apache Flink.

The Challenges of Streaming Data in the Current Ecosystem

Apache Kafka is a real-time event log that uses a publisher/consumer model. Microservices, clients, and other systems with real-time data produce events to Kafka topics, and the data events in these topics are then consumed by other real-time services that act on them. Kafka and other streaming storage systems typically set an expiration period for their data events, so in order to keep their real-time data long-term, organizations typically load this data into a data lake, data warehouse, or data lakehouse for analysis later on. However, streaming data coming from IoT sensors, financial services, and web interactions can add up to a large volume of data, and doing computation on the raw form of this data can be too slow or too computationally expensive to be viable. To address this, data engineers will typically do downsampling or other transformations to prepare the raw data for end users. In the case of data lakehouses, a medallion architecture, as mentioned earlier, is recommended to prepare the data for general consumption. For data lakes, a compute engine such as a data warehouse, or some Spark/Hadoop infrastructure, is needed to transform the data into more consumable results.

A setup that requires constant recomputation comes with an inherent tradeoff. Real-time data is constantly arriving in the data lake or data lakehouse, so users need to choose between recomputing results often, which can be computationally expensive, or recomputing less frequently, which results in stale datasets. Another issue with this setup is that computed results need to be stored as well. In the medallion architecture, for example, where raw data needs to go through multiple steps of processing before being ready for warehouse-like querying, this could involve storing the same data multiple times. This results in higher storage costs and higher latencies, as each processing step needs to be scheduled for recomputation.

Using Stream Processing to Prepare Streaming Data

This is where a stream processing solution, such as Apache Flink, can become beneficial. Stream processing jobs are long-lived and can produce analytical results incrementally, as new data events arrive. Contrast this with the medallion architecture, where new result datasets need to be completely recomputed. By adding stream processing to the data stack, streaming data can be filtered, transformed, and aggregated before it ever arrives at the data lake, data warehouse, or data lakehouse layer. This results in lower computational costs and lower end-to-end latencies.

One of the main burdens of Apache Flink and other stream processing frameworks is their complexity. Understanding how to develop, manage, scale, and provide fault tolerance for stream processing applications requires skilled personnel and time. With DeltaStream, we take all of that complexity away so that users can focus on their processing logic. DeltaStream is a fully managed serverless stream processing solution that is powered by Apache Flink. If you’re interested in how DeltaStream can help you manage your streaming data, schedule a demo with us or reach out to us on one of our socials.

13 Mar 2024

How to Read Kafka Source Offsets with Flink’s State Processor API

Apache Flink is one of the most popular frameworks for data stream processing. As a stateful processing engine, Flink is able to handle processing logic with aggregations, joins, and windowing. To ensure that Flink jobs are recoverable with exactly-once semantics, Flink has a state-of-the-art state snapshotting mechanism, so in the event of a failure, the job can be resumed from the latest snapshot.

In some advanced use cases, such as job migrations or job auditing, users may be required to inspect or modify their Flink job’s state snapshots (called Savepoints and Checkpoints in Flink). For this purpose, Flink provides the State Processor API. However, this API is not always straightforward to use and requires deep understanding of Flink operator states.

In this post, we’ll cover an example of using the State Processor API, broken up into 3 parts:

  1. Introduce our Flink job which reads data from an Apache Kafka topic
  2. Deep dive into how Flink’s KafkaSource maintains its state
  3. Use the State Processor API to extract the Kafka partition-offset state from the Flink job’s savepoint/checkpoint

If you want to see an example of the State Processor API in use, feel free to skip ahead to the last section.

Note that this post is a technical tutorial for those who want to get started with the State Processor API, and is intended for readers who already have some familiarity with Apache Flink and stream processing concepts.

Creating a Flink Job

Below is the Java code for our Flink job. This job simply reads from the “source” topic in Kafka, deserializes the records as simple Strings, then writes the results to the “sink” topic.

  1. public class FlinkTest {
  2.  
  3. public static void main(String[] args) throws Exception {
  4. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  5.  
  6. KafkaSource<String> source = KafkaSource.<String>builder()
  7. .setBootstrapServers("localhost:9092")
  8. .setTopics("source")
  9. .setGroupId("my-group")
  10. .setStartingOffsets(OffsetsInitializer.latest())
  11. .setValueOnlyDeserializer(new SimpleStringSchema())
  12. .build();
  13.  
  14. DataStream<String> sourceStream = env.fromSource(
  15. source, WatermarkStrategy.forMonotonousTimestamps(), "Kafka Source")
  16. .uid("kafkasourceuid");
  17.  
  18. KafkaRecordSerializationSchema<String> serializer = KafkaRecordSerializationSchema.builder()
  19. .setValueSerializationSchema(new SimpleStringSchema())
  20. .setTopic("sink")
  21. .build();
  22. Properties kprops = new Properties();
  23. kprops.setProperty("transaction.timeout.ms", "300000"); // e.g., 5 mins
  24. KafkaSink<String> sink = KafkaSink.<String>builder()
  25. .setBootstrapServers("localhost:9092")
  26. .setRecordSerializer(serializer)
  27. .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
  28. .setKafkaProducerConfig(kprops)
  29. .setTransactionalIdPrefix("txn-prefix")
  30. .build();
  31.  
  32. sourceStream.sinkTo(sink);
  33. env.enableCheckpointing(10000L);
  34. env.getCheckpointConfig().setCheckpointTimeout(60000);
  35. env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
  36. env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500L);
  37. env.getCheckpointConfig().setTolerableCheckpointFailureNumber(1);
  38. env.getCheckpointConfig().setCheckpointStorage("file:///tmp/checkpoints");
  39. env.execute("tester");
  40. }
  41. }

There are a few important things to note from this Flink job:

  1. We are creating a KafkaSource object on line 6. The KafkaSource is then given to the StreamExecutionEnvironment’s fromSource method on line 14, which returns a DataStreamSource object representing the actual Flink source operator.
  2. We set the operator ID for our KafkaSource operator using the uid method. This is set on line 16. It’s best practice to set all the IDs for all Flink operators when possible, but we’re emphasizing this here because we’ll actually need to refer to this ID when we use the State Processor API to inspect the state snapshots.
  3. Flink checkpointing is turned on. On lines 33-38, we configure our Flink environment’s checkpointing settings to ensure that the Flink job will take periodic checkpoints. We’ll be analyzing these checkpoints later on.

Understanding the KafkaSource State

Before we inspect the checkpoints generated from our test Flink job, we first need to understand how the KafkaSource Flink operator saves its state.

As we’ve already mentioned, we’re using Flink’s KafkaSource to connect to our source Kafka data. Flink sources have 3 main components – Split, SourceReader, and SplitEnumerator (Flink docs). A Split represents a portion of data that a source consumes and is the granularity at which the source can parallelize reading data. For the KafkaSource, each Kafka partition corresponds to a separate Split, represented by the KafkaPartitionSplit class. The KafkaPartitionSplit is serialized by the KafkaPartitionSplitSerializer class. The logic for this serializer is pretty simple: it writes out a byte array containing the Split’s topic, partition, and offsets.

KafkaPartitionSplitSerializer’s serialize method:

  @Override
  public byte[] serialize(KafkaPartitionSplit split) throws IOException {
      try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
              DataOutputStream out = new DataOutputStream(baos)) {
          out.writeUTF(split.getTopic());
          out.writeInt(split.getPartition());
          out.writeLong(split.getStartingOffset());
          out.writeLong(split.getStoppingOffset().orElse(KafkaPartitionSplit.NO_STOPPING_OFFSET));
          out.flush();
          return baos.toByteArray();
      }
  }

At runtime, Flink will instantiate all of the operators, including the SourceOperator objects. For each stateful Flink operator, there is a name associated with each stateful object. In the case of a source operator, the name associated with the split states is defined by SPLITS_STATE_DESC.

  static final ListStateDescriptor<byte[]> SPLITS_STATE_DESC =
          new ListStateDescriptor<>("SourceReaderState", BytePrimitiveArraySerializer.INSTANCE);

We can inspect the SourceOperator class further to see where these split states are initialized, in the initializeState method.

SourceOperator’s initializeState method:

  @Override
  public void initializeState(StateInitializationContext context) throws Exception {
      super.initializeState(context);
      final ListState<byte[]> rawState =
              context.getOperatorStateStore().getListState(SPLITS_STATE_DESC);
      readerState = new SimpleVersionedListState<>(rawState, splitSerializer);
  }

The Flink state that source operators use is the SimpleVersionedListState, which uses the SimpleVersionedSerialization class. In the SimpleVersionedListState class, the serialize method calls the writeVersionAndSerialize method to ultimately serialize the state.

Finally, if we inspect the writeVersionAndSerialize method in the SimpleVersionedSerialization, we can see that before writing the actual data associated with our source operator, we first write out a few bytes for the serializer version and the data’s length.

SimpleVersionedSerialization’s writeVersionAndSerialize method:

  public static <T> void writeVersionAndSerialize(
          SimpleVersionedSerializer<T> serializer, T datum, DataOutputView out)
          throws IOException {
      checkNotNull(serializer, "serializer");
      checkNotNull(datum, "datum");
      checkNotNull(out, "out");

      final byte[] data = serializer.serialize(datum);

      out.writeInt(serializer.getVersion());
      out.writeInt(data.length);
      out.write(data);
  }

Let’s quickly recap the important parts from above:

  1. The KafkaSource operator stores its state in KafkaPartitionSplit objects.
  2. The KafkaPartitionSplit keeps track of the current topic, partition, and offset that the KafkaSource has last processed.
  3. When Flink savepointing/checkpointing occurs, a byte array representing the KafkaSource state gets written to the state snapshot. The byte array has a header which includes the serializer version and the length of data. Then the actual state data, which is a serialized version of the KafkaPartitionSplit, makes up the rest of the state byte array.

Now that we have some idea of how data is being serialized into Flink savepoints and checkpoints, let’s see how we can use the State Processor API to extract the Kafka source operator information from these state snapshots.

State Processor API to Inspect Kafka Source State

For Maven projects, you can add the following dependency to your pom.xml file to start using the Flink State Processor API.

  <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-state-processor-api</artifactId>
      <version>1.18.0</version>
  </dependency>

The following class showcases the full example of how we can use the State Processor API to read KafkaSource offsets from a Flink savepoint or checkpoint.

  1. public class StateProcessorTest {
  2.  
  3. public static void main(String[] args) throws Exception {
  4. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  5.  
  6. String savepointPath = Path.of("/tmp/checkpoints/609bc335486ca6cfcc8692e4c1ff8782/chk-8").toString();
  7. SavepointReader savepoint = SavepointReader.read(env, savepointPath, new HashMapStateBackend());
  8. DataStream<byte[]> listState = savepoint.readListState(
  9. OperatorIdentifier.forUid("kafkasourceuid"),
  10. "SourceReaderState",
  11. PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO);
  12. CloseableIterator<byte[]> states = listState.executeAndCollect();
  13. while (states.hasNext()) {
  14. byte[] s = states.next();
  15. KafkaPartitionSplitSerializer serializer = new KafkaPartitionSplitSerializer();
  16. KafkaPartitionSplit split = serializer.deserialize(serializer.getVersion(), Arrays.copyOfRange(s, 8, s.length));
  17. System.out.println(
  18. String.format("topic=%s, partition=%s, startingOffset=%s, stoppingOffset=%s, topicPartition=%s",
  19. split.getTopic(), split.getPartition(),
  20. split.getStartingOffset(), split.getStoppingOffset(), split.getTopicPartition()));
  21. }
  22.  
  23. System.out.println("DONE");
  24. }
  25. }

First, we’ll load the savepoint. The SavepointReader class from the State Processor API allows us to load a full savepoint or checkpoint. On line 7, we are loading a checkpoint that was created in “/tmp/checkpoints” as a result of running the test Flink job. As we mentioned in the previous section, the source operators use a SimpleVersionedListState, which the SavepointReader can read using the readListState method. When reading the list states, we need to know 3 things:

  1. Operator ID: “kafkasourceuid” set in our test Flink job
  2. State Name: “SourceReaderState” set in Flink’s SourceOperator class
  3. State TypeInformation: PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO set in Flink’s SourceOperator class

After we get our list states, we can simply iterate through each of the states, which are given as byte arrays. Since the SimpleVersionedSerialization serializer first writes the version and data length, which we don’t care about, we need to skip those headers. You’ll see on line 16 that we deserialize the byte array as a KafkaPartitionSplit after skipping the first 8 bytes of the state byte array.

Running the above code example gives the following result:

  topic=source, partition=0, startingOffset=3, stoppingOffset=Optional.empty, topicPartition=source-0
  DONE

Conclusion

In this post, we explained how Flink’s KafkaSource state is serialized into savepoints and covered an example of reading this state with the State Processor API. Flink’s State Processor API can be a powerful tool to analyze and modify Flink savepoints and checkpoints. However, it can be confusing for beginners to use and requires some in-depth knowledge about how the Flink operators manage their individual states. Hopefully this guide will help you understand the KafkaSource and serve as a good tutorial for getting started with the State Processor API.

For more content about Flink and stream processing, check out more content from DeltaStream’s blog. DeltaStream is a platform that simplifies the unification, processing, and governance of streaming data.

13 Feb 2024

Stream Processing for IoT Data

The Internet of Things (IoT) refers to sensors and other devices that share and exchange data over a network. IoT has been on the rise for years and only seems to continue growing in popularity alongside other technological advances, such as 5G cellular networks and more “smart” devices. From tracking patient health to monitoring agriculture, the applications for IoT are plentiful and diverse. Other sectors where IoT is used include security, transportation, home automation, and manufacturing.

Oracle defines Big Data as “data that contains greater variety, arriving in increasing volumes and with more velocity.” This definition is often summarized with the 3 Vs – volume, velocity, and variety. IoT data certainly matches this description, as numerous sensors and devices can emit large volumes of data at high velocity.

A platform capable of processing IoT data needs to be scalable in order to keep up with the volume of Big Data. It’s very common for IoT applications to have new sensors added over time. Consider a drone fleet for package deliveries as an example – you may start off with 10 or 20 drones, but as demand for deliveries increases, the size of your drone fleet can grow by orders of magnitude. The underlying systems processing this data need to be able to scale horizontally to match the increase in data volume.

Many IoT use cases such as tracking patient health and monitoring security feeds require low latency insights. Sensors and devices providing real-time data often need to be acted on in real-time as well. For this reason, streaming and stream processing technologies have become increasingly popular and perhaps essential for solving these use cases. Streaming storage technologies such as Apache Kafka, Amazon Kinesis, and RedPanda can meet the low latency data transportation requirements of IoT. On the stream processing side, technologies such as Apache Flink and managed solutions such as DeltaStream can provide low latency streaming analytics.

IoT data can also come in various types and structures. Different sensors can have different data formats. Take a smart home, for example: the cameras in a smart home will likely send very different data from a light or a thermometer. However, these sensors are all related to the same smart home. It’s important for a data platform handling IoT use cases to be able to join across different data sets and handle any variations in data structure, format, or type.

DeltaStream as a Streaming Analytics Platform and a Streaming Database

DeltaStream is a platform to unify, process, and govern streaming data. DeltaStream sits as the compute and governance layer on top of streaming storage systems such as Kafka. Powered by Apache Flink, DeltaStream is a fully managed solution that can process streaming data with very low latencies.

In this blog post we’ll cover 2 examples to show how DeltaStream can solve real-time IoT use cases. In the first use case, we’ll use DeltaStream’s Materialized Views to build a real-time request driven application. For the second use case, we’ll use DeltaStream to power real-time event-driven pipelines.

Use Case Setup: Transportation Sensor Data

For simplicity, both use cases will use the same source data. Let’s assume that our data is available in Apache Kafka and represents updates and sensor information for a truck fleet. We’ll first define Relations for the data in 2 Kafka topics.

The first Relation represents truck information. This includes an identifier for the truck, the speed of the truck, which thermometer is in the truck, and a timestamp for this update event represented as epoch milliseconds. Later on, we will use this event timestamp field to perform a join with data from other sensors. Since we expect regular truck information updates, we’ll define a Stream for this data.

Create truck_info Stream:

  CREATE STREAM truck_info (
      event_ts BIGINT,
      truck_id INT,
      speed_kmph INT,
      thermometer VARCHAR
  ) WITH (
      'topic' = 'truck_info', 'value.format' = 'json', 'timestamp' = 'event_ts'
  );

The second Relation represents a thermometer sensor’s readings. The fields include an identifier for the thermometer, the temperature reading, and a timestamp for when the temperature was taken that is represented as epoch milliseconds. Later on, the event timestamp will be used when joining with the truck_info Stream. We will define a Changelog for this data using sensor_id as the primary key.

Create temperature_sensor Changelog:

  CREATE CHANGELOG temperature_sensor (
      "time" BIGINT,
      temperature_c INTEGER,
      sensor_id VARCHAR,
      PRIMARY KEY (sensor_id)
  ) WITH (
      'topic' = 'temperature_sensor', 'value.format' = 'json', 'timestamp' = 'time'
  );

Using the Relations we have just defined, we want to find out what the latest temperature readings are in each truck. We can achieve this by using a temporal join to enrich our truck_info updates with the latest temperature readings from the temperature_sensor Changelog. The result of this join will be a Stream of enriched truck information updates with the latest temperature readings in the truck. The following SQL statement will launch a long-lived continuous query that will continually join these two Relations and write the results to a new Stream that is backed by a new Kafka topic.

Create truck_info_enriched Stream using CSAS:

  CREATE STREAM truck_info_enriched AS
  SELECT
      truck_info.event_ts,
      truck_info.truck_id,
      truck_info.speed_kmph,
      temp.sensor_id AS thermometer,
      temp.temperature_c
  FROM truck_info
  JOIN temperature_sensor temp
  ON truck_info.thermometer = temp.sensor_id;

While a truck fleet in a real-world environment will likely have many more sensors, such as cameras, humidity sensors, and others, we’ll keep this use case simple and just use a thermometer as the additional sensor. However, users could continue to enrich their truck information events with joins for each additional sensor data feed.

Use Case Part 1: Powering a real-time dashboard

Monitoring and health metrics are essential for managing a truck fleet. Being able to check on the status of particular trucks and generally see that trucks are doing fine can provide peace of mind for the truck fleet manager. This is where a real-time dashboard can be helpful – to have the latest metrics readily available on the status of the truck fleet.

So for our first use case, we’ll use Materialized Views to power a real-time dashboard. By materializing our truck_info_enriched Stream into a queryable view, we can build charts that can query the view and get the latest truck information. We’ll build the Materialized View in two steps. First we’ll define a new Changelog that mirrors the truck_info_enriched Stream, then we’ll create a Materialized View from this Changelog.

Create truck_info_enriched_changelog Changelog:

  CREATE CHANGELOG truck_info_enriched_changelog (
      event_ts BIGINT,
      truck_id INT,
      speed_kmph INT,
      thermometer VARCHAR,
      temperature_c INTEGER,
      PRIMARY KEY (truck_id)
  ) WITH (
      'topic' = 'truck_info_enriched',
      'value.format' = 'json'
  );

Create truck_info_mview Materialized View using CVAS:

  CREATE MATERIALIZED VIEW truck_info_mview AS
  SELECT * FROM truck_info_enriched_changelog;

Note that we could have created this Materialized View sourcing from the truck_info_enriched Stream, but if we created the Materialized View from the Stream, then each event would be a new row in the Materialized View (append mode). Instead we are building the Materialized View from a Changelog so that each event will add a new row or update an existing one based on the Changelog’s primary key (upsert mode). For our example, we only need to know the current status of each truck, so building the Materialized View with upsert mode better suits our use case.

A continuous query will power this Materialized View, constantly ingesting records from the truck_info_enriched Stream and sinking the results to truck_info_mview. Then, we can write queries to SELECT from the Materialized View. A dashboard can easily be built that simply queries this Materialized View to get the latest statuses for trucks. Here are some example queries that might be helpful when building a dashboard for the truck fleet.

Query to get truck IDs with the highest 10 temperatures:

  SELECT truck_id, temperature_c
  FROM truck_info_mview
  ORDER BY temperature_c DESC
  LIMIT 10;

Query to get all information about a truck:

  SELECT *
  FROM truck_info_mview
  WHERE truck_id = 3;

Query to count the number of trucks that are speeding:

  SELECT COUNT(truck_id) AS num_speeding_trucks
  FROM truck_info_mview
  WHERE speed_kmph > 90;

Use Case Part 2: Building a real-time alerting pipeline

While it’s great to be able to pull real-time metrics for our truck fleet (Use Case Part 1), there are also situations where we may want the truck update events themselves to trigger actions. In our example, we’ve included thermometers as one of the sensors in each of our delivery trucks. Groceries, medicines, and some chemicals need to be delivered in refrigerated trucks. If the trucks aren’t able to stay within a desired temperature range, it could cause the items inside to go bad or degrade. This can be quite serious, especially for medicines and hazardous materials that can have a direct impact on people’s health.

For our second use case, we want to build out a streaming analytics pipeline to power an alerting service. We can use a CSAS to perform real-time stateful transformations on our data set, then sink the results into a new Stream backed by a Kafka topic. Then the sink topic will contain alertable events that the truck fleet company can feed into their alerting system or other backend systems. Let’s stick to our refrigeration example and write a query that detects if a truck’s temperature exceeds a certain threshold.

Create overheated_trucks Stream using CSAS:

  CREATE STREAM overheated_trucks AS
  SELECT * FROM truck_info_enriched WHERE temperature_c > 10;

Submitting this CSAS will launch a long-lived continuous query that ingests from the truck_info_enriched Stream, filters for only events where the truck’s temperature is greater than 10 degrees Celsius, and sinks the results to a new Stream called overheated_trucks. Downstream, the truck fleet company can ingest these records and send alerts to the correct teams or use these records to trigger actions in other backend systems.

Processing IoT Data with DeltaStream

IoT data can be challenging to process due to the high volume of data, the inherent real-time requirements of many IoT applications, and the distributed nature of collecting data from many different sources. While we often treat IoT use cases as their own category, they really span many sectors and use cases. That’s why using a real-time streaming platform, such as DeltaStream, that is able to keep up with the processing demands of IoT and can serve as both a streaming database and streaming analytics platform is essential.

If you want to learn more about how DeltaStream can help your business, schedule a demo with us. We also have a free trial available if you want to try out DeltaStream yourself.

01 Feb 2024

Secure Sharing for Real-Time Data Streams

Real-time data has become increasingly valuable across many industries, including healthcare, IoT, machine learning, and cybersecurity. Stream processing frameworks such as Apache Flink enable organizations to gain immediate insights into their streaming data. For example, in the case of IoT, companies can understand within sub-seconds if a sensor has failed. However, what if you want to share that processed data (stream) with another team who needs it for a downstream application? The processed stream is what we refer to as a Data Product, and sharing these Data Products is difficult because of the underlying permissioning in a streaming store such as Kafka. If we build a system for governing these source and processed streams, we unlock the ability to securely share Data Products. This enables cross-functional and external sharing use cases.

What does it mean to Securely Share Data?

Secure data sharing is a technique that enables data owners to share their data resources in a secure and controlled way with data consumers. Secure sharing for real-time data streams involves the following aspects:

  • Data privacy: ensuring that the data streams are protected from unauthorized access, disclosure, and modification, and that the data owners have control over who can access their data and for what purpose.
  • Data access: ensuring that the data streams are available, accessible, and usable for the intended data consumers, and that the data sharing is scalable, efficient, and cost-effective.
  • Data auditing: ensuring that the usage and sharing of data streams is transparent, accountable, and auditable.

Secure data sharing improves both data collaboration and data innovation. With secure data sharing, data owners and consumers can safely exchange real-time data and insights, encouraging cross team collaboration and extracting more value out of the otherwise siloed data.

Current State of Sharing Streaming Data

Managing data authorization is one of the most common ways to securely share real-time streaming data. Consider Apache Kafka, for example, one of the most popular real-time event streaming storage systems. Data events in Kafka are organized into topics, and access to these topics is managed through Access Control Lists (ACLs). However, at scale, ACLs are difficult to manage and prone to mistakes.

To process real-time streaming data, users often look to stream processing frameworks like Flink. ACLs need to be configured in the streaming store to allow the Flink job to read from and write to topics. Then, to share the topic containing the processed data with other users, the ACL needs further additions. There are many streaming and stream processing technologies that make up a streaming data ecosystem, and all of these systems put stress on the ACLs used to control access to data streams.

In order to overcome these challenges, there needs to be a data platform that can manage data processing, access control, and data sharing capabilities at a higher level.

Best Practices for Securely Sharing Streaming Data

There are two main types of data sharing that data platforms should support – internal data sharing and 3rd party data sharing.

Internal Data Sharing

Internal data sharing refers to the process of making data accessible to other users, teams, or applications within an organization. Secure internal data sharing capabilities pair nicely with data meshes, where data owners can be any user or team within an organization. These data owners can authorize who has access to their data and determine what access permissions each party has.

One of the common ways we see popular data systems, such as Snowflake, provide internal data sharing capabilities is through Role-Based Access Control (RBAC). RBAC is scalable and easy to understand, making it an effective tool for controlling access to data. While access to streaming data has typically been defined with ACLs, an RBAC-based approach would address the scalability issues seen in streaming storage systems today. A data platform that sits on top of streaming storage systems, such as Kafka, and provides an RBAC interface for managing access to the underlying Kafka topics would provide a much more intuitive access control experience for real-time data users.
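
To make the contrast with ACLs concrete, the statements below sketch what RBAC-style grants over topic-backed data might look like. This is generic, illustrative SQL rather than any specific platform’s DDL, and the role, user, and object names are invented:

  -- Grant read access once at the role level; new users simply receive the role
  CREATE ROLE fleet_analyst;
  GRANT SELECT ON truck_telemetry TO ROLE fleet_analyst;
  GRANT ROLE fleet_analyst TO USER alice;

Adding a new analyst then requires a single role grant, rather than an ACL entry for every topic they need.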

3rd Party Data Sharing

3rd party data sharing refers to the process of making data accessible to parties outside of an organization. This enables organizations to collaborate with one another without giving direct access to their data ecosystems. In the current streaming landscape, this kind of data sharing is not natively supported. Kafka, for instance, only supports internal data sharing through ACLs.

Snowflake, for example, enables secure 3rd party data sharing through the concept of shares, allowing data objects in a Snowflake organization to be made shareable with other Snowflake organizations. The data providers in this case can specify which accounts can consume from these data objects. This is just one example out of many possible ways to implement 3rd party sharing. Providing such functionality for streaming data would unlock opportunities for organizations to collaborate with real-time streaming data as well.
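
As a rough sketch, Snowflake-style sharing looks something like the statements below; the database, table, and account identifiers are made up, and the exact grants required may differ, so treat this as illustrative rather than a copy-paste recipe:

  CREATE SHARE fleet_share;
  GRANT USAGE ON DATABASE fleet_db TO SHARE fleet_share;
  GRANT USAGE ON SCHEMA fleet_db.public TO SHARE fleet_share;
  GRANT SELECT ON TABLE fleet_db.public.truck_telemetry TO SHARE fleet_share;
  ALTER SHARE fleet_share ADD ACCOUNTS = partner_org.partner_account;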

Conclusion

Secure sharing for real-time data streams improves data accessibility and enhances collaboration with real-time analytics, while enabling teams to maintain the privacy of their data assets. However, the data sharing capabilities in the current state of real-time streaming data are not up to par with the capabilities seen in the batch world for at-rest data. 

DeltaStream is a stream processing data platform that aims to provide an intuitive way to share streaming data, both internally and with 3rd parties. At DeltaStream, we use RBAC as the approach for access control and provide capabilities for sharing data between organizations. Data Governance and secure data sharing are essential for providing an easy-to-use data ecosystem that allows users to focus on their data products. If you are interested in learning more about DeltaStream, reach out to us for a demo or sign up for a free trial.

24 Jan 2024

Min Read

The Importance of Data Unification in Real-Time Stream Processing

In our previous blog post, Streaming Data Governance in DeltaStream, we discussed what Data Governance is, why it’s important, and how it works hand-in-hand with Data Unification in the context of real-time streaming data. In this blog, we’ll dive deeper into the concept of Data Unification.

What is Data Unification and why is it Important?

Stream processing solutions have relied on connecting directly to streaming storage systems, such as Apache Kafka and RedPanda, and running transformations on a series of topics. For the stream processing user, this requires an understanding of where and how the data (i.e. topics) are stored. An ops team would then need to authorize access to these data assets. The current stream processing approach is similar to running individual Spark jobs as opposed to using a platform such as Databricks.

The fundamental problems with this approach include the following:

  1. Access to streaming storage is limited to a small number of individuals due to complex and disjointed permissioning.
  2. Data analysts are required to understand which data storage systems and which topics contain the data needed for analysis.
  3. Stream processing workloads are created in silos by different teams. It’s very common for teams to be running their own Flink or other stream processing workloads.
  4. Reusing and sharing new data streams and data assets is difficult without a central platform that enables collaboration.

Data Unification for stream processing is needed to provide an organizational layer on top of the streaming stores, one that provides a complete view of streaming, relational, and other data stores. Once a unified view is created, it unlocks the ability to seamlessly govern, access, and run stream processing workloads across an organization’s data footprint.

The Current Data Landscape

The technologies that make up the current landscape of real-time streaming data were not built with Data Unification in mind. This isn’t a criticism of these technologies, as they have enabled real-time data products and solve complex distributed systems problems, but more of a statement to point out what’s been missing in the streaming data landscape.

Let’s consider Apache Kafka, which is currently the most popular streaming storage system. Topics are the organizational objects in Kafka that hold data events, and these topics are stored in clusters. Access to these topics is granted through Access Control Lists (ACLs). In most cases, organizations with real-time data will have multiple Kafka clusters or utilize a combination of streaming storage solutions, including RedPanda, Kinesis, and Apache Pulsar. Performing streaming analytics on these data sources requires users to work directly with the storage system APIs or use a stream processing framework, such as Apache Flink. This setup has 3 problems for Data Unification:

  1. Managing access control through ACLs is cumbersome and error prone. Access must be given per user, per topic. As the number of users and the number of topics grow, these lists can easily become unmanageable, resulting in security risks. Also, ACLs can’t be applied across multiple Kafka clusters, so access control operations are still siloed to the individual clusters or systems.
  2. Organization of data assets (topics) is flat. Without any namespacing capabilities, there is no way to organize or categorize topics. A flat namespace results in challenges with data discovery and data governance.
  3. Connecting directly to source/sink topics for each stream processing job is redundant and error prone. Writing stream processing applications that interact directly with the storage layer results in a large overhead to configure/maintain permissions. This can easily lead to mistakes in providing the wrong data access, resulting in organizations limiting the set of users that have access to data assets.

In the batch world, products like Databricks and Snowflake address these exact issues. Let’s consider Databricks for example. Databricks’s Unity Catalog provides a hierarchical namespace to Databricks Tables, such that each Table exists within a Catalog and Schema. While the Table is backed by parquet files existing in some S3 location (in the case of using Databricks on AWS), the logical representation of the Table in the Unity Catalog can be namespaced into any Catalog and Schema. This is very similar to the organizational structure of relational databases. Another similarity to relational databases is Databricks’s support of RBAC on their Unity Catalog. A particular Databricks user or team can be authorized access to a Catalog, Schema, or Table. Databricks also allows users to define SQL queries for data processing, which utilizes Apache Spark behind the scenes. As a result of having a Unity Catalog to view all of a user’s Tables, when a user writes such SQL queries, the queries can simply source from or write to Tables in the Unity Catalog. By operating at this logical abstraction layer with the Unity Catalog, the burden of obtaining S3 access, setting up connectivity to S3, and interacting directly with the storage layer is eliminated for users.
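
For illustration, the snippet below shows the three-level namespace and an RBAC-style grant as they might appear in Databricks SQL; the catalog, schema, table, and group names are hypothetical:

  -- catalog.schema.table addressing via the Unity Catalog namespace
  SELECT * FROM main.fleet.truck_telemetry;

  -- privilege granted to a group rather than to individual users
  GRANT SELECT ON TABLE main.fleet.truck_telemetry TO `analysts`;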

When compared to a data streaming system like Kafka, it becomes clear that Kafka is more akin to a physical storage system than a platform like Databricks, which offers products built on top of the storage layer. What is missing in real-time data stream processing is a solution that builds on top of streaming storage systems, such as Kafka and Kinesis, and allows users to organize and represent their streaming data in a single unified data catalog.

DeltaStream as the platform for Streaming Data Unification

DeltaStream is a complete stream processing platform to Unify, Process, and Govern all of your streaming data. Taking after the examples in the batch world, DeltaStream utilizes concepts such as data catalogs and RBAC to provide a unified and governed data platform for real-time streaming data.

DeltaStream can connect to any data streaming storage system, including Kafka, RedPanda, and Kinesis, as well as relational databases, such as PostgreSQL. A Store in DeltaStream is used to define the connection to these storage systems. Once a Store has been defined, users can create Relations that are backed by the data objects in each Store. For example, a Relation can be created that is backed by a topic in a Kafka Store. These Relations are organized into DeltaStream’s Streaming Catalog. The Streaming Catalog has two levels of namespacing in the form of Databases and Schemas. Relations can belong to any Database and Schema, and Relations from different storage systems can be co-located into the same Database and Schema. Since Relations in the Streaming Catalog can be backed by data in different storage systems, the Streaming Catalog becomes the singular unified place to view and interact with all of this data.
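
The sketch below shows the general shape of this workflow, from defining a Store to registering a Relation inside a Database and Schema. It is only an approximation of the DDL involved (see the DeltaStream documentation for the exact syntax), and the store, namespace, and topic names are invented:

  -- Define a connection to a Kafka cluster
  CREATE STORE kafka_prod WITH ('type' = 'KAFKA', 'uris' = 'broker-1:9092');

  -- Namespaces in the Streaming Catalog
  CREATE DATABASE logistics;
  CREATE SCHEMA logistics.fleet;

  -- A Relation backed by a topic in the Kafka Store
  CREATE STREAM logistics.fleet.truck_info (
    truck_id BIGINT,
    temperature_c DOUBLE,
    event_ts TIMESTAMP
  ) WITH ('store' = 'kafka_prod', 'topic' = 'truck_info', 'value.format' = 'JSON');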

With a Streaming Catalog, DeltaStream users can write queries that read from and write to Relations in the Streaming Catalog. Access to the Streaming Catalog is managed through a robust RBAC model that grants privileges only to the data a user or team needs. With RBAC, users can easily and securely share their data assets. By writing DeltaStream queries using Relations, users can simply focus on their business logic rather than dealing with the underlying storage layer connectivity.

Overview of DeltaStream’s Streaming Catalog

Bring Data Unification to your Organization

In this post, we covered what Data Unification is and why it is important in the context of streaming data and stream processing. The part of Data Unification that often gets overlooked is having a unified view in the form of a data catalog. With a unified data catalog, the Data Governance and data processing features built on top of it become simpler and more intuitive to use. This is exactly why DeltaStream not only connects to different data storage systems, but also provides a Streaming Catalog that gives users this unified view of their data.

If you want to learn more about DeltaStream, reach out to us and schedule a demo.

05 Dec 2023

Min Read

Exploring Pattern Recognition using MATCH_RECOGNIZE

Pattern recognition is a common use case in data processing. Detecting trend reversals, identifying anomalies, and finding sequences in data are all examples of pattern recognition problems. In SQL, Row Pattern Recognition (RPR) became part of the SQL standard in 2016 (ISO/IEC 9075:2016) and introduced the MATCH_RECOGNIZE syntax. Using this syntax, users can write concise SQL queries to solve pattern recognition problems.

While pattern recognition is an important challenge to solve in the batch world, there are also many pattern recognition use cases in the real-time context. That’s why, as a leading streaming platform, DeltaStream supports MATCH_RECOGNIZE in its SQL syntax.

In our previous blog post Analyzing Real-Time NYC Bus Data with DeltaStream, we showed how we could write a SQL query in DeltaStream to solve the pattern recognition use case of detecting bus trips where the delays are significantly increasing. In this blog post we will do a deep dive into that query and explain the purpose and meaning behind each line.

DSQL MATCH_RECOGNIZE Query

Below is the same pattern recognition SQL query from our Analyzing Real-Time NYC Bus Data with DeltaStream post:

  1. CREATE STREAM trips_delay_increasing AS
  2. SELECT
  3. trip,
  4. vehicle,
  5. start_delay,
  6. end_delay,
  7. start_ts,
  8. end_ts,
  9. CAST(FROM_UNIXTIME((start_epoch_secs + end_epoch_secs) / 2) AS TIMESTAMP) AS avg_ts
  10. FROM trip_updates
  11. MATCH_RECOGNIZE(
  12. PARTITION BY trip
  13. ORDER BY "ts"
  14. MEASURES
  15. C.row_timestamp AS row_timestamp,
  16. C.row_key AS row_key,
  17. C.row_metadata AS row_metadata,
  18. C.vehicle AS vehicle,
  19. A.delay AS start_delay,
  20. C.delay AS end_delay,
  21. A.ts AS start_ts,
  22. C.ts AS end_ts,
  23. A.epoch_secs AS start_epoch_secs,
  24. C.epoch_secs AS end_epoch_secs
  25. ONE ROW PER MATCH
  26. AFTER MATCH SKIP TO LAST C
  27. PATTERN (A B C)
  28. DEFINE
  29. A AS delay > 0,
  30. B AS delay > A.delay + 30,
  31. C AS delay > B.delay + 30
  32. ) AS MR WITH ('timestamp'='ts')
  33. QUERY WITH ('state.ttl.millis'='3600000');

Breakdown of MATCH_RECOGNIZE Query

First of all, you can find all of our documentation on MATCH_RECOGNIZE here. In this section, we’ll break down the above query and discuss the thought process behind each line.

In this query, the source Stream is bus trip updates with a field called delay which represents the number of seconds a bus is delayed on its current route. The goal of the query is to output bus trips where the delay is significantly increasing so that we can get a better understanding of when buses will actually arrive at their upcoming stops.

CSAS (lines 1-2)

This query is what we call a CREATE STREAM AS SELECT (CSAS) query, where we are creating a new Stream that will be the output of running the continuous SELECT statement. Running a continuous query will launch a long-lived stream processing job behind the scenes and write the results to the physical storage that is backing the new Stream. As explained in the original blog post Analyzing Real-Time NYC Bus Data with DeltaStream, a Kafka topic is the physical storage layer that is backing this Stream.

Projection (lines 2-9)

The first few lines of the SELECT query are the fields that are being projected for the resulting Stream trips_delay_increasing. These fields are made available by the MEASURES and PARTITION BY clauses in the MATCH_RECOGNIZE statement. The following fields are being projected:

  • trip and vehicle identify the particular bus trip that is experiencing increasing delays
  • start_delay, start_ts, end_delay, and end_ts give insights into the pattern that was matched and how fast delays are increasing
  • avg_ts is the midpoint between start_epoch_secs and end_epoch_secs, which also represents the midpoint of the matched pattern. To evaluate this field, we use built-in functions to convert from epoch seconds, an INTEGER, into a TIMESTAMP. In the original blog post, this field was used in a follow-up query to find the positions of the buses during the time that the delays were increasing.

Source (lines 10-11, 32)

The FROM clause defines the source of data for the query. In this query, we are sourcing from the result of the MATCH_RECOGNIZE clause on the trip_updates Stream. This source also has a WITH clause to specify a timestamp field, ts, which is a field in the trip_updates Stream. By specifying the timestamp field, we are setting the event time of incoming records to be equal to the value of that field, so later on in the ORDER BY clause we can use the correct timestamp for ordering events in our patterns.

MATCH_RECOGNIZE – PARTITION BY (line 12)

The first line in the MATCH_RECOGNIZE query is the optional PARTITION BY clause. This subclause groups the underlying data based on the partitioning expression, and optimizes DeltaStream’s compute resources for parallel processing of the source data. In this query, we are partitioning the events by the trip field, so any detected patterns are unique to a particular bus trip. In other words, we want to know for each particular bus trip if there are increasing delays. The PARTITION BY clause is necessary for our use case because the delays for one bus trip are not relevant to other bus trips. Note that the fields in the PARTITION BY clause are available for projection, as shown in this query by selecting the trip field in the SELECT statement.

MATCH_RECOGNIZE – ORDER BY (line 13)

The ORDER BY clause is required for MATCH_RECOGNIZE. This clause defines the order in which the data should be sorted for each partition before they are evaluated for matched patterns. For our query, the ordering field is ts, so bus trip updates will be ordered in ascending order according to the value of the ts field. One requirement for the ORDER BY subclause is that the ordering field must be the same as the timestamp field for the source Relation, which is set on line 32 (also mentioned in the Source section above).

MATCH_RECOGNIZE – MEASURES (lines 14-24)

The MEASURES clause defines the output schema for the MATCH_RECOGNIZE statement and has a similar meaning to a SELECT clause. The fields specified in the MEASURES subclause are made available to the SELECT clause. The MEASURES subclause and the DEFINE subclause (lines 28-31) are closely related, as the MEASURES subclause projects fields from rows defined by the DEFINE subclause. For example, on line 19 we define start_delay as A.delay. In this case, the delay for the row matching A’s definition is being projected as start_delay, whereas on line 20 the delay for the row matching C’s definition is being projected as end_delay. There are 3 fields in the MEASURES subclause that aren’t being used in our query’s SELECT clause. These are the row metadata columns – row_timestamp, row_key, and row_metadata. Since the MATCH_RECOGNIZE operator alters the projection columns of its input Relation, a PATTERN variable must be chosen for these special fields as part of the MEASURES subclause, which we do on lines 15-17. See the MATCH_RECOGNIZE – PATTERN section below for more information on the PATTERN variables.

MATCH_RECOGNIZE – Output Mode (line 25)

ONE ROW PER MATCH is the only supported output mode, which means for a given sequence of events that matches our pattern, we should output one result. So in our query, for a bus trip with significantly increasing delays, we should output one event for this trip.

MATCH_RECOGNIZE – AFTER MATCH strategy (line 26)

The after match strategy defines where to begin looking for the next pattern. In this query, we specify AFTER MATCH SKIP TO LAST C, which means after finding a pattern match, look for the next pattern match starting with the last event of the current match. Other match strategies could inform our query to start looking for the next pattern starting from a different event, such as the event after a pattern match. However in our case, we want to make sure we are capturing continuous patterns. Specifically for our query, the pattern that we are looking for is 3 trip updates with increases in delay in a row (see MATCH_RECOGNIZE – PATTERN section below). So, if there was a bus trip with 5 trip updates with strictly increasing delay, then there would be 2 results from our MATCH_RECOGNIZE statement with our after match strategy. The first result would be for updates 1-3, and the second result for updates 3-5. The first match’s C would also be the second match’s A in this case. See other after match strategies in our documentation.
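
To make the strategy concrete, suppose a single trip emits five updates with delays of 40, 80, 120, 160, and 200 seconds (a hypothetical sequence that satisfies the DEFINE conditions covered below):

  • Updates 1-3 (delays 40, 80, 120) match as A, B, and C, producing the first result.
  • With SKIP TO LAST C, matching resumes at update 3, so updates 3-5 (delays 120, 160, 200) match as A, B, and C, producing the second result.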

MATCH_RECOGNIZE – PATTERN (line 27)

The PATTERN clause specifies the pattern of events that should be considered a match. The PATTERN subclause contains pattern variables, which can each be associated with a quantifier to specify how many rows of that variable to allow in the pattern (see the documentation for more details). In our query, we have a simple pattern of (A B C) without any quantifiers, meaning that in order for a series of events to be considered a match, there needs to be 3 consecutive events with the first one matching A’s definition, the second one matching B’s definition, and the third one matching C’s definition.

MATCH_RECOGNIZE – DEFINE (lines 28-31)

The DEFINE subclause defines each of the pattern variables from the PATTERN subclause. If a variable is not defined, then it will evaluate to true for an event contributing to a pattern match. This clause is similar to the WHERE clause in SQL, in that it specifies conditions an event must meet in order to be considered as one of the pattern variables. When defining these pattern variables, we can access fields from the original events of the source Relation, in our case the trip_updates Stream, and define expressions for evaluation. Aggregation and offset functions can also be applied here. In our query, we are defining our pattern variables based on their delay. For the first event in our pattern, A, we want bus trips that are already delayed (a delay greater than 0 seconds). B is defined as an event that is more than 30 seconds more delayed than A, and similarly C is defined as an event that is more than 30 seconds more delayed than B. Combined with our PATTERN of (A B C), our query is essentially finding patterns where the delay increases by more than 30 seconds with each trip update for 3 trip updates in a row.

QUERY WITH (line 33)

The last line of our query is the QUERY WITH clause. This optional clause is used to set query properties. For our query, we are setting the state.ttl.millis property, which informs the query when it is safe to purge its state. Another way to limit the state size for MATCH_RECOGNIZE queries is the optional WITHIN subclause, which specifies a maximum duration for patterns. Without specifying some way for the query to purge state, the amount of information the query needs to keep in memory will grow indefinitely and the query will eventually fail.
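
For example, bounding each pattern to a one-hour window would mean changing line 27 of the query to something like the following (this mirrors the WITHIN syntax found in Flink SQL; check the DeltaStream documentation for the exact form supported):

  PATTERN (A B C) WITHIN INTERVAL '1' HOUR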

Conclusion

Queries using the MATCH_RECOGNIZE syntax can seem complex at first glance. This blog post aims to bring clarity to the different parts of a MATCH_RECOGNIZE query by walking through a specific, easy-to-follow example. Hopefully this post makes pattern recognition queries easier to understand, as MATCH_RECOGNIZE is a powerful operator that can solve many use cases.

If you’re interested in trying out MATCH_RECOGNIZE queries in DeltaStream yourself, sign up for a free trial or schedule a demo with us.

29 Nov 2023

Min Read

Top 3 Challenges of Stream Processing and How to Address Them

Real-time shipment tracking, truck fleet management, real-time training of ML models, and fraud detection are all real use-cases that are powered by streaming technologies. As companies try to keep up with the trends of building real-time products, they require a data stack capable of handling and processing streaming data. Unlocking stream processing allows businesses to do advanced data analytics at low latencies, which ultimately allows for faster business insights and more informed decision making. However, many of the popular stream processing frameworks, like Apache Flink, are quite complex and can be challenging to operate. In this blog post we’ll cover 3 of the main challenges with operating a stream processing platform and how to address them.

Challenge 1: Resource Management

Stream processing jobs are long lived in order to continuously process data. Because these jobs are long lived, it’s important that your stream processing platform properly allocates resources for the stream processing jobs. On one hand, over-allocating resources will result in overspending, while on the other hand, under-allocating resources will cause jobs to fall behind or fail. The proper allocation for stream processing jobs varies case by case. For instance, jobs with high throughputs that need to hold a lot of state should receive more memory, while jobs with complex transformations should receive more CPU. Many workloads can also fluctuate and a dynamic allocation of resources in such cases is necessary to match the workload. Take a stream of data for page visits on a website for example – it’s likely that the website will be visited more during the day when people aren’t asleep. So, stream processing jobs that source from this data should scale up during the day, then scale down for the night.

Exposing resource utilization metrics is an important first step toward tackling the challenges of resource management. Having visibility into the resource utilization trends of your jobs allows your stream processing platform to define rules for resource allocation. In the simplest case, if your job’s resource utilization is stable, you can allocate resources to match what’s shown in the metrics. For jobs with predictable fluctuations, such as a job sourcing from data that peaks during the day and dips during the night, you can set up a system that adjusts the resource allocation on a schedule. In the most complex case, for jobs with unpredictable resource fluctuations, the best approach is to add an auto-scaling service that can automatically resize compute resources based on resource metrics. Building a platform that exposes the right metrics, can safely resize your stream processing jobs, and includes an auto-scaling service is necessary to support stream processing workloads generically, but it can take a lot of engineering time and effort. If building these systems is too costly an engineering investment, you can also consider fully managed third-party solutions that can partially or fully address these challenges.

Challenge 2: Data Heterogeneity

For production use cases, data can come from many different sources and in many different formats. Streaming data coming from sensors, cloud providers, databases, and backend services will differ from one another, which makes the data difficult to compare. It is not easy to create a stream processing platform that can handle various data formats and quality levels. The engineering team that supports this platform will need to understand the nuances of different data sources and provide tools and features that help make variable data more uniform. However, such a platform creates many possibilities, as businesses can use data from sources that were previously isolated.

Standardizing the data across your organization and implementing quality control over the data are the best solutions for dealing with data heterogeneity. Providing data standards and encouraging data schematization at the data’s source is the best practice, but if that’s not possible, stream processing can also help to transform a set of data into a standardized format that can be easily processed and integrated with other data streams. Stream processing platforms that enable users to filter out bad or duplicate data, enrich data with missing fields, and mutate data to fit standardized data formats can mitigate many of the issues of variable data.
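
As a hedged illustration of that last point, a CSAS could normalize a heterogeneous sensor feed into a standard shape; the Stream and field names below are hypothetical:

  CREATE STREAM sensor_readings_standardized AS
  SELECT
    device_id,
    LOWER(sensor_type) AS sensor_type,
    (reading_f - 32) * 5.0 / 9.0 AS reading_c,   -- normalize Fahrenheit readings to Celsius
    CAST(event_time AS TIMESTAMP) AS event_ts
  FROM raw_sensor_readings
  WHERE device_id IS NOT NULL;                   -- drop records missing a required key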

One tip for dealing with data of variable quality is to provide different configurations for error handling. For many stream processing frameworks, if there is a record that a job doesn’t know how to deserialize or make sense of, the job will simply fail. For data sources that don’t have great data integrity, options to skip over those records or produce them to a dead-letter queue for further analysis can be better choices for your overall application.
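
Dead-letter handling for records that fail deserialization is typically a job- or framework-level configuration, but a similar idea can be approximated in SQL for records that parse yet fail validation. A rough sketch, again with hypothetical names and thresholds:

  -- Valid readings continue down the main pipeline
  CREATE STREAM sensor_readings_valid AS
  SELECT * FROM sensor_readings_standardized
  WHERE reading_c BETWEEN -90.0 AND 60.0;

  -- Suspect readings are routed to a separate Stream for later analysis
  CREATE STREAM sensor_readings_suspect AS
  SELECT * FROM sensor_readings_standardized
  WHERE reading_c IS NULL OR reading_c < -90.0 OR reading_c > 60.0;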

Challenge 3: Streaming Knowledge Gaps

Most stream processing frameworks are highly complex pieces of software. Building up the expertise to understand the challenges of streaming versus the traditional challenges of the batch world takes time. For some organizations, having engineers ramp up on streaming technologies may not be a worthwhile or affordable investment. For organizations that do invest in a team of streaming experts, a knowledge gap between the streaming team and other teams often forms. Even with a stream processing platform available to them, product engineers may not have much exposure to streaming concepts or know how to best leverage the streaming tools available to them. In these cases, engineers working on product features or applications may require a lot of back and forth with the team of streaming engineers to deliver their projects, or they may not realize the benefits of adding stream processing to their projects in the first place. These situations can lead to lost business potential and reduced developer velocity.

Two ways to help address these challenges are investing in developer education and democratizing the streaming and stream processing platforms. Setting up regular knowledge sharing sessions and encouraging collaboration between teams can go a long way toward reducing knowledge gaps. From the platform perspective, democratizing streaming and stream processing by making these platforms easy to use lowers the barrier to entry for these technologies. Popular stream processing frameworks such as Flink and Spark Streaming have SQL APIs for defining data processing jobs. Exposing SQL to abstract away some of the complexities of the underlying system is one way to make the platform easier to use.
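
For example, a windowed count that might take a significant amount of code against a lower-level streaming API can be expressed in a few lines of SQL. The snippet below uses a generic group-window style; exact windowing syntax varies by engine, and the Stream and field names are hypothetical:

  CREATE STREAM page_visit_counts AS
  SELECT
    page_id,
    COUNT(*) AS visits
  FROM page_visits
  GROUP BY page_id, TUMBLE(visit_ts, INTERVAL '1' MINUTE);   -- one count per page per minute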

Conclusion

In this blog post we highlighted 3 of the main challenges we’ve seen organizations face when building and operating their own stream processing platforms. Overcoming each of these challenges requires engineering time and effort. While some organizations may be able to spend the upfront time and money to build their own in-house data streaming platforms, others may not. This is where fully managed cloud services, such as DeltaStream, can help.

Our aim at DeltaStream is to provide an easy to use data streaming platform to unify, process, and govern your data. Here’s how DeltaStream addresses each of the challenges above:

  1. Resource Management: DeltaStream is a serverless platform, meaning resource scaling and operations are completely taken care of. No cluster sizing or resource provisioning.
  2. Data Heterogeneity: Out of the box, DeltaStream has support for all major data serialization formats – JSON, ProtoBuf, and Avro. DeltaStream also has native support for many data storage systems including Kafka, Kinesis, Delta Lake, Snowflake, and PostgreSQL. DeltaStream’s rich processing capabilities also allow users to filter, enrich, and transform data to mitigate data quality issues.
  3. Streaming Knowledge Gaps: DeltaStream exposes an easy to use SQL interface for interacting with all streaming resources and streaming queries. Tools for sampling data and testing queries are also provided to help users iterate faster.

If you want to learn more about how DeltaStream can enable stream processing in your organization, schedule a demo with us. If you want to try DeltaStream out yourself, sign up for a free trial.
