# Source: Hugging Face Space (status when captured: Sleeping)
import base64
import hashlib
import json
import logging
import os

import gradio as gr
import openai
from cryptography.fernet import Fernet
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
from epub2txt import epub2txt
class GUI:
    """Gradio app that summarizes an uploaded ePub chapter by chapter via the Poe API.

    Instantiating the class builds the Blocks UI, wires the event handlers and
    calls ``launch()``.

    Handlers may run for several logged-in users at once, so all per-user state
    (the validated API client) is stored in ``self._clients`` keyed by user id
    and looked up from the ``gr.Request`` that Gradio injects into each handler.
    """

    # Each chunk starts CHUNK_STRIDE characters after the previous one but is
    # CHUNK_LENGTH characters long, so neighbouring chunks overlap slightly.
    CHUNK_STRIDE = 2000
    CHUNK_LENGTH = 2048

    # Leading entries of the extracted section list that are not summarized
    # (the original author notes they are usually metadata).
    SKIPPED_LEADING_SECTIONS = 2

    # Optional server-side secret mixed into the per-user encryption key.
    KEY_SECRET_ENV = "KEY_ENCRYPTION_SECRET"

    _API_KEY_HELP = (
        "### Poe API Key Setup\n\n"
        "To use this tool, you need a Poe API key:\n\n"
        "1. Visit [https://poe.com/api_key](https://poe.com/api_key)\n"
        "2. If you don't have an account, create one first\n"
        "3. Generate a new API key or copy your existing one\n"
        "4. Paste it in the field below\n\n"
        "**Security**: Your API key will be encrypted and tied to your login account.\n"
    )

    _log = logging.getLogger(__name__)

    def __init__(self, *args, **kwargs):
        """Build the UI, register the event handlers and launch the app.

        Positional and keyword arguments are accepted and ignored.
        """
        self.model_name = os.getenv("POE_MODEL", "GPT-5-mini")
        self.prompt = os.getenv("prompt", "Summarize the following text:")
        self.keys_file = "user_keys.json"

        # Legacy single-user attributes. They are kept so existing code that
        # reads them keeps working; request handling uses self._clients.
        self.client = None
        self.api_key = os.getenv("POE_API_KEY")
        self.current_user = None

        # user id (see _get_user_id) -> validated API client for that user.
        self._clients = {}

        with gr.Blocks(title="ePub Summarizer", css=self._get_css()) as demo:
            with gr.Row():
                welcome_md = gr.Markdown()
                login_btn = gr.LoginButton()
                refresh_btn = gr.Button("🔄 Refresh", size="sm", variant="secondary")

            # API key form; the login handlers make it visible when needed.
            api_key_section = gr.Column(visible=False)
            with api_key_section:
                gr.Markdown(self._API_KEY_HELP)
                api_key_input = gr.Textbox(
                    label="Poe API Key",
                    placeholder="Enter your Poe API key here...",
                    type="password",
                )
                with gr.Row():
                    remember_key = gr.Checkbox(
                        label="Remember my API key (encrypted storage)",
                        value=True,
                        info="Your key will be encrypted and saved for future sessions",
                    )
                api_key_btn = gr.Button("Set API Key", variant="primary")

            # Placed outside api_key_section: the handlers show this button
            # while the section itself is hidden.
            clear_key_btn = gr.Button("Clear Saved Key", variant="secondary", visible=False)

            out = gr.Markdown()
            inp = gr.File(file_types=['.epub'], visible=False, label="Upload ePub File")

            login_outputs = [welcome_md, api_key_section, inp, clear_key_btn]
            demo.load(fn=self.on_load, outputs=login_outputs)
            login_btn.click(fn=self.handle_login_state_change, outputs=login_outputs)
            # Manual re-check of the login state.
            refresh_btn.click(fn=self.handle_login_state_change, outputs=login_outputs)

            api_key_btn.click(
                self.set_api_key,
                inputs=[api_key_input, remember_key],
                outputs=[out, inp, api_key_section, clear_key_btn],
            )
            clear_key_btn.click(
                self.clear_saved_key,
                outputs=[out, api_key_section, clear_key_btn, api_key_input],
            )
            inp.change(self.process, inputs=[inp], outputs=[out])

        demo.queue().launch(
            share=True,
            ssr_mode=False,  # Disable SSR to avoid i18n issues
            show_error=True,
        )

    def _get_css(self):
        """Return the custom CSS passed to ``gr.Blocks``."""
        return """
        .gradio-container {
            max-width: 1200px !important;
        }
        .epub-processor {
            border: 2px dashed #ccc;
            border-radius: 10px;
            padding: 20px;
            text-align: center;
        }
        .progress-display {
            background: #f8f9fa;
            padding: 15px;
            border-radius: 8px;
            margin: 10px 0;
        }
        """

    # ------------------------------------------------------------------
    # User identity and encrypted key storage
    # ------------------------------------------------------------------

    def _get_user_id(self, request):
        """Return the SHA-256 hex digest of ``request.username``.

        Returns ``None`` when the request is missing or carries no username.
        """
        username = getattr(request, "username", None)
        if not username:
            return None
        return hashlib.sha256(username.encode()).hexdigest()

    def _generate_key_from_user(self, user_id):
        """Return a ``Fernet`` cipher whose key is derived from ``user_id``.

        NOTE(security): ``user_id`` is also the lookup key written to the keys
        file, so with the default derivation anyone who can read that file can
        re-derive the cipher. Setting the ``KEY_ENCRYPTION_SECRET`` environment
        variable mixes a server-side secret into the derivation. Keys saved
        before the secret was set (or changed) can then no longer be decrypted
        and must be entered again.
        """
        secret = os.getenv(self.KEY_SECRET_ENV, "")
        # Salt is the first 32 bytes of the user id, right-padded with b'0'.
        salt = user_id.encode()[:32].ljust(32, b'0')
        kdf = PBKDF2HMAC(
            algorithm=hashes.SHA256(),
            length=32,
            salt=salt,
            iterations=100000,
        )
        # With an empty secret this is exactly the original derivation.
        key = base64.urlsafe_b64encode(kdf.derive((secret + user_id).encode()))
        return Fernet(key)

    def _load_user_keys(self):
        """Load the ``user id -> encrypted key`` mapping from ``self.keys_file``.

        Returns an empty dict when the file is missing, unreadable, not valid
        JSON, or does not contain a JSON object.
        """
        try:
            with open(self.keys_file, "r", encoding="utf-8") as handle:
                data = json.load(handle)
        except FileNotFoundError:
            return {}
        except (OSError, ValueError) as exc:
            self._log.error("Error loading user keys: %s", exc)
            return {}
        if not isinstance(data, dict):
            self._log.error("Error loading user keys: expected a JSON object")
            return {}
        return data

    def _save_user_keys(self, keys_data):
        """Write ``keys_data`` to ``self.keys_file``; return ``True`` on success.

        The data is written to a temporary sibling file and then moved into
        place, so a failed write does not truncate the existing file.
        """
        tmp_path = f"{self.keys_file}.tmp"
        try:
            with open(tmp_path, "w", encoding="utf-8") as handle:
                json.dump(keys_data, handle)
            os.replace(tmp_path, self.keys_file)
        except (OSError, TypeError, ValueError) as exc:
            self._log.error("Error saving user keys: %s", exc)
            return False
        return True

    def _get_saved_key(self, user_id):
        """Return the decrypted API key stored for ``user_id``, or ``None``."""
        stored = self._load_user_keys().get(user_id)
        if not stored:
            return None
        try:
            cipher_suite = self._generate_key_from_user(user_id)
            # Stored values are the Fernet token base64-encoded once more.
            token = base64.urlsafe_b64decode(stored.encode())
            return cipher_suite.decrypt(token).decode()
        except Exception as exc:  # best effort: an unreadable entry means "no key"
            self._log.error("Error retrieving saved key: %s", exc)
            return None

    def _save_encrypted_key(self, user_id, api_key):
        """Encrypt ``api_key`` and store it for ``user_id``.

        Returns ``True`` only when the keys file was actually written.
        """
        try:
            keys_data = self._load_user_keys()
            cipher_suite = self._generate_key_from_user(user_id)
            token = cipher_suite.encrypt(api_key.encode())
            keys_data[user_id] = base64.urlsafe_b64encode(token).decode()
        except Exception as exc:
            self._log.error("Error saving encrypted key: %s", exc)
            return False
        return self._save_user_keys(keys_data)

    def _delete_saved_key(self, user_id):
        """Remove the stored key for ``user_id``.

        Returns ``True`` when no entry remains on disk afterwards, which
        includes the case where there was nothing to delete.
        """
        keys_data = self._load_user_keys()
        if user_id not in keys_data:
            return True
        del keys_data[user_id]
        return self._save_user_keys(keys_data)

    # ------------------------------------------------------------------
    # Login state handling
    # ------------------------------------------------------------------

    @staticmethod
    def _login_view(message, *, key_form, uploader, clear_button):
        """Build the 4-tuple of updates shared by all login-state handlers.

        Order matches the wired outputs: welcome text, API key section,
        file input, clear-key button.
        """
        return (
            gr.update(value=message),
            gr.update(visible=key_form),
            gr.update(visible=uploader),
            gr.update(visible=clear_button),
        )

    def on_load(self, request: gr.Request = None):
        """Handle the initial page load by evaluating the login state."""
        return self.handle_login_state_change(request)

    def handle_login_state_change(self, request: gr.Request = None):
        """Return the UI updates for the current login state.

        Used for page load, the login button and the refresh button. Any
        unexpected error falls back to the logged-out view.
        """
        try:
            self._log.debug(
                "Login state check - Username: %s", getattr(request, "username", None)
            )
            if getattr(request, "username", None):
                return self.handle_user_logged_in(request)
            return self.handle_user_not_logged_in()
        except Exception as exc:
            self._log.error("Error in login state change: %s", exc)
            return self.handle_user_not_logged_in()

    def check_login_state(self, request: gr.Request = None):
        """Alias for :meth:`handle_login_state_change`."""
        return self.handle_login_state_change(request)

    def handle_user_not_logged_in(self):
        """Return the updates for the logged-out view (everything hidden)."""
        return self._login_view(
            '# ePub Summarization Tool\n\nPlease login with Hugging Face to access the tool.\n\n*If you just logged in, click the 🔄 Refresh button above.*',
            key_form=False,
            uploader=False,
            clear_button=False,
        )

    def handle_user_logged_in(self, request):
        """Return the updates for a logged-in user.

        If the user already has a validated client the upload view is shown.
        Otherwise a saved key, if any, is loaded; failing that the user is
        asked to enter a key.
        """
        user_id = self._get_user_id(request)
        user_name = request.username
        self.current_user = user_id  # informational: the most recent user seen

        if self._clients.get(user_id) is not None:
            return self._login_view(
                f'# ePub Summarization Tool\n\nWelcome {user_name}! ✅ Ready to process ePub files.',
                key_form=False,
                uploader=True,
                clear_button=True,
            )

        saved_key = self._get_saved_key(user_id)
        if saved_key:
            client = self._build_client(saved_key)
            if client is not None:
                self._clients[user_id] = client
                return self._login_view(
                    f'# ePub Summarization Tool\n\nWelcome back {user_name}! ✅ Your saved API key is loaded and ready.',
                    key_form=False,
                    uploader=True,
                    clear_button=True,
                )

        # No saved key, or the client could not be created from it.
        return self._login_view(
            f'# ePub Summarization Tool\n\nWelcome {user_name}! Please set up your Poe API key below.',
            key_form=True,
            uploader=False,
            clear_button=False,
        )

    # ------------------------------------------------------------------
    # API client and key management
    # ------------------------------------------------------------------

    def _build_client(self, api_key):
        """Create a Poe API client for ``api_key``; return ``None`` on failure."""
        try:
            return openai.OpenAI(
                api_key=api_key,
                base_url="https://api.poe.com/v1",
            )
        except Exception as exc:
            self._log.error("Error initializing Poe client: %s", exc)
            return None

    def _initialize_client(self):
        """Legacy helper: create ``self.client`` from ``self.api_key``.

        Returns ``True`` on success. Request handlers no longer use it; they
        keep one client per user in ``self._clients``.
        """
        client = self._build_client(self.api_key)
        if client is None:
            return False
        self.client = client
        return True

    @staticmethod
    def _key_setup_failure(message):
        """Updates for a failed key setup: show message, keep the form open."""
        return (
            gr.update(value=message),
            gr.update(visible=False),  # inp
            gr.update(visible=True),   # api_key_section
            gr.update(visible=False),  # clear_key_btn
        )

    @staticmethod
    def _describe_api_error(exc):
        """Map an exception from the validation request to a user-facing message."""
        detail = str(exc)
        lowered = detail.lower()
        if "401" in detail or "unauthorized" in lowered:
            return "❌ Invalid API key. Please check your key and try again."
        if "quota" in lowered or "limit" in lowered:
            return "⚠️ API key valid but quota exceeded. Please check your Poe account."
        return f"❌ API connection error: {detail}"

    def set_api_key(self, api_key, remember_key, request: gr.Request = None):
        """Validate ``api_key`` and activate it for the requesting user.

        The key is tested with one small chat completion. It is only stored
        (in memory and, if ``remember_key`` is set, encrypted on disk) after
        that request succeeds, so a rejected key never becomes active.

        Returns updates for: output text, file input, API key section,
        clear-key button.
        """
        user_id = self._get_user_id(request)
        if user_id is None:
            return self._key_setup_failure("❌ Please login first.")

        candidate = (api_key or "").strip()
        if not candidate:
            return self._key_setup_failure("⚠️ Please enter a valid API key.")

        client = self._build_client(candidate)
        if client is None:
            return self._key_setup_failure("❌ Failed to initialize API client.")

        try:
            client.chat.completions.create(
                model=self.model_name,
                messages=[{"role": "user", "content": "Hello"}],
                max_tokens=10,
            )
        except Exception as exc:
            return self._key_setup_failure(self._describe_api_error(exc))

        self._clients[user_id] = client

        saved = bool(remember_key) and self._save_encrypted_key(user_id, candidate)
        if not remember_key:
            success_msg = "✅ API key validated successfully! You can now upload an ePub file."
        elif saved:
            success_msg = "✅ API key validated and saved successfully! You can now upload an ePub file."
        else:
            success_msg = "✅ API key validated successfully! (Note: Failed to save for future use)"

        return (
            gr.update(value=success_msg),
            gr.update(visible=True),   # inp
            gr.update(visible=False),  # api_key_section
            gr.update(visible=saved),  # clear_key_btn: only when something was saved
        )

    def clear_saved_key(self, request: gr.Request = None):
        """Delete the requesting user's saved key and drop their active client.

        Returns updates for: output text, API key section, clear-key button,
        API key textbox.
        """
        user_id = self._get_user_id(request)
        if user_id is None:
            return (
                gr.update(value="❌ No user logged in."),
                gr.update(visible=False),
                gr.update(visible=False),
                gr.update(value=""),
            )

        if self._delete_saved_key(user_id):
            self._clients.pop(user_id, None)
            return (
                gr.update(value="✅ Saved API key cleared. Please enter your API key below."),
                gr.update(visible=True),   # api_key_section
                gr.update(visible=False),  # clear_key_btn
                gr.update(value=""),       # clear input
            )

        return (
            gr.update(value="⚠️ Failed to clear saved key."),
            gr.update(visible=True),
            gr.update(visible=True),
            gr.update(value=""),
        )

    # ------------------------------------------------------------------
    # Summarization
    # ------------------------------------------------------------------

    def get_model_response(self, text: str, client=None) -> str:
        """Send ``text`` as a single user message and return the model's reply.

        ``client`` selects the API client to use; when omitted the legacy
        ``self.client`` is used. Failures are returned as an error string
        rather than raised.
        """
        active = client if client is not None else self.client
        if not active:
            return "Error: API client not initialized"
        try:
            chat = active.chat.completions.create(
                model=self.model_name,
                messages=[{"role": "user", "content": text}],
            )
            return chat.choices[0].message.content
        except Exception as e:
            self._log.error("Error calling Poe API: %s", e)
            return f"Error processing text: {str(e)}"

    @staticmethod
    def _render_summaries(book_title, entries):
        """Render ``(chapter_title, summary)`` pairs under the book title as Markdown."""
        body = "\n\n".join(f"## {ct}\n\n{sm}" for ct, sm in entries)
        return f"# {book_title}\n\n{body}"

    def process(self, file, request: gr.Request = None):
        """Summarize the uploaded ePub, yielding progress updates for the output.

        This is a generator: every message, including the early validation
        messages, is yielded so that it reaches the UI.

        Each non-empty chapter is split into overlapping chunks, every chunk is
        summarized, and the chunk summaries are folded into one chapter summary.
        Each summary is stored together with its own chapter title, so skipped
        (empty) chapters cannot shift titles.
        """
        user_id = self._get_user_id(request)
        if user_id is None:
            yield gr.update(value='⚠️ Please login to access the tool.')
            return
        client = self._clients.get(user_id)
        if client is None:
            yield gr.update(value='⚠️ Please set your Poe API key first.')
            return
        if file is None:
            yield gr.update(value='Please upload an ePub file.')
            return

        try:
            # Accept both an object exposing .name and a plain path string.
            path = getattr(file, "name", file)
            sections = epub2txt(path, outputlist=True)
            chapter_titles = list(epub2txt.content_titles or [])
            title = epub2txt.title
            yield gr.update(value=f"# {title}\n\nProcessing ePub file...")

            skip = self.SKIPPED_LEADING_SECTIONS
            finished = []  # (chapter_title, summary) for completed chapters

            for idx, text in enumerate(sections[skip:], 1):
                if not text.strip():
                    continue
                yield gr.update(value=f"# {title}\n\nProcessing chapter {idx}...")

                # Chapter idx (1-based after the skipped sections) sits at
                # this position in the full section/title lists.
                position = skip + idx - 1
                if position < len(chapter_titles):
                    chapter_title = chapter_titles[position]
                else:
                    chapter_title = f"Chapter {idx}"

                docs = []
                for start in range(0, len(text), self.CHUNK_STRIDE):
                    chunk = text[start:start + self.CHUNK_LENGTH]
                    if not chunk.strip():
                        continue
                    docs.append(
                        self.get_model_response(self.prompt + "\n\n" + chunk, client)
                    )
                    in_progress = finished + [
                        (chapter_title, f"Processing chunk {len(docs)}...")
                    ]
                    yield gr.update(value=self._render_summaries(title, in_progress))

                if not docs:
                    continue

                # Fold the chunk summaries into one chapter summary.
                hist = docs[0]
                for doc in docs[1:]:
                    combined_text = (
                        f"{self.prompt}\n\nCombine these summaries:\n\n{hist}\n\n{doc}"
                    )
                    hist = self.get_model_response(combined_text, client)
                    draft = finished + [(chapter_title, f"Draft: {hist}")]
                    yield gr.update(value=self._render_summaries(title, draft))

                finished.append((chapter_title, hist))
                yield gr.update(value=self._render_summaries(title, finished))

            if finished:
                yield gr.update(value=self._render_summaries(title, finished))
            else:
                yield gr.update(value=f"# {title}\n\nNo content found to summarize.")
        except Exception as e:
            self._log.error("Error processing file: %s", e)
            yield gr.update(value=f"Error processing file: {str(e)}")
# Run the application: constructing GUI builds the interface and launches it.
if __name__ == "__main__":
    GUI()