From f4ee58db5057c5fa7e2e05e150fc67aa8de7b423 Mon Sep 17 00:00:00 2001 From: Will Assis <35489495+gassiss@users.noreply.github.com> Date: Fri, 27 Jun 2025 08:15:52 -0400 Subject: [PATCH] unified-storage: split resource index client (#106297) * split resource server and index server grpc connection if defined in config --- pkg/services/apiserver/options/storage.go | 15 +- pkg/storage/unified/client.go | 66 +++++--- pkg/storage/unified/client_test.go | 151 ++++++++++++++++++ pkg/storage/unified/resource/client.go | 41 +++-- .../unified/sql/test/integration_test.go | 2 +- 5 files changed, 229 insertions(+), 46 deletions(-) create mode 100644 pkg/storage/unified/client_test.go diff --git a/pkg/services/apiserver/options/storage.go b/pkg/services/apiserver/options/storage.go index 15199252146..cddc8bb3c61 100644 --- a/pkg/services/apiserver/options/storage.go +++ b/pkg/services/apiserver/options/storage.go @@ -43,6 +43,7 @@ type StorageOptions struct { // For unified-grpc Address string + IndexServerAddress string GrpcClientAuthenticationToken string GrpcClientAuthenticationTokenExchangeURL string GrpcClientAuthenticationTokenNamespace string @@ -136,10 +137,22 @@ func (o *StorageOptions) ApplyTo(serverConfig *genericapiserver.RecommendedConfi if err != nil { return err } + var indexConn *grpc.ClientConn + if o.IndexServerAddress != "" { + indexConn, err = grpc.NewClient(o.IndexServerAddress, + grpc.WithStatsHandler(otelgrpc.NewClientHandler()), + grpc.WithTransportCredentials(insecure.NewCredentials()), + ) + if err != nil { + return err + } + } else { + indexConn = conn + } const resourceStoreAudience = "resourceStore" - unified, err := resource.NewRemoteResourceClient(tracer, conn, resource.RemoteResourceClientConfig{ + unified, err := resource.NewRemoteResourceClient(tracer, conn, indexConn, resource.RemoteResourceClientConfig{ Token: o.GrpcClientAuthenticationToken, TokenExchangeURL: o.GrpcClientAuthenticationTokenExchangeURL, Namespace: o.GrpcClientAuthenticationTokenNamespace, diff --git a/pkg/storage/unified/client.go b/pkg/storage/unified/client.go index 33e1425146a..1d1855d5eee 100644 --- a/pkg/storage/unified/client.go +++ b/pkg/storage/unified/client.go @@ -55,7 +55,8 @@ func ProvideUnifiedStorageClient(opts *Options, storageMetrics *resource.Storage client, err := newClient(options.StorageOptions{ StorageType: options.StorageType(apiserverCfg.Key("storage_type").MustString(string(options.StorageTypeUnified))), DataPath: apiserverCfg.Key("storage_path").MustString(filepath.Join(opts.Cfg.DataPath, "grafana-apiserver")), - Address: apiserverCfg.Key("address").MustString(""), // client address + Address: apiserverCfg.Key("address").MustString(""), + IndexServerAddress: apiserverCfg.Key("index_server_address").MustString(""), BlobStoreURL: apiserverCfg.Key("blob_url").MustString(""), BlobThresholdBytes: apiserverCfg.Key("blob_threshold_bytes").MustInt(options.BlobThresholdDefault), }, opts.Cfg, opts.Features, opts.DB, opts.Tracer, opts.Reg, opts.Authzc, opts.Docs, storageMetrics, indexMetrics) @@ -117,34 +118,29 @@ func newClient(opts options.StorageOptions, } var ( - conn grpc.ClientConnInterface - err error - metrics = newClientMetrics(reg) + conn grpc.ClientConnInterface + indexConn grpc.ClientConnInterface + err error + metrics = newClientMetrics(reg) ) - // Create either a connection pool or a single connection. - // The connection pool __can__ be useful when connection to - // server side load balancers like kube-proxy. - if features.IsEnabledGlobally(featuremgmt.FlagUnifiedStorageGrpcConnectionPool) { - conn, err = newPooledConn(&poolOpts{ - initialCapacity: 3, - maxCapacity: 6, - idleTimeout: time.Minute, - factory: func() (*grpc.ClientConn, error) { - return grpcConn(opts.Address, metrics) - }, - }) + + conn, err = newGrpcConn(opts.Address, metrics, features) + if err != nil { + return nil, err + } + + if opts.IndexServerAddress != "" { + indexConn, err = newGrpcConn(opts.IndexServerAddress, metrics, features) + if err != nil { return nil, err } } else { - conn, err = grpcConn(opts.Address, metrics) - if err != nil { - return nil, err - } + indexConn = conn } // Create a client instance - client, err := resource.NewResourceClient(conn, cfg, features, tracer) + client, err := resource.NewResourceClient(conn, indexConn, cfg, features, tracer) if err != nil { return nil, err } @@ -164,6 +160,34 @@ func newClient(opts options.StorageOptions, } } +func newGrpcConn(address string, metrics *clientMetrics, features featuremgmt.FeatureToggles) (grpc.ClientConnInterface, error) { + // Create either a connection pool or a single connection. + // The connection pool __can__ be useful when connection to + // server side load balancers like kube-proxy. + if features.IsEnabledGlobally(featuremgmt.FlagUnifiedStorageGrpcConnectionPool) { + conn, err := newPooledConn(&poolOpts{ + initialCapacity: 3, + maxCapacity: 6, + idleTimeout: time.Minute, + factory: func() (*grpc.ClientConn, error) { + return grpcConn(address, metrics) + }, + }) + if err != nil { + return nil, err + } + + return conn, nil + } + + conn, err := grpcConn(address, metrics) + if err != nil { + return nil, err + } + + return conn, nil +} + // grpcConn creates a new gRPC connection to the provided address. func grpcConn(address string, metrics *clientMetrics) (*grpc.ClientConn, error) { // Report gRPC status code errors as labels. diff --git a/pkg/storage/unified/client_test.go b/pkg/storage/unified/client_test.go new file mode 100644 index 00000000000..b75c22442a6 --- /dev/null +++ b/pkg/storage/unified/client_test.go @@ -0,0 +1,151 @@ +package unified + +import ( + "context" + "net" + "strings" + "testing" + + authlib "github.com/grafana/authlib/types" + "github.com/grafana/grafana/pkg/apimachinery/identity" + "github.com/grafana/grafana/pkg/services/apiserver/options" + "github.com/grafana/grafana/pkg/services/featuremgmt" + "github.com/grafana/grafana/pkg/setting" + "github.com/grafana/grafana/pkg/storage/unified/resource" + "github.com/grafana/grafana/pkg/storage/unified/resourcepb" + "github.com/stretchr/testify/require" + + "google.golang.org/grpc" +) + +func TestUnifiedStorageClient(t *testing.T) { + resourceServerAddress := ":11000" + resourceServer := createTestGrpcServer(t, resourceServerAddress) + defer resourceServer.s.Stop() + indexServerAddress := ":11001" + indexServer := createTestGrpcServer(t, indexServerAddress) + defer indexServer.s.Stop() + + t.Run("when storage type is unified-grpc", func(t *testing.T) { + t.Run("should create a client that connects to the unified storage server address", func(t *testing.T) { + resourceServer.resetCalls() + indexServer.resetCalls() + + client, err := newClient( + options.StorageOptions{ + StorageType: options.StorageTypeUnifiedGrpc, + Address: resourceServerAddress, + }, + &setting.Cfg{}, + featuremgmt.WithFeatures(), + nil, + nil, + nil, + authlib.FixedAccessClient(true), + nil, + nil, + nil, + ) + require.NoError(t, err) + + testCallAllMethods(client) + // every method should hit resource server exactly once + for method, count := range resourceServer.Calls { + require.Equal(t, 1, count, "method was called more than once: "+method) + } + // no hits to the index server in this case + for range indexServer.Calls { + require.FailNow(t, "index server was called when it should have not") + } + }) + + t.Run("should connect to a separate index server if defined in the config", func(t *testing.T) { + resourceServer.resetCalls() + indexServer.resetCalls() + + client, err := newClient( + options.StorageOptions{ + StorageType: options.StorageTypeUnifiedGrpc, + Address: resourceServerAddress, + IndexServerAddress: indexServerAddress, + }, + &setting.Cfg{}, + featuremgmt.WithFeatures(), + nil, + nil, + nil, + authlib.FixedAccessClient(true), + nil, + nil, + nil, + ) + require.NoError(t, err) + + testCallAllMethods(client) + // only resource store methods in this case + for method, count := range resourceServer.Calls { + require.Equal(t, 1, count, "method was called more than once: "+method) + require.True(t, strings.Contains(method, "resource.ResourceStore")) + } + // index server methods should be called here + for method, count := range indexServer.Calls { + require.Equal(t, 1, count, "method was called more than once: "+method) + require.True(t, strings.Contains(method, "resource.ResourceIndex") || strings.Contains(method, "resource.ManagedObjectIndex")) + } + }) + }) +} + +func testCallAllMethods(client resource.ResourceClient) { + _, _ = client.Read(identity.WithServiceIdentityContext(context.Background(), 1), &resourcepb.ReadRequest{}) + _, _ = client.Create(identity.WithServiceIdentityContext(context.Background(), 1), &resourcepb.CreateRequest{}) + _, _ = client.Delete(identity.WithServiceIdentityContext(context.Background(), 1), &resourcepb.DeleteRequest{}) + _, _ = client.Update(identity.WithServiceIdentityContext(context.Background(), 1), &resourcepb.UpdateRequest{}) + _, _ = client.List(identity.WithServiceIdentityContext(context.Background(), 1), &resourcepb.ListRequest{}) + _, _ = client.GetStats(identity.WithServiceIdentityContext(context.Background(), 1), &resourcepb.ResourceStatsRequest{}) + _, _ = client.Search(identity.WithServiceIdentityContext(context.Background(), 1), &resourcepb.ResourceSearchRequest{}) + _, _ = client.CountManagedObjects(identity.WithServiceIdentityContext(context.Background(), 1), &resourcepb.CountManagedObjectsRequest{}) + _, _ = client.ListManagedObjects(identity.WithServiceIdentityContext(context.Background(), 1), &resourcepb.ListManagedObjectsRequest{}) +} + +func createTestGrpcServer(t *testing.T, address string) *testServer { + listener, err := net.Listen("tcp", address) + require.NoError(t, err, "failed to listen") + + testServer := newTestServer() + s := grpc.NewServer( + grpc.UnknownServiceHandler(testServer.handler), + ) + + go func() { + _ = s.Serve(listener) + }() + + testServer.s = s + + return testServer +} + +type testServer struct { + resource.ResourceServer + Calls map[string]int + s *grpc.Server +} + +func newTestServer() *testServer { + return &testServer{ + Calls: make(map[string]int), + } +} + +func (s *testServer) resetCalls() { + s.Calls = make(map[string]int) +} + +func (s *testServer) handler(srv interface{}, serverStream grpc.ServerStream) error { + fullMethodName, ok := grpc.MethodFromServerStream(serverStream) + if ok { + s.Calls[fullMethodName]++ + } + return nil +} diff --git a/pkg/storage/unified/resource/client.go b/pkg/storage/unified/resource/client.go index 37ca1d2e500..079c617cf59 100644 --- a/pkg/storage/unified/resource/client.go +++ b/pkg/storage/unified/resource/client.go @@ -48,14 +48,14 @@ type resourceClient struct { resourcepb.DiagnosticsClient } -func NewResourceClient(conn grpc.ClientConnInterface, cfg *setting.Cfg, features featuremgmt.FeatureToggles, tracer trace.Tracer) (ResourceClient, error) { +func NewResourceClient(conn, indexConn grpc.ClientConnInterface, cfg *setting.Cfg, features featuremgmt.FeatureToggles, tracer trace.Tracer) (ResourceClient, error) { if !features.IsEnabledGlobally(featuremgmt.FlagAppPlatformGrpcClientAuth) { - return NewLegacyResourceClient(conn), nil + return NewLegacyResourceClient(conn, indexConn), nil } clientCfg := authnGrpcUtils.ReadGrpcClientConfig(cfg) - return NewRemoteResourceClient(tracer, conn, RemoteResourceClientConfig{ + return NewRemoteResourceClient(tracer, conn, indexConn, RemoteResourceClientConfig{ Token: clientCfg.Token, TokenExchangeURL: clientCfg.TokenExchangeURL, Audiences: []string{"resourceStore"}, @@ -64,24 +64,25 @@ func NewResourceClient(conn grpc.ClientConnInterface, cfg *setting.Cfg, features }) } -func newResourceClient(cc grpc.ClientConnInterface) ResourceClient { +func newResourceClient(storageCc grpc.ClientConnInterface, indexCc grpc.ClientConnInterface) ResourceClient { return &resourceClient{ - ResourceStoreClient: resourcepb.NewResourceStoreClient(cc), - ResourceIndexClient: resourcepb.NewResourceIndexClient(cc), - ManagedObjectIndexClient: resourcepb.NewManagedObjectIndexClient(cc), - BulkStoreClient: resourcepb.NewBulkStoreClient(cc), - BlobStoreClient: resourcepb.NewBlobStoreClient(cc), - DiagnosticsClient: resourcepb.NewDiagnosticsClient(cc), + ResourceStoreClient: resourcepb.NewResourceStoreClient(storageCc), + ResourceIndexClient: resourcepb.NewResourceIndexClient(indexCc), + ManagedObjectIndexClient: resourcepb.NewManagedObjectIndexClient(indexCc), + BulkStoreClient: resourcepb.NewBulkStoreClient(storageCc), + BlobStoreClient: resourcepb.NewBlobStoreClient(storageCc), + DiagnosticsClient: resourcepb.NewDiagnosticsClient(storageCc), } } func NewAuthlessResourceClient(cc grpc.ClientConnInterface) ResourceClient { - return newResourceClient(cc) + return newResourceClient(cc, cc) } -func NewLegacyResourceClient(channel grpc.ClientConnInterface) ResourceClient { +func NewLegacyResourceClient(channel grpc.ClientConnInterface, indexChannel grpc.ClientConnInterface) ResourceClient { cc := grpchan.InterceptClientConn(channel, grpcUtils.UnaryClientInterceptor, grpcUtils.StreamClientInterceptor) - return newResourceClient(cc) + cci := grpchan.InterceptClientConn(indexChannel, grpcUtils.UnaryClientInterceptor, grpcUtils.StreamClientInterceptor) + return newResourceClient(cc, cci) } func NewLocalResourceClient(server ResourceServer) ResourceClient { @@ -114,7 +115,7 @@ func NewLocalResourceClient(server ResourceServer) ResourceClient { ) cc := grpchan.InterceptClientConn(channel, clientInt.UnaryClientInterceptor, clientInt.StreamClientInterceptor) - return newResourceClient(cc) + return newResourceClient(cc, cc) } type RemoteResourceClientConfig struct { @@ -125,7 +126,7 @@ type RemoteResourceClientConfig struct { AllowInsecure bool } -func NewRemoteResourceClient(tracer trace.Tracer, conn grpc.ClientConnInterface, cfg RemoteResourceClientConfig) (ResourceClient, error) { +func NewRemoteResourceClient(tracer trace.Tracer, conn grpc.ClientConnInterface, indexConn grpc.ClientConnInterface, cfg RemoteResourceClientConfig) (ResourceClient, error) { exchangeOpts := []authnlib.ExchangeClientOpts{} if cfg.AllowInsecure { @@ -149,14 +150,8 @@ func NewRemoteResourceClient(tracer trace.Tracer, conn grpc.ClientConnInterface, ) cc := grpchan.InterceptClientConn(conn, clientInt.UnaryClientInterceptor, clientInt.StreamClientInterceptor) - return &resourceClient{ - ResourceStoreClient: resourcepb.NewResourceStoreClient(cc), - ResourceIndexClient: resourcepb.NewResourceIndexClient(cc), - BlobStoreClient: resourcepb.NewBlobStoreClient(cc), - BulkStoreClient: resourcepb.NewBulkStoreClient(cc), - ManagedObjectIndexClient: resourcepb.NewManagedObjectIndexClient(cc), - DiagnosticsClient: resourcepb.NewDiagnosticsClient(cc), - }, nil + cci := grpchan.InterceptClientConn(indexConn, clientInt.UnaryClientInterceptor, clientInt.StreamClientInterceptor) + return newResourceClient(cc, cci), nil } var authLogger = slog.Default().With("logger", "resource-client-auth-interceptor") diff --git a/pkg/storage/unified/sql/test/integration_test.go b/pkg/storage/unified/sql/test/integration_test.go index edddcf8ef1d..99ff031aab1 100644 --- a/pkg/storage/unified/sql/test/integration_test.go +++ b/pkg/storage/unified/sql/test/integration_test.go @@ -165,7 +165,7 @@ func TestClientServer(t *testing.T) { t.Run("Create a client", func(t *testing.T) { conn, err := unified.GrpcConn(svc.GetAddress(), prometheus.NewPedanticRegistry()) require.NoError(t, err) - client, err = resource.NewRemoteResourceClient(tracing.NewNoopTracerService(), conn, resource.RemoteResourceClientConfig{ + client, err = resource.NewRemoteResourceClient(tracing.NewNoopTracerService(), conn, conn, resource.RemoteResourceClientConfig{ Token: "some-token", TokenExchangeURL: "http://some-change-url", AllowInsecure: true,