2015-10-01 20 views
7

Sto raccogliendo i dati da un'applicazione di messaggistica, Attualmente sto usando Flume, invia circa 50 milioni di dischi al giornoPersistendo Spark uscita Streaming

Vorrei usare Kafka, consumano da Kafka usando Spark Streaming e persistono a Hadoop e query con impala

sto avendo problemi con l'approccio che ho provato ..

approccio 1 - Salvo RDD come legno, puntare una tabella alveare in legno esterno al legno elenco

// scala 
val ssc = new StreamingContext(sparkConf, Seconds(bucketsize.toInt)) 
val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2) 
lines.foreachRDD(rdd => { 

    // 1 - Create a SchemaRDD object from the rdd and specify the schema 
    val SchemaRDD1 = sqlContext.jsonRDD(rdd, schema) 

    // 2 - register it as a spark sql table 
    SchemaRDD1.registerTempTable("sparktable") 

    // 3 - qry sparktable to produce another SchemaRDD object of the data needed 'finalParquet'. and persist this as parquet files 
    val finalParquet = sqlContext.sql(sql) 
    finalParquet.saveAsParquetFile(dir) 

Il problema è che FinalParquet. saveAsParquetFile emette un numero enorme. di file, il Dstream ricevuto da Kafka restituisce oltre 200 file per un lotto di 1 minuto. La ragione per cui viene generato molti file è perché il calcolo è distribuito come spiegato in un altro post-how to make saveAsTextFile NOT split output into multiple file? le soluzioni propsed non mi sembrano ottimali, ad es. come afferma un utente - Avere un singolo file di output è solo una buona idea se si hanno pochissimi dati.

Approccio 2: utilizzare Hivecontext. inserire dati RDD direttamente a una tabella alveare

# python 
sqlContext = HiveContext(sc) 
ssc = StreamingContext(sc, int(batch_interval)) 
kvs = KafkaUtils.createStream(ssc, zkQuorum, group, {topics: 1}) 
lines = kvs.map(lambda x: x[1]).persist(StorageLevel.MEMORY_AND_DISK_SER) 
lines.foreachRDD(sendRecord) 

def sendRecord(rdd): 

    sql = "INSERT INTO TABLE table select * from beacon_sparktable" 

    # 1 - Apply the schema to the RDD creating a data frame 'beaconDF' 
    beaconDF = sqlContext.jsonRDD(rdd,schema) 

    # 2- Register the DataFrame as a spark sql table. 
    beaconDF.registerTempTable("beacon_sparktable") 

    # 3 - insert to hive directly from a qry on the spark sql table 
    sqlContext.sql(sql); 

Questo funziona bene, si inserisce direttamente ad un tavolo in legno, ma ci sono schedulazione ritardi per lotti come tempo di elaborazione supera l'intervallo di tempo in batch. Il consumatore non può tenere il passo con ciò che viene prodotto e i lotti da elaborare iniziano a fare la fila.

sembra che scrivere all'alveare sia lento. Ho provato a regolare le dimensioni dell'intervallo batch, eseguendo più istanze di consumo.

In sintesi

Qual è il modo migliore per persistere Big dati da Spark Streaming dato che ci sono problemi con più file e la latenza potenziale con la scrittura di scorporare? Cosa stanno facendo gli altri?

Una domanda simile è stato chiesto qui, ma ha un problema con le directory come apposto troppi file How to make Spark Streaming write its output so that Impala can read it?

Molte grazie per qualsiasi aiuto

+0

È possibile impostare una finestra diversa per il flusso di output. 'val lines = KafkaUtils.createStream (ssc, zkQuorum, group, topicMap) .map (_._ 2) .window (Minutes (15)). foreachRDD (rdd =>' – ssedano

+0

questo per me sembra un caso d'uso molto comune , Sono sorpreso che nessuno abbia risposto.Immagino che suggerirei di usare un database, dato che Spark da solo non può davvero sostituirlo. Prova Cassandra o HBase (curva di apprendimento molto ripida per HBase). – avloss

risposta

0

In soluzione # 2, il numero di file creati può essere controllato tramite il numero di partizioni di ciascun RDD.

Guarda questo esempio:

// create a Hive table (assume it's already existing) 
sqlContext.sql("CREATE TABLE test (id int, txt string) STORED AS PARQUET") 

// create a RDD with 2 records and only 1 partition 
val rdd = sc.parallelize(List(List(1, "hello"), List(2, "world")), 1) 

// create a DataFrame from the RDD 
val schema = StructType(Seq(
StructField("id", IntegerType, nullable = false), 
StructField("txt", StringType, nullable = false) 
)) 
val df = sqlContext.createDataFrame(rdd.map(Row(_:_*)), schema) 

// this creates a single file, because the RDD has 1 partition 
df.write.mode("append").saveAsTable("test") 

Ora, penso che si può giocare con la frequenza con cui si tira i dati da Kafka, e il numero di partizioni di ogni RDD (di default, le partizioni del vostro argomento Kafka , che puoi eventualmente ridurre ripartizionando).

Sto utilizzando Spark 1.5 da CDH 5.5.1 e ottengo lo stesso risultato utilizzando df.write.mode("append").saveAsTable("test") o la stringa SQL.

Problemi correlati