Provisioning: Fix import cycle between grafana and provisioning app (#110406)

* Move operators to grafana/grafana

* Go mod tidy
This commit is contained in:
Roberto Jiménez Sánchez
2025-09-01 15:29:34 +02:00
committed by GitHub
parent 7324087273
commit 4de9ec7310
6 changed files with 2 additions and 13 deletions
-4
View File
@@ -7,7 +7,6 @@ require (
github.com/grafana/grafana-app-sdk/logging v0.40.3
github.com/grafana/grafana/pkg/apimachinery v0.0.0-20250804150913-990f1c69ecc2
github.com/stretchr/testify v1.10.0
github.com/urfave/cli/v2 v2.27.7
k8s.io/apimachinery v0.33.3
k8s.io/apiserver v0.33.3
k8s.io/client-go v0.33.3
@@ -19,7 +18,6 @@ require (
github.com/beorn7/perks v1.0.1 // indirect
github.com/blang/semver/v4 v4.0.0 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/cpuguy83/go-md2man/v2 v2.0.7 // indirect
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/emicklei/go-restful/v3 v3.12.1 // indirect
github.com/fxamacker/cbor/v2 v2.7.0 // indirect
@@ -51,11 +49,9 @@ require (
github.com/prometheus/client_model v0.6.2 // indirect
github.com/prometheus/common v0.65.0 // indirect
github.com/prometheus/procfs v0.16.1 // indirect
github.com/russross/blackfriday/v2 v2.1.0 // indirect
github.com/spf13/pflag v1.0.7 // indirect
github.com/stretchr/objx v0.5.2 // indirect
github.com/x448/float16 v0.8.4 // indirect
github.com/xrash/smetrics v0.0.0-20240521201337-686a1a2994c1 // indirect
go.opentelemetry.io/auto/sdk v1.1.0 // indirect
go.opentelemetry.io/otel v1.37.0 // indirect
go.opentelemetry.io/otel/metric v1.37.0 // indirect
-8
View File
@@ -4,8 +4,6 @@ github.com/blang/semver/v4 v4.0.0 h1:1PFHFE6yCCTv8C1TeyNNarDzntLi7wMI5i/pzqYIsAM
github.com/blang/semver/v4 v4.0.0/go.mod h1:IbckMUScFkM3pff0VJDNKRiT6TG/YpiHIM2yvyW5YoQ=
github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/cpuguy83/go-md2man/v2 v2.0.7 h1:zbFlGlXEAKlwXpmvle3d8Oe3YnkKIK4xSRTd3sHPnBo=
github.com/cpuguy83/go-md2man/v2 v2.0.7/go.mod h1:oOW0eioCTA6cOiMLiUPZOpcVxMig6NIQQ7OS05n1F4g=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM=
@@ -97,8 +95,6 @@ github.com/prometheus/procfs v0.16.1 h1:hZ15bTNuirocR6u0JZ6BAHHmwS1p8B4P6MRqxtzM
github.com/prometheus/procfs v0.16.1/go.mod h1:teAbpZRB1iIAJYREa1LsoWUXykVXA1KlTmWl8x/U+Is=
github.com/rogpeppe/go-internal v1.14.1 h1:UQB4HGPB6osV0SQTLymcB4TgvyWu6ZyliaW0tI/otEQ=
github.com/rogpeppe/go-internal v1.14.1/go.mod h1:MaRKkUm5W0goXpeCfT7UZI6fk/L7L7so1lCWt35ZSgc=
github.com/russross/blackfriday/v2 v2.1.0 h1:JIOH55/0cWyOuilr9/qlrm0BSXldqnqwMsf35Ld67mk=
github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
github.com/spf13/pflag v1.0.7 h1:vN6T9TfwStFPFM5XzjsvmzZkLuaLX+HS+0SeFLRgU6M=
github.com/spf13/pflag v1.0.7/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
@@ -108,12 +104,8 @@ github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UV
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA=
github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
github.com/urfave/cli/v2 v2.27.7 h1:bH59vdhbjLv3LAvIu6gd0usJHgoTTPhCFib8qqOwXYU=
github.com/urfave/cli/v2 v2.27.7/go.mod h1:CyNAG/xg+iAOg0N4MPGZqVmv2rCoP267496AOXUZjA4=
github.com/x448/float16 v0.8.4 h1:qLwI1I70+NjRFUR3zs1JPUCgaCXSh3SW62uAKT1mSBM=
github.com/x448/float16 v0.8.4/go.mod h1:14CWIYCyZA/cWjXOioeEpHeN/83MdbZDRQHoFcYsOfg=
github.com/xrash/smetrics v0.0.0-20240521201337-686a1a2994c1 h1:gEOO8jv9F4OT7lGCjxCBTO/36wtF6j2nSip77qHd4x4=
github.com/xrash/smetrics v0.0.0-20240521201337-686a1a2994c1/go.mod h1:Ohn+xnUBiLI6FVj/9LpzZWtj1/D6lUovWYBkxHVV3aM=
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY=
-145
View File
@@ -1,145 +0,0 @@
# Jobs Controller
> [!WARNING]
> This controller has current limitations:
>
> - Does not start the ConcurrentJobDriver yet. Notifications are logged but not consumed by workers here.
> - Job processing (claim/renew/update/complete) isn't implemented yet as it requires refactoring of some components.
### Behavior
- Watches provisioning `Jobs` and emits notifications on job creation.
- Optionally cleans up `HistoricJobs` after a configurable expiration. Disable when job history is stored in Loki.
- Queueing and claiming:
- Creating a `Job` enqueues work. Drivers “claim” one job at a time under a time-bound lease so only one worker processes it at once.
- If a driver crashes or loses its lease, cleanup makes the job eligible to be claimed again. This yields at-least-once processing.
- New job notifications reduce latency; periodic ticks ensure progress even without notifications.
- Processing and status:
- A supporting worker processes the job, renewing the lease in the background. If lease renewal fails or expires, processing aborts.
- Status updates are persisted with conflict-aware retries. Progress is throttled to avoid excessive writes while still providing timely feedback.
- When processing finishes, the job is marked complete and a copy is written to history.
- Historic jobs role:
- Historic jobs are a read-only audit trail and UX surface for recent job outcomes, progress summaries, errors, and reference URLs.
- Retention is implementation-dependent: this controller can prune old history objects periodically, or history can be stored in Loki; when using Loki, disable local cleanup with `--history-expiration=0`.
This binary currently wires informers and emits job-create notifications. In the full setup, concurrent drivers consume notifications and execute workers to process jobs using the behavior above.
### Flags
- `--token` (string): Token to use for authentication against the provisioning API.
- `--token-exchange-url` (string): Token exchange endpoint used to mint the access token for the provisioning API.
- `--provisioning-server-url` (string): Base URL to the provisioning API server (e.g., `https://localhost:6446`).
- `--history-expiration` (duration): If greater than zero, enables HistoricJobs cleanup and sets the retention window (e.g., `30s`, `15m`, `24h`). If `0`, cleanup is disabled.
#### TLS Configuration
- `--tls-insecure` (bool): Skip TLS certificate verification. Default: `true` (for development/testing).
- `--tls-cert-file` (string): Path to TLS client certificate file for mutual TLS authentication.
- `--tls-key-file` (string): Path to TLS client private key file for mutual TLS authentication.
- `--tls-ca-file` (string): Path to TLS CA certificate file for server certificate verification.
### How to run
1. Build grafana:
- `make build`
2. Ensure the following services are running locally: provisioning API server, secrets service API server, repository controller, unified storage, and auth.
3. Create an operator.ini file:
```
[operator]
provisioning_server_url = https://localhost:6446
tls_insecure = true
[grpc_client_authentication]
token = ProvisioningAdminToken
token_exchange_url = http://localhost:6481/sign/access-token
# Uncomment to enable history cleanup via Loki. First ensure the Provisioning API is configured with Loki for job history (see `createJobHistoryConfigFromSettings` in `pkg/registry/apis/provisioning/register.go`).
# history_expiration = 24h
```
4. Start the controller:
- `GF_DEFAULT_TARGET=operator GF_OPERATOR_NAME=provisioning-jobs ./bin/darwin-arm64/grafana server target --config=conf/operator.ini`
#### TLS Configuration Examples
- **Production with proper TLS verification**:
```
[operator]
provisioning_server_url = https://localhost:6446
tls_insecure = false
tls_ca_file = /path/to/ca-cert.pem
[grpc_client_authentication]
token = ProvisioningAdminToken
token_exchange_url = http://localhost:6481/sign/access-token
```
- **Mutual TLS authentication**:
```
[operator]
provisioning_server_url = https://localhost:6446
tls_insecure = false
tls_ca_file = /path/to/ca-cert.pem
tls_cert_file = /path/to/client-cert.pem
tls_key_file = /path/to/client-key.pem
[grpc_client_authentication]
token = ProvisioningAdminToken
token_exchange_url = http://localhost:6481/sign/access-token
```
- **Development with self-signed certificates (insecure)**:
```
[operator]
provisioning_server_url = https://localhost:6446
tls_insecure = true
[grpc_client_authentication]
token = ProvisioningAdminToken
token_exchange_url = http://localhost:6481/sign/access-token
```
### Expected behavior
1. Create a repository and enqueue a job (note that the repository must be marked as healthy):
```curl
export ACCESS_TOKEN=$(curl -X POST http://localhost:6481/sign/access-token \
-H "X-Realms: [{\"type\":\"system\",\"identifier\":\"system\"}]" \
-H "X-Org-ID: 0" \
-H "Content-Type: application/json" \
-H "Authorization: Bearer ProvisioningAdminToken" \
-d '{
"namespace": "*",
"audiences": ["provisioning.grafana.app"]
}' | jq -r '.data.token')
```
```curl
curl -X POST https://localhost:6446/apis/provisioning.grafana.app/v0alpha1/namespaces/default/repositories/test6/jobs \
-H "Content-Type: application/json" --insecure \
-H "X-Access-Token: Bearer $ACCESS_TOKEN" \
-d '{
"action": "pull",
"pull": {
"incremental": false
}
}'
```
2. The controller emits a notification on job creation.
```
➜ job-controller git:(feature/standalone-job-controller) ✗ ./bin/job-controller --token-exchange-url=http://localhost:6481/sign/access-token --token=ProvisioningAdminToken --provisioning-server-url=https://localhost:6446
{"time":"2025-08-21T14:27:03.789337+02:00","level":"INFO","msg":"job create notification received","logger":"provisioning-job-controller"}
```
```
```
3. In a full setup with the concurrent driver, workers claim and process jobs, updating status and writing history.
4. Entries move to `HistoricJobs`; if cleanup is enabled, older entries are pruned based on `--history-expiration`.
-210
View File
@@ -1,210 +0,0 @@
package operators
import (
"context"
"crypto/x509"
"fmt"
"log/slog"
"net/http"
"os"
"os/signal"
"syscall"
"time"
"github.com/grafana/authlib/authn"
"github.com/grafana/grafana-app-sdk/logging"
"github.com/urfave/cli/v2"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/transport"
"github.com/grafana/grafana/pkg/server"
"github.com/grafana/grafana/pkg/services/apiserver/standalone"
"github.com/grafana/grafana/pkg/setting"
authrt "github.com/grafana/grafana/apps/provisioning/pkg/auth"
"github.com/grafana/grafana/apps/provisioning/pkg/controller"
client "github.com/grafana/grafana/apps/provisioning/pkg/generated/clientset/versioned"
informer "github.com/grafana/grafana/apps/provisioning/pkg/generated/informers/externalversions"
)
// init registers the provisioning jobs operator with the Grafana server so it
// can be launched through the standalone operator target.
func init() {
	op := server.Operator{
		Name:        "provisioning-jobs",
		Description: "Watch provisioning jobs and manage job history cleanup",
		RunFunc:     runJobController,
	}
	server.RegisterOperator(op)
}
// controllerConfig holds the runtime dependencies resolved from configuration
// by setupFromConfig for the provisioning job controller.
type controllerConfig struct {
	// provisioningClient is the typed client for the provisioning API server.
	provisioningClient *client.Clientset
	// historyExpiration, when greater than zero, enables HistoricJobs cleanup
	// with this retention window; zero disables cleanup.
	historyExpiration time.Duration
}
// runJobController starts the provisioning job controller: it watches
// provisioning Jobs and logs create notifications, and optionally runs the
// HistoricJobs cleanup controller when history_expiration > 0.
//
// It blocks until the process receives SIGINT/SIGTERM, then cancels the
// shared context and returns nil. Errors from setup, controller creation, or
// cache sync are returned wrapped.
func runJobController(opts standalone.BuildInfo, c *cli.Context, cfg *setting.Cfg) error {
	logger := logging.NewSLogLogger(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{
		Level: slog.LevelDebug,
	})).With("logger", "provisioning-job-controller")
	logger.Info("Starting provisioning job controller")

	controllerCfg, err := setupFromConfig(cfg)
	if err != nil {
		return fmt.Errorf("failed to setup operator: %w", err)
	}

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	sigChan := make(chan os.Signal, 1)
	signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
	go func() {
		<-sigChan
		// Log through the structured logger (was fmt.Println) so the
		// shutdown message shares the same JSON stream as everything else.
		logger.Info("Received shutdown signal, stopping controllers")
		cancel()
	}()

	// Jobs informer and controller (resync ~60s like in register.go)
	jobInformerFactory := informer.NewSharedInformerFactoryWithOptions(
		controllerCfg.provisioningClient,
		60*time.Second,
	)
	jobInformer := jobInformerFactory.Provisioning().V0alpha1().Jobs()
	jobController, err := controller.NewJobController(jobInformer)
	if err != nil {
		return fmt.Errorf("failed to create job controller: %w", err)
	}
	logger.Info("jobs controller started")

	// Drain create notifications; workers are not wired up yet, so we only log.
	notifications := jobController.InsertNotifications()
	go func() {
		for {
			select {
			case <-ctx.Done():
				return
			case <-notifications:
				logger.Info("job create notification received")
			}
		}
	}()

	var startHistoryInformers func()
	if controllerCfg.historyExpiration > 0 {
		// History jobs informer and controller (separate factory with resync == expiration)
		historyInformerFactory := informer.NewSharedInformerFactoryWithOptions(
			controllerCfg.provisioningClient,
			controllerCfg.historyExpiration,
		)
		historyJobInformer := historyInformerFactory.Provisioning().V0alpha1().HistoricJobs()
		_, err = controller.NewHistoryJobController(
			controllerCfg.provisioningClient.ProvisioningV0alpha1(),
			historyJobInformer,
			controllerCfg.historyExpiration,
		)
		if err != nil {
			return fmt.Errorf("failed to create history job controller: %w", err)
		}
		logger.Info("history cleanup enabled", "expiration", controllerCfg.historyExpiration.String())
		startHistoryInformers = func() { historyInformerFactory.Start(ctx.Done()) }
	} else {
		startHistoryInformers = func() {}
	}

	// SharedInformerFactory.Start is non-blocking (it spawns its own
	// goroutines internally), so no extra `go` wrapper is needed here.
	jobInformerFactory.Start(ctx.Done())
	startHistoryInformers()

	// Wait for the job cache to sync; history cleanup can rely on resync events.
	if !cache.WaitForCacheSync(ctx.Done(), jobInformer.Informer().HasSynced) {
		return fmt.Errorf("failed to sync job informer cache")
	}

	<-ctx.Done()
	return nil
}
// setupFromConfig builds the controller dependencies from Grafana settings.
//
// Required settings:
//   - [grpc_client_authentication] token, token_exchange_url
//   - [operator] provisioning_server_url
//
// Optional [operator] settings: tls_insecure, tls_cert_file, tls_key_file,
// tls_ca_file, and history_expiration (a zero/absent duration disables
// HistoricJobs cleanup).
//
// It returns a controllerConfig with an authenticated provisioning client, or
// an error when a required setting is missing or a client cannot be built.
// (The previous named result parameters were never used and have been dropped.)
func setupFromConfig(cfg *setting.Cfg) (*controllerConfig, error) {
	if cfg == nil {
		return nil, fmt.Errorf("no configuration available")
	}

	gRPCAuth := cfg.SectionWithEnvOverrides("grpc_client_authentication")
	token := gRPCAuth.Key("token").String()
	if token == "" {
		return nil, fmt.Errorf("token is required in [grpc_client_authentication] section")
	}
	tokenExchangeURL := gRPCAuth.Key("token_exchange_url").String()
	if tokenExchangeURL == "" {
		return nil, fmt.Errorf("token_exchange_url is required in [grpc_client_authentication] section")
	}

	operatorSec := cfg.SectionWithEnvOverrides("operator")
	provisioningServerURL := operatorSec.Key("provisioning_server_url").String()
	if provisioningServerURL == "" {
		return nil, fmt.Errorf("provisioning_server_url is required in [operator] section")
	}
	tlsInsecure := operatorSec.Key("tls_insecure").MustBool(false)
	tlsCertFile := operatorSec.Key("tls_cert_file").String()
	tlsKeyFile := operatorSec.Key("tls_key_file").String()
	tlsCAFile := operatorSec.Key("tls_ca_file").String()

	tokenExchangeClient, err := authn.NewTokenExchangeClient(authn.TokenExchangeConfig{
		TokenExchangeURL: tokenExchangeURL,
		Token:            token,
	})
	if err != nil {
		return nil, fmt.Errorf("failed to create token exchange client: %w", err)
	}

	tlsConfig, err := buildTLSConfig(tlsInsecure, tlsCertFile, tlsKeyFile, tlsCAFile)
	if err != nil {
		return nil, fmt.Errorf("failed to build TLS configuration: %w", err)
	}

	// Every request is wrapped so the exchanged access token is attached.
	config := &rest.Config{
		APIPath: "/apis",
		Host:    provisioningServerURL,
		WrapTransport: transport.WrapperFunc(func(rt http.RoundTripper) http.RoundTripper {
			return authrt.NewRoundTripper(tokenExchangeClient, rt)
		}),
		TLSClientConfig: tlsConfig,
	}
	provisioningClient, err := client.NewForConfig(config)
	if err != nil {
		return nil, fmt.Errorf("failed to create provisioning client: %w", err)
	}

	return &controllerConfig{
		provisioningClient: provisioningClient,
		historyExpiration:  operatorSec.Key("history_expiration").MustDuration(0),
	}, nil
}
// buildTLSConfig assembles the client TLS configuration for the provisioning
// API connection.
//
// certFile and keyFile enable mutual TLS and must be provided together; caFile,
// when set, is read from disk and used to verify the server certificate;
// insecure skips server certificate verification entirely.
func buildTLSConfig(insecure bool, certFile, keyFile, caFile string) (rest.TLSClientConfig, error) {
	tlsConfig := rest.TLSClientConfig{
		Insecure: insecure,
	}

	// Require the client certificate and key as a pair. Previously a lone
	// cert or key was silently ignored, leaving mTLS unexpectedly disabled.
	switch {
	case certFile != "" && keyFile != "":
		tlsConfig.CertFile = certFile
		tlsConfig.KeyFile = keyFile
	case certFile != "" || keyFile != "":
		return tlsConfig, fmt.Errorf("tls_cert_file and tls_key_file must be set together")
	}

	if caFile != "" {
		// caFile is set in operator.ini file
		// nolint:gosec
		caCert, err := os.ReadFile(caFile)
		if err != nil {
			return tlsConfig, fmt.Errorf("failed to read CA certificate file: %w", err)
		}
		caCertPool := x509.NewCertPool()
		if !caCertPool.AppendCertsFromPEM(caCert) {
			return tlsConfig, fmt.Errorf("failed to parse CA certificate")
		}
		tlsConfig.CAData = caCert
	}
	return tlsConfig, nil
}