Flink 实时计算 x SLS 存储下推:阿里云 OpenAPI 网关监控平台实践
作者潘伟龙阿里云可观测、阮孝振阿里云开放平台背景与挑战业务背景阿里云开放平台OpenAPI是开发者管理云上资源的标准入口。开放平台承载了几乎所有云产品的对外接口承载客户自动化运维与云资源管控的核心诉求。随着企业对自动化的依赖日益加深OpenAPI 的稳定性建设变得至关重要。监控体系的需求方包括开放平台运维团队负责网关整体可用性需要全局视角的监控告警。各云产品团队ECS/RDS/SLB 等需要查看自己产品的 API 调用指标和大盘并配置细粒度告警。SRE 团队需要快速定位故障进行根因分析。任何接口的波动都可能影响客户的生产业务因此必须建立全方位的指标监控体系并配套及时的告警能力以确保高可用性。核心挑战构建监控体系的核心数据源是API 网关的访问日志。这些日志由分布式部署在各地域的网关节点产生具有以下鲜明特点技术方案针对上述挑战我们采用Flink SLS日志服务的云原生组合来构建实时监控体系。技术选型该方案的核心组件及选型理由如下这套组合的核心优势全托管运维SLS 和 Flink基于阿里云实时计算均为全托管服务无需运维基础设施。弹性扩展消费吞吐和计算资源可按需扩缩。端到端保障从采集到告警的全链路可观测。整体架构整个数据处理链路采用地域化部署 中心化汇聚的架构设计各地域内独立完成日志采集与聚合计算实现就近处理、降低延迟最终将轻量化指标跨域汇聚至中心 MetricStore实现全局统一监控。地域内处理Regional Processing每个地域独立部署完整的数据处理链路实现就近计算、降低延迟数据采集由Logtail实时采集本地域网关节点日志。Logtail 是阿里云自研的高性能日志采集 Agent具备毫秒级延迟和百万级 EPS 吞吐能力确保海量日志的可靠传输。日志存储各地域的SLS Logstore存储本地域 OpenAPI 的原始访问日志支持对请求明细的实时查询与分析同时作为 Flink 流计算的数据源。流式聚合计算各地域独立部署Flink Job 1聚合作业关联 MySQL 维表存储网关机器的集群信息、API 业务域如 ECS 等元数据进行局部维度的业务指标汇总。地域内处理可大幅减少跨域传输的数据量。跨地域汇聚Cross-Region Aggregation多个地域的聚合结果统一写入中心化的 MetricStore实现全局视图指标汇聚各地域独立部署的Flink Job 2指标转换将聚合结果转为时序指标格式统一跨域推送到中心地域的 SLS MetricStore。通过汇聚设计运维团队可在单一视图查看全球所有地域的指标。可视化与告警基于Grafana对接中心化的 SLS MetricStore通过标准 PromQL 实现多维度的指标大盘展示并配置细粒度告警规则实现从异常发现到通知的闭环。分层设计理念为什么要分两层核心考虑是平衡实时性与资源效率为什么不直接一层聚合数据倾斜Data SkewOpenAPI 流量分布极不均匀某些大产品如 ECS的 QPS 是其他产品的数千倍。如果直接按 Product 进行 GroupBy会导致特定 Flink Task 出现严重的数据倾斜和状态膨胀。资源效率通过第一层单机维度的局部聚合输出到下游的数据量可降低 90% 以上大幅减轻了全局聚合作业的计算和存储开销。指标体系设计我们需要生成的指标体系由Metric Name指标名称和Labels标签组成覆盖以下四个核心维度指标命名规范Prefix_MetricName。例如 ECS 产品的 QPS 指标名为 namespace_product_gw_http_req。核心作业实践Job 1聚合作业设计意图消费原始日志关联 MySQL 维表补充元数据网关集群信息、API 业务域等并进行多阶段聚合先进行细粒度的多维聚合按产品/API/租户等以减少下游数据量再进行全局指标汇总。1. 数据源网关原始日志原始日志由 Logtail 从网关节点采集。以下是一条典型的网关访问日志已脱敏{ AK: STS.NZD***Lgwc, Api: DescribeCustomResourceDetail, CallerUid: 109837***3503, ClientIp: 192.168.xx.xx, Domain: acc-vpc.cn-huhehaote.aliyuncs.com, ErrorCode: ResourceNotFound, Ext5: {\logRegionId\:\cn-huhehaote\,\appGroup\:\pop-region-cn-huhehaote\,\callerInfo\:{...},\headers\:{...}}, HttpCode: 404, LocalIp: 11.197.xxx.xxx, Product: acc, RegionId: cn-huhehaote, RequestContent: RegionIdcn-huhehaote;ActionDescribeCustomResourceDetail;Version2024-04-02;..., TotalUsedTime: 14, Version: 2024-04-02, __time__: 1768484243 }字段说明Ext5 包含嵌套的 JSON 结构调用者信息、请求头等RequestContent 是 KV 格式的请求参数。这些复杂结构需要在后续处理中解析。基于上述日志结构我们定义 Flink Source 表CREATE TABLE openapi_log_source ( __time__ BIGINT, LocalIp STRING, -- 网关节点 IP Product STRING, -- 产品 Code (如 Ecs, RDS) Api STRING, -- API 名称 (如 DescribeInstances) Version STRING, -- API 版本 Domain STRING, -- 请求域名 AK STRING, -- AccessKey CallerUid STRING, -- 调用者 UID HttpCode STRING, -- HTTP 状态码 ErrorCode STRING, -- 网关错误码 TotalUsedTime BIGINT, -- 请求耗时 (ms) ClientIp STRING, -- 客户端 IP RegionId STRING, -- 地域 Ext5 STRING, -- 扩展字段 (嵌套 JSON) RequestContent STRING, -- 请求参数 (KV 格式) ts AS TO_TIMESTAMP_LTZ(__time__ * 1000, 3), WATERMARK FOR ts AS ts - INTERVAL 5 SECOND ) WITH ( connector sls, project *****, logstore pop_rpc_trace_log, endpoint cn-shanghai-intranet.log.aliyuncs.com );Watermark 策略说明这里设置 ts - INTERVAL ‘5’ SECOND 表示允许最多 5 秒的乱序数据。该值需根据实际业务场景权衡生产环境中网关日志通过 Logtail 采集端到端延迟通常在 2-3 秒内5 秒的 Watermark 延迟可以覆盖绝大多数场景。对于跨地域同步场景可适当放宽至 10-15 秒。2. MySQL 维表元数据富化为了满足指标的标签需求如app_groupgc_level需要关联维表-- 网关集群信息 (关联 LocalIp) CREATE TABLE gateway_cluster_dim ( local_ip STRING, app_group STRING, -- 集群名称 region_id STRING, -- 物理 Region PRIMARY KEY (local_ip) NOT ENFORCED ) WITH (connector jdbc, ...); -- 租户等级信息 (关联 Uid) CREATE TABLE user_level_dim ( uid STRING, gc_level STRING, -- 客户等级 (GC5/GC6/GC7) PRIMARY KEY (uid) NOT ENFORCED ) WITH ( connector jdbc, url jdbc:mysql://xxx:3306/dim_db, table-name user_level, lookup.cache.max-rows 50000, -- 缓存最大行数 lookup.cache.ttl 10min, -- 缓存过期时间 lookup.max-retries 3 -- 查询失败重试次数 );维表缓存策略选择生产环境中gateway_cluster_dim 采用 ALL 策略启动时全量加载并定时刷新user_level_dim 采用 LRU 策略缓存 5 万条热点租户数据TTL 设为 10 分钟以平衡命中率和数据新鲜度。3. Job 1 输出写入本地域聚合日志计算结果写入本地域的 SLS Logstoremachine_agg_log作为中间存储。-- 定义本地聚合日志存储 CREATE TABLE machine_agg_log_sink ( window_start TIMESTAMP(3), product STRING, api STRING, version STRING, caller_uid STRING, region_id STRING, app_group STRING, gc_level STRING, http_code STRING, error_code STRING, qps BIGINT, rt_mean DOUBLE, slow1s_count BIGINT, http_2xx BIGINT, http_5xx BIGINT, http_503 BIGINT ) WITH ( connector sls, project ****, logstore machine_agg_log, -- 地域化部署写入本地域 Logstore endpoint cn-shanghai-intranet.log.aliyuncs.com -- 实际部署时替换为各地域 Endpoint ); -- 执行写入 INSERT INTO machine_agg_log_sink SELECT TUMBLE_START(l.ts, INTERVAL 10 SECOND), l.Product, l.Api, l.Version, l.CallerUid, g.region_id, g.app_group, u.gc_level, l.HttpCode, l.ErrorCode, COUNT(*) as qps, AVG(CAST(l.TotalUsedTime AS DOUBLE)), SUM(CASE WHEN l.TotalUsedTime 1000 THEN 1 ELSE 0 END), SUM(CASE WHEN l.HttpCode 200 AND l.HttpCode 300 THEN 1 ELSE 0 END), SUM(CASE WHEN l.HttpCode 500 THEN 1 ELSE 0 END), SUM(CASE WHEN l.HttpCode 503 THEN 1 ELSE 0 END) FROM openapi_log_source l LEFT JOIN gateway_cluster_dim FOR SYSTEM_TIME AS OF l.ts AS g ON l.LocalIp g.local_ip LEFT JOIN user_level_dim FOR SYSTEM_TIME AS OF l.ts AS u ON l.CallerUid u.uid GROUP BY TUMBLE(l.ts, INTERVAL 10 SECOND), l.Product, l.Api, l.Version, l.CallerUid, g.region_id, g.app_group, u.gc_level, l.HttpCode, l.ErrorCode;Job 2指标转换与跨域汇聚设计意图各地域独立部署 Job 2消费本地域的聚合日志machine_agg_log将数据转换为时序格式并跨域写入中心地域cn-shanghai的 MetricStore。1. 数据源消费本地聚合日志CREATE TABLE machine_agg_log_source ( window_start TIMESTAMP(3), product STRING, region_id STRING, -- ... 其他字段同 Sink 定义 WATERMARK FOR window_start AS window_start - INTERVAL 5 SECOND ) WITH ( connector sls, project ****, logstore machine_agg_log, -- 消费本地域 Logstore endpoint cn-shanghai-intranet.log.aliyuncs.com );2. 目标汇聚中心化 MetricStore SinkCREATE TABLE metricstore_sink ( __time_nano__ BIGINT, __name__ STRING, __labels__ STRING, __value__ DOUBLE ) WITH ( connector sls, project ****, -- 中心化 Project logstore openapi_metrics, -- 中心化 MetricStore endpoint cn-shanghai-intranet.log.aliyuncs.com -- 统一指向中心地域 Endpoint );3. 计算与汇聚逻辑Job 2 将聚合日志进一步汇总如按 Product 维度并格式化为 Metric 写入中心。示例计算产品维度 QPS 并汇聚INSERT INTO metricstore_sink SELECT UNIX_TIMESTAMP(CAST(TUMBLE_START(window_start, INTERVAL 1 MINUTE) AS STRING)) * 1000000000, namespace_product_gw_http_req, CONCAT(product, product, |region_id, region_id), -- 保留地域标签实现全球视角 CAST(SUM(qps) AS DOUBLE) FROM machine_agg_log_source GROUP BY TUMBLE(window_start, INTERVAL 1 MINUTE), product, region_id;架构优势带宽节省Job 1 将海量明细日志聚合为少量统计数据数据量减少 99%Job 2 仅跨域传输这些轻量级指标极大降低了专线带宽成本。隔离性各地域计算独立单地域故障不影响其他地域及中心监控的写入。作业配置与调优为了保障作业的稳定性和数据准确性生产环境中我们对 Checkpoint 和状态后端进行了专项调优。Checkpoint 配置与权衡提供了两种配置策略需根据业务对数据一致性与服务可用性的偏好进行选择策略 A标准与一致性优先推荐通用场景适用于绝大多数对数据准确性有要求的监控场景。SET execution.checkpointing.interval 60s; -- 1分钟 Checkpoint SET execution.checkpointing.mode EXACTLY_ONCE; -- 精确一次语义 SET execution.checkpointing.timeout 10min;策略 B高可用优化配置本案例生产实践在 OpenAPI 网关这种超高并发且对可用性极度敏感的场景下为了避免 Checkpoint 过于频繁导致的性能抖动同时又不希望完全牺牲数据可靠性我们采取了“弱一致性 低频打点 允许失败”的组合策略SET execution.checkpointing.interval 180s; -- 延长至 3 分钟减少频率 SET execution.checkpointing.mode AT_LEAST_ONCE; -- 降低 Barrier 对齐开销 SET execution.checkpointing.timeout 15min; -- 放宽超时时间 SET execution.checkpointing.max-concurrent-checkpoints 1; SET execution.checkpointing.tolerable-failed-checkpoints 10; -- 允许连续失败不因 CP 失败重启作业优化思路状态后端选择阿里云实时计算 Flink 版提供了企业级的 GeminiStateBackend相比开源 RocksDB它在存算分离架构下针对大状态场景进行了深度优化。针对本案例中“状态大GB 级、聚合 Key 多”的特点我们开启了KV 分离功能SET table.exec.state.backend gemini; -- 使用企业级 GeminiStateBackend SET state.backend.gemini.kv.separate.mode GLOBAL_ENABLE; -- 开启 KV 分离GeminiStateBackend 核心优势对比生产建议对于网关日志聚合这种 State Size 较大且吞吐要求极高的场景强烈推荐使用 GeminiStateBackend KV 分离。实测开启后作业在流量高峰期的 CPU 使用率下降了 20%且 Checkpoint 耗时更加稳定。可视化与告警监控效果展示通过两个 Flink 作业的聚合我们在 Grafana 中构建了多维度的 OpenAPI 监控大盘实现了从产品全局视图到具体错误码的深度下钻。自助查询与告警在 Grafana 中添加 SLS MetricStore 作为数据源后各云产品团队可以使用 PromQL 语法自助查询指标并配置告警规则常用查询示例# QPS 趋势 sum(namespace_product_gw_http_req) by (product) # 错误率环比当前 1 分钟与 1 小时前对比 ( sum(rate(namespace_product_gw_http_5xx[1m])) / sum(rate(namespace_product_gw_http_req[1m])) ) / ( sum(rate(namespace_product_gw_http_5xx[1m] offset 1h)) / sum(rate(namespace_product_gw_http_req[1m] offset 1h)) ) 2 # 平均延迟趋势 avg(namespace_product_gw_rt_mean) by (product)告警规则示例- alert: HighErrorRate expr: sum(namespace_product_gw_http_5xx) by (product) / sum(namespace_product_gw_http_req) by (product) 0.01 for: 2m labels: severity: warning annotations: summary: {{ $labels.product }} 错误率过高 description: 当前错误率: {{ $value | printf \%.2f\ }}%各云产品团队可以在 Grafana 中配置自己产品的监控大盘和告警规则实现自主运维。规模化验证该方案已在阿里云开放平台稳定运行以下是生产环境的核心指标上图展示了系统在生产环境中的核心运行指标。得益于 Flink 的分布式计算能力和 SLS 的高吞吐存储该方案成功支撑了阿里云开放平台全量 API 调用的实时监控覆盖 60 个全球地域、300 个云产品日均处理 200TB 压缩日志原始日志约 2PB单条日志约 4-5KB生成 50 万 时序指标。数据处理规模指标生成能力系统稳定性业务价值故障发现提速故障发现时间从分钟级缩短到秒级。运维效率提升300 云产品团队实现自助监控配置。在方案落地过程中我们发现原始日志包含大量冗余字段和嵌套结构而指标计算只需其中的核心字段。为此我们引入了Source 端谓词下推技术在数据进入 Flink 之前完成字段裁剪有效降低了网络传输量并加速了 Flink 计算。进阶优化Source 端谓词下推Predicate Pushdown谓词下推概念与 Connector 能力对比谓词下推Predicate Pushdown是数据库和大数据领域的经典优化策略核心思想是将过滤条件下推到数据源端执行减少数据传输量和计算开销。Flink 的下推能力取决于 Source Connector 的实现SLS 消费处理器一种 Source 端下推实现早期版本的 Flink SLS Connector 会默认全量拉取 SLS Logstore 的数据但实际上很多字段是不需要的。借助SLS 消费处理器我们实现了真正的 Source 端谓词下推——过滤和转换逻辑在 SLS 服务端执行Flink 只接收处理后的结果。技术优势SIMD 向量化引擎SPL 底层采用向量化执行引擎利用 CPU SIMD 指令集如 AVX2/AVX-512批量处理数据相比逐行处理性能提升数倍。同机房本地计算数据处理在 SLS 存储节点本地完成无需跨网络传输原始数据避免了网络 I/O 成为瓶颈。列式存储加速SLS 底层采用列式存储配合project列裁剪只读取必要的列数据大幅减少磁盘 I/O。零拷贝传输处理后的数据直接进入消费通道减少内存拷贝开销。计费提示普通消费按传输的压缩数据量计费。使用SPL消费处理器按扫描的原始未压缩数据量计费。详细定价及区别请参考 SLS 产品定价https://www.aliyun.com/price/product#/sls/detail。SPL 配置示例基于前文介绍的网关日志结构我们通过 SPL 消费处理器实现 Source 端过滤。对比传统的 Flink 侧过滤-- 传统方式Flink 侧过滤需拉取全量数据 SELECT * FROM openapi_log_source WHERE Domain ! popwarmup.aliyuncs.com AND JSON_VALUE(Ext5, $.logRegionId) NOT IN (cn-shanghai, cn-beijing)使用 SPL 消费处理器后过滤和转换在 SLS 服务端完成-- 1. 行过滤排除无效数据 * | where Domain ! popwarmup.aliyuncs.com -- 2. 嵌套 JSON 分层展开只对有效数据执行 | parse-json -prefixext5_ Ext5 | where ext5_logRegionId not in (cn-shanghai, cn-beijing, cn-hangzhou) | parse-json -prefixcallerInfo_ ext5_callerInfo | parse-json -prefixheaders_ ext5_headers -- 3. 正则提取 KV 格式字段 | parse-regexp RequestContent, [;]RegionId([^;]*) as request_regionId -- 4. 列裁剪只保留必要字段放在最后减少输出数据量 | project LocalIp, Product, Version, Api, Domain, ErrorCode, HttpCode, TotalUsedTime, AK, RegionId, ClientIp, callerInfo_callerType, callerInfo_callerUid, callerInfo_ownerId, ext5_regionId, ext5_appGroup, ext5_stage, request_regionId在 Flink SLS Source 中引用在 Flink SQL 中通过 processor 参数引用预先配置好的消费处理器CREATE TABLE openapi_log_source ( __time__ BIGINT, -- SPL 处理后的字段已展开嵌套 JSON、已裁剪冗余列 LocalIp STRING, Product STRING, Version STRING, Api STRING, Domain STRING, ErrorCode STRING, HttpCode STRING, TotalUsedTime BIGINT, AK STRING, RegionId STRING, ClientIp STRING, callerInfo_callerType STRING, -- 从 Ext5.callerInfo 展开 callerInfo_callerUid STRING, callerInfo_ownerId STRING, ext5_regionId STRING, -- 从 Ext5 展开 ext5_appGroup STRING, ext5_stage STRING, request_regionId STRING, -- 从 RequestContent 正则提取 ts AS TO_TIMESTAMP_LTZ(__time__ * 1000, 3), WATERMARK FOR ts AS ts - INTERVAL 5 SECOND ) WITH ( connector sls, project ****, logstore pop_rpc_trace_log, endpoint cn-shanghai-intranet.log.aliyuncs.com, processor openapi-processor -- 引用消费处理器实现过滤下推 );优化效果通过 SPL 源头预处理我们在以下几个维度取得了显著提升总结通过 Flink SLS 的云原生组合我们成功构建了阿里云 OpenAPI 网关的实时监控体系Flink 核心技术要点架构设计启示分层聚合缓解数据倾斜流量分布不均时先按物理节点局部聚合再按业务维度全局汇总。谓词下推降低成本将过滤逻辑下推到 Source 端如 SLS 消费处理器减少网络传输和计算资源消耗。选择企业级状态后端大状态场景选用 GeminiStateBackend KV 分离显著提升 I/O 效率与作业稳定性。本案例的技术方案可推广至微服务调用链监控、CDN 日志分析、物联网数据聚合等类似场景。开发者资源Flink SLS Connector 文档https://help.aliyun.com/zh/flink/realtime-flink/developer-reference/log-service-connectorSLS MetricStorehttps://help.aliyun.com/zh/sls/manage-a-metricstoreGrafana SLS 集成https://help.aliyun.com/zh/sls/send-time-series-data-from-log-service-to-grafanaSLS SPL 语法文档https://help.aliyun.com/zh/sls/spl-overview