2015-07-18 17 views
7

Mam ogólnodostępny zasób Amazon S3 (plik tekstowy) i chcę uzyskać do niego dostęp od iskry. Oznacza to, że - nie mam żadnych poświadczeń Amazon - to działa dobrze, jeśli chcę tylko pobrać go:Dostęp do publicznego dostępnego pliku Amazon S3 z Apache Spark

val bucket = "<my-bucket>" 
val key = "<my-key>" 

val client = new AmazonS3Client 
val o = client.getObject(bucket, key) 
val content = o.getObjectContent // <= can be read and used as input stream 

Jednak przy próbie dostępu do tego samego zasobu z kontekstu zapłonowej

val conf = new SparkConf().setAppName("app").setMaster("local") 
val sc = new SparkContext(conf) 
val f = sc.textFile(s"s3a://$bucket/$key") 
println(f.count()) 

I pojawia się następujący błąd z stacktrace:

Exception in thread "main" com.amazonaws.AmazonClientException: Unable to load AWS credentials from any provider in the chain 
    at com.amazonaws.auth.AWSCredentialsProviderChain.getCredentials(AWSCredentialsProviderChain.java:117) 
    at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3521) 
    at com.amazonaws.services.s3.AmazonS3Client.headBucket(AmazonS3Client.java:1031) 
    at com.amazonaws.services.s3.AmazonS3Client.doesBucketExist(AmazonS3Client.java:994) 
    at org.apache.hadoop.fs.s3a.S3AFileSystem.initialize(S3AFileSystem.java:297) 
    at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2653) 
    at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:92) 
    at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2687) 
    at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2669) 
    at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:371) 
    at org.apache.hadoop.fs.Path.getFileSystem(Path.java:295) 
    at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:221) 
    at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:270) 
    at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:207) 
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219) 
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217) 
    at scala.Option.getOrElse(Option.scala:121) 
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:217) 
    at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32) 
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219) 
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217) 
    at scala.Option.getOrElse(Option.scala:121) 
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:217) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1781) 
    at org.apache.spark.rdd.RDD.count(RDD.scala:1099) 
    at com.example.Main$.main(Main.scala:14) 
    at com.example.Main.main(Main.scala) 
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
    at java.lang.reflect.Method.invoke(Method.java:497) 
    at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140) 

nie chce podać żadnych poświadczeń AWS - chcę tylko, aby uzyskać dostęp do zasobu anonimowo (na razie) - jak to osiągnąć? Prawdopodobnie potrzebuję użyć czegoś takiego jak AnonymousAWSCredentialsProvider - ale jak umieścić go wewnątrz iskry?

P.S. Mój build.sbt właśnie w przypadku:

scalaVersion := "2.11.7" 

libraryDependencies ++= Seq(
    "org.apache.spark" %% "spark-core" % "1.4.1", 
    "org.apache.hadoop" % "hadoop-aws" % "2.7.1" 
) 

AKTUALIZACJA: Po przeprowadzeniu niektórych dochodzeń - widzę powód, dla którego nie działa.

Przede wszystkim S3AFileSystem tworzy klienta AWS z następującym porządku mandatów:

AWSCredentialsProviderChain credentials = new AWSCredentialsProviderChain(
    new BasicAWSCredentialsProvider(accessKey, secretKey), 
    new InstanceProfileCredentialsProvider(), 
    new AnonymousAWSCredentialsProvider() 
); 

