Skip to content
云间札记
Go back

运维资产管理系统设计与实现:多云环境下的资源治理与堡垒机一体化

Updated:

项目概述

面向多云(AWS 为主)环境的运维资产统一管理平台,解决企业多账号、多区域下资源分散、变更无感知、到期易遗漏、权限不可控等痛点。

覆盖范围:

技术架构

前端层 (Jinja2 + Bootstrap)
  EC2列表 | LB安全组 | 列显配置 | Shield时间 | 权限按钮控制
                              |
API 层 (Flask 3.0)
  RBAC装饰器 | 资源CRUD | 导出API | 同步API | 堡垒机WS端点
                              |
服务层
  采集引擎 | 告警引擎 | 到期检测 | 域名证书检查 | 会话审计
                              |
数据层
  MySQL (SQLAlchemy 2.0) | APScheduler | Fernet加密 | boto3

核心技术栈

层级技术版本说明
后端Python3.11主语言
框架Flask3.0Web 框架
ORMSQLAlchemy2.0数据库操作
数据库MySQL8.0主存储
调度APScheduler-定时任务
加密Fernet-敏感凭证加密
AWS SDKboto3-多账号资源采集
SSHparamiko-Web SSH 实现
部署Docker + K8s-容器化部署
WSGIGunicorn-preload 模式生产部署

核心模块设计

1. 多账号资源采集器

设计模式:轮询采集器 + 账号隔离

class AWSResourceCollector:
    def __init__(self, accounts):
        self.accounts = accounts

    def collect_all(self, services=None):
        # 遍历所有账号,采集指定服务资源
        for account in self.accounts:
            session = self._assume_role(account.role_arn)
            for service in (services or DEFAULT_SERVICES):
                resources = self._collect_service(session, account.region, service)
                self._upsert_to_db(resources, account.id, service)

    def _collect_service(self, session, region, service):
        client = session.client(service, region_name=region)
        collector = SERVICE_COLLECTORS[service]
        return collector(client)

覆盖服务:

服务采集内容表名
EC2实例、安全组、密钥对aws_ec2_instances
ELBALB/NLB/CLB、监听器、目标组aws_elb_load_balancers
RDS实例、集群、快照aws_rds_instances
EKS集群、节点组、Podaws_eks_clusters
MSK集群、Broker、Topicaws_msk_clusters
WAFv2WebACL、规则、IP集aws_waf_web_acls
Shield防护资源、订阅aws_shield_protections
VPCVPC、子网、路由表aws_vpcs
IAM用户、角色、策略aws_iam_entities

采集策略:

2. 变更告警引擎

快照比对机制:

class ChangeDetector:
    def detect_changes(self, current_snapshot, previous_snapshot):
        # 比对两次采集快照,生成变更事件
        changes = []

        # 新增资源
        added = set(current_snapshot.keys()) - set(previous_snapshot.keys())
        for rid in added:
            changes.append(ChangeEvent('added', rid, current_snapshot[rid]))

        # 删除资源
        removed = set(previous_snapshot.keys()) - set(current_snapshot.keys())
        for rid in removed:
            changes.append(ChangeEvent('removed', rid, previous_snapshot[rid]))

        # 修改资源
        common = set(current_snapshot.keys()) & set(previous_snapshot.keys())
        for rid in common:
            if current_snapshot[rid] != previous_snapshot[rid]:
                diff = self._compute_diff(previous_snapshot[rid], current_snapshot[rid])
                changes.append(ChangeEvent('modified', rid, diff))

        return changes

告警规则:

规则触发条件通知渠道
资源新增any added飞书/Slack
资源删除any removed飞书/Slack + 邮件
配置变更any modified飞书/Slack
公网暴露security_group ingress 0.0.0.0/0飞书/Slack + 邮件
权限变更IAM policy 变更飞书/Slack + 邮件

防重发机制:

3. 到期提醒系统

支持类型:

资源类型到期字段检测方式
EC2 Reserved Instance到期时间AWS API
RDS Reserved Instance到期时间AWS API
域名证书NotAfterSSL 握手
域名注册到期日期RDAP/WHOIS
SSL 证书 (ACM)到期时间AWS API

提醒策略:

class ExpirationReminder:
    REMINDER_THRESHOLDS = [30, 15, 7, 3, 1]  # 到期前天数

    def check_and_notify(self, resource):
        days_until_expire = (resource.expire_date - datetime.now()).days

        for threshold in self.REMINDER_THRESHOLDS:
            if days_until_expire <= threshold:
                # 检查是否已发送过该阈值提醒
                if not self._is_reminded(resource.id, threshold):
                    self._send_reminder(resource, threshold)
                    self._mark_reminded(resource.id, threshold)

域名证书检测(RDAP 优先 + WHOIS 降级):

class DomainExpirationChecker:
    SUPPORTED_TLDS = ['com', 'net', 'org', 'ai', 'global', 'io']

    def check(self, domain):
        tld = domain.split('.')[-1]

        # 优先尝试 RDAP(IANA 官方协议)
        try:
            return self._check_rdap(domain)
        except RDAPNotAvailable:
            pass

        # 降级到 WHOIS
        try:
            return self._check_whois(domain)
        except WHOISFailed:
            raise CheckFailed(f"无法获取 {domain} 到期信息")

    def _check_rdap(self, domain):
        # 使用 RDAP 协议查询(RFC 7482)
        rdap_url = f"https://rdap.org/domain/{domain}"
        response = requests.get(rdap_url, timeout=30)
        data = response.json()

        # 解析 events 数组中的 expiration 事件
        for event in data.get('events', []):
            if event['eventAction'] == 'expiration':
                return datetime.fromisoformat(event['eventDate'])

4. RBAC 权限体系

权限模型:

from functools import wraps
from flask import abort, current_app

def require_permission(permission_code):
    # 资源级权限控制装饰器
    # 支持四级粒度:view / edit / export / sync
    # 权限点精确到按钮/API级别
    def decorator(f):
        @wraps(f)
        def decorated_function(*args, **kwargs):
            user = current_user
            resource = kwargs.get('resource_type', 'global')

            # 检查用户是否拥有该资源的指定权限
            if not user.has_permission(f"{resource}:{permission_code}"):
                # 记录权限拒绝日志
                current_app.logger.warning(
                    f"Permission denied: {user.username} "
                    f"attempted {permission_code} on {resource}"
                )
                abort(403)

            return f(*args, **kwargs)
        return decorated_function
    return decorator

# 使用示例
@app.route('/api/ec2/export')
@require_permission('export')  # 需要 ec2:export 权限
def export_ec2():
    pass

@app.route('/api/ec2/sync')
@require_permission('sync')    # 需要 ec2:sync 权限
def sync_ec2():
    pass

权限点清单(20+):

资源vieweditexportsync
ec2YesYesYesYes
elbYesYesYesYes
rdsYesYesYesYes
eksYesNoYesYes
mskYesNoYesYes
wafYesYesYesYes
shieldYesNoYesYes
iamYesNoNoYes

改造范围:

5. 堡垒机模块(Web SSH)

架构:

浏览器 <-> WebSocket <-> Flask 后端 <-> paramiko <-> 目标服务器
              |
         心跳检测 (30s)
         空闲超时 (10min)
         最大时长 (4h)
              |
         命令审计日志 -> MySQL
         会话录像 -> 文件存储

核心实现:

