Compare commits

...

1 Commits

Author SHA1 Message Date
Owen Smallwood
b2d14cc42b Gives resource server a search client so storage-api can call search 2026-01-14 19:51:49 -06:00
5 changed files with 78 additions and 9 deletions

View File

@@ -13,6 +13,9 @@ import (
"github.com/grafana/dskit/kv"
"github.com/grafana/dskit/ring"
ringclient "github.com/grafana/dskit/ring/client"
"github.com/grafana/grafana/pkg/services/apiserver/options"
"github.com/grafana/grafana/pkg/storage/unified"
"github.com/grafana/grafana/pkg/storage/unified/resourcepb"
"github.com/prometheus/client_golang/prometheus"
"github.com/urfave/cli/v2"
@@ -76,6 +79,22 @@ func newModuleServer(opts Options,
) (*ModuleServer, error) {
rootCtx, shutdownFn := context.WithCancel(context.Background())
// TODO should inject this with Wire
apiserverCfg := cfg.SectionWithEnvOverrides("grafana-apiserver")
searchServerAddress := apiserverCfg.Key("search_server_address").MustString("")
var searchClient resourcepb.ResourceIndexClient
var err error
if searchServerAddress != "" {
storageOptions := options.StorageOptions{
SearchServerAddress: searchServerAddress,
}
searchClient, err = unified.NewSearchClient(storageOptions, features)
if err != nil {
shutdownFn()
return nil, fmt.Errorf("failed to create search client: %w", err)
}
}
s := &ModuleServer{
opts: opts,
apiOpts: apiOpts,
@@ -96,6 +115,7 @@ func newModuleServer(opts Options,
license: license,
moduleRegisterer: moduleRegisterer,
storageBackend: storageBackend,
searchClient: searchClient,
hooksService: hooksService,
}
@@ -119,6 +139,7 @@ type ModuleServer struct {
isInitialized bool
mtx sync.Mutex
storageBackend resource.StorageBackend
searchClient resourcepb.ResourceIndexClient
storageMetrics *resource.StorageMetrics
indexMetrics *resource.BleveIndexMetrics
license licensing.Licensing
@@ -202,7 +223,7 @@ func (s *ModuleServer) Run() error {
if err != nil {
return nil, err
}
return sql.ProvideUnifiedStorageGrpcService(s.cfg, s.features, nil, s.log, s.registerer, docBuilders, s.storageMetrics, s.indexMetrics, s.searchServerRing, s.MemberlistKVConfig, s.httpServerRouter, s.storageBackend)
return sql.ProvideUnifiedStorageGrpcService(s.cfg, s.features, nil, s.log, s.registerer, docBuilders, s.storageMetrics, s.indexMetrics, s.searchServerRing, s.MemberlistKVConfig, s.httpServerRouter, s.storageBackend, s.searchClient)
})
m.RegisterModule(modules.ZanzanaServer, func() (services.Service, error) {

View File

@@ -7,6 +7,8 @@ import (
"time"
badger "github.com/dgraph-io/badger/v4"
"github.com/fullstorydev/grpchan"
"github.com/grafana/grafana/pkg/storage/unified/resourcepb"
otgrpc "github.com/opentracing-contrib/go-grpc"
"github.com/opentracing/opentracing-go"
"github.com/prometheus/client_golang/prometheus"
@@ -31,6 +33,7 @@ import (
"github.com/grafana/grafana/pkg/storage/legacysql"
"github.com/grafana/grafana/pkg/storage/unified/federated"
"github.com/grafana/grafana/pkg/storage/unified/resource"
grpcUtils "github.com/grafana/grafana/pkg/storage/unified/resource/grpc"
"github.com/grafana/grafana/pkg/storage/unified/search"
"github.com/grafana/grafana/pkg/storage/unified/sql"
"github.com/grafana/grafana/pkg/util/scheduler"
@@ -91,6 +94,27 @@ func ProvideUnifiedStorageClient(opts *Options,
return client, err
}
// TODO use wire to provide to module server
func NewSearchClient(opts options.StorageOptions, features featuremgmt.FeatureToggles) (resourcepb.ResourceIndexClient, error) {
if opts.SearchServerAddress == "" {
return nil, fmt.Errorf("expecting address for search server")
}
var (
conn grpc.ClientConnInterface
err error
metrics = newClientMetrics(prometheus.NewRegistry())
)
conn, err = newGrpcConn(opts.SearchServerAddress, metrics, features, opts.GrpcClientKeepaliveTime)
if err != nil {
return nil, err
}
cc := grpchan.InterceptClientConn(conn, grpcUtils.UnaryClientInterceptor, grpcUtils.StreamClientInterceptor)
return resourcepb.NewResourceIndexClient(cc), nil
}
func newClient(opts options.StorageOptions,
cfg *setting.Cfg,
features featuremgmt.FeatureToggles,

View File

@@ -222,6 +222,9 @@ type ResourceServerOptions struct {
// Search options
Search SearchOptions
// to be used by storage
SearchClient resourcepb.ResourceIndexClient
// Quota service
OverridesService *OverridesService
@@ -347,6 +350,7 @@ func NewResourceServer(opts ResourceServerOptions) (*server, error) {
queue: opts.QOSQueue,
queueConfig: opts.QOSConfig,
overridesService: opts.OverridesService,
searchClient: opts.SearchClient,
artificialSuccessfulWriteDelay: opts.Search.IndexMinUpdateInterval,
}
@@ -386,6 +390,9 @@ type server struct {
indexMetrics *BleveIndexMetrics
overridesService *OverridesService
// only to be used with storage server for field selector search
searchClient resourcepb.ResourceIndexClient
// Background watch task -- this has permissions for everything
ctx context.Context
cancel context.CancelFunc
@@ -1059,7 +1066,8 @@ func (s *server) List(ctx context.Context, req *resourcepb.ListRequest) (*resour
// TODO: What to do about RV and version_match fields?
// If we get here, we're doing list with selectable fields. Let's do search instead, since
// we index all selectable fields, and fetch resulting documents one by one.
if s.search != nil && req.Source == resourcepb.ListRequest_STORE && (len(req.Options.Fields) > 0) {
if (s.search != nil || s.searchClient != nil) && req.Source == resourcepb.ListRequest_STORE && (len(req.Options.Fields) > 0) {
if req.Options.Key.Namespace == "" {
return &resourcepb.ListResponse{
Error: NewBadRequestError("namespace must be specified for list with filter"),
@@ -1075,7 +1083,13 @@ func (s *server) List(ctx context.Context, req *resourcepb.ListRequest) (*resour
// Permission: 0, // Not needed, default is List
}
searchResp, err := s.search.Search(ctx, srq)
var searchResp *resourcepb.ResourceSearchResponse
var err error
if s.searchClient != nil {
searchResp, err = s.searchClient.Search(ctx, srq)
} else {
searchResp, err = s.search.Search(ctx, srq)
}
if err != nil {
return nil, err
}

View File

@@ -6,6 +6,7 @@ import (
"os"
"strings"
"github.com/grafana/grafana/pkg/storage/unified/resourcepb"
"github.com/prometheus/client_golang/prometheus"
"go.opentelemetry.io/otel/trace"
@@ -37,6 +38,7 @@ type ServerOptions struct {
Tracer trace.Tracer
Reg prometheus.Registerer
AccessClient types.AccessClient
SearchClient resourcepb.ResourceIndexClient
SearchOptions resource.SearchOptions
StorageMetrics *resource.StorageMetrics
IndexMetrics *resource.BleveIndexMetrics
@@ -115,6 +117,10 @@ func NewResourceServer(opts ServerOptions) (resource.ResourceServer, error) {
serverOptions.Lifecycle = backend
}
// use the search client when search isnt initialized. Dont need both.
if opts.SearchOptions.Resources == nil {
serverOptions.SearchClient = opts.SearchClient
}
serverOptions.Search = opts.SearchOptions
serverOptions.IndexMetrics = opts.IndexMetrics
serverOptions.QOSQueue = opts.QOSQueue

View File

@@ -59,12 +59,13 @@ type service struct {
subservicesWatcher *services.FailureWatcher
hasSubservices bool
backend resource.StorageBackend
cfg *setting.Cfg
features featuremgmt.FeatureToggles
db infraDB.DB
stopCh chan struct{}
stoppedCh chan error
backend resource.StorageBackend
searchClient resourcepb.ResourceIndexClient
cfg *setting.Cfg
features featuremgmt.FeatureToggles
db infraDB.DB
stopCh chan struct{}
stoppedCh chan error
handler grpcserver.Provider
@@ -99,6 +100,7 @@ func ProvideUnifiedStorageGrpcService(
memberlistKVConfig kv.Config,
httpServerRouter *mux.Router,
backend resource.StorageBackend,
searchClient resourcepb.ResourceIndexClient,
) (UnifiedStorageGrpcService, error) {
var err error
tracer := otel.Tracer("unified-storage")
@@ -112,6 +114,7 @@ func ProvideUnifiedStorageGrpcService(
s := &service{
backend: backend,
searchClient: searchClient,
cfg: cfg,
features: features,
stopCh: make(chan struct{}),
@@ -272,6 +275,7 @@ func (s *service) starting(ctx context.Context) error {
Tracer: s.tracing,
Reg: s.reg,
AccessClient: authzClient,
SearchClient: s.searchClient,
SearchOptions: searchOptions,
StorageMetrics: s.storageMetrics,
IndexMetrics: s.indexMetrics,