"""Streamlit app that flags unregistered faces in CCTV videos stored on Google Drive.

Workflow:
  1. Download the registered face images from ``REG_FOLDER_ID``.
  2. For every video in each ``CCTVFEED_IDS`` folder, detect faces with
     MediaPipe and compare each face crop to the registered faces using a
     mean-squared-error score on 100x100 grayscale images.
  3. Upload every frame containing an unmatched face to ``INTRUDER_FOLDER_ID``.
"""
import json
import os
import tempfile

import cv2
import mediapipe as mp
import numpy as np
import requests
import streamlit as st

# === CONFIGURATION ===
# NOTE(review): this API key is committed in source. It should be rotated and
# loaded from a secret store / environment variable instead.
API_KEY = "AIzaSyDojJrpauA0XZtCCDUuo9xeQHZQamYKsC4"
CCTVFEED_IDS = ['1KJRkSf2SKEZ1mXS9_si65IwMBtjs6p4n']
REG_FOLDER_ID = '1qkcR7nQTEtiMH9OFUv2bGxVn08E3dKjF'
INTRUDER_FOLDER_ID = '1PPAUWU-wMx7fek73p-hqPqYQypYtG8Ob'
# Maximum mean squared pixel error (0..65025 for 8-bit images) for a match.
MATCH_THRESHOLD = 8000
# Every face is normalised to this size before comparison.
FACE_SIZE = (100, 100)
# Seconds to wait on connect / between received bytes for each HTTP request.
REQUEST_TIMEOUT = 30

# === SETUP ===
mp_face_detection = mp.solutions.face_detection
mp_drawing = mp.solutions.drawing_utils
face_detector = mp_face_detection.FaceDetection(
    model_selection=0, min_detection_confidence=0.5
)


# === UTILS ===
def get_drive_files(folder_id, mime_type_filter=None):
    """Return the files directly inside a Google Drive folder.

    Args:
        folder_id: ID of the Drive folder to list.
        mime_type_filter: Optional collection of MIME types; when given, only
            files whose ``mimeType`` is in it are returned.

    Returns:
        A list of dicts with the keys ``id``, ``name`` and ``mimeType``. The
        list is empty when the response carries no ``files`` entry.
    """
    response = requests.get(
        "https://www.googleapis.com/drive/v3/files",
        # Let requests do the URL encoding instead of hand-building the query.
        params={
            "q": f"'{folder_id}' in parents",
            "key": API_KEY,
            "fields": "files(id,name,mimeType)",
        },
        timeout=REQUEST_TIMEOUT,
    )
    files = response.json().get("files", [])
    if mime_type_filter:
        files = [f for f in files if f.get("mimeType") in mime_type_filter]
    return files


def download_file(file_id, suffix):
    """Download a Drive file to a temporary file and return its path.

    The caller owns the returned file and must delete it.

    Args:
        file_id: ID of the Drive file.
        suffix: Filename suffix (e.g. ``".mp4"``) for the temporary file.

    Returns:
        Path of the temporary file holding the response body.
    """
    url = f"https://drive.google.com/uc?id={file_id}&export=download"
    with requests.get(url, stream=True, timeout=REQUEST_TIMEOUT) as response:
        with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as temp:
            for chunk in response.iter_content(chunk_size=1024 * 1024):
                if chunk:
                    temp.write(chunk)
    return temp.name


def upload_to_drive(local_path, folder_id):
    """Upload a local file into a Drive folder via a multipart upload.

    Args:
        local_path: Path of the file to upload.
        folder_id: ID of the destination Drive folder.

    Returns:
        ``True`` when the server answered with a success status, else ``False``.
    """
    file_name = os.path.basename(local_path)
    metadata = {
        'name': file_name,
        'parents': [folder_id],
    }
    upload_url = (
        "https://www.googleapis.com/upload/drive/v3/files"
        f"?uploadType=multipart&key={API_KEY}"
    )
    # NOTE(review): Drive uploads normally require OAuth credentials; confirm
    # that authenticating with an API key alone is accepted here.
    with open(local_path, "rb") as payload:
        files = {
            # Send the full metadata (including 'parents') so the file lands
            # in folder_id; json.dumps also escapes the file name correctly.
            'data': ('metadata', json.dumps(metadata), 'application/json'),
            'file': payload,
        }
        response = requests.post(upload_url, files=files, timeout=REQUEST_TIMEOUT)
    return response.ok


