Spaces:
Running
Running
import gradio as gr | |
import torch | |
import numpy as np | |
import cv2 | |
from PIL import Image | |
import time | |
import os | |
import json | |
from datetime import datetime | |
from scipy import stats | |
from skimage.feature import graycomatrix, graycoprops, local_binary_pattern | |
from transformers import AutoImageProcessor, AutoModelForImageClassification | |
from functools import lru_cache | |
import pywt # 用于小波变换 | |
import uuid # 用于生成唯一ID | |
# 设置缓存目录 | |
os.environ["TRANSFORMERS_CACHE"] = "/tmp/transformers_cache" | |
os.environ["HF_HOME"] = "/tmp/hf_home" | |
os.makedirs("/tmp/transformers_cache", exist_ok=True) | |
os.makedirs("/tmp/hf_home", exist_ok=True) | |
# 创建反馈存储目录 | |
FEEDBACK_DIR = "/tmp/feedback" | |
os.makedirs(FEEDBACK_DIR, exist_ok=True) | |
############################################# | |
# 模型管理部分 - 增强版 | |
############################################# | |
class EnhancedModelManager: | |
"""Enhanced model manager with support for specialized AI detection models""" | |
def __init__(self): | |
self.models = { | |
"ai_detector": { | |
"name": "umm-maybe/AI-image-detector", # 将来替换为CNNDetection | |
"processor": None, | |
"model": None, | |
"weight": 0.6 # 增加专业AI检测模型权重 | |
}, | |
"general_classifier1": { | |
"name": "microsoft/resnet-50", # 将来替换为DFDC模型 | |
"processor": None, | |
"model": None, | |
"weight": 0.2 # 降低通用模型权重 | |
}, | |
"general_classifier2": { | |
"name": "google/vit-base-patch16-224", # 保留作为辅助模型 | |
"processor": None, | |
"model": None, | |
"weight": 0.2 # 降低通用模型权重 | |
} | |
} | |
self.loaded_models = set() | |
def load_model(self, key): | |
"""Lazy load a specific model only when needed""" | |
if key not in self.models: | |
return False | |
if key in self.loaded_models: | |
return True | |
try: | |
model_info = self.models[key] | |
model_info["processor"] = AutoImageProcessor.from_pretrained( | |
model_info["name"], | |
cache_dir="/tmp/transformers_cache" | |
) | |
# 加载模型 | |
model_info["model"] = AutoModelForImageClassification.from_pretrained( | |
model_info["name"], | |
cache_dir="/tmp/transformers_cache" | |
) | |
# 量化模型以减少内存使用并加速推理 | |
if torch.cuda.is_available(): | |
model_info["model"].to('cuda') | |
else: | |
# 尝试量化,如果失败则使用原始模型 | |
try: | |
model_info["model"] = torch.quantization.quantize_dynamic( | |
model_info["model"], {torch.nn.Linear}, dtype=torch.qint8 | |
) | |
except Exception as e: | |
print(f"量化失败,使用原始模型: {str(e)}") | |
self.loaded_models.add(key) | |
print(f"成功加载模型: {model_info['name']}") | |
return True | |
except Exception as e: | |
print(f"加载模型失败 {self.models[key]['name']}: {str(e)}") | |
return False | |
def get_model_prediction(self, key, image): | |
"""Get prediction from a specific model""" | |
if not self.load_model(key): | |
return None | |
model_info = self.models[key] | |
try: | |
# 处理图像 | |
inputs = model_info["processor"](images=image, return_tensors="pt") | |
# 如果有GPU则使用 | |
if torch.cuda.is_available(): | |
inputs = {k: v.to('cuda') for k, v in inputs.items()} | |
# 推理 | |
with torch.no_grad(): | |
outputs = model_info["model"](**inputs) | |
# 获取概率 | |
probabilities = torch.nn.functional.softmax(outputs.logits, dim=-1) | |
# 处理输出 | |
ai_probability = self.process_model_output(model_info, outputs, probabilities) | |
# 返回结果 | |
predicted_class_idx = outputs.logits.argmax(-1).item() | |
return { | |
"model_name": model_info["name"], | |
"ai_probability": ai_probability, | |
"predicted_class": model_info["model"].config.id2label[predicted_class_idx], | |
"confidence": float(probabilities[0][predicted_class_idx].item()), | |
"raw_probability": ai_probability # 保存原始概率用于校准 | |
} | |
except Exception as e: | |
return { | |
"model_name": model_info["name"], | |
"error": str(e) | |
} | |
def process_model_output(self, model_info, outputs, probabilities): | |
"""Process different model outputs, return AI generation probability""" | |
model_name = model_info["name"].lower() | |
# 处理AI-image-detector模型 | |
if "ai-image-detector" in model_name: | |
ai_label_idx = None | |
human_label_idx = None | |
for idx, label in model_info["model"].config.id2label.items(): | |
label_lower = label.lower() | |
if "ai" in label_lower or "generated" in label_lower or "fake" in label_lower: | |
ai_label_idx = idx | |
if "human" in label_lower or "real" in label_lower: | |
human_label_idx = idx | |
if human_label_idx is not None: | |
ai_probability = 1 - float(probabilities[0][human_label_idx].item()) | |
elif ai_label_idx is not None: | |
ai_probability = float(probabilities[0][ai_label_idx].item()) | |
else: | |
ai_probability = 0.5 | |
return ai_probability | |
# 处理通用图像分类模型 | |
predicted_class_idx = outputs.logits.argmax(-1).item() | |
predicted_class = model_info["model"].config.id2label[predicted_class_idx].lower() | |
# 检查AI相关关键词 | |
ai_keywords = ["artificial", "generated", "synthetic", "fake", "computer", "digital", "cgi", "rendered"] | |
for keyword in ai_keywords: | |
if keyword in predicted_class: | |
return float(probabilities[0][predicted_class_idx].item()) | |
# 默认处理 | |
if "ai" in predicted_class or "generated" in predicted_class or "fake" in predicted_class: | |
return float(probabilities[0][predicted_class_idx].item()) | |
else: | |
return 0.3 # 降低默认AI概率,更保守的判断 | |
############################################# | |
# 特征提取部分 - 增强版 | |
############################################# | |
class EnhancedFeatureExtractor: | |
"""Enhanced image feature extraction with advanced features""" | |
def __init__(self): | |
# 中间结果缓存 | |
self.cache = {} | |
def clear_cache(self): | |
"""Clear the cache between images""" | |
self.cache = {} | |
def detect_image_type(self, image): | |
"""Detect the type of image (portrait, landscape, etc.)""" | |
img_array = np.array(image) | |
# 检测人脸 | |
try: | |
face_cascade = cv2.CascadeClassifier(cv2.data.haarcascades + 'haarcascade_frontalface_default.xml') | |
if len(img_array.shape) == 3: | |
gray = cv2.cvtColor(img_array, cv2.COLOR_RGB2GRAY) | |
else: | |
gray = img_array | |
faces = face_cascade.detectMultiScale(gray, 1.1, 4) | |
if len(faces) > 0: | |
return "portrait" | |
except: | |
pass | |
# 检查是否为风景图 (简单启发式方法) | |
if image.width > image.height * 1.5: | |
return "landscape" | |
# 默认类型 | |
return "general" | |
def get_grayscale(self, image_id): | |
"""Get grayscale version of image with caching""" | |
img_cv = self.cache.get('img_cv') | |
if img_cv is None: | |
return None | |
if len(img_cv.shape) == 3: | |
gray = cv2.cvtColor(img_cv, cv2.COLOR_BGR2GRAY) | |
else: | |
gray = img_cv | |
return gray | |
def analyze_image_features(self, image, downscale_factor=1.0): | |
"""Extract image features with optimizations""" | |
# 转换为OpenCV格式 | |
img_array = np.array(image) | |
if len(img_array.shape) == 3 and img_array.shape[2] == 3: | |
img_cv = cv2.cvtColor(img_array, cv2.COLOR_RGB2BGR) | |
else: | |
img_cv = img_array | |
# 存入缓存 | |
self.cache['img_cv'] = img_cv | |
# 如果需要,对大图像进行降采样处理 | |
if downscale_factor < 1.0: | |
h, w = img_cv.shape[:2] | |
new_h, new_w = int(h * downscale_factor), int(w * downscale_factor) | |
img_cv = cv2.resize(img_cv, (new_w, new_h)) | |
self.cache['img_cv'] = img_cv | |
# 为图像创建唯一ID (用于lru_cache) | |
image_id = id(image) | |
self.cache['image_id'] = image_id | |
# 检测图像类型 | |
image_type = self.detect_image_type(image) | |
self.cache['image_type'] = image_type | |
features = {} | |
features["image_type"] = image_type | |
# 基本特征 | |
features["width"] = image.width | |
features["height"] = image.height | |
features["aspect_ratio"] = image.width / max(1, image.height) | |
# 并行提取不同组的特征 | |
self._extract_color_features(img_array, features) | |
self._extract_edge_features(img_cv, features, image_id) | |
self._extract_texture_features(img_cv, features, image_id) | |
self._extract_noise_features(img_cv, features) | |
self._extract_symmetry_features(img_cv, features) | |
self._extract_frequency_features(img_cv, features, image_id) | |
self._extract_advanced_features(img_cv, features, image_id) | |
return features | |
def _extract_color_features(self, img_array, features): | |
"""Extract color-related features""" | |
if len(img_array.shape) == 3: | |
# 使用向量化操作提高速度 | |
features["avg_red"] = float(np.mean(img_array[:,:,0])) | |
features["avg_green"] = float(np.mean(img_array[:,:,1])) | |
features["avg_blue"] = float(np.mean(img_array[:,:,2])) | |
# 颜色标准差 | |
features["color_std"] = float(np.std([ | |
features["avg_red"], | |
features["avg_green"], | |
features["avg_blue"] | |
])) | |
# 局部颜色变化 - 使用步进操作提高速度 | |
block_size = 10 | |
stride = 10 | |
h, w = img_array.shape[:2] | |
# 使用数组切片加速处理 | |
blocks = [] | |
for i in range(0, h-block_size, stride): | |
for j in range(0, w-block_size, stride): | |
blocks.append(img_array[i:i+block_size, j:j+block_size]) | |
if blocks: | |
# 转换为numpy数组进行向量化操作 | |
blocks_array = np.array(blocks) | |
std_values = np.std(blocks_array.reshape(len(blocks), -1), axis=1) | |
features["local_color_variation"] = float(np.mean(std_values)) | |
# 颜色直方图和熵 | |
r_hist, _ = np.histogram(img_array[:,:,0].flatten(), bins=64, range=(0, 256)) | |
g_hist, _ = np.histogram(img_array[:,:,1].flatten(), bins=64, range=(0, 256)) | |
b_hist, _ = np.histogram(img_array[:,:,2].flatten(), bins=64, range=(0, 256)) | |
# 计算熵 | |
r_entropy = stats.entropy(r_hist + 1e-10) | |
g_entropy = stats.entropy(g_hist + 1e-10) | |
b_entropy = stats.entropy(b_hist + 1e-10) | |
features["color_entropy"] = float((r_entropy + g_entropy + b_entropy) / 3) | |
# 颜色一致性 - AI生成图像通常颜色过于一致 | |
color_blocks = [] | |
for i in range(0, h-block_size, stride): | |
for j in range(0, w-block_size, stride): | |
block = img_array[i:i+block_size, j:j+block_size] | |
avg_color = np.mean(block, axis=(0,1)) | |
color_blocks.append(avg_color) | |
if color_blocks: | |
color_blocks = np.array(color_blocks) | |
features["color_consistency"] = float(1.0 - np.std(color_blocks) / 128.0) | |
def _extract_edge_features(self, img_cv, features, image_id): | |
"""Extract edge-related features""" | |
# 获取灰度图 | |
gray = self.get_grayscale(image_id) | |
if gray is None: | |
if len(img_cv.shape) == 3: | |
gray = cv2.cvtColor(img_cv, cv2.COLOR_BGR2GRAY) | |
else: | |
gray = img_cv | |
self.cache['gray'] = gray | |
# 边缘检测 | |
edges = cv2.Canny(gray, 100, 200) | |
self.cache['edges'] = edges | |
features["edge_density"] = float(np.sum(edges > 0) / (img_cv.shape[0] * img_cv.shape[1])) | |
# 边缘方向分析 | |
sobelx = cv2.Sobel(gray, cv2.CV_64F, 1, 0, ksize=3) | |
sobely = cv2.Sobel(gray, cv2.CV_64F, 0, 1, ksize=3) | |
self.cache['sobelx'] = sobelx | |
self.cache['sobely'] = sobely | |
edge_magnitude = np.sqrt(sobelx**2 + sobely**2) | |
self.cache['edge_magnitude'] = edge_magnitude | |
features["edge_variance"] = float(np.var(edge_magnitude)) | |
# 边缘方向分布 | |
edge_direction = np.arctan2(sobely, sobelx) * 180 / np.pi | |
self.cache['edge_direction'] = edge_direction | |
# 只在显著边缘上计算以提高速度 | |
mask = edge_magnitude > 30 | |
if np.sum(mask) > 0: | |
edge_dir_hist, _ = np.histogram(edge_direction[mask], bins=18, range=(-180, 180)) | |
features["edge_direction_entropy"] = float(stats.entropy(edge_dir_hist + 1e-10)) | |
# 边缘方向一致性 - AI生成图像通常边缘方向过于一致 | |
edge_dir_normalized = edge_dir_hist / np.sum(edge_dir_hist) | |
features["edge_direction_consistency"] = float(np.max(edge_dir_normalized)) | |
def _extract_texture_features(self, img_cv, features, image_id): | |
"""Extract texture-related features""" | |
# 获取灰度图 | |
gray = self.cache.get('gray') | |
if gray is None: | |
gray = self.get_grayscale(image_id) | |
if gray is None: | |
if len(img_cv.shape) == 3: | |
gray = cv2.cvtColor(img_cv, cv2.COLOR_BGR2GRAY) | |
else: | |
gray = img_cv | |
self.cache['gray'] = gray | |
# 减少GLCM计算量,使用更少的角度和距离 | |
distances = [5] | |
angles = [0, np.pi/2] # 从4个角度减少到2个 | |
# 如果图像较大,对GLCM计算进行降采样 | |
h, w = gray.shape | |
if h > 512 or w > 512: | |
gray_small = cv2.resize(gray, (min(w, 512), min(h, 512))) | |
else: | |
gray_small = gray | |
# 转换为uint8并缩放到0-255用于GLCM | |
if gray_small.dtype != np.uint8: | |
gray_small = ((gray_small - gray_small.min()) / | |
(gray_small.max() - gray_small.min() + 1e-10) * 255).astype(np.uint8) | |
# 使用更少的灰度级以加速计算 | |
gray_small = (gray_small // 8) * 8 # 减少到32个灰度级 | |
# 计算GLCM | |
try: | |
glcm = graycomatrix(gray_small, distances=distances, angles=angles, | |
symmetric=True, normed=True) | |
# 计算GLCM属性 | |
features["texture_contrast"] = float(np.mean(graycoprops(glcm, 'contrast')[0])) | |
features["texture_homogeneity"] = float(np.mean(graycoprops(glcm, 'homogeneity')[0])) | |
features["texture_correlation"] = float(np.mean(graycoprops(glcm, 'correlation')[0])) | |
features["texture_energy"] = float(np.mean(graycoprops(glcm, 'energy')[0])) | |
except Exception as e: | |
print(f"GLCM计算错误: {e}") | |
# LBP用于微观纹理分析 - 对AI检测至关重要 | |
try: | |
# 使用更小的半径和更少的点以提高速度 | |
radius = 2 # 从3减少到2 | |
n_points = 8 # 从24减少到8 | |
# 如果需要,对LBP进行降采样 | |
if h > 512 or w > 512: | |
if 'gray_small' not in locals(): | |
gray_small = cv2.resize(gray, (min(w, 512), min(h, 512))) | |
else: | |
gray_small = gray | |
lbp = local_binary_pattern(gray_small, n_points, radius, method='uniform') | |
lbp_hist, _ = np.histogram(lbp, bins=n_points + 2, range=(0, n_points + 2)) | |
lbp_hist = lbp_hist.astype(float) / (sum(lbp_hist) + 1e-10) | |
features["lbp_entropy"] = float(stats.entropy(lbp_hist + 1e-10)) | |
# LBP一致性 - AI生成图像通常LBP模式过于一致 | |
features["lbp_uniformity"] = float(np.sum(lbp_hist**2)) | |
except Exception as e: | |
print(f"LBP计算错误: {e}") | |
def _extract_noise_features(self, img_cv, features): | |
"""Extract noise-related features""" | |
if len(img_cv.shape) == 3: | |
# 使用更小的核以加速模糊 | |
blurred = cv2.GaussianBlur(img_cv, (3, 3), 0) | |
noise = cv2.absdiff(img_cv, blurred) | |
features["noise_level"] = float(np.mean(noise)) | |
features["noise_std"] = float(np.std(noise)) | |
# 噪声频谱分析 - 只使用一个通道以提高速度 | |
noise_fft = np.fft.fft2(noise[:,:,0]) | |
noise_fft_shift = np.fft.fftshift(noise_fft) | |
noise_magnitude = np.abs(noise_fft_shift) | |
features["noise_spectrum_std"] = float(np.std(noise_magnitude)) | |
# 噪声空间一致性 - 使用更大的块以提高速度 | |
block_size = 64 # 从32增加到64 | |
h, w = noise.shape[:2] | |
# 使用步进操作 | |
noise_blocks = [] | |
for i in range(0, h-block_size, block_size): | |
for j in range(0, w-block_size, block_size): | |
block = noise[i:i+block_size, j:j+block_size] | |
noise_blocks.append(np.mean(block)) | |
if noise_blocks: | |
features["noise_spatial_std"] = float(np.std(noise_blocks)) | |
# 噪声颜色通道相关性 - 真实照片的噪声在通道间相关性较低 | |
if len(img_cv.shape) == 3: | |
noise_r = noise[:,:,0].flatten() | |
noise_g = noise[:,:,1].flatten() | |
noise_b = noise[:,:,2].flatten() | |
corr_rg = np.corrcoef(noise_r, noise_g)[0,1] | |
corr_rb = np.corrcoef(noise_r, noise_b)[0,1] | |
corr_gb = np.corrcoef(noise_g, noise_b)[0,1] | |
features["noise_channel_correlation"] = float((abs(corr_rg) + abs(corr_rb) + abs(corr_gb)) / 3) | |
def _extract_symmetry_features(self, img_cv, features): | |
"""Extract symmetry-related features""" | |
h, w = img_cv.shape[:2] | |
# 水平对称性 | |
if w % 2 == 0: | |
left_half = img_cv[:, :w//2] | |
right_half = cv2.flip(img_cv[:, w//2:], 1) | |
if left_half.shape == right_half.shape: | |
# 对大图像使用采样以加速计算 | |
if h > 512 or w//2 > 512: | |
step = max(1, min(h, w//2) // 512) | |
diff = cv2.absdiff(left_half[::step, ::step], right_half[::step, ::step]) | |
else: | |
diff = cv2.absdiff(left_half, right_half) | |
h_symmetry = 1 - float(np.mean(diff) / 255) | |
features["horizontal_symmetry"] = h_symmetry | |
# 垂直对称性 | |
if h % 2 == 0: | |
top_half = img_cv[:h//2, :] | |
bottom_half = cv2.flip(img_cv[h//2:, :], 0) | |
if top_half.shape == bottom_half.shape: | |
# 对大图像使用采样以加速计算 | |
if h//2 > 512 or w > 512: | |
step = max(1, min(h//2, w) // 512) | |
diff = cv2.absdiff(top_half[::step, ::step], bottom_half[::step, ::step]) | |
else: | |
diff = cv2.absdiff(top_half, bottom_half) | |
v_symmetry = 1 - float(np.mean(diff) / 255) | |
features["vertical_symmetry"] = v_symmetry | |
# 径向对称性 - 对于人脸等中心对象很有用 | |
if min(h, w) > 100: # 只对足够大的图像计算 | |
try: | |
center_y, center_x = h // 2, w // 2 | |
max_radius = min(center_x, center_y) - 10 | |
if max_radius > 20: # 确保有足够的半径 | |
# 创建径向对称性掩码 | |
y, x = np.ogrid[-center_y:h-center_y, -center_x:w-center_x] | |
mask = x*x + y*y <= max_radius*max_radius | |
# 计算对称性 | |
masked_img = img_cv.copy() | |
if len(masked_img.shape) == 3: | |
for c in range(masked_img.shape[2]): | |
masked_img[:,:,c][~mask] = 0 | |
else: | |
masked_img[~mask] = 0 | |
# 旋转180度比较 | |
rotated = cv2.rotate(masked_img, cv2.ROTATE_180) | |
diff = cv2.absdiff(masked_img, rotated) | |
features["radial_symmetry"] = 1 - float(np.mean(diff) / 255) | |
except: | |
pass | |
def _extract_frequency_features(self, img_cv, features, image_id): | |
"""Extract frequency domain features""" | |
# 获取灰度图 | |
gray = self.cache.get('gray') | |
if gray is None: | |
gray = self.get_grayscale(image_id) | |
if gray is None: | |
if len(img_cv.shape) == 3: | |
gray = cv2.cvtColor(img_cv, cv2.COLOR_BGR2GRAY) | |
else: | |
gray = img_cv | |
self.cache['gray'] = gray | |
# 如果图像较大,对FFT进行降采样 | |
h, w = gray.shape | |
if h > 512 or w > 512: | |
gray_small = cv2.resize(gray, (512, 512)) | |
else: | |
gray_small = gray | |
# FFT分析 | |
f_transform = np.fft.fft2(gray_small) | |
f_shift = np.fft.fftshift(f_transform) | |
magnitude = np.log(np.abs(f_shift) + 1) | |
# 计算高/低频率比 | |
h, w = magnitude.shape | |
center_h, center_w = h // 2, w // 2 | |
# 低频区域(中心) | |
low_freq_region = magnitude[center_h-h//8:center_h+h//8, center_w-w//8:center_w+w//8] | |
low_freq_mean = np.mean(low_freq_region) | |
# 高频区域 | |
high_freq_mean = np.mean(magnitude) - low_freq_mean | |
features["freq_ratio"] = float(high_freq_mean / max(low_freq_mean, 0.001)) | |
features["freq_std"] = float(np.std(magnitude)) | |
# 频率各向异性 - 对AI检测至关重要 | |
# 使用更少的角度以提高速度 | |
freq_blocks = [] | |
for angle in range(0, 180, 45): # 从20°步长减少到45°步长 | |
mask = np.zeros_like(magnitude) | |
cv2.ellipse(mask, (center_w, center_h), (w//2, h//2), angle, -10, 10, 1, -1) | |
freq_blocks.append(np.mean(magnitude * mask)) | |
features["freq_anisotropy"] = float(np.std(freq_blocks)) | |
# 频率峰值分析 - AI生成图像通常有特定的频率峰值模式 | |
try: | |
# 计算径向平均功率谱 | |
y, x = np.ogrid[-center_h:h-center_h, -center_w:w-center_w] | |
r = np.sqrt(x*x + y*y) | |
r = r.astype(np.int32) | |
# 创建径向平均 | |
radial_mean = np.zeros(min(center_h, center_w)) | |
for i in range(1, len(radial_mean)): | |
mask = (r == i) | |
if np.sum(mask) > 0: | |
radial_mean[i] = np.mean(magnitude[mask]) | |
# 计算峰值特征 | |
peaks, _ = stats.find_peaks(radial_mean) | |
if len(peaks) > 0: | |
features["freq_peak_count"] = len(peaks) | |
features["freq_peak_prominence"] = float(np.mean(radial_mean[peaks])) | |
except: | |
pass | |
def _extract_advanced_features(self, img_cv, features, image_id): | |
"""Extract advanced features for AI detection""" | |
# 获取灰度图 | |
gray = self.cache.get('gray') | |
if gray is None: | |
gray = self.get_grayscale(image_id) | |
if gray is None: | |
if len(img_cv.shape) == 3: | |
gray = cv2.cvtColor(img_cv, cv2.COLOR_BGR2GRAY) | |
else: | |
gray = img_cv | |
self.cache['gray'] = gray | |
# 小波变换特征 | |
try: | |
# 如果图像太大,先降采样 | |
if gray.shape[0] > 512 or gray.shape[1] > 512: | |
gray_small = cv2.resize(gray, (512, 512)) | |
else: | |
gray_small = gray | |
# 执行小波变换 | |
coeffs = pywt.dwt2(gray_small, 'haar') | |
cA, (cH, cV, cD) = coeffs | |
# 计算各子带的统计特征 | |
features["wavelet_h_std"] = float(np.std(cH)) | |
features["wavelet_v_std"] = float(np.std(cV)) | |
features["wavelet_d_std"] = float(np.std(cD)) | |
# 计算小波系数的熵 | |
cH_hist, _ = np.histogram(cH.flatten(), bins=50) | |
cV_hist, _ = np.histogram(cV.flatten(), bins=50) | |
cD_hist, _ = np.histogram(cD.flatten(), bins=50) | |
features["wavelet_h_entropy"] = float(stats.entropy(cH_hist + 1e-10)) | |
features["wavelet_v_entropy"] = float(stats.entropy(cV_hist + 1e-10)) | |
features["wavelet_d_entropy"] = float(stats.entropy(cD_hist + 1e-10)) | |
except: | |
pass | |
# DCT变换特征 | |
try: | |
# 执行DCT变换 | |
dct = cv2.dct(np.float32(gray_small)) | |
# 计算DCT系数的统计特征 | |
dct_std = np.std(dct) | |
features["dct_std"] = float(dct_std) | |
# 计算DCT系数的熵 | |
dct_hist, _ = np.histogram(dct.flatten(), bins=50) | |
features["dct_entropy"] = float(stats.entropy(dct_hist + 1e-10)) | |
except: | |
pass | |
# 图像质量评估 | |
try: | |
# 计算梯度幅度图像 | |
sobelx = cv2.Sobel(gray, cv2.CV_64F, 1, 0, ksize=3) | |
sobely = cv2.Sobel(gray, cv2.CV_64F, 0, 1, ksize=3) | |
gradient_magnitude = np.sqrt(sobelx**2 + sobely**2) | |
# 计算自然度指标 | |
naturalness = np.mean(gradient_magnitude) / np.std(gradient_magnitude) | |
features["naturalness_index"] = float(naturalness) | |
except: | |
pass | |
############################################# | |
# 特征分析与决策逻辑部分 - 重新调整 | |
############################################# | |
# 基于图像类型的特征阈值 - 更严格的阈值 | |
OPTIMIZED_THRESHOLDS = { | |
"lbp_entropy": { | |
"default": 1.8, # 更严格的阈值 | |
"portrait": 1.7, | |
"landscape": 1.9, | |
}, | |
"freq_anisotropy": { | |
"default": 0.005, # 更严格的阈值 | |
"portrait": 0.004, | |
"landscape": 0.006, | |
}, | |
"texture_correlation": { | |
"default": 0.97, # 更严格的阈值 | |
"portrait": 0.96, | |
"landscape": 0.97, | |
}, | |
"horizontal_symmetry": { | |
"default": 0.9, # 更严格的阈值 | |
"portrait": 0.88, | |
"landscape": 0.9, | |
}, | |
"vertical_symmetry": { | |
"default": 0.9, # 更严格的阈值 | |
"portrait": 0.88, | |
"landscape": 0.9, | |
}, | |
"noise_spatial_std": { | |
"default": 0.2, # 更严格的阈值 | |
"portrait": 0.2, | |
"landscape": 0.25, | |
}, | |
"freq_ratio": { | |
"default": 0.0, # 更严格的阈值 | |
"portrait": -0.05, | |
"landscape": 0.0, | |
}, | |
"noise_spectrum_std": { | |
"default": 600, # 更严格的阈值 | |
"portrait": 600, | |
"landscape": 700, | |
}, | |
"color_entropy": { | |
"default": 3.0, # 更严格的阈值 | |
"portrait": 3.0, | |
"landscape": 3.2, | |
}, | |
"lbp_uniformity": { | |
"default": 0.25, # 更严格的阈值 | |
"portrait": 0.25, | |
"landscape": 0.2, | |
}, | |
"noise_channel_correlation": { | |
"default": 0.97, # 更严格的阈值 | |
"portrait": 0.97, | |
"landscape": 0.95, | |
}, | |
"wavelet_h_entropy": { | |
"default": 1.5, # 更严格的阈值 | |
"portrait": 1.4, | |
"landscape": 1.6, | |
}, | |
"dct_entropy": { | |
"default": 0.005, # 更严格的阈值 | |
"portrait": 0.004, | |
"landscape": 0.006, | |
}, | |
"naturalness_index": { | |
"default": 0.6, # 更严格的阈值 | |
"portrait": 0.55, | |
"landscape": 0.65, | |
} | |
} | |
# 特征重要性权重 - 调整权重 | |
FEATURE_IMPORTANCE = { | |
"lbp_entropy": 0.12, | |
"freq_anisotropy": 0.12, | |
"texture_correlation": 0.06, | |
"horizontal_symmetry": 0.02, | |
"vertical_symmetry": 0.02, | |
"noise_spatial_std": 0.06, | |
"freq_ratio": 0.04, | |
"noise_spectrum_std": 0.04, | |
"color_entropy": 0.06, | |
"lbp_uniformity": 0.04, | |
"noise_channel_correlation": 0.12, # 增加权重 | |
"wavelet_h_entropy": 0.12, # 增加权重 | |
"dct_entropy": 0.12, # 增加权重 | |
"naturalness_index": 0.06 | |
} | |
def get_threshold(feature_name, image_type="default"): | |
"""Get the appropriate threshold for a feature based on image type""" | |
if feature_name not in OPTIMIZED_THRESHOLDS: | |
return None | |
return OPTIMIZED_THRESHOLDS[feature_name].get(image_type, | |
OPTIMIZED_THRESHOLDS[feature_name]["default"]) | |
def check_ai_specific_features(image_features): | |
"""Enhanced check for AI-generated image features with stricter criteria""" | |
ai_score = 0 | |
ai_signs = [] | |
# 获取图像类型 | |
image_type = image_features.get("image_type", "default") | |
# 处理每个特征 | |
for feature_name, importance in FEATURE_IMPORTANCE.items(): | |
if feature_name not in image_features: | |
continue | |
value = image_features[feature_name] | |
threshold = get_threshold(feature_name, image_type) | |
if threshold is None: | |
continue | |
# 不同特征有不同的比较逻辑 | |
feature_score = 0 | |
# 低值表示AI生成的特征 | |
if feature_name in ["lbp_entropy", "freq_anisotropy", "noise_spatial_std", | |
"freq_ratio", "noise_spectrum_std", "color_entropy", | |
"wavelet_h_entropy", "dct_entropy", "naturalness_index"]: | |
if value < threshold: | |
# 更严格的评分,只有显著低于阈值才给高分 | |
feature_score = min(1.0, (threshold - value) / threshold * 1.5) | |
if feature_score > 0.7: # 提高显著性阈值 | |
ai_signs.append(f"{feature_name} 异常低 ({value:.2f})") | |
# 高值表示AI生成的特征 | |
elif feature_name in ["texture_correlation", "horizontal_symmetry", "vertical_symmetry", | |
"lbp_uniformity", "noise_channel_correlation"]: | |
if value > threshold: | |
# 更严格的评分,只有显著高于阈值才给高分 | |
feature_score = min(1.0, (value - threshold) / (1 - threshold) * 1.5) | |
if feature_score > 0.7: # 提高显著性阈值 | |
ai_signs.append(f"{feature_name} 异常高 ({value:.2f})") | |
# 累加加权分数 | |
ai_score += feature_score * importance | |
# 特征组合分析 - 检查特定组合模式 | |
if ("dct_entropy" in image_features and image_features["dct_entropy"] < 0.005 and | |
"wavelet_h_entropy" in image_features and image_features["wavelet_h_entropy"] < 1.5 and | |
"noise_channel_correlation" in image_features and image_features["noise_channel_correlation"] > 0.97): | |
# 这种组合强烈表明是AI生成 | |
ai_score = max(ai_score, 0.9) | |
if "特征组合模式" not in ai_signs: | |
ai_signs.append("特征组合模式: 多个关键特征同时异常") | |
# 计算检测到多少关键特征 | |
critical_count = len(ai_signs) | |
if critical_count >= 5: | |
ai_score = max(ai_score, 0.8) # 更保守,从0.9改为0.8 | |
elif critical_count >= 3: | |
ai_score = max(ai_score, 0.6) # 更保守,从0.7改为0.6 | |
return min(ai_score, 1.0), ai_signs | |
def detect_beauty_filter_signs(image_features): | |
"""Detect beauty filter traces""" | |
beauty_score = 0 | |
beauty_signs = [] | |
# 获取图像类型 | |
image_type = image_features.get("image_type", "default") | |
# 只检查最重要的美颜滤镜指标 | |
if "face_skin_std" in image_features: | |
threshold = 15 if image_type == "portrait" else 20 | |
if image_features["face_skin_std"] < threshold: | |
beauty_score += 0.3 | |
beauty_signs.append("皮肤质感过于均匀") | |
if "edge_density" in image_features: | |
threshold = 0.03 if image_type == "portrait" else 0.04 | |
if image_features["edge_density"] < threshold: | |
beauty_score += 0.2 | |
beauty_signs.append("边缘过于平滑") | |
if "noise_level" in image_features: | |
threshold = 1.0 if image_type == "portrait" else 1.5 | |
if image_features["noise_level"] < threshold: | |
beauty_score += 0.2 | |
beauty_signs.append("噪点异常少") | |
if "texture_homogeneity" in image_features: | |
threshold = 0.5 if image_type == "portrait" else 0.4 | |
if image_features["texture_homogeneity"] > threshold: | |
beauty_score += 0.2 | |
beauty_signs.append("纹理过于均匀") | |
return min(beauty_score, 1.0), beauty_signs | |
def detect_photoshop_signs(image_features): | |
"""Detect Photoshop traces""" | |
ps_score = 0 | |
ps_signs = [] | |
# 获取图像类型 | |
image_type = image_features.get("image_type", "default") | |
# 只检查最重要的PS指标 | |
if "texture_homogeneity" in image_features: | |
threshold = 0.4 if image_type == "portrait" else 0.35 | |
if image_features["texture_homogeneity"] > threshold: | |
ps_score += 0.2 | |
ps_signs.append("皮肤质感过于均匀") | |
if "edge_density" in image_features: | |
threshold = 0.01 if image_type == "portrait" else 0.015 | |
if image_features["edge_density"] < threshold: | |
ps_score += 0.2 | |
ps_signs.append("边缘过于平滑") | |
if "color_std" in image_features: | |
threshold = 50 if image_type == "portrait" else 40 | |
if image_features["color_std"] > threshold: | |
ps_score += 0.2 | |
ps_signs.append("颜色分布极不自然") | |
if "noise_spatial_std" in image_features: | |
threshold = 1.0 if image_type == "portrait" else 1.2 | |
if image_features["noise_spatial_std"] > threshold: | |
ps_score += 0.2 | |
ps_signs.append("噪点分布不均匀") | |
return min(ps_score, 1.0), ps_signs | |
def calibrate_model_output(probability): | |
"""校准模型输出,解决模型偏向真人的问题""" | |
# 如果模型输出总是在0.3-0.4之间,我们可以拉伸这个范围 | |
if 0.25 <= probability <= 0.4: | |
# 将0.25-0.4范围拉伸到0.1-0.9 | |
return 0.1 + (probability - 0.25) * (0.9 - 0.1) / (0.4 - 0.25) | |
return probability | |
def calibrate_feature_score(score, signs): | |
"""校准特征分析分数,解决特征分析偏向AI的问题""" | |
# 如果特征分数高但检测到的特征少,降低分数 | |
if score > 0.7 and len(signs) < 3: | |
return 0.5 + (score - 0.5) * 0.5 # 降低分数 | |
# 如果特征分数高且检测到多个特征,保持高分 | |
elif score > 0.7 and len(signs) >= 4: | |
return score | |
# 中等分数情况 | |
elif 0.4 < score <= 0.7: | |
return 0.4 + (score - 0.4) * 0.7 # 略微降低 | |
return score | |
def calculate_model_consistency(model_results): | |
"""Calculate the consistency between model predictions""" | |
if not model_results: | |
return 0.0 | |
# 提取AI概率 | |
probabilities = [result.get("ai_probability", 0.5) for result in model_results.values() | |
if "error" not in result] | |
if not probabilities: | |
return 0.0 | |
# 计算方差作为一致性度量 | |
variance = np.var(probabilities) | |
# 方差越小,一致性越高 | |
consistency = max(0.0, 1.0 - variance * 5) # 缩放以获得合理的一致性分数 | |
return consistency | |
def get_detailed_analysis(ai_probability, ps_score, beauty_score, ps_signs, ai_signs, beauty_signs, | |
valid_models_count, ai_feature_score, model_consistency): | |
"""Provide detailed analysis with improved confidence assessment""" | |
# 根据模型和特征分析的一致性调整置信度 | |
if abs(ai_probability - ai_feature_score) > 0.2: | |
confidence_prefix = "低置信度:" | |
explanation = "(模型预测和特征分析结果存在较大差异)" | |
elif model_consistency > 0.8 and valid_models_count >= 2: | |
confidence_prefix = "高置信度:" | |
explanation = "" | |
elif model_consistency > 0.6 and valid_models_count >= 2: | |
confidence_prefix = "中等置信度:" | |
explanation = "" | |
else: | |
confidence_prefix = "低置信度:" | |
explanation = "" | |
# 计算编辑分数(在所有路径中都需要) | |
combined_edit_score = max(ps_score, beauty_score) | |
# 分类逻辑 - 从中立起点开始 | |
if ai_probability > 0.6: | |
category = confidence_prefix + "AI生成图像" + explanation | |
description = "图像很可能是由AI完全生成,几乎没有真人照片的特征。" | |
main_category = "AI生成" | |
elif ai_probability < 0.4: | |
# 第二级分类:素人 vs 修图 | |
if combined_edit_score > 0.5: | |
category = confidence_prefix + "真人照片,修图痕迹明显" + explanation | |
description = "图像基本是真人照片,但经过了明显的后期处理或美颜,修饰痕迹明显。" | |
main_category = "真人照片-修图明显" | |
else: | |
category = confidence_prefix + "真实素人照片" + explanation | |
description = "图像很可能是未经大量处理的真人照片,保留了自然的细节和特征。" | |
main_category = "真人照片-素人" | |
else: | |
# 处理边界情况 - 添加"无法确定"类别 | |
category = confidence_prefix + "无法确定" + explanation | |
description = "系统无法确定该图像是AI生成还是真实照片。模型预测和特征分析结果不一致,需要人工判断。" | |
main_category = "无法确定" | |
# 处理边界情况 - AI生成与高度修图 | |
if 0.45 < ai_probability < 0.55 and combined_edit_score > 0.7: | |
category = confidence_prefix + "真人照片,修图痕迹明显(也可能是AI生成)" + explanation | |
description = "图像可能是真人照片经过大量后期处理,也可能是AI生成图像。由于现代AI技术与高度修图效果相似,难以完全区分。" | |
main_category = "真人照片-修图明显" | |
# 格式化详情 | |
ps_details = "检测到的修图痕迹:" + "、".join(ps_signs) if ps_signs else "未检测到明显的修图痕迹。" | |
ai_details = "检测到的AI特征:" + "、".join(ai_signs) if ai_signs else "未检测到明显的AI生成特征。" | |
beauty_details = "检测到的美颜特征:" + "、".join(beauty_signs) if beauty_signs else "未检测到明显的美颜特征。" | |
return category, description, ps_details, ai_details, beauty_details, main_category | |
def detect_ai_image(image): | |
"""Enhanced main detection function with improved decision logic""" | |
if image is None: | |
return {"error": "未提供图像"} | |
start_time = time.time() | |
image_id = str(uuid.uuid4()) # 为图像生成唯一ID,用于反馈收集 | |
# 初始化管理器 | |
model_manager = EnhancedModelManager() | |
feature_extractor = EnhancedFeatureExtractor() | |
# 第一阶段:快速特征分析 | |
# 确定是否需要降采样 | |
downscale_factor = 1.0 | |
if image.width * image.height > 1024 * 1024: # 对于大于1MP的图像 | |
downscale_factor = min(1.0, 1024 * 1024 / (image.width * image.height)) | |
# 提取特征 | |
feature_extractor.clear_cache() # 清除之前运行的缓存 | |
image_features = feature_extractor.analyze_image_features(image, downscale_factor) | |
# 快速特征分析 | |
ai_feature_score, ai_signs = check_ai_specific_features(image_features) | |
# 第二阶段:模型分析 | |
results = {} | |
valid_models = 0 | |
weighted_ai_probability = 0 | |
# 使用每个模型处理 | |
for key, model_info in model_manager.models.items(): | |
model_result = model_manager.get_model_prediction(key, image) | |
if model_result and "error" not in model_result: | |
# 校准模型输出 | |
model_result["ai_probability"] = calibrate_model_output(model_result["ai_probability"]) | |
results[key] = model_result | |
weighted_ai_probability += model_result["ai_probability"] * model_info["weight"] | |
valid_models += 1 | |
else: | |
results[key] = model_result or {"model_name": model_info["name"], "error": "处理失败"} | |
# 计算最终加权概率 | |
if valid_models > 0: | |
final_ai_probability = weighted_ai_probability / sum( | |
m["weight"] for k, m in model_manager.models.items() | |
if k in model_manager.loaded_models | |
) | |
else: | |
return {"error": "所有模型加载失败"} | |
# 计算模型一致性 | |
model_consistency = calculate_model_consistency(results) | |
# 分析PS和美颜痕迹 | |
ps_score, ps_signs = detect_photoshop_signs(image_features) | |
beauty_score, beauty_signs = detect_beauty_filter_signs(image_features) | |
# 校准特征分析分数 | |
calibrated_feature_score = calibrate_feature_score(ai_feature_score, ai_signs) | |
# 从中立起点开始决策 | |
adjusted_probability = 0.5 | |
# 根据校准后的模型和特征分析调整概率 | |
if final_ai_probability > 0.7 or calibrated_feature_score > 0.8: | |
# 强AI证据 | |
adjusted_probability = 0.5 + (final_ai_probability * 0.25 + calibrated_feature_score * 0.25) | |
elif final_ai_probability < 0.3 and calibrated_feature_score < 0.3: | |
# 强真人证据 | |
adjusted_probability = 0.5 - ((0.3 - final_ai_probability) * 0.25 + (0.3 - calibrated_feature_score) * 0.25) | |
else: | |
# 证据不足,保持在中间区域 | |
adjusted_probability = 0.5 + (final_ai_probability - 0.5) * 0.2 + (calibrated_feature_score - 0.5) * 0.2 | |
# 特征组合分析 - 强有力的组合证据 | |
if ("dct_entropy" in image_features and image_features["dct_entropy"] < 0.005 and | |
"wavelet_h_entropy" in image_features and image_features["wavelet_h_entropy"] < 1.5 and | |
"noise_channel_correlation" in image_features and image_features["noise_channel_correlation"] > 0.97): | |
# 这种组合强烈表明是AI生成 | |
adjusted_probability = max(adjusted_probability, 0.7) | |
# 确保概率在有效范围内 | |
adjusted_probability = min(1.0, max(0.0, adjusted_probability)) | |
# 获取详细分析 | |
category, description, ps_details, ai_details, beauty_details, main_category = get_detailed_analysis( | |
adjusted_probability, ps_score, beauty_score, ps_signs, ai_signs, beauty_signs, | |
valid_models, calibrated_feature_score, model_consistency | |
) | |
# 构建最终结果 | |
processing_time = time.time() - start_time | |
final_result = { | |
"image_id": image_id, | |
"ai_probability": adjusted_probability, | |
"original_ai_probability": final_ai_probability, | |
"ps_score": ps_score, | |
"beauty_score": beauty_score, | |
"ai_feature_score": calibrated_feature_score, | |
"raw_feature_score": ai_feature_score, | |
"model_consistency": model_consistency, | |
"category": category, | |
"main_category": main_category, | |
"description": description, | |
"ps_details": ps_details, | |
"ai_details": ai_details, | |
"beauty_details": beauty_details, | |
"processing_time": f"{processing_time:.2f} seconds", | |
"individual_model_results": results, | |
# 只包含最重要的特征以减少响应大小 | |
"key_features": {k: image_features[k] for k in FEATURE_IMPORTANCE if k in image_features} | |
} | |
# 保存结果用于后续分析 | |
try: | |
with open(f"{FEEDBACK_DIR}/{image_id}.json", "w") as f: | |
json.dump(final_result, f) | |
except: | |
pass | |
# 返回两个值:JSON结果和标签数据 | |
label_data = {main_category: 1.0} | |
return final_result, label_data | |
def save_user_feedback(image_id, user_feedback): | |
"""Save user feedback for continuous learning""" | |
if not image_id: | |
return {"status": "error", "message": "未提供图像ID"} | |
try: | |
# 读取原始结果 | |
result_path = f"{FEEDBACK_DIR}/{image_id}.json" | |
if not os.path.exists(result_path): | |
return {"status": "error", "message": "找不到对应的图像分析结果"} | |
with open(result_path, "r") as f: | |
original_result = json.load(f) | |
# 添加用户反馈 | |
feedback_data = { | |
"image_id": image_id, | |
"original_result": original_result, | |
"user_feedback": user_feedback, | |
"timestamp": datetime.now().isoformat() | |
} | |
# 保存反馈 | |
feedback_path = f"{FEEDBACK_DIR}/{image_id}_feedback.json" | |
with open(feedback_path, "w") as f: | |
json.dump(feedback_data, f) | |
return {"status": "success", "message": "反馈已保存,感谢您的贡献!"} | |
except Exception as e: | |
return {"status": "error", "message": f"保存反馈时出错: {str(e)}"} | |
############################################# | |
# Gradio界面部分 | |
############################################# | |
def process_image_and_show_results(image): | |
"""Process image and format results for Gradio interface""" | |
if image is None: | |
return {"error": "请上传图像"}, None, "未检测" | |
try: | |
result, label = detect_ai_image(image) | |
return result, result["image_id"], result["main_category"] | |
except Exception as e: | |
return {"error": f"处理图像时出错: {str(e)}"}, None, "错误" | |
def submit_feedback(image_id, correct_classification, comments): | |
"""Submit user feedback""" | |
if not image_id: | |
return "请先上传图像进行分析" | |
feedback = { | |
"correct_classification": correct_classification, | |
"comments": comments | |
} | |
result = save_user_feedback(image_id, feedback) | |
return result["message"] | |
# 创建Gradio界面 | |
with gr.Blocks(title="增强型AI图像检测系统") as iface: | |
gr.Markdown("# 增强型AI图像检测系统") | |
gr.Markdown("上传图像,系统将分析该图像是AI生成还是真实照片") | |
with gr.Row(): | |
with gr.Column(scale=1): | |
input_image = gr.Image(type="pil", label="上传图像") | |
analyze_btn = gr.Button("分析图像", variant="primary") | |
with gr.Column(scale=2): | |
result_json = gr.JSON(label="详细分析结果") | |
image_id_output = gr.Textbox(label="图像ID", visible=False) | |
result_label = gr.Label(label="主要分类") | |
gr.Markdown("## 用户反馈") | |
gr.Markdown("您认为上述分类结果是否正确?您的反馈将帮助我们改进系统。") | |
with gr.Row(): | |
correct_classification = gr.Radio( | |
["正确", "错误 - 这是AI生成图像", "错误 - 这是真实照片", "不确定"], | |
label="分类结果是否正确" | |
) | |
comments = gr.Textbox(label="其他评论(可选)") | |
feedback_btn = gr.Button("提交反馈") | |
feedback_result = gr.Textbox(label="反馈结果") | |
# 设置事件 | |
analyze_btn.click( | |
process_image_and_show_results, | |
inputs=[input_image], | |
outputs=[result_json, image_id_output, result_label] | |
) | |
feedback_btn.click( | |
submit_feedback, | |
inputs=[image_id_output, correct_classification, comments], | |
outputs=[feedback_result] | |
) | |
# 启动应用 | |
if __name__ == "__main__": | |
iface.launch(share=True) | |