# Spaces: Sleeping (page-header residue captured with the file; not part of the program)
import io
import os
import pickle
import tempfile
import uuid
from datetime import datetime, timezone

import cv2
import mediapipe as mp
import numpy as np
import requests
import streamlit as st
from google.auth.transport.requests import Request
from googleapiclient.discovery import build
from googleapiclient.http import MediaFileUpload
from PIL import Image
from simple_salesforce import Salesforce
# === SALESFORCE SETUP ===
# SECURITY: the Salesforce username, password and security token used to be
# hard-coded at this spot. Anything that was ever committed must be treated as
# leaked and rotated. The credentials are now read from the environment so
# they never live in source control.
_SF_ENV_VARS = ("SF_USERNAME", "SF_PASSWORD", "SF_SECURITY_TOKEN")
_missing_sf_vars = [name for name in _SF_ENV_VARS if not os.environ.get(name)]
if _missing_sf_vars:
    # Fail fast with an actionable message instead of an opaque login error.
    raise RuntimeError(
        "Missing Salesforce credentials; set environment variable(s): "
        + ", ".join(_missing_sf_vars)
    )

# Module-level Salesforce client used by log_intruder_to_salesforce().
sf = Salesforce(
    username=os.environ["SF_USERNAME"],
    password=os.environ["SF_PASSWORD"],
    security_token=os.environ["SF_SECURITY_TOKEN"],
    # Defaults to the previously hard-coded 'login' domain.
    domain=os.environ.get("SF_DOMAIN", "login"),
)
def log_intruder_to_salesforce(frame_id):
    """Create a ``Hotel_Security_Alert__c`` record for an unrecognised face.

    Logging is best-effort: if the Salesforce call fails for any reason the
    error is shown as a Streamlit warning and processing continues.

    Args:
        frame_id: Index of the video frame in which the face was seen. It is
            embedded in both the detection ID and the alert title.
    """
    # Use a timezone-aware UTC timestamp: ``datetime.utcnow()`` is deprecated
    # and returns a naive value that carries no offset information.
    detected_at = datetime.now(timezone.utc).isoformat()
    alert = {
        'Alert_Type__c': 'Critical',
        'Detection_Timestamp__c': detected_at,
        # The random UUID keeps IDs unique when a frame yields several alerts.
        'HF_Detection_ID__c': f'intruder_{frame_id}_{uuid.uuid4()}',
        'Confidence_Score__c': 0.90,
        'Description__c': 'Detected unrecognized face from CCTV feed',
        'Status__c': 'Active',
        'Title__c': f'Intruder Detected Frame {frame_id}',
        'Priority__c': 'High',
    }
    try:
        sf.Hotel_Security_Alert__c.create(alert)
    except Exception as e:  # deliberate catch-all: alerting must not stop detection
        st.warning(f"⚠️ Failed to log to Salesforce: {e}")
# === CONFIGURATION ===
# SECURITY: the Google API key used to be hard-coded at this spot. Treat any
# committed key as leaked and rotate it. It is now read from the environment.
API_KEY = os.environ.get("GOOGLE_API_KEY")
if not API_KEY:
    raise RuntimeError(
        "Missing Google API key; set the GOOGLE_API_KEY environment variable."
    )

# Google Drive folder IDs: CCTV video folders to scan, the folder holding
# images of registered people, and the folder intruder crops are uploaded to.
CCTVFEED_IDS = ['1KJRkSf2SKEZ1mXS9_si65IwMBtjs6p4n']
REG_FOLDER_ID = '1qkcR7nQTEtiMH9OFUv2bGxVn08E3dKjF'
INTRUDER_FOLDER_ID = '1PPAUWU-wMx7fek73p-hqPqYQypYtG8Ob'

# Only every FRAME_SKIP-th frame is analysed (sidebar slider: 1..15, default 5).
FRAME_SKIP = st.sidebar.slider("⏩ Frame Skip", 1, 15, 5)

# === SETUP ===
# A single MediaPipe face detector shared by every detect_faces() call.
mp_face_detection = mp.solutions.face_detection
face_detector = mp_face_detection.FaceDetection(
    model_selection=0,
    min_detection_confidence=0.5,
)
def mse(img1, img2):
    """Return the mean squared error between two images.

    When the shapes differ, ``img2`` is first resized to the width and height
    of ``img1``. Any failure along the way (for example a ``None`` input)
    yields ``float("inf")`` so the pair is treated as maximally different.
    """
    try:
        candidate = img2
        if img1.shape != candidate.shape:
            height, width = img1.shape[0], img1.shape[1]
            # cv2.resize takes the target size as (width, height).
            candidate = cv2.resize(candidate, (width, height))
        delta = img1.astype("float") - candidate.astype("float")
        return np.mean(delta ** 2)
    except Exception:
        return float("inf")
