2011-07-20 12 views
8

Piszę skrypt, aby zrobić kopię niektórych danych między dwoma komputerami w tej samej sieci, używając psycopg2. Jestem pewien, zastępując starą, brzydką bash, które wykonuje kopię zrura postgresowa KOPIA w pythonie z psycopg2

psql -c -h remote.host "COPY table TO STDOUT" | psql -c "COPY table FROM STDIN" 

Wydaje się zarówno najprostszym i most efficient sposób zrobić kopię. Jest to łatwe do powtórzenia w Pythonie z StringIO lub Temp-pliku, tak jak poniżej:

buf = StringIO() 

from_curs = from_conn.cursor() 
to_curs  = to_conn.cursor() 

from_curs.copy_expert("COPY table TO STDOUT", buf) 
buf.seek(0, os.SEEK_SET) 
to_curs.copy_expert("COPY table FROM STDIN", buf) 

... ale to wiąże się zapisywanie wszystkich danych na dysku/pamięci.

Czy ktoś wymyślił sposób naśladowania zachowania rury uniksowej w takiej kopii? Nie mogę znaleźć obiektu o uniksowych rurach, który nie zawiera POpen - może najlepszym rozwiązaniem jest po prostu użycie POpen i podprocesu.

+0

Ciekawy jest poniżej rozwiązanie zadziałało? – agf

Odpowiedz

0

Można użyć deque że już podklasy wspieranie czytania i pisania:

from collections import deque 
from Exceptions import IndexError 

class DequeBuffer(deque): 
    def write(self, data): 
     self.append(data) 
    def read(self): 
     try: 
      return self.popleft() 
     except IndexError: 
      return '' 

buf = DequeBuffer() 

Jeśli czytelnik jest znacznie szybsze niż pisarza, a tabela jest duża, deque będzie wciąż duże, ale będzie mniejszy niż przechowywanie całej rzeczy.

Ponadto, nie wiem na pewno return '' kiedy deque jest pusty jest bezpieczny, zamiast próbować, dopóki nie jest pusty, ale domyślam się, że tak. Daj znać czy działa.

Pamiętaj, aby del buf, gdy jesteś pewien, że kopia jest wykonana, zwłaszcza jeśli skrypt nie jest tylko wyjściem w tym momencie.

12

Będziesz musiał umieścić jedno ze swoich połączeń w osobnym wątku. Właśnie uświadomiłem sobie, można użyć os.pipe(), co sprawia, resztę dość prosta:

#!/usr/bin/python 
import psycopg2 
import os 
import threading 

fromdb = psycopg2.connect("dbname=from_db") 
todb = psycopg2.connect("dbname=to_db") 

r_fd, w_fd = os.pipe() 

def copy_from(): 
    cur = todb.cursor() 
    cur.copy_from(os.fdopen(r_fd), 'table') 
    cur.close() 
    todb.commit() 

to_thread = threading.Thread(target=copy_from) 
to_thread.start() 

cur = fromdb.cursor() 
write_f = os.fdopen(w_fd, 'w') 
cur.copy_to(write_f, 'table') 
write_f.close() # or deadlock... 

to_thread.join() 
+0

To świetne rozwiązanie! Ciekawi mnie jednak, dlaczego konieczne było wprowadzenie obiektu Thread? – Demitri

+3

@Demitri, 'copy_from()' i 'copy_to()' blokują polecenia; nie wracają, dopóki operacja się nie zakończy. Gdybyśmy zrobili pierwsze wywołanie w głównym wątku, po prostu czekałoby na dane w rurze i nigdy nie odzyskalibyśmy kontroli, aby wykonać drugie połączenie. –

+0

Ah, rozumiem. Nadal będzie blokować nowy wątek, ale pozwoli głównemu wątkowi na podawanie rury w czasie wolnym. Dzięki. – Demitri