2014-12-03 9 views
8

In screenshot qui sotto di Spark di amministrazione in esecuzione sulla porta 8080:Ciò che è casuale lettura e la riproduzione casuale scrittura in Apache Spark

enter image description here

I "Shuffle Leggere" & parametri "Shuffle scrivere" sono sempre vuota per questo codice :

import org.apache.spark.SparkContext; 

object first { 
    println("Welcome to the Scala worksheet") 

    val conf = new org.apache.spark.SparkConf() 
    .setMaster("local") 
    .setAppName("distances") 
    .setSparkHome("C:\\spark-1.1.0-bin-hadoop2.4\\spark-1.1.0-bin-hadoop2.4") 
    .set("spark.executor.memory", "2g") 
    val sc = new SparkContext(conf) 

    def euclDistance(userA: User, userB: User) = { 

    val subElements = (userA.features zip userB.features) map { 
     m => (m._1 - m._2) * (m._1 - m._2) 
    } 
    val summed = subElements.sum 
    val sqRoot = Math.sqrt(summed) 

    println("value is" + sqRoot) 
    ((userA.name, userB.name), sqRoot) 
    } 

    case class User(name: String, features: Vector[Double]) 

    def createUser(data: String) = { 

    val id = data.split(",")(0) 
    val splitLine = data.split(",") 

    val distanceVector = (splitLine.toList match { 
     case h :: t => t 
    }).map(m => m.toDouble).toVector 

    User(id, distanceVector) 

    } 

    val dataFile = sc.textFile("c:\\data\\example.txt") 
    val users = dataFile.map(m => createUser(m)) 
    val cart = users.cartesian(users) // 
    val distances = cart.map(m => euclDistance(m._1, m._2)) 
    //> distances : org.apache.spark.rdd.RDD[((String, String), Double)] = MappedR 
    //| DD[4] at map at first.scala:46 
    val d = distances.collect // 

    d.foreach(println) //> ((a,a),0.0) 
    //| ((a,b),0.0) 
    //| ((a,c),1.0) 
    //| ((a,),0.0) 
    //| ((b,a),0.0) 
    //| ((b,b),0.0) 
    //| ((b,c),1.0) 
    //| ((b,),0.0) 
    //| ((c,a),1.0) 
    //| ((c,b),1.0) 
    //| ((c,c),0.0) 
    //| ((c,),0.0) 
    //| ((,a),0.0) 
    //| ((,b),0.0) 
    //| ((,c),0.0) 
    //| ((,),0.0) 

} 

Perché "Shuffle Leggi" & campi "Shuffle WRITE" vuoto? Il codice sopra può essere ottimizzato per popolare questi campi in modo da comprendere come

risposta

2

Credo che sia necessario eseguire l'applicazione in modalità cluster/distribuita per visualizzare qualsiasi valore di lettura o scrittura casuale. Tipicamente "shuffle" sono attivati ​​da un sottoinsieme di azioni Spark (ad es. GroupBy, join, ecc.)

17

Per lo spostamento casuale si intende la riallocazione dei dati tra più stadi Spark. "Shuffle Write" è la somma di tutti i dati serializzati scritti su tutti gli esecutori prima della trasmissione (normalmente alla fine di uno stage) e "Shuffle Read" indica la somma di dati serializzati letti su tutti gli esecutori all'inizio di uno stage.

Il programma ha solo uno stadio, attivato dall'operazione "collect". Non è richiesto alcun shuffling, perché hai solo un mucchio di operazioni di mappa consecutive che sono pipeline in uno stage.

Provate a dare un'occhiata a queste diapositive: http://de.slideshare.net/colorant/spark-shuffle-introduction

Si potrebbe anche aiutare a leggere chapture 5 dal documento originale: http://people.csail.mit.edu/matei/papers/2012/nsdi_spark.pdf

Problemi correlati