Spaces:
Sleeping
Sleeping
import streamlit as st | |
import cv2 | |
import numpy as np | |
import requests | |
import tempfile | |
import mediapipe as mp | |
import os | |
import pickle | |
from googleapiclient.discovery import build | |
from googleapiclient.http import MediaFileUpload | |
from PIL import Image | |
# === CONFIGURATION ===
# NOTE(security): a Google API key is committed in this file. Supply it via the
# GOOGLE_API_KEY environment variable instead; the literal is kept only as a
# backward-compatible fallback and should be rotated and removed.
API_KEY = os.environ.get("GOOGLE_API_KEY") or "AIzaSyDojJrpauA0XZtCCDUuo9xeQHZQamYKsC4"

# Google Drive folder IDs: folders holding CCTV videos to scan, the folder of
# registered-face images, and the folder intruder snapshots are uploaded to.
CCTVFEED_IDS = ['1KJRkSf2SKEZ1mXS9_si65IwMBtjs6p4n']
REG_FOLDER_ID = '1qkcR7nQTEtiMH9OFUv2bGxVn08E3dKjF'
INTRUDER_FOLDER_ID = '1PPAUWU-wMx7fek73p-hqPqYQypYtG8Ob'

# Single MediaPipe face detector shared by every call to extract_face_embedding.
mp_face_detection = mp.solutions.face_detection
face_detector = mp_face_detection.FaceDetection(model_selection=0, min_detection_confidence=0.5)
# === INTRUDER UPLOAD === | |
def get_drive_service():
    """Build a Google Drive v3 client from credentials stored in ``token.pickle``.

    Returns:
        The Drive service object, or ``None`` when the token file cannot be
        read or the client cannot be built (the error is printed).
    """
    try:
        # NOTE(security): pickle.load can execute arbitrary code embedded in
        # the file; token.pickle must only ever come from a trusted source.
        with open("token.pickle", "rb") as token:
            creds = pickle.load(token)
        # The file handle is already closed here; only the credentials are needed.
        return build("drive", "v3", credentials=creds)
    except Exception as e:  # best-effort: callers treat None as "no upload"
        print(f"β Drive auth failed: {e}")
        return None
def upload_intruder_face(image_path):
    """Upload a JPEG snapshot to the intruder folder on Google Drive.

    The upload is best-effort: authentication or upload problems are printed
    and the function returns ``None`` without raising.

    Args:
        image_path: Path of the image file to upload; its base name becomes
            the file name on Drive.
    """
    service = get_drive_service()
    if service is None:
        print("π Upload skipped (auth failed)")
        return
    metadata = {"name": os.path.basename(image_path), "parents": [INTRUDER_FOLDER_ID]}
    try:
        # Built inside the try so that an unreadable or missing file is
        # reported like any other upload failure instead of crashing.
        media = MediaFileUpload(image_path, mimetype="image/jpeg")
        service.files().create(body=metadata, media_body=media, fields="id").execute()
    except Exception as e:
        print(f"β Upload failed: {e}")
    else:
        print(f"β Uploaded to INTRUDER folder: {image_path}")
# === HELPERS === | |
def get_drive_files(folder_id, allowed_mime):
    """List files in a Drive folder whose MIME type is in ``allowed_mime``.

    Args:
        folder_id: ID of the Google Drive folder to list.
        allowed_mime: Container of accepted MIME type strings.

    Returns:
        A list of ``(name, file_id)`` tuples. A response without a ``files``
        entry (for example an API error payload) yields an empty list.
    """
    url = "https://www.googleapis.com/drive/v3/files"
    # Passing params lets requests URL-encode the query instead of relying on
    # a hand-assembled query string.
    params = {
        "q": f"'{folder_id}' in parents",
        "key": API_KEY,
        "fields": "nextPageToken,files(id,name,mimeType)",
    }
    matches = []
    while True:
        response = requests.get(url, params=params, timeout=30)
        payload = response.json()
        for entry in payload.get("files", []):
            if entry.get("mimeType") in allowed_mime:
                matches.append((entry["name"], entry["id"]))
        # The listing is paginated; keep following the token until the API
        # stops returning one so large folders are not silently truncated.
        next_token = payload.get("nextPageToken")
        if not next_token:
            break
        params["pageToken"] = next_token
    return matches
def download_file(file_id, suffix):
    """Download a Drive file into a temporary file and return its path.

    The temporary file is created with ``delete=False``; the caller owns it
    and is responsible for removing it.

    Args:
        file_id: Google Drive file ID.
        suffix: File-name suffix for the temporary file (e.g. ``".jpg"``).

    Returns:
        Path of the temporary file holding the downloaded bytes.
    """
    url = f"https://drive.google.com/uc?id={file_id}&export=download"
    # Context managers release the HTTP connection and close the file handle
    # even if the transfer fails part-way through.
    with requests.get(url, stream=True, timeout=60) as response, \
            tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as temp:
        for chunk in response.iter_content(chunk_size=1024 * 1024):
            if chunk:  # skip empty keep-alive chunks
                temp.write(chunk)
        return temp.name
