Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

p2p: add a per-message type send and receive metric (backport #9622) #9640

Open
wants to merge 31 commits into
base: v0.37.x
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
97a6c38
p2p: add a per-message type send and receive metric (#9622)
williambanfield Oct 27, 2022
79a7d6d
all builds except fuzz
williambanfield Oct 28, 2022
89482f8
all builds except fuzz
williambanfield Oct 26, 2022
12bf141
New methods call the old methods
williambanfield Oct 26, 2022
cef631d
Receive called from OnReceive
williambanfield Oct 26, 2022
0b2202e
tests build after reintroducing send and receive
williambanfield Oct 26, 2022
af054f1
fix fuzz peer
williambanfield Oct 28, 2022
e744325
nolint gosec
williambanfield Oct 28, 2022
6986c21
mocks use the 'New' calls
williambanfield Oct 28, 2022
e07e7c9
move nolint
williambanfield Oct 28, 2022
ed6ef40
NewReceive called conditionally from switch
williambanfield Oct 28, 2022
9aaacd7
add receives to all with NewReceive
williambanfield Oct 28, 2022
3e111ac
all reactors have receive with empty new receive call
williambanfield Oct 28, 2022
39b2e4b
reactors have channel and peer in new receive
williambanfield Oct 28, 2022
1d20f37
add proto message unmarshal to reactors
williambanfield Oct 28, 2022
bc62cfe
unwrap messages in receive calls
williambanfield Oct 28, 2022
cc9e8f4
add channel types where absent
williambanfield Oct 28, 2022
10dca9f
fix mempool reactor to include Receive
williambanfield Oct 28, 2022
bbb5051
unwrap messages in receive calls
williambanfield Oct 28, 2022
ad8aa2b
remove erroneously added blockchain reactors
williambanfield Oct 31, 2022
18adc25
Merge remote-tracking branch 'origin/v0.37.x' into mergify/bp/v0.37.x…
williambanfield Oct 31, 2022
bdb9fcc
remove erroneously added maverick reactor
williambanfield Oct 31, 2022
deaf1a9
re-add broadcast method
williambanfield Oct 31, 2022
4b96c40
Rename new methods to *Envelope instead of New*
williambanfield Oct 31, 2022
ad14bf1
add deprecation notices
williambanfield Oct 31, 2022
3302345
fix typos
williambanfield Oct 31, 2022
3ce5e29
document new metrics
williambanfield Oct 31, 2022
59f56b3
format after sed changes
williambanfield Oct 31, 2022
9be4f46
remove erroneous broadcast method name update
williambanfield Oct 31, 2022
648393b
remove old receives
williambanfield Nov 1, 2022
93c8c1f
comment out old print statement
williambanfield Oct 31, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
52 changes: 0 additions & 52 deletions blocksync/msgs.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,58 +19,6 @@ const (
BlockResponseMessageFieldKeySize
)

// EncodeMsg encodes a Protobuf message
func EncodeMsg(pb proto.Message) ([]byte, error) {
msg := bcproto.Message{}

switch pb := pb.(type) {
case *bcproto.BlockRequest:
msg.Sum = &bcproto.Message_BlockRequest{BlockRequest: pb}
case *bcproto.BlockResponse:
msg.Sum = &bcproto.Message_BlockResponse{BlockResponse: pb}
case *bcproto.NoBlockResponse:
msg.Sum = &bcproto.Message_NoBlockResponse{NoBlockResponse: pb}
case *bcproto.StatusRequest:
msg.Sum = &bcproto.Message_StatusRequest{StatusRequest: pb}
case *bcproto.StatusResponse:
msg.Sum = &bcproto.Message_StatusResponse{StatusResponse: pb}
default:
return nil, fmt.Errorf("unknown message type %T", pb)
}

bz, err := proto.Marshal(&msg)
if err != nil {
return nil, fmt.Errorf("unable to marshal %T: %w", pb, err)
}

return bz, nil
}

// DecodeMsg decodes a Protobuf message.
func DecodeMsg(bz []byte) (proto.Message, error) {
pb := &bcproto.Message{}

err := proto.Unmarshal(bz, pb)
if err != nil {
return nil, err
}

switch msg := pb.Sum.(type) {
case *bcproto.Message_BlockRequest:
return msg.BlockRequest, nil
case *bcproto.Message_BlockResponse:
return msg.BlockResponse, nil
case *bcproto.Message_NoBlockResponse:
return msg.NoBlockResponse, nil
case *bcproto.Message_StatusRequest:
return msg.StatusRequest, nil
case *bcproto.Message_StatusResponse:
return msg.StatusResponse, nil
default:
return nil, fmt.Errorf("unknown message type %T", msg)
}
}

// ValidateMsg validates a message.
func ValidateMsg(pb proto.Message) error {
if pb == nil {
Expand Down
109 changes: 42 additions & 67 deletions blocksync/reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,21 +143,20 @@ func (bcR *Reactor) GetChannels() []*p2p.ChannelDescriptor {
SendQueueCapacity: 1000,
RecvBufferCapacity: 50 * 4096,
RecvMessageCapacity: MaxMsgSize,
MessageType: &bcproto.Message{},
},
}
}

// AddPeer implements Reactor by sending our state to peer.
func (bcR *Reactor) AddPeer(peer p2p.Peer) {
msgBytes, err := EncodeMsg(&bcproto.StatusResponse{
Base: bcR.store.Base(),
Height: bcR.store.Height()})
if err != nil {
bcR.Logger.Error("could not convert msg to protobuf", "err", err)
return
}

peer.Send(BlocksyncChannel, msgBytes)
peer.SendEnvelope(p2p.Envelope{
ChannelID: BlocksyncChannel,
Message: &bcproto.StatusResponse{
Base: bcR.store.Base(),
Height: bcR.store.Height(),
},
})
// it's OK if send fails. will try later in poolRoutine

// peer is added to the pool once we receive the first
Expand All @@ -182,69 +181,53 @@ func (bcR *Reactor) respondToPeer(msg *bcproto.BlockRequest,
return false
}

msgBytes, err := EncodeMsg(&bcproto.BlockResponse{Block: bl})
if err != nil {
bcR.Logger.Error("could not marshal msg", "err", err)
return false
}

return src.TrySend(BlocksyncChannel, msgBytes)
return src.TrySendEnvelope(p2p.Envelope{
ChannelID: BlocksyncChannel,
Message: &bcproto.BlockResponse{Block: bl},
})
}

bcR.Logger.Info("Peer asking for a block we don't have", "src", src, "height", msg.Height)

msgBytes, err := EncodeMsg(&bcproto.NoBlockResponse{Height: msg.Height})
if err != nil {
bcR.Logger.Error("could not convert msg to protobuf", "err", err)
return false
}

return src.TrySend(BlocksyncChannel, msgBytes)
return src.TrySendEnvelope(p2p.Envelope{
ChannelID: BlocksyncChannel,
Message: &bcproto.NoBlockResponse{Height: msg.Height},
})
}

// Receive implements Reactor by handling 4 types of messages (look below).
func (bcR *Reactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) {
msg, err := DecodeMsg(msgBytes)
if err != nil {
bcR.Logger.Error("Error decoding message", "src", src, "chId", chID, "err", err)
bcR.Switch.StopPeerForError(src, err)
func (bcR *Reactor) ReceiveEnvelope(e p2p.Envelope) {
if err := ValidateMsg(e.Message); err != nil {
bcR.Logger.Error("Peer sent us invalid msg", "peer", e.Src, "msg", e.Message, "err", err)
bcR.Switch.StopPeerForError(e.Src, err)
return
}

if err = ValidateMsg(msg); err != nil {
bcR.Logger.Error("Peer sent us invalid msg", "peer", src, "msg", msg, "err", err)
bcR.Switch.StopPeerForError(src, err)
return
}
bcR.Logger.Debug("Receive", "e.Src", e.Src, "chID", e.ChannelID, "msg", e.Message)

bcR.Logger.Debug("Receive", "src", src, "chID", chID, "msg", msg)

switch msg := msg.(type) {
switch msg := e.Message.(type) {
case *bcproto.BlockRequest:
bcR.respondToPeer(msg, src)
bcR.respondToPeer(msg, e.Src)
case *bcproto.BlockResponse:
bi, err := types.BlockFromProto(msg.Block)
if err != nil {
bcR.Logger.Error("Block content is invalid", "err", err)
return
}
bcR.pool.AddBlock(src.ID(), bi, len(msgBytes))
bcR.pool.AddBlock(e.Src.ID(), bi, msg.Block.Size())
case *bcproto.StatusRequest:
// Send peer our state.
msgBytes, err := EncodeMsg(&bcproto.StatusResponse{
Height: bcR.store.Height(),
Base: bcR.store.Base(),
e.Src.TrySendEnvelope(p2p.Envelope{
ChannelID: BlocksyncChannel,
Message: &bcproto.StatusResponse{
Height: bcR.store.Height(),
Base: bcR.store.Base(),
},
})
if err != nil {
bcR.Logger.Error("could not convert msg to protobut", "err", err)
return
}
src.TrySend(BlocksyncChannel, msgBytes)
case *bcproto.StatusResponse:
// Got a peer status. Unverified.
bcR.pool.SetPeerRange(src.ID(), msg.Base, msg.Height)
bcR.pool.SetPeerRange(e.Src.ID(), msg.Base, msg.Height)
case *bcproto.NoBlockResponse:
bcR.Logger.Debug("Peer does not have requested block", "peer", src, "height", msg.Height)
bcR.Logger.Debug("Peer does not have requested block", "peer", e.Src, "height", msg.Height)
default:
bcR.Logger.Error(fmt.Sprintf("Unknown message type %v", reflect.TypeOf(msg)))
}
Expand Down Expand Up @@ -285,13 +268,10 @@ func (bcR *Reactor) poolRoutine(stateSynced bool) {
if peer == nil {
continue
}
msgBytes, err := EncodeMsg(&bcproto.BlockRequest{Height: request.Height})
if err != nil {
bcR.Logger.Error("could not convert msg to proto", "err", err)
continue
}

queued := peer.TrySend(BlocksyncChannel, msgBytes)
queued := peer.TrySendEnvelope(p2p.Envelope{
ChannelID: BlocksyncChannel,
Message: &bcproto.BlockRequest{Height: request.Height},
})
if !queued {
bcR.Logger.Debug("Send queue is full, drop block request", "peer", peer.ID(), "height", request.Height)
}
Expand All @@ -303,7 +283,7 @@ func (bcR *Reactor) poolRoutine(stateSynced bool) {

case <-statusUpdateTicker.C:
// ask for status updates
go bcR.BroadcastStatusRequest() //nolint: errcheck
go bcR.BroadcastStatusRequest()

}
}
Expand Down Expand Up @@ -429,14 +409,9 @@ FOR_LOOP:
}

// BroadcastStatusRequest broadcasts `BlockStore` base and height.
func (bcR *Reactor) BroadcastStatusRequest() error {
bm, err := EncodeMsg(&bcproto.StatusRequest{})
if err != nil {
bcR.Logger.Error("could not convert msg to proto", "err", err)
return fmt.Errorf("could not convert msg to proto: %w", err)
}

bcR.Switch.Broadcast(BlocksyncChannel, bm)

return nil
func (bcR *Reactor) BroadcastStatusRequest() {
bcR.Switch.BroadcastEnvelope(p2p.Envelope{
ChannelID: BlocksyncChannel,
Message: &bcproto.StatusRequest{},
})
}
48 changes: 34 additions & 14 deletions consensus/byzantine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
mempoolv0 "github.com/tendermint/tendermint/mempool/v0"
mempoolv1 "github.com/tendermint/tendermint/mempool/v1"
"github.com/tendermint/tendermint/p2p"
tmcons "github.com/tendermint/tendermint/proto/tendermint/consensus"
tmproto "github.com/tendermint/tendermint/proto/tendermint/types"
sm "github.com/tendermint/tendermint/state"
"github.com/tendermint/tendermint/store"
Expand Down Expand Up @@ -165,10 +166,16 @@ func TestByzantinePrevoteEquivocation(t *testing.T) {
for i, peer := range peerList {
if i < len(peerList)/2 {
bcs.Logger.Info("Signed and pushed vote", "vote", prevote1, "peer", peer)
peer.Send(VoteChannel, MustEncode(&VoteMessage{prevote1}))
peer.SendEnvelope(p2p.Envelope{
Message: &tmcons.Vote{Vote: prevote1.ToProto()},
ChannelID: VoteChannel,
})
} else {
bcs.Logger.Info("Signed and pushed vote", "vote", prevote2, "peer", peer)
peer.Send(VoteChannel, MustEncode(&VoteMessage{prevote2}))
peer.SendEnvelope(p2p.Envelope{
Message: &tmcons.Vote{Vote: prevote2.ToProto()},
ChannelID: VoteChannel,
})
}
}
} else {
Expand Down Expand Up @@ -520,28 +527,41 @@ func sendProposalAndParts(
parts *types.PartSet,
) {
// proposal
msg := &ProposalMessage{Proposal: proposal}
peer.Send(DataChannel, MustEncode(msg))
peer.SendEnvelope(p2p.Envelope{
ChannelID: DataChannel,
Message: &tmcons.Proposal{Proposal: *proposal.ToProto()},
})

// parts
for i := 0; i < int(parts.Total()); i++ {
part := parts.GetPart(i)
msg := &BlockPartMessage{
Height: height, // This tells peer that this part applies to us.
Round: round, // This tells peer that this part applies to us.
Part: part,
pp, err := part.ToProto()
if err != nil {
panic(err) // TODO: wbanfield better error handling
}
peer.Send(DataChannel, MustEncode(msg))
peer.SendEnvelope(p2p.Envelope{
ChannelID: DataChannel,
Message: &tmcons.BlockPart{
Height: height, // This tells peer that this part applies to us.
Round: round, // This tells peer that this part applies to us.
Part: *pp,
},
})
}

// votes
cs.mtx.Lock()
prevote, _ := cs.signVote(tmproto.PrevoteType, blockHash, parts.Header())
precommit, _ := cs.signVote(tmproto.PrecommitType, blockHash, parts.Header())
cs.mtx.Unlock()

peer.Send(VoteChannel, MustEncode(&VoteMessage{prevote}))
peer.Send(VoteChannel, MustEncode(&VoteMessage{precommit}))
peer.SendEnvelope(p2p.Envelope{
ChannelID: VoteChannel,
Message: &tmcons.Vote{Vote: prevote.ToProto()},
})
peer.SendEnvelope(p2p.Envelope{
ChannelID: VoteChannel,
Message: &tmcons.Vote{Vote: precommit.ToProto()},
})
}

//----------------------------------------
Expand Down Expand Up @@ -579,7 +599,7 @@ func (br *ByzantineReactor) AddPeer(peer p2p.Peer) {
func (br *ByzantineReactor) RemovePeer(peer p2p.Peer, reason interface{}) {
br.reactor.RemovePeer(peer, reason)
}
func (br *ByzantineReactor) Receive(chID byte, peer p2p.Peer, msgBytes []byte) {
br.reactor.Receive(chID, peer, msgBytes)
func (br *ByzantineReactor) ReceiveEnvelope(e p2p.Envelope) {
br.reactor.ReceiveEnvelope(e)
}
func (br *ByzantineReactor) InitPeer(peer p2p.Peer) p2p.Peer { return peer }
6 changes: 5 additions & 1 deletion consensus/invalid_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/tendermint/tendermint/libs/log"
tmrand "github.com/tendermint/tendermint/libs/rand"
"github.com/tendermint/tendermint/p2p"
tmcons "github.com/tendermint/tendermint/proto/tendermint/consensus"
tmproto "github.com/tendermint/tendermint/proto/tendermint/types"
"github.com/tendermint/tendermint/types"
)
Expand Down Expand Up @@ -94,7 +95,10 @@ func invalidDoPrevoteFunc(t *testing.T, height int64, round int32, cs *State, sw
peers := sw.Peers().List()
for _, peer := range peers {
cs.Logger.Info("Sending bad vote", "block", blockHash, "peer", peer)
peer.Send(VoteChannel, MustEncode(&VoteMessage{precommit}))
peer.SendEnvelope(p2p.Envelope{
Message: &tmcons.Vote{Vote: precommit.ToProto()},
ChannelID: VoteChannel,
})
}
}()
}