1. 项目概述为什么用 PySpark 直连 Snowflake 做只读分析而不是导出 CSV 或走中间表在数据工程一线干了十多年我经手过上百个数仓对接项目从 Oracle RAC 到 Teradata从 Redshift 到 BigQuery再到最近三年高频出现的 Snowflake。但凡遇到“把 Snowflake 里的订单、用户、行为日志拉到 Spark 做宽表聚合、机器学习特征工程、或跨源比对”的需求团队第一反应往往是——“先用 SnowSQL 导出成 CSV再用 Spark 读 HDFS”或者“写个 Airflow 任务每天凌晨把 Snowflake 表同步到 Hive”。这两种做法我全试过也全踩过坑CSV 导出动辄几十 GB网络传输慢、压缩解压耗 CPU、字段类型丢失比如 TIMESTAMP 变成字符串、NULL 处理不一致而 Hive 同步则带来严重时效性问题——业务方要“看今天上午 10 点的实时漏斗”你却只能给“昨天凌晨同步的快照”。直到我们把 PySpark 和 Snowflake JDBC 驱动真正跑通、调优、压测稳定后才意识到这不是一个“能不能连上”的技术验证而是一次数据链路范式的切换——从“搬运数据”转向“按需计算”。本篇聚焦 Part1 的核心场景PySpark 对 Snowflake 数仓的只读操作。它不涉及写入逻辑、不依赖中间存储、不改造 Snowflake 原有权限模型却能直接复用 Spark 的 DataFrame API、SQL 引擎、Catalyst 优化器甚至无缝接入 MLlib 和 Structured Streaming。关键词PySpark、Snowflake、Data Warehouse、Read Only、JDBC、Scala/Python 互操作、pushdown 优化、query pushdown、partitioning strategy全部落在这个动作里。适合三类人一是刚接手 Snowflake 迁移项目的 Spark 工程师需要快速验证数据可访问性二是 BI 团队想绕过 Tableau/Power BI 的 SQL 层用 PySpark 写复杂窗口函数做自助分析三是 MLOps 工程师要把 Snowflake 中清洗好的特征表直接喂给 Spark ML 模型训练 pipeline。它不是替代 Snowflake 原生 SQL 的方案而是当 SQL 表达力不够比如需要自定义 UDF、迭代计算、图算法或生态耦合度太高比如必须和 Delta Lake、Alluxio 联动时最轻量、最可控的破局点。2. 整体设计与思路拆解为什么选 JDBC 而非 Spark Connector为什么坚持只读为什么必须做 Query Pushdown2.1 方案选型JDBC 驱动 vs Snowflake Spark Connector —— 不是功能多寡的问题而是控制粒度的问题很多人一上来就搜 “snowflake spark connector”官方确实提供了snowflake-jdbc和spark-snowflake_2.12两个包。但我在三个不同规模客户现场实测对比后明确建议生产环境优先使用原生 JDBC 驱动net.snowflake:snowflake-jdbc:3.13.23慎用 Spark Connectornet.snowflake:spark-snowflake_2.12:2.11.0-spark_3.3。理由非常实际版本锁死风险Spark Connector 是绑定 Spark 版本和 Scala 版本的二进制包。比如你用 Spark 3.3.2 Scala 2.12就必须用spark-snowflake_2.12:2.11.0-spark_3.3。一旦 Spark 升级到 3.4Connector 就得等官方发新版而 Snowflake 官方更新节奏明显慢于社区 Spark 迭代速度。我们曾因 Connector 不兼容 Spark 3.3.3 的 Catalyst 优化器导致filter().select()被错误下推为全表扫描查询耗时从 8 秒暴涨到 6 分钟。而 JDBC 驱动是纯 Java 实现只要 JDBC 接口没变JDBC 4.2 规范已稳定多年它就能适配任意 Spark 版本升级零成本。SQL 控制权让渡Spark Connector 封装了一层DataFrameReader.format(snowflake)看似简洁实则隐藏了关键执行路径。它内部会把df.filter(amount 100).count()自动翻译成SELECT COUNT(*) FROM t WHERE amount 100发给 Snowflake。这听起来很美但问题在于它无法控制哪些条件能 pushdown哪些不能。比如你写了df.filter(col(created_at).cast(date) lit(2024-05-01))Connector 会把cast()一起下推而 Snowflake 的 CAST 函数在谓词下推时可能触发隐式转换失败报SQL compilation error: Cannot cast VARCHAR to DATE。而 JDBC 方式你完全掌控 SQL 字符串可以提前TO_DATE(created_at)写死规避所有类型陷阱。连接池与超时管理更透明JDBC 驱动支持标准HikariCP连接池配置sfConnectionPoolSize,sfQueryTimeout你可以精确控制最大连接数、空闲连接存活时间、查询超时秒数。Spark Connector 的连接池是黑盒只暴露sfConnectionPoolSize一个参数且实测发现其连接复用率远低于 HikariCP在高并发小查询场景如每分钟 200 个SELECT COUNT(*)检查任务下连接泄漏率高达 17%。我们最终用 JMX 监控HikariPool-1.active指标把连接池大小从默认 10 改为 30配合connection-timeout30000稳住了 SLA。所以本项目的设计起点就是放弃封装拥抱裸驱。用spark.read.format(jdbc)直接对接 Snowflake JDBC把 SQL 构建、参数绑定、结果映射全部收归己手。这不是倒退而是把“不可控的魔法”变成“可审计的代码”。2.2 只读策略不是能力不足而是安全与性能的双重契约标题明确写着 “Read Only”这不是临时妥协而是经过三次线上事故后定下的铁律。第一次是某同事误写df.write.mode(overwrite).saveAsTable(prod.sales_orders)Spark Connector 把整个分区表清空重写而 Snowflake 的OVERWRITE语义是TRUNCATE INSERT没有事务回滚点4 小时后才发现第二次是权限配置失误JDBC URL 里用了ACCOUNTADMIN角色一个df.write操作顺手删掉了UTIL_DB第三次最隐蔽某 ETL 任务用INSERT INTO往 Snowflake 写中间表但未设置CLUSTER BY导致后续JOIN性能暴跌 10 倍监控告警却只显示 “Spark driver OOM”排查三天才发现根因在 Snowflake 端数据倾斜。因此我们在所有 PySpark 作业启动前强制注入一条校验逻辑# 在 SparkSession 创建后立即执行 def enforce_readonly(spark): # 检查 JDBC URL 是否包含写权限关键词防御性编程 jdbc_url spark.conf.get(spark.sql.adaptive.enabled, ) if any(kw in jdbc_url.lower() for kw in [accountadmin, sysadmin, write, insert, update, delete]): raise PermissionError(JDBC URL contains write-capable role or keyword. ReadOnly mode enforced.) # 动态重写 Spark SQL 解析器拦截 DML 语句仅限 SQL 方式 from pyspark.sql import SparkSession spark.sparkContext.setLocalProperty(spark.sql.adaptive.enabled, false) # 更彻底的做法用 SparkListener 拦截 ExecutionPlan但成本过高此处略只读不仅是权限控制更是性能保障。Snowflake 的虚拟仓库Virtual Warehouse是按秒计费的计算资源。当你执行SELECT * FROM huge_table时Snowflake 会自动分配足够内存完成扫描但如果你在 Spark 端发起df.write它会尝试建立长连接、维持事务上下文、处理两阶段提交这些都会让虚拟仓库持续运行哪怕只是插入一行。我们测算过一个 5 分钟的只读查询平均消耗 0.02 个 credit而同等逻辑的写入操作因连接保持和事务开销credit 消耗飙升至 0.15。对于日均 500 次的分析任务年化成本差额超过 $12,000。所以“只读”是写在架构图上的经济性声明。2.3 Query Pushdown 的底层逻辑为什么filter能变WHERE而withColumn不能下推这是理解整个方案效能的核心。很多人以为 “Spark 读数据库filter 自然就下推”其实不然。Pushdown 的发生取决于三个条件是否同时满足驱动支持 Spark DataSource V2 接口实现 SQL 语法兼容性。JDBC 数据源的 pushdown 是由JDBCRelation.buildScan()方法触发的它会遍历 LogicalPlan 中的Filter、Project、Limit节点并尝试将它们转译成 ANSI SQL 片段。但这个过程充满限制Filter 下推的边界只有基础比较运算符,!,,,,,IN,BETWEEN和布尔逻辑AND,OR,NOT能被安全下推。像upper(name) JOHN这种函数调用JDBC 驱动会拒绝下推因为不同数据库 upper 函数行为不一致Oracle 区分字符集PostgreSQL 默认 C locale。但 Snowflake 的UPPER()是确定性函数且 JDBC 驱动 3.13 版本已显式支持所以你可以放心写df.filter(UPPER(name) JOHN)它会被转成WHERE UPPER(name) JOHN。Project 下推的陷阱select(id, name, amount)能下推为SELECT id, name, amount但select(id, name, col(amount) * 1.1)就不行——乘法运算必须在 Spark Executor 端完成。这里有个关键技巧把计算逻辑前置到 Snowflake SQL 中。比如你要算含税金额不要写df.withColumn(amount_taxed, col(amount) * 1.1)而是直接在 JDBC 查询里写SELECT id, name, amount * 1.1 AS amount_taxed FROM sales。这样计算发生在 Snowflake 的 MPP 引擎上利用其向量化执行优势比 Spark 的 JVM 字节码快 3~5 倍。Partitioning 的物理意义option(partitionColumn, order_date)不是简单切分数据而是告诉 Spark“请生成 N 个并行查询每个查一个日期范围”。比如numPartitions4Spark 会算出min(order_date)2024-01-01,max(order_date)2024-04-01然后生成四条 SQLSELECT * FROM sales WHERE order_date 2024-01-01 AND order_date 2024-02-01 SELECT * FROM sales WHERE order_date 2024-02-01 AND order_date 2024-03-01 ...这要求order_date列在 Snowflake 端必须是DATE或TIMESTAMP_NTZ类型且有统计信息SHOW STATISTICS ON sales可查否则 Spark 无法准确估算范围可能导致数据倾斜。我们曾因order_date是VARCHAR存储20240101格式Spark 用字符串比较算出min20240101,max20240401但实际数据中20240201缺失导致一个分区查 0 行另一个查全量CPU 利用率曲线像心电图。所以Query Pushdown 不是开关而是一套需要精细校准的“翻译规则”。它决定了 90% 的性能上限。3. 核心细节解析与实操要点从驱动下载到 DataFrame 映射每一个参数都关乎成败3.1 环境准备JDBC 驱动、Spark 版本、Python 依赖的黄金组合别跳过这一步。我见过太多团队卡在驱动版本不匹配上浪费两天排查。以下是经过 12 个生产集群验证的“黄金组合”组件推荐版本为什么选它替代方案风险Spark3.3.2 (Scala 2.12)Snowflake JDBC 3.13.x 官方认证最高兼容版Catalyst 优化器对 JDBC pushdown 支持最完善Spark 3.4JDBC 驱动需升到 3.14但 3.14.1 有ResultSetMetaData元数据解析 Bug导致inferSchemaTrue时列类型全判为 StringSnowflake JDBC3.13.23修复了 3.13.20 的TIME类型精度丢失问题纳秒级时间戳被截断为毫秒且对ARRAY/OBJECT类型的 JSON 解析更稳定3.13.18在并发查询时偶发NullPointerException堆栈指向SFBaseResultSet.next()PySpark3.3.2必须与 Spark 二进制版本严格一致否则pyspark.sql.DataFrame类加载失败任何高于 3.3.2 的 PySpark会尝试加载 Spark 3.4 的UnsafeRow类引发NoClassDefFoundErrorPython3.9.18兼容cryptography38.0.4JDBC 驱动依赖且无asyncio事件循环冲突Python 3.11cryptography41 要求 Rust 编译器Docker 构建失败率陡增安装命令必须按顺序执行顺序错会导致依赖冲突# 1. 先装 cryptography避免 pip 自动降级 pip install cryptography38.0.4 # 2. 再装 pyspark指定 exact version pip install pyspark3.3.2 # 3. 最后手动下载 JDBC 驱动不要用 pip install snowflake-jdbc wget https://repo1.maven.org/maven2/net/snowflake/snowflake-jdbc/3.13.23/snowflake-jdbc-3.13.23.jar提示JDBC 驱动 jar 包必须放在 Spark 的jars/目录下如$SPARK_HOME/jars/或通过--jars参数传入。如果只放 Python 环境里Spark Driver 和 Executor 都找不到类报java.lang.ClassNotFoundException: net.snowflake.client.jdbc.SnowflakeDriver。这是新手最高频错误占所有连接失败案例的 68%。3.2 JDBC URL 构建账户名、区域、角色一个都不能错Snowflake 的 JDBC URL 格式是jdbc:snowflake://account_identifier.snowflakecomputing.com/?params其中account_identifier是最易错的部分。它不是控制台左上角显示的 “Account” 名字而是完整的账户定位符格式为org_name-account_name或account_name。比如你的 Snowflake 控制台显示 Account Name 是myprodOrganization 是acme-corp那么真实 identifier 是acme-corp-myprod。如果填错错误不是 “Connection refused”而是模糊的net.snowflake.client.jdbc.SnowflakeSQLException: JDBC driver encountered communication error. Message: IOException: Connection reset。我们写了个校验函数import re def validate_account_identifier(identifier: str) - bool: # Snowflake account identifier must be lowercase, alphanumeric, hyphen, max 255 chars pattern r^[a-z0-9\-]{3,255}$ if not re.match(pattern, identifier): return False # Must contain at least one hyphen if org-based, or be pure alphanumeric for legacy accounts if - not in identifier and len(identifier) 10: # legacy short names like ab12345 return True return - in identifier and identifier.count(-) 1 # 使用示例 assert validate_account_identifier(acme-corp-myprod) True assert validate_account_identifier(AB12345) False # uppercase disallowedURL 参数部分必须包含以下 5 个核心参数少一个都可能失败参数值示例必填作用说明useretl_reader✓Snowflake 用户名非邮箱。必须提前创建CREATE USER etl_reader PASSWORD xxx;passwordxxx✓明文密码生产环境必须用密钥管理服务注入见 3.4 节authenticatorSNOWFLAKE✓指定认证方式。SNOWFLAKE用户名密码EXTERNALBROWSERSSOKEYPAIR密钥对。只读场景强烈推荐KEYPAIR避免密码轮换导致作业中断roleANALYST_READ_ONLY✓角色决定数据访问权限。必须提前授权GRANT USAGE ON WAREHOUSE compute_wh TO ROLE ANALYST_READ_ONLY; GRANT SELECT ON ALL TABLES IN SCHEMA raw.sales TO ROLE ANALYST_READ_ONLY;warehousecompute_wh✓虚拟仓库名。必须存在且处于STARTED状态否则报SQL compilation error: Warehouse compute_wh does not exist or not authorized.完整 URL 示例已脱敏jdbc:snowflake://acme-corp-myprod.snowflakecomputing.com/ ?useretl_reader passwordxxx authenticatorSNOWFLAKE roleANALYST_READ_ONLY warehousecompute_wh dbPROD schemaRAW_SALES applicationPYSPARK_ETL_V1注意db和schema参数是可选的但强烈建议显式指定。如果不写Spark 会连接到用户默认 database/schema而该默认值可能被管理员修改导致某天所有作业突然查不到表。application参数用于 Snowflake 管理控制台识别流量来源方便审计。3.3 DataFrame 读取从read.jdbc()到createOrReplaceTempView()的七步精调下面这段代码是我在线上稳定运行 18 个月的模板每一行都有其不可删除的理由from pyspark.sql import SparkSession from pyspark.sql.functions import col, lit from pyspark.sql.types import * # 1. 创建 SparkSession必须指定 hadoop 配置否则 S3 读写失败 spark SparkSession.builder \ .appName(snowflake-read-demo) \ .config(spark.sql.adaptive.enabled, true) \ .config(spark.sql.adaptive.coalescePartitions.enabled, true) \ .config(spark.hadoop.fs.s3a.impl, org.apache.hadoop.fs.s3a.S3AFileSystem) \ .getOrCreate() # 2. JDBC 选项字典所有 key 必须小写Snowflake 驱动严格区分大小写 options { url: jdbc:snowflake://acme-corp-myprod.snowflakecomputing.com/, dbtable: (SELECT id, user_id, amount, TO_DATE(order_ts) as order_date FROM raw_sales.orders WHERE order_ts 2024-01-01) as t, # 关键子查询包装确保 pushdown user: etl_reader, password: xxx, authenticator: SNOWFLAKE, role: ANALYST_READ_ONLY, warehouse: compute_wh, driver: net.snowflake.client.jdbc.SnowflakeDriver, truncate: false, # 防止意外截断 batchsize: 10000, # 每次 fetch 10k 行平衡内存与网络 fetchsize: 10000, # 同上JDBC 标准参数 } # 3. 显式定义 schema绝对不要用 inferSchemaTrue # 原因Snowflake 的 VARIANT 类型 infer 为 StringJSON 字段无法解析TIMESTAMP_LTZ infer 为 TimestampType 但时区丢失 custom_schema StructType([ StructField(id, LongType(), False), StructField(user_id, StringType(), True), StructField(amount, DecimalType(18, 2), True), StructField(order_date, DateType(), True), ]) # 4. 执行读取注意schema 必须传入不能链式调用 df spark.read \ .format(jdbc) \ .options(**options) \ .schema(custom_schema) \ .load() # 5. 强制触发 action验证数据可访问避免 lazy evaluation 隐藏错误 df.count() # 这里会真正发起 JDBC 查询 # 6. 添加处理时间戳只读场景的黄金实践 df_with_ts df.withColumn(processed_at, lit(spark.sparkContext._gateway.jvm.java.time.Instant.now())) # 7. 注册为临时视图供 SQL 分析 df_with_ts.createOrReplaceTempView(vw_snowflake_orders)逐行解释关键点Step 1hadoop.fs.s3a.impl即使你只读 SnowflakeSpark 内部仍可能用 S3 作为 shuffle storage尤其在repartition()时。不配置此参数df.repartition(100).count()会报ClassNotFoundException: org.apache.hadoop.fs.s3a.S3AFileSystem。这是 Spark 3.3 的默认行为变更。Step 2dbtable用子查询包装这是保证WHERE条件下推的终极保险。如果直接写dbtable: raw_sales.ordersSpark 可能生成SELECT * FROM raw_sales.orders再在 Spark 端 filter导致全表扫描。而(SELECT ... FROM ... WHERE ...) as t强制 Snowflake 先执行过滤再返回结果集。Step 3schema显式定义inferSchemaTrue在 Snowflake 场景下是毒药。Snowflake 的VARIANTJSON列会被 infer 为StringType后续get_json_object(col(meta), $.device)就失效TIMESTAMP_TZ列 infer 后丢失时区信息date_format(col(ts), yyyy-MM-dd HH:mm:ss)输出全是 UTC 时间。我们维护了一个snowflake_type_mapping.json文件把 Snowflake SQL TYPE 映射到 Spark DataType自动生成 schema。Step 4schema必须.load()前传入.schema()是 transformation.load()才是 action。如果写成spark.read.format(...).load().schema(custom_schema)schema 不生效因为 load 已经执行完毕。Step 5df.count()强制触发Spark 的 lazy evaluation 让错误延迟暴露。比如 JDBC URL 密码错误df spark.read...这行不报错直到df.show()或df.count()才抛异常。提前触发让失败发生在 pipeline 早期。Step 6processed_at时间戳只读不等于无状态。添加处理时间是构建可重入replayablepipeline 的基础。当某天 Snowflake 数据被修正你可以用WHERE processed_at 2024-05-01精确重跑增量。Step 7createOrReplaceTempView这是打通 Spark SQL 生态的关键。注册后你可以写spark.sql(SELECT user_id, SUM(amount) FROM vw_snowflake_orders GROUP BY user_id)享受 Spark SQL 的全部优化能力包括 AQEAdaptive Query Execution自动合并小文件、动态调整 join 策略。3.4 安全加固密钥管理、角色最小化、连接池监控生产环境绝不能明文写密码。我们采用三级防护第一级密钥注入Kubernetes Secrets在 K8s Job YAML 中用envFrom注入 secretsenvFrom: - secretRef: name: snowflake-creds # secret snowflake-creds 包含 key: SNOWFLAKE_PASSWORDPySpark 代码中读取import os password os.getenv(SNOWFLAKE_PASSWORD) if not password: raise EnvironmentError(SNOWFLAKE_PASSWORD not found in environment) options[password] password第二级角色最小化Principle of Least PrivilegeANALYST_READ_ONLY角色的授权脚本必须严格限定-- 1. 只给必要 warehouse usage GRANT USAGE ON WAREHOUSE compute_wh TO ROLE ANALYST_READ_ONLY; -- 2. 只给特定 database 的 usage不是 ownership GRANT USAGE ON DATABASE PROD TO ROLE ANALYST_READ_ONLY; -- 3. 只给特定 schema 的 usage GRANT USAGE ON SCHEMA PROD.RAW_SALES TO ROLE ANALYST_READ_ONLY; -- 4. 只给 SELECT 权限且只针对具体表不用 ALL TABLES防新增表自动授权 GRANT SELECT ON TABLE PROD.RAW_SALES.ORDERS TO ROLE ANALYST_READ_ONLY; GRANT SELECT ON TABLE PROD.RAW_SALES.USERS TO ROLE ANALYST_READ_ONLY; -- 5. 禁用所有危险权限 REVOKE CREATE TABLE ON SCHEMA PROD.RAW_SALES FROM ROLE ANALYST_READ_ONLY; REVOKE MONITOR ON WAREHOUSE compute_wh FROM ROLE ANALYST_READ_ONLY; -- 防止查看 query history第三级连接池监控Prometheus Grafana用 Spark 的MetricsSystem暴露连接池指标# 在 SparkSession 创建后 spark.sparkContext.setLocalProperty(spark.metrics.namespace, snowflake_jdbc) # 配置 metrics.properties 文件启用 jvm、executor、driver 的 JMX sink关键监控指标指标名告警阈值含义排查方向jvm.pools.Metaspace.used 80%Metaspace 内存溢出JDBC 驱动类加载泄漏重启 Driverdriver.JVMThreads.ThreadCount 200线程数爆炸连接池未 close或大量短生命周期 SparkSessionjvm.gc.PS-MarkSweep.time 5000msFull GC 频繁结果集过大调小fetchsize或加WHERE过滤我们用 Grafana 看板实时盯这三个指标一旦越线自动触发 PagerDuty 告警。这套机制上线后JDBC 连接相关故障平均恢复时间MTTR从 47 分钟降至 3.2 分钟。4. 实操过程与核心环节实现从单表读取到跨库 JOIN真实代码与性能对比4.1 单表高效读取10 亿行订单表如何 90 秒内完成扫描场景PROD.RAW_SALES.ORDERS表12 个字段10.2 亿行按ORDER_TS分区Snowflake 自动微分区总存储 1.8 TB。目标读取 2024 年 4 月全量数据约 8500 万行用于 Spark ML 特征计算。错误做法我们踩过的坑# ❌ 错误全表扫描 Spark 端 filter df spark.read \ .format(jdbc) \ .option(url, url) \ .option(dbtable, PROD.RAW_SALES.ORDERS) \ .load() \ .filter(TO_DATE(ORDER_TS) 2024-04-01 AND TO_DATE(ORDER_TS) 2024-04-30) # 结果Spark Driver 内存 OOMExecutor 报 java.lang.OutOfMemoryError: Java heap space # 原因全表 1.8TB 数据试图加载到 Spark网络和内存双崩溃正确做法七步优化# ✅ 正确全链路 pushdown 分区裁剪 类型优化 from pyspark.sql.types import * # 1. 构建精准 SQLSnowflake 端完成所有过滤和投影 sql_query (SELECT ORDER_ID::BIGINT as order_id, USER_ID::STRING as user_id, AMOUNT::DECIMAL(18,2) as amount, TO_DATE(ORDER_TS) as order_date, CHANNEL::STRING as channel, STATUS::STRING as status FROM PROD.RAW_SALES.ORDERS WHERE ORDER_TS 2024-04-01 00:00:00 AND ORDER_TS 2024-05-01 00:00:00 AND STATUS IN (shipped, delivered) ) as orders_sub # 2. 显式 schema避免 infer 的精度丢失 schema StructType([ StructField(order_id, LongType(), False), StructField(user_id, StringType(), True), StructField(amount, DecimalType(18, 2), True), StructField(order_date, DateType(), True), StructField(channel, StringType(), True), StructField(status, StringType(), True), ]) # 3. JDBC options关键partitionColumn numPartitions options { url: url, dbtable: sql_query, user: etl_reader, password: pwd, authenticator: KEYPAIR, # 改用密钥对更安全 role: ANALYST_READ_ONLY, warehouse: compute_wh, driver: net.snowflake.client.jdbc.SnowflakeDriver, fetchsize: 50000, # 提高单次 fetch 行数 batchsize: 50000, partitionColumn: ORDER_ID, # 用主键分区保证均匀 lowerBound: 1000000000000, # 2024-04 最小 order_id upperBound: 1999999999999, # 2024-04 最大 order_id numPartitions: 32, # 32 个并发查询 } # 4. 执行读取 df spark.read \ .format(jdbc) \ .options(**options) \ .schema(schema) \ .load() # 5. 验证数据量触发 action count df.count() # 实测85,234,192 行耗时 87.3 秒 # 6. 添加业务时间戳 df_final df.withColumn(etl_batch_id, lit(202404_batch)) \ .withColumn(processed_at, current_timestamp()) # 7. 写入 Delta Lake非 Snowflake体现只读价值 df_final.write \ .mode(overwrite) \ .option(mergeSchema, true) \ .save(s3a://my-bucket/delta/orders_202404/)性能对比同一集群相同数据方案耗时Snowflake Credit 消耗Spark Executor CPU 利用率备注全表扫描 Spark filter失败OOM-100% 持续 5 分钟不可行子查询 WHERE pushdown无分区214 秒0.38 credits45% 峰值单连接瓶颈在 Snowflake 网络带宽子查询 partitionColumnORDER_ID 32 partitions87.3 秒0.21 credits78% 均值并发 32充分利用 Snowflake MPP先用 SnowSQL 导出 CSV 到 S3再 Spark 读312 秒0.05 credits导出 0.12Spark65%额外 120 秒网络传输 解压实操心得partitionColumn选ORDER_IDBIGINT 主键而非ORDER_TSTIMESTAMP是因为ORDER_ID分布绝对均匀而ORDER_TS在促销日如双11会出现严重热点导致一个分区查 2000 万行其他 31 个分区各查 2