
Real-Time Agent Context with Kafka: Sub-Second Data Freshness for AI Pipelines

· 5 min read
Metadata Morph
AI & Data Engineering Team

Batch pipelines are sufficient for most analytical workloads. They're not sufficient for AI agents making time-sensitive decisions. An anomaly detection agent that works on yesterday's data misses the incident happening right now. A customer churn agent fed weekly snapshots can't act on a user who disengaged three hours ago.

Real-time streaming closes this gap. With Kafka as the event backbone and Flink for stream processing, your agents can operate on data that is seconds old rather than hours or days.

The Freshness Problem in Agentic AI

Every AI agent has a knowledge cutoff — not just the LLM's training cutoff, but the data freshness cutoff of the context it receives. A daily batch pipeline means your agent's context is up to 24 hours stale. For operational use cases — fraud detection, infrastructure alerts, live customer support — that's unusable.

Streaming doesn't just make data fresher. It changes what your agents can do:

Batch (daily)                  | Streaming (seconds)
-------------------------------|---------------------------------------------
Detect fraud after the fact    | Block fraudulent transaction in flight
Alert on yesterday's outage    | Page on-call during the outage
Weekly churn report            | Real-time intervention when user disengages
Report on completed orders     | Intervene on abandoned cart now

Architecture

Source Systems (DB, APIs, apps)
                   │
                   ▼  (CDC / event publishing)
┌─────────────────────────────────────────┐
│              KAFKA CLUSTER              │
│                                         │
│  Topics:                                │
│   • raw.orders        (all events)      │
│   • raw.user_events   (clickstream)     │
│   • raw.payments      (transactions)    │
│   • processed.alerts  (agent inputs)    │
└──────────────────┬──────────────────────┘
                   │
            ┌──────▼──────┐
            │    FLINK    │  ← stream processing: filter, join, aggregate
            │    JOBS     │
            └──────┬──────┘
                   │
┌──────────────────▼──────────────────────┐
│           AGENT CONTEXT LAYER           │
│                                         │
│  • Feature store (Redis)                │
│  • Materialized views (warehouse)       │
│  • Alert topic (Kafka)                  │
└──────────────────┬──────────────────────┘
                   │
            ┌──────▼──────┐
            │  AI AGENTS  │  ← consume fresh context, act
            └─────────────┘

Setting Up Kafka Topics

Topic design determines throughput and ordering guarantees. Each domain gets its own topic; partition count controls parallelism.

from confluent_kafka.admin import AdminClient, NewTopic

admin = AdminClient({"bootstrap.servers": "kafka:9092"})

topics = [
    NewTopic("raw.orders", num_partitions=12, replication_factor=3),
    NewTopic("raw.user_events", num_partitions=24, replication_factor=3),
    NewTopic("raw.payments", num_partitions=12, replication_factor=3),
    NewTopic("processed.alerts", num_partitions=6, replication_factor=3),
]

# create_topics is asynchronous: block on each future to surface errors
for topic, future in admin.create_topics(topics).items():
    future.result()

Partition count rules:

  • More partitions = more parallelism but more overhead
  • Target 1–2 partitions per consumer thread in your Flink job
  • Use a meaningful partition key (e.g., user_id, order_id) so related events land on the same partition and maintain ordering
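The reason the key matters: Kafka's default partitioner hashes the key (murmur2) modulo the partition count, so equal keys always map to the same partition. The effect can be sketched with any deterministic hash — CRC32 here as a stand-in, not Kafka's actual algorithm:

```python
import zlib

def partition_for(key: bytes, num_partitions: int) -> int:
    """Stand-in for Kafka's default partitioner (which uses murmur2, not CRC32).
    The property that matters: a deterministic hash of the key, modulo the
    partition count, so the same key always lands on the same partition."""
    return zlib.crc32(key) % num_partitions

# Every event keyed by user-42 goes to one partition, so that
# user's events are consumed in the order they were produced.
p = partition_for(b"user-42", num_partitions=12)
```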

Producing Events

import msgspec
from confluent_kafka import Producer
from datetime import datetime

class OrderEvent(msgspec.Struct):
    order_id: str
    user_id: str
    amount_usd: float
    status: str  # "placed", "paid", "shipped", "cancelled"
    timestamp: datetime

encoder = msgspec.json.Encoder()
producer = Producer({"bootstrap.servers": "kafka:9092"})

def publish_order_event(event: OrderEvent):
    producer.produce(
        topic="raw.orders",
        key=event.user_id.encode(),  # partition by user for ordering
        value=encoder.encode(event),
        callback=lambda err, msg: print(f"Error: {err}" if err else f"Delivered: {msg.topic()}"),
    )
    producer.poll(0)  # trigger delivery callbacks without blocking

Using msgspec for serialization (introduced in our earlier post) keeps encoding overhead negligible even at 100K+ events/second.

Stream Processing with Flink

Flink processes the raw event stream — windowing, joining, filtering — and writes results to the agent context layer.

from datetime import datetime

from pyflink.common import Duration, Row, Types, WatermarkStrategy
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors.kafka import (
    KafkaRecordSerializationSchema,
    KafkaSink,
    KafkaSource,
)
from pyflink.datastream.formats.json import (
    JsonRowDeserializationSchema,
    JsonRowSerializationSchema,
)

env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(4)

# Source: raw orders (the JSON deserializer needs the row schema up front)
order_type = Types.ROW_NAMED(
    ["order_id", "user_id", "amount_usd", "status", "timestamp"],
    [Types.STRING(), Types.STRING(), Types.DOUBLE(), Types.STRING(), Types.STRING()],
)
order_source = (
    KafkaSource.builder()
    .set_bootstrap_servers("kafka:9092")
    .set_topics("raw.orders")
    .set_group_id("flink-alert-processor")
    .set_value_only_deserializer(
        JsonRowDeserializationSchema.builder().type_info(order_type).build()
    )
    .build()
)

orders = env.from_source(
    order_source,
    WatermarkStrategy.for_bounded_out_of_orderness(Duration.of_seconds(5)),
    "OrderSource",
)

# Detect high-value orders for immediate agent review
alert_type = Types.ROW_NAMED(
    ["alert_type", "order_id", "user_id", "amount_usd", "triggered_at"],
    [Types.STRING(), Types.STRING(), Types.STRING(), Types.DOUBLE(), Types.STRING()],
)
high_value_alerts = (
    orders
    .filter(lambda o: o["amount_usd"] > 10_000)
    .map(
        lambda o: Row(
            alert_type="high_value_order",
            order_id=o["order_id"],
            user_id=o["user_id"],
            amount_usd=o["amount_usd"],
            triggered_at=datetime.utcnow().isoformat(),
        ),
        output_type=alert_type,
    )
)

# Write alerts to the processed topic for agent consumption
alert_sink = (
    KafkaSink.builder()
    .set_bootstrap_servers("kafka:9092")
    .set_record_serializer(
        KafkaRecordSerializationSchema.builder()
        .set_topic("processed.alerts")
        .set_value_serialization_schema(
            JsonRowSerializationSchema.builder().with_type_info(alert_type).build()
        )
        .build()
    )
    .build()
)

high_value_alerts.sink_to(alert_sink)
env.execute("OrderAlertProcessor")

Windowed Aggregation for Agent Features

Beyond simple filtering, Flink computes windowed aggregates that give agents time-aware context:

from pyflink.common import Time
from pyflink.datastream.window import TumblingEventTimeWindows

# 5-minute tumbling window: orders per user for rate limiting / fraud detection
order_rate = (
    orders
    .key_by(lambda o: o["user_id"])
    .window(TumblingEventTimeWindows.of(Time.minutes(5)))
    .aggregate(
        CountAndSumAggregateFunction(),  # user-defined: count + sum per window
        WindowResultFunction(),          # user-defined: attach key and window end
    )
    .map(lambda w: {
        "user_id": w.user_id,
        "order_count_5m": w.count,
        "order_value_5m": w.sum,
        "window_end": w.window_end,
    })
)
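CountAndSumAggregateFunction above is a placeholder. The contract it would fulfill is Flink's AggregateFunction: create_accumulator, add, get_result, and merge. A minimal sketch of that shape — written as a plain class so it runs standalone; in a real job it would subclass pyflink.datastream.functions.AggregateFunction:

```python
class CountAndSumAggregateFunction:
    """Incremental count + sum over a window.
    Accumulator is a (count, total) pair, merged across partial windows."""

    def create_accumulator(self):
        return (0, 0.0)

    def add(self, order, acc):
        count, total = acc
        return (count + 1, total + order["amount_usd"])

    def get_result(self, acc):
        count, total = acc
        return {"count": count, "sum": total}

    def merge(self, acc_a, acc_b):
        return (acc_a[0] + acc_b[0], acc_a[1] + acc_b[1])

# Feed two orders through the accumulator
agg = CountAndSumAggregateFunction()
acc = agg.create_accumulator()
for order in ({"amount_usd": 12_000.0}, {"amount_usd": 450.0}):
    acc = agg.add(order, acc)
# get_result(acc) -> {"count": 2, "sum": 12450.0}
```

Because `add` folds each event into the accumulator as it arrives, Flink never buffers the full window contents — memory stays constant regardless of event volume.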

This feature stream gets written to Redis as a real-time feature store. When an agent needs context on a user — "has this user placed an unusual number of orders in the last 5 minutes?" — it reads from Redis in microseconds rather than querying the warehouse.

Agent Integration: Consuming the Alert Stream

import json

import anthropic
import redis
from confluent_kafka import Consumer

consumer = Consumer({
    "bootstrap.servers": "kafka:9092",
    "group.id": "fraud-detection-agent",
    "auto.offset.reset": "latest",
})
consumer.subscribe(["processed.alerts"])

client = anthropic.Anthropic()
redis_client = redis.Redis(host="redis", port=6379)

def process_alert(alert: dict) -> None:
    # Enrich with the Redis feature store
    user_features = redis_client.get(f"user_features:{alert['user_id']}")

    response = client.messages.create(
        model="claude-sonnet-4-6",
        max_tokens=512,
        messages=[{
            "role": "user",
            "content": f"""Evaluate this transaction for fraud risk.

Transaction: {alert}
User features (last 5 min): {user_features}

Respond with JSON:
{{"risk_level": "LOW|MEDIUM|HIGH", "reason": "...", "action": "approve|review|block"}}""",
        }],
    )

    decision = json.loads(response.content[0].text)

    # block_transaction, notify_fraud_team, and queue_for_manual_review
    # are application-specific handlers
    if decision["action"] == "block":
        block_transaction(alert["order_id"])
        notify_fraud_team(alert, decision)
    elif decision["action"] == "review":
        queue_for_manual_review(alert, decision)

# Consume loop
while True:
    msg = consumer.poll(timeout=1.0)
    if msg and not msg.error():
        alert = json.loads(msg.value())
        process_alert(alert)

End-to-end latency from event to agent decision: typically 200–800ms. Enough to intervene on a transaction before it settles.

Monitoring the Streaming Pipeline

Key metrics to track:

  • Consumer lag — if the agent consumer falls behind the produced events, it's a bottleneck
  • End-to-end latency — time from event creation to agent decision (instrument with event timestamps)
  • Throughput — messages per second per partition
  • Flink checkpoint duration — if checkpoints take longer than the checkpoint interval, the job is overloaded

Book a strategy session to design your real-time data pipeline.