FaceDetection / app.py
SuriRaja's picture
Update app.py
9b6a35d verified
raw
history blame
4.66 kB
import streamlit as st
import cv2
import numpy as np
import requests
import tempfile
import mediapipe as mp
import os
from PIL import Image
from io import BytesIO
# === CONFIGURATION ===
API_KEY = "AIzaSyDojJrpauA0XZtCCDUuo9xeQHZQamYKsC4"
CCTVFEED_IDS = ['1KJRkSf2SKEZ1mXS9_si65IwMBtjs6p4n']
REG_FOLDER_ID = '1qkcR7nQTEtiMH9OFUv2bGxVn08E3dKjF'
INTRUDER_FOLDER_ID = '1PPAUWU-wMx7fek73p-hqPqYQypYtG8Ob'
# === SETUP ===
mp_face_detection = mp.solutions.face_detection
mp_drawing = mp.solutions.drawing_utils
face_detector = mp_face_detection.FaceDetection(model_selection=0, min_detection_confidence=0.5)
# === FUNCTIONS ===
def list_drive_files(folder_id):
url = f"https://www.googleapis.com/drive/v3/files?q='{folder_id}'+in+parents&key={API_KEY}&fields=files(id,name,mimeType)"
response = requests.get(url)
return response.json().get("files", [])
def stream_drive_file(file_id, suffix):
url = f"https://drive.google.com/uc?id={file_id}&export=download"
response = requests.get(url, stream=True)
temp = tempfile.NamedTemporaryFile(delete=False, suffix=suffix)
for chunk in response.iter_content(1024 * 1024):
temp.write(chunk)
temp.close()
return temp.name
def mse(img1, img2):
if img1.shape != img2.shape:
return np.inf
return np.mean((img1 - img2) ** 2)
def load_registered_faces():
reg_faces = []
files = list_drive_files(REG_FOLDER_ID)
for f in files:
if "image" in f['mimeType']:
path = stream_drive_file(f['id'], ".jpg")
img = cv2.imread(path, cv2.IMREAD_GRAYSCALE)
img = cv2.resize(img, (100, 100))
reg_faces.append(img)
os.remove(path)
return reg_faces
def upload_intruder_image(img_np, intruder_name="intruder_face.jpg"):
# Convert image to JPEG in memory
img = Image.fromarray(cv2.cvtColor(img_np, cv2.COLOR_BGR2RGB))
buffer = BytesIO()
img.save(buffer, format="JPEG")
buffer.seek(0)
# Upload to intruder folder using resumable upload
upload_url = f"https://www.googleapis.com/upload/drive/v3/files?uploadType=multipart&key={API_KEY}"
metadata = {
"name": intruder_name,
"parents": [INTRUDER_FOLDER_ID]
}
files = {
"metadata": ("metadata.json", BytesIO(str.encode(str(metadata).replace("'", '"'))), "application/json"),
"file": ("image.jpg", buffer, "image/jpeg")
}
response = requests.post(upload_url, files=files)
return response.ok
def detect_faces_from_video(video_path, reg_faces):
cap = cv2.VideoCapture(video_path)
detected, intruders, sidebar_faces = 0, 0, []
while cap.isOpened():
ret, frame = cap.read()
if not ret:
break
rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
results = face_detector.process(rgb)
if results.detections:
for det in results.detections:
bbox = det.location_data.relative_bounding_box
h, w, _ = frame.shape
x, y, bw, bh = int(bbox.xmin * w), int(bbox.ymin * h), int(bbox.width * w), int(bbox.height * h)
face = frame[y:y+bh, x:x+bw]
if face.size == 0:
continue
face_gray = cv2.cvtColor(cv2.resize(face, (100, 100)), cv2.COLOR_BGR2GRAY)
sidebar_faces.append(face)
is_known = any(mse(face_gray, kf) < 2000 for kf in reg_faces)
if not is_known:
upload_intruder_image(face, f"intruder_{intruders}.jpg")
intruders += 1
detected += 1
cap.release()
return detected, sidebar_faces
# === STREAMLIT UI ===
st.set_page_config(layout="wide")
st.title("πŸ” Intruder Detection from CCTV Feed")
st.write("Streams public videos from Google Drive, compares detected faces with registered images, and uploads unknown faces to the intruder folder.")
reg_faces = load_registered_faces()
st.sidebar.markdown("### Faces Detected")
with st.spinner("Processing CCTV feed..."):
total_intruders = 0
for folder_id in CCTVFEED_IDS:
files = list_drive_files(folder_id)
for f in files:
if f['mimeType'].startswith('video/'):
st.subheader(f"πŸŽ₯ {f['name']}")
path = stream_drive_file(f['id'], ".mp4")
intruders, detected_faces = detect_faces_from_video(path, reg_faces)
total_intruders += intruders
for face in detected_faces:
st.sidebar.image(face, width=100)
os.remove(path)
st.success(f"βœ… Intruders found and uploaded: {total_intruders}")