import streamlit as st import cv2 import requests import numpy as np import tempfile import time import mediapipe as mp # === CONFIGURATION === CCTVFEED_IDS = ['1KJRkSf2SKEZ1mXS9_si65IwMBtjs6p4n'] # List of folder IDs (Google Drive) SUPPORTED_EXTENSIONS = ['.mp4', '.avi'] # === INITIALIZE FACE DETECTION === mp_face_detection = mp.solutions.face_detection face_detection = mp_face_detection.FaceDetection(model_selection=0, min_detection_confidence=0.5) # === HELPER FUNCTIONS === def list_gdrive_folder_files(folder_id): """Returns list of (filename, download_url) from public Google Drive folder.""" api_url = f"https://drive.google.com/embeddedfolderview?id={folder_id}#grid" resp = requests.get(api_url) file_lines = resp.text.splitlines() links = [] for line in file_lines: if 'data-id=' in line and any(ext in line for ext in SUPPORTED_EXTENSIONS): try: file_id = line.split('data-id="')[1].split('"')[0] file_name = line.split('data-title="')[1].split('"')[0] download_url = f"https://drive.google.com/uc?id={file_id}" links.append((file_name, download_url)) except Exception: continue return links def stream_and_process_video(video_url, filename): """Streams video frame-by-frame and runs face detection.""" st.markdown(f"### 🎥 Processing `{filename}`") vid_bytes = requests.get(video_url, stream=True).content tfile = tempfile.NamedTemporaryFile(delete=False, suffix=".mp4") tfile.write(vid_bytes) cap = cv2.VideoCapture(tfile.name) frame_count = 0 face_count = 0 frame_display = st.empty() stats_display = st.empty() while cap.isOpened(): ret, frame = cap.read() if not ret: break frame_count += 1 # Convert BGR to RGB image_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) results = face_detection.process(image_rgb) # Draw faces if results.detections: for detection in results.detections: bboxC = detection.location_data.relative_bounding_box ih, iw, _ = frame.shape x, y, w, h = int(bboxC.xmin * iw), int(bboxC.ymin * ih), int(bboxC.width * iw), int(bboxC.height * ih) cv2.rectangle(frame, (x, y), (x + w, y + h), (0, 255, 0), 2) face_count += 1 # Resize for display display_frame = cv2.resize(frame, (640, 360)) frame_display.image(display_frame, channels="BGR") stats_display.markdown(f"**Frames Processed:** {frame_count} | **Faces Detected So Far:** {face_count}") time.sleep(1 / 24.0) # Approx. 24 FPS cap.release() st.success(f"✅ Done! `{filename}`: {frame_count} frames, {face_count} faces detected.") # === MAIN UI === st.title("🚨 CCTV Face Detection from Google Drive") st.markdown("Detects faces in `.mp4` and `.avi` videos from your public Google Drive folder.") for folder_id in CCTVFEED_IDS: videos = list_gdrive_folder_files(folder_id) if not videos: st.warning(f"No valid videos found in folder ID: {folder_id}") continue for name, url in videos: if st.button(f"▶️ Run Face Detection on `{name}`"): stream_and_process_video(url, name)