2012-05-10 33 views
6

Chcę przechowywać tablice Numpy w bazie danych PostgreSQL w formie binarnej (bytea). Mogę sprawić, żeby to działało dobrze w teście nr 1 (zobacz poniżej), ale nie chcę manipulować tablicami danych przed wstawianiem i po selekcji za każdym razem - chcę używać adapterów i konwerterów psycopg2.Używanie konwertera psycopg2 do pobierania danych z bazy danych PostgreSQL

Oto co mam w tej chwili:

import numpy as np 
import psycopg2, psycopg2.extras 


def my_adapter(spectrum): 
    return psycopg2.Binary(spectrum) 

def my_converter(my_buffer, cursor): 
    return np.frombuffer(my_buffer) 


class MyBinaryTest(): 

    # Connection info 
    user = 'postgres' 
    password = 'XXXXXXXXXX' 
    host = 'localhost' 
    database = 'test_binary' 


    def __init__(self): 
     pass 

    def set_up(self): 

     # Set up 
     connection = psycopg2.connect(host=self.host, user=self.user, password=self.password) 

     connection.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT) 

     cursor = connection.cursor() 
     try: # Clear out any old test database 
      cursor.execute('drop database %s' % (self.database,)) 
     except: 
      pass 

     cursor.execute('create database %s' % (self.database,)) 
     cursor.close() 
     connection.close() 

     # Direct connectly to the database and set up our table    
     self.connection = psycopg2.connect(host=self.host, user=self.user, password=self.password, database=self.database) 
     self.cursor = self.connection.cursor(cursor_factory=psycopg2.extras.DictCursor) 

     self.cursor.execute('''CREATE TABLE spectrum (
      "sid" integer not null primary key, 
      "data" bytea not null 
      ); 

      CREATE SEQUENCE spectrum_id; 
      ALTER TABLE spectrum 
       ALTER COLUMN sid 
        SET DEFAULT NEXTVAL('spectrum_id'); 
      ''') 
     self.connection.commit() 



    def perform_test_one(self): 

     # Lets do a test 

     shape = (2, 100) 
     data = np.random.random(shape) 

     # Binary up the data 
     send_data = psycopg2.Binary(data) 

     self.cursor.execute('insert into spectrum (data) values (%s) returning sid;', [send_data]) 
     self.connection.commit() 

     # Retrieve the data we just inserted 
     query = self.cursor.execute('select * from spectrum') 
     result = self.cursor.fetchall() 

     print "Type of data retrieved:", type(result[0]['data']) 

     # Convert it back to a numpy array of the same shape 
     retrieved_data = np.frombuffer(result[0]['data']).reshape(*shape) 

     # Ensure there was no problem 
     assert np.all(retrieved_data == data) 
     print "Everything went swimmingly in test one!" 

     return True 

    def perform_test_two(self): 

     if not self.use_adapters: return False 

     # Lets do a test 

     shape = (2, 100) 
     data = np.random.random(shape) 

     # No changes made to the data, as the adapter should take care of it (and it does) 

     self.cursor.execute('insert into spectrum (data) values (%s) returning sid;', [data]) 
     self.connection.commit() 

     # Retrieve the data we just inserted 
     query = self.cursor.execute('select * from spectrum') 
     result = self.cursor.fetchall() 

     # No need to change the type of data, as the converter should take care of it 
     # (But, we never make it here) 

     retrieved_data = result[0]['data'] 

     # Ensure there was no problem 
     assert np.all(retrieved_data == data.flatten()) 
     print "Everything went swimmingly in test two!" 

     return True 


    def setup_adapters_and_converters(self): 

     # Set up test adapters 
     psycopg2.extensions.register_adapter(np.ndarray, my_adapter) 

     # Register our converter 
     self.cursor.execute("select null::bytea;") 
     my_oid = self.cursor.description[0][1] 

     obj = psycopg2.extensions.new_type((my_oid,), "numpy_array", my_converter) 
     psycopg2.extensions.register_type(obj, self.connection) 

     self.connection.commit() 

     self.use_adapters = True 


    def tear_down(self): 

     # Tear down 

     self.cursor.close() 
     self.connection.close() 

     connection = psycopg2.connect(host=self.host, user=self.user, password=self.password) 

     connection.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT) 

     cursor = connection.cursor() 
     cursor.execute('drop database %s' % (self.database,)) 
     cursor.close() 
     connection.close() 


