# NOTE: "Spaces: Sleeping" page residue from the Hugging Face Spaces scrape —
# not part of the program source.
| # ============================================================ | |
| # CodeAgent - Sub-agent for code generation and execution | |
| # Modified to log inputs & outputs (file + jsonl) | |
| # Author: Mustafa Albakkar (modified) | |
| # ============================================================ | |
import atexit
import contextlib
import io
import json
import logging
import os
import sys
import tempfile
import traceback
from datetime import datetime, timezone
from logging.handlers import RotatingFileHandler

import gradio as gr
from llama_cpp import Llama
# ------------------------------------------------------------
# 🔧 Logging setup — console + rotating file handlers
# ------------------------------------------------------------
LOG_DIR = os.environ.get("CODEAGENT_LOG_DIR", "logs")
os.makedirs(LOG_DIR, exist_ok=True)
log_file = os.path.join(LOG_DIR, "codeagent.log")
jsonl_file = os.path.join(LOG_DIR, "records.jsonl")

logger = logging.getLogger("CodeAgent")
logger.setLevel(logging.INFO)

# Guard against duplicate handlers: without this check, re-importing the
# module (e.g. under a dev-server reloader) appends a second console/file
# handler pair and every log line is emitted twice.
if not logger.handlers:
    _formatter = logging.Formatter("%(asctime)s [%(levelname)s] %(name)s: %(message)s")

    # Console handler (stdout, so platform log collectors pick it up)
    ch = logging.StreamHandler(sys.stdout)
    ch.setLevel(logging.INFO)
    ch.setFormatter(_formatter)
    logger.addHandler(ch)

    # Rotating file handler: 5 MiB per file, 5 backups
    fh = RotatingFileHandler(log_file, maxBytes=5 * 1024 * 1024, backupCount=5, encoding="utf-8")
    fh.setLevel(logging.INFO)
    fh.setFormatter(_formatter)
    logger.addHandler(fh)
# ------------------------------------------------------------
# 🧠 Load the model (Qwen2.5-Coder-14B-Instruct-GGUF)
# ------------------------------------------------------------
llm = None
try:
    logger.info("🔄 Attempting to load model...")
    # from_pretrained fetches the GGUF file from the Hugging Face Hub on
    # first use; subsequent runs hit the local cache.
    llm = Llama.from_pretrained(
        repo_id="Qwen/Qwen2.5-Coder-14B-Instruct-GGUF",
        filename="qwen2.5-coder-14b-instruct-q6_k.gguf",
        n_gpu_layers=-1,  # presumably "offload all layers to GPU" — confirm against llama-cpp-python docs
        n_ctx=4096,       # context window; must stay in sync with compute_safe_max_tokens(max_ctx=4096)
        n_threads=4,
    )
    logger.info("✅ Model loaded successfully.")
except Exception as e:
    # Keep the module importable even when loading fails; the request
    # handler checks `llm is None` and reports "Model not loaded."
    logger.exception("❌ Model load failed: %s", e)
    llm = None
# ------------------------------------------------------------
# 🧩 Helper functions
# ------------------------------------------------------------
def safe_text(x):
    """Coerce *x* into a clean, display-ready string.

    Strings are returned stripped of surrounding whitespace; any other
    value is JSON-encoded when possible, falling back to ``str()``.
    """
    if isinstance(x, str):
        return x.strip()
    try:
        encoded = json.dumps(x, ensure_ascii=False)
    except Exception:
        encoded = str(x)
    return encoded
def compute_safe_max_tokens(prompt: str, max_ctx=4096):
    """Estimate a safe completion-token budget for *prompt*.

    Uses the whitespace word count as a rough token-count proxy, then
    clamps the remaining context space into the range [128, 1024].
    """
    try:
        words_used = max(1, len(prompt.split()))
        remaining = max_ctx - words_used - 1
        return max(128, min(1024, remaining))
    except Exception:
        # Defensive fallback (e.g. non-string input).
        return 256
def append_log_record(prompt: str, model_raw: str, code_clean: str,
                      exec_result: str, extra: dict = None, path: str = None):
    """Append one JSON line (JSONL) describing a single request.

    Each line records the prompt, raw model output, cleaned code, execution
    result and a UTC timestamp; *extra* entries are merged on top.

    Args:
        prompt: User prompt as passed to the model.
        model_raw: Unprocessed model completion text.
        code_clean: Code extracted from the completion (fences stripped).
        exec_result: Output (or error text) from executing the code.
        extra: Optional additional key/value pairs merged into the record.
        path: Destination JSONL file; defaults to the module-level
            ``jsonl_file``.

    Best-effort: serialization/I-O failures are logged, never raised.
    """
    try:
        record = {
            # ISO-8601 UTC timestamp. Replaces the previous hack of
            # building a throwaway LogRecord + Formatter just to get a
            # formatted time string.
            "ts": datetime.now(timezone.utc).isoformat(),
            "prompt": prompt,
            "model_raw": model_raw,
            "code_clean": code_clean,
            "exec_result": exec_result,
        }
        if extra:
            record.update(extra)
        with open(path or jsonl_file, "a", encoding="utf-8") as f:
            f.write(json.dumps(record, ensure_ascii=False) + "\n")
    except Exception:
        logger.exception("Failed to append JSONL record.")
# ------------------------------------------------------------
# 🧮 Code-execution tool
# ------------------------------------------------------------
def execute_python_code(code_str: str) -> str:
    """Execute model-generated Python code and return its textual output.

    Markdown code fences are stripped, the code runs with stdout/stderr
    captured, and the function returns the captured text, a success
    placeholder when nothing was printed, or a traceback string on failure
    (now including any output printed before the error, which the previous
    version silently discarded).

    SECURITY: ``exec`` on model output is inherently unsafe — there is no
    sandboxing here. Run this service only in an isolated environment.
    """
    # Strip markdown fences the model may have wrapped the code in.
    code_str = code_str.strip().replace("```python", "").replace("```", "").strip()
    buffer = io.StringIO()
    try:
        # Redirect stdout and stderr into the buffer.
        with contextlib.redirect_stdout(buffer), contextlib.redirect_stderr(buffer):
            # Fresh, empty namespaces: the snippet gets no access to this
            # module's globals.
            local_env = {}
            exec(code_str, {}, local_env)  # intentionally unsandboxed — see docstring
        result = buffer.getvalue().strip()
        if not result:
            result = "✅ Code executed successfully (no printed output)."
        return result
    except Exception:
        tb = traceback.format_exc()
        # Preserve whatever the snippet printed before failing — valuable
        # for debugging multi-statement code.
        partial = buffer.getvalue().strip()
        if partial:
            return f"❌ Code execution error:\n{tb}\n--- Partial output ---\n{partial}"
        return f"❌ Code execution error:\n{tb}"
    finally:
        buffer.close()
