Compare commits
1 Commits
selectable
...
selectable
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
c7293f6fb5 |
@@ -38,10 +38,10 @@ func (d *directResourceClient) GetBlob(ctx context.Context, in *resourcepb.GetBl
|
||||
return d.server.GetBlob(ctx, in)
|
||||
}
|
||||
|
||||
// GetStats implements ResourceClient.
|
||||
func (d *directResourceClient) GetStats(ctx context.Context, in *resourcepb.ResourceStatsRequest, opts ...grpc.CallOption) (*resourcepb.ResourceStatsResponse, error) {
|
||||
return d.server.GetStats(ctx, in)
|
||||
}
|
||||
//// GetStats implements ResourceClient.
|
||||
//func (d *directResourceClient) GetStats(ctx context.Context, in *resourcepb.ResourceStatsRequest, opts ...grpc.CallOption) (*resourcepb.ResourceStatsResponse, error) {
|
||||
// return d.server.GetStats(ctx, in)
|
||||
//}
|
||||
|
||||
// IsHealthy implements ResourceClient.
|
||||
func (d *directResourceClient) IsHealthy(ctx context.Context, in *resourcepb.HealthCheckRequest, opts ...grpc.CallOption) (*resourcepb.HealthCheckResponse, error) {
|
||||
@@ -53,13 +53,13 @@ func (d *directResourceClient) List(ctx context.Context, in *resourcepb.ListRequ
|
||||
return d.server.List(ctx, in)
|
||||
}
|
||||
|
||||
func (d *directResourceClient) ListManagedObjects(ctx context.Context, in *resourcepb.ListManagedObjectsRequest, opts ...grpc.CallOption) (*resourcepb.ListManagedObjectsResponse, error) {
|
||||
return d.server.ListManagedObjects(ctx, in)
|
||||
}
|
||||
|
||||
func (d *directResourceClient) CountManagedObjects(ctx context.Context, in *resourcepb.CountManagedObjectsRequest, opts ...grpc.CallOption) (*resourcepb.CountManagedObjectsResponse, error) {
|
||||
return d.server.CountManagedObjects(ctx, in)
|
||||
}
|
||||
//func (d *directResourceClient) ListManagedObjects(ctx context.Context, in *resourcepb.ListManagedObjectsRequest, opts ...grpc.CallOption) (*resourcepb.ListManagedObjectsResponse, error) {
|
||||
// return d.server.ListManagedObjects(ctx, in)
|
||||
//}
|
||||
//
|
||||
//func (d *directResourceClient) CountManagedObjects(ctx context.Context, in *resourcepb.CountManagedObjectsRequest, opts ...grpc.CallOption) (*resourcepb.CountManagedObjectsResponse, error) {
|
||||
// return d.server.CountManagedObjects(ctx, in)
|
||||
//}
|
||||
|
||||
// PutBlob implements ResourceClient.
|
||||
func (d *directResourceClient) PutBlob(ctx context.Context, in *resourcepb.PutBlobRequest, opts ...grpc.CallOption) (*resourcepb.PutBlobResponse, error) {
|
||||
@@ -72,9 +72,9 @@ func (d *directResourceClient) Read(ctx context.Context, in *resourcepb.ReadRequ
|
||||
}
|
||||
|
||||
// Search implements ResourceClient.
|
||||
func (d *directResourceClient) Search(ctx context.Context, in *resourcepb.ResourceSearchRequest, opts ...grpc.CallOption) (*resourcepb.ResourceSearchResponse, error) {
|
||||
return d.server.Search(ctx, in)
|
||||
}
|
||||
//func (d *directResourceClient) Search(ctx context.Context, in *resourcepb.ResourceSearchRequest, opts ...grpc.CallOption) (*resourcepb.ResourceSearchResponse, error) {
|
||||
// return d.server.Search(ctx, in)
|
||||
//}
|
||||
|
||||
// Update implements ResourceClient.
|
||||
func (d *directResourceClient) Update(ctx context.Context, in *resourcepb.UpdateRequest, opts ...grpc.CallOption) (*resourcepb.UpdateResponse, error) {
|
||||
|
||||
@@ -11,6 +11,7 @@ import (
|
||||
"k8s.io/apiserver/pkg/registry/rest"
|
||||
|
||||
"github.com/grafana/authlib/types"
|
||||
|
||||
"github.com/grafana/grafana/pkg/apimachinery/utils"
|
||||
grafanaregistry "github.com/grafana/grafana/pkg/apiserver/registry/generic"
|
||||
grafanarest "github.com/grafana/grafana/pkg/apiserver/rest"
|
||||
|
||||
@@ -41,7 +41,6 @@ import (
|
||||
"github.com/grafana/grafana/pkg/services/folder"
|
||||
"github.com/grafana/grafana/pkg/setting"
|
||||
"github.com/grafana/grafana/pkg/storage/unified/apistore"
|
||||
"github.com/grafana/grafana/pkg/storage/unified/resource"
|
||||
"github.com/grafana/grafana/pkg/storage/unified/resourcepb"
|
||||
)
|
||||
|
||||
@@ -78,7 +77,7 @@ func RegisterAPIService(cfg *setting.Cfg,
|
||||
acService accesscontrol.Service,
|
||||
accessClient authlib.AccessClient,
|
||||
registerer prometheus.Registerer,
|
||||
unified resource.ResourceClient,
|
||||
unified resourcepb.ResourceIndexClient,
|
||||
zanzanaClient zanzana.Client,
|
||||
) *FolderAPIBuilder {
|
||||
builder := &FolderAPIBuilder{
|
||||
@@ -97,7 +96,7 @@ func RegisterAPIService(cfg *setting.Cfg,
|
||||
return builder
|
||||
}
|
||||
|
||||
func NewAPIService(ac authlib.AccessClient, searcher resource.ResourceClient, features featuremgmt.FeatureToggles, zanzanaClient zanzana.Client, resourcePermissionsSvc *dynamic.NamespaceableResourceInterface) *FolderAPIBuilder {
|
||||
func NewAPIService(ac authlib.AccessClient, searcher resourcepb.ResourceIndexClient, features featuremgmt.FeatureToggles, zanzanaClient zanzana.Client, resourcePermissionsSvc *dynamic.NamespaceableResourceInterface) *FolderAPIBuilder {
|
||||
return &FolderAPIBuilder{
|
||||
features: features,
|
||||
accessClient: ac,
|
||||
|
||||
@@ -615,7 +615,7 @@ func NewLocalStore(resourceInfo utils.ResourceInfo, scheme *runtime.Scheme, defa
|
||||
return nil, err
|
||||
}
|
||||
|
||||
client := resource.NewLocalResourceClient(server)
|
||||
client := resource.NewLocalResourceClient(server, nil)
|
||||
optsGetter := apistore.NewRESTOptionsGetterForClient(client, nil, defaultOpts.StorageConfig.Config, nil)
|
||||
|
||||
store, err := grafanaregistry.NewRegistryStore(scheme, resourceInfo, optsGetter)
|
||||
|
||||
@@ -13,9 +13,6 @@ import (
|
||||
"github.com/grafana/dskit/kv"
|
||||
"github.com/grafana/dskit/ring"
|
||||
ringclient "github.com/grafana/dskit/ring/client"
|
||||
"github.com/grafana/grafana/pkg/services/apiserver/options"
|
||||
"github.com/grafana/grafana/pkg/storage/unified"
|
||||
"github.com/grafana/grafana/pkg/storage/unified/resourcepb"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
"github.com/urfave/cli/v2"
|
||||
|
||||
@@ -79,22 +76,6 @@ func newModuleServer(opts Options,
|
||||
) (*ModuleServer, error) {
|
||||
rootCtx, shutdownFn := context.WithCancel(context.Background())
|
||||
|
||||
// TODO should inject this with Wire
|
||||
apiserverCfg := cfg.SectionWithEnvOverrides("grafana-apiserver")
|
||||
searchServerAddress := apiserverCfg.Key("search_server_address").MustString("")
|
||||
var searchClient resourcepb.ResourceIndexClient
|
||||
var err error
|
||||
if searchServerAddress != "" {
|
||||
storageOptions := options.StorageOptions{
|
||||
SearchServerAddress: searchServerAddress,
|
||||
}
|
||||
searchClient, err = unified.NewSearchClient(storageOptions, features)
|
||||
if err != nil {
|
||||
shutdownFn()
|
||||
return nil, fmt.Errorf("failed to create search client: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
s := &ModuleServer{
|
||||
opts: opts,
|
||||
apiOpts: apiOpts,
|
||||
@@ -115,7 +96,6 @@ func newModuleServer(opts Options,
|
||||
license: license,
|
||||
moduleRegisterer: moduleRegisterer,
|
||||
storageBackend: storageBackend,
|
||||
searchClient: searchClient,
|
||||
hooksService: hooksService,
|
||||
}
|
||||
|
||||
@@ -139,7 +119,6 @@ type ModuleServer struct {
|
||||
isInitialized bool
|
||||
mtx sync.Mutex
|
||||
storageBackend resource.StorageBackend
|
||||
searchClient resourcepb.ResourceIndexClient
|
||||
storageMetrics *resource.StorageMetrics
|
||||
indexMetrics *resource.BleveIndexMetrics
|
||||
license licensing.Licensing
|
||||
@@ -223,7 +202,7 @@ func (s *ModuleServer) Run() error {
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return sql.ProvideUnifiedStorageGrpcService(s.cfg, s.features, nil, s.log, s.registerer, docBuilders, s.storageMetrics, s.indexMetrics, s.searchServerRing, s.MemberlistKVConfig, s.httpServerRouter, s.storageBackend, s.searchClient)
|
||||
return sql.ProvideUnifiedStorageGrpcService(s.cfg, s.features, nil, s.log, s.registerer, docBuilders, s.storageMetrics, s.indexMetrics, s.searchServerRing, s.MemberlistKVConfig, s.httpServerRouter, s.storageBackend)
|
||||
})
|
||||
|
||||
m.RegisterModule(modules.ZanzanaServer, func() (services.Service, error) {
|
||||
|
||||
@@ -79,7 +79,7 @@ func NewRESTOptionsGetterMemory(originalStorageConfig storagebackend.Config, sec
|
||||
}
|
||||
|
||||
return NewRESTOptionsGetterForClient(
|
||||
resource.NewLocalResourceClient(server),
|
||||
resource.NewLocalResourceClient(server, nil),
|
||||
secrets,
|
||||
originalStorageConfig,
|
||||
nil,
|
||||
@@ -118,7 +118,7 @@ func NewRESTOptionsGetterForFileXX(path string,
|
||||
}
|
||||
|
||||
return NewRESTOptionsGetterForClient(
|
||||
resource.NewLocalResourceClient(server),
|
||||
resource.NewLocalResourceClient(server, nil),
|
||||
nil, // secrets
|
||||
originalStorageConfig,
|
||||
nil,
|
||||
|
||||
@@ -156,7 +156,7 @@ func testSetup(t testing.TB, opts ...setupOption) (context.Context, storage.Inte
|
||||
default:
|
||||
t.Fatalf("unsupported storage type: %s", setupOpts.storageType)
|
||||
}
|
||||
client := resource.NewLocalResourceClient(server)
|
||||
client := resource.NewLocalResourceClient(server, nil)
|
||||
|
||||
config := storagebackend.NewDefaultConfig(setupOpts.prefix, setupOpts.codec)
|
||||
store, destroyFunc, err := apistore.NewStorage(
|
||||
|
||||
@@ -7,8 +7,6 @@ import (
|
||||
"time"
|
||||
|
||||
badger "github.com/dgraph-io/badger/v4"
|
||||
"github.com/fullstorydev/grpchan"
|
||||
"github.com/grafana/grafana/pkg/storage/unified/resourcepb"
|
||||
otgrpc "github.com/opentracing-contrib/go-grpc"
|
||||
"github.com/opentracing/opentracing-go"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
@@ -23,6 +21,7 @@ import (
|
||||
"github.com/grafana/dskit/grpcclient"
|
||||
"github.com/grafana/dskit/middleware"
|
||||
"github.com/grafana/dskit/services"
|
||||
|
||||
grafanarest "github.com/grafana/grafana/pkg/apiserver/rest"
|
||||
infraDB "github.com/grafana/grafana/pkg/infra/db"
|
||||
"github.com/grafana/grafana/pkg/infra/tracing"
|
||||
@@ -33,7 +32,6 @@ import (
|
||||
"github.com/grafana/grafana/pkg/storage/legacysql"
|
||||
"github.com/grafana/grafana/pkg/storage/unified/federated"
|
||||
"github.com/grafana/grafana/pkg/storage/unified/resource"
|
||||
grpcUtils "github.com/grafana/grafana/pkg/storage/unified/resource/grpc"
|
||||
"github.com/grafana/grafana/pkg/storage/unified/search"
|
||||
"github.com/grafana/grafana/pkg/storage/unified/sql"
|
||||
"github.com/grafana/grafana/pkg/util/scheduler"
|
||||
@@ -94,27 +92,6 @@ func ProvideUnifiedStorageClient(opts *Options,
|
||||
return client, err
|
||||
}
|
||||
|
||||
// TODO use wire to provide to module server
|
||||
func NewSearchClient(opts options.StorageOptions, features featuremgmt.FeatureToggles) (resourcepb.ResourceIndexClient, error) {
|
||||
if opts.SearchServerAddress == "" {
|
||||
return nil, fmt.Errorf("expecting address for search server")
|
||||
}
|
||||
|
||||
var (
|
||||
conn grpc.ClientConnInterface
|
||||
err error
|
||||
metrics = newClientMetrics(prometheus.NewRegistry())
|
||||
)
|
||||
|
||||
conn, err = newGrpcConn(opts.SearchServerAddress, metrics, features, opts.GrpcClientKeepaliveTime)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
cc := grpchan.InterceptClientConn(conn, grpcUtils.UnaryClientInterceptor, grpcUtils.StreamClientInterceptor)
|
||||
return resourcepb.NewResourceIndexClient(cc), nil
|
||||
}
|
||||
|
||||
func newClient(opts options.StorageOptions,
|
||||
cfg *setting.Cfg,
|
||||
features featuremgmt.FeatureToggles,
|
||||
@@ -159,7 +136,7 @@ func newClient(opts options.StorageOptions,
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return resource.NewLocalResourceClient(server), nil
|
||||
return resource.NewLocalResourceClient(server, nil), nil
|
||||
|
||||
case options.StorageTypeUnifiedGrpc:
|
||||
if opts.Address == "" {
|
||||
@@ -248,11 +225,11 @@ func newClient(opts options.StorageOptions,
|
||||
serverOptions.OverridesService = overridesSvc
|
||||
}
|
||||
|
||||
server, err := sql.NewResourceServer(serverOptions)
|
||||
server, searchServer, err := sql.NewResourceServer(serverOptions)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return resource.NewLocalResourceClient(server), nil
|
||||
return resource.NewLocalResourceClient(server, searchServer), nil
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -31,11 +31,14 @@ import (
|
||||
"github.com/grafana/grafana/pkg/storage/unified/resourcepb"
|
||||
)
|
||||
|
||||
type SearchClient interface {
|
||||
resourcepb.ResourceIndexClient
|
||||
resourcepb.ManagedObjectIndexClient
|
||||
}
|
||||
|
||||
//go:generate mockery --name ResourceClient --structname MockResourceClient --inpackage --filename client_mock.go --with-expecter
|
||||
type ResourceClient interface {
|
||||
resourcepb.ResourceStoreClient
|
||||
resourcepb.ResourceIndexClient
|
||||
resourcepb.ManagedObjectIndexClient
|
||||
resourcepb.BulkStoreClient
|
||||
resourcepb.BlobStoreClient
|
||||
resourcepb.DiagnosticsClient
|
||||
@@ -100,8 +103,6 @@ func NewLocalResourceClient(server ResourceServer) ResourceClient {
|
||||
grpcAuthInt := grpcutils.NewUnsafeAuthenticator(tracer)
|
||||
for _, desc := range []*grpc.ServiceDesc{
|
||||
&resourcepb.ResourceStore_ServiceDesc,
|
||||
&resourcepb.ResourceIndex_ServiceDesc,
|
||||
&resourcepb.ManagedObjectIndex_ServiceDesc,
|
||||
&resourcepb.BlobStore_ServiceDesc,
|
||||
&resourcepb.BulkStore_ServiceDesc,
|
||||
&resourcepb.Diagnostics_ServiceDesc,
|
||||
|
||||
@@ -127,7 +127,10 @@ type SearchBackend interface {
|
||||
GetOpenIndexes() []NamespacedResource
|
||||
}
|
||||
|
||||
// This supports indexing+search regardless of implementation
|
||||
var _ SearchServer = &searchSupport{}
|
||||
|
||||
// This supports indexing+search regardless of implementation.
|
||||
// Implements SearchServer interface.
|
||||
type searchSupport struct {
|
||||
log log.Logger
|
||||
storage StorageBackend
|
||||
@@ -160,6 +163,10 @@ var (
|
||||
_ resourcepb.ManagedObjectIndexServer = (*searchSupport)(nil)
|
||||
)
|
||||
|
||||
func NewSearchServer(opts SearchOptions, storage StorageBackend, access types.AccessClient, blob BlobSupport, indexMetrics *BleveIndexMetrics, ownsIndexFn func(key NamespacedResource) (bool, error)) (SearchServer, error) {
|
||||
return newSearchSupport(opts, storage, access, blob, indexMetrics, ownsIndexFn)
|
||||
}
|
||||
|
||||
func newSearchSupport(opts SearchOptions, storage StorageBackend, access types.AccessClient, blob BlobSupport, indexMetrics *BleveIndexMetrics, ownsIndexFn func(key NamespacedResource) (bool, error)) (support *searchSupport, err error) {
|
||||
// No backend search support
|
||||
if opts.Backend == nil {
|
||||
@@ -598,6 +605,15 @@ func (s *searchSupport) buildIndexes(ctx context.Context) (int, error) {
|
||||
return totalBatchesIndexed, nil
|
||||
}
|
||||
|
||||
func (s *searchSupport) Init(ctx context.Context) error {
|
||||
return s.init(ctx)
|
||||
}
|
||||
|
||||
func (s *searchSupport) Stop(_ context.Context) error {
|
||||
s.stop()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *searchSupport) init(ctx context.Context) error {
|
||||
origCtx := ctx
|
||||
|
||||
|
||||
@@ -60,7 +60,7 @@ func ProvideSearchDistributorServer(cfg *setting.Cfg, features featuremgmt.Featu
|
||||
}
|
||||
|
||||
type RingClient struct {
|
||||
Client ResourceClient
|
||||
Client SearchClient
|
||||
grpc_health_v1.HealthClient
|
||||
Conn *grpc.ClientConn
|
||||
}
|
||||
@@ -99,7 +99,7 @@ var (
|
||||
func (ds *distributorServer) Search(ctx context.Context, r *resourcepb.ResourceSearchRequest) (*resourcepb.ResourceSearchResponse, error) {
|
||||
ctx, span := ds.tracing.Start(ctx, "distributor.Search")
|
||||
defer span.End()
|
||||
ctx, client, err := ds.getClientToDistributeRequest(ctx, r.Options.Key.Namespace, "Search")
|
||||
ctx, client, err := ds.getClientToDistributeRequest(ctx, r.Options.Key.Namespace)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -110,7 +110,7 @@ func (ds *distributorServer) Search(ctx context.Context, r *resourcepb.ResourceS
|
||||
func (ds *distributorServer) GetStats(ctx context.Context, r *resourcepb.ResourceStatsRequest) (*resourcepb.ResourceStatsResponse, error) {
|
||||
ctx, span := ds.tracing.Start(ctx, "distributor.GetStats")
|
||||
defer span.End()
|
||||
ctx, client, err := ds.getClientToDistributeRequest(ctx, r.Namespace, "GetStats")
|
||||
ctx, client, err := ds.getClientToDistributeRequest(ctx, r.Namespace)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -215,7 +215,7 @@ func (ds *distributorServer) RebuildIndexes(ctx context.Context, r *resourcepb.R
|
||||
func (ds *distributorServer) CountManagedObjects(ctx context.Context, r *resourcepb.CountManagedObjectsRequest) (*resourcepb.CountManagedObjectsResponse, error) {
|
||||
ctx, span := ds.tracing.Start(ctx, "distributor.CountManagedObjects")
|
||||
defer span.End()
|
||||
ctx, client, err := ds.getClientToDistributeRequest(ctx, r.Namespace, "CountManagedObjects")
|
||||
ctx, client, err := ds.getClientToDistributeRequest(ctx, r.Namespace)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -226,7 +226,7 @@ func (ds *distributorServer) CountManagedObjects(ctx context.Context, r *resourc
|
||||
func (ds *distributorServer) ListManagedObjects(ctx context.Context, r *resourcepb.ListManagedObjectsRequest) (*resourcepb.ListManagedObjectsResponse, error) {
|
||||
ctx, span := ds.tracing.Start(ctx, "distributor.ListManagedObjects")
|
||||
defer span.End()
|
||||
ctx, client, err := ds.getClientToDistributeRequest(ctx, r.Namespace, "ListManagedObjects")
|
||||
ctx, client, err := ds.getClientToDistributeRequest(ctx, r.Namespace)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -234,7 +234,7 @@ func (ds *distributorServer) ListManagedObjects(ctx context.Context, r *resource
|
||||
return client.ListManagedObjects(ctx, r)
|
||||
}
|
||||
|
||||
func (ds *distributorServer) getClientToDistributeRequest(ctx context.Context, namespace string, methodName string) (context.Context, ResourceClient, error) {
|
||||
func (ds *distributorServer) getClientToDistributeRequest(ctx context.Context, namespace string) (context.Context, SearchClient, error) {
|
||||
ringHasher := fnv.New32a()
|
||||
_, err := ringHasher.Write([]byte(namespace))
|
||||
if err != nil {
|
||||
|
||||
@@ -32,12 +32,17 @@ import (
|
||||
|
||||
var tracer = otel.Tracer("github.com/grafana/grafana/pkg/storage/unified/resource")
|
||||
|
||||
type SearchServer interface {
|
||||
LifecycleHooks
|
||||
|
||||
resourcepb.ResourceIndexServer
|
||||
resourcepb.ManagedObjectIndexServer
|
||||
}
|
||||
|
||||
// ResourceServer implements all gRPC services
|
||||
type ResourceServer interface {
|
||||
resourcepb.ResourceStoreServer
|
||||
resourcepb.BulkStoreServer
|
||||
resourcepb.ResourceIndexServer
|
||||
resourcepb.ManagedObjectIndexServer
|
||||
resourcepb.BlobStoreServer
|
||||
resourcepb.DiagnosticsServer
|
||||
resourcepb.QuotasServer
|
||||
@@ -220,10 +225,8 @@ type ResourceServerOptions struct {
|
||||
Blob BlobConfig
|
||||
|
||||
// Search options
|
||||
Search SearchOptions
|
||||
|
||||
// to be used by storage
|
||||
SearchClient resourcepb.ResourceIndexClient
|
||||
SearchOptions SearchOptions // TODO: needed?
|
||||
Search SearchServer
|
||||
|
||||
// Quota service
|
||||
OverridesService *OverridesService
|
||||
@@ -252,16 +255,12 @@ type ResourceServerOptions struct {
|
||||
|
||||
storageMetrics *StorageMetrics
|
||||
|
||||
IndexMetrics *BleveIndexMetrics
|
||||
|
||||
// MaxPageSizeBytes is the maximum size of a page in bytes.
|
||||
MaxPageSizeBytes int
|
||||
|
||||
// QOSQueue is the quality of service queue used to enqueue
|
||||
QOSQueue QOSEnqueuer
|
||||
QOSConfig QueueConfig
|
||||
|
||||
OwnsIndexFn func(key NamespacedResource) (bool, error)
|
||||
}
|
||||
|
||||
func NewResourceServer(opts ResourceServerOptions) (*server, error) {
|
||||
@@ -344,24 +343,25 @@ func NewResourceServer(opts ResourceServerOptions) (*server, error) {
|
||||
ctx: ctx,
|
||||
cancel: cancel,
|
||||
storageMetrics: opts.storageMetrics,
|
||||
indexMetrics: opts.IndexMetrics,
|
||||
maxPageSizeBytes: opts.MaxPageSizeBytes,
|
||||
reg: opts.Reg,
|
||||
queue: opts.QOSQueue,
|
||||
queueConfig: opts.QOSConfig,
|
||||
overridesService: opts.OverridesService,
|
||||
searchClient: opts.SearchClient,
|
||||
search: opts.Search,
|
||||
|
||||
artificialSuccessfulWriteDelay: opts.Search.IndexMinUpdateInterval,
|
||||
artificialSuccessfulWriteDelay: opts.SearchOptions.IndexMinUpdateInterval,
|
||||
}
|
||||
|
||||
if opts.Search.Resources != nil {
|
||||
var err error
|
||||
s.search, err = newSearchSupport(opts.Search, s.backend, s.access, s.blob, opts.IndexMetrics, opts.OwnsIndexFn)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
/*
|
||||
if opts.Search.Resources != nil {
|
||||
var err error
|
||||
s.search, err = newSearchSupport(opts.Search, s.backend, s.access, s.blob, opts.IndexMetrics, opts.OwnsIndexFn)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
}
|
||||
*/
|
||||
|
||||
err := s.Init(ctx)
|
||||
if err != nil {
|
||||
@@ -379,7 +379,7 @@ type server struct {
|
||||
backend StorageBackend
|
||||
blob BlobSupport
|
||||
secure secrets.InlineSecureValueSupport
|
||||
search *searchSupport
|
||||
search SearchServer
|
||||
diagnostics resourcepb.DiagnosticsServer
|
||||
access claims.AccessClient
|
||||
writeHooks WriteAccessHooks
|
||||
@@ -390,9 +390,6 @@ type server struct {
|
||||
indexMetrics *BleveIndexMetrics
|
||||
overridesService *OverridesService
|
||||
|
||||
// only to be used with storage server for field selector search
|
||||
searchClient resourcepb.ResourceIndexClient
|
||||
|
||||
// Background watch task -- this has permissions for everything
|
||||
ctx context.Context
|
||||
cancel context.CancelFunc
|
||||
@@ -429,11 +426,6 @@ func (s *server) Init(ctx context.Context) error {
|
||||
s.initErr = s.overridesService.init(ctx)
|
||||
}
|
||||
|
||||
// initialize the search index
|
||||
if s.initErr == nil && s.search != nil {
|
||||
s.initErr = s.search.init(ctx)
|
||||
}
|
||||
|
||||
// Start watching for changes
|
||||
if s.initErr == nil {
|
||||
s.initErr = s.initWatcher()
|
||||
@@ -458,10 +450,6 @@ func (s *server) Stop(ctx context.Context) error {
|
||||
}
|
||||
}
|
||||
|
||||
if s.search != nil {
|
||||
s.search.stop()
|
||||
}
|
||||
|
||||
if s.overridesService != nil {
|
||||
if err := s.overridesService.stop(ctx); err != nil {
|
||||
stopFailed = true
|
||||
@@ -1066,8 +1054,7 @@ func (s *server) List(ctx context.Context, req *resourcepb.ListRequest) (*resour
|
||||
// TODO: What to do about RV and version_match fields?
|
||||
// If we get here, we're doing list with selectable fields. Let's do search instead, since
|
||||
// we index all selectable fields, and fetch resulting documents one by one.
|
||||
|
||||
if (s.search != nil || s.searchClient != nil) && req.Source == resourcepb.ListRequest_STORE && (len(req.Options.Fields) > 0) {
|
||||
if s.search != nil && req.Source == resourcepb.ListRequest_STORE && (len(req.Options.Fields) > 0) {
|
||||
if req.Options.Key.Namespace == "" {
|
||||
return &resourcepb.ListResponse{
|
||||
Error: NewBadRequestError("namespace must be specified for list with filter"),
|
||||
@@ -1083,13 +1070,7 @@ func (s *server) List(ctx context.Context, req *resourcepb.ListRequest) (*resour
|
||||
// Permission: 0, // Not needed, default is List
|
||||
}
|
||||
|
||||
var searchResp *resourcepb.ResourceSearchResponse
|
||||
var err error
|
||||
if s.searchClient != nil {
|
||||
searchResp, err = s.searchClient.Search(ctx, srq)
|
||||
} else {
|
||||
searchResp, err = s.search.Search(ctx, srq)
|
||||
}
|
||||
searchResp, err := s.search.Search(ctx, srq)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -1446,47 +1427,6 @@ func (s *server) Watch(req *resourcepb.WatchRequest, srv resourcepb.ResourceStor
|
||||
}
|
||||
}
|
||||
|
||||
func (s *server) Search(ctx context.Context, req *resourcepb.ResourceSearchRequest) (*resourcepb.ResourceSearchResponse, error) {
|
||||
if s.search == nil {
|
||||
return nil, fmt.Errorf("search index not configured")
|
||||
}
|
||||
|
||||
return s.search.Search(ctx, req)
|
||||
}
|
||||
|
||||
// GetStats implements ResourceServer.
|
||||
func (s *server) GetStats(ctx context.Context, req *resourcepb.ResourceStatsRequest) (*resourcepb.ResourceStatsResponse, error) {
|
||||
if err := s.Init(ctx); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if s.search == nil {
|
||||
// If the backend implements "GetStats", we can use it
|
||||
srv, ok := s.backend.(resourcepb.ResourceIndexServer)
|
||||
if ok {
|
||||
return srv.GetStats(ctx, req)
|
||||
}
|
||||
return nil, fmt.Errorf("search index not configured")
|
||||
}
|
||||
return s.search.GetStats(ctx, req)
|
||||
}
|
||||
|
||||
func (s *server) ListManagedObjects(ctx context.Context, req *resourcepb.ListManagedObjectsRequest) (*resourcepb.ListManagedObjectsResponse, error) {
|
||||
if s.search == nil {
|
||||
return nil, fmt.Errorf("search index not configured")
|
||||
}
|
||||
|
||||
return s.search.ListManagedObjects(ctx, req)
|
||||
}
|
||||
|
||||
func (s *server) CountManagedObjects(ctx context.Context, req *resourcepb.CountManagedObjectsRequest) (*resourcepb.CountManagedObjectsResponse, error) {
|
||||
if s.search == nil {
|
||||
return nil, fmt.Errorf("search index not configured")
|
||||
}
|
||||
|
||||
return s.search.CountManagedObjects(ctx, req)
|
||||
}
|
||||
|
||||
// IsHealthy implements ResourceServer.
|
||||
func (s *server) IsHealthy(ctx context.Context, req *resourcepb.HealthCheckRequest) (*resourcepb.HealthCheckResponse, error) {
|
||||
return s.diagnostics.IsHealthy(ctx, req)
|
||||
|
||||
@@ -6,7 +6,6 @@ import (
|
||||
"os"
|
||||
"strings"
|
||||
|
||||
"github.com/grafana/grafana/pkg/storage/unified/resourcepb"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
"go.opentelemetry.io/otel/trace"
|
||||
|
||||
@@ -38,7 +37,6 @@ type ServerOptions struct {
|
||||
Tracer trace.Tracer
|
||||
Reg prometheus.Registerer
|
||||
AccessClient types.AccessClient
|
||||
SearchClient resourcepb.ResourceIndexClient
|
||||
SearchOptions resource.SearchOptions
|
||||
StorageMetrics *resource.StorageMetrics
|
||||
IndexMetrics *resource.BleveIndexMetrics
|
||||
@@ -48,7 +46,7 @@ type ServerOptions struct {
|
||||
OwnsIndexFn func(key resource.NamespacedResource) (bool, error)
|
||||
}
|
||||
|
||||
func NewResourceServer(opts ServerOptions) (resource.ResourceServer, error) {
|
||||
func NewResourceServer(opts ServerOptions) (resource.ResourceServer, resource.SearchServer, error) {
|
||||
apiserverCfg := opts.Cfg.SectionWithEnvOverrides("grafana-apiserver")
|
||||
|
||||
if opts.SecureValues == nil && opts.Cfg != nil && opts.Cfg.SecretsManagement.GrpcClientEnable {
|
||||
@@ -59,7 +57,7 @@ func NewResourceServer(opts ServerOptions) (resource.ResourceServer, error) {
|
||||
nil, // not needed for gRPC client mode
|
||||
)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to create inline secure value service: %w", err)
|
||||
return nil, nil, fmt.Errorf("failed to create inline secure value service: %w", err)
|
||||
}
|
||||
opts.SecureValues = inlineSecureValueService
|
||||
}
|
||||
@@ -79,7 +77,7 @@ func NewResourceServer(opts ServerOptions) (resource.ResourceServer, error) {
|
||||
dir := strings.Replace(serverOptions.Blob.URL, "./data", opts.Cfg.DataPath, 1)
|
||||
err := os.MkdirAll(dir, 0700)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, nil, err
|
||||
}
|
||||
serverOptions.Blob.URL = "file:///" + dir
|
||||
}
|
||||
@@ -96,7 +94,7 @@ func NewResourceServer(opts ServerOptions) (resource.ResourceServer, error) {
|
||||
} else {
|
||||
eDB, err := dbimpl.ProvideResourceDB(opts.DB, opts.Cfg, opts.Tracer)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
isHA := isHighAvailabilityEnabled(opts.Cfg.SectionWithEnvOverrides("database"),
|
||||
@@ -110,24 +108,31 @@ func NewResourceServer(opts ServerOptions) (resource.ResourceServer, error) {
|
||||
LastImportTimeMaxAge: opts.SearchOptions.MaxIndexAge, // No need to keep last_import_times older than max index age.
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, nil, err
|
||||
}
|
||||
serverOptions.Backend = backend
|
||||
serverOptions.Diagnostics = backend
|
||||
serverOptions.Lifecycle = backend
|
||||
}
|
||||
|
||||
// use the search client when search isnt initialized. Dont need both.
|
||||
if opts.SearchOptions.Resources == nil {
|
||||
serverOptions.SearchClient = opts.SearchClient
|
||||
search, err := resource.NewSearchServer(opts.SearchOptions, opts.Backend, opts.AccessClient, nil, opts.IndexMetrics, opts.OwnsIndexFn)
|
||||
if err != nil {
|
||||
return nil, nil, fmt.Errorf("failed to initialize search: %w", err)
|
||||
}
|
||||
serverOptions.Search = opts.SearchOptions
|
||||
serverOptions.IndexMetrics = opts.IndexMetrics
|
||||
|
||||
if err := search.Init(context.Background()); err != nil {
|
||||
return nil, nil, fmt.Errorf("failed to initialize search: %w", err)
|
||||
}
|
||||
|
||||
serverOptions.Search = search
|
||||
serverOptions.QOSQueue = opts.QOSQueue
|
||||
serverOptions.OwnsIndexFn = opts.OwnsIndexFn
|
||||
serverOptions.OverridesService = opts.OverridesService
|
||||
|
||||
return resource.NewResourceServer(serverOptions)
|
||||
rs, err := resource.NewResourceServer(serverOptions)
|
||||
if err != nil {
|
||||
_ = search.Stop(context.Background())
|
||||
}
|
||||
return rs, nil, err
|
||||
}
|
||||
|
||||
// isHighAvailabilityEnabled determines if high availability mode should
|
||||
|
||||
@@ -59,13 +59,12 @@ type service struct {
|
||||
subservicesWatcher *services.FailureWatcher
|
||||
hasSubservices bool
|
||||
|
||||
backend resource.StorageBackend
|
||||
searchClient resourcepb.ResourceIndexClient
|
||||
cfg *setting.Cfg
|
||||
features featuremgmt.FeatureToggles
|
||||
db infraDB.DB
|
||||
stopCh chan struct{}
|
||||
stoppedCh chan error
|
||||
backend resource.StorageBackend
|
||||
cfg *setting.Cfg
|
||||
features featuremgmt.FeatureToggles
|
||||
db infraDB.DB
|
||||
stopCh chan struct{}
|
||||
stoppedCh chan error
|
||||
|
||||
handler grpcserver.Provider
|
||||
|
||||
@@ -100,7 +99,6 @@ func ProvideUnifiedStorageGrpcService(
|
||||
memberlistKVConfig kv.Config,
|
||||
httpServerRouter *mux.Router,
|
||||
backend resource.StorageBackend,
|
||||
searchClient resourcepb.ResourceIndexClient,
|
||||
) (UnifiedStorageGrpcService, error) {
|
||||
var err error
|
||||
tracer := otel.Tracer("unified-storage")
|
||||
@@ -114,7 +112,6 @@ func ProvideUnifiedStorageGrpcService(
|
||||
|
||||
s := &service{
|
||||
backend: backend,
|
||||
searchClient: searchClient,
|
||||
cfg: cfg,
|
||||
features: features,
|
||||
stopCh: make(chan struct{}),
|
||||
@@ -275,7 +272,6 @@ func (s *service) starting(ctx context.Context) error {
|
||||
Tracer: s.tracing,
|
||||
Reg: s.reg,
|
||||
AccessClient: authzClient,
|
||||
SearchClient: s.searchClient,
|
||||
SearchOptions: searchOptions,
|
||||
StorageMetrics: s.storageMetrics,
|
||||
IndexMetrics: s.indexMetrics,
|
||||
@@ -295,7 +291,7 @@ func (s *service) starting(ctx context.Context) error {
|
||||
serverOptions.OverridesService = overridesSvc
|
||||
}
|
||||
|
||||
server, err := NewResourceServer(serverOptions)
|
||||
server, searchServer, err := NewResourceServer(serverOptions)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -312,8 +308,8 @@ func (s *service) starting(ctx context.Context) error {
|
||||
srv := s.handler.GetServer()
|
||||
resourcepb.RegisterResourceStoreServer(srv, server)
|
||||
resourcepb.RegisterBulkStoreServer(srv, server)
|
||||
resourcepb.RegisterResourceIndexServer(srv, server)
|
||||
resourcepb.RegisterManagedObjectIndexServer(srv, server)
|
||||
resourcepb.RegisterResourceIndexServer(srv, searchServer)
|
||||
resourcepb.RegisterManagedObjectIndexServer(srv, searchServer)
|
||||
resourcepb.RegisterBlobStoreServer(srv, server)
|
||||
resourcepb.RegisterDiagnosticsServer(srv, server)
|
||||
resourcepb.RegisterQuotasServer(srv, server)
|
||||
|
||||
Reference in New Issue
Block a user