# File size: 6,408 Bytes
# 293ab16
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
from typing import List, Tuple, Union
from llama_cpp import Llama
import os, json, subprocess, datetime, requests
from langchain_community.llms import LlamaCpp
from langchain.agents.agent_types import AgentType
from langchain.agents import Tool, AgentExecutor, initialize_agent
from langchain.agents.agent_types import AgentType
from langchain.agents import initialize_agent, Tool, AgentExecutor, AgentType
# === Load Model ===
MODEL_PATH = "models/capybarahermes-2.5-mistral-7b.Q5_K_S.gguf"
# Fail fast at import time when the model file is missing.
if not os.path.exists(MODEL_PATH):
    raise FileNotFoundError(f"❌ Model not found at {MODEL_PATH}")

# Module-level model instance; query_llama() calls it for every completion.
llm = Llama(model_path=MODEL_PATH, n_ctx=2048, verbose=True)
# File that log_task() appends one JSON object per line to.
HISTORY_FILE = "agent_logs.json"

# === Log Prompt/Response ===
def log_task(prompt: str, response: str):
    """Append one prompt/response exchange to ``HISTORY_FILE``.

    Every call adds a single line containing a JSON object with the keys
    ``timestamp`` (``datetime.now()`` in ISO 8601 form), ``prompt`` and
    ``response``.

    Args:
        prompt: The text the user submitted.
        response: The text that was produced for it.
    """
    entry = dict(
        timestamp=datetime.datetime.now().isoformat(),
        prompt=prompt,
        response=response,
    )
    # Append mode keeps earlier entries; print() supplies the line terminator.
    with open(HISTORY_FILE, "a") as history:
        print(json.dumps(entry), file=history)

# === Query LLaMA ===
def query_llama(prompt: str, max_tokens: int = 256, temperature: float = 0.7, top_p: float = 0.9, stream: bool = False) -> str:
    """Run a completion on the module-level ``llm`` and return its text.

    Args:
        prompt: Text handed to the model.
        max_tokens: Forwarded to the model call as ``max_tokens``.
        temperature: Forwarded to the model call as ``temperature``.
        top_p: Forwarded to the model call as ``top_p``.
        stream: When true, the model output is consumed chunk by chunk and
            the pieces are concatenated before returning.

    Returns:
        The generated text with surrounding whitespace removed.
    """
    completion = llm(
        prompt=prompt,
        max_tokens=max_tokens,
        temperature=temperature,
        top_p=top_p,
        echo=False,
        stream=stream,
    )
    if not stream:
        return completion["choices"][0]["text"].strip()

    # Streaming mode: drain the chunk iterator and stitch the fragments.
    pieces = []
    for chunk in completion:
        pieces.append(chunk["choices"][0]["text"])
    return "".join(pieces).strip()

# === Utilities ===
def exec_python(code: str) -> str:
    """Execute a snippet of Python source and report the outcome as text.

    The snippet runs in one fresh namespace. If it binds a variable named
    ``result``, the string form of that value is returned; otherwise a
    generic success message is returned.

    Args:
        code: Python source to execute.

    Returns:
        ``str(result)``, the success message, or an error message when the
        snippet (or converting ``result`` to text) raises an ``Exception``.
    """
    # SECURITY: exec() runs arbitrary code with this process's privileges.
    # Only feed it input you trust.
    namespace = {}
    try:
        # A single dict serves as both globals and locals. With separate
        # dicts, functions defined by the snippet cannot see its other
        # top-level names (helpers, imports) and fail with NameError.
        exec(code, namespace)
        return str(namespace.get("result", "✅ Code executed."))
    except Exception as e:
        return f"❌ Python Error: {e}"

def read_file(filepath: str) -> str:
    """Return the text content of *filepath*, or an error message.

    The file is decoded as UTF-8 so the result does not depend on the
    platform's default encoding.

    Args:
        filepath: Path of the file to read.

    Returns:
        The file content, a "File not found" message when the path does not
        exist, or a "File Read Error" message for any other failure.
    """
    try:
        # Open directly instead of checking os.path.exists() first: the file
        # could disappear between the check and the open.
        with open(filepath, "r", encoding="utf-8") as f:
            return f.read()
    except FileNotFoundError:
        return f"❌ File not found: {filepath}"
    except Exception as e:
        return f"❌ File Read Error: {e}"
def create_langchain_agent(model_path: str) -> AgentExecutor:
    """Build a zero-shot ReAct agent driven by a local llama.cpp model.

    Args:
        model_path: Path to the model file to load.

    Returns:
        The agent executor produced by ``initialize_agent``, equipped with a
        single tool that forwards its input to the same model.
    """
    # initialize_agent() needs a LangChain language model. The raw
    # llama_cpp.Llama object is not one, so use the LlamaCpp wrapper, which
    # also accepts temperature/max_tokens as real settings.
    llm = LlamaCpp(
        model_path=model_path,
        temperature=0.7,
        max_tokens=512,
        verbose=True,
    )

    tools = [
        Tool(
            name="LLaMA Model",
            func=lambda x: llm.invoke(x),
            description="Answer general questions using local LLaMA model",
        )
    ]

    return initialize_agent(
        tools=tools,
        llm=llm,
        agent=AgentType.ZERO_SHOT_REACT_DESCRIPTION,
        verbose=True,
    )
