Spaces:
Sleeping
Sleeping
File size: 6,564 Bytes
de236dc 37b0940 de236dc 58574bf de236dc ae58e6e 58574bf 86a3f51 8157937 c2ed860 1e53150 615bf6c 8140af8 462c66a c89d622 462c66a 54b3feb 462c66a ce25643 462c66a ce25643 462c66a de236dc c43aee1 58574bf 16e29fd 8157937 37b0940 615bf6c ae58e6e 27b0f1b 8157937 c89d622 8157937 c89d622 8157937 8140af8 8157937 615bf6c 1e53150 8140af8 1e53150 8157937 c2ed860 137bfc1 8157937 1e53150 8140af8 1e53150 8157937 8140af8 8157937 c2ed860 8157937 86a3f51 8157937 462c66a c89d622 8157937 86a3f51 8157937 86a3f51 8157937 1e53150 8157937 1e53150 8157937 8140af8 8157937 1e53150 8157937 c89d622 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 |
import streamlit as st
import cv2
import numpy as np
import requests
import tempfile
import mediapipe as mp
import os
import pickle
import io
from googleapiclient.discovery import build
from googleapiclient.http import MediaFileUpload
from google.auth.transport.requests import Request
from PIL import Image
from simple_salesforce import Salesforce
from datetime import datetime
import uuid
# === SALESFORCE SETUP ===
# NOTE(review): these credentials are committed in source. Rotate them and
# supply the real values through the environment variables below; the
# literals are kept only as fallbacks so existing deployments keep starting.
sf = Salesforce(
    username=os.environ.get('SF_USERNAME', 'suriraja822@agentforce.com'),
    password=os.environ.get('SF_PASSWORD', 'Sati@1010'),
    security_token=os.environ.get('SF_SECURITY_TOKEN', 'B9nS0HEyBE7YmWllqXCyiOJpY'),
    domain=os.environ.get('SF_DOMAIN', 'login'),
)
def log_intruder_to_salesforce(frame_id):
    """Create a Hotel_Security_Alert__c record for an intruder detection.

    Args:
        frame_id: Index of the video frame in which the face was found; it is
            embedded in the record's detection ID and title.

    Any exception raised while creating the record is reported through
    ``st.warning`` instead of being propagated, so a Salesforce failure does
    not stop the caller.
    """
    # Local import: the file only imports the ``datetime`` class at top level.
    from datetime import timezone

    try:
        sf.Hotel_Security_Alert__c.create({
            'Alert_Type__c': 'Critical',
            # Timezone-aware UTC timestamp; datetime.utcnow() is deprecated
            # and yields a naive value with no offset information.
            'Detection_Timestamp__c': datetime.now(timezone.utc).isoformat(),
            # The UUID suffix keeps IDs distinct when a frame yields several alerts.
            'HF_Detection_ID__c': f'intruder_{frame_id}_{uuid.uuid4()}',
            'Confidence_Score__c': 0.90,
            'Description__c': 'Detected unrecognized face from CCTV feed',
            'Status__c': 'Active',
            'Title__c': f'Intruder Detected Frame {frame_id}',
            'Priority__c': 'High'
        })
    except Exception as e:
        st.warning(f"⚠️ Failed to log to Salesforce: {e}")
# === CONFIGURATION ===
# NOTE(review): the API key is committed in source. Rotate it and provide the
# real value via the DRIVE_API_KEY environment variable; the literal is kept
# only as a fallback so existing deployments keep working.
API_KEY = os.environ.get("DRIVE_API_KEY", "AIzaSyDojJrpauA0XZtCCDUuo9xeQHZQamYKsC4")
# Google Drive folder IDs: CCTV video folders, registered-face images, and
# the destination folder for uploaded intruder crops.
CCTVFEED_IDS = ['1KJRkSf2SKEZ1mXS9_si65IwMBtjs6p4n']
REG_FOLDER_ID = '1qkcR7nQTEtiMH9OFUv2bGxVn08E3dKjF'
INTRUDER_FOLDER_ID = '1PPAUWU-wMx7fek73p-hqPqYQypYtG8Ob'
# Only every FRAME_SKIP-th frame of a video is analysed (slider: 1..15, default 5).
FRAME_SKIP = st.sidebar.slider("⏩ Frame Skip", 1, 15, 5)
# === SETUP ===
# One module-level MediaPipe face detector, shared by every detect_faces() call.
mp_face_detection = mp.solutions.face_detection
face_detector = mp_face_detection.FaceDetection(
    min_detection_confidence=0.5,
    model_selection=0,
)
def mse(img1, img2):
    """Return the mean squared error between two image arrays.

    If the shapes differ, ``img2`` is first resized to the width and height
    of ``img1``. Both arrays are converted to float before subtracting so
    unsigned integer pixels cannot wrap around. Any exception raised along
    the way yields ``float("inf")`` (i.e. "no match").
    """
    try:
        if img1.shape != img2.shape:
            target_size = (img1.shape[1], img1.shape[0])  # cv2 expects (width, height)
            img2 = cv2.resize(img2, target_size)
        delta = img1.astype("float") - img2.astype("float")
        return np.mean(delta ** 2)
    except Exception:
        return float("inf")
