https://n3mnd6knun.feishu.cn/docx/ZOZzdj3pJoBIfex9M2XcWHFqnNe?openbrd=1&doc_app_id=501&blockId=Iayidy2CAoJUA4xSIHMcgohQn7c&blockType=whiteboard&blockToken=BMFkwx79IhOMy1btnbFcLmM3nze#Iayidy2CAoJUA4xSIHMcgohQn7c

小文件问题

  1. 什么情况下算小文件
    1. 某个分区下文件数量>1000&&平均文件大小<1MB
  2. 产生原因
    1. 只有map时,若map数过多(比如直接select插入数据),可能产生大量的小文件;
    2. reduce数量越多,小文件也越多;
  3. 大量小文件会产生的问题 :
    1. 占用大量的 NN 内存,影响 HDFS 的横向扩展能力;
    2. 下游作业读取文件产生大量的map task,而每个task只处理少量数据,浪费集群资源;
    3. 下游读取需要启动大量map,且读取文件需要NN通信,最终导致作业pending时间过长,造成数据生产延迟;

解决方式

  1. 定位问题:
    1. 命令行 grep ‘^-’ | wc-l 得到分区内的文件数

输出时合并小文件

  1. 计算后启动新任务进行文件merge

    /* Hive SQL 对应参数 */
    -- 在 map-only job 后合并文件
    set hive.merge.mapfiles = true;
    -- 在 map-reduce job 后合并文件
    set hive.merge.mapredfiles = true;
    -- 当输出文件平均大小小于设定值时,启动一个独立的 map-reduce 任务进行文件merge
    set hive.merge.smallfiles.avgsize=104857600;
    -- 合并后每个文件的大小,默认256000000
    set hive.merge.size.per.task=104857600;
    
    /* Spark SQL 对应参数 */
    -- 与 hive.merge.smallfiles.avgsize 类似,当输出文件平均大小小于设定值时
    -- 额外启动一轮stage进行小文件的合并
    -- 设置阈值
    **set spark.sql.mergeSmallFileSize=256000000;**
    -- 与 hive.merge.size.per.task 类似,合并后每个文件的大小
    -- 当启动额外stage进行小文件合并时,会根据该参数的值和input数据量共同计算出partition 数量
    set spark.sql.targetBytesInPartitionWhenMerge=256000000;
    

    配置后:

    运行任务多了一个stage,文件数变少,每个文件变大

distribute by key

  1. 通过在语句后面添加 distribute by key ,将数据按key值分发到reduce,例:

    where dt in ('20201025','20201024','20201023') distribute by id%49

需要注意,不要使用rand()处理

distribute by id%100 → distribute by id%49

根据参数控制输出文件个数(有reducer时)

/* Hive SQL 对应参数,计算Reducer数 */
-- 设置每个reducer处理的大小,用于计算reducer个数
set hive.exec.reducers.bytes.per.reducer=100000000;
-- 设置作业最大的reduce数,默认为999
set hive.exec.reducers.max = 888;
-- Reducer数=min(hive.exec.reducers.max, 总输入数据量/ hive.exec.reducers.bytes.per.reducer)

/* Spark SQL 对应参数,计算partition数 */
-- 是否开启调整partition功能,如果开启,partition可能会被合并
set spark.sql.adaptive.enabled=true; 
-- 配合spark.sql.adaptive.enabled使用,类似hive.exec.reducers.bytes.per.reducer,用于计算partition数
set spark.sql.adaptive.shuffle.targetPostShuffleInputSize=100000000;
-- 配合spark.sql.adaptive.enabled使用,限制Partitions数
set spark.sql.adaptive.minNumPostShufflePartitions=30; 
set spark.sql.adaptive.maxNumPostShufflePartitions=100;

另外,对于Hive SQL,还可以直接设定reduce个数控制输出文件 一般不直接设定reduce个数,设定不当反而降低执行效率;

参数设置1
SQL
-- 设定reduce个数,如果设定则不会计算reducer个数
set mapred.reduce.tasks=30;

读取时合并小文件