# Spaces: Sleeping  (hosting-page status banner captured with the source; not part of the program)
import streamlit as st | |
import cv2 | |
import numpy as np | |
import requests | |
import tempfile | |
import mediapipe as mp | |
import os | |
from PIL import Image | |
from io import BytesIO | |
# === CONFIGURATION ===
# Google API key used for every Drive request in this file.
# SECURITY: a credential should not live in source control. The key is now
# taken from the GOOGLE_API_KEY environment variable when it is set; the
# literal below is kept only as a backward-compatible fallback and should be
# rotated and removed once deployments provide the variable.
API_KEY = os.environ.get("GOOGLE_API_KEY") or "AIzaSyDojJrpauA0XZtCCDUuo9xeQHZQamYKsC4"
# Drive folder IDs that are scanned for CCTV video files.
CCTVFEED_IDS = ['1KJRkSf2SKEZ1mXS9_si65IwMBtjs6p4n']
# Drive folder holding the reference images of registered (known) faces.
REG_FOLDER_ID = '1qkcR7nQTEtiMH9OFUv2bGxVn08E3dKjF'
# Drive folder that receives crops of faces matching no registered image.
INTRUDER_FOLDER_ID = '1PPAUWU-wMx7fek73p-hqPqYQypYtG8Ob'
# === SETUP ===
# Short aliases for the MediaPipe sub-modules referenced in this file.
mp_face_detection = mp.solutions.face_detection
mp_drawing = mp.solutions.drawing_utils
# A single detector instance is created at import time and reused for every
# frame processed by detect_faces_from_video().
face_detector = mp_face_detection.FaceDetection(
    model_selection=0,
    min_detection_confidence=0.5,
)
# === FUNCTIONS === | |
def list_drive_files(folder_id):
    """Return the files directly inside a Google Drive folder.

    Args:
        folder_id: ID of the Drive folder to list.

    Returns:
        A list of dicts with the keys ``id``, ``name`` and ``mimeType``.
        The list is empty when the API response carries no ``files`` entry
        (for example an error payload).

    Raises:
        requests.RequestException: on network failure or timeout.
        ValueError: if the response body is not valid JSON.
    """
    endpoint = "https://www.googleapis.com/drive/v3/files"
    # Let requests URL-encode the query instead of hand-building the string.
    params = {
        "q": f"'{folder_id}' in parents",
        "key": API_KEY,
        # nextPageToken must be requested explicitly once `fields` is set,
        # otherwise results beyond the first page are silently lost.
        "fields": "nextPageToken,files(id,name,mimeType)",
    }
    files = []
    while True:
        response = requests.get(endpoint, params=params, timeout=30)
        payload = response.json()
        files.extend(payload.get("files", []))
        next_token = payload.get("nextPageToken")
        if not next_token:
            return files
        params["pageToken"] = next_token
def stream_drive_file(file_id, suffix):
    """Download a Drive file into a named temporary file.

    Args:
        file_id: ID of the Drive file to download.
        suffix: Filename suffix (e.g. ``".mp4"``) given to the temp file.

    Returns:
        Path of the temporary file. The file is created with
        ``delete=False``, so the caller is responsible for removing it.

    Raises:
        requests.RequestException: on network failure or timeout. Any
            partially written temp file is removed before re-raising.
    """
    url = "https://drive.google.com/uc"
    query = {"id": file_id, "export": "download"}
    temp = tempfile.NamedTemporaryFile(delete=False, suffix=suffix)
    try:
        # Both the file handle and the HTTP connection are released on exit,
        # whether or not the download succeeds.
        with temp, requests.get(url, params=query, stream=True, timeout=60) as response:
            # 1 MiB chunks keep memory bounded for large videos.
            for chunk in response.iter_content(1024 * 1024):
                temp.write(chunk)
    except Exception:
        # Do not leave a truncated download behind on disk.
        os.remove(temp.name)
        raise
    return temp.name
def mse(img1, img2):
    """Return the mean squared error between two images.

    Args:
        img1: First image as a NumPy array.
        img2: Second image as a NumPy array.

    Returns:
        ``np.inf`` when the shapes differ, otherwise the mean of the squared
        per-element differences as a float.
    """
    if img1.shape != img2.shape:
        return np.inf
    # Promote to float before subtracting: with uint8 inputs both the
    # difference and its square wrap around modulo 256, which caps the result
    # at 255 and makes unrelated images look like close matches.
    diff = np.asarray(img1, dtype=np.float64) - np.asarray(img2, dtype=np.float64)
    return np.mean(diff ** 2)
def load_registered_faces():
    """Download and prepare the reference images of registered faces.

    Every file in ``REG_FOLDER_ID`` whose MIME type contains ``"image"`` is
    downloaded, decoded as grayscale and resized to 100x100.

    Returns:
        A list of 100x100 grayscale images. Files that cannot be decoded as
        an image are skipped.
    """
    reg_faces = []
    for f in list_drive_files(REG_FOLDER_ID):
        if "image" not in f.get("mimeType", ""):
            continue
        path = stream_drive_file(f["id"], ".jpg")
        try:
            img = cv2.imread(path, cv2.IMREAD_GRAYSCALE)
        finally:
            # Remove the temp download even if decoding raises.
            os.remove(path)
        if img is None:
            # cv2.imread returns None for unreadable or non-image content;
            # passing that to cv2.resize would raise.
            continue
        reg_faces.append(cv2.resize(img, (100, 100)))
    return reg_faces
