From bd8f0083cec7355dcc9984d71df588b2b3c00e63 Mon Sep 17 00:00:00 2001 From: Renato Costa <103441181+renatolabs@users.noreply.github.com> Date: Tue, 16 Dec 2025 14:32:13 -0500 Subject: [PATCH] unified-storage: add `Get` support to the sqlkv implementation (#115382) --- apps/iam/go.sum | 2 + .../unified/resource/data/sqlkv_get.sql | 3 + pkg/storage/unified/resource/kv.go | 3 + pkg/storage/unified/resource/sqlkv.go | 115 +++++++++++++++++- pkg/storage/unified/testing/kv.go | 34 +++++- pkg/storage/unified/testing/kv_test.go | 1 - 6 files changed, 151 insertions(+), 7 deletions(-) create mode 100644 pkg/storage/unified/resource/data/sqlkv_get.sql diff --git a/apps/iam/go.sum b/apps/iam/go.sum index cf4535fbe71..4c8ec5e38b5 100644 --- a/apps/iam/go.sum +++ b/apps/iam/go.sum @@ -150,6 +150,8 @@ github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03 github.com/BurntSushi/toml v1.5.0 h1:W5quZX/G/csjUnuI8SUYlsHs9M38FC7znL0lIO+DvMg= github.com/BurntSushi/toml v1.5.0/go.mod h1:ukJfTF/6rtPPRCnwkur4qwRxa8vTRFBF0uk2lLoLwho= github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo= +github.com/DATA-DOG/go-sqlmock v1.5.2 h1:OcvFkGmslmlZibjAjaHm3L//6LiuBgolP7OputlJIzU= +github.com/DATA-DOG/go-sqlmock v1.5.2/go.mod h1:88MAG/4G7SMwSE3CeA0ZKzrT5CiOU3OJ+JlNzwDqpNU= github.com/DataDog/datadog-go v3.2.0+incompatible/go.mod h1:LButxg5PwREeZtORoXG3tL4fMGNddJ+vMq1mwgfaqoQ= github.com/FZambia/eagle v0.2.0 h1:1kQaZpJvbkvAXFRE/9K2ucBMuVqo+E29EMLYB74hIis= github.com/FZambia/eagle v0.2.0/go.mod h1:LKMYBwGYhao5sJI0TppvQ4SvvldFj9gITxrl8NvGwG0= diff --git a/pkg/storage/unified/resource/data/sqlkv_get.sql b/pkg/storage/unified/resource/data/sqlkv_get.sql new file mode 100644 index 00000000000..48c83cf0450 --- /dev/null +++ b/pkg/storage/unified/resource/data/sqlkv_get.sql @@ -0,0 +1,3 @@ +SELECT {{ .Ident "value" | .Into .Value }} +FROM {{ .Ident .TableName }} +WHERE {{ .Ident "key_path" }} = {{ .Arg .KeyPath }}; diff --git a/pkg/storage/unified/resource/kv.go b/pkg/storage/unified/resource/kv.go index 043ea45f695..0af2cc3c4a1 100644 --- a/pkg/storage/unified/resource/kv.go +++ b/pkg/storage/unified/resource/kv.go @@ -87,6 +87,9 @@ func (k *badgerKV) Get(ctx context.Context, section string, key string) (io.Read if section == "" { return nil, fmt.Errorf("section is required") } + if key == "" { + return nil, fmt.Errorf("key is required") + } key = section + "/" + key diff --git a/pkg/storage/unified/resource/sqlkv.go b/pkg/storage/unified/resource/sqlkv.go index f6e8d0698f3..c943b5292f6 100644 --- a/pkg/storage/unified/resource/sqlkv.go +++ b/pkg/storage/unified/resource/sqlkv.go @@ -1,19 +1,101 @@ package resource import ( + "bytes" "context" + "database/sql" + "embed" + "errors" "fmt" "io" "iter" + "text/template" "github.com/grafana/grafana/pkg/storage/unified/sql/db" + "github.com/grafana/grafana/pkg/storage/unified/sql/dbutil" + "github.com/grafana/grafana/pkg/storage/unified/sql/sqltemplate" ) +// Templates setup. +var ( + //go:embed data/*.sql + sqlTemplatesFS embed.FS + + sqlTemplates = template.Must(template.New("sql").ParseFS(sqlTemplatesFS, `data/*.sql`)) +) + +func mustTemplate(filename string) *template.Template { + if t := sqlTemplates.Lookup(filename); t != nil { + return t + } + panic(fmt.Sprintf("template file not found: %s", filename)) +} + +// Templates. +var ( + sqlKVGet = mustTemplate("sqlkv_get.sql") +) + +type sqlKVSection struct { + Section string +} + +func (req sqlKVSection) Validate() error { + if req.Section == "" { + return fmt.Errorf("section is required") + } + + if req.Section != dataSection && req.Section != eventsSection { + return fmt.Errorf("invalid section: %s", req.Section) + } + + return nil +} + +func (req sqlKVSection) TableName() string { + if req.Section == dataSection { + return "resource_history" + } + + return "resource_events" +} + +type sqlKVGetRequest struct { + sqltemplate.SQLTemplate + sqlKVSection + Key string + *sqlKVGetResponse +} + +type sqlKVGetResponse struct { + Value []byte +} + +func (req sqlKVGetRequest) Validate() error { + if err := req.sqlKVSection.Validate(); err != nil { + return err + } + if req.Key == "" { + return fmt.Errorf("key is required") + } + + return nil +} + +func (req sqlKVGetRequest) Results() ([]byte, error) { + return req.Value, nil +} + +func (req sqlKVGetRequest) KeyPath() string { + return req.Section + "/" + req.Key +} + var _ KV = &sqlKV{} type sqlKV struct { dbProvider db.DBProvider db db.DB + dialect sqltemplate.Dialect } func NewSQLKV(dbProvider db.DBProvider) (KV, error) { @@ -27,9 +109,15 @@ func NewSQLKV(dbProvider db.DBProvider) (KV, error) { return nil, fmt.Errorf("error initializing DB: %w", err) } + dialect := sqltemplate.DialectForDriver(dbConn.DriverName()) + if dialect == nil { + return nil, fmt.Errorf("unsupported database driver: %s", dbConn.DriverName()) + } + return &sqlKV{ dbProvider: dbProvider, db: dbConn, + dialect: dialect, }, nil } @@ -44,7 +132,20 @@ func (k *sqlKV) Keys(ctx context.Context, section string, opt ListOptions) iter. } func (k *sqlKV) Get(ctx context.Context, section string, key string) (io.ReadCloser, error) { - panic("not implemented!") + value, err := dbutil.QueryRow(ctx, k.db, sqlKVGet, sqlKVGetRequest{ + SQLTemplate: sqltemplate.New(k.dialect), + sqlKVSection: sqlKVSection{section}, + Key: key, + sqlKVGetResponse: new(sqlKVGetResponse), + }) + if err != nil { + if errors.Is(err, sql.ErrNoRows) { + return nil, ErrNotFound + } + return nil, fmt.Errorf("failed to get key: %w", err) + } + + return io.NopCloser(bytes.NewReader(value)), nil } func (k *sqlKV) BatchGet(ctx context.Context, section string, keys []string) iter.Seq2[KeyValue, error] { @@ -53,6 +154,18 @@ func (k *sqlKV) BatchGet(ctx context.Context, section string, keys []string) ite } } +// TODO: this function only exists to support the testing of the sqlkv implementation before +// we have a proper implementation of `Save`. +func (k *sqlKV) TestingSave(ctx context.Context, key string, value []byte) error { + stmt := fmt.Sprintf( + `INSERT INTO resource_events (key_path, value) VALUES (%s, %s)`, + k.dialect.ArgPlaceholder(1), k.dialect.ArgPlaceholder(2), + ) + + _, err := k.db.ExecContext(ctx, stmt, eventsSection+"/"+key, value) + return err +} + func (k *sqlKV) Save(ctx context.Context, section string, key string) (io.WriteCloser, error) { panic("not implemented!") } diff --git a/pkg/storage/unified/testing/kv.go b/pkg/storage/unified/testing/kv.go index f30f1d761d7..9688c5c1029 100644 --- a/pkg/storage/unified/testing/kv.go +++ b/pkg/storage/unified/testing/kv.go @@ -86,15 +86,20 @@ func RunKVTest(t *testing.T, newKV NewKVFunc, opts *KVTestOptions) { func runTestKVGet(t *testing.T, kv resource.KV, nsPrefix string) { ctx := testutil.NewTestContext(t, time.Now().Add(30*time.Second)) - section := nsPrefix + "-get" + // Use `eventsSection` as the section for these tests, as the sqlkv implementation + // needs a real section to determine which table to use, and apply `nsPrefix` on + // the key itself. + section := "unified/events" + keyPrefix := nsPrefix + "-get" + prefixed := func(name string) string { return keyPrefix + "/" + name } t.Run("get existing key", func(t *testing.T) { // First save a key testValue := "test value for get" - saveKVHelper(t, kv, ctx, section, "existing-key", strings.NewReader(testValue)) + saveKVHelper(t, kv, ctx, section, prefixed("existing-key"), strings.NewReader(testValue)) // Now get it - reader, err := kv.Get(ctx, section, "existing-key") + reader, err := kv.Get(ctx, section, prefixed("existing-key")) require.NoError(t, err) // Read the value @@ -108,16 +113,22 @@ func runTestKVGet(t *testing.T, kv resource.KV, nsPrefix string) { }) t.Run("get non-existent key", func(t *testing.T) { - _, err := kv.Get(ctx, section, "non-existent-key") + _, err := kv.Get(ctx, section, prefixed("non-existent-key")) assert.Error(t, err) assert.Equal(t, resource.ErrNotFound, err) }) t.Run("get with empty section", func(t *testing.T) { - _, err := kv.Get(ctx, "", "some-key") + _, err := kv.Get(ctx, "", prefixed("some-key")) assert.Error(t, err) assert.Contains(t, err.Error(), "section is required") }) + + t.Run("get with empty key", func(t *testing.T) { + _, err := kv.Get(ctx, section, "") + assert.Error(t, err) + assert.Contains(t, err.Error(), "key is required") + }) } func runTestKVSave(t *testing.T, kv resource.KV, nsPrefix string) { @@ -800,6 +811,19 @@ func runTestKVBatchDelete(t *testing.T, kv resource.KV, nsPrefix string) { // saveKVHelper is a helper function to save data to KV store using the new WriteCloser interface func saveKVHelper(t *testing.T, kv resource.KV, ctx context.Context, section, key string, value io.Reader) { t.Helper() + + // TODO: remove this check once the sqlkv implementation supports `Save`. + type testingSaver interface { + TestingSave(context.Context, string, []byte) error + } + + if saver, ok := kv.(testingSaver); ok { + blob, err := io.ReadAll(value) + require.NoError(t, err) + require.NoError(t, saver.TestingSave(ctx, key, blob)) + return + } + writer, err := kv.Save(ctx, section, key) require.NoError(t, err) _, err = io.Copy(writer, value) diff --git a/pkg/storage/unified/testing/kv_test.go b/pkg/storage/unified/testing/kv_test.go index 4dbd27d5ec9..4fc343d450e 100644 --- a/pkg/storage/unified/testing/kv_test.go +++ b/pkg/storage/unified/testing/kv_test.go @@ -47,7 +47,6 @@ func TestSQLKV(t *testing.T) { }, &KVTestOptions{ NSPrefix: "sql-kv-test", SkipTests: map[string]bool{ - TestKVGet: true, TestKVSave: true, TestKVDelete: true, TestKVKeys: true,