Spaces:
Sleeping
Sleeping
#!/usr/bin/env python3 | |
# -*- coding: utf-8 -*- | |
""" | |
Unit tests for the Security Scanner service | |
""" | |
import unittest | |
from unittest.mock import patch, MagicMock, mock_open | |
import os | |
import sys | |
import json | |
from pathlib import Path | |
# Add the project root directory to the Python path | |
project_root = Path(__file__).resolve().parent.parent | |
sys.path.insert(0, str(project_root)) | |
from src.services.security_scanner import SecurityScanner | |
class TestSecurityScanner(unittest.TestCase): | |
"""Test cases for the SecurityScanner class""" | |
def setUp(self): | |
"""Set up test fixtures""" | |
self.scanner = SecurityScanner() | |
self.test_repo_path = "/test/repo" | |
def test_scan_python_dependencies(self, mock_run, mock_exists): | |
"""Test scan_python_dependencies method""" | |
# Set up the mocks | |
mock_exists.return_value = True | |
# Mock the requirements.txt file | |
with patch('builtins.open', mock_open(read_data="requests==2.25.1\ndjango==2.2.0\n")): | |
# Mock the subprocess.run result | |
mock_process = MagicMock() | |
mock_process.returncode = 0 | |
mock_process.stdout = json.dumps({ | |
"vulnerabilities": [ | |
{ | |
"package_name": "django", | |
"vulnerable_spec": "<2.2.28", | |
"installed_version": "2.2.0", | |
"description": "Django before 2.2.28 has a potential directory traversal via ../ in the file name.", | |
"id": "CVE-2022-34265", | |
"cvss_v3_score": "7.5" | |
} | |
] | |
}) | |
mock_run.return_value = mock_process | |
# Call the method | |
result = self.scanner.scan_python_dependencies(self.test_repo_path) | |
# Verify the result | |
self.assertEqual(len(result['vulnerabilities']), 1) | |
self.assertEqual(result['vulnerability_count'], 1) | |
self.assertEqual(result['vulnerabilities'][0]['package'], 'django') | |
self.assertEqual(result['vulnerabilities'][0]['installed_version'], '2.2.0') | |
self.assertEqual(result['vulnerabilities'][0]['vulnerability_id'], 'CVE-2022-34265') | |
self.assertEqual(result['vulnerabilities'][0]['severity'], 'high') # 7.5 maps to high | |
def test_scan_javascript_dependencies(self, mock_run, mock_exists): | |
"""Test scan_javascript_dependencies method""" | |
# Set up the mocks | |
mock_exists.return_value = True | |
# Mock the subprocess.run result | |
mock_process = MagicMock() | |
mock_process.returncode = 0 | |
mock_process.stdout = json.dumps({ | |
"vulnerabilities": { | |
"lodash": [ | |
{ | |
"name": "lodash", | |
"severity": "high", | |
"via": [ | |
{ | |
"source": 1065, | |
"name": "lodash", | |
"dependency": "lodash", | |
"title": "Prototype Pollution", | |
"url": "https://npmjs.com/advisories/1065", | |
"severity": "high", | |
"range": "<4.17.12" | |
} | |
], | |
"effects": [], | |
"range": "<4.17.12", | |
"nodes": ["node_modules/lodash"], | |
"fixAvailable": true | |
} | |
] | |
} | |
}) | |
mock_run.return_value = mock_process | |
# Call the method | |
result = self.scanner.scan_javascript_dependencies(self.test_repo_path) | |
# Verify the result | |
self.assertEqual(len(result['vulnerabilities']), 1) | |
self.assertEqual(result['vulnerability_count'], 1) | |
self.assertEqual(result['vulnerabilities'][0]['package'], 'lodash') | |
self.assertEqual(result['vulnerabilities'][0]['severity'], 'high') | |
self.assertEqual(result['vulnerabilities'][0]['title'], 'Prototype Pollution') | |
def test_scan_go_dependencies(self, mock_run, mock_exists): | |
"""Test scan_go_dependencies method""" | |
# Set up the mocks | |
mock_exists.return_value = True | |
# Mock the subprocess.run result | |
mock_process = MagicMock() | |
mock_process.returncode = 0 | |
mock_process.stdout = json.dumps({ | |
"Vulns": [ | |
{ | |
"ID": "GO-2020-0015", | |
"Details": "Improper certificate validation in crypto/x509", | |
"Affected": [ | |
{ | |
"Module": { | |
"Path": "golang.org/x/crypto", | |
"Versions": [ | |
{ | |
"Fixed": "v0.0.0-20200221170555-0f29369cfe45" | |
} | |
] | |
}, | |
"Packages": [ | |
{ | |
"Path": "golang.org/x/crypto/cryptobyte", | |
"Symbols": ["String.ReadASN1"] | |
} | |
] | |
} | |
], | |
"References": [ | |
{ | |
"Type": "FIX", | |
"URL": "https://go.dev/cl/219877" | |
}, | |
{ | |
"Type": "REPORT", | |
"URL": "https://go.dev/issue/36837" | |
}, | |
{ | |
"Type": "WEB", | |
"URL": "https://nvd.nist.gov/vuln/detail/CVE-2020-7919" | |
} | |
], | |
"Description": "Due to improper bounds checking, maliciously crafted X.509 certificates can cause a panic in certificate verification.", | |
"CVEs": ["CVE-2020-7919"], | |
"Severity": "MODERATE" | |
} | |
] | |
}) | |
mock_run.return_value = mock_process | |
# Call the method | |
result = self.scanner.scan_go_dependencies(self.test_repo_path) | |
# Verify the result | |
self.assertEqual(len(result['vulnerabilities']), 1) | |
self.assertEqual(result['vulnerability_count'], 1) | |
self.assertEqual(result['vulnerabilities'][0]['package'], 'golang.org/x/crypto') | |
self.assertEqual(result['vulnerabilities'][0]['vulnerability_id'], 'GO-2020-0015') | |
self.assertEqual(result['vulnerabilities'][0]['severity'], 'medium') # MODERATE maps to medium | |
def test_scan_rust_dependencies(self, mock_run, mock_exists): | |
"""Test scan_rust_dependencies method""" | |
# Set up the mocks | |
mock_exists.return_value = True | |
# Mock the subprocess.run result | |
mock_process = MagicMock() | |
mock_process.returncode = 0 | |
mock_process.stdout = json.dumps({ | |
"vulnerabilities": { | |
"RUSTSEC-2020-0071": { | |
"advisory": { | |
"id": "RUSTSEC-2020-0071", | |
"package": "smallvec", | |
"title": "Buffer overflow in SmallVec::insert_many", | |
"description": "Affected versions of smallvec did not properly calculate capacity when inserting multiple elements, which could result in a buffer overflow.", | |
"date": "2020-12-02", | |
"aliases": ["CVE-2021-25900"], | |
"categories": ["memory-corruption"], | |
"keywords": ["buffer-overflow", "heap-overflow"], | |
"cvss": "CVSS:3.1/AV:N/AC:L/PR:N/UI:N/S:U/C:H/I:H/A:H", | |
"related": [] | |
}, | |
"versions": { | |
"patched": [">=1.6.1"], | |
"unaffected": ["<1.0.0"] | |
}, | |
"affected": { | |
"arch": [], | |
"os": [], | |
"functions": ["smallvec::SmallVec::insert_many"] | |
} | |
} | |
}, | |
"warnings": [] | |
}) | |
mock_run.return_value = mock_process | |
# Call the method | |
result = self.scanner.scan_rust_dependencies(self.test_repo_path) | |
# Verify the result | |
self.assertEqual(len(result['vulnerabilities']), 1) | |
self.assertEqual(result['vulnerability_count'], 1) | |
self.assertEqual(result['vulnerabilities'][0]['package'], 'smallvec') | |
self.assertEqual(result['vulnerabilities'][0]['vulnerability_id'], 'RUSTSEC-2020-0071') | |
self.assertEqual(result['vulnerabilities'][0]['title'], 'Buffer overflow in SmallVec::insert_many') | |
self.assertEqual(result['vulnerabilities'][0]['severity'], 'critical') # CVSS 9.8 maps to critical | |
def test_scan_python_code(self, mock_run, mock_exists): | |
"""Test scan_python_code method""" | |
# Set up the mocks | |
mock_exists.return_value = True | |
# Mock the subprocess.run result | |
mock_process = MagicMock() | |
mock_process.returncode = 0 | |
mock_process.stdout = json.dumps({ | |
"results": [ | |
{ | |
"filename": "test.py", | |
"line_number": 42, | |
"issue_severity": "HIGH", | |
"issue_confidence": "HIGH", | |
"issue_text": "Possible hardcoded password: 'super_secret'", | |
"test_id": "B105", | |
"test_name": "hardcoded_password_string" | |
} | |
] | |
}) | |
mock_run.return_value = mock_process | |
# Mock the file discovery | |
with patch.object(self.scanner, '_find_files', return_value=['/test/repo/test.py']): | |
# Call the method | |
result = self.scanner.scan_python_code(self.test_repo_path) | |
# Verify the result | |
self.assertEqual(len(result['vulnerabilities']), 1) | |
self.assertEqual(result['vulnerability_count'], 1) | |
self.assertEqual(result['vulnerabilities'][0]['file'], 'test.py') | |
self.assertEqual(result['vulnerabilities'][0]['line'], 42) | |
self.assertEqual(result['vulnerabilities'][0]['severity'], 'high') | |
self.assertEqual(result['vulnerabilities'][0]['message'], "Possible hardcoded password: 'super_secret'") | |
def test_scan_javascript_code(self, mock_run, mock_exists): | |
"""Test scan_javascript_code method""" | |
# Set up the mocks | |
mock_exists.return_value = True | |
# Mock the subprocess.run result | |
mock_process = MagicMock() | |
mock_process.returncode = 0 | |
mock_process.stdout = json.dumps([ | |
{ | |
"filePath": "/test/repo/test.js", | |
"messages": [ | |
{ | |
"ruleId": "security/detect-eval-with-expression", | |
"severity": 2, | |
"message": "eval() with variable content can allow an attacker to run arbitrary code.", | |
"line": 10, | |
"column": 1, | |
"nodeType": "CallExpression" | |
} | |
], | |
"errorCount": 1, | |
"warningCount": 0, | |
"fixableErrorCount": 0, | |
"fixableWarningCount": 0 | |
} | |
]) | |
mock_run.return_value = mock_process | |
# Mock the file discovery | |
with patch.object(self.scanner, '_find_files', return_value=['/test/repo/test.js']): | |
# Call the method | |
result = self.scanner.scan_javascript_code(self.test_repo_path) | |
# Verify the result | |
self.assertEqual(len(result['vulnerabilities']), 1) | |
self.assertEqual(result['vulnerability_count'], 1) | |
self.assertEqual(result['vulnerabilities'][0]['file'], 'test.js') | |
self.assertEqual(result['vulnerabilities'][0]['line'], 10) | |
self.assertEqual(result['vulnerabilities'][0]['severity'], 'high') # Severity 2 maps to high | |
self.assertEqual(result['vulnerabilities'][0]['message'], "eval() with variable content can allow an attacker to run arbitrary code.") | |
def test_scan_repository(self): | |
"""Test scan_repository method""" | |
# Mock the language-specific scanning methods | |
self.scanner.scan_python_dependencies = MagicMock(return_value={ | |
'vulnerabilities': [{'package': 'django', 'vulnerability_id': 'CVE-2022-34265', 'severity': 'high'}], | |
'vulnerability_count': 1 | |
}) | |
self.scanner.scan_python_code = MagicMock(return_value={ | |
'vulnerabilities': [{'file': 'test.py', 'line': 42, 'severity': 'high'}], | |
'vulnerability_count': 1 | |
}) | |
self.scanner.scan_javascript_dependencies = MagicMock(return_value={ | |
'vulnerabilities': [{'package': 'lodash', 'severity': 'high'}], | |
'vulnerability_count': 1 | |
}) | |
self.scanner.scan_javascript_code = MagicMock(return_value={ | |
'vulnerabilities': [{'file': 'test.js', 'line': 10, 'severity': 'high'}], | |
'vulnerability_count': 1 | |
}) | |
# Call the method | |
result = self.scanner.scan_repository(self.test_repo_path, ['Python', 'JavaScript']) | |
# Verify the result | |
self.assertEqual(len(result), 2) # Two languages | |
self.assertIn('Python', result) | |
self.assertIn('JavaScript', result) | |
# Check Python results | |
self.assertEqual(result['Python']['dependency_vulnerabilities']['vulnerability_count'], 1) | |
self.assertEqual(result['Python']['code_vulnerabilities']['vulnerability_count'], 1) | |
self.assertEqual(result['Python']['total_vulnerabilities'], 2) | |
# Check JavaScript results | |
self.assertEqual(result['JavaScript']['dependency_vulnerabilities']['vulnerability_count'], 1) | |
self.assertEqual(result['JavaScript']['code_vulnerabilities']['vulnerability_count'], 1) | |
self.assertEqual(result['JavaScript']['total_vulnerabilities'], 2) | |
# Verify the method calls | |
self.scanner.scan_python_dependencies.assert_called_once_with(self.test_repo_path) | |
self.scanner.scan_python_code.assert_called_once_with(self.test_repo_path) | |
self.scanner.scan_javascript_dependencies.assert_called_once_with(self.test_repo_path) | |
self.scanner.scan_javascript_code.assert_called_once_with(self.test_repo_path) | |
def test_find_files(self, mock_walk): | |
"""Test _find_files method""" | |
# Set up the mock | |
mock_walk.return_value = [ | |
('/test/repo', ['dir1'], ['file1.py', 'file2.js']), | |
('/test/repo/dir1', [], ['file3.py']) | |
] | |
# Call the method | |
python_files = self.scanner._find_files(self.test_repo_path, '.py') | |
# Verify the result | |
self.assertEqual(len(python_files), 2) | |
self.assertIn('/test/repo/file1.py', python_files) | |
self.assertIn('/test/repo/dir1/file3.py', python_files) | |
def test_check_tool_availability(self, mock_exists): | |
"""Test _check_tool_availability method""" | |
# Set up the mock | |
mock_exists.side_effect = [True, False] # First tool exists, second doesn't | |
# Call the method | |
result1 = self.scanner._check_tool_availability('tool1') | |
result2 = self.scanner._check_tool_availability('tool2') | |
# Verify the result | |
self.assertTrue(result1) | |
self.assertFalse(result2) | |
def test_run_command(self, mock_run): | |
"""Test _run_command method""" | |
# Set up the mock | |
mock_process = MagicMock() | |
mock_process.returncode = 0 | |
mock_process.stdout = "Test output" | |
mock_run.return_value = mock_process | |
# Call the method | |
returncode, output = self.scanner._run_command(['test', 'command']) | |
# Verify the result | |
self.assertEqual(returncode, 0) | |
self.assertEqual(output, "Test output") | |
mock_run.assert_called_once() | |
def test_map_cvss_to_severity(self): | |
"""Test _map_cvss_to_severity method""" | |
# Call the method with different CVSS scores | |
low = self.scanner._map_cvss_to_severity(3.5) | |
medium = self.scanner._map_cvss_to_severity(5.5) | |
high = self.scanner._map_cvss_to_severity(8.0) | |
critical = self.scanner._map_cvss_to_severity(9.5) | |
# Verify the results | |
self.assertEqual(low, 'low') | |
self.assertEqual(medium, 'medium') | |
self.assertEqual(high, 'high') | |
self.assertEqual(critical, 'critical') | |
if __name__ == "__main__": | |
unittest.main() |