Skip to content

Commit

Permalink
server: record a games completed stat
Browse files Browse the repository at this point in the history
Record a game completed statistic and persist it across process restarts
by storing it in the local kv store.
  • Loading branch information
jbowens committed May 3, 2020
1 parent e197874 commit d42dde3
Show file tree
Hide file tree
Showing 4 changed files with 154 additions and 24 deletions.
7 changes: 6 additions & 1 deletion cmd/codenames/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,12 @@ func main() {
}
log.Printf("[STARTUP] Opening pebble db from directory: %s\n", dir)

db, err := pebble.Open(dir, nil)
db, err := pebble.Open(dir, &pebble.Options{
Merger: &pebble.Merger{
Merge: codenames.PebbleMerge,
Name: "codenameskv",
},
})
if err != nil {
fmt.Fprintf(os.Stderr, "pebble.Open: %s\n", err)
os.Exit(1)
Expand Down
23 changes: 17 additions & 6 deletions game.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,11 @@ type Game struct {
type GameOptions struct {
TimerDurationMS int64 `json:"timer_duration_ms,omitempty"`
EnforceTimer bool `json:"enforce_timer,omitempty"`
Hooks Hooks `json:"-"`
}

type Hooks struct {
Complete func()
}

func (g *Game) StateID() string {
Expand All @@ -154,12 +159,10 @@ func (g *Game) checkWinningCondition() {
}
}
if !redRemaining {
winners := Red
g.WinningTeam = &winners
g.win(Red)
}
if !blueRemaining {
winners := Blue
g.WinningTeam = &winners
g.win(Blue)
}
}

Expand All @@ -178,6 +181,11 @@ func (g *Game) NextTurn(currentTurn int) bool {
return true
}

func (g *Game) win(team Team) {
g.WinningTeam = &team
g.Hooks.Complete()
}

func (g *Game) Guess(idx int) error {
if idx > len(g.Layout) || idx < 0 {
return fmt.Errorf("index %d is invalid", idx)
Expand All @@ -189,8 +197,7 @@ func (g *Game) Guess(idx int) error {
g.Revealed[idx] = true

if g.Layout[idx] == Black {
winners := g.currentTeam().Other()
g.WinningTeam = &winners
g.win(g.currentTeam().Other())
return nil
}

Expand All @@ -215,6 +222,10 @@ func newGame(id string, state GameState, opts GameOptions) *Game {
// distinct randomness across games with same seed
randRnd := rand.New(rand.NewSource(state.Seed * int64(state.PermIndex+1)))

if opts.Hooks.Complete == nil {
opts.Hooks.Complete = func() {}
}

game := &Game{
ID: id,
CreatedAt: time.Now(),
Expand Down
64 changes: 48 additions & 16 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package codenames
import (
"crypto/subtle"
"encoding/json"
"fmt"
"html/template"
"io"
"log"
Expand Down Expand Up @@ -31,18 +32,22 @@ type Server struct {

tpl *template.Template
gameIDWords []string
hooks Hooks

mu sync.Mutex
games map[string]*GameHandle
defaultWords []string
mux *http.ServeMux

statOpenRequests int64 // atomic access
statTotalRequests int64 //atomic access
statGamesCompleted int64 // atomic access
statOpenRequests int64 // atomic access
statTotalRequests int64 // atomic access
}

type Store interface {
Save(*Game) error
CounterAdd(string, int64) error
GetCounter(statPrefix string) (int64, error)
}

type GameHandle struct {
Expand Down Expand Up @@ -131,7 +136,7 @@ func (s *Server) getGameLocked(gameID string) (*GameHandle, bool) {
if ok {
return gh, ok
}
gh = newHandle(newGame(gameID, randomState(s.defaultWords), GameOptions{}), s.Store)
gh = newHandle(newGame(gameID, randomState(s.defaultWords), GameOptions{Hooks: s.hooks}), s.Store)
s.games[gameID] = gh
return gh, true
}
Expand All @@ -151,7 +156,7 @@ func (s *Server) handleGameState(rw http.ResponseWriter, req *http.Request) {
s.mu.Lock()
gh, ok := s.getGameLocked(body.GameID)
if !ok {
gh = newHandle(newGame(body.GameID, randomState(s.defaultWords), GameOptions{}), s.Store)
gh = newHandle(newGame(body.GameID, randomState(s.defaultWords), GameOptions{Hooks: s.hooks}), s.Store)
s.games[body.GameID] = gh
s.mu.Unlock()
writeGame(rw, gh)
Expand Down Expand Up @@ -273,6 +278,7 @@ func (s *Server) handleNextGame(rw http.ResponseWriter, req *http.Request) {
opts := GameOptions{
TimerDurationMS: request.TimerDurationMS,
EnforceTimer: request.EnforceTimer,
Hooks: s.hooks,
}

var ok bool
Expand All @@ -297,19 +303,18 @@ func (s *Server) handleNextGame(rw http.ResponseWriter, req *http.Request) {
}

type statsResponse struct {
GamesTotal int `json:"games_total"`
GamesInProgress int `json:"games_in_progress"`
GamesCreatedOneHour int `json:"games_created_1h"`
RequestsTotal int64 `json:"requests_total_process_lifetime"`
RequestsInFlight int64 `json:"requests_in_flight"`
GamesCompleted int64 `json:"games_completed"`
MemGamesTotal int `json:"mem_games_total"`
MemGamesInProgress int `json:"mem_games_in_progress"`
MemGamesCreatedOneHour int `json:"mem_games_created_1h"`
RequestsTotal int64 `json:"requests_total_process_lifetime"`
RequestsInFlight int64 `json:"requests_in_flight"`
}

func (s *Server) handleStats(rw http.ResponseWriter, req *http.Request) {
hourAgo := time.Now().Add(-time.Hour)

s.mu.Lock()
defer s.mu.Unlock()

var inProgress, createdWithinAnHour int
for _, gh := range s.games {
gh.mu.Lock()
Expand All @@ -321,12 +326,23 @@ func (s *Server) handleStats(rw http.ResponseWriter, req *http.Request) {
}
gh.mu.Unlock()
}
s.mu.Unlock()

// Sum up the count of games completed that's on disk and in-memory.
diskGamesCompleted, err := s.Store.GetCounter("games/completed/")
if err != nil {
http.Error(rw, err.Error(), 400)
return
}
memGamesCompleted := atomic.LoadInt64(&s.statGamesCompleted)

writeJSON(rw, statsResponse{
GamesTotal: len(s.games),
GamesInProgress: inProgress,
GamesCreatedOneHour: createdWithinAnHour,
RequestsTotal: atomic.LoadInt64(&s.statTotalRequests),
RequestsInFlight: atomic.LoadInt64(&s.statOpenRequests),
GamesCompleted: diskGamesCompleted + memGamesCompleted,
MemGamesTotal: len(s.games),
MemGamesInProgress: inProgress,
MemGamesCreatedOneHour: createdWithinAnHour,
RequestsTotal: atomic.LoadInt64(&s.statTotalRequests),
RequestsInFlight: atomic.LoadInt64(&s.statOpenRequests),
})
}

Expand Down Expand Up @@ -384,8 +400,11 @@ func (s *Server) Start(games map[string]*Game) error {
s.Store = discardStore{}
}

s.hooks.Complete = func() { atomic.AddInt64(&s.statGamesCompleted, 1) }

if games != nil {
for _, g := range games {
g.GameOptions.Hooks = s.hooks
s.games[g.ID] = newHandle(g, s.Store)
}
}
Expand All @@ -396,6 +415,19 @@ func (s *Server) Start(games map[string]*Game) error {
}
}()

// Periodically persist some in-memory stats.
go func() {
const hourFormat = "06010215"
for range time.Tick(time.Minute) {
hourKey := time.Now().UTC().Format(hourFormat) + "utc"
v := atomic.LoadInt64(&s.statGamesCompleted)
if v > 0 {
atomic.AddInt64(&s.statGamesCompleted, -v)
s.Store.CounterAdd(fmt.Sprintf("games/completed/%s", hourKey), v)
}
}
}()

return s.Server.ListenAndServe()
}

Expand Down
84 changes: 83 additions & 1 deletion store.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package codenames

import (
"bytes"
"encoding/binary"
"encoding/json"
"fmt"
"math"
Expand Down Expand Up @@ -62,6 +64,84 @@ func (ps *PebbleStore) Save(g *Game) error {
return err
}

func (ps *PebbleStore) CounterAdd(stat string, v int64) error {
var b [binary.MaxVarintLen64]byte
n := binary.PutVarint(b[:], v)

k := fmt.Sprintf("/stats/counters/%s", stat)
fmt.Printf("Writing stat to key %q, value %+v.\n", k, b[:n])

return ps.DB.Merge(
[]byte(k),
b[:n],
nil)
}

func (ps *PebbleStore) GetCounter(statPrefix string) (int64, error) {
prefix := []byte(fmt.Sprintf("/stats/counters/%s", statPrefix))

iter := ps.DB.NewIter(nil)
iter.SeekGE(prefix)

var sum int64
for ; iter.Valid() && bytes.HasPrefix(iter.Key(), prefix); iter.Next() {
rawV := iter.Value()
v, n := binary.Varint(rawV)
if n < 0 {
return 0, fmt.Errorf("unable to read stat value: %v for key %q", rawV, iter.Key())
}
sum += v
}
err := iter.Error()
if closeErr := iter.Close(); closeErr != nil {
err = closeErr
}

return sum, err
}

// PebbleMerge implements the pebble.Merge function type.
func PebbleMerge(k, v []byte) (pebble.ValueMerger, error) {
vInt, n := binary.Varint(v)
if n < 0 {
return nil, fmt.Errorf("unable to read merge value: %v", v)
}
//if bytes.HasPrefix(k, []byte("/stats/counters/")) {
return &addValueMerger{v: vInt}, nil
//}
//return nil, fmt.Errorf("unrecognized merge key: %s", pretty.Sprint(k))
}

// addValueMerger implements pebble.ValueMerger by interpreting values as a
// signed varint and adding its operands.
type addValueMerger struct {
v int64
}

func (m *addValueMerger) MergeNewer(value []byte) error {
v, n := binary.Varint(value)
if n < 0 {
return fmt.Errorf("unable to read merge value: %v", value)
}
m.v = m.v + v
return nil
}

func (m *addValueMerger) MergeOlder(value []byte) error {
v, n := binary.Varint(value)
if n < 0 {
return fmt.Errorf("unable to read merge value: %v", value)
}
m.v = m.v + v
return nil
}

func (m *addValueMerger) Finish() ([]byte, error) {
b := make([]byte, binary.MaxVarintLen64)
n := binary.PutVarint(b, m.v)
return b[:n], nil
}

func gameKV(g *Game) (key, value []byte, err error) {
value, err = json.Marshal(g)
if err != nil {
Expand All @@ -80,4 +160,6 @@ func mkkey(unixSecs int64, id string) []byte {

type discardStore struct{}

func (ds discardStore) Save(*Game) error { return nil }
func (_ discardStore) Save(*Game) error { return nil }
func (_ discardStore) GetCounter(string) (int64, error) { return 0, nil }
func (_ discardStore) CounterAdd(string, int64) error { return nil }

0 comments on commit d42dde3

Please sign in to comment.