BookSumBeta / app.py
npc0's picture
Update app.py
00261f9 verified
raw
history blame
20.3 kB
import os
import json
import hashlib
from cryptography.fernet import Fernet
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
import base64
import openai
import gradio as gr
from epub2txt import epub2txt
class GUI:
def __init__(self, *args, **kwargs):
# Configuration - restored your original settings
self.model_name = os.getenv("POE_MODEL", "GPT-5-mini")
self.prompt = os.getenv("prompt", "Summarize the following text:")
self.client = None
self.api_key = os.getenv("POE_API_KEY")
self.current_user = None
self.keys_file = "user_keys.json"
with gr.Blocks(title="ePub Summarizer", css=self._get_css()) as demo:
with gr.Row():
welcome_md = gr.Markdown()
# Updated for Gradio 5 - removed LogoutButton (LoginButton transforms automatically)
login_btn = gr.LoginButton()
refresh_btn = gr.Button("🔄 Refresh", size="sm", variant="secondary")
# API Key input section (shown after login)
api_key_section = gr.Column(visible=False)
with api_key_section:
gr.Markdown("""
### Poe API Key Setup
To use this tool, you need a Poe API key:
1. Visit [https://poe.com/api_key](https://poe.com/api_key)
2. If you don't have an account, create one first
3. Generate a new API key or copy your existing one
4. Paste it in the field below
**Security**: Your API key will be encrypted and tied to your login account.
""")
api_key_input = gr.Textbox(
label="Poe API Key",
placeholder="Enter your Poe API key here...",
type="password"
)
with gr.Row():
remember_key = gr.Checkbox(
label="Remember my API key (encrypted storage)",
value=True,
info="Your key will be encrypted and saved for future sessions"
)
api_key_btn = gr.Button("Set API Key", variant="primary")
clear_key_btn = gr.Button("Clear Saved Key", variant="secondary", visible=False)
out = gr.Markdown()
inp = gr.File(file_types=['.epub'], visible=False, label="Upload ePub File")
# Event handlers - Fixed for Gradio 5
demo.load(
fn=self.on_load,
outputs=[welcome_md, api_key_section, inp, clear_key_btn]
)
# Direct login button event handling
login_btn.click(
fn=self.handle_login_state_change,
outputs=[welcome_md, api_key_section, inp, clear_key_btn]
)
# Manual refresh button for debugging
refresh_btn.click(
fn=self.handle_login_state_change,
outputs=[welcome_md, api_key_section, inp, clear_key_btn]
)
api_key_btn.click(
self.set_api_key,
inputs=[api_key_input, remember_key],
outputs=[out, inp, api_key_section, clear_key_btn]
)
clear_key_btn.click(
self.clear_saved_key,
outputs=[out, api_key_section, clear_key_btn, api_key_input]
)
# Your original ePub processing
inp.change(self.process, inputs=[inp], outputs=[out])
# Updated launch for Gradio 5
demo.queue().launch(
share=True,
ssr_mode=False, # Disable SSR to avoid i18n issues
show_error=True
)
def _get_css(self):
"""Custom CSS for better styling"""
return """
.gradio-container {
max-width: 1200px !important;
}
.epub-processor {
border: 2px dashed #ccc;
border-radius: 10px;
padding: 20px;
text-align: center;
}
.progress-display {
background: #f8f9fa;
padding: 15px;
border-radius: 8px;
margin: 10px 0;
}
"""
def _get_user_id(self, request):
"""Generate a unique user ID from request - Updated for Gradio 5"""
if not hasattr(request, 'username') or not request.username:
return None
# Use username as identifier
identifier = request.username
return hashlib.sha256(identifier.encode()).hexdigest()
def _generate_key_from_user(self, user_id):
"""Generate encryption key from user ID"""
# Use user ID as salt for key derivation
salt = user_id.encode()[:32].ljust(32, b'0') # Ensure 32 bytes
kdf = PBKDF2HMAC(
algorithm=hashes.SHA256(),
length=32,
salt=salt,
iterations=100000,
)
key = base64.urlsafe_b64encode(kdf.derive(user_id.encode()))
return Fernet(key)
def _load_user_keys(self):
"""Load encrypted user keys from file"""
try:
if os.path.exists(self.keys_file):
with open(self.keys_file, 'r') as f:
return json.load(f)
except Exception as e:
print(f"Error loading user keys: {e}")
return {}
def _save_user_keys(self, keys_data):
"""Save encrypted user keys to file"""
try:
with open(self.keys_file, 'w') as f:
json.dump(keys_data, f)
except Exception as e:
print(f"Error saving user keys: {e}")
def _get_saved_key(self, user_id):
"""Retrieve and decrypt user's API key"""
try:
keys_data = self._load_user_keys()
if user_id in keys_data:
cipher_suite = self._generate_key_from_user(user_id)
encrypted_key = base64.urlsafe_b64decode(keys_data[user_id].encode())
return cipher_suite.decrypt(encrypted_key).decode()
except Exception as e:
print(f"Error retrieving saved key: {e}")
return None
def _save_encrypted_key(self, user_id, api_key):
"""Encrypt and save user's API key"""
try:
keys_data = self._load_user_keys()
cipher_suite = self._generate_key_from_user(user_id)
encrypted_key = cipher_suite.encrypt(api_key.encode())
keys_data[user_id] = base64.urlsafe_b64encode(encrypted_key).decode()
self._save_user_keys(keys_data)
return True
except Exception as e:
print(f"Error saving encrypted key: {e}")
return False
def _delete_saved_key(self, user_id):
"""Delete user's saved API key"""
try:
keys_data = self._load_user_keys()
if user_id in keys_data:
del keys_data[user_id]
self._save_user_keys(keys_data)
return True
except Exception as e:
print(f"Error deleting saved key: {e}")
return False
def on_load(self, request: gr.Request = None):
"""Handle initial page load - Updated for Gradio 5"""
print(f"Page load - Username: {getattr(request, 'username', None)}") # Debug
return self.handle_login_state_change(request)
def handle_login_state_change(self, request: gr.Request = None):
"""Handle login state changes - works for both load and login button click"""
try:
print(f"Login state check - Username: {getattr(request, 'username', None)}") # Debug
print(f"Request attrs: {[attr for attr in dir(request) if not attr.startswith('_')]}") # Debug
if hasattr(request, 'username') and request.username:
return self.handle_user_logged_in(request)
else:
return self.handle_user_not_logged_in()
except Exception as e:
print(f"Error in login state change: {e}")
return self.handle_user_not_logged_in()
def check_login_state(self, request: gr.Request = None):
"""Check login state - with debug info"""
return self.handle_login_state_change(request)
def handle_user_not_logged_in(self):
"""Handle when user is not logged in"""
return (
gr.update(value='# ePub Summarization Tool\n\nPlease login with Hugging Face to access the tool.\n\n*If you just logged in, click the 🔄 Refresh button above.*'),
gr.update(visible=False), # api_key_section
gr.update(visible=False), # inp
gr.update(visible=False) # clear_key_btn
)
def handle_user_logged_in(self, request):
"""Handle when user is logged in"""
if not self.current_user or self.current_user != self._get_user_id(request):
# New login or different user
self.current_user = self._get_user_id(request)
user_name = request.username
# Check if user has a saved API key
saved_key = self._get_saved_key(self.current_user)
if saved_key:
self.api_key = saved_key
if self._initialize_client():
return (
gr.update(value=f'# ePub Summarization Tool\n\nWelcome back {user_name}! ✅ Your saved API key is loaded and ready.'),
gr.update(visible=False), # api_key_section
gr.update(visible=True), # inp
gr.update(visible=True) # clear_key_btn
)
# No saved key or failed to initialize
return (
gr.update(value=f'# ePub Summarization Tool\n\nWelcome {user_name}! Please set up your Poe API key below.'),
gr.update(visible=True), # api_key_section
gr.update(visible=False), # inp
gr.update(visible=False) # clear_key_btn
)
# User already processed, return current state
if self.client:
return (
gr.update(value=f'# ePub Summarization Tool\n\nWelcome {request.username}! ✅ Ready to process ePub files.'),
gr.update(visible=False), # api_key_section
gr.update(visible=True), # inp
gr.update(visible=True) # clear_key_btn
)
else:
return (
gr.update(value=f'# ePub Summarization Tool\n\nWelcome {request.username}! Please set up your Poe API key below.'),
gr.update(visible=True), # api_key_section
gr.update(visible=False), # inp
gr.update(visible=False) # clear_key_btn
)
def _initialize_client(self):
"""Initialize the Poe API client"""
try:
self.client = openai.OpenAI(
api_key=self.api_key,
base_url="https://api.poe.com/v1",
)
return True
except Exception as e:
print(f"Error initializing Poe client: {e}")
return False
def set_api_key(self, api_key, remember_key):
"""Set and validate the API key"""
if not self.current_user:
return (
gr.update(value="❌ Please login first."),
gr.update(visible=False),
gr.update(visible=True),
gr.update(visible=False)
)
if not api_key or not api_key.strip():
return (
gr.update(value="⚠️ Please enter a valid API key."),
gr.update(visible=False),
gr.update(visible=True),
gr.update(visible=False)
)
self.api_key = api_key.strip()
if self._initialize_client():
# Test the API key with a simple request
try:
test_chat = self.client.chat.completions.create(
model=self.model_name,
messages=[{"role": "user", "content": "Hello"}],
max_tokens=10
)
# Save key if user wants to remember it
if remember_key:
if self._save_encrypted_key(self.current_user, self.api_key):
success_msg = "✅ API key validated and saved successfully! You can now upload an ePub file."
else:
success_msg = "✅ API key validated successfully! (Note: Failed to save for future use)"
else:
success_msg = "✅ API key validated successfully! You can now upload an ePub file."
return (
gr.update(value=success_msg),
gr.update(visible=True), # inp
gr.update(visible=False), # api_key_section
gr.update(visible=remember_key) # clear_key_btn
)
except Exception as e:
error_msg = str(e)
if "401" in error_msg or "unauthorized" in error_msg.lower():
return (
gr.update(value="❌ Invalid API key. Please check your key and try again."),
gr.update(visible=False),
gr.update(visible=True),
gr.update(visible=False)
)
elif "quota" in error_msg.lower() or "limit" in error_msg.lower():
return (
gr.update(value="⚠️ API key valid but quota exceeded. Please check your Poe account."),
gr.update(visible=False),
gr.update(visible=True),
gr.update(visible=False)
)
else:
return (
gr.update(value=f"❌ API connection error: {error_msg}"),
gr.update(visible=False),
gr.update(visible=True),
gr.update(visible=False)
)
else:
return (
gr.update(value="❌ Failed to initialize API client."),
gr.update(visible=False),
gr.update(visible=True),
gr.update(visible=False)
)
def clear_saved_key(self):
"""Clear the user's saved API key"""
if not self.current_user:
return (
gr.update(value="❌ No user logged in."),
gr.update(visible=False),
gr.update(visible=False),
gr.update(value="")
)
if self._delete_saved_key(self.current_user):
self.api_key = None
self.client = None
return (
gr.update(value="✅ Saved API key cleared. Please enter your API key below."),
gr.update(visible=True), # api_key_section
gr.update(visible=False), # clear_key_btn
gr.update(value="") # clear input
)
else:
return (
gr.update(value="⚠️ Failed to clear saved key."),
gr.update(visible=True),
gr.update(visible=True),
gr.update(value="")
)
def get_model_response(self, text: str) -> str:
"""Get response from Poe API for the given text - Your original function"""
if not self.client:
return "Error: API client not initialized"
try:
chat = self.client.chat.completions.create(
model=self.model_name,
messages=[{"role": "user", "content": text}],
)
return chat.choices[0].message.content
except Exception as e:
print(f"Error calling Poe API: {e}")
return f"Error processing text: {str(e)}"
def process(self, file, request: gr.Request = None):
"""Your original ePub processing workflow - restored completely"""
if not hasattr(request, 'username') or not request.username:
return gr.update(value='⚠️ Please login to access the tool.')
if not self.client:
return gr.update(value='⚠️ Please set your Poe API key first.')
if file is None:
return gr.update(value='Please upload an ePub file.')
try:
# Extract content from ePub - Your original workflow
ch_list = epub2txt(file.name, outputlist=True)
chapter_titles = epub2txt.content_titles
title = epub2txt.title
yield gr.update(value=f"# {title}\n\nProcessing ePub file...")
sm_list = []
# Process each chapter (skip first 2 as they're usually metadata) - Your original logic
for idx, text in enumerate(ch_list[2:], 1):
if not text.strip():
continue
yield gr.update(value=f"# {title}\n\nProcessing chapter {idx}...")
docs = []
# Split chapter into chunks for processing - Your original chunking strategy
chunk_size = 2000
for i in range(0, len(text), chunk_size):
chunk = text[i:i+2048] # Slight overlap for context
if len(chunk.strip()) > 0:
response = self.get_model_response(self.prompt + "\n\n" + chunk)
docs.append(response)
# Update UI with current progress
current_summaries = "\n\n".join([
f"## {ct}\n\n{sm}"
for ct, sm in zip(chapter_titles[2:idx+1], sm_list + [f"Processing chunk {len(docs)}..."])
])
yield gr.update(value=f"# {title}\n\n{current_summaries}")
# Combine chunk summaries into chapter summary - Your original combining logic
if docs:
if len(docs) == 1:
hist = docs[0]
else:
hist = docs[0]
for doc in docs[1:]:
combined_text = f"{self.prompt}\n\nCombine these summaries:\n\n{hist}\n\n{doc}"
hist = self.get_model_response(combined_text)
# Update UI with draft summary
current_summaries = "\n\n".join([
f"## {ct}\n\n{sm}"
for ct, sm in zip(chapter_titles[2:idx+1], sm_list + [f"Draft: {hist}"])
])
yield gr.update(value=f"# {title}\n\n{current_summaries}")
sm_list.append(hist)
# Update final output for this chapter
final_summaries = "\n\n".join([
f"## {ct}\n\n{sm}"
for ct, sm in zip(chapter_titles[2:idx+1], sm_list)
])
yield gr.update(value=f"# {title}\n\n{final_summaries}")
# Final complete summary - Your original final output
if sm_list:
complete_summary = f"# {title}\n\n" + "\n\n".join([
f"## {ct}\n\n{sm}"
for ct, sm in zip(chapter_titles[2:len(sm_list)+2], sm_list)
])
yield gr.update(value=complete_summary)
else:
yield gr.update(value=f"# {title}\n\nNo content found to summarize.")
except Exception as e:
yield gr.update(value=f"Error processing file: {str(e)}")
# Run the application
if __name__ == "__main__":
GUI()