SuriRaja commited on
Commit
15c11e6
Β·
verified Β·
1 Parent(s): 558a7ec

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +76 -75
app.py CHANGED
@@ -5,7 +5,6 @@ import requests
5
  import tempfile
6
  import mediapipe as mp
7
  import os
8
- from PIL import Image
9
 
10
  # === CONFIGURATION ===
11
  API_KEY = "AIzaSyDojJrpauA0XZtCCDUuo9xeQHZQamYKsC4"
@@ -13,6 +12,7 @@ CCTVFEED_IDS = ['1KJRkSf2SKEZ1mXS9_si65IwMBtjs6p4n']
13
  REG_FOLDER_ID = '1qkcR7nQTEtiMH9OFUv2bGxVn08E3dKjF'
14
  INTRUDER_FOLDER_ID = '1PPAUWU-wMx7fek73p-hqPqYQypYtG8Ob'
15
  MATCH_THRESHOLD = 7000
 
16
 
17
  # === SETUP ===
18
  mp_face_detection = mp.solutions.face_detection
@@ -20,105 +20,106 @@ mp_drawing = mp.solutions.drawing_utils
20
  face_detector = mp_face_detection.FaceDetection(model_selection=1, min_detection_confidence=0.5)
21
 
22
  # === FUNCTIONS ===
23
- def get_drive_image_faces(folder_id):
24
- api_url = f"https://www.googleapis.com/drive/v3/files?q='{folder_id}'+in+parents&key={API_KEY}&fields=files(id,name,mimeType)"
25
- response = requests.get(api_url)
26
- files = response.json().get("files", [])
27
- faces = []
 
 
28
 
29
- for f in files:
30
- if "image" in f["mimeType"]:
31
- link = f"https://drive.google.com/uc?id={f['id']}&export=download"
32
- img = requests.get(link).content
33
- temp = tempfile.NamedTemporaryFile(delete=False, suffix=".jpg")
34
- temp.write(img)
35
- temp.close()
36
- gray = cv2.imread(temp.name, cv2.IMREAD_GRAYSCALE)
37
- if gray is not None:
38
- gray = cv2.resize(gray, (100, 100))
39
- faces.append(gray)
40
- os.remove(temp.name)
41
- return faces
42
-
43
- def is_match(face_img, known_faces, threshold=MATCH_THRESHOLD):
44
- face_img = cv2.resize(face_img, (100, 100))
45
- errors = [np.mean((face_img - kf) ** 2) for kf in known_faces]
46
- return any(err < threshold for err in errors)
47
-
48
- def get_drive_video_links(folder_id):
49
- api_url = f"https://www.googleapis.com/drive/v3/files?q='{folder_id}'+in+parents&key={API_KEY}&fields=files(id,name,mimeType)"
50
- response = requests.get(api_url)
51
- files = response.json().get("files", [])
52
- video_links = []
53
- for f in files:
54
- if f["mimeType"] in ["video/mp4", "video/x-msvideo", "video/avi"]:
55
- link = f"https://drive.google.com/uc?id={f['id']}&export=download"
56
- video_links.append((f["name"], link))
57
- return video_links
58
-
59
- def download_video(link):
60
- response = requests.get(link, stream=True)
61
- temp = tempfile.NamedTemporaryFile(delete=False, suffix=".mp4")
62
- for chunk in response.iter_content(chunk_size=1024*1024):
63
  temp.write(chunk)
64
  temp.close()
65
  return temp.name
66
 
67
- def detect_intruders(video_path, known_faces):
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
68
  cap = cv2.VideoCapture(video_path)
69
- intruder_frames = 0
70
- faces_displayed = 0
71
 
 
72
  while cap.isOpened():
73
  ret, frame = cap.read()
74
  if not ret:
75
  break
 
 
 
 
 
76
  rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
 
77
  results = face_detector.process(rgb)
78
 
79
  if results.detections:
 
 
 
80
  for det in results.detections:
