Handling a Spark exception: java.lang.ClassCastException: cannot assign instance of scala.collection.immutable.List$SerializationProxy to field org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$dependencies_ of type scala.collection.Seq in instance of org.apache.spark.rdd.MapPartitionsRDD

Related reading:

Handling a Spark exception: java.lang.NoSuchMethodError: io.netty.channel.DefaultFileRegion.(Ljava/io/File;JJ)V

Handling a Spark exception: java.io.InvalidClassException: org.apache.commons.lang3.time.FastDateParser; local class incompatible: stream classdesc serialVersionUID = 2, local class serialVersionUID = 3


Environment:

OS: CentOS 7.2

Zeppelin: 0.8.0

Spark: Apache 2.1.0

Problem description:

When running a notebook with Spark on Zeppelin, the following exception is thrown:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 3, 192.168.1.47, executor 0): 
java.lang.ClassCastException: 
cannot assign instance of scala.collection.immutable.List$SerializationProxy to field org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$dependencies_ of type scala.collection.Seq in instance of org.apache.spark.rdd.MapPartitionsRDD
	at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2133)
	at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1305)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2251)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2245)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)
	at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:479)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2136)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2245)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2245)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)
	at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75)
	at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:114)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:80)
	at org.apache.spark.scheduler.Task.run(Task.scala:99)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1435)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1423)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1422)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1422)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:802)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1650)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1605)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1594)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:628)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1918)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1931)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1944)
	at org.apache.spark.rdd.RDD$$anonfun$take$1.apply(RDD.scala:1353)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
	at org.apache.spark.rdd.RDD.take(RDD.scala:1326)
	at org.apache.spark.rdd.RDD$$anonfun$first$1.apply(RDD.scala:1367)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
	at org.apache.spark.rdd.RDD.first(RDD.scala:1366)
	at org.apache.spark.sql.execution.datasources.csv.CSVFileFormat.findFirstLine(CSVFileFormat.scala:206)
	at org.apache.spark.sql.execution.datasources.csv.CSVFileFormat.inferSchema(CSVFileFormat.scala:60)
	at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$7.apply(DataSource.scala:184)
	at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$7.apply(DataSource.scala:184)
	at scala.Option.orElse(Option.scala:289)
	at org.apache.spark.sql.execution.datasources.DataSource.org$apache$spark$sql$execution$datasources$DataSource$$getOrInferFileFormatSchema(DataSource.scala:183)
	at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:387)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:152)
	at org.apache.spark.sql.DataFrameReader.csv(DataFrameReader.scala:415)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.zeppelin.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
	at org.apache.zeppelin.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381)
	at org.apache.zeppelin.py4j.Gateway.invoke(Gateway.java:290)
	at org.apache.zeppelin.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
	at org.apache.zeppelin.py4j.commands.CallCommand.execute(CallCommand.java:79)
	at org.apache.zeppelin.py4j.GatewayConnection.run(GatewayConnection.java:209)
	at java.lang.Thread.run(Thread.java:748)
Caused by: 
java.lang.ClassCastException: 
cannot assign instance of scala.collection.immutable.List$SerializationProxy to field org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$dependencies_ of type scala.collection.Seq in instance of org.apache.spark.rdd.MapPartitionsRDD
	at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2133)
	at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1305)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2251)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2245)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)
	at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:479)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2136)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2245)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2245)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)
	at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75)
	at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:114)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:80)
	at org.apache.spark.scheduler.Task.run(Task.scala:99)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more

The key exception is:

java.lang.ClassCastException: 
cannot assign instance of scala.collection.immutable.List$SerializationProxy to field org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$dependencies_ of type scala.collection.Seq in instance of org.apache.spark.rdd.MapPartitionsRDD
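
For context, the driver-side portion of the stack trace shows that the failing job is launched while Spark infers the schema of a CSV file (DataFrameReader.csv → CSVFileFormat.inferSchema → RDD.first), and that the call arrives through the py4j gateway, i.e. from a PySpark paragraph. As a hedged illustration only (the file path is hypothetical), a notebook paragraph of roughly this shape exercises the same code path:

%pyspark
# Reading a CSV without an explicit schema makes Spark run a small job
# (RDD.first on the file) to look at the data; the task of that job then
# fails on the executor while deserializing the RDD's dependencies.
df = spark.read.csv("/tmp/example.csv", header=True)  # hypothetical path
df.show()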

Exception analysis:

This is a deserialization problem, and it usually comes down to one of two causes:

1. The class loader cannot find the class of an object contained in the list

2. The class loader used for Java deserialization is not the same class loader that originally loaded the class of the objects in the list

Handling the exception:

Check the following (a sketch of the relevant spark-submit options follows this list):

1. Make sure that every JAR loaded by the driver program is also available on the executors

2. Make sure the JARs that ship with Spark are not bundled into the application itself (to avoid the same classes being loaded by different class loaders)
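
As a general illustration only (the host name and paths below are placeholders, not taken from this deployment), the usual ways to keep the driver and executor class paths consistent when submitting with spark-submit are to ship extra JARs with --jars, which distributes them to the driver and every executor, or to point both extraClassPath properties at the same entries:

spark-submit \
  --master spark://master-host:7077 \
  --jars /path/to/extra-lib.jar \
  --conf spark.driver.extraClassPath=/path/to/libs/* \
  --conf spark.executor.extraClassPath=/path/to/libs/* \
  --class com.example.Main /path/to/app.jar

Note that spark.executor.extraClassPath is resolved on the executor machines, so the referenced directories have to exist there as well.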

Approach:

1. First check whether the spark-submit command adds extra libraries for the driver; if such a parameter exists, add the same extra libraries for the executors as well

2. Check whether the corresponding Spark JARs are being loaded inside the application
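
As a hedged aside, one simple way to see the exact command on the Zeppelin host is to grep the running interpreter process while a paragraph is executing (the interpreter launch log under the Zeppelin logs directory usually records it as well):

ps -ef | grep SparkSubmit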

Following this approach, first look at the spark-submit command that Zeppelin issues. The command is as follows:

/opt/jdk1.8.0_144/bin/java -cp /data/software/zeppelinauto/interpreter/spark/*:/data/software/zeppelinauto/lib/interpreter/*:./:/opt/jdk1.8.0_144/lib/:/opt/jdk1.8.0_144/jre/lib/:/data/software/zeppelinauto/lib/*:/data/software/zeppelinauto/*:/data/software/zeppelinauto/conf/:/data/software/spark-2.1.0/hadoop/:/data/software/zeppelinauto/interpreter/spark/spark-interpreter-0.8.0-SNAPSHOT.jar:/data/software/spark-2.1.0//conf/:/data/software/spark-2.1.0/jars/* 
-Xmx1024M 
-Dfile.encoding=UTF-8 -Dlog4j.configuration=file:///data/software/zeppelinauto/conf/log4j.properties 
-Dzeppelin.log.file=/data/software/zeppelinauto/logs/zeppelin-interpreter-spark-zeppelinauto-uat-sql.insightcredit.log
org.apache.spark.deploy.SparkSubmit 
--master spark://192.168.1.45:7077 
--conf spark.executor.memory=1024M 
--conf spark.num.executors=1 
--conf spark.driver.memory=1024M 
--conf spark.files.overwrite=true --conf spark.driver.allowMultipleContexts=true  
--conf spark.yarn.dist.archives=/data/software/spark-2.1.0/R/lib/sparkr.zip#sparkr 
--conf spark.driver.extraClassPath=:/data/software/zeppelinauto/interpreter/spark/*:/data/software/zeppelinauto/lib/interpreter/*:.:/opt/jdk1.8.0_144/lib:/opt/jdk1.8.0_144/jre/lib:/data/software/zeppelinauto/lib/interpreter/*:/data/software/zeppelinauto/lib/*:/data/software/zeppelinauto/*::/data/software/zeppelinauto/conf:/data/software/spark-2.1.0/hadoop:/data/software/zeppelinauto/interpreter/spark/spark-interpreter-0.8.0-SNAPSHOT.jar:/data/software/spark-2.1.0/hadoop 
--conf spark.app.name=zeppelinauto 
--conf spark.cores.max=1 
--conf spark.driver.extraJavaOptions= -Dfile.encoding=UTF-8 -Dlog4j.configuration=file:///data/software/zeppelinauto/conf/log4j.properties -Dzeppelin.log.file=/data/software/zeppelinauto/logs/zeppelin-interpreter-spark-zeppelinauto-uat-sql.insightcredit.log 
--class org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer /data/software/zeppelinauto/interpreter/spark/spark-interpreter-0.8.0-SNAPSHOT.jar

Analyzing the spark-submit command that Zeppelin issues gives the following information:

(1) The libraries Zeppelin puts on the classpath when submitting the Spark job are all from Zeppelin's own interpreter lib directories; Spark's lib directory is not added for the application. When the job runs, however, it is submitted to the Spark cluster, and the executors that get launched pick up Spark's libs as their dependencies, not Zeppelin's.

(2) The command adds extra driver dependencies: the entries added via --conf spark.driver.extraClassPath are, again, Zeppelin's own interpreter libs.

(3) The main class invoked is org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer, which serves as the entry point of the Spark application.

Fix:

In the Zeppelin web UI, add the following setting to the Spark interpreter:

Set spark.executor.extraClassPath to the same lib entries as spark.driver.extraClassPath, so that the executors run with the same classpath environment as the driver.
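
A minimal sketch of the property, assuming the executors should see the same Zeppelin interpreter libs as the driver classpath shown in the submit command above (the exact value must mirror whatever spark.driver.extraClassPath contains in your setup):

spark.executor.extraClassPath = /data/software/zeppelinauto/interpreter/spark/*:/data/software/zeppelinauto/lib/interpreter/*:/data/software/zeppelinauto/interpreter/spark/spark-interpreter-0.8.0-SNAPSHOT.jar

Since spark.executor.extraClassPath is interpreted on the worker nodes, these directories must also be present on every executor host (or be replaced with the equivalent paths there).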

Note:

Adjust the actual lib directories to match your own deployment; the value above is only an example.


After making this change, restart the Spark interpreter; the exception is resolved.



