Provisioning: Add metrics to jobs (#111447)

This commit is contained in:
Stephanie Hingtgen
2025-09-22 13:05:34 -06:00
committed by GitHub
parent 172e040065
commit b02d2762f5
12 changed files with 316 additions and 98 deletions
@@ -8,26 +8,26 @@ import (
"k8s.io/apimachinery/pkg/runtime/schema"
"github.com/grafana/grafana-app-sdk/logging"
provisioning "github.com/grafana/grafana/apps/provisioning/pkg/apis/provisioning/v0alpha1"
"github.com/grafana/grafana/apps/provisioning/pkg/repository"
"github.com/grafana/grafana/pkg/registry/apis/provisioning/jobs"
"github.com/grafana/grafana/pkg/registry/apis/provisioning/resources"
"github.com/prometheus/client_golang/prometheus"
)
type Worker struct {
syncWorker jobs.Worker
wrapFn repository.WrapWithStageFn
resourcesFactory resources.RepositoryResourcesFactory
registry prometheus.Registerer
metrics jobs.JobMetrics
}
func NewWorker(syncWorker jobs.Worker, wrapFn repository.WrapWithStageFn, resourcesFactory resources.RepositoryResourcesFactory, registry prometheus.Registerer) *Worker {
func NewWorker(syncWorker jobs.Worker, wrapFn repository.WrapWithStageFn, resourcesFactory resources.RepositoryResourcesFactory, metrics jobs.JobMetrics) *Worker {
return &Worker{
syncWorker: syncWorker,
wrapFn: wrapFn,
resourcesFactory: resourcesFactory,
registry: registry,
metrics: metrics,
}
}
@@ -40,8 +40,15 @@ func (w *Worker) Process(ctx context.Context, repo repository.Repository, job pr
return errors.New("missing delete settings")
}
logger := logging.FromContext(ctx).With("job", job.GetName(), "namespace", job.GetNamespace())
opts := *job.Spec.Delete
paths := opts.Paths
start := time.Now()
outcome := jobs.ErrorOutcome
resourcesDeleted := 0
defer func() {
w.metrics.RecordJob(string(provisioning.JobActionDelete), outcome, resourcesDeleted, time.Since(start).Seconds())
}()
progress.SetTotal(ctx, len(paths)+len(opts.Resources))
progress.StrictMaxErrors(1) // Fail fast on any error during deletion
@@ -49,6 +56,7 @@ func (w *Worker) Process(ctx context.Context, repo repository.Repository, job pr
fn := func(repo repository.Repository, _ bool) error {
rw, ok := repo.(repository.ReaderWriter)
if !ok {
logger.Error("delete job submitted targeting repository that is not a ReaderWriter")
return errors.New("delete job submitted targeting repository that is not a ReaderWriter")
}
@@ -56,6 +64,7 @@ func (w *Worker) Process(ctx context.Context, repo repository.Repository, job pr
if len(opts.Resources) > 0 {
resolvedPaths, err := w.resolveResourcesToPaths(ctx, rw, progress, opts.Resources)
if err != nil {
logger.Error("failed to resolve resource paths", "error", err)
return err
}
paths = append(paths, resolvedPaths...)
@@ -78,6 +87,7 @@ func (w *Worker) Process(ctx context.Context, repo repository.Repository, job pr
err := w.wrapFn(ctx, repo, stageOptions, fn)
if err != nil {
logger.Error("failed to delete files from repository", "error", err)
return fmt.Errorf("delete files from repository: %w", err)
}
@@ -104,10 +114,17 @@ func (w *Worker) Process(ctx context.Context, repo repository.Repository, job pr
}
if err := w.syncWorker.Process(ctx, repo, syncJob, progress); err != nil {
logger.Error("failed to pull resources", "error", err)
return fmt.Errorf("pull resources: %w", err)
}
}
outcome = jobs.SuccessOutcome
jobStatus := progress.Complete(ctx, nil)
for _, summary := range jobStatus.Summary {
resourcesDeleted += int(summary.Delete)
}
return nil
}
@@ -38,6 +38,7 @@ func (s *simpleRepository) Test(ctx context.Context) (*v0alpha1.TestResults, err
}
func TestDeleteWorker_IsSupported(t *testing.T) {
metrics := jobs.RegisterJobMetrics(prometheus.NewPedanticRegistry())
tests := []struct {
name string
job v0alpha1.Job
@@ -74,7 +75,7 @@ func TestDeleteWorker_IsSupported(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
worker := NewWorker(nil, nil, nil, prometheus.DefaultRegisterer)
worker := NewWorker(nil, nil, nil, metrics)
result := worker.IsSupported(context.Background(), tt.job)
require.Equal(t, tt.expected, result)
})
@@ -88,7 +89,7 @@ func TestDeleteWorker_ProcessMissingDeleteSettings(t *testing.T) {
},
}
worker := NewWorker(nil, nil, nil, prometheus.DefaultRegisterer)
worker := NewWorker(nil, nil, nil, jobs.RegisterJobMetrics(prometheus.NewPedanticRegistry()))
err := worker.Process(context.Background(), nil, job, nil)
require.EqualError(t, err, "missing delete settings")
}
@@ -117,7 +118,7 @@ func TestDeleteWorker_ProcessNotReaderWriter(t *testing.T) {
mockProgress.On("SetTotal", mock.Anything, 1).Return()
mockProgress.On("StrictMaxErrors", 1).Return()
worker := NewWorker(nil, mockWrapFn.Execute, nil, prometheus.DefaultRegisterer)
worker := NewWorker(nil, mockWrapFn.Execute, nil, jobs.RegisterJobMetrics(prometheus.NewPedanticRegistry()))
err := worker.Process(context.Background(), mockRepo, job, mockProgress)
require.EqualError(t, err, "delete files from repository: delete job submitted targeting repository that is not a ReaderWriter")
}
@@ -140,7 +141,7 @@ func TestDeleteWorker_ProcessWrapFnError(t *testing.T) {
mockProgress.On("SetTotal", mock.Anything, 1).Return()
mockProgress.On("StrictMaxErrors", 1).Return()
worker := NewWorker(nil, mockWrapFn.Execute, nil, prometheus.DefaultRegisterer)
worker := NewWorker(nil, mockWrapFn.Execute, nil, jobs.RegisterJobMetrics(prometheus.NewPedanticRegistry()))
err := worker.Process(context.Background(), mockRepo, job, mockProgress)
require.EqualError(t, err, "delete files from repository: stage failed")
}
@@ -177,6 +178,7 @@ func TestDeleteWorker_ProcessDeleteFilesSuccess(t *testing.T) {
mockProgress.On("SetMessage", mock.Anything, "Deleting test/path1").Return()
mockProgress.On("SetMessage", mock.Anything, "Deleting test/path2").Return()
mockProgress.On("TooManyErrors").Return(nil).Twice()
mockProgress.On("Complete", mock.Anything, mock.Anything).Return(v0alpha1.JobStatus{})
mockRepo.On("Delete", mock.Anything, "test/path1", "main", "Delete test/path1").Return(nil)
mockRepo.On("Delete", mock.Anything, "test/path2", "main", "Delete test/path2").Return(nil)
@@ -188,7 +190,7 @@ func TestDeleteWorker_ProcessDeleteFilesSuccess(t *testing.T) {
return result.Path == "test/path2" && result.Action == repository.FileActionDeleted && result.Error == nil
})).Return()
worker := NewWorker(nil, mockWrapFn.Execute, nil, prometheus.DefaultRegisterer)
worker := NewWorker(nil, mockWrapFn.Execute, nil, jobs.RegisterJobMetrics(prometheus.NewPedanticRegistry()))
err := worker.Process(context.Background(), mockRepo, job, mockProgress)
require.NoError(t, err)
}
@@ -226,7 +228,7 @@ func TestDeleteWorker_ProcessDeleteFilesWithError(t *testing.T) {
})).Return()
mockProgress.On("TooManyErrors").Return(errors.New("too many errors"))
worker := NewWorker(nil, mockWrapFn.Execute, nil, prometheus.DefaultRegisterer)
worker := NewWorker(nil, mockWrapFn.Execute, nil, jobs.RegisterJobMetrics(prometheus.NewPedanticRegistry()))
err := worker.Process(context.Background(), mockRepo, job, mockProgress)
require.EqualError(t, err, "delete files from repository: too many errors")
}
@@ -256,6 +258,7 @@ func TestDeleteWorker_ProcessWithSyncWorker(t *testing.T) {
mockProgress.On("StrictMaxErrors", 1).Return()
mockProgress.On("SetMessage", mock.Anything, "Deleting test/path").Return()
mockProgress.On("TooManyErrors").Return(nil)
mockProgress.On("Complete", mock.Anything, mock.Anything).Return(v0alpha1.JobStatus{})
mockRepo.On("Delete", mock.Anything, "test/path", "", "Delete test/path").Return(nil)
@@ -270,7 +273,7 @@ func TestDeleteWorker_ProcessWithSyncWorker(t *testing.T) {
return syncJob.Spec.Pull != nil && !syncJob.Spec.Pull.Incremental
}), mockProgress).Return(nil)
worker := NewWorker(mockSyncWorker, mockWrapFn.Execute, nil, prometheus.DefaultRegisterer)
worker := NewWorker(mockSyncWorker, mockWrapFn.Execute, nil, jobs.RegisterJobMetrics(prometheus.NewPedanticRegistry()))
err := worker.Process(context.Background(), mockRepo, job, mockProgress)
require.NoError(t, err)
}
@@ -310,7 +313,7 @@ func TestDeleteWorker_ProcessSyncWorkerError(t *testing.T) {
syncError := errors.New("sync failed")
mockSyncWorker.On("Process", mock.Anything, mockRepo, mock.Anything, mockProgress).Return(syncError)
worker := NewWorker(mockSyncWorker, mockWrapFn.Execute, nil, prometheus.DefaultRegisterer)
worker := NewWorker(mockSyncWorker, mockWrapFn.Execute, nil, jobs.RegisterJobMetrics(prometheus.NewPedanticRegistry()))
err := worker.Process(context.Background(), mockRepo, job, mockProgress)
require.EqualError(t, err, "pull resources: sync failed")
}
@@ -379,7 +382,7 @@ func TestDeleteWorker_deleteFiles(t *testing.T) {
}
}
worker := NewWorker(nil, nil, nil, prometheus.DefaultRegisterer)
worker := NewWorker(nil, nil, nil, jobs.RegisterJobMetrics(prometheus.NewPedanticRegistry()))
err := worker.deleteFiles(context.Background(), mockRepo, mockProgress, opts, tt.paths...)
if tt.expectedError != "" {
@@ -457,6 +460,7 @@ func TestDeleteWorker_ProcessWithResourceRefs(t *testing.T) {
mockProgress.On("SetMessage", mock.Anything, "Deleting dashboards/test-dashboard.json").Return()
mockProgress.On("SetMessage", mock.Anything, "Deleting folders/test-folder.json").Return()
mockProgress.On("TooManyErrors").Return(nil).Times(3)
mockProgress.On("Complete", mock.Anything, mock.Anything).Return(v0alpha1.JobStatus{})
// Mock file deletions
mockRepo.On("Delete", mock.Anything, "test/path1", "main", "Delete test/path1").Return(nil)
@@ -474,7 +478,7 @@ func TestDeleteWorker_ProcessWithResourceRefs(t *testing.T) {
return result.Path == "folders/test-folder.json" && result.Action == repository.FileActionDeleted && result.Error == nil
})).Return()
worker := NewWorker(nil, mockWrapFn.Execute, mockResourcesFactory, prometheus.DefaultRegisterer)
worker := NewWorker(nil, mockWrapFn.Execute, mockResourcesFactory, jobs.RegisterJobMetrics(prometheus.NewPedanticRegistry()))
err := worker.Process(context.Background(), mockRepo, job, mockProgress)
require.NoError(t, err)
@@ -525,6 +529,7 @@ func TestDeleteWorker_ProcessResourceRefsOnly(t *testing.T) {
mockProgress.On("SetMessage", mock.Anything, "Finding path for resource dashboard.grafana.app/Dashboard/test-dashboard").Return()
mockProgress.On("SetMessage", mock.Anything, "Deleting dashboards/test-dashboard.json").Return()
mockProgress.On("TooManyErrors").Return(nil)
mockProgress.On("Complete", mock.Anything, mock.Anything).Return(v0alpha1.JobStatus{})
mockRepo.On("Delete", mock.Anything, "dashboards/test-dashboard.json", "main", "Delete dashboards/test-dashboard.json").Return(nil)
@@ -532,7 +537,7 @@ func TestDeleteWorker_ProcessResourceRefsOnly(t *testing.T) {
return result.Path == "dashboards/test-dashboard.json" && result.Action == repository.FileActionDeleted && result.Error == nil
})).Return()
worker := NewWorker(nil, mockWrapFn.Execute, mockResourcesFactory, prometheus.DefaultRegisterer)
worker := NewWorker(nil, mockWrapFn.Execute, mockResourcesFactory, jobs.RegisterJobMetrics(prometheus.NewPedanticRegistry()))
err := worker.Process(context.Background(), mockRepo, job, mockProgress)
require.NoError(t, err)
}
@@ -578,6 +583,7 @@ func TestDeleteWorker_ProcessResourceResolutionError(t *testing.T) {
mockProgress.On("StrictMaxErrors", 1).Return()
mockProgress.On("SetMessage", mock.Anything, "Resolving resource paths").Return()
mockProgress.On("SetMessage", mock.Anything, "Finding path for resource dashboard.grafana.app/Dashboard/nonexistent-dashboard").Return()
mockProgress.On("Complete", mock.Anything, mock.Anything).Return(v0alpha1.JobStatus{})
// Expect error to be recorded, not thrown
mockProgress.On("Record", mock.Anything, mock.MatchedBy(func(result jobs.JobResourceResult) bool {
@@ -597,7 +603,7 @@ func TestDeleteWorker_ProcessResourceResolutionError(t *testing.T) {
return syncJob.Spec.Pull != nil && !syncJob.Spec.Pull.Incremental
}), mockProgress).Return(nil)
worker := NewWorker(mockSyncWorker, mockWrapFn.Execute, mockResourcesFactory, prometheus.DefaultRegisterer)
worker := NewWorker(mockSyncWorker, mockWrapFn.Execute, mockResourcesFactory, jobs.RegisterJobMetrics(prometheus.NewPedanticRegistry()))
err := worker.Process(context.Background(), mockRepo, job, mockProgress)
require.NoError(t, err) // Should succeed even with resource resolution error
}
@@ -636,7 +642,7 @@ func TestDeleteWorker_ProcessResourcesFactoryError(t *testing.T) {
mockProgress.On("StrictMaxErrors", 1).Return()
mockProgress.On("SetMessage", mock.Anything, "Resolving resource paths").Return()
worker := NewWorker(nil, mockWrapFn.Execute, mockResourcesFactory, prometheus.DefaultRegisterer)
worker := NewWorker(nil, mockWrapFn.Execute, mockResourcesFactory, jobs.RegisterJobMetrics(prometheus.NewPedanticRegistry()))
err := worker.Process(context.Background(), mockRepo, job, mockProgress)
require.EqualError(t, err, "delete files from repository: create repository resources client: failed to create repository resources client")
}
@@ -672,7 +678,7 @@ func TestDeleteWorker_ProcessResourceRefsNotReaderWriter(t *testing.T) {
mockProgress.On("SetTotal", mock.Anything, 1).Return()
mockProgress.On("StrictMaxErrors", 1).Return()
worker := NewWorker(nil, mockWrapFn.Execute, mockResourcesFactory, prometheus.DefaultRegisterer)
worker := NewWorker(nil, mockWrapFn.Execute, mockResourcesFactory, jobs.RegisterJobMetrics(prometheus.NewPedanticRegistry()))
err := worker.Process(context.Background(), mockRepo, job, mockProgress)
require.EqualError(t, err, "delete files from repository: delete job submitted targeting repository that is not a ReaderWriter")
}
@@ -725,7 +731,7 @@ func TestDeleteWorker_ProcessResourceResolutionTooManyErrors(t *testing.T) {
})).Return()
mockProgress.On("TooManyErrors").Return(errors.New("too many errors"))
worker := NewWorker(nil, mockWrapFn.Execute, mockResourcesFactory, prometheus.DefaultRegisterer)
worker := NewWorker(nil, mockWrapFn.Execute, mockResourcesFactory, jobs.RegisterJobMetrics(prometheus.NewPedanticRegistry()))
err := worker.Process(context.Background(), mockRepo, job, mockProgress)
require.EqualError(t, err, "delete files from repository: too many errors")
}
@@ -801,7 +807,7 @@ func TestDeleteWorker_ProcessMixedResourcesWithPartialFailure(t *testing.T) {
mockProgress.On("SetMessage", mock.Anything, "Finding path for resource folder.grafana.app/Folder/valid-folder").Return()
mockProgress.On("SetMessage", mock.Anything, "Deleting dashboards/valid-dashboard.json").Return()
mockProgress.On("SetMessage", mock.Anything, "Deleting folders/valid-folder.json").Return()
mockProgress.On("Complete", mock.Anything, mock.Anything).Return(v0alpha1.JobStatus{})
// Record the error for the failed resource
mockProgress.On("Record", mock.Anything, mock.MatchedBy(func(result jobs.JobResourceResult) bool {
return result.Name == "nonexistent-dashboard" && result.Error != nil
@@ -822,7 +828,7 @@ func TestDeleteWorker_ProcessMixedResourcesWithPartialFailure(t *testing.T) {
return result.Path == "folders/valid-folder.json" && result.Error == nil
})).Return()
worker := NewWorker(nil, mockWrapFn.Execute, mockResourcesFactory, prometheus.DefaultRegisterer)
worker := NewWorker(nil, mockWrapFn.Execute, mockResourcesFactory, jobs.RegisterJobMetrics(prometheus.NewPedanticRegistry()))
err := worker.Process(context.Background(), mockRepo, job, mockProgress)
require.NoError(t, err) // Should succeed overall, with only the failed resource recorded as error
}
@@ -875,7 +881,7 @@ func TestDeleteWorker_ProcessWithPathDeduplication(t *testing.T) {
// Expect total of 5 items (2 explicit paths + 3 resources), but only 3 unique paths will be deleted
mockProgress.On("SetTotal", mock.Anything, 5).Return()
mockProgress.On("StrictMaxErrors", 1).Return()
mockProgress.On("Complete", mock.Anything, mock.Anything).Return(v0alpha1.JobStatus{})
// Resource resolution phase
mockProgress.On("SetMessage", mock.Anything, "Resolving resource paths").Return()
@@ -920,7 +926,7 @@ func TestDeleteWorker_ProcessWithPathDeduplication(t *testing.T) {
return result.Path == "dashboards/unique-dashboard.json" && result.Action == repository.FileActionDeleted && result.Error == nil
})).Return()
worker := NewWorker(nil, mockWrapFn.Execute, mockResourcesFactory, prometheus.DefaultRegisterer)
worker := NewWorker(nil, mockWrapFn.Execute, mockResourcesFactory, jobs.RegisterJobMetrics(prometheus.NewPedanticRegistry()))
err := worker.Process(context.Background(), mockRepo, job, mockProgress)
require.NoError(t, err)
@@ -1015,7 +1021,7 @@ func TestDeleteWorker_RefURLsSetWithRef(t *testing.T) {
mockProgress.On("Record", mock.Anything, mock.Anything).Once()
mockProgress.On("TooManyErrors").Return(nil).Once()
mockProgress.On("SetRefURLs", mock.Anything, expectedRefURLs).Once()
mockProgress.On("Complete", mock.Anything, mock.Anything).Return(v0alpha1.JobStatus{})
mockReaderWriter := repository.NewMockReaderWriter(t)
mockReaderWriter.On("Delete", mock.Anything, "test.json", "feature-branch", "Delete test.json").Return(nil)
@@ -1037,7 +1043,7 @@ func TestDeleteWorker_RefURLsSetWithRef(t *testing.T) {
},
}
worker := NewWorker(nil, mockWrapFn.Execute, mockResourcesFactory, prometheus.DefaultRegisterer)
worker := NewWorker(nil, mockWrapFn.Execute, mockResourcesFactory, jobs.RegisterJobMetrics(prometheus.NewPedanticRegistry()))
err := worker.Process(context.Background(), mockRepoWithURLs, job, mockProgress)
require.NoError(t, err)
@@ -1067,6 +1073,7 @@ func TestDeleteWorker_RefURLsNotSetWithoutRef(t *testing.T) {
mockProgress.On("TooManyErrors").Return(nil).Once()
mockProgress.On("ResetResults").Once()
mockProgress.On("SetMessage", mock.Anything, "pull resources").Once()
mockProgress.On("Complete", mock.Anything, mock.Anything).Return(v0alpha1.JobStatus{})
// SetRefURLs should NOT be called since no ref is specified
mockReaderWriter := repository.NewMockReaderWriter(t)
@@ -1093,7 +1100,7 @@ func TestDeleteWorker_RefURLsNotSetWithoutRef(t *testing.T) {
},
}
worker := NewWorker(mockSyncWorker, mockWrapFn.Execute, mockResourcesFactory, prometheus.DefaultRegisterer)
worker := NewWorker(mockSyncWorker, mockWrapFn.Execute, mockResourcesFactory, jobs.RegisterJobMetrics(prometheus.NewPedanticRegistry()))
err := worker.Process(context.Background(), mockRepoWithURLs, job, mockProgress)
require.NoError(t, err)
@@ -1120,6 +1127,7 @@ func TestDeleteWorker_RefURLsNotSetForNonURLRepository(t *testing.T) {
mockProgress.On("SetMessage", mock.Anything, "Deleting test.json").Once()
mockProgress.On("Record", mock.Anything, mock.Anything).Once()
mockProgress.On("TooManyErrors").Return(nil).Once()
mockProgress.On("Complete", mock.Anything, mock.Anything).Return(v0alpha1.JobStatus{})
// SetRefURLs should NOT be called since repo doesn't support URLs
mockReaderWriter := repository.NewMockReaderWriter(t)
@@ -1143,7 +1151,7 @@ func TestDeleteWorker_RefURLsNotSetForNonURLRepository(t *testing.T) {
},
}
worker := NewWorker(nil, mockWrapFn.Execute, mockResourcesFactory, prometheus.DefaultRegisterer)
worker := NewWorker(nil, mockWrapFn.Execute, mockResourcesFactory, jobs.RegisterJobMetrics(prometheus.NewPedanticRegistry()))
err := worker.Process(context.Background(), mockRepo, job, mockProgress)
require.NoError(t, err)
@@ -6,11 +6,11 @@ import (
"fmt"
"time"
"github.com/grafana/grafana-app-sdk/logging"
provisioning "github.com/grafana/grafana/apps/provisioning/pkg/apis/provisioning/v0alpha1"
"github.com/grafana/grafana/apps/provisioning/pkg/repository"
"github.com/grafana/grafana/pkg/registry/apis/provisioning/jobs"
"github.com/grafana/grafana/pkg/registry/apis/provisioning/resources"
"github.com/prometheus/client_golang/prometheus"
)
//go:generate mockery --name ExportFn --structname MockExportFn --inpackage --filename mock_export_fn.go --with-expecter
@@ -24,7 +24,7 @@ type ExportWorker struct {
repositoryResources resources.RepositoryResourcesFactory
exportFn ExportFn
wrapWithStageFn WrapWithStageFn
registry prometheus.Registerer
metrics jobs.JobMetrics
}
func NewExportWorker(
@@ -32,14 +32,14 @@ func NewExportWorker(
repositoryResources resources.RepositoryResourcesFactory,
exportFn ExportFn,
wrapWithStageFn WrapWithStageFn,
registry prometheus.Registerer,
metrics jobs.JobMetrics,
) *ExportWorker {
return &ExportWorker{
clientFactory: clientFactory,
repositoryResources: repositoryResources,
exportFn: exportFn,
wrapWithStageFn: wrapWithStageFn,
registry: registry,
metrics: metrics,
}
}
@@ -54,6 +54,13 @@ func (r *ExportWorker) Process(ctx context.Context, repo repository.Repository,
return errors.New("missing export settings")
}
logger := logging.FromContext(ctx).With("job", job.GetName(), "namespace", job.GetNamespace())
start := time.Now()
outcome := jobs.ErrorOutcome
resourcesExported := 0
defer func() {
r.metrics.RecordJob(string(provisioning.JobActionPush), outcome, resourcesExported, time.Since(start).Seconds())
}()
cfg := repo.Config()
// Can write to external branch
if err := repository.IsWriteAllowed(cfg, options.Branch); err != nil {
@@ -76,16 +83,19 @@ func (r *ExportWorker) Process(ctx context.Context, repo repository.Repository,
fn := func(repo repository.Repository, _ bool) error {
clients, err := r.clientFactory.Clients(ctx, cfg.Namespace)
if err != nil {
logger.Error("failed to create clients", "error", err)
return fmt.Errorf("create clients: %w", err)
}
rw, ok := repo.(repository.ReaderWriter)
if !ok {
logger.Error("export job submitted targeting repository that is not a ReaderWriter")
return errors.New("export job submitted targeting repository that is not a ReaderWriter")
}
repositoryResources, err := r.repositoryResources.Client(ctx, rw)
if err != nil {
logger.Error("failed to create repository resource client", "error", err)
return fmt.Errorf("create repository resource client: %w", err)
}
@@ -103,5 +113,16 @@ func (r *ExportWorker) Process(ctx context.Context, repo repository.Repository,
}
}
return err
if err != nil {
logger.Error("failed to export", "error", err)
return err
}
outcome = jobs.SuccessOutcome
jobStatus := progress.Complete(ctx, nil)
for _, summary := range jobStatus.Summary {
resourcesExported += int(summary.Write)
}
return nil
}
@@ -19,6 +19,7 @@ import (
)
func TestExportWorker_IsSupported(t *testing.T) {
metrics := jobs.RegisterJobMetrics(prometheus.NewPedanticRegistry())
tests := []struct {
name string
job v0alpha1.Job
@@ -55,7 +56,7 @@ func TestExportWorker_IsSupported(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
r := NewExportWorker(nil, nil, nil, nil, prometheus.DefaultRegisterer)
r := NewExportWorker(nil, nil, nil, nil, metrics)
got := r.IsSupported(context.Background(), tt.job)
require.Equal(t, tt.want, got)
})
@@ -69,7 +70,7 @@ func TestExportWorker_ProcessNoExportSettings(t *testing.T) {
},
}
r := NewExportWorker(nil, nil, nil, nil, prometheus.DefaultRegisterer)
r := NewExportWorker(nil, nil, nil, nil, jobs.RegisterJobMetrics(prometheus.NewPedanticRegistry()))
err := r.Process(context.Background(), nil, job, nil)
require.EqualError(t, err, "missing export settings")
}
@@ -92,7 +93,7 @@ func TestExportWorker_ProcessWriteNotAllowed(t *testing.T) {
},
})
r := NewExportWorker(nil, nil, nil, nil, prometheus.DefaultRegisterer)
r := NewExportWorker(nil, nil, nil, nil, jobs.RegisterJobMetrics(prometheus.NewPedanticRegistry()))
err := r.Process(context.Background(), mockRepo, job, nil)
require.EqualError(t, err, "this repository is read only")
}
@@ -116,7 +117,7 @@ func TestExportWorker_ProcessBranchNotAllowedForLocal(t *testing.T) {
},
})
r := NewExportWorker(nil, nil, nil, nil, prometheus.DefaultRegisterer)
r := NewExportWorker(nil, nil, nil, nil, jobs.RegisterJobMetrics(prometheus.NewPedanticRegistry()))
err := r.Process(context.Background(), mockRepo, job, nil)
require.EqualError(t, err, "this repository does not support the branch workflow")
}
@@ -148,7 +149,7 @@ func TestExportWorker_ProcessFailedToCreateClients(t *testing.T) {
return fn(repo, true)
})
r := NewExportWorker(mockClients, nil, nil, mockStageFn.Execute, prometheus.DefaultRegisterer)
r := NewExportWorker(mockClients, nil, nil, mockStageFn.Execute, jobs.RegisterJobMetrics(prometheus.NewPedanticRegistry()))
mockProgress := jobs.NewMockJobProgressRecorder(t)
err := r.Process(context.Background(), mockRepo, job, mockProgress)
@@ -184,7 +185,7 @@ func TestExportWorker_ProcessNotReaderWriter(t *testing.T) {
return fn(repo, true)
})
r := NewExportWorker(mockClients, nil, nil, mockStageFn.Execute, prometheus.DefaultRegisterer)
r := NewExportWorker(mockClients, nil, nil, mockStageFn.Execute, jobs.RegisterJobMetrics(prometheus.NewPedanticRegistry()))
err := r.Process(context.Background(), mockRepo, job, mockProgress)
require.EqualError(t, err, "export job submitted targeting repository that is not a ReaderWriter")
}
@@ -220,7 +221,7 @@ func TestExportWorker_ProcessRepositoryResourcesError(t *testing.T) {
mockStageFn.On("Execute", context.Background(), mockRepo, mock.Anything, mock.Anything).Return(func(ctx context.Context, repo repository.Repository, stageOpts repository.StageOptions, fn func(repository.Repository, bool) error) error {
return fn(repo, true)
})
r := NewExportWorker(mockClients, mockRepoResources, nil, mockStageFn.Execute, prometheus.DefaultRegisterer)
r := NewExportWorker(mockClients, mockRepoResources, nil, mockStageFn.Execute, jobs.RegisterJobMetrics(prometheus.NewPedanticRegistry()))
err := r.Process(context.Background(), mockRepo, job, mockProgress)
require.EqualError(t, err, "create repository resource client: failed to create repository resources client")
}
@@ -248,6 +249,7 @@ func TestExportWorker_ProcessStageOptions(t *testing.T) {
})
mockProgress := jobs.NewMockJobProgressRecorder(t)
mockProgress.On("Complete", mock.Anything, mock.Anything).Return(v0alpha1.JobStatus{})
// No progress messages expected in current implementation
mockClients := resources.NewMockClientFactory(t)
@@ -271,7 +273,7 @@ func TestExportWorker_ProcessStageOptions(t *testing.T) {
return fn(repo, true)
})
r := NewExportWorker(mockClients, mockRepoResources, mockExportFn.Execute, mockStageFn.Execute, prometheus.DefaultRegisterer)
r := NewExportWorker(mockClients, mockRepoResources, mockExportFn.Execute, mockStageFn.Execute, jobs.RegisterJobMetrics(prometheus.NewPedanticRegistry()))
err := r.Process(context.Background(), mockRepo, job, mockProgress)
require.NoError(t, err)
}
@@ -331,6 +333,7 @@ func TestExportWorker_ProcessStageOptionsWithBranch(t *testing.T) {
})
mockProgress := jobs.NewMockJobProgressRecorder(t)
mockProgress.On("Complete", mock.Anything, mock.Anything).Return(v0alpha1.JobStatus{})
mockClients := resources.NewMockClientFactory(t)
mockResourceClients := resources.NewMockResourceClients(t)
mockClients.On("Clients", mock.Anything, "test-namespace").Return(mockResourceClients, nil)
@@ -352,7 +355,7 @@ func TestExportWorker_ProcessStageOptionsWithBranch(t *testing.T) {
return fn(repo, true)
})
r := NewExportWorker(mockClients, mockRepoResources, mockExportFn.Execute, mockStageFn.Execute, prometheus.DefaultRegisterer)
r := NewExportWorker(mockClients, mockRepoResources, mockExportFn.Execute, mockStageFn.Execute, jobs.RegisterJobMetrics(prometheus.NewPedanticRegistry()))
err := r.Process(context.Background(), mockRepo, job, mockProgress)
require.NoError(t, err)
})
@@ -395,7 +398,7 @@ func TestExportWorker_ProcessExportFnError(t *testing.T) {
return fn(repo, true)
})
r := NewExportWorker(mockClients, mockRepoResources, mockExportFn.Execute, mockStageFn.Execute, prometheus.DefaultRegisterer)
r := NewExportWorker(mockClients, mockRepoResources, mockExportFn.Execute, mockStageFn.Execute, jobs.RegisterJobMetrics(prometheus.NewPedanticRegistry()))
err := r.Process(context.Background(), mockRepo, job, mockProgress)
require.EqualError(t, err, "export failed")
}
@@ -423,7 +426,7 @@ func TestExportWorker_ProcessWrapWithStageFnError(t *testing.T) {
mockStageFn := NewMockWrapWithStageFn(t)
mockStageFn.On("Execute", mock.Anything, mockRepo, mock.Anything, mock.Anything).Return(errors.New("stage failed"))
r := NewExportWorker(nil, nil, nil, mockStageFn.Execute, prometheus.DefaultRegisterer)
r := NewExportWorker(nil, nil, nil, mockStageFn.Execute, jobs.RegisterJobMetrics(prometheus.NewPedanticRegistry()))
err := r.Process(context.Background(), mockRepo, job, mockProgress)
require.EqualError(t, err, "stage failed")
}
@@ -449,7 +452,7 @@ func TestExportWorker_ProcessBranchNotAllowedForStageableRepositories(t *testing
mockProgress := jobs.NewMockJobProgressRecorder(t)
// No progress messages expected in current implementation
r := NewExportWorker(nil, nil, nil, nil, prometheus.DefaultRegisterer)
r := NewExportWorker(nil, nil, nil, nil, jobs.RegisterJobMetrics(prometheus.NewPedanticRegistry()))
err := r.Process(context.Background(), mockRepo, job, mockProgress)
require.EqualError(t, err, "this repository does not support the branch workflow")
}
@@ -479,6 +482,7 @@ func TestExportWorker_ProcessGitRepository(t *testing.T) {
})
mockProgress := jobs.NewMockJobProgressRecorder(t)
mockProgress.On("Complete", mock.Anything, mock.Anything).Return(v0alpha1.JobStatus{})
// No progress messages expected in current implementation
mockClients := resources.NewMockClientFactory(t)
@@ -500,7 +504,7 @@ func TestExportWorker_ProcessGitRepository(t *testing.T) {
return fn(repo, true)
})
r := NewExportWorker(mockClients, mockRepoResources, mockExportFn.Execute, mockStageFn.Execute, prometheus.DefaultRegisterer)
r := NewExportWorker(mockClients, mockRepoResources, mockExportFn.Execute, mockStageFn.Execute, jobs.RegisterJobMetrics(prometheus.NewPedanticRegistry()))
err := r.Process(context.Background(), mockRepo, job, mockProgress)
require.NoError(t, err)
}
@@ -546,7 +550,7 @@ func TestExportWorker_ProcessGitRepositoryExportFnError(t *testing.T) {
return fn(repo, true)
})
r := NewExportWorker(mockClients, mockRepoResources, mockExportFn.Execute, mockStageFn.Execute, prometheus.DefaultRegisterer)
r := NewExportWorker(mockClients, mockRepoResources, mockExportFn.Execute, mockStageFn.Execute, jobs.RegisterJobMetrics(prometheus.NewPedanticRegistry()))
err := r.Process(context.Background(), mockRepo, job, mockProgress)
require.EqualError(t, err, "export failed")
}
@@ -587,7 +591,7 @@ func TestExportWorker_RefURLsSetWithBranch(t *testing.T) {
// Mock progress recorder to expect SetRefURLs call
mockProgress := jobs.NewMockJobProgressRecorder(t)
mockProgress.On("SetRefURLs", mock.Anything, expectedRefURLs).Once()
mockProgress.On("Complete", mock.Anything, mock.Anything).Return(v0alpha1.JobStatus{})
// Mock other dependencies
mockClients := resources.NewMockClientFactory(t)
mockResourceClients := resources.NewMockResourceClients(t)
@@ -609,7 +613,7 @@ func TestExportWorker_RefURLsSetWithBranch(t *testing.T) {
return fn(mockReaderWriter, true)
})
r := NewExportWorker(mockClients, mockRepoResources, mockExportFn.Execute, mockStageFn.Execute, prometheus.DefaultRegisterer)
r := NewExportWorker(mockClients, mockRepoResources, mockExportFn.Execute, mockStageFn.Execute, jobs.RegisterJobMetrics(prometheus.NewPedanticRegistry()))
err := r.Process(context.Background(), mockRepoWithURLs, job, mockProgress)
require.NoError(t, err)
@@ -644,6 +648,7 @@ func TestExportWorker_RefURLsNotSetWithoutBranch(t *testing.T) {
// Mock progress recorder - SetRefURLs should NOT be called
mockProgress := jobs.NewMockJobProgressRecorder(t)
mockProgress.On("Complete", mock.Anything, mock.Anything).Return(v0alpha1.JobStatus{})
// Explicitly NOT expecting SetRefURLs call
// Mock other dependencies
@@ -665,7 +670,7 @@ func TestExportWorker_RefURLsNotSetWithoutBranch(t *testing.T) {
return fn(mockReaderWriter, true)
})
r := NewExportWorker(mockClients, mockRepoResources, mockExportFn.Execute, mockStageFn.Execute, prometheus.DefaultRegisterer)
r := NewExportWorker(mockClients, mockRepoResources, mockExportFn.Execute, mockStageFn.Execute, jobs.RegisterJobMetrics(prometheus.NewPedanticRegistry()))
err := r.Process(context.Background(), mockRepoWithURLs, job, mockProgress)
require.NoError(t, err)
@@ -700,6 +705,7 @@ func TestExportWorker_RefURLsNotSetForNonURLRepository(t *testing.T) {
// Mock progress recorder - SetRefURLs should NOT be called
mockProgress := jobs.NewMockJobProgressRecorder(t)
mockProgress.On("Complete", mock.Anything, mock.Anything).Return(v0alpha1.JobStatus{})
// Explicitly NOT expecting SetRefURLs call
// Mock other dependencies
@@ -721,7 +727,7 @@ func TestExportWorker_RefURLsNotSetForNonURLRepository(t *testing.T) {
return fn(mockReaderWriter, true)
})
r := NewExportWorker(mockClients, mockRepoResources, mockExportFn.Execute, mockStageFn.Execute, prometheus.DefaultRegisterer)
r := NewExportWorker(mockClients, mockRepoResources, mockExportFn.Execute, mockStageFn.Execute, jobs.RegisterJobMetrics(prometheus.NewPedanticRegistry()))
err := r.Process(context.Background(), mockRepo, job, mockProgress)
require.NoError(t, err)
@@ -0,0 +1,112 @@
package jobs
import "github.com/prometheus/client_golang/prometheus"
// Outcome values passed to JobMetrics.RecordJob; they become the value of the
// "outcome" label on the processed-jobs counter.
const (
// SuccessOutcome is the outcome for which RecordJob also observes the job duration.
SuccessOutcome = "success"
// ErrorOutcome is counted by RecordJob, but no duration is observed for it.
ErrorOutcome = "error"
)
// JobMetrics holds the Prometheus collectors used to report processed jobs.
// Values are created by RegisterJobMetrics.
type JobMetrics struct {
// registry is the Registerer the collectors below were registered on.
registry prometheus.Registerer
// processedTotal counts processed jobs, labelled by "action" and "outcome".
processedTotal *prometheus.CounterVec
// durationHist records job durations in seconds, labelled by "action" and
// "resources_changed_bucket".
durationHist *prometheus.HistogramVec
}
// QueueMetrics holds the Prometheus collectors used to report the state of the
// job queue. Values are created by RegisterQueueMetrics.
type QueueMetrics struct {
// queueSize tracks the number of queued jobs, labelled by "action".
queueSize *prometheus.GaugeVec
// queueWaitTime records how long jobs waited in the queue, in seconds,
// labelled by "action".
queueWaitTime *prometheus.HistogramVec
}
// RegisterQueueMetrics creates the job-queue collectors and registers them on
// registry:
//
//   - grafana_provisioning_jobs_queue_size: gauge, labelled by "action"
//   - grafana_provisioning_jobs_queue_wait_seconds: histogram, labelled by "action"
//
// Registration uses MustRegister, which panics if a collector cannot be
// registered. The returned QueueMetrics wraps both collectors.
func RegisterQueueMetrics(registry prometheus.Registerer) QueueMetrics {
	m := QueueMetrics{
		queueSize: prometheus.NewGaugeVec(
			prometheus.GaugeOpts{
				Name: "grafana_provisioning_jobs_queue_size",
				Help: "Number of jobs currently in the queue",
			},
			[]string{"action"},
		),
		queueWaitTime: prometheus.NewHistogramVec(
			prometheus.HistogramOpts{
				Name:    "grafana_provisioning_jobs_queue_wait_seconds",
				Help:    "Time jobs spend waiting in the queue before being claimed",
				Buckets: []float64{1.0, 5.0, 10.0, 30.0, 60.0, 120.0, 300.0},
			},
			[]string{"action"},
		),
	}

	// Collectors are registered in argument order: size gauge first, then the
	// wait-time histogram.
	registry.MustRegister(m.queueSize, m.queueWaitTime)

	return m
}
// IncreaseQueueSize adds one to the queue-size gauge for the given action.
func (m *QueueMetrics) IncreaseQueueSize(action string) {
	gauge := m.queueSize.WithLabelValues(action)
	gauge.Inc()
}
// DecreaseQueueSize subtracts one from the queue-size gauge for the given action.
func (m *QueueMetrics) DecreaseQueueSize(action string) {
	gauge := m.queueSize.WithLabelValues(action)
	gauge.Dec()
}
// RecordWaitTime observes waitSeconds on the queue-wait-time histogram for the
// given action.
func (m *QueueMetrics) RecordWaitTime(action string, waitSeconds float64) {
	observer := m.queueWaitTime.WithLabelValues(action)
	observer.Observe(waitSeconds)
}
// RegisterJobMetrics creates the job-processing collectors and registers them
// on registry:
//
//   - grafana_provisioning_jobs_processed_total: counter, labelled by
//     "action" and "outcome"
//   - grafana_provisioning_jobs_duration_seconds: histogram, labelled by
//     "action" and "resources_changed_bucket"
//
// Registration uses MustRegister, which panics if a collector cannot be
// registered. The returned JobMetrics keeps a reference to registry alongside
// both collectors.
func RegisterJobMetrics(registry prometheus.Registerer) JobMetrics {
	m := JobMetrics{
		registry: registry,
		processedTotal: prometheus.NewCounterVec(
			prometheus.CounterOpts{
				Name: "grafana_provisioning_jobs_processed_total",
				Help: "Total number of jobs processed",
			},
			[]string{"action", "outcome"},
		),
		durationHist: prometheus.NewHistogramVec(
			prometheus.HistogramOpts{
				Name:    "grafana_provisioning_jobs_duration_seconds",
				Help:    "Duration of job",
				Buckets: []float64{5.0, 10.0, 30.0, 60.0, 120.0, 300.0},
			},
			[]string{"action", "resources_changed_bucket"},
		),
	}

	// Collectors are registered in argument order: counter first, then the
	// duration histogram.
	registry.MustRegister(m.processedTotal, m.durationHist)

	return m
}
// RecordJob reports one processed job.
//
// The processed-jobs counter is always incremented for (jobAction, outcome).
// The duration histogram is observed only when outcome is SuccessOutcome; in
// that case duration is recorded under the bucket label derived from
// resourceCountChanged.
func (m *JobMetrics) RecordJob(jobAction string, outcome string, resourceCountChanged int, duration float64) {
	m.processedTotal.WithLabelValues(jobAction, outcome).Inc()

	// Skip the duration for unsuccessful jobs: their resource count would be
	// incorrect, and so would the bucket label.
	if outcome != SuccessOutcome {
		return
	}

	bucket := getResourceCountBucket(resourceCountChanged)
	m.durationHist.WithLabelValues(jobAction, bucket).Observe(duration)
}
// getResourceCountBucket maps a changed-resource count to a coarse range label
// so the duration histogram keeps a small, fixed set of label values.
//
// Returned labels: "0", "1-10", "11-50", "51-100", "101-500", "501-1000" and
// "1000+" (any count above 1000). Zero and negative counts both map to "0", so
// an unexpected negative value is never reported as "1-10".
func getResourceCountBucket(count int) string {
	switch {
	case count <= 0:
		return "0"
	case count <= 10:
		return "1-10"
	case count <= 50:
		return "11-50"
	case count <= 100:
		return "51-100"
	case count <= 500:
		return "101-500"
	case count <= 1000:
		return "501-1000"
	default:
		return "1000+"
	}
}
@@ -9,27 +9,27 @@ import (
"k8s.io/apimachinery/pkg/runtime/schema"
"github.com/grafana/grafana-app-sdk/logging"
provisioning "github.com/grafana/grafana/apps/provisioning/pkg/apis/provisioning/v0alpha1"
"github.com/grafana/grafana/apps/provisioning/pkg/repository"
"github.com/grafana/grafana/apps/provisioning/pkg/safepath"
"github.com/grafana/grafana/pkg/registry/apis/provisioning/jobs"
"github.com/grafana/grafana/pkg/registry/apis/provisioning/resources"
"github.com/prometheus/client_golang/prometheus"
)
type Worker struct {
syncWorker jobs.Worker
wrapFn repository.WrapWithStageFn
resourcesFactory resources.RepositoryResourcesFactory
registry prometheus.Registerer
metrics jobs.JobMetrics
}
func NewWorker(syncWorker jobs.Worker, wrapFn repository.WrapWithStageFn, resourcesFactory resources.RepositoryResourcesFactory, registry prometheus.Registerer) *Worker {
func NewWorker(syncWorker jobs.Worker, wrapFn repository.WrapWithStageFn, resourcesFactory resources.RepositoryResourcesFactory, metrics jobs.JobMetrics) *Worker {
return &Worker{
syncWorker: syncWorker,
wrapFn: wrapFn,
resourcesFactory: resourcesFactory,
registry: registry,
metrics: metrics,
}
}
@@ -42,6 +42,13 @@ func (w *Worker) Process(ctx context.Context, repo repository.Repository, job pr
return errors.New("missing move settings")
}
opts := *job.Spec.Move
logger := logging.FromContext(ctx).With("job", job.GetName(), "namespace", job.GetNamespace())
outcome := jobs.ErrorOutcome
start := time.Now()
resourcesMoved := 0
defer func() {
w.metrics.RecordJob(string(provisioning.JobActionMove), outcome, resourcesMoved, time.Since(start).Seconds())
}()
if opts.TargetPath == "" {
return errors.New("target path is required for move operation")
@@ -60,6 +67,7 @@ func (w *Worker) Process(ctx context.Context, repo repository.Repository, job pr
fn := func(repo repository.Repository, _ bool) error {
rw, ok := repo.(repository.ReaderWriter)
if !ok {
logger.Error("move job submitted targeting repository that is not a ReaderWriter")
return errors.New("move job submitted targeting repository that is not a ReaderWriter")
}
@@ -89,6 +97,7 @@ func (w *Worker) Process(ctx context.Context, repo repository.Repository, job pr
err := w.wrapFn(ctx, repo, stageOptions, fn)
if err != nil {
logger.Error("failed to move files in repository", "error", err)
return fmt.Errorf("move files in repository: %w", err)
}
@@ -115,10 +124,18 @@ func (w *Worker) Process(ctx context.Context, repo repository.Repository, job pr
}
if err := w.syncWorker.Process(ctx, repo, syncJob, progress); err != nil {
logger.Error("failed to pull resources", "error", err)
return fmt.Errorf("pull resources: %w", err)
}
}
outcome = jobs.SuccessOutcome
jobStatus := progress.Complete(ctx, nil)
for _, summary := range jobStatus.Summary {
// FileActionRenamed increments both delete & create, use create here
resourcesMoved += int(summary.Create)
}
return nil
}
@@ -31,6 +31,7 @@ func (m *mockReaderWriter) Move(ctx context.Context, oldPath, newPath, ref, mess
}
func TestMoveWorker_IsSupported(t *testing.T) {
metrics := jobs.RegisterJobMetrics(prometheus.NewPedanticRegistry())
tests := []struct {
name string
job provisioning.Job
@@ -76,7 +77,7 @@ func TestMoveWorker_IsSupported(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
worker := NewWorker(nil, nil, nil, prometheus.DefaultRegisterer)
worker := NewWorker(nil, nil, nil, metrics)
result := worker.IsSupported(context.Background(), tt.job)
require.Equal(t, tt.expected, result)
})
@@ -90,7 +91,7 @@ func TestMoveWorker_ProcessMissingMoveSettings(t *testing.T) {
},
}
worker := NewWorker(nil, nil, nil, prometheus.DefaultRegisterer)
worker := NewWorker(nil, nil, nil, jobs.RegisterJobMetrics(prometheus.NewPedanticRegistry()))
err := worker.Process(context.Background(), nil, job, nil)
require.EqualError(t, err, "missing move settings")
}
@@ -105,7 +106,7 @@ func TestMoveWorker_ProcessMissingTargetPath(t *testing.T) {
},
}
worker := NewWorker(nil, nil, nil, prometheus.DefaultRegisterer)
worker := NewWorker(nil, nil, nil, jobs.RegisterJobMetrics(prometheus.NewPedanticRegistry()))
err := worker.Process(context.Background(), nil, job, nil)
require.EqualError(t, err, "target path is required for move operation")
}
@@ -121,7 +122,7 @@ func TestMoveWorker_ProcessInvalidTargetPath(t *testing.T) {
},
}
worker := NewWorker(nil, nil, nil, prometheus.DefaultRegisterer)
worker := NewWorker(nil, nil, nil, jobs.RegisterJobMetrics(prometheus.NewPedanticRegistry()))
err := worker.Process(context.Background(), nil, job, nil)
require.EqualError(t, err, "target path must be a directory (should end with '/')")
}
@@ -153,7 +154,7 @@ func TestMoveWorker_ProcessNotReaderWriter(t *testing.T) {
mockProgress.On("SetTotal", mock.Anything, 1).Return()
mockProgress.On("StrictMaxErrors", 1).Return()
worker := NewWorker(nil, mockWrapFn.Execute, nil, prometheus.DefaultRegisterer)
worker := NewWorker(nil, mockWrapFn.Execute, nil, jobs.RegisterJobMetrics(prometheus.NewPedanticRegistry()))
err := worker.Process(context.Background(), mockRepo, job, mockProgress)
require.EqualError(t, err, "move files in repository: move job submitted targeting repository that is not a ReaderWriter")
}
@@ -177,7 +178,7 @@ func TestMoveWorker_ProcessWrapFnError(t *testing.T) {
mockProgress.On("SetTotal", mock.Anything, 1).Return()
mockProgress.On("StrictMaxErrors", 1).Return()
worker := NewWorker(nil, mockWrapFn.Execute, nil, prometheus.DefaultRegisterer)
worker := NewWorker(nil, mockWrapFn.Execute, nil, jobs.RegisterJobMetrics(prometheus.NewPedanticRegistry()))
err := worker.Process(context.Background(), mockRepo, job, mockProgress)
require.EqualError(t, err, "move files in repository: stage failed")
}
@@ -213,6 +214,7 @@ func TestMoveWorker_ProcessMoveFilesSuccess(t *testing.T) {
mockProgress.On("SetMessage", mock.Anything, "Moving test/path1 to new/location/path1").Return()
mockProgress.On("SetMessage", mock.Anything, "Moving test/path2 to new/location/path2").Return()
mockProgress.On("TooManyErrors").Return(nil).Twice()
mockProgress.On("Complete", mock.Anything, mock.Anything).Return(provisioning.JobStatus{})
mockRepo.On("Move", mock.Anything, "test/path1", "new/location/path1", "main", "Move test/path1 to new/location/path1").Return(nil)
mockRepo.On("Move", mock.Anything, "test/path2", "new/location/path2", "main", "Move test/path2 to new/location/path2").Return(nil)
@@ -224,7 +226,7 @@ func TestMoveWorker_ProcessMoveFilesSuccess(t *testing.T) {
return result.Path == "test/path2" && result.Action == repository.FileActionRenamed && result.Error == nil
})).Return()
worker := NewWorker(nil, mockWrapFn.Execute, nil, prometheus.DefaultRegisterer)
worker := NewWorker(nil, mockWrapFn.Execute, nil, jobs.RegisterJobMetrics(prometheus.NewPedanticRegistry()))
err := worker.Process(context.Background(), mockRepo, job, mockProgress)
require.NoError(t, err)
}
@@ -263,7 +265,7 @@ func TestMoveWorker_ProcessMoveFilesWithError(t *testing.T) {
})).Return()
mockProgress.On("TooManyErrors").Return(errors.New("too many errors"))
worker := NewWorker(nil, mockWrapFn.Execute, nil, prometheus.DefaultRegisterer)
worker := NewWorker(nil, mockWrapFn.Execute, nil, jobs.RegisterJobMetrics(prometheus.NewPedanticRegistry()))
err := worker.Process(context.Background(), mockRepo, job, mockProgress)
require.EqualError(t, err, "move files in repository: too many errors")
}
@@ -303,12 +305,13 @@ func TestMoveWorker_ProcessWithSyncWorker(t *testing.T) {
mockProgress.On("ResetResults").Return()
mockProgress.On("SetMessage", mock.Anything, "pull resources").Return()
mockProgress.On("Complete", mock.Anything, mock.Anything).Return(provisioning.JobStatus{})
mockSyncWorker.On("Process", mock.Anything, mockRepo, mock.MatchedBy(func(syncJob provisioning.Job) bool {
return syncJob.Spec.Pull != nil && !syncJob.Spec.Pull.Incremental
}), mockProgress).Return(nil)
worker := NewWorker(mockSyncWorker, mockWrapFn.Execute, nil, prometheus.DefaultRegisterer)
worker := NewWorker(mockSyncWorker, mockWrapFn.Execute, nil, jobs.RegisterJobMetrics(prometheus.NewPedanticRegistry()))
err := worker.Process(context.Background(), mockRepo, job, mockProgress)
require.NoError(t, err)
}
@@ -349,7 +352,7 @@ func TestMoveWorker_ProcessSyncWorkerError(t *testing.T) {
syncError := errors.New("sync failed")
mockSyncWorker.On("Process", mock.Anything, mockRepo, mock.Anything, mockProgress).Return(syncError)
worker := NewWorker(mockSyncWorker, mockWrapFn.Execute, nil, prometheus.DefaultRegisterer)
worker := NewWorker(mockSyncWorker, mockWrapFn.Execute, nil, jobs.RegisterJobMetrics(prometheus.NewPedanticRegistry()))
err := worker.Process(context.Background(), mockRepo, job, mockProgress)
require.EqualError(t, err, "pull resources: sync failed")
}
@@ -430,7 +433,7 @@ func TestMoveWorker_moveFiles(t *testing.T) {
}
}
worker := NewWorker(nil, nil, nil, prometheus.DefaultRegisterer)
worker := NewWorker(nil, nil, nil, jobs.RegisterJobMetrics(prometheus.NewPedanticRegistry()))
err := worker.moveFiles(context.Background(), mockRepo, mockProgress, opts, tt.paths...)
if tt.expectedError != "" {
@@ -486,7 +489,7 @@ func TestMoveWorker_constructTargetPath(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
worker := NewWorker(nil, nil, nil, prometheus.DefaultRegisterer)
worker := NewWorker(nil, nil, nil, jobs.RegisterJobMetrics(prometheus.NewPedanticRegistry()))
result := worker.constructTargetPath(tt.jobTargetPath, tt.sourcePath)
require.Equal(t, tt.expectedTarget, result)
})
@@ -531,7 +534,7 @@ func TestMoveWorker_ProcessWithResourceReferences(t *testing.T) {
mockProgress.On("SetMessage", mock.Anything, "Moving test/path1 to new/location/path1").Return()
mockProgress.On("SetMessage", mock.Anything, "Moving dashboard/file.yaml to new/location/file.yaml").Return()
mockProgress.On("TooManyErrors").Return(nil).Times(2)
mockProgress.On("Complete", mock.Anything, mock.Anything).Return(provisioning.JobStatus{})
mockResourcesFactory.On("Client", mock.Anything, mockRepo).Return(mockRepoResources, nil)
mockRepoResources.On("FindResourcePath", mock.Anything, "dashboard-uid", schema.GroupVersionKind{
Group: "dashboard.grafana.app",
@@ -555,7 +558,7 @@ func TestMoveWorker_ProcessWithResourceReferences(t *testing.T) {
return syncJob.Spec.Pull != nil && !syncJob.Spec.Pull.Incremental
}), mockProgress).Return(nil)
worker := NewWorker(mockSyncWorker, mockWrapFn.Execute, mockResourcesFactory, prometheus.DefaultRegisterer)
worker := NewWorker(mockSyncWorker, mockWrapFn.Execute, mockResourcesFactory, jobs.RegisterJobMetrics(prometheus.NewPedanticRegistry()))
err := worker.Process(context.Background(), mockRepo, job, mockProgress)
require.NoError(t, err)
}
@@ -594,6 +597,7 @@ func TestMoveWorker_ProcessResourceReferencesError(t *testing.T) {
mockProgress.On("SetMessage", mock.Anything, "Resolving resource paths").Return()
mockProgress.On("SetMessage", mock.Anything, "Finding path for resource dashboard.grafana.app/Dashboard/non-existent-uid").Return()
mockProgress.On("TooManyErrors").Return(nil)
mockProgress.On("Complete", mock.Anything, mock.Anything).Return(provisioning.JobStatus{})
mockResourcesFactory.On("Client", mock.Anything, mockRepo).Return(mockRepoResources, nil)
resourceError := errors.New("resource not found")
@@ -616,7 +620,7 @@ func TestMoveWorker_ProcessResourceReferencesError(t *testing.T) {
return syncJob.Spec.Pull != nil && !syncJob.Spec.Pull.Incremental
}), mockProgress).Return(nil)
worker := NewWorker(mockSyncWorker, mockWrapFn.Execute, mockResourcesFactory, prometheus.DefaultRegisterer)
worker := NewWorker(mockSyncWorker, mockWrapFn.Execute, mockResourcesFactory, jobs.RegisterJobMetrics(prometheus.NewPedanticRegistry()))
err := worker.Process(context.Background(), mockRepo, job, mockProgress)
require.NoError(t, err) // Should continue despite individual resource errors
}
@@ -656,7 +660,7 @@ func TestMoveWorker_ProcessResourcesFactoryError(t *testing.T) {
factoryError := errors.New("failed to create resources client")
mockResourcesFactory.On("Client", mock.Anything, mockRepo).Return(nil, factoryError)
worker := NewWorker(nil, mockWrapFn.Execute, mockResourcesFactory, prometheus.DefaultRegisterer)
worker := NewWorker(nil, mockWrapFn.Execute, mockResourcesFactory, jobs.RegisterJobMetrics(prometheus.NewPedanticRegistry()))
err := worker.Process(context.Background(), mockRepo, job, mockProgress)
require.EqualError(t, err, "move files in repository: create repository resources client: failed to create resources client")
}
@@ -774,7 +778,7 @@ func TestMoveWorker_resolveResourcesToPaths(t *testing.T) {
}
}
worker := NewWorker(nil, nil, mockResourcesFactory, prometheus.DefaultRegisterer)
worker := NewWorker(nil, nil, mockResourcesFactory, jobs.RegisterJobMetrics(prometheus.NewPedanticRegistry()))
paths, err := worker.resolveResourcesToPaths(context.Background(), mockRepo, mockProgress, tt.resources)
if tt.expectedError != "" {
@@ -863,7 +867,7 @@ func TestMoveWorker_RefURLsSetWithRef(t *testing.T) {
mockProgress.On("Record", mock.Anything, mock.Anything).Once()
mockProgress.On("TooManyErrors").Return(nil).Once()
mockProgress.On("SetRefURLs", mock.Anything, expectedRefURLs).Once()
mockProgress.On("Complete", mock.Anything, mock.Anything).Return(provisioning.JobStatus{})
mockReaderWriter := repository.NewMockReaderWriter(t)
mockReaderWriter.On("Move", mock.Anything, "test.json", "target/test.json", "feature-branch", "Move test.json to target/test.json").Return(nil)
@@ -886,7 +890,7 @@ func TestMoveWorker_RefURLsSetWithRef(t *testing.T) {
},
}
worker := NewWorker(nil, mockWrapFn.Execute, mockResourcesFactory, prometheus.DefaultRegisterer)
worker := NewWorker(nil, mockWrapFn.Execute, mockResourcesFactory, jobs.RegisterJobMetrics(prometheus.NewPedanticRegistry()))
err := worker.Process(context.Background(), mockRepoWithURLs, job, mockProgress)
require.NoError(t, err)
@@ -916,6 +920,7 @@ func TestMoveWorker_RefURLsNotSetWithoutRef(t *testing.T) {
mockProgress.On("TooManyErrors").Return(nil).Once()
mockProgress.On("ResetResults").Once()
mockProgress.On("SetMessage", mock.Anything, "pull resources").Once()
mockProgress.On("Complete", mock.Anything, mock.Anything).Return(provisioning.JobStatus{})
// SetRefURLs should NOT be called since no ref is specified
mockReaderWriter := repository.NewMockReaderWriter(t)
@@ -943,7 +948,7 @@ func TestMoveWorker_RefURLsNotSetWithoutRef(t *testing.T) {
},
}
worker := NewWorker(mockSyncWorker, mockWrapFn.Execute, mockResourcesFactory, prometheus.DefaultRegisterer)
worker := NewWorker(mockSyncWorker, mockWrapFn.Execute, mockResourcesFactory, jobs.RegisterJobMetrics(prometheus.NewPedanticRegistry()))
err := worker.Process(context.Background(), mockRepoWithURLs, job, mockProgress)
require.NoError(t, err)
@@ -970,6 +975,7 @@ func TestMoveWorker_RefURLsNotSetForNonURLRepository(t *testing.T) {
mockProgress.On("SetMessage", mock.Anything, "Moving test.json to target/test.json").Once()
mockProgress.On("Record", mock.Anything, mock.Anything).Once()
mockProgress.On("TooManyErrors").Return(nil).Once()
mockProgress.On("Complete", mock.Anything, mock.Anything).Return(provisioning.JobStatus{})
// SetRefURLs should NOT be called since repo doesn't support URLs
mockReaderWriter := repository.NewMockReaderWriter(t)
@@ -994,7 +1000,7 @@ func TestMoveWorker_RefURLsNotSetForNonURLRepository(t *testing.T) {
},
}
worker := NewWorker(nil, mockWrapFn.Execute, mockResourcesFactory, prometheus.DefaultRegisterer)
worker := NewWorker(nil, mockWrapFn.Execute, mockResourcesFactory, jobs.RegisterJobMetrics(prometheus.NewPedanticRegistry()))
err := worker.Process(context.Background(), mockRepo, job, mockProgress)
require.NoError(t, err)
@@ -74,7 +74,7 @@ type persistentStore struct {
// If a job is abandoned, it will have its claim cleaned up periodically.
expiry time.Duration
registry prometheus.Registerer
queueMetrics QueueMetrics
}
// NewJobStore creates a new job queue implementation using the API client.
@@ -83,11 +83,13 @@ func NewJobStore(provisioningClient client.ProvisioningV0alpha1Interface, expiry
expiry = time.Second * 30
}
queueMetrics := RegisterQueueMetrics(registry)
return &persistentStore{
client: provisioningClient,
clock: time.Now,
expiry: expiry,
registry: registry,
client: provisioningClient,
clock: time.Now,
expiry: expiry,
queueMetrics: queueMetrics,
}, nil
}
@@ -120,6 +122,7 @@ func (s *persistentStore) Claim(ctx context.Context) (job *provisioning.Job, rol
job.Labels = make(map[string]string)
}
job.Labels[LabelJobClaim] = strconv.FormatInt(s.clock().UnixMilli(), 10)
s.queueMetrics.RecordWaitTime(string(job.Spec.Action), s.clock().Sub(job.CreationTimestamp.Time).Seconds())
// Set up the provisioning identity for this namespace
ctx, _, err = identity.WithProvisioningIdentity(ctx, job.GetNamespace())
@@ -241,6 +244,7 @@ func (s *persistentStore) Complete(ctx context.Context, job *provisioning.Job) e
job.Labels = make(map[string]string)
}
delete(job.Labels, LabelJobClaim)
s.queueMetrics.DecreaseQueueSize(string(job.Spec.Action))
logger.Debug("job completion done")
return nil
@@ -383,6 +387,8 @@ func (s *persistentStore) Insert(ctx context.Context, namespace string, spec pro
return nil, apifmt.Errorf("failed to create job '%s' in '%s': %w", job.GetName(), job.GetNamespace(), err)
}
s.queueMetrics.IncreaseQueueSize(string(job.Spec.Action))
return created, nil
}
@@ -3,6 +3,7 @@ package sync
import (
"context"
"fmt"
"time"
"github.com/grafana/grafana-app-sdk/logging"
provisioning "github.com/grafana/grafana/apps/provisioning/pkg/apis/provisioning/v0alpha1"
@@ -10,7 +11,6 @@ import (
"github.com/grafana/grafana/pkg/registry/apis/provisioning/jobs"
"github.com/grafana/grafana/pkg/registry/apis/provisioning/resources"
"github.com/grafana/grafana/pkg/storage/legacysql/dualwrite"
"github.com/prometheus/client_golang/prometheus"
)
//go:generate mockery --name RepositoryPatchFn --structname MockRepositoryPatchFn --inpackage --filename repository_patch_fn_mock.go --with-expecter
@@ -34,8 +34,7 @@ type SyncWorker struct {
// Sync functions
syncer Syncer
// Registry for metrics
registry prometheus.Registerer
metrics jobs.JobMetrics
}
func NewSyncWorker(
@@ -44,7 +43,7 @@ func NewSyncWorker(
storageStatus dualwrite.Service,
patchStatus RepositoryPatchFn,
syncer Syncer,
registry prometheus.Registerer,
metrics jobs.JobMetrics,
) *SyncWorker {
return &SyncWorker{
clients: clients,
@@ -52,7 +51,7 @@ func NewSyncWorker(
patchStatus: patchStatus,
storageStatus: storageStatus,
syncer: syncer,
registry: registry,
metrics: metrics,
}
}
@@ -63,6 +62,14 @@ func (r *SyncWorker) IsSupported(ctx context.Context, job provisioning.Job) bool
func (r *SyncWorker) Process(ctx context.Context, repo repository.Repository, job provisioning.Job, progress jobs.JobProgressRecorder) error {
cfg := repo.Config()
logger := logging.FromContext(ctx).With("job", job.GetName(), "namespace", job.GetNamespace())
start := time.Now()
outcome := jobs.ErrorOutcome
totalChangesMade := 0
defer func() {
r.metrics.RecordJob(string(provisioning.JobActionPull), outcome, totalChangesMade, time.Since(start).Seconds())
}()
// Check if we are onboarding from legacy storage
// HACK -- this should be handled outside of this worker
if r.storageStatus != nil && dualwrite.IsReadingLegacyDashboardsAndFolders(ctx, r.storageStatus) {
@@ -71,7 +78,7 @@ func (r *SyncWorker) Process(ctx context.Context, repo repository.Repository, jo
rw, ok := repo.(repository.ReaderWriter)
if !ok {
return fmt.Errorf("sync job submitted for repository that does not support read-write -- this is a bug")
return fmt.Errorf("sync job submitted for repository that does not support read-write")
}
syncStatus := job.Status.ToSyncStatus(job.Name)
@@ -90,16 +97,19 @@ func (r *SyncWorker) Process(ctx context.Context, repo repository.Repository, jo
progress.SetMessage(ctx, "update sync status at start")
if err := r.patchStatus(ctx, cfg, patchOperations...); err != nil {
logger.Error("failed to update the repository status at the start of the sync job", "error", err)
return fmt.Errorf("update repo with job status at start: %w", err)
}
repositoryResources, err := r.repositoryResources.Client(ctx, rw)
if err != nil {
logger.Error("failed to create repository resources client", "error", err)
return fmt.Errorf("create repository resources client: %w", err)
}
clients, err := r.clients.Clients(ctx, cfg.Namespace)
if err != nil {
logger.Error("failed to get clients for the repository", "error", err)
return fmt.Errorf("get clients for %s: %w", cfg.Name, err)
}
@@ -110,6 +120,15 @@ func (r *SyncWorker) Process(ctx context.Context, repo repository.Repository, jo
jobStatus := progress.Complete(ctx, syncError)
syncStatus = jobStatus.ToSyncStatus(job.Name)
if syncError != nil {
logger.Debug("failed to sync the repository", "error", syncError)
} else {
outcome = jobs.SuccessOutcome
for _, summary := range jobStatus.Summary {
totalChangesMade += int(summary.Create + summary.Update + summary.Delete)
}
}
// Create sync status and set hash if successful
if syncStatus.State == provisioning.JobStateSuccess {
syncStatus.LastRef = currentRef
@@ -146,6 +165,7 @@ func (r *SyncWorker) Process(ctx context.Context, repo repository.Repository, jo
// Only patch the specific fields we want to update, not the entire status
if err := r.patchStatus(ctx, cfg, patchOperations...); err != nil {
logger.Error("failed to update the repository status at the end of the sync job", "error", err)
return fmt.Errorf("update repo with job final status: %w", err)
}
@@ -17,6 +17,7 @@ import (
)
func TestSyncWorker_IsSupported(t *testing.T) {
metrics := jobs.RegisterJobMetrics(prometheus.NewPedanticRegistry())
tests := []struct {
name string
job provisioning.Job
@@ -44,7 +45,7 @@ func TestSyncWorker_IsSupported(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
worker := NewSyncWorker(nil, nil, nil, nil, nil, prometheus.DefaultRegisterer)
worker := NewSyncWorker(nil, nil, nil, nil, nil, metrics)
result := worker.IsSupported(context.Background(), tt.job)
require.Equal(t, tt.expected, result)
})
@@ -63,9 +64,9 @@ func TestSyncWorker_ProcessNotReaderWriter(t *testing.T) {
})
fakeDualwrite := dualwrite.NewMockService(t)
fakeDualwrite.On("ReadFromUnified", mock.Anything, mock.Anything).Return(true, nil).Twice()
worker := NewSyncWorker(nil, nil, fakeDualwrite, nil, nil, prometheus.DefaultRegisterer)
worker := NewSyncWorker(nil, nil, fakeDualwrite, nil, nil, jobs.RegisterJobMetrics(prometheus.NewPedanticRegistry()))
err := worker.Process(context.Background(), repo, provisioning.Job{}, jobs.NewMockJobProgressRecorder(t))
require.EqualError(t, err, "sync job submitted for repository that does not support read-write -- this is a bug")
require.EqualError(t, err, "sync job submitted for repository that does not support read-write")
}
func TestSyncWorker_Process(t *testing.T) {
@@ -531,7 +532,7 @@ func TestSyncWorker_Process(t *testing.T) {
dualwriteService,
repositoryPatchFn.Execute,
syncer,
prometheus.DefaultRegisterer,
jobs.RegisterJobMetrics(prometheus.NewPedanticRegistry()),
)
// Create test job
+6 -4
View File
@@ -737,13 +737,15 @@ func (b *APIBuilder) GetPostStartHooks() (map[string]genericapiserver.PostStartH
usageMetricCollector := usage.MetricCollector(b.tracer, b.getRepositoriesInNamespace, b.unified)
b.usageStats.RegisterMetricsFunc(usageMetricCollector)
metrics := jobs.RegisterJobMetrics(b.registry)
stageIfPossible := repository.WrapWithStageAndPushIfPossible
exportWorker := export.NewExportWorker(
b.clients,
b.repositoryResources,
export.ExportAll,
stageIfPossible,
b.registry,
metrics,
)
syncer := sync.NewSyncer(sync.Compare, sync.FullSync, sync.IncrementalSync)
@@ -753,7 +755,7 @@ func (b *APIBuilder) GetPostStartHooks() (map[string]genericapiserver.PostStartH
b.storageStatus,
b.statusPatcher.Patch,
syncer,
b.registry,
metrics,
)
signerFactory := signature.NewSignerFactory(b.clients)
legacyResources := migrate.NewLegacyResourcesMigrator(
@@ -785,8 +787,8 @@ func (b *APIBuilder) GetPostStartHooks() (map[string]genericapiserver.PostStartH
b.storageStatus,
)
deleteWorker := deletepkg.NewWorker(syncWorker, stageIfPossible, b.repositoryResources, b.registry)
moveWorker := movepkg.NewWorker(syncWorker, stageIfPossible, b.repositoryResources, b.registry)
deleteWorker := deletepkg.NewWorker(syncWorker, stageIfPossible, b.repositoryResources, metrics)
moveWorker := movepkg.NewWorker(syncWorker, stageIfPossible, b.repositoryResources, metrics)
workers := []jobs.Worker{
deleteWorker,
exportWorker,