diff --git a/server/config.go b/server/config.go index 9b1655f58..9b6197bfe 100644 --- a/server/config.go +++ b/server/config.go @@ -278,6 +278,9 @@ func CheckConfig(logger *zap.Logger, config Config) map[string]string { if config.GetMatchmaker().BatchPoolSize < 1 { logger.Fatal("Matchmaker batch pool size must be >= 1", zap.Int("matchmaker.batch_pool_size", config.GetMatchmaker().BatchPoolSize)) } + if config.GetMatchmaker().RevThreshold < 0 { + logger.Fatal("Matchmaker reverse matching threshold must be >= 0", zap.Int("matchmaker.rev_threshold", config.GetMatchmaker().RevThreshold)) + } // If the runtime path is not overridden, set it to `datadir/modules`. if config.GetRuntime().Path == "" { @@ -934,6 +937,7 @@ type MatchmakerConfig struct { MaxIntervals int `yaml:"max_intervals" json:"max_intervals" usage:"How many intervals the matchmaker attempts to find matches at the max player count, before allowing min count. Default 2."` BatchPoolSize int `yaml:"batch_pool_size" json:"batch_pool_size" usage:"Number of concurrent indexing batches that will be allocated."` RevPrecision bool `yaml:"rev_precision" json:"rev_precision" usage:"Reverse matching precision. Default true."` + RevThreshold int `yaml:"rev_threshold" json:"rev_threshold" usage:"Reverse matching threshold. Default 1."` } func NewMatchmakerConfig() *MatchmakerConfig { @@ -943,6 +947,7 @@ func NewMatchmakerConfig() *MatchmakerConfig { MaxIntervals: 2, BatchPoolSize: 32, RevPrecision: true, + RevThreshold: 1, } } diff --git a/server/matchmaker.go b/server/matchmaker.go index 4f46d3bd9..005048daa 100644 --- a/server/matchmaker.go +++ b/server/matchmaker.go @@ -106,6 +106,7 @@ type MatchmakerIndex struct { Node string `json:"-"` StringProperties map[string]string `json:"-"` NumericProperties map[string]float64 `json:"-"` + ParsedQuery bluge.Query `json:"-"` } type MatchmakerExtract struct { @@ -206,6 +207,7 @@ type LocalMatchmaker struct { indexes map[string]*MatchmakerIndex activeIndexes map[string]*MatchmakerIndex revCache map[string]map[string]bool + revThresholdFn func() *time.Timer } func NewLocalMatchmaker(logger, startupLogger *zap.Logger, config Config, router MessageRouter, metrics Metrics, runtime *Runtime) Matchmaker { @@ -240,6 +242,12 @@ func NewLocalMatchmaker(logger, startupLogger *zap.Logger, config Config, router revCache: make(map[string]map[string]bool), } + if revThreshold := m.config.GetMatchmaker().RevThreshold; revThreshold > 0 && m.config.GetMatchmaker().RevPrecision { + m.revThresholdFn = func() *time.Timer { + return time.NewTimer(time.Duration(m.config.GetMatchmaker().IntervalSec*revThreshold) * time.Second) + } + } + go func() { ticker := time.NewTicker(time.Duration(config.GetMatchmaker().IntervalSec) * time.Second) for { @@ -292,7 +300,22 @@ func (m *LocalMatchmaker) Process() { return } + var threshold bool + var timer *time.Timer + if m.revThresholdFn != nil { + timer = m.revThresholdFn() + defer timer.Stop() + } + for ticket, index := range m.activeIndexes { + if !threshold && timer != nil { + select { + case <-timer.C: + threshold = true + default: + } + } + index.Intervals++ lastInterval := index.Intervals >= m.config.GetMatchmaker().MaxIntervals || index.MinCount == index.MaxCount if lastInterval { @@ -306,13 +329,9 @@ func (m *LocalMatchmaker) Process() { } indexQuery := bluge.NewBooleanQuery() + // Results must match the query string. - parsedIndexQuery, err := ParseQueryString(index.Query) - if err != nil { - m.logger.Error("error parsing query string", zap.Error(err)) - continue - } - indexQuery.AddMust(parsedIndexQuery) + indexQuery.AddMust(index.ParsedQuery) // Results must also have compatible min/max ranges, for example 2-4 must not match with 6-8. minCountRange := bluge.NewNumericRangeInclusiveQuery( @@ -381,7 +400,7 @@ func (m *LocalMatchmaker) Process() { continue } - outerMutualMatch, err := validateMatch(m, indexReader, hitIndex.Query, hit.ID, ticket) + outerMutualMatch, err := validateMatch(m, threshold, indexReader, hitIndex.ParsedQuery, hit.ID, ticket) if err != nil { m.logger.Error("error validating mutual match", zap.Error(err)) continue @@ -425,7 +444,7 @@ func (m *LocalMatchmaker) Process() { sessionIdConflict = true break } - entryMatchesSearchHitQuery, err := validateMatch(m, indexReader, hitIndex.Query, hit.ID, entry.Ticket) + entryMatchesSearchHitQuery, err := validateMatch(m, threshold, indexReader, hitIndex.ParsedQuery, hit.ID, entry.Ticket) if err != nil { mutualMatchConflict = true m.logger.Error("error validating mutual match", zap.Error(err)) @@ -437,7 +456,7 @@ func (m *LocalMatchmaker) Process() { } // MatchmakerEntry does not have the query, read it out of indexes. if entriesIndexEntry, ok := m.indexes[entry.Ticket]; ok { - searchHitMatchesEntryQuery, err := validateMatch(m, indexReader, entriesIndexEntry.Query, entry.Ticket, hit.ID) + searchHitMatchesEntryQuery, err := validateMatch(m, threshold, indexReader, entriesIndexEntry.ParsedQuery, entry.Ticket, hit.ID) if err != nil { mutualMatchConflict = true m.logger.Error("error validating mutual match", zap.Error(err)) @@ -704,6 +723,7 @@ func (m *LocalMatchmaker) Add(presences []*MatchmakerPresence, sessionID, partyI Node: m.node, StringProperties: stringProperties, NumericProperties: numericProperties, + ParsedQuery: parsedQuery, } m.Lock() @@ -780,6 +800,18 @@ func (m *LocalMatchmaker) Insert(extracts []*MatchmakerExtract) error { entries := make(map[string][]*MatchmakerEntry, len(extracts)) for _, extract := range extracts { + parsedQuery, err := ParseQueryString(extract.Query) + if err != nil { + m.logger.Error("error parsing matchmaker query", zap.Error(err), zap.String("query", extract.Query)) + continue + } + if parsedQuery, ok := parsedQuery.(ValidatableQuery); ok { + if parsedQuery.Validate() != nil { + m.logger.Error("error validating matchmaker query", zap.String("query", extract.Query)) + continue + } + } + properties := make(map[string]interface{}, len(extract.StringProperties)+len(extract.NumericProperties)) for k, v := range extract.StringProperties { properties[k] = v @@ -791,7 +823,8 @@ func (m *LocalMatchmaker) Insert(extracts []*MatchmakerExtract) error { sessionIDs := make(map[string]struct{}, len(extract.Presences)) for _, presence := range extract.Presences { if _, found := sessionIDs[presence.SessionId]; found { - return runtime.ErrMatchmakerDuplicateSession + m.logger.Error("error checking matchmaker session duplicates", zap.String("session_id", presence.SessionId)) + continue } sessionIDs[presence.SessionId] = struct{}{} } @@ -813,13 +846,13 @@ func (m *LocalMatchmaker) Insert(extracts []*MatchmakerExtract) error { Node: extract.Node, StringProperties: extract.StringProperties, NumericProperties: extract.NumericProperties, + ParsedQuery: parsedQuery, } matchmakerIndexDoc, err := MapMatchmakerIndex(extract.Ticket, index) if err != nil { - m.Unlock() m.logger.Error("error mapping matchmaker index document", zap.Error(err)) - return runtime.ErrMatchmakerIndex + continue } batch.Insert(matchmakerIndexDoc) @@ -1260,8 +1293,8 @@ func MapMatchmakerIndex(id string, in *MatchmakerIndex) (*bluge.Document, error) return rv, nil } -func validateMatch(m *LocalMatchmaker, r *bluge.Reader, queryStr string, fromTicket, toTicket string) (bool, error) { - if !m.config.GetMatchmaker().RevPrecision { +func validateMatch(m *LocalMatchmaker, threshold bool, r *bluge.Reader, fromTicketQuery bluge.Query, fromTicket, toTicket string) (bool, error) { + if threshold || !m.config.GetMatchmaker().RevPrecision { return true, nil } @@ -1272,15 +1305,10 @@ func validateMatch(m *LocalMatchmaker, r *bluge.Reader, queryStr string, fromTic } } - ticketQuery, err := ParseQueryString(queryStr) - if err != nil { - return false, err - } - idQuery := bluge.NewTermQuery(toTicket).SetField("_id") topQuery := bluge.NewBooleanQuery() - topQuery.AddMust(idQuery, ticketQuery) + topQuery.AddMust(idQuery, fromTicketQuery) req := bluge.NewTopNSearch(0, topQuery).WithStandardAggregations() dmi, err := r.Search(m.ctx, req)