Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support batching outgoing messages #576

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open

Conversation

Yu-Xie
Copy link

@Yu-Xie Yu-Xie commented Aug 8, 2023

Prototype for #555

  • Support batching mode when sending application messages to target by adding a func SaveMessagesAndIncrNextSenderMsgSeqNum(seqNum int, msg [][]byte) in the storage interface, and then a func SendAppToTarget(m []Messagable, sessionID SessionID) error in registry.go to expose it.
  • It's supported in SQLStore and MemoryStore in this PR. If we like this direction, I'm happy to implement FileStore and MongoStore too.

@Yu-Xie
Copy link
Author

Yu-Xie commented Aug 9, 2023

Posting some load testing results using local Postgres:

Output:

Spent 78.082917ms seconds to send 100 messages with 10 batch size and 10 batches
Spent 14.086666ms seconds to send 100 messages with 50 batch size and 2 batches
Spent 61.315208ms seconds to send 500 messages with 50 batch size and 10 batches
Spent 35.222625ms seconds to send 500 messages with 100 batch size and 5 batches
Spent 64.8035ms seconds to send 1000 messages with 100 batch size and 10 batches

Note that this is much faster compare to existing implementation (#555) which takes 1.5 SECONDS to prepare 1000 messages for sending.

Source code:

package quickfix

import (
	"fmt"
	"strconv"
	"sync/atomic"
	"testing"
	"time"

	"github.com/quickfixgo/quickfix/config"
	"github.com/stretchr/testify/require"

	_ "github.com/lib/pq"
)

func TestBatchSendMessage(t *testing.T) {
	const driver = "postgres"
	const dsn = `host=127.0.0.1 port=5432 user=xxx password=password dbname=xxx sslmode=disable`
	settings := NewSettings()
	sessionSettings := NewSessionSettings()
	sessionSettings.Set(config.BeginString, BeginStringFIX42)
	sessionSettings.Set(config.SenderCompID, "mock_initiator")
	sessionSettings.Set(config.TargetCompID, "mock_acceptor")
	sessionSettings.Set(config.HeartBtInt, "10")
	sessionSettings.Set(config.SocketConnectPort, "12345")
	sessionSettings.Set(config.SocketConnectHost, "127.0.0.1")
	sessionSettings.Set(config.SQLStoreDriver, driver)
	sessionSettings.Set(config.SQLStoreDataSourceName, dsn)
	sid, err := settings.AddSession(sessionSettings)
	require.NoError(t, err)

	factory := NewSQLStoreFactory(settings)

	initiator, err := NewInitiator(&app{}, factory, settings, NewNullLogFactory())
	require.NoError(t, err)

	err = initiator.Start()
	require.NoError(t, err)

	time.Sleep(10 * time.Second)

	require.NoError(t, batchSend(10, 100, sid))
	require.NoError(t, batchSend(50, 100, sid))
	require.NoError(t, batchSend(50, 500, sid))
	require.NoError(t, batchSend(100, 500, sid))
	require.NoError(t, batchSend(100, 1000, sid))

	time.Sleep(20 * time.Second)
	t.Fail() // to see logs
}

func batchSend(buf, n int, sessionID SessionID) error {
	if buf > n {
		buf = n
	}
	ch := make(chan Messagable, n)
	for _, o := range newOrders(n) {
		select {
		case ch <- o:
		}
	}
	// batch
	b := make([]Messagable, 0, buf)
	done := 0
	numBatches := 0
	start := time.Now()
	for {
		select {
		case o := <-ch:
			b = append(b, o)
			if len(b) == buf {
				if err := SendAppToTarget(b, sessionID); err != nil {
					return err
				}
				numBatches++
				done += buf
				b = make([]Messagable, 0, buf)
			}
		}
		if done == n {
			latency := time.Since(start)
			fmt.Printf("Spent %s seconds to send %d messages with %d batch size and %d batches\n", latency.String(), n, buf, numBatches)
			return nil
		}
	}
}

var cnt atomic.Int32

func newOrders(n int) []Messagable {
	res := make([]Messagable, 0, n)
	for i := 0; i < n; i++ {
		m := NewMessage()
		cnt.Add(1)
		m.Header.Set(w(35, "D"))
		m.Body.Set(w(38, "101.0"))
		m.Body.Set(w(11, strconv.Itoa(int(cnt.Load()))))
		m.Body.Set(w(55, "symbol"))
		m.Body.Set(w(54, "1"))
		m.Body.Set(w(59, "0"))
		m.Body.Set(w(40, "1"))
		res = append(res, m.ToMessage())
	}
	return res
}

func w(tag int, s string) FieldWriter {
	return &writer{tag: Tag(tag), s: s}
}

type writer struct {
	tag Tag
	s   string
}

func (w *writer) Tag() Tag {
	return w.tag
}

func (w *writer) Write() []byte {
	return []byte(w.s)
}

type app struct {
}

func (a app) OnCreate(sessionID SessionID) {
}

func (a app) OnLogon(sessionID SessionID) {
}

func (a app) OnLogout(sessionID SessionID) {
}

func (a app) ToAdmin(message *Message, sessionID SessionID) {
}

func (a app) ToApp(message *Message, sessionID SessionID) error {
	return nil
}

func (a app) FromAdmin(message *Message, sessionID SessionID) MessageRejectError {
	return nil
}

func (a app) FromApp(message *Message, sessionID SessionID) MessageRejectError {
	return nil
}

@ackleymi
Copy link
Member

@Yu-Xie if you would change SendAppToTarget to SendBatchToTarget and SaveMessagesAndIncrNextSenderMsgSeqNum to SaveBatchAndIncrNextSenderMsgSeqNum I will merge. I'd like to keep func names immediately obvious to prevent accidental misuse. Also if the FileStore and MongoStore impls are easy enough to implement for you, please do so.

@Yu-Xie
Copy link
Author

Yu-Xie commented Nov 10, 2023

@Yu-Xie if you would change SendAppToTarget to SendBatchToTarget and SaveMessagesAndIncrNextSenderMsgSeqNum to SaveBatchAndIncrNextSenderMsgSeqNum I will merge. I'd like to keep func names immediately obvious to prevent accidental misuse. Also if the FileStore and MongoStore impls are easy enough to implement for you, please do so.

Makes sense -- I have addressed these feedbacks in #599

for _, mb := range msgBytes {
s.toSend = append(s.toSend, mb)
select {
case s.messageEvent <- true:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this can be moved out of the loop as we don't release the lock until we return so it is not very useful to notify each time

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

3 participants