diff --git a/common/p2pmessage/metrics.go b/common/p2pmessage/metrics.go new file mode 100644 index 00000000000..cd8452edb4e --- /dev/null +++ b/common/p2pmessage/metrics.go @@ -0,0 +1,59 @@ +package p2pmessage + +import ( + "github.com/hyperledger/fabric/common/metrics" +) + +var ( + streamsOpened = metrics.CounterOpts{ + Namespace: "p2p_message", + Name: "streams_opened", + Help: "The number of GRPC streams that have been opened for the p2p_message service.", + } + streamsClosed = metrics.CounterOpts{ + Namespace: "p2p_message", + Name: "streams_closed", + Help: "The number of GRPC streams that have been closed for the p2p_message service.", + } + + requestsReceived = metrics.CounterOpts{ + Namespace: "p2p_message", + Name: "requests_received", + Help: "The number of p2p_message requests that have been received.", + LabelNames: []string{"channel", "filtered", "data_type"}, + StatsdFormat: "%{#fqname}.%{channel}.%{filtered}.%{data_type}", + } + requestsCompleted = metrics.CounterOpts{ + Namespace: "p2p_message", + Name: "requests_completed", + Help: "The number of p2p_message requests that have been completed.", + LabelNames: []string{"channel", "filtered", "data_type", "success"}, + StatsdFormat: "%{#fqname}.%{channel}.%{filtered}.%{data_type}.%{success}", + } + + blocksReconciled = metrics.CounterOpts{ + Namespace: "p2p_message", + Name: "blocks_reconciled", + Help: "The number of blocks sent by the p2p_message service.", + LabelNames: []string{"channel", "filtered", "data_type"}, + StatsdFormat: "%{#fqname}.%{channel}.%{filtered}.%{data_type}", + } +) + +type Metrics struct { + StreamsOpened metrics.Counter + StreamsClosed metrics.Counter + RequestsReceived metrics.Counter + RequestsCompleted metrics.Counter + BlocksReconciled metrics.Counter +} + +func NewMetrics(p metrics.Provider) *Metrics { + return &Metrics{ + StreamsOpened: p.NewCounter(streamsOpened), + StreamsClosed: p.NewCounter(streamsClosed), + RequestsReceived: p.NewCounter(requestsReceived), + RequestsCompleted: p.NewCounter(requestsCompleted), + BlocksReconciled: p.NewCounter(blocksReconciled), + } +} diff --git a/common/p2pmessage/p2pmessage.go b/common/p2pmessage/p2pmessage.go new file mode 100644 index 00000000000..435a7e00831 --- /dev/null +++ b/common/p2pmessage/p2pmessage.go @@ -0,0 +1,134 @@ +package p2pmessage + +import ( + "context" + "github.com/hyperledger/fabric/common/crypto" + "github.com/hyperledger/fabric/common/flogging" + "github.com/hyperledger/fabric/common/ledger/blockledger" + "github.com/hyperledger/fabric/common/policies" + "github.com/hyperledger/fabric/common/util" + "github.com/hyperledger/fabric/internal/peer/protos" + "time" +) + +var logger = flogging.MustGetLogger("common.p2pmessage") + +//go:generate counterfeiter -o mock/chain_manager.go -fake-name ChainManager . ChainManager + +// ChainManager provides a way for the Handler to look up the Chain. +type ChainManager interface { + GetChain(chainID string) Chain +} + +//go:generate counterfeiter -o mock/chain.go -fake-name Chain . Chain + +// Chain encapsulates chain operations and data. +type Chain interface { + // Sequence returns the current config sequence number, can be used to detect config changes + Sequence() uint64 + + // PolicyManager returns the current policy manager as specified by the chain configuration + PolicyManager() policies.Manager + + // Reader returns the chain Reader for the chain + Reader() blockledger.Reader + + // Errored returns a channel which closes when the backing consenter has errored + Errored() <-chan struct{} +} + +////go:generate counterfeiter -o mock/policy_checker.go -fake-name PolicyChecker . PolicyChecker +// +//// PolicyChecker checks the envelope against the policy logic supplied by the +//// function. +//type PolicyChecker interface { +// CheckPolicy(envelope *cb.Envelope, channelID string) error +//} +// +//// The PolicyCheckerFunc is an adapter that allows the use of an ordinary +//// function as a PolicyChecker. +//type PolicyCheckerFunc func(envelope *cb.Envelope, channelID string) error +// +//// CheckPolicy calls pcf(envelope, channelID) +//func (pcf PolicyCheckerFunc) CheckPolicy(envelope *cb.Envelope, channelID string) error { +// return pcf(envelope, channelID) +//} + +////go:generate counterfeiter -o mock/inspector.go -fake-name Inspector . Inspector +// +//// Inspector verifies an appropriate binding between the message and the context. +//type Inspector interface { +// Inspect(context.Context, proto.Message) error +//} +// +//// The InspectorFunc is an adapter that allows the use of an ordinary +//// function as an Inspector. +//type InspectorFunc func(context.Context, proto.Message) error +// +//// Inspect calls inspector(ctx, p) +//func (inspector InspectorFunc) Inspect(ctx context.Context, p proto.Message) error { +// return inspector(ctx, p) +//} + +// Handler handles server requests. +type Handler struct { + ExpirationCheckFunc func(identityBytes []byte) time.Time + ChainManager ChainManager + TimeWindow time.Duration + //BindingInspector Inspector + Metrics *Metrics +} + +// Server is a polymorphic structure to support generalization of this handler +// to be able to deliver different type of responses. +type Server struct { + Receiver +} + +// Receiver is used to receive enveloped seek requests. +type Receiver interface { + SendReconcileRequest() +} + +// NewHandler creates an implementation of the Handler interface. +func NewHandler(cm ChainManager, timeWindow time.Duration, mutualTLS bool, metrics *Metrics, expirationCheckDisabled bool) *Handler { + expirationCheck := crypto.ExpiresAt + if expirationCheckDisabled { + expirationCheck = noExpiration + } + return &Handler{ + ChainManager: cm, + TimeWindow: timeWindow, + //BindingInspector: InspectorFunc(comm.NewBindingInspector(mutualTLS, ExtractChannelHeaderCertHash)), + Metrics: metrics, + ExpirationCheckFunc: expirationCheck, + } +} + +// Handle receives incoming deliver requests. +func (h *Handler) Handle(ctx context.Context, request *protos.ReconcileRequest) (*protos.ReconcileResponse, error) { + addr := util.ExtractRemoteAddress(ctx) + logger.Debugf("Starting new p2p loop for %s", addr) + h.Metrics.StreamsOpened.Add(1) + defer h.Metrics.StreamsClosed.Add(1) + + reconcileResponse := &protos.ReconcileResponse{ + Success: true, + Message: "I amd sending the response! It's me server", + } + + return reconcileResponse, nil +} + +// ExtractChannelHeaderCertHash extracts the TLS cert hash from a channel header. +//func ExtractChannelHeaderCertHash(msg proto.Message) []byte { +// chdr, isChannelHeader := msg.(*cb.ChannelHeader) +// if !isChannelHeader || chdr == nil { +// return nil +// } +// return chdr.TlsCertHash +//} + +func noExpiration(_ []byte) time.Time { + return time.Time{} +} diff --git a/core/peer/p2pmessage.go b/core/peer/p2pmessage.go index 386f66bbcbc..6d23026b08c 100644 --- a/core/peer/p2pmessage.go +++ b/core/peer/p2pmessage.go @@ -2,24 +2,22 @@ package peer import ( "context" - "github.com/hyperledger/fabric/common/deliver" + "github.com/hyperledger/fabric/common/p2pmessage" "github.com/hyperledger/fabric/internal/peer/protos" ) // P2pMessageServer holds the dependencies necessary to create a deliver server type P2pMessageServer struct { - DeliverHandler *deliver.Handler + DeliverHandler *p2pmessage.Handler PolicyCheckerProvider PolicyCheckerProvider CollectionPolicyChecker CollectionPolicyChecker IdentityDeserializerMgr IdentityDeserializerManager } -func (p P2pMessageServer) SendReconcileRequest(ctx context.Context, request *protos.ReconcileRequest) (*protos.ReconcileResponse, error) { - //TODO implement me - logger.Debugf("Received reconcilation request %v", request) - reconcileResponse := &protos.ReconcileResponse{ - Success: true, - Message: "I amd sending the response! It's me server", - } - return reconcileResponse, nil +// Deliver sends a stream of blocks to a client after commitment +func (s *P2pMessageServer) SendReconcileRequest(ctx context.Context, request *protos.ReconcileRequest) (*protos.ReconcileResponse, error) { + logger.Debugf("Starting new Deliver handler") + defer dumpStacktraceOnPanic() + + return s.DeliverHandler.Handle(ctx, request) } diff --git a/core/peer/peer.go b/core/peer/peer.go index 64f1c4493d1..48c9e7afce2 100644 --- a/core/peer/peer.go +++ b/core/peer/peer.go @@ -8,6 +8,7 @@ package peer import ( "fmt" + "github.com/hyperledger/fabric/common/p2pmessage" "sync" "github.com/hyperledger/fabric-protos-go/common" @@ -118,6 +119,21 @@ func (p *Peer) updateTrustedRoots(cm channelconfig.Resources) { } } +// +// P2P Message service support structs for the peer +// + +type P2PMessageChainManager struct { + Peer *Peer +} + +func (p P2PMessageChainManager) GetChain(chainID string) p2pmessage.Chain { + if channel := p.Peer.Channel(chainID); channel != nil { + return channel + } + return nil +} + // // Deliver service support structs for the peer // diff --git a/internal/peer/node/start.go b/internal/peer/node/start.go index bf07fbba9e9..71a0bf58422 100644 --- a/internal/peer/node/start.go +++ b/internal/peer/node/start.go @@ -9,6 +9,7 @@ package node import ( "context" "fmt" + "github.com/hyperledger/fabric/common/p2pmessage" "github.com/hyperledger/fabric/internal/peer/protos" "io" "io/ioutil" @@ -491,26 +492,27 @@ func serve(args []string) error { } } - metrics := deliver.NewMetrics(metricsProvider) + deliverMetrics := deliver.NewMetrics(metricsProvider) abServer := &peer.DeliverServer{ DeliverHandler: deliver.NewHandler( &peer.DeliverChainManager{Peer: peerInstance}, coreConfig.AuthenticationTimeWindow, mutualTLS, - metrics, + deliverMetrics, false, ), PolicyCheckerProvider: policyCheckerProvider, } pb.RegisterDeliverServer(peerServer.Server(), abServer) - // Custom + // Block & Txn Reconcile Implementation + p2pMessageMetrics := p2pmessage.NewMetrics(metricsProvider) p2pMessageServer := &peer.P2pMessageServer{ - DeliverHandler: deliver.NewHandler( - &peer.DeliverChainManager{Peer: peerInstance}, + DeliverHandler: p2pmessage.NewHandler( + &peer.P2PMessageChainManager{Peer: peerInstance}, coreConfig.AuthenticationTimeWindow, mutualTLS, - metrics, + p2pMessageMetrics, false, ), PolicyCheckerProvider: policyCheckerProvider,