2015-02-18 24 views
6

Wdrażam strumień uczniów do klasyfikacji tekstu. W mojej implementacji jest kilka wartości jednowartościowych, które muszą zostać zaktualizowane, gdy nadejdą nowe elementy strumieniowe. Na przykład chcę zmienić szybkość uczenia się, gdy tworzone są nowe prognozy. Wątpię jednak, by można było emitować zmienne po początkowej transmisji. Co się stanie, jeśli będę potrzebował emitować zmienną za każdym razem, gdy ją aktualizuję. Jeśli istnieje sposób, aby to zrobić lub obejście tego, co chcę osiągnąć w Spark Streaming, chętnie się o tym dowiem.Okresowa transmisja w Apache Spark Streaming

Z góry dziękuję.

Odpowiedz

1

Moje zrozumienie jest po wysłaniu początkowo zmiennej rozgłaszanej, jest "tylko do odczytu". Sądzę, że można aktualizować zmienną emisji w lokalnych węzłach, ale nie na zdalnych węzłach.

Być może należy rozważyć wykonanie tej "zewnętrznej iskry". Co powiesz na korzystanie ze sklepu noSQL (Cassandra ..etc) lub nawet Memcache? Następnie możesz zaktualizować zmienną z jednego zadania i okresowo sprawdzać ten sklep przed innymi zadaniami?

0

Najlepiej jest zebrać dane do sterownika, a następnie rozesłać je do wszystkich węzłów.

Użyj Dstream # foreachRDD do zbierania obliczonych RDDs w sterowniku i gdy wiesz, kiedy musisz zmienić szybkość uczenia się, a następnie użyj SparkContext # broadcast (wartość), aby wysłać nową wartość do wszystkich węzłów.

spodziewałbym kod aby wyglądać tak:

dStreamContainingBroadcastValue.foreachRDD{ rdd => 
     val valueToBroadcast = rdd.collect() 
     sc.broadcast(valueToBroadcast) 
} 

Można również znaleźć this thread użyteczne, z listy mailingowej użytkownika iskra. Daj mi znać, czy działa.

+0

Więc, jak myślisz, że on może odczytać zmienną emisji z tego fragmentu? W pewnym sensie pokonuje ona swój cel, by zwrócić "Jednostkę". – kareblak

1

Mam brzydką grę, ale zadziałało! Możemy dowiedzieć się, jak uzyskać wartość emisji z obiektu rozgłaszania. https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala#L114 tylko przez identyfikator transmisji.

więc okresowo retransmitować przez ten sam identyfikator transmisji.

val broadcastFactory = new TorrentBroadcastFactory() 
broadcastFactory.unbroadcast(BroadcastId, true, true) 
// append some ids to initIds 
val broadcastcontent = broadcastFactory.newBroadcast[.Set[String]](initIds, false, BroadcastId) 

i mogę uzyskać BroadcastId od pierwszej wartości emisji.

val ids = ssc.sparkContext.broadcast(initIds) 
// broadcast id 
val BroadcastId = broadcastIds.id 

następnie identyfikatory używane przez pracownika jako typ emisji jak zwykle.

def func(record: Array[Byte], bc: Broadcast[Set[String]]) = ??? 
1
bkc.unpersist(true) 
bkc.destroy() 
bkc = sc.broadcast(tableResultMap) 
bkv = bkc.value 

Możesz spróbować tego, że nie gwarantują, czy efektywne

1

dostałem tej pracy poprzez stworzenie klasy otoki nad zmiennej transmisji. Metoda updateAndGet klasy wrapper zwraca odświeżoną zmienną broadcast. Wołam tej funkcji wewnątrz dStream.transform -> zgodnie z Dokumentacją Spark

http://spark.apache.org/docs/latest/streaming-programming-guide.html#transform-operation

Transform Operation stany. „funkcja dostarczana jest wywoływana w każdym przedziale wsadowym To pozwala zrobić czaso- różne operacje RDD, to znaczy operacje RDD, liczba partycji, rozgłaszanie zmiennych itd.mogą być zmieniane między partiami

klasa BroadcastWrapper będzie wyglądać.

public class BroadcastWrapper { 
private Broadcast<ReferenceData> broadcastVar; 
private Date lastUpdatedAt = Calendar.getInstance().getTime(); 

private static BroadcastWrapper obj = new BroadcastWrapper(); 

private BroadcastWrapper(){} 

public static BroadcastWrapper getInstance() { 
     return obj; 
} 

public JavaSparkContext getSparkContext(SparkContext sc) { 
     JavaSparkContext jsc = JavaSparkContext.fromSparkContext(sc); 
     return jsc; 
} 

public Broadcast<ReferenceData> updateAndGet(SparkContext sparkContext){ 
     Date currentDate = Calendar.getInstance().getTime(); 
     long diff = currentDate.getTime()-lastUpdatedAt.getTime(); 
     if (var == null || diff > 60000) { //Lets say we want to refresh every 1 min = 60000 ms 
      if (var != null) 
       var.unpersist(); 
      lastUpdatedAt = new Date(System.currentTimeMillis()); 

      //Your logic to refresh 
      ReferenceData data = getRefData(); 

      var = getSparkContext(sparkContext).broadcast(data); 
     } 
     return var; 
} 
} 

Można użyć tej funkcji updateAndGet nadawane zmiennym w sposób, który pozwala stream.transform RDD-RDD Przekształcenia

objectStream.transform(stream -> { 

    Broadcast<Object> var = BroadcastWrapper.getInstance().updateAndGet(stream.context()); 

/**Your code to manipulate stream **/ 
}); 

Zobacz pełną odpowiedź z tego pytania: https://stackoverflow.com/a/41259333/3166245

Ho pe to pomaga