2012-05-10 13 views
6

Desidero memorizzare gli array Numpy in un database PostgreSQL in formato binario (bytea). Posso farlo funzionare bene nel test numero 1 (vedi sotto), ma non voglio dover manipolare gli array di dati prima degli inserimenti e dopo averli selezionati ogni volta: voglio utilizzare adattatori e convertitori di psycopg2.Utilizzo di un convertitore psycopg2 per recuperare dati da PostgreSQL

Ecco quello che ho in questo momento:

import numpy as np 
import psycopg2, psycopg2.extras 


def my_adapter(spectrum): 
    return psycopg2.Binary(spectrum) 

def my_converter(my_buffer, cursor): 
    return np.frombuffer(my_buffer) 


class MyBinaryTest(): 

    # Connection info 
    user = 'postgres' 
    password = 'XXXXXXXXXX' 
    host = 'localhost' 
    database = 'test_binary' 


    def __init__(self): 
     pass 

    def set_up(self): 

     # Set up 
     connection = psycopg2.connect(host=self.host, user=self.user, password=self.password) 

     connection.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT) 

     cursor = connection.cursor() 
     try: # Clear out any old test database 
      cursor.execute('drop database %s' % (self.database,)) 
     except: 
      pass 

     cursor.execute('create database %s' % (self.database,)) 
     cursor.close() 
     connection.close() 

     # Direct connectly to the database and set up our table    
     self.connection = psycopg2.connect(host=self.host, user=self.user, password=self.password, database=self.database) 
     self.cursor = self.connection.cursor(cursor_factory=psycopg2.extras.DictCursor) 

     self.cursor.execute('''CREATE TABLE spectrum (
      "sid" integer not null primary key, 
      "data" bytea not null 
      ); 

      CREATE SEQUENCE spectrum_id; 
      ALTER TABLE spectrum 
       ALTER COLUMN sid 
        SET DEFAULT NEXTVAL('spectrum_id'); 
      ''') 
     self.connection.commit() 



    def perform_test_one(self): 

     # Lets do a test 

     shape = (2, 100) 
     data = np.random.random(shape) 

     # Binary up the data 
     send_data = psycopg2.Binary(data) 

     self.cursor.execute('insert into spectrum (data) values (%s) returning sid;', [send_data]) 
     self.connection.commit() 

     # Retrieve the data we just inserted 
     query = self.cursor.execute('select * from spectrum') 
     result = self.cursor.fetchall() 

     print "Type of data retrieved:", type(result[0]['data']) 

     # Convert it back to a numpy array of the same shape 
     retrieved_data = np.frombuffer(result[0]['data']).reshape(*shape) 

     # Ensure there was no problem 
     assert np.all(retrieved_data == data) 
     print "Everything went swimmingly in test one!" 

     return True 

    def perform_test_two(self): 

     if not self.use_adapters: return False 

     # Lets do a test 

     shape = (2, 100) 
     data = np.random.random(shape) 

     # No changes made to the data, as the adapter should take care of it (and it does) 

     self.cursor.execute('insert into spectrum (data) values (%s) returning sid;', [data]) 
     self.connection.commit() 

     # Retrieve the data we just inserted 
     query = self.cursor.execute('select * from spectrum') 
     result = self.cursor.fetchall() 

     # No need to change the type of data, as the converter should take care of it 
     # (But, we never make it here) 

     retrieved_data = result[0]['data'] 

     # Ensure there was no problem 
     assert np.all(retrieved_data == data.flatten()) 
     print "Everything went swimmingly in test two!" 

     return True 


    def setup_adapters_and_converters(self): 

     # Set up test adapters 
     psycopg2.extensions.register_adapter(np.ndarray, my_adapter) 

     # Register our converter 
     self.cursor.execute("select null::bytea;") 
     my_oid = self.cursor.description[0][1] 

     obj = psycopg2.extensions.new_type((my_oid,), "numpy_array", my_converter) 
     psycopg2.extensions.register_type(obj, self.connection) 

     self.connection.commit() 

     self.use_adapters = True 


    def tear_down(self): 

     # Tear down 

     self.cursor.close() 
     self.connection.close() 

     connection = psycopg2.connect(host=self.host, user=self.user, password=self.password) 

     connection.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT) 

     cursor = connection.cursor() 
     cursor.execute('drop database %s' % (self.database,)) 
     cursor.close() 
     connection.close() 


