Real-Time Agent Context with Kafka: Sub-Second Data Freshness for AI Pipelines
Batch pipelines are sufficient for most analytical workloads. They're not sufficient for AI agents making time-sensitive decisions. An anomaly detection agent that works on yesterday's data misses the incident happening right now. A customer churn agent fed weekly snapshots can't act on a user who disengaged three hours ago.
Real-time streaming closes this gap. With Kafka as the event backbone and Flink for stream processing, your agents can operate on data that is seconds old rather than hours or days.
The Freshness Problem in Agentic AI
Every AI agent has a knowledge cutoff — not just the LLM's training cutoff, but the data freshness cutoff of the context it receives. A daily batch pipeline means your agent's context is up to 24 hours stale. For operational use cases — fraud detection, infrastructure alerts, live customer support — that's unusable.
Streaming doesn't just make data fresher. It changes what your agents can do:
| Batch (daily) | Streaming (seconds) |
|---|---|
| Detect fraud after the fact | Block fraudulent transaction in flight |
| Alert on yesterday's outage | Page on-call during the outage |
| Weekly churn report | Real-time intervention when user disengages |
| Report on completed orders | Intervene on abandoned cart now |
Architecture
Source Systems (DB, APIs, apps)
│
▼ (CDC / event publishing)
┌─────────────────────────────────────────┐
│ KAFKA CLUSTER │
│ │
│ Topics: │
│ • raw.orders (all events) │
│ • raw.user_events (clickstream) │
│ • raw.payments (transactions) │
│ • processed.alerts (agent inputs) │
└──────────┬──────────────────────────────┘
│
┌──────▼──────┐
│ FLINK │ ← stream processing: filter, join, aggregate
│ JOBS │
└──────┬──────┘
│
┌──────▼──────────────────────────────┐
│ AGENT CONTEXT LAYER │
│ │
│ • Feature store (Redis) │
│ • Materialized views (warehouse) │
│ • Alert topic (Kafka) │
└──────┬──────────────────────────────┘
│
┌──────▼──────┐
│ AI AGENTS │ ← consume fresh context, act
└─────────────┘
Setting Up Kafka Topics
Topic design determines throughput and ordering guarantees. Each domain gets its own topic; partition count controls parallelism.
```python
from confluent_kafka.admin import AdminClient, NewTopic

admin = AdminClient({"bootstrap.servers": "kafka:9092"})

topics = [
    NewTopic("raw.orders", num_partitions=12, replication_factor=3),
    NewTopic("raw.user_events", num_partitions=24, replication_factor=3),
    NewTopic("raw.payments", num_partitions=12, replication_factor=3),
    NewTopic("processed.alerts", num_partitions=6, replication_factor=3),
]

# create_topics is asynchronous: it returns a dict of futures, one per topic
futures = admin.create_topics(topics)
for topic, future in futures.items():
    future.result()  # raises if creation failed (e.g., topic already exists)
```
Partition count rules:
- More partitions = more parallelism but more overhead
- Target 1–2 partitions per consumer thread in your Flink job
- Use a meaningful partition key (e.g., `user_id` or `order_id`) so related events land on the same partition and maintain ordering
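The key-to-partition mapping is what makes that ordering guarantee work: Kafka's default partitioner hashes the key (murmur2 in the Java client) and takes it modulo the partition count. A minimal sketch of the invariant — using `hashlib` instead of murmur2, so the exact partition numbers differ from Kafka's, but the property is the same:

```python
import hashlib

def partition_for(key: bytes, num_partitions: int) -> int:
    """Deterministic key -> partition mapping (illustration only).

    Kafka's default partitioner uses murmur2; md5 is used here purely to
    demonstrate the invariant: same key, same partition, every time.
    """
    digest = int.from_bytes(hashlib.md5(key).digest()[:4], "big")
    return digest % num_partitions

# All events for a user land on the same partition, so per-user ordering holds
p1 = partition_for(b"user-42", 12)
p2 = partition_for(b"user-42", 12)
assert p1 == p2 and 0 <= p1 < 12
```

This is also why changing the partition count of an existing topic breaks the mapping: the modulo changes, so new events for a key can land on a different partition than its historical events.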
Producing Events
```python
import msgspec
from confluent_kafka import Producer
from datetime import datetime

class OrderEvent(msgspec.Struct):
    order_id: str
    user_id: str
    amount_usd: float
    status: str  # "placed", "paid", "shipped", "cancelled"
    timestamp: datetime

encoder = msgspec.json.Encoder()
producer = Producer({"bootstrap.servers": "kafka:9092"})

def publish_order_event(event: OrderEvent) -> None:
    producer.produce(
        topic="raw.orders",
        key=event.user_id.encode(),  # partition by user for per-user ordering
        value=encoder.encode(event),
        callback=lambda err, msg: print(
            f"Error: {err}" if err else f"Delivered: {msg.topic()}"
        ),
    )
    producer.poll(0)  # trigger delivery callbacks without blocking
    # Call producer.flush() before shutdown to drain any queued messages
```
Using msgspec for serialization (introduced in our earlier post) keeps encoding overhead negligible even at 100K+ events/second.
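At that scale, delivery guarantees matter as much as encoding speed. A hedged producer configuration sketch — these are standard librdkafka property names, but the values are assumptions to tune for your workload, not recommendations from this pipeline:

```python
# Producer config sketch (librdkafka property names; values are assumptions)
producer_config = {
    "bootstrap.servers": "kafka:9092",
    "enable.idempotence": True,   # broker dedupes retries: no duplicate events per partition
    "acks": "all",                # wait for all in-sync replicas before confirming delivery
    "linger.ms": 5,               # batch for up to 5 ms; trades a little latency for throughput
    "compression.type": "lz4",    # cheap CPU-wise, shrinks network and broker I/O
}
```

Pass this dict to `Producer(producer_config)` in place of the bare `bootstrap.servers` config above when duplicate or lost events would be worse than a few milliseconds of extra latency.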
Flink Stream Processing Job
Flink processes the raw event stream — windowing, joining, filtering — and writes results to the agent context layer.
```python
import json
from datetime import datetime, timezone

from pyflink.common import Duration, WatermarkStrategy
from pyflink.common.serialization import SimpleStringSchema
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors.kafka import (
    KafkaRecordSerializationSchema,
    KafkaSink,
    KafkaSource,
)

env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(4)

# Source: raw orders, consumed as JSON strings and parsed in a map step
order_source = (
    KafkaSource.builder()
    .set_bootstrap_servers("kafka:9092")
    .set_topics("raw.orders")
    .set_group_id("flink-alert-processor")
    .set_value_only_deserializer(SimpleStringSchema())
    .build()
)

orders = env.from_source(
    order_source,
    WatermarkStrategy.for_bounded_out_of_orderness(Duration.of_seconds(5)),
    "OrderSource",
).map(json.loads)

# Detect high-value orders for immediate agent review
high_value_alerts = (
    orders
    .filter(lambda o: o["amount_usd"] > 10_000)
    .map(
        lambda o: json.dumps({
            "alert_type": "high_value_order",
            "order_id": o["order_id"],
            "user_id": o["user_id"],
            "amount_usd": o["amount_usd"],
            "triggered_at": datetime.now(timezone.utc).isoformat(),
        }),
        output_type=Types.STRING(),  # the string sink serializer needs STRING type info
    )
)

# Write alerts to the processed topic for agent consumption
alert_sink = (
    KafkaSink.builder()
    .set_bootstrap_servers("kafka:9092")
    .set_record_serializer(
        KafkaRecordSerializationSchema.builder()
        .set_topic("processed.alerts")
        .set_value_serialization_schema(SimpleStringSchema())
        .build()
    )
    .build()
)

high_value_alerts.sink_to(alert_sink)
env.execute("OrderAlertProcessor")
```
Windowed Aggregation for Agent Features
Beyond simple filtering, Flink computes windowed aggregates that give agents time-aware context:
```python
from pyflink.common import Time
from pyflink.datastream.window import TumblingEventTimeWindows

# 5-minute tumbling window: orders per user for rate-limiting / fraud detection.
# CountAndSumAggregateFunction (an AggregateFunction producing count + sum) and
# WindowResultFunction (a ProcessWindowFunction that attaches the key and window
# end) are user-defined helpers, omitted here for brevity.
order_rate = (
    orders
    .key_by(lambda o: o["user_id"])
    .window(TumblingEventTimeWindows.of(Time.minutes(5)))
    .aggregate(
        CountAndSumAggregateFunction(),
        WindowResultFunction(),
    )
    .map(lambda w: {
        "user_id": w.user_id,
        "order_count_5m": w.count,
        "order_value_5m": w.sum,
        "window_end": w.window_end,
    })
)
```
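To make the windowing semantics concrete without running a cluster, here is a plain-Python sketch of what a tumbling event-time window computes: each event falls into exactly one bucket of fixed width, keyed by user. This is an illustration only — Flink additionally handles out-of-order events via watermarks and evicts completed windows incrementally:

```python
from collections import defaultdict

def tumbling_count_sum(events, window_ms=300_000):
    """Group (user_id, event_time_ms, amount) events into 5-minute tumbling
    windows and compute count + sum per (user, window) -- the same shape of
    result the Flink job emits."""
    buckets = defaultdict(lambda: [0, 0.0])
    for user_id, ts_ms, amount in events:
        window_start = ts_ms - (ts_ms % window_ms)  # each event maps to exactly one window
        buckets[(user_id, window_start)][0] += 1
        buckets[(user_id, window_start)][1] += amount
    return {
        (user, start): {
            "order_count_5m": count,
            "order_value_5m": total,
            "window_end": start + window_ms,
        }
        for (user, start), (count, total) in buckets.items()
    }

events = [
    ("u1", 0, 10.0),       # window [0, 300000)
    ("u1", 299_999, 5.0),  # same window
    ("u1", 300_000, 1.0),  # next window
]
result = tumbling_count_sum(events)
# ("u1", 0): count 2, sum 15.0; ("u1", 300000): count 1, sum 1.0
```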
This feature stream gets written to Redis as a real-time feature store. When an agent needs context on a user — "has this user placed an unusual number of orders in the last 5 minutes?" — it reads from Redis in microseconds rather than querying the warehouse.
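A sketch of the write side of that feature store. The key naming `user_features:<user_id>` matches the lookup the agent performs; the helper name, payload shape, and TTL are assumptions for illustration:

```python
import json

def feature_record(user_id: str, count: int, total: float, window_end_iso: str):
    """Build the Redis key and JSON payload for one window result.

    Key format user_features:<user_id> matches the agent-side lookup.
    """
    key = f"user_features:{user_id}"
    payload = json.dumps({
        "order_count_5m": count,
        "order_value_5m": total,
        "window_end": window_end_iso,
    })
    return key, payload

# In a Redis sink you would then call, for example:
#   redis_client.set(key, payload, ex=600)  # expire stale windows after 10 min (assumed TTL)
key, payload = feature_record("u1", 3, 4_250.0, "2025-01-01T00:05:00Z")
```

Setting a TTL slightly longer than the window size means a user with no recent activity simply has no feature record, which the agent can treat as "no signal" rather than reading a stale window.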
Agent Integration: Consuming the Alert Stream
```python
import json

import anthropic
from confluent_kafka import Consumer

consumer = Consumer({
    "bootstrap.servers": "kafka:9092",
    "group.id": "fraud-detection-agent",
    "auto.offset.reset": "latest",
})
consumer.subscribe(["processed.alerts"])

client = anthropic.Anthropic()

def process_alert(alert: dict) -> None:
    # Enrich with the Redis feature store (redis_client: the same connection
    # the Flink feature sink writes to)
    user_features = redis_client.get(f"user_features:{alert['user_id']}")

    response = client.messages.create(
        model="claude-sonnet-4-6",
        max_tokens=512,
        messages=[{
            "role": "user",
            "content": f"""Evaluate this transaction for fraud risk.

Transaction: {alert}
User features (last 5 min): {user_features}

Respond with JSON:
{{"risk_level": "LOW|MEDIUM|HIGH", "reason": "...", "action": "approve|review|block"}}""",
        }],
    )

    decision = json.loads(response.content[0].text)
    if decision["action"] == "block":
        block_transaction(alert["order_id"])
        notify_fraud_team(alert, decision)
    elif decision["action"] == "review":
        queue_for_manual_review(alert, decision)

# Consume loop: poll, decode, act; close cleanly on shutdown
try:
    while True:
        msg = consumer.poll(timeout=1.0)
        if msg is None or msg.error():
            continue
        alert = json.loads(msg.value())
        process_alert(alert)
finally:
    consumer.close()  # commits final offsets and leaves the group cleanly
```
End-to-end latency from event to agent decision: typically 200–800ms. Enough to intervene on a transaction before it settles.
Monitoring the Streaming Pipeline
Key metrics to track:
- Consumer lag — the gap between the latest produced offset and the agent consumer's committed offset; a steadily growing lag means the agent is the bottleneck
- End-to-end latency — time from event creation to agent decision (instrument with event timestamps)
- Throughput — messages per second per partition
- Flink checkpoint duration — if checkpoints take longer than the checkpoint interval, the job is overloaded
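End-to-end latency is the easiest of these to instrument yourself, because producers already stamp every event with a creation timestamp. A minimal sketch — the helper name is ours, and it assumes timezone-aware ISO-8601 timestamps like the ones the producer above emits:

```python
from datetime import datetime, timezone

def e2e_latency_ms(event_timestamp_iso: str, decided_at: datetime) -> float:
    """Milliseconds from event creation to agent decision."""
    created = datetime.fromisoformat(event_timestamp_iso)
    return (decided_at - created).total_seconds() * 1000.0

# Record this per alert and alarm on the p99 (a 1-second threshold is an assumption)
created = "2025-01-01T00:00:00+00:00"
decided = datetime(2025, 1, 1, 0, 0, 0, 450_000, tzinfo=timezone.utc)
latency = e2e_latency_ms(created, decided)  # 450.0 ms
```

Computing the latency at the decision point (rather than from Kafka broker timestamps) captures the full path: producer, broker, Flink, alert topic, and the LLM call itself.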