2015-09-22 26 views
14

Ho un DataFrame con lo schemaEliminazione di una colonna nidificata Spark dataframe

root 
|-- label: string (nullable = true) 
|-- features: struct (nullable = true) 
| |-- feat1: string (nullable = true) 
| |-- feat2: string (nullable = true) 
| |-- feat3: string (nullable = true) 

Mentre, sono in grado di filtrare il frame di dati utilizzando

val data = rawData 
    .filter(!(rawData("features.feat1") <=> "100")) 

non sono in grado di rilasciare le colonne usando

val data = rawData 
     .drop("features.feat1") 

È qualcosa che sto facendo male qui? Ho anche provato (senza successo) a fare drop(rawData("features.feat1")), anche se non ha molto senso farlo.

Grazie in anticipo,

Nikhil

+0

E se invece lo si mappasse su un nuovo dataframe? Non penso che l'API DataFrame ti permetta di eliminare un campo struct all'interno di un tipo di colonna struct. –

+0

Ohh. Ci proverò, ma mi sembra piuttosto scomodo se devo mappare solo per risolvere un nome di colonna annidato in questo modo :(. –

+0

Puoi sempre ottenere tutte le colonne con il metodo '.columns()' di DataFrame, rimuovere la colonna indesiderata dalla sequenza e fai 'select (myColumns: _ *)'. Dovrebbe essere un po 'più breve. – Niemand

risposta

12

è solo un esercizio di programmazione, ma si può provare qualcosa di simile:

import org.apache.spark.sql.{DataFrame, Column} 
import org.apache.spark.sql.types.{StructType, StructField} 
import org.apache.spark.sql.{functions => f} 
import scala.util.Try 

case class DFWithDropFrom(df: DataFrame) { 
    def getSourceField(source: String): Try[StructField] = { 
    Try(df.schema.fields.filter(_.name == source).head) 
    } 

    def getType(sourceField: StructField): Try[StructType] = { 
    Try(sourceField.dataType.asInstanceOf[StructType]) 
    } 

    def genOutputCol(names: Array[String], source: String): Column = { 
    f.struct(names.map(x => f.col(source).getItem(x).alias(x)): _*) 
    } 

    def dropFrom(source: String, toDrop: Array[String]): DataFrame = { 
    getSourceField(source) 
     .flatMap(getType) 
     .map(_.fieldNames.diff(toDrop)) 
     .map(genOutputCol(_, source)) 
     .map(df.withColumn(source, _)) 
     .getOrElse(df) 
    } 
} 

Esempio di utilizzo:

scala> case class features(feat1: String, feat2: String, feat3: String) 
defined class features 

scala> case class record(label: String, features: features) 
defined class record 

scala> val df = sc.parallelize(Seq(record("a_label", features("f1", "f2", "f3")))).toDF 
df: org.apache.spark.sql.DataFrame = [label: string, features: struct<feat1:string,feat2:string,feat3:string>] 

scala> DFWithDropFrom(df).dropFrom("features", Array("feat1")).show 
+-------+--------+ 
| label|features| 
+-------+--------+ 
|a_label| [f2,f3]| 
+-------+--------+ 


scala> DFWithDropFrom(df).dropFrom("foobar", Array("feat1")).show 
+-------+----------+ 
| label| features| 
+-------+----------+ 
|a_label|[f1,f2,f3]| 
+-------+----------+ 


scala> DFWithDropFrom(df).dropFrom("features", Array("foobar")).show 
+-------+----------+ 
| label| features| 
+-------+----------+ 
|a_label|[f1,f2,f3]| 
+-------+----------+ 

Aggiungi un implicit conversion e sei a posto.

9

Questa versione consente di rimuovere le colonne nidificate a qualsiasi livello:

import org.apache.spark.sql._ 
import org.apache.spark.sql.functions._ 
import org.apache.spark.sql.types.{StructType, DataType} 

/** 
    * Various Spark utilities and extensions of DataFrame 
    */ 
object DataFrameUtils { 

    private def dropSubColumn(col: Column, colType: DataType, fullColName: String, dropColName: String): Option[Column] = { 
    if (fullColName.equals(dropColName)) { 
     None 
    } else { 
     colType match { 
     case colType: StructType => 
      if (dropColName.startsWith(s"${fullColName}.")) { 
      Some(struct(
       colType.fields 
       .flatMap(f => 
        dropSubColumn(col.getField(f.name), f.dataType, s"${fullColName}.${f.name}", dropColName) match { 
        case Some(x) => Some(x.alias(f.name)) 
        case None => None 
        }) 
       : _*)) 
      } else { 
      Some(col) 
      } 
     case other => Some(col) 
     } 
    } 
    } 

    protected def dropColumn(df: DataFrame, colName: String): DataFrame = { 
    df.schema.fields 
     .flatMap(f => { 
     if (colName.startsWith(s"${f.name}.")) { 
      dropSubColumn(col(f.name), f.dataType, f.name, colName) match { 
      case Some(x) => Some((f.name, x)) 
      case None => None 
      } 
     } else { 
      None 
     } 
     }) 
     .foldLeft(df.drop(colName)) { 
     case (df, (colName, column)) => df.withColumn(colName, column) 
     } 
    } 

    /** 
    * Extended version of DataFrame that allows to operate on nested fields 
    */ 
    implicit class ExtendedDataFrame(df: DataFrame) extends Serializable { 
    /** 
     * Drops nested field from DataFrame 
     * 
     * @param colName Dot-separated nested field name 
     */ 
    def dropNestedColumn(colName: String): DataFrame = { 
     DataFrameUtils.dropColumn(df, colName) 
    } 
    } 
} 

Usage:

import DataFrameUtils._ 
df.dropNestedColumn("a.b.c.d") 
+1

Grazie mille!Qualche possibilità che hai aggiornato per eliminare un campo da una struttura in un array sotto un array? Hai hackerato da più di un giorno ora, vicino ma non riesco a ottenerlo. genitore: array >>> –

+1

@alexP_Keaton Ehi, hai trovato una soluzione per far cadere una colonna all'interno di un array? –

+0

Vorrei aggiungere che questo metodo non conserva la proprietà 'nullable' della struttura padre modificata. In questo esempio, 'features' diventerà' struct (nullable = false) ' –

2

seguente frammento di codice di spektom per Scala, ho creato un codice simile a Java. Poiché java 8 non ha foldLeft, ho usato perOppure ordinato. Questo codice è adatto per spark 2.x (Sto usando 2.1) Inoltre ho notato che il rilascio di una colonna e l'aggiunta usando withColumn con lo stesso nome non funziona, quindi sto solo sostituendo la colonna, e sembra lavorare.

Codice non è completamente testato, spero che funziona :-)

