2015-09-28 18 views
8

W krótkim:Spark wielu kontekstów

klaster EC2: 1 Master 3 niewolnicy wersja

Spark: 1.3.1

Chciałbym użyć opcji spark.driver.allowMultipleContexts, jeden kontekstowy lokalny (tylko master) i jeden klaster (master i slaves).

otrzymuję ten błąd StackTrace (linia 29 jest gdzie zadzwonić do obiektu, który zainicjować drugi sparkcontext):

fr.entry.Main.main(Main.scala) 
    at org.apache.spark.SparkContext$$anonfun$assertNoOtherContextIsRunning$1$$anonfun$apply$10.apply(SparkContext.scala:1812) 
    at org.apache.spark.SparkContext$$anonfun$assertNoOtherContextIsRunning$1$$anonfun$apply$10.apply(SparkContext.scala:1808) 
    at scala.Option.foreach(Option.scala:236) 
    at org.apache.spark.SparkContext$$anonfun$assertNoOtherContextIsRunning$1.apply(SparkContext.scala:1808) 
    at org.apache.spark.SparkContext$$anonfun$assertNoOtherContextIsRunning$1.apply(SparkContext.scala:1795) 
    at scala.Option.foreach(Option.scala:236) 
    at org.apache.spark.SparkContext$.assertNoOtherContextIsRunning(SparkContext.scala:1795) 
    at org.apache.spark.SparkContext$.setActiveContext(SparkContext.scala:1847) 
    at org.apache.spark.SparkContext.<init>(SparkContext.scala:1754) 
    at fr.entry.cluster$.<init>(Main.scala:79) 
    at fr.entry.cluster$.<clinit>(Main.scala) 
    at fr.entry.Main$delayedInit$body.apply(Main.scala:29) 
    at scala.Function0$class.apply$mcV$sp(Function0.scala:40) 
    at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12) 
    at scala.App$$anonfun$main$1.apply(App.scala:71) 
    at scala.App$$anonfun$main$1.apply(App.scala:71) 
    at scala.collection.immutable.List.foreach(List.scala:318) 
    at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:32) 
    at scala.App$class.main(App.scala:71) 
    at fr.entry.Main$.main(Main.scala:14) 
    at fr.entry.Main.main(Main.scala) 
15/09/28 15:33:30 INFO AppClient$ClientActor: Executor updated: app- 20150928153330-0036/2 is now LOADING 
15/09/28 15:33:30 INFO AppClient$ClientActor: Executor updated: app- 20150928153330-0036/0 is now RUNNING 
15/09/28 15:33:30 INFO AppClient$ClientActor: Executor updated: app-20150928153330-0036/1 is now RUNNING 
15/09/28 15:33:30 INFO SparkContext: Starting job: sum at Main.scala:29 
15/09/28 15:33:30 INFO DAGScheduler: Got job 0 (sum at Main.scala:29) with 2 output partitions (allowLocal=false) 
15/09/28 15:33:30 INFO DAGScheduler: Final stage: Stage 0(sum at Main.scala:29) 
15/09/28 15:33:30 INFO DAGScheduler: Parents of final stage: List() 
15/09/28 15:33:30 INFO DAGScheduler: Missing parents: List() 
15/09/28 15:33:30 INFO DAGScheduler: Submitting Stage 0 (MapPartitionsRDD[2] at numericRDDToDoubleRDDFunctions at Main.scala:29), which has no missing parents 
15/09/28 15:33:30 INFO MemoryStore: ensureFreeSpace(2264) called with curMem=0, maxMem=55566516879 
15/09/28 15:33:30 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 2.2 KB, free 51.8 GB) 
15/09/28 15:33:30 INFO MemoryStore: ensureFreeSpace(1656) called with curMem=2264, maxMem=55566516879 
15/09/28 15:33:30 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 1656.0 B, free 51.8 GB) 
15/09/28 15:33:30 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on localhost:40476 (size: 1656.0 B, free: 51.8 GB) 
15/09/28 15:33:30 INFO BlockManagerMaster: Updated info of block broadcast_0_piece0 
15/09/28 15:33:30 INFO SparkContext: Created broadcast 0 from broadcast at DAGScheduler.scala:839 
15/09/28 15:33:30 INFO AppClient$ClientActor: Executor updated: app-20150928153330-0036/2 is now RUNNING 
15/09/28 15:33:30 INFO DAGScheduler: Submitting 2 missing tasks from Stage 0 (MapPartitionsRDD[2] at numericRDDToDoubleRDDFunctions at Main.scala:29) 
15/09/28 15:33:30 INFO TaskSchedulerImpl: Adding task set 0.0 with 2 tasks 
15/09/28 15:33:45 WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources 
15/09/28 15:34:00 WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources 

Więcej szczegółów:

Chciałbym uruchomić jeden program, który robi dwie rzeczy. Po pierwsze mam obiekt sparkContext lokalny (tylko na master), tworzę RDD i wykonuję niektóre operacje. Po drugie mam zainicjować drugie zainicjowanie programu isoConnect za pomocą mastera i 3 niewolników, które również wykonują RDD i wykonują niektóre operacje. Tak więc w pierwszym przypadku chcę użyć 16 rdzeni mistrza, a drugi przypadek chcę użyć 8 xres niewolników.

Prosty przykład:

val arr = Array(Array(1, 2, 3, 4, 5, 6, 7, 8), Array(1, 2, 3, 4, 5, 6, 7, 8)) 
println(local.sparkContext.makeRDD(arr).count()) 
println(cluster.sparkContext.makeRDD(arr).map(l => l.sum).sum) 

