# ─── Basic imports ─────────────────────────────────────────────
import os
import math
import sqlite3
import fitz  # PyMuPDF for PDF parsing
import re
from dotenv import load_dotenv

# Load environment variables from the .env file
load_dotenv()

from langgraph.graph import START, END, StateGraph, MessagesState
from langgraph.prebuilt import tools_condition, ToolNode
from langchain_core.tools import tool
from langchain.schema import Document, SystemMessage
#from langchain.chat_models import init_chat_model
#from langgraph.prebuilt import create_react_agent
#from langchain.vectorstores import Pinecone
from langchain.tools.retriever import create_retriever_tool
#import pinecone
#from pinecone import Pinecone as PineconeClient, ServerlessSpec
#from pinecone import Index  # the blocking-call client constructor
from langchain_community.vectorstores.pinecone import Pinecone as LC_Pinecone

# ─── LangChain frameworks ──────────────────────────────────────
from langchain.chat_models import ChatOpenAI
from langchain_groq import ChatGroq
from langchain_mistralai import ChatMistralAI
from langchain.agents import initialize_agent, AgentType
from langchain.chains import RetrievalQA
from langchain.embeddings import OpenAIEmbeddings
from langchain_community.embeddings import HuggingFaceEmbeddings
from langchain.vectorstores import FAISS
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.prompts import PromptTemplate
from langchain_community.document_loaders import TextLoader, PyMuPDFLoader
from langchain_community.document_loaders.wikipedia import WikipediaLoader
from langchain_community.document_loaders.arxiv import ArxivLoader
from langchain_experimental.tools.python.tool import PythonREPLTool

# ─── Memory ────────────────────────────────────────────────────
from langchain.tools import Tool
from typing import List, Dict, Callable
from langchain.schema import BaseMemory, AIMessage, HumanMessage
from langchain.llms.base import LLM
from langchain.memory.chat_memory import BaseChatMemory
from pydantic import PrivateAttr
from langchain_core.messages import get_buffer_string

# ─── Image processing ──────────────────────────────────────────
from PIL import Image, UnidentifiedImageError
import pytesseract
from transformers import pipeline, TrOCRProcessor, VisionEncoderDecoderModel
from groq import Groq
import requests
import base64
from io import BytesIO

# ─── Browser / search ──────────────────────────────────────────
import json
import time
import random
import logging
from functools import lru_cache, wraps
from duckduckgo_search import DDGS
from bs4 import BeautifulSoup
import tenacity
from tenacity import retry, stop_after_attempt, wait_exponential

# Initialize logger
logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

# Additional imports for file, audio, and article handling
import pandas as pd
from PyPDF2 import PdfReader
import docx
import speech_recognition as sr
from pydub import AudioSegment
from pytube import YouTube
from newspaper import Article
from langchain_community.document_loaders.youtube import YoutubeLoader, TranscriptFormat

# Attempt to import Playwright for dynamic page rendering
try:
    from playwright.sync_api import sync_playwright
    _playwright_available = True
except ImportError:
    _playwright_available = False

# Forbidden keywords for basic NSFW filtering
_forbidden = ["porn", "sex", "xxx", "nude", "erotic", "nsfw", "explicit"]

# ─── LLM setup ─────────────────────────────────────────────────
# API keys from the .env file (required for the LLMs and embeddings)
os.environ.setdefault("OPENAI_API_KEY", "<YOUR_OPENAI_KEY>")  # set your own key or env var
os.environ["GROQ_API_KEY"] = os.getenv("GROQ_API_KEY", "default_key_or_placeholder")
os.environ["MISTRAL_API_KEY"] = os.getenv("MISTRAL_API_KEY", "default_key_or_placeholder")
# Tavily API key
TAVILY_API_KEY = os.getenv("TAVILY_API_KEY", "default_key_or_placeholder")

# Globals for the RAG system
vector_store = None
rag_chain = None
DB_PATH = None   # set when a .db file is uploaded
DOC_PATH = None  # set when a document is uploaded
IMG_PATH = None  # set when an image is uploaded
OTH_PATH = None  # set when any other file is uploaded

# ─── LLMs ──────────────────────────────────────────────────────
#llm = ChatOpenAI(model_name="gpt-3.5-turbo", streaming=True, temperature=0)
# Import the RetryingChatGroq client and use the retrying version instead
from retry_groq import RetryingChatGroq
llm = RetryingChatGroq(model="deepseek-r1-distill-llama-70b", streaming=False, temperature=0)
#llm = ChatMistralAI(model="mistral-large-latest", streaming=True, temperature=0)
# ───────────────────────────────────────────────────────────────
# ─────────────────────── Tool for multiply ─────────────────────
# ───────────────────────────────────────────────────────────────
def multiply(a: int, b: int) -> int:
    """
    Multiply two numbers.
    Args:
        a (int): The first factor.
        b (int): The second factor.
    Returns:
        int: The product of a and b.
    """
    try:
        # Direct calculation without relying on LangChain handling
        result = a * b
        return result
    except Exception as e:
        return f"Error in multiplication: {str(e)}"

# ───────────────────────────────────────────────────────────────
# ──────────────────────── Tool for add ─────────────────────────
# ───────────────────────────────────────────────────────────────
def add(a: int, b: int) -> int:
    """
    Add two numbers.
    Args:
        a (int): The first addend.
        b (int): The second addend.
    Returns:
        int: The sum of a and b.
    """
    try:
        # Direct calculation without relying on LangChain handling
        result = a + b
        return result
    except Exception as e:
        return f"Error in addition: {str(e)}"

# ───────────────────────────────────────────────────────────────
# ────────────────────── Tool for subtract ──────────────────────
# ───────────────────────────────────────────────────────────────
def subtract(a: int, b: int) -> int:
    """
    Subtract two numbers.
    Args:
        a (int): The number to subtract from.
        b (int): The number to subtract.
    Returns:
        int: The difference a - b.
    """
    try:
        # Direct calculation without relying on LangChain handling
        result = a - b
        return result
    except Exception as e:
        return f"Error in subtraction: {str(e)}"

# ───────────────────────────────────────────────────────────────
# ─────────────────────── Tool for divide ───────────────────────
# ───────────────────────────────────────────────────────────────
def divide(a: int, b: int) -> float:
    """
    Divide two numbers.
    Args:
        a (int): The numerator.
        b (int): The denominator.
    Returns:
        float: The result of a divided by b.
    Raises:
        ValueError: If b is zero.
    """
    try:
        if b == 0:
            return "Error: Cannot divide by zero."
        # Direct calculation without relying on LangChain handling
        result = a / b
        return result
    except Exception as e:
        return f"Error in division: {str(e)}"

# ───────────────────────────────────────────────────────────────
# ─────────────────────── Tool for modulus ──────────────────────
# ───────────────────────────────────────────────────────────────
def modulus(a: int, b: int) -> int:
    """
    Get the modulus (remainder) of two numbers.
    Args:
        a (int): The dividend.
        b (int): The divisor.
    Returns:
        int: The remainder when a is divided by b.
    """
    try:
        if b == 0:
            return "Error: Cannot calculate modulus with zero divisor."
        # Direct calculation without relying on LangChain handling
        result = a % b
        return result
    except Exception as e:
        return f"Error in modulus calculation: {str(e)}"
# ───────────────────────────────────────────────────────────────
# ────────────────────── Tool for browsing ──────────────────────
# ───────────────────────────────────────────────────────────────
def with_retry(max_attempts: int = 3, backoff_base: int = 2):
    """
    Decorator for retrying a function with exponential backoff on exception.
    """
    def decorator(fn):
        @wraps(fn)
        def wrapper(*args, **kwargs):
            for attempt in range(max_attempts):
                try:
                    return fn(*args, **kwargs)
                except Exception as e:
                    wait = backoff_base ** attempt + random.uniform(0, 1)
                    logger.warning(f"{fn.__name__} failed (attempt {attempt+1}/{max_attempts}): {e}")
                    if attempt < max_attempts - 1:
                        time.sleep(wait)
            logger.error(f"{fn.__name__} failed after {max_attempts} attempts.")
            return []
        return wrapper
    return decorator
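
# Illustrative only: a minimal sketch of how `with_retry` could wrap a flaky
# network call. `flaky_lookup` and its URL are hypothetical examples, not part
# of the toolset below.
#
#     @with_retry(max_attempts=3, backoff_base=2)
#     def flaky_lookup(query: str) -> list:
#         resp = requests.get("https://example.com/search", params={"q": query}, timeout=5)
#         resp.raise_for_status()
#         return resp.json().get("results", [])
#
# On repeated failure the decorator logs a warning per attempt, sleeps roughly
# 1s / 2s / 4s (plus jitter), and finally returns [] instead of raising.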
def tavily_search(query: str, top_k: int = 3) -> List[Dict]:
    """Call the Tavily API and return a list of result dicts."""
    if not TAVILY_API_KEY:
        logger.info("[Tavily] No API key set. Skipping Tavily search.")
        return []
    url = "https://api.tavily.com/search"
    headers = {
        "Authorization": f"Bearer {TAVILY_API_KEY}",
        "Content-Type": "application/json",
    }
    payload = {"query": query, "num_results": top_k}
    resp = requests.post(url, headers=headers, json=payload, timeout=10)
    resp.raise_for_status()
    data = resp.json()
    results = []
    for item in data.get("results", []):
        results.append({
            "title": item.get("title", ""),
            "url": item.get("url", ""),
            "content": item.get("content", "")[:200],
            "source": "Tavily"
        })
    return results

def duckduckgo_search(query: str, top_k: int = 3) -> List[Dict]:
    """Query DuckDuckGo and return up to top_k raw SERP hits."""
    results = []
    try:
        with DDGS(timeout=15) as ddgs:  # Increase timeout from the default
            for hit in ddgs.text(query, safesearch="On", max_results=top_k, timeout=15):
                results.append({
                    "title": hit.get("title", ""),
                    "url": hit.get("href") or hit.get("url", ""),
                    "content": hit.get("body", ""),
                    "source": "DuckDuckGo"
                })
                if len(results) >= top_k:
                    break
    except Exception as e:
        logger.warning(f"DuckDuckGo search failed: {e}")
        # Don't re-raise - just return empty results so fallbacks can work
    return results
# Additional fallback search alternative
def simple_google_search(query: str, top_k: int = 3) -> List[Dict]:
    """Simplified Google search as a fallback when other methods fail."""
    try:
        # Encode the query
        import urllib.parse
        import bs4
        encoded_query = urllib.parse.quote(query)
        url = f"https://www.google.com/search?q={encoded_query}"
        headers = {
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/96.0.4664.110 Safari/537.36",
            "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
            "Accept-Language": "en-US,en;q=0.5",
            "Referer": "https://www.google.com/",
            "Connection": "keep-alive",
        }
        response = requests.get(url, headers=headers, timeout=20)
        response.raise_for_status()
        soup = bs4.BeautifulSoup(response.text, "html.parser")
        results = []
        # Extract search results
        for result in soup.select("div.g")[:top_k]:
            title_elem = result.select_one("h3")
            link_elem = result.select_one("a")
            snippet_elem = result.select_one("div.VwiC3b")
            if title_elem and link_elem and snippet_elem and "href" in link_elem.attrs:
                href = link_elem["href"]
                if href.startswith("/url?q="):
                    href = href.split("/url?q=")[1].split("&")[0]
                if href.startswith("http"):
                    results.append({
                        "title": title_elem.get_text(),
                        "url": href,
                        "content": snippet_elem.get_text(),
                        "source": "Google"
                    })
        return results
    except Exception as e:
        logger.warning(f"Simple Google search failed: {e}")
        return []
def hybrid_search(query: str, top_k: int = 3) -> List[Dict]:
    """Combine multiple search sources with fallbacks."""
    # Try primary search methods first
    results = []
    # Start with Tavily if an API key is available
    if TAVILY_API_KEY and TAVILY_API_KEY != "default_key_or_placeholder":
        try:
            tavily_results = tavily_search(query, top_k)
            results.extend(tavily_results)
            logger.info(f"Retrieved {len(tavily_results)} results from Tavily")
        except Exception as e:
            logger.warning(f"Tavily search failed: {e}")
    # If we don't have enough results, try DuckDuckGo
    if len(results) < top_k:
        try:
            ddg_results = duckduckgo_search(query, top_k - len(results))
            results.extend(ddg_results)
            logger.info(f"Retrieved {len(ddg_results)} results from DuckDuckGo")
        except Exception as e:
            logger.warning(f"DuckDuckGo search failed: {e}")
    # If we still don't have enough results, try Google
    if len(results) < top_k:
        try:
            google_results = simple_google_search(query, top_k - len(results))
            results.extend(google_results)
            logger.info(f"Retrieved {len(google_results)} results from Google")
        except Exception as e:
            logger.warning(f"Google search failed: {e}")
    # If all search methods failed, return a dummy result
    if not results:
        results.append({
            "title": "Search Failed",
            "url": "",
            "content": f"Sorry, I couldn't find results for '{query}'. Please try refining your search terms or check your internet connection.",
            "source": "No results"
        })
    return results[:top_k]  # Ensure we only return top_k results
def format_search_docs(search_docs: List[Dict]) -> Dict[str, str]:
    """
    Turn a list of {source, page, content} dicts into a dict whose
    "web_results" value is one big string of <Document ...>…</Document>
    entries separated by `---`.
    """
    formatted_search_docs = "\n\n---\n\n".join(
        [
            f'<Document source="{doc["source"]}" page="{doc.get("page", "")}">\n'
            f'{doc.get("content", "")}\n'
            f'</Document>'
            for doc in search_docs
        ]
    )
    return {"web_results": formatted_search_docs}
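
# For illustration (hypothetical input, not executed at import time):
#
#     format_search_docs([{"source": "https://example.com", "page": "", "content": "snippet"}])
#
# returns
#
#     {"web_results": '<Document source="https://example.com" page="">\nsnippet\n</Document>'}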
def web_search(query: str, top_k: int = 3) -> Dict[str, str]:
    """
    Perform a hybrid web search combining multiple search engines with robust fallbacks.
    Args:
        query: The search query string to look up.
        top_k: The maximum number of search results to return (default is 3).
    Returns:
        A dictionary whose "web_results" value contains XML-like <Document> blocks, each with:
        - source: The URL of the webpage.
        - page: Placeholder for a page identifier (empty string by default).
        - content: The first 200 words of the page text, cleaned of HTML tags.
    """
    try:
        # Use our robust hybrid search to get initial results
        search_results = hybrid_search(query, top_k)
        results = []
        # Process each search result to get better content
        for hit in search_results:
            url = hit.get("url")
            if not url:
                continue
            # Start with the snippet from search
            content = hit.get("content", "")
            title = hit.get("title", "")
            # Try to scrape additional content if possible
            try:
                # Use a random user agent to avoid blocking
                headers = {
                    "User-Agent": random.choice([
                        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/96.0.4664.110 Safari/537.36",
                        "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/16.0 Safari/605.1.15",
                        "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/96.0.4664.45 Safari/537.36",
                        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/97.0.4692.71 Safari/537.36 Edg/97.0.1072.62"
                    ]),
                    "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
                    "Accept-Language": "en-US,en;q=0.5",
                    "Referer": "https://www.google.com/",
                    "DNT": "1",
                    "Connection": "keep-alive"
                }
                # Higher timeout for better reliability
                resp = requests.get(url, timeout=15, headers=headers)
                # Only process if successful
                if resp.status_code == 200:
                    soup = BeautifulSoup(resp.text, "html.parser")
                    # Try to find main content
                    main_content = soup.find('main') or soup.find('article') or soup.find('div', class_='content')
                    # If we found main content, use it
                    if main_content:
                        extracted_text = main_content.get_text(separator=" ", strip=True)
                        # Take the first 200 words
                        content = " ".join(extracted_text.split()[:200])
                    else:
                        # Otherwise use all text
                        all_text = soup.get_text(separator=" ", strip=True)
                        content = " ".join(all_text.split()[:200])
                    # Keep the search snippet if the scraped content is too thin
                    if len(content) < 50:
                        content = hit.get("content", "")[:200]
                # Random delay between 0.5-1.5 seconds to avoid rate limits
                time.sleep(0.5 + random.random())
            except requests.exceptions.HTTPError as e:
                logger.warning(f"HTTP error when scraping {url}: {e}")
                # Keep the search snippet as a fallback
            except requests.exceptions.RequestException as e:
                logger.warning(f"Request error when scraping {url}: {e}")
                # Keep the search snippet as a fallback
            except Exception as e:
                logger.warning(f"Unexpected error when scraping {url}: {e}")
                # Keep the search snippet as a fallback
            # Filter out inappropriate content
            if any(f in content.lower() for f in _forbidden):
                continue
            # Add to results
            results.append({
                "source": url,
                "page": "",
                "content": content
            })
        # Return formatted search docs
        return format_search_docs(results[:top_k])
    except Exception as e:
        logger.error(f"Web search failed: {e}")
        # Return a helpful error message
        return format_search_docs([{
            "source": "Error",
            "page": "",
            "content": f"Search failed with error: {e}. Please try again with different search terms."
        }])
# ───────────────────────────────────────────────────────────────
# ──────────────────── Tool for File System ─────────────────────
# ───────────────────────────────────────────────────────────────
def download_file(url: str, dest_path: str) -> str:
    """
    Download a file from a given URL and save it locally.
    Args:
        url: The direct URL of the file to download.
        dest_path: The local path to save the downloaded file.
    Returns:
        The destination path where the file was saved.
    """
    r = requests.get(url, stream=True)
    r.raise_for_status()
    with open(dest_path, 'wb') as f:
        for chunk in r.iter_content(8192):
            f.write(chunk)
    return dest_path
def process_excel_to_text(file_path: str) -> str:
    """
    Convert an Excel file into CSV-formatted text.
    Args:
        file_path: Path to the Excel (.xlsx) file.
    Returns:
        A string of CSV-formatted content extracted from the Excel file.
    """
    try:
        # Check if the file exists
        import os
        if not os.path.exists(file_path):
            return f"Error: Excel file '{file_path}' does not exist."
        # Try different engines
        last_error = None
        engines = ['openpyxl', 'xlrd', None]
        for engine in engines:
            try:
                # For engine=None, pandas will try to auto-detect
                if engine:
                    df = pd.read_excel(file_path, engine=engine)
                else:
                    df = pd.read_excel(file_path)
                return df.to_csv(index=False)
            except Exception as e:
                print(f"Excel engine {engine} failed: {e}")
                last_error = e
                continue
        # If we got here, all engines failed
        return f"Error processing Excel file: {str(last_error)}"
    except Exception as e:
        return f"Error with Excel file: {str(e)}"
def read_text_from_pdf(file_path: str, question: str = None) -> str:
    """
    Extract text from a PDF file, chunking large documents if needed.
    Args:
        file_path: Path to the PDF file.
        question: Optional question to help retrieve relevant parts of long documents.
    Returns:
        The extracted text content, potentially chunked if the document is large.
    """
    try:
        # Check if the file exists
        import os
        if not os.path.exists(file_path):
            return f"Error: PDF file '{file_path}' does not exist."
        reader = PdfReader(file_path)
        full_text = "\n".join([page.extract_text() or "" for page in reader.pages])
        # If a question is provided, use retrieval to get relevant parts
        if question and len(full_text) > 5000:  # Only chunk if the text is large
            return process_large_document(full_text, question)
        return full_text
    except Exception as e:
        return f"Error reading PDF: {str(e)}"
def read_text_from_docx(file_path: str, question: str = None) -> str:
    """
    Extract text from a DOCX (Word) document, chunking large documents if needed.
    Args:
        file_path: Path to the DOCX file.
        question: Optional question to help retrieve relevant parts of long documents.
    Returns:
        The extracted text, potentially chunked if the document is large.
    """
    try:
        # Check if the file exists
        import os
        if not os.path.exists(file_path):
            return f"Error: File '{file_path}' does not exist."
        try:
            doc = docx.Document(file_path)
            full_text = "\n".join([para.text for para in doc.paragraphs])
        except Exception as docx_err:
            # Handle the "Package not found" error specifically
            if "Package not found" in str(docx_err):
                # Try to read raw text if possible
                try:
                    import zipfile
                    from xml.etree.ElementTree import XML
                    WORD_NAMESPACE = '{http://schemas.openxmlformats.org/wordprocessingml/2006/main}'
                    PARA = WORD_NAMESPACE + 'p'
                    TEXT = WORD_NAMESPACE + 't'
                    with zipfile.ZipFile(file_path) as docx_file:
                        with docx_file.open('word/document.xml') as document:
                            tree = XML(document.read())
                    paragraphs = []
                    for paragraph in tree.iter(PARA):
                        texts = [node.text for node in paragraph.iter(TEXT) if node.text]
                        if texts:
                            paragraphs.append(''.join(texts))
                    full_text = '\n'.join(paragraphs)
                except Exception as e:
                    return f"Error reading DOCX file: {str(e)}"
            else:
                return f"Error reading DOCX file: {str(docx_err)}"
        # If a question is provided, use retrieval to get relevant parts
        if question and len(full_text) > 5000:  # Only chunk if the text is large
            return process_large_document(full_text, question)
        return full_text
    except Exception as e:
        return f"Error reading DOCX file: {str(e)}"
def transcribe_audio(file_path: str) -> str:
    """
    Transcribe speech from a local audio file to text.
    Args:
        file_path: Path to the audio file.
    Returns:
        Transcribed text using the Google Web Speech API.
    """
    try:
        # Check if the file exists
        import os
        if not os.path.exists(file_path):
            return f"Error: Audio file '{file_path}' does not exist."
        # For non-WAV files, convert to WAV first
        if not file_path.lower().endswith('.wav'):
            try:
                from pydub import AudioSegment
                temp_wav = os.path.splitext(file_path)[0] + "_temp.wav"
                audio = AudioSegment.from_file(file_path)
                audio.export(temp_wav, format="wav")
                file_path = temp_wav
            except Exception as e:
                return f"Failed to convert audio to WAV format: {str(e)}"
        recognizer = sr.Recognizer()
        with sr.AudioFile(file_path) as src:
            audio = recognizer.record(src)
        return recognizer.recognize_google(audio)
    except Exception as e:
        if "Audio file could not be read" in str(e):
            return "Error: Audio format not supported. Try converting to WAV, MP3, OGG, or FLAC."
        return f"Error transcribing audio: {str(e)}"

def youtube_audio_processing(youtube_url: str) -> str:
    """
    Download and transcribe audio from a YouTube video.
    Args:
        youtube_url: URL of the YouTube video.
    Returns:
        Transcription text extracted from the video's audio.
    """
    yt = YouTube(youtube_url)
    audio_stream = yt.streams.filter(only_audio=True).first()
    out_file = audio_stream.download(output_path='.', filename='yt_audio')
    wav_path = 'yt_audio.wav'
    AudioSegment.from_file(out_file).export(wav_path, format='wav')
    return transcribe_audio(wav_path)
def extract_article_text(url: str, question: str = None) -> str:
    """
    Download and extract the main article content from a webpage, chunking large articles if needed.
    Args:
        url: The URL of the article to extract.
        question: Optional question to help retrieve relevant parts of long articles.
    Returns:
        The article's textual content, potentially chunked if large.
    """
    try:
        art = Article(url)
        art.download()
        art.parse()
        full_text = art.text
        # If a question is provided, use retrieval to get relevant parts
        if question and len(full_text) > 5000:  # Only chunk if the text is large
            return process_large_document(full_text, question)
        return full_text
    except Exception as e:
        return f"Error extracting article: {str(e)}"
# ───────────────────────────────────────────────────────────────
# ─────────────────────── Tool for ArXiv ────────────────────────
# ───────────────────────────────────────────────────────────────
def arvix_search(query: str) -> Dict[str, str]:
    """
    Search for academic papers on ArXiv.
    Args:
        query: The search term to look for in ArXiv.
    Returns:
        A dictionary of up to 3 relevant paper entries in JSON format.
    """
    papers = ArxivLoader(query=query, load_max_docs=3).load()
    results = []
    for doc in papers:
        try:
            # Handle different metadata formats that might be returned
            source = doc.metadata.get("source", "ArXiv")
            doc_id = doc.metadata.get("id", doc.metadata.get("entry_id", ""))
            result = {
                "source": source,
                "id": doc_id,
                "summary": doc.page_content[:1000] if hasattr(doc, "page_content") else str(doc)[:1000],
            }
            results.append(result)
        except Exception as e:
            # Add error information as a fallback
            results.append({
                "source": "ArXiv Error",
                "id": "error",
                "summary": f"Error processing paper: {str(e)}"
            })
    return {"arvix_results": json.dumps(results)}
def answer_youtube_video_question(
    youtube_url: str,
    question: str,
    chunk_size_seconds: int = 30
) -> str:
    """
    Answer a question based on a YouTube video's transcript.
    Args:
        youtube_url: URL of the YouTube video.
        question: The question to be answered using video content.
        chunk_size_seconds: Duration of each transcript chunk.
    Returns:
        The answer to the question generated from the video transcript.
    """
    loader = YoutubeLoader.from_youtube_url(
        youtube_url,
        add_video_info=True,
        transcript_format=TranscriptFormat.CHUNKS,
        chunk_size_seconds=chunk_size_seconds,
    )
    documents = loader.load()
    embeddings = HuggingFaceEmbeddings(model_name='sentence-transformers/all-mpnet-base-v2')
    vectorstore = FAISS.from_documents(documents, embeddings)
    llm = RetryingChatGroq(model="deepseek-r1-distill-llama-70b", streaming=False)
    qa_chain = RetrievalQA.from_chain_type(llm=llm, retriever=vectorstore.as_retriever())
    return qa_chain.run(question)
# ───────────────────────────────────────────────────────────────
# ──────────────────── Tool for Python REPL ─────────────────────
# ───────────────────────────────────────────────────────────────
python_repl = PythonREPLTool()

# ───────────────────────────────────────────────────────────────
# ──────────────────────── Tool for Wiki ────────────────────────
# ───────────────────────────────────────────────────────────────
def wiki_search(query: str) -> Dict[str, str]:
    """
    Search Wikipedia for information on a given topic.
    Args:
        query: The search term for Wikipedia.
    Returns:
        A dictionary whose "wiki_results" value holds up to 3 formatted summary entries.
    """
    # Load up to 3 pages
    pages = WikipediaLoader(query=query, load_max_docs=3).load()
    results: List[Dict] = []
    for doc in pages:
        results.append({
            "source": doc.metadata["source"],
            "page": doc.metadata.get("page", ""),
            "content": doc.page_content[:1000],  # truncate if you like
        })
    return {"wiki_results": format_search_docs(results)["web_results"]}
# ───────────────────────────────────────────────────────────────
# ──── Tool for Image (understanding, captioning & classification) ────
# ───────────────────────────────────────────────────────────────
def _load_image(img_path: str, resize_to=(512, 512)) -> Image.Image:
    """
    Load, verify, convert, and resize an image.
    Raises ValueError on failure.
    """
    if not img_path:
        raise ValueError("No image path provided.")
    try:
        with Image.open(img_path) as img:
            img.verify()
        img = Image.open(img_path).convert("RGB")
        img = img.resize(resize_to)
        return img
    except UnidentifiedImageError:
        raise ValueError(f"File at {img_path} is not a valid image.")
    except Exception as e:
        raise ValueError(f"Failed to load image at {img_path}: {e}")

def _encode_image_to_base64(img_path: str) -> str:
    """
    Load an image, save an optimized PNG into memory, and base64-encode it.
    """
    img = _load_image(img_path)
    buffer = BytesIO()
    img.save(buffer, format="PNG", optimize=True)
    return base64.b64encode(buffer.getvalue()).decode("utf-8")
def image_processing(prompt: str, img_path: str) -> str:
    """Process an image using a vision LLM, with OCR fallback.
    Args:
        prompt: Instruction or question related to the image.
        img_path: Path to the image file.
    Returns:
        The model's response or fallback OCR result.
    """
    try:
        import os
        # Check if the file exists
        if not os.path.exists(img_path):
            return f"Error: Image file '{img_path}' does not exist."
        try:
            b64 = _encode_image_to_base64(img_path)
            # Build a single markdown string with the inline base64 image
            md = f"{prompt}\n\n![image](data:image/png;base64,{b64})"
            message = HumanMessage(content=md)
            # Use RetryingChatGroq with Llama 4 Maverick for vision
            llm = RetryingChatGroq(model="meta-llama/llama-4-maverick-17b-128e-instruct", streaming=False, temperature=0)
            try:
                resp = llm.invoke([message])
                if hasattr(resp, 'content'):
                    return resp.content.strip()
                elif isinstance(resp, str):
                    return resp.strip()
                else:
                    # Handle dictionary or other response types
                    return str(resp)
            except Exception as invoke_err:
                print(f"[LLM invoke error] {invoke_err}")
                # Fall back to OCR
                raise ValueError("LLM invocation failed")
        except Exception as llama_err:
            print(f"[LLM vision failed] {llama_err}")
            try:
                img = _load_image(img_path)
                return pytesseract.image_to_string(img).strip()
            except Exception as ocr_err:
                print(f"[OCR fallback failed] {ocr_err}")
                return "Unable to process the image. Please check the file and try again."
    except Exception as e:
        # Catch any other errors
        print(f"[image_processing error] {e}")
        return f"Error processing image: {str(e)}"
python_repl_tool = PythonREPLTool()

def echo(text: str) -> str:
    """Echo back the input text.
    Args:
        text: The string to be echoed.
    Returns:
        The same text that was provided as input.
    """
    return text
# ───────────────────────────────────────────────────────────────
# ─────────────────────── LangGraph agent ───────────────────────
# ───────────────────────────────────────────────────────────────
# Build graph function
from langgraph.prebuilt.chat_agent_executor import create_react_agent, AgentState
def build_graph(provider: str = "groq"):
    """Construct and compile the multi-agent GAIA workflow StateGraph.
    This graph wires together three React-style agents into a streamlined pipeline:
    PerceptionAgent → ActionAgent → EvaluationAgent (with appropriate entry/exit points)
    The agents have the following responsibilities:
    - PerceptionAgent: Handles web searches, Wikipedia, ArXiv, and image processing
    - ActionAgent: Performs calculations, file operations, and code analysis
    - EvaluationAgent: Reviews results and ensures the final answer is properly formatted
    Args:
        provider: The name of the LLM provider. Must be "groq".
    Returns:
        CompiledGraph: A compiled LangGraph state machine ready for invocation.
    Raises:
        ValueError: If `provider` is anything other than "groq".
    """
    try:
        if provider != "groq":
            raise ValueError("Invalid provider. Expected 'groq'.")
        # Initialize LLM
        try:
            logger.info("Initializing LLM with model: deepseek-r1-distill-llama-70b")
            api_key = os.getenv("GROQ_API_KEY")
            if not api_key or api_key == "default_key_or_placeholder":
                logger.error("GROQ_API_KEY is not set or is using a placeholder value")
                raise ValueError("GROQ_API_KEY environment variable is not set properly. Please set a valid API key.")
            llm = RetryingChatGroq(model="deepseek-r1-distill-llama-70b", temperature=0)
            logger.info("LLM initialized successfully")
        except Exception as e:
            logger.error(f"Error initializing LLM: {str(e)}")
            raise
        # General system message for agents
        sys_msg = SystemMessage(content="""
You are a general AI assistant. I will ask you a question. Report your thoughts, and finish your answer with the following template:
FINAL ANSWER: [YOUR FINAL ANSWER]
YOUR FINAL ANSWER should be a number OR as few words as possible OR a comma-separated list of numbers and/or strings.
If you are asked for a number, don't use commas or units (e.g., $, %, kg) unless specified otherwise.
If you are asked for a string, don't use articles (a, an, the), and don't use abbreviations (e.g., for states).
If you are asked for a comma-separated list, apply the above rules to each element in the list.
""".strip())
        # Special system message for the evaluation agent with stricter formatting requirements
        eval_sys_msg = SystemMessage(content="""
You are a specialized evaluation agent. Your job is to review the work done by other agents
and provide a final, properly formatted answer.
IMPORTANT: You MUST ALWAYS format your answer using this exact template:
FINAL ANSWER: [concise answer]
Rules for formatting the answer:
1. The answer must be extremely concise - use as few words as possible
2. For numeric answers, provide only the number without units unless units are specifically requested
3. For text answers, avoid articles (a, an, the) and unnecessary words
4. For list answers, use a comma-separated format
5. NEVER explain your reasoning in the FINAL ANSWER section
6. NEVER skip the "FINAL ANSWER:" prefix
Example good answers:
FINAL ANSWER: 42
FINAL ANSWER: Paris
FINAL ANSWER: 1912, 1945, 1989
Example bad answers (don't do these):
- Based on my analysis, the answer is 42.
- I think it's Paris because that's the capital of France.
- The years were 1912, 1945, and 1989.
Remember: ALWAYS include "FINAL ANSWER:" followed by the most concise answer possible.
""".strip())
        # Define tools for each agent
        logger.info("Setting up agent tools")
        perception_tools = [web_search, wiki_search, news_article_search, arvix_search, image_processing, echo]
        execution_tools = [
            multiply, add, subtract, divide, modulus,
            download_file, process_excel_to_text,
            read_text_from_pdf, read_text_from_docx,
            transcribe_audio, youtube_audio_processing,
            extract_article_text, answer_youtube_video_question,
            python_repl_tool, analyze_code, read_code_file, analyze_python_function
        ]
        # ─────────────── Agent creation ───────────────
        logger.info("Creating agents")
        try:
            # Create agents with proper error handling
            PerceptionAgent = create_react_agent(
                model=llm,
                tools=perception_tools,
                prompt=sys_msg,
                state_schema=AgentState,
                name="PerceptionAgent"
            )
            logger.info("Created PerceptionAgent successfully")
            # Combined planning and execution agent for better efficiency
            ActionAgent = create_react_agent(
                model=llm,
                tools=execution_tools,  # Has access to all execution tools
                prompt=sys_msg,
                state_schema=AgentState,
                name="ActionAgent"
            )
            logger.info("Created ActionAgent successfully")
            # Evaluation agent with a stricter prompt
            EvaluationAgent = create_react_agent(
                model=llm,
                tools=[],  # No tools needed for evaluation
                prompt=eval_sys_msg,  # Use the specialized evaluation prompt
                state_schema=AgentState,
                name="EvaluationAgent"
            )
            logger.info("Created EvaluationAgent successfully")
        except Exception as e:
            logger.error(f"Error creating agent: {str(e)}")
            import traceback
            logger.error(f"Traceback: {traceback.format_exc()}")
            raise
        # Build the StateGraph
        logger.info("Building StateGraph")
        try:
            builder = StateGraph(AgentState)
            # Add agent nodes first
            builder.add_node("PerceptionAgent", PerceptionAgent)
            builder.add_node("ActionAgent", ActionAgent)
            builder.add_node("EvaluationAgent", EvaluationAgent)
            # Define the flow with a starting edge
            builder.set_entry_point("PerceptionAgent")
            # Add the edges for the simpler linear flow
            builder.add_edge("PerceptionAgent", "ActionAgent")
            builder.add_edge("ActionAgent", "EvaluationAgent")
            # Set EvaluationAgent as the end node
            builder.set_finish_point("EvaluationAgent")
            logger.info("Compiling StateGraph")
            return builder.compile()
        except Exception as e:
            logger.error(f"Error building graph: {str(e)}")
            import traceback
            logger.error(f"Traceback: {traceback.format_exc()}")
            raise
    except Exception as e:
        logger.error(f"Overall error in build_graph: {str(e)}")
        import traceback
        logger.error(f"Traceback: {traceback.format_exc()}")
        raise
def get_final_answer(text):
    """Extract just the FINAL ANSWER from the model's response.
    Args:
        text: The full text response from the LLM
    Returns:
        str: The extracted answer without the "FINAL ANSWER:" prefix
    """
    # Log the raw text for debugging if needed
    logger.debug(f"Extracting answer from: {text[:200]}...")
    if not text:
        logger.warning("Empty response received")
        return "No answer provided."
    # Method 1: Look for "FINAL ANSWER:" with the most comprehensive pattern matching
    pattern = r'(?:^|\n)FINAL ANSWER:\s*(.*?)(?:\n\s*$|$)'
    match = re.search(pattern, text, re.DOTALL | re.IGNORECASE)
    if match:
        # Return just the answer part, cleaned up
        logger.debug("Found answer using pattern 1")
        return match.group(1).strip()
    # Method 2: Try looking for variations on the final answer format
    for variant in ["FINAL ANSWER:", "FINAL_ANSWER:", "Final Answer:", "Answer:"]:
        lines = text.split('\n')
        for i, line in enumerate(reversed(lines)):
            if variant in line:
                # Extract everything after the variant text
                logger.debug(f"Found answer using variant: {variant}")
                answer = line[line.find(variant) + len(variant):].strip()
                if answer:
                    return answer
                # If the answer is on the next line, return that
                if i > 0:
                    next_line = lines[len(lines) - i]
                    if next_line.strip():
                        return next_line.strip()
    # Method 3: Look for phrases that suggest an answer
    for phrase in ["The answer is", "The result is", "We get", "Therefore,", "In conclusion,"]:
        phrase_pos = text.find(phrase)
        if phrase_pos != -1:
            # Try to extract everything after the phrase until the end of the sentence
            sentence_end = text.find(".", phrase_pos)
            if sentence_end != -1:
                logger.debug(f"Found answer using phrase: {phrase}")
                return text[phrase_pos + len(phrase):sentence_end].strip()
    # Method 4: Fall back to taking the last paragraph with actual content
    paragraphs = text.strip().split('\n\n')
    for para in reversed(paragraphs):
        para = para.strip()
        if para and not para.startswith("I ") and not para.lower().startswith("to "):
            logger.debug("Using last meaningful paragraph")
            # If the paragraph is very long, try to extract a concise answer
            if len(para) > 100:
                sentences = re.split(r'[.!?]', para)
                for sentence in reversed(sentences):
                    sent = sentence.strip()
                    if sent and len(sent) > 5 and not sent.startswith("I "):
                        return sent
            return para
    # Method 5: Last resort - just return the last line with content
    lines = text.strip().split('\n')
    for line in reversed(lines):
        line = line.strip()
        if line and len(line) > 3:
            logger.debug("Using last line with content")
            return line
    # If everything fails, warn and return the truncated response
    logger.warning("Could not find a properly formatted answer")
    return text[:100] + "..." if len(text) > 100 else text
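
# Illustrative only: a minimal sketch (assuming the graph output shape used in
# the smoke test at the bottom of this file) of how `get_final_answer` could
# be applied to the last message produced by the compiled graph:
#
#     state = graph.invoke({"messages": [HumanMessage(content=question)]})
#     raw = state["messages"][-1].content
#     answer = get_final_answer(raw)   # e.g. "Paris" from "FINAL ANSWER: Paris"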
# ────────────────────── Tool for code analysis ──────────────────────
def analyze_code(code_string: str) -> str:
    """Analyze a string of code to understand its structure, functionality, and potential issues.
    Args:
        code_string: The code to analyze as a string.
    Returns:
        A structured analysis of the code including functions, classes, and key operations.
    """
    try:
        import ast
        # Try to parse with Python's AST module
        try:
            parsed = ast.parse(code_string)
            # Extract functions and classes
            functions = [node.name for node in ast.walk(parsed) if isinstance(node, ast.FunctionDef)]
            classes = [node.name for node in ast.walk(parsed) if isinstance(node, ast.ClassDef)]
            imports = [node.names[0].name for node in ast.walk(parsed) if isinstance(node, ast.Import)]
            imports.extend([f"{node.module}.{name.name}" if node.module else name.name
                            for node in ast.walk(parsed) if isinstance(node, ast.ImportFrom)
                            for name in node.names])
            # Count various node types for complexity assessment
            num_loops = len([node for node in ast.walk(parsed)
                             if isinstance(node, (ast.For, ast.While))])
            num_conditionals = len([node for node in ast.walk(parsed)
                                    if isinstance(node, (ast.If, ast.IfExp))])
            analysis = {
                "language": "Python",
                "functions": functions,
                "classes": classes,
                "imports": imports,
                "complexity": {
                    "functions": len(functions),
                    "classes": len(classes),
                    "loops": num_loops,
                    "conditionals": num_conditionals
                }
            }
            return str(analysis)
        except SyntaxError:
            # If not valid Python, try some simple pattern matching
            if "{" in code_string and "}" in code_string:
                if "function" in code_string or "=>" in code_string:
                    language = "JavaScript/TypeScript"
                elif "func" in code_string or "struct" in code_string:
                    language = "Go or Rust"
                elif "public" in code_string or "private" in code_string or "class" in code_string:
                    language = "Java/C#/C++"
                else:
                    language = "Unknown C-like language"
            elif "<" in code_string and ">" in code_string and ("/>" in code_string or "</" in code_string):
                language = "HTML/XML/JSX"
            else:
                language = "Unknown"
            return f"Non-Python code detected ({language}). Basic code structure analysis not available."
    except Exception as e:
        return f"Error analyzing code: {str(e)}"
def read_code_file(file_path: str) -> str:
    """Read a code file and return its contents with proper syntax detection.
    Args:
        file_path: Path to the code file.
    Returns:
        The file contents and detected language.
    """
    try:
        # Check if the file exists
        import os
        if not os.path.exists(file_path):
            return f"Error: File '{file_path}' does not exist."
        with open(file_path, 'r', encoding='utf-8') as f:
            content = f.read()
        # Try to detect the language from the extension
        ext = os.path.splitext(file_path)[1].lower()
        language_map = {
            '.py': 'Python',
            '.js': 'JavaScript',
            '.ts': 'TypeScript',
            '.html': 'HTML',
            '.css': 'CSS',
            '.java': 'Java',
            '.c': 'C',
            '.cpp': 'C++',
            '.cs': 'C#',
            '.go': 'Go',
            '.rs': 'Rust',
            '.php': 'PHP',
            '.rb': 'Ruby',
            '.sh': 'Shell',
            '.bat': 'Batch',
            '.ps1': 'PowerShell',
            '.sql': 'SQL',
            '.json': 'JSON',
            '.xml': 'XML',
            '.yaml': 'YAML',
            '.yml': 'YAML',
        }
        language = language_map.get(ext, 'Unknown')
        return f"File content ({language}):\n\n{content}"
    except Exception as e:
        return f"Error reading file: {str(e)}"
def analyze_python_function(function_name: str, code_string: str) -> str:
    """Extract and analyze a specific function from Python code.
    Args:
        function_name: The name of the function to analyze.
        code_string: The complete code containing the function.
    Returns:
        Analysis of the function including parameters, return type, and docstring.
    """
    try:
        import ast
        import inspect
        from types import CodeType, FunctionType
        # Parse the code string
        parsed = ast.parse(code_string)
        # Find the function definition
        function_def = None
        for node in ast.walk(parsed):
            if isinstance(node, ast.FunctionDef) and node.name == function_name:
                function_def = node
                break
        if not function_def:
            return f"Function '{function_name}' not found in the provided code."
        # Extract parameters
        params = []
        for arg in function_def.args.args:
            param_name = arg.arg
            # Get the annotation if it exists
            if arg.annotation:
                if isinstance(arg.annotation, ast.Name):
                    param_type = arg.annotation.id
                elif isinstance(arg.annotation, ast.Attribute):
                    param_type = f"{arg.annotation.value.id}.{arg.annotation.attr}"
                else:
                    param_type = "complex_type"
                params.append(f"{param_name}: {param_type}")
            else:
                params.append(param_name)
        # Extract the return type if it exists
        return_type = None
        if function_def.returns:
            if isinstance(function_def.returns, ast.Name):
                return_type = function_def.returns.id
            elif isinstance(function_def.returns, ast.Attribute):
                return_type = f"{function_def.returns.value.id}.{function_def.returns.attr}"
            else:
                return_type = "complex_return_type"
        # Extract the docstring
        docstring = ast.get_docstring(function_def)
        # Create a summary
        summary = {
            "function_name": function_name,
            "parameters": params,
            "return_type": return_type,
            "docstring": docstring,
            "decorators": [d.id if isinstance(d, ast.Name) else "complex_decorator" for d in function_def.decorator_list],
            "line_count": len(function_def.body)
        }
        # Create a more explicit string representation that ensures key terms are included
        result = f"Function '{function_name}' analysis:\n"
        result += f"- Parameters: {', '.join(params)}\n"
        result += f"- Return type: {return_type or 'None specified'}\n"
        result += f"- Docstring: {docstring or 'None'}\n"
        result += f"- Line count: {len(function_def.body)}"
        return result
    except Exception as e:
        return f"Error analyzing function: {str(e)}"
# ───────────────────────────────────────────────────────────────
# ──────────────── Tool for news article retrieval ──────────────
# ───────────────────────────────────────────────────────────────
def news_article_search(query: str, top_k: int = 3) -> Dict[str, str]:
    """Search for and retrieve news articles with robust error handling for news sites.
    Args:
        query: The news topic or keywords to search for.
        top_k: Maximum number of articles to retrieve.
    Returns:
        A dictionary with search results formatted as XML-like document entries.
    """
    # First, get URLs from DuckDuckGo with a "news" focus
    results = []
    news_sources = [
        "bbc.com", "reuters.com", "apnews.com", "nasa.gov",
        "space.com", "universetoday.com", "nature.com", "science.org",
        "scientificamerican.com", "nytimes.com", "theguardian.com"
    ]
    # Find news from reliable sources
    try:
        with DDGS() as ddgs:
            search_query = f"{query} site:{' OR site:'.join(news_sources)}"
            for hit in ddgs.text(search_query, safesearch="On", max_results=top_k*2):
                url = hit.get("href") or hit.get("url", "")
                if not url:
                    continue
                # Add the search snippet first as a fallback
                result = {
                    "source": url,
                    "page": "",
                    "content": hit.get("body", "")[:250],
                    "title": hit.get("title", "")
                }
                # Define headers up front so the fallback extraction below can reuse them
                headers = {
                    "User-Agent": random.choice([
                        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/96.0.4664.110 Safari/537.36",
                        "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/16.0 Safari/605.1.15",
                        "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/96.0.4664.45 Safari/537.36"
                    ]),
                    "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
                    "Accept-Language": "en-US,en;q=0.5",
                    "Referer": "https://www.google.com/",
                    "DNT": "1",
                    "Connection": "keep-alive",
                    "Upgrade-Insecure-Requests": "1"
                }
                # Try to get better content via a more robust method
                try:
                    # Add a short delay between requests
                    time.sleep(1 + random.random())
                    # Use newspaper3k for more reliable article extraction
                    article = Article(url)
                    article.download()
                    article.parse()
                    # If we got meaningful content, update the result
                    if article.text and len(article.text) > 100:
                        # Get a summary - first paragraph + some highlights
                        paragraphs = article.text.split('\n\n')
                        first_para = paragraphs[0] if paragraphs else ""
                        summary = first_para[:300]
                        if len(paragraphs) > 1:
                            summary += "... " + paragraphs[1][:200]
                        result["content"] = summary
                        if article.title:
                            result["title"] = article.title
                except Exception as article_err:
                    logger.warning(f"Article extraction failed for {url}: {article_err}")
                    # Fallback to simple requests-based extraction
                    try:
                        resp = requests.get(url, timeout=12, headers=headers)
                        resp.raise_for_status()
                        soup = BeautifulSoup(resp.text, "html.parser")
                        # Try to get main content
                        main_content = soup.find('main') or soup.find('article') or soup.find('div', class_='content')
                        if main_content:
                            content = " ".join(main_content.get_text(separator=" ", strip=True).split()[:250])
                            result["content"] = content
                    except Exception as req_err:
                        logger.warning(f"Fallback extraction failed for {url}: {req_err}")
                        # Keep the original snippet as fallback
                results.append(result)
                if len(results) >= top_k:
                    break
    except Exception as e:
        logger.error(f"News search failed: {e}")
        return format_search_docs([{
            "source": "Error",
            "page": "",
            "content": f"Failed to retrieve news articles for '{query}': {str(e)}"
        }])
    if not results:
        # Fallback to regular web search
        logger.info(f"No news results found, falling back to web_search for {query}")
        return web_search(query, top_k)
    return format_search_docs(results[:top_k])
# ──────────────────── Document chunking utilities ───────────────────
def chunk_document(text: str, chunk_size: int = 1000, overlap: int = 100) -> List[str]:
    """
    Split a large document into smaller chunks with overlap to maintain context across chunks.
    Args:
        text: The document text to split into chunks
        chunk_size: Maximum size of each chunk in characters
        overlap: Number of characters to overlap between chunks
    Returns:
        List of text chunks
    """
    # If the text is smaller than chunk_size, return it as is
    if len(text) <= chunk_size:
        return [text]
    chunks = []
    start = 0
    while start < len(text):
        # Get the chunk end, capped at the text length
        end = min(start + chunk_size, len(text))
        # Try to find a sentence boundary for cleaner breaks
        if end < len(text):
            # Look for sentence endings: period, question mark, or exclamation followed by a space
            for sentence_end in ['. ', '? ', '! ']:
                last_period = text[start:end].rfind(sentence_end)
                if last_period != -1:
                    end = start + last_period + 2  # +2 to include the punctuation and space
                    break
        # Add the chunk to the list
        chunks.append(text[start:end])
        # Move the start position, accounting for overlap
        start = end - overlap if end < len(text) else len(text)
    return chunks
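
# Illustrative only: with the defaults (chunk_size=1000, overlap=100), a
# ~2,400-character text yields roughly three overlapping chunks, each cut at a
# sentence boundary where one can be found:
#
#     parts = chunk_document("A sentence. " * 200)
#     lengths = [len(p) for p in parts]   # each <= 1000; consecutive chunks overlap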
# Document processing utility that uses chunking
def process_large_document(text: str, question: str, llm=None) -> str:
    """
    Process a large document by chunking it and using retrieval to find relevant parts.
    Args:
        text: The document text to process
        question: The question being asked about the document
        llm: Optional language model to use (defaults to the agent's LLM)
    Returns:
        Summarized answer based on relevant chunks
    """
    if not llm:
        llm = RetryingChatGroq(model="deepseek-r1-distill-llama-70b", streaming=False, temperature=0)
    # Split the document into chunks
    chunks = chunk_document(text)
    # If the document is small enough, don't bother with retrieval
    if len(chunks) <= 1:
        return text
    # For larger documents, create embeddings to find relevant chunks
    try:
        # Create documents with chunk content
        documents = [Document(page_content=chunk, metadata={"chunk_id": i}) for i, chunk in enumerate(chunks)]
        # Create embeddings and a vector store
        embeddings = HuggingFaceEmbeddings(model_name="sentence-transformers/all-mpnet-base-v2")
        vectorstore = FAISS.from_documents(documents, embeddings)
        # Get the most relevant chunks
        relevant_chunks = vectorstore.similarity_search(question, k=2)  # top 2 most relevant chunks
        # Join the relevant chunks
        relevant_text = "\n\n".join([doc.page_content for doc in relevant_chunks])
        # Option 1: Return relevant chunks directly
        return relevant_text
        # Option 2: Summarize with the LLM (commented out for now)
        # prompt = f"Using only the following information, answer the question: '{question}'\n\nInformation:\n{relevant_text}"
        # response = llm.invoke([HumanMessage(content=prompt)])
        # return response.content
    except Exception as e:
        # Fall back to the first chunk if retrieval fails
        logger.warning(f"Retrieval failed: {e}. Falling back to first chunk.")
        return chunks[0]

# Smoke test (kept at the end of the file so every tool referenced by
# build_graph is already defined when the graph is built)
if __name__ == "__main__":
    question = "When was a picture of St. Thomas Aquinas first added to the Wikipedia page on the Principle of double effect?"
    # Build the graph
    graph = build_graph(provider="groq")
    # Run the graph
    messages = [HumanMessage(content=question)]
    messages = graph.invoke({"messages": messages})
    for m in messages["messages"]:
        m.pretty_print()