#!/usr/bin/env python3 """ Low-latency camera + ba...

Created on: December 13, 2025

Answered using GPT-5.2 by Chat01

Question

#!/usr/bin/env python3
"""
Low-latency camera + background Top-Hat detection + ViT tracker.

This version uses Top-Hat instead of Canny for ROI edge detection.
"""

import signal
import threading
import queue
import time
import math
import socket
import struct
import os

import cv2
import numpy as np
from picamera2 import Picamera2

from vittrack import VitTrack

# ------------------------- CONFIG -------------------------

# Search rectangle in full-frame pixels. slow_processing_loop unpacks it as
# (x0, y0, x1, y1) and clamps it to the frame before running Top-Hat on it.
ROI = (315, 222, 415, 322)
# Passed to find_split_boxes_by_gap: fraction of the edge pixels' horizontal
# extent that a column gap must exceed to start a new cluster.
GAP_THRESHOLD = 0.4
# When true, local_display_loop blends the latest edge mask onto the preview.
OVERLAY = True

# NOTE(review): not referenced by any code visible here; presumably the
# destination for streaming_loop / coords_streaming_loop, whose bodies are
# omitted ("unchanged for brevity") - confirm against the earlier version.
TARGET_IP = "192.168.50.1"
PORT = 9999
COORDS_PORT = 9998

# maxsize of the capture -> processing queue. capture_loop removes one queued
# frame before inserting the new one.
TRACKING_QSIZE = 1
# maxsize of the queue of (frame_count, center_x, center_y) tuples produced by
# the tracker; new entries are dropped when it is full.
COORDS_QSIZE = 32

# capture_loop sleeps 1 / CAP_FPS seconds between grabs.
CAP_FPS = 30.0

# ONNX model file; VitTrack is only constructed if this path exists.
MODEL_PATH = "/home/pi/Desktop/vittracker/object_tracking_vittrack_2023sep.onnx"

# Forwarded to VitTrack as backend_id / target_id.
BACKEND_ID = cv2.dnn.DNN_BACKEND_OPENCV
TARGET_ID = cv2.dnn.DNN_TARGET_CPU

# Minimum score returned by tracker.infer for a result to be accepted; below
# this the tracker is dropped and detection restarts.
VIT_SCORE_THRESHOLD = 0.3

# --------------------- ROI split finder ------------------------

def find_split_boxes_by_gap(edges_bin, gap_threshold=0.4):
    """Split the edge pixels of a binary mask into boxes separated by column gaps.

    The occupied columns are walked left to right; whenever two consecutive
    occupied columns are further apart than ``gap_threshold`` times the total
    horizontal extent of the edge pixels, a new cluster starts.

    Args:
        edges_bin: 2-D array; every element > 0 counts as an edge pixel.
        gap_threshold: Gap size, as a fraction of the edge pixels' total
            x-extent, that must be exceeded to split two clusters.

    Returns:
        A list of ``(x_start, y_start, x_end, y_end)`` tuples of plain ints,
        ordered left to right, in inclusive pixel coordinates relative to
        ``edges_bin``. Empty when the mask has no edge pixels.
    """
    ys, xs = np.where(edges_bin > 0)
    if xs.size == 0:
        return []

    # A single occupied column has zero extent; use 1 so the threshold is
    # never zero.
    total_width = int(xs.max() - xs.min()) or 1
    gap_threshold_px = total_width * gap_threshold

    # np.unique returns the occupied columns already sorted.
    cols = np.unique(xs)
    # Index i marks a split between cols[i] and cols[i + 1].
    breaks = np.flatnonzero(np.diff(cols) > gap_threshold_px)
    starts = np.concatenate(([cols[0]], cols[breaks + 1]))
    ends = np.concatenate((cols[breaks], [cols[-1]]))

    boxes = []
    for x_start, x_end in zip(starts, ends):
        # Every cluster contains at least its own start column, so this
        # selection is never empty.
        ys_in_range = ys[(xs >= x_start) & (xs <= x_end)]
        boxes.append(
            (int(x_start), int(ys_in_range.min()), int(x_end), int(ys_in_range.max()))
        )
    return boxes

# --------------------- visualize edges -------------------------

