Skip to main content

Building a RAG Pipeline on Your Existing Data Warehouse

· 6 min read
Metadata Morph
AI & Data Engineering Team

The most common failure mode in enterprise AI projects is asking an LLM questions about your business data and getting confidently wrong answers. The model doesn't know your revenue figures, your customer data, or your internal processes — it only knows what it was trained on.

Retrieval-Augmented Generation (RAG) fixes this by giving the model the relevant context it needs at query time, retrieved from your actual data. The surprising part: you probably don't need a new data infrastructure to do it. Your existing warehouse already has the data — you just need the retrieval layer on top.

What RAG Actually Does

RAG is a three-step process:

  1. Retrieve — given a question, find the most relevant data from your knowledge base
  2. Augment — add that retrieved data to the prompt as context
  3. Generate — the LLM answers using both its general knowledge and the provided context

Without RAG:

User: What was our gross margin last quarter? LLM: I don't have access to your financial data. [or worse: makes up a number]

With RAG:

User: What was our gross margin last quarter? RAG pipeline: Retrieves: Q3 2025 gross margin = 62.3%, up from 58.1% in Q2 LLM: Your gross margin in Q3 2025 was 62.3%, an improvement of 4.2 percentage points from Q2 2025's 58.1%.

Two RAG Patterns for Warehouse Data

Pattern 1: SQL RAG (Structured Data)

For structured, tabular data in your warehouse, skip embeddings entirely. Use the LLM to convert natural language questions into SQL, execute against your warehouse, then answer based on the results.

User question


LLM converts to SQL (using dbt manifest as schema context)


Execute SQL against warehouse


LLM answers using query results as context

This is faster, cheaper, and more accurate than vector search for structured data. The key ingredient is providing the LLM with accurate schema context — column names, descriptions, relationships — so it generates correct SQL. Your dbt manifest is perfect for this.

Pattern 2: Vector RAG (Unstructured Data)

For documents, support tickets, emails, free-text fields — anything that doesn't fit neatly into a SQL query — vector search is the right approach.

Documents → Chunk → Embed → Store in vector DB

User question → Embed → Similarity search

Top-K chunks

LLM generates answer

Architecture: Warehouse-Native RAG

┌──────────────────────────────────────────────────────────────┐
│ RAG PIPELINE │
│ │
│ ┌────────────────┐ ┌──────────────────────────────┐ │
│ │ SQL RAG Agent │ │ Vector RAG Agent │ │
│ │ │ │ │ │
│ │ 1. Parse query │ │ 1. Embed question │ │
│ │ 2. Get schema │ │ 2. Similarity search │ │
│ │ from dbt │ │ 3. Retrieve top-K chunks │ │
│ │ 3. Generate SQL│ │ 4. Generate answer │ │
│ │ 4. Execute │ │ │ │
│ │ 5. Answer │ │ │ │
│ └───────┬────────┘ └──────────────┬───────────────┘ │
└──────────┼──────────────────────────────┼────────────────────┘
│ │
┌──────▼──────┐ ┌───────▼────────┐
│ Warehouse │ │ Vector Store │
│ (Snowflake/ │ │ (pgvector / │
│ BigQuery/ │ │ Pinecone / │
│ Redshift) │ │ Weaviate) │
└─────────────┘ └────────────────┘

Implementation: SQL RAG with dbt Schema Context

import anthropic
import json

def load_dbt_schema_context(manifest_path: str, catalog_path: str) -> str:
"""Extract table and column descriptions from dbt manifest for LLM context."""
with open(manifest_path) as f:
manifest = json.load(f)
with open(catalog_path) as f:
catalog = json.load(f)

schema_context = []
for node_id, node in manifest["nodes"].items():
if node["resource_type"] != "model":
continue
table_name = node["name"]
description = node.get("description", "No description")
columns = {
col: meta.get("description", "")
for col, meta in node.get("columns", {}).items()
}
schema_context.append(
f"Table: {table_name}\nDescription: {description}\n"
f"Columns: {json.dumps(columns, indent=2)}"
)

return "\n\n---\n\n".join(schema_context)


def sql_rag_query(question: str, warehouse_conn, dbt_manifest: str, dbt_catalog: str) -> str:
client = anthropic.Anthropic()
schema_context = load_dbt_schema_context(dbt_manifest, dbt_catalog)

