2015-05-25 14 views
6

Mam niestandardowe źródło danych i chcę załadować dane do mojego klastra Spark, aby wykonać pewne obliczenia. W tym celu widzę, że może potrzebować implementacji nowego RDD dla mojego źródła danych.Implementacja niestandardowego Spark RDD w Javie

Jestem kompletnym Scala noobem i mam nadzieję, że mogę zaimplementować RDD w samej Javie. Rozejrzałem się po Internecie i nie mogłem znaleźć żadnych zasobów. Jakieś wskazówki?

Moje dane są w S3 i są indeksowane w Dynamo. Na przykład, jeśli chcę załadować dane z danego przedziału czasowego, najpierw będę musiał zapytać Dynamo o klucze plików S3 dla odpowiedniego zakresu czasu, a następnie załadować je do Sparka. Pliki nie zawsze mają ten sam prefiks ścieżki S3, więc sc.testFile("s3://directory_path/") nie będzie działać.

Szukam wskazówek, jak wdrożyć coś analogicznego do HadoopRDD lub JdbcRDD, ale w Javie. Coś podobnego do tego, co tutaj zrobili: DynamoDBRDD. Ten odczytuje dane z Dynamo, mój niestandardowy RDD będzie sprawdzał klucze DynamoDB dla kluczy S3, a następnie ładuje je z S3.

+1

obiekt 'RDD' jest dość giętki pojemnik. Jak myślisz, dlaczego musiałbyś go ponownie wdrożyć? Jaki jest format Twoich danych? – ohruunuruus

+0

Moje dane są w S3 i są indeksowane w Dynamo. Na przykład, jeśli chcę załadować dane z danego przedziału czasowego, najpierw będę musiał zapytać Dynamo o klucze plików S3 dla odpowiedniego zakresu czasu, a następnie załadować je do Sparka. Pliki nie zawsze muszą znajdować się w tym samym przedrostku ścieżki S3, więc '' sc.testFile ("s3: // ścieżka_katalogu /") '' 'nie będzie działać. Szukam wskazówek, jak zaimplementować coś analogicznego do HadoopRDD lub JdbcRDD, ale w Javie. –

+0

Zgodnie z tym: http://apache-spark-user-list.1001560.n3.nabble.com/is-there-any-easier-way-to-define-a-ustom-DDD-in-Java-td6917 .html nie było możliwe rok temu. jednak chciałbym wiedzieć, czy coś się zmieniło. – tsiki

Odpowiedz

1

Jedną z opcji jest czytanie specyfikacji Hadoop, ale jeśli dane są uporządkowane, Spark SQL ma nową Data Sources API, niektóre implementacje są publikowane na Spark Packages, w tym avro, redshift i csv.

8

Można rozszerzyć RDD w Javie i zaimplementować metody getPartitions i Compute.

Java może rozszerzać klasy Scala z pewnymi ograniczeniami.

przykład:

package com.openmarket.danyal; 
// Other imports left out 
import org.apache.spark.Dependency; 
import org.apache.spark.Partition; 
import org.apache.spark.SparkConf; 
import org.apache.spark.SparkContext; 
import org.apache.spark.TaskContext; 
import org.apache.spark.api.java.JavaSparkContext; 
import org.apache.spark.rdd.RDD; 

import scala.collection.AbstractIterator; 
import scala.collection.Iterator; 
import scala.collection.mutable.ArrayBuffer; 
import scala.reflect.ClassManifestFactory$; 
import scala.reflect.ClassTag; 

public class AlphaTest { 
    private static final ClassTag<String> STRING_TAG = ClassManifestFactory$.MODULE$.fromClass(String.class); 

    public static void main(final String[] args) { 
     SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("Learn ABCs"); 
     try(JavaSparkContext sc = new JavaSparkContext(conf)) { 
      System.out.println(new AlphabetRDD(sc.sc()).toJavaRDD().collect()); 
     } 
    } 

    public static class AlphabetRDD extends RDD<String> { 
     private static final long serialVersionUID = 1L; 

     public AlphabetRDD(SparkContext sc) { 
      super(sc, new ArrayBuffer<Dependency<?>>(), STRING_TAG); 
     } 

     @Override 
     public Iterator<String> compute(Partition arg0, TaskContext arg1) { 
      AlphabetRangePartition p = (AlphabetRangePartition)arg0; 
      return new CharacterIterator(p.from, p.to); 
     } 

     @Override 
     public Partition[] getPartitions() { 
      return new Partition[] {new AlphabetRangePartition(1, 'A', 'M'), new AlphabetRangePartition(2, 'P', 'Z')}; 
     } 

    } 

    /** 
    * A partition representing letters of the Alphabet between a range 
    */ 
    public static class AlphabetRangePartition implements Partition { 
     private static final long serialVersionUID = 1L; 
     private int index; 
     private char from; 
     private char to; 

     public AlphabetRangePartition(int index, char c, char d) { 
      this.index = index; 
      this.from = c; 
      this.to = d; 
     } 

     @Override 
     public int index() { 
      return index; 
     } 

     @Override 
     public boolean equals(Object obj) { 
      if(!(obj instanceof AlphabetRangePartition)) { 
       return false; 
      } 
      return ((AlphabetRangePartition)obj).index != index; 
     } 

     @Override 
     public int hashCode() { 
      return index(); 
     } 
    } 

    /** 
    * Iterators over all characters between two characters 
    */ 
    public static class CharacterIterator extends AbstractIterator<String> { 
     private char next; 
     private char last; 

     public CharacterIterator(char from, char to) { 
      next = from; 
      this.last = to; 
     } 

     @Override 
     public boolean hasNext() { 
      return next <= last; 
     } 

     @Override 
     public String next() { 
      // Post increments next after returning it 
      return Character.toString(next++); 
     } 
    } 
} 
+0

Świetne rozwiązanie. Jeden połów: Uruchom indeks partycji z 0, w przeciwnym razie metoda 'cartesian()' na tym niestandardowym RDD spowoduje wyjątek ArrayOutOfBoundException. 'return new Partition [] {new AlphabetRange Partition (0, 'A', 'M'), nowa AlphabetRange Partition (1, 'P', 'Z')};' –

Powiązane problemy