In the world of Decentralized Finance (DeFi), fortunes are made and lost in a single block. The protocols handling billions of dollars in Total Value Locked (TVL) are battlegrounds for some of the most sophisticated exploits imaginable—flash loan attacks, price oracle manipulation, and liquidity drains that can erase value in seconds. For security teams, the challenge is immense. Traditional, batch-based monitoring is like trying to read yesterday's newspaper to predict tomorrow's weather. It's simply too slow.

DeFi doesn't need another dashboard; it needs an autonomous immune system. It needs a sentinel that can watch every transaction, every price tick, and every on-chain event, understand the context, and trigger a defense before an attack can succeed. This isn't science fiction. By combining a real-time stream processing platform like DeltaStream with the reasoning power of Generative AI, we can build this unblinking eye.

Let's walk through the end-to-end implementation of an Autonomous On-Chain Risk Sentinel, from the raw data to the final, AI-powered alert.

The Mission: Detecting a Flash Loan Attack in Real-Time

Our Sentinel's primary mission is to detect the signature of a flash loan attack. This classic DeFi exploit involves a specific, lightning-fast sequence of events:

  1. An attacker borrows millions of dollars in a "flash loan" (a loan that must be repaid in the same transaction).
  2. They use this massive capital to manipulate the price of an asset on a decentralized exchange (DEX).
  3. They exploit this artificial price difference on another protocol.
  4. They repay the loan, pocketing the profit, all within a few seconds.

To catch this, our Sentinel needs to ingest and understand three distinct, high-velocity data streams simultaneously:

  • onchain_transactions: The firehose of all blockchain transactions.
  • dex_prices: Real-time price updates from exchanges like Uniswap.
  • mempool_data: A view into pending transactions and network congestion (gas prices).

Step 1: Simulating the On-Chain World

