From df2f5286121fc9e6cc8fbc56465e4b5f33d2397f Mon Sep 17 00:00:00 2001 From: owensmallwood Date: Thu, 27 Nov 2025 10:29:16 -0600 Subject: [PATCH] Unified Storage: Adds overrides service to resource server (#113794) * first pass of adding quotas service resource server * passes prom reg as param init quota service as part of server params * init quota service as part of server params * adds config and only creates quota service when overrides file path is defined * when quota service enabled, check quota on create and log result * update log message * adds tests for quota service * adds tests for config reloading when the file changes * fix linter errors * fix comment * use startAndAwaitRunning * Simplifies quotas service. Call manager.GetConfig() when getting quota instead of watching for changes. * adds tracing to quotas service * adds nsr attributes to traces when getting quotas and resource stats * update comment * update comment remove check for nil overrides since it will (should) never happen * fix linter error * refactors naming to overrides service checks quotas in separate function * fix quotas naming * fixes more quotas -> overrides naming * use logger from ctx * linter - remove trailing whitespace * log FromContext() when checking quotas * adds events to spans instead of create new spans updates tenant -> namespace naming few other minor fixes --- pkg/setting/setting.go | 2 + pkg/setting/setting_unified_storage.go | 4 + pkg/storage/unified/client.go | 13 + pkg/storage/unified/resource/quotas.go | 127 ++++++ pkg/storage/unified/resource/quotas_test.go | 427 ++++++++++++++++++++ pkg/storage/unified/resource/server.go | 80 +++- pkg/storage/unified/sql/backend.go | 7 +- pkg/storage/unified/sql/server.go | 28 +- pkg/storage/unified/sql/service.go | 12 + 9 files changed, 673 insertions(+), 27 deletions(-) create mode 100644 pkg/storage/unified/resource/quotas.go create mode 100644 pkg/storage/unified/resource/quotas_test.go diff --git a/pkg/setting/setting.go 
b/pkg/setting/setting.go index 470b6910751..f6c0b3d3f19 100644 --- a/pkg/setting/setting.go +++ b/pkg/setting/setting.go @@ -615,6 +615,8 @@ type Cfg struct { HttpsSkipVerify bool ResourceServerJoinRingTimeout time.Duration EnableSearch bool + OverridesFilePath string + OverridesReloadInterval time.Duration // Secrets Management SecretsManagement SecretsManagerSettings diff --git a/pkg/setting/setting_unified_storage.go b/pkg/setting/setting_unified_storage.go index 29cb5d0270a..3623228fe8b 100644 --- a/pkg/setting/setting_unified_storage.go +++ b/pkg/setting/setting_unified_storage.go @@ -94,6 +94,10 @@ func (cfg *Cfg) setUnifiedStorageConfig() { cfg.HttpsSkipVerify = section.Key("https_skip_verify").MustBool(false) cfg.ResourceServerJoinRingTimeout = section.Key("resource_server_join_ring_timeout").MustDuration(10 * time.Second) + // quotas/limits config + cfg.OverridesFilePath = section.Key("overrides_path").String() + cfg.OverridesReloadInterval = section.Key("overrides_reload_period").MustDuration(30 * time.Second) + cfg.MaxFileIndexAge = section.Key("max_file_index_age").MustDuration(0) cfg.MinFileIndexBuildVersion = section.Key("min_file_index_build_version").MustString("") } diff --git a/pkg/storage/unified/client.go b/pkg/storage/unified/client.go index 07165937199..45130c8a6a6 100644 --- a/pkg/storage/unified/client.go +++ b/pkg/storage/unified/client.go @@ -211,6 +211,19 @@ func newClient(opts options.StorageOptions, serverOptions.QOSQueue = queue } + // only enable if an overrides file path is provided + if cfg.OverridesFilePath != "" { + overridesSvc, err := resource.NewOverridesService(ctx, cfg.Logger, reg, tracer, resource.ReloadOptions{ + FilePath: cfg.OverridesFilePath, + ReloadPeriod: cfg.OverridesReloadInterval, + }) + if err != nil { + return nil, err + } + + serverOptions.OverridesService = overridesSvc + } + server, err := sql.NewResourceServer(serverOptions) if err != nil { return nil, err diff --git a/pkg/storage/unified/resource/quotas.go 
b/pkg/storage/unified/resource/quotas.go new file mode 100644 index 00000000000..d956a6da017 --- /dev/null +++ b/pkg/storage/unified/resource/quotas.go @@ -0,0 +1,127 @@ +package resource + +import ( + "context" + "fmt" + "io" + "os" + "strings" + "time" + + "github.com/grafana/dskit/runtimeconfig" + "github.com/grafana/dskit/services" + "github.com/grafana/grafana/pkg/infra/log" + "github.com/prometheus/client_golang/prometheus" + "go.opentelemetry.io/otel/trace" + "go.yaml.in/yaml/v3" +) + +const DEFAULT_RESOURCE_LIMIT = 1000 + +type OverridesService struct { + manager *runtimeconfig.Manager + logger log.Logger + tracer trace.Tracer +} + +type ReloadOptions struct { + FilePath string + ReloadPeriod time.Duration +} + +// ResourceQuota represents quota limits for a specific resource +type ResourceQuota struct { + Limit int `yaml:"limit"` +} + +// NamespaceOverrides represents all overrides for a tenant +type NamespaceOverrides struct { + Quotas map[string]ResourceQuota `yaml:"quotas"` +} + +// Overrides represents the entire overrides configuration file +type Overrides struct { + Namespaces map[string]NamespaceOverrides +} + +/* +This service loads overrides (currently just quotas) from a YAML file with the following yaml structure: + +"123": + + quotas: + grafana.dashboard.app/dashboards: + limit: 1500 + grafana.folder.app/folders: + limit: 1500 +*/ +func NewOverridesService(_ context.Context, logger log.Logger, reg prometheus.Registerer, tracer trace.Tracer, opts ReloadOptions) (*OverridesService, error) { + // shouldn't be empty since we use file path existence to determine if we should enable the service + if opts.FilePath == "" { + return nil, fmt.Errorf("overrides file path is required") + } + if opts.ReloadPeriod == 0 { + opts.ReloadPeriod = time.Second * 30 + } + + // Check if file exists + if _, err := os.Stat(opts.FilePath); err != nil { + if os.IsNotExist(err) { + return nil, fmt.Errorf("overrides file does not exist: %s", opts.FilePath) + } + return 
nil, fmt.Errorf("failed to stat overrides file: %w", err) + } + + config := runtimeconfig.Config{ + ReloadPeriod: opts.ReloadPeriod, + LoadPath: []string{opts.FilePath}, + Loader: func(r io.Reader) (interface{}, error) { + var tenants map[string]NamespaceOverrides + decoder := yaml.NewDecoder(r) + if err := decoder.Decode(&tenants); err != nil { + return nil, err + } + return &Overrides{Namespaces: tenants}, nil + }, + } + + manager, err := runtimeconfig.New(config, "tenant-overrides", reg, logger) + if err != nil { + return nil, err + } + + return &OverridesService{ + manager: manager, + logger: logger, + tracer: tracer, + }, nil +} + +func (q *OverridesService) init(ctx context.Context) error { + return services.StartAndAwaitRunning(ctx, q.manager) +} + +func (q *OverridesService) stop(ctx context.Context) error { + return services.StopAndAwaitTerminated(ctx, q.manager) +} + +func (q *OverridesService) GetQuota(_ context.Context, nsr NamespacedResource) (ResourceQuota, error) { + if nsr.Namespace == "" || nsr.Resource == "" || nsr.Group == "" { + return ResourceQuota{}, fmt.Errorf("invalid namespaced resource: %+v", nsr) + } + + overrides, ok := q.manager.GetConfig().(*Overrides) + if !ok { + return ResourceQuota{}, fmt.Errorf("failed to get quota overrides from config manager") + } + + tenantId := strings.TrimPrefix(nsr.Namespace, "stacks-") + groupResource := nsr.Group + "/" + nsr.Resource + if tenantOverrides, ok := overrides.Namespaces[tenantId]; ok { + if resourceQuota, ok := tenantOverrides.Quotas[groupResource]; ok { + return resourceQuota, nil + } + } + + return ResourceQuota{Limit: DEFAULT_RESOURCE_LIMIT}, nil +} diff --git a/pkg/storage/unified/resource/quotas_test.go b/pkg/storage/unified/resource/quotas_test.go new file mode 100644 index 00000000000..97f9f355af6 --- /dev/null +++ b/pkg/storage/unified/resource/quotas_test.go @@ -0,0 +1,427 @@ +package resource + +import ( + "context" + "os" + "path/filepath" + "testing" + "time" + + 
"github.com/grafana/grafana/pkg/infra/log" + "github.com/grafana/grafana/pkg/infra/tracing" + "github.com/prometheus/client_golang/prometheus" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestNewQuotaService(t *testing.T) { + tests := []struct { + name string + opts ReloadOptions + setupFile func(t *testing.T) string + expectError bool + errorMsg string + }{ + { + name: "success with valid file", + opts: ReloadOptions{}, + setupFile: func(t *testing.T) string { + tmpFile := filepath.Join(t.TempDir(), "overrides.yaml") + content := `"123": + quotas: + grafana.dashboard.app/dashboards: + limit: 1500 +` + require.NoError(t, os.WriteFile(tmpFile, []byte(content), 0644)) + return tmpFile + }, + expectError: false, + }, + { + name: "success with custom reload period", + opts: ReloadOptions{ + ReloadPeriod: time.Minute, + }, + setupFile: func(t *testing.T) string { + tmpFile := filepath.Join(t.TempDir(), "overrides.yaml") + require.NoError(t, os.WriteFile(tmpFile, []byte{}, 0644)) + return tmpFile + }, + expectError: false, + }, + { + name: "error when file path is empty", + opts: ReloadOptions{ + FilePath: "", + }, + setupFile: func(t *testing.T) string { return "" }, + expectError: true, + errorMsg: "overrides file path is required", + }, + { + name: "error when file does not exist", + opts: ReloadOptions{ + FilePath: "/nonexistent/path/overrides.yaml", + }, + setupFile: func(t *testing.T) string { return "/nonexistent/path/overrides.yaml" }, + expectError: true, + errorMsg: "overrides file does not exist", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ctx := context.Background() + logger := log.NewNopLogger() + reg := prometheus.NewRegistry() + tcr := tracing.NewNoopTracerService() + + filePath := tt.setupFile(t) + if filePath != "" && tt.opts.FilePath == "" { + tt.opts.FilePath = filePath + } + + service, err := NewOverridesService(ctx, logger, reg, tcr, tt.opts) + + if tt.expectError { + 
require.Error(t, err) + assert.Contains(t, err.Error(), tt.errorMsg) + assert.Nil(t, service) + } else { + require.NoError(t, err) + assert.NotNil(t, service) + assert.NotNil(t, service.manager) + assert.NotNil(t, service.logger) + } + }) + } +} + +func TestQuotaService_ConfigReload(t *testing.T) { + ctx := context.Background() + logger := log.NewNopLogger() + reg := prometheus.NewRegistry() + tcr := tracing.NewNoopTracerService() + + // Create a temporary config file + tmpFile := filepath.Join(t.TempDir(), "overrides.yaml") + initialConfig := `"123": + quotas: + grafana.dashboard.app/dashboards: + limit: 1500 +` + require.NoError(t, os.WriteFile(tmpFile, []byte(initialConfig), 0644)) + + // Create service with a very short reload period + service, err := NewOverridesService(ctx, logger, reg, tcr, ReloadOptions{ + FilePath: tmpFile, + ReloadPeriod: 100 * time.Millisecond, // Very short reload period for testing + }) + require.NoError(t, err) + require.NotNil(t, service) + + // Initialize the service + err = service.init(ctx) + require.NoError(t, err) + defer func(service *OverridesService, ctx context.Context) { + err := service.stop(ctx) + require.NoError(t, err) + }(service, ctx) + + // Verify initial config + nsr := NamespacedResource{ + Namespace: "stacks-123", + Group: "grafana.dashboard.app", + Resource: "dashboards", + } + quota, err := service.GetQuota(ctx, nsr) + require.NoError(t, err) + assert.Equal(t, 1500, quota.Limit, "initial quota should be 1500") + + // Update the config file with new values + updatedConfig := `"123": + quotas: + grafana.dashboard.app/dashboards: + limit: 2500 +"456": + quotas: + grafana.folder.app/folders: + limit: 3000 +` + require.NoError(t, os.WriteFile(tmpFile, []byte(updatedConfig), 0644)) + + // Wait for the config to be reloaded (wait longer than reload period) + time.Sleep(500 * time.Millisecond) + + // Verify the config was updated for existing tenant + quota, err = service.GetQuota(ctx, nsr) + require.NoError(t, err) + 
assert.Equal(t, 2500, quota.Limit, "quota should be updated to 2500") + + // Verify new tenant config is also loaded + nsr2 := NamespacedResource{ + Namespace: "stacks-456", + Group: "grafana.folder.app", + Resource: "folders", + } + quota2, err := service.GetQuota(ctx, nsr2) + require.NoError(t, err) + assert.Equal(t, 3000, quota2.Limit, "new tenant quota should be 3000") +} + +func TestQuotaService_GetQuota(t *testing.T) { + tests := []struct { + name string + setupFile func(t *testing.T) string + nsr NamespacedResource + expectedLimit int + expectError bool + errorMsg string + description string + }{ + { + name: "returns custom quota for matching tenant and resource", + setupFile: func(t *testing.T) string { + tmpFile := filepath.Join(t.TempDir(), "overrides.yaml") + content := `"123": + quotas: + grafana.dashboard.app/dashboards: + limit: 1500 +` + require.NoError(t, os.WriteFile(tmpFile, []byte(content), 0644)) + return tmpFile + }, + nsr: NamespacedResource{ + Namespace: "stacks-123", + Group: "grafana.dashboard.app", + Resource: "dashboards", + }, + expectedLimit: 1500, + expectError: false, + description: "should return custom limit for matching tenant", + }, + { + name: "returns default quota when tenant not found", + setupFile: func(t *testing.T) string { + tmpFile := filepath.Join(t.TempDir(), "overrides.yaml") + content := `"123": + quotas: + grafana.dashboard.app/dashboards: + limit: 1500 +` + require.NoError(t, os.WriteFile(tmpFile, []byte(content), 0644)) + return tmpFile + }, + nsr: NamespacedResource{ + Namespace: "stacks-456", + Group: "grafana.dashboard.app", + Resource: "dashboards", + }, + expectedLimit: DEFAULT_RESOURCE_LIMIT, + expectError: false, + description: "should return default limit when tenant not found", + }, + { + name: "returns default quota when resource not found", + setupFile: func(t *testing.T) string { + tmpFile := filepath.Join(t.TempDir(), "overrides.yaml") + content := `"123": + quotas: + grafana.dashboard.app/dashboards: 
+ limit: 1500 +` + require.NoError(t, os.WriteFile(tmpFile, []byte(content), 0644)) + return tmpFile + }, + nsr: NamespacedResource{ + Namespace: "stacks-123", + Group: "grafana.folder.app", + Resource: "folders", + }, + expectedLimit: DEFAULT_RESOURCE_LIMIT, + expectError: false, + description: "should return default limit when resource not found", + }, + { + name: "handles namespace without stacks- prefix", + setupFile: func(t *testing.T) string { + tmpFile := filepath.Join(t.TempDir(), "overrides.yaml") + content := `"123": + quotas: + grafana.dashboard.app/dashboards: + limit: 1500 +` + require.NoError(t, os.WriteFile(tmpFile, []byte(content), 0644)) + return tmpFile + }, + nsr: NamespacedResource{ + Namespace: "123", + Group: "grafana.dashboard.app", + Resource: "dashboards", + }, + expectedLimit: 1500, + expectError: false, + description: "should handle namespace without stacks- prefix", + }, + { + name: "returns default quota when config is empty", + setupFile: func(t *testing.T) string { + tmpFile := filepath.Join(t.TempDir(), "overrides.yaml") + content := "" + require.NoError(t, os.WriteFile(tmpFile, []byte(content), 0644)) + return tmpFile + }, + nsr: NamespacedResource{ + Namespace: "stacks-123", + Group: "grafana.dashboard.app", + Resource: "dashboards", + }, + expectedLimit: DEFAULT_RESOURCE_LIMIT, + expectError: false, + description: "should return default limit when config is empty", + }, + { + name: "handles multiple resources for same tenant", + setupFile: func(t *testing.T) string { + tmpFile := filepath.Join(t.TempDir(), "overrides.yaml") + content := `"123": + quotas: + grafana.dashboard.app/dashboards: + limit: 1500 + grafana.folder.app/folders: + limit: 2500 +` + require.NoError(t, os.WriteFile(tmpFile, []byte(content), 0644)) + return tmpFile + }, + nsr: NamespacedResource{ + Namespace: "stacks-123", + Group: "grafana.folder.app", + Resource: "folders", + }, + expectedLimit: 2500, + expectError: false, + description: "should return correct 
limit for specific resource", + }, + { + name: "returns error when namespace is empty", + setupFile: func(t *testing.T) string { + tmpFile := filepath.Join(t.TempDir(), "overrides.yaml") + content := `"123": + quotas: + grafana.dashboard.app/dashboards: + limit: 1500 + grafana.folder.app/folders: + limit: 2500 +` + require.NoError(t, os.WriteFile(tmpFile, []byte(content), 0644)) + return tmpFile + }, + nsr: NamespacedResource{ + Namespace: "", + Group: "grafana.dashboard.app", + Resource: "dashboards", + }, + expectError: true, + errorMsg: "invalid namespaced resource", + description: "should return error when namespace is empty", + }, + { + name: "returns error when group is empty", + setupFile: func(t *testing.T) string { + tmpFile := filepath.Join(t.TempDir(), "overrides.yaml") + content := `"123": + quotas: + grafana.dashboard.app/dashboards: + limit: 1500 + grafana.folder.app/folders: + limit: 2500 +` + require.NoError(t, os.WriteFile(tmpFile, []byte(content), 0644)) + return tmpFile + }, + nsr: NamespacedResource{ + Namespace: "stacks-123", + Group: "", + Resource: "dashboards", + }, + expectError: true, + errorMsg: "invalid namespaced resource", + description: "should return error when group is empty", + }, + { + name: "returns error when resource is empty", + setupFile: func(t *testing.T) string { + tmpFile := filepath.Join(t.TempDir(), "overrides.yaml") + content := `"123": + quotas: + grafana.dashboard.app/dashboards: + limit: 1500 + grafana.folder.app/folders: + limit: 2500 +` + require.NoError(t, os.WriteFile(tmpFile, []byte(content), 0644)) + return tmpFile + }, + nsr: NamespacedResource{ + Namespace: "stacks-123", + Group: "grafana.dashboard.app", + Resource: "", + }, + expectError: true, + errorMsg: "invalid namespaced resource", + description: "should return error when resource is empty", + }, + { + name: "returns error when all fields are empty", + setupFile: func(t *testing.T) string { + tmpFile := filepath.Join(t.TempDir(), "overrides.yaml") + 
content := `"123": + quotas: + grafana.dashboard.app/dashboards: + limit: 1500 + grafana.folder.app/folders: + limit: 2500 +` + require.NoError(t, os.WriteFile(tmpFile, []byte(content), 0644)) + return tmpFile + }, + nsr: NamespacedResource{ + Namespace: "", + Group: "", + Resource: "", + }, + expectError: true, + errorMsg: "invalid namespaced resource", + description: "should return error when all fields are empty", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ctx := context.Background() + logger := log.NewNopLogger() + reg := prometheus.NewRegistry() + tcr := tracing.NewNoopTracerService() + opts := ReloadOptions{ + FilePath: tt.setupFile(t), + } + + service, err := NewOverridesService(ctx, logger, reg, tcr, opts) + require.NoError(t, err, "failed to create quota service") + require.NoError(t, service.init(ctx), "failed to initialize quota service") + t.Cleanup(func() { require.NoError(t, service.stop(ctx)) }) + + quota, err := service.GetQuota(ctx, tt.nsr) + + if tt.expectError { + require.Error(t, err, tt.description) + assert.Contains(t, err.Error(), tt.errorMsg, tt.description) + assert.Equal(t, ResourceQuota{}, quota, "should return empty quota on error") + } else { + require.NoError(t, err, tt.description) + assert.Equal(t, tt.expectedLimit, quota.Limit, tt.description) + } + }) + } +} diff --git a/pkg/storage/unified/resource/server.go b/pkg/storage/unified/resource/server.go index 4c0adddf3c8..dd091c6e2d4 100644 --- a/pkg/storage/unified/resource/server.go +++ b/pkg/storage/unified/resource/server.go @@ -14,6 +14,8 @@ import ( "github.com/google/uuid" "github.com/prometheus/client_golang/prometheus" "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" @@ -220,6 +222,9 @@ type ResourceServerOptions struct { // Search options Search SearchOptions + // Quota service + 
OverridesService *OverridesService + // Diagnostics Diagnostics resourcepb.DiagnosticsServer @@ -342,6 +347,7 @@ func NewResourceServer(opts ResourceServerOptions) (*server, error) { reg: opts.Reg, queue: opts.QOSQueue, queueConfig: opts.QOSConfig, + overridesService: opts.OverridesService, artificialSuccessfulWriteDelay: opts.Search.IndexMinUpdateInterval, } @@ -366,19 +372,20 @@ func NewResourceServer(opts ResourceServerOptions) (*server, error) { var _ ResourceServer = &server{} type server struct { - log log.Logger - backend StorageBackend - blob BlobSupport - secure secrets.InlineSecureValueSupport - search *searchSupport - diagnostics resourcepb.DiagnosticsServer - access claims.AccessClient - writeHooks WriteAccessHooks - lifecycle LifecycleHooks - now func() int64 - mostRecentRV atomic.Int64 // The most recent resource version seen by the server - storageMetrics *StorageMetrics - indexMetrics *BleveIndexMetrics + log log.Logger + backend StorageBackend + blob BlobSupport + secure secrets.InlineSecureValueSupport + search *searchSupport + diagnostics resourcepb.DiagnosticsServer + access claims.AccessClient + writeHooks WriteAccessHooks + lifecycle LifecycleHooks + now func() int64 + mostRecentRV atomic.Int64 // The most recent resource version seen by the server + storageMetrics *StorageMetrics + indexMetrics *BleveIndexMetrics + overridesService *OverridesService // Background watch task -- this has permissions for everything ctx context.Context @@ -411,6 +418,11 @@ func (s *server) Init(ctx context.Context) error { } } + // initialize tenant overrides service + if s.initErr == nil && s.overridesService != nil { + s.initErr = s.overridesService.init(ctx) + } + // initialize the search index if s.initErr == nil && s.search != nil { s.initErr = s.search.init(ctx) @@ -444,6 +456,13 @@ func (s *server) Stop(ctx context.Context) error { s.search.stop() } + if s.overridesService != nil { + if err := s.overridesService.stop(ctx); err != nil { + stopFailed = true 
+ s.initErr = fmt.Errorf("service stopped with error: %w", err) + } + } + // Stops the streaming s.cancel() @@ -647,6 +666,13 @@ func (s *server) Create(ctx context.Context, req *resourcepb.CreateRequest) (*re ctx, span := tracer.Start(ctx, "resource.server.Create") defer span.End() + // check quotas and log for now; nil-safe getters are used because the key is only validated below + s.checkQuota(ctx, NamespacedResource{ + Namespace: req.GetKey().GetNamespace(), + Group: req.GetKey().GetGroup(), + Resource: req.GetKey().GetResource(), + }) + if r := verifyRequestKey(req.Key); r != nil { return nil, fmt.Errorf("invalid request key: %s", r.Message) } @@ -1549,3 +1575,34 @@ func (s *server) RebuildIndexes(ctx context.Context, req *resourcepb.RebuildInde return s.search.RebuildIndexes(ctx, req) } + +// checkQuota records a "checkQuota" event on the current span and, when an overrides service is configured, +// compares the quota for nsr with the first resource stats entry returned by the backend. +// It never rejects the request: lookup failures and exceeded quotas are only logged. +func (s *server) checkQuota(ctx context.Context, nsr NamespacedResource) { + span := trace.SpanFromContext(ctx) + span.AddEvent("checkQuota", trace.WithAttributes( + attribute.String("namespace", nsr.Namespace), + attribute.String("group", nsr.Group), + attribute.String("resource", nsr.Resource), + )) + + if s.overridesService == nil { + return + } + + quota, err := s.overridesService.GetQuota(ctx, nsr) + if err != nil { + s.log.FromContext(ctx).Error("failed to get quota for resource", "namespace", nsr.Namespace, "group", nsr.Group, "resource", nsr.Resource, "error", err) + return + } + + stats, err := s.backend.GetResourceStats(ctx, nsr, 0) + if err != nil { + s.log.FromContext(ctx).Error("failed to get resource stats for quota checking", "namespace", nsr.Namespace, "group", nsr.Group, "resource", nsr.Resource, "error", err) + return + } + if len(stats) > 0 && stats[0].Count >= int64(quota.Limit) { + s.log.FromContext(ctx).Info("Quota exceeded on create", "namespace", nsr.Namespace, "group", nsr.Group, "resource", nsr.Resource, "quota", quota.Limit, "count", stats[0].Count, "stats_resource", stats[0].Resource) + } +} diff --git a/pkg/storage/unified/sql/backend.go b/pkg/storage/unified/sql/backend.go index 8de43f318b0..0abc2fd7329 100644 --- 
a/pkg/storage/unified/sql/backend.go +++ b/pkg/storage/unified/sql/backend.go @@ -14,6 +14,7 @@ import ( "github.com/jackc/pgx/v5/pgconn" "github.com/lib/pq" "github.com/prometheus/client_golang/prometheus" + "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/trace" "go.opentelemetry.io/otel/trace/noop" "go.uber.org/atomic" @@ -263,7 +264,11 @@ func (b *backend) Stop(_ context.Context) error { // GetResourceStats implements Backend. func (b *backend) GetResourceStats(ctx context.Context, nsr resource.NamespacedResource, minCount int) ([]resource.ResourceStats, error) { - ctx, span := b.tracer.Start(ctx, tracePrefix+"GetResourceStats") + ctx, span := b.tracer.Start(ctx, tracePrefix+"GetResourceStats", trace.WithAttributes( + attribute.String("namespace", nsr.Namespace), + attribute.String("group", nsr.Group), + attribute.String("resource", nsr.Resource), + )) defer span.End() req := &sqlStatsRequest{ diff --git a/pkg/storage/unified/sql/server.go b/pkg/storage/unified/sql/server.go index 1c2b02d8637..f91baa7a695 100644 --- a/pkg/storage/unified/sql/server.go +++ b/pkg/storage/unified/sql/server.go @@ -30,19 +30,20 @@ type QOSEnqueueDequeuer interface { // ServerOptions contains the options for creating a new ResourceServer type ServerOptions struct { - Backend resource.StorageBackend - DB infraDB.DB - Cfg *setting.Cfg - Tracer trace.Tracer - Reg prometheus.Registerer - AccessClient types.AccessClient - SearchOptions resource.SearchOptions - StorageMetrics *resource.StorageMetrics - IndexMetrics *resource.BleveIndexMetrics - Features featuremgmt.FeatureToggles - QOSQueue QOSEnqueueDequeuer - SecureValues secrets.InlineSecureValueSupport - OwnsIndexFn func(key resource.NamespacedResource) (bool, error) + Backend resource.StorageBackend + OverridesService *resource.OverridesService + DB infraDB.DB + Cfg *setting.Cfg + Tracer trace.Tracer + Reg prometheus.Registerer + AccessClient types.AccessClient + SearchOptions resource.SearchOptions + StorageMetrics 
*resource.StorageMetrics + IndexMetrics *resource.BleveIndexMetrics + Features featuremgmt.FeatureToggles + QOSQueue QOSEnqueueDequeuer + SecureValues secrets.InlineSecureValueSupport + OwnsIndexFn func(key resource.NamespacedResource) (bool, error) } func NewResourceServer(opts ServerOptions) (resource.ResourceServer, error) { @@ -119,6 +120,7 @@ func NewResourceServer(opts ServerOptions) (resource.ResourceServer, error) { serverOptions.IndexMetrics = opts.IndexMetrics serverOptions.QOSQueue = opts.QOSQueue serverOptions.OwnsIndexFn = opts.OwnsIndexFn + serverOptions.OverridesService = opts.OverridesService return resource.NewResourceServer(serverOptions) } diff --git a/pkg/storage/unified/sql/service.go b/pkg/storage/unified/sql/service.go index 00e2ae73750..334c2dfee76 100644 --- a/pkg/storage/unified/sql/service.go +++ b/pkg/storage/unified/sql/service.go @@ -279,6 +279,18 @@ func (s *service) starting(ctx context.Context) error { QOSQueue: s.queue, OwnsIndexFn: s.OwnsIndex, } + + if s.cfg.OverridesFilePath != "" { + overridesSvc, err := resource.NewOverridesService(ctx, s.log, s.reg, s.tracing, resource.ReloadOptions{ + FilePath: s.cfg.OverridesFilePath, + ReloadPeriod: s.cfg.OverridesReloadInterval, + }) + if err != nil { + return err + } + serverOptions.OverridesService = overridesSvc + } + server, err := NewResourceServer(serverOptions) if err != nil { return err