Spaces:
Sleeping
Sleeping
Update app.py
Browse files
app.py
CHANGED
@@ -7,74 +7,115 @@ import mediapipe as mp
|
|
7 |
import os
|
8 |
|
9 |
# === CONFIGURATION ===
|
10 |
-
API_KEY = "AIzaSyDojJrpauA0XZtCCDUuo9xeQHZQamYKsC4"
|
11 |
CCTVFEED_IDS = ['1KJRkSf2SKEZ1mXS9_si65IwMBtjs6p4n']
|
12 |
REG_FOLDER_ID = '1qkcR7nQTEtiMH9OFUv2bGxVn08E3dKjF'
|
13 |
INTRUDER_FOLDER_ID = '1PPAUWU-wMx7fek73p-hqPqYQypYtG8Ob'
|
|
|
14 |
|
15 |
# === SETUP ===
|
16 |
mp_face_detection = mp.solutions.face_detection
|
17 |
mp_drawing = mp.solutions.drawing_utils
|
18 |
face_detector = mp_face_detection.FaceDetection(model_selection=0, min_detection_confidence=0.5)
|
19 |
|
20 |
-
# ===
|
21 |
-
def
|
22 |
api_url = f"https://www.googleapis.com/drive/v3/files?q='{folder_id}'+in+parents&key={API_KEY}&fields=files(id,name,mimeType)"
|
23 |
response = requests.get(api_url)
|
24 |
files = response.json().get("files", [])
|
|
|
|
|
|
|
25 |
|
26 |
-
|
27 |
-
|
28 |
-
|
29 |
-
|
30 |
-
video_links = []
|
31 |
-
for f in files:
|
32 |
-
if f["mimeType"] in ["video/mp4", "video/x-msvideo", "video/avi"]:
|
33 |
-
link = f"https://drive.google.com/uc?id={f['id']}&export=download"
|
34 |
-
video_links.append((f["name"], link))
|
35 |
-
return video_links
|
36 |
-
|
37 |
-
def download_video(link):
|
38 |
-
response = requests.get(link, stream=True)
|
39 |
-
temp = tempfile.NamedTemporaryFile(delete=False, suffix=".mp4")
|
40 |
for chunk in response.iter_content(chunk_size=1024*1024):
|
41 |
temp.write(chunk)
|
42 |
temp.close()
|
43 |
return temp.name
|
44 |
|
45 |
-
def
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
46 |
cap = cv2.VideoCapture(video_path)
|
47 |
-
|
48 |
|
49 |
while cap.isOpened():
|
50 |
ret, frame = cap.read()
|
51 |
if not ret:
|
52 |
break
|
53 |
rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
|
|
|
54 |
results = face_detector.process(rgb)
|
|
|
55 |
if results.detections:
|
56 |
-
frames_with_faces += 1
|
57 |
for det in results.detections:
|
58 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
59 |
cap.release()
|
60 |
-
return
|
61 |
|
62 |
-
# === STREAMLIT
|
63 |
st.title("π Intruder Detection from CCTV Feed")
|
64 |
-
st.write("Streaming videos from Google Drive,
|
|
|
|
|
|
|
|
|
65 |
|
66 |
-
|
67 |
-
|
68 |
for folder_id in CCTVFEED_IDS:
|
69 |
-
|
70 |
-
for
|
71 |
-
st.subheader(f"π₯ {
|
72 |
-
|
73 |
-
|
74 |
-
os.remove(
|
75 |
-
if
|
76 |
-
st.
|
77 |
-
|
78 |
else:
|
79 |
-
st.
|
80 |
-
|
|
|
7 |
import os
|
8 |
|
9 |
# === CONFIGURATION ===
|
10 |
+
API_KEY = "AIzaSyDojJrpauA0XZtCCDUuo9xeQHZQamYKsC4"
|
11 |
CCTVFEED_IDS = ['1KJRkSf2SKEZ1mXS9_si65IwMBtjs6p4n']
|
12 |
REG_FOLDER_ID = '1qkcR7nQTEtiMH9OFUv2bGxVn08E3dKjF'
|
13 |
INTRUDER_FOLDER_ID = '1PPAUWU-wMx7fek73p-hqPqYQypYtG8Ob'
|
14 |
+
MATCH_THRESHOLD = 8000
|
15 |
|
16 |
# === SETUP ===
|
17 |
mp_face_detection = mp.solutions.face_detection
|
18 |
mp_drawing = mp.solutions.drawing_utils
|
19 |
face_detector = mp_face_detection.FaceDetection(model_selection=0, min_detection_confidence=0.5)
|
20 |
|
21 |
+
# === UTILS ===
|
22 |
+
def get_drive_files(folder_id, mime_type_filter=None):
|
23 |
api_url = f"https://www.googleapis.com/drive/v3/files?q='{folder_id}'+in+parents&key={API_KEY}&fields=files(id,name,mimeType)"
|
24 |
response = requests.get(api_url)
|
25 |
files = response.json().get("files", [])
|
26 |
+
if mime_type_filter:
|
27 |
+
files = [f for f in files if f["mimeType"] in mime_type_filter]
|
28 |
+
return files
|
29 |
|
30 |
+
def download_file(file_id, suffix):
|
31 |
+
url = f"https://drive.google.com/uc?id={file_id}&export=download"
|
32 |
+
response = requests.get(url, stream=True)
|
33 |
+
temp = tempfile.NamedTemporaryFile(delete=False, suffix=suffix)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
34 |
for chunk in response.iter_content(chunk_size=1024*1024):
|
35 |
temp.write(chunk)
|
36 |
temp.close()
|
37 |
return temp.name
|
38 |
|
39 |
+
def upload_to_drive(local_path, folder_id):
|
40 |
+
file_name = os.path.basename(local_path)
|
41 |
+
metadata = {
|
42 |
+
'name': file_name,
|
43 |
+
'parents': [folder_id]
|
44 |
+
}
|
45 |
+
files = {
|
46 |
+
'data': ('metadata', '{"name": "%s"}' % file_name, 'application/json'),
|
47 |
+
'file': open(local_path, "rb")
|
48 |
+
}
|
49 |
+
upload_url = f"https://www.googleapis.com/upload/drive/v3/files?uploadType=multipart&key={API_KEY}"
|
50 |
+
response = requests.post(upload_url, files=files)
|
51 |
+
return response.ok
|
52 |
+
|
53 |
+
def load_registered_faces():
|
54 |
+
reg_images = get_drive_files(REG_FOLDER_ID, mime_type_filter=["image/jpeg", "image/png"])
|
55 |
+
faces = []
|
56 |
+
for img in reg_images:
|
57 |
+
path = download_file(img["id"], ".jpg")
|
58 |
+
img_gray = cv2.imread(path, cv2.IMREAD_GRAYSCALE)
|
59 |
+
if img_gray is not None:
|
60 |
+
resized = cv2.resize(img_gray, (100, 100))
|
61 |
+
faces.append(resized)
|
62 |
+
os.remove(path)
|
63 |
+
return faces
|
64 |
+
|
65 |
+
def is_known_face(face_img, known_faces):
|
66 |
+
resized = cv2.resize(face_img, (100, 100))
|
67 |
+
mse_scores = [np.mean((resized - kf) ** 2) for kf in known_faces]
|
68 |
+
return any(score < MATCH_THRESHOLD for score in mse_scores)
|
69 |
+
|
70 |
+
def process_video(video_path, known_faces):
|
71 |
cap = cv2.VideoCapture(video_path)
|
72 |
+
intruder_count = 0
|
73 |
|
74 |
while cap.isOpened():
|
75 |
ret, frame = cap.read()
|
76 |
if not ret:
|
77 |
break
|
78 |
rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
|
79 |
+
gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
|
80 |
results = face_detector.process(rgb)
|
81 |
+
|
82 |
if results.detections:
|
|
|
83 |
for det in results.detections:
|
84 |
+
bbox = det.location_data.relative_bounding_box
|
85 |
+
h, w = frame.shape[:2]
|
86 |
+
x, y, bw, bh = int(bbox.xmin * w), int(bbox.ymin * h), int(bbox.width * w), int(bbox.height * h)
|
87 |
+
face_crop = gray[max(0, y):y+bh, max(0, x):x+bw]
|
88 |
+
if face_crop.size < 100:
|
89 |
+
continue
|
90 |
+
if not is_known_face(face_crop, known_faces):
|
91 |
+
intruder_count += 1
|
92 |
+
temp = tempfile.NamedTemporaryFile(delete=False, suffix=".jpg")
|
93 |
+
cv2.imwrite(temp.name, frame)
|
94 |
+
upload_to_drive(temp.name, INTRUDER_FOLDER_ID)
|
95 |
+
os.remove(temp.name)
|
96 |
cap.release()
|
97 |
+
return intruder_count
|
98 |
|
99 |
+
# === STREAMLIT APP ===
|
100 |
st.title("π Intruder Detection from CCTV Feed")
|
101 |
+
st.write("Streaming videos from Google Drive, comparing against registered faces using MediaPipe.")
|
102 |
+
|
103 |
+
with st.spinner("Loading known faces..."):
|
104 |
+
known_faces = load_registered_faces()
|
105 |
+
st.success(f"β
Loaded {len(known_faces)} registered face(s).")
|
106 |
|
107 |
+
total_intruders = 0
|
108 |
+
with st.spinner("Scanning videos..."):
|
109 |
for folder_id in CCTVFEED_IDS:
|
110 |
+
video_files = get_drive_files(folder_id, mime_type_filter=["video/mp4", "video/x-msvideo", "video/avi"])
|
111 |
+
for f in video_files:
|
112 |
+
st.subheader(f"π₯ {f['name']}")
|
113 |
+
vpath = download_file(f['id'], ".mp4")
|
114 |
+
detected = process_video(vpath, known_faces)
|
115 |
+
os.remove(vpath)
|
116 |
+
if detected > 0:
|
117 |
+
st.error(f"β {detected} intruder(s) uploaded.")
|
118 |
+
total_intruders += detected
|
119 |
else:
|
120 |
+
st.success("β
No intruders detected.")
|
121 |
+
st.info(f"π Total intruders across all videos: {total_intruders}")
|