def make_local_agent(model_path: str):
    """Create a zero-shot ReAct agent around a ``LlamaCpp`` model.

    Args:
        model_path: Path to the model file handed to ``LlamaCpp``.

    Returns:
        The object returned by ``initialize_agent``; its only tool sends the
        tool input to the same model through ``invoke``.
    """
    # LangChain-compatible wrapper around the local model.
    model = LlamaCpp(model_path=model_path, n_ctx=4096, temperature=0.7, verbose=True)

    def _ask_model(text):
        return model.invoke(text)

    llama_tool = Tool(
        name="LLaMA Tool",
        func=_ask_model,
        description="Use the LLaMA model to answer questions",
    )

    return initialize_agent(
        tools=[llama_tool],
        llm=model,
        agent=AgentType.ZERO_SHOT_REACT_DESCRIPTION,
        verbose=True,
    )
def write_file(filepath: str, content: str) -> str:
    """Write *content* to *filepath*, replacing any existing file.

    The text is encoded as UTF-8 so non-ASCII content (for example emoji in
    model output) can be written regardless of the platform's default
    encoding.

    Args:
        filepath: Destination path.
        content: Text to store.

    Returns:
        A confirmation message, or a "File Write Error" message when the
        write fails.
    """
    try:
        with open(filepath, "w", encoding="utf-8") as f:
            f.write(content)
        return f"✅ File written to {filepath}"
    except Exception as e:
        return f"❌ File Write Error: {e}"

def eval_math(expr: str) -> str:
    """Evaluate *expr* as a Python expression and return the result as text.

    Args:
        expr: Expression source, e.g. ``"2 + 3 * 4"``.

    Returns:
        ``str()`` of the evaluated value, or a "Math Eval Error" message when
        evaluation raises an ``Exception``.
    """
    try:
        # SECURITY: eval() executes arbitrary Python, not just arithmetic.
        # Do not expose this to untrusted input without replacing it with a
        # restricted expression evaluator.
        return str(eval(expr))
    except Exception as e:
        return f"❌ Math Eval Error: {e}"

def translate(text: str, lang: str = "fr") -> str:
    """Ask the model, via ``query_llama``, to translate *text* into *lang*.

    Args:
        text: Text to translate.
        lang: Target language inserted into the instruction (default ``"fr"``).

    Returns:
        Whatever ``query_llama`` returns for the translation request.
    """
    instruction = f"Translate this to {lang}:"
    return query_llama(f"{instruction}\n{text}")

def summarize(text: str) -> str:
    """Ask the model, via ``query_llama``, for a summary of *text*.

    Args:
        text: Text to summarize.

    Returns:
        Whatever ``query_llama`` returns for the summary request.
    """
    instruction = "Summarize this:"
    return query_llama(f"{instruction}\n{text}")

def run_command(cmd: str) -> str:
    """Run *cmd* through the system shell and return its standard output.

    The command is given five seconds to finish. Output is decoded leniently
    so unusual bytes in the output do not turn a successful run into an
    error.

    Args:
        cmd: Shell command line to execute.

    Returns:
        The command's stripped stdout on success. On failure, a
        "Command Error" message; for a non-zero exit status the command's
        stderr (when non-empty) is appended on a new line.
    """
    # SECURITY: shell=True hands the string to the shell unmodified, so the
    # caller must never pass untrusted input here.
    try:
        completed = subprocess.run(
            cmd,
            shell=True,
            check=True,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            timeout=5,
        )
        return completed.stdout.decode(errors="replace").strip()
    except subprocess.CalledProcessError as e:
        # Surface what the command itself reported, not only the exit status.
        detail = (e.stderr or b"").decode(errors="replace").strip()
        message = f"❌ Command Error: {e}"
        return f"{message}\n{detail}" if detail else message
    except Exception as e:
        return f"❌ Command Error: {e}"

def search_web(query: str) -> str:
    """Build a DuckDuckGo search link for *query*.

    No request is sent; the function only formats a URL for the user to open.

    Args:
        query: Search terms.

    Returns:
        A message containing the search URL, or a "Web Search Error" message
        when the query cannot be encoded (for example, it is not a string).
    """
    # Function-scope import keeps this block self-contained.
    from urllib.parse import quote_plus

    try:
        # quote_plus() turns spaces into "+" and percent-encodes characters
        # such as "&", "#" and "?" that would otherwise corrupt the URL.
        url = f"https://duckduckgo.com/html/?q={quote_plus(query)}"
        return f"🔗 Try this DuckDuckGo search:\n{url}"
    except Exception as e:
        return f"❌ Web Search Error: {e}"

