Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[question] Is that gorilla websocket have limit the broadcast message in a second? #648

Closed
1231sadqwf opened this issue Oct 26, 2020 · 8 comments
Labels

Comments

@1231sadqwf
Copy link

Describe the problem you're having
I use golang.org/x/net/websocket to broadcast about 100 message in a second is no problem, the client can receive all the message, but when I use gorilla websocket, only receive less than 10 message then websocket clientv will be corrupted.

Versions

Go version: go version go1.13.11 linux/amd64

package version: c3dd95a

"Show me the code!"
package main

import (
"encoding/json"
"github.com/gorilla/websocket"
"log"
"net/http"
"time"
)

const (
// Time allowed to write a message to the peer.
writeWait = 10 * time.Second

// Time allowed to read the next pong message from the peer.
pongWait = 60 * time.Second

// Send pings to peer with this period. Must be less than pongWait.
pingPeriod = (pongWait * 9) / 10

// Maximum message size allowed from peer.
maxMessageSize = 512

)

var upgrader = websocket.Upgrader{
ReadBufferSize: 1024,
WriteBufferSize: 1024,
CheckOrigin: func(r *http.Request) bool {return true},
}

var newline = []byte{'\n'}

type Client struct {
hub *Hub
conn *websocket.Conn
send chan msg
see string
}

type Hub struct {
clients map[*Client]bool
broadcast chan msg
register chan *Client
unregister chan *Client
}

func newHub() *Hub {
return &Hub{
broadcast: make(chan msg),
register: make(chan *Client),
unregister: make(chan *Client),
clients: make(map[*Client]bool),
}

}

func (h *Hub) run() {
for {
select {
case client := <-h.register:
h.clients[client] = true
case client := <-h.unregister:
if _, ok := h.clients[client]; ok {
delete(h.clients, client)
close(client.send)
}
case message := <-h.broadcast:
for client := range h.clients {
if client.see == message.LogName {
client.send <- message
}
//select {
//case client.send <- message:
//default:
// close(client.send)
// delete(h.clients, client)
//}
}
}
}
}

func (c *Client) readPump() {
defer func() {
c.hub.unregister <- c
_ = c.conn.Close()
}()
c.conn.SetReadLimit(maxMessageSize)
_ = c.conn.SetReadDeadline(time.Now().Add(pongWait))
c.conn.SetPongHandler(func(string) error { _ = c.conn.SetReadDeadline(time.Now().Add(pongWait)); return nil })
for {
_, message, err := c.conn.ReadMessage()
if err != nil {
if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
ErrorLogger.Printf("readPump failed with error: %v", err)
}
break
}

	type recv struct {
		LogName string `json:"logName"`
	}
	var rcv = &recv{}
	if err := json.Unmarshal(message, &rcv); err != nil {
		c.hub.unregister <- c
		ErrorLogger.Printf("%s",err)
		break
	}
	c.see = rcv.LogName
}

}

func (c *Client) writePump() {
ticker := time.NewTicker(pingPeriod)
defer func() {
ticker.Stop()
_ = c.conn.Close()
}()
for {
select {
case message, ok := <-c.send:
_ = c.conn.SetWriteDeadline(time.Now().Add(writeWait))
if !ok {
// The hub closed the channel.
_ = c.conn.WriteMessage(websocket.CloseMessage, []byte{})
return
}

		w, err := c.conn.NextWriter(websocket.TextMessage)
		if err != nil {
			return
		}
		msgByte, _ := json.Marshal(message)
		_, _ = w.Write(msgByte)

		// Add queued chat messages to the current websocket message.
		n := len(c.send)
		_, _ = w.Write(newline)
		for i := 0; i < n; i++ {
			for msg := range c.send {
				msgByte, _ := json.Marshal(msg)
				_, err := w.Write(msgByte)
				if err != nil {
					newHub().unregister <- c
					ErrorLogger.Printf("%s", err)
					break
				}
			}
		}
		if err := w.Close(); err != nil {
			return
		}
	case <-ticker.C:
		_ = c.conn.SetWriteDeadline(time.Now().Add(writeWait))
		if err := c.conn.WriteMessage(websocket.PingMessage, nil); err != nil {
			return
		}
	}
}

}

// serveWs handles websocket requests from the peer.
func serveWs(hub *Hub, w http.ResponseWriter, r *http.Request) {
conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
log.Fatal(err)
return
}
client := &Client{hub: hub, conn: conn, send: make(chan msg, 1)}
client.hub.register <- client

// Allow collection of memory referenced by the caller by doing all work in
// new goroutines.
go client.writePump()
go client.readPump()

}

@ghost
Copy link

ghost commented Oct 26, 2020

Is your client written to handle multiple JSON documents in a single WebSocket message?

Show the code with x/net/websocket that worked.

@1231sadqwf
Copy link
Author

Is your client written to handle multiple JSON documents in a single WebSocket message?

Show the code with x/net/websocket that worked.

no

@1231sadqwf
Copy link
Author

1231sadqwf commented Oct 26, 2020


import (
	"encoding/json"
	"golang.org/x/net/websocket"
	"io"
)

//  websocket客户端
type client struct {
	id     string
	socket *websocket.Conn
	send   chan msg
	see    string
}

// 客户端管理
type clientManager struct {
	clients    map[*client]bool
	broadcast  chan msg
	register   chan *client
	unregister chan *client
}

var manager = clientManager{
	broadcast:  make(chan msg),
	register:   make(chan *client),
	unregister: make(chan *client),
	clients:    make(map[*client]bool),
}

func (manager *clientManager) start() {
	defer func() {
		if err := recover(); err != nil {
			ErrorLogger.Printf("manager start() panic")
		}
	}()

	for {
		select {
		case conn := <-manager.register:
			manager.clients[conn] = true
		case conn := <-manager.unregister:
			if _, ok := manager.clients[conn]; ok {
				close(conn.send)
				_ = conn.socket.Close()
				delete(manager.clients, conn)
			}
		case msg := <-manager.broadcast:
			for conn := range manager.clients {
				if conn.see == msg.LogName {
					conn.send <- msg
				}
			}
		}
	}
}

func (c *client) write() {

	for msg := range c.send {
		msgByte, _ := json.Marshal(msg) // 忽略错误
		_, err := c.socket.Write(msgByte)
		if err != nil {
			manager.unregister <- c
			ErrorLogger.Printf("%s",err)
			break
		}
	}
}

func (c *client) read() {
	for {
		var reply string
		if err := websocket.Message.Receive(c.socket, &reply); err != nil {
			if err != io.EOF {
				ErrorLogger.Printf("%s",err)
				manager.unregister <- c
			}
			break
		}
		type recv struct {
			LogName string `json:"logName"`
		}
		var rcv = &recv{}
		if err := json.Unmarshal([]byte(reply), &rcv); err != nil {
			manager.unregister <- c
			ErrorLogger.Printf("%s",err)
			break
		}
		c.see = rcv.LogName
	}
}

@ghost
Copy link

ghost commented Oct 26, 2020

The gorilla package does not have a limit the broadcast message in a second.

If your client is not written to handle multiple JSON documents in a WebSocket message, then remove your code that writes multiple documents to a message.

Your x/net/websocket code writes a single JSON document per WebSocket message. If you do the same in with the gorilla code, the application will probably work as expected.

@1231sadqwf
Copy link
Author

remove your code that writes multiple documents to a message.

did you mean this?

			// Add queued chat messages to the current websocket message.
			n := len(c.send)
			_, _ = w.Write(newline)
			for i := 0; i < n; i++ {
				for msg := range c.send {
					msgByte, _ := json.Marshal(msg)
					_, err := w.Write(msgByte)
					if err != nil {
						newHub().unregister <- c
						ErrorLogger.Printf("%s", err)
						break
					}
				}
			}

@ghost
Copy link

ghost commented Oct 26, 2020

Yes, that is your code that adds multiple documents to a websocket message.

@1231sadqwf
Copy link
Author

Ok, did gorilla have any variable or api that can make the client receive more message in a second?

@ghost
Copy link

ghost commented Oct 26, 2020

Gorilla does not have a variable or API that limits the number of messages per second.

The code to pack multiple JSON documents into a single WebSocket message will trigger then sending messages at a very high rate. Given that your client is not designed to handle multiple JSON documents in a single WebSocket message, you can expect your application to lose messages when sending messages at a very high rate.

Delete the code quoted a couple of comments back and test again.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

No branches or pull requests

1 participant