2014-11-18 10 views

risposta

31

Il livello di località per quanto ne so indica che è stato eseguito il tipo di accesso ai dati. Quando un nodo finisce tutto il suo lavoro e la sua CPU diventa inattiva, Spark può decidere di avviare altre attività in sospeso che richiedono l'acquisizione di dati da altre posizioni. Quindi, idealmente, tutte le attività dovrebbero essere locali, poiché associate a una minore latenza di accesso ai dati.

È possibile configurare il tempo di attesa prima di passare ad altri livelli di località utilizzando:

spark.locality.wait 

Maggiori informazioni sui parametri si possono trovare nel Spark Configuration docs

Per quanto riguarda i diversi livelli PROCESS_LOCAL, NODE_LOCAL, RACK_LOCAL o QUALSIASI penso che i metodi findTask e findSpeculativeTask in org.apache.spark.scheduler.TaskSetManager illustrate ho w Spark sceglie le attività in base al loro livello di località. Verificherà innanzitutto le attività PROCESS_LOCAL che verranno avviate nello stesso processo di esecuzione. In caso contrario, controllerà le attività NODE_LOCAL che potrebbero trovarsi in altri esecutori nello stesso nodo o dovrà essere recuperato da sistemi come HDFS, cache, ecc. RACK_LOCAL significa che i dati si trovano in un altro nodo e pertanto devono essere trasferiti prima esecuzione. E infine, ANY è solo per prendere qualsiasi attività in sospeso che possa essere eseguita nel nodo corrente.

/** 
    * Dequeue a pending task for a given node and return its index and locality level. 
    * Only search for tasks matching the given locality constraint. 
    */ 
    private def findTask(execId: String, host: String, locality: TaskLocality.Value) 
    : Option[(Int, TaskLocality.Value)] = 
    { 
    for (index <- findTaskFromList(execId, getPendingTasksForExecutor(execId))) { 
     return Some((index, TaskLocality.PROCESS_LOCAL)) 
    } 

    if (TaskLocality.isAllowed(locality, TaskLocality.NODE_LOCAL)) { 
     for (index <- findTaskFromList(execId, getPendingTasksForHost(host))) { 
     return Some((index, TaskLocality.NODE_LOCAL)) 
     } 
    } 

    if (TaskLocality.isAllowed(locality, TaskLocality.RACK_LOCAL)) { 
     for { 
     rack <- sched.getRackForHost(host) 
     index <- findTaskFromList(execId, getPendingTasksForRack(rack)) 
     } { 
     return Some((index, TaskLocality.RACK_LOCAL)) 
     } 
    } 

    // Look for no-pref tasks after rack-local tasks since they can run anywhere. 
    for (index <- findTaskFromList(execId, pendingTasksWithNoPrefs)) { 
     return Some((index, TaskLocality.PROCESS_LOCAL)) 
    } 

    if (TaskLocality.isAllowed(locality, TaskLocality.ANY)) { 
     for (index <- findTaskFromList(execId, allPendingTasks)) { 
     return Some((index, TaskLocality.ANY)) 
     } 
    } 

    // Finally, if all else has failed, find a speculative task 
    findSpeculativeTask(execId, host, locality) 
    } 
+0

Puoi spiegare cosa intendi con "attività in sospeso"? Ritengo che l'unico lavoro di un nodo di lavoro sia eseguire le attività fornite dall'utilità di pianificazione. Una volta terminata l'esecuzione di queste attività (magari quando l'applicazione spark viene eseguita in esecuzione), rimane inattiva. Quali sono le attività in sospeso quindi? – user3376961

+0

@ user3376961 Penso che la seguente domanda possa chiarire quale attività si trova nella scintilla. Tieni presente che puoi anche lavorare con un certo livello di elasticità e che mostra anche l'importanza di non avere una relazione uno-a-uno. http://stackoverflow.com/q/25276409/91042 –