SuriRaja commited on
Commit
ccde07a
Β·
verified Β·
1 Parent(s): 841e5f0

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +84 -83
app.py CHANGED
@@ -5,119 +5,120 @@ import requests
5
  import tempfile
6
  import mediapipe as mp
7
  import os
 
8
 
9
  # === CONFIGURATION ===
10
  API_KEY = "AIzaSyDojJrpauA0XZtCCDUuo9xeQHZQamYKsC4"
11
  CCTVFEED_IDS = ['1KJRkSf2SKEZ1mXS9_si65IwMBtjs6p4n']
12
  REG_FOLDER_ID = '1qkcR7nQTEtiMH9OFUv2bGxVn08E3dKjF'
13
  INTRUDER_FOLDER_ID = '1PPAUWU-wMx7fek73p-hqPqYQypYtG8Ob'
14
- MATCH_THRESHOLD = 8000
15
 
16
  # === SETUP ===
17
  mp_face_detection = mp.solutions.face_detection
18
  mp_drawing = mp.solutions.drawing_utils
19
- face_detector = mp_face_detection.FaceDetection(model_selection=0, min_detection_confidence=0.5)
20
 
21
- # === UTILS ===
22
- def get_drive_files(folder_id, mime_filter=None):
23
- url = f"https://www.googleapis.com/drive/v3/files?q='{folder_id}'+in+parents&key={API_KEY}&fields=files(id,name,mimeType)"
24
- resp = requests.get(url)
25
- files = resp.json().get("files", [])
26
- if mime_filter:
27
- files = [f for f in files if f['mimeType'] in mime_filter]
28
- return files
29
 
30
- def download_file(file_id, suffix=".tmp"):
31
- url = f"https://drive.google.com/uc?id={file_id}&export=download"
32
- r = requests.get(url, stream=True)
33
- temp = tempfile.NamedTemporaryFile(delete=False, suffix=suffix)
34
- for chunk in r.iter_content(chunk_size=1024*1024):
35
- temp.write(chunk)
36
- temp.close()
37
- return temp.name
 
 
 
 
 
38
 
39
- def upload_to_drive(filepath, folder_id):
40
- file_name = os.path.basename(filepath)
41
- metadata = {
42
- 'name': file_name,
43
- 'parents': [folder_id]
44
- }
45
- files = {
46
- 'data': ('metadata', '{"name": "%s"}' % file_name, 'application/json'),
47
- 'file': open(filepath, 'rb')
48
- }
49
- requests.post(
50
- f'https://www.googleapis.com/upload/drive/v3/files?uploadType=multipart&key={API_KEY}',
51
- files=files
52
- )
53
 
54
- def is_known_face(face_img, known_faces):
55
- resized = cv2.resize(face_img, (100, 100))
56
- errors = [np.mean((resized - kf) ** 2) for kf in known_faces]
57
- return any(err < MATCH_THRESHOLD for err in errors)
 
 
 
 
 
 
58
 
59
- def load_registered_faces():
60
- reg_files = get_drive_files(REG_FOLDER_ID, ['image/jpeg', 'image/jpg', 'image/png'])
61
- known_faces = []
62
- for f in reg_files:
63
- path = download_file(f['id'], suffix=".jpg")
64
- img = cv2.imread(path, cv2.IMREAD_GRAYSCALE)
65
- if img is not None:
66
- resized = cv2.resize(img, (100, 100))
67
- known_faces.append(resized)
68
- os.remove(path)
69
- return known_faces
70
 
71
- def process_video(video_path, known_faces):
72
  cap = cv2.VideoCapture(video_path)
73
- intruder_count = 0
 
74
 
75
  while cap.isOpened():
76
  ret, frame = cap.read()
77
  if not ret:
78
  break
79
  rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
80
- gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
81
  results = face_detector.process(rgb)
82
 
83
  if results.detections:
84
  for det in results.detections:
85
  bbox = det.location_data.relative_bounding_box
