From 9699a9f1205adb4437b30729f19e45d95b32c712 Mon Sep 17 00:00:00 2001 From: andrew-coleman Date: Tue, 11 Jan 2022 13:19:48 +0000 Subject: [PATCH] Close connections to stale ordering nodes Signed-off-by: andrew-coleman --- internal/pkg/gateway/registry.go | 30 ++++++++++ internal/pkg/gateway/registry_test.go | 84 +++++++++++++++++++++++++-- 2 files changed, 110 insertions(+), 4 deletions(-) diff --git a/internal/pkg/gateway/registry.go b/internal/pkg/gateway/registry.go index 2f37220f1ea..8a8f5a5f7bd 100644 --- a/internal/pkg/gateway/registry.go +++ b/internal/pkg/gateway/registry.go @@ -427,7 +427,37 @@ func (reg *registry) configUpdate(bundle *channelconfig.Bundle) { } } if len(channelOrderers) > 0 { + reg.closeStaleOrdererConnections(channel, channelOrderers) reg.channelOrderers.Store(channel, channelOrderers) } } } + +func (reg *registry) closeStaleOrdererConnections(channel string, channelOrderers []*endpointConfig) { + // Load the list of orderers that is about to be overwritten, if loaded is false, then another goroutine got there first + oldList, loaded := reg.channelOrderers.LoadAndDelete(channel) + if loaded { + currentEndpoints := map[string]struct{}{} + reg.channelOrderers.Range(func(key, value interface{}) bool { + for _, ep := range value.([]*endpointConfig) { + currentEndpoints[ep.address] = struct{}{} + } + return true + }) + for _, ep := range channelOrderers { + currentEndpoints[ep.address] = struct{}{} + } + // if there are any in the oldEndpoints that are not in the currentEndpoints, then remove from registry and close connection + for _, ep := range oldList.([]*endpointConfig) { + if _, exists := currentEndpoints[ep.address]; !exists { + client, found := reg.broadcastClients.LoadAndDelete(ep.address) + if found { + err := client.(*orderer).closeConnection() + if err != nil { + reg.logger.Errorw("Failed to close connection to orderer", "address", ep.address, "mspid", ep.mspid, "err", err) + } + } + } + } + } +} diff --git a/internal/pkg/gateway/registry_test.go b/internal/pkg/gateway/registry_test.go index 23c2de8d15c..c535f3ca955 100644 --- a/internal/pkg/gateway/registry_test.go +++ b/internal/pkg/gateway/registry_test.go @@ -34,7 +34,7 @@ func TestOrdererCache(t *testing.T) { require.Len(t, orderers, 1) // trigger the config update callback, updating the orderers - bundle, err := createChannelConfigBundle() + bundle, err := createChannelConfigBundle(channelName, []string{"orderer1:7050", "orderer2:7050", "orderer3:7050"}) require.NoError(t, err) test.server.registry.configUpdate(bundle) orderers, err = test.server.registry.orderers(channelName) @@ -42,6 +42,82 @@ func TestOrdererCache(t *testing.T) { require.Len(t, orderers, 3) } +func TestStaleOrdererConnections(t *testing.T) { + def := &testDef{ + config: buildConfig(t, []string{"orderer1", "orderer2", "orderer3"}), + } + test := prepareTest(t, def) + + orderers, err := test.server.registry.orderers(channelName) + require.NoError(t, err) + require.Len(t, orderers, 3) + + closed := make([]bool, len(orderers)) + for i, o := range orderers { + o.closeConnection = func(index int) func() error { + return func() error { + closed[index] = true + return nil + } + }(i) + } + // trigger the config update callback, updating the orderers + bundle, err := createChannelConfigBundle(channelName, []string{"orderer1:7050", "orderer3:7050"}) + require.NoError(t, err) + test.server.registry.configUpdate(bundle) + orderers, err = test.server.registry.orderers(channelName) + require.NoError(t, err) + require.Len(t, orderers, 2) + require.False(t, closed[0]) + require.True(t, closed[1]) + require.False(t, closed[2]) +} + +func TestStaleMultiChannelOrdererConnections(t *testing.T) { + channel1 := "channel1" + // channel2 := "channel2" + // channel3 := "channel3" + + def := &testDef{ + config: buildConfig(t, []string{"orderer1", "orderer2"}), + } + test := prepareTest(t, def) + + orderers, err := test.server.registry.orderers(channelName) + require.NoError(t, err) + require.Len(t, orderers, 2) + + // trigger the config update callback, updating the orderers + bundle, err := createChannelConfigBundle(channel1, []string{"orderer1:7050", "orderer3:7050", "orderer4:7050"}) + require.NoError(t, err) + test.server.registry.configUpdate(bundle) + orderers, err = test.server.registry.orderers(channel1) + require.NoError(t, err) + require.Len(t, orderers, 3) + + closed := make([]bool, len(orderers)) + for i, o := range orderers { + o.closeConnection = func(index int) func() error { + return func() error { + closed[index] = true + return nil + } + }(i) + } + + // new config update removes orderer1 and orderer3 from channel1 - should only trigger the closure of orderer3 + bundle, err = createChannelConfigBundle(channel1, []string{"orderer4:7050"}) + require.NoError(t, err) + test.server.registry.configUpdate(bundle) + orderers, err = test.server.registry.orderers(channel1) + require.NoError(t, err) + require.Len(t, orderers, 1) + + require.False(t, closed[0]) // orderer1 + require.True(t, closed[1]) // orderer3 + require.False(t, closed[2]) // orderer4 +} + func buildConfig(t *testing.T, orderers []string) *dp.ConfigResult { ca, err := tlsgen.NewCA() require.NoError(t, err) @@ -64,10 +140,10 @@ func buildConfig(t *testing.T, orderers []string) *dp.ConfigResult { } } -func createChannelConfigBundle() (*channelconfig.Bundle, error) { +func createChannelConfigBundle(channel string, endpoints []string) (*channelconfig.Bundle, error) { conf := genesisconfig.Load(genesisconfig.SampleDevModeSoloProfile, configtest.GetDevConfigDir()) conf.Capabilities = map[string]bool{"V2_0": true} - conf.Orderer.Organizations[0].OrdererEndpoints = []string{"orderer1", "orderer2", "orderer3"} + conf.Orderer.Organizations[0].OrdererEndpoints = endpoints cg, err := encoder.NewChannelGroup(conf) if err != nil { @@ -79,5 +155,5 @@ func createChannelConfigBundle() (*channelconfig.Bundle, error) { return nil, err } - return channelconfig.NewBundle(channelName, &cb.Config{ChannelGroup: cg}, cryptoProvider) + return channelconfig.NewBundle(channel, &cb.Config{ChannelGroup: cg}, cryptoProvider) }