Skip to content

Save Detections

CSV Sink

supervision.detection.tools.csv_sink.CSVSink

A utility class for saving detection data to a CSV file. This class is designed to efficiently serialize detection objects into a CSV format, allowing for the inclusion of bounding box coordinates and additional attributes like confidence, class_id, and tracker_id.

Tip

CSVSink allows passing custom data alongside detection fields, providing flexibility for logging various types of information. When a list or tuple value in custom_data (or detections.data) has the same length as the detection count, each element is written to the corresponding detection row; any other value is broadcast to all rows.

Parameters:

Name Type Description Default

file_name

str

The name of the CSV file where the detections will be stored. Defaults to 'output.csv'.

'output.csv'
Example
>>> import supervision as sv
>>> import numpy as np
>>> import tempfile
>>> import os
>>> # Create synthetic detections
>>> detections = sv.Detections(
...     xyxy=np.array([[10, 20, 30, 40], [50, 60, 70, 80]]),
...     confidence=np.array([0.9, 0.8]),
...     class_id=np.array([0, 1])
... )
>>> # Use temporary file
>>> temp_file = tempfile.NamedTemporaryFile(
...     mode='w', suffix='.csv', delete=False
... )
>>> temp_file.close()
>>> csv_sink = sv.CSVSink(temp_file.name)
>>> with csv_sink as sink:
...     sink.append(detections, custom_data={'frame': 0})
>>> os.unlink(temp_file.name)  # Clean up
Source code in src/supervision/detection/tools/csv_sink.py
class CSVSink:
    """
    A utility class for saving detection data to a CSV file. This class is designed to
    efficiently serialize detection objects into a CSV format, allowing for the
    inclusion of bounding box coordinates and additional attributes like `confidence`,
    `class_id`, and `tracker_id`.

    !!! tip

        CSVSink allows passing custom data alongside detection fields, providing
        flexibility for logging various types of information.
        When a list or tuple value in custom_data (or detections.data) has the
        same length as the detection count, each element is written to the
        corresponding detection row; any other value is broadcast to all rows.

    Args:
        file_name: The name of the CSV file where the detections will be stored.
            Defaults to 'output.csv'.

    Example:
        ```pycon
        >>> import supervision as sv
        >>> import numpy as np
        >>> import tempfile
        >>> import os
        >>> # Create synthetic detections
        >>> detections = sv.Detections(
        ...     xyxy=np.array([[10, 20, 30, 40], [50, 60, 70, 80]]),
        ...     confidence=np.array([0.9, 0.8]),
        ...     class_id=np.array([0, 1])
        ... )
        >>> # Use temporary file
        >>> temp_file = tempfile.NamedTemporaryFile(
        ...     mode='w', suffix='.csv', delete=False
        ... )
        >>> temp_file.close()
        >>> csv_sink = sv.CSVSink(temp_file.name)
        >>> with csv_sink as sink:
        ...     sink.append(detections, custom_data={'frame': 0})
        >>> os.unlink(temp_file.name)  # Clean up
        ```
    """

    def __init__(self, file_name: str = "output.csv") -> None:
        """
        Initialize the CSVSink instance.

        Args:
            file_name: The name of the CSV file.
        """
        self.file_name = file_name
        # File handle and writer are created lazily by open().
        self.file: io.TextIOWrapper | None = None
        self.writer: WriterProtocol | None = None
        self.header_written = False
        self.field_names: list[str] = []

    def __enter__(self) -> CSVSink:
        self.open()
        return self

    def __exit__(
        self,
        exc_type: type | None,
        exc_val: Exception | None,
        exc_tb: Any | None,
    ) -> None:
        self.close()

    def open(self) -> None:
        """
        Open the CSV file for writing, creating parent directories as needed.
        """
        parent_directory = os.path.dirname(self.file_name)
        if parent_directory:
            # exist_ok avoids a race between the existence check and creation
            # when several sinks target the same directory.
            os.makedirs(parent_directory, exist_ok=True)

        self.file = open(self.file_name, "w", newline="")
        self.writer = csv.writer(self.file)

    def close(self) -> None:
        """
        Close the CSV file and reset the sink's handles.
        """
        if self.file:
            self.file.close()
        # Clear the handles so a subsequent append() hits the sink's own
        # "not open" error instead of a low-level "closed file" ValueError.
        self.file = None
        self.writer = None

    @staticmethod
    def _slice_value(value: Any, i: int, n: int) -> Any:
        """
        Return the i-th element when the value stores per-detection data.

        Dispatch rules:
            - np.ndarray with ndim == 0: return as-is for broadcasting
            - np.ndarray with ndim >= 1: return value[i]
            - list or tuple with len equal to n: return value[i]
            - any other type: return as-is for broadcasting

        Args:
            value: Custom-data field value.
            i: Zero-based detection index.
            n: Total number of detections.

        Returns:
            Element at position i if value is a per-detection sequence,
            otherwise value unchanged.
        """
        if isinstance(value, np.ndarray):
            return value if value.ndim == 0 else value[i]
        if isinstance(value, (list, tuple)) and len(value) == n:
            return value[i]
        return value

    @staticmethod
    def parse_detection_data(
        detections: Detections, custom_data: dict[str, Any] | None = None
    ) -> list[dict[str, Any]]:
        """
        Convert detections and optional custom data into per-detection rows.

        Builds one dictionary per detection containing bounding box coordinates,
        detection attributes, and any values from ``detections.data`` or
        ``custom_data``. List and tuple values in ``custom_data`` with length
        equal to ``len(detections.xyxy)`` are sliced one element per row; all
        other values are broadcast to every row.

        Args:
            detections: Detection data to serialize into row dictionaries.
            custom_data: Optional extra fields to include in each row.

        Returns:
            A list of dictionaries, one per detection, containing ``xyxy``
            coordinates, ``class_id``, ``confidence``, ``tracker_id``, and any
            values from ``detections.data`` or ``custom_data``.
        """
        parsed_rows = []
        n = len(detections.xyxy)
        for i in range(n):
            row = {
                "x_min": detections.xyxy[i][0],
                "y_min": detections.xyxy[i][1],
                "x_max": detections.xyxy[i][2],
                "y_max": detections.xyxy[i][3],
                "class_id": ""
                if detections.class_id is None
                else str(detections.class_id[i]),
                "confidence": ""
                if detections.confidence is None
                else str(detections.confidence[i]),
                "tracker_id": ""
                if detections.tracker_id is None
                else str(detections.tracker_id[i]),
            }

            if hasattr(detections, "data"):
                for key, value in detections.data.items():
                    row[key] = CSVSink._slice_value(value, i, n)

            if custom_data:
                for key, value in custom_data.items():
                    row[key] = CSVSink._slice_value(value, i, n)

            parsed_rows.append(row)
        return parsed_rows

    def append(
        self, detections: Detections, custom_data: dict[str, Any] | None = None
    ) -> None:
        """
        Append detection data to the CSV file.

        Args:
            detections: The detection data.
            custom_data: Custom data to include. Scalars, dictionaries, and
                other non-sequence values are broadcast to every detection in
                this batch. NumPy arrays, lists, and tuples with length equal
                to ``len(detections)`` are sliced per detection; other lists
                and tuples are broadcast unchanged.

        Raises:
            Exception: If the sink has not been opened (or was closed).
        """
        if not self.writer:
            raise Exception(
                f"Cannot append to CSV: The file '{self.file_name}' is not open."
            )
        field_names = CSVSink.parse_field_names(detections, custom_data)
        if not self.header_written:
            # The first batch fixes the column set for the whole file.
            self.field_names = field_names
            self.writer.writerow(field_names)
            self.header_written = True

        if field_names != self.field_names:
            # Mismatched batches are still written (missing columns become "");
            # warn so the caller can spot inconsistent custom_data keys.
            logger.warning(
                "Field names do not match the header. Expected: %s, given: %s",
                self.field_names,
                field_names,
            )

        parsed_rows = CSVSink.parse_detection_data(detections, custom_data)
        for row in parsed_rows:
            self.writer.writerow(
                [row.get(field_name, "") for field_name in self.field_names]
            )

    @staticmethod
    def parse_field_names(
        detections: Detections, custom_data: dict[str, Any] | None = None
    ) -> list[str]:
        """
        Build the CSV header for a batch of detections.

        Args:
            detections: Detection data whose ``data`` keys become extra columns.
            custom_data: Optional extra fields whose keys become extra columns.

        Returns:
            ``BASE_HEADER`` followed by the sorted union of ``custom_data``
            and ``detections.data`` keys.
        """
        custom_keys = set(custom_data.keys()) if custom_data else set()
        dynamic_header = sorted(
            custom_keys | set(getattr(detections, "data", {}).keys())
        )
        return BASE_HEADER + dynamic_header

Functions

__init__(file_name: str = 'output.csv') -> None

Initialize the CSVSink instance.

Parameters:

Name Type Description Default
file_name
str

The name of the CSV file.

'output.csv'
Source code in src/supervision/detection/tools/csv_sink.py
def __init__(self, file_name: str = "output.csv") -> None:
    """
    Set up a CSVSink that will write to ``file_name``.

    Args:
        file_name: Path of the target CSV file.
    """
    # Nothing is opened here; ``open`` (or entering the context manager)
    # creates the file handle and csv writer lazily.
    self.file_name = file_name
    self.header_written = False
    self.field_names: list[str] = []
    self.file: io.TextIOWrapper | None = None
    self.writer: WriterProtocol | None = None

append(detections: Detections, custom_data: dict[str, Any] | None = None) -> None

Append detection data to the CSV file.

Parameters:

Name Type Description Default
detections
Detections

The detection data.

required
custom_data
dict[str, Any] | None

Custom data to include. Scalars, dictionaries, and other non-sequence values are broadcast to every detection in this batch. NumPy arrays, lists, and tuples with length equal to len(detections) are sliced per detection; other lists and tuples are broadcast unchanged.

None
Source code in src/supervision/detection/tools/csv_sink.py
def append(
    self, detections: Detections, custom_data: dict[str, Any] | None = None
) -> None:
    """
    Write one CSV row per detection in ``detections``.

    Args:
        detections: The detection data.
        custom_data: Extra columns. NumPy arrays, and lists or tuples whose
            length equals ``len(detections)``, contribute one element per
            row; every other value (scalars, dicts, mismatched sequences)
            is repeated on each row.

    Raises:
        Exception: If the sink has not been opened.
    """
    if not self.writer:
        raise Exception(
            f"Cannot append to CSV: The file '{self.file_name}' is not open."
        )

    current_fields = CSVSink.parse_field_names(detections, custom_data)
    if not self.header_written:
        # The first batch determines the header for the whole file.
        self.field_names = current_fields
        self.writer.writerow(current_fields)
        self.header_written = True

    if current_fields != self.field_names:
        logger.warning(
            "Field names do not match the header. Expected: %s, given: %s",
            self.field_names,
            current_fields,
        )

    # Missing columns are emitted as empty strings via dict.get.
    for parsed_row in CSVSink.parse_detection_data(detections, custom_data):
        self.writer.writerow(
            [parsed_row.get(name, "") for name in self.field_names]
        )

