2013-06-04 14 views
10

Próbuję napisać topologii, który wykonuje następujące czynności:Grupowanie w topologii burzy prosty agregacji

  1. dziobek, który subskrybuje kanał twitter (na podstawie słów kluczowych)
  2. śrubą agregacji że agreguje pewną liczbę tweetów (np. N) w kolekcji i wysyła im śrubę drukarki
  3. Prosta śruba, która drukuje kolekcję na konsoli jednocześnie.

W rzeczywistości chcę zrobić więcej przetwarzania na kolekcji.

Testowałem to lokalnie i wygląda na to, że działa. Nie jestem jednak pewien, czy poprawnie ustawiłem zgrupowania na śrubach i czy działałoby to prawidłowo, gdy rozmieszczono je na prawdziwym klastrze burzowym. Byłbym wdzięczny, gdyby ktoś mógł pomóc w ocenie tej topologii i zasugerować wszelkie błędy, zmiany lub ulepszenia.

Dzięki.

Tak wygląda moja topologia.

builder.setSpout("spout", new TwitterFilterSpout("pittsburgh")); 
    builder.setBolt("sampleaggregate", new SampleAggregatorBolt()) 
       .shuffleGrouping("spout"); 
    builder.setBolt("printaggreator",new PrinterBolt()).shuffleGrouping("sampleaggregate"); 

Agregacja Bolt

public class SampleAggregatorBolt implements IRichBolt { 

    protected OutputCollector collector; 
    protected Tuple currentTuple; 
    protected Logger log; 
    /** 
    * Holds the messages in the bolt till you are ready to send them out 
    */ 
    protected List<Status> statusCache; 

    @Override 
    public void prepare(Map stormConf, TopologyContext context, 
         OutputCollector collector) { 
     this.collector = collector; 

     log = Logger.getLogger(getClass().getName()); 
     statusCache = new ArrayList<Status>(); 
    } 

    @Override 
    public void execute(Tuple tuple) { 
     currentTuple = tuple; 

     Status currentStatus = null; 
     try { 
      currentStatus = (Status) tuple.getValue(0); 
     } catch (ClassCastException e) { 
     } 
     if (currentStatus != null) { 

      //add it to the status cache 
      statusCache.add(currentStatus); 
      collector.ack(tuple); 


      //check the size of the status cache and pass it to the next stage if you have enough messages to emit 
      if (statusCache.size() > 10) { 
       collector.emit(new Values(statusCache)); 
      } 

     } 
    } 

    @Override 
    public void cleanup() { 


    } 

    @Override 
    public void declareOutputFields(OutputFieldsDeclarer declarer) { 
     declarer.declare(new Fields("tweets")); 

    } 

    @Override 
    public Map<String, Object> getComponentConfiguration() { 
     return null; //To change body of implemented methods use File | Settings | File Templates. 
    } 


    protected void setupNonSerializableAttributes() { 

    } 

} 

Bolt drukarki

public class PrinterBolt extends BaseBasicBolt { 

    @Override 
    public void execute(Tuple tuple, BasicOutputCollector collector) { 
     System.out.println(tuple.size() + " " + tuple); 
    } 

    @Override 
    public void declareOutputFields(OutputFieldsDeclarer ofd) { 
    } 

} 

Odpowiedz

4

Z tego co widzę, to wygląda dobrze. Diabeł tkwi w szczegółach. Nie jestem pewien, co robi agregator, ale jeśli ma jakieś założenia co do wartości przekazywanych do niego, powinieneś rozważyć odpowiednie grupowanie pól. Może to nie zrobić dużej różnicy, ponieważ korzystasz z domyślnej wskazówki dotyczącej paralelizmu równej 1, ale jeśli zdecydujesz się na skalowanie za pomocą wielu zagregowanych instancji śrubowych, niejawne założenia logiczne mogą wymagać grupowania nieprzemysłowego.

+0

Podałem kod dla śruby agregatora powyżej (patrz metoda wykonywania). Na razie czeka, aż zgromadzi N (10 w powyższym przykładzie) wiadomości i podzieli je, gdy tylko będzie mieć 10 wiadomości. BTW Właśnie znalazłem błąd, który naprawię. Po wyemitowaniu wartości muszę wyczyścić pamięć podręczną. Jakie zmiany powinny być konieczne, jeśli muszę użyć więcej niż jednego agregatora. –

0

Witaj, jak tylko spróbujesz zasubskrybować więcej niż jedno słowo kluczowe, napotkasz problemy. Sugeruję, że twój wylewka emituje również oryginalne słowo kluczowe, które zostało użyte do filtrowania.

Wtedy zamiast robić shuffleGrouping chciałbym zrobić fieldsGrouping

builder.setBolt("sampleaggregate", new SampleAggregatorBolt()) 
      .shuffleGrouping("spout", new Fields("keyword")); 

W ten sposób można upewnić się, że wyniki za pomocą pojedynczego hasła kończy się na tej samej śrubie za każdym razem. Takie, że możesz poprawnie obliczać agregaty. Jeśli pominiesz polaGrouping Storm, możesz utworzyć dowolną ilość zagregowanej śruby i wysyłać wiadomości z dziobka do dowolnego wystąpienia zagregowanej śruby, która w ostatecznym przypadku byłaby błędna.

Powiązane problemy