SuriRaja commited on
Commit
1cfb844
·
verified ·
1 Parent(s): fabf2b6

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +22 -25
app.py CHANGED
@@ -1,3 +1,4 @@
 
1
  import os
2
  import cv2
3
  import numpy as np
@@ -8,39 +9,34 @@ from googleapiclient.discovery import build
8
  from googleapiclient.http import MediaIoBaseDownload, MediaFileUpload
9
  from io import BytesIO
10
 
11
-
12
  # === CONFIGURATION ===
13
  CCTVFEED_IDS = ['1KJRkSf2SKEZ1mXS9_si65IwMBtjs6p4n'] # Replace with actual folder IDs
14
  REG_FOLDER_ID = '1qkcR7nQTEtiMH9OFUv2bGxVn08E3dKjF'
15
  INTRUDER_FOLDER_ID = '1PPAUWU-wMx7fek73p-hqPqYQypYtG8Ob'
16
- TOKEN_PICKLE = 'token.pickle' # Must be uploaded to your HF Space
17
  CASCADE_PATH = cv2.data.haarcascades + 'haarcascade_frontalface_default.xml'
 
18
 
19
- # === GOOGLE DRIVE AUTH ===
20
  def init_gdrive():
21
  with open(TOKEN_PICKLE, 'rb') as token:
22
  creds = pickle.load(token)
23
  return build('drive', 'v3', credentials=creds)
24
 
25
- # === DOWNLOAD FILE FROM DRIVE ===
26
  def download_file(service, file_id, local_path):
27
  request = service.files().get_media(fileId=file_id)
28
  fh = BytesIO()
29
  downloader = MediaIoBaseDownload(fh, request)
30
- done = False
31
- while not done:
32
- status, done = downloader.next_chunk()
33
  fh.seek(0)
34
  with open(local_path, 'wb') as f:
35
  f.write(fh.read())
36
 
37
- # === LIST FILES IN A FOLDER ===
38
  def list_files(service, folder_id, mime_type='video/mp4'):
39
  query = f"'{folder_id}' in parents and mimeType='{mime_type}'"
40
  result = service.files().list(q=query, fields="files(id, name)").execute()
41
  return result.get('files', [])
42
 
43
- # === LOAD REGISTERED FACES ===
44
  def load_known_faces(service):
45
  known_faces = []
46
  reg_files = list_files(service, REG_FOLDER_ID, mime_type='image/jpeg')
@@ -54,24 +50,23 @@ def load_known_faces(service):
54
  os.remove(fname)
55
  return known_faces
56
 
57
- # === UPLOAD INTRUDER IMAGE ===
58
  def upload_intruder(service, img_path):
59
  file_metadata = {'name': os.path.basename(img_path), 'parents': [INTRUDER_FOLDER_ID]}
60
  media = MediaFileUpload(img_path, mimetype='image/jpeg')
61
  service.files().create(body=file_metadata, media_body=media, fields='id').execute()
62
 
63
- # === BASIC FACE MATCH CHECK ===
64
- def is_match(face_img, known_faces, threshold=2000):
65
  face_img = cv2.resize(face_img, (100, 100))
66
  errors = [np.mean((face_img - kf) ** 2) for kf in known_faces]
 
67
  return any(err < threshold for err in errors)
68
 
69
- # === PROCESS A VIDEO ===
70
  def process_video(video_path, known_faces, service):
71
  face_cascade = cv2.CascadeClassifier(CASCADE_PATH)
72
  video_capture = cv2.VideoCapture(video_path)
73
- frame_count = 0
74
- detections = 0
 
75
  while True:
76
  ret, frame = video_capture.read()
77
  if not ret:
@@ -86,35 +81,37 @@ def process_video(video_path, known_faces, service):
86
  fname = f"intruder_{ts}.jpg"
87
  cv2.imwrite(fname, frame)
88
  upload_intruder(service, fname)
 
89
  os.remove(fname)
90
  detections += 1
91
  frame_count += 1
92
  video_capture.release()
93
- return detections
94
 
95
- # === MAIN RUN FUNCTION ===
96
  def run_detection():
97
  try:
98
  drive = init_gdrive()
99
  known_faces = load_known_faces(drive)
100
- total_detections = 0
 
101
  for folder_id in CCTVFEED_IDS:
102
  videos = list_files(drive, folder_id)
103
  for v in videos:
104
  video_path = f"{v['id']}.mp4"
105
  download_file(drive, v['id'], video_path)
106
- detections = process_video(video_path, known_faces, drive)
107
  total_detections += detections
 
108
  os.remove(video_path)
109
- return f"✅ Detection complete. {total_detections} intruder(s) uploaded."
 
110
  except Exception as e:
111
- return f"❌ Error: {str(e)}"
112
 
113
- # === GRADIO INTERFACE ===
114
  gr.Interface(
115
  fn=run_detection,
116
  inputs=[],
117
- outputs="text",
118
- title="CCTV Face Detection",
119
- description="Click the button to run detection on uploaded CCTV feeds."
120
  ).launch()
 
1
+ # app.py
2
  import os
3
  import cv2
4
  import numpy as np
 
9
  from googleapiclient.http import MediaIoBaseDownload, MediaFileUpload
10
  from io import BytesIO
11
 
 
12
  # === CONFIGURATION ===
13
  CCTVFEED_IDS = ['1KJRkSf2SKEZ1mXS9_si65IwMBtjs6p4n'] # Replace with actual folder IDs
14
  REG_FOLDER_ID = '1qkcR7nQTEtiMH9OFUv2bGxVn08E3dKjF'
15
  INTRUDER_FOLDER_ID = '1PPAUWU-wMx7fek73p-hqPqYQypYtG8Ob'
16
+ TOKEN_PICKLE = 'token.pickle'
17
  CASCADE_PATH = cv2.data.haarcascades + 'haarcascade_frontalface_default.xml'
18
+ MATCH_THRESHOLD = 8000 # More lenient threshold
19
 
 
20
  def init_gdrive():
21
  with open(TOKEN_PICKLE, 'rb') as token:
22
  creds = pickle.load(token)
23
  return build('drive', 'v3', credentials=creds)
24
 
 
25
  def download_file(service, file_id, local_path):
26
  request = service.files().get_media(fileId=file_id)
27
  fh = BytesIO()
28
  downloader = MediaIoBaseDownload(fh, request)
29
+ while not downloader.next_chunk()[1]:
30
+ pass
 
31
  fh.seek(0)
32
  with open(local_path, 'wb') as f:
33
  f.write(fh.read())
34
 
 
35
  def list_files(service, folder_id, mime_type='video/mp4'):
36
  query = f"'{folder_id}' in parents and mimeType='{mime_type}'"
37
  result = service.files().list(q=query, fields="files(id, name)").execute()
38
  return result.get('files', [])
39
 
 
40
  def load_known_faces(service):
41
  known_faces = []
42
  reg_files = list_files(service, REG_FOLDER_ID, mime_type='image/jpeg')
 
50
  os.remove(fname)
51
  return known_faces
52
 
 
53
  def upload_intruder(service, img_path):
54
  file_metadata = {'name': os.path.basename(img_path), 'parents': [INTRUDER_FOLDER_ID]}
55
  media = MediaFileUpload(img_path, mimetype='image/jpeg')
56
  service.files().create(body=file_metadata, media_body=media, fields='id').execute()
57
 
58
+ def is_match(face_img, known_faces, threshold=MATCH_THRESHOLD):
 
59
  face_img = cv2.resize(face_img, (100, 100))
60
  errors = [np.mean((face_img - kf) ** 2) for kf in known_faces]
61
+ print(f"[DEBUG] MSE Scores: {errors}")
62
  return any(err < threshold for err in errors)
63
 
 
64
  def process_video(video_path, known_faces, service):
65
  face_cascade = cv2.CascadeClassifier(CASCADE_PATH)
66
  video_capture = cv2.VideoCapture(video_path)
67
+ frame_count, detections = 0, 0
68
+ uploaded = []
69
+
70
  while True:
71
  ret, frame = video_capture.read()
72
  if not ret:
 
81
  fname = f"intruder_{ts}.jpg"
82
  cv2.imwrite(fname, frame)
83
  upload_intruder(service, fname)
84
+ uploaded.append(fname)
85
  os.remove(fname)
86
  detections += 1
87
  frame_count += 1
88
  video_capture.release()
89
+ return detections, uploaded
90
 
 
91
  def run_detection():
92
  try:
93
  drive = init_gdrive()
94
  known_faces = load_known_faces(drive)
95
+ total_detections, intruder_imgs = 0, []
96
+
97
  for folder_id in CCTVFEED_IDS:
98
  videos = list_files(drive, folder_id)
99
  for v in videos:
100
  video_path = f"{v['id']}.mp4"
101
  download_file(drive, v['id'], video_path)
102
+ detections, uploads = process_video(video_path, known_faces, drive)
103
  total_detections += detections
104
+ intruder_imgs.extend(uploads)
105
  os.remove(video_path)
106
+
107
+ return f"✅ {total_detections} intruder(s) detected.", [f for f in intruder_imgs]
108
  except Exception as e:
109
+ return f"❌ Error: {str(e)}", []
110
 
 
111
  gr.Interface(
112
  fn=run_detection,
113
  inputs=[],
114
+ outputs=["text", "gallery"],
115
+ title="CCTV Intruder Detection",
116
+ description="Click to run detection. Will display and upload unknown faces."
117
  ).launch()