SuriRaja commited on
Commit
8b64005
·
verified ·
1 Parent(s): 9b6a35d

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +87 -86
app.py CHANGED
@@ -6,119 +6,120 @@ import tempfile
6
  import mediapipe as mp
7
  import os
8
  from PIL import Image
9
- from io import BytesIO
10
 
11
  # === CONFIGURATION ===
12
  API_KEY = "AIzaSyDojJrpauA0XZtCCDUuo9xeQHZQamYKsC4"
13
  CCTVFEED_IDS = ['1KJRkSf2SKEZ1mXS9_si65IwMBtjs6p4n']
14
  REG_FOLDER_ID = '1qkcR7nQTEtiMH9OFUv2bGxVn08E3dKjF'
15
- INTRUDER_FOLDER_ID = '1PPAUWU-wMx7fek73p-hqPqYQypYtG8Ob'
16
 
17
  # === SETUP ===
18
  mp_face_detection = mp.solutions.face_detection
19
  mp_drawing = mp.solutions.drawing_utils
20
  face_detector = mp_face_detection.FaceDetection(model_selection=0, min_detection_confidence=0.5)
21
 
22
- # === FUNCTIONS ===
23
- def list_drive_files(folder_id):
24
- url = f"https://www.googleapis.com/drive/v3/files?q='{folder_id}'+in+parents&key={API_KEY}&fields=files(id,name,mimeType)"
25
- response = requests.get(url)
26
- return response.json().get("files", [])
 
27
 
28
- def stream_drive_file(file_id, suffix):
29
  url = f"https://drive.google.com/uc?id={file_id}&export=download"
30
  response = requests.get(url, stream=True)
31
  temp = tempfile.NamedTemporaryFile(delete=False, suffix=suffix)
32
- for chunk in response.iter_content(1024 * 1024):
33
  temp.write(chunk)
34
  temp.close()
35
  return temp.name
36
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
37
  def mse(img1, img2):
38
  if img1.shape != img2.shape:
39
  return np.inf
40
- return np.mean((img1 - img2) ** 2)
41
-
42
- def load_registered_faces():
43
- reg_faces = []
44
- files = list_drive_files(REG_FOLDER_ID)
45
- for f in files:
46
- if "image" in f['mimeType']:
47
- path = stream_drive_file(f['id'], ".jpg")
48
- img = cv2.imread(path, cv2.IMREAD_GRAYSCALE)
49
- img = cv2.resize(img, (100, 100))
50
- reg_faces.append(img)
51
- os.remove(path)
52
- return reg_faces
53
 
54
- def upload_intruder_image(img_np, intruder_name="intruder_face.jpg"):
55
- # Convert image to JPEG in memory
56
- img = Image.fromarray(cv2.cvtColor(img_np, cv2.COLOR_BGR2RGB))
57
- buffer = BytesIO()
58
- img.save(buffer, format="JPEG")
59
- buffer.seek(0)
60
-
61
- # Upload to intruder folder using resumable upload
62
- upload_url = f"https://www.googleapis.com/upload/drive/v3/files?uploadType=multipart&key={API_KEY}"
63
- metadata = {
64
- "name": intruder_name,
65
- "parents": [INTRUDER_FOLDER_ID]
66
- }
67
- files = {
68
- "metadata": ("metadata.json", BytesIO(str.encode(str(metadata).replace("'", '"'))), "application/json"),
69
- "file": ("image.jpg", buffer, "image/jpeg")
70
- }
71
- response = requests.post(upload_url, files=files)
72
- return response.ok
73
 
