From 2dba473015a6e7e75a5d117bf766a6c24d404d38 Mon Sep 17 00:00:00 2001 From: maicon Date: Fri, 18 Jul 2025 09:43:56 -0300 Subject: [PATCH] Feature/unified storage search dual reader (#108291) * Add UnifiedStorageSearchDualReaderEnabled feature flag Signed-off-by: Maicon Costa * Refactor UniSearch Dual Reader Signed-off-by: Maicon Costa * Run make gen-feature-toggles Signed-off-by: Maicon Costa * fix: unit tests search_client Signed-off-by: Bruno Abrantes * feat: cancels shadow search requests after 500ms Signed-off-by: Bruno Abrantes --------- Signed-off-by: Maicon Costa Signed-off-by: Bruno Abrantes Co-authored-by: Will Assis Co-authored-by: Bruno Abrantes --- .../src/types/featureToggles.gen.ts | 4 + pkg/registry/apis/dashboard/register.go | 2 +- pkg/registry/apis/dashboard/search.go | 2 +- pkg/services/apiserver/client/client.go | 5 +- .../dashboards/service/client/client.go | 9 +- .../dashboards/service/dashboard_service.go | 2 +- .../dashboardversion/dashverimpl/dashver.go | 1 + pkg/services/featuremgmt/registry.go | 8 + pkg/services/featuremgmt/toggles_gen.csv | 1 + pkg/services/featuremgmt/toggles_gen.go | 4 + pkg/services/featuremgmt/toggles_gen.json | 48 +- pkg/services/folder/folderimpl/folder.go | 2 + .../folderimpl/folder_unifiedstorage_test.go | 2 +- pkg/storage/unified/resource/search_client.go | 54 ++- .../unified/resource/search_client_test.go | 445 ++++++++++++++++++ 15 files changed, 562 insertions(+), 27 deletions(-) create mode 100644 pkg/storage/unified/resource/search_client_test.go diff --git a/packages/grafana-data/src/types/featureToggles.gen.ts b/packages/grafana-data/src/types/featureToggles.gen.ts index 1f42c837828..32cd584aa78 100644 --- a/packages/grafana-data/src/types/featureToggles.gen.ts +++ b/packages/grafana-data/src/types/featureToggles.gen.ts @@ -1055,4 +1055,8 @@ export interface FeatureToggles { * @default false */ pluginAssetProvider?: boolean; + /** + * Enable dual reader for unified storage search + */ + unifiedStorageSearchDualReaderEnabled?: boolean; } diff --git a/pkg/registry/apis/dashboard/register.go b/pkg/registry/apis/dashboard/register.go index f2fda01d1c9..8bc223b79f8 100644 --- a/pkg/registry/apis/dashboard/register.go +++ b/pkg/registry/apis/dashboard/register.go @@ -122,7 +122,7 @@ func RegisterAPIService( dbp := legacysql.NewDatabaseProvider(sql) namespacer := request.GetNamespaceMapper(cfg) legacyDashboardSearcher := legacysearcher.NewDashboardSearchClient(dashStore, sorter) - folderClient := client.NewK8sHandler(dual, request.GetNamespaceMapper(cfg), folders.FolderResourceInfo.GroupVersionResource(), restConfigProvider.GetRestConfig, dashStore, userService, unified, sorter) + folderClient := client.NewK8sHandler(dual, request.GetNamespaceMapper(cfg), folders.FolderResourceInfo.GroupVersionResource(), restConfigProvider.GetRestConfig, dashStore, userService, unified, sorter, features) builder := &DashboardsAPIBuilder{ log: log.New("grafana-apiserver.dashboards"), diff --git a/pkg/registry/apis/dashboard/search.go b/pkg/registry/apis/dashboard/search.go index 17643ce2743..4252502e9ef 100644 --- a/pkg/registry/apis/dashboard/search.go +++ b/pkg/registry/apis/dashboard/search.go @@ -43,7 +43,7 @@ type SearchHandler struct { } func NewSearchHandler(tracer trace.Tracer, dual dualwrite.Service, legacyDashboardSearcher resourcepb.ResourceIndexClient, resourceClient resource.ResourceClient, features featuremgmt.FeatureToggles) *SearchHandler { - searchClient := resource.NewSearchClient(dualwrite.NewSearchAdapter(dual), dashboardv0alpha1.DashboardResourceInfo.GroupResource(), resourceClient, legacyDashboardSearcher) + searchClient := resource.NewSearchClient(dualwrite.NewSearchAdapter(dual), dashboardv0alpha1.DashboardResourceInfo.GroupResource(), resourceClient, legacyDashboardSearcher, features) return &SearchHandler{ client: searchClient, log: log.New("grafana-apiserver.dashboards.search"), diff --git a/pkg/services/apiserver/client/client.go b/pkg/services/apiserver/client/client.go index 8aa75c22416..b909c82252d 100644 --- a/pkg/services/apiserver/client/client.go +++ b/pkg/services/apiserver/client/client.go @@ -15,6 +15,7 @@ import ( "github.com/grafana/grafana/pkg/registry/apis/dashboard/legacysearcher" "github.com/grafana/grafana/pkg/services/apiserver/endpoints/request" "github.com/grafana/grafana/pkg/services/dashboards" + "github.com/grafana/grafana/pkg/services/featuremgmt" "github.com/grafana/grafana/pkg/services/search/sort" "github.com/grafana/grafana/pkg/services/user" "github.com/grafana/grafana/pkg/storage/legacysql/dualwrite" @@ -46,9 +47,9 @@ type k8sHandler struct { } func NewK8sHandler(dual dualwrite.Service, namespacer request.NamespaceMapper, gvr schema.GroupVersionResource, - restConfig func(context.Context) (*rest.Config, error), dashStore dashboards.Store, userSvc user.Service, resourceClient resource.ResourceClient, sorter sort.Service) K8sHandler { + restConfig func(context.Context) (*rest.Config, error), dashStore dashboards.Store, userSvc user.Service, resourceClient resource.ResourceClient, sorter sort.Service, features featuremgmt.FeatureToggles) K8sHandler { legacySearcher := legacysearcher.NewDashboardSearchClient(dashStore, sorter) - searchClient := resource.NewSearchClient(dualwrite.NewSearchAdapter(dual), gvr.GroupResource(), resourceClient, legacySearcher) + searchClient := resource.NewSearchClient(dualwrite.NewSearchAdapter(dual), gvr.GroupResource(), resourceClient, legacySearcher, features) return &k8sHandler{ namespacer: namespacer, diff --git a/pkg/services/dashboards/service/client/client.go b/pkg/services/dashboards/service/client/client.go index 401d4773d16..01d0e020f62 100644 --- a/pkg/services/dashboards/service/client/client.go +++ b/pkg/services/dashboards/service/client/client.go @@ -5,7 +5,6 @@ import ( "fmt" "sync" - "github.com/prometheus/client_golang/prometheus" "go.opentelemetry.io/otel/attribute" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" @@ -18,11 +17,13 @@ import ( "github.com/grafana/grafana/pkg/services/apiserver/client" "github.com/grafana/grafana/pkg/services/apiserver/endpoints/request" "github.com/grafana/grafana/pkg/services/dashboards" + "github.com/grafana/grafana/pkg/services/featuremgmt" "github.com/grafana/grafana/pkg/services/search/sort" "github.com/grafana/grafana/pkg/services/user" "github.com/grafana/grafana/pkg/setting" "github.com/grafana/grafana/pkg/storage/legacysql/dualwrite" "github.com/grafana/grafana/pkg/storage/unified/resource" + "github.com/prometheus/client_golang/prometheus" ) type K8sClientFactory func(ctx context.Context, version string) client.K8sHandler @@ -44,8 +45,9 @@ func NewK8sClientWithFallback( sorter sort.Service, dual dualwrite.Service, reg prometheus.Registerer, + features featuremgmt.FeatureToggles, ) *K8sClientWithFallback { - newClientFunc := newK8sClientFactory(cfg, restConfigProvider, dashboardStore, userService, resourceClient, sorter, dual) + newClientFunc := newK8sClientFactory(cfg, restConfigProvider, dashboardStore, userService, resourceClient, sorter, dual, features) return &K8sClientWithFallback{ K8sHandler: newClientFunc(context.Background(), dashboardv0.VERSION), newClientFunc: newClientFunc, @@ -112,6 +114,7 @@ func newK8sClientFactory( resourceClient resource.ResourceClient, sorter sort.Service, dual dualwrite.Service, + features featuremgmt.FeatureToggles, ) K8sClientFactory { clientCache := make(map[string]client.K8sHandler) cacheMutex := &sync.RWMutex{} @@ -149,7 +152,7 @@ func newK8sClientFactory( } span.AddEvent("Creating new client") - newClient := client.NewK8sHandler(dual, request.GetNamespaceMapper(cfg), gvr, restConfigProvider.GetRestConfig, dashboardStore, userService, resourceClient, sorter) + newClient := client.NewK8sHandler(dual, request.GetNamespaceMapper(cfg), gvr, restConfigProvider.GetRestConfig, dashboardStore, userService, resourceClient, sorter, features) clientCache[version] = newClient return newClient diff --git a/pkg/services/dashboards/service/dashboard_service.go b/pkg/services/dashboards/service/dashboard_service.go index e3d17b5bedf..82ca60b6940 100644 --- a/pkg/services/dashboards/service/dashboard_service.go +++ b/pkg/services/dashboards/service/dashboard_service.go @@ -387,7 +387,7 @@ func ProvideDashboardServiceImpl( serverLockService *serverlock.ServerLockService, kvstore kvstore.KVStore, ) (*DashboardServiceImpl, error) { - k8sclient := dashboardclient.NewK8sClientWithFallback(cfg, restConfigProvider, dashboardStore, userService, resourceClient, sorter, dual, r) + k8sclient := dashboardclient.NewK8sClientWithFallback(cfg, restConfigProvider, dashboardStore, userService, resourceClient, sorter, dual, r, features) dashSvc := &DashboardServiceImpl{ cfg: cfg, log: log.New("dashboard-service"), diff --git a/pkg/services/dashboardversion/dashverimpl/dashver.go b/pkg/services/dashboardversion/dashverimpl/dashver.go index c36288073b3..9b16131262e 100644 --- a/pkg/services/dashboardversion/dashverimpl/dashver.go +++ b/pkg/services/dashboardversion/dashverimpl/dashver.go @@ -61,6 +61,7 @@ func ProvideService(cfg *setting.Cfg, db db.DB, dashboardService dashboards.Dash userService, unified, sorter, + features, ), dashSvc: dashboardService, log: log.New("dashboard-version"), diff --git a/pkg/services/featuremgmt/registry.go b/pkg/services/featuremgmt/registry.go index 0663e997deb..e72d7e68035 100644 --- a/pkg/services/featuremgmt/registry.go +++ b/pkg/services/featuremgmt/registry.go @@ -1819,6 +1819,14 @@ var ( Expression: "false", RequiresRestart: true, }, + { + Name: "unifiedStorageSearchDualReaderEnabled", + Description: "Enable dual reader for unified storage search", + Stage: FeatureStageExperimental, + Owner: grafanaSearchAndStorageSquad, + HideFromAdminPage: true, + HideFromDocs: true, + }, } ) diff --git a/pkg/services/featuremgmt/toggles_gen.csv b/pkg/services/featuremgmt/toggles_gen.csv index 56fbe4a7545..b1d89c6daa3 100644 --- a/pkg/services/featuremgmt/toggles_gen.csv +++ b/pkg/services/featuremgmt/toggles_gen.csv @@ -235,3 +235,4 @@ enablePluginImporter,experimental,@grafana/plugins-platform-backend,false,false, otelLogsFormatting,experimental,@grafana/observability-logs,false,false,true alertingNotificationHistory,experimental,@grafana/alerting-squad,false,false,false pluginAssetProvider,experimental,@grafana/plugins-platform-backend,false,true,false +unifiedStorageSearchDualReaderEnabled,experimental,@grafana/search-and-storage,false,false,false diff --git a/pkg/services/featuremgmt/toggles_gen.go b/pkg/services/featuremgmt/toggles_gen.go index 151ced45e79..9fe23daeb2d 100644 --- a/pkg/services/featuremgmt/toggles_gen.go +++ b/pkg/services/featuremgmt/toggles_gen.go @@ -950,4 +950,8 @@ const ( // FlagPluginAssetProvider // Allows decoupled core plugins to load from the Grafana CDN FlagPluginAssetProvider = "pluginAssetProvider" + + // FlagUnifiedStorageSearchDualReaderEnabled + // Enable dual reader for unified storage search + FlagUnifiedStorageSearchDualReaderEnabled = "unifiedStorageSearchDualReaderEnabled" ) diff --git a/pkg/services/featuremgmt/toggles_gen.json b/pkg/services/featuremgmt/toggles_gen.json index 588999aa8a5..e3566c2dd9f 100644 --- a/pkg/services/featuremgmt/toggles_gen.json +++ b/pkg/services/featuremgmt/toggles_gen.json @@ -2450,6 +2450,22 @@ "requiresRestart": true } }, + { + "metadata": { + "name": "pluginAssetProvider", + "resourceVersion": "1752486584712", + "creationTimestamp": "2025-07-14T09:49:44Z" + }, + "spec": { + "description": "Allows decoupled core plugins to load from the Grafana CDN", + "stage": "experimental", + "codeowner": "@grafana/plugins-platform-backend", + "requiresRestart": true, + "hideFromAdminPage": true, + "hideFromDocs": true, + "expression": "false" + } + }, { "metadata": { "name": "pluginLoadingRefactor", @@ -2467,22 +2483,6 @@ "expression": "false" } }, - { - "metadata": { - "name": "pluginAssetProvider", - "resourceVersion": "1752486584712", - "creationTimestamp": "2025-07-14T09:49:44Z" - }, - "spec": { - "description": "Allows decoupled core plugins to load from the Grafana CDN", - "stage": "experimental", - "codeowner": "@grafana/plugins-platform-backend", - "requiresRestart": true, - "hideFromAdminPage": true, - "hideFromDocs": true, - "expression": "false" - } - }, { "metadata": { "name": "pluginProxyPreserveTrailingSlash", @@ -3350,6 +3350,20 @@ "hideFromDocs": true } }, + { + "metadata": { + "name": "unifiedStorageSearchDualReaderEnabled", + "resourceVersion": "1752500336818", + "creationTimestamp": "2025-07-14T13:38:56Z" + }, + "spec": { + "description": "Enable dual reader for unified storage search", + "stage": "experimental", + "codeowner": "@grafana/search-and-storage", + "hideFromAdminPage": true, + "hideFromDocs": true + } + }, { "metadata": { "name": "unifiedStorageSearchPermissionFiltering", @@ -3436,4 +3450,4 @@ } } ] -} +} \ No newline at end of file diff --git a/pkg/services/folder/folderimpl/folder.go b/pkg/services/folder/folderimpl/folder.go index 68e570cdb6d..4690d6c4640 100644 --- a/pkg/services/folder/folderimpl/folder.go +++ b/pkg/services/folder/folderimpl/folder.go @@ -122,6 +122,7 @@ func ProvideService( userService, resourceClient, sorter, + features, ) unifiedStore := ProvideUnifiedStore(k8sHandler, userService, tracer) @@ -140,6 +141,7 @@ func ProvideService( userService, resourceClient, sorter, + features, ) srv.dashboardK8sClient = dashHandler } diff --git a/pkg/services/folder/folderimpl/folder_unifiedstorage_test.go b/pkg/services/folder/folderimpl/folder_unifiedstorage_test.go index 738e1e98f56..44bd735445f 100644 --- a/pkg/services/folder/folderimpl/folder_unifiedstorage_test.go +++ b/pkg/services/folder/folderimpl/folder_unifiedstorage_test.go @@ -199,7 +199,7 @@ func TestIntegrationFolderServiceViaUnifiedStorage(t *testing.T) { tracer := noop.NewTracerProvider().Tracer("TestIntegrationFolderServiceViaUnifiedStorage") dashboardStore := dashboards.NewFakeDashboardStore(t) - k8sCli := client.NewK8sHandler(dualwrite.ProvideTestService(), request.GetNamespaceMapper(cfg), folderv1.FolderResourceInfo.GroupVersionResource(), restCfgProvider.GetRestConfig, dashboardStore, userService, nil, sort.ProvideService()) + k8sCli := client.NewK8sHandler(dualwrite.ProvideTestService(), request.GetNamespaceMapper(cfg), folderv1.FolderResourceInfo.GroupVersionResource(), restCfgProvider.GetRestConfig, dashboardStore, userService, nil, sort.ProvideService(), nil) unifiedStore := ProvideUnifiedStore(k8sCli, userService, tracer) ctx := context.Background() diff --git a/pkg/storage/unified/resource/search_client.go b/pkg/storage/unified/resource/search_client.go index 5679711e9e3..58d743da306 100644 --- a/pkg/storage/unified/resource/search_client.go +++ b/pkg/storage/unified/resource/search_client.go @@ -2,26 +2,36 @@ package resource import ( "context" + "time" "google.golang.org/grpc" "k8s.io/apimachinery/pkg/runtime/schema" + "github.com/grafana/grafana/pkg/infra/log" + "github.com/grafana/grafana/pkg/services/featuremgmt" "github.com/grafana/grafana/pkg/storage/unified/resourcepb" ) +const ( + // backgroundRequestTimeout is the timeout for background shadow traffic requests + backgroundRequestTimeout = 500 * time.Millisecond +) + type DualWriter interface { IsEnabled(schema.GroupResource) bool ReadFromUnified(context.Context, schema.GroupResource) (bool, error) } func NewSearchClient(dual DualWriter, gr schema.GroupResource, unifiedClient resourcepb.ResourceIndexClient, - legacyClient resourcepb.ResourceIndexClient) resourcepb.ResourceIndexClient { + legacyClient resourcepb.ResourceIndexClient, features featuremgmt.FeatureToggles) resourcepb.ResourceIndexClient { if dual.IsEnabled(gr) { return &searchWrapper{ dual: dual, groupResource: gr, unifiedClient: unifiedClient, legacyClient: legacyClient, + features: features, + logger: log.New("unified-storage.search-client"), } } //nolint:errcheck @@ -37,6 +47,8 @@ type searchWrapper struct { unifiedClient resourcepb.ResourceIndexClient legacyClient resourcepb.ResourceIndexClient + features featuremgmt.FeatureToggles + logger log.Logger } func (s *searchWrapper) GetStats(ctx context.Context, in *resourcepb.ResourceStatsRequest, @@ -49,6 +61,26 @@ func (s *searchWrapper) GetStats(ctx context.Context, in *resourcepb.ResourceSta if unified { client = s.unifiedClient } + + // If dual reader feature flag is enabled, and legacy is the main storage, + // make a background call to unified + if s.features != nil && s.features.IsEnabledGlobally(featuremgmt.FlagUnifiedStorageSearchDualReaderEnabled) && !unified { + // Create background context with timeout but ignore parent cancelation + ctxBg := context.WithoutCancel(ctx) + ctxBgWithTimeout, cancel := context.WithTimeout(ctxBg, backgroundRequestTimeout) + + // Make background call without blocking the main request + go func() { + defer cancel() // Ensure we clean up the context + _, bgErr := s.unifiedClient.GetStats(ctxBgWithTimeout, in, opts...) + if bgErr != nil { + s.logger.Error("Background GetStats call to unified failed", "error", bgErr, "timeout", backgroundRequestTimeout) + } else { + s.logger.Debug("Background GetStats call to unified succeeded", "timeout", backgroundRequestTimeout) + } + }() + } + return client.GetStats(ctx, in, opts...) } @@ -62,5 +94,25 @@ func (s *searchWrapper) Search(ctx context.Context, in *resourcepb.ResourceSearc if unified { client = s.unifiedClient } + + // If dual reader feature flag is enabled, and legacy is the main storage, + // make a background call to unified + if s.features != nil && s.features.IsEnabledGlobally(featuremgmt.FlagUnifiedStorageSearchDualReaderEnabled) && !unified { + // Create background context with timeout but ignore parent cancelation + ctxBg := context.WithoutCancel(ctx) + ctxBgWithTimeout, cancel := context.WithTimeout(ctxBg, backgroundRequestTimeout) + + // Make background call without blocking the main request + go func() { + defer cancel() // Ensure we clean up the context + _, bgErr := s.unifiedClient.Search(ctxBgWithTimeout, in, opts...) + if bgErr != nil { + s.logger.Error("Background Search call to unified failed", "error", bgErr, "timeout", backgroundRequestTimeout) + } else { + s.logger.Debug("Background Search call to unified succeeded", "timeout", backgroundRequestTimeout) + } + }() + } + return client.Search(ctx, in, opts...) } diff --git a/pkg/storage/unified/resource/search_client_test.go b/pkg/storage/unified/resource/search_client_test.go new file mode 100644 index 00000000000..c46093b45c9 --- /dev/null +++ b/pkg/storage/unified/resource/search_client_test.go @@ -0,0 +1,445 @@ +package resource + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" + "google.golang.org/grpc" + "k8s.io/apimachinery/pkg/runtime/schema" + + "github.com/grafana/grafana/pkg/infra/log" + "github.com/grafana/grafana/pkg/services/featuremgmt" + "github.com/grafana/grafana/pkg/storage/unified/resourcepb" + "github.com/grafana/grafana/pkg/util/testutil" +) + +// Mock DualWriter +type MockDualWriter struct { + mock.Mock +} + +func (m *MockDualWriter) IsEnabled(gr schema.GroupResource) bool { + args := m.Called(gr) + return args.Bool(0) +} + +func (m *MockDualWriter) ReadFromUnified(ctx context.Context, gr schema.GroupResource) (bool, error) { + args := m.Called(ctx, gr) + return args.Bool(0), args.Error(1) +} + +// Mock ResourceIndexClient with enhanced timeout testing capabilities +type MockResourceIndexClient struct { + mock.Mock + searchCalled chan struct{} + statsCalled chan struct{} + searchDelay time.Duration + statsDelay time.Duration + contextCanceled chan context.Context +} + +func NewMockResourceIndexClient() *MockResourceIndexClient { + return &MockResourceIndexClient{ + searchCalled: make(chan struct{}, 1), + statsCalled: make(chan struct{}, 1), + contextCanceled: make(chan context.Context, 10), // Buffer for multiple calls + } +} + +func (m *MockResourceIndexClient) SetSearchDelay(delay time.Duration) { + m.searchDelay = delay +} + +func (m *MockResourceIndexClient) SetStatsDelay(delay time.Duration) { + m.statsDelay = delay +} + +func (m *MockResourceIndexClient) Search(ctx context.Context, in *resourcepb.ResourceSearchRequest, opts ...grpc.CallOption) (*resourcepb.ResourceSearchResponse, error) { + args := m.Called(ctx, in, opts) + + // Simulate delay if configured + if m.searchDelay > 0 { + select { + case <-time.After(m.searchDelay): + // Delay completed normally + case <-ctx.Done(): + // Context was canceled during delay + m.contextCanceled <- ctx + return nil, ctx.Err() + } + } + + // Signal that Search was called + select { + case m.searchCalled <- struct{}{}: + default: + } + + return args.Get(0).(*resourcepb.ResourceSearchResponse), args.Error(1) +} + +func (m *MockResourceIndexClient) GetStats(ctx context.Context, in *resourcepb.ResourceStatsRequest, opts ...grpc.CallOption) (*resourcepb.ResourceStatsResponse, error) { + args := m.Called(ctx, in, opts) + + // Simulate delay if configured + if m.statsDelay > 0 { + select { + case <-time.After(m.statsDelay): + // Delay completed normally + case <-ctx.Done(): + // Context was canceled during delay + m.contextCanceled <- ctx + return nil, ctx.Err() + } + } + + // Signal that GetStats was called + select { + case m.statsCalled <- struct{}{}: + default: + } + + return args.Get(0).(*resourcepb.ResourceStatsResponse), args.Error(1) +} + +func setupTestSearchClient(t *testing.T) (schema.GroupResource, *MockResourceIndexClient, *MockResourceIndexClient, featuremgmt.FeatureToggles) { + t.Helper() + gr := schema.GroupResource{Group: "test", Resource: "items"} + unifiedClient := NewMockResourceIndexClient() + legacyClient := NewMockResourceIndexClient() + features := featuremgmt.WithFeatures() + return gr, unifiedClient, legacyClient, features +} + +func setupTestSearchWrapper(t *testing.T, dual *MockDualWriter, unifiedClient, legacyClient *MockResourceIndexClient, features featuremgmt.FeatureToggles, gr schema.GroupResource) *searchWrapper { + t.Helper() + return &searchWrapper{ + dual: dual, + groupResource: gr, + unifiedClient: unifiedClient, + legacyClient: legacyClient, + features: features, + logger: log.NewNopLogger(), + } +} + +func TestSearchClient_NewSearchClient(t *testing.T) { + gr, unifiedClient, legacyClient, features := setupTestSearchClient(t) + + t.Run("returns wrapper when dual writer is enabled", func(t *testing.T) { + dual := &MockDualWriter{} // Create fresh mock for this test + dual.On("IsEnabled", gr).Return(true) + + client := NewSearchClient(dual, gr, unifiedClient, legacyClient, features) + + wrapper, ok := client.(*searchWrapper) + require.True(t, ok) + assert.Equal(t, dual, wrapper.dual) + assert.Equal(t, gr, wrapper.groupResource) + assert.Equal(t, unifiedClient, wrapper.unifiedClient) + assert.Equal(t, legacyClient, wrapper.legacyClient) + + dual.AssertExpectations(t) + }) + + t.Run("returns unified client when dual writer disabled but read from unified", func(t *testing.T) { + dual := &MockDualWriter{} // Create fresh mock for this test + dual.On("IsEnabled", gr).Return(false) + dual.On("ReadFromUnified", mock.Anything, gr).Return(true, nil) + + client := NewSearchClient(dual, gr, unifiedClient, legacyClient, features) + + assert.Equal(t, unifiedClient, client) + dual.AssertExpectations(t) + }) + + t.Run("returns legacy client when dual writer disabled and not reading from unified", func(t *testing.T) { + dual := &MockDualWriter{} // Create fresh mock for this test + dual.On("IsEnabled", gr).Return(false) + dual.On("ReadFromUnified", mock.Anything, gr).Return(false, nil) + + client := NewSearchClient(dual, gr, unifiedClient, legacyClient, features) + + assert.Equal(t, legacyClient, client) + dual.AssertExpectations(t) + }) +} + +func TestSearchWrapper_Search(t *testing.T) { + gr, unifiedClient, legacyClient, features := setupTestSearchClient(t) + req := &resourcepb.ResourceSearchRequest{Query: "test"} + expectedResponse := &resourcepb.ResourceSearchResponse{TotalHits: 0} + + t.Run("uses unified client when reading from unified", func(t *testing.T) { + ctx := testutil.NewDefaultTestContext(t) + dual := &MockDualWriter{} + + dual.On("ReadFromUnified", mock.Anything, gr).Return(true, nil) + unifiedClient.On("Search", mock.Anything, req, mock.Anything).Return(expectedResponse, nil) + + wrapper := setupTestSearchWrapper(t, dual, unifiedClient, legacyClient, features, gr) + + resp, err := wrapper.Search(ctx, req) + + require.NoError(t, err) + assert.Equal(t, expectedResponse, resp) + + dual.AssertExpectations(t) + unifiedClient.AssertExpectations(t) + legacyClient.AssertNotCalled(t, "Search") + }) + + t.Run("uses legacy client when not reading from unified", func(t *testing.T) { + ctx := testutil.NewDefaultTestContext(t) + dual := &MockDualWriter{} + + dual.On("ReadFromUnified", mock.Anything, gr).Return(false, nil) + legacyClient.On("Search", mock.Anything, req, mock.Anything).Return(expectedResponse, nil) + + wrapper := setupTestSearchWrapper(t, dual, unifiedClient, legacyClient, features, gr) + + resp, err := wrapper.Search(ctx, req) + + require.NoError(t, err) + assert.Equal(t, expectedResponse, resp) + + dual.AssertExpectations(t) + legacyClient.AssertExpectations(t) + unifiedClient.AssertNotCalled(t, "Search") + }) + + t.Run("makes background call to unified when feature flag enabled and using legacy", func(t *testing.T) { + ctx := testutil.NewDefaultTestContext(t) + dual := &MockDualWriter{} + featuresWithFlag := featuremgmt.WithFeatures(featuremgmt.FlagUnifiedStorageSearchDualReaderEnabled) + + dual.On("ReadFromUnified", mock.Anything, gr).Return(false, nil) + legacyClient.On("Search", mock.Anything, req, mock.Anything).Return(expectedResponse, nil) + + // Expect background call to unified client + unifiedBgResponse := &resourcepb.ResourceSearchResponse{TotalHits: 0} + unifiedClient.On("Search", mock.Anything, req, mock.Anything).Return(unifiedBgResponse, nil) + + wrapper := setupTestSearchWrapper(t, dual, unifiedClient, legacyClient, featuresWithFlag, gr) + + resp, err := wrapper.Search(ctx, req) + + require.NoError(t, err) + assert.Equal(t, expectedResponse, resp) + + // Wait for background goroutine to complete + select { + case <-unifiedClient.searchCalled: + // Background call was made + case <-time.After(100 * time.Millisecond): + t.Fatal("Background unified client call was not made within timeout") + } + + dual.AssertExpectations(t) + legacyClient.AssertExpectations(t) + unifiedClient.AssertExpectations(t) + }) + + t.Run("handles background call error gracefully", func(t *testing.T) { + ctx := testutil.NewDefaultTestContext(t) + dual := &MockDualWriter{} + featuresWithFlag := featuremgmt.WithFeatures(featuremgmt.FlagUnifiedStorageSearchDualReaderEnabled) + + dual.On("ReadFromUnified", mock.Anything, gr).Return(false, nil) + legacyClient.On("Search", mock.Anything, req, mock.Anything).Return(expectedResponse, nil) + + // Background call returns error - should be handled gracefully + unifiedClient.On("Search", mock.Anything, req, mock.Anything).Return((*resourcepb.ResourceSearchResponse)(nil), assert.AnError) + + wrapper := setupTestSearchWrapper(t, dual, unifiedClient, legacyClient, featuresWithFlag, gr) + + resp, err := wrapper.Search(ctx, req) + + // Main request should still succeed despite background error + require.NoError(t, err) + assert.Equal(t, expectedResponse, resp) + + // Wait for background goroutine to complete + select { + case <-unifiedClient.searchCalled: + // Background call was made (even though it failed) + case <-time.After(100 * time.Millisecond): + t.Fatal("Background unified client call was not made within timeout") + } + + dual.AssertExpectations(t) + legacyClient.AssertExpectations(t) + unifiedClient.AssertExpectations(t) + }) + + t.Run("background request times out after 500ms", func(t *testing.T) { + ctx := testutil.NewDefaultTestContext(t) + dual := &MockDualWriter{} + featuresWithFlag := featuremgmt.WithFeatures(featuremgmt.FlagUnifiedStorageSearchDualReaderEnabled) + + dual.On("ReadFromUnified", mock.Anything, gr).Return(false, nil) + legacyClient.On("Search", mock.Anything, req, mock.Anything).Return(expectedResponse, nil) + + // Configure unified client to take longer than the 500ms timeout + unifiedClient.SetSearchDelay(600 * time.Millisecond) // Longer than 500ms timeout + unifiedClient.On("Search", mock.Anything, req, mock.Anything).Return((*resourcepb.ResourceSearchResponse)(nil), context.DeadlineExceeded) + + wrapper := setupTestSearchWrapper(t, dual, unifiedClient, legacyClient, featuresWithFlag, gr) + + start := time.Now() + resp, err := wrapper.Search(ctx, req) + mainRequestDuration := time.Since(start) + + // Main request should succeed quickly despite background timeout + require.NoError(t, err) + assert.Equal(t, expectedResponse, resp) + assert.Less(t, mainRequestDuration, 50*time.Millisecond, "Main request should not be blocked by background timeout") + + // Wait for background context to be canceled + select { + case canceledCtx := <-unifiedClient.contextCanceled: + assert.Error(t, canceledCtx.Err(), "Background context should be canceled") + assert.Equal(t, context.DeadlineExceeded, canceledCtx.Err()) + case <-time.After(700 * time.Millisecond): + t.Fatal("Background request should have been canceled due to timeout") + } + + dual.AssertExpectations(t) + legacyClient.AssertExpectations(t) + unifiedClient.AssertExpectations(t) + }) + + t.Run("background request completes successfully when within timeout", func(t *testing.T) { + ctx := testutil.NewDefaultTestContext(t) + dual := &MockDualWriter{} + featuresWithFlag := featuremgmt.WithFeatures(featuremgmt.FlagUnifiedStorageSearchDualReaderEnabled) + + dual.On("ReadFromUnified", mock.Anything, gr).Return(false, nil) + legacyClient.On("Search", mock.Anything, req, mock.Anything).Return(expectedResponse, nil) + + // Configure unified client to respond within the 500ms timeout + unifiedClient.SetSearchDelay(100 * time.Millisecond) // Well within 500ms timeout + unifiedClient.On("Search", mock.Anything, req, mock.Anything).Return(&resourcepb.ResourceSearchResponse{TotalHits: 0}, nil) + + wrapper := setupTestSearchWrapper(t, dual, unifiedClient, legacyClient, featuresWithFlag, gr) + + start := time.Now() + resp, err := wrapper.Search(ctx, req) + mainRequestDuration := time.Since(start) + + // Main request should succeed quickly + require.NoError(t, err) + assert.Equal(t, expectedResponse, resp) + assert.Less(t, mainRequestDuration, 50*time.Millisecond, "Main request should not be blocked") + + // Wait for successful background call + select { + case <-unifiedClient.searchCalled: + // Background call completed successfully + case <-time.After(200 * time.Millisecond): + t.Fatal("Expected successful background call") + } + + dual.AssertExpectations(t) + legacyClient.AssertExpectations(t) + unifiedClient.AssertExpectations(t) + }) +} + +func TestSearchWrapper_GetStats(t *testing.T) { + gr, unifiedClient, legacyClient, features := setupTestSearchClient(t) + req := &resourcepb.ResourceStatsRequest{Namespace: "test"} + expectedResponse := &resourcepb.ResourceStatsResponse{Stats: []*resourcepb.ResourceStatsResponse_Stats{{Count: 100}}} + + t.Run("uses unified client when reading from unified", func(t *testing.T) { + ctx := testutil.NewDefaultTestContext(t) + dual := &MockDualWriter{} + + dual.On("ReadFromUnified", mock.Anything, gr).Return(true, nil) + unifiedClient.On("GetStats", mock.Anything, req, mock.Anything).Return(expectedResponse, nil) + + wrapper := setupTestSearchWrapper(t, dual, unifiedClient, legacyClient, features, gr) + + resp, err := wrapper.GetStats(ctx, req) + + require.NoError(t, err) + assert.Equal(t, expectedResponse, resp) + + dual.AssertExpectations(t) + unifiedClient.AssertExpectations(t) + legacyClient.AssertNotCalled(t, "GetStats") + }) + + t.Run("makes background call to unified when feature flag enabled and using legacy", func(t *testing.T) { + ctx := testutil.NewDefaultTestContext(t) + dual := &MockDualWriter{} + featuresWithFlag := featuremgmt.WithFeatures(featuremgmt.FlagUnifiedStorageSearchDualReaderEnabled) + + dual.On("ReadFromUnified", mock.Anything, gr).Return(false, nil) + legacyClient.On("GetStats", mock.Anything, req, mock.Anything).Return(expectedResponse, nil) + + // Expect background call to unified client + unifiedBgResponse := &resourcepb.ResourceStatsResponse{Stats: []*resourcepb.ResourceStatsResponse_Stats{{Count: 50}}} + unifiedClient.On("GetStats", mock.Anything, req, mock.Anything).Return(unifiedBgResponse, nil) + + wrapper := setupTestSearchWrapper(t, dual, unifiedClient, legacyClient, featuresWithFlag, gr) + + resp, err := wrapper.GetStats(ctx, req) + + require.NoError(t, err) + assert.Equal(t, expectedResponse, resp) + + // Wait for background goroutine to complete + select { + case <-unifiedClient.statsCalled: + // Background call was made + case <-time.After(100 * time.Millisecond): + t.Fatal("Background unified client GetStats call was not made within timeout") + } + + dual.AssertExpectations(t) + legacyClient.AssertExpectations(t) + unifiedClient.AssertExpectations(t) + }) + + t.Run("background GetStats request times out after 500ms", func(t *testing.T) { + ctx := testutil.NewDefaultTestContext(t) + dual := &MockDualWriter{} + featuresWithFlag := featuremgmt.WithFeatures(featuremgmt.FlagUnifiedStorageSearchDualReaderEnabled) + + dual.On("ReadFromUnified", mock.Anything, gr).Return(false, nil) + legacyClient.On("GetStats", mock.Anything, req, mock.Anything).Return(expectedResponse, nil) + + // Configure unified client to take longer than the 500ms timeout + unifiedClient.SetStatsDelay(600 * time.Millisecond) // Longer than 500ms timeout + unifiedClient.On("GetStats", mock.Anything, req, mock.Anything).Return((*resourcepb.ResourceStatsResponse)(nil), context.DeadlineExceeded) + + wrapper := setupTestSearchWrapper(t, dual, unifiedClient, legacyClient, featuresWithFlag, gr) + + start := time.Now() + resp, err := wrapper.GetStats(ctx, req) + mainRequestDuration := time.Since(start) + + // Main request should succeed quickly despite background timeout + require.NoError(t, err) + assert.Equal(t, expectedResponse, resp) + assert.Less(t, mainRequestDuration, 50*time.Millisecond, "Main request should not be blocked by background timeout") + + // Wait for background context to be canceled + select { + case canceledCtx := <-unifiedClient.contextCanceled: + assert.Error(t, canceledCtx.Err(), "Background context should be canceled") + assert.Equal(t, context.DeadlineExceeded, canceledCtx.Err()) + case <-time.After(700 * time.Millisecond): + t.Fatal("Background request should have been canceled due to timeout") + } + + dual.AssertExpectations(t) + legacyClient.AssertExpectations(t) + unifiedClient.AssertExpectations(t) + }) +}