2016-05-18 11 views
5

Ho creato un DataFrame da una tabella HBase (PHOENIX) che ha 500 milioni di righe. Da DataFrame ho creato un RDD di JavaBean e lo uso per unire i dati di un file.PHOENIX SPARK - Carica tabella come DataFrame

Map<String, String> phoenixInfoMap = new HashMap<String, String>(); 
phoenixInfoMap.put("table", tableName); 
phoenixInfoMap.put("zkUrl", zkURL); 
DataFrame df = sqlContext.read().format("org.apache.phoenix.spark").options(phoenixInfoMap).load(); 
JavaRDD<Row> tableRows = df.toJavaRDD(); 
JavaPairRDD<String, AccountModel> dbData = tableRows.mapToPair(
new PairFunction<Row, String, String>() 
{ 
    @Override 
    public Tuple2<String, String> call(Row row) throws Exception 
    { 
     return new Tuple2<String, String>(row.getAs("ID"), row.getAs("NAME")); 
    } 
}); 

Ora la mia domanda - Diciamo che il file ha 2 milioni di voci univoche corrispondenti alla tabella. L'intera tabella viene caricata in memoria come RDD o solo i corrispondenti 2 milioni di record dalla tabella verranno caricati in memoria come RDD?

+0

Ciao @Mohan, per favore fammi sapere la dipendenza della build per il 'DataFrame df = sqlContext.read(). Format (" org.apache.phoenix.spark "). Options (phoenixInfoMap) .load()' metodo . Sto facendo lo stesso, ma ottenendo 'java.lang.NoSuchMethodError' – Explorer

risposta

2

La sua dichiarazione

DataFrame df = sqlContext.read().format("org.apache.phoenix.spark").options(phoenixInfoMap) 
.load(); 

caricherà l'intera tabella in memoria. Non hai fornito alcun filtro per phoenix da spingere verso il basso in hbase - e quindi ridurre il numero di righe lette.

Se si esegue un join su un'origine dati non-HBase, ad esempio un file flat, è necessario prima leggere tutti i record della tabella hbase. I record che non corrispondono all'origine dati secondaria non verranno salvati nel nuovo DataFrame - ma la lettura iniziale sarebbe ancora avvenuta.

Aggiornamento Un approccio potenziale potrebbe essere pre-elaborare il file, ad esempio estrarre gli id ​​che si desidera. Memorizza i risultati in una nuova tabella HBase. Quindi eseguire il join direttamente in HBase tramite Phoenix non Spark.

La motivazione di questo approccio è spostare il calcolo sui dati. La maggior parte dei dati risiede in HBase, quindi sposta i piccoli dati (gli ID nei file) lì.

Non ho dimestichezza diretta con Phoenix tranne che fornisce uno strato sql sopra a hbase. Presumibilmente, allora sarebbe in grado di fare un tale join e memorizzare il risultato in una tabella HBase separata ..? Quella tabella separata potrebbe quindi essere caricata in Spark per essere utilizzata nei calcoli successivi.

+0

Grazie @javadba. Esiste un modo efficace per gestire questo scenario? Vorrei caricare solo i 2 milioni di voci corrispondenti nel file dalla tabella HBase. – Mohan

+0

Ho aggiornato l'OP per rispondere a questo. – javadba

Problemi correlati