2015-10-28 19 views
6

Ho un pessimo access_log HTTPD e voglio semplicemente saltare le linee "schifose".Qual è l'equivalente di scala.util.Try in pyspark?

in Scala questo è semplice:

import scala.util.Try 

val log = sc.textFile("access_log") 

log.map(_.split(' ')).map(a => Try(a(8))).filter(_.isSuccess).map(_.get).map(code => (code,1)).reduceByKey(_ + _).collect() 

per Python Ho la seguente soluzione definendo in modo esplicito una funzione in contrasto con il "lambda" notazione:

log = sc.textFile("access_log") 

def wrapException(a): 
    try: 
     return a[8] 
    except: 
     return 'error' 

log.map(lambda s : s.split(' ')).map(wrapException).filter(lambda s : s!='error').map(lambda code : (code,1)).reduceByKey(lambda acu,value : acu + value).collect() 

C'è un modo migliore di farlo (ad es. come in Scala) in pyspark?

Grazie mille!

risposta

2

In primo luogo, vorrei generare alcuni dati casuali per iniziare a lavorare con.

import random 
number_of_rows = int(1e6) 
line_error = "error line" 
text = [] 
for i in range(number_of_rows): 
    choice = random.choice([1,2,3,4]) 
    if choice == 1: 
     line = line_error 
    elif choice == 2: 
     line = "1 2 3 4 5 6 7 8 9_1" 
    elif choice == 3: 
     line = "1 2 3 4 5 6 7 8 9_2" 
    elif choice == 4: 
     line = "1 2 3 4 5 6 7 8 9_3" 
    text.append(line) 

Ora ho una stringa text assomiglia

1 2 3 4 5 6 7 8 9_2 
    error line 
    1 2 3 4 5 6 7 8 9_3 
    1 2 3 4 5 6 7 8 9_2 
    1 2 3 4 5 6 7 8 9_3 
    1 2 3 4 5 6 7 8 9_1 
    error line 
    1 2 3 4 5 6 7 8 9_2 
    .... 

La vostra soluzione:

def wrapException(a): 
    try: 
     return a[8] 
    except: 
     return 'error' 

log.map(lambda s : s.split(' ')).map(wrapException).filter(lambda s : s!='error').map(lambda code : (code,1)).reduceByKey(lambda acu,value : acu + value).collect() 

#[('9_3', 250885), ('9_1', 249307), ('9_2', 249772)] 

Ecco la mia soluzione:

from operator import add 
def myfunction(l): 
    try: 
     return (l.split(' ')[8],1) 
    except: 
     return ('MYERROR', 1) 
log.map(myfunction).reduceByKey(add).collect() 
#[('9_3', 250885), ('9_1', 249307), ('MYERROR', 250036), ('9_2', 249772)] 

Commento:

(1) Consiglio vivamente anche di calcolare le righe con "errore" perché non aggiungerà troppo overhead e può anche essere utilizzato per il controllo di integrità, ad esempio, tutti i conteggi dovrebbero sommarsi al numero totale di righe nel log, se si escludono queste righe, non si ha idea di quelle che sono veramente linee sbagliate o qualcosa è andato storto nella logica di codifica.

(2) Proverò a comprimere tutte le operazioni a livello di linea in una funzione per evitare il concatenamento delle funzioni map, filter, quindi è più leggibile.

(3) Dal punto di vista delle prestazioni, ho generato un campione di record 1M e il mio codice terminato in 3 secondi e il tuo in 2 secondi, non è una comparazione equa dato che i dati sono così piccoli e il mio cluster è piuttosto robusto, Ti consiglierei di generare un file più grande (1e12?) E fare un punto di riferimento sul tuo.

6

Meglio è un termine soggettivo ma ci sono alcuni approcci che puoi provare.

  • La cosa più semplice che si può fare in questo caso particolare è quello di evitare le eccezioni di sorta. Tutto ciò che serve è un po 'flatMap e affettare:

    log.flatMap(lambda s : s.split(' ')[8:9]) 
    

    Come si può vedere che significa che nessuna necessità di una gestione delle eccezioni o successivamente filter.

  • idea precedente può essere esteso con un semplice involucro

    def seq_try(f, *args, **kwargs): 
        try: 
         return [f(*args, **kwargs)] 
        except: 
         return [] 
    

    e esempio di utilizzo

    from operator import div # FYI operator provides getitem as well. 
    
    rdd = sc.parallelize([1, 2, 0, 3, 0, 5, "foo"]) 
    
    rdd.flatMap(lambda x: seq_try(div, 1., x)).collect() 
    ## [1.0, 0.5, 0.3333333333333333, 0.2] 
    
  • finalmente approccio più OO:

    import inspect as _inspect 
    
    class _Try(object): pass  
    
    class Failure(_Try): 
        def __init__(self, e): 
         if Exception not in _inspect.getmro(e.__class__): 
          msg = "Invalid type for Failure: {0}" 
          raise TypeError(msg.format(e.__class__)) 
         self._e = e 
         self.isSuccess = False 
         self.isFailure = True 
    
        def get(self): raise self._e 
    
        def __repr__(self): 
         return "Failure({0})".format(repr(self._e)) 
    
    class Success(_Try): 
        def __init__(self, v): 
         self._v = v 
         self.isSuccess = True 
         self.isFailure = False 
    
        def get(self): return self._v 
    
        def __repr__(self): 
         return "Success({0})".format(repr(self._v)) 
    
    def Try(f, *args, **kwargs): 
        try: 
         return Success(f(*args, **kwargs)) 
        except Exception as e: 
         return Failure(e) 
    

    e esempio di utilizzo:

    tries = rdd.map(lambda x: Try(div, 1.0, x)) 
    tries.collect() 
    ## [Success(1.0), 
    ## Success(0.5), 
    ## Failure(ZeroDivisionError('float division by zero',)), 
    ## Success(0.3333333333333333), 
    ## Failure(ZeroDivisionError('float division by zero',)), 
    ## Success(0.2), 
    ## Failure(TypeError("unsupported operand type(s) for /: 'float' and 'str'",))] 
    
    tries.filter(lambda x: x.isSuccess).map(lambda x: x.get()).collect() 
    ## [1.0, 0.5, 0.3333333333333333, 0.2] 
    

    Si può anche usare il pattern matching con multipledispatch

    from multipledispatch import dispatch 
    from operator import getitem 
    
    @dispatch(Success) 
    def check(x): return "Another great success" 
    
    @dispatch(Failure) 
    def check(x): return "What a failure" 
    
    a_list = [1, 2, 3] 
    
    check(Try(getitem, a_list, 1)) 
    ## 'Another great success' 
    
    check(Try(getitem, a_list, 10)) 
    ## 'What a failure' 
    

    Se ti piace questo approccio che ho spinto un po 'più completa attuazione GitHub e pypi.