def upload_intruder_image(img_np, intruder_name="intruder_face.jpg"):
    """Upload a face crop to the intruder folder on Google Drive.

    Args:
        img_np: Image array in BGR channel order (it is converted to RGB
            before JPEG encoding).
        intruder_name: File name to give the uploaded image.

    Returns:
        True if the HTTP response status indicates success, else False.

    Raises:
        requests.RequestException: on network failure or timeout.
    """
    import json  # local import: keeps this block self-contained

    # Encode the image as JPEG in memory.
    img = Image.fromarray(cv2.cvtColor(img_np, cv2.COLOR_BGR2RGB))
    buffer = BytesIO()
    img.save(buffer, format="JPEG")
    buffer.seek(0)

    # Multipart upload: one part carries the file metadata, one the bytes.
    # NOTE(review): the request is authorised with an API key only; confirm
    # that the Drive API accepts writes with this credential.
    upload_url = "https://www.googleapis.com/upload/drive/v3/files"
    query = {"uploadType": "multipart", "key": API_KEY}
    metadata = {
        "name": intruder_name,
        "parents": [INTRUDER_FOLDER_ID],
    }
    # json.dumps yields valid JSON even when the name contains quotes,
    # backslashes or non-ASCII text; str(dict) with quote swapping does not.
    metadata_bytes = json.dumps(metadata).encode("utf-8")
    files = {
        "metadata": ("metadata.json", BytesIO(metadata_bytes), "application/json"),
        "file": ("image.jpg", buffer, "image/jpeg"),
    }
    response = requests.post(upload_url, params=query, files=files, timeout=30)
    return response.ok
def detect_faces_from_video(video_path, reg_faces):
    """Scan a video for faces and upload those that match no registered face.

    Each detected face is cropped, converted to a 100x100 grayscale image and
    compared with every entry of ``reg_faces`` using ``mse``; a face counts as
    known when any comparison is below 2000. Unknown faces are uploaded with
    ``upload_intruder_image``.

    Args:
        video_path: Path of the video file to read.
        reg_faces: 100x100 grayscale reference images of registered faces.

    Returns:
        A ``(intruders, faces)`` tuple: the number of unknown faces uploaded
        and the list of all face crops in BGR order. The first element used
        to be the total detection count, which duplicated ``len(faces)``
        while the intruder count could not be obtained at all.
    """
    cap = cv2.VideoCapture(video_path)
    intruders, sidebar_faces = 0, []
    try:
        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                break
            h, w, _ = frame.shape
            results = face_detector.process(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
            for det in results.detections or ():
                bbox = det.location_data.relative_bounding_box
                x, y = int(bbox.xmin * w), int(bbox.ymin * h)
                bw, bh = int(bbox.width * w), int(bbox.height * h)
                # A box that starts outside the frame gives negative indices,
                # which NumPy would interpret as counting from the far edge;
                # clamp at zero so the crop stays inside the detection.
                face = frame[max(y, 0):max(y + bh, 0), max(x, 0):max(x + bw, 0)]
                if face.size == 0:
                    continue
                face_gray = cv2.cvtColor(cv2.resize(face, (100, 100)), cv2.COLOR_BGR2GRAY)
                sidebar_faces.append(face)
                is_known = any(mse(face_gray, kf) < 2000 for kf in reg_faces)
                if not is_known:
                    upload_intruder_image(face, f"intruder_{intruders}.jpg")
                    intruders += 1
    finally:
        # Release the capture handle even if detection or upload raises.
        cap.release()
    return intruders, sidebar_faces
# === STREAMLIT UI ===
# Page flow: load the registered faces once, then walk every CCTV folder,
# process each video file and list the detected face crops in the sidebar.
# NOTE(review): the leading symbols in the title, subheader and success
# strings look like mis-encoded emoji; they are left untouched because the
# intended characters cannot be determined from this file.
st.set_page_config(layout="wide")
st.title("π Intruder Detection from CCTV Feed")
st.write("Streams public videos from Google Drive, compares detected faces with registered images, and uploads unknown faces to the intruder folder.")
reg_faces = load_registered_faces()
st.sidebar.markdown("### Faces Detected")
with st.spinner("Processing CCTV feed..."):
    total_intruders = 0
    for folder_id in CCTVFEED_IDS:
        files = list_drive_files(folder_id)
        for f in files:
            if f['mimeType'].startswith('video/'):
                st.subheader(f"π₯ {f['name']}")
                path = stream_drive_file(f['id'], ".mp4")
                try:
                    intruders, detected_faces = detect_faces_from_video(path, reg_faces)
                    total_intruders += intruders
                    for face in detected_faces:
                        # Crops come straight from OpenCV frames (BGR);
                        # st.image assumes RGB unless told otherwise.
                        st.sidebar.image(face, channels="BGR", width=100)
                finally:
                    # Remove the downloaded video even if processing fails.
                    os.remove(path)
st.success(f"β Intruders found and uploaded: {total_intruders}")