close() -> None

Close the CSV file.

Source code in src/supervision/detection/tools/csv_sink.py
def close(self) -> None:
    """
    Close the underlying CSV file, if one is open.
    """
    if self.file is None:
        return
    self.file.close()

open() -> None

Open the CSV file for writing.

Source code in src/supervision/detection/tools/csv_sink.py
def open(self) -> None:
    """
    Create the CSV file (and any missing parent directories) for writing.
    """
    target_dir = os.path.dirname(self.file_name)
    if target_dir and not os.path.exists(target_dir):
        os.makedirs(target_dir)

    # newline="" lets the csv module manage line endings itself.
    self.file = open(self.file_name, "w", newline="")
    self.writer = csv.writer(self.file)

parse_detection_data(detections: Detections, custom_data: dict[str, Any] | None = None) -> list[dict[str, Any]] staticmethod

Convert detections and optional custom data into per-detection rows.

Builds one dictionary per detection containing bounding box coordinates, detection attributes, and any values from detections.data or custom_data. List and tuple values in custom_data with length equal to len(detections.xyxy) are sliced one element per row; all other values are broadcast to every row.

Parameters:

Name Type Description Default
detections
Detections

Detection data to serialize into row dictionaries.

required
custom_data
dict[str, Any] | None

Optional extra fields to include in each row.