def is_duplicate(img, img_list, threshold=200):
    """Return True if ``img`` is within ``threshold`` MSE of any image in ``img_list``.

    Stops at the first match; an empty ``img_list`` gives False.
    """
    for candidate in img_list:
        if mse(img, candidate) < threshold:
            return True
    return False
def get_drive_file_links(folder_id, filter_video=False):
    """List downloadable image or video files in a Google Drive folder.

    Args:
        folder_id: ID of the Drive folder whose children are listed.
        filter_video: When True return only ``video/*`` files, otherwise
            only ``image/*`` files.

    Returns:
        A list of ``(name, download_link)`` tuples. If a listing request
        fails, a Streamlit warning is shown and whatever was collected so
        far (possibly nothing) is returned.
    """
    wanted_prefix = "video/" if filter_video else "image/"
    params = {
        "q": f"'{folder_id}' in parents",
        "key": API_KEY,
        # nextPageToken is requested so folders larger than one page are fully listed.
        "fields": "nextPageToken,files(id,name,mimeType)",
    }
    links = []
    while True:
        try:
            response = requests.get(
                "https://www.googleapis.com/drive/v3/files",
                params=params,
                timeout=30,
            )
            if not response.ok:
                # Report only the status code: the request URL carries the API key.
                st.warning(f"Drive listing failed for folder {folder_id}: HTTP {response.status_code}")
                break
            payload = response.json()
        except (requests.RequestException, ValueError) as exc:
            # Exception text may contain the full URL (and key), so show the type only.
            st.warning(f"Drive listing failed for folder {folder_id}: {type(exc).__name__}")
            break

        for f in payload.get("files", []):
            if f.get("mimeType", "").startswith(wanted_prefix):
                link = f"https://drive.google.com/uc?id={f['id']}&export=download"
                links.append((f["name"], link))

        page_token = payload.get("nextPageToken")
        if not page_token:
            break
        params["pageToken"] = page_token
    return links
def download_file(link, suffix):
    """Stream ``link`` into a new temporary file and return the file's path.

    Args:
        link: URL to download.
        suffix: File-name suffix (e.g. ``".jpg"``) for the temporary file.

    Returns:
        Path of the temporary file. The file is not deleted automatically;
        the caller is responsible for removing it.

    Raises:
        requests.RequestException: If the request fails, times out, or the
            server answers with an HTTP error status. The partially written
            temporary file is removed before the exception propagates.
    """
    temp = tempfile.NamedTemporaryFile(delete=False, suffix=suffix)
    try:
        # Both the temp file and the HTTP response are closed on every exit path.
        with temp, requests.get(link, stream=True, timeout=60) as response:
            # Avoid saving an HTTP error page as if it were the media file.
            response.raise_for_status()
            for chunk in response.iter_content(1024 * 1024):
                temp.write(chunk)
    except BaseException:
        os.remove(temp.name)
        raise
    return temp.name
def detect_faces(image):
    """Detect faces in a BGR image and return the cropped face regions.

    Args:
        image: Image array as produced by OpenCV, or None (for example when
            ``cv2.imread`` failed).

    Returns:
        A list of non-empty crops of ``image``, one per detection. Empty when
        ``image`` is None or empty, or when nothing is detected.
    """
    if image is None or image.size == 0:
        return []

    rgb = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
    results = face_detector.process(rgb)
    if not results.detections:
        return []

    h, w = image.shape[:2]
    faces = []
    for det in results.detections:
        bbox = det.location_data.relative_bounding_box
        # Relative box coordinates scaled to pixel units.
        x, y = int(bbox.xmin * w), int(bbox.ymin * h)
        bw, bh = int(bbox.width * w), int(bbox.height * h)
        # Clamp both ends at 0: a negative end index would wrap around and
        # return an unrelated region instead of an empty crop.
        crop = image[max(0, y):max(0, y + bh), max(0, x):max(0, x + bw)]
        if crop.size > 0:
            faces.append(crop)
    return faces
