Sto lavorando alla funzione appengine-mapreduce e ho modificato la demo per adattarla al mio scopo. Fondamentalmente ho un milione di righe nel seguente formato: userid, time1, time2. Il mio scopo è trovare la differenza tra time1 e time2 per ogni userid.Limite di memoria colpito con appengine-mapreduce
Tuttavia, come ho eseguito questo su Google App Engine, ho incontrato questo messaggio di errore nella sezione logs:
Limite della memoria superato morbido privato con 180.56 MB dopo la manutenzione 130 richieste totali Mentre la manipolazione questa richiesta, il il processo che ha gestito questa richiesta è risultato essere utilizzando troppa memoria ed è stato terminato. È probabile che ciò causi l'utilizzo di un nuovo processo per la richiesta successiva all'applicazione. Se vedi questo messaggio frequentemente, potresti avere una perdita di memoria nella tua applicazione.
def time_count_map(data):
"""Time count map function."""
(entry, text_fn) = data
text = text_fn()
try:
q = text.split('\n')
for m in q:
reader = csv.reader([m.replace('\0', '')], skipinitialspace=True)
for s in reader:
"""Calculate time elapsed"""
sdw = s[1]
start_date = time.strptime(sdw,"%m/%d/%y %I:%M:%S%p")
edw = s[2]
end_date = time.strptime(edw,"%m/%d/%y %I:%M:%S%p")
time_difference = time.mktime(end_date) - time.mktime(start_date)
yield (s[0], time_difference)
except IndexError, e:
logging.debug(e)
def time_count_reduce(key, values):
"""Time count reduce function."""
time = 0.0
for subtime in values:
time += float(subtime)
realtime = int(time)
yield "%s: %d\n" % (key, realtime)
Qualcuno può suggerire in quale altro modo posso ottimizzare il mio codice migliore? Grazie!!
Modificato:
Ecco il gestore gasdotto:
class TimeCountPipeline(base_handler.PipelineBase):
"""A pipeline to run Time count demo.
Args:
blobkey: blobkey to process as string. Should be a zip archive with
text files inside.
"""
def run(self, filekey, blobkey):
logging.debug("filename is %s" % filekey)
output = yield mapreduce_pipeline.MapreducePipeline(
"time_count",
"main.time_count_map",
"main.time_count_reduce",
"mapreduce.input_readers.BlobstoreZipInputReader",
"mapreduce.output_writers.BlobstoreOutputWriter",
mapper_params={
"blob_key": blobkey,
},
reducer_params={
"mime_type": "text/plain",
},
shards=32)
yield StoreOutput("TimeCount", filekey, output)
Mapreduce.yaml:
mapreduce:
- name: Make messages lowercase
params:
- name: done_callback
value: /done
mapper:
handler: main.lower_case_posts
input_reader: mapreduce.input_readers.DatastoreInputReader
params:
- name: entity_kind
default: main.Post
- name: processing_rate
default: 100
- name: shard_count
default: 4
- name: Make messages upper case
params:
- name: done_callback
value: /done
mapper:
handler: main.upper_case_posts
input_reader: mapreduce.input_readers.DatastoreInputReader
params:
- name: entity_kind
default: main.Post
- name: processing_rate
default: 100
- name: shard_count
default: 4
Il resto dei file sono esattamente la stessa della demo.
Ho caricato una copia dei miei codici su Dropbox: http://dl.dropbox.com/u/4288806/demo%20compressed%20fail%20memory.zip
Puoi mostrare la configurazione di mapreduce? Per qualche motivo sembra che tu stia passando l'intero file al mappatore, invece di mapparlo riga per riga. –
Ciao Daniel, la mia domanda è stata modificata. Grazie, lo apprezzo davvero! – autumngard