None

Returns:

Type Description
list[dict[str, Any]]

A list of dictionaries, one per detection, containing xyxy coordinates, class_id, confidence, tracker_id, and any values from detections.data or custom_data.

Source code in src/supervision/detection/tools/csv_sink.py
@staticmethod
def parse_detection_data(
    detections: Detections, custom_data: dict[str, Any] | None = None
) -> list[dict[str, Any]]:
    """
    Convert detections and optional custom data into per-detection rows.

    Builds one dictionary per detection containing bounding box coordinates,
    detection attributes, and any values from ``detections.data`` or
    ``custom_data``. List and tuple values in ``custom_data`` with length
    equal to ``len(detections.xyxy)`` are sliced one element per row; all
    other values are broadcast to every row.

    Args:
        detections: Detection data to serialize into row dictionaries.
        custom_data: Optional extra fields to include in each row.

    Returns:
        A list of dictionaries, one per detection, containing ``xyxy``
        coordinates, ``class_id``, ``confidence``, ``tracker_id``, and any
        values from ``detections.data`` or ``custom_data``.
    """
    total = len(detections.xyxy)
    rows: list[dict[str, Any]] = []
    for idx in range(total):
        box = detections.xyxy[idx]
        row: dict[str, Any] = {
            "x_min": box[0],
            "y_min": box[1],
            "x_max": box[2],
            "y_max": box[3],
        }
        # Optional attributes become empty strings when absent.
        row["class_id"] = (
            "" if detections.class_id is None else str(detections.class_id[idx])
        )
        row["confidence"] = (
            "" if detections.confidence is None else str(detections.confidence[idx])
        )
        row["tracker_id"] = (
            "" if detections.tracker_id is None else str(detections.tracker_id[idx])
        )

        for key, value in getattr(detections, "data", {}).items():
            row[key] = CSVSink._slice_value(value, idx, total)

        for key, value in (custom_data or {}).items():
            row[key] = CSVSink._slice_value(value, idx, total)

        rows.append(row)
    return rows

JSON Sink

supervision.detection.tools.json_sink.JSONSink

A utility class for saving detection data to a JSON file. This class is designed to efficiently serialize detection objects into a JSON format, allowing for the inclusion of bounding box coordinates and additional attributes like confidence, class_id, and tracker_id.

Tip

JSONSink allows passing custom data alongside detection fields, providing flexibility for logging various types of information. When a list or tuple value in custom_data (or detections.data) has the same length as the detection count, each element is written to the corresponding detection row; any other value is broadcast to all rows.

Parameters:

Name Type Description Default

file_name

str

The name of the JSON file where the detections will be stored. Defaults to 'output.json'.

'output.json'
Example
import supervision as sv
from ultralytics import YOLO

model = YOLO("<SOURCE_MODEL_PATH>")
json_sink = sv.JSONSink("<RESULT_JSON_FILE_PATH>")
frames_generator = sv.get_video_frames_generator("<SOURCE_VIDEO_PATH>")

with json_sink as sink:
    for frame in frames_generator:
        result = model(frame)[0]
        detections = sv.Detections.from_ultralytics(result)
        sink.append(detections, custom_data={"<CUSTOM_LABEL>":"<CUSTOM_DATA>"})
Source code in src/supervision/detection/tools/json_sink.py
class JSONSink:
    """
    A utility class for saving detection data to a JSON file. This class is designed to
    efficiently serialize detection objects into a JSON format, allowing for the
    inclusion of bounding box coordinates and additional attributes like `confidence`,
    `class_id`, and `tracker_id`.

    !!! tip

        JSONSink allows passing custom data alongside detection fields, providing
        flexibility for logging various types of information.
        When a list or tuple value in custom_data (or detections.data) has the
        same length as the detection count, each element is written to the
        corresponding detection row; any other value is broadcast to all rows.

    Args:
        file_name: The name of the JSON file where the detections will be stored.
            Defaults to 'output.json'.

    Example:
        ```python
        import supervision as sv
        from ultralytics import YOLO

        model = YOLO("<SOURCE_MODEL_PATH>")
        json_sink = sv.JSONSink("<RESULT_JSON_FILE_PATH>")
        frames_generator = sv.get_video_frames_generator("<SOURCE_VIDEO_PATH>")

        with json_sink as sink:
            for frame in frames_generator:
                result = model(frame)[0]
                detections = sv.Detections.from_ultralytics(result)
                sink.append(detections, custom_data={"<CUSTOM_LABEL>":"<CUSTOM_DATA>"})
        ```
    """

    def __init__(self, file_name: str = "output.json") -> None:
        """
        Initialize the JSONSink instance.

        Args:
            file_name: The name of the JSON file.
        """
        self.file_name = file_name
        # Rows accumulate in memory; nothing is written until
        # write_and_close() runs.
        self.file: io.TextIOWrapper | None = None
        self.data: list[dict[str, Any]] = []

    def __enter__(self) -> JSONSink:
        self.open()
        return self

    def __exit__(
        self,
        exc_type: type | None,
        exc_val: Exception | None,
        exc_tb: Any | None,
    ) -> None:
        self.write_and_close()

    def open(self) -> None:
        """
        Open the JSON file for writing, creating parent directories as needed.
        """
        parent_directory = os.path.dirname(self.file_name)
        if parent_directory:
            # exist_ok avoids a race between the existence check and creation.
            os.makedirs(parent_directory, exist_ok=True)

        self.file = open(self.file_name, "w")

    def write_and_close(self) -> None:
        """
        Write the buffered rows as JSON and close the file.
        """
        if self.file:
            json.dump(self.data, self.file, indent=4)
            self.file.close()
            # Clear the handle so a second call (e.g. an explicit call
            # followed by __exit__) does not dump to a closed file.
            self.file = None

    @staticmethod
    def _slice_value(value: Any, i: int, n: int) -> Any:
        """
        Return the i-th element when the value stores per-detection data.

        Dispatch rules:
            - np.ndarray with ndim == 0: return as-is for broadcasting
            - np.ndarray with ndim >= 1: return value[i]
            - list or tuple with len equal to n: return value[i]
            - any other type: return as-is for broadcasting

        Args:
            value: Custom-data field value.
            i: Zero-based detection index.
            n: Total number of detections.

        Returns:
            Element at position i if value is a per-detection sequence,
            otherwise value unchanged.
        """
        if isinstance(value, np.ndarray):
            return value if value.ndim == 0 else value[i]
        if isinstance(value, (list, tuple)) and len(value) == n:
            return value[i]
        return value

    @staticmethod
    def parse_detection_data(
        detections: Detections, custom_data: dict[str, Any] | None = None
    ) -> list[dict[str, Any]]:
        """
        Convert detections and optional custom data into per-detection rows.

        Builds one dictionary per detection containing bounding box coordinates,
        detection attributes, and any values from ``detections.data`` or
        ``custom_data``. List and tuple values in ``custom_data`` with length
        equal to ``len(detections.xyxy)`` are sliced one element per row; all
        other values are broadcast to every row.

        Args:
            detections: Detection data to serialize into row dictionaries.
            custom_data: Optional extra fields to include in each row.

        Returns:
            A list of dictionaries, one per detection, containing ``xyxy``
            coordinates, ``class_id``, ``confidence``, ``tracker_id``, and any
            values from ``detections.data`` or ``custom_data``.
        """
        parsed_rows = []
        n = len(detections.xyxy)
        for i in range(n):
            # Coerce to plain Python numbers so json.dump can serialize them.
            row = {
                "x_min": float(detections.xyxy[i][0]),
                "y_min": float(detections.xyxy[i][1]),
                "x_max": float(detections.xyxy[i][2]),
                "y_max": float(detections.xyxy[i][3]),
                "class_id": ""
                if detections.class_id is None
                else int(detections.class_id[i]),
                "confidence": ""
                if detections.confidence is None
                else float(detections.confidence[i]),
                "tracker_id": ""
                if detections.tracker_id is None
                else int(detections.tracker_id[i]),
            }

            if hasattr(detections, "data"):
                for key, value in detections.data.items():
                    row[key] = str(JSONSink._slice_value(value, i, n))

            if custom_data:
                for key, value in custom_data.items():
                    # Only ndarray-sourced values are stringified; other types
                    # are kept as-is for native JSON serialization.
                    v = JSONSink._slice_value(value, i, n)
                    row[key] = str(v) if isinstance(value, np.ndarray) else v

            parsed_rows.append(row)
        return parsed_rows

    def append(
        self, detections: Detections, custom_data: dict[str, Any] | None = None
    ) -> None:
        """
        Append detection data to the JSON file.

        Args:
            detections: The detection data.
            custom_data: Custom data to include. Scalars, dictionaries, and
                other non-sequence values are broadcast to every detection in
                this batch. NumPy arrays, lists, and tuples with length equal
                to ``len(detections)`` are sliced per detection; other lists
                and tuples are broadcast unchanged.
        """
        parsed_rows = JSONSink.parse_detection_data(detections, custom_data)
        self.data.extend(parsed_rows)

