In primo luogo, vorrei generare alcuni dati casuali per iniziare a lavorare con.
import random
number_of_rows = int(1e6)
line_error = "error line"
text = []
for i in range(number_of_rows):
choice = random.choice([1,2,3,4])
if choice == 1:
line = line_error
elif choice == 2:
line = "1 2 3 4 5 6 7 8 9_1"
elif choice == 3:
line = "1 2 3 4 5 6 7 8 9_2"
elif choice == 4:
line = "1 2 3 4 5 6 7 8 9_3"
text.append(line)
Ora ho una stringa text
assomiglia
1 2 3 4 5 6 7 8 9_2
error line
1 2 3 4 5 6 7 8 9_3
1 2 3 4 5 6 7 8 9_2
1 2 3 4 5 6 7 8 9_3
1 2 3 4 5 6 7 8 9_1
error line
1 2 3 4 5 6 7 8 9_2
....
La vostra soluzione:
def wrapException(a):
try:
return a[8]
except:
return 'error'
log.map(lambda s : s.split(' ')).map(wrapException).filter(lambda s : s!='error').map(lambda code : (code,1)).reduceByKey(lambda acu,value : acu + value).collect()
#[('9_3', 250885), ('9_1', 249307), ('9_2', 249772)]
Ecco la mia soluzione:
from operator import add
def myfunction(l):
try:
return (l.split(' ')[8],1)
except:
return ('MYERROR', 1)
log.map(myfunction).reduceByKey(add).collect()
#[('9_3', 250885), ('9_1', 249307), ('MYERROR', 250036), ('9_2', 249772)]
Commento:
(1) Consiglio vivamente anche di calcolare le righe con "errore" perché non aggiungerà troppo overhead e può anche essere utilizzato per il controllo di integrità, ad esempio, tutti i conteggi dovrebbero sommarsi al numero totale di righe nel log, se si escludono queste righe, non si ha idea di quelle che sono veramente linee sbagliate o qualcosa è andato storto nella logica di codifica.
(2) Proverò a comprimere tutte le operazioni a livello di linea in una funzione per evitare il concatenamento delle funzioni map
, filter
, quindi è più leggibile.
(3) Dal punto di vista delle prestazioni, ho generato un campione di record 1M e il mio codice terminato in 3 secondi e il tuo in 2 secondi, non è una comparazione equa dato che i dati sono così piccoli e il mio cluster è piuttosto robusto, Ti consiglierei di generare un file più grande (1e12?) E fare un punto di riferimento sul tuo.