Spaces:
Sleeping
Sleeping
import streamlit as st | |
import cv2 | |
import numpy as np | |
import requests | |
import tempfile | |
import mediapipe as mp | |
import os | |
from PIL import Image | |
# === CONFIGURATION === | |
API_KEY = "AIzaSyDojJrpauA0XZtCCDUuo9xeQHZQamYKsC4" | |
CCTVFEED_IDS = ['1KJRkSf2SKEZ1mXS9_si65IwMBtjs6p4n'] | |
REG_FOLDER_ID = '1qkcR7nQTEtiMH9OFUv2bGxVn08E3dKjF' | |
INTRUDER_FOLDER_ID = '1PPAUWU-wMx7fek73p-hqPqYQypYtG8Ob' | |
MATCH_THRESHOLD = 7000 | |
# === SETUP === | |
mp_face_detection = mp.solutions.face_detection | |
mp_drawing = mp.solutions.drawing_utils | |
face_detector = mp_face_detection.FaceDetection(model_selection=1, min_detection_confidence=0.5) | |
# === FUNCTIONS === | |
def get_drive_image_faces(folder_id): | |
api_url = f"https://www.googleapis.com/drive/v3/files?q='{folder_id}'+in+parents&key={API_KEY}&fields=files(id,name,mimeType)" | |
response = requests.get(api_url) | |
files = response.json().get("files", []) | |
faces = [] | |
for f in files: | |
if "image" in f["mimeType"]: | |
link = f"https://drive.google.com/uc?id={f['id']}&export=download" | |
img = requests.get(link).content | |
temp = tempfile.NamedTemporaryFile(delete=False, suffix=".jpg") | |
temp.write(img) | |
temp.close() | |
gray = cv2.imread(temp.name, cv2.IMREAD_GRAYSCALE) | |
if gray is not None: | |
gray = cv2.resize(gray, (100, 100)) | |
faces.append(gray) | |
os.remove(temp.name) | |
return faces | |
def is_match(face_img, known_faces, threshold=MATCH_THRESHOLD): | |
face_img = cv2.resize(face_img, (100, 100)) | |
errors = [np.mean((face_img - kf) ** 2) for kf in known_faces] | |
return any(err < threshold for err in errors) | |
def get_drive_video_links(folder_id): | |
api_url = f"https://www.googleapis.com/drive/v3/files?q='{folder_id}'+in+parents&key={API_KEY}&fields=files(id,name,mimeType)" | |
response = requests.get(api_url) | |
files = response.json().get("files", []) | |
video_links = [] | |
for f in files: | |
if f["mimeType"] in ["video/mp4", "video/x-msvideo", "video/avi"]: | |
link = f"https://drive.google.com/uc?id={f['id']}&export=download" | |
video_links.append((f["name"], link)) | |
return video_links | |
def download_video(link): | |
response = requests.get(link, stream=True) | |
temp = tempfile.NamedTemporaryFile(delete=False, suffix=".mp4") | |
for chunk in response.iter_content(chunk_size=1024*1024): | |
temp.write(chunk) | |
temp.close() | |
return temp.name | |
def detect_intruders(video_path, known_faces): | |
cap = cv2.VideoCapture(video_path) | |
intruder_frames = 0 | |
faces_displayed = 0 | |
while cap.isOpened(): | |
ret, frame = cap.read() | |
if not ret: | |
break | |
rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) | |
results = face_detector.process(rgb) | |
if results.detections: | |
for det in results.detections: | |
bbox = det.location_data.relative_bounding_box | |
h, w, _ = frame.shape | |
x1 = int(bbox.xmin * w) | |
y1 = int(bbox.ymin * h) | |
x2 = int((bbox.xmin + bbox.width) * w) | |
y2 = int((bbox.ymin + bbox.height) * h) | |
face_crop = frame[max(y1,0):y2, max(x1,0):x2] | |
gray_face = cv2.cvtColor(face_crop, cv2.COLOR_BGR2GRAY) | |
if not is_match(gray_face, known_faces): | |
intruder_frames += 1 | |
faces_displayed += 1 | |
st.image(face_crop, caption=f"Unmatched face #{faces_displayed}", channels="BGR", use_column_width=True) | |
if faces_displayed >= 3: | |
break | |
cap.release() | |
return intruder_frames | |
# === STREAMLIT UI === | |
st.title("π Intruder Detection from CCTV Feed") | |
st.write("Streaming videos from Google Drive, comparing with registered faces, and displaying unmatched intruders.") | |
with st.spinner("Loading registered faces..."): | |
registered_faces = get_drive_image_faces(REG_FOLDER_ID) | |
if not registered_faces: | |
st.error("No registered face images found.") | |
else: | |
with st.spinner("Processing videos..."): | |
total_intruders = 0 | |
for folder_id in CCTVFEED_IDS: | |
videos = get_drive_video_links(folder_id) | |
for vname, link in videos: | |
st.subheader(f"π₯ {vname}") | |
video_file = download_video(link) | |
count = detect_intruders(video_file, registered_faces) | |
os.remove(video_file) | |
if count > 0: | |
st.success(f"β {count} unmatched faces detected.") | |
total_intruders += 1 | |
else: | |
st.warning("β οΈ No intruders detected.") | |
st.info(f"π Completed. Total videos with intruders: {total_intruders}") | |