11.11大促主会场
新人页面
精选商品
首月0月租体验,领12个月京东PLUS
自营热卖

121.Spark大型电商项目-广告点击流量实时统计-对实时计算程序进行性能调优

温酒书生 1年前   阅读数 143 0

目录

1、并行化数据接收:处理多个topic的数据时比较有效

2、spark.streaming.blockInterval:增加block数量,增加每个batch rdd的partition数量,增加处理并行度

3、inputStream.repartition():重分区,增加每个batch rdd的partition数量

4、调节并行度

5、使用Kryo序列化机制

6、batch interval:每个的处理时间必须小于batch interval


本篇文章记录广告点击流量实时统计-对实时计算程序进行性能调优。


1、并行化数据接收:处理多个topic的数据时比较有效

int numStreams = 5;
List<JavaPairDStream<String, String>> kafkaStreams = new ArrayList<JavaPairDStream<String, String>>(numStreams);
for (int i = 0; i < numStreams; i++) {
  kafkaStreams.add(KafkaUtils.createStream(...));
}
JavaPairDStream<String, String> unifiedStream = streamingContext.union(kafkaStreams.get(0), kafkaStreams.subList(1, kafkaStreams.size()));
unifiedStream.print();

2、spark.streaming.blockInterval:增加block数量,增加每个batch rdd的partition数量,增加处理并行度

receiver从数据源源源不断地获取到数据;首先是会按照block interval,将指定时间间隔的数据,收集为一个block;默认时间是200ms,官方推荐不要小于50ms;接着呢,会将指定batch interval时间间隔内的block,合并为一个batch;创建为一个rdd,然后启动一个job,去处理这个batch rdd中的数据

batch rdd,它的partition数量是多少呢?一个batch有多少个block,就有多少个partition;就意味着并行度是多少;就意味着每个batch rdd有多少个task会并行计算和处理。

当然是希望可以比默认的task数量和并行度再多一些了;可以手动调节block interval;减少block interval;每个batch可以包含更多的block;有更多的partition;也就有更多的task并行处理每个batch rdd。

定死了,初始的rdd过来,直接就是固定的partition数量了

3、inputStream.repartition(<number of partitions>):重分区,增加每个batch rdd的partition数量

有些时候,希望对某些dstream中的rdd进行定制化的分区
对dstream中的rdd进行重分区,去重分区成指定数量的分区,这样也可以提高指定dstream的rdd的计算并行度

4、调节并行度


spark.default.parallelism
reduceByKey(numPartitions)

5、使用Kryo序列化机制

spark streaming,也是有不少序列化的场景的
提高序列化task发送到executor上执行的性能,如果task很多的时候,task序列化和反序列化的性能开销也比较可观
默认输入数据的存储级别是StorageLevel.MEMORY_AND_DISK_SER_2,receiver接收到数据,默认就会进行持久化操作;首先序列化数据,存储到内存中;如果内存资源不够大,那么就写入磁盘;而且,还会写一份冗余副本到其他executor的block manager中,进行数据冗余。

6、batch interval:每个的处理时间必须小于batch interval

实际上你的spark streaming跑起来以后,其实都是可以在spark ui上观察它的运行情况的;可以看到batch的处理时间;
如果发现batch的处理时间大于batch interval,就必须调节batch interval
尽量不要让batch处理时间大于batch interval
比如batch每隔5秒生成一次;你的batch处理时间要达到6秒;就会出现,batch在你的内存中日积月累,一直囤积着,没法及时计算掉,释放内存空间;而且对内存空间的占用越来越大,那么此时会导致内存空间快速消耗

如果发现batch处理时间比batch interval要大,就尽量将batch interval调节大一些


注意:本文归作者所有,未经作者允许,不得转载

全部评论: 0

    我有话说: