from typing import Optional, TypedDict, Annotated

import os
import requests
from urllib.parse import urlparse
from pathlib import Path

from langchain_community.tools import DuckDuckGoSearchRun
from langchain_community.document_loaders import WikipediaLoader, YoutubeLoader
from langchain_community.document_loaders.youtube import TranscriptFormat
from pytube import YouTube
from langgraph.graph.message import add_messages
from langchain_core.messages import AnyMessage, HumanMessage, AIMessage
from langgraph.prebuilt import ToolNode, tools_condition
from langchain_openai import ChatOpenAI
from langgraph.graph import START, StateGraph
from langfuse.langchain import CallbackHandler
from langchain_huggingface import HuggingFaceEndpoint, ChatHuggingFace
from langchain_core.tools import tool

# Web search tool using DuckDuckGo
search_tool = DuckDuckGoSearchRun()


# Create Wikipedia search tool using WikipediaLoader
@tool
def search_wikipedia(query: str) -> str:
    """Search Wikipedia for information about a topic.

    Args:
        query: The search query or topic to look up on Wikipedia

    Returns:
        str: The Wikipedia content related to the query
    """
    try:
        # Load up to two Wikipedia documents for the query
        loader = WikipediaLoader(query=query, load_max_docs=2)
        docs = loader.load()

        if not docs:
            return f"No Wikipedia articles found for query: {query}"

        # Combine the content from the documents
        content = ""
        for doc in docs:
            content += f"Title: {doc.metadata.get('title', 'Unknown')}\n"
            content += f"Content: {doc.page_content}...\n\n"

        return content
    except Exception as e:
        return f"Error searching Wikipedia: {str(e)}"


# Create YouTube transcript analysis tool
@tool
def analyze_youtube_video(video_url: str) -> str:
    """Analyze a YouTube video by loading and processing its transcript.

    Args:
        video_url: The YouTube video URL to analyze

    Returns:
        str: The transcript content of the YouTube video
    """
    try:
        loader = YoutubeLoader.from_youtube_url(
            video_url,
            add_video_info=False,
            transcript_format=TranscriptFormat.CHUNKS,
            chunk_size_seconds=60,
        )
        docs = loader.load()

        if not docs:
            return f"No transcript found for video: {video_url}"

        content = f"Video URL: {video_url}\n"
        content += "Transcript (Chunked):\n"
        for i, doc in enumerate(docs[:5]):  # Limit to the first 5 chunks
            content += f"Chunk {i + 1}: {doc.page_content}\n"
        return content
    except Exception as e:
        print(f"Analyze video failed: {e}")
        return f"Error analyzing YouTube video: {str(e)}"


# Create file download tool
@tool
def download_file(url: str, filename: Optional[str] = None, download_dir: str = "downloaded_files") -> str:
    """Download a file from a URL and save it locally.

    Args:
        url: The URL of the file to download
        filename: Optional custom filename.
            If not provided, the filename is extracted from the URL.
        download_dir: Directory to save the file (default: 'downloaded_files')

    Returns:
        str: Success message with file path or error message
    """
    try:
        # Create download directory if it doesn't exist
        Path(download_dir).mkdir(exist_ok=True)

        # Parse URL to get filename if not provided
        if not filename:
            parsed_url = urlparse(url)
            filename = os.path.basename(parsed_url.path)
            if not filename:
                filename = "downloaded_file"

        # Full file path
        file_path = os.path.join(download_dir, filename)

        # Download the file
        print(f"Downloading file from: {url}")
        response = requests.get(url, stream=True, timeout=30)
        response.raise_for_status()

        # Get file size for progress tracking
        total_size = int(response.headers.get('content-length', 0))

        # Write file in chunks
        with open(file_path, 'wb') as file:
            downloaded = 0
            for chunk in response.iter_content(chunk_size=8192):
                if chunk:
                    file.write(chunk)
                    downloaded += len(chunk)

        # Get file info
        file_size = os.path.getsize(file_path)
        file_size_mb = file_size / (1024 * 1024)

        return f"""File downloaded successfully!

File Details:
- URL: {url}
- Saved as: {file_path}
- File size: {file_size_mb:.2f} MB
- Content type: {response.headers.get('content-type', 'Unknown')}

You can now access the file at: {os.path.abspath(file_path)}"""

    except requests.exceptions.RequestException as e:
        return f"Error downloading file from {url}: Network error - {str(e)}"
    except OSError as e:
        return f"Error saving file: File system error - {str(e)}"
    except Exception as e:
        return f"Error downloading file from {url}: {str(e)}"


def get_langfuse_handler():
    """Get a configured Langfuse callback handler.

    Langfuse automatically reads LANGFUSE_PUBLIC_KEY, LANGFUSE_SECRET_KEY,
    and LANGFUSE_HOST from the environment.
    """
    return CallbackHandler()


def build_jasper():
    # Generate the chat interface, including the tools
    # llm = HuggingFaceEndpoint(
    #     repo_id="Qwen/Qwen2.5-Coder-32B-Instruct",
    #     huggingfacehub_api_token=os.getenv("HUGGINGFACE_API_TOKEN"),
    # )
    tools = [search_tool, search_wikipedia, analyze_youtube_video]

    # llm = HuggingFaceEndpoint(
    #     repo_id="Qwen/Qwen2.5-Omni-3B",
    #     huggingfacehub_api_token=os.getenv("HUGGINGFACE_API_TOKEN"),
    # )
    # chat = ChatHuggingFace(llm=llm, verbose=True)
    # chat_with_tools = chat.bind_tools(tools)

    # OpenAI chat model; reads OPENAI_API_KEY from the environment
    llm = ChatOpenAI(
        model="gpt-4o",
        temperature=0,
        api_key=os.getenv("OPENAI_API_KEY"),
    )
    chat_with_tools = llm.bind_tools(tools, parallel_tool_calls=False)

    # Generate the AgentState and agent graph
    class AgentState(TypedDict):
        messages: Annotated[list[AnyMessage], add_messages]

    def assistant(state: AgentState):
        return {
            "messages": [chat_with_tools.invoke(state["messages"])],
        }

    ## The graph
    builder = StateGraph(AgentState)

    # Define nodes: these do the work
    builder.add_node("assistant", assistant)
    builder.add_node("tools", ToolNode(tools))

    # Define edges: these determine how the control flow moves
    builder.add_edge(START, "assistant")
    builder.add_conditional_edges(
        "assistant",
        # If the latest message requires a tool, route to tools;
        # otherwise, provide a direct response
        tools_condition,
    )
    builder.add_edge("tools", "assistant")

    # Compile the graph; the Langfuse callback is passed per invocation, not at compile time
    jasper = builder.compile()

    print("Langfuse tracing enabled - traces will be available in your Langfuse dashboard")

    return jasper


def run_jasper():
    jasper = build_jasper()

    messages = [HumanMessage(content=(
        "Examine the video at https://www.youtube.com/watch?v=1htKBjuUWec.\n\n"
        "What does Teal'c say in response to the question \"Isn't that hot?\""
    ))]

    # Get Langfuse handler for tracing
    langfuse_handler = get_langfuse_handler()

    # Add trace metadata for this specific run
    response = jasper.invoke(
        {"messages": messages},
        config={
            "callbacks": [langfuse_handler],
            "metadata": {
                "trace_name": "YouTube_Video_Analysis",
                "user_id": "jasper-user",
                "session_id": "jasper-agent-session",
            },
        },
    )

    print("Jasper's Response:")
    print(response["messages"][-1].content)


if __name__ == "__main__":
    run_jasper()
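
# Example environment setup (a sketch with assumed shell commands and placeholder
# values; OPENAI_API_KEY, LANGFUSE_PUBLIC_KEY, LANGFUSE_SECRET_KEY, and
# LANGFUSE_HOST are the variables read above, while the script filename below
# is hypothetical):
#
#   export OPENAI_API_KEY="sk-..."
#   export LANGFUSE_PUBLIC_KEY="pk-lf-..."
#   export LANGFUSE_SECRET_KEY="sk-lf-..."
#   export LANGFUSE_HOST="https://cloud.langfuse.com"
#   python jasper_agent.py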