Audio (Speech, Meetings, Interviews)

Transcribe and process audio content for searchable RAG systems.

Audio contains rich information. Transcription makes it searchable.

Audio Processing Pipeline

graph LR
    A[Audio File] --> B[Transcription]
    B --> C[Timestamp Alignment]
    C --> D[Speaker Diarization]
    D --> E[Text Processing]
    E --> F[Indexing]

Transcription with Whisper

import whisper

model = whisper.load_model("large-v3")  # ~3 GB download; swap in "base" for faster, rougher transcripts

def transcribe_audio(audio_path):
    result = model.transcribe(
        audio_path,
        language="en",
        task="transcribe",
        verbose=False
    )
    
    return {
        'text': result['text'],
        'segments': result['segments'],  # With timestamps
        'language': result['language']
    }
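A quick usage check (the file name is illustrative):

result = transcribe_audio("team_meeting.mp3")
print(result['language'])      # language used for transcription
print(result['segments'][0])   # first timestamped segment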

Timestamped Segments

def process_with_timestamps(audio_path):
    result = model.transcribe(audio_path)  # transcribe via the loaded model, not the module
    
    segments = []
    for segment in result['segments']:
        segments.append({
            'start': segment['start'],
            'end': segment['end'],
            'text': segment['text'],
            'speaker': None  # filled in later by diarization
        })
    
    return segments

Speaker Diarization

from pyannote.audio import Pipeline

# Gated model: accept the license on Hugging Face, then pass use_auth_token=<your HF token>
pipeline = Pipeline.from_pretrained("pyannote/speaker-diarization")

def identify_speakers(audio_path):
    diarization = pipeline(audio_path)
    
    speakers = []
    for turn, _, speaker in diarization.itertracks(yield_label=True):
        speakers.append({
            'start': turn.start,
            'end': turn.end,
            'speaker': speaker
        })
    
    return speakers

Combined Transcription + Diarization

def transcribe_meeting(audio_path):
    # Transcribe
    transcript = transcribe_audio(audio_path)
    
    # Identify speakers
    speakers = identify_speakers(audio_path)
    
    # Align
    # Align: attribute each transcript segment to whoever was speaking at that time
    for segment in transcript['segments']:
        speaker = find_speaker_at_time(segment['start'], speakers)
        segment['speaker'] = speaker
    
    return transcript
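
find_speaker_at_time is used above but not defined. A minimal sketch: return the diarization turn that contains the segment's start time (this assumes the speakers list produced by identify_speakers):

def find_speaker_at_time(timestamp, speakers):
    # Return the speaker whose diarization turn contains this timestamp
    for turn in speakers:
        if turn['start'] <= timestamp <= turn['end']:
            return turn['speaker']
    return None  # no turn covers this moment (e.g., silence)

Matching on the segment's start time is crude: a segment that spans a speaker change gets attributed entirely to whoever spoke first. Matching on maximum overlap between segment and turn is more robust.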

Chunking for RAG

def chunk_transcript(transcript, chunk_duration=30):
    """Split a transcript into chunks of roughly chunk_duration seconds"""
    chunks = []
    current_chunk = {'text': '', 'start': 0, 'speakers': set()}
    
    for segment in transcript['segments']:
        if segment['end'] - current_chunk['start'] > chunk_duration and current_chunk['text']:
            chunks.append(current_chunk)
            current_chunk = {'text': '', 'start': segment['start'], 'speakers': set()}
        
        current_chunk['text'] += ' ' + segment['text']
        current_chunk['speakers'].add(segment.get('speaker') or 'Unknown')
    
    # Don't drop the trailing chunk when the loop ends
    if current_chunk['text']:
        chunks.append(current_chunk)
    
    return chunks
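
A fixed 30-second window is a pragmatic default for meetings: long enough to carry context, short enough that a hit points to a precise moment in the recording. Tune chunk_duration to your content, or chunk on speaker turns instead.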

Indexing Strategy

def index_audio(audio_path):
    transcript = transcribe_meeting(audio_path)
    chunks = chunk_transcript(transcript)
    
    for chunk in chunks:
        # embed() and store() are placeholders; one possible implementation follows below
        embedding = embed(chunk['text'])
        
        store(
            embedding=embedding,
            content=chunk['text'],
            metadata={
                'source': audio_path,
                'start_time': chunk['start'],
                'speakers': list(chunk['speakers']),
                'type': 'audio_transcript'
            }
        )
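
embed and store are left abstract above. A minimal sketch of both, assuming sentence-transformers and Chroma (both library choices are assumptions, not prescribed by the pipeline):

import chromadb
from sentence_transformers import SentenceTransformer

encoder = SentenceTransformer("all-MiniLM-L6-v2")
client = chromadb.Client()
collection = client.get_or_create_collection("audio_transcripts")

def embed(text):
    return encoder.encode(text).tolist()

def store(embedding, content, metadata):
    # Chroma metadata values must be scalars, so flatten the speaker list
    metadata = {**metadata, 'speakers': ', '.join(metadata['speakers'])}
    collection.add(
        ids=[f"{metadata['source']}_{metadata['start_time']}"],
        embeddings=[embedding],
        documents=[content],
        metadatas=[metadata]
    )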

Query Results

Each indexed chunk carries enough metadata to return not just an answer but its provenance:

{
    "answer": "The team decided to postpone the launch to Q2.",
    "source": {
        "file": "team_meeting_2025-01-05.mp3",
        "timestamp": "15:32 - 16:05",
        "speaker": "Alice",
        "transcript": "After reviewing the feedback, I think we should postpone the launch to Q2 to address the performance issues."
    }
}
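
A result in that shape can be assembled from the stored metadata. A retrieval sketch against the Chroma collection from the indexing section (format_timestamp is a hypothetical helper):

def format_timestamp(seconds):
    minutes, secs = divmod(int(seconds), 60)
    return f"{minutes}:{secs:02d}"

def search_audio(question, k=3):
    results = collection.query(
        query_embeddings=[embed(question)],
        n_results=k
    )
    hits = []
    for doc, meta in zip(results['documents'][0], results['metadatas'][0]):
        hits.append({
            'transcript': doc,
            'file': meta['source'],
            'timestamp': format_timestamp(meta['start_time']),
            'speakers': meta['speakers']
        })
    return hits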

Next: Video processing.
