dostałem tej pracy poprzez stworzenie klasy otoki nad zmiennej transmisji. Metoda updateAndGet klasy wrapper zwraca odświeżoną zmienną broadcast. Wołam tej funkcji wewnątrz dStream.transform -> zgodnie z Dokumentacją Spark
http://spark.apache.org/docs/latest/streaming-programming-guide.html#transform-operation
Transform Operation stany. „funkcja dostarczana jest wywoływana w każdym przedziale wsadowym To pozwala zrobić czaso- różne operacje RDD, to znaczy operacje RDD, liczba partycji, rozgłaszanie zmiennych itd.mogą być zmieniane między partiami „
klasa BroadcastWrapper będzie wyglądać.
public class BroadcastWrapper {
private Broadcast<ReferenceData> broadcastVar;
private Date lastUpdatedAt = Calendar.getInstance().getTime();
private static BroadcastWrapper obj = new BroadcastWrapper();
private BroadcastWrapper(){}
public static BroadcastWrapper getInstance() {
return obj;
}
public JavaSparkContext getSparkContext(SparkContext sc) {
JavaSparkContext jsc = JavaSparkContext.fromSparkContext(sc);
return jsc;
}
public Broadcast<ReferenceData> updateAndGet(SparkContext sparkContext){
Date currentDate = Calendar.getInstance().getTime();
long diff = currentDate.getTime()-lastUpdatedAt.getTime();
if (var == null || diff > 60000) { //Lets say we want to refresh every 1 min = 60000 ms
if (var != null)
var.unpersist();
lastUpdatedAt = new Date(System.currentTimeMillis());
//Your logic to refresh
ReferenceData data = getRefData();
var = getSparkContext(sparkContext).broadcast(data);
}
return var;
}
}
Można użyć tej funkcji updateAndGet nadawane zmiennym w sposób, który pozwala stream.transform RDD-RDD Przekształcenia
objectStream.transform(stream -> {
Broadcast<Object> var = BroadcastWrapper.getInstance().updateAndGet(stream.context());
/**Your code to manipulate stream **/
});
Zobacz pełną odpowiedź z tego pytania: https://stackoverflow.com/a/41259333/3166245
Ho pe to pomaga
Więc, jak myślisz, że on może odczytać zmienną emisji z tego fragmentu? W pewnym sensie pokonuje ona swój cel, by zwrócić "Jednostkę". – kareblak