2016-01-20 13 views
14

L'RDD ha 512 partizioni di dimensioni uguali ed è memorizzato al 100% nella memoria su 512 esecutori.Perché l'attività Spark impiega molto tempo per trovare il blocco localmente?

Ho un lavoro filter-map-collect con 512 attività. A volte questo lavoro termina al secondo. In altre occasioni il 50% delle attività viene completato in un secondo inferiore, il 45% delle attività richiede 10 secondi e il 5% delle attività richiede 20 secondi.

Ecco il registro da un esecutore in cui il compito voluti 20 secondi:

15/12/16 09:44:37 INFO executor.CoarseGrainedExecutorBackend: Got assigned task 5312 
15/12/16 09:44:37 INFO executor.Executor: Running task 215.0 in stage 17.0 (TID 5312) 
15/12/16 09:44:37 INFO broadcast.TorrentBroadcast: Started reading broadcast variable 10 
15/12/16 09:44:37 INFO storage.MemoryStore: ensureFreeSpace(1777) called with curMem=908793307, maxMem=5927684014 
15/12/16 09:44:37 INFO storage.MemoryStore: Block broadcast_10_piece0 stored as bytes in memory (estimated size 1777.0 B, free 4.7 GB) 
15/12/16 09:44:37 INFO broadcast.TorrentBroadcast: Reading broadcast variable 10 took 186 ms 
15/12/16 09:44:37 INFO storage.MemoryStore: ensureFreeSpace(3272) called with curMem=908795084, maxMem=5927684014 
15/12/16 09:44:37 INFO storage.MemoryStore: Block broadcast_10 stored as values in memory (estimated size 3.2 KB, free 4.7 GB) 
15/12/16 09:44:57 INFO storage.BlockManager: Found block rdd_5_215 locally 
15/12/16 09:44:57 INFO executor.Executor: Finished task 215.0 in stage 17.0 (TID 5312). 2074 bytes result sent to driver 

Così appare la 20 secondi viene speso trovare il blocco locale. Guardando i registri per altre attività lente indica che sono tutti in ritardo per lo stesso motivo. La mia comprensione è che un blocco locale significa all'interno della stessa istanza JVM e quindi non capisco perché ci vuole così tanto tempo per individuarlo.

Poiché il ritardo è sempre o esattamente 10 secondi o esattamente 20 secondi, sospetto che sia dovuto a un timeout di 10 secondi su un ascoltatore o qualcosa del genere. Se ciò è vero, suppongo che le mie opzioni possano scoprire perché è scaduto e risolverlo o rendere il timeout più breve, in modo che provi più frequentemente.

Perché l'attività impiega così tanto tempo a trovare un blocco locale e come posso risolverlo?

Aggiornamento: Aggiunta del registro DEBUG per org.apache.spark.storage.

