2016-12-18 14 views
5

próbuję zbudować przykładową aplikację przy użyciu Apache flink który wykonuje następujące czynności:Apache Flink - używać wartości od strumienia danych do dynamicznego tworzenia strumieniowe źródło danych

  1. Czyta strumień symboli giełdowych (np "CSCO", "FB") z kolejki Kafka.
  2. Dla każdego symbolu wykonuje w czasie rzeczywistym wyszukiwanie aktualnych cen i strumieni wartości dla dalszego przetwarzania.

* Aktualizacja do oryginalnego wpisu *

przeniosłem funkcję mapy do osobnej klasy i nie pojawia się komunikat o błędzie run-time "Realizacja MapFunction nie jest możliwy do serializacji więcej. Obiekt prawdopodobnie zawiera lub nie odnosi się do niezdefiniowalnych pól ".

Kwestią, którą teraz stoję, jest to, że temat Kafki "ceny akcji", próbuję zapisać ceny, nie otrzymuje ich. Próbuję rozwiązać problem i opublikuję wszelkie aktualizacje.

public class RetrieveStockPrices { 
    @SuppressWarnings("serial") 
    public static void main(String[] args) throws Exception { 
     final StreamExecutionEnvironment streamExecEnv = StreamExecutionEnvironment.getExecutionEnvironment(); 
     streamExecEnv.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime); 

     Properties properties = new Properties(); 
     properties.setProperty("bootstrap.servers", "localhost:9092"); 
     properties.setProperty("zookeeper.connect", "localhost:2181"); 
     properties.setProperty("group.id", "stocks"); 

     DataStream<String> streamOfStockSymbols = streamExecEnv.addSource(new FlinkKafkaConsumer08<String>("stocksymbol", new SimpleStringSchema(), properties)); 

     DataStream<String> stockPrice = 
      streamOfStockSymbols 
      //get unique keys 
      .keyBy(new KeySelector<String, String>() { 
       @Override 
       public String getKey(String trend) throws Exception { 
        return trend; 
       } 
       }) 
      //collect events over a window 
      .window(TumblingEventTimeWindows.of(Time.seconds(60))) 
      //return the last event from the window...all elements are the same "Symbol" 
      .apply(new WindowFunction<String, String, String, TimeWindow>() { 
       @Override 
       public void apply(String key, TimeWindow window, Iterable<String> input, Collector<String> out) throws Exception { 
        out.collect(input.iterator().next().toString()); 
       } 
      }) 
      .map(new StockSymbolToPriceMapFunction()); 

     streamExecEnv.execute("Retrieve Stock Prices"); 
    } 
} 

public class StockSymbolToPriceMapFunction extends RichMapFunction<String, String> { 
    @Override 
    public String map(String stockSymbol) throws Exception { 
     final StreamExecutionEnvironment streamExecEnv = StreamExecutionEnvironment.getExecutionEnvironment(); 
     streamExecEnv.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime); 
     System.out.println("StockSymbolToPriceMapFunction: stockSymbol: " + stockSymbol); 

     DataStream<String> stockPrices = streamExecEnv.addSource(new LookupStockPrice(stockSymbol)); 
     stockPrices.keyBy(new CustomKeySelector()).addSink(new FlinkKafkaProducer08<String>("localhost:9092", "stockprices", new SimpleStringSchema())); 

     return "100000"; 
    } 

    private static class CustomKeySelector implements KeySelector<String, String> { 
     @Override 
     public String getKey(String arg0) throws Exception { 
      return arg0.trim(); 
     } 
    } 
} 


public class LookupStockPrice extends RichSourceFunction<String> { 
    public String stockSymbol = null; 
    public boolean isRunning = true; 

    public LookupStockPrice(String inSymbol) { 
      stockSymbol = inSymbol; 
    } 

    @Override 
    public void open(Configuration parameters) throws Exception { 
      isRunning = true; 
    } 


    @Override 
    public void cancel() { 
      isRunning = false; 
    } 

    @Override 
    public void run(SourceFunction.SourceContext<String> ctx) 
        throws Exception { 
      String stockPrice = "0"; 
      while (isRunning) { 
       //TODO: query Google Finance API 
       stockPrice = Integer.toString((new Random()).nextInt(100)+1); 
       ctx.collect(stockPrice); 
       Thread.sleep(10000); 
      } 
    } 
} 

Odpowiedz

4

StreamExecutionEnvironment nie są wcięte do użycia wewnątrz operatorów aplikacji do przesyłania strumieniowego. Nie oznacza to, że nie jest to sprawdzone i zalecane. To może działać i coś zrobić, ale najprawdopodobniej nie zachowa się dobrze i prawdopodobnie zabije twoją aplikację.

W swoim programie StockSymbolToPriceMapFunction określa całkowicie nową i niezależną aplikację do przesyłania strumieniowego. Jednak, ponieważ nie wywołujemy streamExecEnv.execute(), programy nie są uruchamiane, a metoda map powraca bez wykonywania jakichkolwiek czynności.

Jeśli będzie zadzwoń streamExecEnv.execute(), funkcja uruchomi nowy lokalny klaster Flink w robotniczej maszynie JVM i uruchomi aplikację w tym lokalnym klastrze Flink. Lokalna instancja Flink zajmie dużo miejsca na sterty i po uruchomieniu kilku klastrów, pracownik prawdopodobnie zginie z powodu OutOfMemoryError, co nie jest tym, co chcesz osiągnąć.

+0

Czy jest możliwe dynamiczne tworzenie strumieni w odpowiedzi na przychodzące dane? –

+0

Można zaimplementować funkcję 'FlatMapFunction', która dynamicznie odczytuje i emituje dane na podstawie nadchodzących rekordów. Na przykład, jeśli masz strumień z nazwami plików, 'FlatMapFunction' możesz otworzyć te pliki i emitować swoje dane. Jednak typy wyjściowe wszystkich rekordów muszą być takie same. Również może być trudne, aby uzyskać semantykę przetwarzania zdarzeń w czasie, ale jest to bardziej ogólny problem z dynamicznie dodawanych źródeł. –

+0

@FabianHueske Rozwiązuję podobny przypadek użycia. Więc jeśli będę musiał użyć FlatMapFunction, będziemy musieli odczytać plik za pomocą normalnych API plików z scala/Java i nie używając readTextFile Flink. Powód: nie możemy używać StreamExecutionEnvironment wewnątrz flatMap. Czy moje zrozumienie jest poprawne? –

Powiązane problemy