public class DataFrameUtils { 

public static Dataset<Row> dropNestedColumn(Dataset<Row> dataFrame, String columnName) { 
    final DataFrameFolder dataFrameFolder = new DataFrameFolder(dataFrame); 
    Arrays.stream(dataFrame.schema().fields()) 
     .flatMap(f -> { 
      if (columnName.startsWith(f.name() + ".")) { 
       final Optional<Column> column = dropSubColumn(col(f.name()), f.dataType(), f.name(), columnName); 
       if (column.isPresent()) { 
        return Stream.of(new Tuple2<>(f.name(), column)); 
       } else { 
        return Stream.empty(); 
       } 
      } else { 
       return Stream.empty(); 
      } 
     }).forEachOrdered(colTuple -> dataFrameFolder.accept(colTuple)); 

    return dataFrameFolder.getDF(); 
} 

private static Optional<Column> dropSubColumn(Column col, DataType colType, String fullColumnName, String dropColumnName) { 
    Optional<Column> column = Optional.empty(); 
    if (!fullColumnName.equals(dropColumnName)) { 
     if (colType instanceof StructType) { 
      if (dropColumnName.startsWith(fullColumnName + ".")) { 
       column = Optional.of(struct(getColumns(col, (StructType)colType, fullColumnName, dropColumnName))); 
      } 
     } else { 
      column = Optional.of(col); 
     } 
    } 

    return column; 
} 

private static Column[] getColumns(Column col, StructType colType, String fullColumnName, String dropColumnName) { 
    return Arrays.stream(colType.fields()) 
     .flatMap(f -> { 
        final Optional<Column> column = dropSubColumn(col.getField(f.name()), f.dataType(), 
          fullColumnName + "." + f.name(), dropColumnName); 
        if (column.isPresent()) { 
         return Stream.of(column.get().alias(f.name())); 
        } else { 
         return Stream.empty(); 
        } 
       } 
     ).toArray(Column[]::new); 

} 

private static class DataFrameFolder implements Consumer<Tuple2<String, Optional<Column>>> { 
    private Dataset<Row> df; 

    public DataFrameFolder(Dataset<Row> df) { 
     this.df = df; 
    } 

    public Dataset<Row> getDF() { 
     return df; 
    } 