Moi dwaj SparkContexts:

object local { 

    val project = "test" 
    val version = "1.0" 

    val sc = new SparkConf() 
    .setMaster("local[16]") 
    .setAppName("Local") 
    .set("spark.local.dir", "/mnt") 
    .setJars(Seq("target/scala-2.10/" + project + "-assembly-" + version + ".jar", "target/scala-2.10/" + project + "_2.10-" + version + "-tests.jar")) 
    .setSparkHome("/root/spark") 
    .set("spark.driver.allowMultipleContexts", "true") 
    .set("spark.executor.memory", "45g") 

    val sparkContext = new SparkContext(sc) 
} 

object cluster { 

    val project = "test" 
    val version = "1.0" 

    val sc = new SparkConf() 
    .setMaster(masterURL) // ec2-XX-XXX-XXX-XX.compute-1.amazonaws.com 
    .setAppName("Cluster") 
    .set("spark.local.dir", "/mnt") 
    .setJars(Seq("target/scala-2.10/" + project + "-assembly-" + version + ".jar", "target/scala-2.10/" + project + "_2.10-" + version + "-tests.jar") ++ otherJars) 
    .setSparkHome("/root/spark") 
    .set("spark.driver.allowMultipleContexts", "true") 
    .set("spark.executor.memory", "35g") 

    val sparkContext = new SparkContext(sc) 
} 

Jak mogę rozwiązać ten problem?

+1

Czy możesz podać, dlaczego chcesz używać dwóch kontekstów? Najczęściej nie jest to wymagane. – Gillespie

+0

@Gillespie, Załóżmy, że mam 3 programy: prog1 i prog3 mogą działać równolegle, a prog2 musi być sekwencyjny. Odsetek Prog1 to 15 RDD (zestawy danych). Prog2 to algorytm uczenia maszynowego, który muszę uruchomić 15 razy. Ponieważ prog2 musi działać lokalnie na 1 rdzeniu. Zrobiłem mały hack, który ma zrobić jeden RDD, który zawiera 15 zebranych zestawów danych. Mapuję na tym RDD i uruchamiam prog2 na każdym rekordzie. Prog3 bierze 15 wyników prog2 i wykonuje kilka operacji równolegle. Mam nadzieję, że to jest jasne? Myślę, że w moim przypadku jest to wymagane, ale jeśli tak nie jest, jestem również zainteresowany odpowiedzią na mój mały przykład. – GermainGum

Odpowiedz

10

Mimo że istnieje opcja konfiguracyjna spark.driver.allowMultipleContext, jest to mylące, ponieważ odradza się korzystanie z wielu kontekstów Spark. Ta opcja jest używana tylko w wewnętrznych testach Sparka i nie powinna być używana w programach użytkownika. Możesz uzyskać nieoczekiwane wyniki, uruchamiając więcej niż jeden kontekst Sparka w pojedynczej maszynie JVM.

+0

Czy to zniechęcenie zostało udokumentowane w dowolnym miejscu? Chciałbym, żeby było prawdą, że 2 jest zniechęcone, ale chciałbym zobaczyć, że gdzieś oficjalnie, jeśli to możliwe, – Kristian

+0

To ograniczenie zostało rzekomo podniesione w iskrze 2.0. Patrzę na to. – javadba

1

Jeśli wymagana jest koordynacja między 2 programami, lepiej byłoby uczynić ją częścią pojedynczej aplikacji Spark, aby wykorzystać wewnętrzne optymalizacje Sparksa i uniknąć niepotrzebnych operacji wejścia/wyjścia.

Po drugie, jeśli 2 aplikacje nie muszą koordynować w żaden sposób, można uruchomić 2 oddzielne aplikacje. Ponieważ używasz Amazon EC2/EMR, możesz użyć narzędzia YARN jako swojego menedżera zasobów bez znaczących inwestycji czasowych, zgodnie z opisem here.

1

Jeśli masz potrzebę pracy z dużą ilością kontekstów Spark, możesz włączyć opcję specjalną [MultipleContexts] (1), ale jest ona używana tylko do wewnętrznych testów Sparka i nie powinna być używana w programach użytkownika. Otrzymasz nieoczekiwane zachowanie podczas uruchamiania więcej niż jednego kontekstu Spark w pojedynczej maszynie JVM [SPARK-2243] (2). Możliwe jest jednak tworzenie różnych kontekstów w oddzielnych maszynach JVM i zarządzanie kontekstami na poziomie SparkConf, co optymalnie dopasuje wykonywalne zadania.

Wygląda to tak: Mist creates every new Sparkcontext in its own JVM.

Jest middleware na szczycie Spark - [Mist]. Zarządza kontekstami Sparka i wieloma JVM, dzięki czemu możesz mieć różne zadania, takie jak potok ETL, szybkie zadanie prognozy, zapytanie o Hive Hive i aplikację do strumieniowania Spark, działającą równolegle w tym samym klastrze.

1> github.com/apache/spark/blob/master/core/src/test/scala/org/apache/spark/SparkContextSuite.scala#L67

2> issues.apache.org/jira/przeglądania/SPARK-2243

0

Java:

.set("spark.driver.allowMultipleContexts", "true") 

+

sparkContext.cancelAllJobs(); 
sparkContext.stop(); 

To działa na mnie.

Powiązane problemy