SuriRaja commited on
Commit
615bf6c
Β·
verified Β·
1 Parent(s): c68c3f5

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +113 -109
app.py CHANGED
@@ -5,134 +5,138 @@ import requests
5
  import tempfile
6
  import mediapipe as mp
7
  import os
8
- import pickle
 
 
9
  from googleapiclient.discovery import build
10
  from googleapiclient.http import MediaFileUpload
11
- from PIL import Image
 
 
12
 
13
  # === CONFIGURATION ===
14
  API_KEY = "AIzaSyDojJrpauA0XZtCCDUuo9xeQHZQamYKsC4"
15
  CCTVFEED_IDS = ['1KJRkSf2SKEZ1mXS9_si65IwMBtjs6p4n']
16
  REG_FOLDER_ID = '1qkcR7nQTEtiMH9OFUv2bGxVn08E3dKjF'
17
  INTRUDER_FOLDER_ID = '1PPAUWU-wMx7fek73p-hqPqYQypYtG8Ob'
 
18
 
 
19
  mp_face_detection = mp.solutions.face_detection
 
20
  face_detector = mp_face_detection.FaceDetection(model_selection=0, min_detection_confidence=0.5)
21
 
22
- # === INTRUDER UPLOAD ===
23
- def get_drive_service():
24
- try:
25
- with open("token.pickle", "rb") as token:
 
26
  creds = pickle.load(token)
27
- return build("drive", "v3", credentials=creds)
28
- except Exception as e:
29
- print(f"❌ Drive auth failed: {e}")
30
- return None
31
-
32
- def upload_intruder_face(image_path):
33
- service = get_drive_service()
34
- if service is None:
35
- print("πŸ”’ Upload skipped (auth failed)")
36
- return
37
-
38
- metadata = {"name": os.path.basename(image_path), "parents": [INTRUDER_FOLDER_ID]}
39
- media = MediaFileUpload(image_path, mimetype="image/jpeg")
40
- try:
41
- service.files().create(body=metadata, media_body=media, fields="id").execute()
42
- print(f"βœ… Uploaded to INTRUDER folder: {image_path}")
43
- except Exception as e:
44
- print(f"❌ Upload failed: {e}")
45
-
46
- # === HELPERS ===
47
- def get_drive_files(folder_id, allowed_mime):
48
- url = f"https://www.googleapis.com/drive/v3/files?q='{folder_id}'+in+parents&key={API_KEY}&fields=files(id,name,mimeType)"
49
- response = requests.get(url)
50
- files = response.json().get("files", [])
51
- return [(f["name"], f["id"]) for f in files if f["mimeType"] in allowed_mime]
52
-
53
- def download_file(file_id, suffix):
54
- url = f"https://drive.google.com/uc?id={file_id}&export=download"
55
  response = requests.get(url, stream=True)
56
- temp = tempfile.NamedTemporaryFile(delete=False, suffix=suffix)
57
  for chunk in response.iter_content(chunk_size=1024 * 1024):
58
  temp.write(chunk)
59
  temp.close()
60
  return temp.name
61
 
62
- def extract_face_embedding(image):
63
- rgb = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
64
- results = face_detector.process(rgb)
65
- if results.detections:
66
- for det in results.detections:
67
- bbox = det.location_data.relative_bounding_box
68
- h, w, _ = image.shape
69
- x1 = int(bbox.xmin * w)
70
- y1 = int(bbox.ymin * h)
71
- x2 = int((bbox.xmin + bbox.width) * w)
72
- y2 = int((bbox.ymin + bbox.height) * h)
73
- face_crop = image[y1:y2, x1:x2]
74
- return cv2.resize(face_crop, (128, 128))
75
- return None
76
-
77
- def is_match(face1, face2):
78
- if face1 is None or face2 is None:
79
- return False
80
- diff = np.linalg.norm(face1.astype("float") - face2.astype("float"))
81
- return diff < 30.0
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
82
 
83
  # === STREAMLIT UI ===
