
Structured Data (Databases, APIs)
Integrate structured data from databases and APIs into your RAG system.
Structured Data (Databases, APIs)
Structured data from databases and APIs provides precise, queryable information for RAG.
JSON Processing
import json
def process_json(json_data):
    """Turn a JSON document into a searchable text record.

    Args:
        json_data: Either a JSON-encoded string, which is decoded with
            ``json.loads``, or an already-decoded object, which is used as is.

    Returns:
        A dict with two keys: ``'content'`` holds the text that
        ``json_to_text`` produces from the flattened data, and
        ``'structured_data'`` holds the decoded, un-flattened object.
    """
    parsed = json.loads(json_data) if isinstance(json_data, str) else json_data
    # Collapse the nesting first, then render the flat mapping as text.
    searchable = json_to_text(flatten_json(parsed))
    return {'content': searchable, 'structured_data': parsed}
def flatten_json(nested_json, parent_key='', sep='_'):
    """Flatten a nested dict into a single-level dict with joined keys.

    Args:
        nested_json: Mapping to flatten; values that are dicts are descended
            into recursively, every other value (lists included) is kept
            unchanged.
        parent_key: Prefix for every key at this level; an empty prefix
            leaves the keys as they are.
        sep: Separator placed between the prefix and each key.

    Returns:
        A new dict mapping each joined key path to its leaf value. A nested
        dict that is empty contributes no entries.
    """
    flat = {}
    for key, value in nested_json.items():
        path = key if not parent_key else f"{parent_key}{sep}{key}"
        if not isinstance(value, dict):
            flat[path] = value
            continue
        # Merge the flattened sub-tree; its keys already carry the full path.
        flat.update(flatten_json(value, path, sep=sep))
    return flat
Database Integration
import psycopg2
def index_database_table(table_name):
    """Embed and store every row of a database table.

    Each row is rendered to text with ``format_db_row``, embedded with
    ``embed`` and handed to ``store`` together with metadata naming the
    source table and the row's first column.

    Args:
        table_name: Name of the table to index. A dotted name such as
            ``"schema.table"`` is treated as a schema-qualified identifier.
            Each part is quoted, so it must match the stored name exactly
            (quoted identifiers are case-sensitive).
    """
    # Local import keeps this snippet self-contained; psycopg2 itself is
    # already imported at file level.
    from psycopg2 import sql

    # Compose the identifier instead of interpolating it with an f-string,
    # so a hostile or malformed table name cannot inject SQL.
    query = sql.SQL("SELECT * FROM {}").format(
        sql.Identifier(*table_name.split("."))
    )

    conn = psycopg2.connect("dbname=mydb user=user")
    try:
        # psycopg2 connections have no execute(); statements run on a cursor.
        with conn.cursor() as cursor:
            cursor.execute(query)
            columns = cursor.description
            for row in cursor.fetchall():
                # Convert row to text
                row_text = format_db_row(row, columns)
                # Embed and store
                embedding = embed(row_text)
                store(
                    embedding=embedding,
                    content=row_text,
                    metadata={
                        'source': 'database',
                        'table': table_name,
                        'id': row[0],  # Assuming first column is ID
                    },
                )
    finally:
        # Always release the connection, even if embedding or storing fails.
        conn.close()
API Data Ingestion
import requests
def ingest_from_api(api_url, timeout=30):
    """Fetch a JSON API endpoint and embed/store each returned item.

    The response body is expected to be a JSON object with a ``'results'``
    list. Each entry is rendered with ``json_to_text``, embedded with
    ``embed`` and passed to ``store`` with metadata naming the endpoint and
    the item's ``'id'`` (``None`` when the item has no such key).

    Args:
        api_url: URL to request with HTTP GET.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout ``requests`` can block indefinitely.

    Raises:
        requests.HTTPError: If the server answers with a 4xx/5xx status.
        KeyError: If the JSON payload has no ``'results'`` key.
    """
    response = requests.get(api_url, timeout=timeout)
    # Fail loudly on error statuses rather than trying to parse an error body.
    response.raise_for_status()
    data = response.json()
    for item in data['results']:
        # Process each API result
        text = json_to_text(item)
        embedding = embed(text)
        store(
            embedding=embedding,
            content=text,
            metadata={
                'source': 'api',
                'endpoint': api_url,
                'id': item.get('id'),
            },
        )
Text-to-SQL with RAG
def answer_db_question(question):
    """Answer a natural-language question by querying the database.

    The question is translated to SQL with ``text_to_sql``, the query is run
    through ``execute_sql``, and the formatted results are placed in a prompt
    alongside the question and the query.

    Args:
        question: The user's question in natural language.

    Returns:
        Whatever ``claude.generate`` returns for the assembled prompt.
    """
    generated_sql = text_to_sql(question)
    rows = execute_sql(generated_sql)
    rendered_rows = format_query_results(rows)
    # The prompt lines stay flush-left so the text sent to the model carries
    # no leading indentation.
    llm_prompt = f"""
Question: {question}
SQL Query: {generated_sql}
Results:
{rendered_rows}
Provide a natural language answer.
"""
    return claude.generate(llm_prompt)
Schema-Aware Processing
def process_with_schema(data, schema):
    """Render a record as text annotated with its schema information.

    Args:
        data: Mapping of field name to value.
        schema: Mapping of field name to a dict that may carry ``'type'`` and
            ``'description'`` entries. Fields missing from the schema are
            given the type ``'unknown'`` and an empty description.

    Returns:
        One ``"<field> (<type>): <value>. <description>"`` segment per field,
        joined with ``" | "``; an empty string when ``data`` is empty.
    """
    def describe(name, val):
        # Look up what the schema says about this field, if anything.
        meta = schema.get(name, {})
        kind = meta.get('type', 'unknown')
        note = meta.get('description', '')
        return f"{name} ({kind}): {val}. {note}"

    return " | ".join(describe(name, val) for name, val in data.items())
Hybrid RAG + SQL
graph LR
A[User Query] --> B{Query Type?}
B -->|Factual| C[Vector Search]
B -->|Analytical| D[Text-to-SQL]
C --> E[RAG Response]
D --> F[SQL Results]
E & F --> G[LLM Synthesis]
G --> H[Final Answer]
Best Practices
- Schema documentation: Index database schemas
- Metadata: Store table/field names as metadata
- Hybrid approach: Combine RAG with direct SQL queries
- Caching: Cache frequently accessed data
- Freshness: Re-index on data updates
Module 4 complete! Next: Data ingestion pipelines.