2009-12-08 12 views
18

Mam dwa wątki. Jeden z nich zapisuje do PipedOutputStream, inny odczytuje z odpowiedniego PipedInputStream. Tło polega na tym, że jeden wątek pobiera pewne dane ze zdalnego serwera i multipleksuje go do kilku innych wątków za pośrednictwem potokowanych strumieni.PipedInputStream - Jak uniknąć "java.io.IOException: Pipe broken"

Problem polega na tym, że czasami (szczególnie podczas pobierania dużego (> 50MB) pliki) otrzymuję java.io.IOException pipe uszkodzony podczas próby odczytu z PipedInputStream.
Javadoc mówi, że A pipe is said to be broken if a thread that was providing data bytes to the connected piped output stream is no longer alive.
To prawda, mój wątek w piśmie naprawdę umiera po zapisaniu wszystkich swoich danych do PipedOutputStream.

Jakieś rozwiązania? Jak mogę zapobiec wysyłaniu przez PipedInputStream tego wyjątku? Chcę móc odczytać wszystkie dane, które zostały napisane do PipedOutputStream, nawet jeśli pisanie wątku zakończyło jego pracę. (Jeśli ktoś wie, jak pisać wątek przy życiu, dopóki wszystkie dane nie zostaną odczytane, to rozwiązanie jest również dopuszczalne).

Odpowiedz

17

Użyj java.util.concurrent.CountDownLatch i nie kończ pierwszego wątku, zanim drugi sygnalizuje zakończenie odczytu z potoku.

Aktualizacja: szybki i brudny kod do zilustrowania mój komentarz poniżej

final PipedInputStream pin = getInputStream(); 
    final PipedOutputStream pout = getOutputStream(); 

    final CountDownLatch latch = new CountDownLatch(1); 

    InputStream in = new InputStream() { 

     @Override 
     public int read() throws IOException { 
      return pin.read(); 
     } 

     @Override 
     public void close() throws IOException { 
      super.close(); 
      latch.countDown(); 
     } 
    }; 


    OutputStream out = new OutputStream(){ 

     @Override 
     public void write(int b) throws IOException { 
      pout.write(b); 
     } 

     @Override 
     public void close() throws IOException { 
      while(latch.getCount()!=0) { 
       try { 
        latch.await(); 
       } catch (InterruptedException e) { 
        //too bad 
       } 
      } 
      super.close(); 
     } 
    }; 

    //give the streams to your threads, they don't know a latch ever existed 
    threadOne.feed(in); 
    threadTwo.feed(out); 
+0

Niezła funkcja, zdecydowanie +1, ale musi współdzielić jedną instancję CountDownLatch między różnymi wątkami. Nie jest to zbyt dobre, ponieważ pisanie i czytanie wątków jest tworzone w różnych miejscach i chcę, żeby nie wiedzieli o sobie nawzajem. Moja architektura jest teraz taka, że ​​wiedzą tylko, że powinna pisać/czytać do/z danego strumienia. – levanovd

+0

W takim razie można rozszerzyć Piped [In | Out] putStream, aby obsłużyć manipulację CountDownLatch. – Jerome

+0

lub wpisz własny Input/OutputStream, który otacza rurę i zatrzask (zobacz przykładowy kod dodany w mojej odpowiedzi) – Jerome

0

PipedInputStream i PipedOutputStream są podzielone (w odniesieniu do gwintowania). Zakładają, że każda instancja jest związana z konkretnym wątkiem. To dziwne. Sugeruję użycie własnej (lub przynajmniej innej) implementacji.

+0

Ta odpowiedź nie daje żadnej wartości. W jaki sposób te klasy zakładają to? – matsa

4

Czy zamykasz swoje PipedOutputStream, gdy wątek, który z niego korzysta, kończy się? Musisz to zrobić, aby bajty w nim zostały przepłukane do odpowiedniego PipedInputStream.

+0

Tak, zamykam to. – levanovd

+0

Naprawdę uważam, że coś tu jest nie tak, w każdym razie nigdy nie powinieneś dostawać uszkodzonej rury, jeśli wątek pisania zakończy się normalnie. Jeśli jego dane nie mieszczą się w "PipedInputStream", powinien on blokować się do momentu, aż pojawi się miejsce. – wds

+2

Nie jest jasne, że 'close()' implikuje 'flush()'. – Raedwald

Powiązane problemy