2013-10-05 27 views
7

Ho scritto un semplice flusso MapReduce per leggere le righe da un file CSV da un file su Google Cloud Storage e successivamente creare un'entità. Tuttavia, non riesco a farlo funzionare su più di una porzione.Come ridurre la scalabilità della mappa AppEngine?

Il codice utilizza mapreduce.control.start_map e ha un aspetto simile a questo.

class LoadEntitiesPipeline(webapp2.RequestHandler): 
     id = control.start_map(map_name, 
          handler_spec="backend.line_processor", 
          reader_spec="mapreduce.input_readers.FileInputReader", 
          queue_name=get_queue_name("q-1"), 
          shard_count=shard_count, 
          mapper_parameters={ 
           'shard_count': shard_count, 
           'batch_size': 50, 
           'processing_rate': 1000000, 
           'files': [gsfile], 
           'format': 'lines'}) 

Ho shard_count in entrambi i posti, perché non sono sicuro di quali siano i metodi effettivamente necessari. L'impostazione di shard_count ovunque tra 8 e 32, non cambia nulla in quanto la pagina di stato dice sempre 1/1 di shard in esecuzione. Per separare le cose, ho eseguito tutto su una coda di backend con un numero elevato di istanze. Ho provato a regolare i parametri della coda per this wiki. Alla fine, sembra funzionare solo in serie.

Qualche idea? Grazie!

Update (Ancora senza successo):

Nel tentativo di isolare le cose, ho provato a fare la chiamata utilizzando chiamate dirette al gasdotto in questo modo:

class ImportHandler(webapp2.RequestHandler): 

    def get(self, gsfile): 
     pipeline = LoadEntitiesPipeline2(gsfile) 
     pipeline.start(queue_name=get_queue_name("q-1")) 

     self.redirect(pipeline.base_path + "/status?root=" + pipeline.pipeline_id) 


class LoadEntitiesPipeline2(base_handler.PipelineBase): 

    def run(self, gsfile): 
     yield mapreduce_pipeline.MapperPipeline(
      'loadentities2_' + gsfile, 
      'backend.line_processor', 
      'mapreduce.input_readers.FileInputReader', 
      params={'files': [gsfile], 'format': 'lines'}, 
      shards=32 
     ) 

Con questo nuovo codice, ancora gira solo su un frammento. Sto iniziando a chiedermi se mapreduce.input_readers.FileInputReader è in grado di parallelizzare l'input per riga.

risposta

0

Sembra a me come FileInputReader dovrebbe essere in grado di sharding sulla base di una rapida lettura di: https://code.google.com/p/appengine-mapreduce/source/browse/trunk/python/src/mapreduce/input_readers.py

Sembra 'format': 'linee' dovrebbe dividere utilizzando:. Self.get_current_file() readline()

Sembra interpretare correttamente le linee quando funziona in serie? Forse le interruzioni di riga sono la codifica sbagliata o qualcosa del genere.

+0

Sì, non c'è niente di strano con la mia codifica newline, ed è in grado di elaborare ogni linea fine, ma non di fare lo sharding alla line granularity. In effetti, quando divido il file molto grande in file più piccoli (diciamo 5000 righe ciascuno). Ottengo può ottenere la chiamata mapreduce a shard, ma sembra che sia sharding per nomi di file e non a granularità più fine. –

0

Per esperienza, FileInputReader esegue un massimo di un frammento per file. Soluzione: dividi i tuoi file di grandi dimensioni. Io uso split_file in https://github.com/johnwlockwood/karl_data per tagliare i file prima di caricarli su Cloud Storage. Se i file di grandi dimensioni sono già disponibili, è possibile utilizzare un'istanza di Compute Engine per estrarli e fare lo sharding perché la velocità di trasferimento sarà la più veloce. FYI: karld è nel cheeseshop in modo da poter pip install karld

5

Sembra FileInputReader può solo frammento tramite file. I parametri format cambiano solo il modo in cui la funzione mapper ha ricevuto la chiamata. Se si passa più di un file al programma di mappatura, inizierà a essere eseguito su più di un frammento. Altrimenti userà solo un frammento per elaborare i dati.

EDIT # 1:

Dopo scavare più a fondo nella biblioteca MapReduce. MapReduce deciderà se dividere o meno i file in pezzi in base al ritorno del metodo can_split per ogni tipo di file definito. Attualmente, l'unico formato che implementa il metodo split è ZipFormat. Quindi, se il formato del file non è zip, non dividerà il file per l'esecuzione su più di un frammento.

@classmethod 
    def can_split(cls): 
    """Indicates whether this format support splitting within a file boundary. 

    Returns: 
     True if a FileFormat allows its inputs to be splitted into 
    different shards. 
    """ 

https://code.google.com/p/appengine-mapreduce/source/browse/trunk/python/src/mapreduce/file_formats.py

Ma sembra che è possibile scrivere il proprio metodo split formato di file. Puoi provare a modificare e aggiungere il metodo split al numero _TextFormat e vedere se più di un frammento è in esecuzione.

@classmethod 
def split(cls, desired_size, start_index, opened_file, cache): 
    pass 

EDIT # 2:

Una soluzione semplice sarebbe lasciato la corsa FileInputReader in serie, ma spostare il processo molto lungo cosuming parallelo reduce palco.

def line_processor(line): 
    # serial 
    yield (random.randrange(1000), line) 

def reducer(key, values): 
    # parallel 
    entities = [] 
    for v in values: 
     entities.append(CREATE_ENTITY_FROM_VALUE(v)) 
    db.put(entities) 

EDIT # 3:

Se tentare di modificare il FileFormat, ecco un esempio (non sono ancora stati prova)

from file_formats import _TextFormat, FORMATS 


class _LinesSplitFormat(_TextFormat): 
    """Read file line by line.""" 

    NAME = 'split_lines' 

    def get_next(self): 
    """Inherited.""" 
    index = self.get_index() 
    cache = self.get_cache() 
    offset = sum(cache['infolist'][:index]) 

    self.get_current_file.seek(offset) 
    result = self.get_current_file().readline() 
    if not result: 
     raise EOFError() 
    if 'encoding' in self._kwargs: 
     result = result.encode(self._kwargs['encoding']) 
    return result 

    @classmethod 
    def can_split(cls): 
    """Inherited.""" 
    return True 

    @classmethod 
    def split(cls, desired_size, start_index, opened_file, cache): 
    """Inherited.""" 
    if 'infolist' in cache: 
     infolist = cache['infolist'] 
    else: 
     infolist = [] 
     for i in opened_file: 
     infolist.append(len(i)) 
     cache['infolist'] = infolist 

    index = start_index 
    while desired_size > 0 and index < len(infolist): 
     desired_size -= infolist[index] 
     index += 1 
    return desired_size, index 


FORMATS['split_lines'] = _LinesSplitFormat 

Poi il nuovo formato di file possono essere chiamati via cambia il parametro mapper_parameters da lines a split_line.

class LoadEntitiesPipeline(webapp2.RequestHandler): 
    id = control.start_map(map_name, 
         handler_spec="backend.line_processor", 
         reader_spec="mapreduce.input_readers.FileInputReader", 
         queue_name=get_queue_name("q-1"), 
         shard_count=shard_count, 
         mapper_parameters={ 
          'shard_count': shard_count, 
          'batch_size': 50, 
          'processing_rate': 1000000, 
          'files': [gsfile], 
          'format': 'split_lines'}) 
+0

Grazie per la risposta elaborata e soluzioni alternative suggerite. – Alice

Problemi correlati