Akka MQTT Streaming: PingFailed not generated when client is busy. #2905

Open
sbmpost opened this issue Sep 14, 2022 · 5 comments

Comments

@sbmpost

sbmpost commented Sep 14, 2022

@huntc
I slightly modified the test in MqttSessionSpec so that the client starts publishing while a ping response is still expected. This resets the ping timer even though a ping response is pending, so the PingFailed exception is raised only after we stop publishing (which may never happen). Messages other than 'publish' may cause the same behaviour. I suggest checking every place in the 'ClientState' file where the method 'serverConnected' calls itself with:

resetPingReqTimer = true
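
To make the suspected mechanism concrete, here is a minimal, self-contained sketch in plain Scala. It is a toy model under stated assumptions, not the actual ClientState code: the names `KeepAliveModel`, `Event`, `State`, `step`, `run` and the `guardPendingPing` flag are all hypothetical, and time is simulated with `Tick` events instead of Akka timers. It only illustrates how resetting the timer on every outgoing packet can postpone the ping failure indefinitely, and how skipping the reset while a ping response is pending might avoid that.

```scala
// Toy model of the client keep-alive timer. All names are hypothetical;
// this is NOT the Alpakka ClientState implementation.
object KeepAliveModel {
  sealed trait Event
  case object Tick extends Event             // one simulated time unit passes
  case object PublishSent extends Event      // the client sends some other packet
  case object PingRespReceived extends Event // the server answers the ping request

  sealed trait Outcome
  case object Running extends Outcome
  case object PingFailed extends Outcome

  final case class State(
      timeout: Int,          // keep-alive interval, in ticks
      remaining: Int,        // ticks left until the timer fires
      pingPending: Boolean,  // a ping request was sent and not yet answered
      outcome: Outcome = Running
  )

  def step(guardPendingPing: Boolean, s: State, e: Event): State =
    if (s.outcome != Running) s
    else
      e match {
        case Tick if s.remaining > 1 =>
          s.copy(remaining = s.remaining - 1)
        case Tick if s.pingPending =>
          // The timer fired while still waiting for a ping response.
          s.copy(outcome = PingFailed)
        case Tick =>
          // The timer fired: send a ping request and wait for the response.
          s.copy(remaining = s.timeout, pingPending = true)
        case PublishSent =>
          if (guardPendingPing && s.pingPending) s // assumed fix: keep the deadline
          else s.copy(remaining = s.timeout)       // mirrors resetPingReqTimer = true
        case PingRespReceived =>
          s.copy(remaining = s.timeout, pingPending = false)
      }

  def run(guardPendingPing: Boolean, timeout: Int, events: Seq[Event]): Outcome =
    events
      .foldLeft(State(timeout, timeout, pingPending = false)) { (s, e) =>
        step(guardPendingPing, s, e)
      }
      .outcome
}
```

In this model, feeding two `Tick` events (so that a ping request goes out) followed by alternating `PublishSent` and `Tick` events leaves the unguarded variant in `Running` for as long as the publishes continue, while the guarded variant ends in `PingFailed` once the original deadline passes. Whether the same guard is the right change for 'serverConnected' would need to be checked against the real code.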

Here is the slightly adapted test using 'awaitCond' to demonstrate the issue:

"MQTT client connector" should {

  "disconnect a connected session if a ping request is not replied to" in assertAllStagesStopped {
    val session = ActorMqttClientSession(settings)

    val server = TestProbe()
    val pipeToServer = Flow[ByteString].mapAsync(1)(msg => server.ref.ask(msg).mapTo[ByteString])

    val (client, result) =
      Source
        .queue(1, OverflowStrategy.fail)
        .via(
          Mqtt
            .clientSessionFlow(session, ByteString("1"))
            .join(pipeToServer)
        )
        .toMat(Sink.ignore)(Keep.both)
        .run()

    val connect = Connect("some-client-id", ConnectFlags.None).copy(keepAlive = 100.millis.dilated)
    val connectBytes = connect.encode(ByteString.newBuilder).result()
    val connAck = ConnAck(ConnAckFlags.None, ConnAckReturnCode.ConnectionAccepted)
    val connAckBytes = connAck.encode(ByteString.newBuilder).result()
    val pingReq = PingReq
    val pingReqBytes = pingReq.encode(ByteString.newBuilder).result()

    client.offer(Command(connect))

    server.expectMsg(connectBytes)
    server.reply(connAckBytes)

    server.expectMsg(pingReqBytes)

    awaitCond {
      result.value match {
        // The stream failed with PingFailed: this is the expected outcome
        case Some(Failure(ActorMqttClientSession.PingFailed)) =>
          true
        // The stream completed normally or failed for another reason
        case Some(_) =>
          false
        case None =>
          // Comment out the line below to make the test succeed
          session ! Command(Publish("Keep Client Busy", ByteString()))
          false
      }
    }

    client.complete()
    client.watchCompletion().foreach(_ => session.shutdown())
  }

}
@sbmpost sbmpost changed the title Akka MQTT Streaming: PingFailed is not generated as long as client is kept occupied. Akka MQTT Streaming: PingFailed not generated when client is kept occupied. Sep 14, 2022
@sbmpost sbmpost changed the title Akka MQTT Streaming: PingFailed not generated when client is kept occupied. Akka MQTT Streaming: PingFailed not generated when client is busy. Sep 14, 2022
@johanandren johanandren transferred this issue from akka/akka Sep 15, 2022
@huntc
Contributor

huntc commented Sep 15, 2022

@sbmpost FYI I'm no longer involved with this project. Thanks for the notification.

@sbmpost
Author

sbmpost commented Sep 15, 2022

@ennru Also notifying you then ;-)

@ennru
Member

ennru commented Sep 20, 2022

Thank you for your investigation of the ping. Most of my MQTT protocol knowledge has been evicted.
Would you be in a position to suggest an improvement to this in a PR?

@sbmpost
Author

sbmpost commented Sep 20, 2022

@ennru
I am afraid not at the moment. My time is very limited, mostly due to personal circumstances. If this situation changes, I might be able to give it a shot.

@KevinAtSesam

A colleague of @sbmpost here. We discussed this in our team, but with the recent license changes we can't commit company resources to this. While we gladly contribute to projects that our business is based on, we can't spend resources on bug fixes in software that we must then license in order to use.
