-
Notifications
You must be signed in to change notification settings - Fork 8.8k
/
broadcast.go
143 lines (118 loc) · 5.17 KB
/
broadcast.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
/*
Copyright IBM Corp. All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/
package broadcast
import (
"io"
"github.com/hyperledger/fabric/common/flogging"
"github.com/hyperledger/fabric/orderer/common/msgprocessor"
cb "github.com/hyperledger/fabric/protos/common"
ab "github.com/hyperledger/fabric/protos/orderer"
"github.com/hyperledger/fabric/orderer/common/util"
"github.com/op/go-logging"
"github.com/pkg/errors"
)
const pkgLogID = "orderer/common/broadcast"
var logger *logging.Logger
func init() {
logger = flogging.MustGetLogger(pkgLogID)
}
// Handler defines an interface which handles broadcasts
type Handler interface {
// Handle starts a service thread for a given gRPC connection and services the broadcast connection
Handle(srv ab.AtomicBroadcast_BroadcastServer) error
}
// ChannelSupportRegistrar provides a way for the Handler to look up the Support for a channel
type ChannelSupportRegistrar interface {
// BroadcastChannelSupport returns the message channel header, whether the message is a config update
// and the channel resources for a message or an error if the message is not a message which can
// be processed directly (like CONFIG and ORDERER_TRANSACTION messages)
BroadcastChannelSupport(msg *cb.Envelope) (*cb.ChannelHeader, bool, ChannelSupport, error)
}
// ChannelSupport provides the backing resources needed to support broadcast on a channel
type ChannelSupport interface {
msgprocessor.Processor
Consenter
}
// Consenter provides methods to send messages through consensus
type Consenter interface {
// Order accepts a message or returns an error indicating the cause of failure
// It ultimately passes through to the consensus.Chain interface
Order(env *cb.Envelope, configSeq uint64) error
// Configure accepts a reconfiguration or returns an error indicating the cause of failure
// It ultimately passes through to the consensus.Chain interface
Configure(config *cb.Envelope, configSeq uint64) error
}
type handlerImpl struct {
sm ChannelSupportRegistrar
}
// NewHandlerImpl constructs a new implementation of the Handler interface
func NewHandlerImpl(sm ChannelSupportRegistrar) Handler {
return &handlerImpl{
sm: sm,
}
}
// Handle starts a service thread for a given gRPC connection and services the broadcast connection
func (bh *handlerImpl) Handle(srv ab.AtomicBroadcast_BroadcastServer) error {
addr := util.ExtractRemoteAddress(srv.Context())
logger.Debugf("Starting new broadcast loop for %s", addr)
for {
msg, err := srv.Recv()
if err == io.EOF {
logger.Debugf("Received EOF from %s, hangup", addr)
return nil
}
if err != nil {
logger.Warningf("Error reading from %s: %s", addr, err)
return err
}
chdr, isConfig, processor, err := bh.sm.BroadcastChannelSupport(msg)
if err != nil {
logger.Warningf("[channel: %s] Could not get message processor for serving %s: %s", chdr.ChannelId, addr, err)
return srv.Send(&ab.BroadcastResponse{Status: cb.Status_INTERNAL_SERVER_ERROR, Info: err.Error()})
}
if !isConfig {
logger.Debugf("[channel: %s] Broadcast is processing normal message from %s with txid '%s' of type %s", chdr.ChannelId, addr, chdr.TxId, cb.HeaderType_name[chdr.Type])
configSeq, err := processor.ProcessNormalMsg(msg)
if err != nil {
logger.Warningf("[channel: %s] Rejecting broadcast of normal message from %s because of error: %s", chdr.ChannelId, addr, err)
return srv.Send(&ab.BroadcastResponse{Status: ClassifyError(err), Info: err.Error()})
}
err = processor.Order(msg, configSeq)
if err != nil {
logger.Warningf("[channel: %s] Rejecting broadcast of normal message from %s with SERVICE_UNAVAILABLE: rejected by Order: %s", chdr.ChannelId, addr, err)
return srv.Send(&ab.BroadcastResponse{Status: cb.Status_SERVICE_UNAVAILABLE, Info: err.Error()})
}
} else { // isConfig
logger.Debugf("[channel: %s] Broadcast is processing config update message from %s", chdr.ChannelId, addr)
config, configSeq, err := processor.ProcessConfigUpdateMsg(msg)
if err != nil {
logger.Warningf("[channel: %s] Rejecting broadcast of config message from %s because of error: %s", chdr.ChannelId, addr, err)
return srv.Send(&ab.BroadcastResponse{Status: ClassifyError(err), Info: err.Error()})
}
err = processor.Configure(config, configSeq)
if err != nil {
logger.Warningf("[channel: %s] Rejecting broadcast of config message from %s with SERVICE_UNAVAILABLE: rejected by Configure: %s", chdr.ChannelId, addr, err)
return srv.Send(&ab.BroadcastResponse{Status: cb.Status_SERVICE_UNAVAILABLE, Info: err.Error()})
}
}
logger.Debugf("[channel: %s] Broadcast has successfully enqueued message of type %s from %s", chdr.ChannelId, cb.HeaderType_name[chdr.Type], addr)
err = srv.Send(&ab.BroadcastResponse{Status: cb.Status_SUCCESS})
if err != nil {
logger.Warningf("[channel: %s] Error sending to %s: %s", chdr.ChannelId, addr, err)
return err
}
}
}
// ClassifyError converts an error type into a status code.
func ClassifyError(err error) cb.Status {
switch errors.Cause(err) {
case msgprocessor.ErrChannelDoesNotExist:
return cb.Status_NOT_FOUND
case msgprocessor.ErrPermissionDenied:
return cb.Status_FORBIDDEN
default:
return cb.Status_BAD_REQUEST
}
}