#!/usr/bin/env python
# coding=utf-8

# Copyright 2024 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import base64
import json
import pickle
import re
import time
import uuid
from io import BytesIO
from pathlib import Path
from textwrap import dedent
from typing import Any, Dict, List, Tuple

import requests
from PIL import Image

from .local_python_executor import PythonExecutor
from .monitoring import LogLevel
from .tools import Tool, get_tools_definition_code


try:
    from dotenv import load_dotenv

    load_dotenv()
except ModuleNotFoundError:
    pass


# Dockerfile written next to this module when none exists yet; it starts a
# Jupyter Kernel Gateway listening on port 8888 inside the container.
_DEFAULT_DOCKERFILE = """FROM python:3.12-slim

RUN pip install jupyter_kernel_gateway requests numpy pandas
RUN pip install jupyter_client notebook

EXPOSE 8888
CMD ["jupyter", "kernelgateway", "--KernelGatewayApp.ip='0.0.0.0'", "--KernelGatewayApp.port=8888", "--KernelGatewayApp.allow_origin='*'"]
"""


class RemotePythonExecutor(PythonExecutor):
    """Base class for executors that run code in a remote environment.

    Subclasses implement `run_code_raise_errors`; this class builds tool
    shipping, variable shipping, package installation and final-answer
    detection on top of it.

    Args:
        additional_imports: Package names the subclass is expected to install remotely.
        logger: Object exposing `log(...)` (and, for some subclasses, `log_error(...)`).
    """

    def __init__(self, additional_imports: List[str], logger):
        self.additional_imports = additional_imports
        self.logger = logger
        self.logger.log("Initializing executor, hold on...")
        # Matches a whole line of the form `final_answer(<expr>)`; group 1 is <expr>.
        self.final_answer_pattern = re.compile(r"^final_answer\((.*)\)$", re.M)
        self.installed_packages = []

    def run_code_raise_errors(self, code: str, return_final_answer: bool = False) -> Tuple[Any, str]:
        """Run `code` remotely and return `(result, logs)`. Must be overridden."""
        raise NotImplementedError

    def send_tools(self, tools: Dict[str, Tool]):
        """Install the tools' missing requirements and define the tools remotely.

        Args:
            tools: Mapping of tool name to `Tool`. Each tool's
                `to_dict()["requirements"]` lists the packages it needs.
        """
        tool_definition_code = get_tools_definition_code(tools)

        # Deduplicate, skip what is already installed, and sort so that the
        # generated command is deterministic.
        packages_to_install = sorted(
            {
                package
                for tool in tools.values()
                for package in tool.to_dict()["requirements"]
                if package not in self.installed_packages
            }
        )

        code = tool_definition_code
        # Only prepend the install line when there is something to install:
        # a bare `pip install` with no requirement is an error.
        if packages_to_install:
            code = f"!pip install {' '.join(packages_to_install)}\n" + code

        execution = self.run_code_raise_errors(code)
        # Record the packages only once the remote call has gone through, so a
        # failed call does not leave them wrongly marked as installed.
        self.installed_packages.extend(packages_to_install)
        self.logger.log(execution[1])

    def send_variables(self, variables: dict):
        """
        Send variables to the kernel namespace using pickle.

        The variables are pickled locally, base64-encoded, embedded in a code
        snippet, and unpickled by that snippet on the remote side.
        """
        # Base64 output contains neither quotes nor newlines, so it can be
        # embedded verbatim inside the single-quoted literal below.
        pickled_vars = base64.b64encode(pickle.dumps(variables)).decode()
        code = dedent(
            f"""
            import pickle, base64
            vars_dict = pickle.loads(base64.b64decode('{pickled_vars}'))
            locals().update(vars_dict)
            """
        )
        self.run_code_raise_errors(code)

    def __call__(self, code_action: str) -> Tuple[Any, str, bool]:
        """Check if code is a final answer and run it accordingly.

        Returns:
            `(result, logs, is_final_answer)`.
        """
        is_final_answer = bool(self.final_answer_pattern.search(code_action))
        output = self.run_code_raise_errors(code_action, return_final_answer=is_final_answer)
        return output[0], output[1], is_final_answer

    def install_packages(self, additional_imports: List[str]):
        """Install `additional_imports` plus `smolagents` remotely.

        Returns:
            The list of packages requested (a new list; the argument is not mutated).
        """
        additional_imports = additional_imports + ["smolagents"]
        self.run_code_raise_errors(f"!pip install {' '.join(additional_imports)}")
        return additional_imports