def overlay_edges(original_bgr, edges_gray, alpha=0.75, edge_color=(0, 255, 0)):
    """Return a copy of ``original_bgr`` with edge pixels tinted ``edge_color``.

    Args:
        original_bgr: Colour image to draw on; it is never modified.
        edges_gray: Single-channel edge image of the same height and width,
            or ``None``. Pixels whose absolute value exceeds 10 are treated
            as edges.
        alpha: Weight of ``edge_color`` in the blend; the original pixel gets
            ``1 - alpha``.
        edge_color: Colour assigned to edge pixels before blending.

    Returns:
        A new image. It equals ``original_bgr`` when ``edges_gray`` is
        ``None`` or contains no edge pixels.
    """
    out = original_bgr.copy()
    if edges_gray is None:
        return out

    edges_gray = cv2.convertScaleAbs(edges_gray)
    _, bin_mask = cv2.threshold(edges_gray, 10, 255, cv2.THRESH_BINARY)
    mask = bin_mask.astype(bool)
    if not mask.any():
        return out

    tinted = original_bgr.copy()
    tinted[mask] = edge_color
    blended = cv2.addWeighted(tinted, alpha, original_bgr, 1 - alpha, 0)
    # Copy back only the edge pixels so the rest stays bit-identical to the
    # input regardless of rounding inside addWeighted.
    out[mask] = blended[mask]
    return out

# -------------------- Low-latency Application --------------------

