Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

message does not save at session when client is disconnect #352

Open
mrhb6006 opened this issue Dec 25, 2023 · 21 comments
Open

message does not save at session when client is disconnect #352

mrhb6006 opened this issue Dec 25, 2023 · 21 comments
Labels
bug Something isn't working discussion Something to be discussed

Comments

@mrhb6006
Copy link

mrhb6006 commented Dec 25, 2023

1- A client connects to the server and subscribes to a topic
2- Then Client will be disconnected (while the session has not expired)
3- A data is published on that topic (inline client publish).

But the client does not receive data on that topic after reconnecting. (Session has not expired)
Is the data stored in a client session that is not connected, not delivered to the client on the next connection? (Of course, if the session has not expired)

@mrhb6006
Copy link
Author

i use this pkg at production, plz help

@werbenhu
Copy link
Member

@mrhb6006 I'm too busy at the moment. I will take a look at this issue as soon as I have some free time.

@torkamania
Copy link

@mrhb6006 What is the version of the library in your app?

@thedevop
Copy link
Collaborator

@mrhb6006 , in order for the message to be saved, the following conditions need to be met:

  1. Client session is not expired (met based on above)
  2. Client subscribed to the topic with QOS > 0
  3. Message published with QOS > 0

Can you check what QOS was used for subscription and publishing?

@mrhb6006 mrhb6006 reopened this Dec 27, 2023
@mrhb6006
Copy link
Author

@thedevop
all conditions are met
when publish with real client it is ok
but when i use inline client it is not

@mrhb6006
Copy link
Author

@mrhb6006 What is the version of the library in your app?

last version

@mrhb6006
Copy link
Author

some

tnx
I'm waiting for you

@thedevop
Copy link
Collaborator

Can you elaborate your steps as an inline client would not experience disconnect.

werbenhu added a commit to werbenhu/mochimqtt that referenced this issue Dec 27, 2023
…herit subscriptions from the old connection when a client is taken over.
@werbenhu
Copy link
Member

@mrhb6006 Take a look at #354 to see if it resolves your issue.

@werbenhu
Copy link
Member

werbenhu commented Dec 27, 2023

@mrhb6006 @thedevop @mochi-co According to MQTT spec 3.1.2.4 Clean Session , if the client does not expire, we should retain the client's subscriptions upon reconnection. However, I tested Mosquitto, and it does not support this feature. Therefore, I think making this an optional capability is a good idea.

@werbenhu werbenhu added bug Something isn't working discussion Something to be discussed labels Dec 27, 2023
@thedevop
Copy link
Collaborator

thedevop commented Dec 27, 2023

@werbenhu , we current do retain client's subscription

server/server.go

Lines 495 to 501 in 5058333

for _, sub := range existing.State.Subscriptions.GetAll() {
existed := !s.Topics.Subscribe(cl.ID, sub) // [MQTT-3.8.4-3]
if !existed {
atomic.AddInt64(&s.Info.Subscriptions, 1)
}
cl.State.Subscriptions.Add(sub.Filter, sub)
}

We then clean up the old client:

s.UnsubscribeClient(existing)

But will not unsubscribe topics if the client is taken over.

server/server.go

Lines 1265 to 1267 in 5058333

if atomic.LoadUint32(&cl.State.isTakenOver) == 1 {
return
}

I have tested with Paho client, and it does retain the subscriptions. However, the messages are only stored if QOS > 0.

@werbenhu
Copy link
Member

@thedevop The testing environment I'm in right now is indeed, as you said, without any issues. I will conduct further tests when I return to the previous environment in a few days.

@werbenhu
Copy link
Member

werbenhu commented Jan 6, 2024

@thedevop you are right. I tested it today, and I couldn't reproduce the issue. It might have been my mistake earlier.

@thedevop
Copy link
Collaborator

thedevop commented Jan 7, 2024

@mrhb6006 , can you re-test if a client retains subscription when it disconnect/reconnect?

Can you also re-test the saved messages, make sure the following conditions are met:

  1. Client session is not expired (met based on above)
  2. Client subscribed to the topic with QOS > 0
  3. Message published with QOS > 0

@mrhb6006
Copy link
Author

