In the world of cybersecurity, time is the most critical asset. A Security Operations Center (SOC) analyst's day is a race against the clock, sifting through a relentless flood of alerts from dozens of different tools. An anomalous login from Okta, a suspicious file download from Zscaler, a malware detection from CrowdStrike—are they related? By the time an analyst manually connects these dots, an attacker could already be deep inside the network.

What if we could automate this entire process? What if we could build a real-time pipeline that not only correlates these disparate events but also uses Generative AI to understand them, summarize them, and tell us exactly how critical they are?

Using DeltaStream, we can build exactly that. Let's walk through how to create a real-time inference pipeline that transforms raw security logs into AI-triaged, actionable intelligence.

The Architecture: A Unified Stream Processing Pipeline

Our goal is to take three separate event streams and turn them into one high-fidelity alert stream.

  1. Data Sources: We'll simulate logs from Okta (identity), Zscaler (network), and CrowdStrike (endpoint), each feeding into its own Kafka topic.
  2. Real-Time Correlation: Using DeltaStream, we'll ingest these streams and join them in real-time to create a single, correlated event that represents a potential incident.
  3. AI-Powered Triage: For each correlated incident, we will call a Generative AI model directly from our SQL query to get a human-readable summary and a priority level.
First, we define a stream over each Kafka topic:

```sql
-- Stream defined on the Kafka topic ingesting Okta logs
CREATE STREAM okta_logs_stream (
  event_timestamp BIGINT, user_email VARCHAR, event_type VARCHAR,
  outcome VARCHAR, ip_address VARCHAR, country VARCHAR, is_suspicious BOOLEAN
) WITH ('topic' = 'siem_okta_logs', 'value.format' = 'json', 'timestamp' = 'event_timestamp');

-- Stream defined on the Kafka topic ingesting Zscaler logs
CREATE STREAM zscaler_logs_stream (
  event_timestamp BIGINT, "user" VARCHAR, "action" VARCHAR,
  url VARCHAR, file_name VARCHAR, file_size_bytes INTEGER
) WITH ('topic' = 'siem_zscaler_logs', 'value.format' = 'json', 'timestamp' = 'event_timestamp');

-- Stream defined on the Kafka topic ingesting CrowdStrike logs
CREATE STREAM crowdstrike_alerts_stream (
  event_timestamp BIGINT, user_email VARCHAR, detection_name VARCHAR,
  severity VARCHAR, file_hash_sha256 VARCHAR, file_name VARCHAR, action_taken VARCHAR
) WITH ('topic' = 'siem_crowdstrike_alerts', 'value.format' = 'json', 'timestamp' = 'event_timestamp');
```

Step 1: Correlating Events in Real-Time

The first step is to find the signal in the noise. We use DeltaStream's powerful stream-to-stream join capabilities to look for a specific sequence of events: a suspicious login, followed by a file download, followed by a malware detection, all from the same user within a 5-minute window.

```sql
-- Correlate Okta and Zscaler events by joining the streams within a 5-minute window
CREATE STREAM correlated_okta_zscaler AS SELECT
  o.user_email, o.event_timestamp AS login_timestamp, o.ip_address, o.country, o.is_suspicious,
  z.url AS downloaded_url, z.file_name AS downloaded_file
FROM okta_logs_stream AS o
JOIN zscaler_logs_stream AS z
  WITHIN 5 MINUTES ON o.user_email = z."user"
WHERE o.is_suspicious = true;

-- Chain the correlation: join the Okta/Zscaler matches against CrowdStrike detections
CREATE STREAM correlated_security_incidents AS SELECT
  coz.user_email, coz.login_timestamp, coz.ip_address, coz.country, coz.downloaded_url, coz.downloaded_file,
  cs.detection_name AS malware_name, cs.severity AS malware_severity, cs.action_taken AS edr_action
FROM correlated_okta_zscaler AS coz
JOIN crowdstrike_alerts_stream AS cs
  WITHIN 5 MINUTES ON coz.user_email = cs.user_email
WHERE coz.downloaded_file = cs.file_name AND cs.severity = 'CRITICAL';
```

The correlated_security_incidents stream does exactly this, producing a single, enriched record that contains all the relevant details from the three source systems.

Step 2: Prompt Engineering Directly in SQL

This is where the magic happens. With our correlated data stream, we can now "talk" to our AI model. We need to give the model all the context it needs to make an informed decision. Instead of complex code, we can construct our prompt directly within our SQL statement.

This is the core of our AI integration, taken directly from the ai_triaged_alerts stream definition:

```sql
CREATE STREAM ai_triaged_alerts AS SELECT
  *,
  call_gemini_explainer(
    -- Concatenate strings and column values to build the full prompt
    'Analyze the following correlated security incident and return a single, valid JSON object with two keys: "summary" and "priority". Priority must be one of: CRITICAL, HIGH, MEDIUM, LOW. Incident Details: ' ||
    'User Email: ' || user_email || ', ' ||
    'Source IP Address: ' || ip_address || ', ' ||
    'Source Country: ' || country || ', ' ||
    'Downloaded URL: ' || downloaded_url || ', ' ||
    'Downloaded File Name: ' || downloaded_file || ', ' ||
    'Malware Detected: ' || malware_name || ', ' ||
    'Malware Severity: ' || malware_severity || ', ' ||
    'EDR Action Taken: ' || edr_action
  ) AS gemini_response_json
FROM correlated_security_incidents;
```

Here, we are doing several powerful things at once:

  • Dynamic Prompt Generation: For every single event in our correlated_security_incidents stream, we dynamically construct a new, detailed prompt.
  • Contextualization: We provide the AI with all the critical data points we joined together—the user, the IP, the country, the malware name, and more.
  • Instruction Tuning: We give the model clear instructions on what we need: a JSON object with a summary and a priority. This ensures we get a structured, predictable response that our downstream systems can parse.

Step 3: Parsing the AI's Response

The call_gemini_explainer UDF returns a JSON string. We use the built-in JSON_VALUE function to easily extract the summary and priority fields from this response. The final stream, ai_triaged_alerts, contains not just the raw data, but the AI's expert analysis appended to every event.
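As a minimal sketch of that extraction step, assuming JSON_VALUE accepts a JSONPath-style expression (the parsed_alerts stream name and the ai_summary/ai_priority aliases here are illustrative, not part of the pipeline above):

```sql
-- Illustrative sketch: pull the AI's structured fields out of the raw JSON response
CREATE STREAM parsed_alerts AS SELECT
  *,
  JSON_VALUE(gemini_response_json, '$.summary')  AS ai_summary,
  JSON_VALUE(gemini_response_json, '$.priority') AS ai_priority
FROM ai_triaged_alerts;
```

Keeping the raw gemini_response_json alongside the extracted fields makes it easy to audit the model's output later.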

The Result: From Three Noisy Logs to One Actionable Insight

Before: three disconnected, low-context events sitting in three separate consoles: an anomalous Okta login, a Zscaler file download, and a CrowdStrike malware detection.

After (a single record in the ai_triaged_alerts stream):

  • user_email: [email protected]
  • summary: "A user logged in from a high-risk country (Nigeria) and downloaded a file that was immediately identified as critical malware by the EDR system, which blocked the execution."
  • priority: "CRITICAL"
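Because the priority is now a structured field, downstream systems can key off it directly. For example, a hypothetical filter that forwards only the most urgent alerts to a dedicated topic might look like this (the critical_alerts stream and siem_critical_alerts topic are illustrative):

```sql
-- Illustrative sketch: route only CRITICAL AI-triaged alerts to a paging topic
CREATE STREAM critical_alerts WITH ('topic' = 'siem_critical_alerts') AS
SELECT *
FROM ai_triaged_alerts
WHERE JSON_VALUE(gemini_response_json, '$.priority') = 'CRITICAL';
```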

We have successfully built an autonomous, real-time inference pipeline. This isn't just about saving time; it's about fundamentally changing the security paradigm. We can reduce Mean Time to Respond (MTTR) from hours to seconds, eliminate analyst fatigue, and empower even junior team members to act on high-fidelity, pre-triaged alerts. This is the future of the autonomous SOC, built today.

This blog was written by the author with assistance from AI to help with outlining, drafting, or editing.