2012-04-09 50 views
16

cross-wysłana do http://social.msdn.microsoft.com/Forums/en-US/tpldataflow/thread/89b3f71d-3777-4fad-9c11-50d8dc81a4a9pozorna BufferBlock.Post/Receive/ReceiveAsync rasa/bug

wiem ... Nie jestem za pomocą TplDataflow do jego maksymalnego potencjału. ATM Używam po prostu BufferBlock jako bezpiecznej kolejki do przekazywania wiadomości, gdzie producent i konsument działają w różnym tempie. Widzę dziwne zachowanie, które sprawia, że ​​zastanawiam się, jak postępować zgodnie z .

private BufferBlock<object> messageQueue = new BufferBlock<object>(); 

public void Send(object message) 
{ 
    var accepted=messageQueue.Post(message); 
    logger.Info("Send message was called qlen = {0} accepted={1}", 
    messageQueue.Count,accepted); 
} 

public async Task<object> GetMessageAsync() 
{ 
    try 
    { 
     var m = await messageQueue.ReceiveAsync(TimeSpan.FromSeconds(30)); 
     //despite messageQueue.Count>0 next line 
     //occasionally does not execute 
     logger.Info("message received"); 
     //....... 
    } 
    catch(TimeoutException) 
    { 
     //do something 
    } 
} 

W kodzie powyżej (która jest częścią linii 2000 rozproszonych w roztworze) Send jest nazywany okresowo co 100 ms lub więcej. Oznacza to, że pozycja jest Post ed do messageQueue około 10 razy na sekundę. To jest zweryfikowane. Jednak czasami okazuje się, że ReceiveAsync nie kończy się w czasie oczekiwania (tj. Post nie powoduje zakończenia ReceiveAsync) i TimeoutException jest podnoszone po 30 sekundach. W tym momencie messageQueue.Count jest w setkach. To nieoczekiwane. Problem ten zaobserwowano także przy niższych wskaźnikach wysyłania (1 post/sekundę) i zwykle ma miejsce, zanim 1000 elementów przeszło przez BufferBlock.

Tak więc, aby obejść ten problem, używam następujący kod, który działa, ale czasami powoduje 1s opóźnienia przy odbiorze (z powodu błędu powyżej występujące)

public async Task<object> GetMessageAsync() 
    { 
     try 
     { 
      object m; 
      var attempts = 0; 
      for (; ;) 
      { 
       try 
       { 
        m = await messageQueue.ReceiveAsync(TimeSpan.FromSeconds(1)); 
       } 
       catch (TimeoutException) 
       { 
        attempts++; 
        if (attempts >= 30) throw; 
        continue; 
       } 
       break; 

      } 

      logger.Info("message received"); 
      //....... 
     } 
     catch(TimeoutException) 
     { 
      //do something 
     } 
    } 

To wygląda jak wyścigu w TDF do mnie, ale nie mogę dojść do sedna, dlaczego to nie występuje w innych miejscach, gdzie używam BufferBlock w podobny sposób. Eksperymentalna zmiana z ReceiveAsync na Receive nie pomaga. Nie sprawdziłem, ale wyobrażam sobie w izolacji, powyższy kod działa idealnie. Jest to wzorzec, który widziałem udokumentowany w "Wprowadzenie do przepływu danych TPL" tpldataflow.docx.

Co mogę zrobić, aby dotrzeć do sedna tego? Czy są jakieś dane, które mogą pomóc wnioskować, co się dzieje? Jeśli nie mogę utworzyć wiarygodnego testu, jakie dodatkowe informacje mogę zaoferować?

Pomoc!

+1

Nie widzę niczego złego w tym, co robisz lub jakie są Twoje oczekiwania. Zdecydowanie uważam, że musisz zachować to aktywne na forach MSDN więcej niż tutaj. Zwróciłeś już uwagę na @StephenToub, a on zdecydowanie jest tym facetem, któremu chcesz się przyjrzeć. –

+0

Nie. Nigdy nie dotarłem do sedna. Nie udało mi się odtworzyć problemu w małym, samodzielnym przykładzie. Ponieważ używałem tylko BufferBlock, zamiast tego przetasowałem własną implementację kolejki asynchronicznej. Nie musiałem zmieniać żadnego innego kodu ... Po prostu ponownie zaimplementowałem części interfejsu BufferBlock, którego używałem. Jest teraz przyjemnością, co sprawia, że ​​myślę, że coś jest nie w porządku, ale nie mogę tego udowodnić. Grr. – spender

+0

@spendor Bardzo interesujące, co dziwne, po znalezieniu BufferBlock zeskrobałem własną asynchroniczną kolejkę równoległą ...teraz będę musiał ponownie rozważyć. Dzięki. –

Odpowiedz

1

Stephen wydaje się, że po to rozwiązanie

var m = oczekiwać messageQueue.ReceiveAsync();

zamiast:

var m = czekają messageQueue.ReceiveAsync (TimeSpan.FromSeconds (30));

Czy możesz potwierdzić lub zaprzeczyć temu?

+0

To się nie udało. Nie miało znaczenia, które przeciążenie ReceiveAsync wybrałam, wynik był taki sam. Zobacz mój komentarz powyżej dla mojej rezolucji. – spender