84
- st.set_page_config(page_title="Intruder Detection", layout="wide")
85
- st.title("πŸ” Intruder Detection from CCTV Feed")
86
- st.write("Streaming videos from Google Drive, detecting and matching faces.")
87
-
88
- sidebar_faces = []
89
-
90
- with st.spinner("Loading registered faces..."):
91
- registered = []
92
- reg_images = get_drive_files(REG_FOLDER_ID, ["image/jpeg", "image/png"])
93
- for name, fid in reg_images:
94
- img_path = download_file(fid, ".jpg")
95
- img = cv2.imread(img_path)
96
- emb = extract_face_embedding(img)
97
- if emb is not None:
98
- registered.append((name, emb))
99
-
100
- with st.spinner("Scanning CCTV feed..."):
101
- total_intruders = 0
102
  for folder_id in CCTVFEED_IDS:
103
- videos = get_drive_files(folder_id, ["video/mp4", "video/x-msvideo", "video/avi"])
104
- for vname, fid in videos:
105
- st.subheader(f"πŸŽ₯ {vname}")
106
- vid_path = download_file(fid, ".mp4")
107
- cap = cv2.VideoCapture(vid_path)
108
- frame_count = 0
109
- matched_faces = 0
110
- unmatched_faces = 0
111
-
112
- while cap.isOpened():
113
- ret, frame = cap.read()
114
- if not ret:
115
- break
116
- face_img = extract_face_embedding(frame)
117
- if face_img is not None:
118
- match_found = any(is_match(face_img, emb) for _, emb in registered)
119
- if match_found:
120
- matched_faces += 1
121
- else:
122
- unmatched_faces += 1
123
- temp = tempfile.NamedTemporaryFile(delete=False, suffix=".jpg")
124
- cv2.imwrite(temp.name, face_img)
125
- upload_intruder_face(temp.name)
126
- sidebar_faces.append(temp.name)
127
- frame_count += 1
128
- cap.release()
129
-
130
- st.success(f"🎯 Processed {frame_count} frames β€” Matched: {matched_faces}, Unmatched: {unmatched_faces}")
131
- total_intruders += unmatched_faces
132
-
133
- st.info(f"🎯 Completed processing all videos. Intruders detected: {total_intruders}")
134
-
135
- with st.sidebar:
136
- st.markdown("## 🚨 Intruder Snapshots")
137
- for face_path in sidebar_faces:
138
- st.image(face_path, width=160)
 
5
  import tempfile
6
  import mediapipe as mp
7
  import os
8
+ import hashlib
9
+ import io
10
+ from PIL import Image
11
  from googleapiclient.discovery import build
12
  from googleapiclient.http import MediaFileUpload
13
+ from google_auth_oauthlib.flow import InstalledAppFlow
14
+ from google.auth.transport.requests import Request
15
+ import pickle
16
 
17
  # === CONFIGURATION ===
18
  API_KEY = "AIzaSyDojJrpauA0XZtCCDUuo9xeQHZQamYKsC4"
19
  CCTVFEED_IDS = ['1KJRkSf2SKEZ1mXS9_si65IwMBtjs6p4n']
20
  REG_FOLDER_ID = '1qkcR7nQTEtiMH9OFUv2bGxVn08E3dKjF'
21
  INTRUDER_FOLDER_ID = '1PPAUWU-wMx7fek73p-hqPqYQypYtG8Ob'
22
+ SCOPES = ['https://www.googleapis.com/auth/drive.file']
23
 
24
+ # === SETUP ===
25
  mp_face_detection = mp.solutions.face_detection
26
+ mp_drawing = mp.solutions.drawing_utils
27
  face_detector = mp_face_detection.FaceDetection(model_selection=0, min_detection_confidence=0.5)
28
 
29
+ # === AUTH FOR UPLOAD ===
30
+ def get_drive_service_from_token():
31
+ creds = None
32
+ if os.path.exists('token.pickle'):
33
+ with open('token.pickle', 'rb') as token:
34
  creds = pickle.load(token)
35
+ if creds and creds.valid:
36
+ return build('drive', 'v3', credentials=creds)
37
+ return None
38
+
39
+ def upload_to_drive_using_token(file_path, filename, folder_id):
40
+ service = get_drive_service_from_token()
41
+ if not service:
42
+ return False
43
+ file_metadata = {
44
+ 'name': filename,
45
+ 'parents': [folder_id]
46
+ }
47
+ media = MediaFileUpload(file_path, resumable=True)
48
+ service.files().create(body=file_metadata, media_body=media, fields='id').execute()
49
+ return True
50
+
51
+ # === UTILITIES ===
52
+ def get_drive_links(folder_id, mime_prefix="video/"):
53
+ api_url = f"https://www.googleapis.com/drive/v3/files?q='{folder_id}'+in+parents&key={API_KEY}&fields=files(id,name,mimeType)"
54
+ res = requests.get(api_url).json()
55
+ files = res.get("files", [])
56
+ return [(f["name"], f["id"]) for f in files if f["mimeType"].startswith(mime_prefix)]
57
+
58
+ def get_file_download_url(file_id):
59
+ return f"https://drive.google.com/uc?id={file_id}&export=download"
60
+
61
+ def download_file(url):
 
