Skip to content

Commit

Permalink
included reconciler handler as a generic p2p msg
Browse files Browse the repository at this point in the history
  • Loading branch information
tittuvarghese committed Sep 27, 2024
1 parent a0baf0f commit ad406a4
Show file tree
Hide file tree
Showing 5 changed files with 225 additions and 16 deletions.
59 changes: 59 additions & 0 deletions common/p2pmessage/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package p2pmessage

import (
"github.com/hyperledger/fabric/common/metrics"
)

var (
streamsOpened = metrics.CounterOpts{
Namespace: "p2p_message",
Name: "streams_opened",
Help: "The number of GRPC streams that have been opened for the p2p_message service.",
}
streamsClosed = metrics.CounterOpts{
Namespace: "p2p_message",
Name: "streams_closed",
Help: "The number of GRPC streams that have been closed for the p2p_message service.",
}

requestsReceived = metrics.CounterOpts{
Namespace: "p2p_message",
Name: "requests_received",
Help: "The number of p2p_message requests that have been received.",
LabelNames: []string{"channel", "filtered", "data_type"},
StatsdFormat: "%{#fqname}.%{channel}.%{filtered}.%{data_type}",
}
requestsCompleted = metrics.CounterOpts{
Namespace: "p2p_message",
Name: "requests_completed",
Help: "The number of p2p_message requests that have been completed.",
LabelNames: []string{"channel", "filtered", "data_type", "success"},
StatsdFormat: "%{#fqname}.%{channel}.%{filtered}.%{data_type}.%{success}",
}

blocksReconciled = metrics.CounterOpts{
Namespace: "p2p_message",
Name: "blocks_reconciled",
Help: "The number of blocks sent by the p2p_message service.",
LabelNames: []string{"channel", "filtered", "data_type"},
StatsdFormat: "%{#fqname}.%{channel}.%{filtered}.%{data_type}",
}
)

type Metrics struct {
StreamsOpened metrics.Counter
StreamsClosed metrics.Counter
RequestsReceived metrics.Counter
RequestsCompleted metrics.Counter
BlocksReconciled metrics.Counter
}

func NewMetrics(p metrics.Provider) *Metrics {
return &Metrics{
StreamsOpened: p.NewCounter(streamsOpened),
StreamsClosed: p.NewCounter(streamsClosed),
RequestsReceived: p.NewCounter(requestsReceived),
RequestsCompleted: p.NewCounter(requestsCompleted),
BlocksReconciled: p.NewCounter(blocksReconciled),
}
}
134 changes: 134 additions & 0 deletions common/p2pmessage/p2pmessage.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
package p2pmessage

import (
"context"
"github.com/hyperledger/fabric/common/crypto"
"github.com/hyperledger/fabric/common/flogging"
"github.com/hyperledger/fabric/common/ledger/blockledger"
"github.com/hyperledger/fabric/common/policies"
"github.com/hyperledger/fabric/common/util"
"github.com/hyperledger/fabric/internal/peer/protos"
"time"
)

var logger = flogging.MustGetLogger("common.p2pmessage")

//go:generate counterfeiter -o mock/chain_manager.go -fake-name ChainManager . ChainManager

// ChainManager provides a way for the Handler to look up the Chain.
type ChainManager interface {
GetChain(chainID string) Chain
}

//go:generate counterfeiter -o mock/chain.go -fake-name Chain . Chain

// Chain encapsulates chain operations and data.
type Chain interface {
// Sequence returns the current config sequence number, can be used to detect config changes
Sequence() uint64

// PolicyManager returns the current policy manager as specified by the chain configuration
PolicyManager() policies.Manager

// Reader returns the chain Reader for the chain
Reader() blockledger.Reader

// Errored returns a channel which closes when the backing consenter has errored
Errored() <-chan struct{}
}

////go:generate counterfeiter -o mock/policy_checker.go -fake-name PolicyChecker . PolicyChecker
//
//// PolicyChecker checks the envelope against the policy logic supplied by the
//// function.
//type PolicyChecker interface {
// CheckPolicy(envelope *cb.Envelope, channelID string) error
//}
//
//// The PolicyCheckerFunc is an adapter that allows the use of an ordinary
//// function as a PolicyChecker.
//type PolicyCheckerFunc func(envelope *cb.Envelope, channelID string) error
//
//// CheckPolicy calls pcf(envelope, channelID)
//func (pcf PolicyCheckerFunc) CheckPolicy(envelope *cb.Envelope, channelID string) error {
// return pcf(envelope, channelID)
//}

////go:generate counterfeiter -o mock/inspector.go -fake-name Inspector . Inspector
//
//// Inspector verifies an appropriate binding between the message and the context.
//type Inspector interface {
// Inspect(context.Context, proto.Message) error
//}
//
//// The InspectorFunc is an adapter that allows the use of an ordinary
//// function as an Inspector.
//type InspectorFunc func(context.Context, proto.Message) error
//
//// Inspect calls inspector(ctx, p)
//func (inspector InspectorFunc) Inspect(ctx context.Context, p proto.Message) error {
// return inspector(ctx, p)
//}

// Handler handles server requests.
type Handler struct {
ExpirationCheckFunc func(identityBytes []byte) time.Time
ChainManager ChainManager
TimeWindow time.Duration
//BindingInspector Inspector
Metrics *Metrics
}

// Server is a polymorphic structure to support generalization of this handler
// to be able to deliver different type of responses.
type Server struct {
Receiver
}

