2011-09-26 10 views
18

Simile a un altro post che ho fatto, questo risponde a questo post e crea una nuova domanda.Crea connessione DB e mantiene su più processi (multiprocessing)

Riepilogo: Ho bisogno di aggiornare ogni record in un database spaziale in cui ho un set di dati di punti che si sovrappongono serie di dati di poligoni. Per ogni caratteristica del punto voglio assegnare un tasto per metterlo in relazione con la funzione poligono all'interno della quale si trova. Quindi se il mio punto 'New York City' si trova all'interno del poligono USA e per il poligono USA 'GID = 1' assegnerò 'gid_fkey = 1' per il mio punto New York City.

Okay, questo è stato ottenuto utilizzando il multiprocessing. Ho notato un aumento della velocità del 150% usando questo, quindi funziona. Ma penso che ci sia un sovraccarico non necessario in quanto è necessaria una connessione DB per ogni record.

Quindi, ecco il codice:

import multiprocessing, time, psycopg2 

class Consumer(multiprocessing.Process): 

    def __init__(self, task_queue, result_queue): 
     multiprocessing.Process.__init__(self) 
     self.task_queue = task_queue 
     self.result_queue = result_queue 

    def run(self): 
     proc_name = self.name 
     while True: 
      next_task = self.task_queue.get() 
      if next_task is None: 
       print 'Tasks Complete' 
       self.task_queue.task_done() 
       break    
      answer = next_task() 
      self.task_queue.task_done() 
      self.result_queue.put(answer) 
     return 


class Task(object): 
    def __init__(self, a): 
     self.a = a 

    def __call__(self):   
     pyConn = psycopg2.connect("dbname='geobase_1' host = 'localhost'") 
     pyConn.set_isolation_level(0) 
     pyCursor1 = pyConn.cursor() 

     procQuery = 'UPDATE city SET gid_fkey = gid FROM country WHERE ST_within((SELECT the_geom FROM city WHERE city_id = %s), country.the_geom) AND city_id = %s' % (self.a, self.a) 

     pyCursor1.execute(procQuery) 
     print 'What is self?' 
     print self.a 

     return self.a 

    def __str__(self): 
     return 'ARC' 
    def run(self): 
     print 'IN' 

if __name__ == '__main__': 
    tasks = multiprocessing.JoinableQueue() 
    results = multiprocessing.Queue() 

    num_consumers = multiprocessing.cpu_count() * 2 
    consumers = [Consumer(tasks, results) for i in xrange(num_consumers)] 
    for w in consumers: 
     w.start() 

    pyConnX = psycopg2.connect("dbname='geobase_1' host = 'localhost'") 
    pyConnX.set_isolation_level(0) 
    pyCursorX = pyConnX.cursor() 

    pyCursorX.execute('SELECT count(*) FROM cities WHERE gid_fkey IS NULL')  
    temp = pyCursorX.fetchall()  
    num_job = temp[0] 
    num_jobs = num_job[0] 

    pyCursorX.execute('SELECT city_id FROM city WHERE gid_fkey IS NULL')  
    cityIdListTuple = pyCursorX.fetchall()  

    cityIdListList = [] 

    for x in cityIdListTuple: 
     cityIdList.append(x[0]) 


    for i in xrange(num_jobs): 
     tasks.put(Task(cityIdList[i - 1])) 

    for i in xrange(num_consumers): 
     tasks.put(None) 

    while num_jobs: 
     result = results.get() 
     print result 
     num_jobs -= 1 

Sembra essere compreso tra 0,3 e 1,5 secondi per connessione come ho misurarla con il modulo di 'tempo'.

C'è un modo per creare una connessione DB per processo e quindi basta utilizzare le informazioni city_id come variabile che posso alimentare in una query per il cursore in questo aperto? In questo modo, dico quattro processi ciascuno con una connessione DB e poi mi rilasciano city_id in qualche modo per elaborare.

risposta

31

cercare di isolare la creazione della connessione nel costruttore dei consumatori, poi dare alla Task eseguito:

import multiprocessing, time, psycopg2 

class Consumer(multiprocessing.Process): 

    def __init__(self, task_queue, result_queue): 
     multiprocessing.Process.__init__(self) 
     self.task_queue = task_queue 
     self.result_queue = result_queue 
     self.pyConn = psycopg2.connect("dbname='geobase_1' host = 'localhost'") 
     self.pyConn.set_isolation_level(0) 


    def run(self): 
     proc_name = self.name 
     while True: 
      next_task = self.task_queue.get() 
      if next_task is None: 
       print 'Tasks Complete' 
       self.task_queue.task_done() 
       break    
      answer = next_task(connection=self.pyConn) 
      self.task_queue.task_done() 
      self.result_queue.put(answer) 
     return 


class Task(object): 
    def __init__(self, a): 
     self.a = a 

    def __call__(self, connection=None):   
     pyConn = connection 
     pyCursor1 = pyConn.cursor() 

     procQuery = 'UPDATE city SET gid_fkey = gid FROM country WHERE ST_within((SELECT the_geom FROM city WHERE city_id = %s), country.the_geom) AND city_id = %s' % (self.a, self.a) 

     pyCursor1.execute(procQuery) 
     print 'What is self?' 
     print self.a 

     return self.a 

    def __str__(self): 
     return 'ARC' 
    def run(self): 
     print 'IN' 
+1

Mate che ha funzionato a meraviglia. Non avere i complimenti per darti il ​​segno di approvazione, ma quel codice era assolutamente magico. Liberarsi delle connessioni DB costanti ha facilmente aumentato la velocità di un altro 50%. Probabilmente più vicino al 100% in alcuni casi. Grazie ancora. –

+0

@EnE_: Sono contento che ti abbia aiutato :). Dovresti accettare la risposta, hai il diritto di farlo perché sei il proprietario della domanda. –

+0

Ok, devo ammettere che ho pensato che avrei dovuto premere la freccia su piuttosto che il segno di spunta. "Tick of approval" è stato, purtroppo, un autodenuncia della frase = D –

Problemi correlati