diff --git a/pkg/client/channel/api.go b/pkg/client/channel/api.go index 59f3173fdc..205b0498ad 100644 --- a/pkg/client/channel/api.go +++ b/pkg/client/channel/api.go @@ -14,21 +14,6 @@ import ( pb "github.com/hyperledger/fabric-sdk-go/third_party/github.com/hyperledger/fabric/protos/peer" ) -// CCEvent contains the data for a chaincocde event -// Deprecated since EventHub is replaced with EventService -type CCEvent struct { - TxID string - ChaincodeID string - EventName string - Payload []byte -} - -// Registration is a handle that is returned from a successful Register Chaincode Event. -// This handle should be used in Unregister in order to unregister the event. -// Deprecated since EventHub is replaced with EventService -type Registration interface { -} - // opts allows the user to specify more advanced options type opts struct { Targets []fab.Peer // targets diff --git a/pkg/context/api/fab/event.go b/pkg/context/api/fab/event.go deleted file mode 100644 index f4f7408351..0000000000 --- a/pkg/context/api/fab/event.go +++ /dev/null @@ -1,67 +0,0 @@ -/* -Copyright SecureKey Technologies Inc. All Rights Reserved. - -SPDX-License-Identifier: Apache-2.0 -*/ - -package fab - -import ( - "crypto/x509" - - common "github.com/hyperledger/fabric-sdk-go/third_party/github.com/hyperledger/fabric/protos/common" - ehpb "github.com/hyperledger/fabric-sdk-go/third_party/github.com/hyperledger/fabric/protos/peer" - pb "github.com/hyperledger/fabric-sdk-go/third_party/github.com/hyperledger/fabric/protos/peer" -) - -// EventHub ... -type EventHub interface { - SetPeerAddr(peerURL string, certificate *x509.Certificate, serverHostOverride string, allowInsecure bool) - IsConnected() bool - Connect() error - Disconnect() error - RegisterChaincodeEvent(ccid string, eventname string, callback func(*ChaincodeEvent)) *ChainCodeCBE - UnregisterChaincodeEvent(cbe *ChainCodeCBE) - RegisterTxEvent(txnID TransactionID, callback func(TransactionID, pb.TxValidationCode, error)) - UnregisterTxEvent(txnID TransactionID) - RegisterBlockEvent(callback func(*common.Block)) - UnregisterBlockEvent(callback func(*common.Block)) -} - -//EventsClient holds the stream and adapter for consumer to work with -type EventsClient interface { - RegisterAsync(ies []*ehpb.Interest) error - UnregisterAsync(ies []*ehpb.Interest) error - Unregister(ies []*ehpb.Interest) error - Recv() (*ehpb.Event, error) - Start() error - Stop() error -} - -// The EventHubExt interface allows extensions of the SDK to add functionality to EventHub overloads. -type EventHubExt interface { - SetInterests(block bool) -} - -// ChainCodeCBE ... -/** - * The ChainCodeCBE is used internal to the EventHub to hold chaincode - * event registration callbacks. - */ -type ChainCodeCBE struct { - // chaincode id - CCID string - // event name regex filter - EventNameFilter string - // callback function to invoke on successful filter match - CallbackFunc func(*ChaincodeEvent) -} - -// ChaincodeEvent contains the current event data for the event handler -type ChaincodeEvent struct { - ChaincodeID string - TxID string - EventName string - Payload []byte - ChannelID string -} diff --git a/pkg/fab/events/consumer/consumer.go b/pkg/fab/events/consumer/consumer.go deleted file mode 100644 index a134a43296..0000000000 --- a/pkg/fab/events/consumer/consumer.go +++ /dev/null @@ -1,381 +0,0 @@ -/* -Copyright IBM Corp, SecureKey Technologies Inc. All Rights Reserved. - -SPDX-License-Identifier: Apache-2.0 -*/ - -package consumer - -import ( - grpcContext "context" - "crypto/x509" - "io" - "sync" - "time" - - "github.com/golang/protobuf/proto" - "github.com/golang/protobuf/ptypes" - "github.com/pkg/errors" - "google.golang.org/grpc" - "google.golang.org/grpc/credentials" - "google.golang.org/grpc/keepalive" - - consumer "github.com/hyperledger/fabric-sdk-go/internal/github.com/hyperledger/fabric/events/consumer" - "github.com/hyperledger/fabric-sdk-go/pkg/common/context" - "github.com/hyperledger/fabric-sdk-go/pkg/context/api/core" - "github.com/hyperledger/fabric-sdk-go/pkg/context/api/fab" - "github.com/hyperledger/fabric-sdk-go/pkg/core/config/comm" - ccomm "github.com/hyperledger/fabric-sdk-go/pkg/core/config/comm" - "github.com/hyperledger/fabric-sdk-go/pkg/core/config/endpoint" - "github.com/hyperledger/fabric-sdk-go/pkg/logging" - "github.com/hyperledger/fabric-sdk-go/third_party/github.com/hyperledger/fabric/protos/peer" - ehpb "github.com/hyperledger/fabric-sdk-go/third_party/github.com/hyperledger/fabric/protos/peer" -) - -var logger = logging.NewLogger("fabsdk/fab") - -const defaultTimeout = time.Second * 3 - -type eventsClient struct { - sync.RWMutex - peerAddress string - regTimeout time.Duration - stream ehpb.Events_ChatClient - adapter consumer.EventAdapter - TLSCertificate *x509.Certificate - TLSServerHostOverride string - tlsCertHash []byte - clientConn *grpc.ClientConn - provider core.Providers - identity context.Identity - processEventsCompleted chan struct{} - kap keepalive.ClientParameters - failFast bool - allowInsecure bool -} - -//NewEventsClient Returns a new grpc.ClientConn to the configured local PEER. -func NewEventsClient(provider core.Providers, identity context.Identity, peerAddress string, certificate *x509.Certificate, - serverhostoverride string, regTimeout time.Duration, adapter consumer.EventAdapter, - kap keepalive.ClientParameters, failFast bool, allowInsecure bool) (fab.EventsClient, error) { - - var err error - if regTimeout < 100*time.Millisecond { - regTimeout = 100 * time.Millisecond - err = errors.New("regTimeout >= 0, setting to 100 msec") - } else if regTimeout > 60*time.Second { - regTimeout = 60 * time.Second - err = errors.New("regTimeout > 60, setting to 60 sec") - } - - return &eventsClient{ - RWMutex: sync.RWMutex{}, - peerAddress: peerAddress, - regTimeout: regTimeout, - adapter: adapter, - TLSCertificate: certificate, - TLSServerHostOverride: serverhostoverride, - provider: provider, - identity: identity, - tlsCertHash: ccomm.TLSCertHash(provider.Config()), - kap: kap, - failFast: failFast, - allowInsecure: allowInsecure, - }, err -} - -//newEventsClientConnectionWithAddress Returns a new grpc.ClientConn to the configured local PEER. -func newEventsClientConnectionWithAddress(peerAddress string, cert *x509.Certificate, serverHostOverride string, - config core.Config, kap keepalive.ClientParameters, failFast bool, allowInSecure bool) (*grpc.ClientConn, error) { - var opts []grpc.DialOption - opts = append(opts, grpc.WithTimeout(config.TimeoutOrDefault(core.EventHubConnection))) - if endpoint.AttemptSecured(peerAddress, allowInSecure) { - tlsConfig, err := comm.TLSConfig(cert, serverHostOverride, config) - if err != nil { - return nil, err - } - - opts = append(opts, grpc.WithTransportCredentials(credentials.NewTLS(tlsConfig))) - } else { - opts = append(opts, grpc.WithInsecure()) - } - - if kap.Time > 0 { - opts = append(opts, grpc.WithKeepaliveParams(kap)) - } - opts = append(opts, grpc.WithDefaultCallOptions(grpc.FailFast(failFast))) - - ctx := grpcContext.Background() - ctx, cancel := grpcContext.WithTimeout(ctx, config.TimeoutOrDefault(core.EventHubConnection)) - defer cancel() - - conn, err := grpc.DialContext(ctx, endpoint.ToAddress(peerAddress), opts...) - if err != nil { - return nil, err - } - - return conn, err -} - -func (ec *eventsClient) send(emsg *ehpb.Event) error { - ec.Lock() - defer ec.Unlock() - - user := ec.identity - payload, err := proto.Marshal(emsg) - if err != nil { - return errors.Wrap(err, "marshal event failed") - } - - signingMgr := ec.provider.SigningManager() - if signingMgr == nil { - return errors.New("signing manager is nil") - } - - signature, err := signingMgr.Sign(payload, user.PrivateKey()) - if err != nil { - return errors.WithMessage(err, "sign failed") - } - signedEvt := &peer.SignedEvent{EventBytes: payload, Signature: signature} - - return ec.stream.Send(signedEvt) -} - -// RegisterAsync - registers interest in a event and doesn't wait for a response -func (ec *eventsClient) RegisterAsync(ies []*ehpb.Interest) error { - if ec.identity == nil { - return errors.New("identity context is nil") - } - creator, err := ec.identity.SerializedIdentity() - if err != nil { - return errors.WithMessage(err, "identity context identity retrieval failed") - } - - ts, err := ptypes.TimestampProto(time.Now()) - if err != nil { - return errors.Wrap(err, "failed to create timestamp") - } - emsg := &ehpb.Event{ - Event: &ehpb.Event_Register{Register: &ehpb.Register{Events: ies}}, - Creator: creator, - TlsCertHash: ec.tlsCertHash, - Timestamp: ts, - } - if err = ec.send(emsg); err != nil { - logger.Errorf("error on Register send %s\n", err) - } - return err -} - -// register - registers interest in a event -func (ec *eventsClient) register(ies []*ehpb.Interest) error { - var err error - if err = ec.RegisterAsync(ies); err != nil { - return err - } - - regChan := make(chan struct{}) - go func() { - defer close(regChan) - in, inerr := ec.stream.Recv() - if inerr != nil { - err = inerr - return - } - switch in.Event.(type) { - case *ehpb.Event_Register: - case nil: - err = errors.New("nil object for register") - default: - err = errors.New("invalid object for register") - } - }() - select { - case <-regChan: - case <-time.After(ec.regTimeout): - err = errors.New("register timeout") - } - return err -} - -// UnregisterAsync - Unregisters interest in a event and doesn't wait for a response -func (ec *eventsClient) UnregisterAsync(ies []*ehpb.Interest) error { - if ec.identity == nil { - return errors.New("identity context is required") - } - creator, err := ec.identity.SerializedIdentity() - if err != nil { - return errors.WithMessage(err, "user context identity retrieval failed") - } - - ts, err := ptypes.TimestampProto(time.Now()) - if err != nil { - return errors.Wrap(err, "failed to create timestamp") - } - emsg := &ehpb.Event{ - Event: &ehpb.Event_Unregister{Unregister: &ehpb.Unregister{Events: ies}}, - Creator: creator, - TlsCertHash: ec.tlsCertHash, - Timestamp: ts, - } - - if err = ec.send(emsg); err != nil { - err = errors.Wrap(err, "unregister send failed") - } - - return err -} - -// unregister - unregisters interest in a event -func (ec *eventsClient) Unregister(ies []*ehpb.Interest) error { - var err error - if err = ec.UnregisterAsync(ies); err != nil { - return err - } - - regChan := make(chan struct{}) - go func() { - defer close(regChan) - in, inerr := ec.stream.Recv() - if inerr != nil { - err = inerr - return - } - switch in.Event.(type) { - case *ehpb.Event_Unregister: - case nil: - err = errors.New("nil object for unregister") - default: - err = errors.New("invalid object for unregister") - } - }() - select { - case <-regChan: - case <-time.After(ec.regTimeout): - err = errors.New("unregister timeout") - } - return err -} - -// Recv receives next event - use when client has not called Start -func (ec *eventsClient) Recv() (*ehpb.Event, error) { - in, err := ec.stream.Recv() - if err == io.EOF { - // read done - if ec.adapter != nil { - ec.adapter.Disconnected(nil) - } - return nil, err - } - if err != nil { - if ec.adapter != nil { - ec.adapter.Disconnected(err) - } - return nil, err - } - return in, nil -} -func (ec *eventsClient) processEvents() error { - defer ec.stream.CloseSend() - defer close(ec.processEventsCompleted) - - for { - in, err := ec.stream.Recv() - if err == io.EOF { - // read done. - if ec.adapter != nil { - ec.adapter.Disconnected(nil) - } - return nil - } - if err != nil { - if ec.adapter != nil { - ec.adapter.Disconnected(err) - } - return err - } - if ec.adapter != nil { - cont, err := ec.adapter.Recv(in) - if !cont { - return err - } - } - } -} - -//Start establishes connection with Event hub and registers interested events with it -func (ec *eventsClient) Start() error { - return ec.establishConnectionAndRegister() -} - -func (ec *eventsClient) establishConnectionAndRegister() error { - conn, err := newEventsClientConnectionWithAddress(ec.peerAddress, ec.TLSCertificate, ec.TLSServerHostOverride, - ec.provider.Config(), ec.kap, ec.failFast, ec.allowInsecure) - - if err != nil { - return errors.WithMessage(err, "events connection failed") - } - ec.clientConn = conn - - ies, err := ec.adapter.GetInterestedEvents() - if err != nil { - return errors.Wrap(err, "interested events retrieval failed") - } - - if len(ies) == 0 { - return errors.New("interested events is required") - } - - serverClient := ehpb.NewEventsClient(conn) - ec.stream, err = serverClient.Chat(grpcContext.Background()) - if err != nil { - return errors.Wrap(err, "events connection failed") - } - - if err = ec.register(ies); err != nil { - return err - } - - ec.processEventsCompleted = make(chan struct{}) - go ec.processEvents() - - return nil -} - -//Stop terminates connection with event hub -func (ec *eventsClient) Stop() error { - var timeoutErr error - - if ec.stream == nil { - // in case the stream/chat server has not been established earlier, we assume that it's closed, successfully - return nil - } - //this closes only sending direction of the stream; event is still there - //read will not return an error - err := ec.stream.CloseSend() - if err != nil { - return err - } - - select { - // Server ended its send stream in response to CloseSend() - case <-ec.processEventsCompleted: - // Timeout waiting for server to end stream - case <-time.After(ec.provider.Config().TimeoutOrDefault(core.EventHubConnection)): - timeoutErr = errors.New("close event stream timeout") - } - - //close client connection - if ec.clientConn != nil { - err := ec.clientConn.Close() - if err != nil { - return err - } - } - - if timeoutErr != nil { - return timeoutErr - } - - return nil -} diff --git a/pkg/fab/events/eventhub.go b/pkg/fab/events/eventhub.go deleted file mode 100755 index dde47e46f1..0000000000 --- a/pkg/fab/events/eventhub.go +++ /dev/null @@ -1,621 +0,0 @@ -/* -Copyright SecureKey Technologies Inc. All Rights Reserved. - -SPDX-License-Identifier: Apache-2.0 -*/ - -package events - -import ( - "reflect" - "regexp" - "sync" - "time" - - "github.com/golang/protobuf/proto" - "github.com/spf13/cast" - "google.golang.org/grpc/keepalive" - - common "github.com/hyperledger/fabric-sdk-go/third_party/github.com/hyperledger/fabric/protos/common" - pb "github.com/hyperledger/fabric-sdk-go/third_party/github.com/hyperledger/fabric/protos/peer" - - "crypto/x509" - - cnsmr "github.com/hyperledger/fabric-sdk-go/internal/github.com/hyperledger/fabric/events/consumer" - "github.com/hyperledger/fabric-sdk-go/pkg/common/context" - "github.com/hyperledger/fabric-sdk-go/pkg/context/api/core" - "github.com/hyperledger/fabric-sdk-go/pkg/context/api/fab" - "github.com/hyperledger/fabric-sdk-go/pkg/core/config/endpoint" - "github.com/hyperledger/fabric-sdk-go/pkg/errors/status" - consumer "github.com/hyperledger/fabric-sdk-go/pkg/fab/events/consumer" - "github.com/hyperledger/fabric-sdk-go/pkg/logging" - "github.com/hyperledger/fabric-sdk-go/third_party/github.com/hyperledger/fabric/core/ledger/util" - "github.com/hyperledger/fabric-sdk-go/third_party/github.com/hyperledger/fabric/protos/utils" - "github.com/pkg/errors" -) - -var logger = logging.NewLogger("fabsdk/fab") - -// EventHub allows a client to listen to event at a peer. -type EventHub struct { - //Used for protecting parts of code from running concurrently - mtx sync.RWMutex - // Map of clients registered for chaincode events - chaincodeRegistrants sync.Map - // Array of clients registered for block events - blockRegistrants []func(*common.Block) - // Map of clients registered for transactional events - txRegistrants sync.Map - // peer addr to connect to - peerAddr string - // peer tls certificate - peerTLSCertificate *x509.Certificate - // peer tls server host override - peerTLSServerHostOverride string - // grpc event client interface - grpcClient fab.EventsClient - // fabric connection state of this eventhub - connected bool - // List of events client is interested in - interestedEvents []*pb.Interest - // Factory that creates EventsClient - eventsClientFactory eventClientFactory - // FabricClient - provider core.Providers - identity context.Identity - kap keepalive.ClientParameters - failFast bool - allowInsecure bool -} - -// eventClientFactory creates an EventsClient instance -type eventClientFactory interface { - newEventsClient(provider core.Providers, identity context.Identity, peerAddress string, certificate *x509.Certificate, serverHostOverride string, regTimeout time.Duration, adapter cnsmr.EventAdapter, kap keepalive.ClientParameters, failFast bool, allowInsecure bool) (fab.EventsClient, error) -} - -// consumerClientFactory is the default implementation oif the eventClientFactory -type consumerClientFactory struct{} - -func (ccf *consumerClientFactory) newEventsClient(provider core.Providers, identity context.Identity, peerAddress string, certificate *x509.Certificate, serverHostOverride string, - regTimeout time.Duration, adapter cnsmr.EventAdapter, kap keepalive.ClientParameters, failFast bool, allowInsecure bool) (fab.EventsClient, error) { - return consumer.NewEventsClient(provider, identity, peerAddress, certificate, serverHostOverride, regTimeout, adapter, kap, failFast, allowInsecure) -} - -// Context holds the providers and services needed to create an EventHub. -type Context struct { - core.Providers - context.Identity -} - -// New creates an EventHub from context. -func New(ctx Context) (*EventHub, error) { - - eventHub := EventHub{ - blockRegistrants: nil, - interestedEvents: nil, - eventsClientFactory: &consumerClientFactory{}, - provider: ctx.Providers, - identity: ctx.Identity, - } - // register default transaction callback - eventHub.RegisterBlockEvent(eventHub.txCallback) - return &eventHub, nil -} - -// FromConfig creates new event hub from context and peer config. -func FromConfig(ctx Context, peerCfg *core.PeerConfig) (*EventHub, error) { - - eventHub, err := New(ctx) - if err != nil { - return nil, err - } - - serverHostOverride := "" - if str, ok := peerCfg.GRPCOptions["ssl-target-name-override"].(string); ok { - serverHostOverride = str - } - - eventHub.peerAddr = peerCfg.EventURL - - if endpoint.IsTLSEnabled(eventHub.peerAddr) { - eventHub.peerTLSCertificate, err = peerCfg.TLSCACerts.TLSCert() - - if err != nil { - return nil, err - } - } - - eventHub.peerTLSServerHostOverride = serverHostOverride - eventHub.kap = getKeepAliveOptions(peerCfg) - eventHub.failFast = getFailFast(peerCfg) - eventHub.allowInsecure = isInsecureConnectionAllowed(peerCfg) - - return eventHub, nil -} - -func getFailFast(peerCfg *core.PeerConfig) bool { - var failFast = true //the default - if ff, ok := peerCfg.GRPCOptions["fail-fast"].(bool); ok { - failFast = cast.ToBool(ff) - } - - return failFast -} - -func getKeepAliveOptions(peerCfg *core.PeerConfig) keepalive.ClientParameters { - - var kap keepalive.ClientParameters - if kaTime, ok := peerCfg.GRPCOptions["keep-alive-time"]; ok { - kap.Time = cast.ToDuration(kaTime) - } - if kaTimeout, ok := peerCfg.GRPCOptions["keep-alive-timeout"]; ok { - kap.Timeout = cast.ToDuration(kaTimeout) - } - if kaPermit, ok := peerCfg.GRPCOptions["keep-alive-permit"]; ok { - kap.PermitWithoutStream = cast.ToBool(kaPermit) - } - return kap -} - -// SetInterests clears all interests and sets the interests for BLOCK type of events. -func (eventHub *EventHub) SetInterests(block bool) { - eventHub.mtx.Lock() - defer eventHub.mtx.Unlock() - - eventHub.interestedEvents = make([]*pb.Interest, 0) - eventHub.blockRegistrants = make([]func(*common.Block), 0) - - if block { - eventHub.blockRegistrants = append(eventHub.blockRegistrants, eventHub.txCallback) - eventHub.interestedEvents = append(eventHub.interestedEvents, &pb.Interest{EventType: pb.EventType_BLOCK}) - } -} - -// Disconnect disconnects from peer event source -func (eventHub *EventHub) Disconnect() error { - eventHub.mtx.Lock() - defer eventHub.mtx.Unlock() - - if !eventHub.connected { - return nil - } - - // Unregister interests with server and stop the stream - err := eventHub.grpcClient.UnregisterAsync(eventHub.interestedEvents) - if err != nil { - logger.Warnf("eventhub failed to unregister interests with server: %v", err) - // continue to attempt to stop stream and close connection - } - err = eventHub.grpcClient.Stop() - if err != nil { - return errors.WithMessage(err, "event client stop failed") - } - - eventHub.connected = false - return nil -} - -// RegisterBlockEvent - register callback function for block events -func (eventHub *EventHub) RegisterBlockEvent(callback func(*common.Block)) { - eventHub.mtx.Lock() - defer eventHub.mtx.Unlock() - - eventHub.blockRegistrants = append(eventHub.blockRegistrants, callback) - - // Register interest for blocks (only declare interest once, so do this for the first registrant) - if len(eventHub.blockRegistrants) == 1 { - eventHub.interestedEvents = append(eventHub.interestedEvents, &pb.Interest{EventType: pb.EventType_BLOCK}) - } -} - -// UnregisterBlockEvent unregister callback for block event -func (eventHub *EventHub) UnregisterBlockEvent(callback func(*common.Block)) { - eventHub.mtx.Lock() - defer eventHub.mtx.Unlock() - - f1 := reflect.ValueOf(callback) - - for i := range eventHub.blockRegistrants { - f2 := reflect.ValueOf(eventHub.blockRegistrants[i]) - if f1.Pointer() == f2.Pointer() { - eventHub.blockRegistrants = append(eventHub.blockRegistrants[:i], eventHub.blockRegistrants[i+1:]...) - break - } - } - - // Unregister interest for blocks if there are no more registrants - if len(eventHub.blockRegistrants) < 1 { - blockEventInterest := pb.Interest{EventType: pb.EventType_BLOCK} - eventHub.grpcClient.UnregisterAsync([]*pb.Interest{&blockEventInterest}) - for i, v := range eventHub.interestedEvents { - if *v == blockEventInterest { - eventHub.interestedEvents = append(eventHub.interestedEvents[:i], eventHub.interestedEvents[i+1:]...) - } - } - } -} - -// addChaincodeInterest adds interest for specific CHAINCODE events. -func (eventHub *EventHub) addChaincodeInterest(ChaincodeID string, EventName string) { - ccInterest := &pb.Interest{ - EventType: pb.EventType_CHAINCODE, - RegInfo: &pb.Interest_ChaincodeRegInfo{ - ChaincodeRegInfo: &pb.ChaincodeReg{ - ChaincodeId: ChaincodeID, - EventName: EventName, - }, - }, - } - - eventHub.interestedEvents = append(eventHub.interestedEvents, ccInterest) - - if eventHub.IsConnected() { - eventHub.grpcClient.RegisterAsync([]*pb.Interest{ccInterest}) - } - -} - -// removeChaincodeInterest remove interest for specific CHAINCODE event -func (eventHub *EventHub) removeChaincodeInterest(ChaincodeID string, EventName string) { - ccInterest := &pb.Interest{ - EventType: pb.EventType_CHAINCODE, - RegInfo: &pb.Interest_ChaincodeRegInfo{ - ChaincodeRegInfo: &pb.ChaincodeReg{ - ChaincodeId: ChaincodeID, - EventName: EventName, - }, - }, - } - - for i, v := range eventHub.interestedEvents { - if v.EventType == ccInterest.EventType && *(v.GetChaincodeRegInfo()) == *(ccInterest.GetChaincodeRegInfo()) { - eventHub.interestedEvents = append(eventHub.interestedEvents[:i], eventHub.interestedEvents[i+1:]...) - } - } - - if eventHub.IsConnected() { - eventHub.grpcClient.UnregisterAsync([]*pb.Interest{ccInterest}) - } - -} - -// SetPeerAddr set peer url for event source -// peeraddr peer url -// peerTLSCertificate peer tls certificate -// peerTLSServerHostOverride tls serverhostoverride -// inSecure option enables grpc retry when grpcs fails (only when no protocol provided in peerURL) -func (eventHub *EventHub) SetPeerAddr(peerURL string, peerTLSCertificate *x509.Certificate, peerTLSServerHostOverride string, allowInsecure bool) { - eventHub.peerAddr = peerURL - eventHub.peerTLSCertificate = peerTLSCertificate - eventHub.peerTLSServerHostOverride = peerTLSServerHostOverride - eventHub.allowInsecure = allowInsecure -} - -// IsConnected gets connected state of eventhub -// Returns true if connected to event source, false otherwise -func (eventHub *EventHub) IsConnected() bool { - return eventHub.connected -} - -// Connect establishes connection with peer event source -func (eventHub *EventHub) Connect() error { - - eventHub.mtx.Lock() - defer eventHub.mtx.Unlock() - - if eventHub.connected { - logger.Debugf("Nothing to do - EventHub already connected") - return nil - } - - if eventHub.peerAddr == "" { - return errors.New("peerAddr is required") - } - - if eventHub.interestedEvents == nil || len(eventHub.interestedEvents) == 0 { - return errors.New("at least one event must be registered") - } - - if eventHub.grpcClient == nil { - eventsClient, _ := eventHub.eventsClientFactory.newEventsClient(eventHub.provider, eventHub.identity, - eventHub.peerAddr, eventHub.peerTLSCertificate, eventHub.peerTLSServerHostOverride, - eventHub.provider.Config().TimeoutOrDefault(core.EventReg), eventHub, eventHub.kap, eventHub.failFast, eventHub.allowInsecure) - eventHub.grpcClient = eventsClient - } - - if err := eventHub.grpcClient.Start(); err != nil { - eventHub.grpcClient.Stop() - return errors.WithMessage(err, "event client start failed") - } - - eventHub.connected = true - return nil -} - -//GetInterestedEvents implements consumer.EventAdapter interface for registering interested events -func (eventHub *EventHub) GetInterestedEvents() ([]*pb.Interest, error) { - return eventHub.interestedEvents, nil -} - -//Recv implements consumer.EventAdapter interface for receiving events -func (eventHub *EventHub) Recv(msg *pb.Event) (bool, error) { - // Deliver events asynchronously so that we can continue receiving events - go func() { - switch msg.Event.(type) { - case *pb.Event_Block: - blockEvent := msg.Event.(*pb.Event_Block) - logger.Debugf("Recv blockEvent for block number [%d]", blockEvent.Block.Header.Number) - for _, v := range eventHub.getBlockRegistrants() { - v(blockEvent.Block) - } - txFilter := util.TxValidationFlags(blockEvent.Block.Metadata.Metadata[common.BlockMetadataIndex_TRANSACTIONS_FILTER]) - for i, tdata := range blockEvent.Block.Data.Data { - if txFilter.IsValid(i) { - if ccEvent, channelID, err := getChainCodeEvent(tdata); err != nil { - logger.Warnf("getChainCodeEvent return error: %v\n", err) - } else if ccEvent != nil { - eventHub.notifyChaincodeRegistrants(channelID, ccEvent, true) - } - } else { - logger.Debugf("received invalid transaction") - } - } - return - case *pb.Event_ChaincodeEvent: - ccEvent := msg.Event.(*pb.Event_ChaincodeEvent) - logger.Debugf("Recv ccEvent for txID [%s]", ccEvent.ChaincodeEvent.TxId) - if ccEvent != nil { - eventHub.notifyChaincodeRegistrants("", ccEvent.ChaincodeEvent, false) - } - return - default: - return - } - }() - - return true, nil -} - -// Disconnected implements consumer.EventAdapter interface for receiving events -func (eventHub *EventHub) Disconnected(err error) { - if err != nil { - logger.Warnf("EventHub was disconnected unexpectedly: %s", err) - } -} - -// RegisterChaincodeEvent registers a callback function to receive chaincode events. -// ccid: chaincode id -// eventname: The regex string used to filter events -// callback: Callback function for filter matches that takes a single parameter which is a json object representation -// of type "message ChaincodeEvent" -// Returns ChainCodeCBE object that should be treated as an opaque -// handle used to unregister (see unregisterChaincodeEvent) -func (eventHub *EventHub) RegisterChaincodeEvent(ccid string, eventname string, callback func(*fab.ChaincodeEvent)) *fab.ChainCodeCBE { - eventHub.mtx.Lock() - defer eventHub.mtx.Unlock() - - eventHub.addChaincodeInterest(ccid, eventname) - - cbe := fab.ChainCodeCBE{CCID: ccid, EventNameFilter: eventname, CallbackFunc: callback} - var cbeArray []*fab.ChainCodeCBE - - ccRegistrantArray, ok := eventHub.chaincodeRegistrants.Load(ccid) - if !ok { - cbeArray = make([]*fab.ChainCodeCBE, 0) - } else { - cbeArray = ccRegistrantArray.([]*fab.ChainCodeCBE) - } - cbeArray = append(cbeArray, &cbe) - eventHub.chaincodeRegistrants.Store(ccid, cbeArray) - - return &cbe -} - -// UnregisterChaincodeEvent unregisters chaincode event registration -// ChainCodeCBE: handle returned from call to registerChaincodeEvent. -func (eventHub *EventHub) UnregisterChaincodeEvent(cbe *fab.ChainCodeCBE) { - eventHub.mtx.Lock() - defer eventHub.mtx.Unlock() - - eventHub.removeChaincodeInterest(cbe.CCID, cbe.EventNameFilter) - - ccRegistrantArray, ok := eventHub.chaincodeRegistrants.Load(cbe.CCID) - if ok { - cbeArray := ccRegistrantArray.([]*fab.ChainCodeCBE) - if len(cbeArray) <= 0 { - logger.Debugf("No event registration for ccid %s \n", cbe.CCID) - return - } - - for i, v := range cbeArray { - if v == cbe { - newCbeArray := append(cbeArray[:i], cbeArray[i+1:]...) - if len(newCbeArray) <= 0 { - eventHub.chaincodeRegistrants.Delete(cbe.CCID) - } else { - eventHub.chaincodeRegistrants.Store(cbe.CCID, newCbeArray) - } - break - } - } - } -} - -// RegisterTxEvent registers a callback function to receive transactional events. -// txid: transaction id -// callback: Function that takes a single parameter which -// is a json object representation of type "message Transaction" -func (eventHub *EventHub) RegisterTxEvent(txnID fab.TransactionID, callback func(fab.TransactionID, pb.TxValidationCode, error)) { - logger.Debugf("reg txid %s\n", txnID) - eventHub.txRegistrants.Store(txnID, callback) -} - -// UnregisterTxEvent unregister transactional event registration. -// txid: transaction id -func (eventHub *EventHub) UnregisterTxEvent(txnID fab.TransactionID) { - logger.Debugf("un-reg txid %s\n", txnID) - eventHub.txRegistrants.Delete(txnID) -} - -/** - * private internal callback for processing tx events - * @param {object} block json object representing block of tx - * from the fabric - */ -func (eventHub *EventHub) txCallback(block *common.Block) { - txFilter := util.TxValidationFlags(block.Metadata.Metadata[common.BlockMetadataIndex_TRANSACTIONS_FILTER]) - for i, v := range block.Data.Data { - - if env, err := utils.GetEnvelopeFromBlock(v); err != nil { - logger.Debugf("error extracting Envelope from block: %v\n", err) - return - } else if env != nil { - // get the payload from the envelope - payload, err := utils.GetPayload(env) - if err != nil { - logger.Debugf("error extracting Payload from envelope: %v\n", err) - return - } - - channelHeaderBytes := payload.Header.ChannelHeader - channelHeader := &common.ChannelHeader{} - err = proto.Unmarshal(channelHeaderBytes, channelHeader) - if err != nil { - logger.Debugf("error extracting ChannelHeader from payload: %v\n", err) - return - } - - txnID := fab.TransactionID(channelHeader.TxId) - callback := eventHub.getTXRegistrant(txnID) - if callback != nil { - if txFilter.IsInvalid(i) { - callback(fab.TransactionID(txnID), txFilter.Flag(i), - status.New(status.EventServerStatus, int32(txFilter.Flag(i)), "received invalid transaction", nil)) - } else { - callback(fab.TransactionID(txnID), txFilter.Flag(i), nil) - } - } else { - logger.Debugf("No callback registered for TxID: %s\n", txnID) - } - } - } -} - -func (eventHub *EventHub) getBlockRegistrants() []func(*common.Block) { - eventHub.mtx.RLock() - defer eventHub.mtx.RUnlock() - // Return a clone of the array to avoid race conditions - clone := make([]func(*common.Block), len(eventHub.blockRegistrants)) - copy(clone, eventHub.blockRegistrants) - return clone -} - -func (eventHub *EventHub) getChaincodeRegistrants(chaincodeID string) []*fab.ChainCodeCBE { - eventHub.mtx.RLock() - defer eventHub.mtx.RUnlock() - - registrants, ok := eventHub.chaincodeRegistrants.Load(chaincodeID) - if !ok { - return nil - } - cbeRegistrants := registrants.([]*fab.ChainCodeCBE) - // Return a clone of the array to avoid race conditions - clone := make([]*fab.ChainCodeCBE, len(cbeRegistrants)) - copy(clone, cbeRegistrants) - return clone -} - -func (eventHub *EventHub) getTXRegistrant(txID fab.TransactionID) func(fab.TransactionID, pb.TxValidationCode, error) { - v, ok := eventHub.txRegistrants.Load(txID) - if !ok { - return nil - } - return v.(func(fab.TransactionID, pb.TxValidationCode, error)) -} - -// getChainCodeEvents parses block events for chaincode events associated with individual transactions -func getChainCodeEvent(tdata []byte) (event *pb.ChaincodeEvent, channelID string, err error) { - - if tdata == nil { - return nil, "", errors.New("Cannot extract payload from nil transaction") - } - - if env, err := utils.GetEnvelopeFromBlock(tdata); err != nil { - return nil, "", errors.Wrap(err, "tx from block failed") - } else if env != nil { - // get the payload from the envelope - payload, err := utils.GetPayload(env) - if err != nil { - return nil, "", errors.Wrap(err, "extract payload from envelope failed") - } - - channelHeaderBytes := payload.Header.ChannelHeader - channelHeader := &common.ChannelHeader{} - err = proto.Unmarshal(channelHeaderBytes, channelHeader) - if err != nil { - return nil, "", errors.Wrap(err, "unmarshal channel header failed") - } - - channelID := channelHeader.ChannelId - - // Chaincode events apply to endorser transaction only - if common.HeaderType(channelHeader.Type) == common.HeaderType_ENDORSER_TRANSACTION { - tx, err := utils.GetTransaction(payload.Data) - if err != nil { - return nil, "", errors.Wrap(err, "unmarshal transaction payload") - } - chaincodeActionPayload, err := utils.GetChaincodeActionPayload(tx.Actions[0].Payload) - if err != nil { - return nil, "", errors.Wrap(err, "chaincode action payload retrieval failed") - } - propRespPayload, err := utils.GetProposalResponsePayload(chaincodeActionPayload.Action.ProposalResponsePayload) - if err != nil { - return nil, "", errors.Wrap(err, "proposal response payload retrieval failed") - } - caPayload, err := utils.GetChaincodeAction(propRespPayload.Extension) - if err != nil { - return nil, "", errors.Wrap(err, "chaincode action retrieval failed") - } - ccEvent, err := utils.GetChaincodeEvents(caPayload.Events) - - if ccEvent != nil { - return ccEvent, channelID, nil - } - } - } - return nil, "", nil -} - -// Utility function to fire callbacks for chaincode registrants -func (eventHub *EventHub) notifyChaincodeRegistrants(channelID string, ccEvent *pb.ChaincodeEvent, patternMatch bool) { - cbeArray := eventHub.getChaincodeRegistrants(ccEvent.ChaincodeId) - if len(cbeArray) <= 0 { - logger.Debugf("No event registration for ccid %s \n", ccEvent.ChaincodeId) - } - for _, v := range cbeArray { - match := v.EventNameFilter == ccEvent.EventName - if !match && patternMatch { - match, _ = regexp.MatchString(v.EventNameFilter, ccEvent.EventName) - } - if match { - callback := v.CallbackFunc - if callback != nil { - callback(&fab.ChaincodeEvent{ - ChaincodeID: ccEvent.ChaincodeId, - TxID: ccEvent.TxId, - EventName: ccEvent.EventName, - Payload: ccEvent.Payload, - ChannelID: channelID, - }) - } - } - } -} - -func isInsecureConnectionAllowed(peerCfg *core.PeerConfig) bool { - allowInsecure, ok := peerCfg.GRPCOptions["allow-insecure"].(bool) - if ok { - return allowInsecure - } - return false -} diff --git a/pkg/fab/events/eventhub_test.go b/pkg/fab/events/eventhub_test.go deleted file mode 100755 index 72a34fa133..0000000000 --- a/pkg/fab/events/eventhub_test.go +++ /dev/null @@ -1,609 +0,0 @@ -/* -Copyright SecureKey Technologies Inc. All Rights Reserved. - -SPDX-License-Identifier: Apache-2.0 -*/ - -package events - -import ( - "os" - "sync/atomic" - "testing" - - "time" - - "reflect" - - "github.com/hyperledger/fabric-sdk-go/pkg/context/api/fab" - "github.com/hyperledger/fabric-sdk-go/pkg/errors/status" - "github.com/hyperledger/fabric-sdk-go/pkg/fab/mocks" - "github.com/hyperledger/fabric-sdk-go/third_party/github.com/hyperledger/fabric/protos/common" - pb "github.com/hyperledger/fabric-sdk-go/third_party/github.com/hyperledger/fabric/protos/peer" - "github.com/stretchr/testify/assert" -) - -func TestMain(m *testing.M) { - os.Exit(m.Run()) -} - -func TestDeadlock(t *testing.T) { - channelID := "mychannel" - ccID := "testccid" - - eventHub, clientFactory, err := createMockedEventHub() - if err != nil { - return - } - - t.Log("EventHub Concurrency test") - - client := clientFactory.clients[0] - if client == nil { - t.Fatalf("No client") - } - - threads := 10 - eventsPerThread := 200 - eventsSent := eventsPerThread * threads - - // The test should be done in milliseconds but if there's - // a deadlock then we don't want it to hang - timeout := 50 * time.Second - - // create a flood of TX events - txCompletion := newMultiCompletionHandler(eventsSent, timeout) - go flood(eventsPerThread, threads, func() { - txh, err := mocks.NewMockTransactionHeader(channelID) - if err != nil { - t.Fatalf("mock txn header failed: %s", err) - } - - received := newCompletionHandler(timeout) - eventHub.RegisterTxEvent(txh.TransactionID(), func(txID fab.TransactionID, code pb.TxValidationCode, err error) { - txCompletion.done() - received.done() - }) - - go client.MockEvent(&pb.Event{ - Event: (&MockTxEventBuilder{ - TxID: string(txh.TransactionID()), - ChannelID: channelID, - }).Build(), - }) - - // Wait for the TX event and then unregister - received.wait() - eventHub.UnregisterTxEvent(txh.TransactionID()) - }) - - // create a flood of CC events - ccCompletion := newMultiCompletionHandler(eventsSent, timeout) - go flood(eventsPerThread, threads, func() { - txh, err := mocks.NewMockTransactionHeader(channelID) - if err != nil { - t.Fatalf("mock txn header failed: %s", err) - } - - eventName := string(txh.TransactionID()) - received := newCompletionHandler(timeout) - registration := eventHub.RegisterChaincodeEvent(ccID, eventName, func(event *fab.ChaincodeEvent) { - ccCompletion.done() - received.done() - }) - - go client.MockEvent(&pb.Event{ - Event: (&MockCCEventBuilder{ - CCID: ccID, - EventName: eventName, - }).Build(), - }) - - // Wait for the CC event and then unregister - received.wait() - eventHub.UnregisterChaincodeEvent(registration) - }) - - // Wait for all events to be received - txCompletion.wait() - ccCompletion.wait() - - if txCompletion.numDone() != eventsSent { - t.Errorf("Sent %d Tx events but received %d - could indicate a deadlock", eventsSent, txCompletion.numDone()) - } else { - t.Logf("Received all %d TX events", txCompletion.numDone()) - } - - if ccCompletion.numDone() != eventsSent { - t.Errorf("Sent %d CC events but received %d - could indicate a deadlock", eventsSent, ccCompletion.numDone()) - } else { - t.Logf("Received all %d CC events", ccCompletion.numDone()) - } -} - -func TestChaincodeEvent(t *testing.T) { - ccID := "someccid" - eventName := "someevent" - - eventHub, clientFactory, err := createMockedEventHub() - if err != nil { - return - } - - t.Log("EventHub Chaincode event test") - - client := clientFactory.clients[0] - if client == nil { - t.Fatalf("No client") - } - - eventReceived := make(chan *fab.ChaincodeEvent) - - // Register for CC event - registration := eventHub.RegisterChaincodeEvent(ccID, eventName, func(event *fab.ChaincodeEvent) { - eventReceived <- event - }) - - // Publish CC event - go client.MockEvent(&pb.Event{ - Event: (&MockCCEventBuilder{ - CCID: ccID, - EventName: eventName, - }).Build(), - }) - - // Wait for the CC event - var event *fab.ChaincodeEvent - select { - case event = <-eventReceived: - eventHub.UnregisterChaincodeEvent(registration) - case <-time.After(time.Second * 5): - t.Fatalf("Timed out waiting for CC event") - } - - // Check CC event - if event.ChaincodeID != ccID { - t.Fatalf("Expecting chaincode ID [%s] but got [%s]", ccID, event.ChaincodeID) - } - if event.EventName != eventName { - t.Fatalf("Expecting event name [%s] but got [%s]", eventName, event.EventName) - } -} - -func TestChaincodeBlockEvent(t *testing.T) { - channelID := "somechannelid" - ccID := "someccid" - eventName := "someevent" - - txh, err := mocks.NewMockTransactionHeader(channelID) - if err != nil { - t.Fatalf("mock txn header failed: %s", err) - } - - eventHub, clientFactory, err := createMockedEventHub() - if err != nil { - return - } - - client := clientFactory.clients[0] - if client == nil { - t.Fatalf("No client") - } - - eventReceived := make(chan *fab.ChaincodeEvent) - - // Register for CC event - registration := eventHub.RegisterChaincodeEvent(ccID, eventName, func(event *fab.ChaincodeEvent) { - eventReceived <- event - }) - - // Publish CC event - go client.MockEvent(&pb.Event{ - Event: (&MockCCBlockEventBuilder{ - CCID: ccID, - EventName: eventName, - ChannelID: channelID, - TxID: string(txh.TransactionID()), - }).Build(), - }) - - // Wait for CC event - var event *fab.ChaincodeEvent - select { - case event = <-eventReceived: - eventHub.UnregisterChaincodeEvent(registration) - case <-time.After(time.Second * 5): - t.Fatalf("Timed out waiting for CC event") - } - - // Check CC event - if event.ChannelID != channelID { - t.Fatalf("Expecting channel ID [%s] but got [%s]", channelID, event.ChannelID) - } - if event.ChaincodeID != ccID { - t.Fatalf("Expecting chaincode ID [%s] but got [%s]", ccID, event.ChaincodeID) - } - if event.EventName != eventName { - t.Fatalf("Expecting event name [%s] but got [%s]", eventName, event.EventName) - } - if event.TxID == "" { - t.Fatalf("Expecting TxID [%s] but got [%s]", txh.TransactionID(), event.TxID) - } -} - -func TestChaincodeBlockEventWithInvalidTx(t *testing.T) { - channelID := "somechannelid" - ccID := "someccid" - eventName := "someevent" - - txh, err := mocks.NewMockTransactionHeader(channelID) - if err != nil { - t.Fatalf("mock txn header failed: %s", err) - } - - eventHub, clientFactory, err := createMockedEventHub() - if err != nil { - return - } - - client := clientFactory.clients[0] - if client == nil { - t.Fatalf("No client") - } - - eventReceived := make(chan *fab.ChaincodeEvent) - - // Register for CC event - registration := eventHub.RegisterChaincodeEvent(ccID, eventName, func(event *fab.ChaincodeEvent) { - eventReceived <- event - }) - - // Publish CC event - go client.MockEvent(&pb.Event{ - Event: (&MockCCBlockEventBuilder{ - CCID: ccID, - EventName: eventName, - ChannelID: channelID, - TxID: string(txh.TransactionID()), - }).BuildWithTxValidationFlag(false), - }) - - // Wait for CC event - var event *fab.ChaincodeEvent - select { - case event = <-eventReceived: - t.Fatalf("CC event is not expected to be triggered for a CC Block Event with invalid Tx flag") - case <-time.After(time.Second * 5): - t.Log("Timeout expected on CC event with Block Event having an invalid Tx flag") - eventHub.UnregisterChaincodeEvent(registration) - } - - // Check CC event - if event != nil { - t.Fatalf("Expecting nil event but got [%s]", event) - } -} - -func TestInvalidTxStatusError(t *testing.T) { - validationCode := pb.TxValidationCode_ILLEGAL_WRITESET - channelID := "somechannelid" - txh, err := mocks.NewMockTransactionHeader(channelID) - if err != nil { - t.Fatalf("mock txn header failed: %s", err) - } - - eventHub, clientFactory, err := createMockedEventHub() - if err != nil { - return - } - - client := clientFactory.clients[0] - assert.NotNil(t, client) - - txReceived := make(chan error) - - // Register for tx event - eventHub.RegisterTxEvent(txh.TransactionID(), func(txID fab.TransactionID, c pb.TxValidationCode, e error) { - txReceived <- e - }) - - // Publish CC event - go client.MockEvent(&pb.Event{ - Event: (&MockTxEventBuilder{ - ChannelID: channelID, - TxID: string(txh.TransactionID()), - }).BuildWithTxValidationCode(validationCode), - }) - - select { - case err := <-txReceived: - assert.NotNil(t, err, "Expected non-nil error") - statusError, ok := status.FromError(err) - assert.True(t, ok, "Expected status error") - assert.EqualValues(t, validationCode, status.ToTransactionValidationCode(statusError.Code)) - assert.Equal(t, status.EventServerStatus, statusError.Group) - assert.Equal(t, "received invalid transaction", statusError.Message, "Expected error message") - case <-time.After(time.Second * 5): - t.Fatal("Timeout waiting for block with invalid Tx flag") - } - -} - -// completionHandler waits for a single event with a timeout -type completionHandler struct { - completed chan bool - timeout time.Duration -} - -// newCompletionHandler creates a new completionHandler -func newCompletionHandler(timeout time.Duration) *completionHandler { - return &completionHandler{ - timeout: timeout, - completed: make(chan bool), - } -} - -// wait will wait until the task(s) has completed or until the timeout -func (c *completionHandler) wait() { - select { - case <-c.completed: - case <-time.After(c.timeout): - } -} - -// done marks the task as completed -func (c *completionHandler) done() { - c.completed <- true -} - -// multiCompletionHandler waits for multiple tasks to complete -type multiCompletionHandler struct { - completionHandler - expected int32 - numCompleted int32 -} - -// newMultiCompletionHandler creates a new multiCompletionHandler -func newMultiCompletionHandler(expected int, timeout time.Duration) *multiCompletionHandler { - return &multiCompletionHandler{ - expected: int32(expected), - completionHandler: completionHandler{ - timeout: timeout, - completed: make(chan bool), - }, - } -} - -// done marks a task as completed -func (c *multiCompletionHandler) done() { - doneSoFar := atomic.AddInt32(&c.numCompleted, 1) - if doneSoFar >= c.expected { - c.completed <- true - } -} - -// numDone returns the nmber of tasks that have completed -func (c *multiCompletionHandler) numDone() int { - return int(atomic.LoadInt32(&c.numCompleted)) -} - -// flood invokes the given function in the given number of threads, -// the given number of times per thread -func flood(invocationsPerThread int, threads int, f func()) { - for t := 0; t < threads; t++ { - go func() { - for i := 0; i < invocationsPerThread; i++ { - f() - } - }() - } -} - -func TestRegisterBlockEvent(t *testing.T) { - eventHub, _, err := createMockedEventHub() - if err != nil { - return - } - - // Transaction callback is registered by default - if len(eventHub.interestedEvents) != 1 || len(eventHub.blockRegistrants) != 1 { - t.Fatalf("Transaction callback should be registered by default") - } - - f1 := reflect.ValueOf(eventHub.txCallback) - f2 := reflect.ValueOf(eventHub.blockRegistrants[0]) - - if f1.Pointer() != f2.Pointer() { - t.Fatalf("Registered callback is not txCallback") - } - - w := &callbackWrapper{t} - - eventHub.RegisterBlockEvent(w.testCallback) - - if len(eventHub.blockRegistrants) != 2 { - t.Fatalf("Failed to add test callback for block event") - } - - f1 = reflect.ValueOf(w.testCallback) - f2 = reflect.ValueOf(eventHub.blockRegistrants[1]) - - if f1.Pointer() != f2.Pointer() { - t.Fatalf("Registered callback is not testCallback") - } - - eventHub.UnregisterBlockEvent(w.testCallback) - - if len(eventHub.interestedEvents) != 1 || len(eventHub.blockRegistrants) != 1 { - t.Fatalf("Failed to unregister testCallback") - } - - eventHub.UnregisterBlockEvent(eventHub.txCallback) - - if len(eventHub.interestedEvents) != 0 || len(eventHub.blockRegistrants) != 0 { - t.Fatalf("Failed to unregister txCallback") - } - -} - -type callbackWrapper struct { - t *testing.T -} - -// private test callback to be executed on block event -func (w *callbackWrapper) testCallback(block *common.Block) { - w.t.Log("testCallback called on block") -} - -func TestRegisterChaincodeEvent(t *testing.T) { - eventHub, _, err := createMockedEventHub() - if err != nil { - return - } - - // Interest in block event is registered by default - if len(eventHub.interestedEvents) != 1 { - t.Fatalf("Transaction callback should be registered by default") - } - - w := &callbackWrapper{t} - - cbe := eventHub.RegisterChaincodeEvent("testCC", "eventID", w.testChaincodeCallback) - - if len(eventHub.interestedEvents) != 2 { - t.Fatalf("Failed to register interest for CC event") - } - - interest := eventHub.interestedEvents[1] - - if interest.EventType != pb.EventType_CHAINCODE { - t.Fatalf("Expecting chaincode event type, got (%v)", interest.EventType) - } - - ccRegInfo := interest.GetChaincodeRegInfo() - - if ccRegInfo.ChaincodeId != "testCC" { - t.Fatalf("Expecting chaincode id (%s), got (%s)", "testCC", ccRegInfo.ChaincodeId) - } - - if ccRegInfo.EventName != "eventID" { - t.Fatalf("Expecting event id (%s), got (%s)", "eventID", ccRegInfo.EventName) - } - - eventHub.UnregisterChaincodeEvent(cbe) - - if len(eventHub.interestedEvents) != 1 { - t.Fatalf("Expecting one registered interest, got %d", len(eventHub.interestedEvents)) - } - -} - -// private test callback to be executed on chaincode event -func (w *callbackWrapper) testChaincodeCallback(ce *fab.ChaincodeEvent) { - w.t.Logf("Received CC event: %v", ce) -} - -func TestDisconnect(t *testing.T) { - eventHub, _, err := createMockedEventHub() - if err != nil { - return - } - eventHub.Disconnect() - verifyDisconnectedEventHub(eventHub, t) -} - -func TestDisconnectWhenDisconnected(t *testing.T) { - eventHub, _, err := createMockedEventHub() - if err != nil { - return - } - eventHub.connected = false - eventHub.Disconnect() - verifyDisconnectedEventHub(eventHub, t) -} - -func verifyDisconnectedEventHub(eventHub *EventHub, t *testing.T) { - if eventHub.connected == true { - t.Fatalf("EventHub is not disconnected after Disconnect call") - } -} - -func TestConnectWhenConnected(t *testing.T) { - eventHub, _, err := createMockedEventHub() - if err != nil { - return - } - - eventHub.connected = true - err = eventHub.Connect() - if err != nil { - t.Fatalf("EventHub failed to connect after Connect call %s", err) - } -} - -func TestConnectWhenPeerAddrEmpty(t *testing.T) { - eventHub, _, err := createMockedEventHub() - if err != nil { - return - } - - eventHub.connected = false // need to reset connected in order to reach peerAddr check - eventHub.peerAddr = "" - err = eventHub.Connect() - - if err == nil { - t.Fatal("peerAddr empty, failed to get expected connect error") - } - return -} - -func TestConnectWithInterestsTrueAndGetInterests(t *testing.T) { - eventHub, _, err := createMockedEventHub() - if err != nil { - return - } - - eventHub.connected = false - eventHub.SetInterests(true) - err = eventHub.Connect() - - if err != nil { - t.Fatalf("InterestedEvents must not be empty. Error received: %s", err) - } - - interestedEvents, _ := eventHub.GetInterestedEvents() - if interestedEvents == nil || len(interestedEvents) == 0 { - t.Fatalf("GetInterests must not be empty. Received: %s", err) - } -} - -func TestConnectWithInterestsFalseAndGetInterests(t *testing.T) { - eventHub, _, err := createMockedEventHub() - if err != nil { - return - } - - eventHub.connected = false - eventHub.SetInterests(false) - err = eventHub.Connect() - - if err == nil { - t.Fatalf("InterestedEvents must not be empty. Error received: %s", err) - } - - interestedEvents, _ := eventHub.GetInterestedEvents() - if interestedEvents != nil && len(interestedEvents) > 0 { - t.Fatalf("GetInterests must be empty. Received: %s", err) - } - -} - -func TestInterfaces(t *testing.T) { - var apiEventHub fab.EventHub - var eventHub EventHub - - apiEventHub = &eventHub - if apiEventHub == nil { - t.Fatalf("this shouldn't happen.") - } -} diff --git a/pkg/fab/events/eventmocks.go b/pkg/fab/events/eventmocks.go deleted file mode 100644 index 7b79b81572..0000000000 --- a/pkg/fab/events/eventmocks.go +++ /dev/null @@ -1,347 +0,0 @@ -/* -Copyright SecureKey Technologies Inc. All Rights Reserved. - -SPDX-License-Identifier: Apache-2.0 -*/ - -package events - -import ( - "crypto/x509" - "time" - - "github.com/pkg/errors" - "google.golang.org/grpc/keepalive" - - fcConsumer "github.com/hyperledger/fabric-sdk-go/internal/github.com/hyperledger/fabric/events/consumer" - "github.com/hyperledger/fabric-sdk-go/pkg/common/context" - "github.com/hyperledger/fabric-sdk-go/pkg/context/api/core" - "github.com/hyperledger/fabric-sdk-go/pkg/context/api/fab" - "github.com/hyperledger/fabric-sdk-go/pkg/fab/mocks" - ledger_util "github.com/hyperledger/fabric-sdk-go/third_party/github.com/hyperledger/fabric/core/ledger/util" - "github.com/hyperledger/fabric-sdk-go/third_party/github.com/hyperledger/fabric/protos/common" - pb "github.com/hyperledger/fabric-sdk-go/third_party/github.com/hyperledger/fabric/protos/peer" - protos_utils "github.com/hyperledger/fabric-sdk-go/third_party/github.com/hyperledger/fabric/protos/utils" -) - -type mockEventClientMockEventRegistration struct { - Action string - ies []*pb.Interest -} - -type mockEventClient struct { - PeerAddress string - RegTimeout time.Duration - Adapter fcConsumer.EventAdapter - - Started int - Stopped int - Registrations []mockEventClientMockEventRegistration - - events chan pb.Event -} - -type mockEventClientFactory struct { - clients []*mockEventClient -} - -func (mecf *mockEventClientFactory) newEventsClient(provider core.Providers, identity context.Identity, peerAddress string, certificate *x509.Certificate, serverHostOverride string, regTimeout time.Duration, - adapter fcConsumer.EventAdapter, kap keepalive.ClientParameters, failFast bool, allowInsecure bool) (fab.EventsClient, error) { - mec := &mockEventClient{ - PeerAddress: peerAddress, - RegTimeout: regTimeout, - Adapter: adapter, - events: make(chan pb.Event), - } - mecf.clients = append(mecf.clients, mec) - return mec, nil -} - -// MockEvent mocks an event -func (mec *mockEventClient) MockEvent(msg *pb.Event) (bool, error) { - if mec.Started > mec.Stopped { - return mec.Adapter.Recv(msg) - } - - mec.events <- *msg - return true, nil -} - -// RegisterAsync does not register anything anywhere but acts like all is well -func (mec *mockEventClient) RegisterAsync(ies []*pb.Interest) error { - mec.Registrations = append(mec.Registrations, mockEventClientMockEventRegistration{ - Action: "register", - ies: ies, - }) - return nil -} - -// UnregisterAsync does not unregister anything anywhere but acts like all is well -func (mec *mockEventClient) UnregisterAsync(ies []*pb.Interest) error { - mec.Registrations = append(mec.Registrations, mockEventClientMockEventRegistration{ - Action: "register", - ies: ies, - }) - return nil -} - -// Unregister does not unregister anything anywhere but acts like all is well -func (mec *mockEventClient) Unregister(ies []*pb.Interest) error { - return mec.UnregisterAsync(ies) -} - -// Recv will return mock events sent to the event channel. Warning! This might block indefinitely -func (mec *mockEventClient) Recv() (*pb.Event, error) { - event := <-mec.events - return &event, nil -} - -// Start does not start anything -func (mec *mockEventClient) Start() error { - mec.Started++ - return nil -} - -// Stop does not stop anything -func (mec *mockEventClient) Stop() error { - mec.Stopped++ - return nil -} - -func createMockedEventHub() (*EventHub, *mockEventClientFactory, error) { - user := mocks.NewMockUser("user") - fabCtx := mocks.NewMockContext(user) - ctx := Context{ - Providers: fabCtx, - Identity: fabCtx, - } - eventHub, err := New(ctx) - if err != nil { - return nil, nil, errors.WithMessage(err, "Error creating event hub") - } - - var clientFactory mockEventClientFactory - eventHub.eventsClientFactory = &clientFactory - - eventHub.SetPeerAddr("mock://mock", nil, "", true) - - err = eventHub.Connect() - if err != nil { - return nil, nil, errors.WithMessage(err, "Failed to connect") - } - - return eventHub, &clientFactory, nil -} - -// MockTxEventBuilder builds a mock TX event block -type MockTxEventBuilder struct { - ChannelID string - TxID string -} - -// MockCCEventBuilder builds a mock chaincode event -type MockCCEventBuilder struct { - CCID string - EventName string - Payload []byte -} - -// MockCCBlockEventBuilder builds a mock CC event block -type MockCCBlockEventBuilder struct { - CCID string - EventName string - ChannelID string - TxID string - Payload []byte -} - -// Build builds a mock TX event block -func (b *MockTxEventBuilder) Build() *pb.Event_Block { - return &pb.Event_Block{ - Block: &common.Block{ - Header: &common.BlockHeader{}, - Metadata: b.buildBlockMetadata(pb.TxValidationCode_VALID), - Data: &common.BlockData{ - Data: [][]byte{protos_utils.MarshalOrPanic(b.buildEnvelope())}, - }, - }, - } -} - -// BuildWithTxValidationCode Build builds a mock TX event block -func (b *MockTxEventBuilder) BuildWithTxValidationCode(c pb.TxValidationCode) *pb.Event_Block { - return &pb.Event_Block{ - Block: &common.Block{ - Header: &common.BlockHeader{}, - Metadata: b.buildBlockMetadata(c), - Data: &common.BlockData{ - Data: [][]byte{protos_utils.MarshalOrPanic(b.buildEnvelope())}, - }, - }, - } -} - -func (b *MockTxEventBuilder) buildBlockMetadata(c pb.TxValidationCode) *common.BlockMetadata { - return &common.BlockMetadata{ - Metadata: [][]byte{ - []byte{}, - []byte{}, - b.buildTransactionsFilterMetaDataBytes(c), - []byte{}, - }, - } -} - -func (b *MockTxEventBuilder) buildTransactionsFilterMetaDataBytes(c pb.TxValidationCode) []byte { - return []byte(ledger_util.TxValidationFlags{uint8(c)}) -} - -// Build builds a mock chaincode event -func (b *MockCCEventBuilder) Build() *pb.Event_ChaincodeEvent { - return &pb.Event_ChaincodeEvent{ - ChaincodeEvent: &pb.ChaincodeEvent{ - ChaincodeId: b.CCID, - EventName: b.EventName, - Payload: b.Payload, - }, - } -} - -func (b *MockTxEventBuilder) buildEnvelope() *common.Envelope { - return &common.Envelope{ - Payload: protos_utils.MarshalOrPanic(b.buildPayload()), - } -} - -func (b *MockTxEventBuilder) buildPayload() *common.Payload { - return &common.Payload{ - Header: &common.Header{ - ChannelHeader: protos_utils.MarshalOrPanic(b.buildChannelHeader()), - }, - } -} - -func (b *MockTxEventBuilder) buildChannelHeader() *common.ChannelHeader { - return &common.ChannelHeader{ - TxId: b.TxID, - ChannelId: b.ChannelID, - } -} - -// Build builds a mock chaincode event block -func (b *MockCCBlockEventBuilder) Build() *pb.Event_Block { - return b.BuildWithTxValidationFlag(true) -} - -// BuildWithTxValidationFlag builds a mock chaincode event block with valid/invalid TxValidation Flag (set in the argument) -func (b *MockCCBlockEventBuilder) BuildWithTxValidationFlag(isValid bool) *pb.Event_Block { - return &pb.Event_Block{ - Block: &common.Block{ - Header: &common.BlockHeader{}, - Metadata: b.buildBlockMetadataWithValidFlag(isValid), - Data: &common.BlockData{ - Data: [][]byte{protos_utils.MarshalOrPanic(b.buildEnvelope())}, - }, - }, - } -} - -func (b *MockCCBlockEventBuilder) buildBlockMetadata() *common.BlockMetadata { - return b.buildBlockMetadataWithValidFlag(true) -} - -func (b *MockCCBlockEventBuilder) buildBlockMetadataWithValidFlag(isValid bool) *common.BlockMetadata { - return &common.BlockMetadata{ - Metadata: [][]byte{ - []byte{}, - []byte{}, - b.buildTransactionsFilterMetaDataBytesWithValidFlag(isValid), - []byte{}, - }, - } -} - -func (b *MockCCBlockEventBuilder) buildEnvelope() *common.Envelope { - return &common.Envelope{ - Payload: protos_utils.MarshalOrPanic(b.buildPayload()), - } -} - -func (b *MockCCBlockEventBuilder) buildTransactionsFilterMetaDataBytes() []byte { - return b.buildTransactionsFilterMetaDataBytesWithValidFlag(true) -} - -func (b *MockCCBlockEventBuilder) buildTransactionsFilterMetaDataBytesWithValidFlag(isValidTx bool) []byte { - if isValidTx { - return []byte(ledger_util.TxValidationFlags{uint8(pb.TxValidationCode_VALID)}) - } - // return transaction with any non valid flag - return []byte(ledger_util.TxValidationFlags{uint8(pb.TxValidationCode_BAD_COMMON_HEADER)}) -} - -func (b *MockCCBlockEventBuilder) buildPayload() *common.Payload { - logger.Debug("MockCCBlockEventBuilder.buildPayload") - return &common.Payload{ - Header: &common.Header{ - ChannelHeader: protos_utils.MarshalOrPanic(b.buildChannelHeader()), - }, - Data: protos_utils.MarshalOrPanic(b.buildTransaction()), - } -} - -func (b *MockCCBlockEventBuilder) buildChannelHeader() *common.ChannelHeader { - return &common.ChannelHeader{ - Type: int32(common.HeaderType_ENDORSER_TRANSACTION), - TxId: b.TxID, - ChannelId: b.ChannelID, - } -} - -func (b *MockCCBlockEventBuilder) buildTransaction() *pb.Transaction { - return &pb.Transaction{ - Actions: []*pb.TransactionAction{b.buildTransactionAction()}, - } -} - -func (b *MockCCBlockEventBuilder) buildTransactionAction() *pb.TransactionAction { - return &pb.TransactionAction{ - Header: []byte{}, - Payload: protos_utils.MarshalOrPanic(b.buildChaincodeActionPayload()), - } -} - -func (b *MockCCBlockEventBuilder) buildChaincodeActionPayload() *pb.ChaincodeActionPayload { - return &pb.ChaincodeActionPayload{ - Action: b.buildChaincodeEndorsedAction(), - ChaincodeProposalPayload: []byte{}, - } -} - -func (b *MockCCBlockEventBuilder) buildChaincodeEndorsedAction() *pb.ChaincodeEndorsedAction { - return &pb.ChaincodeEndorsedAction{ - ProposalResponsePayload: protos_utils.MarshalOrPanic(b.buildProposalResponsePayload()), - Endorsements: []*pb.Endorsement{}, - } -} - -func (b *MockCCBlockEventBuilder) buildProposalResponsePayload() *pb.ProposalResponsePayload { - return &pb.ProposalResponsePayload{ - ProposalHash: []byte("somehash"), - Extension: protos_utils.MarshalOrPanic(b.buildChaincodeAction()), - } -} - -func (b *MockCCBlockEventBuilder) buildChaincodeAction() *pb.ChaincodeAction { - return &pb.ChaincodeAction{ - Events: protos_utils.MarshalOrPanic(b.buildChaincodeEvent()), - } -} - -func (b *MockCCBlockEventBuilder) buildChaincodeEvent() *pb.ChaincodeEvent { - return &pb.ChaincodeEvent{ - ChaincodeId: b.CCID, - EventName: b.EventName, - TxId: b.TxID, - Payload: b.Payload, - } -} diff --git a/pkg/fab/mocks/mockchprovider.go b/pkg/fab/mocks/mockchprovider.go index b6f5cdbe08..3c75d48277 100644 --- a/pkg/fab/mocks/mockchprovider.go +++ b/pkg/fab/mocks/mockchprovider.go @@ -60,11 +60,6 @@ func (cp *MockChannelProvider) SetCustomChannelService(customSelectionService fa cp.customSelectionService = customSelectionService } -// EventHub ... -func (cs *MockChannelService) EventHub() (fab.EventHub, error) { - return NewMockEventHub(), nil -} - // EventService returns a mock event service func (cs *MockChannelService) EventService() (fab.EventService, error) { return NewMockEventService(), nil diff --git a/pkg/fab/mocks/mockeventhub.go b/pkg/fab/mocks/mockeventhub.go deleted file mode 100644 index 2f0b300af7..0000000000 --- a/pkg/fab/mocks/mockeventhub.go +++ /dev/null @@ -1,77 +0,0 @@ -/* -Copyright SecureKey Technologies Inc. All Rights Reserved. - -SPDX-License-Identifier: Apache-2.0 -*/ - -package mocks - -import ( - "crypto/x509" - - "github.com/hyperledger/fabric-sdk-go/third_party/github.com/hyperledger/fabric/protos/common" - - "github.com/hyperledger/fabric-sdk-go/pkg/context/api/fab" - pb "github.com/hyperledger/fabric-sdk-go/third_party/github.com/hyperledger/fabric/protos/peer" -) - -// MockEventHub Mock EventHub -type MockEventHub struct { - RegisteredTxCallbacks chan func(fab.TransactionID, pb.TxValidationCode, error) -} - -// NewMockEventHub creates a new mock EventHub -func NewMockEventHub() *MockEventHub { - return &MockEventHub{RegisteredTxCallbacks: make(chan func(fab.TransactionID, pb.TxValidationCode, error))} -} - -// SetPeerAddr not implemented -func (m *MockEventHub) SetPeerAddr(peerURL string, certificate *x509.Certificate, serverHostOverride string, allowInsecure bool) { - // Not implemented -} - -// IsConnected not implemented -func (m *MockEventHub) IsConnected() bool { - return false -} - -// Connect not implemented -func (m *MockEventHub) Connect() error { - return nil -} - -// Disconnect not implemented -func (m *MockEventHub) Disconnect() error { - return nil -} - -// RegisterChaincodeEvent not implemented -func (m *MockEventHub) RegisterChaincodeEvent(ccid string, eventname string, callback func(event *fab.ChaincodeEvent)) *fab.ChainCodeCBE { - return nil -} - -// UnregisterChaincodeEvent not implemented -func (m *MockEventHub) UnregisterChaincodeEvent(cbe *fab.ChainCodeCBE) { - return -} - -// RegisterTxEvent not implemented -func (m *MockEventHub) RegisterTxEvent(txnID fab.TransactionID, callback func(fab.TransactionID, pb.TxValidationCode, error)) { - go func() { m.RegisteredTxCallbacks <- callback }() - return -} - -// UnregisterTxEvent not implemented -func (m *MockEventHub) UnregisterTxEvent(txnID fab.TransactionID) { - return -} - -// RegisterBlockEvent not implemented -func (m *MockEventHub) RegisterBlockEvent(callback func(*common.Block)) { - return -} - -// UnregisterBlockEvent not implemented -func (m *MockEventHub) UnregisterBlockEvent(callback func(*common.Block)) { - return -} diff --git a/pkg/fab/txn/txn.go b/pkg/fab/txn/txn.go index 2961d8add8..bb4494e107 100644 --- a/pkg/fab/txn/txn.go +++ b/pkg/fab/txn/txn.go @@ -248,25 +248,3 @@ func sendEnvelope(ctx contextApi.Client, envelope *fab.SignedEnvelope, orderer f } } } - -// Status is the transaction status returned from eventhub tx events -type Status struct { - Code pb.TxValidationCode - Error error -} - -// RegisterStatus registers on the given eventhub for the given transaction id -// returns a TxValidationCode channel which receives the validation code when the -// transaction completes. If the code is TxValidationCode_VALID then -// the transaction committed successfully, otherwise the code indicates the error -// that occurred. -func RegisterStatus(txID fab.TransactionID, eventHub fab.EventHub) chan Status { - statusNotifier := make(chan Status) - - eventHub.RegisterTxEvent(txID, func(txId fab.TransactionID, code pb.TxValidationCode, err error) { - logger.Debugf("Received code(%s) for txid(%s) and err(%s)\n", code, txId, err) - statusNotifier <- Status{Code: code, Error: err} - }) - - return statusNotifier -} diff --git a/pkg/fabsdk/provider/fabpvdr/fabpvdr.go b/pkg/fabsdk/provider/fabpvdr/fabpvdr.go index 3cce44aa2f..d040af981d 100644 --- a/pkg/fabsdk/provider/fabpvdr/fabpvdr.go +++ b/pkg/fabsdk/provider/fabpvdr/fabpvdr.go @@ -16,7 +16,6 @@ import ( "github.com/hyperledger/fabric-sdk-go/pkg/fab/channel/membership" "github.com/hyperledger/fabric-sdk-go/pkg/fab/chconfig" "github.com/hyperledger/fabric-sdk-go/pkg/fab/comm" - "github.com/hyperledger/fabric-sdk-go/pkg/fab/events" "github.com/hyperledger/fabric-sdk-go/pkg/fab/events/deliverclient" "github.com/hyperledger/fabric-sdk-go/pkg/fab/events/eventhubclient" "github.com/hyperledger/fabric-sdk-go/pkg/fab/orderer" @@ -108,33 +107,6 @@ func (f *InfraProvider) CreateChannelLedger(ic fab.IdentityContext, channelName return ledger, nil } -// CreateEventHub initilizes the event hub. -func (f *InfraProvider) CreateEventHub(ic fab.IdentityContext, channelID string) (fab.EventHub, error) { - peerConfig, err := f.providerContext.Config().ChannelPeers(channelID) - if err != nil { - return nil, errors.WithMessage(err, "read configuration for channel peers failed") - } - - var eventSource *core.ChannelPeer - for _, p := range peerConfig { - if p.EventSource && p.MspID == ic.MspID() { - eventSource = &p - break - } - } - - if eventSource == nil { - return nil, errors.New("unable to find event source for channel") - } - - // Event source found, create event hub - eventCtx := events.Context{ - Providers: f.providerContext, - Identity: ic, - } - return events.FromConfig(eventCtx, &eventSource.PeerConfig) -} - // CreateEventService creates the event service. func (f *InfraProvider) CreateEventService(ctx fab.ClientContext, chConfig fab.ChannelCfg) (fab.EventService, error) { key, err := NewCacheKey(ctx, chConfig)