Skip to content

Commit

Permalink
En 3569 fix display of txs from pool and for some metrics (#379)
Browse files Browse the repository at this point in the history
* Fixed display of txs from pool
* Fixed metrics about validator count, leader count, accepted blocks
* Replaced some fmt.print.. with log.Info
* Refactored a method name
  • Loading branch information
SebastianMarian committed Aug 16, 2019
1 parent 0bf0b40 commit 04ca422
Show file tree
Hide file tree
Showing 10 changed files with 99 additions and 50 deletions.
30 changes: 16 additions & 14 deletions cmd/node/factory/structs.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,8 @@ const (
MaxTxsToRequest = 100
)

var log = logger.DefaultLogger()

// Network struct holds the network components of the Elrond protocol
type Network struct {
NetMessenger p2p.Messenger
Expand Down Expand Up @@ -863,57 +865,57 @@ func createShardDataPoolFromConfig(
uint64ByteSliceConverter typeConverters.Uint64ByteSliceConverter,
) (dataRetriever.PoolsHolder, error) {

fmt.Println("creatingShardDataPool from config")
log.Info("creatingShardDataPool from config")

txPool, err := shardedData.NewShardedData(getCacherFromConfig(config.TxDataPool))
if err != nil {
fmt.Println("error creating txpool")
log.Info("error creating txpool")
return nil, err
}

uTxPool, err := shardedData.NewShardedData(getCacherFromConfig(config.UnsignedTransactionDataPool))
if err != nil {
fmt.Println("error creating smart contract result")
log.Info("error creating smart contract result")
return nil, err
}

cacherCfg := getCacherFromConfig(config.BlockHeaderDataPool)
hdrPool, err := storageUnit.NewCache(cacherCfg.Type, cacherCfg.Size, cacherCfg.Shards)
if err != nil {
fmt.Println("error creating hdrpool")
log.Info("error creating hdrpool")
return nil, err
}

cacherCfg = getCacherFromConfig(config.MetaBlockBodyDataPool)
metaBlockBody, err := storageUnit.NewCache(cacherCfg.Type, cacherCfg.Size, cacherCfg.Shards)
if err != nil {
fmt.Println("error creating metaBlockBody")
log.Info("error creating metaBlockBody")
return nil, err
}

cacherCfg = getCacherFromConfig(config.BlockHeaderNoncesDataPool)
hdrNoncesCacher, err := storageUnit.NewCache(cacherCfg.Type, cacherCfg.Size, cacherCfg.Shards)
if err != nil {
fmt.Println("error creating hdrNoncesCacher")
log.Info("error creating hdrNoncesCacher")
return nil, err
}
hdrNonces, err := dataPool.NewNonceSyncMapCacher(hdrNoncesCacher, uint64ByteSliceConverter)
if err != nil {
fmt.Println("error creating hdrNonces")
log.Info("error creating hdrNonces")
return nil, err
}

cacherCfg = getCacherFromConfig(config.TxBlockBodyDataPool)
txBlockBody, err := storageUnit.NewCache(cacherCfg.Type, cacherCfg.Size, cacherCfg.Shards)
if err != nil {
fmt.Println("error creating txBlockBody")
log.Info("error creating txBlockBody")
return nil, err
}

cacherCfg = getCacherFromConfig(config.PeerBlockBodyDataPool)
peerChangeBlockBody, err := storageUnit.NewCache(cacherCfg.Type, cacherCfg.Size, cacherCfg.Shards)
if err != nil {
fmt.Println("error creating peerChangeBlockBody")
log.Info("error creating peerChangeBlockBody")
return nil, err
}

Expand All @@ -935,31 +937,31 @@ func createMetaDataPoolFromConfig(
cacherCfg := getCacherFromConfig(config.MetaBlockBodyDataPool)
metaBlockBody, err := storageUnit.NewCache(cacherCfg.Type, cacherCfg.Size, cacherCfg.Shards)
if err != nil {
fmt.Println("error creating metaBlockBody")
log.Info("error creating metaBlockBody")
return nil, err
}

miniBlockHashes, err := shardedData.NewShardedData(getCacherFromConfig(config.MiniBlockHeaderHashesDataPool))
if err != nil {
fmt.Println("error creating miniBlockHashes")
log.Info("error creating miniBlockHashes")
return nil, err
}

cacherCfg = getCacherFromConfig(config.ShardHeadersDataPool)
shardHeaders, err := storageUnit.NewCache(cacherCfg.Type, cacherCfg.Size, cacherCfg.Shards)
if err != nil {
fmt.Println("error creating shardHeaders")
log.Info("error creating shardHeaders")
return nil, err
}

headersNoncesCacher, err := storageUnit.NewCache(cacherCfg.Type, cacherCfg.Size, cacherCfg.Shards)
if err != nil {
fmt.Println("error creating shard headers nonces pool")
log.Info("error creating shard headers nonces pool")
return nil, err
}
headersNonces, err := dataPool.NewNonceSyncMapCacher(headersNoncesCacher, uint64ByteSliceConverter)
if err != nil {
fmt.Println("error creating shard headers nonces pool")
log.Info("error creating shard headers nonces pool")
return nil, err
}

Expand Down
3 changes: 3 additions & 0 deletions cmd/node/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -530,6 +530,9 @@ func startNode(ctx *cli.Context, log *logger.Logger, version string) error {
coreComponents.StatusHandler.SetStringValue(core.MetricNodeType, string(nodeType))
coreComponents.StatusHandler.SetUInt64Value(core.MetricRoundTime, nodesConfig.RoundDuration/milisecondsInSecond)
coreComponents.StatusHandler.SetStringValue(core.MetricAppVersion, version)
coreComponents.StatusHandler.SetUInt64Value(core.MetricCountConsensus, 0)
coreComponents.StatusHandler.SetUInt64Value(core.MetricCountLeader, 0)
coreComponents.StatusHandler.SetUInt64Value(core.MetricCountAcceptedBlocks, 0)

dataArgs := factory.NewDataComponentsFactoryArgs(generalConfig, shardCoordinator, coreComponents, uniqueDBFolder)
dataComponents, err := factory.DataComponentsFactory(dataArgs)
Expand Down
3 changes: 1 addition & 2 deletions consensus/chronology/chronology.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,8 +135,6 @@ func (chr *chronology) startRound() {
msg := fmt.Sprintf("SUBROUND %s BEGINS", sr.Name())
log.Info(log.Headline(msg, chr.syncTimer.FormattedCurrentTime(), "."))

chr.appStatusHandler.SetUInt64Value(core.MetricCurrentRound, uint64(chr.rounder.Index()))

if !sr.DoWork(chr.rounder) {
chr.subroundId = srBeforeStartRound
return
Expand Down Expand Up @@ -168,6 +166,7 @@ func (chr *chronology) initRound() {

if hasSubroundsAndGenesisTimePassed {
chr.subroundId = chr.subroundHandlers[0].Current()
chr.appStatusHandler.SetUInt64Value(core.MetricCurrentRound, uint64(chr.rounder.Index()))
}

chr.mutSubrounds.RUnlock()
Expand Down
5 changes: 5 additions & 0 deletions consensus/spos/bn/bnSubroundsFactory.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,11 @@ func (fct *factory) generateStartRoundSubround() error {
return err
}

err = subroundStartRound.SetAppStatusHandler(fct.appStatusHandler)
if err != nil {
return err
}

fct.consensusCore.Chronology().AddSubround(subroundStartRound)

return nil
Expand Down
8 changes: 3 additions & 5 deletions consensus/spos/commonSubround/subroundStartRound.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ func (sr *SubroundStartRound) initCurrentRound() bool {

msg := ""
if leader == sr.SelfPubKey() {
sr.appStatusHandler.Increment(core.MetricCountLeader)
msg = " (my turn)"
}

Expand All @@ -153,6 +154,8 @@ func (sr *SubroundStartRound) initCurrentRound() bool {
return false
}

sr.appStatusHandler.Increment(core.MetricCountConsensus)

err = sr.MultiSigner().Reset(pubKeys, uint16(selfIndex))
if err != nil {
log.Error(err.Error())
Expand All @@ -177,14 +180,9 @@ func (sr *SubroundStartRound) initCurrentRound() bool {
sr.SetStatus(sr.Current(), spos.SsFinished)

if leader == sr.SelfPubKey() {
sr.appStatusHandler.Increment(core.MetricCountLeader)
//TODO: Should be analyzed if call of sr.broadcastUnnotarisedBlocks() is still necessary
}

sr.appStatusHandler.Increment(core.MetricCountConsensus)

//TODO rollback decrement

// execute stored messages which were received in this new round but before this initialisation
go sr.executeStoredMessages()

Expand Down
66 changes: 47 additions & 19 deletions node/nodeTesting.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/ElrondNetwork/elrond-go/process"
"github.com/ElrondNetwork/elrond-go/process/factory"
"github.com/ElrondNetwork/elrond-go/sharding"
"github.com/ElrondNetwork/elrond-go/storage"
)

// maxLoadThresholdPercent specifies the max load percent accepted from txs storage size when generates new txs
Expand All @@ -25,12 +26,12 @@ const maxGoRoutinesSendMessage = 30

// GenerateAndSendBulkTransactions is a method for generating and propagating a set
// of transactions to be processed. It is mainly used for demo purposes
func (n *Node) GenerateAndSendBulkTransactions(receiverHex string, value *big.Int, noOfTx uint64) error {
func (n *Node) GenerateAndSendBulkTransactions(receiverHex string, value *big.Int, noOfTxs uint64) error {
if atomic.LoadInt32(&n.currentSendingGoRoutines) >= maxGoRoutinesSendMessage {
return ErrSystemBusyGeneratingTransactions
}

err := n.generateBulkTransactionsChecks(noOfTx)
err := n.generateBulkTransactionsChecks(noOfTxs)
if err != nil {
return err
}
Expand All @@ -42,27 +43,35 @@ func (n *Node) GenerateAndSendBulkTransactions(receiverHex string, value *big.In
return ErrNilTransactionPool
}

maxNoOfTx := int64(0)
txStorageSize := uint64(n.txStorageSize) * maxLoadThresholdPercent / 100
selfId := n.shardCoordinator.SelfId()
strCache := process.ShardCacherIdentifier(selfId, selfId)
txStore := txPool.ShardDataStore(strCache)
if txStore != nil {
noOfTxs = getMaxNoOfTxsToGenerate(txStore, txStorageSize, noOfTxs)
}

for i := uint32(0); i < n.shardCoordinator.NumberOfShards(); i++ {
strCache := process.ShardCacherIdentifier(n.shardCoordinator.SelfId(), i)
txStore := txPool.ShardDataStore(strCache)
if txStore == nil {
if i == selfId {
continue
}

txStoreLen := uint64(txStore.Len())
if txStoreLen+noOfTx > txStorageSize {
maxNoOfTx = int64(txStorageSize) - int64(txStoreLen)
if int64(noOfTx) > maxNoOfTx {
if maxNoOfTx <= 0 {
return ErrTooManyTransactionsInPool
}
strCache = process.ShardCacherIdentifier(i, selfId)
txStore = txPool.ShardDataStore(strCache)
if txStore != nil {
noOfTxs = getMaxNoOfTxsToGenerate(txStore, txStorageSize, noOfTxs)
}

noOfTx = uint64(maxNoOfTx)
}
strCache = process.ShardCacherIdentifier(selfId, i)
txStore = txPool.ShardDataStore(strCache)
if txStore != nil {
noOfTxs = getMaxNoOfTxsToGenerate(txStore, txStorageSize, noOfTxs)
}
}

if noOfTxs == 0 {
return ErrTooManyTransactionsInPool
}
}

newNonce, senderAddressBytes, recvAddressBytes, senderShardId, err := n.generateBulkTransactionsPrepareParams(receiverHex)
Expand All @@ -71,7 +80,7 @@ func (n *Node) GenerateAndSendBulkTransactions(receiverHex string, value *big.In
}

wg := sync.WaitGroup{}
wg.Add(int(noOfTx))
wg.Add(int(noOfTxs))

mutTransactions := sync.RWMutex{}
transactions := make([][]byte, 0)
Expand All @@ -84,7 +93,7 @@ func (n *Node) GenerateAndSendBulkTransactions(receiverHex string, value *big.In
return err
}

for nonce := newNonce; nonce < newNonce+noOfTx; nonce++ {
for nonce := newNonce; nonce < newNonce+noOfTxs; nonce++ {
go func(crtNonce uint64) {
_, signedTxBuff, err := n.generateAndSignSingleTx(
crtNonce,
Expand Down Expand Up @@ -116,8 +125,8 @@ func (n *Node) GenerateAndSendBulkTransactions(receiverHex string, value *big.In
return errFound
}

if len(transactions) != int(noOfTx) {
return errors.New(fmt.Sprintf("generated only %d from required %d transactions", len(transactions), noOfTx))
if len(transactions) != int(noOfTxs) {
return errors.New(fmt.Sprintf("generated only %d from required %d transactions", len(transactions), noOfTxs))
}

//the topic identifier is made of the current shard id and sender's shard id
Expand All @@ -144,6 +153,25 @@ func (n *Node) GenerateAndSendBulkTransactions(receiverHex string, value *big.In
return nil
}

func getMaxNoOfTxsToGenerate(
txStore storage.Cacher,
txStorageSize uint64,
noOfTxs uint64,
) uint64 {

txStoreLen := uint64(txStore.Len())
if txStoreLen >= txStorageSize {
return 0
}

maxNoOfTxs := txStorageSize - txStoreLen
if maxNoOfTxs < noOfTxs {
return maxNoOfTxs
}

return noOfTxs
}

// GenerateAndSendBulkTransactionsOneByOne is a method for generating and propagating a set
// of transactions to be processed. It is mainly used for demo purposes
func (n *Node) GenerateAndSendBulkTransactionsOneByOne(receiverHex string, value *big.Int, noOfTx uint64) error {
Expand Down
2 changes: 1 addition & 1 deletion p2p/libp2p/netMessenger.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ func createMessenger(
}(pb, netMes.outgoingPLB)

for _, address := range netMes.ctxProvider.Host().Addrs() {
fmt.Println(address.String() + "/p2p/" + netMes.ID().Pretty())
log.Info(address.String() + "/p2p/" + netMes.ID().Pretty())
}

return &netMes, nil
Expand Down
29 changes: 22 additions & 7 deletions process/block/displayBlock.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,22 +30,37 @@ func NewTransactionCounter() *transactionCounter {
}
}

// getNumTxsWithDst returns the number of transactions for a certain destination shard
func (txc *transactionCounter) getNumTxsWithDst(dstShardId uint32, dataPool dataRetriever.PoolsHolder, nrShards uint32) int {
// getNumTxsFromPool returns the number of transactions from pool for a given shard
func (txc *transactionCounter) getNumTxsFromPool(shardId uint32, dataPool dataRetriever.PoolsHolder, nrShards uint32) int {
txPool := dataPool.Transactions()
if txPool == nil {
return 0
}

sumTxs := 0

strCache := process.ShardCacherIdentifier(shardId, shardId)
txStore := txPool.ShardDataStore(strCache)
if txStore != nil {
sumTxs += txStore.Len()
}

for i := uint32(0); i < nrShards; i++ {
strCache := process.ShardCacherIdentifier(i, dstShardId)
txStore := txPool.ShardDataStore(strCache)
if txStore == nil {
if i == shardId {
continue
}
sumTxs += txStore.Len()

strCache = process.ShardCacherIdentifier(i, shardId)
txStore = txPool.ShardDataStore(strCache)
if txStore != nil {
sumTxs += txStore.Len()
}

strCache = process.ShardCacherIdentifier(shardId, i)
txStore = txPool.ShardDataStore(strCache)
if txStore != nil {
sumTxs += txStore.Len()
}
}

return sumTxs
Expand Down Expand Up @@ -82,7 +97,7 @@ func (txc *transactionCounter) displayLogInfo(
core.ToB64(headerHash),
txc.totalTxs,
txc.currentBlockTxs,
txc.getNumTxsWithDst(selfId, dataPool, numShards),
txc.getNumTxsFromPool(selfId, dataPool, numShards),
numShards,
selfId)
txc.mutex.RUnlock()
Expand Down
2 changes: 1 addition & 1 deletion process/block/shardblock.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ func (sp *shardProcessor) ProcessBlock(
return err
}

numTxWithDst := sp.txCounter.getNumTxsWithDst(header.ShardId, sp.dataPool, sp.shardCoordinator.NumberOfShards())
numTxWithDst := sp.txCounter.getNumTxsFromPool(header.ShardId, sp.dataPool, sp.shardCoordinator.NumberOfShards())

sp.appStatusHandler.SetUInt64Value(core.MetricTxPoolLoad, uint64(numTxWithDst))

Expand Down
1 change: 0 additions & 1 deletion statusHandler/termuiStatusHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,6 @@ func (tsh *TermuiStatusHandler) Increment(key string) {

keyValue++
tsh.termuiConsoleMetrics.Store(key, keyValue)

}

// Decrement - will decrement the value of a key
Expand Down

0 comments on commit 04ca422

Please sign in to comment.