# FaceDetection / app.py
# SuriRaja's picture
# Update app.py
# ce25643 verified
import streamlit as st
import cv2
import numpy as np
import requests
import tempfile
import mediapipe as mp
import os
import pickle
import io
from googleapiclient.discovery import build
from googleapiclient.http import MediaFileUpload
from google.auth.transport.requests import Request
from PIL import Image
from simple_salesforce import Salesforce
from datetime import datetime
import uuid
# === SALESFORCE SETUP ===
# Credentials are read from environment variables (for example secrets
# configured on the hosting platform) so that no username, password or
# security token is stored in the source file. A missing variable raises
# KeyError naming it, which fails fast at startup.
# NOTE: any credentials that were previously committed in this file should be
# treated as leaked and rotated.
sf = Salesforce(
    username=os.environ["SF_USERNAME"],
    password=os.environ["SF_PASSWORD"],
    security_token=os.environ["SF_SECURITY_TOKEN"],
    # 'login' is the domain this script has always used; override via SF_DOMAIN.
    domain=os.environ.get("SF_DOMAIN", "login"),
)
def log_intruder_to_salesforce(frame_id):
    """Create a ``Hotel_Security_Alert__c`` record for an unrecognized face.

    The record is stamped with the current UTC time (naive ISO-8601 string),
    a detection ID combining ``frame_id`` with a random UUID, and fixed
    alert type, confidence, description, status and priority values.

    Args:
        frame_id: Identifier of the frame in which the face was detected; it
            is embedded in the detection ID and in the alert title.

    Returns:
        None. Any exception raised while building or sending the record is
        reported with ``st.warning`` rather than propagated.
    """
    try:
        detected_at = datetime.utcnow().isoformat()
        detection_id = f'intruder_{frame_id}_{uuid.uuid4()}'
        alert_fields = {
            'Alert_Type__c': 'Critical',
            'Detection_Timestamp__c': detected_at,
            'HF_Detection_ID__c': detection_id,
            'Confidence_Score__c': 0.90,
            'Description__c': 'Detected unrecognized face from CCTV feed',
            'Status__c': 'Active',
            'Title__c': f'Intruder Detected Frame {frame_id}',
            'Priority__c': 'High'
        }
        sf.Hotel_Security_Alert__c.create(alert_fields)
    except Exception as e:
        st.warning(f"⚠️ Failed to log to Salesforce: {e}")
# === CONFIGURATION ===
# The Google API key is read from the environment so that it is not stored in
# the source file; a missing variable raises KeyError at startup.
# NOTE: a key that was previously committed here should be revoked and replaced.
API_KEY = os.environ["GOOGLE_API_KEY"]
# Drive folder IDs: CCTV video folders to scan, the folder of registered
# (known) face images, and the folder that receives intruder snapshots.
CCTVFEED_IDS = ['1KJRkSf2SKEZ1mXS9_si65IwMBtjs6p4n']
REG_FOLDER_ID = '1qkcR7nQTEtiMH9OFUv2bGxVn08E3dKjF'
INTRUDER_FOLDER_ID = '1PPAUWU-wMx7fek73p-hqPqYQypYtG8Ob'
# Only every FRAME_SKIP-th video frame is analysed (slider range 1-15, default 5).
FRAME_SKIP = st.sidebar.slider("⏩ Frame Skip", 1, 15, 5)
# === SETUP ===
# One shared MediaPipe face detector, reused for every image and video frame.
mp_face_detection = mp.solutions.face_detection
face_detector = mp_face_detection.FaceDetection(model_selection=0, min_detection_confidence=0.5)
def mse(img1, img2):
    """Return the mean squared error between two images.

    When the shapes differ, ``img2`` is first resized to the width and height
    of ``img1``. Both images are converted to float before subtraction.

    Args:
        img1: Reference image array.
        img2: Image array to compare against ``img1``.

    Returns:
        The mean of the squared element-wise differences, or ``float("inf")``
        if any step of the comparison raises an exception.
    """
    try:
        if img1.shape == img2.shape:
            candidate = img2
        else:
            target_size = (img1.shape[1], img1.shape[0])  # (width, height)
            candidate = cv2.resize(img2, target_size)
        diff = img1.astype("float") - candidate.astype("float")
        return np.mean(diff ** 2)
    except Exception:
        # Deliberate best effort: images that cannot be compared never match.
        return float("inf")
def is_duplicate(img, img_list, threshold=200):
    """Report whether ``img`` closely matches any image in ``img_list``.

    Args:
        img: Image to look for.
        img_list: Iterable of images to compare against.
        threshold: An image counts as a match when its ``mse`` against
            ``img`` is strictly below this value.

    Returns:
        True as soon as one match is found, otherwise False (including when
        ``img_list`` is empty).
    """
    for candidate in img_list:
        if mse(img, candidate) < threshold:
            return True
    return False
