2014-11-19 18 views
10

sc.textFile (ścieżka) umożliwia odczytanie pliku HDFS, ale nie przyjmuje parametrów (np. Pomiń liczbę wierszy, has_headers, ...).Filtr RDD oparty na numerze wiersza

w "Spark" Nauka O'Reilly e-książki, jest zalecane, aby użyć następujących funkcji odczytać CSV (Przykład 5-12. Python przykład CSV obciążenia)

import csv 
import StringIO 

def loadRecord(line): 
"""Parse a CSV line""" 
input = StringIO.StringIO(line) 
reader = csv.DictReader(input, fieldnames=["name", "favouriteAnimal"]) 
return reader.next() 
input = sc.textFile(inputFile).map(loadRecord) 

Moje pytanie dotyczy jak być selektywny o wierszach „pobranych”:

  1. Jak uniknąć ładowania pierwszego wiersza (nagłówki)
  2. Jak usunąć konkretny wiersz (na przykład, wiersz 5)

Widzę tu kilka przyzwoitych rozwiązań: select range of elements, ale chciałbym zobaczyć, czy jest coś prostszego.

Thx!

Odpowiedz

16

Nie martw się ładowaniem wierszy/linii, których nie potrzebujesz. Gdy to zrobisz:

input = sc.textFile(inputFile) 

Nie ładujesz pliku. Właśnie dostajesz obiekt, który pozwoli ci operować na pliku. Aby być efektywnym, lepiej myśleć w kategoriach uzyskania tylko tego, czego chcesz. Na przykład:

header = input.take(1)[0] 
rows = input.filter(lambda line: line != header) 

Należy zauważyć, że tutaj nie używam indeksu do odniesienia do linii, którą chcę upuścić, ale raczej do jej wartości. Ma to skutek uboczny, że inne linie o tej wartości będą również ignorowane, ale bardziej w duchu Sparka, ponieważ Spark będzie rozpowszechniał twój plik tekstowy w różnych częściach w węzłach, a pojęcie numerów linii ginie w każdej partycji. Jest to również powód, dla którego nie jest to łatwe w Sparku (Hadoop), ponieważ każda partycja powinna być uważana za niezależną, a globalny numer linii przerwałby to założenie.

Jeśli naprawdę potrzebujesz pracować z numerami linii, polecam dodać je do pliku spoza Spark (patrz here), a następnie po prostu filtrować według tej kolumny wewnątrz Sparka.

Edytuj: Dodano rozwiązanie zipWithIndex zgodnie z sugestią @Daniel Darabos.

sc.textFile('test.txt')\ 
    .zipWithIndex()\   # [(u'First', 0), (u'Second', 1), ... 
    .filter(lambda x: x[1]!=5)\ # select columns 
    .map(lambda x: x[0])\  # [u'First', u'Second' 
    .collect() 
+0

Dzięki Elyase. Z moją edycją twój kod robi lewę dla nagłówków! –

+1

Możesz użyć 'RDD.zipWithIndex()', aby dodać numery wierszy, a następnie możesz również filtrować według tego. (Zauważ, że 'zipWithIndex' nie jest tani.) –