24 Jul 2025
Beyond the Chatbot: Building a Proactive AI Agent with Real-Time Data
For years, we've interacted with chatbots. They answer questions, fetch information, and follow commands. But they are fundamentally reactive. They wait for us to ask. The next leap in AI is the agentic AI—a system that can reason, plan, and proactively take action to achieve a goal.
Imagine an AI that doesn't just answer "Where is my order?" but notices you're struggling with the checkout process and proactively offers help. That's not a chatbot; that's an agent.
The secret to this proactive ability isn't just a smarter model; it's real-time context. An agent is only as good as its awareness of the present moment. In this post, we'll build a fully functional pipeline for a proactive customer service agent that knows what a user is doing, as they are doing it. We'll use Java for our applications, Kafka for event streaming, and the powerful real-time processing of DeltaStream to serve as the agent's central nervous system.

The Architecture: Our Agent's Senses
Before we dive into the code, let's look at our plan. Our system has four key components:
- The Data Generator (Java & Kafka): A simulator that acts like a real e-commerce website, generating a constant stream of user events (page views, cart updates, chat messages) into Kafka.
- The Real-Time Brain Lobe (DeltaStream): This is the core of our system. DeltaStream will ingest the raw event streams from Kafka, process them in real-time, and shape them into a high-signal, contextual profile of each user.
- The Context Hotline (MCP Server): A simple Java web server that acts as our "Model Context Protocol" (MCP) endpoint. It provides a clean, on-demand API for the AI agent to get the real-time context from DeltaStream.
- The Agent (Conceptual): The final piece. The agentic AI application will call our MCP Server to get the context it needs to make intelligent, proactive decisions.
Ready? Let's build.
Part 1: The "Real World" - Simulating User Data with Java & Kafka
First, our agent needs a world to observe. We'll create a simple Java application that continuously generates user events and sends them to three Kafka topics: page_views, cart_updates, and chat_messages. For brevity, we skip the code here; you can find the full details in DeltaStream's public examples GitHub repo (https://github.com/deltastreaminc/examples/tree/main/ProactiveCustomerSupportAgent).
Part 2: The Sixth Sense - Real-Time Context with DeltaStream
An agent can't act on raw data. It needs insight. We'll use DeltaStream to consume the raw Kafka events and forge them into a clean, contextualized profile for each user. This is the most critical piece of our architecture.
In your DeltaStream workspace, run the following SQL statements.
Step 1: Create Streams on Kafka Topics. First, we tell DeltaStream about our Kafka topics, creating a live, queryable STREAM for each one.
```sql
-- Step 1: Create Streams for each Kafka topic.
-- A stream is a real-time, append-only table that represents the data flowing through Kafka.
CREATE STREAM page_views (
  event_timestamp TIMESTAMP_LTZ(6),
  user_id VARCHAR,
  page VARCHAR
) WITH (
  'topic'='csa_pageviews',
  'value.format'='json',
  'timestamp'='event_timestamp'
);

CREATE STREAM cart_updates (
  event_timestamp TIMESTAMP_LTZ(6),
  user_id VARCHAR,
  cart_action VARCHAR,
  item_id VARCHAR
) WITH (
  'topic'='csa_cart_updates',
  'value.format'='json',
  'timestamp'='event_timestamp'
);

CREATE STREAM chat_messages (
  event_timestamp TIMESTAMP_LTZ(6),
  user_id VARCHAR,
  message VARCHAR
) WITH (
  'topic'='csa_chat_messages',
  'value.format'='json',
  'timestamp'='event_timestamp'
);
```
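For reference, a record arriving on the csa_pageviews topic would need to match the page_views schema above. An illustrative payload (the values here are made up) looks like:

```json
{
  "event_timestamp": "2025-07-24T10:15:30.123456Z",
  "user_id": "user_42",
  "page": "/checkout"
}
```

Because we declared 'value.format'='json' and 'timestamp'='event_timestamp', DeltaStream parses each record's fields by name and uses event_timestamp for event-time processing.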
Step 2: Build the Context with a Materialized View. This is where the magic happens. We'll create a single MATERIALIZED VIEW that joins the three streams and calculates key metrics in real time. This view will be the single source of truth for our agent's context.
```sql
-- Step 2: Create the Materialized View for the live customer context.
-- This MV continuously processes events from the streams to build a real-time profile of each user.
-- We first join all three streams and then calculate metrics over a 10-minute sliding window.
CREATE STREAM pv_cu WITH ('kafka.topic.retention.ms' = '9172800000') AS
SELECT
  pv.event_timestamp AS event_timestamp,
  pv.user_id AS user_id,
  page,
  cart_action AS cart_action,
  item_id
FROM page_views AS pv WITH ('timestamp' = 'event_timestamp')
JOIN cart_updates AS cu WITH ('timestamp' = 'event_timestamp')
  WITHIN 10 SECONDS
  ON pv.user_id = cu.user_id;

CREATE STREAM pv_cu_cm WITH ('kafka.topic.retention.ms' = '9172800000') AS
SELECT
  pv_cu.event_timestamp AS event_timestamp,
  pv_cu.user_id AS user_id,
  page,
  cart_action AS cart_action,
  item_id,
  cm.message
FROM pv_cu AS pv_cu
LEFT JOIN chat_messages AS cm
  WITHIN 10 SECONDS
  ON pv_cu.user_id = cm.user_id;

CREATE MATERIALIZED VIEW live_customer_context_view AS
SELECT
  window_start,
  window_end,
  user_id,
  -- Collect the pages visited during the window
  LISTAGG(page) AS page_visited_list,
  -- Count distinct pages visited in the last 10 minutes
  COUNT(DISTINCT page) AS distinct_pages_visited_10m,
  -- Count how many items were added to the cart in the last 10 minutes
  SUM(CASE WHEN cart_action = 'ADD' THEN 1 ELSE 0 END) AS items_added_to_cart_10m,
  -- Collect the chat messages sent by the user during the window
  LISTAGG(message) AS chat_message_list
FROM HOP(pv_cu_cm, SIZE 10 MINUTE, ADVANCE BY 5 SECOND) WITH (
  'timestamp' = 'event_timestamp',
  'starting.position' = 'earliest'
)
GROUP BY window_start, window_end, user_id;
```
You can now run SELECT * FROM live_customer_context_view LIMIT 5; in DeltaStream and watch your users' profiles update with every new event from Kafka. This live, structured context is the fuel our agent needs.
Part 3: The Context Hotline - Building the MCP Server
Our agent needs a simple way to ask for this context. We'll build a lightweight Java web server that provides a clean REST API endpoint. This server implements our "Model Context Protocol"—a consistent format for requesting and receiving context.
This server creates an endpoint, GET /context/:userId. When called, it queries the materialized view in DeltaStream and returns the user's live profile as a JSON object. Remember to fill in your DeltaStream connection credentials correctly. Again, for brevity, we skip the code here; you can find the full details in DeltaStream's public examples GitHub repo (https://github.com/deltastreaminc/examples/tree/main/ProactiveCustomerSupportAgent).
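As a rough sketch of the endpoint's shape, here is a stdlib-only Java server exposing GET /context/:userId. The DeltaStream lookup is stubbed out with hard-coded values here, since the connection credentials and query code are specific to your workspace (see the repo version for the real query against live_customer_context_view); the field names in the stub mirror the materialized view's columns.

```java
import com.sun.net.httpserver.HttpServer;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;

// Minimal sketch of the MCP server: exposes GET /context/:userId and returns
// the user's live profile as JSON. The DeltaStream query is stubbed out here.
public class McpServer {

    // Stand-in for querying live_customer_context_view in DeltaStream.
    static String fetchContext(String userId) {
        return String.format(
            "{\"user_id\":\"%s\",\"distinct_pages_visited_10m\":6,"
            + "\"items_added_to_cart_10m\":0,"
            + "\"page_visited_list\":\"/search,/checkout\"}",
            userId);
    }

    public static HttpServer start(int port) throws Exception {
        HttpServer server = HttpServer.create(new InetSocketAddress(port), 0);
        server.createContext("/context/", exchange -> {
            // Everything after /context/ is the user id
            String userId = exchange.getRequestURI().getPath()
                    .substring("/context/".length());
            byte[] body = fetchContext(userId).getBytes(StandardCharsets.UTF_8);
            exchange.getResponseHeaders().set("Content-Type", "application/json");
            exchange.sendResponseHeaders(200, body.length);
            try (OutputStream os = exchange.getResponseBody()) {
                os.write(body);
            }
        });
        server.start();
        return server;
    }

    public static void main(String[] args) throws Exception {
        start(8080);
        System.out.println("MCP server listening on http://localhost:8080/context/{userId}");
    }
}
```

The agent can then fetch a user's profile with a plain HTTP GET, keeping the "Model Context Protocol" surface to a single, predictable endpoint.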
Part 4: The Final Piece - The Agent Takes Action
We now have all the plumbing in place. The final step is for the agentic AI application (which you might build using a framework like LangChain) to use this context.
While we won't build a full agent here, its core logic would look something like this in pseudocode:
```java
// Conceptual Agent Logic
public class ProactiveAgent {

    public void decideNextAction(String userId) {
        // 1. Call the MCP Server to get real-time context
        Context currentUserContext = callMcpServer("/context/" + userId);

        // 2. Use the context to reason about the user's intent
        boolean isStuck =
            currentUserContext.getDistinctPagesVisited10m() > 5
                && currentUserContext.getItemsAddedToCart10m() == 0;

        boolean isHesitatingAtCheckout =
            "/checkout".equals(currentUserContext.getLastPageVisited())
                && currentUserContext.getLastChatMessage() == null;

        // 3. Take proactive action
        if (isStuck) {
            // Proactively open a chat window with a helpful message
            sendProactiveChatMessage(userId,
                "Having trouble finding what you're looking for? I can help!");
        } else if (isHesitatingAtCheckout) {
            // Proactively offer a small discount to encourage purchase
            offerDiscount(userId, "10_PERCENT_OFF");
        }
    }
}
```
This simple logic shows the power of what we've built. The agent isn't waiting for a command. It's observing the user's real-time behavior, inferring their state of mind, and taking proactive steps to help.
Conclusion
We've successfully built a robust, end-to-end pipeline for a truly modern AI application. We moved beyond the simple, reactive chatbot and laid the foundation for a proactive AI agent.
By using Kafka for event transport, DeltaStream for powerful real-time processing, and a simple API server to serve context, we've enabled our AI to have the one thing it needs most to be truly helpful: awareness of the now. The next time you think about building an AI, don't just give it a brain—give it senses.
This blog was written by the author with assistance from AI to help with outlining, drafting, or editing.