Skip to content

Commit

Permalink
remove old receives
Browse files Browse the repository at this point in the history
  • Loading branch information
williambanfield committed Nov 1, 2022
1 parent 7bfb529 commit c5e641b
Show file tree
Hide file tree
Showing 12 changed files with 18 additions and 290 deletions.
19 changes: 0 additions & 19 deletions blocksync/reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@ import (
"reflect"
"time"

"github.com/cosmos/gogoproto/proto"

"github.com/tendermint/tendermint/libs/log"
"github.com/tendermint/tendermint/p2p"
bcproto "github.com/tendermint/tendermint/proto/tendermint/blocksync"
Expand Down Expand Up @@ -235,23 +233,6 @@ func (bcR *Reactor) ReceiveEnvelope(e p2p.Envelope) {
}
}

func (bcR *Reactor) Receive(chID byte, peer p2p.Peer, msgBytes []byte) {
var msg *bcproto.Message
err := proto.Unmarshal(msgBytes, msg)
if err != nil {
panic(err)
}
uw, err := msg.Unwrap()
if err != nil {
panic(err)
}
bcR.ReceiveEnvelope(p2p.Envelope{
ChannelID: chID,
Src: peer,
Message: uw,
})
}

// Handle messages from the poolReactor telling the reactor what to do.
// NOTE: Don't sleep in the FOR_LOOP or otherwise slow it down!
func (bcR *Reactor) poolRoutine(stateSynced bool) {
Expand Down
3 changes: 0 additions & 3 deletions consensus/byzantine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -602,7 +602,4 @@ func (br *ByzantineReactor) RemovePeer(peer p2p.Peer, reason interface{}) {
func (br *ByzantineReactor) ReceiveEnvelope(e p2p.Envelope) {
br.reactor.ReceiveEnvelope(e)
}
func (br *ByzantineReactor) Receive(chID byte, p p2p.Peer, m []byte) {
br.reactor.Receive(chID, p, m)
}
func (br *ByzantineReactor) InitPeer(peer p2p.Peer) p2p.Peer { return peer }
18 changes: 0 additions & 18 deletions consensus/reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"sync"
"time"

"github.com/gogo/protobuf/proto"
cstypes "github.com/tendermint/tendermint/consensus/types"
"github.com/tendermint/tendermint/libs/bits"
tmevents "github.com/tendermint/tendermint/libs/events"
Expand Down Expand Up @@ -388,23 +387,6 @@ func (conR *Reactor) ReceiveEnvelope(e p2p.Envelope) {
}
}

func (conR *Reactor) Receive(chID byte, peer p2p.Peer, msgBytes []byte) {
var msg *tmcons.Message
err := proto.Unmarshal(msgBytes, msg)
if err != nil {
panic(err)
}
uw, err := msg.Unwrap()
if err != nil {
panic(err)
}
conR.ReceiveEnvelope(p2p.Envelope{
ChannelID: chID,
Src: peer,
Message: uw,
})
}

// SetEventBus sets event bus.
func (conR *Reactor) SetEventBus(b *types.EventBus) {
conR.eventBus = b
Expand Down
13 changes: 0 additions & 13 deletions evidence/reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,19 +92,6 @@ func (evR *Reactor) ReceiveEnvelope(e p2p.Envelope) {
}
}

func (evR *Reactor) Receive(chID byte, peer p2p.Peer, msgBytes []byte) {
var msg *tmproto.EvidenceList
err := proto.Unmarshal(msgBytes, msg)
if err != nil {
panic(err)
}
evR.ReceiveEnvelope(p2p.Envelope{
ChannelID: chID,
Src: peer,
Message: msg,
})
}

// SetEventBus implements events.Eventable.
func (evR *Reactor) SetEventBus(b *types.EventBus) {
evR.eventBus = b
Expand Down
19 changes: 0 additions & 19 deletions mempool/v0/reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@ import (
"fmt"
"time"

"github.com/cosmos/gogoproto/proto"

cfg "github.com/tendermint/tendermint/config"
"github.com/tendermint/tendermint/libs/clist"
"github.com/tendermint/tendermint/libs/log"
Expand Down Expand Up @@ -190,23 +188,6 @@ func (memR *Reactor) ReceiveEnvelope(e p2p.Envelope) {
// broadcasting happens from go routines per peer
}

func (memR *Reactor) Receive(chID byte, peer p2p.Peer, msgBytes []byte) {
var msg *protomem.Message
err := proto.Unmarshal(msgBytes, msg)
if err != nil {
panic(err)
}
uw, err := msg.Unwrap()
if err != nil {
panic(err)
}
memR.ReceiveEnvelope(p2p.Envelope{
ChannelID: chID,
Src: peer,
Message: uw,
})
}

// PeerState describes the state of a peer.
type PeerState interface {
GetHeight() int64
Expand Down
19 changes: 0 additions & 19 deletions mempool/v1/reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@ import (
"fmt"
"time"

"github.com/cosmos/gogoproto/proto"

cfg "github.com/tendermint/tendermint/config"
"github.com/tendermint/tendermint/libs/clist"
"github.com/tendermint/tendermint/libs/log"
Expand Down Expand Up @@ -189,23 +187,6 @@ func (memR *Reactor) ReceiveEnvelope(e p2p.Envelope) {
// broadcasting happens from go routines per peer
}

func (memR *Reactor) Receive(chID byte, peer p2p.Peer, msgBytes []byte) {
var msg *protomem.Message
err := proto.Unmarshal(msgBytes, msg)
if err != nil {
panic(err)
}
uw, err := msg.Unwrap()
if err != nil {
panic(err)
}
memR.ReceiveEnvelope(p2p.Envelope{
ChannelID: chID,
Src: peer,
Message: uw,
})
}

// PeerState describes the state of a peer.
type PeerState interface {
GetHeight() int64
Expand Down
31 changes: 5 additions & 26 deletions p2p/base_reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,28 +38,8 @@ type Reactor interface {
// or other reason).
RemovePeer(peer Peer, reason interface{})

// Receive is called by the switch when msgBytes is received from the peer.
//
// NOTE reactor can not keep msgBytes around after Receive completes without
// copying.
//
// CONTRACT: msgBytes are not nil.
//
// Only one of Receive or ReceiveEnvelope are called per message. If ReceiveEnvelope
// is implemented, it will be used, otherwise the switch will fallback to
// using Receive.
// Deprecated: Reactors looking to receive data from a peer should implement ReceiveEnvelope.
// Receive will be deprecated in favor of ReceiveEnvelope in v0.38.
Receive(chID byte, peer Peer, msgBytes []byte)
}

type ReceiveEnveloper interface {
// ReceiveEnvelope is called by the switch when an envelope is received from any connected
// peer on any of the channels registered by the reactor.
//
// Only one of Receive or ReceiveEnvelope are called per message. If ReceiveEnvelope
// is implemented, it will be used, otherwise the switch will fallback to
// using Receive. Receive will be replaced by ReceiveEnvelope in a future version
ReceiveEnvelope(Envelope)
}

Expand All @@ -80,9 +60,8 @@ func NewBaseReactor(name string, impl Reactor) *BaseReactor {
func (br *BaseReactor) SetSwitch(sw *Switch) {
br.Switch = sw
}
func (*BaseReactor) GetChannels() []*conn.ChannelDescriptor { return nil }
func (*BaseReactor) AddPeer(peer Peer) {}
func (*BaseReactor) RemovePeer(peer Peer, reason interface{}) {}
func (*BaseReactor) ReceiveEnvelope(e Envelope) {}
func (*BaseReactor) Receive(chID byte, peer Peer, msgBytes []byte) {}
func (*BaseReactor) InitPeer(peer Peer) Peer { return peer }
func (*BaseReactor) GetChannels() []*conn.ChannelDescriptor { return nil }
func (*BaseReactor) AddPeer(peer Peer) {}
func (*BaseReactor) RemovePeer(peer Peer, reason interface{}) {}
func (*BaseReactor) ReceiveEnvelope(e Envelope) {}
func (*BaseReactor) InitPeer(peer Peer) Peer { return peer }
95 changes: 13 additions & 82 deletions p2p/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,14 +40,6 @@ type Peer interface {
SendEnvelope(Envelope) bool
TrySendEnvelope(Envelope) bool

// Deprecated: entities looking to act as peers should implement SendEnvelope instead.
// Send will be removed in v0.38.
Send(byte, []byte) bool

// Deprecated: entities looking to act as peers should implement TrySendEnvelope instead.
// TrySend will be removed in v0.38.
TrySend(byte, []byte) bool

Set(string, interface{})
Get(string) interface{}
}
Expand Down Expand Up @@ -260,64 +252,22 @@ func (p *peer) Status() tmconn.ConnectionStatus {
// SendEnvelope sends the message in the envelope on the channel specified by the
// envelope. Returns false if the connection times out trying to place the message
// onto its internal queue.
// Using SendEnvelope allows for tracking the message bytes sent and received by message type
// as a metric which Send cannot support.
func (p *peer) SendEnvelope(e Envelope) bool {
if !p.IsRunning() {
return false
} else if !p.hasChannel(e.ChannelID) {
return false
}
msg := e.Message
metricLabelValue := p.mlc.ValueToMetricLabel(msg)
if w, ok := msg.(Wrapper); ok {
msg = w.Wrap()
}
msgBytes, err := proto.Marshal(msg)
if err != nil {
p.Logger.Error("marshaling message to send", "error", err)
return false
}
res := p.Send(e.ChannelID, msgBytes)
if res {
p.metrics.MessageSendBytesTotal.With("message_type", metricLabelValue).Add(float64(len(msgBytes)))
}
return res
}

// Send msg bytes to the channel identified by chID byte. Returns false if the
// send queue is full after timeout, specified by MConnection.
// SendEnvelope replaces Send which will be deprecated in a future release.
func (p *peer) Send(chID byte, msgBytes []byte) bool {
if !p.IsRunning() {
return false
} else if !p.hasChannel(chID) {
return false
}
res := p.mconn.Send(chID, msgBytes)
if res {
labels := []string{
"peer_id", string(p.ID()),
"chID", fmt.Sprintf("%#x", chID),
}
p.metrics.PeerSendBytesTotal.With(labels...).Add(float64(len(msgBytes)))
}
return res
return p.send(e.ChannelID, e.Message, p.mconn.Send)
}

// TrySendEnvelope attempts to sends the message in the envelope on the channel specified by the
// envelope. Returns false immediately if the connection's internal queue is full
// Using TrySendEnvelope allows for tracking the message bytes sent and received by message type
// as a metric which TrySend cannot support.
func (p *peer) TrySendEnvelope(e Envelope) bool {
return p.send(e.ChannelID, e.Message, p.mconn.TrySend)
}

func (p *peer) send(chID byte, msg proto.Message, sendFunc func(byte, []byte) bool) bool {
if !p.IsRunning() {
// see Switch#Broadcast, where we fetch the list of peers and loop over
// them - while we're looping, one peer may be removed and stopped.
return false
} else if !p.hasChannel(e.ChannelID) {
} else if !p.hasChannel(chID) {
return false
}
msg := e.Message
metricLabelValue := p.mlc.ValueToMetricLabel(msg)
if w, ok := msg.(Wrapper); ok {
msg = w.Wrap()
Expand All @@ -327,29 +277,14 @@ func (p *peer) TrySendEnvelope(e Envelope) bool {
p.Logger.Error("marshaling message to send", "error", err)
return false
}
res := p.TrySend(e.ChannelID, msgBytes)
if res {
p.metrics.MessageSendBytesTotal.With("message_type", metricLabelValue).Add(float64(len(msgBytes)))
}
return res
}

// TrySend msg bytes to the channel identified by chID byte. Immediately returns
// false if the send queue is full.
// TrySendEnvelope replaces TrySend which will be deprecated in a future release.
func (p *peer) TrySend(chID byte, msgBytes []byte) bool {
if !p.IsRunning() {
return false
} else if !p.hasChannel(chID) {
return false
}
res := p.mconn.TrySend(chID, msgBytes)
res := sendFunc(chID, msgBytes)
if res {
labels := []string{
"peer_id", string(p.ID()),
"chID", fmt.Sprintf("%#x", chID),
}
p.metrics.PeerSendBytesTotal.With(labels...).Add(float64(len(msgBytes)))
p.metrics.MessageSendBytesTotal.With("message_type", metricLabelValue).Add(float64(len(msgBytes)))
}
return res
}
Expand Down Expand Up @@ -474,15 +409,11 @@ func createMConnection(
}
p.metrics.PeerReceiveBytesTotal.With(labels...).Add(float64(len(msgBytes)))
p.metrics.MessageReceiveBytesTotal.With("message_type", p.mlc.ValueToMetricLabel(msg)).Add(float64(len(msgBytes)))
if nr, ok := reactor.(ReceiveEnveloper); ok {
nr.ReceiveEnvelope(Envelope{
ChannelID: chID,
Src: p,
Message: msg,
})
} else {
reactor.Receive(chID, p, msgBytes)
}
reactor.ReceiveEnvelope(Envelope{
ChannelID: chID,
Src: p,
Message: msg,
})
}

onError := func(r interface{}) {
Expand Down
19 changes: 0 additions & 19 deletions p2p/pex/pex_reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@ import (
"sync"
"time"

"github.com/cosmos/gogoproto/proto"

"github.com/tendermint/tendermint/libs/cmap"
tmmath "github.com/tendermint/tendermint/libs/math"
tmrand "github.com/tendermint/tendermint/libs/rand"
Expand Down Expand Up @@ -300,23 +298,6 @@ func (r *Reactor) ReceiveEnvelope(e p2p.Envelope) {
}
}

func (r *Reactor) Receive(chID byte, peer p2p.Peer, msgBytes []byte) {
var msg *tmp2p.Message
err := proto.Unmarshal(msgBytes, msg)
if err != nil {
panic(err)
}
um, err := msg.Unwrap()
if err != nil {
panic(err)
}
r.ReceiveEnvelope(p2p.Envelope{
ChannelID: chID,
Src: peer,
Message: um,
})
}

// enforces a minimum amount of time between requests
func (r *Reactor) receiveRequest(src Peer) error {
id := string(src.ID())
Expand Down

0 comments on commit c5e641b

Please sign in to comment.