def load_registered_faces():
    """Download the registered face images as 100x100 grayscale arrays.

    Images that OpenCV cannot decode are skipped.

    Returns:
        A list of ``uint8`` arrays of shape ``FACE_SIZE``.
    """
    reg_images = get_drive_files(
        REG_FOLDER_ID, mime_type_filter=["image/jpeg", "image/png"]
    )
    faces = []
    for img in reg_images:
        path = download_file(img["id"], ".jpg")
        try:
            img_gray = cv2.imread(path, cv2.IMREAD_GRAYSCALE)
            if img_gray is not None:
                faces.append(cv2.resize(img_gray, FACE_SIZE))
        finally:
            # Always remove the temporary download, even if decoding fails.
            os.remove(path)
    return faces


def is_known_face(face_img, known_faces):
    """Return whether a face crop matches any registered face.

    A match is a mean squared pixel error below ``MATCH_THRESHOLD`` between
    the resized crop and a registered face.

    Args:
        face_img: Grayscale face crop (any size).
        known_faces: Iterable of grayscale arrays of shape ``FACE_SIZE``.

    Returns:
        ``True`` if at least one registered face matches; ``False`` otherwise
        (including when ``known_faces`` is empty).
    """
    # Compute in float: uint8 arithmetic wraps around modulo 256, which caps
    # the score at 255 and would make every face match.
    resized = cv2.resize(face_img, FACE_SIZE).astype(np.float32)
    return any(
        np.mean((resized - kf.astype(np.float32)) ** 2) < MATCH_THRESHOLD
        for kf in known_faces
    )


def process_video(video_path, known_faces):
    """Scan a video and upload every frame that contains an unknown face.

    Args:
        video_path: Path of the video file to scan.
        known_faces: Registered faces as returned by ``load_registered_faces``.

    Returns:
        Number of unknown-face detections; one snapshot upload is attempted
        per detection.
    """
    cap = cv2.VideoCapture(video_path)
    intruder_count = 0
    try:
        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                break

            rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
            results = face_detector.process(rgb)
            if not results.detections:
                continue

            h, w = frame.shape[:2]
            for det in results.detections:
                # The bounding box is relative to the frame size; scale it
                # to pixel coordinates.
                bbox = det.location_data.relative_bounding_box
                x, y = int(bbox.xmin * w), int(bbox.ymin * h)
                bw, bh = int(bbox.width * w), int(bbox.height * h)
                face_crop = gray[max(0, y):y + bh, max(0, x):x + bw]
                # Skip empty or tiny crops (fewer than 100 pixels).
                if face_crop.size < 100:
                    continue
                if is_known_face(face_crop, known_faces):
                    continue

                intruder_count += 1
                temp = tempfile.NamedTemporaryFile(delete=False, suffix=".jpg")
                # Close our handle first so OpenCV can write the file and no
                # descriptor is leaked per detection.
                temp.close()
                try:
                    cv2.imwrite(temp.name, frame)
                    upload_to_drive(temp.name, INTRUDER_FOLDER_ID)
                finally:
                    os.remove(temp.name)
    finally:
        cap.release()
    return intruder_count


# === STREAMLIT APP ===
st.title("🔐 Intruder Detection from CCTV Feed")
st.write("Streaming videos from Google Drive, comparing against registered faces using MediaPipe.")

with st.spinner("Loading known faces..."):
    known_faces = load_registered_faces()
st.success(f"✅ Loaded {len(known_faces)} registered face(s).")

total_intruders = 0
with st.spinner("Scanning videos..."):
    for folder_id in CCTVFEED_IDS:
        video_files = get_drive_files(
            folder_id,
            mime_type_filter=["video/mp4", "video/x-msvideo", "video/avi"],
        )
        for f in video_files:
            st.subheader(f"🎥 {f['name']}")
            vpath = download_file(f['id'], ".mp4")
            try:
                detected = process_video(vpath, known_faces)
            finally:
                # Remove the downloaded video even if processing fails.
                os.remove(vpath)
            if detected > 0:
                st.error(f"❗ {detected} intruder(s) uploaded.")
                total_intruders += detected
            else:
                st.success("✅ No intruders detected.")

st.info(f"🔎 Total intruders across all videos: {total_intruders}")