def extract_face_embedding(image):
    """Detect a face in ``image`` and return it as a 128x128 crop.

    Despite the name, the "embedding" is the resized pixel crop of the first
    detection that yields a non-empty region, in the input's channel order.

    Args:
        image: Image array that ``cv2.cvtColor(..., cv2.COLOR_BGR2RGB)``
            accepts, or ``None`` (e.g. a failed ``cv2.imread``).

    Returns:
        The 128x128 face crop, or ``None`` when the image is missing/empty or
        no usable face is detected.
    """
    if image is None or getattr(image, "size", 0) == 0:
        return None
    rgb = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
    results = face_detector.process(rgb)
    if not results.detections:
        return None
    h, w = image.shape[:2]
    for det in results.detections:
        bbox = det.location_data.relative_bounding_box
        # Clamp to the frame: a box hanging off the edge gives negative
        # coordinates, which slicing would otherwise treat as offsets from
        # the end of the array.
        x1 = max(0, int(bbox.xmin * w))
        y1 = max(0, int(bbox.ymin * h))
        x2 = min(w, int((bbox.xmin + bbox.width) * w))
        y2 = min(h, int((bbox.ymin + bbox.height) * h))
        if x2 <= x1 or y2 <= y1:
            # An empty crop would make cv2.resize raise; try the next detection.
            continue
        return cv2.resize(image[y1:y2, x1:x2], (128, 128))
    return None
def is_match(face1, face2, threshold=30.0):
    """Return whether two face crops are closer than ``threshold``.

    Distance is the Euclidean norm of the element-wise pixel difference,
    computed in floating point so unsigned-integer inputs do not wrap around.

    Args:
        face1: First face array, or ``None``.
        face2: Second face array of the same shape, or ``None``.
        threshold: Exclusive upper bound on the distance for a match.

    Returns:
        ``True`` if both faces are present and their distance is below
        ``threshold``; ``False`` otherwise.
    """
    if face1 is None or face2 is None:
        return False
    diff = np.linalg.norm(face1.astype("float") - face2.astype("float"))
    return bool(diff < threshold)
# === STREAMLIT UI === | |
st.set_page_config(page_title="Intruder Detection", layout="wide")
st.title("π Intruder Detection from CCTV Feed")
st.write("Streaming videos from Google Drive, detecting and matching faces.")

# Paths of intruder snapshots collected while scanning; shown in the sidebar.
sidebar_faces = []

# Download every registered image and keep a (name, face crop) pair for each
# image in which a face is found.
with st.spinner("Loading registered faces..."):
    registered = []
    reg_images = get_drive_files(REG_FOLDER_ID, ["image/jpeg", "image/png"])
    for name, fid in reg_images:
        img_path = download_file(fid, ".jpg")
        try:
            img = cv2.imread(img_path)
        finally:
            # The pixels are in memory now; do not leave the download on disk.
            if os.path.exists(img_path):
                os.remove(img_path)
        if img is None:
            # cv2.imread returns None for unreadable data; skip such files.
            continue
        emb = extract_face_embedding(img)
        if emb is not None:
            registered.append((name, emb))
# Scan every video in the CCTV folders frame by frame. A frame whose detected
# face matches no registered face is saved, uploaded and queued for the sidebar.
with st.spinner("Scanning CCTV feed..."):
    total_intruders = 0
    for folder_id in CCTVFEED_IDS:
        videos = get_drive_files(folder_id, ["video/mp4", "video/x-msvideo", "video/avi"])
        for vname, fid in videos:
            st.subheader(f"π₯ {vname}")
            vid_path = download_file(fid, ".mp4")
            cap = cv2.VideoCapture(vid_path)
            frame_count = 0
            matched_faces = 0
            unmatched_faces = 0
            try:
                while cap.isOpened():
                    ret, frame = cap.read()
                    if not ret:
                        break
                    frame_count += 1
                    face_img = extract_face_embedding(frame)
                    if face_img is None:
                        continue
                    if any(is_match(face_img, emb) for _, emb in registered):
                        matched_faces += 1
                        continue
                    unmatched_faces += 1
                    # Reserve a path and close the handle right away so that
                    # cv2.imwrite can write to it and no descriptor is leaked.
                    with tempfile.NamedTemporaryFile(delete=False, suffix=".jpg") as temp:
                        snapshot_path = temp.name
                    cv2.imwrite(snapshot_path, face_img)
                    upload_intruder_face(snapshot_path)
                    # The snapshot stays on disk: the sidebar renders it later.
                    sidebar_faces.append(snapshot_path)
            finally:
                cap.release()
                # The downloaded video is no longer needed once it is scanned.
                if os.path.exists(vid_path):
                    os.remove(vid_path)
            st.success(f"π― Processed {frame_count} frames β Matched: {matched_faces}, Unmatched: {unmatched_faces}")
            total_intruders += unmatched_faces
st.info(f"π― Completed processing all videos. Intruders detected: {total_intruders}")
# Show a heading followed by each saved intruder snapshot in the sidebar.
st.sidebar.markdown("## π¨ Intruder Snapshots")
for snapshot in sidebar_faces:
    st.sidebar.image(snapshot, width=160)