# NOTE(review): the three lines below were scraped Hugging Face Spaces UI
# status text ("Spaces: Running"), not part of the program; kept as a comment.
| import httpx | |
| import json | |
| import logging | |
| import html | |
| from config import ANTCHAT_BASE_URL, ANTCHAT_API_KEY, CHAT_MODEL_SPECS | |
# Configure the root logger once at import time; DEBUG level so the full
# request/response trace below is visible during development.
logging.basicConfig(
    level=logging.DEBUG,
    format="%(asctime)s [%(levelname)s] %(message)s",
)

# Module-scoped logger, named after this module per stdlib convention.
logger = logging.getLogger(__name__)
def get_model_response(model_id, history, system_prompt, temperature, escape_html=True):
    """
    Stream a chat completion from the AntChat API.

    Args:
        model_id: Key into CHAT_MODEL_SPECS; the spec's "model_id" field is
            the identifier actually sent to the API (it may be overridden
            by local.py, so the spec table is the ground truth).
        history: Iterable of (user_msg, assistant_msg) pairs as produced by
            the chat UI. Turns with a falsy user message are skipped.
        system_prompt: Content of the leading "system" message.
        temperature: Sampling temperature, forwarded verbatim.
        escape_html: When True (default), each streamed chunk is
            HTML-escaped before being yielded; pass False for raw output
            such as generated code.

    Yields:
        str: Incremental content chunks (or tool-call argument fragments).
        On any request failure the error is logged and the generator simply
        ends, so callers may observe an empty stream rather than an exception.
    """
    api_model_id = CHAT_MODEL_SPECS[model_id]["model_id"]

    headers = {
        "Authorization": f"Bearer {ANTCHAT_API_KEY}",
        "Content-Type": "application/json",
    }

    # Build the message history. Only turns that contain a user message are
    # forwarded; this filters out the UI's initial greeting turn, which has
    # an assistant message but no user message.
    messages = [{"role": "system", "content": system_prompt}]
    for user_msg, assistant_msg in history:
        if user_msg:
            messages.append({"role": "user", "content": user_msg})
            # An assistant reply is only meaningful after its user message.
            if assistant_msg:
                messages.append({"role": "assistant", "content": assistant_msg})

    json_data = {
        "model": api_model_id,
        "messages": messages,
        "stream": True,
        "temperature": temperature,
    }

    logger.debug(f"请求 URL: {ANTCHAT_BASE_URL}/chat/completions")
    # Fix: never log the Authorization header -- it contains the API key and
    # the original code wrote the full bearer token into the DEBUG log.
    safe_headers = {
        k: ("***REDACTED***" if k.lower() == "authorization" else v)
        for k, v in headers.items()
    }
    logger.debug(f"请求头: {safe_headers}")
    logger.debug(f"请求体: {json.dumps(json_data, ensure_ascii=False)}")

    try:
        with httpx.stream(
            "POST",
            f"{ANTCHAT_BASE_URL}/chat/completions",
            headers=headers,
            json=json_data,
            timeout=120,
        ) as response:
            logger.debug(f"响应状态码: {response.status_code}")
            response.raise_for_status()
            # Server-sent events: each payload line is prefixed with "data:".
            for chunk in response.iter_lines():
                if not chunk.startswith("data:"):
                    continue
                chunk = chunk[5:]
                if chunk.strip() == "[DONE]":
                    break
                try:
                    data = json.loads(chunk)
                except json.JSONDecodeError as e:
                    # A malformed event is skipped, not fatal to the stream.
                    logger.error(f"JSON 解析错误: {e}, 数据: {chunk}")
                    continue
                if "choices" in data and data["choices"]:
                    delta = data["choices"][0].get("delta", {})
                    content_chunk = delta.get("content")
                    if content_chunk:
                        yield html.escape(content_chunk) if escape_html else content_chunk
                    elif "tool_calls" in delta:
                        # Tool-call deltas stream the function arguments text.
                        tool_calls = delta.get("tool_calls", [])
                        if tool_calls:
                            func_chunk = tool_calls[0].get("function", {})
                            args_chunk = func_chunk.get("arguments")
                            if args_chunk:
                                yield html.escape(args_chunk) if escape_html else args_chunk
    except Exception as e:
        # Best-effort boundary: log with traceback, yield nothing further.
        logger.exception(f"请求异常: {e}")
def perform_web_search(query):
    """Placeholder web search.

    Intended to call an external search API (e.g. Tavily or Serper);
    for now it ignores *query* and returns a canned summary string.
    """
    # TODO: wire up the real search backend.
    return "搜索结果摘要"
def generate_code_for_tab(system_prompt, user_prompt, code_type, model_choice):
    """
    Drive code generation for one UI tab.

    Args:
        system_prompt: System prompt forwarded to the model.
        user_prompt: The user's request text.
        code_type: Tab kind. "静态页面" streams real model output;
            "Gradio 应用" yields a mocked script; any other value yields
            nothing.
        model_choice: UI option string, matched against known model ids.

    Yields:
        str: Code chunks — streamed incrementally for static pages, a single
        blob for the mocked Gradio app.
    """
    logger.info(f"为 '{code_type}' 类型生成代码,Prompt: '{user_prompt}', Model: '{model_choice}'")
    if code_type == "静态页面":
        # Map the UI option string onto a concrete model id.
        if "inclusionai/ling-1t" in model_choice:
            model_name = "inclusionai/ling-1t"
        elif "inclusionai/ring-flash-2.0" in model_choice:
            model_name = "inclusionai/ring-flash-2.0"
        else:
            # Unknown option: fall back to the default model and warn.
            model_name = "inclusionai/ling-1t"
            logger.warning(f"未知的模型选项 '{model_choice}', 回退到默认模型 'inclusionai/ling-1t'")
        history = [[user_prompt, None]]
        temperature = 0.7
        # Generated code must not be HTML-escaped.
        yield from get_model_response(model_name, history, system_prompt, temperature, escape_html=False)
    elif code_type == "Gradio 应用":
        # Currently mocked: emit a canned Gradio app as one chunk.
        # {{name}} stays a literal {name} inside the emitted script.
        yield f"""
import gradio as gr
def greet(name):
    return f"Hello, {user_prompt} a.k.a. {{name}}!"
with gr.Blocks() as demo:
    gr.Markdown("## Simple Greeting App")
    name_input = gr.Textbox(label="Enter your name")
    greet_button = gr.Button("Greet")
    output_text = gr.Textbox(label="Output")
    greet_button.click(fn=greet, inputs=name_input, outputs=output_text)
demo.launch()
"""
    # Fix: the original ended with a stray bare `yield` after the else
    # branch's `return`; it was either dead code or, at function level,
    # emitted a spurious None chunk. An unknown code_type now yields nothing.