Provisioning: fix race condition in usage metrics collection (#108289)
* Register usage stats after the lister is created. For enterprise, where metrics are collected more frequently and in different ways, collection was happening before the lister was initialized. * Move usage collection to a separate package.
This commit is contained in:
committed by
GitHub
parent
046134db22
commit
446054a61d
@@ -54,6 +54,7 @@ import (
|
||||
"github.com/grafana/grafana/pkg/registry/apis/provisioning/resources/signature"
|
||||
"github.com/grafana/grafana/pkg/registry/apis/provisioning/safepath"
|
||||
"github.com/grafana/grafana/pkg/registry/apis/provisioning/secrets"
|
||||
"github.com/grafana/grafana/pkg/registry/apis/provisioning/usage"
|
||||
"github.com/grafana/grafana/pkg/services/apiserver"
|
||||
"github.com/grafana/grafana/pkg/services/apiserver/builder"
|
||||
"github.com/grafana/grafana/pkg/services/featuremgmt"
|
||||
@@ -74,7 +75,8 @@ var (
|
||||
)
|
||||
|
||||
type APIBuilder struct {
|
||||
features featuremgmt.FeatureToggles
|
||||
features featuremgmt.FeatureToggles
|
||||
usageStats usagestats.Service
|
||||
|
||||
tracer tracing.Tracer
|
||||
getter rest.Getter
|
||||
@@ -117,6 +119,7 @@ func NewAPIBuilder(
|
||||
ghFactory *github.Factory,
|
||||
legacyMigrator legacy.LegacyMigrator,
|
||||
storageStatus dualwrite.Service,
|
||||
usageStats usagestats.Service,
|
||||
repositorySecrets secrets.RepositorySecrets,
|
||||
access authlib.AccessChecker,
|
||||
tracer tracing.Tracer,
|
||||
@@ -134,6 +137,7 @@ func NewAPIBuilder(
|
||||
b := &APIBuilder{
|
||||
mutators: mutators,
|
||||
tracer: tracer,
|
||||
usageStats: usageStats,
|
||||
localFileResolver: local,
|
||||
features: features,
|
||||
ghFactory: ghFactory,
|
||||
@@ -184,7 +188,7 @@ func RegisterAPIService(
|
||||
access authlib.AccessClient,
|
||||
legacyMigrator legacy.LegacyMigrator,
|
||||
storageStatus dualwrite.Service,
|
||||
usageStatsService usagestats.Service,
|
||||
usageStats usagestats.Service,
|
||||
repositorySecrets secrets.RepositorySecrets,
|
||||
tracer tracing.Tracer,
|
||||
extraBuilders []ExtraBuilder,
|
||||
@@ -202,13 +206,13 @@ func RegisterAPIService(
|
||||
filepath.Join(cfg.DataPath, "clone"), // where repositories are cloned (temporarialy for now)
|
||||
configProvider, ghFactory,
|
||||
legacyMigrator, storageStatus,
|
||||
usageStats,
|
||||
repositorySecrets,
|
||||
access,
|
||||
tracer,
|
||||
extraBuilders,
|
||||
)
|
||||
apiregistration.RegisterAPI(builder)
|
||||
usageStatsService.RegisterMetricsFunc(builder.collectProvisioningStats)
|
||||
return builder, nil
|
||||
}
|
||||
|
||||
@@ -596,6 +600,10 @@ func (b *APIBuilder) GetPostStartHooks() (map[string]genericapiserver.PostStartH
|
||||
|
||||
b.repositoryLister = repoInformer.Lister()
|
||||
|
||||
// Create the repository resources factory
|
||||
usageMetricCollector := usage.MetricCollector(b.tracer, b.repositoryLister, b.unified)
|
||||
b.usageStats.RegisterMetricsFunc(usageMetricCollector)
|
||||
|
||||
stageIfPossible := repository.WrapWithStageAndPushIfPossible
|
||||
exportWorker := export.NewExportWorker(
|
||||
b.clients,
|
||||
|
||||
@@ -1,78 +0,0 @@
|
||||
package provisioning
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
"go.opentelemetry.io/otel/attribute"
|
||||
"go.opentelemetry.io/otel/codes"
|
||||
"k8s.io/apimachinery/pkg/labels"
|
||||
"k8s.io/apiserver/pkg/endpoints/request"
|
||||
|
||||
"github.com/grafana/grafana/pkg/apimachinery/identity"
|
||||
"github.com/grafana/grafana/pkg/storage/unified/resourcepb"
|
||||
)
|
||||
|
||||
func (b *APIBuilder) collectProvisioningStats(ctx context.Context) (metrics map[string]any, err error) {
|
||||
ctx, span := b.tracer.Start(ctx, "Provisioning.Usage.collectProvisioningStats")
|
||||
defer func() {
|
||||
span.SetStatus(codes.Error, fmt.Sprintf("failed to fetch provisioning usage stats: %v", err))
|
||||
span.End()
|
||||
}()
|
||||
|
||||
m := map[string]any{}
|
||||
if b.unified == nil {
|
||||
span.SetStatus(codes.Ok, "unified storage is not available")
|
||||
return m, nil
|
||||
}
|
||||
|
||||
// FIXME: hardcoded to "default" for now -- it works for single tenant deployments
|
||||
// we could discover the set of valid namespaces, but that would count everything for
|
||||
// each instance in cloud.
|
||||
ns := "default"
|
||||
ctx, _, err = identity.WithProvisioningIdentity(ctx, ns)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
ctx = request.WithNamespace(ctx, ns)
|
||||
|
||||
// FIXME: hardcoded to "default" for now -- it works for single tenant deployments
|
||||
// we could discover the set of valid namespaces, but that would count everything for
|
||||
// each instance in cloud.
|
||||
//
|
||||
// We could get namespaces from the list of repos below, but that could be zero
|
||||
// while we still have resources managed by terraform, etc
|
||||
count, err := b.unified.CountManagedObjects(ctx, &resourcepb.CountManagedObjectsRequest{
|
||||
Namespace: ns,
|
||||
})
|
||||
if err != nil {
|
||||
return m, fmt.Errorf("count managed objects: %w", err)
|
||||
}
|
||||
counts := make(map[string]int, 10)
|
||||
for _, v := range count.Items {
|
||||
counts[v.Kind] = counts[v.Kind] + int(v.Count)
|
||||
}
|
||||
|
||||
span.SetAttributes(attribute.Int("totalManagedObjectsCount", len(count.Items)))
|
||||
for k, v := range counts {
|
||||
m[fmt.Sprintf("stats.managed_by.%s.count", k)] = v
|
||||
}
|
||||
|
||||
// Inspect all configs
|
||||
repos, err := b.repositoryLister.List(labels.Everything())
|
||||
if err != nil {
|
||||
return m, fmt.Errorf("list repositories: %w", err)
|
||||
}
|
||||
clear(counts)
|
||||
for _, repo := range repos {
|
||||
counts[string(repo.Spec.Type)] = counts[string(repo.Spec.Type)] + 1
|
||||
}
|
||||
|
||||
span.SetAttributes(attribute.Int("repositoryCount", len(repos)))
|
||||
// Count how many items of each repository type
|
||||
for k, v := range counts {
|
||||
m[fmt.Sprintf("stats.repository.%s.count", k)] = v
|
||||
}
|
||||
|
||||
return m, nil
|
||||
}
|
||||
@@ -0,0 +1,85 @@
|
||||
package usage
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
"go.opentelemetry.io/otel/attribute"
|
||||
"go.opentelemetry.io/otel/codes"
|
||||
"k8s.io/apimachinery/pkg/labels"
|
||||
"k8s.io/apiserver/pkg/endpoints/request"
|
||||
|
||||
"github.com/grafana/grafana/pkg/apimachinery/identity"
|
||||
listers "github.com/grafana/grafana/pkg/generated/listers/provisioning/v0alpha1"
|
||||
"github.com/grafana/grafana/pkg/infra/tracing"
|
||||
"github.com/grafana/grafana/pkg/infra/usagestats"
|
||||
"github.com/grafana/grafana/pkg/storage/unified/resource"
|
||||
"github.com/grafana/grafana/pkg/storage/unified/resourcepb"
|
||||
)
|
||||
|
||||
func MetricCollector(tracer tracing.Tracer, repositoryLister listers.RepositoryLister, unified resource.ResourceClient) usagestats.MetricsFunc {
|
||||
return func(ctx context.Context) (metrics map[string]any, err error) {
|
||||
ctx, span := tracer.Start(ctx, "Provisioning.Usage.collectProvisioningStats")
|
||||
defer func() {
|
||||
span.SetStatus(codes.Error, fmt.Sprintf("failed to fetch provisioning usage stats: %v", err))
|
||||
span.End()
|
||||
}()
|
||||
|
||||
m := map[string]any{}
|
||||
if unified == nil {
|
||||
// FIXME: does this case make any sense? no unified storage -> no game
|
||||
span.SetStatus(codes.Ok, "unified storage is not available")
|
||||
return m, nil
|
||||
}
|
||||
|
||||
// FIXME: hardcoded to "default" for now -- it works for single tenant deployments
|
||||
// we could discover the set of valid namespaces, but that would count everything for
|
||||
// each instance in cloud.
|
||||
ns := "default"
|
||||
ctx, _, err = identity.WithProvisioningIdentity(ctx, ns)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
ctx = request.WithNamespace(ctx, ns)
|
||||
|
||||
// FIXME: hardcoded to "default" for now -- it works for single tenant deployments
|
||||
// we could discover the set of valid namespaces, but that would count everything for
|
||||
// each instance in cloud.
|
||||
//
|
||||
// We could get namespaces from the list of repos below, but that could be zero
|
||||
// while we still have resources managed by terraform, etc
|
||||
count, err := unified.CountManagedObjects(ctx, &resourcepb.CountManagedObjectsRequest{
|
||||
Namespace: ns,
|
||||
})
|
||||
if err != nil {
|
||||
return m, fmt.Errorf("count managed objects: %w", err)
|
||||
}
|
||||
counts := make(map[string]int, 10)
|
||||
for _, v := range count.Items {
|
||||
counts[v.Kind] = counts[v.Kind] + int(v.Count)
|
||||
}
|
||||
|
||||
span.SetAttributes(attribute.Int("totalManagedObjectsCount", len(count.Items)))
|
||||
for k, v := range counts {
|
||||
m[fmt.Sprintf("stats.managed_by.%s.count", k)] = v
|
||||
}
|
||||
|
||||
// Inspect all configs
|
||||
repos, err := repositoryLister.List(labels.Everything())
|
||||
if err != nil {
|
||||
return m, fmt.Errorf("list repositories: %w", err)
|
||||
}
|
||||
clear(counts)
|
||||
for _, repo := range repos {
|
||||
counts[string(repo.Spec.Type)] = counts[string(repo.Spec.Type)] + 1
|
||||
}
|
||||
|
||||
span.SetAttributes(attribute.Int("repositoryCount", len(repos)))
|
||||
// Count how many items of each repository type
|
||||
for k, v := range counts {
|
||||
m[fmt.Sprintf("stats.repository.%s.count", k)] = v
|
||||
}
|
||||
|
||||
return m, nil
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user