Complete FFmpeg + OpenCV + Python integration guide for video processing pipelines. PROACTIVELY activate for: (1) FFmpeg to OpenCV frame handoff, (2) cv2.VideoCapture vs ffmpeg subprocess, (3) BGR/RGB color format conversion gotchas, (4) Frame dimension order img[y,x] vs img[x,y], (5) ffmpegcv GPU-accelerated video I/O, (6) VidGear multi-threaded streaming, (7) Decord batch video loading for ML, (8) PyAV frame-level processing, (9) Audio stream preservation with video filters, (10) Memory-efficient frame generators, (11) OpenCV + FFmpeg + Modal parallel processing, (12) Pipe frames between FFmpeg and OpenCV. Provides: Color format conversion patterns, coordinate system gotchas, library selection guide, memory management, subprocess pipe patterns, GPU-accelerated alternatives to cv2.VideoCapture. Ensures: Correct integration between FFmpeg and OpenCV without color/coordinate bugs. See also: ffmpeg-python-integration-reference for type-safe parameter mappings.
/plugin marketplace add JosiahSiegel/claude-plugin-marketplace
/plugin install ffmpeg-master@claude-plugin-marketplace

This skill inherits all available tools. When active, it can use any tool Claude has access to.
| Task | Best Library | Why |
|---|---|---|
| Simple video read | OpenCV cv2.VideoCapture | Built-in, easy API |
| GPU video I/O | ffmpegcv | NVDEC/NVENC, OpenCV-compatible API |
| Multi-threaded streaming | VidGear | RTSP/RTMP, camera capture |
| ML batch loading | Decord | 2x faster than OpenCV, batch GPU decode |
| Frame-level precision | PyAV | Direct libav access, precise seeking |
| Complex filter graphs | ffmpeg-python subprocess | Full FFmpeg power |
| Color Format | Library | Conversion |
|---|---|---|
| BGR | OpenCV (cv2) | cv2.cvtColor(img, cv2.COLOR_BGR2RGB) |
| RGB | FFmpeg, PIL, PyAV | cv2.cvtColor(img, cv2.COLOR_RGB2BGR) |
| YUV | FFmpeg internal | Convert to RGB/BGR for processing |
Use for FFmpeg + OpenCV combined workflows:
Complete guide to combining FFmpeg's video I/O power with OpenCV's image processing capabilities.
The #1 source of bugs in FFmpeg + OpenCV pipelines.
import cv2
import numpy as np
# OpenCV uses BGR by default
img_bgr = cv2.imread("image.jpg") # BGR format!
# FFmpeg rawvideo output is RGB only when requested with -pix_fmt rgb24;
# otherwise it keeps the source pixel format (often yuv420p)
# PyAV's to_ndarray(format="rgb24") outputs RGB
# PIL/Pillow uses RGB
# WRONG: Using FFmpeg RGB output directly with OpenCV
# Colors will be swapped (red becomes blue)
# CORRECT: Always convert explicitly
def bgr_to_rgb(frame: np.ndarray) -> np.ndarray:
"""Convert OpenCV BGR to RGB for FFmpeg/PIL/ML."""
return cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
def rgb_to_bgr(frame: np.ndarray) -> np.ndarray:
"""Convert RGB (FFmpeg/PyAV/PIL) to OpenCV BGR."""
return cv2.cvtColor(frame, cv2.COLOR_RGB2BGR)
# When piping FFmpeg to OpenCV:
def read_ffmpeg_frame_for_opencv(raw_bytes: bytes, width: int, height: int) -> np.ndarray:
"""Read FFmpeg raw frame and convert to OpenCV BGR."""
    # Assumes FFmpeg was invoked with -pix_fmt rgb24 (rawvideo keeps the source format otherwise)
frame_rgb = np.frombuffer(raw_bytes, dtype=np.uint8).reshape(height, width, 3)
# Convert to BGR for OpenCV
frame_bgr = cv2.cvtColor(frame_rgb, cv2.COLOR_RGB2BGR)
return frame_bgr
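A quick, self-contained sanity check for channel order (synthetic data, not tied to any file): build a pure-red RGB frame and confirm the red value moves from channel 0 to channel 2 after conversion.

import cv2
import numpy as np

# Pure red in RGB: channel 0 holds 255
rgb = np.zeros((4, 4, 3), dtype=np.uint8)
rgb[:, :, 0] = 255

bgr = cv2.cvtColor(rgb, cv2.COLOR_RGB2BGR)
assert bgr[0, 0, 2] == 255 and bgr[0, 0, 0] == 0  # red now lives in channel 2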
OpenCV and NumPy use (row, col) = (y, x), not (x, y).
import cv2
import numpy as np
img = cv2.imread("image.jpg")
print(img.shape) # (height, width, channels) = (y, x, c)
# WRONG: Accessing pixel at x=100, y=200
# pixel = img[100, 200] # This gets row=100, col=200 = (y=100, x=200)
# CORRECT: Accessing pixel at x=100, y=200
pixel = img[200, 100] # row=200 (y), col=100 (x)
# WRONG: Creating image with width=1920, height=1080
# img = np.zeros((1920, 1080, 3)) # Creates 1920 rows × 1080 cols!
# CORRECT: Creating 1920×1080 image
img = np.zeros((1080, 1920, 3), dtype=np.uint8) # (height, width, channels)
# When reading frame dimensions:
cap = cv2.VideoCapture("video.mp4")
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) # x dimension
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) # y dimension
# NumPy array will be: frame.shape = (height, width, 3)
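A related twist worth noting: OpenCV drawing and geometry functions take points as (x, y), even though array indexing is [y, x]. A short illustration:

import cv2
import numpy as np

img = np.zeros((1080, 1920, 3), dtype=np.uint8)  # (height, width, channels)

# Drawing APIs take (x, y) points...
cv2.rectangle(img, (100, 200), (300, 400), (0, 255, 0), 2)  # x=100..300, y=200..400
cv2.circle(img, (960, 540), 50, (255, 0, 0), -1)            # center at x=960, y=540

# ...but indexing is [y, x]
print(img[540, 960])  # pixel at the circle's center: [255 0 0]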
Video filters drop audio by default in ffmpeg-python.
import ffmpeg
# WRONG: Audio is silently dropped
(
ffmpeg
.input('input.mp4')
.filter('scale', 1280, 720)
.output('output.mp4')
.run()
)
# CORRECT: Explicitly handle both streams
input_file = ffmpeg.input('input.mp4')
video = input_file.video.filter('scale', 1280, 720)
audio = input_file.audio # Preserve audio stream
(
ffmpeg
.output(video, audio, 'output.mp4')
.overwrite_output()
.run()
)
# CORRECT: For complex pipelines
input_file = ffmpeg.input('input.mp4')
video = (
input_file.video
.filter('scale', 1280, 720)
.filter('fps', fps=30)
)
audio = input_file.audio.filter('loudnorm')
ffmpeg.output(video, audio, 'output.mp4', vcodec='libx264', acodec='aac').run()
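If the input may or may not contain audio, one hedged pattern is to probe first (ffmpeg.probe is part of ffmpeg-python) and attach the audio stream only when it exists:

import ffmpeg

input_file = ffmpeg.input('input.mp4')
video = input_file.video.filter('scale', 1280, 720)

# Probe for an audio stream before mapping it
probe = ffmpeg.probe('input.mp4')
has_audio = any(s['codec_type'] == 'audio' for s in probe['streams'])

streams = [video, input_file.audio] if has_audio else [video]
ffmpeg.output(*streams, 'output.mp4').overwrite_output().run()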
Always release VideoCapture and destroy windows.
import cv2
# WRONG: No cleanup
cap = cv2.VideoCapture("video.mp4")
while cap.isOpened():
ret, frame = cap.read()
if not ret:
break
cv2.imshow("Frame", frame)
# Memory leak!
# CORRECT: Use try/finally
cap = cv2.VideoCapture("video.mp4")
try:
while cap.isOpened():
ret, frame = cap.read()
if not ret:
break
cv2.imshow("Frame", frame)
if cv2.waitKey(1) & 0xFF == ord('q'):
break
finally:
cap.release()
cv2.destroyAllWindows()
# CORRECT: Use context manager pattern
class VideoReader:
def __init__(self, path: str):
self.cap = cv2.VideoCapture(path)
if not self.cap.isOpened():
raise IOError(f"Cannot open video: {path}")
def __enter__(self):
return self
def __exit__(self, *args):
self.cap.release()
def frames(self):
while True:
ret, frame = self.cap.read()
if not ret:
break
yield frame
# Usage
with VideoReader("video.mp4") as reader:
for frame in reader.frames():
# Process frame...
pass
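An equivalent, more compact variant uses contextlib (same semantics as the class above, shown as a sketch):

import cv2
from contextlib import contextmanager

@contextmanager
def open_video(path: str):
    cap = cv2.VideoCapture(path)
    if not cap.isOpened():
        raise IOError(f"Cannot open video: {path}")
    try:
        yield cap
    finally:
        cap.release()

# Usage
with open_video("video.mp4") as cap:
    ret, frame = cap.read()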
For complex input handling (network streams, unusual formats), use FFmpeg to decode and pipe raw frames to OpenCV.
import subprocess
import numpy as np
import cv2
def ffmpeg_to_opencv_pipe(input_path: str, width: int, height: int):
"""Read video with FFmpeg, process frames with OpenCV."""
# FFmpeg command to output raw BGR24 frames
cmd = [
'ffmpeg',
'-i', input_path,
'-f', 'rawvideo',
'-pix_fmt', 'bgr24', # BGR for OpenCV!
'-s', f'{width}x{height}',
'-' # Output to stdout
]
# Start FFmpeg process
process = subprocess.Popen(
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.DEVNULL, # Suppress FFmpeg logs
bufsize=10**8 # Large buffer for video
)
frame_size = width * height * 3
try:
while True:
raw_frame = process.stdout.read(frame_size)
if len(raw_frame) != frame_size:
break
# Convert to NumPy array (already BGR for OpenCV)
frame = np.frombuffer(raw_frame, dtype=np.uint8).reshape(height, width, 3)
# Process with OpenCV
gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
edges = cv2.Canny(gray, 100, 200)
yield edges
finally:
process.stdout.close()
process.wait()
# Get video dimensions first
def get_video_dimensions(path: str) -> tuple[int, int]:
"""Get video width and height using ffprobe."""
cmd = [
'ffprobe', '-v', 'error',
'-select_streams', 'v:0',
'-show_entries', 'stream=width,height',
'-of', 'csv=p=0',
path
]
result = subprocess.run(cmd, capture_output=True, text=True)
width, height = map(int, result.stdout.strip().split(','))
return width, height
# Usage
width, height = get_video_dimensions("input.mp4")
for edge_frame in ffmpeg_to_opencv_pipe("input.mp4", width, height):
cv2.imshow("Edges", edge_frame)
if cv2.waitKey(1) & 0xFF == ord('q'):
break
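When the consumer also needs the frame rate (e.g., to pace playback or re-encode later), the same ffprobe approach works; this helper parses the rational r_frame_rate field (a sketch mirroring get_video_dimensions above):

import subprocess

def get_video_fps(path: str) -> float:
    """Get the frame rate using ffprobe (r_frame_rate is a rational like '30000/1001')."""
    cmd = [
        'ffprobe', '-v', 'error',
        '-select_streams', 'v:0',
        '-show_entries', 'stream=r_frame_rate',
        '-of', 'csv=p=0',
        path
    ]
    result = subprocess.run(cmd, capture_output=True, text=True)
    num, den = result.stdout.strip().split('/')
    return int(num) / int(den)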
Process frames with OpenCV, encode output with FFmpeg.
import subprocess
import numpy as np
import cv2
def opencv_to_ffmpeg_pipe(
input_path: str,
output_path: str,
process_frame: callable,
fps: float = 30.0
):
"""Process video frames with OpenCV, encode with FFmpeg."""
# Open input with OpenCV
cap = cv2.VideoCapture(input_path)
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
# FFmpeg command to receive raw BGR frames
cmd = [
'ffmpeg', '-y',
'-f', 'rawvideo',
'-vcodec', 'rawvideo',
'-s', f'{width}x{height}',
'-pix_fmt', 'bgr24', # OpenCV BGR format
'-r', str(fps),
'-i', '-', # Read from stdin
'-c:v', 'libx264',
'-preset', 'fast',
'-crf', '23',
'-pix_fmt', 'yuv420p',
output_path
]
process = subprocess.Popen(
cmd,
stdin=subprocess.PIPE,
stderr=subprocess.DEVNULL
)
try:
while cap.isOpened():
ret, frame = cap.read()
if not ret:
break
# Process frame with user function
processed = process_frame(frame)
# Write to FFmpeg
process.stdin.write(processed.tobytes())
finally:
cap.release()
process.stdin.close()
process.wait()
# Example usage
def add_blur(frame: np.ndarray) -> np.ndarray:
return cv2.GaussianBlur(frame, (15, 15), 0)
opencv_to_ffmpeg_pipe("input.mp4", "blurred.mp4", add_blur)
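Note that this path drops audio: cv2.VideoCapture reads video only, so blurred.mp4 is silent. One way to restore it is a final remux that copies the audio track from the original ('1:a?' makes the mapping optional when the source has no audio):

import subprocess

subprocess.run([
    'ffmpeg', '-y',
    '-i', 'blurred.mp4',   # processed video (silent)
    '-i', 'input.mp4',     # original (audio source)
    '-map', '0:v', '-map', '1:a?',
    '-c', 'copy',          # remux only, no re-encode
    'blurred_with_audio.mp4'
], check=True)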
For full control over input/output codecs while processing with OpenCV.
import subprocess
import numpy as np
import cv2
def ffmpeg_opencv_ffmpeg_pipeline(
input_path: str,
output_path: str,
process_frame: callable,
preserve_audio: bool = True
):
"""Complete pipeline: FFmpeg decode → OpenCV process → FFmpeg encode."""
# Get video info
probe_cmd = [
'ffprobe', '-v', 'error',
'-select_streams', 'v:0',
'-show_entries', 'stream=width,height,r_frame_rate',
'-of', 'csv=p=0',
input_path
]
probe_result = subprocess.run(probe_cmd, capture_output=True, text=True)
parts = probe_result.stdout.strip().split(',')
width, height = int(parts[0]), int(parts[1])
fps_parts = parts[2].split('/')
fps = int(fps_parts[0]) / int(fps_parts[1])
# FFmpeg decode command (output BGR24 for OpenCV)
decode_cmd = [
'ffmpeg',
'-i', input_path,
'-f', 'rawvideo',
'-pix_fmt', 'bgr24',
'-'
]
# FFmpeg encode command
encode_cmd = [
'ffmpeg', '-y',
'-f', 'rawvideo',
'-vcodec', 'rawvideo',
'-s', f'{width}x{height}',
'-pix_fmt', 'bgr24',
'-r', str(fps),
'-i', '-',
]
    # Add audio from the original file if preserving ('1:a?' tolerates inputs without audio)
    if preserve_audio:
        encode_cmd.extend(['-i', input_path, '-map', '0:v', '-map', '1:a?', '-c:a', 'copy'])
encode_cmd.extend([
'-c:v', 'libx264',
'-preset', 'fast',
'-crf', '23',
'-pix_fmt', 'yuv420p',
output_path
])
# Start processes
decoder = subprocess.Popen(
decode_cmd,
stdout=subprocess.PIPE,
stderr=subprocess.DEVNULL
)
encoder = subprocess.Popen(
encode_cmd,
stdin=subprocess.PIPE,
stderr=subprocess.DEVNULL
)
frame_size = width * height * 3
try:
while True:
raw_frame = decoder.stdout.read(frame_size)
if len(raw_frame) != frame_size:
break
# Convert to NumPy, process, convert back
frame = np.frombuffer(raw_frame, dtype=np.uint8).reshape(height, width, 3)
processed = process_frame(frame)
encoder.stdin.write(processed.tobytes())
finally:
decoder.stdout.close()
decoder.wait()
encoder.stdin.close()
encoder.wait()
# Usage
def detect_edges(frame: np.ndarray) -> np.ndarray:
gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
edges = cv2.Canny(gray, 50, 150)
# Convert back to BGR for encoding
return cv2.cvtColor(edges, cv2.COLOR_GRAY2BGR)
ffmpeg_opencv_ffmpeg_pipeline("input.mp4", "edges.mp4", detect_edges)
ffmpegcv provides an OpenCV-compatible API with GPU acceleration (NVDEC/NVENC).
pip install ffmpegcv
import ffmpegcv
# CPU decoding (FFmpeg backend, OpenCV-style API)
cap = ffmpegcv.VideoCapture("video.mp4")
# GPU decoding (NVDEC) uses the dedicated class; select a device with
# CUDA_VISIBLE_DEVICES if you need a specific GPU
cap = ffmpegcv.VideoCaptureNV("video.mp4")
# Read frames (returns BGR like OpenCV!)
while True:
ret, frame = cap.read()
if not ret:
break
# frame is BGR NumPy array, just like cv2.VideoCapture
print(frame.shape) # (height, width, 3)
cap.release()
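ffmpegcv can also resize during decode, so Python never sees full-resolution frames (a sketch per the ffmpegcv README; treat the exact keyword names as version-dependent):

import ffmpegcv

# Decode directly to a target size; the scaling happens inside FFmpeg
cap = ffmpegcv.VideoCapture("video.mp4", resize=(640, 360))
ret, frame = cap.read()
# frame.shape == (360, 640, 3)
cap.release()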
import ffmpegcv
import cv2
import numpy as np
# GPU-accelerated writing with NVENC; the frame size is inferred from the
# first frame written (exact signature varies by ffmpegcv version)
writer = ffmpegcv.VideoWriterNV(
    "output.mp4",
    'h264',  # NVENC H.264
    30       # fps
)
# Write frames
for i in range(300):
frame = np.random.randint(0, 255, (1080, 1920, 3), dtype=np.uint8)
writer.write(frame) # Accepts BGR like OpenCV
writer.release()
import ffmpegcv

# ffmpegcv decodes sequentially and does not implement cv2-style
# CAP_PROP_POS_FRAMES seeking; to process a frame range, read past the
# frames you don't need (or use Decord/PyAV for true random access)
cap = ffmpegcv.VideoCapture("video.mp4")
for _ in range(100):  # skip frames 0-99
    ret, _ = cap.read()
    if not ret:
        break
for i in range(100):  # process frames 100-199
    ret, frame = cap.read()
    if not ret:
        break
    # Process frame...
cap.release()
import ffmpegcv
import cv2
import numpy as np
def process_with_ffmpegcv(input_path: str, output_path: str):
"""GPU decode → OpenCV process → GPU encode."""
    # GPU reader (NVDEC)
    cap = ffmpegcv.VideoCaptureNV(input_path)
    fps = cap.fps
    width = int(cap.width)
    height = int(cap.height)
    # GPU writer (NVENC); frame size is inferred from the first frame written
    writer = ffmpegcv.VideoWriterNV(
        output_path,
        'h264',
        fps
    )
try:
while True:
ret, frame = cap.read()
if not ret:
break
# OpenCV processing (CPU)
processed = cv2.GaussianBlur(frame, (5, 5), 0)
processed = cv2.Canny(processed, 100, 200)
processed = cv2.cvtColor(processed, cv2.COLOR_GRAY2BGR)
writer.write(processed)
finally:
cap.release()
writer.release()
process_with_ffmpegcv("input.mp4", "processed.mp4")
VidGear provides multi-threaded, high-performance video streaming with OpenCV integration.
pip install vidgear[core]
from vidgear.gears import CamGear
import cv2
# Multi-threaded video reading (faster than cv2.VideoCapture)
stream = CamGear(source="video.mp4").start()
while True:
frame = stream.read()
if frame is None:
break
# frame is BGR like OpenCV
cv2.imshow("Frame", frame)
if cv2.waitKey(1) & 0xFF == ord('q'):
break
stream.stop()
cv2.destroyAllWindows()
from vidgear.gears import CamGear
# RTSP stream capture
options = {
"THREADED_QUEUE_MODE": True,
"CAP_PROP_FRAME_WIDTH": 1920,
"CAP_PROP_FRAME_HEIGHT": 1080,
}
stream = CamGear(
source="rtsp://192.168.1.100:554/stream",
stream_mode=True,
**options
).start()
while True:
frame = stream.read()
if frame is None:
break
# Process frame...
stream.stop()
from vidgear.gears import WriteGear
import cv2
import numpy as np
# High-performance writing with FFmpeg backend
output_params = {
"-vcodec": "libx264",
"-crf": 23,
"-preset": "fast",
"-pix_fmt": "yuv420p",
}
writer = WriteGear(output="output.mp4", **output_params)
# Generate and write frames
for i in range(300):
frame = np.random.randint(0, 255, (1080, 1920, 3), dtype=np.uint8)
writer.write(frame)
writer.close()
from vidgear.gears import WriteGear
# NVENC GPU encoding
output_params = {
"-vcodec": "h264_nvenc",
"-preset": "p4",
"-rc": "vbr",
"-cq": 23,
"-pix_fmt": "yuv420p",
}
writer = WriteGear(output="output.mp4", **output_params)
# ... write frames ...
writer.close()
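The two gears combine naturally into a read, process, write loop (a sketch built only from the APIs shown above):

from vidgear.gears import CamGear, WriteGear
import cv2

stream = CamGear(source="input.mp4").start()
writer = WriteGear(output="output.mp4", **{"-vcodec": "libx264", "-crf": 23})

while True:
    frame = stream.read()
    if frame is None:
        break
    # Any OpenCV processing; frames stay BGR end to end
    frame = cv2.GaussianBlur(frame, (5, 5), 0)
    writer.write(frame)

stream.stop()
writer.close()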
Decord provides video loading that is roughly 2x faster than OpenCV (per its own benchmarks), optimized for deep learning.
pip install decord
from decord import VideoReader, cpu, gpu
import numpy as np
# CPU video reader
vr = VideoReader("video.mp4", ctx=cpu(0))
# Get video info
print(f"Total frames: {len(vr)}")
print(f"FPS: {vr.get_avg_fps()}")
# Read single frame (returns RGB!)
frame = vr[0]
print(frame.shape) # (height, width, 3) - RGB format!
# CRITICAL: Decord returns RGB, not BGR
# Convert for OpenCV (call .asnumpy() first; vr[0] is a decord NDArray):
frame_np = frame.asnumpy()
frame_bgr = frame_np[:, :, ::-1]  # RGB to BGR (zero-copy view)
# Or use cv2:
import cv2
frame_bgr = cv2.cvtColor(frame_np, cv2.COLOR_RGB2BGR)
from decord import VideoReader, cpu
import numpy as np
vr = VideoReader("video.mp4", ctx=cpu(0))
# Load batch of frames (very efficient!)
frame_indices = [0, 10, 20, 30, 40]
batch = vr.get_batch(frame_indices)
print(batch.shape) # (5, height, width, 3) - batch of RGB frames
# Load every 10th frame
all_frames = vr.get_batch(range(0, len(vr), 10))
# Convert batch to PyTorch tensor
import torch
tensor = torch.from_numpy(batch.asnumpy())
tensor = tensor.permute(0, 3, 1, 2) # NHWC → NCHW for PyTorch
tensor = tensor.float() / 255.0 # Normalize
from decord import VideoReader, gpu
# GPU video reader (NVDEC)
vr = VideoReader("video.mp4", ctx=gpu(0))
# Batch read with GPU
frames = vr.get_batch([0, 1, 2, 3, 4])
# frames live on the GPU; hand them to PyTorch via decord's bridge (see below)
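Decord can also hand batches to frameworks without a manual asnumpy round trip via its bridge API (decord.bridge.set_bridge is part of the library):

import decord
from decord import VideoReader, cpu

# Route Decord outputs straight to torch tensors
decord.bridge.set_bridge('torch')

vr = VideoReader("video.mp4", ctx=cpu(0))
batch = vr.get_batch([0, 10, 20])  # torch.Tensor, NHWC, uint8, RGB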
from decord import VideoReader, cpu
import cv2
import numpy as np
def process_video_with_decord(path: str, batch_size: int = 32):
"""Efficient batch processing with Decord and OpenCV."""
vr = VideoReader(path, ctx=cpu(0))
total_frames = len(vr)
results = []
for start in range(0, total_frames, batch_size):
end = min(start + batch_size, total_frames)
batch = vr.get_batch(range(start, end))
for frame_rgb in batch.asnumpy():
# Convert RGB (Decord) to BGR (OpenCV)
frame_bgr = cv2.cvtColor(frame_rgb, cv2.COLOR_RGB2BGR)
# OpenCV processing
gray = cv2.cvtColor(frame_bgr, cv2.COLOR_BGR2GRAY)
edges = cv2.Canny(gray, 100, 200)
results.append(edges)
return results
PyAV provides direct access to libav for precise frame-level control.
pip install av
import av
import numpy as np
# Open video
container = av.open("video.mp4")
for frame in container.decode(video=0):
    # to_ndarray(format="rgb24") returns RGB; with no format argument you get
    # the stream's native pixel format (often planar YUV)
    img_rgb = frame.to_ndarray(format="rgb24")
    # PyAV can also emit BGR directly for OpenCV:
    img_bgr = frame.to_ndarray(format="bgr24")
print(img_bgr.shape) # (height, width, 3)
container.close()
import av
container = av.open("video.mp4")
stream = container.streams.video[0]
# Get a frame near a timestamp: container.seek takes an offset in
# av.time_base units (microseconds) when no stream is given
target_seconds = 10.0
container.seek(int(target_seconds * 1_000_000))
for frame in container.decode(video=0):
    # First decoded frame is the keyframe at/before ~10s, not the exact frame
    img = frame.to_ndarray(format="rgb24")
break
container.close()
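Because seek lands on the nearest keyframe at or before the target, frame-exact seeking means decoding forward until the presentation time reaches the target (frame.time is PyAV's timestamp in seconds; a sketch):

import av

def frame_at(path: str, target_seconds: float):
    """Seek to the keyframe before target, then decode forward to the exact frame."""
    container = av.open(path)
    container.seek(int(target_seconds * 1_000_000))  # av.time_base units (microseconds)
    try:
        for frame in container.decode(video=0):
            if frame.time is not None and frame.time >= target_seconds:
                return frame.to_ndarray(format="bgr24")
        return None
    finally:
        container.close()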
import av
import cv2
import numpy as np
from typing import Generator

def extract_frames_pyav(path: str) -> Generator[np.ndarray, None, None]:
    """Extract frames with PyAV (yields BGR for OpenCV).

    Note: a decoder cannot resample frame rate; to change fps,
    apply FFmpeg's fps filter in a separate step.
    """
    container = av.open(path)
    stream = container.streams.video[0]
    for frame in container.decode(stream):
        # Ask PyAV for BGR directly; avoids a non-contiguous [:, :, ::-1] view
        yield frame.to_ndarray(format="bgr24")
container.close()
# Usage
for frame_bgr in extract_frames_pyav("video.mp4"):
# Direct OpenCV processing
edges = cv2.Canny(frame_bgr, 100, 200)
import av
import numpy as np
from fractions import Fraction

def write_video_pyav(frames: list, output_path: str, fps: float = 30.0):
    """Write frames to video with PyAV."""
    height, width = frames[0].shape[:2]
    container = av.open(output_path, mode='w')
    # add_stream expects an int or Fraction rate; Fraction handles 29.97 etc.
    stream = container.add_stream('libx264', rate=Fraction(fps).limit_denominator(1000))
    stream.width = width
    stream.height = height
    stream.pix_fmt = 'yuv420p'
    stream.options = {'crf': '23', 'preset': 'fast'}
    for frame_bgr in frames:
        # PyAV accepts BGR24 directly; no RGB conversion or contiguity copy needed
        av_frame = av.VideoFrame.from_ndarray(frame_bgr, format='bgr24')
# Encode
for packet in stream.encode(av_frame):
container.mux(packet)
# Flush encoder
for packet in stream.encode():
container.mux(packet)
container.close()
Deploy FFmpeg + OpenCV pipelines on Modal's serverless infrastructure.
import modal
# Complete image with FFmpeg, OpenCV, and GPU libraries
video_image = (
modal.Image.debian_slim(python_version="3.12")
.apt_install(
"ffmpeg", # FFmpeg CLI
"libsm6", # OpenCV dependencies
"libxext6",
"libgl1",
"libglib2.0-0",
)
.pip_install(
"opencv-python-headless", # No GUI for server
"ffmpeg-python",
"numpy",
"Pillow",
)
)
# GPU image with additional libraries
gpu_video_image = (
modal.Image.debian_slim(python_version="3.12")
.apt_install("ffmpeg", "libsm6", "libxext6", "libgl1", "libglib2.0-0")
.pip_install(
"opencv-python-headless",
"ffmpeg-python",
"numpy",
"torch",
"decord",
"ffmpegcv",
)
)
app = modal.App("ffmpeg-opencv-pipeline", image=video_image)
import modal
app = modal.App("opencv-processing")
image = (
modal.Image.debian_slim()
.apt_install("ffmpeg", "libsm6", "libxext6", "libgl1")
.pip_install("opencv-python-headless", "numpy")
)
@app.function(image=image)
def process_frame(frame_bytes: bytes) -> bytes:
"""Process single frame with OpenCV on Modal."""
import cv2
import numpy as np
# Decode image (PNG or JPEG)
nparr = np.frombuffer(frame_bytes, np.uint8)
frame = cv2.imdecode(nparr, cv2.IMREAD_COLOR) # BGR
# OpenCV processing
processed = cv2.GaussianBlur(frame, (15, 15), 0)
# Encode back to PNG
_, encoded = cv2.imencode('.png', processed)
return encoded.tobytes()
@app.function(image=image, timeout=600)
def extract_and_process(video_bytes: bytes) -> list[bytes]:
"""Extract frames with FFmpeg, process with OpenCV."""
import subprocess
import tempfile
from pathlib import Path
import cv2
import numpy as np
with tempfile.TemporaryDirectory() as tmpdir:
input_path = Path(tmpdir) / "input.mp4"
input_path.write_bytes(video_bytes)
# Extract frames with FFmpeg
subprocess.run([
"ffmpeg", "-i", str(input_path),
"-vf", "fps=1", # 1 frame per second
f"{tmpdir}/frame_%04d.png"
], check=True, capture_output=True)
# Process each frame with OpenCV
results = []
for frame_path in sorted(Path(tmpdir).glob("frame_*.png")):
frame = cv2.imread(str(frame_path))
# Apply edge detection
gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
edges = cv2.Canny(gray, 100, 200)
# Encode result
_, encoded = cv2.imencode('.png', edges)
results.append(encoded.tobytes())
return results
@app.local_entrypoint()
def main():
    from pathlib import Path
    video_bytes = Path("input.mp4").read_bytes()
    processed_frames = extract_and_process.remote(video_bytes)
    Path("output").mkdir(exist_ok=True)
    for i, frame_bytes in enumerate(processed_frames):
        Path(f"output/frame_{i:04d}.png").write_bytes(frame_bytes)
import modal
from pathlib import Path
app = modal.App("parallel-opencv")
image = (
modal.Image.debian_slim()
.apt_install("ffmpeg", "libsm6", "libxext6", "libgl1")
.pip_install("opencv-python-headless", "numpy")
)
@app.function(image=image)
def extract_frames(video_bytes: bytes) -> list[bytes]:
"""Extract all frames from video."""
import subprocess
import tempfile
from pathlib import Path
with tempfile.TemporaryDirectory() as tmpdir:
input_path = Path(tmpdir) / "input.mp4"
input_path.write_bytes(video_bytes)
subprocess.run([
"ffmpeg", "-i", str(input_path),
"-vsync", "0",
f"{tmpdir}/frame_%06d.png"
], check=True, capture_output=True)
frames = []
for path in sorted(Path(tmpdir).glob("frame_*.png")):
frames.append(path.read_bytes())
return frames
@app.function(image=image)
def process_single_frame(frame_data: tuple[int, bytes]) -> tuple[int, bytes]:
"""Process a single frame."""
import cv2
import numpy as np
frame_idx, frame_bytes = frame_data
nparr = np.frombuffer(frame_bytes, np.uint8)
frame = cv2.imdecode(nparr, cv2.IMREAD_COLOR)
# Heavy OpenCV processing
gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
edges = cv2.Canny(gray, 50, 150)
dilated = cv2.dilate(edges, None, iterations=2)
contours, _ = cv2.findContours(dilated, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
# Draw contours on original frame
result = frame.copy()
cv2.drawContours(result, contours, -1, (0, 255, 0), 2)
_, encoded = cv2.imencode('.png', result)
return frame_idx, encoded.tobytes()
@app.function(image=image, timeout=600)
def combine_frames(processed_frames: list[tuple[int, bytes]], fps: float) -> bytes:
"""Combine processed frames back into video."""
import subprocess
import tempfile
from pathlib import Path
with tempfile.TemporaryDirectory() as tmpdir:
# Sort by frame index and write
for idx, frame_bytes in sorted(processed_frames):
path = Path(tmpdir) / f"frame_{idx:06d}.png"
path.write_bytes(frame_bytes)
output_path = Path(tmpdir) / "output.mp4"
subprocess.run([
"ffmpeg", "-y",
"-framerate", str(fps),
"-i", f"{tmpdir}/frame_%06d.png",
"-c:v", "libx264",
"-preset", "fast",
"-crf", "23",
"-pix_fmt", "yuv420p",
str(output_path)
], check=True, capture_output=True)
return output_path.read_bytes()
@app.local_entrypoint()
def main():
video_bytes = Path("input.mp4").read_bytes()
# Extract frames (single container)
frames = extract_frames.remote(video_bytes)
print(f"Extracted {len(frames)} frames")
# Process frames in parallel (many containers!)
inputs = [(i, frame) for i, frame in enumerate(frames)]
processed = list(process_single_frame.map(inputs))
print(f"Processed {len(processed)} frames")
# Combine back into video
output = combine_frames.remote(processed, fps=30.0)
Path("output.mp4").write_bytes(output)
print("Done!")
import modal
app = modal.App("gpu-video-pipeline")
# GPU image with ffmpegcv
gpu_image = (
modal.Image.from_registry("nvidia/cuda:12.4.0-runtime-ubuntu22.04", add_python="3.12")
.apt_install("ffmpeg", "libsm6", "libxext6", "libgl1", "libglib2.0-0")
.pip_install("opencv-python-headless", "numpy", "ffmpegcv")
)
@app.function(image=gpu_image, gpu="T4")
def gpu_video_processing(video_bytes: bytes) -> bytes:
"""GPU-accelerated video processing with ffmpegcv."""
import cv2
import numpy as np
import ffmpegcv
import tempfile
from pathlib import Path
with tempfile.TemporaryDirectory() as tmpdir:
input_path = Path(tmpdir) / "input.mp4"
output_path = Path(tmpdir) / "output.mp4"
input_path.write_bytes(video_bytes)
        # GPU reader (NVDEC)
        cap = ffmpegcv.VideoCaptureNV(str(input_path))
        fps = cap.fps
        width = int(cap.width)
        height = int(cap.height)
        # GPU writer (NVENC); frame size is inferred from the first frame written
        writer = ffmpegcv.VideoWriterNV(
            str(output_path),
            'h264',
            fps
        )
try:
while True:
ret, frame = cap.read()
if not ret:
break
# OpenCV processing (CPU - can't avoid this)
processed = cv2.bilateralFilter(frame, 9, 75, 75)
writer.write(processed)
finally:
cap.release()
writer.release()
return output_path.read_bytes()
@app.local_entrypoint()
def main():
    from pathlib import Path
    video = Path("input.mp4").read_bytes()
    result = gpu_video_processing.remote(video)
    Path("output.mp4").write_bytes(result)
import cv2
import numpy as np
# OpenCV BGR → RGB (for FFmpeg, PIL, PyTorch)
rgb = cv2.cvtColor(bgr, cv2.COLOR_BGR2RGB)
rgb = bgr[:, :, ::-1]  # Pure NumPy: zero-copy view (see caveat below)
# RGB → BGR (for OpenCV from FFmpeg, PyAV, Decord)
bgr = cv2.cvtColor(rgb, cv2.COLOR_RGB2BGR)
bgr = rgb[:, :, ::-1]  # Zero-copy view
# Grayscale
gray = cv2.cvtColor(bgr, cv2.COLOR_BGR2GRAY)
bgr_from_gray = cv2.cvtColor(gray, cv2.COLOR_GRAY2BGR)
# HSV (for color filtering)
hsv = cv2.cvtColor(bgr, cv2.COLOR_BGR2HSV)
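One caveat on the [:, :, ::-1] trick: it returns a NumPy view with a negative stride, not a copy, and some consumers (av.VideoFrame.from_ndarray, certain C-backed APIs) require contiguous memory. When in doubt, make it contiguous:

import numpy as np

# rgb: any (H, W, 3) uint8 array
bgr_view = rgb[:, :, ::-1]                 # zero-copy view, negative stride
bgr_safe = np.ascontiguousarray(bgr_view)  # real copy, C-contiguous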
Need video I/O?
├── Simple local file?
│ └── cv2.VideoCapture (built-in, easy)
├── Need GPU acceleration?
│ └── ffmpegcv (NVDEC/NVENC, OpenCV-compatible)
├── Network streaming (RTSP/RTMP)?
│ └── VidGear CamGear (multi-threaded)
├── ML batch training?
│ └── Decord (2x faster, batch GPU decode)
├── Frame-level precision/seeking?
│ └── PyAV (direct libav access)
└── Complex filters/formats?
└── FFmpeg subprocess with pipes
import cv2

def frame_generator(path: str, batch_size: int = 1):
"""Memory-efficient frame generator."""
cap = cv2.VideoCapture(path)
try:
batch = []
while True:
ret, frame = cap.read()
if not ret:
if batch:
yield batch
break
batch.append(frame)
if len(batch) == batch_size:
yield batch
batch = []
finally:
cap.release()
# Usage - never loads entire video into memory
for batch in frame_generator("large_video.mp4", batch_size=32):
# Process batch of 32 frames
pass
ffmpeg-python-integration-reference - Type-safe Python-FFmpeg parameter mappings, color conversions, time units
ffmpeg-fundamentals-2025 - Core FFmpeg operations
ffmpeg-captions-subtitles - Subtitle processing with Python