74
- def detect_faces_from_video(video_path, reg_faces):
75
- cap = cv2.VideoCapture(video_path)
76
- detected, intruders, sidebar_faces = 0, 0, []
77
-
78
- while cap.isOpened():
79
- ret, frame = cap.read()
80
- if not ret:
81
- break
82
- rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
83
- results = face_detector.process(rgb)
84
- if results.detections:
85
- for det in results.detections:
86
- bbox = det.location_data.relative_bounding_box
87
- h, w, _ = frame.shape
88
- x, y, bw, bh = int(bbox.xmin * w), int(bbox.ymin * h), int(bbox.width * w), int(bbox.height * h)
89
- face = frame[y:y+bh, x:x+bw]
90
- if face.size == 0:
91
- continue
92
- face_gray = cv2.cvtColor(cv2.resize(face, (100, 100)), cv2.COLOR_BGR2GRAY)
93
- sidebar_faces.append(face)
94
- is_known = any(mse(face_gray, kf) < 2000 for kf in reg_faces)
95
- if not is_known:
96
- upload_intruder_image(face, f"intruder_{intruders}.jpg")
97
- intruders += 1
98
- detected += 1
99
- cap.release()
100
- return detected, sidebar_faces
101
 
102
- # === STREAMLIT UI ===
103
- st.set_page_config(layout="wide")
104
  st.title("🔐 Intruder Detection from CCTV Feed")
105
- st.write("Streams public videos from Google Drive, compares detected faces with registered images, and uploads unknown faces to the intruder folder.")
106
 
107
- reg_faces = load_registered_faces()
108
- st.sidebar.markdown("### Faces Detected")
109
 
110
- with st.spinner("Processing CCTV feed..."):
111
- total_intruders = 0
 
 
 
 
 
 
 
 
 
 
 
112
  for folder_id in CCTVFEED_IDS:
113
- files = list_drive_files(folder_id)
114
- for f in files:
115
- if f['mimeType'].startswith('video/'):
116
- st.subheader(f"🎥 {f['name']}")
117
- path = stream_drive_file(f['id'], ".mp4")
118
- intruders, detected_faces = detect_faces_from_video(path, reg_faces)
119
- total_intruders += intruders
120
- for face in detected_faces:
121
- st.sidebar.image(face, width=100)
122
- os.remove(path)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
123
 
124
- st.success(f"✅ Intruders found and uploaded: {total_intruders}")
 
 
 
 
6
  import mediapipe as mp
7
  import os
8
  from PIL import Image
 
9
 
10
  # === CONFIGURATION ===
11
  API_KEY = "AIzaSyDojJrpauA0XZtCCDUuo9xeQHZQamYKsC4"
12
  CCTVFEED_IDS = ['1KJRkSf2SKEZ1mXS9_si65IwMBtjs6p4n']
13
  REG_FOLDER_ID = '1qkcR7nQTEtiMH9OFUv2bGxVn08E3dKjF'
14
+ INTRUDER_UPLOAD_URL = "https://upload.gdrive.com/your-anon-endpoint-if-any" # You can change this if needed
15
 
16
  # === SETUP ===
17
  mp_face_detection = mp.solutions.face_detection
18
  mp_drawing = mp.solutions.drawing_utils
19
  face_detector = mp_face_detection.FaceDetection(model_selection=0, min_detection_confidence=0.5)
20
 
21
+ # === UTILS ===
22
+ def get_drive_file_links(folder_id, mime_types):
23
+ api_url = f"https://www.googleapis.com/drive/v3/files?q='{folder_id}'+in+parents&key={API_KEY}&fields=files(id,name,mimeType)"
24
+ response = requests.get(api_url)
25
+ files = response.json().get("files", [])
26
+ return [(f['name'], f['id']) for f in files if f['mimeType'] in mime_types]
27
 
28
+ def download_file(file_id, suffix):
29
  url = f"https://drive.google.com/uc?id={file_id}&export=download"
30
  response = requests.get(url, stream=True)
31
  temp = tempfile.NamedTemporaryFile(delete=False, suffix=suffix)
32
+ for chunk in response.iter_content(chunk_size=1024 * 1024):
33
  temp.write(chunk)
34
  temp.close()
35
  return temp.name
36
 
37
+ def extract_faces_from_image(img_path):
38
+ img = cv2.imread(img_path)
39
+ rgb = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
40
+ results = face_detector.process(rgb)
41
+ faces = []
42
+ if results.detections:
43
+ for det in results.detections:
44
+ bbox = det.location_data.relative_bounding_box
45
+ h, w, _ = rgb.shape
46
+ x, y, bw, bh = int(bbox.xmin * w), int(bbox.ymin * h), int(bbox.width * w), int(bbox.height * h)
47
+ face_crop = rgb[y:y+bh, x:x+bw]
48
+ if face_crop.size > 0:
49
+ face_resized = cv2.resize(face_crop, (100, 100))
50
+ faces.append(face_resized)
51
+ return faces
52
+
53
  def mse(img1, img2):
