Spaces:
Sleeping
Sleeping
File size: 4,751 Bytes
de236dc 37b0940 de236dc 58574bf de236dc ae58e6e 58574bf ccde07a 347e5c4 de236dc c43aee1 58574bf ccde07a 37b0940 58574bf ae58e6e 58574bf ccde07a 58574bf ccde07a 58574bf ccde07a 58574bf ccde07a c43aee1 ccde07a 841e5f0 ccde07a c43aee1 ccde07a 58574bf ccde07a de236dc 58574bf c43aee1 de236dc 58574bf c43aee1 ccde07a de236dc ccde07a 58574bf ccde07a 58574bf ccde07a 58574bf ccde07a 841e5f0 ccde07a |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 |
import streamlit as st
import cv2
import numpy as np
import requests
import tempfile
import mediapipe as mp
import os
from PIL import Image
# === CONFIGURATION ===
API_KEY = "AIzaSyDojJrpauA0XZtCCDUuo9xeQHZQamYKsC4"
CCTVFEED_IDS = ['1KJRkSf2SKEZ1mXS9_si65IwMBtjs6p4n']
REG_FOLDER_ID = '1qkcR7nQTEtiMH9OFUv2bGxVn08E3dKjF'
INTRUDER_FOLDER_ID = '1PPAUWU-wMx7fek73p-hqPqYQypYtG8Ob'
MATCH_THRESHOLD = 7000
# === SETUP ===
mp_face_detection = mp.solutions.face_detection
mp_drawing = mp.solutions.drawing_utils
face_detector = mp_face_detection.FaceDetection(model_selection=1, min_detection_confidence=0.5)
# === FUNCTIONS ===
def get_drive_image_faces(folder_id):
api_url = f"https://www.googleapis.com/drive/v3/files?q='{folder_id}'+in+parents&key={API_KEY}&fields=files(id,name,mimeType)"
response = requests.get(api_url)
files = response.json().get("files", [])
faces = []
for f in files:
if "image" in f["mimeType"]:
link = f"https://drive.google.com/uc?id={f['id']}&export=download"
img = requests.get(link).content
temp = tempfile.NamedTemporaryFile(delete=False, suffix=".jpg")
temp.write(img)
temp.close()
gray = cv2.imread(temp.name, cv2.IMREAD_GRAYSCALE)
if gray is not None:
gray = cv2.resize(gray, (100, 100))
faces.append(gray)
os.remove(temp.name)
return faces
def is_match(face_img, known_faces, threshold=MATCH_THRESHOLD):
face_img = cv2.resize(face_img, (100, 100))
errors = [np.mean((face_img - kf) ** 2) for kf in known_faces]
return any(err < threshold for err in errors)
def get_drive_video_links(folder_id):
api_url = f"https://www.googleapis.com/drive/v3/files?q='{folder_id}'+in+parents&key={API_KEY}&fields=files(id,name,mimeType)"
response = requests.get(api_url)
files = response.json().get("files", [])
video_links = []
for f in files:
if f["mimeType"] in ["video/mp4", "video/x-msvideo", "video/avi"]:
link = f"https://drive.google.com/uc?id={f['id']}&export=download"
video_links.append((f["name"], link))
return video_links
def download_video(link):
response = requests.get(link, stream=True)
temp = tempfile.NamedTemporaryFile(delete=False, suffix=".mp4")
for chunk in response.iter_content(chunk_size=1024*1024):
temp.write(chunk)
temp.close()
return temp.name
def detect_intruders(video_path, known_faces):
cap = cv2.VideoCapture(video_path)
intruder_frames = 0
faces_displayed = 0
while cap.isOpened():
ret, frame = cap.read()
if not ret:
break
rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
results = face_detector.process(rgb)
if results.detections:
for det in results.detections:
bbox = det.location_data.relative_bounding_box
h, w, _ = frame.shape
x1 = int(bbox.xmin * w)
y1 = int(bbox.ymin * h)
x2 = int((bbox.xmin + bbox.width) * w)
y2 = int((bbox.ymin + bbox.height) * h)
face_crop = frame[max(y1,0):y2, max(x1,0):x2]
gray_face = cv2.cvtColor(face_crop, cv2.COLOR_BGR2GRAY)
if not is_match(gray_face, known_faces):
intruder_frames += 1
faces_displayed += 1
st.image(face_crop, caption=f"Unmatched face #{faces_displayed}", channels="BGR", use_column_width=True)
if faces_displayed >= 3:
break
cap.release()
return intruder_frames
# === STREAMLIT UI ===
st.title("๐ Intruder Detection from CCTV Feed")
st.write("Streaming videos from Google Drive, comparing with registered faces, and displaying unmatched intruders.")
with st.spinner("Loading registered faces..."):
registered_faces = get_drive_image_faces(REG_FOLDER_ID)
if not registered_faces:
st.error("No registered face images found.")
else:
with st.spinner("Processing videos..."):
total_intruders = 0
for folder_id in CCTVFEED_IDS:
videos = get_drive_video_links(folder_id)
for vname, link in videos:
st.subheader(f"๐ฅ {vname}")
video_file = download_video(link)
count = detect_intruders(video_file, registered_faces)
os.remove(video_file)
if count > 0:
st.success(f"โ
{count} unmatched faces detected.")
total_intruders += 1
else:
st.warning("โ ๏ธ No intruders detected.")
st.info(f"๐ Completed. Total videos with intruders: {total_intruders}")
|