Compare commits

...

1 Commits

Author SHA1 Message Date
Owen Smallwood
b2d14cc42b Gives resource server a search client so storage-api can call search 2026-01-14 19:51:49 -06:00
5 changed files with 78 additions and 9 deletions

View File

@@ -13,6 +13,9 @@ import (
"github.com/grafana/dskit/kv" "github.com/grafana/dskit/kv"
"github.com/grafana/dskit/ring" "github.com/grafana/dskit/ring"
ringclient "github.com/grafana/dskit/ring/client" ringclient "github.com/grafana/dskit/ring/client"
"github.com/grafana/grafana/pkg/services/apiserver/options"
"github.com/grafana/grafana/pkg/storage/unified"
"github.com/grafana/grafana/pkg/storage/unified/resourcepb"
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
"github.com/urfave/cli/v2" "github.com/urfave/cli/v2"
@@ -76,6 +79,22 @@ func newModuleServer(opts Options,
) (*ModuleServer, error) { ) (*ModuleServer, error) {
rootCtx, shutdownFn := context.WithCancel(context.Background()) rootCtx, shutdownFn := context.WithCancel(context.Background())
// TODO should inject this with Wire
apiserverCfg := cfg.SectionWithEnvOverrides("grafana-apiserver")
searchServerAddress := apiserverCfg.Key("search_server_address").MustString("")
var searchClient resourcepb.ResourceIndexClient
var err error
if searchServerAddress != "" {
storageOptions := options.StorageOptions{
SearchServerAddress: searchServerAddress,
}
searchClient, err = unified.NewSearchClient(storageOptions, features)
if err != nil {
shutdownFn()
return nil, fmt.Errorf("failed to create search client: %w", err)
}
}
s := &ModuleServer{ s := &ModuleServer{
opts: opts, opts: opts,
apiOpts: apiOpts, apiOpts: apiOpts,
@@ -96,6 +115,7 @@ func newModuleServer(opts Options,
license: license, license: license,
moduleRegisterer: moduleRegisterer, moduleRegisterer: moduleRegisterer,
storageBackend: storageBackend, storageBackend: storageBackend,
searchClient: searchClient,
hooksService: hooksService, hooksService: hooksService,
} }
@@ -119,6 +139,7 @@ type ModuleServer struct {
isInitialized bool isInitialized bool
mtx sync.Mutex mtx sync.Mutex
storageBackend resource.StorageBackend storageBackend resource.StorageBackend
searchClient resourcepb.ResourceIndexClient
storageMetrics *resource.StorageMetrics storageMetrics *resource.StorageMetrics
indexMetrics *resource.BleveIndexMetrics indexMetrics *resource.BleveIndexMetrics
license licensing.Licensing license licensing.Licensing
@@ -202,7 +223,7 @@ func (s *ModuleServer) Run() error {
if err != nil { if err != nil {
return nil, err return nil, err
} }
return sql.ProvideUnifiedStorageGrpcService(s.cfg, s.features, nil, s.log, s.registerer, docBuilders, s.storageMetrics, s.indexMetrics, s.searchServerRing, s.MemberlistKVConfig, s.httpServerRouter, s.storageBackend) return sql.ProvideUnifiedStorageGrpcService(s.cfg, s.features, nil, s.log, s.registerer, docBuilders, s.storageMetrics, s.indexMetrics, s.searchServerRing, s.MemberlistKVConfig, s.httpServerRouter, s.storageBackend, s.searchClient)
}) })
m.RegisterModule(modules.ZanzanaServer, func() (services.Service, error) { m.RegisterModule(modules.ZanzanaServer, func() (services.Service, error) {

View File

@@ -7,6 +7,8 @@ import (
"time" "time"
badger "github.com/dgraph-io/badger/v4" badger "github.com/dgraph-io/badger/v4"
"github.com/fullstorydev/grpchan"
"github.com/grafana/grafana/pkg/storage/unified/resourcepb"
otgrpc "github.com/opentracing-contrib/go-grpc" otgrpc "github.com/opentracing-contrib/go-grpc"
"github.com/opentracing/opentracing-go" "github.com/opentracing/opentracing-go"
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
@@ -31,6 +33,7 @@ import (
"github.com/grafana/grafana/pkg/storage/legacysql" "github.com/grafana/grafana/pkg/storage/legacysql"
"github.com/grafana/grafana/pkg/storage/unified/federated" "github.com/grafana/grafana/pkg/storage/unified/federated"
"github.com/grafana/grafana/pkg/storage/unified/resource" "github.com/grafana/grafana/pkg/storage/unified/resource"
grpcUtils "github.com/grafana/grafana/pkg/storage/unified/resource/grpc"
"github.com/grafana/grafana/pkg/storage/unified/search" "github.com/grafana/grafana/pkg/storage/unified/search"
"github.com/grafana/grafana/pkg/storage/unified/sql" "github.com/grafana/grafana/pkg/storage/unified/sql"
"github.com/grafana/grafana/pkg/util/scheduler" "github.com/grafana/grafana/pkg/util/scheduler"
@@ -91,6 +94,27 @@ func ProvideUnifiedStorageClient(opts *Options,
return client, err return client, err
} }
// TODO use wire to provide to module server
func NewSearchClient(opts options.StorageOptions, features featuremgmt.FeatureToggles) (resourcepb.ResourceIndexClient, error) {
if opts.SearchServerAddress == "" {
return nil, fmt.Errorf("expecting address for search server")
}
var (
conn grpc.ClientConnInterface
err error
metrics = newClientMetrics(prometheus.NewRegistry())
)
conn, err = newGrpcConn(opts.SearchServerAddress, metrics, features, opts.GrpcClientKeepaliveTime)
if err != nil {
return nil, err
}
cc := grpchan.InterceptClientConn(conn, grpcUtils.UnaryClientInterceptor, grpcUtils.StreamClientInterceptor)
return resourcepb.NewResourceIndexClient(cc), nil
}
func newClient(opts options.StorageOptions, func newClient(opts options.StorageOptions,
cfg *setting.Cfg, cfg *setting.Cfg,
features featuremgmt.FeatureToggles, features featuremgmt.FeatureToggles,

View File

@@ -222,6 +222,9 @@ type ResourceServerOptions struct {
// Search options // Search options
Search SearchOptions Search SearchOptions
// to be used by storage
SearchClient resourcepb.ResourceIndexClient
// Quota service // Quota service
OverridesService *OverridesService OverridesService *OverridesService
@@ -347,6 +350,7 @@ func NewResourceServer(opts ResourceServerOptions) (*server, error) {
queue: opts.QOSQueue, queue: opts.QOSQueue,
queueConfig: opts.QOSConfig, queueConfig: opts.QOSConfig,
overridesService: opts.OverridesService, overridesService: opts.OverridesService,
searchClient: opts.SearchClient,
artificialSuccessfulWriteDelay: opts.Search.IndexMinUpdateInterval, artificialSuccessfulWriteDelay: opts.Search.IndexMinUpdateInterval,
} }
@@ -386,6 +390,9 @@ type server struct {
indexMetrics *BleveIndexMetrics indexMetrics *BleveIndexMetrics
overridesService *OverridesService overridesService *OverridesService
// only to be used with storage server for field selector search
searchClient resourcepb.ResourceIndexClient
// Background watch task -- this has permissions for everything // Background watch task -- this has permissions for everything
ctx context.Context ctx context.Context
cancel context.CancelFunc cancel context.CancelFunc
@@ -1059,7 +1066,8 @@ func (s *server) List(ctx context.Context, req *resourcepb.ListRequest) (*resour
// TODO: What to do about RV and version_match fields? // TODO: What to do about RV and version_match fields?
// If we get here, we're doing list with selectable fields. Let's do search instead, since // If we get here, we're doing list with selectable fields. Let's do search instead, since
// we index all selectable fields, and fetch resulting documents one by one. // we index all selectable fields, and fetch resulting documents one by one.
if s.search != nil && req.Source == resourcepb.ListRequest_STORE && (len(req.Options.Fields) > 0) {
if (s.search != nil || s.searchClient != nil) && req.Source == resourcepb.ListRequest_STORE && (len(req.Options.Fields) > 0) {
if req.Options.Key.Namespace == "" { if req.Options.Key.Namespace == "" {
return &resourcepb.ListResponse{ return &resourcepb.ListResponse{
Error: NewBadRequestError("namespace must be specified for list with filter"), Error: NewBadRequestError("namespace must be specified for list with filter"),
@@ -1075,7 +1083,13 @@ func (s *server) List(ctx context.Context, req *resourcepb.ListRequest) (*resour
// Permission: 0, // Not needed, default is List // Permission: 0, // Not needed, default is List
} }
searchResp, err := s.search.Search(ctx, srq) var searchResp *resourcepb.ResourceSearchResponse
var err error
if s.searchClient != nil {
searchResp, err = s.searchClient.Search(ctx, srq)
} else {
searchResp, err = s.search.Search(ctx, srq)
}
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@@ -6,6 +6,7 @@ import (
"os" "os"
"strings" "strings"
"github.com/grafana/grafana/pkg/storage/unified/resourcepb"
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
"go.opentelemetry.io/otel/trace" "go.opentelemetry.io/otel/trace"
@@ -37,6 +38,7 @@ type ServerOptions struct {
Tracer trace.Tracer Tracer trace.Tracer
Reg prometheus.Registerer Reg prometheus.Registerer
AccessClient types.AccessClient AccessClient types.AccessClient
SearchClient resourcepb.ResourceIndexClient
SearchOptions resource.SearchOptions SearchOptions resource.SearchOptions
StorageMetrics *resource.StorageMetrics StorageMetrics *resource.StorageMetrics
IndexMetrics *resource.BleveIndexMetrics IndexMetrics *resource.BleveIndexMetrics
@@ -115,6 +117,10 @@ func NewResourceServer(opts ServerOptions) (resource.ResourceServer, error) {
serverOptions.Lifecycle = backend serverOptions.Lifecycle = backend
} }
// use the search client when search isnt initialized. Dont need both.
if opts.SearchOptions.Resources == nil {
serverOptions.SearchClient = opts.SearchClient
}
serverOptions.Search = opts.SearchOptions serverOptions.Search = opts.SearchOptions
serverOptions.IndexMetrics = opts.IndexMetrics serverOptions.IndexMetrics = opts.IndexMetrics
serverOptions.QOSQueue = opts.QOSQueue serverOptions.QOSQueue = opts.QOSQueue

View File

@@ -59,12 +59,13 @@ type service struct {
subservicesWatcher *services.FailureWatcher subservicesWatcher *services.FailureWatcher
hasSubservices bool hasSubservices bool
backend resource.StorageBackend backend resource.StorageBackend
cfg *setting.Cfg searchClient resourcepb.ResourceIndexClient
features featuremgmt.FeatureToggles cfg *setting.Cfg
db infraDB.DB features featuremgmt.FeatureToggles
stopCh chan struct{} db infraDB.DB
stoppedCh chan error stopCh chan struct{}
stoppedCh chan error
handler grpcserver.Provider handler grpcserver.Provider
@@ -99,6 +100,7 @@ func ProvideUnifiedStorageGrpcService(
memberlistKVConfig kv.Config, memberlistKVConfig kv.Config,
httpServerRouter *mux.Router, httpServerRouter *mux.Router,
backend resource.StorageBackend, backend resource.StorageBackend,
searchClient resourcepb.ResourceIndexClient,
) (UnifiedStorageGrpcService, error) { ) (UnifiedStorageGrpcService, error) {
var err error var err error
tracer := otel.Tracer("unified-storage") tracer := otel.Tracer("unified-storage")
@@ -112,6 +114,7 @@ func ProvideUnifiedStorageGrpcService(
s := &service{ s := &service{
backend: backend, backend: backend,
searchClient: searchClient,
cfg: cfg, cfg: cfg,
features: features, features: features,
stopCh: make(chan struct{}), stopCh: make(chan struct{}),
@@ -272,6 +275,7 @@ func (s *service) starting(ctx context.Context) error {
Tracer: s.tracing, Tracer: s.tracing,
Reg: s.reg, Reg: s.reg,
AccessClient: authzClient, AccessClient: authzClient,
SearchClient: s.searchClient,
SearchOptions: searchOptions, SearchOptions: searchOptions,
StorageMetrics: s.storageMetrics, StorageMetrics: s.storageMetrics,
IndexMetrics: s.indexMetrics, IndexMetrics: s.indexMetrics,