07 Nov 2023

Open Sourcing our Apache Flink + Snowflake Connector

At DeltaStream, our mission is to bring a serverless and unified view of all streams to make stream processing possible for any product use case. By using Apache Flink as our underlying processing engine, we have been able to leverage its rich connector ecosystem to connect to many different data systems, breaking down the barriers of siloed data. As we mentioned in our Building Upon Apache Flink for Better Stream Processing article, for DeltaStream, using Apache Flink is about more than adopting robust software with a good track record. Flink has allowed us to iterate faster on the improvements and issues that arise from solving the latest and greatest data engineering challenges. However, one connector that was missing until today was the Snowflake connector.

Today, in our efforts to make solving data challenges possible, we are open sourcing our Apache Flink sink connector built for writing data to Snowflake. This connector has already provided DeltaStream with native integration between other sources of data and Snowflake. This also aligns well with our vision of providing a unified view over all data, and we want to open this project up for public use and contribution so that others in the Flink community can benefit from this connector as well.

The open source repository will be open for any contributions, suggestions, or discussions. In this article, we touch on some of the highlights around this new Flink connector.

Utilizing the Snowflake Sink

The connector uses Flink's latest Sink<InputT> and SinkWriter<InputT> interfaces to build the Snowflake sink and to write data to a configurable Snowflake table, respectively:

Diagram 1: Each SnowflakeSinkWriter inserts rows into the Snowflake table using its own dedicated ingest channel
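
For context, the sketch below shows the general shape of Flink's Sink / SinkWriter contract from the org.apache.flink.api.connector.sink2 package that the connector builds on. This is illustrative only; the connector's actual SnowflakeSink and SnowflakeSinkWriter classes add buffering, serialization, and channel management that are not shown here.

import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.sink2.SinkWriter;

// A minimal sketch of the Sink / SinkWriter contract, not the connector's code.
public class SketchSink<InputT> implements Sink<InputT> {

    @Override
    public SinkWriter<InputT> createWriter(InitContext context) {
        // Flink creates one writer per parallel sink task; in the Snowflake
        // connector each writer owns a dedicated Snowpipe Streaming channel.
        return new SinkWriter<InputT>() {
            @Override
            public void write(InputT element, Context context) {
                // Serialize the element into a Snowflake row and buffer it.
            }

            @Override
            public void flush(boolean endOfInput) {
                // Push buffered rows to Snowflake through the ingest channel.
            }

            @Override
            public void close() {
                // Release the writer's resources (e.g. its ingest channel).
            }
        };
    }
}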

The Snowflake sink connector can be configured with a parallelism greater than 1, where each task relies on the order in which it receives data from its upstream operator. For example, the following shows how data can be written with a parallelism of 3:

// dataStream is a DataStream<InputT> and sf_sink is the connector's SnowflakeSink<InputT>
dataStream.sinkTo(sf_sink).setParallelism(3);

Diagram 1 shows the flow of data between the TaskManager(s) and the destination Snowflake table. The diagram is heavily simplified to focus on the concrete SnowflakeSinkWriter<InputT>, and it shows that each sink task connects to the Snowflake table using a dedicated SnowflakeStreamingIngestChannel from the Snowpipe Streaming API.
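
For readers unfamiliar with Snowpipe Streaming, the sketch below shows roughly how a dedicated ingest channel is opened and written to with Snowflake's ingest SDK (net.snowflake.ingest.streaming). The connection properties, database, schema, table, and column names are placeholders, and the connector encapsulates this logic inside each SnowflakeSinkWriter rather than exposing it directly.

import java.util.Map;
import java.util.Properties;

import net.snowflake.ingest.streaming.InsertValidationResponse;
import net.snowflake.ingest.streaming.OpenChannelRequest;
import net.snowflake.ingest.streaming.SnowflakeStreamingIngestChannel;
import net.snowflake.ingest.streaming.SnowflakeStreamingIngestClient;
import net.snowflake.ingest.streaming.SnowflakeStreamingIngestClientFactory;

public class IngestChannelSketch {

    public static void main(String[] args) throws Exception {
        // Placeholder key-pair connection properties for the ingest SDK.
        Properties props = new Properties();
        props.put("url", "https://<account_identifier>.snowflakecomputing.com");
        props.put("user", "<account_username>");
        props.put("private_key", "<private_key>");
        props.put("role", "<role_name>");

        SnowflakeStreamingIngestClient client =
            SnowflakeStreamingIngestClientFactory.builder("sketch_client").setProperties(props).build();

        // One channel per parallel sink writer, all targeting the same table.
        OpenChannelRequest request = OpenChannelRequest.builder("writer_0_channel")
            .setDBName("MY_DB")
            .setSchemaName("PUBLIC")
            .setTableName("MY_TABLE")
            .setOnErrorOption(OpenChannelRequest.OnErrorOption.CONTINUE)
            .build();
        SnowflakeStreamingIngestChannel channel = client.openChannel(request);

        // Rows are plain Map<String, Object> values keyed by column name.
        Map<String, Object> row = Map.of("C1", 1L, "C2", "hello");
        InsertValidationResponse response = channel.insertRow(row, /* offsetToken */ "1");
        if (response.hasErrors()) {
            throw new RuntimeException("row was rejected by the ingest channel");
        }

        channel.close().get();
        client.close();
    }
}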

The SnowflakeSink<InputT> is also shipped with a generic SnowflakeRowSerializationSchema<T> interface that allows each implementation of the sink to provide its own concrete serialization to a Snowflake row of Map<String, Object> based on a given use case.
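
As an illustration of how such a schema could be implemented, here is a minimal hypothetical example; the input type Transaction and the method name serialize are assumptions for this sketch and may not match the interface's exact signature in the connector.

import java.util.HashMap;
import java.util.Map;

// Import of SnowflakeRowSerializationSchema from the connector package omitted.

// Hypothetical input type used only for this sketch.
record Transaction(String ccType, double amount) {}

// Sketch: convert an input record into a Snowflake row keyed by column name.
// The actual method name/signature in the connector may differ; serialize(...)
// is assumed here for illustration.
class TransactionRowSerializationSchema
        implements SnowflakeRowSerializationSchema<Transaction> {

    @Override
    public Map<String, Object> serialize(Transaction tx) {
        Map<String, Object> row = new HashMap<>();
        row.put("CC_TYPE", tx.ccType());
        row.put("AMOUNT", tx.amount());
        return row;
    }
}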

Write Records At Least Once

The first version of the Snowflake sink can write data into Snowflake tables with the delivery guarantee of NONE or AT_LEAST_ONCE, using AT_LEAST_ONCE by default. Supporting EXACTLY_ONCE semantics is a goal for a future version of this connector.
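
The delivery guarantee is presumably selected when building the sink. The sketch below is a hypothetical builder call: the setter name withDeliveryGuarantee is an assumption for illustration, while DeliveryGuarantee itself is Flink's standard org.apache.flink.connector.base.DeliveryGuarantee enum.

import java.util.Map;

import org.apache.flink.connector.base.DeliveryGuarantee;

// Import of SnowflakeSink from the connector package omitted; the setter name
// below is hypothetical and may differ from the connector's actual builder API.
public class DeliveryGuaranteeSketch {

    static SnowflakeSink<Map<String, Object>> buildSink(String jobId) {
        return SnowflakeSink.<Map<String, Object>>builder()
            // AT_LEAST_ONCE is the default; NONE is also supported.
            .withDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
            .build(jobId);
    }
}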

The sink writes data into its destination table after buffering records for a fixed time interval. This buffering interval is also bounded by Flink's checkpointing interval, which is configured as part of the StreamExecutionEnvironment. In other words, if Flink's checkpointing interval and the buffering time are configured with different values, records are flushed at the shorter of the two intervals:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(100L);
SnowflakeSink<Map<String, Object>> sf_sink = SnowflakeSink.<Map<String, Object>>builder()
    .bufferTimeMillis(1000L)
    .build(jobId);
env.fromSequence(1, 10).map(new SfRowMapFunction()).sinkTo(sf_sink);
env.execute();

In this example, the checkpointing interval is set to 100 milliseconds and the buffering interval is configured as 1 second, which tells the Flink job to flush records at least every 100 milliseconds, i.e. on every checkpoint.

Read more about Snowpipe Streaming best practices in the Snowflake documentation.

The Flink Community, to Infinity and Beyond

We are very excited about the opportunity to contribute our Snowflake connector to the Flink community. We’re hoping that this connector will add more value to the rich connector ecosystem of Flink that’s powering many data application use cases.

If you want to check out the connector for yourself, head over to the GitHub repository, or if you want to learn more about DeltaStream’s integration with Snowflake, read our Snowflake integration blog.

16 Oct 2023

Always Fresh: Snowflake and DeltaStream Integrate

Snowflake is a popular data warehouse destination for streaming data. It allows users to process large amounts of data for a variety of purposes, such as dashboards, machine learning, and applications. However, to keep these use cases up to date, users need to keep the data fresh. DeltaStream’s Snowflake integration makes it easy to keep your data fresh in Snowflake with a simple SQL query.

How can I use Snowflake with DeltaStream?

Our new Snowflake integration provides a way to write pre-processed data into Snowflake without the need to store the data in intermediate storage.

Using DeltaStream queries, source data can be transformed into the shape that best suits your Snowflake tables; the query then uses your Snowflake Store to continuously write the resulting data into the target Snowflake table.

DeltaStream’s new Snowflake Store uses the Snowpipe Streaming APIs to provide the best streaming experience to your Snowflake account without any data loss. Stay tuned for more information on this in the upcoming blog posts.

How DeltaStream integrates with Snowflake

Creating a Snowflake Store

In DeltaStream, a Snowflake Store can be created for connectivity with a specific Snowflake Account. Use the SNOWFLAKE store type to create a new Snowflake Store:

CREATE STORE sf WITH (
  'type' = SNOWFLAKE,
  'uris' = 'https://<account_identifier>.snowflakecomputing.com',
  'access_region' = 'AWS us-east-1',
  'snowflake.account_id' = '<account_identifier>',
  'snowflake.username' = '<account_username>',
  'snowflake.role_name' = 'ACCOUNTADMIN',
  'snowflake.client.key_file' = '/path/to/key.p8',
  'snowflake.client.key_passphrase' = '<key_passphrase_if_any>',
  'snowflake.warehouse_name' = '<warehouse_name>'
);

As with any other Store creation in DeltaStream, the properties.file parameter can be used to offload extra store information to a properties file. See Store Parameters for more information.

Once the Snowflake Store is created, you can perform CRUD operations on databases, schemas, and tables in the specified Snowflake account without leaving your DeltaStream account.

For example, you can list available Snowflake databases:

main_db.public/sf# LIST ENTITIES;
   Entity name
-------------------------
  DELTA_STREAMING
  SNOWFLAKE
  SNOWFLAKE_SAMPLE_DATA

Or operate on the Snowflake schemas and tables:

main_db.public/sf# LIST ENTITIES IN "DELTA_STREAMING";
   Entity name
----------------------
  INFORMATION_SCHEMA
  PUBLIC

main_db.public/sf# CREATE ENTITY "DELTA_STREAMING"."MY_SCHEMA";
main_db.public/sf# LIST ENTITIES IN "DELTA_STREAMING";
   Entity name
----------------------
  INFORMATION_SCHEMA
  MY_SCHEMA
  PUBLIC

main_db.public/sf# LIST ENTITIES IN "DELTA_STREAMING"."PUBLIC";
   Entity name
-------------------
  STREAM_DATA_TBL
  sf_pv_table

The Snowflake entities can also be described for additional information:

main_db.public/sf# DESCRIBE ENTITY "DELTA_STREAMING"."PUBLIC".sf_pv_table;
  Type  |                  Name                  |      Created at      |     Last DDL At      |       Last DDL By       | Retention time |      Updated At      | Comment
--------+----------------------------------------+----------------------+----------------------+-------------------------+----------------+----------------------+----------
  Table | "DELTA_STREAMING"."PUBLIC".sf_pv_table | 2023-09-05T22:26:24Z | 2023-09-05T22:26:24Z | SNOWPIPE_STREAMING_USER | 1              | 2023-09-05T22:26:25Z |

Columns:
    Name   |       Type        | Nullable | Policy name | Primary key | Unique key | Comment
-----------+-------------------+----------+-------------+-------------+------------+----------
  VIEWTIME | NUMBER(38,0)      | ✓        |             |             |            |
  USERID   | VARCHAR(16777216) | ✓        |             |             |            |
  PAGEID   | VARCHAR(16777216) | ✓        |             |             |            |

Materializing Streaming Data with Snowflake

Once your Snowflake Store has been created, you are ready to write queries that’ll enable you to apply any necessary computation to an upstream stream of data and continuously write the result of that computation to a corresponding Snowflake table.

Let’s take a look at a query where we count the number of transactions per type of credit card:

CREATE TABLE "CC_TYPE_USAGE" WITH (
  'snowflake.db.name' = 'DELTA_STREAMING',
  'snowflake.schema.name' = 'PUBLIC'
) AS SELECT
  cc_type AS "CREDIT_TYPE",
  COUNT(*) AS "TOTAL_SO_FAR"
FROM transactions
GROUP BY cc_type;
In this example, we're using a CREATE TABLE AS SELECT (CTAS) query to let the system know that we want to create a Table in Snowflake under the "DELTA_STREAMING"."PUBLIC" namespace with the name CC_TYPE_USAGE. Once the CTAS query is created, we can describe the new Snowflake table of the same name:

main_db.public/sf# DESCRIBE ENTITY "DELTA_STREAMING"."PUBLIC"."CC_TYPE_USAGE";
  Type  |                    Name                    |      Created at      |     Last DDL At      |  Last DDL By   | Retention time |      Updated At      | Comment
--------+--------------------------------------------+----------------------+----------------------+----------------+----------------+----------------------+----------
  Table | "DELTA_STREAMING"."PUBLIC"."CC_TYPE_USAGE" | 2023-09-06T22:23:18Z | 2023-09-06T22:23:18Z | SFACCOUNTOWNER | 1              | 2023-09-06T22:23:18Z |

Columns:
      Name     |       Type        | Nullable | Policy name | Primary key | Unique key | Comment
---------------+-------------------+----------+-------------+-------------+------------+----------
  CREDIT_TYPE  | VARCHAR(16777216) | ✓        |             |             |            |
  TOTAL_SO_FAR | NUMBER(38,0)      | ✓        |             |             |            |
By creating the above CTAS query, we have started a process that continuously counts the number of transactions per credit card type used in our product. The query appends a new row to the CC_TYPE_USAGE table for every updated count of a specific cc_type, so we can use the latest count for each type to perform the required analysis for the business.

The result set can be previewed in the CLI:

main_db.public/sf# PRINT ENTITY "DELTA_STREAMING"."PUBLIC"."CC_TYPE_USAGE";
  CREDIT_TYPE | TOTAL_SO_FAR
--------------+---------------
  VISA        | 1
  VISA        | 2
  DISC        | 1
Modern Streaming and Analytics

At DeltaStream we believe in providing a modern and unified stream processing solution that can unlock the full potential of your data, leading to improved operational efficiency, data security, and ultimately a competitive advantage for every organization when developing products. We hope that with the new Snowflake integration we open yet another door for providing data unity across your organization.

If you have any questions or want to try out this integration, reach out to us or request a free trial!

22 May 2023

Denormalizing Distributed Datasets in Real-Time

While a distributed data mesh empowers teams in a company to securely build modern applications by reducing data dependencies, it also poses challenges for non-product teams. Certain teams within a company may require access to anonymous and denormalized data to further grow the business. In this post, we will take a look at how such teams can use DeltaStream to capture the data they need to do their work, while the data owners retain control over the security of their data.

Training Machine Learning Models

For the purpose of this exercise, let’s assume a Machine Learning team needs access to anonymous user data for building models to reduce fraud in a financial institution based on frequency and location of payments made to an account. This team stores their data in a topic in an Apache Kafka cluster that is declared as a Store in DeltaStream:

mldb.product/mlai_msk# LIST STORES;
    Name    |  Kind  | Access Region | Metadata |  Owner   |      Created at      |      Updated at
------------+--------+---------------+----------+----------+----------------------+-----------------------
  mlai_msk  | Kafka  | AWS us-east-1 | {}       | sysadmin | 2023-01-12T20:38:16Z | 2023-01-12T20:38:16Z

and we already have access to the payments made by `payerid` over time:

CREATE STREAM payments_log (
  paymenttime BIGINT,
  payerid VARCHAR,
  accountid VARCHAR,
  paymentid VARCHAR
) WITH ('topic'='topic_name', 'value.format'='JSON');

DDL 1: `payments_log` definition for the `payments_log` Kafka topic

`DDL 1` defines the running log of each payment through the product, created using the `CREATE STREAM` statement. The `payments_log` references the `accountid` that is the recipient of each payment and a `paymentid` that includes extra payment information.

In addition to the frequency of payments made to a specific `accountid`, we also need to take into account the location the payments are made from, so the training model can better detect anomalies over time. We will expand on this in the next section.

Sharing Anonymous User Data

As the stream of payments is provided in the `payments_log` Stream above, we need to securely denormalize the `payerid` field to also include where the payments are coming from, without exposing users' sensitive information. This can be done by the team that owns the additional payer information, identified by a `userid` and described by the following Changelog in the `userdb.product` Schema:

CREATE CHANGELOG userdb.product.users_log (
  registertime BIGINT,
  userid VARCHAR,
  regionid VARCHAR,
  contactinfo STRUCT<email VARCHAR, phone VARCHAR, city VARCHAR, country VARCHAR>,
  PRIMARY KEY(userid)
) WITH ('topic'='users', 'value.format'='json');

For simplicity, let's assume all payers are registered as users of the product. At this point, only the users team has access to the `userdb` Database, so for data security reasons the `users_log` is not accessible to the Machine Learning team. The users team is the owner of the `payments_log` data, so they have usage permissions and can read from and write to that Stream.

Using the following query, we can provide the anonymous user information to the Machine Learning team in real time:

CREATE STREAM payments_location
AS SELECT
  p.paymenttime AS paytime,
  u.registertime AS payer_register_time,
  u.regionid AS region,
  u.contactinfo->city AS payment_city,
  u.contactinfo->country AS payment_country,
  p.accountid AS payee,
  p.paymentid AS paymentid
FROM payments_log p
JOIN users_log u ON u.userid = p.payerid;

Query 1: Enrich payments with anonymous payer location info with a temporal join on `users_log.userid`

In `Query 1`, we look up the payer, represented by `payerid`, in the `users_log` Changelog identified by `userid`. While doing so, we omit `userid`, `contactinfo.email`, and `contactinfo.phone`, as they were identified as Personally Identifiable Information (PII) by the users team, preventing this data from leaking outside of the `userdb` Database.

As a result of `Query 1`, a new `payments_location` Stream is created that provides the location information for each payment made to an account in addition to the existing payment information:

CREATE STREAM payments_location (
  paytime BIGINT,
  payer_register_time BIGINT,
  region VARCHAR,
  payment_city VARCHAR,
  payment_country VARCHAR,
  payee VARCHAR,
  paymentid VARCHAR
) WITH ('topic'='topicname', 'value.format'='json');

DDL 2: Underlying DDL for the denormalized `payments_location` in `Query 1`

The `DDL 2` statement reveals how the `payments_location` Stream was created when `Query 1` was launched.

Model Training with Real-Time Data

Now, let's assume that additional payment information can be looked up through the `paymentid` field, and that, by inspecting the `payments` Stream, we find its `chargeinfo` structure to be very useful to our fraud detection model:

CREATE STREAM payments (
  id VARCHAR,
  chargeinfo STRUCT<vcc VARCHAR, amount FLOAT, type VARCHAR>,
  payer VARCHAR,
  payee VARCHAR,
  paymenttime BIGINT
) WITH ('topic'='topicname', 'value.format'='json');

Using the `payments` DDL, the following query can be created to continuously provide the additional charge information to the ML team:

CREATE STREAM payments_full
AS SELECT
  pl.paytime AS paytime,
  pl.payer_register_time AS payer_register_time,
  pl.region AS region,
  pl.payment_city AS payment_city,
  pl.payment_country AS payment_country,
  pl.payee AS payee,
  p.chargeinfo AS charge
FROM payments_location pl
JOIN payments p ON p.id = pl.paymentid;

Query 2: Denormalize payment ID into charge information

In `Query 2`, we directly replaced the `paymentid` reference with the charge information to allow the model training pipeline to get the full picture for finding payment anomalies that may be occurring within our product. As a result, the `payments_full` Stream is created as such:

CREATE STREAM payments_full (
  paytime BIGINT,
  payer_register_time BIGINT,
  region VARCHAR,
  payment_city VARCHAR,
  payment_country VARCHAR,
  payee VARCHAR,
  charge STRUCT<vcc VARCHAR, amount FLOAT, type VARCHAR>
) WITH ('topic'='topicname', 'value.format'='json');

In addition to getting the right information, the model training pipeline now receives it in real time, which lets the model evolve faster over time and positively impacts the business.

What’s next?

In this post, we looked at some of the techniques that can be used alongside modern products today to securely denormalize data so it can be useful to other teams within the company that don't have access to the original data. While this scenario may be oversimplified, DeltaStream has extensive support for different data types and data processing operations that fit countless production use cases. Please refer to our developer documentation to learn more about how your scenario can be simplified using DeltaStream.

If you are using streaming storage systems such as Apache Kafka (Confluent Cloud, AWS MSK, Redpanda or any other Apache Kafka) or AWS Kinesis, you should check out DeltaStream as the platform for processing, organizing and securing your streaming data. You can schedule a demo where you can see all these capabilities in the context of a real world streaming application.