test = MyBinaryTest() 
test.set_up() 
test.perform_test_one() 
test.setup_adapters_and_converters() 
test.perform_test_two() 
test.tear_down() 

Teraz test # 1 działa dobrze. Kiedy weźmie się kod użyty w teście 1 i skonfiguruję adapter i konwerter psycopg2, to nie działa (test 2). Dzieje się tak dlatego, że dane podawane do konwertera nie są już w rzeczywistości buforami; jest to ciąg znaków reprezentujących bajtów w PosgreSQL. Wyjście jest w następujący sposób:

In [1]: run -i test_binary.py 
Type of data retrieved: type 'buffer'> 
Everything went swimmingly in test one! 
ERROR: An unexpected error occurred while tokenizing input 
The following traceback may be corrupted or invalid 
The error message is: ('EOF in multi-line statement', (273, 0)) 

--------------------------------------------------------------------------- 
ValueError        Traceback (most recent call last) 

/Users/andycasey/thesis/scope/scope/test_binary.py in <module>() 
    155 test.perform_test_one() 
    156 test.setup_adapters_and_converters() 
--> 157 test.perform_test_two() 
    158 test.tear_down() 
    159 

/Users/andycasey/thesis/scope/scope/test_binary.py in perform_test_two(self) 
    101   # Retrieve the data we just inserted 

    102   query = self.cursor.execute('select * from spectrum') 
--> 103   result = self.cursor.fetchall() 
    104 
    105   # No need to change the type of data, as the converter should take care of it 


/Library/Python/2.6/site-packages/psycopg2/extras.pyc in fetchall(self) 
    81  def fetchall(self): 
    82   if self._prefetch: 
---> 83    res = _cursor.fetchall(self) 
    84   if self._query_executed: 
    85    self._build_index() 

/Users/andycasey/thesis/scope/scope/test_binary.py in my_converter(my_buffer, cursor) 
     7 
     8 def my_converter(my_buffer, cursor): 
----> 9  return np.frombuffer(my_buffer) 
    10 
    11 

ValueError: buffer size must be a multiple of element size 
WARNING: Failure executing file: <test_binary.py> 

In [2]: %debug 
> /Users/andycasey/thesis/scope/scope/test_binary.py(9)my_converter() 
     8 def my_converter(my_buffer, cursor): 
----> 9  return np.frombuffer(my_buffer) 
    10 

