2015-09-14 10 views
10

Ostatnio mam zamiar przenieść mój samodzielny kod Pythona ML do iskry. Pipeline ML w spark.ml okazuje się całkiem przydatny, z usprawnionym interfejsem API do łączenia etapów algorytmu i przeszukiwania siatki parametrów.Buforowanie pośrednie wyniki w rurociągu Spark ML

Mimo to znalazłem wsparcie dla jednej ważnej funkcji niejasnej w istniejących dokumentach: buforowanie pośrednich wyników. Znaczenie tej funkcji pojawia się, gdy rurociąg obejmuje etapy intensywnego obliczania.

Na przykład w moim przypadku używam ogromnej macierzy rzadkiej do wykonywania wielu średnich ruchomych na danych szeregów czasowych w celu utworzenia funkcji wprowadzania. Struktura macierzy jest określona przez pewien hiper-parametr. Krok ten okazuje się wąskim gardłem dla całego rurociągu, ponieważ muszę go zbudować w czasie wykonywania.

Podczas wyszukiwania parametrów zwykle mam inne parametry do zbadania poza tym "parametrem struktury". Jeśli więc mogę ponownie wykorzystać ogromną macierz, gdy "parametr struktury" pozostanie niezmieniony, mogę zaoszczędzić mnóstwo czasu. Z tego powodu celowo utworzyłem mój kod do buforowania i ponownego użycia tych pośrednich wyników.

Moje pytanie brzmi: Czy uchwyt potoku ML firmy ML Spark może automatycznie buforować buforowanie? Czy muszę ręcznie utworzyć kod, aby to zrobić? Jeśli tak, czy istnieje jakaś najlepsza praktyka do nauki?

P.S. Zajrzałem do oficjalnego dokumentu i kilku innych materiałów, ale żaden z nich nie wydaje się omawiać tego tematu.

+0

mam [Podobne pytanie] (http://stackoverflow.com/questions/33161320/distributed-batch-computation -z-długotrwałym-utrzymywaniem-i-kontrolowaniem), który niestety nie ma również odpowiedzi. –

Odpowiedz

4

Więc wpadłem na ten sam problem i sposób, w jaki rozwiązałem, to to, że zaimplementowałem mój własny PipelineStage, który buforuje dane wejściowe DataSet i zwraca je tak, jak jest.

import org.apache.spark.ml.Transformer 
import org.apache.spark.ml.param.ParamMap 
import org.apache.spark.ml.util.{DefaultParamsWritable, Identifiable} 
import org.apache.spark.sql.{DataFrame, Dataset} 
import org.apache.spark.sql.types.StructType 

class Cacher(val uid: String) extends Transformer with DefaultParamsWritable { 
    override def transform(dataset: Dataset[_]): DataFrame = dataset.toDF.cache() 

    override def copy(extra: ParamMap): Transformer = defaultCopy(extra) 

    override def transformSchema(schema: StructType): StructType = schema 

    def this() = this(Identifiable.randomUID("CacherTransformer")) 
} 

Aby go użyć wtedy można zrobić coś takiego:

new Pipeline().setStages(Array(stage1, new Cacher(), stage2)) 
+1

Domyślam się, że jedyny problem z tym rozwiązaniem to fakt, że nie rozpakowuje się wstępnie buforowanej ramki danych (jeśli łączysz kilku Cachera). Możesz argumentować, że to nie problem, ponieważ Spark automatycznie unieważnia się w czasie GC, ale może to być dość mylące z Twojego interfejsu użytkownika, np. widząc tak wiele buforowanych danych. –

Powiązane problemy