mrhb6006 commented Jan 9, 2024

@mrhb6006 , can you re-test if a client retains subscription when it disconnect/reconnect?

Can you also re-test the saved messages, make sure the following conditions are met:

  1. Client session is not expired (met based on above)
  2. Client subscribed to the topic with QOS > 0
  3. Message published with QOS > 0

yes all conditions are met.
i use inline client for publish and message not save at session

@mrhb6006
Copy link
Author

mrhb6006 commented Jan 9, 2024

@thedevop you are right. I tested it today, and I couldn't reproduce the issue. It might have been my mistake earlier.

test with inline client publish ?

@werbenhu
Copy link
Member

werbenhu commented Jan 9, 2024

@mrhb6006 I will test this scenario later.

@werbenhu
Copy link
Member

werbenhu commented Jan 9, 2024

@mrhb6006 I tested it, and it's OK. Client A subscribes to the 'mochi' topic with QoS 1, then disconnects. Client B connects, publishes a message to the 'trigger' topic, triggering an inline publish to send a message. When Client A reconnects, it can receive the content sent by Client B. You can refer to my 'main.go' below for details. The client I used is mqttx.

// SPDX-License-Identifier: MIT
// SPDX-FileCopyrightText: 2022 mochi-mqtt, mochi-co
// SPDX-FileContributor: mochi-co

package main

import (
	"bytes"
	"flag"
	"log"
	"os"
	"os/signal"
	"syscall"

	mqtt "github.com/mochi-mqtt/server/v2"
	"github.com/mochi-mqtt/server/v2/listeners"
	"github.com/mochi-mqtt/server/v2/packets"
)

// Options contains configuration settings for the hook.
type MyHookOptions struct {
	Server *mqtt.Server
}

// AllowHook is an authentication hook which allows connection access
// for all users and read and write access to all topics.
type MyHook struct {
	mqtt.HookBase
	config *MyHookOptions
}

func (h *MyHook) Init(config any) error {
	h.Log.Info("initialised")
	if _, ok := config.(*MyHookOptions); !ok && config != nil {
		return mqtt.ErrInvalidConfigType
	}

	h.config = config.(*MyHookOptions)
	if h.config.Server == nil {
		return mqtt.ErrInvalidConfigType
	}
	return nil
}

// ID returns the ID of the hook.
func (h *MyHook) ID() string {
	return "allow-all-auth"
}

// Provides indicates which hook methods this hook provides.
func (h *MyHook) Provides(b byte) bool {
	return bytes.Contains([]byte{
		mqtt.OnConnectAuthenticate,
		mqtt.OnACLCheck,
		mqtt.OnPublish,
	}, []byte{b})
}

// OnConnectAuthenticate returns true/allowed for all requests.
func (h *MyHook) OnConnectAuthenticate(cl *mqtt.Client, pk packets.Packet) bool {
	return true
}

// OnACLCheck returns true/allowed for all checks.
func (h *MyHook) OnACLCheck(cl *mqtt.Client, topic string, write bool) bool {
	return true
}

func (h *MyHook) OnPublish(cl *mqtt.Client, pk packets.Packet) (packets.Packet, error) {
	h.Log.Info("received from client", "client", cl.ID, "payload", string(pk.Payload))
	pkx := pk
	if string(pk.TopicName) == "trigger" {
		h.config.Server.Publish("mochi", pk.Payload, false, 1);
	}
	return pkx, nil
}

func main() {
	tcpAddr := flag.String("tcp", ":1883", "network address for TCP listener")
	flag.Parse()

	sigs := make(chan os.Signal, 1)
	done := make(chan bool, 1)
	signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
	go func() {
		<-sigs
		done <- true
	}()

	server := mqtt.New(&mqtt.Options{
		InlineClient: true,
	})
	_ = server.AddHook(new(MyHook), &MyHookOptions{
		Server: server,
	})
	
	tcp := listeners.NewTCP("t1", *tcpAddr, nil)
	err := server.AddListener(tcp)
	if err != nil {
		log.Fatal(err)
	}

	go func() {
		err := server.Serve()
		if err != nil {
			log.Fatal(err)
		}
	}()

	<-done
	server.Log.Warn("caught signal, stopping...")
	_ = server.Close()
	server.Log.Info("main.go finished")

}

