Spaces:
Sleeping
Sleeping
File size: 4,684 Bytes
de236dc 37b0940 de236dc 58574bf de236dc ae58e6e 58574bf 347e5c4 de236dc c43aee1 58574bf c43aee1 37b0940 58574bf ae58e6e 58574bf c43aee1 58574bf c43aee1 58574bf c43aee1 58574bf c43aee1 58574bf c43aee1 de236dc 58574bf c43aee1 58574bf c43aee1 de236dc 58574bf c43aee1 de236dc c43aee1 58574bf c43aee1 58574bf c43aee1 58574bf c43aee1 58574bf c43aee1 58574bf c43aee1 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 |
import streamlit as st
import cv2
import numpy as np
import requests
import tempfile
import mediapipe as mp
import os
# === CONFIGURATION ===
API_KEY = "AIzaSyDojJrpauA0XZtCCDUuo9xeQHZQamYKsC4"
CCTVFEED_IDS = ['1KJRkSf2SKEZ1mXS9_si65IwMBtjs6p4n']
REG_FOLDER_ID = '1qkcR7nQTEtiMH9OFUv2bGxVn08E3dKjF'
INTRUDER_FOLDER_ID = '1PPAUWU-wMx7fek73p-hqPqYQypYtG8Ob'
MATCH_THRESHOLD = 8000
# === SETUP ===
mp_face_detection = mp.solutions.face_detection
mp_drawing = mp.solutions.drawing_utils
face_detector = mp_face_detection.FaceDetection(model_selection=0, min_detection_confidence=0.5)
# === UTILS ===
def get_drive_files(folder_id, mime_type_filter=None):
api_url = f"https://www.googleapis.com/drive/v3/files?q='{folder_id}'+in+parents&key={API_KEY}&fields=files(id,name,mimeType)"
response = requests.get(api_url)
files = response.json().get("files", [])
if mime_type_filter:
files = [f for f in files if f["mimeType"] in mime_type_filter]
return files
def download_file(file_id, suffix):
url = f"https://drive.google.com/uc?id={file_id}&export=download"
response = requests.get(url, stream=True)
temp = tempfile.NamedTemporaryFile(delete=False, suffix=suffix)
for chunk in response.iter_content(chunk_size=1024*1024):
temp.write(chunk)
temp.close()
return temp.name
def upload_to_drive(local_path, folder_id):
file_name = os.path.basename(local_path)
metadata = {
'name': file_name,
'parents': [folder_id]
}
files = {
'data': ('metadata', '{"name": "%s"}' % file_name, 'application/json'),
'file': open(local_path, "rb")
}
upload_url = f"https://www.googleapis.com/upload/drive/v3/files?uploadType=multipart&key={API_KEY}"
response = requests.post(upload_url, files=files)
return response.ok
def load_registered_faces():
reg_images = get_drive_files(REG_FOLDER_ID, mime_type_filter=["image/jpeg", "image/png"])
faces = []
for img in reg_images:
path = download_file(img["id"], ".jpg")
img_gray = cv2.imread(path, cv2.IMREAD_GRAYSCALE)
if img_gray is not None:
resized = cv2.resize(img_gray, (100, 100))
faces.append(resized)
os.remove(path)
return faces
def is_known_face(face_img, known_faces):
resized = cv2.resize(face_img, (100, 100))
mse_scores = [np.mean((resized - kf) ** 2) for kf in known_faces]
return any(score < MATCH_THRESHOLD for score in mse_scores)
def process_video(video_path, known_faces):
cap = cv2.VideoCapture(video_path)
intruder_count = 0
while cap.isOpened():
ret, frame = cap.read()
if not ret:
break
rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
results = face_detector.process(rgb)
if results.detections:
for det in results.detections:
bbox = det.location_data.relative_bounding_box
h, w = frame.shape[:2]
x, y, bw, bh = int(bbox.xmin * w), int(bbox.ymin * h), int(bbox.width * w), int(bbox.height * h)
face_crop = gray[max(0, y):y+bh, max(0, x):x+bw]
if face_crop.size < 100:
continue
if not is_known_face(face_crop, known_faces):
intruder_count += 1
temp = tempfile.NamedTemporaryFile(delete=False, suffix=".jpg")
cv2.imwrite(temp.name, frame)
upload_to_drive(temp.name, INTRUDER_FOLDER_ID)
os.remove(temp.name)
cap.release()
return intruder_count
# === STREAMLIT APP ===
st.title("π Intruder Detection from CCTV Feed")
st.write("Streaming videos from Google Drive, comparing against registered faces using MediaPipe.")
with st.spinner("Loading known faces..."):
known_faces = load_registered_faces()
st.success(f"β
Loaded {len(known_faces)} registered face(s).")
total_intruders = 0
with st.spinner("Scanning videos..."):
for folder_id in CCTVFEED_IDS:
video_files = get_drive_files(folder_id, mime_type_filter=["video/mp4", "video/x-msvideo", "video/avi"])
for f in video_files:
st.subheader(f"π₯ {f['name']}")
vpath = download_file(f['id'], ".mp4")
detected = process_video(vpath, known_faces)
os.remove(vpath)
if detected > 0:
st.error(f"β {detected} intruder(s) uploaded.")
total_intruders += detected
else:
st.success("β
No intruders detected.")
st.info(f"π Total intruders across all videos: {total_intruders}")
|