Sparkmonitor failure for kernel restart - java.net.SocketException: Broken pipe (Write failed) #15

Open
creepysta opened this issue Feb 24, 2022 · 2 comments

Comments

@creepysta

Hi,

I noticed the following issue when restarting the kernel from a classic Jupyter
Notebook that uses JEG to launch remote Spark kernels in Kubernetes. The
sparkmonitor doesn't show up, and the driver logs show a
java.net.SocketException: Broken pipe (Write failed) being thrown, along with the following line:

[IPKernelApp] WARNING | No such comm: b5b03d3c1393459f9b736fb5f5dd5461

The full stack trace is attached at the end.

Observations so far -

For a successful case -

  1. Comm opened
  2. Client Connected
INFO:SparkMonitorKernel:Comm opened
[I 2021-12-28 06:36:19,883.883 SparkMonitorKernel] Comm opened
...
INFO:SparkMonitorKernel:Client Connected ('127.0.0.1', 35792)
[I 2021-12-28 06:36:19,914.914 SparkMonitorKernel] Client Connected ('127.0.0.1', 35792)

For failure case -

  1. Client Connected
  2. Comm opened
INFO:SparkMonitorKernel:Client Connected ('127.0.0.1', 33320)
[I 2021-12-28 05:44:11,603.603 SparkMonitorKernel] Client Connected ('127.0.0.1', 33320)
...
INFO:SparkMonitorKernel:Comm opened
[I 2021-12-28 05:44:11,760.760 SparkMonitorKernel] Comm opened
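
To check which ordering a given kernel start produced, the two events can be pulled out of the kernel log with a small script. This is only a rough sketch: the function name `classify_startup` is made up for illustration, and it assumes the log lines look like the bracketed `SparkMonitorKernel]` lines quoted above.

```python
import re

# Matches the bracketed kernel log lines quoted in this issue, e.g.
# [I 2021-12-28 06:36:19,883.883 SparkMonitorKernel] Comm opened
EVENT_RE = re.compile(r"SparkMonitorKernel\] (Comm opened|Client Connected)")


def classify_startup(log_lines):
    """Report which of the two events appeared first in the log.

    Returns 'comm-first', 'client-first', or 'incomplete' when one of the
    two events is missing. Hypothetical helper, not part of sparkmonitor.
    """
    order = []
    for line in log_lines:
        match = EVENT_RE.search(line)
        if match and match.group(1) not in order:
            order.append(match.group(1))
    if len(order) < 2:
        return "incomplete"
    return "comm-first" if order[0] == "Comm opened" else "client-first"
```

With the logs above, the successful start would be reported as `comm-first` and the failing one as `client-first`.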

As a temporary fix that reproduces the successful ordering, we added a
20-second delay in CustomListener.scala before the socket connection is
established. This ensures that "Comm opened" happens before "Client Connected".

Thanks @akhileshram for pointing out the fix

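def startConnection(): Unit = {
  try {
    // Temporary workaround: wait 20 s so the kernel has opened the comm
    // before this listener connects to the kernel-side socket.
    Thread.sleep(20000) // added
    socket = new Socket("localhost", port.toInt)
    out = new OutputStreamWriter(socket.getOutputStream())

    ....
  }
}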
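
If the failure really is caused by listener messages arriving before the comm exists (that is only my reading of the logs, not something confirmed), an alternative to a fixed delay might be to buffer on the kernel side until the comm opens. Below is a minimal sketch of that idea. `BufferedForwarder`, `on_message` and `on_comm_open` are hypothetical names, not sparkmonitor's actual code, and `send` stands in for whatever function forwards a message over the comm.

```python
import threading
from collections import deque


class BufferedForwarder:
    """Hypothetical helper: hold listener messages until a comm is available."""

    def __init__(self):
        self._lock = threading.Lock()
        self._pending = deque()
        self._send = None  # set once the comm has been opened

    def on_message(self, msg):
        """Called for each message read from the listener socket."""
        with self._lock:
            if self._send is None:
                # No comm yet: keep the message instead of dropping it.
                self._pending.append(msg)
            else:
                self._send(msg)

    def on_comm_open(self, send):
        """Called when the frontend opens the comm; flushes the backlog in order."""
        with self._lock:
            self._send = send
            while self._pending:
                send(self._pending.popleft())
```

Sending while holding the lock keeps messages in arrival order, at the cost of blocking the socket reader during the flush. Whether this would also avoid the Broken pipe on the Scala side is untested.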

Any hints or help with this issue would be much appreciated.

Error

2021-12-28 05:44:48,802 INFO  [spark-listener-group-shared] listener.JupyterSparkMonitorListener (CustomListener.scala:onJobStart(267)) - Job Start: 0
2021-12-28 05:44:48,804 ERROR [spark-listener-group-shared] listener.JupyterSparkMonitorListener (CustomListener.scala:send(86)) - Exception sending socket message:
java.net.SocketException: Broken pipe (Write failed)
	at java.base/java.net.SocketOutputStream.socketWrite0(Native Method)
	at java.base/java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:110)
	at java.base/java.net.SocketOutputStream.write(SocketOutputStream.java:150)
	at java.base/sun.nio.cs.StreamEncoder.writeBytes(StreamEncoder.java:233)
	at java.base/sun.nio.cs.StreamEncoder.implFlushBuffer(StreamEncoder.java:312)
	at java.base/sun.nio.cs.StreamEncoder.implFlush(StreamEncoder.java:316)
	at java.base/sun.nio.cs.StreamEncoder.flush(StreamEncoder.java:153)
	at java.base/java.io.OutputStreamWriter.flush(OutputStreamWriter.java:251)
	at sparkmonitor.listener.JupyterSparkMonitorListener.send(CustomListener.scala:83)
	at sparkmonitor.listener.JupyterSparkMonitorListener.onJobStart(CustomListener.scala:269)
	at org.apache.spark.scheduler.SparkListenerBus.doPostEvent(SparkListenerBus.scala:37)
	at org.apache.spark.scheduler.SparkListenerBus.doPostEvent$(SparkListenerBus.scala:28)
	at org.apache.spark.scheduler.AsyncEventQueue.doPostEvent(AsyncEventQueue.scala:37)
	at org.apache.spark.scheduler.AsyncEventQueue.doPostEvent(AsyncEventQueue.scala:37)
	at org.apache.spark.util.ListenerBus.postToAll(ListenerBus.scala:117)
	at org.apache.spark.util.ListenerBus.postToAll$(ListenerBus.scala:101)
	at org.apache.spark.scheduler.AsyncEventQueue.super$postToAll(AsyncEventQueue.scala:105)
	at org.apache.spark.scheduler.AsyncEventQueue.$anonfun$dispatch$1(AsyncEventQueue.scala:105)
	at scala.runtime.java8.JFunction0$mcJ$sp.apply(JFunction0$mcJ$sp.java:23)
	at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
	at org.apache.spark.scheduler.AsyncEventQueue.org$apache$spark$scheduler$AsyncEventQueue$$dispatch(AsyncEventQueue.scala:100)
	at org.apache.spark.scheduler.AsyncEventQueue$$anon$2.$anonfun$run$1(AsyncEventQueue.scala:96)
	at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1381)
	at org.apache.spark.scheduler.AsyncEventQueue$$anon$2.run(AsyncEventQueue.scala:96)
@rahul26goyal

@creepysta: Even with the additional sleep(), are you still seeing the exception, or is it solved completely?

@creepysta

> @creepysta: Even with the additional sleep(), are you still seeing the exception, or is it solved completely?

The sleep() seems to have fixed the cases where the sparkmonitor intermittently failed to show up, both:

  • with a new notebook start, and
  • with manually restarting the kernel from a running notebook.

However, it doesn't fix the scenario where JEG itself restarts the kernel.

notebook - classic Jupyter Notebook
kernel - here, the Spark driver pod
JEG - Jupyter Enterprise Gateway

@krishnan-r krishnan-r transferred this issue from swan-cern/jupyter-extensions Apr 13, 2022