Compare commits

..

1 Commits

Author SHA1 Message Date
Peter Štibraný
c7293f6fb5 Initial prototype extracting SearchServer from ResourceServer. 2025-12-16 09:41:56 +01:00
14 changed files with 104 additions and 190 deletions

View File

@@ -38,10 +38,10 @@ func (d *directResourceClient) GetBlob(ctx context.Context, in *resourcepb.GetBl
return d.server.GetBlob(ctx, in)
}
// GetStats implements ResourceClient.
func (d *directResourceClient) GetStats(ctx context.Context, in *resourcepb.ResourceStatsRequest, opts ...grpc.CallOption) (*resourcepb.ResourceStatsResponse, error) {
return d.server.GetStats(ctx, in)
}
//// GetStats implements ResourceClient.
//func (d *directResourceClient) GetStats(ctx context.Context, in *resourcepb.ResourceStatsRequest, opts ...grpc.CallOption) (*resourcepb.ResourceStatsResponse, error) {
// return d.server.GetStats(ctx, in)
//}
// IsHealthy implements ResourceClient.
func (d *directResourceClient) IsHealthy(ctx context.Context, in *resourcepb.HealthCheckRequest, opts ...grpc.CallOption) (*resourcepb.HealthCheckResponse, error) {
@@ -53,13 +53,13 @@ func (d *directResourceClient) List(ctx context.Context, in *resourcepb.ListRequ
return d.server.List(ctx, in)
}
func (d *directResourceClient) ListManagedObjects(ctx context.Context, in *resourcepb.ListManagedObjectsRequest, opts ...grpc.CallOption) (*resourcepb.ListManagedObjectsResponse, error) {
return d.server.ListManagedObjects(ctx, in)
}
func (d *directResourceClient) CountManagedObjects(ctx context.Context, in *resourcepb.CountManagedObjectsRequest, opts ...grpc.CallOption) (*resourcepb.CountManagedObjectsResponse, error) {
return d.server.CountManagedObjects(ctx, in)
}
//func (d *directResourceClient) ListManagedObjects(ctx context.Context, in *resourcepb.ListManagedObjectsRequest, opts ...grpc.CallOption) (*resourcepb.ListManagedObjectsResponse, error) {
// return d.server.ListManagedObjects(ctx, in)
//}
//
//func (d *directResourceClient) CountManagedObjects(ctx context.Context, in *resourcepb.CountManagedObjectsRequest, opts ...grpc.CallOption) (*resourcepb.CountManagedObjectsResponse, error) {
// return d.server.CountManagedObjects(ctx, in)
//}
// PutBlob implements ResourceClient.
func (d *directResourceClient) PutBlob(ctx context.Context, in *resourcepb.PutBlobRequest, opts ...grpc.CallOption) (*resourcepb.PutBlobResponse, error) {
@@ -72,9 +72,9 @@ func (d *directResourceClient) Read(ctx context.Context, in *resourcepb.ReadRequ
}
// Search implements ResourceClient.
func (d *directResourceClient) Search(ctx context.Context, in *resourcepb.ResourceSearchRequest, opts ...grpc.CallOption) (*resourcepb.ResourceSearchResponse, error) {
return d.server.Search(ctx, in)
}
//func (d *directResourceClient) Search(ctx context.Context, in *resourcepb.ResourceSearchRequest, opts ...grpc.CallOption) (*resourcepb.ResourceSearchResponse, error) {
// return d.server.Search(ctx, in)
//}
// Update implements ResourceClient.
func (d *directResourceClient) Update(ctx context.Context, in *resourcepb.UpdateRequest, opts ...grpc.CallOption) (*resourcepb.UpdateResponse, error) {

View File

@@ -11,6 +11,7 @@ import (
"k8s.io/apiserver/pkg/registry/rest"
"github.com/grafana/authlib/types"
"github.com/grafana/grafana/pkg/apimachinery/utils"
grafanaregistry "github.com/grafana/grafana/pkg/apiserver/registry/generic"
grafanarest "github.com/grafana/grafana/pkg/apiserver/rest"

View File

@@ -41,7 +41,6 @@ import (
"github.com/grafana/grafana/pkg/services/folder"
"github.com/grafana/grafana/pkg/setting"
"github.com/grafana/grafana/pkg/storage/unified/apistore"
"github.com/grafana/grafana/pkg/storage/unified/resource"
"github.com/grafana/grafana/pkg/storage/unified/resourcepb"
)
@@ -78,7 +77,7 @@ func RegisterAPIService(cfg *setting.Cfg,
acService accesscontrol.Service,
accessClient authlib.AccessClient,
registerer prometheus.Registerer,
unified resource.ResourceClient,
unified resourcepb.ResourceIndexClient,
zanzanaClient zanzana.Client,
) *FolderAPIBuilder {
builder := &FolderAPIBuilder{
@@ -97,7 +96,7 @@ func RegisterAPIService(cfg *setting.Cfg,
return builder
}
func NewAPIService(ac authlib.AccessClient, searcher resource.ResourceClient, features featuremgmt.FeatureToggles, zanzanaClient zanzana.Client, resourcePermissionsSvc *dynamic.NamespaceableResourceInterface) *FolderAPIBuilder {
func NewAPIService(ac authlib.AccessClient, searcher resourcepb.ResourceIndexClient, features featuremgmt.FeatureToggles, zanzanaClient zanzana.Client, resourcePermissionsSvc *dynamic.NamespaceableResourceInterface) *FolderAPIBuilder {
return &FolderAPIBuilder{
features: features,
accessClient: ac,

View File

@@ -615,7 +615,7 @@ func NewLocalStore(resourceInfo utils.ResourceInfo, scheme *runtime.Scheme, defa
return nil, err
}
client := resource.NewLocalResourceClient(server)
client := resource.NewLocalResourceClient(server, nil)
optsGetter := apistore.NewRESTOptionsGetterForClient(client, nil, defaultOpts.StorageConfig.Config, nil)
store, err := grafanaregistry.NewRegistryStore(scheme, resourceInfo, optsGetter)

View File

@@ -13,9 +13,6 @@ import (
"github.com/grafana/dskit/kv"
"github.com/grafana/dskit/ring"
ringclient "github.com/grafana/dskit/ring/client"
"github.com/grafana/grafana/pkg/services/apiserver/options"
"github.com/grafana/grafana/pkg/storage/unified"
"github.com/grafana/grafana/pkg/storage/unified/resourcepb"
"github.com/prometheus/client_golang/prometheus"
"github.com/urfave/cli/v2"
@@ -79,22 +76,6 @@ func newModuleServer(opts Options,
) (*ModuleServer, error) {
rootCtx, shutdownFn := context.WithCancel(context.Background())
// TODO should inject this with Wire
apiserverCfg := cfg.SectionWithEnvOverrides("grafana-apiserver")
searchServerAddress := apiserverCfg.Key("search_server_address").MustString("")
var searchClient resourcepb.ResourceIndexClient
var err error
if searchServerAddress != "" {
storageOptions := options.StorageOptions{
SearchServerAddress: searchServerAddress,
}
searchClient, err = unified.NewSearchClient(storageOptions, features)
if err != nil {
shutdownFn()
return nil, fmt.Errorf("failed to create search client: %w", err)
}
}
s := &ModuleServer{
opts: opts,
apiOpts: apiOpts,
@@ -115,7 +96,6 @@ func newModuleServer(opts Options,
license: license,
moduleRegisterer: moduleRegisterer,
storageBackend: storageBackend,
searchClient: searchClient,
hooksService: hooksService,
}
@@ -139,7 +119,6 @@ type ModuleServer struct {
isInitialized bool
mtx sync.Mutex
storageBackend resource.StorageBackend
searchClient resourcepb.ResourceIndexClient
storageMetrics *resource.StorageMetrics
indexMetrics *resource.BleveIndexMetrics
license licensing.Licensing
@@ -223,7 +202,7 @@ func (s *ModuleServer) Run() error {
if err != nil {
return nil, err
}
return sql.ProvideUnifiedStorageGrpcService(s.cfg, s.features, nil, s.log, s.registerer, docBuilders, s.storageMetrics, s.indexMetrics, s.searchServerRing, s.MemberlistKVConfig, s.httpServerRouter, s.storageBackend, s.searchClient)
return sql.ProvideUnifiedStorageGrpcService(s.cfg, s.features, nil, s.log, s.registerer, docBuilders, s.storageMetrics, s.indexMetrics, s.searchServerRing, s.MemberlistKVConfig, s.httpServerRouter, s.storageBackend)
})
m.RegisterModule(modules.ZanzanaServer, func() (services.Service, error) {

View File

@@ -79,7 +79,7 @@ func NewRESTOptionsGetterMemory(originalStorageConfig storagebackend.Config, sec
}
return NewRESTOptionsGetterForClient(
resource.NewLocalResourceClient(server),
resource.NewLocalResourceClient(server, nil),
secrets,
originalStorageConfig,
nil,
@@ -118,7 +118,7 @@ func NewRESTOptionsGetterForFileXX(path string,
}
return NewRESTOptionsGetterForClient(
resource.NewLocalResourceClient(server),
resource.NewLocalResourceClient(server, nil),
nil, // secrets
originalStorageConfig,
nil,

View File

@@ -156,7 +156,7 @@ func testSetup(t testing.TB, opts ...setupOption) (context.Context, storage.Inte
default:
t.Fatalf("unsupported storage type: %s", setupOpts.storageType)
}
client := resource.NewLocalResourceClient(server)
client := resource.NewLocalResourceClient(server, nil)
config := storagebackend.NewDefaultConfig(setupOpts.prefix, setupOpts.codec)
store, destroyFunc, err := apistore.NewStorage(

View File

@@ -7,8 +7,6 @@ import (
"time"
badger "github.com/dgraph-io/badger/v4"
"github.com/fullstorydev/grpchan"
"github.com/grafana/grafana/pkg/storage/unified/resourcepb"
otgrpc "github.com/opentracing-contrib/go-grpc"
"github.com/opentracing/opentracing-go"
"github.com/prometheus/client_golang/prometheus"
@@ -23,6 +21,7 @@ import (
"github.com/grafana/dskit/grpcclient"
"github.com/grafana/dskit/middleware"
"github.com/grafana/dskit/services"
grafanarest "github.com/grafana/grafana/pkg/apiserver/rest"
infraDB "github.com/grafana/grafana/pkg/infra/db"
"github.com/grafana/grafana/pkg/infra/tracing"
@@ -33,7 +32,6 @@ import (
"github.com/grafana/grafana/pkg/storage/legacysql"
"github.com/grafana/grafana/pkg/storage/unified/federated"
"github.com/grafana/grafana/pkg/storage/unified/resource"
grpcUtils "github.com/grafana/grafana/pkg/storage/unified/resource/grpc"
"github.com/grafana/grafana/pkg/storage/unified/search"
"github.com/grafana/grafana/pkg/storage/unified/sql"
"github.com/grafana/grafana/pkg/util/scheduler"
@@ -94,27 +92,6 @@ func ProvideUnifiedStorageClient(opts *Options,
return client, err
}
// TODO use wire to provide to module server
func NewSearchClient(opts options.StorageOptions, features featuremgmt.FeatureToggles) (resourcepb.ResourceIndexClient, error) {
if opts.SearchServerAddress == "" {
return nil, fmt.Errorf("expecting address for search server")
}
var (
conn grpc.ClientConnInterface
err error
metrics = newClientMetrics(prometheus.NewRegistry())
)
conn, err = newGrpcConn(opts.SearchServerAddress, metrics, features, opts.GrpcClientKeepaliveTime)
if err != nil {
return nil, err
}
cc := grpchan.InterceptClientConn(conn, grpcUtils.UnaryClientInterceptor, grpcUtils.StreamClientInterceptor)
return resourcepb.NewResourceIndexClient(cc), nil
}
func newClient(opts options.StorageOptions,
cfg *setting.Cfg,
features featuremgmt.FeatureToggles,
@@ -159,7 +136,7 @@ func newClient(opts options.StorageOptions,
if err != nil {
return nil, err
}
return resource.NewLocalResourceClient(server), nil
return resource.NewLocalResourceClient(server, nil), nil
case options.StorageTypeUnifiedGrpc:
if opts.Address == "" {
@@ -248,11 +225,11 @@ func newClient(opts options.StorageOptions,
serverOptions.OverridesService = overridesSvc
}
server, err := sql.NewResourceServer(serverOptions)
server, searchServer, err := sql.NewResourceServer(serverOptions)
if err != nil {
return nil, err
}
return resource.NewLocalResourceClient(server), nil
return resource.NewLocalResourceClient(server, searchServer), nil
}
}

View File

@@ -31,11 +31,14 @@ import (
"github.com/grafana/grafana/pkg/storage/unified/resourcepb"
)
type SearchClient interface {
resourcepb.ResourceIndexClient
resourcepb.ManagedObjectIndexClient
}
//go:generate mockery --name ResourceClient --structname MockResourceClient --inpackage --filename client_mock.go --with-expecter
type ResourceClient interface {
resourcepb.ResourceStoreClient
resourcepb.ResourceIndexClient
resourcepb.ManagedObjectIndexClient
resourcepb.BulkStoreClient
resourcepb.BlobStoreClient
resourcepb.DiagnosticsClient
@@ -100,8 +103,6 @@ func NewLocalResourceClient(server ResourceServer) ResourceClient {
grpcAuthInt := grpcutils.NewUnsafeAuthenticator(tracer)
for _, desc := range []*grpc.ServiceDesc{
&resourcepb.ResourceStore_ServiceDesc,
&resourcepb.ResourceIndex_ServiceDesc,
&resourcepb.ManagedObjectIndex_ServiceDesc,
&resourcepb.BlobStore_ServiceDesc,
&resourcepb.BulkStore_ServiceDesc,
&resourcepb.Diagnostics_ServiceDesc,

View File

@@ -127,7 +127,10 @@ type SearchBackend interface {
GetOpenIndexes() []NamespacedResource
}
// This supports indexing+search regardless of implementation
var _ SearchServer = &searchSupport{}
// This supports indexing+search regardless of implementation.
// Implements SearchServer interface.
type searchSupport struct {
log log.Logger
storage StorageBackend
@@ -160,6 +163,10 @@ var (
_ resourcepb.ManagedObjectIndexServer = (*searchSupport)(nil)
)
func NewSearchServer(opts SearchOptions, storage StorageBackend, access types.AccessClient, blob BlobSupport, indexMetrics *BleveIndexMetrics, ownsIndexFn func(key NamespacedResource) (bool, error)) (SearchServer, error) {
return newSearchSupport(opts, storage, access, blob, indexMetrics, ownsIndexFn)
}
func newSearchSupport(opts SearchOptions, storage StorageBackend, access types.AccessClient, blob BlobSupport, indexMetrics *BleveIndexMetrics, ownsIndexFn func(key NamespacedResource) (bool, error)) (support *searchSupport, err error) {
// No backend search support
if opts.Backend == nil {
@@ -598,6 +605,15 @@ func (s *searchSupport) buildIndexes(ctx context.Context) (int, error) {
return totalBatchesIndexed, nil
}
func (s *searchSupport) Init(ctx context.Context) error {
return s.init(ctx)
}
func (s *searchSupport) Stop(_ context.Context) error {
s.stop()
return nil
}
func (s *searchSupport) init(ctx context.Context) error {
origCtx := ctx

View File

@@ -60,7 +60,7 @@ func ProvideSearchDistributorServer(cfg *setting.Cfg, features featuremgmt.Featu
}
type RingClient struct {
Client ResourceClient
Client SearchClient
grpc_health_v1.HealthClient
Conn *grpc.ClientConn
}
@@ -99,7 +99,7 @@ var (
func (ds *distributorServer) Search(ctx context.Context, r *resourcepb.ResourceSearchRequest) (*resourcepb.ResourceSearchResponse, error) {
ctx, span := ds.tracing.Start(ctx, "distributor.Search")
defer span.End()
ctx, client, err := ds.getClientToDistributeRequest(ctx, r.Options.Key.Namespace, "Search")
ctx, client, err := ds.getClientToDistributeRequest(ctx, r.Options.Key.Namespace)
if err != nil {
return nil, err
}
@@ -110,7 +110,7 @@ func (ds *distributorServer) Search(ctx context.Context, r *resourcepb.ResourceS
func (ds *distributorServer) GetStats(ctx context.Context, r *resourcepb.ResourceStatsRequest) (*resourcepb.ResourceStatsResponse, error) {
ctx, span := ds.tracing.Start(ctx, "distributor.GetStats")
defer span.End()
ctx, client, err := ds.getClientToDistributeRequest(ctx, r.Namespace, "GetStats")
ctx, client, err := ds.getClientToDistributeRequest(ctx, r.Namespace)
if err != nil {
return nil, err
}
@@ -215,7 +215,7 @@ func (ds *distributorServer) RebuildIndexes(ctx context.Context, r *resourcepb.R
func (ds *distributorServer) CountManagedObjects(ctx context.Context, r *resourcepb.CountManagedObjectsRequest) (*resourcepb.CountManagedObjectsResponse, error) {
ctx, span := ds.tracing.Start(ctx, "distributor.CountManagedObjects")
defer span.End()
ctx, client, err := ds.getClientToDistributeRequest(ctx, r.Namespace, "CountManagedObjects")
ctx, client, err := ds.getClientToDistributeRequest(ctx, r.Namespace)
if err != nil {
return nil, err
}
@@ -226,7 +226,7 @@ func (ds *distributorServer) CountManagedObjects(ctx context.Context, r *resourc
func (ds *distributorServer) ListManagedObjects(ctx context.Context, r *resourcepb.ListManagedObjectsRequest) (*resourcepb.ListManagedObjectsResponse, error) {
ctx, span := ds.tracing.Start(ctx, "distributor.ListManagedObjects")
defer span.End()
ctx, client, err := ds.getClientToDistributeRequest(ctx, r.Namespace, "ListManagedObjects")
ctx, client, err := ds.getClientToDistributeRequest(ctx, r.Namespace)
if err != nil {
return nil, err
}
@@ -234,7 +234,7 @@ func (ds *distributorServer) ListManagedObjects(ctx context.Context, r *resource
return client.ListManagedObjects(ctx, r)
}
func (ds *distributorServer) getClientToDistributeRequest(ctx context.Context, namespace string, methodName string) (context.Context, ResourceClient, error) {
func (ds *distributorServer) getClientToDistributeRequest(ctx context.Context, namespace string) (context.Context, SearchClient, error) {
ringHasher := fnv.New32a()
_, err := ringHasher.Write([]byte(namespace))
if err != nil {

View File

@@ -32,12 +32,17 @@ import (
var tracer = otel.Tracer("github.com/grafana/grafana/pkg/storage/unified/resource")
type SearchServer interface {
LifecycleHooks
resourcepb.ResourceIndexServer
resourcepb.ManagedObjectIndexServer
}
// ResourceServer implements all gRPC services
type ResourceServer interface {
resourcepb.ResourceStoreServer
resourcepb.BulkStoreServer
resourcepb.ResourceIndexServer
resourcepb.ManagedObjectIndexServer
resourcepb.BlobStoreServer
resourcepb.DiagnosticsServer
resourcepb.QuotasServer
@@ -220,10 +225,8 @@ type ResourceServerOptions struct {
Blob BlobConfig
// Search options
Search SearchOptions
// to be used by storage
SearchClient resourcepb.ResourceIndexClient
SearchOptions SearchOptions // TODO: needed?
Search SearchServer
// Quota service
OverridesService *OverridesService
@@ -252,16 +255,12 @@ type ResourceServerOptions struct {
storageMetrics *StorageMetrics
IndexMetrics *BleveIndexMetrics
// MaxPageSizeBytes is the maximum size of a page in bytes.
MaxPageSizeBytes int
// QOSQueue is the quality of service queue used to enqueue
QOSQueue QOSEnqueuer
QOSConfig QueueConfig
OwnsIndexFn func(key NamespacedResource) (bool, error)
}
func NewResourceServer(opts ResourceServerOptions) (*server, error) {
@@ -344,24 +343,25 @@ func NewResourceServer(opts ResourceServerOptions) (*server, error) {
ctx: ctx,
cancel: cancel,
storageMetrics: opts.storageMetrics,
indexMetrics: opts.IndexMetrics,
maxPageSizeBytes: opts.MaxPageSizeBytes,
reg: opts.Reg,
queue: opts.QOSQueue,
queueConfig: opts.QOSConfig,
overridesService: opts.OverridesService,
searchClient: opts.SearchClient,
search: opts.Search,
artificialSuccessfulWriteDelay: opts.Search.IndexMinUpdateInterval,
artificialSuccessfulWriteDelay: opts.SearchOptions.IndexMinUpdateInterval,
}
if opts.Search.Resources != nil {
var err error
s.search, err = newSearchSupport(opts.Search, s.backend, s.access, s.blob, opts.IndexMetrics, opts.OwnsIndexFn)
if err != nil {
return nil, err
/*
if opts.Search.Resources != nil {
var err error
s.search, err = newSearchSupport(opts.Search, s.backend, s.access, s.blob, opts.IndexMetrics, opts.OwnsIndexFn)
if err != nil {
return nil, err
}
}
}
*/
err := s.Init(ctx)
if err != nil {
@@ -379,7 +379,7 @@ type server struct {
backend StorageBackend
blob BlobSupport
secure secrets.InlineSecureValueSupport
search *searchSupport
search SearchServer
diagnostics resourcepb.DiagnosticsServer
access claims.AccessClient
writeHooks WriteAccessHooks
@@ -390,9 +390,6 @@ type server struct {
indexMetrics *BleveIndexMetrics
overridesService *OverridesService
// only to be used with storage server for field selector search
searchClient resourcepb.ResourceIndexClient
// Background watch task -- this has permissions for everything
ctx context.Context
cancel context.CancelFunc
@@ -429,11 +426,6 @@ func (s *server) Init(ctx context.Context) error {
s.initErr = s.overridesService.init(ctx)
}
// initialize the search index
if s.initErr == nil && s.search != nil {
s.initErr = s.search.init(ctx)
}
// Start watching for changes
if s.initErr == nil {
s.initErr = s.initWatcher()
@@ -458,10 +450,6 @@ func (s *server) Stop(ctx context.Context) error {
}
}
if s.search != nil {
s.search.stop()
}
if s.overridesService != nil {
if err := s.overridesService.stop(ctx); err != nil {
stopFailed = true
@@ -1066,8 +1054,7 @@ func (s *server) List(ctx context.Context, req *resourcepb.ListRequest) (*resour
// TODO: What to do about RV and version_match fields?
// If we get here, we're doing list with selectable fields. Let's do search instead, since
// we index all selectable fields, and fetch resulting documents one by one.
if (s.search != nil || s.searchClient != nil) && req.Source == resourcepb.ListRequest_STORE && (len(req.Options.Fields) > 0) {
if s.search != nil && req.Source == resourcepb.ListRequest_STORE && (len(req.Options.Fields) > 0) {
if req.Options.Key.Namespace == "" {
return &resourcepb.ListResponse{
Error: NewBadRequestError("namespace must be specified for list with filter"),
@@ -1083,13 +1070,7 @@ func (s *server) List(ctx context.Context, req *resourcepb.ListRequest) (*resour
// Permission: 0, // Not needed, default is List
}
var searchResp *resourcepb.ResourceSearchResponse
var err error
if s.searchClient != nil {
searchResp, err = s.searchClient.Search(ctx, srq)
} else {
searchResp, err = s.search.Search(ctx, srq)
}
searchResp, err := s.search.Search(ctx, srq)
if err != nil {
return nil, err
}
@@ -1446,47 +1427,6 @@ func (s *server) Watch(req *resourcepb.WatchRequest, srv resourcepb.ResourceStor
}
}
func (s *server) Search(ctx context.Context, req *resourcepb.ResourceSearchRequest) (*resourcepb.ResourceSearchResponse, error) {
if s.search == nil {
return nil, fmt.Errorf("search index not configured")
}
return s.search.Search(ctx, req)
}
// GetStats implements ResourceServer.
func (s *server) GetStats(ctx context.Context, req *resourcepb.ResourceStatsRequest) (*resourcepb.ResourceStatsResponse, error) {
if err := s.Init(ctx); err != nil {
return nil, err
}
if s.search == nil {
// If the backend implements "GetStats", we can use it
srv, ok := s.backend.(resourcepb.ResourceIndexServer)
if ok {
return srv.GetStats(ctx, req)
}
return nil, fmt.Errorf("search index not configured")
}
return s.search.GetStats(ctx, req)
}
func (s *server) ListManagedObjects(ctx context.Context, req *resourcepb.ListManagedObjectsRequest) (*resourcepb.ListManagedObjectsResponse, error) {
if s.search == nil {
return nil, fmt.Errorf("search index not configured")
}
return s.search.ListManagedObjects(ctx, req)
}
func (s *server) CountManagedObjects(ctx context.Context, req *resourcepb.CountManagedObjectsRequest) (*resourcepb.CountManagedObjectsResponse, error) {
if s.search == nil {
return nil, fmt.Errorf("search index not configured")
}
return s.search.CountManagedObjects(ctx, req)
}
// IsHealthy implements ResourceServer.
func (s *server) IsHealthy(ctx context.Context, req *resourcepb.HealthCheckRequest) (*resourcepb.HealthCheckResponse, error) {
return s.diagnostics.IsHealthy(ctx, req)

View File

@@ -6,7 +6,6 @@ import (
"os"
"strings"
"github.com/grafana/grafana/pkg/storage/unified/resourcepb"
"github.com/prometheus/client_golang/prometheus"
"go.opentelemetry.io/otel/trace"
@@ -38,7 +37,6 @@ type ServerOptions struct {
Tracer trace.Tracer
Reg prometheus.Registerer
AccessClient types.AccessClient
SearchClient resourcepb.ResourceIndexClient
SearchOptions resource.SearchOptions
StorageMetrics *resource.StorageMetrics
IndexMetrics *resource.BleveIndexMetrics
@@ -48,7 +46,7 @@ type ServerOptions struct {
OwnsIndexFn func(key resource.NamespacedResource) (bool, error)
}
func NewResourceServer(opts ServerOptions) (resource.ResourceServer, error) {
func NewResourceServer(opts ServerOptions) (resource.ResourceServer, resource.SearchServer, error) {
apiserverCfg := opts.Cfg.SectionWithEnvOverrides("grafana-apiserver")
if opts.SecureValues == nil && opts.Cfg != nil && opts.Cfg.SecretsManagement.GrpcClientEnable {
@@ -59,7 +57,7 @@ func NewResourceServer(opts ServerOptions) (resource.ResourceServer, error) {
nil, // not needed for gRPC client mode
)
if err != nil {
return nil, fmt.Errorf("failed to create inline secure value service: %w", err)
return nil, nil, fmt.Errorf("failed to create inline secure value service: %w", err)
}
opts.SecureValues = inlineSecureValueService
}
@@ -79,7 +77,7 @@ func NewResourceServer(opts ServerOptions) (resource.ResourceServer, error) {
dir := strings.Replace(serverOptions.Blob.URL, "./data", opts.Cfg.DataPath, 1)
err := os.MkdirAll(dir, 0700)
if err != nil {
return nil, err
return nil, nil, err
}
serverOptions.Blob.URL = "file:///" + dir
}
@@ -96,7 +94,7 @@ func NewResourceServer(opts ServerOptions) (resource.ResourceServer, error) {
} else {
eDB, err := dbimpl.ProvideResourceDB(opts.DB, opts.Cfg, opts.Tracer)
if err != nil {
return nil, err
return nil, nil, err
}
isHA := isHighAvailabilityEnabled(opts.Cfg.SectionWithEnvOverrides("database"),
@@ -110,24 +108,31 @@ func NewResourceServer(opts ServerOptions) (resource.ResourceServer, error) {
LastImportTimeMaxAge: opts.SearchOptions.MaxIndexAge, // No need to keep last_import_times older than max index age.
})
if err != nil {
return nil, err
return nil, nil, err
}
serverOptions.Backend = backend
serverOptions.Diagnostics = backend
serverOptions.Lifecycle = backend
}
// use the search client when search isnt initialized. Dont need both.
if opts.SearchOptions.Resources == nil {
serverOptions.SearchClient = opts.SearchClient
search, err := resource.NewSearchServer(opts.SearchOptions, opts.Backend, opts.AccessClient, nil, opts.IndexMetrics, opts.OwnsIndexFn)
if err != nil {
return nil, nil, fmt.Errorf("failed to initialize search: %w", err)
}
serverOptions.Search = opts.SearchOptions
serverOptions.IndexMetrics = opts.IndexMetrics
if err := search.Init(context.Background()); err != nil {
return nil, nil, fmt.Errorf("failed to initialize search: %w", err)
}
serverOptions.Search = search
serverOptions.QOSQueue = opts.QOSQueue
serverOptions.OwnsIndexFn = opts.OwnsIndexFn
serverOptions.OverridesService = opts.OverridesService
return resource.NewResourceServer(serverOptions)
rs, err := resource.NewResourceServer(serverOptions)
if err != nil {
_ = search.Stop(context.Background())
}
return rs, nil, err
}
// isHighAvailabilityEnabled determines if high availability mode should

View File

@@ -59,13 +59,12 @@ type service struct {
subservicesWatcher *services.FailureWatcher
hasSubservices bool
backend resource.StorageBackend
searchClient resourcepb.ResourceIndexClient
cfg *setting.Cfg
features featuremgmt.FeatureToggles
db infraDB.DB
stopCh chan struct{}
stoppedCh chan error
backend resource.StorageBackend
cfg *setting.Cfg
features featuremgmt.FeatureToggles
db infraDB.DB
stopCh chan struct{}
stoppedCh chan error
handler grpcserver.Provider
@@ -100,7 +99,6 @@ func ProvideUnifiedStorageGrpcService(
memberlistKVConfig kv.Config,
httpServerRouter *mux.Router,
backend resource.StorageBackend,
searchClient resourcepb.ResourceIndexClient,
) (UnifiedStorageGrpcService, error) {
var err error
tracer := otel.Tracer("unified-storage")
@@ -114,7 +112,6 @@ func ProvideUnifiedStorageGrpcService(
s := &service{
backend: backend,
searchClient: searchClient,
cfg: cfg,
features: features,
stopCh: make(chan struct{}),
@@ -275,7 +272,6 @@ func (s *service) starting(ctx context.Context) error {
Tracer: s.tracing,
Reg: s.reg,
AccessClient: authzClient,
SearchClient: s.searchClient,
SearchOptions: searchOptions,
StorageMetrics: s.storageMetrics,
IndexMetrics: s.indexMetrics,
@@ -295,7 +291,7 @@ func (s *service) starting(ctx context.Context) error {
serverOptions.OverridesService = overridesSvc
}
server, err := NewResourceServer(serverOptions)
server, searchServer, err := NewResourceServer(serverOptions)
if err != nil {
return err
}
@@ -312,8 +308,8 @@ func (s *service) starting(ctx context.Context) error {
srv := s.handler.GetServer()
resourcepb.RegisterResourceStoreServer(srv, server)
resourcepb.RegisterBulkStoreServer(srv, server)
resourcepb.RegisterResourceIndexServer(srv, server)
resourcepb.RegisterManagedObjectIndexServer(srv, server)
resourcepb.RegisterResourceIndexServer(srv, searchServer)
resourcepb.RegisterManagedObjectIndexServer(srv, searchServer)
resourcepb.RegisterBlobStoreServer(srv, server)
resourcepb.RegisterDiagnosticsServer(srv, server)
resourcepb.RegisterQuotasServer(srv, server)