Video Utils

supervision.utils.video.VideoInfo dataclass

A class to store video information, including width, height, fps and total number of frames.

Attributes:

| Name | Type | Description |
|------|------|-------------|
| `width` | `int` | Width of the video in pixels. |
| `height` | `int` | Height of the video in pixels. |
| `fps` | `int` | Frames per second of the video. |
| `total_frames` | `Optional[int]` | Total number of frames in the video; default is `None`. |

Examples:

import supervision as sv

video_info = sv.VideoInfo.from_video_path(video_path=<SOURCE_VIDEO_FILE>)

video_info
# VideoInfo(width=3840, height=2160, fps=25, total_frames=538)

video_info.resolution_wh
# (3840, 2160)
Source code in supervision/utils/video.py
@dataclass
class VideoInfo:
    """
    A class to store video information, including width, height, fps and
        total number of frames.

    Attributes:
        width (int): width of the video in pixels
        height (int): height of the video in pixels
        fps (int): frames per second of the video
        total_frames (Optional[int]): total number of frames in the video,
            default is None

    Examples:
        ```python
        import supervision as sv

        video_info = sv.VideoInfo.from_video_path(video_path=<SOURCE_VIDEO_FILE>)

        video_info
        # VideoInfo(width=3840, height=2160, fps=25, total_frames=538)

        video_info.resolution_wh
        # (3840, 2160)
        ```
    """

    width: int
    height: int
    fps: int
    total_frames: int | None = None

    @classmethod
    def from_video_path(cls, video_path: str) -> VideoInfo:
        video = cv2.VideoCapture(video_path)
        if not video.isOpened():
            raise Exception(f"Could not open video at {video_path}")

        width = int(video.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(video.get(cv2.CAP_PROP_FRAME_HEIGHT))
        fps = int(video.get(cv2.CAP_PROP_FPS))
        total_frames = int(video.get(cv2.CAP_PROP_FRAME_COUNT))
        video.release()
        return VideoInfo(width, height, fps, total_frames)

    @property
    def resolution_wh(self) -> tuple[int, int]:
        return self.width, self.height
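
A `VideoInfo` can also be constructed directly rather than read from a file, for example to describe an output video before any frames exist. A minimal sketch with illustrative values:

```python
import supervision as sv

# Describe a hypothetical 720p, 30 FPS output video.
video_info = sv.VideoInfo(width=1280, height=720, fps=30)

print(video_info.resolution_wh)  # (1280, 720)
print(video_info.total_frames)   # None until known
```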

supervision.utils.video.VideoSink

Context manager that saves video frames to a file using OpenCV.

Attributes:

| Name | Type | Description |
|------|------|-------------|
| `target_path` | `str` | The path to the output file where the video will be saved. |
| `video_info` | `VideoInfo` | Information about the video resolution, fps, and total frame count. |
| `codec` | `str` | FOURCC code specifying the video format. |

Example
import supervision as sv

video_info = sv.VideoInfo.from_video_path(<SOURCE_VIDEO_PATH>)
frames_generator = sv.get_video_frames_generator(<SOURCE_VIDEO_PATH>)

with sv.VideoSink(target_path=<TARGET_VIDEO_PATH>, video_info=video_info) as sink:
    for frame in frames_generator:
        sink.write_frame(frame=frame)
Source code in supervision/utils/video.py
class VideoSink:
    """
    Context manager that saves video frames to a file using OpenCV.

    Attributes:
        target_path (str): The path to the output file where the video will be saved.
        video_info (VideoInfo): Information about the video resolution, fps,
            and total frame count.
        codec (str): FOURCC code for video format

    Example:
        ```python
        import supervision as sv

        video_info = sv.VideoInfo.from_video_path(<SOURCE_VIDEO_PATH>)
        frames_generator = sv.get_video_frames_generator(<SOURCE_VIDEO_PATH>)

        with sv.VideoSink(target_path=<TARGET_VIDEO_PATH>, video_info=video_info) as sink:
            for frame in frames_generator:
                sink.write_frame(frame=frame)
        ```
    """  # noqa: E501 // docs

    def __init__(self, target_path: str, video_info: VideoInfo, codec: str = "mp4v"):
        self.target_path = target_path
        self.video_info = video_info
        self.__codec = codec
        self.__writer = None

    def __enter__(self):
        try:
            self.__fourcc = cv2.VideoWriter_fourcc(*self.__codec)
        except TypeError as e:
            print(str(e) + ". Defaulting to mp4v...")
            self.__fourcc = cv2.VideoWriter_fourcc(*"mp4v")
        self.__writer = cv2.VideoWriter(
            self.target_path,
            self.__fourcc,
            self.video_info.fps,
            self.video_info.resolution_wh,
        )
        return self

    def write_frame(self, frame: np.ndarray):
        """
        Writes a single video frame to the target video file.

        Args:
            frame (np.ndarray): The video frame to be written to the file. The frame
                must be in BGR color format.
        """
        self.__writer.write(frame)

    def __exit__(self, exc_type, exc_value, exc_traceback):
        self.__writer.release()
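
The writer is opened with `video_info.resolution_wh` and `video_info.fps`, so each frame passed to `write_frame` must already match that resolution and use BGR color order. A hedged sketch, with placeholder file names, that writes a half-size copy; `codec="avc1"` assumes an OpenCV build with H.264 support:

```python
import cv2
import supervision as sv

source_info = sv.VideoInfo.from_video_path(video_path="source.mp4")
target_info = sv.VideoInfo(
    width=source_info.width // 2,
    height=source_info.height // 2,
    fps=source_info.fps,
)

with sv.VideoSink(
    target_path="target.mp4", video_info=target_info, codec="avc1"
) as sink:
    for frame in sv.get_video_frames_generator(source_path="source.mp4"):
        # Resize so the frame matches the resolution the writer was opened with.
        sink.write_frame(frame=cv2.resize(frame, target_info.resolution_wh))
```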

Functions

write_frame(frame)

Writes a single video frame to the target video file.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `frame` | `ndarray` | The video frame to be written to the file. The frame must be in BGR color format. | required |
Source code in supervision/utils/video.py
def write_frame(self, frame: np.ndarray):
    """
    Writes a single video frame to the target video file.

    Args:
        frame (np.ndarray): The video frame to be written to the file. The frame
            must be in BGR color format.
    """
    self.__writer.write(frame)

supervision.utils.video.FPSMonitor

A class for monitoring frames per second (FPS) to benchmark latency.

Source code in supervision/utils/video.py
class FPSMonitor:
    """
    A class for monitoring frames per second (FPS) to benchmark latency.
    """

    def __init__(self, sample_size: int = 30):
        """
        Args:
            sample_size (int): The maximum number of observations for latency
                benchmarking.

        Examples:
            ```python
            import supervision as sv

            frames_generator = sv.get_video_frames_generator(source_path=<SOURCE_FILE_PATH>)
            fps_monitor = sv.FPSMonitor()

            for frame in frames_generator:
                # your processing code here
                fps_monitor.tick()
                fps = fps_monitor.fps
            ```
        """  # noqa: E501 // docs
        self.all_timestamps = deque(maxlen=sample_size)

    @property
    def fps(self) -> float:
        """
        Computes and returns the average FPS based on the stored time stamps.

        Returns:
            float: The average FPS. Returns 0.0 if no time stamps are stored.
        """
        if not self.all_timestamps:
            return 0.0
        taken_time = self.all_timestamps[-1] - self.all_timestamps[0]
        return (len(self.all_timestamps)) / taken_time if taken_time != 0 else 0.0

    def tick(self) -> None:
        """
        Adds a new time stamp to the deque for FPS calculation.
        """
        self.all_timestamps.append(time.monotonic())

    def reset(self) -> None:
        """
        Clears all the time stamps from the deque.
        """
        self.all_timestamps.clear()

Attributes

fps property

Computes and returns the average FPS based on the stored time stamps.

Returns:

| Type | Description |
|------|-------------|
| `float` | The average FPS. Returns 0.0 if no time stamps are stored. |
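
The property divides the number of stored timestamps by the span between the first and last tick, so it reflects throughput over the most recent `sample_size` observations. A quick sketch with simulated per-frame work:

```python
import time

import supervision as sv

fps_monitor = sv.FPSMonitor(sample_size=10)

for _ in range(10):
    time.sleep(0.05)  # simulate ~50 ms of per-frame work
    fps_monitor.tick()

# 10 samples over a ~0.45 s span: prints roughly 22, slightly above
# 1 / 0.05 because the sample count (not the interval count) is divided
# by the span.
print(f"{fps_monitor.fps:.1f}")
```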

Functions

__init__(sample_size=30)

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `sample_size` | `int` | The maximum number of observations for latency benchmarking. | `30` |

Examples:

import supervision as sv

frames_generator = sv.get_video_frames_generator(source_path=<SOURCE_FILE_PATH>)
fps_monitor = sv.FPSMonitor()

for frame in frames_generator:
    # your processing code here
    fps_monitor.tick()
    fps = fps_monitor.fps
Source code in supervision/utils/video.py
def __init__(self, sample_size: int = 30):
    """
    Args:
        sample_size (int): The maximum number of observations for latency
            benchmarking.

    Examples:
        ```python
        import supervision as sv

        frames_generator = sv.get_video_frames_generator(source_path=<SOURCE_FILE_PATH>)
        fps_monitor = sv.FPSMonitor()

        for frame in frames_generator:
            # your processing code here
            fps_monitor.tick()
            fps = fps_monitor.fps
        ```
    """  # noqa: E501 // docs
    self.all_timestamps = deque(maxlen=sample_size)

reset()

Clears all the time stamps from the deque.

Source code in supervision/utils/video.py
def reset(self) -> None:
    """
    Clears all the time stamps from the deque.
    """
    self.all_timestamps.clear()

tick()

Adds a new time stamp to the deque for FPS calculation.

Source code in supervision/utils/video.py
def tick(self) -> None:
    """
    Adds a new time stamp to the deque for FPS calculation.
    """
    self.all_timestamps.append(time.monotonic())

supervision.utils.video.get_video_frames_generator(source_path, stride=1, start=0, end=None, iterative_seek=False)

Get a generator that yields the frames of the video.

Parameters:

Name Type Description Default

source_path

str

The path of the video file.

required

stride

int

Indicates the interval at which frames are returned, skipping stride - 1 frames between each.

1

start

int

Indicates the starting position from which video should generate frames

0

end

Optional[int]

Indicates the ending position at which video should stop generating frames. If None, video will be read to the end.

None

iterative_seek

bool

If True, the generator will seek to the start frame by grabbing each frame, which is much slower. This is a workaround for videos that don't open at all when you set the start value.

False

Returns:

| Type | Description |
|------|-------------|
| `Generator[ndarray, None, None]` | A generator that yields the frames of the video. |

Examples:

import supervision as sv

for frame in sv.get_video_frames_generator(source_path=<SOURCE_VIDEO_PATH>):
    ...
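
`start`, `end`, and `stride` can be combined, for example to sample every 10th frame of a sub-clip. A sketch with illustrative frame indices:

```python
import supervision as sv

# Yield frames 60, 70, 80, ... 290; reading stops once frame 300 is reached.
for frame in sv.get_video_frames_generator(
    source_path=<SOURCE_VIDEO_PATH>,
    start=60,
    end=300,
    stride=10,
):
    ...
```
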
Source code in supervision/utils/video.py
def get_video_frames_generator(
    source_path: str,
    stride: int = 1,
    start: int = 0,
    end: int | None = None,
    iterative_seek: bool = False,
) -> Generator[np.ndarray]:
    """
    Get a generator that yields the frames of the video.

    Args:
        source_path (str): The path of the video file.
        stride (int): Indicates the interval at which frames are returned,
            skipping stride - 1 frames between each.
        start (int): Indicates the starting position from which
            video should generate frames
        end (Optional[int]): Indicates the ending position at which video
            should stop generating frames. If None, video will be read to the end.
        iterative_seek (bool): If True, the generator will seek to the
            `start` frame by grabbing each frame, which is much slower. This is a
            workaround for videos that don't open at all when you set the `start` value.

    Returns:
        (Generator[np.ndarray, None, None]): A generator that yields the
            frames of the video.

    Examples:
        ```python
        import supervision as sv

        for frame in sv.get_video_frames_generator(source_path=<SOURCE_VIDEO_PATH>):
            ...
        ```
    """
    video, start, end = _validate_and_setup_video(
        source_path, start, end, iterative_seek
    )
    frame_position = start
    while True:
        success, frame = video.read()
        if not success or frame_position >= end:
            break
        yield frame
        for _ in range(stride - 1):
            success = video.grab()
            if not success:
                break
        frame_position += stride
    video.release()

supervision.utils.video.process_video(source_path, target_path, callback, *, max_frames=None, prefetch=32, writer_buffer=32, show_progress=False, progress_message='Processing video')

Process video frames asynchronously using a threaded pipeline.

This function orchestrates a three-stage pipeline to optimize video processing throughput:

  1. Reader thread: Continuously reads frames from the source video file and enqueues them into a bounded queue (frame_read_queue). The queue size is limited by the prefetch parameter to control memory usage.
  2. Main thread (Processor): Dequeues frames from frame_read_queue, applies the user-defined callback function to process each frame, then enqueues the processed frames into another bounded queue (frame_write_queue) for writing. The processing happens in the main thread, simplifying use of stateful objects without synchronization (see the sketch after this list).
  3. Writer thread: Dequeues processed frames from frame_write_queue and writes them sequentially to the output video file.
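
Because the `callback` runs on the main thread, stateful helpers such as a tracker can be used without locks. A hedged sketch in which the empty detections are a stand-in for real model output:

```python
import numpy as np
import supervision as sv

tracker = sv.ByteTrack()           # stateful; safe on the main thread
box_annotator = sv.BoxAnnotator()

def callback(frame: np.ndarray, frame_index: int) -> np.ndarray:
    # Stand-in: substitute your detector's output here.
    detections = sv.Detections.empty()
    detections = tracker.update_with_detections(detections)
    return box_annotator.annotate(scene=frame.copy(), detections=detections)

sv.process_video(
    source_path="source.mp4",
    target_path="target.mp4",
    callback=callback,
    show_progress=True,
)
```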

Parameters:

Name Type Description Default

source_path

str

Path to the input video file.

required

target_path

str

Path where the processed video will be saved.

required

callback

Callable[[ndarray, int], ndarray]

Function called for each frame, accepting the frame as a numpy array and its zero-based index, returning the processed frame.

required

max_frames

int | None

Optional maximum number of frames to process. If None, the entire video is processed (default).

None

prefetch

int

Maximum number of frames buffered by the reader thread. Controls memory use; default is 32.

32

writer_buffer

int

Maximum number of frames buffered before writing. Controls output buffer size; default is 32.

32

show_progress

bool

Whether to display a tqdm progress bar during processing. Default is False.

False

progress_message

str

Description shown in the progress bar.

'Processing video'

Returns:

| Type | Description |
|------|-------------|
| `None` | This function returns nothing; the processed video is written to `target_path`. |

Example
import cv2
import supervision as sv
from rfdetr import RFDETRMedium

model = RFDETRMedium()
box_annotator = sv.BoxAnnotator()

def callback(frame, frame_index):
    # RF-DETR expects RGB input; frames from the pipeline are BGR.
    detections = model.predict(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
    return box_annotator.annotate(scene=frame.copy(), detections=detections)

sv.process_video(
    source_path="source.mp4",
    target_path="target.mp4",
    callback=callback,
)
Source code in supervision/utils/video.py
def process_video(
    source_path: str,
    target_path: str,
    callback: Callable[[np.ndarray, int], np.ndarray],
    *,
    max_frames: int | None = None,
    prefetch: int = 32,
    writer_buffer: int = 32,
    show_progress: bool = False,
    progress_message: str = "Processing video",
) -> None:
    """
    Process video frames asynchronously using a threaded pipeline.

    This function orchestrates a three-stage pipeline to optimize video processing
    throughput:

    1. Reader thread: Continuously reads frames from the source video file and
       enqueues them into a bounded queue (`frame_read_queue`). The queue size is
       limited by the `prefetch` parameter to control memory usage.
    2. Main thread (Processor): Dequeues frames from `frame_read_queue`, applies the
       user-defined `callback` function to process each frame, then enqueues the
       processed frames into another bounded queue (`frame_write_queue`) for writing.
       The processing happens in the main thread, simplifying use of stateful objects
       without synchronization.
    3. Writer thread: Dequeues processed frames from `frame_write_queue` and writes
       them sequentially to the output video file.

    Args:
        source_path (str): Path to the input video file.
        target_path (str): Path where the processed video will be saved.
        callback (Callable[[numpy.ndarray, int], numpy.ndarray]): Function called for
            each frame, accepting the frame as a numpy array and its zero-based index,
            returning the processed frame.
        max_frames (int | None): Optional maximum number of frames to process.
            If None, the entire video is processed (default).
        prefetch (int): Maximum number of frames buffered by the reader thread.
            Controls memory use; default is 32.
        writer_buffer (int): Maximum number of frames buffered before writing.
            Controls output buffer size; default is 32.
        show_progress (bool): Whether to display a tqdm progress bar during processing.
            Default is False.
        progress_message (str): Description shown in the progress bar.

    Returns:
        None

    Example:
        ```python
        import cv2
        import supervision as sv
        from rfdetr import RFDETRMedium

        model = RFDETRMedium()
        box_annotator = sv.BoxAnnotator()

        def callback(frame, frame_index):
            # RF-DETR expects RGB input; frames from the pipeline are BGR.
            detections = model.predict(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
            return box_annotator.annotate(
                scene=frame.copy(), detections=detections
            )

        sv.process_video(
            source_path="source.mp4",
            target_path="target.mp4",
            callback=callback,
        )
        ```
    """
    video_info = VideoInfo.from_video_path(video_path=source_path)
    total_frames = (
        min(video_info.total_frames, max_frames)
        if max_frames is not None
        else video_info.total_frames
    )

    frame_read_queue: Queue[tuple[int, np.ndarray] | None] = Queue(maxsize=prefetch)
    frame_write_queue: Queue[np.ndarray | None] = Queue(maxsize=writer_buffer)

    def reader_thread() -> None:
        frame_generator = get_video_frames_generator(
            source_path=source_path,
            end=max_frames,
        )
        for frame_index, frame in enumerate(frame_generator):
            frame_read_queue.put((frame_index, frame))
        frame_read_queue.put(None)

    def writer_thread(video_sink: VideoSink) -> None:
        while True:
            frame = frame_write_queue.get()
            if frame is None:
                break
            video_sink.write_frame(frame=frame)

    reader_worker = threading.Thread(target=reader_thread, daemon=True)
    with VideoSink(target_path=target_path, video_info=video_info) as video_sink:
        writer_worker = threading.Thread(
            target=writer_thread,
            args=(video_sink,),
            daemon=True,
        )

        reader_worker.start()
        writer_worker.start()

        progress_bar = tqdm(
            total=total_frames,
            disable=not show_progress,
            desc=progress_message,
        )

        try:
            while True:
                read_item = frame_read_queue.get()
                if read_item is None:
                    break

                frame_index, frame = read_item
                processed_frame = callback(frame, frame_index)

                frame_write_queue.put(processed_frame)
                progress_bar.update(1)
        finally:
            frame_write_queue.put(None)
            reader_worker.join()
            writer_worker.join()
            progress_bar.close()