86
- h, w = frame.shape[:2]
87
- x, y = int(bbox.xmin * w), int(bbox.ymin * h)
88
- bw, bh = int(bbox.width * w), int(bbox.height * h)
89
- face_crop = gray[max(0, y):y+bh, max(0, x):x+bw]
90
- if face_crop.size < 100:
91
- continue
92
- if not is_known_face(face_crop, known_faces):
93
- intruder_count += 1
94
- intruder_rgb = rgb[max(0, y):y+bh, max(0, x):x+bw]
95
- st.image(intruder_rgb, caption=f"Intruder #{intruder_count}", width=150)
96
- temp = tempfile.NamedTemporaryFile(delete=False, suffix=".jpg")
97
- cv2.imwrite(temp.name, cv2.cvtColor(intruder_rgb, cv2.COLOR_RGB2BGR))
98
- upload_to_drive(temp.name, INTRUDER_FOLDER_ID)
99
- os.remove(temp.name)
 
100
  cap.release()
101
- return intruder_count
102
 
103
- # === UI ===
104
  st.title("πŸ” Intruder Detection from CCTV Feed")
105
- st.write("Streaming videos from Google Drive, comparing faces with registered users, and uploading intruders.")
106
-
107
- with st.spinner("πŸ”„ Loading registered faces..."):
108
- known_faces = load_registered_faces()
109
 
110
- with st.spinner("πŸ“Ή Processing CCTV feeds..."):
111
- total_intruders = 0
112
- for folder_id in CCTVFEED_IDS:
113
- vids = get_drive_files(folder_id, ['video/mp4', 'video/x-msvideo', 'video/avi'])
114
- st.write(f"πŸ“ Found {len(vids)} video(s) in CCTV folder.")
115
- for v in vids:
116
- st.subheader(f"πŸŽ₯ {v['name']}")
117
- vpath = download_file(v['id'], suffix=".mp4")
118
- intruders = process_video(vpath, known_faces)
119
- os.remove(vpath)
120
- st.success(f"βœ… Detected {intruders} intruder(s) in this video.")
121
- total_intruders += intruders
122
 
123
- st.info(f"πŸ”Ž Completed. Total intruders detected across all videos: {total_intruders}")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
5
  import tempfile
6
  import mediapipe as mp
7
  import os
8
+ from PIL import Image
9
 
10
  # === CONFIGURATION ===
11
  API_KEY = "AIzaSyDojJrpauA0XZtCCDUuo9xeQHZQamYKsC4"
12
  CCTVFEED_IDS = ['1KJRkSf2SKEZ1mXS9_si65IwMBtjs6p4n']
13
  REG_FOLDER_ID = '1qkcR7nQTEtiMH9OFUv2bGxVn08E3dKjF'
14
  INTRUDER_FOLDER_ID = '1PPAUWU-wMx7fek73p-hqPqYQypYtG8Ob'
15
+ MATCH_THRESHOLD = 7000
16
 
17
  # === SETUP ===
18
  mp_face_detection = mp.solutions.face_detection
19
  mp_drawing = mp.solutions.drawing_utils
20
+ face_detector = mp_face_detection.FaceDetection(model_selection=1, min_detection_confidence=0.5)
21
 
22
+ # === FUNCTIONS ===
23
+ def get_drive_image_faces(folder_id):
24
+ api_url = f"https://www.googleapis.com/drive/v3/files?q='{folder_id}'+in+parents&key={API_KEY}&fields=files(id,name,mimeType)"
25
+ response = requests.get(api_url)
26
+ files = response.json().get("files", [])
27
+ faces = []
 
 
28
 
29
+ for f in files:
30
+ if "image" in f["mimeType"]:
31
+ link = f"https://drive.google.com/uc?id={f['id']}&export=download"
32
+ img = requests.get(link).content
33
+ temp = tempfile.NamedTemporaryFile(delete=False, suffix=".jpg")
34
+ temp.write(img)
35
+ temp.close()
36
+ gray = cv2.imread(temp.name, cv2.IMREAD_GRAYSCALE)
37
+ if gray is not None:
38
+ gray = cv2.resize(gray, (100, 100))
39
+ faces.append(gray)
40
+ os.remove(temp.name)
41
+ return faces
42
 
