import streamlit as st import cv2 import numpy as np import requests import tempfile import mediapipe as mp import os import pickle import io from googleapiclient.discovery import build from googleapiclient.http import MediaFileUpload from google.auth.transport.requests import Request from PIL import Image from simple_salesforce import Salesforce from datetime import datetime import uuid # === SALESFORCE SETUP === sf = Salesforce( username='suriraja822@agentforce.com', password='Sati@1010', security_token='B9nS0HEyBE7YmWllqXCyiOJpY', domain='login' ) def log_intruder_to_salesforce(frame_id): try: sf.Hotel_Security_Alert__c.create({ 'Alert_Type__c': 'Critical', 'Detection_Timestamp__c': datetime.utcnow().isoformat(), 'HF_Detection_ID__c': f'intruder_{frame_id}_{uuid.uuid4()}', 'Confidence_Score__c': 0.90, 'Description__c': 'Detected unrecognized face from CCTV feed', 'Status__c': 'Active', 'Title__c': f'Intruder Detected Frame {frame_id}', 'Priority__c': 'High' }) except Exception as e: st.warning(f"⚠️ Failed to log to Salesforce: {e}") # === CONFIGURATION === API_KEY = "AIzaSyDojJrpauA0XZtCCDUuo9xeQHZQamYKsC4" CCTVFEED_IDS = ['1KJRkSf2SKEZ1mXS9_si65IwMBtjs6p4n'] REG_FOLDER_ID = '1qkcR7nQTEtiMH9OFUv2bGxVn08E3dKjF' INTRUDER_FOLDER_ID = '1PPAUWU-wMx7fek73p-hqPqYQypYtG8Ob' FRAME_SKIP = st.sidebar.slider("⏩ Frame Skip", 1, 15, 5) # === SETUP === mp_face_detection = mp.solutions.face_detection face_detector = mp_face_detection.FaceDetection(model_selection=0, min_detection_confidence=0.5) def mse(img1, img2): try: if img1.shape != img2.shape: img2 = cv2.resize(img2, (img1.shape[1], img1.shape[0])) return np.mean((img1.astype("float") - img2.astype("float")) ** 2) except Exception: return float("inf") def is_duplicate(img, img_list, threshold=200): return any(mse(img, existing) < threshold for existing in img_list) def get_drive_file_links(folder_id, filter_video=False): url = f"https://www.googleapis.com/drive/v3/files?q='{folder_id}'+in+parents&key={API_KEY}&fields=files(id,name,mimeType)" response = requests.get(url) files = response.json().get("files", []) links = [] for f in files: mime = f["mimeType"] if (not filter_video and mime.startswith("image/")) or (filter_video and mime.startswith("video/")): link = f"https://drive.google.com/uc?id={f['id']}&export=download" links.append((f["name"], link)) return links def download_file(link, suffix): response = requests.get(link, stream=True) temp = tempfile.NamedTemporaryFile(delete=False, suffix=suffix) for chunk in response.iter_content(1024 * 1024): temp.write(chunk) temp.close() return temp.name def detect_faces(image): rgb = cv2.cvtColor(image, cv2.COLOR_BGR2RGB) results = face_detector.process(rgb) faces = [] if results.detections: for det in results.detections: bbox = det.location_data.relative_bounding_box h, w, _ = image.shape x, y, bw, bh = int(bbox.xmin * w), int(bbox.ymin * h), int(bbox.width * w), int(bbox.height * h) crop = image[max(0, y):y+bh, max(0, x):x+bw] if crop.size > 0: faces.append(crop) return faces def upload_to_drive(image_np, filename): creds = None if os.path.exists("token.pickle"): with open("token.pickle", "rb") as token: creds = pickle.load(token) if not creds or not creds.valid: if creds and creds.expired and creds.refresh_token: creds.refresh(Request()) service = build('drive', 'v3', credentials=creds) image_pil = Image.fromarray(cv2.cvtColor(image_np, cv2.COLOR_BGR2RGB)) buf = io.BytesIO() image_pil.save(buf, format="JPEG") buf.seek(0) with tempfile.NamedTemporaryFile(delete=False, suffix=".jpg") as tmp: tmp.write(buf.read()) tmp.flush() file_metadata = {'name': filename, 'parents': [INTRUDER_FOLDER_ID]} media = MediaFileUpload(tmp.name, mimetype='image/jpeg') service.files().create(body=file_metadata, media_body=media, fields='id').execute() os.remove(tmp.name) def detect_faces_from_video(video_path, registered_faces): cap = cv2.VideoCapture(video_path) detected = [] unique = [] frame_id = 0 while cap.isOpened(): ret, frame = cap.read() if not ret: break frame_id += 1 if frame_id % FRAME_SKIP != 0: continue faces = detect_faces(frame) for f in faces: if not any(mse(f, reg) < 200 for reg in registered_faces) and not is_duplicate(f, unique): unique.append(f) filename = f"intruder_{frame_id}.jpg" upload_to_drive(f, filename) log_intruder_to_salesforce(frame_id) detected.extend(faces) cap.release() return detected, unique # === STREAMLIT UI === st.title("🔐 Intruder Detection") st.write("Streams from Google Drive and detects intruders using MediaPipe.") st.sidebar.header("📁 Registered Faces") reg_faces_raw = get_drive_file_links(REG_FOLDER_ID, filter_video=False) registered_faces = [] for name, link in reg_faces_raw: img_path = download_file(link, ".jpg") img = cv2.imread(img_path) faces = detect_faces(img) for f in faces: registered_faces.append(f) st.sidebar.image(cv2.cvtColor(f, cv2.COLOR_BGR2RGB), caption=name, width=100) os.remove(img_path) st.sidebar.header("📸 Detected Intruders") with st.spinner("Processing videos..."): total_uploaded = 0 for folder_id in CCTVFEED_IDS: vids = get_drive_file_links(folder_id, filter_video=True) for vname, vlink in vids: st.subheader(f"🎥 {vname}") vfile = download_file(vlink, ".mp4") detected_faces, unique_faces = detect_faces_from_video(vfile, registered_faces) for i, f in enumerate(unique_faces): st.sidebar.image(cv2.cvtColor(f, cv2.COLOR_BGR2RGB), caption=f"Intruder {i+1}", width=100) if unique_faces: st.success(f"✅ Uploaded {len(unique_faces)} intruder(s).") else: st.info("😎 No new intruders detected.") os.remove(vfile) total_uploaded += len(unique_faces) st.info("🎯 Completed processing all videos.")