@mrhb6006
Copy link
Author

mrhb6006 commented Jan 10, 2024

@mrhb6006 I tested it, and it's OK. Client A subscribes to the 'mochi' topic with QoS 1, then disconnects. Client B connects, publishes a message to the 'trigger' topic, triggering an inline publish to send a message. When Client A reconnects, it can receive the content sent by Client B. You can refer to my 'main.go' below for details. The client I used is mqttx.

// SPDX-License-Identifier: MIT
// SPDX-FileCopyrightText: 2022 mochi-mqtt, mochi-co
// SPDX-FileContributor: mochi-co

package main

import (
	"bytes"
	"flag"
	"log"
	"os"
	"os/signal"
	"syscall"

	mqtt "github.com/mochi-mqtt/server/v2"
	"github.com/mochi-mqtt/server/v2/listeners"
	"github.com/mochi-mqtt/server/v2/packets"
)

// Options contains configuration settings for the hook.
type MyHookOptions struct {
	Server *mqtt.Server
}

// AllowHook is an authentication hook which allows connection access
// for all users and read and write access to all topics.
type MyHook struct {
	mqtt.HookBase
	config *MyHookOptions
}

func (h *MyHook) Init(config any) error {
	h.Log.Info("initialised")
	if _, ok := config.(*MyHookOptions); !ok && config != nil {
		return mqtt.ErrInvalidConfigType
	}

	h.config = config.(*MyHookOptions)
	if h.config.Server == nil {
		return mqtt.ErrInvalidConfigType
	}
	return nil
}

// ID returns the ID of the hook.
func (h *MyHook) ID() string {
	return "allow-all-auth"
}

// Provides indicates which hook methods this hook provides.
func (h *MyHook) Provides(b byte) bool {
	return bytes.Contains([]byte{
		mqtt.OnConnectAuthenticate,
		mqtt.OnACLCheck,
		mqtt.OnPublish,
	}, []byte{b})
}

// OnConnectAuthenticate returns true/allowed for all requests.
func (h *MyHook) OnConnectAuthenticate(cl *mqtt.Client, pk packets.Packet) bool {
	return true
}

// OnACLCheck returns true/allowed for all checks.
func (h *MyHook) OnACLCheck(cl *mqtt.Client, topic string, write bool) bool {
	return true
}

func (h *MyHook) OnPublish(cl *mqtt.Client, pk packets.Packet) (packets.Packet, error) {
	h.Log.Info("received from client", "client", cl.ID, "payload", string(pk.Payload))
	pkx := pk
	if string(pk.TopicName) == "trigger" {
		h.config.Server.Publish("mochi", pk.Payload, false, 1);
	}
	return pkx, nil
}

func main() {
	tcpAddr := flag.String("tcp", ":1883", "network address for TCP listener")
	flag.Parse()

	sigs := make(chan os.Signal, 1)
	done := make(chan bool, 1)
	signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
	go func() {
		<-sigs
		done <- true
	}()

	server := mqtt.New(&mqtt.Options{
		InlineClient: true,
	})
	_ = server.AddHook(new(MyHook), &MyHookOptions{
		Server: server,
	})
	
	tcp := listeners.NewTCP("t1", *tcpAddr, nil)
	err := server.AddListener(tcp)
	if err != nil {
		log.Fatal(err)
	}

	go func() {
		err := server.Serve()
		if err != nil {
			log.Fatal(err)
		}
	}()

	<-done
	server.Log.Warn("caught signal, stopping...")
	_ = server.Close()
	server.Log.Info("main.go finished")

}

i mean use server.Publish("trigger",body,2) for publish
dont work for me

@werbenhu
Copy link
Member

werbenhu commented Jan 10, 2024

@mrhb6006 I have also tested the situation of subscribing and publishing to the QoS2 topic, and it works fine. You can modify my main.go to conduct the test.

@mochi-co
Copy link
Collaborator

Can this be closed?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
bug Something isn't working discussion Something to be discussed
Projects
None yet
Development

No branches or pull requests

5 participants