Compare commits

...

1 Commits

Author SHA1 Message Date
Peter Štibraný
c7293f6fb5 Initial prototype extracting SearchServer from ResourceServer. 2025-12-16 09:41:56 +01:00
13 changed files with 96 additions and 113 deletions

View File

@@ -38,10 +38,10 @@ func (d *directResourceClient) GetBlob(ctx context.Context, in *resourcepb.GetBl
return d.server.GetBlob(ctx, in)
}
// GetStats implements ResourceClient.
func (d *directResourceClient) GetStats(ctx context.Context, in *resourcepb.ResourceStatsRequest, opts ...grpc.CallOption) (*resourcepb.ResourceStatsResponse, error) {
return d.server.GetStats(ctx, in)
}
//// GetStats implements ResourceClient.
//func (d *directResourceClient) GetStats(ctx context.Context, in *resourcepb.ResourceStatsRequest, opts ...grpc.CallOption) (*resourcepb.ResourceStatsResponse, error) {
// return d.server.GetStats(ctx, in)
//}
// IsHealthy implements ResourceClient.
func (d *directResourceClient) IsHealthy(ctx context.Context, in *resourcepb.HealthCheckRequest, opts ...grpc.CallOption) (*resourcepb.HealthCheckResponse, error) {
@@ -53,13 +53,13 @@ func (d *directResourceClient) List(ctx context.Context, in *resourcepb.ListRequ
return d.server.List(ctx, in)
}
func (d *directResourceClient) ListManagedObjects(ctx context.Context, in *resourcepb.ListManagedObjectsRequest, opts ...grpc.CallOption) (*resourcepb.ListManagedObjectsResponse, error) {
return d.server.ListManagedObjects(ctx, in)
}
func (d *directResourceClient) CountManagedObjects(ctx context.Context, in *resourcepb.CountManagedObjectsRequest, opts ...grpc.CallOption) (*resourcepb.CountManagedObjectsResponse, error) {
return d.server.CountManagedObjects(ctx, in)
}
//func (d *directResourceClient) ListManagedObjects(ctx context.Context, in *resourcepb.ListManagedObjectsRequest, opts ...grpc.CallOption) (*resourcepb.ListManagedObjectsResponse, error) {
// return d.server.ListManagedObjects(ctx, in)
//}
//
//func (d *directResourceClient) CountManagedObjects(ctx context.Context, in *resourcepb.CountManagedObjectsRequest, opts ...grpc.CallOption) (*resourcepb.CountManagedObjectsResponse, error) {
// return d.server.CountManagedObjects(ctx, in)
//}
// PutBlob implements ResourceClient.
func (d *directResourceClient) PutBlob(ctx context.Context, in *resourcepb.PutBlobRequest, opts ...grpc.CallOption) (*resourcepb.PutBlobResponse, error) {
@@ -72,9 +72,9 @@ func (d *directResourceClient) Read(ctx context.Context, in *resourcepb.ReadRequ
}
// Search implements ResourceClient.
func (d *directResourceClient) Search(ctx context.Context, in *resourcepb.ResourceSearchRequest, opts ...grpc.CallOption) (*resourcepb.ResourceSearchResponse, error) {
return d.server.Search(ctx, in)
}
//func (d *directResourceClient) Search(ctx context.Context, in *resourcepb.ResourceSearchRequest, opts ...grpc.CallOption) (*resourcepb.ResourceSearchResponse, error) {
// return d.server.Search(ctx, in)
//}
// Update implements ResourceClient.
func (d *directResourceClient) Update(ctx context.Context, in *resourcepb.UpdateRequest, opts ...grpc.CallOption) (*resourcepb.UpdateResponse, error) {

View File

@@ -11,6 +11,7 @@ import (
"k8s.io/apiserver/pkg/registry/rest"
"github.com/grafana/authlib/types"
"github.com/grafana/grafana/pkg/apimachinery/utils"
grafanaregistry "github.com/grafana/grafana/pkg/apiserver/registry/generic"
grafanarest "github.com/grafana/grafana/pkg/apiserver/rest"

View File

@@ -41,7 +41,6 @@ import (
"github.com/grafana/grafana/pkg/services/folder"
"github.com/grafana/grafana/pkg/setting"
"github.com/grafana/grafana/pkg/storage/unified/apistore"
"github.com/grafana/grafana/pkg/storage/unified/resource"
"github.com/grafana/grafana/pkg/storage/unified/resourcepb"
)
@@ -78,7 +77,7 @@ func RegisterAPIService(cfg *setting.Cfg,
acService accesscontrol.Service,
accessClient authlib.AccessClient,
registerer prometheus.Registerer,
unified resource.ResourceClient,
unified resourcepb.ResourceIndexClient,
zanzanaClient zanzana.Client,
) *FolderAPIBuilder {
builder := &FolderAPIBuilder{
@@ -97,7 +96,7 @@ func RegisterAPIService(cfg *setting.Cfg,
return builder
}
func NewAPIService(ac authlib.AccessClient, searcher resource.ResourceClient, features featuremgmt.FeatureToggles, zanzanaClient zanzana.Client, resourcePermissionsSvc *dynamic.NamespaceableResourceInterface) *FolderAPIBuilder {
func NewAPIService(ac authlib.AccessClient, searcher resourcepb.ResourceIndexClient, features featuremgmt.FeatureToggles, zanzanaClient zanzana.Client, resourcePermissionsSvc *dynamic.NamespaceableResourceInterface) *FolderAPIBuilder {
return &FolderAPIBuilder{
features: features,
accessClient: ac,

View File

@@ -615,7 +615,7 @@ func NewLocalStore(resourceInfo utils.ResourceInfo, scheme *runtime.Scheme, defa
return nil, err
}
client := resource.NewLocalResourceClient(server)
client := resource.NewLocalResourceClient(server, nil)
optsGetter := apistore.NewRESTOptionsGetterForClient(client, nil, defaultOpts.StorageConfig.Config, nil)
store, err := grafanaregistry.NewRegistryStore(scheme, resourceInfo, optsGetter)

View File

@@ -79,7 +79,7 @@ func NewRESTOptionsGetterMemory(originalStorageConfig storagebackend.Config, sec
}
return NewRESTOptionsGetterForClient(
resource.NewLocalResourceClient(server),
resource.NewLocalResourceClient(server, nil),
secrets,
originalStorageConfig,
nil,
@@ -118,7 +118,7 @@ func NewRESTOptionsGetterForFileXX(path string,
}
return NewRESTOptionsGetterForClient(
resource.NewLocalResourceClient(server),
resource.NewLocalResourceClient(server, nil),
nil, // secrets
originalStorageConfig,
nil,

View File

@@ -156,7 +156,7 @@ func testSetup(t testing.TB, opts ...setupOption) (context.Context, storage.Inte
default:
t.Fatalf("unsupported storage type: %s", setupOpts.storageType)
}
client := resource.NewLocalResourceClient(server)
client := resource.NewLocalResourceClient(server, nil)
config := storagebackend.NewDefaultConfig(setupOpts.prefix, setupOpts.codec)
store, destroyFunc, err := apistore.NewStorage(

View File

@@ -21,6 +21,7 @@ import (
"github.com/grafana/dskit/grpcclient"
"github.com/grafana/dskit/middleware"
"github.com/grafana/dskit/services"
grafanarest "github.com/grafana/grafana/pkg/apiserver/rest"
infraDB "github.com/grafana/grafana/pkg/infra/db"
"github.com/grafana/grafana/pkg/infra/tracing"
@@ -135,7 +136,7 @@ func newClient(opts options.StorageOptions,
if err != nil {
return nil, err
}
return resource.NewLocalResourceClient(server), nil
return resource.NewLocalResourceClient(server, nil), nil
case options.StorageTypeUnifiedGrpc:
if opts.Address == "" {
@@ -224,11 +225,11 @@ func newClient(opts options.StorageOptions,
serverOptions.OverridesService = overridesSvc
}
server, err := sql.NewResourceServer(serverOptions)
server, searchServer, err := sql.NewResourceServer(serverOptions)
if err != nil {
return nil, err
}
return resource.NewLocalResourceClient(server), nil
return resource.NewLocalResourceClient(server, searchServer), nil
}
}

View File

@@ -31,11 +31,14 @@ import (
"github.com/grafana/grafana/pkg/storage/unified/resourcepb"
)
type SearchClient interface {
resourcepb.ResourceIndexClient
resourcepb.ManagedObjectIndexClient
}
//go:generate mockery --name ResourceClient --structname MockResourceClient --inpackage --filename client_mock.go --with-expecter
type ResourceClient interface {
resourcepb.ResourceStoreClient
resourcepb.ResourceIndexClient
resourcepb.ManagedObjectIndexClient
resourcepb.BulkStoreClient
resourcepb.BlobStoreClient
resourcepb.DiagnosticsClient
@@ -100,8 +103,6 @@ func NewLocalResourceClient(server ResourceServer) ResourceClient {
grpcAuthInt := grpcutils.NewUnsafeAuthenticator(tracer)
for _, desc := range []*grpc.ServiceDesc{
&resourcepb.ResourceStore_ServiceDesc,
&resourcepb.ResourceIndex_ServiceDesc,
&resourcepb.ManagedObjectIndex_ServiceDesc,
&resourcepb.BlobStore_ServiceDesc,
&resourcepb.BulkStore_ServiceDesc,
&resourcepb.Diagnostics_ServiceDesc,

View File

@@ -127,7 +127,10 @@ type SearchBackend interface {
GetOpenIndexes() []NamespacedResource
}
// This supports indexing+search regardless of implementation
var _ SearchServer = &searchSupport{}
// This supports indexing+search regardless of implementation.
// Implements SearchServer interface.
type searchSupport struct {
log log.Logger
storage StorageBackend
@@ -160,6 +163,10 @@ var (
_ resourcepb.ManagedObjectIndexServer = (*searchSupport)(nil)
)
func NewSearchServer(opts SearchOptions, storage StorageBackend, access types.AccessClient, blob BlobSupport, indexMetrics *BleveIndexMetrics, ownsIndexFn func(key NamespacedResource) (bool, error)) (SearchServer, error) {
return newSearchSupport(opts, storage, access, blob, indexMetrics, ownsIndexFn)
}
func newSearchSupport(opts SearchOptions, storage StorageBackend, access types.AccessClient, blob BlobSupport, indexMetrics *BleveIndexMetrics, ownsIndexFn func(key NamespacedResource) (bool, error)) (support *searchSupport, err error) {
// No backend search support
if opts.Backend == nil {
@@ -598,6 +605,15 @@ func (s *searchSupport) buildIndexes(ctx context.Context) (int, error) {
return totalBatchesIndexed, nil
}
func (s *searchSupport) Init(ctx context.Context) error {
return s.init(ctx)
}
func (s *searchSupport) Stop(_ context.Context) error {
s.stop()
return nil
}
func (s *searchSupport) init(ctx context.Context) error {
origCtx := ctx

View File

@@ -60,7 +60,7 @@ func ProvideSearchDistributorServer(cfg *setting.Cfg, features featuremgmt.Featu
}
type RingClient struct {
Client ResourceClient
Client SearchClient
grpc_health_v1.HealthClient
Conn *grpc.ClientConn
}
@@ -99,7 +99,7 @@ var (
func (ds *distributorServer) Search(ctx context.Context, r *resourcepb.ResourceSearchRequest) (*resourcepb.ResourceSearchResponse, error) {
ctx, span := ds.tracing.Start(ctx, "distributor.Search")
defer span.End()
ctx, client, err := ds.getClientToDistributeRequest(ctx, r.Options.Key.Namespace, "Search")
ctx, client, err := ds.getClientToDistributeRequest(ctx, r.Options.Key.Namespace)
if err != nil {
return nil, err
}
@@ -110,7 +110,7 @@ func (ds *distributorServer) Search(ctx context.Context, r *resourcepb.ResourceS
func (ds *distributorServer) GetStats(ctx context.Context, r *resourcepb.ResourceStatsRequest) (*resourcepb.ResourceStatsResponse, error) {
ctx, span := ds.tracing.Start(ctx, "distributor.GetStats")
defer span.End()
ctx, client, err := ds.getClientToDistributeRequest(ctx, r.Namespace, "GetStats")
ctx, client, err := ds.getClientToDistributeRequest(ctx, r.Namespace)
if err != nil {
return nil, err
}
@@ -215,7 +215,7 @@ func (ds *distributorServer) RebuildIndexes(ctx context.Context, r *resourcepb.R
func (ds *distributorServer) CountManagedObjects(ctx context.Context, r *resourcepb.CountManagedObjectsRequest) (*resourcepb.CountManagedObjectsResponse, error) {
ctx, span := ds.tracing.Start(ctx, "distributor.CountManagedObjects")
defer span.End()
ctx, client, err := ds.getClientToDistributeRequest(ctx, r.Namespace, "CountManagedObjects")
ctx, client, err := ds.getClientToDistributeRequest(ctx, r.Namespace)
if err != nil {
return nil, err
}
@@ -226,7 +226,7 @@ func (ds *distributorServer) CountManagedObjects(ctx context.Context, r *resourc
func (ds *distributorServer) ListManagedObjects(ctx context.Context, r *resourcepb.ListManagedObjectsRequest) (*resourcepb.ListManagedObjectsResponse, error) {
ctx, span := ds.tracing.Start(ctx, "distributor.ListManagedObjects")
defer span.End()
ctx, client, err := ds.getClientToDistributeRequest(ctx, r.Namespace, "ListManagedObjects")
ctx, client, err := ds.getClientToDistributeRequest(ctx, r.Namespace)
if err != nil {
return nil, err
}
@@ -234,7 +234,7 @@ func (ds *distributorServer) ListManagedObjects(ctx context.Context, r *resource
return client.ListManagedObjects(ctx, r)
}
func (ds *distributorServer) getClientToDistributeRequest(ctx context.Context, namespace string, methodName string) (context.Context, ResourceClient, error) {
func (ds *distributorServer) getClientToDistributeRequest(ctx context.Context, namespace string) (context.Context, SearchClient, error) {
ringHasher := fnv.New32a()
_, err := ringHasher.Write([]byte(namespace))
if err != nil {

View File

@@ -32,12 +32,17 @@ import (
var tracer = otel.Tracer("github.com/grafana/grafana/pkg/storage/unified/resource")
type SearchServer interface {
LifecycleHooks
resourcepb.ResourceIndexServer
resourcepb.ManagedObjectIndexServer
}
// ResourceServer implements all gRPC services
type ResourceServer interface {
resourcepb.ResourceStoreServer
resourcepb.BulkStoreServer
resourcepb.ResourceIndexServer
resourcepb.ManagedObjectIndexServer
resourcepb.BlobStoreServer
resourcepb.DiagnosticsServer
resourcepb.QuotasServer
@@ -220,7 +225,8 @@ type ResourceServerOptions struct {
Blob BlobConfig
// Search options
Search SearchOptions
SearchOptions SearchOptions // TODO: needed?
Search SearchServer
// Quota service
OverridesService *OverridesService
@@ -249,16 +255,12 @@ type ResourceServerOptions struct {
storageMetrics *StorageMetrics
IndexMetrics *BleveIndexMetrics
// MaxPageSizeBytes is the maximum size of a page in bytes.
MaxPageSizeBytes int
// QOSQueue is the quality of service queue used to enqueue
QOSQueue QOSEnqueuer
QOSConfig QueueConfig
OwnsIndexFn func(key NamespacedResource) (bool, error)
}
func NewResourceServer(opts ResourceServerOptions) (*server, error) {
@@ -341,23 +343,25 @@ func NewResourceServer(opts ResourceServerOptions) (*server, error) {
ctx: ctx,
cancel: cancel,
storageMetrics: opts.storageMetrics,
indexMetrics: opts.IndexMetrics,
maxPageSizeBytes: opts.MaxPageSizeBytes,
reg: opts.Reg,
queue: opts.QOSQueue,
queueConfig: opts.QOSConfig,
overridesService: opts.OverridesService,
search: opts.Search,
artificialSuccessfulWriteDelay: opts.Search.IndexMinUpdateInterval,
artificialSuccessfulWriteDelay: opts.SearchOptions.IndexMinUpdateInterval,
}
if opts.Search.Resources != nil {
var err error
s.search, err = newSearchSupport(opts.Search, s.backend, s.access, s.blob, opts.IndexMetrics, opts.OwnsIndexFn)
if err != nil {
return nil, err
/*
if opts.Search.Resources != nil {
var err error
s.search, err = newSearchSupport(opts.Search, s.backend, s.access, s.blob, opts.IndexMetrics, opts.OwnsIndexFn)
if err != nil {
return nil, err
}
}
}
*/
err := s.Init(ctx)
if err != nil {
@@ -375,7 +379,7 @@ type server struct {
backend StorageBackend
blob BlobSupport
secure secrets.InlineSecureValueSupport
search *searchSupport
search SearchServer
diagnostics resourcepb.DiagnosticsServer
access claims.AccessClient
writeHooks WriteAccessHooks
@@ -422,11 +426,6 @@ func (s *server) Init(ctx context.Context) error {
s.initErr = s.overridesService.init(ctx)
}
// initialize the search index
if s.initErr == nil && s.search != nil {
s.initErr = s.search.init(ctx)
}
// Start watching for changes
if s.initErr == nil {
s.initErr = s.initWatcher()
@@ -451,10 +450,6 @@ func (s *server) Stop(ctx context.Context) error {
}
}
if s.search != nil {
s.search.stop()
}
if s.overridesService != nil {
if err := s.overridesService.stop(ctx); err != nil {
stopFailed = true
@@ -1432,47 +1427,6 @@ func (s *server) Watch(req *resourcepb.WatchRequest, srv resourcepb.ResourceStor
}
}
func (s *server) Search(ctx context.Context, req *resourcepb.ResourceSearchRequest) (*resourcepb.ResourceSearchResponse, error) {
if s.search == nil {
return nil, fmt.Errorf("search index not configured")
}
return s.search.Search(ctx, req)
}
// GetStats implements ResourceServer.
func (s *server) GetStats(ctx context.Context, req *resourcepb.ResourceStatsRequest) (*resourcepb.ResourceStatsResponse, error) {
if err := s.Init(ctx); err != nil {
return nil, err
}
if s.search == nil {
// If the backend implements "GetStats", we can use it
srv, ok := s.backend.(resourcepb.ResourceIndexServer)
if ok {
return srv.GetStats(ctx, req)
}
return nil, fmt.Errorf("search index not configured")
}
return s.search.GetStats(ctx, req)
}
func (s *server) ListManagedObjects(ctx context.Context, req *resourcepb.ListManagedObjectsRequest) (*resourcepb.ListManagedObjectsResponse, error) {
if s.search == nil {
return nil, fmt.Errorf("search index not configured")
}
return s.search.ListManagedObjects(ctx, req)
}
func (s *server) CountManagedObjects(ctx context.Context, req *resourcepb.CountManagedObjectsRequest) (*resourcepb.CountManagedObjectsResponse, error) {
if s.search == nil {
return nil, fmt.Errorf("search index not configured")
}
return s.search.CountManagedObjects(ctx, req)
}
// IsHealthy implements ResourceServer.
func (s *server) IsHealthy(ctx context.Context, req *resourcepb.HealthCheckRequest) (*resourcepb.HealthCheckResponse, error) {
return s.diagnostics.IsHealthy(ctx, req)

View File

@@ -46,7 +46,7 @@ type ServerOptions struct {
OwnsIndexFn func(key resource.NamespacedResource) (bool, error)
}
func NewResourceServer(opts ServerOptions) (resource.ResourceServer, error) {
func NewResourceServer(opts ServerOptions) (resource.ResourceServer, resource.SearchServer, error) {
apiserverCfg := opts.Cfg.SectionWithEnvOverrides("grafana-apiserver")
if opts.SecureValues == nil && opts.Cfg != nil && opts.Cfg.SecretsManagement.GrpcClientEnable {
@@ -57,7 +57,7 @@ func NewResourceServer(opts ServerOptions) (resource.ResourceServer, error) {
nil, // not needed for gRPC client mode
)
if err != nil {
return nil, fmt.Errorf("failed to create inline secure value service: %w", err)
return nil, nil, fmt.Errorf("failed to create inline secure value service: %w", err)
}
opts.SecureValues = inlineSecureValueService
}
@@ -77,7 +77,7 @@ func NewResourceServer(opts ServerOptions) (resource.ResourceServer, error) {
dir := strings.Replace(serverOptions.Blob.URL, "./data", opts.Cfg.DataPath, 1)
err := os.MkdirAll(dir, 0700)
if err != nil {
return nil, err
return nil, nil, err
}
serverOptions.Blob.URL = "file:///" + dir
}
@@ -94,7 +94,7 @@ func NewResourceServer(opts ServerOptions) (resource.ResourceServer, error) {
} else {
eDB, err := dbimpl.ProvideResourceDB(opts.DB, opts.Cfg, opts.Tracer)
if err != nil {
return nil, err
return nil, nil, err
}
isHA := isHighAvailabilityEnabled(opts.Cfg.SectionWithEnvOverrides("database"),
@@ -108,20 +108,31 @@ func NewResourceServer(opts ServerOptions) (resource.ResourceServer, error) {
LastImportTimeMaxAge: opts.SearchOptions.MaxIndexAge, // No need to keep last_import_times older than max index age.
})
if err != nil {
return nil, err
return nil, nil, err
}
serverOptions.Backend = backend
serverOptions.Diagnostics = backend
serverOptions.Lifecycle = backend
}
serverOptions.Search = opts.SearchOptions
serverOptions.IndexMetrics = opts.IndexMetrics
search, err := resource.NewSearchServer(opts.SearchOptions, opts.Backend, opts.AccessClient, nil, opts.IndexMetrics, opts.OwnsIndexFn)
if err != nil {
return nil, nil, fmt.Errorf("failed to initialize search: %w", err)
}
if err := search.Init(context.Background()); err != nil {
return nil, nil, fmt.Errorf("failed to initialize search: %w", err)
}
serverOptions.Search = search
serverOptions.QOSQueue = opts.QOSQueue
serverOptions.OwnsIndexFn = opts.OwnsIndexFn
serverOptions.OverridesService = opts.OverridesService
return resource.NewResourceServer(serverOptions)
rs, err := resource.NewResourceServer(serverOptions)
if err != nil {
_ = search.Stop(context.Background())
}
return rs, nil, err
}
// isHighAvailabilityEnabled determines if high availability mode should

View File

@@ -291,7 +291,7 @@ func (s *service) starting(ctx context.Context) error {
serverOptions.OverridesService = overridesSvc
}
server, err := NewResourceServer(serverOptions)
server, searchServer, err := NewResourceServer(serverOptions)
if err != nil {
return err
}
@@ -308,8 +308,8 @@ func (s *service) starting(ctx context.Context) error {
srv := s.handler.GetServer()
resourcepb.RegisterResourceStoreServer(srv, server)
resourcepb.RegisterBulkStoreServer(srv, server)
resourcepb.RegisterResourceIndexServer(srv, server)
resourcepb.RegisterManagedObjectIndexServer(srv, server)
resourcepb.RegisterResourceIndexServer(srv, searchServer)
resourcepb.RegisterManagedObjectIndexServer(srv, searchServer)
resourcepb.RegisterBlobStoreServer(srv, server)
resourcepb.RegisterDiagnosticsServer(srv, server)
resourcepb.RegisterQuotasServer(srv, server)