一、什么是拉链表?
定义
- 针对数据仓库设计中表存储数据的方式而定义的,设计表的时候添加 start_date和 end_date 两个字段,数据更新时,通过修改 end_date 来设置数据的有效时间。
- 所谓拉链,就是记录历史
- 记录一个事物从开始,一直到当前状态的所有变化的信息。
- 我们可以使用这张表拿到最新的当天的最新数据以及之前的历史数据。
- 既能满足反应数据的历史状态,又可以最大限度地节省存储空间
目的
- 解决SCD(Slowly Changing Dimensions),缓慢变化维。
收益
- 最大程度的节省存储
- 快速、高效的获取历史上任意一天的快照数据
适用场景
- 数据量很大且业务系统不会长期保留历史数据,需要在大数据平台保存
- 表字段会被update更新操作
- 需要查看某一个时间点或者时间段的历史快照信息
- 表中的记录变化的比例和频率很小
拉链表和流水表
- 流水表存放的是一个用户的变更记录,比如在一张流水表中,一天的数据中,会存放一个用户的每条修改记录,但是在拉链表中只有一条记录。
- 这是拉链表设计时需要注意的一个粒度问题。我们当然也可以设置的粒度更小一些,一般按天就足够。
二、缓慢变化维
2.1 谈到拉链表就不得不谈SCD(缓慢变化维问题)
- 缓慢变化维,简称SCD(Slowly Changing Dimensions)
- 一些维度表的数据不是静态的,而是会随着时间而缓慢地变化(这里的缓慢是相对事实表而言,事实表数据变化的速度比维度表快)
- 这种随着时间发生变化的维度称之为缓慢变化维
- 把处理维度表数据历史变化的问题,称为缓慢变化维问题,简称SCD问题
- 举例
- 这个用户的数据不是一直不变,而是有可能发生变化。例如:用户修改了出生日期 或者用户修改了住址。
2.2 缓慢变化维怎么解决?(粗看有五种)
2.2.1 保留初始值(不让改)
- 如出生日期的数据,始终按照用户第一次填写的数据为准
2.2.2 改写属性值
- 对其相应需要重写维度行中的旧值,以当前值替换。因此其始终反映最近的情况。
- 当一个维度值的数据源发生变化,并且不需要在维度表中保留变化历史时,通常用新数据来覆盖旧数据。这样的处理使属性所反映的中是最新的赋值。
- 这种方法有个前提,用户不关心这个数据的变化。
- 用户需要历史数据怎么办?
- 我要分析历史变化数据怎么办?
2.2.3 增加维度新行
-
- 那么怎么区分那条数据是最新的,或者怎么看历史呢?这里可以考虑拉链表的方式,在后面新增两列。。。。。。(具体的话我们接着往下看拉链~)
2.2.4 增加维度新列
- 用不同的字段来保存不同的值,就是在表中增加一个字段,这个字段用来保存变化后的当前值,而原来的值则被称为变化前的值。总的来说,这种方法通过添加字段来保存变化后的痕迹。
- 修改前:
- 修改后
2.2.5 使用历史表
- 另外建一个表来保存历史记录,这种方式就是将历史数据与当前数据完全分开来,在维度中只保存当前最新的数据。
- 优点
- 可以同时分析当前及前一次变化的属性值
- 缺点
- 只保留了最后一次变化信息
- 修改前:
- 修改后
- 修改后历史表:
- 在数据仓库的数据模型设计过程中,经常会遇到下面这种表的设计:
- 有一些表的数据量很大,比如一张用户表,大约10亿条记录,50个字段,这种表,即使使用ORC压缩,单张表的存储也会超过100G,在HDFS使用双备份或者三备份的话就更大一些。
- 表中的部分字段会被update更新操作,如用户联系方式,产品的描述信息,订单的状态等等。
- 需要查看某一个时间点或者时间段的历史快照信息,比如,查看某一个订单在历史某一个时间点的状态。
- 表中的记录变化的比例和频率不是很大,比如,总共有10亿的用户,每天新增和发生变化的有200万左右,变化的比例占的很小。
- 那么对于这种表我该如何设计呢?下面有几种方案可选:
- 方案一:每天只留最新的一份,比如我们每天抽取最新的一份全量数据到Hive中。
-
优点
节省空间,一些普通的使用也很方便,不用在选择表的时候加一个时间分区什么的。
缺点
没有历史数据,想翻翻旧账只能通过其它方式,比如从流水表里面抽。
方案二:每天保留一份全量的切片数据。
优点
每天一份全量的切片是一种比较稳妥的方案,而且历史数据也在。
缺点
就是存储空间占用量太大,如果对这边表每天都保留一份全量,那么每次全量中会保存很多不变的信息,对存储是极大的浪费,这点我感触还是很深的…
当然我们也可以做一些取舍,比如只保留近一个月的数据?但是,需求是无耻的,数据的生命周期不是我们能完全左右的。
方案三:使用拉链表。
优点
在空间上做了一个取舍,虽说不像方案一那样占用量那么小,但是它每日的增量可能只有方案二的千分之一甚至是万分之一。
既能获取最新的数据,也能添加筛选条件也获取历史的数据。
- 这里举例测试一下
- 2023-05-01 首次抽取
- 2023-05-02 修改 出生日期
- 2023-05-03 修改 地址
- 普通拉链表的话在后面增加两个字段,即start_date和end_date
- 这里我们讨论一个简单的优化
- 新增3个字段(start_date,end_date,dp)
- “start_date”用来备注该记录的起始时间
- 不代表该记录的创建时间,因此该字段为非业务时间。
- 同时拉链表会有个分区字段,分别为end_date和status。分区代表不同的数据维护方式。
- end_date:记录数据有效期的截止日期
- status:表示数据的当前状态
- 线上(active)
- 与线上库保持一致,同时假定线上库的记录一直有效,因此将其有效截止日期分区end_date值设置为一个未来时间终结点,例如’9999-12-31’
- 过期(expired)
- 如果数据量太大,可以加一个history归档
- step1:准备数仓的表,这里我们采用增量拉链的方式处理
CREATE TABLE tmp.temp_ods_user(
`id` int comment 'id',
`user_id` int comment '用户id',
`user_name` string comment '用户名称',
`date_of_birth` string comment '出生日期',
`address_of_birth` string comment '出生地址',
`update_time` string comment '更新时间'
)
comment '测试拉链表,用户信息表ods抽取层'
PARTITIONED BY ( `dt` string COMMENT '增量抽取日期')
stored as orc
;
CREATE TABLE tmp.temp_dw_user_chain(
`start_date` string comment '起始日期',
`change_code` string comment '字段MD5值',
`id` int comment 'id',
`user_id` int comment '用户id',
`user_name` string comment '用户名称',
`date_of_birth` string comment '出生日期',
`address_of_birth` string comment '出生地址',
`update_time` string comment '更新时间'
)
comment '测试拉链表,用户信息表dw明细层拉链处理'
PARTITIONED BY ( `status` string COMMENT '状态'
,`end_date` string COMMENT '截止日期'
)
stored as orc
;
- step2:2023-05-01 第一次抽取
-- 模拟第一次抽取 ods
insert overwrite table tmp.temp_ods_user partition (dt='2023-05-01')
select 9527 as id
,114 as user_id
,'张三' as user_name
,'1988-09-08' as date_of_birth
,'北京市朝阳区' as address_of_birth
,'2023-01-01 10:00:00' as update_time
;
-- 模拟第一次抽取 拉链
insert overwrite table tmp.temp_dw_user_chain partition (status='expired',end_date='2023-05-01')
select
case when h.change_code<>c.change_code then h.start_date else e.start_date end as start_date
,case when h.change_code<>c.change_code then h.change_code else e.change_code end as change_code
,case when h.change_code<>c.change_code then h.id else e.id end as id
,case when h.change_code<>c.change_code then h.user_id else e.user_id end as user_id
,case when h.change_code<>c.change_code then h.user_name else e.user_name end as user_name
,case when h.change_code<>c.change_code then h.date_of_birth else e.date_of_birth end as date_of_birth
,case when h.change_code<>c.change_code then h.address_of_birth else e.address_of_birth end as address_of_birth
,case when h.change_code<>c.change_code then h.update_time else e.update_time end as update_time
from(select *
from tmp.temp_dw_user_chain
where status = 'active'
and id is not null
) h -- 上次的active数据
full join(select `(dt|rank)?+.+`
from (select id,user_id,user_name,date_of_birth,address_of_birth,update_time,change_code
,row_number(id) as rank
from (select *,md5(concat_ws('_',id,user_id,user_name,date_of_birth,address_of_birth,update_time)) as change_code
from tmp.temp_ods_user
where dt = '2023-05-01'
and id is not null
distribute by id sort by id desc
) x
) t
where t.rank = 1
) c -- 抽取的增量数据
on h.id = c.id
full join(select *
from tmp.temp_dw_user_chain
where status='expired'
and end_date='2023-05-01'
) e -- 过期数据
on e.id = c.id
where h.id is not null and c.id is not null and (( h.change_code <> c.change_code ) or ( h.change_code = c.change_code and e.id is not null))
;
insert overwrite table tmp.temp_dw_user_chain partition (status='active',end_date='9999-12-31')
select if(h.id is null or (c.id is not null and (h.change_code <> c.change_code)),'2023-05-01',h.start_date) as start_date
,case
when h.id is null then
c.change_code
when h.id is not null and c.id is not null and h.change_code <> c.change_code then c.change_code
else
h.change_code
end as change_code
,case when c.id is not null then c.id else h.id end as id
,case when c.id is not null then c.user_id else h.user_id end as user_id
,case when c.id is not null then c.user_name else h.user_name end as user_name
,case when c.id is not null then c.date_of_birth else h.date_of_birth end as date_of_birth
,case when c.id is not null then c.address_of_birth else h.address_of_birth end as address_of_birth
,case when c.id is not null then c.update_time else h.update_time end as update_time
from(select *
from tmp.temp_dw_user_chain
where status = 'active'
and id is not null
) h -- 上次的active数据
full join(select `(dt|rank)?+.+`
from (select id,user_id,user_name,date_of_birth,address_of_birth,update_time,change_code
,row_number(id) as rank
from (select *
,md5(concat_ws('_',id,user_id,user_name,date_of_birth,address_of_birth,update_time)) as change_code
from tmp.temp_ods_user
where dt = '2023-05-01'
and id is not null
distribute by id sort by id desc
) x
) t
where t.rank = 1
) c -- 抽取的增量数据
on h.id = c.id
;
-- 拉链任务可能在一天内手工跑多次,当天第一次跑拉链任务时,EXPIRED分区中是没有数据的,此时会将被更新的旧数据写入EXPIRED分区中。当天第二次手工重跑拉链任务时,EXPIRED分区中已有数据,会直接将EXPIRED分区数据写入EXPIRED分区。
-- 拉链SQL中EXPIRED分区是必须使用的。拉链任务当天第二次重跑时ACTIVE分区数据已经更新,不是昨天的状态,不使用EXPIRED分区中已有的数据会清空EXPIRED分区数据。
- 查看dw拉链之后的数据
- step3:2023-05-02 第一次变更后抽取