class WebSSHHandler:
    HEARTBEAT_INTERVAL = 30      # 心跳间隔(秒)
    IDLE_TIMEOUT = 600           # 空闲超时(秒)
    MAX_SESSION_DURATION = 14400  # 最大会话时长(秒)

    def __init__(self, websocket, host, username, credential):
        self.ws = websocket
        self.ssh = paramiko.SSHClient()
        self.ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())

        # 记录活动时间
        self.last_activity = time.time()      # 任何活动(心跳/输入)
        self.last_user_activity = time.time()  # 仅用户输入

    def handle_message(self, message):
        # 处理 WebSocket 消息
        msg_type = message['type']

        if msg_type == 'heartbeat':
            self.last_activity = time.time()

        elif msg_type == 'input':
            self.last_activity = time.time()
            self.last_user_activity = time.time()
            # 发送命令到 SSH 通道
            self.ssh_chan.send(message['data'])

            # 记录命令审计
            self._audit_command(message['data'])

        elif msg_type == 'resize':
            self.ssh_chan.resize_pty(
                message['cols'], 
                message['rows']
            )

    def check_timeout(self):
        # 超时检查(独立线程)
        now = time.time()

        # 心跳超时:30s 无响应断开
        if now - self.last_activity > self.HEARTBEAT_INTERVAL * 2:
            self.close("心跳超时")
            return

        # 空闲超时:10min 无用户输入断开
        if now - self.last_user_activity > self.IDLE_TIMEOUT:
            self.close("空闲超时")
            return

        # 最大时长:4h 强制断开
        if now - self.session_start > self.MAX_SESSION_DURATION:
            self.close("会话时长超限")
            return

    def _audit_command(self, cmd):
        # 记录命令审计
        audit_log = {
            'session_id': self.session_id,
            'timestamp': datetime.now(),
            'user': self.username,
            'host': self.host,
            'command': cmd.strip(),
            'output': ''  # 异步填充
        }
        db.session.add(CommandAudit(**audit_log))
        db.session.commit()

会话录像:

class SessionRecorder:
    def __init__(self, session_id):
        self.session_id = session_id
        self.buffer = []
        self.start_time = datetime.now()

    def record(self, event_type, data, timestamp=None):
        # 记录事件到缓冲区
        self.buffer.append({
            'ts': timestamp or time.time(),
            'type': event_type,  # 'input' | 'output' | 'resize'
            'data': data
        })

    def save(self):
        # 会话结束保存录像文件
        filename = f"{self.session_id}_{self.start_time.strftime('%Y%m%d_%H%M%S')}.cast"
        filepath = os.path.join(RECORDINGS_DIR, filename)

        with open(filepath, 'w') as f:
            # 写入 asciicast v2 格式
            f.write(json.dumps({
                'version': 2,
                'width': 80,
                'height': 24,
                'timestamp': int(self.start_time.timestamp()),
                'env': {'SHELL': '/bin/bash', 'TERM': 'xterm-256color'}
            }) + '\n')

            for event in self.buffer:
                f.write(json.dumps([
                    event['ts'] - self.start_time.timestamp(),
                    event['type'],
                    event['data']
                ]) + '\n')

生产部署

Dockerfile

FROM python:3.11-slim

WORKDIR /app

# 安装依赖
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# 复制应用代码
COPY . .

# 创建日志目录
RUN mkdir -p /app/logs && chmod 755 /app/logs

# 非 root 运行
RUN useradd -m -u 1000 appuser && chown -R appuser:appuser /app
USER appuser

EXPOSE 5000

CMD ["gunicorn", "-w", "4", 
     "-k", "geventwebsocket.gunicorn.workers.GeventWebSocketWorker", 
     "--preload", "-b", "0.0.0.0:5000", "app:app"]

K8s Deployment

apiVersion: apps/v1
kind: Deployment
metadata:
  name: ops-asset-mgmt
spec:
  replicas: 2
  selector:
    matchLabels:
      app: ops-asset-mgmt
  template:
    metadata:
      labels:
        app: ops-asset-mgmt
    spec:
      containers:
      - name: app
        image: ops-asset-mgmt:latest
        ports:
        - containerPort: 5000
        env:
        - name: DATABASE_URL
          valueFrom:
            secretKeyRef:
              name: ops-asset-secrets
              key: database_url
        - name: AWS_CREDENTIALS
          valueFrom:
            secretKeyRef:
              name: ops-asset-secrets
              key: aws_credentials_encrypted
        volumeMounts:
        - name: logs
          mountPath: /app/logs
        - name: recordings
          mountPath: /app/recordings
        resources:
          requests:
            memory: "512Mi"
            cpu: "500m"
          limits:
            memory: "1Gi"
            cpu: "1000m"
      volumes:
      - name: logs
        emptyDir: {}
      - name: recordings
        persistentVolumeClaim:
          claimName: ops-asset-recordings

生产环境问题与解决

