简介 离线计算 离线计算一般指通过批处理 的方式计算已知的所有输入数据 ,输入数据不会产生变化 ,一般计算量级较大,计算时间较长 。
例如今天凌晨一点,把昨天累积的日志,计算出所需结果。最经典的就是 Hadoop 的 MapReduce 方式;
一般需要根据前一日的数据生成报表,虽然统计指标、报表繁多,但是对时效性不敏感。
离线计算的特点:
数据在计算前已经全部就位,不会发生变化; 数据量大且保存时间长; 在大量数据上进行复杂的批量运算; 方便的查看批量计算的结果。 实时计算实时计算一般是指通过流处理 方式计算当日的数据 都算是实时计算。
也会有一些准实时计算,利用离线框架通过批处理完成(小时、10 分钟级)的计算,一般为过渡产品,不能算是实时计算。
实时计算的特点:
局部计算 。每次计算以输入的每条数据,或者微批次、小窗口的数据范围进行计算,没法像离线数据一样能够基于当日全部数据进行统计排序分组;
开发成本较高 。相比离线的批处理 SQL,实时计算需要通过代码 ,往往需要对接多种数据容器完成,相对开发较为复杂;
资源成本较高 。实时计算虽然单位时间内数据量不如批处理,但是需要 24 小时不停进行运行,一旦计算资源投入就无法释放,所以每个任务都要合理分配资源。
时效性 。实时计算往往对时效性有一定的要求,所以要尽量优化整个计算链,减少计算过程中的中间环节。
可视化性 。因为数据是不断变化的,所以相对于严谨整齐的离线报表,实时数据更寄希望通过图形化的手段能够及时的观察到数据趋势。
离线数仓架构设计 实时数仓架构设计在这个架构中,kafka既要提供源数据,也要存储计算产生的中间数据(也就是DW层的数据)。
并且计算时要判断mysql中的数据时事实数据还是维度数据,如果是维度数据需要保存到redis中(也就是DIM层的数据),供之后使用。
在实时数仓中,计算其实并没有进行一个明确的分层,需要自行设计,比如:
数据采集到Kafka中,是最原始的数据,作为ODS层; Kafka中的数据进行一次分流,数据写回kafka,最为DWD层; 维度数据存储到redis中,作为DIM层; 计算好的数据写到OLAP中,作为DWS层。 ADS聚合的话就是直接使用OLAP进行分析出结果了。 数据来源数据一共有两大来源:
业务库 :比如mysql,可以通过MaxWell抽取进入kafka,也可以通过datax、sqoop等直接抽取进HDFS;埋点日志 :线上系统会打入各种日志,这些日志一般以文件的形式保存,我们可以选择用 Flume 定时抽取,也可以用用 Spark Streaming 或者 Storm 来实时接入,当然,Kafka 也会是一个关键的角色。 数据分层 介绍数据分层是使用一种有效的数据组织和管理方法来让我们的数据体系更有序 。数据分层并不能解决所有的数据问题,但是,数据分层却可以给我们带来如下的好处:
清晰数据结构 :每一个数据分层都有它的作用域和职责,在使用表的时候能更方便地定位和理解减少重复开发 :规范数据分层,开发一些通用的中间层数据,能够减少极大的重复计算统一数据口径 :通过数据分层,提供统一的数据出口,统一对外输出的数据口径复杂问题简单化 :将一个复杂的任务分解成多个步骤来完成,每一层解决特定的问题我们将数据模型分为三层:数据运营层( ODS )、数据仓库层(DW)和数据应用层(APP):
ODS层存放的是接入的原始数据经 ; DW层是存放我们要重点设计的数据仓库中间层数据 ; APP是面向业务定制的应用数据 。 数据运营层:ODS(Operational Data Store)“面向主题的”数据运营层,也叫ODS层,是最接近数据源中数据的一层,数据源中的数据,经过抽取、洗净、传输,也就说传说中的 ETL 之后,装入本层。本层的数据,总体上大多是按照源头业务系统的分类方式而分类的 。
一般来讲,为了考虑后续可能需要追溯数据问题,因此对于这一层就不建议做过多的数据清洗工作,原封不动地接入原始数据即可,至于数据的去噪、去重、异常值处理等过程可以放在后面的DWD层来做。
**问:**还是不太明白 ods 和 dwd 层的区别,有了 ods 层后感觉 dwd 没有什么用了。
**答:**嗯,我是这样理解的,站在一个理想的角度来讲,如果 ods 层的数据就非常规整,基本能满足我们绝大部分的需求,这当然是好的,这时候 dwd 层其实也没太大必要。 但是现实中接触的情况是 ods 层的数据很难保证质量,毕竟数据的来源多种多样,推送方也会有自己的推送逻辑,在这种情况下,我们就需要通过额外的一层 dwd 来屏蔽一些底层的差异。
**问:**我大概明白了,是不是说 dwd 主要是对 ods 层做一些数据清洗和规范化的操作,dws 主要是对 ods 层数据做一些轻度的汇总?
**答:**对的,可以大致这样理解。
数据仓库层:DW(Data Warehouse)数据仓库层是我们在做数据仓库时要核心设计的一层,在这里,从 ODS 层中获得的数据按照主题进行分流建立各种数据模型 。DW层又细分为 **DWD(Data Warehouse Detail)**层、DWM(Data WareHouse Middle)层和 DWS(Data WareHouse Servce)层。
数据明细层:DWD(Data Warehouse Detail)该层一般保持和ODS层一样的数据粒度,并且提供一定的数据质量保证。同时,为了提高数据明细层的易用性,该层会采用一些维度退化手法,将维度退化至事实表中,减少事实表和维表的关联。
另外,在该层也会做一部分的数据聚合,将相同主题的数据汇集到一张表中,提高数据的可用性。
数据中间层:DWM(Data WareHouse Middle)该层会在DWD层的数据基础上,对数据做轻度的聚合操作,生成一系列的中间表 ,提升公共指标的复用性,减少重复加工。
直观来讲,就是对通用的核心维度进行聚合操作,算出相应的统计指标。
数据生成方式:由明细层按照一定的业务需求生成轻度汇总表。明细层需要复杂清洗的数据和需要MR处理的数据也经过处理后接入到轻度汇总层。 日志存储方式:内表,parquet文件格式。 日志删除方式:长久存储。 表schema:一般按天创建分区,没有时间概念的按具体业务选择分区字段。 库与表命名。库名:dwb,表名:初步考虑格式为:dwb日期业务表名,待定。 旧数据更新方式:直接覆盖 数据服务层:DWS(Data WareHouse Servce)又称数据集市或宽表。按照业务划分 ,如流量、订单、用户等,生成字段比较多的宽表 ,用于提供后续的业务查询,OLAP分析,数据分发等。
一般来讲,该层的数据表会相对比较少,一张表会涵盖比较多的业务内容,由于其字段较多,因此一般也会称该层的表为宽表。
在实际计算中,如果直接从DWD或者ODS计算出宽表的统计指标,会存在计算量太大并且维度太少的问题,因此一般的做法是,在DWM层先计算出多个小的中间表,然后再拼接成一张DWS的宽表。由于宽和窄的界限不易界定,也可以去掉DWM这一层,只留DWS层,将所有的数据在放在DWS亦可。
数据生成方式:由轻度汇总层和明细层数据计算生成。 日志存储方式:使用impala内表,parquet文件格式。 日志删除方式:长久存储。 表schema:一般按天创建分区,没有时间概念的按具体业务选择分区字段。 库与表命名。库名:dm,表名:初步考虑格式为:dm日期业务表名,待定。 旧数据更新方式:直接覆盖 问答一: dws 和 dwd 的关系
**问:**dws 和dwd 是并行而不是先后顺序?
**答:**并行的,dw 层
**问:**那其实对于同一个数据,这两个过程是串行的?
**答:**dws 会做汇总,dwd 和 ods 的粒度相同,这两层之间也没有依赖的关系
**问:**对呀,那这样 dws 里面的汇总没有经过数据质量和完整度的处理,或者单独做了这种质量相关的处理,为什么不在 dwd 之上再做汇总呢?我的疑问其实就是,dws的轻度汇总数据结果,有没有做数据质量的处理?
**答:**ods 直接到 dws 就好,没必要过 dwd,我举个例子,你的浏览商品行为,我做一层轻度汇总,就直接放在 dws 了。但是你的资料表,要从好多表凑成一份,我们从四五份个人资料表中凑出来了一份完整的资料表放在了 dwd 中。然后在 app 层,我们要出一张画像表,包含用户资料和用户近一年的行为,我们就直接从dwd中拿资料, 然后再在 dws 的基础上做一层统计,就成一个app表了。当然,这不是绝对,dws 和 dwd 有没有依赖关系主要看有没有这种需求。
维表层:DIM(Dimension)维表层主要包含两部分数据:
高基数维度数据:一般是用户资料表、商品资料表类似的资料表 。数据量可能是千万级或者上亿级别;**低基数维度数据:一般是配置表,比如枚举值对应的中文含义,或者日期维表。**数据量可能是个位数或者几千几万。 数据应用层:APP(Application)应用层是根据业务需要,由前面几层数据统计而出的结果,可以直接提供查询展现,或导入至Mysql中使用,也就是主要提供给数据产品和数据分析使用的数据 。一般会存放在 ES、PostgreSql、Redis等系统中供线上系统使用,也可能会存在 Hive 或者 Druid 中供数据分析和数据挖掘使用。比如我们经常说的报表数据,一般就放在这里。
这里面也主要分两种类型:
每日定时任务型:比如我们典型的日计算任务,每天凌晨算前一天的数据,早上起来看报表。 这种任务经常使用 Hive、Spark 或者生撸 MR 程序来计算,最终结果写入 Hive、Hbase、Mysql、Es 或者 Redis 中。 实时数据:这部分主要是各种实时的系统使用,比如我们的实时推荐、实时用户画像,一般我们会用 Spark Streaming、Storm 或者 Flink 来计算,最后会落入 Es、Hbase 或者 Redis 中。 数据生成方式:由明细层、轻度汇总层,数据集市层生成,一般要求数据主要来源于集市层。 日志存储方式:使用impala内表,parquet文件格式。 日志删除方式:长久存储。 表schema:一般按天创建分区,没有时间概念的按具体业务选择分区字段。 库与表命名。库名:暂定apl,另外根据业务不同,不限定一定要一个库。(其实就叫app_)就好了 旧数据更新方式:直接覆盖。 **问:**存到 Redis、ES 中的数据算是 app层吗?
**答:**算是的,我个人的理解,app 层主要存放一些相对成熟的表,能供业务侧使用的。这些表可以在 Hive 中,也可以是从 Hive 导入 Redis 或者 ES 这种查询性能比较好的系统中。
举例说明举个栗子说明一下,如下图,可以认为是一个电商网站的数据体系设计。我们暂且只关注用户访问日志这一部分数据。
在ODS层中,由于各端的开发团队不同或者各种其它问题,用户的访问日志被分成了好几张表上报到了我们的ODS层。 为了方便大家的使用,我们在DWD层做了一张用户访问行为天表,在这里,我们将PC网页、H5、小程序和原生APP访问日志汇聚到一张表里面,统一字段名,提升数据质量,这样就有了一张可供大家方便使用的明细表了。 在DWM层,我们会从DWD层中选取业务关注的核心维度来做聚合操作,比如只保留人、商品、设备和页面区域维度。类似的,我们这样做了很多个DWM的中间表 然后在DWS层,我们将一个人在整个网站中的行为数据放到一张表中,这就是我们的宽表了,有了这张表,就可以快速满足大部分的通用型业务需求了。 最后,在APP应用层,根据需求从DWS层的一张或者多张表取出数据拼接成一张应用表即可。 Hive数仓的分层在原有天级延迟的离线数据处理任务基础上,开发小时级延迟的数据处理链路,将核心数据按小时同步到Hive数仓中,每小时调度一次DAG任务,实现小时级任务计算。任务DAG示意图如下所示:
优点:
离线和小时级任务各自独立 代码逻辑复用性高,减少开发成本 可以使用离线数据覆盖小时级数据,进行数据修复 缺点:
小时级数据的延迟性还是很高,已无法满足业务对数据时效性的要求 MapReduce不适合分钟级频次的任务调度,主要是MapReduce任务启动慢,另外会过高的频次会产生很多小文件,影响HDFS的稳定性,以及SQL on Hadoop系统的查询速度 批量数据处理每次运行对资源要求高,尤其是当凌晨Hadoop资源紧张时,任务经常无法得到调度,延迟严重 参考:👉 数据仓库之数仓分层及hive分层
事实表和维度表 事实表事实表(Fact Table)是指存储有事实记录的表,如系统日志、销售记录等;事实表的记录在不断地动态增长,所以它的体积通常远大于其他表。
事实表作为数据仓库建模的核心,需要根据业务过程来设计,包含了引用的维度和业务过程有关的度量。
作为度量业务过程的事实,一般为整型或浮点型的十进制数值,有可加性,半可加性和不可加性三种类型:
可加:最灵活最有用的事实是完全可加,可加性度量可以按照与事实表关联的任意维度汇总。比如消费总金额; 半可加:半可加度量可以对某些维度汇总,但不能对所有维度汇总。差额是常见的半可加事实,除了时间维度外,他们可以跨所有维度进行操作。(比如每天的余额加起来毫无意义); 不可加:一些度量是完全不可加的,例如:比率。对非可加事实,一种好的方法是,分解为可加的组件来实现聚集。 维度表维度表(Dimension Table)或维表,有时也称查找表(Lookup Table),是与事实表相对应的一种表;它保存了维度的属性值,可以跟事实表做关联;相当于将事实表上经常重复出现的属性抽取、规范出来用一张表进行管理 。常见的维度表有:日期表(存储与日期对应的周、月、季度等的属性)、地点表(包含国家、省/州、城市等属性)等。维度是维度建模的基础和灵魂,
使用维度表有诸多好处,具体如下:
缩小了事实表的大小。 便于维度的管理和维护,增加、删除和修改维度的属性,不必对事实表的大量记录进行改动。 维度表可以为多个事实表重用,以减少重复工作。 事实表和维度表关系事实表和维度表的选择是根据目标对象的需要来选择的,以user表为例,当目标分析为宏观的量级分析时,加工后uesr表通常作为维度表(雪花型);当目标分析为微观的某用户细节变化分析时,加工后的user表也可作为事实表。
软件安装 zk & kafka1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 docker pull wurstmeister/zookeeper docker pull wurstmeister/kafka docker run --privileged=true -d -e TZ="Asia/Shanghai" -p 2181:2181 -v /root/zk/data:/data --name zookeeper --restart always zookeeper docker run \ -p 9092:9092 \ --name kafka \ --privileged=true \ -e KAFKA_BROKER_ID=0 \ -e KAFKA_ZOOKEEPER_CONNECT=192.168.6.134:2181 \ -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.6.134:9092 \ -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 \ -d wurstmeister/kafka mkdir -p ~/kafka docker cp -a kafka:/opt/kafka/config/ /mydata/kafka/config docker cp -a kafka:/opt/kafka/logs/ /mydata/kafka/logs docker rm -f kafka docker run \ -p 9092:9092 \ --name kafka \ --privileged=true \ -e KAFKA_BROKER_ID=0 \ -e KAFKA_ZOOKEEPER_CONNECT=192.168.6.134:2181 \ -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.6.134:9092 \ -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 \ -v ~/kafka/config:/opt/kafka/config \ -v ~/kafka/logs:/opt/kafka/logs \ -d wurstmeister/kafka
测试:
1 2 3 4 5 6 7 8 9 10 11 docker exec -it kafka /bin/bash cd /opt/kafka_2.11-2.0.0/bin./kafka-console-producer.sh --broker-list localhost:9092 --topic sun docker exec -it zookeeper /bin/bash cd bin./zkCli.sh ls /brokers/topics/sun/partitions
redis1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 docker pull redis mkdir ~/redis cd ~/redismkdir data mkdir conf cd conf vim redis.conf protected-mode no appendonly yes requirepass root docker run --privileged=true -p 6379:6379 --name redis -v /root/redis/data:/data -v /root/redis/conf/redis.conf:/usr/local /etc/redis/redis.conf -d redis redis-server /usr/local /etc/redis/redis.conf --appendonly yes --requirepass "root"
Maxwell将安装包上传到节点/opt/softwar目录
解压到/opt/module/目录下
1 tar -zxvf maxwell-1.29.2.tar.gz -C /opt/module/
修改名称
1 mv maxwell-1.29.2/ maxwell
Mysql1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 docker pull mysql docker pull biarms/mysql:5.7.30-linux-arm64v8 mkdir ~/conf mkdir ~/data mkdir ~/logs vim ~/mysql/conf/my.cnf [mysqld] pid-file = /var/run/mysqld/mysqld.pid socket = /var/run/mysqld/mysqld.sock datadir = /var/lib/mysql secure-file-priv= NULL symbolic-links=0 character-set-server=utf8 [client] default-character-set=utf8 [mysql] default-character-set=utf8 !includedir /etc/mysql/conf.d/ docker run --privileged=true --restart=always -d -v ~/mysql/conf/my.cnf:/etc/mysql/my.cnf -v ~/mysql/logs:/logs -v ~/mysql/data/mysql:/var/lib/mysql -p 3306:3306 --name mysql -e MYSQL_ROOT_PASSWORD=root biarms/mysql:5.7.30-linux-arm64v8
es1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 docker run -d --name es -p 9200:9200 -p 9300:9300 -e "discovery.type=single-node" elasticsearch:8.3.2 docker cp elasticsearch:/usr/share/elasticsearch/config /opt/module vim elasticsearch.yml xpack.security.enabled: false vim jvm.options -Xms512m -Xmx512m docker run --privileged=true --name es -p 9200:9200 -p 9300:9300 \ -e "discovery.type=single-node" \ -v /opt/module/es/config:/usr/share/elasticsearch/config \ -v /opt/module/es/data:/usr/share/elasticsearch/data \ -v /opt/module/es/plugins:/usr/share/elasticsearch/plugins \ -d elasticsearch:8.3.2
kibana1 2 3 4 5 6 7 8 9 10 11 docker pull kibana:8.3.2 mkdir ~/kibana cd kibanavim kibana.yml server.host: "0.0.0.0" elasticsearch.hosts: ["http://127.0.0.1:9200" ] docker run --privileged=true --name kibana \ -v ~/kibana/config:/usr/share/kibana/config -p 5601:5601 \ -d kibana:8.3.2
这里我发现一个好坑的地方,一定要先启动es,再启动kibana,不然会有问题。
日志数据采集和分流 整体架构 模拟采集日志数据有如下工具:
1 application.yml gmall2020-mock-log-2021-11-29.jar logback.xml path.json
(1)启动一个kafka消费者来测试:
1 ./kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic ODS_BASE_LOG_1018
(2)根据实际需要修改 application.yml 的如下配置
(3)生成测试数据
1 java -jar gmall2020-mock-log-2021-11-29.jar
(4)kafka消费者查看数据:
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-X8E1qusQ-1663766816464)(https://github.com/1345414527/Spark-real-time-data-warehouse/raw/master/assets/image-20220912211617010.png )]
辅助脚本(1)模拟生成数据log.sh
1 2 3 4 5 6 7 8 # !/bin/bash # 根据传递的日期参数修改配置文件的日期 if [ $# -ge 1 ] then sed -i "/mock.date/c mock.date: $1" /root/spark实时数仓/数据生成器/日志数据生成器/application.yml fi cd /root/spark实时数仓/数据生成器/日志数据生成器; java -jar gmall2020-mock-log-2021-11-29.jar >/dev/null 2>&1 &
(2)Kafka脚本
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 # !/bin/bash if [ $# -lt 1 ] then echo "Usage: kf.sh {start|stop|kc [topic]|kp [topic] |list |delete [topic] |describe [topic]}" exit fi case $1 in start) for i in myServer01 hadoop103 hadoop104 do echo "====================> START $i KF <====================" ssh $i kafka-server-start.sh -daemon /opt/module/kafka_2.11-2.4.1/config/server.properties done ;; stop) for i in myServer01 hadoop103 hadoop104 do echo "====================> STOP $i KF <====================" ssh $i kafka-server-stop.sh done ;; kc) if [ $2 ] then kafka-console-consumer.sh --bootstrap-server myServer01:9092,hadoop103:9092,hadoop104:9092 --topic $2 else echo "Usage: kf.sh {start|stop|kc [topic]|kp [topic] |list |delete [topic] |describe [topic]}" fi ;; kp) if [ $2 ] then kafka-console-producer.sh --broker-list myServer01:9092,hadoop103:9092,hadoop104:9092 --topic $2 else echo "Usage: kf.sh {start|stop|kc [topic]|kp [topic] |list |delete [topic] |describe [topic]}" fi ;; list) kafka-topics.sh --list --bootstrap-server myServer01:9092,hadoop103:9092,hadoop104:9092 ;; describe) if [ $2 ] then kafka-topics.sh --describe --bootstrap-server myServer01:9092,hadoop103:9092,hadoop104:9092 --topic $2 else echo "Usage: kf.sh {start|stop|kc [topic]|kp [topic] |list |delete [topic] |describe [topic]}" fi ;; delete) if [ $2 ] then kafka-topics.sh --delete --bootstrap-server myServer01:9092,hadoop103:9092,hadoop104:9092 --topic $2 else echo "Usage: kf.sh {start|stop|kc [topic]|kp [topic] |list |delete [topic] |describe [topic]}" fi ;; *) echo "Usage: kf.sh {start|stop|kc [topic]|kp [topic] |list |delete [topic] |describe [topic]}" exit ;; esac
创建工程 依赖1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 <?xml version="1.0" encoding="UTF-8"?> <project xmlns ="http://maven.apache.org/POM/4.0.0" xmlns:xsi ="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation ="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd" > <parent > <artifactId > spark-gmall-realtime</artifactId > <groupId > top.codekiller.gmall</groupId > <version > 1.0-SNAPSHOT</version > </parent > <modelVersion > 4.0.0</modelVersion > <artifactId > sparkstream-realtime</artifactId > <properties > <spark.version > 3.0.0</spark.version > <scala.version > 2.12.11</scala.version > <kafka.version > 2.4.1</kafka.version > <project.build.sourceEncoding > UTF-8</project.build.sourceEncoding > <project.reporting.outputEncoding > UTF-8</project.reporting.outputEncoding > <java.version > 1.8</java.version > </properties > <dependencies > <dependency > <groupId > com.alibaba</groupId > <artifactId > fastjson</artifactId > <version > 1.2.62</version > </dependency > <dependency > <groupId > org.apache.spark</groupId > <artifactId > spark-core_2.12</artifactId > <version > ${spark.version}</version > </dependency > <dependency > <groupId > org.apache.spark</groupId > <artifactId > spark-streaming_2.12</artifactId > <version > ${spark.version}</version > </dependency > <dependency > <groupId > org.apache.kafka</groupId > <artifactId > kafka-clients</artifactId > <version > ${kafka.version}</version > </dependency > <dependency > <groupId > org.apache.spark</groupId > <artifactId > spark-streaming-kafka-0-10_2.12</artifactId > <version > ${spark.version}</version > </dependency > <dependency > <groupId > org.apache.spark</groupId > <artifactId > spark-sql_2.12</artifactId > <version > ${spark.version}</version > </dependency > <dependency > <groupId > com.fasterxml.jackson.core</groupId > <artifactId > jackson-core</artifactId > <version > 2.11.0</version > </dependency > <dependency > <groupId > org.apache.logging.log4j</groupId > <artifactId > log4j-to-slf4j</artifactId > <version > 2.11.0</version > </dependency > <dependency > <groupId > mysql</groupId > <artifactId > mysql-connector-java</artifactId > <version > 5.1.47</version > </dependency > <dependency > <groupId > redis.clients</groupId > <artifactId > jedis</artifactId > <version > 3.3.0</version > </dependency > <dependency > <groupId > org.elasticsearch</groupId > <artifactId > elasticsearch</artifactId > <version > 7.8.0</version > </dependency > <dependency > <groupId > org.elasticsearch.client</groupId > <artifactId > elasticsearch-rest-high-level-client</artifactId > <version > 7.8.0</version > </dependency > <dependency > <groupId > org.apache.httpcomponents</groupId > <artifactId > httpclient</artifactId > <version > 4.5.10</version > </dependency > </dependencies > <build > <plugins > <plugin > <groupId > net.alchim31.maven</groupId > <artifactId > scala-maven-plugin</artifactId > <version > 3.4.6</version > <executions > <execution > <goals > <goal > compile</goal > <goal > testCompile</goal > </goals > </execution > </executions > </plugin > <plugin > <groupId > org.apache.maven.plugins</groupId > <artifactId > maven-assembly-plugin</artifactId > <version > 3.0.0</version > <configuration > <descriptorRefs > <descriptorRef > jar-with-dependencies</descriptorRef > </descriptorRefs > </configuration > <executions > <execution > <id > make-assembly</id > <phase > package</phase > <goals > <goal > single</goal > </goals > </execution > </executions > </plugin > </plugins > </build > </project >
配置文件config.properties:
1 2 kafka.bootstrap-servers =192.168.6.134:9092
Log4j.properties:
1 2 3 4 5 log4j.appender.atguigu.MyConsole =org.apache.log4j.ConsoleAppender log4j.appender.atguigu.MyConsole.target =System.out log4j.appender.atguigu.MyConsole.layout =org.apache.log4j.PatternLayout log4j.appender.atguigu.MyConsole.layout.ConversionPattern =%d{yyyy-MM-dd HH:mm:ss} %10p (%c:%M) - %m%n log4j.rootLogger =error,atguigu.MyConsole
添加工具类 配置类1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 object MyPropsUtils { private val bundle: ResourceBundle = ResourceBundle .getBundle("config" ) def apply (propsKey: String ):String = { bundle.getString(propsKey) } }
1 2 3 4 5 6 7 8 object MyConfig { val KAFKA_BOOTSTRAP_SERVERS : String = "kafka.bootstrap-servers" }
kafka工具类1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 object MyKafkaUtils { private val consumerConfigs: mutable.Map [String ,Object ] = mutable.Map [String ,Object ]( ConsumerConfig .BOOTSTRAP_SERVERS_CONFIG -> MyPropsUtils (MyConfig .KAFKA_BOOTSTRAP_SERVERS ), ConsumerConfig .KEY_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer" , ConsumerConfig .VALUE_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer" , ConsumerConfig .ENABLE_AUTO_COMMIT_CONFIG -> "true" , ConsumerConfig .AUTO_OFFSET_RESET_CONFIG -> "latest" ) def getKafkaDStream (ssc : StreamingContext ,topic: String ,groupId : String ) ={ consumerConfigs.put(ConsumerConfig .GROUP_ID_CONFIG , groupId) val kafkaDStream: InputDStream [ConsumerRecord [String , String ]] = KafkaUtils .createDirectStream(ssc, LocationStrategies .PreferConsistent , ConsumerStrategies .Subscribe [String ,String ](Array (topic),consumerConfigs)) kafkaDStream } var producer : KafkaProducer [String ,String ] = createProducer() def createProducer ():KafkaProducer [String ,String ] = { val producerConfigs : util.HashMap [String , AnyRef ] = new util.HashMap [String ,AnyRef ] producerConfigs.put(ProducerConfig .BOOTSTRAP_SERVERS_CONFIG ,MyPropsUtils (MyConfig .KAFKA_BOOTSTRAP_SERVERS )) producerConfigs.put(ProducerConfig .KEY_SERIALIZER_CLASS_CONFIG , "org.apache.kafka.common.serialization.StringSerializer" ) producerConfigs.put(ProducerConfig .VALUE_SERIALIZER_CLASS_CONFIG , "org.apache.kafka.common.serialization.StringSerializer" ) producerConfigs.put(ProducerConfig .ACKS_CONFIG , "all" ) producerConfigs.put(ProducerConfig .ENABLE_IDEMPOTENCE_CONFIG , "true" ) val producer: KafkaProducer [String , String ] = new KafkaProducer [String ,String ](producerConfigs) producer } def send (topic:String ,key:String ,msg :String ): Unit ={ producer.send(new ProducerRecord [String ,String ](topic,key,msg)) } def send (topic : String , msg : String ):Unit = { producer.send(new ProducerRecord [String ,String ](topic , msg )) } }
日志数据消费分流 相关bean1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 case class PageActionLog ( mid :String , user_id:String , province_id:String , channel:String , is_new:String , model:String , operate_system:String , version_code:String , brand:String , page_id:String , last_page_id:String , page_item:String , page_item_type:String , during_time:Long , sourceType:String , action_id:String , action_item:String , action_item_type:String , action_ts :Long , ts:Long ) {}
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 case class PageDisplayLog ( mid :String , user_id:String , province_id:String , channel:String , is_new:String , model:String , operate_system:String , version_code:String , brand : String , page_id:String , last_page_id:String , page_item:String , page_item_type:String , during_time:Long , sourceType : String , display_type:String , display_item: String , display_item_type:String , display_order:String , display_pos_id:String , ts:Long ) {}
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 case class PageLog ( mid :String , user_id:String , province_id:String , channel:String , is_new:String , model:String , operate_system:String , version_code:String , brand : String , page_id:String , last_page_id:String , page_item:String , page_item_type:String , during_time:Long , sourceType : String , ts:Long ) {}
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 case class StartLog ( mid :String , user_id:String , province_id:String , channel:String , is_new:String , model:String , operate_system:String , version_code:String , brand : String , entry:String , open_ad_id:String , loading_time_ms:Long , open_ad_ms:Long , open_ad_skip_ms:Long , ts:Long ) {}
消费分流1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 object OdsBaseLogApp { def main (args :Array [String ]): Unit ={ val sparkConf :SparkConf = new SparkConf ().setAppName("ods_base_log_app" ).setMaster("local[3]" ) val ssc: StreamingContext = new StreamingContext (sparkConf,Seconds (5 )) val topicName : String = "ODS_BASE_LOG_1018" val groupId : String = "ODS_BASE_LOG_GROUP_1018" val kafkaDStream : InputDStream [ConsumerRecord [String ,String ]]= MyKafkaUtils .getKafkaDStream(ssc, topicName, groupId) val jsonObjDStream : DStream [JSONObject ] = kafkaDStream.map( consumerRecord =>{ val log: String = consumerRecord.value() val jsonObj: JSONObject = JSON .parseObject(log) jsonObj } ) val DWD_PAGE_LOG_TOPIC : String = "DWD_PAGE_LOG_TOPIC_1018" val DWD_PAGE_DISPLAY_TOPIC : String = "DWD_PAGE_DISPLAY_TOPIC_1018" val DWD_PAGE_ACTION_TOPIC : String = "DWD_PAGE_ACTION_TOPIC_1018" val DWD_START_LOG_TOPIC : String = "DWD_START_LOG_TOPIC_1018" val DWD_ERROR_LOG_TOPIC : String = "DWD_ERROR_LOG_TOPIC_1018" jsonObjDStream.foreachRDD( rdd => { rdd.foreach( jsonObj =>{ val errObj: JSONObject = jsonObj.getJSONObject("err" ) if (errObj != null ){ MyKafkaUtils .send(DWD_ERROR_LOG_TOPIC , jsonObj.toJSONString ) }else { val commonObj: JSONObject = jsonObj.getJSONObject("common" ) val ar: String = commonObj.getString("ar" ) val uid: String = commonObj.getString("uid" ) val os: String = commonObj.getString("os" ) val ch: String = commonObj.getString("ch" ) val isNew: String = commonObj.getString("is_new" ) val md: String = commonObj.getString("md" ) val mid: String = commonObj.getString("mid" ) val vc: String = commonObj.getString("vc" ) val ba: String = commonObj.getString("ba" ) val ts: Long = jsonObj.getLong("ts" ) val pageObj: JSONObject = jsonObj.getJSONObject("page" ) if (pageObj != null ){ val pageId: String = pageObj.getString("page_id" ) val pageItem: String = pageObj.getString("item" ) val pageItemType: String = pageObj.getString("item_type" ) val duringTime: Long = pageObj.getLong("during_time" ) val lastPageId: String = pageObj.getString("last_page_id" ) val sourceType: String = pageObj.getString("source_type" ) var pageLog = PageLog (mid,uid,ar,ch,isNew,md,os,vc,ba,pageId,lastPageId,pageItem,pageItemType,duringTime,sourceType,ts) MyKafkaUtils .send(DWD_PAGE_LOG_TOPIC , JSON .toJSONString(pageLog , new SerializeConfig (true ))) val displaysJsonArr: JSONArray = jsonObj.getJSONArray("displays" ) if (displaysJsonArr != null && displaysJsonArr.size() > 0 ){ for (i <- 0 until displaysJsonArr.size()){ val displayObj: JSONObject = displaysJsonArr.getJSONObject(i) val displayType: String = displayObj.getString("display_type" ) val displayItem: String = displayObj.getString("item" ) val displayItemType: String = displayObj.getString("item_type" ) val posId: String = displayObj.getString("pos_id" ) val order: String = displayObj.getString("order" ) val pageDisplayLog = PageDisplayLog (mid,uid,ar,ch,isNew,md,os,vc,ba,pageId,lastPageId,pageItem,pageItemType,duringTime,sourceType,displayType,displayItem,displayItemType,order,posId,ts) MyKafkaUtils .send(DWD_PAGE_DISPLAY_TOPIC , JSON .toJSONString(pageDisplayLog , new SerializeConfig (true ))) } } val actionJsonArr: JSONArray = jsonObj.getJSONArray("actions" ) if (actionJsonArr != null && actionJsonArr.size() > 0 ){ for (i <- 0 until actionJsonArr.size()){ val actionObj: JSONObject = actionJsonArr.getJSONObject(i) val actionId: String = actionObj.getString("action_id" ) val actionItem: String = actionObj.getString("item" ) val actionItemType: String = actionObj.getString("item_type" ) val actionTs: Long = actionObj.getLong("ts" ) var pageActionLog = PageActionLog (mid,uid,ar,ch,isNew,md,os,vc,ba,pageId,lastPageId,pageItem,pageItemType,duringTime,sourceType,actionId,actionItem,actionItemType,actionTs,ts) MyKafkaUtils .send(DWD_PAGE_ACTION_TOPIC , JSON .toJSONString(pageActionLog , new SerializeConfig (true ))) } } } val startJsonObj: JSONObject = jsonObj.getJSONObject("start" ) if (startJsonObj != null ){ val entry: String = startJsonObj.getString("entry" ) val loadingTime: Long = startJsonObj.getLong("loading_time" ) val openAdId: String = startJsonObj.getString("open_ad_id" ) val openAdMs: Long = startJsonObj.getLong("open_ad_ms" ) val openAdSkipMs: Long = startJsonObj.getLong("open_ad_skip_ms" ) var startLog = StartLog (mid,uid,ar,ch,isNew,md,os,vc,ba,entry,openAdId,loadingTime,openAdMs,openAdSkipMs,ts) MyKafkaUtils .send(DWD_START_LOG_TOPIC , JSON .toJSONString(startLog ,new SerializeConfig (true ))) } } } ) } ) ssc.start() ssc.awaitTermination() } }
测试:
启动五个消费者:
1 2 3 4 5 kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic DWD_PAGE_LOG_TOPIC_1018 kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic DWD_PAGE_DISPLAY_TOPIC_1018 kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic DWD_PAGE_ACTION_TOPIC_1018 kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic DWD_START_LOG_TOPIC_1018 kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic DWD_ERROR_LOG_TOPIC_1018
然后运行log.sh脚本产生日志数据,最后启动OdsBaseLogApp.main()函数。
就可以看到五个消费者消费的数据了。
优化:精确一次消费 相关语义 至少一次消费(at least once):主要是保证数据不会丢失,但有可能存在数据重复问题; 最多一次消费 (at most once):主要是保证数据不会重复,但有可能存在数据丢失问题; 精确一次消费(Exactly-once):指消息一定会被处理且只会被处理一次。不多不少就一次处理。如果达不到精确一次消费,可能会达到另外两种情况。 消费问题首先我们知道,kafka内部其实维护了消费者对于某个topic的偏移量,但是这并不能满足我们的开发过程中遇到的问题:
(1)漏消费(丢失数据)
比如实时计算任务进行计算,到数据结果存盘之前,进程崩溃,假设在进程崩溃前 kafka调整了偏移量,那么 kafka 就会认为数据已经被处理过,即使进程重启,kafka 也会从新的偏移量开始,所以之前没有保存的数据就被丢失掉了。
(2)重复消费(重复计算)
如果数据计算结果已经存盘了,在 kafka 调整偏移量之前,进程崩溃,那么 kafka 会认为数据没有被消费,进程重启,会重新从旧的偏移量开始,那么数据就会被 2 次消费,又会被存盘,数据就被存了 2 遍,造成数据重复。
(3)如果同时解决了数据丢失和数据重复的问题,那么就实现了精确一次消费的语义了。
目前 Kafka 默认每 5 秒钟做一次自动提交偏移量,这样并不能保证精准一次消费。
1 2 enable.auto.commit 的默认值是 true ;就是默认采用自动提交的机制。 auto.commit.interval.ms 的默认值是 5000,单位是毫秒
策略一:事务(1)策略: 利用关系型数据库的事务进行处理
出现丢失或者重复的问题,核心就是偏移量的提交与数据的保存,不是原子性的。如果能做成要么数据保存和偏移量都成功,要么两个失败,那么就不会出现丢失或者重复了。
这样的话可以把存数据和修改偏移量放到一个事务里。这样就做到前面的成功,如果后面做失败了,就回滚前面那么就达成了原子性,这种情况先存数据还是先修改偏移量没影响。
(2)好处
事务方式能够保证精准一次性消费
(3)问题与限制
数据必须都要放在一个关系型数据库中,无法使用其他功能强大的 nosql 数据库; 事务本身性能不好; 如果保存的数据量较大一个数据库节点不够,多个节点的话,还要考虑分布式事务的问题。分布式事务会带来管理的复杂性,一般企业不选择使用,有的企业会把分布式事务变成本地事务,例如把 Executor 上的数据通过 rdd.collect 算子提取到 Driver 端,由Driver 端统一写入数据库,这样会将分布式事务变成本地事务的单线程操作,降低了写入的吞吐量。 (4)使用场景
数据足够少(通常经过聚合后的数据量都比较小,明细数据一般数据量都比较大),并且支持事务的数据库。
策略二:后置提交offset+幂等方案(1)策略:手动提交偏移量 + 幂等性处理
我们知道如果能够同时解决数据丢失和数据重复问题,就等于做到了精确一次消费。那就各个击破
首先解决数据丢失问题,办法就是要等数据保存成功后再提交偏移量,所以就必须手工来控制偏移量的提交时机。
但是如果数据保存了,没等偏移量提交进程挂了,数据会被重复消费。怎么办?那就要把数据的保存做成幂等性保存。即同一批数据反复保存多次,数据不会翻倍,保存一次和保存一百次的效果是一样的。如果能做到这个,就达到了幂等性保存,就不用担心数据会重复了。
(2)难点
话虽如此,在实际的开发中手动提交偏移量其实不难,难的是幂等性的保存,有的时候并不一定能保证,这个需要看使用的数据库,如果数据库本身不支持幂等性操作,那只能优先保证的数据不丢失,数据重复难以避免,即只保证了至少一次消费的语义。
一般有主键的数据库都支持幂等性操作 upsert。
(3)使用场景
处理数据较多,或者数据保存在不支持事务的数据库上。
手动提交偏移流程 (1)偏移量保存在哪
kafka 0.9版本以后consumer的偏移量是保存在kafka的**__consumer_offsets主题中。但是如果用这种方式管理偏移量,有一个限制就是在提交偏移量时,数据流的元素结构不能发生转变,即 提交偏移量时数据流,必须是 InputDStream[ConsumerRecord[String, String]]**这种结构。但是在实际计算中,数据难免发生转变,或聚合,或关联,一旦发生转变,就无法在利用以下语句进行偏移量的提交:
1 xxDstream.asInstanceOf[CanCommitOffsets ].commitAsync(offsetRanges)
因为 offset 的存储于 HasOffsetRanges ,只有 kafkaRDD 继承了他,所以假如我们对KafkaRDD 进行了转化之后,其它 RDD 没有继承 HasOffsetRanges,所以就无法再获取 offset了。
所以实际生产中通常会利用 ZooKeeper,Redis,Mysql 等工具手动对偏移量进行保存 。
(2)流程
代码实现 (1)在 config.properties 中添加 redis 的连接配置
1 2 3 4 5 6 7 kafka.bootstrap-servers =192.168.6.134:9092 redis.host =192.168.6.134 redis.port =6379 redis.pwd = root
(2)添加 Redis 工具类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 object MyRedisUtils { var jedisPool : JedisPool = null def getJedisFromPool (): Jedis = { if (jedisPool == null ){ val jedisPoolConfig = new JedisPoolConfig () jedisPoolConfig.setMaxTotal(100 ) jedisPoolConfig.setMaxIdle(20 ) jedisPoolConfig.setMinIdle(20 ) jedisPoolConfig.setBlockWhenExhausted(true ) jedisPoolConfig.setMaxWaitMillis(5000 ) jedisPoolConfig.setTestOnBorrow(true ) val host:String = MyPropsUtils (MyConfig .REDIS_HOST ) val port:String = MyPropsUtils (MyConfig .REDIS_PORT ) val pwd:String = MyPropsUtils (MyConfig .REDIS_PWD ) jedisPool = new JedisPool (jedisPoolConfig,host,port.toInt,5000 ,pwd) } jedisPool.getResource } }
(3)添加偏移量工具类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 object MyOffsetUtils { def saveOffset (topic :String ,groupId :String ,offsetRanges: Array [OffsetRange ]): Unit ={ if (offsetRanges!=null && offsetRanges.length>0 ){ val offsets : util.HashMap [String ,String ] = new util.HashMap [String ,String ]() for (offsetRange <- offsetRanges){ val partition:Int = offsetRange.partition val endoffset:Long = offsetRange.untilOffset offsets.put(partition.toString,endoffset.toString) } println("提交offset: " + offsets) val jedis:Jedis = MyRedisUtils .getJedisFromPool() val redisKey:String = s"offsets:$topic :$groupId " jedis.hset(redisKey,offsets) jedis.close() } } def readOffset (topic: String , groupId : String ): Map [TopicPartition ,Long ] ={ val jedis: Jedis = MyRedisUtils .getJedisFromPool() val redisKey : String = s"offsets:$topic :$groupId " val offsets: util.Map [String , String ] = jedis.hgetAll(redisKey) println("读取到offset: " + offsets) val results: mutable.Map [TopicPartition , Long ] = mutable.Map [TopicPartition ,Long ]() import scala.collection.JavaConverters ._ for ((partition,offset) <- offsets.asScala) { val tp: TopicPartition = new TopicPartition (topic,partition.toInt) results.put(tp, offset.toLong) } jedis.close() results.toMap } }
(4)修补主程序的代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 object OdsBaseLogApp { def main (args :Array [String ]): Unit ={ val sparkConf :SparkConf = new SparkConf ().setAppName("ods_base_log_app" ).setMaster("local[3]" ) val ssc: StreamingContext = new StreamingContext (sparkConf,Seconds (5 )) val topicName : String = "ODS_BASE_LOG_1018" val groupId : String = "ODS_BASE_LOG_GROUP_1018" val offsets:Map [TopicPartition ,Long ]= MyOffsetUtils .readOffset(topicName, groupId) var kafkaDStream : InputDStream [ConsumerRecord [String ,String ]] = null if (offsets!=null && offsets.nonEmpty){ kafkaDStream = MyKafkaUtils .getKafkaDStream(ssc,topicName,groupId,offsets) }else { kafkaDStream = MyKafkaUtils .getKafkaDStream(ssc, topicName, groupId) } var offsetRanges: Array [OffsetRange ] = null val offsetRangesDStream: DStream [ConsumerRecord [String , String ]] = kafkaDStream.transform( rdd => { offsetRanges = rdd.asInstanceOf[HasOffsetRanges ].offsetRanges rdd } ) val jsonObjDStream: DStream [JSONObject ] = offsetRangesDStream.map( consumerRecord => { val log: String = consumerRecord.value() val jsonObj: JSONObject = JSON .parseObject(log) jsonObj } ) val DWD_PAGE_LOG_TOPIC : String = "DWD_PAGE_LOG_TOPIC_1018" val DWD_PAGE_DISPLAY_TOPIC : String = "DWD_PAGE_DISPLAY_TOPIC_1018" val DWD_PAGE_ACTION_TOPIC : String = "DWD_PAGE_ACTION_TOPIC_1018" val DWD_START_LOG_TOPIC : String = "DWD_START_LOG_TOPIC_1018" val DWD_ERROR_LOG_TOPIC : String = "DWD_ERROR_LOG_TOPIC_1018" jsonObjDStream.foreachRDD( rdd => { rdd.foreachPartition( jsonObjIter => { for (jsonObj <- jsonObjIter) { val errObj: JSONObject = jsonObj.getJSONObject("err" ) if (errObj != null ){ MyKafkaUtils .send(DWD_ERROR_LOG_TOPIC , jsonObj.toJSONString ) }else { val commonObj: JSONObject = jsonObj.getJSONObject("common" ) val ar: String = commonObj.getString("ar" ) val uid: String = commonObj.getString("uid" ) val os: String = commonObj.getString("os" ) val ch: String = commonObj.getString("ch" ) val isNew: String = commonObj.getString("is_new" ) val md: String = commonObj.getString("md" ) val mid: String = commonObj.getString("mid" ) val vc: String = commonObj.getString("vc" ) val ba: String = commonObj.getString("ba" ) val ts: Long = jsonObj.getLong("ts" ) val pageObj: JSONObject = jsonObj.getJSONObject("page" ) if (pageObj != null ){ val pageId: String = pageObj.getString("page_id" ) val pageItem: String = pageObj.getString("item" ) val pageItemType: String = pageObj.getString("item_type" ) val duringTime: Long = pageObj.getLong("during_time" ) val lastPageId: String = pageObj.getString("last_page_id" ) val sourceType: String = pageObj.getString("source_type" ) var pageLog = PageLog (mid,uid,ar,ch,isNew,md,os,vc,ba,pageId,lastPageId,pageItem,pageItemType,duringTime,sourceType,ts) MyKafkaUtils .send(DWD_PAGE_LOG_TOPIC , JSON .toJSONString(pageLog , new SerializeConfig (true ))) val displaysJsonArr: JSONArray = jsonObj.getJSONArray("displays" ) if (displaysJsonArr != null && displaysJsonArr.size() > 0 ){ for (i <- 0 until displaysJsonArr.size()){ val displayObj: JSONObject = displaysJsonArr.getJSONObject(i) val displayType: String = displayObj.getString("display_type" ) val displayItem: String = displayObj.getString("item" ) val displayItemType: String = displayObj.getString("item_type" ) val posId: String = displayObj.getString("pos_id" ) val order: String = displayObj.getString("order" ) val pageDisplayLog = PageDisplayLog (mid,uid,ar,ch,isNew,md,os,vc,ba,pageId,lastPageId,pageItem,pageItemType,duringTime,sourceType,displayType,displayItem,displayItemType,order,posId,ts) MyKafkaUtils .send(DWD_PAGE_DISPLAY_TOPIC , JSON .toJSONString(pageDisplayLog , new SerializeConfig (true ))) } } val actionJsonArr: JSONArray = jsonObj.getJSONArray("actions" ) if (actionJsonArr != null && actionJsonArr.size() > 0 ){ for (i <- 0 until actionJsonArr.size()){ val actionObj: JSONObject = actionJsonArr.getJSONObject(i) val actionId: String = actionObj.getString("action_id" ) val actionItem: String = actionObj.getString("item" ) val actionItemType: String = actionObj.getString("item_type" ) val actionTs: Long = actionObj.getLong("ts" ) var pageActionLog = PageActionLog (mid,uid,ar,ch,isNew,md,os,vc,ba,pageId,lastPageId,pageItem,pageItemType,duringTime,sourceType,actionId,actionItem,actionItemType,actionTs,ts) MyKafkaUtils .send(DWD_PAGE_ACTION_TOPIC , JSON .toJSONString(pageActionLog , new SerializeConfig (true ))) } } } val startJsonObj: JSONObject = jsonObj.getJSONObject("start" ) if (startJsonObj != null ){ val entry: String = startJsonObj.getString("entry" ) val loadingTime: Long = startJsonObj.getLong("loading_time" ) val openAdId: String = startJsonObj.getString("open_ad_id" ) val openAdMs: Long = startJsonObj.getLong("open_ad_ms" ) val openAdSkipMs: Long = startJsonObj.getLong("open_ad_skip_ms" ) var startLog = StartLog (mid,uid,ar,ch,isNew,md,os,vc,ba,entry,openAdId,loadingTime,openAdMs,openAdSkipMs,ts) MyKafkaUtils .send(DWD_START_LOG_TOPIC , JSON .toJSONString(startLog ,new SerializeConfig (true ))) } } } MyKafkaUtils .flush() } ) MyOffsetUtils .saveOffset(topicName,groupId,offsetRanges) } ) ssc.start() ssc.awaitTermination() } }
幂等性操作目前处理完的数据写到了 kafka,如果程序出现宕机重试,kafka 是没有办法通过唯一性标识实现幂等性识别,但是也没有关系,因为 kafka 中的数据只是用于中间存储,并不会进行统计,所以只要保证不丢失即可,重复数据的幂等性处理可以交给下游处理,只要保证最终统计结果是不会有重复即可。
优化:kafka 消息发送问题 缓冲区问题Kafka 消息的发送分为同步发送和异步发送。 Kafka 默认使用异步发送的方式。Kafka的生产者将消息进行发送时,会先将消息发送到缓冲区中,待缓冲区写满或者到达指定的时间,才会真正的将缓冲区的数据写到 Broker。
假设消息发送到缓冲区中还未写到 Broker,我们认为数据已经成功写给了 Kafka,接下来会手动的提交 offset, 如果 offset 提交成功,但此刻 Kafka 集群突然出现故障。 缓冲区的数据会丢失,最终导致的问题就是数据没有成功写到 Kafka ,而 offset 已经提交,此部分的数据就会被漏掉。
问题解决 – 策略一将消息的发送修改为同步发送,保证每条数据都能发送到 Broker。 但带来的问题就是消息是一条一条写给 Broker,会牺牲性能,一般不推荐。
问题解决 – 策略二 **(1)**策略: 在手动提交 offset 之前,强制将缓冲区的数据 flush 到 broker
Kafka 的生产者对象提供了 flush 方法, 可以强制将缓冲区的数据刷到 Broker。
(2)修补主程序代码
上诉代码修改过了。这里不列出来了。
业务数据采集和分流 整体架构业务数据库的采集主要是基于对数据库的变化的实时监控。目前市面上的开源产品主要是 Canal 和 Maxwell 。要利用这些工具实时采集数据到 kafka,以备后续处理。
Maxwell 采集的日志数据,默认是放在一个统一的 kafka 的 Topic 中,为了后续方便处理要进行以表为单位拆分到不同 kafka 的 Topic 中。
针对维度数据,要单独保存。通常考虑用 redis、hbase、mysql、kudu 等通过唯一键查询性能较快的数据库中。
Maxwell简介 什么是MaxwellMaxwell 是由美国 Zendesk 公司开源,用 Java 编写的 MySQL 变更数据抓取软件。它会实时监控 Mysql 数据库的数据变更操作(包括 insert、update、delete),并将变更数据以 JSON格式发送给 Kafka、Kinesi 等流数据处理平台。
官网地址:http://maxwells-daemon.io/
MySQL主从复制(1)主从复制的应用场景如下
做数据库的热备:主数据库服务器故障后,可切换到从数据库继续工作。 分离:主数据库只负责业务数据的写入操作,而多个从数据库只负责业务数据的查询工作,在读多写少场景下,可以提高数据库工作效率。 (2)MySQL主从复制工作原理
Master 主库将数据变更记录,写到二进制日志(binary log)中 Slave 从库向 mysql master 发送 dump 协议,将 master 主库的 binary log events 拷贝到它的中继日志(relay log) Slave 从库读取并回放中继日志中的事件,将改变的数据同步到自己的数据库 (3)binlog
(1)什么是 binlog
MySQL 的二进制日志可以说 MySQL 最重要的日志了,它记录了所有的 DDL 和 DML(除了数据查询语句)语句,以事件形式记录,还包含语句所执行的消耗的时间,MySQL 的二进制日志是事务安全型的。
一般来说开启二进制日志大概会有 1%的性能损耗。
二进制日志包括两类文件:二进制日志索引文件(文件名后缀为.index)用于记录所有的二进制文件,二进制日志文件(文件名后缀为.00000*)记录数据库所有的 DDL 和 DML(除了数据查询语句)语句事件。
(2 )binlog 的分类设置
mysql binlog 的格式有三种,分别是 STATEMENT,MIXED,ROW。
1 binlog_format= statement|mixed|row
statement :语句级,binlog 会记录每次一执行写操作的语句。相对 row 模式节省空间,但是可能产生不一致性,比如 update tt set create_date=now() 如果用 binlog 日志进行恢复,由于执行时间不同可能产生的数据就不同。
优点: 节省空间
缺点: 有可能造成数据不一致。
row :行级, binlog 会记录每次操作后每行记录的变化。
优点:保持数据的绝对一致性。因为不管 sql 是什么,引用了什么函数,他只记录执行后的效果。
缺点:占用较大空间
mixed:statement 的升级版,一定程度上解决了因为一些情况而造成的 statement 模式不一致问题 。默认还是 statement,在某些情况下譬如:当函数中包含 UUID() 时;包含AUTO_INCREMENT 字段的表被更新时;执行 INSERT DELAYED 语句时;用 UDF 时;会按照 ROW 的方式进行处理
优点:节省空间,同时兼顾了一定的一致性。
缺点:还有些极个别情况依旧会造成不一致,另外 statement 和 mixed 对于需要对binlog 的监控的情况都不方便。
综合上面对比,Maxwell 想做监控分析,选择 row 格式比较合适。
Maxwell 工作原理 很简单,就是将自己伪装成 slave,并遵循 MySQL 主从复制的协议,从 master 同步数据。
采集业务数据 MySQL 部分 创建业务数据库(直接使用离线数仓的即可)
导入数据表(直接使用离线数仓的即可)
1 2 3 docker cp gmall.sql mysql:/ var docker exec - it mysql / bin/ bash mysql - uroot - proot gmall < gmall.sql
修改/etc/my.cnf 文件,开启 binlog
1 2 3 4 server-id=1 log-bin=mysql-bin binlog_format=row binlog-do-db=gmall
重启 MySQL 使配置生效
到**/var/lib/mysql **目录下查看是否生成 binlog 文件
模拟数据上传 jar 和 properties 文件上传到/opt/module/db_log目录下 ;
修改 application.properties 中数据库连接信息
1 2 3 4 spring.datasource.driver-class-name =com.mysql.jdbc.Driver spring.datasource.url =jdbc:mysql://192.168.6.134:3306/gmall?characterEncoding=utf-8&useSSL=false&serverTimezone=GMT%2B8&AllowPublicKeyRetrieval=True spring.datasource.username =root spring.datasource.password =root
给 maxwell 创建用户并赋权限
1 2 CREATE USER 'maxwell' @'%' IDENTIFIED BY 'maxwell' ;GRANT SELECT , REPLICATION CLIENT, REPLICATION SLAVE ON * .* TO 'maxwell' @'%' ;
创建数据库,用于存储 Maxwell 运行过程中的一些数据,包括 binlog 同步的断点位置等
1 2 create database maxwell;GRANT ALL ON maxwell.* TO 'maxwell' @'%' ;
修改 Maxwell 配置文件名字
1 mv config.properties.example config.properties
修改 Maxwell 配置文件内容
1 2 3 4 5 6 7 8 9 10 11 stdout|file|kafka|kinesis|pubsub|sqs|rabbitmq|redis producer=kafka kafka.bootstrap.servers=192.168.6.134:9092 kafka_topic=ODS_BASE_DB_M host=127.0.0.1 user=maxwell password=maxwell jdbc_options=useSSL=false &serverTimezone=Asia/Shanghai
启动 Maxwell
1 bin/maxwell --config config.properties --daemon
查看进程:
启动 Kafka 消费客户端测试,查看消费情况
1 kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic ODS_BASE_DB_M
模拟生成数据
1 java -jar gmall2020-mock-db-2021-01-22.jar
创建一个脚本:
1 2 3 4 5 6 7 8 # !/bin/bash # 根据传递的日期参数修改配置文件的日期 if [ $# -ge 1 ] then sed -i "/mock.date/c mock.date: $1" /root/spark实时数仓/数据生成器/业务数据生成器/application.properties fi cd /root/spark实时数仓/数据生成器/业务数据生成器; java -jar gmall2020-mock-db-2021-01-22.jar >/dev/null 2>&1 &
业务数据消费分流Maxwell 会追踪整个数据库的变更,把所有的数据变化都发到一个 topic 中了,但是为了后续处理方便,应该将事实表的数据分流到不同的 topic 中,将维度表的数据写入到 redis中 。
分流业务代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 object OdsbaseDbApp { def main (args: Array [String ]): Unit = { val sparkConf: SparkConf = new SparkConf ().setAppName("ods_base_db_app" ).setMaster("local[4]" ) val ssc: StreamingContext = new StreamingContext (sparkConf , Seconds (5 )) val topicName : String = "ODS_BASE_DB_M" val groupId : String = "ODS_BASE_DB_GROUP" val offsets: Map [TopicPartition , Long ] = MyOffsetUtils .readOffset(topicName, groupId) var kafkaDStream: InputDStream [ConsumerRecord [String , String ]] = null if (offsets != null && offsets.nonEmpty){ kafkaDStream = MyKafkaUtils .getKafkaDStream(ssc,topicName,groupId,offsets) }else { kafkaDStream = MyKafkaUtils .getKafkaDStream(ssc,topicName,groupId) } var offsetRanges: Array [OffsetRange ] = null val offsetRangesDStream : DStream [ConsumerRecord [String , String ]] = kafkaDStream.transform( rdd => { offsetRanges = rdd.asInstanceOf[HasOffsetRanges ].offsetRanges rdd } ) val jsonObjDStream: DStream [JSONObject ] = offsetRangesDStream.map( consumerRecord => { val dataJson: String = consumerRecord.value() val jSONObject: JSONObject = JSON .parseObject(dataJson) jSONObject } ) jsonObjDStream.print(100 ) val factTables : Array [String ] = Array [String ]( "order_info" ,"order_detail" ) val dimTables : Array [String ] = Array [String ]("user_info" , "base_province" ) jsonObjDStream.foreachRDD( rdd => { rdd.foreachPartition( jsonObjIter => { val jedis: Jedis = MyRedisUtils .getJedisFromPool() for (jsonObj <- jsonObjIter) { val operType: String = jsonObj.getString("type" ) val opValue: String = operType match { case "bootstrap-insert" => "I" case "insert" => "I" case "update" => "U" case "delete" => "D" case _ => null } if (opValue != null ){ val tableName: String = jsonObj.getString("table" ) if (factTables.contains(tableName)){ val data: String = jsonObj.getString("data" ) val dwdTopicName : String = s"DWD_${tableName.toUpperCase} _${opValue} _1018" MyKafkaUtils .send(dwdTopicName , data ) if (tableName.equals("order_detail" )){ Thread .sleep(200 ) } } if (dimTables.contains(tableName)){ val dataObj: JSONObject = jsonObj.getJSONObject("data" ) val id: String = dataObj.getString("id" ) val redisKey : String = s"DIM:${tableName.toUpperCase} :$id " jedis.set(redisKey, dataObj.toJSONString) } } } jedis.close() MyKafkaUtils .flush() } ) MyOffsetUtils .saveOffset(topicName, groupId,offsetRanges) } ) ssc.start() ssc.awaitTermination() } }
当我们修改user_info中的某行数据,在redis中可以看到变化。
历史维度数据初始引导Maxwell 提供了 bootstrap 功能来进行历史数据的全量同步,命令如下:
1 bin/maxwell-bootstrap --config config.properties --database gmall --table user_info
Bootstrap 数据格式
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 { "database" : "fooDB" , "table" : "barTable" , "type" : "bootstrap-start" , "ts" : 1450557744, "data" : {} }{ "database" : "fooDB" , "table" : "barTable" , "type" : "bootstrap-insert" , "ts" : 1450557744, "data" : { "txt" : "hello" } }{ "database" : "fooDB" , "table" : "barTable" , "type" : "bootstrap-insert" , "ts" : 1450557744, "data" : { "txt" : "bootstrap!" } }{ "database" : "fooDB" , "table" : "barTable" , "type" : "bootstrap-complete" , "ts" : 1450557744, "data" : {} }
注意事项:
第一条 type 为 bootstrap-start 和最后一条 type 为 bootstrap-complete 的数据,是 bootstrap 开始和结束的标志,不包含数据,中间的 type 为 bootstrap-insert 的数据才包含数据。
一次 bootstrap 输出的所有记录的 ts 都相同,为 bootstrap 开始的时间。
动态配置表清单业务数据表包括了事实表和维度表,而对这两种表要进行不同的处理,在前面我们区分表是用了一个数组:
1 2 3 4 val factTables : Array [String ] = Array [String ]( "order_info" ,"order_detail" )val dimTables : Array [String ] = Array [String ]("user_info" , "base_province" )
但是这样的话每次都需要修改代码,改进的方法:
在一个文件中保存表名称,处理的时候进行动态加载; 将表名称保存到redis中,动态加载。 以redis为例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 jsonObjDStream.foreachRDD( rdd => { val redisFactKeys : String = "FACT:TABLES" val redisDimKeys : String = "DIM:TABLES" val jedis: Jedis = MyRedisUtils .getJedisFromPool() val factTables: util.Set [String ] = jedis.smembers(redisFactKeys) println("factTables: " + factTables) val factTablesBC: Broadcast [util.Set [String ]] = ssc.sparkContext.broadcast(factTables) val dimTables: util.Set [String ] = jedis.smembers(redisDimKeys) println("dimTables: " + dimTables) val dimTablesBC: Broadcast [util.Set [String ]] = ssc.sparkContext.broadcast(dimTables) jedis.close() rdd.foreachPartition( jsonObjIter => { val jedis: Jedis = MyRedisUtils .getJedisFromPool() for (jsonObj <- jsonObjIter) { val operType: String = jsonObj.getString("type" ) val opValue: String = operType match { case "bootstrap-insert" => "I" case "insert" => "I" case "update" => "U" case "delete" => "D" case _ => null } if (opValue != null ){ val tableName: String = jsonObj.getString("table" ) if (factTablesBC.value.contains(tableName)){ val data: String = jsonObj.getString("data" ) val dwdTopicName : String = s"DWD_${tableName.toUpperCase} _${opValue} _1018" MyKafkaUtils .send(dwdTopicName , data ) if (tableName.equals("order_detail" )){ Thread .sleep(200 ) } } if (dimTablesBC.value.contains(tableName)){ val dataObj: JSONObject = jsonObj.getJSONObject("data" ) val id: String = dataObj.getString("id" ) val redisKey : String = s"DIM:${tableName.toUpperCase} :$id " jedis.set(redisKey, dataObj.toJSONString) } } } jedis.close() MyKafkaUtils .flush() } ) MyOffsetUtils .saveOffset(topicName, groupId,offsetRanges) } )
用set数据结构保存了每种表的所有信息,读取后并不是直接使用,而是做成一个广播变量 。当然这里因为数据量比较少,直接使用也是可以的。
广播变量如果我们要在分布式计算里面分发大对象,例如:字典,集合,黑白名单等,这个都会由Driver端进行分发,一般来讲,如果这个变量不是广播变量,那么每个task就会分发一份,这在task数目十分多的情况下Driver的带宽会成为系统的瓶颈,而且会大量消耗task服务器上的资源 ,如果将这个变量声明为广播变量,那么知识每个executor拥有一份,这个executor启动的task会共享这个变量,节省了通信的成本和服务器的资源。
不使用广播变量:
使用广播变量:
注意:
能不能将一个RDD使用广播变量广播出去?不能,因为RDD是不存储数据的。可以将RDD的结果广播出去。 广播变量只能在Driver端定义,不能在Executor端定义。 在Driver端可以修改广播变量的值,在Executor端无法修改广播变量的值。 如果executor端用到了Driver的变量,如果不使用广播变量在Executor有多少task就有多少Driver端的变量副本。 如果Executor端用到了Driver的变量,如果使用广播变量在每个Executor中只有一份Driver端的变量副本。 driver向executor传数据考虑两个问题:
能不能传?即能不能序列化? 传的数据大不大?大的话使用广播变量。 数据处理顺序性在实时计算中,对业务数据的计算,要考虑到数据处理的顺序, 即能否依照数据改变的顺序进行处理。
假设一个场景,如果将某个用户数据的姓名字段进行多次更改,由原先的 A 改为 B 再改为 C,在数据库层面最终的结果为 C, 但是我们能否保证数据经过实时处理后,在 DIM层存储的结果也为 C,可不可能存储的结果为 B。
我们依次审视一下,在实时处理的各个环节中,是否能保证数据的顺序?如果不能保证,是在哪个环节出的问题,最终导致存储的结果不正确。
解决:
通过分析,目前我们的计算过程中,只有可能在 Kafka 环节出现数据乱序,导致最终存储的结果不正确。如果想要保证数据处理的顺序性,我们可以将同一条数据的修改发往 topic的同一个分区中 。需要修改 maxwell 的配置文件,指定发送数据到 kafka 时要使用分区键。
修改config.properties 文件中的如下配置 :
1 2 3 4 5 6 7 8 9 10 11 12 13 producer_partition_by =column producer_partition_columns =id producer_partition_by_fallback =table
producer_partition_by=column:以某一列进行分区; producer_partition_columns=id:以id列进行分区; producer_partition_by_fallback=table:如果id列并不存在,则以table进行分区。 DWD 到 DWS首先明确的是数据的聚合操作,一律交给 OLAP 来完成,因为 OLAP 数据库的特性都是非常利于数据聚合统计的,可以说这也是 OLAP 的本职工作。
那么在此之前实时计算就要完成一些 OLAP 不是特别方便或者性能并不好的操作,比如 JOIN 操作,比如分组去重(需要开窗)或者复杂的数据计算等等,实际情况还要看 OLAP的选型。那这些工作可以交由实时计算完成。
从 ODS 到 DWD 层主要负责原始数据的整理拆分,形成一个一个的业务事实 topic。
从 DWD 层到 DWS 层主要负责把单个的业务事实 topic 变为面向统计的事实明细宽表,然后保存到 OLAP 中。
我的理解来看,就是将DWD中的日志数据或者业务数据关联一些DIM层的维度数据,又或者业务数据进行join,形成一张包含细致数据的宽表,从而省去了在OLAP数据库中的join操作。
日活宽表 需求举例(1)当日用户首次登录(日活)分时趋势图
(2)不同渠道柱形对比图
任务分析(1)去重
由于日活代表了用户当日的首次访问,因此除了当日的首次访问以外的其他访问,一律需要过滤掉。每个用户每天可能启动多次。要想计算日活,我们只需要把当前用户每天的第一次启动日志获取即可,所以要对启动日志进行去重,相当于做了一次清洗。
实时计算中的去重是一个比较常见的需求,可以有许多方式实现,比如:将状态存在Redis 中;存在关系型数据库中;通过 Spark 自身的 updateStateByKey(checkPoint 小文件等问题比较麻烦、不方便管理、程序无法变更升级)等。
我们这里结合 Redis 实现对当前用户启动日志去重操作
(2)维度关联
由于要针对各种对于不同角度的“日活”分析,而 OLAP 数据库中尽量减少 join 操作,所以在实时计算中要考虑其会被分析的维度,补充响应相应的维度数据,形成宽表。
由于维度数据已经保存在固定容器中了(redis),所以在实时计算中,维度与事实数据关联并不是通过 join 算子完成。而是在流中查询固定容器来实现维度补充。
(3)输出到 OLAP中
把清洗和组合好的宽表数据写入到 OLAP 中。这个部分要注意做到尽可能做到:
基本代码 bean1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 case class DauInfo ( //基本的页面访问日志的数据 var mid :String , var user_id:String , var province_id:String , var channel:String , var is_new:String , var model:String , var operate_system:String , var version_code:String , var brand : String , var page_id:String , var page_item:String , var page_item_type:String , var sourceType :String , var during_time:Long , //用户性别 年龄 var user_gender : String , var user_age : String , //地区信息 var province_name : String , var province_iso_code: String , var province_3166_2 :String , var province_area_code : String , //日期 var dt : String , var hr : String , var ts : Long ) { def this (){ this (null ,null ,null ,null ,null ,null ,null ,null ,null ,null ,null ,null ,null ,0 L,null ,null ,null ,null ,null ,null ,null ,null ,0 L) } }
对象拷贝工具类1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 object MyBeanUtils { def main (args: Array [String ]): Unit = { val pageLog: PageLog = PageLog ("mid1001" , "uid101" , "prov101" , null ,null ,null ,null ,null ,null ,null ,null ,null ,null ,0 L ,null ,123456 ) val dauInfo: DauInfo = new DauInfo () println("拷贝前: " + dauInfo) copyProperties(pageLog,dauInfo) println("拷贝后: " + dauInfo) } def copyProperties (srcObj : AnyRef , destObj: AnyRef ): Unit ={ if (srcObj == null || destObj == null ){ return } val srcFields: Array [Field ] = srcObj.getClass.getDeclaredFields for (srcField <- srcFields) { Breaks .breakable{ var getMethodName : String = srcField.getName var setMethodName : String = srcField.getName+"_$eq" val getMethod: Method = srcObj.getClass.getDeclaredMethod(getMethodName) val setMethod: Method = try { destObj.getClass.getDeclaredMethod(setMethodName, srcField.getType) }catch { case ex : Exception => Breaks .break () } val destField: Field = destObj.getClass.getDeclaredField(srcField.getName) if (destField.getModifiers.equals(Modifier .FINAL )){ Breaks .break () } setMethod.invoke(destObj, getMethod.invoke(srcObj)) } } } }
消费数据和去重日活的统计我们只需要考虑用户的首次访问行为,不同企业判断用户活跃的方式不同,可以通过启动数据或者页面数据来分析,我们采用页面数据来统计日活,页面数据中包含用户所有的访问行为,而我们只需要首次访问行为,所以可以在每批次的数据中先将包含有 last_page_id 的数据过滤掉, 剩下的数据再与第三方(redis)中所记录的今日访问用户进行比对 。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 object DwdDauApp { def main (args: Array [String ]): Unit = { val sparkConf: SparkConf = new SparkConf ().setAppName("dwd_dau_app" ).setMaster("local[4]" ) val ssc: StreamingContext = new StreamingContext (sparkConf,Seconds (5 )) val topicName : String = "DWD_PAGE_LOG_TOPIC" val groupId : String = "DWD_DAU_GROUP" val offsets: Map [TopicPartition , Long ] = MyOffsetUtils .readOffset(topicName, groupId) var kafkaDStream: InputDStream [ConsumerRecord [String , String ]] = null if (offsets != null && offsets.nonEmpty){ kafkaDStream = MyKafkaUtils .getKafkaDStream(ssc,topicName,groupId,offsets) }else { kafkaDStream = MyKafkaUtils .getKafkaDStream(ssc,topicName,groupId) } var offsetRanges: Array [OffsetRange ] = null val offsetRangesDStream: DStream [ConsumerRecord [String , String ]] = kafkaDStream.transform( rdd => { offsetRanges = rdd.asInstanceOf[HasOffsetRanges ].offsetRanges rdd } ) val pageLogDStream: DStream [PageLog ] = offsetRangesDStream.map( consumerRecord => { val value: String = consumerRecord.value() val pageLog: PageLog = JSON .parseObject(value, classOf[PageLog ]) pageLog } ) pageLogDStream.cache() pageLogDStream.foreachRDD( rdd => println("自我审查前: " + rdd.count()) ) val filterDStream: DStream [PageLog ] = pageLogDStream.filter( pageLog => pageLog.last_page_id == null ) filterDStream.cache() filterDStream.foreachRDD( rdd => { println("自我审查后: " + rdd.count()) println("----------------------------" ) } ) val redisFilterDStream: DStream [PageLog ] = filterDStream.mapPartitions( pageLogIter => { val pageLogList: List [PageLog ] = pageLogIter.toList println("第三方审查前: " + pageLogList.size) val pageLogs: ListBuffer [PageLog ] = ListBuffer [PageLog ]() val sdf: SimpleDateFormat = new SimpleDateFormat ("yyyy-MM-dd" ) val jedis: Jedis = MyRedisUtils .getJedisFromPool() for (pageLog <- pageLogList) { val mid: String = pageLog.mid val ts: Long = pageLog.ts val date: Date = new Date (ts) val dateStr: String = sdf.format(date) val redisDauKey: String = s"DAU:$dateStr " val isNew: lang.Long = jedis.sadd(redisDauKey, mid) if (isNew == 1 L) { pageLogs.append(pageLog) } } jedis.close() println("第三方审查后: " + pageLogs.size) pageLogs.iterator } ) ssc.start() ssc.awaitTermination() } }
维度合并由于要针对各种对于不同角度的“日活”分析,而 OLAP 数据库中尽量减少 join 操作,所以在实时计算中要考虑其会被分析的维度,补充相应的维度数据,形成宽表。
由于维度数据已经保存在固定容器中了,所以在实时计算中,维度与事实数据关联并不是通过 join 算子完成。而是在流中查询固定容器(redis/mysql/hbase)来实现维度补充。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 object DwdDauApp { def main (args: Array [String ]): Unit = { ... val dauInfoDStream: DStream [DauInfo ] = redisFilterDStream.mapPartitions( pageLogIter => { val dauInfos: ListBuffer [DauInfo ] = ListBuffer [DauInfo ]() val sdf: SimpleDateFormat = new SimpleDateFormat ("yyyy-MM-dd HH:mm:ss" ) val jedis: Jedis = MyRedisUtils .getJedisFromPool() for (pageLog <- pageLogIter) { val dauInfo: DauInfo = new DauInfo () MyBeanUtils .copyProperties(pageLog, dauInfo) val uid: String = pageLog.user_id val redisUidkey: String = s"DIM:USER_INFO:$uid " val userInfoJson: String = jedis.get(redisUidkey) val userInfoJsonObj: JSONObject = JSON .parseObject(userInfoJson) val gender: String = userInfoJsonObj.getString("gender" ) val birthday: String = userInfoJsonObj.getString("birthday" ) val birthdayLd: LocalDate = LocalDate .parse(birthday) val nowLd: LocalDate = LocalDate .now() val period: Period = Period .between(birthdayLd, nowLd) val age: Int = period.getYears dauInfo.user_gender = gender dauInfo.user_age = age.toString val provinceID: String = dauInfo.province_id val redisProvinceKey: String = s"DIM:BASE_PROVINCE:$provinceID " val provinceJson: String = jedis.get(redisProvinceKey) val provinceJsonObj: JSONObject = JSON .parseObject(provinceJson) val provinceName: String = provinceJsonObj.getString("name" ) val provinceIsoCode: String = provinceJsonObj.getString("iso_code" ) val province3166: String = provinceJsonObj.getString("iso_3166_2" ) val provinceAreaCode: String = provinceJsonObj.getString("area_code" ) dauInfo.province_name = provinceName dauInfo.province_iso_code = provinceIsoCode dauInfo.province_3166_2 = province3166 dauInfo.province_area_code = provinceAreaCode val date: Date = new Date (pageLog.ts) val dtHr: String = sdf.format(date) val dtHrArr: Array [String ] = dtHr.split(" " ) val dt: String = dtHrArr(0 ) val hr: String = dtHrArr(1 ).split(":" )(0 ) dauInfo.dt = dt dauInfo.hr = hr dauInfos.append(dauInfo) } jedis.close() dauInfos.iterator } ) dauInfoDStream.print(100 ) ssc.start() ssc.awaitTermination() } }
es工具类 写入es(1)建立索引模板
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 PUT _template/gmall_dau_info_template { "index_patterns" : ["gmall_dau_info*" ], "settings" : { "number_of_shards" : 3 }, "aliases" : { "{index}-query" : {}, "gmall_dau_info_all" :{} }, "mappings" : { "properties" :{ "mid" :{ "type" :"keyword" }, "user_id" :{ "type" :"keyword" }, "province_id" :{ "type" :"keyword" }, "channel" :{ "type" :"keyword" }, "is_new" :{ "type" :"keyword" }, "model" :{ "type" :"keyword" }, "operate_system" :{ "type" :"keyword" }, "version_code" :{ "type" :"keyword" }, "page_id" :{ "type" :"keyword" }, "page_item" :{ "type" :"keyword" }, "page_item_type" :{ "type" :"keyword" }, "during_time" :{ "type" :"long" }, "user_gender" :{ "type" :"keyword" }, "user_age" :{ "type" :"integer" }, "province_name" :{ "type" :"keyword" }, "province_iso_code" :{ "type" :"keyword" }, "province_3166_2" :{ "type" :"keyword" }, "province_area_code" :{ "type" :"keyword" }, "dt" :{ "type" :"keyword" }, "hr" :{ "type" :"keyword" }, "ts" :{ "type" :"date" } } } }
(2)业务代码
主程序调用工具类批量写入 ES,其实我们前面已经使用 Redis 进行了去重操作,基本上是可以保证幂等性的。如果更严格的保证幂等性,那我们在批量向 ES 写数据的时候,指定 Index 的 id 即可。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 dauInfoDStream.foreachRDD( rdd => { rdd.foreachPartition( dauInfoIter => { val docs: List [(String , DauInfo )] = dauInfoIter.map( dauInfo=> (dauInfo.mid , dauInfo)).toList if (docs.size > 0 ){ val head: (String , DauInfo ) = docs.head val ts: Long = head._2.ts val sdf: SimpleDateFormat = new SimpleDateFormat ("yyyy-MM-dd" ) val dateStr: String = sdf.format(new Date (ts)) val indexName : String = s"gmall_dau_info_1018_$dateStr " MyEsUtils .bulkSave(indexName , docs) } } ) MyOffsetUtils .saveOffset(topicName, groupId , offsetRanges) } )
状态数据还原想象一个比较极端的情况, 如果某个用户某天的首次访问数据写入 redis 后, 接下来在写入到 es 的过程中,程序挂掉。 会出现什么问题 ?
程序挂掉,偏移量还未提交,重启后会触发数据的重试,但是因为 redis 中记录了相关的数据,所以该数据会被过滤掉。因此此数据,就再也无法进入 es,也就意味着丢失。
这个问题的本质就是,状态数据与最终数据库的数据以及偏移量,没有形成原子性事务造成的 。
当然可以通过事务数据库的方式解决该问题,而我们的项目中没有选择使用支持事务的数据库 ,例如 MySQL 等。在既有的环境下我们依然有很多破解方案,例如进行状态还原,在启动程序前,将 ES 中已有的数据的 mid 提取出来,覆盖到 Redis 中,这样就能保证Redis 和 ES 数据的同步 。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 def revertState (): Unit ={ val date: LocalDate = LocalDate .now() val indexName : String = s"gmall_dau_info_1018_$date " val fieldName : String = "mid" val mids: List [ String ] = MyEsUtils .searchField(indexName , fieldName) val jedis: Jedis = MyRedisUtils .getJedisFromPool() val redisDauKey : String = s"DAU:$date " jedis.del(redisDauKey) if (mids != null && mids.size > 0 ){ val pipeline: Pipeline = jedis.pipelined() for (mid <- mids) { pipeline.sadd(redisDauKey , mid ) } pipeline.sync() } jedis.close() }
调整 ES 参数
此方法由于需要把当日全部 mid 取出,会受到 es 默认的结果返回数限制。需要修改配置扩大返回结果数,如下:
1 2 3 4 PUT /_settings { "index.max_result_window" :"5000000" }
修补主程序,在整个程序加载数据前进行状态还原
1 2 3 4 5 6 7 8 9 10 def main (args: Array [String ]): Unit = { revertDauState() ... ... ... }
订单业务宽表 需求举例(1)订单热力图
(2)订单交易分析
任务分析由于在很多 OLAP 数据库对聚合过滤都是非常强大,但是大表间的关联都不是长项。所以在数据进入 OLAP 前,尽量提前把数据关联组合好,不要在查询的时候临时进行 Join操作。所谓提前关联好,在实时计算中进行流 Join。
这里注意一下双流join ,指的是流与流之间的合并,通常是几乎同时产生的事实表的关联 。
基本代码-bean(1)OrderInfo , 封装订单表数据
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 case class OrderInfo ( id: Long =0L, province_id: Long =0L, order_status: String =null, user_id: Long =0L, total_amount: Double =0D, activity_reduce_amount: Double =0D, coupon_reduce_amount: Double =0D, original_total_amount: Double =0D, feight_fee: Double =0D, feight_fee_reduce: Double =0D, expire_time: String =null, refundable_time:String =null, create_time: String =null, operate_time: String =null, var create_date: String =null, // 把其他字段处理得到 var create_hour: String =null, var province_name:String =null,//查询维度表得到 var province_area_code:String =null, var province_3166_2_code:String =null, var province_iso_code:String =null, var user_age :Int =0, //查询维度表得到 var user_gender:String =null ) {}
(2)OrderDetail,封装订单详情数据
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 case class OrderDetail ( id : Long , order_id :Long , sku_id : Long , order_price : Double , sku_num : Long , sku_name :String , create_time : String , split_total_amount: Double = 0D, split_activity_amount: Double =0D, split_coupon_amount:Double = 0D ) {}
(3)OrderWide,订单宽表
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 case class OrderWide ( var detail_id: Long =0L, var order_id:Long =0L, var sku_id: Long =0L, var order_price: Double =0D, var sku_num:Long =0L, var sku_name: String =null, var split_total_amount:Double =0D, var split_activity_amount:Double =0D, var split_coupon_amount:Double =0D, var province_id: Long =0L, var order_status: String =null, var user_id: Long =0L, var total_amount: Double =0D, var activity_reduce_amount: Double =0D, var coupon_reduce_amount: Double =0D, var original_total_amount: Double =0D, var feight_fee: Double =0D, var feight_fee_reduce: Double =0D, var expire_time: String =null, var refundable_time:String =null, var create_time: String =null, var operate_time: String =null, var create_date: String =null, var create_hour: String =null, var province_name:String =null, var province_area_code:String =null, var province_3166_2_code:String =null, var province_iso_code:String =null, var user_age :Int =0, var user_gender:String =null ) { def this (orderInfo : OrderInfo ,orderDetail: OrderDetail ){ this mergeOrderInfo(orderInfo) mergeOrderDetail(orderDetail) } def mergeOrderInfo (orderInfo: OrderInfo ): Unit ={ if (orderInfo != null ){ MyBeanUtils .copyProperties(orderInfo,this ) this .order_id = orderInfo.id } } def mergeOrderDetail (orderDetail: OrderDetail ): Unit ={ if (orderDetail != null ){ MyBeanUtils .copyProperties(orderDetail,this ) this .detail_id = orderDetail.id } } }
消费数据1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 object DwdOrderApp { def main (args: Array [String ]): Unit = { val sparconf: SparkConf = new SparkConf ().setAppName("dwd_order_app" ).setMaster("local[4]" ) val ssc: StreamingContext = new StreamingContext (sparconf , Seconds (5 )) val orderInfoTopicName : String = "DWD_ORDER_INFO_I_1018" val orderInfoGroup : String = "DWD_ORDER_INFO:GROUP" val orderInfoOffsets: Map [TopicPartition , Long ] = MyOffsetUtils .readOffset(orderInfoTopicName , orderInfoGroup) val orderDetailTopicName : String = "DWD_ORDER_DETAIL_I_1018" val orderDetailGroup : String = "DWD_ORDER_DETAIL_GROUP" val orderDetailOffsets: Map [TopicPartition , Long ] = MyOffsetUtils .readOffset(orderDetailTopicName ,orderDetailGroup) var orderInfoKafkaDStream: InputDStream [ConsumerRecord [String , String ]] = null if (orderInfoOffsets != null && orderInfoOffsets.nonEmpty){ orderInfoKafkaDStream = MyKafkaUtils .getKafkaDStream(ssc,orderInfoTopicName, orderInfoGroup,orderInfoOffsets) }else { orderInfoKafkaDStream = MyKafkaUtils .getKafkaDStream(ssc,orderInfoTopicName, orderInfoGroup) } var orderDetailKafkaDStream: InputDStream [ConsumerRecord [String , String ]] = null if (orderDetailOffsets!= null && orderDetailOffsets.nonEmpty){ orderDetailKafkaDStream = MyKafkaUtils .getKafkaDStream(ssc,orderDetailTopicName,orderDetailGroup,orderDetailOffsets) }else { orderDetailKafkaDStream = MyKafkaUtils .getKafkaDStream(ssc,orderDetailTopicName,orderDetailGroup) } var orderInfoOffsetRanges: Array [OffsetRange ] = null val orderInfoOffsetDStream: DStream [ConsumerRecord [String , String ]] = orderInfoKafkaDStream.transform( rdd => { orderInfoOffsetRanges = rdd.asInstanceOf[HasOffsetRanges ].offsetRanges rdd } ) var orderDetailOffsetRanges: Array [OffsetRange ] = null val orderDetailOffsetDStream: DStream [ConsumerRecord [String , String ]] = orderDetailKafkaDStream.transform( rdd => { orderDetailOffsetRanges = rdd.asInstanceOf[HasOffsetRanges ].offsetRanges rdd } ) val orderInfoDStream: DStream [OrderInfo ] = orderInfoOffsetDStream.map( consumerRecord => { val value: String = consumerRecord.value() val orderInfo: OrderInfo = JSON .parseObject(value, classOf[OrderInfo ]) orderInfo } ) val orderDetailDStream: DStream [OrderDetail ] = orderDetailOffsetDStream.map( consumerRecord => { val value: String = consumerRecord.value() val orderDetail: OrderDetail = JSON .parseObject(value, classOf[OrderDetail ]) orderDetail } ) ssc.start() ssc.awaitTermination() } }
维度合并1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 val orderInfoDimDStream: DStream [OrderInfo ] = orderInfoDStream.mapPartitions( orderInfoIter => { val orderInfos: List [OrderInfo ] = orderInfoIter.toList val jedis: Jedis = MyRedisUtils .getJedisFromPool() for (orderInfo <- orderInfos) { val uid: Long = orderInfo.user_id val redisUserKey: String = s"DIM:USER_INFO:$uid " val userInfoJson: String = jedis.get(redisUserKey) val userInfoJsonObj: JSONObject = JSON .parseObject(userInfoJson) val gender: String = userInfoJsonObj.getString("gender" ) val birthday: String = userInfoJsonObj.getString("birthday" ) val birthdayLd: LocalDate = LocalDate .parse(birthday) val nowLd: LocalDate = LocalDate .now() val period: Period = Period .between(birthdayLd, nowLd) val age: Int = period.getYears orderInfo.user_gender = gender orderInfo.user_age = age val provinceID: Long = orderInfo.province_id val redisProvinceKey: String = s"DIM:BASE_PROVINCE:$provinceID " val provinceJson: String = jedis.get(redisProvinceKey) val provinceJsonObj: JSONObject = JSON .parseObject(provinceJson) val provinceName: String = provinceJsonObj.getString("name" ) val provinceAreaCode: String = provinceJsonObj.getString("area_code" ) val province3166: String = provinceJsonObj.getString("iso_3166_2" ) val provinceIsoCode: String = provinceJsonObj.getString("iso_code" ) orderInfo.province_name = provinceName orderInfo.province_area_code = provinceAreaCode orderInfo.province_3166_2_code = province3166 orderInfo.province_iso_code = provinceIsoCode val createTime: String = orderInfo.create_time val createDtHr: Array [String ] = createTime.split(" " ) val createDate: String = createDtHr(0 ) val createHr: String = createDtHr(1 ).split(":" )(0 ) orderInfo.create_date = createDate orderInfo.create_hour = createHr } jedis.close() orderInfos.iterator } )
双流join(1)分析
由于两个流的数据是独立保存,独立消费,很有可能同一业务的数据,分布在不同的批次。因为 join 算子只 join 同一批次的数据。如果只用简单的 join 流方式,会丢失掉不同批次的数据。
(2)解决策略
增大采集周期 利用滑动窗口进行 join 然后再进行去重 把数据存入缓存 ,关联时进行 join 后 ,再去查询缓存中的数据,来弥补不同批次的问题。 (3)程序流程图(缓存策略)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 val orderInfoKVDStream: DStream [(Long , OrderInfo )] =orderInfoDimDStream.map( orderInfo => (orderInfo.id , orderInfo)) val orderDetailKVDStream: DStream [(Long , OrderDetail )] =orderDetailDStream.map(orderDetail => (orderDetail.order_id , orderDetail)) val orderJoinDStream: DStream [(Long , (Option [OrderInfo ], Option [OrderDetail ]))] =orderInfoKVDStream.fullOuterJoin(orderDetailKVDStream) val orderWideDStream: DStream [OrderWide ] = orderJoinDStream.mapPartitions( orderJoinIter => { val jedis: Jedis = MyRedisUtils .getJedisFromPool() val orderWides: ListBuffer [OrderWide ] = ListBuffer [OrderWide ]() for ((key, (orderInfoOp, orderDetailOp)) <- orderJoinIter) { if (orderInfoOp.isDefined) { val orderInfo: OrderInfo = orderInfoOp.get if (orderDetailOp.isDefined) { val orderDetail: OrderDetail = orderDetailOp.get val orderWide: OrderWide = new OrderWide (orderInfo, orderDetail) orderWides.append(orderWide) } val redisOrderInfoKey: String = s"ORDERJOIN:ORDER_INFO:${orderInfo.id} " jedis.setex(redisOrderInfoKey, 24 * 3600 , JSON .toJSONString(orderInfo, new SerializeConfig (true ))) val redisOrderDetailKey: String = s"ORDERJOIN:ORDER_DETAIL:${orderInfo.id} " val orderDetails: util.Set [String ] = jedis.smembers(redisOrderDetailKey) if (orderDetails != null && orderDetails.size() > 0 ) { import scala.collection.JavaConverters ._ for (orderDetailJson <- orderDetails.asScala) { val orderDetail: OrderDetail = JSON .parseObject(orderDetailJson, classOf[OrderDetail ]) val orderWide: OrderWide = new OrderWide (orderInfo, orderDetail) orderWides.append(orderWide) } } } else { val orderDetail: OrderDetail = orderDetailOp.get val redisOrderInfoKey: String = s"ORDERJOIN:ORDER_INFO:${orderDetail.order_id} " val orderInfoJson: String = jedis.get(redisOrderInfoKey) if (orderInfoJson != null && orderInfoJson.size > 0 ) { val orderInfo: OrderInfo = JSON .parseObject(orderInfoJson, classOf[OrderInfo ]) val orderWide: OrderWide = new OrderWide (orderInfo, orderDetail) orderWides.append(orderWide) } else { val redisOrderDetailKey: String = s"ORDERJOIN:ORDER_DETAIL:${orderDetail.order_id} " jedis.sadd(redisOrderDetailKey, JSON .toJSONString(orderDetail, new SerializeConfig (true ))) jedis.expire(redisOrderDetailKey, 24 * 3600 ) } } } jedis.close() orderWides.iterator } )
写入es(1)建索引模板
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 PUT _template/gmall_order_wide_template{ "index_patterns" : ["gmall_order_wide*" ], "settings" : { "number_of_shards" : 3 }, "aliases" : { "{index}-query" : {}, "gmall_order_wide-query" :{} }, "mappings" : { "properties" : { "detail_id" : { "type" : "keyword" }, "order_id" : { "type" : "keyword" }, "sku_id" : { "type" : "keyword" }, "sku_num" : { "type" : "long" }, "sku_name" : { "type" : "text" , "analyzer" : "ik_max_word" }, "order_price" : { "type" : "float" }, "split_total_amount" : { "type" : "float" }, "split_activity_amount" : { "type" : "float" }, "split_coupon_amount" : { "type" : "float" }, "province_id" : { "type" : "keyword" }, "order_status" : { "type" : "keyword" }, "user_id" : { "type" : "keyword" }, "total_amount" : { "type" : "float" }, "activity_reduce_amount" : { "type" : "float" }, "coupon_reduce_amount" : { "type" : "float" }, "original_total_amount" : { "type" : "float" }, "feight_fee" : { "type" : "float" }, "feight_fee_reduce" : { "type" : "float" }, "expire_time" : { "type" : "date" , "format" : "yyyy-MM-dd HH:mm:ss" }, "refundable_time" : { "type" : "date" , "format" : "yyyy-MM-dd HH:mm:ss" }, "create_time" : { "type" : "date" , "format" : "yyyy-MM-dd HH:mm:ss" }, "operate_time" : { "type" : "date" , "format" : "yyyy-MM-dd HH:mm:ss" }, "create_date" : { "type" : "keyword" }, "create_hour" : { "type" : "keyword" }, "province_name" : { "type" : "keyword" }, "province_area_code" : { "type" : "keyword" }, "province_3166_2_code" : { "type" : "keyword" }, "province_iso_code" : { "type" : "keyword" }, "user_age" : { "type" : "long" }, "user_gender" : { "type" : "keyword" } } } }
(2)业务代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 orderWideDStream.foreachRDD( rdd => { rdd.foreachPartition( orderWideIter => { val orderWides: List [(String , OrderWide )] = orderWideIter.map( orderWide => (orderWide.detail_id.toString , orderWide)).toList if (orderWides.size > 0 ){ val head: (String , OrderWide ) = orderWides.head val date: String = head._2.create_date val indexName : String = s"gmall_order_wide_1018_$date " MyEsUtils .bulkSave(indexName , orderWides) } } ) MyOffsetUtils .saveOffset(orderInfoTopicName , orderInfoGroup , orderInfoOffsetRanges) MyOffsetUtils .saveOffset(orderDetailTopicName , orderDetailGroup ,orderDetailOffsetRanges) } )
ES 简介ElasticSearch 是一个基于 Lucene 的搜索服务器。它提供了一个分布式多用户能力的全文搜索引擎,基于 RESTful web 接口。Elasticsearch 是用 Java 开发的,并作为 Apache 许可条款下的开放源码发布,是当前流行的企业级搜索引擎。
使用场景为用户提供按关键字查询的全文搜索功能。 实现企业海量数据的处理分析的解决方案。大数据领域的重要一份子,如著名的ELK 框架(ElasticSearch,Logstash,Kibana)。 作为 OLAP 数据库,对数据进行统计分析。 与其他数据存储进行比较redis mysql es hbase Hadoop/hive 容量/容量扩展 低 中 较大 海量 海量 查询时效性 极高 较高(需要索引优化) 较高 较高(rowkey方式);较低(scan方式) 低 查询灵活性 最差,k-v模式 非常好,支持sql 较好,关联查询较弱,但是可以全文检索,DSL 语言可以处理过滤、匹配、排序、聚合等各种操作 较差,主要靠 rowkey,scan 的话性能不行,或者安装 phoenix 插件来实现 sql 及二级索引 非常好,支持sql 写入速度 极快 中等(同步写入) 较快(异写入) 较快(异步写入) 慢 一致性、事务 弱 强 弱 弱 弱
特点 天然的分布式数据库ES 把数据分成多个 shard(分片),下图中的 P0-P2,多个 shard 可以组成一份完整的数据,这些 shard 可以分布在集群中的各个机器节点中。随着数据的不断增加,集群可以增加多个分片,把多个分片放到多个机子上,已达到负载均衡,横向扩展。
天然索引之倒排索引ES 所有数据都是默认进行索引的 ,这点和 mysql 正好相反,mysql 是默认不加索引,要加索引必须特别说明,ES 只有不加索引才需要说明。而 ES 使用的是倒排索引和 Mysql 的 B+Tree 索引不同。
传统关系性数据库索引 保存数据的方式是 记录→单词:
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-EN52cNhI-1663766816465)(https://github.com/1345414527/Spark-real-time-data-warehouse/raw/master/assets/image-20220917204321011.png )]
弊端:对于传统的关系性数据库对于关键词的查询,只能逐字逐行的匹配,性能非常差。匹配方式不合理,比如搜索“小密手机” ,如果用 like 进行匹配, 根本匹配不到。但是考虑使用者的用户体验的话,除了完全匹配的记录,还应该显示一部分近似匹配的记录,至少应该匹配到“手机”。
**倒排索引:**全文搜索引擎目前主流的索引技术就是倒排索引的方式。倒排索引的保存数据的方式是 单词→记录:
基于分词技术构建倒排索引:
首先每个记录保存数据时,都不会直接存入数据库。系统先会对数据进行分词,然后以倒排索引结构保存。然后等到用户搜索的时候,会把搜索的关键词也进行分词,会把“红海行动”分词分成:红海和行动两个词。这样的话,先用红海进行匹配,得到 id 为 1 和 2 的记录编号,再用行动匹配可以迅速定位 id 为 1,3 的记录。
那么全文索引通常,还会根据匹配程度进行打分,显然 1 号记录能匹配的次数更多。所以显示的时候以评分进行排序的话,1 号记录会排到最前面。而 2、3 号记录也可以匹配到。
索引结构(1)B+ tree
(2)lucene 倒排索引结构
可以看到 lucene 为倒排索引(Term Dictionary)部分又增加一层 Term Index 结构,用于快速定位,而这 Term Index 是缓存在内存中的,但 mysql 的 B+tree 不在内存中,所以整体来看 ES 速度更快,但同时也更消耗资源(内存、磁盘)。
Term index → Term dictionary → posting list
天然索引之正排索引( Doc Value列式存储)倒排索引在搜索包含指定词条的文档时非常高效,但是在相反的操作时表现很差:查询一个文档中包含哪些词条。具体来说,倒排索引在搜索时最为高效,但在排序、聚合等与指定字段相关的操作时效率低下,需要用 doc_values。
在 Elasticsearch 中,Doc Values 就是一种列式存储结构,默认情况下每个字段的 Doc Values 都是激活的。
索引中某个字段的存储结构如下:
列式存储结构非常适合排序、聚合以及字段相关的脚本操作。而且这种存储方式便于压缩,尤其是数字类型。压缩后能够大大减少磁盘空间,提升访问速度。
lucene与Elasticsearch的关系咱们之前讲的处理分词,构建倒排索引,等等,都是这个叫 lucene 的做的。那么能不能说这个 lucene 就是搜索引擎呢?
还不能。lucene 只是一个提供全文搜索功能类库的核心工具包,而真正使用它还需要一个完善的服务框架搭建起来的应用。
好比 lucene 是类似于发动机,而搜索引擎软件(ES,Solr)就是汽车。
目前市面上流行的搜索引擎软件,主流的就两款,elasticsearch 和 solr,这两款都是基于lucene 的搭建的,可以独立部署启动的搜索引擎服务软件。由于内核相同,所以两者除了服务器安装、部署、管理、集群以外,对于数据的操作,修改、添加、保存、查询等等都十分类似。就好像都是支持 sql 语言的两种数据库软件。只要学会其中一个另一个很容易上手。
从实际企业使用情况来看,elasticSearch 的市场份额逐步在取代 solr,国内百度、京东、新浪都是基于 elasticSearch 实现的搜索功能。国外就更多了 像维基百科、GitHub、Stack Overflow 等等也都是基于 ES 的。
安装 修改操作系统参数因为默认 elasticsearch 是单机访问模式,就是只能自己访问自己。但是我们会设置成允许应用服务器通过网络方式访问,而且生产环境也是这种方式。这时,Elasticsearch 就会因为嫌弃单机版的低端默认配置而报错,甚至无法启动。所以我们在这里就要把服务器的一些限制打开,能支持更多并发。
(1)问题: max file descriptors [4096] for elasticsearch process likely too low, increase to at least [65536] elasticsearch
修改系统允许 Elasticsearch 打开的最大文件数需要修改成 65536。
1 2 3 4 5 vim /etc/security/limits.conf * soft nofile 65536 * hard nofile 131072 * soft nproc 2048 * hard nproc 65536
(2)问题:max virtual memory areas vm.max_map_count [65530] likely too low, increase to at least [262144]
修改一个进程可以拥有的虚拟内存区域的数量。
1 2 vim /etc/sysctl.conf vm.max_map_count=262144
(3)问题:max number of threads [1024] for user [judy2] likely too low, increase to at least [4096] (CentOS7.x 不用改)
修改允许最大线程数为 4096
1 2 vim /etc/security/limits.d/20-nproc.conf * soft nproc 4096
(4)重启 linux 使配置生效
安装es(1)上传安装包,将 elasticsearch 安装包上传到 opt/software/目录下
(2)将 ES 解压到/opt/module目录下
1 tar -zxvf elasticsearch-7.8.0.tar.gz -C /opt/module/
(3)在/opt/module目录下对ES重命名
1 mv elasticsearch-7.8.0 es7
(4)修改 ES 配置文件 elasticsearch.yml
1 2 cd config/vim elasticsearch.yml
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 (1)集群名称,同一集群名称必须相同 cluster.name: my-es (2)单个节点名称,不同节点名称不能一样,例如:node-1,node-2,node-3 node.name: node-1 (3)把 bootstrap 自检程序关掉 bootstrap.memory_lock: false (4)网络部分 network.host: 0.0.0.0 http.port: 9200 transport.tcp.port: 9301 (5)自发现配置:新节点向集群报到的主机名 discovery.seed_hosts: ["192.168.6.134:9301" ] cluster.initial_master_nodes: ["node-1" ] discovery.zen.fd.ping_timeout: 1m discovery.zen.fd.ping_retries: 5
上面的配置最好不要少,我把集群的少了但是出错了。最后全加上了。
(5)教学环境启动优化
ES 是用在 Java 虚拟机中运行的,虚拟机默认启动占用 1G 内存。但是如果是装在 PC 机学习用,实际用不了 1 个 G。所以可以改小一点内存;但生产环境一般 31G 内存是标配,这个时候需要将这个内存调大。
1 2 3 4 vim jvm.options -Xms512m -Xmx512m
碰到的问题1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 [2022 -09-17T09:43 :54 ,694 ][ERROR][o.e.b.ElasticsearchUncaughtExceptionHandler] [node-1 ] uncaught exception in thread [main] org.elasticsearch.bootstrap.StartupException: java.lang.RuntimeException: can not run elasticsearch as root at org.elasticsearch.bootstrap.Elasticsearch.init(Elasticsearch.java:174 ) ~[elasticsearch-7.8 .0 .jar:7.8 .0 ] at org.elasticsearch.bootstrap.Elasticsearch.execute(Elasticsearch.java:161 ) ~[elasticsearch-7.8 .0 .jar:7.8 .0 ] at org.elasticsearch.cli.EnvironmentAwareCommand.execute(EnvironmentAwareCommand.java:86 ) ~[elasticsearch-7.8 .0 .jar:7.8 .0 ] at org.elasticsearch.cli.Command.mainWithoutErrorHandling(Command.java:127 ) ~[elasticsearch-cli-7.8 .0 .jar:7.8 .0 ] at org.elasticsearch.cli.Command.main(Command.java:90 ) ~[elasticsearch-cli-7.8 .0 .jar:7.8 .0 ] at org.elasticsearch.bootstrap.Elasticsearch.main(Elasticsearch.java:126 ) ~[elasticsearch-7.8 .0 .jar:7.8 .0 ] at org.elasticsearch.bootstrap.Elasticsearch.main(Elasticsearch.java:92 ) ~[elasticsearch-7.8 .0 .jar:7.8 .0 ] Caused by: java.lang.RuntimeException: can not run elasticsearch as root at org.elasticsearch.bootstrap.Bootstrap.initializeNatives(Bootstrap.java:111 ) ~[elasticsearch-7.8 .0 .jar:7.8 .0 ] at org.elasticsearch.bootstrap.Bootstrap.setup(Bootstrap.java:178 ) ~[elasticsearch-7.8 .0 .jar:7.8 .0 ] at org.elasticsearch.bootstrap.Bootstrap.init(Bootstrap.java:393 ) ~[elasticsearch-7.8 .0 .jar:7.8 .0 ] at org.elasticsearch.bootstrap.Elasticsearch.init(Elasticsearch.java:170 ) ~[elasticsearch-7.8 .0 .jar:7.8 .0 ] ... 6 more uncaught exception in thread [main] java.lang.RuntimeException: can not run elasticsearch as root at org.elasticsearch.bootstrap.Bootstrap.initializeNatives(Bootstrap.java:111 ) at org.elasticsearch.bootstrap.Bootstrap.setup(Bootstrap.java:178 ) at org.elasticsearch.bootstrap.Bootstrap.init(Bootstrap.java:393 ) at org.elasticsearch.bootstrap.Elasticsearch.init(Elasticsearch.java:170 ) at org.elasticsearch.bootstrap.Elasticsearch.execute(Elasticsearch.java:161 ) at org.elasticsearch.cli.EnvironmentAwareCommand.execute(EnvironmentAwareCommand.java:86 ) at org.elasticsearch.cli.Command.mainWithoutErrorHandling(Command.java:127 ) at org.elasticsearch.cli.Command.main(Command.java:90 ) at org.elasticsearch.bootstrap.Elasticsearch.main(Elasticsearch.java:126 ) at org.elasticsearch.bootstrap.Elasticsearch.main(Elasticsearch.java:92 ) For complete error details, refer to the log at /root/es/elasticsearch-7.8 .0 /logs/my-es.log
这里是不能使用root用户:
1 2 3 4 adduser es passwd es chown -R es:es elasticsearch-7.8.0 chmod 777 elasticsearch-7.8.0
切换到es用户运行,又是报错:
1 错误: 找不到或无法加载主类 org.elasticsearch.tools.java_version_checker.JavaVersionChecker
此时可能是因为将该文件夹放到了/root文件夹下,移动到/opt/moudle下就可以了。
1 mv elasticsearch-7.8.0 /opt/moudle
启动测试在 hadoop102 上执行启动命令
查看进程
命令行进行测试
1 curl http://127.0.0.1:9200
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 { "name" : "node-1" , "cluster_name" : "my-es" , "cluster_uuid" : "JjV4RSntTKKRQnEwCu_45g" , "version" : { "number" : "7.8.0" , "build_flavor" : "default" , "build_type" : "tar" , "build_hash" : "757314695644ea9a1dc2fecd26d1a43856725e65" , "build_date" : "2020-06-14T19:35:50.234439Z" , "build_snapshot" : false , "lucene_version" : "8.5.1" , "minimum_wire_compatibility_version" : "6.8.0" , "minimum_index_compatibility_version" : "6.0.0-beta1" }, "tagline" : "You Know, for Search" }
浏览器进行测试
在浏览器地址栏中输入: http://192.168.6.134:9200 进行访问
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 // 20220917224759 // http://192.168.6.134:9200/ { "name" : "node-1" , "cluster_name" : "my-es" , "cluster_uuid" : "JjV4RSntTKKRQnEwCu_45g" , "version" : { "number" : "7.8.0" , "build_flavor" : "default" , "build_type" : "tar" , "build_hash" : "757314695644ea9a1dc2fecd26d1a43856725e65" , "build_date" : "2020-06-14T19:35:50.234439Z" , "build_snapshot" : false , "lucene_version" : "8.5.1" , "minimum_wire_compatibility_version" : "6.8.0" , "minimum_index_compatibility_version" : "6.0.0-beta1" }, "tagline" : "You Know, for Search" }
Kibana 简介Elasticsearch 提供了一套全面和强大的 REST API,我们可以通过这套 API 与 ES 集群进行交互。
例如:我们可以通过 API: GET /_cat/nodes?v获取ES集群节点情况,要想访问这个API,我们需要使用 curl 命令工具来访问 Elasticsearch 服务:curl http://hdp1:9200/_cat/nodes?v ,当然也可以使用任何其他 HTTP/REST 调试工具,例如浏览器、POSTMAN 等。
Kibana 是为 Elasticsearch 设计的开源分析和可视化平台。你可以使用 Kibana 来搜索,查看存储在 Elasticsearch 索引中的数据并与之交互。你可以很容易实现高级的数据分析和可视化,以图表的形式展现出来。
安装kibana版本下载:https://www.elastic.co/cn/downloads/kibana(mac的虚拟机请选择Linux aarch64)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 1. 上传安装包将 Kibana 安装包上传到服务器,并解压 tar -zxvf kibana-7.8.0-linux-x86_64.tar.gz -C /opt/module/ 2. 重命名 mv kibana-7.8.0-linux-x86_64/ kibana7 3. 修改配置 cd config/vim kibana.yml 4. 授权远程访问 server.host: "0.0.0.0" 5. 指定 ElasticSearch 地址(可以指定多个,多个地之间用逗号分隔) elasticsearch.hosts: ["http://127.0.0.1:9200" ]
启动测试启动 Kinana
root用户没法启动。
查看进程
1 netstat -nltp | grep 5601
浏览器访问
浏览器访问 http://192.168.6.134:5601/
出了一些问题,懒得找了,直接docker安装。
集群脚本1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 # !/bin/bash es_home=/opt/module/es7 kibana_home=/opt/module/kibana7 if [ $# -lt 1 ] then echo "USAGE:es.sh {start|stop}" exit fi case $1 in "start") #启动 ES for i in hadoop102 hadoop103 hadoop104 do ssh $i "source /etc/profile;nohup ${es_home}/bin/elasticsearch >/dev/null 2>&1 &" done #启动 Kibana nohup ${kibana_home}/bin/kibana > ${kibana_home}/logs/kibana.log 2>&1 & ;; "stop") #停止 Kibana sudo netstat -nltp | grep 5601 |awk '{print $7}' | awk -F / '{print $1}'| xargs kill # 停止 ES for i in hadoop102 hadoop103 hadoop104 do ssh $i "ps -ef|grep $es_home |grep -v grep|awk '{print \$2}'|xargs kill" >/dev/null 2>&1 done ;; *) echo "USAGE:es.sh {start|stop}" exit ;; esac
ES介绍看这篇博客吧:👉 https://blog.csdn.net/qq_44766883/article/details/126973974
SpringBoot可视化这部分太简单了,这里我就看了下mapper的逻辑。具体的kibana使用和完整说明可以看pdf文件。
日活DSL:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 查询总数: GET gmall_dau_info_2022-01-14/_search { "_source" : "" , "size" : 0 } 查询分时: GET gmall_dau_info_2022-01-14/_search { "size" : 0 , "aggs" : { "groupby_hr" : { "terms" : { "field" : "hr" , "size" : 24 } } } }
代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 @Override public Map <String , Object > searchDau(String td) { Map <String ,Object > dauResults = new HashMap <>(); Long dauTotal = searchDauTotal(td); dauResults.put("dauTotal" ,dauTotal) ; Map <String , Long > dauTd = searchDauHr(td); dauResults.put("dauTd" , dauTd); LocalDate tdLd = LocalDate .parse(td); LocalDate ydLd = tdLd.minusDays(1 ); Map <String , Long > dauYd = searchDauHr(ydLd.toString()); dauResults.put("dauYd" , dauYd); return dauResults; } public Map <String ,Long > searchDauHr(String td ){ HashMap <String , Long > dauHr = new HashMap <>(); String indexName = dauIndexNamePrefix + td ; SearchRequest searchRequest = new SearchRequest (indexName); SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder (); searchSourceBuilder.size(0 ); TermsAggregationBuilder termsAggregationBuilder = AggregationBuilders .terms("groupbyhr" ).field("hr" ).size(24 ); searchSourceBuilder.aggregation(termsAggregationBuilder); searchRequest.source(searchSourceBuilder); try { SearchResponse searchResponse = esClient.search(searchRequest, RequestOptions .DEFAULT ); Aggregations aggregations = searchResponse.getAggregations(); ParsedTerms parsedTerms = aggregations.get("groupbyhr" ); List <? extends Terms .Bucket > buckets = parsedTerms.getBuckets(); for (Terms .Bucket bucket : buckets) { String hr = bucket.getKeyAsString(); long hrTotal = bucket.getDocCount(); dauHr.put(hr, hrTotal); } return dauHr ; } catch (ElasticsearchStatusException ese){ if (ese.status() == RestStatus .NOT_FOUND ){ log.warn( indexName +" 不存在......" ); } } catch (IOException e) { e.printStackTrace(); throw new RuntimeException ("查询ES失败...." ); } return dauHr ; } public Long searchDauTotal(String td ){ String indexName = dauIndexNamePrefix + td ; SearchRequest searchRequest = new SearchRequest (indexName); SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder (); searchSourceBuilder.size(0 ); searchRequest.source(searchSourceBuilder); try { SearchResponse searchResponse = esClient.search(searchRequest, RequestOptions .DEFAULT ); long dauTotals = searchResponse.getHits().getTotalHits().value; return dauTotals ; }catch (ElasticsearchStatusException ese){ if (ese.status() == RestStatus .NOT_FOUND ){ log.warn( indexName +" 不存在......" ); } } catch (IOException e) { e.printStackTrace(); throw new RuntimeException ("查询ES失败...." ); } return 0 L; }
交易分析业务处理以年龄为例,DSL:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 GET gmall_order_wide_2022-01-14/_search { "query" : { "match" : { "sku_name" :{ "query" : "小米手机" , "operator" : "and" } } } , "aggs" : { "groupby_gender" : { "terms" : { "field" : "user_gender" , "size" : 2 } , "aggs" : { "sum_amount" : { "sum" : { "field" : "split_total_amount" } } } } } , "size" : 0 }
代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 @Override public List <NameValue > searchStatsByItem(String itemName, String date, String field) { ArrayList <NameValue > results = new ArrayList <>(); String indexName = orderIndexNamePrefix + date ; SearchRequest searchRequest = new SearchRequest (indexName); SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder (); searchSourceBuilder.size(0 ); MatchQueryBuilder matchQueryBuilder = QueryBuilders .matchQuery("sku_name" , itemName).operator(Operator .AND ); searchSourceBuilder.query(matchQueryBuilder); TermsAggregationBuilder termsAggregationBuilder = AggregationBuilders .terms("groupby" + field).field(field).size(100 ); SumAggregationBuilder sumAggregationBuilder = AggregationBuilders .sum("totalamount" ).field("split_total_amount" ); termsAggregationBuilder.subAggregation(sumAggregationBuilder); searchSourceBuilder.aggregation(termsAggregationBuilder); searchRequest.source(searchSourceBuilder); try { SearchResponse searchResponse = esClient.search(searchRequest, RequestOptions .DEFAULT ); Aggregations aggregations = searchResponse.getAggregations(); ParsedTerms parsedTerms = aggregations.get("groupby" + field); List <? extends Terms .Bucket > buckets = parsedTerms.getBuckets(); for (Terms .Bucket bucket : buckets) { String key = bucket.getKeyAsString(); Aggregations bucketAggregations = bucket.getAggregations(); ParsedSum parsedSum = bucketAggregations.get("totalamount" ); double totalamount = parsedSum.getValue(); results.add(new NameValue (key ,totalamount)); } return results ; } catch (ElasticsearchStatusException ese){ if (ese.status() == RestStatus .NOT_FOUND ){ log.warn( indexName +" 不存在......" ); } } catch (IOException e) { e.printStackTrace(); throw new RuntimeException ("查询ES失败...." ); } return results; }
交易分析明细DSL:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 GET gmall_order_wide_2022-01-14/_search { "query" : { "match" : { "sku_name" : { "query" : "小米手机" , "operator" : "and" } } } , "size" : 20 , "from" : 0 , "_source" : ["create_time" ,"order_price" ,"province_name" ,"sku_name" ,"sku_num" ,"total_amount" ,"user_age" ,"user_gender" ], "highlight" : { "fields" : {"sku_name" : {}} } }
代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 @Override public Map <String , Object > searchDetailByItem(String date, String itemName, Integer from, Integer pageSize) { HashMap <String , Object > results = new HashMap <>(); String indexName = orderIndexNamePrefix + date ; SearchRequest searchRequest = new SearchRequest (indexName); SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder (); searchSourceBuilder.fetchSource(new String []{"create_time" , "order_price" , "province_name" , "sku_name" , "sku_num" , "total_amount" ,"user_age" ,"user_gender" }, null ); MatchQueryBuilder matchQueryBuilder = QueryBuilders .matchQuery("sku_name" , itemName).operator(Operator .AND ); searchSourceBuilder.query(matchQueryBuilder); searchSourceBuilder.from(from); searchSourceBuilder.size(pageSize); HighlightBuilder highlightBuilder = new HighlightBuilder (); highlightBuilder.field("sku_name" ); searchSourceBuilder.highlighter(highlightBuilder); searchRequest.source(searchSourceBuilder); try { SearchResponse searchResponse = esClient.search(searchRequest, RequestOptions .DEFAULT ); long total = searchResponse.getHits().getTotalHits().value; SearchHit [] searchHits = searchResponse.getHits().getHits(); ArrayList <Map <String , Object >> sourceMaps = new ArrayList <>(); for (SearchHit searchHit : searchHits) { Map <String , Object > sourceMap = searchHit.getSourceAsMap(); Map <String , HighlightField > highlightFields = searchHit.getHighlightFields(); HighlightField highlightField = highlightFields.get("sku_name" ); Text [] fragments = highlightField.getFragments(); String highLightSkuName = fragments[0 ].toString(); sourceMap.put("sku_name" ,highLightSkuName ) ; sourceMaps.add(sourceMap); } results.put("total" ,total ); results.put("detail" , sourceMaps); return results ; } catch (ElasticsearchStatusException ese){ if (ese.status() == RestStatus .NOT_FOUND ){ log.warn( indexName +" 不存在......" ); } } catch (IOException e) { e.printStackTrace(); throw new RuntimeException ("查询ES失败...." ); } return results; }