„klawiszowy” i wartości „tajny klucz” zostały zaczerpnięte z instancji iskra conf (klawisze muszą być „fs.s3a. access.key "i" fs.s3a.secret.key "lub org.apache.hadoop.fs.s3a.Constants.ACCESS_KEY i org.apache.hadoop.fs.s3a.Constants.SECRET_KEY stałych, co jest wygodniejsze).

Po drugie - prawdopodobnie widzisz, że AnonymousAWSCredentialsProvider to trzecia opcja (ostatni priorytet) - co może być z tym nie tak? Zobacz implementację funkcji AnonymousAWSCredentials:

public class AnonymousAWSCredentials implements AWSCredentials { 

    public String getAWSAccessKeyId() { 
     return null; 
    } 

    public String getAWSSecretKey() { 
     return null; 
    } 
} 

Po prostu zwraca wartość null dla klucza dostępu i tajnego klucza. Brzmi rozsądnie. Ale zajrzeć AWSCredentialsProviderChain:

AWSCredentials credentials = provider.getCredentials(); 

if (credentials.getAWSAccessKeyId() != null && 
    credentials.getAWSSecretKey() != null) { 
    log.debug("Loading credentials from " + provider.toString()); 

    lastUsedProvider = provider; 
    return credentials; 
} 

To nie wybiera dostawcę w przypadku oba klucze są nieważne - to znaczy anonimowe poświadczenia nie może pracować. Wygląda jak błąd wewnątrz aws-java-sdk-1.7.4. Próbowałem użyć najnowszej wersji - ale jest to niezgodne z hadoop-aws-2.7.1.

Jakieś inne pomysły?

+0

miałeś żadnych sukcesów, może z nowszych wersjach? –

+0

Nie, nie próbowałem tego od jakiegoś czasu - nawet o tym zapomniałem, nie używaj amazonka s3 do niczego. – pkozlov

Odpowiedz

3

Osobiście nigdy nie uzyskiwałem dostępu do publicznych danych ze Sparka. Możesz spróbować użyć atrapowych poświadczeń lub utworzyć je tylko dla tego użycia. Ustaw je bezpośrednio na obiekcie SparkConf.

val sparkConf: SparkConf = ??? 
val accessKeyId: String = ??? 
val secretAccessKey: String = ??? 
sparkConf.set("spark.hadoop.fs.s3.awsAccessKeyId", accessKeyId) 
sparkConf.set("spark.hadoop.fs.s3n.awsAccessKeyId", accessKeyId) 
sparkConf.set("spark.hadoop.fs.s3.awsSecretAccessKey", secretAccessKey) 
sparkConf.set("spark.hadoop.fs.s3n.awsSecretAccessKey", secretAccessKey) 

Jako alternatywa, przeczytaj dokumentację DefaultAWSCredentialsProviderChain aby zobaczyć, gdzie poświadczenia są szukali. Lista (kolejność jest istotna) wynosi:

  • Zmienne środowiskowe - AWS_ACCESS_KEY_ID i AWS_SECRET_KEY
  • Java Właściwości systemu - aws.accessKeyId i aws.secretKey
  • złożyć
  • danych logowania profile w domyślnej lokalizacji (~/.AWS/poświadczenia) współdzielone przez wszystkie SDK AWS i AWS CLI
  • poświadczeń profilu
  • instancji wydanego przez służby metadanych Amazon EC2
+0

Coś jest nadal nie tak. Dodałem następujące wartości do kluczy, które mi dałeś (dokładny ciąg "aaa" jako fałszywe poświadczenia). W najgorszym przypadku spodziewałem się zobaczyć błąd auth, ale widzę ten sam wyjątek "Nie można załadować danych uwierzytelniających AWS od żadnego dostawcy w łańcuchu" – pkozlov

+1

Poprawne klucze muszą być "spark.hadoop.fs.s3a.access.key" i " spark.hadoop.fs.s3a.secret.key "Przy okazji, podanie fałszywych wartości nie pomogło - teraz widzę błąd 403. Wygląda na to, że nie można użyć anonimowych danych uwierzytelniających dla AWS S3 z iskrą I zgodnie z kod źródłowy - kolejność mandatów różni AWSCredentialsProviderChain poświadczenia = new AWSCredentialsProviderChain ( nowy BasicAWSCredentialsProvider (ACCESSKEY, tajny klucz), nowy InstanceProfileCredentialsProvider(), nowy AnonymousAWSCredentialsProvider() ); I. anonimowy po prostu nie działa. – pkozlov

+0

Ok, przepraszam, nie widziałem, że używasz protokołu 's3a'. Czy próbowałeś z 's3n'? –