Spaces:
Sleeping
Sleeping
import streamlit as st | |
import cv2 | |
import numpy as np | |
import requests | |
import tempfile | |
import mediapipe as mp | |
import os | |
from PIL import Image | |
# === CONFIGURATION === | |
API_KEY = "AIzaSyDojJrpauA0XZtCCDUuo9xeQHZQamYKsC4" | |
CCTVFEED_IDS = ['1KJRkSf2SKEZ1mXS9_si65IwMBtjs6p4n'] | |
REG_FOLDER_ID = '1qkcR7nQTEtiMH9OFUv2bGxVn08E3dKjF' | |
INTRUDER_UPLOAD_URL = "https://upload.gdrive.com/your-anon-endpoint-if-any" # You can change this if needed | |
# === SETUP === | |
mp_face_detection = mp.solutions.face_detection | |
mp_drawing = mp.solutions.drawing_utils | |
face_detector = mp_face_detection.FaceDetection(model_selection=0, min_detection_confidence=0.5) | |
# === UTILS === | |
def get_drive_file_links(folder_id, mime_types): | |
api_url = f"https://www.googleapis.com/drive/v3/files?q='{folder_id}'+in+parents&key={API_KEY}&fields=files(id,name,mimeType)" | |
response = requests.get(api_url) | |
files = response.json().get("files", []) | |
return [(f['name'], f['id']) for f in files if f['mimeType'] in mime_types] | |
def download_file(file_id, suffix): | |
url = f"https://drive.google.com/uc?id={file_id}&export=download" | |
response = requests.get(url, stream=True) | |
temp = tempfile.NamedTemporaryFile(delete=False, suffix=suffix) | |
for chunk in response.iter_content(chunk_size=1024 * 1024): | |
temp.write(chunk) | |
temp.close() | |
return temp.name | |
def extract_faces_from_image(img_path): | |
img = cv2.imread(img_path) | |
rgb = cv2.cvtColor(img, cv2.COLOR_BGR2RGB) | |
results = face_detector.process(rgb) | |
faces = [] | |
if results.detections: | |
for det in results.detections: | |
bbox = det.location_data.relative_bounding_box | |
h, w, _ = rgb.shape | |
x, y, bw, bh = int(bbox.xmin * w), int(bbox.ymin * h), int(bbox.width * w), int(bbox.height * h) | |
face_crop = rgb[y:y+bh, x:x+bw] | |
if face_crop.size > 0: | |
face_resized = cv2.resize(face_crop, (100, 100)) | |
faces.append(face_resized) | |
return faces | |
def mse(img1, img2): | |
if img1.shape != img2.shape: | |
return np.inf | |
return np.mean((img1.astype("float") - img2.astype("float")) ** 2) | |
def is_intruder(face_img, known_faces, threshold=5000): | |
face_gray = cv2.cvtColor(face_img, cv2.COLOR_RGB2GRAY) | |
face_gray = cv2.resize(face_gray, (100, 100)) | |
return all(mse(face_gray, known) > threshold for known in known_faces) | |
def upload_intruder(img_array, name): | |
temp_img = tempfile.NamedTemporaryFile(delete=False, suffix=".jpg") | |
Image.fromarray(cv2.cvtColor(img_array, cv2.COLOR_RGB2BGR)).save(temp_img.name) | |
# Simulate upload β requires actual implementation if you want to use Google Forms or Apps Script | |
st.warning(f"Upload placeholder: {temp_img.name}") | |
return temp_img.name | |
# === APP UI === | |
st.title("π Intruder Detection from CCTV Feed") | |
st.write("Streaming videos from Google Drive, detecting faces using MediaPipe and flagging unknown individuals.") | |
sidebar_faces = [] | |
with st.spinner("Loading registered faces..."): | |
known_faces = [] | |
for fname, fid in get_drive_file_links(REG_FOLDER_ID, ["image/jpeg", "image/png"]): | |
img_path = download_file(fid, ".jpg") | |
faces = extract_faces_from_image(img_path) | |
for face in faces: | |
gray = cv2.cvtColor(face, cv2.COLOR_RGB2GRAY) | |
known_faces.append(cv2.resize(gray, (100, 100))) | |
os.remove(img_path) | |
st.success(f"Loaded {len(known_faces)} registered faces.") | |
with st.spinner("Processing CCTV videos..."): | |
intruder_count = 0 | |
for folder_id in CCTVFEED_IDS: | |
video_files = get_drive_file_links(folder_id, ["video/mp4", "video/x-msvideo", "video/avi"]) | |
for vname, vid in video_files: | |
st.subheader(f"π₯ {vname}") | |
video_path = download_file(vid, ".mp4") | |
cap = cv2.VideoCapture(video_path) | |
frame_id = 0 | |
while cap.isOpened(): | |
ret, frame = cap.read() | |
if not ret: | |
break | |
frame_id += 1 | |
if frame_id % 30 != 0: | |
continue | |
rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) | |
results = face_detector.process(rgb) | |
if results.detections: | |
for det in results.detections: | |
bbox = det.location_data.relative_bounding_box | |
h, w, _ = rgb.shape | |
x, y, bw, bh = int(bbox.xmin * w), int(bbox.ymin * h), int(bbox.width * w), int(bbox.height * h) | |
face_crop = rgb[y:y+bh, x:x+bw] | |
if face_crop.size == 0: | |
continue | |
if is_intruder(face_crop, known_faces): | |
sidebar_faces.append(face_crop) | |
intruder_count += 1 | |
upload_intruder(face_crop, f"intruder_{intruder_count}.jpg") | |
cap.release() | |
os.remove(video_path) | |
st.success(f"π― Intruders detected: {intruder_count}") | |
# === SIDEBAR === | |
st.sidebar.title("πΈ Faces Detected as Intruders") | |
for idx, face in enumerate(sidebar_faces): | |
st.sidebar.image(face, caption=f"Intruder #{idx + 1}", width=100) | |