Functions

__init__(file_name: str = 'output.json') -> None

Initialize the JSONSink instance.

Parameters:

Name Type Description Default
file_name
str

The name of the JSON file.

'output.json'
Source code in src/supervision/detection/tools/json_sink.py
def __init__(self, file_name: str = "output.json") -> None:
    """
    Set up a JSONSink targeting ``file_name``.

    Args:
        file_name: Path of the target JSON file.
    """
    # Rows accumulate in ``data``; nothing touches disk until
    # ``write_and_close`` runs.
    self.file_name = file_name
    self.data: list[dict[str, Any]] = []
    self.file: io.TextIOWrapper | None = None

append(detections: Detections, custom_data: dict[str, Any] | None = None) -> None

Append detection data to the JSON file.

Parameters:

Name Type Description Default
detections
Detections

The detection data.

required
custom_data
dict[str, Any] | None

Custom data to include. Scalars, dictionaries, and other non-sequence values are broadcast to every detection in this batch. NumPy arrays, lists, and tuples with length equal to len(detections) are sliced per detection; other lists and tuples are broadcast unchanged.

None
Source code in src/supervision/detection/tools/json_sink.py
def append(
    self, detections: Detections, custom_data: dict[str, Any] | None = None
) -> None:
    """
    Buffer one row per detection for later JSON serialization.

    Args:
        detections: The detection data.
        custom_data: Extra fields. NumPy arrays, and lists or tuples whose
            length equals ``len(detections)``, contribute one element per
            row; every other value (scalars, dicts, mismatched sequences)
            is repeated on each row.
    """
    # Rows are only buffered here; write_and_close() performs the dump.
    self.data.extend(JSONSink.parse_detection_data(detections, custom_data))

