互联网技术 / 互联网资讯 · 2023年11月8日 0

Flink SQL 实战与 HBase 应用结合

本文将探讨 HBase 与 Flink SQL 的结合应用。HBase 是 Google 发布 Bigtable 论文的开源实现,是一种分布式列式存储的 NoSQL 数据库,构建在 HDFS 之上,特别适合大规模实时查询,因此在实时计算领域被广泛使用。HBase 支持实时写入,同时也可以利用 bUCkload 将离线任务生成 HFile 加载到 HBase 表中。而时下 Flink SQL 的流行程度也不言而喻,Flink SQL 为 HBase 提供了连接器,因此将 HBase 与 Flink SQL 结合起来进行实践显得尤为重要。

Flink SQL 实战与 HBase 应用结合

值得注意的是,本文假设读者已具备一定的 HBase 知识,不会详细介绍 HBase 的架构和原理,而是重点讨论 HBase 与 Flink 在实际应用场景中的结合。主要有两种使用场景:第一种是 HBase 作为维表,与 Flink Kafka 表进行时间表连接;第二种是将 Flink SQL 计算结果写入 HBase 表,以供其他用户查询。

HBase 环境准备 数据准备 HBase 作为维度表进行时间表连接的场景 Flink SQL 计算结果写入 HBase 的场景 总结

01 HBase 环境准备

由于缺乏测试用的 HBase 环境,为了避免影响线上 HBase 环境,我们自己构建了一个 HBase Docker 镜像(可以通过 docker pull guxinglei/Myhbase 拉取到本地),该镜像基于官方的 Ubuntu 镜像,上面安装了 HBase 2.2.0 版本和 JDK 1.8 版本。

启动容器时,需暴露 HBase Web UI 端口和内置 ZK 端口,以便通过网页查看信息及创建 Flink HBase 表时需要的 Zookeeper 连接信息。

docker run -it –network=host -p 2181:2181 -p 60011:60011 docker.io/guxinglei/Myhbase:latest bash

Flink SQL 实战与 HBase 应用结合

进入容器后,启动 HBase 集群,并启动 REST 服务器,以方便后续通过 REST API 读取 Flink SQL 写入 HBase 的数据。

# 启动 HBase 集群 bin/start-hbase.sh # 后台启动 REST 服务器 bin/hbase-daemon.sh start rest -p 8000

Flink SQL 实战与 HBase 应用结合

02 数据准备

由于 HBase 环境是临时搭建的单机服务,初始没有数据,需要写入一些数据以供后续示例使用。在 Flink SQL 实战系列的第二篇中,我们介绍了如何注册 Flink MySQL 表,我们可以将广告位表的数据抽取到 HBase 表中,作为维表进行时间表连接。因此,我们需要在 HBase 中创建一张表,并同时创建 Flink HBase 表,这两张表通过 Flink SQL 的 HBase 连接器关联起来。

在容器中启动 HBase Shell,创建一张名为 dim_hbase 的 HBase 表,建表语句如下:

# 在 HBase Shell 创建 HBase 表 hbase(MAIN):002:0> create ‘dim_hbase’,’cf’ CReated table dim_hbase Took 1.3120 seconds => Hbase::Table – dim_hbase

Flink SQL 实战与 HBase 应用结合

在 Flink 中创建 Flink HBase 表,建表语句如下:

# 注册 Flink HBase 表 DROP TABLE IF EXISTS flink_Rtdw.DEMO.hbase_dim_table; CREATE TABLE flink_Rtdw.DEMO.hbase_dim_table( Rowkey STRING, cf ROW < adspace_name STRING >, PRIMARY KEY (Rowkey) NOT ENFORCED ) WITH ( ‘connector’ = ‘hbase-1.4’, ‘table-name’ = ‘dim_hbase’, ‘sink.buffer-flush.max-rows’ = ‘1000’, ‘zookeeper.quorum’ = ‘localhost:2181’ );

Flink MySQL 表和 Flink HBase 表已经创建完成,可以编写将数据抽取到 HBase 的 SQL 作业,SQL 语句和作业状态如下:

# 抽取 MySQL 数据到 HBase 表 insert into hbase_dim_table select CAST (ID as VARCHAR), ROW(name) from MySQL_dim_table;

Flink SQL 实战与 HBase 应用结合

Flink SQL 实战与 HBase 应用结合

Flink SQL 实战与 HBase 应用结合

Flink SQL 实战与 HBase 应用结合

03 HBase 作为维表与 Kafka 进行时间表连接的场景

在 Flink SQL 的连接中,维度表的连接是必不可少的,例如订单金额连接汇率表、点击流连接广告位明细表等,使用场景非常广泛。作为分布式数据库,HBase 在作为维度表连接时比 MySQL 更具优势。在 Flink SQL 实战系列的第二篇中,我们注册了广告的点击流,将 Kafka topic 注册为 Flink Kafka 表,并介绍了在 Flink SQL 中使用时间表连接;本节将介绍如何将 HBase 作为维度表使用,前面小节中已经将数据抽取到 HBase 中,现在可以直接编写时间表连接的计算逻辑。

作为广告点击流的 Flink Kafka 表与作为广告位的 Flink HBase 表通过广告位 ID 进行时间表连接,输出广告位 ID 和广告位中文名称,SQL 连接逻辑如下:

