Compare commits
1 Commits
selectable
...
selectable
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
b2d14cc42b |
@@ -13,6 +13,9 @@ import (
|
||||
"github.com/grafana/dskit/kv"
|
||||
"github.com/grafana/dskit/ring"
|
||||
ringclient "github.com/grafana/dskit/ring/client"
|
||||
"github.com/grafana/grafana/pkg/services/apiserver/options"
|
||||
"github.com/grafana/grafana/pkg/storage/unified"
|
||||
"github.com/grafana/grafana/pkg/storage/unified/resourcepb"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
"github.com/urfave/cli/v2"
|
||||
|
||||
@@ -76,6 +79,22 @@ func newModuleServer(opts Options,
|
||||
) (*ModuleServer, error) {
|
||||
rootCtx, shutdownFn := context.WithCancel(context.Background())
|
||||
|
||||
// TODO should inject this with Wire
|
||||
apiserverCfg := cfg.SectionWithEnvOverrides("grafana-apiserver")
|
||||
searchServerAddress := apiserverCfg.Key("search_server_address").MustString("")
|
||||
var searchClient resourcepb.ResourceIndexClient
|
||||
var err error
|
||||
if searchServerAddress != "" {
|
||||
storageOptions := options.StorageOptions{
|
||||
SearchServerAddress: searchServerAddress,
|
||||
}
|
||||
searchClient, err = unified.NewSearchClient(storageOptions, features)
|
||||
if err != nil {
|
||||
shutdownFn()
|
||||
return nil, fmt.Errorf("failed to create search client: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
s := &ModuleServer{
|
||||
opts: opts,
|
||||
apiOpts: apiOpts,
|
||||
@@ -96,6 +115,7 @@ func newModuleServer(opts Options,
|
||||
license: license,
|
||||
moduleRegisterer: moduleRegisterer,
|
||||
storageBackend: storageBackend,
|
||||
searchClient: searchClient,
|
||||
hooksService: hooksService,
|
||||
}
|
||||
|
||||
@@ -119,6 +139,7 @@ type ModuleServer struct {
|
||||
isInitialized bool
|
||||
mtx sync.Mutex
|
||||
storageBackend resource.StorageBackend
|
||||
searchClient resourcepb.ResourceIndexClient
|
||||
storageMetrics *resource.StorageMetrics
|
||||
indexMetrics *resource.BleveIndexMetrics
|
||||
license licensing.Licensing
|
||||
@@ -202,7 +223,7 @@ func (s *ModuleServer) Run() error {
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return sql.ProvideUnifiedStorageGrpcService(s.cfg, s.features, nil, s.log, s.registerer, docBuilders, s.storageMetrics, s.indexMetrics, s.searchServerRing, s.MemberlistKVConfig, s.httpServerRouter, s.storageBackend)
|
||||
return sql.ProvideUnifiedStorageGrpcService(s.cfg, s.features, nil, s.log, s.registerer, docBuilders, s.storageMetrics, s.indexMetrics, s.searchServerRing, s.MemberlistKVConfig, s.httpServerRouter, s.storageBackend, s.searchClient)
|
||||
})
|
||||
|
||||
m.RegisterModule(modules.ZanzanaServer, func() (services.Service, error) {
|
||||
|
||||
@@ -7,6 +7,8 @@ import (
|
||||
"time"
|
||||
|
||||
badger "github.com/dgraph-io/badger/v4"
|
||||
"github.com/fullstorydev/grpchan"
|
||||
"github.com/grafana/grafana/pkg/storage/unified/resourcepb"
|
||||
otgrpc "github.com/opentracing-contrib/go-grpc"
|
||||
"github.com/opentracing/opentracing-go"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
@@ -31,6 +33,7 @@ import (
|
||||
"github.com/grafana/grafana/pkg/storage/legacysql"
|
||||
"github.com/grafana/grafana/pkg/storage/unified/federated"
|
||||
"github.com/grafana/grafana/pkg/storage/unified/resource"
|
||||
grpcUtils "github.com/grafana/grafana/pkg/storage/unified/resource/grpc"
|
||||
"github.com/grafana/grafana/pkg/storage/unified/search"
|
||||
"github.com/grafana/grafana/pkg/storage/unified/sql"
|
||||
"github.com/grafana/grafana/pkg/util/scheduler"
|
||||
@@ -91,6 +94,27 @@ func ProvideUnifiedStorageClient(opts *Options,
|
||||
return client, err
|
||||
}
|
||||
|
||||
// TODO use wire to provide to module server
|
||||
func NewSearchClient(opts options.StorageOptions, features featuremgmt.FeatureToggles) (resourcepb.ResourceIndexClient, error) {
|
||||
if opts.SearchServerAddress == "" {
|
||||
return nil, fmt.Errorf("expecting address for search server")
|
||||
}
|
||||
|
||||
var (
|
||||
conn grpc.ClientConnInterface
|
||||
err error
|
||||
metrics = newClientMetrics(prometheus.NewRegistry())
|
||||
)
|
||||
|
||||
conn, err = newGrpcConn(opts.SearchServerAddress, metrics, features, opts.GrpcClientKeepaliveTime)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
cc := grpchan.InterceptClientConn(conn, grpcUtils.UnaryClientInterceptor, grpcUtils.StreamClientInterceptor)
|
||||
return resourcepb.NewResourceIndexClient(cc), nil
|
||||
}
|
||||
|
||||
func newClient(opts options.StorageOptions,
|
||||
cfg *setting.Cfg,
|
||||
features featuremgmt.FeatureToggles,
|
||||
|
||||
@@ -222,6 +222,9 @@ type ResourceServerOptions struct {
|
||||
// Search options
|
||||
Search SearchOptions
|
||||
|
||||
// to be used by storage
|
||||
SearchClient resourcepb.ResourceIndexClient
|
||||
|
||||
// Quota service
|
||||
OverridesService *OverridesService
|
||||
|
||||
@@ -347,6 +350,7 @@ func NewResourceServer(opts ResourceServerOptions) (*server, error) {
|
||||
queue: opts.QOSQueue,
|
||||
queueConfig: opts.QOSConfig,
|
||||
overridesService: opts.OverridesService,
|
||||
searchClient: opts.SearchClient,
|
||||
|
||||
artificialSuccessfulWriteDelay: opts.Search.IndexMinUpdateInterval,
|
||||
}
|
||||
@@ -386,6 +390,9 @@ type server struct {
|
||||
indexMetrics *BleveIndexMetrics
|
||||
overridesService *OverridesService
|
||||
|
||||
// only to be used with storage server for field selector search
|
||||
searchClient resourcepb.ResourceIndexClient
|
||||
|
||||
// Background watch task -- this has permissions for everything
|
||||
ctx context.Context
|
||||
cancel context.CancelFunc
|
||||
@@ -1059,7 +1066,8 @@ func (s *server) List(ctx context.Context, req *resourcepb.ListRequest) (*resour
|
||||
// TODO: What to do about RV and version_match fields?
|
||||
// If we get here, we're doing list with selectable fields. Let's do search instead, since
|
||||
// we index all selectable fields, and fetch resulting documents one by one.
|
||||
if s.search != nil && req.Source == resourcepb.ListRequest_STORE && (len(req.Options.Fields) > 0) {
|
||||
|
||||
if (s.search != nil || s.searchClient != nil) && req.Source == resourcepb.ListRequest_STORE && (len(req.Options.Fields) > 0) {
|
||||
if req.Options.Key.Namespace == "" {
|
||||
return &resourcepb.ListResponse{
|
||||
Error: NewBadRequestError("namespace must be specified for list with filter"),
|
||||
@@ -1075,7 +1083,13 @@ func (s *server) List(ctx context.Context, req *resourcepb.ListRequest) (*resour
|
||||
// Permission: 0, // Not needed, default is List
|
||||
}
|
||||
|
||||
searchResp, err := s.search.Search(ctx, srq)
|
||||
var searchResp *resourcepb.ResourceSearchResponse
|
||||
var err error
|
||||
if s.searchClient != nil {
|
||||
searchResp, err = s.searchClient.Search(ctx, srq)
|
||||
} else {
|
||||
searchResp, err = s.search.Search(ctx, srq)
|
||||
}
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@@ -6,6 +6,7 @@ import (
|
||||
"os"
|
||||
"strings"
|
||||
|
||||
"github.com/grafana/grafana/pkg/storage/unified/resourcepb"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
"go.opentelemetry.io/otel/trace"
|
||||
|
||||
@@ -37,6 +38,7 @@ type ServerOptions struct {
|
||||
Tracer trace.Tracer
|
||||
Reg prometheus.Registerer
|
||||
AccessClient types.AccessClient
|
||||
SearchClient resourcepb.ResourceIndexClient
|
||||
SearchOptions resource.SearchOptions
|
||||
StorageMetrics *resource.StorageMetrics
|
||||
IndexMetrics *resource.BleveIndexMetrics
|
||||
@@ -115,6 +117,10 @@ func NewResourceServer(opts ServerOptions) (resource.ResourceServer, error) {
|
||||
serverOptions.Lifecycle = backend
|
||||
}
|
||||
|
||||
// use the search client when search isnt initialized. Dont need both.
|
||||
if opts.SearchOptions.Resources == nil {
|
||||
serverOptions.SearchClient = opts.SearchClient
|
||||
}
|
||||
serverOptions.Search = opts.SearchOptions
|
||||
serverOptions.IndexMetrics = opts.IndexMetrics
|
||||
serverOptions.QOSQueue = opts.QOSQueue
|
||||
|
||||
@@ -59,12 +59,13 @@ type service struct {
|
||||
subservicesWatcher *services.FailureWatcher
|
||||
hasSubservices bool
|
||||
|
||||
backend resource.StorageBackend
|
||||
cfg *setting.Cfg
|
||||
features featuremgmt.FeatureToggles
|
||||
db infraDB.DB
|
||||
stopCh chan struct{}
|
||||
stoppedCh chan error
|
||||
backend resource.StorageBackend
|
||||
searchClient resourcepb.ResourceIndexClient
|
||||
cfg *setting.Cfg
|
||||
features featuremgmt.FeatureToggles
|
||||
db infraDB.DB
|
||||
stopCh chan struct{}
|
||||
stoppedCh chan error
|
||||
|
||||
handler grpcserver.Provider
|
||||
|
||||
@@ -99,6 +100,7 @@ func ProvideUnifiedStorageGrpcService(
|
||||
memberlistKVConfig kv.Config,
|
||||
httpServerRouter *mux.Router,
|
||||
backend resource.StorageBackend,
|
||||
searchClient resourcepb.ResourceIndexClient,
|
||||
) (UnifiedStorageGrpcService, error) {
|
||||
var err error
|
||||
tracer := otel.Tracer("unified-storage")
|
||||
@@ -112,6 +114,7 @@ func ProvideUnifiedStorageGrpcService(
|
||||
|
||||
s := &service{
|
||||
backend: backend,
|
||||
searchClient: searchClient,
|
||||
cfg: cfg,
|
||||
features: features,
|
||||
stopCh: make(chan struct{}),
|
||||
@@ -272,6 +275,7 @@ func (s *service) starting(ctx context.Context) error {
|
||||
Tracer: s.tracing,
|
||||
Reg: s.reg,
|
||||
AccessClient: authzClient,
|
||||
SearchClient: s.searchClient,
|
||||
SearchOptions: searchOptions,
|
||||
StorageMetrics: s.storageMetrics,
|
||||
IndexMetrics: s.indexMetrics,
|
||||
|
||||
Reference in New Issue
Block a user