import os
import json
import logging
from typing import List, Dict, Any, Optional

from dotenv import load_dotenv
from openai import OpenAI
from pypdf import PdfReader
import requests
import gradio as gr
from pydantic import BaseModel

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Load environment variables
load_dotenv(override=True)


class Config:
    """Configuration management"""

    def __init__(self):
        self.openai_client = None
        self.gemini_client = None
        self.pushover_user = os.getenv("PUSHOVER_USER")
        self.pushover_token = os.getenv("PUSHOVER_TOKEN_EU")
        self.google_api_key = os.getenv("GOOGLE_API_KEY")
        self.pushover_url = "https://api.pushover.net/1/messages.json"
        self.pdf_path = "EU_AI_ACT.pdf"
        self._validate_config()
        self._initialize_clients()

    def _validate_config(self):
        """Validate required environment variables"""
        required_vars = {
            "OPENAI_API_KEY": os.getenv("OPENAI_API_KEY"),
            "GOOGLE_API_KEY": self.google_api_key,
            "PUSHOVER_USER": self.pushover_user,
            "PUSHOVER_TOKEN_EU": self.pushover_token
        }
        missing = [var for var, value in required_vars.items() if not value]
        if missing:
            raise ValueError(f"Missing required environment variables: {missing}")

    def _initialize_clients(self):
        """Initialize API clients"""
        try:
            self.openai_client = OpenAI()
            self.gemini_client = OpenAI(
                api_key=self.google_api_key,
                base_url="https://generativelanguage.googleapis.com/v1beta/openai/"
            )
        except Exception as e:
            logger.error(f"Failed to initialize API clients: {e}")
            raise
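
# Config reads its credentials from a .env file via load_dotenv(). A minimal
# sketch of the expected entries (the values shown are placeholders, not real keys):
#
#   OPENAI_API_KEY=sk-...
#   GOOGLE_API_KEY=...
#   PUSHOVER_USER=...
#   PUSHOVER_TOKEN_EU=...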


class PushNotificationService:
    """Handle push notifications"""

    def __init__(self, config: Config):
        self.config = config

    def send_notification(self, message: str) -> bool:
        """Send push notification"""
        try:
            logger.info(f"Sending notification: {message}")
            payload = {
                "user": self.config.pushover_user,
                "token": self.config.pushover_token,
                "message": message
            }
            response = requests.post(self.config.pushover_url, data=payload, timeout=10)
            response.raise_for_status()
            return True
        except Exception as e:
            logger.error(f"Failed to send notification: {e}")
            return False


class DocumentLoader:
    """Handle document loading and processing"""

    def __init__(self, config: Config):
        self.config = config
        self.document_content = ""
        self._load_document()

    def _load_document(self):
        """Load and extract text from PDF"""
        try:
            if not os.path.exists(self.config.pdf_path):
                raise FileNotFoundError(f"PDF file not found: {self.config.pdf_path}")

            reader = PdfReader(self.config.pdf_path)
            text_parts = []
            for page_num, page in enumerate(reader.pages):
                try:
                    text = page.extract_text()
                    if text:
                        text_parts.append(text)
                except Exception as e:
                    logger.warning(f"Failed to extract text from page {page_num}: {e}")

            self.document_content = "\n".join(text_parts)
            logger.info(f"Successfully loaded document with {len(self.document_content)} characters")
        except Exception as e:
            logger.error(f"Failed to load document: {e}")
            # Provide fallback content
            self.document_content = "Document loading failed. Operating with limited information."


class ToolHandler:
    """Handle tool calls and user interactions"""

    def __init__(self, notification_service: PushNotificationService):
        self.notification_service = notification_service

    def record_user_details(self, email: str, name: str = "Name not provided",
                            notes: str = "No additional notes") -> Dict[str, str]:
        """Record user contact details"""
        try:
            message = f"Recording interest from {name} with email {email} and notes: {notes}"
            success = self.notification_service.send_notification(message)
            return {"status": "success" if success else "notification_failed", "recorded": "ok"}
        except Exception as e:
            logger.error(f"Failed to record user details: {e}")
            return {"status": "error", "message": str(e)}

    def record_unknown_question(self, question: str) -> Dict[str, str]:
        """Record questions that couldn't be answered"""
        try:
            message = f"Unanswered question: {question}"
            success = self.notification_service.send_notification(message)
            return {"status": "success" if success else "notification_failed", "recorded": "ok"}
        except Exception as e:
            logger.error(f"Failed to record unknown question: {e}")
            return {"status": "error", "message": str(e)}

    def get_tools_schema(self) -> List[Dict[str, Any]]:
        """Return tool schemas for OpenAI"""
        return [
            {
                "type": "function",
                "function": {
                    "name": "record_user_details",
                    "description": "Record user contact information when they express interest in follow-up",
                    "parameters": {
                        "type": "object",
                        "properties": {
                            "email": {
                                "type": "string",
                                "description": "The user's email address"
                            },
                            "name": {
                                "type": "string",
                                "description": "The user's name if provided"
                            },
                            "notes": {
                                "type": "string",
                                "description": "Additional context about the conversation"
                            }
                        },
                        "required": ["email"],
                        "additionalProperties": False
                    }
                }
            },
            {
                "type": "function",
                "function": {
                    "name": "record_unknown_question",
                    "description": "Record questions that couldn't be answered from the documentation",
                    "parameters": {
                        "type": "object",
                        "properties": {
                            "question": {
                                "type": "string",
                                "description": "The question that couldn't be answered"
                            }
                        },
                        "required": ["question"],
                        "additionalProperties": False
                    }
                }
            }
        ]

    def handle_tool_calls(self, tool_calls) -> List[Dict[str, Any]]:
        """Process tool calls from the LLM"""
        results = []
        for tool_call in tool_calls:
            try:
                tool_name = tool_call.function.name
                arguments = json.loads(tool_call.function.arguments)
                logger.info(f"Executing tool: {tool_name}")

                if tool_name == "record_user_details":
                    result = self.record_user_details(**arguments)
                elif tool_name == "record_unknown_question":
                    result = self.record_unknown_question(**arguments)
                else:
                    result = {"status": "error", "message": f"Unknown tool: {tool_name}"}

                results.append({
                    "role": "tool",
                    "content": json.dumps(result),
                    "tool_call_id": tool_call.id
                })
            except Exception as e:
                logger.error(f"Tool call failed: {e}")
                results.append({
                    "role": "tool",
                    "content": json.dumps({"status": "error", "message": str(e)}),
                    "tool_call_id": tool_call.id
                })
        return results
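
# Illustrative example (not executed): a tool call whose function.name is
# "record_user_details" and whose function.arguments is the JSON string
# '{"email": "user@example.com", "name": "Jane"}' is dispatched as
# record_user_details(email="user@example.com", name="Jane"), and its result is
# sent back to the model as a {"role": "tool", ...} message. The email and name
# values here are placeholders.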


class Evaluation(BaseModel):
    """Pydantic model for response evaluation"""
    is_acceptable: bool
    feedback: str


class ResponseEvaluator:
    """Evaluate chatbot responses for quality"""

    def __init__(self, config: Config, document_content: str):
        self.config = config
        self.document_content = document_content
        self.system_prompt = self._build_evaluator_prompt()

    def _build_evaluator_prompt(self) -> str:
        """Build the evaluator system prompt"""
        return f"""You are an evaluator for an EU AI Act expert chatbot.

Your task is to determine if the chatbot's response is acceptable quality based on:
1. Accuracy relative to the provided EU AI Act documentation
2. Clarity and helpfulness for the user
3. Professional and engaging tone
4. Appropriate use of the documentation context

The chatbot has access to this EU AI Act documentation:
{self.document_content[:5000]}...

Evaluate whether the response is acceptable and provide constructive feedback."""

    def evaluate_response(self, reply: str, message: str, history: List[Dict[str, str]]) -> Optional[Evaluation]:
        """Evaluate a chatbot response"""
        try:
            user_prompt = self._build_user_prompt(reply, message, history)
            messages = [
                {"role": "system", "content": self.system_prompt},
                {"role": "user", "content": user_prompt}
            ]
            response = self.config.gemini_client.beta.chat.completions.parse(
                model="gemini-2.0-flash",
                messages=messages,
                response_format=Evaluation,
                timeout=30
            )
            return response.choices[0].message.parsed
        except Exception as e:
            logger.error(f"Evaluation failed: {e}")
            return None

    def _build_user_prompt(self, reply: str, message: str, history: List[Dict[str, str]]) -> str:
        """Build evaluation prompt for specific conversation"""
        history_text = "\n".join([f"{msg['role']}: {msg['content']}" for msg in history[-5:]])
        return f"""
Conversation history (last 5 messages):
{history_text}

Latest user message: {message}

Chatbot response: {reply}

Please evaluate if this response is acceptable and provide feedback.
"""


class EUAIActChatbot:
    """Main chatbot class"""

    def __init__(self):
        self.config = Config()
        self.notification_service = PushNotificationService(self.config)
        self.document_loader = DocumentLoader(self.config)
        self.tool_handler = ToolHandler(self.notification_service)
        self.evaluator = ResponseEvaluator(self.config, self.document_loader.document_content)
        self.system_prompt = self._build_system_prompt()
        self.max_retries = 2

    def _build_system_prompt(self) -> str:
        """Build the main system prompt"""
        return f"""You are an expert assistant specializing in the EU Artificial Intelligence Act (EU AI Act).

Your role is to help users understand:
- Key principles and obligations under the EU AI Act
- Risk classifications for AI systems
- Compliance requirements for businesses
- How the Act applies to different sectors and use cases

Guidelines:
- Provide accurate, clear, and actionable guidance based on the official documentation
- Make complex legal language accessible to business owners and compliance officers
- Maintain a professional, informative, and approachable tone
- If you cannot answer a question from the provided documentation, use the record_unknown_question tool
- If users show interest in deeper support, encourage them to share contact details using record_user_details

## EU AI Act Documentation:
{self.document_loader.document_content}

Use this documentation to provide accurate, helpful responses about the EU AI Act."""

    def generate_response(self, message: str, history: List[Dict[str, str]]) -> str:
        """Generate a response with tool support and evaluation"""
        messages = [{"role": "system", "content": self.system_prompt}]
        messages.extend(history)
        messages.append({"role": "user", "content": message})

        try:
            # Generate initial response with tools
            response = self._call_openai_with_tools(messages)

            # Evaluate response quality
            evaluation = self.evaluator.evaluate_response(response, message, history)
            if evaluation and not evaluation.is_acceptable and self.max_retries > 0:
                logger.info("Response failed evaluation, retrying...")
                response = self._retry_with_feedback(messages, response, evaluation.feedback)

            return response
        except Exception as e:
            logger.error(f"Failed to generate response: {e}")
            return "I'm sorry, I'm experiencing technical difficulties. Please try again later."

    def _call_openai_with_tools(self, messages: List[Dict[str, str]]) -> str:
        """Call OpenAI API with tool support"""
        tools = self.tool_handler.get_tools_schema()
        max_iterations = 5
        iteration = 0

        while iteration < max_iterations:
            response = self.config.openai_client.chat.completions.create(
                model="gpt-4o-mini",
                messages=messages,
                tools=tools,
                timeout=60
            )
            finish_reason = response.choices[0].finish_reason

            if finish_reason == "tool_calls":
                # Handle tool calls
                message_with_tools = response.choices[0].message
                tool_results = self.tool_handler.handle_tool_calls(message_with_tools.tool_calls)
                messages.append(message_with_tools)
                messages.extend(tool_results)
                iteration += 1
            else:
                return response.choices[0].message.content

        return "I apologize, but I encountered an issue processing your request. Please try rephrasing your question."

    def _retry_with_feedback(self, original_messages: List[Dict[str, str]],
                             failed_response: str, feedback: str) -> str:
        """Retry generation with evaluator feedback"""
        try:
            retry_prompt = f"""Your previous response was not acceptable. Here's what needs improvement:

Previous response: {failed_response}

Feedback: {feedback}

Please provide a better response addressing these concerns."""

            messages = original_messages + [{"role": "user", "content": retry_prompt}]
            response = self.config.openai_client.chat.completions.create(
                model="gpt-4o-mini",
                messages=messages,
                timeout=60
            )
            return response.choices[0].message.content
        except Exception as e:
            logger.error(f"Retry failed: {e}")
            return failed_response  # Return original if retry fails


def create_gradio_interface():
    """Create and configure Gradio interface"""
    try:
        chatbot = EUAIActChatbot()

        def chat_wrapper(message: str, history: List[List[str]]) -> str:
            # Convert Gradio (user, assistant) pairs to OpenAI message format
            formatted_history = []
            for user_msg, assistant_msg in history:
                formatted_history.append({"role": "user", "content": user_msg})
                if assistant_msg:  # Only add if assistant responded
                    formatted_history.append({"role": "assistant", "content": assistant_msg})
            return chatbot.generate_response(message, formatted_history)

        # Create interface
        interface = gr.ChatInterface(
            fn=chat_wrapper,
            title="EU AI Act Expert Assistant",
            description="Ask questions about the EU Artificial Intelligence Act. I can help you understand compliance requirements, risk classifications, and how the Act applies to your business.",
            examples=[
                "What are the main risk categories in the EU AI Act?",
                "How does the EU AI Act affect my e-commerce business?",
                "What are the compliance requirements for high-risk AI systems?",
                "Can you explain the prohibited AI practices?"
            ],
            # Note: the *_btn keyword arguments are only accepted by older Gradio
            # ChatInterface versions and may need to be removed on newer releases.
            retry_btn=True,
            undo_btn=True,
            clear_btn=True
        )
        return interface
    except Exception as e:
        logger.error(f"Failed to create interface: {e}")
        raise


if __name__ == "__main__":
    try:
        interface = create_gradio_interface()
        interface.launch(
            server_name="0.0.0.0",
            server_port=7860,
            share=False,
            debug=False
        )
    except Exception as e:
        logger.error(f"Failed to launch application: {e}")
        print(f"Error: {e}")
        print("Please check your configuration and try again.")