https://n3mnd6knun.feishu.cn/docx/ZOZzdj3pJoBIfex9M2XcWHFqnNe?openbrd=1&doc_app_id=501&blockId=Iayidy2CAoJUA4xSIHMcgohQn7c&blockType=whiteboard&blockToken=BMFkwx79IhOMy1btnbFcLmM3nze#Iayidy2CAoJUA4xSIHMcgohQn7c
计算后启动新任务进行文件merge
/* Hive SQL 对应参数 */
-- 在 map-only job 后合并文件
set hive.merge.mapfiles = true;
-- 在 map-reduce job 后合并文件
set hive.merge.mapredfiles = true;
-- 当输出文件平均大小小于设定值时,启动一个独立的 map-reduce 任务进行文件merge
set hive.merge.smallfiles.avgsize=104857600;
-- 合并后每个文件的大小,默认256000000
set hive.merge.size.per.task=104857600;
/* Spark SQL 对应参数 */
-- 与 hive.merge.smallfiles.avgsize 类似,当输出文件平均大小小于设定值时
-- 额外启动一轮stage进行小文件的合并
-- 设置阈值
**set spark.sql.mergeSmallFileSize=256000000;**
-- 与 hive.merge.size.per.task 类似,合并后每个文件的大小
-- 当启动额外stage进行小文件合并时,会根据该参数的值和input数据量共同计算出partition 数量
set spark.sql.targetBytesInPartitionWhenMerge=256000000;
配置后:
运行任务多了一个stage,文件数变少,每个文件变大
通过在语句后面添加 distribute by key ,将数据按key值分发到reduce,例:
where dt in ('20201025','20201024','20201023') distribute by id%49
需要注意,不要使用rand()处理
distribute by id%100 → distribute by id%49
/* Hive SQL 对应参数,计算Reducer数 */
-- 设置每个reducer处理的大小,用于计算reducer个数
set hive.exec.reducers.bytes.per.reducer=100000000;
-- 设置作业最大的reduce数,默认为999
set hive.exec.reducers.max = 888;
-- Reducer数=min(hive.exec.reducers.max, 总输入数据量/ hive.exec.reducers.bytes.per.reducer)
/* Spark SQL 对应参数,计算partition数 */
-- 是否开启调整partition功能,如果开启,partition可能会被合并
set spark.sql.adaptive.enabled=true;
-- 配合spark.sql.adaptive.enabled使用,类似hive.exec.reducers.bytes.per.reducer,用于计算partition数
set spark.sql.adaptive.shuffle.targetPostShuffleInputSize=100000000;
-- 配合spark.sql.adaptive.enabled使用,限制Partitions数
set spark.sql.adaptive.minNumPostShufflePartitions=30;
set spark.sql.adaptive.maxNumPostShufflePartitions=100;
另外,对于Hive SQL,还可以直接设定reduce个数控制输出文件 一般不直接设定reduce个数,设定不当反而降低执行效率;
参数设置1
SQL
-- 设定reduce个数,如果设定则不会计算reducer个数
set mapred.reduce.tasks=30;