Ho scritto un semplice flusso MapReduce per leggere le righe da un file CSV da un file su Google Cloud Storage e successivamente creare un'entità. Tuttavia, non riesco a farlo funzionare su più di una porzione.Come ridurre la scalabilità della mappa AppEngine?
Il codice utilizza mapreduce.control.start_map e ha un aspetto simile a questo.
class LoadEntitiesPipeline(webapp2.RequestHandler):
id = control.start_map(map_name,
handler_spec="backend.line_processor",
reader_spec="mapreduce.input_readers.FileInputReader",
queue_name=get_queue_name("q-1"),
shard_count=shard_count,
mapper_parameters={
'shard_count': shard_count,
'batch_size': 50,
'processing_rate': 1000000,
'files': [gsfile],
'format': 'lines'})
Ho shard_count in entrambi i posti, perché non sono sicuro di quali siano i metodi effettivamente necessari. L'impostazione di shard_count ovunque tra 8 e 32, non cambia nulla in quanto la pagina di stato dice sempre 1/1 di shard in esecuzione. Per separare le cose, ho eseguito tutto su una coda di backend con un numero elevato di istanze. Ho provato a regolare i parametri della coda per this wiki. Alla fine, sembra funzionare solo in serie.
Qualche idea? Grazie!
Update (Ancora senza successo):
Nel tentativo di isolare le cose, ho provato a fare la chiamata utilizzando chiamate dirette al gasdotto in questo modo:
class ImportHandler(webapp2.RequestHandler):
def get(self, gsfile):
pipeline = LoadEntitiesPipeline2(gsfile)
pipeline.start(queue_name=get_queue_name("q-1"))
self.redirect(pipeline.base_path + "/status?root=" + pipeline.pipeline_id)
class LoadEntitiesPipeline2(base_handler.PipelineBase):
def run(self, gsfile):
yield mapreduce_pipeline.MapperPipeline(
'loadentities2_' + gsfile,
'backend.line_processor',
'mapreduce.input_readers.FileInputReader',
params={'files': [gsfile], 'format': 'lines'},
shards=32
)
Con questo nuovo codice, ancora gira solo su un frammento. Sto iniziando a chiedermi se mapreduce.input_readers.FileInputReader è in grado di parallelizzare l'input per riga.
Sì, non c'è niente di strano con la mia codifica newline, ed è in grado di elaborare ogni linea fine, ma non di fare lo sharding alla line granularity. In effetti, quando divido il file molto grande in file più piccoli (diciamo 5000 righe ciascuno). Ottengo può ottenere la chiamata mapreduce a shard, ma sembra che sia sharding per nomi di file e non a granularità più fine. –