Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

如果有重复id登录,是否有选项舍弃之前的id,而不是自动拒绝新的id? #203

Open
luohao123 opened this issue Nov 14, 2023 · 5 comments

Comments

@luohao123
Copy link

如果有重复id登录,是否有选项舍弃之前的id,而不是自动拒绝新的id?

@heliosgo
Copy link

其实目前的逻辑就是舍弃旧的,具体可以看看

gmqtt/server/server.go

Lines 342 to 499 in a28cd4f

// registerClient attaches a newly connected client to a session, either by
// resuming the previous session stored under the same client ID or by creating
// a fresh one.
//
// connect is the CONNECT packet received from the client and client is the
// connection being registered. sessionResume reports whether the previous
// session (its queue store and unack store) was reused. A non-nil err means the
// client was not registered: the deferred finalizer below skips all server
// bookkeeping in that case.
//
// NOTE(review): srv.mu is unlocked in the deferred function but is never locked
// in this function; presumably lockDuplicatedID returns with srv.mu held on
// success and released on failure — confirm against its implementation.
func (srv *server) registerClient(connect *packets.Connect, client *client) (sessionResume bool, err error) {
	var qs queue.Store
	var ua unack.Store
	var sess *gmqtt.Session
	var oldSession *gmqtt.Session
	now := time.Now()
	oldSession, err = srv.lockDuplicatedID(client)
	if err != nil {
		return
	}
	// The finalizer runs on every return path below. It persists the session and
	// publishes the client into the server maps only when no error occurred, and
	// it always releases srv.mu.
	defer func() {
		if err == nil {
			var willMsg *gmqtt.Message
			var willDelayInterval, expiryInterval uint32
			if connect.WillFlag {
				willMsg = &gmqtt.Message{
					QoS:     connect.WillQos,
					Topic:   string(connect.WillTopic),
					Payload: connect.WillMsg,
				}
				setWillProperties(connect.WillProperties, willMsg)
			}
			// use default expiry if the client version is version3.1.1
			if packets.IsVersion3X(client.version) && !connect.CleanStart {
				expiryInterval = uint32(srv.config.MQTT.SessionExpiry.Seconds())
			} else if connect.Properties != nil {
				// WillProperties is a different pointer from Properties; check it
				// separately so a CONNECT without will properties cannot cause a
				// nil dereference here while the server lock is held.
				if connect.WillProperties != nil {
					willDelayInterval = convertUint32(connect.WillProperties.WillDelayInterval, 0)
				}
				expiryInterval = client.opts.SessionExpiry
			}
			sess = &gmqtt.Session{
				ClientID:          client.opts.ClientID,
				Will:              willMsg,
				ConnectedAt:       time.Now(),
				WillDelayInterval: willDelayInterval,
				ExpiryInterval:    expiryInterval,
			}
			err = srv.sessionStore.Set(sess)
		}
		// Re-check err: sessionStore.Set above may have failed.
		if err == nil {
			client.session = sess
			if sessionResume {
				// If a new Network Connection to this Session is made before the Will Delay Interval has passed,
				// the Server MUST NOT send the Will Message [MQTT-3.1.3-9].
				if w, ok := srv.willMessage[client.opts.ClientID]; ok {
					w.signal(false)
				}
				if srv.hooks.OnSessionResumed != nil {
					srv.hooks.OnSessionResumed(context.Background(), client)
				}
				srv.statsManager.sessionActive(false)
			} else {
				if srv.hooks.OnSessionCreated != nil {
					srv.hooks.OnSessionCreated(context.Background(), client)
				}
				srv.statsManager.sessionActive(true)
			}
			srv.clients[client.opts.ClientID] = client
			srv.unackStore[client.opts.ClientID] = ua
			srv.queueStore[client.opts.ClientID] = qs
			client.queueStore = qs
			client.unackStore = ua
			if client.version == packets.Version5 {
				client.topicAliasManager = srv.newTopicAliasManager(client.config, client.opts.ClientTopicAliasMax, client.opts.ClientID)
			}
		}
		srv.mu.Unlock()
	}()
	client.setConnected(time.Now())
	if srv.hooks.OnConnected != nil {
		srv.hooks.OnConnected(context.Background(), client)
	}
	srv.statsManager.clientConnected(client.opts.ClientID)
	if oldSession != nil {
		// Resume only when the old session has not expired (measured against the
		// timestamp taken at function entry) and the client did not ask for a
		// clean start.
		if !oldSession.IsExpired(now) && !connect.CleanStart {
			sessionResume = true
		}
		// clean old session
		if !sessionResume {
			err = srv.sessionTerminatedLocked(oldSession.ClientID, TakenOverTermination)
			if err != nil {
				// The failure is logged but registration continues: err is
				// overwritten by the new-session setup further down.
				err = fmt.Errorf("session terminated fail: %w", err)
				zaplog.Error("session terminated fail", zap.Error(err))
			}
			// Send will message because the previous session is ended.
			if w, ok := srv.willMessage[client.opts.ClientID]; ok {
				w.signal(true)
			}
		} else {
			qs = srv.queueStore[client.opts.ClientID]
			if qs != nil {
				err = qs.Init(&queue.InitOptions{
					CleanStart:     false,
					Version:        client.version,
					ReadBytesLimit: client.opts.ClientMaxPacketSize,
					Notifier:       client.queueNotifier,
				})
				if err != nil {
					return
				}
			}
			ua = srv.unackStore[client.opts.ClientID]
			if ua != nil {
				err = ua.Init(false)
				if err != nil {
					return
				}
			}
			if ua == nil || qs == nil {
				// This could happen if backend store loss some data which will bring the session into "inconsistent state".
				// We should create a new session and prevent the client reuse the inconsistent one.
				sessionResume = false
				zaplog.Error("detect inconsistent session state",
					zap.String("remote_addr", client.rwc.RemoteAddr().String()),
					zap.String("client_id", client.opts.ClientID))
			} else {
				zaplog.Info("logged in with session reuse",
					zap.String("remote_addr", client.rwc.RemoteAddr().String()),
					zap.String("client_id", client.opts.ClientID))
			}
		}
	}
	if !sessionResume {
		// create new session
		// It is ok to pass nil to defaultNotifier, because we will call Init to override it.
		qs, err = srv.persistence.NewQueueStore(srv.config, nil, client.opts.ClientID)
		if err != nil {
			return
		}
		err = qs.Init(&queue.InitOptions{
			CleanStart:     true,
			Version:        client.version,
			ReadBytesLimit: client.opts.ClientMaxPacketSize,
			Notifier:       client.queueNotifier,
		})
		if err != nil {
			return
		}
		ua, err = srv.persistence.NewUnackStore(srv.config, client.opts.ClientID)
		if err != nil {
			return
		}
		err = ua.Init(true)
		if err != nil {
			return
		}
		zaplog.Info("logged in with new session",
			zap.String("remote_addr", client.rwc.RemoteAddr().String()),
			zap.String("client_id", client.opts.ClientID),
		)
	}
	// The client is online again, so drop it from the offline set.
	delete(srv.offlineClients, client.opts.ClientID)
	return
}

