
Audio (Speech, Meetings, Interviews)
Transcribe and process audio content so it becomes searchable in RAG systems.
Audio contains rich information. Transcription makes it searchable.
Audio Processing Pipeline
graph LR
A[Audio File] --> B[Transcription]
B --> C[Timestamp Alignment]
C --> D[Speaker Diarization]
D --> E[Text Processing]
E --> F[Indexing]
Transcription with Whisper
import whisper

model = whisper.load_model("large-v3")

def transcribe_audio(audio_path):
    result = model.transcribe(
        audio_path,
        language="en",
        task="transcribe",
        verbose=False
    )
    return {
        'text': result['text'],
        'segments': result['segments'],  # With timestamps
        'language': result['language']
    }
Timestamped Segments
def process_with_timestamps(audio_path):
    result = model.transcribe(audio_path)
    segments = []
    for segment in result['segments']:
        segments.append({
            'start': segment['start'],
            'end': segment['end'],
            'text': segment['text'],
            'speaker': None  # Filled in later by diarization
        })
    return segments
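A short loop makes the segment structure concrete (placeholder file name):
for seg in process_with_timestamps("team_meeting_2025-01-05.mp3")[:3]:
    print(f"{seg['start']:6.1f}s - {seg['end']:6.1f}s  {seg['text']}")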
Speaker Diarization
from pyannote.audio import Pipeline

pipeline = Pipeline.from_pretrained("pyannote/speaker-diarization")

def identify_speakers(audio_path):
    diarization = pipeline(audio_path)
    speakers = []
    for turn, _, speaker in diarization.itertracks(yield_label=True):
        speakers.append({
            'start': turn.start,
            'end': turn.end,
            'speaker': speaker
        })
    return speakers
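The pretrained pipeline is gated on Hugging Face, so from_pretrained usually needs an access token once the model card's terms are accepted. A sketch, assuming a placeholder token (the parameter name may differ across pyannote.audio versions):
pipeline = Pipeline.from_pretrained(
    "pyannote/speaker-diarization",
    use_auth_token="hf_your_token_here"  # placeholder token
)

speakers = identify_speakers("team_meeting_2025-01-05.mp3")
# e.g. [{'start': 0.4, 'end': 7.9, 'speaker': 'SPEAKER_00'}, ...]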
Combined Transcription + Diarization
def transcribe_meeting(audio_path):
    # Transcribe
    transcript = transcribe_audio(audio_path)

    # Identify speakers
    speakers = identify_speakers(audio_path)

    # Align: find the speaker for each segment's timestamp
    # (find_speaker_at_time is sketched below)
    for segment in transcript['segments']:
        speaker = find_speaker_at_time(segment['start'], speakers)
        segment['speaker'] = speaker

    return transcript
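find_speaker_at_time is not defined above; a minimal sketch that returns the speaker whose diarization turn contains the timestamp, falling back to the nearest turn:
def find_speaker_at_time(timestamp, speakers):
    # Prefer a diarization turn that contains the timestamp
    for turn in speakers:
        if turn['start'] <= timestamp <= turn['end']:
            return turn['speaker']
    # Fall back to the turn whose midpoint is closest to the timestamp
    if speakers:
        nearest = min(speakers, key=lambda t: abs((t['start'] + t['end']) / 2 - timestamp))
        return nearest['speaker']
    return None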
Chunking for RAG
def chunk_transcript(transcript, chunk_duration=30):
    """Split a transcript into chunks of roughly chunk_duration seconds."""
    chunks = []
    current_chunk = {'text': '', 'start': 0, 'speakers': set()}
    for segment in transcript['segments']:
        # Close the current chunk once it exceeds the target duration
        if segment['end'] - current_chunk['start'] > chunk_duration and current_chunk['text']:
            chunks.append(current_chunk)
            current_chunk = {'text': '', 'start': segment['start'], 'speakers': set()}
        current_chunk['text'] += ' ' + segment['text']
        current_chunk['speakers'].add(segment.get('speaker') or 'Unknown')
    # Keep the final, partially filled chunk
    if current_chunk['text']:
        chunks.append(current_chunk)
    return chunks
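Usage sketch (placeholder file name); each chunk carries its start time and the set of speakers heard in it:
transcript = transcribe_meeting("team_meeting_2025-01-05.mp3")
for chunk in chunk_transcript(transcript, chunk_duration=30):
    speakers = ", ".join(sorted(chunk['speakers']))
    print(f"[{chunk['start']:.0f}s] ({speakers}) {chunk['text'].strip()[:80]}")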
Indexing Strategy
def index_audio(audio_path):
    transcript = transcribe_meeting(audio_path)
    chunks = chunk_transcript(transcript)
    for chunk in chunks:
        # embed() and store() stand in for your embedding model and vector store
        embedding = embed(chunk['text'])
        store(
            embedding=embedding,
            content=chunk['text'],
            metadata={
                'source': audio_path,
                'start_time': chunk['start'],
                'speakers': list(chunk['speakers']),
                'type': 'audio_transcript'
            }
        )
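embed() and store() above are deliberately abstract. A minimal sketch using sentence-transformers and an in-memory list, assuming you swap in your own embedding model and vector database:
from sentence_transformers import SentenceTransformer

embedder = SentenceTransformer("all-MiniLM-L6-v2")  # any embedding model works here
vector_store = []  # stand-in for a real vector database

def embed(text):
    return embedder.encode(text)

def store(embedding, content, metadata):
    vector_store.append({
        'embedding': embedding,
        'content': content,
        'metadata': metadata
    })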
Query Results
{
  "answer": "The team decided to postpone the launch to Q2.",
  "source": {
    "file": "team_meeting_2025-01-05.mp3",
    "timestamp": "15:32 - 16:05",
    "speaker": "Alice",
    "transcript": "After reviewing the feedback, I think we should postpone the launch to Q2 to address the performance issues."
  }
}
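Chunks store start_time in raw seconds, while the result above shows an MM:SS range; a small (hypothetical) helper can convert at query time:
def format_timestamp(seconds):
    minutes, secs = divmod(int(seconds), 60)
    return f"{minutes:02d}:{secs:02d}"

def format_range(start, end):
    return f"{format_timestamp(start)} - {format_timestamp(end)}"

# e.g. format_range(932, 965) -> "15:32 - 16:05"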
Next: Video processing.