ipdb> my_buffer 
'\\x40e67378b9b8ae3f78b15ebecf20ef3f4092f00289dc803f20a843f40b9ddd3f64b6ec99bf62e83f8cea6eb60758d43f2ba47d8e6d5be73f4e88f267bbb2d83ffacc8aad2220d43fc6006b9c7eb7d33ff440cccc638de33f70e0b4b906a1e13fe0eca2af2f87c83f98d31f41e081ee3f1e6f5b8a52fdea3f80fcbd0ec3a0a93f95316c9e462eed3f83fe6d8d2463ea3fb44849fa8404d33f701be5924049df3f6ef3ca0c50f6d63f0c7b7d800cfdda3fc000e89b890c983fb32cf3e4ba1dea3f87f17f7efc06e33f2e194b361190ed3f60e955f0456d933ff24dd5aabc7eeb3f7802405af74ddc3f9ce9c3852db8e03fa0c936267c19d33f3406c35637f9ec3f288d23502e70ee3f08fe67e7ed8ec53f00f5cde29763dc3f26bcb4d362c4e23fa9e01fac6cd8e33fbec912f5ff7ae13f7fbd61e2e585ed3fa0070671e970e83f68ef1f6e0b90da3fce9ce834bfa6d43fa02b825d144e903f42912641e5aedd3f645a299de883db3fd8b5126bb8f6c23f3c5d4ae40ecccd3f5ae503835d00e13fcc784bdb7ea9c43f880ebfb30719be3f1dffcb042f58e23f44cc727ab3dfc53f1bbe477eb861e43f3c4f55f6aea5e53fdc80f6fa91d6e33f12b580ef03acd03f1cb78f8dccaac13f9ebdbd206453d43f32ffc626fe4ddc3f625ff4e2b317d33f44822e2f0d52ca3f38fea7c36ba6cb3ff0290b4707cedc3fd456190f786bcd3f7ed46219b47eda3f66fbdef755c3df3f40ccd47f88978c3f382897872cf5b73f5d24a66af5d7e13f2dd179d56ea3ee3fc4bb5b0962bcd63f20024c1c55ddb63f68a02e5f73fbd13f21eeb68b333de63f1a19dfe1b713e53f7556fedbb698e53f44eb6e9228accf3fe61a509c1d4ae43fe0fb0624828fa83f1822e55e76cdd23f801708ab685dd93f06076be2e92bed3f5ac2ff90247fed3fd42902b6b974d13f9df97b70385ce83fdabc4af1e81fe83f250611249338e73fc0251f9c9739e93f5821b6024621d63f7a7e1fc15605e73fab085fa8bb67e83fb4eb1d087ef5dd3fd1b450d406cbe13f0078ed1c422d3e3f44ed12d19085e83f117d628438daea3f15c776903519e23f747f248fa2e0c83ffcd052e9c4edc93f177a255a0a91e93fbe3b9b894d8edf3fea9fb6dd8be4e23fdc879e88e094e83f18bd28327ae3c03fc1bfd06d0379ec3fe8d7ee7e066ee03f750c4e0f4802e33fca3e4d0e34d3da3fe0578becde30c43f6044d9ad900ed23f08a2562899a3d13f5a83cf6694f3e33f001c61debd5f513fa009953fde2c9a3f29d53b02ca65e53fda066b4421a8ea3f58f074484a08cc3fe239b4b7eb57e03f1f904fe586bde43f9ce6edd599d1d13f43878f622d7ee23fd3ebab4e7904e93f7c3437ad0e16d23fac5e5e9e08a9c83f2b7b2d56db34e73f74f8cd68effeed3f4c279a9d4210c53ffafad9b31886d33f4c3eb4acc9b0dc3f6ed2f82f486edc3fc349273cbe1fec3fe2f70e89b061d83facaa25cb8fdbcd3fb0659c127fb7e83f00a224076b6da43f9ab1eb331dfade3fc86e03757e3bec3f3d00c8545ccce93f90fac6a4cc21b93f08f57560a68bc63fd8cccbabcd13b03fc679c7f9ece6df3f4a8c78aa1a1aed3ffecac18174dbe43fdfe102cffb48e93f0078f7fa27cc463fb40acdaea46ee63f54f754df4daadf3f2a9e063d0ab3da3f82a21b50d3c6d33f1182e48aafb5ed3fb67f3de3b109d63f494258c18422e13f8a5542fc1491e63f43247cbeabece13feb9355572f68eb3f3cf415eee8f1d53f887df6aab75bb43f0042cd907780523ff5e724cad881e03fdb9de04e99ffe43fd6594feb9b75ec3f6d4e6fcf7690e13fabe634f015dee13f584563d26021c93f6f1916ee57c8e13fd8906bad6fa7cd3ff8fad5b03b02eb3f1b3b87c15f16e53f4014ec100f79c73f1aee1302d960d83f45be6b695ed9e13ffc86d1d311dbdb3f089e89e6389fb93f24d742e400cbd63fa048c53d8fbf9c3f6eb1db094d81ed3f8bbf0cba79fde63f70e8f3d63c43c33ff1c5e6fed947e43f64f3a21f062ee03f0d12c4282794e03fa0a3be998572ba3f16510b776d7aeb3fb8c7ca308d2acd3f6f37eb1eb330ef3f1ba1bdb6577fe73f78d805294a05b43f0ed0bea2f180db3f5a4cce890b57ea3f2472556ba6f1e43f1a79fcc20701e53fe2ae8a1ea5f7d73fe0bd1efc12caec3ff94b1e02a75bed3f78e098184e3fea3f46ff0b2344dedb3f1cdc0f7b72efdb3f6ceb0b772b37e43f47e49b2a7088ea3f' 

Czy ktoś wie jak mogę (a) de-serializacji reprezentację ciąg wraca do mnie w my_converter więc zwróciła tablicy numpy za każdym razem, lub (b) PostgreSQL siły/psycopg2, aby wysłać reprezentację bufora do konwertera (którego mogę użyć) zamiast reprezentacji ciągów znaków?

Dzięki!

Jestem na OS X 10.6.8 z Pythona 2.6.1 (r261: 67.515), PostgreSQL 9.0.3 i psycopg2 2,4 (dt dec PQ3 EXT)

Odpowiedz

8