To build our Sentinel, we first need to replicate the chaotic environment of a live blockchain. We use a Java data generator (as seen in the DeFiEventGenerator.java in the examples repo on Github: https://github.com/deltastreaminc/examples/tree/main/cryptodefi) to produce a realistic mix of normal background traffic and, periodically, inject the precise sequence of transactions that constitute a flash loan attack. This gives us a controlled environment to test and validate our detection logic.

Step 2: Building the Sentinel's Brain with DeltaStream

This is where the magic happens. DeltaStream acts as the central nervous system for our Sentinel, executing the entire detection and analysis pipeline with a series of SQL statements.

Ingesting and Filtering the Noise

First, we create streams on top of our Kafka topics. This allows us to work with the raw on-chain data as if it were a database table. Here are the declarations of the three streams we will use.

  1. CREATE STREAM onchain_transactions (
  2. tx_hash VARCHAR, block_number BIGINT, event_timestamp BIGINT,
  3. from_address VARCHAR, to_address VARCHAR, tx_token VARCHAR, tx_amount DOUBLE
  4. ) WITH ('topic' = 'onchain_transactions', 'value.format' = 'json');
  5.  
  6. CREATE STREAM dex_prices (
  7. event_timestamp BIGINT, token_pair VARCHAR, price DOUBLE, join_helper VARCHAR
  8. ) WITH ('topic' = 'dex_prices', 'value.format' = 'json');
  9.  
  10. CREATE STREAM mempool_data (
  11. event_timestamp BIGINT, block_number BIGINT, avg_gas_price_gwei INTEGER
  12. ) WITH ('topic' = 'mempool_data', 'value.format' = 'json');

We then create intermediate streams to filter for only the most critical atomic events, like a loan of over $1M from a known flash loan provider.

  1. -- A stream of very large loans from known flash loan providers.
  2. CREATE STREAM flash_loans AS
  3. SELECT from_address AS provider, to_address AS attacker, tx_token, tx_amount, block_number, event_timestamp
  4. FROM onchain_transactions WITH ('starting.position' = 'earliest')
  5. WHERE
  6. from_address = '0xA9754f1D6516a24A413148Db4534A4684344C1fE' -- Simulated Aave Pool
  7. AND tx_amount > 1000000; -- Loan amount over $1M

Detecting the Attack Pattern with Streaming Joins

The core of the detection logic is finding the loan and repayment from the same address within the same transaction (or within 1-2 blocks). A JOIN in DeltaStream allows us to correlate these events in real-time.

  1. -- This is the crucial step: finding a loan and repayment by the same actor in a tight window.
  2. CREATE STREAM potential_flash_loan_attacks AS
  3. SELECT
  4. l.attacker, l.tx_token, l.tx_amount, l.block_number, l.event_timestamp AS attack_timestamp, 'A' AS pfla_join_helper
  5. FROM flash_loans AS l WITH ('timestamp' = 'event_timestamp', 'starting.position' = 'earliest')
  6. JOIN loan_repayments AS r WITH ('timestamp' = 'event_timestamp', 'starting.position' = 'earliest')
  7. WITHIN 24 HOURS
  8. -- Join on the attacker's address
  9. ON l.attacker = r.attacker
  10. -- The repayment must happen within 2 blocks of the loan, a key flash loan indicator
  11. WHERE r.block_number BETWEEN l.block_number AND l.block_number + 2;

Contextualizing the Threat

An attack pattern alone isn't enough. We need context. Was there a corresponding price drop on a DEX? Did gas fees spike? Another JOIN enriches our detected attack with this crucial market data, creating a complete picture of the event. For full statements in DeltaStream refer to the examples repository on Github(https://github.com/deltastreaminc/examples/cryptodefi).

Step 3: Real-Time Inference with a GenAI Agent

Once we have a high-confidence, fully contextualized event, we trigger our AI agent. We use a UDF, call_gemini_explainer(VARCHAR), to send all the information to a GenAI model. The real power here is prompt engineering directly in SQL. We are not just sending data; we are giving the AI a role, a task, and a required output format.

  1. -- This assumes a built-in UDF `call_sentinel_agent(VARCHAR)` that sends a prompt to a GenAI model.
  2. CREATE STREAM ai_risk_assessments AS
  3. SELECT
  4. attacker,
  5. tx_token,
  6. tx_amount,
  7. -- Use JSON_VALUE to parse the structured JSON response from the GenAI agent
  8. call_gemini_explainer(
  9. -- This is sophisticated prompt engineering done directly in SQL
  10. 'You are an autonomous on-chain risk sentinel for a DeFi protocol. Analyze the following real-time, correlated event data for signs of a flash loan-based price manipulation attack. ' ||
  11. 'The attack signature is a large loan and repayment within the same transaction, correlated with anomalous market activity. ' ||
  12. 'Based on the data, provide a structured JSON response with three keys: "risk_level" (CRITICAL, HIGH, MEDIUM, LOW), "summary" (a plain-English explanation of the event for a human analyst), and "recommended_actions" (a list of 2-3 immediate, automated actions). ' ||
  13. '--- Correlated Event Data --- ' ||
  14. 'Attacker Address: ' || attacker || ', ' ||
  15. 'Flash Loan Token: ' || tx_token || ', ' ||
  16. 'Flash Loan Amount: ' || CAST(tx_amount AS VARCHAR) || ', ' ||
  17. 'Block Number: ' || CAST(block_number AS VARCHAR) || ', ' ||
  18. 'DEX Price at Attack: ' || CAST(price_at_attack AS VARCHAR) || ', ' ||
  19. 'Gas Price (Gwei) at Attack: ' || CAST(gas_at_attack AS VARCHAR)
  20. ) AS genai_response
  21. FROM sentinel_triggers WITH ('starting.position' = 'earliest');

The Outcome: From Raw Data to Autonomous Defense

The final output of our DeltaStream pipeline is not another noisy alert. It's a complete, AI-generated intelligence briefing, produced within seconds of the first malicious transaction.

Final ai_risk_assessments Event:

  1. {
  2. "attacker": "0xde0b295669a9fd93d5f28d9ec85e40f4cb697bae",
  3. "risk_level":"CRITICAL",
  4. "summary":"A potential flash loan attack has been detected. A large flash loan of 50 million DAI was taken by address 0x4aae242b3ad24a6891e290d96d9cb529 in block 18013130. The DEX price is at 2303.97, which should be closely monitored for anomalous fluctuations that could be caused by the sudden injection of capital. High gas price suggests the attacker is prioritizing transaction inclusion.",
  5. "recommended_actions":[
  6. "Pause trading on impacted DEX pools to prevent further manipulation and losses.",
  7. "Initiate an emergency oracle update to provide a more accurate and potentially smoothed price feed to mitigate the impact of price distortions on dependent smart contracts."
  8. ]
  9. }

This is the future of on-chain security. By using DeltaStream as the brain of our operation, we've built a system that doesn't just watch—it understands, assesses, and enables an immediate, intelligent response. We've turned a chaotic flood of data into the clear, unblinking eye of a sentinel, standing guard over the decentralized future.

This blog was written by the author with assistance from AI to help with outlining, drafting, or editing.