    @Override 
    public void accept(Tuple2<String, Optional<Column>> colTuple) { 
     if (!colTuple._2().isPresent()) { 
      df = df.drop(colTuple._1()); 
     } else { 
      df = df.withColumn(colTuple._1(), colTuple._2().get()); 
     } 
    } 
} 

Esempio di utilizzo:

private class Pojo { 
    private String str; 
    private Integer number; 
    private List<String> strList; 
    private Pojo2 pojo2; 

    public String getStr() { 
     return str; 
    } 

    public Integer getNumber() { 
     return number; 
    } 

    public List<String> getStrList() { 
     return strList; 
    } 

    public Pojo2 getPojo2() { 
     return pojo2; 
    } 

} 

private class Pojo2 { 
    private String str; 
    private Integer number; 
    private List<String> strList; 

    public String getStr() { 
     return str; 
    } 

    public Integer getNumber() { 
     return number; 
    } 

    public List<String> getStrList() { 
     return strList; 
    } 

} 

SQLContext context = new SQLContext(new SparkContext("local[1]", "test")); 
Dataset<Row> df = context.createDataFrame(Collections.emptyList(), Pojo.class); 
Dataset<Row> dfRes = DataFrameUtils.dropNestedColumn(df, "pojo2.str"); 

struct originale:

root 
|-- number: integer (nullable = true) 
|-- pojo2: struct (nullable = true) 
| |-- number: integer (nullable = true) 
| |-- str: string (nullable = true) 
| |-- strList: array (nullable = true) 
| | |-- element: string (containsNull = true) 
|-- str: string (nullable = true) 
|-- strList: array (nullable = true) 
| |-- element: string (containsNull = true) 

Dopo goccia:

root 
|-- number: integer (nullable = true) 
|-- pojo2: struct (nullable = false) 
| |-- number: integer (nullable = true) 
| |-- strList: array (nullable = true) 
| | |-- element: string (containsNull = true) 
|-- str: string (nullable = true) 
|-- strList: array (nullable = true) 
| |-- element: string (containsNull = true) 
+0

aggiungi un semplice esempio di come chiamarlo e ti inviterò in eccesso – xXxpRoGrAmmErxXx

+1

Esempio di utilizzo aggiunto per @xXxpRoGrAmmErxXx richiesta –

1

Espansione su risposta spektom. Con supporto per i tipi di array:

object DataFrameUtils { 

    private def dropSubColumn(col: Column, colType: DataType, fullColName: String, dropColName: String): Option[Column] = { 
    if (fullColName.equals(dropColName)) { 
     None 
    } else if (dropColName.startsWith(s"$fullColName.")) { 
     colType match { 
     case colType: StructType => 
      Some(struct(
      colType.fields 
       .flatMap(f => 
       dropSubColumn(col.getField(f.name), f.dataType, s"$fullColName.${f.name}", dropColName) match { 
        case Some(x) => Some(x.alias(f.name)) 
        case None => None 
       }) 
       : _*)) 
     case colType: ArrayType => 
      colType.elementType match { 
      case innerType: StructType => 
       Some(struct(innerType.fields 
       .flatMap(f => 
        dropSubColumn(col.getField(f.name), f.dataType, s"$fullColName.${f.name}", dropColName) match { 
        case Some(x) => Some(x.alias(f.name)) 
        case None => None 
        }) 
       : _*)) 
      } 

     case other => Some(col) 
     } 
    } else { 
     Some(col) 
    } 
    } 

    protected def dropColumn(df: DataFrame, colName: String): DataFrame = { 
    df.schema.fields 
     .flatMap(f => { 
     if (colName.startsWith(s"${f.name}.")) { 
      dropSubColumn(col(f.name), f.dataType, f.name, colName) match { 
      case Some(x) => Some((f.name, x)) 
      case None => None 
      } 
     } else { 
      None 
     } 
     }) 
     .foldLeft(df.drop(colName)) { 
     case (df, (colName, column)) => df.withColumn(colName, column) 
     } 
    } 

    /** 
    * Extended version of DataFrame that allows to operate on nested fields 
    */ 
    implicit class ExtendedDataFrame(df: DataFrame) extends Serializable { 
    /** 
     * Drops nested field from DataFrame 
     * 
     * @param colName Dot-separated nested field name 
     */ 
    def dropNestedColumn(colName: String): DataFrame = { 
     DataFrameUtils.dropColumn(df, colName) 
    } 
    } 

} 
Problemi correlati