Spaces:
Sleeping
Sleeping
File size: 5,235 Bytes
de236dc 37b0940 de236dc 58574bf de236dc ae58e6e 58574bf 9b6a35d 347e5c4 de236dc c43aee1 58574bf 8b64005 37b0940 58574bf ae58e6e 27b0f1b 8b64005 27b0f1b 8b64005 9b6a35d 15c11e6 8b64005 ccde07a c43aee1 8b64005 9b6a35d 8b64005 27b0f1b 8b64005 27b0f1b 8b64005 58574bf 8b64005 9b6a35d 8b64005 58574bf 8b64005 841e5f0 8b64005 27b0f1b 8b64005 27b0f1b 8b64005 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 |
import streamlit as st
import cv2
import numpy as np
import requests
import tempfile
import mediapipe as mp
import os
from PIL import Image
# === CONFIGURATION ===
# SECURITY NOTE(review): a Google API key is committed in this file. The literal
# is kept only as a fallback so existing deployments keep working; it should be
# rotated and supplied through the GOOGLE_API_KEY environment variable instead.
API_KEY = os.environ.get("GOOGLE_API_KEY") or "AIzaSyDojJrpauA0XZtCCDUuo9xeQHZQamYKsC4"
# Google Drive folder IDs that are scanned for CCTV video files.
CCTVFEED_IDS = ['1KJRkSf2SKEZ1mXS9_si65IwMBtjs6p4n']
# Google Drive folder ID that is scanned for images of registered (known) faces.
REG_FOLDER_ID = '1qkcR7nQTEtiMH9OFUv2bGxVn08E3dKjF'
# Placeholder endpoint; nothing in the visible code sends data to it yet.
INTRUDER_UPLOAD_URL = "https://upload.gdrive.com/your-anon-endpoint-if-any"  # You can change this if needed
# === SETUP ===
# Short aliases for the MediaPipe solution modules used by this script.
mp_face_detection = mp.solutions.face_detection
# NOTE(review): mp_drawing is not referenced anywhere in the visible code.
mp_drawing = mp.solutions.drawing_utils
# A single module-level detector shared by the still-image path and the
# video-frame path. min_detection_confidence=0.5 sets the detector's confidence
# cut-off; model_selection=0 is presumably MediaPipe's short-range model —
# confirm against the MediaPipe documentation.
face_detector = mp_face_detection.FaceDetection(model_selection=0, min_detection_confidence=0.5)
# === UTILS ===
def get_drive_file_links(folder_id, mime_types):
    """List the files in a Google Drive folder that match the given MIME types.

    All result pages are followed, so folders with more entries than a single
    API page holds are listed completely.

    Args:
        folder_id: ID of the Drive folder whose direct children are listed.
        mime_types: Collection of MIME type strings to keep.

    Returns:
        A list of ``(name, file_id)`` tuples in the order the API returned them.

    Raises:
        requests.HTTPError: If the Drive API answers with an error status.
        requests.RequestException: On connection problems or timeouts.
    """
    api_url = "https://www.googleapis.com/drive/v3/files"
    # Passing params lets requests do the URL encoding instead of hand-building
    # the query string.
    params = {
        "q": f"'{folder_id}' in parents",
        "key": API_KEY,
        "fields": "nextPageToken,files(id,name,mimeType)",
    }
    matches = []
    while True:
        response = requests.get(api_url, params=params, timeout=30)
        # Fail loudly: an error payload has no "files" key and would otherwise
        # be indistinguishable from an empty folder.
        response.raise_for_status()
        payload = response.json()
        matches.extend(
            (f["name"], f["id"])
            for f in payload.get("files", [])
            if f.get("mimeType") in mime_types
        )
        page_token = payload.get("nextPageToken")
        if not page_token:
            return matches
        params["pageToken"] = page_token
