Skip to content

Commit

Permalink
Prevent infinite loop in gRPC tracing during span storage.
Browse files Browse the repository at this point in the history
fixes: #5971

Signed-off-by: cx <1249843194@qq.com>
  • Loading branch information
ldlb9527 committed Sep 12, 2024
1 parent 7787010 commit 5ae15ad
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 23 deletions.
34 changes: 20 additions & 14 deletions plugin/storage/grpc/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ type Factory struct {
configV1 Configuration
configV2 *ConfigV2

services *ClientPluginServices
remoteConn *grpc.ClientConn
services *ClientPluginServices
remoteConns []*grpc.ClientConn
}

// NewFactory creates a new Factory.
Expand Down Expand Up @@ -101,8 +101,8 @@ func (f *Factory) Initialize(metricsFactory metrics.Factory, logger *zap.Logger)
return noopmetric.NewMeterProvider()
},
}
newClientFn := func(opts ...grpc.DialOption) (conn *grpc.ClientConn, err error) {
return f.configV2.ToClientConn(context.Background(), componenttest.NewNopHost(), telset, opts...)
newClientFn := func(telSettings component.TelemetrySettings, opts ...grpc.DialOption) (conn *grpc.ClientConn, err error) {
return f.configV2.ToClientConn(context.Background(), componenttest.NewNopHost(), telSettings, opts...)
}

var err error
Expand All @@ -114,13 +114,11 @@ func (f *Factory) Initialize(metricsFactory metrics.Factory, logger *zap.Logger)
return nil
}

type newClientFn func(opts ...grpc.DialOption) (*grpc.ClientConn, error)
type newClientFn func(telSettings component.TelemetrySettings, opts ...grpc.DialOption) (*grpc.ClientConn, error)

func (f *Factory) newRemoteStorage(telset component.TelemetrySettings, newClient newClientFn) (*ClientPluginServices, error) {
c := f.configV2
opts := []grpc.DialOption{
grpc.WithStatsHandler(otelgrpc.NewClientHandler(otelgrpc.WithTracerProvider(telset.TracerProvider))),
}
opts := make([]grpc.DialOption, 0)
if c.Auth != nil {
return nil, fmt.Errorf("authenticator is not supported")
}
Expand All @@ -131,12 +129,20 @@ func (f *Factory) newRemoteStorage(telset component.TelemetrySettings, newClient
opts = append(opts, grpc.WithStreamInterceptor(tenancy.NewClientStreamInterceptor(tenancyMgr)))
}

remoteConn, err := newClient(opts...)
noTraceConn, err := newClient(telset, opts...)
if err != nil {
return nil, fmt.Errorf("error creating remote storage client: %w", err)
return nil, fmt.Errorf("error creating remote storage client without tracing: %w", err)

Check warning on line 134 in plugin/storage/grpc/factory.go

View check run for this annotation

Codecov / codecov/patch

plugin/storage/grpc/factory.go#L134

Added line #L134 was not covered by tests
}
f.remoteConn = remoteConn
grpcClient := shared.NewGRPCClient(remoteConn)
f.remoteConns = append(f.remoteConns, noTraceConn)

opts = append(opts, grpc.WithStatsHandler(otelgrpc.NewClientHandler(otelgrpc.WithTracerProvider(telset.TracerProvider))))
traceConn, err := newClient(telset, opts...)
if err != nil {
return nil, fmt.Errorf("error creating remote storage client with tracing: %w", err)

Check warning on line 141 in plugin/storage/grpc/factory.go

View check run for this annotation

Codecov / codecov/patch

plugin/storage/grpc/factory.go#L141

Added line #L141 was not covered by tests
}
f.remoteConns = append(f.remoteConns, traceConn)

grpcClient := shared.NewGRPCClient(traceConn, noTraceConn)
return &ClientPluginServices{
PluginServices: shared.PluginServices{
Store: grpcClient,
Expand Down Expand Up @@ -200,8 +206,8 @@ func (f *Factory) CreateArchiveSpanWriter() (spanstore.Writer, error) {
// Close closes the resources held by the factory
func (f *Factory) Close() error {
var errs []error
if f.remoteConn != nil {
errs = append(errs, f.remoteConn.Close())
for i := range f.remoteConns {
errs = append(errs, f.remoteConns[i].Close())
}
errs = append(errs, f.configV1.RemoteTLS.Close())
return errors.Join(errs...)
Expand Down
16 changes: 8 additions & 8 deletions plugin/storage/grpc/shared/grpc_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,15 +46,15 @@ type GRPCClient struct {
streamWriterClient storage_v1.StreamingSpanWriterPluginClient
}

func NewGRPCClient(c *grpc.ClientConn) *GRPCClient {
func NewGRPCClient(withTraceConn *grpc.ClientConn, withoutTraceConn *grpc.ClientConn) *GRPCClient {
return &GRPCClient{
readerClient: storage_v1.NewSpanReaderPluginClient(c),
writerClient: storage_v1.NewSpanWriterPluginClient(c),
archiveReaderClient: storage_v1.NewArchiveSpanReaderPluginClient(c),
archiveWriterClient: storage_v1.NewArchiveSpanWriterPluginClient(c),
capabilitiesClient: storage_v1.NewPluginCapabilitiesClient(c),
depsReaderClient: storage_v1.NewDependenciesReaderPluginClient(c),
streamWriterClient: storage_v1.NewStreamingSpanWriterPluginClient(c),
readerClient: storage_v1.NewSpanReaderPluginClient(withTraceConn),
writerClient: storage_v1.NewSpanWriterPluginClient(withoutTraceConn),
archiveReaderClient: storage_v1.NewArchiveSpanReaderPluginClient(withTraceConn),
archiveWriterClient: storage_v1.NewArchiveSpanWriterPluginClient(withoutTraceConn),
capabilitiesClient: storage_v1.NewPluginCapabilitiesClient(withTraceConn),
depsReaderClient: storage_v1.NewDependenciesReaderPluginClient(withTraceConn),
streamWriterClient: storage_v1.NewStreamingSpanWriterPluginClient(withoutTraceConn),
}
}

Expand Down
2 changes: 1 addition & 1 deletion plugin/storage/grpc/shared/grpc_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ func withGRPCClient(fn func(r *grpcClientTest)) {

func TestNewGRPCClient(t *testing.T) {
conn := &grpc.ClientConn{}
client := NewGRPCClient(conn)
client := NewGRPCClient(conn, conn)
assert.NotNil(t, client)

assert.Implements(t, (*storage_v1.SpanReaderPluginClient)(nil), client.readerClient)
Expand Down

0 comments on commit 5ae15ad

Please sign in to comment.