Raft to BFT migration
Signed-off-by: Yacov Manevich <yacov.manevich@ibm.com>
yacovm committed Nov 28, 2023
1 parent da0853a commit 99283c7
Showing 4 changed files with 226 additions and 23 deletions.
199 changes: 197 additions & 2 deletions integration/raft/migration_test.go
@@ -9,6 +9,9 @@ package raft
import (
"errors"
"fmt"
"github.com/SmartBFT-Go/consensus/pkg/types"
"github.com/hyperledger/fabric-protos-go/orderer/smartbft"
"github.com/hyperledger/fabric/common/policies"
"os"
"path/filepath"
"strings"
@@ -39,10 +42,12 @@ var _ = Describe("ConsensusTypeMigration", func() {
client *docker.Client
network *nwo.Network

o1Proc, o2Proc, o3Proc ifrit.Process
o1Proc, o2Proc, o3Proc, o4Proc ifrit.Process

o1Runner *ginkgomon.Runner
o2Runner *ginkgomon.Runner
o3Runner *ginkgomon.Runner
o4Runner *ginkgomon.Runner
)

BeforeEach(func() {
@@ -55,7 +60,7 @@ var _ = Describe("ConsensusTypeMigration", func() {
})

AfterEach(func() {
for _, oProc := range []ifrit.Process{o1Proc, o2Proc, o3Proc} {
for _, oProc := range []ifrit.Process{o1Proc, o2Proc, o3Proc, o4Proc} {
if oProc != nil {
oProc.Signal(syscall.SIGTERM)
Eventually(oProc.Wait(), network.EventuallyTimeout).Should(Receive())
@@ -69,6 +74,127 @@ var _ = Describe("ConsensusTypeMigration", func() {
_ = os.RemoveAll(testDir)
})

Describe("Raft to BFT migration", func() {
FIt("migrates from Raft to BFT", func() {
networkConfig := nwo.MultiNodeEtcdRaft()
networkConfig.Orderers = append(networkConfig.Orderers, &nwo.Orderer{Name: "orderer4", Organization: "OrdererOrg"})
networkConfig.Profiles[0].Orderers = []string{"orderer1", "orderer2", "orderer3", "orderer4"}

network = nwo.New(networkConfig, testDir, client, StartPort(), components)

o1, o2, o3, o4 := network.Orderer("orderer1"), network.Orderer("orderer2"), network.Orderer("orderer3"), network.Orderer("orderer4")
network.GenerateConfigTree()
network.Bootstrap()

runOrderers := func() {
o1Runner = network.OrdererRunner(o1)
o2Runner = network.OrdererRunner(o2)
o3Runner = network.OrdererRunner(o3)
o4Runner = network.OrdererRunner(o4)

o1Runner.Command.Env = append(o1Runner.Command.Env, "FABRIC_LOGGING_SPEC=orderer.common.cluster=debug:orderer.consensus.smartbft=debug:policies.ImplicitOrderer=debug")
o2Runner.Command.Env = append(o2Runner.Command.Env, "FABRIC_LOGGING_SPEC=orderer.common.cluster=debug:orderer.consensus.smartbft=debug:policies.ImplicitOrderer=debug")
o3Runner.Command.Env = append(o3Runner.Command.Env, "FABRIC_LOGGING_SPEC=orderer.common.cluster=debug:orderer.consensus.smartbft=debug:policies.ImplicitOrderer=debug")
o4Runner.Command.Env = append(o4Runner.Command.Env, "FABRIC_LOGGING_SPEC=orderer.common.cluster=debug:orderer.consensus.smartbft=debug:policies.ImplicitOrderer=debug")

o1Proc = ifrit.Invoke(o1Runner)
o2Proc = ifrit.Invoke(o2Runner)
o3Proc = ifrit.Invoke(o3Runner)
o4Proc = ifrit.Invoke(o4Runner)

Eventually(o1Proc.Ready(), network.EventuallyTimeout).Should(BeClosed())
Eventually(o2Proc.Ready(), network.EventuallyTimeout).Should(BeClosed())
Eventually(o3Proc.Ready(), network.EventuallyTimeout).Should(BeClosed())
Eventually(o4Proc.Ready(), network.EventuallyTimeout).Should(BeClosed())
}

runOrderers()

channelparticipation.JoinOrderersAppChannelCluster(network, "testchannel", o1, o2, o3, o4)
FindLeader([]*ginkgomon.Runner{o1Runner, o2Runner, o3Runner, o4Runner})

By("performing operation with orderer1")
env := CreateBroadcastEnvelope(network, o1, "testchannel", []byte("foo"))
resp, err := ordererclient.Broadcast(network, o1, env)
Expect(err).NotTo(HaveOccurred())
Expect(resp.Status).To(Equal(common.Status_SUCCESS))

block := FetchBlock(network, o1, 1, "testchannel")
Expect(block).NotTo(BeNil())

By("Change to maintenance mode")

peer := network.Peer("Org1", "peer0")
orderer := network.Orderer("orderer1")

addBFTInConfigTree(network, peer, orderer, "testchannel")

By("Config update on standard channel, State=MAINTENANCE, enter maintenance-mode")
config, updatedConfig := prepareTransition(network, peer, orderer, "testchannel",
"etcdraft", protosorderer.ConsensusType_STATE_NORMAL,
"etcdraft", nil, protosorderer.ConsensusType_STATE_MAINTENANCE)
nwo.UpdateOrdererConfig(network, orderer, "testchannel", config, updatedConfig, peer, orderer)

bftMetadata := protoutil.MarshalOrPanic(&smartbft.Options{
RequestBatchMaxCount: types.DefaultConfig.RequestBatchMaxCount,
RequestBatchMaxBytes: types.DefaultConfig.RequestBatchMaxBytes,
RequestBatchMaxInterval: types.DefaultConfig.RequestBatchMaxInterval.String(),
IncomingMessageBufferSize: types.DefaultConfig.IncomingMessageBufferSize,
RequestPoolSize: types.DefaultConfig.RequestPoolSize,
RequestForwardTimeout: types.DefaultConfig.RequestForwardTimeout.String(),
RequestComplainTimeout: types.DefaultConfig.RequestComplainTimeout.String(),
RequestAutoRemoveTimeout: types.DefaultConfig.RequestAutoRemoveTimeout.String(),
ViewChangeResendInterval: types.DefaultConfig.ViewChangeResendInterval.String(),
ViewChangeTimeout: types.DefaultConfig.ViewChangeTimeout.String(),
LeaderHeartbeatTimeout: types.DefaultConfig.LeaderHeartbeatTimeout.String(),
LeaderHeartbeatCount: types.DefaultConfig.LeaderHeartbeatCount,
CollectTimeout: types.DefaultConfig.CollectTimeout.String(),
SyncOnStart: types.DefaultConfig.SyncOnStart,
SpeedUpViewChange: types.DefaultConfig.SpeedUpViewChange,
})
config, updatedConfig = prepareTransition(network, peer, orderer, "testchannel",
"etcdraft", protosorderer.ConsensusType_STATE_MAINTENANCE,
"BFT", bftMetadata, protosorderer.ConsensusType_STATE_MAINTENANCE)
nwo.UpdateOrdererConfig(network, orderer, "testchannel", config, updatedConfig, peer, orderer)

time.Sleep(time.Second) // TODO: check block was committed in all orderers

for _, oProc := range []ifrit.Process{o1Proc, o2Proc, o3Proc, o4Proc} {
if oProc != nil {
oProc.Signal(syscall.SIGTERM)
Eventually(oProc.Wait(), network.EventuallyTimeout).Should(Receive())
}
}

runOrderers()

// TODO:
By("Waiting for followers to see the leader, again")
Eventually(o2Runner.Err(), network.EventuallyTimeout, time.Second).Should(gbytes.Say("Message from 1 channel=testchannel"))
Eventually(o3Runner.Err(), network.EventuallyTimeout, time.Second).Should(gbytes.Say("Message from 1 channel=testchannel"))
Eventually(o4Runner.Err(), network.EventuallyTimeout, time.Second).Should(gbytes.Say("Message from 1 channel=testchannel"))

// Exit maintenance mode

config, updatedConfig = prepareTransition(network, peer, orderer, "testchannel",
"BFT", protosorderer.ConsensusType_STATE_MAINTENANCE,
"BFT", nil, protosorderer.ConsensusType_STATE_NORMAL)
nwo.UpdateOrdererConfig(network, orderer, "testchannel", config, updatedConfig, peer, orderer)

// Now, run a transaction to ensure BFT works.
env = CreateBroadcastEnvelope(network, o1, "testchannel", []byte("foo"))
resp, err = ordererclient.Broadcast(network, o1, env)
Expect(err).NotTo(HaveOccurred())
Expect(resp.Status).To(Equal(common.Status_SUCCESS))

time.Sleep(time.Second * 5)

// TODO: check block was successfully committed in all orderers

})

})

// These tests execute the migration config updates on an etcdraft based system, but do not restart the orderer
// to a "future-type" consensus-type that currently does not exist. However, these tests try to maintain as much as
// possible the testing infrastructure for consensus-type migration that existed when "solo" and "kafka" were still
@@ -604,3 +730,72 @@ func createDeliverEnvelope(n *nwo.Network, signer *nwo.SigningIdentity, blkNum u

return env
}

func addBFTInConfigTree(network *nwo.Network, peer *nwo.Peer, orderer *nwo.Orderer, channel string) {
config := nwo.GetConfig(network, peer, orderer, channel)

updatedConfig := proto.Clone(config).(*common.Config)

orderersVal := &common.Orderers{
ConsenterMapping: computeConsenterMappings(network),
}

policies.EncodeBFTBlockVerificationPolicy(orderersVal.ConsenterMapping, updatedConfig.ChannelGroup.Groups["Orderer"])

updatedConfig.ChannelGroup.Groups["Orderer"].Values["Orderers"] = &common.ConfigValue{
Value: protoutil.MarshalOrPanic(orderersVal),
ModPolicy: "/Channel/Orderer/Admins",
}

nwo.UpdateOrdererConfig(network, orderer, channel, config, updatedConfig, peer, orderer)
}

func computeConsenterMappings(network *nwo.Network) []*common.Consenter {
var consenters []*common.Consenter

for i, orderer := range network.Orderers {
ecertPath := network.OrdererCert(orderer)
ecert, err := os.ReadFile(ecertPath)
Expect(err).To(Not(HaveOccurred()))

tlsDir := network.OrdererLocalTLSDir(orderer)
tlsPath := filepath.Join(tlsDir, "server.crt")
tlsCert, err := os.ReadFile(tlsPath)
Expect(err).To(Not(HaveOccurred()))

consenters = append(consenters, &common.Consenter{
ServerTlsCert: tlsCert,
ClientTlsCert: tlsCert,
MspId: network.Organization(orderer.Organization).MSPID,
Host: "127.0.0.1",
Port: uint32(network.PortsByOrdererID[orderer.ID()][nwo.ClusterPort]),
Id: uint32(i + 1),
Identity: ecert,
})
}

return consenters
}

func CreateBroadcastEnvelope(n *nwo.Network, entity interface{}, channel string, data []byte) *common.Envelope {
var signer *nwo.SigningIdentity
switch creator := entity.(type) {
case *nwo.Peer:
signer = n.PeerUserSigner(creator, "Admin")
case *nwo.Orderer:
signer = n.OrdererUserSigner(creator, "Admin")
}
Expect(signer).NotTo(BeNil())

env, err := protoutil.CreateSignedEnvelope(
common.HeaderType_ENDORSER_TRANSACTION,
channel,
signer,
&common.Envelope{Payload: data},
0,
0,
)
Expect(err).NotTo(HaveOccurred())

return env
}
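
For orientation, the three prepareTransition calls in the test above differ only in the target ConsensusType value they install on the channel. The following is a minimal sketch of those three values — an illustration of the protobuf being rewritten at each step, not the body of the existing prepareTransition helper, whose internals lie outside this diff:

	// Step 1: remain on etcdraft, enter maintenance mode.
	step1 := &protosorderer.ConsensusType{
		Type:  "etcdraft",
		State: protosorderer.ConsensusType_STATE_MAINTENANCE,
	}

	// Step 2: switch the type to BFT while still in maintenance mode,
	// carrying the SmartBFT options as opaque metadata.
	step2 := &protosorderer.ConsensusType{
		Type:     "BFT",
		Metadata: bftMetadata, // marshaled smartbft.Options, built earlier in the test
		State:    protosorderer.ConsensusType_STATE_MAINTENANCE,
	}

	// Step 3: exit maintenance mode on the new consensus type.
	step3 := &protosorderer.ConsensusType{
		Type:  "BFT",
		State: protosorderer.ConsensusType_STATE_NORMAL,
	}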
4 changes: 1 addition & 3 deletions orderer/common/msgprocessor/maintenancefilter.go
@@ -46,9 +46,7 @@ func NewMaintenanceFilter(support MaintenanceFilterSupport, bccsp bccsp.BCCSP) *
bccsp: bccsp,
}
mf.permittedTargetConsensusTypes["etcdraft"] = true
// Until we have a BFT consensus type, we use this for integration testing of consensus-type migration.
// Caution: proposing a config block with this type will cause panic.
mf.permittedTargetConsensusTypes["testing-only"] = true
mf.permittedTargetConsensusTypes["BFT"] = true
return mf
}
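
With "testing-only" gone, a consensus-type change is accepted only while the channel is in maintenance mode and only when the requested target type is registered in permittedTargetConsensusTypes — now "etcdraft" or "BFT". A self-contained sketch of that gate, as simplified stand-in logic rather than the filter's actual Apply path:

	var permittedTargets = map[string]bool{"etcdraft": true, "BFT": true}

	// validateTypeChange allows a migration only to a registered target type.
	func validateTypeChange(current, next string) error {
		if current == next {
			return nil // no type change, nothing to gate
		}
		if !permittedTargets[next] {
			return fmt.Errorf("consensus-type migration from %s to %s is not permitted", current, next)
		}
		return nil
	}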

20 changes: 15 additions & 5 deletions orderer/consensus/etcdraft/chain.go
@@ -1115,12 +1115,17 @@ func (c *Chain) commitBlock(block *common.Block) {
func (c *Chain) detectConfChange(block *common.Block) *MembershipChanges {
// If config is targeting THIS channel, inspect consenter set and
// propose raft ConfChange if it adds/removes node.
configMetadata := c.newConfigMetadata(block)
configMetadata, consensusType := c.newConfigMetadata(block)

if configMetadata == nil {
return nil
}

if consensusType.Type != "etcdraft" {
c.logger.Infof("Detected migration to %s", consensusType.Type)
return nil
}

if configMetadata.Options != nil &&
configMetadata.Options.SnapshotIntervalSize != 0 &&
configMetadata.Options.SnapshotIntervalSize != c.sizeLimit {
@@ -1425,12 +1430,12 @@ func (c *Chain) getInFlightConfChange() *raftpb.ConfChange {
}

// newConfigMetadata extracts config metadata from the configuration block
func (c *Chain) newConfigMetadata(block *common.Block) *etcdraft.ConfigMetadata {
metadata, err := ConsensusMetadataFromConfigBlock(block)
func (c *Chain) newConfigMetadata(block *common.Block) (*etcdraft.ConfigMetadata, *orderer.ConsensusType) {
metadata, consensusType, err := ConsensusMetadataFromConfigBlock(block)
if err != nil {
c.logger.Panicf("error reading consensus metadata: %s", err)
}
return metadata
return metadata, consensusType
}

// ValidateConsensusMetadata determines the validity of a
@@ -1446,6 +1451,11 @@ func (c *Chain) ValidateConsensusMetadata(oldOrdererConfig, newOrdererConfig cha
return nil
}

if newOrdererConfig.ConsensusType() != "etcdraft" {
// This is a migration, so we don't know how to validate this config change.
return nil
}

if oldOrdererConfig == nil {
c.logger.Panic("Programming Error: ValidateConsensusMetadata called with nil old channel config")
return nil
@@ -1572,7 +1582,7 @@ func (c *Chain) checkForEvictionNCertRotation(env *common.Envelope) bool {
return false
}

configMeta, err := MetadataFromConfigUpdate(configUpdate)
configMeta, _, err := MetadataFromConfigUpdate(configUpdate)
if err != nil || configMeta == nil {
c.logger.Warnf("could not read config metadata: %s", err)
return false
26 changes: 13 additions & 13 deletions orderer/consensus/etcdraft/util.go
@@ -84,22 +84,22 @@ func MetadataHasDuplication(md *etcdraft.ConfigMetadata) error {
}

// MetadataFromConfigValue reads and translates configuration updates from config value into raft metadata
func MetadataFromConfigValue(configValue *common.ConfigValue) (*etcdraft.ConfigMetadata, error) {
func MetadataFromConfigValue(configValue *common.ConfigValue) (*etcdraft.ConfigMetadata, *orderer.ConsensusType, error) {
consensusTypeValue := &orderer.ConsensusType{}
if err := proto.Unmarshal(configValue.Value, consensusTypeValue); err != nil {
return nil, errors.Wrap(err, "failed to unmarshal consensusType config update")
return nil, nil, errors.Wrap(err, "failed to unmarshal consensusType config update")
}

updatedMetadata := &etcdraft.ConfigMetadata{}
if err := proto.Unmarshal(consensusTypeValue.Metadata, updatedMetadata); err != nil {
return nil, errors.Wrap(err, "failed to unmarshal updated (new) etcdraft metadata configuration")
return nil, nil, errors.Wrap(err, "failed to unmarshal updated (new) etcdraft metadata configuration")
}

return updatedMetadata, nil
return updatedMetadata, consensusTypeValue, nil
}

// MetadataFromConfigUpdate extracts consensus metadata from config update
func MetadataFromConfigUpdate(update *common.ConfigUpdate) (*etcdraft.ConfigMetadata, error) {
func MetadataFromConfigUpdate(update *common.ConfigUpdate) (*etcdraft.ConfigMetadata, *orderer.ConsensusType, error) {
var baseVersion uint64
if update.ReadSet != nil && update.ReadSet.Groups != nil {
if ordererConfigGroup, ok := update.ReadSet.Groups["Orderer"]; ok {
@@ -115,13 +115,13 @@ func MetadataFromConfigUpdate(update *common.ConfigUpdate) (*etcdraft.ConfigMeta
if baseVersion == val.Version {
// Only if the version in the write set differs from the read-set
// should we consider this to be an update to the consensus type
return nil, nil
return nil, nil, nil
}
return MetadataFromConfigValue(val)
}
}
}
return nil, nil
return nil, nil, nil
}

// ConfigChannelHeader expects a config block and returns the header type
@@ -168,28 +168,28 @@ func ConfigEnvelopeFromBlock(block *common.Block) (*common.Envelope, error) {
}

// ConsensusMetadataFromConfigBlock reads consensus metadata updates from the configuration block
func ConsensusMetadataFromConfigBlock(block *common.Block) (*etcdraft.ConfigMetadata, error) {
func ConsensusMetadataFromConfigBlock(block *common.Block) (*etcdraft.ConfigMetadata, *orderer.ConsensusType, error) {
if block == nil {
return nil, errors.New("nil block")
return nil, nil, errors.New("nil block")
}

if !protoutil.IsConfigBlock(block) {
return nil, errors.New("not a config block")
return nil, nil, errors.New("not a config block")
}

configEnvelope, err := ConfigEnvelopeFromBlock(block)
if err != nil {
return nil, errors.Wrap(err, "cannot read config update")
return nil, nil, errors.Wrap(err, "cannot read config update")
}

payload, err := protoutil.UnmarshalPayload(configEnvelope.Payload)
if err != nil {
return nil, errors.Wrap(err, "failed to extract payload from config envelope")
return nil, nil, errors.Wrap(err, "failed to extract payload from config envelope")
}
// get config update
configUpdate, err := configtx.UnmarshalConfigUpdateFromPayload(payload)
if err != nil {
return nil, errors.Wrap(err, "could not read config update")
return nil, nil, errors.Wrap(err, "could not read config update")
}

return MetadataFromConfigUpdate(configUpdate)
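Callers of these helpers now receive the ConsensusType value alongside the etcdraft metadata, which is what lets the Raft chain recognize a migration in detectConfChange and ValidateConsensusMetadata above. An illustrative caller — not code from this commit — might consume the triple like so:

	metadata, consensusType, err := ConsensusMetadataFromConfigBlock(block)
	if err != nil {
		return errors.Wrap(err, "error reading consensus metadata")
	}
	if consensusType != nil && consensusType.Type != "etcdraft" {
		// The channel is migrating away from Raft; skip etcdraft-specific
		// handling of this config block.
		return nil
	}
	// Otherwise proceed with etcdraft-specific processing of metadata.
	_ = metadata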
