Compare commits

...

4 Commits

Author SHA1 Message Date
Rafael Paulovic
37a1a45194 Merge remote-tracking branch 'origin/main' into rm-dualwriter-datasyncer 2026-01-13 13:35:12 +01:00
Rafael Paulovic
8cb33642ba Merge remote-tracking branch 'origin/main' into rm-dualwriter-datasyncer 2026-01-13 11:25:42 +01:00
Rafael Paulovic
283ad15e84 Merge remote-tracking branch 'origin/main' into rm-dualwriter-datasyncer 2026-01-13 10:56:27 +01:00
Rafael Paulovic
070dc2288e chore(unified): remove DualWriter data syncer
- migrations are now handled using unified storage migrator framework
2026-01-12 18:09:29 +01:00
20 changed files with 27 additions and 1126 deletions

View File

@@ -7,7 +7,6 @@ require (
github.com/grafana/authlib/types v0.0.0-20251119142549-be091cf2f4d4
github.com/grafana/grafana-app-sdk/logging v0.48.7
github.com/grafana/grafana/pkg/apimachinery v0.0.0-20250514132646-acbc7b54ed9e
github.com/prometheus/client_golang v1.23.2
github.com/stretchr/testify v1.11.1
go.opentelemetry.io/contrib/propagators/jaeger v1.38.0
go.opentelemetry.io/otel v1.39.0
@@ -69,6 +68,7 @@ require (
github.com/onsi/gomega v1.36.2 // indirect
github.com/patrickmn/go-cache v2.1.0+incompatible // indirect
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
github.com/prometheus/client_golang v1.23.2 // indirect
github.com/prometheus/client_model v0.6.2 // indirect
github.com/prometheus/common v0.67.4 // indirect
github.com/prometheus/procfs v0.19.2 // indirect

View File

@@ -1,15 +1,9 @@
package rest
import (
"context"
"errors"
"fmt"
"time"
apiequality "k8s.io/apimachinery/pkg/api/equality"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/klog/v2"
"github.com/grafana/grafana/pkg/apimachinery/utils"
)
@@ -66,103 +60,6 @@ const (
Mode5
)
// NamespacedKVStore is the minimal key/value interface the dual writer uses to
// persist the currently active dual-writing mode per resource kind.
type NamespacedKVStore interface {
	// Get returns the value stored under key and whether the key exists.
	Get(ctx context.Context, key string) (string, bool, error)
	// Set stores value under key, overwriting any previous value.
	Set(ctx context.Context, key, value string) error
}
// ServerLockService serializes an action across server instances: fn runs only
// while the named lock is held. A lock whose last execution is older than
// maxInterval is treated as expired and may be taken over.
type ServerLockService interface {
	LockExecuteAndRelease(ctx context.Context, actionName string, maxInterval time.Duration, fn func(ctx context.Context)) error
}
// SetDualWritingMode reconciles the desired dual-writing mode (cfg.Mode) with
// the mode currently persisted in kvs under cfg.Kind, and returns the mode
// that is effectively active afterwards.
//
// Upgrading from a legacy-backed mode (< Mode3) into a unified-read mode
// (>= Mode3) requires one successful data-sync pass first (unless
// cfg.SkipDataSync is set); if that sync fails, the current mode is kept and
// no error is returned. Every other transition is persisted immediately.
func SetDualWritingMode(
	ctx context.Context,
	kvs NamespacedKVStore,
	cfg *SyncerConfig,
	metrics *DualWriterMetrics,
) (DualWriterMode, error) {
	if cfg == nil {
		return Mode0, errors.New("syncer config is nil")
	}
	// Mode0 means no DualWriter
	if cfg.Mode == Mode0 {
		return Mode0, nil
	}
	toMode := map[string]DualWriterMode{
		// It is not possible to initialize a mode 0 dual writer. Mode 0 represents
		// writing to legacy storage without Unified Storage enabled.
		"1": Mode1,
		"2": Mode2,
		"3": Mode3,
		"4": Mode4,
		"5": Mode5,
	}
	errDualWriterSetCurrentMode := errors.New("failed to set current dual writing mode")
	// Use entity name as key
	kvMode, ok, err := kvs.Get(ctx, cfg.Kind)
	if err != nil {
		return Mode0, errors.New("failed to fetch current dual writing mode")
	}
	currentMode, exists := toMode[kvMode]
	// If the mode does not exist in our mapping, we log an error.
	if !exists && ok {
		// Only log if "ok" because initially all instances will have mode unset for playlists.
		klog.Infof("invalid dual writing mode for %s mode: %v", cfg.Kind, kvMode)
	}
	// If the mode does not exist in our mapping, and we also didn't find an entry for this kind, fallback.
	if !exists || !ok {
		// Default to mode 1
		currentMode = Mode1
		if err := kvs.Set(ctx, cfg.Kind, fmt.Sprint(currentMode)); err != nil {
			return Mode0, errDualWriterSetCurrentMode
		}
	}
	// Only the transition into a unified-read mode (Mode3+) from a lower mode
	// needs a data sync; everything else can be persisted right away.
	isUpgradeToReadUnifiedMode := currentMode < Mode3 && cfg.Mode >= Mode3
	if !isUpgradeToReadUnifiedMode {
		if err := kvs.Set(ctx, cfg.Kind, fmt.Sprint(cfg.Mode)); err != nil {
			return Mode0, errDualWriterSetCurrentMode
		}
		return cfg.Mode, nil
	}
	// If SkipDataSync is enabled, we can set the mode directly without running the syncer.
	if cfg.SkipDataSync {
		if err := kvs.Set(ctx, cfg.Kind, fmt.Sprint(cfg.Mode)); err != nil {
			return Mode0, errDualWriterSetCurrentMode
		}
		return cfg.Mode, nil
	}
	// Transitioning to Mode3 or higher from Mode0, Mode1, or Mode2.
	// We need to run the syncer in the current mode before we can upgrade to Mode3 or higher.
	cfgModeTmp := cfg.Mode
	// Before running the sync, set the syncer config to the current mode, as we have to run the syncer
	// once in the current active mode before we can upgrade.
	cfg.Mode = currentMode
	syncOk, err := runDataSyncer(ctx, cfg, metrics)
	// Once we are done with running the syncer, we can change the mode back on the config to the desired one.
	cfg.Mode = cfgModeTmp
	if err != nil {
		// A failed sync is not fatal: stay on the current mode and report success.
		klog.Error("data syncer failed for mode:", kvMode, "err", err)
		return currentMode, nil
	}
	if !syncOk {
		klog.Info("data syncer not ok for mode:", kvMode)
		return currentMode, nil
	}
	// If sync is successful, update the mode to the desired one.
	if err := kvs.Set(ctx, cfg.Kind, fmt.Sprint(cfg.Mode)); err != nil {
		return Mode0, errDualWriterSetCurrentMode
	}
	return cfg.Mode, nil
}
// Compare asserts on the equality of objects returned from both stores (object storage and legacy storage)
func Compare(objA, objB runtime.Object) bool {
if objA == nil || objB == nil {

View File

@@ -1,328 +0,0 @@
package rest
import (
"context"
"fmt"
"math/rand"
"time"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metainternalversion "k8s.io/apimachinery/pkg/apis/meta/internalversion"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/apiserver/pkg/registry/rest"
"k8s.io/klog/v2"
"github.com/grafana/grafana/pkg/apimachinery/identity"
"github.com/grafana/grafana/pkg/apimachinery/utils"
)
// syncItem pairs the legacy-storage and unified-storage views of one object
// (keyed by name) so the syncer can diff them. Either side may be nil when the
// object exists in only one store.
type syncItem struct {
	name            string
	objStorage      runtime.Object
	objLegacy       runtime.Object
	accessorStorage utils.GrafanaMetaAccessor
	accessorLegacy  utils.GrafanaMetaAccessor
}
// SyncerConfig holds everything the legacy-to-unified data syncer needs to run
// for a single resource kind.
type SyncerConfig struct {
	// Kind is the resource kind being synced; it is also the KV-store key under
	// which the current dual-writing mode is persisted.
	Kind string
	// RequestInfo supplies namespace/group/resource metadata attached to the
	// requests the syncer issues against both storages.
	RequestInfo *request.RequestInfo
	// Mode is the dual-writing mode to sync under (only Mode1/Mode2 are synced).
	Mode DualWriterMode
	// LegacyStorage is the source of truth; Storage is the unified target.
	LegacyStorage Storage
	Storage       Storage
	// ServerLockService ensures only one instance runs the sync at a time.
	ServerLockService ServerLockService
	// SkipDataSync lets mode upgrades proceed without a prior sync pass.
	SkipDataSync bool
	// DataSyncerInterval is how often the periodic syncer runs (Validate
	// defaults it to one hour when zero).
	DataSyncerInterval time.Duration
	// DataSyncerRecordsLimit aborts the sync when either store reaches this
	// many records (Validate defaults it to 1000 when zero).
	DataSyncerRecordsLimit int
}
// Validate reports the first missing required field of the syncer config and
// fills in defaults for the interval (1h) and records limit (1000) when they
// are unset. The receiver is mutated when defaults are applied.
func (s *SyncerConfig) Validate() error {
	switch {
	case s == nil:
		return fmt.Errorf("syncer config is nil")
	case s.Kind == "":
		return fmt.Errorf("kind must be specified")
	case s.RequestInfo == nil:
		return fmt.Errorf("requestInfo must be specified")
	case s.ServerLockService == nil:
		return fmt.Errorf("serverLockService must be specified")
	case s.Storage == nil:
		return fmt.Errorf("storage must be specified")
	case s.LegacyStorage == nil:
		return fmt.Errorf("legacy storage must be specified")
	}
	if s.DataSyncerInterval == 0 {
		s.DataSyncerInterval = time.Hour
	}
	if s.DataSyncerRecordsLimit == 0 {
		s.DataSyncerRecordsLimit = 1000
	}
	return nil
}
// StartPeriodicDataSyncer starts a background job that will execute the DataSyncer,
// syncing the data from the hosted grafana backend into the unified storage backend.
// This is run in the grafana instance. The goroutine exits when ctx is cancelled.
func StartPeriodicDataSyncer(ctx context.Context, cfg *SyncerConfig, metrics *DualWriterMetrics) error {
	if err := cfg.Validate(); err != nil {
		return fmt.Errorf("invalid syncer config: %w", err)
	}
	log := klog.NewKlogr().WithName("legacyToUnifiedStorageDataSyncer").WithValues("mode", cfg.Mode, "resource", cfg.Kind)
	log.Info("Starting periodic data syncer")
	// run in background
	go func() {
		// Jitter the first run across a 10-minute window so many instances
		// booting at once do not all contend for the sync lock simultaneously.
		r := rand.New(rand.NewSource(time.Now().UnixNano()))
		timeWindow := 600 // 600 seconds (10 minutes)
		jitterSeconds := r.Int63n(int64(timeWindow))
		log.Info("data syncer scheduled", "starting time", time.Now().Add(time.Second*time.Duration(jitterSeconds)))
		time.Sleep(time.Second * time.Duration(jitterSeconds))
		// run it immediately
		syncOK, err := runDataSyncer(ctx, cfg, metrics)
		log.Info("data syncer finished", "syncOK", syncOK, "error", err)
		ticker := time.NewTicker(cfg.DataSyncerInterval)
		// Release the ticker's resources when the goroutine exits.
		defer ticker.Stop()
		for {
			select {
			case <-ticker.C:
				syncOK, err = runDataSyncer(ctx, cfg, metrics)
				// Fixed: the log key here was ", error" (stray comma and space),
				// which produced a malformed structured-log key; it now matches
				// the "error" key used for the initial run above.
				log.Info("data syncer finished", "syncOK", syncOK, "error", err)
			case <-ctx.Done():
				return
			}
		}
	}()
	return nil
}
// runDataSyncer will ensure that data between legacy storage and unified storage are in sync.
// The sync implementation depends on the DualWriter mode. It returns whether
// the stores ended the pass fully in sync.
func runDataSyncer(ctx context.Context, cfg *SyncerConfig, metrics *DualWriterMetrics) (bool, error) {
	if err := cfg.Validate(); err != nil {
		return false, fmt.Errorf("invalid syncer config: %w", err)
	}
	// ensure that execution takes no longer than necessary
	// (one interval minus a minute, so consecutive runs never overlap)
	timeout := cfg.DataSyncerInterval - time.Minute
	ctx, cancelFn := context.WithTimeout(ctx, timeout)
	defer cancelFn()
	// implementation depends on the current DualWriter mode
	switch cfg.Mode {
	case Mode1, Mode2:
		return legacyToUnifiedStorageDataSyncer(ctx, cfg, metrics)
	default:
		// Modes 3+ read from unified storage already; no sync is defined for them.
		klog.Info("data syncer not implemented for mode:", cfg.Mode)
		return false, nil
	}
}
// legacyToUnifiedStorageDataSyncer makes unified storage match legacy storage
// for cfg.Kind: it upserts objects that are missing or different in unified
// storage and deletes objects that no longer exist in legacy storage. It
// returns true only when every out-of-sync object was reconciled successfully.
// The whole pass runs under a server-wide lock so at most one instance syncs a
// given (mode, kind) at a time.
func legacyToUnifiedStorageDataSyncer(ctx context.Context, cfg *SyncerConfig, metrics *DualWriterMetrics) (bool, error) {
	if err := cfg.Validate(); err != nil {
		return false, fmt.Errorf("invalid syncer config: %w", err)
	}
	log := klog.NewKlogr().WithName("legacyToUnifiedStorageDataSyncer").WithValues("mode", cfg.Mode, "resource", cfg.Kind)
	everythingSynced := false
	outOfSync := 0
	syncSuccess := 0
	syncErr := 0
	maxInterval := cfg.DataSyncerInterval + 5*time.Minute
	var errSync error
	// LockExecuteAndRelease ensures that just a single Grafana server acquires a lock at a time.
	// The parameter 'maxInterval' is a timeout safeguard: if the LastExecution in the
	// database is older than maxInterval, we will assume the lock has timed out. The 'maxInterval'
	// parameter should be long enough that it is impossible for 2 processes to run at the same time.
	err := cfg.ServerLockService.LockExecuteAndRelease(ctx, fmt.Sprintf("legacyToUnifiedStorageDataSyncer-%d-%s", cfg.Mode, cfg.Kind), maxInterval, func(context.Context) {
		log.Info("starting legacyToUnifiedStorageDataSyncer")
		startSync := time.Now()
		ctx = klog.NewContext(ctx, log)
		// Run under the service identity so both storages authorize the requests.
		ctx, _ = identity.WithServiceIdentity(ctx, 0)
		ctx = request.WithNamespace(ctx, cfg.RequestInfo.Namespace)
		ctx = request.WithRequestInfo(ctx, cfg.RequestInfo)
		storageList, err := getList(ctx, cfg.Storage, &metainternalversion.ListOptions{
			Limit: int64(cfg.DataSyncerRecordsLimit),
		})
		if err != nil {
			log.Error(err, "unable to extract list from storage")
			return
		}
		// Hard safety cap: refuse to sync abnormally large datasets.
		if len(storageList) >= cfg.DataSyncerRecordsLimit {
			errSync = fmt.Errorf("unified storage has more than %d records. Aborting sync", cfg.DataSyncerRecordsLimit)
			log.Error(errSync, "Unified storage has more records to be synced than allowed")
			return
		}
		log.Info("got items from unified storage", "items", len(storageList))
		legacyList, err := getList(ctx, cfg.LegacyStorage, &metainternalversion.ListOptions{
			Limit: int64(cfg.DataSyncerRecordsLimit),
		})
		if err != nil {
			log.Error(err, "unable to extract list from legacy storage")
			return
		}
		log.Info("got items from legacy storage", "items", len(legacyList))
		// Index both listings by object name so each name yields one syncItem
		// holding whichever sides exist.
		itemsByName := map[string]syncItem{}
		for _, obj := range legacyList {
			accessor, err := utils.MetaAccessor(obj)
			if err != nil {
				log.Error(err, "error retrieving accessor data for object from legacy storage")
				continue
			}
			name := accessor.GetName()
			item := itemsByName[name]
			item.name = name
			item.objLegacy = obj
			item.accessorLegacy = accessor
			itemsByName[name] = item
		}
		for _, obj := range storageList {
			accessor, err := utils.MetaAccessor(obj)
			if err != nil {
				log.Error(err, "error retrieving accessor data for object from storage")
				continue
			}
			name := accessor.GetName()
			item := itemsByName[name]
			item.name = name
			item.objStorage = obj
			item.accessorStorage = accessor
			itemsByName[name] = item
		}
		log.Info("got list of items to be synced", "items", len(itemsByName))
		for name, item := range itemsByName {
			// upsert if:
			// - existing in both legacy and storage, but objects are different, or
			// - if it's missing from storage
			if item.objLegacy != nil &&
				(item.objStorage == nil || !Compare(item.objLegacy, item.objStorage)) {
				outOfSync++
				if item.objStorage != nil {
					// Update path: carry over unified storage's RV/UID so the
					// write targets the existing object.
					item.accessorLegacy.SetResourceVersion(item.accessorStorage.GetResourceVersion())
					item.accessorLegacy.SetUID(item.accessorStorage.GetUID())
					log.Info("updating item on unified storage", "name", name)
				} else {
					// Insert path: clear RV/UID so storage assigns fresh ones.
					item.accessorLegacy.SetResourceVersion("")
					item.accessorLegacy.SetUID("")
					log.Info("inserting item on unified storage", "name", name)
				}
				objInfo := rest.DefaultUpdatedObjectInfo(item.objLegacy, []rest.TransformFunc{}...)
				res, _, err := cfg.Storage.Update(ctx,
					name,
					objInfo,
					func(ctx context.Context, obj runtime.Object) error { return nil },
					func(ctx context.Context, obj, old runtime.Object) error { return nil },
					true, // force creation
					&metav1.UpdateOptions{},
				)
				if err != nil {
					log.WithValues("object", res).Error(err, "could not update in storage")
					syncErr++
				} else {
					syncSuccess++
				}
			}
			// delete if object does not exists on legacy but exists on storage
			if item.objLegacy == nil && item.objStorage != nil {
				outOfSync++
				ctx = request.WithRequestInfo(ctx, &request.RequestInfo{
					APIGroup:  cfg.RequestInfo.APIGroup,
					Resource:  cfg.RequestInfo.Resource,
					Name:      name,
					Namespace: cfg.RequestInfo.Namespace,
				})
				log.Info("deleting item from unified storage", "name", name)
				deletedS, _, err := cfg.Storage.Delete(ctx, name, func(ctx context.Context, obj runtime.Object) error { return nil }, &metav1.DeleteOptions{})
				// Already-gone objects count as successfully deleted.
				if err != nil && !apierrors.IsNotFound(err) {
					log.WithValues("objectList", deletedS).Error(err, "could not delete from storage")
					syncErr++
					continue
				}
				syncSuccess++
			}
		}
		// Fully synced only when every out-of-sync item was fixed this pass.
		everythingSynced = outOfSync == syncSuccess
		metrics.recordDataSyncerOutcome(cfg.Mode, cfg.Kind, everythingSynced)
		metrics.recordDataSyncerDuration(err != nil, cfg.Mode, cfg.Kind, startSync)
		log.Info("finished syncing items", "items", len(itemsByName), "updated", syncSuccess, "failed", syncErr, "outcome", everythingSynced)
	})
	// A failure detected inside the locked section (record limit exceeded)
	// takes precedence over any lock error.
	if errSync != nil {
		err = errSync
	}
	return everythingSynced, err
}
// getList pages through obj.List until the continue token is exhausted and
// returns all accumulated items. It fails once the accumulated count reaches
// listOptions.Limit, which callers use as a hard safety cap on sync size.
// NOTE(review): listOptions is mutated while paginating (Continue is advanced);
// callers here always pass a fresh options struct.
func getList(ctx context.Context, obj rest.Lister, listOptions *metainternalversion.ListOptions) ([]runtime.Object, error) {
	var allItems []runtime.Object
	for {
		// Checked before each page so we never fetch past the cap.
		if int64(len(allItems)) >= listOptions.Limit {
			return nil, fmt.Errorf("list has more than %d records. Aborting sync", listOptions.Limit)
		}
		ll, err := obj.List(ctx, listOptions)
		if err != nil {
			return nil, err
		}
		items, err := meta.ExtractList(ll)
		if err != nil {
			return nil, err
		}
		allItems = append(allItems, items...)
		// Get continue token from the list metadata.
		listMeta, err := meta.ListAccessor(ll)
		if err != nil {
			return nil, err
		}
		// If no continue token, we're done paginating.
		if listMeta.GetContinue() == "" {
			break
		}
		// Set continue token for next page.
		listOptions.Continue = listMeta.GetContinue()
	}
	return allItems, nil
}

View File

@@ -1,248 +0,0 @@
package rest
import (
"context"
"errors"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apiserver/pkg/apis/example"
"k8s.io/apiserver/pkg/endpoints/request"
)
// Test fixtures for the data-syncer tests. legacyObjN and storageObjN are
// name-for-name equivalents; legacyObj2WithHostname differs from storageObj2
// only in Spec.Hostname, so Compare reports the pair as out of sync.
var legacyObj1 = &example.Pod{TypeMeta: metav1.TypeMeta{Kind: "foo"}, ObjectMeta: metav1.ObjectMeta{Name: "foo1", ResourceVersion: "1", CreationTimestamp: metav1.Time{}}, Spec: example.PodSpec{}, Status: example.PodStatus{StartTime: &metav1.Time{Time: time.Now()}}}
var legacyObj2 = &example.Pod{TypeMeta: metav1.TypeMeta{Kind: "foo"}, ObjectMeta: metav1.ObjectMeta{Name: "foo2", ResourceVersion: "1", CreationTimestamp: metav1.Time{}}, Spec: example.PodSpec{}, Status: example.PodStatus{StartTime: &metav1.Time{Time: time.Now()}}}
var legacyObj3 = &example.Pod{TypeMeta: metav1.TypeMeta{Kind: "foo"}, ObjectMeta: metav1.ObjectMeta{Name: "foo3", ResourceVersion: "1", CreationTimestamp: metav1.Time{}}, Spec: example.PodSpec{}, Status: example.PodStatus{StartTime: &metav1.Time{Time: time.Now()}}}
var legacyObj4 = &example.Pod{TypeMeta: metav1.TypeMeta{Kind: "foo"}, ObjectMeta: metav1.ObjectMeta{Name: "foo4", ResourceVersion: "1", CreationTimestamp: metav1.Time{}}, Spec: example.PodSpec{}, Status: example.PodStatus{StartTime: &metav1.Time{Time: time.Now()}}}
var legacyObj2WithHostname = &example.Pod{TypeMeta: metav1.TypeMeta{Kind: "foo"}, ObjectMeta: metav1.ObjectMeta{Name: "foo2", ResourceVersion: "1", CreationTimestamp: metav1.Time{}}, Spec: example.PodSpec{Hostname: "hostname"}, Status: example.PodStatus{StartTime: &metav1.Time{Time: time.Now()}}}
var storageObj1 = &example.Pod{TypeMeta: metav1.TypeMeta{Kind: "foo"}, ObjectMeta: metav1.ObjectMeta{Name: "foo1", ResourceVersion: "1", CreationTimestamp: metav1.Time{}}, Spec: example.PodSpec{}, Status: example.PodStatus{StartTime: &metav1.Time{Time: time.Now()}}}
var storageObj2 = &example.Pod{TypeMeta: metav1.TypeMeta{Kind: "foo"}, ObjectMeta: metav1.ObjectMeta{Name: "foo2", ResourceVersion: "1", CreationTimestamp: metav1.Time{}}, Spec: example.PodSpec{}, Status: example.PodStatus{StartTime: &metav1.Time{Time: time.Now()}}}
var storageObj3 = &example.Pod{TypeMeta: metav1.TypeMeta{Kind: "foo"}, ObjectMeta: metav1.ObjectMeta{Name: "foo3", ResourceVersion: "1", CreationTimestamp: metav1.Time{}}, Spec: example.PodSpec{}, Status: example.PodStatus{StartTime: &metav1.Time{Time: time.Now()}}}
var storageObj4 = &example.Pod{TypeMeta: metav1.TypeMeta{Kind: "foo"}, ObjectMeta: metav1.ObjectMeta{Name: "foo4", ResourceVersion: "1", CreationTimestamp: metav1.Time{}}, Spec: example.PodSpec{}, Status: example.PodStatus{StartTime: &metav1.Time{Time: time.Now()}}}
// Pre-built lists handed to the MockStorage List expectations in the tests below.
var legacyListWith3items = &example.PodList{TypeMeta: metav1.TypeMeta{Kind: "foo"}, ListMeta: metav1.ListMeta{},
	Items: []example.Pod{
		*legacyObj1,
		*legacyObj2,
		*legacyObj3,
	}}
var legacyListWith4items = &example.PodList{TypeMeta: metav1.TypeMeta{Kind: "foo"}, ListMeta: metav1.ListMeta{},
	Items: []example.Pod{
		*legacyObj1,
		*legacyObj2,
		*legacyObj3,
		*legacyObj4,
	}}
var legacyListWith3itemsObj2IsDifferent = &example.PodList{TypeMeta: metav1.TypeMeta{Kind: "foo"}, ListMeta: metav1.ListMeta{},
	Items: []example.Pod{
		*legacyObj1,
		*legacyObj2WithHostname,
		*legacyObj3,
	}}
var storageListWith3items = &example.PodList{TypeMeta: metav1.TypeMeta{Kind: "foo"}, ListMeta: metav1.ListMeta{},
	Items: []example.Pod{
		*storageObj1,
		*storageObj2,
		*storageObj3,
	}}
var storageListWith4items = &example.PodList{TypeMeta: metav1.TypeMeta{Kind: "foo"}, ListMeta: metav1.ListMeta{},
	Items: []example.Pod{
		*storageObj1,
		*storageObj2,
		*storageObj3,
		*storageObj4,
	}}
// NOTE(review): despite the name, this list is missing foo2 and includes foo4.
var storageListWith3itemsMissingFoo2 = &example.PodList{TypeMeta: metav1.TypeMeta{Kind: "foo"}, ListMeta: metav1.ListMeta{},
	Items: []example.Pod{
		*storageObj1,
		*storageObj3,
		*storageObj4,
	}}
// TestLegacyToUnifiedStorage_DataSyncer runs a table of list/upsert/delete
// scenarios against mocked legacy and unified storages. The syncer behaves
// identically in Mode1 and Mode2, so every case is executed under both modes.
// (The original duplicated the whole run loop per mode; it is now a single
// loop. Also fixed the "needs to be update" typo in one case name.)
func TestLegacyToUnifiedStorage_DataSyncer(t *testing.T) {
	type testCase struct {
		setupLegacyFn   func(m *MockStorage)
		setupStorageFn  func(m *MockStorage)
		name            string
		expectedOutcome bool
		wantErr         bool
	}
	tests :=
		[]testCase{
			{
				name: "both stores are in sync",
				setupLegacyFn: func(m *MockStorage) {
					m.On("List", mock.Anything, mock.Anything).Return(legacyListWith3items, nil)
				},
				setupStorageFn: func(m *MockStorage) {
					m.On("List", mock.Anything, mock.Anything).Return(storageListWith3items, nil)
				},
				expectedOutcome: true,
			},
			{
				name: "both stores are in sync - fail to list from legacy",
				setupLegacyFn: func(m *MockStorage) {
					m.On("List", mock.Anything, mock.Anything).Return(legacyListWith3items, errors.New("error"))
				},
				setupStorageFn: func(m *MockStorage) {
					m.On("List", mock.Anything, mock.Anything).Return(storageListWith3items, nil)
				},
				expectedOutcome: false,
			},
			{
				name: "both stores are in sync - fail to list from storage",
				setupLegacyFn: func(m *MockStorage) {
					m.On("List", mock.Anything, mock.Anything).Return(legacyListWith3items, nil).Maybe()
				},
				setupStorageFn: func(m *MockStorage) {
					m.On("List", mock.Anything, mock.Anything).Return(storageListWith3items, errors.New("error"))
				},
				expectedOutcome: false,
			},
			{
				name: "storage is missing 1 entry (foo4)",
				setupLegacyFn: func(m *MockStorage) {
					m.On("List", mock.Anything, mock.Anything).Return(legacyListWith4items, nil)
				},
				setupStorageFn: func(m *MockStorage) {
					m.On("List", mock.Anything, mock.Anything).Return(storageListWith3items, nil)
					m.On("Update", mock.Anything, "foo4", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(exampleObj, false, nil)
				},
				expectedOutcome: true,
			},
			{
				name: "storage needs to be updated (foo2 is different)",
				setupLegacyFn: func(m *MockStorage) {
					m.On("List", mock.Anything, mock.Anything).Return(legacyListWith3itemsObj2IsDifferent, nil)
				},
				setupStorageFn: func(m *MockStorage) {
					m.On("List", mock.Anything, mock.Anything).Return(storageListWith3items, nil)
					m.On("Update", mock.Anything, "foo2", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(exampleObj, false, nil)
				},
				expectedOutcome: true,
			},
			{
				name: "storage is missing 1 entry (foo4) - fail to upsert",
				setupLegacyFn: func(m *MockStorage) {
					m.On("List", mock.Anything, mock.Anything).Return(legacyListWith4items, nil)
				},
				setupStorageFn: func(m *MockStorage) {
					m.On("List", mock.Anything, mock.Anything).Return(storageListWith3items, nil)
					m.On("Update", mock.Anything, "foo4", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(exampleObj, false, errors.New("error"))
				},
				expectedOutcome: false,
			},
			{
				name: "storage has an extra 1 entry (foo4)",
				setupLegacyFn: func(m *MockStorage) {
					m.On("List", mock.Anything, mock.Anything).Return(legacyListWith3items, nil)
				},
				setupStorageFn: func(m *MockStorage) {
					m.On("List", mock.Anything, mock.Anything).Return(storageListWith4items, nil)
					m.On("Delete", mock.Anything, "foo4", mock.Anything, mock.Anything).Return(exampleObj, false, nil)
				},
				expectedOutcome: true,
			},
			{
				name: "storage has an extra 1 entry (foo4) - fail to delete",
				setupLegacyFn: func(m *MockStorage) {
					m.On("List", mock.Anything, mock.Anything).Return(legacyListWith3items, nil)
				},
				setupStorageFn: func(m *MockStorage) {
					m.On("List", mock.Anything, mock.Anything).Return(storageListWith4items, nil)
					m.On("Delete", mock.Anything, "foo4", mock.Anything, mock.Anything).Return(exampleObj, false, errors.New("error"))
				},
				expectedOutcome: false,
			},
			{
				name: "storage is missing 1 entry (foo3) and has an extra 1 entry (foo4)",
				setupLegacyFn: func(m *MockStorage) {
					m.On("List", mock.Anything, mock.Anything).Return(legacyListWith3items, nil)
				},
				setupStorageFn: func(m *MockStorage) {
					m.On("List", mock.Anything, mock.Anything).Return(storageListWith3itemsMissingFoo2, nil)
					m.On("Update", mock.Anything, "foo2", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(exampleObj, false, nil)
					m.On("Delete", mock.Anything, "foo4", mock.Anything, mock.Anything).Return(exampleObj, false, nil)
				},
				expectedOutcome: true,
			},
		}
	// Run every case under both syncable modes; the subtest names are kept
	// identical to the previous hand-duplicated loops ("Mode-1-...", "Mode-2-...").
	modes := []struct {
		prefix string
		mode   DualWriterMode
	}{
		{"Mode-1-", Mode1},
		{"Mode-2-", Mode2},
	}
	for _, m := range modes {
		for _, tt := range tests {
			t.Run(m.prefix+tt.name, func(t *testing.T) {
				ls := NewMockStorage(t)
				us := NewMockStorage(t)
				if tt.setupLegacyFn != nil {
					tt.setupLegacyFn(ls)
				}
				if tt.setupStorageFn != nil {
					tt.setupStorageFn(us)
				}
				outcome, err := legacyToUnifiedStorageDataSyncer(context.Background(), &SyncerConfig{
					Mode:                   m.mode,
					LegacyStorage:          ls,
					Storage:                us,
					Kind:                   "test.kind",
					ServerLockService:      &fakeServerLock{},
					RequestInfo:            &request.RequestInfo{},
					DataSyncerRecordsLimit: 1000,
					DataSyncerInterval:     time.Hour,
				}, NewDualWriterMetrics(nil))
				if tt.wantErr {
					assert.Error(t, err)
					return
				}
				assert.NoError(t, err)
				assert.Equal(t, tt.expectedOutcome, outcome)
			})
		}
	}
}

View File

@@ -1,124 +1,18 @@
package rest
import (
"context"
"fmt"
"testing"
"time"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apiserver/pkg/apis/example"
"k8s.io/apiserver/pkg/endpoints/request"
)
// Shared fixtures for the dual-writer tests in this file. exampleObj/exampleList
// and anotherObj/anotherList are two distinct objects (different Name and
// ResourceVersion) used as the legacy and unified storage contents respectively.
var now = time.Now()
var exampleObj = &example.Pod{TypeMeta: metav1.TypeMeta{Kind: "foo"}, ObjectMeta: metav1.ObjectMeta{Name: "foo", ResourceVersion: "1", CreationTimestamp: metav1.Time{}, GenerateName: "foo"}, Spec: example.PodSpec{}, Status: example.PodStatus{StartTime: &metav1.Time{Time: now}}}
var anotherObj = &example.Pod{TypeMeta: metav1.TypeMeta{Kind: "foo"}, ObjectMeta: metav1.ObjectMeta{Name: "bar", ResourceVersion: "2", GenerateName: "foo"}, Spec: example.PodSpec{}, Status: example.PodStatus{StartTime: &metav1.Time{Time: now}}}
var exampleList = &example.PodList{TypeMeta: metav1.TypeMeta{Kind: "foo"}, ListMeta: metav1.ListMeta{}, Items: []example.Pod{*exampleObj}}
var anotherList = &example.PodList{Items: []example.Pod{*anotherObj}}
// TestSetDualWritingMode checks mode reconciliation: for each combination of
// persisted and desired mode it asserts both the returned mode and the value
// left in the KV store.
// (Fixed: cases now run as named t.Run subtests so a failure identifies the
// scenario; one case name wrongly said "migration is disabled" when it tests
// SkipDataSync.)
func TestSetDualWritingMode(t *testing.T) {
	type testCase struct {
		name            string
		kvStore         *fakeNamespacedKV
		desiredMode     DualWriterMode
		expectedMode    DualWriterMode
		expectedKVMode  string
		skipDataSync    bool
		serverLockError error
	}
	tests :=
		[]testCase{
			{
				name:           "should return a mode 2 dual writer when mode 2 is set as the desired mode",
				kvStore:        &fakeNamespacedKV{data: map[string]string{"playlist.grafana.app/playlists": "2"}, namespace: "storage.dualwriting"},
				desiredMode:    Mode2,
				expectedMode:   Mode2,
				expectedKVMode: "2",
			},
			{
				name:           "should return a mode 1 dual writer when mode 1 is set as the desired mode",
				kvStore:        &fakeNamespacedKV{data: map[string]string{"playlist.grafana.app/playlists": "2"}, namespace: "storage.dualwriting"},
				desiredMode:    Mode1,
				expectedMode:   Mode1,
				expectedKVMode: "1",
			},
			{
				name:           "should return mode 3 as desired mode when current mode is > 3",
				kvStore:        &fakeNamespacedKV{data: map[string]string{"playlist.grafana.app/playlists": "5"}, namespace: "storage.dualwriting"},
				desiredMode:    Mode3,
				expectedMode:   Mode3,
				expectedKVMode: "3",
			},
			{
				name:           "should return mode 3 as desired mode when current mode is 2",
				kvStore:        &fakeNamespacedKV{data: map[string]string{"playlist.grafana.app/playlists": "2"}, namespace: "storage.dualwriting"},
				desiredMode:    Mode3,
				expectedMode:   Mode3,
				expectedKVMode: "3",
			},
			{
				name:           "should default to mode 0 if there is no desired mode",
				kvStore:        &fakeNamespacedKV{data: map[string]string{}, namespace: "storage.dualwriting"},
				desiredMode:    Mode0,
				expectedMode:   Mode0,
				expectedKVMode: "",
			},
			{
				name:            "should keep mode2 when trying to go from mode2 to mode3 and the server lock service returns an error",
				kvStore:         &fakeNamespacedKV{data: map[string]string{"playlist.grafana.app/playlists": "2"}, namespace: "storage.dualwriting"},
				desiredMode:     Mode3,
				expectedMode:    Mode2,
				expectedKVMode:  "2",
				serverLockError: fmt.Errorf("lock already exists"),
			},
			{
				name:           "should move to mode3 when trying to go from mode2 to mode3 and the data sync is skipped",
				kvStore:        &fakeNamespacedKV{data: map[string]string{"playlist.grafana.app/playlists": "2"}, namespace: "storage.dualwriting"},
				desiredMode:    Mode3,
				expectedMode:   Mode3,
				expectedKVMode: "3",
				skipDataSync:   true,
			},
		}
	for _, tt := range tests {
		// Each case is a subtest so assertion failures name the scenario.
		t.Run(tt.name, func(t *testing.T) {
			us := NewMockStorage(t)
			us.On("List", mock.Anything, mock.Anything).Return(anotherList, nil).Maybe()
			us.On("Update", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(exampleObj, false, nil).Maybe()
			us.On("Delete", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(exampleObj, false, nil).Maybe()
			ls := NewMockStorage(t)
			ls.On("List", mock.Anything, mock.Anything).Return(exampleList, nil).Maybe()
			serverLockSvc := &fakeServerLock{
				err: tt.serverLockError,
			}
			dwMode, err := SetDualWritingMode(context.Background(), tt.kvStore, &SyncerConfig{
				LegacyStorage:          ls,
				Storage:                us,
				Kind:                   "playlist.grafana.app/playlists",
				Mode:                   tt.desiredMode,
				SkipDataSync:           tt.skipDataSync,
				ServerLockService:      serverLockSvc,
				RequestInfo:            &request.RequestInfo{},
				DataSyncerRecordsLimit: 1000,
				DataSyncerInterval:     time.Hour,
			}, NewDualWriterMetrics(nil))
			require.NoError(t, err)
			require.Equal(t, tt.expectedMode, dwMode)
			kvMode, _, err := tt.kvStore.Get(context.Background(), "playlist.grafana.app/playlists")
			require.NoError(t, err)
			require.Equal(t, tt.expectedKVMode, kvMode, "expected mode for playlist.grafana.app/playlists")
		})
	}
}
func TestCompare(t *testing.T) {
var exampleObjGen1 = &example.Pod{ObjectMeta: metav1.ObjectMeta{Generation: 1}, Spec: example.PodSpec{Hostname: "one"}, Status: example.PodStatus{StartTime: &metav1.Time{Time: time.Unix(0, 0)}}}
@@ -163,30 +57,3 @@ func TestCompare(t *testing.T) {
})
}
}
// fakeNamespacedKV is an in-memory NamespacedKVStore used by the tests above.
type fakeNamespacedKV struct {
	namespace string
	data      map[string]string
}

// Get returns the stored value and whether the key exists; it never errors.
func (f *fakeNamespacedKV) Get(ctx context.Context, key string) (string, bool, error) {
	val, ok := f.data[key]
	return val, ok, nil
}

// Set stores value under key; it never errors.
func (f *fakeNamespacedKV) Set(ctx context.Context, key, value string) error {
	f.data[key] = value
	return nil
}
// fakeServerLock implements ServerLockService for tests: it either fails with
// the configured error or runs fn immediately without any real locking.
type fakeServerLock struct {
	err error
}

func (f *fakeServerLock) LockExecuteAndRelease(ctx context.Context, actionName string, duration time.Duration, fn func(ctx context.Context)) error {
	if f.err != nil {
		return f.err
	}
	fn(ctx)
	return nil
}

View File

@@ -1,48 +0,0 @@
package rest
import (
"fmt"
"strconv"
"time"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)
// DualWriterMetrics bundles the Prometheus instruments emitted by the dual
// writer data syncer.
type DualWriterMetrics struct {
	// syncer is a histogram of data-syncer run duration, labelled by
	// is_error / mode / resource.
	syncer *prometheus.HistogramVec
	// syncerOutcome observes 0 or 1 per run (see recordDataSyncerOutcome),
	// labelled by mode / resource.
	syncerOutcome *prometheus.HistogramVec
}
// NewDualWriterMetrics creates the data-syncer histograms registered with reg.
// The tests pass a nil registerer, relying on promauto.With(nil) to skip
// registration.
func NewDualWriterMetrics(reg prometheus.Registerer) *DualWriterMetrics {
	return &DualWriterMetrics{
		syncer: promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{
			Name:                        "dual_writer_data_syncer_duration_seconds",
			Help:                        "Histogram for the runtime of dual writer data syncer duration per mode",
			Namespace:                   "grafana",
			NativeHistogramBucketFactor: 1.1,
		}, []string{"is_error", "mode", "resource"}),
		syncerOutcome: promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{
			Name:                        "dual_writer_data_syncer_outcome",
			Help:                        "Histogram for the runtime of dual writer data syncer outcome comparison between the 2 stores per mode",
			Namespace:                   "grafana",
			NativeHistogramBucketFactor: 1.1,
		}, []string{"mode", "resource"}),
	}
}
// recordDataSyncerDuration observes how long a data-sync run took (measured
// from startFrom), labelled by error outcome, dual-writer mode, and resource.
func (m *DualWriterMetrics) recordDataSyncerDuration(isError bool, mode DualWriterMode, resource string, startFrom time.Time) {
	elapsed := time.Since(startFrom).Seconds()
	labels := []string{strconv.FormatBool(isError), fmt.Sprintf("%d", mode), resource}
	m.syncer.WithLabelValues(labels...).Observe(elapsed)
}
// recordDataSyncerOutcome observes 0 when the run left both stores fully in
// sync and 1 otherwise, labelled by dual-writer mode and resource.
func (m *DualWriterMetrics) recordDataSyncerOutcome(mode DualWriterMode, resource string, synced bool) {
	outcome := float64(0)
	if !synced {
		outcome = 1
	}
	m.syncerOutcome.WithLabelValues(fmt.Sprintf("%d", mode), resource).Observe(outcome)
}

View File

@@ -835,7 +835,7 @@ func Initialize(ctx context.Context, cfg *setting.Cfg, opts Options, apiOpts api
builderMetrics := builder.ProvideBuilderMetrics(registerer)
backend := auditing.ProvideNoopBackend()
policyRuleProvider := auditing.ProvideNoopPolicyRuleProvider()
apiserverService, err := apiserver.ProvideService(cfg, featureToggles, routeRegisterImpl, tracingService, serverLockService, sqlStore, kvStore, middlewareHandler, scopedPluginDatasourceProvider, plugincontextProvider, pluginstoreService, dualwriteService, resourceClient, inlineSecureValueSupport, eventualRestConfigProvider, v, eventualRestConfigProvider, registerer, aggregatorRunner, v2, builderMetrics, backend, policyRuleProvider)
apiserverService, err := apiserver.ProvideService(cfg, featureToggles, routeRegisterImpl, tracingService, sqlStore, middlewareHandler, scopedPluginDatasourceProvider, plugincontextProvider, pluginstoreService, dualwriteService, resourceClient, inlineSecureValueSupport, eventualRestConfigProvider, v, eventualRestConfigProvider, registerer, aggregatorRunner, v2, builderMetrics, backend, policyRuleProvider)
if err != nil {
return nil, err
}
@@ -1503,7 +1503,7 @@ func InitializeForTest(ctx context.Context, t sqlutil.ITestDB, testingT interfac
builderMetrics := builder.ProvideBuilderMetrics(registerer)
backend := auditing.ProvideNoopBackend()
policyRuleProvider := auditing.ProvideNoopPolicyRuleProvider()
apiserverService, err := apiserver.ProvideService(cfg, featureToggles, routeRegisterImpl, tracingService, serverLockService, sqlStore, kvStore, middlewareHandler, scopedPluginDatasourceProvider, plugincontextProvider, pluginstoreService, dualwriteService, resourceClient, inlineSecureValueSupport, eventualRestConfigProvider, v, eventualRestConfigProvider, registerer, aggregatorRunner, v2, builderMetrics, backend, policyRuleProvider)
apiserverService, err := apiserver.ProvideService(cfg, featureToggles, routeRegisterImpl, tracingService, sqlStore, middlewareHandler, scopedPluginDatasourceProvider, plugincontextProvider, pluginstoreService, dualwriteService, resourceClient, inlineSecureValueSupport, eventualRestConfigProvider, v, eventualRestConfigProvider, registerer, aggregatorRunner, v2, builderMetrics, backend, policyRuleProvider)
if err != nil {
return nil, err
}

View File

@@ -67,7 +67,6 @@ kubernetesPlaylists = true
[unified_storage.playlists.playlist.grafana.app]
dualWriterMode = 2
dualWriterPeriodicDataSyncJobEnabled = true
```
This will create a development kubeconfig and start a parallel ssl listener. It can be registered by

View File

@@ -5,7 +5,6 @@ import (
"errors"
"fmt"
"maps"
"time"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
@@ -21,13 +20,10 @@ import (
"github.com/grafana/grafana-app-sdk/logging"
grafanarest "github.com/grafana/grafana/pkg/apiserver/rest"
"github.com/grafana/grafana/pkg/services/apiserver/builder"
"github.com/grafana/grafana/pkg/services/apiserver/endpoints/request"
grafanaapiserveroptions "github.com/grafana/grafana/pkg/services/apiserver/options"
"github.com/grafana/grafana/pkg/storage/legacysql/dualwrite"
)
type LegacyStorageGetterFunc func(schema.GroupVersionResource) grafanarest.Storage
type LegacyStorageProvider interface {
GetLegacyStorage(schema.GroupVersionResource) grafanarest.Storage
}
@@ -47,11 +43,6 @@ type AppInstallerConfig struct {
AllowedV0Alpha1Resources []string
}
// serverLock interface defines a lock mechanism for executing actions with a timeout
type serverLock interface {
LockExecuteAndRelease(ctx context.Context, actionName string, maxInterval time.Duration, fn func(ctx context.Context)) error
}
// AddToScheme adds app installer schemas to the runtime scheme
func AddToScheme(
appInstallers []appsdkapiserver.AppInstaller,
@@ -139,11 +130,7 @@ func InstallAPIs(
server *genericapiserver.GenericAPIServer,
restOpsGetter generic.RESTOptionsGetter,
storageOpts *grafanaapiserveroptions.StorageOptions,
kvStore grafanarest.NamespacedKVStore,
lock serverLock,
namespaceMapper request.NamespaceMapper,
dualWriteService dualwrite.Service,
dualWriterMetrics *grafanarest.DualWriterMetrics,
builderMetrics *builder.BuilderMetrics,
apiResourceConfig *serverstore.ResourceConfig,
) error {
@@ -156,11 +143,7 @@ func InstallAPIs(
installer: installer,
storageOpts: storageOpts,
restOptionsGetter: restOpsGetter,
kvStore: kvStore,
lock: lock,
namespaceMapper: namespaceMapper,
dualWriteService: dualWriteService,
dualWriterMetrics: dualWriterMetrics,
builderMetrics: builderMetrics,
apiResourceConfig: apiResourceConfig,
}

View File

@@ -18,7 +18,6 @@ import (
grafanaregistry "github.com/grafana/grafana/pkg/apiserver/registry/generic"
grafanarest "github.com/grafana/grafana/pkg/apiserver/rest"
"github.com/grafana/grafana/pkg/services/apiserver/builder"
"github.com/grafana/grafana/pkg/services/apiserver/endpoints/request"
grafanaapiserveroptions "github.com/grafana/grafana/pkg/services/apiserver/options"
"github.com/grafana/grafana/pkg/storage/legacysql/dualwrite"
)
@@ -31,11 +30,7 @@ type serverWrapper struct {
installer appsdkapiserver.AppInstaller
restOptionsGetter generic.RESTOptionsGetter
storageOpts *grafanaapiserveroptions.StorageOptions
kvStore grafanarest.NamespacedKVStore
lock serverLock
namespaceMapper request.NamespaceMapper
dualWriteService dualwrite.Service
dualWriterMetrics *grafanarest.DualWriterMetrics
builderMetrics *builder.BuilderMetrics
apiResourceConfig *serverstorage.ResourceConfig
}
@@ -64,16 +59,11 @@ func (s *serverWrapper) InstallAPIGroup(apiGroupInfo *genericapiserver.APIGroupI
if unifiedStorage, ok := storage.(grafanarest.Storage); ok {
log.Debug("Configuring dual writer for storage", "resource", gr.String(), "version", v, "storagePath", storagePath)
storage, err = NewDualWriter(
s.ctx,
gr,
s.storageOpts,
legacyProvider.GetLegacyStorage(gr.WithVersion(v)),
unifiedStorage,
s.kvStore,
s.lock,
s.namespaceMapper,
s.dualWriteService,
s.dualWriterMetrics,
s.builderMetrics,
)
if err != nil {

View File

@@ -1,32 +1,21 @@
package appinstaller
import (
"context"
"time"
"k8s.io/apimachinery/pkg/runtime/schema"
k8srequest "k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/klog/v2"
grafanarest "github.com/grafana/grafana/pkg/apiserver/rest"
"github.com/grafana/grafana/pkg/services/apiserver/builder"
"github.com/grafana/grafana/pkg/services/apiserver/endpoints/request"
"github.com/grafana/grafana/pkg/services/apiserver/options"
"github.com/grafana/grafana/pkg/storage/legacysql/dualwrite"
)
// NewDualWriter creates a dual writer for the given group resource using the provided configuration
func NewDualWriter(
_ context.Context,
gr schema.GroupResource,
storageOpts *options.StorageOptions,
legacy grafanarest.Storage,
storage grafanarest.Storage,
kvStore grafanarest.NamespacedKVStore,
lock serverLock,
namespaceMapper request.NamespaceMapper,
dualWriteService dualwrite.Service,
dualWriterMetrics *grafanarest.DualWriterMetrics,
builderMetrics *builder.BuilderMetrics,
) (grafanarest.Storage, error) {
// Dashboards + Folders may be managed (depends on feature toggles and database state)
@@ -40,54 +29,14 @@ func NewDualWriter(
// when missing this will default to mode zero (legacy only)
var mode = grafanarest.DualWriterMode(0)
var (
dualWriterPeriodicDataSyncJobEnabled bool
dualWriterMigrationDataSyncDisabled bool
dataSyncerInterval = time.Hour
dataSyncerRecordsLimit = 1000
)
resourceConfig, resourceExists := storageOpts.UnifiedStorageConfig[key]
if resourceExists {
mode = resourceConfig.DualWriterMode
dualWriterPeriodicDataSyncJobEnabled = resourceConfig.DualWriterPeriodicDataSyncJobEnabled
dualWriterMigrationDataSyncDisabled = resourceConfig.DualWriterMigrationDataSyncDisabled
dataSyncerInterval = resourceConfig.DataSyncerInterval
dataSyncerRecordsLimit = resourceConfig.DataSyncerRecordsLimit
}
// Force using storage only -- regardless of internal synchronization state
if mode == grafanarest.Mode5 {
builderMetrics.RecordDualWriterModes(gr.Resource, gr.Group, mode, grafanarest.Mode5)
return storage, nil
}
builderMetrics.RecordDualWriterModes(gr.Resource, gr.Group, mode)
// Moving from one version to the next can only happen after the previous step has
// successfully synchronized.
requestInfo := getRequestInfo(gr, namespaceMapper)
syncerCfg := &grafanarest.SyncerConfig{
Kind: key,
RequestInfo: requestInfo,
Mode: mode,
SkipDataSync: dualWriterMigrationDataSyncDisabled,
LegacyStorage: legacy,
Storage: storage,
ServerLockService: lock,
DataSyncerInterval: dataSyncerInterval,
DataSyncerRecordsLimit: dataSyncerRecordsLimit,
}
ctx := context.Background()
// This also sets the currentMode on the syncer config.
currentMode, err := grafanarest.SetDualWritingMode(ctx, kvStore, syncerCfg, dualWriterMetrics)
if err != nil {
return nil, err
}
builderMetrics.RecordDualWriterModes(gr.Resource, gr.Group, mode, currentMode)
switch currentMode {
switch mode {
case grafanarest.Mode0:
return legacy, nil
case grafanarest.Mode4, grafanarest.Mode5:
@@ -95,26 +44,5 @@ func NewDualWriter(
default:
}
if dualWriterPeriodicDataSyncJobEnabled {
// The mode might have changed in SetDualWritingMode, so apply current mode first.
syncerCfg.Mode = currentMode
if err := grafanarest.StartPeriodicDataSyncer(ctx, syncerCfg, dualWriterMetrics); err != nil {
return nil, err
}
}
// when unable to use
if currentMode != mode {
klog.Warningf("Requested DualWrite mode: %d, but using %d for %+v", mode, currentMode, gr)
}
return dualwrite.NewStaticStorage(gr, currentMode, legacy, storage)
}
func getRequestInfo(gr schema.GroupResource, namespaceMapper request.NamespaceMapper) *k8srequest.RequestInfo {
return &k8srequest.RequestInfo{
APIGroup: gr.Group,
Resource: gr.Resource,
Name: "",
Namespace: namespaceMapper(int64(1)),
}
return dualwrite.NewStaticStorage(gr, mode, legacy, storage)
}

View File

@@ -1,7 +1,6 @@
package builder
import (
"context"
"encoding/csv"
"encoding/json"
"errors"
@@ -10,7 +9,6 @@ import (
"os"
"regexp"
"strings"
"time"
"github.com/prometheus/client_golang/prometheus"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -19,7 +17,6 @@ import (
"k8s.io/apimachinery/pkg/runtime/serializer"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
openapinamer "k8s.io/apiserver/pkg/endpoints/openapi"
k8srequest "k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/apiserver/pkg/registry/generic"
genericapiserver "k8s.io/apiserver/pkg/server"
serverstorage "k8s.io/apiserver/pkg/server/storage"
@@ -32,7 +29,6 @@ import (
"github.com/grafana/grafana/pkg/apiserver/auditing"
"github.com/grafana/grafana/pkg/apiserver/endpoints/filters"
grafanarest "github.com/grafana/grafana/pkg/apiserver/rest"
"github.com/grafana/grafana/pkg/services/apiserver/endpoints/request"
"github.com/grafana/grafana/pkg/services/apiserver/options"
"github.com/grafana/grafana/pkg/services/featuremgmt"
"github.com/grafana/grafana/pkg/storage/legacysql/dualwrite"
@@ -253,19 +249,6 @@ func SetupConfig(
return nil
}
type ServerLockService interface {
LockExecuteAndRelease(ctx context.Context, actionName string, maxInterval time.Duration, fn func(ctx context.Context)) error
}
func getRequestInfo(gr schema.GroupResource, namespaceMapper request.NamespaceMapper) *k8srequest.RequestInfo {
return &k8srequest.RequestInfo{
APIGroup: gr.Group,
Resource: gr.Resource,
Name: "",
Namespace: namespaceMapper(int64(1)),
}
}
func InstallAPIs(
scheme *runtime.Scheme,
codecs serializer.CodecFactory,
@@ -274,13 +257,9 @@ func InstallAPIs(
builders []APIGroupBuilder,
storageOpts *options.StorageOptions,
reg prometheus.Registerer,
namespaceMapper request.NamespaceMapper,
kvStore grafanarest.NamespacedKVStore,
serverLock ServerLockService,
dualWriteService dualwrite.Service,
optsregister apistore.StorageOptionsRegister,
features featuremgmt.FeatureToggles,
dualWriterMetrics *grafanarest.DualWriterMetrics,
builderMetrics *BuilderMetrics,
apiResourceConfig *serverstorage.ResourceConfig,
) error {
@@ -303,79 +282,20 @@ func InstallAPIs(
// when missing this will default to mode zero (legacy only)
var mode = grafanarest.DualWriterMode(0)
var (
err error
dualWriterPeriodicDataSyncJobEnabled bool
dualWriterMigrationDataSyncDisabled bool
dataSyncerInterval = time.Hour
dataSyncerRecordsLimit = 1000
)
resourceConfig, resourceExists := storageOpts.UnifiedStorageConfig[key]
if resourceExists {
mode = resourceConfig.DualWriterMode
dualWriterPeriodicDataSyncJobEnabled = resourceConfig.DualWriterPeriodicDataSyncJobEnabled
dualWriterMigrationDataSyncDisabled = resourceConfig.DualWriterMigrationDataSyncDisabled
dataSyncerInterval = resourceConfig.DataSyncerInterval
dataSyncerRecordsLimit = resourceConfig.DataSyncerRecordsLimit
}
// Force using storage only -- regardless of internal synchronization state
if mode == grafanarest.Mode5 {
builderMetrics.RecordDualWriterModes(gr.Resource, gr.Group, mode, grafanarest.Mode5)
return storage, nil
}
builderMetrics.RecordDualWriterModes(gr.Resource, gr.Group, mode)
currentMode := mode
if !dualWriterMigrationDataSyncDisabled || dualWriterPeriodicDataSyncJobEnabled {
// TODO: inherited context from main Grafana process
ctx := context.Background()
// Moving from one version to the next can only happen after the previous step has
// successfully synchronized.
requestInfo := getRequestInfo(gr, namespaceMapper)
syncerCfg := &grafanarest.SyncerConfig{
Kind: key,
RequestInfo: requestInfo,
Mode: mode,
SkipDataSync: dualWriterMigrationDataSyncDisabled,
LegacyStorage: legacy,
Storage: storage,
ServerLockService: serverLock,
DataSyncerInterval: dataSyncerInterval,
DataSyncerRecordsLimit: dataSyncerRecordsLimit,
}
// This also sets the currentMode on the syncer config.
currentMode, err = grafanarest.SetDualWritingMode(ctx, kvStore, syncerCfg, dualWriterMetrics)
if err != nil {
return nil, err
}
// when unable to use
if currentMode != mode {
klog.Warningf("Requested DualWrite mode: %d, but using %d for %+v", mode, currentMode, gr)
}
if dualWriterPeriodicDataSyncJobEnabled && (currentMode >= grafanarest.Mode1 && currentMode <= grafanarest.Mode3) {
// The mode might have changed in SetDualWritingMode, so apply current mode first.
syncerCfg.Mode = currentMode
if err := grafanarest.StartPeriodicDataSyncer(ctx, syncerCfg, dualWriterMetrics); err != nil {
return nil, err
}
}
}
builderMetrics.RecordDualWriterModes(gr.Resource, gr.Group, mode, currentMode)
switch currentMode {
switch mode {
case grafanarest.Mode0:
return legacy, nil
case grafanarest.Mode4, grafanarest.Mode5:
return storage, nil
default:
return dualwrite.NewStaticStorage(gr, currentMode, legacy, storage)
return dualwrite.NewStaticStorage(gr, mode, legacy, storage)
}
}
}

View File

@@ -24,11 +24,7 @@ func ProvideBuilderMetrics(reg prometheus.Registerer) *BuilderMetrics {
}
}
func (m *BuilderMetrics) RecordDualWriterModes(resource, group string, targetMode, currentMode grafanarest.DualWriterMode) {
m.dualWriterTargetMode.WithLabelValues(resource, group).Set(float64(targetMode))
m.dualWriterCurrentMode.WithLabelValues(resource, group).Set(float64(currentMode))
}
func ProvideDualWriterMetrics(reg prometheus.Registerer) *grafanarest.DualWriterMetrics {
return grafanarest.NewDualWriterMetrics(reg)
func (m *BuilderMetrics) RecordDualWriterModes(resource, group string, mode grafanarest.DualWriterMode) {
m.dualWriterTargetMode.WithLabelValues(resource, group).Set(float64(mode))
m.dualWriterCurrentMode.WithLabelValues(resource, group).Set(float64(mode))
}

View File

@@ -135,8 +135,7 @@ func (v *unifiedStorageConfigValue) Set(val string) error {
}
(*v.config)[key] = setting.UnifiedStorageConfig{
DualWriterMode: apiserverrest.DualWriterMode(mode),
DualWriterMigrationDataSyncDisabled: true,
DualWriterMode: apiserverrest.DualWriterMode(mode),
}
}

View File

@@ -30,11 +30,8 @@ import (
"github.com/grafana/grafana/pkg/apimachinery/identity"
"github.com/grafana/grafana/pkg/apiserver/auditing"
grafanaresponsewriter "github.com/grafana/grafana/pkg/apiserver/endpoints/responsewriter"
grafanarest "github.com/grafana/grafana/pkg/apiserver/rest"
"github.com/grafana/grafana/pkg/infra/db"
"github.com/grafana/grafana/pkg/infra/kvstore"
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/infra/serverlock"
"github.com/grafana/grafana/pkg/infra/tracing"
"github.com/grafana/grafana/pkg/middleware"
"github.com/grafana/grafana/pkg/modules"
@@ -47,7 +44,6 @@ import (
"github.com/grafana/grafana/pkg/services/apiserver/auth/authenticator"
"github.com/grafana/grafana/pkg/services/apiserver/auth/authorizer"
"github.com/grafana/grafana/pkg/services/apiserver/builder"
"github.com/grafana/grafana/pkg/services/apiserver/endpoints/request"
grafanaapiserveroptions "github.com/grafana/grafana/pkg/services/apiserver/options"
"github.com/grafana/grafana/pkg/services/apiserver/utils"
contextmodel "github.com/grafana/grafana/pkg/services/contexthandler/model"
@@ -97,10 +93,8 @@ type service struct {
tracing *tracing.TracingService
metrics prometheus.Registerer
authorizer *authorizer.GrafanaAuthorizer
serverLockService builder.ServerLockService
dualWriter dualwrite.Service
kvStore kvstore.KVStore
authorizer *authorizer.GrafanaAuthorizer
dualWriter dualwrite.Service
pluginClient plugins.Client
datasources datasource.ScopedPluginDatasourceProvider
@@ -114,7 +108,6 @@ type service struct {
aggregatorRunner aggregatorrunner.AggregatorRunner
appInstallers []appsdkapiserver.AppInstaller
builderMetrics *builder.BuilderMetrics
dualWriterMetrics *grafanarest.DualWriterMetrics
auditBackend audit.Backend
auditPolicyRuleProvider auditing.PolicyRuleProvider
@@ -125,9 +118,7 @@ func ProvideService(
features featuremgmt.FeatureToggles,
rr routing.RouteRegister,
tracing *tracing.TracingService,
serverLockService *serverlock.ServerLockService,
db db.DB,
kvStore kvstore.KVStore,
pluginClient plugins.Client,
datasources datasource.ScopedPluginDatasourceProvider,
contextProvider datasource.PluginContextWrapper,
@@ -159,12 +150,10 @@ func ProvideService(
tracing: tracing,
db: db, // For Unified storage
metrics: reg,
kvStore: kvStore,
pluginClient: pluginClient,
datasources: datasources,
contextProvider: contextProvider,
pluginStore: pluginStore,
serverLockService: serverLockService,
dualWriter: dualWriter,
unified: unified,
secrets: secrets,
@@ -173,7 +162,6 @@ func ProvideService(
aggregatorRunner: aggregatorRunner,
appInstallers: appInstallers,
builderMetrics: builderMetrics,
dualWriterMetrics: grafanarest.NewDualWriterMetrics(reg),
auditBackend: auditBackend,
auditPolicyRuleProvider: auditPolicyRuleProvider,
}
@@ -412,13 +400,9 @@ func (s *service) start(ctx context.Context) error {
builders,
o.StorageOptions,
s.metrics,
request.GetNamespaceMapper(s.cfg),
kvstore.WithNamespace(s.kvStore, 0, "storage.dualwriting"),
s.serverLockService,
s.dualWriter,
optsregister,
s.features,
s.dualWriterMetrics,
s.builderMetrics,
apiResourceConfig,
)
@@ -432,11 +416,7 @@ func (s *service) start(ctx context.Context) error {
server,
serverConfig.RESTOptionsGetter,
o.StorageOptions,
kvstore.WithNamespace(s.kvStore, 0, "storage.dualwriting"),
s.serverLockService,
request.GetNamespaceMapper(s.cfg),
s.dualWriter,
s.dualWriterMetrics,
s.builderMetrics,
serverConfig.MergedResourceConfig,
); err != nil {

View File

@@ -7,7 +7,6 @@ import (
)
var WireSet = wire.NewSet(
builder.ProvideDualWriterMetrics,
builder.ProvideBuilderMetrics,
ProvideEventualRestConfigProvider,
wire.Bind(new(RestConfigProvider), new(*eventualRestConfigProvider)),

View File

@@ -629,13 +629,7 @@ type Cfg struct {
}
type UnifiedStorageConfig struct {
DualWriterMode rest.DualWriterMode
DualWriterPeriodicDataSyncJobEnabled bool
DualWriterMigrationDataSyncDisabled bool
// DataSyncerInterval defines how often the data syncer should run for a resource on the grafana instance.
DataSyncerInterval time.Duration
// DataSyncerRecordsLimit defines how many records will be processed at max during a sync invocation.
DataSyncerRecordsLimit int
DualWriterMode rest.DualWriterMode
// EnableMigration indicates whether migration is enabled for the resource.
// If not set, will use the default from MigratedUnifiedResources.
EnableMigration bool

View File

@@ -52,18 +52,6 @@ func (cfg *Cfg) setUnifiedStorageConfig() {
// parse dualWriter modes from the section
dualWriterMode := section.Key("dualWriterMode").MustInt(0)
// parse dualWriter periodic data syncer config
dualWriterPeriodicDataSyncJobEnabled := section.Key("dualWriterPeriodicDataSyncJobEnabled").MustBool(false)
// parse dualWriter migration data sync disabled from resource section
dualWriterMigrationDataSyncDisabled := section.Key("dualWriterMigrationDataSyncDisabled").MustBool(false)
// parse dataSyncerRecordsLimit from resource section
dataSyncerRecordsLimit := section.Key("dataSyncerRecordsLimit").MustInt(1000)
// parse dataSyncerInterval from resource section
dataSyncerInterval := section.Key("dataSyncerInterval").MustDuration(time.Hour)
// parse EnableMigration from resource section
enableMigration := MigratedUnifiedResources[resourceName]
if section.HasKey("enableMigration") {
@@ -78,13 +66,9 @@ func (cfg *Cfg) setUnifiedStorageConfig() {
}
storageConfig[resourceName] = UnifiedStorageConfig{
DualWriterMode: rest.DualWriterMode(dualWriterMode),
DualWriterPeriodicDataSyncJobEnabled: dualWriterPeriodicDataSyncJobEnabled,
DualWriterMigrationDataSyncDisabled: dualWriterMigrationDataSyncDisabled,
DataSyncerRecordsLimit: dataSyncerRecordsLimit,
DataSyncerInterval: dataSyncerInterval,
EnableMigration: enableMigration,
AutoMigrationThreshold: autoMigrationThreshold,
DualWriterMode: rest.DualWriterMode(dualWriterMode),
EnableMigration: enableMigration,
AutoMigrationThreshold: autoMigrationThreshold,
}
}
cfg.UnifiedStorage = storageConfig
@@ -163,10 +147,9 @@ func (cfg *Cfg) enforceMigrationToUnifiedConfigs() {
}
cfg.Logger.Info("Enforcing mode 5 for resource in unified storage", "resource", resource)
cfg.UnifiedStorage[resource] = UnifiedStorageConfig{
DualWriterMode: 5,
DualWriterMigrationDataSyncDisabled: true,
EnableMigration: true,
AutoMigrationThreshold: resourceCfg.AutoMigrationThreshold,
DualWriterMode: 5,
EnableMigration: true,
AutoMigrationThreshold: resourceCfg.AutoMigrationThreshold,
}
}
}
@@ -204,7 +187,6 @@ func (cfg *Cfg) EnableMode5(resource string) {
}
config := cfg.UnifiedStorage[resource]
config.DualWriterMode = rest.Mode5
config.DualWriterMigrationDataSyncDisabled = true
config.EnableMigration = true
cfg.UnifiedStorage[resource] = config
}

View File

@@ -2,7 +2,6 @@ package setting
import (
"testing"
"time"
"github.com/grafana/grafana/pkg/apiserver/rest"
"github.com/stretchr/testify/assert"
@@ -49,10 +48,9 @@ func TestCfg_setUnifiedStorageConfig(t *testing.T) {
}
assert.Equal(t, UnifiedStorageConfig{
DualWriterMode: 5,
DualWriterMigrationDataSyncDisabled: true,
EnableMigration: isEnabled,
AutoMigrationThreshold: expectedThreshold,
DualWriterMode: 5,
EnableMigration: isEnabled,
AutoMigrationThreshold: expectedThreshold,
}, resourceCfg, migratedResource)
}
}
@@ -60,9 +58,6 @@ func TestCfg_setUnifiedStorageConfig(t *testing.T) {
setMigratedResourceKey("dualWriterMode", "1") // migrated resources enabled by default will change to 5 in setUnifiedStorageConfig
setSectionKey("unified_storage.resource.not_migrated.grafana.app", "dualWriterMode", "2")
setSectionKey("unified_storage.resource.not_migrated.grafana.app", "dualWriterPeriodicDataSyncJobEnabled", "true")
setSectionKey("unified_storage.resource.not_migrated.grafana.app", "dataSyncerRecordsLimit", "1001")
setSectionKey("unified_storage.resource.not_migrated.grafana.app", "dataSyncerInterval", "10m")
// Add unified_storage section for index settings
setSectionKey("unified_storage", "index_min_count", "5")
@@ -73,11 +68,8 @@ func TestCfg_setUnifiedStorageConfig(t *testing.T) {
assert.Equal(t, exists, true)
assert.Equal(t, value, UnifiedStorageConfig{
DualWriterMode: 2,
DualWriterPeriodicDataSyncJobEnabled: true,
DataSyncerRecordsLimit: 1001,
DataSyncerInterval: time.Minute * 10,
AutoMigrationThreshold: 0,
DualWriterMode: 2,
AutoMigrationThreshold: 0,
})
validateMigratedResources(false)

View File

@@ -212,7 +212,6 @@ func TestResourceMigration_AutoMigrateEnablesMode5(t *testing.T) {
if tt.wantMode5Enabled {
require.Equal(t, 5, int(config.DualWriterMode), "%s: %s", tt.description, resourceName)
require.True(t, config.EnableMigration, "%s: EnableMigration should be true for %s", tt.description, resourceName)
require.True(t, config.DualWriterMigrationDataSyncDisabled, "%s: DualWriterMigrationDataSyncDisabled should be true for %s", tt.description, resourceName)
} else {
require.Equal(t, 0, int(config.DualWriterMode), "%s: mode should be 0 for %s", tt.description, resourceName)
}