Extract transcripts and metadata from YouTube videos
Extracts transcripts and metadata from YouTube videos for RAG ingestion. Use when you need to process YouTube content, triggered by requests to harvest videos, playlists, or channels with timestamp-aware chunking.
/plugin marketplace add mindmorass/reflex
/plugin install reflex@mindmorass-reflex
This skill inherits all available tools. When active, it can use any tool Claude has access to.
Extract and ingest YouTube video transcripts into RAG with proper chunking and metadata.
YouTube is a rich source of tutorials, lectures, and explanations. This skill covers:
# Install yt-dlp for video metadata and subtitles
pip install yt-dlp
# Install youtube-transcript-api for cleaner transcript access
pip install youtube-transcript-api
# Optional: for audio transcription fallback
pip install openai-whisper
# Note: chunk_by_semantics (further down) also imports sentence_transformers
# and numpy, which none of the commands above install explicitly.
Best for clean transcript text with timestamps.
#!/usr/bin/env python3
"""Extract YouTube transcripts using youtube-transcript-api."""
from youtube_transcript_api import YouTubeTranscriptApi
from youtube_transcript_api.formatters import TextFormatter
import json
import re
from typing import Dict, List, Optional
from datetime import datetime
def extract_video_id(url: str) -> str:
    """Extract the 11-character video ID from a YouTube URL.

    Recognised forms: ``watch?v=ID``, ``/v/ID``, ``youtu.be/ID``,
    ``/embed/ID``, ``/shorts/ID`` and ``/live/ID``.

    Args:
        url: A YouTube video URL.

    Returns:
        The video ID found in the URL.

    Raises:
        ValueError: If none of the known URL forms matches.
    """
    patterns = [
        r'(?:v=|/v/|youtu\.be/)([a-zA-Z0-9_-]{11})',
        r'(?:embed/)([a-zA-Z0-9_-]{11})',
        # Shorts and live-stream links carry the ID in the path too. Tried
        # last so URLs that already matched keep resolving the same way.
        r'(?:/shorts/|/live/)([a-zA-Z0-9_-]{11})',
    ]
    for pattern in patterns:
        match = re.search(pattern, url)
        if match:
            return match.group(1)
    raise ValueError(f"Could not extract video ID from: {url}")
def get_transcript(video_id: str, languages: Optional[List[str]] = None) -> List[Dict]:
    """
    Fetch transcript for a video, preferring manually created captions.

    Args:
        video_id: YouTube video ID
        languages: Preferred languages in order; defaults to ['en'] when omitted

    Returns:
        Whatever ``transcript.fetch()`` yields (segments with text, start,
        duration), or an empty list if the transcript could not be fetched.
    """
    # A list literal as the default would be one shared object across calls;
    # use None as the sentinel and build the default per call instead.
    if languages is None:
        languages = ['en']
    try:
        transcript_list = YouTubeTranscriptApi.list_transcripts(video_id)
        try:
            # Try to get manual captions first
            transcript = transcript_list.find_manually_created_transcript(languages)
        except Exception:
            # Fall back to auto-generated. `except Exception` (not a bare
            # except) so Ctrl-C / SystemExit are not swallowed here.
            transcript = transcript_list.find_generated_transcript(languages)
        return transcript.fetch()
    except Exception as e:
        # Best effort: callers treat an empty list as "no transcript available".
        print(f"Error fetching transcript: {e}")
        return []