81
  bbox = det.location_data.relative_bounding_box
82
- h, w, _ = frame.shape
83
- x1 = int(bbox.xmin * w)
84
- y1 = int(bbox.ymin * h)
85
- x2 = int((bbox.xmin + bbox.width) * w)
86
- y2 = int((bbox.ymin + bbox.height) * h)
 
 
 
 
 
 
 
 
 
87
 
88
- face_crop = frame[max(y1,0):y2, max(x1,0):x2]
89
- gray_face = cv2.cvtColor(face_crop, cv2.COLOR_BGR2GRAY)
90
-
91
- if not is_match(gray_face, known_faces):
92
- intruder_frames += 1
93
- faces_displayed += 1
94
- st.image(face_crop, caption=f"Unmatched face #{faces_displayed}", channels="BGR", use_column_width=True)
95
- if faces_displayed >= 3:
96
- break
97
  cap.release()
98
- return intruder_frames
99
 
100
- # === STREAMLIT UI ===
101
- st.title("πŸ” Intruder Detection from CCTV Feed")
102
- st.write("Streaming videos from Google Drive, comparing with registered faces, and displaying unmatched intruders.")
103
 
104
  with st.spinner("Loading registered faces..."):
105
- registered_faces = get_drive_image_faces(REG_FOLDER_ID)
106
 
107
- if not registered_faces:
108
- st.error("No registered face images found.")
109
  else:
110
  with st.spinner("Processing videos..."):
111
  total_intruders = 0
112
  for folder_id in CCTVFEED_IDS:
113
- videos = get_drive_video_links(folder_id)
114
- for vname, link in videos:
115
- st.subheader(f"πŸŽ₯ {vname}")
116
- video_file = download_video(link)
117
- count = detect_intruders(video_file, registered_faces)
118
- os.remove(video_file)
119
- if count > 0:
120
- st.success(f"βœ… {count} unmatched faces detected.")
121
- total_intruders += 1
 
122
  else:
123
- st.warning("⚠️ No intruders detected.")
124
- st.info(f"πŸ”Ž Completed. Total videos with intruders: {total_intruders}")
 
5
  import tempfile
6
  import mediapipe as mp
7
  import os
 
8
 
9
  # === CONFIGURATION ===
10
  API_KEY = "AIzaSyDojJrpauA0XZtCCDUuo9xeQHZQamYKsC4"
 
12
  REG_FOLDER_ID = '1qkcR7nQTEtiMH9OFUv2bGxVn08E3dKjF'
13
  INTRUDER_FOLDER_ID = '1PPAUWU-wMx7fek73p-hqPqYQypYtG8Ob'
14
  MATCH_THRESHOLD = 7000
15
+ FRAME_SKIP = 30 # Show every 30th frame for debugging
16
 
17
  # === SETUP ===
18
  mp_face_detection = mp.solutions.face_detection
 
20
  face_detector = mp_face_detection.FaceDetection(model_selection=1, min_detection_confidence=0.5)
21
 
22
  # === FUNCTIONS ===
23
+ def get_drive_files(folder_id, mime_filter=None):
24
+ url = f"https://www.googleapis.com/drive/v3/files?q='{folder_id}'+in+parents&key={API_KEY}&fields=files(id,name,mimeType)"
25
+ resp = requests.get(url)
26
+ files = resp.json().get("files", [])
27
+ if mime_filter:
28
+ files = [f for f in files if f['mimeType'] in mime_filter]
29
+ return files
30
 
31
+ def download_file(file_id, suffix=".tmp"):
32
+ url = f"https://drive.google.com/uc?id={file_id}&export=download"
33
+ r = requests.get(url, stream=True)
34
+ temp = tempfile.NamedTemporaryFile(delete=False, suffix=suffix)
35
+ for chunk in r.iter_content(chunk_size=1024*1024):
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
36
  temp.write(chunk)
37
  temp.close()
38
  return temp.name
39
 
