29 Jul 2025
From Chaos to Clarity: Building an AI-Powered Security Pipeline with DeltaStream and GenAI
In the world of cybersecurity, time is the most critical asset. A Security Operations Center (SOC) analyst's day is a race against the clock, sifting through a relentless flood of alerts from dozens of different tools. An anomalous login from Okta, a suspicious file download from Zscaler, a malware detection from CrowdStrike—are they related? By the time an analyst manually connects these dots, an attacker could already be deep inside the network.

What if we could automate this entire process? What if we could build a real-time pipeline that not only correlates these disparate events but also uses Generative AI to understand them, summarize them, and tell us exactly how critical they are?
Using DeltaStream, we can build exactly that. Let's walk through how to create a real-time inference pipeline that transforms raw security logs into AI-triaged, actionable intelligence.
The Architecture: A Unified Stream Processing Pipeline
Our goal is to take three separate event streams and turn them into one high-fidelity alert stream.
- Data Sources: We'll simulate logs from Okta (identity), Zscaler (network), and CrowdStrike (endpoint), each feeding into its own Kafka topic.
- Real-Time Correlation: Using DeltaStream, we'll ingest these streams and join them in real time to create a single, correlated event that represents a potential incident.
- AI-Powered Triage: For each correlated incident, we will call a Generative AI model directly from our SQL query to get a human-readable summary and a priority level.
-- Stream defined on the Kafka topic ingesting Okta logs
CREATE STREAM okta_logs_stream (
  event_timestamp BIGINT,
  user_email VARCHAR,
  event_type VARCHAR,
  outcome VARCHAR,
  ip_address VARCHAR,
  country VARCHAR,
  is_suspicious BOOLEAN
) WITH (
  'topic' = 'siem_okta_logs',
  'value.format' = 'json',
  'timestamp' = 'event_timestamp'
);

-- Stream defined on the Kafka topic ingesting Zscaler logs
CREATE STREAM zscaler_logs_stream (
  event_timestamp BIGINT,
  "user" VARCHAR,
  "action" VARCHAR,
  url VARCHAR,
  file_name VARCHAR,
  file_size_bytes INTEGER
) WITH (
  'topic' = 'siem_zscaler_logs',
  'value.format' = 'json',
  'timestamp' = 'event_timestamp'
);

-- Stream defined on the Kafka topic ingesting CrowdStrike logs
CREATE STREAM crowdstrike_alerts_stream (
  event_timestamp BIGINT,
  user_email VARCHAR,
  detection_name VARCHAR,
  severity VARCHAR,
  file_hash_sha256 VARCHAR,
  file_name VARCHAR,
  action_taken VARCHAR
) WITH (
  'topic' = 'siem_crowdstrike_alerts',
  'value.format' = 'json',
  'timestamp' = 'event_timestamp'
);
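To try the pipeline without real SIEM feeds, the three topics can be fed with synthetic events. The sketch below only builds JSON payloads whose field names match the stream definitions above; it does not publish to Kafka. The function name `sample_events`, the example user, the URL, the event type, and the use of epoch milliseconds for `event_timestamp` are all assumptions made for illustration.

```python
import json
import time


def sample_events(user_email="jane.doe@example.com", now_ms=None):
    """Return one JSON-encoded event per topic, shaped like the stream schemas.

    All values are made up for illustration; timestamps are assumed to be
    epoch milliseconds because the columns are declared as BIGINT.
    """
    now_ms = int(time.time() * 1000) if now_ms is None else now_ms

    okta = {
        "event_timestamp": now_ms,
        "user_email": user_email,
        "event_type": "user.session.start",
        "outcome": "SUCCESS",
        "ip_address": "81.22.45.101",
        "country": "Nigeria",
        "is_suspicious": True,
    }
    zscaler = {
        "event_timestamp": now_ms + 60_000,  # download one minute after login
        "user": user_email,
        "action": "ALLOWED",
        "url": "http://files.example.com/evil.exe",  # hypothetical URL
        "file_name": "evil.exe",
        "file_size_bytes": 482_304,
    }
    crowdstrike = {
        "event_timestamp": now_ms + 120_000,  # detection two minutes after login
        "user_email": user_email,
        "detection_name": "Generic.Malware.SHeur",
        "severity": "CRITICAL",
        "file_hash_sha256": "0" * 64,  # placeholder hash
        "file_name": "evil.exe",
        "action_taken": "BLOCKED",
    }
    # Keys are the Kafka topic names used in the CREATE STREAM statements.
    return {
        "siem_okta_logs": json.dumps(okta),
        "siem_zscaler_logs": json.dumps(zscaler),
        "siem_crowdstrike_alerts": json.dumps(crowdstrike),
    }
```

Each value in the returned dictionary is a string ready to be sent as a message body by whichever Kafka producer you already use.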
Step 1: Correlating Events in Real-Time
The first step is to find the signal in the noise. We use DeltaStream's powerful stream-to-stream join capabilities to look for a specific sequence of events: a suspicious login, followed by a file download, followed by a malware detection, all from the same user within a 5-minute window.
-- Correlate Okta, Zscaler, and CrowdStrike events by joining the streams within a 5-minute window
CREATE STREAM correlated_okta_zscaler AS
SELECT
  o.user_email,
  o.event_timestamp AS login_timestamp,
  o.ip_address,
  o.country,
  o.is_suspicious,
  z.url AS downloaded_url,
  z.file_name AS downloaded_file
FROM okta_logs_stream AS o
JOIN zscaler_logs_stream AS z
  WITHIN 5 MINUTES
  ON o.user_email = z."user"
WHERE o.is_suspicious = true;

CREATE STREAM correlated_security_incidents AS
SELECT
  coz.user_email,
  coz.login_timestamp,
  coz.ip_address,
  coz.country,
  coz.downloaded_url,
  coz.downloaded_file,
  cs.detection_name AS malware_name,
  cs.severity AS malware_severity,
  cs.action_taken AS edr_action
FROM correlated_okta_zscaler AS coz
JOIN crowdstrike_alerts_stream AS cs
  WITHIN 5 MINUTES
  ON coz.user_email = cs.user_email
WHERE coz.downloaded_file = cs.file_name
  AND cs.severity = 'CRITICAL';
The correlated_security_incidents stream does exactly this, producing a single, enriched record that contains all the relevant details from the three source systems.
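The join logic above can be approximated offline to check which combinations of events would produce an incident. The following is a minimal in-memory sketch, not how DeltaStream executes the query. It assumes that `WITHIN 5 MINUTES` matches records whose timestamps are at most five minutes apart in either direction, that timestamps are epoch milliseconds, and that the second join compares the login timestamp with the CrowdStrike event time. The helper names `interval_join` and `correlate` are hypothetical.

```python
WINDOW_MS = 5 * 60 * 1000  # five minutes, assuming epoch-millisecond timestamps


def interval_join(left, right, left_key, right_key, left_ts, right_ts,
                  window_ms=WINDOW_MS):
    """Pair records with equal keys whose timestamps differ by at most window_ms."""
    pairs = []
    for l_rec in left:
        for r_rec in right:
            same_key = l_rec[left_key] == r_rec[right_key]
            close_in_time = abs(l_rec[left_ts] - r_rec[right_ts]) <= window_ms
            if same_key and close_in_time:
                pairs.append((l_rec, r_rec))
    return pairs


def correlate(okta, zscaler, crowdstrike):
    """Mimic the two-stage join: Okta with Zscaler, then the result with CrowdStrike."""
    suspicious = [o for o in okta if o["is_suspicious"]]
    okta_zscaler = [
        {
            "user_email": o["user_email"],
            "login_timestamp": o["event_timestamp"],
            "ip_address": o["ip_address"],
            "country": o["country"],
            "downloaded_url": z["url"],
            "downloaded_file": z["file_name"],
        }
        for o, z in interval_join(suspicious, zscaler, "user_email", "user",
                                  "event_timestamp", "event_timestamp")
    ]

    incidents = []
    for coz, cs in interval_join(okta_zscaler, crowdstrike,
                                 "user_email", "user_email",
                                 "login_timestamp", "event_timestamp"):
        # Same filters as the WHERE clause of correlated_security_incidents.
        if coz["downloaded_file"] == cs["file_name"] and cs["severity"] == "CRITICAL":
            incidents.append({
                **coz,
                "malware_name": cs["detection_name"],
                "malware_severity": cs["severity"],
                "edr_action": cs["action_taken"],
            })
    return incidents
```

The nested loops are quadratic and only suitable for small test fixtures; a stream processor keeps windowed state per key instead.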
Step 2: Prompt Engineering Directly in SQL
This is where the magic happens. With our correlated data stream, we can now "talk" to our AI model. We need to give the model all the context it needs to make an informed decision. Instead of complex code, we can construct our prompt directly within our SQL statement.
This is the core of our AI integration, taken directly from the ai_triaged_alerts stream definition:
CREATE STREAM ai_triaged_alerts AS
SELECT
  *,
  call_gemini_explainer(
    -- Concatenate strings and column values to build the full prompt
    'Analyze the following correlated security incident and return a single, valid JSON object with two keys: "summary" and "priority". ' ||
    'Priority must be one of: CRITICAL, HIGH, MEDIUM, LOW. Incident Details: ' ||
    'User Email: ' || user_email || ', ' ||
    'Source IP Address: ' || ip_address || ', ' ||
    'Source Country: ' || country || ', ' ||
    'Downloaded URL: ' || downloaded_url || ', ' ||
    'Downloaded File Name: ' || downloaded_file || ', ' ||
    'Malware Detected: ' || malware_name || ', ' ||
    'Malware Severity: ' || malware_severity || ', ' ||
    'EDR Action Taken: ' || edr_action
  ) AS gemini_response_json
FROM correlated_security_incidents;
Here, we are doing several powerful things at once:
- Dynamic Prompt Generation: For every single event in our correlated_security_incidents stream, we dynamically construct a new, detailed prompt.
- Contextualization: We provide the AI with all the critical data points we joined together: the user, the IP, the country, the malware name, and more.
- Output Instructions: We give the model clear instructions on what we need: a JSON object with a summary and a priority. This makes it much more likely that we get a structured, predictable response that our downstream systems can parse.
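It can help to iterate on the prompt outside the streaming job before deploying it. The sketch below rebuilds the same prompt text in Python from a dictionary whose keys match the columns of correlated_security_incidents. The names `INSTRUCTIONS`, `FIELDS`, and `build_prompt` are hypothetical, and nothing here calls the model.

```python
# Instruction text copied from the SQL prompt in ai_triaged_alerts.
INSTRUCTIONS = (
    'Analyze the following correlated security incident and return a single, '
    'valid JSON object with two keys: "summary" and "priority". '
    'Priority must be one of: CRITICAL, HIGH, MEDIUM, LOW. Incident Details: '
)

# (label shown to the model, column name in correlated_security_incidents)
FIELDS = [
    ("User Email", "user_email"),
    ("Source IP Address", "ip_address"),
    ("Source Country", "country"),
    ("Downloaded URL", "downloaded_url"),
    ("Downloaded File Name", "downloaded_file"),
    ("Malware Detected", "malware_name"),
    ("Malware Severity", "malware_severity"),
    ("EDR Action Taken", "edr_action"),
]


def build_prompt(incident):
    """Return the prompt string for one correlated incident (a dict of columns)."""
    details = ", ".join(f"{label}: {incident[column]}" for label, column in FIELDS)
    return INSTRUCTIONS + details
```

Keeping the label-to-column mapping in one list makes it easy to see exactly which fields the model receives and to add or remove context when experimenting.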
Step 3: Parsing the AI's Response
The call_gemini_explainer UDF returns a JSON string. We use the built-in JSON_VALUE function to easily extract the summary and priority fields from this response. The final stream, ai_triaged_alerts, contains not just the raw data, but the AI's expert analysis appended to every event.
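Model output is not guaranteed to be well formed, so any consumer of the response benefits from validating it. The sketch below shows the kind of defensive parsing a downstream service might apply to the `gemini_response_json` value. It is an illustration in Python, not the behavior of JSON_VALUE; the function name `parse_triage` and the choice to return `(None, None)` on failure are assumptions.

```python
import json

ALLOWED_PRIORITIES = {"CRITICAL", "HIGH", "MEDIUM", "LOW"}
FENCE = "`" * 3  # Markdown code-fence marker that models sometimes add


def parse_triage(response_text):
    """Return (summary, priority), or (None, None) if the response is unusable."""
    text = response_text.strip()

    # Strip an optional Markdown fence (with or without a "json" tag)
    # wrapped around the JSON object.
    if text.startswith(FENCE):
        text = text.strip("`")
        if text.startswith("json"):
            text = text[len("json"):]

    try:
        data = json.loads(text)
    except json.JSONDecodeError:
        return None, None
    if not isinstance(data, dict):
        return None, None

    summary = data.get("summary")
    priority = str(data.get("priority", "")).upper()
    # Reject responses that ignore the requested schema or priority values.
    if not isinstance(summary, str) or priority not in ALLOWED_PRIORITIES:
        return None, None
    return summary, priority
```

Records that fail validation can be routed to a review queue instead of being dropped, so an unparseable response never hides a real incident.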
The Result: From Three Noisy Logs to One Actionable Insight
Before:
- Okta: user [email protected] logged in from Nigeria (IP: 81.22.45.101)
- Zscaler: user [email protected] downloaded evil.exe
- CrowdStrike: CRITICAL malware Generic.Malware.SHeur detected on host for [email protected]
After (a single record in the ai_triaged_alerts stream):
- user_email: [email protected]
- summary: "A user logged in from a high-risk country (Nigeria) and downloaded a file that was immediately identified as critical malware by the EDR system, which blocked the execution."
- priority: "CRITICAL"
We have built an autonomous, real-time inference pipeline. This isn't just about saving time; it's about fundamentally changing the security paradigm. Automated correlation and triage can cut Mean Time to Respond (MTTR) from hours to seconds, reduce analyst fatigue, and empower even junior team members to act on high-fidelity, pre-triaged alerts. This is the future of the autonomous SOC, built today.
This blog was written by the author with assistance from AI to help with outlining, drafting, or editing.