62
  response = requests.get(url, stream=True)
63
+ temp = tempfile.NamedTemporaryFile(delete=False, suffix=".mp4")
64
  for chunk in response.iter_content(chunk_size=1024 * 1024):
65
  temp.write(chunk)
66
  temp.close()
67
  return temp.name
68
 
69
+ def list_registered_faces():
70
+ st.sidebar.markdown("### πŸ§‘β€πŸ’Ό Registered Faces")
71
+ images = get_drive_links(REG_FOLDER_ID, mime_prefix="image/")
72
+ for name, file_id in images:
73
+ img_url = get_file_download_url(file_id)
74
+ img = Image.open(requests.get(img_url, stream=True).raw)
75
+ st.sidebar.image(img, caption=name, width=120)
76
+
77
+ # === FACE DETECTION ===
78
+ def detect_faces_from_video(video_path, uploaded_hashes):
79
+ cap = cv2.VideoCapture(video_path)
80
+ detected = 0
81
+ displayed_faces = []
82
+
83
+ st.sidebar.markdown("### πŸŽ₯ Faces Detected from Video")
84
+
85
+ while cap.isOpened():
86
+ ret, frame = cap.read()
87
+ if not ret:
88
+ break
89
+
90
+ frame = cv2.resize(frame, (640, 480))
91
+ rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
92
+ results = face_detector.process(rgb)
93
+
94
+ if results.detections:
95
+ for det in results.detections:
96
+ bbox = det.location_data.relative_bounding_box
97
+ ih, iw, _ = frame.shape
98
+ x, y, w, h = int(bbox.xmin * iw), int(bbox.ymin * ih), int(bbox.width * iw), int(bbox.height * ih)
99
+ x, y = max(x, 0), max(y, 0)
100
+ face = frame[y:y + h, x:x + w]
101
+ gray = cv2.cvtColor(face, cv2.COLOR_BGR2GRAY)
102
+ resized = cv2.resize(gray, (100, 100))
103
+ hsh = hashlib.md5(resized.tobytes()).hexdigest()
104
+
105
+ if hsh not in uploaded_hashes:
106
+ uploaded_hashes.add(hsh)
107
+ tmp_path = os.path.join(tempfile.gettempdir(), f"intruder_{hsh}.jpg")
108
+ cv2.imwrite(tmp_path, face)
109
+ success = upload_to_drive_using_token(tmp_path, f"intruder_{hsh}.jpg", INTRUDER_FOLDER_ID)
110
+ if success:
111
+ st.sidebar.image(face, caption="🚨 Intruder", width=120)
112
+ else:
113
+ st.warning("❌ Upload failed.")
114
+ else:
115
+ st.sidebar.image(face, caption="β›” Duplicate", width=120)
116
+ detected += 1
117
+ cap.release()
118
+ return detected
119
 
120
  # === STREAMLIT UI ===
121
+ st.set_page_config(page_title="CCTV Face Detector", layout="wide")
122
+ st.title("πŸ” Intruder Detection from Google Drive CCTV Feed")
123
+
124
+ list_registered_faces()
125
+ uploaded_hashes = set()
126
+
127
+ with st.spinner("Processing videos..."):
128
+ total_detected = 0
 
 
 
 
 
 
 
 
 
 
129
  for folder_id in CCTVFEED_IDS:
130
+ videos = get_drive_links(folder_id)
131
+ for name, vid_id in videos:
132
+ st.subheader(f"🎞️ {name}")
133
+ vpath = download_file(get_file_download_url(vid_id))
134
+ count = detect_faces_from_video(vpath, uploaded_hashes)
135
+ os.remove(vpath)
136
+ if count > 0:
137
+ st.success(f"βœ… {count} faces detected in {name}")
138
+ total_detected += count
139
+ else:
140
+ st.warning(f"⚠️ No faces found in {name}")
141
+
142
+ st.info(f"🎯 Completed processing all videos.")