Rivalcoder committed on
Commit defb3ac · 1 Parent(s): c73c7d8
Files changed (3)
  1. README.md +7 -10
  2. app.py +206 -55
  3. requirements.txt +4 -7
README.md CHANGED
@@ -1,12 +1,9 @@
 ---
-title: Video Processing
-emoji: 👍
-colorFrom: gray
-colorTo: green
-sdk: gradio
-sdk_version: 5.23.1
-app_file: app.py
+title: Emotion Detection API
+emoji: 😊
+colorFrom: blue
+colorTo: purple
+sdk: docker
+app_port: 8000
 pinned: false
----
-
-Check out the configuration reference at https://huggingface.co/docs/hub/spaces-config-reference
+---
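With the metadata switched from the Gradio SDK to `sdk: docker` and `app_port: 8000`, the Space now serves the FastAPI app from `app.py` directly rather than a Gradio UI. As a quick reachability check, FastAPI's auto-generated OpenAPI schema can be fetched from the running container; this is only a sketch, and the base URL below is a placeholder, not something defined in this commit.

```python
# Quick reachability check for the Dockerized FastAPI Space.
# BASE_URL is a placeholder assumption; substitute the deployed Space URL or http://localhost:8000.
import json
import urllib.request

BASE_URL = "http://localhost:8000"

with urllib.request.urlopen(f"{BASE_URL}/openapi.json") as resp:
    schema = json.load(resp)

# FastAPI publishes its routes in the schema; the video endpoint from app.py should appear here.
print(list(schema["paths"].keys()))
```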
 
 
app.py CHANGED
@@ -1,79 +1,201 @@
-import os
 import cv2
 import torch
 import numpy as np
 from PIL import Image
 import torchvision.transforms as transforms
 import time
+import os
 import json
-from typing import Dict, Any
-from fastapi import FastAPI, HTTPException, File, UploadFile
-from pydantic import BaseModel
-import gradio as gr
-import tempfile
+from typing import Dict, List, Any
+from fastapi import FastAPI, UploadFile, File, Form, HTTPException
+from fastapi.responses import JSONResponse
+import uuid
+from pathlib import Path

 app = FastAPI()

 # Global variable to store the history of largest face detections
 largest_face_detections = []

-# EmotionCNN model definition (same as in your original code)
+# EmotionCNN model definition
 class EmotionCNN(torch.nn.Module):
     def __init__(self, num_classes=7):
         super(EmotionCNN, self).__init__()

-        # Your convolutional layers and other definitions
-        # ...
-
+        # First convolutional block
+        self.conv1 = torch.nn.Sequential(
+            torch.nn.Conv2d(1, 64, kernel_size=3, padding=1),
+            torch.nn.BatchNorm2d(64),
+            torch.nn.ReLU(),
+            torch.nn.MaxPool2d(kernel_size=2, stride=2)
+        )
+
+        # Second convolutional block
+        self.conv2 = torch.nn.Sequential(
+            torch.nn.Conv2d(64, 128, kernel_size=3, padding=1),
+            torch.nn.BatchNorm2d(128),
+            torch.nn.ReLU(),
+            torch.nn.MaxPool2d(kernel_size=2, stride=2)
+        )
+
+        # Third convolutional block
+        self.conv3 = torch.nn.Sequential(
+            torch.nn.Conv2d(128, 256, kernel_size=3, padding=1),
+            torch.nn.BatchNorm2d(256),
+            torch.nn.ReLU(),
+            torch.nn.MaxPool2d(kernel_size=2, stride=2)
+        )
+
+        # Fourth convolutional block
+        self.conv4 = torch.nn.Sequential(
+            torch.nn.Conv2d(256, 512, kernel_size=3, padding=1),
+            torch.nn.BatchNorm2d(512),
+            torch.nn.ReLU(),
+            torch.nn.MaxPool2d(kernel_size=2, stride=2)
+        )
+
+        # Fifth convolutional block with residual connection
+        self.conv5 = torch.nn.Sequential(
+            torch.nn.Conv2d(512, 512, kernel_size=3, padding=1),
+            torch.nn.BatchNorm2d(512),
+            torch.nn.ReLU()
+        )
+
+        # Attention mechanism
+        self.attention = torch.nn.Sequential(
+            torch.nn.Conv2d(512, 1, kernel_size=1),
+            torch.nn.Sigmoid()
+        )
+
+        # Fully connected layers
+        self.fc = torch.nn.Sequential(
+            torch.nn.Dropout(0.5),
+            torch.nn.Linear(512 * 3 * 3, 1024),
+            torch.nn.ReLU(),
+            torch.nn.Dropout(0.5),
+            torch.nn.Linear(1024, 512),
+            torch.nn.ReLU(),
+            torch.nn.Dropout(0.3),
+            torch.nn.Linear(512, num_classes)
+        )
+
     def forward(self, x):
-        # Forward method as in your code
-        pass
+        x = self.conv1(x)
+        x = self.conv2(x)
+        x = self.conv3(x)
+        x = self.conv4(x)
+
+        # Fifth conv block with residual connection
+        x_res = x
+        x = self.conv5(x)
+        x = x + x_res
+
+        # Apply attention
+        attn = self.attention(x)
+        x = x * attn
+
+        # Flatten
+        x = x.view(x.size(0), -1)
+
+        # Fully connected
+        x = self.fc(x)
+        return x

-# Load emotion model
 def load_emotion_model(model_path, device='cuda' if torch.cuda.is_available() else 'cpu'):
+    """Load the emotion recognition model"""
     checkpoint = torch.load(model_path, map_location=device)
+
     model = EmotionCNN(num_classes=7)
     model.load_state_dict(checkpoint['model_state_dict'])
     model.to(device)
     model.eval()
+
     return model

-# Process the uploaded video (either MP4 or WebM)
-async def process_video(video_file: UploadFile) -> Dict[str, Any]:
+def preprocess_face(face_img, size=(48, 48)):
+    """Preprocess face image for emotion detection"""
+    transform = transforms.Compose([
+        transforms.Resize(size),
+        transforms.ToTensor(),
+        transforms.Normalize(mean=[0.5], std=[0.5])
+    ])
+
+    # Convert to PIL Image
+    if isinstance(face_img, np.ndarray):
+        face_img = Image.fromarray(cv2.cvtColor(face_img, cv2.COLOR_BGR2RGB))
+
+    # Convert to grayscale
+    face_img = face_img.convert('L')
+
+    # Apply transformations
+    face_tensor = transform(face_img).unsqueeze(0)
+    return face_tensor
+
+def process_video(video_path: str) -> Dict[str, Any]:
+    """
+    Process a video file and return emotion detection results.
+
+    Args:
+        video_path (str): Path to the video file
+
+    Returns:
+        Dict containing:
+        - success (bool): Whether processing was successful
+        - message (str): Status message
+        - results (List[Dict]): List of emotion detection results
+        - error (str): Error message if any
+    """
     global largest_face_detections
     largest_face_detections = []  # Reset detections for new video

-    # Path to models and other setup
+    # Paths - adjust these paths according to your Hugging Face Space
     face_cascade_path = cv2.data.haarcascades + 'haarcascade_frontalface_default.xml'
-    emotion_model_path = "best_emotion_model.pth"
+    emotion_model_path = "/data/best_emotion_model.pth"  # Path in Hugging Face Space

+    # Check if models exist
     if not os.path.exists(face_cascade_path):
-        raise HTTPException(status_code=400, detail="Face cascade classifier not found")
+        return {
+            "success": False,
+            "message": "Face cascade classifier not found",
+            "results": [],
+            "error": f"Error: Face cascade classifier not found at {face_cascade_path}"
+        }

     if not os.path.exists(emotion_model_path):
-        raise HTTPException(status_code=400, detail="Emotion model not found")
+        return {
+            "success": False,
+            "message": "Emotion model not found",
+            "results": [],
+            "error": f"Error: Emotion model not found at {emotion_model_path}"
+        }

+    # Set device
     device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

+    # Load models
     try:
         face_cascade = cv2.CascadeClassifier(face_cascade_path)
         emotion_model = load_emotion_model(emotion_model_path, device)
     except Exception as e:
-        raise HTTPException(status_code=500, detail=f"Error loading models: {str(e)}")
+        return {
+            "success": False,
+            "message": "Error loading models",
+            "results": [],
+            "error": str(e)
+        }

+    # Emotion labels
     emotions = ['Angry', 'Disgust', 'Fear', 'Happy', 'Sad', 'Surprise', 'Neutral']

-    # Save the uploaded video file to a temporary directory without using shutil
-    temp_dir = tempfile.mkdtemp()
-    video_path = os.path.join(temp_dir, "uploaded_video")
-
-    # Open the video file stream and save it as a local file
-    with open(video_path, "wb") as f:
-        f.write(await video_file.read())
-
+    # Open video
     cap = cv2.VideoCapture(video_path)
     if not cap.isOpened():
-        raise HTTPException(status_code=400, detail=f"Could not open video file at {video_path}")
+        return {
+            "success": False,
+            "message": "Could not open video file",
+            "results": [],
+            "error": f"Error: Could not open video file at {video_path}"
+        }

     frame_count = 0
     total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
@@ -85,15 +207,27 @@ async def process_video(video_file: UploadFile) -> Dict[str, Any]:

         frame_count += 1

+        # Variables to track largest face
         largest_face_area = 0
         current_detection = None

+        # Convert frame to grayscale for face detection
         gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)

-        faces = face_cascade.detectMultiScale(gray, scaleFactor=1.1, minNeighbors=5, minSize=(30, 30))
+        # Detect faces using Haar Cascade
+        faces = face_cascade.detectMultiScale(
+            gray,
+            scaleFactor=1.1,
+            minNeighbors=5,
+            minSize=(30, 30)
+        )

+        # Process each detected face
         for (x, y, w, h) in faces:
+            # Calculate face area
             face_area = w * h
+
+            # Extract face region with margin
             margin = 20
             x1 = max(0, x - margin)
             y1 = max(0, y - margin)
@@ -102,11 +236,14 @@ async def process_video(video_file: UploadFile) -> Dict[str, Any]:

             face_img = frame[y1:y2, x1:x2]

+            # Skip if face is too small
             if face_img.size == 0 or face_img.shape[0] < 20 or face_img.shape[1] < 20:
                 continue

+            # Convert face to PIL Image and preprocess
             face_tensor = preprocess_face(face_img)

+            # Predict emotion
             with torch.no_grad():
                 face_tensor = face_tensor.to(device)
                 output = emotion_model(face_tensor)
@@ -114,8 +251,10 @@ async def process_video(video_file: UploadFile) -> Dict[str, Any]:
                 emotion_idx = torch.argmax(output, dim=1).item()
                 confidence = probabilities[0][emotion_idx].item()

+            # Get emotion label
             emotion = emotions[emotion_idx]

+            # Update largest face if current face is larger
             if face_area > largest_face_area:
                 largest_face_area = face_area
                 current_detection = {
@@ -125,11 +264,14 @@ async def process_video(video_file: UploadFile) -> Dict[str, Any]:
                     'frame_number': frame_count
                 }

+        # Add current detection to history if a face was detected
         if current_detection:
             largest_face_detections.append(current_detection)

+    # Release resources
     cap.release()

+    # Process results
     if not largest_face_detections:
         return {
             "success": True,
@@ -138,11 +280,13 @@ async def process_video(video_file: UploadFile) -> Dict[str, Any]:
             "error": None
         }

+    # Calculate summary statistics
     emotions_count = {}
     for detection in largest_face_detections:
         emotion = detection['emotion']
         emotions_count[emotion] = emotions_count.get(emotion, 0) + 1

+    # Get dominant emotion
     dominant_emotion = max(emotions_count.items(), key=lambda x: x[1])[0]

     return {
@@ -160,32 +304,39 @@ async def process_video(video_file: UploadFile) -> Dict[str, Any]:
         "error": None
     }

-class VideoRequest(BaseModel):
-    path: str
-
-# FastAPI endpoint for processing the video file
-@app.post("/api/video")
-async def process_video_request(file: UploadFile = File(...)):
+@app.post("/analyze-video")
+async def analyze_video(file: UploadFile = File(...)):
     try:
-        results = await process_video(file)
-        return results
+        # Create uploads directory if it doesn't exist
+        upload_dir = Path("uploads")
+        upload_dir.mkdir(exist_ok=True)
+
+        # Generate unique filename
+        file_ext = file.filename.split(".")[-1]
+        temp_filename = f"{uuid.uuid4()}.{file_ext}"
+        temp_path = upload_dir / temp_filename
+
+        # Save the uploaded file
+        with open(temp_path, "wb") as buffer:
+            buffer.write(await file.read())
+
+        # Process the video
+        result = process_video(str(temp_path))
+
+        # Clean up - remove the temporary file
+        os.remove(temp_path)
+
+        if not result["success"]:
+            raise HTTPException(status_code=400, detail=result.get("error", "Processing failed"))
+
+        return JSONResponse(content=result)
+
     except Exception as e:
+        # Clean up if file was created
+        if 'temp_path' in locals() and os.path.exists(temp_path):
+            os.remove(temp_path)
         raise HTTPException(status_code=500, detail=str(e))

-# Gradio interface
-def gradio_interface():
-    def process_gradio_video(video_file):
-        # This function now accepts WebM files and other video formats.
-        return process_video(video_file)
-
-    # Remove the `type` argument from `gr.Video()`
-    interface = gr.Interface(
-        fn=process_gradio_video,
-        inputs=gr.Video(),  # This will automatically handle file uploads
-        outputs="json"
-    )
-
-    return interface
-
-# Launch Gradio Interface on FastAPI
-gradio_interface().launch(server_name="0.0.0.0", server_port=7860)
+if __name__ == "__main__":
+    import uvicorn
+    uvicorn.run(app, host="0.0.0.0", port=8000)
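With the Gradio interface removed, the Space is consumed as a plain HTTP API. The sketch below shows one way a client might call the new `/analyze-video` endpoint; the base URL is a placeholder, and the `requests` dependency is a client-side assumption rather than part of the Space's own requirements.

```python
# Client-side sketch for the /analyze-video endpoint (assumes `pip install requests`).
# BASE_URL and sample.mp4 are placeholders; the route and the "file" form field come from app.py above.
import requests

BASE_URL = "http://localhost:8000"

with open("sample.mp4", "rb") as f:
    response = requests.post(
        f"{BASE_URL}/analyze-video",
        files={"file": ("sample.mp4", f, "video/mp4")},
    )

response.raise_for_status()
result = response.json()

# process_video() documents success, message, results, and error keys in its return value.
print(result["message"])
for detection in result.get("results", []):
    print(detection)
```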
requirements.txt CHANGED
@@ -1,11 +1,8 @@
-ultralytics
-torch
-torchvision
-gradio
 fastapi
 uvicorn
+torch
+torchvision
 opencv-python
-pillow
-opencv-python-headless
 numpy
-pydantic
+Pillow
+python-multipart
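The new `python-multipart` entry is what lets FastAPI parse the multipart form data behind `UploadFile`/`File(...)` in `/analyze-video`; without it the app fails at startup with an error asking for the package. Below is a small, hypothetical post-install check, not part of the Space, verifying that each pinned requirement imports under its actual module name (the mapping is an assumption to confirm in your environment).

```python
# Hypothetical post-install sanity check: verify each requirement's import name resolves.
import importlib

modules = {
    "fastapi": "fastapi",
    "uvicorn": "uvicorn",
    "torch": "torch",
    "torchvision": "torchvision",
    "opencv-python": "cv2",
    "numpy": "numpy",
    "Pillow": "PIL",
    "python-multipart": "multipart",
}

for requirement, module in modules.items():
    importlib.import_module(module)
    print(f"ok: {requirement} -> {module}")
```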