
Job crashes with py4j.Py4JException: Cannot obtain a new communication channel #5

Open
natasha-aleksandrova opened this issue Mar 11, 2018 · 3 comments

Comments

@natasha-aleksandrova

I am trying to get this library working and not having any luck... I'd appreciate any help!

Here is the code:

from pyspark.streaming import StreamingContext
from signifai.pubsub import PubsubUtils
from pyspark import SparkContext


SUBSCRIPTION = "projects/bigdata220-final-project/subscriptions/out_meetup_rsvp"

sc = SparkContext()
ssc = StreamingContext(sc, 1)
pubsubStream = PubsubUtils.createStream(ssc, SUBSCRIPTION, 5, False)
pubsubStream.flatMap(lambda x: x).pprint()
ssc.start()

Here are the logs from GCP:

18/03/11 05:00:38 INFO org.spark_project.jetty.util.log: Logging initialized @2825ms
18/03/11 05:00:38 INFO org.spark_project.jetty.server.Server: jetty-9.3.z-SNAPSHOT
18/03/11 05:00:38 INFO org.spark_project.jetty.server.Server: Started @2951ms
18/03/11 05:00:39 INFO org.spark_project.jetty.server.AbstractConnector: Started ServerConnector@5625b833{HTTP/1.1,[http/1.1]}{0.0.0.0:4040}
18/03/11 05:00:39 INFO com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase: GHFS version: 1.6.3-hadoop2
18/03/11 05:00:40 INFO org.apache.hadoop.yarn.client.RMProxy: Connecting to ResourceManager at cluster-main-m/10.128.0.5:8032
18/03/11 05:00:43 INFO org.apache.hadoop.yarn.client.api.impl.YarnClientImpl: Submitted application application_1519879216511_0007
18/03/11 05:00:52 WARN org.apache.spark.streaming.StreamingContext: Dynamic Allocation is enabled for this application. Enabling Dynamic allocation for Spark Streaming applications can cause data loss if Write Ahead Log is not enabled for non-replayable sources like Flume. See the programming guide for details on how to enable the Write Ahead Log.

[Stage 0:>                                                         (0 + 0) / 50]
[Stage 0:>                                                         (0 + 1) / 50]
[Stage 0:=>                                                        (1 + 1) / 50]
[Stage 0:=====>                                                    (5 + 1) / 50]
[Stage 0:===========>                                             (10 + 1) / 50]
[Stage 0:=================>                                       (15 + 1) / 50]
[Stage 0:==========================>                              (23 + 1) / 50]
[Stage 0:===================================>                     (31 + 1) / 50]
[Stage 0:============================================>            (39 + 1) / 50]
[Stage 0:=======================================================> (49 + 1) / 50]
[Stage 1:================================================>        (17 + 1) / 20]
                                                                                
18/03/11 05:00:59 INFO io.signifai.pubsub_spark.receiver.PubsubInputDStream: Slide time = 1000 ms
18/03/11 05:00:59 INFO io.signifai.pubsub_spark.receiver.PubsubInputDStream: Storage level = Serialized 1x Replicated
18/03/11 05:00:59 INFO io.signifai.pubsub_spark.receiver.PubsubInputDStream: Checkpoint interval = null
18/03/11 05:00:59 INFO io.signifai.pubsub_spark.receiver.PubsubInputDStream: Remember interval = 1000 ms
18/03/11 05:00:59 INFO io.signifai.pubsub_spark.receiver.PubsubInputDStream: Initialized and validated io.signifai.pubsub_spark.receiver.PubsubInputDStream@395a0746
18/03/11 05:01:00 ERROR org.apache.spark.streaming.scheduler.JobScheduler: Error generating jobs for time 1520744460000 ms
py4j.Py4JException: Cannot obtain a new communication channel
	at py4j.CallbackClient.sendCommand(CallbackClient.java:340)
	at py4j.CallbackClient.sendCommand(CallbackClient.java:316)
	at py4j.reflection.PythonProxyHandler.invoke(PythonProxyHandler.java:103)
	at com.sun.proxy.$Proxy27.call(Unknown Source)
	at org.apache.spark.streaming.api.python.TransformFunction.callPythonTransformFunction(PythonDStream.scala:92)
	at org.apache.spark.streaming.api.python.TransformFunction.apply(PythonDStream.scala:78)
	at org.apache.spark.streaming.api.python.PythonTransformedDStream.compute(PythonDStream.scala:246)
	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342)
	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342)
	at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:341)
	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:341)
	at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416)
	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:336)
	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:334)
	at scala.Option.orElse(Option.scala:289)
	at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:331)
	at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:48)
	at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:122)
	at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:121)
	at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
	at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
	at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
	at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:121)
	at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:249)
	at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:247)
	at scala.util.Try$.apply(Try.scala:192)
	at org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:247)
	at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:183)
	at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:89)
	at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:88)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
18/03/11 05:01:00 ERROR org.apache.spark.streaming.api.python.PythonDStream$$anon$1: Cannot connect to Python process. It's probably dead. Stopping StreamingContext.
py4j.Py4JException: Cannot obtain a new communication channel
	at py4j.CallbackClient.sendCommand(CallbackClient.java:340)
	at py4j.CallbackClient.sendCommand(CallbackClient.java:316)
	at py4j.reflection.PythonProxyHandler.invoke(PythonProxyHandler.java:103)
	at com.sun.proxy.$Proxy27.call(Unknown Source)
	at org.apache.spark.streaming.api.python.TransformFunction.callPythonTransformFunction(PythonDStream.scala:92)
	at org.apache.spark.streaming.api.python.TransformFunction.apply(PythonDStream.scala:78)
	at org.apache.spark.streaming.api.python.PythonTransformedDStream.compute(PythonDStream.scala:246)
	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342)
	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342)
	at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:341)
	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:341)
	at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416)
	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:336)
	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:334)
	at scala.Option.orElse(Option.scala:289)
	at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:331)
	at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:48)
	at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:122)
	at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:121)
	at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
	at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
	at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
	at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:121)
	at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:249)
	at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:247)
	at scala.util.Try$.apply(Try.scala:192)
	at org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:247)
	at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:183)
	at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:89)
	at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:88)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
18/03/11 05:01:00 ERROR org.apache.spark.streaming.api.python.PythonDStream$$anon$1: Cannot connect to Python process. It's probably dead. Stopping StreamingContext.
py4j.Py4JException: Cannot obtain a new communication channel
	at py4j.CallbackClient.sendCommand(CallbackClient.java:340)
	at py4j.CallbackClient.sendCommand(CallbackClient.java:316)
	at py4j.reflection.PythonProxyHandler.invoke(PythonProxyHandler.java:103)
	at com.sun.proxy.$Proxy27.call(Unknown Source)
	at org.apache.spark.streaming.api.python.TransformFunction.callPythonTransformFunction(PythonDStream.scala:92)
	at org.apache.spark.streaming.api.python.TransformFunction.apply(PythonDStream.scala:78)
	at org.apache.spark.streaming.api.python.PythonTransformedDStream.compute(PythonDStream.scala:246)
	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342)
	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342)
	at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:341)
	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:341)
	at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416)
	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:336)
	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:334)
	at scala.Option.orElse(Option.scala:289)
	at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:331)
	at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:48)
	at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:122)
	at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:121)
	at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
	at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
	at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
	at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:121)
	at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:249)
	at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:247)
	at scala.util.Try$.apply(Try.scala:192)
	at org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:247)
	at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:183)
	at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:89)
	at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:88)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
18/03/11 05:01:00 WARN org.apache.spark.streaming.scheduler.ReceiverTracker: Not all of the receivers have deregistered, ArrayBuffer(0)
18/03/11 05:01:00 WARN org.apache.spark.streaming.StreamingContext: StreamingContext has already been stopped
18/03/11 05:01:00 WARN org.apache.spark.streaming.StreamingContext: StreamingContext has already been stopped
18/03/11 05:01:00 INFO org.spark_project.jetty.server.AbstractConnector: Stopped Spark@5625b833{HTTP/1.1,[http/1.1]}{0.0.0.0:4040}
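
One thing I notice: the script calls ssc.start() and then falls off the end, and the error above says "Cannot connect to Python process. It's probably dead." Below is the same snippet kept alive with ssc.awaitTermination() (a standard PySpark call); whether the driver exiting early is actually what triggers this crash is just a guess on my part.

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from signifai.pubsub import PubsubUtils

SUBSCRIPTION = "projects/bigdata220-final-project/subscriptions/out_meetup_rsvp"

sc = SparkContext()
ssc = StreamingContext(sc, 1)

pubsubStream = PubsubUtils.createStream(ssc, SUBSCRIPTION, 5, False)
pubsubStream.flatMap(lambda x: x).pprint()

ssc.start()
# Block here so the Python driver (and its Py4J callback server) stays alive;
# without this, the Python process can exit right after start().
ssc.awaitTermination()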
@shulegaa

I've reproduced this using precisely the same program (save for my own subscription) on Google Cloud Platform (GCP) Dataproc v1.2. I published hundreds of messages to my own GCP Pub/Sub topic and, just to be sure, subscribed back and pulled those same messages via the Python client running on my laptop. As expected, GCP Pub/Sub itself seems fine.

For me too, this receiver JAR with its PySpark binding egg would be ideal. It builds cleanly.

So, does anyone know of any alternatives?

@bmahe
Contributor

bmahe commented Sep 27, 2018

Thank you both @natasha-aleksandrova and @shulegaa for the detailed report!

That exception does not make the cause apparent, so I will need a bit more information:

  • Does the Python connector work for you locally?
  • Does the Java connector work for you locally?
  • Does the Java connector work for you on your Apache Spark cluster on Google Cloud?

I tried to reproduce this tonight but unfortunately was not able to. However, I only tried with Apache Spark 2.3.2 standalone on my laptop, not on Dataproc, and both the Java and Python connectors worked for me.
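
In case it helps with the local checks above, a minimal sketch of a standalone (local-mode) run of the Python connector might look like the following; the JAR and egg paths and the subscription name are placeholders, not the actual artifact names produced by this repo's build:

# Local-mode repro sketch; adjust the placeholder paths to your own build artifacts.
from pyspark import SparkConf, SparkContext
from pyspark.streaming import StreamingContext

conf = (SparkConf()
        .setMaster("local[2]")        # at least two cores: one for the receiver, one for processing
        .setAppName("pubsub-local-test")
        .set("spark.jars", "/path/to/pubsub-spark-receiver.jar"))   # placeholder JAR path

sc = SparkContext(conf=conf)
sc.addPyFile("/path/to/pubsub_spark.egg")   # placeholder path to the Python binding egg

# Import after addPyFile so the egg is on the driver's path.
from signifai.pubsub import PubsubUtils

ssc = StreamingContext(sc, 1)
stream = PubsubUtils.createStream(ssc, "projects/<your-project>/subscriptions/<your-sub>", 5, False)
stream.flatMap(lambda x: x).pprint()

ssc.start()
ssc.awaitTermination()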

Also note that I just pushed a commit to upgrade the referenced versions, so you may want to pick up these updates.

@natasha-aleksandrova
Author

@bmahe I am no longer working on the PySpark project and won't be able to provide additional info, sorry!