// Receiver is used to receive enveloped seek requests.
type Receiver interface {
SendReconcileRequest()
}

// NewHandler creates an implementation of the Handler interface.
func NewHandler(cm ChainManager, timeWindow time.Duration, mutualTLS bool, metrics *Metrics, expirationCheckDisabled bool) *Handler {
expirationCheck := crypto.ExpiresAt
if expirationCheckDisabled {
expirationCheck = noExpiration
}
return &Handler{
ChainManager: cm,
TimeWindow: timeWindow,
//BindingInspector: InspectorFunc(comm.NewBindingInspector(mutualTLS, ExtractChannelHeaderCertHash)),
Metrics: metrics,
ExpirationCheckFunc: expirationCheck,
}
}

// Handle receives incoming deliver requests.
func (h *Handler) Handle(ctx context.Context, request *protos.ReconcileRequest) (*protos.ReconcileResponse, error) {
addr := util.ExtractRemoteAddress(ctx)
logger.Debugf("Starting new p2p loop for %s", addr)
h.Metrics.StreamsOpened.Add(1)
defer h.Metrics.StreamsClosed.Add(1)

reconcileResponse := &protos.ReconcileResponse{
Success: true,
Message: "I amd sending the response! It's me server",
}

return reconcileResponse, nil
}

// ExtractChannelHeaderCertHash extracts the TLS cert hash from a channel header.
//func ExtractChannelHeaderCertHash(msg proto.Message) []byte {
// chdr, isChannelHeader := msg.(*cb.ChannelHeader)
// if !isChannelHeader || chdr == nil {
// return nil
// }
// return chdr.TlsCertHash
//}

func noExpiration(_ []byte) time.Time {
return time.Time{}
}
18 changes: 8 additions & 10 deletions core/peer/p2pmessage.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,24 +2,22 @@ package peer

import (
"context"
"github.com/hyperledger/fabric/common/deliver"
"github.com/hyperledger/fabric/common/p2pmessage"
"github.com/hyperledger/fabric/internal/peer/protos"
)

// P2pMessageServer holds the dependencies necessary to create a deliver server
type P2pMessageServer struct {
DeliverHandler *deliver.Handler
DeliverHandler *p2pmessage.Handler
PolicyCheckerProvider PolicyCheckerProvider
CollectionPolicyChecker CollectionPolicyChecker
IdentityDeserializerMgr IdentityDeserializerManager
}

func (p P2pMessageServer) SendReconcileRequest(ctx context.Context, request *protos.ReconcileRequest) (*protos.ReconcileResponse, error) {
//TODO implement me
logger.Debugf("Received reconcilation request %v", request)
reconcileResponse := &protos.ReconcileResponse{
Success: true,
Message: "I amd sending the response! It's me server",
}
return reconcileResponse, nil
// Deliver sends a stream of blocks to a client after commitment
func (s *P2pMessageServer) SendReconcileRequest(ctx context.Context, request *protos.ReconcileRequest) (*protos.ReconcileResponse, error) {
logger.Debugf("Starting new Deliver handler")
defer dumpStacktraceOnPanic()

return s.DeliverHandler.Handle(ctx, request)
}
16 changes: 16 additions & 0 deletions core/peer/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ package peer

import (
"fmt"
"github.com/hyperledger/fabric/common/p2pmessage"
"sync"

"github.com/hyperledger/fabric-protos-go/common"
Expand Down Expand Up @@ -118,6 +119,21 @@ func (p *Peer) updateTrustedRoots(cm channelconfig.Resources) {
}
}

//
// P2P Message service support structs for the peer
//

type P2PMessageChainManager struct {
Peer *Peer
}

func (p P2PMessageChainManager) GetChain(chainID string) p2pmessage.Chain {
if channel := p.Peer.Channel(chainID); channel != nil {
return channel
}
return nil
}

//
// Deliver service support structs for the peer
//
Expand Down
14 changes: 8 additions & 6 deletions internal/peer/node/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ package node
import (
"context"
"fmt"
"github.com/hyperledger/fabric/common/p2pmessage"
"github.com/hyperledger/fabric/internal/peer/protos"
"io"
"io/ioutil"
Expand Down Expand Up @@ -491,26 +492,27 @@ func serve(args []string) error {
}
}

metrics := deliver.NewMetrics(metricsProvider)
deliverMetrics := deliver.NewMetrics(metricsProvider)
abServer := &peer.DeliverServer{
DeliverHandler: deliver.NewHandler(
&peer.DeliverChainManager{Peer: peerInstance},
coreConfig.AuthenticationTimeWindow,
mutualTLS,
metrics,
deliverMetrics,
false,
),
PolicyCheckerProvider: policyCheckerProvider,
}
pb.RegisterDeliverServer(peerServer.Server(), abServer)

// Custom
// Block & Txn Reconcile Implementation
p2pMessageMetrics := p2pmessage.NewMetrics(metricsProvider)
p2pMessageServer := &peer.P2pMessageServer{
DeliverHandler: deliver.NewHandler(
&peer.DeliverChainManager{Peer: peerInstance},
DeliverHandler: p2pmessage.NewHandler(
&peer.P2PMessageChainManager{Peer: peerInstance},
coreConfig.AuthenticationTimeWindow,
mutualTLS,
metrics,
p2pMessageMetrics,
false,
),
PolicyCheckerProvider: policyCheckerProvider,
Expand Down

0 comments on commit ad406a4

Please sign in to comment.