Compare commits
1 Commits
selectable
...
selectable
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
c7293f6fb5 |
@@ -38,10 +38,10 @@ func (d *directResourceClient) GetBlob(ctx context.Context, in *resourcepb.GetBl
|
||||
return d.server.GetBlob(ctx, in)
|
||||
}
|
||||
|
||||
// GetStats implements ResourceClient.
|
||||
func (d *directResourceClient) GetStats(ctx context.Context, in *resourcepb.ResourceStatsRequest, opts ...grpc.CallOption) (*resourcepb.ResourceStatsResponse, error) {
|
||||
return d.server.GetStats(ctx, in)
|
||||
}
|
||||
//// GetStats implements ResourceClient.
|
||||
//func (d *directResourceClient) GetStats(ctx context.Context, in *resourcepb.ResourceStatsRequest, opts ...grpc.CallOption) (*resourcepb.ResourceStatsResponse, error) {
|
||||
// return d.server.GetStats(ctx, in)
|
||||
//}
|
||||
|
||||
// IsHealthy implements ResourceClient.
|
||||
func (d *directResourceClient) IsHealthy(ctx context.Context, in *resourcepb.HealthCheckRequest, opts ...grpc.CallOption) (*resourcepb.HealthCheckResponse, error) {
|
||||
@@ -53,13 +53,13 @@ func (d *directResourceClient) List(ctx context.Context, in *resourcepb.ListRequ
|
||||
return d.server.List(ctx, in)
|
||||
}
|
||||
|
||||
func (d *directResourceClient) ListManagedObjects(ctx context.Context, in *resourcepb.ListManagedObjectsRequest, opts ...grpc.CallOption) (*resourcepb.ListManagedObjectsResponse, error) {
|
||||
return d.server.ListManagedObjects(ctx, in)
|
||||
}
|
||||
|
||||
func (d *directResourceClient) CountManagedObjects(ctx context.Context, in *resourcepb.CountManagedObjectsRequest, opts ...grpc.CallOption) (*resourcepb.CountManagedObjectsResponse, error) {
|
||||
return d.server.CountManagedObjects(ctx, in)
|
||||
}
|
||||
//func (d *directResourceClient) ListManagedObjects(ctx context.Context, in *resourcepb.ListManagedObjectsRequest, opts ...grpc.CallOption) (*resourcepb.ListManagedObjectsResponse, error) {
|
||||
// return d.server.ListManagedObjects(ctx, in)
|
||||
//}
|
||||
//
|
||||
//func (d *directResourceClient) CountManagedObjects(ctx context.Context, in *resourcepb.CountManagedObjectsRequest, opts ...grpc.CallOption) (*resourcepb.CountManagedObjectsResponse, error) {
|
||||
// return d.server.CountManagedObjects(ctx, in)
|
||||
//}
|
||||
|
||||
// PutBlob implements ResourceClient.
|
||||
func (d *directResourceClient) PutBlob(ctx context.Context, in *resourcepb.PutBlobRequest, opts ...grpc.CallOption) (*resourcepb.PutBlobResponse, error) {
|
||||
@@ -72,9 +72,9 @@ func (d *directResourceClient) Read(ctx context.Context, in *resourcepb.ReadRequ
|
||||
}
|
||||
|
||||
// Search implements ResourceClient.
|
||||
func (d *directResourceClient) Search(ctx context.Context, in *resourcepb.ResourceSearchRequest, opts ...grpc.CallOption) (*resourcepb.ResourceSearchResponse, error) {
|
||||
return d.server.Search(ctx, in)
|
||||
}
|
||||
//func (d *directResourceClient) Search(ctx context.Context, in *resourcepb.ResourceSearchRequest, opts ...grpc.CallOption) (*resourcepb.ResourceSearchResponse, error) {
|
||||
// return d.server.Search(ctx, in)
|
||||
//}
|
||||
|
||||
// Update implements ResourceClient.
|
||||
func (d *directResourceClient) Update(ctx context.Context, in *resourcepb.UpdateRequest, opts ...grpc.CallOption) (*resourcepb.UpdateResponse, error) {
|
||||
|
||||
@@ -11,6 +11,7 @@ import (
|
||||
"k8s.io/apiserver/pkg/registry/rest"
|
||||
|
||||
"github.com/grafana/authlib/types"
|
||||
|
||||
"github.com/grafana/grafana/pkg/apimachinery/utils"
|
||||
grafanaregistry "github.com/grafana/grafana/pkg/apiserver/registry/generic"
|
||||
grafanarest "github.com/grafana/grafana/pkg/apiserver/rest"
|
||||
|
||||
@@ -41,7 +41,6 @@ import (
|
||||
"github.com/grafana/grafana/pkg/services/folder"
|
||||
"github.com/grafana/grafana/pkg/setting"
|
||||
"github.com/grafana/grafana/pkg/storage/unified/apistore"
|
||||
"github.com/grafana/grafana/pkg/storage/unified/resource"
|
||||
"github.com/grafana/grafana/pkg/storage/unified/resourcepb"
|
||||
)
|
||||
|
||||
@@ -78,7 +77,7 @@ func RegisterAPIService(cfg *setting.Cfg,
|
||||
acService accesscontrol.Service,
|
||||
accessClient authlib.AccessClient,
|
||||
registerer prometheus.Registerer,
|
||||
unified resource.ResourceClient,
|
||||
unified resourcepb.ResourceIndexClient,
|
||||
zanzanaClient zanzana.Client,
|
||||
) *FolderAPIBuilder {
|
||||
builder := &FolderAPIBuilder{
|
||||
@@ -97,7 +96,7 @@ func RegisterAPIService(cfg *setting.Cfg,
|
||||
return builder
|
||||
}
|
||||
|
||||
func NewAPIService(ac authlib.AccessClient, searcher resource.ResourceClient, features featuremgmt.FeatureToggles, zanzanaClient zanzana.Client, resourcePermissionsSvc *dynamic.NamespaceableResourceInterface) *FolderAPIBuilder {
|
||||
func NewAPIService(ac authlib.AccessClient, searcher resourcepb.ResourceIndexClient, features featuremgmt.FeatureToggles, zanzanaClient zanzana.Client, resourcePermissionsSvc *dynamic.NamespaceableResourceInterface) *FolderAPIBuilder {
|
||||
return &FolderAPIBuilder{
|
||||
features: features,
|
||||
accessClient: ac,
|
||||
|
||||
@@ -615,7 +615,7 @@ func NewLocalStore(resourceInfo utils.ResourceInfo, scheme *runtime.Scheme, defa
|
||||
return nil, err
|
||||
}
|
||||
|
||||
client := resource.NewLocalResourceClient(server)
|
||||
client := resource.NewLocalResourceClient(server, nil)
|
||||
optsGetter := apistore.NewRESTOptionsGetterForClient(client, nil, defaultOpts.StorageConfig.Config, nil)
|
||||
|
||||
store, err := grafanaregistry.NewRegistryStore(scheme, resourceInfo, optsGetter)
|
||||
|
||||
@@ -79,7 +79,7 @@ func NewRESTOptionsGetterMemory(originalStorageConfig storagebackend.Config, sec
|
||||
}
|
||||
|
||||
return NewRESTOptionsGetterForClient(
|
||||
resource.NewLocalResourceClient(server),
|
||||
resource.NewLocalResourceClient(server, nil),
|
||||
secrets,
|
||||
originalStorageConfig,
|
||||
nil,
|
||||
@@ -118,7 +118,7 @@ func NewRESTOptionsGetterForFileXX(path string,
|
||||
}
|
||||
|
||||
return NewRESTOptionsGetterForClient(
|
||||
resource.NewLocalResourceClient(server),
|
||||
resource.NewLocalResourceClient(server, nil),
|
||||
nil, // secrets
|
||||
originalStorageConfig,
|
||||
nil,
|
||||
|
||||
@@ -156,7 +156,7 @@ func testSetup(t testing.TB, opts ...setupOption) (context.Context, storage.Inte
|
||||
default:
|
||||
t.Fatalf("unsupported storage type: %s", setupOpts.storageType)
|
||||
}
|
||||
client := resource.NewLocalResourceClient(server)
|
||||
client := resource.NewLocalResourceClient(server, nil)
|
||||
|
||||
config := storagebackend.NewDefaultConfig(setupOpts.prefix, setupOpts.codec)
|
||||
store, destroyFunc, err := apistore.NewStorage(
|
||||
|
||||
@@ -21,6 +21,7 @@ import (
|
||||
"github.com/grafana/dskit/grpcclient"
|
||||
"github.com/grafana/dskit/middleware"
|
||||
"github.com/grafana/dskit/services"
|
||||
|
||||
grafanarest "github.com/grafana/grafana/pkg/apiserver/rest"
|
||||
infraDB "github.com/grafana/grafana/pkg/infra/db"
|
||||
"github.com/grafana/grafana/pkg/infra/tracing"
|
||||
@@ -135,7 +136,7 @@ func newClient(opts options.StorageOptions,
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return resource.NewLocalResourceClient(server), nil
|
||||
return resource.NewLocalResourceClient(server, nil), nil
|
||||
|
||||
case options.StorageTypeUnifiedGrpc:
|
||||
if opts.Address == "" {
|
||||
@@ -224,11 +225,11 @@ func newClient(opts options.StorageOptions,
|
||||
serverOptions.OverridesService = overridesSvc
|
||||
}
|
||||
|
||||
server, err := sql.NewResourceServer(serverOptions)
|
||||
server, searchServer, err := sql.NewResourceServer(serverOptions)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return resource.NewLocalResourceClient(server), nil
|
||||
return resource.NewLocalResourceClient(server, searchServer), nil
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -31,11 +31,14 @@ import (
|
||||
"github.com/grafana/grafana/pkg/storage/unified/resourcepb"
|
||||
)
|
||||
|
||||
type SearchClient interface {
|
||||
resourcepb.ResourceIndexClient
|
||||
resourcepb.ManagedObjectIndexClient
|
||||
}
|
||||
|
||||
//go:generate mockery --name ResourceClient --structname MockResourceClient --inpackage --filename client_mock.go --with-expecter
|
||||
type ResourceClient interface {
|
||||
resourcepb.ResourceStoreClient
|
||||
resourcepb.ResourceIndexClient
|
||||
resourcepb.ManagedObjectIndexClient
|
||||
resourcepb.BulkStoreClient
|
||||
resourcepb.BlobStoreClient
|
||||
resourcepb.DiagnosticsClient
|
||||
@@ -100,8 +103,6 @@ func NewLocalResourceClient(server ResourceServer) ResourceClient {
|
||||
grpcAuthInt := grpcutils.NewUnsafeAuthenticator(tracer)
|
||||
for _, desc := range []*grpc.ServiceDesc{
|
||||
&resourcepb.ResourceStore_ServiceDesc,
|
||||
&resourcepb.ResourceIndex_ServiceDesc,
|
||||
&resourcepb.ManagedObjectIndex_ServiceDesc,
|
||||
&resourcepb.BlobStore_ServiceDesc,
|
||||
&resourcepb.BulkStore_ServiceDesc,
|
||||
&resourcepb.Diagnostics_ServiceDesc,
|
||||
|
||||
@@ -127,7 +127,10 @@ type SearchBackend interface {
|
||||
GetOpenIndexes() []NamespacedResource
|
||||
}
|
||||
|
||||
// This supports indexing+search regardless of implementation
|
||||
var _ SearchServer = &searchSupport{}
|
||||
|
||||
// This supports indexing+search regardless of implementation.
|
||||
// Implements SearchServer interface.
|
||||
type searchSupport struct {
|
||||
log log.Logger
|
||||
storage StorageBackend
|
||||
@@ -160,6 +163,10 @@ var (
|
||||
_ resourcepb.ManagedObjectIndexServer = (*searchSupport)(nil)
|
||||
)
|
||||
|
||||
func NewSearchServer(opts SearchOptions, storage StorageBackend, access types.AccessClient, blob BlobSupport, indexMetrics *BleveIndexMetrics, ownsIndexFn func(key NamespacedResource) (bool, error)) (SearchServer, error) {
|
||||
return newSearchSupport(opts, storage, access, blob, indexMetrics, ownsIndexFn)
|
||||
}
|
||||
|
||||
func newSearchSupport(opts SearchOptions, storage StorageBackend, access types.AccessClient, blob BlobSupport, indexMetrics *BleveIndexMetrics, ownsIndexFn func(key NamespacedResource) (bool, error)) (support *searchSupport, err error) {
|
||||
// No backend search support
|
||||
if opts.Backend == nil {
|
||||
@@ -598,6 +605,15 @@ func (s *searchSupport) buildIndexes(ctx context.Context) (int, error) {
|
||||
return totalBatchesIndexed, nil
|
||||
}
|
||||
|
||||
func (s *searchSupport) Init(ctx context.Context) error {
|
||||
return s.init(ctx)
|
||||
}
|
||||
|
||||
func (s *searchSupport) Stop(_ context.Context) error {
|
||||
s.stop()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *searchSupport) init(ctx context.Context) error {
|
||||
origCtx := ctx
|
||||
|
||||
|
||||
@@ -60,7 +60,7 @@ func ProvideSearchDistributorServer(cfg *setting.Cfg, features featuremgmt.Featu
|
||||
}
|
||||
|
||||
type RingClient struct {
|
||||
Client ResourceClient
|
||||
Client SearchClient
|
||||
grpc_health_v1.HealthClient
|
||||
Conn *grpc.ClientConn
|
||||
}
|
||||
@@ -99,7 +99,7 @@ var (
|
||||
func (ds *distributorServer) Search(ctx context.Context, r *resourcepb.ResourceSearchRequest) (*resourcepb.ResourceSearchResponse, error) {
|
||||
ctx, span := ds.tracing.Start(ctx, "distributor.Search")
|
||||
defer span.End()
|
||||
ctx, client, err := ds.getClientToDistributeRequest(ctx, r.Options.Key.Namespace, "Search")
|
||||
ctx, client, err := ds.getClientToDistributeRequest(ctx, r.Options.Key.Namespace)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -110,7 +110,7 @@ func (ds *distributorServer) Search(ctx context.Context, r *resourcepb.ResourceS
|
||||
func (ds *distributorServer) GetStats(ctx context.Context, r *resourcepb.ResourceStatsRequest) (*resourcepb.ResourceStatsResponse, error) {
|
||||
ctx, span := ds.tracing.Start(ctx, "distributor.GetStats")
|
||||
defer span.End()
|
||||
ctx, client, err := ds.getClientToDistributeRequest(ctx, r.Namespace, "GetStats")
|
||||
ctx, client, err := ds.getClientToDistributeRequest(ctx, r.Namespace)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -215,7 +215,7 @@ func (ds *distributorServer) RebuildIndexes(ctx context.Context, r *resourcepb.R
|
||||
func (ds *distributorServer) CountManagedObjects(ctx context.Context, r *resourcepb.CountManagedObjectsRequest) (*resourcepb.CountManagedObjectsResponse, error) {
|
||||
ctx, span := ds.tracing.Start(ctx, "distributor.CountManagedObjects")
|
||||
defer span.End()
|
||||
ctx, client, err := ds.getClientToDistributeRequest(ctx, r.Namespace, "CountManagedObjects")
|
||||
ctx, client, err := ds.getClientToDistributeRequest(ctx, r.Namespace)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -226,7 +226,7 @@ func (ds *distributorServer) CountManagedObjects(ctx context.Context, r *resourc
|
||||
func (ds *distributorServer) ListManagedObjects(ctx context.Context, r *resourcepb.ListManagedObjectsRequest) (*resourcepb.ListManagedObjectsResponse, error) {
|
||||
ctx, span := ds.tracing.Start(ctx, "distributor.ListManagedObjects")
|
||||
defer span.End()
|
||||
ctx, client, err := ds.getClientToDistributeRequest(ctx, r.Namespace, "ListManagedObjects")
|
||||
ctx, client, err := ds.getClientToDistributeRequest(ctx, r.Namespace)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -234,7 +234,7 @@ func (ds *distributorServer) ListManagedObjects(ctx context.Context, r *resource
|
||||
return client.ListManagedObjects(ctx, r)
|
||||
}
|
||||
|
||||
func (ds *distributorServer) getClientToDistributeRequest(ctx context.Context, namespace string, methodName string) (context.Context, ResourceClient, error) {
|
||||
func (ds *distributorServer) getClientToDistributeRequest(ctx context.Context, namespace string) (context.Context, SearchClient, error) {
|
||||
ringHasher := fnv.New32a()
|
||||
_, err := ringHasher.Write([]byte(namespace))
|
||||
if err != nil {
|
||||
|
||||
@@ -32,12 +32,17 @@ import (
|
||||
|
||||
var tracer = otel.Tracer("github.com/grafana/grafana/pkg/storage/unified/resource")
|
||||
|
||||
type SearchServer interface {
|
||||
LifecycleHooks
|
||||
|
||||
resourcepb.ResourceIndexServer
|
||||
resourcepb.ManagedObjectIndexServer
|
||||
}
|
||||
|
||||
// ResourceServer implements all gRPC services
|
||||
type ResourceServer interface {
|
||||
resourcepb.ResourceStoreServer
|
||||
resourcepb.BulkStoreServer
|
||||
resourcepb.ResourceIndexServer
|
||||
resourcepb.ManagedObjectIndexServer
|
||||
resourcepb.BlobStoreServer
|
||||
resourcepb.DiagnosticsServer
|
||||
resourcepb.QuotasServer
|
||||
@@ -220,7 +225,8 @@ type ResourceServerOptions struct {
|
||||
Blob BlobConfig
|
||||
|
||||
// Search options
|
||||
Search SearchOptions
|
||||
SearchOptions SearchOptions // TODO: needed?
|
||||
Search SearchServer
|
||||
|
||||
// Quota service
|
||||
OverridesService *OverridesService
|
||||
@@ -249,16 +255,12 @@ type ResourceServerOptions struct {
|
||||
|
||||
storageMetrics *StorageMetrics
|
||||
|
||||
IndexMetrics *BleveIndexMetrics
|
||||
|
||||
// MaxPageSizeBytes is the maximum size of a page in bytes.
|
||||
MaxPageSizeBytes int
|
||||
|
||||
// QOSQueue is the quality of service queue used to enqueue
|
||||
QOSQueue QOSEnqueuer
|
||||
QOSConfig QueueConfig
|
||||
|
||||
OwnsIndexFn func(key NamespacedResource) (bool, error)
|
||||
}
|
||||
|
||||
func NewResourceServer(opts ResourceServerOptions) (*server, error) {
|
||||
@@ -341,23 +343,25 @@ func NewResourceServer(opts ResourceServerOptions) (*server, error) {
|
||||
ctx: ctx,
|
||||
cancel: cancel,
|
||||
storageMetrics: opts.storageMetrics,
|
||||
indexMetrics: opts.IndexMetrics,
|
||||
maxPageSizeBytes: opts.MaxPageSizeBytes,
|
||||
reg: opts.Reg,
|
||||
queue: opts.QOSQueue,
|
||||
queueConfig: opts.QOSConfig,
|
||||
overridesService: opts.OverridesService,
|
||||
search: opts.Search,
|
||||
|
||||
artificialSuccessfulWriteDelay: opts.Search.IndexMinUpdateInterval,
|
||||
artificialSuccessfulWriteDelay: opts.SearchOptions.IndexMinUpdateInterval,
|
||||
}
|
||||
|
||||
if opts.Search.Resources != nil {
|
||||
var err error
|
||||
s.search, err = newSearchSupport(opts.Search, s.backend, s.access, s.blob, opts.IndexMetrics, opts.OwnsIndexFn)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
/*
|
||||
if opts.Search.Resources != nil {
|
||||
var err error
|
||||
s.search, err = newSearchSupport(opts.Search, s.backend, s.access, s.blob, opts.IndexMetrics, opts.OwnsIndexFn)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
}
|
||||
*/
|
||||
|
||||
err := s.Init(ctx)
|
||||
if err != nil {
|
||||
@@ -375,7 +379,7 @@ type server struct {
|
||||
backend StorageBackend
|
||||
blob BlobSupport
|
||||
secure secrets.InlineSecureValueSupport
|
||||
search *searchSupport
|
||||
search SearchServer
|
||||
diagnostics resourcepb.DiagnosticsServer
|
||||
access claims.AccessClient
|
||||
writeHooks WriteAccessHooks
|
||||
@@ -422,11 +426,6 @@ func (s *server) Init(ctx context.Context) error {
|
||||
s.initErr = s.overridesService.init(ctx)
|
||||
}
|
||||
|
||||
// initialize the search index
|
||||
if s.initErr == nil && s.search != nil {
|
||||
s.initErr = s.search.init(ctx)
|
||||
}
|
||||
|
||||
// Start watching for changes
|
||||
if s.initErr == nil {
|
||||
s.initErr = s.initWatcher()
|
||||
@@ -451,10 +450,6 @@ func (s *server) Stop(ctx context.Context) error {
|
||||
}
|
||||
}
|
||||
|
||||
if s.search != nil {
|
||||
s.search.stop()
|
||||
}
|
||||
|
||||
if s.overridesService != nil {
|
||||
if err := s.overridesService.stop(ctx); err != nil {
|
||||
stopFailed = true
|
||||
@@ -1432,47 +1427,6 @@ func (s *server) Watch(req *resourcepb.WatchRequest, srv resourcepb.ResourceStor
|
||||
}
|
||||
}
|
||||
|
||||
func (s *server) Search(ctx context.Context, req *resourcepb.ResourceSearchRequest) (*resourcepb.ResourceSearchResponse, error) {
|
||||
if s.search == nil {
|
||||
return nil, fmt.Errorf("search index not configured")
|
||||
}
|
||||
|
||||
return s.search.Search(ctx, req)
|
||||
}
|
||||
|
||||
// GetStats implements ResourceServer.
|
||||
func (s *server) GetStats(ctx context.Context, req *resourcepb.ResourceStatsRequest) (*resourcepb.ResourceStatsResponse, error) {
|
||||
if err := s.Init(ctx); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if s.search == nil {
|
||||
// If the backend implements "GetStats", we can use it
|
||||
srv, ok := s.backend.(resourcepb.ResourceIndexServer)
|
||||
if ok {
|
||||
return srv.GetStats(ctx, req)
|
||||
}
|
||||
return nil, fmt.Errorf("search index not configured")
|
||||
}
|
||||
return s.search.GetStats(ctx, req)
|
||||
}
|
||||
|
||||
func (s *server) ListManagedObjects(ctx context.Context, req *resourcepb.ListManagedObjectsRequest) (*resourcepb.ListManagedObjectsResponse, error) {
|
||||
if s.search == nil {
|
||||
return nil, fmt.Errorf("search index not configured")
|
||||
}
|
||||
|
||||
return s.search.ListManagedObjects(ctx, req)
|
||||
}
|
||||
|
||||
func (s *server) CountManagedObjects(ctx context.Context, req *resourcepb.CountManagedObjectsRequest) (*resourcepb.CountManagedObjectsResponse, error) {
|
||||
if s.search == nil {
|
||||
return nil, fmt.Errorf("search index not configured")
|
||||
}
|
||||
|
||||
return s.search.CountManagedObjects(ctx, req)
|
||||
}
|
||||
|
||||
// IsHealthy implements ResourceServer.
|
||||
func (s *server) IsHealthy(ctx context.Context, req *resourcepb.HealthCheckRequest) (*resourcepb.HealthCheckResponse, error) {
|
||||
return s.diagnostics.IsHealthy(ctx, req)
|
||||
|
||||
@@ -46,7 +46,7 @@ type ServerOptions struct {
|
||||
OwnsIndexFn func(key resource.NamespacedResource) (bool, error)
|
||||
}
|
||||
|
||||
func NewResourceServer(opts ServerOptions) (resource.ResourceServer, error) {
|
||||
func NewResourceServer(opts ServerOptions) (resource.ResourceServer, resource.SearchServer, error) {
|
||||
apiserverCfg := opts.Cfg.SectionWithEnvOverrides("grafana-apiserver")
|
||||
|
||||
if opts.SecureValues == nil && opts.Cfg != nil && opts.Cfg.SecretsManagement.GrpcClientEnable {
|
||||
@@ -57,7 +57,7 @@ func NewResourceServer(opts ServerOptions) (resource.ResourceServer, error) {
|
||||
nil, // not needed for gRPC client mode
|
||||
)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to create inline secure value service: %w", err)
|
||||
return nil, nil, fmt.Errorf("failed to create inline secure value service: %w", err)
|
||||
}
|
||||
opts.SecureValues = inlineSecureValueService
|
||||
}
|
||||
@@ -77,7 +77,7 @@ func NewResourceServer(opts ServerOptions) (resource.ResourceServer, error) {
|
||||
dir := strings.Replace(serverOptions.Blob.URL, "./data", opts.Cfg.DataPath, 1)
|
||||
err := os.MkdirAll(dir, 0700)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, nil, err
|
||||
}
|
||||
serverOptions.Blob.URL = "file:///" + dir
|
||||
}
|
||||
@@ -94,7 +94,7 @@ func NewResourceServer(opts ServerOptions) (resource.ResourceServer, error) {
|
||||
} else {
|
||||
eDB, err := dbimpl.ProvideResourceDB(opts.DB, opts.Cfg, opts.Tracer)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
isHA := isHighAvailabilityEnabled(opts.Cfg.SectionWithEnvOverrides("database"),
|
||||
@@ -108,20 +108,31 @@ func NewResourceServer(opts ServerOptions) (resource.ResourceServer, error) {
|
||||
LastImportTimeMaxAge: opts.SearchOptions.MaxIndexAge, // No need to keep last_import_times older than max index age.
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, nil, err
|
||||
}
|
||||
serverOptions.Backend = backend
|
||||
serverOptions.Diagnostics = backend
|
||||
serverOptions.Lifecycle = backend
|
||||
}
|
||||
|
||||
serverOptions.Search = opts.SearchOptions
|
||||
serverOptions.IndexMetrics = opts.IndexMetrics
|
||||
search, err := resource.NewSearchServer(opts.SearchOptions, opts.Backend, opts.AccessClient, nil, opts.IndexMetrics, opts.OwnsIndexFn)
|
||||
if err != nil {
|
||||
return nil, nil, fmt.Errorf("failed to initialize search: %w", err)
|
||||
}
|
||||
|
||||
if err := search.Init(context.Background()); err != nil {
|
||||
return nil, nil, fmt.Errorf("failed to initialize search: %w", err)
|
||||
}
|
||||
|
||||
serverOptions.Search = search
|
||||
serverOptions.QOSQueue = opts.QOSQueue
|
||||
serverOptions.OwnsIndexFn = opts.OwnsIndexFn
|
||||
serverOptions.OverridesService = opts.OverridesService
|
||||
|
||||
return resource.NewResourceServer(serverOptions)
|
||||
rs, err := resource.NewResourceServer(serverOptions)
|
||||
if err != nil {
|
||||
_ = search.Stop(context.Background())
|
||||
}
|
||||
return rs, nil, err
|
||||
}
|
||||
|
||||
// isHighAvailabilityEnabled determines if high availability mode should
|
||||
|
||||
@@ -291,7 +291,7 @@ func (s *service) starting(ctx context.Context) error {
|
||||
serverOptions.OverridesService = overridesSvc
|
||||
}
|
||||
|
||||
server, err := NewResourceServer(serverOptions)
|
||||
server, searchServer, err := NewResourceServer(serverOptions)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -308,8 +308,8 @@ func (s *service) starting(ctx context.Context) error {
|
||||
srv := s.handler.GetServer()
|
||||
resourcepb.RegisterResourceStoreServer(srv, server)
|
||||
resourcepb.RegisterBulkStoreServer(srv, server)
|
||||
resourcepb.RegisterResourceIndexServer(srv, server)
|
||||
resourcepb.RegisterManagedObjectIndexServer(srv, server)
|
||||
resourcepb.RegisterResourceIndexServer(srv, searchServer)
|
||||
resourcepb.RegisterManagedObjectIndexServer(srv, searchServer)
|
||||
resourcepb.RegisterBlobStoreServer(srv, server)
|
||||
resourcepb.RegisterDiagnosticsServer(srv, server)
|
||||
resourcepb.RegisterQuotasServer(srv, server)
|
||||
|
||||
Reference in New Issue
Block a user