From 4164239f561e70669549ef376bea20b39c638f6e Mon Sep 17 00:00:00 2001 From: Will Assis <35489495+gassiss@users.noreply.github.com> Date: Fri, 19 Dec 2025 14:27:06 -0500 Subject: [PATCH] unified-storage: implement sqlkv Save method (#115458) * unified-storage: sqlkv save method --- .../data/sqlkv_delete_legacy_resource.sql | 5 + .../resource/data/sqlkv_insert_datastore.sql | 21 ++ .../data/sqlkv_insert_legacy_resource.sql | 31 +++ .../sqlkv_insert_legacy_resource_history.sql | 44 ++++ .../resource/data/sqlkv_save_event.sql | 15 ++ .../resource/data/sqlkv_update_datastore.sql | 3 + .../data/sqlkv_update_legacy_resource.sql | 9 + pkg/storage/unified/resource/datastore.go | 44 +++- pkg/storage/unified/resource/eventstore.go | 1 + pkg/storage/unified/resource/sqlkv.go | 249 ++++++++++++++++-- .../unified/resource/storage_backend.go | 59 ++++- .../unified/sql/rvmanager/rv_manager.go | 27 +- pkg/storage/unified/sql/server.go | 27 +- pkg/storage/unified/testing/kv.go | 45 ++-- pkg/storage/unified/testing/kv_test.go | 1 - 15 files changed, 534 insertions(+), 47 deletions(-) create mode 100644 pkg/storage/unified/resource/data/sqlkv_delete_legacy_resource.sql create mode 100644 pkg/storage/unified/resource/data/sqlkv_insert_datastore.sql create mode 100644 pkg/storage/unified/resource/data/sqlkv_insert_legacy_resource.sql create mode 100644 pkg/storage/unified/resource/data/sqlkv_insert_legacy_resource_history.sql create mode 100644 pkg/storage/unified/resource/data/sqlkv_save_event.sql create mode 100644 pkg/storage/unified/resource/data/sqlkv_update_datastore.sql create mode 100644 pkg/storage/unified/resource/data/sqlkv_update_legacy_resource.sql diff --git a/pkg/storage/unified/resource/data/sqlkv_delete_legacy_resource.sql b/pkg/storage/unified/resource/data/sqlkv_delete_legacy_resource.sql new file mode 100644 index 00000000000..e27ee578a41 --- /dev/null +++ b/pkg/storage/unified/resource/data/sqlkv_delete_legacy_resource.sql @@ -0,0 +1,5 @@ +DELETE FROM {{ .Ident "resource" }} +WHERE {{ .Ident "group" }} = {{ .Arg .Group }} +AND {{ .Ident "resource" }} = {{ .Arg .Resource }} +AND {{ .Ident "namespace" }} = {{ .Arg .Namespace }} +AND {{ .Ident "name" }} = {{ .Arg .Name }}; diff --git a/pkg/storage/unified/resource/data/sqlkv_insert_datastore.sql b/pkg/storage/unified/resource/data/sqlkv_insert_datastore.sql new file mode 100644 index 00000000000..8372eb73463 --- /dev/null +++ b/pkg/storage/unified/resource/data/sqlkv_insert_datastore.sql @@ -0,0 +1,21 @@ +INSERT INTO {{ .Ident .TableName }} +( + {{ .Ident "guid" }}, + {{ .Ident "key_path" }}, + {{ .Ident "value" }}, + {{ .Ident "group" }}, + {{ .Ident "resource" }}, + {{ .Ident "namespace" }}, + {{ .Ident "name" }}, + {{ .Ident "action" }} +) +VALUES ( + {{ .Arg .GUID }}, + {{ .Arg .KeyPath }}, + COALESCE({{ .Arg .Value }}, ""), + {{ .Arg .Group }}, + {{ .Arg .Resource }}, + {{ .Arg .Namespace }}, + {{ .Arg .Name }}, + {{ .Arg .Action }} +); diff --git a/pkg/storage/unified/resource/data/sqlkv_insert_legacy_resource.sql b/pkg/storage/unified/resource/data/sqlkv_insert_legacy_resource.sql new file mode 100644 index 00000000000..1f58bd28b43 --- /dev/null +++ b/pkg/storage/unified/resource/data/sqlkv_insert_legacy_resource.sql @@ -0,0 +1,31 @@ +INSERT INTO {{ .Ident "resource" }} +( + {{ .Ident "value" }}, + {{ .Ident "guid" }}, + {{ .Ident "group" }}, + {{ .Ident "resource" }}, + {{ .Ident "namespace" }}, + {{ .Ident "name" }}, + {{ .Ident "action" }}, + {{ .Ident "folder" }}, + {{ .Ident "previous_resource_version" }} +) +VALUES ( + COALESCE({{ .Arg .Value }}, ""), + {{ .Arg .GUID }}, + {{ .Arg .Group }}, + {{ .Arg .Resource }}, + {{ .Arg .Namespace }}, + {{ .Arg .Name }}, + {{ .Arg .Action }}, + {{ .Arg .Folder }}, + CASE WHEN {{ .Arg .Action }} = 1 THEN 0 ELSE ( + SELECT {{ .Ident "resource_version" }} + FROM {{ .Ident "resource" }} + WHERE {{ .Ident "group" }} = {{ .Arg .Group }} + AND {{ .Ident "resource" }} = {{ .Arg .Resource }} + AND {{ .Ident "namespace" }} = {{ .Arg .Namespace }} + AND {{ .Ident "name" }} = {{ .Arg .Name }} + ORDER BY {{ .Ident "resource_version" }} DESC LIMIT 1 + ) END +); diff --git a/pkg/storage/unified/resource/data/sqlkv_insert_legacy_resource_history.sql b/pkg/storage/unified/resource/data/sqlkv_insert_legacy_resource_history.sql new file mode 100644 index 00000000000..d52aac5063d --- /dev/null +++ b/pkg/storage/unified/resource/data/sqlkv_insert_legacy_resource_history.sql @@ -0,0 +1,44 @@ +INSERT INTO {{ .Ident "resource_history" }} +( + {{ .Ident "value" }}, + {{ .Ident "guid" }}, + {{ .Ident "group" }}, + {{ .Ident "resource" }}, + {{ .Ident "namespace" }}, + {{ .Ident "name" }}, + {{ .Ident "action" }}, + {{ .Ident "folder" }}, + {{ .Ident "previous_resource_version" }}, + {{ .Ident "generation" }} +) +VALUES ( + COALESCE({{ .Arg .Value }}, ""), + {{ .Arg .GUID }}, + {{ .Arg .Group }}, + {{ .Arg .Resource }}, + {{ .Arg .Namespace }}, + {{ .Arg .Name }}, + {{ .Arg .Action }}, + {{ .Arg .Folder }}, + CASE WHEN {{ .Arg .Action }} = 1 THEN 0 ELSE ( + SELECT {{ .Ident "resource_version" }} + FROM {{ .Ident "resource_history" }} + WHERE {{ .Ident "group" }} = {{ .Arg .Group }} + AND {{ .Ident "resource" }} = {{ .Arg .Resource }} + AND {{ .Ident "namespace" }} = {{ .Arg .Namespace }} + AND {{ .Ident "name" }} = {{ .Arg .Name }} + ORDER BY {{ .Ident "resource_version" }} DESC LIMIT 1 + ) END, + CASE + WHEN {{ .Arg .Action }} = 1 THEN 1 + WHEN {{ .Arg .Action }} = 3 THEN 0 + ELSE 1 + ( + SELECT COUNT(1) + FROM {{ .Ident "resource_history" }} + WHERE {{ .Ident "group" }} = {{ .Arg .Group }} + AND {{ .Ident "resource" }} = {{ .Arg .Resource }} + AND {{ .Ident "namespace" }} = {{ .Arg .Namespace }} + AND {{ .Ident "name" }} = {{ .Arg .Name }} + ) + END +); diff --git a/pkg/storage/unified/resource/data/sqlkv_save_event.sql b/pkg/storage/unified/resource/data/sqlkv_save_event.sql new file mode 100644 index 00000000000..669497dbb19 --- /dev/null +++ b/pkg/storage/unified/resource/data/sqlkv_save_event.sql @@ -0,0 +1,15 @@ +INSERT INTO {{ .Ident .TableName }} +( + {{ .Ident "key_path" }}, + {{ .Ident "value" }} +) +VALUES ( + {{ .Arg .KeyPath }}, + COALESCE({{ .Arg .Value }}, "") +) +{{- if eq .DialectName "mysql" }} +ON DUPLICATE KEY UPDATE {{ .Ident "value" }} = {{ .Arg .Value }} +{{- else }} +ON CONFLICT ({{ .Ident "key_path" }}) DO UPDATE SET {{ .Ident "value" }} = {{ .Arg .Value }} +{{- end }} +; diff --git a/pkg/storage/unified/resource/data/sqlkv_update_datastore.sql b/pkg/storage/unified/resource/data/sqlkv_update_datastore.sql new file mode 100644 index 00000000000..677666e00a4 --- /dev/null +++ b/pkg/storage/unified/resource/data/sqlkv_update_datastore.sql @@ -0,0 +1,3 @@ +UPDATE {{ .Ident .TableName }} +SET {{ .Ident "value" }} = {{ .Arg .Value }} +WHERE {{ .Ident "key_path" }} = {{ .Arg .KeyPath }}; diff --git a/pkg/storage/unified/resource/data/sqlkv_update_legacy_resource.sql b/pkg/storage/unified/resource/data/sqlkv_update_legacy_resource.sql new file mode 100644 index 00000000000..1565d0894a4 --- /dev/null +++ b/pkg/storage/unified/resource/data/sqlkv_update_legacy_resource.sql @@ -0,0 +1,9 @@ +UPDATE {{ .Ident "resource" }} +SET + {{ .Ident "value" }} = {{ .Arg .Value }}, + {{ .Ident "action" }} = {{ .Arg .Action }}, + {{ .Ident "folder" }} = {{ .Arg .Folder }} +WHERE {{ .Ident "group" }} = {{ .Arg .Group }} +AND {{ .Ident "resource" }} = {{ .Arg .Resource }} +AND {{ .Ident "namespace" }} = {{ .Arg .Namespace }} +AND {{ .Ident "name" }} = {{ .Arg .Name }}; diff --git a/pkg/storage/unified/resource/datastore.go b/pkg/storage/unified/resource/datastore.go index 8a930492420..7a1b614323e 100644 --- a/pkg/storage/unified/resource/datastore.go +++ b/pkg/storage/unified/resource/datastore.go @@ -49,6 +49,9 @@ type DataKey struct { ResourceVersion int64 Action DataAction Folder string + + // needed to maintain backwards compatibility with unified/sql + GUID string } // GroupResource represents a unique group/resource combination @@ -61,6 +64,12 @@ func (k DataKey) String() string { return fmt.Sprintf("%s/%s/%s/%s/%d~%s~%s", k.Group, k.Resource, k.Namespace, k.Name, k.ResourceVersion, k.Action, k.Folder) } +// Temporary while we need to support unified/sql/backend compatibility +// Remove once we stop using RvManager in storage_backend.go +func (k DataKey) StringWithGUID() string { + return fmt.Sprintf("%s/%s/%s/%s/%d~%s~%s~%s", k.Group, k.Resource, k.Namespace, k.Name, k.ResourceVersion, k.Action, k.Folder, k.GUID) +} + func (k DataKey) Equals(other DataKey) bool { return k.Group == other.Group && k.Resource == other.Resource && k.Namespace == other.Namespace && k.Name == other.Name && k.ResourceVersion == other.ResourceVersion && k.Action == other.Action && k.Folder == other.Folder } @@ -516,7 +525,13 @@ func (d *dataStore) Save(ctx context.Context, key DataKey, value io.Reader) erro return fmt.Errorf("invalid data key: %w", err) } - writer, err := d.kv.Save(ctx, dataSection, key.String()) + var writer io.WriteCloser + var err error + if key.GUID != "" { + writer, err = d.kv.Save(ctx, dataSection, key.StringWithGUID()) + } else { + writer, err = d.kv.Save(ctx, dataSection, key.String()) + } if err != nil { return err } @@ -583,6 +598,33 @@ func ParseKey(key string) (DataKey, error) { }, nil } +// Temporary while we need to support unified/sql/backend compatibility +// Remove once we stop using RvManager in storage_backend.go +func ParseKeyWithGUID(key string) (DataKey, error) { + parts := strings.Split(key, "/") + if len(parts) != 5 { + return DataKey{}, fmt.Errorf("invalid key: %s", key) + } + rvActionFolderGUIDParts := strings.Split(parts[4], "~") + if len(rvActionFolderGUIDParts) != 4 { + return DataKey{}, fmt.Errorf("invalid key: %s", key) + } + rv, err := strconv.ParseInt(rvActionFolderGUIDParts[0], 10, 64) + if err != nil { + return DataKey{}, fmt.Errorf("invalid resource version '%s' in key %s: %w", rvActionFolderGUIDParts[0], key, err) + } + return DataKey{ + Group: parts[0], + Resource: parts[1], + Namespace: parts[2], + Name: parts[3], + ResourceVersion: rv, + Action: DataAction(rvActionFolderGUIDParts[1]), + Folder: rvActionFolderGUIDParts[2], + GUID: rvActionFolderGUIDParts[3], + }, nil +} + // SameResource checks if this key represents the same resource as another key. // It compares the identifying fields: Group, Resource, Namespace, and Name. // ResourceVersion, Action, and Folder are ignored as they don't identify the resource itself. diff --git a/pkg/storage/unified/resource/eventstore.go b/pkg/storage/unified/resource/eventstore.go index 7f80fb6b87a..e0c01afb550 100644 --- a/pkg/storage/unified/resource/eventstore.go +++ b/pkg/storage/unified/resource/eventstore.go @@ -32,6 +32,7 @@ type EventKey struct { ResourceVersion int64 Action DataAction Folder string + GUID string } func (k EventKey) String() string { diff --git a/pkg/storage/unified/resource/sqlkv.go b/pkg/storage/unified/resource/sqlkv.go index 3c07403296b..9cc2cc32dd0 100644 --- a/pkg/storage/unified/resource/sqlkv.go +++ b/pkg/storage/unified/resource/sqlkv.go @@ -12,8 +12,10 @@ import ( "strings" "text/template" + "github.com/google/uuid" "github.com/grafana/grafana/pkg/storage/unified/sql/db" "github.com/grafana/grafana/pkg/storage/unified/sql/dbutil" + "github.com/grafana/grafana/pkg/storage/unified/sql/rvmanager" "github.com/grafana/grafana/pkg/storage/unified/sql/sqltemplate" ) @@ -34,11 +36,18 @@ func mustTemplate(filename string) *template.Template { // Templates. var ( - sqlKVKeys = mustTemplate("sqlkv_keys.sql") - sqlKVGet = mustTemplate("sqlkv_get.sql") - sqlKVBatchGet = mustTemplate("sqlkv_batch_get.sql") - sqlKVDelete = mustTemplate("sqlkv_delete.sql") - sqlKVBatchDelete = mustTemplate("sqlkv_batch_delete.sql") + sqlKVKeys = mustTemplate("sqlkv_keys.sql") + sqlKVGet = mustTemplate("sqlkv_get.sql") + sqlKVBatchGet = mustTemplate("sqlkv_batch_get.sql") + sqlKVSaveEvent = mustTemplate("sqlkv_save_event.sql") + sqlKVInsertData = mustTemplate("sqlkv_insert_datastore.sql") + sqlKVUpdateData = mustTemplate("sqlkv_update_datastore.sql") + sqlKVInsertLegacyResourceHistory = mustTemplate("sqlkv_insert_legacy_resource_history.sql") + sqlKVInsertLegacyResource = mustTemplate("sqlkv_insert_legacy_resource.sql") + sqlKVUpdateLegacyResource = mustTemplate("sqlkv_update_legacy_resource.sql") + sqlKVDeleteLegacyResource = mustTemplate("sqlkv_delete_legacy_resource.sql") + sqlKVDelete = mustTemplate("sqlkv_delete.sql") + sqlKVBatchDelete = mustTemplate("sqlkv_batch_delete.sql") ) // sqlKVSection can be embedded in structs used when rendering query templates @@ -128,6 +137,45 @@ func (req sqlKVBatchRequest) KeyPaths() []string { return result } +type sqlKVSaveRequest struct { + sqltemplate.SQLTemplate + sqlKVSectionKey + Value []byte + + // old fields that can be removed once we prune resource_history + GUID string + Group string + Resource string + Namespace string + Name string + Action int64 + Folder string +} + +func (req sqlKVSaveRequest) Validate() error { + return req.sqlKVSectionKey.Validate() +} + +type sqlKVLegacySaveRequest struct { + sqltemplate.SQLTemplate + Value []byte + GUID string + Group string + Resource string + Namespace string + Name string + Action int64 + Folder string +} + +func (req sqlKVLegacySaveRequest) Validate() error { + return nil +} + +func (req sqlKVLegacySaveRequest) Results() ([]byte, error) { + return req.Value, nil +} + type sqlKVKeysRequest struct { sqltemplate.SQLTemplate sqlKVSection @@ -285,20 +333,187 @@ func (k *sqlKV) BatchGet(ctx context.Context, section string, keys []string) ite } } -// TODO: this function only exists to support the testing of the sqlkv implementation before -// we have a proper implementation of `Save`. -func (k *sqlKV) TestingSave(ctx context.Context, key string, value []byte) error { - stmt := fmt.Sprintf( - `INSERT INTO resource_events (key_path, value) VALUES (%s, %s)`, - k.dialect.ArgPlaceholder(1), k.dialect.ArgPlaceholder(2), - ) +func (k *sqlKV) Save(ctx context.Context, section string, key string) (io.WriteCloser, error) { + sectionKey := sqlKVSectionKey{sqlKVSection{section}, key} + if err := sectionKey.Validate(); err != nil { + return nil, err + } - _, err := k.db.ExecContext(ctx, stmt, eventsSection+"/"+key, value) - return err + return &sqlWriteCloser{ + kv: k, + ctx: ctx, + sectionKey: sectionKey, + buf: &bytes.Buffer{}, + closed: false, + }, nil } -func (k *sqlKV) Save(ctx context.Context, section string, key string) (io.WriteCloser, error) { - panic("not implemented!") +type sqlWriteCloser struct { + kv *sqlKV + ctx context.Context + sectionKey sqlKVSectionKey + buf *bytes.Buffer + closed bool +} + +func (w *sqlWriteCloser) Write(value []byte) (int, error) { + if w.closed { + return 0, errors.New("write to closed writer") + } + + return w.buf.Write(value) +} + +func (w *sqlWriteCloser) Close() error { + if w.closed { + return nil + } + + w.closed = true + + // do regular kv save: simple key_path + value insert with conflict check. + // can only do this on resource_events for now, until we drop the columns in resource_history + if w.sectionKey.Section == eventsSection { + _, err := dbutil.Exec(w.ctx, w.kv.db, sqlKVSaveEvent, sqlKVSaveRequest{ + SQLTemplate: sqltemplate.New(w.kv.dialect), + sqlKVSectionKey: w.sectionKey, + Value: w.buf.Bytes(), + }) + + if err != nil { + return fmt.Errorf("failed to save: %w", err) + } + + return nil + } + + // if storage_backend is running with an RvManager, it will inject a transaction into the context + // used to keep backwards compatibility between sql-based kvstore and unified/sql/backend + tx, ok := rvmanager.TxFromCtx(w.ctx) + if !ok { + // temporary save for dataStore without rvmanager + // we can use the same template as the event one after we: + // - move PK from GUID to key_path + // - remove all unnecessary columns (or at least their NOT NULL constraints) + _, err := w.kv.Get(w.ctx, w.sectionKey.Section, w.sectionKey.Key) + if errors.Is(err, ErrNotFound) { + _, err := dbutil.Exec(w.ctx, w.kv.db, sqlKVInsertData, sqlKVSaveRequest{ + SQLTemplate: sqltemplate.New(w.kv.dialect), + sqlKVSectionKey: w.sectionKey, + GUID: uuid.New().String(), + Value: w.buf.Bytes(), + }) + + if err != nil { + return fmt.Errorf("failed to insert to datastore: %w", err) + } + + return nil + } + + if err != nil { + return fmt.Errorf("failed to get for save: %w", err) + } + + _, err = dbutil.Exec(w.ctx, w.kv.db, sqlKVUpdateData, sqlKVSaveRequest{ + SQLTemplate: sqltemplate.New(w.kv.dialect), + sqlKVSectionKey: w.sectionKey, + Value: w.buf.Bytes(), + }) + + if err != nil { + return fmt.Errorf("failed to update to datastore: %w", err) + } + + return nil + } + + // special, temporary save that includes all the fields in resource_history that are not relevant for the kvstore, + // as well as the resource table. This is only called if an RvManager was passed to storage_backend, as that + // component will be responsible for populating the resource_version and key_path columns + // note that we are not touching resource_version table, neither the resource_version columns or the key_path column + // as the RvManager will be responsible for this + dataKey, err := ParseKeyWithGUID(w.sectionKey.Key) + if err != nil { + return fmt.Errorf("failed to parse key: %w", err) + } + + var action int64 + switch dataKey.Action { + case DataActionCreated: + action = 1 + case DataActionUpdated: + action = 2 + case DataActionDeleted: + action = 3 + default: + return fmt.Errorf("failed to parse key: %w", err) + } + + _, err = dbutil.Exec(w.ctx, tx, sqlKVInsertLegacyResourceHistory, sqlKVSaveRequest{ + SQLTemplate: sqltemplate.New(w.kv.dialect), + sqlKVSectionKey: w.sectionKey, + Value: w.buf.Bytes(), + GUID: dataKey.GUID, + Group: dataKey.Group, + Resource: dataKey.Resource, + Namespace: dataKey.Namespace, + Name: dataKey.Name, + Action: action, + Folder: dataKey.Folder, + }) + + if err != nil { + return fmt.Errorf("failed to save to resource_history: %w", err) + } + + switch dataKey.Action { + case DataActionCreated: + _, err = dbutil.Exec(w.ctx, tx, sqlKVInsertLegacyResource, sqlKVLegacySaveRequest{ + SQLTemplate: sqltemplate.New(w.kv.dialect), + Value: w.buf.Bytes(), + GUID: dataKey.GUID, + Group: dataKey.Group, + Resource: dataKey.Resource, + Namespace: dataKey.Namespace, + Name: dataKey.Name, + Action: action, + Folder: dataKey.Folder, + }) + + if err != nil { + return fmt.Errorf("failed to insert to resource: %w", err) + } + case DataActionUpdated: + _, err = dbutil.Exec(w.ctx, tx, sqlKVUpdateLegacyResource, sqlKVLegacySaveRequest{ + SQLTemplate: sqltemplate.New(w.kv.dialect), + Value: w.buf.Bytes(), + Group: dataKey.Group, + Resource: dataKey.Resource, + Namespace: dataKey.Namespace, + Name: dataKey.Name, + Action: action, + Folder: dataKey.Folder, + }) + + if err != nil { + return fmt.Errorf("failed to update resource: %w", err) + } + case DataActionDeleted: + _, err = dbutil.Exec(w.ctx, tx, sqlKVDeleteLegacyResource, sqlKVLegacySaveRequest{ + SQLTemplate: sqltemplate.New(w.kv.dialect), + Group: dataKey.Group, + Resource: dataKey.Resource, + Namespace: dataKey.Namespace, + Name: dataKey.Name, + }) + + if err != nil { + return fmt.Errorf("failed to delete from resource: %w", err) + } + } + + return nil } func (k *sqlKV) Delete(ctx context.Context, section string, key string) error { @@ -319,6 +534,8 @@ func (k *sqlKV) Delete(ctx context.Context, section string, key string) error { return ErrNotFound } + // TODO reflect change to resource table + return nil } diff --git a/pkg/storage/unified/resource/storage_backend.go b/pkg/storage/unified/resource/storage_backend.go index b0f51702775..13f2b9d6159 100644 --- a/pkg/storage/unified/resource/storage_backend.go +++ b/pkg/storage/unified/resource/storage_backend.go @@ -14,6 +14,7 @@ import ( "time" "github.com/bwmarrin/snowflake" + "github.com/google/uuid" "github.com/grafana/grafana-app-sdk/logging" "github.com/prometheus/client_golang/prometheus" "go.opentelemetry.io/otel/trace" @@ -22,6 +23,8 @@ import ( "github.com/grafana/grafana/pkg/apimachinery/utils" "github.com/grafana/grafana/pkg/storage/unified/resourcepb" + "github.com/grafana/grafana/pkg/storage/unified/sql/db" + "github.com/grafana/grafana/pkg/storage/unified/sql/rvmanager" "github.com/grafana/grafana/pkg/util/debouncer" ) @@ -68,6 +71,8 @@ type kvStorageBackend struct { withExperimentalClusterScope bool //tracer trace.Tracer //reg prometheus.Registerer + + rvManager *rvmanager.ResourceVersionManager } var _ KVBackend = &kvStorageBackend{} @@ -85,6 +90,10 @@ type KVBackendOptions struct { EventPruningInterval time.Duration // How often to run the event pruning (default: 5 minutes) Tracer trace.Tracer // TODO add tracing Reg prometheus.Registerer // TODO add metrics + + // Adding RvManager overrides the RV generated with snowflake in order to keep backwards compatibility with + // unified/sql + RvManager *rvmanager.ResourceVersionManager } func NewKVStorageBackend(opts KVBackendOptions) (KVBackend, error) { @@ -119,6 +128,7 @@ func NewKVStorageBackend(opts KVBackendOptions) (KVBackend, error) { eventRetentionPeriod: eventRetentionPeriod, eventPruningInterval: eventPruningInterval, withExperimentalClusterScope: opts.WithExperimentalClusterScope, + rvManager: opts.RvManager, } err = backend.initPruner(ctx) if err != nil { @@ -317,9 +327,28 @@ func (k *kvStorageBackend) WriteEvent(ctx context.Context, event WriteEvent) (in Action: action, Folder: obj.GetFolder(), } - err := k.dataStore.Save(ctx, dataKey, bytes.NewReader(event.Value)) - if err != nil { - return 0, fmt.Errorf("failed to write data: %w", err) + + if k.rvManager != nil { + dataKey.GUID = uuid.New().String() + var err error + rv, err = k.rvManager.ExecWithRV(ctx, event.Key, func(tx db.Tx) (string, error) { + err := k.dataStore.Save(rvmanager.ContextWithTx(ctx, tx), dataKey, bytes.NewReader(event.Value)) + if err != nil { + return "", fmt.Errorf("failed to write data: %w", err) + } + + return dataKey.GUID, nil + }) + if err != nil { + return 0, fmt.Errorf("failed to write data: %w", err) + } + + dataKey.ResourceVersion = rv + } else { + err := k.dataStore.Save(ctx, dataKey, bytes.NewReader(event.Value)) + if err != nil { + return 0, fmt.Errorf("failed to write data: %w", err) + } } // Optimistic concurrency control to verify our write is the latest version @@ -340,14 +369,22 @@ func (k *kvStorageBackend) WriteEvent(ctx context.Context, event WriteEvent) (in } // Check if the RV we just wrote is the latest. If not, a concurrent write with higher RV happened - if latestKey.ResourceVersion != rv { + if !rvmanager.IsRvEqual(latestKey.ResourceVersion, rv) { // Delete the data we just wrote since it's not the latest + // if we're running with rvManager, convert the ResourceVersion back to snowflake to delete + if k.rvManager != nil { + dataKey.ResourceVersion = rvmanager.SnowflakeFromRv(dataKey.ResourceVersion) + } _ = k.dataStore.Delete(ctx, dataKey) return 0, fmt.Errorf("optimistic locking failed: concurrent modification detected") } - if prevKey.ResourceVersion != event.PreviousRV { + if !rvmanager.IsRvEqual(prevKey.ResourceVersion, event.PreviousRV) { // Another concurrent write happened between our read and write + // if we're running with rvManager, convert the ResourceVersion back to snowflake to delete + if k.rvManager != nil { + dataKey.ResourceVersion = rvmanager.SnowflakeFromRv(dataKey.ResourceVersion) + } _ = k.dataStore.Delete(ctx, dataKey) return 0, fmt.Errorf("optimistic locking failed: resource was modified concurrently (expected previous RV %d, found %d)", event.PreviousRV, prevKey.ResourceVersion) } @@ -366,8 +403,12 @@ func (k *kvStorageBackend) WriteEvent(ctx context.Context, event WriteEvent) (in } // Check if the RV we just wrote is the latest. If not, a concurrent create with higher RV happened - if latestKey.ResourceVersion != rv { + if !rvmanager.IsRvEqual(latestKey.ResourceVersion, rv) { // Delete the data we just wrote since it's not the latest + // if we're running with rvManager, convert the ResourceVersion back to snowflake to delete + if k.rvManager != nil { + dataKey.ResourceVersion = rvmanager.SnowflakeFromRv(dataKey.ResourceVersion) + } _ = k.dataStore.Delete(ctx, dataKey) return 0, fmt.Errorf("optimistic locking failed: concurrent create detected") } @@ -375,6 +416,10 @@ func (k *kvStorageBackend) WriteEvent(ctx context.Context, event WriteEvent) (in // Verify that the immediate predecessor is not a create if prevKey.Action == DataActionCreated { // Another concurrent create happened - delete our write and return error + // if we're running with rvManager, convert the ResourceVersion back to snowflake to delete + if k.rvManager != nil { + dataKey.ResourceVersion = rvmanager.SnowflakeFromRv(dataKey.ResourceVersion) + } _ = k.dataStore.Delete(ctx, dataKey) return 0, fmt.Errorf("optimistic locking failed: concurrent create detected") } @@ -391,7 +436,7 @@ func (k *kvStorageBackend) WriteEvent(ctx context.Context, event WriteEvent) (in Folder: obj.GetFolder(), PreviousRV: event.PreviousRV, } - err = k.eventStore.Save(ctx, eventData) + err := k.eventStore.Save(ctx, eventData) if err != nil { // Clean up the data we wrote since event save failed _ = k.dataStore.Delete(ctx, dataKey) diff --git a/pkg/storage/unified/sql/rvmanager/rv_manager.go b/pkg/storage/unified/sql/rvmanager/rv_manager.go index b4f3b5de596..b10685f22ad 100644 --- a/pkg/storage/unified/sql/rvmanager/rv_manager.go +++ b/pkg/storage/unified/sql/rvmanager/rv_manager.go @@ -21,6 +21,19 @@ import ( "github.com/grafana/grafana/pkg/storage/unified/sql/sqltemplate" ) +type contextKey string + +const txKey contextKey = "rvmanager_db_tx" + +func ContextWithTx(ctx context.Context, tx db.Tx) context.Context { + return context.WithValue(ctx, txKey, tx) +} + +func TxFromCtx(ctx context.Context) (db.Tx, bool) { + tx, ok := ctx.Value(txKey).(db.Tx) + return tx, ok +} + var tracer = otel.Tracer("github.com/grafana/grafana/pkg/storage/unified/sql/rvmanager") var ( @@ -294,7 +307,7 @@ func (m *ResourceVersionManager) execBatch(ctx context.Context, group, resource // Allocate the RVs for i, guid := range guids { guidToRV[guid] = rv - guidToSnowflakeRV[guid] = snowflakeFromRv(rv) + guidToSnowflakeRV[guid] = SnowflakeFromRv(rv) rvs[i] = rv rv++ } @@ -353,10 +366,20 @@ func (m *ResourceVersionManager) execBatch(ctx context.Context, group, resource // takes a unix microsecond rv and transforms into a snowflake format. The timestamp is converted from microsecond to // millisecond (the integer division) and the remainder is saved in the stepbits section. machine id is always 0 -func snowflakeFromRv(rv int64) int64 { +func SnowflakeFromRv(rv int64) int64 { return (((rv / 1000) - snowflake.Epoch) << (snowflake.NodeBits + snowflake.StepBits)) + (rv % 1000) } +// helper utility to compare two RVs. The first RV must be in snowflake format. Will convert rv2 to snowflake and retry +// if comparison fails +func IsRvEqual(rv1, rv2 int64) bool { + if rv1 == rv2 { + return true + } + + return rv1 == SnowflakeFromRv(rv2) +} + // Lock locks the resource version for the given key func (m *ResourceVersionManager) Lock(ctx context.Context, x db.ContextExecer, group, resource string) (nextRV int64, err error) { // 1. Lock the row and prevent concurrent updates until the transaction is committed diff --git a/pkg/storage/unified/sql/server.go b/pkg/storage/unified/sql/server.go index 84eda71ca20..f4a1ee3ce77 100644 --- a/pkg/storage/unified/sql/server.go +++ b/pkg/storage/unified/sql/server.go @@ -20,6 +20,8 @@ import ( "github.com/grafana/grafana/pkg/setting" "github.com/grafana/grafana/pkg/storage/unified/resource" "github.com/grafana/grafana/pkg/storage/unified/sql/db/dbimpl" + "github.com/grafana/grafana/pkg/storage/unified/sql/rvmanager" + "github.com/grafana/grafana/pkg/storage/unified/sql/sqltemplate" ) type QOSEnqueueDequeuer interface { @@ -103,11 +105,34 @@ func NewResourceServer(opts ServerOptions) (resource.ResourceServer, error) { return nil, fmt.Errorf("error creating sqlkv: %s", err) } - kvBackend, err := resource.NewKVStorageBackend(resource.KVBackendOptions{ + kvBackendOpts := resource.KVBackendOptions{ KvStore: sqlkv, Tracer: opts.Tracer, Reg: opts.Reg, + } + + ctx := context.Background() + dbConn, err := eDB.Init(ctx) + if err != nil { + return nil, fmt.Errorf("error initializing DB: %w", err) + } + + dialect := sqltemplate.DialectForDriver(dbConn.DriverName()) + if dialect == nil { + return nil, fmt.Errorf("unsupported database driver: %s", dbConn.DriverName()) + } + + rvManager, err := rvmanager.NewResourceVersionManager(rvmanager.ResourceManagerOptions{ + Dialect: dialect, + DB: dbConn, }) + if err != nil { + return nil, fmt.Errorf("failed to create resource version manager: %w", err) + } + + // TODO add config to decide whether to pass RvManager or not + kvBackendOpts.RvManager = rvManager + kvBackend, err := resource.NewKVStorageBackend(kvBackendOpts) if err != nil { return nil, fmt.Errorf("error creating kv backend: %s", err) } diff --git a/pkg/storage/unified/testing/kv.go b/pkg/storage/unified/testing/kv.go index 770bf9ac6e6..d1900c7a46e 100644 --- a/pkg/storage/unified/testing/kv.go +++ b/pkg/storage/unified/testing/kv.go @@ -148,14 +148,13 @@ func runTestKVGet(t *testing.T, kv resource.KV, nsPrefix string) { func runTestKVSave(t *testing.T, kv resource.KV, nsPrefix string) { ctx := testutil.NewTestContext(t, time.Now().Add(30*time.Second)) - section := nsPrefix + "-save" t.Run("save new key", func(t *testing.T) { testValue := "new test value" - saveKVHelper(t, kv, ctx, section, "new-key", strings.NewReader(testValue)) + saveKVHelper(t, kv, ctx, testSection, "new-key", strings.NewReader(testValue)) // Verify it was saved - reader, err := kv.Get(ctx, section, "new-key") + reader, err := kv.Get(ctx, testSection, "new-key") require.NoError(t, err) value, err := io.ReadAll(reader) @@ -166,6 +165,26 @@ func runTestKVSave(t *testing.T, kv resource.KV, nsPrefix string) { }) t.Run("save overwrite existing key", func(t *testing.T) { + // First save + saveKVHelper(t, kv, ctx, testSection, "overwrite-key", strings.NewReader("old value")) + + // Overwrite + newValue := "new value" + saveKVHelper(t, kv, ctx, testSection, "overwrite-key", strings.NewReader(newValue)) + + // Verify it was updated + reader, err := kv.Get(ctx, testSection, "overwrite-key") + require.NoError(t, err) + + value, err := io.ReadAll(reader) + require.NoError(t, err) + assert.Equal(t, newValue, string(value)) + err = reader.Close() + require.NoError(t, err) + }) + + t.Run("save overwrite existing key (datastore)", func(t *testing.T) { + section := "unified/data" // First save saveKVHelper(t, kv, ctx, section, "overwrite-key", strings.NewReader("old value")) @@ -192,10 +211,10 @@ func runTestKVSave(t *testing.T, kv resource.KV, nsPrefix string) { t.Run("save binary data", func(t *testing.T) { binaryData := []byte{0x00, 0x01, 0x02, 0x03, 0xFF, 0xFE, 0xFD} - saveKVHelper(t, kv, ctx, section, "binary-key", bytes.NewReader(binaryData)) + saveKVHelper(t, kv, ctx, testSection, "binary-key", bytes.NewReader(binaryData)) // Verify binary data - reader, err := kv.Get(ctx, section, "binary-key") + reader, err := kv.Get(ctx, testSection, "binary-key") require.NoError(t, err) value, err := io.ReadAll(reader) @@ -207,10 +226,10 @@ func runTestKVSave(t *testing.T, kv resource.KV, nsPrefix string) { t.Run("save key with no data", func(t *testing.T) { // Save a key with empty data - saveKVHelper(t, kv, ctx, section, "empty-key", strings.NewReader("")) + saveKVHelper(t, kv, ctx, testSection, "empty-key", strings.NewReader("")) // Verify it was saved with empty data - reader, err := kv.Get(ctx, section, "empty-key") + reader, err := kv.Get(ctx, testSection, "empty-key") require.NoError(t, err) value, err := io.ReadAll(reader) @@ -907,18 +926,6 @@ func runTestKVBatchDelete(t *testing.T, kv resource.KV, nsPrefix string) { func saveKVHelper(t *testing.T, kv resource.KV, ctx context.Context, section, key string, value io.Reader) { t.Helper() - // TODO: remove this check once the sqlkv implementation supports `Save`. - type testingSaver interface { - TestingSave(context.Context, string, []byte) error - } - - if saver, ok := kv.(testingSaver); ok { - blob, err := io.ReadAll(value) - require.NoError(t, err) - require.NoError(t, saver.TestingSave(ctx, key, blob)) - return - } - writer, err := kv.Save(ctx, section, key) require.NoError(t, err) _, err = io.Copy(writer, value) diff --git a/pkg/storage/unified/testing/kv_test.go b/pkg/storage/unified/testing/kv_test.go index 3814df17b7e..5e94ccd8a7f 100644 --- a/pkg/storage/unified/testing/kv_test.go +++ b/pkg/storage/unified/testing/kv_test.go @@ -47,7 +47,6 @@ func TestSQLKV(t *testing.T) { }, &KVTestOptions{ NSPrefix: "sql-kv-test", SkipTests: map[string]bool{ - TestKVSave: true, TestKVConcurrent: true, TestKVUnixTimestamp: true, },