# Step 1: Generate SQL
sql_response = client.messages.create(
model="claude-opus-4-6",
max_tokens=1024,
messages=[{
"role": "user",
"content": f"""You have access to a data warehouse with the following schema:

{schema_context}

Generate a single SQL query to answer this question: {question}

Rules:
- Return ONLY the SQL query, no explanation
- Use exact table and column names from the schema
- If the question cannot be answered with the available schema, return: CANNOT_ANSWER
"""
}]
)

sql = sql_response.content[0].text.strip()
if sql == "CANNOT_ANSWER":
return "This question cannot be answered with the available data."

# Step 2: Execute SQL
results = warehouse_conn.execute(sql).fetchdf()

# Step 3: Generate natural language answer
answer_response = client.messages.create(
model="claude-opus-4-6",
max_tokens=512,
messages=[{
"role": "user",
"content": f"""Question: {question}

SQL executed: {sql}

Query results:
{results.to_markdown()}

Answer the question in 1-3 sentences using the query results. Be specific with numbers.
Do not explain the SQL — just answer the question."""
}]
)

return answer_response.content[0].text

Implementation: Vector RAG for Unstructured Data

import anthropic
from pgvector.psycopg2 import register_vector
import psycopg2
import numpy as np

def embed_text(text: str, client: anthropic.Anthropic) -> list[float]:
"""Generate embeddings using Claude's embedding model."""
response = client.embeddings.create(
model="voyage-3",
input=text,
)
return response.embeddings[0]


def ingest_documents(documents: list[dict], pg_conn, client: anthropic.Anthropic):
"""Chunk, embed, and store documents in pgvector."""
register_vector(pg_conn)
cur = pg_conn.cursor()

for doc in documents:
# Simple fixed-size chunking — use semantic chunking in production
chunks = chunk_text(doc["content"], chunk_size=500, overlap=50)

for i, chunk in enumerate(chunks):
embedding = embed_text(chunk, client)
cur.execute(
"INSERT INTO document_chunks (doc_id, chunk_index, content, embedding) VALUES (%s, %s, %s, %s)",
(doc["id"], i, chunk, np.array(embedding))
)

pg_conn.commit()


def vector_rag_query(question: str, pg_conn, client: anthropic.Anthropic, top_k: int = 5) -> str:
"""Retrieve relevant chunks and generate an answer."""
register_vector(pg_conn)
question_embedding = embed_text(question, client)

# Similarity search
cur = pg_conn.cursor()
cur.execute(
"""
SELECT content, 1 - (embedding <=> %s) AS similarity
FROM document_chunks
ORDER BY embedding <=> %s
LIMIT %s
""",
(np.array(question_embedding), np.array(question_embedding), top_k)
)
chunks = cur.fetchall()

context = "\n\n---\n\n".join([chunk[0] for chunk in chunks])

response = client.messages.create(
model="claude-opus-4-6",
max_tokens=1024,
messages=[{
"role": "user",
"content": f"""Answer the following question using ONLY the provided context.
If the context doesn't contain enough information, say so explicitly.

Context:
{context}

Question: {question}"""
}]
)

return response.content[0].text

Combining Both Patterns

A production RAG system routes questions to the right retrieval pattern:

def hybrid_rag(question: str, warehouse_conn, pg_conn, client) -> str:
# Classify question type
classification = client.messages.create(
model="claude-haiku-4-5-20251001", # cheap, fast classification
max_tokens=10,
messages=[{
"role": "user",
"content": f"""Is this question about structured metrics/numbers (SQL) or
unstructured text/documents (VECTOR)?
Return only: SQL or VECTOR

Question: {question}"""
}]
).content[0].text.strip()

if classification == "SQL":
return sql_rag_query(question, warehouse_conn, DBT_MANIFEST, DBT_CATALOG)
else:
return vector_rag_query(question, pg_conn, client)

Using a cheap, fast model (Haiku) for routing and reserving the more capable model (Sonnet/Opus) for answer generation cuts cost by 60–80% on classification-heavy workloads.

What This Enables

Once your RAG pipeline is live, your AI agents can answer questions grounded in your actual data — no hallucinations, no stale training data, no "I don't have access to that information." The warehouse you already have becomes the knowledge base your agents reason from.

Book a strategy session to build your RAG pipeline.