class LowLatencyEdgeTracker:
    """Capture frames from a Picamera2, detect a target with Top-Hat, track it with ViT.

    Work is spread over daemon threads:

    * ``capture_loop`` grabs frames and publishes the newest one.
    * ``slow_processing_loop`` runs Top-Hat detection inside the ROI and
      initialises / updates the ViT tracker.
    * ``local_display_loop`` shows the newest frame with the overlay.
    * ``streaming_loop`` / ``coords_streaming_loop`` are placeholders here.

    ``running`` is a ``threading.Event``; clearing it (``stop()``) asks every
    loop to exit.
    """

    def __init__(self, roi=ROI, model_path=MODEL_PATH, backend_id=BACKEND_ID, target_id=TARGET_ID):
        """Start the camera, create shared state, and build (but not start) the threads.

        Args:
            roi: Search rectangle ``(x0, y0, x1, y1)`` in frame pixels.
            model_path: Path to the ViT tracker ONNX model.
            backend_id: Forwarded to ``VitTrack`` as ``backend_id``.
            target_id: Forwarded to ``VitTrack`` as ``target_id``.
        """
        self.roi = roi
        self.picam2 = Picamera2()
        cfg = self.picam2.create_preview_configuration(main={"size": (730, 544)})
        self.picam2.configure(cfg)
        self.picam2.start()

        # Newest captured frame, shared between capture and display threads.
        self.latest_frame = None
        self.latest_frame_lock = threading.Lock()
        self.tracking_q = queue.Queue(maxsize=TRACKING_QSIZE)
        self.coords_q = queue.Queue(maxsize=COORDS_QSIZE)

        # Overlay state written by the processing thread, read by the display.
        self.overlay_lock = threading.Lock()
        self.last_bbox = None
        self.last_edges_full = None

        self.running = threading.Event()
        self.running.set()

        self.model_path = model_path
        self.backend_id = backend_id
        self.target_id = target_id

        # Create VitTrack once; without it the app still shows edges but
        # never tracks.
        self.vit = None
        try:
            if os.path.exists(self.model_path):
                self.vit = VitTrack(
                    model_path=self.model_path,
                    backend_id=self.backend_id,
                    target_id=self.target_id,
                )
                print("VitTrack instance created.")
            else:
                print("Model not found:", self.model_path)
        except Exception as ex:
            self.vit = None
            print("VitTrack load error:", ex)

        self.tracker = None
        self.tracking = False

        self.threads = [
            threading.Thread(target=self.capture_loop, daemon=True),
            threading.Thread(target=self.slow_processing_loop, daemon=True),
            threading.Thread(target=self.local_display_loop, daemon=True),
            threading.Thread(target=self.streaming_loop, daemon=True),
            threading.Thread(target=self.coords_streaming_loop, daemon=True),
        ]

    # -------------------- Start/Stop --------------------
    def start(self):
        """Start all threads and block until ``stop()`` is called, then clean up."""
        for t in self.threads:
            t.start()
        try:
            while self.running.is_set():
                time.sleep(0.2)
        except KeyboardInterrupt:
            pass
        finally:
            self.stop()
            # Let the workers leave their loops before the camera is stopped
            # underneath them.
            self._join_threads()
            self._cleanup()

    def stop(self):
        """Ask every loop to exit."""
        self.running.clear()

    def _join_threads(self, timeout=1.0):
        """Wait up to ``timeout`` seconds for each worker thread to finish."""
        current = threading.current_thread()
        for t in self.threads:
            if t is not current and t.is_alive():
                t.join(timeout=timeout)

    def _cleanup(self):
        """Stop the camera and close any OpenCV windows."""
        try:
            self.picam2.stop()
        except Exception as ex:
            print("Camera stop error:", ex)
        cv2.destroyAllWindows()

    # -------------------- Capture thread (fast) --------------------
    def _offer_latest(self, frame):
        """Replace the oldest queued frame with ``frame`` without ever blocking."""
        try:
            self.tracking_q.get_nowait()
        except queue.Empty:
            pass
        try:
            self.tracking_q.put_nowait(frame)
        except queue.Full:
            pass

    def capture_loop(self):
        """Grab frames, publish the newest one, and feed the processing queue."""
        frame_period = max(0.001, 1.0 / CAP_FPS)
        while self.running.is_set():
            t_start = time.monotonic()
            frame = self.picam2.capture_array()
            if frame is None:
                time.sleep(0.005)
                continue
            if frame.ndim == 3 and frame.shape[2] == 4:
                frame = cv2.cvtColor(frame, cv2.COLOR_BGRA2BGR)

            with self.latest_frame_lock:
                self.latest_frame = frame
            self._offer_latest(frame.copy())

            # Sleep only for what is left of the frame period, so a grab that
            # already waited for the next frame is not delayed a second time.
            remaining = frame_period - (time.monotonic() - t_start)
            if remaining > 0:
                time.sleep(remaining)

    # -------------------- Slow processing (Top-Hat + ViT) --------------------
    @staticmethod
    def _closest_box(split_boxes, origin, center):
        """Return the box (in frame coordinates) whose centre is nearest ``center``.

        ``split_boxes`` are ROI-relative ``(x0, y0, x1, y1)`` tuples and
        ``origin`` is the ROI's top-left corner in the frame. Returns ``None``
        when there are no boxes.
        """
        ox, oy = origin
        closest = None
        min_dist = float("inf")
        for sx0, sy0, sx1, sy1 in split_boxes:
            gx0, gy0, gx1, gy1 = ox + sx0, oy + sy0, ox + sx1, oy + sy1
            d = math.hypot((gx0 + gx1) / 2 - center[0], (gy0 + gy1) / 2 - center[1])
            if d < min_dist:
                min_dist = d
                closest = (gx0, gy0, gx1, gy1)
        return closest

    def _start_tracking(self, frame, box, edges_full):
        """Initialise the ViT tracker on ``box`` and publish the initial overlay."""
        bx0, by0, bx1, by1 = box
        init_bbox = (int(bx0), int(by0), int(bx1 - bx0), int(by1 - by0))
        try:
            self.vit.init(frame, init_bbox)
        except Exception as ex:
            print("ViT init error:", ex)
            self.tracker = None
            self.tracking = False
            return
        self.tracker = self.vit
        self.tracking = True
        with self.overlay_lock:
            self.last_bbox = init_bbox
            self.last_edges_full = edges_full
        print("ViT Tracker started:", init_bbox)

    def _update_tracking(self, frame, frame_count, edges_full):
        """Run one tracker step; publish the box or drop the tracker on failure."""
        try:
            located, bbox, score = self.tracker.infer(frame)
        except Exception as ex:
            print("infer error:", ex)
            located, bbox, score = False, None, 0

        if located and bbox is not None and score >= VIT_SCORE_THRESHOLD:
            x, y, wbox, hbox = map(int, bbox)
            with self.overlay_lock:
                self.last_bbox = (x, y, wbox, hbox)
                self.last_edges_full = edges_full
            try:
                self.coords_q.put_nowait((frame_count, x + wbox // 2, y + hbox // 2))
            except queue.Full:
                # Nobody is draining fast enough; dropping is the intended
                # best-effort behaviour.
                pass
        else:
            with self.overlay_lock:
                self.last_bbox = None
                self.last_edges_full = edges_full
            self.tracking = False
            self.tracker = None

    def slow_processing_loop(self):
        """Detect edge clusters in the ROI and drive the ViT tracker."""
        frame_count = 0
        x0, y0, x1, y1 = self.roi
        roi_center = ((x0 + x1) / 2, (y0 + y1) / 2)
        kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (15, 15))

        while self.running.is_set():
            try:
                frame = self.tracking_q.get(timeout=0.2)
            except queue.Empty:
                continue

            frame_count += 1
            h, w = frame.shape[:2]
            rx0 = max(0, min(w - 1, x0))
            ry0 = max(0, min(h - 1, y0))
            rx1 = max(0, min(w - 1, x1))
            ry1 = max(0, min(h - 1, y1))
            if rx1 <= rx0 or ry1 <= ry0:
                continue

            # ---------------------- Top-Hat instead of Canny ----------------------
            roi_gray = cv2.cvtColor(frame[ry0:ry1, rx0:rx1], cv2.COLOR_BGR2GRAY)
            tophat = cv2.morphologyEx(roi_gray, cv2.MORPH_TOPHAT, kernel)
            max_value = float(tophat.max())
            _, edges_bin_roi = cv2.threshold(tophat, max_value * 0.6, 255, cv2.THRESH_BINARY)

            # A fresh mask is allocated per frame and never modified after it
            # is published, so it can be shared without copying.
            edges_full = np.zeros((h, w), dtype=np.uint8)
            edges_full[ry0:ry1, rx0:rx1] = edges_bin_roi

            # ---------------------- Find clusters ----------------------
            split_boxes = find_split_boxes_by_gap(edges_bin_roi, GAP_THRESHOLD)
            closest_box = self._closest_box(split_boxes, (rx0, ry0), roi_center)

            # ---------------------- Init ViT tracker ----------------------
            if not self.tracking and closest_box is not None and self.vit is not None:
                self._start_tracking(frame, closest_box, edges_full)

            # ---------------------- Update ViT tracker ----------------------
            if self.tracking and self.tracker is not None:
                self._update_tracking(frame, frame_count, edges_full)
            else:
                with self.overlay_lock:
                    self.last_edges_full = edges_full

    # -------------------- local display --------------------
    def local_display_loop(self):
        """Show the newest frame with the edge overlay and tracked box; ESC stops the app."""
        while self.running.is_set():
            with self.latest_frame_lock:
                frame = None if self.latest_frame is None else self.latest_frame.copy()
            if frame is None:
                # Sleep outside the lock so the capture thread is never blocked.
                time.sleep(0.01)
                continue

            with self.overlay_lock:
                bbox = self.last_bbox
                edges_full = self.last_edges_full

            if OVERLAY and edges_full is not None:
                frame = overlay_edges(frame, edges_full)

            if bbox is not None:
                x, y, wbox, hbox = bbox
                cv2.rectangle(frame, (x, y), (x + wbox, y + hbox), (0, 255, 255), 2)

            cv2.imshow("TopHat + ViT Tracker", frame)
            if cv2.waitKey(1) == 27:  # ESC
                self.stop()
                break

    # -------------------- streaming video --------------------
    def streaming_loop(self):
        """Placeholder; the body was omitted as unchanged from an earlier version."""
        pass  # unchanged for brevity — same as earlier version

    # -------------------- streaming coords --------------------
    def coords_streaming_loop(self):
        """Placeholder; the body was omitted as unchanged from an earlier version."""
        pass  # unchanged for brevity — same as earlier version

# ---------------------- main ----------------------

def main():
    """Create the tracker, route Ctrl+C to a clean stop, and run until stopped."""
    app = LowLatencyEdgeTracker()
    # The handler only clears the running flag; start() then returns and
    # performs the cleanup itself.
    signal.signal(signal.SIGINT, lambda s, f: app.stop())
    app.start()

# Run the application only when this file is executed as a script.
if __name__ == "__main__":
    main()

In this code, I want to change the frame source from the Pi camera to a webcam. Please make this change and rewrite the full code for me.

Thought

Answer

python
#!/usr/bin/env python3
"""
Low-latency webcam + background Top-Hat detection + ViT tracker.

This version replaces Picamera2 with a standard webcam via OpenCV VideoCapture.
"""

import signal
import threading
import queue
import time
import math
import socket
import struct
import os

import cv2
import numpy as np

from vittrack import VitTrack

# ------------------------- CONFIG -------------------------
ROI = (315, 222, 415, 322)
GAP_THRESHOLD = 0.4
OVERLAY = True

TARGET_IP = "192.168.50.1"
PORT = 9999
COORDS_PORT = 9998

TRACKING_QSIZE = 1
COORDS_QSIZE = 32

CAP_FPS = 30.0

MODEL_PATH = "/home/pi/Desktop/vittracker/object_tracking_vittrack_2023sep.onnx"

BACKEND_ID = cv2.dnn.DNN_BACKEND_OPENCV
TARGET_ID = cv2.dnn.DNN_TARGET_CPU

VIT_SCORE_THRESHOLD = 0.3

# ---- Webcam settings ----
WEBCAM_INDEX = 0          # 0 is usually default camera; try 1 if you have multiple
FRAME_SIZE = (730, 544)   # (width, height) to match your previous PiCamera2 config
USE_MJPG = True           # often reduces latency on USB webcams


# --------------------- ROI split finder ------------------------
def find_split_boxes_by_gap(edges_bin, gap_threshold=0.4):
    """Split the edge pixels of a binary mask into boxes separated by column gaps.

    The occupied columns are walked left to right; whenever two consecutive
    occupied columns are further apart than ``gap_threshold`` times the total
    horizontal extent of the edge pixels, a new cluster starts.

    Args:
        edges_bin: 2-D array; every element > 0 counts as an edge pixel.
        gap_threshold: Gap size, as a fraction of the edge pixels' total
            x-extent, that must be exceeded to split two clusters.

    Returns:
        A list of ``(x_start, y_start, x_end, y_end)`` tuples of plain ints,
        ordered left to right, in inclusive pixel coordinates relative to
        ``edges_bin``. Empty when the mask has no edge pixels.
    """
    ys, xs = np.where(edges_bin > 0)
    if xs.size == 0:
        return []

    # A single occupied column has zero extent; use 1 so the threshold is
    # never zero.
    total_width = int(xs.max() - xs.min()) or 1
    gap_threshold_px = total_width * gap_threshold

    # np.unique returns the occupied columns already sorted.
    cols = np.unique(xs)
    # Index i marks a split between cols[i] and cols[i + 1].
    breaks = np.flatnonzero(np.diff(cols) > gap_threshold_px)
    starts = np.concatenate(([cols[0]], cols[breaks + 1]))
    ends = np.concatenate((cols[breaks], [cols[-1]]))

    boxes = []
    for x_start, x_end in zip(starts, ends):
        # Every cluster contains at least its own start column, so this
        # selection is never empty.
        ys_in_range = ys[(xs >= x_start) & (xs <= x_end)]
        boxes.append(
            (int(x_start), int(ys_in_range.min()), int(x_end), int(ys_in_range.max()))
        )
    return boxes


# --------------------- visualize edges -------------------------
def overlay_edges(original_bgr, edges_gray, alpha=0.75, edge_color=(0, 255, 0)):
    """Return a copy of ``original_bgr`` with edge pixels tinted ``edge_color``.

    Args:
        original_bgr: Colour image to draw on; it is never modified.
        edges_gray: Single-channel edge image of the same height and width,
            or ``None``. Pixels whose absolute value exceeds 10 are treated
            as edges.
        alpha: Weight of ``edge_color`` in the blend; the original pixel gets
            ``1 - alpha``.
        edge_color: Colour assigned to edge pixels before blending.

    Returns:
        A new image. It equals ``original_bgr`` when ``edges_gray`` is
        ``None`` or contains no edge pixels.
    """
    out = original_bgr.copy()
    if edges_gray is None:
        return out

    edges_gray = cv2.convertScaleAbs(edges_gray)
    _, bin_mask = cv2.threshold(edges_gray, 10, 255, cv2.THRESH_BINARY)
    mask = bin_mask.astype(bool)
    if not mask.any():
        return out

    tinted = original_bgr.copy()
    tinted[mask] = edge_color
    blended = cv2.addWeighted(tinted, alpha, original_bgr, 1 - alpha, 0)
    # Copy back only the edge pixels so the rest stays bit-identical to the
    # input regardless of rounding inside addWeighted.
    out[mask] = blended[mask]
    return out


# -------------------- Low-latency Application --------------------
class LowLatencyEdgeTracker:
    """Capture frames from a webcam, detect a target with Top-Hat, track it with ViT.

    Work is spread over daemon threads:

    * ``capture_loop`` grabs frames and publishes the newest one.
    * ``slow_processing_loop`` runs Top-Hat detection inside the ROI and
      initialises / updates the ViT tracker.
    * ``local_display_loop`` shows the newest frame with the overlay.
    * ``streaming_loop`` / ``coords_streaming_loop`` are placeholders here.

    ``running`` is a ``threading.Event``; clearing it (``stop()``) asks every
    loop to exit.
    """

    def __init__(self, roi=ROI, model_path=MODEL_PATH, backend_id=BACKEND_ID, target_id=TARGET_ID):
        """Open the webcam, create shared state, and build (but not start) the threads.

        Args:
            roi: Search rectangle ``(x0, y0, x1, y1)`` in frame pixels.
            model_path: Path to the ViT tracker ONNX model.
            backend_id: Forwarded to ``VitTrack`` as ``backend_id``.
            target_id: Forwarded to ``VitTrack`` as ``target_id``.

        Raises:
            RuntimeError: If the webcam cannot be opened.
        """
        self.roi = roi

        # ---------- Webcam init (replaces Picamera2) ----------
        self.cap = cv2.VideoCapture(WEBCAM_INDEX, cv2.CAP_V4L2)
        # Fail before configuring anything, and do not leak the handle.
        if not self.cap.isOpened():
            self.cap.release()
            raise RuntimeError(f"Could not open webcam index {WEBCAM_INDEX}")

        if USE_MJPG:
            self.cap.set(cv2.CAP_PROP_FOURCC, cv2.VideoWriter_fourcc(*"MJPG"))
        self.cap.set(cv2.CAP_PROP_FRAME_WIDTH, FRAME_SIZE[0])
        self.cap.set(cv2.CAP_PROP_FRAME_HEIGHT, FRAME_SIZE[1])
        self.cap.set(cv2.CAP_PROP_FPS, CAP_FPS)
        # Reduce buffering if supported (not all backends honor this)
        self.cap.set(cv2.CAP_PROP_BUFFERSIZE, 1)

        # Newest captured frame, shared between capture and display threads.
        self.latest_frame = None
        self.latest_frame_lock = threading.Lock()
        self.tracking_q = queue.Queue(maxsize=TRACKING_QSIZE)
        self.coords_q = queue.Queue(maxsize=COORDS_QSIZE)

        # Overlay state written by the processing thread, read by the display.
        self.overlay_lock = threading.Lock()
        self.last_bbox = None
        self.last_edges_full = None

        self.running = threading.Event()
        self.running.set()

        self.model_path = model_path
        self.backend_id = backend_id
        self.target_id = target_id

        # Create VitTrack once; without it the app still shows edges but
        # never tracks.
        self.vit = None
        try:
            if os.path.exists(self.model_path):
                self.vit = VitTrack(
                    model_path=self.model_path,
                    backend_id=self.backend_id,
                    target_id=self.target_id,
                )
                print("VitTrack instance created.")
            else:
                print("Model not found:", self.model_path)
        except Exception as ex:
            self.vit = None
            print("VitTrack load error:", ex)

        self.tracker = None
        self.tracking = False

        self.threads = [
            threading.Thread(target=self.capture_loop, daemon=True),
            threading.Thread(target=self.slow_processing_loop, daemon=True),
            threading.Thread(target=self.local_display_loop, daemon=True),
            threading.Thread(target=self.streaming_loop, daemon=True),
            threading.Thread(target=self.coords_streaming_loop, daemon=True),
        ]

    # -------------------- Start/Stop --------------------
    def start(self):
        """Start all threads and block until ``stop()`` is called, then clean up."""
        for t in self.threads:
            t.start()
        try:
            while self.running.is_set():
                time.sleep(0.2)
        except KeyboardInterrupt:
            pass
        finally:
            self.stop()
            # Let the workers leave their loops before the capture device is
            # released underneath them.
            self._join_threads()
            self._cleanup()

    def stop(self):
        """Ask every loop to exit."""
        self.running.clear()

    def _join_threads(self, timeout=1.0):
        """Wait up to ``timeout`` seconds for each worker thread to finish."""
        current = threading.current_thread()
        for t in self.threads:
            if t is not current and t.is_alive():
                t.join(timeout=timeout)

    def _cleanup(self):
        """Release the webcam and close any OpenCV windows."""
        try:
            if self.cap is not None:
                self.cap.release()
        except Exception as ex:
            print("Webcam release error:", ex)
        cv2.destroyAllWindows()

    # -------------------- Capture thread (fast) --------------------
    def _offer_latest(self, frame):
        """Replace the oldest queued frame with ``frame`` without ever blocking."""
        try:
            self.tracking_q.get_nowait()
        except queue.Empty:
            pass
        try:
            self.tracking_q.put_nowait(frame)
        except queue.Full:
            pass

    def capture_loop(self):
        """Grab frames, publish the newest one, and feed the processing queue."""
        frame_period = max(0.001, 1.0 / CAP_FPS)
        while self.running.is_set():
            t_start = time.monotonic()
            ok, frame = self.cap.read()
            if not ok or frame is None:
                time.sleep(0.005)
                continue

            # If your webcam delivers a different size, force-resize to match ROI assumptions
            if (frame.shape[1], frame.shape[0]) != FRAME_SIZE:
                frame = cv2.resize(frame, FRAME_SIZE, interpolation=cv2.INTER_LINEAR)

            with self.latest_frame_lock:
                self.latest_frame = frame
            self._offer_latest(frame.copy())

            # Sleep only for what is left of the frame period, so a read that
            # already waited for the next frame is not delayed a second time.
            remaining = frame_period - (time.monotonic() - t_start)
            if remaining > 0:
                time.sleep(remaining)

    # -------------------- Slow processing (Top-Hat + ViT) --------------------
    @staticmethod
    def _closest_box(split_boxes, origin, center):
        """Return the box (in frame coordinates) whose centre is nearest ``center``.

        ``split_boxes`` are ROI-relative ``(x0, y0, x1, y1)`` tuples and
        ``origin`` is the ROI's top-left corner in the frame. Returns ``None``
        when there are no boxes.
        """
        ox, oy = origin
        closest = None
        min_dist = float("inf")
        for sx0, sy0, sx1, sy1 in split_boxes:
            gx0, gy0, gx1, gy1 = ox + sx0, oy + sy0, ox + sx1, oy + sy1
            d = math.hypot((gx0 + gx1) / 2 - center[0], (gy0 + gy1) / 2 - center[1])
            if d < min_dist:
                min_dist = d
                closest = (gx0, gy0, gx1, gy1)
        return closest

    def _start_tracking(self, frame, box, edges_full):
        """Initialise the ViT tracker on ``box`` and publish the initial overlay."""
        bx0, by0, bx1, by1 = box
        init_bbox = (int(bx0), int(by0), int(bx1 - bx0), int(by1 - by0))
        try:
            self.vit.init(frame, init_bbox)
        except Exception as ex:
            print("ViT init error:", ex)
            self.tracker = None
            self.tracking = False
            return
        self.tracker = self.vit
        self.tracking = True
        with self.overlay_lock:
            self.last_bbox = init_bbox
            self.last_edges_full = edges_full
        print("ViT Tracker started:", init_bbox)

    def _update_tracking(self, frame, frame_count, edges_full):
        """Run one tracker step; publish the box or drop the tracker on failure."""
        try:
            located, bbox, score = self.tracker.infer(frame)
        except Exception as ex:
            print("infer error:", ex)
            located, bbox, score = False, None, 0

        if located and bbox is not None and score >= VIT_SCORE_THRESHOLD:
            x, y, wbox, hbox = map(int, bbox)
            with self.overlay_lock:
                self.last_bbox = (x, y, wbox, hbox)
                self.last_edges_full = edges_full
            try:
                self.coords_q.put_nowait((frame_count, x + wbox // 2, y + hbox // 2))
            except queue.Full:
                # Nobody is draining fast enough; dropping is the intended
                # best-effort behaviour.
                pass
        else:
            with self.overlay_lock:
                self.last_bbox = None
                self.last_edges_full = edges_full
            self.tracking = False
            self.tracker = None

    def slow_processing_loop(self):
        """Detect edge clusters in the ROI and drive the ViT tracker."""
        frame_count = 0
        x0, y0, x1, y1 = self.roi
        roi_center = ((x0 + x1) / 2, (y0 + y1) / 2)
        kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (15, 15))

        while self.running.is_set():
            try:
                frame = self.tracking_q.get(timeout=0.2)
            except queue.Empty:
                continue

            frame_count += 1
            h, w = frame.shape[:2]
            rx0 = max(0, min(w - 1, x0))
            ry0 = max(0, min(h - 1, y0))
            rx1 = max(0, min(w - 1, x1))
            ry1 = max(0, min(h - 1, y1))
            if rx1 <= rx0 or ry1 <= ry0:
                continue

            # ---------------------- Top-Hat instead of Canny ----------------------
            roi_gray = cv2.cvtColor(frame[ry0:ry1, rx0:rx1], cv2.COLOR_BGR2GRAY)
            tophat = cv2.morphologyEx(roi_gray, cv2.MORPH_TOPHAT, kernel)
            max_value = float(np.max(tophat)) if tophat.size else 0.0
            if max_value <= 0:
                # Flat ROI: nothing stands out, so there are no edges.
                edges_bin_roi = np.zeros_like(roi_gray, dtype=np.uint8)
            else:
                _, edges_bin_roi = cv2.threshold(
                    tophat, max_value * 0.6, 255, cv2.THRESH_BINARY
                )

            # A fresh mask is allocated per frame and never modified after it
            # is published, so it can be shared without copying.
            edges_full = np.zeros((h, w), dtype=np.uint8)
            edges_full[ry0:ry1, rx0:rx1] = edges_bin_roi

            # ---------------------- Find clusters ----------------------
            split_boxes = find_split_boxes_by_gap(edges_bin_roi, GAP_THRESHOLD)
            closest_box = self._closest_box(split_boxes, (rx0, ry0), roi_center)

            # ---------------------- Init ViT tracker ----------------------
            if not self.tracking and closest_box is not None and self.vit is not None:
                self._start_tracking(frame, closest_box, edges_full)

            # ---------------------- Update ViT tracker ----------------------
            if self.tracking and self.tracker is not None:
                self._update_tracking(frame, frame_count, edges_full)
            else:
                with self.overlay_lock:
                    self.last_edges_full = edges_full

    # -------------------- local display --------------------
    def local_display_loop(self):
        """Show the newest frame with the edge overlay and tracked box; ESC stops the app."""
        while self.running.is_set():
            with self.latest_frame_lock:
                frame = None if self.latest_frame is None else self.latest_frame.copy()
            if frame is None:
                # Sleep outside the lock so the capture thread is never blocked.
                time.sleep(0.01)
                continue

            with self.overlay_lock:
                bbox = self.last_bbox
                edges_full = self.last_edges_full

            if OVERLAY and edges_full is not None:
                frame = overlay_edges(frame, edges_full)

            if bbox is not None:
                x, y, wbox, hbox = bbox
                cv2.rectangle(frame, (x, y), (x + wbox, y + hbox), (0, 255, 255), 2)

            cv2.imshow("TopHat + ViT Tracker (Webcam)", frame)
            if cv2.waitKey(1) == 27:  # ESC
                self.stop()
                break

    # -------------------- streaming video --------------------
    def streaming_loop(self):
        """Placeholder; the body was omitted as unchanged from an earlier version."""
        pass  # unchanged for brevity — same as earlier version

    # -------------------- streaming coords --------------------
    def coords_streaming_loop(self):
        """Placeholder; the body was omitted as unchanged from an earlier version."""
        pass  # unchanged for brevity — same as earlier version


# ---------------------- main ----------------------
def main():
    """Create the tracker, route Ctrl+C to a clean stop, and run until stopped."""
    app = LowLatencyEdgeTracker()
    # The handler only clears the running flag; start() then returns and
    # performs the cleanup itself.
    signal.signal(signal.SIGINT, lambda s, f: app.stop())
    app.start()


if __name__ == "__main__":
    main()

If your ROI was tuned for the Pi camera image, it should still work because this forces the webcam frames to 730x544. If your webcam looks stretched or you want native webcam resolution instead, tell me what resolution your webcam outputs and I’ll adjust it cleanly.

Share this Q&A