2015-09-24

I am just getting started with Flink. I wrote the following code and got the error "The initialization of the DataSource's outputs caused an error: Could not read the user code wrapper".

Is there something I am doing wrong?

Version: Flink v0.9.1 (Hadoop 1), not using Hadoop; execution: local; shell: Scala shell

Code:

val env = ExecutionEnvironment.getExecutionEnvironment 
val text = env.readTextFile("/home/ashish/Downloads/spark/synop.201501.csv") 
val data_split = text.flatMap{_.split(';')} 
data_split.first(3).print() 

Note: the input file uses ';' as the delimiter.
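The flatMap above splits each input line on ';', so every field of every line becomes its own element of the resulting DataSet. As an illustration (the sample line below is made up, not taken from synop.201501.csv), the same split in plain Scala:

```scala
// Hypothetical line in the same ';'-separated format (not actual file contents)
val line = "07005;20150101;1013.2;4.5"

// DataSet.flatMap(_.split(';')) emits one element per field;
// plain String.split shows the same per-line behavior:
val fields = line.split(';').toList
println(fields) // List(07005, 20150101, 1013.2, 4.5)
```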

Error:

Scala-Flink> val data_split = text.flatMap{_.split(';')} 
data_split: org.apache.flink.api.scala.DataSet[String] = [email protected] 
Scala-Flink> data_split.first(3).print() 
09/24/2015 09:20:14 Job execution switched to status RUNNING. 
09/24/2015 09:20:14 CHAIN DataSource (at $line26.$read$$iw$$iw$$iw$$iw$$iw$$iw$.<init>(<console>:14) (org.apache.flink.api.java.io.TextInputFormat)) -> FlatMap (FlatMap at $line27.$read$$iw$$iw$$iw$$iw$$iw$$iw$.<init>(<console>:15))(1/1) switched to SCHEDULED 
09/24/2015 09:20:14 CHAIN DataSource (at $line26.$read$$iw$$iw$$iw$$iw$$iw$$iw$.<init>(<console>:14) (org.apache.flink.api.java.io.TextInputFormat)) -> FlatMap (FlatMap at $line27.$read$$iw$$iw$$iw$$iw$$iw$$iw$.<init>(<console>:15))(1/1) switched to DEPLOYING 
09/24/2015 09:20:14 CHAIN DataSource (at $line26.$read$$iw$$iw$$iw$$iw$$iw$$iw$.<init>(<console>:14) (org.apache.flink.api.java.io.TextInputFormat)) -> FlatMap (FlatMap at $line27.$read$$iw$$iw$$iw$$iw$$iw$$iw$.<init>(<console>:15))(1/1) switched to FAILED 
java.lang.Exception: Call to registerInputOutput() of invokable failed 
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:504) 
    at java.lang.Thread.run(Thread.java:745) 
Caused by: java.lang.RuntimeException: The initialization of the DataSource's outputs caused an error: Could not read the user code wrapper: $line27.$read$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1 
    at org.apache.flink.runtime.operators.DataSourceTask.registerInputOutput(DataSourceTask.java:89) 
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:501) 
    ... 1 more 
Caused by: org.apache.flink.runtime.operators.util.CorruptConfigurationException: Could not read the user code wrapper: $line27.$read$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1 
    at org.apache.flink.runtime.operators.util.TaskConfig.getStubWrapper(TaskConfig.java:284) 
    at org.apache.flink.runtime.operators.RegularPactTask.instantiateUserCode(RegularPactTask.java:1507) 
    at org.apache.flink.runtime.operators.chaining.ChainedFlatMapDriver.setup(ChainedFlatMapDriver.java:39) 
    at org.apache.flink.runtime.operators.chaining.ChainedDriver.setup(ChainedDriver.java:72) 
    at org.apache.flink.runtime.operators.RegularPactTask.initOutputs(RegularPactTask.java:1378) 
    at org.apache.flink.runtime.operators.DataSourceTask.initOutputs(DataSourceTask.java:290) 
    at org.apache.flink.runtime.operators.DataSourceTask.registerInputOutput(DataSourceTask.java:86) 
    ... 2 more 
Caused by: java.lang.ClassNotFoundException: $line27.$read$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1 
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381) 
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424) 
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357) 
    at java.lang.Class.forName0(Native Method) 
    at java.lang.Class.forName(Class.java:348) 
    at org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:71) 
    at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1613) 
    at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518) 
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1774) 
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351) 
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000) 
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924) 
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801) 
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351) 
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000) 
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924) 
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801) 
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351) 
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371) 
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:302) 
    at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:264) 
    at org.apache.flink.runtime.operators.util.TaskConfig.getStubWrapper(TaskConfig.java:282) 
    ... 8 more 

09/24/2015 09:20:14 Job execution switched to status FAILING. 
09/24/2015 09:20:14 CHAIN GroupReduce (GroupReduce at org.apache.flink.api.scala.DataSet.first(DataSet.scala:707)) -> FlatMap (collect())(1/1) switched to CANCELED 
09/24/2015 09:20:14 DataSink (collect() sink)(1/1) switched to CANCELED 
org.apache.flink.runtime.client.JobExecutionException: Job execution failed. 
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1.applyOrElse(JobManager.scala:314) 
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33) 
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33) 
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25) 
    at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:43) 
    at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:29) 
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118) 
    at org.apache.flink.runtime.ActorLogMessages$$anon$1.applyOrElse(ActorLogMessages.scala:29) 
    at akka.actor.Actor$class.aroundReceive(Actor.scala:465) 
    at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:92) 
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516) 
    at akka.actor.ActorCell.invoke(ActorCell.scala:487) 
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254) 
    at akka.dispatch.Mailbox.run(Mailbox.scala:221) 
    at akka.dispatch.Mailbox.exec(Mailbox.scala:231) 
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) 
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) 
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) 
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) 
Caused by: java.lang.Exception: Call to registerInputOutput() of invokable failed 
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:504) 
    at java.lang.Thread.run(Thread.java:745) 
Caused by: java.lang.RuntimeException: The initialization of the DataSource's outputs caused an error: Could not read the user code wrapper: $anonfun$1 
    at org.apache.flink.runtime.operators.DataSourceTask.registerInputOutput(DataSourceTask.java:89) 
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:501) 
    ... 1 more 
Caused by: org.apache.flink.runtime.operators.util.CorruptConfigurationException: Could not read the user code wrapper: $anonfun$1 
    at org.apache.flink.runtime.operators.util.TaskConfig.getStubWrapper(TaskConfig.java:284) 
    at org.apache.flink.runtime.operators.RegularPactTask.instantiateUserCode(RegularPactTask.java:1507) 
    at org.apache.flink.runtime.operators.chaining.ChainedFlatMapDriver.setup(ChainedFlatMapDriver.java:39) 
    at org.apache.flink.runtime.operators.chaining.ChainedDriver.setup(ChainedDriver.java:72) 
    at org.apache.flink.runtime.operators.RegularPactTask.initOutputs(RegularPactTask.java:1378) 
    at org.apache.flink.runtime.operators.DataSourceTask.initOutputs(DataSourceTask.java:290) 
    at org.apache.flink.runtime.operators.DataSourceTask.registerInputOutput(DataSourceTask.java:86) 
    ... 2 more 
Caused by: java.lang.ClassNotFoundException: $anonfun$1 
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381) 
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424) 
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357) 
    at java.lang.Class.forName0(Native Method) 
    at java.lang.Class.forName(Class.java:348) 
    at org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:71) 
    at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1613) 
    at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518) 
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1774) 
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351) 
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000) 
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924) 
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801) 
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351) 
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000) 
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924) 
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801) 
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351) 
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371) 
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:302) 
    at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:264) 
    at org.apache.flink.runtime.operators.util.TaskConfig.getStubWrapper(TaskConfig.java:282) 
    ... 8 more 

A few things that would help in answering: (1) The third line (obtaining a new execution environment) should not be dropped. Mixing different environments will likely cause problems (and may be the cause of this issue). (2) Can you post the full exception stack trace? The root cause is missing; it should appear below the "Caused by" further down the stack trace. (3) Your code example seems to have truncated lines; can you post the complete lines? –


The 'val env' should be the first line; sorry about that. 'val env = ExecutionEnvironment.getExecutionEnvironment; val text = env.readTextFile("/home/ashish/Downloads/spark/synop.201501.csv"; val data_split = text.flatMap{_.split(';')}; data_split.first(3).print()' – ashish


I have updated it with the full error log. – ashish

Answer


The problem is the statement 'val env = ExecutionEnvironment.getExecutionEnvironment' in the first line.

The Scala shell already has an ExecutionEnvironment, bound to the variable 'env', which is configured to correctly load the classes generated by the shell.

By creating a new execution environment, you replace that pre-configured environment with one that is not set up correctly.
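Based on the explanation above, a minimal sketch of the corrected shell session; it reuses the pre-bound 'env' instead of creating a new one (file path taken from the question):

```scala
// Inside the Flink Scala shell, 'env' is already bound and configured to
// ship the classes generated by the shell; do NOT create a new one.
val text = env.readTextFile("/home/ashish/Downloads/spark/synop.201501.csv")
val data_split = text.flatMap { _.split(';') }
data_split.first(3).print()
```

Note that this fragment is not standalone: it deliberately relies on the shell's pre-bound 'env' and omits any call to ExecutionEnvironment.getExecutionEnvironment, which is exactly the point of the fix.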


Stephan, could you provide a link on how a program should be tested locally in the REPL? I have the same problem with a Flink 1.2 project developed in an emacs-ensime environment. 'env.readTextFile("file:/d:/data/test.csv").first(5).print()' works fine, but 'env.readTextFile("file:/d:/data/test.csv").map(_.split('')).first(5).print()' does not. –


It works correctly on Windows in cmd in the sbt console. In ensime-inferior-scala I get 'org.apache.flink.runtime.operators.util.CorruptConfigurationException: Could not read the user code wrapper: $anonfun$1'. –
