11.11大促主会场
新人页面
精选商品
首月0月租体验,领12个月京东PLUS
自营热卖

11、Spark线上常见错误

伴君幽独 1年前   阅读数 191 0

【推荐】2019 Java 开发者跳槽指南.pdf(吐血整理) >>>

1、OffsetOutOfRangeException: Offsets out of range

  • 1.1、错误分析 ==> kakfa集群定时删除过期数据导致最早offset往后推移 造成这种结果有如下情况:

      ->消费跟不上,但是kafka在不断地删除过期数据
      ->程序挂了好久,然后在启动去消费,上次记录的offset已经远小于目前最小的offset。
      ->程序中有window操作,如下图:
    

  • 1.2、解决方法

      ->消费跟不上 ==> 增加partition数,以及executor
      ->挂了好久才启动,这时候程序要自己管理offset,重新计算offset或者换groupID
      ->window这个一般不是问题,只要前面两个解决了,消费跟上来,就不会出现这种情况。
    
      还有可能是如下情况
      如果消息体太大了,超过 fetch.message.max.bytes=1m,那么Spark Streaming会直接抛出OffsetOutOfRangeException异常,然后停止服务。
    

2、yarn的applicationMaster和resourceManager失去联系。

参考链接:https://www.jishux.com/p/6f78aac7ff7d8f1f

  • 2.1、错误分析

    一般只有cluster模式才会有这种情况,因为cluster模式下,ApplicationMaster和Driver在同一个Executor进程中运行,这个executor的cpu核数就是--driver.cores的值,默认是1,由于driver负责DAGSchudle调度,接受心跳等等,需要占用很多cpu资源,从而导致ApplicationMaster无法正常和resourceManager保持心跳,默认3S没有心跳之后,ResourceManager就会删除appattempxxxId,导致报错

    而client模式下,不会出现这种情况,因为此模式下:Driver在启动机器这端,和ApplicationMaster分离,不影响ApplicationMaster的工作。

  • 2.2、解决方法

      --driver-cores 设置为2或者3
    

3、suffle之后,reduce无法从上一级拉取数据

  • 3.1、错误分析

    一般不会是网络问题,因为都在局域网。很有可能是suffle之后,数据量太大,频繁GC导致终端响应,导致socket连接拒绝或者断开,可以查看executors

  • 3.2、解决方法

      1、控制sparkStreaming消费的速率
      // 每秒钟,每个分区的速度,如果是20S一个批次,则每个分区6000条
      conf.set("spark.streaming.kafka.maxRatePerPartition", "300");
      2、增大topic的分区数,这样可以增加executor并行度,因为一个分区只能给一个consumer消费,也就是只能给一个executor处理,多少个分区,最多多少个并行执行executor。
      kafka-topics --zookeeper zk01:2181/kafka --alter --topic   utopic --partitions 15 
    
      3、partitions数增加后,可以增加executor数量,增强处理能力
      --executor-num  10
    
      4、如果有reduceByKey操作,那么reduceNum最好设置为executors数量,不能随便设置为两个,否则这两个Excutor压力会非常大,GC更加严重。
    
      rdd.reduceByKey(new Function2(){},      reduceNum);
    

4、多少个partition,就只能有多少个executor并行执行

5、动态控制消费速度的坑(业务中有window)

sparkConf设置spark.streaming.backpressure.enabled = true,可以开启动态控制消费速度。他会根据这次的数据量看看处理速度,来动态调整数据量。

如果业务代码只有一种的话,那么没有问题,比如没有shuffle,都是流式串行的代码,处理很快。但是如果有window,并且window之后还做suffle的话,那就有问题了。因为window一般都会跨几个duration的,这几个duration的数据量可以有几亿条,那就出问题了。

6、SparkStreaming在Executor中缓存kafkaConsumer导致KafkaConsumer is not safe for multi-threaded access异常问题 ==> spark-19185