class E2BExecutor(RemotePythonExecutor):
    """Executes Python code in an E2B sandbox (`e2b_code_interpreter.Sandbox`).

    Args:
        additional_imports: Packages to install in the sandbox at start-up.
        logger: Logger used for progress and execution logs.

    Raises:
        ModuleNotFoundError: If `e2b_code_interpreter` is not installed.
    """

    # Result attributes holding base64-encoded image data, in lookup order.
    _IMAGE_FORMATS = ("jpeg", "png")
    # Remaining result attributes, returned as-is, in lookup order.
    _RICH_FORMATS = (
        "chart",
        "data",
        "html",
        "javascript",
        "json",
        "latex",
        "markdown",
        "pdf",
        "svg",
        "text",
    )

    def __init__(self, additional_imports: List[str], logger):
        super().__init__(additional_imports, logger)
        try:
            from e2b_code_interpreter import Sandbox
        except ModuleNotFoundError:
            raise ModuleNotFoundError(
                """Please install 'e2b' extra to use E2BExecutor: `pip install 'smolagents[e2b]'`"""
            )
        self.sandbox = Sandbox()
        self.installed_packages = self.install_packages(additional_imports)
        self.logger.log("E2B is running", level=LogLevel.INFO)

    def run_code_raise_errors(self, code: str, return_final_answer: bool = False) -> Tuple[Any, str]:
        """Run `code` in the sandbox and return `(main_result, stdout_logs)`.

        Args:
            code: Source code to execute.
            return_final_answer: When True, executions that yield results but
                no usable main result raise instead of returning `None`.

        Returns:
            A tuple of the main result (a `PIL.Image` for jpeg/png results, the
            first non-None rich representation otherwise, or `None`) and the
            captured stdout joined with newlines.

        Raises:
            ValueError: If the execution reports an error, or if
                `return_final_answer` is True and no main result was found
                among a non-empty result list.
        """
        execution = self.sandbox.run_code(code)
        execution_logs = "\n".join(str(log) for log in execution.logs.stdout)

        if execution.error:
            # One item per line so the name, value and traceback stay readable
            # instead of being glued together.
            raise ValueError(
                "\n".join(
                    [
                        execution_logs,
                        "Executing code yielded an error:",
                        f"{execution.error.name}",
                        f"{execution.error.value}",
                        f"{execution.error.traceback}",
                    ]
                )
            )

        self.logger.log(execution.logs)

        if not execution.results:
            return None, execution_logs

        for result in execution.results:
            if not result.is_main_result:
                continue
            # Images come back base64-encoded; decode them into a PIL image.
            for attribute_name in self._IMAGE_FORMATS:
                image_output = getattr(result, attribute_name)
                if image_output is not None:
                    decoded_bytes = base64.b64decode(image_output.encode("utf-8"))
                    return Image.open(BytesIO(decoded_bytes)), execution_logs
            for attribute_name in self._RICH_FORMATS:
                value = getattr(result, attribute_name)
                if value is not None:
                    return value, execution_logs

        if return_final_answer:
            raise ValueError("No main result returned by executor!")
        return None, execution_logs


