Compare commits
4 Commits
plugin-jso
...
rm-dualwri
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
37a1a45194 | ||
|
|
8cb33642ba | ||
|
|
283ad15e84 | ||
|
|
070dc2288e |
@@ -7,7 +7,6 @@ require (
|
||||
github.com/grafana/authlib/types v0.0.0-20251119142549-be091cf2f4d4
|
||||
github.com/grafana/grafana-app-sdk/logging v0.48.7
|
||||
github.com/grafana/grafana/pkg/apimachinery v0.0.0-20250514132646-acbc7b54ed9e
|
||||
github.com/prometheus/client_golang v1.23.2
|
||||
github.com/stretchr/testify v1.11.1
|
||||
go.opentelemetry.io/contrib/propagators/jaeger v1.38.0
|
||||
go.opentelemetry.io/otel v1.39.0
|
||||
@@ -69,6 +68,7 @@ require (
|
||||
github.com/onsi/gomega v1.36.2 // indirect
|
||||
github.com/patrickmn/go-cache v2.1.0+incompatible // indirect
|
||||
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
|
||||
github.com/prometheus/client_golang v1.23.2 // indirect
|
||||
github.com/prometheus/client_model v0.6.2 // indirect
|
||||
github.com/prometheus/common v0.67.4 // indirect
|
||||
github.com/prometheus/procfs v0.19.2 // indirect
|
||||
|
||||
@@ -1,15 +1,9 @@
|
||||
package rest
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
apiequality "k8s.io/apimachinery/pkg/api/equality"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||
"k8s.io/klog/v2"
|
||||
|
||||
"github.com/grafana/grafana/pkg/apimachinery/utils"
|
||||
)
|
||||
@@ -66,103 +60,6 @@ const (
|
||||
Mode5
|
||||
)
|
||||
|
||||
type NamespacedKVStore interface {
|
||||
Get(ctx context.Context, key string) (string, bool, error)
|
||||
Set(ctx context.Context, key, value string) error
|
||||
}
|
||||
|
||||
type ServerLockService interface {
|
||||
LockExecuteAndRelease(ctx context.Context, actionName string, maxInterval time.Duration, fn func(ctx context.Context)) error
|
||||
}
|
||||
|
||||
func SetDualWritingMode(
|
||||
ctx context.Context,
|
||||
kvs NamespacedKVStore,
|
||||
cfg *SyncerConfig,
|
||||
metrics *DualWriterMetrics,
|
||||
) (DualWriterMode, error) {
|
||||
if cfg == nil {
|
||||
return Mode0, errors.New("syncer config is nil")
|
||||
}
|
||||
// Mode0 means no DualWriter
|
||||
if cfg.Mode == Mode0 {
|
||||
return Mode0, nil
|
||||
}
|
||||
|
||||
toMode := map[string]DualWriterMode{
|
||||
// It is not possible to initialize a mode 0 dual writer. Mode 0 represents
|
||||
// writing to legacy storage without Unified Storage enabled.
|
||||
"1": Mode1,
|
||||
"2": Mode2,
|
||||
"3": Mode3,
|
||||
"4": Mode4,
|
||||
"5": Mode5,
|
||||
}
|
||||
errDualWriterSetCurrentMode := errors.New("failed to set current dual writing mode")
|
||||
|
||||
// Use entity name as key
|
||||
kvMode, ok, err := kvs.Get(ctx, cfg.Kind)
|
||||
if err != nil {
|
||||
return Mode0, errors.New("failed to fetch current dual writing mode")
|
||||
}
|
||||
|
||||
currentMode, exists := toMode[kvMode]
|
||||
|
||||
// If the mode does not exist in our mapping, we log an error.
|
||||
if !exists && ok {
|
||||
// Only log if "ok" because initially all instances will have mode unset for playlists.
|
||||
klog.Infof("invalid dual writing mode for %s mode: %v", cfg.Kind, kvMode)
|
||||
}
|
||||
|
||||
// If the mode does not exist in our mapping, and we also didn't find an entry for this kind, fallback.
|
||||
if !exists || !ok {
|
||||
// Default to mode 1
|
||||
currentMode = Mode1
|
||||
if err := kvs.Set(ctx, cfg.Kind, fmt.Sprint(currentMode)); err != nil {
|
||||
return Mode0, errDualWriterSetCurrentMode
|
||||
}
|
||||
}
|
||||
|
||||
isUpgradeToReadUnifiedMode := currentMode < Mode3 && cfg.Mode >= Mode3
|
||||
if !isUpgradeToReadUnifiedMode {
|
||||
if err := kvs.Set(ctx, cfg.Kind, fmt.Sprint(cfg.Mode)); err != nil {
|
||||
return Mode0, errDualWriterSetCurrentMode
|
||||
}
|
||||
return cfg.Mode, nil
|
||||
}
|
||||
|
||||
// If SkipDataSync is enabled, we can set the mode directly without running the syncer.
|
||||
if cfg.SkipDataSync {
|
||||
if err := kvs.Set(ctx, cfg.Kind, fmt.Sprint(cfg.Mode)); err != nil {
|
||||
return Mode0, errDualWriterSetCurrentMode
|
||||
}
|
||||
return cfg.Mode, nil
|
||||
}
|
||||
|
||||
// Transitioning to Mode3 or higher from Mode0, Mode1, or Mode2.
|
||||
// We need to run the syncer in the current mode before we can upgrade to Mode3 or higher.
|
||||
cfgModeTmp := cfg.Mode
|
||||
// Before running the sync, set the syncer config to the current mode, as we have to run the syncer
|
||||
// once in the current active mode before we can upgrade.
|
||||
cfg.Mode = currentMode
|
||||
syncOk, err := runDataSyncer(ctx, cfg, metrics)
|
||||
// Once we are done with running the syncer, we can change the mode back on the config to the desired one.
|
||||
cfg.Mode = cfgModeTmp
|
||||
if err != nil {
|
||||
klog.Error("data syncer failed for mode:", kvMode, "err", err)
|
||||
return currentMode, nil
|
||||
}
|
||||
if !syncOk {
|
||||
klog.Info("data syncer not ok for mode:", kvMode)
|
||||
return currentMode, nil
|
||||
}
|
||||
// If sync is successful, update the mode to the desired one.
|
||||
if err := kvs.Set(ctx, cfg.Kind, fmt.Sprint(cfg.Mode)); err != nil {
|
||||
return Mode0, errDualWriterSetCurrentMode
|
||||
}
|
||||
return cfg.Mode, nil
|
||||
}
|
||||
|
||||
// Compare asserts on the equality of objects returned from both stores (object storage and legacy storage)
|
||||
func Compare(objA, objB runtime.Object) bool {
|
||||
if objA == nil || objB == nil {
|
||||
|
||||
@@ -1,328 +0,0 @@
|
||||
package rest
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"math/rand"
|
||||
"time"
|
||||
|
||||
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
||||
"k8s.io/apimachinery/pkg/api/meta"
|
||||
metainternalversion "k8s.io/apimachinery/pkg/apis/meta/internalversion"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/apiserver/pkg/endpoints/request"
|
||||
"k8s.io/apiserver/pkg/registry/rest"
|
||||
"k8s.io/klog/v2"
|
||||
|
||||
"github.com/grafana/grafana/pkg/apimachinery/identity"
|
||||
"github.com/grafana/grafana/pkg/apimachinery/utils"
|
||||
)
|
||||
|
||||
type syncItem struct {
|
||||
name string
|
||||
objStorage runtime.Object
|
||||
objLegacy runtime.Object
|
||||
accessorStorage utils.GrafanaMetaAccessor
|
||||
accessorLegacy utils.GrafanaMetaAccessor
|
||||
}
|
||||
|
||||
type SyncerConfig struct {
|
||||
Kind string
|
||||
RequestInfo *request.RequestInfo
|
||||
|
||||
Mode DualWriterMode
|
||||
LegacyStorage Storage
|
||||
Storage Storage
|
||||
ServerLockService ServerLockService
|
||||
SkipDataSync bool
|
||||
|
||||
DataSyncerInterval time.Duration
|
||||
DataSyncerRecordsLimit int
|
||||
}
|
||||
|
||||
func (s *SyncerConfig) Validate() error {
|
||||
if s == nil {
|
||||
return fmt.Errorf("syncer config is nil")
|
||||
}
|
||||
if s.Kind == "" {
|
||||
return fmt.Errorf("kind must be specified")
|
||||
}
|
||||
if s.RequestInfo == nil {
|
||||
return fmt.Errorf("requestInfo must be specified")
|
||||
}
|
||||
if s.ServerLockService == nil {
|
||||
return fmt.Errorf("serverLockService must be specified")
|
||||
}
|
||||
if s.Storage == nil {
|
||||
return fmt.Errorf("storage must be specified")
|
||||
}
|
||||
if s.LegacyStorage == nil {
|
||||
return fmt.Errorf("legacy storage must be specified")
|
||||
}
|
||||
if s.DataSyncerInterval == 0 {
|
||||
s.DataSyncerInterval = time.Hour
|
||||
}
|
||||
if s.DataSyncerRecordsLimit == 0 {
|
||||
s.DataSyncerRecordsLimit = 1000
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// StartPeriodicDataSyncer starts a background job that will execute the DataSyncer, syncing the data
|
||||
// from the hosted grafana backend into the unified storage backend. This is run in the grafana instance.
|
||||
func StartPeriodicDataSyncer(ctx context.Context, cfg *SyncerConfig, metrics *DualWriterMetrics) error {
|
||||
if err := cfg.Validate(); err != nil {
|
||||
return fmt.Errorf("invalid syncer config: %w", err)
|
||||
}
|
||||
|
||||
log := klog.NewKlogr().WithName("legacyToUnifiedStorageDataSyncer").WithValues("mode", cfg.Mode, "resource", cfg.Kind)
|
||||
|
||||
log.Info("Starting periodic data syncer")
|
||||
|
||||
// run in background
|
||||
go func() {
|
||||
r := rand.New(rand.NewSource(time.Now().UnixNano()))
|
||||
timeWindow := 600 // 600 seconds (10 minutes)
|
||||
jitterSeconds := r.Int63n(int64(timeWindow))
|
||||
log.Info("data syncer scheduled", "starting time", time.Now().Add(time.Second*time.Duration(jitterSeconds)))
|
||||
time.Sleep(time.Second * time.Duration(jitterSeconds))
|
||||
|
||||
// run it immediately
|
||||
syncOK, err := runDataSyncer(ctx, cfg, metrics)
|
||||
log.Info("data syncer finished", "syncOK", syncOK, "error", err)
|
||||
|
||||
ticker := time.NewTicker(cfg.DataSyncerInterval)
|
||||
for {
|
||||
select {
|
||||
case <-ticker.C:
|
||||
syncOK, err = runDataSyncer(ctx, cfg, metrics)
|
||||
log.Info("data syncer finished", "syncOK", syncOK, ", error", err)
|
||||
case <-ctx.Done():
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
return nil
|
||||
}
|
||||
|
||||
// runDataSyncer will ensure that data between legacy storage and unified storage are in sync.
|
||||
// The sync implementation depends on the DualWriter mode
|
||||
func runDataSyncer(ctx context.Context, cfg *SyncerConfig, metrics *DualWriterMetrics) (bool, error) {
|
||||
if err := cfg.Validate(); err != nil {
|
||||
return false, fmt.Errorf("invalid syncer config: %w", err)
|
||||
}
|
||||
// ensure that execution takes no longer than necessary
|
||||
timeout := cfg.DataSyncerInterval - time.Minute
|
||||
ctx, cancelFn := context.WithTimeout(ctx, timeout)
|
||||
defer cancelFn()
|
||||
|
||||
// implementation depends on the current DualWriter mode
|
||||
switch cfg.Mode {
|
||||
case Mode1, Mode2:
|
||||
return legacyToUnifiedStorageDataSyncer(ctx, cfg, metrics)
|
||||
default:
|
||||
klog.Info("data syncer not implemented for mode:", cfg.Mode)
|
||||
return false, nil
|
||||
}
|
||||
}
|
||||
|
||||
func legacyToUnifiedStorageDataSyncer(ctx context.Context, cfg *SyncerConfig, metrics *DualWriterMetrics) (bool, error) {
|
||||
if err := cfg.Validate(); err != nil {
|
||||
return false, fmt.Errorf("invalid syncer config: %w", err)
|
||||
}
|
||||
|
||||
log := klog.NewKlogr().WithName("legacyToUnifiedStorageDataSyncer").WithValues("mode", cfg.Mode, "resource", cfg.Kind)
|
||||
|
||||
everythingSynced := false
|
||||
outOfSync := 0
|
||||
syncSuccess := 0
|
||||
syncErr := 0
|
||||
|
||||
maxInterval := cfg.DataSyncerInterval + 5*time.Minute
|
||||
|
||||
var errSync error
|
||||
|
||||
// LockExecuteAndRelease ensures that just a single Grafana server acquires a lock at a time
|
||||
// The parameter 'maxInterval' is a timeout safeguard, if the LastExecution in the
|
||||
// database is older than maxInterval, we will assume the lock as timeouted. The 'maxInterval' parameter should be so long
|
||||
// that is impossible for 2 processes to run at the same time.
|
||||
err := cfg.ServerLockService.LockExecuteAndRelease(ctx, fmt.Sprintf("legacyToUnifiedStorageDataSyncer-%d-%s", cfg.Mode, cfg.Kind), maxInterval, func(context.Context) {
|
||||
log.Info("starting legacyToUnifiedStorageDataSyncer")
|
||||
startSync := time.Now()
|
||||
|
||||
ctx = klog.NewContext(ctx, log)
|
||||
ctx, _ = identity.WithServiceIdentity(ctx, 0)
|
||||
ctx = request.WithNamespace(ctx, cfg.RequestInfo.Namespace)
|
||||
ctx = request.WithRequestInfo(ctx, cfg.RequestInfo)
|
||||
|
||||
storageList, err := getList(ctx, cfg.Storage, &metainternalversion.ListOptions{
|
||||
Limit: int64(cfg.DataSyncerRecordsLimit),
|
||||
})
|
||||
if err != nil {
|
||||
log.Error(err, "unable to extract list from storage")
|
||||
return
|
||||
}
|
||||
|
||||
if len(storageList) >= cfg.DataSyncerRecordsLimit {
|
||||
errSync = fmt.Errorf("unified storage has more than %d records. Aborting sync", cfg.DataSyncerRecordsLimit)
|
||||
log.Error(errSync, "Unified storage has more records to be synced than allowed")
|
||||
return
|
||||
}
|
||||
|
||||
log.Info("got items from unified storage", "items", len(storageList))
|
||||
|
||||
legacyList, err := getList(ctx, cfg.LegacyStorage, &metainternalversion.ListOptions{
|
||||
Limit: int64(cfg.DataSyncerRecordsLimit),
|
||||
})
|
||||
if err != nil {
|
||||
log.Error(err, "unable to extract list from legacy storage")
|
||||
return
|
||||
}
|
||||
log.Info("got items from legacy storage", "items", len(legacyList))
|
||||
|
||||
itemsByName := map[string]syncItem{}
|
||||
for _, obj := range legacyList {
|
||||
accessor, err := utils.MetaAccessor(obj)
|
||||
if err != nil {
|
||||
log.Error(err, "error retrieving accessor data for object from legacy storage")
|
||||
continue
|
||||
}
|
||||
name := accessor.GetName()
|
||||
|
||||
item := itemsByName[name]
|
||||
item.name = name
|
||||
item.objLegacy = obj
|
||||
item.accessorLegacy = accessor
|
||||
itemsByName[name] = item
|
||||
}
|
||||
|
||||
for _, obj := range storageList {
|
||||
accessor, err := utils.MetaAccessor(obj)
|
||||
if err != nil {
|
||||
log.Error(err, "error retrieving accessor data for object from storage")
|
||||
continue
|
||||
}
|
||||
name := accessor.GetName()
|
||||
|
||||
item := itemsByName[name]
|
||||
item.name = name
|
||||
item.objStorage = obj
|
||||
item.accessorStorage = accessor
|
||||
itemsByName[name] = item
|
||||
}
|
||||
log.Info("got list of items to be synced", "items", len(itemsByName))
|
||||
|
||||
for name, item := range itemsByName {
|
||||
// upsert if:
|
||||
// - existing in both legacy and storage, but objects are different, or
|
||||
// - if it's missing from storage
|
||||
if item.objLegacy != nil &&
|
||||
(item.objStorage == nil || !Compare(item.objLegacy, item.objStorage)) {
|
||||
outOfSync++
|
||||
|
||||
if item.objStorage != nil {
|
||||
item.accessorLegacy.SetResourceVersion(item.accessorStorage.GetResourceVersion())
|
||||
item.accessorLegacy.SetUID(item.accessorStorage.GetUID())
|
||||
|
||||
log.Info("updating item on unified storage", "name", name)
|
||||
} else {
|
||||
item.accessorLegacy.SetResourceVersion("")
|
||||
item.accessorLegacy.SetUID("")
|
||||
|
||||
log.Info("inserting item on unified storage", "name", name)
|
||||
}
|
||||
|
||||
objInfo := rest.DefaultUpdatedObjectInfo(item.objLegacy, []rest.TransformFunc{}...)
|
||||
res, _, err := cfg.Storage.Update(ctx,
|
||||
name,
|
||||
objInfo,
|
||||
func(ctx context.Context, obj runtime.Object) error { return nil },
|
||||
func(ctx context.Context, obj, old runtime.Object) error { return nil },
|
||||
true, // force creation
|
||||
&metav1.UpdateOptions{},
|
||||
)
|
||||
if err != nil {
|
||||
log.WithValues("object", res).Error(err, "could not update in storage")
|
||||
syncErr++
|
||||
} else {
|
||||
syncSuccess++
|
||||
}
|
||||
}
|
||||
|
||||
// delete if object does not exists on legacy but exists on storage
|
||||
if item.objLegacy == nil && item.objStorage != nil {
|
||||
outOfSync++
|
||||
|
||||
ctx = request.WithRequestInfo(ctx, &request.RequestInfo{
|
||||
APIGroup: cfg.RequestInfo.APIGroup,
|
||||
Resource: cfg.RequestInfo.Resource,
|
||||
Name: name,
|
||||
Namespace: cfg.RequestInfo.Namespace,
|
||||
})
|
||||
|
||||
log.Info("deleting item from unified storage", "name", name)
|
||||
|
||||
deletedS, _, err := cfg.Storage.Delete(ctx, name, func(ctx context.Context, obj runtime.Object) error { return nil }, &metav1.DeleteOptions{})
|
||||
if err != nil && !apierrors.IsNotFound(err) {
|
||||
log.WithValues("objectList", deletedS).Error(err, "could not delete from storage")
|
||||
syncErr++
|
||||
continue
|
||||
}
|
||||
|
||||
syncSuccess++
|
||||
}
|
||||
}
|
||||
|
||||
everythingSynced = outOfSync == syncSuccess
|
||||
|
||||
metrics.recordDataSyncerOutcome(cfg.Mode, cfg.Kind, everythingSynced)
|
||||
metrics.recordDataSyncerDuration(err != nil, cfg.Mode, cfg.Kind, startSync)
|
||||
|
||||
log.Info("finished syncing items", "items", len(itemsByName), "updated", syncSuccess, "failed", syncErr, "outcome", everythingSynced)
|
||||
})
|
||||
|
||||
if errSync != nil {
|
||||
err = errSync
|
||||
}
|
||||
|
||||
return everythingSynced, err
|
||||
}
|
||||
|
||||
func getList(ctx context.Context, obj rest.Lister, listOptions *metainternalversion.ListOptions) ([]runtime.Object, error) {
|
||||
var allItems []runtime.Object
|
||||
|
||||
for {
|
||||
if int64(len(allItems)) >= listOptions.Limit {
|
||||
return nil, fmt.Errorf("list has more than %d records. Aborting sync", listOptions.Limit)
|
||||
}
|
||||
|
||||
ll, err := obj.List(ctx, listOptions)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
items, err := meta.ExtractList(ll)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
allItems = append(allItems, items...)
|
||||
|
||||
// Get continue token from the list metadata.
|
||||
listMeta, err := meta.ListAccessor(ll)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// If no continue token, we're done paginating.
|
||||
if listMeta.GetContinue() == "" {
|
||||
break
|
||||
}
|
||||
|
||||
// Set continue token for next page.
|
||||
listOptions.Continue = listMeta.GetContinue()
|
||||
}
|
||||
|
||||
return allItems, nil
|
||||
}
|
||||
@@ -1,248 +0,0 @@
|
||||
package rest
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/mock"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apiserver/pkg/apis/example"
|
||||
"k8s.io/apiserver/pkg/endpoints/request"
|
||||
)
|
||||
|
||||
var legacyObj1 = &example.Pod{TypeMeta: metav1.TypeMeta{Kind: "foo"}, ObjectMeta: metav1.ObjectMeta{Name: "foo1", ResourceVersion: "1", CreationTimestamp: metav1.Time{}}, Spec: example.PodSpec{}, Status: example.PodStatus{StartTime: &metav1.Time{Time: time.Now()}}}
|
||||
var legacyObj2 = &example.Pod{TypeMeta: metav1.TypeMeta{Kind: "foo"}, ObjectMeta: metav1.ObjectMeta{Name: "foo2", ResourceVersion: "1", CreationTimestamp: metav1.Time{}}, Spec: example.PodSpec{}, Status: example.PodStatus{StartTime: &metav1.Time{Time: time.Now()}}}
|
||||
var legacyObj3 = &example.Pod{TypeMeta: metav1.TypeMeta{Kind: "foo"}, ObjectMeta: metav1.ObjectMeta{Name: "foo3", ResourceVersion: "1", CreationTimestamp: metav1.Time{}}, Spec: example.PodSpec{}, Status: example.PodStatus{StartTime: &metav1.Time{Time: time.Now()}}}
|
||||
var legacyObj4 = &example.Pod{TypeMeta: metav1.TypeMeta{Kind: "foo"}, ObjectMeta: metav1.ObjectMeta{Name: "foo4", ResourceVersion: "1", CreationTimestamp: metav1.Time{}}, Spec: example.PodSpec{}, Status: example.PodStatus{StartTime: &metav1.Time{Time: time.Now()}}}
|
||||
|
||||
var legacyObj2WithHostname = &example.Pod{TypeMeta: metav1.TypeMeta{Kind: "foo"}, ObjectMeta: metav1.ObjectMeta{Name: "foo2", ResourceVersion: "1", CreationTimestamp: metav1.Time{}}, Spec: example.PodSpec{Hostname: "hostname"}, Status: example.PodStatus{StartTime: &metav1.Time{Time: time.Now()}}}
|
||||
|
||||
var storageObj1 = &example.Pod{TypeMeta: metav1.TypeMeta{Kind: "foo"}, ObjectMeta: metav1.ObjectMeta{Name: "foo1", ResourceVersion: "1", CreationTimestamp: metav1.Time{}}, Spec: example.PodSpec{}, Status: example.PodStatus{StartTime: &metav1.Time{Time: time.Now()}}}
|
||||
var storageObj2 = &example.Pod{TypeMeta: metav1.TypeMeta{Kind: "foo"}, ObjectMeta: metav1.ObjectMeta{Name: "foo2", ResourceVersion: "1", CreationTimestamp: metav1.Time{}}, Spec: example.PodSpec{}, Status: example.PodStatus{StartTime: &metav1.Time{Time: time.Now()}}}
|
||||
var storageObj3 = &example.Pod{TypeMeta: metav1.TypeMeta{Kind: "foo"}, ObjectMeta: metav1.ObjectMeta{Name: "foo3", ResourceVersion: "1", CreationTimestamp: metav1.Time{}}, Spec: example.PodSpec{}, Status: example.PodStatus{StartTime: &metav1.Time{Time: time.Now()}}}
|
||||
var storageObj4 = &example.Pod{TypeMeta: metav1.TypeMeta{Kind: "foo"}, ObjectMeta: metav1.ObjectMeta{Name: "foo4", ResourceVersion: "1", CreationTimestamp: metav1.Time{}}, Spec: example.PodSpec{}, Status: example.PodStatus{StartTime: &metav1.Time{Time: time.Now()}}}
|
||||
|
||||
var legacyListWith3items = &example.PodList{TypeMeta: metav1.TypeMeta{Kind: "foo"}, ListMeta: metav1.ListMeta{},
|
||||
Items: []example.Pod{
|
||||
*legacyObj1,
|
||||
*legacyObj2,
|
||||
*legacyObj3,
|
||||
}}
|
||||
|
||||
var legacyListWith4items = &example.PodList{TypeMeta: metav1.TypeMeta{Kind: "foo"}, ListMeta: metav1.ListMeta{},
|
||||
Items: []example.Pod{
|
||||
*legacyObj1,
|
||||
*legacyObj2,
|
||||
*legacyObj3,
|
||||
*legacyObj4,
|
||||
}}
|
||||
|
||||
var legacyListWith3itemsObj2IsDifferent = &example.PodList{TypeMeta: metav1.TypeMeta{Kind: "foo"}, ListMeta: metav1.ListMeta{},
|
||||
Items: []example.Pod{
|
||||
*legacyObj1,
|
||||
*legacyObj2WithHostname,
|
||||
*legacyObj3,
|
||||
}}
|
||||
|
||||
var storageListWith3items = &example.PodList{TypeMeta: metav1.TypeMeta{Kind: "foo"}, ListMeta: metav1.ListMeta{},
|
||||
Items: []example.Pod{
|
||||
*storageObj1,
|
||||
*storageObj2,
|
||||
*storageObj3,
|
||||
}}
|
||||
|
||||
var storageListWith4items = &example.PodList{TypeMeta: metav1.TypeMeta{Kind: "foo"}, ListMeta: metav1.ListMeta{},
|
||||
Items: []example.Pod{
|
||||
*storageObj1,
|
||||
*storageObj2,
|
||||
*storageObj3,
|
||||
*storageObj4,
|
||||
}}
|
||||
|
||||
var storageListWith3itemsMissingFoo2 = &example.PodList{TypeMeta: metav1.TypeMeta{Kind: "foo"}, ListMeta: metav1.ListMeta{},
|
||||
Items: []example.Pod{
|
||||
*storageObj1,
|
||||
*storageObj3,
|
||||
*storageObj4,
|
||||
}}
|
||||
|
||||
func TestLegacyToUnifiedStorage_DataSyncer(t *testing.T) {
|
||||
type testCase struct {
|
||||
setupLegacyFn func(m *MockStorage)
|
||||
setupStorageFn func(m *MockStorage)
|
||||
name string
|
||||
expectedOutcome bool
|
||||
wantErr bool
|
||||
}
|
||||
tests :=
|
||||
[]testCase{
|
||||
{
|
||||
name: "both stores are in sync",
|
||||
setupLegacyFn: func(m *MockStorage) {
|
||||
m.On("List", mock.Anything, mock.Anything).Return(legacyListWith3items, nil)
|
||||
},
|
||||
setupStorageFn: func(m *MockStorage) {
|
||||
m.On("List", mock.Anything, mock.Anything).Return(storageListWith3items, nil)
|
||||
},
|
||||
expectedOutcome: true,
|
||||
},
|
||||
{
|
||||
name: "both stores are in sync - fail to list from legacy",
|
||||
setupLegacyFn: func(m *MockStorage) {
|
||||
m.On("List", mock.Anything, mock.Anything).Return(legacyListWith3items, errors.New("error"))
|
||||
},
|
||||
setupStorageFn: func(m *MockStorage) {
|
||||
m.On("List", mock.Anything, mock.Anything).Return(storageListWith3items, nil)
|
||||
},
|
||||
expectedOutcome: false,
|
||||
},
|
||||
{
|
||||
name: "both stores are in sync - fail to list from storage",
|
||||
setupLegacyFn: func(m *MockStorage) {
|
||||
m.On("List", mock.Anything, mock.Anything).Return(legacyListWith3items, nil).Maybe()
|
||||
},
|
||||
setupStorageFn: func(m *MockStorage) {
|
||||
m.On("List", mock.Anything, mock.Anything).Return(storageListWith3items, errors.New("error"))
|
||||
},
|
||||
expectedOutcome: false,
|
||||
},
|
||||
{
|
||||
name: "storage is missing 1 entry (foo4)",
|
||||
setupLegacyFn: func(m *MockStorage) {
|
||||
m.On("List", mock.Anything, mock.Anything).Return(legacyListWith4items, nil)
|
||||
},
|
||||
setupStorageFn: func(m *MockStorage) {
|
||||
m.On("List", mock.Anything, mock.Anything).Return(storageListWith3items, nil)
|
||||
m.On("Update", mock.Anything, "foo4", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(exampleObj, false, nil)
|
||||
},
|
||||
expectedOutcome: true,
|
||||
},
|
||||
{
|
||||
name: "storage needs to be update (foo2 is different)",
|
||||
setupLegacyFn: func(m *MockStorage) {
|
||||
m.On("List", mock.Anything, mock.Anything).Return(legacyListWith3itemsObj2IsDifferent, nil)
|
||||
},
|
||||
setupStorageFn: func(m *MockStorage) {
|
||||
m.On("List", mock.Anything, mock.Anything).Return(storageListWith3items, nil)
|
||||
m.On("Update", mock.Anything, "foo2", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(exampleObj, false, nil)
|
||||
},
|
||||
expectedOutcome: true,
|
||||
},
|
||||
{
|
||||
name: "storage is missing 1 entry (foo4) - fail to upsert",
|
||||
setupLegacyFn: func(m *MockStorage) {
|
||||
m.On("List", mock.Anything, mock.Anything).Return(legacyListWith4items, nil)
|
||||
},
|
||||
setupStorageFn: func(m *MockStorage) {
|
||||
m.On("List", mock.Anything, mock.Anything).Return(storageListWith3items, nil)
|
||||
m.On("Update", mock.Anything, "foo4", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(exampleObj, false, errors.New("error"))
|
||||
},
|
||||
expectedOutcome: false,
|
||||
},
|
||||
{
|
||||
name: "storage has an extra 1 entry (foo4)",
|
||||
setupLegacyFn: func(m *MockStorage) {
|
||||
m.On("List", mock.Anything, mock.Anything).Return(legacyListWith3items, nil)
|
||||
},
|
||||
setupStorageFn: func(m *MockStorage) {
|
||||
m.On("List", mock.Anything, mock.Anything).Return(storageListWith4items, nil)
|
||||
m.On("Delete", mock.Anything, "foo4", mock.Anything, mock.Anything).Return(exampleObj, false, nil)
|
||||
},
|
||||
expectedOutcome: true,
|
||||
},
|
||||
{
|
||||
name: "storage has an extra 1 entry (foo4) - fail to delete",
|
||||
setupLegacyFn: func(m *MockStorage) {
|
||||
m.On("List", mock.Anything, mock.Anything).Return(legacyListWith3items, nil)
|
||||
},
|
||||
setupStorageFn: func(m *MockStorage) {
|
||||
m.On("List", mock.Anything, mock.Anything).Return(storageListWith4items, nil)
|
||||
m.On("Delete", mock.Anything, "foo4", mock.Anything, mock.Anything).Return(exampleObj, false, errors.New("error"))
|
||||
},
|
||||
expectedOutcome: false,
|
||||
},
|
||||
{
|
||||
name: "storage is missing 1 entry (foo3) and has an extra 1 entry (foo4)",
|
||||
setupLegacyFn: func(m *MockStorage) {
|
||||
m.On("List", mock.Anything, mock.Anything).Return(legacyListWith3items, nil)
|
||||
},
|
||||
setupStorageFn: func(m *MockStorage) {
|
||||
m.On("List", mock.Anything, mock.Anything).Return(storageListWith3itemsMissingFoo2, nil)
|
||||
m.On("Update", mock.Anything, "foo2", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(exampleObj, false, nil)
|
||||
m.On("Delete", mock.Anything, "foo4", mock.Anything, mock.Anything).Return(exampleObj, false, nil)
|
||||
},
|
||||
expectedOutcome: true,
|
||||
},
|
||||
}
|
||||
|
||||
// mode 1
|
||||
for _, tt := range tests {
|
||||
t.Run("Mode-1-"+tt.name, func(t *testing.T) {
|
||||
ls := NewMockStorage(t)
|
||||
us := NewMockStorage(t)
|
||||
|
||||
if tt.setupLegacyFn != nil {
|
||||
tt.setupLegacyFn(ls)
|
||||
}
|
||||
if tt.setupStorageFn != nil {
|
||||
tt.setupStorageFn(us)
|
||||
}
|
||||
|
||||
outcome, err := legacyToUnifiedStorageDataSyncer(context.Background(), &SyncerConfig{
|
||||
Mode: Mode1,
|
||||
LegacyStorage: ls,
|
||||
Storage: us,
|
||||
Kind: "test.kind",
|
||||
ServerLockService: &fakeServerLock{},
|
||||
RequestInfo: &request.RequestInfo{},
|
||||
|
||||
DataSyncerRecordsLimit: 1000,
|
||||
DataSyncerInterval: time.Hour,
|
||||
}, NewDualWriterMetrics(nil))
|
||||
if tt.wantErr {
|
||||
assert.Error(t, err)
|
||||
return
|
||||
}
|
||||
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, tt.expectedOutcome, outcome)
|
||||
})
|
||||
}
|
||||
|
||||
// mode 2
|
||||
for _, tt := range tests {
|
||||
t.Run("Mode-2-"+tt.name, func(t *testing.T) {
|
||||
ls := NewMockStorage(t)
|
||||
us := NewMockStorage(t)
|
||||
|
||||
if tt.setupLegacyFn != nil {
|
||||
tt.setupLegacyFn(ls)
|
||||
}
|
||||
if tt.setupStorageFn != nil {
|
||||
tt.setupStorageFn(us)
|
||||
}
|
||||
|
||||
outcome, err := legacyToUnifiedStorageDataSyncer(context.Background(), &SyncerConfig{
|
||||
Mode: Mode2,
|
||||
LegacyStorage: ls,
|
||||
Storage: us,
|
||||
Kind: "test.kind",
|
||||
ServerLockService: &fakeServerLock{},
|
||||
RequestInfo: &request.RequestInfo{},
|
||||
|
||||
DataSyncerRecordsLimit: 1000,
|
||||
DataSyncerInterval: time.Hour,
|
||||
}, NewDualWriterMetrics(nil))
|
||||
if tt.wantErr {
|
||||
assert.Error(t, err)
|
||||
return
|
||||
}
|
||||
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, tt.expectedOutcome, outcome)
|
||||
})
|
||||
}
|
||||
}
|
||||
@@ -1,124 +1,18 @@
|
||||
package rest
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/mock"
|
||||
"github.com/stretchr/testify/require"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/apiserver/pkg/apis/example"
|
||||
"k8s.io/apiserver/pkg/endpoints/request"
|
||||
)
|
||||
|
||||
var now = time.Now()
|
||||
|
||||
var exampleObj = &example.Pod{TypeMeta: metav1.TypeMeta{Kind: "foo"}, ObjectMeta: metav1.ObjectMeta{Name: "foo", ResourceVersion: "1", CreationTimestamp: metav1.Time{}, GenerateName: "foo"}, Spec: example.PodSpec{}, Status: example.PodStatus{StartTime: &metav1.Time{Time: now}}}
|
||||
var anotherObj = &example.Pod{TypeMeta: metav1.TypeMeta{Kind: "foo"}, ObjectMeta: metav1.ObjectMeta{Name: "bar", ResourceVersion: "2", GenerateName: "foo"}, Spec: example.PodSpec{}, Status: example.PodStatus{StartTime: &metav1.Time{Time: now}}}
|
||||
var exampleList = &example.PodList{TypeMeta: metav1.TypeMeta{Kind: "foo"}, ListMeta: metav1.ListMeta{}, Items: []example.Pod{*exampleObj}}
|
||||
var anotherList = &example.PodList{Items: []example.Pod{*anotherObj}}
|
||||
|
||||
func TestSetDualWritingMode(t *testing.T) {
|
||||
type testCase struct {
|
||||
name string
|
||||
kvStore *fakeNamespacedKV
|
||||
desiredMode DualWriterMode
|
||||
expectedMode DualWriterMode
|
||||
expectedKVMode string
|
||||
skipDataSync bool
|
||||
serverLockError error
|
||||
}
|
||||
tests :=
|
||||
[]testCase{
|
||||
{
|
||||
name: "should return a mode 2 dual writer when mode 2 is set as the desired mode",
|
||||
kvStore: &fakeNamespacedKV{data: map[string]string{"playlist.grafana.app/playlists": "2"}, namespace: "storage.dualwriting"},
|
||||
desiredMode: Mode2,
|
||||
expectedMode: Mode2,
|
||||
expectedKVMode: "2",
|
||||
},
|
||||
{
|
||||
name: "should return a mode 1 dual writer when mode 1 is set as the desired mode",
|
||||
kvStore: &fakeNamespacedKV{data: map[string]string{"playlist.grafana.app/playlists": "2"}, namespace: "storage.dualwriting"},
|
||||
desiredMode: Mode1,
|
||||
expectedMode: Mode1,
|
||||
expectedKVMode: "1",
|
||||
},
|
||||
{
|
||||
name: "should return mode 3 as desired mode when current mode is > 3",
|
||||
kvStore: &fakeNamespacedKV{data: map[string]string{"playlist.grafana.app/playlists": "5"}, namespace: "storage.dualwriting"},
|
||||
desiredMode: Mode3,
|
||||
expectedMode: Mode3,
|
||||
expectedKVMode: "3",
|
||||
},
|
||||
{
|
||||
name: "should return mode 3 as desired mode when current mode is 2",
|
||||
kvStore: &fakeNamespacedKV{data: map[string]string{"playlist.grafana.app/playlists": "2"}, namespace: "storage.dualwriting"},
|
||||
desiredMode: Mode3,
|
||||
expectedMode: Mode3,
|
||||
expectedKVMode: "3",
|
||||
},
|
||||
{
|
||||
name: "should default to mode 0 if there is no desired mode",
|
||||
kvStore: &fakeNamespacedKV{data: map[string]string{}, namespace: "storage.dualwriting"},
|
||||
desiredMode: Mode0,
|
||||
expectedMode: Mode0,
|
||||
expectedKVMode: "",
|
||||
},
|
||||
{
|
||||
name: "should keep mode2 when trying to go from mode2 to mode3 and the server lock service returns an error",
|
||||
kvStore: &fakeNamespacedKV{data: map[string]string{"playlist.grafana.app/playlists": "2"}, namespace: "storage.dualwriting"},
|
||||
desiredMode: Mode3,
|
||||
expectedMode: Mode2,
|
||||
expectedKVMode: "2",
|
||||
serverLockError: fmt.Errorf("lock already exists"),
|
||||
},
|
||||
{
|
||||
name: "should keep mode2 when trying to go from mode2 to mode3 and migration is disabled",
|
||||
kvStore: &fakeNamespacedKV{data: map[string]string{"playlist.grafana.app/playlists": "2"}, namespace: "storage.dualwriting"},
|
||||
desiredMode: Mode3,
|
||||
expectedMode: Mode3,
|
||||
expectedKVMode: "3",
|
||||
skipDataSync: true,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
us := NewMockStorage(t)
|
||||
us.On("List", mock.Anything, mock.Anything).Return(anotherList, nil).Maybe()
|
||||
us.On("Update", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(exampleObj, false, nil).Maybe()
|
||||
us.On("Delete", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(exampleObj, false, nil).Maybe()
|
||||
|
||||
ls := NewMockStorage(t)
|
||||
ls.On("List", mock.Anything, mock.Anything).Return(exampleList, nil).Maybe()
|
||||
|
||||
serverLockSvc := &fakeServerLock{
|
||||
err: tt.serverLockError,
|
||||
}
|
||||
|
||||
dwMode, err := SetDualWritingMode(context.Background(), tt.kvStore, &SyncerConfig{
|
||||
LegacyStorage: ls,
|
||||
Storage: us,
|
||||
Kind: "playlist.grafana.app/playlists",
|
||||
Mode: tt.desiredMode,
|
||||
SkipDataSync: tt.skipDataSync,
|
||||
ServerLockService: serverLockSvc,
|
||||
RequestInfo: &request.RequestInfo{},
|
||||
|
||||
DataSyncerRecordsLimit: 1000,
|
||||
DataSyncerInterval: time.Hour,
|
||||
}, NewDualWriterMetrics(nil))
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, tt.expectedMode, dwMode)
|
||||
|
||||
kvMode, _, err := tt.kvStore.Get(context.Background(), "playlist.grafana.app/playlists")
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, tt.expectedKVMode, kvMode, "expected mode for playlist.grafana.app/playlists")
|
||||
}
|
||||
}
|
||||
|
||||
func TestCompare(t *testing.T) {
|
||||
var exampleObjGen1 = &example.Pod{ObjectMeta: metav1.ObjectMeta{Generation: 1}, Spec: example.PodSpec{Hostname: "one"}, Status: example.PodStatus{StartTime: &metav1.Time{Time: time.Unix(0, 0)}}}
|
||||
@@ -163,30 +57,3 @@ func TestCompare(t *testing.T) {
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
type fakeNamespacedKV struct {
|
||||
namespace string
|
||||
data map[string]string
|
||||
}
|
||||
|
||||
func (f *fakeNamespacedKV) Get(ctx context.Context, key string) (string, bool, error) {
|
||||
val, ok := f.data[key]
|
||||
return val, ok, nil
|
||||
}
|
||||
|
||||
func (f *fakeNamespacedKV) Set(ctx context.Context, key, value string) error {
|
||||
f.data[key] = value
|
||||
return nil
|
||||
}
|
||||
|
||||
type fakeServerLock struct {
|
||||
err error
|
||||
}
|
||||
|
||||
func (f *fakeServerLock) LockExecuteAndRelease(ctx context.Context, actionName string, duration time.Duration, fn func(ctx context.Context)) error {
|
||||
if f.err != nil {
|
||||
return f.err
|
||||
}
|
||||
fn(ctx)
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -1,48 +0,0 @@
|
||||
package rest
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
"github.com/prometheus/client_golang/prometheus/promauto"
|
||||
)
|
||||
|
||||
type DualWriterMetrics struct {
|
||||
// DualWriterSyncerDuration is a metric summary for dual writer sync duration per mode
|
||||
syncer *prometheus.HistogramVec
|
||||
// DualWriterDataSyncerOutcome is a metric summary for dual writer data syncer outcome comparison between the 2 stores per mode
|
||||
syncerOutcome *prometheus.HistogramVec
|
||||
}
|
||||
|
||||
func NewDualWriterMetrics(reg prometheus.Registerer) *DualWriterMetrics {
|
||||
return &DualWriterMetrics{
|
||||
syncer: promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{
|
||||
Name: "dual_writer_data_syncer_duration_seconds",
|
||||
Help: "Histogram for the runtime of dual writer data syncer duration per mode",
|
||||
Namespace: "grafana",
|
||||
NativeHistogramBucketFactor: 1.1,
|
||||
}, []string{"is_error", "mode", "resource"}),
|
||||
|
||||
syncerOutcome: promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{
|
||||
Name: "dual_writer_data_syncer_outcome",
|
||||
Help: "Histogram for the runtime of dual writer data syncer outcome comparison between the 2 stores per mode",
|
||||
Namespace: "grafana",
|
||||
NativeHistogramBucketFactor: 1.1,
|
||||
}, []string{"mode", "resource"}),
|
||||
}
|
||||
}
|
||||
|
||||
func (m *DualWriterMetrics) recordDataSyncerDuration(isError bool, mode DualWriterMode, resource string, startFrom time.Time) {
|
||||
duration := time.Since(startFrom).Seconds()
|
||||
m.syncer.WithLabelValues(strconv.FormatBool(isError), fmt.Sprintf("%d", mode), resource).Observe(duration)
|
||||
}
|
||||
|
||||
func (m *DualWriterMetrics) recordDataSyncerOutcome(mode DualWriterMode, resource string, synced bool) {
|
||||
var observeValue float64
|
||||
if !synced {
|
||||
observeValue = 1
|
||||
}
|
||||
m.syncerOutcome.WithLabelValues(fmt.Sprintf("%d", mode), resource).Observe(observeValue)
|
||||
}
|
||||
4
pkg/server/wire_gen.go
generated
4
pkg/server/wire_gen.go
generated
@@ -835,7 +835,7 @@ func Initialize(ctx context.Context, cfg *setting.Cfg, opts Options, apiOpts api
|
||||
builderMetrics := builder.ProvideBuilderMetrics(registerer)
|
||||
backend := auditing.ProvideNoopBackend()
|
||||
policyRuleProvider := auditing.ProvideNoopPolicyRuleProvider()
|
||||
apiserverService, err := apiserver.ProvideService(cfg, featureToggles, routeRegisterImpl, tracingService, serverLockService, sqlStore, kvStore, middlewareHandler, scopedPluginDatasourceProvider, plugincontextProvider, pluginstoreService, dualwriteService, resourceClient, inlineSecureValueSupport, eventualRestConfigProvider, v, eventualRestConfigProvider, registerer, aggregatorRunner, v2, builderMetrics, backend, policyRuleProvider)
|
||||
apiserverService, err := apiserver.ProvideService(cfg, featureToggles, routeRegisterImpl, tracingService, sqlStore, middlewareHandler, scopedPluginDatasourceProvider, plugincontextProvider, pluginstoreService, dualwriteService, resourceClient, inlineSecureValueSupport, eventualRestConfigProvider, v, eventualRestConfigProvider, registerer, aggregatorRunner, v2, builderMetrics, backend, policyRuleProvider)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -1503,7 +1503,7 @@ func InitializeForTest(ctx context.Context, t sqlutil.ITestDB, testingT interfac
|
||||
builderMetrics := builder.ProvideBuilderMetrics(registerer)
|
||||
backend := auditing.ProvideNoopBackend()
|
||||
policyRuleProvider := auditing.ProvideNoopPolicyRuleProvider()
|
||||
apiserverService, err := apiserver.ProvideService(cfg, featureToggles, routeRegisterImpl, tracingService, serverLockService, sqlStore, kvStore, middlewareHandler, scopedPluginDatasourceProvider, plugincontextProvider, pluginstoreService, dualwriteService, resourceClient, inlineSecureValueSupport, eventualRestConfigProvider, v, eventualRestConfigProvider, registerer, aggregatorRunner, v2, builderMetrics, backend, policyRuleProvider)
|
||||
apiserverService, err := apiserver.ProvideService(cfg, featureToggles, routeRegisterImpl, tracingService, sqlStore, middlewareHandler, scopedPluginDatasourceProvider, plugincontextProvider, pluginstoreService, dualwriteService, resourceClient, inlineSecureValueSupport, eventualRestConfigProvider, v, eventualRestConfigProvider, registerer, aggregatorRunner, v2, builderMetrics, backend, policyRuleProvider)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@@ -67,7 +67,6 @@ kubernetesPlaylists = true
|
||||
|
||||
[unified_storage.playlists.playlist.grafana.app]
|
||||
dualWriterMode = 2
|
||||
dualWriterPeriodicDataSyncJobEnabled = true
|
||||
```
|
||||
|
||||
This will create a development kubeconfig and start a parallel ssl listener. It can be registered by
|
||||
|
||||
@@ -5,7 +5,6 @@ import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"maps"
|
||||
"time"
|
||||
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||
@@ -21,13 +20,10 @@ import (
|
||||
"github.com/grafana/grafana-app-sdk/logging"
|
||||
grafanarest "github.com/grafana/grafana/pkg/apiserver/rest"
|
||||
"github.com/grafana/grafana/pkg/services/apiserver/builder"
|
||||
"github.com/grafana/grafana/pkg/services/apiserver/endpoints/request"
|
||||
grafanaapiserveroptions "github.com/grafana/grafana/pkg/services/apiserver/options"
|
||||
"github.com/grafana/grafana/pkg/storage/legacysql/dualwrite"
|
||||
)
|
||||
|
||||
type LegacyStorageGetterFunc func(schema.GroupVersionResource) grafanarest.Storage
|
||||
|
||||
type LegacyStorageProvider interface {
|
||||
GetLegacyStorage(schema.GroupVersionResource) grafanarest.Storage
|
||||
}
|
||||
@@ -47,11 +43,6 @@ type AppInstallerConfig struct {
|
||||
AllowedV0Alpha1Resources []string
|
||||
}
|
||||
|
||||
// serverLock interface defines a lock mechanism for executing actions with a timeout
|
||||
type serverLock interface {
|
||||
LockExecuteAndRelease(ctx context.Context, actionName string, maxInterval time.Duration, fn func(ctx context.Context)) error
|
||||
}
|
||||
|
||||
// AddToScheme adds app installer schemas to the runtime scheme
|
||||
func AddToScheme(
|
||||
appInstallers []appsdkapiserver.AppInstaller,
|
||||
@@ -139,11 +130,7 @@ func InstallAPIs(
|
||||
server *genericapiserver.GenericAPIServer,
|
||||
restOpsGetter generic.RESTOptionsGetter,
|
||||
storageOpts *grafanaapiserveroptions.StorageOptions,
|
||||
kvStore grafanarest.NamespacedKVStore,
|
||||
lock serverLock,
|
||||
namespaceMapper request.NamespaceMapper,
|
||||
dualWriteService dualwrite.Service,
|
||||
dualWriterMetrics *grafanarest.DualWriterMetrics,
|
||||
builderMetrics *builder.BuilderMetrics,
|
||||
apiResourceConfig *serverstore.ResourceConfig,
|
||||
) error {
|
||||
@@ -156,11 +143,7 @@ func InstallAPIs(
|
||||
installer: installer,
|
||||
storageOpts: storageOpts,
|
||||
restOptionsGetter: restOpsGetter,
|
||||
kvStore: kvStore,
|
||||
lock: lock,
|
||||
namespaceMapper: namespaceMapper,
|
||||
dualWriteService: dualWriteService,
|
||||
dualWriterMetrics: dualWriterMetrics,
|
||||
builderMetrics: builderMetrics,
|
||||
apiResourceConfig: apiResourceConfig,
|
||||
}
|
||||
|
||||
@@ -18,7 +18,6 @@ import (
|
||||
grafanaregistry "github.com/grafana/grafana/pkg/apiserver/registry/generic"
|
||||
grafanarest "github.com/grafana/grafana/pkg/apiserver/rest"
|
||||
"github.com/grafana/grafana/pkg/services/apiserver/builder"
|
||||
"github.com/grafana/grafana/pkg/services/apiserver/endpoints/request"
|
||||
grafanaapiserveroptions "github.com/grafana/grafana/pkg/services/apiserver/options"
|
||||
"github.com/grafana/grafana/pkg/storage/legacysql/dualwrite"
|
||||
)
|
||||
@@ -31,11 +30,7 @@ type serverWrapper struct {
|
||||
installer appsdkapiserver.AppInstaller
|
||||
restOptionsGetter generic.RESTOptionsGetter
|
||||
storageOpts *grafanaapiserveroptions.StorageOptions
|
||||
kvStore grafanarest.NamespacedKVStore
|
||||
lock serverLock
|
||||
namespaceMapper request.NamespaceMapper
|
||||
dualWriteService dualwrite.Service
|
||||
dualWriterMetrics *grafanarest.DualWriterMetrics
|
||||
builderMetrics *builder.BuilderMetrics
|
||||
apiResourceConfig *serverstorage.ResourceConfig
|
||||
}
|
||||
@@ -64,16 +59,11 @@ func (s *serverWrapper) InstallAPIGroup(apiGroupInfo *genericapiserver.APIGroupI
|
||||
if unifiedStorage, ok := storage.(grafanarest.Storage); ok {
|
||||
log.Debug("Configuring dual writer for storage", "resource", gr.String(), "version", v, "storagePath", storagePath)
|
||||
storage, err = NewDualWriter(
|
||||
s.ctx,
|
||||
gr,
|
||||
s.storageOpts,
|
||||
legacyProvider.GetLegacyStorage(gr.WithVersion(v)),
|
||||
unifiedStorage,
|
||||
s.kvStore,
|
||||
s.lock,
|
||||
s.namespaceMapper,
|
||||
s.dualWriteService,
|
||||
s.dualWriterMetrics,
|
||||
s.builderMetrics,
|
||||
)
|
||||
if err != nil {
|
||||
|
||||
@@ -1,32 +1,21 @@
|
||||
package appinstaller
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||
k8srequest "k8s.io/apiserver/pkg/endpoints/request"
|
||||
"k8s.io/klog/v2"
|
||||
|
||||
grafanarest "github.com/grafana/grafana/pkg/apiserver/rest"
|
||||
"github.com/grafana/grafana/pkg/services/apiserver/builder"
|
||||
"github.com/grafana/grafana/pkg/services/apiserver/endpoints/request"
|
||||
"github.com/grafana/grafana/pkg/services/apiserver/options"
|
||||
"github.com/grafana/grafana/pkg/storage/legacysql/dualwrite"
|
||||
)
|
||||
|
||||
// NewDualWriter creates a dual writer for the given group resource using the provided configuration
|
||||
func NewDualWriter(
|
||||
_ context.Context,
|
||||
gr schema.GroupResource,
|
||||
storageOpts *options.StorageOptions,
|
||||
legacy grafanarest.Storage,
|
||||
storage grafanarest.Storage,
|
||||
kvStore grafanarest.NamespacedKVStore,
|
||||
lock serverLock,
|
||||
namespaceMapper request.NamespaceMapper,
|
||||
dualWriteService dualwrite.Service,
|
||||
dualWriterMetrics *grafanarest.DualWriterMetrics,
|
||||
builderMetrics *builder.BuilderMetrics,
|
||||
) (grafanarest.Storage, error) {
|
||||
// Dashboards + Folders may be managed (depends on feature toggles and database state)
|
||||
@@ -40,54 +29,14 @@ func NewDualWriter(
|
||||
// when missing this will default to mode zero (legacy only)
|
||||
var mode = grafanarest.DualWriterMode(0)
|
||||
|
||||
var (
|
||||
dualWriterPeriodicDataSyncJobEnabled bool
|
||||
dualWriterMigrationDataSyncDisabled bool
|
||||
dataSyncerInterval = time.Hour
|
||||
dataSyncerRecordsLimit = 1000
|
||||
)
|
||||
|
||||
resourceConfig, resourceExists := storageOpts.UnifiedStorageConfig[key]
|
||||
if resourceExists {
|
||||
mode = resourceConfig.DualWriterMode
|
||||
dualWriterPeriodicDataSyncJobEnabled = resourceConfig.DualWriterPeriodicDataSyncJobEnabled
|
||||
dualWriterMigrationDataSyncDisabled = resourceConfig.DualWriterMigrationDataSyncDisabled
|
||||
dataSyncerInterval = resourceConfig.DataSyncerInterval
|
||||
dataSyncerRecordsLimit = resourceConfig.DataSyncerRecordsLimit
|
||||
}
|
||||
|
||||
// Force using storage only -- regardless of internal synchronization state
|
||||
if mode == grafanarest.Mode5 {
|
||||
builderMetrics.RecordDualWriterModes(gr.Resource, gr.Group, mode, grafanarest.Mode5)
|
||||
return storage, nil
|
||||
}
|
||||
builderMetrics.RecordDualWriterModes(gr.Resource, gr.Group, mode)
|
||||
|
||||
// Moving from one version to the next can only happen after the previous step has
|
||||
// successfully synchronized.
|
||||
requestInfo := getRequestInfo(gr, namespaceMapper)
|
||||
|
||||
syncerCfg := &grafanarest.SyncerConfig{
|
||||
Kind: key,
|
||||
RequestInfo: requestInfo,
|
||||
Mode: mode,
|
||||
SkipDataSync: dualWriterMigrationDataSyncDisabled,
|
||||
LegacyStorage: legacy,
|
||||
Storage: storage,
|
||||
ServerLockService: lock,
|
||||
DataSyncerInterval: dataSyncerInterval,
|
||||
DataSyncerRecordsLimit: dataSyncerRecordsLimit,
|
||||
}
|
||||
|
||||
ctx := context.Background()
|
||||
// This also sets the currentMode on the syncer config.
|
||||
currentMode, err := grafanarest.SetDualWritingMode(ctx, kvStore, syncerCfg, dualWriterMetrics)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
builderMetrics.RecordDualWriterModes(gr.Resource, gr.Group, mode, currentMode)
|
||||
|
||||
switch currentMode {
|
||||
switch mode {
|
||||
case grafanarest.Mode0:
|
||||
return legacy, nil
|
||||
case grafanarest.Mode4, grafanarest.Mode5:
|
||||
@@ -95,26 +44,5 @@ func NewDualWriter(
|
||||
default:
|
||||
}
|
||||
|
||||
if dualWriterPeriodicDataSyncJobEnabled {
|
||||
// The mode might have changed in SetDualWritingMode, so apply current mode first.
|
||||
syncerCfg.Mode = currentMode
|
||||
if err := grafanarest.StartPeriodicDataSyncer(ctx, syncerCfg, dualWriterMetrics); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
// when unable to use
|
||||
if currentMode != mode {
|
||||
klog.Warningf("Requested DualWrite mode: %d, but using %d for %+v", mode, currentMode, gr)
|
||||
}
|
||||
return dualwrite.NewStaticStorage(gr, currentMode, legacy, storage)
|
||||
}
|
||||
|
||||
func getRequestInfo(gr schema.GroupResource, namespaceMapper request.NamespaceMapper) *k8srequest.RequestInfo {
|
||||
return &k8srequest.RequestInfo{
|
||||
APIGroup: gr.Group,
|
||||
Resource: gr.Resource,
|
||||
Name: "",
|
||||
Namespace: namespaceMapper(int64(1)),
|
||||
}
|
||||
return dualwrite.NewStaticStorage(gr, mode, legacy, storage)
|
||||
}
|
||||
|
||||
@@ -1,7 +1,6 @@
|
||||
package builder
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/csv"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
@@ -10,7 +9,6 @@ import (
|
||||
"os"
|
||||
"regexp"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
@@ -19,7 +17,6 @@ import (
|
||||
"k8s.io/apimachinery/pkg/runtime/serializer"
|
||||
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
|
||||
openapinamer "k8s.io/apiserver/pkg/endpoints/openapi"
|
||||
k8srequest "k8s.io/apiserver/pkg/endpoints/request"
|
||||
"k8s.io/apiserver/pkg/registry/generic"
|
||||
genericapiserver "k8s.io/apiserver/pkg/server"
|
||||
serverstorage "k8s.io/apiserver/pkg/server/storage"
|
||||
@@ -32,7 +29,6 @@ import (
|
||||
"github.com/grafana/grafana/pkg/apiserver/auditing"
|
||||
"github.com/grafana/grafana/pkg/apiserver/endpoints/filters"
|
||||
grafanarest "github.com/grafana/grafana/pkg/apiserver/rest"
|
||||
"github.com/grafana/grafana/pkg/services/apiserver/endpoints/request"
|
||||
"github.com/grafana/grafana/pkg/services/apiserver/options"
|
||||
"github.com/grafana/grafana/pkg/services/featuremgmt"
|
||||
"github.com/grafana/grafana/pkg/storage/legacysql/dualwrite"
|
||||
@@ -253,19 +249,6 @@ func SetupConfig(
|
||||
return nil
|
||||
}
|
||||
|
||||
type ServerLockService interface {
|
||||
LockExecuteAndRelease(ctx context.Context, actionName string, maxInterval time.Duration, fn func(ctx context.Context)) error
|
||||
}
|
||||
|
||||
func getRequestInfo(gr schema.GroupResource, namespaceMapper request.NamespaceMapper) *k8srequest.RequestInfo {
|
||||
return &k8srequest.RequestInfo{
|
||||
APIGroup: gr.Group,
|
||||
Resource: gr.Resource,
|
||||
Name: "",
|
||||
Namespace: namespaceMapper(int64(1)),
|
||||
}
|
||||
}
|
||||
|
||||
func InstallAPIs(
|
||||
scheme *runtime.Scheme,
|
||||
codecs serializer.CodecFactory,
|
||||
@@ -274,13 +257,9 @@ func InstallAPIs(
|
||||
builders []APIGroupBuilder,
|
||||
storageOpts *options.StorageOptions,
|
||||
reg prometheus.Registerer,
|
||||
namespaceMapper request.NamespaceMapper,
|
||||
kvStore grafanarest.NamespacedKVStore,
|
||||
serverLock ServerLockService,
|
||||
dualWriteService dualwrite.Service,
|
||||
optsregister apistore.StorageOptionsRegister,
|
||||
features featuremgmt.FeatureToggles,
|
||||
dualWriterMetrics *grafanarest.DualWriterMetrics,
|
||||
builderMetrics *BuilderMetrics,
|
||||
apiResourceConfig *serverstorage.ResourceConfig,
|
||||
) error {
|
||||
@@ -303,79 +282,20 @@ func InstallAPIs(
|
||||
// when missing this will default to mode zero (legacy only)
|
||||
var mode = grafanarest.DualWriterMode(0)
|
||||
|
||||
var (
|
||||
err error
|
||||
dualWriterPeriodicDataSyncJobEnabled bool
|
||||
dualWriterMigrationDataSyncDisabled bool
|
||||
dataSyncerInterval = time.Hour
|
||||
dataSyncerRecordsLimit = 1000
|
||||
)
|
||||
|
||||
resourceConfig, resourceExists := storageOpts.UnifiedStorageConfig[key]
|
||||
if resourceExists {
|
||||
mode = resourceConfig.DualWriterMode
|
||||
dualWriterPeriodicDataSyncJobEnabled = resourceConfig.DualWriterPeriodicDataSyncJobEnabled
|
||||
dualWriterMigrationDataSyncDisabled = resourceConfig.DualWriterMigrationDataSyncDisabled
|
||||
dataSyncerInterval = resourceConfig.DataSyncerInterval
|
||||
dataSyncerRecordsLimit = resourceConfig.DataSyncerRecordsLimit
|
||||
}
|
||||
|
||||
// Force using storage only -- regardless of internal synchronization state
|
||||
if mode == grafanarest.Mode5 {
|
||||
builderMetrics.RecordDualWriterModes(gr.Resource, gr.Group, mode, grafanarest.Mode5)
|
||||
return storage, nil
|
||||
}
|
||||
builderMetrics.RecordDualWriterModes(gr.Resource, gr.Group, mode)
|
||||
|
||||
currentMode := mode
|
||||
if !dualWriterMigrationDataSyncDisabled || dualWriterPeriodicDataSyncJobEnabled {
|
||||
// TODO: inherited context from main Grafana process
|
||||
ctx := context.Background()
|
||||
|
||||
// Moving from one version to the next can only happen after the previous step has
|
||||
// successfully synchronized.
|
||||
requestInfo := getRequestInfo(gr, namespaceMapper)
|
||||
|
||||
syncerCfg := &grafanarest.SyncerConfig{
|
||||
Kind: key,
|
||||
RequestInfo: requestInfo,
|
||||
Mode: mode,
|
||||
SkipDataSync: dualWriterMigrationDataSyncDisabled,
|
||||
LegacyStorage: legacy,
|
||||
Storage: storage,
|
||||
ServerLockService: serverLock,
|
||||
DataSyncerInterval: dataSyncerInterval,
|
||||
DataSyncerRecordsLimit: dataSyncerRecordsLimit,
|
||||
}
|
||||
|
||||
// This also sets the currentMode on the syncer config.
|
||||
currentMode, err = grafanarest.SetDualWritingMode(ctx, kvStore, syncerCfg, dualWriterMetrics)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// when unable to use
|
||||
if currentMode != mode {
|
||||
klog.Warningf("Requested DualWrite mode: %d, but using %d for %+v", mode, currentMode, gr)
|
||||
}
|
||||
|
||||
if dualWriterPeriodicDataSyncJobEnabled && (currentMode >= grafanarest.Mode1 && currentMode <= grafanarest.Mode3) {
|
||||
// The mode might have changed in SetDualWritingMode, so apply current mode first.
|
||||
syncerCfg.Mode = currentMode
|
||||
if err := grafanarest.StartPeriodicDataSyncer(ctx, syncerCfg, dualWriterMetrics); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
builderMetrics.RecordDualWriterModes(gr.Resource, gr.Group, mode, currentMode)
|
||||
|
||||
switch currentMode {
|
||||
switch mode {
|
||||
case grafanarest.Mode0:
|
||||
return legacy, nil
|
||||
case grafanarest.Mode4, grafanarest.Mode5:
|
||||
return storage, nil
|
||||
default:
|
||||
return dualwrite.NewStaticStorage(gr, currentMode, legacy, storage)
|
||||
return dualwrite.NewStaticStorage(gr, mode, legacy, storage)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -24,11 +24,7 @@ func ProvideBuilderMetrics(reg prometheus.Registerer) *BuilderMetrics {
|
||||
}
|
||||
}
|
||||
|
||||
func (m *BuilderMetrics) RecordDualWriterModes(resource, group string, targetMode, currentMode grafanarest.DualWriterMode) {
|
||||
m.dualWriterTargetMode.WithLabelValues(resource, group).Set(float64(targetMode))
|
||||
m.dualWriterCurrentMode.WithLabelValues(resource, group).Set(float64(currentMode))
|
||||
}
|
||||
|
||||
func ProvideDualWriterMetrics(reg prometheus.Registerer) *grafanarest.DualWriterMetrics {
|
||||
return grafanarest.NewDualWriterMetrics(reg)
|
||||
func (m *BuilderMetrics) RecordDualWriterModes(resource, group string, mode grafanarest.DualWriterMode) {
|
||||
m.dualWriterTargetMode.WithLabelValues(resource, group).Set(float64(mode))
|
||||
m.dualWriterCurrentMode.WithLabelValues(resource, group).Set(float64(mode))
|
||||
}
|
||||
|
||||
@@ -135,8 +135,7 @@ func (v *unifiedStorageConfigValue) Set(val string) error {
|
||||
}
|
||||
|
||||
(*v.config)[key] = setting.UnifiedStorageConfig{
|
||||
DualWriterMode: apiserverrest.DualWriterMode(mode),
|
||||
DualWriterMigrationDataSyncDisabled: true,
|
||||
DualWriterMode: apiserverrest.DualWriterMode(mode),
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -30,11 +30,8 @@ import (
|
||||
"github.com/grafana/grafana/pkg/apimachinery/identity"
|
||||
"github.com/grafana/grafana/pkg/apiserver/auditing"
|
||||
grafanaresponsewriter "github.com/grafana/grafana/pkg/apiserver/endpoints/responsewriter"
|
||||
grafanarest "github.com/grafana/grafana/pkg/apiserver/rest"
|
||||
"github.com/grafana/grafana/pkg/infra/db"
|
||||
"github.com/grafana/grafana/pkg/infra/kvstore"
|
||||
"github.com/grafana/grafana/pkg/infra/log"
|
||||
"github.com/grafana/grafana/pkg/infra/serverlock"
|
||||
"github.com/grafana/grafana/pkg/infra/tracing"
|
||||
"github.com/grafana/grafana/pkg/middleware"
|
||||
"github.com/grafana/grafana/pkg/modules"
|
||||
@@ -47,7 +44,6 @@ import (
|
||||
"github.com/grafana/grafana/pkg/services/apiserver/auth/authenticator"
|
||||
"github.com/grafana/grafana/pkg/services/apiserver/auth/authorizer"
|
||||
"github.com/grafana/grafana/pkg/services/apiserver/builder"
|
||||
"github.com/grafana/grafana/pkg/services/apiserver/endpoints/request"
|
||||
grafanaapiserveroptions "github.com/grafana/grafana/pkg/services/apiserver/options"
|
||||
"github.com/grafana/grafana/pkg/services/apiserver/utils"
|
||||
contextmodel "github.com/grafana/grafana/pkg/services/contexthandler/model"
|
||||
@@ -97,10 +93,8 @@ type service struct {
|
||||
tracing *tracing.TracingService
|
||||
metrics prometheus.Registerer
|
||||
|
||||
authorizer *authorizer.GrafanaAuthorizer
|
||||
serverLockService builder.ServerLockService
|
||||
dualWriter dualwrite.Service
|
||||
kvStore kvstore.KVStore
|
||||
authorizer *authorizer.GrafanaAuthorizer
|
||||
dualWriter dualwrite.Service
|
||||
|
||||
pluginClient plugins.Client
|
||||
datasources datasource.ScopedPluginDatasourceProvider
|
||||
@@ -114,7 +108,6 @@ type service struct {
|
||||
aggregatorRunner aggregatorrunner.AggregatorRunner
|
||||
appInstallers []appsdkapiserver.AppInstaller
|
||||
builderMetrics *builder.BuilderMetrics
|
||||
dualWriterMetrics *grafanarest.DualWriterMetrics
|
||||
|
||||
auditBackend audit.Backend
|
||||
auditPolicyRuleProvider auditing.PolicyRuleProvider
|
||||
@@ -125,9 +118,7 @@ func ProvideService(
|
||||
features featuremgmt.FeatureToggles,
|
||||
rr routing.RouteRegister,
|
||||
tracing *tracing.TracingService,
|
||||
serverLockService *serverlock.ServerLockService,
|
||||
db db.DB,
|
||||
kvStore kvstore.KVStore,
|
||||
pluginClient plugins.Client,
|
||||
datasources datasource.ScopedPluginDatasourceProvider,
|
||||
contextProvider datasource.PluginContextWrapper,
|
||||
@@ -159,12 +150,10 @@ func ProvideService(
|
||||
tracing: tracing,
|
||||
db: db, // For Unified storage
|
||||
metrics: reg,
|
||||
kvStore: kvStore,
|
||||
pluginClient: pluginClient,
|
||||
datasources: datasources,
|
||||
contextProvider: contextProvider,
|
||||
pluginStore: pluginStore,
|
||||
serverLockService: serverLockService,
|
||||
dualWriter: dualWriter,
|
||||
unified: unified,
|
||||
secrets: secrets,
|
||||
@@ -173,7 +162,6 @@ func ProvideService(
|
||||
aggregatorRunner: aggregatorRunner,
|
||||
appInstallers: appInstallers,
|
||||
builderMetrics: builderMetrics,
|
||||
dualWriterMetrics: grafanarest.NewDualWriterMetrics(reg),
|
||||
auditBackend: auditBackend,
|
||||
auditPolicyRuleProvider: auditPolicyRuleProvider,
|
||||
}
|
||||
@@ -412,13 +400,9 @@ func (s *service) start(ctx context.Context) error {
|
||||
builders,
|
||||
o.StorageOptions,
|
||||
s.metrics,
|
||||
request.GetNamespaceMapper(s.cfg),
|
||||
kvstore.WithNamespace(s.kvStore, 0, "storage.dualwriting"),
|
||||
s.serverLockService,
|
||||
s.dualWriter,
|
||||
optsregister,
|
||||
s.features,
|
||||
s.dualWriterMetrics,
|
||||
s.builderMetrics,
|
||||
apiResourceConfig,
|
||||
)
|
||||
@@ -432,11 +416,7 @@ func (s *service) start(ctx context.Context) error {
|
||||
server,
|
||||
serverConfig.RESTOptionsGetter,
|
||||
o.StorageOptions,
|
||||
kvstore.WithNamespace(s.kvStore, 0, "storage.dualwriting"),
|
||||
s.serverLockService,
|
||||
request.GetNamespaceMapper(s.cfg),
|
||||
s.dualWriter,
|
||||
s.dualWriterMetrics,
|
||||
s.builderMetrics,
|
||||
serverConfig.MergedResourceConfig,
|
||||
); err != nil {
|
||||
|
||||
@@ -7,7 +7,6 @@ import (
|
||||
)
|
||||
|
||||
var WireSet = wire.NewSet(
|
||||
builder.ProvideDualWriterMetrics,
|
||||
builder.ProvideBuilderMetrics,
|
||||
ProvideEventualRestConfigProvider,
|
||||
wire.Bind(new(RestConfigProvider), new(*eventualRestConfigProvider)),
|
||||
|
||||
@@ -629,13 +629,7 @@ type Cfg struct {
|
||||
}
|
||||
|
||||
type UnifiedStorageConfig struct {
|
||||
DualWriterMode rest.DualWriterMode
|
||||
DualWriterPeriodicDataSyncJobEnabled bool
|
||||
DualWriterMigrationDataSyncDisabled bool
|
||||
// DataSyncerInterval defines how often the data syncer should run for a resource on the grafana instance.
|
||||
DataSyncerInterval time.Duration
|
||||
// DataSyncerRecordsLimit defines how many records will be processed at max during a sync invocation.
|
||||
DataSyncerRecordsLimit int
|
||||
DualWriterMode rest.DualWriterMode
|
||||
// EnableMigration indicates whether migration is enabled for the resource.
|
||||
// If not set, will use the default from MigratedUnifiedResources.
|
||||
EnableMigration bool
|
||||
|
||||
@@ -52,18 +52,6 @@ func (cfg *Cfg) setUnifiedStorageConfig() {
|
||||
// parse dualWriter modes from the section
|
||||
dualWriterMode := section.Key("dualWriterMode").MustInt(0)
|
||||
|
||||
// parse dualWriter periodic data syncer config
|
||||
dualWriterPeriodicDataSyncJobEnabled := section.Key("dualWriterPeriodicDataSyncJobEnabled").MustBool(false)
|
||||
|
||||
// parse dualWriter migration data sync disabled from resource section
|
||||
dualWriterMigrationDataSyncDisabled := section.Key("dualWriterMigrationDataSyncDisabled").MustBool(false)
|
||||
|
||||
// parse dataSyncerRecordsLimit from resource section
|
||||
dataSyncerRecordsLimit := section.Key("dataSyncerRecordsLimit").MustInt(1000)
|
||||
|
||||
// parse dataSyncerInterval from resource section
|
||||
dataSyncerInterval := section.Key("dataSyncerInterval").MustDuration(time.Hour)
|
||||
|
||||
// parse EnableMigration from resource section
|
||||
enableMigration := MigratedUnifiedResources[resourceName]
|
||||
if section.HasKey("enableMigration") {
|
||||
@@ -78,13 +66,9 @@ func (cfg *Cfg) setUnifiedStorageConfig() {
|
||||
}
|
||||
|
||||
storageConfig[resourceName] = UnifiedStorageConfig{
|
||||
DualWriterMode: rest.DualWriterMode(dualWriterMode),
|
||||
DualWriterPeriodicDataSyncJobEnabled: dualWriterPeriodicDataSyncJobEnabled,
|
||||
DualWriterMigrationDataSyncDisabled: dualWriterMigrationDataSyncDisabled,
|
||||
DataSyncerRecordsLimit: dataSyncerRecordsLimit,
|
||||
DataSyncerInterval: dataSyncerInterval,
|
||||
EnableMigration: enableMigration,
|
||||
AutoMigrationThreshold: autoMigrationThreshold,
|
||||
DualWriterMode: rest.DualWriterMode(dualWriterMode),
|
||||
EnableMigration: enableMigration,
|
||||
AutoMigrationThreshold: autoMigrationThreshold,
|
||||
}
|
||||
}
|
||||
cfg.UnifiedStorage = storageConfig
|
||||
@@ -163,10 +147,9 @@ func (cfg *Cfg) enforceMigrationToUnifiedConfigs() {
|
||||
}
|
||||
cfg.Logger.Info("Enforcing mode 5 for resource in unified storage", "resource", resource)
|
||||
cfg.UnifiedStorage[resource] = UnifiedStorageConfig{
|
||||
DualWriterMode: 5,
|
||||
DualWriterMigrationDataSyncDisabled: true,
|
||||
EnableMigration: true,
|
||||
AutoMigrationThreshold: resourceCfg.AutoMigrationThreshold,
|
||||
DualWriterMode: 5,
|
||||
EnableMigration: true,
|
||||
AutoMigrationThreshold: resourceCfg.AutoMigrationThreshold,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -204,7 +187,6 @@ func (cfg *Cfg) EnableMode5(resource string) {
|
||||
}
|
||||
config := cfg.UnifiedStorage[resource]
|
||||
config.DualWriterMode = rest.Mode5
|
||||
config.DualWriterMigrationDataSyncDisabled = true
|
||||
config.EnableMigration = true
|
||||
cfg.UnifiedStorage[resource] = config
|
||||
}
|
||||
|
||||
@@ -2,7 +2,6 @@ package setting
|
||||
|
||||
import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/grafana/grafana/pkg/apiserver/rest"
|
||||
"github.com/stretchr/testify/assert"
|
||||
@@ -49,10 +48,9 @@ func TestCfg_setUnifiedStorageConfig(t *testing.T) {
|
||||
}
|
||||
|
||||
assert.Equal(t, UnifiedStorageConfig{
|
||||
DualWriterMode: 5,
|
||||
DualWriterMigrationDataSyncDisabled: true,
|
||||
EnableMigration: isEnabled,
|
||||
AutoMigrationThreshold: expectedThreshold,
|
||||
DualWriterMode: 5,
|
||||
EnableMigration: isEnabled,
|
||||
AutoMigrationThreshold: expectedThreshold,
|
||||
}, resourceCfg, migratedResource)
|
||||
}
|
||||
}
|
||||
@@ -60,9 +58,6 @@ func TestCfg_setUnifiedStorageConfig(t *testing.T) {
|
||||
setMigratedResourceKey("dualWriterMode", "1") // migrated resources enabled by default will change to 5 in setUnifiedStorageConfig
|
||||
|
||||
setSectionKey("unified_storage.resource.not_migrated.grafana.app", "dualWriterMode", "2")
|
||||
setSectionKey("unified_storage.resource.not_migrated.grafana.app", "dualWriterPeriodicDataSyncJobEnabled", "true")
|
||||
setSectionKey("unified_storage.resource.not_migrated.grafana.app", "dataSyncerRecordsLimit", "1001")
|
||||
setSectionKey("unified_storage.resource.not_migrated.grafana.app", "dataSyncerInterval", "10m")
|
||||
|
||||
// Add unified_storage section for index settings
|
||||
setSectionKey("unified_storage", "index_min_count", "5")
|
||||
@@ -73,11 +68,8 @@ func TestCfg_setUnifiedStorageConfig(t *testing.T) {
|
||||
|
||||
assert.Equal(t, exists, true)
|
||||
assert.Equal(t, value, UnifiedStorageConfig{
|
||||
DualWriterMode: 2,
|
||||
DualWriterPeriodicDataSyncJobEnabled: true,
|
||||
DataSyncerRecordsLimit: 1001,
|
||||
DataSyncerInterval: time.Minute * 10,
|
||||
AutoMigrationThreshold: 0,
|
||||
DualWriterMode: 2,
|
||||
AutoMigrationThreshold: 0,
|
||||
})
|
||||
|
||||
validateMigratedResources(false)
|
||||
|
||||
@@ -212,7 +212,6 @@ func TestResourceMigration_AutoMigrateEnablesMode5(t *testing.T) {
|
||||
if tt.wantMode5Enabled {
|
||||
require.Equal(t, 5, int(config.DualWriterMode), "%s: %s", tt.description, resourceName)
|
||||
require.True(t, config.EnableMigration, "%s: EnableMigration should be true for %s", tt.description, resourceName)
|
||||
require.True(t, config.DualWriterMigrationDataSyncDisabled, "%s: DualWriterMigrationDataSyncDisabled should be true for %s", tt.description, resourceName)
|
||||
} else {
|
||||
require.Equal(t, 0, int(config.DualWriterMode), "%s: mode should be 0 for %s", tt.description, resourceName)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user