import json import time import hashlib import base64 import asyncio import aiohttp from fastapi import FastAPI, Request, Response from typing import Optional, Dict, List, Any import uvicorn from pydantic import BaseModel # 配置参数 API_CHALLENGE_URL = 'https://api.eqing.tech/api/altcaptcha/challenge' # 验证码挑战接口 NEXTWAY_CHAT_URL = 'https://origin.eqing.tech/api/openai/v1/chat/completions' # 聊天完成接口 CREDENTIAL_EXPIRY_MARGIN = 60 * 1000 # 凭证过期边界时间(60秒,毫秒单位) PORT = 7860 # 服务器端口 API_ENDPOINT = '/v1/chat/completions' # API端点 MODEL_NAME = "gpt-4o-free" # 默认模型名称 REQUEST_TIMEOUT = 480 # 请求超时时间(秒) MAX_RETRIES = 3 # 最大重试次数 RETRY_DELAY = 1 # 重试延迟(秒) # 全局变量 current_credential = None # 当前凭证 credential_expiry = None # 凭证过期时间 is_refreshing_credential = False # 是否正在刷新凭证 # 模型映射字典 MODEL_MAPPING = { 'gpt-4o-all-lite': 'gpt-4o-mini', 'gpt-4o-image': 'gpt-4o-mini-image-free', 'grok-3-re': 'grok-3-free', 'gemini-2.0-flash': 'gemini-2.0-flash-free' } app = FastAPI() class ChatRequest(BaseModel): """聊天请求的数据模型""" messages: List[Dict[str, str]] model: Optional[str] stream: Optional[bool] = True temperature: Optional[float] = 0.5 presence_penalty: Optional[float] = 0 frequency_penalty: Optional[float] = 0 top_p: Optional[float] = 1 max_tokens: Optional[int] = 4000 async def extract_content(text: str) -> str: """ 从响应文本中提取内容 Args: text: 响应文本 Returns: 提取的AI响应内容 """ lines = text.split('\n') ai_response = '' ignored_id = 'chatcmpl-Em8sqOsIIDLYPNvC65WfFvqv' created = 1687070102 for line in lines: line = line.strip() if line.startswith('data:'): data_str = line[5:].strip() if not data_str or data_str in ['[ORIGIN]', '[DONE]']: continue try: json_data = json.loads(data_str) # 跳过特定的响应 if json_data.get('id') == ignored_id or json_data.get('created') == created: continue # 提取内容 if (json_data.get('choices') and json_data['choices'][0].get('delta') and 'content' in json_data['choices'][0]['delta']): content = json_data['choices'][0]['delta']['content'] ai_response += content except json.JSONDecodeError: print(f'跳过非JSON数据') return ai_response async def solve_challenge(challenge: str, salt: str, algorithm: str = "SHA-512", max_number: int = 1000000): """ 解决验证码挑战 Args: challenge: 挑战字符串 salt: 盐值 algorithm: 哈希算法 max_number: 最大尝试次数 Returns: 解决方案字典 """ start_time = time.time() for number in range(max_number): hash_value = await verify_hash(salt, number, algorithm) if hash_value == challenge: return { "number": number, "took": int((time.time() - start_time) * 1000) } return None async def verify_hash(salt: str, number: int, algorithm: str) -> str: """ 验证哈希值 Args: salt: 盐值 number: 数字 algorithm: 哈希算法 Returns: 哈希字符串 """ input_str = f"{salt}{number}" if algorithm == "SHA-512": hash_obj = hashlib.sha512(input_str.encode()) return hash_obj.hexdigest() elif algorithm == "SHA-256": hash_obj = hashlib.sha256(input_str.encode()) return hash_obj.hexdigest() else: raise ValueError(f"不支持的算法: {algorithm}") async def generate_credential(): """ 生成新的凭证 Returns: 凭证信息字典 """ global current_credential, credential_expiry async with aiohttp.ClientSession() as session: try: async with session.get(API_CHALLENGE_URL) as response: if response.status != 200: print(f"验证码请求失败,状态码: {response.status}") return None data = await response.json() solution = await solve_challenge( data['challenge'], data['salt'], data['algorithm'], data['maxnumber'] ) if not solution: print("解决验证码挑战失败") return None credential_obj = { "algorithm": data['algorithm'], "challenge": data['challenge'], "number": solution['number'], "salt": data['salt'], "signature": data['signature'], "took": solution['took'] } credential = base64.b64encode(json.dumps(credential_obj).encode()).decode() expiry = int(data['salt'].split('?expires=')[1]) * 1000 return {"credential": credential, "expiry": expiry} except Exception as e: print(f"生成凭证时出错: {e}") return None async def get_credential(): """ 获取有效的凭证 Returns: 当前有效的凭证 """ global current_credential, credential_expiry, is_refreshing_credential if (not current_credential or not credential_expiry or credential_expiry <= time.time() * 1000 + CREDENTIAL_EXPIRY_MARGIN): if not is_refreshing_credential: is_refreshing_credential = True try: cred_data = await generate_credential() if cred_data: current_credential = cred_data['credential'] credential_expiry = cred_data['expiry'] finally: is_refreshing_credential = False else: await asyncio.sleep(2) # 等待其他进程完成凭证刷新 return current_credential @app.post(API_ENDPOINT) async def chat_endpoint(request: ChatRequest): """ 聊天API端点 Args: request: 聊天请求对象 Returns: 聊天响应 """ try: model = MODEL_MAPPING.get(request.model, request.model or MODEL_NAME) response_content = await handle_chat_request(model, request.messages) if response_content is None: return Response( content="从API获取响应失败", status_code=500 ) return { "choices": [{ "message": { "role": "assistant", "content": response_content.strip() }, "finish_reason": "stop", "index": 0 }], "model": model, "object": "chat.completion" } except Exception as e: print(f"处理聊天请求时出错: {e}") return Response(content="内部服务器错误", status_code=500) async def handle_chat_request(model: str, messages: List[Dict[str, str]]): """ 处理聊天请求 Args: model: 模型名称 messages: 消息列表 Returns: 聊天响应内容 """ captcha_token = await get_credential() if not captcha_token: return None body = { "messages": messages, "stream": True, "model": model, "temperature": 0.5, "presence_penalty": 0, "frequency_penalty": 0, "top_p": 1, "max_tokens": 4000, "captchaToken": captcha_token } timeout = aiohttp.ClientTimeout(total=REQUEST_TIMEOUT) async with aiohttp.ClientSession(timeout=timeout) as session: try: async with session.post(NEXTWAY_CHAT_URL, json=body) as response: if response.status != 200: print(f"请求失败,状态码: {response.status}") return None complete_response = "" buffer = "" # 用于存储未完成的数据块 async for chunk in response.content.iter_chunks(): if chunk[0]: # chunk[0] 是数据,chunk[1] 是布尔值表示是否是最后一块 try: chunk_text = (buffer + chunk[0].decode()).strip() buffer = "" # 清空缓冲区 # 处理可能的不完整JSON if chunk_text.endswith('}'): content = await extract_content(chunk_text) if content: complete_response += content else: buffer = chunk_text # 存储不完整的数据到缓冲区 except UnicodeDecodeError as e: print(f"解码错误: {e}") buffer = "" # 清空缓冲区,跳过损坏的数据 continue # 检查最终响应 if not complete_response: print("警告: 收到空响应") return None return complete_response.strip() except asyncio.TimeoutError: print("请求超时") return None except Exception as e: print(f"处理请求时出错: {e}") await get_credential() # 刷新凭证 return None if __name__ == "__main__": uvicorn.run(app, host="0.0.0.0", port=PORT)