How To Analyze Occupancy with Supervision¶
In this notebook, we'll use a parking lot to demonstrate how we can extract numerous informative metrics and detailed graphics, all from one video, using Supervision.
This notebook accompanies the Occupancy Analytics with Computer Vision tutorial on the Roboflow Blog. Check it out for deeper explanations and context!
In this notebook, we will cover the following:
- Getting training data
- Training an object detection model
- Detecting vehicles
- Analyzing data and generating statistics
Before You Start¶
Let's make sure that we have access to a GPU. We can use the nvidia-smi command to do that. In case of any problems, navigate to Edit -> Notebook settings -> Hardware accelerator, set it to GPU, and then click Save.
!nvidia-smi
Install Relevant Packages¶
Here, we will install the Roboflow package, for uploading our data and training our model, and Supervision, for visualization and for extracting metrics from our model's predictions.
!pip install roboflow supervision==0.19.0 -q
Getting Video Data¶
We will start by turning a single video into a folder of frame images for training our model. Upload your video and set its file path here.
VIDEO_PATH = "/content/parkinglot1080.mov"
First, let's create a directory to save the video frames
import os

FRAMES_DIR = "/content/frames"
# exist_ok prevents an error if this cell is re-run
os.makedirs(FRAMES_DIR, exist_ok=True)
Then, we can use Supervision's get_video_frames_generator function to extract and save our video frames.
import supervision as sv
from PIL import Image

frames_generator = sv.get_video_frames_generator(VIDEO_PATH)

for i, frame in enumerate(frames_generator):
    img = Image.fromarray(frame)
    img.save(f"{FRAMES_DIR}/{i}.jpg")

print(f"Saved frames to {FRAMES_DIR}")
Saved frames to /content/frames
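A full-length video can produce thousands of near-identical frames. If you would rather keep only a subset, get_video_frames_generator also accepts a stride argument. A minimal sketch, reusing the imports above (the stride value of 30 is an arbitrary choice):
# Save every 30th frame instead of all of them
sampled_generator = sv.get_video_frames_generator(VIDEO_PATH, stride=30)
for i, frame in enumerate(sampled_generator):
    Image.fromarray(frame).save(f"{FRAMES_DIR}/sampled_{i}.jpg")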
Random Crop Sampling (If Using SAHI)¶
If we are using SAHI (as we are in this example), randomly sampling cropped portions of our images can help mimic the effect of SAHI detection during training, improving performance.
# Note: This code block was written by ChatGPT
import os
import random
from PIL import Image
import numpy as np

def random_crop(img):
    # Crop a random region between 10% and 40% of each image dimension,
    # roughly matching the scale of SAHI slices
    width, height = img.size
    crop_width = random.randint(int(width * 0.1), int(width * 0.4))
    crop_height = random.randint(int(height * 0.1), int(height * 0.4))
    left = random.randint(0, width - crop_width)
    top = random.randint(0, height - crop_height)
    return img.crop((left, top, left + crop_width, top + crop_height))

def augment_images(source_folder, target_folder, num_images=100):
    if not os.path.exists(target_folder):
        os.makedirs(target_folder)
    # Randomly pick up to num_images frames and save one crop from each
    all_images = [file for file in os.listdir(source_folder) if file.endswith('.jpg')]
    selected_images = np.random.choice(all_images, size=min(num_images, len(all_images)), replace=False)
    for i, filename in enumerate(selected_images):
        with Image.open(os.path.join(source_folder, filename)) as img:
            cropped_img = random_crop(img)
            cropped_img.save(os.path.join(target_folder, f'augmented_{i}.jpg'))

# Paths to the source and target folders
source_folder = '/content/frames'
target_folder = '/content/augmented'

# Augment images
augment_images(source_folder, target_folder)
Training a Model¶
Now that we have our images, we can upload our extracted frames as training data to Roboflow.
Upload Training Data¶
# Upload the extracted frames to Roboflow
import os
import roboflow
rf = roboflow.Roboflow(api_key="YOUR_ROBOFLOW_API_KEY")
project = rf.workspace().project("parking-lot-occupancy-detection-eoaek")
for filename in os.listdir(FRAMES_DIR):
    img_path = os.path.join(FRAMES_DIR, filename)
    if os.path.isfile(img_path):
        project.upload(image_path=img_path)
loading Roboflow workspace... loading Roboflow project...
Training a Model Using Autodistill (Optional)¶
We can use Automated Labeling, powered by Autodistill, to automatically label our data before training. Copy the code required for this section from the Roboflow app.
Note: It's not required to use Autodistill.
# PASTE CODE FROM ROBOFLOW HERE
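The Roboflow app generates the exact snippet for your project, so prefer pasting that above. For reference only, here is a minimal sketch of what Autodistill auto-labeling can look like, assuming the autodistill and autodistill-grounded-sam packages are installed and that a "vehicle" text prompt fits this parking lot use case:
# Hedged sketch - paste the generated code from the Roboflow app instead
# Assumes: !pip install autodistill autodistill-grounded-sam -q
from autodistill.detection import CaptionOntology
from autodistill_grounded_sam import GroundedSAM

# Map the text prompt "vehicle" to the class name "vehicle"
base_model = GroundedSAM(ontology=CaptionOntology({"vehicle": "vehicle"}))

# Auto-label every extracted frame and write the dataset to disk
base_model.label(
    input_folder=FRAMES_DIR,
    extension=".jpg",
    output_folder="/content/labeled_dataset"
)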
Vehicle Detection¶
Now, we can run our model to get inference results on our video.
Setup Model¶
First, set the model up as a callback function so that we can call it later on while using Supervision.
from roboflow import Roboflow
import supervision as sv
import numpy as np
import cv2
rf = Roboflow(api_key="YOUR_ROBOFLOW_API_KEY") # Get your own API key - This one won't work
project = rf.workspace().project("parking-lot-occupancy-detection-eoaek")
model = project.version("5").model
def callback(x: np.ndarray) -> sv.Detections:
    result = model.predict(x, confidence=25, overlap=30).json()
    return sv.Detections.from_inference(result)
loading Roboflow workspace... loading Roboflow project...
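Before wiring the callback into InferenceSlicer, it is worth sanity-checking it on a single frame. A quick sketch (frame 5 is an arbitrary choice):
# Run the callback directly on one saved frame
test_frame = cv2.imread(f"{FRAMES_DIR}/5.jpg")
test_detections = callback(test_frame)
print(f"Found {len(test_detections)} vehicles in the test frame")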
Configure Zones¶
Next, we will set up a list of the zones to be used with PolygonZone. You can get these polygon coordinates using this web utility.
For our example, we have five zones, but you can add as many or as few zones as you would like.
# Polygons From PolygonZone
zones = [
    {
        'name': 'Zone 1',
        'polygon': np.array([[229, 50], [-3, 306], [1, 614], [369, 50]]),
        'max': 32
    },
    {
        'name': 'Zone 2',
        'polygon': np.array([[465, 46], [177, 574], [401, 578], [609, 46]]),
        'max': 38
    },
    {
        'name': 'Zone 3',
        'polygon': np.array([[697, 58], [461, 858], [737, 858], [849, 58]]),
        'max': 46
    },
    {
        'name': 'Zone 4',
        'polygon': np.array([[941, 58], [909, 862], [1273, 858], [1137, 58]]),
        'max': 48
    },
    {
        'name': 'Zone 5',
        'polygon': np.array([[1229, 46], [1501, 1078], [1889, 1078], [1405, 46]]),
        'max': 52
    }
]
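Before running detection, it helps to confirm the polygons line up with the parking rows. A minimal sketch using Supervision's draw_polygon utility on one of our saved frames (frame 0 is an arbitrary choice):
# Overlay each zone polygon on a sample frame to verify the coordinates
zone_preview = cv2.imread(f"{FRAMES_DIR}/0.jpg")
for zone in zones:
    zone_preview = sv.draw_polygon(
        scene=zone_preview,
        polygon=zone['polygon'],
        color=sv.Color.WHITE,
        thickness=4
    )
sv.plot_image(zone_preview)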
Setup Supervision¶
For our use case, we will use the following features of Supervision. Refer to the linked documentation for more details:
- ByteTrack: To track our vehicles across frames, so we can assess how long they stay parked
- InferenceSlicer: A helper utility to run SAHI with our model
- TriangleAnnotator: To visualize the locations of the vehicles
- HeatMapAnnotator: To generate heatmaps so we can identify our busiest areas
- PolygonZone, PolygonZoneAnnotator: To count and identify vehicles in our respective zones, and to visualize those zones
tracker = sv.ByteTrack()

slicer = sv.InferenceSlicer(
    callback=callback,
    slice_wh=(800, 800),
    overlap_ratio_wh=(0.2, 0.2),
    thread_workers=10,
    iou_threshold=0.2
)

triangle_annotator = sv.TriangleAnnotator(
    base=20,
    height=20
)
heat_map_annotator = sv.HeatMapAnnotator()

def setup_zones(frame_wh):
    if zones:
        for zone in zones:
            zone['history'] = []
            zone['PolygonZone'] = sv.PolygonZone(
                polygon=zone['polygon'],
                frame_resolution_wh=frame_wh
            )
            zone['PolygonZoneAnnotator'] = sv.PolygonZoneAnnotator(
                zone=zone['PolygonZone'],
                color=sv.Color.WHITE,
                thickness=4,
            )

def process_frame(frame, heatmap=None):
    # Run SAHI-style sliced inference, then associate detections across frames
    detections = slicer(image=frame)
    detections = tracker.update_with_detections(detections)

    annotated_frame = frame.copy()
    annotated_frame = triangle_annotator.annotate(
        scene=annotated_frame,
        detections=detections
    )

    # Accumulate the heatmap across calls, starting from a blank canvas
    if heatmap is None:
        heatmap = np.full(frame.shape, 255, dtype=np.uint8)
    heat_map_annotator.annotate(
        scene=heatmap,
        detections=detections
    )

    if zones:
        for zone in zones:
            # Count the detections that fall inside this zone
            zone_presence = zone['PolygonZone'].trigger(detections)
            zone_present_idxs = [idx for idx, present in enumerate(zone_presence) if present]
            zone_present = detections[zone_present_idxs]
            zone_count = len(zone_present)
            zone['history'].append(zone_count)
            annotated_frame = zone['PolygonZoneAnnotator'].annotate(
                scene=annotated_frame,
                label=f"{zone['name']}: {zone_count}"
            )
            # Draw the zone outlines on the heatmap as well
            heatmap = zone['PolygonZoneAnnotator'].annotate(
                scene=heatmap,
                label=" "
            )

    return annotated_frame, heatmap
Try With a Single Image¶
image = cv2.imread(f"{FRAMES_DIR}/5.jpg")
image_wh = (image.shape[1], image.shape[0])

setup_zones(image_wh)
annotated_image, heatmap = process_frame(image)

sv.plot_image(annotated_image)
sv.plot_image(heatmap)
(1920, 1080)