def format_timestamp(seconds: float) -> str:
    """Render a second count as MM:SS, or HH:MM:SS once it reaches an hour."""
    whole_hours = int(seconds // 3600)
    whole_minutes = int((seconds % 3600) // 60)
    whole_seconds = int(seconds % 60)

    fields = [whole_minutes, whole_seconds]
    # The hour field is only shown when it is non-zero.
    if whole_hours > 0:
        fields.insert(0, whole_hours)
    return ":".join(f"{field:02d}" for field in fields)
def get_video_metadata(video_id: str) -> Dict:
    """Get video metadata using yt-dlp.

    Runs ``yt-dlp --dump-json --no-download`` for the video and keeps a
    small subset of the fields it reports.

    Args:
        video_id: YouTube video ID.

    Returns:
        Dict with title, channel, channel_id, upload_date, duration,
        view_count, description (first 500 characters) and tags (first 10).
        An empty dict is returned when yt-dlp cannot be started, exits with
        a non-zero status, or prints output that is not a JSON object.
    """
    import json
    import subprocess

    try:
        result = subprocess.run(
            ['yt-dlp', '--dump-json', '--no-download', f'https://youtube.com/watch?v={video_id}'],
            capture_output=True,
            text=True
        )
    except OSError:
        # yt-dlp is not installed / not executable: same "no metadata"
        # signal as a failed run, instead of crashing the caller.
        return {}
    if result.returncode != 0:
        return {}

    try:
        data = json.loads(result.stdout)
    except json.JSONDecodeError:
        return {}
    if not isinstance(data, dict):
        return {}

    return {
        'title': data.get('title'),
        'channel': data.get('channel'),
        'channel_id': data.get('channel_id'),
        'upload_date': data.get('upload_date'),
        'duration': data.get('duration'),
        'view_count': data.get('view_count'),
        # `or` rather than a .get() default: the key can be present with a
        # null value, and slicing None raises TypeError.
        'description': (data.get('description') or '')[:500],  # Truncate
        'tags': (data.get('tags') or [])[:10],  # Limit tags
    }
Better for batch processing and when API limits are hit.
#!/bin/bash
# Extract subtitles using yt-dlp
#
# Usage: <script> VIDEO_URL [OUTPUT_DIR]
#   VIDEO_URL   video whose auto-generated English subtitles are downloaded
#   OUTPUT_DIR  directory for the .vtt and .txt files (default: current directory)

VIDEO_URL="${1:-}"
OUTPUT_DIR="${2:-.}"

if [ -z "$VIDEO_URL" ]; then
    echo "Usage: $0 VIDEO_URL [OUTPUT_DIR]" >&2
    exit 1
fi

# Download auto-generated subtitles.
# "--" ends option parsing so a URL beginning with "-" is not read as a flag.
yt-dlp \
    --write-auto-sub \
    --sub-lang en \
    --sub-format vtt \
    --skip-download \
    --output "$OUTPUT_DIR/%(title)s.%(ext)s" \
    -- "$VIDEO_URL"

# Convert VTT to plain text (every .vtt in OUTPUT_DIR, not only the new one)
for vtt in "$OUTPUT_DIR"/*.vtt; do
    # When nothing matches, the glob is left as a literal pattern; skip it
    # rather than pointing sed at a file that does not exist.
    [ -e "$vtt" ] || continue
    # Remove VTT formatting, keep just text
    sed -e '/^WEBVTT/d' \
        -e '/^Kind:/d' \
        -e '/^Language:/d' \
        -e '/^[0-9][0-9]:[0-9][0-9]/d' \
        -e '/-->/d' \
        -e 's/<[^>]*>//g' \
        -e '/^$/d' \
        "$vtt" > "${vtt%.vtt}.txt"
done
Split transcript into fixed time intervals.
def chunk_by_time(
    transcript: List[Dict],
    chunk_duration: int = 300  # 5 minutes
) -> List[Dict]:
    """
    Group transcript segments into chunks of roughly ``chunk_duration`` seconds.

    A chunk is closed as soon as a segment starts ``chunk_duration`` or more
    seconds after the chunk's own start; that segment opens the next chunk.

    Args:
        transcript: Segments, each with 'text', 'start' and optionally 'duration'
        chunk_duration: Seconds per chunk

    Returns:
        Chunks as dicts with 'text' (each segment text prefixed by a space),
        'start', 'end' and the member 'segments'.
    """
    def _open_chunk(start):
        return {'text': '', 'start': start, 'end': start, 'segments': []}

    completed = []
    active = _open_chunk(0)

    for entry in transcript:
        began_at = entry['start']

        # The segment falls outside the current window: close it and open a new one.
        if began_at >= active['start'] + chunk_duration:
            if active['text']:
                completed.append(active)
            active = _open_chunk(began_at)

        active['text'] = ' '.join((active['text'], entry['text']))
        active['end'] = began_at + entry.get('duration', 0)
        active['segments'].append(entry)

    # Flush whatever is still open.
    if active['text']:
        completed.append(active)
    return completed
Split when topic appears to change (silence gaps or topic markers).
def chunk_by_topic(
    transcript: List[Dict],
    gap_threshold: float = 5.0,  # Seconds of silence indicating topic change
    min_chunk_size: int = 100  # Minimum words per chunk
) -> List[Dict]:
    """
    Split a transcript where a pause in speech suggests a topic change.

    A new chunk starts when the silence before a segment exceeds
    ``gap_threshold`` seconds and the chunk being built already holds at
    least ``min_chunk_size`` words.

    Returns:
        Chunks as dicts with 'text' (each segment text prefixed by a space),
        'start', 'end' and 'word_count'.
    """
    def _open_chunk(start):
        return {'text': '', 'start': start, 'end': start, 'word_count': 0}

    finished = []
    active = _open_chunk(0)
    last_end = 0

    for entry in transcript:
        began_at = entry['start']
        silence = began_at - last_end
        words_here = len(entry['text'].split())

        # Only break on a pause once the current chunk is big enough.
        long_pause = silence > gap_threshold
        big_enough = active['word_count'] >= min_chunk_size
        if long_pause and big_enough:
            if active['text']:
                finished.append(active)
            active = _open_chunk(began_at)

        active['text'] = ' '.join((active['text'], entry['text']))
        active['end'] = began_at + entry.get('duration', 0)
        active['word_count'] += words_here
        last_end = active['end']

    # Flush whatever is still open.
    if active['text']:
        finished.append(active)
    return finished
Use embeddings to find natural topic boundaries.
def chunk_by_semantics(
    transcript: List[Dict],
    similarity_threshold: float = 0.7,
    window_size: int = 5
) -> List[Dict]:
    """
    Chunk based on semantic similarity between segments.

    Consecutive segments are first grouped into windows of ``window_size``.
    Each window is embedded with the 'all-MiniLM-L6-v2' sentence-transformers
    model, and adjacent windows are merged while their cosine similarity
    stays at or above ``similarity_threshold``.

    Args:
        transcript: Segments, each with 'text', 'start' and optionally 'duration'
        similarity_threshold: Cosine similarity below which a new chunk starts
        window_size: Number of segments per embedding window

    Returns:
        Chunks as dicts with 'text', 'start', 'end' and 'segments'; an empty
        list for an empty transcript.
    """
    # Nothing to embed: return before importing and loading the model.
    if not transcript:
        return []

    from sentence_transformers import SentenceTransformer
    import numpy as np

    model = SentenceTransformer('all-MiniLM-L6-v2')

    # Combine segments into windows for more stable embeddings
    windows = []
    for i in range(0, len(transcript), window_size):
        members = transcript[i:i + window_size]
        closing_segment = members[-1]
        windows.append({
            'text': ' '.join(s['text'] for s in members),
            'start': members[0]['start'],
            # End when the last segment finishes (start + duration), the same
            # way chunk_by_time and chunk_by_topic compute 'end'.
            'end': closing_segment['start'] + closing_segment.get('duration', 0),
            'segments': members
        })

    # Get embeddings
    embeddings = model.encode([w['text'] for w in windows])

    # Find boundaries where similarity drops
    chunks = []
    current_chunk = windows[0].copy()
    for i in range(1, len(windows)):
        similarity = np.dot(embeddings[i - 1], embeddings[i]) / (
            np.linalg.norm(embeddings[i - 1]) * np.linalg.norm(embeddings[i])
        )
        if similarity < similarity_threshold:
            # Topic boundary detected
            chunks.append(current_chunk)
            current_chunk = windows[i].copy()
        else:
            # Continue current chunk
            current_chunk['text'] += ' ' + windows[i]['text']
            current_chunk['end'] = windows[i]['end']
            current_chunk['segments'].extend(windows[i]['segments'])

    chunks.append(current_chunk)
    return chunks
#!/usr/bin/env python3
"""Complete YouTube harvesting pipeline."""
import json
from typing import List, Dict, Optional
from datetime import datetime
async def harvest_youtube_video(
    url: str,
    collection: str,
    chunk_strategy: str = "time",  # time, topic, semantic
    chunk_size: int = 300
) -> Dict:
    """
    Harvest a single YouTube video into RAG.

    Args:
        url: YouTube video URL
        collection: Target RAG collection
        chunk_strategy: How to chunk the transcript ("time", "topic" or
            "semantic"); any other value is treated as "time"
        chunk_size: Seconds per chunk; only used by the time strategy

    Returns:
        Harvest report: on success a dict with status, video_id, title,
        chunks and collection; otherwise {"status": "error", "error": ...}
        when metadata or transcript could not be fetched.

    Raises:
        ValueError: Propagated from extract_video_id when the URL holds no
            recognisable video ID.
    """
    from datetime import timezone

    video_id = extract_video_id(url)

    # Get metadata
    metadata = get_video_metadata(video_id)
    if not metadata:
        return {"status": "error", "error": "Could not fetch metadata"}

    # Get transcript
    transcript = get_transcript(video_id)
    if not transcript:
        return {"status": "error", "error": "No transcript available"}

    # Chunk based on strategy
    if chunk_strategy == "topic":
        chunks = chunk_by_topic(transcript)
    elif chunk_strategy == "semantic":
        chunks = chunk_by_semantics(transcript)
    else:
        # "time", and the fallback for unrecognised strategies
        chunks = chunk_by_time(transcript, chunk_size)

    # One timezone-aware UTC stamp for the whole harvest: every chunk of the
    # video carries the same value and it is unambiguous across machines.
    harvested_at = datetime.now(timezone.utc).isoformat()

    # Ingest each chunk
    ingested = 0
    for i, chunk in enumerate(chunks):
        chunk_metadata = {
            "source_type": "youtube",
            "source_url": url,
            "video_id": video_id,
            "title": metadata.get("title"),
            "channel": metadata.get("channel"),
            # Fetched by get_video_metadata but previously dropped here.
            "channel_id": metadata.get("channel_id"),
            "upload_date": metadata.get("upload_date"),
            "duration_seconds": metadata.get("duration"),
            "timestamp_start": format_timestamp(chunk["start"]),
            "timestamp_end": format_timestamp(chunk["end"]),
            "chunk_index": i,
            "total_chunks": len(chunks),
            "harvested_at": harvested_at
        }
        # The chunkers prefix each segment with a space, hence the strip().
        await ingest(
            content=chunk["text"].strip(),
            collection=collection,
            metadata=chunk_metadata,
            doc_id=f"yt_{video_id}_chunk_{i}"
        )
        ingested += 1

    return {
        "status": "success",
        "video_id": video_id,
        "title": metadata.get("title"),
        "chunks": ingested,
        "collection": collection
    }
async def harvest_youtube_playlist(
    playlist_url: str,
    collection: str,
    **kwargs
) -> Dict:
    """Harvest all videos in a playlist.

    Args:
        playlist_url: URL of the playlist to enumerate with yt-dlp.
        collection: Target RAG collection.
        **kwargs: Passed through to harvest_youtube_video
            (e.g. chunk_strategy, chunk_size).

    Returns:
        Report dict with status, videos_processed, videos_succeeded,
        videos_failed and the per-video details. If yt-dlp fails and lists
        no videos, {"status": "error", "error": ...} is returned instead.
    """
    import subprocess

    # Get playlist video IDs.
    # '--' ends option parsing so a URL starting with '-' is not read as a flag.
    listing = subprocess.run(
        ['yt-dlp', '--flat-playlist', '--print', 'id', '--', playlist_url],
        capture_output=True,
        text=True
    )
    # One ID per output line; drop blanks so empty output does not become a
    # bogus harvest of the empty ID.
    video_ids = [line.strip() for line in listing.stdout.splitlines() if line.strip()]

    if listing.returncode != 0 and not video_ids:
        return {
            "status": "error",
            "error": listing.stderr.strip() or "Could not list playlist videos"
        }

    results = []
    for video_id in video_ids:
        url = f"https://youtube.com/watch?v={video_id}"
        report = await harvest_youtube_video(url, collection, **kwargs)
        results.append(report)

    success = sum(1 for r in results if r.get("status") == "success")
    return {
        "status": "success",
        "videos_processed": len(video_ids),
        "videos_succeeded": success,
        "videos_failed": len(video_ids) - success,
        "details": results
    }
async def harvest_youtube_channel(
    channel_url: str,
    collection: str,
    max_videos: int = 50,
    **kwargs
) -> Dict:
    """Harvest recent videos from a channel.

    Args:
        channel_url: URL of the channel to enumerate with yt-dlp.
        collection: Target RAG collection.
        max_videos: Upper bound on listed videos (yt-dlp --playlist-end).
        **kwargs: Passed through to harvest_youtube_video
            (e.g. chunk_strategy, chunk_size).

    Returns:
        Report dict with status, videos_processed, videos_succeeded,
        videos_failed, collection and the per-video details. If yt-dlp fails
        and lists no videos, {"status": "error", "error": ...} is returned
        instead.
    """
    import subprocess

    # Get recent video IDs from channel.
    # '--' ends option parsing so a URL starting with '-' is not read as a flag.
    listing = subprocess.run(
        ['yt-dlp', '--flat-playlist', '--print', 'id',
         '--playlist-end', str(max_videos), '--', channel_url],
        capture_output=True,
        text=True
    )
    # One ID per output line; blank lines are not videos.
    video_ids = [line.strip() for line in listing.stdout.splitlines() if line.strip()]

    if listing.returncode != 0 and not video_ids:
        return {
            "status": "error",
            "error": listing.stderr.strip() or "Could not list channel videos"
        }

    results = []
    for video_id in video_ids:
        url = f"https://youtube.com/watch?v={video_id}"
        report = await harvest_youtube_video(url, collection, **kwargs)
        results.append(report)

    success = sum(1 for r in results if r.get("status") == "success")
    return {
        "status": "success",
        # Counts videos actually attempted, not raw output lines.
        "videos_processed": len(results),
        "videos_succeeded": success,
        "videos_failed": len(results) - success,
        "collection": collection,
        "details": results
    }
# YouTube video chunk metadata
source_type: youtube
source_url: https://youtube.com/watch?v=...
video_id: dQw4w9WgXcQ
title: "Video Title"
channel: "Channel Name"
channel_id: UC...
upload_date: "20240101"
duration_seconds: 930
timestamp_start: "05:30"
timestamp_end: "10:00"
chunk_index: 2
total_chunks: 12
harvested_at: "2024-01-01T12:00:00Z"
tags: [tag1, tag2]
| Error | Handling |
|---|---|
| No transcript available | Log, skip, note in report |
| Private/deleted video | Skip with error note |
| Age-restricted | May need authentication |
| Rate limited | Back off, retry with delay |
| API quota exceeded | Switch to yt-dlp method |
# Usage examples. `await` is only valid inside a coroutine (or an
# async-aware REPL/notebook), so run these from within an `async def`.

# Single video
# chunk_strategy="topic" splits on pauses in speech; chunk_size is not used by it.
result = await harvest_youtube_video(
url="https://youtube.com/watch?v=VIDEO_ID",
collection="ml_tutorials",
chunk_strategy="topic"
)
# Playlist
# Extra keyword arguments are forwarded to harvest_youtube_video for each video.
result = await harvest_youtube_playlist(
playlist_url="https://youtube.com/playlist?list=PLAYLIST_ID",
collection="course_lectures",
chunk_strategy="time",
chunk_size=600 # 10-minute chunks
)
# Channel (recent videos)
# max_videos caps how many video IDs are listed from the channel.
result = await harvest_youtube_channel(
channel_url="https://youtube.com/@ChannelName",
collection="channel_content",
max_videos=20
)
Track improvements as you use this skill.
This skill should be used when the user asks to "create a slash command", "add a command", "write a custom command", "define command arguments", "use command frontmatter", "organize commands", "create command with file references", "interactive command", "use AskUserQuestion in command", or needs guidance on slash command structure, YAML frontmatter fields, dynamic arguments, bash execution in commands, user interaction patterns, or command development best practices for Claude Code.
This skill should be used when the user asks to "create an agent", "add an agent", "write a subagent", "agent frontmatter", "when to use description", "agent examples", "agent tools", "agent colors", "autonomous agent", or needs guidance on agent structure, system prompts, triggering conditions, or agent development best practices for Claude Code plugins.
This skill should be used when the user asks to "create a hook", "add a PreToolUse/PostToolUse/Stop hook", "validate tool use", "implement prompt-based hooks", "use ${CLAUDE_PLUGIN_ROOT}", "set up event-driven automation", "block dangerous commands", or mentions hook events (PreToolUse, PostToolUse, Stop, SubagentStop, SessionStart, SessionEnd, UserPromptSubmit, PreCompact, Notification). Provides comprehensive guidance for creating and implementing Claude Code plugin hooks with focus on advanced prompt-based hooks API.