def is_duplicate(img, img_list, threshold=200):
    """Return True if ``img`` is within ``threshold`` MSE of any image in ``img_list``."""
    for known in img_list:
        if mse(img, known) < threshold:
            return True
    return False
def get_drive_file_links(folder_id, filter_video=False):
    """List downloadable image or video files in a Google Drive folder.

    Args:
        folder_id: ID of the Drive folder whose children are listed.
        filter_video: When true, return files whose MIME type starts with
            ``video/``; otherwise return those starting with ``image/``.

    Returns:
        A list of ``(name, download_url)`` tuples.

    Raises:
        requests.RequestException: If a request fails, times out, or the API
            answers with an HTTP error status (previously such errors were
            silently reported as an empty folder).
    """
    wanted_prefix = "video/" if filter_video else "image/"
    # Let requests build and escape the query string instead of formatting
    # the URL by hand.
    params = {
        "q": f"'{folder_id}' in parents",
        "key": API_KEY,
        "fields": "nextPageToken,files(id,name,mimeType)",
    }
    links = []
    while True:
        response = requests.get(
            "https://www.googleapis.com/drive/v3/files",
            params=params,
            timeout=30,
        )
        response.raise_for_status()
        payload = response.json()
        for file_info in payload.get("files", []):
            if file_info["mimeType"].startswith(wanted_prefix):
                link = f"https://drive.google.com/uc?id={file_info['id']}&export=download"
                links.append((file_info["name"], link))
        # The listing is paginated; keep following the token until exhausted
        # so folders larger than one page are not truncated.
        next_token = payload.get("nextPageToken")
        if not next_token:
            return links
        params["pageToken"] = next_token
def download_file(link, suffix):
    """Stream ``link`` into a new temporary file and return the file's path.

    The caller owns the returned file and is responsible for deleting it.

    Args:
        link: URL to download.
        suffix: File-name suffix (for example ``".jpg"``) for the temp file.

    Returns:
        Path of the temporary file holding the downloaded bytes.

    Raises:
        requests.RequestException: If the request fails, times out, or the
            server answers with an HTTP error status. The partially written
            temporary file is removed before the exception propagates.
    """
    temp = tempfile.NamedTemporaryFile(delete=False, suffix=suffix)
    try:
        # Both the temp file and the HTTP response are closed on exit.
        with temp, requests.get(link, stream=True, timeout=60) as response:
            # Do not save an HTTP error page as if it were the media file.
            response.raise_for_status()
            for chunk in response.iter_content(1024 * 1024):
                temp.write(chunk)
    except Exception:
        os.remove(temp.name)
        raise
    return temp.name
def detect_faces(image):
    """Detect faces in a BGR image and return the cropped face regions.

    Args:
        image: BGR image array, as returned by ``cv2.imread`` or
            ``VideoCapture.read``. ``None`` or an empty array yields ``[]``.

    Returns:
        A list of non-empty crops of ``image``, one per detected face whose
        bounding box overlaps the frame.
    """
    # cv2.imread returns None for unreadable files; treat that as "no faces".
    if image is None or image.size == 0:
        return []

    rgb = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
    results = face_detector.process(rgb)
    if not results.detections:
        return []

    h, w = image.shape[:2]
    faces = []
    for det in results.detections:
        bbox = det.location_data.relative_bounding_box
        # The box is relative to the image size; scale it to pixels.
        x, y = int(bbox.xmin * w), int(bbox.ymin * h)
        bw, bh = int(bbox.width * w), int(bbox.height * h)
        # Clamp both ends of the box to the frame. Clamping only the start
        # would let a negative end index wrap around and produce a wrong crop
        # when the box lies entirely above or left of the frame.
        x0, y0 = max(0, x), max(0, y)
        x1, y1 = min(w, x + bw), min(h, y + bh)
        if x1 <= x0 or y1 <= y0:
            continue
        faces.append(image[y0:y1, x0:x1])
    return faces