# ------------------------------------------------------------
# 🎯 Core generate-and-execute function (with detailed logging)
# ------------------------------------------------------------
def generate_and_execute_fn(prompt: str) -> str:
    """
    - Generate Python code with the Qwen2.5-Coder model
    - Execute the generated code immediately
    - Log inputs and outputs (console, rotating file, JSONL)

    Returns a markdown-formatted string containing the prompt, the
    generated code and the execution result, or a human-readable error
    message on any failure.
    """
    try:
        prompt = safe_text(prompt)
        if not prompt:
            return "⚠️ No prompt provided."
        if llm is None:
            logger.error("Model not loaded when requested.")
            return "❌ Model not loaded."
        # --- Build the model input in ChatML conversation format ---
        formatted_prompt = (
            f"<|im_start|>system\n"
            f"You are CodeAgent, a skilled Python coding assistant. "
            f"Generate correct, fully functional Python code that solves the given task.\n"
            f"Always wrap your code in triple backticks and do not include explanations.\n"
            f"<|im_end|>\n"
            f"<|im_start|>user\n{prompt}\n<|im_end|>\n"
            f"<|im_start|>assistant\n"
        )
        max_tokens = compute_safe_max_tokens(formatted_prompt)
        # Log the incoming prompt
        logger.info("🔔 New request received.")
        logger.info("➡️ Prompt: %s", prompt)
        # --- Run generation ---
        try:
            out = llm(
                formatted_prompt,
                max_tokens=max_tokens,
                temperature=0.2,
                stop=["<|im_end|>"]
            )
        except TypeError:
            # Some bindings reject the max_tokens keyword (TypeError) —
            # retry with max_new_tokens instead.
            out = llm(
                formatted_prompt,
                max_new_tokens=max_tokens,
                temperature=0.2,
                stop=["<|im_end|>"]
            )
        except Exception:
            logger.exception("Model generation failed.")
            return "❌ Model generation failed."
        # --- Extract the generated text ---
        if isinstance(out, dict):
            # The return structure may differ between binding versions.
            try:
                text = out.get("choices", [{}])[0].get("text", "") or out.get("text", "")
            except Exception:
                text = json.dumps(out, ensure_ascii=False)
        else:
            text = str(out)
        model_raw = text or ""
        if not model_raw.strip():
            logger.warning("Empty response from model.")
            return "⚠️ Empty response from model."
        logger.info("🧾 Model raw output (truncated 1000 chars):\n%s", model_raw[:1000])
        # --- Clean the output to keep only the code ---
        # If the model wrapped the code in triple backticks, strip them.
        code_candidate = model_raw.replace("```python", "").replace("```", "").strip()
        logger.info("🛠 Cleaned code (first 1000 chars):\n%s", code_candidate[:1000])
        # --- Execute the code and return the result ---
        exec_result = execute_python_code(code_candidate)
        logger.info("📤 Execution result (truncated 2000 chars):\n%s", exec_result[:2000])
        # Append a JSONL record for later processing (best-effort)
        try:
            append_log_record(prompt=prompt, model_raw=model_raw, code_clean=code_candidate, exec_result=exec_result)
        except Exception:
            logger.exception("Failed to write jsonl record.")
        final_output = (
            f"🧠 **Prompt:** {prompt}\n\n"
            f"💻 **Generated Code:**\n{code_candidate}\n\n"
            f"🧾 **Execution Result:**\n{exec_result}"
        )
        return final_output
    except Exception:
        # Last-resort guard so the Gradio callback never raises.
        logger.exception("Generation/Execution error")
        return "❌ Internal error during generation/execution."
# ------------------------------------------------------------
# 🧱 Gradio interface
# ------------------------------------------------------------
# Single-textbox UI wired directly to generate_and_execute_fn.
# The placeholder/label/description strings are user-facing Arabic UI text
# and are intentionally left unchanged.
iface = gr.Interface(
    fn=generate_and_execute_fn,
    inputs=gr.Textbox(lines=4, placeholder="💬 اكتب سؤالك البرمجي هنا..."),
    outputs=gr.Textbox(lines=15, label="💡 ناتج CodeAgent"),
    title="🤖 CodeAgent — Code Generator & Executor",
    description="وكيل يقوم بتوليد الكود البرمجي بلغة بايثون وتنفيذه مباشرة باستخدام نموذج Qwen2.5-Coder-14B-Instruct-GGUF."
)
# ------------------------------------------------------------
# 🧹 Safe model shutdown
# ------------------------------------------------------------
def safe_close():
    """Release the model at interpreter exit (registered via atexit).

    Best-effort: close() failures are only logged, and ``llm`` is always
    reset to None so a repeated call is a no-op.
    """
    global llm
    try:
        if llm is not None:
            # NOTE(review): assumes the Llama object exposes close() —
            # the except below absorbs an AttributeError if it does not.
            llm.close()
            logger.info("🧩 Model closed successfully.")
    except Exception:
        logger.exception("Error closing model.")
    finally:
        llm = None

atexit.register(safe_close)
# ------------------------------------------------------------
# 🚀 Launch (compatible with the parent agent)
# ------------------------------------------------------------
if __name__ == "__main__":
    # PORT is injected by the hosting platform; fall back to Gradio's 7860.
    port = int(os.environ.get("PORT", 7860))
    logger.info(f"🚀 Launching CodeAgent on port {port}")
    # Bind to 0.0.0.0 so the server is reachable from outside the container.
    iface.launch(server_name="0.0.0.0", server_port=port)