#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Security Scanner Service

This module provides functionality for scanning code for security vulnerabilities.
"""

import os
import subprocess
import logging
import json
import tempfile
from collections import defaultdict

logger = logging.getLogger(__name__)


class SecurityScanner:
    """
    Service for scanning code for security vulnerabilities.
    """

    def __init__(self):
        """
        Initialize the SecurityScanner.
        """
        logger.info("Initialized SecurityScanner")
        self.scanners = {
            'Python': self._scan_python,
            'JavaScript': self._scan_javascript,
            'TypeScript': self._scan_javascript,  # TypeScript uses the same scanner as JavaScript
            'Java': self._scan_java,
            'Go': self._scan_go,
            'Rust': self._scan_rust,
        }

    def scan_repository(self, repo_path, languages):
        """
        Scan a repository for security vulnerabilities in the specified languages.

        Args:
            repo_path (str): The path to the repository.
            languages (list): A list of programming languages to scan.

        Returns:
            dict: A dictionary containing scan results for each language.
        """
        logger.info(f"Scanning repository at {repo_path} for security vulnerabilities in languages: {languages}")

        results = {}

        # Scan dependencies first (language-agnostic)
        results['dependencies'] = self._scan_dependencies(repo_path)

        # Scan each language
        for language in languages:
            if language in self.scanners:
                try:
                    logger.info(f"Scanning {language} code in {repo_path} for security vulnerabilities")
                    results[language] = self.scanners[language](repo_path)
                except Exception as e:
                    logger.error(f"Error scanning {language} code for security vulnerabilities: {e}")
                    results[language] = {
                        'status': 'error',
                        'error': str(e),
                        'vulnerabilities': [],
                    }
            else:
                logger.warning(f"No security scanner available for {language}")
                results[language] = {
                    'status': 'not_supported',
                    'message': f"Security scanning for {language} is not supported yet.",
                    'vulnerabilities': [],
                }

        return results

    def _scan_dependencies(self, repo_path):
        """
        Scan dependencies for known vulnerabilities.

        Args:
            repo_path (str): The path to the repository.

        Returns:
            dict: Dependency scan results.
        """
        logger.info(f"Scanning dependencies in {repo_path}")

        results = {
            'python': self._scan_python_dependencies(repo_path),
            'javascript': self._scan_javascript_dependencies(repo_path),
            'java': self._scan_java_dependencies(repo_path),
            'go': self._scan_go_dependencies(repo_path),
            'rust': self._scan_rust_dependencies(repo_path),
        }

        # Aggregate vulnerabilities across all languages
        all_vulnerabilities = []
        for lang_result in results.values():
            all_vulnerabilities.extend(lang_result.get('vulnerabilities', []))

        return {
            'status': 'success',
            'vulnerabilities': all_vulnerabilities,
            'vulnerability_count': len(all_vulnerabilities),
            'language_results': results,
        }

    def _scan_python_dependencies(self, repo_path):
        """
        Scan Python dependencies for known vulnerabilities using safety.

        Args:
            repo_path (str): The path to the repository.

        Returns:
            dict: Scan results for Python dependencies.
