import os import gradio as gr from gradio import ChatMessage from typing import Iterator, List, Dict, Tuple, Any import google.generativeai as genai from huggingface_hub import HfApi import requests import re import traceback # HuggingFace API key for space analysis HF_TOKEN = os.getenv("HF_TOKEN") hf_api = HfApi(token=HF_TOKEN) # Gemini 2.0 Flash Thinking model API key and client (for LLM) GEMINI_API_KEY = os.getenv("GEMINI_API_KEY") genai.configure(api_key=GEMINI_API_KEY) model = genai.GenerativeModel("gemini-2.0-flash-thinking-exp-01-21") def get_headers(): if not HF_TOKEN: raise ValueError("Hugging Face token not found in environment variables") return {"Authorization": f"Bearer {HF_TOKEN}"} def get_file_content(space_id: str, file_path: str) -> str: file_url = f"https://huggingface.co/spaces/{space_id}/raw/main/{file_path}" try: response = requests.get(file_url, headers=get_headers()) if response.status_code == 200: return response.text else: return f"File not found or inaccessible: {file_path}" except requests.RequestException: return f"Error fetching content for file: {file_path}" def get_space_structure(space_id: str) -> Dict: try: files = hf_api.list_repo_files(repo_id=space_id, repo_type="space") tree = {"type": "directory", "path": "", "name": space_id, "children": []} for file in files: path_parts = file.split('/') current = tree for i, part in enumerate(path_parts): if i == len(path_parts) - 1: # file current["children"].append({"type": "file", "path": file, "name": part}) else: found = False for child in current["children"]: if child["type"] == "directory" and child["name"] == part: current = child found = True break if not found: new_dir = {"type": "directory", "path": '/'.join(path_parts[:i+1]), "name": part, "children": []} current["children"].append(new_dir) current = new_dir return tree except Exception as e: print(f"Error in get_space_structure: {str(e)}") return {"error": f"API request error: {str(e)}"} def format_tree_structure(tree_data: Dict, indent: str = "") -> str: if "error" in tree_data: return tree_data["error"] formatted = f"{indent}{'📁' if tree_data.get('type') == 'directory' else '📄'} {tree_data.get('name', 'Unknown')}\n" if tree_data.get("type") == "directory": for child in sorted(tree_data.get("children", []), key=lambda x: (x.get("type", "") != "directory", x.get("name", ""))): formatted += format_tree_structure(child, indent + " ") return formatted def adjust_lines_for_code(code_content: str, min_lines: int = 10, max_lines: int = 100) -> int: num_lines = len(code_content.split('\n')) return min(max(num_lines, min_lines), max_lines) def analyze_space(url: str, progress=gr.Progress()): try: space_id = url.split('spaces/')[-1] if not re.match(r'^[\w.-]+/[\w.-]+$', space_id): raise ValueError(f"Invalid Space ID format: {space_id}") progress(0.1, desc="Analyzing file structure...") tree_structure = get_space_structure(space_id) if "error" in tree_structure: raise ValueError(tree_structure["error"]) tree_view = format_tree_structure(tree_structure) progress(0.3, desc="Fetching app.py content...") app_content = get_file_content(space_id, "app.py") progress(0.5, desc="Summarizing code...") summary = summarize_code(app_content) progress(0.7, desc="Analyzing code...") analysis = analyze_code(app_content) progress(0.9, desc="Generating usage instructions...") usage = explain_usage(app_content) lines_for_app_py = adjust_lines_for_code(app_content) progress(1.0, desc="Complete") return app_content, tree_view, tree_structure, space_id, summary, analysis, usage, lines_for_app_py except Exception as e: print(f"Error in analyze_space: {str(e)}") print(traceback.format_exc()) return f"An error occurred: {str(e)}", "", None, "", "", "", "", 10 # -------------------------------------------------- # Gemini 2.0 Flash Thinking model (LLM) functions # -------------------------------------------------- from gradio import ChatMessage def format_chat_history(messages: List[ChatMessage]) -> List[Dict]: """ Convert a list of ChatMessages to a format that the Gemini model can understand. (Skip messages with 'Thinking' metadata) """ formatted = [] for m in messages: if hasattr(m, "metadata") and m.metadata: # Skip 'Thinking' messages continue role = "assistant" if m.role == "assistant" else "user" formatted.append({"role": role, "parts": [m.content or ""]}) return formatted def gemini_chat_completion(system_message: str, user_message: str, max_tokens: int = 200, temperature: float = 0.7) -> str: init_msgs = [ ChatMessage(role="system", content=system_message), ChatMessage(role="user", content=user_message) ] chat_history = format_chat_history(init_msgs) chat = model.start_chat(history=chat_history) final = "" try: for chunk in chat.send_message(user_message, stream=True): parts = chunk.candidates[0].content.parts if len(parts) == 2: final += parts[1].text else: final += parts[0].text return final.strip() except Exception as e: return f"Error calling LLM: {str(e)}" def summarize_code(app_content: str): system_msg = "You are an AI assistant that analyzes and summarizes Python code. Please summarize the provided code in no more than 3 lines." user_msg = f"Please summarize the following Python code in no more than 3 lines:\n\n{app_content}" try: return gemini_chat_completion(system_msg, user_msg, max_tokens=200, temperature=0.7) except Exception as e: return f"Error generating summary: {str(e)}" def analyze_code(app_content: str): system_msg = ( "You are an AI assistant that analyzes Python code. Please analyze the provided code in terms of its service utility and application with respect to the following aspects:\n" "A. Background and Necessity\n" "B. Functional Utility and Value\n" "C. Key Features\n" "D. Target Audience\n" "E. Expected Impact\n" "Please also compare with existing and similar projects. Output in Markdown format." ) user_msg = f"Please analyze the following Python code:\n\n{app_content}" try: return gemini_chat_completion(system_msg, user_msg, max_tokens=1000, temperature=0.7) except Exception as e: return f"Error generating analysis: {str(e)}" def explain_usage(app_content: str): system_msg = ( "You are an AI assistant that analyzes Python code to explain its usage. Based on the provided code, please describe how to use it as if you were viewing the interface. Output in Markdown format." ) user_msg = f"Please explain how to use the following Python code:\n\n{app_content}" try: return gemini_chat_completion(system_msg, user_msg, max_tokens=800, temperature=0.7) except Exception as e: return f"Error generating usage instructions: {str(e)}" def stream_gemini_response(user_message: str, conversation_state: List[ChatMessage]) -> Iterator[List[ChatMessage]]: """ Send a streaming request to Gemini. If the user_message is empty, append a minimal guidance message from the assistant and yield. """ if not user_message.strip(): conversation_state.append( ChatMessage( role="assistant", content="No input provided. Please enter a question!" ) ) yield conversation_state return print(f"\n=== New Request ===\nUser message: {user_message}") chat_history = format_chat_history(conversation_state) chat = model.start_chat(history=chat_history) response = chat.send_message(user_message, stream=True) thought_buffer = "" response_buffer = "" thinking_complete = False conversation_state.append( ChatMessage( role="assistant", content="", metadata={"title": "⚙️ Thinking: *The thoughts produced by the model are experimental"} ) ) try: for chunk in response: parts = chunk.candidates[0].content.parts current_chunk = parts[0].text if len(parts) == 2 and not thinking_complete: thought_buffer += current_chunk print(f"\n=== Complete Thought ===\n{thought_buffer}") conversation_state[-1] = ChatMessage( role="assistant", content=thought_buffer, metadata={"title": "⚙️ Thinking: *The thoughts produced by the model are experimental"} ) yield conversation_state response_buffer = parts[1].text print(f"\n=== Starting Response ===\n{response_buffer}") conversation_state.append( ChatMessage(role="assistant", content=response_buffer) ) thinking_complete = True elif thinking_complete: response_buffer += current_chunk print(f"\n=== Response Chunk ===\n{current_chunk}") conversation_state[-1] = ChatMessage( role="assistant", content=response_buffer ) else: thought_buffer += current_chunk print(f"\n=== Thinking Chunk ===\n{current_chunk}") conversation_state[-1] = ChatMessage( role="assistant", content=thought_buffer, metadata={"title": "⚙️ Thinking: *The thoughts produced by the model are experimental"} ) yield conversation_state print(f"\n=== Final Response ===\n{response_buffer}") except Exception as e: print(f"\n=== Error ===\n{str(e)}") conversation_state.append( ChatMessage( role="assistant", content=f"I apologize, but encountered an error: {str(e)}" ) ) yield conversation_state def convert_for_messages_format(messages: List[ChatMessage]) -> List[Dict[str, str]]: """ Convert a list of ChatMessages to the format [{"role": "assistant"/"user", "content": "..."}]. """ output = [] for msg in messages: output.append({"role": msg.role, "content": msg.content}) return output def user_submit_message(msg: str, conversation_state: List[ChatMessage]): conversation_state.append(ChatMessage(role="user", content=msg)) return "", conversation_state def respond_wrapper(message: str, conversation_state: List[ChatMessage], max_tokens, temperature, top_p): # Get the last user message last_user_message = "" for msg in reversed(conversation_state): if msg.role == "user": last_user_message = msg.content break # Generate response based on the last user message for updated_messages in stream_gemini_response(last_user_message, conversation_state): yield "", convert_for_messages_format(updated_messages) def create_ui(): try: css = """ body { background: linear-gradient(to right, #f0f2f5, #ffffff); font-family: 'Segoe UI', sans-serif; } .gradio-container { border-radius: 15px; box-shadow: 0 4px 6px rgba(0,0,0,0.1); } footer {visibility: hidden;} .tabitem-header { font-weight: bold; color: #3b3b3b; } .gradio-markdown h1 { color: #ff6f61; } """ with gr.Blocks(css=css) as demo: gr.Markdown("# 🚀 MOUSE: Space Research Thinking") with gr.Tabs(): with gr.TabItem("🔍 Analysis"): with gr.Row(): with gr.Column(): url_input = gr.Textbox(label="🔗 HuggingFace Space URL", placeholder="e.g.: https://huggingface.co/spaces/username/space-name") analyze_button = gr.Button("Start Analysis 🚀", variant="primary") summary_output = gr.Markdown(label="📝 Code Summary") analysis_output = gr.Markdown(label="🔍 Code Analysis") usage_output = gr.Markdown(label="📚 Usage Instructions") tree_view_output = gr.Textbox(label="📁 File Structure", lines=20) with gr.Column(): code_tabs = gr.Tabs() with code_tabs: with gr.TabItem("app.py"): app_py_content = gr.Code( language="python", label="app.py", lines=50 ) with gr.TabItem("requirements.txt"): requirements_content = gr.Textbox( label="requirements.txt", lines=50 ) with gr.TabItem("🤖 AI Code Chat"): gr.Markdown("## 💬 Enter an example or paste your source code and ask your question!") chatbot = gr.Chatbot( label="Chat Window", height=400, type="messages" ) msg = gr.Textbox( label="Enter your message", placeholder="Type your message here..." ) max_tokens = gr.Slider( minimum=1, maximum=8000, value=4000, label="Max Tokens", visible=False ) temperature = gr.Slider( minimum=0, maximum=1, value=0.7, label="Temperature", visible=False ) top_p = gr.Slider( minimum=0, maximum=1, value=0.9, label="Top P", visible=False ) examples = [ ["Explain detailed usage instructions in over 4000 tokens"], ["Generate 20 FAQs in over 4000 tokens"], ["Describe technical differentiators and strengths in over 4000 tokens"], ["Generate innovative ideas for patent applications in over 4000 tokens"], ["Write an academic paper in over 4000 tokens"], ["Continue your answer"] ] gr.Examples(examples, inputs=msg) conversation_state = gr.State([]) msg.submit( user_submit_message, inputs=[msg, conversation_state], outputs=[msg, conversation_state], queue=False ).then( respond_wrapper, inputs=[msg, conversation_state, max_tokens, temperature, top_p], outputs=[msg, chatbot], ) with gr.TabItem("⭐ Recommended Best"): gr.Markdown( "Discover recommended HuggingFace Spaces [here](https://huggingface.co/spaces/openfree/Korean-Leaderboard)." ) # Analysis tab logic space_id_state = gr.State() tree_structure_state = gr.State() app_py_content_lines = gr.State() analyze_button.click( analyze_space, inputs=[url_input], outputs=[ app_py_content, tree_view_output, tree_structure_state, space_id_state, summary_output, analysis_output, usage_output, app_py_content_lines ] ).then( lambda space_id: get_file_content(space_id, "requirements.txt"), inputs=[space_id_state], outputs=[requirements_content] ).then( lambda lines: gr.update(lines=lines), inputs=[app_py_content_lines], outputs=[app_py_content] ) return demo except Exception as e: print(f"Error in create_ui: {str(e)}") print(traceback.format_exc()) raise if __name__ == "__main__": try: print("Starting HuggingFace Space Analyzer...") demo = create_ui() print("UI created successfully.") print("Configuring Gradio queue...") demo.queue() print("Gradio queue configured.") print("Launching Gradio app...") demo.launch( server_name="0.0.0.0", server_port=7860, share=False, debug=True, show_api=False ) print("Gradio app launched successfully.") except Exception as e: print(f"Error in main: {str(e)}") print("Detailed error information:") print(traceback.format_exc()) raise