Spaces:
Sleeping
Sleeping
File size: 5,176 Bytes
de236dc 37b0940 de236dc 58574bf de236dc ae58e6e 58574bf c2ed860 347e5c4 de236dc c43aee1 58574bf 16e29fd 37b0940 ae58e6e 27b0f1b c2ed860 085ef33 c2ed860 137bfc1 c2ed860 137bfc1 c2ed860 137bfc1 c2ed860 9027813 c2ed860 137bfc1 c2ed860 27b0f1b c2ed860 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 |
import streamlit as st
import cv2
import numpy as np
import requests
import tempfile
import mediapipe as mp
import os
import pickle
from googleapiclient.discovery import build
from googleapiclient.http import MediaFileUpload
from PIL import Image
# === CONFIGURATION ===
# SECURITY: an API key committed to source is readable by anyone who can see
# this file. Supply it through the GOOGLE_API_KEY environment variable instead;
# the literal is kept only as a backward-compatible fallback and should be
# rotated and removed once deployments set the variable.
API_KEY = os.environ.get("GOOGLE_API_KEY", "AIzaSyDojJrpauA0XZtCCDUuo9xeQHZQamYKsC4")

# Google Drive folder IDs: folders scanned for CCTV videos, the folder holding
# images of registered faces, and the folder that receives intruder snapshots.
CCTVFEED_IDS = ['1KJRkSf2SKEZ1mXS9_si65IwMBtjs6p4n']
REG_FOLDER_ID = '1qkcR7nQTEtiMH9OFUv2bGxVn08E3dKjF'
INTRUDER_FOLDER_ID = '1PPAUWU-wMx7fek73p-hqPqYQypYtG8Ob'

# One shared MediaPipe face detector, created once when the script is loaded.
mp_face_detection = mp.solutions.face_detection
face_detector = mp_face_detection.FaceDetection(model_selection=0, min_detection_confidence=0.5)
# === INTRUDER UPLOAD ===
def get_drive_service():
    """Build a Google Drive v3 client from the credentials in ``token.pickle``.

    Returns:
        The service object created by ``build``, or ``None`` when the token
        file cannot be loaded or the client cannot be built. The failure is
        printed rather than raised.
    """
    try:
        # NOTE(review): pickle.load can execute code embedded in the file, so
        # token.pickle must only ever be produced by a trusted process.
        with open("token.pickle", "rb") as token_file:
            credentials = pickle.load(token_file)
        service = build("drive", "v3", credentials=credentials)
    except Exception as err:
        print(f"β Drive auth failed: {err}")
        return None
    return service
def upload_intruder_face(image_path):
    """Upload the JPEG at *image_path* to the Drive intruder folder.

    The upload is best-effort: authentication or upload problems are printed
    and the function returns ``None`` either way.

    Args:
        image_path: Path of the image file to upload; its base name is used as
            the file name on Drive.
    """
    # NOTE(review): the leading 'π' / 'β' glyphs in the messages below look
    # like mis-decoded emoji; they are kept as found -- confirm the intended
    # characters.
    service = get_drive_service()
    if service is None:
        print("π Upload skipped (auth failed)")
        return

    metadata = {"name": os.path.basename(image_path), "parents": [INTRUDER_FOLDER_ID]}
    try:
        # Built inside the try so that a missing or unreadable file is reported
        # like any other upload failure instead of escaping this function.
        media = MediaFileUpload(image_path, mimetype="image/jpeg")
        service.files().create(body=metadata, media_body=media, fields="id").execute()
    except Exception as e:
        print(f"β Upload failed: {e}")
    else:
        # This literal was previously split across two source lines, which
        # left the f-string unterminated.
        print(f"β Uploaded to INTRUDER folder: {image_path}")
# === HELPERS ===
def get_drive_files(folder_id, allowed_mime, timeout=30):
    """List the files in a Drive folder whose MIME type is allowed.

    Args:
        folder_id: ID of the Drive folder to list.
        allowed_mime: Container of MIME type strings to keep.
        timeout: Seconds to wait for each HTTP request before giving up.

    Returns:
        A list of ``(name, file_id)`` tuples. A response without a ``files``
        entry (for example an API error payload) contributes nothing, so the
        result may be empty.
    """
    # Let requests encode the query instead of hand-building the URL.
    params = {
        "q": f"'{folder_id}' in parents",
        "key": API_KEY,
        "fields": "nextPageToken,files(id,name,mimeType)",
    }
    matches = []
    while True:
        response = requests.get(
            "https://www.googleapis.com/drive/v3/files",
            params=params,
            timeout=timeout,
        )
        payload = response.json()
        matches.extend(
            (entry["name"], entry["id"])
            for entry in payload.get("files", [])
            if entry["mimeType"] in allowed_mime
        )
        # The listing is paginated; keep following nextPageToken so folders
        # larger than one page are not silently truncated.
        page_token = payload.get("nextPageToken")
        if not page_token:
            return matches
        params["pageToken"] = page_token
def download_file(file_id, suffix, timeout=60):
    """Stream a Drive file into a new temporary file and return its path.

    The caller owns the returned file and is responsible for deleting it.

    Args:
        file_id: Drive ID of the file to download.
        suffix: File name suffix (e.g. ``".jpg"``) for the temporary file.
        timeout: Seconds to wait for the connection and for each read.

    Returns:
        Path of the temporary file holding the response body.

    NOTE(review): the HTTP status is not checked (as before), so an error page
    would be written to the file as-is -- confirm whether callers should fail
    fast instead.
    """
    # The context manager releases the streamed connection once the copy ends.
    with requests.get(
        "https://drive.google.com/uc",
        params={"id": file_id, "export": "download"},
        stream=True,
        timeout=timeout,
    ) as response:
        temp = tempfile.NamedTemporaryFile(delete=False, suffix=suffix)
        try:
            with temp:
                for chunk in response.iter_content(chunk_size=1024 * 1024):
                    temp.write(chunk)
        except Exception:
            # Do not leave a half-written file behind when streaming fails.
            os.remove(temp.name)
            raise
    return temp.name
