2015-12-19 9 views
8

Obecnie pracuję z danymi sekwencji DNA i natknąłem się na blokadę wydajności.Spark: bardziej wydajna agregacja do łączenia ciągów z różnych wierszy

Mam dwa wyszukiwane słowniki/hashe (jako RDD) z "słowami" DNA (krótkie sekwencje) jako kluczami i listą pozycji indeksu jako wartością. Jeden dotyczy krótszej sekwencji zapytania, a drugi sekwencji bazy danych. Tworzenie tabel jest dość szybkie, nawet w przypadku bardzo, bardzo dużych sekwencji.

W następnym kroku muszę je powiązać i znaleźć "trafienia" (pary pozycji indeksu dla każdego wspólnego słowa).

Najpierw dołączę do słowników odnośników, które są dość szybkie. Jednak teraz potrzebuję par, więc muszę dwukrotnie wypunktować mapę, raz, aby rozwinąć listę indeksów z zapytania i po raz drugi, aby rozwinąć listę indeksów z bazy danych. To nie jest idealne, ale nie widzę innego sposobu, aby to zrobić. Przynajmniej działa dobrze.

Dane wyjściowe w tym miejscu to: (query_index, (word_length, diagonal_offset)), gdzie przesunięcie diagonalne jest wskaźnikiem_sortowania_danych_obiektu minus indeks sekwencji zapytań.

Muszę teraz znaleźć pary indeksów z tym samym przekątnym odsunięciem (db_index - zapytanie_index) i względnie blisko siebie i połączyć je (więc zwiększam długość słowa), ale tylko jako pary (tj. Raz Dołączam do jednego indeksu z drugim, nie chcę, aby coś innego się z nim łączyło).

Robię to za pomocą operacji aggregateByKey za pomocą specjalnego obiektu o nazwie Seed().

PARALELLISM = 16 # I have 4 cores with hyperthreading 
def generateHsps(query_lookup_table_rdd, database_lookup_table_rdd): 
    global broadcastSequences 

    def mergeValueOp(seedlist, (query_index, seed_length)): 
     return seedlist.addSeed((query_index, seed_length)) 

    def mergeSeedListsOp(seedlist1, seedlist2): 
     return seedlist1.mergeSeedListIntoSelf(seedlist2) 

    hits_rdd = (query_lookup_table_rdd.join(database_lookup_table_rdd) 
       .flatMap(lambda (word, (query_indices, db_indices)): [(query_index, db_indices) for query_index in query_indices], preservesPartitioning=True) 
       .flatMap(lambda (query_index, db_indices): [(db_index - query_index, (query_index, WORD_SIZE)) for db_index in db_indices], preservesPartitioning=True) 
       .aggregateByKey(SeedList(), mergeValueOp, mergeSeedListsOp, PARALLELISM) 
       .map(lambda (diagonal, seedlist): (diagonal, seedlist.mergedSeedList)) 
       .flatMap(lambda (diagonal, seedlist): [(query_index, seed_length, diagonal) for query_index, seed_length in seedlist]) 
       ) 

    return hits_rdd 

Seed():

class SeedList(): 
    def __init__(self): 
     self.unmergedSeedList = [] 
     self.mergedSeedList = [] 


    #Try to find a more efficient way to do this 
    def addSeed(self, (query_index1, seed_length1)): 
     for i in range(0, len(self.unmergedSeedList)): 
      (query_index2, seed_length2) = self.unmergedSeedList[i] 
      #print "Checking ({0}, {1})".format(query_index2, seed_length2) 
      if min(abs(query_index2 + seed_length2 - query_index1), abs(query_index1 + seed_length1 - query_index2)) <= WINDOW_SIZE: 
       self.mergedSeedList.append((min(query_index1, query_index2), max(query_index1+seed_length1, query_index2+seed_length2)-min(query_index1, query_index2))) 
       self.unmergedSeedList.pop(i) 
       return self 
     self.unmergedSeedList.append((query_index1, seed_length1)) 
     return self 

    def mergeSeedListIntoSelf(self, seedlist2): 
     print "merging seed" 
     for (query_index2, seed_length2) in seedlist2.unmergedSeedList: 
      wasmerged = False 
      for i in range(0, len(self.unmergedSeedList)): 
       (query_index1, seed_length1) = self.unmergedSeedList[i] 
       if min(abs(query_index2 + seed_length2 - query_index1), abs(query_index1 + seed_length1 - query_index2)) <= WINDOW_SIZE: 
        self.mergedSeedList.append((min(query_index1, query_index2), max(query_index1+seed_length1, query_index2+seed_length2)-min(query_index1, query_index2))) 
        self.unmergedSeedList.pop(i) 
        wasmerged = True 
        break 
      if not wasmerged: 
       self.unmergedSeedList.append((query_index2, seed_length2)) 
     return self 