test = MyBinaryTest() 
test.set_up() 
test.perform_test_one() 
test.setup_adapters_and_converters() 
test.perform_test_two() 
test.tear_down() 

Ora, test # 1 funziona bene. Quando prendo il codice che ho usato nel test 1 e ho installato un adattatore psycopg2 e un convertitore, non funziona (test 2). Questo perché i dati che vengono inviati al convertitore non sono più un buffer; è la rappresentazione stringa di PosgreSQL di bytea. L'uscita è la seguente:

In [1]: run -i test_binary.py 
Type of data retrieved: type 'buffer'> 
Everything went swimmingly in test one! 
ERROR: An unexpected error occurred while tokenizing input 
The following traceback may be corrupted or invalid 
The error message is: ('EOF in multi-line statement', (273, 0)) 

--------------------------------------------------------------------------- 
ValueError        Traceback (most recent call last) 

/Users/andycasey/thesis/scope/scope/test_binary.py in <module>() 
    155 test.perform_test_one() 
    156 test.setup_adapters_and_converters() 
--> 157 test.perform_test_two() 
    158 test.tear_down() 
    159 

/Users/andycasey/thesis/scope/scope/test_binary.py in perform_test_two(self) 
    101   # Retrieve the data we just inserted 

    102   query = self.cursor.execute('select * from spectrum') 
--> 103   result = self.cursor.fetchall() 
    104 
    105   # No need to change the type of data, as the converter should take care of it 


/Library/Python/2.6/site-packages/psycopg2/extras.pyc in fetchall(self) 
    81  def fetchall(self): 
    82   if self._prefetch: 
---> 83    res = _cursor.fetchall(self) 
    84   if self._query_executed: 
    85    self._build_index() 

/Users/andycasey/thesis/scope/scope/test_binary.py in my_converter(my_buffer, cursor) 
     7 
     8 def my_converter(my_buffer, cursor): 
----> 9  return np.frombuffer(my_buffer) 
    10 
    11 

ValueError: buffer size must be a multiple of element size 
WARNING: Failure executing file: <test_binary.py> 

In [2]: %debug 
> /Users/andycasey/thesis/scope/scope/test_binary.py(9)my_converter() 
     8 def my_converter(my_buffer, cursor): 
----> 9  return np.frombuffer(my_buffer) 
    10 

