Sparkmonitor failure for kernel restart - java.net.SocketException: Broken pipe (Write failed) #11

Open
creepysta opened this issue Feb 24, 2022 · 6 comments

@creepysta

Hi,

I noticed the following issue when restarting the kernel from a classic Jupyter Notebook, using Jupyter Enterprise Gateway (JEG) to launch remote Spark kernels in Kubernetes. The sparkmonitor display doesn't show up, and in the driver logs we see that a java.net.SocketException: Broken pipe (Write failed) is being thrown, along with the following line -

[IPKernelApp] WARNING | No such comm: b5b03d3c1393459f9b736fb5f5dd5461

The full stack trace is included at the end.

Observations so far -

For a successful case -

  1. Comm opened
  2. Client connected
INFO:SparkMonitorKernel:Comm opened
[I 2021-12-28 06:36:19,883.883 SparkMonitorKernel] Comm opened
...
INFO:SparkMonitorKernel:Client Connected ('127.0.0.1', 35792)
[I 2021-12-28 06:36:19,914.914 SparkMonitorKernel] Client Connected ('127.0.0.1', 35792)

For a failure case -

  1. Client Connected
  2. Comm opened
INFO:SparkMonitorKernel:Client Connected ('127.0.0.1', 33320)
[I 2021-12-28 05:44:11,603.603 SparkMonitorKernel] Client Connected ('127.0.0.1', 33320)
...
INFO:SparkMonitorKernel:Comm opened
[I 2021-12-28 05:44:11,760.760 SparkMonitorKernel] Comm opened

As a temporary fix that replicates the successful ordering, a 20-second delay has been placed in CustomListener.scala before establishing the socket connection. This ensures the comm is opened before the client connects.

Thanks @akhileshram for pointing out the fix.

def startConnection(): Unit = {
  try {
    Thread.sleep(20000) // added: give the kernel time to open the comm before we connect
    socket = new Socket("localhost", port.toInt)
    out = new OutputStreamWriter(socket.getOutputStream())

    ....
  }
}
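
A fixed sleep is fragile, though. A more robust variant might retry the write and reopen the socket when the kernel has dropped the old connection. Below is a minimal, self-contained sketch of that idea - not the actual sparkmonitor code; the class name, retry count, and back-off delay are all hypothetical:

import java.io.OutputStreamWriter
import java.net.Socket

// Hypothetical helper, not part of the sparkmonitor codebase: retries the
// write and reconnects if the kernel dropped the previous socket.
class RetryingSocketWriter(host: String, port: Int, maxRetries: Int = 3) {
  private var socket: Socket = _
  private var out: OutputStreamWriter = _

  private def connect(): Unit = {
    socket = new Socket(host, port)
    out = new OutputStreamWriter(socket.getOutputStream())
  }

  def send(msg: String): Unit = {
    var attempt = 0
    var sent = false
    while (!sent && attempt <= maxRetries) {
      try {
        if (out == null) connect()
        out.write(msg)
        out.flush()
        sent = true
      } catch {
        case _: java.io.IOException => // e.g. Broken pipe (Write failed)
          out = null       // force a reconnect on the next attempt
          attempt += 1
          Thread.sleep(1000) // hypothetical back-off; lets a restarted kernel reopen its comm
      }
    }
    // if all attempts fail, the message is dropped (as the current listener does)
  }
}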

Any hint or help with this issue would be much appreciated.

Error

2021-12-28 05:44:48,802 INFO  [spark-listener-group-shared] listener.JupyterSparkMonitorListener (CustomListener.scala:onJobStart(267)) - Job Start: 0
2021-12-28 05:44:48,804 ERROR [spark-listener-group-shared] listener.JupyterSparkMonitorListener (CustomListener.scala:send(86)) - Exception sending socket message:
java.net.SocketException: Broken pipe (Write failed)
	at java.base/java.net.SocketOutputStream.socketWrite0(Native Method)
	at java.base/java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:110)
	at java.base/java.net.SocketOutputStream.write(SocketOutputStream.java:150)
	at java.base/sun.nio.cs.StreamEncoder.writeBytes(StreamEncoder.java:233)
	at java.base/sun.nio.cs.StreamEncoder.implFlushBuffer(StreamEncoder.java:312)
	at java.base/sun.nio.cs.StreamEncoder.implFlush(StreamEncoder.java:316)
	at java.base/sun.nio.cs.StreamEncoder.flush(StreamEncoder.java:153)
	at java.base/java.io.OutputStreamWriter.flush(OutputStreamWriter.java:251)
	at sparkmonitor.listener.JupyterSparkMonitorListener.send(CustomListener.scala:83)
	at sparkmonitor.listener.JupyterSparkMonitorListener.onJobStart(CustomListener.scala:269)
	at org.apache.spark.scheduler.SparkListenerBus.doPostEvent(SparkListenerBus.scala:37)
	at org.apache.spark.scheduler.SparkListenerBus.doPostEvent$(SparkListenerBus.scala:28)
	at org.apache.spark.scheduler.AsyncEventQueue.doPostEvent(AsyncEventQueue.scala:37)
	at org.apache.spark.scheduler.AsyncEventQueue.doPostEvent(AsyncEventQueue.scala:37)
	at org.apache.spark.util.ListenerBus.postToAll(ListenerBus.scala:117)
	at org.apache.spark.util.ListenerBus.postToAll$(ListenerBus.scala:101)
	at org.apache.spark.scheduler.AsyncEventQueue.super$postToAll(AsyncEventQueue.scala:105)
	at org.apache.spark.scheduler.AsyncEventQueue.$anonfun$dispatch$1(AsyncEventQueue.scala:105)
	at scala.runtime.java8.JFunction0$mcJ$sp.apply(JFunction0$mcJ$sp.java:23)
	at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
	at org.apache.spark.scheduler.AsyncEventQueue.org$apache$spark$scheduler$AsyncEventQueue$$dispatch(AsyncEventQueue.scala:100)
	at org.apache.spark.scheduler.AsyncEventQueue$$anon$2.$anonfun$run$1(AsyncEventQueue.scala:96)
	at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1381)
	at org.apache.spark.scheduler.AsyncEventQueue$$anon$2.run(AsyncEventQueue.scala:96)

@krishnan-r (Contributor) commented Mar 25, 2022

This is probably the same as #7; see my comment there. I will draft a PR with a fix soon.

@rdhara commented May 25, 2022

@creepysta Can you please explain the steps you took after adding that one line to the listener file? I'm new to sbt and Scala and am having some trouble generating the JARs referenced in build.sbt:

assembly / assemblyOutputPath := {
  scalaBinaryVersion.value match {
    case "2.11" => (baseDirectory { base => base / ("../sparkmonitor/listener_2.11.jar") }).value
    case "2.12" => (baseDirectory { base => base / ("../sparkmonitor/listener_2.12.jar") }).value
  }
}

These JARs appear in site-packages when I just run pip install sparkmonitor. I'm not even sure where the JAR building happens, but based on a search for sbt I found this on line 75 of package.json: "build:scalalistener": "cd scalalistener && sbt +assembly". However, the command failed with an error message.

My current plan is to make the change in a separate repo, clone it in, and try to pip install from there. I tried this, but I'm not able to get the JARs to appear after setup.py runs. Any guidance you can provide on building this locally would be very helpful - thanks in advance!

@utkarshgupta137 (Contributor)

@rdhara I used GitHub Actions to build the JAR files, by modifying the workflow to upload the JARs as a release: https://github.com/utkarshgupta137/sparkmonitor/blob/master/.github/workflows/publish.yml#L41
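
For reference, a minimal sketch of such a release-upload step - the action (softprops/action-gh-release) and output paths here are illustrative assumptions, not the exact workflow at the link above:

- name: Build listener JARs
  run: cd scalalistener && sbt +assembly

- name: Upload JARs as release assets
  uses: softprops/action-gh-release@v1
  with:
    files: sparkmonitor/listener_*.jar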

@rdhara commented May 25, 2022

Thank you @utkarshgupta137, I'll try this!

@creepysta (Author)

Hi @rdhara, apologies for the late reply. Can you try assembly followed by package in the sbt shell? Just open the sbt shell in the same folder as the build.sbt file.
The output JAR should be present in ../sparkmonitor, assuming your cwd is sparkmonitor/scalalistener (see the commands sketched below).
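
Expanding that into concrete commands - a sketch assuming sbt is installed and the repository layout above; the JAR names come from the assemblyOutputPath setting quoted earlier:

cd sparkmonitor/scalalistener
sbt           # opens the sbt shell next to build.sbt
> assembly    # builds the fat JAR
> package
# the output should now be at ../sparkmonitor/listener_2.11.jar
# or ../sparkmonitor/listener_2.12.jar, per assemblyOutputPath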

@rdhara commented Jun 13, 2022

Yup, I figured it out (the same steps you describe) and got it to work. Thank you @creepysta!
