# AI-powered OWASP vulnerability scanner
import ast
import re
from typing import List, Dict, Any
from dataclasses import dataclass
@dataclass
class SecurityIssue:
severity: str # critical, high, medium, low
category: str # OWASP category
file: str
line: int
description: str
recommendation: str
cwe_id: str
class OWASPScanner:
def __init__(self):
self.issues: List[SecurityIssue] = []
self.patterns = self._load_vulnerability_patterns()
def scan_file(self, filepath: str, content: str) -> List[SecurityIssue]:
"""Scan file for OWASP Top 10 vulnerabilities"""
self.issues = []
# A01:2021 - Broken Access Control
self._check_access_control(filepath, content)
# A02:2021 - Cryptographic Failures
self._check_crypto_issues(filepath, content)
# A03:2021 - Injection
self._check_injection_flaws(filepath, content)
# A04:2021 - Insecure Design
self._check_insecure_design(filepath, content)
# A05:2021 - Security Misconfiguration
self._check_security_config(filepath, content)
# A06:2021 - Vulnerable Components
self._check_dependencies(filepath)
# A07:2021 - Authentication Failures
self._check_auth_issues(filepath, content)
# A08:2021 - Software and Data Integrity
self._check_integrity_issues(filepath, content)
# A09:2021 - Security Logging Failures
self._check_logging_issues(filepath, content)
# A10:2021 - Server-Side Request Forgery
self._check_ssrf(filepath, content)
return self.issues
def _check_injection_flaws(self, filepath: str, content: str):
"""Detect SQL injection, NoSQL injection, command injection"""
lines = content.split('\n')
# SQL injection patterns
sql_patterns = [
r'execute\(.*\+.*\)',
r'query\(.*f["\'].*{.*}.*["\']\)',
r'\.raw\(.*\+',
r'WHERE.*\+.*\+',
]
for line_num, line in enumerate(lines, 1):
for pattern in sql_patterns:
if re.search(pattern, line, re.IGNORECASE):
self.issues.append(SecurityIssue(
severity='critical',
category='A03:2021 - Injection',
file=filepath,
line=line_num,
description='Potential SQL injection vulnerability detected',
recommendation='Use parameterized queries or an ORM with prepared statements',
cwe_id='CWE-89'
))
# Command injection
cmd_patterns = [
r'os\.system\(',
r'subprocess\.call\(.*shell=True',
r'eval\(',
r'exec\(',
]
for line_num, line in enumerate(lines, 1):
for pattern in cmd_patterns:
if re.search(pattern, line):
self.issues.append(SecurityIssue(
severity='critical',
category='A03:2021 - Injection',
file=filepath,
line=line_num,
description='Command injection risk detected',
recommendation='Avoid shell execution with user input. Use subprocess with shell=False',
cwe_id='CWE-78'
))
def _check_crypto_issues(self, filepath: str, content: str):
"""Detect weak cryptography and plaintext secrets"""
lines = content.split('\n')
weak_crypto_patterns = [
(r'MD5\(', 'MD5 is cryptographically broken', 'CWE-328'),
(r'SHA1\(', 'SHA1 is deprecated', 'CWE-328'),
(r'\bDES\b', 'DES encryption is insecure', 'CWE-327'),
(r'\bECB\b', 'ECB mode is insecure', 'CWE-327'),
]
for line_num, line in enumerate(lines, 1):
for pattern, desc, cwe in weak_crypto_patterns:
if re.search(pattern, line, re.IGNORECASE):
self.issues.append(SecurityIssue(
severity='high',
category='A02:2021 - Cryptographic Failures',
file=filepath,
line=line_num,
description=desc,
recommendation='Use SHA-256 or stronger. Use AES-GCM for encryption',
cwe_id=cwe
))
def _check_access_control(self, filepath: str, content: str):
"""Detect broken access control issues"""
if filepath.endswith('.py'):
try:
tree = ast.parse(content)
for node in ast.walk(tree):
# Check for missing authorization checks
if isinstance(node, ast.FunctionDef):
# Look for route handlers without auth decorators
if any(dec.id in ['route', 'get', 'post', 'put', 'delete']
for dec in node.decorator_list
if isinstance(dec, ast.Name)):
has_auth = any(
getattr(dec, 'id', None) in ['requires_auth', 'login_required', 'authenticated']
for dec in node.decorator_list
)
if not has_auth:
self.issues.append(SecurityIssue(
severity='high',
category='A01:2021 - Broken Access Control',
file=filepath,
line=node.lineno,
description=f'Endpoint {node.name} lacks authentication',
recommendation='Add authentication/authorization decorator',
cwe_id='CWE-284'
))
except SyntaxError:
pass
def _check_auth_issues(self, filepath: str, content: str):
"""Detect authentication and session management issues"""
lines = content.split('\n')
auth_patterns = [
(r'password.*=.*input', 'Password transmitted without hashing', 'CWE-319'),
(r'session\.cookie\.secure.*=.*False', 'Session cookie not secure', 'CWE-614'),
(r'JWT.*algorithm.*none', 'JWT with none algorithm', 'CWE-347'),
]
for line_num, line in enumerate(lines, 1):
for pattern, desc, cwe in auth_patterns:
if re.search(pattern, line, re.IGNORECASE):
self.issues.append(SecurityIssue(
severity='critical',
category='A07:2021 - Authentication Failures',
file=filepath,
line=line_num,
description=desc,
recommendation='Implement secure authentication practices',
cwe_id=cwe
))
def _check_ssrf(self, filepath: str, content: str):
"""Detect Server-Side Request Forgery vulnerabilities"""
lines = content.split('\n')
ssrf_patterns = [
r'requests\.get\(.*input.*\)',
r'fetch\(.*req\.query',
r'urllib\.request\.urlopen\(.*user',
]
for line_num, line in enumerate(lines, 1):
for pattern in ssrf_patterns:
if re.search(pattern, line):
self.issues.append(SecurityIssue(
severity='high',
category='A10:2021 - SSRF',
file=filepath,
line=line_num,
description='Potential SSRF vulnerability',
recommendation='Validate and whitelist URLs before making requests',
cwe_id='CWE-918'
))
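A minimal usage sketch for the scanner above. It assumes the class is saved as `owasp_scanner.py` and that the `_check_*` helpers and `_load_vulnerability_patterns` not shown in this excerpt are implemented; the module name and sample snippet are illustrative.

```python
# Illustrative driver; owasp_scanner is an assumed module name.
from owasp_scanner import OWASPScanner

SAMPLE = '''
import os

def run(cmd):
    os.system("ls " + cmd)  # command built from caller-supplied input
'''

scanner = OWASPScanner()
for issue in scanner.scan_file("app/handlers.py", SAMPLE):
    print(f"[{issue.severity}] {issue.category} "
          f"{issue.file}:{issue.line} - {issue.description}")
```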
import re
import math
from typing import Dict, List
class SecretsScanner:
def __init__(self):
self.entropy_threshold = 4.5
self.patterns = {
'aws_access_key': r'AKIA[0-9A-Z]{16}',
'aws_secret_key': r'aws_secret[\w\s]*[=:]\s*[\'"][0-9a-zA-Z/+]{40}[\'"]',
'github_token': r'gh[pousr]_[A-Za-z0-9_]{36,}',
'slack_token': r'xox[baprs]-[0-9]{10,12}-[0-9]{10,12}-[a-zA-Z0-9]{24,}',
'private_key': r'-----BEGIN (RSA|OPENSSH|DSA|EC) PRIVATE KEY-----',
'jwt': r'eyJ[A-Za-z0-9_-]*\.eyJ[A-Za-z0-9_-]*\.[A-Za-z0-9_-]*',
'stripe_key': r'sk_live_[0-9a-zA-Z]{24,}',
'google_api': r'AIza[0-9A-Za-z_-]{35}',
}
def scan_content(self, content: str, filepath: str) -> List[Dict]:
"""Scan content for secrets and high-entropy strings"""
findings = []
# Pattern-based detection
for secret_type, pattern in self.patterns.items():
matches = re.finditer(pattern, content)
for match in matches:
line_num = content[:match.start()].count('\n') + 1
findings.append({
'type': secret_type,
'severity': 'critical',
'file': filepath,
'line': line_num,
'matched': match.group()[:20] + '...', # Partial match
'description': f'Detected {secret_type} in plaintext',
'recommendation': 'Remove secret and use environment variables or secret manager'
})
# Entropy-based detection for unknown secrets
lines = content.split('\n')
for line_num, line in enumerate(lines, 1):
# Look for variable assignments
assignment_match = re.search(r'([\w_]+)\s*=\s*[\'"]([^\'"]{16,})[\'"]', line)
if assignment_match:
var_name = assignment_match.group(1).lower()
value = assignment_match.group(2)
# Check if variable name suggests a secret
secret_keywords = ['password', 'secret', 'key', 'token', 'api', 'auth']
if any(keyword in var_name for keyword in secret_keywords):
entropy = self._calculate_entropy(value)
if entropy > self.entropy_threshold:
findings.append({
'type': 'high_entropy_secret',
'severity': 'high',
'file': filepath,
'line': line_num,
'entropy': entropy,
'description': f'High-entropy value in {var_name} (entropy: {entropy:.2f})',
'recommendation': 'Use environment variables or a secret manager'
})
return findings
def _calculate_entropy(self, string: str) -> float:
"""Calculate Shannon entropy of a string"""
if not string:
return 0.0
entropy = 0.0
for char in set(string):
prob = string.count(char) / len(string)
entropy -= prob * math.log2(prob)
return entropy
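To illustrate why the 4.5 bits-per-character threshold works, here is a self-contained sketch of the same Shannon entropy calculation applied to a readable identifier versus a random-looking token (the printed values are approximate):

```python
import math

def shannon_entropy(s: str) -> float:
    """Shannon entropy in bits per character, matching _calculate_entropy above."""
    if not s:
        return 0.0
    return -sum(
        (s.count(c) / len(s)) * math.log2(s.count(c) / len(s))
        for c in set(s)
    )

print(shannon_entropy("database_connection_string"))  # ~3.6 bits/char: not flagged
print(shannon_entropy("tGx9rQ2pLzV7mKaWcE4yBnUd"))    # ~4.6 bits/char: flagged
```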
import json
import subprocess
from typing import List, Dict
import requests
class DependencyScanner:
def __init__(self):
self.nvd_api_key = None # Optional NVD API key
self.severity_priority = {'critical': 4, 'high': 3, 'medium': 2, 'low': 1}
def scan_dependencies(self, package_file: str) -> Dict:
"""Scan dependencies for known vulnerabilities"""
results = {
'total_vulnerabilities': 0,
'by_severity': {'critical': 0, 'high': 0, 'medium': 0, 'low': 0},
'vulnerabilities': [],
'fixable': 0,
'auto_fix_available': []
}
if package_file.endswith('package.json'):
vulns = self._scan_npm()
elif package_file.endswith('requirements.txt'):
vulns = self._scan_python()
elif package_file.endswith('go.mod'):
vulns = self._scan_go()
else:
return results
for vuln in vulns:
results['total_vulnerabilities'] += 1
results['by_severity'][vuln['severity']] += 1
results['vulnerabilities'].append(vuln)
if vuln.get('fix_available'):
results['fixable'] += 1
results['auto_fix_available'].append(vuln)
# Sort by severity
results['vulnerabilities'].sort(
key=lambda x: self.severity_priority.get(x['severity'], 0),
reverse=True
)
return results
def _scan_npm(self) -> List[Dict]:
"""Scan npm dependencies"""
try:
result = subprocess.run(
['npm', 'audit', '--json'],
capture_output=True,
text=True
)
audit_data = json.loads(result.stdout)
vulnerabilities = []
for vuln_id, vuln_data in audit_data.get('vulnerabilities', {}).items():
vulnerabilities.append({
'package': vuln_id,
'severity': vuln_data['severity'],
'title': vuln_data.get('title', 'Unknown vulnerability'),
'cve': vuln_data.get('cves', []),
'affected_versions': vuln_data.get('range', 'unknown'),
'fix_available': vuln_data.get('fixAvailable', False),
'recommendation': self._generate_fix_recommendation(vuln_data)
})
return vulnerabilities
except Exception as e:
print(f'Error scanning npm: {e}')
return []
def _scan_python(self) -> List[Dict]:
"""Scan Python dependencies with safety or pip-audit"""
try:
result = subprocess.run(
['pip-audit', '--format', 'json'],
capture_output=True,
text=True
)
audit_data = json.loads(result.stdout)
vulnerabilities = []
for vuln in audit_data.get('vulnerabilities', []):
vulnerabilities.append({
'package': vuln['name'],
'severity': self._map_cvss_to_severity(vuln.get('cvss', 0)),
'title': vuln.get('description', 'Unknown'),
'cve': [vuln.get('id')],
'affected_versions': vuln.get('version', 'unknown'),
'fix_available': bool(vuln.get('fix_versions')),
'fix_versions': vuln.get('fix_versions', []),
'recommendation': f"Update to {vuln.get('fix_versions', ['latest'])[0]}"
})
return vulnerabilities
except Exception as e:
print(f'Error scanning Python: {e}')
return []
def _map_cvss_to_severity(self, cvss_score: float) -> str:
"""Map CVSS score to severity level"""
if cvss_score >= 9.0:
return 'critical'
elif cvss_score >= 7.0:
return 'high'
elif cvss_score >= 4.0:
return 'medium'
else:
return 'low'
def _generate_fix_recommendation(self, vuln_data: Dict) -> str:
"""Generate actionable fix recommendation"""
if vuln_data.get('fixAvailable'):
if isinstance(vuln_data['fixAvailable'], dict):
fix_version = vuln_data['fixAvailable'].get('version')
return f"Run 'npm update {vuln_data['name']}@{fix_version}'"
return f"Run 'npm audit fix' to automatically fix"
else:
return "No automatic fix available. Consider alternative package or manual patch"import torch
import torch
import transformers
from typing import List, Dict
class AISecurityAnalyzer:
def __init__(self, model_name='microsoft/codebert-base'):
self.tokenizer = transformers.AutoTokenizer.from_pretrained(model_name)
self.model = transformers.AutoModel.from_pretrained(model_name)
self.vulnerability_patterns = self._load_trained_patterns()
def analyze_code_snippet(self, code: str, language: str) -> Dict:
"""AI-powered security analysis of code snippet"""
# Tokenize code
inputs = self.tokenizer(
code,
return_tensors='pt',
max_length=512,
truncation=True,
padding=True
)
# Get embeddings
with torch.no_grad():
outputs = self.model(**inputs)
embeddings = outputs.last_hidden_state.mean(dim=1)
# Compare against known vulnerability patterns
vulnerabilities = []
for pattern_name, pattern_embedding in self.vulnerability_patterns.items():
similarity = torch.cosine_similarity(
embeddings,
pattern_embedding,
dim=1
).item()
if similarity > 0.85: # High similarity threshold
vulnerabilities.append({
'pattern': pattern_name,
'confidence': similarity,
'severity': self._get_pattern_severity(pattern_name),
'description': self._get_pattern_description(pattern_name)
})
return {
'code': code,
'language': language,
'vulnerabilities': sorted(
vulnerabilities,
key=lambda x: x['confidence'],
reverse=True
),
'safe': len(vulnerabilities) == 0
}
def _load_trained_patterns(self) -> Dict[str, torch.Tensor]:
"""Load pre-trained vulnerability pattern embeddings"""
# In production, load from trained model
return {}
def _get_pattern_severity(self, pattern: str) -> str:
severity_map = {
'sql_injection': 'critical',
'xss': 'high',
'path_traversal': 'high',
'insecure_deserialization': 'critical',
'xxe': 'high',
}
return severity_map.get(pattern, 'medium')
def _get_pattern_description(self, pattern: str) -> str:
descriptions = {
'sql_injection': 'SQL injection vulnerability detected',
'xss': 'Cross-site scripting (XSS) vulnerability',
'path_traversal': 'Path traversal vulnerability',
}
return descriptions.get(pattern, 'Security issue detected')
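A usage sketch for the embedding-based analyzer. It downloads `microsoft/codebert-base` on first run, and because `_load_trained_patterns()` returns an empty dict in this excerpt, it will report every snippet as safe until real pattern embeddings are supplied:

```python
# Illustrative driver; ai_security_analyzer is an assumed module name.
from ai_security_analyzer import AISecurityAnalyzer

analyzer = AISecurityAnalyzer()
result = analyzer.analyze_code_snippet(
    'cursor.execute("SELECT * FROM users WHERE id = " + user_id)',
    language="python",
)
print("safe" if result["safe"] else "vulnerable")
for finding in result["vulnerabilities"]:
    print(f"{finding['pattern']}: confidence {finding['confidence']:.2f}")
```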
from typing import List

class SecurityTestGenerator:
def generate_tests(self, endpoint: str, method: str, params: List[str]) -> str:
"""Generate security tests for API endpoint"""
tests = []
# SQL Injection tests
tests.append(self._generate_sql_injection_tests(endpoint, method, params))
# XSS tests
tests.append(self._generate_xss_tests(endpoint, method, params))
# Authentication tests
tests.append(self._generate_auth_tests(endpoint, method))
# Rate limiting tests
tests.append(self._generate_rate_limit_tests(endpoint, method))
return '\n\n'.join(tests)
def _generate_sql_injection_tests(self, endpoint: str, method: str, params: List[str]) -> str:
return f'''"""SQL Injection Security Tests for {endpoint}"""
import pytest
from app.test_utils import client
class TestSQLInjection:
@pytest.mark.parametrize("payload", [
"' OR '1'='1",
"1; DROP TABLE users--",
"' UNION SELECT * FROM users--",
"admin'--",
])
def test_sql_injection_prevention(self, payload):
"""Verify SQL injection payloads are rejected"""
response = client.{method.lower()}(
"{endpoint}",
json={{"{params[0] if params else 'input'}": payload}}
)
# Should either reject or safely escape
assert response.status_code in [400, 422], "SQL injection payload not rejected"
assert "error" in response.json().get("message", "").lower()
'''
def _generate_xss_tests(self, endpoint: str, method: str, params: List[str]) -> str:
return f'''class TestXSSPrevention:
@pytest.mark.parametrize("payload", [
"<script>alert('XSS')</script>",
"<img src=x onerror=alert('XSS')>",
"javascript:alert('XSS')",
])
def test_xss_prevention(self, payload):
"""Verify XSS payloads are sanitized"""
response = client.{method.lower()}(
"{endpoint}",
json={{"{params[0] if params else 'content'}": payload}}
)
if response.status_code == 200:
# If accepted, verify it's escaped in response
assert "<script>" not in response.text
assert "onerror=" not in response.text
'''
def _generate_auth_tests(self, endpoint: str, method: str) -> str:
return f'''class TestAuthentication:
def test_requires_authentication(self):
"""Verify endpoint requires authentication"""
response = client.{method.lower()}("{endpoint}")
assert response.status_code == 401, "Endpoint accessible without auth"
def test_invalid_token_rejected(self):
"""Verify invalid tokens are rejected"""
headers = {{"Authorization": "Bearer invalid_token"}}
response = client.{method.lower()}("{endpoint}", headers=headers)
assert response.status_code == 401
def test_expired_token_rejected(self):
"""Verify expired tokens are rejected"""
expired_token = generate_expired_token()
headers = {{"Authorization": f"Bearer {{expired_token}}"}}
response = client.{method.lower()}("{endpoint}", headers=headers)
assert response.status_code == 401
'''
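A short driver sketch showing how the generator might be wired up for a single endpoint; the endpoint, parameters, and output path are illustrative:

```python
# Illustrative driver; security_test_generator is an assumed module name.
from security_test_generator import SecurityTestGenerator

generator = SecurityTestGenerator()
test_code = generator.generate_tests(
    endpoint="/api/users/search",
    method="POST",
    params=["query"],
)
with open("tests/security/test_users_search.py", "w") as handle:
    handle.write(test_code)
```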
name: AI Security Review
on:
pull_request:
types: [opened, synchronize]
jobs:
security-scan:
runs-on: ubuntu-latest
permissions:
pull-requests: write
contents: read
steps:
- uses: actions/checkout@v3
with:
fetch-depth: 0
- name: Get Changed Files
id: changed-files
uses: tj-actions/changed-files@v40
- name: Setup Python
uses: actions/setup-python@v4
with:
python-version: '3.11'
- name: Install Security Tools
run: |
pip install bandit semgrep safety pip-audit
npm install -g @microsoft/rush
- name: Run OWASP Scanner
run: |
python scripts/owasp_scanner.py \
--files "${{ steps.changed-files.outputs.all_changed_files }}" \
--output owasp-report.json
- name: Run Secrets Scanner
run: |
python scripts/secrets_scanner.py \
--files "${{ steps.changed-files.outputs.all_changed_files }}" \
--output secrets-report.json
- name: Dependency Vulnerability Scan
run: |
pip-audit --format json --output pip-audit.json || true
npm audit --json > npm-audit.json || true
- name: Run Semgrep
run: |
semgrep --config=auto --json --output semgrep-report.json .
- name: AI Security Analysis
env:
OPENAI_API_KEY: ${{ secrets.OPENAI_API_KEY }}
run: |
python scripts/ai_security_analyzer.py \
--changed-files "${{ steps.changed-files.outputs.all_changed_files }}" \
--output ai-analysis.json
- name: Generate Security Report
run: |
python scripts/generate_security_report.py \
--owasp owasp-report.json \
--secrets secrets-report.json \
--dependencies pip-audit.json,npm-audit.json \
--semgrep semgrep-report.json \
--ai ai-analysis.json \
--output final-report.md
- name: Comment PR
uses: actions/github-script@v6
with:
script: |
const fs = require('fs');
const report = fs.readFileSync('final-report.md', 'utf8');
github.rest.issues.createComment({
issue_number: context.issue.number,
owner: context.repo.owner,
repo: context.repo.repo,
body: report
});
- name: Fail on Critical Issues
run: |
python scripts/check_security_threshold.py \
--report final-report.md \
--max-critical 0 \
--max-high 5
"maxTokens": 4000,
"temperature": 0.2,
"systemPrompt": "You are an AI-powered code review security agent focused on vulnerability detection and security best practices"
}

**Semgrep OWASP rules generating an 86% false-positive rate**
Use `--exclude` for test files and configure a `.semgrepignore` for generated code. Add custom rules for application logic. Run: `semgrep --config=auto --exclude='tests/**' --json > report.json`
**Bandit scanner missing SQL injection in f-string queries**
Bandit does not track data flow, so add manual review for database queries and use Semgrep with taint-tracking rules. Run: `semgrep --config=p/security-audit --config=p/sql-injection` for better detection.
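For example, the interpolated query below is the kind of string-built SQL this tip refers to, while the parameterized form is safe either way (a minimal sketch using sqlite3; the table and input are illustrative):

```python
import sqlite3

connection = sqlite3.connect(":memory:")
connection.execute("CREATE TABLE users (name TEXT)")
user_input = "alice"  # imagine this value arrives from a request

# String-built query: the pattern taint-tracking rules are meant to catch.
unsafe_query = f"SELECT * FROM users WHERE name = '{user_input}'"
connection.execute(unsafe_query)

# Safe alternative: let the driver bind the parameter.
connection.execute("SELECT * FROM users WHERE name = ?", (user_input,))
```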
**High-entropy secrets scanner flagging legitimate constants as API keys**
Add an allowlist for known constants, set the entropy threshold above 4.5, and use pattern matching for known secret formats. Configure a `.secretsignore` file, and verify findings manually or with TruffleHog's verification mode.
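One way to implement the allowlist is a small filter that runs before the entropy check; this is a sketch under assumed names (`KNOWN_CONSTANTS`, `should_flag`) that are not part of the scanner above:

```python
# Illustrative pre-filter for the entropy check; names here are assumptions.
KNOWN_CONSTANTS = {
    "abcdefghijklmnopqrstuvwxyz0123456789",   # alphabets, fixtures, checksums
}
ALLOWLISTED_NAMES = {"charset", "alphabet", "test_api_key"}

def should_flag(var_name: str, value: str, entropy: float,
                threshold: float = 4.5) -> bool:
    """Skip allowlisted names/values and obvious placeholders before reporting."""
    if var_name.lower() in ALLOWLISTED_NAMES or value in KNOWN_CONSTANTS:
        return False
    if value.lower().startswith(("example", "changeme", "dummy")):
        return False
    return entropy > threshold
```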
**npm audit reporting unfixable vulnerabilities in transitive dependencies**
Run `npm audit fix --force` to accept breaking changes, or use the `overrides` field in package.json to pin patched versions of transitive dependencies. Consider alternative packages, and document risk acceptance for low-severity issues that cannot be fixed.
**GitHub Actions security scan timing out on large monorepos**
Scan only changed files with tj-actions/changed-files, use a matrix strategy to parallelize scans, set `timeout-minutes: 30`, and cache dependencies. Run: `semgrep --config=auto $changed_files` for speed.