import os

import gradio as gr
from gradio import ChatMessage
from typing import Iterator, List, Dict, Tuple, Any

import google.generativeai as genai
from huggingface_hub import HfApi
import requests
import re
import traceback
|
|
|
|
|
# Hugging Face API client (HF_TOKEN must be set in the environment).
HF_TOKEN = os.getenv("HF_TOKEN")
hf_api = HfApi(token=HF_TOKEN)

# Gemini client. The experimental "thinking" model streams its reasoning
# as a separate content part ahead of the final answer.
GEMINI_API_KEY = os.getenv("GEMINI_API_KEY")
genai.configure(api_key=GEMINI_API_KEY)
model = genai.GenerativeModel("gemini-2.0-flash-thinking-exp-01-21")
|
|
|
|
|
def get_headers():
    if not HF_TOKEN:
        raise ValueError("Hugging Face token not found in environment variables")
    return {"Authorization": f"Bearer {HF_TOKEN}"}
|
|
|
|
|
def get_file_content(space_id: str, file_path: str) -> str:
    file_url = f"https://huggingface.co/spaces/{space_id}/raw/main/{file_path}"
    try:
        response = requests.get(file_url, headers=get_headers(), timeout=30)
        if response.status_code == 200:
            return response.text
        return f"File not found or inaccessible: {file_path}"
    except requests.RequestException:
        return f"Error fetching content for file: {file_path}"
|
|
|
|
|
def get_space_structure(space_id: str) -> Dict:
    """Build a nested directory tree from the Space's flat file list."""
    try:
        files = hf_api.list_repo_files(repo_id=space_id, repo_type="space")
        tree = {"type": "directory", "path": "", "name": space_id, "children": []}
        for file in files:
            path_parts = file.split('/')
            current = tree
            for i, part in enumerate(path_parts):
                if i == len(path_parts) - 1:
                    current["children"].append({"type": "file", "path": file, "name": part})
                else:
                    # Descend into (or create) the intermediate directory node.
                    found = False
                    for child in current["children"]:
                        if child["type"] == "directory" and child["name"] == part:
                            current = child
                            found = True
                            break
                    if not found:
                        new_dir = {"type": "directory", "path": '/'.join(path_parts[:i + 1]), "name": part, "children": []}
                        current["children"].append(new_dir)
                        current = new_dir
        return tree
    except Exception as e:
        print(f"Error in get_space_structure: {str(e)}")
        return {"error": f"API request error: {str(e)}"}
|
|
|
|
|
def format_tree_structure(tree_data: Dict, indent: str = "") -> str:
    if "error" in tree_data:
        return tree_data["error"]
    formatted = f"{indent}{'📁' if tree_data.get('type') == 'directory' else '📄'} {tree_data.get('name', 'Unknown')}\n"
    if tree_data.get("type") == "directory":
        # Directories first, then files, each group sorted by name.
        for child in sorted(tree_data.get("children", []), key=lambda x: (x.get("type", "") != "directory", x.get("name", ""))):
            formatted += format_tree_structure(child, indent + "  ")
    return formatted
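
# Rendered for the example tree above (directories sort before files):
#   📁 <space_id>
#     📁 assets
#       📄 style.css
#     📄 app.py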
|
|
|
|
|
def analyze_space(url: str, progress=gr.Progress()):
    """
    Fetch app.py and the file structure of a HuggingFace Space and return:
    1) a code summary
    2) a code analysis
    3) usage instructions
    along with the tree view and viewer sizing metadata.
    """
    try:
        space_id = url.split('spaces/')[-1]
        if not re.match(r'^[\w.-]+/[\w.-]+$', space_id):
            raise ValueError(f"Invalid Space ID format: {space_id}")

        progress(0.1, desc="Analyzing file structure...")
        tree_structure = get_space_structure(space_id)
        if "error" in tree_structure:
            raise ValueError(tree_structure["error"])
        tree_view = format_tree_structure(tree_structure)

        progress(0.3, desc="Fetching app.py...")
        app_content = get_file_content(space_id, "app.py")

        progress(0.5, desc="Summarizing code...")
        summary = summarize_code(app_content)

        progress(0.7, desc="Analyzing code...")
        analysis = analyze_code(app_content)

        progress(0.9, desc="Generating usage instructions...")
        usage = explain_usage(app_content)

        # Size the code viewer to the length of app.py.
        lines_for_app_py = adjust_lines_for_code(app_content)

        progress(1.0, desc="Done")
        return app_content, tree_view, tree_structure, space_id, summary, analysis, usage, lines_for_app_py

    except Exception as e:
        print(f"Error in analyze_space: {str(e)}")
        print(traceback.format_exc())
        return f"An error occurred: {str(e)}", "", None, "", "", "", "", 10
|
|
|
|
|
def adjust_lines_for_code(code_content: str, min_lines: int = 10, max_lines: int = 100) -> int:
    """
    Dynamically choose the number of display lines based on the code's
    length, clamped between min_lines and max_lines.
    """
    num_lines = len(code_content.split('\n'))
    return min(max(num_lines, min_lines), max_lines)
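
# Examples: a 3-line snippet is padded to the 10-line minimum, while a
# 250-line file is clamped to the 100-line maximum:
#   adjust_lines_for_code("a\nb\nc")   -> 10
#   adjust_lines_for_code("\n" * 249)  -> 100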
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def format_chat_history(messages: List[ChatMessage]) -> List[Dict]:
    """
    Convert a list of ChatMessage objects into the format the Gemini model
    expects (messages carrying "Thinking" metadata are skipped).
    """
    formatted = []
    for m in messages:
        # Skip thought messages (messages with metadata).
        if hasattr(m, "metadata") and m.metadata:
            continue
        # Gemini has no system role, so anything not from the assistant is
        # sent as a user message.
        role = "assistant" if m.role == "assistant" else "user"
        formatted.append({"role": role, "parts": [m.content or ""]})
    return formatted
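
# Example conversion:
#   [ChatMessage(role="user", content="hi"),
#    ChatMessage(role="assistant", content="hello")]
# becomes
#   [{"role": "user", "parts": ["hi"]},
#    {"role": "assistant", "parts": ["hello"]}]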
|
|
|
|
|
def gemini_chat_completion(system_message: str, user_message: str, max_tokens: int = 200, temperature: float = 0.7) -> str:
    """
    Send a system message and a user message to Gemini as a streaming
    request and return the final response text.
    """
    init_msgs = [
        ChatMessage(role="system", content=system_message),
        ChatMessage(role="user", content=user_message)
    ]
    chat_history = format_chat_history(init_msgs)
    chat = model.start_chat(history=chat_history)
    final = ""
    try:
        # Forward the sampling parameters, which were previously accepted
        # but never used.
        for chunk in chat.send_message(
            user_message,
            stream=True,
            generation_config={"max_output_tokens": max_tokens, "temperature": temperature},
        ):
            parts = chunk.candidates[0].content.parts
            if len(parts) == 2:
                # parts[0] is the model's "thinking"; parts[1] is the answer.
                final += parts[1].text
            else:
                final += parts[0].text
        return final.strip()
    except Exception as e:
        return f"Error during LLM call: {str(e)}"
|
|
|
|
|
def summarize_code(app_content: str):
    system_msg = "You are an AI assistant that analyzes and summarizes Python code. Summarize the given code concisely, in 3 lines or fewer."
    user_msg = f"Summarize the following Python code in 3 lines or fewer:\n\n{app_content}"
    try:
        return gemini_chat_completion(system_msg, user_msg, max_tokens=200, temperature=0.7)
    except Exception as e:
        return f"Error while generating summary: {str(e)}"
|
|
|
|
|
def analyze_code(app_content: str):
    system_msg = (
        "You are a deep thinking AI. You may use extremely long chains of thought to deeply consider the problem "
        "and deliberate with yourself via systematic reasoning processes to help come to a correct solution prior to answering. "
        "You should enclose your thoughts and internal monologue inside tags, and then provide your solution or response to the problem. "
        "You are an AI assistant that analyzes Python code. Analyze the given code from the perspective of the service's utility and applications, covering:\n"
        "A. Background and necessity\n"
        "B. Functional utility and value\n"
        "C. Distinguishing features\n"
        "D. Target audience\n"
        "E. Expected impact\n"
        "Compare it with existing and similar projects in your analysis. Output in Markdown format."
    )
    user_msg = f"Analyze the following Python code:\n\n{app_content}"
    try:
        return gemini_chat_completion(system_msg, user_msg, max_tokens=1000, temperature=0.7)
    except Exception as e:
        return f"Error while generating analysis: {str(e)}"
|
|
|
|
|
def explain_usage(app_content: str):
    system_msg = (
        "You are a deep thinking AI. You may use extremely long chains of thought to deeply consider the problem "
        "and deliberate with yourself via systematic reasoning processes to help come to a correct solution prior to answering. "
        "You should enclose your thoughts and internal monologue inside tags, and then provide your solution or response to the problem. "
        "You are an AI assistant that analyzes Python code and explains how to use it. Based on the given code, describe its usage in detail, as if walking through the screen. Output in Markdown format."
    )
    user_msg = f"Explain how to use the following Python code:\n\n{app_content}"
    try:
        return gemini_chat_completion(system_msg, user_msg, max_tokens=800, temperature=0.7)
    except Exception as e:
        return f"Error while generating usage instructions: {str(e)}"
|
|
|
|
|
def stream_gemini_response(user_message: str, conversation_state: List[ChatMessage]) -> Iterator[List[ChatMessage]]:
    """
    conversation_state: the conversation history (a Gradio State) consisting
    solely of ChatMessage objects.
    (Modified) An empty user_message is handled instead of raising an error:
    the textbox is cleared before this runs, so the text is recovered from
    the last user entry in conversation_state.
    """
    # user_submit_message has already appended the user's message to the
    # state and cleared the textbox, so fall back to the state if needed.
    if not user_message.strip() and conversation_state and conversation_state[-1].role == "user":
        user_message = conversation_state[-1].content

    print(f"\n=== New Request ===\nUser message: {user_message if user_message.strip() else '(Empty)'}")

    # Exclude the trailing user message from the history so it is not sent twice.
    history_source = conversation_state[:-1] if conversation_state and conversation_state[-1].role == "user" else conversation_state
    chat_history = format_chat_history(history_source)
    chat = model.start_chat(history=chat_history)

    response = chat.send_message(user_message, stream=True)
    thought_buffer = ""
    response_buffer = ""
    thinking_complete = False

    # Placeholder message that accumulates the model's streamed "thinking".
    conversation_state.append(
        ChatMessage(
            role="assistant",
            content="",
            metadata={"title": "⚙️ Thinking: *The thoughts produced by the model are experimental"}
        )
    )

    try:
        for chunk in response:
            parts = chunk.candidates[0].content.parts
            current_chunk = parts[0].text

            if len(parts) == 2 and not thinking_complete:
                # This chunk carries both the end of the thought (parts[0])
                # and the start of the answer (parts[1]).
                thought_buffer += current_chunk
                print(f"\n=== Complete Thought ===\n{thought_buffer}")
                conversation_state[-1] = ChatMessage(
                    role="assistant",
                    content=thought_buffer,
                    metadata={"title": "⚙️ Thinking: *The thoughts produced by the model are experimental"}
                )
                yield conversation_state

                response_buffer = parts[1].text
                print(f"\n=== Starting Response ===\n{response_buffer}")
                conversation_state.append(
                    ChatMessage(role="assistant", content=response_buffer)
                )
                thinking_complete = True

            elif thinking_complete:
                response_buffer += current_chunk
                print(f"\n=== Response Chunk ===\n{current_chunk}")
                conversation_state[-1] = ChatMessage(
                    role="assistant",
                    content=response_buffer
                )
            else:
                thought_buffer += current_chunk
                print(f"\n=== Thinking Chunk ===\n{current_chunk}")
                conversation_state[-1] = ChatMessage(
                    role="assistant",
                    content=thought_buffer,
                    metadata={"title": "⚙️ Thinking: *The thoughts produced by the model are experimental"}
                )
            yield conversation_state

        print(f"\n=== Final Response ===\n{response_buffer}")

    except Exception as e:
        print(f"\n=== Error ===\n{str(e)}")
        conversation_state.append(
            ChatMessage(
                role="assistant",
                content=f"I apologize, but I encountered an error: {str(e)}"
            )
        )
        yield conversation_state
|
|
|
|
|
def convert_to_display_tuples(messages: List[ChatMessage]) -> List[Tuple[str, str]]:
    """
    Convert the history into a list of (user, assistant) tuples for display.
    """
    result = []
    i = 0
    while i < len(messages):
        if messages[i].role == "user":
            user_text = messages[i].content
            assistant_text = ""
            if i + 1 < len(messages) and messages[i + 1].role == "assistant":
                assistant_text = messages[i + 1].content
                i += 2
            else:
                i += 1
            result.append((user_text, assistant_text))
        else:
            # Assistant message without a preceding user message
            # (e.g. a streamed "thinking" entry).
            result.append(("", messages[i].content))
            i += 1
    return result
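
# Example: [user "hi", assistant "hello", assistant "more"] becomes
#   [("hi", "hello"), ("", "more")]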
|
|
|
|
|
def user_submit_message(msg: str, conversation_state: List[ChatMessage]):
    """
    Called when the user submits a message: append it to the ChatMessage
    list (conversation_state), clear the textbox, and return both.
    """
    conversation_state.append(ChatMessage(role="user", content=msg))
    return "", conversation_state
|
|
|
|
|
def respond_wrapper(message: str, conversation_state: List[ChatMessage], max_tokens, temperature, top_p):
    """
    Stream the user's message to Gemini, update the conversation history,
    and yield (user, assistant) tuples for display. The max_tokens,
    temperature, and top_p sliders are accepted but not currently forwarded.
    """
    for updated_messages in stream_gemini_response(message, conversation_state):
        yield "", convert_to_display_tuples(updated_messages)
|
|
|
|
|
def create_ui():
    """
    Build the Gradio UI.
    """
    try:
        css = """
        footer {visibility: hidden;}
        """

        with gr.Blocks(css=css) as demo:
            gr.Markdown("# MOUSE: Space Research Thinking")

            with gr.Tabs():
                with gr.TabItem("Analysis"):
                    with gr.Row():
                        with gr.Column():
                            url_input = gr.Textbox(label="HuggingFace Space URL")
                            analyze_button = gr.Button("Analyze")

                            summary_output = gr.Markdown(label="Summary")
                            analysis_output = gr.Markdown(label="Analysis")
                            usage_output = gr.Markdown(label="Usage")
                            tree_view_output = gr.Textbox(label="File Structure", lines=20)

                        with gr.Column():
                            code_tabs = gr.Tabs()
                            with code_tabs:
                                with gr.TabItem("app.py"):
                                    app_py_content = gr.Code(
                                        language="python",
                                        label="app.py",
                                        lines=50
                                    )
                                with gr.TabItem("requirements.txt"):
                                    requirements_content = gr.Textbox(
                                        label="requirements.txt",
                                        lines=50
                                    )
|
|
|
with gr.TabItem("AI ์ฝ๋์ฑ"): |
|
gr.Markdown("## ์์ ๋ฅผ ์
๋ ฅ ๋๋ ์์ค ์ฝ๋๋ฅผ ๋ถ์ฌ๋ฃ๊ณ ์ง๋ฌธํ์ธ์") |
|
|
|
|
|
chatbot = gr.Chatbot( |
|
label="๋ํ", |
|
height=400 |
|
) |
|
|
|
msg = gr.Textbox( |
|
label="๋ฉ์์ง", |
|
placeholder="๋ฉ์์ง๋ฅผ ์
๋ ฅํ์ธ์..." |
|
) |
|
|
|
|
|
max_tokens = gr.Slider( |
|
minimum=1, maximum=8000, |
|
value=4000, label="Max Tokens", |
|
visible=False |
|
) |
|
temperature = gr.Slider( |
|
minimum=0, maximum=1, |
|
value=0.7, label="Temperature", |
|
visible=False |
|
) |
|
top_p = gr.Slider( |
|
minimum=0, maximum=1, |
|
value=0.9, label="Top P", |
|
visible=False |
|
) |
|
|
|
examples = [ |
|
["์์ธํ ์ฌ์ฉ ๋ฐฉ๋ฒ์ 4000 ํ ํฐ ์ด์ ์์ธํ ์ค๋ช
"], |
|
["FAQ 20๊ฑด์ 4000 ํ ํฐ ์ด์ ์์ฑ"], |
|
["๊ธฐ์ ์ฐจ๋ณ์ , ๊ฐ์ ์ ์ค์ฌ์ผ๋ก 4000 ํ ํฐ ์ด์ ์ค๋ช
"], |
|
["ํนํ ์ถ์์ ํ์ฉ ๊ฐ๋ฅํ ํ์ ์์ด๋์ด๋ฅผ 4000 ํ ํฐ ์ด์ ์์ฑ"], |
|
["๋
ผ๋ฌธ ํ์์ผ๋ก 4000 ํ ํฐ ์ด์ ์์ฑ"], |
|
["๊ณ์ ์ด์ด์ ๋ต๋ณํ๋ผ"] |
|
] |
|
gr.Examples(examples, inputs=msg) |
|
|
|
|
|
conversation_state = gr.State([]) |
|
|
|
|
|
|
|
|
|
msg.submit( |
|
user_submit_message, |
|
inputs=[msg, conversation_state], |
|
outputs=[msg, conversation_state], |
|
queue=False |
|
).then( |
|
respond_wrapper, |
|
inputs=[msg, conversation_state, max_tokens, temperature, top_p], |
|
outputs=[msg, chatbot], |
|
) |
|
|
|
with gr.TabItem("Recommended Best"): |
|
gr.Markdown( |
|
"Discover recommended HuggingFace Spaces [here](https://huggingface.co/spaces/openfree/Korean-Leaderboard)." |
|
) |
|
|
|
|
|
            space_id_state = gr.State()
            tree_structure_state = gr.State()
            app_py_content_lines = gr.State()

            # After analysis: fetch requirements.txt for the second code tab,
            # then resize the app.py viewer to the file's length.
            analyze_button.click(
                analyze_space,
                inputs=[url_input],
                outputs=[
                    app_py_content,
                    tree_view_output,
                    tree_structure_state,
                    space_id_state,
                    summary_output,
                    analysis_output,
                    usage_output,
                    app_py_content_lines
                ]
            ).then(
                lambda space_id: get_file_content(space_id, "requirements.txt"),
                inputs=[space_id_state],
                outputs=[requirements_content]
            ).then(
                lambda lines: gr.update(lines=lines),
                inputs=[app_py_content_lines],
                outputs=[app_py_content]
            )
|
|
|
        return demo

    except Exception as e:
        print(f"Error in create_ui: {str(e)}")
        print(traceback.format_exc())
        raise
|
|
|
|
|
if __name__ == "__main__":
    try:
        print("Starting HuggingFace Space Analyzer...")
        demo = create_ui()
        print("UI created successfully.")
        print("Configuring Gradio queue...")
        demo.queue()
        print("Gradio queue configured.")
        print("Launching Gradio app...")
        demo.launch(
            server_name="0.0.0.0",
            server_port=7860,
            share=False,
            debug=True,
            show_api=False
        )
        print("Gradio app launched successfully.")
    except Exception as e:
        print(f"Error in main: {str(e)}")
        print("Detailed error information:")
        print(traceback.format_exc())
        raise
|
|