Esecuzione concorrente di job in Spark
Ho usato dati di ingresso nel formato seguente:
0
1
2
3
4
5
…
14
Input Location: hdfs://localhost:9000/Input/datasource
Ho usato il seguente frammento di codice per salvare RDD come file di testo utilizzando più thread:
package org.apache.spark.examples;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.apache.avro.ipc.specific.Person;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import scala.Tuple2;
/**
 * Runnable that saves one RDD of text lines to a target path.
 *
 * <p>It can be run either through {@link #start()}, which creates a dedicated
 * thread named after {@code threadName}, or by handing the instance to any
 * executor, since it is a plain {@link Runnable}.
 */
class RunnableDemo implements Runnable
{
    /** Thread created by {@link #start()}; stays null until the first call. */
    private Thread t;
    private final String threadName;
    private final String path;
    private final JavaRDD<String> javaRDD;

    /**
     * @param threadName label used in progress messages and as the thread name
     * @param javaRDD    lines to be written
     * @param path       destination passed to {@code saveAsTextFile}
     */
    RunnableDemo(String threadName, JavaRDD<String> javaRDD, String path)
    {
        this.threadName = threadName;
        this.javaRDD = javaRDD;
        this.path = path;
    }

    /**
     * Saves the RDD as text files under {@code path}, then pauses briefly.
     * An interrupt during the pause is reported and the interrupt flag is
     * restored so that the owner of the thread can still observe it.
     */
    @Override
    public void run() {
        System.out.println("Running " + threadName);
        try {
            this.javaRDD.saveAsTextFile(path);
            Thread.sleep(50);
        } catch (InterruptedException e) {
            System.out.println("Thread " + threadName + " interrupted.");
            // Do not swallow the interrupt: put the flag back for callers up the stack.
            Thread.currentThread().interrupt();
        }
        System.out.println("Thread " + threadName + " exiting.");
    }

    /**
     * Starts the save on a new thread. Only the first call has any effect;
     * later calls return without starting or printing anything.
     */
    public void start()
    {
        if (t != null)
        {
            return;
        }
        System.out.println("Starting " + threadName);
        t = new Thread(this, threadName);
        t.start();
    }
}
public class SparkJavaTest {
public static void main(String[] args) {
//Spark Configurations:
SparkConf sparkConf=new SparkConf().setAppName("SparkJavaTest");
JavaSparkContext ctx=new JavaSparkContext(sparkConf);
SQLContext sqlContext = new SQLContext(ctx);
JavaRDD<String> dataCollection=ctx.textFile("hdfs://yarncluster/Input/datasource");
List<StructField> fields= new ArrayList<StructField>();
fields.add(DataTypes.createStructField("Id", DataTypes.IntegerType,true));
JavaRDD<Row> rowRDD =dataCollection.map(
new Function<String, Row>() {
@Override
public Row call(String record) throws Exception {
String[] fields = record.split("\u0001");
return RowFactory.create(Integer.parseInt(fields[0].trim()));
}
});
StructType schema = DataTypes.createStructType(fields);
DataFrame dataFrame =sqlContext.createDataFrame(rowRDD, schema);
dataFrame.registerTempTable("data");
long recordsCount=dataFrame.count();
long splitRecordsCount=5;
long splitCount =recordsCount/splitRecordsCount;
List<JavaRDD<Row>> list1=new ArrayList<JavaRDD<Row>>();
for(int i=0;i<splitCount;i++)
{
long start = i*splitRecordsCount;
long end = (i+1)*splitRecordsCount;
DataFrame temp=sqlContext.sql("SELECT * FROM data WHERE Id >="+ start +" AND Id < " + end);
list1.add(temp.toJavaRDD());
}
long length =list1.size();
int split=0;
for (int i = 0; i < length; i++) {
JavaRDD rdd1 =list1.get(i);
JavaPairRDD rdd3=rdd1.cartesian(rdd1);
JavaPairRDD<Row,Row> rdd4=rdd3.filter(
new Function<Tuple2<Row,Row>,Boolean>()
{
public Boolean call(Tuple2<Row,Row> s)
{
Row line1=s._1;
Row line2=s._2;
long app1 = Integer.parseInt(line1.get(0).toString());
long app2 = Integer.parseInt(line2.get(0).toString());
if(app1<app2)
{
return true;
}
return false;
}
});
JavaRDD<String> test=rdd4.map(new Function<Tuple2<Row,Row>, String>() {
@Override
public String call(Tuple2<Row, Row> s)
throws Exception {
Row data1=s._1;
Row data2=s._2;
int x =Integer.parseInt(data1.get(0).toString());
int y =Integer.parseInt(data2.get(0).toString());
String result =x +","+ y+","+(x+y);
return result;
}
});
RunnableDemo R =new RunnableDemo("Thread-"+split,test,"hdfs://yarncluster/GettingStarted/Output/"+split);
R.start();
split++;
R.start();
int index =i;
while(index<length)
{
JavaRDD rdd2 =list1.get(index);
rdd3=rdd1.cartesian(rdd2);
rdd4=rdd3.filter(
new Function<Tuple2<Row,Row>,Boolean>()
{
public Boolean call(Tuple2<Row,Row> s)
{
Row line1=s._1;
Row line2=s._2;
long app1 = Integer.parseInt(line1.get(0).toString());
long app2 = Integer.parseInt(line2.get(0).toString());
if(app1<app2)
{
return true;
}
return false;
}
});
test=rdd4.map(new Function<Tuple2<Row,Row>, String>() {
@Override
public String call(Tuple2<Row, Row> s)
throws Exception {
Row data1=s._1;
Row data2=s._2;
int x =Integer.parseInt(data1.get(0).toString());
int y =Integer.parseInt(data2.get(0).toString());
String result =x +","+ y+","+(x+y);
return result;
}
});
R =new RunnableDemo("Thread-"+split,test,"hdfs://yarncluster/GettingStarted/Output/"+split);
R.start();
split++;
index++;
}
}
}
}
In questo caso ho riscontrato la seguente eccezione:
Ho provato la soluzione proposta al seguente link:
How to run concurrent jobs(actions) in Apache Spark using single spark context
Tuttavia non riesco ancora a risolvere questo problema.
Potreste per favore aiutarmi a risolverlo?
Ciao Chetandalal, ho già provato con questa proprietà, ma nel mio caso non funziona. – Raja