2013-10-05 20 views
7

Napisałem prosty przepływ MapReduce, aby odczytać w wierszach z pliku CSV z pliku w Google Cloud Storage, a następnie utworzyć Entity. Jednakże, wydaje mi się, że nie można go uruchomić na więcej niż jednym fragmencie.Jak zmniejszyć mapę AppEngine, aby zmniejszyć skalę?

Ten kod korzysta z mapreduce.control.start_map i wygląda mniej więcej tak.

class LoadEntitiesPipeline(webapp2.RequestHandler): 
     id = control.start_map(map_name, 
          handler_spec="backend.line_processor", 
          reader_spec="mapreduce.input_readers.FileInputReader", 
          queue_name=get_queue_name("q-1"), 
          shard_count=shard_count, 
          mapper_parameters={ 
           'shard_count': shard_count, 
           'batch_size': 50, 
           'processing_rate': 1000000, 
           'files': [gsfile], 
           'format': 'lines'}) 

Mam shard_count w obu miejscach, ponieważ nie jestem pewien, jakie metody faktycznie tego potrzebują. Ustawienie shard_count w dowolnym miejscu od 8 do 32, nic nie zmienia, ponieważ strona stanu zawsze mówi o 1/1 odłamkach. Aby oddzielić rzeczy, zrobiłem wszystko uruchomione w kolejce backendu z dużą liczbą instancji. Próbowałem dostosować parametry kolejki per this wiki. W końcu wydaje się, że działa się tylko seryjnie.

Wszelkie pomysły? Dzięki!

Update (Wciąż bez powodzenia):

W próbuje izolować rzeczy, próbowałem wykonywanie połączenia przy użyciu połączenia bezpośrednie do rurociągu tak:

class ImportHandler(webapp2.RequestHandler): 

    def get(self, gsfile): 
     pipeline = LoadEntitiesPipeline2(gsfile) 
     pipeline.start(queue_name=get_queue_name("q-1")) 

     self.redirect(pipeline.base_path + "/status?root=" + pipeline.pipeline_id) 


class LoadEntitiesPipeline2(base_handler.PipelineBase): 

    def run(self, gsfile): 
     yield mapreduce_pipeline.MapperPipeline(
      'loadentities2_' + gsfile, 
      'backend.line_processor', 
      'mapreduce.input_readers.FileInputReader', 
      params={'files': [gsfile], 'format': 'lines'}, 
      shards=32 
     ) 

Z tym nowym kodem, to nadal działa tylko na jednym fragmencie. Zaczynam się zastanawiać, czy mapreduce.input_readers.FileInputReader ma możliwość równoległego wprowadzania danych po linii.

Odpowiedz

0

To mi wygląda FileInputReader powinien być zdolny do sharding oparty na szybkim czytaniu: https://code.google.com/p/appengine-mapreduce/source/browse/trunk/python/src/mapreduce/input_readers.py

Wygląda formacie „”: „linii” należy rozdzielić przy użyciu:. Self.get_current_file() readline()

Czy poprawnie interpretuje linie, gdy działa seryjnie? Może przerwy w linii są niewłaściwym kodowaniem lub czymś.

+0

Tak, nie ma nic dziwnego w moim kodowaniu newline i jest w stanie przetworzyć każdą linię dobrze, po prostu nie ma podziału na szczegółowość linii. W rzeczywistości, kiedy dzielę naprawdę duży plik na mniejsze pliki (na przykład 5000 linii). Dostaję wezwanie do mapreduce do odstrojenia, ale wygląda na to, że jest ono podzielone według nazw plików, a nie na drobniejszą ziarnistość. –

0

Z doświadczenia FileInputReader zrobi maksymalnie jeden fragment na plik. Rozwiązanie: Podziel duże pliki. Używam pliku split_file w https://github.com/johnwlockwood/karl_data do odfiltrowywania plików przed ich przesłaniem do Cloud Storage. Jeśli duże pliki już tam są, możesz użyć instancji Compute Engine, aby je usunąć i wykonać odłamkę, ponieważ szybkość transferu będzie najszybsza. FYI: karld jest w cheeseshop więc można pip install karld

