【推荐】2019 Java 开发者跳槽指南.pdf(吐血整理) >>>
1、OffsetOutOfRangeException: Offsets out of range
-
1.1、错误分析 ==> kakfa集群定时删除过期数据导致最早offset往后推移 造成这种结果有如下情况:
->消费跟不上,但是kafka在不断地删除过期数据 ->程序挂了好久,然后在启动去消费,上次记录的offset已经远小于目前最小的offset。 ->程序中有window操作,如下图:
-
1.2、解决方法
->消费跟不上 ==> 增加partition数,以及executor ->挂了好久才启动,这时候程序要自己管理offset,重新计算offset或者换groupID ->window这个一般不是问题,只要前面两个解决了,消费跟上来,就不会出现这种情况。 还有可能是如下情况 如果消息体太大了,超过 fetch.message.max.bytes=1m,那么Spark Streaming会直接抛出OffsetOutOfRangeException异常,然后停止服务。
2、yarn的applicationMaster和resourceManager失去联系。
参考链接:https://www.jishux.com/p/6f78aac7ff7d8f1f
-
2.1、错误分析
一般只有cluster模式才会有这种情况,因为cluster模式下,ApplicationMaster和Driver在同一个Executor进程中运行,这个executor的cpu核数就是--driver.cores的值,默认是1,由于driver负责DAGSchudle调度,接受心跳等等,需要占用很多cpu资源,从而导致ApplicationMaster无法正常和resourceManager保持心跳,默认3S没有心跳之后,ResourceManager就会删除appattempxxxId,导致报错
而client模式下,不会出现这种情况,因为此模式下:Driver在启动机器这端,和ApplicationMaster分离,不影响ApplicationMaster的工作。
-
2.2、解决方法
--driver-cores 设置为2或者3
3、suffle之后,reduce无法从上一级拉取数据
-
3.1、错误分析
一般不会是网络问题,因为都在局域网。很有可能是suffle之后,数据量太大,频繁GC导致终端响应,导致socket连接拒绝或者断开,可以查看executors
-
3.2、解决方法
1、控制sparkStreaming消费的速率 // 每秒钟,每个分区的速度,如果是20S一个批次,则每个分区6000条 conf.set("spark.streaming.kafka.maxRatePerPartition", "300"); 2、增大topic的分区数,这样可以增加executor并行度,因为一个分区只能给一个consumer消费,也就是只能给一个executor处理,多少个分区,最多多少个并行执行executor。 kafka-topics --zookeeper zk01:2181/kafka --alter --topic utopic --partitions 15 3、partitions数增加后,可以增加executor数量,增强处理能力 --executor-num 10 4、如果有reduceByKey操作,那么reduceNum最好设置为executors数量,不能随便设置为两个,否则这两个Excutor压力会非常大,GC更加严重。 rdd.reduceByKey(new Function2(){}, reduceNum);
4、多少个partition,就只能有多少个executor并行执行
5、动态控制消费速度的坑(业务中有window)
sparkConf设置spark.streaming.backpressure.enabled = true,可以开启动态控制消费速度。他会根据这次的数据量看看处理速度,来动态调整数据量。
如果业务代码只有一种的话,那么没有问题,比如没有shuffle,都是流式串行的代码,处理很快。但是如果有window,并且window之后还做suffle的话,那就有问题了。因为window一般都会跨几个duration的,这几个duration的数据量可以有几亿条,那就出问题了。
6、SparkStreaming在Executor中缓存kafkaConsumer导致KafkaConsumer is not safe for multi-threaded access异常问题 ==> spark-19185
问题分析:单个kafkaConsumer链接不能给多线程并发操作,否则会出现上面异常。而executor为了减少kafka的链接,缓存了kafkaConsumer。
解决办法:不缓存 :conf.set("spark.streaming.kafka.consumer.cache.enabled", "false");
注意:kafkaConsumer不能并发访问,但是producer可以
7、关于kafkaConsumer协调器问题
-
日志中频繁出现Revoking previously assigned partitions,几分钟后又出现Successfully joined group,如下:
AppInfoParser: Kafka version : 0.10.1.2.6.4.0-91 AppInfoParser: Kafka commitId : ba0af6800a08d2f8 AbstractCoordinator: Discovered coordinator kafka-rzx2.bigdata.com:6667 (id: 2147482644 rack: null) for group topic_streaming5. ConsumerCoordinator: Revoking previously assigned partitions [] for group topic_streaming5 AbstractCoordinator: (Re-)joining group topic_streaming5 AbstractCoordinator: Successfully joined group topic_streaming5 with generation 3 ConsumerCoordinator: Setting newly assigned partitions [wl_002-1, wl_002-0, wl_002-2] for group topic_streaming5
-
问题分析:
这是由于groupCoordinator检测到有Consumer不正常,做了rebalance,所以才会频繁重新发现GroupCoordinator,然后加入组,最后获取负责的分区
-
如下几个参数是kafkaConsumer创建时的参数
heartbeat.interval.ms 心跳间隔。心跳是在 consumer 与 coordinator 之间进行的。心跳是确定 consumer 存活,加入或者退出 group 的有效手段。 这个值必须设置的小于 session.timeout.ms,因为当 consumer 由于某种原因不能发 heartbeat 到 coordinator 时,并且时间超过 session.timeout.ms 时,就会认为该 consumer 已退出,它所订阅的 partition 会分配到同一 group 内的其它的 consumer 上。 默认值:3000 (3s),通常设置的值要低于session.timeout.ms的1/3。 session.timeout.ms consumer session 过期时间。如果超时时间范围内,没有收到消费者的心跳,broker 会把这个消费者置为失效,并触发消费者负载均衡。因为只有在调用 poll 方法时才会发送心跳,更大的 session 超时时间允许消费者在 poll 循环周期内处理消息内容,尽管这会有花费更长时间检测失效的代价。如果想控制消费者处理消息的时间, 默认值:10000 (10s),这个值必须设置在 broker configuration 中的 group.min.session.timeout.ms 与 group.max.session.timeout.ms 之间。 max.poll.interval.ms 参数设置大一点可以增加两次 poll 之间处理消息的时间。 当 consumer 一切正常(也就是保持着 heartbeat ),且参数的值小于消息处理的时长,会导致 consumer leave group 然后又 rejoin group,触发无谓的 group balance,出现 consumer livelock 现象。 但如果设置的太大,会延迟 group rebalance,因为消费者只会在调用 poll 时加入rebalance。 max.poll.records ● max.poll.interval.ms=5min ● max.poll.records=500 即平均 600ms 要处理完一条消息,如果消息的消费时间高于 600ms,则一定要调整 max.poll.records 或 max.poll.interval.ms。
8、kerberos服务器域找不到
-
问题分析:
这个主要是driver和executor这些容器进程的系统参数没有设置krb5.conf这个文件,导致没法找到kerberos服务器
-
解决办法:
spark-submit启动时设置driver和executor的krb5.conf和jaas.conf参数
--conf spark.driver.extraJavaOptions="-Djava.security.krb5.conf=./krb5.conf -Djava.security.auth.login.config=./jaas.conf" \ --conf spark.executor.extraJavaOptions="-Djava.security.krb5.conf=./krb5.conf -Djava.security.auth.login.config=./jaas.conf" \
注意:本文归作者所有,未经作者允许,不得转载