
OOM for 3K websockets #22

Open
indexk09 opened this issue Mar 17, 2022 · 7 comments

indexk09 commented Mar 17, 2022

Hi,

I am trying to implement a push-based websocket server using this library, and I keep running into OOM with a large number of sockets (3K, for example). Why might this be happening? The code is below. With a small number of sockets, OOM doesn't happen and memory stays stable. It only happens with a large number of sockets.

	clients := map[string]net.Conn{}
	go func() {
		for {
			select {
			case res := <-chIO: // receive IO events from watcher
				if res.Error != nil {
					log.Error().Msgf("Error receiving IO event from watcher: %v", res.Error)
					delete(clients, res.Conn.RemoteAddr().String())
					err = w.Free(res.Conn)
					if err != nil {
						log.Error().Msgf("error freeing connection: %v", err)
					}
					continue
				}
			case feed := <-out:
				f := ws.NewTextFrame(feed)
				bts := CompileHeader(f.Header)
				for index, conn := range clients {
					if conn != nil {
						err = w.Write(nil, conn, bts)
						if err != nil {
							if errors.Is(err, syscall.EPIPE) || errors.Is(err, syscall.ECONNRESET) {
								delete(clients, index)
							} else {
								log.Error().Msgf("unable to write header: %v", err)
							}
						}
						err = w.Write(nil, conn, f.Payload)
						if err != nil {
							if errors.Is(err, syscall.EPIPE) || errors.Is(err, syscall.ECONNRESET) {
								delete(clients, index)
							} else {
								log.Error().Msgf("unable to write payload: %v", err)
							}
						}
					}
				}
			case conn := <-chConn: // receive new connection events
				clients[conn.RemoteAddr().String()] = conn
			}
		}
	}()

I did some profiling with pprof, and here is what I got:

heap profile: 35098695: 5565869472 [80607043: 16351145656] @ heap/2
14246148: 2507322048 [14882947: 2619398672] @ 0x84d345 0x475a92 0x84b25a 0x8f7a31 0x8f785b 0x46d041
	0x84d344	github.com/xtaci/gaio.init.0.func1+0x24			/home/circleci/.go_workspace/pkg/mod/github.com/xtaci/gaio@v1.2.14/watcher.go:28
	0x475a91	sync.(*Pool).Get+0xb1					/usr/local/go/src/sync/pool.go:148
	0x84b259	github.com/xtaci/gaio.(*watcher).aioCreate+0x1b9	/home/circleci/.go_workspace/pkg/mod/github.com/xtaci/gaio@v1.2.14/watcher.go:272
	0x8f7a30	github.com/xtaci/gaio.(*watcher).Write+0x7b0		/home/circleci/.go_workspace/pkg/mod/github.com/xtaci/gaio@v1.2.14/watcher.go:240

14246061: 2507306736 [14882895: 2619389520] @ 0x84d345 0x475a92 0x84b25a 0x8f7845 0x8f75db 0x46d041
	0x84d344	github.com/xtaci/gaio.init.0.func1+0x24			/home/circleci/.go_workspace/pkg/mod/github.com/xtaci/gaio@v1.2.14/watcher.go:28
	0x475a91	sync.(*Pool).Get+0xb1					/usr/local/go/src/sync/pool.go:148
	0x84b259	github.com/xtaci/gaio.(*watcher).aioCreate+0x1b9	/home/circleci/.go_workspace/pkg/mod/github.com/xtaci/gaio@v1.2.14/watcher.go:272
	0x8f7844	github.com/xtaci/gaio.(*watcher).Write+0x5c4		/home/circleci/.go_workspace/pkg/mod/github.com/xtaci/gaio@v1.2.14/watcher.go:240

6377990: 306143520 [24276054: 1165250592] @ 0x84ca52 0x84ca3e 0x84cb0f 0x84c0b8 0x46d041
	0x84ca51	container/list.(*List).insertValue+0x4d1		/usr/local/go/src/container/list/list.go:104
	0x84ca3d	container/list.(*List).PushBack+0x4bd			/usr/local/go/src/container/list/list.go:155
	0x84cb0e	github.com/xtaci/gaio.(*watcher).handlePending+0x58e	/home/circleci/.go_workspace/pkg/mod/github.com/xtaci/gaio@v1.2.14/watcher.go:563
	0x84c0b7	github.com/xtaci/gaio.(*watcher).loop+0x2d7		/home/circleci/.go_workspace/pkg/mod/github.com/xtaci/gaio@v1.2.14/watcher.go:437

1: 132481024 [1: 132481024] @ 0x84b425 0x8f7a31 0x8f785b 0x46d041
	0x84b424	github.com/xtaci/gaio.(*watcher).aioCreate+0x384	/home/circleci/.go_workspace/pkg/mod/github.com/xtaci/gaio@v1.2.14/watcher.go:276
	0x8f7a30	github.com/xtaci/gaio.(*watcher).Write+0x7b0		/home/circleci/.go_workspace/pkg/mod/github.com/xtaci/gaio@v1.2.14/watcher.go:240

1: 67821568 [1: 67821568] @ 0x84b425 0x8f7845 0x8f75db 0x46d041
	0x84b424	github.com/xtaci/gaio.(*watcher).aioCreate+0x384	/home/circleci/.go_workspace/pkg/mod/github.com/xtaci/gaio@v1.2.14/watcher.go:276
	0x8f7844	github.com/xtaci/gaio.(*watcher).Write+0x5c4		/home/circleci/.go_workspace/pkg/mod/github.com/xtaci/gaio@v1.2.14/watcher.go:240

159604: 28090304 [159656: 28099456] @ 0x84d345 0x475a92 0x84b25a 0x8f7545 0x8f74e7 0x46d041
	0x84d344	github.com/xtaci/gaio.init.0.func1+0x24			/home/circleci/.go_workspace/pkg/mod/github.com/xtaci/gaio@v1.2.14/watcher.go:28
	0x475a91	sync.(*Pool).Get+0xb1					/usr/local/go/src/sync/pool.go:148
	0x84b259	github.com/xtaci/gaio.(*watcher).aioCreate+0x1b9	/home/circleci/.go_workspace/pkg/mod/github.com/xtaci/gaio@v1.2.14/watcher.go:272
	0x8f7544	github.com/xtaci/gaio.(*watcher).Free+0x2c4		/home/circleci/.go_workspace/pkg/mod/github.com/xtaci/gaio@v1.2.14/watcher.go:256
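A quick back-of-the-envelope reading of the two large `Write` -> `aioCreate` stacks above, as a sketch: dividing in-use bytes by in-use objects gives the size of one aiocb, and spreading the object count over the clients gives a rough backlog per client. The figure of 3000 clients and the two writes (header + payload) per message are assumptions taken from the description; pprof heap numbers are sampled estimates and also include any aiocbs sitting idle in the pool, so treat the result as an upper bound.

```go
package main

import "fmt"

// bytesPerObject divides in-use bytes by in-use objects for one profile stack.
func bytesPerObject(objects, bytes int64) int64 {
	return bytes / objects
}

// backlogPerClient spreads a number of live aiocbs evenly across clients.
func backlogPerClient(aiocbs, clients int64) int64 {
	return aiocbs / clients
}

func main() {
	// In-use objects and bytes for the header-write and payload-write
	// Write -> aioCreate stacks in the heap profile.
	const headerObjs, headerBytes = 14246148, 2507322048
	const payloadObjs, payloadBytes = 14246061, 2507306736

	fmt.Println("bytes per aiocb:", bytesPerObject(headerObjs, headerBytes))   // 176
	fmt.Println("bytes per aiocb:", bytesPerObject(payloadObjs, payloadBytes)) // 176

	// Assumption: about 3000 clients, two writes (header + payload) per message.
	const clients = 3000
	perClient := backlogPerClient(headerObjs+payloadObjs, clients)
	fmt.Println("live aiocbs per client:", perClient)       // 9497
	fmt.Println("queued messages per client:", perClient/2) // 4748
}
```

Read this way, the profile suggests the watcher is holding thousands of not-yet-completed writes per client, which fits writes being submitted faster than the sockets drain them.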

indexk09 (Author)

@xtaci Any idea why? Looking at the profiler, it appears that this list https://github.com/xtaci/gaio/blob/master/watcher.go#L563 keeps growing and eventually runs out of memory, even though I have an 8GB machine.

xtaci (Owner) commented Mar 17, 2022

Let me look into this; it's somewhat weird.

indexk09 (Author)

@xtaci Please let me know if you need any more info. I can provide it pretty quickly, since I have spent a few days tweaking this. I also have a question about the async I/O callbacks. It appears to me (although I'm not 100% sure) that an aiocb object is created for every write request (ignoring reads for the sake of this discussion). If so, my guess is that with slow clients the callbacks pile up in the list pointed out in my previous comment until they exhaust the 8GB of memory. I tried changing write to writeTimeout with a 100ms timeout in the code above, but it still causes OOM. So my question is: what happens if my server sends a ton of write requests to all 3K sockets? Would it block at some point? If not, how do I prevent OOM?
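On the question of bounding the queue: each `Write` in the loop above submits an asynchronous request and returns, so nothing waits for earlier writes to a client to finish before queueing more. A minimal sketch of one common remedy, application-level backpressure, is to count outstanding writes per client and stop queueing for that client once a cap is reached (skip the message, or disconnect the slow client). `writeLimiter` and its methods are hypothetical names, not part of gaio, and the sketch assumes the completion results delivered by the watcher identify which connection a finished write belongs to.

```go
package main

import "fmt"

// writeLimiter is a hypothetical helper (not part of gaio) that caps the
// number of outstanding asynchronous writes per client, keyed by the same
// remote-address string used in the clients map.
type writeLimiter struct {
	limit    int
	inflight map[string]int
}

func newWriteLimiter(limit int) *writeLimiter {
	return &writeLimiter{limit: limit, inflight: make(map[string]int)}
}

// tryAcquire reserves a slot for one more write to key. It returns false when
// the client already has limit writes pending; the caller should then skip
// this message (or drop the client) instead of queueing another write.
func (l *writeLimiter) tryAcquire(key string) bool {
	if l.inflight[key] >= l.limit {
		return false
	}
	l.inflight[key]++
	return true
}

// release frees a slot; call it when a write completion for key arrives.
func (l *writeLimiter) release(key string) {
	if l.inflight[key] > 0 {
		l.inflight[key]--
	}
}

// forget drops all state for a client that has disconnected.
func (l *writeLimiter) forget(key string) {
	delete(l.inflight, key)
}

func main() {
	l := newWriteLimiter(2)
	const client = "10.0.0.1:5000"
	fmt.Println(l.tryAcquire(client)) // true
	fmt.Println(l.tryAcquire(client)) // true
	fmt.Println(l.tryAcquire(client)) // false: the slow client is at its cap
	l.release(client)                 // one write completed
	fmt.Println(l.tryAcquire(client)) // true
	l.forget(client)
}
```

For this to work, the `chIO` handler would also need to act on successful write results (calling `release`), not only on errors. The cap then bounds memory per client regardless of how fast messages arrive on `out`.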

xtaci (Owner) commented Mar 17, 2022

Could you show me a code snippet that reproduces the OOM?

xtaci (Owner) commented Mar 17, 2022

clients

aiocb is allocated via a pool and will be recycled:

func init() {
    aiocbPool.New = func() interface{} {
        return new(aiocb)
    }
}
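For context on how `sync.Pool` behaves here: it has no fixed capacity, and `Get` never blocks. If no cached object is available, `Get` calls `New` and allocates a fresh one. The pool can only reuse objects that have been `Put` back, and it may discard idle objects during garbage collection. So aiocbs that are still attached to pending writes are not reusable, and memory grows with the backlog. The sketch below uses a stand-in `aiocb` type (not gaio's real struct) and counts how often `New` runs when objects are taken and never returned.

```go
package main

import (
	"fmt"
	"sync"
)

// aiocb is a stand-in for gaio's internal control block, not the real type.
type aiocb struct {
	buf []byte
}

// countNewCalls takes n objects from an empty pool without returning any,
// mimicking writes that stay pending, and reports how often New was called.
func countNewCalls(n int) int {
	created := 0
	pool := sync.Pool{New: func() interface{} {
		created++
		return new(aiocb)
	}}
	// held keeps every object live, like a backlog of unfinished writes.
	held := make([]*aiocb, 0, n)
	for i := 0; i < n; i++ {
		// Get never blocks: with nothing to reuse, it falls back to New.
		held = append(held, pool.Get().(*aiocb))
	}
	return created
}

func main() {
	fmt.Println(countNewCalls(1000)) // 1000
}
```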

indexk09 (Author) commented Mar 17, 2022

@xtaci It's the same as pasted above; let me know if you need more clarification. I am also pasting it below. This is very similar to your push server example, except that I am using a map of connections instead of an array. OOM happens when there are 1K+ connections with the code below. At a high level, the code just broadcasts each message from a message queue to all connected clients concurrently.

	clients := map[string]net.Conn{}
	go func() {
		for {
			select {
			case res := <-chIO: // receive IO events from watcher
				if res.Error != nil {
					log.Error().Msgf("Error receiving IO event from watcher: %v", res.Error)
					delete(clients, res.Conn.RemoteAddr().String())
					err = w.Free(res.Conn)
					if err != nil {
						log.Error().Msgf("error freeing connection: %v", err)
					}
					continue
				}
			case feed := <-out:
				f := ws.NewTextFrame(feed)
				bts := CompileHeader(f.Header)
				for index, conn := range clients {
					if conn != nil {
						err = w.Write(nil, conn, bts)
						if err != nil {
							if errors.Is(err, syscall.EPIPE) || errors.Is(err, syscall.ECONNRESET) {
								delete(clients, index)
							} else {
								log.Error().Msgf("unable to write header: %v", err)
							}
						}
						err = w.Write(nil, conn, f.Payload)
						if err != nil {
							if errors.Is(err, syscall.EPIPE) || errors.Is(err, syscall.ECONNRESET) {
								delete(clients, index)
							} else {
								log.Error().Msgf("unable to write payload: %v", err)
							}
						}
					}
				}
			case conn := <-chConn: // receive new connection events
				clients[conn.RemoteAddr().String()] = conn
			}
		}
	}()

@xtaci Let me know if there are specific parts of this code you want me to explain.

indexk09 (Author)

func init() {
    aiocbPool.New = func() interface{} {
        return new(aiocb)
    }
}

@xtaci What is the pool size here, and what happens if the pool is full? Does the event loop suspend its thread?
