Skip to content

Commit

Permalink
Close connections to stale ordering nodes
Browse files Browse the repository at this point in the history
Signed-off-by: andrew-coleman <andrew_coleman@uk.ibm.com>
(cherry picked from commit 9699a9f)
  • Loading branch information
andrew-coleman authored and denyeart committed Jan 18, 2022
1 parent e1ed78f commit e7cb726
Show file tree
Hide file tree
Showing 2 changed files with 110 additions and 4 deletions.
30 changes: 30 additions & 0 deletions internal/pkg/gateway/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -427,7 +427,37 @@ func (reg *registry) configUpdate(bundle *channelconfig.Bundle) {
}
}
if len(channelOrderers) > 0 {
reg.closeStaleOrdererConnections(channel, channelOrderers)
reg.channelOrderers.Store(channel, channelOrderers)
}
}
}

func (reg *registry) closeStaleOrdererConnections(channel string, channelOrderers []*endpointConfig) {
// Load the list of orderers that is about to be overwritten, if loaded is false, then another goroutine got there first
oldList, loaded := reg.channelOrderers.LoadAndDelete(channel)
if loaded {
currentEndpoints := map[string]struct{}{}
reg.channelOrderers.Range(func(key, value interface{}) bool {
for _, ep := range value.([]*endpointConfig) {
currentEndpoints[ep.address] = struct{}{}
}
return true
})
for _, ep := range channelOrderers {
currentEndpoints[ep.address] = struct{}{}
}
// if there are any in the oldEndpoints that are not in the currentEndpoints, then remove from registry and close connection
for _, ep := range oldList.([]*endpointConfig) {
if _, exists := currentEndpoints[ep.address]; !exists {
client, found := reg.broadcastClients.LoadAndDelete(ep.address)
if found {
err := client.(*orderer).closeConnection()
if err != nil {
reg.logger.Errorw("Failed to close connection to orderer", "address", ep.address, "mspid", ep.mspid, "err", err)
}
}
}
}
}
}
84 changes: 80 additions & 4 deletions internal/pkg/gateway/registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,90 @@ func TestOrdererCache(t *testing.T) {
require.Len(t, orderers, 1)

// trigger the config update callback, updating the orderers
bundle, err := createChannelConfigBundle()
bundle, err := createChannelConfigBundle(channelName, []string{"orderer1:7050", "orderer2:7050", "orderer3:7050"})
require.NoError(t, err)
test.server.registry.configUpdate(bundle)
orderers, err = test.server.registry.orderers(channelName)
require.NoError(t, err)
require.Len(t, orderers, 3)
}

func TestStaleOrdererConnections(t *testing.T) {
def := &testDef{
config: buildConfig(t, []string{"orderer1", "orderer2", "orderer3"}),
}
test := prepareTest(t, def)

orderers, err := test.server.registry.orderers(channelName)
require.NoError(t, err)
require.Len(t, orderers, 3)

closed := make([]bool, len(orderers))
for i, o := range orderers {
o.closeConnection = func(index int) func() error {
return func() error {
closed[index] = true
return nil
}
}(i)
}
// trigger the config update callback, updating the orderers
bundle, err := createChannelConfigBundle(channelName, []string{"orderer1:7050", "orderer3:7050"})
require.NoError(t, err)
test.server.registry.configUpdate(bundle)
orderers, err = test.server.registry.orderers(channelName)
require.NoError(t, err)
require.Len(t, orderers, 2)
require.False(t, closed[0])
require.True(t, closed[1])
require.False(t, closed[2])
}

func TestStaleMultiChannelOrdererConnections(t *testing.T) {
channel1 := "channel1"
// channel2 := "channel2"
// channel3 := "channel3"

def := &testDef{
config: buildConfig(t, []string{"orderer1", "orderer2"}),
}
test := prepareTest(t, def)

orderers, err := test.server.registry.orderers(channelName)
require.NoError(t, err)
require.Len(t, orderers, 2)

// trigger the config update callback, updating the orderers
bundle, err := createChannelConfigBundle(channel1, []string{"orderer1:7050", "orderer3:7050", "orderer4:7050"})
require.NoError(t, err)
test.server.registry.configUpdate(bundle)
orderers, err = test.server.registry.orderers(channel1)
require.NoError(t, err)
require.Len(t, orderers, 3)

closed := make([]bool, len(orderers))
for i, o := range orderers {
o.closeConnection = func(index int) func() error {
return func() error {
closed[index] = true
return nil
}
}(i)
}

// new config update removes orderer1 and orderer3 from channel1 - should only trigger the closure of orderer3
bundle, err = createChannelConfigBundle(channel1, []string{"orderer4:7050"})
require.NoError(t, err)
test.server.registry.configUpdate(bundle)
orderers, err = test.server.registry.orderers(channel1)
require.NoError(t, err)
require.Len(t, orderers, 1)

require.False(t, closed[0]) // orderer1
require.True(t, closed[1]) // orderer3
require.False(t, closed[2]) // orderer4
}

func buildConfig(t *testing.T, orderers []string) *dp.ConfigResult {
ca, err := tlsgen.NewCA()
require.NoError(t, err)
Expand All @@ -64,10 +140,10 @@ func buildConfig(t *testing.T, orderers []string) *dp.ConfigResult {
}
}

func createChannelConfigBundle() (*channelconfig.Bundle, error) {
func createChannelConfigBundle(channel string, endpoints []string) (*channelconfig.Bundle, error) {
conf := genesisconfig.Load(genesisconfig.SampleDevModeSoloProfile, configtest.GetDevConfigDir())
conf.Capabilities = map[string]bool{"V2_0": true}
conf.Orderer.Organizations[0].OrdererEndpoints = []string{"orderer1", "orderer2", "orderer3"}
conf.Orderer.Organizations[0].OrdererEndpoints = endpoints

cg, err := encoder.NewChannelGroup(conf)
if err != nil {
Expand All @@ -79,5 +155,5 @@ func createChannelConfigBundle() (*channelconfig.Bundle, error) {
return nil, err
}

return channelconfig.NewBundle(channelName, &cb.Config{ChannelGroup: cg}, cryptoProvider)
return channelconfig.NewBundle(channel, &cb.Config{ChannelGroup: cg}, cryptoProvider)
}

0 comments on commit e7cb726

Please sign in to comment.