select adsdw_dwd_Max_click_MoBIleapp.publisher_adspace_adspaceId as publisher_adspace_adspaceId, hbase_dim_table.cf.adspace_name as publisher_adspace_name from adsdw_dwd_Max_click_MoBIleapp left join hbase_dim_table FOR SYSTEM_TIME AS OF adsdw_dwd_Max_click_MoBIleapp.ProcTime on cast(adsdw_dwd_Max_click_MoBIleapp.publisher_adspace_adspaceId as string) = hbase_dim_table.Rowkey;

时间表连接作业提交至 Flink 集群后的状态及连接结果如下:

Flink SQL 实战与 HBase 应用结合

Flink SQL 实战与 HBase 应用结合

04 计算结果写入 HBase 的场景

在前面的小节中,HBase 作为维度表用于时间表连接是非常常见的场景,实际上 HBase 作为存储计算结果的场景同样非常普遍。由于 HBase 是分布式数据库,底层存储借助多副本机制的 HDFS,维护简便、扩容灵活、实时查询速度快,同时提供多种客户端接口,方便下游对存储在 HBase 中数据的使用。因此,本小节将介绍如何通过 Flink SQL 将计算结果写入 HBase,并通过 REST API 查询计算结果的场景。

进入容器后,在 HBase 中新建一张表,一个列族即可满足需求,建表语句如下:

# 注册 HBase sink 表 create ‘dwa_hbase_click_report’,’cf’

Flink SQL 实战与 HBase 应用结合

创建好 HBase 表后,我们需要在 Flink SQL 中创建一张 Flink HBase 表,此时需要明确列族下的字段。前面已经注册了作为点击流的 Flink Kafka 表,因此本节将计算点击流的 UV 和点击数,两个字段分别为 uv 和 click_count,建表语句如下:

# 注册 Flink HBase 表 DROP TABLE IF EXISTS flink_Rtdw.DEMO.dwa_hbase_click_report; CREATE TABLE flink_Rtdw.DEMO.dwa_hbase_click_report( Rowkey STRING, cf ROW < uv BIGINT, click_count BIGINT >, PRIMARY KEY (Rowkey) NOT ENFORCED ) WITH ( ‘connector’ = ‘hbase-1.4’, ‘table-name’ = ‘dwa_hbase_click_report’, ‘sink.buffer-flush.max-rows’ = ‘1000’, ‘zookeeper.quorum’ = ‘hostname:2181’ );

Flink SQL 实战与 HBase 应用结合

前面创建的点击流 Flink Kafka 表和用于存储计算结果的 HBase 表及 Flink HBase 表已准备就绪,我们将进行一个 1 分钟的翻转窗口计算 UV 和点击数,并将计算结果写入 HBase。对于了解 HBase 的人来说,Rowkey 的设计对 HBase Region 的分布有着重要影响,因此我们的 Rowkey 设计为使用 Flink SQL 内置的 REVERSE 函数对广告位 ID 进行反转,并与窗口起始时间进行连接。SQL 逻辑语句如下:

INSERT INTO dwa_hbase_click_report SELECT CONCAT(REVERSE(CAST(publisher_adspace_adspaceId AS STRING)) , ‘_’ , CAST((UNIX_TIMESTAMP(DATE_FORMAT(TUMBLE_START(ets, INTERVAL ‘1’ MINUTE), ‘YYYY-MM-dd HH:MM:SS’)) * 1000) AS STRING) ) as Rowkey, ROW(COUNT(DISTINCT audience_Mvid) , COUNT(audience_behavior_click_creative_impressionId)) as cf FROM adsdw_dwd_Max_click_MoBIleapp WHERE publisher_adspace_adspaceId IS NOT NULL AND audience_Mvid IS NOT NULL AND audience_behavior_click_creative_impressionId IS NOT NULL GROUP BY TUMBLE(ets, INTERVAL ‘1’ MINUTE), publisher_adspace_adspaceId;

Flink SQL 实战与 HBase 应用结合

提交 SQL 作业后的状态及结果检查如下:

Flink SQL 实战与 HBase 应用结合

Flink SQL 实战与 HBase 应用结合

上述 SQL 作业已成功将计算结果写入 HBase。对于线上 HBase 服务而言,许多同事可能没有 HBase 客户端的权限,因此无法通过 HBase Shell 读取数据;另外,作为线上报表服务显然不可能通过 HBase Shell 查询数据。因此,在实时报表场景中,数据开发工程师将数据写入 HBase,而前端工程师则通过 REST API 读取数据。我们之前已启动了 HBase REST 服务器进程,可以通过 REST 服务读取 HBase 中的数据。

我们先获取一条刚刚写入 HBase 的数据,如下所示:

Flink SQL 实战与 HBase 应用结合

接下来开始通过 REST API 查询 HBase 中的数据,第一步,执行以下语句以获取 scannerId;首先需要将要查询的 Rowkey 进行 base64 编码,后续需要对结果进行 base64 解码。

Rowkey base64 编码前:0122612_1606295280000 base64 编码之后:MDEyMjYxMl8xNjA2Mjk1MjgwMDAw

curl -vi -X PUT -H “Accept: text/xml” -H “Content-Type: text/xml” -d ”””” “http://hostname:8000/dwa_hbase_click_report/scanner”

Flink SQL 实战与 HBase 应用结合