# architecture/cloud_design.py
from typing import Dict, List
from dataclasses import dataclass
from enum import Enum


class CloudProvider(Enum):
    AWS = "aws"
    GCP = "gcp"
    AZURE = "azure"


class ServiceTier(Enum):
    COMPUTE = "compute"
    DATABASE = "database"
    STORAGE = "storage"
    NETWORKING = "networking"
    MONITORING = "monitoring"


@dataclass
class CloudService:
    provider: CloudProvider
    tier: ServiceTier
    service_name: str
    region: str
    redundancy: str
    cost_per_month: float


class MultiCloudArchitect:
    def __init__(self):
        self.service_mappings = {
            # Compute
            (ServiceTier.COMPUTE, "container"): {
                CloudProvider.AWS: "ECS/EKS",
                CloudProvider.GCP: "GKE",
                CloudProvider.AZURE: "AKS"
            },
            (ServiceTier.COMPUTE, "serverless"): {
                CloudProvider.AWS: "Lambda",
                CloudProvider.GCP: "Cloud Functions",
                CloudProvider.AZURE: "Azure Functions"
            },
            # Database
            (ServiceTier.DATABASE, "relational"): {
                CloudProvider.AWS: "RDS PostgreSQL",
                CloudProvider.GCP: "Cloud SQL",
                CloudProvider.AZURE: "Azure Database for PostgreSQL"
            },
            (ServiceTier.DATABASE, "nosql"): {
                CloudProvider.AWS: "DynamoDB",
                CloudProvider.GCP: "Firestore",
                CloudProvider.AZURE: "Cosmos DB"
            },
            # Storage
            (ServiceTier.STORAGE, "object"): {
                CloudProvider.AWS: "S3",
                CloudProvider.GCP: "Cloud Storage",
                CloudProvider.AZURE: "Blob Storage"
            },
            # Networking
            (ServiceTier.NETWORKING, "cdn"): {
                CloudProvider.AWS: "CloudFront",
                CloudProvider.GCP: "Cloud CDN",
                CloudProvider.AZURE: "Azure CDN"
            },
            (ServiceTier.NETWORKING, "load_balancer"): {
                CloudProvider.AWS: "ALB/NLB",
                CloudProvider.GCP: "Cloud Load Balancing",
                CloudProvider.AZURE: "Azure Load Balancer"
            },
        }

    def design_architecture(self,
                            requirements: Dict,
                            preferred_provider: CloudProvider = CloudProvider.AWS) -> List[CloudService]:
        """Design a cloud architecture based on workload requirements."""
        architecture = []

        # Compute layer
        if requirements.get('container_workload'):
            architecture.append(CloudService(
                provider=preferred_provider,
                tier=ServiceTier.COMPUTE,
                service_name=self.service_mappings[(ServiceTier.COMPUTE, "container")][preferred_provider],
                region=requirements.get('primary_region', 'us-east-1'),
                redundancy='multi-az',
                cost_per_month=self._estimate_cost('container', requirements.get('compute_units', 10))
            ))

        # Database layer
        if requirements.get('database_type') == 'relational':
            architecture.append(CloudService(
                provider=preferred_provider,
                tier=ServiceTier.DATABASE,
                service_name=self.service_mappings[(ServiceTier.DATABASE, "relational")][preferred_provider],
                region=requirements.get('primary_region', 'us-east-1'),
                redundancy='multi-az' if requirements.get('high_availability') else 'single-az',
                cost_per_month=self._estimate_cost('database', requirements.get('storage_gb', 100))
            ))

        # Storage layer (always included)
        architecture.append(CloudService(
            provider=preferred_provider,
            tier=ServiceTier.STORAGE,
            service_name=self.service_mappings[(ServiceTier.STORAGE, "object")][preferred_provider],
            region=requirements.get('primary_region', 'us-east-1'),
            redundancy='cross-region' if requirements.get('disaster_recovery') else 'regional',
            cost_per_month=self._estimate_cost('storage', requirements.get('storage_tb', 1))
        ))

        # CDN for global distribution
        if requirements.get('global_distribution'):
            architecture.append(CloudService(
                provider=preferred_provider,
                tier=ServiceTier.NETWORKING,
                service_name=self.service_mappings[(ServiceTier.NETWORKING, "cdn")][preferred_provider],
                region='global',
                redundancy='global',
                cost_per_month=self._estimate_cost('cdn', requirements.get('data_transfer_tb', 5))
            ))

        return architecture

    def _estimate_cost(self, service_type: str, units: float) -> float:
        """Estimate monthly cost (rough list-price approximations)."""
        cost_map = {
            'container': 50 * units,          # $50 per compute unit
            'database': 0.20 * units,         # $0.20 per GB of storage
            'storage': 0.023 * units * 1000,  # $0.023 per GB; units are TB
            'cdn': 0.085 * units * 1000,      # $0.085 per GB transferred; units are TB
        }
        return cost_map.get(service_type, 0)
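
A minimal usage sketch for MultiCloudArchitect follows. The requirement keys mirror the ones design_architecture reads; the values (20 compute units, 2 TB of object storage, and so on) are illustrative assumptions, and the printed costs come from the simplified _estimate_cost map, so treat them as rough order-of-magnitude figures.

# example (illustrative): sketch an AWS-first architecture
architect = MultiCloudArchitect()
requirements = {
    'container_workload': True,
    'database_type': 'relational',
    'high_availability': True,
    'disaster_recovery': True,
    'global_distribution': True,
    'primary_region': 'us-east-1',
    'compute_units': 20,
    'storage_gb': 500,
    'storage_tb': 2,
    'data_transfer_tb': 10,
}
for service in architect.design_architecture(requirements, CloudProvider.AWS):
    print(f"{service.tier.value}: {service.service_name} "
          f"({service.redundancy}) ~${service.cost_per_month:,.2f}/month")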
# aws/well_architected.py
import boto3
from typing import Dict, List


class WellArchitectedReview:
    def __init__(self):
        self.wa_client = boto3.client('wellarchitected')
        # NOTE: the Well-Architected API expects pillar IDs (e.g. 'operationalExcellence');
        # these snake_case names are also reused as findings keys below.
        self.pillars = [
            'operational_excellence',
            'security',
            'reliability',
            'performance_efficiency',
            'cost_optimization',
            'sustainability'
        ]

    def create_workload_review(self, workload_name: str, environment: str) -> str:
        """Create a Well-Architected workload review."""
        response = self.wa_client.create_workload(
            WorkloadName=workload_name,
            Description=f'{environment} environment workload',
            Environment=environment.upper(),  # API expects PRODUCTION or PREPRODUCTION
            ReviewOwner='cloud-team@company.com',
            ArchitecturalDesign='Multi-tier web application',
            Lenses=['wellarchitected'],
            PillarPriorities=self.pillars
        )
        return response['WorkloadId']

    def analyze_architecture(self, resources: List[Dict]) -> Dict:
        """Analyze architecture against the Well-Architected pillars."""
        findings = {
            'operational_excellence': [],
            'security': [],
            'reliability': [],
            'performance_efficiency': [],
            'cost_optimization': [],
            'sustainability': []
        }

        for resource in resources:
            # Security checks
            if resource['type'] == 'ec2_instance':
                if not resource.get('encrypted_volumes'):
                    findings['security'].append({
                        'resource': resource['id'],
                        'issue': 'EBS volumes not encrypted',
                        'severity': 'high',
                        'recommendation': 'Enable EBS encryption by default'
                    })
                if resource.get('public_ip'):
                    findings['security'].append({
                        'resource': resource['id'],
                        'issue': 'Instance has public IP',
                        'severity': 'medium',
                        'recommendation': 'Use private subnets with NAT gateway'
                    })

            # Reliability checks
            if resource['type'] == 'rds_instance':
                if not resource.get('multi_az'):
                    findings['reliability'].append({
                        'resource': resource['id'],
                        'issue': 'Database not deployed in Multi-AZ',
                        'severity': 'high',
                        'recommendation': 'Enable Multi-AZ for high availability'
                    })
                if not resource.get('automated_backups'):
                    findings['reliability'].append({
                        'resource': resource['id'],
                        'issue': 'Automated backups not enabled',
                        'severity': 'critical',
                        'recommendation': 'Enable automated backups with 7-day retention'
                    })

            # Cost optimization checks
            if resource['type'] == 'ec2_instance':
                if resource.get('instance_type', '').startswith('m5.'):
                    if resource.get('cpu_utilization', 100) < 20:
                        findings['cost_optimization'].append({
                            'resource': resource['id'],
                            'issue': 'Instance underutilized (CPU < 20%)',
                            'severity': 'medium',
                            'recommendation': 'Rightsize to a smaller instance type or use auto-scaling',
                            'potential_savings': self._calculate_rightsizing_savings(resource)
                        })

            # Performance efficiency checks
            if resource['type'] == 's3_bucket':
                if not resource.get('transfer_acceleration'):
                    findings['performance_efficiency'].append({
                        'resource': resource['id'],
                        'issue': 'Transfer acceleration not enabled',
                        'severity': 'low',
                        'recommendation': 'Enable S3 Transfer Acceleration for faster long-distance uploads'
                    })

        return findings

    def _calculate_rightsizing_savings(self, resource: Dict) -> float:
        """Calculate potential cost savings from rightsizing (simplified placeholder)."""
        current_cost = 100     # assumed current monthly cost
        recommended_cost = 60  # assumed cost after rightsizing
        return current_cost - recommended_cost
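
A short sketch of how analyze_architecture consumes an inventory follows. The resource dicts below (type, id, and flag keys) match what the checks above read; how that inventory gets collected (e.g., from describe_* calls or AWS Config) is outside this excerpt, so the sample data is assumed.

# example (illustrative): run the pillar checks on a hand-built inventory
review = WellArchitectedReview()  # assumes AWS credentials/region are configured
inventory = [
    {'type': 'ec2_instance', 'id': 'i-0abc123', 'encrypted_volumes': False,
     'public_ip': True, 'instance_type': 'm5.xlarge', 'cpu_utilization': 12},
    {'type': 'rds_instance', 'id': 'db-prod-1', 'multi_az': False,
     'automated_backups': True},
]
for pillar, issues in review.analyze_architecture(inventory).items():
    for finding in issues:
        print(f"[{pillar}] {finding['severity']}: {finding['issue']} ({finding['resource']})")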
# terraform/main.tf - Multi-cloud deployment
terraform {
  required_providers {
    aws = {
      source  = "hashicorp/aws"
      version = "~> 5.0"
    }
    google = {
      source  = "hashicorp/google"
      version = "~> 5.0"
    }
    azurerm = {
      source  = "hashicorp/azurerm"
      version = "~> 3.0"
    }
  }

  backend "s3" {
    bucket         = "company-terraform-state"
    key            = "multi-cloud/terraform.tfstate"
    region         = "us-east-1"
    encrypt        = true
    dynamodb_table = "terraform-locks"
  }
}

# AWS Provider
provider "aws" {
  region = var.aws_region

  default_tags {
    tags = local.common_tags
  }
}

# GCP Provider
provider "google" {
  project = var.gcp_project_id
  region  = var.gcp_region
}

# Azure Provider
provider "azurerm" {
  features {}
  subscription_id = var.azure_subscription_id
}

# Common tags
locals {
  common_tags = {
    Environment = var.environment
    ManagedBy   = "Terraform"
    Owner       = "CloudOps"
    CostCenter  = var.cost_center
  }
}

# AWS - VPC and Networking
module "aws_vpc" {
  source = "./modules/aws/vpc"

  vpc_cidr           = "10.0.0.0/16"
  availability_zones = ["us-east-1a", "us-east-1b", "us-east-1c"]
  public_subnets     = ["10.0.1.0/24", "10.0.2.0/24", "10.0.3.0/24"]
  private_subnets    = ["10.0.11.0/24", "10.0.12.0/24", "10.0.13.0/24"]

  enable_nat_gateway = true
  single_nat_gateway = var.environment == "dev"

  tags = local.common_tags
}

# AWS - EKS Cluster
module "aws_eks" {
  source = "./modules/aws/eks"

  cluster_name    = "${var.environment}-eks"
  cluster_version = "1.28"
  vpc_id          = module.aws_vpc.vpc_id
  subnet_ids      = module.aws_vpc.private_subnets

  node_groups = {
    general = {
      desired_size   = 3
      min_size       = 2
      max_size       = 10
      instance_types = ["t3.large"]
      labels = {
        role = "general"
      }
      taints = []
    }
    spot = {
      desired_size   = 2
      min_size       = 0
      max_size       = 5
      instance_types = ["t3.large", "t3a.large"]
      capacity_type  = "SPOT"
      labels = {
        role = "spot"
      }
    }
  }

  tags = local.common_tags
}

# AWS - RDS PostgreSQL
module "aws_rds" {
  source = "./modules/aws/rds"

  identifier     = "${var.environment}-postgres"
  engine         = "postgres"
  engine_version = "15.4"
  instance_class = var.environment == "prod" ? "db.r6g.xlarge" : "db.t4g.medium"

  allocated_storage     = 100
  max_allocated_storage = 1000
  storage_encrypted     = true

  multi_az                = var.environment == "prod"
  backup_retention_period = var.environment == "prod" ? 30 : 7
  backup_window           = "03:00-04:00"
  maintenance_window      = "mon:04:00-mon:05:00"

  enabled_cloudwatch_logs_exports = ["postgresql", "upgrade"]
  performance_insights_enabled    = true

  vpc_security_group_ids = [aws_security_group.rds.id]
  db_subnet_group_name   = module.aws_vpc.database_subnet_group

  tags = local.common_tags
}

# GCP - GKE Cluster (for multi-region)
module "gcp_gke" {
  source = "./modules/gcp/gke"
  count  = var.enable_gcp ? 1 : 0

  project_id   = var.gcp_project_id
  region       = var.gcp_region
  cluster_name = "${var.environment}-gke"

  network    = "default"
  subnetwork = "default"

  node_pools = [
    {
      name         = "general-pool"
      machine_type = "e2-standard-4"
      min_count    = 2
      max_count    = 10
      auto_upgrade = true
    }
  ]

  labels = local.common_tags
}
# finops/cost_optimizer.py
import boto3
from datetime import datetime, timedelta
from typing import Dict, List


class AWSCostOptimizer:
    def __init__(self):
        self.ce_client = boto3.client('ce')  # Cost Explorer
        self.ec2_client = boto3.client('ec2')
        self.rds_client = boto3.client('rds')
        self.compute_optimizer = boto3.client('compute-optimizer')

    def analyze_costs(self, days: int = 30) -> Dict:
        """Analyze costs and identify optimization opportunities."""
        end_date = datetime.now().date()
        start_date = end_date - timedelta(days=days)

        # Get cost and usage, grouped by service
        response = self.ce_client.get_cost_and_usage(
            TimePeriod={
                'Start': start_date.isoformat(),
                'End': end_date.isoformat()
            },
            Granularity='DAILY',
            Metrics=['UnblendedCost'],
            GroupBy=[
                {'Type': 'DIMENSION', 'Key': 'SERVICE'},
            ]
        )

        # Collect the daily cost series for each service
        cost_by_service = {}
        for result in response['ResultsByTime']:
            for group in result['Groups']:
                service = group['Keys'][0]
                cost = float(group['Metrics']['UnblendedCost']['Amount'])
                cost_by_service.setdefault(service, []).append(cost)

        # Calculate totals and a simple first-day-vs-last-day trend
        summary = {}
        for service, costs in cost_by_service.items():
            summary[service] = {
                'total': sum(costs),
                'daily_avg': sum(costs) / len(costs),
                'trend': 'increasing' if costs[-1] > costs[0] else 'decreasing'
            }
        return summary

    def get_rightsizing_recommendations(self) -> List[Dict]:
        """Get EC2 rightsizing recommendations from Compute Optimizer."""
        response = self.compute_optimizer.get_ec2_instance_recommendations(
            maxResults=100
        )
        recommendations = []
        for rec in response.get('instanceRecommendations', []):
            top_option = rec['recommendationOptions'][0]
            recommendations.append({
                'instance_id': rec['instanceArn'].split('/')[-1],
                'current_type': rec['currentInstanceType'],
                'recommended_type': top_option['instanceType'],
                'monthly_savings': top_option['estimatedMonthlySavings']['value'],
                'cpu_utilization': rec['utilizationMetrics'][0]['value'],
                'finding': rec['finding']
            })
        return recommendations

    def identify_idle_resources(self) -> Dict:
        """Identify idle and underutilized resources."""
        idle_resources = {
            'ec2_instances': [],
            'ebs_volumes': [],
            'elastic_ips': [],
            'load_balancers': []
        }

        # Idle EC2 instances (low CPU over the past week)
        cloudwatch = boto3.client('cloudwatch')
        ec2_response = self.ec2_client.describe_instances(
            Filters=[{'Name': 'instance-state-name', 'Values': ['running']}]
        )
        for reservation in ec2_response['Reservations']:
            for instance in reservation['Instances']:
                instance_id = instance['InstanceId']
                # Check average CPU utilization
                metrics = cloudwatch.get_metric_statistics(
                    Namespace='AWS/EC2',
                    MetricName='CPUUtilization',
                    Dimensions=[{'Name': 'InstanceId', 'Value': instance_id}],
                    StartTime=datetime.now() - timedelta(days=7),
                    EndTime=datetime.now(),
                    Period=86400,
                    Statistics=['Average']
                )
                if metrics['Datapoints']:
                    avg_cpu = sum(dp['Average'] for dp in metrics['Datapoints']) / len(metrics['Datapoints'])
                    if avg_cpu < 5:
                        idle_resources['ec2_instances'].append({
                            'instance_id': instance_id,
                            'instance_type': instance['InstanceType'],
                            'avg_cpu': avg_cpu,
                            'estimated_monthly_cost': self._estimate_ec2_cost(instance['InstanceType']),
                            'recommendation': 'Stop or terminate'
                        })

        # Unattached EBS volumes
        volumes = self.ec2_client.describe_volumes(
            Filters=[{'Name': 'status', 'Values': ['available']}]
        )
        for volume in volumes['Volumes']:
            idle_resources['ebs_volumes'].append({
                'volume_id': volume['VolumeId'],
                'size_gb': volume['Size'],
                'volume_type': volume['VolumeType'],
                'monthly_cost': volume['Size'] * 0.10,  # approximate
                'recommendation': 'Delete if not needed'
            })

        return idle_resources

    def _estimate_ec2_cost(self, instance_type: str) -> float:
        """Estimate monthly EC2 cost (simplified; actual pricing varies by region)."""
        pricing_map = {
            't3.micro': 7.50,
            't3.small': 15.00,
            't3.medium': 30.00,
            't3.large': 60.00,
            'm5.large': 70.00,
            'm5.xlarge': 140.00,
        }
        return pricing_map.get(instance_type, 100.00)
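
A usage sketch for the optimizer follows; it assumes AWS credentials are configured and that Cost Explorer and Compute Optimizer are enabled on the account, since both are opt-in services.

# example (illustrative): cost summary plus idle-resource report
optimizer = AWSCostOptimizer()
for service, stats in optimizer.analyze_costs(days=30).items():
    print(f"{service}: ${stats['total']:.2f} total, "
          f"${stats['daily_avg']:.2f}/day, trend {stats['trend']}")
idle = optimizer.identify_idle_resources()
print(f"Idle instances: {len(idle['ec2_instances'])}, "
      f"unattached volumes: {len(idle['ebs_volumes'])}")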
# dr/failover_orchestrator.py
import boto3
from typing import Dict, List
import time


class DisasterRecoveryOrchestrator:
    def __init__(self, primary_region: str, dr_region: str):
        self.primary_region = primary_region
        self.dr_region = dr_region
        self.route53 = boto3.client('route53')
        self.rds_primary = boto3.client('rds', region_name=primary_region)
        self.rds_dr = boto3.client('rds', region_name=dr_region)

    def initiate_failover(self, workload_id: str) -> Dict:
        """Initiate DR failover to the secondary region."""
        steps = []
        try:
            # Step 1: Update Route53 to point to the DR region
            steps.append(self._update_dns_to_dr())
            # Step 2: Promote the RDS read replica to primary
            steps.append(self._promote_rds_replica())
            # Step 3: Scale up compute in the DR region
            steps.append(self._scale_dr_compute())
            # Step 4: Verify application health
            steps.append(self._verify_application_health())
            return {
                'success': True,
                'failover_time': sum(s['duration'] for s in steps),
                'steps': steps
            }
        except Exception as e:
            return {
                'success': False,
                'error': str(e),
                'completed_steps': steps
            }

    def _update_dns_to_dr(self) -> Dict:
        """Update Route53 records to the DR region."""
        start_time = time.time()
        # Update weighted or failover routing (zone IDs and names below are placeholders)
        response = self.route53.change_resource_record_sets(
            HostedZoneId='Z1234567890ABC',
            ChangeBatch={
                'Changes': [{
                    'Action': 'UPSERT',
                    'ResourceRecordSet': {
                        'Name': 'app.example.com',
                        'Type': 'A',
                        'SetIdentifier': 'DR',
                        'Weight': 100,
                        'AliasTarget': {
                            'HostedZoneId': 'Z1234567890XYZ',
                            'DNSName': 'dr-alb.us-west-2.elb.amazonaws.com',
                            'EvaluateTargetHealth': True
                        }
                    }
                }]
            }
        )
        duration = time.time() - start_time
        return {
            'step': 'DNS Failover',
            'success': True,
            'duration': duration,
            'change_id': response['ChangeInfo']['Id']
        }

    def _promote_rds_replica(self) -> Dict:
        """Promote the RDS read replica to a standalone instance."""
        start_time = time.time()
        response = self.rds_dr.promote_read_replica(
            DBInstanceIdentifier='app-db-replica'
        )
        # Wait for the promotion to complete
        waiter = self.rds_dr.get_waiter('db_instance_available')
        waiter.wait(DBInstanceIdentifier='app-db-replica')
        duration = time.time() - start_time
        return {
            'step': 'RDS Promotion',
            'success': True,
            'duration': duration,
            'new_endpoint': response['DBInstance']['Endpoint']['Address']
        }

    def _scale_dr_compute(self) -> Dict:
        """Scale up compute in the DR region.

        Placeholder: the excerpt above calls this method but does not define it;
        the real implementation depends on the compute platform (e.g., ASG
        desired capacity or EKS node group sizing).
        """
        start_time = time.time()
        # ... platform-specific scaling calls would go here ...
        return {'step': 'DR Compute Scale-Up', 'success': True,
                'duration': time.time() - start_time}

    def _verify_application_health(self) -> Dict:
        """Verify application health after failover.

        Placeholder: typically polls a health-check endpoint until it reports
        healthy or a timeout is reached.
        """
        start_time = time.time()
        # ... health-check polling would go here ...
        return {'step': 'Health Verification', 'success': True,
                'duration': time.time() - start_time}
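
A usage sketch for the orchestrator follows. Note that the hosted zone, record name, and replica identifier inside the class are placeholders, and promoting a replica is irreversible, so this is only safe to run against test infrastructure.

# example (illustrative): fail over from us-east-1 to us-west-2
orchestrator = DisasterRecoveryOrchestrator('us-east-1', 'us-west-2')
result = orchestrator.initiate_failover(workload_id='app-prod')
if result['success']:
    print(f"Failover completed in {result['failover_time']:.1f}s")
else:
    print(f"Failover failed: {result['error']}")
    print(f"Completed steps: {[s['step'] for s in result['completed_steps']]}")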
"maxTokens": 4000,
"temperature": 0.3,
"systemPrompt": "You are a cloud infrastructure architect agent focused on multi-cloud design and optimization"
}Terraform state lock errors preventing infrastructure deployments
Use terraform force-unlock with the lock ID from the error message. Configure lock timeouts with the -lock-timeout=15m flag. Verify DynamoDB table permissions for the S3 backend. Ensure the state file isn't replicated across regions, which can cause lock conflicts.
AWS Lambda functions experiencing cold start latency over 3 seconds
Enable provisioned concurrency for critical functions. Reduce deployment package size by removing unused dependencies. Use the ARM64 (Graviton) architecture for better price-performance. Enable SnapStart for Java functions, or schedule warm-up invocations to keep execution environments initialized.
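
A boto3 sketch of the provisioned-concurrency fix; the function name, alias, and concurrency level are placeholder values to size against your own traffic.

# example (illustrative): keep 10 execution environments warm on an alias
import boto3

lambda_client = boto3.client('lambda')
lambda_client.put_provisioned_concurrency_config(
    FunctionName='checkout-api',         # placeholder function name
    Qualifier='live',                    # must be a published version or alias, not $LATEST
    ProvisionedConcurrentExecutions=10   # size to expected concurrent load
)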
Multi-cloud networking connectivity failing between AWS and GCP VPCs
Verify VPN tunnel status and IPsec configuration on both sides. Check route tables have correct CIDR propagation. Ensure security groups and firewall rules allow cross-cloud traffic. Test with traceroute and tcpdump for packet inspection.
CloudFormation stack rollback failing leaving resources in inconsistent state
Use ContinueUpdateRollback API with resources to skip. Check stack events for specific resource failure reasons. Set DeletionPolicy Retain on critical resources. Execute manual resource cleanup and stack delete if necessary.
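
A boto3 sketch of the ContinueUpdateRollback path; the stack name and logical resource ID are placeholders taken from the failing stack's events.

# example (illustrative): resume a stuck rollback, skipping one failed resource
import boto3

cfn = boto3.client('cloudformation')
cfn.continue_update_rollback(
    StackName='app-stack',                # placeholder stack name
    ResourcesToSkip=['FailedLogicalId'],  # logical IDs from the stack events
)
cfn.get_waiter('stack_rollback_complete').wait(StackName='app-stack')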
Kubernetes autoscaler not scaling pods despite high CPU utilization
Verify metrics-server is running with kubectl top nodes. Check HPA configuration targets match pod resource requests. Ensure cluster-autoscaler has permissions to modify node groups. Review --horizontal-pod-autoscaler-sync-period timing settings.
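
A sketch using the official kubernetes Python client to compare HPA targets with observed utilization; the current-utilization fields may be None when metrics-server isn't reporting, which is itself a useful signal here.

# example (illustrative): list HPA targets vs. current CPU utilization
from kubernetes import client, config

config.load_kube_config()  # use load_incluster_config() when running in-cluster
for hpa in client.AutoscalingV1Api().list_horizontal_pod_autoscaler_for_all_namespaces().items:
    spec, status = hpa.spec, hpa.status
    print(f"{hpa.metadata.namespace}/{hpa.metadata.name}: "
          f"target {spec.target_cpu_utilization_percentage}% CPU, "
          f"current {status.current_cpu_utilization_percentage}%, "  # None => no metrics
          f"replicas {status.current_replicas}/{spec.max_replicas}")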