# === Task Planning ===
_ARITHMETIC_CHARS = frozenset("0123456789.+-*/%() \t")


def _looks_like_arithmetic(text: str) -> bool:
    """Return True when *text* holds only digits, operators and parentheses."""
    return (
        any(ch.isdigit() for ch in text)
        and any(op in text for op in "+-*/%")
        and set(text) <= _ARITHMETIC_CHARS
    )


def _text_after(query: str, keyword: str) -> str:
    """Return the stripped text after the first case-insensitive *keyword*.

    When *keyword* does not occur, the whole stripped *query* is returned.
    """
    import re  # function-scope import keeps this block self-contained

    pieces = re.split(re.escape(keyword), query, maxsplit=1, flags=re.IGNORECASE)
    return pieces[-1].strip()


def plan_task(query: str) -> Tuple[str, Union[str, List[str]]]:
    """Pick the task for *query* and extract the payload its handler needs.

    Keywords are matched case-insensitively as substrings and checked in a
    fixed order, so the first matching rule below wins:

    * ``read file``   -> ``("read_file", last whitespace-separated word)``
    * ``write file``  -> ``("write_file", [path, content])`` for a query of
      the form ``write file <path>::<content>``; ``[None, None]`` when the
      ``::`` separator is missing. The content may itself contain ``::``.
    * ``calculate``   -> ``("eval_math", text after the keyword)``
    * pure arithmetic (only digits, operators, parentheses)
      -> ``("eval_math", query)``
    * ``translate`` / ``summarize`` / ``search`` -> the whole query
    * ``run command`` -> ``("system_command", text after the keyword)``
    * ``run code``    -> ``("run_code", text after the keyword)``
    * ``python``      -> ``("run_code", query)``
    * anything else   -> ``("llama_prompt", query)``

    Args:
        query: Raw user input.

    Returns:
        A ``(task_name, payload)`` tuple.
    """
    q = query.lower()
    if "read file" in q:
        return "read_file", query.split()[-1]
    if "write file" in q:
        head, sep, content = query.partition("::")
        if not sep:
            return "write_file", [None, None]
        # Drop the command words so only the path itself is used.
        return "write_file", [_text_after(head, "write file"), content]
    if "calculate" in q:
        return "eval_math", _text_after(query, "calculate")
    # Only treat the query as math when it is nothing but an expression. A
    # stray "-" or "/" in ordinary text ("ls -la", "well-known") must not
    # hijack routing.
    if _looks_like_arithmetic(query):
        return "eval_math", query
    if "translate" in q:
        return "translate", query
    if "summarize" in q:
        return "summarize", query
    if "search" in q:
        return "web_search", query
    # Checked before the "python" keyword so that commands such as
    # "run command python3 script.py" reach the shell handler.
    if "run command" in q:
        return "system_command", _text_after(query, "run command")
    if "run code" in q:
        return "run_code", _text_after(query, "run code")
    if "python" in q:
        return "run_code", query
    return "llama_prompt", query

# === Main Handler ===
def run_agent(prompt: str, temperature: float = 0.7, top_p: float = 0.9, stream: bool = False) -> str:
    """Route *prompt* to the matching handler, log the exchange, return the result.

    ``plan_task`` decides which handler runs and what payload it receives.
    Prompts that match no specific task are sent to ``query_llama``.

    Args:
        prompt: Raw user input.
        temperature: Used only for the fallback ``query_llama`` call.
        top_p: Used only for the fallback ``query_llama`` call.
        stream: Used only for the fallback ``query_llama`` call.

    Returns:
        The handler's result, or an error message when the handler raises an
        ``Exception``.
    """
    task, data = plan_task(prompt)
    # Handlers that take the payload as their single argument.
    single_arg_handlers = {
        "run_code": exec_python,
        "read_file": read_file,
        "eval_math": eval_math,
        "translate": translate,
        "summarize": summarize,
        "web_search": search_web,
        "system_command": run_command,
    }
    try:
        if task == "write_file":
            # Payload is a [path, content] pair.
            result = write_file(data[0], data[1])
        elif task in single_arg_handlers:
            result = single_arg_handlers[task](data)
        else:
            result = query_llama(data, temperature=temperature, top_p=top_p, stream=stream)
    except Exception as e:
        result = f"❌ Error during task: {e}"

    log_task(prompt, result)
    return result

# === CLI ===
if __name__ == "__main__":
    # Interactive loop: read a prompt, run it through the agent, print the reply.
    print("🤖 Enhanced LLaMA Agent Ready! (type 'exit' to quit)\n")
    while True:
        try:
            prompt = input("🧠 You > ").strip()
            if not prompt:
                # Nothing typed; ask again instead of querying the model.
                continue
            if prompt.lower() in {"exit", "quit"}:
                break
            response = run_agent(prompt, stream=True)
            print(f"📝 LLaMA > {response}\n")
        except (KeyboardInterrupt, EOFError):
            # Ctrl-C, Ctrl-D or exhausted piped input ends the session quietly.
            break