Files
grafana/pkg/operators/provisioning/jobs_operator.go
Roberto Jiménez Sánchez ed2273b2d2 Provisioning: processing of jobs in job controller (#110223)
* WIP: Controller

* WIP: more changes

* Use patcher from new location

* Separate import

* Move operators to grafana/grafana

* Tidy go mod

* Remove duplicate TODO

* Wrapper for unified storage

* WIP: build unified storage client

* More attempts

* Revert update workspace

* Improve comment

* Fix linting

* Change signature of repository getter

* Add ticket numbers

* Remove question

* Read config from file for decrypt service

* Config struct for unified storage

* Add local config

* Fix compilation

* Try to configure it

* Fix linting

* Add FIXME comment

* Move reusable logic into controller config

* Remove unused

* More logic to be reused

* Extract workers into separate function

* Clean up unified storage client

* Revert a couple of files

* Remove secrets decrypter from this PR

* Revert enterprise imports

* Clean up unified storage setup logic

* Add TODO

* Revert some changes

* Remove file

* Use the expected clients

---------

Co-authored-by: Stephanie Hingtgen <stephanie.hingtgen@grafana.com>
2025-09-05 18:28:31 +00:00

297 lines
9.5 KiB
Go

package provisioning
import (
"context"
"fmt"
"log/slog"
"os"
"os/signal"
"syscall"
"time"
"github.com/grafana/grafana-app-sdk/logging"
"github.com/urfave/cli/v2"
"k8s.io/client-go/tools/cache"
"github.com/grafana/grafana/pkg/apimachinery/identity"
"github.com/grafana/grafana/pkg/apimachinery/utils"
"github.com/grafana/grafana/pkg/registry/apis/provisioning/jobs"
"github.com/grafana/grafana/pkg/registry/apis/provisioning/jobs/export"
"github.com/grafana/grafana/pkg/registry/apis/provisioning/jobs/migrate"
"github.com/grafana/grafana/pkg/registry/apis/provisioning/jobs/move"
"github.com/grafana/grafana/pkg/registry/apis/provisioning/jobs/sync"
"github.com/grafana/grafana/pkg/registry/apis/provisioning/resources"
"github.com/grafana/grafana/pkg/services/apiserver/standalone"
"github.com/grafana/grafana/pkg/setting"
"github.com/grafana/grafana/pkg/storage/unified/resourcepb"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"github.com/grafana/grafana/apps/provisioning/pkg/controller"
informer "github.com/grafana/grafana/apps/provisioning/pkg/generated/informers/externalversions"
"github.com/grafana/grafana/apps/provisioning/pkg/repository"
deletepkg "github.com/grafana/grafana/pkg/registry/apis/provisioning/jobs/delete"
)
// RunJobController starts the standalone provisioning job controller.
// It wires up the jobs informer and controller, an optional history-cleanup
// controller (enabled when history_expiration > 0), the job workers, and the
// concurrent job driver, then blocks until SIGINT/SIGTERM cancels the context.
// Returns an error if any component fails to initialize or the job informer
// cache cannot sync.
func RunJobController(opts standalone.BuildInfo, c *cli.Context, cfg *setting.Cfg) error {
	logger := logging.NewSLogLogger(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{
		Level: slog.LevelDebug,
	})).With("logger", "provisioning-job-controller")
	logger.Info("Starting provisioning job controller")

	controllerCfg, err := setupJobsControllerFromConfig(cfg)
	if err != nil {
		return fmt.Errorf("failed to setup operator: %w", err)
	}

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	sigChan := make(chan os.Signal, 1)
	signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
	go func() {
		<-sigChan
		// Use the structured logger (not fmt.Println) so the shutdown message
		// appears in the same JSON stream as every other log line.
		logger.Info("received shutdown signal, stopping controllers")
		cancel()
	}()

	// Use unified storage client and API clients for testing purposes.
	// TODO: remove this once the processing logic is in place
	// https://github.com/grafana/git-ui-sync-project/issues/467
	go temporaryPeriodicTestClients(ctx, logger, controllerCfg)

	// Jobs informer and controller (resync ~60s like in register.go)
	jobInformerFactory := informer.NewSharedInformerFactoryWithOptions(
		controllerCfg.provisioningClient,
		controllerCfg.resyncInterval,
	)
	jobInformer := jobInformerFactory.Provisioning().V0alpha1().Jobs()
	jobController, err := controller.NewJobController(jobInformer)
	if err != nil {
		return fmt.Errorf("failed to create job controller: %w", err)
	}
	// NOTE: "started" is logged by the driver goroutine below; here the
	// controller has only been constructed.
	logger.Info("job controller created")

	var startHistoryInformers func()
	if controllerCfg.historyExpiration > 0 {
		// History jobs informer and controller (separate factory with resync == expiration)
		historyInformerFactory := informer.NewSharedInformerFactoryWithOptions(
			controllerCfg.provisioningClient,
			controllerCfg.historyExpiration,
		)
		historyJobInformer := historyInformerFactory.Provisioning().V0alpha1().HistoricJobs()
		_, err = controller.NewHistoryJobController(
			controllerCfg.provisioningClient.ProvisioningV0alpha1(),
			historyJobInformer,
			controllerCfg.historyExpiration,
		)
		if err != nil {
			return fmt.Errorf("failed to create history job controller: %w", err)
		}
		logger.Info("history cleanup enabled", "expiration", controllerCfg.historyExpiration.String())
		startHistoryInformers = func() { historyInformerFactory.Start(ctx.Done()) }
	} else {
		startHistoryInformers = func() {}
	}

	// HistoryWriter can be either Loki or the API server
	// TODO: Loki configuration and setup in the same way we do for the API server
	// https://github.com/grafana/git-ui-sync-project/issues/508
	// var jobHistoryWriter jobs.HistoryWriter
	// if b.jobHistoryLoki != nil {
	// 	jobHistoryWriter = b.jobHistoryLoki
	// } else {
	// 	jobHistoryWriter = jobs.NewAPIClientHistoryWriter(provisioningClient.ProvisioningV0alpha1())
	// }
	jobHistoryWriter := jobs.NewAPIClientHistoryWriter(controllerCfg.provisioningClient.ProvisioningV0alpha1())

	jobStore, err := jobs.NewJobStore(controllerCfg.provisioningClient.ProvisioningV0alpha1(), 30*time.Second)
	if err != nil {
		return fmt.Errorf("create API client job store: %w", err)
	}

	workers, err := setupWorkers(controllerCfg)
	if err != nil {
		return fmt.Errorf("setup workers: %w", err)
	}

	repoGetter := resources.NewRepositoryGetter(
		controllerCfg.repoFactory,
		controllerCfg.provisioningClient.ProvisioningV0alpha1(),
	)

	// This is basically our own JobQueue system
	driver, err := jobs.NewConcurrentJobDriver(
		3,              // 3 drivers for now
		20*time.Minute, // Max time for each job
		time.Minute,    // Cleanup jobs
		30*time.Second, // Periodically look for new jobs
		30*time.Second, // Lease renewal interval
		jobStore,
		repoGetter,
		jobHistoryWriter,
		jobController.InsertNotifications(),
		workers...,
	)
	if err != nil {
		return fmt.Errorf("create concurrent job driver: %w", err)
	}
	go func() {
		logger.Info("jobs controller started")
		if err := driver.Run(ctx); err != nil {
			logger.Error("job driver failed", "error", err)
		}
	}()

	// Start informers
	go jobInformerFactory.Start(ctx.Done())
	go startHistoryInformers()

	// Optionally wait for job cache sync; history cleanup can rely on resync events
	if !cache.WaitForCacheSync(ctx.Done(), jobInformer.Informer().HasSynced) {
		return fmt.Errorf("failed to sync job informer cache")
	}

	<-ctx.Done()
	return nil
}
// jobsControllerConfig extends the shared provisioning controller
// configuration with settings specific to the jobs controller.
type jobsControllerConfig struct {
provisioningControllerConfig
// historyExpiration controls cleanup of historic jobs; when > 0 a
// history-cleanup controller is started with this resync interval,
// when 0 (the default) history cleanup is disabled.
historyExpiration time.Duration
}
// setupJobsControllerFromConfig builds the jobs controller configuration from
// the Grafana settings: the shared provisioning controller config plus the
// optional "[operator] history_expiration" duration (defaults to 0, disabled).
func setupJobsControllerFromConfig(cfg *setting.Cfg) (*jobsControllerConfig, error) {
	base, err := setupFromConfig(cfg)
	if err != nil {
		return nil, err
	}

	expiration := cfg.SectionWithEnvOverrides("operator").
		Key("history_expiration").
		MustDuration(0)

	return &jobsControllerConfig{
		provisioningControllerConfig: *base,
		historyExpiration:            expiration,
	}, nil
}
// setupWorkers assembles the full set of provisioning job workers — sync,
// export, migrate, delete, and move — from the controller configuration.
func setupWorkers(cfg *jobsControllerConfig) ([]jobs.Worker, error) {
	k8sClients := cfg.clients
	parserFactory := resources.NewParserFactory(k8sClients)
	lister := resources.NewResourceLister(cfg.unified)
	repoResources := resources.NewRepositoryResourcesFactory(parserFactory, k8sClients, lister)
	patcher := controller.NewRepositoryStatusPatcher(cfg.provisioningClient.ProvisioningV0alpha1())
	stageIfPossible := repository.WrapWithStageAndPushIfPossible

	// Sync: reconciles repository contents against unified storage.
	syncWorker := sync.NewSyncWorker(
		k8sClients,
		repoResources,
		nil, // HACK: we have updated the worker to check for nil
		patcher.Patch,
		sync.NewSyncer(sync.Compare, sync.FullSync, sync.IncrementalSync),
	)

	// Export: writes resources out to the repository.
	exportWorker := export.NewExportWorker(
		k8sClients,
		repoResources,
		export.ExportAll,
		stageIfPossible,
	)

	// Migrate: moves unified-storage resources into repository-backed
	// provisioning, built on top of the export and sync workers.
	migrationWorker := migrate.NewMigrationWorkerFromUnified(
		migrate.NewUnifiedStorageMigrator(
			migrate.NewNamespaceCleaner(k8sClients),
			exportWorker,
			syncWorker,
		),
	)

	return []jobs.Worker{
		syncWorker,
		exportWorker,
		migrationWorker,
		// Delete and Move both build on the sync worker.
		deletepkg.NewWorker(syncWorker, stageIfPossible, repoResources),
		move.NewWorker(syncWorker, stageIfPossible, repoResources),
	}, nil
}
// temporaryPeriodicTestClients periodically exercises the unified storage
// client and the resource API clients for testing purposes, logging managed
// object counts and all resources of every supported provisioning kind.
// It runs until ctx is cancelled.
// TODO: remove this once the processing logic is in place
// https://github.com/grafana/git-ui-sync-project/issues/467
func temporaryPeriodicTestClients(ctx context.Context, logger logging.Logger, controllerCfg *jobsControllerConfig) {
	tick := time.NewTicker(controllerCfg.resyncInterval)
	// Stop via defer so the ticker is always released on any return path.
	defer tick.Stop()
	logger.Info("starting periodic using clients", "interval", controllerCfg.resyncInterval.String())

	fetchAndLog := func(ctx context.Context) {
		ctx, _, err := identity.WithProvisioningIdentity(ctx, "*") // "*" grants us access to all namespaces.
		if err != nil {
			logger.Error("failed to set identity", "error", err)
			return
		}

		resp, err := controllerCfg.unified.CountManagedObjects(ctx, &resourcepb.CountManagedObjectsRequest{
			Kind: string(utils.ManagerKindRepo),
		})
		switch {
		case err != nil:
			logger.Error("failed to list managed objects", "error", err)
		case len(resp.Items) == 0:
			logger.Info("no managed objects found")
		default:
			for _, obj := range resp.Items {
				// Fixed typo: "manage object counts" -> "managed object counts".
				logger.Info("managed object counts", "item", obj)
			}
		}

		// List all supported resources
		client, err := controllerCfg.clients.Clients(ctx, "")
		if err != nil {
			logger.Error("failed to get resource clients", "error", err)
			return
		}
		for kind, gvr := range resources.SupportedProvisioningResources {
			logger := logger.With("kind", kind, "gvr", gvr.String())
			logger.Info("fetching resources")
			resourceClient, gvk, err := client.ForResource(ctx, gvr)
			if err != nil {
				logger.Error("failed to get resource client", "error", err)
				continue
			}
			logger = logger.With("gvk", gvk.String())
			list, err := resourceClient.List(ctx, metav1.ListOptions{})
			if err != nil {
				logger.Error("failed to list resources", "error", err)
				continue
			}
			for _, item := range list.Items {
				logger.Info("resource", "name", item.GetName(), "namespace", item.GetNamespace())
			}
		}
	}

	fetchAndLog(ctx) // Initial fetch
	for {
		select {
		case <-ctx.Done():
			return
		case <-tick.C:
			// Periodic fetch
			fetchAndLog(ctx)
		}
	}
}