To gdzie wydajność naprawdę zepsuje nawet dla sekwencji umiarkowanej długości.

Czy istnieje lepszy sposób na wykonanie tej agregacji? Moje przeczucie mówi "tak", ale nie mogę tego wymyślić.

Wiem, że jest to bardzo długotrwałe i techniczne pytanie, a ja naprawdę doceniam każdy wgląd, nawet jeśli nie ma łatwego rozwiązania.

Edycja: Oto jak robię tabel przeglądowych:

def createLookupTable(sequence_rdd, sequence_name, word_length): 
    global broadcastSequences 
    blank_list = [] 

    def addItemToList(lst, val): 
     lst.append(val) 
     return lst 

    def mergeLists(lst1, lst2): 
     #print "Merging" 
     return lst1+lst2 

    return (sequence_rdd 
      .flatMap(lambda seq_len: range(0, seq_len - word_length + 1)) 
      .repartition(PARALLELISM) 
      #.partitionBy(PARALLELISM) 
      .map(lambda index: (str(broadcastSequences.value[sequence_name][index:index + word_length]), index), preservesPartitioning=True) 
      .aggregateByKey(blank_list, addItemToList, mergeLists, PARALLELISM)) 
      #.map(lambda (word, indices): (word, sorted(indices)))) 

i tutaj jest funkcja, która uruchamia całą operację:

def run(query_seq, database_sequence, translate_query=False): 
    global broadcastSequences 
    scoring_matrix = 'nucleotide' if isinstance(query_seq.alphabet, DNAAlphabet) else 'blosum62' 
    sequences = {'query': query_seq, 
       'database': database_sequence} 

    broadcastSequences = sc.broadcast(sequences) 
    query_rdd = sc.parallelize([len(query_seq)]) 
    query_rdd.cache() 
    database_rdd = sc.parallelize([len(database_sequence)]) 
    database_rdd.cache() 
    query_lookup_table_rdd = createLookupTable(query_rdd, 'query', WORD_SIZE) 
    query_lookup_table_rdd.cache() 
    database_lookup_table_rdd = createLookupTable(database_rdd, 'database', WORD_SIZE) 
    seeds_rdd = generateHsps(query_lookup_table_rdd, database_lookup_table_rdd) 
    return seeds_rdd 

Edycja 2: Mam manipulowane co nieco i nieznacznie poprawiono wydajność, zastępując:

   .flatMap(lambda (word, (query_indices, db_indices)): [(query_index, db_indices) for query_index in query_indices], preservesPartitioning=True) 
       .flatMap(lambda (query_index, db_indices): [(db_index - query_index, (query_index, WORD_SIZE)) for db_index in db_indices], preservesPartitioning=True) 

w hits_rdd z:

.flatMap(lambda (word, (query_indices, db_indices)): itertools.product(query_indices, db_indices)) 
       .map(lambda (query_index, db_index): (db_index - query_index, (query_index, WORD_SIZE))) 

Przynajmniej teraz nie spalam pamięci z pośrednimi strukturami danych.

+0

ma zmienną WINDOW_SIZE dla każdej listy SeedList? – Back2Basics

+0

@ Back2Basics WINDOW_SIZE jest takie samo dla każdego SeedList. –

+0

Hasło hits_rdd nie ma sensu. Czy możemy przyjrzeć się przykładom query_lookup_table_dd i database_lookup_table_dd, a także implementacjom flatMap i aggregateByKey? – Back2Basics

Odpowiedz

1