def get_drive_file_links(folder_id, filter_video=False):
    """List download links for the images or videos in a Google Drive folder.

    Calls the Drive v3 ``files`` listing endpoint with the module-level
    ``API_KEY`` and follows ``nextPageToken`` so folders with more entries
    than one result page are listed completely.

    Args:
        folder_id: ID of the Drive folder whose children are listed.
        filter_video: When true, keep only files whose MIME type starts with
            ``video/``; otherwise keep only files starting with ``image/``.

    Returns:
        A list of ``(name, download_url)`` tuples. If a request fails or the
        response is not valid JSON, a Streamlit warning is shown and the links
        collected so far (possibly none) are returned.
    """
    wanted_prefix = "video/" if filter_video else "image/"
    # Passing params lets requests URL-encode the query instead of hand-building it.
    params = {
        "q": f"'{folder_id}' in parents",
        "key": API_KEY,
        "fields": "nextPageToken,files(id,name,mimeType)",
    }
    links = []
    while True:
        try:
            response = requests.get(
                "https://www.googleapis.com/drive/v3/files",
                params=params,
                timeout=30,
            )
            if not response.ok:
                st.warning(
                    f"Failed to list Drive folder {folder_id}: HTTP {response.status_code}"
                )
                break
            payload = response.json()
        except (requests.RequestException, ValueError) as exc:
            # Only the exception type is shown: requests error messages embed
            # the request URL, which contains the API key.
            st.warning(
                f"Failed to list Drive folder {folder_id}: {type(exc).__name__}"
            )
            break
        for f in payload.get("files", []):
            if f.get("mimeType", "").startswith(wanted_prefix):
                link = f"https://drive.google.com/uc?id={f['id']}&export=download"
                links.append((f["name"], link))
        page_token = payload.get("nextPageToken")
        if not page_token:
            break
        params["pageToken"] = page_token
    return links
def download_file(link, suffix):
    """Stream ``link`` into a new temporary file and return the file's path.

    The temporary file is not deleted automatically; the caller is
    responsible for removing it once it is no longer needed.

    Args:
        link: URL to download.
        suffix: File name suffix (for example ``".jpg"``) for the temp file.

    Returns:
        Path of the temporary file containing the downloaded bytes.

    Raises:
        requests.RequestException: On connection failure, timeout, or a
            non-success HTTP status. The partial temp file is removed first.
    """
    temp = tempfile.NamedTemporaryFile(delete=False, suffix=suffix)
    try:
        with temp, requests.get(link, stream=True, timeout=30) as response:
            # Refuse error responses so an HTML error page is never saved as media.
            response.raise_for_status()
            for chunk in response.iter_content(1024 * 1024):
                temp.write(chunk)
    except BaseException:
        # BaseException so the partial file is also cleaned up when the script
        # is interrupted mid-download; the exception is always re-raised.
        os.remove(temp.name)
        raise
    return temp.name
def detect_faces(image):
    """Detect faces in a BGR image and return one cropped array per face.

    Args:
        image: BGR image array, or ``None`` (which ``cv2.imread`` returns for
            an unreadable file). ``None`` or a zero-size array yields no faces.

    Returns:
        A list of crops sliced from ``image``, one per detection whose
        bounding box overlaps the image. Boxes are clipped to the image
        bounds; boxes lying entirely outside it are skipped.
    """
    if image is None or getattr(image, "size", 0) == 0:
        return []
    rgb = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
    results = face_detector.process(rgb)
    if not results.detections:
        return []
    h, w = image.shape[:2]
    faces = []
    for det in results.detections:
        # The bounding box is relative to the image size; scale it to pixels.
        bbox = det.location_data.relative_bounding_box
        x, y = int(bbox.xmin * w), int(bbox.ymin * h)
        bw, bh = int(bbox.width * w), int(bbox.height * h)
        # Clip both corners: a negative end index would otherwise wrap around
        # and slice from the opposite edge of the image.
        x0, y0 = max(0, x), max(0, y)
        x1, y1 = min(w, x + bw), min(h, y + bh)
        if x1 <= x0 or y1 <= y0:
            continue
        faces.append(image[y0:y1, x0:x1])
    return faces
