From 3c7fa86dcaa085e5d13661f5e129d11abc2c63b8 Mon Sep 17 00:00:00 2001 From: Yacov Manevich Date: Wed, 2 Jun 2021 23:57:24 +0300 Subject: [PATCH] [FAB-18484] Return transaction forwarding result back to the client synchronously This commit makes a Raft follower wait for the transaction forwarded to the leader to be sent into the gRPC stream, and returns the result (success or failure) back to the client accordingly. Before this commmit, the behavior was that it returns success after enqueueing it into the message queue, which might have resulted in the transaction being dropped but a success being returned to the client. Change-Id: I0cd45540be4988845663eb0c68f76fed2ff25b94 Signed-off-by: Yacov Manevich --- orderer/common/cluster/comm.go | 52 +++++++++----- orderer/common/cluster/rpc.go | 41 +++++++++-- orderer/common/cluster/rpc_test.go | 76 ++++++++++++++++++-- orderer/consensus/etcdraft/chain.go | 40 +++++++++-- orderer/consensus/etcdraft/chain_test.go | 39 +++++++++- orderer/consensus/etcdraft/consenter.go | 1 + orderer/consensus/etcdraft/mocks/mock_rpc.go | 18 ++--- 7 files changed, 222 insertions(+), 45 deletions(-) diff --git a/orderer/common/cluster/comm.go b/orderer/common/cluster/comm.go index 551c74233e2..82d5b2a866b 100644 --- a/orderer/common/cluster/comm.go +++ b/orderer/common/cluster/comm.go @@ -461,8 +461,11 @@ type RemoteContext struct { // Stream is used to send/receive messages to/from the remote cluster member. type Stream struct { - abortChan <-chan struct{} - sendBuff chan *orderer.StepRequest + abortChan <-chan struct{} + sendBuff chan struct { + request *orderer.StepRequest + report func(error) + } commShutdown chan struct{} abortReason *atomic.Value metrics *Metrics @@ -488,6 +491,11 @@ func (stream *Stream) Canceled() bool { // Send sends the given request to the remote cluster member. func (stream *Stream) Send(request *orderer.StepRequest) error { + return stream.SendWithReport(request, func(_ error) {}) +} + +// SendWithReport sends the given request to the remote cluster member and invokes report on the send result. +func (stream *Stream) SendWithReport(request *orderer.StepRequest, report func(error)) error { if stream.Canceled() { return errors.New(stream.abortReason.Load().(string)) } @@ -498,12 +506,12 @@ func (stream *Stream) Send(request *orderer.StepRequest) error { allowDrop = true } - return stream.sendOrDrop(request, allowDrop) + return stream.sendOrDrop(request, allowDrop, report) } // sendOrDrop sends the given request to the remote cluster member, or drops it // if it is a consensus request and the queue is full. -func (stream *Stream) sendOrDrop(request *orderer.StepRequest, allowDrop bool) error { +func (stream *Stream) sendOrDrop(request *orderer.StepRequest, allowDrop bool, report func(error)) error { msgType := "transaction" if allowDrop { msgType = "consensus" @@ -520,8 +528,10 @@ func (stream *Stream) sendOrDrop(request *orderer.StepRequest, allowDrop bool) e select { case <-stream.abortChan: return errors.Errorf("stream %d aborted", stream.ID) - case stream.sendBuff <- request: - // Note - async send, errors are not returned back + case stream.sendBuff <- struct { + request *orderer.StepRequest + report func(error) + }{request: request, report: report}: return nil case <-stream.commShutdown: return nil @@ -529,15 +539,14 @@ func (stream *Stream) sendOrDrop(request *orderer.StepRequest, allowDrop bool) e } // sendMessage sends the request down the stream -// Note - any errors are swallowed and not returned back - TODO Is this intentional? Shouldn't SubmitRequest errors get returned to client? -func (stream *Stream) sendMessage(request *orderer.StepRequest) { +func (stream *Stream) sendMessage(request *orderer.StepRequest, report func(error)) { start := time.Now() var err error defer func() { message := fmt.Sprintf("Send of %s to %s(%s) took %v", requestAsString(request), stream.NodeName, stream.Endpoint, time.Since(start)) if err != nil { - stream.Logger.Errorf("%s but failed due to %s", message, err.Error()) + stream.Logger.Warnf("%s but failed due to %s", message, err.Error()) } else { stream.Logger.Debug(message) } @@ -551,7 +560,7 @@ func (stream *Stream) sendMessage(request *orderer.StepRequest) { return nil, err } - _, err = stream.operateWithTimeout(f) + _, err = stream.operateWithTimeout(f, report) } func (stream *Stream) serviceStream() { @@ -564,8 +573,8 @@ func (stream *Stream) serviceStream() { for { select { - case msg := <-stream.sendBuff: - stream.sendMessage(msg) + case reqReport := <-stream.sendBuff: + stream.sendMessage(reqReport.request, reqReport.report) case <-stream.abortChan: return case <-stream.commShutdown: @@ -588,11 +597,11 @@ func (stream *Stream) Recv() (*orderer.StepResponse, error) { return stream.Cluster_StepClient.Recv() } - return stream.operateWithTimeout(f) + return stream.operateWithTimeout(f, func(_ error) {}) } // operateWithTimeout performs the given operation on the stream, and blocks until the timeout expires. -func (stream *Stream) operateWithTimeout(invoke StreamOperation) (*orderer.StepResponse, error) { +func (stream *Stream) operateWithTimeout(invoke StreamOperation, report func(error)) (*orderer.StepResponse, error) { timer := time.NewTimer(stream.Timeout) defer timer.Stop() @@ -615,11 +624,13 @@ func (stream *Stream) operateWithTimeout(invoke StreamOperation) (*orderer.StepR select { case r := <-responseChan: + report(r.err) if r.err != nil { stream.Cancel(r.err) } return r.res, r.err case <-timer.C: + report(errTimeout) stream.Logger.Warningf("Stream %d to %s(%s) was forcibly terminated because timeout (%v) expired", stream.ID, stream.NodeName, stream.Endpoint, stream.Timeout) stream.Cancel(errTimeout) @@ -685,11 +696,14 @@ func (rc *RemoteContext) NewStream(timeout time.Duration) (*Stream, error) { stepLogger := logger.WithOptions(zap.AddCallerSkip(1)) s := &Stream{ - Channel: rc.Channel, - metrics: rc.Metrics, - abortReason: abortReason, - abortChan: abortChan, - sendBuff: make(chan *orderer.StepRequest, rc.SendBuffSize), + Channel: rc.Channel, + metrics: rc.Metrics, + abortReason: abortReason, + abortChan: abortChan, + sendBuff: make(chan struct { + request *orderer.StepRequest + report func(error) + }, rc.SendBuffSize), commShutdown: rc.shutdownSignal, NodeName: nodeName, Logger: stepLogger, diff --git a/orderer/common/cluster/rpc.go b/orderer/common/cluster/rpc.go index 6504c8fedae..8aff4791d3b 100644 --- a/orderer/common/cluster/rpc.go +++ b/orderer/common/cluster/rpc.go @@ -8,6 +8,7 @@ package cluster import ( "context" + "io" "sync" "time" @@ -64,6 +65,14 @@ const ( SubmitOperation ) +func (ot OperationType) String() string { + if ot == SubmitOperation { + return "transaction" + } + + return "consensus" +} + // SendConsensus passes the given ConsensusRequest message to the raft.Node instance. func (s *RPC) SendConsensus(destination uint64, msg *orderer.ConsensusRequest) error { if s.Logger.IsEnabledFor(zapcore.DebugLevel) { @@ -86,14 +95,14 @@ func (s *RPC) SendConsensus(destination uint64, msg *orderer.ConsensusRequest) e err = stream.Send(req) if err != nil { - s.unMapStream(destination, ConsensusOperation) + s.unMapStream(destination, ConsensusOperation, stream.ID) } return err } // SendSubmit sends a SubmitRequest to the given destination node. -func (s *RPC) SendSubmit(destination uint64, request *orderer.SubmitRequest) error { +func (s *RPC) SendSubmit(destination uint64, request *orderer.SubmitRequest, report func(error)) error { if s.Logger.IsEnabledFor(zapcore.DebugLevel) { defer s.submitSent(time.Now(), destination, request) } @@ -109,12 +118,20 @@ func (s *RPC) SendSubmit(destination uint64, request *orderer.SubmitRequest) err }, } + unmapOnFailure := func(err error) { + if err != nil && err.Error() == io.EOF.Error() { + s.Logger.Infof("Un-mapping transaction stream to %d because encountered a stale stream", destination) + s.unMapStream(destination, SubmitOperation, stream.ID) + } + report(err) + } + s.submitLock.Lock() defer s.submitLock.Unlock() - err = stream.Send(req) + err = stream.SendWithReport(req, unmapOnFailure) if err != nil { - s.unMapStream(destination, SubmitOperation) + s.unMapStream(destination, SubmitOperation, stream.ID) } return err } @@ -128,7 +145,7 @@ func (s *RPC) consensusSent(start time.Time, to uint64, msg *orderer.ConsensusRe } // getOrCreateStream obtains a Submit stream for the given destination node -func (s *RPC) getOrCreateStream(destination uint64, operationType OperationType) (orderer.Cluster_StepClient, error) { +func (s *RPC) getOrCreateStream(destination uint64, operationType OperationType) (*Stream, error) { stream := s.getStream(destination, operationType) if stream != nil { return stream, nil @@ -158,9 +175,21 @@ func (s *RPC) mapStream(destination uint64, stream *Stream, operationType Operat s.cleanCanceledStreams(operationType) } -func (s *RPC) unMapStream(destination uint64, operationType OperationType) { +func (s *RPC) unMapStream(destination uint64, operationType OperationType, streamIDToUnmap uint64) { s.lock.Lock() defer s.lock.Unlock() + + stream, exists := s.StreamsByType[operationType][destination] + if !exists { + s.Logger.Debugf("No %s stream to %d found, nothing to unmap", operationType, destination) + return + } + + if stream.ID != streamIDToUnmap { + s.Logger.Debugf("Stream for %s to %d has an ID of %d, not %d", operationType, destination, stream.ID, streamIDToUnmap) + return + } + delete(s.StreamsByType[operationType], destination) } diff --git a/orderer/common/cluster/rpc_test.go b/orderer/common/cluster/rpc_test.go index edf21f34832..e907c97d7f0 100644 --- a/orderer/common/cluster/rpc_test.go +++ b/orderer/common/cluster/rpc_test.go @@ -26,6 +26,72 @@ import ( "google.golang.org/grpc" ) +func noopReport(_ error) { +} + +func TestSendSubmitWithReport(t *testing.T) { + t.Parallel() + node1 := newTestNode(t) + node2 := newTestNode(t) + + var receptionWaitGroup sync.WaitGroup + receptionWaitGroup.Add(1) + node2.handler.On("OnSubmit", testChannel, mock.Anything, mock.Anything).Return(nil).Run(func(args mock.Arguments) { + receptionWaitGroup.Done() + }) + + defer node1.stop() + defer node2.stop() + + config := []cluster.RemoteNode{node1.nodeInfo, node2.nodeInfo} + node1.c.Configure(testChannel, config) + node2.c.Configure(testChannel, config) + + node1RPC := &cluster.RPC{ + Logger: flogging.MustGetLogger("test"), + Timeout: time.Hour, + StreamsByType: cluster.NewStreamsByType(), + Channel: testChannel, + Comm: node1.c, + } + + // Wait for connections to be established + time.Sleep(time.Second * 5) + + err := node1RPC.SendSubmit(node2.nodeInfo.ID, &orderer.SubmitRequest{Channel: testChannel, Payload: &common.Envelope{Payload: []byte("1")}}, noopReport) + require.NoError(t, err) + receptionWaitGroup.Wait() // Wait for message to be received + + // Restart the node + node2.stop() + node2.resurrect() + + var wg2 sync.WaitGroup + wg2.Add(1) + + reportSubmitFailed := func(err error) { + require.EqualError(t, err, io.EOF.Error()) + defer wg2.Done() + } + + err = node1RPC.SendSubmit(node2.nodeInfo.ID, &orderer.SubmitRequest{Channel: testChannel, Payload: &common.Envelope{Payload: []byte("2")}}, reportSubmitFailed) + require.NoError(t, err) + + wg2.Wait() + + // Ensure stale stream is cleaned up and removed from the mapping + require.Len(t, node1RPC.StreamsByType[cluster.SubmitOperation], 0) + + // Wait for connection to be re-established + time.Sleep(time.Second * 5) + + // Send again, this time it should be received + receptionWaitGroup.Add(1) + err = node1RPC.SendSubmit(node2.nodeInfo.ID, &orderer.SubmitRequest{Channel: testChannel, Payload: &common.Envelope{Payload: []byte("3")}}, noopReport) + require.NoError(t, err) + receptionWaitGroup.Wait() +} + func TestRPCChangeDestination(t *testing.T) { // We send a Submit() to 2 different nodes - 1 and 2. // The first invocation of Submit() establishes a stream with node 1 @@ -82,8 +148,8 @@ func TestRPCChangeDestination(t *testing.T) { streamToNode1.On("Recv").Return(nil, io.EOF) streamToNode2.On("Recv").Return(nil, io.EOF) - rpc.SendSubmit(1, &orderer.SubmitRequest{Channel: "mychannel"}) - rpc.SendSubmit(2, &orderer.SubmitRequest{Channel: "mychannel"}) + rpc.SendSubmit(1, &orderer.SubmitRequest{Channel: "mychannel"}, noopReport) + rpc.SendSubmit(2, &orderer.SubmitRequest{Channel: "mychannel"}, noopReport) sent.Wait() streamToNode1.AssertNumberOfCalls(t, "Send", 1) @@ -111,7 +177,7 @@ func TestSend(t *testing.T) { } submit := func(rpc *cluster.RPC) error { - err := rpc.SendSubmit(1, submitRequest) + err := rpc.SendSubmit(1, submitRequest, noopReport) return err } @@ -291,7 +357,7 @@ func TestRPCGarbageCollection(t *testing.T) { defineMocks(1) - rpc.SendSubmit(1, &orderer.SubmitRequest{Channel: "mychannel"}) + rpc.SendSubmit(1, &orderer.SubmitRequest{Channel: "mychannel"}, noopReport) // Wait for the message to arrive sent.Wait() // Ensure the stream is initialized in the mapping @@ -311,7 +377,7 @@ func TestRPCGarbageCollection(t *testing.T) { defineMocks(2) // Send a message to a different node. - rpc.SendSubmit(2, &orderer.SubmitRequest{Channel: "mychannel"}) + rpc.SendSubmit(2, &orderer.SubmitRequest{Channel: "mychannel"}, noopReport) // The mapping should be now cleaned from the previous stream. require.Len(t, mapping[cluster.SubmitOperation], 1) require.Equal(t, uint64(2), mapping[cluster.SubmitOperation][2].ID) diff --git a/orderer/consensus/etcdraft/chain.go b/orderer/consensus/etcdraft/chain.go index 24ddaeb3fb2..d95eabe9b7d 100644 --- a/orderer/consensus/etcdraft/chain.go +++ b/orderer/consensus/etcdraft/chain.go @@ -74,7 +74,7 @@ type Configurator interface { // RPC is used to mock the transport layer in tests. type RPC interface { SendConsensus(dest uint64, msg *orderer.ConsensusRequest) error - SendSubmit(dest uint64, request *orderer.SubmitRequest) error + SendSubmit(dest uint64, request *orderer.SubmitRequest, report func(err error)) error } //go:generate counterfeiter -o mocks/mock_blockpuller.go . BlockPuller @@ -92,7 +92,8 @@ type CreateBlockPuller func() (BlockPuller, error) // Options contains all the configurations relevant to the chain. type Options struct { - RaftID uint64 + RPCTimeout time.Duration + RaftID uint64 Clock clock.Clock @@ -541,8 +542,7 @@ func (c *Chain) Submit(req *orderer.SubmitRequest, sender uint64) error { } if lead != c.raftID { - if err := c.rpc.SendSubmit(lead, req); err != nil { - c.Metrics.ProposalFailures.Add(1) + if err := c.forwardToLeader(lead, req); err != nil { return err } } @@ -555,6 +555,38 @@ func (c *Chain) Submit(req *orderer.SubmitRequest, sender uint64) error { return nil } +func (c *Chain) forwardToLeader(lead uint64, req *orderer.SubmitRequest) error { + c.logger.Infof("Forwarding transaction to the leader %d", lead) + timer := time.NewTimer(c.opts.RPCTimeout) + defer timer.Stop() + + sentChan := make(chan struct{}) + atomicErr := &atomic.Value{} + + report := func(err error) { + if err != nil { + atomicErr.Store(err.Error()) + c.Metrics.ProposalFailures.Add(1) + } + close(sentChan) + } + + c.rpc.SendSubmit(lead, req, report) + + select { + case <-sentChan: + case <-c.doneC: + return errors.Errorf("chain is stopped") + case <-timer.C: + return errors.Errorf("timed out (%v) waiting on forwarding to %d", c.opts.RPCTimeout, lead) + } + + if atomicErr.Load() != nil { + return errors.Errorf(atomicErr.Load().(string)) + } + return nil +} + type apply struct { entries []raftpb.Entry soft *raft.SoftState diff --git a/orderer/consensus/etcdraft/chain_test.go b/orderer/consensus/etcdraft/chain_test.go index d745b94fd56..301cc6404f4 100644 --- a/orderer/consensus/etcdraft/chain_test.go +++ b/orderer/consensus/etcdraft/chain_test.go @@ -184,6 +184,7 @@ var _ = Describe("Chain", func() { fakeFields = newFakeMetricsFields() opts = etcdraft.Options{ + RPCTimeout: time.Second * 5, RaftID: 1, Clock: clock, TickInterval: interval, @@ -2634,7 +2635,28 @@ var _ = Describe("Chain", func() { }) }) + When("gRPC stream to leader is stuck", func() { + BeforeEach(func() { + c2.opts.RPCTimeout = time.Second + network.Lock() + network.delayWG.Add(1) + network.Unlock() + }) + It("correctly times out", func() { + err := c2.Order(env, 0) + Expect(err).To(MatchError("timed out (1s) waiting on forwarding to 1")) + network.delayWG.Done() + }) + }) + When("leader is disconnected", func() { + It("correctly returns a failure to the client when forwarding from a follower", func() { + network.disconnect(1) + + err := c2.Order(env, 0) + Expect(err).To(MatchError("connection lost")) + }) + It("proactively steps down to follower", func() { network.disconnect(1) @@ -3371,6 +3393,7 @@ func newChain( fakeFields := newFakeMetricsFields() opts := etcdraft.Options{ + RPCTimeout: timeout, RaftID: uint64(id), Clock: clock, TickInterval: interval, @@ -3537,6 +3560,7 @@ func (c *chain) getStepFunc() stepFunc { } type network struct { + delayWG sync.WaitGroup sync.RWMutex leader uint64 @@ -3615,21 +3639,30 @@ func (n *network) addChain(c *chain) { return c.step(dest, msg) } - c.rpc.SendSubmitStub = func(dest uint64, msg *orderer.SubmitRequest) error { + c.rpc.SendSubmitStub = func(dest uint64, msg *orderer.SubmitRequest, f func(error)) error { if !n.linked(c.id, dest) { - return errors.Errorf("connection refused") + err := errors.Errorf("connection refused") + f(err) + return err } if !n.connected(c.id) || !n.connected(dest) { - return errors.Errorf("connection lost") + err := errors.Errorf("connection lost") + f(err) + return err } n.RLock() target := n.chains[dest] n.RUnlock() go func() { + n.Lock() + n.delayWG.Wait() + n.Unlock() + defer GinkgoRecover() target.Submit(msg, c.id) + f(nil) }() return nil } diff --git a/orderer/consensus/etcdraft/consenter.go b/orderer/consensus/etcdraft/consenter.go index 6df005be74d..b34e14030b1 100644 --- a/orderer/consensus/etcdraft/consenter.go +++ b/orderer/consensus/etcdraft/consenter.go @@ -200,6 +200,7 @@ func (c *Consenter) HandleChain(support consensus.ConsenterSupport, metadata *co } opts := Options{ + RPCTimeout: c.OrdererConfig.General.Cluster.RPCTimeout, RaftID: id, Clock: clock.NewClock(), MemoryStorage: raft.NewMemoryStorage(), diff --git a/orderer/consensus/etcdraft/mocks/mock_rpc.go b/orderer/consensus/etcdraft/mocks/mock_rpc.go index 9c42fc13500..91d28129171 100644 --- a/orderer/consensus/etcdraft/mocks/mock_rpc.go +++ b/orderer/consensus/etcdraft/mocks/mock_rpc.go @@ -21,11 +21,12 @@ type FakeRPC struct { sendConsensusReturnsOnCall map[int]struct { result1 error } - SendSubmitStub func(uint64, *orderer.SubmitRequest) error + SendSubmitStub func(uint64, *orderer.SubmitRequest, func(err error)) error sendSubmitMutex sync.RWMutex sendSubmitArgsForCall []struct { arg1 uint64 arg2 *orderer.SubmitRequest + arg3 func(err error) } sendSubmitReturns struct { result1 error @@ -98,17 +99,18 @@ func (fake *FakeRPC) SendConsensusReturnsOnCall(i int, result1 error) { }{result1} } -func (fake *FakeRPC) SendSubmit(arg1 uint64, arg2 *orderer.SubmitRequest) error { +func (fake *FakeRPC) SendSubmit(arg1 uint64, arg2 *orderer.SubmitRequest, arg3 func(err error)) error { fake.sendSubmitMutex.Lock() ret, specificReturn := fake.sendSubmitReturnsOnCall[len(fake.sendSubmitArgsForCall)] fake.sendSubmitArgsForCall = append(fake.sendSubmitArgsForCall, struct { arg1 uint64 arg2 *orderer.SubmitRequest - }{arg1, arg2}) - fake.recordInvocation("SendSubmit", []interface{}{arg1, arg2}) + arg3 func(err error) + }{arg1, arg2, arg3}) + fake.recordInvocation("SendSubmit", []interface{}{arg1, arg2, arg3}) fake.sendSubmitMutex.Unlock() if fake.SendSubmitStub != nil { - return fake.SendSubmitStub(arg1, arg2) + return fake.SendSubmitStub(arg1, arg2, arg3) } if specificReturn { return ret.result1 @@ -123,17 +125,17 @@ func (fake *FakeRPC) SendSubmitCallCount() int { return len(fake.sendSubmitArgsForCall) } -func (fake *FakeRPC) SendSubmitCalls(stub func(uint64, *orderer.SubmitRequest) error) { +func (fake *FakeRPC) SendSubmitCalls(stub func(uint64, *orderer.SubmitRequest, func(err error)) error) { fake.sendSubmitMutex.Lock() defer fake.sendSubmitMutex.Unlock() fake.SendSubmitStub = stub } -func (fake *FakeRPC) SendSubmitArgsForCall(i int) (uint64, *orderer.SubmitRequest) { +func (fake *FakeRPC) SendSubmitArgsForCall(i int) (uint64, *orderer.SubmitRequest, func(err error)) { fake.sendSubmitMutex.RLock() defer fake.sendSubmitMutex.RUnlock() argsForCall := fake.sendSubmitArgsForCall[i] - return argsForCall.arg1, argsForCall.arg2 + return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3 } func (fake *FakeRPC) SendSubmitReturns(result1 error) {