54
  if img1.shape != img2.shape:
55
  return np.inf
56
+ return np.mean((img1.astype("float") - img2.astype("float")) ** 2)
 
 
 
 
 
 
 
 
 
 
 
 
57
 
58
+ def is_intruder(face_img, known_faces, threshold=5000):
59
+ face_gray = cv2.cvtColor(face_img, cv2.COLOR_RGB2GRAY)
60
+ face_gray = cv2.resize(face_gray, (100, 100))
61
+ return all(mse(face_gray, known) > threshold for known in known_faces)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
62
 
63
+ def upload_intruder(img_array, name):
64
+ temp_img = tempfile.NamedTemporaryFile(delete=False, suffix=".jpg")
65
+ Image.fromarray(cv2.cvtColor(img_array, cv2.COLOR_RGB2BGR)).save(temp_img.name)
66
+ # Simulate upload – requires actual implementation if you want to use Google Forms or Apps Script
67
+ st.warning(f"Upload placeholder: {temp_img.name}")
68
+ return temp_img.name
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
69
 
70
+ # === APP UI ===
 
71
  st.title("🔐 Intruder Detection from CCTV Feed")
72
+ st.write("Streaming videos from Google Drive, detecting faces using MediaPipe and flagging unknown individuals.")
73
 
74
+ sidebar_faces = []
 
75
 
76
+ with st.spinner("Loading registered faces..."):
77
+ known_faces = []
78
+ for fname, fid in get_drive_file_links(REG_FOLDER_ID, ["image/jpeg", "image/png"]):
79
+ img_path = download_file(fid, ".jpg")
80
+ faces = extract_faces_from_image(img_path)
81
+ for face in faces:
82
+ gray = cv2.cvtColor(face, cv2.COLOR_RGB2GRAY)
83
+ known_faces.append(cv2.resize(gray, (100, 100)))
84
+ os.remove(img_path)
85
+ st.success(f"Loaded {len(known_faces)} registered faces.")
86
+
87
+ with st.spinner("Processing CCTV videos..."):
88
+ intruder_count = 0
89
  for folder_id in CCTVFEED_IDS:
90
+ video_files = get_drive_file_links(folder_id, ["video/mp4", "video/x-msvideo", "video/avi"])
91
+ for vname, vid in video_files:
92
+ st.subheader(f"🎥 {vname}")
93
+ video_path = download_file(vid, ".mp4")
94
+ cap = cv2.VideoCapture(video_path)
95
+ frame_id = 0
96
+ while cap.isOpened():
97
+ ret, frame = cap.read()
98
+ if not ret:
99
+ break
100
+ frame_id += 1
101
+ if frame_id % 30 != 0:
102
+ continue
103
+ rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
104
+ results = face_detector.process(rgb)
105
+ if results.detections:
106
+ for det in results.detections:
107
+ bbox = det.location_data.relative_bounding_box
108
+ h, w, _ = rgb.shape
109
+ x, y, bw, bh = int(bbox.xmin * w), int(bbox.ymin * h), int(bbox.width * w), int(bbox.height * h)
110
+ face_crop = rgb[y:y+bh, x:x+bw]
111
+ if face_crop.size == 0:
112
+ continue
113
+ if is_intruder(face_crop, known_faces):
114
+ sidebar_faces.append(face_crop)
115
+ intruder_count += 1
116
+ upload_intruder(face_crop, f"intruder_{intruder_count}.jpg")
117
+ cap.release()
118
+ os.remove(video_path)
119
+
120
+ st.success(f"🎯 Intruders detected: {intruder_count}")
121
 
122
+ # === SIDEBAR ===
123
+ st.sidebar.title("📸 Faces Detected as Intruders")
124
+ for idx, face in enumerate(sidebar_faces):
125
+ st.sidebar.image(face, caption=f"Intruder #{idx + 1}", width=100)