aaappp7878's picture
Update app.py
b7211f1 verified
import gradio as gr
import torch
import numpy as np
import cv2
from PIL import Image
import time
import os
import json
from datetime import datetime
from scipy import stats
from skimage.feature import graycomatrix, graycoprops, local_binary_pattern
from transformers import AutoImageProcessor, AutoModelForImageClassification
from functools import lru_cache
import pywt # 用于小波变换
import uuid # 用于生成唯一ID
# 设置缓存目录
os.environ["TRANSFORMERS_CACHE"] = "/tmp/transformers_cache"
os.environ["HF_HOME"] = "/tmp/hf_home"
os.makedirs("/tmp/transformers_cache", exist_ok=True)
os.makedirs("/tmp/hf_home", exist_ok=True)
# 创建反馈存储目录
FEEDBACK_DIR = "/tmp/feedback"
os.makedirs(FEEDBACK_DIR, exist_ok=True)
#############################################
# 模型管理部分 - 增强版
#############################################
class EnhancedModelManager:
"""Enhanced model manager with support for specialized AI detection models"""
def __init__(self):
self.models = {
"ai_detector": {
"name": "umm-maybe/AI-image-detector", # 将来替换为CNNDetection
"processor": None,
"model": None,
"weight": 0.6 # 增加专业AI检测模型权重
},
"general_classifier1": {
"name": "microsoft/resnet-50", # 将来替换为DFDC模型
"processor": None,
"model": None,
"weight": 0.2 # 降低通用模型权重
},
"general_classifier2": {
"name": "google/vit-base-patch16-224", # 保留作为辅助模型
"processor": None,
"model": None,
"weight": 0.2 # 降低通用模型权重
}
}
self.loaded_models = set()
def load_model(self, key):
"""Lazy load a specific model only when needed"""
if key not in self.models:
return False
if key in self.loaded_models:
return True
try:
model_info = self.models[key]
model_info["processor"] = AutoImageProcessor.from_pretrained(
model_info["name"],
cache_dir="/tmp/transformers_cache"
)
# 加载模型
model_info["model"] = AutoModelForImageClassification.from_pretrained(
model_info["name"],
cache_dir="/tmp/transformers_cache"
)
# 量化模型以减少内存使用并加速推理
if torch.cuda.is_available():
model_info["model"].to('cuda')
else:
# 尝试量化,如果失败则使用原始模型
try:
model_info["model"] = torch.quantization.quantize_dynamic(
model_info["model"], {torch.nn.Linear}, dtype=torch.qint8
)
except Exception as e:
print(f"量化失败,使用原始模型: {str(e)}")
self.loaded_models.add(key)
print(f"成功加载模型: {model_info['name']}")
return True
except Exception as e:
print(f"加载模型失败 {self.models[key]['name']}: {str(e)}")
return False
def get_model_prediction(self, key, image):
"""Get prediction from a specific model"""
if not self.load_model(key):
return None
model_info = self.models[key]
try:
# 处理图像
inputs = model_info["processor"](images=image, return_tensors="pt")
# 如果有GPU则使用
if torch.cuda.is_available():
inputs = {k: v.to('cuda') for k, v in inputs.items()}
# 推理
with torch.no_grad():
outputs = model_info["model"](**inputs)
# 获取概率
probabilities = torch.nn.functional.softmax(outputs.logits, dim=-1)
# 处理输出
ai_probability = self.process_model_output(model_info, outputs, probabilities)
# 返回结果
predicted_class_idx = outputs.logits.argmax(-1).item()
return {
"model_name": model_info["name"],
"ai_probability": ai_probability,
"predicted_class": model_info["model"].config.id2label[predicted_class_idx],
"confidence": float(probabilities[0][predicted_class_idx].item()),
"raw_probability": ai_probability # 保存原始概率用于校准
}
except Exception as e:
return {
"model_name": model_info["name"],
"error": str(e)
}
def process_model_output(self, model_info, outputs, probabilities):
"""Process different model outputs, return AI generation probability"""
model_name = model_info["name"].lower()
# 处理AI-image-detector模型
if "ai-image-detector" in model_name:
ai_label_idx = None
human_label_idx = None
for idx, label in model_info["model"].config.id2label.items():
label_lower = label.lower()
if "ai" in label_lower or "generated" in label_lower or "fake" in label_lower:
ai_label_idx = idx
if "human" in label_lower or "real" in label_lower:
human_label_idx = idx
if human_label_idx is not None:
ai_probability = 1 - float(probabilities[0][human_label_idx].item())
elif ai_label_idx is not None:
ai_probability = float(probabilities[0][ai_label_idx].item())
else:
ai_probability = 0.5
return ai_probability
# 处理通用图像分类模型
predicted_class_idx = outputs.logits.argmax(-1).item()
predicted_class = model_info["model"].config.id2label[predicted_class_idx].lower()
# 检查AI相关关键词
ai_keywords = ["artificial", "generated", "synthetic", "fake", "computer", "digital", "cgi", "rendered"]
for keyword in ai_keywords:
if keyword in predicted_class:
return float(probabilities[0][predicted_class_idx].item())
# 默认处理
if "ai" in predicted_class or "generated" in predicted_class or "fake" in predicted_class:
return float(probabilities[0][predicted_class_idx].item())
else:
return 0.3 # 降低默认AI概率,更保守的判断
#############################################
# 特征提取部分 - 增强版
#############################################
class EnhancedFeatureExtractor:
"""Enhanced image feature extraction with advanced features"""
def __init__(self):
# 中间结果缓存
self.cache = {}
def clear_cache(self):
"""Clear the cache between images"""
self.cache = {}
def detect_image_type(self, image):
"""Detect the type of image (portrait, landscape, etc.)"""
img_array = np.array(image)
# 检测人脸
try:
face_cascade = cv2.CascadeClassifier(cv2.data.haarcascades + 'haarcascade_frontalface_default.xml')
if len(img_array.shape) == 3:
gray = cv2.cvtColor(img_array, cv2.COLOR_RGB2GRAY)
else:
gray = img_array
faces = face_cascade.detectMultiScale(gray, 1.1, 4)
if len(faces) > 0:
return "portrait"
except:
pass
# 检查是否为风景图 (简单启发式方法)
if image.width > image.height * 1.5:
return "landscape"
# 默认类型
return "general"
@lru_cache(maxsize=8)
def get_grayscale(self, image_id):
"""Get grayscale version of image with caching"""
img_cv = self.cache.get('img_cv')
if img_cv is None:
return None
if len(img_cv.shape) == 3:
gray = cv2.cvtColor(img_cv, cv2.COLOR_BGR2GRAY)
else:
gray = img_cv
return gray
def analyze_image_features(self, image, downscale_factor=1.0):
"""Extract image features with optimizations"""
# 转换为OpenCV格式
img_array = np.array(image)
if len(img_array.shape) == 3 and img_array.shape[2] == 3:
img_cv = cv2.cvtColor(img_array, cv2.COLOR_RGB2BGR)
else:
img_cv = img_array
# 存入缓存
self.cache['img_cv'] = img_cv
# 如果需要,对大图像进行降采样处理
if downscale_factor < 1.0:
h, w = img_cv.shape[:2]
new_h, new_w = int(h * downscale_factor), int(w * downscale_factor)
img_cv = cv2.resize(img_cv, (new_w, new_h))
self.cache['img_cv'] = img_cv
# 为图像创建唯一ID (用于lru_cache)
image_id = id(image)
self.cache['image_id'] = image_id
# 检测图像类型
image_type = self.detect_image_type(image)
self.cache['image_type'] = image_type
features = {}
features["image_type"] = image_type
# 基本特征
features["width"] = image.width
features["height"] = image.height
features["aspect_ratio"] = image.width / max(1, image.height)
# 并行提取不同组的特征
self._extract_color_features(img_array, features)
self._extract_edge_features(img_cv, features, image_id)
self._extract_texture_features(img_cv, features, image_id)
self._extract_noise_features(img_cv, features)
self._extract_symmetry_features(img_cv, features)
self._extract_frequency_features(img_cv, features, image_id)
self._extract_advanced_features(img_cv, features, image_id)
return features
def _extract_color_features(self, img_array, features):
"""Extract color-related features"""
if len(img_array.shape) == 3:
# 使用向量化操作提高速度
features["avg_red"] = float(np.mean(img_array[:,:,0]))
features["avg_green"] = float(np.mean(img_array[:,:,1]))
features["avg_blue"] = float(np.mean(img_array[:,:,2]))
# 颜色标准差
features["color_std"] = float(np.std([
features["avg_red"],
features["avg_green"],
features["avg_blue"]
]))
# 局部颜色变化 - 使用步进操作提高速度
block_size = 10
stride = 10
h, w = img_array.shape[:2]
# 使用数组切片加速处理
blocks = []
for i in range(0, h-block_size, stride):
for j in range(0, w-block_size, stride):
blocks.append(img_array[i:i+block_size, j:j+block_size])
if blocks:
# 转换为numpy数组进行向量化操作
blocks_array = np.array(blocks)
std_values = np.std(blocks_array.reshape(len(blocks), -1), axis=1)
features["local_color_variation"] = float(np.mean(std_values))
# 颜色直方图和熵
r_hist, _ = np.histogram(img_array[:,:,0].flatten(), bins=64, range=(0, 256))
g_hist, _ = np.histogram(img_array[:,:,1].flatten(), bins=64, range=(0, 256))
b_hist, _ = np.histogram(img_array[:,:,2].flatten(), bins=64, range=(0, 256))
# 计算熵
r_entropy = stats.entropy(r_hist + 1e-10)
g_entropy = stats.entropy(g_hist + 1e-10)
b_entropy = stats.entropy(b_hist + 1e-10)
features["color_entropy"] = float((r_entropy + g_entropy + b_entropy) / 3)
# 颜色一致性 - AI生成图像通常颜色过于一致
color_blocks = []
for i in range(0, h-block_size, stride):
for j in range(0, w-block_size, stride):
block = img_array[i:i+block_size, j:j+block_size]
avg_color = np.mean(block, axis=(0,1))
color_blocks.append(avg_color)
if color_blocks:
color_blocks = np.array(color_blocks)
features["color_consistency"] = float(1.0 - np.std(color_blocks) / 128.0)
def _extract_edge_features(self, img_cv, features, image_id):
"""Extract edge-related features"""
# 获取灰度图
gray = self.get_grayscale(image_id)
if gray is None:
if len(img_cv.shape) == 3:
gray = cv2.cvtColor(img_cv, cv2.COLOR_BGR2GRAY)
else:
gray = img_cv
self.cache['gray'] = gray
# 边缘检测
edges = cv2.Canny(gray, 100, 200)
self.cache['edges'] = edges
features["edge_density"] = float(np.sum(edges > 0) / (img_cv.shape[0] * img_cv.shape[1]))
# 边缘方向分析
sobelx = cv2.Sobel(gray, cv2.CV_64F, 1, 0, ksize=3)
sobely = cv2.Sobel(gray, cv2.CV_64F, 0, 1, ksize=3)
self.cache['sobelx'] = sobelx
self.cache['sobely'] = sobely
edge_magnitude = np.sqrt(sobelx**2 + sobely**2)
self.cache['edge_magnitude'] = edge_magnitude
features["edge_variance"] = float(np.var(edge_magnitude))
# 边缘方向分布
edge_direction = np.arctan2(sobely, sobelx) * 180 / np.pi
self.cache['edge_direction'] = edge_direction
# 只在显著边缘上计算以提高速度
mask = edge_magnitude > 30
if np.sum(mask) > 0:
edge_dir_hist, _ = np.histogram(edge_direction[mask], bins=18, range=(-180, 180))
features["edge_direction_entropy"] = float(stats.entropy(edge_dir_hist + 1e-10))
# 边缘方向一致性 - AI生成图像通常边缘方向过于一致
edge_dir_normalized = edge_dir_hist / np.sum(edge_dir_hist)
features["edge_direction_consistency"] = float(np.max(edge_dir_normalized))
def _extract_texture_features(self, img_cv, features, image_id):
"""Extract texture-related features"""
# 获取灰度图
gray = self.cache.get('gray')
if gray is None:
gray = self.get_grayscale(image_id)
if gray is None:
if len(img_cv.shape) == 3:
gray = cv2.cvtColor(img_cv, cv2.COLOR_BGR2GRAY)
else:
gray = img_cv
self.cache['gray'] = gray
# 减少GLCM计算量,使用更少的角度和距离
distances = [5]
angles = [0, np.pi/2] # 从4个角度减少到2个
# 如果图像较大,对GLCM计算进行降采样
h, w = gray.shape
if h > 512 or w > 512:
gray_small = cv2.resize(gray, (min(w, 512), min(h, 512)))
else:
gray_small = gray
# 转换为uint8并缩放到0-255用于GLCM
if gray_small.dtype != np.uint8:
gray_small = ((gray_small - gray_small.min()) /
(gray_small.max() - gray_small.min() + 1e-10) * 255).astype(np.uint8)
# 使用更少的灰度级以加速计算
gray_small = (gray_small // 8) * 8 # 减少到32个灰度级
# 计算GLCM
try:
glcm = graycomatrix(gray_small, distances=distances, angles=angles,
symmetric=True, normed=True)
# 计算GLCM属性
features["texture_contrast"] = float(np.mean(graycoprops(glcm, 'contrast')[0]))
features["texture_homogeneity"] = float(np.mean(graycoprops(glcm, 'homogeneity')[0]))
features["texture_correlation"] = float(np.mean(graycoprops(glcm, 'correlation')[0]))
features["texture_energy"] = float(np.mean(graycoprops(glcm, 'energy')[0]))
except Exception as e:
print(f"GLCM计算错误: {e}")
# LBP用于微观纹理分析 - 对AI检测至关重要
try:
# 使用更小的半径和更少的点以提高速度
radius = 2 # 从3减少到2
n_points = 8 # 从24减少到8
# 如果需要,对LBP进行降采样
if h > 512 or w > 512:
if 'gray_small' not in locals():
gray_small = cv2.resize(gray, (min(w, 512), min(h, 512)))
else:
gray_small = gray
lbp = local_binary_pattern(gray_small, n_points, radius, method='uniform')
lbp_hist, _ = np.histogram(lbp, bins=n_points + 2, range=(0, n_points + 2))
lbp_hist = lbp_hist.astype(float) / (sum(lbp_hist) + 1e-10)
features["lbp_entropy"] = float(stats.entropy(lbp_hist + 1e-10))
# LBP一致性 - AI生成图像通常LBP模式过于一致
features["lbp_uniformity"] = float(np.sum(lbp_hist**2))
except Exception as e:
print(f"LBP计算错误: {e}")
def _extract_noise_features(self, img_cv, features):
"""Extract noise-related features"""
if len(img_cv.shape) == 3:
# 使用更小的核以加速模糊
blurred = cv2.GaussianBlur(img_cv, (3, 3), 0)
noise = cv2.absdiff(img_cv, blurred)
features["noise_level"] = float(np.mean(noise))
features["noise_std"] = float(np.std(noise))
# 噪声频谱分析 - 只使用一个通道以提高速度
noise_fft = np.fft.fft2(noise[:,:,0])
noise_fft_shift = np.fft.fftshift(noise_fft)
noise_magnitude = np.abs(noise_fft_shift)
features["noise_spectrum_std"] = float(np.std(noise_magnitude))
# 噪声空间一致性 - 使用更大的块以提高速度
block_size = 64 # 从32增加到64
h, w = noise.shape[:2]
# 使用步进操作
noise_blocks = []
for i in range(0, h-block_size, block_size):
for j in range(0, w-block_size, block_size):
block = noise[i:i+block_size, j:j+block_size]
noise_blocks.append(np.mean(block))
if noise_blocks:
features["noise_spatial_std"] = float(np.std(noise_blocks))
# 噪声颜色通道相关性 - 真实照片的噪声在通道间相关性较低
if len(img_cv.shape) == 3:
noise_r = noise[:,:,0].flatten()
noise_g = noise[:,:,1].flatten()
noise_b = noise[:,:,2].flatten()
corr_rg = np.corrcoef(noise_r, noise_g)[0,1]
corr_rb = np.corrcoef(noise_r, noise_b)[0,1]
corr_gb = np.corrcoef(noise_g, noise_b)[0,1]
features["noise_channel_correlation"] = float((abs(corr_rg) + abs(corr_rb) + abs(corr_gb)) / 3)
def _extract_symmetry_features(self, img_cv, features):
"""Extract symmetry-related features"""
h, w = img_cv.shape[:2]
# 水平对称性
if w % 2 == 0:
left_half = img_cv[:, :w//2]
right_half = cv2.flip(img_cv[:, w//2:], 1)
if left_half.shape == right_half.shape:
# 对大图像使用采样以加速计算
if h > 512 or w//2 > 512:
step = max(1, min(h, w//2) // 512)
diff = cv2.absdiff(left_half[::step, ::step], right_half[::step, ::step])
else:
diff = cv2.absdiff(left_half, right_half)
h_symmetry = 1 - float(np.mean(diff) / 255)
features["horizontal_symmetry"] = h_symmetry
# 垂直对称性
if h % 2 == 0:
top_half = img_cv[:h//2, :]
bottom_half = cv2.flip(img_cv[h//2:, :], 0)
if top_half.shape == bottom_half.shape:
# 对大图像使用采样以加速计算
if h//2 > 512 or w > 512:
step = max(1, min(h//2, w) // 512)
diff = cv2.absdiff(top_half[::step, ::step], bottom_half[::step, ::step])
else:
diff = cv2.absdiff(top_half, bottom_half)
v_symmetry = 1 - float(np.mean(diff) / 255)
features["vertical_symmetry"] = v_symmetry
# 径向对称性 - 对于人脸等中心对象很有用
if min(h, w) > 100: # 只对足够大的图像计算
try:
center_y, center_x = h // 2, w // 2
max_radius = min(center_x, center_y) - 10
if max_radius > 20: # 确保有足够的半径
# 创建径向对称性掩码
y, x = np.ogrid[-center_y:h-center_y, -center_x:w-center_x]
mask = x*x + y*y <= max_radius*max_radius
# 计算对称性
masked_img = img_cv.copy()
if len(masked_img.shape) == 3:
for c in range(masked_img.shape[2]):
masked_img[:,:,c][~mask] = 0
else:
masked_img[~mask] = 0
# 旋转180度比较
rotated = cv2.rotate(masked_img, cv2.ROTATE_180)
diff = cv2.absdiff(masked_img, rotated)
features["radial_symmetry"] = 1 - float(np.mean(diff) / 255)
except:
pass
def _extract_frequency_features(self, img_cv, features, image_id):
"""Extract frequency domain features"""
# 获取灰度图
gray = self.cache.get('gray')
if gray is None:
gray = self.get_grayscale(image_id)
if gray is None:
if len(img_cv.shape) == 3:
gray = cv2.cvtColor(img_cv, cv2.COLOR_BGR2GRAY)
else:
gray = img_cv
self.cache['gray'] = gray
# 如果图像较大,对FFT进行降采样
h, w = gray.shape
if h > 512 or w > 512:
gray_small = cv2.resize(gray, (512, 512))
else:
gray_small = gray
# FFT分析
f_transform = np.fft.fft2(gray_small)
f_shift = np.fft.fftshift(f_transform)
magnitude = np.log(np.abs(f_shift) + 1)
# 计算高/低频率比
h, w = magnitude.shape
center_h, center_w = h // 2, w // 2
# 低频区域(中心)
low_freq_region = magnitude[center_h-h//8:center_h+h//8, center_w-w//8:center_w+w//8]
low_freq_mean = np.mean(low_freq_region)
# 高频区域
high_freq_mean = np.mean(magnitude) - low_freq_mean
features["freq_ratio"] = float(high_freq_mean / max(low_freq_mean, 0.001))
features["freq_std"] = float(np.std(magnitude))
# 频率各向异性 - 对AI检测至关重要
# 使用更少的角度以提高速度
freq_blocks = []
for angle in range(0, 180, 45): # 从20°步长减少到45°步长
mask = np.zeros_like(magnitude)
cv2.ellipse(mask, (center_w, center_h), (w//2, h//2), angle, -10, 10, 1, -1)
freq_blocks.append(np.mean(magnitude * mask))
features["freq_anisotropy"] = float(np.std(freq_blocks))
# 频率峰值分析 - AI生成图像通常有特定的频率峰值模式
try:
# 计算径向平均功率谱
y, x = np.ogrid[-center_h:h-center_h, -center_w:w-center_w]
r = np.sqrt(x*x + y*y)
r = r.astype(np.int32)
# 创建径向平均
radial_mean = np.zeros(min(center_h, center_w))
for i in range(1, len(radial_mean)):
mask = (r == i)
if np.sum(mask) > 0:
radial_mean[i] = np.mean(magnitude[mask])
# 计算峰值特征
peaks, _ = stats.find_peaks(radial_mean)
if len(peaks) > 0:
features["freq_peak_count"] = len(peaks)
features["freq_peak_prominence"] = float(np.mean(radial_mean[peaks]))
except:
pass
def _extract_advanced_features(self, img_cv, features, image_id):
"""Extract advanced features for AI detection"""
# 获取灰度图
gray = self.cache.get('gray')
if gray is None:
gray = self.get_grayscale(image_id)
if gray is None:
if len(img_cv.shape) == 3:
gray = cv2.cvtColor(img_cv, cv2.COLOR_BGR2GRAY)
else:
gray = img_cv
self.cache['gray'] = gray
# 小波变换特征
try:
# 如果图像太大,先降采样
if gray.shape[0] > 512 or gray.shape[1] > 512:
gray_small = cv2.resize(gray, (512, 512))
else:
gray_small = gray
# 执行小波变换
coeffs = pywt.dwt2(gray_small, 'haar')
cA, (cH, cV, cD) = coeffs
# 计算各子带的统计特征
features["wavelet_h_std"] = float(np.std(cH))
features["wavelet_v_std"] = float(np.std(cV))
features["wavelet_d_std"] = float(np.std(cD))
# 计算小波系数的熵
cH_hist, _ = np.histogram(cH.flatten(), bins=50)
cV_hist, _ = np.histogram(cV.flatten(), bins=50)
cD_hist, _ = np.histogram(cD.flatten(), bins=50)
features["wavelet_h_entropy"] = float(stats.entropy(cH_hist + 1e-10))
features["wavelet_v_entropy"] = float(stats.entropy(cV_hist + 1e-10))
features["wavelet_d_entropy"] = float(stats.entropy(cD_hist + 1e-10))
except:
pass
# DCT变换特征
try:
# 执行DCT变换
dct = cv2.dct(np.float32(gray_small))
# 计算DCT系数的统计特征
dct_std = np.std(dct)
features["dct_std"] = float(dct_std)
# 计算DCT系数的熵
dct_hist, _ = np.histogram(dct.flatten(), bins=50)
features["dct_entropy"] = float(stats.entropy(dct_hist + 1e-10))
except:
pass
# 图像质量评估
try:
# 计算梯度幅度图像
sobelx = cv2.Sobel(gray, cv2.CV_64F, 1, 0, ksize=3)
sobely = cv2.Sobel(gray, cv2.CV_64F, 0, 1, ksize=3)
gradient_magnitude = np.sqrt(sobelx**2 + sobely**2)
# 计算自然度指标
naturalness = np.mean(gradient_magnitude) / np.std(gradient_magnitude)
features["naturalness_index"] = float(naturalness)
except:
pass
#############################################
# 特征分析与决策逻辑部分 - 重新调整
#############################################
# 基于图像类型的特征阈值 - 更严格的阈值
OPTIMIZED_THRESHOLDS = {
"lbp_entropy": {
"default": 1.8, # 更严格的阈值
"portrait": 1.7,
"landscape": 1.9,
},
"freq_anisotropy": {
"default": 0.005, # 更严格的阈值
"portrait": 0.004,
"landscape": 0.006,
},
"texture_correlation": {
"default": 0.97, # 更严格的阈值
"portrait": 0.96,
"landscape": 0.97,
},
"horizontal_symmetry": {
"default": 0.9, # 更严格的阈值
"portrait": 0.88,
"landscape": 0.9,
},
"vertical_symmetry": {
"default": 0.9, # 更严格的阈值
"portrait": 0.88,
"landscape": 0.9,
},
"noise_spatial_std": {
"default": 0.2, # 更严格的阈值
"portrait": 0.2,
"landscape": 0.25,
},
"freq_ratio": {
"default": 0.0, # 更严格的阈值
"portrait": -0.05,
"landscape": 0.0,
},
"noise_spectrum_std": {
"default": 600, # 更严格的阈值
"portrait": 600,
"landscape": 700,
},
"color_entropy": {
"default": 3.0, # 更严格的阈值
"portrait": 3.0,
"landscape": 3.2,
},
"lbp_uniformity": {
"default": 0.25, # 更严格的阈值
"portrait": 0.25,
"landscape": 0.2,
},
"noise_channel_correlation": {
"default": 0.97, # 更严格的阈值
"portrait": 0.97,
"landscape": 0.95,
},
"wavelet_h_entropy": {
"default": 1.5, # 更严格的阈值
"portrait": 1.4,
"landscape": 1.6,
},
"dct_entropy": {
"default": 0.005, # 更严格的阈值
"portrait": 0.004,
"landscape": 0.006,
},
"naturalness_index": {
"default": 0.6, # 更严格的阈值
"portrait": 0.55,
"landscape": 0.65,
}
}
# 特征重要性权重 - 调整权重
FEATURE_IMPORTANCE = {
"lbp_entropy": 0.12,
"freq_anisotropy": 0.12,
"texture_correlation": 0.06,
"horizontal_symmetry": 0.02,
"vertical_symmetry": 0.02,
"noise_spatial_std": 0.06,
"freq_ratio": 0.04,
"noise_spectrum_std": 0.04,
"color_entropy": 0.06,
"lbp_uniformity": 0.04,
"noise_channel_correlation": 0.12, # 增加权重
"wavelet_h_entropy": 0.12, # 增加权重
"dct_entropy": 0.12, # 增加权重
"naturalness_index": 0.06
}
def get_threshold(feature_name, image_type="default"):
"""Get the appropriate threshold for a feature based on image type"""
if feature_name not in OPTIMIZED_THRESHOLDS:
return None
return OPTIMIZED_THRESHOLDS[feature_name].get(image_type,
OPTIMIZED_THRESHOLDS[feature_name]["default"])
def check_ai_specific_features(image_features):
"""Enhanced check for AI-generated image features with stricter criteria"""
ai_score = 0
ai_signs = []
# 获取图像类型
image_type = image_features.get("image_type", "default")
# 处理每个特征
for feature_name, importance in FEATURE_IMPORTANCE.items():
if feature_name not in image_features:
continue
value = image_features[feature_name]
threshold = get_threshold(feature_name, image_type)
if threshold is None:
continue
# 不同特征有不同的比较逻辑
feature_score = 0
# 低值表示AI生成的特征
if feature_name in ["lbp_entropy", "freq_anisotropy", "noise_spatial_std",
"freq_ratio", "noise_spectrum_std", "color_entropy",
"wavelet_h_entropy", "dct_entropy", "naturalness_index"]:
if value < threshold:
# 更严格的评分,只有显著低于阈值才给高分
feature_score = min(1.0, (threshold - value) / threshold * 1.5)
if feature_score > 0.7: # 提高显著性阈值
ai_signs.append(f"{feature_name} 异常低 ({value:.2f})")
# 高值表示AI生成的特征
elif feature_name in ["texture_correlation", "horizontal_symmetry", "vertical_symmetry",
"lbp_uniformity", "noise_channel_correlation"]:
if value > threshold:
# 更严格的评分,只有显著高于阈值才给高分
feature_score = min(1.0, (value - threshold) / (1 - threshold) * 1.5)
if feature_score > 0.7: # 提高显著性阈值
ai_signs.append(f"{feature_name} 异常高 ({value:.2f})")
# 累加加权分数
ai_score += feature_score * importance
# 特征组合分析 - 检查特定组合模式
if ("dct_entropy" in image_features and image_features["dct_entropy"] < 0.005 and
"wavelet_h_entropy" in image_features and image_features["wavelet_h_entropy"] < 1.5 and
"noise_channel_correlation" in image_features and image_features["noise_channel_correlation"] > 0.97):
# 这种组合强烈表明是AI生成
ai_score = max(ai_score, 0.9)
if "特征组合模式" not in ai_signs:
ai_signs.append("特征组合模式: 多个关键特征同时异常")
# 计算检测到多少关键特征
critical_count = len(ai_signs)
if critical_count >= 5:
ai_score = max(ai_score, 0.8) # 更保守,从0.9改为0.8
elif critical_count >= 3:
ai_score = max(ai_score, 0.6) # 更保守,从0.7改为0.6
return min(ai_score, 1.0), ai_signs
def detect_beauty_filter_signs(image_features):
"""Detect beauty filter traces"""
beauty_score = 0
beauty_signs = []
# 获取图像类型
image_type = image_features.get("image_type", "default")
# 只检查最重要的美颜滤镜指标
if "face_skin_std" in image_features:
threshold = 15 if image_type == "portrait" else 20
if image_features["face_skin_std"] < threshold:
beauty_score += 0.3
beauty_signs.append("皮肤质感过于均匀")
if "edge_density" in image_features:
threshold = 0.03 if image_type == "portrait" else 0.04
if image_features["edge_density"] < threshold:
beauty_score += 0.2
beauty_signs.append("边缘过于平滑")
if "noise_level" in image_features:
threshold = 1.0 if image_type == "portrait" else 1.5
if image_features["noise_level"] < threshold:
beauty_score += 0.2
beauty_signs.append("噪点异常少")
if "texture_homogeneity" in image_features:
threshold = 0.5 if image_type == "portrait" else 0.4
if image_features["texture_homogeneity"] > threshold:
beauty_score += 0.2
beauty_signs.append("纹理过于均匀")
return min(beauty_score, 1.0), beauty_signs
def detect_photoshop_signs(image_features):
"""Detect Photoshop traces"""
ps_score = 0
ps_signs = []
# 获取图像类型
image_type = image_features.get("image_type", "default")
# 只检查最重要的PS指标
if "texture_homogeneity" in image_features:
threshold = 0.4 if image_type == "portrait" else 0.35
if image_features["texture_homogeneity"] > threshold:
ps_score += 0.2
ps_signs.append("皮肤质感过于均匀")
if "edge_density" in image_features:
threshold = 0.01 if image_type == "portrait" else 0.015
if image_features["edge_density"] < threshold:
ps_score += 0.2
ps_signs.append("边缘过于平滑")
if "color_std" in image_features:
threshold = 50 if image_type == "portrait" else 40
if image_features["color_std"] > threshold:
ps_score += 0.2
ps_signs.append("颜色分布极不自然")
if "noise_spatial_std" in image_features:
threshold = 1.0 if image_type == "portrait" else 1.2
if image_features["noise_spatial_std"] > threshold:
ps_score += 0.2
ps_signs.append("噪点分布不均匀")
return min(ps_score, 1.0), ps_signs
def calibrate_model_output(probability):
"""校准模型输出,解决模型偏向真人的问题"""
# 如果模型输出总是在0.3-0.4之间,我们可以拉伸这个范围
if 0.25 <= probability <= 0.4:
# 将0.25-0.4范围拉伸到0.1-0.9
return 0.1 + (probability - 0.25) * (0.9 - 0.1) / (0.4 - 0.25)
return probability
def calibrate_feature_score(score, signs):
"""校准特征分析分数,解决特征分析偏向AI的问题"""
# 如果特征分数高但检测到的特征少,降低分数
if score > 0.7 and len(signs) < 3:
return 0.5 + (score - 0.5) * 0.5 # 降低分数
# 如果特征分数高且检测到多个特征,保持高分
elif score > 0.7 and len(signs) >= 4:
return score
# 中等分数情况
elif 0.4 < score <= 0.7:
return 0.4 + (score - 0.4) * 0.7 # 略微降低
return score
def calculate_model_consistency(model_results):
"""Calculate the consistency between model predictions"""
if not model_results:
return 0.0
# 提取AI概率
probabilities = [result.get("ai_probability", 0.5) for result in model_results.values()
if "error" not in result]
if not probabilities:
return 0.0
# 计算方差作为一致性度量
variance = np.var(probabilities)
# 方差越小,一致性越高
consistency = max(0.0, 1.0 - variance * 5) # 缩放以获得合理的一致性分数
return consistency
def get_detailed_analysis(ai_probability, ps_score, beauty_score, ps_signs, ai_signs, beauty_signs,
valid_models_count, ai_feature_score, model_consistency):
"""Provide detailed analysis with improved confidence assessment"""
# 根据模型和特征分析的一致性调整置信度
if abs(ai_probability - ai_feature_score) > 0.2:
confidence_prefix = "低置信度:"
explanation = "(模型预测和特征分析结果存在较大差异)"
elif model_consistency > 0.8 and valid_models_count >= 2:
confidence_prefix = "高置信度:"
explanation = ""
elif model_consistency > 0.6 and valid_models_count >= 2:
confidence_prefix = "中等置信度:"
explanation = ""
else:
confidence_prefix = "低置信度:"
explanation = ""
# 计算编辑分数(在所有路径中都需要)
combined_edit_score = max(ps_score, beauty_score)
# 分类逻辑 - 从中立起点开始
if ai_probability > 0.6:
category = confidence_prefix + "AI生成图像" + explanation
description = "图像很可能是由AI完全生成,几乎没有真人照片的特征。"
main_category = "AI生成"
elif ai_probability < 0.4:
# 第二级分类:素人 vs 修图
if combined_edit_score > 0.5:
category = confidence_prefix + "真人照片,修图痕迹明显" + explanation
description = "图像基本是真人照片,但经过了明显的后期处理或美颜,修饰痕迹明显。"
main_category = "真人照片-修图明显"
else:
category = confidence_prefix + "真实素人照片" + explanation
description = "图像很可能是未经大量处理的真人照片,保留了自然的细节和特征。"
main_category = "真人照片-素人"
else:
# 处理边界情况 - 添加"无法确定"类别
category = confidence_prefix + "无法确定" + explanation
description = "系统无法确定该图像是AI生成还是真实照片。模型预测和特征分析结果不一致,需要人工判断。"
main_category = "无法确定"
# 处理边界情况 - AI生成与高度修图
if 0.45 < ai_probability < 0.55 and combined_edit_score > 0.7:
category = confidence_prefix + "真人照片,修图痕迹明显(也可能是AI生成)" + explanation
description = "图像可能是真人照片经过大量后期处理,也可能是AI生成图像。由于现代AI技术与高度修图效果相似,难以完全区分。"
main_category = "真人照片-修图明显"
# 格式化详情
ps_details = "检测到的修图痕迹:" + "、".join(ps_signs) if ps_signs else "未检测到明显的修图痕迹。"
ai_details = "检测到的AI特征:" + "、".join(ai_signs) if ai_signs else "未检测到明显的AI生成特征。"
beauty_details = "检测到的美颜特征:" + "、".join(beauty_signs) if beauty_signs else "未检测到明显的美颜特征。"
return category, description, ps_details, ai_details, beauty_details, main_category
def detect_ai_image(image):
"""Enhanced main detection function with improved decision logic"""
if image is None:
return {"error": "未提供图像"}
start_time = time.time()
image_id = str(uuid.uuid4()) # 为图像生成唯一ID,用于反馈收集
# 初始化管理器
model_manager = EnhancedModelManager()
feature_extractor = EnhancedFeatureExtractor()
# 第一阶段:快速特征分析
# 确定是否需要降采样
downscale_factor = 1.0
if image.width * image.height > 1024 * 1024: # 对于大于1MP的图像
downscale_factor = min(1.0, 1024 * 1024 / (image.width * image.height))
# 提取特征
feature_extractor.clear_cache() # 清除之前运行的缓存
image_features = feature_extractor.analyze_image_features(image, downscale_factor)
# 快速特征分析
ai_feature_score, ai_signs = check_ai_specific_features(image_features)
# 第二阶段:模型分析
results = {}
valid_models = 0
weighted_ai_probability = 0
# 使用每个模型处理
for key, model_info in model_manager.models.items():
model_result = model_manager.get_model_prediction(key, image)
if model_result and "error" not in model_result:
# 校准模型输出
model_result["ai_probability"] = calibrate_model_output(model_result["ai_probability"])
results[key] = model_result
weighted_ai_probability += model_result["ai_probability"] * model_info["weight"]
valid_models += 1
else:
results[key] = model_result or {"model_name": model_info["name"], "error": "处理失败"}
# 计算最终加权概率
if valid_models > 0:
final_ai_probability = weighted_ai_probability / sum(
m["weight"] for k, m in model_manager.models.items()
if k in model_manager.loaded_models
)
else:
return {"error": "所有模型加载失败"}
# 计算模型一致性
model_consistency = calculate_model_consistency(results)
# 分析PS和美颜痕迹
ps_score, ps_signs = detect_photoshop_signs(image_features)
beauty_score, beauty_signs = detect_beauty_filter_signs(image_features)
# 校准特征分析分数
calibrated_feature_score = calibrate_feature_score(ai_feature_score, ai_signs)
# 从中立起点开始决策
adjusted_probability = 0.5
# 根据校准后的模型和特征分析调整概率
if final_ai_probability > 0.7 or calibrated_feature_score > 0.8:
# 强AI证据
adjusted_probability = 0.5 + (final_ai_probability * 0.25 + calibrated_feature_score * 0.25)
elif final_ai_probability < 0.3 and calibrated_feature_score < 0.3:
# 强真人证据
adjusted_probability = 0.5 - ((0.3 - final_ai_probability) * 0.25 + (0.3 - calibrated_feature_score) * 0.25)
else:
# 证据不足,保持在中间区域
adjusted_probability = 0.5 + (final_ai_probability - 0.5) * 0.2 + (calibrated_feature_score - 0.5) * 0.2
# 特征组合分析 - 强有力的组合证据
if ("dct_entropy" in image_features and image_features["dct_entropy"] < 0.005 and
"wavelet_h_entropy" in image_features and image_features["wavelet_h_entropy"] < 1.5 and
"noise_channel_correlation" in image_features and image_features["noise_channel_correlation"] > 0.97):
# 这种组合强烈表明是AI生成
adjusted_probability = max(adjusted_probability, 0.7)
# 确保概率在有效范围内
adjusted_probability = min(1.0, max(0.0, adjusted_probability))
# 获取详细分析
category, description, ps_details, ai_details, beauty_details, main_category = get_detailed_analysis(
adjusted_probability, ps_score, beauty_score, ps_signs, ai_signs, beauty_signs,
valid_models, calibrated_feature_score, model_consistency
)
# 构建最终结果
processing_time = time.time() - start_time
final_result = {
"image_id": image_id,
"ai_probability": adjusted_probability,
"original_ai_probability": final_ai_probability,
"ps_score": ps_score,
"beauty_score": beauty_score,
"ai_feature_score": calibrated_feature_score,
"raw_feature_score": ai_feature_score,
"model_consistency": model_consistency,
"category": category,
"main_category": main_category,
"description": description,
"ps_details": ps_details,
"ai_details": ai_details,
"beauty_details": beauty_details,
"processing_time": f"{processing_time:.2f} seconds",
"individual_model_results": results,
# 只包含最重要的特征以减少响应大小
"key_features": {k: image_features[k] for k in FEATURE_IMPORTANCE if k in image_features}
}
# 保存结果用于后续分析
try:
with open(f"{FEEDBACK_DIR}/{image_id}.json", "w") as f:
json.dump(final_result, f)
except:
pass
# 返回两个值:JSON结果和标签数据
label_data = {main_category: 1.0}
return final_result, label_data
def save_user_feedback(image_id, user_feedback):
"""Save user feedback for continuous learning"""
if not image_id:
return {"status": "error", "message": "未提供图像ID"}
try:
# 读取原始结果
result_path = f"{FEEDBACK_DIR}/{image_id}.json"
if not os.path.exists(result_path):
return {"status": "error", "message": "找不到对应的图像分析结果"}
with open(result_path, "r") as f:
original_result = json.load(f)
# 添加用户反馈
feedback_data = {
"image_id": image_id,
"original_result": original_result,
"user_feedback": user_feedback,
"timestamp": datetime.now().isoformat()
}
# 保存反馈
feedback_path = f"{FEEDBACK_DIR}/{image_id}_feedback.json"
with open(feedback_path, "w") as f:
json.dump(feedback_data, f)
return {"status": "success", "message": "反馈已保存,感谢您的贡献!"}
except Exception as e:
return {"status": "error", "message": f"保存反馈时出错: {str(e)}"}
#############################################
# Gradio界面部分
#############################################
def process_image_and_show_results(image):
"""Process image and format results for Gradio interface"""
if image is None:
return {"error": "请上传图像"}, None, "未检测"
try:
result, label = detect_ai_image(image)
return result, result["image_id"], result["main_category"]
except Exception as e:
return {"error": f"处理图像时出错: {str(e)}"}, None, "错误"
def submit_feedback(image_id, correct_classification, comments):
"""Submit user feedback"""
if not image_id:
return "请先上传图像进行分析"
feedback = {
"correct_classification": correct_classification,
"comments": comments
}
result = save_user_feedback(image_id, feedback)
return result["message"]
# 创建Gradio界面
with gr.Blocks(title="增强型AI图像检测系统") as iface:
gr.Markdown("# 增强型AI图像检测系统")
gr.Markdown("上传图像,系统将分析该图像是AI生成还是真实照片")
with gr.Row():
with gr.Column(scale=1):
input_image = gr.Image(type="pil", label="上传图像")
analyze_btn = gr.Button("分析图像", variant="primary")
with gr.Column(scale=2):
result_json = gr.JSON(label="详细分析结果")
image_id_output = gr.Textbox(label="图像ID", visible=False)
result_label = gr.Label(label="主要分类")
gr.Markdown("## 用户反馈")
gr.Markdown("您认为上述分类结果是否正确?您的反馈将帮助我们改进系统。")
with gr.Row():
correct_classification = gr.Radio(
["正确", "错误 - 这是AI生成图像", "错误 - 这是真实照片", "不确定"],
label="分类结果是否正确"
)
comments = gr.Textbox(label="其他评论(可选)")
feedback_btn = gr.Button("提交反馈")
feedback_result = gr.Textbox(label="反馈结果")
# 设置事件
analyze_btn.click(
process_image_and_show_results,
inputs=[input_image],
outputs=[result_json, image_id_output, result_label]
)
feedback_btn.click(
submit_feedback,
inputs=[image_id_output, correct_classification, comments],
outputs=[feedback_result]
)
# 启动应用
if __name__ == "__main__":
iface.launch(share=True)