2012-10-24 16 views
5

Stiamo cercando di utilizzare pesantemente MapReduce nel nostro progetto. Ora abbiamo questo problema, ci sono un sacco di 'InternalError: errore interno.' errori nel registro ...AppEngine MapReduce NDB, ripetuta InternalError: errore interno

Un esempio di esso:

"POST /mapreduce/worker_callback HTTP/1.1" 500 0 "http://appname/mapreduce/worker_callback" "AppEngine-Google; 
(+http://code.google.com/appengine)" "appname.appspot.com" ms=18856 cpu_ms=15980 
queue_name=default task_name=appengine-mrshard-15828822618486744D69C-11-195 
instance=00c61b117c47e0cba49bc5e5c7f9d328693e95ce 
W 2012-10-24 06:51:27.140 
suspended generator _put_tasklet(context.py:274) raised InternalError(internal error.) 
W 2012-10-24 06:51:27.153 
suspended generator put(context.py:703) raised InternalError(internal error.) 
E 2012-10-24 06:51:27.207 
internal error. 
Traceback (most recent call last): 
    File "/base/python27_runtime/python27_lib/versions/third_party/webapp2-2.3/webapp2.py", line 1511, in __call__ 
    rv = self.handle_exception(request, response, e) 
    File "/base/python27_runtime/python27_lib/versions/third_party/webapp2-2.3/webapp2.py", line 1505, in __call__ 
    rv = self.router.dispatch(request, response) 
    File "/base/python27_runtime/python27_lib/versions/third_party/webapp2-2.3/webapp2.py", line 1253, in default_dispatcher 
    return route.handler_adapter(request, response) 
    File "/base/python27_runtime/python27_lib/versions/third_party/webapp2-2.3/webapp2.py", line 1077, in __call__ 
    return handler.dispatch() 
    File "/base/python27_runtime/python27_lib/versions/third_party/webapp2-2.3/webapp2.py", line 547, in dispatch 
    return self.handle_exception(e, self.app.debug) 
    File "/base/python27_runtime/python27_lib/versions/third_party/webapp2-2.3/webapp2.py", line 545, in dispatch 
    return method(*args, **kwargs) 
    File "/base/data/home/apps/s~appname/1.362664407983567993/mapreduce/base_handler.py", line 65, in post 
    self.handle() 
    File "/base/data/home/apps/s~appname/1.362664407983567993/mapreduce/handlers.py", line 208, in handle 
    ctx.flush() 
    File "/base/data/home/apps/s~appname/1.362664407983567993/mapreduce/context.py", line 333, in flush 
    pool.flush() 
    File "/base/data/home/apps/s~appname/1.362664407983567993/mapreduce/context.py", line 221, in flush 
    self.__flush_ndb_puts() 
    File "/base/data/home/apps/s~appname/1.362664407983567993/mapreduce/context.py", line 239, in __flush_ndb_puts 
    ndb.put_multi(self.ndb_puts.items, config=self.__create_config()) 
    File "/base/python27_runtime/python27_lib/versions/1/google/appengine/ext/ndb/model.py", line 3650, in put_multi 
    for future in put_multi_async(entities, **ctx_options)] 
    File "/base/python27_runtime/python27_lib/versions/1/google/appengine/ext/ndb/tasklets.py", line 325, in get_result 
    self.check_success() 
    File "/base/python27_runtime/python27_lib/versions/1/google/appengine/ext/ndb/tasklets.py", line 368, in _help_tasklet_along 
    value = gen.throw(exc.__class__, exc, tb) 
    File "/base/python27_runtime/python27_lib/versions/1/google/appengine/ext/ndb/context.py", line 703, in put 
    key = yield self._put_batcher.add(entity, options) 
    File "/base/python27_runtime/python27_lib/versions/1/google/appengine/ext/ndb/tasklets.py", line 368, in _help_tasklet_along 
    value = gen.throw(exc.__class__, exc, tb) 
    File "/base/python27_runtime/python27_lib/versions/1/google/appengine/ext/ndb/context.py", line 274, in _put_tasklet 
    keys = yield self._conn.async_put(options, datastore_entities) 
    File "/base/python27_runtime/python27_lib/versions/1/google/appengine/ext/ndb/tasklets.py", line 454, in _on_rpc_completion 
    result = rpc.get_result() 
    File "/base/python27_runtime/python27_lib/versions/1/google/appengine/datastore/datastore_rpc.py", line 834, in get_result 
    result = rpc.get_result() 
    File "/base/python27_runtime/python27_lib/versions/1/google/appengine/api/apiproxy_stub_map.py", line 604, in get_result 
    return self.__get_result_hook(self) 
    File "/base/python27_runtime/python27_lib/versions/1/google/appengine/datastore/datastore_rpc.py", line 1569, in __put_hook 
    self.check_rpc_success(rpc) 
    File "/base/python27_runtime/python27_lib/versions/1/google/appengine/datastore/datastore_rpc.py", line 1224, in check_rpc_success 
    raise _ToDatastoreError(err) 
InternalError: internal error. 

queue.yaml:

queue: 
- name: default 
    rate: 500/s 
    bucket_size: 100 
    max_concurrent_requests: 400 
    retry_parameters: 
    min_backoff_seconds: 5 
    max_backoff_seconds: 120 
    max_doublings: 2 

MapReduce Mapper params:

'shard_count': 16, 
'processing_rate': 200, 
'batch_size': 20 
we would like to increase these numbers, since we need more speed in processing, but once we try to increase it increases error rate... 

blobstore Files Conte: diversi (alcuni dei quali contengono milioni di linee)

Frontend istanza di classe: F4 flusso

elaborazione:

  1. Usiamo mapper solo per questo elaborazione particolare.
  2. Noi utente BlobstoreLineInputReader (BLOB contiene file di testo).
  3. Ogni riga rappresenta una nuova voce che dobbiamo creare se non esiste già (alcuni di essi vengono aggiornati).

Le mie domande sono:

  • Come possiamo evitare questi errori?
  • Ci sono suggerimenti/suggerimenti su come scegliere/bilanciare i parametri del mapper (shard_count, processing_rate, batch_size)?
  • Cosa succede con il lavoro, riceve riprovato (se sì, come possiamo controllarlo?) Oppure no?

BTW, abbiamo provato a giocare con alcuni dei suggerimenti forniti here (controllo batch_size) ma continuiamo a vedere questo.

+0

Hai risolto questo ?, sto vedendo molti di questi oggi innescati dai miei taskqueues. – payala

risposta

0

Questo sembra un errore di timeout: controlla i log per vedere quanto tempo è in esecuzione quel processo prima che ciò accada.

Se lo è, dovresti provare a ridurre il numero di elementi che stai chiamando put_multi() (ad esempio, ridurre le dimensioni del batch) e aggiungere un controllo del timer in modo che quando il tempo medio per chiamata put_multi() si avvicini al limite di tempo del processo, si esce e si avvia un altro.

Problemi correlati