项目概述
面向多云(AWS 为主)环境的运维资产统一管理平台,解决企业多账号、多区域下资源分散、变更无感知、到期易遗漏、权限不可控等痛点。
覆盖范围:
- 16+ 种 AWS 服务:EC2 / ELB / RDS / EKS / MSK / WAF / Shield 等
- 28+ 张资源表自动采集
- 资源变更 Diff 告警
- 到期提醒(按天阈值 + 时间窗口 + 频次防重发)
- 多通知渠道推送
- 内置堡垒机(Web SSH / SFTP / 审计回放)
技术架构
前端层 (Jinja2 + Bootstrap)
EC2列表 | LB安全组 | 列显配置 | Shield时间 | 权限按钮控制
|
API 层 (Flask 3.0)
RBAC装饰器 | 资源CRUD | 导出API | 同步API | 堡垒机WS端点
|
服务层
采集引擎 | 告警引擎 | 到期检测 | 域名证书检查 | 会话审计
|
数据层
MySQL (SQLAlchemy 2.0) | APScheduler | Fernet加密 | boto3
核心技术栈
| 层级 | 技术 | 版本 | 说明 |
|---|---|---|---|
| 后端 | Python | 3.11 | 主语言 |
| 框架 | Flask | 3.0 | Web 框架 |
| ORM | SQLAlchemy | 2.0 | 数据库操作 |
| 数据库 | MySQL | 8.0 | 主存储 |
| 调度 | APScheduler | - | 定时任务 |
| 加密 | Fernet | - | 敏感凭证加密 |
| AWS SDK | boto3 | - | 多账号资源采集 |
| SSH | paramiko | - | Web SSH 实现 |
| 部署 | Docker + K8s | - | 容器化部署 |
| WSGI | Gunicorn | - | preload 模式生产部署 |
核心模块设计
1. 多账号资源采集器
设计模式:轮询采集器 + 账号隔离
class AWSResourceCollector:
def __init__(self, accounts):
self.accounts = accounts
def collect_all(self, services=None):
# 遍历所有账号,采集指定服务资源
for account in self.accounts:
session = self._assume_role(account.role_arn)
for service in (services or DEFAULT_SERVICES):
resources = self._collect_service(session, account.region, service)
self._upsert_to_db(resources, account.id, service)
def _collect_service(self, session, region, service):
client = session.client(service, region_name=region)
collector = SERVICE_COLLECTORS[service]
return collector(client)
覆盖服务:
| 服务 | 采集内容 | 表名 |
|---|---|---|
| EC2 | 实例、安全组、密钥对 | aws_ec2_instances |
| ELB | ALB/NLB/CLB、监听器、目标组 | aws_elb_load_balancers |
| RDS | 实例、集群、快照 | aws_rds_instances |
| EKS | 集群、节点组、Pod | aws_eks_clusters |
| MSK | 集群、Broker、Topic | aws_msk_clusters |
| WAFv2 | WebACL、规则、IP集 | aws_waf_web_acls |
| Shield | 防护资源、订阅 | aws_shield_protections |
| VPC | VPC、子网、路由表 | aws_vpcs |
| IAM | 用户、角色、策略 | aws_iam_entities |
采集策略:
- 定时同步:每小时全量采集
- 手动触发:管理后台一键同步
- 增量采集:支持按服务/按账号选择性同步
2. 变更告警引擎
快照比对机制:
class ChangeDetector:
def detect_changes(self, current_snapshot, previous_snapshot):
# 比对两次采集快照,生成变更事件
changes = []
# 新增资源
added = set(current_snapshot.keys()) - set(previous_snapshot.keys())
for rid in added:
changes.append(ChangeEvent('added', rid, current_snapshot[rid]))
# 删除资源
removed = set(previous_snapshot.keys()) - set(current_snapshot.keys())
for rid in removed:
changes.append(ChangeEvent('removed', rid, previous_snapshot[rid]))
# 修改资源
common = set(current_snapshot.keys()) & set(previous_snapshot.keys())
for rid in common:
if current_snapshot[rid] != previous_snapshot[rid]:
diff = self._compute_diff(previous_snapshot[rid], current_snapshot[rid])
changes.append(ChangeEvent('modified', rid, diff))
return changes
告警规则:
| 规则 | 触发条件 | 通知渠道 |
|---|---|---|
| 资源新增 | any added | 飞书/Slack |
| 资源删除 | any removed | 飞书/Slack + 邮件 |
| 配置变更 | any modified | 飞书/Slack |
| 公网暴露 | security_group ingress 0.0.0.0/0 | 飞书/Slack + 邮件 |
| 权限变更 | IAM policy 变更 | 飞书/Slack + 邮件 |
防重发机制:
- 时间窗口:同一资源 1 小时内不重复告警
- 频次限制:单账号每小时最多 50 条告警
- 聚合通知:5 分钟内同类变更聚合为一条
3. 到期提醒系统
支持类型:
| 资源类型 | 到期字段 | 检测方式 |
|---|---|---|
| EC2 Reserved Instance | 到期时间 | AWS API |
| RDS Reserved Instance | 到期时间 | AWS API |
| 域名证书 | NotAfter | SSL 握手 |
| 域名注册 | 到期日期 | RDAP/WHOIS |
| SSL 证书 (ACM) | 到期时间 | AWS API |
提醒策略:
class ExpirationReminder:
REMINDER_THRESHOLDS = [30, 15, 7, 3, 1] # 到期前天数
def check_and_notify(self, resource):
days_until_expire = (resource.expire_date - datetime.now()).days
for threshold in self.REMINDER_THRESHOLDS:
if days_until_expire <= threshold:
# 检查是否已发送过该阈值提醒
if not self._is_reminded(resource.id, threshold):
self._send_reminder(resource, threshold)
self._mark_reminded(resource.id, threshold)
域名证书检测(RDAP 优先 + WHOIS 降级):
class DomainExpirationChecker:
SUPPORTED_TLDS = ['com', 'net', 'org', 'ai', 'global', 'io']
def check(self, domain):
tld = domain.split('.')[-1]
# 优先尝试 RDAP(IANA 官方协议)
try:
return self._check_rdap(domain)
except RDAPNotAvailable:
pass
# 降级到 WHOIS
try:
return self._check_whois(domain)
except WHOISFailed:
raise CheckFailed(f"无法获取 {domain} 到期信息")
def _check_rdap(self, domain):
# 使用 RDAP 协议查询(RFC 7482)
rdap_url = f"https://rdap.org/domain/{domain}"
response = requests.get(rdap_url, timeout=30)
data = response.json()
# 解析 events 数组中的 expiration 事件
for event in data.get('events', []):
if event['eventAction'] == 'expiration':
return datetime.fromisoformat(event['eventDate'])
4. RBAC 权限体系
权限模型:
from functools import wraps
from flask import abort, current_app
def require_permission(permission_code):
# 资源级权限控制装饰器
# 支持四级粒度:view / edit / export / sync
# 权限点精确到按钮/API级别
def decorator(f):
@wraps(f)
def decorated_function(*args, **kwargs):
user = current_user
resource = kwargs.get('resource_type', 'global')
# 检查用户是否拥有该资源的指定权限
if not user.has_permission(f"{resource}:{permission_code}"):
# 记录权限拒绝日志
current_app.logger.warning(
f"Permission denied: {user.username} "
f"attempted {permission_code} on {resource}"
)
abort(403)
return f(*args, **kwargs)
return decorated_function
return decorator
# 使用示例
@app.route('/api/ec2/export')
@require_permission('export') # 需要 ec2:export 权限
def export_ec2():
pass
@app.route('/api/ec2/sync')
@require_permission('sync') # 需要 ec2:sync 权限
def sync_ec2():
pass
权限点清单(20+):
| 资源 | view | edit | export | sync |
|---|---|---|---|---|
| ec2 | Yes | Yes | Yes | Yes |
| elb | Yes | Yes | Yes | Yes |
| rds | Yes | Yes | Yes | Yes |
| eks | Yes | No | Yes | Yes |
| msk | Yes | No | Yes | Yes |
| waf | Yes | Yes | Yes | Yes |
| shield | Yes | No | Yes | Yes |
| iam | Yes | No | No | Yes |
改造范围:
- 20+ 权限点定义
- 15+ API 端点权限校验
- 18+ 前端模板按钮级控制
5. 堡垒机模块(Web SSH)
架构:
浏览器 <-> WebSocket <-> Flask 后端 <-> paramiko <-> 目标服务器
|
心跳检测 (30s)
空闲超时 (10min)
最大时长 (4h)
|
命令审计日志 -> MySQL
会话录像 -> 文件存储
核心实现:
class WebSSHHandler:
HEARTBEAT_INTERVAL = 30 # 心跳间隔(秒)
IDLE_TIMEOUT = 600 # 空闲超时(秒)
MAX_SESSION_DURATION = 14400 # 最大会话时长(秒)
def __init__(self, websocket, host, username, credential):
self.ws = websocket
self.ssh = paramiko.SSHClient()
self.ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
# 记录活动时间
self.last_activity = time.time() # 任何活动(心跳/输入)
self.last_user_activity = time.time() # 仅用户输入
def handle_message(self, message):
# 处理 WebSocket 消息
msg_type = message['type']
if msg_type == 'heartbeat':
self.last_activity = time.time()
elif msg_type == 'input':
self.last_activity = time.time()
self.last_user_activity = time.time()
# 发送命令到 SSH 通道
self.ssh_chan.send(message['data'])
# 记录命令审计
self._audit_command(message['data'])
elif msg_type == 'resize':
self.ssh_chan.resize_pty(
message['cols'],
message['rows']
)
def check_timeout(self):
# 超时检查(独立线程)
now = time.time()
# 心跳超时:30s 无响应断开
if now - self.last_activity > self.HEARTBEAT_INTERVAL * 2:
self.close("心跳超时")
return
# 空闲超时:10min 无用户输入断开
if now - self.last_user_activity > self.IDLE_TIMEOUT:
self.close("空闲超时")
return
# 最大时长:4h 强制断开
if now - self.session_start > self.MAX_SESSION_DURATION:
self.close("会话时长超限")
return
def _audit_command(self, cmd):
# 记录命令审计
audit_log = {
'session_id': self.session_id,
'timestamp': datetime.now(),
'user': self.username,
'host': self.host,
'command': cmd.strip(),
'output': '' # 异步填充
}
db.session.add(CommandAudit(**audit_log))
db.session.commit()
会话录像:
class SessionRecorder:
def __init__(self, session_id):
self.session_id = session_id
self.buffer = []
self.start_time = datetime.now()
def record(self, event_type, data, timestamp=None):
# 记录事件到缓冲区
self.buffer.append({
'ts': timestamp or time.time(),
'type': event_type, # 'input' | 'output' | 'resize'
'data': data
})
def save(self):
# 会话结束保存录像文件
filename = f"{self.session_id}_{self.start_time.strftime('%Y%m%d_%H%M%S')}.cast"
filepath = os.path.join(RECORDINGS_DIR, filename)
with open(filepath, 'w') as f:
# 写入 asciicast v2 格式
f.write(json.dumps({
'version': 2,
'width': 80,
'height': 24,
'timestamp': int(self.start_time.timestamp()),
'env': {'SHELL': '/bin/bash', 'TERM': 'xterm-256color'}
}) + '\n')
for event in self.buffer:
f.write(json.dumps([
event['ts'] - self.start_time.timestamp(),
event['type'],
event['data']
]) + '\n')
生产部署
Dockerfile
FROM python:3.11-slim
WORKDIR /app
# 安装依赖
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# 复制应用代码
COPY . .
# 创建日志目录
RUN mkdir -p /app/logs && chmod 755 /app/logs
# 非 root 运行
RUN useradd -m -u 1000 appuser && chown -R appuser:appuser /app
USER appuser
EXPOSE 5000
CMD ["gunicorn", "-w", "4",
"-k", "geventwebsocket.gunicorn.workers.GeventWebSocketWorker",
"--preload", "-b", "0.0.0.0:5000", "app:app"]
K8s Deployment
apiVersion: apps/v1
kind: Deployment
metadata:
name: ops-asset-mgmt
spec:
replicas: 2
selector:
matchLabels:
app: ops-asset-mgmt
template:
metadata:
labels:
app: ops-asset-mgmt
spec:
containers:
- name: app
image: ops-asset-mgmt:latest
ports:
- containerPort: 5000
env:
- name: DATABASE_URL
valueFrom:
secretKeyRef:
name: ops-asset-secrets
key: database_url
- name: AWS_CREDENTIALS
valueFrom:
secretKeyRef:
name: ops-asset-secrets
key: aws_credentials_encrypted
volumeMounts:
- name: logs
mountPath: /app/logs
- name: recordings
mountPath: /app/recordings
resources:
requests:
memory: "512Mi"
cpu: "500m"
limits:
memory: "1Gi"
cpu: "1000m"
volumes:
- name: logs
emptyDir: {}
- name: recordings
persistentVolumeClaim:
claimName: ops-asset-recordings
生产环境问题与解决
| 问题 | 现象 | 根因 | 解决 |
|---|---|---|---|
| .env Secret 卷挂载键名冲突 | 容器启动失败,env 变量未注入 | K8s Secret 键名与代码读取的键名不一致 | 统一 Secret 键名规范,增加启动校验 |
| /app/logs 权限 denied | Gunicorn 无法写入日志 | 容器非 root 运行,logs 目录属主错误 | Dockerfile 中显式创建并 chmod 755 |
| preload 模式模板缓存不刷新 | 代码更新后页面未变化 | preload 模式下模板编译缓存 | 配置 TEMPLATES_AUTO_RELOAD=true,或滚动重启 |
| WebSocket 连接不稳定 | 长会话随机断开 | Nginx proxy_read_timeout 默认 60s | 调整为 1h,并配置心跳保持 |
资源联动与运维效率设计
设计目标
以运维用户为核心使用场景,解决多云环境下资源查询链路长、跨服务排查效率低、终端登录工具割裂等痛点。通过资源拓扑自动关联 + 逐层点击跳转 + 终端一站式直达,实现”查资源 → 定位问题 → 连终端排查”的闭环体验。
资源联动导航链路
Route53 域名列表
│ 点击域名
▼
解析详情页 ──→ 后端指向 CDN / LB / S3 / EC2
│ 点击 CDN
▼
CDN 详情页 ──→ 回源配置 → 点击回源域名/LB
│ 点击 LB
▼
LB 详情页 ──→ 监听器列表 → 目标组 → 健康检查状态
│ 点击目标组 / 监听端口
▼
后端实例列表 ──→ EC2 实例卡片(含状态、AZ、标签)
│ 点击实例
▼
EC2 详情页 ──→ 规格 / 安全组 / 磁盘 / 标签 / 终端入口
│ 点击 [SSH 登录](需授权)
▼
Web SSH 终端 ──→ 直接登录,开始排查
核心交互设计
1. Route53 → 后端资源自动解析
class Route53Resolver:
"""域名后端指向解析器"""
def resolve_target(self, record):
record_type = record["Type"] # A / CNAME / ALIAS
value = record["ResourceRecords"] or record.get("AliasTarget")
# 识别后端类型
if self._is_cloudfront_domain(value):
return {"type": "CDN", "service": "CloudFront", "id": self._extract_distribution_id(value)}
elif self._is_elb_domain(value):
return {"type": "LB", "service": self._detect_lb_type(value), "name": self._extract_lb_name(value)}
elif self._is_s3_endpoint(value):
return {"type": "S3", "bucket": self._extract_bucket_name(value)}
elif record_type == "A" and self._is_ec2_ip(value):
return {"type": "EC2", "ip": value}
return {"type": "UNKNOWN", "raw": value}
前端展示:域名列表每行显示 后端类型标签(CDN / LB / S3 / EC2),点击直接跳转对应资源详情页。
2. CDN → LB 回源联动
| CDN 信息 | 回源配置 | 点击行为 |
|---|---|---|
| Distribution ID | 回源域名 / 自定义源站 | 跳转到对应 LB 详情 |
| 状态 / 域名 | 回源协议 / 端口 | 或展示 S3 桶信息 |
| 边缘节点数 | 缓存行为 |
3. LB → 监听端口 → 后端实例
class LBNavigator:
"""LB 详情页数据聚合"""
def get_lb_detail(self, lb_arn):
return {
"basic": self._get_lb_info(lb_arn), # 名称 / 类型 / AZ / 状态
"listeners": self._get_listeners(lb_arn), # 端口 / 协议 / 规则
"target_groups": self._get_target_groups(lb_arn), # 目标组 / 健康状态
"backends": self._get_backend_instances(lb_arn) # 关联 EC2 列表
}
前端交互:
- LB 详情页 监听器卡片 → 点击端口展开目标组
- 目标组卡片显示 健康实例数 / 总实例数 → 点击展开实例列表
- 实例列表支持 批量跳转 或 单实例详情
4. EC2 实例 → 终端一键登录
class InstanceCard:
"""EC2 实例卡片渲染逻辑"""
def render(self, instance, user):
card = {
"instance_id": instance.id,
"name": instance.tags.get("Name"),
"type": instance.instance_type,
"state": instance.state,
"public_ip": instance.public_ip,
"private_ip": instance.private_ip,
"az": instance.placement["AvailabilityZone"],
# 终端登录入口
"ssh_enabled": user.has_permission(f"ec2:ssh:{instance.id}"),
"ssh_url": f"/ssh/{instance.id}" if user.has_permission(f"ec2:ssh:{instance.id}") else None
}
return card
前端展示:
- 实例卡片右上角显示 终端图标(未授权 / 可登录)
- 授权用户点击图标 → 新开标签页打开 Web SSH 终端
- 未授权用户 → 提示”申请权限”并跳转审批流
性能优化
| 优化点 | 实现 | 效果 |
|---|---|---|
| 资源关联预加载 | 采集时写入关联表(domain→cdn→lb→ec2) | 页面加载 < 200ms |
| 前端缓存 | localStorage 存储最近访问资源路径 | 二次访问秒开 |
| 异步聚合 | LB 详情页并行查询 listeners/targets/backends | 减少 60% 等待时间 |
| 懒加载 | 后端实例列表滚动加载,默认展示前 20 条 | 首屏渲染 < 1s |
权限控制
# 实例级 SSH 权限(细粒度到单台机器)
@require_permission("ec2:ssh:{instance_id}")
def open_ssh_terminal(instance_id):
"""仅对授权实例开放终端"""
pass
# 资源查看权限(按服务隔离)
@require_permission("route53:view")
def list_domains():
pass
@require_permission("cloudfront:view")
def get_cdn_detail(distribution_id):
pass
个人核心贡献
1. 权限体系重构
背景: 导出、同步操作原本绑定在查看权限中,无法满足”仅给某用户开通 EC2 导出”的细粒度需求。
改造:
# 改造前:导出与查看绑定
@require_permission('view') # 有查看就能导出
def export_ec2():
pass
# 改造后:独立权限码
@require_permission('export') # 需要单独授权 ec2:export
def export_ec2():
pass
@require_permission('sync') # 需要单独授权 ec2:sync
def sync_ec2():
pass
影响范围:
- 20+ 权限点新增/拆分
- 15+ API 端点装饰器替换
- 18+ 前端模板按钮级权限控制
- 数据库迁移脚本:历史权限数据平滑升级
2. 终端稳定性优化
问题: 用户静止阅读日志时被误断连。
根因: 原逻辑仅用 last_activity(包含心跳),用户不输入时心跳仍在,但空闲检测误判。
解决: 分离两个时间戳
# 改造前
def on_message(self, msg):
self.last_activity = time.time() # 心跳和输入都更新
def check_idle(self):
if time.time() - self.last_activity > IDLE_TIMEOUT:
self.close() # 误断!
# 改造后
def on_message(self, msg):
if msg['type'] == 'heartbeat':
self.last_activity = time.time() # 仅心跳
elif msg['type'] == 'input':
self.last_activity = time.time()
self.last_user_activity = time.time() # 仅用户输入
def check_idle(self):
# 空闲检测基于用户输入,而非心跳
if time.time() - self.last_user_activity > IDLE_TIMEOUT:
self.close() # 正确:用户真的没操作
附加修复: 子线程中 current_app 不可用的日志异常
# 修复前(异常)
def background_task():
current_app.logger.info("task started") # RuntimeError!
# 修复后
from flask import has_app_context
def background_task(app):
with app.app_context():
app.logger.info("task started") # 正确
3. 告警准确性提升
问题: EIP 变更频繁误报。
根因: ALB/NLB 类型的公网 IP 归属判定缺失,导致 IP 漂移时误判为新增/删除。
解决: 增加负载均衡器类型识别
def classify_eip_owner(self, ip, resources):
# 判定 EIP 归属,消除误报
# 检查是否为 ALB/NLB 出口 IP
for lb in resources.get('load_balancers', []):
if lb['type'] in ['application', 'network']:
# ALB/NLB 的 IP 会动态变化,不计入变更
if ip in lb.get('public_ips', []):
return 'alb_nlb_dynamic' # 标记为动态,忽略变更
# 检查是否为 EC2 绑定
for ec2 in resources.get('ec2_instances', []):
if ec2.get('public_ip') == ip:
return 'ec2_static'
return 'unknown'
4. 前端交互优化
| 优化项 | 实现 | 效果 |
|---|---|---|
| EC2/LB 安全组列 | 可点击跳转安全组详情页 | 减少 2 步操作 |
| 列显配置 | 用户自定义显示/隐藏列,持久化到 localStorage | 个性化体验 |
| Shield 时间显示 | 后端统一 CST 格式化,前端直接渲染 | 消除时区歧义 |
| 批量导出 | 支持跨页选择,异步生成下载 | 大数据量不卡顿 |
| 快捷筛选 | 保存常用筛选条件为书签 | 重复操作减少 80% |
复盘
问题根因: 初期权限设计过于粗放,未考虑运营场景下的细粒度授权需求。
改进措施:
- 权限体系重构为资源级四级粒度(view/edit/export/sync)
- 建立权限变更审批流,防止过度授权
- 增加权限使用审计,定期回收闲置权限
- 堡垒机会话增加双人授权机制(敏感操作)
本文首发于 wr.mrchi.cn,转载请注明出处。