SuriRaja commited on
Commit
16e29fd
·
verified ·
1 Parent(s): 8b64005

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +79 -78
app.py CHANGED
@@ -5,121 +5,122 @@ import requests
5
  import tempfile
6
  import mediapipe as mp
7
  import os
8
- from PIL import Image
9
 
10
  # === CONFIGURATION ===
11
  API_KEY = "AIzaSyDojJrpauA0XZtCCDUuo9xeQHZQamYKsC4"
12
  CCTVFEED_IDS = ['1KJRkSf2SKEZ1mXS9_si65IwMBtjs6p4n']
13
  REG_FOLDER_ID = '1qkcR7nQTEtiMH9OFUv2bGxVn08E3dKjF'
14
- INTRUDER_UPLOAD_URL = "https://upload.gdrive.com/your-anon-endpoint-if-any" # You can change this if needed
15
 
16
- # === SETUP ===
17
  mp_face_detection = mp.solutions.face_detection
18
- mp_drawing = mp.solutions.drawing_utils
19
  face_detector = mp_face_detection.FaceDetection(model_selection=0, min_detection_confidence=0.5)
20
 
21
- # === UTILS ===
22
- def get_drive_file_links(folder_id, mime_types):
23
- api_url = f"https://www.googleapis.com/drive/v3/files?q='{folder_id}'+in+parents&key={API_KEY}&fields=files(id,name,mimeType)"
24
- response = requests.get(api_url)
25
- files = response.json().get("files", [])
26
- return [(f['name'], f['id']) for f in files if f['mimeType'] in mime_types]
27
 
28
- def download_file(file_id, suffix):
29
- url = f"https://drive.google.com/uc?id={file_id}&export=download"
30
- response = requests.get(url, stream=True)
 
 
 
 
 
31
  temp = tempfile.NamedTemporaryFile(delete=False, suffix=suffix)
32
  for chunk in response.iter_content(chunk_size=1024 * 1024):
33
  temp.write(chunk)
34
  temp.close()
35
  return temp.name
36
 
37
- def extract_faces_from_image(img_path):
38
- img = cv2.imread(img_path)
39
- rgb = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
40
- results = face_detector.process(rgb)
41
- faces = []
42
- if results.detections:
43
- for det in results.detections:
44
- bbox = det.location_data.relative_bounding_box
45
- h, w, _ = rgb.shape
46
- x, y, bw, bh = int(bbox.xmin * w), int(bbox.ymin * h), int(bbox.width * w), int(bbox.height * h)
47
- face_crop = rgb[y:y+bh, x:x+bw]
48
- if face_crop.size > 0:
49
- face_resized = cv2.resize(face_crop, (100, 100))
50
- faces.append(face_resized)
51
- return faces
52
 
53
- def mse(img1, img2):
54
- if img1.shape != img2.shape:
55
- return np.inf
56
- return np.mean((img1.astype("float") - img2.astype("float")) ** 2)
 
 
 
 
 
57
 
58
  def is_intruder(face_img, known_faces, threshold=5000):
59
- face_gray = cv2.cvtColor(face_img, cv2.COLOR_RGB2GRAY)
60
- face_gray = cv2.resize(face_gray, (100, 100))
61
- return all(mse(face_gray, known) > threshold for known in known_faces)
62
 
63
- def upload_intruder(img_array, name):
64
- temp_img = tempfile.NamedTemporaryFile(delete=False, suffix=".jpg")
65
- Image.fromarray(cv2.cvtColor(img_array, cv2.COLOR_RGB2BGR)).save(temp_img.name)
66
- # Simulate upload – requires actual implementation if you want to use Google Forms or Apps Script
67
- st.warning(f"Upload placeholder: {temp_img.name}")
68
- return temp_img.name
 
 
 
 
 
 
 
 
 
69
 
70
- # === APP UI ===
 
71
  st.title("🔐 Intruder Detection from CCTV Feed")
72
- st.write("Streaming videos from Google Drive, detecting faces using MediaPipe and flagging unknown individuals.")
73
-
74
- sidebar_faces = []
75
 
