Compare commits
1 Commits
selectable
...
selectable
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
b2d14cc42b |
@@ -13,6 +13,9 @@ import (
|
|||||||
"github.com/grafana/dskit/kv"
|
"github.com/grafana/dskit/kv"
|
||||||
"github.com/grafana/dskit/ring"
|
"github.com/grafana/dskit/ring"
|
||||||
ringclient "github.com/grafana/dskit/ring/client"
|
ringclient "github.com/grafana/dskit/ring/client"
|
||||||
|
"github.com/grafana/grafana/pkg/services/apiserver/options"
|
||||||
|
"github.com/grafana/grafana/pkg/storage/unified"
|
||||||
|
"github.com/grafana/grafana/pkg/storage/unified/resourcepb"
|
||||||
"github.com/prometheus/client_golang/prometheus"
|
"github.com/prometheus/client_golang/prometheus"
|
||||||
"github.com/urfave/cli/v2"
|
"github.com/urfave/cli/v2"
|
||||||
|
|
||||||
@@ -76,6 +79,22 @@ func newModuleServer(opts Options,
|
|||||||
) (*ModuleServer, error) {
|
) (*ModuleServer, error) {
|
||||||
rootCtx, shutdownFn := context.WithCancel(context.Background())
|
rootCtx, shutdownFn := context.WithCancel(context.Background())
|
||||||
|
|
||||||
|
// TODO should inject this with Wire
|
||||||
|
apiserverCfg := cfg.SectionWithEnvOverrides("grafana-apiserver")
|
||||||
|
searchServerAddress := apiserverCfg.Key("search_server_address").MustString("")
|
||||||
|
var searchClient resourcepb.ResourceIndexClient
|
||||||
|
var err error
|
||||||
|
if searchServerAddress != "" {
|
||||||
|
storageOptions := options.StorageOptions{
|
||||||
|
SearchServerAddress: searchServerAddress,
|
||||||
|
}
|
||||||
|
searchClient, err = unified.NewSearchClient(storageOptions, features)
|
||||||
|
if err != nil {
|
||||||
|
shutdownFn()
|
||||||
|
return nil, fmt.Errorf("failed to create search client: %w", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
s := &ModuleServer{
|
s := &ModuleServer{
|
||||||
opts: opts,
|
opts: opts,
|
||||||
apiOpts: apiOpts,
|
apiOpts: apiOpts,
|
||||||
@@ -96,6 +115,7 @@ func newModuleServer(opts Options,
|
|||||||
license: license,
|
license: license,
|
||||||
moduleRegisterer: moduleRegisterer,
|
moduleRegisterer: moduleRegisterer,
|
||||||
storageBackend: storageBackend,
|
storageBackend: storageBackend,
|
||||||
|
searchClient: searchClient,
|
||||||
hooksService: hooksService,
|
hooksService: hooksService,
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -119,6 +139,7 @@ type ModuleServer struct {
|
|||||||
isInitialized bool
|
isInitialized bool
|
||||||
mtx sync.Mutex
|
mtx sync.Mutex
|
||||||
storageBackend resource.StorageBackend
|
storageBackend resource.StorageBackend
|
||||||
|
searchClient resourcepb.ResourceIndexClient
|
||||||
storageMetrics *resource.StorageMetrics
|
storageMetrics *resource.StorageMetrics
|
||||||
indexMetrics *resource.BleveIndexMetrics
|
indexMetrics *resource.BleveIndexMetrics
|
||||||
license licensing.Licensing
|
license licensing.Licensing
|
||||||
@@ -202,7 +223,7 @@ func (s *ModuleServer) Run() error {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
return sql.ProvideUnifiedStorageGrpcService(s.cfg, s.features, nil, s.log, s.registerer, docBuilders, s.storageMetrics, s.indexMetrics, s.searchServerRing, s.MemberlistKVConfig, s.httpServerRouter, s.storageBackend)
|
return sql.ProvideUnifiedStorageGrpcService(s.cfg, s.features, nil, s.log, s.registerer, docBuilders, s.storageMetrics, s.indexMetrics, s.searchServerRing, s.MemberlistKVConfig, s.httpServerRouter, s.storageBackend, s.searchClient)
|
||||||
})
|
})
|
||||||
|
|
||||||
m.RegisterModule(modules.ZanzanaServer, func() (services.Service, error) {
|
m.RegisterModule(modules.ZanzanaServer, func() (services.Service, error) {
|
||||||
|
|||||||
@@ -7,6 +7,8 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
|
|
||||||
badger "github.com/dgraph-io/badger/v4"
|
badger "github.com/dgraph-io/badger/v4"
|
||||||
|
"github.com/fullstorydev/grpchan"
|
||||||
|
"github.com/grafana/grafana/pkg/storage/unified/resourcepb"
|
||||||
otgrpc "github.com/opentracing-contrib/go-grpc"
|
otgrpc "github.com/opentracing-contrib/go-grpc"
|
||||||
"github.com/opentracing/opentracing-go"
|
"github.com/opentracing/opentracing-go"
|
||||||
"github.com/prometheus/client_golang/prometheus"
|
"github.com/prometheus/client_golang/prometheus"
|
||||||
@@ -31,6 +33,7 @@ import (
|
|||||||
"github.com/grafana/grafana/pkg/storage/legacysql"
|
"github.com/grafana/grafana/pkg/storage/legacysql"
|
||||||
"github.com/grafana/grafana/pkg/storage/unified/federated"
|
"github.com/grafana/grafana/pkg/storage/unified/federated"
|
||||||
"github.com/grafana/grafana/pkg/storage/unified/resource"
|
"github.com/grafana/grafana/pkg/storage/unified/resource"
|
||||||
|
grpcUtils "github.com/grafana/grafana/pkg/storage/unified/resource/grpc"
|
||||||
"github.com/grafana/grafana/pkg/storage/unified/search"
|
"github.com/grafana/grafana/pkg/storage/unified/search"
|
||||||
"github.com/grafana/grafana/pkg/storage/unified/sql"
|
"github.com/grafana/grafana/pkg/storage/unified/sql"
|
||||||
"github.com/grafana/grafana/pkg/util/scheduler"
|
"github.com/grafana/grafana/pkg/util/scheduler"
|
||||||
@@ -91,6 +94,27 @@ func ProvideUnifiedStorageClient(opts *Options,
|
|||||||
return client, err
|
return client, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TODO use wire to provide to module server
|
||||||
|
func NewSearchClient(opts options.StorageOptions, features featuremgmt.FeatureToggles) (resourcepb.ResourceIndexClient, error) {
|
||||||
|
if opts.SearchServerAddress == "" {
|
||||||
|
return nil, fmt.Errorf("expecting address for search server")
|
||||||
|
}
|
||||||
|
|
||||||
|
var (
|
||||||
|
conn grpc.ClientConnInterface
|
||||||
|
err error
|
||||||
|
metrics = newClientMetrics(prometheus.NewRegistry())
|
||||||
|
)
|
||||||
|
|
||||||
|
conn, err = newGrpcConn(opts.SearchServerAddress, metrics, features, opts.GrpcClientKeepaliveTime)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
cc := grpchan.InterceptClientConn(conn, grpcUtils.UnaryClientInterceptor, grpcUtils.StreamClientInterceptor)
|
||||||
|
return resourcepb.NewResourceIndexClient(cc), nil
|
||||||
|
}
|
||||||
|
|
||||||
func newClient(opts options.StorageOptions,
|
func newClient(opts options.StorageOptions,
|
||||||
cfg *setting.Cfg,
|
cfg *setting.Cfg,
|
||||||
features featuremgmt.FeatureToggles,
|
features featuremgmt.FeatureToggles,
|
||||||
|
|||||||
@@ -222,6 +222,9 @@ type ResourceServerOptions struct {
|
|||||||
// Search options
|
// Search options
|
||||||
Search SearchOptions
|
Search SearchOptions
|
||||||
|
|
||||||
|
// to be used by storage
|
||||||
|
SearchClient resourcepb.ResourceIndexClient
|
||||||
|
|
||||||
// Quota service
|
// Quota service
|
||||||
OverridesService *OverridesService
|
OverridesService *OverridesService
|
||||||
|
|
||||||
@@ -347,6 +350,7 @@ func NewResourceServer(opts ResourceServerOptions) (*server, error) {
|
|||||||
queue: opts.QOSQueue,
|
queue: opts.QOSQueue,
|
||||||
queueConfig: opts.QOSConfig,
|
queueConfig: opts.QOSConfig,
|
||||||
overridesService: opts.OverridesService,
|
overridesService: opts.OverridesService,
|
||||||
|
searchClient: opts.SearchClient,
|
||||||
|
|
||||||
artificialSuccessfulWriteDelay: opts.Search.IndexMinUpdateInterval,
|
artificialSuccessfulWriteDelay: opts.Search.IndexMinUpdateInterval,
|
||||||
}
|
}
|
||||||
@@ -386,6 +390,9 @@ type server struct {
|
|||||||
indexMetrics *BleveIndexMetrics
|
indexMetrics *BleveIndexMetrics
|
||||||
overridesService *OverridesService
|
overridesService *OverridesService
|
||||||
|
|
||||||
|
// only to be used with storage server for field selector search
|
||||||
|
searchClient resourcepb.ResourceIndexClient
|
||||||
|
|
||||||
// Background watch task -- this has permissions for everything
|
// Background watch task -- this has permissions for everything
|
||||||
ctx context.Context
|
ctx context.Context
|
||||||
cancel context.CancelFunc
|
cancel context.CancelFunc
|
||||||
@@ -1059,7 +1066,8 @@ func (s *server) List(ctx context.Context, req *resourcepb.ListRequest) (*resour
|
|||||||
// TODO: What to do about RV and version_match fields?
|
// TODO: What to do about RV and version_match fields?
|
||||||
// If we get here, we're doing list with selectable fields. Let's do search instead, since
|
// If we get here, we're doing list with selectable fields. Let's do search instead, since
|
||||||
// we index all selectable fields, and fetch resulting documents one by one.
|
// we index all selectable fields, and fetch resulting documents one by one.
|
||||||
if s.search != nil && req.Source == resourcepb.ListRequest_STORE && (len(req.Options.Fields) > 0) {
|
|
||||||
|
if (s.search != nil || s.searchClient != nil) && req.Source == resourcepb.ListRequest_STORE && (len(req.Options.Fields) > 0) {
|
||||||
if req.Options.Key.Namespace == "" {
|
if req.Options.Key.Namespace == "" {
|
||||||
return &resourcepb.ListResponse{
|
return &resourcepb.ListResponse{
|
||||||
Error: NewBadRequestError("namespace must be specified for list with filter"),
|
Error: NewBadRequestError("namespace must be specified for list with filter"),
|
||||||
@@ -1075,7 +1083,13 @@ func (s *server) List(ctx context.Context, req *resourcepb.ListRequest) (*resour
|
|||||||
// Permission: 0, // Not needed, default is List
|
// Permission: 0, // Not needed, default is List
|
||||||
}
|
}
|
||||||
|
|
||||||
searchResp, err := s.search.Search(ctx, srq)
|
var searchResp *resourcepb.ResourceSearchResponse
|
||||||
|
var err error
|
||||||
|
if s.searchClient != nil {
|
||||||
|
searchResp, err = s.searchClient.Search(ctx, srq)
|
||||||
|
} else {
|
||||||
|
searchResp, err = s.search.Search(ctx, srq)
|
||||||
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -6,6 +6,7 @@ import (
|
|||||||
"os"
|
"os"
|
||||||
"strings"
|
"strings"
|
||||||
|
|
||||||
|
"github.com/grafana/grafana/pkg/storage/unified/resourcepb"
|
||||||
"github.com/prometheus/client_golang/prometheus"
|
"github.com/prometheus/client_golang/prometheus"
|
||||||
"go.opentelemetry.io/otel/trace"
|
"go.opentelemetry.io/otel/trace"
|
||||||
|
|
||||||
@@ -37,6 +38,7 @@ type ServerOptions struct {
|
|||||||
Tracer trace.Tracer
|
Tracer trace.Tracer
|
||||||
Reg prometheus.Registerer
|
Reg prometheus.Registerer
|
||||||
AccessClient types.AccessClient
|
AccessClient types.AccessClient
|
||||||
|
SearchClient resourcepb.ResourceIndexClient
|
||||||
SearchOptions resource.SearchOptions
|
SearchOptions resource.SearchOptions
|
||||||
StorageMetrics *resource.StorageMetrics
|
StorageMetrics *resource.StorageMetrics
|
||||||
IndexMetrics *resource.BleveIndexMetrics
|
IndexMetrics *resource.BleveIndexMetrics
|
||||||
@@ -115,6 +117,10 @@ func NewResourceServer(opts ServerOptions) (resource.ResourceServer, error) {
|
|||||||
serverOptions.Lifecycle = backend
|
serverOptions.Lifecycle = backend
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// use the search client when search isnt initialized. Dont need both.
|
||||||
|
if opts.SearchOptions.Resources == nil {
|
||||||
|
serverOptions.SearchClient = opts.SearchClient
|
||||||
|
}
|
||||||
serverOptions.Search = opts.SearchOptions
|
serverOptions.Search = opts.SearchOptions
|
||||||
serverOptions.IndexMetrics = opts.IndexMetrics
|
serverOptions.IndexMetrics = opts.IndexMetrics
|
||||||
serverOptions.QOSQueue = opts.QOSQueue
|
serverOptions.QOSQueue = opts.QOSQueue
|
||||||
|
|||||||
@@ -59,12 +59,13 @@ type service struct {
|
|||||||
subservicesWatcher *services.FailureWatcher
|
subservicesWatcher *services.FailureWatcher
|
||||||
hasSubservices bool
|
hasSubservices bool
|
||||||
|
|
||||||
backend resource.StorageBackend
|
backend resource.StorageBackend
|
||||||
cfg *setting.Cfg
|
searchClient resourcepb.ResourceIndexClient
|
||||||
features featuremgmt.FeatureToggles
|
cfg *setting.Cfg
|
||||||
db infraDB.DB
|
features featuremgmt.FeatureToggles
|
||||||
stopCh chan struct{}
|
db infraDB.DB
|
||||||
stoppedCh chan error
|
stopCh chan struct{}
|
||||||
|
stoppedCh chan error
|
||||||
|
|
||||||
handler grpcserver.Provider
|
handler grpcserver.Provider
|
||||||
|
|
||||||
@@ -99,6 +100,7 @@ func ProvideUnifiedStorageGrpcService(
|
|||||||
memberlistKVConfig kv.Config,
|
memberlistKVConfig kv.Config,
|
||||||
httpServerRouter *mux.Router,
|
httpServerRouter *mux.Router,
|
||||||
backend resource.StorageBackend,
|
backend resource.StorageBackend,
|
||||||
|
searchClient resourcepb.ResourceIndexClient,
|
||||||
) (UnifiedStorageGrpcService, error) {
|
) (UnifiedStorageGrpcService, error) {
|
||||||
var err error
|
var err error
|
||||||
tracer := otel.Tracer("unified-storage")
|
tracer := otel.Tracer("unified-storage")
|
||||||
@@ -112,6 +114,7 @@ func ProvideUnifiedStorageGrpcService(
|
|||||||
|
|
||||||
s := &service{
|
s := &service{
|
||||||
backend: backend,
|
backend: backend,
|
||||||
|
searchClient: searchClient,
|
||||||
cfg: cfg,
|
cfg: cfg,
|
||||||
features: features,
|
features: features,
|
||||||
stopCh: make(chan struct{}),
|
stopCh: make(chan struct{}),
|
||||||
@@ -272,6 +275,7 @@ func (s *service) starting(ctx context.Context) error {
|
|||||||
Tracer: s.tracing,
|
Tracer: s.tracing,
|
||||||
Reg: s.reg,
|
Reg: s.reg,
|
||||||
AccessClient: authzClient,
|
AccessClient: authzClient,
|
||||||
|
SearchClient: s.searchClient,
|
||||||
SearchOptions: searchOptions,
|
SearchOptions: searchOptions,
|
||||||
StorageMetrics: s.storageMetrics,
|
StorageMetrics: s.storageMetrics,
|
||||||
IndexMetrics: s.indexMetrics,
|
IndexMetrics: s.indexMetrics,
|
||||||
|
|||||||
Reference in New Issue
Block a user