def upload_to_drive(image_np, filename):
    """Upload a BGR image as a JPEG into the intruder folder on Google Drive.

    Credentials are loaded from ``token.pickle`` in the working directory when
    that file exists, and refreshed if they are expired and carry a refresh
    token. The image is written to a temporary JPEG file, uploaded with
    ``INTRUDER_FOLDER_ID`` as parent, and the temporary file is always removed.

    Args:
        image_np: Image array in BGR channel order.
        filename: Name to give the file on Drive.

    Raises:
        Whatever the Drive client, PIL or the filesystem raise; nothing is
        caught here.
    """
    creds = None
    if os.path.exists("token.pickle"):
        # NOTE: pickle.load can execute arbitrary code from the file, so
        # token.pickle must only ever be a trusted, locally written cache.
        with open("token.pickle", "rb") as token:
            creds = pickle.load(token)
    if creds and not creds.valid and creds.expired and creds.refresh_token:
        creds.refresh(Request())
    # NOTE(review): creds may still be None or invalid here; the client is
    # built regardless, as before — confirm the intended fallback.
    service = build('drive', 'v3', credentials=creds)
    image_pil = Image.fromarray(cv2.cvtColor(image_np, cv2.COLOR_BGR2RGB))
    tmp = tempfile.NamedTemporaryFile(delete=False, suffix=".jpg")
    try:
        # Encode straight into the temp file and close it before uploading.
        with tmp:
            image_pil.save(tmp, format="JPEG")
        file_metadata = {'name': filename, 'parents': [INTRUDER_FOLDER_ID]}
        media = MediaFileUpload(tmp.name, mimetype='image/jpeg')
        service.files().create(body=file_metadata, media_body=media, fields='id').execute()
    finally:
        # Remove the temp file even when encoding or the upload fails.
        os.remove(tmp.name)
def detect_faces_from_video(video_path, registered_faces, threshold=200):
    """Scan a video for faces that match no registered face.

    Every ``FRAME_SKIP``-th frame is passed to ``detect_faces``. A face that
    matches neither a registered face nor an intruder already found in this
    video is uploaded to Drive and logged to Salesforce.

    Args:
        video_path: Path of the video file to read.
        registered_faces: Face crops of known people.
        threshold: ``mse`` value below which two faces count as the same;
            used for both the registered and the already-seen comparison.

    Returns:
        A tuple ``(detected, unique)``: all face crops found in the analysed
        frames, and the subset reported as new intruders.
    """
    cap = cv2.VideoCapture(video_path)
    detected = []
    unique = []
    frame_id = 0
    try:
        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                break
            frame_id += 1
            if frame_id % FRAME_SKIP != 0:
                continue
            faces = detect_faces(frame)
            for face in faces:
                if is_duplicate(face, registered_faces, threshold):
                    continue
                if is_duplicate(face, unique, threshold):
                    continue
                unique.append(face)
                filename = f"intruder_{frame_id}.jpg"
                upload_to_drive(face, filename)
                log_intruder_to_salesforce(frame_id)
            detected.extend(faces)
    finally:
        # Release the capture even if detection, upload or logging raises.
        cap.release()
    return detected, unique
# === STREAMLIT UI ===
st.title("πŸ” Intruder Detection")
st.write("Streams from Google Drive and detects intruders using MediaPipe.")
st.sidebar.header("πŸ“ Registered Faces")
# Build the list of known faces from every image in the registered folder.
reg_faces_raw = get_drive_file_links(REG_FOLDER_ID, filter_video=False)
registered_faces = []
for name, link in reg_faces_raw:
    try:
        img_path = download_file(link, ".jpg")
    except requests.RequestException as exc:
        st.sidebar.warning(f"Could not download {name}: {exc}")
        continue
    try:
        img = cv2.imread(img_path)
    finally:
        # The pixels are in memory now (or unreadable); drop the temp file either way.
        os.remove(img_path)
    if img is None:
        # cv2.imread returns None when the file cannot be decoded as an image.
        st.sidebar.warning(f"Could not read registered image: {name}")
        continue
    faces = detect_faces(img)
    for f in faces:
        registered_faces.append(f)
        st.sidebar.image(cv2.cvtColor(f, cv2.COLOR_BGR2RGB), caption=name, width=100)
st.sidebar.header("πŸ“Έ Detected Intruders")
# Download each CCTV video, scan it for unregistered faces, and show the results.
with st.spinner("Processing videos..."):
    total_uploaded = 0
    for folder_id in CCTVFEED_IDS:
        vids = get_drive_file_links(folder_id, filter_video=True)
        for vname, vlink in vids:
            st.subheader(f"πŸŽ₯ {vname}")
            try:
                vfile = download_file(vlink, ".mp4")
            except requests.RequestException as exc:
                st.warning(f"Could not download {vname}: {exc}")
                continue
            try:
                detected_faces, unique_faces = detect_faces_from_video(vfile, registered_faces)
            finally:
                # Remove the temp video even when detection or upload raises.
                os.remove(vfile)
            for i, f in enumerate(unique_faces):
                st.sidebar.image(cv2.cvtColor(f, cv2.COLOR_BGR2RGB), caption=f"Intruder {i+1}", width=100)
            if unique_faces:
                st.success(f"βœ… Uploaded {len(unique_faces)} intruder(s).")
            else:
                st.info("😎 No new intruders detected.")
            total_uploaded += len(unique_faces)
st.info("🎯 Completed processing all videos.")