16/02/01 12:14:07 INFO CoarseGrainedExecutorBackend: Got assigned task 3029 
16/02/01 12:14:07 INFO Executor: Running task 115.0 in stage 9.0 (TID 3029) 
16/02/01 12:14:07 DEBUG Executor: Task 3029's epoch is 1 
16/02/01 12:14:07 DEBUG BlockManager: Getting local block broadcast_6 
16/02/01 12:14:07 DEBUG BlockManager: Block broadcast_6 not registered locally 
16/02/01 12:14:07 INFO TorrentBroadcast: Started reading broadcast variable 6 
16/02/01 12:14:07 DEBUG TorrentBroadcast: Reading piece broadcast_6_piece0 of broadcast_6 
16/02/01 12:14:07 DEBUG BlockManager: Getting local block broadcast_6_piece0 as bytes 
16/02/01 12:14:07 DEBUG BlockManager: Block broadcast_6_piece0 not registered locally 
16/02/01 12:14:07 DEBUG BlockManager: Getting remote block broadcast_6_piece0 as bytes 
16/02/01 12:14:07 DEBUG BlockManager: Getting remote block broadcast_6_piece0 from BlockManagerId(385, node1._.com, 54162) 
16/02/01 12:14:07 DEBUG TransportClient: Sending fetch chunk request 0 to node1._.com:54162 
16/02/01 12:14:07 INFO MemoryStore: Block broadcast_6_piece0 stored as bytes in memory (estimated size 2017.0 B, free 807.3 MB) 
16/02/01 12:14:07 DEBUG BlockManagerMaster: Updated info of block broadcast_6_piece0 
16/02/01 12:14:07 DEBUG BlockManager: Told master about block broadcast_6_piece0 
16/02/01 12:14:07 DEBUG BlockManager: Put block broadcast_6_piece0 locally took 2 ms 
16/02/01 12:14:07 DEBUG BlockManager: Putting block broadcast_6_piece0 without replication took 2 ms 
16/02/01 12:14:07 INFO TorrentBroadcast: Reading broadcast variable 6 took 87 ms 
16/02/01 12:14:07 INFO MemoryStore: Block broadcast_6 stored as values in memory (estimated size 3.6 KB, free 807.3 MB) 
16/02/01 12:14:07 DEBUG BlockManager: Put block broadcast_6 locally took 1 ms 
16/02/01 12:14:07 DEBUG BlockManager: Putting block broadcast_6 without replication took 1 ms 
16/02/01 12:14:17 DEBUG CacheManager: Looking for partition rdd_5_115 
16/02/01 12:14:17 DEBUG BlockManager: Getting local block rdd_5_115 
16/02/01 12:14:17 DEBUG BlockManager: Level for block rdd_5_115 is StorageLevel(false, true, false, true, 1) 
16/02/01 12:14:17 DEBUG BlockManagerSlaveEndpoint: removing broadcast 4 
16/02/01 12:14:17 DEBUG BlockManager: Getting block rdd_5_115 from memory 
16/02/01 12:14:17 DEBUG BlockManager: Removing broadcast 4 
16/02/01 12:14:17 INFO BlockManager: Found block rdd_5_115 locally 
16/02/01 12:14:17 DEBUG BlockManager: Removing block broadcast_4 
16/02/01 12:14:17 DEBUG MemoryStore: Block broadcast_4 of size 3680 dropped from memory (free 5092230668) 
16/02/01 12:14:17 DEBUG BlockManager: Removing block broadcast_4_piece0 
16/02/01 12:14:17 DEBUG MemoryStore: Block broadcast_4_piece0 of size 2017 dropped from memory (free 5092232685) 
16/02/01 12:14:17 DEBUG BlockManagerMaster: Updated info of block broadcast_4_piece0 
16/02/01 12:14:17 DEBUG BlockManager: Told master about block broadcast_4_piece0 
16/02/01 12:14:17 DEBUG BlockManagerSlaveEndpoint: Done removing broadcast 4, response is 2 
16/02/01 12:14:17 DEBUG BlockManagerSlaveEndpoint: Sent response: 2 to node2._.com:45115 
16/02/01 12:14:17 INFO Executor: Finished task 115.0 in stage 9.0 (TID 3029). 2164 bytes result sent to driver 
16/02/01 12:14:17 DEBUG BlockManagerSlaveEndpoint: removing broadcast 5 
16/02/01 12:14:17 DEBUG BlockManager: Removing broadcast 5 
16/02/01 12:14:17 DEBUG BlockManager: Removing block broadcast_5_piece0 
16/02/01 12:14:17 DEBUG MemoryStore: Block broadcast_5_piece0 of size 2017 dropped from memory (free 5092234702) 
16/02/01 12:14:17 DEBUG BlockManagerMaster: Updated info of block broadcast_5_piece0 
16/02/01 12:14:17 DEBUG BlockManager: Told master about block broadcast_5_piece0 
16/02/01 12:14:17 DEBUG BlockManager: Removing block broadcast_5 
16/02/01 12:14:17 DEBUG MemoryStore: Block broadcast_5 of size 3680 dropped from memory (free 5092238382) 
16/02/01 12:14:17 DEBUG BlockManagerSlaveEndpoint: Done removing broadcast 5, response is 2 
16/02/01 12:14:17 DEBUG BlockManagerSlaveEndpoint: Sent response: 2 to node2._.com:45115 
+1

Potresti accendere il livello DEBUG di registrazione sul pacchetto 'org.apache.spark.storage' e condividere i risultati? Ho controllato il codice di BlockManager e ci sono molte cose che potrebbero accadere durante il metodo 'doGetLocal' e ci sono voci di registro del livello di debug che potrebbero aiutare a capire cosa sta facendo esattamente. BTW, 'Trovato blocco rdd_5_215 localmente 'significa che lo ha trovato nel BlockManager locale (non in quello remoto), ma potrebbe ricevere blocchi dalla memoria o dal disco o da una memoria esterna. –

+0

Grazie a @AlexLarikov, ho aggiunto il registro DEBUG. Quando guardo l'interfaccia utente web di Spark, mi dice che l'RDD è memorizzato al 100% nella cache. Stando così le cose, è ancora ragionevole che Spark recuperi un blocco dal disco? – user2179977

risposta

0

L'unica cosa che sembra di stare fuori per me è che si ha la replicazione attivato tramite il vostro livello di storage StorageLevel(false, true, false, true, 1)

Dal momento che si dispone di 512 partizioni in tutto 512 esecutori si può replicare i blocchi attraverso ogni esecutore , che può causare quel rallentamento alla fine. Cercherò di spegnere la replica e vedere cosa fa per la performance.

0

Quanti nuclei totali si stanno assegnando all'applicazione Spark? Questo potrebbe accadere se stai allocando 256 core e se il valore per spark.locality.wait è 10.

Non conosco il tuo ambiente ma sembra che tu abbia troppi esecutori. Hanno solo pochi esecutori (a seconda della potenza dei nodi di calcolo) e dispongono di più core disponibili per ciascun executor. In breve, invece di avere molti processi con 1 thread ciascuno, hanno alcuni processi con molti thread ciascuno.

Problemi correlati