Spaces:
Sleeping
Sleeping
File size: 20,340 Bytes
4967799 212347a 4967799 212347a a4b714f 212347a 00261f9 b8becf7 212347a b8becf7 212347a b8becf7 00261f9 212347a 00261f9 212347a 4967799 212347a 4967799 212347a 00261f9 212347a 00261f9 4984061 00261f9 4984061 00261f9 4984061 00261f9 4984061 00261f9 4984061 212347a 4984061 212347a 0612cdd 4984061 212347a b8becf7 212347a 0612cdd 212347a e594988 212347a b8becf7 212347a b8becf7 212347a |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 |
import os
import json
import hashlib
from cryptography.fernet import Fernet
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
import base64
import openai
import gradio as gr
from epub2txt import epub2txt
class GUI:
def __init__(self, *args, **kwargs):
# Configuration - restored your original settings
self.model_name = os.getenv("POE_MODEL", "GPT-5-mini")
self.prompt = os.getenv("prompt", "Summarize the following text:")
self.client = None
self.api_key = os.getenv("POE_API_KEY")
self.current_user = None
self.keys_file = "user_keys.json"
with gr.Blocks(title="ePub Summarizer", css=self._get_css()) as demo:
with gr.Row():
welcome_md = gr.Markdown()
# Updated for Gradio 5 - removed LogoutButton (LoginButton transforms automatically)
login_btn = gr.LoginButton()
refresh_btn = gr.Button("π Refresh", size="sm", variant="secondary")
# API Key input section (shown after login)
api_key_section = gr.Column(visible=False)
with api_key_section:
gr.Markdown("""
### Poe API Key Setup
To use this tool, you need a Poe API key:
1. Visit [https://poe.com/api_key](https://poe.com/api_key)
2. If you don't have an account, create one first
3. Generate a new API key or copy your existing one
4. Paste it in the field below
**Security**: Your API key will be encrypted and tied to your login account.
""")
api_key_input = gr.Textbox(
label="Poe API Key",
placeholder="Enter your Poe API key here...",
type="password"
)
with gr.Row():
remember_key = gr.Checkbox(
label="Remember my API key (encrypted storage)",
value=True,
info="Your key will be encrypted and saved for future sessions"
)
api_key_btn = gr.Button("Set API Key", variant="primary")
clear_key_btn = gr.Button("Clear Saved Key", variant="secondary", visible=False)
out = gr.Markdown()
inp = gr.File(file_types=['.epub'], visible=False, label="Upload ePub File")
# Event handlers - Fixed for Gradio 5
demo.load(
fn=self.on_load,
outputs=[welcome_md, api_key_section, inp, clear_key_btn]
)
# Direct login button event handling
login_btn.click(
fn=self.handle_login_state_change,
outputs=[welcome_md, api_key_section, inp, clear_key_btn]
)
# Manual refresh button for debugging
refresh_btn.click(
fn=self.handle_login_state_change,
outputs=[welcome_md, api_key_section, inp, clear_key_btn]
)
api_key_btn.click(
self.set_api_key,
inputs=[api_key_input, remember_key],
outputs=[out, inp, api_key_section, clear_key_btn]
)
clear_key_btn.click(
self.clear_saved_key,
outputs=[out, api_key_section, clear_key_btn, api_key_input]
)
# Your original ePub processing
inp.change(self.process, inputs=[inp], outputs=[out])
# Updated launch for Gradio 5
demo.queue().launch(
share=True,
ssr_mode=False, # Disable SSR to avoid i18n issues
show_error=True
)
def _get_css(self):
"""Custom CSS for better styling"""
return """
.gradio-container {
max-width: 1200px !important;
}
.epub-processor {
border: 2px dashed #ccc;
border-radius: 10px;
padding: 20px;
text-align: center;
}
.progress-display {
background: #f8f9fa;
padding: 15px;
border-radius: 8px;
margin: 10px 0;
}
"""
def _get_user_id(self, request):
"""Generate a unique user ID from request - Updated for Gradio 5"""
if not hasattr(request, 'username') or not request.username:
return None
# Use username as identifier
identifier = request.username
return hashlib.sha256(identifier.encode()).hexdigest()
def _generate_key_from_user(self, user_id):
"""Generate encryption key from user ID"""
# Use user ID as salt for key derivation
salt = user_id.encode()[:32].ljust(32, b'0') # Ensure 32 bytes
kdf = PBKDF2HMAC(
algorithm=hashes.SHA256(),
length=32,
salt=salt,
iterations=100000,
)
key = base64.urlsafe_b64encode(kdf.derive(user_id.encode()))
return Fernet(key)
def _load_user_keys(self):
"""Load encrypted user keys from file"""
try:
if os.path.exists(self.keys_file):
with open(self.keys_file, 'r') as f:
return json.load(f)
except Exception as e:
print(f"Error loading user keys: {e}")
return {}
def _save_user_keys(self, keys_data):
"""Save encrypted user keys to file"""
try:
with open(self.keys_file, 'w') as f:
json.dump(keys_data, f)
except Exception as e:
print(f"Error saving user keys: {e}")
def _get_saved_key(self, user_id):
"""Retrieve and decrypt user's API key"""
try:
keys_data = self._load_user_keys()
if user_id in keys_data:
cipher_suite = self._generate_key_from_user(user_id)
encrypted_key = base64.urlsafe_b64decode(keys_data[user_id].encode())
return cipher_suite.decrypt(encrypted_key).decode()
except Exception as e:
print(f"Error retrieving saved key: {e}")
return None
def _save_encrypted_key(self, user_id, api_key):
"""Encrypt and save user's API key"""
try:
keys_data = self._load_user_keys()
cipher_suite = self._generate_key_from_user(user_id)
encrypted_key = cipher_suite.encrypt(api_key.encode())
keys_data[user_id] = base64.urlsafe_b64encode(encrypted_key).decode()
self._save_user_keys(keys_data)
return True
except Exception as e:
print(f"Error saving encrypted key: {e}")
return False
def _delete_saved_key(self, user_id):
"""Delete user's saved API key"""
try:
keys_data = self._load_user_keys()
if user_id in keys_data:
del keys_data[user_id]
self._save_user_keys(keys_data)
return True
except Exception as e:
print(f"Error deleting saved key: {e}")
return False
def on_load(self, request: gr.Request = None):
"""Handle initial page load - Updated for Gradio 5"""
print(f"Page load - Username: {getattr(request, 'username', None)}") # Debug
return self.handle_login_state_change(request)
def handle_login_state_change(self, request: gr.Request = None):
"""Handle login state changes - works for both load and login button click"""
try:
print(f"Login state check - Username: {getattr(request, 'username', None)}") # Debug
print(f"Request attrs: {[attr for attr in dir(request) if not attr.startswith('_')]}") # Debug
if hasattr(request, 'username') and request.username:
return self.handle_user_logged_in(request)
else:
return self.handle_user_not_logged_in()
except Exception as e:
print(f"Error in login state change: {e}")
return self.handle_user_not_logged_in()
def check_login_state(self, request: gr.Request = None):
"""Check login state - with debug info"""
return self.handle_login_state_change(request)
def handle_user_not_logged_in(self):
"""Handle when user is not logged in"""
return (
gr.update(value='# ePub Summarization Tool\n\nPlease login with Hugging Face to access the tool.\n\n*If you just logged in, click the π Refresh button above.*'),
gr.update(visible=False), # api_key_section
gr.update(visible=False), # inp
gr.update(visible=False) # clear_key_btn
)
def handle_user_logged_in(self, request):
"""Handle when user is logged in"""
if not self.current_user or self.current_user != self._get_user_id(request):
# New login or different user
self.current_user = self._get_user_id(request)
user_name = request.username
# Check if user has a saved API key
saved_key = self._get_saved_key(self.current_user)
if saved_key:
self.api_key = saved_key
if self._initialize_client():
return (
gr.update(value=f'# ePub Summarization Tool\n\nWelcome back {user_name}! β
Your saved API key is loaded and ready.'),
gr.update(visible=False), # api_key_section
gr.update(visible=True), # inp
gr.update(visible=True) # clear_key_btn
)
# No saved key or failed to initialize
return (
gr.update(value=f'# ePub Summarization Tool\n\nWelcome {user_name}! Please set up your Poe API key below.'),
gr.update(visible=True), # api_key_section
gr.update(visible=False), # inp
gr.update(visible=False) # clear_key_btn
)
# User already processed, return current state
if self.client:
return (
gr.update(value=f'# ePub Summarization Tool\n\nWelcome {request.username}! β
Ready to process ePub files.'),
gr.update(visible=False), # api_key_section
gr.update(visible=True), # inp
gr.update(visible=True) # clear_key_btn
)
else:
return (
gr.update(value=f'# ePub Summarization Tool\n\nWelcome {request.username}! Please set up your Poe API key below.'),
gr.update(visible=True), # api_key_section
gr.update(visible=False), # inp
gr.update(visible=False) # clear_key_btn
)
def _initialize_client(self):
"""Initialize the Poe API client"""
try:
self.client = openai.OpenAI(
api_key=self.api_key,
base_url="https://api.poe.com/v1",
)
return True
except Exception as e:
print(f"Error initializing Poe client: {e}")
return False
def set_api_key(self, api_key, remember_key):
"""Set and validate the API key"""
if not self.current_user:
return (
gr.update(value="β Please login first."),
gr.update(visible=False),
gr.update(visible=True),
gr.update(visible=False)
)
if not api_key or not api_key.strip():
return (
gr.update(value="β οΈ Please enter a valid API key."),
gr.update(visible=False),
gr.update(visible=True),
gr.update(visible=False)
)
self.api_key = api_key.strip()
if self._initialize_client():
# Test the API key with a simple request
try:
test_chat = self.client.chat.completions.create(
model=self.model_name,
messages=[{"role": "user", "content": "Hello"}],
max_tokens=10
)
# Save key if user wants to remember it
if remember_key:
if self._save_encrypted_key(self.current_user, self.api_key):
success_msg = "β
API key validated and saved successfully! You can now upload an ePub file."
else:
success_msg = "β
API key validated successfully! (Note: Failed to save for future use)"
else:
success_msg = "β
API key validated successfully! You can now upload an ePub file."
return (
gr.update(value=success_msg),
gr.update(visible=True), # inp
gr.update(visible=False), # api_key_section
gr.update(visible=remember_key) # clear_key_btn
)
except Exception as e:
error_msg = str(e)
if "401" in error_msg or "unauthorized" in error_msg.lower():
return (
gr.update(value="β Invalid API key. Please check your key and try again."),
gr.update(visible=False),
gr.update(visible=True),
gr.update(visible=False)
)
elif "quota" in error_msg.lower() or "limit" in error_msg.lower():
return (
gr.update(value="β οΈ API key valid but quota exceeded. Please check your Poe account."),
gr.update(visible=False),
gr.update(visible=True),
gr.update(visible=False)
)
else:
return (
gr.update(value=f"β API connection error: {error_msg}"),
gr.update(visible=False),
gr.update(visible=True),
gr.update(visible=False)
)
else:
return (
gr.update(value="β Failed to initialize API client."),
gr.update(visible=False),
gr.update(visible=True),
gr.update(visible=False)
)
def clear_saved_key(self):
"""Clear the user's saved API key"""
if not self.current_user:
return (
gr.update(value="β No user logged in."),
gr.update(visible=False),
gr.update(visible=False),
gr.update(value="")
)
if self._delete_saved_key(self.current_user):
self.api_key = None
self.client = None
return (
gr.update(value="β
Saved API key cleared. Please enter your API key below."),
gr.update(visible=True), # api_key_section
gr.update(visible=False), # clear_key_btn
gr.update(value="") # clear input
)
else:
return (
gr.update(value="β οΈ Failed to clear saved key."),
gr.update(visible=True),
gr.update(visible=True),
gr.update(value="")
)
def get_model_response(self, text: str) -> str:
"""Get response from Poe API for the given text - Your original function"""
if not self.client:
return "Error: API client not initialized"
try:
chat = self.client.chat.completions.create(
model=self.model_name,
messages=[{"role": "user", "content": text}],
)
return chat.choices[0].message.content
except Exception as e:
print(f"Error calling Poe API: {e}")
return f"Error processing text: {str(e)}"
def process(self, file, request: gr.Request = None):
"""Your original ePub processing workflow - restored completely"""
if not hasattr(request, 'username') or not request.username:
return gr.update(value='β οΈ Please login to access the tool.')
if not self.client:
return gr.update(value='β οΈ Please set your Poe API key first.')
if file is None:
return gr.update(value='Please upload an ePub file.')
try:
# Extract content from ePub - Your original workflow
ch_list = epub2txt(file.name, outputlist=True)
chapter_titles = epub2txt.content_titles
title = epub2txt.title
yield gr.update(value=f"# {title}\n\nProcessing ePub file...")
sm_list = []
# Process each chapter (skip first 2 as they're usually metadata) - Your original logic
for idx, text in enumerate(ch_list[2:], 1):
if not text.strip():
continue
yield gr.update(value=f"# {title}\n\nProcessing chapter {idx}...")
docs = []
# Split chapter into chunks for processing - Your original chunking strategy
chunk_size = 2000
for i in range(0, len(text), chunk_size):
chunk = text[i:i+2048] # Slight overlap for context
if len(chunk.strip()) > 0:
response = self.get_model_response(self.prompt + "\n\n" + chunk)
docs.append(response)
# Update UI with current progress
current_summaries = "\n\n".join([
f"## {ct}\n\n{sm}"
for ct, sm in zip(chapter_titles[2:idx+1], sm_list + [f"Processing chunk {len(docs)}..."])
])
yield gr.update(value=f"# {title}\n\n{current_summaries}")
# Combine chunk summaries into chapter summary - Your original combining logic
if docs:
if len(docs) == 1:
hist = docs[0]
else:
hist = docs[0]
for doc in docs[1:]:
combined_text = f"{self.prompt}\n\nCombine these summaries:\n\n{hist}\n\n{doc}"
hist = self.get_model_response(combined_text)
# Update UI with draft summary
current_summaries = "\n\n".join([
f"## {ct}\n\n{sm}"
for ct, sm in zip(chapter_titles[2:idx+1], sm_list + [f"Draft: {hist}"])
])
yield gr.update(value=f"# {title}\n\n{current_summaries}")
sm_list.append(hist)
# Update final output for this chapter
final_summaries = "\n\n".join([
f"## {ct}\n\n{sm}"
for ct, sm in zip(chapter_titles[2:idx+1], sm_list)
])
yield gr.update(value=f"# {title}\n\n{final_summaries}")
# Final complete summary - Your original final output
if sm_list:
complete_summary = f"# {title}\n\n" + "\n\n".join([
f"## {ct}\n\n{sm}"
for ct, sm in zip(chapter_titles[2:len(sm_list)+2], sm_list)
])
yield gr.update(value=complete_summary)
else:
yield gr.update(value=f"# {title}\n\nNo content found to summarize.")
except Exception as e:
yield gr.update(value=f"Error processing file: {str(e)}")
# Run the application
if __name__ == "__main__":
GUI() |