ipdb> my_buffer 
'\\x40e67378b9b8ae3f78b15ebecf20ef3f4092f00289dc803f20a843f40b9ddd3f64b6ec99bf62e83f8cea6eb60758d43f2ba47d8e6d5be73f4e88f267bbb2d83ffacc8aad2220d43fc6006b9c7eb7d33ff440cccc638de33f70e0b4b906a1e13fe0eca2af2f87c83f98d31f41e081ee3f1e6f5b8a52fdea3f80fcbd0ec3a0a93f95316c9e462eed3f83fe6d8d2463ea3fb44849fa8404d33f701be5924049df3f6ef3ca0c50f6d63f0c7b7d800cfdda3fc000e89b890c983fb32cf3e4ba1dea3f87f17f7efc06e33f2e194b361190ed3f60e955f0456d933ff24dd5aabc7eeb3f7802405af74ddc3f9ce9c3852db8e03fa0c936267c19d33f3406c35637f9ec3f288d23502e70ee3f08fe67e7ed8ec53f00f5cde29763dc3f26bcb4d362c4e23fa9e01fac6cd8e33fbec912f5ff7ae13f7fbd61e2e585ed3fa0070671e970e83f68ef1f6e0b90da3fce9ce834bfa6d43fa02b825d144e903f42912641e5aedd3f645a299de883db3fd8b5126bb8f6c23f3c5d4ae40ecccd3f5ae503835d00e13fcc784bdb7ea9c43f880ebfb30719be3f1dffcb042f58e23f44cc727ab3dfc53f1bbe477eb861e43f3c4f55f6aea5e53fdc80f6fa91d6e33f12b580ef03acd03f1cb78f8dccaac13f9ebdbd206453d43f32ffc626fe4ddc3f625ff4e2b317d33f44822e2f0d52ca3f38fea7c36ba6cb3ff0290b4707cedc3fd456190f786bcd3f7ed46219b47eda3f66fbdef755c3df3f40ccd47f88978c3f382897872cf5b73f5d24a66af5d7e13f2dd179d56ea3ee3fc4bb5b0962bcd63f20024c1c55ddb63f68a02e5f73fbd13f21eeb68b333de63f1a19dfe1b713e53f7556fedbb698e53f44eb6e9228accf3fe61a509c1d4ae43fe0fb0624828fa83f1822e55e76cdd23f801708ab685dd93f06076be2e92bed3f5ac2ff90247fed3fd42902b6b974d13f9df97b70385ce83fdabc4af1e81fe83f250611249338e73fc0251f9c9739e93f5821b6024621d63f7a7e1fc15605e73fab085fa8bb67e83fb4eb1d087ef5dd3fd1b450d406cbe13f0078ed1c422d3e3f44ed12d19085e83f117d628438daea3f15c776903519e23f747f248fa2e0c83ffcd052e9c4edc93f177a255a0a91e93fbe3b9b894d8edf3fea9fb6dd8be4e23fdc879e88e094e83f18bd28327ae3c03fc1bfd06d0379ec3fe8d7ee7e066ee03f750c4e0f4802e33fca3e4d0e34d3da3fe0578becde30c43f6044d9ad900ed23f08a2562899a3d13f5a83cf6694f3e33f001c61debd5f513fa009953fde2c9a3f29d53b02ca65e53fda066b4421a8ea3f58f074484a08cc3fe239b4b7eb57e03f1f904fe586bde43f9ce6edd599d1d13f43878f622d7ee23fd3ebab4e7904e93f7c3437ad0e16d23fac5e5e9e08a9c83f2b7b2d56db34e73f74f8cd68effeed3f4c279a9d4210c53ffafad9b31886d33f4c3eb4acc9b0dc3f6ed2f82f486edc3fc349273cbe1fec3fe2f70e89b061d83facaa25cb8fdbcd3fb0659c127fb7e83f00a224076b6da43f9ab1eb331dfade3fc86e03757e3bec3f3d00c8545ccce93f90fac6a4cc21b93f08f57560a68bc63fd8cccbabcd13b03fc679c7f9ece6df3f4a8c78aa1a1aed3ffecac18174dbe43fdfe102cffb48e93f0078f7fa27cc463fb40acdaea46ee63f54f754df4daadf3f2a9e063d0ab3da3f82a21b50d3c6d33f1182e48aafb5ed3fb67f3de3b109d63f494258c18422e13f8a5542fc1491e63f43247cbeabece13feb9355572f68eb3f3cf415eee8f1d53f887df6aab75bb43f0042cd907780523ff5e724cad881e03fdb9de04e99ffe43fd6594feb9b75ec3f6d4e6fcf7690e13fabe634f015dee13f584563d26021c93f6f1916ee57c8e13fd8906bad6fa7cd3ff8fad5b03b02eb3f1b3b87c15f16e53f4014ec100f79c73f1aee1302d960d83f45be6b695ed9e13ffc86d1d311dbdb3f089e89e6389fb93f24d742e400cbd63fa048c53d8fbf9c3f6eb1db094d81ed3f8bbf0cba79fde63f70e8f3d63c43c33ff1c5e6fed947e43f64f3a21f062ee03f0d12c4282794e03fa0a3be998572ba3f16510b776d7aeb3fb8c7ca308d2acd3f6f37eb1eb330ef3f1ba1bdb6577fe73f78d805294a05b43f0ed0bea2f180db3f5a4cce890b57ea3f2472556ba6f1e43f1a79fcc20701e53fe2ae8a1ea5f7d73fe0bd1efc12caec3ff94b1e02a75bed3f78e098184e3fea3f46ff0b2344dedb3f1cdc0f7b72efdb3f6ceb0b772b37e43f47e49b2a7088ea3f' 

Qualcuno sa come posso (a) de-serializzare la rappresentazione di stringa tornando a me in my_converter così torno un allineamento Numpy di volta in volta, o (b) la forza PostgreSQL/psycopg2 per inviare la rappresentazione del buffer al convertitore (che posso usare) invece della rappresentazione della stringa?

Grazie!

Sono su OS X 10.6.8 con Python 2.6.1 (r261: 67515), PostgreSQL 9.0.3 e psycopg2 2.4 (dt dicembre pq3 ext)

risposta

8

