2015-01-22 9 views
9

Sto provando ad eseguire il codice seguente usando eclipse (con maven conf) con 2 worker e ognuno ha 2 core o anche provato con spark-submit.spark ssc.textFileStream non streamining alcun file dalla directory

public class StreamingWorkCount implements Serializable { 

    public static void main(String[] args) { 
     Logger.getLogger("org.apache.spark").setLevel(Level.WARN); 
     JavaStreamingContext jssc = new JavaStreamingContext(
       "spark://192.168.1.19:7077", "JavaWordCount", 
       new Duration(1000)); 
     JavaDStream<String> trainingData = jssc.textFileStream(
       "/home/bdi-user/kaushal-drive/spark/data/training").cache(); 
     trainingData.foreach(new Function<JavaRDD<String>, Void>() { 

      public Void call(JavaRDD<String> rdd) throws Exception { 
       List<String> output = rdd.collect(); 
       System.out.println("Sentences Collected from files " + output); 
       return null; 
      } 
     }); 

     trainingData.print(); 
     jssc.start(); 
     jssc.awaitTermination(); 
    } 
} 

e il registro di tale codice

15/01/22 21:57:13 INFO FileInputDStream: New files at time 1421944033000 ms: 

15/01/22 21:57:13 INFO JobScheduler: Added jobs for time 1421944033000 ms 
15/01/22 21:57:13 INFO JobScheduler: Starting job streaming job 1421944033000 ms.0 from job set of time 1421944033000 ms 
15/01/22 21:57:13 INFO SparkContext: Starting job: foreach at StreamingKMean.java:33 
15/01/22 21:57:13 INFO DAGScheduler: Job 3 finished: foreach at StreamingKMean.java:33, took 0.000094 s 
Sentences Collected from files [] 
------------------------------------------- 
15/01/22 21:57:13 INFO JobScheduler: Finished job streaming job 1421944033000 ms.0 from job set of time 1421944033000 ms 
Time: 1421944033000 ms 
-------------------------------------------15/01/22 21:57:13 INFO JobScheduler: Starting job streaming job 1421944033000 ms.1 from job set of time 1421944033000 ms 


15/01/22 21:57:13 INFO JobScheduler: Finished job streaming job 1421944033000 ms.1 from job set of time 1421944033000 ms 
15/01/22 21:57:13 INFO JobScheduler: Total delay: 0.028 s for time 1421944033000 ms (execution: 0.013 s) 
15/01/22 21:57:13 INFO MappedRDD: Removing RDD 5 from persistence list 
15/01/22 21:57:13 INFO BlockManager: Removing RDD 5 
15/01/22 21:57:13 INFO FileInputDStream: Cleared 0 old files that were older than 1421943973000 ms: 
15/01/22 21:57:13 INFO FileInputDStream: Cleared 0 old files that were older than 1421943973000 ms: 
15/01/22 21:57:13 INFO ReceivedBlockTracker: Deleting batches ArrayBuffer() 

Il problema è che i dati, non ricevo formano il file, che si trova nella directory. Mi aiuti per favore.

+0

fronte esattamente lo stesso problema su macchina Windows.Si prega di suggerire –

+0

Penso che questo funzioni solo in HDFS e non nel file system locale –

risposta

8

Provalo con un'altra directory e copia questi file in quella directory, mentre il lavoro è in esecuzione.

+0

sì, ho anche provato con un'altra dir. non ho capito qual è il problema e come eseguire il debug, anche se non viene visualizzato nel registro. – Kaushal

+1

Ma la directory era vuota quando hai iniziato il lavoro? – pzecevic

+0

In realtà alcuni file sono già lì e copio anche alcuni file quando avvio il mio lavoro. – Kaushal

1

Penso che sia necessario aggiungere lo schema, ad esempio file:// o hdfs:// davanti al percorso.


Annullamento di modifica al mio commento perché: E 'infatti file:// e hdfs:// che deve essere aggiunto "di fronte a" il percorso, in modo che il percorso totale diventa file:///tmp/file.txt o hdfs:///user/data. Se non è presente alcun NameNode nella configurazione, quest'ultimo deve essere hdfs://host:port/user/data.

+1

utilizzando HDFS, funziona ma quando uso il file system locale con prefisso 'file: ///' (spark non supporta file: //), non funziona. – Kaushal

+1

Ciò potrebbe essere dovuto al fatto che si sta utilizzando un cluster e che il percorso specificato deve essere accessibile da tutti gli esecutori Spark, vale a dire che non è sufficiente se il driver Spark può accedervi. – tgpfeiffer

3

ha avuto lo stesso problema. Ecco il mio codice:

linee = jssc.textFileStream ("file: /// Users/progetti/scintilla/test/data ');

il TextFileSTream è molto sensibili; quello che ho finito per fare era:

1. Run Spark program 
2. touch datafile 
3. mv datafile datafile2 
4. mv datafile2 /Users/projects/spark/test/data 

e che lo ha fatto

+0

Sì, ha funzionato bene! – lihongxu

0

JavaDoc suggerisce funzione flusso solo nuovo file. S.

Rif: https://spark.apache.org/docs/1.0.1/api/java/org/apache/spark/streaming/api/java/JavaStreamingContext.html#textFileStream(java.lang.String)

creare un flusso di ingresso che controlla un file system compatibile con Hadoop per i nuovi file e li legge come file di testo (con il tasto come LongWritable, valore come testo e l'ingresso formato TextInputFormat). I file devono essere scritti nella directory monitorata "spostandoli" da un'altra posizione all'interno dello stesso file system. Nomi di file che iniziano con. sono ignorati

0

textFileStream può monitorare solo una cartella quando i file nella cartella vengono aggiunti o aggiornati.

Se si desidera solo leggere i file, è possibile utilizzare SparkContext.textFile.

0

È necessario considerare che Spark Streaming leggerà solo i nuovi file nella directory, non quelli aggiornati (una volta entrati nella directory) e anche tutti devono avere lo stesso formato.

Source

Problemi correlati