一、数据仓库分层概述数据仓库分层是数据架构的核心设计合理的分层能降低复杂度逐层处理减少依赖提高复用性中间层可供多个应用使用便于数据溯源问题定位更简单隔离变化上游变化不影响下游二、分层架构1. 经典四层架构┌─────────────────────────────────────────────────┐ │ 数据源层ODS │ │ Operational Data Store - 原始数据层 │ └─────────────────────────────────────────────────┘ ↓ ┌─────────────────────────────────────────────────┐ │ 数据明细层DWD │ │ Data Warehouse Detail - 明细事实表 │ └─────────────────────────────────────────────────┘ ↓ ┌─────────────────────────────────────────────────┐ │ 数据汇总层DWS │ │ Data Warehouse Summary - 汇总宽表 │ └─────────────────────────────────────────────────┘ ↓ ┌─────────────────────────────────────────────────┐ │ 数据应用层ADS │ │ Application Data Store - 应用数据层 │ └─────────────────────────────────────────────────┘2. 各层职责层级全称职责特点ODS操作数据存储原始数据保留历史数据原样不做转换DWD数据明细层清洗、标准化、一致性原子指标明细事实DWS数据汇总层轻度汇总主题宽表通用汇总应用复用ADS数据应用层业务定制报表数据最终结果面向应用三、ODS层原始数据层1. 特点数据原样存储不做任何清洗转换保留历史变更如拉链表、CDC数据多数据源整合MySQL、Kafka、埋点日志等数据可回溯出现问题可重新计算2. 建表语句-- ODS层订单表CREATETABLEods_order(idBIGINTCOMMENT订单ID,order_no STRINGCOMMENT订单号,user_idBIGINTCOMMENT用户ID,shop_idBIGINTCOMMENT商家ID,order_statusINTCOMMENT订单状态,pay_amountDECIMAL(12,2)COMMENT支付金额,order_timeTIMESTAMPCOMMENT下单时间,pay_timeTIMESTAMPCOMMENT支付时间,source_type STRINGCOMMENT来源类型,_source_table STRINGCOMMENT源表名,_load_timeTIMESTAMPCOMMENT数据加载时间,_update_timeTIMESTAMPCOMMENT数据更新时间)COMMENTODS层订单表PARTITIONEDBY(dt STRING)STOREDASPARQUET LOCATION/warehouse/ods/order;-- ODS层埋点日志表CREATETABLEods_app_event(event_id STRINGCOMMENT事件ID,event_name STRINGCOMMENT事件名称,user_id STRINGCOMMENT用户ID,device_id STRINGCOMMENT设备ID,platform STRINGCOMMENT平台,app_version STRINGCOMMENTApp版本,event_timeTIMESTAMPCOMMENT事件时间,properties STRINGCOMMENT事件属性JSON,_load_timeTIMESTAMPCOMMENT加载时间)COMMENTODS层App事件表PARTITIONEDBY(dt STRING)STOREDASPARQUET LOCATION/warehouse/ods/app_event;3. 数据加载# 使用Spark加载ODS数据defload_ods_data():sparkSparkSession.builder.getOrCreate()# 从MySQL加载订单数据orders_dfspark.read \.format(jdbc)\.option(url,jdbc:mysql://mysql:3306/order_db)\.option(dbtable,orders)\.option(user,root)\.option(password,password)\.load()# 添加元数据字段orders_dforders_df \.withColumn(_source_table,lit(orders))\.withColumn(_load_time,current_timestamp())\.withColumn(_update_time,current_timestamp())# 写入ODS层orders_df.write \.format(parquet)\.mode(overwrite)\.partitionBy(dt)\.saveAsTable(ods_order)四、DWD层明细数据层1. 特点数据清洗去除脏数据、异常值标准化字段命名、单位统一一致性维度数据一致性处理拉链存储保留历史变更2. 建表语句-- DWD层订单明细事实表CREATETABLEdwd_order_detail(order_idBIGINTCOMMENT订单ID,order_no STRINGCOMMENT订单号,user_idBIGINTCOMMENT用户ID,user_name STRINGCOMMENT用户姓名,user_phone STRINGCOMMENT用户手机,shop_idBIGINTCOMMENT商家ID,shop_name STRINGCOMMENT商家名称,category_idBIGINTCOMMENT商家品类ID,category_name STRINGCOMMENT商家品类名称,order_status STRINGCOMMENT订单状态,order_status_name STRINGCOMMENT订单状态名称,order_amountDECIMAL(12,2)COMMENT订单金额,discount_amountDECIMAL(12,2)COMMENT优惠金额,pay_amountDECIMAL(12,2)COMMENT实付金额,pay_type STRINGCOMMENT支付方式,order_timeTIMESTAMPCOMMENT下单时间,pay_timeTIMESTAMPCOMMENT支付时间,cancel_timeTIMESTAMPCOMMENT取消时间,start_date STRINGCOMMENT有效期开始日期,end_date STRINGCOMMENT有效期结束日期,is_currentINTCOMMENT是否最新1是0否,_load_timeTIMESTAMPCOMMENT数据加载时间)COMMENTDWD层订单明细表STOREDASPARQUET LOCATION/warehouse/dwd/order_detail;-- DWD层用户维度表拉链CREATETABLEdwd_user_dimension(user_idBIGINTCOMMENT用户ID,user_name STRINGCOMMENT用户姓名,phone STRINGCOMMENT手机号,register_timeTIMESTAMPCOMMENT注册时间,user_level STRINGCOMMENT用户等级,start_date STRINGCOMMENT有效期开始日期,end_date STRINGCOMMENT有效期结束日期,is_currentINTCOMMENT是否当前,_load_timeTIMESTAMPCOMMENT加载时间)COMMENTDWD层用户维度表STOREDASPARQUET LOCATION/warehouse/dwd/user_dimension;3. 数据清洗defclean_order_data(df):# 去除重复dfdf.dropDuplicates([order_id])# 处理空值dfdf.fillna({discount_amount:0,pay_time:1970-01-01 00:00:00})# 数据标准化dfdf.withColumn(phone,regexp_replace(col(phone),[^0-9],))dfdf.withColumn(phone,concat(lit(86),col(phone)))# 异常值处理dfdf.filter((col(pay_amount)0)(col(pay_amount)1000000)(col(order_status).isin([1,2,3,4,5,6])))# 枚举值映射status_map{1:待支付,2:已支付,3:已完成,4:已取消,5:退款中,6:已退款}forstatus,nameinstatus_map.items():dfdf.withColumn(order_status_name,when(col(order_status)status,name).otherwise(col(order_status_name)))returndf4. 拉链处理defslowly_changing_dimension(df,pk,start_col,end_col,scd_cols): 拉链表处理 # 读取历史数据history_dfspark.read.parquet(f/warehouse/dwd/{table_name})# 找出变化的数据changed_dfdf.alias(new)\.join(history_df.alias(old),df[pk]history_df[pk],left)\.filter(# 新增记录history_df[pk].isNull()|# 变化记录|any(col(fnew.{c})!col(fold.{c})forcinscd_cols))\.select(new.*)# 关闭历史记录history_dfhistory_df.join(changed_df.select(pk),pk,leftanti).withColumn(end_col,current_date())# 新增记录设置有效期changed_dfchanged_df \.withColumn(start_col,current_date())\.withColumn(end_col,lit(9999-12-31))# 合并resulthistory_df.union(changed_df)returnresult五、DWS层数据汇总层1. 特点轻度汇总按主题按天/周/月汇总通用性可供多个应用复用宽表设计减少Join提高查询性能预计算减少实时计算压力2. 建表语句-- DWS层订单汇总宽表每日CREATETABLEdws_order_daily(shop_idBIGINTCOMMENT商家ID,shop_name STRINGCOMMENT商家名称,category_idBIGINTCOMMENT品类ID,category_name STRINGCOMMENT品类名称,order_date STRINGCOMMENT统计日期,order_countINTCOMMENT订单数,order_user_countINTCOMMENT下单用户数,order_amountDECIMAL(14,2)COMMENT订单金额,pay_countINTCOMMENT支付订单数,pay_user_countINTCOMMENT支付用户数,pay_amountDECIMAL(14,2)COMMENT支付金额,refund_countINTCOMMENT退款订单数,refund_amountDECIMAL(14,2)COMMENT退款金额,new_user_countINTCOMMENT新用户下单数,_load_timeTIMESTAMPCOMMENT加载时间)COMMENTDWS层订单汇总表PARTITIONEDBY(dt STRING)STOREDASPARQUET LOCATION/warehouse/dws/order_daily;-- DWS层用户行为宽表CREATETABLEdws_user_behavior_daily(user_idBIGINTCOMMENT用户ID,user_name STRINGCOMMENT用户姓名,user_level STRINGCOMMENT用户等级,register_date STRINGCOMMENT注册日期,behavior_date STRINGCOMMENT行为日期,pvINTCOMMENT浏览商品数,cart_countINTCOMMENT加购次数,order_countINTCOMMENT下单次数,order_amountDECIMAL(12,2)COMMENT下单金额,pay_countINTCOMMENT支付次数,pay_amountDECIMAL(12,2)COMMENT支付金额,_load_timeTIMESTAMPCOMMENT加载时间)COMMENTDWS层用户行为表PARTITIONEDBY(dt STRING)STOREDASPARQUET LOCATION/warehouse/dws/user_behavior_daily;3. 数据汇总defbuild_order_daily():sparkSparkSession.builder.getOrCreate()# 读取DWD层数据order_dfspark.read.parquet(/warehouse/dwd/order_detail)# 过滤当日数据today2024-01-15order_todayorder_df.filter(col(dt)today)# 汇总计算daily_statsorder_today.groupBy(shop_id,category_id,order_date)\.agg(count(*).alias(order_count),countDistinct(user_id).alias(order_user_count),sum(order_amount).alias(order_amount),count(when(col(pay_time).isNotNull(),1)).alias(pay_count),sum(when(col(pay_time).isNotNull(),col(pay_amount))).alias(pay_amount),count(when(col(order_status)4,1)).alias(refund_count),sum(when(col(order_status)4,col(pay_amount))).alias(refund_amount))# 写入DWS层daily_stats.write \.format(parquet)\.mode(overwrite)\.partitionBy(dt)\.saveAsTable(dws_order_daily)六、ADS层数据应用层1. 特点面向应用根据业务需求定制直接可查供报表、数据产品使用性能优先预计算好结果2. 报表宽表-- ADS层商家经营报表CREATETABLEads_shop_report(shop_idBIGINTCOMMENT商家ID,shop_name STRINGCOMMENT商家名称,category_name STRINGCOMMENT品类,region_name STRINGCOMMENT地区,report_date STRINGCOMMENT报表日期,order_countINTCOMMENT订单数,order_amountDECIMAL(14,2)COMMENT订单金额,pay_rateDECIMAL(6,4)COMMENT支付转化率,avg_order_amountDECIMAL(12,2)COMMENT客单价,refund_rateDECIMAL(6,4)COMMENT退款率,new_user_countINTCOMMENT新用户数,old_user_countINTCOMMENT老用户数,repeat_rateDECIMAL(6,4)COMMENT复购率,_load_timeTIMESTAMPCOMMENT加载时间)COMMENTADS层商家经营报表STOREDASPARQUET LOCATION/warehouse/ads/shop_report;3. 报表计算defbuild_shop_report():sparkSparkSession.builder.getOrCreate()# 读取DWS层数据order_dailyspark.read.parquet(/warehouse/dws/order_daily)shop_dfspark.read.parquet(/warehouse/dwd/shop_dimension)# 计算报表reportorder_daily.alias(o)\.join(shop_df.alias(s),shop_id,left)\.groupBy(o.shop_id,report_date)\.agg(sum(order_count).alias(order_count),sum(order_amount).alias(order_amount),avg(when(col(pay_count)0,col(pay_count)/col(order_count))).alias(pay_rate),avg(order_amount/col(order_count)).alias(avg_order_amount))report.write \.format(parquet)\.mode(overwrite)\.saveAsTable(ads_shop_report)七、总结数据仓库分层是数据架构的基础ODS原始数据层保留历史DWD明细数据层清洗标准化DWS汇总数据层主题宽表ADS应用数据层报表定制最佳实践合理设计分层减少数据冗余统一命名规范便于理解建立数据质量监控做好数据血缘追踪个人观点仅供参考