def download_file(file_id, suffix):
    """Download a Google Drive file into a temporary file on disk.

    Args:
        file_id: Drive file ID to download.
        suffix: File name suffix (e.g. ``".jpg"``) given to the temporary file.

    Returns:
        Path of the temporary file. The file is created with ``delete=False``,
        so the caller is responsible for removing it.

    Raises:
        requests.HTTPError: If the download request returns an error status.
        requests.RequestException: On connection problems or timeouts.
    """
    url = "https://drive.google.com/uc"
    params = {"id": file_id, "export": "download"}
    temp = tempfile.NamedTemporaryFile(delete=False, suffix=suffix)
    try:
        # Both the file handle and the HTTP connection are released on exit,
        # whether or not the transfer succeeds.
        with temp, requests.get(url, params=params, stream=True, timeout=60) as response:
            # Without this check an HTML error page would be saved as media.
            response.raise_for_status()
            for chunk in response.iter_content(chunk_size=1024 * 1024):
                temp.write(chunk)
    except Exception:
        # Do not leave a partial download behind; the handle is already closed.
        os.remove(temp.name)
        raise
    return temp.name
def extract_faces_from_image(img_path):
    """Detect faces in an image file and return them as 100x100 RGB crops.

    Args:
        img_path: Path of the image file to read.

    Returns:
        A list of RGB arrays resized to 100x100, one per detected face whose
        bounding box overlaps the image. The list is empty when the image
        cannot be read or no face is detected.
    """
    img = cv2.imread(img_path)
    if img is None:
        # cv2.imread signals an unreadable file by returning None, not raising.
        return []
    rgb = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
    results = face_detector.process(rgb)
    if not results.detections:
        return []

    h, w = rgb.shape[:2]
    faces = []
    for det in results.detections:
        bbox = det.location_data.relative_bounding_box
        x, y = int(bbox.xmin * w), int(bbox.ymin * h)
        bw, bh = int(bbox.width * w), int(bbox.height * h)
        # Clamp the box to the image: a negative start index would otherwise
        # wrap around in NumPy slicing and yield an empty or wrong crop.
        x0, x1 = min(max(x, 0), w), min(max(x + bw, 0), w)
        y0, y1 = min(max(y, 0), h), min(max(y + bh, 0), h)
        face_crop = rgb[y0:y1, x0:x1]
        if face_crop.size > 0:
            faces.append(cv2.resize(face_crop, (100, 100)))
    return faces
def mse(img1, img2):
    """Return the mean squared error between two arrays of equal shape.

    Both inputs are converted to float before subtracting, so unsigned integer
    images do not wrap around. Arrays of different shapes are treated as
    infinitely far apart and yield ``np.inf``.
    """
    if img1.shape == img2.shape:
        delta = img1.astype("float") - img2.astype("float")
        return np.mean(delta ** 2)
    return np.inf
def is_intruder(face_img, known_faces, threshold=5000):
    """Decide whether a face crop matches none of the known faces.

    The crop is converted from RGB to grayscale, resized to 100x100 and
    compared against each entry of ``known_faces`` with ``mse``.

    Returns:
        True when every comparison yields an error above ``threshold`` (which
        includes the case of an empty ``known_faces``), False as soon as one
        known face is within the threshold.
    """
    probe = cv2.resize(cv2.cvtColor(face_img, cv2.COLOR_RGB2GRAY), (100, 100))
    for reference in known_faces:
        if not (mse(probe, reference) > threshold):
            return False
    return True
def upload_intruder(img_array, name):
    """Save an intruder face crop to a temporary JPEG and report its path.

    The upload itself is not implemented: the image is written to disk and a
    Streamlit warning shows where it was stored.

    Args:
        img_array: Image array in RGB channel order (the callers in this file
            pass crops taken from RGB frames).
        name: Intended file name for the upload. Currently unused because no
            real upload happens.

    Returns:
        Path of the temporary JPEG file. It is created with ``delete=False``
        and is not removed by this function.
    """
    # Reserve a path and close the handle right away, so no descriptor leaks
    # and Pillow can reopen the file on every platform.
    with tempfile.NamedTemporaryFile(delete=False, suffix=".jpg") as temp_img:
        temp_path = temp_img.name
    # Pillow reads a 3-channel array as RGB, so the array is saved as-is;
    # converting it to BGR first would swap red and blue in the stored image.
    Image.fromarray(np.ascontiguousarray(img_array)).save(temp_path)
    # Simulate upload — requires actual implementation if you want to use Google Forms or Apps Script
    st.warning(f"Upload placeholder: {temp_path}")
    return temp_path
