2010-01-18 20 views
6

Ho un grande file di dati XML (> 160M) da elaborare e sembra che l'analisi SAX/expat/pulldom sia la strada da percorrere. Mi piacerebbe avere una discussione che passa attraverso i nodi e spinge i nodi per essere elaborati in una coda, e quindi altri thread di lavoro estraggono il successivo nodo disponibile dalla coda e lo elaborano.Come posso elaborare xml in modo asincrono in python?

Ho la seguente (che dovrebbe avere serrature, so - lo farà, in seguito)

import sys, time 
import xml.parsers.expat 
import threading 

q = [] 

def start_handler(name, attrs): 
    q.append(name) 

def do_expat(): 
    p = xml.parsers.expat.ParserCreate() 
    p.StartElementHandler = start_handler 
    p.buffer_text = True 
    print("opening {0}".format(sys.argv[1])) 
    with open(sys.argv[1]) as f: 
     print("file is open") 
     p.ParseFile(f) 
     print("parsing complete") 


t = threading.Thread(group=None, target=do_expat) 
t.start() 

while True: 
    print(q) 
    time.sleep(1) 

Il problema è che il corpo del blocco while viene chiamato solo una volta, e poi non posso anche Ctrl-C lo interrompe. Su file più piccoli, l'output è come previsto, ma ciò sembra indicare che il gestore viene chiamato solo quando il documento è completamente analizzato, il che sembra vanificare lo scopo di un parser SAX.

Sono sicuro che è la mia ignoranza, ma non vedo dove sto facendo l'errore.

PS: Ho anche provato a cambiare start_handler così:

def start_handler(name, attrs): 
    def app(): 
     q.append(name) 
    u = threading.Thread(group=None, target=app) 
    u.start() 

No amore, però.

risposta

7

ParseFile, come hai notato, solo "ingoia" tutto - non va bene per il incrementale analisi che vuoi fare! Quindi, basta nutrire il file al parser un po 'alla volta, facendo attenzione a cedere sotto condizione di controllo per gli altri thread, come si va - ad esempio:

while True: 
    data = f.read(BUFSIZE) 
    if not data: 
    p.Parse('', True) 
    break 
    p.Parse(data, False) 
    time.sleep(0.0) 

la chiamata time.sleep(0.0) è il modo di Python per dire "resa ad altri fili se ce ne sono pronti e in attesa "; il metodo Parse è documentato here.

Il secondo punto è, dimenticare i blocchi per questo utilizzo! - usa Queue.Queue invece, è intrinsecamente sicuro e quasi invariabilmente il modo migliore e più semplice per coordinare più thread in Python. È sufficiente creare un'istanza Queueq, q.put(name) e aver lavorato il blocco di thread su q.get() in attesa di ottenere altro lavoro da fare: è COSÌ SEMPLICE!

(Esistono diverse strategie ausiliarie che è possibile utilizzare per coordinare la terminazione dei thread di lavoro quando non c'è più lavoro da fare, ma i requisiti speciali più semplici e assenti, è quello di renderli solo thread daemon, quindi saranno tutti terminare quando il thread principale funziona - vedere the docs).

+0

Votato per i suggerimenti della coda, ma sei sicuro riguardo a ParseFile che degusta tutto in una volta sola? Richiama i gestori Python per gestire i tag mentre va, questo è l'intero scopo dell'analisi SAX ... o stai dicendo che non è sufficiente per attivare un cambio di thread in Python? –

+1

Se si desidera SAX, è possibile utilizzare xml.sax, consultare http://docs.python.org/library/xml.sax.html?highlight=sax#module-xml.sax; l'OP non sta usando SAX, ma piuttosto xml.parsers.expat, un'interfaccia di astrazione inferiore che ** non ** impone una strategia incrementale (lo supporta, ma non la impone, lasciandola fino al livello di codice Python scegliere e scegliere). –

+0

La scelta di expat è stata alquanto arbitraria, non sono riuscito a trovare una buona spiegazione della differenza tra expat e sax. Il modulo sax funziona altrettanto bene - forse anche meglio, dal momento che sembra essere asincrono come mi serviva. Ho finito con l'adottare il metodo "alimentalo un po 'alla volta" in ogni caso, dal momento che mi dà la possibilità di sterilizzare gli archi che alimento prima che il parser arrivi loro. Risposta molto utile, grazie. – decitrig

1

L'unica cosa che vedo è sbagliata è che si accede a q contemporaneamente da diversi thread - nessun blocco mentre si scrive effettivamente. Questo è un problema - e probabilmente stai creando problemi con l'interprete Python che ti blocca. :)

