Skip to content

Commit

Permalink
Improve matchmaker parsed query caching.
Browse files Browse the repository at this point in the history
  • Loading branch information
zyro committed Jul 1, 2022
1 parent 1253617 commit c812e99
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 20 deletions.
5 changes: 5 additions & 0 deletions server/config.go
Expand Up @@ -278,6 +278,9 @@ func CheckConfig(logger *zap.Logger, config Config) map[string]string {
if config.GetMatchmaker().BatchPoolSize < 1 {
logger.Fatal("Matchmaker batch pool size must be >= 1", zap.Int("matchmaker.batch_pool_size", config.GetMatchmaker().BatchPoolSize))
}
if config.GetMatchmaker().RevThreshold < 0 {
logger.Fatal("Matchmaker reverse matching threshold must be >= 0", zap.Int("matchmaker.rev_threshold", config.GetMatchmaker().RevThreshold))
}

// If the runtime path is not overridden, set it to `datadir/modules`.
if config.GetRuntime().Path == "" {
Expand Down Expand Up @@ -934,6 +937,7 @@ type MatchmakerConfig struct {
MaxIntervals int `yaml:"max_intervals" json:"max_intervals" usage:"How many intervals the matchmaker attempts to find matches at the max player count, before allowing min count. Default 2."`
BatchPoolSize int `yaml:"batch_pool_size" json:"batch_pool_size" usage:"Number of concurrent indexing batches that will be allocated."`
RevPrecision bool `yaml:"rev_precision" json:"rev_precision" usage:"Reverse matching precision. Default true."`
RevThreshold int `yaml:"rev_threshold" json:"rev_threshold" usage:"Reverse matching threshold. Default 1."`
}

func NewMatchmakerConfig() *MatchmakerConfig {
Expand All @@ -943,6 +947,7 @@ func NewMatchmakerConfig() *MatchmakerConfig {
MaxIntervals: 2,
BatchPoolSize: 32,
RevPrecision: true,
RevThreshold: 1,
}
}

Expand Down
68 changes: 48 additions & 20 deletions server/matchmaker.go
Expand Up @@ -106,6 +106,7 @@ type MatchmakerIndex struct {
Node string `json:"-"`
StringProperties map[string]string `json:"-"`
NumericProperties map[string]float64 `json:"-"`
ParsedQuery bluge.Query `json:"-"`
}

type MatchmakerExtract struct {
Expand Down Expand Up @@ -206,6 +207,7 @@ type LocalMatchmaker struct {
indexes map[string]*MatchmakerIndex
activeIndexes map[string]*MatchmakerIndex
revCache map[string]map[string]bool
revThresholdFn func() *time.Timer
}

func NewLocalMatchmaker(logger, startupLogger *zap.Logger, config Config, router MessageRouter, metrics Metrics, runtime *Runtime) Matchmaker {
Expand Down Expand Up @@ -240,6 +242,12 @@ func NewLocalMatchmaker(logger, startupLogger *zap.Logger, config Config, router
revCache: make(map[string]map[string]bool),
}

if revThreshold := m.config.GetMatchmaker().RevThreshold; revThreshold > 0 && m.config.GetMatchmaker().RevPrecision {
m.revThresholdFn = func() *time.Timer {
return time.NewTimer(time.Duration(m.config.GetMatchmaker().IntervalSec*revThreshold) * time.Second)
}
}

go func() {
ticker := time.NewTicker(time.Duration(config.GetMatchmaker().IntervalSec) * time.Second)
for {
Expand Down Expand Up @@ -292,7 +300,22 @@ func (m *LocalMatchmaker) Process() {
return
}

var threshold bool
var timer *time.Timer
if m.revThresholdFn != nil {
timer = m.revThresholdFn()
defer timer.Stop()
}

for ticket, index := range m.activeIndexes {
if !threshold && timer != nil {
select {
case <-timer.C:
threshold = true
default:
}
}

index.Intervals++
lastInterval := index.Intervals >= m.config.GetMatchmaker().MaxIntervals || index.MinCount == index.MaxCount
if lastInterval {
Expand All @@ -306,13 +329,9 @@ func (m *LocalMatchmaker) Process() {
}

indexQuery := bluge.NewBooleanQuery()

// Results must match the query string.
parsedIndexQuery, err := ParseQueryString(index.Query)
if err != nil {
m.logger.Error("error parsing query string", zap.Error(err))
continue
}
indexQuery.AddMust(parsedIndexQuery)
indexQuery.AddMust(index.ParsedQuery)

// Results must also have compatible min/max ranges, for example 2-4 must not match with 6-8.
minCountRange := bluge.NewNumericRangeInclusiveQuery(
Expand Down Expand Up @@ -381,7 +400,7 @@ func (m *LocalMatchmaker) Process() {
continue
}

outerMutualMatch, err := validateMatch(m, indexReader, hitIndex.Query, hit.ID, ticket)
outerMutualMatch, err := validateMatch(m, threshold, indexReader, hitIndex.ParsedQuery, hit.ID, ticket)
if err != nil {
m.logger.Error("error validating mutual match", zap.Error(err))
continue
Expand Down Expand Up @@ -425,7 +444,7 @@ func (m *LocalMatchmaker) Process() {
sessionIdConflict = true
break
}
entryMatchesSearchHitQuery, err := validateMatch(m, indexReader, hitIndex.Query, hit.ID, entry.Ticket)
entryMatchesSearchHitQuery, err := validateMatch(m, threshold, indexReader, hitIndex.ParsedQuery, hit.ID, entry.Ticket)
if err != nil {
mutualMatchConflict = true
m.logger.Error("error validating mutual match", zap.Error(err))
Expand All @@ -437,7 +456,7 @@ func (m *LocalMatchmaker) Process() {
}
// MatchmakerEntry does not have the query, read it out of indexes.
if entriesIndexEntry, ok := m.indexes[entry.Ticket]; ok {
searchHitMatchesEntryQuery, err := validateMatch(m, indexReader, entriesIndexEntry.Query, entry.Ticket, hit.ID)
searchHitMatchesEntryQuery, err := validateMatch(m, threshold, indexReader, entriesIndexEntry.ParsedQuery, entry.Ticket, hit.ID)
if err != nil {
mutualMatchConflict = true
m.logger.Error("error validating mutual match", zap.Error(err))
Expand Down Expand Up @@ -704,6 +723,7 @@ func (m *LocalMatchmaker) Add(presences []*MatchmakerPresence, sessionID, partyI
Node: m.node,
StringProperties: stringProperties,
NumericProperties: numericProperties,
ParsedQuery: parsedQuery,
}

m.Lock()
Expand Down Expand Up @@ -780,6 +800,18 @@ func (m *LocalMatchmaker) Insert(extracts []*MatchmakerExtract) error {
entries := make(map[string][]*MatchmakerEntry, len(extracts))

for _, extract := range extracts {
parsedQuery, err := ParseQueryString(extract.Query)
if err != nil {
m.logger.Error("error parsing matchmaker query", zap.Error(err), zap.String("query", extract.Query))
continue
}
if parsedQuery, ok := parsedQuery.(ValidatableQuery); ok {
if parsedQuery.Validate() != nil {
m.logger.Error("error validating matchmaker query", zap.String("query", extract.Query))
continue
}
}

properties := make(map[string]interface{}, len(extract.StringProperties)+len(extract.NumericProperties))
for k, v := range extract.StringProperties {
properties[k] = v
Expand All @@ -791,7 +823,8 @@ func (m *LocalMatchmaker) Insert(extracts []*MatchmakerExtract) error {
sessionIDs := make(map[string]struct{}, len(extract.Presences))
for _, presence := range extract.Presences {
if _, found := sessionIDs[presence.SessionId]; found {
return runtime.ErrMatchmakerDuplicateSession
m.logger.Error("error checking matchmaker session duplicates", zap.String("session_id", presence.SessionId))
continue
}
sessionIDs[presence.SessionId] = struct{}{}
}
Expand All @@ -813,13 +846,13 @@ func (m *LocalMatchmaker) Insert(extracts []*MatchmakerExtract) error {
Node: extract.Node,
StringProperties: extract.StringProperties,
NumericProperties: extract.NumericProperties,
ParsedQuery: parsedQuery,
}

matchmakerIndexDoc, err := MapMatchmakerIndex(extract.Ticket, index)
if err != nil {
m.Unlock()
m.logger.Error("error mapping matchmaker index document", zap.Error(err))
return runtime.ErrMatchmakerIndex
continue
}

batch.Insert(matchmakerIndexDoc)
Expand Down Expand Up @@ -1260,8 +1293,8 @@ func MapMatchmakerIndex(id string, in *MatchmakerIndex) (*bluge.Document, error)
return rv, nil
}

func validateMatch(m *LocalMatchmaker, r *bluge.Reader, queryStr string, fromTicket, toTicket string) (bool, error) {
if !m.config.GetMatchmaker().RevPrecision {
func validateMatch(m *LocalMatchmaker, threshold bool, r *bluge.Reader, fromTicketQuery bluge.Query, fromTicket, toTicket string) (bool, error) {
if threshold || !m.config.GetMatchmaker().RevPrecision {
return true, nil
}

Expand All @@ -1272,15 +1305,10 @@ func validateMatch(m *LocalMatchmaker, r *bluge.Reader, queryStr string, fromTic
}
}

ticketQuery, err := ParseQueryString(queryStr)
if err != nil {
return false, err
}

idQuery := bluge.NewTermQuery(toTicket).SetField("_id")

topQuery := bluge.NewBooleanQuery()
topQuery.AddMust(idQuery, ticketQuery)
topQuery.AddMust(idQuery, fromTicketQuery)

req := bluge.NewTopNSearch(0, topQuery).WithStandardAggregations()
dmi, err := r.Search(m.ctx, req)
Expand Down

0 comments on commit c812e99

Please sign in to comment.