问题现象根因解决
.env Secret 卷挂载键名冲突容器启动失败,env 变量未注入K8s Secret 键名与代码读取的键名不一致统一 Secret 键名规范,增加启动校验
/app/logs 权限 deniedGunicorn 无法写入日志容器非 root 运行,logs 目录属主错误Dockerfile 中显式创建并 chmod 755
preload 模式模板缓存不刷新代码更新后页面未变化preload 模式下模板编译缓存配置 TEMPLATES_AUTO_RELOAD=true,或滚动重启
WebSocket 连接不稳定长会话随机断开Nginx proxy_read_timeout 默认 60s调整为 1h,并配置心跳保持

资源联动与运维效率设计

设计目标

以运维用户为核心使用场景,解决多云环境下资源查询链路长、跨服务排查效率低、终端登录工具割裂等痛点。通过资源拓扑自动关联 + 逐层点击跳转 + 终端一站式直达,实现”查资源 → 定位问题 → 连终端排查”的闭环体验。

资源联动导航链路

Route53 域名列表
    │ 点击域名

解析详情页 ──→ 后端指向 CDN / LB / S3 / EC2
    │ 点击 CDN

CDN 详情页 ──→ 回源配置 → 点击回源域名/LB
    │ 点击 LB

LB 详情页 ──→ 监听器列表 → 目标组 → 健康检查状态
    │ 点击目标组 / 监听端口

后端实例列表 ──→ EC2 实例卡片(含状态、AZ、标签)
    │ 点击实例

EC2 详情页 ──→ 规格 / 安全组 / 磁盘 / 标签 / 终端入口
    │ 点击 [SSH 登录](需授权)

Web SSH 终端 ──→ 直接登录,开始排查

核心交互设计

1. Route53 → 后端资源自动解析

class Route53Resolver:
    """域名后端指向解析器"""

    def resolve_target(self, record):
        record_type = record["Type"]  # A / CNAME / ALIAS
        value = record["ResourceRecords"] or record.get("AliasTarget")

        # 识别后端类型
        if self._is_cloudfront_domain(value):
            return {"type": "CDN", "service": "CloudFront", "id": self._extract_distribution_id(value)}

        elif self._is_elb_domain(value):
            return {"type": "LB", "service": self._detect_lb_type(value), "name": self._extract_lb_name(value)}

        elif self._is_s3_endpoint(value):
            return {"type": "S3", "bucket": self._extract_bucket_name(value)}

        elif record_type == "A" and self._is_ec2_ip(value):
            return {"type": "EC2", "ip": value}

        return {"type": "UNKNOWN", "raw": value}

前端展示:域名列表每行显示 后端类型标签(CDN / LB / S3 / EC2),点击直接跳转对应资源详情页。

2. CDN → LB 回源联动

CDN 信息回源配置点击行为
Distribution ID回源域名 / 自定义源站跳转到对应 LB 详情
状态 / 域名回源协议 / 端口或展示 S3 桶信息
边缘节点数缓存行为

3. LB → 监听端口 → 后端实例

class LBNavigator:
    """LB 详情页数据聚合"""

    def get_lb_detail(self, lb_arn):
        return {
            "basic": self._get_lb_info(lb_arn),           # 名称 / 类型 / AZ / 状态
            "listeners": self._get_listeners(lb_arn),      # 端口 / 协议 / 规则
            "target_groups": self._get_target_groups(lb_arn),  # 目标组 / 健康状态
            "backends": self._get_backend_instances(lb_arn)    # 关联 EC2 列表
        }

前端交互:

4. EC2 实例 → 终端一键登录

class InstanceCard:
    """EC2 实例卡片渲染逻辑"""

    def render(self, instance, user):
        card = {
            "instance_id": instance.id,
            "name": instance.tags.get("Name"),
            "type": instance.instance_type,
            "state": instance.state,
            "public_ip": instance.public_ip,
            "private_ip": instance.private_ip,
            "az": instance.placement["AvailabilityZone"],
            # 终端登录入口
            "ssh_enabled": user.has_permission(f"ec2:ssh:{instance.id}"),
            "ssh_url": f"/ssh/{instance.id}" if user.has_permission(f"ec2:ssh:{instance.id}") else None
        }
        return card

前端展示:

性能优化

