2014-12-10 38 views
7

Mam trywialny program iskier. Wyciąłem dane wejściowe do jednego pliku z jedną linią. Więc jestem pewien, że to nie jest tradycyjna presja pamięci.KryoException: Przepełnienie bufora z bardzo małym wkładem

Exception in thread "main" com.esotericsoftware.kryo.KryoException: Buffer overflow. Available: 32749568, required: 34359296 
    at com.esotericsoftware.kryo.io.Output.require(Output.java:138) 
    at com.esotericsoftware.kryo.io.Output.writeBytes(Output.java:220) 
    at com.esotericsoftware.kryo.io.Output.writeBytes(Output.java:206) 
    at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ByteArraySerializer.write(DefaultArraySerializers.java:29) 
    at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ByteArraySerializer.write(DefaultArraySerializers.java:18) 
    at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568) 
    at carbonite.serializer$write_map.invoke(serializer.clj:69) 

mogę ustawić spark.kryoserializer.buffer.mb, ale myślę, że odkładając tylko problem. Chciałbym to zrozumieć.

Nie sądzę, że w programie jest coś niestandardowego. Jeśli usuniemy pojedynczą linię (pozornie losowo), błąd zniknie.

Wygląda na to, że trafiam w jakiś ustalony limit. Ale fakt, że mój plik wejściowy jest bardzo mały i jedyne operacje, które wykonuję, są przewidywalne. maps i reduceByKey. Podejrzewam, że jest coś jeszcze.

Używam biblioteki Flambo Clojure 0.4.0 (ale nie sądzę, że to powoduje) i Spark Core 2.10.

Oto minimalny przykład roboczy. Przepraszam, że to trochę tajemnicze, ale usunąłem wszystko, co zewnętrzne.

(ns mytest.core 
    (:require [flambo.conf :as conf]) 
    (:require [flambo.api :as f])) 

(def sc (f/spark-context (-> (conf/spark-conf) 
      (conf/master "local") 
      (conf/app-name "test") 
      (conf/set "spark.driver.memory" "1g") 
      (conf/set "spark.executor.memory" "1g")))) 

(defn -main 

    [& args] 
    (let [logfile (f/text-file sc "file://tmp/one-line-file") 
     a (f/map logfile (f/fn [u] nil)) 
     b (f/map logfile (f/fn [u] nil)) 
     c (f/map logfile (f/fn [u] nil)) 
     d (f/map logfile (f/fn [u] nil)) 
     e (f/map logfile (f/fn [u] nil)) 
     g (f/map logfile (f/fn [u] nil)) 
     h (f/map logfile (f/fn [u] nil)) 
     i (f/map logfile (f/fn [u] nil)) 
     j (f/map logfile (f/fn [u] nil)) 
     k (f/map logfile (f/fn [u] nil)) 
     l (f/map logfile (f/fn [u] nil)) 
     m (f/map logfile (f/fn [u] nil)) 
     n (f/map logfile (f/fn [u] nil)) 
     o (f/map logfile (f/fn [u] nil)) 
     p (f/map logfile (f/fn [u] nil)) 
     q (f/map logfile (f/fn [u] nil)) 
     r (f/map logfile (f/fn [u] nil)) 
     s (f/map logfile (f/fn [u] nil)) 
     t (f/map logfile (f/fn [u] nil)) 
])) 

EDIT

Jeśli podzielić to na dwa kawałki i ponownie utworzyć leniwy strumień pliku, to działa:

(defn get-inputs [] 
    (f/text-file sc "file://tmp/one-line-file")) 

(defn -main 

    [& args] 
    (let [logfile (get-inputs) 
     a (f/map logfile (f/fn [u] nil)) 
     b (f/map logfile (f/fn [u] nil)) 
     c (f/map logfile (f/fn [u] nil)) 
     d (f/map logfile (f/fn [u] nil)) 
     e (f/map logfile (f/fn [u] nil)) 
     g (f/map logfile (f/fn [u] nil)) 
     h (f/map logfile (f/fn [u] nil)) 
     i (f/map logfile (f/fn [u] nil))]) 

    (let [logfile (get-inputs) 
     j (f/map logfile (f/fn [u] nil)) 
     k (f/map logfile (f/fn [u] nil)) 
     l (f/map logfile (f/fn [u] nil)) 
     m (f/map logfile (f/fn [u] nil)) 
     n (f/map logfile (f/fn [u] nil)) 
     o (f/map logfile (f/fn [u] nil)) 
     p (f/map logfile (f/fn [u] nil)) 
     q (f/map logfile (f/fn [u] nil)) 
     r (f/map logfile (f/fn [u] nil)) 
     s (f/map logfile (f/fn [u] nil)) 
     t (f/map logfile (f/fn [u] nil))])) 

W Javie byłoby to równoważne utworzeniu dwóch zakresy lokalne (np. dwie oddzielne metody). I get-inputs to tylko metoda, która zwraca nowo skonstruowany obiekt pliku tekstowego.

Pomyślałem, że metoda textFile utworzy leniwy strumień, który może być (ponownie) odczytywany wiele razy, więc te dwa przykłady nie powinny się znacznie różnić.

Odpowiedz

1

Dodaj to do swojej zapłonowej kontekstowego conf:

conf.set("spark.kryoserializer.buffer.mb","128") 
+1

Dzięki Zrobiłem to, ale to nie jest odpowiedź na moje pytanie. Dlaczego w buforze zabraknie miejsca w jednym pliku? – Joe

+0

Może dlatego, że plik jest zbyt duży. Innym powodem może być dołączenie do pamięci/zapisu na dysku. –

Powiązane problemy