2015-03-10 17 views
15

Ho già eseguito l'installazione di scintille e ho eseguito alcuni test che impostano i nodi master e worker. Detto questo, ho una grossa confusione su cosa si intenda esattamente per lavoro nel contesto di Spark (non in SparkContext). Ho sotto domandeChe cos'è Spark Job?

  • Quanto è diverso il lavoro da un programma Driver.
  • L'applicazione stessa fa parte del programma Driver?
  • Spark submit in un modo è un lavoro?

Ho letto il Spark documention ma ancora questa cosa non è chiara per me.

Detto questo, la mia implementazione è di scrivere lavori spark (programmaticamente) che vorrebbero inviare una scintilla.

Gentilmente aiutare con qualche esempio, se possibile. Sarebbe molto utile.

Nota: Gentilmente non postare collegamenti spark perché l'ho già provato. Anche se le domande sembrano ingenue, tuttavia ho bisogno di maggiore chiarezza nella comprensione.

risposta

24

Bene, la terminologia può sempre essere difficile poiché dipende dal contesto. In molti casi, puoi essere utilizzato per "inviare un lavoro a un cluster", che per scintilla sarebbe di inviare un programma di driver.

Detto questo, Spark ha la propria definizione di "lavoro", direttamente dal glossario:

Lavoro Un calcolo parallelo costituito da più attività che viene ha generato in risposta ad un'azione di Spark (ad esempio salvare, raccogliere); vedrai questo termine usato nei log del conducente.

Così ho questo contesto, diciamo è necessario effettuare le seguenti operazioni:

  1. Caricare un file con persone nomi e gli indirizzi in RDD1
  2. Caricare un file con persone nomi e telefoni in RDD2
  3. registrazione RDD1 e RDD2 per nome, per ottenere RDD3
  4. mappa su RDD3 per ottenere una bella carta di presentazione HTML per ogni persona come RDD4
  5. Salva RDD4 file.
  6. Mappa RDD1 per estrarre zipcodes dagli indirizzi per ottenere RDD5
  7. Aggregazione su RDD5 per ottenere un conteggio di quante persone vivono su ogni codice postale come RDD6
  8. Raccogliere RDD6 e stampa queste statistiche al stdout.

Quindi,

  1. Il programma pilota è questo intero pezzo di codice, in esecuzione tutte le 8 passi.
  2. Produrre l'intera scheda HTML impostato sul punto 5 è un lavoro (chiaro, perché stiamo usando il Salva azione, non una trasformazione). Stesso con il raccogliere nel passaggio 8
  3. altri passi verranno organizzati in fasi, con ogni lavoro essendo il risultato di una sequenza di stadi. Per cose semplici, un lavoro può avere una singola fase, ma la necessità di ripartizionare i dati (ad esempio, il join nel passaggio 3) o qualsiasi altra cosa che interrompa la localizzazione dei dati di solito causa l'apparizione di più fasi. Puoi pensare alle fasi come a calcoli che producono risultati intermedi, che possono infatti essere mantenuti. Ad esempio, possiamo mantenere RDD1 poiché lo useremo più di una volta, evitando il ricalcolo.
  4. Tutti e 3 sopra fondamentalmente parlano di come la logica di un determinato algoritmo verrà interrotta. Al contrario, l'attività è un particolare che passerà attraverso un determinato livello, su un determinato esecutore.

spero che rende le cose più chiare ;-)

+0

è chiaro per me ora :) ma comunque ho un query su come scrivere la pianificazione del lavoro. Ho letto documenti ma non riesco ad ottenere il codice. – chaosguru

+1

Beh, questo dipende molto dal tipo di infrastruttura che hai (stai usando Spark on Yarn per esempio?) Non è il mio forte seme, ma in linea di principio, lancio tutti i miei programmi di driver dagli script di Bash (per ricordare i parametri, creare cartelle di output, ecc.). Qualsiasi normale strumento di pianificazione in grado di eseguire un comando di console dovrebbe funzionare IMHO. Se ogni lavoro utilizza tutte le risorse nel cluster, è possibile semplicemente inviare programmi e attenderanno la liberazione delle risorse. –

+0

Inoltre, sarebbe bello se si potesse accettare la risposta se chiarito le cose. Aiuta le persone che cercano domande senza risposta e quindi più domande ricevono risposta ;-) –

0

Hey ecco qualcosa che ho fatto prima, spero che funziona per voi:

#!/bin/bash 
# Hadoop and Server Variables 
HADOOP="hadoop fs" 
HDFS_HOME="hdfs://ha-edge-group/user/max" 
LOCAL_HOME="/home/max" 

# Cluster Variables 
DRIVER_MEM="10G" 
EXECUTOR_MEM="10G" 
CORES="5" 
EXECUTORS="15" 

# Script Arguments 
SCRIPT="availability_report.py" # Arg[0] 
APPNAME="Availability Report" # arg[1] 

DAY=`date -d yesterday +%Y%m%d` 

for HOUR in 00 01 02 03 04 05 06 07 08 09 10 11 12 13 14 15 16 17 18 19 20 21 22 23 
do 
     #local directory to getmerge to 
     LOCAL_OUTFILE="$LOCAL_HOME/availability_report/data/$DAY/$HOUR.txt" 

     # Script arguments 
     HDFS_SOURCE="webhdfs://1.2.3.4:0000/data/lbs_ndc/raw_$DAY'_'$HOUR" # arg[2] 
     HDFS_CELLS="webhdfs://1.2.3.4:0000/data/cells/CELLID_$DAY.txt" # arg[3] 
     HDFS_OUT_DIR="$HDFS_HOME/availability/$DAY/$HOUR" # arg[4] 

     spark-submit \ 
     --master yarn-cluster \ 
     --driver-memory $DRIVER_MEM \ 
     --executor-memory $EXECUTOR_MEM \ 
     --executor-cores $CORES \ 
     --num-executors $EXECUTORS \ 
     --conf spark.scheduler.mode=FAIR \ 
     $SCRIPT $APPNAME $HDFS_SOURCE $HDFS_CELLS $HDFS_OUT_DIR 

     $HADOOP -getmerge $HDFS_OUT_DIR $LOCAL_OUTFILE 
done