import gradio as gr
import os
import json
import hashlib
import datetime
from typing import List, Dict, Any, Optional
import requests
import time
import uuid
from pinecone import Pinecone
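# Configuration is read from environment variables (see RAGMemorySystem.__init__):
# PINECONE_API_KEY and OPENROUTER_API_KEY enable memory storage and chat respectively;
# PINECONE_ENVIRONMENT, PINECONE_INDEX_NAME, PINECONE_EMBEDDING_MODEL,
# PINECONE_RERANK_MODEL and MODEL_NAME are optional overrides.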
class RAGMemorySystem:
"""RAG system using Pinecone with integrated inference for embeddings and vector storage"""
def __init__(self):
        # Initialize Pinecone - read the API key from the environment variable
self.pinecone_api_key = os.getenv("PINECONE_API_KEY")
self.pinecone_environment = os.getenv("PINECONE_ENVIRONMENT", "us-east-1") # Serverless doesn't need specific environment
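        # Note: this value is currently informational only; the serverless index creation
        # calls below hard-code cloud="aws" and region="us-east-1".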
# Generate unique index name with timestamp to avoid conflicts
timestamp = datetime.datetime.now().strftime("%Y%m%d%H%M")
user_id = str(uuid.uuid4())[:8]
self.index_name = os.getenv("PINECONE_INDEX_NAME", f"ai-experiences-{timestamp}-{user_id}")
# Pinecone inference model configuration
self.embedding_model = os.getenv("PINECONE_EMBEDDING_MODEL", "multilingual-e5-large")
self.rerank_model = os.getenv("PINECONE_RERANK_MODEL", "pinecone-rerank-v0")
# Initialize OpenRouter
self.openrouter_api_key = os.getenv("OPENROUTER_API_KEY")
self.model_name = os.getenv("MODEL_NAME", "meta-llama/llama-4-maverick:free")
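        # MODEL_NAME accepts any OpenRouter model identifier; the default targets a free route.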
# Initialize Pinecone client
self.pc = None
self.index = None
# Initialize Pinecone
self.init_pinecone()
def init_pinecone(self):
"""Initialize Pinecone connection with integrated inference"""
try:
if self.pinecone_api_key:
# Initialize Pinecone client
self.pc = Pinecone(api_key=self.pinecone_api_key)
print(f"Attempting to connect to Pinecone...")
# Check existing indexes
try:
existing_indexes = [idx.name for idx in self.pc.list_indexes()]
print(f"Existing indexes: {existing_indexes}")
except Exception as list_error:
print(f"Error listing indexes: {list_error}")
existing_indexes = []
# Create index with integrated inference if it doesn't exist
if self.index_name not in existing_indexes:
print(f"Creating new Pinecone index with integrated inference: {self.index_name}")
try:
# Create index with integrated embedding model
index_model = self.pc.create_index_for_model(
name=self.index_name,
cloud="aws",
region="us-east-1",
embed={
"model": self.embedding_model,
"field_map": {"text": "content"} # Map 'text' field to 'content' field
}
)
print(f"Successfully created index with integrated inference: {self.index_name}")
print(f"Index details: {index_model}")
# Wait for index to be ready
time.sleep(10)
except Exception as create_error:
print(f"Error creating index with integrated inference: {create_error}")
# Fallback to traditional index creation
try:
self.pc.create_index(
name=self.index_name,
dimension=1024, # multilingual-e5-large dimension
metric="cosine",
spec={
"serverless": {
"cloud": "aws",
"region": "us-east-1"
}
}
)
print(f"Created fallback traditional index: {self.index_name}")
time.sleep(5)
except Exception as fallback_error:
print(f"Failed to create fallback index: {fallback_error}")
# Try with simpler name
simple_name = f"ai-exp-{str(uuid.uuid4())[:6]}"
try:
self.pc.create_index(
name=simple_name,
dimension=1024,
metric="cosine",
spec={
"serverless": {
"cloud": "aws",
"region": "us-east-1"
}
}
)
self.index_name = simple_name
print(f"Created simple fallback index: {self.index_name}")
time.sleep(5)
except Exception as final_error:
print(f"Final index creation failed: {final_error}")
self.index = None
return
# Connect to the index
try:
self.index = self.pc.Index(self.index_name)
print(f"Successfully connected to Pinecone index: {self.index_name}")
# Test the connection
stats = self.index.describe_index_stats()
print(f"Index stats: {stats}")
except Exception as connect_error:
print(f"Error connecting to index: {connect_error}")
self.index = None
else:
print("Warning: Pinecone API key not found. Memory storage disabled.")
self.index = None
except Exception as e:
print(f"Error initializing Pinecone: {e}")
self.index = None
def create_embedding(self, text: str) -> List[float]:
"""Create embedding using Pinecone's inference API"""
try:
if not self.pc:
print("Pinecone client not available, returning zero vector")
return [0.0] * 1024
# Use Pinecone's inference API for embeddings
response = self.pc.inference.embed(
model=self.embedding_model,
inputs=[text],
parameters={
"input_type": "passage", # Use 'passage' for storing, 'query' for searching
"truncate": "END"
}
)
if response and len(response.data) > 0:
return response.data[0].values
else:
print("No embedding data received, returning zero vector")
return [0.0] * 1024
except Exception as e:
print(f"Error creating embedding with Pinecone inference: {e}")
return [0.0] * 1024 # Return zero vector as fallback
def create_query_embedding(self, text: str) -> List[float]:
"""Create embedding for query using Pinecone's inference API"""
try:
if not self.pc:
print("Pinecone client not available, returning zero vector")
return [0.0] * 1024
# Use Pinecone's inference API for query embeddings
response = self.pc.inference.embed(
model=self.embedding_model,
inputs=[text],
parameters={
"input_type": "query", # Use 'query' for searching
"truncate": "END"
}
)
if response and len(response.data) > 0:
return response.data[0].values
else:
print("No embedding data received, returning zero vector")
return [0.0] * 1024
except Exception as e:
print(f"Error creating query embedding with Pinecone inference: {e}")
return [0.0] * 1024 # Return zero vector as fallback
def store_experience(self, user_input: str, ai_response: str, context: str = "") -> str:
"""Store conversation experience in Pinecone using integrated inference"""
if not self.index:
return "Memory storage not available (Pinecone not configured)"
try:
# Create a unique ID for this experience
experience_id = hashlib.md5(
f"{user_input}_{ai_response}_{datetime.datetime.now()}_{uuid.uuid4()}".encode()
).hexdigest()
# Create combined text for embedding
combined_text = f"User: {user_input}\nAI: {ai_response}\nContext: {context}"
# Check if index supports integrated inference
try:
# Try using integrated inference first (if index was created with create_index_for_model)
record = {
"id": experience_id,
"content": combined_text, # This will be automatically embedded
"metadata": {
"user_input": user_input[:1000],
"ai_response": ai_response[:1000],
"context": context[:500],
"timestamp": datetime.datetime.now().isoformat(),
"interaction_type": "conversation",
"session_id": getattr(self, 'session_id', 'default')
}
}
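                # Assumption: upsert_records() may expect a namespace argument and flat record
                # fields (rather than a nested "metadata" dict) depending on the installed
                # Pinecone SDK version; if this call raises, the except branch below falls
                # back to a manual embed + upsert.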
# Try upsert with integrated inference
self.index.upsert_records([record])
return f"βœ… Experience stored with integrated inference, ID: {experience_id[:8]}... in index: {self.index_name}"
except Exception as integrated_error:
print(f"Integrated inference failed: {integrated_error}")
# Fallback to manual embedding
embedding = self.create_embedding(combined_text)
# Store in Pinecone with manual embedding
self.index.upsert([(experience_id, embedding, {
"user_input": user_input[:1000],
"ai_response": ai_response[:1000],
"context": context[:500],
"timestamp": datetime.datetime.now().isoformat(),
"interaction_type": "conversation",
"session_id": getattr(self, 'session_id', 'default')
})])
return f"βœ… Experience stored with manual embedding, ID: {experience_id[:8]}... in index: {self.index_name}"
except Exception as e:
return f"❌ Error storing experience: {e}"
def retrieve_relevant_experiences(self, query: str, top_k: int = 3) -> List[Dict]:
"""Retrieve relevant past experiences based on query using Pinecone inference"""
if not self.index:
return []
try:
# Try integrated search first
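            # Assumption: search_records() lets an integrated-inference index embed the
            # query text itself; exact argument names vary across SDK versions, so any
            # failure here falls through to the manual-embedding query below.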
try:
results = self.index.search_records(
query={
"top_k": top_k,
"inputs": {"text": query}
},
include_metadata=True
)
relevant_experiences = []
if hasattr(results, 'matches'):
for match in results.matches:
if match.score > 0.3:
relevant_experiences.append({
"score": match.score,
"user_input": match.metadata.get("user_input", ""),
"ai_response": match.metadata.get("ai_response", ""),
"context": match.metadata.get("context", ""),
"timestamp": match.metadata.get("timestamp", ""),
"id": match.id
})
return relevant_experiences
except Exception as integrated_error:
print(f"Integrated search failed: {integrated_error}")
# Fallback to manual embedding + query
query_embedding = self.create_query_embedding(query)
# Search Pinecone with manual embedding
results = self.index.query(
vector=query_embedding,
top_k=top_k,
include_metadata=True
)
relevant_experiences = []
for match in results.matches:
if match.score > 0.3:
relevant_experiences.append({
"score": match.score,
"user_input": match.metadata.get("user_input", ""),
"ai_response": match.metadata.get("ai_response", ""),
"context": match.metadata.get("context", ""),
"timestamp": match.metadata.get("timestamp", ""),
"id": match.id
})
return relevant_experiences
except Exception as e:
print(f"Error retrieving experiences: {e}")
return []
def rerank_results(self, query: str, documents: List[str]) -> List[Dict]:
"""Rerank results using Pinecone's reranking model"""
if not self.pc or not documents:
return []
try:
# Use Pinecone's inference API for reranking
response = self.pc.inference.rerank(
model=self.rerank_model,
query=query,
documents=documents,
top_k=min(len(documents), 5) # Rerank top 5
)
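            # Assumption: each entry in response.data exposes .index, .relevance_score and
            # .document.text; if a given SDK version names these differently, the except
            # block below returns an empty list and the caller falls back to raw results.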
reranked_results = []
if response and hasattr(response, 'data'):
for result in response.data:
reranked_results.append({
"document": result.document.text,
"score": result.relevance_score,
"index": result.index
})
return reranked_results
except Exception as e:
print(f"Error reranking results: {e}")
return []
def call_openrouter(self, messages: List[Dict], temperature: float = 0.7) -> str:
"""Call OpenRouter API"""
if not self.openrouter_api_key:
return "Error: OpenRouter API key not configured. Please set the OPENROUTER_API_KEY environment variable."
try:
headers = {
"Authorization": f"Bearer {self.openrouter_api_key}",
"Content-Type": "application/json",
"HTTP-Referer": "https://huggingface.co",
"X-Title": "AI RAG Memory System"
}
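            # The HTTP-Referer and X-Title headers above are optional OpenRouter headers
            # used to attribute the app in OpenRouter's usage statistics.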
data = {
"model": self.model_name,
"messages": messages,
"temperature": temperature,
"max_tokens": 1000
}
response = requests.post(
"https://openrouter.ai/api/v1/chat/completions",
headers=headers,
json=data,
timeout=30
)
if response.status_code == 200:
result = response.json()
return result["choices"][0]["message"]["content"]
else:
return f"API Error: {response.status_code} - {response.text}"
except Exception as e:
return f"Error calling OpenRouter: {e}"
def generate_response_with_rag(self, user_input: str, conversation_history: List = None) -> tuple:
"""Generate AI response using RAG with stored experiences and Pinecone inference"""
# Retrieve relevant past experiences
relevant_experiences = self.retrieve_relevant_experiences(user_input)
# Build context from relevant experiences
context_parts = []
if relevant_experiences:
context_parts.append("🧠 Relevant past experiences (powered by Pinecone inference):")
# Extract documents for reranking
documents = [f"User: {exp['user_input']} AI: {exp['ai_response']}" for exp in relevant_experiences]
# Try to rerank the results
reranked = self.rerank_results(user_input, documents)
if reranked:
context_parts.append(f"\nπŸ”„ Reranked results using {self.rerank_model}:")
for i, result in enumerate(reranked, 1):
context_parts.append(f"{i}. (Relevance: {result['score']:.3f}) {result['document'][:200]}...")
else:
# Fallback to original results
for i, exp in enumerate(relevant_experiences, 1):
context_parts.append(f"\n{i}. Previous interaction (similarity: {exp['score']:.2f}):")
context_parts.append(f" πŸ‘€ User: {exp['user_input'][:200]}...")
context_parts.append(f" πŸ€– AI: {exp['ai_response'][:200]}...")
context_parts.append(f" πŸ•’ Time: {exp['timestamp'][:19]}")
if exp['context']:
context_parts.append(f" πŸ“ Context: {exp['context'][:100]}...")
context_parts.append("")
else:
context_parts.append("πŸ†• No previous relevant experiences found. This is a fresh conversation!")
context_str = "\n".join(context_parts)
# Build messages for the AI
messages = [
{
"role": "system",
"content": f"""You are an AI assistant with access to your past experiences and interactions through Pinecone's vector database with integrated inference.
The embeddings are generated using {self.embedding_model} and results are reranked with {self.rerank_model}.
Use the relevant past experiences below to inform your response, but don't just repeat them - learn from them and provide thoughtful, personalized responses.
{context_str}
Guidelines:
- Reference past experiences when relevant and helpful
- Show that you remember and learn from interactions using Pinecone's memory system
- Provide helpful, contextual responses
- Be conversational and engaging
- If you see similar questions from before, build upon previous responses
- Acknowledge when you're learning something new through the memory system"""
}
]
# Add conversation history if provided
if conversation_history:
for msg in conversation_history[-5:]: # Last 5 messages
messages.append(msg)
# Add current user input
messages.append({"role": "user", "content": user_input})
# Generate response
ai_response = self.call_openrouter(messages)
# Store this interaction as a new experience
storage_result = self.store_experience(user_input, ai_response, context_str)
return ai_response, context_str, storage_result
# Initialize the RAG system
rag_system = RAGMemorySystem()
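# A single module-level instance is shared by every Gradio session, so stored
# experiences are global to this deployment rather than per-user.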
def chat_with_rag(message: str, history: List = None) -> tuple:
"""Main chat function for Gradio interface"""
if not message.strip():
return "Please enter a message.", "", ""
# Convert Gradio history format to OpenAI format
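    # Assumes the default tuple-style Chatbot history: a list of (user_message, ai_message) pairs.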
conversation_history = []
if history:
for user_msg, ai_msg in history:
if user_msg:
conversation_history.append({"role": "user", "content": user_msg})
if ai_msg:
conversation_history.append({"role": "assistant", "content": ai_msg})
# Generate response with RAG
ai_response, context_used, storage_info = rag_system.generate_response_with_rag(
message, conversation_history
)
return ai_response, context_used, storage_info
def clear_conversation():
"""Clear the conversation history"""
return [], "", "", ""
def get_system_status():
"""Get current system status"""
status = []
# Check Pinecone connection
if rag_system.index:
try:
stats = rag_system.index.describe_index_stats()
status.append(f"βœ… Pinecone: Connected to '{rag_system.index_name}'")
status.append(f"πŸ“Š Stored experiences: {stats.get('total_vector_count', 0)}")
status.append(f"🧠 Embedding model: {rag_system.embedding_model}")
status.append(f"πŸ”„ Reranking model: {rag_system.rerank_model}")
        except Exception:
            status.append("⚠️ Pinecone: Connected but cannot get stats")
else:
status.append("❌ Pinecone: Not connected")
# Check OpenRouter
if rag_system.openrouter_api_key:
status.append(f"βœ… OpenRouter: API key configured")
status.append(f"πŸ€– Model: {rag_system.model_name}")
else:
status.append("❌ OpenRouter: API key not configured")
return "\n".join(status)
# Create Gradio interface
with gr.Blocks(
title="AI with Pinecone Integrated Inference RAG",
theme=gr.themes.Soft(),
css="""
.container { max-width: 1200px; margin: auto; }
.chat-container { height: 400px; overflow-y: auto; }
.context-box { background-color: #f8f9fa; padding: 10px; border-radius: 5px; font-family: monospace; }
.status-box { background-color: #e8f4f8; padding: 10px; border-radius: 5px; font-family: monospace; }
"""
) as demo:
gr.HTML("""
<div style="text-align: center; padding: 20px;">
        <h1>🤖 AI Assistant with Pinecone Integrated Inference RAG</h1>
<p>This AI assistant uses Pinecone's integrated inference for embeddings and reranking with vector storage for memory.</p>
<p>Powered by <strong>multilingual-e5-large</strong> embeddings and <strong>pinecone-rerank-v0</strong> reranking model.</p>
        <p><strong>🆕 Auto-Index Creation:</strong> The system automatically creates a new Pinecone index with integrated inference!</p>
</div>
""")
# System Status
with gr.Row():
with gr.Column():
status_display = gr.Textbox(
label="πŸ”§ System Status",
value=get_system_status(),
lines=8,
interactive=False,
elem_classes=["status-box"]
)
            refresh_status_btn = gr.Button("🔄 Refresh Status", variant="secondary")
with gr.Row():
with gr.Column(scale=2):
chatbot = gr.Chatbot(
label="Conversation",
height=400,
elem_classes=["chat-container"]
)
with gr.Row():
msg = gr.Textbox(
placeholder="Type your message here...",
label="Your Message",
lines=2,
scale=4
)
send_btn = gr.Button("Send", variant="primary", scale=1)
clear_btn = gr.Button("Clear", variant="secondary", scale=1)
with gr.Column(scale=1):
gr.HTML("<h3>πŸ“š RAG Context</h3>")
context_display = gr.Textbox(
label="Retrieved & Reranked Experiences",
lines=15,
interactive=False,
elem_classes=["context-box"]
)
storage_info = gr.Textbox(
label="Memory Storage Info",
lines=3,
interactive=False
)
with gr.Row():
with gr.Column():
gr.HTML("""
<div style="margin-top: 20px; padding: 15px; background-color: #e8f4f8; border-radius: 8px;">
        <h3>🔧 Configuration</h3>
        <p><strong>Pinecone:</strong> ✅ Auto-configured with integrated inference</p>
        <p><strong>Embedding Model:</strong> multilingual-e5-large (1024 dimensions)</p>
        <p><strong>Reranking Model:</strong> pinecone-rerank-v0</p>
        <p><strong>OpenRouter:</strong> Set <code>OPENROUTER_API_KEY</code> environment variable</p>
        <br>
        <p><strong>🚀 Pinecone Integrated Inference Features:</strong></p>
        <ul>
            <li>🧠 Automatic text-to-vector conversion during upsert and search</li>
            <li>🔍 Smart retrieval with multilingual embeddings</li>
            <li>🔄 Advanced reranking for improved relevance</li>
            <li>📈 Learning and improvement over time</li>
            <li>🆔 Unique index creation for each session</li>
            <li>⚡ Single API for embedding, storage, and retrieval</li>
</ul>
<br>
<p><strong>Model Options:</strong></p>
<ul>
<li><code>multilingual-e5-large</code> - Multilingual embeddings (default)</li>
<li><code>pinecone-rerank-v0</code> - Pinecone's reranking model (default)</li>
<li><code>cohere-rerank-v3.5</code> - Cohere's reranking model</li>
<li><code>pinecone-sparse-english-v0</code> - Sparse embeddings for keyword search</li>
</ul>
</div>
""")
# Event handlers
def respond(message, history):
if not message:
return history, "", "", ""
# Get AI response
ai_response, context_used, storage_info_text = chat_with_rag(message, history)
# Update history
if history is None:
history = []
history.append((message, ai_response))
return history, "", context_used, storage_info_text
# Wire up the interface
send_btn.click(
respond,
inputs=[msg, chatbot],
outputs=[chatbot, msg, context_display, storage_info]
)
msg.submit(
respond,
inputs=[msg, chatbot],
outputs=[chatbot, msg, context_display, storage_info]
)
clear_btn.click(
clear_conversation,
outputs=[chatbot, msg, context_display, storage_info]
)
refresh_status_btn.click(
get_system_status,
outputs=[status_display]
)
# Launch the app
if __name__ == "__main__":
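    # share=True only matters for local runs; a hosted Hugging Face Space already serves the app publicly.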
demo.launch(
share=True,
server_name="0.0.0.0",
server_port=7860
)