详情开发使用介绍
DataBand(数据帮),快速采集清洗,任务管理,实时流和批处理数据分析,数据可视化展现,快速数据模板开发,ETL工具集、数据科学等。是轻量级的一站式的大数据平台。 我们致力于通过提供智能应用程序、数据分析和咨询服务来提供最优解决方案。
软件架构
技术栈
存储
- 分布式存储:HDFS、HBase
- 行式关系存储:MySQL、Oracle
- 列式存储:ClickHouse
- 列族存储:HBase、Cassandra
- 文档库:ElasticSearch、MongoDB
计算
- 计算引擎:Presto、Hive
- 流处理:Storm、Flink
集成:
- Flume
- Filebeat
- Logstash
前端技术栈
- Vue
- Element UI
后端技术栈
- Spring Boot
- Spring Cloud
- MyBatis
工程说明
大数据模拟数据源生成数据(数据准备工程)
数据源:
- databand-mock-api:接口模拟工具,模拟业务系统api;
- databand-mock-log:日志模拟工具,手动产生大量的日志数据供调试测试,比如Syslog、log、CSV生成、Json、MySQL注入、RPC写、NetCat等;
- databand-mock-mq:日志模拟工具,通过MQ写产生大量的日志数据供调试测试,比如RabbitMQ写、Kafka写等;
- databand-mock-hadoop:大数据日志模拟工具,hdfs和mapreduce相关;
数据采集清洗(采集清洗工程)
- databand-etl-mysql_ods:采集清洗mysql数据比如MySQL到ods临时中间库(包括Redis、Kafka等);
- databand-etl-mysql_olap:采集清洗mysql数据到OLAP数据仓库;
- databand-etl-mysql_hadoop:采集清洗mysql数据到Hadoop分布式存储;
- databand-etl-logfile_ods:采集清洗半结构化日志文件,比如json、xml、log、csv文件数据到ods临时中间库;
- databand-etl-logfile_olap:采集清洗半结构化日志文件数据到OLAP数据仓库;
- databand-etl-logfile_hadoop:采集清洗日志文件数据到Hadoop分布式存储;
- databand-etl-mq_ods:通过MQ消费采集数据,入ods库;
- databand-etl-mq_olap:通过MQ消费采集数据,入OLAP库;
- databand-etl-mq_hadoop:通过MQ消费采集数据,入Hadoop;- databand-ml:数据科学工程;
数据分析作业(定时作业调度工程)
- databand-job-springboot:定时任务作业调度服务,支持shell,hive,python,spark-sql,java jar任务。
- databand-streamjob-springboot:流数据作业,支持kafka数据消费至clickhouse、mysql、es等。
数据分析门户(后端管理和前端展示工程)
- databand-admin-ui:前后端分离的纯前端UI工程,数据展现(目前未开发);
- databand-admin-thymeleaf:后端权限、关系、站点配置管理(前后端不分离,正在开发的版本),基于若依框架;
- databand-admin-api:数据api服务;
- databand-admin-tools:BI工具集;
实时流数据(2021年-9月更新)
- databand-rt-flinkstreaming:flink实时数据流处理。主要是PV、UV,涉及窗口、聚合、延时、水印、统计、checkpoint等基本用法;
- databand-rt-redis:实时处理的一些缓存存储;
- databand-rt-sparkstreaming:spark实时数据流处理,和flink的功能近似,主要structured streaming;
愿景目标
3年愿景目标
工程细节说明
databand-mock-api (模拟数据源API工程) API模拟工具
- App.java:一个简单的mock控制台程序
api mock详情介绍
api mock工程源码
databand-mock-log (模拟数据源生成日志数据工程) 日志模拟工具
目前是简单的控制台小程序,直接运行main即可。
- CsvMock.java:csv文件生成,运行后在"FILE_PATH"定义的文件夹中可找到csv文件:
- LogMock.java:log文件生成,生成路径见配置文件:logback.xml。 win下默认“c:/logs/”,linux 或 mac下路径请自行修改:
- JsonMock.java:json文件生成,在"FILE_PATH"定义的文件夹中可找到json文件:
- XmlMock.java:xml文件生成,在"FILE_PATH"定义的文件夹中可找到json文件:
- RpcMock.java:rpc输出,运行后可以用flume(或filebeat)进行测试,配置文件见:/flumeConf/avro-memory-log.properties:运行脚本: flume-ng agent --conf conf --conf-file /usr/app/apache-flume-1.8.0-bin/avro-memory-log.properties --name a2 -Dflume.root.logger=INFO,console
- SyslogMock.java:syslog(udp)输出,运行后可以用flume(或filebeat)进行测试,配置文件见:/flumeConf/syslog-log.properties:
- TcpMock.java:Tcp输出,运行后可以用flume进行测试,配置文件见:/flumeConf/syslog-log.properties:
- MySQLMock.java:mysql数据生成,通过list键值对形式对数据表进行写操作。
log mock工程源码
databand-mock-mq (模拟数据源生成日志数据工程) MQ消息模拟生成工具
目前是简单的控制台小程序,直接运行main即可。
- KafkaProducer.java:Kafka消息生成:
- KafkaConsumer.java:Kafka消息消费:
- RabbitMQProducer.java:RabbitMQ消息生成:
- RabbitMQConsumer.java:RabbitMQ消息消费:
mq mock工程源码
数据源日志
类型分为:
- CSV日志,用于批处理,采用UTF-8字符集,每行( )表示一条记录,每条记录中各个字段的值使用双引号括起来,并使用逗号(,)分隔;
- Kafka 日志,用于流处理,生产者策略性的产生一些有偏移属性的带日期时间数据。
业务:
- a)产品销售日志,采用CSV格式;
- b)节目播出日志,采用CSV格式;
- c)搜索热词日志,采用kafka;
- d)广告播放日志,采用kafka;
数据定义,批处理类型日志,原始数据源为csv,暂时以这两个业务作为批处理数据演示,实际上平台将是与业务无关的,只关注数据流和数据服务。
一、产品销售csv日志: 处理类:org.databandtech.logmock.ProductSalesCSVLog
- 1 产品id productId
- 2 产品分类id categoryId
- 3 型号规格 modelId
- 4 颜色 color
- 5 买家id userId
- 6 购买日期 saleDatetime
- 7 购买数量 buyCount
- 8 购买金额 buyTotle
- 9 折扣金额 buyDiscount
- 10 城市 cityCode
- 11 地址 address
二、节目播出csv日志 处理类:org.databandtech.logmock.ShowsCSVLog
- 1 用户id userId
- 2 状态类型码 status
- 3 城市 cityCode
- 4 区县 areaCode
- 5 收视开始时间 beginTime
- 6 收视结束时间 endTime
- 7 节目ID showId
- 8 栏目ID columnId
- 9 频道ID channelId
- 10 高清标志码 hd
- 11 节目类型码 showType
状态类型码:
- 1:"tv_playing"、2:"vod_playing"、3:"browsing"、4:"tvod_playing"、5:"ad_event" 、6:"external_link"、7:"order"
高清标志码:
- 0:标清、1:高清、2:智能、3:其他
节目类型码:
- 电视剧:tv、电影:movie、综艺:variety、其他:other
流类型日志,原始数据源为kafka,暂时以这两个业务作为流数据演示,实际上平台将是与业务无关的,只关注数据流和数据服务。
三、搜索热词日志: 处理类:org.databandtech.mockmq.HotWordKafkaLog
Kafka Topic: HOTWORDS
- 1 KEYWORD 热词
- 2 USERID 用户id
- 3 TS 搜索时间
四、广告监测日志 处理类:org.databandtech.mockmq.AdKafkaLog
Kafka Topic: ADMONITOR
- 1 OS 设备的操作系统类型
- 2 UID 用户id
- 3 MAC1 MAC地址
- 4 MACCN 当前联网形式
- 5 IP IP
- 6 ROVINCECODE 所属省份代码
- 7 CITYCODE 所属城市代码
- 8 AREACODE 所属区县代码
- 9 TS 客户端触发的时间
- 10 ADMID 广告素材
- 11 ADID 广告主
- 12 APPNAME 应用名称
分布式存储-原始记录备份
从CSV日志生成的数据源需要做原始文档的备份存储,使用HDFS,而kafka流数据则依据具体情况选择是否存入HDFS或者HIVE,还是直接清洗后,存入ClickHouse等。
将CSV日志原始存档进HDFS的方式:
- 1、直接Put文件目录进hdfs文件系统;
- 2、使用Flume的spooling-to-hdfs,使用方法见databand-etl-flume中的spooling-memory-hdfs2.properties
- 3、使用databand-job-springboot定时任务,类型为HdfsBackupJob。
将kafka存进HDFS的方式:
- 1、使用Flume的kafka-to-hdfs,使用方法见databand-etl-flume中的kafka-flume-hdfs.properties;
- 2、使用Flink或者Storm导入,例子见databand-etl-storm、databand-etl-flink;
- 3、使用kafka的客户端库和hdfs客户端库,自行开发。
分布式存储-数据仓库存档
产品表外部表,建表语句
CREATE EXTERNAL TABLE product(address STRING,buycount INT,buydiscount INT,buytotle INT,categoryid STRING,citycode STRING,color STRING,modelid STRING,productid STRING,saledatetime STRING,userid STRING ) ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
LINES TERMINATED BY '
' STORED AS TEXTFILE LOCATION '/home/product';
节目表外部表,建表语句
CREATE EXTERNAL TABLE show(areacode STRING,channelid STRING,citycode STRING,columnid STRING,hd INT,showdatetime STRING,showduration INT,showid STRING,status STRING,userid STRING ) ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
LINES TERMINATED BY '
' STORED AS TEXTFILE LOCATION '/home/show';
– 可以用load data引入数据,覆盖引入: – LOAD DATA LOCAL INPATH '/home/product/2020-12-20.csv' – OVERWRITE INTO TABLE product;
– HDFS 文件 – hive> LOAD DATA INPATH '/home/product/2020-12-20.csv' – > OVERWRITE INTO TABLE product;
– 本地文件 – hive> LOAD DATA LOCAL INPATH '/home/product/2020-12-20.csv' – > OVERWRITE INTO TABLE product;
Count计数语句
- 计算累计订单数:select count(1) from product;
- 计算地区为广州的订单数:select count(1) from product where cityCode="广州";
- 计算节目数:select count(1) from show;
- 计算日志为破茧的记录数:select count(1) from show where showid="破茧";
- 计算2020-12月的全部DELL电脑订单金额:select sum(buytotle) from product where modelid="DELL" and instr(saledatetime,"2020-12")>0;
可以测试一下hive输出结果:
分析规划 - 统计指标规划
产品销售日志 统计规划
X轴维度 - key
- 时间维度: 年、季、月、周;
- 产品分类维度:按产品类型,如电视、PC;
- 按产品型号规格维度;
- 按城市分组维度;
- 按购买者维度;
Y轴维度 - value
- 订单数
- 订单金额
指标:
- 产品各分类订单数,product_order_count_by_cate,按年、季、月、周、天;
- 产品各型号规格订单数,product_order_count_by_model,按年、季、月、天;
- 各城市分布订单数,product_order_count_by_city,按年、月;
- top20订购者订单数,product_order_count20_by_user,按年、月;
- 产品各分类订单金额,product_order_amount_by_cate,按年、季、月、周、天;
- 产品各型号规格订单金额,product_order_amount_by_model,按年、季、月、天;
- 各城市分布订单金额,product_order_amount_by_city,按年、月;
- top20订购者订单金额,product_order_amount20_by_user,按年、月;
节目播出日志 统计规划
X轴维度 - key
- 时间维度: 年、季、月、周;
- 城市维度
- 频道维度
- 节目维度
- 用户维度
Y轴维度 - value
- 播放时长
- 播放次数
指标:
- 按城市分组播放时长,show_dration_by_city,按年、季、月、周、天;
- 按频道分组播放时长,show_dration_by_channel,按年、季、月;
- 按节目top20播放时长,show_dration20_by_show,按年、月;
- 按用户top20播放时长,show_dration20_by_user,按年、月;
- 按城市分组播放次数,show_times_by_city,按年、季、月、周、天;
- 按频道分组播放次数,show_times_by_channel,按年、季、月;
- 按节目top20播放次数,show_times20_by_show,按年、月;
- 按用户top20播放次数,show_times20_by_user,按年、月;
搜索热词日志 统计规划
待完成
广告监测日志 统计规划
待完成
批处理统计分析
产品销售日志 批处理统计分析计算
产品各分类订单数(按天),hive sql:
- select categoryid,saledatetime,sum(buycount) from product group by categoryid,saledatetime order by saledatetime;
产品各分类订单数(按天,指定某天,用于增量定时任务导出统计)
- select categoryid,saledatetime,sum(buycount) from product group by categoryid,saledatetime having saledatetime="2020-12-30" order by saledatetime ;
其他分析查询SQL略,按天统计的数据都有了,按周、月、季、年就以此聚合。
导出结果到本地文件,相同记录则覆盖
use default;
-- Save to [LOCAL]
INSERT OVERWRITE LOCAL DIRECTORY '/home/product_order_count_by_cate'
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
STORED AS TEXTFILE
-- SQL
select categoryid,saledatetime,sum(buycount) from product group by categoryid,saledatetime order by saledatetime;
导出结果到HDFS,相同记录则覆盖
use default;
-- Save to HDFS
INSERT OVERWRITE DIRECTORY '/home/product_order_count_by_cate'
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
STORED AS TEXTFILE
-- SQL
select categoryid,saledatetime,sum(buycount) from product group by categoryid,saledatetime order by saledatetime;
执行完之后可以查看hdfs的记录是否已经保存
- hadoop fs -tail /home/product_order_count_by_cate/000000_0
节目播出日志 批处理统计分析计算
略
批处理定时任务
databand-job-springboot:定时任务作业调度服务,支持Shell,Hadoop MR,HiveSQL,Python,Spark,Flink,JavaJar任务。
- 注入见TaskConfig的方法scheduledTaskJobMap() 的例子,目前仅提供java注入,未来有数据库加载注入和配置文件注入
- 命令行任务,CommandExecuteJob的实例,
- 原始记录备份(从本地),从数据源中备份原始数据到HDFS,HdfsBackupJob;
- 原始记录备份(到本地),从HDFS数据源中备份原始数据到本地文件,HdfsToLocalFileJob;
- Hive SQL任务,HiveSqlQueryJob,hive执行DQL查询任务,需要返回数据集,并对数据集进行分析数据库存储,存储的数据用于报表图表等展现,必须实现SavableTaskJob接口;
- Hive SQL任务,HiveSqlExecuteJob,hive执行脚本任务,用于DDL、DML操作,比如load data等;
- 统计分析计算,Hadoop中运行MR,执行处理,HadoopMRJob;
- 更多任务类型,不一一列出。
其中每种类型都有针对各个统计指标的实例:JobInstances 和 JobType是多对一的关系。
运行方式:
- 1、先导入数据:databand_scheduletask.sql
- 2、查看任务: http://localhost:8081/getAllSchedule
3、启动单一任务,目前还没有统一的管理界面,未来会开发完善:
- http://localhost:8081/start?jobcode=WindowsDir1
- http://localhost:8081/start?jobcode=WindowsIP1
- http://localhost:8081/start?jobcode=hdfs_product2020
- http://localhost:8081/start?jobcode=hdfs_toLocal2020
- http://localhost:8081/start?jobcode=hdfs_toLocal2020_1
流数据任务
databand-streamjob-springboot:流数据持久化任务