""" logger.info(f"Scanning Python dependencies in {repo_path}") # Find requirements files requirements_files = [] for root, _, files in os.walk(repo_path): for file in files: if file == 'requirements.txt' or file == 'Pipfile' or file == 'Pipfile.lock' or file == 'setup.py': requirements_files.append(os.path.join(root, file)) if not requirements_files: return { 'status': 'no_dependencies', 'message': 'No Python dependency files found.', 'vulnerabilities': [], } vulnerabilities = [] for req_file in requirements_files: try: # Run safety check cmd = [ 'safety', 'check', '--file', req_file, '--json', ] process = subprocess.run( cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, check=False, ) # Parse safety output if process.stdout.strip(): try: safety_results = json.loads(process.stdout) for vuln in safety_results.get('vulnerabilities', []): vulnerabilities.append({ 'package': vuln.get('package_name', ''), 'installed_version': vuln.get('installed_version', ''), 'affected_versions': vuln.get('vulnerable_spec', ''), 'description': vuln.get('advisory', ''), 'severity': vuln.get('severity', ''), 'file': req_file, 'language': 'Python', }) except json.JSONDecodeError: logger.error(f"Error parsing safety output: {process.stdout}") except Exception as e: logger.error(f"Error running safety on {req_file}: {e}") return { 'status': 'success', 'vulnerabilities': vulnerabilities, 'vulnerability_count': len(vulnerabilities), 'files_scanned': requirements_files, } def _scan_javascript_dependencies(self, repo_path): """ Scan JavaScript/TypeScript dependencies for known vulnerabilities using npm audit. Args: repo_path (str): The path to the repository. Returns: dict: Scan results for JavaScript dependencies. """ logger.info(f"Scanning JavaScript dependencies in {repo_path}") # Find package.json files package_files = [] for root, _, files in os.walk(repo_path): if 'package.json' in files: package_files.append(os.path.join(root, 'package.json')) if not package_files: return { 'status': 'no_dependencies', 'message': 'No JavaScript dependency files found.', 'vulnerabilities': [], } vulnerabilities = [] for pkg_file in package_files: pkg_dir = os.path.dirname(pkg_file) try: # Run npm audit cmd = [ 'npm', 'audit', '--json', ] process = subprocess.run( cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, check=False, cwd=pkg_dir, # Run in the directory containing package.json ) # Parse npm audit output if process.stdout.strip(): try: audit_results = json.loads(process.stdout) # Extract vulnerabilities from npm audit results for vuln_id, vuln_info in audit_results.get('vulnerabilities', {}).items(): vulnerabilities.append({ 'package': vuln_info.get('name', ''), 'installed_version': vuln_info.get('version', ''), 'affected_versions': vuln_info.get('range', ''), 'description': vuln_info.get('overview', ''), 'severity': vuln_info.get('severity', ''), 'file': pkg_file, 'language': 'JavaScript', 'cwe': vuln_info.get('cwe', ''), 'recommendation': vuln_info.get('recommendation', ''), }) except json.JSONDecodeError: logger.error(f"Error parsing npm audit output: {process.stdout}") except Exception as e: logger.error(f"Error running npm audit on {pkg_file}: {e}") return { 'status': 'success', 'vulnerabilities': vulnerabilities, 'vulnerability_count': len(vulnerabilities), 'files_scanned': package_files, } def _scan_java_dependencies(self, repo_path): """ Scan Java dependencies for known vulnerabilities. Args: repo_path (str): The path to the repository. Returns: dict: Scan results for Java dependencies. 
""" logger.info(f"Scanning Java dependencies in {repo_path}") # Find pom.xml or build.gradle files dependency_files = [] for root, _, files in os.walk(repo_path): for file in files: if file == 'pom.xml' or file == 'build.gradle': dependency_files.append(os.path.join(root, file)) if not dependency_files: return { 'status': 'no_dependencies', 'message': 'No Java dependency files found.', 'vulnerabilities': [], } # For now, we'll just return a placeholder since we don't have a direct tool # In a real implementation, you might use OWASP Dependency Check or similar return { 'status': 'not_implemented', 'message': 'Java dependency scanning is not fully implemented yet.', 'vulnerabilities': [], 'files_scanned': dependency_files, } def _scan_go_dependencies(self, repo_path): """ Scan Go dependencies for known vulnerabilities using govulncheck. Args: repo_path (str): The path to the repository. Returns: dict: Scan results for Go dependencies. """ logger.info(f"Scanning Go dependencies in {repo_path}") # Check if go.mod exists go_mod_path = os.path.join(repo_path, 'go.mod') if not os.path.exists(go_mod_path): return { 'status': 'no_dependencies', 'message': 'No Go dependency files found.', 'vulnerabilities': [], } try: # Run govulncheck cmd = [ 'govulncheck', '-json', './...', ] process = subprocess.run( cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, check=False, cwd=repo_path, # Run in the repository directory ) # Parse govulncheck output vulnerabilities = [] if process.stdout.strip(): for line in process.stdout.splitlines(): try: result = json.loads(line) if 'vulnerability' in result: vuln = result['vulnerability'] vulnerabilities.append({ 'package': vuln.get('package', ''), 'description': vuln.get('details', ''), 'severity': 'high', # govulncheck doesn't provide severity 'file': go_mod_path, 'language': 'Go', 'cve': vuln.get('osv', {}).get('id', ''), 'affected_versions': vuln.get('osv', {}).get('affected', ''), }) except json.JSONDecodeError: continue return { 'status': 'success', 'vulnerabilities': vulnerabilities, 'vulnerability_count': len(vulnerabilities), 'files_scanned': [go_mod_path], } except Exception as e: logger.error(f"Error running govulncheck: {e}") return { 'status': 'error', 'error': str(e), 'vulnerabilities': [], } def _scan_rust_dependencies(self, repo_path): """ Scan Rust dependencies for known vulnerabilities using cargo-audit. Args: repo_path (str): The path to the repository. Returns: dict: Scan results for Rust dependencies. 
""" logger.info(f"Scanning Rust dependencies in {repo_path}") # Check if Cargo.toml exists cargo_toml_path = os.path.join(repo_path, 'Cargo.toml') if not os.path.exists(cargo_toml_path): return { 'status': 'no_dependencies', 'message': 'No Rust dependency files found.', 'vulnerabilities': [], } try: # Run cargo-audit cmd = [ 'cargo', 'audit', '--json', ] process = subprocess.run( cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, check=False, cwd=repo_path, # Run in the repository directory ) # Parse cargo-audit output vulnerabilities = [] if process.stdout.strip(): try: audit_results = json.loads(process.stdout) for vuln in audit_results.get('vulnerabilities', {}).get('list', []): vulnerabilities.append({ 'package': vuln.get('package', {}).get('name', ''), 'installed_version': vuln.get('package', {}).get('version', ''), 'description': vuln.get('advisory', {}).get('description', ''), 'severity': vuln.get('advisory', {}).get('severity', ''), 'file': cargo_toml_path, 'language': 'Rust', 'cve': vuln.get('advisory', {}).get('id', ''), }) except json.JSONDecodeError: logger.error(f"Error parsing cargo-audit output: {process.stdout}") return { 'status': 'success', 'vulnerabilities': vulnerabilities, 'vulnerability_count': len(vulnerabilities), 'files_scanned': [cargo_toml_path], } except Exception as e: logger.error(f"Error running cargo-audit: {e}") return { 'status': 'error', 'error': str(e), 'vulnerabilities': [], } def _scan_python(self, repo_path): """ Scan Python code for security vulnerabilities using bandit. Args: repo_path (str): The path to the repository. Returns: dict: Scan results for Python code. """ logger.info(f"Scanning Python code in {repo_path} for security vulnerabilities") # Find Python files python_files = [] for root, _, files in os.walk(repo_path): for file in files: if file.endswith('.py'): python_files.append(os.path.join(root, file)) if not python_files: return { 'status': 'no_files', 'message': 'No Python files found in the repository.', 'vulnerabilities': [], } try: # Run bandit cmd = [ 'bandit', '-r', '-f', 'json', repo_path, ] process = subprocess.run( cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, check=False, ) # Parse bandit output vulnerabilities = [] if process.stdout.strip(): try: bandit_results = json.loads(process.stdout) for result in bandit_results.get('results', []): vulnerabilities.append({ 'file': result.get('filename', ''), 'line': result.get('line_number', 0), 'code': result.get('code', ''), 'issue': result.get('issue_text', ''), 'severity': result.get('issue_severity', ''), 'confidence': result.get('issue_confidence', ''), 'cwe': result.get('cwe', ''), 'test_id': result.get('test_id', ''), 'test_name': result.get('test_name', ''), 'language': 'Python', }) except json.JSONDecodeError: logger.error(f"Error parsing bandit output: {process.stdout}") # Group vulnerabilities by severity vulns_by_severity = defaultdict(list) for vuln in vulnerabilities: severity = vuln.get('severity', 'unknown') vulns_by_severity[severity].append(vuln) return { 'status': 'success', 'vulnerabilities': vulnerabilities, 'vulnerabilities_by_severity': dict(vulns_by_severity), 'vulnerability_count': len(vulnerabilities), 'files_scanned': len(python_files), } except Exception as e: logger.error(f"Error running bandit: {e}") return { 'status': 'error', 'error': str(e), 'vulnerabilities': [], } def _scan_javascript(self, repo_path): """ Scan JavaScript/TypeScript code for security vulnerabilities using NodeJSScan. 
        Args:
            repo_path (str): The path to the repository.

        Returns:
            dict: Scan results for JavaScript/TypeScript code.
        """
        logger.info(f"Scanning JavaScript/TypeScript code in {repo_path} for security vulnerabilities")

        # Find JavaScript/TypeScript files
        js_files = []
        for root, _, files in os.walk(repo_path):
            if 'node_modules' in root:
                continue
            for file in files:
                if file.endswith(('.js', '.jsx', '.ts', '.tsx')):
                    js_files.append(os.path.join(root, file))

        if not js_files:
            return {
                'status': 'no_files',
                'message': 'No JavaScript/TypeScript files found in the repository.',
                'vulnerabilities': [],
            }

        # NodeJSScan might not be available, so we use a simplified approach:
        # ESLint with eslint-plugin-security. A real implementation might use
        # NodeJSScan or similar instead.

        # Create a temporary ESLint configuration file with security rules
        eslint_config = {
            "env": {
                "browser": True,
                "es2021": True,
                "node": True
            },
            "extends": [
                "eslint:recommended",
                "plugin:security/recommended"
            ],
            "plugins": [
                "security"
            ],
            "parserOptions": {
                "ecmaVersion": 12,
                "sourceType": "module",
                "ecmaFeatures": {
                    "jsx": True
                }
            },
            "rules": {}
        }

        # Open in text mode so json.dump can write str data
        with tempfile.NamedTemporaryFile(mode='w', suffix='.json', delete=False) as temp_config:
            json.dump(eslint_config, temp_config)
            temp_config_path = temp_config.name

        try:
            # Run ESLint with the security plugin
            cmd = [
                'npx', 'eslint',
                '--config', temp_config_path,
                '--format', 'json',
                '--plugin', 'security',
            ] + js_files
            process = subprocess.run(
                cmd,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                text=True,
                check=False,
            )

            # Parse ESLint output
            vulnerabilities = []
            if process.stdout.strip():
                try:
                    eslint_results = json.loads(process.stdout)
                    for result in eslint_results:
                        file_path = result.get('filePath', '')
                        for message in result.get('messages', []):
                            # Only include security-related issues
                            rule_id = message.get('ruleId', '')
                            if rule_id and ('security' in rule_id or 'no-eval' in rule_id or 'no-implied-eval' in rule_id):
                                vulnerabilities.append({
                                    'file': file_path,
                                    'line': message.get('line', 0),
                                    'column': message.get('column', 0),
                                    'issue': message.get('message', ''),
                                    'severity': 'high' if message.get('severity', 0) == 2 else 'medium',
                                    'rule': rule_id,
                                    'language': 'JavaScript',
                                })
                except json.JSONDecodeError:
                    logger.error(f"Error parsing ESLint output: {process.stdout}")

            # Group vulnerabilities by severity
            vulns_by_severity = defaultdict(list)
            for vuln in vulnerabilities:
                severity = vuln.get('severity', 'unknown')
                vulns_by_severity[severity].append(vuln)

            return {
                'status': 'success',
                'vulnerabilities': vulnerabilities,
                'vulnerabilities_by_severity': dict(vulns_by_severity),
                'vulnerability_count': len(vulnerabilities),
                'files_scanned': len(js_files),
            }
        except Exception as e:
            logger.error(f"Error scanning JavaScript/TypeScript code: {e}")
            return {
                'status': 'error',
                'error': str(e),
                'vulnerabilities': [],
            }
        finally:
            # Clean up the temporary configuration file
            if os.path.exists(temp_config_path):
                os.unlink(temp_config_path)

    def _scan_java(self, repo_path):
        """
        Scan Java code for security vulnerabilities.

        Args:
            repo_path (str): The path to the repository.

        Returns:
            dict: Scan results for Java code.
