11.11大促主会场
新人页面
精选商品
首月0月租体验,领12个月京东PLUS
自营热卖

120.Spark大型电商项目-广告点击流量实时统计-实现实时计算程序的HA高可用性

山边的诗与风 1年前   阅读数 254 0

目录

HA高可用性

1、设置checkpoint目录

2、Driver高可用性

3、实现RDD高可用性:启动WAL预写日志机制


本篇文章记录广告点击流量实时统计-实现实时计算程序的HA高可用性。

HA高可用性

High Availability,如果有些数据丢失,或者节点挂掉,那么不能让你的实时计算程序挂了,必须做一些数据上的冗余副本,保证你的实时计算程序可以7 * 24小时的运转。

通过一整套方案(3个步骤),开启和实现实时计算程序的HA高可用性,保证一些关键数据都有其冗余副本,不至于因为节点挂掉或者其他原因导致数据丢失。

1、设置checkpoint目录

updateStateByKey、window等有状态的操作,自动进行checkpoint,必须设置checkpoint目录。

checkpoint目录:容错的文件系统的目录,比如说,常用的是HDFS

SparkStreaming.checkpoint("hdfs://Master:9090/checkpoint")

jssc.checkpoint("hdfs://Master:9000/streaming_checkpoint");

设置完这个基本的checkpoint目录之后,有些会自动进行checkpoint操作的DStream,就实现了HA高可用性;checkpoint,相当于是会把数据保留一份在容错的文件系统中,一旦内存中的数据丢失掉;那么就可以直接从文件系统中读取数据;不需要重新进行计算

2、Driver高可用性

第一次在创建和启动StreamingContext的时候,那么将持续不断地将实时计算程序的元数据(比如说,有些dstream或者job执行到了哪个步骤),如果后面,不幸,因为某些原因导致driver节点挂掉了,那么可以让spark集群自动重启driver,然后继续运行时候计算程序,并且是接着之前的作业继续执行,没有中断,没有数据丢失

第一次在创建和启动StreamingContext的时候,将元数据写入容错的文件系统(比如hdfs);spark-submit脚本中加一些参数;保证在driver挂掉之后,spark集群可以自己将driver重新启动起来;而且driver在启动的时候,不会重新创建一个streaming context,而是从容错文件系统(比如hdfs)中读取之前的元数据信息,包括job的执行进度,继续接着之前的进度,继续执行。

使用这种机制,就必须使用cluster模式提交,确保driver运行在某个worker上面;但是这种模式不方便我们调试程序,一会儿还要最终测试整个程序的运行,打印不出log;我们这里仅仅是用我们的代码给大家示范一下:

JavaStreamingContextFactory contextFactory = new JavaStreamingContextFactory() {
  @Override 
  public JavaStreamingContext create() {
    JavaStreamingContext jssc = new JavaStreamingContext(...);  
    JavaDStream<String> lines = jssc.socketTextStream(...);     
    jssc.checkpoint(checkpointDirectory);                       
    return jssc;
  }
};

JavaStreamingContext context = JavaStreamingContext.getOrCreate(checkpointDirectory, contextFactory);
context.start();
context.awaitTermination();

spark-submit
--deploy-mode cluster
--supervise

3、实现RDD高可用性:启动WAL预写日志机制

spark streaming,从原理上来说,是通过receiver来进行数据接收的;接收到的数据,会被划分成一个一个的block;block会被组合成一个batch;针对一个batch,会创建一个rdd;启动一个job来执行我们定义的算子操作。

receiver主要接收到数据,那么就会立即将数据写入一份到容错文件系统(比如hdfs)上的checkpoint目录中的,一份写到磁盘文件中去;作为数据的冗余副本。

无论你的程序怎么挂掉,或者是数据丢失,那么数据都不肯能会永久性的丢失;因为肯定有副本。

WAL(Write-Ahead Log)预写日志机制

spark.streaming.receiver.writeAheadLog.enable true

  SparkConf conf = new SparkConf()
              .set("spark.streaming.receiver.writeAheadLog.enable", "true");

注意:本文归作者所有,未经作者允许,不得转载

全部评论: 0

    我有话说: