
PySpark DataFrame UDF on Text Column

I am trying to do some NLP text cleanup of some Unicode columns in a PySpark DataFrame. I have tried this in Spark 1.3, 1.5 and 1.6 and cannot get it to work for the life of me. I have also tried using Python 2.7 and Python 3.4.

I have created an extremely simple udf, seen below, that should just return a string for each record in a new column. Other functions will manipulate the text and then return the changed text back in a new column.

import pyspark
from pyspark.sql import SQLContext
from pyspark.sql.types import *
from pyspark.sql.functions import udf

def dummy_function(data_str): 
    cleaned_str = 'dummyData' 
    return cleaned_str 

dummy_function_udf = udf(dummy_function, StringType()) 

Some sample data can be unzipped from here.

Here is the code I use to import the data and then apply the udf.

# Load a text file and convert each line to a Row. 
lines = sc.textFile("classified_tweets.txt") 
parts = lines.map(lambda l: l.split("\t")) 
training = parts.map(lambda p: (p[0], p[1])) 

# Create dataframe 
training_df = sqlContext.createDataFrame(training, ["tweet", "classification"]) 

training_df.show(5) 
+--------------------+--------------+ 
|    tweet|classification| 
+--------------------+--------------+ 
|rt @jiffyclub: wi...|  python| 
|rt @arnicas: ipyt...|  python| 
|rt @treycausey: i...|  python| 
|what's my best op...|  python| 
|rt @raymondh: #py...|  python| 
+--------------------+--------------+ 

# Apply UDF function 
df = training_df.withColumn("dummy", dummy_function_udf(training_df['tweet'])) 
df.show(5) 

When I run df.show(5) I get the following error. I understand that the problem most likely is not caused by show() itself, but the trace does not give me much to work with.

---------------------------------------------------------------------------Py4JJavaError        Traceback (most recent call last)<ipython-input-19-0b21c233c724> in <module>() 
     1 df = training_df.withColumn("dummy", dummy_function_udf(training_df['tweet'])) 
----> 2 df.show(5) 
/Users/dreyco676/spark-1.6.0-bin-hadoop2.6/python/pyspark/sql/dataframe.py in show(self, n, truncate) 
    255   +---+-----+ 
    256   """ 
--> 257   print(self._jdf.showString(n, truncate)) 
    258 
    259  def __repr__(self): 
/Users/dreyco676/spark-1.6.0-bin-hadoop2.6/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py in __call__(self, *args) 
    811   answer = self.gateway_client.send_command(command) 
    812   return_value = get_return_value(
--> 813    answer, self.gateway_client, self.target_id, self.name) 
    814 
    815   for temp_arg in temp_args: 
/Users/dreyco676/spark-1.6.0-bin-hadoop2.6/python/pyspark/sql/utils.py in deco(*a, **kw) 
    43  def deco(*a, **kw): 
    44   try: 
---> 45    return f(*a, **kw) 
    46   except py4j.protocol.Py4JJavaError as e: 
    47    s = e.java_exception.toString() 
/Users/dreyco676/spark-1.6.0-bin-hadoop2.6/python/lib/py4j-0.9-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name) 
    306     raise Py4JJavaError(
    307      "An error occurred while calling {0}{1}{2}.\n". 
--> 308      format(target_id, ".", name), value) 
    309    else: 
    310     raise Py4JError(
Py4JJavaError: An error occurred while calling o474.showString. 
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 10.0 failed 1 times, most recent failure: Lost task 0.0 in stage 10.0 (TID 10, localhost): org.apache.spark.api.python.PythonException: Traceback (most recent call last): 
    File "/Users/dreyco676/spark-1.6.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/worker.py", line 111, in main 
    process() 
    File "/Users/dreyco676/spark-1.6.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/worker.py", line 106, in process 
    serializer.dump_stream(func(split_index, iterator), outfile) 
    File "/Users/dreyco676/spark-1.6.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/serializers.py", line 263, in dump_stream 
    vs = list(itertools.islice(iterator, batch)) 
    File "<ipython-input-12-4bc30395aac5>", line 4, in <lambda> 
IndexError: list index out of range 

    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:166) 
    at org.apache.spark.api.python.PythonRunner$$anon$1.next(PythonRDD.scala:129) 
    at org.apache.spark.api.python.PythonRunner$$anon$1.next(PythonRDD.scala:125) 
    at org.apache.spark.InterruptibleIterator.next(InterruptibleIterator.scala:43) 
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371) 
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) 
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) 
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) 
    at scala.collection.Iterator$GroupedIterator.takeDestructively(Iterator.scala:913) 
    at scala.collection.Iterator$GroupedIterator.go(Iterator.scala:929) 
    at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:968) 
    at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:972) 
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) 
    at scala.collection.Iterator$class.foreach(Iterator.scala:727) 
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) 
    at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:452) 
    at org.apache.spark.api.python.PythonRunner$WriterThread$$anonfun$run$3.apply(PythonRDD.scala:280) 
    at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1741) 
    at org.apache.spark.api.python.PythonRunner$WriterThread.run(PythonRDD.scala:239) 

Driver stacktrace: 
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418) 
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) 
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) 
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799) 
    at scala.Option.foreach(Option.scala:236) 
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1640) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588) 
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) 
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1832) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1845) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1858) 
    at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:212) 
    at org.apache.spark.sql.execution.Limit.executeCollect(basicOperators.scala:165) 
    at org.apache.spark.sql.execution.SparkPlan.executeCollectPublic(SparkPlan.scala:174) 
    at org.apache.spark.sql.DataFrame$$anonfun$org$apache$spark$sql$DataFrame$$execute$1$1.apply(DataFrame.scala:1538) 
    at org.apache.spark.sql.DataFrame$$anonfun$org$apache$spark$sql$DataFrame$$execute$1$1.apply(DataFrame.scala:1538) 
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:56) 
    at org.apache.spark.sql.DataFrame.withNewExecutionId(DataFrame.scala:2125) 
    at org.apache.spark.sql.DataFrame.org$apache$spark$sql$DataFrame$$execute$1(DataFrame.scala:1537) 
    at org.apache.spark.sql.DataFrame.org$apache$spark$sql$DataFrame$$collect(DataFrame.scala:1544) 
    at org.apache.spark.sql.DataFrame$$anonfun$head$1.apply(DataFrame.scala:1414) 
    at org.apache.spark.sql.DataFrame$$anonfun$head$1.apply(DataFrame.scala:1413) 
    at org.apache.spark.sql.DataFrame.withCallback(DataFrame.scala:2138) 
    at org.apache.spark.sql.DataFrame.head(DataFrame.scala:1413) 
    at org.apache.spark.sql.DataFrame.take(DataFrame.scala:1495) 
    at org.apache.spark.sql.DataFrame.showString(DataFrame.scala:171) 
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
    at java.lang.reflect.Method.invoke(Method.java:497) 
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231) 
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381) 
    at py4j.Gateway.invoke(Gateway.java:259) 
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133) 
    at py4j.commands.CallCommand.execute(CallCommand.java:79) 
    at py4j.GatewayConnection.run(GatewayConnection.java:209) 
    at java.lang.Thread.run(Thread.java:745) 
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last): 
    File "/Users/dreyco676/spark-1.6.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/worker.py", line 111, in main 
    process() 
    File "/Users/dreyco676/spark-1.6.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/worker.py", line 106, in process 
    serializer.dump_stream(func(split_index, iterator), outfile) 
    File "/Users/dreyco676/spark-1.6.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/serializers.py", line 263, in dump_stream 
    vs = list(itertools.islice(iterator, batch)) 
    File "<ipython-input-12-4bc30395aac5>", line 4, in <lambda> 
IndexError: list index out of range 

    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:166) 
    at org.apache.spark.api.python.PythonRunner$$anon$1.next(PythonRDD.scala:129) 
    at org.apache.spark.api.python.PythonRunner$$anon$1.next(PythonRDD.scala:125) 
    at org.apache.spark.InterruptibleIterator.next(InterruptibleIterator.scala:43) 
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371) 
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) 
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) 
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) 
    at scala.collection.Iterator$GroupedIterator.takeDestructively(Iterator.scala:913) 
    at scala.collection.Iterator$GroupedIterator.go(Iterator.scala:929) 
    at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:968) 
    at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:972) 
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) 
    at scala.collection.Iterator$class.foreach(Iterator.scala:727) 
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) 
    at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:452) 
    at org.apache.spark.api.python.PythonRunner$WriterThread$$anonfun$run$3.apply(PythonRDD.scala:280) 
    at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1741) 
    at org.apache.spark.api.python.PythonRunner$WriterThread.run(PythonRDD.scala:239) 

The actual function I am trying to use is:

from nltk import pos_tag  # part-of-speech tagger from NLTK

def tag_and_remove(data_str):
    cleaned_str = ' '
    # noun tags
    nn_tags = ['NN', 'NNP', 'NNPS', 'NNS']
    # adjectives
    jj_tags = ['JJ', 'JJR', 'JJS']
    # verbs
    vb_tags = ['VB', 'VBD', 'VBG', 'VBN', 'VBP', 'VBZ']
    nltk_tags = nn_tags + jj_tags + vb_tags

    # break string into 'words' 
    text = data_str.split() 

    # tag the text and keep only those with the right tags 
    tagged_text = pos_tag(text) 
    for tagged_word in tagged_text:
        if tagged_word[1] in nltk_tags:
            cleaned_str += tagged_word[0] + ' '

    return cleaned_str 


tag_and_remove_udf = udf(tag_and_remove, StringType()) 
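
Note that tag_and_remove relies on NLTK's pos_tag, so nltk and its tagger model data (e.g. averaged_perceptron_tagger in recent NLTK releases) have to be available on every Spark worker, not just the driver. A minimal sketch of applying it, assuming the training_df built above; the column name "cleaned" is an arbitrary choice for illustration:

# apply the tagging udf to the tweet column and inspect a few rows
df = training_df.withColumn("cleaned", tag_and_remove_udf(training_df['tweet']))
df.show(5)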

Are you sure l.split('\t') returns more than one item? The index error most likely comes from training = parts.map(...). What does your data look like? Are you sure there are tabs everywhere? – AChampion


Yes, I can confirm the data has two columns. Before writing out the flat file I stripped the data of all whitespace other than spaces. I will add a small sample at the top. – dreyco676


You are not splitting on whitespace, only on tabs; l.split() would split on any whitespace. – AChampion
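
To illustrate the difference on a made-up line (not taken from the dataset):

line = "rt @jiffyclub: wicked\tpython"
line.split("\t")   # ['rt @jiffyclub: wicked', 'python']  - splits on tabs only
line.split()       # ['rt', '@jiffyclub:', 'wicked', 'python']  - splits on any whitespace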

Answers


Your dataset is not clean. 985 lines split('\t') into only a single value:

>>> from operator import add 
>>> lines = sc.textFile("classified_tweets.txt") 
>>> parts = lines.map(lambda l: l.split("\t")) 
>>> parts.map(lambda l: (len(l), 1)).reduceByKey(add).collect() 
[(2, 149195), (1, 985)] 
>>> parts.filter(lambda l: len(l) == 1).take(5) 
[['"show me the money!” at what point do you start trying to monetize your #startup? tweet us with #startuplife.'], 
['a good pitch can mean money in the bank for your #startup. see how body language plays a key role: (via: ajalumnify)'], 
['100+ apps in five years? @2359media did it using microsoft #azure: #azureapps'], 
['does buying better coffee make you a better leader? little things can make a big difference: (via: @jmbrandonbb)'], 
['[email protected] graduates pitched\xa0#homeautomation #startups to #vcs! check out how they celebrated: ']] 

so change the code to:

>>> training = parts.filter(lambda l: len(l) == 2).map(lambda p: (p[0], p[1].strip())) 
>>> training_df = sqlContext.createDataFrame(training, ["tweet", "classification"]) 
>>> df = training_df.withColumn("dummy", dummy_function_udf(training_df['tweet'])) 
>>> df.show(5) 
+--------------------+--------------+---------+ 
|    tweet|classification| dummy| 
+--------------------+--------------+---------+ 
|rt @jiffyclub: wi...|  python|dummyData| 
|rt @arnicas: ipyt...|  python|dummyData| 
|rt @treycausey: i...|  python|dummyData| 
|what's my best op...|  python|dummyData| 
|rt @raymondh: #py...|  python|dummyData| 
+--------------------+--------------+---------+ 
only showing top 5 rows 
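
If you would rather keep the malformed lines than drop them, the same map can substitute a placeholder label instead; this is just a sketch, and the 'unknown' label is an arbitrary choice:

# keep every line, falling back to a placeholder when the tab split yields fewer than two fields
training = parts.map(lambda p: (p[0], p[1].strip() if len(p) > 1 else 'unknown'))
training_df = sqlContext.createDataFrame(training, ["tweet", "classification"])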

Thank you. I learned that show() does not necessarily trigger a full parse of the data when only the first N records are needed. – dreyco676
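
One quick way to force a full pass over the data and expose such malformed lines before building the DataFrame (a sketch, not from the original thread):

# count the lines whose tab split does not yield exactly two fields; a non-zero result flags dirty input
parts.filter(lambda l: len(l) != 2).count()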


I think you are misdefining the problem, or perhaps simplifying the lambda for the purposes of this question, and that is hiding the real problem.

Your stack trace reads

File "<ipython-input-12-4bc30395aac5>", line 4, in <lambda> 
IndexError: list index out of range 

When I run this code it works correctly:

import pyspark
from pyspark.sql import SQLContext
from pyspark.sql.types import *
from pyspark.sql.functions import udf

training_df = sqlContext.sql("select 'foo' as tweet, 'bar' as classification") 

def dummy_function(data_str): 
    cleaned_str = 'dummyData' 
    return cleaned_str 

dummy_function_udf = udf(dummy_function, StringType()) 
df = training_df.withColumn("dummy", dummy_function_udf(training_df['tweet'])) 
df.show() 

+-----+--------------+---------+ 
|tweet|classification| dummy| 
+-----+--------------+---------+ 
| foo|   bar|dummyData| 
+-----+--------------+---------+ 

Are you sure there is not some other error in your dummy_function_udf? What is the "real" version you are using, apart from this sample one?


Thank you very much for the answer. It looks like text data is always dirty and will break your parsers. I expected any parsing errors to surface in training_df.show(5), but it seems it only parses the first N records when run without any other transformations. – dreyco676


Thanks for the answer. I had a similar problem. May I follow up and ask what "udf" means? I copied the code into the shell only to get the following error: Traceback (most recent call last): File "", line 1, in NameError: name 'udf' is not defined – yuqli


User-defined function. –
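
For reference, the NameError mentioned in the comment above goes away once udf is imported from pyspark.sql.functions, exactly as in the question's own code:

from pyspark.sql.functions import udf   # udf lives here; without this import the name is undefined
from pyspark.sql.types import StringType

dummy_function_udf = udf(dummy_function, StringType())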
