day12_调度和可视化
- 游戏开发
- 2025-09-02 13:39:02

文章目录 day12_调度和可视化一、任务调度1、开启进程2、登入UI界面3、配置租户4、创建项目5、创建工作流5.1 HiveSQL部署(掌握)5.2 SparkDSL部署(掌握)5.3 SparkSQL部署(熟悉)5.4 SeaTunnel部署(掌握)5.5 任务上线 二、数据可视化1、用户画像管理系统(了解)2、使用Doris分析ElasticSearch2.1 创建Catalog2.2 使用示例 3、FineBI可视化3.1 安装FineBI3.2 创建数据连接3.3 创建公共数据(掌握)3.3.1 Doris数据集3.3.2 ElasticSearch数据集3.3.3 ElasticSearch和MySQL标签关联的数据集3.3.4 数据集更新 3.4 创建分析主题3.5 创建各个组件3.6 创建仪表盘3.7 图表发布 day12_调度和可视化 一、任务调度 1、开启进程
按顺序启动如下服务。注意:如果已经启动了,不用再次启动,其他的例如ElasticSearch、Doris、Hive等不要停。
启动Zookeeper nohup /export/server/kafka/bin/zookeeper-server-start.sh ../config/zookeeper.sql > /dev/null 2>&1 & 注意: 这是一条命令 启动Hadoop start-all.sh 启动Hive 启动metastore服务 nohup /export/server/hive/bin/hive --service metastore > /tmp/hive-metastore.log 2>&1 & 启动Hiveserver2服务 nohup /export/server/hive/bin/hive --service hiveserver2 > /tmp/hive-hiveserver2.log 2>&1 & 启动DS海豚调度器 /export/server/dolphinscheduler/bin/start-all.sh 如需写出到Doris,则还需要启动Doris;如需写出到ES,则还需启动ES 启动Doris /export/server/doris/fe/bin/start_fe.sh --daemon /export/server/doris/be/bin/start_be.sh --daemon /export/server/doris/apache_hdfs_broker/bin/start_broker.sh --daemon ---------------------------------------------------------- 启动ES 1- 切换用户 su es 2- 进入目录 cd /home/es/elasticsearch-7.10.2/bin 3- 启动 elasticsearch -d 4- 切回为root exit 5- 验证 jps ---------------------------------------------------------- 启动Kafka cd /export/server/kafka/bin nohup ./kafka-server-start.sh ../config/server.sql 2>&1 & 2、登入UI界面http://192.168.88.166:12345/dolphinscheduler/ui/login
账号:admin
密码:dolphinscheduler123
3、配置租户租户对应的是 Linux 的用户,用于 worker 提交作业所使用的用户。注意一定要是root。
然后设置admin的租户为root。
4、创建项目项目管理->创建项目->输入名称即可
5、创建工作流 5.1 HiveSQL部署(掌握)演示用的HiveSQL语句
insert overwrite table dwm.dwm_mem_first_buy_i partition(dt) select t.zt_id, t.trade_date_time, t.trade_date, t.week_trade_date, t.month_trade_date, t.store_no, t.sale_amount, t.order_no, t.source_type, '2025-03-01' as dt from (select zt_id, create_time as trade_date_time, trade_date, week_trade_date, month_trade_date, store_no, real_paid_amount as sale_amount, order_no, source_type, row_number() over(partition by zt_id order by create_time) as rn from dwm.dwm_sell_o2o_order_i where dt<'${inputdate}' and member_type = 1) t left join dwm.dwm_mem_first_buy_i f on t.zt_id=f.zt_id and f.dt < '${inputdate}' where t.rn=1 and f.zt_id is null配置过程如下:
创建数据源:点击数据源中心,然后点击创建数据源
填写数据源信息
数据源:选择hive/impala
数据源名称: hive
主机名:up01
端口:10000
用户名:root
数据库名:dwm
配置完成后点击测试连接,没问题后点击提交。
创建工作流:点击工作流定义,然后点击创建工作流。拖拽一个SQL类型到面板,然后填写节点设置。
节点名称为表名
设置失败重试次数为3次
设置延时告警
设置数据源类型为HIVE,数据源实例选择hive
SQL类型选择非查询
SQL语句填写计算的语句,注意不要带上;
如果使用到UDF函数,可以先注册后进行选择使用
可以在任务中自定义参数,也可以在工作流中进行整体设置。
如果有前置SQL语句或后置SQL语句,可以添加在对应的地方,这里可以把set语句写在前置SQL中
set hive.exec.dynamic.partition.mode=nonstrict;
在工作中,需要配置前置任务,这里的前置任务应该是 dwm_sell_o2o_order_i 。
保存工作流:点击右上角保存,填写基本信息工作流名称填写具体的主题或需求名称
租户选择root
执行策略选择并行
全局变量设置key为inputdate,value为$[yyyy-MM-dd-12]。注意使用的是中括号。
测试:在工作流定义页面,选择member工作流,点击上线,然后点击运行,进行测试。 结果验证 DS调度前后执行如下的SQL,核对数据条数是否发生变化 select count(1) from dwm.dwm_mem_first_buy_i; 5.2 SparkDSL部署(掌握) 将程序部署到Yarn上,需要注意如下两点: - 注意1:需要将代码中的setMaster内容改为yarn,或者直接删除。推荐直接删除 - 注意2:在程序代码中与文件路径有关的地方,需要全部都改为分布式文件系统,例如:HDFS。否则会出现找不到文件的异常 本次项目中,也就是需要修改基类中的master("local[*]"),这里推荐删除。具体代码如下:创建工作流,以及保存工作流、测试工作流的方式都一样,最大的区别是任务的类型及配置。
首先需要将整体的代码打成zip包,因为计算标签的代码中使用到了相关的代码。这里只将tags文件夹打成zip包即可。压缩包的名称不要有中文。
将zip包上传到资源中心
上传标签计算代码将标签计算的代码,也就是每个四级标签对应的py文件上传到资源中心。可以按照在PyCharm中的文件结构组织文件。
比如创建tags文件夹,在tags文件夹下创建match文件夹,在match文件夹中上传匹配类标签。
创建计算节点新建一个工作流,在其中拖拽一个SPARK类型。然后填写节点设置。
选项参数如下:
--conf "spark.pyspark.driver.python=/root/anaconda3/bin/python3" \ --conf "spark.pyspark.python=/root/anaconda3/bin/python3" \ --py-files tags.zip 注意: --py-files tags.zip要改成你自己的压缩包在资源处选择打好并上传的zip包
注意:Spark版本一定要选择SPARK1
保存工作流:使用相同的方法,配置其他几个标签计算的节点,配置完成后,点击保存。填写工作流,选择root租户,点击确定。 上线测试:在工作流定义页面,选择match工作流,点击上线,然后点击运行,进行测试。**拓展:**也可以直接使用spark-submit命令来提交作业,假设将tags.zip上传到了hdfs的/tags路径,而age_tag.py也上传到了/tags路径,则命令如下:
cd /export/server/spark bin/spark-submit --master yarn \ --deploy-mode client \ --driver-cores 1 --driver-memory 512M --num-executors 2 --executor-cores 2 --executor-memory 2G \ --name age_tags \ --conf "spark.pyspark.driver.python=/root/anaconda3/bin/python3" \ --conf "spark.pyspark.python=/root/anaconda3/bin/python3" \ --py-files hdfs://up01:8020/tags/tags.zip \ hdfs://up01:8020/tags/age_tag.py 5.3 SparkSQL部署(熟悉)整体与HiveSQL部署类似。唯一区别是程序类型选择SQL,Spark版本选择SPARK1。
演示用的SQL:
insert overwrite table ads.ads_mem_user_rfm_i select mem.zt_id, if(t.zt_id is null, '121', t.tags_id) as tags_id, -- 121理解为默认标签值 '2025-03-01' as dt from dwd.dwd_mem_member_union_i mem left join ( select zt_id, case when r = 1 and f = 1 and m = 1 then '114' when r = 1 and f = 0 and m = 1 then '115' when r = 0 and f = 1 and m = 1 then '116' when r = 0 and f = 0 and m = 1 then '117' when r = 1 and f = 1 and m = 0 then '118' when r = 1 and f = 0 and m = 0 then '119' when r = 0 and f = 1 and m = 0 then '120' when r = 0 and f = 0 and m = 0 then '121' end as tags_id from ( select zt_id, if(min(r) over (partition by zt_id) < 7, 1, 0) as r, -- 取最小的时间差作为r,如果小于7则为1 if(f > sum_f * 1.00 / user_count, 1, 0) as f, -- 与平均值对比,如果大于平均值则为1 if(m > sum_m * 1.00 / user_count, 1, 0) as m, -- 与平均值对比,如果大于平均值则为1 row_number() over (partition by zt_id order by dt desc ) as rn -- 按照日期进行逆序排列 from ( -- 这里就算出了每个用户的R、F、M的值是多少 select zt_id, datediff(current_date(), '2025-01-19') as r, -- 距离当天的时间差 sum(consume_times) over (partition by zt_id) as f, -- 单个用户完单单量 sum(consume_times) over () as sum_f, -- 所有用户完单单量 sum(consume_amount) over (partition by zt_id) as m, -- 单个用户完单金额 sum(consume_amount) over () as sum_m, -- 所有用户完单金额 t2.user_count, -- 总用户数 t1.dt from dwm.dwm_mem_member_behavior_day_i as t1 cross join ( select count(distinct zt_id) as user_count -- 总用户数 from dwm.dwm_mem_member_behavior_day_i -- 用户行为表:下单、付款、浏览、退货等各种信息 where datediff(current_date(), to_date(dt)) <= 30 ) t2 where datediff(current_date(), to_date(t1.dt)) <= 30 ) t3 ) t4 where rn = 1 -- 只取最新的一条记录 ) t on mem.zt_id = t.zt_id where mem.start_date <= '${inputdate}' and mem.end_date > '2025-01-19'
5.4 SeaTunnel部署(掌握)
在工作流中拖拽一个SHELL节点,然后设置节点。
设置节点名称
配置重试次数,这里可以只设置1次
配置资源信息
配置超时策略,勾选超时告警和超时失败(防止因任务卡住而占用资源,如果超时会自动被杀死)
填写脚本,注意将日期配置成参数形式
首先配置好hive2es.config,然后在shell中填写命令。
cd /export/server/apache-seatunnel-2.3.5 ./bin/seatunnel.sh --config ./config/job/hive2es.config -e local -i pt=${inputdate} 设置前置任务5.5 任务上线 上线任务
任务上线后,就不可编辑,这样可以避免正在执行的任务被修改
定时时间设定,DS已经可视化处理,我们直接点击相应的时间即可。
定时好了后我么可以点击执行时间,查看它任务模拟的执行时间是否正确,执行得到结果如下:
上线定时整个流程到此就结束了。到指定的时间点,任务会被执行。
二、数据可视化 1、用户画像管理系统(了解)画像web产品主要使用java和vue实现前后端交互和标签可视化。其中主要有3方面可视化内容:单个用户标签可视化、单个标签可视化、组合标签可视化。
单个用户分析
标签分析
圈人精准营销 使用Elasticsearch的DSL语句进行查询 案例: 【查用户】查询id为 16682885 的用户的所有标签 POST /user_profile_tags/_search { "query": { "term": { "user_id": 16682885 } } } 2、使用Doris分析ElasticSearch 启动Doris /export/server/doris/fe/bin/start_fe.sh --daemon /export/server/doris/be/bin/start_be.sh --daemon /export/server/doris/apache_hdfs_broker/bin/start_broker.sh --daemon 2.1 创建Catalog 在Doris中执行如下命令: CREATE CATALOG es sql ( "type"="es", "hosts"="http://up01:9200", "enable_docvalue_scan" = "true", "enable_keyword_sniff" = "true" ); 注: 具体参数解释查看课件 验证集成结果 -- 查看Doris与数据源的整合情况 show catalogs; -- Catalog名称.数据库名称.数据表名称 select * from es.default_db.user_profile_tags limit 10; 2.2 使用示例完成在Doris中建立ES外表后,除了无法使用Doris中的数据模型(rollup、预聚合、物化视图等)外并无区别。使用示例如下
切换到es的catalog switch es; 【查用户】查询id为 16682885 的用户的所有标签 select user_id,tags_id_times,tags_id_once,tags_id_streaming from es.default_db.user_profile_tags where user_id=16682885; 【查标签】查询标签为18的用户 select user_id,tags_id_times,tags_id_once,tags_id_streaming from es.default_db.user_profile_tags where tags_id_times='18' limit 10; 3、FineBI可视化 3.1 安装FineBI1- 执行FineBI安装包, 将其安装到windows上。下一步安装即可。安装完以后启动
2- 设置管理员账户
3.2 创建数据连接 点击左侧的管理系统,然后点击数据连接,点击数据连接管理 点击新建数据连接,选择所有,点击Doris。 根据下图进行配置,完成后点击右上角测试链接,通过后点击保存。 注意一:如果要配置Doris与ElasticSearch集成后的数据库,数据库名称前面需要带Catalog名称。如下图中的红色框 注意二:如果FineBI不支持Doris,直接使用MySQL连接即可。端口号要改成9030 3.3 创建公共数据(掌握) 3.3.1 Doris数据集 点击公共数据,然后点击新建文件夹,新建xtzg文件夹。 点击xtzg文件夹右边的+号,选择数据库表。数据连接选择Doris_DB_log_analysis_db,将右边的所有表选上,点击确定,完成数据库表数据的添加。
3.3.2 ElasticSearch数据集因为es数据属于外部表,所以不能直接通过数据库表来配置,需要通过SQL数据集的方式来配置。
前提是先要创建es的catalog。
点击【添加表】,创建SQL数据集填写表名为:user_profile_tags
填写SQL语句
select * from es.default_db.user_profile_tags 点击右上角预览,没有问题后,点击确定 3.3.3 ElasticSearch和MySQL标签关联的数据集 前提准备在/export/server/doris/fe/lib/目录中上传mysql-connector-java-5.1.49.jar。
创建catalog CREATE CATALOG jdbc_mysql sql ( "type"="jdbc", "user"="root", "password"="123456", "jdbc_url" = "jdbc:mysql://up01:3306/tags_info", "driver_url" = "file:///export/server/doris/fe/lib/mysql-connector-java-5.1.49.jar", "driver_class" = "com.mysql.jdbc.Driver" ); -- 集成以后的验证语句 select * from jdbc_mysql.tags_info.tbl_basic_tag limit 10; 有了mysql catalog后,就可以查询mysql中的数据。 查询某个标签下的所有用户及对应的标签名称:如查询每个用户的年龄段标签,其中年龄段标签的pid为15,对应的sql如下: select t1.user_id,t2.name from ( select user_id, tags_id_times, cast(tag as int) as tag from es.default_db.user_profile_tags lateral view explode_split(tags_id_times,',') t as tag ) t1 join jdbc_mysql.tags_info.tbl_basic_tag t2 on t1.tag=t2.id where t2.pid=15 配置数据集:跟配置es的数据集相同,只是sql不同。 3.3.4 数据集更新可以设置缓存策略,从而实现数据的自动更新。
点击数据表,然后点击缓存设置,然后点击编辑,设置缓存策略。
3.4 创建分析主题点击仪表板,然后点击新建文件夹,创建xtzg文件夹。
点击xtzg文件夹进去,然后新建仪表板,创建小兔智购用户画像分析主题。
选择数据。点击公共数据,选择xtzg文件夹下的doris_log_user_event_result,然后确定。 3.5 创建各个组件 绘制漏斗图绘制柱状图
用同样的方式,可以绘制其他图
3.6 创建仪表盘 点击下方添加仪表盘,重命名为:小兔智购用户画像 添加各个组件:通过拖拽的方式,添加绘制好的各个组件 3.7 图表发布可以按照文档操作 help.fanruan /finebi/doc-view-2108.html
day12_调度和可视化由讯客互联游戏开发栏目发布,感谢您对讯客互联的认可,以及对我们原创作品以及文章的青睐,非常欢迎各位朋友分享到个人网站或者朋友圈,但转载请说明文章出处“day12_调度和可视化”