Structured Data (Databases, APIs)

Integrate structured data from databases and APIs into your RAG system.

Structured data from databases and APIs provides precise, queryable information for RAG.

JSON Processing

import json

def process_json(json_data):
    """Turn a JSON payload (string or already-parsed object) into searchable text.

    Returns a dict with the natural-language 'content' for embedding plus
    the original 'structured_data' preserved for downstream metadata use.
    """
    data = json.loads(json_data) if isinstance(json_data, str) else json_data

    # Flatten nesting first so every leaf value ends up under a compound key,
    # then render the flat mapping as natural language.
    return {
        'content': json_to_text(flatten_json(data)),
        'structured_data': data,
    }

def flatten_json(nested_json, parent_key='', sep='_'):
    """Flatten a nested dict into a single level, joining key paths with *sep*.

    Non-dict values (including lists) are kept as-is under their
    flattened compound key.
    """
    flat = {}
    for key, value in nested_json.items():
        compound = f"{parent_key}{sep}{key}" if parent_key else key
        if isinstance(value, dict):
            # Recurse with the compound key as the new prefix.
            flat.update(flatten_json(value, compound, sep=sep))
        else:
            flat[compound] = value
    return flat

Database Integration

import psycopg2

def index_database_table(table_name):
    """Embed and store every row of *table_name* for retrieval.

    Fixes vs. the naive version: psycopg2 connections have no .execute()
    (a cursor must be created first), the table name is quoted as an SQL
    identifier to prevent injection, and the connection is always closed.
    """
    from psycopg2 import sql  # local import: only needed here

    conn = psycopg2.connect("dbname=mydb user=user")
    try:
        with conn.cursor() as cursor:
            # Identifiers cannot be bound as query parameters, so use
            # sql.Identifier for safe quoting instead of raw f-string
            # interpolation (which is SQL-injectable).
            cursor.execute(
                sql.SQL("SELECT * FROM {}").format(sql.Identifier(table_name))
            )
            for row in cursor.fetchall():
                # Convert the row to text using column metadata.
                row_text = format_db_row(row, cursor.description)

                # Embed and store with provenance metadata.
                store(
                    embedding=embed(row_text),
                    content=row_text,
                    metadata={
                        'source': 'database',
                        'table': table_name,
                        # NOTE(review): assumes the first column is the
                        # primary key — confirm against the schema.
                        'id': row[0],
                    },
                )
    finally:
        conn.close()

API Data Ingestion

import requests

def ingest_from_api(api_url):
    """Fetch JSON from *api_url* and embed/store each item in 'results'.

    Raises requests.HTTPError on non-2xx responses (instead of silently
    indexing an error payload) and bounds the request with a timeout so a
    dead host cannot hang the pipeline indefinitely.
    """
    response = requests.get(api_url, timeout=30)
    response.raise_for_status()
    data = response.json()

    for item in data['results']:
        # Render each API result as text, then embed and store it with
        # provenance metadata.
        text = json_to_text(item)
        store(
            embedding=embed(text),
            content=text,
            metadata={
                'source': 'api',
                'endpoint': api_url,
                'id': item.get('id'),
            },
        )

Text-to-SQL with RAG

def answer_db_question(question):
    """Answer a natural-language question by querying the database.

    Pipeline: NL question -> generated SQL -> query results -> LLM
    synthesis of a natural-language answer grounded in those results.
    """
    # Translate the question into SQL and run it.
    query = text_to_sql(question)
    rows = execute_sql(query)

    # Render the raw rows into a textual form the LLM can read.
    rendered = format_query_results(rows)

    prompt = f"""
Question: {question}
SQL Query: {query}
Results:
{rendered}

Provide a natural language answer.
    """

    return claude.generate(prompt)

Schema-Aware Processing

def process_with_schema(data, schema):
    """Render *data* as text enriched with type/description info from *schema*.

    Each field becomes "name (type): value. description"; fields absent
    from the schema fall back to type 'unknown' and an empty description.
    Segments are joined with " | ".
    """
    def render(field, value):
        # Schema lookup is best-effort: missing fields get defaults.
        meta = schema.get(field, {})
        return (
            f"{field} ({meta.get('type', 'unknown')}): {value}. "
            f"{meta.get('description', '')}"
        )

    return " | ".join(render(field, value) for field, value in data.items())

Hybrid RAG + SQL

graph LR
    A[User Query] --> B{Query Type?}
    B -->|Factual| C[Vector Search]
    B -->|Analytical| D[Text-to-SQL]
    
    C --> E[RAG Response]
    D --> F[SQL Results]
    
    E & F --> G[LLM Synthesis]
    G --> H[Final Answer]

Best Practices

  • Schema documentation: Index database schemas
  • Metadata: Store table/field names as metadata
  • Hybrid approach: Combine RAG with direct SQL queries
  • Caching: Cache frequently accessed data
  • Freshness: Re-index on data updates

Module 4 complete! Next: Data ingestion pipelines.

Subscribe to our newsletter

Get the latest posts delivered right to your inbox.

Subscribe on LinkedIn