2014-07-08 15 views
11

È necessario un aiuto per la migliore pratica di implementazione. L'ambiente operativo è la seguente: file di datiApache Spark on YARN: numero elevato di file di dati di input (combina più file di input in spark)

  • Log arriva in modo irregolare.
  • Le dimensioni di un file di dati di registro vanno da 3,9 KB a 8,5 MB. La media è di circa 1 MB.
  • Il numero di record di un file di dati è compreso tra 13 righe e 22000 righe. La media è di circa 2700 linee.
  • Il file di dati deve essere post-elaborato prima dell'aggregazione.
  • L'algoritmo di post-elaborazione può essere modificato.
  • Il file post-elaborazione viene gestito separatamente con il file di dati originale, poiché l'algoritmo di post-elaborazione potrebbe essere modificato.
  • L'aggregazione giornaliera viene eseguita. Tutto il file di dati postelaborato deve essere filtrato record per record e viene calcolata l'aggregazione (media, max min ...).
  • Poiché l'aggregazione è a grana fine, il numero di record dopo l'aggregazione non è così ridotto. Può essere circa la metà del numero dei record originali.
  • In un punto, il numero del file post-elaborato può essere di circa 200.000.
  • Un file di dati deve poter essere eliminato singolarmente.

In un test, ho cercato di elaborare 160.000 file post-processed by Spark iniziano con sc.textFile() con percorso glob, non è riuscito con eccezione OutOfMemory sul processo conducente.

Qual è la migliore pratica per gestire questo tipo di dati? Dovrei usare HBase invece dei file normali per salvare i dati post-elaborati?

risposta

8

Abbiamo scritto proprio caricatore. Ha risolto il nostro problema con piccoli file in HDFS. Usa Hadoop CombineFileInputFormat. Nel nostro caso ha ridotto il numero di mappatori da 100000 a circa 3000 e ha reso il lavoro molto più veloce.

https://github.com/RetailRocket/SparkMultiTool

Esempio:

import ru.retailrocket.spark.multitool.Loaders 
val sessions = Loaders.combineTextFile(sc, "file:///test/*") 
// or val sessions = Loaders.combineTextFile(sc, conf.weblogs(), size = 256, delim = "\n") 
// where size is split size in Megabytes, delim - line break character 
println(sessions.count()) 
+0

Grazie per aver condiviso questo. Penso che l'argomento dimensione sia particolarmente prezioso, dal momento che non può essere specificato su coalesce(). – zeodtr

+0

Questa soluzione è migliore della coalesce perché funziona a livello di mappa, ma si rafforza dopo. –

+1

Poiché hasdoop supporta CombineTextInputFormat (almeno dalla 2.2), la combinazione di file di input di piccole dimensioni può essere eseguita con sc.newAPIHadoopFile(), senza implementare una classe personalizzata. – zeodtr

3

Sono quasi sicuro che il motivo per cui si ottiene OOM è dovuto alla gestione di così tanti piccoli file. Quello che vuoi è combinare i file di input in modo da non ottenere così tante partizioni. Cerco di limitare i miei lavori a circa 10k partizioni.

Dopo textFile, è possibile utilizzare .coalesce(10000, false) ... non sicuro al 100% che funzionerà, però, perché è stato un po 'che ho fatto, per favore fatemelo sapere. Quindi prova

sc.textFile(path).coalesce(10000, false) 
+0

Grazie! Ci proverò. – zeodtr

+0

Ha funzionato! In realtà ho usato il fattore di coalesce 1227, che è il numero di partizioni quando Spark elabora il grande singolo file che contiene l'intero record. Ma il lavoro viene eseguito più lentamente (come previsto), e tuttavia sembra che le informazioni di tutti i file siano ancora trasferite al processo del driver, il che può causare OOM quando sono coinvolti troppi file. Ma 1,68 GB per il processo del driver per i file 168016 non sono così male. – zeodtr

+0

Bene, abbiamo un semplice lavoro separato appositamente per ridurre il numero di file in quanto è una cosa così importante. Una volta ho dovuto eseguirlo in 5 va su 5 sottoinsiemi – samthebest

0

È possibile utilizzare questo

primo luogo è possibile ottenere un Buffer/Elenco dei S3 percorsi/Lo stesso vale per HDFS o il percorso locale

Se Stai provando con Amazon S3 quindi:

import scala.collection.JavaConverters._ 
import java.util.ArrayList 
import com.amazonaws.services.s3.AmazonS3Client 
import com.amazonaws.services.s3.model.ObjectListing 
import com.amazonaws.services.s3.model.S3ObjectSummary 
import com.amazonaws.services.s3.model.ListObjectsRequest 

def listFiles(s3_bucket:String, base_prefix : String) = { 
    var files = new ArrayList[String] 

    //S3 Client and List Object Request 
    var s3Client = new AmazonS3Client(); 
    var objectListing: ObjectListing = null; 
    var listObjectsRequest = new ListObjectsRequest(); 

    //Your S3 Bucket 
    listObjectsRequest.setBucketName(s3_bucket) 

    //Your Folder path or Prefix 
    listObjectsRequest.setPrefix(base_prefix) 

    //Adding s3:// to the paths and adding to a list 
    do { 
     objectListing = s3Client.listObjects(listObjectsRequest); 
     for (objectSummary <- objectListing.getObjectSummaries().asScala) { 
     files.add("s3://" + s3_bucket + "/" + objectSummary.getKey()); 
     } 
     listObjectsRequest.setMarker(objectListing.getNextMarker()); 
    } while (objectListing.isTruncated()); 

    //Removing Base Directory Name 
    files.remove(0) 

    //Creating a Scala List for same 
    files.asScala 
    } 

Ora passare questo oggetto List per il seguente pezzo di codice, nota: SC è un oggetto di SqlContext

var df: DataFrame = null; 
    for (file <- files) { 
    val fileDf= sc.textFile(file) 
    if (df!= null) { 
     df= df.unionAll(fileDf) 
    } else { 
     df= fileDf 
    } 
    } 

Ora hai un finale Unified RDD cioè df

opzionale, e si può anche partizionare in un unico BigRDD

val files = sc.textFile(filename, 1).repartition(1) 

Ripartizionamento funziona sempre: D

Problemi correlati