Compare commits

...

8 Commits

Author SHA1 Message Date
Rafael Paulovic
c8da64e4eb chore: separate resource service into search and storage 2026-01-15 00:49:08 +01:00
Rafael Paulovic
2fab497c18 chore: use only needed methods in storage interface
- continue cleanup and separation
2026-01-14 21:14:24 +01:00
Rafael Paulovic
b0bb71f834 fix: wire 2026-01-14 19:23:10 +01:00
mayor
a7aa55f908 Add search-server target and configurable search mode
- Add SearchServer module target for standalone search service
- Add search_mode config: "", "embedded" (default), "remote"
- Add search_server_address config for remote search server
- Create remote_search.go: gRPC client wrapper for remote search
- Create search_service.go: standalone search gRPC service
- Modify service.go: conditional search mode handling
- Backward compatible: empty/embedded mode works as before

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-14 17:49:03 +01:00
mayor
0a61846b5e Initialize backend before search server
The backend's Init() must be called before search.Init() because
buildIndexes() calls storage.GetResourceStats() which requires
the database connection to be established.

Previously, backend.Init() was called by ResourceServer.Init()
through the Lifecycle hooks, but now search.Init() runs before
ResourceServer is created.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-14 16:58:30 +01:00
mayor
785cc739ee Fix nil pointer in NewSearchServer - use serverOptions.Backend
The backend may be created inside the function and assigned to
serverOptions.Backend, not opts.Backend. Using opts.Backend caused
a nil pointer dereference in searchSupport.buildIndexes.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-14 16:22:42 +01:00
mayor
4913baaf04 Fix compilation errors from SearchServer extraction
- Update NewLocalResourceClient to accept SearchServer parameter
- Add SearchClient methods to ResourceClient interface (client needs both)
- Fix directResourceClient to implement new interface with stub methods
- Update search_and_storage.go test to create SearchServer separately

This continues the work of extracting SearchServer from ResourceServer
to enable independent usage by storage.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-12 17:59:46 +01:00
Peter Štibraný
c8f1efe7c7 Initial prototype extracting SearchServer from ResourceServer. 2026-01-12 17:50:39 +01:00
25 changed files with 1061 additions and 312 deletions

View File