def upload_to_drive(image_np, filename):
    """Upload an image as a JPEG into the intruder Drive folder.

    Args:
        image_np: BGR image array to upload.
        filename: Name given to the file created in Drive.

    Credentials are read from ``token.pickle`` in the working directory when
    that file exists, and refreshed if they are expired and carry a refresh
    token. The image is written to a temporary JPEG, which is removed again
    whether or not the upload succeeds.
    """
    creds = None
    if os.path.exists("token.pickle"):
        # NOTE(review): pickle.load can execute arbitrary code; this is only
        # safe while token.pickle is written exclusively by trusted tooling.
        with open("token.pickle", "rb") as token:
            creds = pickle.load(token)
    if creds and not creds.valid and creds.expired and creds.refresh_token:
        creds.refresh(Request())
    service = build('drive', 'v3', credentials=creds)

    image_pil = Image.fromarray(cv2.cvtColor(image_np, cv2.COLOR_BGR2RGB))
    # Encode straight into the temp file and close it before uploading.
    with tempfile.NamedTemporaryFile(delete=False, suffix=".jpg") as tmp:
        image_pil.save(tmp, format="JPEG")
        tmp_path = tmp.name
    try:
        file_metadata = {'name': filename, 'parents': [INTRUDER_FOLDER_ID]}
        media = MediaFileUpload(tmp_path, mimetype='image/jpeg')
        service.files().create(body=file_metadata, media_body=media, fields='id').execute()
    finally:
        # Do not leak the temp file when the upload raises.
        os.remove(tmp_path)
def detect_faces_from_video(video_path, registered_faces):
    """Scan a video for faces that match no registered face.

    Every ``FRAME_SKIP``-th frame is analysed. A face that is neither close
    to a registered face nor to an intruder already found in this video is
    recorded, uploaded to Drive and logged to Salesforce.

    Args:
        video_path: Path of the video file to read.
        registered_faces: Face crops of known people.

    Returns:
        ``(detected, unique)``: every face crop found in the analysed frames,
        and the de-duplicated crops treated as intruders.
    """
    cap = cv2.VideoCapture(video_path)
    detected = []
    unique = []
    frame_id = 0
    try:
        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                break
            frame_id += 1
            if frame_id % FRAME_SKIP != 0:
                continue
            faces = detect_faces(frame)
            for face in faces:
                # Same MSE threshold (is_duplicate's default) for both checks.
                if is_duplicate(face, registered_faces) or is_duplicate(face, unique):
                    continue
                unique.append(face)
                filename = f"intruder_{frame_id}.jpg"
                upload_to_drive(face, filename)
                log_intruder_to_salesforce(frame_id)
            detected.extend(faces)
    finally:
        # Release the capture even if detection, upload or logging raises.
        cap.release()
    return detected, unique
# === STREAMLIT UI ===
st.title("π Intruder Detection")
st.write("Streams from Google Drive and detects intruders using MediaPipe.")
st.sidebar.header("π Registered Faces")

# Download every image in the registered-faces folder, extract the faces and
# show each one in the sidebar.
reg_faces_raw = get_drive_file_links(REG_FOLDER_ID, filter_video=False)
registered_faces = []
for name, link in reg_faces_raw:
    img_path = download_file(link, ".jpg")
    try:
        img = cv2.imread(img_path)
        if img is None:
            # cv2.imread returns None for unreadable or non-image content.
            st.sidebar.warning(f"Could not read registered image: {name}")
            continue
        faces = detect_faces(img)
        for f in faces:
            registered_faces.append(f)
            st.sidebar.image(cv2.cvtColor(f, cv2.COLOR_BGR2RGB), caption=name, width=100)
    finally:
        # Remove the temp download even when decoding or detection fails.
        os.remove(img_path)
st.sidebar.header("📸 Detected Intruders")
# Process every video in every CCTV folder and show the intruders found.
with st.spinner("Processing videos..."):
    total_uploaded = 0
    for folder_id in CCTVFEED_IDS:
        vids = get_drive_file_links(folder_id, filter_video=True)
        for vname, vlink in vids:
            st.subheader(f"🎥 {vname}")
            vfile = download_file(vlink, ".mp4")
            try:
                detected_faces, unique_faces = detect_faces_from_video(vfile, registered_faces)
            finally:
                # The video is only needed during detection; remove it even on failure.
                os.remove(vfile)
            for i, f in enumerate(unique_faces):
                st.sidebar.image(cv2.cvtColor(f, cv2.COLOR_BGR2RGB), caption=f"Intruder {i+1}", width=100)
            if unique_faces:
                st.success(f"✅ Uploaded {len(unique_faces)} intruder(s).")
            else:
                st.info("π No new intruders detected.")
            total_uploaded += len(unique_faces)
st.info("🎯 Completed processing all videos.")
|