Spaces:
Sleeping
Sleeping
import streamlit as st | |
import cv2 | |
import numpy as np | |
import requests | |
import tempfile | |
import mediapipe as mp | |
import os | |
# === CONFIGURATION === | |
API_KEY = "AIzaSyDojJrpauA0XZtCCDUuo9xeQHZQamYKsC4" | |
CCTVFEED_IDS = ['1KJRkSf2SKEZ1mXS9_si65IwMBtjs6p4n'] | |
REG_FOLDER_ID = '1qkcR7nQTEtiMH9OFUv2bGxVn08E3dKjF' | |
INTRUDER_FOLDER_ID = '1PPAUWU-wMx7fek73p-hqPqYQypYtG8Ob' | |
MATCH_THRESHOLD = 8000 | |
# === SETUP === | |
mp_face_detection = mp.solutions.face_detection | |
mp_drawing = mp.solutions.drawing_utils | |
face_detector = mp_face_detection.FaceDetection(model_selection=0, min_detection_confidence=0.5) | |
# === UTILS === | |
def get_drive_files(folder_id, mime_type_filter=None): | |
api_url = f"https://www.googleapis.com/drive/v3/files?q='{folder_id}'+in+parents&key={API_KEY}&fields=files(id,name,mimeType)" | |
response = requests.get(api_url) | |
files = response.json().get("files", []) | |
if mime_type_filter: | |
files = [f for f in files if f["mimeType"] in mime_type_filter] | |
return files | |
def download_file(file_id, suffix): | |
url = f"https://drive.google.com/uc?id={file_id}&export=download" | |
response = requests.get(url, stream=True) | |
temp = tempfile.NamedTemporaryFile(delete=False, suffix=suffix) | |
for chunk in response.iter_content(chunk_size=1024*1024): | |
temp.write(chunk) | |
temp.close() | |
return temp.name | |
def upload_to_drive(local_path, folder_id): | |
file_name = os.path.basename(local_path) | |
metadata = { | |
'name': file_name, | |
'parents': [folder_id] | |
} | |
files = { | |
'data': ('metadata', '{"name": "%s"}' % file_name, 'application/json'), | |
'file': open(local_path, "rb") | |
} | |
upload_url = f"https://www.googleapis.com/upload/drive/v3/files?uploadType=multipart&key={API_KEY}" | |
response = requests.post(upload_url, files=files) | |
return response.ok | |
def load_registered_faces(): | |
reg_images = get_drive_files(REG_FOLDER_ID, mime_type_filter=["image/jpeg", "image/png"]) | |
faces = [] | |
for img in reg_images: | |
path = download_file(img["id"], ".jpg") | |
img_gray = cv2.imread(path, cv2.IMREAD_GRAYSCALE) | |
if img_gray is not None: | |
resized = cv2.resize(img_gray, (100, 100)) | |
faces.append(resized) | |
os.remove(path) | |
return faces | |
def is_known_face(face_img, known_faces): | |
resized = cv2.resize(face_img, (100, 100)) | |
mse_scores = [np.mean((resized - kf) ** 2) for kf in known_faces] | |
return any(score < MATCH_THRESHOLD for score in mse_scores) | |
def process_video(video_path, known_faces): | |
cap = cv2.VideoCapture(video_path) | |
intruder_count = 0 | |
while cap.isOpened(): | |
ret, frame = cap.read() | |
if not ret: | |
break | |
rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) | |
gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY) | |
results = face_detector.process(rgb) | |
if results.detections: | |
for det in results.detections: | |
bbox = det.location_data.relative_bounding_box | |
h, w = frame.shape[:2] | |
x, y, bw, bh = int(bbox.xmin * w), int(bbox.ymin * h), int(bbox.width * w), int(bbox.height * h) | |
face_crop = gray[max(0, y):y+bh, max(0, x):x+bw] | |
if face_crop.size < 100: | |
continue | |
if not is_known_face(face_crop, known_faces): | |
intruder_count += 1 | |
temp = tempfile.NamedTemporaryFile(delete=False, suffix=".jpg") | |
cv2.imwrite(temp.name, frame) | |
upload_to_drive(temp.name, INTRUDER_FOLDER_ID) | |
os.remove(temp.name) | |
cap.release() | |
return intruder_count | |
# === STREAMLIT APP === | |
st.title("π Intruder Detection from CCTV Feed") | |
st.write("Streaming videos from Google Drive, comparing against registered faces using MediaPipe.") | |
with st.spinner("Loading known faces..."): | |
known_faces = load_registered_faces() | |
st.success(f"β Loaded {len(known_faces)} registered face(s).") | |
total_intruders = 0 | |
with st.spinner("Scanning videos..."): | |
for folder_id in CCTVFEED_IDS: | |
video_files = get_drive_files(folder_id, mime_type_filter=["video/mp4", "video/x-msvideo", "video/avi"]) | |
for f in video_files: | |
st.subheader(f"π₯ {f['name']}") | |
vpath = download_file(f['id'], ".mp4") | |
detected = process_video(vpath, known_faces) | |
os.remove(vpath) | |
if detected > 0: | |
st.error(f"β {detected} intruder(s) uploaded.") | |
total_intruders += detected | |
else: | |
st.success("β No intruders detected.") | |
st.info(f"π Total intruders across all videos: {total_intruders}") | |