2014-05-21 13 views
9

Questa è una domanda che ho already asked sulla mailing list dell'utente spark e spero di ottenere più successo qui .Bypassare org.apache.hadoop.mapred.InvalidInputException: Input Pattern s3n: // [...] corrisponde a 0 file

Non sono sicuro che sia direttamente correlato alla scintilla, anche se la scintilla ha qualcosa a che fare con il fatto che non posso risolvere facilmente questo problema.

Sto cercando di ottenere alcuni file da S3 utilizzando vari modelli. Il mio problema è che alcuni di questi modelli può restituire nulla, e quando lo fanno, ottengo la seguente eccezione:

org.apache.hadoop.mapred.InvalidInputException: Input Pattern s3n://bucket/mypattern matches 0 files 
    at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:197) 
    at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:208) 
    at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:140) 
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:207) 
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205) 
    at scala.Option.getOrElse(Option.scala:120) 
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:205) 
    at org.apache.spark.rdd.MappedRDD.getPartitions(MappedRDD.scala:28) 
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:207) 
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205) 
    at scala.Option.getOrElse(Option.scala:120) 
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:205) 
    at org.apache.spark.rdd.FlatMappedRDD.getPartitions(FlatMappedRDD.scala:30) 
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:207) 
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205) 
    at scala.Option.getOrElse(Option.scala:120) 
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:205) 
    at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:52) 
    at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:52) 
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) 
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) 
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) 
    at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34) 
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) 
    at scala.collection.AbstractTraversable.map(Traversable.scala:105) 
    at org.apache.spark.rdd.UnionRDD.getPartitions(UnionRDD.scala:52) 
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:207) 
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205) 
    at scala.Option.getOrElse(Option.scala:120) 
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:205) 
    at org.apache.spark.Partitioner$.defaultPartitioner(Partitioner.scala:58) 
    at org.apache.spark.api.java.JavaPairRDD.reduceByKey(JavaPairRDD.scala:335) 
    ... 2 more 

Vorrei un modo per ignorare i file mancanti e semplicemente non fare nulla in questo caso. Il problema qui IMO è che non so se un pattern restituirà qualcosa finché non viene effettivamente eseguito e la scintilla inizia l'elaborazione dei dati solo quando si verifica un'azione (qui, la parte reduceByKey). Quindi non posso cogliere un errore da qualche parte e lasciare che le cose continuino.

Una soluzione potrebbe essere quella di forzare la scintilla per elaborare ciascun percorso singolarmente ma che probabilmente costerà assegnare in termini di velocità e/o memoria, quindi sto cercando un'altra opzione che sarebbe efficiente.

Sto usando la scintilla 0.9.1. Grazie

risposta

4

Ok, scavando un po 'in Spark e grazie a qualcuno che mi guida nella lista dell'utente scintilla Penso che ho capito:

sc.newAPIHadoopFile("s3n://missingPattern/*", EmptiableTextInputFormat.class, LongWritable.class, Text.class, sc.hadoopConfiguration()) 
    .map(new Function<Tuple2<LongWritable, Text>, String>() { 
     @Override 
     public String call(Tuple2<LongWritable, Text> arg0) throws Exception { 
      return arg0._2.toString(); 
     } 
    }) 
    .count(); 

E la EmptiableTextInputFormat che fa la magia:

import java.io.IOException; 
import java.util.Collections; 
import java.util.List; 

import org.apache.hadoop.mapreduce.InputSplit; 
import org.apache.hadoop.mapreduce.JobContext; 
import org.apache.hadoop.mapreduce.lib.input.InvalidInputException; 
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; 

public class EmptiableTextInputFormat extends TextInputFormat { 
    @Override 
    public List<InputSplit> getSplits(JobContext arg0) throws IOException { 
     try { 
      return super.getSplits(arg0); 
     } catch (InvalidInputException e) { 
      return Collections.<InputSplit> emptyList(); 
     } 
    } 
} 

Si potrebbe eventualmente controllare il messaggio dello InvalidInputException per una maggiore precisione.

+0

Un modo per implementare la stessa logica 'SparkContext.sequenceFile()'? –

2

Per chiunque desideri un trucco veloce, ecco un esempio con sc.wholeTextFiles

def wholeTextFilesIgnoreErrors(path: String, sc: SparkContext): RDD[(String, String)] = { 
    // TODO This is a bit hacky, probabally ought to work out a better way using lower level hadoop api 

    sc.wholeTextFiles(path.split(",").filter(subPath => Try(sc.textFile(subPath).take(1)).isSuccess).mkString(",")) 
    } 
Problemi correlati