问题分析:单个kafkaConsumer链接不能给多线程并发操作,否则会出现上面异常。而executor为了减少kafka的链接,缓存了kafkaConsumer。

解决办法:不缓存 :conf.set("spark.streaming.kafka.consumer.cache.enabled", "false");

注意:kafkaConsumer不能并发访问,但是producer可以

7、关于kafkaConsumer协调器问题

  • 日志中频繁出现Revoking previously assigned partitions,几分钟后又出现Successfully joined group,如下:

      AppInfoParser: Kafka version : 0.10.1.2.6.4.0-91
      AppInfoParser: Kafka commitId : ba0af6800a08d2f8
      AbstractCoordinator: Discovered coordinator kafka-rzx2.bigdata.com:6667 (id: 2147482644 rack: null) for group topic_streaming5.
      ConsumerCoordinator: Revoking previously assigned partitions [] for group topic_streaming5
      AbstractCoordinator: (Re-)joining group topic_streaming5
      AbstractCoordinator: Successfully joined group topic_streaming5 with generation 3
      ConsumerCoordinator: Setting newly assigned partitions [wl_002-1, wl_002-0, wl_002-2] for group topic_streaming5
    
  • 问题分析:

    这是由于groupCoordinator检测到有Consumer不正常,做了rebalance,所以才会频繁重新发现GroupCoordinator,然后加入组,最后获取负责的分区

  • 如下几个参数是kafkaConsumer创建时的参数

      heartbeat.interval.ms
      心跳间隔。心跳是在 consumer 与 coordinator 之间进行的。心跳是确定 consumer 存活,加入或者退出 group 的有效手段。
      这个值必须设置的小于 session.timeout.ms,因为当 consumer 由于某种原因不能发 heartbeat 到 coordinator 时,并且时间超过 session.timeout.ms 时,就会认为该 consumer 已退出,它所订阅的 partition 会分配到同一 group 内的其它的 consumer 上。
      默认值:3000 (3s),通常设置的值要低于session.timeout.ms的1/3。
    
      session.timeout.ms
      consumer session 过期时间。如果超时时间范围内,没有收到消费者的心跳,broker 会把这个消费者置为失效,并触发消费者负载均衡。因为只有在调用 poll 方法时才会发送心跳,更大的 session 超时时间允许消费者在 poll 循环周期内处理消息内容,尽管这会有花费更长时间检测失效的代价。如果想控制消费者处理消息的时间,
      默认值:10000 (10s),这个值必须设置在 broker configuration 中的 group.min.session.timeout.ms 与 group.max.session.timeout.ms 之间。
    
      max.poll.interval.ms
      参数设置大一点可以增加两次 poll 之间处理消息的时间。
      当 consumer 一切正常(也就是保持着 heartbeat ),且参数的值小于消息处理的时长,会导致 consumer leave group 然后又 rejoin group,触发无谓的 group balance,出现 consumer livelock 现象。
      但如果设置的太大,会延迟 group rebalance,因为消费者只会在调用 poll 时加入rebalance。
    
      max.poll.records
        ● max.poll.interval.ms=5min
        ● max.poll.records=500
      即平均 600ms 要处理完一条消息,如果消息的消费时间高于 600ms,则一定要调整 max.poll.records 或 max.poll.interval.ms。
    

8、kerberos服务器域找不到

  • 问题分析:

    这个主要是driver和executor这些容器进程的系统参数没有设置krb5.conf这个文件,导致没法找到kerberos服务器

  • 解决办法:

    spark-submit启动时设置driver和executor的krb5.conf和jaas.conf参数

      --conf spark.driver.extraJavaOptions="-Djava.security.krb5.conf=./krb5.conf -Djava.security.auth.login.config=./jaas.conf" \
      --conf spark.executor.extraJavaOptions="-Djava.security.krb5.conf=./krb5.conf -Djava.security.auth.login.config=./jaas.conf" \
    

注意:本文归作者所有,未经作者允许,不得转载

全部评论: 0

    我有话说: