próbuję zbudować przykładową aplikację przy użyciu Apache flink który wykonuje następujące czynności:Apache Flink - używać wartości od strumienia danych do dynamicznego tworzenia strumieniowe źródło danych
- Czyta strumień symboli giełdowych (np "CSCO", "FB") z kolejki Kafka.
- Dla każdego symbolu wykonuje w czasie rzeczywistym wyszukiwanie aktualnych cen i strumieni wartości dla dalszego przetwarzania.
* Aktualizacja do oryginalnego wpisu *
przeniosłem funkcję mapy do osobnej klasy i nie pojawia się komunikat o błędzie run-time "Realizacja MapFunction nie jest możliwy do serializacji więcej. Obiekt prawdopodobnie zawiera lub nie odnosi się do niezdefiniowalnych pól ".
Kwestią, którą teraz stoję, jest to, że temat Kafki "ceny akcji", próbuję zapisać ceny, nie otrzymuje ich. Próbuję rozwiązać problem i opublikuję wszelkie aktualizacje.
public class RetrieveStockPrices {
@SuppressWarnings("serial")
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment streamExecEnv = StreamExecutionEnvironment.getExecutionEnvironment();
streamExecEnv.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("zookeeper.connect", "localhost:2181");
properties.setProperty("group.id", "stocks");
DataStream<String> streamOfStockSymbols = streamExecEnv.addSource(new FlinkKafkaConsumer08<String>("stocksymbol", new SimpleStringSchema(), properties));
DataStream<String> stockPrice =
streamOfStockSymbols
//get unique keys
.keyBy(new KeySelector<String, String>() {
@Override
public String getKey(String trend) throws Exception {
return trend;
}
})
//collect events over a window
.window(TumblingEventTimeWindows.of(Time.seconds(60)))
//return the last event from the window...all elements are the same "Symbol"
.apply(new WindowFunction<String, String, String, TimeWindow>() {
@Override
public void apply(String key, TimeWindow window, Iterable<String> input, Collector<String> out) throws Exception {
out.collect(input.iterator().next().toString());
}
})
.map(new StockSymbolToPriceMapFunction());
streamExecEnv.execute("Retrieve Stock Prices");
}
}
public class StockSymbolToPriceMapFunction extends RichMapFunction<String, String> {
@Override
public String map(String stockSymbol) throws Exception {
final StreamExecutionEnvironment streamExecEnv = StreamExecutionEnvironment.getExecutionEnvironment();
streamExecEnv.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
System.out.println("StockSymbolToPriceMapFunction: stockSymbol: " + stockSymbol);
DataStream<String> stockPrices = streamExecEnv.addSource(new LookupStockPrice(stockSymbol));
stockPrices.keyBy(new CustomKeySelector()).addSink(new FlinkKafkaProducer08<String>("localhost:9092", "stockprices", new SimpleStringSchema()));
return "100000";
}
private static class CustomKeySelector implements KeySelector<String, String> {
@Override
public String getKey(String arg0) throws Exception {
return arg0.trim();
}
}
}
public class LookupStockPrice extends RichSourceFunction<String> {
public String stockSymbol = null;
public boolean isRunning = true;
public LookupStockPrice(String inSymbol) {
stockSymbol = inSymbol;
}
@Override
public void open(Configuration parameters) throws Exception {
isRunning = true;
}
@Override
public void cancel() {
isRunning = false;
}
@Override
public void run(SourceFunction.SourceContext<String> ctx)
throws Exception {
String stockPrice = "0";
while (isRunning) {
//TODO: query Google Finance API
stockPrice = Integer.toString((new Random()).nextInt(100)+1);
ctx.collect(stockPrice);
Thread.sleep(10000);
}
}
}
Czy jest możliwe dynamiczne tworzenie strumieni w odpowiedzi na przychodzące dane? –
Można zaimplementować funkcję 'FlatMapFunction', która dynamicznie odczytuje i emituje dane na podstawie nadchodzących rekordów. Na przykład, jeśli masz strumień z nazwami plików, 'FlatMapFunction' możesz otworzyć te pliki i emitować swoje dane. Jednak typy wyjściowe wszystkich rekordów muszą być takie same. Również może być trudne, aby uzyskać semantykę przetwarzania zdarzeń w czasie, ale jest to bardziej ogólny problem z dynamicznie dodawanych źródeł. –
@FabianHueske Rozwiązuję podobny przypadek użycia. Więc jeśli będę musiał użyć FlatMapFunction, będziemy musieli odczytać plik za pomocą normalnych API plików z scala/Java i nie używając readTextFile Flink. Powód: nie możemy używać StreamExecutionEnvironment wewnątrz flatMap. Czy moje zrozumienie jest poprawne? –