L'RDD ha 512 partizioni di dimensioni uguali ed è memorizzato al 100% nella memoria su 512 esecutori.Perché l'attività Spark impiega molto tempo per trovare il blocco localmente?
Ho un lavoro filter-map-collect con 512 attività. A volte questo lavoro termina al secondo. In altre occasioni il 50% delle attività viene completato in un secondo inferiore, il 45% delle attività richiede 10 secondi e il 5% delle attività richiede 20 secondi.
Ecco il registro da un esecutore in cui il compito voluti 20 secondi:
15/12/16 09:44:37 INFO executor.CoarseGrainedExecutorBackend: Got assigned task 5312
15/12/16 09:44:37 INFO executor.Executor: Running task 215.0 in stage 17.0 (TID 5312)
15/12/16 09:44:37 INFO broadcast.TorrentBroadcast: Started reading broadcast variable 10
15/12/16 09:44:37 INFO storage.MemoryStore: ensureFreeSpace(1777) called with curMem=908793307, maxMem=5927684014
15/12/16 09:44:37 INFO storage.MemoryStore: Block broadcast_10_piece0 stored as bytes in memory (estimated size 1777.0 B, free 4.7 GB)
15/12/16 09:44:37 INFO broadcast.TorrentBroadcast: Reading broadcast variable 10 took 186 ms
15/12/16 09:44:37 INFO storage.MemoryStore: ensureFreeSpace(3272) called with curMem=908795084, maxMem=5927684014
15/12/16 09:44:37 INFO storage.MemoryStore: Block broadcast_10 stored as values in memory (estimated size 3.2 KB, free 4.7 GB)
15/12/16 09:44:57 INFO storage.BlockManager: Found block rdd_5_215 locally
15/12/16 09:44:57 INFO executor.Executor: Finished task 215.0 in stage 17.0 (TID 5312). 2074 bytes result sent to driver
Così appare la 20 secondi viene speso trovare il blocco locale. Guardando i registri per altre attività lente indica che sono tutti in ritardo per lo stesso motivo. La mia comprensione è che un blocco locale significa all'interno della stessa istanza JVM e quindi non capisco perché ci vuole così tanto tempo per individuarlo.
Poiché il ritardo è sempre o esattamente 10 secondi o esattamente 20 secondi, sospetto che sia dovuto a un timeout di 10 secondi su un ascoltatore o qualcosa del genere. Se ciò è vero, suppongo che le mie opzioni possano scoprire perché è scaduto e risolverlo o rendere il timeout più breve, in modo che provi più frequentemente.
Perché l'attività impiega così tanto tempo a trovare un blocco locale e come posso risolverlo?
Aggiornamento: Aggiunta del registro DEBUG per org.apache.spark.storage
.
16/02/01 12:14:07 INFO CoarseGrainedExecutorBackend: Got assigned task 3029
16/02/01 12:14:07 INFO Executor: Running task 115.0 in stage 9.0 (TID 3029)
16/02/01 12:14:07 DEBUG Executor: Task 3029's epoch is 1
16/02/01 12:14:07 DEBUG BlockManager: Getting local block broadcast_6
16/02/01 12:14:07 DEBUG BlockManager: Block broadcast_6 not registered locally
16/02/01 12:14:07 INFO TorrentBroadcast: Started reading broadcast variable 6
16/02/01 12:14:07 DEBUG TorrentBroadcast: Reading piece broadcast_6_piece0 of broadcast_6
16/02/01 12:14:07 DEBUG BlockManager: Getting local block broadcast_6_piece0 as bytes
16/02/01 12:14:07 DEBUG BlockManager: Block broadcast_6_piece0 not registered locally
16/02/01 12:14:07 DEBUG BlockManager: Getting remote block broadcast_6_piece0 as bytes
16/02/01 12:14:07 DEBUG BlockManager: Getting remote block broadcast_6_piece0 from BlockManagerId(385, node1._.com, 54162)
16/02/01 12:14:07 DEBUG TransportClient: Sending fetch chunk request 0 to node1._.com:54162
16/02/01 12:14:07 INFO MemoryStore: Block broadcast_6_piece0 stored as bytes in memory (estimated size 2017.0 B, free 807.3 MB)
16/02/01 12:14:07 DEBUG BlockManagerMaster: Updated info of block broadcast_6_piece0
16/02/01 12:14:07 DEBUG BlockManager: Told master about block broadcast_6_piece0
16/02/01 12:14:07 DEBUG BlockManager: Put block broadcast_6_piece0 locally took 2 ms
16/02/01 12:14:07 DEBUG BlockManager: Putting block broadcast_6_piece0 without replication took 2 ms
16/02/01 12:14:07 INFO TorrentBroadcast: Reading broadcast variable 6 took 87 ms
16/02/01 12:14:07 INFO MemoryStore: Block broadcast_6 stored as values in memory (estimated size 3.6 KB, free 807.3 MB)
16/02/01 12:14:07 DEBUG BlockManager: Put block broadcast_6 locally took 1 ms
16/02/01 12:14:07 DEBUG BlockManager: Putting block broadcast_6 without replication took 1 ms
16/02/01 12:14:17 DEBUG CacheManager: Looking for partition rdd_5_115
16/02/01 12:14:17 DEBUG BlockManager: Getting local block rdd_5_115
16/02/01 12:14:17 DEBUG BlockManager: Level for block rdd_5_115 is StorageLevel(false, true, false, true, 1)
16/02/01 12:14:17 DEBUG BlockManagerSlaveEndpoint: removing broadcast 4
16/02/01 12:14:17 DEBUG BlockManager: Getting block rdd_5_115 from memory
16/02/01 12:14:17 DEBUG BlockManager: Removing broadcast 4
16/02/01 12:14:17 INFO BlockManager: Found block rdd_5_115 locally
16/02/01 12:14:17 DEBUG BlockManager: Removing block broadcast_4
16/02/01 12:14:17 DEBUG MemoryStore: Block broadcast_4 of size 3680 dropped from memory (free 5092230668)
16/02/01 12:14:17 DEBUG BlockManager: Removing block broadcast_4_piece0
16/02/01 12:14:17 DEBUG MemoryStore: Block broadcast_4_piece0 of size 2017 dropped from memory (free 5092232685)
16/02/01 12:14:17 DEBUG BlockManagerMaster: Updated info of block broadcast_4_piece0
16/02/01 12:14:17 DEBUG BlockManager: Told master about block broadcast_4_piece0
16/02/01 12:14:17 DEBUG BlockManagerSlaveEndpoint: Done removing broadcast 4, response is 2
16/02/01 12:14:17 DEBUG BlockManagerSlaveEndpoint: Sent response: 2 to node2._.com:45115
16/02/01 12:14:17 INFO Executor: Finished task 115.0 in stage 9.0 (TID 3029). 2164 bytes result sent to driver
16/02/01 12:14:17 DEBUG BlockManagerSlaveEndpoint: removing broadcast 5
16/02/01 12:14:17 DEBUG BlockManager: Removing broadcast 5
16/02/01 12:14:17 DEBUG BlockManager: Removing block broadcast_5_piece0
16/02/01 12:14:17 DEBUG MemoryStore: Block broadcast_5_piece0 of size 2017 dropped from memory (free 5092234702)
16/02/01 12:14:17 DEBUG BlockManagerMaster: Updated info of block broadcast_5_piece0
16/02/01 12:14:17 DEBUG BlockManager: Told master about block broadcast_5_piece0
16/02/01 12:14:17 DEBUG BlockManager: Removing block broadcast_5
16/02/01 12:14:17 DEBUG MemoryStore: Block broadcast_5 of size 3680 dropped from memory (free 5092238382)
16/02/01 12:14:17 DEBUG BlockManagerSlaveEndpoint: Done removing broadcast 5, response is 2
16/02/01 12:14:17 DEBUG BlockManagerSlaveEndpoint: Sent response: 2 to node2._.com:45115
Potresti accendere il livello DEBUG di registrazione sul pacchetto 'org.apache.spark.storage' e condividere i risultati? Ho controllato il codice di BlockManager e ci sono molte cose che potrebbero accadere durante il metodo 'doGetLocal' e ci sono voci di registro del livello di debug che potrebbero aiutare a capire cosa sta facendo esattamente. BTW, 'Trovato blocco rdd_5_215 localmente 'significa che lo ha trovato nel BlockManager locale (non in quello remoto), ma potrebbe ricevere blocchi dalla memoria o dal disco o da una memoria esterna. –
Grazie a @AlexLarikov, ho aggiunto il registro DEBUG. Quando guardo l'interfaccia utente web di Spark, mi dice che l'RDD è memorizzato al 100% nella cache. Stando così le cose, è ancora ragionevole che Spark recuperi un blocco dal disco? – user2179977