W krótkim:Spark wielu kontekstów
klaster EC2: 1 Master 3 niewolnicy wersja
Spark: 1.3.1
Chciałbym użyć opcji spark.driver.allowMultipleContexts, jeden kontekstowy lokalny (tylko master) i jeden klaster (master i slaves).
otrzymuję ten błąd StackTrace (linia 29 jest gdzie zadzwonić do obiektu, który zainicjować drugi sparkcontext):
fr.entry.Main.main(Main.scala)
at org.apache.spark.SparkContext$$anonfun$assertNoOtherContextIsRunning$1$$anonfun$apply$10.apply(SparkContext.scala:1812)
at org.apache.spark.SparkContext$$anonfun$assertNoOtherContextIsRunning$1$$anonfun$apply$10.apply(SparkContext.scala:1808)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.SparkContext$$anonfun$assertNoOtherContextIsRunning$1.apply(SparkContext.scala:1808)
at org.apache.spark.SparkContext$$anonfun$assertNoOtherContextIsRunning$1.apply(SparkContext.scala:1795)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.SparkContext$.assertNoOtherContextIsRunning(SparkContext.scala:1795)
at org.apache.spark.SparkContext$.setActiveContext(SparkContext.scala:1847)
at org.apache.spark.SparkContext.<init>(SparkContext.scala:1754)
at fr.entry.cluster$.<init>(Main.scala:79)
at fr.entry.cluster$.<clinit>(Main.scala)
at fr.entry.Main$delayedInit$body.apply(Main.scala:29)
at scala.Function0$class.apply$mcV$sp(Function0.scala:40)
at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
at scala.App$$anonfun$main$1.apply(App.scala:71)
at scala.App$$anonfun$main$1.apply(App.scala:71)
at scala.collection.immutable.List.foreach(List.scala:318)
at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:32)
at scala.App$class.main(App.scala:71)
at fr.entry.Main$.main(Main.scala:14)
at fr.entry.Main.main(Main.scala)
15/09/28 15:33:30 INFO AppClient$ClientActor: Executor updated: app- 20150928153330-0036/2 is now LOADING
15/09/28 15:33:30 INFO AppClient$ClientActor: Executor updated: app- 20150928153330-0036/0 is now RUNNING
15/09/28 15:33:30 INFO AppClient$ClientActor: Executor updated: app-20150928153330-0036/1 is now RUNNING
15/09/28 15:33:30 INFO SparkContext: Starting job: sum at Main.scala:29
15/09/28 15:33:30 INFO DAGScheduler: Got job 0 (sum at Main.scala:29) with 2 output partitions (allowLocal=false)
15/09/28 15:33:30 INFO DAGScheduler: Final stage: Stage 0(sum at Main.scala:29)
15/09/28 15:33:30 INFO DAGScheduler: Parents of final stage: List()
15/09/28 15:33:30 INFO DAGScheduler: Missing parents: List()
15/09/28 15:33:30 INFO DAGScheduler: Submitting Stage 0 (MapPartitionsRDD[2] at numericRDDToDoubleRDDFunctions at Main.scala:29), which has no missing parents
15/09/28 15:33:30 INFO MemoryStore: ensureFreeSpace(2264) called with curMem=0, maxMem=55566516879
15/09/28 15:33:30 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 2.2 KB, free 51.8 GB)
15/09/28 15:33:30 INFO MemoryStore: ensureFreeSpace(1656) called with curMem=2264, maxMem=55566516879
15/09/28 15:33:30 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 1656.0 B, free 51.8 GB)
15/09/28 15:33:30 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on localhost:40476 (size: 1656.0 B, free: 51.8 GB)
15/09/28 15:33:30 INFO BlockManagerMaster: Updated info of block broadcast_0_piece0
15/09/28 15:33:30 INFO SparkContext: Created broadcast 0 from broadcast at DAGScheduler.scala:839
15/09/28 15:33:30 INFO AppClient$ClientActor: Executor updated: app-20150928153330-0036/2 is now RUNNING
15/09/28 15:33:30 INFO DAGScheduler: Submitting 2 missing tasks from Stage 0 (MapPartitionsRDD[2] at numericRDDToDoubleRDDFunctions at Main.scala:29)
15/09/28 15:33:30 INFO TaskSchedulerImpl: Adding task set 0.0 with 2 tasks
15/09/28 15:33:45 WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources
15/09/28 15:34:00 WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources
Więcej szczegółów:
Chciałbym uruchomić jeden program, który robi dwie rzeczy. Po pierwsze mam obiekt sparkContext lokalny (tylko na master), tworzę RDD i wykonuję niektóre operacje. Po drugie mam zainicjować drugie zainicjowanie programu isoConnect za pomocą mastera i 3 niewolników, które również wykonują RDD i wykonują niektóre operacje. Tak więc w pierwszym przypadku chcę użyć 16 rdzeni mistrza, a drugi przypadek chcę użyć 8 xres niewolników.
Prosty przykład:
val arr = Array(Array(1, 2, 3, 4, 5, 6, 7, 8), Array(1, 2, 3, 4, 5, 6, 7, 8))
println(local.sparkContext.makeRDD(arr).count())
println(cluster.sparkContext.makeRDD(arr).map(l => l.sum).sum)
Moi dwaj SparkContexts:
object local {
val project = "test"
val version = "1.0"
val sc = new SparkConf()
.setMaster("local[16]")
.setAppName("Local")
.set("spark.local.dir", "/mnt")
.setJars(Seq("target/scala-2.10/" + project + "-assembly-" + version + ".jar", "target/scala-2.10/" + project + "_2.10-" + version + "-tests.jar"))
.setSparkHome("/root/spark")
.set("spark.driver.allowMultipleContexts", "true")
.set("spark.executor.memory", "45g")
val sparkContext = new SparkContext(sc)
}
object cluster {
val project = "test"
val version = "1.0"
val sc = new SparkConf()
.setMaster(masterURL) // ec2-XX-XXX-XXX-XX.compute-1.amazonaws.com
.setAppName("Cluster")
.set("spark.local.dir", "/mnt")
.setJars(Seq("target/scala-2.10/" + project + "-assembly-" + version + ".jar", "target/scala-2.10/" + project + "_2.10-" + version + "-tests.jar") ++ otherJars)
.setSparkHome("/root/spark")
.set("spark.driver.allowMultipleContexts", "true")
.set("spark.executor.memory", "35g")
val sparkContext = new SparkContext(sc)
}
Jak mogę rozwiązać ten problem?
Czy możesz podać, dlaczego chcesz używać dwóch kontekstów? Najczęściej nie jest to wymagane. – Gillespie
@Gillespie, Załóżmy, że mam 3 programy: prog1 i prog3 mogą działać równolegle, a prog2 musi być sekwencyjny. Odsetek Prog1 to 15 RDD (zestawy danych). Prog2 to algorytm uczenia maszynowego, który muszę uruchomić 15 razy. Ponieważ prog2 musi działać lokalnie na 1 rdzeniu. Zrobiłem mały hack, który ma zrobić jeden RDD, który zawiera 15 zebranych zestawów danych. Mapuję na tym RDD i uruchamiam prog2 na każdym rekordzie. Prog3 bierze 15 wyników prog2 i wykonuje kilka operacji równolegle. Mam nadzieję, że to jest jasne? Myślę, że w moim przypadku jest to wymagane, ale jeśli tak nie jest, jestem również zainteresowany odpowiedzią na mój mały przykład. – GermainGum