2012-02-12 17 views
8

Sto lavorando alla funzione appengine-mapreduce e ho modificato la demo per adattarla al mio scopo. Fondamentalmente ho un milione di righe nel seguente formato: userid, time1, time2. Il mio scopo è trovare la differenza tra time1 e time2 per ogni userid.Limite di memoria colpito con appengine-mapreduce

Tuttavia, come ho eseguito questo su Google App Engine, ho incontrato questo messaggio di errore nella sezione logs:

Limite della memoria superato morbido privato con 180.56 MB dopo la manutenzione 130 richieste totali Mentre la manipolazione questa richiesta, il il processo che ha gestito questa richiesta è risultato essere utilizzando troppa memoria ed è stato terminato. È probabile che ciò causi l'utilizzo di un nuovo processo per la richiesta successiva all'applicazione. Se vedi questo messaggio frequentemente, potresti avere una perdita di memoria nella tua applicazione.

def time_count_map(data): 
    """Time count map function.""" 
    (entry, text_fn) = data 
    text = text_fn() 

    try: 
    q = text.split('\n') 
    for m in q: 
     reader = csv.reader([m.replace('\0', '')], skipinitialspace=True) 
     for s in reader: 
      """Calculate time elapsed""" 
      sdw = s[1] 
      start_date = time.strptime(sdw,"%m/%d/%y %I:%M:%S%p") 
      edw = s[2] 
      end_date = time.strptime(edw,"%m/%d/%y %I:%M:%S%p") 
      time_difference = time.mktime(end_date) - time.mktime(start_date) 
      yield (s[0], time_difference) 
    except IndexError, e: 
    logging.debug(e) 


def time_count_reduce(key, values): 
    """Time count reduce function.""" 
    time = 0.0 
    for subtime in values: 
    time += float(subtime) 
    realtime = int(time) 
    yield "%s: %d\n" % (key, realtime) 

Qualcuno può suggerire in quale altro modo posso ottimizzare il mio codice migliore? Grazie!!

Modificato:

Ecco il gestore gasdotto:

class TimeCountPipeline(base_handler.PipelineBase): 
    """A pipeline to run Time count demo. 

    Args: 
    blobkey: blobkey to process as string. Should be a zip archive with 
     text files inside. 
    """ 

    def run(self, filekey, blobkey): 
    logging.debug("filename is %s" % filekey) 
    output = yield mapreduce_pipeline.MapreducePipeline(
     "time_count", 
     "main.time_count_map", 
     "main.time_count_reduce", 
     "mapreduce.input_readers.BlobstoreZipInputReader", 
     "mapreduce.output_writers.BlobstoreOutputWriter", 
     mapper_params={ 
      "blob_key": blobkey, 
     }, 
     reducer_params={ 
      "mime_type": "text/plain", 
     }, 
     shards=32) 
    yield StoreOutput("TimeCount", filekey, output) 

Mapreduce.yaml:

mapreduce: 
- name: Make messages lowercase 
    params: 
    - name: done_callback 
    value: /done 
    mapper: 
    handler: main.lower_case_posts 
    input_reader: mapreduce.input_readers.DatastoreInputReader 
    params: 
    - name: entity_kind 
     default: main.Post 
    - name: processing_rate 
     default: 100 
    - name: shard_count 
     default: 4 
- name: Make messages upper case 
    params: 
    - name: done_callback 
    value: /done 
    mapper: 
    handler: main.upper_case_posts 
    input_reader: mapreduce.input_readers.DatastoreInputReader 
    params: 
    - name: entity_kind 
     default: main.Post 
    - name: processing_rate 
     default: 100 
    - name: shard_count 
     default: 4 

Il resto dei file sono esattamente la stessa della demo.

Ho caricato una copia dei miei codici su Dropbox: http://dl.dropbox.com/u/4288806/demo%20compressed%20fail%20memory.zip

+0

Puoi mostrare la configurazione di mapreduce? Per qualche motivo sembra che tu stia passando l'intero file al mappatore, invece di mapparlo riga per riga. –

+0

Ciao Daniel, la mia domanda è stata modificata. Grazie, lo apprezzo davvero! – autumngard

risposta

2

è probabile che il file di input supera il limite di memoria morbido in termini di dimensioni. Per file di grandi dimensioni utilizzare BlobstoreLineInputReader o BlobstoreZipLineInputReader.

Questi lettori di input passano qualcosa di diverso dalla funzione map, passano lo start_position nel file e nella riga di testo.

tuo map funzione potrebbe essere simile:

def time_count_map(data): 
    """Time count map function.""" 
    text = data[1] 

    try: 
     reader = csv.reader([text.replace('\0', '')], skipinitialspace=True) 
     for s in reader: 
      """Calculate time elapsed""" 
      sdw = s[1] 
      start_date = time.strptime(sdw,"%m/%d/%y %I:%M:%S%p") 
      edw = s[2] 
      end_date = time.strptime(edw,"%m/%d/%y %I:%M:%S%p") 
      time_difference = time.mktime(end_date) - time.mktime(start_date) 
      yield (s[0], time_difference) 
    except IndexError, e: 
     logging.debug(e) 

Utilizzando BlobstoreLineInputReader permetterà eseguire il processo molto più veloce in quanto può utilizzare più di un frammento, fino a 256, ma significa che è necessario caricare il tuo file non compressi, che può essere un dolore. Lo gestisco caricando i file compressi su un server Windows EC2, quindi eseguo la decompressione e il caricamento da lì, poiché la larghezza di banda dell'upstream è così grande.

+0

Questo ha funzionato molto bene per me! Molte grazie! :) – autumngard

6

Considerare inoltre di chiamare gc.collect() in punti regolari durante il codice. Ho visto diverse domande di SO sul superamento dei limiti di memoria morbida che sono stati alleviati chiamando gc.collect(), che ha a che fare con blobstore.

+0

sta chiamando gc.collect() si applica solo al blobstore o in generale? – marcadian

Problemi correlati