import os
import json
import subprocess
import datetime
from typing import List, Tuple, Union
from urllib.parse import quote_plus

from llama_cpp import Llama
from langchain_community.llms import LlamaCpp
from langchain.agents import Tool, AgentExecutor, initialize_agent
from langchain.agents.agent_types import AgentType
# === Load Model ===
MODEL_PATH = "models/capybarahermes-2.5-mistral-7b.Q5_K_S.gguf"
if not os.path.exists(MODEL_PATH):
    raise FileNotFoundError(f"❌ Model not found at {MODEL_PATH}")

llm = Llama(model_path=MODEL_PATH, n_ctx=2048, verbose=True)
HISTORY_FILE = "agent_logs.json"

# === Log Prompt/Response ===
def log_task(prompt: str, response: str):
    """Append one prompt/response pair to the JSONL history file."""
    log = {
        "timestamp": datetime.datetime.now().isoformat(),
        "prompt": prompt,
        "response": response,
    }
    with open(HISTORY_FILE, "a") as f:
        f.write(json.dumps(log) + "\n")
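# Illustrative: each history entry is one JSON object per line (JSONL), e.g.
#   {"timestamp": "2025-01-01T12:00:00", "prompt": "hi", "response": "hello"}
# (Timestamp and values above are made-up examples, not real log output.)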
# === Query LLaMA ===
def query_llama(prompt: str, max_tokens: int = 256, temperature: float = 0.7,
                top_p: float = 0.9, stream: bool = False) -> str:
    output = llm(prompt=prompt, max_tokens=max_tokens, temperature=temperature,
                 top_p=top_p, echo=False, stream=stream)
    if stream:
        # Streaming yields chunks; join their text pieces into one string.
        return "".join(chunk["choices"][0]["text"] for chunk in output).strip()
    return output["choices"][0]["text"].strip()
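# Illustrative usage of query_llama (prompt and sampling values are arbitrary):
#   answer = query_llama("Explain GGUF quantization in one sentence.",
#                        max_tokens=128, temperature=0.2)
#   print(answer)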
# === Utilities ===
def exec_python(code: str) -> str:
    # NOTE: exec runs arbitrary code; only use this with trusted input.
    try:
        local_env = {}
        exec(code, {}, local_env)
        return str(local_env.get("result", "✅ Code executed."))
    except Exception as e:
        return f"❌ Python Error: {e}"
def read_file(filepath: str) -> str:
    try:
        if not os.path.exists(filepath):
            return f"❌ File not found: {filepath}"
        with open(filepath, "r") as f:
            return f.read()
    except Exception as e:
        return f"❌ File Read Error: {e}"
def create_langchain_agent(model_path: str) -> AgentExecutor:
    # initialize_agent expects a LangChain LLM, so wrap the model with
    # LlamaCpp instead of passing a raw llama_cpp Llama object.
    lc_llm = LlamaCpp(
        model_path=model_path,
        temperature=0.7,
        max_tokens=512,
        verbose=True,
    )
    tools = [
        Tool(
            name="LLaMA Model",
            func=lc_llm.invoke,
            description="Answer general questions using the local LLaMA model",
        )
    ]
    return initialize_agent(
        tools=tools,
        llm=lc_llm,
        agent=AgentType.ZERO_SHOT_REACT_DESCRIPTION,
        verbose=True,
    )
def make_local_agent(model_path: str):
    # Use the LangChain-compatible wrapper
    llm = LlamaCpp(
        model_path=model_path,
        n_ctx=4096,
        temperature=0.7,
        verbose=True,
    )
    tools = [
        Tool(
            name="LLaMA Tool",
            func=llm.invoke,
            description="Use the LLaMA model to answer questions",
        )
    ]
    agent = initialize_agent(
        tools=tools,
        llm=llm,
        agent=AgentType.ZERO_SHOT_REACT_DESCRIPTION,
        verbose=True,
    )
    return agent
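# Illustrative usage of the LangChain agent (assumes MODEL_PATH above exists):
#   agent = make_local_agent(MODEL_PATH)
#   print(agent.run("What is the capital of France?"))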
def write_file(filepath: str, content: str) -> str:
    try:
        with open(filepath, "w") as f:
            f.write(content)
        return f"✅ File written to {filepath}"
    except Exception as e:
        return f"❌ File Write Error: {e}"
def eval_math(expr: str) -> str:
    # NOTE: eval on raw input is unsafe; builtins are stripped to limit damage.
    try:
        return str(eval(expr, {"__builtins__": {}}, {}))
    except Exception as e:
        return f"❌ Math Eval Error: {e}"
def translate(text: str, lang: str = "fr") -> str:
    prompt = f"Translate this to {lang}:\n{text}"
    return query_llama(prompt)

def summarize(text: str) -> str:
    prompt = f"Summarize this:\n{text}"
    return query_llama(prompt)
def run_command(cmd: str) -> str:
    # NOTE: shell=True executes arbitrary shell commands; use with care.
    try:
        result = subprocess.run(cmd, shell=True, check=True, stdout=subprocess.PIPE,
                                stderr=subprocess.PIPE, timeout=5)
        return result.stdout.decode().strip()
    except Exception as e:
        return f"❌ Command Error: {e}"
def search_web(query: str) -> str:
    try:
        # quote_plus URL-encodes the query (spaces and special characters).
        url = f"https://duckduckgo.com/html/?q={quote_plus(query)}"
        return f"🔍 Try this DuckDuckGo search:\n{url}"
    except Exception as e:
        return f"❌ Web Search Error: {e}"
# === Task Planning ===
def plan_task(query: str) -> Tuple[str, Union[str, List[str]]]:
    q = query.lower()
    if "read file" in q:
        # Treat the last whitespace-separated token as the file path.
        return "read_file", query.split()[-1]
    elif "write file" in q:
        # Expected form: "write file <path>::<content>"
        parts = query.replace("write file", "", 1).strip().split("::", 1)
        return "write_file", parts if len(parts) == 2 else [None, None]
    elif "calculate" in q or any(op in q for op in "+-*/"):
        # Strip the "calculate" keyword so eval_math sees only the expression.
        return "eval_math", q.replace("calculate", "").strip()
    elif "translate" in q:
        return "translate", query
    elif "summarize" in q:
        return "summarize", query
    elif "search" in q:
        return "web_search", query
    elif "run code" in q or "python" in q:
        return "run_code", query
    elif "run command" in q:
        return "system_command", query.replace("run command", "").strip()
    else:
        return "llama_prompt", query
# === Main Handler ===
def run_agent(prompt: str, temperature: float = 0.7, top_p: float = 0.9, stream: bool = False) -> str:
    task, data = plan_task(prompt)
    try:
        if task == "run_code":
            result = exec_python(data)
        elif task == "read_file":
            result = read_file(data)
        elif task == "write_file":
            result = write_file(data[0], data[1])
        elif task == "eval_math":
            result = eval_math(data)
        elif task == "translate":
            result = translate(data)
        elif task == "summarize":
            result = summarize(data)
        elif task == "web_search":
            result = search_web(data)
        elif task == "system_command":
            result = run_command(data)
        else:
            result = query_llama(data, temperature=temperature, top_p=top_p, stream=stream)
    except Exception as e:
        result = f"❌ Error during task: {e}"
    log_task(prompt, result)
    return result
# === CLI ===
if __name__ == "__main__":
    print("🤖 Enhanced LLaMA Agent Ready! (type 'exit' to quit)\n")
    while True:
        try:
            prompt = input("🧠 You > ")
            if prompt.lower() in {"exit", "quit"}:
                break
            response = run_agent(prompt, stream=True)
            print(f"🦙 LLaMA > {response}\n")
        except KeyboardInterrupt:
            break