oldSession, err = srv.lockDuplicatedID(client)

@luohao123
Copy link
Author

那为啥每次会报错 login with duplicated id,然后客户端就掉线了?

@heliosgo
Copy link

那为啥每次会报错 login with duplicated id,然后客户端就掉线了?

掉线的是旧的,会与新连接建立会话。正常是这样的才对。我刚测试了一下,好像没啥问题

@luohao123
Copy link
Author

@jiennyx shamblett/mqtt_client#460 (comment)

你这么一说,似乎有点道理:我一开始用 client_id 5566 连接了,如果服务端保留了这个连接的缓存,那么再次以 5566 连接的时候,它就会断开老的连接;但是由于老的连接实际上就是本次的连接,所以表现为我一连上,立马就断开了。

可以这么理解吗?

@heliosgo
Copy link

@luohao123 不太理解“但是由于实际上老的就是本次的链接,所以表现为我一连上,立马就断开了”这句话。
我贴段日志吧,看起来清楚点。

2023-11-15T16:34:17.030+0800 INFO server/server.go:327 logging with duplicate ClientID {"remote": "192.168.19.118:61779", "client_id": "mqttx_67fa4d05"}

2023-11-15T16:34:17.031+0800 WARN server/client.go:274 connection lost {"client_id": "mqttx_67fa4d05", "remote_addr": "192.168.19.118:61768", "error": "operation error: Code = 8e, reasonString: "}

2023-11-15T16:34:17.031+0800 INFO server/server.go:599 logged out and cleaning session {"remote_addr": "192.168.19.118:61768", "client_id": "mqttx_67fa4d05"}

2023-11-15T16:34:17.032+0800 INFO server/server.go:492 logged in with new session {"remote_addr": "192.168.19.118:61779", "client_id": "mqttx_67fa4d05"}

:61768 被断开,:61779 正常建立。

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants