SuriRaja commited on
Commit
ae58e6e
·
verified ·
1 Parent(s): 445748f

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +78 -103
app.py CHANGED
@@ -1,119 +1,94 @@
1
- import os
2
  import cv2
3
- import face_recognition
4
- import numpy as np
5
- import time
6
- import pickle
 
 
7
  import gradio as gr
8
- from googleapiclient.discovery import build
9
- from googleapiclient.http import MediaIoBaseDownload, MediaFileUpload
10
- from io import BytesIO
11
-
12
- # === CONFIGURATION ===
13
- CCTVFEED_IDS = ['1KJRkSf2SKEZ1mXS9_si65IwMBtjs6p4n']
14
- REG_FOLDER_ID = '1qkcR7nQTEtiMH9OFUv2bGxVn08E3dKjF'
15
- INTRUDER_FOLDER_ID = '1PPAUWU-wMx7fek73p-hqPqYQypYtG8Ob'
16
- TOKEN_PICKLE = 'token.pickle'
17
 
18
- def init_gdrive():
19
- with open(TOKEN_PICKLE, 'rb') as token:
20
- creds = pickle.load(token)
21
- return build('drive', 'v3', credentials=creds)
22
 
23
- def download_file(service, file_id, local_path):
24
- request = service.files().get_media(fileId=file_id)
25
- fh = BytesIO()
26
- downloader = MediaIoBaseDownload(fh, request)
27
- while not downloader.next_chunk()[1]:
28
- pass
29
- fh.seek(0)
30
- with open(local_path, 'wb') as f:
31
- f.write(fh.read())
 
 
 
 
32
 
33
- def list_files(service, folder_id, mime_type='video/mp4'):
34
- query = f"'{folder_id}' in parents and mimeType='{mime_type}'"
35
- result = service.files().list(q=query, fields="files(id, name)").execute()
36
- return result.get('files', [])
 
 
 
 
 
 
37
 
38
- def load_known_faces(service):
39
- known_encodings = []
40
- files = list_files(service, REG_FOLDER_ID, mime_type='image/jpeg')
41
- print(f"[INFO] Found {len(files)} registered images.")
42
- for f in files:
43
- fname = f"{f['id']}.jpg"
44
- download_file(service, f['id'], fname)
45
- image = face_recognition.load_image_file(fname)
46
- encodings = face_recognition.face_encodings(image)
47
- if encodings:
48
- known_encodings.append(encodings[0])
49
- os.remove(fname)
50
- return known_encodings
51
 
52
- def upload_intruder(service, img_path):
53
- file_metadata = {'name': os.path.basename(img_path), 'parents': [INTRUDER_FOLDER_ID]}
54
- media = MediaFileUpload(img_path, mimetype='image/jpeg')
55
- service.files().create(body=file_metadata, media_body=media, fields='id').execute()
56
 
57
- def is_match(encoding, known_encodings, tolerance=0.45):
58
- results = face_recognition.compare_faces(known_encodings, encoding, tolerance)
59
- return any(results)
 
 
 
 
 
60
 
61
- def process_video(video_path, known_encodings, service):
62
- cap = cv2.VideoCapture(video_path)
63
- frame_count, detections = 0, 0
64
- uploaded = []
65
 
66
- while True:
67
- ret, frame = cap.read()
68
- if not ret:
69
- break
70
- if frame_count % 30 == 0:
71
- rgb = frame[:, :, ::-1]
72
- locations = face_recognition.face_locations(rgb)
73
- encodings = face_recognition.face_encodings(rgb, locations)
74
- print(f"[FRAME {frame_count}] Found {len(encodings)} face(s).")
75
- for encoding in encodings:
76
- if not is_match(encoding, known_encodings):
77
- ts = int(time.time())
78
- fname = f"intruder_{ts}_{frame_count}.jpg"
79
- cv2.imwrite(fname, frame)
80
- upload_intruder(service, fname)
81
- uploaded.append(fname)
82
- os.remove(fname)
83
- detections += 1
84
- frame_count += 1
85
 
86
  cap.release()
87
- print(f"[INFO] Done with {video_path}, {frame_count} frames processed.")
88
- return detections, uploaded
89
 
90
- def run_detection():
91
- try:
92
- service = init_gdrive()
93
- known_encodings = load_known_faces(service)
94
- total_detected, total_videos, intruder_imgs = 0, 0, []
 
 
 
 
 
 
 
 
 
95
 
96
- for folder_id in CCTVFEED_IDS:
97
- videos = list_files(service, folder_id)
98
- total_videos += len(videos)
99
- print(f"[INFO] Found {len(videos)} video(s) in {folder_id}.")
100
- for v in videos:
101
- path = f"{v['id']}.mp4"
102
- download_file(service, v['id'], path)
103
- detected, found = process_video(path, known_encodings, service)
104
- total_detected += detected
105
- intruder_imgs.extend(found)
106
- os.remove(path)
107
 
