Sto raccogliendo i dati da un'applicazione di messaggistica, Attualmente sto usando Flume, invia circa 50 milioni di dischi al giornoPersistendo Spark uscita Streaming
Vorrei usare Kafka, consumano da Kafka usando Spark Streaming e persistono a Hadoop e query con impala
sto avendo problemi con l'approccio che ho provato ..
approccio 1 - Salvo RDD come legno, puntare una tabella alveare in legno esterno al legno elenco
// scala
val ssc = new StreamingContext(sparkConf, Seconds(bucketsize.toInt))
val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
lines.foreachRDD(rdd => {
// 1 - Create a SchemaRDD object from the rdd and specify the schema
val SchemaRDD1 = sqlContext.jsonRDD(rdd, schema)
// 2 - register it as a spark sql table
SchemaRDD1.registerTempTable("sparktable")
// 3 - qry sparktable to produce another SchemaRDD object of the data needed 'finalParquet'. and persist this as parquet files
val finalParquet = sqlContext.sql(sql)
finalParquet.saveAsParquetFile(dir)
Il problema è che FinalParquet. saveAsParquetFile emette un numero enorme. di file, il Dstream ricevuto da Kafka restituisce oltre 200 file per un lotto di 1 minuto. La ragione per cui viene generato molti file è perché il calcolo è distribuito come spiegato in un altro post-how to make saveAsTextFile NOT split output into multiple file? le soluzioni propsed non mi sembrano ottimali, ad es. come afferma un utente - Avere un singolo file di output è solo una buona idea se si hanno pochissimi dati.
Approccio 2: utilizzare Hivecontext. inserire dati RDD direttamente a una tabella alveare
# python
sqlContext = HiveContext(sc)
ssc = StreamingContext(sc, int(batch_interval))
kvs = KafkaUtils.createStream(ssc, zkQuorum, group, {topics: 1})
lines = kvs.map(lambda x: x[1]).persist(StorageLevel.MEMORY_AND_DISK_SER)
lines.foreachRDD(sendRecord)
def sendRecord(rdd):
sql = "INSERT INTO TABLE table select * from beacon_sparktable"
# 1 - Apply the schema to the RDD creating a data frame 'beaconDF'
beaconDF = sqlContext.jsonRDD(rdd,schema)
# 2- Register the DataFrame as a spark sql table.
beaconDF.registerTempTable("beacon_sparktable")
# 3 - insert to hive directly from a qry on the spark sql table
sqlContext.sql(sql);
Questo funziona bene, si inserisce direttamente ad un tavolo in legno, ma ci sono schedulazione ritardi per lotti come tempo di elaborazione supera l'intervallo di tempo in batch. Il consumatore non può tenere il passo con ciò che viene prodotto e i lotti da elaborare iniziano a fare la fila.
sembra che scrivere all'alveare sia lento. Ho provato a regolare le dimensioni dell'intervallo batch, eseguendo più istanze di consumo.
In sintesi
Qual è il modo migliore per persistere Big dati da Spark Streaming dato che ci sono problemi con più file e la latenza potenziale con la scrittura di scorporare? Cosa stanno facendo gli altri?
Una domanda simile è stato chiesto qui, ma ha un problema con le directory come apposto troppi file How to make Spark Streaming write its output so that Impala can read it?
Molte grazie per qualsiasi aiuto
È possibile impostare una finestra diversa per il flusso di output. 'val lines = KafkaUtils.createStream (ssc, zkQuorum, group, topicMap) .map (_._ 2) .window (Minutes (15)). foreachRDD (rdd =>' – ssedano
questo per me sembra un caso d'uso molto comune , Sono sorpreso che nessuno abbia risposto.Immagino che suggerirei di usare un database, dato che Spark da solo non può davvero sostituirlo. Prova Cassandra o HBase (curva di apprendimento molto ripida per HBase). – avloss