Skip to content

Commit

Permalink
Merge pull request #66 from nats-io/fix-msgs-per-sec
Browse files Browse the repository at this point in the history
Fix msgs and bytes per sec rates
  • Loading branch information
wallyqs committed Apr 25, 2022
2 parents 8896b5b + 17fdae9 commit 220bc61
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 32 deletions.
3 changes: 1 addition & 2 deletions nats-top.go
Expand Up @@ -15,7 +15,7 @@ import (
ui "gopkg.in/gizak/termui.v1"
)

const version = "0.5.0"
const version = "0.5.2"

var (
host = flag.String("s", "127.0.0.1", "The nats server host.")
Expand Down Expand Up @@ -403,7 +403,6 @@ func generateParagraphCSV(
inMsgs, inBytes, inMsgsRate, inBytesRate,
outMsgs, outBytes, outMsgsRate, outBytesRate,
)

text += fmt.Sprintf("\n\nConnections Polled:[__DELIM__]%d\n", numConns)

displaySubs := engine.DisplaySubs
Expand Down
65 changes: 35 additions & 30 deletions util/toputils.go
Expand Up @@ -5,7 +5,6 @@ import (
"crypto/tls"
"crypto/x509"
"encoding/json"
"errors"
"fmt"
"io/ioutil"
"net/http"
Expand All @@ -17,16 +16,18 @@ import (
const DisplaySubscriptions = 1

type Engine struct {
Host string
Port int
HttpClient *http.Client
Uri string
Conns int
SortOpt server.SortOpt
Delay int
DisplaySubs bool
StatsCh chan *Stats
ShutdownCh chan struct{}
Host string
Port int
HttpClient *http.Client
Uri string
Conns int
SortOpt server.SortOpt
Delay int
DisplaySubs bool
StatsCh chan *Stats
ShutdownCh chan struct{}
LastStats *Stats
LastPollTime time.Time
}

func NewEngine(host string, port int, conns int, delay int) *Engine {
Expand Down Expand Up @@ -92,34 +93,24 @@ func (engine *Engine) Request(path string) (interface{}, error) {
// which can modify how poll values then sends to channel.
func (engine *Engine) MonitorStats() error {
delay := time.Duration(engine.Delay) * time.Second
isFirstTime := true
lastPollTime := time.Now()

for {
select {
case <-engine.ShutdownCh:
return nil
case <-time.After(delay):
stats, newLastPollTime := engine.fetchStats(isFirstTime, lastPollTime)
if stats != nil && errors.Is(stats.Error, errDud) {
isFirstTime = false
lastPollTime = newLastPollTime
}

engine.StatsCh <- stats
engine.StatsCh <- engine.fetchStats()
}
}
}

func (engine *Engine) FetchStatsSnapshot() *Stats {
stats, _ := engine.fetchStats(true, time.Now())

return stats
return engine.fetchStats()
}

var errDud = fmt.Errorf("")

func (engine *Engine) fetchStats(isFirstTime bool, lastPollTime time.Time) (*Stats, time.Time) {
func (engine *Engine) fetchStats() *Stats {
var inMsgsDelta int64
var outMsgsDelta int64
var inBytesDelta int64
Expand Down Expand Up @@ -147,7 +138,7 @@ func (engine *Engine) fetchStats(isFirstTime bool, lastPollTime time.Time) (*Sta
result, err := engine.Request("/varz")
if err != nil {
stats.Error = err
return stats, time.Time{}
return stats
}

if varz, ok := result.(*server.Varz); ok {
Expand All @@ -160,14 +151,24 @@ func (engine *Engine) fetchStats(isFirstTime bool, lastPollTime time.Time) (*Sta
result, err := engine.Request("/connz")
if err != nil {
stats.Error = err
return stats, time.Time{}
return stats
}

if connz, ok := result.(*server.Connz); ok {
stats.Connz = connz
}
}

var isFirstTime bool
if engine.LastStats != nil {
inMsgsLastVal = engine.LastStats.Varz.InMsgs
outMsgsLastVal = engine.LastStats.Varz.OutMsgs
inBytesLastVal = engine.LastStats.Varz.InBytes
outBytesLastVal = engine.LastStats.Varz.OutBytes
} else {
isFirstTime = true
}

// Periodic snapshot to get per sec metrics
inMsgsVal := stats.Varz.InMsgs
outMsgsVal := stats.Varz.OutMsgs
Expand All @@ -185,7 +186,7 @@ func (engine *Engine) fetchStats(isFirstTime bool, lastPollTime time.Time) (*Sta
outBytesLastVal = outBytesVal

now := time.Now()
tdelta := now.Sub(lastPollTime)
tdelta := now.Sub(engine.LastPollTime)

// Calculate rates but the first time
if !isFirstTime {
Expand All @@ -194,15 +195,19 @@ func (engine *Engine) fetchStats(isFirstTime bool, lastPollTime time.Time) (*Sta
inBytesRate = float64(inBytesDelta) / tdelta.Seconds()
outBytesRate = float64(outBytesDelta) / tdelta.Seconds()
}

stats.Rates = &Rates{
rates := &Rates{
InMsgsRate: inMsgsRate,
OutMsgsRate: outMsgsRate,
InBytesRate: inBytesRate,
OutBytesRate: outBytesRate,
}
stats.Rates = rates

return stats, now
// Snapshot stats.
engine.LastStats = stats
engine.LastPollTime = now

return stats
}

// SetupHTTPS sets up the http client and uri to use for polling.
Expand Down

0 comments on commit 220bc61

Please sign in to comment.