from typing import List, Tuple, Union
import os, json, subprocess, datetime, requests

from llama_cpp import Llama
from langchain_community.llms import LlamaCpp
from langchain.agents import Tool, AgentExecutor, initialize_agent
from langchain.agents.agent_types import AgentType
# === Load Model ===
MODEL_PATH = "models/capybarahermes-2.5-mistral-7b.Q5_K_S.gguf"
if not os.path.exists(MODEL_PATH):
    raise FileNotFoundError(f"❌ Model not found at {MODEL_PATH}")

llm = Llama(model_path=MODEL_PATH, n_ctx=2048, verbose=True)

HISTORY_FILE = "agent_logs.json"
# === Log Prompt/Response ===
def log_task(prompt: str, response: str):
    log = {
        "timestamp": datetime.datetime.now().isoformat(),
        "prompt": prompt,
        "response": response,
    }
    with open(HISTORY_FILE, "a") as f:
        f.write(json.dumps(log) + "\n")
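# A minimal sketch of reading the JSONL log back for inspection; `load_history`
# is an illustrative helper, not part of the original script.
def load_history() -> List[dict]:
    if not os.path.exists(HISTORY_FILE):
        return []
    with open(HISTORY_FILE) as f:
        return [json.loads(line) for line in f if line.strip()]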
# === Query LLaMA ===
def query_llama(prompt: str, max_tokens: int = 256, temperature: float = 0.7,
                top_p: float = 0.9, stream: bool = False) -> str:
    output = llm(prompt=prompt, max_tokens=max_tokens, temperature=temperature,
                 top_p=top_p, echo=False, stream=stream)
    if stream:
        # In stream mode the call returns an iterator of partial completions.
        return "".join(chunk["choices"][0]["text"] for chunk in output).strip()
    return output["choices"][0]["text"].strip()
# === Utilities ===
def exec_python(code: str) -> str:
    # Note: exec on model-generated code is unsandboxed; use with care.
    try:
        local_env = {}
        exec(code, {}, local_env)
        return str(local_env.get("result", "✅ Code executed."))
    except Exception as e:
        return f"❌ Python Error: {e}"
def read_file(filepath: str) -> str:
    try:
        if not os.path.exists(filepath):
            return f"❌ File not found: {filepath}"
        with open(filepath, "r") as f:
            return f.read()
    except Exception as e:
        return f"❌ File Read Error: {e}"
def create_langchain_agent(model_path: str) -> AgentExecutor:
    # initialize_agent expects a LangChain-compatible LLM, so wrap the GGUF
    # model with LlamaCpp rather than passing llama_cpp.Llama directly.
    lc_llm = LlamaCpp(
        model_path=model_path,
        temperature=0.7,
        max_tokens=512,
        verbose=True,
    )
    tools = [
        Tool(
            name="LLaMA Model",
            func=lambda x: lc_llm.invoke(x),
            description="Answer general questions using the local LLaMA model",
        )
    ]
    return initialize_agent(
        tools=tools,
        llm=lc_llm,
        agent=AgentType.ZERO_SHOT_REACT_DESCRIPTION,
        verbose=True,
    )
def make_local_agent(model_path: str):
    # Use the LangChain-compatible wrapper
    llm = LlamaCpp(
        model_path=model_path,
        n_ctx=4096,
        temperature=0.7,
        verbose=True,
    )
    tools = [
        Tool(
            name="LLaMA Tool",
            func=lambda x: llm.invoke(x),  # Proper call
            description="Use the LLaMA model to answer questions",
        )
    ]
    agent = initialize_agent(
        tools=tools,
        llm=llm,
        agent=AgentType.ZERO_SHOT_REACT_DESCRIPTION,
        verbose=True,
    )
    return agent
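# Usage sketch (assumes the GGUF file at MODEL_PATH is present):
#   agent = make_local_agent(MODEL_PATH)
#   print(agent.run("What tools do you have available?"))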
def write_file(filepath: str, content: str) -> str:
    try:
        with open(filepath, "w") as f:
            f.write(content)
        return f"✅ File written to {filepath}"
    except Exception as e:
        return f"❌ File Write Error: {e}"
def eval_math(expr: str) -> str:
    try:
        # Strip builtins to limit what eval can reach; still not a true sandbox.
        return str(eval(expr, {"__builtins__": {}}, {}))
    except Exception as e:
        return f"❌ Math Eval Error: {e}"
def translate(text: str, lang: str = "fr") -> str:
    prompt = f"Translate this to {lang}:\n{text}"
    return query_llama(prompt)

def summarize(text: str) -> str:
    prompt = f"Summarize this:\n{text}"
    return query_llama(prompt)
def run_command(cmd: str) -> str:
    try:
        # shell=True passes the string through the shell; keep inputs trusted.
        result = subprocess.run(cmd, shell=True, check=True,
                                stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                                timeout=5)
        return result.stdout.decode().strip()
    except Exception as e:
        return f"❌ Command Error: {e}"
def search_web(query: str) -> str:
    try:
        url = f"https://duckduckgo.com/html/?q={query.replace(' ', '+')}"
        return f"🔍 Try this DuckDuckGo search:\n{url}"
    except Exception as e:
        return f"❌ Web Search Error: {e}"
# === Task Planning ===
def plan_task(query: str) -> Tuple[str, Union[str, List[str]]]:
    q = query.lower()
    if "read file" in q:
        return "read_file", query.split()[-1]
    elif "write file" in q:
        # Expected format: "write file <path>::<content>"
        parts = query.split("::", 1)
        if len(parts) == 2:
            filepath = parts[0].strip()
            if filepath.lower().startswith("write file"):
                filepath = filepath[len("write file"):].strip()
            return "write_file", [filepath, parts[1]]
        return "write_file", [None, None]
    elif "calculate" in q or any(op in q for op in "+-*/"):
        return "eval_math", query
    elif "translate" in q:
        return "translate", query
    elif "summarize" in q:
        return "summarize", query
    elif "search" in q:
        return "web_search", query
    elif "run code" in q or "python" in q:
        return "run_code", query
    elif "run command" in q:
        return "system_command", query.replace("run command", "").strip()
    else:
        return "llama_prompt", query
# === Main Handler ===
def run_agent(prompt: str, temperature: float = 0.7, top_p: float = 0.9, stream: bool = False) -> str:
    task, data = plan_task(prompt)
    try:
        if task == "run_code":
            result = exec_python(data)
        elif task == "read_file":
            result = read_file(data)
        elif task == "write_file":
            if data[0] is None:
                result = "❌ Usage: write file <path>::<content>"
            else:
                result = write_file(data[0], data[1])
        elif task == "eval_math":
            result = eval_math(data)
        elif task == "translate":
            result = translate(data)
        elif task == "summarize":
            result = summarize(data)
        elif task == "web_search":
            result = search_web(data)
        elif task == "system_command":
            result = run_command(data)
        else:
            result = query_llama(data, temperature=temperature, top_p=top_p, stream=stream)
    except Exception as e:
        result = f"❌ Error during task: {e}"
    log_task(prompt, result)
    return result
# === CLI ===
if __name__ == "__main__":
    print("🤖 Enhanced LLaMA Agent Ready! (type 'exit' to quit)\n")
    while True:
        try:
            prompt = input("🧠 You > ")
            if prompt.lower() in {"exit", "quit"}:
                break
            response = run_agent(prompt, stream=True)
            print(f"🦙 LLaMA > {response}\n")
        except (KeyboardInterrupt, EOFError):
            break