Spaces:
Sleeping
Sleeping
import gradio as gr | |
import subprocess | |
import tempfile | |
import os | |
import json | |
import docker | |
import time | |
from typing import Dict, List, Tuple | |
import black | |
import autopep8 | |
import ast | |
import sys | |
from contextlib import redirect_stdout, redirect_stderr | |
from io import StringIO | |
class CodeRunner: | |
def __init__(self): | |
self.supported_languages = { | |
'python': {'ext': '.py', 'cmd': ['python'], 'formatter': self.format_python}, | |
'javascript': {'ext': '.js', 'cmd': ['node'], 'formatter': self.format_javascript}, | |
'typescript': {'ext': '.ts', 'cmd': ['ts-node'], 'formatter': None}, | |
'go': {'ext': '.go', 'cmd': ['go', 'run'], 'formatter': None}, | |
'rust': {'ext': '.rs', 'cmd': ['rustc', '--edition=2021'], 'formatter': None}, | |
'c': {'ext': '.c', 'cmd': ['gcc', '-o', '/tmp/output'], 'formatter': None}, | |
'cpp': {'ext': '.cpp', 'cmd': ['g++', '-o', '/tmp/output'], 'formatter': None}, | |
'java': {'ext': '.java', 'cmd': ['javac'], 'formatter': None}, | |
'bash': {'ext': '.sh', 'cmd': ['bash'], 'formatter': None} | |
} | |
# Security restrictions | |
self.forbidden_imports = { | |
'python': ['os', 'subprocess', 'sys', 'importlib', '__import__'], | |
'javascript': ['fs', 'child_process', 'cluster', 'worker_threads'], | |
'bash': ['rm', 'sudo', 'chmod', 'chown'] | |
} | |
def run_code_sandboxed(self, code: str, language: str = "python", timeout: int = 10) -> Dict: | |
""" | |
Safely execute code with multiple sandboxing layers | |
""" | |
if language not in self.supported_languages: | |
return { | |
"success": False, | |
"output": f"Language '{language}' not supported. Available: {list(self.supported_languages.keys())}", | |
"execution_time": 0 | |
} | |
# Security check | |
security_issues = self.check_security(code, language) | |
if security_issues: | |
return { | |
"success": False, | |
"output": f"Security violations detected: {', '.join(security_issues)}", | |
"execution_time": 0 | |
} | |
start_time = time.time() | |
try: | |
# Method 1: Direct execution for Python (fastest) | |
if language == "python": | |
result = self._run_python_restricted(code, timeout) | |
# Method 2: Docker sandboxing for other languages | |
else: | |
result = self._run_in_docker(code, language, timeout) | |
execution_time = time.time() - start_time | |
result["execution_time"] = round(execution_time, 3) | |
return result | |
except Exception as e: | |
return { | |
"success": False, | |
"output": f"Execution error: {str(e)}", | |
"execution_time": time.time() - start_time | |
} | |
def _run_python_restricted(self, code: str, timeout: int) -> Dict: | |
""" | |
Run Python code with restricted builtins and memory/time limits | |
""" | |
# Create restricted environment | |
restricted_builtins = { | |
'print': print, | |
'len': len, | |
'str': str, | |
'int': int, | |
'float': float, | |
'list': list, | |
'dict': dict, | |
'tuple': tuple, | |
'set': set, | |
'range': range, | |
'enumerate': enumerate, | |
'zip': zip, | |
'map': map, | |
'filter': filter, | |
'sorted': sorted, | |
'sum': sum, | |
'min': min, | |
'max': max, | |
'abs': abs, | |
'round': round, | |
'isinstance': isinstance, | |
'type': type, | |
'hasattr': hasattr, | |
'getattr': getattr, | |
'setattr': setattr, | |
} | |
# Capture output | |
stdout_buffer = StringIO() | |
stderr_buffer = StringIO() | |
try: | |
# Parse and validate AST | |
tree = ast.parse(code) | |
# Execute with restrictions | |
with redirect_stdout(stdout_buffer), redirect_stderr(stderr_buffer): | |
exec(compile(tree, '<string>', 'exec'), { | |
'__builtins__': restricted_builtins, | |
'__name__': '__main__' | |
}) | |
return { | |
"success": True, | |
"output": stdout_buffer.getvalue(), | |
"errors": stderr_buffer.getvalue() | |
} | |
except SyntaxError as e: | |
return { | |
"success": False, | |
"output": f"Syntax Error: {str(e)}" | |
} | |
except Exception as e: | |
return { | |
"success": False, | |
"output": f"Runtime Error: {str(e)}" | |
} | |
def _run_in_docker(self, code: str, language: str, timeout: int) -> Dict: | |
""" | |
Run code in isolated Docker container (requires Docker daemon) | |
""" | |
try: | |
client = docker.from_env() | |
# Language-specific Docker images | |
images = { | |
'javascript': 'node:18-alpine', | |
'typescript': 'node:18-alpine', | |
'go': 'golang:1.21-alpine', | |
'rust': 'rust:1.70-alpine', | |
'c': 'gcc:latest', | |
'cpp': 'gcc:latest', | |
'java': 'openjdk:17-alpine', | |
'bash': 'alpine:latest' | |
} | |
image = images.get(language, 'alpine:latest') | |
lang_config = self.supported_languages[language] | |
# Create temporary file | |
with tempfile.NamedTemporaryFile(mode='w', suffix=lang_config['ext'], delete=False) as f: | |
f.write(code) | |
temp_file = f.name | |
try: | |
# Run in container | |
result = client.containers.run( | |
image, | |
command=lang_config['cmd'] + [f'/tmp/code{lang_config["ext"]}'], | |
volumes={temp_file: {'bind': f'/tmp/code{lang_config["ext"]}', 'mode': 'ro'}}, | |
mem_limit='128m', | |
cpu_quota=50000, # 50% CPU | |
network_disabled=True, | |
timeout=timeout, | |
remove=True, | |
capture_output=True, | |
text=True | |
) | |
return { | |
"success": True, | |
"output": result.decode('utf-8') if isinstance(result, bytes) else str(result) | |
} | |
finally: | |
os.unlink(temp_file) | |
except docker.errors.ContainerError as e: | |
return {"success": False, "output": f"Container error: {e.stderr.decode()}"} | |
except docker.errors.ImageNotFound: | |
return {"success": False, "output": f"Docker image not found for {language}"} | |
except Exception as e: | |
return {"success": False, "output": f"Docker execution failed: {str(e)}"} | |
def _run_with_subprocess(self, code: str, language: str, timeout: int) -> Dict: | |
""" | |
Fallback: Run using subprocess with basic sandboxing | |
""" | |
lang_config = self.supported_languages[language] | |
with tempfile.NamedTemporaryFile(mode='w', suffix=lang_config['ext'], delete=False) as f: | |
f.write(code) | |
temp_file = f.name | |
try: | |
result = subprocess.run( | |
lang_config['cmd'] + [temp_file], | |
stdout=subprocess.PIPE, | |
stderr=subprocess.PIPE, | |
text=True, | |
timeout=timeout, | |
cwd='/tmp', # Restrict to /tmp directory | |
env={'PATH': '/usr/bin:/bin'} # Minimal environment | |
) | |
return { | |
"success": result.returncode == 0, | |
"output": result.stdout, | |
"errors": result.stderr | |
} | |
except subprocess.TimeoutExpired: | |
return {"success": False, "output": f"Execution timed out after {timeout}s"} | |
except FileNotFoundError: | |
return {"success": False, "output": f"Language runtime not found for {language}"} | |
finally: | |
os.unlink(temp_file) | |
def check_security(self, code: str, language: str) -> List[str]: | |
""" | |
Basic security checks for dangerous patterns | |
""" | |
issues = [] | |
forbidden = self.forbidden_imports.get(language, []) | |
code_lower = code.lower() | |
# Check forbidden imports/modules | |
for forbidden_item in forbidden: | |
if forbidden_item in code_lower: | |
issues.append(f"Forbidden: {forbidden_item}") | |
# Language-specific checks | |
if language == "python": | |
dangerous_patterns = ['eval(', 'exec(', '__import__', 'open(', 'file('] | |
for pattern in dangerous_patterns: | |
if pattern in code_lower: | |
issues.append(f"Dangerous pattern: {pattern}") | |
elif language == "bash": | |
dangerous_cmds = ['rm -rf', 'sudo', '> /dev/', 'curl', 'wget'] | |
for cmd in dangerous_cmds: | |
if cmd in code_lower: | |
issues.append(f"Dangerous command: {cmd}") | |
return issues | |
def format_python(self, code: str) -> str: | |
"""Format Python code using black and autopep8""" | |
try: | |
# First pass with autopep8 | |
formatted = autopep8.fix_code(code) | |
# Second pass with black | |
formatted = black.format_str(formatted, mode=black.Mode()) | |
return formatted | |
except Exception as e: | |
return f"Formatting error: {str(e)}" | |
def format_javascript(self, code: str) -> str: | |
"""Basic JavaScript formatting (would use prettier in production)""" | |
# This is a simplified formatter - in production, use prettier | |
lines = code.split('\n') | |
formatted_lines = [] | |
indent_level = 0 | |
for line in lines: | |
stripped = line.strip() | |
if not stripped: | |
formatted_lines.append('') | |
continue | |
if stripped.endswith('{'): | |
formatted_lines.append(' ' * indent_level + stripped) | |
indent_level += 1 | |
elif stripped.startswith('}'): | |
indent_level = max(0, indent_level - 1) | |
formatted_lines.append(' ' * indent_level + stripped) | |
else: | |
formatted_lines.append(' ' * indent_level + stripped) | |
return '\n'.join(formatted_lines) | |
def format_code(self, code: str, language: str = "python") -> str: | |
"""Format code based on language""" | |
if language not in self.supported_languages: | |
return f"Formatting not supported for {language}" | |
formatter = self.supported_languages[language]['formatter'] | |
if formatter: | |
return formatter(code) | |
else: | |
return f"No formatter available for {language}" | |
def analyze_code(self, code: str, language: str = "python") -> List[str]: | |
"""Analyze code for potential issues""" | |
issues = [] | |
# Security analysis | |
security_issues = self.check_security(code, language) | |
issues.extend([f"Security: {issue}" for issue in security_issues]) | |
# Language-specific analysis | |
if language == "python": | |
try: | |
tree = ast.parse(code) | |
# Check for common issues | |
for node in ast.walk(tree): | |
if isinstance(node, ast.Global): | |
issues.append("Code Quality: Avoid global variables") | |
elif isinstance(node, ast.Import): | |
for alias in node.names: | |
if alias.name.startswith('_'): | |
issues.append(f"Style: Avoid importing private modules: {alias.name}") | |
elif isinstance(node, ast.FunctionDef): | |
if len(node.args.args) > 5: | |
issues.append(f"Code Quality: Function '{node.name}' has too many parameters") | |
except SyntaxError as e: | |
issues.append(f"Syntax Error: {str(e)}") | |
return issues if issues else ["No issues found"] | |
def generate_code(self, prompt: str, language: str = "python") -> str: | |
"""Generate basic code templates based on prompt""" | |
templates = { | |
"python": { | |
"function": "def {name}():\n \"\"\"\n {description}\n \"\"\"\n pass", | |
"class": "class {name}:\n def __init__(self):\n pass", | |
"api": "import requests\n\ndef fetch_data(url):\n response = requests.get(url)\n return response.json()", | |
"file": "with open('file.txt', 'r') as f:\n content = f.read()\nprint(content)" | |
}, | |
"javascript": { | |
"function": "function {name}() {\n // {description}\n return null;\n}", | |
"async": "async function {name}() {\n try {\n // {description}\n return await someAsyncOperation();\n } catch (error) {\n console.error(error);\n }\n}", | |
"api": "fetch('/api/data')\n .then(response => response.json())\n .then(data => console.log(data));" | |
} | |
} | |
prompt_lower = prompt.lower() | |
lang_templates = templates.get(language, {}) | |
# Simple template matching | |
if "function" in prompt_lower: | |
template = lang_templates.get("function", f"// {prompt}") | |
return template.format(name="myFunction", description=prompt) | |
elif "class" in prompt_lower and language == "python": | |
template = lang_templates.get("class", f"# {prompt}") | |
return template.format(name="MyClass") | |
elif "api" in prompt_lower or "fetch" in prompt_lower: | |
return lang_templates.get("api", f"// API code for: {prompt}") | |
else: | |
return f"# Generated code for: {prompt}\n# TODO: Implement functionality" | |
# Initialize the CodeRunner | |
runner = CodeRunner() | |
# Gradio interface functions | |
def run_code_interface(code: str, language: str, timeout: int = 10): | |
result = runner.run_code_sandboxed(code, language.lower(), timeout) | |
output = [] | |
output.append(f"β Success: {result['success']}") | |
output.append(f"β±οΈ Execution Time: {result.get('execution_time', 0)}s") | |
output.append(f"\nπ€ Output:\n{result.get('output', '')}") | |
if result.get('errors'): | |
output.append(f"\nβ Errors:\n{result['errors']}") | |
return "\n".join(output) | |
def format_code_interface(code: str, language: str): | |
return runner.format_code(code, language.lower()) | |
def analyze_code_interface(code: str, language: str): | |
issues = runner.analyze_code(code, language.lower()) | |
return "\n".join([f"β’ {issue}" for issue in issues]) | |
def generate_code_interface(prompt: str, language: str): | |
return runner.generate_code(prompt, language.lower()) | |
# Create Gradio interfaces | |
with gr.Blocks(title="CodeRunner - Multi-Language Code Execution Tool") as demo: | |
gr.Markdown("# π CodeRunner - Secure Multi-Language Code Execution") | |
gr.Markdown("Execute, format, analyze, and generate code in multiple programming languages with built-in security sandboxing.") | |
with gr.Tab("π Run Code"): | |
with gr.Row(): | |
with gr.Column(scale=2): | |
code_input = gr.Textbox( | |
label="Code", | |
lines=10, | |
placeholder="Enter your code here...", | |
value="print('Hello, World!')" | |
) | |
language_select = gr.Dropdown( | |
choices=list(runner.supported_languages.keys()), | |
value="python", | |
label="Language" | |
) | |
timeout_slider = gr.Slider(1, 30, value=10, label="Timeout (seconds)") | |
with gr.Column(scale=2): | |
output = gr.Textbox(label="Output", lines=12, max_lines=20) | |
run_btn = gr.Button("βΆοΈ Run Code", variant="primary") | |
run_btn.click( | |
run_code_interface, | |
inputs=[code_input, language_select, timeout_slider], | |
outputs=output | |
) | |
with gr.Tab("β¨ Format Code"): | |
with gr.Row(): | |
with gr.Column(): | |
format_input = gr.Textbox(label="Code to Format", lines=8) | |
format_lang = gr.Dropdown(choices=["python", "javascript"], value="python", label="Language") | |
format_btn = gr.Button("π¨ Format Code") | |
with gr.Column(): | |
format_output = gr.Textbox(label="Formatted Code", lines=8) | |
format_btn.click(format_code_interface, inputs=[format_input, format_lang], outputs=format_output) | |
with gr.Tab("π Analyze Code"): | |
with gr.Row(): | |
with gr.Column(): | |
analyze_input = gr.Textbox(label="Code to Analyze", lines=8) | |
analyze_lang = gr.Dropdown(choices=list(runner.supported_languages.keys()), value="python", label="Language") | |
analyze_btn = gr.Button("π Analyze Code") | |
with gr.Column(): | |
analyze_output = gr.Textbox(label="Analysis Results", lines=8) | |
analyze_btn.click(analyze_code_interface, inputs=[analyze_input, analyze_lang], outputs=analyze_output) | |
with gr.Tab("π€ Generate Code"): | |
with gr.Row(): | |
with gr.Column(): | |
prompt_input = gr.Textbox(label="Describe what you want to generate", lines=3, placeholder="e.g., 'Create a function that calculates fibonacci numbers'") | |
generate_lang = gr.Dropdown(choices=["python", "javascript"], value="python", label="Language") | |
generate_btn = gr.Button("π Generate Code") | |
with gr.Column(): | |
generate_output = gr.Textbox(label="Generated Code", lines=10) | |
generate_btn.click(generate_code_interface, inputs=[prompt_input, generate_lang], outputs=generate_output) | |
with gr.Tab("π API Documentation"): | |
gr.Markdown(""" | |
## API Endpoints | |
This CodeRunner can be used as a HuggingChat tool with the following functions: | |
### 1. `run_code(code: str, language: str, timeout: int = 10)` | |
- Executes code safely in a sandboxed environment | |
- Supports: Python, JavaScript, TypeScript, Go, Rust, C, C++, Java, Bash | |
- Returns: execution result with output and timing | |
### 2. `format_code(code: str, language: str)` | |
- Formats and prettifies code | |
- Currently supports: Python (black), JavaScript (basic) | |
- Returns: formatted code string | |
### 3. `analyze_code(code: str, language: str)` | |
- Analyzes code for security issues and code quality | |
- Checks for dangerous patterns and common mistakes | |
- Returns: list of issues and recommendations | |
### 4. `generate_code(prompt: str, language: str)` | |
- Generates basic code templates from natural language | |
- Supports common patterns: functions, classes, API calls | |
- Returns: generated code template | |
## Security Features | |
- Restricted Python execution environment | |
- Docker containerization for other languages | |
- Memory and CPU limits | |
- Network isolation | |
- Forbidden imports/commands detection | |
""") | |
if __name__ == "__main__": | |
demo.launch(server_name="0.0.0.0", server_port=7860) |