一.OPPO 实时数仓的演进思路
1.1.OPPO 业务与数据规模
大家都知道OPPO是智能手机,却不知道OPPO和互联网、大数据有什么关系。下图给出了OPPO的业务和数据概况:
OPPO作为手机厂商,基于Android定制了自己的ColorOS系统,日活跃用户超过2亿。围绕ColorOS,OPPO搭建了很多互联网应用,如应用商店、浏览器、信息流等。
在运营这些互联网应用的过程中,OPPO积累了大量的数据。上图右侧是整体数据规模的演变:从2012年开始,每年增长2~3倍。截至目前,总数据量超过100PB,日数据量超过200TB。
为了支撑如此大的数据量,OPPO开发了一套完整的数据系统和服务,并逐渐形成了自己的数据中间站系统。
1.2.OPPO 数据中台
今年,每个人都在谈论数据中心。OPPO如何理解数据中心?我们将其分为四个级别:
底层是统一工具体系,覆盖‘接入-治理-开发-消费’全数据链路;基于工具体系构建数据仓库,分为‘原始层-明细层-汇总层-应用层’,这也是经典的数据仓库架构;然后是全球数据系统。什么是全球?就是打通公司所有的业务数据,形成统一的数据资产,比如ID-Mapping、用户标签等。最终,如果数据可以被业务使用,它需要场景驱动的数据产品和服务。以上是OPPO数据中心的整个系统,其中数据仓库处于非常基础和核心的地位。
1.3. 构建 OPPO 离线数仓
在过去的两三年里,我们的重点是线下仓库的建设。上图大致描述了整个构建过程:首先,数据源基本是手机、日志文件、DB数据库。我们构建了基于Apache NiFi的高可用、高通量接入系统,将数据放入HDFS形成原始层;然后基于Hive的每小时ETL和每日汇总Hive任务分别负责计算生成明细层和汇总层;
最后,应用层基于OPPO开发的数据产品,主要包括报表分析、用户画像和界面服务。此外,中间明细层还支持基于Presto的即席查询和自助取款。随着线下仓库盘点的逐步完善,业务对实时仓库盘点的需求越来越强烈。
1.5. 离线到实时的平滑迁移
无论是平台还是系统,都离不开上下两层:上层是API,是面向用户的编程抽象和接口;底层运行时是面向内核的执行引擎。我们希望从离线到实时的迁移是平滑的,这意味着什么?从API层面来说,数据仓库的抽象是表,编程接口是SQL UDF。离线数据仓库时代,用户已经习惯了这个API,迁移到实时数据仓库后最好保持一致。在运行时层面,计算引擎从Hive进化到Flink,存储引擎从HDFS进化到卡夫卡。
基于以上思路,我们只需要对之前提到的线下仓库管道进行改造,就可以得到实时仓库管道。
1.6. 构建 OPPO 实时数仓
从上图可以看出,整个流水线基本和线下仓库差不多,只是Hive换成了Flink,HDFS换成了卡夫卡。从整体流程来看,基本模型不变,还是由原层、明细层、汇总层、应用层的级联计算组成。
所以,这里的核心问题是,如何基于Flink构建这个管道。下面看看我们基于Flink SQL的一些工作。
二. 基于 Flink SQL 的扩展工作
2.1.Why Flink SQL
首先,为什么要用Flink SQL?下图是Flink框架的基本结构,底层是运行时。我们认为这个执行引擎的核心优势有四点:第一,低延迟,高吞吐量;第二,端到端
的 Exactly-once;第三,可容错的状态管理;第四,Window & Event time 的支持。基于 Runtime 抽象出 3 个层次的 API,SQL 处于最上层。Flink SQL API 有哪些优势呢?我们也从四个方面去看:第一,支持 ANSI SQL 的标准;第二,支持丰富的数据类型与内置函数,包括常见的算术运算与统计聚合;第三,可自定义 Source/Sink,基于此可以灵活地扩展上下游;第四,批流统一,同样的 SQL,既可以跑离线也可以跑实时。
那么,基于 Flink SQL API 如何编程呢?下面是一个简单的演示:
首先是定义与注册输入 / 输出表,这里创建了 2 张 Kakfa 的表,指定 kafka 版本是什么、对应哪个 topic;接下来是注册 UDF,篇幅原因这里没有列出 UDF 的定义;最后是才是执行真正的 SQL。可以看到,为了执行 SQL,需要做这么多的编码工作,这并不是我们希望暴露给用户的接口。
2.2. 基于 WEB 的开发 IDE
2.5.Flink SQL 对接外部数据源
搞清楚了 Flink SQL 注册库表的过程,给我们带来这样一个思路:如果外部元数据创建的表也能被转换成 TableFactory 可识别的 map,那么就能被无缝地注册到 TableEnvironment。基于这个思路,我们实现了 Flink SQL 与已有元数据中心的对接,大致过程参见下图:
通过元数据中心创建的表,都会将元数据信息存储到 MySQL,我们用一张表来记录 Table 的基本信息,然后另外三张表分别记录 Connector、Format、Schema 转换成 key-value 后的描述信息。之所以拆开成三张表,是为了能够能独立的更新这三种描述信息。接下来是定制实现的 ExternalCatalog,能够读取 MySQL 这四张表,并转换成 map 结构。
2.6. 实时表 - 维表关联
到目前为止,我们的平台已经具备了元数据管理与 SQL 作业管理的能力,但是要真正开放给用户使用,还有一点基本特性存在缺失。通过我们去构建数仓,星型模型是无法避免的。这里有一个比较简单的案例:中间的事实表记录了广告点击流,周边是关于用户、广告、产品、渠道的维度表。
假定我们有一个 SQL 分析,需要将点击流表与用户维表进行关联,这个目前在 Flink SQL 中应该怎么来实现?我们有两种实现方式,一个基于 UDF,一个基于 SQL 转换。
三.构建实时数仓的应用案例
下面分享几个典型的应用案例,都是在我们的平台上用 Flink SQL 来实现的。
3.1. 实时 ETL 拆分
这里是一个典型的实时 ETL 链路,从大表中拆分出各业务对应的小表:
OPPO 的最大数据来源是手机端埋点,从手机 APP 过来的数据有一个特点,所有的数据是通过统一的几个通道上报过来。因为不可能每一次业务有新的埋点,都要去升级客户端,去增加新的通道。比如我们有个 sdk_log 通道,所有 APP 应用的埋点都往这个通道上报数据,导致这个通道对应的原始层表巨大,一天几十个 TB。但实际上,每个业务只关心它自身的那部分数据,这就要求我们在原始层进行 ETL 拆分。
这个 SQL 逻辑比较简单,无非是根据某些业务字段做筛选,插入到不同的业务表中去。它的特点是,多行 SQL 最终合并成一个 SQL 提交给 Flink 执行。大家担心的是,包含了 4 个 SQL,会不会对同一份数据重复读取 4 次?其实,在 Flink 编译 SQL 的阶段是会做一些优化的,因为最终指向的是同一个 kafka topic,所以只会读取 1 次数据。
另外,同样的 Flink SQL,我们同时用于离线与实时数仓的 ETL 拆分,分别落入 HDFS 与 Kafka。Flink 中本身支持写入 HDFS 的 Sink,比如 RollingFileSink。
3.2. 实时指标统计
这里是一个典型的计算信息流 CTR 的这个案例,分别计算一定时间段内的曝光与点击次数,相除得到点击率导入 Mysql,然后通过我们内部的报表系统来可视化。这个 SQL 的特点是它用到了窗口 (Tumbling Window) 以及子查询。
3.3. 实时标签导入
这里是一个实时标签导入的案例,手机端实时感知到当前用户的经纬度,转换成具体 POI 后导入 ES,最终在标签系统上做用户定向。
这个 SQL 的特点是用了 AggregateFunction,在 5 分钟的窗口内,我们只关心用户最新一次上报的经纬度。AggregateFunction 是一种 UDF 类型,通常是用于聚合指标的统计,比如计算 sum 或者 average。在这个示例中,由于我们只关心最新的经纬度,所以每次都替换老的数据即可。
四. 未来工作的思考和展望
最后,给大家分享一下关于未来工作,我们的一些思考与规划,还不是太成熟,抛出来和大家探讨一下。
4.1. 端到端的实时流处理
什么是端到端?一端是采集到的原始数据,另一端是报表 / 标签 / 接口这些对数据的呈现与应用,连接两端的是中间实时流。当前我们基于 SQL 的实时流处理,源表是 Kafka,目标表也是 Kafka,统一经过 Kafka 后再导入到 Druid/ES/HBase。
这样设计的目的是提高整体流程的稳定性与可用性:首先,kafka 作为下游系统的缓冲,可以避免下游系统的异常影响实时流的计算(一个系统保持稳定,比起多个系统同时稳定,概率上更高点);其次,kafka 到 kafka 的实时流,exactly-once 语义是比较成熟的,一致性上有保证。
然后,上述的端到端其实是由割裂的三个步骤来完成的,每一步可能需要由不同角色人去负责处理:数据处理需要数据开发人员,数据导入需要引擎开发人员,数据资产化需要产品开发人员。
我们的平台能否把端到端给自动化起来,只需要一次 SQL 提交就能打通处理、导入、资产化这三步?在这个思路下,数据开发中看到的不再是 Kafka Table,而应该是面向场景的展示表 / 标签表 / 接口表。比如对于展示表,创建表的时候只要指定维度、指标等字段,平台会将实时流结果数据从 Kafka 自动导入 Druid,再在报表系统自动导入 Druid 数据源,甚至自动生成报表模板。
4.2. 实时流的血缘分析
关于血缘分析,做过离线数仓的朋友都很清楚它的重要性,它在数据治理中都起着不可或缺的关键作用。对于实时数仓来说也莫不如此。我们希望构建端到端的血缘关系,从采集系统的接入通道开始,到中间流经的实时表与实时作业,再到消费数据的产品,都能很清晰地展现出来。基于血缘关系的分析,我们才能评估数据的应用价值,核算数据的计算成本。
4.3. 离线 - 实时数仓一体化
最后提一个方向是离线实时数仓的一体化。我们认为短期内,实时数仓无法替代离线数仓,两者并存是新常态。在离线数仓时代,我们积累的工具体系,如何去适配实时数仓,如何实现离线与实时数仓的一体化管理?理论上来讲,它们的数据来源是一致的,上层抽象也都是 Table 与 SQL,但本质上也有不同的点,比如时间粒度以及计算模式。对于数据工具与产品来说,需要做哪些改造来实现完全的一体化,这也是我们在探索和思考的。