2012-09-11 16 views
5

Chciałbym wiedzieć, czy z Camelem można zrobić dławienie w oparciu o treść wymiany.Dławienie w oparciu o zawartość

Sytuacja wygląda następująco: muszę połączyć się z serwisem za pomocą mydła. Wśród parametrów wysyłanych do tego serwisu znajduje się identyfikatorklienta. Problem polega na tym, że usługa sieci Web odsyła błąd, jeśli istnieje więcej niż 1 żądanie na minutę dla danego identyfikatora klienta.

Zastanawiam się, czy byłoby możliwe zaimplementowanie funkcji dławienia na identyfikator Customer with the Camel. Tak więc ograniczanie nie powinno być zaimplementowane dla wszystkich komunikatów, ale tylko dla wiadomości z tym samym identyfikatorem klienta.

Poinformuj mnie, w jaki sposób mogę to wdrożyć lub jeśli muszę wyjaśnić moje pytanie.

+2

jego faktycznie dobry pomysł, aby dodać wsparcie dla grup w istniejący Erot Throttler. To powiedziawszy, chociaż wiadomości będą przechowywane w pamięci podczas oczekiwania na zwolnienie. Pozwól mi zalogować JIRA dla tego rozszerzenia. –

+0

Dla przypomnienia tutaj jest bilet: https://issues.apache.org/jira/browse/CAMEL-5599 –

+0

@ClausIbsen - minęło trochę czasu, ale rzuciłbym wsparcie za tą funkcję! – Denise

Odpowiedz

1

ActiveMQ Message Groups jest przeznaczony do obsługi tego przypadku. Jeśli więc możesz wprowadzić przeskok kolejki JMS w swojej trasie, po prostu ustaw nagłówek JMSXGroupId na identyfikatorklientad. Następnie na innej trasie możesz spożywać z tej kolejki i wysyłać do serwisu internetowego, aby uzyskać opisane zachowanie.

również zobaczyć http://camel.apache.org/parallel-processing-and-ordering.html więcej informacji ...

1

Podczas ActiveMQ wiadomość Grupy zdecydowanie zająć się równoległe przetwarzanie unikatowy identyfikator klienta, w mojej ocenie Mikołaj jest prawdą, że wprowadzenie gazu dla każdej unikatowej grupy reprezentuje ZAIMPLEMENTOWANE funkcji dla Camel/ActiveMQ.

Grupy wiadomości same nie będą spełniać warunków umowy SLA. O ile każda grupa wiadomości (skorelowana z identyfikatorem klienta) będzie przetwarzana w kolejności z jednym wątkiem w grupie, o ile otrzymanie odpowiedzi zajmie mniej niż minutę, wymóg jednego żądania na minutę na jednego klienta nie będzie egzekwowany .

Powiedziałbym, że byłbym bardzo zainteresowany, aby wiedzieć, czy możliwe byłoby połączenie grup wiadomości i strategii przepustnicy w sposób, który symulowałby żądanie funkcji w JIRA. Moje dotychczasowe próby zawiodły. Myślałam coś wzdłuż tych linii:

<route> 
    <from uri="activemq:pending?maxConcurrentConsumers=10"/> 
    <throttle timePeriodMillis="60000"> 
    <constant>1</constant> 
    <to uri="mock:endpoint"/> 
    </throttle> 
</route> 

Jednak przepustnica wydaje się być stosowany do całego zestawu wniosków ruchomych do punktu końcowego, a nie do każdego konsumenta. Muszę przyznać, że trochę mnie to zaskoczyło. Spodziewałem się, że przepustnica będzie miała zastosowanie do każdego konsumenta indywidualnie, co byłoby zgodne z umową SLA w pierwotnym pytaniu, pod warunkiem, że wiadomości zawierają identyfikator klienta w nagłówku JMSXGroupId.

0

Natknąłem się na podobny problem iw końcu wpadłem na rozwiązanie opisane tutaj.

Moje założenia są następujące:

  • Zamówienie komunikatów nie jest ważne (choć może on być rozwiązany poprzez ponowne sekwencera)
  • Całkowita objętość wiadomości na identyfikator klienta nie jest wielki więc środowisko wykonawcze nie jest nasycony .

Podejście rozwiązanie:

  • Run agregator przez 1 minutę podczas korzystania customerId zebrać wiadomości z tym samym identyfikatorem klienta na liście
  • Użyj Splitter podzielić listę na poszczególne wiadomości
  • Wyślij pierwszą wiadomość z rozdzielacza do rzeczywistej usługi
  • Ponownie przenieś resztę listy z powrotem do agregatora.
wersja

Java DSL jest nieco łatwiejsze do zrozumienia:

final AggregationStrategy aggregationStrategy = AggregationStrategies.flexible(Object.class) 
     .accumulateInCollection(ArrayList.class); 

from("direct:start") 
    .log("Receiving ${body}") 
    .aggregate(header("customerID"), aggregationStrategy).completionTimeout(60000) 
     .log("Aggregate: releasing ${body}") 
     .split(body()) 
     .choice() 
      .when(header(Exchange.SPLIT_INDEX).isEqualTo(0)) 
       .log("*** Processing: ${body}") 
       .to("mock:result") 
      .otherwise() 
       .to("seda:delay") 
     .endChoice(); 

from("seda:delay") 
    .delay(0) 
    .to("direct:start"); 

Wiosna wersja XML wygląda następująco:

<!-- this is our aggregation strategy defined as a spring bean --> 
<!-- see http://stackoverflow.com/questions/27404726/how-does-one-set-the-pick-expression-for-apache-camels-flexibleaggregationstr --> 
<bean id="_flexible0" class="org.apache.camel.util.toolbox.FlexibleAggregationStrategy"/> 
<bean id="_flexible2" factory-bean="_flexible0" factory-method="accumulateInCollection"> 
    <constructor-arg value="java.util.ArrayList" /> 
</bean> 

<camelContext xmlns="http://camel.apache.org/schema/spring"> 
     <route> 
      <from uri="direct:start"/> 
      <log message="Receiving ${body}"/> 
      <aggregate strategyRef="_flexible2" completionTimeout="60000" > 
       <correlationExpression> 
        <xpath>/order/@customerID</xpath> 
       </correlationExpression> 
       <log message="Aggregate: releasing ${body}"/> 
       <split> 
        <simple>${body}</simple> 
        <choice> 
         <when> 
          <simple>${header.CamelSplitIndex} == 0</simple> 
          <log message="*** Processing: ${body}"/> 
          <to uri="mock:result"/> 
         </when> 
         <otherwise> 
          <log message="--- Delaying: ${body}"/> 
          <to uri="seda:delay" /> 
         </otherwise> 
        </choice> 
       </split> 
      </aggregate> 
     </route> 

     <route> 
      <from uri="seda:delay"/> 
      <to uri="direct:start"/> 
     </route> 
</camelContext> 
Powiązane problemy