76
- with st.spinner("Loading registered faces..."):
77
- known_faces = []
78
- for fname, fid in get_drive_file_links(REG_FOLDER_ID, ["image/jpeg", "image/png"]):
79
- img_path = download_file(fid, ".jpg")
80
- faces = extract_faces_from_image(img_path)
81
- for face in faces:
82
- gray = cv2.cvtColor(face, cv2.COLOR_RGB2GRAY)
83
- known_faces.append(cv2.resize(gray, (100, 100)))
84
- os.remove(img_path)
85
- st.success(f"Loaded {len(known_faces)} registered faces.")
86
 
87
- with st.spinner("Processing CCTV videos..."):
88
  intruder_count = 0
 
89
  for folder_id in CCTVFEED_IDS:
90
- video_files = get_drive_file_links(folder_id, ["video/mp4", "video/x-msvideo", "video/avi"])
91
- for vname, vid in video_files:
92
  st.subheader(f"🎥 {vname}")
93
- video_path = download_file(vid, ".mp4")
94
  cap = cv2.VideoCapture(video_path)
95
- frame_id = 0
 
96
  while cap.isOpened():
97
  ret, frame = cap.read()
98
  if not ret:
99
  break
100
- frame_id += 1
101
- if frame_id % 30 != 0:
102
- continue
103
- rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
104
- results = face_detector.process(rgb)
105
  if results.detections:
106
  for det in results.detections:
107
  bbox = det.location_data.relative_bounding_box
108
- h, w, _ = rgb.shape
109
- x, y, bw, bh = int(bbox.xmin * w), int(bbox.ymin * h), int(bbox.width * w), int(bbox.height * h)
110
- face_crop = rgb[y:y+bh, x:x+bw]
111
- if face_crop.size == 0:
 
 
 
112
  continue
113
- if is_intruder(face_crop, known_faces):
114
- sidebar_faces.append(face_crop)
115
  intruder_count += 1
116
- upload_intruder(face_crop, f"intruder_{intruder_count}.jpg")
 
 
 
 
117
  cap.release()
118
  os.remove(video_path)
 
 
 
 
119
 
120
- st.success(f"🎯 Intruders detected: {intruder_count}")
121
-
122
- # === SIDEBAR ===
123
- st.sidebar.title("📸 Faces Detected as Intruders")
124
- for idx, face in enumerate(sidebar_faces):
125
- st.sidebar.image(face, caption=f"Intruder #{idx + 1}", width=100)
 
5
  import tempfile
6
  import mediapipe as mp
7
  import os
8
+ import base64
9
 
10
  # === CONFIGURATION ===
11
  API_KEY = "AIzaSyDojJrpauA0XZtCCDUuo9xeQHZQamYKsC4"
12
  CCTVFEED_IDS = ['1KJRkSf2SKEZ1mXS9_si65IwMBtjs6p4n']
13
  REG_FOLDER_ID = '1qkcR7nQTEtiMH9OFUv2bGxVn08E3dKjF'
14
+ INTRUDER_FOLDER_ID = '1PPAUWU-wMx7fek73p-hqPqYQypYtG8Ob'
15
 
 
16
  mp_face_detection = mp.solutions.face_detection
 
17
  face_detector = mp_face_detection.FaceDetection(model_selection=0, min_detection_confidence=0.5)
18
 
19
+ # === FUNCTIONS ===
20
+ def get_drive_image_links(folder_id):
21
+ url = f"https://www.googleapis.com/drive/v3/files?q='{folder_id}'+in+parents+and+mimeType='image/jpeg'&key={API_KEY}&fields=files(id,name)"
22
+ response = requests.get(url).json()
23
+ return [f"https://drive.google.com/uc?id={f['id']}&export=download" for f in response.get("files", [])]
 
24
 
25
+ def get_drive_video_links(folder_id):
26
+ url = f"https://www.googleapis.com/drive/v3/files?q='{folder_id}'+in+parents&key={API_KEY}&fields=files(id,name,mimeType)"
27
+ files = requests.get(url).json().get("files", [])
28
+ return [(f["name"], f"https://drive.google.com/uc?id={f['id']}&export=download")
29
+ for f in files if f["mimeType"] in ["video/mp4", "video/x-msvideo", "video/avi"]]
30
+
31
+ def download_file(link, suffix=".mp4"):
32
+ response = requests.get(link, stream=True)
33
  temp = tempfile.NamedTemporaryFile(delete=False, suffix=suffix)
34
  for chunk in response.iter_content(chunk_size=1024 * 1024):
35
  temp.write(chunk)
36
  temp.close()
37
  return temp.name
38
 
39
+ def embed_face(face_img):
40
+ face_img = cv2.resize(face_img, (100, 100))
41
+ return face_img.flatten()
 
 
 
 
 
 
 
 
 
 
 
 
42
 
43
+ def load_known_faces():
44
+ known_faces = []
45
+ for link in get_drive_image_links(REG_FOLDER_ID):
46
+ path = download_file(link, suffix=".jpg")
47
+ img = cv2.imread(path, cv2.IMREAD_GRAYSCALE)
48
+ os.remove(path)
49
+ if img is not None:
50
+ known_faces.append(embed_face(img))
51
+ return known_faces
52
 
53
  def is_intruder(face_img, known_faces, threshold=5000):
54
+ target = embed_face(cv2.cvtColor(face_img, cv2.COLOR_BGR2GRAY))
55
+ distances = [np.mean((target - kf) ** 2) for kf in known_faces]
56
+ return all(d > threshold for d in distances)
57
 
58
+ def upload_to_drive(folder_id, img_path):
59
+ metadata = {
60
+ 'name': os.path.basename(img_path),
61
+ 'parents': [folder_id]
62
+ }
63
+ headers = {"Authorization": f"Bearer {API_KEY}"}
64
+ with open(img_path, "rb") as f:
65
+ data = f.read()
66
+ multipart = {
67
+ "data": ('metadata', '{"name": "%s", "parents": ["%s"]}' % (os.path.basename(img_path), folder_id), 'application/json'),
68
+ "file": (os.path.basename(img_path), data, 'image/jpeg')
69
+ }
70
+ # Public API keys can't upload directly. This stub can be used with service-based token later.
71
+ st.warning("⚠️ Upload not supported using public API key. Simulated only.")
72
+ return True
73
 
74
+ # === UI START ===
75
+ st.set_page_config(layout="wide")
76
  st.title("🔐 Intruder Detection from CCTV Feed")
77
+ st.write(" Using MediaPipe and Google Drive (Public). Only unmatched faces will be flagged.")
 
 
78
 
79
+ known_faces = load_known_faces()
80
+ if not known_faces:
81
+ st.error("No registered faces found in REG folder.")
82
+ st.stop()
 
 
 
 
 
 
83
 
84
+ with st.spinner("Processing videos..."):
85
  intruder_count = 0
86
+ st.sidebar.title("📸 Faces Detected")
87
  for folder_id in CCTVFEED_IDS:
88
+ videos = get_drive_video_links(folder_id)
89
+ for vname, link in videos:
90
  st.subheader(f"🎥 {vname}")
91
+ video_path = download_file(link)
92
  cap = cv2.VideoCapture(video_path)
93
+ intruders = []
94
+
95
  while cap.isOpened():
96
  ret, frame = cap.read()
97
  if not ret:
98
  break
99
+ results = face_detector.process(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
 
 
 
 
100
  if results.detections:
101
  for det in results.detections:
102
  bbox = det.location_data.relative_bounding_box
103
+ h, w, _ = frame.shape
104
+ x1 = int(bbox.xmin * w)
105
+ y1 = int(bbox.ymin * h)
106
+ x2 = int((bbox.xmin + bbox.width) * w)
107
+ y2 = int((bbox.ymin + bbox.height) * h)
108
+ face = frame[y1:y2, x1:x2]
109
+ if face.size == 0:
110
  continue
111
+ st.sidebar.image(face, caption="Detected Face", width=100)
112
+ if is_intruder(face, known_faces):
113
  intruder_count += 1
114
+ temp_path = f"intruder_{intruder_count}.jpg"
115
+ cv2.imwrite(temp_path, face)
116
+ upload_to_drive(INTRUDER_FOLDER_ID, temp_path)
117
+ intruders.append(temp_path)
118
+ os.remove(temp_path)
119
  cap.release()
120
  os.remove(video_path)
121
+ if intruders:
122
+ st.success(f"🚨 {len(intruders)} intruder(s) found.")
123
+ else:
124
+ st.info("No intruders found.")
125
 
126
+ st.info(f" Processed completed. Total intruders flagged: {intruder_count}")