Spaces:
Sleeping
Sleeping
Update app.py
Browse files
app.py
CHANGED
@@ -1,4 +1,3 @@
|
|
1 |
-
|
2 |
import streamlit as st
|
3 |
import cv2
|
4 |
import numpy as np
|
@@ -7,9 +6,8 @@ import tempfile
|
|
7 |
import mediapipe as mp
|
8 |
import os
|
9 |
import pickle
|
10 |
-
import io
|
11 |
from googleapiclient.discovery import build
|
12 |
-
from googleapiclient.http import
|
13 |
from google_auth_oauthlib.flow import InstalledAppFlow
|
14 |
from google.auth.transport.requests import Request
|
15 |
from PIL import Image
|
@@ -23,118 +21,114 @@ SCOPES = ['https://www.googleapis.com/auth/drive.file']
|
|
23 |
|
24 |
# === SETUP ===
|
25 |
mp_face_detection = mp.solutions.face_detection
|
|
|
26 |
face_detector = mp_face_detection.FaceDetection(model_selection=0, min_detection_confidence=0.5)
|
27 |
|
28 |
-
|
29 |
-
def get_drive_service_with_pickle():
|
30 |
creds = None
|
31 |
-
if os.path.exists(
|
32 |
-
with open(
|
33 |
creds = pickle.load(token)
|
34 |
if not creds or not creds.valid:
|
35 |
if creds and creds.expired and creds.refresh_token:
|
36 |
creds.refresh(Request())
|
37 |
else:
|
38 |
-
flow = InstalledAppFlow.from_client_secrets_file(
|
39 |
creds = flow.run_local_server(port=0)
|
40 |
-
with open(
|
41 |
pickle.dump(creds, token)
|
42 |
-
return build(
|
43 |
-
|
44 |
-
drive_service = get_drive_service_with_pickle()
|
45 |
-
|
46 |
-
# === UTILITY FUNCTIONS ===
|
47 |
-
def get_drive_images(folder_id):
|
48 |
-
api_url = f"https://www.googleapis.com/drive/v3/files?q='{folder_id}'+in+parents&key={API_KEY}&fields=files(id,name,mimeType)"
|
49 |
-
res = requests.get(api_url)
|
50 |
-
files = res.json().get("files", [])
|
51 |
-
images = []
|
52 |
-
for f in files:
|
53 |
-
if f["mimeType"].startswith("image/"):
|
54 |
-
url = f"https://drive.google.com/uc?id={f['id']}&export=download"
|
55 |
-
img_resp = requests.get(url)
|
56 |
-
img = Image.open(io.BytesIO(img_resp.content)).convert("RGB")
|
57 |
-
images.append(np.array(img))
|
58 |
-
return images
|
59 |
|
60 |
-
def
|
61 |
api_url = f"https://www.googleapis.com/drive/v3/files?q='{folder_id}'+in+parents&key={API_KEY}&fields=files(id,name,mimeType)"
|
62 |
response = requests.get(api_url)
|
63 |
files = response.json().get("files", [])
|
64 |
-
return [(f["name"], f"https://drive.google.com/uc?id={f['id']}&export=download") for f in files if f["mimeType"]
|
65 |
|
66 |
-
def
|
67 |
response = requests.get(link, stream=True)
|
68 |
-
temp = tempfile.NamedTemporaryFile(delete=False, suffix=
|
69 |
-
for chunk in response.iter_content(chunk_size=1024
|
70 |
temp.write(chunk)
|
71 |
temp.close()
|
72 |
return temp.name
|
73 |
|
74 |
-
def
|
75 |
-
rgb = cv2.cvtColor(
|
76 |
results = face_detector.process(rgb)
|
77 |
faces = []
|
78 |
if results.detections:
|
79 |
for det in results.detections:
|
80 |
bbox = det.location_data.relative_bounding_box
|
81 |
-
h, w, _ =
|
82 |
-
x, y = int(bbox.xmin * w), int(bbox.ymin * h)
|
83 |
-
|
84 |
-
|
85 |
-
if
|
86 |
-
|
|
|
87 |
return faces
|
88 |
|
89 |
def mse(img1, img2):
|
90 |
return np.mean((img1.astype("float") - img2.astype("float")) ** 2)
|
91 |
|
92 |
-
def is_duplicate(
|
93 |
-
return any(mse(
|
94 |
|
95 |
-
def
|
96 |
-
|
97 |
-
|
98 |
-
|
99 |
-
|
100 |
-
|
101 |
-
|
102 |
-
drive_service.files().create(
|
103 |
-
body={"name": "intruder.jpg", "parents": [INTRUDER_FOLDER_ID]},
|
104 |
-
media_body=media,
|
105 |
-
fields="id"
|
106 |
-
).execute()
|
107 |
-
return True
|
108 |
-
except Exception as e:
|
109 |
-
st.error(f"Upload error: {e}")
|
110 |
-
return False
|
111 |
|
112 |
-
# === MAIN
|
113 |
-
st.set_page_config(
|
114 |
-
st.title("π
|
|
|
115 |
|
116 |
-
|
117 |
-
st.sidebar.header("π Registered Faces")
|
118 |
-
for img in registered_faces:
|
119 |
-
st.sidebar.image(img, width=100)
|
120 |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
121 |
uploaded_intruders = []
|
122 |
-
|
|
|
|
|
123 |
for folder_id in CCTVFEED_IDS:
|
124 |
-
|
125 |
-
|
126 |
-
|
127 |
-
|
|
|
|
|
128 |
while cap.isOpened():
|
129 |
ret, frame = cap.read()
|
130 |
if not ret:
|
131 |
break
|
132 |
-
|
133 |
-
|
134 |
-
|
|
|
135 |
if not any(mse(f, reg) < 200 for reg in registered_faces) and not is_duplicate(f, uploaded_intruders):
|
136 |
-
|
137 |
-
|
|
|
|
|
138 |
cap.release()
|
139 |
-
os.remove(
|
140 |
-
|
|
|
|
|
|
1 |
import streamlit as st
|
2 |
import cv2
|
3 |
import numpy as np
|
|
|
6 |
import mediapipe as mp
|
7 |
import os
|
8 |
import pickle
|
|
|
9 |
from googleapiclient.discovery import build
|
10 |
+
from googleapiclient.http import MediaFileUpload
|
11 |
from google_auth_oauthlib.flow import InstalledAppFlow
|
12 |
from google.auth.transport.requests import Request
|
13 |
from PIL import Image
|
|
|
21 |
|
22 |
# === SETUP ===
|
23 |
mp_face_detection = mp.solutions.face_detection
|
24 |
+
mp_drawing = mp.solutions.drawing_utils
|
25 |
face_detector = mp_face_detection.FaceDetection(model_selection=0, min_detection_confidence=0.5)
|
26 |
|
27 |
+
def authenticate_drive():
|
|
|
28 |
creds = None
|
29 |
+
if os.path.exists('token.pickle'):
|
30 |
+
with open('token.pickle', 'rb') as token:
|
31 |
creds = pickle.load(token)
|
32 |
if not creds or not creds.valid:
|
33 |
if creds and creds.expired and creds.refresh_token:
|
34 |
creds.refresh(Request())
|
35 |
else:
|
36 |
+
flow = InstalledAppFlow.from_client_secrets_file('credentials.json', SCOPES)
|
37 |
creds = flow.run_local_server(port=0)
|
38 |
+
with open('token.pickle', 'wb') as token:
|
39 |
pickle.dump(creds, token)
|
40 |
+
return build('drive', 'v3', credentials=creds)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
41 |
|
42 |
+
def get_drive_files(folder_id, types):
|
43 |
api_url = f"https://www.googleapis.com/drive/v3/files?q='{folder_id}'+in+parents&key={API_KEY}&fields=files(id,name,mimeType)"
|
44 |
response = requests.get(api_url)
|
45 |
files = response.json().get("files", [])
|
46 |
+
return [(f["name"], f"https://drive.google.com/uc?id={f['id']}&export=download") for f in files if f["mimeType"] in types]
|
47 |
|
48 |
+
def download_file(link, suffix):
|
49 |
response = requests.get(link, stream=True)
|
50 |
+
temp = tempfile.NamedTemporaryFile(delete=False, suffix=suffix)
|
51 |
+
for chunk in response.iter_content(chunk_size=1024*1024):
|
52 |
temp.write(chunk)
|
53 |
temp.close()
|
54 |
return temp.name
|
55 |
|
56 |
+
def extract_faces(image):
|
57 |
+
rgb = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
|
58 |
results = face_detector.process(rgb)
|
59 |
faces = []
|
60 |
if results.detections:
|
61 |
for det in results.detections:
|
62 |
bbox = det.location_data.relative_bounding_box
|
63 |
+
h, w, _ = image.shape
|
64 |
+
x, y, w_, h_ = int(bbox.xmin * w), int(bbox.ymin * h), int(bbox.width * w), int(bbox.height * h)
|
65 |
+
x, y = max(x, 0), max(y, 0)
|
66 |
+
cropped = image[y:y+h_, x:x+w_]
|
67 |
+
if cropped.size:
|
68 |
+
face = cv2.resize(cropped, (128, 128))
|
69 |
+
faces.append(face)
|
70 |
return faces
|
71 |
|
72 |
def mse(img1, img2):
|
73 |
return np.mean((img1.astype("float") - img2.astype("float")) ** 2)
|
74 |
|
75 |
+
def is_duplicate(new_face, existing_faces):
|
76 |
+
return any(mse(new_face, old) < 200 for old in existing_faces)
|
77 |
|
78 |
+
def upload_image_to_drive(service, folder_id, image_array, name):
|
79 |
+
temp_file = tempfile.NamedTemporaryFile(delete=False, suffix=".jpg")
|
80 |
+
cv2.imwrite(temp_file.name, image_array)
|
81 |
+
file_metadata = {'name': name, 'parents': [folder_id]}
|
82 |
+
media = MediaFileUpload(temp_file.name, mimetype='image/jpeg')
|
83 |
+
service.files().create(body=file_metadata, media_body=media, fields='id').execute()
|
84 |
+
os.remove(temp_file.name)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
85 |
|
86 |
+
# === MAIN UI ===
|
87 |
+
st.set_page_config(layout="wide")
|
88 |
+
st.title("π Intruder Detection from CCTV Feed")
|
89 |
+
st.sidebar.header("πΈ Face Viewer")
|
90 |
|
91 |
+
drive_service = authenticate_drive()
|
|
|
|
|
|
|
92 |
|
93 |
+
# === Load Registered Faces ===
|
94 |
+
registered_faces = []
|
95 |
+
with st.spinner("π Loading registered faces..."):
|
96 |
+
reg_files = get_drive_files(REG_FOLDER_ID, ["image/jpeg", "image/png"])
|
97 |
+
for name, link in reg_files:
|
98 |
+
img_path = download_file(link, ".jpg")
|
99 |
+
img = cv2.imread(img_path)
|
100 |
+
faces = extract_faces(img)
|
101 |
+
for f in faces:
|
102 |
+
registered_faces.append(f)
|
103 |
+
st.sidebar.image(cv2.cvtColor(f, cv2.COLOR_BGR2RGB), caption=f"REG: {name}", width=100)
|
104 |
+
os.remove(img_path)
|
105 |
+
|
106 |
+
# === Process Videos ===
|
107 |
uploaded_intruders = []
|
108 |
+
total_detections = 0
|
109 |
+
|
110 |
+
with st.spinner("πΉ Processing videos..."):
|
111 |
for folder_id in CCTVFEED_IDS:
|
112 |
+
videos = get_drive_files(folder_id, ["video/mp4", "video/x-msvideo", "video/avi"])
|
113 |
+
for vname, vlink in videos:
|
114 |
+
st.subheader(f"π₯ {vname}")
|
115 |
+
vpath = download_file(vlink, ".mp4")
|
116 |
+
cap = cv2.VideoCapture(vpath)
|
117 |
+
frame_id = 0
|
118 |
while cap.isOpened():
|
119 |
ret, frame = cap.read()
|
120 |
if not ret:
|
121 |
break
|
122 |
+
frame_id += 1
|
123 |
+
faces = extract_faces(frame)
|
124 |
+
for idx, f in enumerate(faces):
|
125 |
+
st.sidebar.image(cv2.cvtColor(f, cv2.COLOR_BGR2RGB), caption=f"VID Frame {frame_id}", width=100)
|
126 |
if not any(mse(f, reg) < 200 for reg in registered_faces) and not is_duplicate(f, uploaded_intruders):
|
127 |
+
upload_image_to_drive(drive_service, INTRUDER_FOLDER_ID, f, f"intruder_{vname}_{frame_id}_{idx}.jpg")
|
128 |
+
st.success(f"β οΈ Intruder detected in frame {frame_id}")
|
129 |
+
uploaded_intruders.append(f)
|
130 |
+
total_detections += 1
|
131 |
cap.release()
|
132 |
+
os.remove(vpath)
|
133 |
+
|
134 |
+
st.info(f"π― Completed processing. Total intruders uploaded: {total_detections}")
|