Il formato che si vede nel debugger è facile da analizzare: è il formato binario esadecimale PostgreSQL (http://www.postgresql.org/docs/9.1/static/datatype-binary.html). psycopg può analizzare quel formato e restituire un buffer contenente i dati; puoi usare quel buffer per ottenere un array. Invece di scrivere un tipografo da zero, scrivine uno invocando la funzione originale e postprima il suo risultato. Scusa ma non ricordo il suo nome ora e sto scrivendo da un cellulare: potresti ricevere ulteriore aiuto dalla mailing list.


Modifica: soluzione completa.

Il bytea typecaster predefinita (che è l'oggetto che può analizzare i postgres binario rappresentazione e restituire un oggetto buffer di fuori di esso) è psycopg2.BINARY. Siamo in grado di utilizzarlo per creare un typecaster conversione in matrice invece:

In [1]: import psycopg2 

In [2]: import numpy as np 

In [3]: a = np.eye(3) 

In [4]: a 
Out[4]: 
array([[ 1., 0., 0.], 
     [ 0., 1., 0.], 
     [ 0., 0., 1.]]) 

In [5]: cnn = psycopg2.connect('') 


# The adapter: converts from python to postgres 
# note: this only works on numpy version whose arrays 
# support the buffer protocol, 
# e.g. it works on 1.5.1 but not on 1.0.4 on my tests. 

In [12]: def adapt_array(a): 
    ....:  return psycopg2.Binary(a) 
    ....: 

In [13]: psycopg2.extensions.register_adapter(np.ndarray, adapt_array) 


# The typecaster: from postgres to python 

In [21]: def typecast_array(data, cur): 
    ....:  if data is None: return None 
    ....:  buf = psycopg2.BINARY(data, cur) 
    ....:  return np.frombuffer(buf) 
    ....: 

In [24]: ARRAY = psycopg2.extensions.new_type(psycopg2.BINARY.values, 
'ARRAY', typecast_array) 

In [25]: psycopg2.extensions.register_type(ARRAY) 


# Now it works "as expected" 

In [26]: cur = cnn.cursor() 

In [27]: cur.execute("select %s", (a,)) 

In [28]: cur.fetchone()[0] 
Out[28]: array([ 1., 0., 0., 0., 1., 0., 0., 0., 1.]) 

Come sapete, np.frombuffer (a) perde la forma array, in modo da avere di capire un modo per preservarla.

+1

Grazie @piro - I ha ho provato ad analizzare questo formato esadecimale (e il formato di escape di PostgreSQL) usando cose come 'np.frombuffer (buffer (my_buffer [2:]. decode ('hex'), 0, 1600))', ma non ho avuto successo in recuperare l'array originale. Se riesci a ricordare le funzioni per analizzare questo formato, ti sarei molto grato. Ho chiesto sulla mailing list e sono ugualmente desiderosi di vedere come viene risolto questo problema. Grazie! –

+0

* urto * .. Chiunque? –

1

Per il caso degli array di numpy si può evitare la strategia del buffer con tutti i suoi svantaggi come la perdita di forma e il tipo di dati. Seguendo uno stackoverflow question sulla memorizzazione di una matrice numpy in sqlite3 si può facilmente adattare l'approccio per postgres.

import os 
import psycopg2 as psql 
import numpy as np 

# converts from python to postgres 
def _adapt_array(text): 
    out = io.BytesIO() 
    np.save(out, text) 
    out.seek(0) 
    return psql.Binary(out.read()) 

# converts from postgres to python 
def _typecast_array(value, cur): 
    if value is None: 
     return None 

    data = psql.BINARY(value, cur) 
    bdata = io.BytesIO(data) 
    bdata.seek(0) 
    return np.load(bdata) 

con = psql.connect('') 

psql.extensions.register_adapter(np.ndarray, _adapt_array) 
t_array = sql.extensions.new_type(sql.BINARY.values, "numpy", _typecast_array) 
psql.extensions.register_type(t_array) 

cur = con.cursor() 

Ora è possibile creare e compilare una tabella (con a definita come nel post precedente)

cur.execute("create table test (column BYTEA)") 
cur.execute("insert into test values(%s)", (a,)) 

e ripristinare l'oggetto NumPy

cur.execute("select * from test") 
cur.fetchone()[0] 

Risultato:

array([[ 1., 0., 0.], 
     [ 0., 1., 0.], 
     [ 0., 0., 1.]]) 
Problemi correlati