Compare commits
8 Commits
ash/react-
...
feature/ex
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
c8da64e4eb | ||
|
|
2fab497c18 | ||
|
|
b0bb71f834 | ||
|
|
a7aa55f908 | ||
|
|
0a61846b5e | ||
|
|
785cc739ee | ||
|
|
4913baaf04 | ||
|
|
c8f1efe7c7 |
@@ -74,7 +74,7 @@ func ToUnifiedStorage(c utils.CommandLine, cfg *setting.Cfg, sqlStore db.DB) err
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
grpcClient, err := newUnifiedClient(cfg, sqlStore, featureToggles)
|
grpcClient, err := newUnifiedMigratorClient(cfg, sqlStore, featureToggles)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@@ -92,7 +92,7 @@ func ToUnifiedStorage(c utils.CommandLine, cfg *setting.Cfg, sqlStore db.DB) err
|
|||||||
return runInteractiveMigration(ctx, cfg, opts, dashboardAccess, grpcClient, start)
|
return runInteractiveMigration(ctx, cfg, opts, dashboardAccess, grpcClient, start)
|
||||||
}
|
}
|
||||||
|
|
||||||
func runNonInteractiveMigration(ctx context.Context, opts legacy.MigrateOptions, dashboardAccess legacy.MigrationDashboardAccessor, grpcClient resource.ResourceClient, start time.Time) error {
|
func runNonInteractiveMigration(ctx context.Context, opts legacy.MigrateOptions, dashboardAccess legacy.MigrationDashboardAccessor, grpcClient resource.MigratorClient, start time.Time) error {
|
||||||
migrator := migrations.ProvideUnifiedMigrator(dashboardAccess, grpcClient)
|
migrator := migrations.ProvideUnifiedMigrator(dashboardAccess, grpcClient)
|
||||||
|
|
||||||
opts.WithHistory = true // always include history in non-interactive mode
|
opts.WithHistory = true // always include history in non-interactive mode
|
||||||
@@ -109,7 +109,7 @@ func runNonInteractiveMigration(ctx context.Context, opts legacy.MigrateOptions,
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func runInteractiveMigration(ctx context.Context, cfg *setting.Cfg, opts legacy.MigrateOptions, dashboardAccess legacy.MigrationDashboardAccessor, grpcClient resource.ResourceClient, start time.Time) error {
|
func runInteractiveMigration(ctx context.Context, cfg *setting.Cfg, opts legacy.MigrateOptions, dashboardAccess legacy.MigrationDashboardAccessor, grpcClient resource.MigratorClient, start time.Time) error {
|
||||||
yes, err := promptYesNo(fmt.Sprintf("Count legacy resources for namespace: %s?", opts.Namespace))
|
yes, err := promptYesNo(fmt.Sprintf("Count legacy resources for namespace: %s?", opts.Namespace))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
@@ -225,7 +225,7 @@ func promptYesNo(prompt string) (bool, error) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func newUnifiedClient(cfg *setting.Cfg, sqlStore db.DB, featureToggles featuremgmt.FeatureToggles) (resource.ResourceClient, error) {
|
func newUnifiedMigratorClient(cfg *setting.Cfg, sqlStore db.DB, featureToggles featuremgmt.FeatureToggles) (resource.MigratorClient, error) {
|
||||||
return unified.ProvideUnifiedStorageClient(&unified.Options{
|
return unified.ProvideUnifiedStorageClient(&unified.Options{
|
||||||
Cfg: cfg,
|
Cfg: cfg,
|
||||||
Features: featureToggles,
|
Features: featureToggles,
|
||||||
|
|||||||
@@ -10,6 +10,7 @@ const (
|
|||||||
SearchServerRing string = "search-server-ring"
|
SearchServerRing string = "search-server-ring"
|
||||||
SearchServerDistributor string = "search-server-distributor"
|
SearchServerDistributor string = "search-server-distributor"
|
||||||
StorageServer string = "storage-server"
|
StorageServer string = "storage-server"
|
||||||
|
SearchServer string = "search-server"
|
||||||
ZanzanaServer string = "zanzana-server"
|
ZanzanaServer string = "zanzana-server"
|
||||||
InstrumentationServer string = "instrumentation-server"
|
InstrumentationServer string = "instrumentation-server"
|
||||||
FrontendServer string = "frontend-server"
|
FrontendServer string = "frontend-server"
|
||||||
@@ -21,6 +22,7 @@ var dependencyMap = map[string][]string{
|
|||||||
SearchServerRing: {InstrumentationServer, MemberlistKV},
|
SearchServerRing: {InstrumentationServer, MemberlistKV},
|
||||||
GrafanaAPIServer: {InstrumentationServer},
|
GrafanaAPIServer: {InstrumentationServer},
|
||||||
StorageServer: {InstrumentationServer, SearchServerRing},
|
StorageServer: {InstrumentationServer, SearchServerRing},
|
||||||
|
SearchServer: {InstrumentationServer, SearchServerRing},
|
||||||
ZanzanaServer: {InstrumentationServer},
|
ZanzanaServer: {InstrumentationServer},
|
||||||
SearchServerDistributor: {InstrumentationServer, MemberlistKV, SearchServerRing},
|
SearchServerDistributor: {InstrumentationServer, MemberlistKV, SearchServerRing},
|
||||||
Core: {},
|
Core: {},
|
||||||
|
|||||||
@@ -11,91 +11,59 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
_ resource.ResourceClient = (*directResourceClient)(nil)
|
_ resource.StorageClient = (*DirectStorageClient)(nil)
|
||||||
)
|
)
|
||||||
|
|
||||||
// The direct client passes requests directly to the server using the *same* context
|
// NewDirectStorageClient creates a client that passes requests directly to the server using the *same* context
|
||||||
func NewDirectResourceClient(server resource.ResourceServer) resource.ResourceClient {
|
func NewDirectStorageClient(server resource.ResourceServer) *DirectStorageClient {
|
||||||
return &directResourceClient{server}
|
return &DirectStorageClient{server}
|
||||||
}
|
}
|
||||||
|
|
||||||
type directResourceClient struct {
|
type DirectStorageClient struct {
|
||||||
server resource.ResourceServer
|
server resource.ResourceServer
|
||||||
}
|
}
|
||||||
|
|
||||||
// Create implements ResourceClient.
|
// Create implements ResourceClient.
|
||||||
func (d *directResourceClient) Create(ctx context.Context, in *resourcepb.CreateRequest, opts ...grpc.CallOption) (*resourcepb.CreateResponse, error) {
|
func (d *DirectStorageClient) Create(ctx context.Context, in *resourcepb.CreateRequest, _ ...grpc.CallOption) (*resourcepb.CreateResponse, error) {
|
||||||
return d.server.Create(ctx, in)
|
return d.server.Create(ctx, in)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Delete implements ResourceClient.
|
// Delete implements ResourceClient.
|
||||||
func (d *directResourceClient) Delete(ctx context.Context, in *resourcepb.DeleteRequest, opts ...grpc.CallOption) (*resourcepb.DeleteResponse, error) {
|
func (d *DirectStorageClient) Delete(ctx context.Context, in *resourcepb.DeleteRequest, _ ...grpc.CallOption) (*resourcepb.DeleteResponse, error) {
|
||||||
return d.server.Delete(ctx, in)
|
return d.server.Delete(ctx, in)
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetBlob implements ResourceClient.
|
// GetBlob implements ResourceClient.
|
||||||
func (d *directResourceClient) GetBlob(ctx context.Context, in *resourcepb.GetBlobRequest, opts ...grpc.CallOption) (*resourcepb.GetBlobResponse, error) {
|
func (d *DirectStorageClient) GetBlob(ctx context.Context, in *resourcepb.GetBlobRequest, _ ...grpc.CallOption) (*resourcepb.GetBlobResponse, error) {
|
||||||
return d.server.GetBlob(ctx, in)
|
return d.server.GetBlob(ctx, in)
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetStats implements ResourceClient.
|
|
||||||
func (d *directResourceClient) GetStats(ctx context.Context, in *resourcepb.ResourceStatsRequest, opts ...grpc.CallOption) (*resourcepb.ResourceStatsResponse, error) {
|
|
||||||
return d.server.GetStats(ctx, in)
|
|
||||||
}
|
|
||||||
|
|
||||||
// IsHealthy implements ResourceClient.
|
// IsHealthy implements ResourceClient.
|
||||||
func (d *directResourceClient) IsHealthy(ctx context.Context, in *resourcepb.HealthCheckRequest, opts ...grpc.CallOption) (*resourcepb.HealthCheckResponse, error) {
|
func (d *DirectStorageClient) IsHealthy(ctx context.Context, in *resourcepb.HealthCheckRequest, _ ...grpc.CallOption) (*resourcepb.HealthCheckResponse, error) {
|
||||||
return d.server.IsHealthy(ctx, in)
|
return d.server.IsHealthy(ctx, in)
|
||||||
}
|
}
|
||||||
|
|
||||||
// List implements ResourceClient.
|
// List implements ResourceClient.
|
||||||
func (d *directResourceClient) List(ctx context.Context, in *resourcepb.ListRequest, opts ...grpc.CallOption) (*resourcepb.ListResponse, error) {
|
func (d *DirectStorageClient) List(ctx context.Context, in *resourcepb.ListRequest, _ ...grpc.CallOption) (*resourcepb.ListResponse, error) {
|
||||||
return d.server.List(ctx, in)
|
return d.server.List(ctx, in)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (d *directResourceClient) ListManagedObjects(ctx context.Context, in *resourcepb.ListManagedObjectsRequest, opts ...grpc.CallOption) (*resourcepb.ListManagedObjectsResponse, error) {
|
|
||||||
return d.server.ListManagedObjects(ctx, in)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (d *directResourceClient) CountManagedObjects(ctx context.Context, in *resourcepb.CountManagedObjectsRequest, opts ...grpc.CallOption) (*resourcepb.CountManagedObjectsResponse, error) {
|
|
||||||
return d.server.CountManagedObjects(ctx, in)
|
|
||||||
}
|
|
||||||
|
|
||||||
// PutBlob implements ResourceClient.
|
// PutBlob implements ResourceClient.
|
||||||
func (d *directResourceClient) PutBlob(ctx context.Context, in *resourcepb.PutBlobRequest, opts ...grpc.CallOption) (*resourcepb.PutBlobResponse, error) {
|
func (d *DirectStorageClient) PutBlob(ctx context.Context, in *resourcepb.PutBlobRequest, _ ...grpc.CallOption) (*resourcepb.PutBlobResponse, error) {
|
||||||
return d.server.PutBlob(ctx, in)
|
return d.server.PutBlob(ctx, in)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Read implements ResourceClient.
|
// Read implements ResourceClient.
|
||||||
func (d *directResourceClient) Read(ctx context.Context, in *resourcepb.ReadRequest, opts ...grpc.CallOption) (*resourcepb.ReadResponse, error) {
|
func (d *DirectStorageClient) Read(ctx context.Context, in *resourcepb.ReadRequest, _ ...grpc.CallOption) (*resourcepb.ReadResponse, error) {
|
||||||
return d.server.Read(ctx, in)
|
return d.server.Read(ctx, in)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Search implements ResourceClient.
|
|
||||||
func (d *directResourceClient) Search(ctx context.Context, in *resourcepb.ResourceSearchRequest, opts ...grpc.CallOption) (*resourcepb.ResourceSearchResponse, error) {
|
|
||||||
return d.server.Search(ctx, in)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Update implements ResourceClient.
|
// Update implements ResourceClient.
|
||||||
func (d *directResourceClient) Update(ctx context.Context, in *resourcepb.UpdateRequest, opts ...grpc.CallOption) (*resourcepb.UpdateResponse, error) {
|
func (d *DirectStorageClient) Update(ctx context.Context, in *resourcepb.UpdateRequest, _ ...grpc.CallOption) (*resourcepb.UpdateResponse, error) {
|
||||||
return d.server.Update(ctx, in)
|
return d.server.Update(ctx, in)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Watch implements ResourceClient.
|
// Watch implements ResourceClient.
|
||||||
func (d *directResourceClient) Watch(ctx context.Context, in *resourcepb.WatchRequest, opts ...grpc.CallOption) (resourcepb.ResourceStore_WatchClient, error) {
|
func (d *DirectStorageClient) Watch(_ context.Context, _ *resourcepb.WatchRequest, _ ...grpc.CallOption) (resourcepb.ResourceStore_WatchClient, error) {
|
||||||
return nil, fmt.Errorf("watch not supported with direct resource client")
|
return nil, fmt.Errorf("watch not supported with direct resource client")
|
||||||
}
|
}
|
||||||
|
|
||||||
// BulkProcess implements resource.ResourceClient.
|
|
||||||
func (d *directResourceClient) BulkProcess(ctx context.Context, opts ...grpc.CallOption) (resourcepb.BulkStore_BulkProcessClient, error) {
|
|
||||||
return nil, fmt.Errorf("BulkProcess not supported with direct resource client")
|
|
||||||
}
|
|
||||||
|
|
||||||
// RebuildIndexes implements resource.ResourceClient.
|
|
||||||
func (b *directResourceClient) RebuildIndexes(ctx context.Context, req *resourcepb.RebuildIndexesRequest, opts ...grpc.CallOption) (*resourcepb.RebuildIndexesResponse, error) {
|
|
||||||
return nil, fmt.Errorf("not implemented")
|
|
||||||
}
|
|
||||||
|
|
||||||
func (b *directResourceClient) GetQuotaUsage(ctx context.Context, req *resourcepb.QuotaUsageRequest, opts ...grpc.CallOption) (*resourcepb.QuotaUsageResponse, error) {
|
|
||||||
return nil, fmt.Errorf("not implemented")
|
|
||||||
}
|
|
||||||
|
|||||||
@@ -11,6 +11,7 @@ import (
|
|||||||
"k8s.io/apiserver/pkg/registry/rest"
|
"k8s.io/apiserver/pkg/registry/rest"
|
||||||
|
|
||||||
"github.com/grafana/authlib/types"
|
"github.com/grafana/authlib/types"
|
||||||
|
|
||||||
"github.com/grafana/grafana/pkg/apimachinery/utils"
|
"github.com/grafana/grafana/pkg/apimachinery/utils"
|
||||||
grafanaregistry "github.com/grafana/grafana/pkg/apiserver/registry/generic"
|
grafanaregistry "github.com/grafana/grafana/pkg/apiserver/registry/generic"
|
||||||
grafanarest "github.com/grafana/grafana/pkg/apiserver/rest"
|
grafanarest "github.com/grafana/grafana/pkg/apiserver/rest"
|
||||||
@@ -40,7 +41,7 @@ func (s *DashboardStorage) NewStore(dash utils.ResourceInfo, scheme *runtime.Sch
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
client := legacy.NewDirectResourceClient(server) // same context
|
client := legacy.NewDirectStorageClient(server) // same context
|
||||||
optsGetter := apistore.NewRESTOptionsGetterForClient(client, nil,
|
optsGetter := apistore.NewRESTOptionsGetterForClient(client, nil,
|
||||||
defaultOpts.StorageConfig.Config, nil,
|
defaultOpts.StorageConfig.Config, nil,
|
||||||
)
|
)
|
||||||
|
|||||||
@@ -37,7 +37,6 @@ import (
|
|||||||
"github.com/grafana/grafana/pkg/services/folder"
|
"github.com/grafana/grafana/pkg/services/folder"
|
||||||
"github.com/grafana/grafana/pkg/setting"
|
"github.com/grafana/grafana/pkg/setting"
|
||||||
"github.com/grafana/grafana/pkg/storage/unified/apistore"
|
"github.com/grafana/grafana/pkg/storage/unified/apistore"
|
||||||
"github.com/grafana/grafana/pkg/storage/unified/resource"
|
|
||||||
"github.com/grafana/grafana/pkg/storage/unified/resourcepb"
|
"github.com/grafana/grafana/pkg/storage/unified/resourcepb"
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -74,7 +73,7 @@ func RegisterAPIService(cfg *setting.Cfg,
|
|||||||
acService accesscontrol.Service,
|
acService accesscontrol.Service,
|
||||||
accessClient authlib.AccessClient,
|
accessClient authlib.AccessClient,
|
||||||
registerer prometheus.Registerer,
|
registerer prometheus.Registerer,
|
||||||
unified resource.ResourceClient,
|
unified resourcepb.ResourceIndexClient,
|
||||||
zanzanaClient zanzana.Client,
|
zanzanaClient zanzana.Client,
|
||||||
) *FolderAPIBuilder {
|
) *FolderAPIBuilder {
|
||||||
builder := &FolderAPIBuilder{
|
builder := &FolderAPIBuilder{
|
||||||
@@ -93,7 +92,7 @@ func RegisterAPIService(cfg *setting.Cfg,
|
|||||||
return builder
|
return builder
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewAPIService(ac authlib.AccessClient, searcher resource.ResourceClient, features featuremgmt.FeatureToggles, zanzanaClient zanzana.Client, resourcePermissionsSvc *dynamic.NamespaceableResourceInterface) *FolderAPIBuilder {
|
func NewAPIService(ac authlib.AccessClient, searcher resourcepb.ResourceIndexClient, features featuremgmt.FeatureToggles, zanzanaClient zanzana.Client, resourcePermissionsSvc *dynamic.NamespaceableResourceInterface) *FolderAPIBuilder {
|
||||||
return &FolderAPIBuilder{
|
return &FolderAPIBuilder{
|
||||||
features: features,
|
features: features,
|
||||||
accessClient: ac,
|
accessClient: ac,
|
||||||
|
|||||||
@@ -742,7 +742,7 @@ func NewLocalStore(resourceInfo utils.ResourceInfo, scheme *runtime.Scheme, defa
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
client := resource.NewLocalResourceClient(server)
|
client := resource.NewLocalResourceClient(server, nil)
|
||||||
optsGetter := apistore.NewRESTOptionsGetterForClient(client, nil, defaultOpts.StorageConfig.Config, nil)
|
optsGetter := apistore.NewRESTOptionsGetterForClient(client, nil, defaultOpts.StorageConfig.Config, nil)
|
||||||
|
|
||||||
store, err := grafanaregistry.NewRegistryStore(scheme, resourceInfo, optsGetter)
|
store, err := grafanaregistry.NewRegistryStore(scheme, resourceInfo, optsGetter)
|
||||||
|
|||||||
@@ -205,6 +205,14 @@ func (s *ModuleServer) Run() error {
|
|||||||
return sql.ProvideUnifiedStorageGrpcService(s.cfg, s.features, nil, s.log, s.registerer, docBuilders, s.storageMetrics, s.indexMetrics, s.searchServerRing, s.MemberlistKVConfig, s.httpServerRouter, s.storageBackend)
|
return sql.ProvideUnifiedStorageGrpcService(s.cfg, s.features, nil, s.log, s.registerer, docBuilders, s.storageMetrics, s.indexMetrics, s.searchServerRing, s.MemberlistKVConfig, s.httpServerRouter, s.storageBackend)
|
||||||
})
|
})
|
||||||
|
|
||||||
|
m.RegisterModule(modules.SearchServer, func() (services.Service, error) {
|
||||||
|
docBuilders, err := InitializeDocumentBuilders(s.cfg)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return sql.ProvideUnifiedSearchGrpcService(s.cfg, s.features, nil, s.log, s.registerer, docBuilders, s.indexMetrics, s.searchServerRing, s.MemberlistKVConfig, s.storageBackend)
|
||||||
|
})
|
||||||
|
|
||||||
m.RegisterModule(modules.ZanzanaServer, func() (services.Service, error) {
|
m.RegisterModule(modules.ZanzanaServer, func() (services.Service, error) {
|
||||||
return authz.ProvideZanzanaService(s.cfg, s.features, s.registerer)
|
return authz.ProvideZanzanaService(s.cfg, s.features, s.registerer)
|
||||||
})
|
})
|
||||||
|
|||||||
@@ -372,7 +372,7 @@ func initModuleServerForTest(
|
|||||||
return testModuleServer{server: ms, grpcAddress: cfg.GRPCServer.Address, httpPort: cfg.HTTPPort, healthClient: healthClient, id: cfg.InstanceID}
|
return testModuleServer{server: ms, grpcAddress: cfg.GRPCServer.Address, httpPort: cfg.HTTPPort, healthClient: healthClient, id: cfg.InstanceID}
|
||||||
}
|
}
|
||||||
|
|
||||||
func createBaselineServer(t *testing.T, dbType, dbConnStr string, testNamespaces []string) resource.ResourceServer {
|
func createBaselineServer(t *testing.T, dbType, dbConnStr string, testNamespaces []string) resource.SearchServer {
|
||||||
cfg := setting.NewCfg()
|
cfg := setting.NewCfg()
|
||||||
section, err := cfg.Raw.NewSection("database")
|
section, err := cfg.Raw.NewSection("database")
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
@@ -391,17 +391,20 @@ func createBaselineServer(t *testing.T, dbType, dbConnStr string, testNamespaces
|
|||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
searchOpts, err := search.NewSearchOptions(features, cfg, docBuilders, nil, nil)
|
searchOpts, err := search.NewSearchOptions(features, cfg, docBuilders, nil, nil)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
server, err := sql.NewResourceServer(sql.ServerOptions{
|
searchServer, err := sql.NewSearchServer(sql.SearchServerOptions{
|
||||||
DB: nil,
|
DB: nil,
|
||||||
Cfg: cfg,
|
Cfg: cfg,
|
||||||
Tracer: tracer,
|
Tracer: tracer,
|
||||||
Reg: nil,
|
Reg: nil,
|
||||||
AccessClient: nil,
|
AccessClient: nil,
|
||||||
SearchOptions: searchOpts,
|
SearchOptions: searchOpts,
|
||||||
StorageMetrics: nil,
|
IndexMetrics: nil,
|
||||||
IndexMetrics: nil,
|
})
|
||||||
Features: features,
|
require.NoError(t, err)
|
||||||
QOSQueue: nil,
|
storageServer, err := sql.NewStorageServer(sql.StorageServerOptions{
|
||||||
|
Cfg: cfg,
|
||||||
|
Tracer: tracer,
|
||||||
|
Features: features,
|
||||||
})
|
})
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
@@ -417,12 +420,12 @@ func createBaselineServer(t *testing.T, dbType, dbConnStr string, testNamespaces
|
|||||||
|
|
||||||
for _, ns := range testNamespaces {
|
for _, ns := range testNamespaces {
|
||||||
for range rand.Intn(maxPlaylistPerNamespace) + 1 {
|
for range rand.Intn(maxPlaylistPerNamespace) + 1 {
|
||||||
_, err = server.Create(ctx, generatePlaylistPayload(ns))
|
_, err = storageServer.Create(ctx, generatePlaylistPayload(ns))
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return server
|
return searchServer
|
||||||
}
|
}
|
||||||
|
|
||||||
var counter int
|
var counter int
|
||||||
|
|||||||
23
pkg/server/wire_gen.go
generated
23
pkg/server/wire_gen.go
generated
@@ -513,6 +513,11 @@ func Initialize(ctx context.Context, cfg *setting.Cfg, opts Options, apiOpts api
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
storageMetrics := resource.ProvideStorageMetrics(registerer)
|
||||||
|
storageBackend, err := sql.ProvideStorageBackend(cfg, sqlStore, tracer, registerer, storageMetrics)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
options := &unified.Options{
|
options := &unified.Options{
|
||||||
Cfg: cfg,
|
Cfg: cfg,
|
||||||
Features: featureToggles,
|
Features: featureToggles,
|
||||||
@@ -522,8 +527,8 @@ func Initialize(ctx context.Context, cfg *setting.Cfg, opts Options, apiOpts api
|
|||||||
Authzc: accessClient,
|
Authzc: accessClient,
|
||||||
Docs: documentBuilderSupplier,
|
Docs: documentBuilderSupplier,
|
||||||
SecureValues: inlineSecureValueSupport,
|
SecureValues: inlineSecureValueSupport,
|
||||||
|
Backend: storageBackend,
|
||||||
}
|
}
|
||||||
storageMetrics := resource.ProvideStorageMetrics(registerer)
|
|
||||||
bleveIndexMetrics := resource.ProvideIndexMetrics(registerer)
|
bleveIndexMetrics := resource.ProvideIndexMetrics(registerer)
|
||||||
resourceClient, err := unified.ProvideUnifiedStorageClient(options, storageMetrics, bleveIndexMetrics)
|
resourceClient, err := unified.ProvideUnifiedStorageClient(options, storageMetrics, bleveIndexMetrics)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -1173,6 +1178,11 @@ func InitializeForTest(ctx context.Context, t sqlutil.ITestDB, testingT interfac
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
storageMetrics := resource.ProvideStorageMetrics(registerer)
|
||||||
|
storageBackend, err := sql.ProvideStorageBackend(cfg, sqlStore, tracer, registerer, storageMetrics)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
options := &unified.Options{
|
options := &unified.Options{
|
||||||
Cfg: cfg,
|
Cfg: cfg,
|
||||||
Features: featureToggles,
|
Features: featureToggles,
|
||||||
@@ -1182,8 +1192,8 @@ func InitializeForTest(ctx context.Context, t sqlutil.ITestDB, testingT interfac
|
|||||||
Authzc: accessClient,
|
Authzc: accessClient,
|
||||||
Docs: documentBuilderSupplier,
|
Docs: documentBuilderSupplier,
|
||||||
SecureValues: inlineSecureValueSupport,
|
SecureValues: inlineSecureValueSupport,
|
||||||
|
Backend: storageBackend,
|
||||||
}
|
}
|
||||||
storageMetrics := resource.ProvideStorageMetrics(registerer)
|
|
||||||
bleveIndexMetrics := resource.ProvideIndexMetrics(registerer)
|
bleveIndexMetrics := resource.ProvideIndexMetrics(registerer)
|
||||||
resourceClient, err := unified.ProvideUnifiedStorageClient(options, storageMetrics, bleveIndexMetrics)
|
resourceClient, err := unified.ProvideUnifiedStorageClient(options, storageMetrics, bleveIndexMetrics)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -1748,7 +1758,14 @@ func InitializeModuleServer(cfg *setting.Cfg, opts Options, apiOpts api.ServerOp
|
|||||||
hooksService := hooks.ProvideService()
|
hooksService := hooks.ProvideService()
|
||||||
ossLicensingService := licensing.ProvideService(cfg, hooksService)
|
ossLicensingService := licensing.ProvideService(cfg, hooksService)
|
||||||
moduleRegisterer := ProvideNoopModuleRegisterer()
|
moduleRegisterer := ProvideNoopModuleRegisterer()
|
||||||
storageBackend, err := sql.ProvideStorageBackend(cfg)
|
ossMigrations := migrations.ProvideOSSMigrations(featureToggles)
|
||||||
|
inProcBus := bus.ProvideBus(tracingService)
|
||||||
|
sqlStore, err := sqlstore.ProvideService(cfg, featureToggles, ossMigrations, inProcBus, tracingService)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
tracer := otelTracer()
|
||||||
|
storageBackend, err := sql.ProvideStorageBackend(cfg, sqlStore, tracer, registerer, storageMetrics)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -6,6 +6,9 @@ package server
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"github.com/google/wire"
|
"github.com/google/wire"
|
||||||
|
"github.com/grafana/grafana/pkg/bus"
|
||||||
|
"github.com/grafana/grafana/pkg/infra/db"
|
||||||
|
"github.com/grafana/grafana/pkg/services/sqlstore"
|
||||||
|
|
||||||
"github.com/grafana/grafana/pkg/configprovider"
|
"github.com/grafana/grafana/pkg/configprovider"
|
||||||
"github.com/grafana/grafana/pkg/infra/metrics"
|
"github.com/grafana/grafana/pkg/infra/metrics"
|
||||||
@@ -65,6 +68,7 @@ import (
|
|||||||
"github.com/grafana/grafana/pkg/storage/legacysql"
|
"github.com/grafana/grafana/pkg/storage/legacysql"
|
||||||
"github.com/grafana/grafana/pkg/storage/unified"
|
"github.com/grafana/grafana/pkg/storage/unified"
|
||||||
"github.com/grafana/grafana/pkg/storage/unified/resource"
|
"github.com/grafana/grafana/pkg/storage/unified/resource"
|
||||||
|
"github.com/grafana/grafana/pkg/storage/unified/resourcepb"
|
||||||
search2 "github.com/grafana/grafana/pkg/storage/unified/search"
|
search2 "github.com/grafana/grafana/pkg/storage/unified/search"
|
||||||
"github.com/grafana/grafana/pkg/storage/unified/search/builders"
|
"github.com/grafana/grafana/pkg/storage/unified/search/builders"
|
||||||
"github.com/grafana/grafana/pkg/storage/unified/sql"
|
"github.com/grafana/grafana/pkg/storage/unified/sql"
|
||||||
@@ -145,8 +149,10 @@ var wireExtsBasicSet = wire.NewSet(
|
|||||||
sandbox.ProvideService,
|
sandbox.ProvideService,
|
||||||
wire.Bind(new(sandbox.Sandbox), new(*sandbox.Service)),
|
wire.Bind(new(sandbox.Sandbox), new(*sandbox.Service)),
|
||||||
wire.Struct(new(unified.Options), "*"),
|
wire.Struct(new(unified.Options), "*"),
|
||||||
unified.ProvideUnifiedStorageClient,
|
|
||||||
sql.ProvideStorageBackend,
|
sql.ProvideStorageBackend,
|
||||||
|
unified.ProvideUnifiedStorageClient,
|
||||||
|
wire.Bind(new(resourcepb.ResourceIndexClient), new(resource.ResourceClient)),
|
||||||
|
wire.Bind(new(resource.MigratorClient), new(resource.ResourceClient)),
|
||||||
builder.ProvideDefaultBuildHandlerChainFuncFromBuilders,
|
builder.ProvideDefaultBuildHandlerChainFuncFromBuilders,
|
||||||
aggregatorrunner.ProvideNoopAggregatorConfigurator,
|
aggregatorrunner.ProvideNoopAggregatorConfigurator,
|
||||||
apisregistry.WireSetExts,
|
apisregistry.WireSetExts,
|
||||||
@@ -195,6 +201,16 @@ var wireExtsModuleServerSet = wire.NewSet(
|
|||||||
tracing.ProvideTracingConfig,
|
tracing.ProvideTracingConfig,
|
||||||
tracing.ProvideService,
|
tracing.ProvideService,
|
||||||
wire.Bind(new(tracing.Tracer), new(*tracing.TracingService)),
|
wire.Bind(new(tracing.Tracer), new(*tracing.TracingService)),
|
||||||
|
otelTracer,
|
||||||
|
// Bus
|
||||||
|
bus.ProvideBus,
|
||||||
|
wire.Bind(new(bus.Bus), new(*bus.InProcBus)),
|
||||||
|
// Database migrations
|
||||||
|
migrations.ProvideOSSMigrations,
|
||||||
|
wire.Bind(new(registry.DatabaseMigrator), new(*migrations.OSSMigrations)),
|
||||||
|
// Database
|
||||||
|
sqlstore.ProvideService,
|
||||||
|
wire.Bind(new(db.DB), new(*sqlstore.SQLStore)),
|
||||||
// Unified storage
|
// Unified storage
|
||||||
resource.ProvideStorageMetrics,
|
resource.ProvideStorageMetrics,
|
||||||
resource.ProvideIndexMetrics,
|
resource.ProvideIndexMetrics,
|
||||||
|
|||||||
@@ -26,7 +26,7 @@ var _ generic.RESTOptionsGetter = (*RESTOptionsGetter)(nil)
|
|||||||
type StorageOptionsRegister func(gr schema.GroupResource, opts StorageOptions)
|
type StorageOptionsRegister func(gr schema.GroupResource, opts StorageOptions)
|
||||||
|
|
||||||
type RESTOptionsGetter struct {
|
type RESTOptionsGetter struct {
|
||||||
client resource.ResourceClient
|
client resource.StorageClient
|
||||||
secrets secret.InlineSecureValueSupport
|
secrets secret.InlineSecureValueSupport
|
||||||
original storagebackend.Config
|
original storagebackend.Config
|
||||||
configProvider RestConfigProvider
|
configProvider RestConfigProvider
|
||||||
@@ -36,7 +36,7 @@ type RESTOptionsGetter struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func NewRESTOptionsGetterForClient(
|
func NewRESTOptionsGetterForClient(
|
||||||
client resource.ResourceClient,
|
client resource.StorageClient,
|
||||||
secrets secret.InlineSecureValueSupport,
|
secrets secret.InlineSecureValueSupport,
|
||||||
original storagebackend.Config,
|
original storagebackend.Config,
|
||||||
configProvider RestConfigProvider,
|
configProvider RestConfigProvider,
|
||||||
@@ -79,7 +79,7 @@ func NewRESTOptionsGetterMemory(originalStorageConfig storagebackend.Config, sec
|
|||||||
}
|
}
|
||||||
|
|
||||||
return NewRESTOptionsGetterForClient(
|
return NewRESTOptionsGetterForClient(
|
||||||
resource.NewLocalResourceClient(server),
|
resource.NewLocalResourceClient(server, nil),
|
||||||
secrets,
|
secrets,
|
||||||
originalStorageConfig,
|
originalStorageConfig,
|
||||||
nil,
|
nil,
|
||||||
@@ -118,7 +118,7 @@ func NewRESTOptionsGetterForFileXX(path string,
|
|||||||
}
|
}
|
||||||
|
|
||||||
return NewRESTOptionsGetterForClient(
|
return NewRESTOptionsGetterForClient(
|
||||||
resource.NewLocalResourceClient(server),
|
resource.NewLocalResourceClient(server, nil),
|
||||||
nil, // secrets
|
nil, // secrets
|
||||||
originalStorageConfig,
|
originalStorageConfig,
|
||||||
nil,
|
nil,
|
||||||
|
|||||||
@@ -88,7 +88,7 @@ type Storage struct {
|
|||||||
trigger storage.IndexerFuncs
|
trigger storage.IndexerFuncs
|
||||||
indexers *cache.Indexers
|
indexers *cache.Indexers
|
||||||
|
|
||||||
store resource.ResourceClient
|
store resource.StorageClient
|
||||||
getKey func(string) (*resourcepb.ResourceKey, error)
|
getKey func(string) (*resourcepb.ResourceKey, error)
|
||||||
snowflake *snowflake.Node // used to enforce internal ids
|
snowflake *snowflake.Node // used to enforce internal ids
|
||||||
configProvider RestConfigProvider // used for provisioning
|
configProvider RestConfigProvider // used for provisioning
|
||||||
@@ -112,7 +112,7 @@ type RestConfigProvider interface {
|
|||||||
// NewStorage instantiates a new Storage.
|
// NewStorage instantiates a new Storage.
|
||||||
func NewStorage(
|
func NewStorage(
|
||||||
config *storagebackend.ConfigForResource,
|
config *storagebackend.ConfigForResource,
|
||||||
store resource.ResourceClient,
|
store resource.StorageClient,
|
||||||
keyFunc func(obj runtime.Object) (string, error),
|
keyFunc func(obj runtime.Object) (string, error),
|
||||||
keyParser func(key string) (*resourcepb.ResourceKey, error),
|
keyParser func(key string) (*resourcepb.ResourceKey, error),
|
||||||
newFunc func() runtime.Object,
|
newFunc func() runtime.Object,
|
||||||
|
|||||||
@@ -156,7 +156,7 @@ func testSetup(t testing.TB, opts ...setupOption) (context.Context, storage.Inte
|
|||||||
default:
|
default:
|
||||||
t.Fatalf("unsupported storage type: %s", setupOpts.storageType)
|
t.Fatalf("unsupported storage type: %s", setupOpts.storageType)
|
||||||
}
|
}
|
||||||
client := resource.NewLocalResourceClient(server)
|
client := resource.NewLocalResourceClient(server, nil)
|
||||||
|
|
||||||
config := storagebackend.NewDefaultConfig(setupOpts.prefix, setupOpts.codec)
|
config := storagebackend.NewDefaultConfig(setupOpts.prefix, setupOpts.codec)
|
||||||
store, destroyFunc, err := apistore.NewStorage(
|
store, destroyFunc, err := apistore.NewStorage(
|
||||||
|
|||||||
@@ -21,6 +21,7 @@ import (
|
|||||||
"github.com/grafana/dskit/grpcclient"
|
"github.com/grafana/dskit/grpcclient"
|
||||||
"github.com/grafana/dskit/middleware"
|
"github.com/grafana/dskit/middleware"
|
||||||
"github.com/grafana/dskit/services"
|
"github.com/grafana/dskit/services"
|
||||||
|
|
||||||
grafanarest "github.com/grafana/grafana/pkg/apiserver/rest"
|
grafanarest "github.com/grafana/grafana/pkg/apiserver/rest"
|
||||||
infraDB "github.com/grafana/grafana/pkg/infra/db"
|
infraDB "github.com/grafana/grafana/pkg/infra/db"
|
||||||
"github.com/grafana/grafana/pkg/infra/tracing"
|
"github.com/grafana/grafana/pkg/infra/tracing"
|
||||||
@@ -45,6 +46,7 @@ type Options struct {
|
|||||||
Authzc types.AccessClient
|
Authzc types.AccessClient
|
||||||
Docs resource.DocumentBuilderSupplier
|
Docs resource.DocumentBuilderSupplier
|
||||||
SecureValues secrets.InlineSecureValueSupport
|
SecureValues secrets.InlineSecureValueSupport
|
||||||
|
Backend resource.StorageBackend // Shared backend to avoid duplicate metrics registration
|
||||||
}
|
}
|
||||||
|
|
||||||
type clientMetrics struct {
|
type clientMetrics struct {
|
||||||
@@ -66,7 +68,7 @@ func ProvideUnifiedStorageClient(opts *Options,
|
|||||||
BlobStoreURL: apiserverCfg.Key("blob_url").MustString(""),
|
BlobStoreURL: apiserverCfg.Key("blob_url").MustString(""),
|
||||||
BlobThresholdBytes: apiserverCfg.Key("blob_threshold_bytes").MustInt(options.BlobThresholdDefault),
|
BlobThresholdBytes: apiserverCfg.Key("blob_threshold_bytes").MustInt(options.BlobThresholdDefault),
|
||||||
GrpcClientKeepaliveTime: apiserverCfg.Key("grpc_client_keepalive_time").MustDuration(0),
|
GrpcClientKeepaliveTime: apiserverCfg.Key("grpc_client_keepalive_time").MustDuration(0),
|
||||||
}, opts.Cfg, opts.Features, opts.DB, opts.Tracer, opts.Reg, opts.Authzc, opts.Docs, storageMetrics, indexMetrics, opts.SecureValues)
|
}, opts.Cfg, opts.Features, opts.DB, opts.Tracer, opts.Reg, opts.Authzc, opts.Docs, storageMetrics, indexMetrics, opts.SecureValues, opts.Backend)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
// Decide whether to disable SQL fallback stats per resource in Mode 5.
|
// Decide whether to disable SQL fallback stats per resource in Mode 5.
|
||||||
// Otherwise we would still try to query the legacy SQL database in Mode 5.
|
// Otherwise we would still try to query the legacy SQL database in Mode 5.
|
||||||
@@ -102,6 +104,7 @@ func newClient(opts options.StorageOptions,
|
|||||||
storageMetrics *resource.StorageMetrics,
|
storageMetrics *resource.StorageMetrics,
|
||||||
indexMetrics *resource.BleveIndexMetrics,
|
indexMetrics *resource.BleveIndexMetrics,
|
||||||
secure secrets.InlineSecureValueSupport,
|
secure secrets.InlineSecureValueSupport,
|
||||||
|
backend resource.StorageBackend,
|
||||||
) (resource.ResourceClient, error) {
|
) (resource.ResourceClient, error) {
|
||||||
ctx := context.Background()
|
ctx := context.Background()
|
||||||
|
|
||||||
@@ -135,7 +138,7 @@ func newClient(opts options.StorageOptions,
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
return resource.NewLocalResourceClient(server), nil
|
return resource.NewLocalResourceClient(server, nil), nil
|
||||||
|
|
||||||
case options.StorageTypeUnifiedGrpc:
|
case options.StorageTypeUnifiedGrpc:
|
||||||
if opts.Address == "" {
|
if opts.Address == "" {
|
||||||
@@ -168,24 +171,14 @@ func newClient(opts options.StorageOptions,
|
|||||||
return resource.NewResourceClient(conn, indexConn, cfg, features, tracer)
|
return resource.NewResourceClient(conn, indexConn, cfg, features, tracer)
|
||||||
|
|
||||||
default:
|
default:
|
||||||
|
// Create search options for the search server
|
||||||
searchOptions, err := search.NewSearchOptions(features, cfg, docs, indexMetrics, nil)
|
searchOptions, err := search.NewSearchOptions(features, cfg, docs, indexMetrics, nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
serverOptions := sql.ServerOptions{
|
// Setup QOS queue if enabled
|
||||||
DB: db,
|
var qosQueue sql.QOSEnqueueDequeuer
|
||||||
Cfg: cfg,
|
|
||||||
Tracer: tracer,
|
|
||||||
Reg: reg,
|
|
||||||
AccessClient: authzc,
|
|
||||||
SearchOptions: searchOptions,
|
|
||||||
StorageMetrics: storageMetrics,
|
|
||||||
IndexMetrics: indexMetrics,
|
|
||||||
Features: features,
|
|
||||||
SecureValues: secure,
|
|
||||||
}
|
|
||||||
|
|
||||||
if cfg.QOSEnabled {
|
if cfg.QOSEnabled {
|
||||||
qosReg := prometheus.WrapRegistererWithPrefix("resource_server_qos_", reg)
|
qosReg := prometheus.WrapRegistererWithPrefix("resource_server_qos_", reg)
|
||||||
queue := scheduler.NewQueue(&scheduler.QueueOptions{
|
queue := scheduler.NewQueue(&scheduler.QueueOptions{
|
||||||
@@ -196,7 +189,7 @@ func newClient(opts options.StorageOptions,
|
|||||||
if err := services.StartAndAwaitRunning(ctx, queue); err != nil {
|
if err := services.StartAndAwaitRunning(ctx, queue); err != nil {
|
||||||
return nil, fmt.Errorf("failed to start queue: %w", err)
|
return nil, fmt.Errorf("failed to start queue: %w", err)
|
||||||
}
|
}
|
||||||
scheduler, err := scheduler.NewScheduler(queue, &scheduler.Config{
|
sched, err := scheduler.NewScheduler(queue, &scheduler.Config{
|
||||||
NumWorkers: cfg.QOSNumberWorker,
|
NumWorkers: cfg.QOSNumberWorker,
|
||||||
Logger: cfg.Logger,
|
Logger: cfg.Logger,
|
||||||
})
|
})
|
||||||
@@ -204,31 +197,59 @@ func newClient(opts options.StorageOptions,
|
|||||||
return nil, fmt.Errorf("failed to create scheduler: %w", err)
|
return nil, fmt.Errorf("failed to create scheduler: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
err = services.StartAndAwaitRunning(ctx, scheduler)
|
err = services.StartAndAwaitRunning(ctx, sched)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("failed to start scheduler: %w", err)
|
return nil, fmt.Errorf("failed to start scheduler: %w", err)
|
||||||
}
|
}
|
||||||
serverOptions.QOSQueue = queue
|
qosQueue = queue
|
||||||
}
|
}
|
||||||
|
|
||||||
// only enable if an overrides file path is provided
|
// Setup overrides service if enabled
|
||||||
|
var overridesSvc *resource.OverridesService
|
||||||
if cfg.OverridesFilePath != "" {
|
if cfg.OverridesFilePath != "" {
|
||||||
overridesSvc, err := resource.NewOverridesService(ctx, cfg.Logger, reg, tracer, resource.ReloadOptions{
|
overridesSvc, err = resource.NewOverridesService(ctx, cfg.Logger, reg, tracer, resource.ReloadOptions{
|
||||||
FilePath: cfg.OverridesFilePath,
|
FilePath: cfg.OverridesFilePath,
|
||||||
ReloadPeriod: cfg.OverridesReloadInterval,
|
ReloadPeriod: cfg.OverridesReloadInterval,
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
serverOptions.OverridesService = overridesSvc
|
|
||||||
}
|
}
|
||||||
|
|
||||||
server, err := sql.NewResourceServer(serverOptions)
|
// Create the search server with shared backend
|
||||||
|
searchServer, err := sql.NewSearchServer(sql.SearchServerOptions{
|
||||||
|
Backend: backend, // Use shared backend to avoid duplicate metrics registration
|
||||||
|
DB: db,
|
||||||
|
Cfg: cfg,
|
||||||
|
Tracer: tracer,
|
||||||
|
Reg: reg,
|
||||||
|
AccessClient: authzc,
|
||||||
|
SearchOptions: searchOptions,
|
||||||
|
IndexMetrics: indexMetrics,
|
||||||
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
return resource.NewLocalResourceClient(server), nil
|
|
||||||
|
// Create the storage server with shared backend
|
||||||
|
storageServer, err := sql.NewStorageServer(sql.StorageServerOptions{
|
||||||
|
Backend: backend, // Use shared backend to avoid duplicate metrics registration
|
||||||
|
DB: db,
|
||||||
|
Cfg: cfg,
|
||||||
|
Tracer: tracer,
|
||||||
|
Reg: reg,
|
||||||
|
AccessClient: authzc,
|
||||||
|
StorageMetrics: storageMetrics,
|
||||||
|
Features: features,
|
||||||
|
QOSQueue: qosQueue,
|
||||||
|
SecureValues: secure,
|
||||||
|
OverridesService: overridesSvc,
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return resource.NewLocalResourceClient(storageServer, searchServer), nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -48,7 +48,7 @@ func buildCollectionSettings(opts legacy.MigrateOptions) resource.BulkSettings {
|
|||||||
}
|
}
|
||||||
|
|
||||||
type resourceClientStreamProvider struct {
|
type resourceClientStreamProvider struct {
|
||||||
client resource.ResourceClient
|
client resource.MigratorClient
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *resourceClientStreamProvider) createStream(ctx context.Context, opts legacy.MigrateOptions) (resourcepb.BulkStore_BulkProcessClient, error) {
|
func (r *resourceClientStreamProvider) createStream(ctx context.Context, opts legacy.MigrateOptions) (resourcepb.BulkStore_BulkProcessClient, error) {
|
||||||
@@ -71,7 +71,7 @@ func (b *bulkStoreClientStreamProvider) createStream(ctx context.Context, opts l
|
|||||||
// This can migrate Folders, Dashboards and LibraryPanels
|
// This can migrate Folders, Dashboards and LibraryPanels
|
||||||
func ProvideUnifiedMigrator(
|
func ProvideUnifiedMigrator(
|
||||||
dashboardAccess legacy.MigrationDashboardAccessor,
|
dashboardAccess legacy.MigrationDashboardAccessor,
|
||||||
client resource.ResourceClient,
|
client resource.MigratorClient,
|
||||||
) UnifiedMigrator {
|
) UnifiedMigrator {
|
||||||
return newUnifiedMigrator(
|
return newUnifiedMigrator(
|
||||||
dashboardAccess,
|
dashboardAccess,
|
||||||
|
|||||||
@@ -31,13 +31,31 @@ import (
|
|||||||
"github.com/grafana/grafana/pkg/storage/unified/resourcepb"
|
"github.com/grafana/grafana/pkg/storage/unified/resourcepb"
|
||||||
)
|
)
|
||||||
|
|
||||||
//go:generate mockery --name ResourceClient --structname MockResourceClient --inpackage --filename client_mock.go --with-expecter
|
// SearchClient is used to interact with unified search
|
||||||
type ResourceClient interface {
|
type SearchClient interface {
|
||||||
resourcepb.ResourceStoreClient
|
|
||||||
resourcepb.ResourceIndexClient
|
resourcepb.ResourceIndexClient
|
||||||
resourcepb.ManagedObjectIndexClient
|
resourcepb.ManagedObjectIndexClient
|
||||||
resourcepb.BulkStoreClient
|
}
|
||||||
|
|
||||||
|
// StorageClient is used to interact with unified storage
|
||||||
|
type StorageClient interface {
|
||||||
|
resourcepb.ResourceStoreClient
|
||||||
resourcepb.BlobStoreClient
|
resourcepb.BlobStoreClient
|
||||||
|
}
|
||||||
|
|
||||||
|
// MigratorClient is used to perform migrations to unified storage
|
||||||
|
type MigratorClient interface {
|
||||||
|
resourcepb.BulkStoreClient
|
||||||
|
GetStats(ctx context.Context, in *resourcepb.ResourceStatsRequest, opts ...grpc.CallOption) (*resourcepb.ResourceStatsResponse, error)
|
||||||
|
}
|
||||||
|
|
||||||
|
// ResourceClient combines all resource-related clients and should be avoided in favor of more specific interfaces when possible
|
||||||
|
//
|
||||||
|
//go:generate mockery --name ResourceClient --structname MockResourceClient --inpackage --filename client_mock.go --with-expecter
|
||||||
|
type ResourceClient interface {
|
||||||
|
StorageClient
|
||||||
|
SearchClient
|
||||||
|
MigratorClient
|
||||||
resourcepb.DiagnosticsClient
|
resourcepb.DiagnosticsClient
|
||||||
resourcepb.QuotasClient
|
resourcepb.QuotasClient
|
||||||
}
|
}
|
||||||
@@ -92,16 +110,15 @@ func NewLegacyResourceClient(channel grpc.ClientConnInterface, indexChannel grpc
|
|||||||
return newResourceClient(cc, cci)
|
return newResourceClient(cc, cci)
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewLocalResourceClient(server ResourceServer) ResourceClient {
|
func NewLocalResourceClient(server ResourceServer, searchServer SearchServer) ResourceClient {
|
||||||
// scenario: local in-proc
|
// scenario: local in-proc
|
||||||
channel := &inprocgrpc.Channel{}
|
channel := &inprocgrpc.Channel{}
|
||||||
|
indexChannel := &inprocgrpc.Channel{}
|
||||||
tracer := otel.Tracer("github.com/grafana/grafana/pkg/storage/unified/resource")
|
tracer := otel.Tracer("github.com/grafana/grafana/pkg/storage/unified/resource")
|
||||||
|
|
||||||
grpcAuthInt := grpcutils.NewUnsafeAuthenticator(tracer)
|
grpcAuthInt := grpcutils.NewUnsafeAuthenticator(tracer)
|
||||||
for _, desc := range []*grpc.ServiceDesc{
|
for _, desc := range []*grpc.ServiceDesc{
|
||||||
&resourcepb.ResourceStore_ServiceDesc,
|
&resourcepb.ResourceStore_ServiceDesc,
|
||||||
&resourcepb.ResourceIndex_ServiceDesc,
|
|
||||||
&resourcepb.ManagedObjectIndex_ServiceDesc,
|
|
||||||
&resourcepb.BlobStore_ServiceDesc,
|
&resourcepb.BlobStore_ServiceDesc,
|
||||||
&resourcepb.BulkStore_ServiceDesc,
|
&resourcepb.BulkStore_ServiceDesc,
|
||||||
&resourcepb.Diagnostics_ServiceDesc,
|
&resourcepb.Diagnostics_ServiceDesc,
|
||||||
@@ -117,13 +134,31 @@ func NewLocalResourceClient(server ResourceServer) ResourceClient {
|
|||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Register search services on the index channel if searchServer is provided
|
||||||
|
if searchServer != nil {
|
||||||
|
for _, desc := range []*grpc.ServiceDesc{
|
||||||
|
&resourcepb.ResourceIndex_ServiceDesc,
|
||||||
|
&resourcepb.ManagedObjectIndex_ServiceDesc,
|
||||||
|
} {
|
||||||
|
indexChannel.RegisterService(
|
||||||
|
grpchan.InterceptServer(
|
||||||
|
desc,
|
||||||
|
grpcAuth.UnaryServerInterceptor(grpcAuthInt),
|
||||||
|
grpcAuth.StreamServerInterceptor(grpcAuthInt),
|
||||||
|
),
|
||||||
|
searchServer,
|
||||||
|
)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
clientInt := authnlib.NewGrpcClientInterceptor(
|
clientInt := authnlib.NewGrpcClientInterceptor(
|
||||||
ProvideInProcExchanger(),
|
ProvideInProcExchanger(),
|
||||||
authnlib.WithClientInterceptorIDTokenExtractor(idTokenExtractor),
|
authnlib.WithClientInterceptorIDTokenExtractor(idTokenExtractor),
|
||||||
)
|
)
|
||||||
|
|
||||||
cc := grpchan.InterceptClientConn(channel, clientInt.UnaryClientInterceptor, clientInt.StreamClientInterceptor)
|
cc := grpchan.InterceptClientConn(channel, clientInt.UnaryClientInterceptor, clientInt.StreamClientInterceptor)
|
||||||
return newResourceClient(cc, cc)
|
cci := grpchan.InterceptClientConn(indexChannel, clientInt.UnaryClientInterceptor, clientInt.StreamClientInterceptor)
|
||||||
|
return newResourceClient(cc, cci)
|
||||||
}
|
}
|
||||||
|
|
||||||
type RemoteResourceClientConfig struct {
|
type RemoteResourceClientConfig struct {
|
||||||
|
|||||||
@@ -127,7 +127,10 @@ type SearchBackend interface {
|
|||||||
GetOpenIndexes() []NamespacedResource
|
GetOpenIndexes() []NamespacedResource
|
||||||
}
|
}
|
||||||
|
|
||||||
// This supports indexing+search regardless of implementation
|
var _ SearchServer = &searchSupport{}
|
||||||
|
|
||||||
|
// This supports indexing+search regardless of implementation.
|
||||||
|
// Implements SearchServer interface.
|
||||||
type searchSupport struct {
|
type searchSupport struct {
|
||||||
log log.Logger
|
log log.Logger
|
||||||
storage StorageBackend
|
storage StorageBackend
|
||||||
@@ -160,6 +163,10 @@ var (
|
|||||||
_ resourcepb.ManagedObjectIndexServer = (*searchSupport)(nil)
|
_ resourcepb.ManagedObjectIndexServer = (*searchSupport)(nil)
|
||||||
)
|
)
|
||||||
|
|
||||||
|
func NewSearchServer(opts SearchOptions, storage StorageBackend, access types.AccessClient, blob BlobSupport, indexMetrics *BleveIndexMetrics, ownsIndexFn func(key NamespacedResource) (bool, error)) (SearchServer, error) {
|
||||||
|
return newSearchSupport(opts, storage, access, blob, indexMetrics, ownsIndexFn)
|
||||||
|
}
|
||||||
|
|
||||||
func newSearchSupport(opts SearchOptions, storage StorageBackend, access types.AccessClient, blob BlobSupport, indexMetrics *BleveIndexMetrics, ownsIndexFn func(key NamespacedResource) (bool, error)) (support *searchSupport, err error) {
|
func newSearchSupport(opts SearchOptions, storage StorageBackend, access types.AccessClient, blob BlobSupport, indexMetrics *BleveIndexMetrics, ownsIndexFn func(key NamespacedResource) (bool, error)) (support *searchSupport, err error) {
|
||||||
// No backend search support
|
// No backend search support
|
||||||
if opts.Backend == nil {
|
if opts.Backend == nil {
|
||||||
@@ -598,6 +605,22 @@ func (s *searchSupport) buildIndexes(ctx context.Context) (int, error) {
|
|||||||
return totalBatchesIndexed, nil
|
return totalBatchesIndexed, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *searchSupport) Init(ctx context.Context) error {
|
||||||
|
return s.init(ctx)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *searchSupport) Stop(_ context.Context) error {
|
||||||
|
s.stop()
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// IsHealthy implements resourcepb.DiagnosticsServer
|
||||||
|
func (s *searchSupport) IsHealthy(ctx context.Context, req *resourcepb.HealthCheckRequest) (*resourcepb.HealthCheckResponse, error) {
|
||||||
|
return &resourcepb.HealthCheckResponse{
|
||||||
|
Status: resourcepb.HealthCheckResponse_SERVING,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
func (s *searchSupport) init(ctx context.Context) error {
|
func (s *searchSupport) init(ctx context.Context) error {
|
||||||
origCtx := ctx
|
origCtx := ctx
|
||||||
|
|
||||||
|
|||||||
@@ -60,7 +60,7 @@ func ProvideSearchDistributorServer(cfg *setting.Cfg, features featuremgmt.Featu
|
|||||||
}
|
}
|
||||||
|
|
||||||
type RingClient struct {
|
type RingClient struct {
|
||||||
Client ResourceClient
|
Client SearchClient
|
||||||
grpc_health_v1.HealthClient
|
grpc_health_v1.HealthClient
|
||||||
Conn *grpc.ClientConn
|
Conn *grpc.ClientConn
|
||||||
}
|
}
|
||||||
@@ -99,7 +99,7 @@ var (
|
|||||||
func (ds *distributorServer) Search(ctx context.Context, r *resourcepb.ResourceSearchRequest) (*resourcepb.ResourceSearchResponse, error) {
|
func (ds *distributorServer) Search(ctx context.Context, r *resourcepb.ResourceSearchRequest) (*resourcepb.ResourceSearchResponse, error) {
|
||||||
ctx, span := ds.tracing.Start(ctx, "distributor.Search")
|
ctx, span := ds.tracing.Start(ctx, "distributor.Search")
|
||||||
defer span.End()
|
defer span.End()
|
||||||
ctx, client, err := ds.getClientToDistributeRequest(ctx, r.Options.Key.Namespace, "Search")
|
ctx, client, err := ds.getClientToDistributeRequest(ctx, r.Options.Key.Namespace)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@@ -110,7 +110,7 @@ func (ds *distributorServer) Search(ctx context.Context, r *resourcepb.ResourceS
|
|||||||
func (ds *distributorServer) GetStats(ctx context.Context, r *resourcepb.ResourceStatsRequest) (*resourcepb.ResourceStatsResponse, error) {
|
func (ds *distributorServer) GetStats(ctx context.Context, r *resourcepb.ResourceStatsRequest) (*resourcepb.ResourceStatsResponse, error) {
|
||||||
ctx, span := ds.tracing.Start(ctx, "distributor.GetStats")
|
ctx, span := ds.tracing.Start(ctx, "distributor.GetStats")
|
||||||
defer span.End()
|
defer span.End()
|
||||||
ctx, client, err := ds.getClientToDistributeRequest(ctx, r.Namespace, "GetStats")
|
ctx, client, err := ds.getClientToDistributeRequest(ctx, r.Namespace)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@@ -215,7 +215,7 @@ func (ds *distributorServer) RebuildIndexes(ctx context.Context, r *resourcepb.R
|
|||||||
func (ds *distributorServer) CountManagedObjects(ctx context.Context, r *resourcepb.CountManagedObjectsRequest) (*resourcepb.CountManagedObjectsResponse, error) {
|
func (ds *distributorServer) CountManagedObjects(ctx context.Context, r *resourcepb.CountManagedObjectsRequest) (*resourcepb.CountManagedObjectsResponse, error) {
|
||||||
ctx, span := ds.tracing.Start(ctx, "distributor.CountManagedObjects")
|
ctx, span := ds.tracing.Start(ctx, "distributor.CountManagedObjects")
|
||||||
defer span.End()
|
defer span.End()
|
||||||
ctx, client, err := ds.getClientToDistributeRequest(ctx, r.Namespace, "CountManagedObjects")
|
ctx, client, err := ds.getClientToDistributeRequest(ctx, r.Namespace)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@@ -226,7 +226,7 @@ func (ds *distributorServer) CountManagedObjects(ctx context.Context, r *resourc
|
|||||||
func (ds *distributorServer) ListManagedObjects(ctx context.Context, r *resourcepb.ListManagedObjectsRequest) (*resourcepb.ListManagedObjectsResponse, error) {
|
func (ds *distributorServer) ListManagedObjects(ctx context.Context, r *resourcepb.ListManagedObjectsRequest) (*resourcepb.ListManagedObjectsResponse, error) {
|
||||||
ctx, span := ds.tracing.Start(ctx, "distributor.ListManagedObjects")
|
ctx, span := ds.tracing.Start(ctx, "distributor.ListManagedObjects")
|
||||||
defer span.End()
|
defer span.End()
|
||||||
ctx, client, err := ds.getClientToDistributeRequest(ctx, r.Namespace, "ListManagedObjects")
|
ctx, client, err := ds.getClientToDistributeRequest(ctx, r.Namespace)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@@ -234,7 +234,7 @@ func (ds *distributorServer) ListManagedObjects(ctx context.Context, r *resource
|
|||||||
return client.ListManagedObjects(ctx, r)
|
return client.ListManagedObjects(ctx, r)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ds *distributorServer) getClientToDistributeRequest(ctx context.Context, namespace string, methodName string) (context.Context, ResourceClient, error) {
|
func (ds *distributorServer) getClientToDistributeRequest(ctx context.Context, namespace string) (context.Context, SearchClient, error) {
|
||||||
ringHasher := fnv.New32a()
|
ringHasher := fnv.New32a()
|
||||||
_, err := ringHasher.Write([]byte(namespace))
|
_, err := ringHasher.Write([]byte(namespace))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|||||||
@@ -34,12 +34,18 @@ import (
|
|||||||
|
|
||||||
var tracer = otel.Tracer("github.com/grafana/grafana/pkg/storage/unified/resource")
|
var tracer = otel.Tracer("github.com/grafana/grafana/pkg/storage/unified/resource")
|
||||||
|
|
||||||
|
type SearchServer interface {
|
||||||
|
LifecycleHooks
|
||||||
|
|
||||||
|
resourcepb.ResourceIndexServer
|
||||||
|
resourcepb.ManagedObjectIndexServer
|
||||||
|
resourcepb.DiagnosticsServer
|
||||||
|
}
|
||||||
|
|
||||||
// ResourceServer implements all gRPC services
|
// ResourceServer implements all gRPC services
|
||||||
type ResourceServer interface {
|
type ResourceServer interface {
|
||||||
resourcepb.ResourceStoreServer
|
resourcepb.ResourceStoreServer
|
||||||
resourcepb.BulkStoreServer
|
resourcepb.BulkStoreServer
|
||||||
resourcepb.ResourceIndexServer
|
|
||||||
resourcepb.ManagedObjectIndexServer
|
|
||||||
resourcepb.BlobStoreServer
|
resourcepb.BlobStoreServer
|
||||||
resourcepb.DiagnosticsServer
|
resourcepb.DiagnosticsServer
|
||||||
resourcepb.QuotasServer
|
resourcepb.QuotasServer
|
||||||
@@ -221,9 +227,6 @@ type ResourceServerOptions struct {
|
|||||||
// The blob configuration
|
// The blob configuration
|
||||||
Blob BlobConfig
|
Blob BlobConfig
|
||||||
|
|
||||||
// Search options
|
|
||||||
Search SearchOptions
|
|
||||||
|
|
||||||
// Quota service
|
// Quota service
|
||||||
OverridesService *OverridesService
|
OverridesService *OverridesService
|
||||||
|
|
||||||
@@ -251,16 +254,15 @@ type ResourceServerOptions struct {
|
|||||||
|
|
||||||
storageMetrics *StorageMetrics
|
storageMetrics *StorageMetrics
|
||||||
|
|
||||||
IndexMetrics *BleveIndexMetrics
|
|
||||||
|
|
||||||
// MaxPageSizeBytes is the maximum size of a page in bytes.
|
// MaxPageSizeBytes is the maximum size of a page in bytes.
|
||||||
MaxPageSizeBytes int
|
MaxPageSizeBytes int
|
||||||
|
// IndexMinUpdateInterval is the time to wait after a successful write operation to ensure read-after-write consistency in search.
|
||||||
|
// This config is shared with search
|
||||||
|
IndexMinUpdateInterval time.Duration
|
||||||
|
|
||||||
// QOSQueue is the quality of service queue used to enqueue
|
// QOSQueue is the quality of service queue used to enqueue
|
||||||
QOSQueue QOSEnqueuer
|
QOSQueue QOSEnqueuer
|
||||||
QOSConfig QueueConfig
|
QOSConfig QueueConfig
|
||||||
|
|
||||||
OwnsIndexFn func(key NamespacedResource) (bool, error)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewResourceServer(opts ResourceServerOptions) (*server, error) {
|
func NewResourceServer(opts ResourceServerOptions) (*server, error) {
|
||||||
@@ -343,23 +345,24 @@ func NewResourceServer(opts ResourceServerOptions) (*server, error) {
|
|||||||
ctx: ctx,
|
ctx: ctx,
|
||||||
cancel: cancel,
|
cancel: cancel,
|
||||||
storageMetrics: opts.storageMetrics,
|
storageMetrics: opts.storageMetrics,
|
||||||
indexMetrics: opts.IndexMetrics,
|
|
||||||
maxPageSizeBytes: opts.MaxPageSizeBytes,
|
maxPageSizeBytes: opts.MaxPageSizeBytes,
|
||||||
reg: opts.Reg,
|
reg: opts.Reg,
|
||||||
queue: opts.QOSQueue,
|
queue: opts.QOSQueue,
|
||||||
queueConfig: opts.QOSConfig,
|
queueConfig: opts.QOSConfig,
|
||||||
overridesService: opts.OverridesService,
|
overridesService: opts.OverridesService,
|
||||||
|
|
||||||
artificialSuccessfulWriteDelay: opts.Search.IndexMinUpdateInterval,
|
artificialSuccessfulWriteDelay: opts.IndexMinUpdateInterval,
|
||||||
}
|
}
|
||||||
|
|
||||||
if opts.Search.Resources != nil {
|
/*
|
||||||
var err error
|
if opts.Search.Resources != nil {
|
||||||
s.search, err = newSearchSupport(opts.Search, s.backend, s.access, s.blob, opts.IndexMetrics, opts.OwnsIndexFn)
|
var err error
|
||||||
if err != nil {
|
s.search, err = newSearchSupport(opts.Search, s.backend, s.access, s.blob, opts.IndexMetrics, opts.OwnsIndexFn)
|
||||||
return nil, err
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
*/
|
||||||
|
|
||||||
err := s.Init(ctx)
|
err := s.Init(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -377,7 +380,6 @@ type server struct {
|
|||||||
backend StorageBackend
|
backend StorageBackend
|
||||||
blob BlobSupport
|
blob BlobSupport
|
||||||
secure secrets.InlineSecureValueSupport
|
secure secrets.InlineSecureValueSupport
|
||||||
search *searchSupport
|
|
||||||
diagnostics resourcepb.DiagnosticsServer
|
diagnostics resourcepb.DiagnosticsServer
|
||||||
access claims.AccessClient
|
access claims.AccessClient
|
||||||
writeHooks WriteAccessHooks
|
writeHooks WriteAccessHooks
|
||||||
@@ -424,11 +426,6 @@ func (s *server) Init(ctx context.Context) error {
|
|||||||
s.initErr = s.overridesService.init(ctx)
|
s.initErr = s.overridesService.init(ctx)
|
||||||
}
|
}
|
||||||
|
|
||||||
// initialize the search index
|
|
||||||
if s.initErr == nil && s.search != nil {
|
|
||||||
s.initErr = s.search.init(ctx)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Start watching for changes
|
// Start watching for changes
|
||||||
if s.initErr == nil {
|
if s.initErr == nil {
|
||||||
s.initErr = s.initWatcher()
|
s.initErr = s.initWatcher()
|
||||||
@@ -453,10 +450,6 @@ func (s *server) Stop(ctx context.Context) error {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if s.search != nil {
|
|
||||||
s.search.stop()
|
|
||||||
}
|
|
||||||
|
|
||||||
if s.overridesService != nil {
|
if s.overridesService != nil {
|
||||||
if err := s.overridesService.stop(ctx); err != nil {
|
if err := s.overridesService.stop(ctx); err != nil {
|
||||||
stopFailed = true
|
stopFailed = true
|
||||||
@@ -1372,47 +1365,6 @@ func (s *server) Watch(req *resourcepb.WatchRequest, srv resourcepb.ResourceStor
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *server) Search(ctx context.Context, req *resourcepb.ResourceSearchRequest) (*resourcepb.ResourceSearchResponse, error) {
|
|
||||||
if s.search == nil {
|
|
||||||
return nil, fmt.Errorf("search index not configured")
|
|
||||||
}
|
|
||||||
|
|
||||||
return s.search.Search(ctx, req)
|
|
||||||
}
|
|
||||||
|
|
||||||
// GetStats implements ResourceServer.
|
|
||||||
func (s *server) GetStats(ctx context.Context, req *resourcepb.ResourceStatsRequest) (*resourcepb.ResourceStatsResponse, error) {
|
|
||||||
if err := s.Init(ctx); err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
if s.search == nil {
|
|
||||||
// If the backend implements "GetStats", we can use it
|
|
||||||
srv, ok := s.backend.(resourcepb.ResourceIndexServer)
|
|
||||||
if ok {
|
|
||||||
return srv.GetStats(ctx, req)
|
|
||||||
}
|
|
||||||
return nil, fmt.Errorf("search index not configured")
|
|
||||||
}
|
|
||||||
return s.search.GetStats(ctx, req)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *server) ListManagedObjects(ctx context.Context, req *resourcepb.ListManagedObjectsRequest) (*resourcepb.ListManagedObjectsResponse, error) {
|
|
||||||
if s.search == nil {
|
|
||||||
return nil, fmt.Errorf("search index not configured")
|
|
||||||
}
|
|
||||||
|
|
||||||
return s.search.ListManagedObjects(ctx, req)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *server) CountManagedObjects(ctx context.Context, req *resourcepb.CountManagedObjectsRequest) (*resourcepb.CountManagedObjectsResponse, error) {
|
|
||||||
if s.search == nil {
|
|
||||||
return nil, fmt.Errorf("search index not configured")
|
|
||||||
}
|
|
||||||
|
|
||||||
return s.search.CountManagedObjects(ctx, req)
|
|
||||||
}
|
|
||||||
|
|
||||||
// IsHealthy implements ResourceServer.
|
// IsHealthy implements ResourceServer.
|
||||||
func (s *server) IsHealthy(ctx context.Context, req *resourcepb.HealthCheckRequest) (*resourcepb.HealthCheckResponse, error) {
|
func (s *server) IsHealthy(ctx context.Context, req *resourcepb.HealthCheckRequest) (*resourcepb.HealthCheckResponse, error) {
|
||||||
return s.diagnostics.IsHealthy(ctx, req)
|
return s.diagnostics.IsHealthy(ctx, req)
|
||||||
@@ -1568,14 +1520,6 @@ func (s *server) runInQueue(ctx context.Context, tenantID string, runnable func(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *server) RebuildIndexes(ctx context.Context, req *resourcepb.RebuildIndexesRequest) (*resourcepb.RebuildIndexesResponse, error) {
|
|
||||||
if s.search == nil {
|
|
||||||
return nil, fmt.Errorf("search index not configured")
|
|
||||||
}
|
|
||||||
|
|
||||||
return s.search.RebuildIndexes(ctx, req)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *server) checkQuota(ctx context.Context, nsr NamespacedResource) {
|
func (s *server) checkQuota(ctx context.Context, nsr NamespacedResource) {
|
||||||
span := trace.SpanFromContext(ctx)
|
span := trace.SpanFromContext(ctx)
|
||||||
span.AddEvent("checkQuota", trace.WithAttributes(
|
span.AddEvent("checkQuota", trace.WithAttributes(
|
||||||
|
|||||||
@@ -11,6 +11,9 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/go-sql-driver/mysql"
|
"github.com/go-sql-driver/mysql"
|
||||||
|
infraDB "github.com/grafana/grafana/pkg/infra/db"
|
||||||
|
"github.com/grafana/grafana/pkg/services/sqlstore/migrator"
|
||||||
|
"github.com/grafana/grafana/pkg/storage/unified/sql/db/dbimpl"
|
||||||
"github.com/jackc/pgx/v5/pgconn"
|
"github.com/jackc/pgx/v5/pgconn"
|
||||||
"github.com/lib/pq"
|
"github.com/lib/pq"
|
||||||
"github.com/prometheus/client_golang/prometheus"
|
"github.com/prometheus/client_golang/prometheus"
|
||||||
@@ -44,10 +47,63 @@ const defaultPrunerHistoryLimit = 20
|
|||||||
|
|
||||||
func ProvideStorageBackend(
|
func ProvideStorageBackend(
|
||||||
cfg *setting.Cfg,
|
cfg *setting.Cfg,
|
||||||
|
db infraDB.DB,
|
||||||
|
tracer trace.Tracer,
|
||||||
|
reg prometheus.Registerer,
|
||||||
|
storageMetrics *resource.StorageMetrics,
|
||||||
) (resource.StorageBackend, error) {
|
) (resource.StorageBackend, error) {
|
||||||
// TODO: make this the central place to provide SQL backend
|
// Create the resource DB
|
||||||
// Currently it is skipped as we need to handle the cases of Diagnostics and Lifecycle
|
eDB, err := dbimpl.ProvideResourceDB(db, cfg, tracer)
|
||||||
return nil, nil
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("failed to create resource DB: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check if HA is enabled
|
||||||
|
isHA := isHighAvailabilityEnabled(
|
||||||
|
cfg.SectionWithEnvOverrides("database"),
|
||||||
|
cfg.SectionWithEnvOverrides("resource_api"),
|
||||||
|
)
|
||||||
|
|
||||||
|
// Create the backend
|
||||||
|
backend, err := NewBackend(BackendOptions{
|
||||||
|
DBProvider: eDB,
|
||||||
|
Reg: reg,
|
||||||
|
IsHA: isHA,
|
||||||
|
storageMetrics: storageMetrics,
|
||||||
|
LastImportTimeMaxAge: cfg.MaxFileIndexAge,
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("failed to create backend: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Initialize the backend
|
||||||
|
if err := backend.Init(context.Background()); err != nil {
|
||||||
|
return nil, fmt.Errorf("failed to initialize backend: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return backend, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// isHighAvailabilityEnabled determines if high availability mode should
|
||||||
|
// be enabled based on database configuration. High availability is enabled
|
||||||
|
// by default except for SQLite databases.
|
||||||
|
func isHighAvailabilityEnabled(dbCfg, resourceAPICfg *setting.DynamicSection) bool {
|
||||||
|
// If the resource API is using a non-SQLite database, we assume it's in HA mode.
|
||||||
|
resourceDBType := resourceAPICfg.Key("db_type").String()
|
||||||
|
if resourceDBType != "" && resourceDBType != migrator.SQLite {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check in the config if HA is enabled - by default we always assume a HA setup.
|
||||||
|
isHA := dbCfg.Key("high_availability").MustBool(true)
|
||||||
|
|
||||||
|
// SQLite is not possible to run in HA, so we force it to false.
|
||||||
|
databaseType := dbCfg.Key("type").String()
|
||||||
|
if databaseType == migrator.SQLite {
|
||||||
|
isHA = false
|
||||||
|
}
|
||||||
|
|
||||||
|
return isHA
|
||||||
}
|
}
|
||||||
|
|
||||||
type Backend interface {
|
type Backend interface {
|
||||||
|
|||||||
322
pkg/storage/unified/sql/search_service.go
Normal file
322
pkg/storage/unified/sql/search_service.go
Normal file
@@ -0,0 +1,322 @@
|
|||||||
|
package sql
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"errors"
|
||||||
|
"fmt"
|
||||||
|
"hash/fnv"
|
||||||
|
"net/http"
|
||||||
|
|
||||||
|
"github.com/gorilla/mux"
|
||||||
|
"github.com/grafana/grafana/pkg/storage/unified/search"
|
||||||
|
"github.com/prometheus/client_golang/prometheus"
|
||||||
|
"go.opentelemetry.io/otel"
|
||||||
|
"go.opentelemetry.io/otel/trace"
|
||||||
|
"google.golang.org/grpc/health/grpc_health_v1"
|
||||||
|
|
||||||
|
"github.com/grafana/dskit/kv"
|
||||||
|
"github.com/grafana/dskit/ring"
|
||||||
|
"github.com/grafana/dskit/services"
|
||||||
|
|
||||||
|
infraDB "github.com/grafana/grafana/pkg/infra/db"
|
||||||
|
"github.com/grafana/grafana/pkg/infra/log"
|
||||||
|
"github.com/grafana/grafana/pkg/modules"
|
||||||
|
"github.com/grafana/grafana/pkg/services/authz"
|
||||||
|
"github.com/grafana/grafana/pkg/services/featuremgmt"
|
||||||
|
"github.com/grafana/grafana/pkg/services/grpcserver"
|
||||||
|
"github.com/grafana/grafana/pkg/services/grpcserver/interceptors"
|
||||||
|
"github.com/grafana/grafana/pkg/setting"
|
||||||
|
"github.com/grafana/grafana/pkg/storage/unified/resource"
|
||||||
|
"github.com/grafana/grafana/pkg/storage/unified/resource/grpc"
|
||||||
|
"github.com/grafana/grafana/pkg/storage/unified/resourcepb"
|
||||||
|
)
|
||||||
|
|
||||||
|
var _ UnifiedStorageGrpcService = (*searchService)(nil)
|
||||||
|
|
||||||
|
// operation used by the search-servers to check if they own the namespace
|
||||||
|
var (
|
||||||
|
searchOwnerRead = ring.NewOp([]ring.InstanceState{ring.JOINING, ring.ACTIVE, ring.LEAVING}, nil)
|
||||||
|
)
|
||||||
|
|
||||||
|
type searchService struct {
|
||||||
|
*services.BasicService
|
||||||
|
|
||||||
|
backend resource.StorageBackend
|
||||||
|
cfg *setting.Cfg
|
||||||
|
features featuremgmt.FeatureToggles
|
||||||
|
db infraDB.DB
|
||||||
|
stopCh chan struct{}
|
||||||
|
stoppedCh chan error
|
||||||
|
handler grpcserver.Provider
|
||||||
|
tracing trace.Tracer
|
||||||
|
authenticator func(ctx context.Context) (context.Context, error)
|
||||||
|
httpServerRouter *mux.Router
|
||||||
|
|
||||||
|
log log.Logger
|
||||||
|
reg prometheus.Registerer
|
||||||
|
docBuilders resource.DocumentBuilderSupplier
|
||||||
|
indexMetrics *resource.BleveIndexMetrics
|
||||||
|
searchRing *ring.Ring
|
||||||
|
|
||||||
|
// Ring lifecycle and sharding support
|
||||||
|
ringLifecycler *ring.BasicLifecycler
|
||||||
|
|
||||||
|
// Subservices manager
|
||||||
|
subservices *services.Manager
|
||||||
|
subservicesWatcher *services.FailureWatcher
|
||||||
|
hasSubservices bool
|
||||||
|
}
|
||||||
|
|
||||||
|
func ProvideUnifiedSearchGrpcService(
|
||||||
|
cfg *setting.Cfg,
|
||||||
|
features featuremgmt.FeatureToggles,
|
||||||
|
db infraDB.DB,
|
||||||
|
log log.Logger,
|
||||||
|
reg prometheus.Registerer,
|
||||||
|
docBuilders resource.DocumentBuilderSupplier,
|
||||||
|
indexMetrics *resource.BleveIndexMetrics,
|
||||||
|
searchRing *ring.Ring,
|
||||||
|
memberlistKVConfig kv.Config,
|
||||||
|
backend resource.StorageBackend,
|
||||||
|
httpServerRouter *mux.Router,
|
||||||
|
) (UnifiedStorageGrpcService, error) {
|
||||||
|
var err error
|
||||||
|
tracer := otel.Tracer("unified-search-server")
|
||||||
|
|
||||||
|
authn := NewAuthenticatorWithFallback(cfg, reg, tracer, func(ctx context.Context) (context.Context, error) {
|
||||||
|
auth := grpc.Authenticator{Tracer: tracer}
|
||||||
|
return auth.Authenticate(ctx)
|
||||||
|
})
|
||||||
|
|
||||||
|
s := &searchService{
|
||||||
|
backend: backend,
|
||||||
|
cfg: cfg,
|
||||||
|
features: features,
|
||||||
|
stopCh: make(chan struct{}),
|
||||||
|
stoppedCh: make(chan error, 1),
|
||||||
|
authenticator: authn,
|
||||||
|
tracing: tracer,
|
||||||
|
db: db,
|
||||||
|
log: log,
|
||||||
|
reg: reg,
|
||||||
|
docBuilders: docBuilders,
|
||||||
|
indexMetrics: indexMetrics,
|
||||||
|
searchRing: searchRing,
|
||||||
|
httpServerRouter: httpServerRouter,
|
||||||
|
subservicesWatcher: services.NewFailureWatcher(),
|
||||||
|
}
|
||||||
|
|
||||||
|
subservices := []services.Service{}
|
||||||
|
if cfg.EnableSharding {
|
||||||
|
ringStore, err := kv.NewClient(
|
||||||
|
memberlistKVConfig,
|
||||||
|
ring.GetCodec(),
|
||||||
|
kv.RegistererWithKVName(reg, resource.RingName),
|
||||||
|
log,
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("failed to create KV store client: %s", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
lifecyclerCfg, err := toLifecyclerConfig(cfg, log)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("failed to initialize search-ring lifecycler config: %s", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Define lifecycler delegates in reverse order (last to be called defined first because they're
|
||||||
|
// chained via "next delegate").
|
||||||
|
delegate := ring.BasicLifecyclerDelegate(ring.NewInstanceRegisterDelegate(ring.JOINING, resource.RingNumTokens))
|
||||||
|
delegate = ring.NewLeaveOnStoppingDelegate(delegate, log)
|
||||||
|
delegate = ring.NewAutoForgetDelegate(resource.RingHeartbeatTimeout*2, delegate, log)
|
||||||
|
|
||||||
|
s.ringLifecycler, err = ring.NewBasicLifecycler(
|
||||||
|
lifecyclerCfg,
|
||||||
|
resource.RingName,
|
||||||
|
resource.RingKey,
|
||||||
|
ringStore,
|
||||||
|
delegate,
|
||||||
|
log,
|
||||||
|
reg,
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("failed to initialize search-ring lifecycler: %s", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
s.ringLifecycler.SetKeepInstanceInTheRingOnShutdown(true)
|
||||||
|
subservices = append(subservices, s.ringLifecycler)
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(subservices) > 0 {
|
||||||
|
s.hasSubservices = true
|
||||||
|
s.subservices, err = services.NewManager(subservices...)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("failed to create subservices manager: %w", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// This will be used when running as a dskit service
|
||||||
|
s.BasicService = services.NewBasicService(s.starting, s.running, s.stopping).WithName(modules.SearchServer)
|
||||||
|
|
||||||
|
// Register HTTP endpoints if router is provided
|
||||||
|
s.RegisterHTTPEndpoints(httpServerRouter)
|
||||||
|
|
||||||
|
return s, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *searchService) PrepareDownscale(w http.ResponseWriter, r *http.Request) {
|
||||||
|
switch r.Method {
|
||||||
|
case http.MethodPost:
|
||||||
|
s.log.Info("Preparing for downscale. Will not keep instance in ring on shutdown.")
|
||||||
|
s.ringLifecycler.SetKeepInstanceInTheRingOnShutdown(false)
|
||||||
|
case http.MethodDelete:
|
||||||
|
s.log.Info("Downscale canceled. Will keep instance in ring on shutdown.")
|
||||||
|
s.ringLifecycler.SetKeepInstanceInTheRingOnShutdown(true)
|
||||||
|
case http.MethodGet:
|
||||||
|
// used for delayed downscale use case, which we don't support. Leaving here for completion sake
|
||||||
|
s.log.Info("Received GET request for prepare-downscale. Behavior not implemented.")
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *searchService) OwnsIndex(key resource.NamespacedResource) (bool, error) {
|
||||||
|
if s.searchRing == nil {
|
||||||
|
return true, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
if st := s.searchRing.State(); st != services.Running {
|
||||||
|
return false, fmt.Errorf("ring is not Running: %s", st)
|
||||||
|
}
|
||||||
|
|
||||||
|
ringHasher := fnv.New32a()
|
||||||
|
_, err := ringHasher.Write([]byte(key.Namespace))
|
||||||
|
if err != nil {
|
||||||
|
return false, fmt.Errorf("error hashing namespace: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
rs, err := s.searchRing.GetWithOptions(ringHasher.Sum32(), searchOwnerRead, ring.WithReplicationFactor(s.searchRing.ReplicationFactor()))
|
||||||
|
if err != nil {
|
||||||
|
return false, fmt.Errorf("error getting replicaset from ring: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return rs.Includes(s.ringLifecycler.GetInstanceAddr()), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *searchService) starting(ctx context.Context) error {
|
||||||
|
if s.hasSubservices {
|
||||||
|
s.subservicesWatcher.WatchManager(s.subservices)
|
||||||
|
if err := services.StartManagerAndAwaitHealthy(ctx, s.subservices); err != nil {
|
||||||
|
return fmt.Errorf("failed to start subservices: %w", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
authzClient, err := authz.ProvideStandaloneAuthZClient(s.cfg, s.features, s.tracing, s.reg)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create search options for the search server
|
||||||
|
searchOptions, err := search.NewSearchOptions(s.features, s.cfg, s.docBuilders, s.indexMetrics, s.OwnsIndex)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create the search server
|
||||||
|
searchServer, err := NewSearchServer(SearchServerOptions{
|
||||||
|
Backend: s.backend,
|
||||||
|
DB: s.db,
|
||||||
|
Cfg: s.cfg,
|
||||||
|
Tracer: s.tracing,
|
||||||
|
Reg: s.reg,
|
||||||
|
AccessClient: authzClient,
|
||||||
|
SearchOptions: searchOptions,
|
||||||
|
IndexMetrics: s.indexMetrics,
|
||||||
|
OwnsIndexFn: s.OwnsIndex,
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
s.handler, err = grpcserver.ProvideService(s.cfg, s.features, interceptors.AuthenticatorFunc(s.authenticator), s.tracing, prometheus.DefaultRegisterer)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
healthService, err := resource.ProvideHealthService(searchServer)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
srv := s.handler.GetServer()
|
||||||
|
// Register search services
|
||||||
|
resourcepb.RegisterResourceIndexServer(srv, searchServer)
|
||||||
|
resourcepb.RegisterManagedObjectIndexServer(srv, searchServer)
|
||||||
|
resourcepb.RegisterDiagnosticsServer(srv, searchServer)
|
||||||
|
grpc_health_v1.RegisterHealthServer(srv, healthService)
|
||||||
|
|
||||||
|
// register reflection service
|
||||||
|
_, err = grpcserver.ProvideReflectionService(s.cfg, s.handler)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if s.cfg.EnableSharding {
|
||||||
|
s.log.Info("waiting until search server is JOINING in the ring")
|
||||||
|
lfcCtx, cancel := context.WithTimeout(context.Background(), s.cfg.ResourceServerJoinRingTimeout)
|
||||||
|
defer cancel()
|
||||||
|
if err := ring.WaitInstanceState(lfcCtx, s.searchRing, s.ringLifecycler.GetInstanceID(), ring.JOINING); err != nil {
|
||||||
|
return fmt.Errorf("error switching to JOINING in the ring: %s", err)
|
||||||
|
}
|
||||||
|
s.log.Info("search server is JOINING in the ring")
|
||||||
|
|
||||||
|
if err := s.ringLifecycler.ChangeState(ctx, ring.ACTIVE); err != nil {
|
||||||
|
return fmt.Errorf("error switching to ACTIVE in the ring: %s", err)
|
||||||
|
}
|
||||||
|
s.log.Info("search server is ACTIVE in the ring")
|
||||||
|
}
|
||||||
|
|
||||||
|
// start the gRPC server
|
||||||
|
go func() {
|
||||||
|
err := s.handler.Run(ctx)
|
||||||
|
if err != nil {
|
||||||
|
s.stoppedCh <- err
|
||||||
|
} else {
|
||||||
|
s.stoppedCh <- nil
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetAddress returns the address of the gRPC server.
|
||||||
|
func (s *searchService) GetAddress() string {
|
||||||
|
return s.handler.GetAddress()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *searchService) running(ctx context.Context) error {
|
||||||
|
select {
|
||||||
|
case err := <-s.stoppedCh:
|
||||||
|
if err != nil && !errors.Is(err, context.Canceled) {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
case err := <-s.subservicesWatcher.Chan():
|
||||||
|
return fmt.Errorf("subservice failure: %w", err)
|
||||||
|
case <-ctx.Done():
|
||||||
|
close(s.stopCh)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *searchService) stopping(_ error) error {
|
||||||
|
if s.hasSubservices {
|
||||||
|
err := services.StopManagerAndAwaitStopped(context.Background(), s.subservices)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to stop subservices: %w", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *searchService) RegisterHTTPEndpoints(httpServerRouter *mux.Router) {
|
||||||
|
if httpServerRouter != nil && s.cfg.EnableSharding {
|
||||||
|
httpServerRouter.Path("/prepare-downscale").Methods("GET", "POST", "DELETE").Handler(http.HandlerFunc(s.PrepareDownscale))
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -16,7 +16,6 @@ import (
|
|||||||
secrets "github.com/grafana/grafana/pkg/registry/apis/secret/contracts"
|
secrets "github.com/grafana/grafana/pkg/registry/apis/secret/contracts"
|
||||||
inlinesecurevalue "github.com/grafana/grafana/pkg/registry/apis/secret/inline"
|
inlinesecurevalue "github.com/grafana/grafana/pkg/registry/apis/secret/inline"
|
||||||
"github.com/grafana/grafana/pkg/services/featuremgmt"
|
"github.com/grafana/grafana/pkg/services/featuremgmt"
|
||||||
"github.com/grafana/grafana/pkg/services/sqlstore/migrator"
|
|
||||||
"github.com/grafana/grafana/pkg/setting"
|
"github.com/grafana/grafana/pkg/setting"
|
||||||
"github.com/grafana/grafana/pkg/storage/unified/resource"
|
"github.com/grafana/grafana/pkg/storage/unified/resource"
|
||||||
"github.com/grafana/grafana/pkg/storage/unified/sql/db/dbimpl"
|
"github.com/grafana/grafana/pkg/storage/unified/sql/db/dbimpl"
|
||||||
@@ -30,8 +29,21 @@ type QOSEnqueueDequeuer interface {
|
|||||||
Dequeue(ctx context.Context) (func(), error)
|
Dequeue(ctx context.Context) (func(), error)
|
||||||
}
|
}
|
||||||
|
|
||||||
// ServerOptions contains the options for creating a new ResourceServer
|
// SearchServerOptions contains the options for creating a new SearchServer
|
||||||
type ServerOptions struct {
|
type SearchServerOptions struct {
|
||||||
|
Backend resource.StorageBackend
|
||||||
|
DB infraDB.DB
|
||||||
|
Cfg *setting.Cfg
|
||||||
|
Tracer trace.Tracer
|
||||||
|
Reg prometheus.Registerer
|
||||||
|
AccessClient types.AccessClient
|
||||||
|
SearchOptions resource.SearchOptions
|
||||||
|
IndexMetrics *resource.BleveIndexMetrics
|
||||||
|
OwnsIndexFn func(key resource.NamespacedResource) (bool, error)
|
||||||
|
}
|
||||||
|
|
||||||
|
// StorageServerOptions contains the options for creating a storage-only server (without search)
|
||||||
|
type StorageServerOptions struct {
|
||||||
Backend resource.StorageBackend
|
Backend resource.StorageBackend
|
||||||
OverridesService *resource.OverridesService
|
OverridesService *resource.OverridesService
|
||||||
DB infraDB.DB
|
DB infraDB.DB
|
||||||
@@ -39,16 +51,66 @@ type ServerOptions struct {
|
|||||||
Tracer trace.Tracer
|
Tracer trace.Tracer
|
||||||
Reg prometheus.Registerer
|
Reg prometheus.Registerer
|
||||||
AccessClient types.AccessClient
|
AccessClient types.AccessClient
|
||||||
SearchOptions resource.SearchOptions
|
|
||||||
StorageMetrics *resource.StorageMetrics
|
StorageMetrics *resource.StorageMetrics
|
||||||
IndexMetrics *resource.BleveIndexMetrics
|
|
||||||
Features featuremgmt.FeatureToggles
|
Features featuremgmt.FeatureToggles
|
||||||
QOSQueue QOSEnqueueDequeuer
|
QOSQueue QOSEnqueueDequeuer
|
||||||
SecureValues secrets.InlineSecureValueSupport
|
SecureValues secrets.InlineSecureValueSupport
|
||||||
OwnsIndexFn func(key resource.NamespacedResource) (bool, error)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewResourceServer(opts ServerOptions) (resource.ResourceServer, error) {
|
// NewSearchServer creates a new SearchServer with the given options.
|
||||||
|
// This can be used to create a standalone search server or to create a search server
|
||||||
|
// that will be passed to NewResourceServer.
|
||||||
|
//
|
||||||
|
// Important: When running in monolith mode, the backend should be provided by the caller
|
||||||
|
// to avoid duplicate metrics registration. Only in standalone microservice mode should
|
||||||
|
// this function create its own backend.
|
||||||
|
func NewSearchServer(opts SearchServerOptions) (resource.SearchServer, error) {
|
||||||
|
backend := opts.Backend
|
||||||
|
if backend == nil {
|
||||||
|
eDB, err := dbimpl.ProvideResourceDB(opts.DB, opts.Cfg, opts.Tracer)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
isHA := isHighAvailabilityEnabled(opts.Cfg.SectionWithEnvOverrides("database"),
|
||||||
|
opts.Cfg.SectionWithEnvOverrides("resource_api"))
|
||||||
|
|
||||||
|
b, err := NewBackend(BackendOptions{
|
||||||
|
DBProvider: eDB,
|
||||||
|
Reg: opts.Reg,
|
||||||
|
IsHA: isHA,
|
||||||
|
LastImportTimeMaxAge: opts.SearchOptions.MaxIndexAge,
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Initialize the backend before creating search server
|
||||||
|
if err := b.Init(context.Background()); err != nil {
|
||||||
|
return nil, fmt.Errorf("failed to initialize backend: %w", err)
|
||||||
|
}
|
||||||
|
backend = b
|
||||||
|
}
|
||||||
|
|
||||||
|
search, err := resource.NewSearchServer(opts.SearchOptions, backend, opts.AccessClient, nil, opts.IndexMetrics, opts.OwnsIndexFn)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("failed to create search server: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := search.Init(context.Background()); err != nil {
|
||||||
|
return nil, fmt.Errorf("failed to initialize search server: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return search, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewStorageServer creates a storage-only server without search capabilities.
|
||||||
|
// Use this when you want to run storage and search as separate services.
|
||||||
|
//
|
||||||
|
// Important: When running in monolith mode, the backend should be provided by the caller
|
||||||
|
// to avoid duplicate metrics registration. Only in standalone microservice mode should
|
||||||
|
// this function create its own backend.
|
||||||
|
func NewStorageServer(opts StorageServerOptions) (resource.ResourceServer, error) {
|
||||||
apiserverCfg := opts.Cfg.SectionWithEnvOverrides("grafana-apiserver")
|
apiserverCfg := opts.Cfg.SectionWithEnvOverrides("grafana-apiserver")
|
||||||
|
|
||||||
if opts.SecureValues == nil && opts.Cfg != nil && opts.Cfg.SecretsManagement.GrpcClientEnable {
|
if opts.SecureValues == nil && opts.Cfg != nil && opts.Cfg.SecretsManagement.GrpcClientEnable {
|
||||||
@@ -92,7 +154,6 @@ func NewResourceServer(opts ServerOptions) (resource.ResourceServer, error) {
|
|||||||
|
|
||||||
if opts.Backend != nil {
|
if opts.Backend != nil {
|
||||||
serverOptions.Backend = opts.Backend
|
serverOptions.Backend = opts.Backend
|
||||||
// TODO: we should probably have a proper interface for diagnostics/lifecycle
|
|
||||||
} else {
|
} else {
|
||||||
eDB, err := dbimpl.ProvideResourceDB(opts.DB, opts.Cfg, opts.Tracer)
|
eDB, err := dbimpl.ProvideResourceDB(opts.DB, opts.Cfg, opts.Tracer)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -130,7 +191,6 @@ func NewResourceServer(opts ServerOptions) (resource.ResourceServer, error) {
|
|||||||
return nil, fmt.Errorf("failed to create resource version manager: %w", err)
|
return nil, fmt.Errorf("failed to create resource version manager: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO add config to decide whether to pass RvManager or not
|
|
||||||
kvBackendOpts.RvManager = rvManager
|
kvBackendOpts.RvManager = rvManager
|
||||||
kvBackend, err := resource.NewKVStorageBackend(kvBackendOpts)
|
kvBackend, err := resource.NewKVStorageBackend(kvBackendOpts)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -148,7 +208,7 @@ func NewResourceServer(opts ServerOptions) (resource.ResourceServer, error) {
|
|||||||
Reg: opts.Reg,
|
Reg: opts.Reg,
|
||||||
IsHA: isHA,
|
IsHA: isHA,
|
||||||
storageMetrics: opts.StorageMetrics,
|
storageMetrics: opts.StorageMetrics,
|
||||||
LastImportTimeMaxAge: opts.SearchOptions.MaxIndexAge, // No need to keep last_import_times older than max index age.
|
LastImportTimeMaxAge: opts.Cfg.MaxFileIndexAge,
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
@@ -159,33 +219,15 @@ func NewResourceServer(opts ServerOptions) (resource.ResourceServer, error) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
serverOptions.Search = opts.SearchOptions
|
// Initialize the backend before creating server
|
||||||
serverOptions.IndexMetrics = opts.IndexMetrics
|
if serverOptions.Lifecycle != nil {
|
||||||
|
if err := serverOptions.Lifecycle.Init(context.Background()); err != nil {
|
||||||
|
return nil, fmt.Errorf("failed to initialize backend: %w", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
serverOptions.QOSQueue = opts.QOSQueue
|
serverOptions.QOSQueue = opts.QOSQueue
|
||||||
serverOptions.OwnsIndexFn = opts.OwnsIndexFn
|
|
||||||
serverOptions.OverridesService = opts.OverridesService
|
serverOptions.OverridesService = opts.OverridesService
|
||||||
|
|
||||||
return resource.NewResourceServer(serverOptions)
|
return resource.NewResourceServer(serverOptions)
|
||||||
}
|
}
|
||||||
|
|
||||||
// isHighAvailabilityEnabled determines if high availability mode should
|
|
||||||
// be enabled based on database configuration. High availability is enabled
|
|
||||||
// by default except for SQLite databases.
|
|
||||||
func isHighAvailabilityEnabled(dbCfg, resourceAPICfg *setting.DynamicSection) bool {
|
|
||||||
// If the resource API is using a non-SQLite database, we assume it's in HA mode.
|
|
||||||
resourceDBType := resourceAPICfg.Key("db_type").String()
|
|
||||||
if resourceDBType != "" && resourceDBType != migrator.SQLite {
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
|
|
||||||
// Check in the config if HA is enabled - by default we always assume a HA setup.
|
|
||||||
isHA := dbCfg.Key("high_availability").MustBool(true)
|
|
||||||
|
|
||||||
// SQLite is not possible to run in HA, so we force it to false.
|
|
||||||
databaseType := dbCfg.Key("type").String()
|
|
||||||
if databaseType == migrator.SQLite {
|
|
||||||
isHA = false
|
|
||||||
}
|
|
||||||
|
|
||||||
return isHA
|
|
||||||
}
|
|
||||||
|
|||||||
@@ -12,6 +12,7 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/gorilla/mux"
|
"github.com/gorilla/mux"
|
||||||
|
"github.com/grafana/grafana/pkg/storage/unified/search"
|
||||||
"github.com/prometheus/client_golang/prometheus"
|
"github.com/prometheus/client_golang/prometheus"
|
||||||
"github.com/prometheus/client_golang/prometheus/promauto"
|
"github.com/prometheus/client_golang/prometheus/promauto"
|
||||||
"go.opentelemetry.io/otel"
|
"go.opentelemetry.io/otel"
|
||||||
@@ -36,7 +37,7 @@ import (
|
|||||||
"github.com/grafana/grafana/pkg/storage/unified/resource"
|
"github.com/grafana/grafana/pkg/storage/unified/resource"
|
||||||
"github.com/grafana/grafana/pkg/storage/unified/resource/grpc"
|
"github.com/grafana/grafana/pkg/storage/unified/resource/grpc"
|
||||||
"github.com/grafana/grafana/pkg/storage/unified/resourcepb"
|
"github.com/grafana/grafana/pkg/storage/unified/resourcepb"
|
||||||
"github.com/grafana/grafana/pkg/storage/unified/search"
|
"github.com/grafana/grafana/pkg/storage/unified/sql/db/dbimpl"
|
||||||
"github.com/grafana/grafana/pkg/util/scheduler"
|
"github.com/grafana/grafana/pkg/util/scheduler"
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -54,38 +55,40 @@ type UnifiedStorageGrpcService interface {
|
|||||||
type service struct {
|
type service struct {
|
||||||
*services.BasicService
|
*services.BasicService
|
||||||
|
|
||||||
// Subservices manager
|
backend resource.StorageBackend
|
||||||
subservices *services.Manager
|
cfg *setting.Cfg
|
||||||
subservicesWatcher *services.FailureWatcher
|
features featuremgmt.FeatureToggles
|
||||||
hasSubservices bool
|
stopCh chan struct{}
|
||||||
|
stoppedCh chan error
|
||||||
backend resource.StorageBackend
|
authenticator func(context.Context) (context.Context, error)
|
||||||
cfg *setting.Cfg
|
tracing trace.Tracer
|
||||||
features featuremgmt.FeatureToggles
|
db infraDB.DB
|
||||||
db infraDB.DB
|
|
||||||
stopCh chan struct{}
|
|
||||||
stoppedCh chan error
|
|
||||||
|
|
||||||
handler grpcserver.Provider
|
|
||||||
|
|
||||||
tracing trace.Tracer
|
|
||||||
|
|
||||||
authenticator func(ctx context.Context) (context.Context, error)
|
|
||||||
|
|
||||||
log log.Logger
|
log log.Logger
|
||||||
reg prometheus.Registerer
|
reg prometheus.Registerer
|
||||||
|
docBuilders resource.DocumentBuilderSupplier
|
||||||
storageMetrics *resource.StorageMetrics
|
storageMetrics *resource.StorageMetrics
|
||||||
indexMetrics *resource.BleveIndexMetrics
|
indexMetrics *resource.BleveIndexMetrics
|
||||||
|
|
||||||
docBuilders resource.DocumentBuilderSupplier
|
|
||||||
|
|
||||||
searchRing *ring.Ring
|
searchRing *ring.Ring
|
||||||
|
|
||||||
|
// Handler for the gRPC server
|
||||||
|
handler grpcserver.Provider
|
||||||
|
|
||||||
|
// Ring lifecycle and sharding support
|
||||||
ringLifecycler *ring.BasicLifecycler
|
ringLifecycler *ring.BasicLifecycler
|
||||||
|
|
||||||
queue QOSEnqueueDequeuer
|
// QoS support
|
||||||
|
queue *scheduler.Queue
|
||||||
scheduler *scheduler.Scheduler
|
scheduler *scheduler.Scheduler
|
||||||
|
|
||||||
|
// Subservices management
|
||||||
|
hasSubservices bool
|
||||||
|
subservices *services.Manager
|
||||||
|
subservicesWatcher *services.FailureWatcher
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ProvideUnifiedStorageGrpcService provides a combined storage and search service running on the same gRPC server.
|
||||||
|
// This is used when running Grafana as a monolith where both services share the same process.
|
||||||
|
// Each service (storage and search) maintains its own lifecycle but shares the gRPC server.
|
||||||
func ProvideUnifiedStorageGrpcService(
|
func ProvideUnifiedStorageGrpcService(
|
||||||
cfg *setting.Cfg,
|
cfg *setting.Cfg,
|
||||||
features featuremgmt.FeatureToggles,
|
features featuremgmt.FeatureToggles,
|
||||||
@@ -101,7 +104,7 @@ func ProvideUnifiedStorageGrpcService(
|
|||||||
backend resource.StorageBackend,
|
backend resource.StorageBackend,
|
||||||
) (UnifiedStorageGrpcService, error) {
|
) (UnifiedStorageGrpcService, error) {
|
||||||
var err error
|
var err error
|
||||||
tracer := otel.Tracer("unified-storage")
|
tracer := otel.Tracer("unified-storage-combined")
|
||||||
|
|
||||||
// FIXME: This is a temporary solution while we are migrating to the new authn interceptor
|
// FIXME: This is a temporary solution while we are migrating to the new authn interceptor
|
||||||
// grpcutils.NewGrpcAuthenticator should be used instead.
|
// grpcutils.NewGrpcAuthenticator should be used instead.
|
||||||
@@ -178,7 +181,7 @@ func ProvideUnifiedStorageGrpcService(
|
|||||||
MaxSizePerTenant: cfg.QOSMaxSizePerTenant,
|
MaxSizePerTenant: cfg.QOSMaxSizePerTenant,
|
||||||
Registerer: qosReg,
|
Registerer: qosReg,
|
||||||
})
|
})
|
||||||
scheduler, err := scheduler.NewScheduler(queue, &scheduler.Config{
|
sched, err := scheduler.NewScheduler(queue, &scheduler.Config{
|
||||||
NumWorkers: cfg.QOSNumberWorker,
|
NumWorkers: cfg.QOSNumberWorker,
|
||||||
Logger: log,
|
Logger: log,
|
||||||
})
|
})
|
||||||
@@ -187,7 +190,7 @@ func ProvideUnifiedStorageGrpcService(
|
|||||||
}
|
}
|
||||||
|
|
||||||
s.queue = queue
|
s.queue = queue
|
||||||
s.scheduler = scheduler
|
s.scheduler = sched
|
||||||
subservices = append(subservices, s.queue, s.scheduler)
|
subservices = append(subservices, s.queue, s.scheduler)
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -200,6 +203,7 @@ func ProvideUnifiedStorageGrpcService(
|
|||||||
}
|
}
|
||||||
|
|
||||||
// This will be used when running as a dskit service
|
// This will be used when running as a dskit service
|
||||||
|
// Note: We use StorageServer as the module name for backward compatibility
|
||||||
s.BasicService = services.NewBasicService(s.starting, s.running, s.stopping).WithName(modules.StorageServer)
|
s.BasicService = services.NewBasicService(s.starting, s.running, s.stopping).WithName(modules.StorageServer)
|
||||||
|
|
||||||
return s, nil
|
return s, nil
|
||||||
@@ -220,11 +224,6 @@ func (s *service) PrepareDownscale(w http.ResponseWriter, r *http.Request) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
var (
|
|
||||||
// operation used by the search-servers to check if they own the namespace
|
|
||||||
searchOwnerRead = ring.NewOp([]ring.InstanceState{ring.JOINING, ring.ACTIVE, ring.LEAVING}, nil)
|
|
||||||
)
|
|
||||||
|
|
||||||
func (s *service) OwnsIndex(key resource.NamespacedResource) (bool, error) {
|
func (s *service) OwnsIndex(key resource.NamespacedResource) (bool, error) {
|
||||||
if s.searchRing == nil {
|
if s.searchRing == nil {
|
||||||
return true, nil
|
return true, nil
|
||||||
@@ -261,59 +260,108 @@ func (s *service) starting(ctx context.Context) error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
searchOptions, err := search.NewSearchOptions(s.features, s.cfg, s.docBuilders, s.indexMetrics, s.OwnsIndex)
|
// Setup overrides service if enabled
|
||||||
if err != nil {
|
var overridesSvc *resource.OverridesService
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
serverOptions := ServerOptions{
|
|
||||||
Backend: s.backend,
|
|
||||||
DB: s.db,
|
|
||||||
Cfg: s.cfg,
|
|
||||||
Tracer: s.tracing,
|
|
||||||
Reg: s.reg,
|
|
||||||
AccessClient: authzClient,
|
|
||||||
SearchOptions: searchOptions,
|
|
||||||
StorageMetrics: s.storageMetrics,
|
|
||||||
IndexMetrics: s.indexMetrics,
|
|
||||||
Features: s.features,
|
|
||||||
QOSQueue: s.queue,
|
|
||||||
OwnsIndexFn: s.OwnsIndex,
|
|
||||||
}
|
|
||||||
|
|
||||||
if s.cfg.OverridesFilePath != "" {
|
if s.cfg.OverridesFilePath != "" {
|
||||||
overridesSvc, err := resource.NewOverridesService(context.Background(), s.log, s.reg, s.tracing, resource.ReloadOptions{
|
overridesSvc, err = resource.NewOverridesService(context.Background(), s.log, s.reg, s.tracing, resource.ReloadOptions{
|
||||||
FilePath: s.cfg.OverridesFilePath,
|
FilePath: s.cfg.OverridesFilePath,
|
||||||
ReloadPeriod: s.cfg.OverridesReloadInterval,
|
ReloadPeriod: s.cfg.OverridesReloadInterval,
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
serverOptions.OverridesService = overridesSvc
|
|
||||||
}
|
}
|
||||||
|
|
||||||
server, err := NewResourceServer(serverOptions)
|
// Ensure we have a backend - create one if needed
|
||||||
|
// This is critical: we create the backend ONCE and share it between search and storage servers
|
||||||
|
// to avoid duplicate metrics registration
|
||||||
|
backend := s.backend
|
||||||
|
if backend == nil {
|
||||||
|
eDB, err := dbimpl.ProvideResourceDB(s.db, s.cfg, s.tracing)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to create resource DB: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
isHA := isHighAvailabilityEnabled(s.cfg.SectionWithEnvOverrides("database"),
|
||||||
|
s.cfg.SectionWithEnvOverrides("resource_api"))
|
||||||
|
|
||||||
|
b, err := NewBackend(BackendOptions{
|
||||||
|
DBProvider: eDB,
|
||||||
|
Reg: s.reg,
|
||||||
|
IsHA: isHA,
|
||||||
|
storageMetrics: s.storageMetrics,
|
||||||
|
LastImportTimeMaxAge: s.cfg.MaxFileIndexAge,
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to create backend: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Initialize the backend
|
||||||
|
if err := b.Init(context.Background()); err != nil {
|
||||||
|
return fmt.Errorf("failed to initialize backend: %w", err)
|
||||||
|
}
|
||||||
|
backend = b
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create search options for the search server
|
||||||
|
searchOptions, err := search.NewSearchOptions(s.features, s.cfg, s.docBuilders, s.indexMetrics, s.OwnsIndex)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Create the search server - pass the shared backend
|
||||||
|
searchServer, err := NewSearchServer(SearchServerOptions{
|
||||||
|
Backend: backend, // Use the shared backend
|
||||||
|
DB: s.db,
|
||||||
|
Cfg: s.cfg,
|
||||||
|
Tracer: s.tracing,
|
||||||
|
Reg: s.reg,
|
||||||
|
AccessClient: authzClient,
|
||||||
|
SearchOptions: searchOptions,
|
||||||
|
IndexMetrics: s.indexMetrics,
|
||||||
|
OwnsIndexFn: s.OwnsIndex,
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create the storage server - pass the shared backend
|
||||||
|
storageServer, err := NewStorageServer(StorageServerOptions{
|
||||||
|
Backend: backend, // Use the shared backend
|
||||||
|
OverridesService: overridesSvc,
|
||||||
|
DB: s.db,
|
||||||
|
Cfg: s.cfg,
|
||||||
|
Tracer: s.tracing,
|
||||||
|
Reg: s.reg,
|
||||||
|
AccessClient: authzClient,
|
||||||
|
StorageMetrics: s.storageMetrics,
|
||||||
|
Features: s.features,
|
||||||
|
QOSQueue: s.queue,
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
s.handler, err = grpcserver.ProvideService(s.cfg, s.features, interceptors.AuthenticatorFunc(s.authenticator), s.tracing, prometheus.DefaultRegisterer)
|
s.handler, err = grpcserver.ProvideService(s.cfg, s.features, interceptors.AuthenticatorFunc(s.authenticator), s.tracing, prometheus.DefaultRegisterer)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
healthService, err := resource.ProvideHealthService(server)
|
healthService, err := resource.ProvideHealthService(storageServer)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
srv := s.handler.GetServer()
|
srv := s.handler.GetServer()
|
||||||
resourcepb.RegisterResourceStoreServer(srv, server)
|
// Register storage services
|
||||||
resourcepb.RegisterBulkStoreServer(srv, server)
|
resourcepb.RegisterResourceStoreServer(srv, storageServer)
|
||||||
resourcepb.RegisterResourceIndexServer(srv, server)
|
resourcepb.RegisterBulkStoreServer(srv, storageServer)
|
||||||
resourcepb.RegisterManagedObjectIndexServer(srv, server)
|
resourcepb.RegisterBlobStoreServer(srv, storageServer)
|
||||||
resourcepb.RegisterBlobStoreServer(srv, server)
|
resourcepb.RegisterDiagnosticsServer(srv, storageServer)
|
||||||
resourcepb.RegisterDiagnosticsServer(srv, server)
|
resourcepb.RegisterQuotasServer(srv, storageServer)
|
||||||
resourcepb.RegisterQuotasServer(srv, server)
|
// Register search services
|
||||||
|
resourcepb.RegisterResourceIndexServer(srv, searchServer)
|
||||||
|
resourcepb.RegisterManagedObjectIndexServer(srv, searchServer)
|
||||||
grpc_health_v1.RegisterHealthServer(srv, healthService)
|
grpc_health_v1.RegisterHealthServer(srv, healthService)
|
||||||
|
|
||||||
// register reflection service
|
// register reflection service
|
||||||
|
|||||||
234
pkg/storage/unified/sql/storage_service.go
Normal file
234
pkg/storage/unified/sql/storage_service.go
Normal file
@@ -0,0 +1,234 @@
|
|||||||
|
package sql
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"errors"
|
||||||
|
"fmt"
|
||||||
|
|
||||||
|
"github.com/prometheus/client_golang/prometheus"
|
||||||
|
"go.opentelemetry.io/otel"
|
||||||
|
"go.opentelemetry.io/otel/trace"
|
||||||
|
"google.golang.org/grpc/health/grpc_health_v1"
|
||||||
|
|
||||||
|
"github.com/grafana/dskit/services"
|
||||||
|
|
||||||
|
infraDB "github.com/grafana/grafana/pkg/infra/db"
|
||||||
|
"github.com/grafana/grafana/pkg/infra/log"
|
||||||
|
"github.com/grafana/grafana/pkg/modules"
|
||||||
|
"github.com/grafana/grafana/pkg/services/authz"
|
||||||
|
"github.com/grafana/grafana/pkg/services/featuremgmt"
|
||||||
|
"github.com/grafana/grafana/pkg/services/grpcserver"
|
||||||
|
"github.com/grafana/grafana/pkg/services/grpcserver/interceptors"
|
||||||
|
"github.com/grafana/grafana/pkg/setting"
|
||||||
|
"github.com/grafana/grafana/pkg/storage/unified/resource"
|
||||||
|
"github.com/grafana/grafana/pkg/storage/unified/resource/grpc"
|
||||||
|
"github.com/grafana/grafana/pkg/storage/unified/resourcepb"
|
||||||
|
"github.com/grafana/grafana/pkg/util/scheduler"
|
||||||
|
)
|
||||||
|
|
||||||
|
var _ UnifiedStorageGrpcService = (*storageService)(nil)
|
||||||
|
|
||||||
|
type storageService struct {
|
||||||
|
*services.BasicService
|
||||||
|
|
||||||
|
backend resource.StorageBackend
|
||||||
|
cfg *setting.Cfg
|
||||||
|
features featuremgmt.FeatureToggles
|
||||||
|
db infraDB.DB
|
||||||
|
stopCh chan struct{}
|
||||||
|
stoppedCh chan error
|
||||||
|
|
||||||
|
handler grpcserver.Provider
|
||||||
|
|
||||||
|
tracing trace.Tracer
|
||||||
|
|
||||||
|
authenticator func(ctx context.Context) (context.Context, error)
|
||||||
|
|
||||||
|
log log.Logger
|
||||||
|
reg prometheus.Registerer
|
||||||
|
storageMetrics *resource.StorageMetrics
|
||||||
|
|
||||||
|
queue QOSEnqueueDequeuer
|
||||||
|
scheduler *scheduler.Scheduler
|
||||||
|
|
||||||
|
// Subservices manager
|
||||||
|
subservices *services.Manager
|
||||||
|
subservicesWatcher *services.FailureWatcher
|
||||||
|
hasSubservices bool
|
||||||
|
}
|
||||||
|
|
||||||
|
func ProvideStorageService(
|
||||||
|
cfg *setting.Cfg,
|
||||||
|
features featuremgmt.FeatureToggles,
|
||||||
|
db infraDB.DB,
|
||||||
|
log log.Logger,
|
||||||
|
reg prometheus.Registerer,
|
||||||
|
storageMetrics *resource.StorageMetrics,
|
||||||
|
backend resource.StorageBackend,
|
||||||
|
) (UnifiedStorageGrpcService, error) {
|
||||||
|
var err error
|
||||||
|
tracer := otel.Tracer("unified-storage-server")
|
||||||
|
|
||||||
|
authn := NewAuthenticatorWithFallback(cfg, reg, tracer, func(ctx context.Context) (context.Context, error) {
|
||||||
|
auth := grpc.Authenticator{Tracer: tracer}
|
||||||
|
return auth.Authenticate(ctx)
|
||||||
|
})
|
||||||
|
|
||||||
|
s := &storageService{
|
||||||
|
backend: backend,
|
||||||
|
cfg: cfg,
|
||||||
|
features: features,
|
||||||
|
stopCh: make(chan struct{}),
|
||||||
|
stoppedCh: make(chan error, 1),
|
||||||
|
authenticator: authn,
|
||||||
|
tracing: tracer,
|
||||||
|
db: db,
|
||||||
|
log: log,
|
||||||
|
reg: reg,
|
||||||
|
storageMetrics: storageMetrics,
|
||||||
|
subservicesWatcher: services.NewFailureWatcher(),
|
||||||
|
}
|
||||||
|
|
||||||
|
subservices := []services.Service{}
|
||||||
|
|
||||||
|
// Setup QOS if enabled
|
||||||
|
if cfg.QOSEnabled {
|
||||||
|
qosReg := prometheus.WrapRegistererWithPrefix("resource_server_qos_", reg)
|
||||||
|
queue := scheduler.NewQueue(&scheduler.QueueOptions{
|
||||||
|
MaxSizePerTenant: cfg.QOSMaxSizePerTenant,
|
||||||
|
Registerer: qosReg,
|
||||||
|
})
|
||||||
|
sched, err := scheduler.NewScheduler(queue, &scheduler.Config{
|
||||||
|
NumWorkers: cfg.QOSNumberWorker,
|
||||||
|
Logger: log,
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("failed to create qos scheduler: %s", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
s.queue = queue
|
||||||
|
s.scheduler = sched
|
||||||
|
subservices = append(subservices, s.queue, s.scheduler)
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(subservices) > 0 {
|
||||||
|
s.hasSubservices = true
|
||||||
|
s.subservices, err = services.NewManager(subservices...)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("failed to create subservices manager: %w", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// This will be used when running as a dskit service
|
||||||
|
s.BasicService = services.NewBasicService(s.starting, s.running, s.stopping).WithName(modules.StorageServer)
|
||||||
|
|
||||||
|
return s, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *storageService) starting(ctx context.Context) error {
|
||||||
|
if s.hasSubservices {
|
||||||
|
s.subservicesWatcher.WatchManager(s.subservices)
|
||||||
|
if err := services.StartManagerAndAwaitHealthy(ctx, s.subservices); err != nil {
|
||||||
|
return fmt.Errorf("failed to start subservices: %w", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
authzClient, err := authz.ProvideStandaloneAuthZClient(s.cfg, s.features, s.tracing, s.reg)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Setup overrides service if enabled
|
||||||
|
var overridesSvc *resource.OverridesService
|
||||||
|
if s.cfg.OverridesFilePath != "" {
|
||||||
|
overridesSvc, err = resource.NewOverridesService(context.Background(), s.log, s.reg, s.tracing, resource.ReloadOptions{
|
||||||
|
FilePath: s.cfg.OverridesFilePath,
|
||||||
|
ReloadPeriod: s.cfg.OverridesReloadInterval,
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create the storage server
|
||||||
|
storageServer, err := NewStorageServer(StorageServerOptions{
|
||||||
|
Backend: s.backend,
|
||||||
|
OverridesService: overridesSvc,
|
||||||
|
DB: s.db,
|
||||||
|
Cfg: s.cfg,
|
||||||
|
Tracer: s.tracing,
|
||||||
|
Reg: s.reg,
|
||||||
|
AccessClient: authzClient,
|
||||||
|
StorageMetrics: s.storageMetrics,
|
||||||
|
Features: s.features,
|
||||||
|
QOSQueue: s.queue,
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
s.handler, err = grpcserver.ProvideService(s.cfg, s.features, interceptors.AuthenticatorFunc(s.authenticator), s.tracing, prometheus.DefaultRegisterer)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
healthService, err := resource.ProvideHealthService(storageServer)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
srv := s.handler.GetServer()
|
||||||
|
// Register storage services
|
||||||
|
resourcepb.RegisterResourceStoreServer(srv, storageServer)
|
||||||
|
resourcepb.RegisterBulkStoreServer(srv, storageServer)
|
||||||
|
resourcepb.RegisterBlobStoreServer(srv, storageServer)
|
||||||
|
resourcepb.RegisterDiagnosticsServer(srv, storageServer)
|
||||||
|
resourcepb.RegisterQuotasServer(srv, storageServer)
|
||||||
|
grpc_health_v1.RegisterHealthServer(srv, healthService)
|
||||||
|
|
||||||
|
// register reflection service
|
||||||
|
_, err = grpcserver.ProvideReflectionService(s.cfg, s.handler)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// start the gRPC server
|
||||||
|
go func() {
|
||||||
|
err := s.handler.Run(ctx)
|
||||||
|
if err != nil {
|
||||||
|
s.stoppedCh <- err
|
||||||
|
} else {
|
||||||
|
s.stoppedCh <- nil
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetAddress returns the address of the gRPC server.
|
||||||
|
func (s *storageService) GetAddress() string {
|
||||||
|
return s.handler.GetAddress()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *storageService) running(ctx context.Context) error {
|
||||||
|
select {
|
||||||
|
case err := <-s.stoppedCh:
|
||||||
|
if err != nil && !errors.Is(err, context.Canceled) {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
case err := <-s.subservicesWatcher.Chan():
|
||||||
|
return fmt.Errorf("subservice failure: %w", err)
|
||||||
|
case <-ctx.Done():
|
||||||
|
close(s.stopCh)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *storageService) stopping(_ error) error {
|
||||||
|
if s.hasSubservices {
|
||||||
|
err := services.StopManagerAndAwaitStopped(context.Background(), s.subservices)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to stop subservices: %w", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
@@ -32,6 +32,7 @@ func RunTestSearchAndStorage(t *testing.T, ctx context.Context, backend resource
|
|||||||
nsPrefix := "test-ns"
|
nsPrefix := "test-ns"
|
||||||
|
|
||||||
var server resource.ResourceServer
|
var server resource.ResourceServer
|
||||||
|
var searchServer resource.SearchServer
|
||||||
|
|
||||||
t.Run("Create initial resources in storage", func(t *testing.T) {
|
t.Run("Create initial resources in storage", func(t *testing.T) {
|
||||||
initialResources := []struct {
|
initialResources := []struct {
|
||||||
@@ -96,25 +97,34 @@ func RunTestSearchAndStorage(t *testing.T, ctx context.Context, backend resource
|
|||||||
})
|
})
|
||||||
|
|
||||||
t.Run("Create a resource server with both backends", func(t *testing.T) {
|
t.Run("Create a resource server with both backends", func(t *testing.T) {
|
||||||
// Create a resource server with both backends
|
// Create search server first
|
||||||
var err error
|
var err error
|
||||||
server, err = resource.NewResourceServer(resource.ResourceServerOptions{
|
searchOpts := resource.SearchOptions{
|
||||||
Backend: backend,
|
Backend: searchBackend,
|
||||||
Search: resource.SearchOptions{
|
Resources: &resource.TestDocumentBuilderSupplier{
|
||||||
Backend: searchBackend,
|
GroupsResources: map[string]string{
|
||||||
Resources: &resource.TestDocumentBuilderSupplier{
|
"test.grafana.app": "testresources",
|
||||||
GroupsResources: map[string]string{
|
|
||||||
"test.grafana.app": "testresources",
|
|
||||||
},
|
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
|
}
|
||||||
|
searchServer, err = resource.NewSearchServer(searchOpts, backend, nil, nil, nil, nil)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.NotNil(t, searchServer)
|
||||||
|
|
||||||
|
// Initialize the search server
|
||||||
|
err = searchServer.Init(ctx)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
// Create a resource server separately from the search server
|
||||||
|
server, err = resource.NewResourceServer(resource.ResourceServerOptions{
|
||||||
|
Backend: backend,
|
||||||
})
|
})
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
})
|
})
|
||||||
|
|
||||||
t.Run("Search for initial resources", func(t *testing.T) {
|
t.Run("Search for initial resources", func(t *testing.T) {
|
||||||
// Test 1: Search for initial resources
|
// Test 1: Search for initial resources
|
||||||
searchResp, err := server.Search(ctx, &resourcepb.ResourceSearchRequest{
|
searchResp, err := searchServer.Search(ctx, &resourcepb.ResourceSearchRequest{
|
||||||
Options: &resourcepb.ListOptions{
|
Options: &resourcepb.ListOptions{
|
||||||
Key: &resourcepb.ResourceKey{
|
Key: &resourcepb.ResourceKey{
|
||||||
Group: "test.grafana.app",
|
Group: "test.grafana.app",
|
||||||
@@ -194,7 +204,7 @@ func RunTestSearchAndStorage(t *testing.T, ctx context.Context, backend resource
|
|||||||
})
|
})
|
||||||
|
|
||||||
t.Run("Search for documents", func(t *testing.T) {
|
t.Run("Search for documents", func(t *testing.T) {
|
||||||
searchResp, err := server.Search(ctx, &resourcepb.ResourceSearchRequest{
|
searchResp, err := searchServer.Search(ctx, &resourcepb.ResourceSearchRequest{
|
||||||
Options: &resourcepb.ListOptions{
|
Options: &resourcepb.ListOptions{
|
||||||
Key: &resourcepb.ResourceKey{
|
Key: &resourcepb.ResourceKey{
|
||||||
Group: "test.grafana.app",
|
Group: "test.grafana.app",
|
||||||
@@ -212,7 +222,7 @@ func RunTestSearchAndStorage(t *testing.T, ctx context.Context, backend resource
|
|||||||
})
|
})
|
||||||
|
|
||||||
t.Run("Search with tags", func(t *testing.T) {
|
t.Run("Search with tags", func(t *testing.T) {
|
||||||
searchResp, err := server.Search(ctx, &resourcepb.ResourceSearchRequest{
|
searchResp, err := searchServer.Search(ctx, &resourcepb.ResourceSearchRequest{
|
||||||
Options: &resourcepb.ListOptions{
|
Options: &resourcepb.ListOptions{
|
||||||
Key: &resourcepb.ResourceKey{
|
Key: &resourcepb.ResourceKey{
|
||||||
Group: "test.grafana.app",
|
Group: "test.grafana.app",
|
||||||
@@ -231,7 +241,7 @@ func RunTestSearchAndStorage(t *testing.T, ctx context.Context, backend resource
|
|||||||
require.Equal(t, int64(0), searchResp.TotalHits)
|
require.Equal(t, int64(0), searchResp.TotalHits)
|
||||||
|
|
||||||
// this is the correct way of searching by tag
|
// this is the correct way of searching by tag
|
||||||
searchResp, err = server.Search(ctx, &resourcepb.ResourceSearchRequest{
|
searchResp, err = searchServer.Search(ctx, &resourcepb.ResourceSearchRequest{
|
||||||
Options: &resourcepb.ListOptions{
|
Options: &resourcepb.ListOptions{
|
||||||
Key: &resourcepb.ResourceKey{
|
Key: &resourcepb.ResourceKey{
|
||||||
Group: "test.grafana.app",
|
Group: "test.grafana.app",
|
||||||
@@ -253,7 +263,7 @@ func RunTestSearchAndStorage(t *testing.T, ctx context.Context, backend resource
|
|||||||
})
|
})
|
||||||
|
|
||||||
t.Run("Search with specific tag", func(t *testing.T) {
|
t.Run("Search with specific tag", func(t *testing.T) {
|
||||||
searchResp, err := server.Search(ctx, &resourcepb.ResourceSearchRequest{
|
searchResp, err := searchServer.Search(ctx, &resourcepb.ResourceSearchRequest{
|
||||||
Options: &resourcepb.ListOptions{
|
Options: &resourcepb.ListOptions{
|
||||||
Key: &resourcepb.ResourceKey{
|
Key: &resourcepb.ResourceKey{
|
||||||
Group: "test.grafana.app",
|
Group: "test.grafana.app",
|
||||||
@@ -272,7 +282,7 @@ func RunTestSearchAndStorage(t *testing.T, ctx context.Context, backend resource
|
|||||||
require.Equal(t, int64(0), searchResp.TotalHits)
|
require.Equal(t, int64(0), searchResp.TotalHits)
|
||||||
|
|
||||||
// this is the correct way of searching by tag
|
// this is the correct way of searching by tag
|
||||||
searchResp, err = server.Search(ctx, &resourcepb.ResourceSearchRequest{
|
searchResp, err = searchServer.Search(ctx, &resourcepb.ResourceSearchRequest{
|
||||||
Options: &resourcepb.ListOptions{
|
Options: &resourcepb.ListOptions{
|
||||||
Key: &resourcepb.ResourceKey{
|
Key: &resourcepb.ResourceKey{
|
||||||
Group: "test.grafana.app",
|
Group: "test.grafana.app",
|
||||||
|
|||||||
Reference in New Issue
Block a user