open() -> None

Open the JSON file for writing.

Source code in src/supervision/detection/tools/json_sink.py
def open(self) -> None:
    """
    Create the JSON file (and any missing parent directories) for writing.
    """
    target_dir = os.path.dirname(self.file_name)
    if target_dir and not os.path.exists(target_dir):
        os.makedirs(target_dir)

    self.file = open(self.file_name, "w")

parse_detection_data(detections: Detections, custom_data: dict[str, Any] | None = None) -> list[dict[str, Any]] staticmethod

Convert detections and optional custom data into per-detection rows.

Builds one dictionary per detection containing bounding box coordinates, detection attributes, and any values from detections.data or custom_data. List and tuple values in custom_data with length equal to len(detections.xyxy) are sliced one element per row; all other values are broadcast to every row.

Parameters:

Name Type Description Default
detections
Detections

Detection data to serialize into row dictionaries.

required
custom_data
dict[str, Any] | None

Optional extra fields to include in each row.

None

Returns:

Type Description
list[dict[str, Any]]

A list of dictionaries, one per detection, containing xyxy coordinates, class_id, confidence, tracker_id, and any values from detections.data or custom_data.

Source code in src/supervision/detection/tools/json_sink.py
@staticmethod
def parse_detection_data(
    detections: Detections, custom_data: dict[str, Any] | None = None
) -> list[dict[str, Any]]:
    """
    Convert detections and optional custom data into per-detection rows.

    Builds one dictionary per detection containing bounding box coordinates,
    detection attributes, and any values from ``detections.data`` or
    ``custom_data``. List and tuple values in ``custom_data`` with length
    equal to ``len(detections.xyxy)`` are sliced one element per row; all
    other values are broadcast to every row.

    Args:
        detections: Detection data to serialize into row dictionaries.
        custom_data: Optional extra fields to include in each row.

    Returns:
        A list of dictionaries, one per detection, containing ``xyxy``
        coordinates, ``class_id``, ``confidence``, ``tracker_id``, and any
        values from ``detections.data`` or ``custom_data``.
    """
    total = len(detections.xyxy)
    rows: list[dict[str, Any]] = []
    for idx in range(total):
        box = detections.xyxy[idx]
        # Coerce to plain Python numbers so json.dump can serialize them.
        row: dict[str, Any] = {
            "x_min": float(box[0]),
            "y_min": float(box[1]),
            "x_max": float(box[2]),
            "y_max": float(box[3]),
        }
        # Optional attributes become empty strings when absent.
        row["class_id"] = (
            "" if detections.class_id is None else int(detections.class_id[idx])
        )
        row["confidence"] = (
            "" if detections.confidence is None else float(detections.confidence[idx])
        )
        row["tracker_id"] = (
            "" if detections.tracker_id is None else int(detections.tracker_id[idx])
        )

        for key, value in getattr(detections, "data", {}).items():
            row[key] = str(JSONSink._slice_value(value, idx, total))

        for key, value in (custom_data or {}).items():
            # Note: the ndarray check is on the ORIGINAL value, so any
            # element sliced out of an ndarray is stringified; other types
            # pass through for native JSON serialization.
            element = JSONSink._slice_value(value, idx, total)
            row[key] = str(element) if isinstance(value, np.ndarray) else element

        rows.append(row)
    return rows

write_and_close() -> None

Write and close the JSON file.

Source code in src/supervision/detection/tools/json_sink.py
def write_and_close(self) -> None:
    """
    Serialize the buffered rows as indented JSON and close the file, if open.
    """
    if self.file is None:
        return
    json.dump(self.data, self.file, indent=4)
    self.file.close()

Comments