43
+ def is_match(face_img, known_faces, threshold=MATCH_THRESHOLD):
44
+ face_img = cv2.resize(face_img, (100, 100))
45
+ errors = [np.mean((face_img - kf) ** 2) for kf in known_faces]
46
+ return any(err < threshold for err in errors)
 
 
 
 
 
 
 
 
 
 
47
 
48
+ def get_drive_video_links(folder_id):
49
+ api_url = f"https://www.googleapis.com/drive/v3/files?q='{folder_id}'+in+parents&key={API_KEY}&fields=files(id,name,mimeType)"
50
+ response = requests.get(api_url)
51
+ files = response.json().get("files", [])
52
+ video_links = []
53
+ for f in files:
54
+ if f["mimeType"] in ["video/mp4", "video/x-msvideo", "video/avi"]:
55
+ link = f"https://drive.google.com/uc?id={f['id']}&export=download"
56
+ video_links.append((f["name"], link))
57
+ return video_links
58
 
59
+ def download_video(link):
60
+ response = requests.get(link, stream=True)
61
+ temp = tempfile.NamedTemporaryFile(delete=False, suffix=".mp4")
62
+ for chunk in response.iter_content(chunk_size=1024*1024):
63
+ temp.write(chunk)
64
+ temp.close()
65
+ return temp.name
 
 
 
 
66
 
67
+ def detect_intruders(video_path, known_faces):
68
  cap = cv2.VideoCapture(video_path)
69
+ intruder_frames = 0
70
+ faces_displayed = 0
71
 
72
  while cap.isOpened():
73
  ret, frame = cap.read()
74
  if not ret:
75
  break
76
  rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
 
77
  results = face_detector.process(rgb)
78
 
79
  if results.detections:
80
  for det in results.detections:
81
  bbox = det.location_data.relative_bounding_box
82
+ h, w, _ = frame.shape
83
+ x1 = int(bbox.xmin * w)
84
+ y1 = int(bbox.ymin * h)
85
+ x2 = int((bbox.xmin + bbox.width) * w)
86
+ y2 = int((bbox.ymin + bbox.height) * h)
87
+
88
+ face_crop = frame[max(y1,0):y2, max(x1,0):x2]
89
+ gray_face = cv2.cvtColor(face_crop, cv2.COLOR_BGR2GRAY)
90
+
91
+ if not is_match(gray_face, known_faces):
92
+ intruder_frames += 1
93
+ faces_displayed += 1
94
+ st.image(face_crop, caption=f"Unmatched face #{faces_displayed}", channels="BGR", use_column_width=True)
95
+ if faces_displayed >= 3:
96
+ break
97
  cap.release()
98
+ return intruder_frames
99
 
100
+ # === STREAMLIT UI ===
101
  st.title("πŸ” Intruder Detection from CCTV Feed")
102
+ st.write("Streaming videos from Google Drive, comparing with registered faces, and displaying unmatched intruders.")
 
 
 
103
 
104
+ with st.spinner("Loading registered faces..."):
105
+ registered_faces = get_drive_image_faces(REG_FOLDER_ID)
 
 
 
 
 
 
 
 
 
 
106
 
107
+ if not registered_faces:
108
+ st.error("No registered face images found.")
109
+ else:
110
+ with st.spinner("Processing videos..."):
111
+ total_intruders = 0
112
+ for folder_id in CCTVFEED_IDS:
113
+ videos = get_drive_video_links(folder_id)
114
+ for vname, link in videos:
115
+ st.subheader(f"πŸŽ₯ {vname}")
116
+ video_file = download_video(link)
117
+ count = detect_intruders(video_file, registered_faces)
118
+ os.remove(video_file)
119
+ if count > 0:
120
+ st.success(f"βœ… {count} unmatched faces detected.")
121
+ total_intruders += 1
122
+ else:
123
+ st.warning("⚠️ No intruders detected.")
124
+ st.info(f"πŸ”Ž Completed. Total videos with intruders: {total_intruders}")