# High-Speed Network Security Log Analysis with msgspec and AI Agents
Security logs are among the highest-volume, most time-sensitive data in any organization. A single mid-sized network generates millions of log events per hour — firewall denies, DNS queries, authentication events, lateral movement signals. Traditional SIEM tools drown in the volume. Manual analysis is impossible at scale.
This post shows how to combine msgspec for high-performance log parsing with an AI agent that correlates events, identifies threat patterns, and generates structured incident reports — without the overhead of a full SIEM platform.
## Why msgspec for Security Logs
Security log pipelines have two hard constraints: they must be fast (logs arrive faster than they can be processed if parsing is slow) and they must be correct (a parsing error that silently drops a field means missed detections).
msgspec is a Python library for high-performance serialization and deserialization with strict schema validation. Benchmarks consistently show msgspec outperforming alternatives such as Pydantic, attrs, and plain dict parsing for combined JSON decoding and validation, often by several-fold and in some workloads by an order of magnitude.
For security logs — which are JSON-heavy and arrive in massive bursts — this matters:
| Library | Parse 1M log events | Memory (1M events) |
|---|---|---|
| json + dict | ~8.2s | ~1.4 GB |
| Pydantic v2 | ~3.1s | ~890 MB |
| msgspec | ~0.9s | ~310 MB |
At 1M events, msgspec is 9x faster than raw json+dict and 3x faster than Pydantic v2, with 4x lower memory usage. At security-log scale, this is the difference between keeping up and falling behind.
## Defining the Log Schema
msgspec schemas are defined as Struct classes, similar in spirit to dataclasses. The schema is the contract: any missing or mistyped field raises a ValidationError immediately, and the offending line is routed to a dead-letter queue rather than silently corrupting downstream analysis.
```python
import msgspec
from typing import Optional
from datetime import datetime


class NetworkEvent(msgspec.Struct, frozen=True):
    timestamp: datetime
    event_type: str  # "firewall_deny", "dns_query", etc.
    source_ip: str
    destination_ip: str
    destination_port: int
    protocol: str  # "TCP", "UDP", "ICMP"
    action: str  # "ALLOW", "DENY", "DROP"
    bytes_transferred: Optional[int] = None
    user_id: Optional[str] = None
    hostname: Optional[str] = None
    country_code: Optional[str] = None
    threat_score: Optional[float] = None


class AuthEvent(msgspec.Struct, frozen=True):
    timestamp: datetime
    event_type: str  # "login_success", "login_failure", "mfa_bypass"
    user_id: str
    source_ip: str
    user_agent: Optional[str] = None
    location: Optional[str] = None
    risk_score: Optional[float] = None


# Decoder reuse: instantiate once, use for every event
network_decoder = msgspec.json.Decoder(NetworkEvent)
auth_decoder = msgspec.json.Decoder(AuthEvent)
```
The frozen=True flag makes structs immutable and hashable — important for deduplication and set-based correlation downstream.
## The Ingestion Pipeline
```python
import msgspec
from collections import defaultdict
from datetime import datetime, timedelta


def parse_log_batch(
    raw_lines: list[bytes],
) -> tuple[list[NetworkEvent | AuthEvent], list[bytes]]:
    """
    Parse a batch of raw JSON log lines.

    Returns (valid_events, dead_letter_lines). Dead-letter lines are
    unparseable or schema-invalid — never silently dropped.
    """
    valid = []
    dead_letter = []
    for line in raw_lines:
        try:
            # Route to the correct decoder with a cheap substring check on
            # the event_type value — avoids a full parse just to check type.
            if b'"firewall' in line or b'"dns' in line:
                event = network_decoder.decode(line)
            elif b'"login_' in line or b'"mfa_' in line:
                event = auth_decoder.decode(line)
            else:
                dead_letter.append(line)
                continue
            valid.append(event)
        except msgspec.DecodeError:  # covers ValidationError as well
            dead_letter.append(line)  # log and route, never drop
    return valid, dead_letter


def detect_brute_force(events: list[AuthEvent], window_minutes: int = 5) -> list[dict]:
    """
    Detect brute force: more than 10 auth failures from the same IP
    within the last `window_minutes` minutes.
    """
    failures_by_ip: dict[str, list[datetime]] = defaultdict(list)
    alerts = []
    for event in events:
        if event.event_type == "login_failure":
            failures_by_ip[event.source_ip].append(event.timestamp)
    # Assumes log timestamps are naive UTC, matching utcnow()
    cutoff = datetime.utcnow() - timedelta(minutes=window_minutes)
    for ip, timestamps in failures_by_ip.items():
        recent = [t for t in timestamps if t >= cutoff]
        if len(recent) > 10:
            alerts.append({
                "type": "brute_force",
                "source_ip": ip,
                "failure_count": len(recent),
                "window_minutes": window_minutes,
                "severity": "HIGH",
            })
    return alerts


def detect_port_scan(events: list[NetworkEvent], threshold: int = 20) -> list[dict]:
    """
    Detect port scanning: a single source IP hitting `threshold` or more
    unique destination ports on denied connections within one batch.
    """
    denied_by_source: dict[str, set[int]] = defaultdict(set)
    alerts = []
    for event in events:
        if event.action == "DENY":
            denied_by_source[event.source_ip].add(event.destination_port)
    for ip, ports in denied_by_source.items():
        if len(ports) >= threshold:
            alerts.append({
                "type": "port_scan",
                "source_ip": ip,
                "unique_ports_hit": len(ports),
                "severity": "MEDIUM",
            })
    return alerts
```
## Feeding Alerts to the AI Agent
Fast parsing and rule-based detection handle the high-volume, low-ambiguity cases: brute force, port scans, known-bad IPs. The AI agent handles the harder cases — correlated multi-stage attacks, anomalous behavior that doesn't match a single rule, and incident narrative generation. The agent's tools are wired up through an MCP server configuration: database access for event history, a filesystem for reports, and Slack for notifications:
```json
{
  "mcpServers": {
    "warehouse": {
      "command": "uvx",
      "args": ["mcp-server-postgres"],
      "env": {
        "POSTGRES_CONNECTION_STRING": "${SECURITY_DB_URL}"
      }
    },
    "filesystem": {
      "command": "npx",
      "args": ["-y", "@modelcontextprotocol/server-filesystem", "/data/security-reports"]
    },
    "notifications": {
      "command": "uvx",
      "args": ["mcp-server-slack"],
      "env": {
        "SLACK_BOT_TOKEN": "${SLACK_TOKEN}"
      }
    }
  }
}
```
The agent receives the structured alert objects (not raw logs) and correlates them against the event history in the security database:
```
You are a security analyst agent. You have received the following alerts
from the automated detection layer:

{alerts_json}

Using the warehouse MCP, investigate these alerts:

1. Query event history for the flagged source IPs over the past 24 hours
2. Check if multiple alerts involve the same source IP or user account
3. Look for lateral movement patterns: auth success after brute force,
   followed by internal network scanning
4. Assess whether this looks like: reconnaissance, credential stuffing,
   active intrusion, or a false positive

Generate a structured incident report with:

- Severity: CRITICAL / HIGH / MEDIUM / LOW
- Attack pattern (if identifiable)
- Timeline of events
- Affected systems and users
- Recommended immediate actions
- Whether to escalate to the on-call security team

Write the report to the filesystem MCP at /data/security-reports/{date}-{incident_id}.md
If severity is CRITICAL or HIGH, also post the summary to #security-alerts in Slack.
```
## What This Replaces
| Before | After |
|---|---|
| SIEM ingesting raw logs | msgspec pipeline: parse, validate, detect in <1s per batch |
| Analyst manually reviewing SIEM alerts | Agent correlates, investigates, writes incident report |
| Hours to identify multi-stage attack patterns | Agent correlates events across sources in seconds |
| Incident reports written from scratch | Structured report generated automatically, human reviews |
The human security team shifts from reviewing every alert to reviewing agent-generated incident reports. They spend their time on response and remediation — the work that requires judgment — rather than log archaeology.
## Production Considerations
- Dead-letter queue: Every unparseable line is preserved and retried after schema updates — no silent data loss
- Schema versioning: Log formats change. Pin decoder versions per log source; use a registry to map source → schema version
- Backpressure: If the ingestion pipeline falls behind, msgspec's low memory footprint means you can buffer more in-process before spilling to disk
- Agent cost control: Only send correlated alert clusters to the LLM, not raw events. The rule-based layer filters 95%+ of volume before the agent sees it