""" logger.info(f"Scanning Java code in {repo_path} for security vulnerabilities") # Find Java files java_files = [] for root, _, files in os.walk(repo_path): for file in files: if file.endswith('.java'): java_files.append(os.path.join(root, file)) if not java_files: return { 'status': 'no_files', 'message': 'No Java files found in the repository.', 'vulnerabilities': [], } # For now, we'll just return a placeholder since we don't have a direct tool # In a real implementation, you might use FindSecBugs or similar return { 'status': 'not_implemented', 'message': 'Java security scanning is not fully implemented yet.', 'vulnerabilities': [], 'files_scanned': java_files, } def _scan_go(self, repo_path): """ Scan Go code for security vulnerabilities using gosec. Args: repo_path (str): The path to the repository. Returns: dict: Scan results for Go code. """ logger.info(f"Scanning Go code in {repo_path} for security vulnerabilities") # Find Go files go_files = [] for root, _, files in os.walk(repo_path): for file in files: if file.endswith('.go'): go_files.append(os.path.join(root, file)) if not go_files: return { 'status': 'no_files', 'message': 'No Go files found in the repository.', 'vulnerabilities': [], } try: # Run gosec cmd = [ 'gosec', '-fmt', 'json', '-quiet', './...', ] process = subprocess.run( cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, check=False, cwd=repo_path, # Run in the repository directory ) # Parse gosec output vulnerabilities = [] if process.stdout.strip(): try: gosec_results = json.loads(process.stdout) for issue in gosec_results.get('Issues', []): vulnerabilities.append({ 'file': issue.get('file', ''), 'line': issue.get('line', ''), 'code': issue.get('code', ''), 'issue': issue.get('details', ''), 'severity': issue.get('severity', ''), 'confidence': issue.get('confidence', ''), 'cwe': issue.get('cwe', {}).get('ID', ''), 'rule_id': issue.get('rule_id', ''), 'language': 'Go', }) except json.JSONDecodeError: logger.error(f"Error parsing gosec output: {process.stdout}") # Group vulnerabilities by severity vulns_by_severity = defaultdict(list) for vuln in vulnerabilities: severity = vuln.get('severity', 'unknown') vulns_by_severity[severity].append(vuln) return { 'status': 'success', 'vulnerabilities': vulnerabilities, 'vulnerabilities_by_severity': dict(vulns_by_severity), 'vulnerability_count': len(vulnerabilities), 'files_scanned': len(go_files), } except Exception as e: logger.error(f"Error running gosec: {e}") return { 'status': 'error', 'error': str(e), 'vulnerabilities': [], } def _scan_rust(self, repo_path): """ Scan Rust code for security vulnerabilities. Args: repo_path (str): The path to the repository. Returns: dict: Scan results for Rust code. """ logger.info(f"Scanning Rust code in {repo_path} for security vulnerabilities") # Find Rust files rust_files = [] for root, _, files in os.walk(repo_path): for file in files: if file.endswith('.rs'): rust_files.append(os.path.join(root, file)) if not rust_files: return { 'status': 'no_files', 'message': 'No Rust files found in the repository.', 'vulnerabilities': [], } # For now, we'll just return a placeholder since we don't have a direct tool # In a real implementation, you might use cargo-audit or similar for code scanning return { 'status': 'not_implemented', 'message': 'Rust security scanning is not fully implemented yet.', 'vulnerabilities': [], 'files_scanned': rust_files, }