import gradio as gr
import os
import json
import hashlib
import datetime
from typing import List, Dict, Any, Optional
import requests
import time
import uuid
from pinecone import Pinecone

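# Configuration is read from environment variables (all referenced in the code below):
#   PINECONE_API_KEY          - required for vector storage / integrated inference
#   PINECONE_ENVIRONMENT      - optional, defaults to "us-east-1"
#   PINECONE_INDEX_NAME       - optional, defaults to a per-session generated name
#   PINECONE_EMBEDDING_MODEL  - optional, defaults to "multilingual-e5-large"
#   PINECONE_RERANK_MODEL     - optional, defaults to "pinecone-rerank-v0"
#   OPENROUTER_API_KEY        - required for LLM responses via OpenRouter
#   MODEL_NAME                - optional, defaults to "meta-llama/llama-4-maverick:free"
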
class RAGMemorySystem:
    """RAG system using Pinecone with integrated inference for embeddings and vector storage"""

    def __init__(self):
        self.pinecone_api_key = os.getenv("PINECONE_API_KEY")
        self.pinecone_environment = os.getenv("PINECONE_ENVIRONMENT", "us-east-1")

        # Generate a unique per-session index name unless one is provided explicitly.
        timestamp = datetime.datetime.now().strftime("%Y%m%d%H%M")
        user_id = str(uuid.uuid4())[:8]
        self.index_name = os.getenv("PINECONE_INDEX_NAME", f"ai-experiences-{timestamp}-{user_id}")

        # Pinecone-hosted models used for embedding and reranking.
        self.embedding_model = os.getenv("PINECONE_EMBEDDING_MODEL", "multilingual-e5-large")
        self.rerank_model = os.getenv("PINECONE_RERANK_MODEL", "pinecone-rerank-v0")

        # OpenRouter settings for the chat model.
        self.openrouter_api_key = os.getenv("OPENROUTER_API_KEY")
        self.model_name = os.getenv("MODEL_NAME", "meta-llama/llama-4-maverick:free")

        self.pc = None
        self.index = None

        self.init_pinecone()

    def init_pinecone(self):
        """Initialize Pinecone connection with integrated inference"""
        try:
            if self.pinecone_api_key:
                self.pc = Pinecone(api_key=self.pinecone_api_key)
                print("Attempting to connect to Pinecone...")

                try:
                    existing_indexes = [idx.name for idx in self.pc.list_indexes()]
                    print(f"Existing indexes: {existing_indexes}")
                except Exception as list_error:
                    print(f"Error listing indexes: {list_error}")
                    existing_indexes = []

                if self.index_name not in existing_indexes:
                    print(f"Creating new Pinecone index with integrated inference: {self.index_name}")
                    try:
                        # Preferred path: an index with integrated inference, so text is
                        # embedded automatically on upsert and search.
                        index_model = self.pc.create_index_for_model(
                            name=self.index_name,
                            cloud="aws",
                            region="us-east-1",
                            embed={
                                "model": self.embedding_model,
                                "field_map": {"text": "content"}
                            }
                        )
                        print(f"Successfully created index with integrated inference: {self.index_name}")
                        print(f"Index details: {index_model}")

                        # Give the new index a moment to become ready.
                        time.sleep(10)

                    except Exception as create_error:
                        print(f"Error creating index with integrated inference: {create_error}")

                        # First fallback: a traditional serverless index sized for
                        # multilingual-e5-large embeddings (1024 dimensions).
                        try:
                            self.pc.create_index(
                                name=self.index_name,
                                dimension=1024,
                                metric="cosine",
                                spec={
                                    "serverless": {
                                        "cloud": "aws",
                                        "region": "us-east-1"
                                    }
                                }
                            )
                            print(f"Created fallback traditional index: {self.index_name}")
                            time.sleep(5)
                        except Exception as fallback_error:
                            print(f"Failed to create fallback index: {fallback_error}")

                            # Last resort: retry with a short, freshly generated name in
                            # case the original name was the problem.
                            simple_name = f"ai-exp-{str(uuid.uuid4())[:6]}"
                            try:
                                self.pc.create_index(
                                    name=simple_name,
                                    dimension=1024,
                                    metric="cosine",
                                    spec={
                                        "serverless": {
                                            "cloud": "aws",
                                            "region": "us-east-1"
                                        }
                                    }
                                )
                                self.index_name = simple_name
                                print(f"Created simple fallback index: {self.index_name}")
                                time.sleep(5)
                            except Exception as final_error:
                                print(f"Final index creation failed: {final_error}")
                                self.index = None
                                return

                try:
                    self.index = self.pc.Index(self.index_name)
                    print(f"Successfully connected to Pinecone index: {self.index_name}")

                    stats = self.index.describe_index_stats()
                    print(f"Index stats: {stats}")

                except Exception as connect_error:
                    print(f"Error connecting to index: {connect_error}")
                    self.index = None

            else:
                print("Warning: Pinecone API key not found. Memory storage disabled.")
                self.index = None

        except Exception as e:
            print(f"Error initializing Pinecone: {e}")
            self.index = None

    def create_embedding(self, text: str) -> List[float]:
        """Create embedding using Pinecone's inference API"""
        try:
            if not self.pc:
                print("Pinecone client not available, returning zero vector")
                return [0.0] * 1024

            response = self.pc.inference.embed(
                model=self.embedding_model,
                inputs=[text],
                parameters={
                    "input_type": "passage",
                    "truncate": "END"
                }
            )

            if response and len(response.data) > 0:
                return response.data[0].values
            else:
                print("No embedding data received, returning zero vector")
                return [0.0] * 1024

        except Exception as e:
            print(f"Error creating embedding with Pinecone inference: {e}")
            return [0.0] * 1024

    def create_query_embedding(self, text: str) -> List[float]:
        """Create embedding for query using Pinecone's inference API"""
        try:
            if not self.pc:
                print("Pinecone client not available, returning zero vector")
                return [0.0] * 1024

            response = self.pc.inference.embed(
                model=self.embedding_model,
                inputs=[text],
                parameters={
                    "input_type": "query",
                    "truncate": "END"
                }
            )

            if response and len(response.data) > 0:
                return response.data[0].values
            else:
                print("No embedding data received, returning zero vector")
                return [0.0] * 1024

        except Exception as e:
            print(f"Error creating query embedding with Pinecone inference: {e}")
            return [0.0] * 1024

    def store_experience(self, user_input: str, ai_response: str, context: str = "") -> str:
        """Store conversation experience in Pinecone using integrated inference"""
        if not self.index:
            return "Memory storage not available (Pinecone not configured)"

        try:
            # Derive a unique ID from the interaction content plus a timestamp and UUID.
            experience_id = hashlib.md5(
                f"{user_input}_{ai_response}_{datetime.datetime.now()}_{uuid.uuid4()}".encode()
            ).hexdigest()

            combined_text = f"User: {user_input}\nAI: {ai_response}\nContext: {context}"

            try:
                # Preferred path: let the index embed the "content" field itself
                # (integrated inference). Depending on the SDK version, upsert_records
                # may expect different arguments; any failure falls through to the
                # manual-embedding path below.
                record = {
                    "id": experience_id,
                    "content": combined_text,
                    "metadata": {
                        "user_input": user_input[:1000],
                        "ai_response": ai_response[:1000],
                        "context": context[:500],
                        "timestamp": datetime.datetime.now().isoformat(),
                        "interaction_type": "conversation",
                        "session_id": getattr(self, 'session_id', 'default')
                    }
                }

                self.index.upsert_records([record])
                return f"✅ Experience stored with integrated inference, ID: {experience_id[:8]}... in index: {self.index_name}"

            except Exception as integrated_error:
                print(f"Integrated inference failed: {integrated_error}")

                # Fallback: embed the text manually and upsert a traditional vector.
                embedding = self.create_embedding(combined_text)

                self.index.upsert([(experience_id, embedding, {
                    "user_input": user_input[:1000],
                    "ai_response": ai_response[:1000],
                    "context": context[:500],
                    "timestamp": datetime.datetime.now().isoformat(),
                    "interaction_type": "conversation",
                    "session_id": getattr(self, 'session_id', 'default')
                })])

                return f"✅ Experience stored with manual embedding, ID: {experience_id[:8]}... in index: {self.index_name}"

        except Exception as e:
            return f"❌ Error storing experience: {e}"

    def retrieve_relevant_experiences(self, query: str, top_k: int = 3) -> List[Dict]:
        """Retrieve relevant past experiences based on query using Pinecone inference"""
        if not self.index:
            return []

        try:
            # Preferred path: integrated-inference search, where Pinecone embeds the
            # query text server-side. If this fails (e.g. on a traditional index),
            # fall back to a manual vector query below.
            try:
                results = self.index.search_records(
                    query={
                        "top_k": top_k,
                        "inputs": {"text": query}
                    },
                    include_metadata=True
                )

                relevant_experiences = []
                if hasattr(results, 'matches'):
                    for match in results.matches:
                        # Keep only reasonably similar matches.
                        if match.score > 0.3:
                            relevant_experiences.append({
                                "score": match.score,
                                "user_input": match.metadata.get("user_input", ""),
                                "ai_response": match.metadata.get("ai_response", ""),
                                "context": match.metadata.get("context", ""),
                                "timestamp": match.metadata.get("timestamp", ""),
                                "id": match.id
                            })

                return relevant_experiences

            except Exception as integrated_error:
                print(f"Integrated search failed: {integrated_error}")

                # Fallback: embed the query locally and run a traditional vector query.
                query_embedding = self.create_query_embedding(query)

                results = self.index.query(
                    vector=query_embedding,
                    top_k=top_k,
                    include_metadata=True
                )

                relevant_experiences = []
                for match in results.matches:
                    if match.score > 0.3:
                        relevant_experiences.append({
                            "score": match.score,
                            "user_input": match.metadata.get("user_input", ""),
                            "ai_response": match.metadata.get("ai_response", ""),
                            "context": match.metadata.get("context", ""),
                            "timestamp": match.metadata.get("timestamp", ""),
                            "id": match.id
                        })

                return relevant_experiences

        except Exception as e:
            print(f"Error retrieving experiences: {e}")
            return []

    def rerank_results(self, query: str, documents: List[str]) -> List[Dict]:
        """Rerank results using Pinecone's reranking model"""
        if not self.pc or not documents:
            return []

        try:
            # Pinecone's inference rerank API limits the number of results via top_n.
            response = self.pc.inference.rerank(
                model=self.rerank_model,
                query=query,
                documents=documents,
                top_n=min(len(documents), 5)
            )

            reranked_results = []
            if response and hasattr(response, 'data'):
                for result in response.data:
                    reranked_results.append({
                        "document": result.document.text,
                        "score": result.relevance_score,
                        "index": result.index
                    })

            return reranked_results

        except Exception as e:
            print(f"Error reranking results: {e}")
            return []

    def call_openrouter(self, messages: List[Dict], temperature: float = 0.7) -> str:
        """Call OpenRouter API"""
        if not self.openrouter_api_key:
            return "Error: OpenRouter API key not configured. Please set the OPENROUTER_API_KEY environment variable."

        try:
            headers = {
                "Authorization": f"Bearer {self.openrouter_api_key}",
                "Content-Type": "application/json",
                "HTTP-Referer": "https://huggingface.co",
                "X-Title": "AI RAG Memory System"
            }

            data = {
                "model": self.model_name,
                "messages": messages,
                "temperature": temperature,
                "max_tokens": 1000
            }

            response = requests.post(
                "https://openrouter.ai/api/v1/chat/completions",
                headers=headers,
                json=data,
                timeout=30
            )

            if response.status_code == 200:
                result = response.json()
                return result["choices"][0]["message"]["content"]
            else:
                return f"API Error: {response.status_code} - {response.text}"

        except Exception as e:
            return f"Error calling OpenRouter: {e}"

    def generate_response_with_rag(self, user_input: str, conversation_history: List = None) -> tuple:
        """Generate AI response using RAG with stored experiences and Pinecone inference"""
        relevant_experiences = self.retrieve_relevant_experiences(user_input)

        # Build a context block from the retrieved (and optionally reranked) experiences.
        context_parts = []
        if relevant_experiences:
            context_parts.append("🧠 Relevant past experiences (powered by Pinecone inference):")

            documents = [f"User: {exp['user_input']} AI: {exp['ai_response']}" for exp in relevant_experiences]

            reranked = self.rerank_results(user_input, documents)

            if reranked:
                context_parts.append(f"\n📊 Reranked results using {self.rerank_model}:")
                for i, result in enumerate(reranked, 1):
                    context_parts.append(f"{i}. (Relevance: {result['score']:.3f}) {result['document'][:200]}...")
            else:
                # Fall back to the raw similarity ordering if reranking is unavailable.
                for i, exp in enumerate(relevant_experiences, 1):
                    context_parts.append(f"\n{i}. Previous interaction (similarity: {exp['score']:.2f}):")
                    context_parts.append(f"   👤 User: {exp['user_input'][:200]}...")
                    context_parts.append(f"   🤖 AI: {exp['ai_response'][:200]}...")
                    context_parts.append(f"   📅 Time: {exp['timestamp'][:19]}")
                    if exp['context']:
                        context_parts.append(f"   📝 Context: {exp['context'][:100]}...")
                    context_parts.append("")
        else:
            context_parts.append("🆕 No previous relevant experiences found. This is a fresh conversation!")

        context_str = "\n".join(context_parts)

        # System prompt that exposes the retrieved context to the model.
        messages = [
            {
                "role": "system",
                "content": f"""You are an AI assistant with access to your past experiences and interactions through Pinecone's vector database with integrated inference.
The embeddings are generated using {self.embedding_model} and results are reranked with {self.rerank_model}.

Use the relevant past experiences below to inform your response, but don't just repeat them - learn from them and provide thoughtful, personalized responses.

{context_str}

Guidelines:
- Reference past experiences when relevant and helpful
- Show that you remember and learn from interactions using Pinecone's memory system
- Provide helpful, contextual responses
- Be conversational and engaging
- If you see similar questions from before, build upon previous responses
- Acknowledge when you're learning something new through the memory system"""
            }
        ]

        # Include the last few turns of the current conversation.
        if conversation_history:
            for msg in conversation_history[-5:]:
                messages.append(msg)

        messages.append({"role": "user", "content": user_input})

        ai_response = self.call_openrouter(messages)

        # Persist this interaction so it can be retrieved in future turns.
        storage_result = self.store_experience(user_input, ai_response, context_str)

        return ai_response, context_str, storage_result

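# A single shared RAG system instance is created at import time and reused by the UI.
# A minimal, illustrative programmatic use without the Gradio UI would look like:
#
#     rag = RAGMemorySystem()
#     reply, context_used, storage_info = rag.generate_response_with_rag("Hello!")
#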
rag_system = RAGMemorySystem()

def chat_with_rag(message: str, history: List = None) -> tuple:
    """Main chat function for Gradio interface"""
    if not message.strip():
        return "Please enter a message.", "", ""

    # Convert Gradio's (user, assistant) tuple history into OpenAI-style messages.
    conversation_history = []
    if history:
        for user_msg, ai_msg in history:
            if user_msg:
                conversation_history.append({"role": "user", "content": user_msg})
            if ai_msg:
                conversation_history.append({"role": "assistant", "content": ai_msg})

    ai_response, context_used, storage_info = rag_system.generate_response_with_rag(
        message, conversation_history
    )

    return ai_response, context_used, storage_info

def clear_conversation():
    """Clear the conversation history"""
    return [], "", "", ""

def get_system_status():
    """Get current system status"""
    status = []

    if rag_system.index:
        try:
            stats = rag_system.index.describe_index_stats()
            status.append(f"✅ Pinecone: Connected to '{rag_system.index_name}'")
            status.append(f"📊 Stored experiences: {stats.get('total_vector_count', 0)}")
            status.append(f"🧠 Embedding model: {rag_system.embedding_model}")
            status.append(f"🔄 Reranking model: {rag_system.rerank_model}")
        except Exception:
            status.append("⚠️ Pinecone: Connected but cannot get stats")
    else:
        status.append("❌ Pinecone: Not connected")

    if rag_system.openrouter_api_key:
        status.append("✅ OpenRouter: API key configured")
        status.append(f"🤖 Model: {rag_system.model_name}")
    else:
        status.append("❌ OpenRouter: API key not configured")

    return "\n".join(status)

with gr.Blocks(
    title="AI with Pinecone Integrated Inference RAG",
    theme=gr.themes.Soft(),
    css="""
    .container { max-width: 1200px; margin: auto; }
    .chat-container { height: 400px; overflow-y: auto; }
    .context-box { background-color: #f8f9fa; padding: 10px; border-radius: 5px; font-family: monospace; }
    .status-box { background-color: #e8f4f8; padding: 10px; border-radius: 5px; font-family: monospace; }
    """
) as demo:

    gr.HTML("""
    <div style="text-align: center; padding: 20px;">
        <h1>🤖 AI Assistant with Pinecone Integrated Inference RAG</h1>
        <p>This AI assistant uses Pinecone's integrated inference for embeddings and reranking with vector storage for memory.</p>
        <p>Powered by <strong>multilingual-e5-large</strong> embeddings and <strong>pinecone-rerank-v0</strong> reranking model.</p>
        <p><strong>🚀 Auto-Index Creation:</strong> The system automatically creates a new Pinecone index with integrated inference!</p>
    </div>
    """)

    with gr.Row():
        with gr.Column():
            status_display = gr.Textbox(
                label="🔧 System Status",
                value=get_system_status(),
                lines=8,
                interactive=False,
                elem_classes=["status-box"]
            )
            refresh_status_btn = gr.Button("🔄 Refresh Status", variant="secondary")

    with gr.Row():
        with gr.Column(scale=2):
            chatbot = gr.Chatbot(
                label="Conversation",
                height=400,
                elem_classes=["chat-container"]
            )

            with gr.Row():
                msg = gr.Textbox(
                    placeholder="Type your message here...",
                    label="Your Message",
                    lines=2,
                    scale=4
                )
                send_btn = gr.Button("Send", variant="primary", scale=1)
                clear_btn = gr.Button("Clear", variant="secondary", scale=1)

        with gr.Column(scale=1):
            gr.HTML("<h3>📚 RAG Context</h3>")
            context_display = gr.Textbox(
                label="Retrieved & Reranked Experiences",
                lines=15,
                interactive=False,
                elem_classes=["context-box"]
            )

            storage_info = gr.Textbox(
                label="Memory Storage Info",
                lines=3,
                interactive=False
            )

    with gr.Row():
        with gr.Column():
            gr.HTML("""
            <div style="margin-top: 20px; padding: 15px; background-color: #e8f4f8; border-radius: 8px;">
                <h3>🔧 Configuration</h3>
                <p><strong>Pinecone:</strong> ✅ Auto-configured with integrated inference</p>
                <p><strong>Embedding Model:</strong> multilingual-e5-large (1024 dimensions)</p>
                <p><strong>Reranking Model:</strong> pinecone-rerank-v0</p>
                <p><strong>OpenRouter:</strong> Set <code>OPENROUTER_API_KEY</code> environment variable</p>
                <br>
                <p><strong>🌟 Pinecone Integrated Inference Features:</strong></p>
                <ul>
                    <li>🧠 Automatic text-to-vector conversion during upsert and search</li>
                    <li>🔍 Smart retrieval with multilingual embeddings</li>
                    <li>📊 Advanced reranking for improved relevance</li>
                    <li>📈 Learning and improvement over time</li>
                    <li>🆕 Unique index creation for each session</li>
                    <li>⚡ Single API for embedding, storage, and retrieval</li>
                </ul>
                <br>
                <p><strong>Model Options:</strong></p>
                <ul>
                    <li><code>multilingual-e5-large</code> - Multilingual embeddings (default)</li>
                    <li><code>pinecone-rerank-v0</code> - Pinecone's reranking model (default)</li>
                    <li><code>cohere-rerank-v3.5</code> - Cohere's reranking model</li>
                    <li><code>pinecone-sparse-english-v0</code> - Sparse embeddings for keyword search</li>
                </ul>
            </div>
            """)

    def respond(message, history):
        if not message:
            return history, "", "", ""

        ai_response, context_used, storage_info_text = chat_with_rag(message, history)

        # Append the new turn to the chat history and clear the input box.
        if history is None:
            history = []
        history.append((message, ai_response))

        return history, "", context_used, storage_info_text

    send_btn.click(
        respond,
        inputs=[msg, chatbot],
        outputs=[chatbot, msg, context_display, storage_info]
    )

    msg.submit(
        respond,
        inputs=[msg, chatbot],
        outputs=[chatbot, msg, context_display, storage_info]
    )

    clear_btn.click(
        clear_conversation,
        outputs=[chatbot, msg, context_display, storage_info]
    )

    refresh_status_btn.click(
        get_system_status,
        outputs=[status_display]
    )

if __name__ == "__main__":
    demo.launch(
        share=True,
        server_name="0.0.0.0",
        server_port=7860
    )
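# Note: share=True asks Gradio to open a temporary public share link in addition to
# serving on 0.0.0.0:7860; on hosting platforms that already expose port 7860
# (for example Hugging Face Spaces), the share link is optional.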