SuriRaja commited on
Commit
9b6a35d
Β·
verified Β·
1 Parent(s): 27b0f1b

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +79 -82
app.py CHANGED
@@ -5,123 +5,120 @@ import requests
5
  import tempfile
6
  import mediapipe as mp
7
  import os
 
 
8
 
9
  # === CONFIGURATION ===
10
  API_KEY = "AIzaSyDojJrpauA0XZtCCDUuo9xeQHZQamYKsC4"
11
  CCTVFEED_IDS = ['1KJRkSf2SKEZ1mXS9_si65IwMBtjs6p4n']
12
  REG_FOLDER_ID = '1qkcR7nQTEtiMH9OFUv2bGxVn08E3dKjF'
13
  INTRUDER_FOLDER_ID = '1PPAUWU-wMx7fek73p-hqPqYQypYtG8Ob'
14
- FACE_SIZE = (100, 100)
15
- MATCH_THRESHOLD = 6000
16
 
17
  # === SETUP ===
18
  mp_face_detection = mp.solutions.face_detection
19
  mp_drawing = mp.solutions.drawing_utils
20
  face_detector = mp_face_detection.FaceDetection(model_selection=0, min_detection_confidence=0.5)
21
 
22
- def get_drive_links(folder_id, mime_filter):
23
- api_url = f"https://www.googleapis.com/drive/v3/files?q='{folder_id}'+in+parents&key={API_KEY}&fields=files(id,name,mimeType)"
24
- response = requests.get(api_url)
25
- files = response.json().get("files", [])
26
- return [(f["name"], f"https://drive.google.com/uc?id={f['id']}&export=download")
27
- for f in files if f["mimeType"] in mime_filter]
28
 
29
- def download_file(link, suffix):
30
- response = requests.get(link, stream=True)
 
31
  temp = tempfile.NamedTemporaryFile(delete=False, suffix=suffix)
32
- for chunk in response.iter_content(chunk_size=1024 * 1024):
33
  temp.write(chunk)
34
  temp.close()
35
  return temp.name
36
 
37
- def extract_faces_from_image(image_path):
38
- img = cv2.imread(image_path)
39
- faces = []
40
- if img is None: return []
41
- rgb = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
42
- results = face_detector.process(rgb)
43
- if results.detections:
44
- for det in results.detections:
45
- bboxC = det.location_data.relative_bounding_box
46
- ih, iw, _ = img.shape
47
- x, y, w, h = int(bboxC.xmin * iw), int(bboxC.ymin * ih), \
48
- int(bboxC.width * iw), int(bboxC.height * ih)
49
- face = img[y:y + h, x:x + w]
50
- if face.size > 0:
51
- face = cv2.resize(cv2.cvtColor(face, cv2.COLOR_BGR2GRAY), FACE_SIZE)
52
- faces.append(face)
53
- return faces
54
 
55
  def load_registered_faces():
56
  reg_faces = []
57
- images = get_drive_links(REG_FOLDER_ID, ['image/jpeg', 'image/png'])
58
- st.sidebar.write(f"πŸ“ Registered Images: {len(images)}")
59
- for name, link in images:
60
- img_path = download_file(link, ".jpg")
61
- faces = extract_faces_from_image(img_path)
62
- reg_faces.extend(faces)
63
- os.remove(img_path)
 
64
  return reg_faces
65
 
66
- def mse(imageA, imageB):
67
- return np.mean((imageA.astype("float") - imageB.astype("float")) ** 2)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
68
 
69
- def is_intruder(face, known_faces):
70
- errors = [mse(face, kf) for kf in known_faces]
71
- return all(err > MATCH_THRESHOLD for err in errors)
72
-
73
- def extract_and_match_faces_from_video(video_path, known_faces):
74
  cap = cv2.VideoCapture(video_path)
75
- detected_faces = []
76
- intruders = []
77
  while cap.isOpened():
78
  ret, frame = cap.read()
79
- if not ret: break
 
80
  rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
81
  results = face_detector.process(rgb)
82
  if results.detections:
83
  for det in results.detections:
84
  bbox = det.location_data.relative_bounding_box