Format widać w debugger jest łatwe parsować: jest to binarny format PostgreSQL (http://www.postgresql.org/docs/9.1/static/datatype-binary.html). psycopg może analizować ten format i zwracać bufor zawierający dane; możesz użyć tego bufora, aby uzyskać tablicę. Zamiast pisać typograficzny od podstaw, napisz jeden wywołujący oryginalny func i postprocessuj jego wynik. Przepraszamy, ale nie pamiętam teraz jego nazwy i piszę z telefonu komórkowego: możesz uzyskać dalszą pomoc z listy mailingowej.


Edytuj: pełne rozwiązanie.

Domyślna bytea typecaster (która jest obiektem, który można analizować Postgres'a binarnej reprezentacji i zwrócić Pamięci Buforowej Obiektu na niej) jest psycopg2.BINARY. Możemy użyć go do stworzenia typecaster konwersji do tablicy zamiast:

In [1]: import psycopg2 

In [2]: import numpy as np 

In [3]: a = np.eye(3) 

In [4]: a 
Out[4]: 
array([[ 1., 0., 0.], 
     [ 0., 1., 0.], 
     [ 0., 0., 1.]]) 

In [5]: cnn = psycopg2.connect('') 


# The adapter: converts from python to postgres 
# note: this only works on numpy version whose arrays 
# support the buffer protocol, 
# e.g. it works on 1.5.1 but not on 1.0.4 on my tests. 

In [12]: def adapt_array(a): 
    ....:  return psycopg2.Binary(a) 
    ....: 

In [13]: psycopg2.extensions.register_adapter(np.ndarray, adapt_array) 


# The typecaster: from postgres to python 

In [21]: def typecast_array(data, cur): 
    ....:  if data is None: return None 
    ....:  buf = psycopg2.BINARY(data, cur) 
    ....:  return np.frombuffer(buf) 
    ....: 

In [24]: ARRAY = psycopg2.extensions.new_type(psycopg2.BINARY.values, 
'ARRAY', typecast_array) 

In [25]: psycopg2.extensions.register_type(ARRAY) 


# Now it works "as expected" 

In [26]: cur = cnn.cursor() 

In [27]: cur.execute("select %s", (a,)) 

In [28]: cur.fetchone()[0] 
Out[28]: array([ 1., 0., 0., 0., 1., 0., 0., 0., 1.]) 

Jak wiecie, np.frombuffer (a) traci kształt tablicy, więc trzeba będzie aby wymyślić sposób, aby ją zachować.

+1

Dzięki @piro - I ha ve wypróbował parsowanie tego formatu hex (i formatu escape PostgreSQL) używając takich rzeczy jak 'np.frombuffer (buffer (my_buffer [2:]. decode ('hex'), 0, 1600))', ale nie udało mi się pobieranie oryginalnej tablicy. Jeśli potrafisz przywołać funkcje analizujące ten format, byłbym bardzo wdzięczny. Poprosiłem o listę mailingową i równie chętnie sprawdzają, jak ten problem został rozwiązany. Dzięki! –

+0

* bump * .. Ktoś? –

1

W przypadku numpy tablic można uniknąć strategii bufora ze wszystkimi jego wadami, takimi jak utrata kształtu i typ danych. Po numerze stackoverflow question dotyczącym przechowywania tablicy numpy w sqlite3 można z łatwością dostosować podejście do PostgreSQL.

import os 
import psycopg2 as psql 
import numpy as np 

# converts from python to postgres 
def _adapt_array(text): 
    out = io.BytesIO() 
    np.save(out, text) 
    out.seek(0) 
    return psql.Binary(out.read()) 

# converts from postgres to python 
def _typecast_array(value, cur): 
    if value is None: 
     return None 

    data = psql.BINARY(value, cur) 
    bdata = io.BytesIO(data) 
    bdata.seek(0) 
    return np.load(bdata) 

con = psql.connect('') 

psql.extensions.register_adapter(np.ndarray, _adapt_array) 
t_array = sql.extensions.new_type(sql.BINARY.values, "numpy", _typecast_array) 
psql.extensions.register_type(t_array) 

cur = con.cursor() 

Teraz można tworzyć i wypełnić tabelę (z a zdefiniowanym jak w poprzednim poście)

cur.execute("create table test (column BYTEA)") 
cur.execute("insert into test values(%s)", (a,)) 

i przywrócić obiektowi NumPy

cur.execute("select * from test") 
cur.fetchone()[0] 

Wynik:

array([[ 1., 0., 0.], 
     [ 0., 1., 0.], 
     [ 0., 0., 1.]]) 
Powiązane problemy