Spaces:
Runtime error
Runtime error
import asyncio | |
import shutil | |
from pathlib import Path | |
import gradio as gr | |
from gradio.oauth import attach_oauth, OAuthToken | |
from huggingface_hub import HfApi | |
from src.team import TeamChatSession | |
from src.db import list_sessions_info | |
from src.config import UPLOAD_DIR | |
# Active chat sessions, keyed by ``(user, session)``. Entries are created
# lazily by ``_get_chat`` and are never removed in this module.
_SESSIONS: dict[tuple[str, str], TeamChatSession] = {}
# Shared Hugging Face Hub client; ``_username`` uses it to call ``whoami``.
_API = HfApi()
def _username(token: OAuthToken) -> str:
    """Return the username for the given token.

    The Hub ``whoami`` response is queried with ``token.token``. The ``name``
    field is preferred, then ``user``; if both are missing or empty the
    literal ``"unknown"`` is returned so callers always receive a non-empty
    string (the result is used as a directory name and session key).
    """
    info = _API.whoami(token.token)
    # Chain with ``or`` so a present-but-empty "user" also falls through to
    # the default instead of leaking ``None``/"" to the caller.
    return info.get("name") or info.get("user") or "unknown"
async def _get_chat(user: str, session: str) -> TeamChatSession:
    """Fetch the cached :class:`TeamChatSession` for ``(user, session)``.

    When nothing is cached for the pair, a new session is constructed,
    entered via ``__aenter__`` and stored in ``_SESSIONS`` before being
    returned.
    """
    cache_key = (user, session)
    existing = _SESSIONS.get(cache_key)
    if existing is not None:
        return existing

    # Cache miss: build the session and enter it before publishing it.
    created = TeamChatSession(user=user, session=session)
    await created.__aenter__()
    _SESSIONS[cache_key] = created
    return created
def _vm_host_path(user: str, vm_path: str) -> Path:
    """Translate a ``/data``-rooted path into ``user``'s upload directory.

    :raises ValueError: if ``vm_path`` is not under ``/data`` or if the
        resolved location falls outside ``UPLOAD_DIR/<user>``.
    """
    # ``relative_to`` itself raises ValueError for paths outside /data.
    relative = Path(vm_path).relative_to("/data")
    user_root = Path(UPLOAD_DIR).joinpath(user).resolve()
    resolved = user_root.joinpath(relative).resolve()
    # Resolving collapses ".." segments, so this rejects escapes from the root.
    if resolved.is_relative_to(user_root):
        return resolved
    raise ValueError("Invalid path")
async def send_message(message: str, history: list[dict[str, str]], session: str, token: OAuthToken):
    """Stream the reply to ``message`` into the chat history.

    The chatbot component wired to this handler is created with
    ``type="messages"``, so the history is a list of
    ``{"role": ..., "content": ...}`` dicts rather than ``(user, bot)``
    tuples. The user turn and an empty assistant turn are appended, then the
    assistant turn grows as parts arrive from ``chat.chat_stream``.

    :param message: text typed by the user.
    :param history: current chatbot value; ``None`` or empty starts fresh.
    :param session: name of the chat session to use.
    :param token: OAuth token of the logged-in user.
    :returns: an async generator yielding the updated history.
    """
    user = _username(token)
    chat = await _get_chat(user, session)
    # Copy so the caller's list is not mutated while we stream.
    history = list(history or [])
    history.append({"role": "user", "content": message})
    reply = {"role": "assistant", "content": ""}
    history.append(reply)
    # Show the user's turn right away, even if the stream yields nothing.
    yield history
    async for part in chat.chat_stream(message):
        reply["content"] += part
        yield history
def load_sessions(token: OAuthToken):
    """Build the session dropdown update and the session table rows.

    Returns a ``(dropdown_update, rows)`` pair where each row is
    ``[name, last_message]``. When no sessions are listed, the dropdown
    falls back to the single choice ``"default"``.
    """
    user = _username(token)
    names = []
    rows = []
    for entry in list_sessions_info(user):
        names.append(entry["name"])
        rows.append([entry["name"], entry["last_message"]])

    if names:
        dropdown = gr.update(choices=names, value=names[0])
    else:
        dropdown = gr.update(choices=["default"], value="default")
    return dropdown, rows
def list_dir(path: str, token: OAuthToken):
    """List the entries of a directory in the user's upload area.

    The result feeds a two-column dataframe (``name``, ``is_dir``), so each
    entry is returned as a ``[name, is_dir]`` row, sorted by path. A missing
    path or a path that is not a directory yields an empty list.

    :raises ValueError: propagated from ``_vm_host_path`` for invalid paths.
    """
    user = _username(token)
    target = _vm_host_path(user, path)
    # ``is_dir`` is already False for non-existent paths.
    if not target.is_dir():
        return []
    return [[entry.name, entry.is_dir()] for entry in sorted(target.iterdir())]
def read_file(path: str, token: OAuthToken):
    """Return the text content of a file in the user's upload area.

    Problems are reported as short human-readable strings instead of
    exceptions: ``"File not found"``, ``"Path is a directory"`` and
    ``"Binary file not supported"`` (when the bytes are not valid UTF-8).

    :raises ValueError: propagated from ``_vm_host_path`` for invalid paths.
    """
    user = _username(token)
    target = _vm_host_path(user, path)
    if not target.exists():
        return "File not found"
    if target.is_dir():
        return "Path is a directory"
    try:
        # Pin the encoding so decoding does not depend on the host locale.
        return target.read_text(encoding="utf-8")
    except UnicodeDecodeError:
        return "Binary file not supported"
def save_file(path: str, content: str, token: OAuthToken):
    """Write ``content`` to a file in the user's upload area.

    Missing parent directories are created. A ``None`` content (an empty
    editor) is written as an empty file. Returns ``"Saved"`` on success.

    :raises ValueError: propagated from ``_vm_host_path`` for invalid paths.
    """
    user = _username(token)
    target = _vm_host_path(user, path)
    target.parent.mkdir(parents=True, exist_ok=True)
    text = "" if content is None else content
    # Pin the encoding so the bytes on disk do not depend on the host locale.
    target.write_text(text, encoding="utf-8")
    return "Saved"
def delete_path(path: str, token: OAuthToken):
    """Remove a file or a whole directory tree from the user's upload area.

    Returns ``"Deleted"`` after a successful removal and ``"File not found"``
    when nothing exists at the resolved location.
    """
    owner = _username(token)
    victim = _vm_host_path(owner, path)

    if victim.is_dir():
        # Directories are removed recursively.
        shutil.rmtree(victim)
        return "Deleted"
    if victim.exists():
        victim.unlink()
        return "Deleted"
    return "File not found"
# UI definition: a "Chat" tab (session picker + streaming chatbot) and a
# "Files" tab (directory listing and a simple text editor).
with gr.Blocks(theme=gr.themes.Soft()) as demo:
    # NOTE: no explicit ``attach_oauth(demo.app)`` here. ``demo.app`` is not
    # available while the Blocks are still being built, and Gradio wires up
    # the OAuth routes itself when a ``LoginButton`` is present.
    login_btn = gr.LoginButton()

    with gr.Tab("Chat"):
        session_dd = gr.Dropdown(["default"], label="Session", value="default")
        refresh = gr.Button("Refresh Sessions")
        # Dedicated table for session info so it does not overwrite the
        # file listing, whose columns are different.
        sessions_table = gr.Dataframe(
            headers=["name", "last_message"],
            datatype=["str", "str"],
            label="Sessions",
        )
        chatbox = gr.Chatbot(type="messages")
        msg = gr.Textbox(label="Message")
        send = gr.Button("Send")

    with gr.Tab("Files"):
        dir_path = gr.Textbox(label="Directory", value="/data")
        list_btn = gr.Button("List")
        table = gr.Dataframe(headers=["name", "is_dir"], datatype=["str", "bool"])
        file_path = gr.Textbox(label="File Path")
        load_btn = gr.Button("Load")
        content = gr.Code(label="Content", language=None)
        save_btn = gr.Button("Save")
        del_btn = gr.Button("Delete")
        # Save/Delete report here instead of replacing the editor content.
        status = gr.Textbox(label="Status", interactive=False)

    # The OAuth token argument of each handler is not listed in ``inputs``;
    # it is supplied through the handler's ``OAuthToken`` annotation.
    refresh.click(load_sessions, outputs=[session_dd, sessions_table])
    send_click = send.click(
        send_message,
        inputs=[msg, chatbox, session_dd],
        outputs=chatbox,
    )
    list_btn.click(list_dir, inputs=dir_path, outputs=table)
    load_btn.click(read_file, inputs=file_path, outputs=content)
    save_btn.click(save_file, inputs=[file_path, content], outputs=status)
    del_btn.click(delete_path, inputs=file_path, outputs=status)
    # Clear the message box once the reply has finished streaming.
    send_click.then(lambda: "", None, msg)

demo.queue()

if __name__ == "__main__":
    demo.launch()