@@ -74,7 +74,7 @@ func ToUnifiedStorage(c utils.CommandLine, cfg *setting.Cfg, sqlStore db.DB) err
return err return err
} }
grpcClient, err := newUnifiedClient(cfg, sqlStore, featureToggles) grpcClient, err := newUnifiedMigratorClient(cfg, sqlStore, featureToggles)
if err != nil { if err != nil {
return err return err
} }
@@ -92,7 +92,7 @@ func ToUnifiedStorage(c utils.CommandLine, cfg *setting.Cfg, sqlStore db.DB) err
return runInteractiveMigration(ctx, cfg, opts, dashboardAccess, grpcClient, start) return runInteractiveMigration(ctx, cfg, opts, dashboardAccess, grpcClient, start)
} }
func runNonInteractiveMigration(ctx context.Context, opts legacy.MigrateOptions, dashboardAccess legacy.MigrationDashboardAccessor, grpcClient resource.ResourceClient, start time.Time) error { func runNonInteractiveMigration(ctx context.Context, opts legacy.MigrateOptions, dashboardAccess legacy.MigrationDashboardAccessor, grpcClient resource.MigratorClient, start time.Time) error {
migrator := migrations.ProvideUnifiedMigrator(dashboardAccess, grpcClient) migrator := migrations.ProvideUnifiedMigrator(dashboardAccess, grpcClient)
opts.WithHistory = true // always include history in non-interactive mode opts.WithHistory = true // always include history in non-interactive mode
@@ -109,7 +109,7 @@ func runNonInteractiveMigration(ctx context.Context, opts legacy.MigrateOptions,
return nil return nil
} }
func runInteractiveMigration(ctx context.Context, cfg *setting.Cfg, opts legacy.MigrateOptions, dashboardAccess legacy.MigrationDashboardAccessor, grpcClient resource.ResourceClient, start time.Time) error { func runInteractiveMigration(ctx context.Context, cfg *setting.Cfg, opts legacy.MigrateOptions, dashboardAccess legacy.MigrationDashboardAccessor, grpcClient resource.MigratorClient, start time.Time) error {
yes, err := promptYesNo(fmt.Sprintf("Count legacy resources for namespace: %s?", opts.Namespace)) yes, err := promptYesNo(fmt.Sprintf("Count legacy resources for namespace: %s?", opts.Namespace))
if err != nil { if err != nil {
return err return err
@@ -225,7 +225,7 @@ func promptYesNo(prompt string) (bool, error) {
} }
} }
func newUnifiedClient(cfg *setting.Cfg, sqlStore db.DB, featureToggles featuremgmt.FeatureToggles) (resource.ResourceClient, error) { func newUnifiedMigratorClient(cfg *setting.Cfg, sqlStore db.DB, featureToggles featuremgmt.FeatureToggles) (resource.MigratorClient, error) {
return unified.ProvideUnifiedStorageClient(&unified.Options{ return unified.ProvideUnifiedStorageClient(&unified.Options{
Cfg: cfg, Cfg: cfg,
Features: featureToggles, Features: featureToggles,

View File

@@ -10,6 +10,7 @@ const (
SearchServerRing string = "search-server-ring" SearchServerRing string = "search-server-ring"
SearchServerDistributor string = "search-server-distributor" SearchServerDistributor string = "search-server-distributor"
StorageServer string = "storage-server" StorageServer string = "storage-server"
SearchServer string = "search-server"
ZanzanaServer string = "zanzana-server" ZanzanaServer string = "zanzana-server"
InstrumentationServer string = "instrumentation-server" InstrumentationServer string = "instrumentation-server"
FrontendServer string = "frontend-server" FrontendServer string = "frontend-server"
@@ -21,6 +22,7 @@ var dependencyMap = map[string][]string{
SearchServerRing: {InstrumentationServer, MemberlistKV}, SearchServerRing: {InstrumentationServer, MemberlistKV},
GrafanaAPIServer: {InstrumentationServer}, GrafanaAPIServer: {InstrumentationServer},
StorageServer: {InstrumentationServer, SearchServerRing}, StorageServer: {InstrumentationServer, SearchServerRing},
SearchServer: {InstrumentationServer, SearchServerRing},
ZanzanaServer: {InstrumentationServer}, ZanzanaServer: {InstrumentationServer},
SearchServerDistributor: {InstrumentationServer, MemberlistKV, SearchServerRing}, SearchServerDistributor: {InstrumentationServer, MemberlistKV, SearchServerRing},
Core: {}, Core: {},

View File

@@ -11,91 +11,59 @@ import (
) )
var ( var (
_ resource.ResourceClient = (*directResourceClient)(nil) _ resource.StorageClient = (*DirectStorageClient)(nil)
) )
// The direct client passes requests directly to the server using the *same* context // NewDirectStorageClient creates a client that passes requests directly to the server using the *same* context
func NewDirectResourceClient(server resource.ResourceServer) resource.ResourceClient { func NewDirectStorageClient(server resource.ResourceServer) *DirectStorageClient {
return &directResourceClient{server} return &DirectStorageClient{server}
} }
type directResourceClient struct { type DirectStorageClient struct {
server resource.ResourceServer server resource.ResourceServer
} }
// Create implements ResourceClient. // Create implements ResourceClient.
func (d *directResourceClient) Create(ctx context.Context, in *resourcepb.CreateRequest, opts ...grpc.CallOption) (*resourcepb.CreateResponse, error) { func (d *DirectStorageClient) Create(ctx context.Context, in *resourcepb.CreateRequest, _ ...grpc.CallOption) (*resourcepb.CreateResponse, error) {
return d.server.Create(ctx, in) return d.server.Create(ctx, in)
} }
// Delete implements ResourceClient. // Delete implements ResourceClient.
func (d *directResourceClient) Delete(ctx context.Context, in *resourcepb.DeleteRequest, opts ...grpc.CallOption) (*resourcepb.DeleteResponse, error) { func (d *DirectStorageClient) Delete(ctx context.Context, in *resourcepb.DeleteRequest, _ ...grpc.CallOption) (*resourcepb.DeleteResponse, error) {
return d.server.Delete(ctx, in) return d.server.Delete(ctx, in)
} }
// GetBlob implements ResourceClient. // GetBlob implements ResourceClient.
func (d *directResourceClient) GetBlob(ctx context.Context, in *resourcepb.GetBlobRequest, opts ...grpc.CallOption) (*resourcepb.GetBlobResponse, error) { func (d *DirectStorageClient) GetBlob(ctx context.Context, in *resourcepb.GetBlobRequest, _ ...grpc.CallOption) (*resourcepb.GetBlobResponse, error) {
return d.server.GetBlob(ctx, in) return d.server.GetBlob(ctx, in)
} }
// GetStats implements ResourceClient.
func (d *directResourceClient) GetStats(ctx context.Context, in *resourcepb.ResourceStatsRequest, opts ...grpc.CallOption) (*resourcepb.ResourceStatsResponse, error) {
return d.server.GetStats(ctx, in)
}
// IsHealthy implements ResourceClient. // IsHealthy implements ResourceClient.
func (d *directResourceClient) IsHealthy(ctx context.Context, in *resourcepb.HealthCheckRequest, opts ...grpc.CallOption) (*resourcepb.HealthCheckResponse, error) { func (d *DirectStorageClient) IsHealthy(ctx context.Context, in *resourcepb.HealthCheckRequest, _ ...grpc.CallOption) (*resourcepb.HealthCheckResponse, error) {
return d.server.IsHealthy(ctx, in) return d.server.IsHealthy(ctx, in)
} }
// List implements ResourceClient. // List implements ResourceClient.
func (d *directResourceClient) List(ctx context.Context, in *resourcepb.ListRequest, opts ...grpc.CallOption) (*resourcepb.ListResponse, error) { func (d *DirectStorageClient) List(ctx context.Context, in *resourcepb.ListRequest, _ ...grpc.CallOption) (*resourcepb.ListResponse, error) {
return d.server.List(ctx, in) return d.server.List(ctx, in)
} }
func (d *directResourceClient) ListManagedObjects(ctx context.Context, in *resourcepb.ListManagedObjectsRequest, opts ...grpc.CallOption) (*resourcepb.ListManagedObjectsResponse, error) {
return d.server.ListManagedObjects(ctx, in)
}
func (d *directResourceClient) CountManagedObjects(ctx context.Context, in *resourcepb.CountManagedObjectsRequest, opts ...grpc.CallOption) (*resourcepb.CountManagedObjectsResponse, error) {
return d.server.CountManagedObjects(ctx, in)
}
// PutBlob implements ResourceClient. // PutBlob implements ResourceClient.
func (d *directResourceClient) PutBlob(ctx context.Context, in *resourcepb.PutBlobRequest, opts ...grpc.CallOption) (*resourcepb.PutBlobResponse, error) { func (d *DirectStorageClient) PutBlob(ctx context.Context, in *resourcepb.PutBlobRequest, _ ...grpc.CallOption) (*resourcepb.PutBlobResponse, error) {
return d.server.PutBlob(ctx, in) return d.server.PutBlob(ctx, in)
} }
// Read implements ResourceClient. // Read implements ResourceClient.
func (d *directResourceClient) Read(ctx context.Context, in *resourcepb.ReadRequest, opts ...grpc.CallOption) (*resourcepb.ReadResponse, error) { func (d *DirectStorageClient) Read(ctx context.Context, in *resourcepb.ReadRequest, _ ...grpc.CallOption) (*resourcepb.ReadResponse, error) {
return d.server.Read(ctx, in) return d.server.Read(ctx, in)
} }
// Search implements ResourceClient.
func (d *directResourceClient) Search(ctx context.Context, in *resourcepb.ResourceSearchRequest, opts ...grpc.CallOption) (*resourcepb.ResourceSearchResponse, error) {
return d.server.Search(ctx, in)
}
// Update implements ResourceClient. // Update implements ResourceClient.
func (d *directResourceClient) Update(ctx context.Context, in *resourcepb.UpdateRequest, opts ...grpc.CallOption) (*resourcepb.UpdateResponse, error) { func (d *DirectStorageClient) Update(ctx context.Context, in *resourcepb.UpdateRequest, _ ...grpc.CallOption) (*resourcepb.UpdateResponse, error) {
return d.server.Update(ctx, in) return d.server.Update(ctx, in)
} }
// Watch implements ResourceClient. // Watch implements ResourceClient.
func (d *directResourceClient) Watch(ctx context.Context, in *resourcepb.WatchRequest, opts ...grpc.CallOption) (resourcepb.ResourceStore_WatchClient, error) { func (d *DirectStorageClient) Watch(_ context.Context, _ *resourcepb.WatchRequest, _ ...grpc.CallOption) (resourcepb.ResourceStore_WatchClient, error) {
return nil, fmt.Errorf("watch not supported with direct resource client") return nil, fmt.Errorf("watch not supported with direct resource client")
} }
// BulkProcess implements resource.ResourceClient.
func (d *directResourceClient) BulkProcess(ctx context.Context, opts ...grpc.CallOption) (resourcepb.BulkStore_BulkProcessClient, error) {
return nil, fmt.Errorf("BulkProcess not supported with direct resource client")
}
// RebuildIndexes implements resource.ResourceClient.
func (b *directResourceClient) RebuildIndexes(ctx context.Context, req *resourcepb.RebuildIndexesRequest, opts ...grpc.CallOption) (*resourcepb.RebuildIndexesResponse, error) {
return nil, fmt.Errorf("not implemented")
}
func (b *directResourceClient) GetQuotaUsage(ctx context.Context, req *resourcepb.QuotaUsageRequest, opts ...grpc.CallOption) (*resourcepb.QuotaUsageResponse, error) {
return nil, fmt.Errorf("not implemented")
}

View File

@@ -11,6 +11,7 @@ import (
"k8s.io/apiserver/pkg/registry/rest" "k8s.io/apiserver/pkg/registry/rest"
"github.com/grafana/authlib/types" "github.com/grafana/authlib/types"
"github.com/grafana/grafana/pkg/apimachinery/utils" "github.com/grafana/grafana/pkg/apimachinery/utils"
grafanaregistry "github.com/grafana/grafana/pkg/apiserver/registry/generic" grafanaregistry "github.com/grafana/grafana/pkg/apiserver/registry/generic"
grafanarest "github.com/grafana/grafana/pkg/apiserver/rest" grafanarest "github.com/grafana/grafana/pkg/apiserver/rest"
@@ -40,7 +41,7 @@ func (s *DashboardStorage) NewStore(dash utils.ResourceInfo, scheme *runtime.Sch
if err != nil { if err != nil {
return nil, err return nil, err
} }
client := legacy.NewDirectResourceClient(server) // same context client := legacy.NewDirectStorageClient(server) // same context
optsGetter := apistore.NewRESTOptionsGetterForClient(client, nil, optsGetter := apistore.NewRESTOptionsGetterForClient(client, nil,
defaultOpts.StorageConfig.Config, nil, defaultOpts.StorageConfig.Config, nil,
) )

View File

@@ -37,7 +37,6 @@ import (
"github.com/grafana/grafana/pkg/services/folder" "github.com/grafana/grafana/pkg/services/folder"
"github.com/grafana/grafana/pkg/setting" "github.com/grafana/grafana/pkg/setting"
"github.com/grafana/grafana/pkg/storage/unified/apistore" "github.com/grafana/grafana/pkg/storage/unified/apistore"
"github.com/grafana/grafana/pkg/storage/unified/resource"
"github.com/grafana/grafana/pkg/storage/unified/resourcepb" "github.com/grafana/grafana/pkg/storage/unified/resourcepb"
) )
@@ -74,7 +73,7 @@ func RegisterAPIService(cfg *setting.Cfg,
acService accesscontrol.Service, acService accesscontrol.Service,
accessClient authlib.AccessClient, accessClient authlib.AccessClient,
registerer prometheus.Registerer, registerer prometheus.Registerer,
unified resource.ResourceClient, unified resourcepb.ResourceIndexClient,
zanzanaClient zanzana.Client, zanzanaClient zanzana.Client,
) *FolderAPIBuilder { ) *FolderAPIBuilder {
builder := &FolderAPIBuilder{ builder := &FolderAPIBuilder{
@@ -93,7 +92,7 @@ func RegisterAPIService(cfg *setting.Cfg,
return builder return builder
} }
func NewAPIService(ac authlib.AccessClient, searcher resource.ResourceClient, features featuremgmt.FeatureToggles, zanzanaClient zanzana.Client, resourcePermissionsSvc *dynamic.NamespaceableResourceInterface) *FolderAPIBuilder { func NewAPIService(ac authlib.AccessClient, searcher resourcepb.ResourceIndexClient, features featuremgmt.FeatureToggles, zanzanaClient zanzana.Client, resourcePermissionsSvc *dynamic.NamespaceableResourceInterface) *FolderAPIBuilder {
return &FolderAPIBuilder{ return &FolderAPIBuilder{
features: features, features: features,
accessClient: ac, accessClient: ac,

View File

@@ -742,7 +742,7 @@ func NewLocalStore(resourceInfo utils.ResourceInfo, scheme *runtime.Scheme, defa
return nil, err return nil, err
} }
client := resource.NewLocalResourceClient(server) client := resource.NewLocalResourceClient(server, nil)
optsGetter := apistore.NewRESTOptionsGetterForClient(client, nil, defaultOpts.StorageConfig.Config, nil) optsGetter := apistore.NewRESTOptionsGetterForClient(client, nil, defaultOpts.StorageConfig.Config, nil)
store, err := grafanaregistry.NewRegistryStore(scheme, resourceInfo, optsGetter) store, err := grafanaregistry.NewRegistryStore(scheme, resourceInfo, optsGetter)

View File

@@ -205,6 +205,14 @@ func (s *ModuleServer) Run() error {
return sql.ProvideUnifiedStorageGrpcService(s.cfg, s.features, nil, s.log, s.registerer, docBuilders, s.storageMetrics, s.indexMetrics, s.searchServerRing, s.MemberlistKVConfig, s.httpServerRouter, s.storageBackend) return sql.ProvideUnifiedStorageGrpcService(s.cfg, s.features, nil, s.log, s.registerer, docBuilders, s.storageMetrics, s.indexMetrics, s.searchServerRing, s.MemberlistKVConfig, s.httpServerRouter, s.storageBackend)
}) })
m.RegisterModule(modules.SearchServer, func() (services.Service, error) {
docBuilders, err := InitializeDocumentBuilders(s.cfg)
if err != nil {
return nil, err
}
return sql.ProvideUnifiedSearchGrpcService(s.cfg, s.features, nil, s.log, s.registerer, docBuilders, s.indexMetrics, s.searchServerRing, s.MemberlistKVConfig, s.storageBackend)
})
m.RegisterModule(modules.ZanzanaServer, func() (services.Service, error) { m.RegisterModule(modules.ZanzanaServer, func() (services.Service, error) {
return authz.ProvideZanzanaService(s.cfg, s.features, s.registerer) return authz.ProvideZanzanaService(s.cfg, s.features, s.registerer)
}) })

View File

@@ -372,7 +372,7 @@ func initModuleServerForTest(
return testModuleServer{server: ms, grpcAddress: cfg.GRPCServer.Address, httpPort: cfg.HTTPPort, healthClient: healthClient, id: cfg.InstanceID} return testModuleServer{server: ms, grpcAddress: cfg.GRPCServer.Address, httpPort: cfg.HTTPPort, healthClient: healthClient, id: cfg.InstanceID}
} }
func createBaselineServer(t *testing.T, dbType, dbConnStr string, testNamespaces []string) resource.ResourceServer { func createBaselineServer(t *testing.T, dbType, dbConnStr string, testNamespaces []string) resource.SearchServer {
cfg := setting.NewCfg() cfg := setting.NewCfg()
section, err := cfg.Raw.NewSection("database") section, err := cfg.Raw.NewSection("database")
require.NoError(t, err) require.NoError(t, err)
@@ -391,17 +391,20 @@ func createBaselineServer(t *testing.T, dbType, dbConnStr string, testNamespaces
require.NoError(t, err) require.NoError(t, err)
searchOpts, err := search.NewSearchOptions(features, cfg, docBuilders, nil, nil) searchOpts, err := search.NewSearchOptions(features, cfg, docBuilders, nil, nil)
require.NoError(t, err) require.NoError(t, err)
server, err := sql.NewResourceServer(sql.ServerOptions{ searchServer, err := sql.NewSearchServer(sql.SearchServerOptions{
DB: nil, DB: nil,
Cfg: cfg, Cfg: cfg,
Tracer: tracer, Tracer: tracer,
Reg: nil, Reg: nil,
AccessClient: nil, AccessClient: nil,
SearchOptions: searchOpts, SearchOptions: searchOpts,
StorageMetrics: nil, IndexMetrics: nil,
IndexMetrics: nil, })
Features: features, require.NoError(t, err)
QOSQueue: nil, storageServer, err := sql.NewStorageServer(sql.StorageServerOptions{
Cfg: cfg,
Tracer: tracer,
Features: features,
}) })
require.NoError(t, err) require.NoError(t, err)
@@ -417,12 +420,12 @@ func createBaselineServer(t *testing.T, dbType, dbConnStr string, testNamespaces
for _, ns := range testNamespaces { for _, ns := range testNamespaces {
for range rand.Intn(maxPlaylistPerNamespace) + 1 { for range rand.Intn(maxPlaylistPerNamespace) + 1 {
_, err = server.Create(ctx, generatePlaylistPayload(ns)) _, err = storageServer.Create(ctx, generatePlaylistPayload(ns))
require.NoError(t, err) require.NoError(t, err)
} }
} }
return server return searchServer
} }
var counter int var counter int

23
pkg/server/wire_gen.go generated
View File

@@ -513,6 +513,11 @@ func Initialize(ctx context.Context, cfg *setting.Cfg, opts Options, apiOpts api
if err != nil { if err != nil {
return nil, err return nil, err
} }
storageMetrics := resource.ProvideStorageMetrics(registerer)
storageBackend, err := sql.ProvideStorageBackend(cfg, sqlStore, tracer, registerer, storageMetrics)
if err != nil {
return nil, err
}
options := &unified.Options{ options := &unified.Options{
Cfg: cfg, Cfg: cfg,
Features: featureToggles, Features: featureToggles,
@@ -522,8 +527,8 @@ func Initialize(ctx context.Context, cfg *setting.Cfg, opts Options, apiOpts api
Authzc: accessClient, Authzc: accessClient,
Docs: documentBuilderSupplier, Docs: documentBuilderSupplier,
SecureValues: inlineSecureValueSupport, SecureValues: inlineSecureValueSupport,
Backend: storageBackend,
} }
storageMetrics := resource.ProvideStorageMetrics(registerer)
bleveIndexMetrics := resource.ProvideIndexMetrics(registerer) bleveIndexMetrics := resource.ProvideIndexMetrics(registerer)
resourceClient, err := unified.ProvideUnifiedStorageClient(options, storageMetrics, bleveIndexMetrics) resourceClient, err := unified.ProvideUnifiedStorageClient(options, storageMetrics, bleveIndexMetrics)
if err != nil { if err != nil {
@@ -1173,6 +1178,11 @@ func InitializeForTest(ctx context.Context, t sqlutil.ITestDB, testingT interfac
if err != nil { if err != nil {
return nil, err return nil, err
} }
storageMetrics := resource.ProvideStorageMetrics(registerer)
storageBackend, err := sql.ProvideStorageBackend(cfg, sqlStore, tracer, registerer, storageMetrics)
if err != nil {
return nil, err
}
options := &unified.Options{ options := &unified.Options{
Cfg: cfg, Cfg: cfg,
Features: featureToggles, Features: featureToggles,
@@ -1182,8 +1192,8 @@ func InitializeForTest(ctx context.Context, t sqlutil.ITestDB, testingT interfac
Authzc: accessClient, Authzc: accessClient,
Docs: documentBuilderSupplier, Docs: documentBuilderSupplier,
SecureValues: inlineSecureValueSupport, SecureValues: inlineSecureValueSupport,
Backend: storageBackend,
} }
storageMetrics := resource.ProvideStorageMetrics(registerer)
bleveIndexMetrics := resource.ProvideIndexMetrics(registerer) bleveIndexMetrics := resource.ProvideIndexMetrics(registerer)
resourceClient, err := unified.ProvideUnifiedStorageClient(options, storageMetrics, bleveIndexMetrics) resourceClient, err := unified.ProvideUnifiedStorageClient(options, storageMetrics, bleveIndexMetrics)
if err != nil { if err != nil {
@@ -1748,7 +1758,14 @@ func InitializeModuleServer(cfg *setting.Cfg, opts Options, apiOpts api.ServerOp
hooksService := hooks.ProvideService() hooksService := hooks.ProvideService()
ossLicensingService := licensing.ProvideService(cfg, hooksService) ossLicensingService := licensing.ProvideService(cfg, hooksService)
moduleRegisterer := ProvideNoopModuleRegisterer() moduleRegisterer := ProvideNoopModuleRegisterer()
storageBackend, err := sql.ProvideStorageBackend(cfg) ossMigrations := migrations.ProvideOSSMigrations(featureToggles)
inProcBus := bus.ProvideBus(tracingService)
sqlStore, err := sqlstore.ProvideService(cfg, featureToggles, ossMigrations, inProcBus, tracingService)
if err != nil {
return nil, err
}
tracer := otelTracer()
storageBackend, err := sql.ProvideStorageBackend(cfg, sqlStore, tracer, registerer, storageMetrics)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@@ -6,6 +6,9 @@ package server
import ( import (
"github.com/google/wire" "github.com/google/wire"
"github.com/grafana/grafana/pkg/bus"
"github.com/grafana/grafana/pkg/infra/db"
"github.com/grafana/grafana/pkg/services/sqlstore"
"github.com/grafana/grafana/pkg/configprovider" "github.com/grafana/grafana/pkg/configprovider"
"github.com/grafana/grafana/pkg/infra/metrics" "github.com/grafana/grafana/pkg/infra/metrics"
@@ -65,6 +68,7 @@ import (
"github.com/grafana/grafana/pkg/storage/legacysql" "github.com/grafana/grafana/pkg/storage/legacysql"
"github.com/grafana/grafana/pkg/storage/unified" "github.com/grafana/grafana/pkg/storage/unified"
"github.com/grafana/grafana/pkg/storage/unified/resource" "github.com/grafana/grafana/pkg/storage/unified/resource"
"github.com/grafana/grafana/pkg/storage/unified/resourcepb"
search2 "github.com/grafana/grafana/pkg/storage/unified/search" search2 "github.com/grafana/grafana/pkg/storage/unified/search"
"github.com/grafana/grafana/pkg/storage/unified/search/builders" "github.com/grafana/grafana/pkg/storage/unified/search/builders"
"github.com/grafana/grafana/pkg/storage/unified/sql" "github.com/grafana/grafana/pkg/storage/unified/sql"
@@ -145,8 +149,10 @@ var wireExtsBasicSet = wire.NewSet(
sandbox.ProvideService, sandbox.ProvideService,
wire.Bind(new(sandbox.Sandbox), new(*sandbox.Service)), wire.Bind(new(sandbox.Sandbox), new(*sandbox.Service)),
wire.Struct(new(unified.Options), "*"), wire.Struct(new(unified.Options), "*"),
unified.ProvideUnifiedStorageClient,
sql.ProvideStorageBackend, sql.ProvideStorageBackend,
unified.ProvideUnifiedStorageClient,
wire.Bind(new(resourcepb.ResourceIndexClient), new(resource.ResourceClient)),
wire.Bind(new(resource.MigratorClient), new(resource.ResourceClient)),
builder.ProvideDefaultBuildHandlerChainFuncFromBuilders, builder.ProvideDefaultBuildHandlerChainFuncFromBuilders,
aggregatorrunner.ProvideNoopAggregatorConfigurator, aggregatorrunner.ProvideNoopAggregatorConfigurator,
apisregistry.WireSetExts, apisregistry.WireSetExts,
@@ -195,6 +201,16 @@ var wireExtsModuleServerSet = wire.NewSet(
tracing.ProvideTracingConfig, tracing.ProvideTracingConfig,
tracing.ProvideService, tracing.ProvideService,
wire.Bind(new(tracing.Tracer), new(*tracing.TracingService)), wire.Bind(new(tracing.Tracer), new(*tracing.TracingService)),
otelTracer,
// Bus
bus.ProvideBus,
wire.Bind(new(bus.Bus), new(*bus.InProcBus)),
// Database migrations
migrations.ProvideOSSMigrations,
wire.Bind(new(registry.DatabaseMigrator), new(*migrations.OSSMigrations)),
// Database
sqlstore.ProvideService,
wire.Bind(new(db.DB), new(*sqlstore.SQLStore)),
// Unified storage // Unified storage
resource.ProvideStorageMetrics, resource.ProvideStorageMetrics,
resource.ProvideIndexMetrics, resource.ProvideIndexMetrics,

View File

@@ -26,7 +26,7 @@ var _ generic.RESTOptionsGetter = (*RESTOptionsGetter)(nil)
type StorageOptionsRegister func(gr schema.GroupResource, opts StorageOptions) type StorageOptionsRegister func(gr schema.GroupResource, opts StorageOptions)
type RESTOptionsGetter struct { type RESTOptionsGetter struct {
client resource.ResourceClient client resource.StorageClient
secrets secret.InlineSecureValueSupport secrets secret.InlineSecureValueSupport
original storagebackend.Config original storagebackend.Config
configProvider RestConfigProvider configProvider RestConfigProvider
@@ -36,7 +36,7 @@ type RESTOptionsGetter struct {
} }
func NewRESTOptionsGetterForClient( func NewRESTOptionsGetterForClient(
client resource.ResourceClient, client resource.StorageClient,
secrets secret.InlineSecureValueSupport, secrets secret.InlineSecureValueSupport,
original storagebackend.Config, original storagebackend.Config,
configProvider RestConfigProvider, configProvider RestConfigProvider,
@@ -79,7 +79,7 @@ func NewRESTOptionsGetterMemory(originalStorageConfig storagebackend.Config, sec
} }
return NewRESTOptionsGetterForClient( return NewRESTOptionsGetterForClient(
resource.NewLocalResourceClient(server), resource.NewLocalResourceClient(server, nil),
secrets, secrets,
originalStorageConfig, originalStorageConfig,
nil, nil,
@@ -118,7 +118,7 @@ func NewRESTOptionsGetterForFileXX(path string,
} }
return NewRESTOptionsGetterForClient( return NewRESTOptionsGetterForClient(
resource.NewLocalResourceClient(server), resource.NewLocalResourceClient(server, nil),
nil, // secrets nil, // secrets
originalStorageConfig, originalStorageConfig,
nil, nil,

View File

@@ -88,7 +88,7 @@ type Storage struct {
trigger storage.IndexerFuncs trigger storage.IndexerFuncs
indexers *cache.Indexers indexers *cache.Indexers
store resource.ResourceClient store resource.StorageClient
getKey func(string) (*resourcepb.ResourceKey, error) getKey func(string) (*resourcepb.ResourceKey, error)
snowflake *snowflake.Node // used to enforce internal ids snowflake *snowflake.Node // used to enforce internal ids
configProvider RestConfigProvider // used for provisioning configProvider RestConfigProvider // used for provisioning
@@ -112,7 +112,7 @@ type RestConfigProvider interface {
// NewStorage instantiates a new Storage. // NewStorage instantiates a new Storage.
func NewStorage( func NewStorage(
config *storagebackend.ConfigForResource, config *storagebackend.ConfigForResource,
store resource.ResourceClient, store resource.StorageClient,
keyFunc func(obj runtime.Object) (string, error), keyFunc func(obj runtime.Object) (string, error),
keyParser func(key string) (*resourcepb.ResourceKey, error), keyParser func(key string) (*resourcepb.ResourceKey, error),
newFunc func() runtime.Object, newFunc func() runtime.Object,

View File

@@ -156,7 +156,7 @@ func testSetup(t testing.TB, opts ...setupOption) (context.Context, storage.Inte
default: default:
t.Fatalf("unsupported storage type: %s", setupOpts.storageType) t.Fatalf("unsupported storage type: %s", setupOpts.storageType)
} }
client := resource.NewLocalResourceClient(server) client := resource.NewLocalResourceClient(server, nil)
config := storagebackend.NewDefaultConfig(setupOpts.prefix, setupOpts.codec) config := storagebackend.NewDefaultConfig(setupOpts.prefix, setupOpts.codec)
store, destroyFunc, err := apistore.NewStorage( store, destroyFunc, err := apistore.NewStorage(

View File

@@ -21,6 +21,7 @@ import (
"github.com/grafana/dskit/grpcclient" "github.com/grafana/dskit/grpcclient"
"github.com/grafana/dskit/middleware" "github.com/grafana/dskit/middleware"
"github.com/grafana/dskit/services" "github.com/grafana/dskit/services"
grafanarest "github.com/grafana/grafana/pkg/apiserver/rest" grafanarest "github.com/grafana/grafana/pkg/apiserver/rest"
infraDB "github.com/grafana/grafana/pkg/infra/db" infraDB "github.com/grafana/grafana/pkg/infra/db"
"github.com/grafana/grafana/pkg/infra/tracing" "github.com/grafana/grafana/pkg/infra/tracing"
@@ -45,6 +46,7 @@ type Options struct {
Authzc types.AccessClient Authzc types.AccessClient
Docs resource.DocumentBuilderSupplier Docs resource.DocumentBuilderSupplier
SecureValues secrets.InlineSecureValueSupport SecureValues secrets.InlineSecureValueSupport
Backend resource.StorageBackend // Shared backend to avoid duplicate metrics registration
} }
type clientMetrics struct { type clientMetrics struct {
@@ -66,7 +68,7 @@ func ProvideUnifiedStorageClient(opts *Options,
BlobStoreURL: apiserverCfg.Key("blob_url").MustString(""), BlobStoreURL: apiserverCfg.Key("blob_url").MustString(""),
BlobThresholdBytes: apiserverCfg.Key("blob_threshold_bytes").MustInt(options.BlobThresholdDefault), BlobThresholdBytes: apiserverCfg.Key("blob_threshold_bytes").MustInt(options.BlobThresholdDefault),
GrpcClientKeepaliveTime: apiserverCfg.Key("grpc_client_keepalive_time").MustDuration(0), GrpcClientKeepaliveTime: apiserverCfg.Key("grpc_client_keepalive_time").MustDuration(0),
}, opts.Cfg, opts.Features, opts.DB, opts.Tracer, opts.Reg, opts.Authzc, opts.Docs, storageMetrics, indexMetrics, opts.SecureValues) }, opts.Cfg, opts.Features, opts.DB, opts.Tracer, opts.Reg, opts.Authzc, opts.Docs, storageMetrics, indexMetrics, opts.SecureValues, opts.Backend)
if err == nil { if err == nil {
// Decide whether to disable SQL fallback stats per resource in Mode 5. // Decide whether to disable SQL fallback stats per resource in Mode 5.
// Otherwise we would still try to query the legacy SQL database in Mode 5. // Otherwise we would still try to query the legacy SQL database in Mode 5.
@@ -102,6 +104,7 @@ func newClient(opts options.StorageOptions,
storageMetrics *resource.StorageMetrics, storageMetrics *resource.StorageMetrics,
indexMetrics *resource.BleveIndexMetrics, indexMetrics *resource.BleveIndexMetrics,
secure secrets.InlineSecureValueSupport, secure secrets.InlineSecureValueSupport,
backend resource.StorageBackend,
) (resource.ResourceClient, error) { ) (resource.ResourceClient, error) {
ctx := context.Background() ctx := context.Background()
@@ -135,7 +138,7 @@ func newClient(opts options.StorageOptions,
if err != nil { if err != nil {
return nil, err return nil, err
} }
return resource.NewLocalResourceClient(server), nil return resource.NewLocalResourceClient(server, nil), nil
case options.StorageTypeUnifiedGrpc: case options.StorageTypeUnifiedGrpc:
if opts.Address == "" { if opts.Address == "" {
@@ -168,24 +171,14 @@ func newClient(opts options.StorageOptions,
return resource.NewResourceClient(conn, indexConn, cfg, features, tracer) return resource.NewResourceClient(conn, indexConn, cfg, features, tracer)
default: default:
// Create search options for the search server
searchOptions, err := search.NewSearchOptions(features, cfg, docs, indexMetrics, nil) searchOptions, err := search.NewSearchOptions(features, cfg, docs, indexMetrics, nil)
if err != nil { if err != nil {
return nil, err return nil, err
} }
serverOptions := sql.ServerOptions{ // Setup QOS queue if enabled
DB: db, var qosQueue sql.QOSEnqueueDequeuer
Cfg: cfg,
Tracer: tracer,
Reg: reg,
AccessClient: authzc,
SearchOptions: searchOptions,
StorageMetrics: storageMetrics,
IndexMetrics: indexMetrics,
Features: features,
SecureValues: secure,
}
if cfg.QOSEnabled { if cfg.QOSEnabled {
qosReg := prometheus.WrapRegistererWithPrefix("resource_server_qos_", reg) qosReg := prometheus.WrapRegistererWithPrefix("resource_server_qos_", reg)
queue := scheduler.NewQueue(&scheduler.QueueOptions{ queue := scheduler.NewQueue(&scheduler.QueueOptions{
@@ -196,7 +189,7 @@ func newClient(opts options.StorageOptions,
if err := services.StartAndAwaitRunning(ctx, queue); err != nil { if err := services.StartAndAwaitRunning(ctx, queue); err != nil {
return nil, fmt.Errorf("failed to start queue: %w", err) return nil, fmt.Errorf("failed to start queue: %w", err)
} }
scheduler, err := scheduler.NewScheduler(queue, &scheduler.Config{ sched, err := scheduler.NewScheduler(queue, &scheduler.Config{
NumWorkers: cfg.QOSNumberWorker, NumWorkers: cfg.QOSNumberWorker,
Logger: cfg.Logger, Logger: cfg.Logger,
}) })
@@ -204,31 +197,59 @@ func newClient(opts options.StorageOptions,
return nil, fmt.Errorf("failed to create scheduler: %w", err) return nil, fmt.Errorf("failed to create scheduler: %w", err)
} }
err = services.StartAndAwaitRunning(ctx, scheduler) err = services.StartAndAwaitRunning(ctx, sched)
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to start scheduler: %w", err) return nil, fmt.Errorf("failed to start scheduler: %w", err)
} }
serverOptions.QOSQueue = queue qosQueue = queue
} }
// only enable if an overrides file path is provided // Setup overrides service if enabled
var overridesSvc *resource.OverridesService
if cfg.OverridesFilePath != "" { if cfg.OverridesFilePath != "" {
overridesSvc, err := resource.NewOverridesService(ctx, cfg.Logger, reg, tracer, resource.ReloadOptions{ overridesSvc, err = resource.NewOverridesService(ctx, cfg.Logger, reg, tracer, resource.ReloadOptions{
FilePath: cfg.OverridesFilePath, FilePath: cfg.OverridesFilePath,
ReloadPeriod: cfg.OverridesReloadInterval, ReloadPeriod: cfg.OverridesReloadInterval,
}) })
if err != nil { if err != nil {
return nil, err return nil, err
} }
serverOptions.OverridesService = overridesSvc
} }
server, err := sql.NewResourceServer(serverOptions) // Create the search server with shared backend
searchServer, err := sql.NewSearchServer(sql.SearchServerOptions{
Backend: backend, // Use shared backend to avoid duplicate metrics registration
DB: db,
Cfg: cfg,
Tracer: tracer,
Reg: reg,
AccessClient: authzc,
SearchOptions: searchOptions,
IndexMetrics: indexMetrics,
})
if err != nil { if err != nil {
return nil, err return nil, err
} }
return resource.NewLocalResourceClient(server), nil
// Create the storage server with shared backend
storageServer, err := sql.NewStorageServer(sql.StorageServerOptions{
Backend: backend, // Use shared backend to avoid duplicate metrics registration
DB: db,
Cfg: cfg,
Tracer: tracer,
Reg: reg,
AccessClient: authzc,
StorageMetrics: storageMetrics,
Features: features,
QOSQueue: qosQueue,
SecureValues: secure,
OverridesService: overridesSvc,
})
if err != nil {
return nil, err
}
return resource.NewLocalResourceClient(storageServer, searchServer), nil
} }
} }

View File

@@ -48,7 +48,7 @@ func buildCollectionSettings(opts legacy.MigrateOptions) resource.BulkSettings {
} }
type resourceClientStreamProvider struct { type resourceClientStreamProvider struct {
client resource.ResourceClient client resource.MigratorClient
} }
func (r *resourceClientStreamProvider) createStream(ctx context.Context, opts legacy.MigrateOptions) (resourcepb.BulkStore_BulkProcessClient, error) { func (r *resourceClientStreamProvider) createStream(ctx context.Context, opts legacy.MigrateOptions) (resourcepb.BulkStore_BulkProcessClient, error) {
@@ -71,7 +71,7 @@ func (b *bulkStoreClientStreamProvider) createStream(ctx context.Context, opts l
// This can migrate Folders, Dashboards and LibraryPanels // This can migrate Folders, Dashboards and LibraryPanels
func ProvideUnifiedMigrator( func ProvideUnifiedMigrator(
dashboardAccess legacy.MigrationDashboardAccessor, dashboardAccess legacy.MigrationDashboardAccessor,
client resource.ResourceClient, client resource.MigratorClient,
) UnifiedMigrator { ) UnifiedMigrator {
return newUnifiedMigrator( return newUnifiedMigrator(
dashboardAccess, dashboardAccess,

View File

@@ -31,13 +31,31 @@ import (
"github.com/grafana/grafana/pkg/storage/unified/resourcepb" "github.com/grafana/grafana/pkg/storage/unified/resourcepb"
) )
//go:generate mockery --name ResourceClient --structname MockResourceClient --inpackage --filename client_mock.go --with-expecter // SearchClient is used to interact with unified search
type ResourceClient interface { type SearchClient interface {
resourcepb.ResourceStoreClient
resourcepb.ResourceIndexClient resourcepb.ResourceIndexClient
resourcepb.ManagedObjectIndexClient resourcepb.ManagedObjectIndexClient
resourcepb.BulkStoreClient }
// StorageClient is used to interact with unified storage
type StorageClient interface {
resourcepb.ResourceStoreClient
resourcepb.BlobStoreClient resourcepb.BlobStoreClient
}
// MigratorClient is used to perform migrations to unified storage
type MigratorClient interface {
resourcepb.BulkStoreClient
GetStats(ctx context.Context, in *resourcepb.ResourceStatsRequest, opts ...grpc.CallOption) (*resourcepb.ResourceStatsResponse, error)
}
// ResourceClient combines all resource-related clients and should be avoided in favor of more specific interfaces when possible
//
//go:generate mockery --name ResourceClient --structname MockResourceClient --inpackage --filename client_mock.go --with-expecter
type ResourceClient interface {
StorageClient
SearchClient
MigratorClient
resourcepb.DiagnosticsClient resourcepb.DiagnosticsClient
resourcepb.QuotasClient resourcepb.QuotasClient
} }
@@ -92,16 +110,15 @@ func NewLegacyResourceClient(channel grpc.ClientConnInterface, indexChannel grpc
return newResourceClient(cc, cci) return newResourceClient(cc, cci)
} }
func NewLocalResourceClient(server ResourceServer) ResourceClient { func NewLocalResourceClient(server ResourceServer, searchServer SearchServer) ResourceClient {
// scenario: local in-proc // scenario: local in-proc
channel := &inprocgrpc.Channel{} channel := &inprocgrpc.Channel{}
indexChannel := &inprocgrpc.Channel{}
tracer := otel.Tracer("github.com/grafana/grafana/pkg/storage/unified/resource") tracer := otel.Tracer("github.com/grafana/grafana/pkg/storage/unified/resource")
grpcAuthInt := grpcutils.NewUnsafeAuthenticator(tracer) grpcAuthInt := grpcutils.NewUnsafeAuthenticator(tracer)
for _, desc := range []*grpc.ServiceDesc{ for _, desc := range []*grpc.ServiceDesc{
&resourcepb.ResourceStore_ServiceDesc, &resourcepb.ResourceStore_ServiceDesc,
&resourcepb.ResourceIndex_ServiceDesc,
&resourcepb.ManagedObjectIndex_ServiceDesc,
&resourcepb.BlobStore_ServiceDesc, &resourcepb.BlobStore_ServiceDesc,
&resourcepb.BulkStore_ServiceDesc, &resourcepb.BulkStore_ServiceDesc,
&resourcepb.Diagnostics_ServiceDesc, &resourcepb.Diagnostics_ServiceDesc,
@@ -117,13 +134,31 @@ func NewLocalResourceClient(server ResourceServer) ResourceClient {
) )
} }
// Register search services on the index channel if searchServer is provided
if searchServer != nil {
for _, desc := range []*grpc.ServiceDesc{
&resourcepb.ResourceIndex_ServiceDesc,
&resourcepb.ManagedObjectIndex_ServiceDesc,
} {
indexChannel.RegisterService(
grpchan.InterceptServer(
desc,
grpcAuth.UnaryServerInterceptor(grpcAuthInt),
grpcAuth.StreamServerInterceptor(grpcAuthInt),
),
searchServer,
)
}
}
clientInt := authnlib.NewGrpcClientInterceptor( clientInt := authnlib.NewGrpcClientInterceptor(
ProvideInProcExchanger(), ProvideInProcExchanger(),
authnlib.WithClientInterceptorIDTokenExtractor(idTokenExtractor), authnlib.WithClientInterceptorIDTokenExtractor(idTokenExtractor),
) )
cc := grpchan.InterceptClientConn(channel, clientInt.UnaryClientInterceptor, clientInt.StreamClientInterceptor) cc := grpchan.InterceptClientConn(channel, clientInt.UnaryClientInterceptor, clientInt.StreamClientInterceptor)
return newResourceClient(cc, cc) cci := grpchan.InterceptClientConn(indexChannel, clientInt.UnaryClientInterceptor, clientInt.StreamClientInterceptor)
return newResourceClient(cc, cci)
} }
type RemoteResourceClientConfig struct { type RemoteResourceClientConfig struct {

View File

@@ -127,7 +127,10 @@ type SearchBackend interface {
GetOpenIndexes() []NamespacedResource GetOpenIndexes() []NamespacedResource
} }
// This supports indexing+search regardless of implementation var _ SearchServer = &searchSupport{}
// This supports indexing+search regardless of implementation.
// Implements SearchServer interface.
type searchSupport struct { type searchSupport struct {
log log.Logger log log.Logger
storage StorageBackend storage StorageBackend
@@ -160,6 +163,10 @@ var (
_ resourcepb.ManagedObjectIndexServer = (*searchSupport)(nil) _ resourcepb.ManagedObjectIndexServer = (*searchSupport)(nil)
) )
func NewSearchServer(opts SearchOptions, storage StorageBackend, access types.AccessClient, blob BlobSupport, indexMetrics *BleveIndexMetrics, ownsIndexFn func(key NamespacedResource) (bool, error)) (SearchServer, error) {
return newSearchSupport(opts, storage, access, blob, indexMetrics, ownsIndexFn)
}
func newSearchSupport(opts SearchOptions, storage StorageBackend, access types.AccessClient, blob BlobSupport, indexMetrics *BleveIndexMetrics, ownsIndexFn func(key NamespacedResource) (bool, error)) (support *searchSupport, err error) { func newSearchSupport(opts SearchOptions, storage StorageBackend, access types.AccessClient, blob BlobSupport, indexMetrics *BleveIndexMetrics, ownsIndexFn func(key NamespacedResource) (bool, error)) (support *searchSupport, err error) {
// No backend search support // No backend search support
if opts.Backend == nil { if opts.Backend == nil {
@@ -598,6 +605,22 @@ func (s *searchSupport) buildIndexes(ctx context.Context) (int, error) {
return totalBatchesIndexed, nil return totalBatchesIndexed, nil
} }
func (s *searchSupport) Init(ctx context.Context) error {
return s.init(ctx)
}
func (s *searchSupport) Stop(_ context.Context) error {
s.stop()
return nil
}
// IsHealthy implements resourcepb.DiagnosticsServer
func (s *searchSupport) IsHealthy(ctx context.Context, req *resourcepb.HealthCheckRequest) (*resourcepb.HealthCheckResponse, error) {
return &resourcepb.HealthCheckResponse{
Status: resourcepb.HealthCheckResponse_SERVING,
}, nil
}
func (s *searchSupport) init(ctx context.Context) error { func (s *searchSupport) init(ctx context.Context) error {
origCtx := ctx origCtx := ctx

View File

@@ -60,7 +60,7 @@ func ProvideSearchDistributorServer(cfg *setting.Cfg, features featuremgmt.Featu
} }
type RingClient struct { type RingClient struct {
Client ResourceClient Client SearchClient
grpc_health_v1.HealthClient grpc_health_v1.HealthClient
Conn *grpc.ClientConn Conn *grpc.ClientConn
} }
@@ -99,7 +99,7 @@ var (
func (ds *distributorServer) Search(ctx context.Context, r *resourcepb.ResourceSearchRequest) (*resourcepb.ResourceSearchResponse, error) { func (ds *distributorServer) Search(ctx context.Context, r *resourcepb.ResourceSearchRequest) (*resourcepb.ResourceSearchResponse, error) {
ctx, span := ds.tracing.Start(ctx, "distributor.Search") ctx, span := ds.tracing.Start(ctx, "distributor.Search")
defer span.End() defer span.End()
ctx, client, err := ds.getClientToDistributeRequest(ctx, r.Options.Key.Namespace, "Search") ctx, client, err := ds.getClientToDistributeRequest(ctx, r.Options.Key.Namespace)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@@ -110,7 +110,7 @@ func (ds *distributorServer) Search(ctx context.Context, r *resourcepb.ResourceS
func (ds *distributorServer) GetStats(ctx context.Context, r *resourcepb.ResourceStatsRequest) (*resourcepb.ResourceStatsResponse, error) { func (ds *distributorServer) GetStats(ctx context.Context, r *resourcepb.ResourceStatsRequest) (*resourcepb.ResourceStatsResponse, error) {
ctx, span := ds.tracing.Start(ctx, "distributor.GetStats") ctx, span := ds.tracing.Start(ctx, "distributor.GetStats")
defer span.End() defer span.End()
ctx, client, err := ds.getClientToDistributeRequest(ctx, r.Namespace, "GetStats") ctx, client, err := ds.getClientToDistributeRequest(ctx, r.Namespace)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@@ -215,7 +215,7 @@ func (ds *distributorServer) RebuildIndexes(ctx context.Context, r *resourcepb.R
func (ds *distributorServer) CountManagedObjects(ctx context.Context, r *resourcepb.CountManagedObjectsRequest) (*resourcepb.CountManagedObjectsResponse, error) { func (ds *distributorServer) CountManagedObjects(ctx context.Context, r *resourcepb.CountManagedObjectsRequest) (*resourcepb.CountManagedObjectsResponse, error) {
ctx, span := ds.tracing.Start(ctx, "distributor.CountManagedObjects") ctx, span := ds.tracing.Start(ctx, "distributor.CountManagedObjects")
defer span.End() defer span.End()
ctx, client, err := ds.getClientToDistributeRequest(ctx, r.Namespace, "CountManagedObjects") ctx, client, err := ds.getClientToDistributeRequest(ctx, r.Namespace)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@@ -226,7 +226,7 @@ func (ds *distributorServer) CountManagedObjects(ctx context.Context, r *resourc
func (ds *distributorServer) ListManagedObjects(ctx context.Context, r *resourcepb.ListManagedObjectsRequest) (*resourcepb.ListManagedObjectsResponse, error) { func (ds *distributorServer) ListManagedObjects(ctx context.Context, r *resourcepb.ListManagedObjectsRequest) (*resourcepb.ListManagedObjectsResponse, error) {
ctx, span := ds.tracing.Start(ctx, "distributor.ListManagedObjects") ctx, span := ds.tracing.Start(ctx, "distributor.ListManagedObjects")
defer span.End() defer span.End()
ctx, client, err := ds.getClientToDistributeRequest(ctx, r.Namespace, "ListManagedObjects") ctx, client, err := ds.getClientToDistributeRequest(ctx, r.Namespace)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@@ -234,7 +234,7 @@ func (ds *distributorServer) ListManagedObjects(ctx context.Context, r *resource
return client.ListManagedObjects(ctx, r) return client.ListManagedObjects(ctx, r)
} }
func (ds *distributorServer) getClientToDistributeRequest(ctx context.Context, namespace string, methodName string) (context.Context, ResourceClient, error) { func (ds *distributorServer) getClientToDistributeRequest(ctx context.Context, namespace string) (context.Context, SearchClient, error) {
ringHasher := fnv.New32a() ringHasher := fnv.New32a()
_, err := ringHasher.Write([]byte(namespace)) _, err := ringHasher.Write([]byte(namespace))
if err != nil { if err != nil {

View File

@@ -34,12 +34,18 @@ import (
var tracer = otel.Tracer("github.com/grafana/grafana/pkg/storage/unified/resource") var tracer = otel.Tracer("github.com/grafana/grafana/pkg/storage/unified/resource")
type SearchServer interface {
LifecycleHooks
resourcepb.ResourceIndexServer
resourcepb.ManagedObjectIndexServer
resourcepb.DiagnosticsServer
}
// ResourceServer implements all gRPC services // ResourceServer implements all gRPC services
type ResourceServer interface { type ResourceServer interface {
resourcepb.ResourceStoreServer resourcepb.ResourceStoreServer
resourcepb.BulkStoreServer resourcepb.BulkStoreServer
resourcepb.ResourceIndexServer
resourcepb.ManagedObjectIndexServer
resourcepb.BlobStoreServer resourcepb.BlobStoreServer
resourcepb.DiagnosticsServer resourcepb.DiagnosticsServer
resourcepb.QuotasServer resourcepb.QuotasServer
@@ -221,9 +227,6 @@ type ResourceServerOptions struct {
// The blob configuration // The blob configuration
Blob BlobConfig Blob BlobConfig
// Search options
Search SearchOptions
// Quota service // Quota service
OverridesService *OverridesService OverridesService *OverridesService
@@ -251,16 +254,15 @@ type ResourceServerOptions struct {
storageMetrics *StorageMetrics storageMetrics *StorageMetrics
IndexMetrics *BleveIndexMetrics
// MaxPageSizeBytes is the maximum size of a page in bytes. // MaxPageSizeBytes is the maximum size of a page in bytes.
MaxPageSizeBytes int MaxPageSizeBytes int
// IndexMinUpdateInterval is the time to wait after a successful write operation to ensure read-after-write consistency in search.
// This config is shared with search
IndexMinUpdateInterval time.Duration
// QOSQueue is the quality of service queue used to enqueue // QOSQueue is the quality of service queue used to enqueue
QOSQueue QOSEnqueuer QOSQueue QOSEnqueuer
QOSConfig QueueConfig QOSConfig QueueConfig
OwnsIndexFn func(key NamespacedResource) (bool, error)
} }
func NewResourceServer(opts ResourceServerOptions) (*server, error) { func NewResourceServer(opts ResourceServerOptions) (*server, error) {
@@ -343,23 +345,24 @@ func NewResourceServer(opts ResourceServerOptions) (*server, error) {
ctx: ctx, ctx: ctx,
cancel: cancel, cancel: cancel,
storageMetrics: opts.storageMetrics, storageMetrics: opts.storageMetrics,
indexMetrics: opts.IndexMetrics,
maxPageSizeBytes: opts.MaxPageSizeBytes, maxPageSizeBytes: opts.MaxPageSizeBytes,
reg: opts.Reg, reg: opts.Reg,
queue: opts.QOSQueue, queue: opts.QOSQueue,
queueConfig: opts.QOSConfig, queueConfig: opts.QOSConfig,
overridesService: opts.OverridesService, overridesService: opts.OverridesService,
artificialSuccessfulWriteDelay: opts.Search.IndexMinUpdateInterval, artificialSuccessfulWriteDelay: opts.IndexMinUpdateInterval,
} }
if opts.Search.Resources != nil { /*
var err error if opts.Search.Resources != nil {
s.search, err = newSearchSupport(opts.Search, s.backend, s.access, s.blob, opts.IndexMetrics, opts.OwnsIndexFn) var err error
if err != nil { s.search, err = newSearchSupport(opts.Search, s.backend, s.access, s.blob, opts.IndexMetrics, opts.OwnsIndexFn)
return nil, err if err != nil {
return nil, err
}
} }
} */
err := s.Init(ctx) err := s.Init(ctx)
if err != nil { if err != nil {
@@ -377,7 +380,6 @@ type server struct {
backend StorageBackend backend StorageBackend
blob BlobSupport blob BlobSupport
secure secrets.InlineSecureValueSupport secure secrets.InlineSecureValueSupport
search *searchSupport
diagnostics resourcepb.DiagnosticsServer diagnostics resourcepb.DiagnosticsServer
access claims.AccessClient access claims.AccessClient
writeHooks WriteAccessHooks writeHooks WriteAccessHooks
@@ -424,11 +426,6 @@ func (s *server) Init(ctx context.Context) error {
s.initErr = s.overridesService.init(ctx) s.initErr = s.overridesService.init(ctx)
} }
// initialize the search index
if s.initErr == nil && s.search != nil {
s.initErr = s.search.init(ctx)
}
// Start watching for changes // Start watching for changes
if s.initErr == nil { if s.initErr == nil {
s.initErr = s.initWatcher() s.initErr = s.initWatcher()
@@ -453,10 +450,6 @@ func (s *server) Stop(ctx context.Context) error {
} }
} }
if s.search != nil {
s.search.stop()
}
if s.overridesService != nil { if s.overridesService != nil {
if err := s.overridesService.stop(ctx); err != nil { if err := s.overridesService.stop(ctx); err != nil {
stopFailed = true stopFailed = true
@@ -1372,47 +1365,6 @@ func (s *server) Watch(req *resourcepb.WatchRequest, srv resourcepb.ResourceStor
} }
} }
func (s *server) Search(ctx context.Context, req *resourcepb.ResourceSearchRequest) (*resourcepb.ResourceSearchResponse, error) {
if s.search == nil {
return nil, fmt.Errorf("search index not configured")
}
return s.search.Search(ctx, req)
}
// GetStats implements ResourceServer.
func (s *server) GetStats(ctx context.Context, req *resourcepb.ResourceStatsRequest) (*resourcepb.ResourceStatsResponse, error) {
if err := s.Init(ctx); err != nil {
return nil, err
}
if s.search == nil {
// If the backend implements "GetStats", we can use it
srv, ok := s.backend.(resourcepb.ResourceIndexServer)
if ok {
return srv.GetStats(ctx, req)
}
return nil, fmt.Errorf("search index not configured")
}
return s.search.GetStats(ctx, req)
}
func (s *server) ListManagedObjects(ctx context.Context, req *resourcepb.ListManagedObjectsRequest) (*resourcepb.ListManagedObjectsResponse, error) {
if s.search == nil {
return nil, fmt.Errorf("search index not configured")
}
return s.search.ListManagedObjects(ctx, req)
}
func (s *server) CountManagedObjects(ctx context.Context, req *resourcepb.CountManagedObjectsRequest) (*resourcepb.CountManagedObjectsResponse, error) {
if s.search == nil {
return nil, fmt.Errorf("search index not configured")
}
return s.search.CountManagedObjects(ctx, req)
}
// IsHealthy implements ResourceServer. // IsHealthy implements ResourceServer.
func (s *server) IsHealthy(ctx context.Context, req *resourcepb.HealthCheckRequest) (*resourcepb.HealthCheckResponse, error) { func (s *server) IsHealthy(ctx context.Context, req *resourcepb.HealthCheckRequest) (*resourcepb.HealthCheckResponse, error) {
return s.diagnostics.IsHealthy(ctx, req) return s.diagnostics.IsHealthy(ctx, req)
@@ -1568,14 +1520,6 @@ func (s *server) runInQueue(ctx context.Context, tenantID string, runnable func(
} }
} }
func (s *server) RebuildIndexes(ctx context.Context, req *resourcepb.RebuildIndexesRequest) (*resourcepb.RebuildIndexesResponse, error) {
if s.search == nil {
return nil, fmt.Errorf("search index not configured")
}
return s.search.RebuildIndexes(ctx, req)
}
func (s *server) checkQuota(ctx context.Context, nsr NamespacedResource) { func (s *server) checkQuota(ctx context.Context, nsr NamespacedResource) {
span := trace.SpanFromContext(ctx) span := trace.SpanFromContext(ctx)
span.AddEvent("checkQuota", trace.WithAttributes( span.AddEvent("checkQuota", trace.WithAttributes(

View File

@@ -11,6 +11,9 @@ import (
"time" "time"
"github.com/go-sql-driver/mysql" "github.com/go-sql-driver/mysql"
infraDB "github.com/grafana/grafana/pkg/infra/db"
"github.com/grafana/grafana/pkg/services/sqlstore/migrator"
"github.com/grafana/grafana/pkg/storage/unified/sql/db/dbimpl"
"github.com/jackc/pgx/v5/pgconn" "github.com/jackc/pgx/v5/pgconn"
"github.com/lib/pq" "github.com/lib/pq"
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
@@ -44,10 +47,63 @@ const defaultPrunerHistoryLimit = 20
func ProvideStorageBackend( func ProvideStorageBackend(
cfg *setting.Cfg, cfg *setting.Cfg,
db infraDB.DB,
tracer trace.Tracer,
reg prometheus.Registerer,
storageMetrics *resource.StorageMetrics,
) (resource.StorageBackend, error) { ) (resource.StorageBackend, error) {
// TODO: make this the central place to provide SQL backend // Create the resource DB
// Currently it is skipped as we need to handle the cases of Diagnostics and Lifecycle eDB, err := dbimpl.ProvideResourceDB(db, cfg, tracer)
return nil, nil if err != nil {
return nil, fmt.Errorf("failed to create resource DB: %w", err)
}
// Check if HA is enabled
isHA := isHighAvailabilityEnabled(
cfg.SectionWithEnvOverrides("database"),
cfg.SectionWithEnvOverrides("resource_api"),
)
// Create the backend
backend, err := NewBackend(BackendOptions{
DBProvider: eDB,
Reg: reg,
IsHA: isHA,
storageMetrics: storageMetrics,
LastImportTimeMaxAge: cfg.MaxFileIndexAge,
})
if err != nil {
return nil, fmt.Errorf("failed to create backend: %w", err)
}
// Initialize the backend
if err := backend.Init(context.Background()); err != nil {
return nil, fmt.Errorf("failed to initialize backend: %w", err)
}
return backend, nil
}
// isHighAvailabilityEnabled determines if high availability mode should
// be enabled based on database configuration. High availability is enabled
// by default except for SQLite databases.
func isHighAvailabilityEnabled(dbCfg, resourceAPICfg *setting.DynamicSection) bool {
// If the resource API is using a non-SQLite database, we assume it's in HA mode.
resourceDBType := resourceAPICfg.Key("db_type").String()
if resourceDBType != "" && resourceDBType != migrator.SQLite {
return true
}
// Check in the config if HA is enabled - by default we always assume a HA setup.
isHA := dbCfg.Key("high_availability").MustBool(true)
// SQLite is not possible to run in HA, so we force it to false.
databaseType := dbCfg.Key("type").String()
if databaseType == migrator.SQLite {
isHA = false
}
return isHA
} }
type Backend interface { type Backend interface {

View File

@@ -0,0 +1,322 @@
package sql
import (
"context"
"errors"
"fmt"
"hash/fnv"
"net/http"
"github.com/gorilla/mux"
"github.com/grafana/grafana/pkg/storage/unified/search"
"github.com/prometheus/client_golang/prometheus"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/trace"
"google.golang.org/grpc/health/grpc_health_v1"
"github.com/grafana/dskit/kv"
"github.com/grafana/dskit/ring"
"github.com/grafana/dskit/services"
infraDB "github.com/grafana/grafana/pkg/infra/db"
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/modules"
"github.com/grafana/grafana/pkg/services/authz"
"github.com/grafana/grafana/pkg/services/featuremgmt"
"github.com/grafana/grafana/pkg/services/grpcserver"
"github.com/grafana/grafana/pkg/services/grpcserver/interceptors"
"github.com/grafana/grafana/pkg/setting"
"github.com/grafana/grafana/pkg/storage/unified/resource"
"github.com/grafana/grafana/pkg/storage/unified/resource/grpc"
"github.com/grafana/grafana/pkg/storage/unified/resourcepb"
)
var _ UnifiedStorageGrpcService = (*searchService)(nil)
// operation used by the search-servers to check if they own the namespace
var (
searchOwnerRead = ring.NewOp([]ring.InstanceState{ring.JOINING, ring.ACTIVE, ring.LEAVING}, nil)
)
type searchService struct {
*services.BasicService
backend resource.StorageBackend
cfg *setting.Cfg
features featuremgmt.FeatureToggles
db infraDB.DB
stopCh chan struct{}
stoppedCh chan error
handler grpcserver.Provider
tracing trace.Tracer
authenticator func(ctx context.Context) (context.Context, error)
httpServerRouter *mux.Router
log log.Logger
reg prometheus.Registerer
docBuilders resource.DocumentBuilderSupplier
indexMetrics *resource.BleveIndexMetrics
searchRing *ring.Ring
// Ring lifecycle and sharding support
ringLifecycler *ring.BasicLifecycler
// Subservices manager
subservices *services.Manager
subservicesWatcher *services.FailureWatcher
hasSubservices bool
}
func ProvideUnifiedSearchGrpcService(
cfg *setting.Cfg,
features featuremgmt.FeatureToggles,
db infraDB.DB,
log log.Logger,
reg prometheus.Registerer,
docBuilders resource.DocumentBuilderSupplier,
indexMetrics *resource.BleveIndexMetrics,
searchRing *ring.Ring,
memberlistKVConfig kv.Config,
backend resource.StorageBackend,
httpServerRouter *mux.Router,
) (UnifiedStorageGrpcService, error) {
var err error
tracer := otel.Tracer("unified-search-server")
authn := NewAuthenticatorWithFallback(cfg, reg, tracer, func(ctx context.Context) (context.Context, error) {
auth := grpc.Authenticator{Tracer: tracer}
return auth.Authenticate(ctx)
})
s := &searchService{
backend: backend,
cfg: cfg,
features: features,
stopCh: make(chan struct{}),
stoppedCh: make(chan error, 1),
authenticator: authn,
tracing: tracer,
db: db,
log: log,
reg: reg,
docBuilders: docBuilders,
indexMetrics: indexMetrics,
searchRing: searchRing,
httpServerRouter: httpServerRouter,
subservicesWatcher: services.NewFailureWatcher(),
}
subservices := []services.Service{}
if cfg.EnableSharding {
ringStore, err := kv.NewClient(
memberlistKVConfig,
ring.GetCodec(),
kv.RegistererWithKVName(reg, resource.RingName),
log,
)
if err != nil {
return nil, fmt.Errorf("failed to create KV store client: %s", err)
}
lifecyclerCfg, err := toLifecyclerConfig(cfg, log)
if err != nil {
return nil, fmt.Errorf("failed to initialize search-ring lifecycler config: %s", err)
}
// Define lifecycler delegates in reverse order (last to be called defined first because they're
// chained via "next delegate").
delegate := ring.BasicLifecyclerDelegate(ring.NewInstanceRegisterDelegate(ring.JOINING, resource.RingNumTokens))
delegate = ring.NewLeaveOnStoppingDelegate(delegate, log)
delegate = ring.NewAutoForgetDelegate(resource.RingHeartbeatTimeout*2, delegate, log)
s.ringLifecycler, err = ring.NewBasicLifecycler(
lifecyclerCfg,
resource.RingName,
resource.RingKey,
ringStore,
delegate,
log,
reg,
)
if err != nil {
return nil, fmt.Errorf("failed to initialize search-ring lifecycler: %s", err)
}
s.ringLifecycler.SetKeepInstanceInTheRingOnShutdown(true)
subservices = append(subservices, s.ringLifecycler)
}
if len(subservices) > 0 {
s.hasSubservices = true
s.subservices, err = services.NewManager(subservices...)
if err != nil {
return nil, fmt.Errorf("failed to create subservices manager: %w", err)
}
}
// This will be used when running as a dskit service
s.BasicService = services.NewBasicService(s.starting, s.running, s.stopping).WithName(modules.SearchServer)
// Register HTTP endpoints if router is provided
s.RegisterHTTPEndpoints(httpServerRouter)
return s, nil
}
func (s *searchService) PrepareDownscale(w http.ResponseWriter, r *http.Request) {
switch r.Method {
case http.MethodPost:
s.log.Info("Preparing for downscale. Will not keep instance in ring on shutdown.")
s.ringLifecycler.SetKeepInstanceInTheRingOnShutdown(false)
case http.MethodDelete:
s.log.Info("Downscale canceled. Will keep instance in ring on shutdown.")
s.ringLifecycler.SetKeepInstanceInTheRingOnShutdown(true)
case http.MethodGet:
// used for delayed downscale use case, which we don't support. Leaving here for completion sake
s.log.Info("Received GET request for prepare-downscale. Behavior not implemented.")
default:
}
}
func (s *searchService) OwnsIndex(key resource.NamespacedResource) (bool, error) {
if s.searchRing == nil {
return true, nil
}
if st := s.searchRing.State(); st != services.Running {
return false, fmt.Errorf("ring is not Running: %s", st)
}
ringHasher := fnv.New32a()
_, err := ringHasher.Write([]byte(key.Namespace))
if err != nil {
return false, fmt.Errorf("error hashing namespace: %w", err)
}
rs, err := s.searchRing.GetWithOptions(ringHasher.Sum32(), searchOwnerRead, ring.WithReplicationFactor(s.searchRing.ReplicationFactor()))
if err != nil {
return false, fmt.Errorf("error getting replicaset from ring: %w", err)
}
return rs.Includes(s.ringLifecycler.GetInstanceAddr()), nil
}
func (s *searchService) starting(ctx context.Context) error {
if s.hasSubservices {
s.subservicesWatcher.WatchManager(s.subservices)
if err := services.StartManagerAndAwaitHealthy(ctx, s.subservices); err != nil {
return fmt.Errorf("failed to start subservices: %w", err)
}
}
authzClient, err := authz.ProvideStandaloneAuthZClient(s.cfg, s.features, s.tracing, s.reg)
if err != nil {
return err
}
// Create search options for the search server
searchOptions, err := search.NewSearchOptions(s.features, s.cfg, s.docBuilders, s.indexMetrics, s.OwnsIndex)
if err != nil {
return err
}
// Create the search server
searchServer, err := NewSearchServer(SearchServerOptions{
Backend: s.backend,
DB: s.db,
Cfg: s.cfg,
Tracer: s.tracing,
Reg: s.reg,
AccessClient: authzClient,
SearchOptions: searchOptions,
IndexMetrics: s.indexMetrics,
OwnsIndexFn: s.OwnsIndex,
})
if err != nil {
return err
}
s.handler, err = grpcserver.ProvideService(s.cfg, s.features, interceptors.AuthenticatorFunc(s.authenticator), s.tracing, prometheus.DefaultRegisterer)
if err != nil {
return err
}
healthService, err := resource.ProvideHealthService(searchServer)
if err != nil {
return err
}
srv := s.handler.GetServer()
// Register search services
resourcepb.RegisterResourceIndexServer(srv, searchServer)
resourcepb.RegisterManagedObjectIndexServer(srv, searchServer)
resourcepb.RegisterDiagnosticsServer(srv, searchServer)
grpc_health_v1.RegisterHealthServer(srv, healthService)
// register reflection service
_, err = grpcserver.ProvideReflectionService(s.cfg, s.handler)
if err != nil {
return err
}
if s.cfg.EnableSharding {
s.log.Info("waiting until search server is JOINING in the ring")
lfcCtx, cancel := context.WithTimeout(context.Background(), s.cfg.ResourceServerJoinRingTimeout)
defer cancel()
if err := ring.WaitInstanceState(lfcCtx, s.searchRing, s.ringLifecycler.GetInstanceID(), ring.JOINING); err != nil {
return fmt.Errorf("error switching to JOINING in the ring: %s", err)
}
s.log.Info("search server is JOINING in the ring")
if err := s.ringLifecycler.ChangeState(ctx, ring.ACTIVE); err != nil {
return fmt.Errorf("error switching to ACTIVE in the ring: %s", err)
}
s.log.Info("search server is ACTIVE in the ring")
}
// start the gRPC server
go func() {
err := s.handler.Run(ctx)
if err != nil {
s.stoppedCh <- err
} else {
s.stoppedCh <- nil
}
}()
return nil
}
// GetAddress returns the address of the gRPC server.
func (s *searchService) GetAddress() string {
return s.handler.GetAddress()
}
func (s *searchService) running(ctx context.Context) error {
select {
case err := <-s.stoppedCh:
if err != nil && !errors.Is(err, context.Canceled) {
return err
}
case err := <-s.subservicesWatcher.Chan():
return fmt.Errorf("subservice failure: %w", err)
case <-ctx.Done():
close(s.stopCh)
}
return nil
}
func (s *searchService) stopping(_ error) error {
if s.hasSubservices {
err := services.StopManagerAndAwaitStopped(context.Background(), s.subservices)
if err != nil {
return fmt.Errorf("failed to stop subservices: %w", err)
}
}
return nil
}
func (s *searchService) RegisterHTTPEndpoints(httpServerRouter *mux.Router) {
if httpServerRouter != nil && s.cfg.EnableSharding {
httpServerRouter.Path("/prepare-downscale").Methods("GET", "POST", "DELETE").Handler(http.HandlerFunc(s.PrepareDownscale))
}
}

View File

@@ -16,7 +16,6 @@ import (
secrets "github.com/grafana/grafana/pkg/registry/apis/secret/contracts" secrets "github.com/grafana/grafana/pkg/registry/apis/secret/contracts"
inlinesecurevalue "github.com/grafana/grafana/pkg/registry/apis/secret/inline" inlinesecurevalue "github.com/grafana/grafana/pkg/registry/apis/secret/inline"
"github.com/grafana/grafana/pkg/services/featuremgmt" "github.com/grafana/grafana/pkg/services/featuremgmt"
"github.com/grafana/grafana/pkg/services/sqlstore/migrator"
"github.com/grafana/grafana/pkg/setting" "github.com/grafana/grafana/pkg/setting"
"github.com/grafana/grafana/pkg/storage/unified/resource" "github.com/grafana/grafana/pkg/storage/unified/resource"
"github.com/grafana/grafana/pkg/storage/unified/sql/db/dbimpl" "github.com/grafana/grafana/pkg/storage/unified/sql/db/dbimpl"
@@ -30,8 +29,21 @@ type QOSEnqueueDequeuer interface {
Dequeue(ctx context.Context) (func(), error) Dequeue(ctx context.Context) (func(), error)
} }
// ServerOptions contains the options for creating a new ResourceServer // SearchServerOptions contains the options for creating a new SearchServer
type ServerOptions struct { type SearchServerOptions struct {
Backend resource.StorageBackend
DB infraDB.DB
Cfg *setting.Cfg
Tracer trace.Tracer
Reg prometheus.Registerer
AccessClient types.AccessClient
SearchOptions resource.SearchOptions
IndexMetrics *resource.BleveIndexMetrics
OwnsIndexFn func(key resource.NamespacedResource) (bool, error)
}
// StorageServerOptions contains the options for creating a storage-only server (without search)
type StorageServerOptions struct {
Backend resource.StorageBackend Backend resource.StorageBackend
OverridesService *resource.OverridesService OverridesService *resource.OverridesService
DB infraDB.DB DB infraDB.DB
@@ -39,16 +51,66 @@ type ServerOptions struct {
Tracer trace.Tracer Tracer trace.Tracer
Reg prometheus.Registerer Reg prometheus.Registerer
AccessClient types.AccessClient AccessClient types.AccessClient
SearchOptions resource.SearchOptions
StorageMetrics *resource.StorageMetrics StorageMetrics *resource.StorageMetrics
IndexMetrics *resource.BleveIndexMetrics
Features featuremgmt.FeatureToggles Features featuremgmt.FeatureToggles
QOSQueue QOSEnqueueDequeuer QOSQueue QOSEnqueueDequeuer
SecureValues secrets.InlineSecureValueSupport SecureValues secrets.InlineSecureValueSupport
OwnsIndexFn func(key resource.NamespacedResource) (bool, error)
} }
func NewResourceServer(opts ServerOptions) (resource.ResourceServer, error) { // NewSearchServer creates a new SearchServer with the given options.
// This can be used to create a standalone search server or to create a search server
// that will be passed to NewResourceServer.
//
// Important: When running in monolith mode, the backend should be provided by the caller
// to avoid duplicate metrics registration. Only in standalone microservice mode should
// this function create its own backend.
func NewSearchServer(opts SearchServerOptions) (resource.SearchServer, error) {
backend := opts.Backend
if backend == nil {
eDB, err := dbimpl.ProvideResourceDB(opts.DB, opts.Cfg, opts.Tracer)
if err != nil {
return nil, err
}
isHA := isHighAvailabilityEnabled(opts.Cfg.SectionWithEnvOverrides("database"),
opts.Cfg.SectionWithEnvOverrides("resource_api"))
b, err := NewBackend(BackendOptions{
DBProvider: eDB,
Reg: opts.Reg,
IsHA: isHA,
LastImportTimeMaxAge: opts.SearchOptions.MaxIndexAge,
})
if err != nil {
return nil, err
}
// Initialize the backend before creating search server
if err := b.Init(context.Background()); err != nil {
return nil, fmt.Errorf("failed to initialize backend: %w", err)
}
backend = b
}
search, err := resource.NewSearchServer(opts.SearchOptions, backend, opts.AccessClient, nil, opts.IndexMetrics, opts.OwnsIndexFn)
if err != nil {
return nil, fmt.Errorf("failed to create search server: %w", err)
}
if err := search.Init(context.Background()); err != nil {
return nil, fmt.Errorf("failed to initialize search server: %w", err)
}
return search, nil
}
// NewStorageServer creates a storage-only server without search capabilities.
// Use this when you want to run storage and search as separate services.
//
// Important: When running in monolith mode, the backend should be provided by the caller
// to avoid duplicate metrics registration. Only in standalone microservice mode should
// this function create its own backend.
func NewStorageServer(opts StorageServerOptions) (resource.ResourceServer, error) {
apiserverCfg := opts.Cfg.SectionWithEnvOverrides("grafana-apiserver") apiserverCfg := opts.Cfg.SectionWithEnvOverrides("grafana-apiserver")
if opts.SecureValues == nil && opts.Cfg != nil && opts.Cfg.SecretsManagement.GrpcClientEnable { if opts.SecureValues == nil && opts.Cfg != nil && opts.Cfg.SecretsManagement.GrpcClientEnable {
@@ -92,7 +154,6 @@ func NewResourceServer(opts ServerOptions) (resource.ResourceServer, error) {
if opts.Backend != nil { if opts.Backend != nil {
serverOptions.Backend = opts.Backend serverOptions.Backend = opts.Backend
// TODO: we should probably have a proper interface for diagnostics/lifecycle
} else { } else {
eDB, err := dbimpl.ProvideResourceDB(opts.DB, opts.Cfg, opts.Tracer) eDB, err := dbimpl.ProvideResourceDB(opts.DB, opts.Cfg, opts.Tracer)
if err != nil { if err != nil {
@@ -130,7 +191,6 @@ func NewResourceServer(opts ServerOptions) (resource.ResourceServer, error) {
return nil, fmt.Errorf("failed to create resource version manager: %w", err) return nil, fmt.Errorf("failed to create resource version manager: %w", err)
} }
// TODO add config to decide whether to pass RvManager or not
kvBackendOpts.RvManager = rvManager kvBackendOpts.RvManager = rvManager
kvBackend, err := resource.NewKVStorageBackend(kvBackendOpts) kvBackend, err := resource.NewKVStorageBackend(kvBackendOpts)
if err != nil { if err != nil {
@@ -148,7 +208,7 @@ func NewResourceServer(opts ServerOptions) (resource.ResourceServer, error) {
Reg: opts.Reg, Reg: opts.Reg,
IsHA: isHA, IsHA: isHA,
storageMetrics: opts.StorageMetrics, storageMetrics: opts.StorageMetrics,
LastImportTimeMaxAge: opts.SearchOptions.MaxIndexAge, // No need to keep last_import_times older than max index age. LastImportTimeMaxAge: opts.Cfg.MaxFileIndexAge,
}) })
if err != nil { if err != nil {
return nil, err return nil, err
@@ -159,33 +219,15 @@ func NewResourceServer(opts ServerOptions) (resource.ResourceServer, error) {
} }
} }
serverOptions.Search = opts.SearchOptions // Initialize the backend before creating server
serverOptions.IndexMetrics = opts.IndexMetrics if serverOptions.Lifecycle != nil {
if err := serverOptions.Lifecycle.Init(context.Background()); err != nil {
return nil, fmt.Errorf("failed to initialize backend: %w", err)
}
}
serverOptions.QOSQueue = opts.QOSQueue serverOptions.QOSQueue = opts.QOSQueue
serverOptions.OwnsIndexFn = opts.OwnsIndexFn
serverOptions.OverridesService = opts.OverridesService serverOptions.OverridesService = opts.OverridesService
return resource.NewResourceServer(serverOptions) return resource.NewResourceServer(serverOptions)
} }
// isHighAvailabilityEnabled determines if high availability mode should
// be enabled based on database configuration. High availability is enabled
// by default except for SQLite databases.
func isHighAvailabilityEnabled(dbCfg, resourceAPICfg *setting.DynamicSection) bool {
// If the resource API is using a non-SQLite database, we assume it's in HA mode.
resourceDBType := resourceAPICfg.Key("db_type").String()
if resourceDBType != "" && resourceDBType != migrator.SQLite {
return true
}
// Check in the config if HA is enabled - by default we always assume a HA setup.
isHA := dbCfg.Key("high_availability").MustBool(true)
// SQLite is not possible to run in HA, so we force it to false.
databaseType := dbCfg.Key("type").String()
if databaseType == migrator.SQLite {
isHA = false
}
return isHA
}

View File

@@ -12,6 +12,7 @@ import (
"time" "time"
"github.com/gorilla/mux" "github.com/gorilla/mux"
"github.com/grafana/grafana/pkg/storage/unified/search"
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto" "github.com/prometheus/client_golang/prometheus/promauto"
"go.opentelemetry.io/otel" "go.opentelemetry.io/otel"
@@ -36,7 +37,7 @@ import (
"github.com/grafana/grafana/pkg/storage/unified/resource" "github.com/grafana/grafana/pkg/storage/unified/resource"
"github.com/grafana/grafana/pkg/storage/unified/resource/grpc" "github.com/grafana/grafana/pkg/storage/unified/resource/grpc"
"github.com/grafana/grafana/pkg/storage/unified/resourcepb" "github.com/grafana/grafana/pkg/storage/unified/resourcepb"
"github.com/grafana/grafana/pkg/storage/unified/search" "github.com/grafana/grafana/pkg/storage/unified/sql/db/dbimpl"
"github.com/grafana/grafana/pkg/util/scheduler" "github.com/grafana/grafana/pkg/util/scheduler"
) )
@@ -54,38 +55,40 @@ type UnifiedStorageGrpcService interface {
type service struct { type service struct {
*services.BasicService *services.BasicService
// Subservices manager backend resource.StorageBackend
subservices *services.Manager cfg *setting.Cfg
subservicesWatcher *services.FailureWatcher features featuremgmt.FeatureToggles
hasSubservices bool stopCh chan struct{}
stoppedCh chan error
backend resource.StorageBackend authenticator func(context.Context) (context.Context, error)
cfg *setting.Cfg tracing trace.Tracer
features featuremgmt.FeatureToggles db infraDB.DB
db infraDB.DB
stopCh chan struct{}
stoppedCh chan error
handler grpcserver.Provider
tracing trace.Tracer
authenticator func(ctx context.Context) (context.Context, error)
log log.Logger log log.Logger
reg prometheus.Registerer reg prometheus.Registerer
docBuilders resource.DocumentBuilderSupplier
storageMetrics *resource.StorageMetrics storageMetrics *resource.StorageMetrics
indexMetrics *resource.BleveIndexMetrics indexMetrics *resource.BleveIndexMetrics
docBuilders resource.DocumentBuilderSupplier
searchRing *ring.Ring searchRing *ring.Ring
// Handler for the gRPC server
handler grpcserver.Provider
// Ring lifecycle and sharding support
ringLifecycler *ring.BasicLifecycler ringLifecycler *ring.BasicLifecycler
queue QOSEnqueueDequeuer // QoS support
queue *scheduler.Queue
scheduler *scheduler.Scheduler scheduler *scheduler.Scheduler
// Subservices management
hasSubservices bool
subservices *services.Manager
subservicesWatcher *services.FailureWatcher
} }
// ProvideUnifiedStorageGrpcService provides a combined storage and search service running on the same gRPC server.
// This is used when running Grafana as a monolith where both services share the same process.
// Each service (storage and search) maintains its own lifecycle but shares the gRPC server.
func ProvideUnifiedStorageGrpcService( func ProvideUnifiedStorageGrpcService(
cfg *setting.Cfg, cfg *setting.Cfg,
features featuremgmt.FeatureToggles, features featuremgmt.FeatureToggles,
@@ -101,7 +104,7 @@ func ProvideUnifiedStorageGrpcService(
backend resource.StorageBackend, backend resource.StorageBackend,
) (UnifiedStorageGrpcService, error) { ) (UnifiedStorageGrpcService, error) {
var err error var err error
tracer := otel.Tracer("unified-storage") tracer := otel.Tracer("unified-storage-combined")
// FIXME: This is a temporary solution while we are migrating to the new authn interceptor // FIXME: This is a temporary solution while we are migrating to the new authn interceptor
// grpcutils.NewGrpcAuthenticator should be used instead. // grpcutils.NewGrpcAuthenticator should be used instead.
@@ -178,7 +181,7 @@ func ProvideUnifiedStorageGrpcService(
MaxSizePerTenant: cfg.QOSMaxSizePerTenant, MaxSizePerTenant: cfg.QOSMaxSizePerTenant,
Registerer: qosReg, Registerer: qosReg,
}) })
scheduler, err := scheduler.NewScheduler(queue, &scheduler.Config{ sched, err := scheduler.NewScheduler(queue, &scheduler.Config{
NumWorkers: cfg.QOSNumberWorker, NumWorkers: cfg.QOSNumberWorker,
Logger: log, Logger: log,
}) })
@@ -187,7 +190,7 @@ func ProvideUnifiedStorageGrpcService(
} }
s.queue = queue s.queue = queue
s.scheduler = scheduler s.scheduler = sched
subservices = append(subservices, s.queue, s.scheduler) subservices = append(subservices, s.queue, s.scheduler)
} }
@@ -200,6 +203,7 @@ func ProvideUnifiedStorageGrpcService(
} }
// This will be used when running as a dskit service // This will be used when running as a dskit service
// Note: We use StorageServer as the module name for backward compatibility
s.BasicService = services.NewBasicService(s.starting, s.running, s.stopping).WithName(modules.StorageServer) s.BasicService = services.NewBasicService(s.starting, s.running, s.stopping).WithName(modules.StorageServer)
return s, nil return s, nil
@@ -220,11 +224,6 @@ func (s *service) PrepareDownscale(w http.ResponseWriter, r *http.Request) {
} }
} }
var (
// operation used by the search-servers to check if they own the namespace
searchOwnerRead = ring.NewOp([]ring.InstanceState{ring.JOINING, ring.ACTIVE, ring.LEAVING}, nil)
)
func (s *service) OwnsIndex(key resource.NamespacedResource) (bool, error) { func (s *service) OwnsIndex(key resource.NamespacedResource) (bool, error) {
if s.searchRing == nil { if s.searchRing == nil {
return true, nil return true, nil
@@ -261,59 +260,108 @@ func (s *service) starting(ctx context.Context) error {
return err return err
} }
searchOptions, err := search.NewSearchOptions(s.features, s.cfg, s.docBuilders, s.indexMetrics, s.OwnsIndex) // Setup overrides service if enabled
if err != nil { var overridesSvc *resource.OverridesService
return err
}
serverOptions := ServerOptions{
Backend: s.backend,
DB: s.db,
Cfg: s.cfg,
Tracer: s.tracing,
Reg: s.reg,
AccessClient: authzClient,
SearchOptions: searchOptions,
StorageMetrics: s.storageMetrics,
IndexMetrics: s.indexMetrics,
Features: s.features,
QOSQueue: s.queue,
OwnsIndexFn: s.OwnsIndex,
}
if s.cfg.OverridesFilePath != "" { if s.cfg.OverridesFilePath != "" {
overridesSvc, err := resource.NewOverridesService(context.Background(), s.log, s.reg, s.tracing, resource.ReloadOptions{ overridesSvc, err = resource.NewOverridesService(context.Background(), s.log, s.reg, s.tracing, resource.ReloadOptions{
FilePath: s.cfg.OverridesFilePath, FilePath: s.cfg.OverridesFilePath,
ReloadPeriod: s.cfg.OverridesReloadInterval, ReloadPeriod: s.cfg.OverridesReloadInterval,
}) })
if err != nil { if err != nil {
return err return err
} }
serverOptions.OverridesService = overridesSvc
} }
server, err := NewResourceServer(serverOptions) // Ensure we have a backend - create one if needed
// This is critical: we create the backend ONCE and share it between search and storage servers
// to avoid duplicate metrics registration
backend := s.backend
if backend == nil {
eDB, err := dbimpl.ProvideResourceDB(s.db, s.cfg, s.tracing)
if err != nil {
return fmt.Errorf("failed to create resource DB: %w", err)
}
isHA := isHighAvailabilityEnabled(s.cfg.SectionWithEnvOverrides("database"),
s.cfg.SectionWithEnvOverrides("resource_api"))
b, err := NewBackend(BackendOptions{
DBProvider: eDB,
Reg: s.reg,
IsHA: isHA,
storageMetrics: s.storageMetrics,
LastImportTimeMaxAge: s.cfg.MaxFileIndexAge,
})
if err != nil {
return fmt.Errorf("failed to create backend: %w", err)
}
// Initialize the backend
if err := b.Init(context.Background()); err != nil {
return fmt.Errorf("failed to initialize backend: %w", err)
}
backend = b
}
// Create search options for the search server
searchOptions, err := search.NewSearchOptions(s.features, s.cfg, s.docBuilders, s.indexMetrics, s.OwnsIndex)
if err != nil { if err != nil {
return err return err
} }
// Create the search server - pass the shared backend
searchServer, err := NewSearchServer(SearchServerOptions{
Backend: backend, // Use the shared backend
DB: s.db,
Cfg: s.cfg,
Tracer: s.tracing,
Reg: s.reg,
AccessClient: authzClient,
SearchOptions: searchOptions,
IndexMetrics: s.indexMetrics,
OwnsIndexFn: s.OwnsIndex,
})
if err != nil {
return err
}
// Create the storage server - pass the shared backend
storageServer, err := NewStorageServer(StorageServerOptions{
Backend: backend, // Use the shared backend
OverridesService: overridesSvc,
DB: s.db,
Cfg: s.cfg,
Tracer: s.tracing,
Reg: s.reg,
AccessClient: authzClient,
StorageMetrics: s.storageMetrics,
Features: s.features,
QOSQueue: s.queue,
})
if err != nil {
return err
}
s.handler, err = grpcserver.ProvideService(s.cfg, s.features, interceptors.AuthenticatorFunc(s.authenticator), s.tracing, prometheus.DefaultRegisterer) s.handler, err = grpcserver.ProvideService(s.cfg, s.features, interceptors.AuthenticatorFunc(s.authenticator), s.tracing, prometheus.DefaultRegisterer)
if err != nil { if err != nil {
return err return err
} }
healthService, err := resource.ProvideHealthService(server) healthService, err := resource.ProvideHealthService(storageServer)
if err != nil { if err != nil {
return err return err
} }
srv := s.handler.GetServer() srv := s.handler.GetServer()
resourcepb.RegisterResourceStoreServer(srv, server) // Register storage services
resourcepb.RegisterBulkStoreServer(srv, server) resourcepb.RegisterResourceStoreServer(srv, storageServer)
resourcepb.RegisterResourceIndexServer(srv, server) resourcepb.RegisterBulkStoreServer(srv, storageServer)
resourcepb.RegisterManagedObjectIndexServer(srv, server) resourcepb.RegisterBlobStoreServer(srv, storageServer)
resourcepb.RegisterBlobStoreServer(srv, server) resourcepb.RegisterDiagnosticsServer(srv, storageServer)
resourcepb.RegisterDiagnosticsServer(srv, server) resourcepb.RegisterQuotasServer(srv, storageServer)
resourcepb.RegisterQuotasServer(srv, server) // Register search services
resourcepb.RegisterResourceIndexServer(srv, searchServer)
resourcepb.RegisterManagedObjectIndexServer(srv, searchServer)
grpc_health_v1.RegisterHealthServer(srv, healthService) grpc_health_v1.RegisterHealthServer(srv, healthService)
// register reflection service // register reflection service

View File

@@ -0,0 +1,234 @@
package sql
import (
"context"
"errors"
"fmt"
"github.com/prometheus/client_golang/prometheus"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/trace"
"google.golang.org/grpc/health/grpc_health_v1"
"github.com/grafana/dskit/services"
infraDB "github.com/grafana/grafana/pkg/infra/db"
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/modules"
"github.com/grafana/grafana/pkg/services/authz"
"github.com/grafana/grafana/pkg/services/featuremgmt"
"github.com/grafana/grafana/pkg/services/grpcserver"
"github.com/grafana/grafana/pkg/services/grpcserver/interceptors"
"github.com/grafana/grafana/pkg/setting"
"github.com/grafana/grafana/pkg/storage/unified/resource"
"github.com/grafana/grafana/pkg/storage/unified/resource/grpc"
"github.com/grafana/grafana/pkg/storage/unified/resourcepb"
"github.com/grafana/grafana/pkg/util/scheduler"
)
var _ UnifiedStorageGrpcService = (*storageService)(nil)
type storageService struct {
*services.BasicService
backend resource.StorageBackend
cfg *setting.Cfg
features featuremgmt.FeatureToggles
db infraDB.DB
stopCh chan struct{}
stoppedCh chan error
handler grpcserver.Provider
tracing trace.Tracer
authenticator func(ctx context.Context) (context.Context, error)
log log.Logger
reg prometheus.Registerer
storageMetrics *resource.StorageMetrics
queue QOSEnqueueDequeuer
scheduler *scheduler.Scheduler
// Subservices manager
subservices *services.Manager
subservicesWatcher *services.FailureWatcher
hasSubservices bool
}
func ProvideStorageService(
cfg *setting.Cfg,
features featuremgmt.FeatureToggles,
db infraDB.DB,
log log.Logger,
reg prometheus.Registerer,
storageMetrics *resource.StorageMetrics,
backend resource.StorageBackend,
) (UnifiedStorageGrpcService, error) {
var err error
tracer := otel.Tracer("unified-storage-server")
authn := NewAuthenticatorWithFallback(cfg, reg, tracer, func(ctx context.Context) (context.Context, error) {
auth := grpc.Authenticator{Tracer: tracer}
return auth.Authenticate(ctx)
})
s := &storageService{
backend: backend,
cfg: cfg,
features: features,
stopCh: make(chan struct{}),
stoppedCh: make(chan error, 1),
authenticator: authn,
tracing: tracer,
db: db,
log: log,
reg: reg,
storageMetrics: storageMetrics,
subservicesWatcher: services.NewFailureWatcher(),
}
subservices := []services.Service{}
// Setup QOS if enabled
if cfg.QOSEnabled {
qosReg := prometheus.WrapRegistererWithPrefix("resource_server_qos_", reg)
queue := scheduler.NewQueue(&scheduler.QueueOptions{
MaxSizePerTenant: cfg.QOSMaxSizePerTenant,
Registerer: qosReg,
})
sched, err := scheduler.NewScheduler(queue, &scheduler.Config{
NumWorkers: cfg.QOSNumberWorker,
Logger: log,
})
if err != nil {
return nil, fmt.Errorf("failed to create qos scheduler: %s", err)
}
s.queue = queue
s.scheduler = sched
subservices = append(subservices, s.queue, s.scheduler)
}
if len(subservices) > 0 {
s.hasSubservices = true
s.subservices, err = services.NewManager(subservices...)
if err != nil {
return nil, fmt.Errorf("failed to create subservices manager: %w", err)
}
}
// This will be used when running as a dskit service
s.BasicService = services.NewBasicService(s.starting, s.running, s.stopping).WithName(modules.StorageServer)
return s, nil
}
func (s *storageService) starting(ctx context.Context) error {
if s.hasSubservices {
s.subservicesWatcher.WatchManager(s.subservices)
if err := services.StartManagerAndAwaitHealthy(ctx, s.subservices); err != nil {
return fmt.Errorf("failed to start subservices: %w", err)
}
}
authzClient, err := authz.ProvideStandaloneAuthZClient(s.cfg, s.features, s.tracing, s.reg)
if err != nil {
return err
}
// Setup overrides service if enabled
var overridesSvc *resource.OverridesService
if s.cfg.OverridesFilePath != "" {
overridesSvc, err = resource.NewOverridesService(context.Background(), s.log, s.reg, s.tracing, resource.ReloadOptions{
FilePath: s.cfg.OverridesFilePath,
ReloadPeriod: s.cfg.OverridesReloadInterval,
})
if err != nil {
return err
}
}
// Create the storage server
storageServer, err := NewStorageServer(StorageServerOptions{
Backend: s.backend,
OverridesService: overridesSvc,
DB: s.db,
Cfg: s.cfg,
Tracer: s.tracing,
Reg: s.reg,
AccessClient: authzClient,
StorageMetrics: s.storageMetrics,
Features: s.features,
QOSQueue: s.queue,
})
if err != nil {
return err
}
s.handler, err = grpcserver.ProvideService(s.cfg, s.features, interceptors.AuthenticatorFunc(s.authenticator), s.tracing, prometheus.DefaultRegisterer)
if err != nil {
return err
}
healthService, err := resource.ProvideHealthService(storageServer)
if err != nil {
return err
}
srv := s.handler.GetServer()
// Register storage services
resourcepb.RegisterResourceStoreServer(srv, storageServer)
resourcepb.RegisterBulkStoreServer(srv, storageServer)
resourcepb.RegisterBlobStoreServer(srv, storageServer)
resourcepb.RegisterDiagnosticsServer(srv, storageServer)
resourcepb.RegisterQuotasServer(srv, storageServer)
grpc_health_v1.RegisterHealthServer(srv, healthService)
// register reflection service
_, err = grpcserver.ProvideReflectionService(s.cfg, s.handler)
if err != nil {
return err
}
// start the gRPC server
go func() {
err := s.handler.Run(ctx)
if err != nil {
s.stoppedCh <- err
} else {
s.stoppedCh <- nil
}
}()
return nil
}
// GetAddress returns the address of the gRPC server.
func (s *storageService) GetAddress() string {
return s.handler.GetAddress()
}
func (s *storageService) running(ctx context.Context) error {
select {
case err := <-s.stoppedCh:
if err != nil && !errors.Is(err, context.Canceled) {
return err
}
case err := <-s.subservicesWatcher.Chan():
return fmt.Errorf("subservice failure: %w", err)
case <-ctx.Done():
close(s.stopCh)
}
return nil
}
func (s *storageService) stopping(_ error) error {
if s.hasSubservices {
err := services.StopManagerAndAwaitStopped(context.Background(), s.subservices)
if err != nil {
return fmt.Errorf("failed to stop subservices: %w", err)
}
}
return nil
}

View File

@@ -32,6 +32,7 @@ func RunTestSearchAndStorage(t *testing.T, ctx context.Context, backend resource
nsPrefix := "test-ns" nsPrefix := "test-ns"
var server resource.ResourceServer var server resource.ResourceServer
var searchServer resource.SearchServer
t.Run("Create initial resources in storage", func(t *testing.T) { t.Run("Create initial resources in storage", func(t *testing.T) {
initialResources := []struct { initialResources := []struct {
@@ -96,25 +97,34 @@ func RunTestSearchAndStorage(t *testing.T, ctx context.Context, backend resource
}) })
t.Run("Create a resource server with both backends", func(t *testing.T) { t.Run("Create a resource server with both backends", func(t *testing.T) {
// Create a resource server with both backends // Create search server first
var err error var err error
server, err = resource.NewResourceServer(resource.ResourceServerOptions{ searchOpts := resource.SearchOptions{
Backend: backend, Backend: searchBackend,
Search: resource.SearchOptions{ Resources: &resource.TestDocumentBuilderSupplier{
Backend: searchBackend, GroupsResources: map[string]string{
Resources: &resource.TestDocumentBuilderSupplier{ "test.grafana.app": "testresources",
GroupsResources: map[string]string{
"test.grafana.app": "testresources",
},
}, },
}, },
}
searchServer, err = resource.NewSearchServer(searchOpts, backend, nil, nil, nil, nil)
require.NoError(t, err)
require.NotNil(t, searchServer)
// Initialize the search server
err = searchServer.Init(ctx)
require.NoError(t, err)
// Create a resource server separately from the search server
server, err = resource.NewResourceServer(resource.ResourceServerOptions{
Backend: backend,
}) })
require.NoError(t, err) require.NoError(t, err)
}) })
t.Run("Search for initial resources", func(t *testing.T) { t.Run("Search for initial resources", func(t *testing.T) {
// Test 1: Search for initial resources // Test 1: Search for initial resources
searchResp, err := server.Search(ctx, &resourcepb.ResourceSearchRequest{ searchResp, err := searchServer.Search(ctx, &resourcepb.ResourceSearchRequest{
Options: &resourcepb.ListOptions{ Options: &resourcepb.ListOptions{
Key: &resourcepb.ResourceKey{ Key: &resourcepb.ResourceKey{
Group: "test.grafana.app", Group: "test.grafana.app",
@@ -194,7 +204,7 @@ func RunTestSearchAndStorage(t *testing.T, ctx context.Context, backend resource
}) })
t.Run("Search for documents", func(t *testing.T) { t.Run("Search for documents", func(t *testing.T) {
searchResp, err := server.Search(ctx, &resourcepb.ResourceSearchRequest{ searchResp, err := searchServer.Search(ctx, &resourcepb.ResourceSearchRequest{
Options: &resourcepb.ListOptions{ Options: &resourcepb.ListOptions{
Key: &resourcepb.ResourceKey{ Key: &resourcepb.ResourceKey{
Group: "test.grafana.app", Group: "test.grafana.app",
@@ -212,7 +222,7 @@ func RunTestSearchAndStorage(t *testing.T, ctx context.Context, backend resource
}) })
t.Run("Search with tags", func(t *testing.T) { t.Run("Search with tags", func(t *testing.T) {
searchResp, err := server.Search(ctx, &resourcepb.ResourceSearchRequest{ searchResp, err := searchServer.Search(ctx, &resourcepb.ResourceSearchRequest{
Options: &resourcepb.ListOptions{ Options: &resourcepb.ListOptions{
Key: &resourcepb.ResourceKey{ Key: &resourcepb.ResourceKey{
Group: "test.grafana.app", Group: "test.grafana.app",
@@ -231,7 +241,7 @@ func RunTestSearchAndStorage(t *testing.T, ctx context.Context, backend resource
require.Equal(t, int64(0), searchResp.TotalHits) require.Equal(t, int64(0), searchResp.TotalHits)
// this is the correct way of searching by tag // this is the correct way of searching by tag
searchResp, err = server.Search(ctx, &resourcepb.ResourceSearchRequest{ searchResp, err = searchServer.Search(ctx, &resourcepb.ResourceSearchRequest{
Options: &resourcepb.ListOptions{ Options: &resourcepb.ListOptions{
Key: &resourcepb.ResourceKey{ Key: &resourcepb.ResourceKey{
Group: "test.grafana.app", Group: "test.grafana.app",
@@ -253,7 +263,7 @@ func RunTestSearchAndStorage(t *testing.T, ctx context.Context, backend resource
}) })
t.Run("Search with specific tag", func(t *testing.T) { t.Run("Search with specific tag", func(t *testing.T) {
searchResp, err := server.Search(ctx, &resourcepb.ResourceSearchRequest{ searchResp, err := searchServer.Search(ctx, &resourcepb.ResourceSearchRequest{
Options: &resourcepb.ListOptions{ Options: &resourcepb.ListOptions{
Key: &resourcepb.ResourceKey{ Key: &resourcepb.ResourceKey{
Group: "test.grafana.app", Group: "test.grafana.app",
@@ -272,7 +282,7 @@ func RunTestSearchAndStorage(t *testing.T, ctx context.Context, backend resource
require.Equal(t, int64(0), searchResp.TotalHits) require.Equal(t, int64(0), searchResp.TotalHits)
// this is the correct way of searching by tag // this is the correct way of searching by tag
searchResp, err = server.Search(ctx, &resourcepb.ResourceSearchRequest{ searchResp, err = searchServer.Search(ctx, &resourcepb.ResourceSearchRequest{
Options: &resourcepb.ListOptions{ Options: &resourcepb.ListOptions{
Key: &resourcepb.ResourceKey{ Key: &resourcepb.ResourceKey{
Group: "test.grafana.app", Group: "test.grafana.app",