def extract_face_embedding(image):
    """Return a 128x128 crop of the first usable face found in *image*.

    Despite the name, the result is not a learned embedding: it is the detected
    face region of the input, resized to 128x128 pixels.

    Args:
        image: Image array as produced by OpenCV (converted from BGR to RGB
            before detection), or ``None``.

    Returns:
        The resized face crop, or ``None`` when the image is missing or empty,
        no face is detected, or every detection lies outside the frame.
    """
    # cv2.imread returns None for unreadable files; treat that as "no face".
    if image is None or image.size == 0:
        return None

    rgb = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
    results = face_detector.process(rgb)
    if not results.detections:
        return None

    h, w = image.shape[:2]
    for det in results.detections:
        bbox = det.location_data.relative_bounding_box
        # Clamp to the frame: a box that extends past an edge would otherwise
        # yield negative indices (wrapping the slice) or an empty crop, and
        # cv2.resize raises on an empty input.
        x1 = max(0, int(bbox.xmin * w))
        y1 = max(0, int(bbox.ymin * h))
        x2 = min(w, int((bbox.xmin + bbox.width) * w))
        y2 = min(h, int((bbox.ymin + bbox.height) * h))
        if x2 <= x1 or y2 <= y1:
            continue
        return cv2.resize(image[y1:y2, x1:x2], (128, 128))
    return None
def is_match(face1, face2, *, threshold=30.0):
    """Tell whether two face crops are closer than *threshold*.

    The distance is the Euclidean (L2) norm of the element-wise difference,
    taken over all pixels and channels after converting both arrays to float
    (so unsigned pixel values cannot wrap around when subtracted).

    Args:
        face1: First face array, or ``None``.
        face2: Second face array of the same shape, or ``None``.
        threshold: Exclusive upper bound on the distance for a match.

    Returns:
        ``True`` when both faces are present and their distance is strictly
        below *threshold*; ``False`` otherwise.
    """
    if face1 is None or face2 is None:
        return False
    distance = np.linalg.norm(face1.astype("float") - face2.astype("float"))
    return bool(distance < threshold)
# === STREAMLIT UI ===
# Page flow: load the registered faces, scan every CCTV video frame by frame,
# upload each unmatched face as an intruder snapshot, then list the snapshots
# in the sidebar.
# NOTE(review): the leading 'π' / 'β' glyphs in the UI strings below look like
# mis-decoded emoji; they are kept as found -- confirm the intended characters.
st.set_page_config(page_title="Intruder Detection", layout="wide")
st.title("π Intruder Detection from CCTV Feed")
st.write("Streaming videos from Google Drive, detecting and matching faces.")

# Paths of the intruder snapshots saved during the scan (shown in the sidebar).
sidebar_faces = []

with st.spinner("Loading registered faces..."):
    registered = []
    reg_images = get_drive_files(REG_FOLDER_ID, ["image/jpeg", "image/png"])
    for name, fid in reg_images:
        img_path = download_file(fid, ".jpg")
        img = cv2.imread(img_path)
        # The pixels are in memory now; drop the downloaded copy.
        os.remove(img_path)
        if img is None:
            # Unreadable or non-image download: skip it instead of crashing.
            continue
        emb = extract_face_embedding(img)
        if emb is not None:
            registered.append((name, emb))

with st.spinner("Scanning CCTV feed..."):
    total_intruders = 0
    for folder_id in CCTVFEED_IDS:
        videos = get_drive_files(folder_id, ["video/mp4", "video/x-msvideo", "video/avi"])
        for vname, fid in videos:
            st.subheader(f"π₯ {vname}")
            vid_path = download_file(fid, ".mp4")
            cap = cv2.VideoCapture(vid_path)
            frame_count = 0
            matched_faces = 0
            unmatched_faces = 0
            try:
                while cap.isOpened():
                    ret, frame = cap.read()
                    if not ret:
                        break
                    frame_count += 1
                    face_img = extract_face_embedding(frame)
                    if face_img is None:
                        continue
                    if any(is_match(face_img, emb) for _, emb in registered):
                        matched_faces += 1
                        continue
                    unmatched_faces += 1
                    # Reserve a path and close the handle straight away so it
                    # is not leaked and cv2.imwrite can reopen the file.
                    with tempfile.NamedTemporaryFile(delete=False, suffix=".jpg") as snapshot:
                        snapshot_path = snapshot.name
                    cv2.imwrite(snapshot_path, face_img)
                    upload_intruder_face(snapshot_path)
                    # Kept on disk: the sidebar renders it from this path.
                    sidebar_faces.append(snapshot_path)
            finally:
                # Release the capture before removing the downloaded video,
                # even if processing a frame raised.
                cap.release()
                os.remove(vid_path)
            st.success(f"π― Processed {frame_count} frames β Matched: {matched_faces}, Unmatched: {unmatched_faces}")
            total_intruders += unmatched_faces

st.info(f"π― Completed processing all videos. Intruders detected: {total_intruders}")

with st.sidebar:
    st.markdown("## π¨ Intruder Snapshots")
    for face_path in sidebar_faces:
        st.image(face_path, width=160)
|