class DockerExecutor(RemotePythonExecutor):
    """
    Executes Python code using Jupyter Kernel Gateway in a Docker container.
    """

    # Prefix of the stdout line carrying the base64-pickled final answer.
    _RESULT_MARKER = "RESULT_PICKLE:"

    def __init__(
        self,
        additional_imports: List[str],
        logger,
        host: str = "127.0.0.1",
        port: int = 8888,
    ):
        """
        Initialize the Docker-based Jupyter Kernel Gateway executor.

        Builds the `jupyter-kernel` image (writing a default Dockerfile next to
        this module if none exists), starts a container, creates a kernel over
        HTTP, opens a websocket to it and installs `additional_imports`.

        Args:
            additional_imports: Packages to install in the kernel at start-up.
            logger: Logger exposing `log` and `log_error`.
            host: Host interface the container's port 8888 is published on.
            port: Host port the container's port 8888 is published on.

        Raises:
            ModuleNotFoundError: If `docker` or `websocket` is not installed.
            RuntimeError: If the Docker daemon is unreachable or any
                initialization step fails (resources are cleaned up first).
        """
        super().__init__(additional_imports, logger)
        try:
            import docker
            from websocket import create_connection
        except ModuleNotFoundError:
            raise ModuleNotFoundError(
                "Please install 'docker' extra to use DockerExecutor: `pip install 'smolagents[docker]'`"
            )
        self.host = host
        self.port = port

        # Initialize Docker
        try:
            self.client = docker.from_env()
        except docker.errors.DockerException as e:
            raise RuntimeError("Could not connect to Docker daemon: make sure Docker is running.") from e

        # Build and start container
        try:
            self.logger.log("Building Docker image...", level=LogLevel.INFO)
            dockerfile_path = Path(__file__).parent / "Dockerfile"
            if not dockerfile_path.exists():
                with open(dockerfile_path, "w") as f:
                    f.write(_DEFAULT_DOCKERFILE)
            _, build_logs = self.client.images.build(
                path=str(dockerfile_path.parent), dockerfile=str(dockerfile_path), tag="jupyter-kernel"
            )
            self.logger.log(build_logs, level=LogLevel.DEBUG)

            self.logger.log(f"Starting container on {host}:{port}...", level=LogLevel.INFO)
            self.container = self.client.containers.run(
                "jupyter-kernel", ports={"8888/tcp": (host, port)}, detach=True
            )

            # Poll the container status for up to ~5 seconds.
            retries = 0
            while self.container.status != "running" and retries < 5:
                self.logger.log(f"Container status: {self.container.status}, waiting...", level=LogLevel.INFO)
                time.sleep(1)
                self.container.reload()
                retries += 1

            self.base_url = f"http://{host}:{port}"

            # Create new kernel via HTTP
            r = requests.post(f"{self.base_url}/api/kernels")
            if r.status_code != 201:
                error_details = {
                    "status_code": r.status_code,
                    "headers": dict(r.headers),
                    "url": r.url,
                    "body": r.text,
                    "request_method": r.request.method,
                    "request_headers": dict(r.request.headers),
                    "request_body": r.request.body,
                }
                # `default=str` keeps this diagnostic dump from raising on
                # non-JSON values (e.g. a bytes request body), which would
                # hide the real failure reported just below.
                self.logger.log_error(
                    f"Failed to create kernel. Details: {json.dumps(error_details, indent=2, default=str)}"
                )
                raise RuntimeError(f"Failed to create kernel: Status {r.status_code}\nResponse: {r.text}") from None

            self.kernel_id = r.json()["id"]

            ws_url = f"ws://{host}:{port}/api/kernels/{self.kernel_id}/channels"
            self.ws = create_connection(ws_url)

            self.installed_packages = self.install_packages(additional_imports)
            self.logger.log(
                f"Container {self.container.short_id} is running with kernel {self.kernel_id}", level=LogLevel.INFO
            )

        except Exception as e:
            self.cleanup()
            raise RuntimeError(f"Failed to initialize Jupyter kernel: {e}") from e

    def run_code_raise_errors(self, code_action: str, return_final_answer: bool = False) -> Tuple[Any, str]:
        """
        Execute code and return result based on whether it's a final answer.

        For a final answer, the `final_answer(<expr>)` line is replaced by code
        that pickles `<expr>` and prints it behind a marker; the marker line is
        then stripped from the logs and unpickled locally.

        Args:
            code_action: Source code to run in the kernel.
            return_final_answer: Whether `code_action` contains a
                `final_answer(...)` line whose value must be returned.

        Returns:
            `(result, logs)`; `result` is `None` unless a final answer was requested.

        Raises:
            ValueError: If a final answer was requested but `code_action` has
                no `final_answer(...)` line, or the kernel finished without
                emitting the result marker.
            RuntimeError: If the kernel reports an execution error.
        """
        try:
            wrapped_code = code_action
            if return_final_answer:
                match = self.final_answer_pattern.search(code_action)
                if match is None:
                    # Fail fast with a clear message rather than executing
                    # code that can never produce the requested result.
                    raise ValueError("return_final_answer=True but no final_answer(...) call was found in the code")
                pre_final_answer_code = self.final_answer_pattern.sub("", code_action)
                # The pattern's `.` never matches a newline, so the expression
                # is a single line and cannot disturb `dedent`.
                result_expr = match.group(1)
                wrapped_code = pre_final_answer_code + dedent(
                    f"""
                    import pickle, base64
                    _result = {result_expr}
                    print("RESULT_PICKLE:" + base64.b64encode(pickle.dumps(_result)).decode())
                    """
                )

            # Send execute request
            msg_id = self._send_execute_request(wrapped_code)

            # Collect every stream chunk until the kernel reports idle. The
            # marker is looked for in the joined text afterwards because the
            # kernel may merge it with earlier output in one stream message or
            # split one print across several.
            chunks = []
            while True:
                msg = json.loads(self.ws.recv())
                msg_type = msg.get("msg_type", "")
                parent_msg_id = msg.get("parent_header", {}).get("msg_id")

                # Only process messages related to our execute request
                if parent_msg_id != msg_id:
                    continue

                if msg_type == "stream":
                    chunks.append(msg["content"]["text"])
                elif msg_type == "error":
                    traceback = msg["content"].get("traceback", [])
                    raise RuntimeError("\n".join(traceback)) from None
                elif msg_type == "status" and msg["content"]["execution_state"] == "idle":
                    # Idle ends this request's output; always stop here so a
                    # missing marker cannot block forever on `recv()`.
                    break

            output = "".join(chunks)
            if not return_final_answer:
                return None, output
            return self._split_result_from_output(output)

        except Exception as e:
            self.logger.log_error(f"Code execution failed: {e}")
            raise

    @classmethod
    def _split_result_from_output(cls, output: str) -> Tuple[Any, str]:
        """Extract the pickled final answer from `output`; return `(result, remaining_logs)`."""
        # The marker line is printed last, so split on its last occurrence.
        head, marker, tail = output.rpartition(cls._RESULT_MARKER)
        if not marker:
            raise ValueError("No main result returned by executor!")
        payload, _, trailing = tail.partition("\n")
        # NOTE(security): this unpickles bytes produced by code running inside
        # the container. `pickle.loads` can execute arbitrary code on the host,
        # so a compromised kernel could escape the sandbox through this call.
        # Consider a safer serialization format for results.
        result = pickle.loads(base64.b64decode(payload.strip()))
        return result, head + trailing

    def _send_execute_request(self, code: str) -> str:
        """Send code execution request to kernel.

        Returns:
            The message ID, used to match the kernel's replies to this request.
        """
        # Generate a unique message ID
        msg_id = str(uuid.uuid4())

        # Create execute request
        execute_request = {
            "header": {
                "msg_id": msg_id,
                "username": "anonymous",
                "session": str(uuid.uuid4()),
                "msg_type": "execute_request",
                "version": "5.0",
            },
            "parent_header": {},
            "metadata": {},
            "content": {
                "code": code,
                "silent": False,
                "store_history": True,
                "user_expressions": {},
                "allow_stdin": False,
            },
        }

        self.ws.send(json.dumps(execute_request))
        return msg_id

    def cleanup(self):
        """Clean up resources.

        Closes the kernel websocket (if one was opened), then stops and removes
        the container (if one was started). Failures are logged, not raised.
        """
        # `ws` and `container` may be missing when `__init__` failed part-way.
        ws = getattr(self, "ws", None)
        if ws is not None:
            try:
                ws.close()
            except Exception as e:
                self.logger.log_error(f"Error during cleanup: {e}")

        try:
            if hasattr(self, "container"):
                self.logger.log(f"Stopping and removing container {self.container.short_id}...", level=LogLevel.INFO)
                self.container.stop()
                self.container.remove()
                self.logger.log("Container cleanup completed", level=LogLevel.INFO)
        except Exception as e:
            self.logger.log_error(f"Error during cleanup: {e}")

    def delete(self):
        """Ensure cleanup on deletion."""
        self.cleanup()


__all__ = ["E2BExecutor", "DockerExecutor"]