From 8fdc4ce5b75cd34904974859fb8e57a2fc4dc145 Mon Sep 17 00:00:00 2001
From: Paul Banks
Date: Fri, 21 Apr 2023 13:14:57 +0100
Subject: [PATCH] NetworkTransport make pipelining configurable and default to
 max 2 in flight. (#541)

* NetworkTransport make pipelining configurable and default to max 2 inflight

* Add PR link

* Fix net transport test

* Use constants instead of magic numbers to express the semantics

* Doc comment tweaks
---
 net_transport.go      |  93 ++++++++++++++++---
 net_transport_test.go | 204 +++++++++++++++++++++++++++++-------
 testing.go            |  15 ++--
 3 files changed, 232 insertions(+), 80 deletions(-)

diff --git a/net_transport.go b/net_transport.go
index a780510b..bf78a481 100644
--- a/net_transport.go
+++ b/net_transport.go
@@ -28,9 +28,11 @@ const (
 	// DefaultTimeoutScale is the default TimeoutScale in a NetworkTransport.
 	DefaultTimeoutScale = 256 * 1024 // 256KB
 
-	// rpcMaxPipeline controls the maximum number of outstanding
-	// AppendEntries RPC calls.
-	rpcMaxPipeline = 128
+	// DefaultMaxRPCsInFlight is the default value used for pipelining configuration
+	// if a zero value is passed. See https://github.com/hashicorp/raft/pull/541
+	// for rationale. Note: if this is changed, we should update the doc comments
+	// below for NetworkTransportConfig.MaxRPCsInFlight.
+	DefaultMaxRPCsInFlight = 2
 
 	// connReceiveBufferSize is the size of the buffer we will use for reading RPC requests into
 	// on followers
@@ -39,6 +41,16 @@ const (
 	// connSendBufferSize is the size of the buffer we will use for sending RPC request data from
 	// the leader to followers.
 	connSendBufferSize = 256 * 1024 // 256KB
+
+	// minInFlightForPipelining is a property of our current pipelining
+	// implementation and must not be changed unless we change the invariants of
+	// that implementation. Roughly speaking, even with a zero-length in-flight
+	// buffer we still allow 2 requests to be in flight before we block, because we
+	// only block after sending and the receiving go-routine always unblocks the
+	// chan right after the first send. This is a constant just to provide context,
+	// rather than a magic number, in the few places where we have to check this
+	// invariant to avoid panics etc.
+	minInFlightForPipelining = 2
 )
 
 var (
@@ -76,7 +88,8 @@ type NetworkTransport struct {
 
 	logger hclog.Logger
 
-	maxPool int
+	maxPool     int
+	maxInFlight int
 
 	serverAddressProvider ServerAddressProvider
 
@@ -108,6 +121,39 @@ type NetworkTransportConfig struct {
 	// MaxPool controls how many connections we will pool
 	MaxPool int
 
+	// MaxRPCsInFlight controls the pipelining "optimization" when replicating
+	// entries to followers.
+	//
+	// Setting this to 1 explicitly disables pipelining since no overlapping of
+	// request processing is allowed. If set to 1, the pipelining code path is
+	// skipped entirely and every request is processed synchronously.
+	//
+	// If set to zero (or left as the default), DefaultMaxRPCsInFlight is used,
+	// which is currently 2. A value of 2 overlaps the preparation and sending of
+	// the next request while waiting for the previous response, but avoids additional
+	// queuing.
+	//
+	// Historically this was internally fixed at (effectively) 130; however,
+	// performance testing has shown that in practice the pipelining optimization
+	// combines badly with batching and actually has a very large negative impact
+	// on commit latency when throughput is high, whilst having very little
+	// benefit on latency or throughput in any other case! See
+	// [#541](https://github.com/hashicorp/raft/pull/541) for more analysis of the
+	// performance impacts.
+	//
+	// Increasing this beyond 2 is likely to be beneficial only in very
+	// high-latency network conditions. HashiCorp doesn't recommend using our own
+	// products this way.
+	//
+	// To maintain the behavior from before version 1.4.1 exactly, set this to
+	// 130. The old internal constant was 128 but was used directly as a channel
+	// buffer size. Since we send before blocking on the channel and unblock the
+	// channel as soon as the receiver is done with the earliest outstanding
+	// request, even an unbuffered channel (buffer=0) allows one request to be
+	// sent while waiting for the previous one (i.e. 2 in flight), so the old
+	// buffer actually allowed 130 RPCs to be in flight at once.
+	MaxRPCsInFlight int
+
 	// Timeout is used to apply I/O deadlines. For InstallSnapshot, we multiply
 	// the timeout by (SnapshotSize / TimeoutScale).
 	Timeout time.Duration
@@ -162,11 +208,17 @@ func NewNetworkTransportWithConfig(
 			Level:  hclog.DefaultLevel,
 		})
 	}
+	maxInFlight := config.MaxRPCsInFlight
+	if maxInFlight == 0 {
+		// Default zero value
+		maxInFlight = DefaultMaxRPCsInFlight
+	}
 	trans := &NetworkTransport{
 		connPool:    make(map[ServerAddress][]*netConn),
 		consumeCh:   make(chan RPC),
 		logger:      config.Logger,
 		maxPool:     config.MaxPool,
+		maxInFlight: maxInFlight,
 		shutdownCh:  make(chan struct{}),
 		stream:      config.Stream,
 		timeout:     config.Timeout,
@@ -379,6 +431,12 @@ func (n *NetworkTransport) returnConn(conn *netConn) {
 // AppendEntriesPipeline returns an interface that can be used to pipeline
 // AppendEntries requests.
 func (n *NetworkTransport) AppendEntriesPipeline(id ServerID, target ServerAddress) (AppendPipeline, error) {
+	if n.maxInFlight < minInFlightForPipelining {
+		// Pipelining is disabled since no more than one request can be outstanding
+		// at once. Skip the whole code path and use synchronous requests.
+		return nil, ErrPipelineReplicationNotSupported
+	}
+
 	// Get a connection
 	conn, err := n.getConnFromAddressProvider(id, target)
 	if err != nil {
@@ -386,7 +444,7 @@ func (n *NetworkTransport) AppendEntriesPipeline(id ServerID, target ServerAddre
 	}
 
 	// Create the pipeline
-	return newNetPipeline(n, conn), nil
+	return newNetPipeline(n, conn, n.maxInFlight), nil
 }
 
 // AppendEntries implements the Transport interface.
@@ -720,14 +778,25 @@ func sendRPC(conn *netConn, rpcType uint8, args interface{}) error {
 	return nil
 }
 
-// newNetPipeline is used to construct a netPipeline from a given
-// transport and connection.
-func newNetPipeline(trans *NetworkTransport, conn *netConn) *netPipeline {
+// newNetPipeline is used to construct a netPipeline from a given transport and
+// connection. It is a bug to ever call this with maxInFlight less than 2
+// (minInFlightForPipelining); doing so will cause a panic.
+func newNetPipeline(trans *NetworkTransport, conn *netConn, maxInFlight int) *netPipeline {
+	if maxInFlight < minInFlightForPipelining {
+		// Shouldn't happen (tm) since we validate this in the one call site and
+		// skip pipelining if it's lower.
+		panic("pipelining makes no sense if maxInFlight < 2")
+	}
 	n := &netPipeline{
-		conn:         conn,
-		trans:        trans,
-		doneCh:       make(chan AppendFuture, rpcMaxPipeline),
-		inprogressCh: make(chan *appendFuture, rpcMaxPipeline),
+		conn:  conn,
+		trans: trans,
+
+		// The buffer size is 2 less than the configured max because we send before
+		// waiting on the channel and the decode routine unblocks the channel as
+		// soon as it's waiting on the first request. So a zero-buffered channel
+		// still allows 1 request to be sent even while decode is still waiting for
+		// a response from the previous one, i.e. two are in flight at the same time.
+		inprogressCh: make(chan *appendFuture, maxInFlight-2),
+		doneCh:       make(chan AppendFuture, maxInFlight-2),
 		shutdownCh:   make(chan struct{}),
 	}
 	go n.decodeResponses()
diff --git a/net_transport_test.go b/net_transport_test.go
index 28aa6c75..1b81bdb1 100644
--- a/net_transport_test.go
+++ b/net_transport_test.go
@@ -5,6 +5,7 @@ package raft
 
 import (
 	"bytes"
+	"context"
 	"fmt"
 	"net"
 	"reflect"
@@ -199,6 +200,31 @@ func TestNetworkTransport_Heartbeat_FastPath(t *testing.T) {
 	}
 }
 
+func makeAppendRPC() AppendEntriesRequest {
+	return AppendEntriesRequest{
+		Term:         10,
+		PrevLogEntry: 100,
+		PrevLogTerm:  4,
+		Entries: []*Log{
+			{
+				Index: 101,
+				Term:  4,
+				Type:  LogNoop,
+			},
+		},
+		LeaderCommitIndex: 90,
+		RPCHeader:         RPCHeader{Addr: []byte("cartman")},
+	}
+}
+
+func makeAppendRPCResponse() AppendEntriesResponse {
+	return AppendEntriesResponse{
+		Term:    4,
+		LastLog: 90,
+		Success: true,
+	}
+}
+
 func TestNetworkTransport_AppendEntries(t *testing.T) {
 
 	for _, useAddrProvider := range []bool{true, false} {
@@ -211,26 +237,8 @@ func TestNetworkTransport_AppendEntries(t *testing.T) {
 			rpcCh := trans1.Consumer()
 
 			// Make the RPC request
-			args := AppendEntriesRequest{
-				Term:         10,
-				PrevLogEntry: 100,
-				PrevLogTerm:  4,
-				Entries: []*Log{
-					{
-						Index: 101,
-						Term:  4,
-						Type:  LogNoop,
-					},
-				},
-				LeaderCommitIndex: 90,
-				RPCHeader:         RPCHeader{Addr: []byte("cartman")},
-			}
-
-			resp := AppendEntriesResponse{
-				Term:    4,
-				LastLog: 90,
-				Success: true,
-			}
+			args := makeAppendRPC()
+			resp := makeAppendRPCResponse()
 
 			// Listen for a request
 			go func() {
@@ -282,26 +290,8 @@ func TestNetworkTransport_AppendEntriesPipeline(t *testing.T) {
 			rpcCh := trans1.Consumer()
 
 			// Make the RPC request
-			args := AppendEntriesRequest{
-				Term:         10,
-				PrevLogEntry: 100,
-				PrevLogTerm:  4,
-				Entries: []*Log{
-					{
-						Index: 101,
-						Term:  4,
-						Type:  LogNoop,
-					},
-				},
-				LeaderCommitIndex: 90,
-				RPCHeader:         RPCHeader{Addr: []byte("cartman")},
-			}
-
-			resp := AppendEntriesResponse{
-				Term:    4,
-				LastLog: 90,
-				Success: true,
-			}
+			args := makeAppendRPC()
+			resp := makeAppendRPCResponse()
 
 			// Listen for a request
 			go func() {
@@ -368,26 +358,8 @@ func TestNetworkTransport_AppendEntriesPipeline_CloseStreams(t *testing.T) {
 	rpcCh := trans1.Consumer()
 
 	// Make the RPC request
-	args := AppendEntriesRequest{
-		Term:         10,
-		PrevLogEntry: 100,
-		PrevLogTerm:  4,
-		Entries: []*Log{
-			{
-				Index: 101,
-				Term:  4,
-				Type:  LogNoop,
-			},
-		},
-		LeaderCommitIndex: 90,
-		RPCHeader:         RPCHeader{Addr: []byte("cartman")},
-	}
-
-	resp := AppendEntriesResponse{
-		Term:    4,
-		LastLog: 90,
-		Success: true,
-	}
+	args := makeAppendRPC()
+	resp := makeAppendRPCResponse()
 
 	shutdownCh := make(chan struct{})
 	defer close(shutdownCh)
@@ -467,6 +439,105 @@ func TestNetworkTransport_AppendEntriesPipeline_CloseStreams(t *testing.T) {
 	}
 }
 
+func TestNetworkTransport_AppendEntriesPipeline_MaxRPCsInFlight(t *testing.T) {
+	// Test the important cases: 0 (default to 2), 1 (disabled), 2 and "some"
+	for _, max := range []int{0, 1, 2, 10} {
+		t.Run(fmt.Sprintf("max=%d", max), func(t *testing.T) {
+			config := &NetworkTransportConfig{
+				MaxPool:         2,
+				MaxRPCsInFlight: max,
+				Timeout:         time.Second,
+				// Don't use the test logger as the transport has multiple goroutines and
+				// logging after the test ends causes panics.
+				ServerAddressProvider: &testAddrProvider{"localhost:0"},
+			}
+
+			// Transport 1 is consumer
+			trans1, err := NewTCPTransportWithConfig("localhost:0", nil, config)
+			require.NoError(t, err)
+			defer trans1.Close()
+
+			// Make the RPC request
+			args := makeAppendRPC()
+			resp := makeAppendRPCResponse()
+
+			ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+			defer cancel()
+
+			// Transport 2 makes outbound request
+			config.ServerAddressProvider = &testAddrProvider{string(trans1.LocalAddr())}
+			trans2, err := NewTCPTransportWithConfig("localhost:0", nil, config)
+			require.NoError(t, err)
+			defer trans2.Close()
+
+			// Kill the transports on the timeout to unblock. That means things that
+			// shouldn't have blocked did block.
+			go func() {
+				<-ctx.Done()
+				trans2.Close()
+				trans1.Close()
+			}()
+
+			// Attempt to pipeline
+			pipeline, err := trans2.AppendEntriesPipeline("id1", trans1.LocalAddr())
+			if max == 1 {
+				// Max == 1 implies no pipelining
+				require.EqualError(t, err, ErrPipelineReplicationNotSupported.Error())
+				return
+			}
+			require.NoError(t, err)
+
+			expectedMax := max
+			if max == 0 {
+				// Should have defaulted to 2
+				expectedMax = 2
+			}
+
+			for i := 0; i < expectedMax-1; i++ {
+				// We should be able to send `max - 1` rpcs before `AppendEntries`
+				// blocks. It blocks on the `max` one because it sends before pushing
+				// to the chan. It will block forever when it does because nothing is
+				// responding yet.
+				out := new(AppendEntriesResponse)
+				_, err := pipeline.AppendEntries(&args, out)
+				require.NoError(t, err)
+			}
+
+			// Verify the next send blocks without blocking the test forever
+			errCh := make(chan error, 1)
+			go func() {
+				out := new(AppendEntriesResponse)
+				_, err := pipeline.AppendEntries(&args, out)
+				errCh <- err
+			}()
+
+			select {
+			case err := <-errCh:
+				require.NoError(t, err)
+				t.Fatalf("AppendEntries didn't block with %d in flight", max)
+			case <-time.After(50 * time.Millisecond):
+				// OK it's probably blocked, or we got _really_ unlucky with scheduling!
+			}
+
+			// Verify that once we receive/respond, another one can be sent.
+			rpc := <-trans1.Consumer()
+			rpc.Respond(resp, nil)
+
+			// We also need to consume the response from the pipeline in case the chan
+			// is unbuffered (inflight is 2 or 1).
+			<-pipeline.Consumer()
+
+			// The last append should unblock once the response is received.
+			select {
+			case <-errCh:
+				// OK
+			case <-time.After(50 * time.Millisecond):
+				t.Fatalf("last append didn't unblock")
+			}
+		})
+	}
+}
+
 func TestNetworkTransport_RequestVote(t *testing.T) {
 
 	for _, useAddrProvider := range []bool{true, false} {
@@ -741,11 +812,18 @@ func TestNetworkTransport_PooledConn(t *testing.T) {
 }
 
 func makeTransport(t *testing.T, useAddrProvider bool, addressOverride string) (*NetworkTransport, error) {
+	config := &NetworkTransportConfig{
+		MaxPool: 2,
+		// Setting this because older tests for pipelining were written when this
+		// was a constant and they block forever if it's not large enough.
+		MaxRPCsInFlight: 130,
+		Timeout:         time.Second,
+		Logger:          newTestLogger(t),
+	}
 	if useAddrProvider {
-		config := &NetworkTransportConfig{MaxPool: 2, Timeout: time.Second, Logger: newTestLogger(t), ServerAddressProvider: &testAddrProvider{addressOverride}}
-		return NewTCPTransportWithConfig("localhost:0", nil, config)
+		config.ServerAddressProvider = &testAddrProvider{addressOverride}
 	}
-	return NewTCPTransportWithLogger("localhost:0", nil, 2, time.Second, newTestLogger(t))
+	return NewTCPTransportWithConfig("localhost:0", nil, config)
 }
 
 type testCountingWriter struct {
diff --git a/testing.go b/testing.go
index 96961b60..fd770d42 100644
--- a/testing.go
+++ b/testing.go
@@ -199,13 +199,18 @@ func newTestLogger(t *testing.T) hclog.Logger {
 	return newTestLoggerWithPrefix(t, "")
 }
 
-// newTestLoggerWithPrefix returns a Logger that can be used in tests. prefix will
-// be added as the name of the logger.
+// newTestLoggerWithPrefix returns a Logger that can be used in tests. prefix
+// will be added as the name of the logger.
 //
 // If tests are run with -v (verbose mode, or -json which implies verbose) the
-// log output will go to stderr directly.
-// If tests are run in regular "quiet" mode, logs will be sent to t.Log so that
-// the logs only appear when a test fails.
+// log output will go to stderr directly. If tests are run in regular "quiet"
+// mode, logs will be sent to t.Log so that the logs only appear when a test
+// fails.
+//
+// Be careful where this is used though - calling t.Log after the test completes
+// causes a panic. This is common if, for example, you use it for a
+// NetworkTransport and then close the transport at the end of the test, because
+// an error is logged after the test is complete.
 func newTestLoggerWithPrefix(t *testing.T, prefix string) hclog.Logger {
 	if testing.Verbose() {
 		return hclog.New(&hclog.LoggerOptions{Name: prefix})
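
Usage note (not part of the patch): the sketch below shows how a caller might opt in to the new knob when constructing a transport. It assumes the `github.com/hashicorp/raft` import path and uses `NewTCPTransportWithConfig`, the same constructor exercised by the updated tests; the bind address, pool size, and timeout values are illustrative only.

```go
package main

import (
	"fmt"
	"time"

	"github.com/hashicorp/raft"
)

func main() {
	// MaxRPCsInFlight semantics after this patch:
	//   0   -> DefaultMaxRPCsInFlight (currently 2)
	//   1   -> pipelining disabled; AppendEntriesPipeline returns
	//          ErrPipelineReplicationNotSupported and replication stays synchronous
	//   130 -> roughly the old behavior (the previous 128-sized channel buffer)
	trans, err := raft.NewTCPTransportWithConfig("127.0.0.1:0", nil, &raft.NetworkTransportConfig{
		MaxPool:         3,
		MaxRPCsInFlight: raft.DefaultMaxRPCsInFlight,
		Timeout:         10 * time.Second,
	})
	if err != nil {
		panic(err)
	}
	defer trans.Close()

	fmt.Println("transport listening on", trans.LocalAddr())
}
```

Leaving `MaxRPCsInFlight` at zero keeps the new default of 2, which overlaps preparing the next request with waiting for the previous response while avoiding the deep queuing the old 128-entry buffer allowed.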