
Exception: Randomness of hash of string should be disabled via PYTHONHASHSEED in pyspark?

I am trying to create a dictionary from a list in pyspark. I have the following list of lists:

rawPositions 

gives

[[1009794, 'LPF6 Comdty', 'BC22', 'Enterprise', 3.0, 3904.125, 390412.5], 
[1009794, 'LPF6 Comdty', 'BC22', 'Enterprise', 3.0, 3900.75, 390075.0], 
[1009794, 'LPF6 Comdty', 'BC22', 'Enterprise', 3.0, 3882.5625, 388256.25], 
[1009794, 'LPF6 Comdty', 'BC22', 'Enterprise', 3.0, 3926.25, 392625.0], 
[2766232, 
    'CDX IG CDSI S25 V1 5Y CBBT CORP', 
    'BC85', 
    'Enterprise', 
    30000000.0, 
    -16323.2439825, 
    30000000.0], 
[2766232, 
    'CDX IG CDSI S25 V1 5Y CBBT CORP', 
    'BC85', 
    'Enterprise', 
    30000000.0, 
    -16928.620101900004, 
    30000000.0], 
[1009804, 'LPM6 Comdty', 'BC29', 'Jet', 105.0, 129596.25, 12959625.0], 
[1009804, 'LPM6 Comdty', 'BC29', 'Jet', 128.0, 162112.0, 16211200.0], 
[1009804, 'LPM6 Comdty', 'BC29', 'Jet', 135.0, 167146.875, 16714687.5], 
[1009804, 'LPM6 Comdty', 'BC29', 'Jet', 109.0, 132884.625, 13288462.5]] 

Then, using my SparkContext variable sc, I parallelize the list:

i = sc.parallelize(rawPositions) 
#i.collect() 

Then I try to turn it into a dictionary by calling groupBy on element index 3 (the fourth field, 'Enterprise'/'Jet') of each inner list; a plain-Python sketch of the intended grouping follows the snippet.

j = i.groupBy(lambda x: x[3]) 
j.collect() 
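
For clarity, the plain-Python equivalent of the grouping I am after looks roughly like this (an illustrative sketch only, using the rawPositions list shown above):

from collections import defaultdict

groups = defaultdict(list)
for row in rawPositions:
    # key each row on its index-3 value, e.g. 'Enterprise' or 'Jet'
    groups[row[3]].append(row)

# groups now maps each index-3 value to the list of its rows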

Running j.collect() gives:

--------------------------------------------------------------------------- 
Py4JJavaError        Traceback (most recent call last) 
<ipython-input-143-6113a75f0a9e> in <module>() 
     2 #i.collect() 
     3 j = i.groupBy(lambda x: x[3]) 
----> 4 j.collect() 

/net/nas/uxhome/condor_ldrt-s/spark-1.6.1-bin-hadoop2.6/python/pyspark/rdd.py in collect(self) 
    769   """ 
    770   with SCCallSiteSync(self.context) as css: 
--> 771    port = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd()) 
    772   return list(_load_from_socket(port, self._jrdd_deserializer)) 
    773 

/net/nas/uxhome/condor_ldrt-s/spark-1.6.1-bin-hadoop2.6/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py in __call__(self, *args) 
    811   answer = self.gateway_client.send_command(command) 
    812   return_value = get_return_value(
--> 813    answer, self.gateway_client, self.target_id, self.name) 
    814 
    815   for temp_arg in temp_args: 

/net/nas/uxhome/condor_ldrt-s/spark-1.6.1-bin-hadoop2.6/python/pyspark/sql/utils.py in deco(*a, **kw) 
    43  def deco(*a, **kw): 
    44   try: 
---> 45    return f(*a, **kw) 
    46   except py4j.protocol.Py4JJavaError as e: 
    47    s = e.java_exception.toString() 

/net/nas/uxhome/condor_ldrt-s/spark-1.6.1-bin-hadoop2.6/python/lib/py4j-0.9-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name) 
    306     raise Py4JJavaError(
    307      "An error occurred while calling {0}{1}{2}.\n". 
--> 308      format(target_id, ".", name), value) 
    309    else: 
    310     raise Py4JError(

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe. 
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 14 in stage 50.0 failed 4 times, most recent failure: Lost task 14.3 in stage 50.0 (TID 7583, brllxhtce01.bluecrest.local): org.apache.spark.api.python.PythonException: Traceback (most recent call last): 
    File "/net/nas/uxhome/condor_ldrt-s/spark-1.6.1-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/worker.py", line 111, in main 
    process() 
    File "/net/nas/uxhome/condor_ldrt-s/spark-1.6.1-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/worker.py", line 106, in process 
    serializer.dump_stream(func(split_index, iterator), outfile) 
    File "/net/nas/uxhome/condor_ldrt-s/spark-1.6.1-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/serializers.py", line 133, in dump_stream 
    for obj in iterator: 
    File "/net/nas/uxhome/condor_ldrt-s/spark-1.6.1-bin-hadoop2.6/python/pyspark/rdd.py", line 1703, in add_shuffle_key 
    buckets[partitionFunc(k) % numPartitions].append((k, v)) 
    File "/net/nas/uxhome/condor_ldrt-s/spark-1.6.1-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/rdd.py", line 74, in portable_hash 
    raise Exception("Randomness of hash of string should be disabled via PYTHONHASHSEED") 
Exception: Randomness of hash of string should be disabled via PYTHONHASHSEED 

    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:166) 
    at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:207) 
    at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:125) 
    at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:70) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270) 
    at org.apache.spark.api.python.PairwiseRDD.compute(PythonRDD.scala:342) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270) 
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73) 
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) 
    at org.apache.spark.scheduler.Task.run(Task.scala:89) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
    at java.lang.Thread.run(Thread.java:745) 

Driver stacktrace: 
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418) 
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) 
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) 
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799) 
    at scala.Option.foreach(Option.scala:236) 
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1640) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588) 
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) 
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1832) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1845) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1858) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1929) 
    at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:927) 
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150) 
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111) 
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:316) 
    at org.apache.spark.rdd.RDD.collect(RDD.scala:926) 
    at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:405) 
    at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala) 
    at sun.reflect.GeneratedMethodAccessor31.invoke(Unknown Source) 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
    at java.lang.reflect.Method.invoke(Method.java:606) 
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231) 
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381) 
    at py4j.Gateway.invoke(Gateway.java:259) 
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133) 
    at py4j.commands.CallCommand.execute(CallCommand.java:79) 
    at py4j.GatewayConnection.run(GatewayConnection.java:209) 
    at java.lang.Thread.run(Thread.java:745) 
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last): 
    File "/net/nas/uxhome/condor_ldrt-s/spark-1.6.1-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/worker.py", line 111, in main 
    process() 
    File "/net/nas/uxhome/condor_ldrt-s/spark-1.6.1-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/worker.py", line 106, in process 
    serializer.dump_stream(func(split_index, iterator), outfile) 
    File "/net/nas/uxhome/condor_ldrt-s/spark-1.6.1-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/serializers.py", line 133, in dump_stream 
    for obj in iterator: 
    File "/net/nas/uxhome/condor_ldrt-s/spark-1.6.1-bin-hadoop2.6/python/pyspark/rdd.py", line 1703, in add_shuffle_key 
    buckets[partitionFunc(k) % numPartitions].append((k, v)) 
    File "/net/nas/uxhome/condor_ldrt-s/spark-1.6.1-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/rdd.py", line 74, in portable_hash 
    raise Exception("Randomness of hash of string should be disabled via PYTHONHASHSEED") 
Exception: Randomness of hash of string should be disabled via PYTHONHASHSEED 

    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:166) 
    at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:207) 
    at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:125) 
    at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:70) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270) 
    at org.apache.spark.api.python.PairwiseRDD.compute(PythonRDD.scala:342) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270) 
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73) 
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) 
    at org.apache.spark.scheduler.Task.run(Task.scala:89) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
    ... 1 more 

I have no idea what this error refers to... Any help would be greatly appreciated!

Answer


Hash randomization for str, bytes and datetime objects was introduced in Python 3.2.3 and is enabled by default since Python 3.3: the hash is salted with a random value to prevent certain kinds of denial-of-service attacks. This means that hash values are consistent within a single interpreter session but differ from session to session. PYTHONHASHSEED sets the RNG seed to provide a consistent value across sessions.

You can easily check this in your shell. If PYTHONHASHSEED is not set, you get random values:

unset PYTHONHASHSEED 
for i in `seq 1 3`; 
    do 
    python3 -c "print(hash('foo'))"; 
    done 

## -7298483006336914254 
## -6081529125171670673 
## -3642265530762908581 

but when it is set, you get the same value on every run:

export PYTHONHASHSEED=323 
for i in `seq 1 3`; 
    do 
    python3 -c "print(hash('foo'))"; 
    done 

## 8902216175227028661 
## 8902216175227028661 
## 8902216175227028661 

For groupBy and other operations that depend on the default hashing of keys for partitioning, you need the same value of PYTHONHASHSEED on all machines in the cluster to get consistent results.
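
One way to propagate the seed is through Spark's per-executor environment variables. A minimal sketch, assuming a standalone script (the app name, variable names and the seed value 0 are placeholders; the same value must also be set for the driver process, e.g. via export PYTHONHASHSEED=0 in the shell or in conf/spark-env.sh):

from pyspark import SparkConf, SparkContext

# Ask Spark to set PYTHONHASHSEED in every executor's environment
conf = (SparkConf()
        .setAppName("positions")
        .set("spark.executorEnv.PYTHONHASHSEED", "0"))
sc = SparkContext(conf=conf)

# With a consistent seed the original grouping succeeds, and the result
# can be turned into the dictionary the question asks for:
j = sc.parallelize(rawPositions).groupBy(lambda x: x[3])
positions_by_key = dict(j.mapValues(list).collect())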

See also:


Thank you. This fixed it! – ThatDataGuy


Wow, this should be in the "Getting started with Spark" tutorial. I guess it isn't because Spark runs Python 2.7 by default. Thanks, you saved me. – sudo