108
- summary = f"✅ Processed {total_videos} video(s). {total_detected} intruder(s) detected."
109
- return summary, intruder_imgs
110
- except Exception as e:
111
- return f" Error: {str(e)}", []
 
 
 
112
 
113
- gr.Interface(
114
- fn=run_detection,
115
- inputs=[],
116
- outputs=["text", "gallery"],
117
- title="CCTV Intruder Detector (GPU Ready)",
118
- description="Detects intruders by comparing faces in CCTV footage with registered staff/guest photos."
119
- ).launch()
 
 
1
  import cv2
2
+ import mediapipe as mp
3
+ import os
4
+ import tempfile
5
+ import shutil
6
+ import requests
7
+ from bs4 import BeautifulSoup
8
  import gradio as gr
 
 
 
 
 
 
 
 
 
9
 
10
+ # MediaPipe setup
11
+ mp_face_detection = mp.solutions.face_detection
12
+ mp_drawing = mp.solutions.drawing_utils
 
13
 
14
+ def download_public_gdrive_file(file_id, save_dir):
15
+ try:
16
+ url = f"https://drive.google.com/uc?id={file_id}&export=download"
17
+ response = requests.get(url, stream=True)
18
+ if response.status_code == 200:
19
+ ext = ".avi" if "avi" in response.headers.get('Content-Disposition', '') else ".mp4"
20
+ output_path = os.path.join(save_dir, f"{file_id}{ext}")
21
+ with open(output_path, 'wb') as f:
22
+ f.write(response.content)
23
+ return output_path
24
+ except Exception as e:
25
+ print(f"Error downloading {file_id}: {e}")
26
+ return None
27
 
28
+ def get_video_ids_from_folder(folder_url):
29
+ response = requests.get(folder_url)
30
+ soup = BeautifulSoup(response.text, "html.parser")
31
+ video_ids = []
32
+ for link in soup.find_all("a"):
33
+ href = link.get("href", "")
34
+ if "/file/d/" in href:
35
+ video_id = href.split("/file/d/")[1].split("/")[0]
36
+ video_ids.append(video_id)
37
+ return list(set(video_ids))
38
 
39
+ def detect_faces_from_video(video_path):
40
+ face_count = 0
41
+ temp_dir = tempfile.mkdtemp()
42
+ frames_with_faces = []
 
 
 
 
 
 
 
 
 
43
 
44
+ cap = cv2.VideoCapture(video_path)
45
+ frame_index = 0
 
 
46
 
47
+ with mp_face_detection.FaceDetection(model_selection=0, min_detection_confidence=0.5) as detector:
48
+ while cap.isOpened():
49
+ ret, frame = cap.read()
50
+ if not ret:
51
+ break
52
+ frame_index += 1
53
+ rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
54
+ results = detector.process(rgb_frame)
55
 
56
+ if results.detections:
57
+ face_count += len(results.detections)
58
+ for detection in results.detections:
59
+ mp_drawing.draw_detection(frame, detection)
60
 
61
+ frame_path = os.path.join(temp_dir, f"frame_{frame_index}.jpg")
62
+ cv2.imwrite(frame_path, frame)
63
+ frames_with_faces.append(frame_path)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
64
 
65
  cap.release()
66
+ return frames_with_faces, face_count
 
67
 
68
+ def run_on_drive_folder(folder_url):
69
+ folder_url = folder_url.strip()
70
+ video_ids = get_video_ids_from_folder(folder_url)
71
+ if not video_ids:
72
+ return "❌ No videos found or access denied.", []
73
+
74
+ frames_all = []
75
+ total_faces = 0
76
+ for vid_id in video_ids[:5]: # Limit to 5 videos to manage processing time
77
+ video_path = download_public_gdrive_file(vid_id, "/tmp")
78
+ if video_path:
79
+ frames, count = detect_faces_from_video(video_path)
80
+ frames_all.extend(frames)
81
+ total_faces += count
82
 
83
+ return f"✅ Detected {total_faces} face(s) across {len(frames_all)} frame(s) from {len(video_ids)} video(s).", frames_all
 
 
 
 
 
 
 
 
 
 
84
 
85
+ demo = gr.Interface(
86
+ fn=run_on_drive_folder,
87
+ inputs=gr.Textbox(label="Google Drive Folder URL (Public)"),
88
+ outputs=[gr.Textbox(label="Summary"), gr.Gallery(label="Detected Frames")],
89
+ title="Intruder Detection from Google Drive Folder (MP4 + AVI)",
90
+ description="Paste the link to a *public* Google Drive folder containing .mp4 or .avi videos."
91
+ )
92
 
93
+ if __name__ == "__main__":
94
+ demo.launch()