2015-06-16 8 views
5

Ho usato i dati di ingresso con il formato seguente:concomitante lavoro Esecuzione in Spark

0 
1 
2 
3 
4 
5 
… 
14 

Input Location: hdfs://localhost:9000/Input/datasource 

Ho usato il seguente frammento di codice per salvare RDD come file di testo utilizzando più thread:

package org.apache.spark.examples; 

import java.io.Serializable; 
import java.util.ArrayList; 
import java.util.Arrays; 
import java.util.Collections; 
import java.util.Comparator; 
import java.util.List; 
import java.util.concurrent.ExecutorService; 
import java.util.concurrent.Executors; 

import org.apache.avro.ipc.specific.Person; 
import org.apache.spark.SparkConf; 
import org.apache.spark.api.java.JavaPairRDD; 
import org.apache.spark.api.java.JavaRDD; 
import org.apache.spark.api.java.JavaSparkContext; 
import org.apache.spark.api.java.function.Function; 
import org.apache.spark.api.java.function.Function2; 
import org.apache.spark.api.java.function.PairFunction; 
import org.apache.spark.sql.DataFrame; 
import org.apache.spark.sql.Row; 
import org.apache.spark.sql.RowFactory; 
import org.apache.spark.sql.SQLContext; 
import org.apache.spark.sql.types.DataTypes; 
import org.apache.spark.sql.types.StructField; 
import org.apache.spark.sql.types.StructType; 

import scala.Tuple2; 

class RunnableDemo implements Runnable 
{ 

    private Thread t; 
    private String threadName; 
    private String path; 
    private JavaRDD<String> javaRDD; 
// private JavaSparkContext javaSparkContext; 

    RunnableDemo(String threadName,JavaRDD<String> javaRDD,String path) 
    { 
     this.threadName=threadName; 
     this.javaRDD=javaRDD; 
     this.path=path; 
//  this.javaSparkContext=javaSparkContext; 
    } 


    @Override 
    public void run() { 
     System.out.println("Running " + threadName);  
     try { 
      this.javaRDD.saveAsTextFile(path); 
//   System.out.println(this.javaRDD.count()); 
      Thread.sleep(50); 
      } catch (InterruptedException e) { 
       System.out.println("Thread " + threadName + " interrupted."); 
       } 
     System.out.println("Thread " + threadName + " exiting."); 
//  this.javaSparkContext.stop(); 
    } 

    public void start() 
     { 
      System.out.println("Starting " + threadName); 
      if (t == null) 
      { 
      t = new Thread (this, threadName); 
      t.start(); 
      } 
     } 

} 

public class SparkJavaTest { 



