diff --git a/pkg/server/module_server.go b/pkg/server/module_server.go index 5c420b9d219..d40eec8b965 100644 --- a/pkg/server/module_server.go +++ b/pkg/server/module_server.go @@ -45,8 +45,9 @@ func NewModule(opts Options, tracer tracing.Tracer, // Ensures tracing is initialized license licensing.Licensing, moduleRegisterer ModuleRegisterer, + storageBackend resource.StorageBackend, // Ensures unified storage backend is initialized ) (*ModuleServer, error) { - s, err := newModuleServer(opts, apiOpts, features, cfg, storageMetrics, indexMetrics, reg, promGatherer, license, moduleRegisterer) + s, err := newModuleServer(opts, apiOpts, features, cfg, storageMetrics, indexMetrics, reg, promGatherer, license, moduleRegisterer, storageBackend) if err != nil { return nil, err } @@ -68,6 +69,7 @@ func newModuleServer(opts Options, promGatherer prometheus.Gatherer, license licensing.Licensing, moduleRegisterer ModuleRegisterer, + storageBackend resource.StorageBackend, ) (*ModuleServer, error) { rootCtx, shutdownFn := context.WithCancel(context.Background()) @@ -90,6 +92,7 @@ func newModuleServer(opts Options, registerer: reg, license: license, moduleRegisterer: moduleRegisterer, + storageBackend: storageBackend, } return s, nil @@ -111,6 +114,7 @@ type ModuleServer struct { shutdownFinished chan struct{} isInitialized bool mtx sync.Mutex + storageBackend resource.StorageBackend storageMetrics *resource.StorageMetrics indexMetrics *resource.BleveIndexMetrics license licensing.Licensing @@ -193,7 +197,7 @@ func (s *ModuleServer) Run() error { if err != nil { return nil, err } - return sql.ProvideUnifiedStorageGrpcService(s.cfg, s.features, nil, s.log, s.registerer, docBuilders, s.storageMetrics, s.indexMetrics, s.searchServerRing, s.MemberlistKVConfig, s.httpServerRouter) + return sql.ProvideUnifiedStorageGrpcService(s.cfg, s.features, nil, s.log, s.registerer, docBuilders, s.storageMetrics, s.indexMetrics, s.searchServerRing, s.MemberlistKVConfig, s.httpServerRouter, s.storageBackend) }) m.RegisterModule(modules.ZanzanaServer, func() (services.Service, error) { diff --git a/pkg/server/search_server_distributor_test.go b/pkg/server/search_server_distributor_test.go index e7304876688..520443bc15f 100644 --- a/pkg/server/search_server_distributor_test.go +++ b/pkg/server/search_server_distributor_test.go @@ -326,7 +326,7 @@ func initModuleServerForTest( ) testModuleServer { tracer := tracing.InitializeTracerForTest() - ms, err := NewModule(opts, apiOpts, featuremgmt.WithFeatures(featuremgmt.FlagUnifiedStorageSearch), cfg, nil, nil, prometheus.NewRegistry(), prometheus.DefaultGatherer, tracer, nil, ProvideNoopModuleRegisterer()) + ms, err := NewModule(opts, apiOpts, featuremgmt.WithFeatures(featuremgmt.FlagUnifiedStorageSearch), cfg, nil, nil, prometheus.NewRegistry(), prometheus.DefaultGatherer, tracer, nil, ProvideNoopModuleRegisterer(), nil) require.NoError(t, err) conn, err := grpc.NewClient(cfg.GRPCServer.Address, diff --git a/pkg/server/wire_gen.go b/pkg/server/wire_gen.go index cd020ea22f3..4892e82118f 100644 --- a/pkg/server/wire_gen.go +++ b/pkg/server/wire_gen.go @@ -258,6 +258,7 @@ import ( "github.com/grafana/grafana/pkg/storage/unified" "github.com/grafana/grafana/pkg/storage/unified/resource" "github.com/grafana/grafana/pkg/storage/unified/search" + "github.com/grafana/grafana/pkg/storage/unified/sql" "github.com/grafana/grafana/pkg/tsdb/azuremonitor" "github.com/grafana/grafana/pkg/tsdb/cloud-monitoring" "github.com/grafana/grafana/pkg/tsdb/cloudwatch" @@ -1622,7 +1623,11 @@ func InitializeModuleServer(cfg *setting.Cfg, opts Options, apiOpts api.ServerOp hooksService := hooks.ProvideService() ossLicensingService := licensing.ProvideService(cfg, hooksService) moduleRegisterer := ProvideNoopModuleRegisterer() - moduleServer, err := NewModule(opts, apiOpts, featureToggles, cfg, storageMetrics, bleveIndexMetrics, registerer, gatherer, tracingService, ossLicensingService, moduleRegisterer) + storageBackend, err := sql.ProvideStorageBackend(cfg) + if err != nil { + return nil, err + } + moduleServer, err := NewModule(opts, apiOpts, featureToggles, cfg, storageMetrics, bleveIndexMetrics, registerer, gatherer, tracingService, ossLicensingService, moduleRegisterer, storageBackend) if err != nil { return nil, err } diff --git a/pkg/server/wireexts_oss.go b/pkg/server/wireexts_oss.go index 6b2163f4e34..1152431b320 100644 --- a/pkg/server/wireexts_oss.go +++ b/pkg/server/wireexts_oss.go @@ -64,6 +64,7 @@ import ( "github.com/grafana/grafana/pkg/storage/unified" "github.com/grafana/grafana/pkg/storage/unified/resource" search2 "github.com/grafana/grafana/pkg/storage/unified/search" + "github.com/grafana/grafana/pkg/storage/unified/sql" ) var provisioningExtras = wire.NewSet( @@ -141,6 +142,7 @@ var wireExtsBasicSet = wire.NewSet( wire.Bind(new(sandbox.Sandbox), new(*sandbox.Service)), wire.Struct(new(unified.Options), "*"), unified.ProvideUnifiedStorageClient, + sql.ProvideStorageBackend, builder.ProvideDefaultBuildHandlerChainFuncFromBuilders, aggregatorrunner.ProvideNoopAggregatorConfigurator, apisregistry.WireSetExts, @@ -193,6 +195,7 @@ var wireExtsModuleServerSet = wire.NewSet( resource.ProvideIndexMetrics, // Overriden by enterprise ProvideNoopModuleRegisterer, + sql.ProvideStorageBackend, ) var wireExtsStandaloneAPIServerSet = wire.NewSet( diff --git a/pkg/storage/unified/resource/storage_backend.go b/pkg/storage/unified/resource/storage_backend.go index 9be89511ff0..0ec839b5a3f 100644 --- a/pkg/storage/unified/resource/storage_backend.go +++ b/pkg/storage/unified/resource/storage_backend.go @@ -50,7 +50,7 @@ type kvStorageBackend struct { var _ StorageBackend = &kvStorageBackend{} -type KvBackendOptions struct { +type KVBackendOptions struct { KvStore KV WithPruner bool EventRetentionPeriod time.Duration // How long to keep events (default: 1 hour) @@ -59,7 +59,7 @@ type KvBackendOptions struct { Reg prometheus.Registerer // TODO add metrics } -func NewKvStorageBackend(opts KvBackendOptions) (StorageBackend, error) { +func NewKVStorageBackend(opts KVBackendOptions) (StorageBackend, error) { ctx := context.Background() kv := opts.KvStore diff --git a/pkg/storage/unified/resource/storage_backend_test.go b/pkg/storage/unified/resource/storage_backend_test.go index 9a0369ef544..8ec4bac7f71 100644 --- a/pkg/storage/unified/resource/storage_backend_test.go +++ b/pkg/storage/unified/resource/storage_backend_test.go @@ -27,11 +27,11 @@ var appsNamespace = NamespacedResource{ func setupTestStorageBackend(t *testing.T) *kvStorageBackend { kv := setupTestKV(t) - opts := KvBackendOptions{ + opts := KVBackendOptions{ KvStore: kv, WithPruner: true, } - backend, err := NewKvStorageBackend(opts) + backend, err := NewKVStorageBackend(opts) kvBackend := backend.(*kvStorageBackend) require.NoError(t, err) return kvBackend diff --git a/pkg/storage/unified/sql/backend.go b/pkg/storage/unified/sql/backend.go index adc9e987f91..823ec1092e7 100644 --- a/pkg/storage/unified/sql/backend.go +++ b/pkg/storage/unified/sql/backend.go @@ -20,6 +20,7 @@ import ( apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/runtime/schema" + "github.com/grafana/grafana/pkg/setting" "github.com/grafana/grafana/pkg/util/sqlite" "github.com/grafana/grafana-app-sdk/logging" @@ -37,6 +38,14 @@ const defaultPollingInterval = 100 * time.Millisecond const defaultWatchBufferSize = 100 // number of events to buffer in the watch stream const defaultPrunerHistoryLimit = 20 +func ProvideStorageBackend( + cfg *setting.Cfg, +) (resource.StorageBackend, error) { + // TODO: make this the central place to provide SQL backend + // Currently it is skipped as we need to handle the cases of Diagnostics and Lifecycle + return nil, nil +} + type Backend interface { resource.StorageBackend resourcepb.DiagnosticsServer diff --git a/pkg/storage/unified/sql/server.go b/pkg/storage/unified/sql/server.go index 83424b0edea..7520a2aaf36 100644 --- a/pkg/storage/unified/sql/server.go +++ b/pkg/storage/unified/sql/server.go @@ -30,6 +30,7 @@ type QOSEnqueueDequeuer interface { // ServerOptions contains the options for creating a new ResourceServer type ServerOptions struct { + Backend resource.StorageBackend DB infraDB.DB Cfg *setting.Cfg Tracer trace.Tracer @@ -87,29 +88,35 @@ func NewResourceServer(opts ServerOptions) (resource.ResourceServer, error) { maxPageSizeBytes := unifiedStorageCfg.Key("max_page_size_bytes") serverOptions.MaxPageSizeBytes = maxPageSizeBytes.MustInt(0) - eDB, err := dbimpl.ProvideResourceDB(opts.DB, opts.Cfg, opts.Tracer) - if err != nil { - return nil, err + if opts.Backend != nil { + serverOptions.Backend = opts.Backend + // TODO: we should probably have a proper interface for diagnostics/lifecycle + } else { + eDB, err := dbimpl.ProvideResourceDB(opts.DB, opts.Cfg, opts.Tracer) + if err != nil { + return nil, err + } + + isHA := isHighAvailabilityEnabled(opts.Cfg.SectionWithEnvOverrides("database"), + opts.Cfg.SectionWithEnvOverrides("resource_api")) + withPruner := opts.Features.IsEnabledGlobally(featuremgmt.FlagUnifiedStorageHistoryPruner) + + backend, err := NewBackend(BackendOptions{ + DBProvider: eDB, + Tracer: opts.Tracer, + Reg: opts.Reg, + IsHA: isHA, + withPruner: withPruner, + storageMetrics: opts.StorageMetrics, + }) + if err != nil { + return nil, err + } + serverOptions.Backend = backend + serverOptions.Diagnostics = backend + serverOptions.Lifecycle = backend } - isHA := isHighAvailabilityEnabled(opts.Cfg.SectionWithEnvOverrides("database"), - opts.Cfg.SectionWithEnvOverrides("resource_api")) - withPruner := opts.Features.IsEnabledGlobally(featuremgmt.FlagUnifiedStorageHistoryPruner) - - store, err := NewBackend(BackendOptions{ - DBProvider: eDB, - Tracer: opts.Tracer, - Reg: opts.Reg, - IsHA: isHA, - withPruner: withPruner, - storageMetrics: opts.StorageMetrics, - }) - if err != nil { - return nil, err - } - serverOptions.Backend = store - serverOptions.Diagnostics = store - serverOptions.Lifecycle = store serverOptions.Search = opts.SearchOptions serverOptions.IndexMetrics = opts.IndexMetrics serverOptions.QOSQueue = opts.QOSQueue diff --git a/pkg/storage/unified/sql/service.go b/pkg/storage/unified/sql/service.go index 79d4615f53c..1e0c228cd1c 100644 --- a/pkg/storage/unified/sql/service.go +++ b/pkg/storage/unified/sql/service.go @@ -59,6 +59,7 @@ type service struct { subservicesWatcher *services.FailureWatcher hasSubservices bool + backend resource.StorageBackend cfg *setting.Cfg features featuremgmt.FeatureToggles db infraDB.DB @@ -97,6 +98,7 @@ func ProvideUnifiedStorageGrpcService( searchRing *ring.Ring, memberlistKVConfig kv.Config, httpServerRouter *mux.Router, + backend resource.StorageBackend, ) (UnifiedStorageGrpcService, error) { var err error tracer := otel.Tracer("unified-storage") @@ -109,6 +111,7 @@ func ProvideUnifiedStorageGrpcService( }) s := &service{ + backend: backend, cfg: cfg, features: features, stopCh: make(chan struct{}), @@ -263,6 +266,7 @@ func (s *service) starting(ctx context.Context) error { } serverOptions := ServerOptions{ + Backend: s.backend, DB: s.db, Cfg: s.cfg, Tracer: s.tracing, diff --git a/pkg/storage/unified/sql/test/integration_test.go b/pkg/storage/unified/sql/test/integration_test.go index fe0c5b049fe..9fa10efd938 100644 --- a/pkg/storage/unified/sql/test/integration_test.go +++ b/pkg/storage/unified/sql/test/integration_test.go @@ -130,7 +130,7 @@ func TestClientServer(t *testing.T) { features := featuremgmt.WithFeatures() - svc, err := sql.ProvideUnifiedStorageGrpcService(cfg, features, dbstore, nil, prometheus.NewPedanticRegistry(), nil, nil, nil, nil, kv.Config{}, nil) + svc, err := sql.ProvideUnifiedStorageGrpcService(cfg, features, dbstore, nil, prometheus.NewPedanticRegistry(), nil, nil, nil, nil, kv.Config{}, nil, nil) require.NoError(t, err) var client resourcepb.ResourceStoreClient diff --git a/pkg/storage/unified/testing/storage_backend_test.go b/pkg/storage/unified/testing/storage_backend_test.go index 817478426f0..04f34e9102f 100644 --- a/pkg/storage/unified/testing/storage_backend_test.go +++ b/pkg/storage/unified/testing/storage_backend_test.go @@ -18,10 +18,10 @@ func TestBadgerKVStorageBackend(t *testing.T) { t.Cleanup(func() { _ = db.Close() }) - kvOpts := resource.KvBackendOptions{ + kvOpts := resource.KVBackendOptions{ KvStore: resource.NewBadgerKV(db), } - backend, err := resource.NewKvStorageBackend(kvOpts) + backend, err := resource.NewKVStorageBackend(kvOpts) require.NoError(t, err) return backend }, &TestOptions{ diff --git a/pkg/tests/testinfra/testinfra.go b/pkg/tests/testinfra/testinfra.go index 924579c98df..c0d8166a5a8 100644 --- a/pkg/tests/testinfra/testinfra.go +++ b/pkg/tests/testinfra/testinfra.go @@ -129,7 +129,7 @@ func StartGrafanaEnv(t *testing.T, grafDir, cfgPath string) (string, *server.Tes var storage sql.UnifiedStorageGrpcService if runstore { storage, err = sql.ProvideUnifiedStorageGrpcService(env.Cfg, env.FeatureToggles, env.SQLStore, - env.Cfg.Logger, prometheus.NewPedanticRegistry(), nil, nil, nil, nil, kv.Config{}, nil) + env.Cfg.Logger, prometheus.NewPedanticRegistry(), nil, nil, nil, nil, kv.Config{}, nil, nil) require.NoError(t, err) ctx := context.Background() err = storage.StartAsync(ctx)