5

Wygląda FileInputReader can odłamek tylko za pośrednictwem plików. Parametry format zmieniają jedynie sposób wywołania funkcji mapper. Jeśli przekażesz więcej niż jeden plik do programu odwzorowującego, zacznie on działać na więcej niż jednym fragmencie. W przeciwnym razie do przetworzenia danych użyje się tylko jednego fragmentu.

EDIT # 1:

Po kopać głębiej w bibliotece MapReduce. MapReduce zdecyduje, czy podzielić plik na części na podstawie zwrotu metody can_split dla każdego zdefiniowanego przez siebie typu pliku. Obecnie jedynym formatem, który implementuje metodę split jest ZipFormat. Tak więc, jeśli twój format pliku nie jest zip, nie podzieli pliku na więcej niż jeden fragment.

@classmethod 
    def can_split(cls): 
    """Indicates whether this format support splitting within a file boundary. 

    Returns: 
     True if a FileFormat allows its inputs to be splitted into 
    different shards. 
    """ 

https://code.google.com/p/appengine-mapreduce/source/browse/trunk/python/src/mapreduce/file_formats.py

Ale wygląda na to, że jest możliwe, aby napisać swój własny format pliku metodę podziału. Możesz najpierw spróbować zhakować i dodać metodę split na stronie _TextFormat i sprawdzić, czy działa więcej niż jeden odłamek.

@classmethod 
def split(cls, desired_size, start_index, opened_file, cache): 
    pass 

EDIT # 2:

Łatwym obejście będzie lewy bieg FileInputReader seryjnie ale przenieść zadanie czasowo cosuming równolegle reduce scenę.

def line_processor(line): 
    # serial 
    yield (random.randrange(1000), line) 

def reducer(key, values): 
    # parallel 
    entities = [] 
    for v in values: 
     entities.append(CREATE_ENTITY_FROM_VALUE(v)) 
    db.put(entities) 

EDIT # 3:

Jeśli próbować modyfikować FileFormat, to jest przykład (nie zostały jeszcze testy)

from file_formats import _TextFormat, FORMATS 


class _LinesSplitFormat(_TextFormat): 
    """Read file line by line.""" 

    NAME = 'split_lines' 

    def get_next(self): 
    """Inherited.""" 
    index = self.get_index() 
    cache = self.get_cache() 
    offset = sum(cache['infolist'][:index]) 

    self.get_current_file.seek(offset) 
    result = self.get_current_file().readline() 
    if not result: 
     raise EOFError() 
    if 'encoding' in self._kwargs: 
     result = result.encode(self._kwargs['encoding']) 
    return result 

    @classmethod 
    def can_split(cls): 
    """Inherited.""" 
    return True 

    @classmethod 
    def split(cls, desired_size, start_index, opened_file, cache): 
    """Inherited.""" 
    if 'infolist' in cache: 
     infolist = cache['infolist'] 
    else: 
     infolist = [] 
     for i in opened_file: 
     infolist.append(len(i)) 
     cache['infolist'] = infolist 

    index = start_index 
    while desired_size > 0 and index < len(infolist): 
     desired_size -= infolist[index] 
     index += 1 
    return desired_size, index 


FORMATS['split_lines'] = _LinesSplitFormat 

Następnie nowy format pliku może być wywołana poprzez zmień parametr mapper_parameters z lines na split_line.

class LoadEntitiesPipeline(webapp2.RequestHandler): 
    id = control.start_map(map_name, 
         handler_spec="backend.line_processor", 
         reader_spec="mapreduce.input_readers.FileInputReader", 
         queue_name=get_queue_name("q-1"), 
         shard_count=shard_count, 
         mapper_parameters={ 
          'shard_count': shard_count, 
          'batch_size': 50, 
          'processing_rate': 1000000, 
          'files': [gsfile], 
          'format': 'split_lines'}) 
+0

Dzięki za skomplikowaną odpowiedź i sugerowane obejścia. – Alice