Spaces:
Sleeping
Sleeping
Update app.py
Browse files
app.py
CHANGED
@@ -1,8 +1,7 @@
|
|
1 |
import os
|
2 |
import cv2
|
3 |
-
import dlib
|
4 |
-
import numpy as np
|
5 |
import face_recognition
|
|
|
6 |
import time
|
7 |
import pickle
|
8 |
import gradio as gr
|
@@ -11,7 +10,7 @@ from googleapiclient.http import MediaIoBaseDownload, MediaFileUpload
|
|
11 |
from io import BytesIO
|
12 |
|
13 |
# === CONFIGURATION ===
|
14 |
-
CCTVFEED_IDS = ['1KJRkSf2SKEZ1mXS9_si65IwMBtjs6p4n']
|
15 |
REG_FOLDER_ID = '1qkcR7nQTEtiMH9OFUv2bGxVn08E3dKjF'
|
16 |
INTRUDER_FOLDER_ID = '1PPAUWU-wMx7fek73p-hqPqYQypYtG8Ob'
|
17 |
TOKEN_PICKLE = 'token.pickle'
|
@@ -38,9 +37,9 @@ def list_files(service, folder_id, mime_type='video/mp4'):
|
|
38 |
|
39 |
def load_known_faces(service):
|
40 |
known_encodings = []
|
41 |
-
|
42 |
-
print(f"[INFO] Found {len(
|
43 |
-
for f in
|
44 |
fname = f"{f['id']}.jpg"
|
45 |
download_file(service, f['id'], fname)
|
46 |
image = face_recognition.load_image_file(fname)
|
@@ -55,25 +54,26 @@ def upload_intruder(service, img_path):
|
|
55 |
media = MediaFileUpload(img_path, mimetype='image/jpeg')
|
56 |
service.files().create(body=file_metadata, media_body=media, fields='id').execute()
|
57 |
|
58 |
-
def is_match(
|
59 |
-
|
60 |
-
return any(
|
61 |
|
62 |
def process_video(video_path, known_encodings, service):
|
63 |
-
|
64 |
-
|
65 |
-
|
66 |
|
67 |
while True:
|
68 |
-
ret, frame =
|
69 |
if not ret:
|
70 |
break
|
71 |
if frame_count % 30 == 0:
|
72 |
-
|
73 |
-
|
74 |
-
|
75 |
-
|
76 |
-
|
|
|
77 |
ts = int(time.time())
|
78 |
fname = f"intruder_{ts}_{frame_count}.jpg"
|
79 |
cv2.imwrite(fname, frame)
|
@@ -83,30 +83,30 @@ def process_video(video_path, known_encodings, service):
|
|
83 |
detections += 1
|
84 |
frame_count += 1
|
85 |
|
86 |
-
|
87 |
-
print(f"[INFO]
|
88 |
return detections, uploaded
|
89 |
|
90 |
def run_detection():
|
91 |
try:
|
92 |
-
|
93 |
-
known_encodings = load_known_faces(
|
94 |
-
|
95 |
-
total_videos = 0
|
96 |
|
97 |
for folder_id in CCTVFEED_IDS:
|
98 |
-
videos = list_files(
|
99 |
total_videos += len(videos)
|
|
|
100 |
for v in videos:
|
101 |
-
|
102 |
-
|
103 |
-
|
104 |
-
|
105 |
-
|
106 |
-
|
107 |
-
os.remove(video_path)
|
108 |
|
109 |
-
|
|
|
110 |
except Exception as e:
|
111 |
return f"❌ Error: {str(e)}", []
|
112 |
|
@@ -114,6 +114,6 @@ gr.Interface(
|
|
114 |
fn=run_detection,
|
115 |
inputs=[],
|
116 |
outputs=["text", "gallery"],
|
117 |
-
title="CCTV Intruder
|
118 |
-
description="
|
119 |
).launch()
|
|
|
1 |
import os
|
2 |
import cv2
|
|
|
|
|
3 |
import face_recognition
|
4 |
+
import numpy as np
|
5 |
import time
|
6 |
import pickle
|
7 |
import gradio as gr
|
|
|
10 |
from io import BytesIO
|
11 |
|
12 |
# === CONFIGURATION ===
|
13 |
+
CCTVFEED_IDS = ['1KJRkSf2SKEZ1mXS9_si65IwMBtjs6p4n']
|
14 |
REG_FOLDER_ID = '1qkcR7nQTEtiMH9OFUv2bGxVn08E3dKjF'
|
15 |
INTRUDER_FOLDER_ID = '1PPAUWU-wMx7fek73p-hqPqYQypYtG8Ob'
|
16 |
TOKEN_PICKLE = 'token.pickle'
|
|
|
37 |
|
38 |
def load_known_faces(service):
|
39 |
known_encodings = []
|
40 |
+
files = list_files(service, REG_FOLDER_ID, mime_type='image/jpeg')
|
41 |
+
print(f"[INFO] Found {len(files)} registered images.")
|
42 |
+
for f in files:
|
43 |
fname = f"{f['id']}.jpg"
|
44 |
download_file(service, f['id'], fname)
|
45 |
image = face_recognition.load_image_file(fname)
|
|
|
54 |
media = MediaFileUpload(img_path, mimetype='image/jpeg')
|
55 |
service.files().create(body=file_metadata, media_body=media, fields='id').execute()
|
56 |
|
57 |
+
def is_match(encoding, known_encodings, tolerance=0.45):
|
58 |
+
results = face_recognition.compare_faces(known_encodings, encoding, tolerance)
|
59 |
+
return any(results)
|
60 |
|
61 |
def process_video(video_path, known_encodings, service):
|
62 |
+
cap = cv2.VideoCapture(video_path)
|
63 |
+
frame_count, detections = 0, 0
|
64 |
+
uploaded = []
|
65 |
|
66 |
while True:
|
67 |
+
ret, frame = cap.read()
|
68 |
if not ret:
|
69 |
break
|
70 |
if frame_count % 30 == 0:
|
71 |
+
rgb = frame[:, :, ::-1]
|
72 |
+
locations = face_recognition.face_locations(rgb)
|
73 |
+
encodings = face_recognition.face_encodings(rgb, locations)
|
74 |
+
print(f"[FRAME {frame_count}] Found {len(encodings)} face(s).")
|
75 |
+
for encoding in encodings:
|
76 |
+
if not is_match(encoding, known_encodings):
|
77 |
ts = int(time.time())
|
78 |
fname = f"intruder_{ts}_{frame_count}.jpg"
|
79 |
cv2.imwrite(fname, frame)
|
|
|
83 |
detections += 1
|
84 |
frame_count += 1
|
85 |
|
86 |
+
cap.release()
|
87 |
+
print(f"[INFO] Done with {video_path}, {frame_count} frames processed.")
|
88 |
return detections, uploaded
|
89 |
|
90 |
def run_detection():
|
91 |
try:
|
92 |
+
service = init_gdrive()
|
93 |
+
known_encodings = load_known_faces(service)
|
94 |
+
total_detected, total_videos, intruder_imgs = 0, 0, []
|
|
|
95 |
|
96 |
for folder_id in CCTVFEED_IDS:
|
97 |
+
videos = list_files(service, folder_id)
|
98 |
total_videos += len(videos)
|
99 |
+
print(f"[INFO] Found {len(videos)} video(s) in {folder_id}.")
|
100 |
for v in videos:
|
101 |
+
path = f"{v['id']}.mp4"
|
102 |
+
download_file(service, v['id'], path)
|
103 |
+
detected, found = process_video(path, known_encodings, service)
|
104 |
+
total_detected += detected
|
105 |
+
intruder_imgs.extend(found)
|
106 |
+
os.remove(path)
|
|
|
107 |
|
108 |
+
summary = f"✅ Processed {total_videos} video(s). {total_detected} intruder(s) detected."
|
109 |
+
return summary, intruder_imgs
|
110 |
except Exception as e:
|
111 |
return f"❌ Error: {str(e)}", []
|
112 |
|
|
|
114 |
fn=run_detection,
|
115 |
inputs=[],
|
116 |
outputs=["text", "gallery"],
|
117 |
+
title="CCTV Intruder Detector (GPU Ready)",
|
118 |
+
description="Detects intruders by comparing faces in CCTV footage with registered staff/guest photos."
|
119 |
).launch()
|