def upload_to_drive(image_np, filename):
    """Upload a BGR image to the intruder folder on Google Drive as a JPEG.

    Credentials are loaded from ``token.pickle`` in the working directory if
    that file exists, and refreshed when expired and a refresh token is
    available. Otherwise ``build`` is called with ``credentials=None``.

    Args:
        image_np: BGR image array to upload.
        filename: Name given to the file created in Drive.
    """
    creds = None
    if os.path.exists("token.pickle"):
        # SECURITY(review): unpickling executes arbitrary code if the file is
        # tampered with. Only load a token file this app wrote itself.
        with open("token.pickle", "rb") as token:
            creds = pickle.load(token)
    if creds and not creds.valid and creds.expired and creds.refresh_token:
        creds.refresh(Request())

    service = build('drive', 'v3', credentials=creds)
    image_pil = Image.fromarray(cv2.cvtColor(image_np, cv2.COLOR_BGR2RGB))

    tmp = tempfile.NamedTemporaryFile(delete=False, suffix=".jpg")
    try:
        # Write and close the file before handing its path to the uploader.
        with tmp:
            image_pil.save(tmp, format="JPEG")
        file_metadata = {'name': filename, 'parents': [INTRUDER_FOLDER_ID]}
        media = MediaFileUpload(tmp.name, mimetype='image/jpeg')
        service.files().create(body=file_metadata, media_body=media, fields='id').execute()
    finally:
        # Remove the temp file even when the upload raises.
        os.remove(tmp.name)
def detect_faces_from_video(video_path, registered_faces):
    """Scan a video for faces that match no registered face.

    Every ``FRAME_SKIP``-th frame is analysed. A face that is neither close
    to a registered face nor to an intruder already found in this video is
    uploaded to Drive and logged to Salesforce.

    Args:
        video_path: Path of the video file to read.
        registered_faces: Face crops of known people to compare against.

    Returns:
        A ``(detected, unique)`` tuple: every face crop found in the analysed
        frames, and the subset treated as new intruders.
    """
    cap = cv2.VideoCapture(video_path)
    detected = []
    unique = []
    frame_id = 0
    try:
        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                break
            frame_id += 1
            if frame_id % FRAME_SKIP != 0:
                continue
            faces = detect_faces(frame)
            for face in faces:
                # Use the shared helper (default threshold 200) for both
                # checks instead of repeating the comparison inline.
                if is_duplicate(face, registered_faces) or is_duplicate(face, unique):
                    continue
                unique.append(face)
                filename = f"intruder_{frame_id}.jpg"
                upload_to_drive(face, filename)
                log_intruder_to_salesforce(frame_id)
            detected.extend(faces)
    finally:
        # Release the capture even if detection or the upload raises.
        cap.release()
    return detected, unique
# === STREAMLIT UI ===
# NOTE(review): several emoji prefixes in this section were mis-encoded. Those
# whose original could be recovered are restored below; the remaining ones
# ("π ", "β ") are ambiguous and are left exactly as found -- replace them
# with the intended emoji.
st.title("π Intruder Detection")
st.write("Streams from Google Drive and detects intruders using MediaPipe.")

# --- Load registered faces and show them in the sidebar ---
st.sidebar.header("π Registered Faces")
reg_faces_raw = get_drive_file_links(REG_FOLDER_ID, filter_video=False)
registered_faces = []
for name, link in reg_faces_raw:
    img_path = download_file(link, ".jpg")
    try:
        img = cv2.imread(img_path)
        if img is None:
            # cv2.imread returns None when the download is not a readable
            # image; skip it instead of crashing the whole run.
            st.sidebar.warning(f"⚠️ Could not read registered image: {name}")
            continue
        for f in detect_faces(img):
            registered_faces.append(f)
            st.sidebar.image(cv2.cvtColor(f, cv2.COLOR_BGR2RGB), caption=name, width=100)
    finally:
        # Always delete the temporary download, even on failure.
        os.remove(img_path)

# --- Scan every CCTV video and report intruders ---
st.sidebar.header("📸 Detected Intruders")
with st.spinner("Processing videos..."):
    total_uploaded = 0  # running tally across all videos
    for folder_id in CCTVFEED_IDS:
        vids = get_drive_file_links(folder_id, filter_video=True)
        for vname, vlink in vids:
            st.subheader(f"🎥 {vname}")
            vfile = download_file(vlink, ".mp4")
            try:
                detected_faces, unique_faces = detect_faces_from_video(vfile, registered_faces)
            finally:
                # The video is no longer needed once it has been scanned.
                os.remove(vfile)
            for i, f in enumerate(unique_faces):
                st.sidebar.image(cv2.cvtColor(f, cv2.COLOR_BGR2RGB), caption=f"Intruder {i+1}", width=100)
            if unique_faces:
                st.success(f"β Uploaded {len(unique_faces)} intruder(s).")
            else:
                st.info("π No new intruders detected.")
            total_uploaded += len(unique_faces)
st.info("🎯 Completed processing all videos.")