85
- ih, iw, _ = frame.shape
86
- x, y, w, h = int(bbox.xmin * iw), int(bbox.ymin * ih), \
87
- int(bbox.width * iw), int(bbox.height * ih)
88
- face = frame[y:y + h, x:x + w]
89
- if face.size > 0:
90
- gray_face = cv2.resize(cv2.cvtColor(face, cv2.COLOR_BGR2GRAY), FACE_SIZE)
91
- detected_faces.append(face)
92
- if is_intruder(gray_face, known_faces):
93
- intruders.append(face)
94
- if len(detected_faces) >= 5:
95
- break
 
96
  cap.release()
97
- return detected_faces, intruders
98
-
99
- def show_faces(title, face_list):
100
- st.sidebar.subheader(title)
101
- for face in face_list:
102
- st.sidebar.image(cv2.cvtColor(face, cv2.COLOR_BGR2RGB), width=100)
103
 
104
- # === STREAMLIT APP ===
105
- st.title("πŸ” Intruder Detection (with Face Match)")
106
- st.write("Detect and compare faces from CCTV videos streamed from Google Drive.")
 
107
 
108
- with st.spinner("πŸš€ Loading registered faces..."):
109
- known_faces = load_registered_faces()
110
 
111
- with st.spinner("πŸ“½ Processing CCTV videos..."):
112
- all_detected, all_intruders = [], []
113
  for folder_id in CCTVFEED_IDS:
114
- videos = get_drive_links(folder_id, ['video/mp4', 'video/x-msvideo', 'video/avi'])
115
- for vname, link in videos:
116
- st.subheader(f"πŸŽ₯ {vname}")
117
- video_file = download_file(link, ".mp4")
118
- detected, intruders = extract_and_match_faces_from_video(video_file, known_faces)
119
- os.remove(video_file)
120
- st.success(f"βœ… Detected {len(detected)} faces | 🚨 Intruders: {len(intruders)}")
121
- all_detected.extend(detected)
122
- all_intruders.extend(intruders)
123
-
124
- show_faces("πŸ‘₯ Faces from Video", all_detected)
125
- show_faces("🚨 Intruders", all_intruders)
126
 
127
- st.info(f"🎯 Total videos scanned: {len(CCTVFEED_IDS)} | Total intruders detected: {len(all_intruders)}")
 
5
  import tempfile
6
  import mediapipe as mp
7
  import os
8
+ from PIL import Image
9
+ from io import BytesIO
10
 
11
  # === CONFIGURATION ===
12
  API_KEY = "AIzaSyDojJrpauA0XZtCCDUuo9xeQHZQamYKsC4"
13
  CCTVFEED_IDS = ['1KJRkSf2SKEZ1mXS9_si65IwMBtjs6p4n']
14
  REG_FOLDER_ID = '1qkcR7nQTEtiMH9OFUv2bGxVn08E3dKjF'
15
  INTRUDER_FOLDER_ID = '1PPAUWU-wMx7fek73p-hqPqYQypYtG8Ob'
 
 
16
 
17
  # === SETUP ===
18
  mp_face_detection = mp.solutions.face_detection
19
  mp_drawing = mp.solutions.drawing_utils
20
  face_detector = mp_face_detection.FaceDetection(model_selection=0, min_detection_confidence=0.5)
21
 
22
+ # === FUNCTIONS ===
23
+ def list_drive_files(folder_id):
24
+ url = f"https://www.googleapis.com/drive/v3/files?q='{folder_id}'+in+parents&key={API_KEY}&fields=files(id,name,mimeType)"
25
+ response = requests.get(url)
26
+ return response.json().get("files", [])
 
27
 
28
+ def stream_drive_file(file_id, suffix):
29
+ url = f"https://drive.google.com/uc?id={file_id}&export=download"
30
+ response = requests.get(url, stream=True)
31
  temp = tempfile.NamedTemporaryFile(delete=False, suffix=suffix)
32
+ for chunk in response.iter_content(1024 * 1024):
33
  temp.write(chunk)
34
  temp.close()
35
  return temp.name
36
 
37
+ def mse(img1, img2):
38
+ if img1.shape != img2.shape:
39
+ return np.inf
40
+ return np.mean((img1 - img2) ** 2)
 
 
 
 
 
 
 
 
 
 
 
 
 
