
Spark Streaming 1.6.0 - Executors bouncing

We are using Spark Streaming 1.6.0 on AWS EMR 4.3.x, consuming data from a Kinesis stream. The application used to run fine on Spark 1.3.1; since the migration we cannot sustain the load for long. Ganglia shows the memory used by the cluster growing steadily until a limit is reached, with no GC reclaiming it. After that there are many very long micro-batches (tens of minutes instead of a few seconds), and then Spark starts killing and bouncing executors, over and over again.

Basically the cluster becomes unusable. The problem is reproducible under load, time after time. What could be the reason Spark fails to GC without killing executors? How can we make the cluster run for weeks (at the moment it cannot even run for hours)?

Any input is welcome.

We use the following settings when defining the job:

sparkConf.set("spark.shuffle.consolidateFiles", "true"); 
sparkConf.set("spark.storage.memoryFraction", "0.5"); 
sparkConf.set("spark.streaming.backpressure.enabled", "true"); 
sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); 

We take a union of streams created with:

KinesisUtils.createStream(streamingContext, appName, 
         kinesisStreamName, kinesisEndpoint, awsRegionName, initialPositionInStream, checkpointInterval, 
         StorageLevel.MEMORY_AND_DISK_2()); 
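
For reference, the union is built roughly as follows; numShards and the variable names are illustrative, not our exact code:

// A sketch only: one receiver per Kinesis shard, unioned into a single DStream. 
import java.util.ArrayList; 
import java.util.List; 
import org.apache.spark.storage.StorageLevel; 
import org.apache.spark.streaming.api.java.JavaDStream; 
import org.apache.spark.streaming.kinesis.KinesisUtils; 

List<JavaDStream<byte[]>> shardStreams = new ArrayList<>(); 
for (int i = 0; i < numShards; i++) { 
    shardStreams.add(KinesisUtils.createStream(streamingContext, appName, 
            kinesisStreamName, kinesisEndpoint, awsRegionName, 
            initialPositionInStream, checkpointInterval, 
            StorageLevel.MEMORY_AND_DISK_2())); 
} 
JavaDStream<byte[]> unionedStream = streamingContext.union( 
        shardStreams.get(0), shardStreams.subList(1, shardStreams.size())); 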

I stripped our application down to a bare skeleton for testing. It kept the map from the byte stream to a string stream, then converts to objects, filters out irrelevant events, then persists and stores to S3.
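
A rough sketch of that skeleton (Event, Event.parse() and isRelevant() are hypothetical stand-ins for our real classes; Java 8 lambda syntax for brevity):

import java.nio.charset.StandardCharsets; 
import org.apache.spark.streaming.api.java.JavaDStream; 

JavaDStream<Event> eventStream = unionedStream 
        .map(bytes -> new String(bytes, StandardCharsets.UTF_8)) // bytes -> string 
        .map(Event::parse)                                       // string -> domain object 
        .filter(Event::isRelevant);                              // drop irrelevant events 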

eventStream = eventStream.persist(StorageLevel.MEMORY_AND_DISK_SER_2()); 

eventStream = eventStream.repartition(configuration.getSparkOutputPartitions()); 
eventStream.foreachRDD(new RddByPartitionSaverFunction<>(new OutputToS3Function())); 
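
RddByPartitionSaverFunction and OutputToS3Function are our own classes; their shape is roughly the following hypothetical skeleton, not our actual implementation:

// Hypothetical skeleton: forward each RDD's partitions to a writer 
// (e.g. OutputToS3Function) that runs on the executors. 
import java.util.Iterator; 
import org.apache.spark.api.java.JavaRDD; 
import org.apache.spark.api.java.function.VoidFunction; 

public class RddByPartitionSaverFunction<T> implements VoidFunction<JavaRDD<T>> { 
    private final VoidFunction<Iterator<T>> partitionWriter; 

    public RddByPartitionSaverFunction(VoidFunction<Iterator<T>> partitionWriter) { 
        this.partitionWriter = partitionWriter; 
    } 

    @Override 
    public void call(JavaRDD<T> rdd) throws Exception { 
        rdd.foreachPartition(partitionWriter); // writer executes on the executors 
    } 
} 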

The Spark job is submitted with the following configuration (copied from the default Spark configuration, with the memory sizes varied):

spark.executor.extraJavaOptions -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+UseConcMarkSweepGC -XX:CMSInitiatingOccupancyFraction=70 -XX:MaxHeapFreeRatio=70 -XX:+CMSClassUnloadingEnabled -XX:OnOutOfMemoryError='kill -9 %p' -XX:PermSize=256M -XX:MaxPermSize=256M 
spark.driver.extraJavaOptions -XX:+UseConcMarkSweepGC -XX:CMSInitiatingOccupancyFraction=70 -XX:MaxHeapFreeRatio=70 -XX:+CMSClassUnloadingEnabled -XX:OnOutOfMemoryError='kill -9 %p' -XX:PermSize=512M -XX:MaxPermSize=512M 

Adding the exceptions we see. First cluster attempt:

16/03/06 13:54:52 WARN BlockManagerMaster: Failed to remove broadcast 1327 with removeFromMaster = true - Cannot receive any reply in 120 seconds. This timeout is controlled by spark.rpc.askTimeout 
org.apache.spark.rpc.RpcTimeoutException: Cannot receive any reply in 120 seconds. This timeout is controlled by spark.rpc.askTimeout 
     at org.apache.spark.rpc.RpcTimeout.org$apache$spark$rpc$RpcTimeout$$createRpcTimeoutException(RpcTimeout.scala:48) 
     at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:63) 
     at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59) 
     at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:33) 
     at scala.util.Failure$$anonfun$recover$1.apply(Try.scala:185) 
     at scala.util.Try$.apply(Try.scala:161) 
     at scala.util.Failure.recover(Try.scala:185) 
     at scala.concurrent.Future$$anonfun$recover$1.apply(Future.scala:324) 
     at scala.concurrent.Future$$anonfun$recover$1.apply(Future.scala:324) 
     at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32) 
     at org.spark-project.guava.util.concurrent.MoreExecutors$SameThreadExecutorService.execute(MoreExecutors.java:293) 
     at scala.concurrent.impl.ExecutionContextImpl$$anon$1.execute(ExecutionContextImpl.scala:133) 
     at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40) 
     at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248) 
     at scala.concurrent.Promise$class.complete(Promise.scala:55) 
     at scala.concurrent.impl.Promise$DefaultPromise.complete(Promise.scala:153) 
     at scala.concurrent.Future$$anonfun$map$1.apply(Future.scala:235) 
     at scala.concurrent.Future$$anonfun$map$1.apply(Future.scala:235) 
     at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32) 
     at scala.concurrent.Future$InternalCallbackExecutor$Batch$$anonfun$run$1.processBatch$1(Future.scala:643) 
     at scala.concurrent.Future$InternalCallbackExecutor$Batch$$anonfun$run$1.apply$mcV$sp(Future.scala:658) 
     at scala.concurrent.Future$InternalCallbackExecutor$Batch$$anonfun$run$1.apply(Future.scala:635) 
     at scala.concurrent.Future$InternalCallbackExecutor$Batch$$anonfun$run$1.apply(Future.scala:635) 
     at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72) 
     at scala.concurrent.Future$InternalCallbackExecutor$Batch.run(Future.scala:634) 
     at scala.concurrent.Future$InternalCallbackExecutor$.scala$concurrent$Future$InternalCallbackExecutor$$unbatchedExecute(Future.scala:694) 
     at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:685) 
     at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40) 
     at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248) 
     at scala.concurrent.Promise$class.tryFailure(Promise.scala:112) 
     at scala.concurrent.impl.Promise$DefaultPromise.tryFailure(Promise.scala:153) 
     at org.apache.spark.rpc.netty.NettyRpcEnv$$anon$1.run(NettyRpcEnv.scala:241) 
     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471) 
     at java.util.concurrent.FutureTask.run(FutureTask.java:262) 
     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:178) 
     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:292) 
     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
     at java.lang.Thread.run(Thread.java:745) 
Caused by: java.util.concurrent.TimeoutException: Cannot receive any reply in 120 seconds 
     at org.apache.spark.rpc.netty.NettyRpcEnv$$anon$1.run(NettyRpcEnv.scala:242) 
     ... 7 more 
16/03/06 13:54:52 ERROR ContextCleaner: Error cleaning broadcast 1327 
org.apache.spark.rpc.RpcTimeoutException: Futures timed out after [120 seconds]. This timeout is controlled by spark.rpc.askTimeout 
     at org.apache.spark.rpc.RpcTimeout.org$apache$spark$rpc$RpcTimeout$$createRpcTimeoutException(RpcTimeout.scala:48) 
     at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:63) 
     at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59) 
     at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:33) 
     at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:76) 
     at org.apache.spark.storage.BlockManagerMaster.removeBroadcast(BlockManagerMaster.scala:136) 
     at org.apache.spark.broadcast.TorrentBroadcast$.unpersist(TorrentBroadcast.scala:228) 
     at org.apache.spark.broadcast.TorrentBroadcastFactory.unbroadcast(TorrentBroadcastFactory.scala:45) 
     at org.apache.spark.broadcast.BroadcastManager.unbroadcast(BroadcastManager.scala:67) 
     at org.apache.spark.ContextCleaner.doCleanupBroadcast(ContextCleaner.scala:233) 
     at org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1$$anonfun$apply$mcV$sp$2.apply(ContextCleaner.scala:189) 
     at org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1$$anonfun$apply$mcV$sp$2.apply(ContextCleaner.scala:180) 
     at scala.Option.foreach(Option.scala:236) 
     at org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1.apply$mcV$sp(ContextCleaner.scala:180) 
     at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1180) 
     at org.apache.spark.ContextCleaner.org$apache$spark$ContextCleaner$$keepCleaning(ContextCleaner.scala:173) 
     at org.apache.spark.ContextCleaner$$anon$3.run(ContextCleaner.scala:68) 
Caused by: java.util.concurrent.TimeoutException: Futures timed out after [120 seconds] 
     at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219) 
     at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223) 
     at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107) 
     at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53) 
     at scala.concurrent.Await$.result(package.scala:107) 
     at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75) 
     ... 12 more 

16/03/06 13:55:04 ERROR YarnClusterScheduler: Lost executor 6 on ip-***-194.ec2.internal: Container killed by YARN for exceeding memory limits. 11.3 GB of 11.3 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead. 
16/03/06 13:55:10 ERROR YarnClusterScheduler: Lost executor 1 on ip-***-193.ec2.internal: Container killed by YARN for exceeding memory limits. 11.3 GB of 11.3 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead. 
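
For reference, the knob named in that message is a submit-time setting, in the same style as the configuration above; the value below is purely illustrative, not a tested recommendation:

spark.yarn.executor.memoryOverhead 2048 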

Second cluster attempt:

16/03/07 14:24:38 ERROR server.TransportChannelHandler: Connection to ip-***-22.ec2.internal/N.N.N.22:40791 has been quiet for 120000 ms while there are outstanding requests. Assuming connection is dead; please adjust spark.network.timeout if this is wrong. 
16/03/07 14:24:38 ERROR client.TransportResponseHandler: Still have 12 requests outstanding when connection from ip-***-22.ec2.internal/N.N.N.22:40791 is closed 
16/03/07 14:24:38 ERROR netty.NettyBlockTransferService: Error while uploading block input-47-1457357970366 
java.io.IOException: Connection from ip-***-22.ec2.internal/N.N.N.22:40791 closed 
     at org.apache.spark.network.client.TransportResponseHandler.channelUnregistered(TransportResponseHandler.java:124) 
     at org.apache.spark.network.server.TransportChannelHandler.channelUnregistered(TransportChannelHandler.java:94) 
     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158) 
     at io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144) 
     at io.netty.channel.ChannelInboundHandlerAdapter.channelUnregistered(ChannelInboundHandlerAdapter.java:53) 
     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158) 
     at io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144) 
     at io.netty.channel.ChannelInboundHandlerAdapter.channelUnregistered(ChannelInboundHandlerAdapter.java:53) 
     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158) 
     at io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144) 
     at io.netty.channel.ChannelInboundHandlerAdapter.channelUnregistered(ChannelInboundHandlerAdapter.java:53) 
     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158) 
     at io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144) 
     at io.netty.channel.DefaultChannelPipeline.fireChannelUnregistered(DefaultChannelPipeline.java:739) 
     at io.netty.channel.AbstractChannel$AbstractUnsafe$8.run(AbstractChannel.java:659) 
     at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:357) 
     at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:357) 
     at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111) 
     at java.lang.Thread.run(Thread.java:745) 
16/03/07 14:24:38 ERROR netty.NettyBlockTransferService: Error while uploading block input-15-1457357969730 
java.io.IOException: Connection from ip-***-22.ec2.internal/N.N.N.22:40791 closed 
     at org.apache.spark.network.client.TransportResponseHandler.channelUnregistered(TransportResponseHandler.java:124) 
     at org.apache.spark.network.server.TransportChannelHandler.channelUnregistered(TransportChannelHandler.java:94) 
     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158) 
     at io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144) 
     at io.netty.channel.ChannelInboundHandlerAdapter.channelUnregistered(ChannelInboundHandlerAdapter.java:53) 
     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158) 
     at io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144) 
     at io.netty.channel.ChannelInboundHandlerAdapter.channelUnregistered(ChannelInboundHandlerAdapter.java:53) 
     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158) 
     at io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144) 
     at io.netty.channel.ChannelInboundHandlerAdapter.channelUnregistered(ChannelInboundHandlerAdapter.java:53) 
     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158) 
     at io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144) 
     at io.netty.channel.DefaultChannelPipeline.fireChannelUnregistered(DefaultChannelPipeline.java:739) 
     at io.netty.channel.AbstractChannel$AbstractUnsafe$8.run(AbstractChannel.java:659) 
     at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:357) 
     at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:357) 
     at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111) 
     at java.lang.Thread.run(Thread.java:745) 
java.io.IOException: Connection from ip-***-22.ec2.internal/N.N.N.22:40791 closed 
     at org.apache.spark.network.client.TransportResponseHandler.channelUnregistered(TransportResponseHandler.java:124) 
     at org.apache.spark.network.server.TransportChannelHandler.channelUnregistered(TransportChannelHandler.java:94) 
     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158) 
     at io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144) 
     at io.netty.channel.ChannelInboundHandlerAdapter.channelUnregistered(ChannelInboundHandlerAdapter.java:53) 
     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158) 
     at io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144) 
     at io.netty.channel.ChannelInboundHandlerAdapter.channelUnregistered(ChannelInboundHandlerAdapter.java:53) 
     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158) 
     at io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144) 
     at io.netty.channel.ChannelInboundHandlerAdapter.channelUnregistered(ChannelInboundHandlerAdapter.java:53) 
     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158) 
     at io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144) 
     at io.netty.channel.DefaultChannelPipeline.fireChannelUnregistered(DefaultChannelPipeline.java:739) 
     at io.netty.channel.AbstractChannel$AbstractUnsafe$8.run(AbstractChannel.java:659) 
     at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:357) 
     at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:357) 
     at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111) 
     at java.lang.Thread.run(Thread.java:745) 

Thanks in advance...


Did you find a solution to the problem? –


Hi. No, we have not found a solution yet. The problem is only reproducible with a large number of Kinesis shards (120). Only then do the executors start bouncing. Ideas? Thanks – visitor


The executor bouncing stopped with Spark 2.0.0 (EMR 5.0.0). We have a new problem that prevents long runs of the same application: http://stackoverflow.com/questions/39289345/spark-streaming-2-0-0-freezes-after-several-days-under-load – visitor

Answer


I do get something in the same vein... When a micro-batch took more than 120 seconds to complete, it fired:

16/03/14 22:57:30 INFO SparkStreaming$: Batch size: 2500, total read: 4287800 
16/03/14 22:57:35 INFO SparkStreaming$: Batch size: 2500, total read: 4290300 
16/03/14 22:57:42 INFO SparkStreaming$: Batch size: 2500, total read: 4292800 
16/03/14 22:57:45 INFO SparkStreaming$: Batch size: 2500, total read: 4295300 
16/03/14 22:59:45 ERROR ContextCleaner: Error cleaning broadcast 11251 
org.apache.spark.rpc.RpcTimeoutException: Futures timed out after [120 seconds]. This timeout is controlled by spark.rpc.askTimeout 
     at org.apache.spark.rpc.RpcTimeout.org$apache$spark$rpc$RpcTimeout$$createRpcTimeoutException(RpcTimeout.scala:48) 
     at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:63) 
     at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59) 
     at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:33) 
     at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:76) 
     at org.apache.spark.storage.BlockManagerMaster.removeBroadcast(BlockManagerMaster.scala:136) 
     at org.apache.spark.broadcast.TorrentBroadcast$.unpersist(TorrentBroadcast.scala:228) 
     at org.apache.spark.broadcast.TorrentBroadcastFactory.unbroadcast(TorrentBroadcastFactory.scala:45) 
     at org.apache.spark.broadcast.BroadcastManager.unbroadcast(BroadcastManager.scala:67) 
     at org.apache.spark.ContextCleaner.doCleanupBroadcast(ContextCleaner.scala:233) 
     at org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1$$anonfun$apply$mcV$sp$2.apply(ContextCleaner.scala:189) 
     at org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1$$anonfun$apply$mcV$sp$2.apply(ContextCleaner.scala:180) 
     at scala.Option.foreach(Option.scala:236) 
     at org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1.apply$mcV$sp(ContextCleaner.scala:180) 
     at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1180) 
     at org.apache.spark.ContextCleaner.org$apache$spark$ContextCleaner$$keepCleaning(ContextCleaner.scala:173) 
     at org.apache.spark.ContextCleaner$$anon$3.run(ContextCleaner.scala:68) 
Caused by: java.util.concurrent.TimeoutException: Futures timed out after [120 seconds] 
     at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219) 
     at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223) 
     at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107) 
     at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53) 
     at scala.concurrent.Await$.result(package.scala:107) 
     at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75) 
     ... 12 more 

I am running in local mode and consuming from Kinesis. I am not even using any broadcast variables.
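
Even without user broadcast variables, Spark broadcasts task binaries internally, which is what the ContextCleaner is removing here. For what it's worth, the timeouts named in these errors can be raised; a sketch with illustrative values:

// Illustrative values only; these are the timeouts named in the errors above. 
sparkConf.set("spark.network.timeout", "300s"); // default for most subsidiary timeouts 
sparkConf.set("spark.rpc.askTimeout", "300s"); 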
