Skip to content

Commit

Permalink
Custom nonce monitor for blobber txns (#802)
Browse files Browse the repository at this point in the history
* Use a custom nonce monitor

* More fail conditions

* Add logging and fix recurring balance refresh

* Add expiry on nonce reservation

* Add more logging

* Capture setting of nonce failure

* Readd error

* Txn nonce log upon set

* Update txn verify to use nonce

* Update other txn verify to use nonce

* Fix error logging
  • Loading branch information
avanaur committed Aug 28, 2022
1 parent bb0d7bd commit 8dc6681
Show file tree
Hide file tree
Showing 13 changed files with 262 additions and 49 deletions.
1 change: 1 addition & 0 deletions code/go/0chain.net/blobbercore/allocation/workers.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,7 @@ func sendFinalizeAllocation(a *Allocation) {
var request finalizeRequest
request.AllocationID = a.ID

// TODO should this be verified?
err = tx.ExecuteSmartContract(
transaction.STORAGE_CONTRACT_ADDRESS,
transaction.FINALIZE_ALLOCATION,
Expand Down
2 changes: 1 addition & 1 deletion code/go/0chain.net/blobbercore/challenge/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func (cr *ChallengeEntity) SubmitChallengeToBC(ctx context.Context) (*transactio
t *transaction.Transaction
)
for i := 0; i < 3; i++ {
t, err = transaction.VerifyTransaction(txn.Hash, chain.GetServerChain())
t, err = transaction.VerifyTransactionWithNonce(txn.Hash, txn.GetTransaction().GetTransactionNonce())
if err == nil {
break
}
Expand Down
10 changes: 5 additions & 5 deletions code/go/0chain.net/blobbercore/handler/health.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,17 +29,17 @@ func getBlobberHealthCheckError() error {
return err
}

func BlobberHealthCheck() (string, error) {
func BlobberHealthCheck() (*transaction.Transaction, error) {
if config.Configuration.Capacity == 0 {

setBlobberHealthCheckError(ErrBlobberHasRemoved)
return "", ErrBlobberHasRemoved
return nil, ErrBlobberHasRemoved
}

txn, err := transaction.NewTransactionEntity()
if err != nil {
setBlobberHealthCheckError(err)
return "", err
return nil, err
}

err = txn.ExecuteSmartContract(transaction.STORAGE_CONTRACT_ADDRESS,
Expand All @@ -49,9 +49,9 @@ func BlobberHealthCheck() (string, error) {
zap.String("err:", err.Error()))
setBlobberHealthCheckError(err)

return "", err
return nil, err
}

setBlobberHealthCheckError(err)
return txn.Hash, nil
return txn, nil
}
60 changes: 30 additions & 30 deletions code/go/0chain.net/blobbercore/handler/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"github.com/0chain/blobber/code/go/0chain.net/blobbercore/config"
"github.com/0chain/blobber/code/go/0chain.net/blobbercore/datastore"
"github.com/0chain/blobber/code/go/0chain.net/blobbercore/filestore"
"github.com/0chain/blobber/code/go/0chain.net/core/chain"
"github.com/0chain/blobber/code/go/0chain.net/core/logging"

"github.com/0chain/blobber/code/go/0chain.net/core/node"
Expand Down Expand Up @@ -87,15 +86,15 @@ func RegisterBlobber(ctx context.Context) error {

b, err := config.ReloadFromChain(ctx, datastore.GetStore().GetDB())
if err != nil || b.BaseURL != node.Self.GetURLBase() { // blobber is not registered yet, baseURL is changed
txnHash, err := sendSmartContractBlobberAdd(ctx)
txn, err := sendSmartContractBlobberAdd(ctx)
if err != nil {
logging.Logger.Error("Error when sending add request to blockchain", zap.Any("err", err))
return err
}

t, err := TransactionVerify(txnHash)
t, err := TransactionVerify(txn)
if err != nil {
logging.Logger.Error("Failed to verify blobber register transaction", zap.Any("err", err), zap.String("txn.Hash", txnHash))
logging.Logger.Error("Failed to verify blobber register transaction", zap.Any("err", err), zap.String("txn.Hash", txn.Hash))
return err
}

Expand All @@ -115,14 +114,14 @@ func RegisterBlobber(ctx context.Context) error {
// UpdateBlobber update blobber
func UpdateBlobber(ctx context.Context) error {

txnHash, err := sendSmartContractBlobberAdd(ctx)
txn, err := sendSmartContractBlobberAdd(ctx)
if err != nil {
return err
}

t, err := TransactionVerify(txnHash)
t, err := TransactionVerify(txn)
if err != nil {
logging.Logger.Error("Failed to verify blobber update transaction", zap.Any("err", err), zap.String("txn.Hash", txnHash))
logging.Logger.Error("Failed to verify blobber update transaction", zap.Any("err", err), zap.String("txn.Hash", txn.Hash))
return err
}

Expand All @@ -132,13 +131,13 @@ func UpdateBlobber(ctx context.Context) error {
}

func RefreshPriceOnChain(ctx context.Context) error {
txnHash, err := sendSmartContractBlobberAdd(ctx)
txn, err := sendSmartContractBlobberAdd(ctx)
if err != nil {
return err
}

if t, err := TransactionVerify(txnHash); err != nil {
logging.Logger.Error("Failed to verify price refresh transaction", zap.Any("err", err), zap.String("txn.Hash", txnHash))
if t, err := TransactionVerify(txn); err != nil {
logging.Logger.Error("Failed to verify price refresh transaction", zap.Any("err", err), zap.String("txn.Hash", txn.Hash))
} else {
logging.Logger.Info("Verified price refresh transaction", zap.String("txn_hash", t.Hash), zap.Any("txn_output", t.TransactionOutput))
}
Expand All @@ -147,39 +146,39 @@ func RefreshPriceOnChain(ctx context.Context) error {
}

// sendSmartContractBlobberAdd Add or update blobber on blockchain
func sendSmartContractBlobberAdd(ctx context.Context) (string, error) {
func sendSmartContractBlobberAdd(ctx context.Context) (*transaction.Transaction, error) {
// initialize storage node (ie blobber)
txn, err := transaction.NewTransactionEntity()
if err != nil {
return "", err
return nil, err
}

sn, err := getStorageNode()
if err != nil {
return "", err
return nil, err
}

err = txn.ExecuteSmartContract(transaction.STORAGE_CONTRACT_ADDRESS,
transaction.ADD_BLOBBER_SC_NAME, sn, 0)
if err != nil {
logging.Logger.Error("Failed to set blobber on the blockchain",
zap.String("err:", err.Error()))
return "", err
return nil, err
}

return txn.Hash, nil
return txn, nil
}

// UpdateBlobberOnChain updates latest changes in blobber's settings, capacity,etc.
func UpdateBlobberOnChain(ctx context.Context) error {

txnHash, err := sendSmartContractBlobberUpdate(ctx)
txn, err := sendSmartContractBlobberUpdate(ctx)
if err != nil {
return err
}

if t, err := TransactionVerify(txnHash); err != nil {
logging.Logger.Error("Failed to verify blobber update transaction", zap.Any("err", err), zap.String("txn.Hash", txnHash))
if t, err := TransactionVerify(txn); err != nil {
logging.Logger.Error("Failed to verify blobber update transaction", zap.Any("err", err), zap.String("txn.Hash", txn.Hash))
} else {
logging.Logger.Info("Verified blobber update transaction", zap.String("txn_hash", t.Hash), zap.Any("txn_output", t.TransactionOutput))
}
Expand All @@ -188,27 +187,27 @@ func UpdateBlobberOnChain(ctx context.Context) error {
}

// sendSmartContractBlobberUpdate update blobber on blockchain
func sendSmartContractBlobberUpdate(ctx context.Context) (string, error) {
func sendSmartContractBlobberUpdate(ctx context.Context) (*transaction.Transaction, error) {
// initialize storage node (ie blobber)
txn, err := transaction.NewTransactionEntity()
if err != nil {
return "", err
return nil, err
}

sn, err := getStorageNode()
if err != nil {
return "", err
return nil, err
}

err = txn.ExecuteSmartContract(transaction.STORAGE_CONTRACT_ADDRESS,
transaction.UPDATE_BLOBBER_SC_NAME, sn, 0)
if err != nil {
logging.Logger.Error("Failed to set blobber on the blockchain",
zap.String("err:", err.Error()))
return "", err
return txn, err
}

return txn.Hash, nil
return txn, nil
}

// ErrBlobberHasRemoved represents service health check error, where the
Expand All @@ -220,15 +219,15 @@ var ErrBlobberHasRemoved = errors.New("blobber has been removed")
// ErrBlobberNotFound it is not registered on chain
var ErrBlobberNotFound = errors.New("blobber is not found")

func TransactionVerify(txnHash string) (t *transaction.Transaction, err error) {
func TransactionVerify(txn *transaction.Transaction) (t *transaction.Transaction, err error) {
for i := 0; i < util.MAX_RETRIES; i++ {
time.Sleep(transaction.SLEEP_FOR_TXN_CONFIRMATION * time.Second)
if t, err = transaction.VerifyTransaction(txnHash, chain.GetServerChain()); err == nil {
if t, err = transaction.VerifyTransactionWithNonce(txn.Hash, txn.GetTransaction().GetTransactionNonce()); err == nil {
return t, nil
}
}

return nil, errors.New("[txn]max retries exceeded with " + txnHash)
return nil, errors.New("[txn]max retries exceeded with " + txn.Hash)
}

func WalletRegister() error {
Expand All @@ -244,11 +243,12 @@ func WalletRegister() error {

// SendHealthCheck send heartbeat to blockchain
func SendHealthCheck() (string, error) {
txnHash, err := BlobberHealthCheck()
txn, err := BlobberHealthCheck()
if err != nil {
return txnHash, err
return "", err
}
_, err = TransactionVerify(txnHash)

return txnHash, err
_, err = TransactionVerify(txn)

return txn.Hash, err
}
3 changes: 1 addition & 2 deletions code/go/0chain.net/blobbercore/readmarker/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"time"

"github.com/0chain/blobber/code/go/0chain.net/core/chain"
"github.com/0chain/blobber/code/go/0chain.net/core/common"
zLogger "github.com/0chain/blobber/code/go/0chain.net/core/logging"
"github.com/0chain/blobber/code/go/0chain.net/core/transaction"
Expand Down Expand Up @@ -58,7 +57,7 @@ func (rme *ReadMarkerEntity) RedeemReadMarker(ctx context.Context) (err error) {
time.Sleep(transaction.SLEEP_FOR_TXN_CONFIRMATION * time.Second)

var logHash = tx.Hash // keep transaction hash for error logs
tx, err = transaction.VerifyTransaction(tx.Hash, chain.GetServerChain())
tx, err = transaction.VerifyTransactionWithNonce(tx.Hash, tx.GetTransaction().GetTransactionNonce())
if err != nil {
zLogger.Logger.Error("Error verifying the read redeem transaction", zap.Error(err), zap.String("txn", logHash))
return common.NewErrorf("redeem_read_marker", "verifying transaction: %v", err)
Expand Down
3 changes: 2 additions & 1 deletion code/go/0chain.net/blobbercore/writemarker/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,13 +110,14 @@ func (wme *WriteMarkerEntity) RedeemMarker(ctx context.Context) error {
}

time.Sleep(transaction.SLEEP_FOR_TXN_CONFIRMATION * time.Second)
t, err := transaction.VerifyTransaction(txn.Hash, chain.GetServerChain())
t, err := transaction.VerifyTransactionWithNonce(txn.Hash, txn.GetTransaction().GetTransactionNonce())
if err != nil {
Logger.Error("Error verifying the close connection transaction", zap.String("err:", err.Error()), zap.String("txn", txn.Hash))
wme.Status = Failed
wme.StatusMessage = "Error verifying the close connection transaction." + err.Error()
wme.ReedeemRetries++
wme.CloseTxnID = txn.Hash
// TODO Is this single try?
if err := wme.UpdateStatus(ctx, Failed, "Error verifying the close connection transaction."+err.Error(), txn.Hash); err != nil {
Logger.Error("WriteMarkerEntity_UpdateStatus", zap.Error(err))
}
Expand Down
9 changes: 7 additions & 2 deletions code/go/0chain.net/blobbercore/writemarker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ func redeemWriteMarker(allocationObj *allocation.Allocation, wm *WriteMarkerEnti
if shouldRollback {
if rollbackErr := db.Rollback().Error; rollbackErr != nil {
logging.Logger.Error("Error rollback on redeeming the write marker.",
zap.Any("allocation", allocationObj.ID),
zap.Any("wm", wm.WM.AllocationID), zap.Error(rollbackErr))
}
}
Expand All @@ -74,6 +75,7 @@ func redeemWriteMarker(allocationObj *allocation.Allocation, wm *WriteMarkerEnti
err := wm.RedeemMarker(ctx)
if err != nil {
logging.Logger.Error("Error redeeming the write marker.",
zap.Any("allocation", allocationObj.ID),
zap.Any("wm", wm.WM.AllocationID), zap.Any("error", err))

shouldRollback = true
Expand All @@ -85,6 +87,7 @@ func redeemWriteMarker(allocationObj *allocation.Allocation, wm *WriteMarkerEnti
wm.WM.AllocationRoot, allocationObj.ID).Error
if err != nil {
logging.Logger.Error("Error redeeming the write marker. Allocation latest wm redeemed update failed",
zap.Any("allocation", allocationObj.ID),
zap.Any("wm", wm.WM.AllocationRoot), zap.Any("error", err))
shouldRollback = true
return err
Expand All @@ -94,13 +97,15 @@ func redeemWriteMarker(allocationObj *allocation.Allocation, wm *WriteMarkerEnti
if err != nil {
logging.Logger.Error("Error committing the writemarker redeem",
zap.Any("allocation", allocationObj.ID),
zap.Error(err))
zap.Any("wm", wm.WM.AllocationRoot), zap.Error(err))
shouldRollback = true
return err
}

allocationObj.LatestRedeemedWM = wm.WM.AllocationRoot
logging.Logger.Info("Success Redeeming the write marker", zap.Any("wm", wm.WM.AllocationRoot), zap.Any("txn", wm.CloseTxnID))
logging.Logger.Info("Success Redeeming the write marker",
zap.Any("allocation", allocationObj.ID),
zap.Any("wm", wm.WM.AllocationRoot), zap.Any("txn", wm.CloseTxnID))

return nil
}
Expand Down
6 changes: 6 additions & 0 deletions code/go/0chain.net/blobbercore/writemarker/writemarker.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ import (
"github.com/0chain/blobber/code/go/0chain.net/blobbercore/allocation"
"github.com/0chain/blobber/code/go/0chain.net/blobbercore/config"
"github.com/0chain/blobber/code/go/0chain.net/blobbercore/datastore"
"github.com/0chain/blobber/code/go/0chain.net/core/logging"
"github.com/remeh/sizedwaitgroup"
"go.uber.org/zap"
)

func redeemWriteMarkers() {
Expand All @@ -14,6 +16,10 @@ func redeemWriteMarkers() {
db.Where(&allocation.Allocation{IsRedeemRequired: true}).
Find(&allocations)
if len(allocations) > 0 {

logging.Logger.Info("Redeem writemarkers for allocations",
zap.Any("numOfAllocations", len(allocations)))

swg := sizedwaitgroup.New(config.Configuration.WMRedeemNumWorkers)
for _, allocationObj := range allocations {
swg.Add()
Expand Down

0 comments on commit 8dc6681

Please sign in to comment.