41
 
42
  def load_registered_faces():
43
  reg_faces = []
44
+ files = list_drive_files(REG_FOLDER_ID)
45
+ for f in files:
46
+ if "image" in f['mimeType']:
47
+ path = stream_drive_file(f['id'], ".jpg")
48
+ img = cv2.imread(path, cv2.IMREAD_GRAYSCALE)
49
+ img = cv2.resize(img, (100, 100))
50
+ reg_faces.append(img)
51
+ os.remove(path)
52
  return reg_faces
53
 
54
+ def upload_intruder_image(img_np, intruder_name="intruder_face.jpg"):
55
+ # Convert image to JPEG in memory
56
+ img = Image.fromarray(cv2.cvtColor(img_np, cv2.COLOR_BGR2RGB))
57
+ buffer = BytesIO()
58
+ img.save(buffer, format="JPEG")
59
+ buffer.seek(0)
60
+
61
+ # Upload to intruder folder using resumable upload
62
+ upload_url = f"https://www.googleapis.com/upload/drive/v3/files?uploadType=multipart&key={API_KEY}"
63
+ metadata = {
64
+ "name": intruder_name,
65
+ "parents": [INTRUDER_FOLDER_ID]
66
+ }
67
+ files = {
68
+ "metadata": ("metadata.json", BytesIO(str.encode(str(metadata).replace("'", '"'))), "application/json"),
69
+ "file": ("image.jpg", buffer, "image/jpeg")
70
+ }
71
+ response = requests.post(upload_url, files=files)
72
+ return response.ok
73
 
74
+ def detect_faces_from_video(video_path, reg_faces):
 
 
 
 
75
  cap = cv2.VideoCapture(video_path)
76
+ detected, intruders, sidebar_faces = 0, 0, []
77
+
78
  while cap.isOpened():
79
  ret, frame = cap.read()
80
+ if not ret:
81
+ break
82
  rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
83
  results = face_detector.process(rgb)
84
  if results.detections:
85
  for det in results.detections:
86
  bbox = det.location_data.relative_bounding_box
87
+ h, w, _ = frame.shape
88
+ x, y, bw, bh = int(bbox.xmin * w), int(bbox.ymin * h), int(bbox.width * w), int(bbox.height * h)
89
+ face = frame[y:y+bh, x:x+bw]
90
+ if face.size == 0:
91
+ continue
92
+ face_gray = cv2.cvtColor(cv2.resize(face, (100, 100)), cv2.COLOR_BGR2GRAY)
93
+ sidebar_faces.append(face)
94
+ is_known = any(mse(face_gray, kf) < 2000 for kf in reg_faces)
95
+ if not is_known:
96
+ upload_intruder_image(face, f"intruder_{intruders}.jpg")
97
+ intruders += 1
98
+ detected += 1
99
  cap.release()
100
+ return detected, sidebar_faces
 
 
 
 
 
101
 
102
+ # === STREAMLIT UI ===
103
+ st.set_page_config(layout="wide")
104
+ st.title("πŸ” Intruder Detection from CCTV Feed")
105
+ st.write("Streams public videos from Google Drive, compares detected faces with registered images, and uploads unknown faces to the intruder folder.")
106
 
107
+ reg_faces = load_registered_faces()
108
+ st.sidebar.markdown("### Faces Detected")
109
 
110
+ with st.spinner("Processing CCTV feed..."):
111
+ total_intruders = 0
112
  for folder_id in CCTVFEED_IDS:
113
+ files = list_drive_files(folder_id)
114
+ for f in files:
115
+ if f['mimeType'].startswith('video/'):
116
+ st.subheader(f"πŸŽ₯ {f['name']}")
117
+ path = stream_drive_file(f['id'], ".mp4")
118
+ intruders, detected_faces = detect_faces_from_video(path, reg_faces)
119
+ total_intruders += intruders
120
+ for face in detected_faces:
121
+ st.sidebar.image(face, width=100)
122
+ os.remove(path)
 
 
123
 
124
+ st.success(f"βœ… Intruders found and uploaded: {total_intruders}")