Skip to content

Commit

Permalink
Check if inner consensus message is missing
Browse files Browse the repository at this point in the history
Change-Id: I06b466e6ff6d43f2b9804dd21185241716356050
Signed-off-by: Yacov Manevich <yacovm@il.ibm.com>
  • Loading branch information
yacovm authored and ale-linux committed Jun 21, 2022
1 parent 8c3b10d commit 6e5e693
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 4 deletions.
60 changes: 58 additions & 2 deletions orderer/common/cluster/comm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,11 @@ func (*mockChannelExtractor) TargetChannel(msg proto.Message) string {
}
}

type clusterServer interface {
// Step passes an implementation-specific message to another cluster member.
Step(server orderer.Cluster_StepServer) error
}

type clusterNode struct {
lock sync.Mutex
frozen bool
Expand All @@ -132,6 +137,7 @@ type clusterNode struct {
clientConfig comm_utils.ClientConfig
serverConfig comm_utils.ServerConfig
c *cluster.Comm
dispatcher clusterServer
}

func (cn *clusterNode) Step(stream orderer.Cluster_StepServer) error {
Expand Down Expand Up @@ -179,7 +185,7 @@ func (cn *clusterNode) resurrect() {
panic(fmt.Errorf("failed starting gRPC server: %v", err))
}
cn.srv = gRPCServer
orderer.RegisterClusterServer(gRPCServer.Server(), cn)
orderer.RegisterClusterServer(gRPCServer.Server(), cn.dispatcher)
go cn.srv.Start()
}

Expand Down Expand Up @@ -257,6 +263,10 @@ func newTestNodeWithMetrics(t *testing.T, metrics cluster.MetricsProvider, tlsCo
srv: gRPCServer,
}

if tstSrv.dispatcher == nil {
tstSrv.dispatcher = tstSrv
}

tstSrv.freezeCond.L = &tstSrv.lock

compareCert := cluster.CachePublicKeyComparisons(func(a, b []byte) bool {
Expand All @@ -275,7 +285,7 @@ func newTestNodeWithMetrics(t *testing.T, metrics cluster.MetricsProvider, tlsCo
CompareCertificate: compareCert,
}

orderer.RegisterClusterServer(gRPCServer.Server(), tstSrv)
orderer.RegisterClusterServer(gRPCServer.Server(), tstSrv.dispatcher)
go gRPCServer.Start()
return tstSrv
}
Expand Down Expand Up @@ -481,6 +491,52 @@ func TestBlockingSend(t *testing.T) {
}
}

func TestEmptyRequest(t *testing.T) {
// Scenario: Ensures empty messages are discarded and an error is returned
// back to the sender.

node1 := newTestNode(t)
node2 := newTestNode(t)

node2.srv.Stop()
svc := &cluster.Service{
StepLogger: flogging.MustGetLogger("test"),
Logger: flogging.MustGetLogger("test"),
StreamCountReporter: &cluster.StreamCountReporter{
Metrics: cluster.NewMetrics(&disabled.Provider{}),
},
Dispatcher: node2.c,
}
node2.dispatcher = svc

// Sleep to let the gRPC service be closed
time.Sleep(time.Second)

// Resurrect the node with the new dispatcher
node2.resurrect()

defer node1.stop()
defer node2.stop()

config := []cluster.RemoteNode{node1.nodeInfo, node2.nodeInfo}
node1.c.Configure(testChannel, config)
node2.c.Configure(testChannel, config)

assertBiDiCommunication(t, node1, node2, testReq)

rm, err := node1.c.Remote(testChannel, node2.nodeInfo.ID)
require.NoError(t, err)

stream, err := rm.NewStream(time.Second * 10)
require.NoError(t, err)

err = stream.Send(&orderer.StepRequest{})
require.NoError(t, err)

_, err = stream.Recv()
require.Error(t, err, "message is neither a Submit nor a Consensus request")
}

func TestBasic(t *testing.T) {
// Scenario: Basic test that spawns 2 nodes and sends each other
// messages that are expected to be echoed back
Expand Down
6 changes: 4 additions & 2 deletions orderer/common/cluster/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/hyperledger/fabric-protos-go/orderer"
"github.com/hyperledger/fabric/common/flogging"
"github.com/hyperledger/fabric/common/util"
"github.com/pkg/errors"
"go.uber.org/zap"
"google.golang.org/grpc"
)
Expand Down Expand Up @@ -90,10 +91,11 @@ func (s *Service) handleMessage(stream StepStream, addr string, exp *certificate
nodeName := commonNameFromContext(stream.Context())
s.Logger.Debugf("Received message from %s(%s): %v", nodeName, addr, requestAsString(request))
return s.handleSubmit(submitReq, stream, addr)
} else if consensusReq := request.GetConsensusRequest(); consensusReq != nil {
return s.Dispatcher.DispatchConsensus(stream.Context(), request.GetConsensusRequest())
}

// Else, it's a consensus message.
return s.Dispatcher.DispatchConsensus(stream.Context(), request.GetConsensusRequest())
return errors.Errorf("message is neither a Submit nor a Consensus request")
}

func (s *Service) handleSubmit(request *orderer.SubmitRequest, stream StepStream, addr string) error {
Expand Down

0 comments on commit 6e5e693

Please sign in to comment.