Zapomnijmy o technicznych szczegółach tego, co robisz i myślisz "funkcjonalnie" o poszczególnych etapach, zapominając o szczegółach implementacji.Takie funkcjonalne myślenie jest ważną częścią równoległej analizy danych; Idealnie, jeśli możemy rozwiązać ten problem w ten sposób, możemy wyraźniej wytłumaczyć, o jakie kroki chodzi, i zakończyć w sposób bardziej przejrzysty i często bardziej zwięzły. Myśląc w kategoriach tabelarycznego modelu danych, uważam, że Twój problem składa się z następujących kroków:

  1. Dołącz do dwóch zestawów danych w kolumnie sekwencji. Stwórz nową kolumnę delta zawierającą różnicę między indeksami.
  2. Posortuj według (dowolnego) indeksu, aby upewnić się, że podciągi są we właściwej kolejności.
  3. Grupuj według delta i łącz ciągi w kolumnie sekwencji, aby uzyskać pełne dopasowania między zestawami danych.

Dla pierwszych 3 kroków, myślę, że sensowne jest korzystanie z DataFrames, ponieważ ten model danych ma sens w mojej głowie o rodzaju przetwarzania, który robimy. (Właściwie mogę użyć DataFrames również dla kroku 4, z wyjątkiem Pyspark nie obsługuje obecnie niestandardowych funkcji agregujących dla DataFrames, chociaż Scala robi).

W czwartym kroku (jeśli dobrze rozumiem, o co naprawdę pytasz w swoim pytaniu), trudno jest myśleć o tym, jak to zrobić funkcjonalnie, jednak myślę, że eleganckim i skutecznym rozwiązaniem jest użyj reduku (znanego również jako prawy fałd); ten wzór można zastosować do dowolnego problemu, który można sformułować w kategoriach iteracyjnego stosowania asocjacyjnej funkcji binarnej, czyli funkcji, w której "grupowanie" dowolnych 3 argumentów nie ma znaczenia (chociaż porządek na pewno może mieć znaczenie), symbolicznie, jest to funkcja x,y -> f(x,y), gdzie f(x, f(y, z)) = f(f(x, y), z). Łańcuch (lub bardziej ogólnie lista) konkatenacja jest właśnie taką funkcją.

Oto przykład tego, jak to może wyglądać w pyspark; miejmy nadzieję, że możesz dostosować to do szczegółów twoich danych:

#setup some sample data 
query = [['abcd', 30] ,['adab', 34] ,['dbab',38]] 
reference = [['dbab', 20], ['ccdd', 24], ['abcd', 50], ['adab',54], ['dbab',58], ['dbab', 62]] 

#create data frames 
query_df = sqlContext.createDataFrame(query, schema = ['sequence1', 'index1']) 
reference_df = sqlContext.createDataFrame(reference, schema = ['sequence2', 'index2']) 

#step 1: join 
matches = query_df.join(reference_df, query_df.sequence1 == reference_df.sequence2) 

#step 2: calculate delta column 
matches_delta = matches.withColumn('delta', matches.index2 - matches.index1) 

#step 3: sort by index 
matches_sorted = matches_delta.sort('delta').sort('index2') 

#step 4: convert back to rdd and reduce 
#note that + is just string concatenation for strings 
r = matches_sorted['delta', 'sequence1'].rdd 
r.reduceByKey(lambda x, y : x + y).collect() 

#expected output: 
#[(24, u'dbab'), (-18, u'dbab'), (20, u'abcdadabdbab')] 
+0

Interesujące podejście. Samo sprzężenie nie jest problemem, jest to rozszerzenie sparowanych list jako produktu kartezjańskiego, więc może pominę krok agregacji i zrobię to zamiast tego. Nigdy wcześniej nie współpracowałem z DataFrames, ale podejrzewałam, że będę tego chciał. –

+0

W rzeczywistości mam coraz gorszy występ, podejrzewam, że to dlatego, że dopasowujemy każdą możliwą parę słów, a nie całą listę słów, tak jak robiłem to wcześniej. Nie jestem pewien, czy to się opłaca z większymi sekwencjami, gdzie krok ekspansji jest dużym hitem wydajności, ale zobaczę. –

Powiązane problemy