flink数据倾斜
在使用 Flink的时候你有遇到过数据倾斜吗下面的方案可能对你会有帮助。什么是数据倾斜在使用一些大数据处理框架进行海量数据处理的过程中可能会遇到数据倾斜的问题由于大数据处理框架本身架构的原因在框架层面数据倾斜问题是无法避免的只能在业务层面来缓解或者避免。因为要处理海量的数据常用的大数据处理框架都会采用分布式架构将海量数据分成多个小的分片再将每个小分片分配给不同的计算节点来处理通过对计算节点进行横向扩容来快速提升框架的数据处理性能因此即使是海量数据也可以在较短的时间内完成处理但是也正是由于这种架构设计导致了数据倾斜问题的产生。试想如果小分片中的数据分布不均匀有某个或者某几个小分片中包含了80%的数据量那么处理这些分片的计算节点压力就会比较大就会导致整个分布式集群中大部分节点是空闲的只有某几个比较繁忙无法使计算资源得到重复利用最终导致框架的整体效率比较低。如何解决数据倾斜因为产生数据倾斜问题的直接原因就是数据分布不均匀要解决这个问题最直接的方法就是在业务层面将数据的分布变得均匀一些让分布式集群中每个计算节点的资源得到重复利用。因为数据分布不均匀是业务层面的问题将数据分布变均匀的方案也要结合业务场景来设计下面我们以wordCount为例来演示以下数据倾斜问题以及相应的解决方案。后面当遇到数据倾斜问题时希望对你有一定的启发。实践案例下面是一个wordCount的程序的实现public class ShuffleWindowFunctionTest { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env StreamExecutionEnvBase.getStreamEnv(9091); DataStreamSourceTuple2String,Integer sourceStream env.addSource(StreamExecutionEnvBase.getRandomStringSource(1000000, 1)); (sourceStream.keyBy(new KeySelectorTuple2String,Integer, String() { Override public String getKey(Tuple2String,Integer value) throws Exception { return value.f0 ; } }) .timeWindow(Time.seconds(5)) .process(new ProcessWindowFunctionTuple2String,Integer, String, String, TimeWindow() { MapString, Integer map new HashMap(); Override public void process(String s, Context context, IterableTuple2String,Integer iterable, CollectorString collector) throws Exception { for (Tuple2String,Integer tuple2 : iterable) { String key tuple2.f0; if (map.containsKey(key)) { map.put(key, map.get(key) 1); } else { map.put(key, 1); } } for (Map.EntryString, Integer entry : map.entrySet()) { collector.collect(String.format(key:%s,count:%s, entry.getKey(), entry.getValue())); } } })).setParallelism(8) .print(total).setParallelism(8); env.execute(shuffle stream); } } public class StreamExecutionEnvBase { public static StreamExecutionEnvironment getStreamEnv(Integer webUiPort) { Configuration conf new Configuration(); conf.setString(rest.port,String.valueOf(webUiPort)); StreamExecutionEnvironment env StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf); return env; } public static SourceFunctionTuple2String,Integer getRandomStringSource(int count, Integer sleep) { return new SourceFunctionTuple2String,Integer() { Random random new Random(); String[] values {hadoop,flink,spark,redis,redis,redis,redis,redis,redis,redis,redis,redis,redis,redis,redis,redis,redis ,redis,redis,redis,redis,redis,redis,redis,redis,redis,redis,redis,redis,redis,redis,redis,redis,redis,redis ,redis,redis,redis,redis,redis,redis,redis,redis,redis,redis,redis,redis,redis,redis,redis,redis,redis,redis,redis}; volatile boolean running true; int c count; Override public void run(SourceContextTuple2String,Integer ctx) throws Exception { while (running c-- 0) { String target values[c % values.length]; ctx.collect(new Tuple2(target,random.nextInt(100))); TimeUnit.MILLISECONDS.sleep(sleep); } } Override public void cancel() { running false; System.out.println(cancel job ...); } }; } }上面的source代码中word为redis的数据量比较大明显多于其他word的数据量。此时对于处理reids的subTask的压力会比较大我们可以通过flink的监控来进行验证具体如下图在上图中可以发现subTask0处理的数据量是其他SubTask的40倍左右此时产生了明显的数据倾斜问题。为了解决redis倾斜的问题我们可以将redis生成key的过程进行优化将生成的key进行打散具体实现过程如下sourceStream.keyBy(new KeySelectorTuple2String,Integer, String() { Override public String getKey(Tuple2String,Integer value) throws Exception { if(redis.equals(value.f0)) return value.f0 value.f1; return value.f0; } }这样word为redis的数据生成的key就会被打散成多个打散的多个key分散到不同的subTask中处理这样数据倾斜的问题就得到了解决执行结果如下图在这里插入图片描述到这里我们解决了数据倾斜的问题但是细心的读者会发现将key打散后数据倾斜问题虽然解决了但是sink到下游的数据量也变多了也就是说发送到下游的数据聚合度降低了数据变得更散了如下图优化前优化后在这里插入图片描述结合之前文章Flink中的Window计算-增量计算全量计算我们可以知道发送下游数据量变多的原因key变多了每个key都会对应一个 ProcessWindowFunction 实例也就是 ProcessWindowFunction 实例个数变得更多聚合结果 map的聚合度就变小了发送到下游的数据量也就变得更多了。如果flink的下游是存储层如mysql那么大批量的数据写到mysql中对mysql的并发处理能力和存储都会是巨大的挑战。那么该如何解决这个问题呢解决方案其实也很简单对发送给下游的数据进行二次聚合将分散的数据再次聚合一下具体实现可以参考如下代码public static void main(String[] args) throws Exception { StreamExecutionEnvironment env StreamExecutionEnvBase.getStreamEnv(9091); DataStreamSourceTuple2String,Integer sourceStream env.addSource(StreamExecutionEnvBase.getRandomStringSource(1000000, 1)); (sourceStream.keyBy(new KeySelectorTuple2String,Integer, String() { Override public String getKey(Tuple2String,Integer value) throws Exception { if(redis.equals(value.f0)) return value.f0 value.f1; return value.f0; } }) .timeWindow(Time.seconds(5)) .process(new ProcessWindowFunctionTuple2String,Integer, String, String, TimeWindow() { MapString, Integer map new HashMap(); Override public void process(String s, Context context, IterableTuple2String,Integer iterable, CollectorString collector) throws Exception { for (Tuple2String,Integer tuple2 : iterable) { String key tuple2.f0; if (map.containsKey(key)) { map.put(key, map.get(key) 1); } else { map.put(key, 1); } } for (Map.EntryString, Integer entry : map.entrySet()) { collector.collect(String.format(key:%s,count:%s, entry.getKey(), entry.getValue())); } } })) .map(new MapFunctionString, Tuple2String,Integer() { Override public Tuple2String, Integer map(String value) throws Exception { String[] split value.split(,); return new Tuple2String,Integer(split[0].split(:)[1], Integer.valueOf(split[1].split(:)[1])); } }).keyBy(new KeySelectorTuple2String, Integer, String() { Override public String getKey(Tuple2String, Integer value) throws Exception { return value.f0; } }).timeWindow(Time.seconds(5)) .process(new ProcessWindowFunctionTuple2String,Integer, String, String, TimeWindow() { MapString, Integer map new HashMap(); Override public void process(String s, Context context, IterableTuple2String,Integer iterable, CollectorString collector) throws Exception { for (Tuple2String,Integer tuple2 : iterable) { String key tuple2.f0; if (map.containsKey(key)) { map.put(key, map.get(key) 1); } else { map.put(key, 1); } } for (Map.EntryString, Integer entry : map.entrySet()) { collector.collect(String.format(key:%s,count:%s, entry.getKey(), entry.getValue())); } } }).setParallelism(8) .print().setParallelism(4); env.execute(shuffle stream); }执行效果如下图所示此时数据倾斜问题得到了解决同时发送给下游的数据量也变小了。这时可能还会有读者剔除疑问二次聚合的过程中是否还会产生数据倾斜答案是会的。只是这个数据倾斜的程度是可控的。因为 第一次聚合后的数据量的最大值为业务key的个数 * 离散度(可能的后缀的个数在上面的例子中是:random.nextInt(100)也就是100种)。所以即使有倾斜问题也不会随着数据量增大而增大这种倾斜问题不会产生太大影响基本可以忽略。