```python
# AI-powered anomaly detection for system metrics
import pandas as pd
from sklearn.ensemble import IsolationForest


class PredictiveMonitoring:
    def __init__(self):
        self.model = IsolationForest(
            contamination=0.1,
            random_state=42
        )
        self.baseline_data = []

    def train_baseline(self, historical_metrics):
        """Train on normal operating conditions."""
        df = pd.DataFrame(historical_metrics)
        features = df[['cpu_usage', 'memory_usage', 'response_time', 'error_rate']]
        self.model.fit(features)
        # Per-metric summary stats (mean, std, ...) used later for attribution
        self.baseline_data = features.describe()

    def detect_anomalies(self, current_metrics):
        """Detect anomalous behavior in real time."""
        df = pd.DataFrame([current_metrics])
        features = df[['cpu_usage', 'memory_usage', 'response_time', 'error_rate']]
        prediction = self.model.predict(features)
        anomaly_score = self.model.score_samples(features)

        if prediction[0] == -1:  # Anomaly detected
            return {
                'is_anomaly': True,
                'severity': self._calculate_severity(anomaly_score[0]),
                'affected_metrics': self._identify_affected_metrics(current_metrics),
                'recommended_action': self._recommend_action(current_metrics)
            }
        return {'is_anomaly': False}

    def _calculate_severity(self, score):
        # score_samples returns more negative values for stronger anomalies
        if score < -0.5:
            return 'critical'
        elif score < -0.3:
            return 'high'
        elif score < -0.1:
            return 'medium'
        return 'low'

    def _identify_affected_metrics(self, metrics):
        affected = []
        for metric, value in metrics.items():
            if metric not in self.baseline_data:
                continue  # Skip fields without a trained baseline
            baseline_mean = self.baseline_data[metric]['mean']
            baseline_std = self.baseline_data[metric]['std']
            if abs(value - baseline_mean) > 2 * baseline_std:
                affected.append(metric)
        return affected

    def _recommend_action(self, metrics):
        if metrics['error_rate'] > 5:
            return 'rollback_deployment'
        elif metrics['cpu_usage'] > 90:
            return 'scale_up'
        elif metrics['memory_usage'] > 85:
            return 'restart_services'
        elif metrics['response_time'] > 1000:
            return 'investigate_database'
        return 'monitor_closely'
```
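A minimal usage sketch: the metric values and the synthetic one-hour baseline below are hypothetical stand-ins for data exported from a real monitoring stack.

```python
import random

monitor = PredictiveMonitoring()

# Hypothetical hour of healthy one-minute samples
baseline = [
    {
        'cpu_usage': random.gauss(45, 5),
        'memory_usage': random.gauss(60, 5),
        'response_time': random.gauss(250, 30),
        'error_rate': abs(random.gauss(0.5, 0.2)),
    }
    for _ in range(60)
]
monitor.train_baseline(baseline)

# A suspicious sample: CPU, latency, and error rate well above the baseline
print(monitor.detect_anomalies({
    'cpu_usage': 96.0,
    'memory_usage': 63.0,
    'response_time': 1200.0,
    'error_rate': 7.5,
}))  # typically {'is_anomaly': True, ...} for a point this far out
```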
```python
# Self-healing system with automated remediation
import boto3
from typing import Dict, List


class SelfHealingSystem:
    def __init__(self):
        self.ec2 = boto3.client('ec2')
        self.ecs = boto3.client('ecs')
        self.remediation_history = []

    def handle_incident(self, incident: Dict):
        """Automatically respond to detected incidents."""
        incident_type = incident['type']
        severity = incident['severity']

        # Log incident
        self._log_incident(incident)

        # Determine remediation strategy
        remediation = self._select_remediation(incident_type, severity)

        # Execute remediation
        result = self._execute_remediation(remediation, incident)

        # Verify remediation
        if self._verify_remediation(incident):
            self._send_notification(
                f"Successfully remediated {incident_type}",
                severity='info'
            )
        else:
            self._escalate_to_human(incident, result)
        return result

    def _select_remediation(self, incident_type, severity):
        strategies = {
            'high_cpu': [
                'scale_horizontal',
                'restart_high_cpu_processes',
                'enable_cpu_throttling'
            ],
            'high_memory': [
                'clear_caches',
                'restart_services',
                'scale_vertical'
            ],
            'high_error_rate': [
                'rollback_deployment',
                'restart_services',
                'switch_to_backup'
            ],
            'service_down': [
                'restart_service',
                'failover_to_backup',
                'restore_from_snapshot'
            ]
        }
        return strategies.get(incident_type, ['manual_intervention'])

    def _execute_remediation(self, strategies: List[str], incident: Dict):
        for strategy in strategies:
            try:
                if strategy == 'scale_horizontal':
                    return self._scale_services(incident['service_id'], direction='out')
                elif strategy == 'restart_services':
                    return self._restart_services(incident['service_id'])
                elif strategy == 'rollback_deployment':
                    return self._rollback_deployment(incident['deployment_id'])
                elif strategy == 'clear_caches':
                    return self._clear_caches(incident['service_id'])
            except Exception:
                continue  # Try next strategy
        return {'success': False, 'message': 'All strategies failed'}

    def _scale_services(self, service_id, direction='out'):
        response = self.ecs.update_service(
            cluster='production',
            service=service_id,
            desiredCount=self._calculate_desired_count(service_id, direction)
        )
        return {'success': True, 'action': 'scaled', 'response': response}

    def _restart_services(self, service_id):
        # Force ECS to replace all running tasks with fresh ones
        self.ecs.update_service(
            cluster='production',
            service=service_id,
            forceNewDeployment=True
        )
        return {'success': True, 'action': 'restarted'}

    def _rollback_deployment(self, deployment_id):
        # Roll back to the previous stable version
        previous_version = self._get_previous_stable_version(deployment_id)
        self._deploy_version(previous_version)
        return {'success': True, 'action': 'rolled_back'}
```
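The snippet above is an excerpt: helpers such as `_log_incident`, `_verify_remediation`, `_clear_caches`, and `_calculate_desired_count` are assumed to exist elsewhere in the class. A hypothetical invocation, showing the incident shape `handle_incident` expects (the IDs are invented):

```python
healer = SelfHealingSystem()
result = healer.handle_incident({
    'type': 'high_error_rate',
    'severity': 'high',
    'service_id': 'checkout-api',         # read by restart/scale strategies
    'deployment_id': 'deploy-2024-0042',  # read by rollback_deployment
})
```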
```yaml
# .github/workflows/ai-optimized-deploy.yml
name: AI-Optimized Deployment

on:
  push:
    branches: [main]

jobs:
  analyze-changes:
    runs-on: ubuntu-latest
    outputs:
      affected-services: ${{ steps.analyze.outputs.services }}
      deployment-strategy: ${{ steps.analyze.outputs.strategy }}
    steps:
      - uses: actions/checkout@v3
        with:
          fetch-depth: 0
      - name: AI-Powered Change Analysis
        id: analyze
        run: |
          python scripts/ai_analyze_changes.py \
            --base-ref ${{ github.event.before }} \
            --head-ref ${{ github.sha }} \
            --output-format github
      - name: Predict Deployment Risk
        run: |
          python scripts/predict_deployment_risk.py \
            --changes "${{ steps.analyze.outputs.services }}" \
            --historical-data deployment_history.json

  intelligent-testing:
    needs: analyze-changes
    runs-on: ubuntu-latest
    steps:
      - name: Run Prioritized Tests
        run: |
          # AI selects the most relevant tests based on the changes
          python scripts/ai_test_selection.py \
            --affected-files "${{ needs.analyze-changes.outputs.affected-services }}" \
            --run-tests
      - name: Predictive Test Analysis
        if: failure()
        run: |
          python scripts/analyze_test_failures.py \
            --suggest-fixes

  deploy:
    needs: [analyze-changes, intelligent-testing]
    runs-on: ubuntu-latest
    strategy:
      matrix:
        service: ${{ fromJson(needs.analyze-changes.outputs.affected-services) }}
    steps:
      - name: Deploy with AI-Selected Strategy
        run: |
          STRATEGY="${{ needs.analyze-changes.outputs.deployment-strategy }}"
          if [ "$STRATEGY" == "canary" ]; then
            kubectl apply -f k8s/canary-deployment.yaml
            python scripts/monitor_canary.py --duration 10m
          elif [ "$STRATEGY" == "blue-green" ]; then
            kubectl apply -f k8s/green-deployment.yaml
            python scripts/switch_traffic.py --validate
          else
            kubectl apply -f k8s/rolling-deployment.yaml
          fi
      - name: AI-Powered Health Check
        run: |
          python scripts/ai_health_check.py \
            --service ${{ matrix.service }} \
            --auto-rollback-on-failure
```
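The workflow assumes `scripts/ai_analyze_changes.py` publishes `services` and `strategy` as step outputs. A minimal sketch of that contract, with the analysis itself stubbed out; `$GITHUB_OUTPUT` is the standard file GitHub Actions reads step outputs from, and `services` must be a JSON array for the `fromJson` matrix expansion to work:

```python
# Sketch of the output contract for scripts/ai_analyze_changes.py
import json
import os


def main() -> None:
    # ... change analysis would run here; results hardcoded for illustration ...
    services = ['checkout-api', 'inventory-service']
    strategy = 'canary'

    # With --output-format github, append step outputs to the $GITHUB_OUTPUT file
    with open(os.environ['GITHUB_OUTPUT'], 'a') as fh:
        fh.write(f'services={json.dumps(services)}\n')
        fh.write(f'strategy={strategy}\n')


if __name__ == '__main__':
    main()
```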
```python
# AI-driven resource optimization
import pandas as pd
from sklearn.cluster import KMeans
from sklearn.preprocessing import StandardScaler


class ResourceOptimizer:
    def __init__(self):
        self.scaler = StandardScaler()
        self.usage_patterns = {}

    def analyze_usage_patterns(self, historical_data):
        """Identify usage patterns and recommend optimizations."""
        df = pd.DataFrame(historical_data)

        # Extract temporal features
        df['hour'] = pd.to_datetime(df['timestamp']).dt.hour
        df['day_of_week'] = pd.to_datetime(df['timestamp']).dt.dayofweek

        # Cluster similar usage patterns
        features = df[['cpu_usage', 'memory_usage', 'requests_per_sec', 'hour', 'day_of_week']]
        scaled_features = self.scaler.fit_transform(features)
        kmeans = KMeans(n_clusters=4, random_state=42)
        df['cluster'] = kmeans.fit_predict(scaled_features)

        # Analyze each cluster
        for cluster_id in range(4):
            cluster_data = df[df['cluster'] == cluster_id]
            self.usage_patterns[cluster_id] = {
                'avg_cpu': cluster_data['cpu_usage'].mean(),
                'avg_memory': cluster_data['memory_usage'].mean(),
                'peak_hours': self._identify_peak_hours(cluster_data),
                'recommendation': self._generate_recommendation(cluster_data)
            }
        return self.usage_patterns

    def _identify_peak_hours(self, data):
        hourly_avg = data.groupby('hour')['requests_per_sec'].mean()
        peak_threshold = hourly_avg.mean() + hourly_avg.std()
        return hourly_avg[hourly_avg > peak_threshold].index.tolist()

    def _generate_recommendation(self, data):
        avg_cpu = data['cpu_usage'].mean()
        avg_memory = data['memory_usage'].mean()
        recommendations = []
        if avg_cpu < 30:
            recommendations.append('Consider downsizing instance type')
        elif avg_cpu > 70:
            recommendations.append('Consider upsizing or horizontal scaling')
        if avg_memory < 40:
            recommendations.append('Reduce memory allocation')
        elif avg_memory > 80:
            recommendations.append('Increase memory allocation')
        return recommendations

    def get_autoscaling_schedule(self, service_id):
        """Generate an autoscaling schedule from the learned patterns."""
        # usage_patterns is keyed by cluster id; assumes service_id has been
        # mapped to the cluster that best matches its traffic
        pattern = self.usage_patterns.get(service_id, {})
        peak_hours = pattern.get('peak_hours', [])
        schedule = {
            'scale_up': [
                {
                    # Scale up an hour before the peak (wrapping around midnight)
                    'time': f"{(hour - 1) % 24:02d}:00",
                    'target_count': self._calculate_target_count('high')
                }
                for hour in peak_hours
            ],
            'scale_down': [
                {
                    # Scale down two hours after the peak
                    'time': f"{(hour + 2) % 24:02d}:00",
                    'target_count': self._calculate_target_count('low')
                }
                for hour in peak_hours
            ]
        }
        return schedule
```
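A usage sketch with a synthetic week of hourly samples; the workday traffic shape is invented purely to give the clustering something to find, and real input would come from your metrics store:

```python
import random
import pandas as pd

optimizer = ResourceOptimizer()

# Hypothetical week of hourly samples: busier during 09:00-17:00
history = [
    {
        'timestamp': ts,
        'cpu_usage': random.gauss(70 if 9 <= ts.hour <= 17 else 30, 5),
        'memory_usage': random.gauss(60, 5),
        'requests_per_sec': random.gauss(450 if 9 <= ts.hour <= 17 else 50, 20),
    }
    for ts in pd.date_range('2024-01-01', periods=24 * 7, freq='h')
]

patterns = optimizer.analyze_usage_patterns(history)
for cluster_id, pattern in patterns.items():
    print(cluster_id, pattern['peak_hours'], pattern['recommendation'])
```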
```python
# AI-powered security scanner
import json
import subprocess
from typing import Dict, List


class AISecurityScanner:
    def __init__(self):
        self.vulnerability_db = self._load_vulnerability_db()
        self.risk_model = self._train_risk_model()

    def scan_infrastructure(self) -> Dict:
        """Comprehensive security scan with AI prioritization."""
        results = {
            'container_vulnerabilities': self._scan_containers(),
            'iac_security': self._scan_terraform(),
            'secrets_detection': self._scan_secrets(),
            'compliance_checks': self._check_compliance()
        }

        # AI-driven prioritization
        prioritized = self._prioritize_findings(results)

        # Auto-remediate low-risk issues
        self._auto_remediate(prioritized['auto_fix'])

        # Alert on high-risk issues
        self._alert_security_team(prioritized['critical'])
        return prioritized

    def _scan_containers(self) -> List[Dict]:
        """Scan container images for vulnerabilities."""
        result = subprocess.run(
            ['trivy', 'image', '--format', 'json', '--severity', 'HIGH,CRITICAL', 'myapp:latest'],
            capture_output=True,
            text=True
        )
        # Trivy's JSON report nests findings under a top-level "Results" key
        report = json.loads(result.stdout)
        return self._enrich_vulnerabilities(report.get('Results', []))

    def _scan_terraform(self) -> List[Dict]:
        """Scan Infrastructure as Code."""
        result = subprocess.run(
            ['tfsec', '.', '--format', 'json'],
            capture_output=True,
            text=True
        )
        # tfsec wraps its findings in a top-level "results" key
        return json.loads(result.stdout).get('results', [])

    def _prioritize_findings(self, results: Dict) -> Dict:
        """Use AI to prioritize security findings."""
        all_findings = []
        for category, findings in results.items():
            for finding in findings:
                risk_score = self._calculate_risk_score(finding)
                finding['risk_score'] = risk_score
                finding['category'] = category
                all_findings.append(finding)

        # Sort by risk score, highest first
        sorted_findings = sorted(all_findings, key=lambda x: x['risk_score'], reverse=True)
        return {
            'critical': [f for f in sorted_findings if f['risk_score'] > 8],
            'high': [f for f in sorted_findings if 6 < f['risk_score'] <= 8],
            'medium': [f for f in sorted_findings if 4 < f['risk_score'] <= 6],
            'auto_fix': [f for f in sorted_findings if f['risk_score'] <= 4 and f.get('auto_fixable')]
        }

    def _calculate_risk_score(self, finding: Dict) -> float:
        """Context-aware heuristic risk score, capped at 10."""
        base_score = finding.get('cvss_score', 5.0)
        # Adjust based on context
        if finding.get('exploitable'):
            base_score += 2
        if finding.get('public_facing'):
            base_score += 1
        if finding.get('has_patch'):
            base_score -= 1
        return min(base_score, 10.0)
```
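A worked example of the scoring heuristic in isolation; the `exploitable`, `public_facing`, and `has_patch` fields are assumed to be attached by the enrichment step:

```python
# 7.5 (CVSS) + 2 (exploitable) + 1 (public-facing) = 10.5 -> capped to 10.0
finding = {'cvss_score': 7.5, 'exploitable': True, 'public_facing': True}
score = AISecurityScanner._calculate_risk_score(None, finding)  # self unused here
assert score == 10.0  # lands in the 'critical' bucket (> 8)
```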
"maxTokens": 4000,
"temperature": 0.3,
"systemPrompt": "You are an AI-powered DevOps automation engineer focused on intelligent infrastructure management and predictive operations"
}AI anomaly detection model producing excessive false positive alerts
**Fix:** Retrain the baseline model on a larger historical dataset that includes edge cases, and keep the IsolationForest `contamination` parameter in the 0.05–0.1 range. Add alert suppression with time-based windowing (see the sketch below), and consider an ensemble that combines multiple detection algorithms.
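A minimal sketch of time-windowed suppression, assuming alerts are keyed by metric name; the five-minute window is an illustrative default:

```python
import time


class AlertSuppressor:
    """Drop repeat alerts for the same key inside a suppression window."""

    def __init__(self, window_seconds: float = 300.0):
        self.window_seconds = window_seconds
        self._last_fired = {}  # alert key -> timestamp of last emission

    def should_fire(self, key: str) -> bool:
        now = time.monotonic()
        last = self._last_fired.get(key)
        if last is not None and now - last < self.window_seconds:
            return False  # Suppressed: fired too recently
        self._last_fired[key] = now
        return True


suppressor = AlertSuppressor(window_seconds=300)
if suppressor.should_fire('cpu_usage'):
    print('alert: cpu_usage anomaly')  # fires
if suppressor.should_fire('cpu_usage'):
    print('never reached')             # suppressed within the window
```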
**Problem:** Self-healing automation triggers unintended cascading service restarts.

**Fix:** Add a circuit breaker that limits remediation attempts per time window (sketched below). Build a dependency graph so remediations never disrupt multiple dependent services at once, validate fixes on a canary before full rollout, and require human-in-the-loop approval for critical services.
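One way to sketch the breaker, with hypothetical limits; `handle_incident` would consult `allow()` before `_execute_remediation` and escalate to a human when it returns `False`:

```python
import time
from collections import deque


class RemediationCircuitBreaker:
    """Refuse further automated remediation once a service trips the limit."""

    def __init__(self, max_attempts: int = 3, window_seconds: float = 600.0):
        self.max_attempts = max_attempts
        self.window_seconds = window_seconds
        self._attempts = {}  # service_id -> deque of attempt timestamps

    def allow(self, service_id: str) -> bool:
        now = time.monotonic()
        attempts = self._attempts.setdefault(service_id, deque())
        # Drop attempts that have aged out of the window
        while attempts and now - attempts[0] > self.window_seconds:
            attempts.popleft()
        if len(attempts) >= self.max_attempts:
            return False  # Breaker open: stop remediating, escalate instead
        attempts.append(now)
        return True
```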
**Problem:** The GitHub Actions CI pipeline fails because AI test selection misses critical tests.

**Fix:** Fall back to the full test suite whenever the git diff exceeds a size threshold (sketched below). Include integration tests in the selection model's training data, monitor test-failure rates and retrain the selection model monthly, and add a manual override flag for comprehensive test runs.
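A sketch of the fallback logic; the 50-file threshold is an illustrative choice, and `ai_rank_tests` is a stand-in for the trained selection model:

```python
# Fall back to the full suite when the diff is too large to select reliably
import subprocess


def changed_files(base_ref: str, head_ref: str) -> list[str]:
    out = subprocess.run(
        ['git', 'diff', '--name-only', f'{base_ref}...{head_ref}'],
        capture_output=True, text=True, check=True
    )
    return [line for line in out.stdout.splitlines() if line]


def ai_rank_tests(files: list[str]) -> list[str]:
    # Stand-in for the model-backed selector
    return [f for f in files if f.startswith('tests/')]


def select_tests(base_ref: str, head_ref: str, threshold: int = 50) -> list[str]:
    files = changed_files(base_ref, head_ref)
    if len(files) > threshold:
        return ['tests/']  # Diff too large: run everything
    return ai_rank_tests(files)
```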
**Problem:** Prometheus metrics cause memory spikes in the AI prediction service.

**Fix:** Downsample metrics with Prometheus recording rules rather than querying raw history, and use streaming algorithms instead of loading full datasets into memory (see the sketch below). Set `resources.limits.memory` on the deployment, and tune garbage collection via the `GOGC` environment variable if the service is written in Go.
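A sketch of the streaming approach using Welford's algorithm, which keeps a running mean and variance in constant memory per series instead of materializing the full history:

```python
class StreamingStats:
    """Constant-memory running mean/variance (Welford's algorithm)."""

    def __init__(self):
        self.n = 0
        self.mean = 0.0
        self.m2 = 0.0  # sum of squared deviations from the running mean

    def update(self, x: float) -> None:
        self.n += 1
        delta = x - self.mean
        self.mean += delta / self.n
        self.m2 += delta * (x - self.mean)

    @property
    def variance(self) -> float:
        return self.m2 / (self.n - 1) if self.n > 1 else 0.0


stats = StreamingStats()
for sample in (41.0, 44.5, 39.8, 47.2):  # would be a metric stream in practice
    stats.update(sample)
print(stats.mean, stats.variance)
```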
**Problem:** The container vulnerability scanner blocks deployments over low-risk CVEs.

**Fix:** Configure Trivy to report only HIGH and CRITICAL severities, and whitelist known false positives in a `.trivyignore` file. Gate deployments on a risk score that weighs exploit availability and network exposure (sketched below), and run scheduled scans instead of blocking every pipeline.
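A sketch of risk-based gating; the `exploit_available` and `network_exposed` fields, and the blocking threshold, are assumptions about what an enrichment step would attach to each finding:

```python
def should_block_deployment(findings: list[dict], block_threshold: float = 8.0) -> bool:
    """Block only when some finding is genuinely risky, not on raw CVE counts."""
    for f in findings:
        score = f.get('cvss_score', 0.0)
        if f.get('exploit_available'):
            score += 2.0
        if f.get('network_exposed'):
            score += 1.0
        if min(score, 10.0) >= block_threshold:
            return True  # At least one finding is risky enough to block
    return False
```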