bloccaggio prova, in realtà non è molto difficile:

import sys, time 
import xml.parsers.expat 
import threading 

q = [] 
q_lock = threading.Lock() <--- 

def start_handler(name, attrs): 
    q_lock.acquire() <--- 
    q.append(name) 
    q_lock.release() <--- 

def do_expat(): 
    p = xml.parsers.expat.ParserCreate() 
    p.StartElementHandler = start_handler 
    p.buffer_text = True 
    print("opening {0}".format(sys.argv[1])) 
    with open(sys.argv[1]) as f: 
     print("file is open") 
     p.ParseFile(f) 
     print("parsing complete") 


t = threading.Thread(group=None, target=do_expat) 
t.start() 

while True: 
    q_lock.acquire() <--- 
    print(q) 
    q_lock.release() <--- 
    time.sleep(1) 

Vedete, è stato molto semplice, abbiamo appena creato una variabile di blocco per proteggere il nostro oggetto, e acquisire quella bloccare ogni volta prima di usare l'oggetto e il rilascio ogni volta che abbiamo finito il nostro compito sull'oggetto. In questo modo abbiamo garantito che q.append(name) non si sovrapporrà mai a print(q).


(con le nuove versioni di Python c'è anche un "con ...." la sintassi che consente di non rilasciare serrature o chiudere i file o altri ripuliture si dimentica frequentemente.)

7

Sono non troppo sicuro di questo problema. Sto indovinando che la chiamata a ParseFile sta bloccando e solo il thread di analisi viene eseguito a causa della GIL. Un modo per aggirare questo sarebbe utilizzare multiprocessing invece. È progettato per funzionare con le code, comunque.

si effettua una Process e si può passare un Queue:

import sys, time 
import xml.parsers.expat 
import multiprocessing 
import Queue 

def do_expat(q): 
    p = xml.parsers.expat.ParserCreate() 

    def start_handler(name, attrs): 
     q.put(name) 

    p.StartElementHandler = start_handler 
    p.buffer_text = True 
    print("opening {0}".format(sys.argv[1])) 
    with open(sys.argv[1]) as f: 
     print("file is open") 
     p.ParseFile(f) 
     print("parsing complete") 

if __name__ == '__main__': 
    q = multiprocessing.Queue() 
    process = multiprocessing.Process(target=do_expat, args=(q,)) 
    process.start() 

    elements = [] 
    while True: 
     while True: 
      try: 
       elements.append(q.get_nowait()) 
      except Queue.Empty: 
       break 

     print elements 
     time.sleep(1) 

ho incluso un elenco di elementi, proprio per replicare lo script originale. La tua soluzione finale probabilmente utilizzerà get_nowait e un Pool o qualcosa di simile.

+1

Sì, questo è un buon percorso per scendere - come hai detto che avresti voluto usare le code comunque. –

+0

Ho provato quel codice; evita il lockup, ma ParseFile continua a non produrre nulla finché non viene letto l'intero input. – decitrig

0

Non so molto sull'implementazione, ma se l'analisi è un codice C che viene eseguito fino al completamento, altri thread Python non verranno eseguiti. Se il parser richiama il codice Python, GIL potrebbe essere rilasciato per l'esecuzione di altri thread, ma non ne sono sicuro. Potresti voler controllare quei dettagli.

Problemi correlati