Obecnie pracuję z danymi sekwencji DNA i natknąłem się na blokadę wydajności.Spark: bardziej wydajna agregacja do łączenia ciągów z różnych wierszy
Mam dwa wyszukiwane słowniki/hashe (jako RDD) z "słowami" DNA (krótkie sekwencje) jako kluczami i listą pozycji indeksu jako wartością. Jeden dotyczy krótszej sekwencji zapytania, a drugi sekwencji bazy danych. Tworzenie tabel jest dość szybkie, nawet w przypadku bardzo, bardzo dużych sekwencji.
W następnym kroku muszę je powiązać i znaleźć "trafienia" (pary pozycji indeksu dla każdego wspólnego słowa).
Najpierw dołączę do słowników odnośników, które są dość szybkie. Jednak teraz potrzebuję par, więc muszę dwukrotnie wypunktować mapę, raz, aby rozwinąć listę indeksów z zapytania i po raz drugi, aby rozwinąć listę indeksów z bazy danych. To nie jest idealne, ale nie widzę innego sposobu, aby to zrobić. Przynajmniej działa dobrze.
Dane wyjściowe w tym miejscu to: (query_index, (word_length, diagonal_offset))
, gdzie przesunięcie diagonalne jest wskaźnikiem_sortowania_danych_obiektu minus indeks sekwencji zapytań.
Muszę teraz znaleźć pary indeksów z tym samym przekątnym odsunięciem (db_index - zapytanie_index) i względnie blisko siebie i połączyć je (więc zwiększam długość słowa), ale tylko jako pary (tj. Raz Dołączam do jednego indeksu z drugim, nie chcę, aby coś innego się z nim łączyło).
Robię to za pomocą operacji aggregateByKey za pomocą specjalnego obiektu o nazwie Seed().
PARALELLISM = 16 # I have 4 cores with hyperthreading
def generateHsps(query_lookup_table_rdd, database_lookup_table_rdd):
global broadcastSequences
def mergeValueOp(seedlist, (query_index, seed_length)):
return seedlist.addSeed((query_index, seed_length))
def mergeSeedListsOp(seedlist1, seedlist2):
return seedlist1.mergeSeedListIntoSelf(seedlist2)
hits_rdd = (query_lookup_table_rdd.join(database_lookup_table_rdd)
.flatMap(lambda (word, (query_indices, db_indices)): [(query_index, db_indices) for query_index in query_indices], preservesPartitioning=True)
.flatMap(lambda (query_index, db_indices): [(db_index - query_index, (query_index, WORD_SIZE)) for db_index in db_indices], preservesPartitioning=True)
.aggregateByKey(SeedList(), mergeValueOp, mergeSeedListsOp, PARALLELISM)
.map(lambda (diagonal, seedlist): (diagonal, seedlist.mergedSeedList))
.flatMap(lambda (diagonal, seedlist): [(query_index, seed_length, diagonal) for query_index, seed_length in seedlist])
)
return hits_rdd
Seed():
class SeedList():
def __init__(self):
self.unmergedSeedList = []
self.mergedSeedList = []
#Try to find a more efficient way to do this
def addSeed(self, (query_index1, seed_length1)):
for i in range(0, len(self.unmergedSeedList)):
(query_index2, seed_length2) = self.unmergedSeedList[i]
#print "Checking ({0}, {1})".format(query_index2, seed_length2)
if min(abs(query_index2 + seed_length2 - query_index1), abs(query_index1 + seed_length1 - query_index2)) <= WINDOW_SIZE:
self.mergedSeedList.append((min(query_index1, query_index2), max(query_index1+seed_length1, query_index2+seed_length2)-min(query_index1, query_index2)))
self.unmergedSeedList.pop(i)
return self
self.unmergedSeedList.append((query_index1, seed_length1))
return self
def mergeSeedListIntoSelf(self, seedlist2):
print "merging seed"
for (query_index2, seed_length2) in seedlist2.unmergedSeedList:
wasmerged = False
for i in range(0, len(self.unmergedSeedList)):
(query_index1, seed_length1) = self.unmergedSeedList[i]
if min(abs(query_index2 + seed_length2 - query_index1), abs(query_index1 + seed_length1 - query_index2)) <= WINDOW_SIZE:
self.mergedSeedList.append((min(query_index1, query_index2), max(query_index1+seed_length1, query_index2+seed_length2)-min(query_index1, query_index2)))
self.unmergedSeedList.pop(i)
wasmerged = True
break
if not wasmerged:
self.unmergedSeedList.append((query_index2, seed_length2))
return self
To gdzie wydajność naprawdę zepsuje nawet dla sekwencji umiarkowanej długości.
Czy istnieje lepszy sposób na wykonanie tej agregacji? Moje przeczucie mówi "tak", ale nie mogę tego wymyślić.
Wiem, że jest to bardzo długotrwałe i techniczne pytanie, a ja naprawdę doceniam każdy wgląd, nawet jeśli nie ma łatwego rozwiązania.
Edycja: Oto jak robię tabel przeglądowych:
def createLookupTable(sequence_rdd, sequence_name, word_length):
global broadcastSequences
blank_list = []
def addItemToList(lst, val):
lst.append(val)
return lst
def mergeLists(lst1, lst2):
#print "Merging"
return lst1+lst2
return (sequence_rdd
.flatMap(lambda seq_len: range(0, seq_len - word_length + 1))
.repartition(PARALLELISM)
#.partitionBy(PARALLELISM)
.map(lambda index: (str(broadcastSequences.value[sequence_name][index:index + word_length]), index), preservesPartitioning=True)
.aggregateByKey(blank_list, addItemToList, mergeLists, PARALLELISM))
#.map(lambda (word, indices): (word, sorted(indices))))
i tutaj jest funkcja, która uruchamia całą operację:
def run(query_seq, database_sequence, translate_query=False):
global broadcastSequences
scoring_matrix = 'nucleotide' if isinstance(query_seq.alphabet, DNAAlphabet) else 'blosum62'
sequences = {'query': query_seq,
'database': database_sequence}
broadcastSequences = sc.broadcast(sequences)
query_rdd = sc.parallelize([len(query_seq)])
query_rdd.cache()
database_rdd = sc.parallelize([len(database_sequence)])
database_rdd.cache()
query_lookup_table_rdd = createLookupTable(query_rdd, 'query', WORD_SIZE)
query_lookup_table_rdd.cache()
database_lookup_table_rdd = createLookupTable(database_rdd, 'database', WORD_SIZE)
seeds_rdd = generateHsps(query_lookup_table_rdd, database_lookup_table_rdd)
return seeds_rdd
Edycja 2: Mam manipulowane co nieco i nieznacznie poprawiono wydajność, zastępując:
.flatMap(lambda (word, (query_indices, db_indices)): [(query_index, db_indices) for query_index in query_indices], preservesPartitioning=True)
.flatMap(lambda (query_index, db_indices): [(db_index - query_index, (query_index, WORD_SIZE)) for db_index in db_indices], preservesPartitioning=True)
w hits_rdd z:
.flatMap(lambda (word, (query_indices, db_indices)): itertools.product(query_indices, db_indices))
.map(lambda (query_index, db_index): (db_index - query_index, (query_index, WORD_SIZE)))
Przynajmniej teraz nie spalam pamięci z pośrednimi strukturami danych.
ma zmienną WINDOW_SIZE dla każdej listy SeedList? – Back2Basics
@ Back2Basics WINDOW_SIZE jest takie samo dla każdego SeedList. –
Hasło hits_rdd nie ma sensu. Czy możemy przyjrzeć się przykładom query_lookup_table_dd i database_lookup_table_dd, a także implementacjom flatMap i aggregateByKey? – Back2Basics