|
import os
from pathlib import Path
from typing import Annotated, Optional, TypedDict
from urllib.parse import urlparse

import requests
from langchain_community.document_loaders import WikipediaLoader, YoutubeLoader
from langchain_community.document_loaders.youtube import TranscriptFormat
from langchain_community.tools import DuckDuckGoSearchRun
from langchain_core.messages import AIMessage, AnyMessage, HumanMessage
from langchain_core.tools import tool
from langchain_huggingface import ChatHuggingFace, HuggingFaceEndpoint
from langchain_openai import ChatOpenAI
from langfuse.langchain import CallbackHandler
from langgraph.graph import START, StateGraph
from langgraph.graph.message import add_messages
from langgraph.prebuilt import ToolNode, tools_condition
from pytube import YouTube
|
|
|
|
|
search_tool = DuckDuckGoSearchRun() |
|
|
|
|
|
@tool
def search_wikipedia(query: str) -> str:
    """Search Wikipedia for information about a topic.

    Args:
        query: The search query or topic to look up on Wikipedia

    Returns:
        str: The Wikipedia content related to the query, or a message
        explaining that nothing was found / an error occurred.
    """
    try:
        # Fetch at most two matching articles for the query.
        docs = WikipediaLoader(query=query, load_max_docs=2).load()

        if not docs:
            return f"No Wikipedia articles found for query: {query}"

        # Render each article as a "Title/Content" section and join them.
        sections = [
            f"Title: {doc.metadata.get('title', 'Unknown')}\n"
            f"Content: {doc.page_content}...\n\n"
            for doc in docs
        ]
        return "".join(sections)
    except Exception as e:
        return f"Error searching Wikipedia: {str(e)}"
|
|
|
|
|
@tool
def analyze_youtube_video(video_url: str) -> str:
    """Analyze a YouTube video by loading and processing its transcript.

    Args:
        video_url: The YouTube video URL to analyze

    Returns:
        str: The transcript content of the YouTube video, or an error
        message if no transcript could be loaded.
    """
    try:
        loader = YoutubeLoader.from_youtube_url(
            video_url,
            add_video_info=False,
            transcript_format=TranscriptFormat.CHUNKS,
            chunk_size_seconds=60,
        )
        docs = loader.load()

        if not docs:
            # Previously fell through and implicitly returned None,
            # violating the declared `-> str` contract.
            return f"No transcript available for video: {video_url}"

        content = f"Video URL: {video_url}\n"
        content += "Transcript (Chunked):\n"
        # Cap output at the first 5 one-minute chunks to keep the tool
        # response small for the LLM.
        for i, doc in enumerate(docs[:5]):
            content += f"Chunk {i+1}: {doc.page_content}\n"
        return content
    except Exception as e:
        # Return the error (rather than only printing it) so the agent
        # sees a string result instead of None.
        return f"Error analyzing YouTube video: {str(e)}"
|
|
|
|
|
@tool
def download_file(url: str, filename: Optional[str] = None, download_dir: str = "downloaded_files") -> str:
    """Download a file from a URL and save it locally.

    Args:
        url: The URL of the file to download
        filename: Optional custom filename. If not provided, will extract from URL
        download_dir: Directory to save the file (default: 'downloaded_files')

    Returns:
        str: Success message with file path or error message
    """
    try:
        # parents=True so a nested download_dir such as "a/b" also works.
        Path(download_dir).mkdir(parents=True, exist_ok=True)

        # Derive a filename from the URL path when none was supplied;
        # fall back to a generic name for path-less URLs.
        if not filename:
            filename = os.path.basename(urlparse(url).path) or "downloaded_file"

        file_path = os.path.join(download_dir, filename)

        print(f"Downloading file from: {url}")
        # Stream the body so large files are written chunk-by-chunk
        # instead of being held in memory.
        response = requests.get(url, stream=True, timeout=30)
        response.raise_for_status()

        with open(file_path, 'wb') as file:
            for chunk in response.iter_content(chunk_size=8192):
                if chunk:
                    file.write(chunk)

        file_size_mb = os.path.getsize(file_path) / (1024 * 1024)

        return f"""File downloaded successfully!

File Details:
- URL: {url}
- Saved as: {file_path}
- File size: {file_size_mb:.2f} MB
- Content type: {response.headers.get('content-type', 'Unknown')}

You can now access the file at: {os.path.abspath(file_path)}"""

    except requests.exceptions.RequestException as e:
        return f"Error downloading file from {url}: Network error - {str(e)}"
    except OSError as e:
        return f"Error saving file: File system error - {str(e)}"
    except Exception as e:
        return f"Error downloading file from {url}: {str(e)}"
|
|
|
|
|
def get_langfuse_handler():
    """Create and return a Langfuse callback handler for tracing runs."""
    handler = CallbackHandler()
    return handler
|
|
|
def build_jasper():
    """Build and compile the Jasper agent graph.

    Wires an assistant (tool-calling LLM) node and a tool-execution node
    into a LangGraph loop: assistant -> tools (when tool calls are
    emitted) -> assistant -> ... -> END.

    Returns:
        The compiled LangGraph runnable.
    """
    # download_file was defined with @tool but never registered, so the
    # agent could never invoke it; include it alongside the others.
    tools = [search_tool, search_wikipedia, analyze_youtube_video, download_file]

    llm = ChatOpenAI(
        model="gpt-4o",
        temperature=0,
        api_key=os.getenv("OPENAI_API_KEY")
    )
    # Serialize tool calls: one tool invocation per assistant turn.
    chat_with_tools = llm.bind_tools(tools, parallel_tool_calls=False)

    class AgentState(TypedDict):
        # Conversation history; add_messages appends new messages rather
        # than replacing the list on each state update.
        messages: Annotated[list[AnyMessage], add_messages]

    def assistant(state: AgentState):
        # One LLM step: either a final answer or a message with tool calls.
        return {
            "messages": [chat_with_tools.invoke(state["messages"])],
        }

    builder = StateGraph(AgentState)

    builder.add_node("assistant", assistant)
    builder.add_node("tools", ToolNode(tools))

    builder.add_edge(START, "assistant")
    # tools_condition routes to "tools" when the last message contains
    # tool calls, otherwise to END.
    builder.add_conditional_edges(
        "assistant",
        tools_condition,
    )
    builder.add_edge("tools", "assistant")

    jasper = builder.compile()
    print("Langfuse tracing enabled - traces will be available in your Langfuse dashboard")
    return jasper
|
|
|
def run_jasper():
    """Build Jasper and run it on a sample YouTube-transcript question."""
    agent = build_jasper()

    question = HumanMessage(content="Examine the video at https://www.youtube.com/watch?v=1htKBjuUWec.\n\nWhat does Teal'c say in response to the question \"Isn't that hot?\"")

    # Attach the Langfuse callback so the run is traced end to end.
    langfuse_handler = get_langfuse_handler()
    run_config = {
        "callbacks": [langfuse_handler],
        "metadata": {
            "trace_name": "YouTube_Video_Analysis",
            "user_id": "jasper-user",
            "session_id": "jasper-agent-session"
        }
    }

    response = agent.invoke({"messages": [question]}, config=run_config)

    print("Jasper's Response:")
    print(response['messages'][-1].content)
|
|
|
# Script entry point: run the demo query when executed directly.
if __name__ == "__main__":
    run_jasper()