Spaces:
Sleeping
Sleeping
import streamlit as st | |
import requests | |
import os | |
import uuid | |
# === CONFIG === | |
API_URL = os.getenv("API_URL", "http://localhost:8000") | |
DEFAULT_HEADERS = {} | |
st.set_page_config(page_title="π§ LLaMA Local AI Agent", layout="wide") | |
# === SESSION STATE INIT === | |
if "token" not in st.session_state: | |
st.session_state.token = "" | |
if "session_id" not in st.session_state: | |
st.session_state.session_id = str(uuid.uuid4()) | |
# === AUTH HELPERS === | |
def auth_headers(): | |
headers = DEFAULT_HEADERS.copy() | |
if st.session_state.token: | |
headers["Authorization"] = f"Bearer {st.session_state.token}" | |
return headers | |
def post(endpoint, data=None, files=None, json=None): | |
try: | |
res = requests.post(f"{API_URL}{endpoint}", data=data, files=files, json=json, headers=auth_headers()) | |
res.raise_for_status() | |
return res.json() | |
except Exception as e: | |
st.error(f"β API POST Error: {e}") | |
return None | |
def get(endpoint, params=None): | |
try: | |
res = requests.get(f"{API_URL}{endpoint}", params=params, headers=auth_headers()) | |
res.raise_for_status() | |
return res.json() | |
except Exception as e: | |
st.error(f"β API GET Error: {e}") | |
return None | |
# === SIDEBAR AUTH === | |
st.sidebar.title("π Authentication") | |
# === LOGIN === | |
st.sidebar.subheader("Login") | |
username = st.sidebar.text_input("Username", key="login_username") | |
password = st.sidebar.text_input("Password", type="password", key="login_password") | |
if st.sidebar.button("Login"): | |
login_response = post("/login", data={"username": username, "password": password}) | |
if login_response and "access_token" in login_response: | |
st.session_state.token = login_response["access_token"] | |
st.success("β Logged in!") | |
st.rerun() | |
else: | |
st.error("β Login failed. Check credentials.") | |
# === REGISTER === | |
st.sidebar.markdown("---") | |
st.sidebar.subheader("π New here? Register") | |
new_username = st.sidebar.text_input("New Username", key="reg_username") | |
new_password = st.sidebar.text_input("New Password", type="password", key="reg_password") | |
confirm_password = st.sidebar.text_input("Confirm Password", type="password", key="reg_confirm") | |
role = st.sidebar.selectbox("Role", ["user", "admin"], key="reg_role") | |
if st.sidebar.button("Register"): | |
if new_password != confirm_password: | |
st.sidebar.error("β Passwords do not match.") | |
elif not new_username or not new_password: | |
st.sidebar.warning("Please fill all fields.") | |
else: | |
reg_payload = { | |
"username": new_username, | |
"password": new_password, | |
"role": role | |
} | |
reg_result = post("/register", json=reg_payload) | |
if reg_result and "access_token" in reg_result: | |
st.session_state.token = reg_result["access_token"] | |
st.sidebar.success("β Registered & Logged in!") | |
st.rerun() | |
else: | |
st.sidebar.error("β Registration failed (User may exist).") | |
# === IF LOGGED IN === | |
if st.session_state.token: | |
tabs = st.tabs(["π¬ Chat", "π File Q&A", "π§ Tools", "πΌοΈ Image AI", "π€ Voice", "π€ Agent", "π§ Email"]) | |
# === π¬ Chat Tab === | |
with tabs[0]: | |
st.header("π¬ Chat with AI") | |
user_input = st.text_input("Ask something...") | |
if st.button("Send", key="chat"): | |
if user_input: | |
result = post("/chat", json={ | |
"session_id": st.session_state.session_id, | |
"user_message": user_input | |
}) | |
if result: | |
st.markdown(f"**AI:** {result.get('bot_response')}") | |
# === π File Upload and QA Tab === | |
with tabs[1]: | |
st.header("π Upload & Ask") | |
file = st.file_uploader("Upload PDF, TXT or image", type=["pdf", "txt", "jpg", "jpeg", "png"]) | |
if file: | |
st.success(f"β Uploaded: {file.name}") | |
if file.type.startswith("image/"): | |
st.image(file) | |
with st.spinner("Processing..."): | |
caption = post("/image-caption", files={"file": file}) | |
ocr = post("/ocr", files={"file": file}) | |
if caption: | |
st.info(f"πΌοΈ Caption: {caption.get('caption')}") | |
if ocr: | |
st.info(f"π OCR: {ocr.get('text')}") | |
else: | |
upload_res = post("/upload", files={"file": file}) | |
if upload_res: | |
question = st.text_input("Ask a question about the file") | |
if st.button("Ask"): | |
answer = get("/query_file", params={"filename": file.name, "question": question}) | |
if answer: | |
st.success(f"π‘ Answer: {answer.get('answer')}") | |
# === π§ Tools Tab === | |
with tabs[2]: | |
st.header("π οΈ AI Tools") | |
tool_input = st.text_input("Enter input") | |
tool = st.selectbox("Select Tool", ["calc", "time", "joke", "search", "browse", "exec_python"]) | |
if st.button("Run Tool"): | |
result = post(f"/tool/{tool}", json={"prompt": tool_input}) | |
if result: | |
st.markdown(f"π§ Result: {result.get('result')}") | |
# === πΌοΈ Image AI Tab === | |
with tabs[3]: | |
st.header("πΌοΈ Caption Any Image") | |
img_file = st.file_uploader("Upload an image", type=["jpg", "jpeg", "png"]) | |
if img_file: | |
st.image(img_file, width=400) | |
if st.button("Generate Caption"): | |
caption = post("/caption_image", files={"file": img_file}) | |
if caption: | |
st.success(f"π Caption: {caption.get('caption')}") | |
# === π€ Voice Tab === | |
with tabs[4]: | |
st.header("π€ Voice Chat with AI") | |
voice_file = st.file_uploader("Upload MP3 or WAV", type=["mp3", "wav"]) | |
if voice_file: | |
st.audio(voice_file) | |
transcribed = post("/transcribe", files={"file": voice_file}) | |
if transcribed: | |
st.success(f"π Transcription: {transcribed.get('transcription')}") | |
if st.button("Respond"): | |
response = post("/chat", json={"message": transcribed["transcription"]}) | |
if response: | |
reply = response.get("response") | |
st.success(f"π£οΈ AI says: {reply}") | |
tts = requests.post(f"{API_URL}/speak", params={"text": reply}, headers=auth_headers()) | |
if tts.status_code == 200: | |
st.audio(tts.content) | |
else: | |
st.error("β TTS failed.") | |
# === π€ Agent Tab === | |
with tabs[5]: | |
st.header("π€ AI Agent") | |
task = st.text_input("Enter an agent task") | |
if st.button("Run Agent"): | |
result = post("/agent", json={"prompt": task}) | |
if result: | |
st.markdown(f"π οΈ Result: {result.get('result')}") | |
st.divider() | |
if st.button("π Export Chat History"): | |
history = get("/history/export") | |
if history: | |
st.text_area("History", history.get("text", ""), height=300) | |
# === π§ Email Generator Tab === | |
with tabs[6]: | |
st.header("π§ Promo Email Builder") | |
product = st.text_input("Product") | |
recipient = st.text_input("Recipient Email") | |
discount = st.slider("Discount (%)", 5, 80, 15) | |
if st.button("Generate Email"): | |
email = get("/generate_email", params={"to": recipient, "product": product, "discount": discount}) | |
if email: | |
st.code(email.get("email")) | |
else: | |
st.warning("π Please login or register using the sidebar to access the AI features.") | |