40
+ def is_known_face(face_img, known_faces):
41
+ resized = cv2.resize(face_img, (100, 100))
42
+ errors = [np.mean((resized - kf) ** 2) for kf in known_faces]
43
+ return any(err < MATCH_THRESHOLD for err in errors)
44
+
45
+ def load_registered_faces():
46
+ reg_files = get_drive_files(REG_FOLDER_ID, ['image/jpeg', 'image/jpg', 'image/png'])
47
+ known_faces = []
48
+ for f in reg_files:
49
+ path = download_file(f['id'], suffix=".jpg")
50
+ img = cv2.imread(path, cv2.IMREAD_GRAYSCALE)
51
+ if img is not None:
52
+ resized = cv2.resize(img, (100, 100))
53
+ known_faces.append(resized)
54
+ os.remove(path)
55
+ return known_faces
56
+
57
+ def process_video_with_debug(video_path, known_faces):
58
  cap = cv2.VideoCapture(video_path)
59
+ frame_number = 0
60
+ intruder_count = 0
61
 
62
+ st.subheader("πŸ” Detection Debug Log")
63
  while cap.isOpened():
64
  ret, frame = cap.read()
65
  if not ret:
66
  break
67
+ frame_number += 1
68
+
69
+ if frame_number % FRAME_SKIP != 0:
70
+ continue
71
+
72
  rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
73
+ gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
74
  results = face_detector.process(rgb)
75
 
76
  if results.detections:
77
+ status = "βœ… Face Detected"
78
+ color = (0, 255, 0)
79
+ intruder_found = False
80
  for det in results.detections:
81
  bbox = det.location_data.relative_bounding_box
82
+ h, w = frame.shape[:2]
83
+ x, y = int(bbox.xmin * w), int(bbox.ymin * h)
84
+ bw, bh = int(bbox.width * w), int(bbox.height * h)
85
+ face_crop = gray[max(0, y):y+bh, max(0, x):x+bw]
86
+ if face_crop.size > 0 and not is_known_face(face_crop, known_faces):
87
+ intruder_count += 1
88
+ intruder_found = True
89
+ st.image(frame, caption=f"🚨 Intruder #{intruder_count} @ frame {frame_number}", channels="BGR")
90
+ if not intruder_found:
91
+ st.image(frame, caption=f"βœ… Known face(s) @ frame {frame_number}", channels="BGR")
92
+ else:
93
+ status = "❌ No Face"
94
+ color = (0, 0, 255)
95
+ st.image(frame, caption=f"❌ No Face Detected @ frame {frame_number}", channels="BGR")
96
 
 
 
 
 
 
 
 
 
 
97
  cap.release()
98
+ return intruder_count
99
 
100
+ # === UI ===
101
+ st.title("πŸ” Intruder Detection with Visual Debug")
102
+ st.write("Streaming videos from Google Drive. Displays detection status frame by frame.")
103
 
104
  with st.spinner("Loading registered faces..."):
105
+ known_faces = load_registered_faces()
106
 
107
+ if not known_faces:
108
+ st.error("❌ No registered face images found.")
109
  else:
110
  with st.spinner("Processing videos..."):
111
  total_intruders = 0
112
  for folder_id in CCTVFEED_IDS:
113
+ vids = get_drive_files(folder_id, ['video/mp4', 'video/x-msvideo', 'video/avi'])
114
+ st.write(f"πŸ“ Found {len(vids)} video(s) in CCTV folder.")
115
+ for v in vids:
116
+ st.subheader(f"πŸŽ₯ {v['name']}")
117
+ vpath = download_file(v['id'], suffix=".mp4")
118
+ detected = process_video_with_debug(vpath, known_faces)
119
+ os.remove(vpath)
120
+ if detected > 0:
121
+ st.success(f"βœ… Detected {detected} intruder(s) in this video.")
122
+ total_intruders += detected
123
  else:
124
+ st.info("No intruders detected.")
125
+ st.info(f"πŸ”Ž Completed. Total intruders across all videos: {total_intruders}")