优化点实现效果
资源关联预加载采集时写入关联表(domain→cdn→lb→ec2)页面加载 < 200ms
前端缓存localStorage 存储最近访问资源路径二次访问秒开
异步聚合LB 详情页并行查询 listeners/targets/backends减少 60% 等待时间
懒加载后端实例列表滚动加载,默认展示前 20 条首屏渲染 < 1s

权限控制

# 实例级 SSH 权限(细粒度到单台机器)
@require_permission("ec2:ssh:{instance_id}")
def open_ssh_terminal(instance_id):
    """仅对授权实例开放终端"""
    pass

# 资源查看权限(按服务隔离)
@require_permission("route53:view")
def list_domains():
    pass

@require_permission("cloudfront:view")
def get_cdn_detail(distribution_id):
    pass

个人核心贡献

1. 权限体系重构

背景: 导出、同步操作原本绑定在查看权限中,无法满足”仅给某用户开通 EC2 导出”的细粒度需求。

改造:

# 改造前:导出与查看绑定
@require_permission('view')  # 有查看就能导出
def export_ec2():
    pass

# 改造后:独立权限码
@require_permission('export')  # 需要单独授权 ec2:export
def export_ec2():
    pass

@require_permission('sync')    # 需要单独授权 ec2:sync
def sync_ec2():
    pass

影响范围:

2. 终端稳定性优化

问题: 用户静止阅读日志时被误断连。

根因: 原逻辑仅用 last_activity(包含心跳),用户不输入时心跳仍在,但空闲检测误判。

解决: 分离两个时间戳

# 改造前
def on_message(self, msg):
    self.last_activity = time.time()  # 心跳和输入都更新

def check_idle(self):
    if time.time() - self.last_activity > IDLE_TIMEOUT:
        self.close()  # 误断!

# 改造后
def on_message(self, msg):
    if msg['type'] == 'heartbeat':
        self.last_activity = time.time()  # 仅心跳
    elif msg['type'] == 'input':
        self.last_activity = time.time()
        self.last_user_activity = time.time()  # 仅用户输入

def check_idle(self):
    # 空闲检测基于用户输入,而非心跳
    if time.time() - self.last_user_activity > IDLE_TIMEOUT:
        self.close()  # 正确:用户真的没操作

附加修复: 子线程中 current_app 不可用的日志异常

# 修复前(异常)
def background_task():
    current_app.logger.info("task started")  # RuntimeError!

# 修复后
from flask import has_app_context

def background_task(app):
    with app.app_context():
        app.logger.info("task started")  # 正确

3. 告警准确性提升

问题: EIP 变更频繁误报。

根因: ALB/NLB 类型的公网 IP 归属判定缺失,导致 IP 漂移时误判为新增/删除。

解决: 增加负载均衡器类型识别

def classify_eip_owner(self, ip, resources):
    # 判定 EIP 归属,消除误报

    # 检查是否为 ALB/NLB 出口 IP
    for lb in resources.get('load_balancers', []):
        if lb['type'] in ['application', 'network']:
            # ALB/NLB 的 IP 会动态变化,不计入变更
            if ip in lb.get('public_ips', []):
                return 'alb_nlb_dynamic'  # 标记为动态,忽略变更

    # 检查是否为 EC2 绑定
    for ec2 in resources.get('ec2_instances', []):
        if ec2.get('public_ip') == ip:
            return 'ec2_static'

    return 'unknown'

4. 前端交互优化

优化项实现效果
EC2/LB 安全组列可点击跳转安全组详情页减少 2 步操作
列显配置用户自定义显示/隐藏列,持久化到 localStorage个性化体验
Shield 时间显示后端统一 CST 格式化,前端直接渲染消除时区歧义
批量导出支持跨页选择,异步生成下载大数据量不卡顿
快捷筛选保存常用筛选条件为书签重复操作减少 80%

复盘

问题根因: 初期权限设计过于粗放,未考虑运营场景下的细粒度授权需求。

改进措施:

本文首发于 wr.mrchi.cn,转载请注明出处。



Previous Post
Docker 多环境安装指南:CentOS、Ubuntu、AWS 与 Mac 全平台覆盖
Next Post
Blockscout L2 浏览器部署:OP Stack 链上数据可视化