# === APP UI ===
# NOTE(review): the first character of the title looks like a mis-encoded
# emoji; the intended glyph cannot be recovered from this file — confirm.
st.title("๐ Intruder Detection from CCTV Feed")
st.write("Streaming videos from Google Drive, detecting faces using MediaPipe and flagging unknown individuals.")
# Face crops flagged as intruders; filled during video processing and rendered
# in the sidebar at the end of the script.
sidebar_faces = []
with st.spinner("Loading registered faces..."):
    # Grayscale 100x100 reference faces that is_intruder() compares against.
    known_faces = []
    for fname, fid in get_drive_file_links(REG_FOLDER_ID, ["image/jpeg", "image/png"]):
        img_path = download_file(fid, ".jpg")
        try:
            faces = extract_faces_from_image(img_path)
        finally:
            # Remove the downloaded temp file even if face extraction fails.
            os.remove(img_path)
        for face in faces:
            gray = cv2.cvtColor(face, cv2.COLOR_RGB2GRAY)
            known_faces.append(cv2.resize(gray, (100, 100)))
st.success(f"Loaded {len(known_faces)} registered faces.")
with st.spinner("Processing CCTV videos..."):
    # Running total of faces flagged as intruders across all videos.
    intruder_count = 0
    for folder_id in CCTVFEED_IDS:
        video_files = get_drive_file_links(folder_id, ["video/mp4", "video/x-msvideo", "video/avi"])
        for vname, vid in video_files:
            st.subheader(f"🎥 {vname}")
            video_path = download_file(vid, ".mp4")
            cap = cv2.VideoCapture(video_path)
            try:
                frame_id = 0
                while cap.isOpened():
                    ret, frame = cap.read()
                    if not ret:
                        break
                    frame_id += 1
                    # Only every 30th frame is analysed to keep processing cheap.
                    if frame_id % 30 != 0:
                        continue
                    rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                    results = face_detector.process(rgb)
                    if not results.detections:
                        continue
                    h, w = rgb.shape[:2]
                    for det in results.detections:
                        bbox = det.location_data.relative_bounding_box
                        x, y = int(bbox.xmin * w), int(bbox.ymin * h)
                        bw, bh = int(bbox.width * w), int(bbox.height * h)
                        # Clamp the box to the frame: a negative start index
                        # would otherwise wrap around in NumPy slicing and
                        # yield an empty or wrong crop.
                        x0, x1 = min(max(x, 0), w), min(max(x + bw, 0), w)
                        y0, y1 = min(max(y, 0), h), min(max(y + bh, 0), h)
                        face_crop = rgb[y0:y1, x0:x1]
                        if face_crop.size == 0:
                            continue
                        if is_intruder(face_crop, known_faces):
                            # Copy the crop so the stored face does not keep
                            # the whole decoded frame alive in memory.
                            face_crop = face_crop.copy()
                            sidebar_faces.append(face_crop)
                            intruder_count += 1
                            upload_intruder(face_crop, f"intruder_{intruder_count}.jpg")
            finally:
                # Release the capture and delete the temp video even when
                # processing a frame raises.
                cap.release()
                os.remove(video_path)
st.success(f"🎯 Intruders detected: {intruder_count}")
# === SIDEBAR ===
# Show every face crop that was flagged as an intruder, numbered from 1.
st.sidebar.title("📸 Faces Detected as Intruders")
for idx, face in enumerate(sidebar_faces, start=1):
    st.sidebar.image(face, caption=f"Intruder #{idx}", width=100)
|