    public static void main(String[] args) { 

     //Spark Configurations: 

     SparkConf sparkConf=new SparkConf().setAppName("SparkJavaTest"); 

     JavaSparkContext ctx=new JavaSparkContext(sparkConf); 

     SQLContext sqlContext = new SQLContext(ctx);   

     JavaRDD<String> dataCollection=ctx.textFile("hdfs://yarncluster/Input/datasource"); 




     List<StructField> fields= new ArrayList<StructField>(); 
     fields.add(DataTypes.createStructField("Id", DataTypes.IntegerType,true)); 

     JavaRDD<Row> rowRDD =dataCollection.map(
       new Function<String, Row>() { 
        @Override 
        public Row call(String record) throws Exception { 
         String[] fields = record.split("\u0001");      
         return RowFactory.create(Integer.parseInt(fields[0].trim())); 
        }     
       });  

     StructType schema = DataTypes.createStructType(fields); 

     DataFrame dataFrame =sqlContext.createDataFrame(rowRDD, schema);   
     dataFrame.registerTempTable("data"); 

     long recordsCount=dataFrame.count();   
     long splitRecordsCount=5; 
     long splitCount =recordsCount/splitRecordsCount; 
     List<JavaRDD<Row>> list1=new ArrayList<JavaRDD<Row>>(); 

     for(int i=0;i<splitCount;i++) 
     { 
      long start = i*splitRecordsCount; 
      long end = (i+1)*splitRecordsCount;   
      DataFrame temp=sqlContext.sql("SELECT * FROM data WHERE Id >="+ start +" AND Id < " + end);   
      list1.add(temp.toJavaRDD()); 
     }  

     long length =list1.size(); 

     int split=0; 

     for (int i = 0; i < length; i++) { 

      JavaRDD rdd1 =list1.get(i); 

      JavaPairRDD rdd3=rdd1.cartesian(rdd1); 

      JavaPairRDD<Row,Row> rdd4=rdd3.filter(
        new Function<Tuple2<Row,Row>,Boolean>() 
        { 
         public Boolean call(Tuple2<Row,Row> s) 
         { 
          Row line1=s._1; 
          Row line2=s._2; 

          long app1 = Integer.parseInt(line1.get(0).toString()); 

          long app2 = Integer.parseInt(line2.get(0).toString()); 


          if(app1<app2) 
          { 
           return true; 
          } 
          return false; 
         } 
        }); 

      JavaRDD<String> test=rdd4.map(new Function<Tuple2<Row,Row>, String>() { 
       @Override 
       public String call(Tuple2<Row, Row> s) 
         throws Exception { 

        Row data1=s._1; 
        Row data2=s._2; 

        int x =Integer.parseInt(data1.get(0).toString()); 
        int y =Integer.parseInt(data2.get(0).toString()); 

        String result =x +","+ y+","+(x+y); 
        return result; 
       } 
      }); 

      RunnableDemo R =new RunnableDemo("Thread-"+split,test,"hdfs://yarncluster/GettingStarted/Output/"+split); 

      R.start(); 
      split++;    
      R.start(); 

      int index =i; 

      while(index<length) 
      { 
       JavaRDD rdd2 =list1.get(index); 
       rdd3=rdd1.cartesian(rdd2); 

       rdd4=rdd3.filter(
         new Function<Tuple2<Row,Row>,Boolean>() 
         { 
          public Boolean call(Tuple2<Row,Row> s) 
          { 
           Row line1=s._1; 
           Row line2=s._2; 

           long app1 = Integer.parseInt(line1.get(0).toString()); 

           long app2 = Integer.parseInt(line2.get(0).toString()); 


           if(app1<app2) 
           { 
            return true; 
           } 
           return false; 
          } 
         });   

       test=rdd4.map(new Function<Tuple2<Row,Row>, String>() { 
        @Override 
        public String call(Tuple2<Row, Row> s) 
          throws Exception { 

         Row data1=s._1; 
         Row data2=s._2; 

         int x =Integer.parseInt(data1.get(0).toString()); 
         int y =Integer.parseInt(data2.get(0).toString()); 

         String result =x +","+ y+","+(x+y); 
         return result; 
        } 
       });   

       R =new RunnableDemo("Thread-"+split,test,"hdfs://yarncluster/GettingStarted/Output/"+split); 

       R.start(); 
       split++;    
       index++;     
      } 
     } 
    } 

} 

In questo caso ho affrontato la seguente eccezione

enter image description here

Ho provato la soluzione fornita nel seguente link

How to run concurrent jobs(actions) in Apache Spark using single spark context

Ma ancora, non riesco a risolvere questo problema.

Potresti per favore guidarmi a risolvere questo?

risposta

0

Potrebbe provare a utilizzare

SparkConf sparkConf=new SparkConf() 
.setAppName("SparkJavaTest") 
.set("spark.driver.allowMultipleContexts", "true"); 

Sono anche una novità per la programmazione Spark, ma questo mi ha aiutato durante l'esecuzione di più lavori scintilla nella pianificazione. Dalla mia comprensione non abbiamo bisogno di pensare ai thread o alla logica M-R/piano di esecuzione come Spark fa per te.

+0

Ciao Chetandalal, già ho provato con questa proprietà. Non funziona per me – Raja

4

Prima di tutto, si sta tentando di eseguire tutto il lavoro sul nodo driver utilizzando diversi thread. Questo non è proprio nello spirito di Spark, dal momento che ogni unità di lavoro nel tuo caso è indipendente dalle altre e può essere eseguita su macchine diverse. Hai un esempio di gioco qui, ma diventerà molto importante con grandi volumi di dati.

Un approccio migliore sarebbe quello di utilizzare qualcosa come mapPartitions per inviare l'intervallo di chiavi a ciascun operatore e consentire loro di eseguire le query SQL corrispondenti, quindi salvare i risultati, con un thread per worker. Ciò renderà il codice più pulito e più facile da ragionare (una volta che ti sarai abituato al modo in cui funzionano gli RDD). Ovviamente è necessario impostare level of parallelism e il numero di partizioni (parlato di here) per i dati di input in modo appropriato.

Il problema immediato con il codice è che il thread principale avvia altri thread, ma non aspetta che finiscano. Normalmente questo fa terminare i thread generati insieme al genitore (vedere javadoc). Notare come nel answer alla domanda collegata la funzione principale fa un get() sui future spawnati prima di tornare.

Problemi correlati