From 08c611c68b7f5498ebdefa6dfcdaf367f6ef97bd Mon Sep 17 00:00:00 2001 From: Georges Chaudy Date: Thu, 18 Jul 2024 17:03:18 +0200 Subject: [PATCH] ResourceServer: Resource store sql backend (#90170) --- pkg/services/apiserver/service.go | 26 +- pkg/services/store/entity/sqlstash/create.go | 2 +- pkg/services/store/entity/sqlstash/delete.go | 2 +- .../store/entity/sqlstash/folder_support.go | 2 +- pkg/services/store/entity/sqlstash/queries.go | 2 +- .../store/entity/sqlstash/queries_test.go | 2 +- .../entity/sqlstash/sql_storage_server.go | 2 +- .../sqlstash/sql_storage_server_test.go | 2 +- pkg/services/store/entity/sqlstash/update.go | 2 +- pkg/services/store/entity/sqlstash/utils.go | 2 +- .../store/entity/sqlstash/utils_test.go | 4 +- pkg/storage/unified/sql/backend.go | 733 ++++++++++++++++++ pkg/storage/unified/sql/backend_test.go | 254 ++++++ pkg/storage/unified/sql/continue.go | 32 + .../unified/sql/data/resource_delete.sql | 7 + .../sql/data/resource_history_insert.sql | 23 + .../sql/data/resource_history_list.sql | 32 + .../sql/data/resource_history_poll.sql | 12 + .../sql/data/resource_history_read.sql | 17 + .../sql/data/resource_history_update_rv.sql | 4 + .../unified/sql/data/resource_insert.sql | 23 + .../unified/sql/data/resource_list.sql | 24 + .../unified/sql/data/resource_read.sql | 10 + .../unified/sql/data/resource_update.sql | 11 + .../unified/sql/data/resource_update_rv.sql | 4 + .../unified/sql/data/resource_version_get.sql | 8 + .../unified/sql/data/resource_version_inc.sql | 7 + .../sql/data/resource_version_insert.sql | 13 + pkg/storage/unified/sql/db/dbimpl/db.go | 59 ++ pkg/storage/unified/sql/db/dbimpl/dbEngine.go | 105 +++ .../unified/sql/db/dbimpl/dbEngine_test.go | 92 +++ pkg/storage/unified/sql/db/dbimpl/db_test.go | 154 ++++ pkg/storage/unified/sql/db/dbimpl/dbimpl.go | 166 ++++ pkg/storage/unified/sql/db/dbimpl/util.go | 111 +++ .../unified/sql/db/dbimpl/util_test.go | 108 +++ .../unified/sql/db/migrations/migrator.go | 24 + 
.../unified/sql/db/migrations/resource_mig.go | 101 +++ pkg/storage/unified/sql/db/service.go | 71 ++ pkg/storage/unified/sql/queries.go | 191 +++++ pkg/storage/unified/sql/queries_test.go | 364 +++++++++ pkg/storage/unified/sql/server.go | 31 + .../unified/sql}/sqltemplate/args.go | 0 .../unified/sql}/sqltemplate/args_test.go | 0 .../unified/sql}/sqltemplate/dialect.go | 0 .../unified/sql}/sqltemplate/dialect_mysql.go | 0 .../sql}/sqltemplate/dialect_postgresql.go | 0 .../sqltemplate/dialect_postgresql_test.go | 0 .../sql}/sqltemplate/dialect_sqlite.go | 0 .../unified/sql}/sqltemplate/dialect_test.go | 0 .../unified/sql}/sqltemplate/example_test.go | 0 .../unified/sql}/sqltemplate/into.go | 0 .../unified/sql}/sqltemplate/into_test.go | 0 .../sqltemplate/mocks/SQLTemplateIface.go | 2 +- .../sql}/sqltemplate/mocks/WithResults.go | 2 +- .../unified/sql}/sqltemplate/sqltemplate.go | 0 .../sql}/sqltemplate/sqltemplate_test.go | 0 .../testdata/resource_delete_mysql_sqlite.sql | 1 + .../sql/testdata/resource_delete_postgres.sql | 1 + .../resource_history_insert_mysql_sqlite.sql | 3 + .../resource_history_list_mysql_sqlite.sql | 12 + .../resource_history_read_mysql_sqlite.sql | 6 + ...esource_history_update_rv_mysql_sqlite.sql | 3 + .../testdata/resource_insert_mysql_sqlite.sql | 4 + .../testdata/resource_list_mysql_sqlite.sql | 6 + .../testdata/resource_read_mysql_sqlite.sql | 4 + .../testdata/resource_update_mysql_sqlite.sql | 4 + .../resource_update_rv_mysql_sqlite.sql | 4 + .../testdata/resource_version_get_mysql.sql | 4 + .../testdata/resource_version_get_sqlite.sql | 4 + .../resource_version_inc_mysql_sqlite.sql | 4 + .../resource_version_insert_mysql_sqlite.sql | 3 + 71 files changed, 2871 insertions(+), 35 deletions(-) create mode 100644 pkg/storage/unified/sql/backend.go create mode 100644 pkg/storage/unified/sql/backend_test.go create mode 100644 pkg/storage/unified/sql/continue.go create mode 100644 pkg/storage/unified/sql/data/resource_delete.sql create mode 
100644 pkg/storage/unified/sql/data/resource_history_insert.sql create mode 100644 pkg/storage/unified/sql/data/resource_history_list.sql create mode 100644 pkg/storage/unified/sql/data/resource_history_poll.sql create mode 100644 pkg/storage/unified/sql/data/resource_history_read.sql create mode 100644 pkg/storage/unified/sql/data/resource_history_update_rv.sql create mode 100644 pkg/storage/unified/sql/data/resource_insert.sql create mode 100644 pkg/storage/unified/sql/data/resource_list.sql create mode 100644 pkg/storage/unified/sql/data/resource_read.sql create mode 100644 pkg/storage/unified/sql/data/resource_update.sql create mode 100644 pkg/storage/unified/sql/data/resource_update_rv.sql create mode 100644 pkg/storage/unified/sql/data/resource_version_get.sql create mode 100644 pkg/storage/unified/sql/data/resource_version_inc.sql create mode 100644 pkg/storage/unified/sql/data/resource_version_insert.sql create mode 100644 pkg/storage/unified/sql/db/dbimpl/db.go create mode 100644 pkg/storage/unified/sql/db/dbimpl/dbEngine.go create mode 100644 pkg/storage/unified/sql/db/dbimpl/dbEngine_test.go create mode 100644 pkg/storage/unified/sql/db/dbimpl/db_test.go create mode 100644 pkg/storage/unified/sql/db/dbimpl/dbimpl.go create mode 100644 pkg/storage/unified/sql/db/dbimpl/util.go create mode 100644 pkg/storage/unified/sql/db/dbimpl/util_test.go create mode 100644 pkg/storage/unified/sql/db/migrations/migrator.go create mode 100644 pkg/storage/unified/sql/db/migrations/resource_mig.go create mode 100755 pkg/storage/unified/sql/db/service.go create mode 100644 pkg/storage/unified/sql/queries.go create mode 100644 pkg/storage/unified/sql/queries_test.go create mode 100644 pkg/storage/unified/sql/server.go rename pkg/{services/store/entity/sqlstash => storage/unified/sql}/sqltemplate/args.go (100%) rename pkg/{services/store/entity/sqlstash => storage/unified/sql}/sqltemplate/args_test.go (100%) rename pkg/{services/store/entity/sqlstash => 
storage/unified/sql}/sqltemplate/dialect.go (100%) rename pkg/{services/store/entity/sqlstash => storage/unified/sql}/sqltemplate/dialect_mysql.go (100%) rename pkg/{services/store/entity/sqlstash => storage/unified/sql}/sqltemplate/dialect_postgresql.go (100%) rename pkg/{services/store/entity/sqlstash => storage/unified/sql}/sqltemplate/dialect_postgresql_test.go (100%) rename pkg/{services/store/entity/sqlstash => storage/unified/sql}/sqltemplate/dialect_sqlite.go (100%) rename pkg/{services/store/entity/sqlstash => storage/unified/sql}/sqltemplate/dialect_test.go (100%) rename pkg/{services/store/entity/sqlstash => storage/unified/sql}/sqltemplate/example_test.go (100%) rename pkg/{services/store/entity/sqlstash => storage/unified/sql}/sqltemplate/into.go (100%) rename pkg/{services/store/entity/sqlstash => storage/unified/sql}/sqltemplate/into_test.go (100%) rename pkg/{services/store/entity/sqlstash => storage/unified/sql}/sqltemplate/mocks/SQLTemplateIface.go (99%) rename pkg/{services/store/entity/sqlstash => storage/unified/sql}/sqltemplate/mocks/WithResults.go (99%) rename pkg/{services/store/entity/sqlstash => storage/unified/sql}/sqltemplate/sqltemplate.go (100%) rename pkg/{services/store/entity/sqlstash => storage/unified/sql}/sqltemplate/sqltemplate_test.go (100%) create mode 100644 pkg/storage/unified/sql/testdata/resource_delete_mysql_sqlite.sql create mode 100644 pkg/storage/unified/sql/testdata/resource_delete_postgres.sql create mode 100644 pkg/storage/unified/sql/testdata/resource_history_insert_mysql_sqlite.sql create mode 100644 pkg/storage/unified/sql/testdata/resource_history_list_mysql_sqlite.sql create mode 100644 pkg/storage/unified/sql/testdata/resource_history_read_mysql_sqlite.sql create mode 100644 pkg/storage/unified/sql/testdata/resource_history_update_rv_mysql_sqlite.sql create mode 100644 pkg/storage/unified/sql/testdata/resource_insert_mysql_sqlite.sql create mode 100644 
pkg/storage/unified/sql/testdata/resource_list_mysql_sqlite.sql create mode 100644 pkg/storage/unified/sql/testdata/resource_read_mysql_sqlite.sql create mode 100644 pkg/storage/unified/sql/testdata/resource_update_mysql_sqlite.sql create mode 100644 pkg/storage/unified/sql/testdata/resource_update_rv_mysql_sqlite.sql create mode 100644 pkg/storage/unified/sql/testdata/resource_version_get_mysql.sql create mode 100644 pkg/storage/unified/sql/testdata/resource_version_get_sqlite.sql create mode 100644 pkg/storage/unified/sql/testdata/resource_version_inc_mysql_sqlite.sql create mode 100644 pkg/storage/unified/sql/testdata/resource_version_insert_mysql_sqlite.sql diff --git a/pkg/services/apiserver/service.go b/pkg/services/apiserver/service.go index 7e456bd0a1d..1eff0cf9e3f 100644 --- a/pkg/services/apiserver/service.go +++ b/pkg/services/apiserver/service.go @@ -4,13 +4,10 @@ import ( "context" "fmt" "net/http" - "os" "path" - "path/filepath" "github.com/grafana/dskit/services" "github.com/prometheus/client_golang/prometheus" - "gocloud.dev/blob/fileblob" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -51,6 +48,7 @@ import ( "github.com/grafana/grafana/pkg/storage/unified/apistore" "github.com/grafana/grafana/pkg/storage/unified/entitybridge" "github.com/grafana/grafana/pkg/storage/unified/resource" + "github.com/grafana/grafana/pkg/storage/unified/sql" ) var ( @@ -272,27 +270,11 @@ func (s *service) start(ctx context.Context) error { } case grafanaapiserveroptions.StorageTypeUnifiedNext: - // CDK (for now) - dir := filepath.Join(s.cfg.DataPath, "unistore", "resource") - if err := os.MkdirAll(dir, 0o750); err != nil { - return err + if !s.features.IsEnabledGlobally(featuremgmt.FlagUnifiedStorage) { + return fmt.Errorf("unified storage requires the unifiedStorage feature flag") } - bucket, err := fileblob.OpenBucket(dir, &fileblob.Options{ - CreateDir: true, - Metadata: 
fileblob.MetadataDontWrite, // skip - }) - if err != nil { - return err - } - backend, err := resource.NewCDKBackend(context.Background(), resource.CDKBackendOptions{ - Tracer: s.tracing, - Bucket: bucket, - }) - if err != nil { - return err - } - server, err := resource.NewResourceServer(resource.ResourceServerOptions{Backend: backend}) + server, err := sql.ProvideResourceServer(s.db, s.cfg, s.features, s.tracing) if err != nil { return err } diff --git a/pkg/services/store/entity/sqlstash/create.go b/pkg/services/store/entity/sqlstash/create.go index 85195b263e8..24b69454f8e 100644 --- a/pkg/services/store/entity/sqlstash/create.go +++ b/pkg/services/store/entity/sqlstash/create.go @@ -11,7 +11,7 @@ import ( grafanaregistry "github.com/grafana/grafana/pkg/apiserver/registry/generic" "github.com/grafana/grafana/pkg/services/store/entity" "github.com/grafana/grafana/pkg/services/store/entity/db" - "github.com/grafana/grafana/pkg/services/store/entity/sqlstash/sqltemplate" + "github.com/grafana/grafana/pkg/storage/unified/sql/sqltemplate" ) func (s *sqlEntityServer) Create(ctx context.Context, r *entity.CreateEntityRequest) (*entity.CreateEntityResponse, error) { diff --git a/pkg/services/store/entity/sqlstash/delete.go b/pkg/services/store/entity/sqlstash/delete.go index ac0b56f0a1f..74cd4b93a81 100644 --- a/pkg/services/store/entity/sqlstash/delete.go +++ b/pkg/services/store/entity/sqlstash/delete.go @@ -10,7 +10,7 @@ import ( grafanaregistry "github.com/grafana/grafana/pkg/apiserver/registry/generic" "github.com/grafana/grafana/pkg/services/store/entity" "github.com/grafana/grafana/pkg/services/store/entity/db" - "github.com/grafana/grafana/pkg/services/store/entity/sqlstash/sqltemplate" + "github.com/grafana/grafana/pkg/storage/unified/sql/sqltemplate" ) func (s *sqlEntityServer) Delete(ctx context.Context, r *entity.DeleteEntityRequest) (*entity.DeleteEntityResponse, error) { diff --git a/pkg/services/store/entity/sqlstash/folder_support.go 
b/pkg/services/store/entity/sqlstash/folder_support.go index 2b264804819..e2a3c075c2c 100644 --- a/pkg/services/store/entity/sqlstash/folder_support.go +++ b/pkg/services/store/entity/sqlstash/folder_support.go @@ -8,7 +8,7 @@ import ( folder "github.com/grafana/grafana/pkg/apis/folder/v0alpha1" "github.com/grafana/grafana/pkg/services/store/entity/db" - "github.com/grafana/grafana/pkg/services/store/entity/sqlstash/sqltemplate" + "github.com/grafana/grafana/pkg/storage/unified/sql/sqltemplate" ) type folderInfo struct { diff --git a/pkg/services/store/entity/sqlstash/queries.go b/pkg/services/store/entity/sqlstash/queries.go index 5a91f56bc0e..f82e07a1cfb 100644 --- a/pkg/services/store/entity/sqlstash/queries.go +++ b/pkg/services/store/entity/sqlstash/queries.go @@ -16,7 +16,7 @@ import ( grafanaregistry "github.com/grafana/grafana/pkg/apiserver/registry/generic" "github.com/grafana/grafana/pkg/services/store/entity" "github.com/grafana/grafana/pkg/services/store/entity/db" - "github.com/grafana/grafana/pkg/services/store/entity/sqlstash/sqltemplate" + "github.com/grafana/grafana/pkg/storage/unified/sql/sqltemplate" ) // Templates setup. 
diff --git a/pkg/services/store/entity/sqlstash/queries_test.go b/pkg/services/store/entity/sqlstash/queries_test.go index ca34342bc4d..61038fde0a9 100644 --- a/pkg/services/store/entity/sqlstash/queries_test.go +++ b/pkg/services/store/entity/sqlstash/queries_test.go @@ -16,7 +16,7 @@ import ( grafanaregistry "github.com/grafana/grafana/pkg/apiserver/registry/generic" "github.com/grafana/grafana/pkg/services/store/entity" "github.com/grafana/grafana/pkg/services/store/entity/db" - "github.com/grafana/grafana/pkg/services/store/entity/sqlstash/sqltemplate" + "github.com/grafana/grafana/pkg/storage/unified/sql/sqltemplate" "github.com/grafana/grafana/pkg/util/testutil" ) diff --git a/pkg/services/store/entity/sqlstash/sql_storage_server.go b/pkg/services/store/entity/sqlstash/sql_storage_server.go index 723bf34c52d..219c454e9bf 100644 --- a/pkg/services/store/entity/sqlstash/sql_storage_server.go +++ b/pkg/services/store/entity/sqlstash/sql_storage_server.go @@ -25,8 +25,8 @@ import ( "github.com/grafana/grafana/pkg/services/sqlstore/session" "github.com/grafana/grafana/pkg/services/store/entity" "github.com/grafana/grafana/pkg/services/store/entity/db" - "github.com/grafana/grafana/pkg/services/store/entity/sqlstash/sqltemplate" "github.com/grafana/grafana/pkg/storage/unified/resource" + "github.com/grafana/grafana/pkg/storage/unified/sql/sqltemplate" ) const entityTable = "entity" diff --git a/pkg/services/store/entity/sqlstash/sql_storage_server_test.go b/pkg/services/store/entity/sqlstash/sql_storage_server_test.go index 938f7f85ed0..c5cb4b33354 100644 --- a/pkg/services/store/entity/sqlstash/sql_storage_server_test.go +++ b/pkg/services/store/entity/sqlstash/sql_storage_server_test.go @@ -10,7 +10,7 @@ import ( "github.com/grafana/grafana/pkg/infra/log" "github.com/grafana/grafana/pkg/services/sqlstore/session" "github.com/grafana/grafana/pkg/services/store/entity" - "github.com/grafana/grafana/pkg/services/store/entity/sqlstash/sqltemplate" + 
"github.com/grafana/grafana/pkg/storage/unified/sql/sqltemplate" "github.com/grafana/grafana/pkg/util/testutil" ) diff --git a/pkg/services/store/entity/sqlstash/update.go b/pkg/services/store/entity/sqlstash/update.go index af059717a2a..0c03e80f9a5 100644 --- a/pkg/services/store/entity/sqlstash/update.go +++ b/pkg/services/store/entity/sqlstash/update.go @@ -11,7 +11,7 @@ import ( grafanaregistry "github.com/grafana/grafana/pkg/apiserver/registry/generic" "github.com/grafana/grafana/pkg/services/store/entity" "github.com/grafana/grafana/pkg/services/store/entity/db" - "github.com/grafana/grafana/pkg/services/store/entity/sqlstash/sqltemplate" + "github.com/grafana/grafana/pkg/storage/unified/sql/sqltemplate" ) func (s *sqlEntityServer) Update(ctx context.Context, r *entity.UpdateEntityRequest) (*entity.UpdateEntityResponse, error) { diff --git a/pkg/services/store/entity/sqlstash/utils.go b/pkg/services/store/entity/sqlstash/utils.go index 3d4666b9649..3d140ea31c3 100644 --- a/pkg/services/store/entity/sqlstash/utils.go +++ b/pkg/services/store/entity/sqlstash/utils.go @@ -10,7 +10,7 @@ import ( "github.com/grafana/grafana/pkg/apimachinery/identity" "github.com/grafana/grafana/pkg/services/store/entity/db" - "github.com/grafana/grafana/pkg/services/store/entity/sqlstash/sqltemplate" + "github.com/grafana/grafana/pkg/storage/unified/sql/sqltemplate" ) func createETag(body []byte, meta []byte, status []byte) string { diff --git a/pkg/services/store/entity/sqlstash/utils_test.go b/pkg/services/store/entity/sqlstash/utils_test.go index 70d02263623..b3d4907c2ec 100644 --- a/pkg/services/store/entity/sqlstash/utils_test.go +++ b/pkg/services/store/entity/sqlstash/utils_test.go @@ -16,8 +16,8 @@ import ( "github.com/grafana/grafana/pkg/services/store/entity/db" "github.com/grafana/grafana/pkg/services/store/entity/db/dbimpl" - "github.com/grafana/grafana/pkg/services/store/entity/sqlstash/sqltemplate" - sqltemplateMocks 
"github.com/grafana/grafana/pkg/services/store/entity/sqlstash/sqltemplate/mocks" + "github.com/grafana/grafana/pkg/storage/unified/sql/sqltemplate" + sqltemplateMocks "github.com/grafana/grafana/pkg/storage/unified/sql/sqltemplate/mocks" "github.com/grafana/grafana/pkg/util/testutil" ) diff --git a/pkg/storage/unified/sql/backend.go b/pkg/storage/unified/sql/backend.go new file mode 100644 index 00000000000..99d3c656371 --- /dev/null +++ b/pkg/storage/unified/sql/backend.go @@ -0,0 +1,733 @@ +package sql + +import ( + "context" + "database/sql" + "errors" + "fmt" + "strings" + "text/template" + "time" + + "github.com/google/uuid" + "go.opentelemetry.io/otel/trace" + "go.opentelemetry.io/otel/trace/noop" + + "github.com/grafana/grafana/pkg/infra/log" + "github.com/grafana/grafana/pkg/services/sqlstore/migrator" + "github.com/grafana/grafana/pkg/services/sqlstore/session" + "github.com/grafana/grafana/pkg/storage/unified/resource" + "github.com/grafana/grafana/pkg/storage/unified/sql/db" + "github.com/grafana/grafana/pkg/storage/unified/sql/sqltemplate" +) + +const trace_prefix = "sql.resource." 
+ +type backendOptions struct { + DB db.ResourceDBInterface + Tracer trace.Tracer +} + +func NewBackendStore(opts backendOptions) (*backend, error) { + ctx, cancel := context.WithCancel(context.Background()) + + if opts.Tracer == nil { + opts.Tracer = noop.NewTracerProvider().Tracer("sql-backend") + } + + return &backend{ + db: opts.DB, + log: log.New("sql-resource-server"), + ctx: ctx, + cancel: cancel, + tracer: opts.Tracer, + }, nil +} + +type backend struct { + log log.Logger + db db.ResourceDBInterface // needed to keep xorm engine in scope + sess *session.SessionDB + dialect migrator.Dialect + ctx context.Context // TODO: remove + cancel context.CancelFunc + tracer trace.Tracer + + //stream chan *resource.WatchEvent + + sqlDB db.DB + sqlDialect sqltemplate.Dialect +} + +func (b *backend) Init() error { + if b.sess != nil { + return nil + } + + if b.db == nil { + return errors.New("missing db") + } + + err := b.db.Init() + if err != nil { + return err + } + + sqlDB, err := b.db.GetDB() + if err != nil { + return err + } + b.sqlDB = sqlDB + + driverName := sqlDB.DriverName() + driverName = strings.TrimSuffix(driverName, "WithHooks") + switch driverName { + case db.DriverMySQL: + b.sqlDialect = sqltemplate.MySQL + case db.DriverPostgres: + b.sqlDialect = sqltemplate.PostgreSQL + case db.DriverSQLite, db.DriverSQLite3: + b.sqlDialect = sqltemplate.SQLite + default: + return fmt.Errorf("no dialect for driver %q", driverName) + } + + sess, err := b.db.GetSession() + if err != nil { + return err + } + + engine, err := b.db.GetEngine() + if err != nil { + return err + } + + b.sess = sess + b.dialect = migrator.NewDialect(engine.DriverName()) + + return nil +} + +func (b *backend) IsHealthy(ctx context.Context, r *resource.HealthCheckRequest) (*resource.HealthCheckResponse, error) { + // ctxLogger := s.log.FromContext(log.WithContextualAttributes(ctx, []any{"method", "isHealthy"})) + + if err := b.sqlDB.PingContext(ctx); err != nil { + return nil, err + } + // TODO: 
check the status of the watcher implementation as well + return &resource.HealthCheckResponse{Status: resource.HealthCheckResponse_SERVING}, nil +} + +func (b *backend) Stop() { + b.cancel() +} + +func (b *backend) WriteEvent(ctx context.Context, event resource.WriteEvent) (int64, error) { + _, span := b.tracer.Start(ctx, trace_prefix+"WriteEvent") + defer span.End() + // TODO: validate key ? + if err := b.Init(); err != nil { + return 0, err + } + switch event.Type { + case resource.WatchEvent_ADDED: + return b.create(ctx, event) + case resource.WatchEvent_MODIFIED: + return b.update(ctx, event) + case resource.WatchEvent_DELETED: + return b.delete(ctx, event) + default: + return 0, fmt.Errorf("unsupported event type") + } +} + +func (b *backend) create(ctx context.Context, event resource.WriteEvent) (int64, error) { + ctx, span := b.tracer.Start(ctx, trace_prefix+"Create") + defer span.End() + var newVersion int64 + guid := uuid.New().String() + err := b.sqlDB.WithTx(ctx, ReadCommitted, func(ctx context.Context, tx db.Tx) error { + // TODO: Set the Labels + + // 1. Insert into resource + if _, err := exec(ctx, tx, sqlResourceInsert, sqlResourceRequest{ + SQLTemplate: sqltemplate.New(b.sqlDialect), + WriteEvent: event, + GUID: guid, + }); err != nil { + return fmt.Errorf("insert into resource: %w", err) + } + + // 2. Insert into resource history + if _, err := exec(ctx, tx, sqlResourceHistoryInsert, sqlResourceRequest{ + SQLTemplate: sqltemplate.New(b.sqlDialect), + WriteEvent: event, + GUID: guid, + }); err != nil { + return fmt.Errorf("insert into resource history: %w", err) + } + + // 3. TODO: Rebuild the whole folder tree structure if we're creating a folder + + // 4. Atomically increpement resource version for this kind + rv, err := resourceVersionAtomicInc(ctx, tx, b.sqlDialect, event.Key) + if err != nil { + return err + } + newVersion = rv + + // 5. 
Update the RV in both resource and resource_history + if _, err = exec(ctx, tx, sqlResourceHistoryUpdateRV, sqlResourceUpdateRVRequest{ + SQLTemplate: sqltemplate.New(b.sqlDialect), + GUID: guid, + ResourceVersion: newVersion, + }); err != nil { + return fmt.Errorf("update history rv: %w", err) + } + if _, err = exec(ctx, tx, sqlResourceUpdateRV, sqlResourceUpdateRVRequest{ + SQLTemplate: sqltemplate.New(b.sqlDialect), + GUID: guid, + ResourceVersion: newVersion, + }); err != nil { + return fmt.Errorf("update resource rv: %w", err) + } + return nil + }) + + return newVersion, err +} + +func (b *backend) update(ctx context.Context, event resource.WriteEvent) (int64, error) { + ctx, span := b.tracer.Start(ctx, trace_prefix+"Update") + defer span.End() + var newVersion int64 + guid := uuid.New().String() + err := b.sqlDB.WithTx(ctx, ReadCommitted, func(ctx context.Context, tx db.Tx) error { + // TODO: Set the Labels + + // 1. Update into resource + res, err := exec(ctx, tx, sqlResourceUpdate, sqlResourceRequest{ + SQLTemplate: sqltemplate.New(b.sqlDialect), + WriteEvent: event, + GUID: guid, + }) + if err != nil { + return fmt.Errorf("update into resource: %w", err) + } + + count, err := res.RowsAffected() + if err != nil { + return fmt.Errorf("update into resource: %w", err) + } + if count == 0 { + return fmt.Errorf("no rows affected") + } + + // 2. Insert into resource history + if _, err := exec(ctx, tx, sqlResourceHistoryInsert, sqlResourceRequest{ + SQLTemplate: sqltemplate.New(b.sqlDialect), + WriteEvent: event, + GUID: guid, + }); err != nil { + return fmt.Errorf("insert into resource history: %w", err) + } + + // 3. TODO: Rebuild the whole folder tree structure if we're creating a folder + + // 4. Atomically increpement resource version for this kind + rv, err := resourceVersionAtomicInc(ctx, tx, b.sqlDialect, event.Key) + if err != nil { + return err + } + newVersion = rv + + // 5. 
Update the RV in both resource and resource_history + if _, err = exec(ctx, tx, sqlResourceHistoryUpdateRV, sqlResourceUpdateRVRequest{ + SQLTemplate: sqltemplate.New(b.sqlDialect), + GUID: guid, + ResourceVersion: newVersion, + }); err != nil { + return fmt.Errorf("update history rv: %w", err) + } + if _, err = exec(ctx, tx, sqlResourceUpdateRV, sqlResourceUpdateRVRequest{ + SQLTemplate: sqltemplate.New(b.sqlDialect), + GUID: guid, + ResourceVersion: newVersion, + }); err != nil { + return fmt.Errorf("update resource rv: %w", err) + } + + return nil + }) + + return newVersion, err +} + +func (b *backend) delete(ctx context.Context, event resource.WriteEvent) (int64, error) { + ctx, span := b.tracer.Start(ctx, trace_prefix+"Delete") + defer span.End() + var newVersion int64 + guid := uuid.New().String() + + err := b.sqlDB.WithTx(ctx, ReadCommitted, func(ctx context.Context, tx db.Tx) error { + // TODO: Set the Labels + + // 1. delete from resource + res, err := exec(ctx, tx, sqlResourceDelete, sqlResourceRequest{ + SQLTemplate: sqltemplate.New(b.sqlDialect), + WriteEvent: event, + GUID: guid, + }) + if err != nil { + return fmt.Errorf("delete resource: %w", err) + } + count, err := res.RowsAffected() + if err != nil { + return fmt.Errorf("delete resource: %w", err) + } + if count == 0 { + return fmt.Errorf("no rows affected") + } + + // 2. Add event to resource history + if _, err := exec(ctx, tx, sqlResourceHistoryInsert, sqlResourceRequest{ + SQLTemplate: sqltemplate.New(b.sqlDialect), + WriteEvent: event, + GUID: guid, + }); err != nil { + return fmt.Errorf("insert into resource history: %w", err) + } + + // 3. TODO: Rebuild the whole folder tree structure if we're creating a folder + + // 4. Atomically increpement resource version for this kind + newVersion, err = resourceVersionAtomicInc(ctx, tx, b.sqlDialect, event.Key) + if err != nil { + return err + } + + // 5. 
Update the RV in resource_history + if _, err = exec(ctx, tx, sqlResourceHistoryUpdateRV, sqlResourceUpdateRVRequest{ + SQLTemplate: sqltemplate.New(b.sqlDialect), + GUID: guid, + ResourceVersion: newVersion, + }); err != nil { + return fmt.Errorf("update history rv: %w", err) + } + return nil + }) + + return newVersion, err +} + +func (b *backend) Read(ctx context.Context, req *resource.ReadRequest) (*resource.ReadResponse, error) { + _, span := b.tracer.Start(ctx, trace_prefix+".Read") + defer span.End() + + // TODO: validate key ? + if err := b.Init(); err != nil { + return nil, err + } + + readReq := sqlResourceReadRequest{ + SQLTemplate: sqltemplate.New(b.sqlDialect), + Request: req, + readResponse: new(readResponse), + } + + sr := sqlResourceRead + if req.ResourceVersion > 0 { + // read a specific version + sr = sqlResourceHistoryRead + } + + res, err := queryRow(ctx, b.sqlDB, sr, readReq) + if errors.Is(err, sql.ErrNoRows) { + return nil, resource.ErrNotFound + } else if err != nil { + return nil, fmt.Errorf("get resource version: %w", err) + } + + return &res.ReadResponse, nil +} + +func (b *backend) PrepareList(ctx context.Context, req *resource.ListRequest) (*resource.ListResponse, error) { + _, span := b.tracer.Start(ctx, trace_prefix+"List") + defer span.End() + + // TODO: think about how to handler VersionMatch. We should be able to use latest for the first page (only). + + if req.ResourceVersion > 0 || req.NextPageToken != "" { + return b.listAtRevision(ctx, req) + } + return b.listLatest(ctx, req) +} + +// listLatest fetches the resources from the resource table. 
+func (b *backend) listLatest(ctx context.Context, req *resource.ListRequest) (*resource.ListResponse, error) { + out := &resource.ListResponse{ + Items: []*resource.ResourceWrapper{}, // TODO: we could pre-allocate the capacity if we estimate the number of items + ResourceVersion: 0, + } + + err := b.sqlDB.WithTx(ctx, ReadCommitted, func(ctx context.Context, tx db.Tx) error { + var err error + + // TODO: Here the lastest RV might be lower than the actual latest RV + // because delete events are not included in the resource table. + out.ResourceVersion, err = fetchLatestRV(ctx, tx) + if err != nil { + return err + } + + // Fetch one extra row for Limit + lim := req.Limit + if req.Limit > 0 { + req.Limit++ + } + listReq := sqlResourceListRequest{ + SQLTemplate: sqltemplate.New(b.sqlDialect), + Request: req, + Response: new(resource.ResourceWrapper), + } + query, err := sqltemplate.Execute(sqlResourceList, listReq) + if err != nil { + return fmt.Errorf("execute SQL template to list resources: %w", err) + } + + rows, err := tx.QueryContext(ctx, query, listReq.GetArgs()...) + if err != nil { + return fmt.Errorf("list latest resources: %w", err) + } + defer func() { _ = rows.Close() }() + for i := int64(1); rows.Next(); i++ { + if ctx.Err() != nil { + return ctx.Err() + } + if err := rows.Scan(listReq.GetScanDest()...); err != nil { + return fmt.Errorf("scan row #%d: %w", i, err) + } + + if lim > 0 && i > lim { + continueToken := &ContinueToken{ResourceVersion: out.ResourceVersion, StartOffset: lim} + out.NextPageToken = continueToken.String() + break + } + out.Items = append(out.Items, &resource.ResourceWrapper{ + ResourceVersion: listReq.Response.ResourceVersion, + Value: listReq.Response.Value, + }) + } + + return nil + }) + + return out, err +} + +// listAtRevision fetches the resources from the resource_history table at a specific revision. 
+func (b *backend) listAtRevision(ctx context.Context, req *resource.ListRequest) (*resource.ListResponse, error) { + // Get the RV + rv := req.ResourceVersion + offset := int64(0) + if req.NextPageToken != "" { + continueToken, err := GetContinueToken(req.NextPageToken) + if err != nil { + return nil, fmt.Errorf("get continue token: %w", err) + } + rv = continueToken.ResourceVersion + offset = continueToken.StartOffset + } + + out := &resource.ListResponse{ + Items: []*resource.ResourceWrapper{}, // TODO: we could pre-allocate the capacity if we estimate the number of items + ResourceVersion: rv, + } + + err := b.sqlDB.WithTx(ctx, ReadCommitted, func(ctx context.Context, tx db.Tx) error { + var err error + + // Fetch one extra row for Limit + lim := req.Limit + if lim > 0 { + req.Limit++ + } + listReq := sqlResourceHistoryListRequest{ + SQLTemplate: sqltemplate.New(b.sqlDialect), + Request: &historyListRequest{ + ResourceVersion: rv, + Limit: req.Limit, + Offset: offset, + Options: req.Options, + }, + Response: new(resource.ResourceWrapper), + } + query, err := sqltemplate.Execute(sqlResourceHistoryList, listReq) + if err != nil { + return fmt.Errorf("execute SQL template to list resources at revision: %w", err) + } + rows, err := tx.QueryContext(ctx, query, listReq.GetArgs()...) 
+ if err != nil { + return fmt.Errorf("list resources at revision: %w", err) + } + defer func() { _ = rows.Close() }() + for i := int64(1); rows.Next(); i++ { + if ctx.Err() != nil { + return ctx.Err() + } + if err := rows.Scan(listReq.GetScanDest()...); err != nil { + return fmt.Errorf("scan row #%d: %w", i, err) + } + + if lim > 0 && i > lim { + continueToken := &ContinueToken{ResourceVersion: out.ResourceVersion, StartOffset: offset + lim} + out.NextPageToken = continueToken.String() + break + } + out.Items = append(out.Items, &resource.ResourceWrapper{ + ResourceVersion: listReq.Response.ResourceVersion, + Value: listReq.Response.Value, + }) + } + + return nil + }) + + return out, err +} + +func (b *backend) WatchWriteEvents(ctx context.Context) (<-chan *resource.WrittenEvent, error) { + if err := b.Init(); err != nil { + return nil, err + } + // Fetch the lastest RV + since, err := fetchLatestRV(ctx, b.sqlDB) + if err != nil { + return nil, err + } + // Start the poller + stream := make(chan *resource.WrittenEvent) + go b.poller(ctx, since, stream) + return stream, nil +} + +func (b *backend) poller(ctx context.Context, since int64, stream chan<- *resource.WrittenEvent) { + var err error + + interval := 100 * time.Millisecond // TODO make this configurable + t := time.NewTicker(interval) + defer close(stream) + defer t.Stop() + + for { + select { + case <-b.ctx.Done(): + return + case <-t.C: + since, err = b.poll(ctx, since, stream) + if err != nil { + b.log.Error("watch error", "err", err) + } + t.Reset(interval) + } + } +} + +// fetchLatestRV returns the current maxium RV in the resource table +func fetchLatestRV(ctx context.Context, db db.ContextExecer) (int64, error) { + // Fetch the lastest RV + rows, err := db.QueryContext(ctx, `SELECT COALESCE(max("resource_version"), 0) FROM "resource";`) + if err != nil { + return 0, fmt.Errorf("fetch latest rv: %w", err) + } + defer func() { _ = rows.Close() }() + if rows.Next() { + rv := new(int64) + if err := 
rows.Scan(&rv); err != nil { + return 0, fmt.Errorf("scan since resource version: %w", err) + } + return *rv, nil + } + return 0, fmt.Errorf("no rows") +} + +func (b *backend) poll(ctx context.Context, since int64, stream chan<- *resource.WrittenEvent) (int64, error) { + ctx, span := b.tracer.Start(ctx, trace_prefix+"poll") + defer span.End() + + pollReq := sqlResourceHistoryPollRequest{ + SQLTemplate: sqltemplate.New(b.sqlDialect), + SinceResourceVersion: since, + Response: new(historyPollResponse), + } + query, err := sqltemplate.Execute(sqlResourceHistoryPoll, pollReq) + if err != nil { + return 0, fmt.Errorf("execute SQL template to poll for resource history: %w", err) + } + rows, err := b.sqlDB.QueryContext(ctx, query, pollReq.GetArgs()...) + if err != nil { + return 0, fmt.Errorf("poll for resource history: %w", err) + } + defer func() { _ = rows.Close() }() + next := since + for i := 1; rows.Next(); i++ { + // check if the context is done + if ctx.Err() != nil { + return 0, ctx.Err() + } + if err := rows.Scan(pollReq.GetScanDest()...); err != nil { + return 0, fmt.Errorf("scan row #%d polling for resource history: %w", i, err) + } + resp := pollReq.Response + next = resp.ResourceVersion + + stream <- &resource.WrittenEvent{ + WriteEvent: resource.WriteEvent{ + Value: resp.Value, + Key: &resource.ResourceKey{ + Namespace: resp.Key.Namespace, + Group: resp.Key.Group, + Resource: resp.Key.Resource, + Name: resp.Key.Name, + }, + Type: resource.WatchEvent_Type(resp.Action), + }, + ResourceVersion: resp.ResourceVersion, + // Timestamp: , // TODO: add timestamp + } + } + return next, nil +} + +// resourceVersionAtomicInc atomically increases the version of a kind within a +// transaction. +// TODO: Ideally we should attempt to update the RV in the resource and resource_history tables +// in a single roundtrip. This would reduce the latency of the operation, and also increase the +// throughput of the system. This is a good candidate for a future optimization. 
+func resourceVersionAtomicInc(ctx context.Context, x db.ContextExecer, d sqltemplate.Dialect, key *resource.ResourceKey) (newVersion int64, err error) { + // TODO: refactor this code to run in a multi-statement transaction in order to minimise the number of roundtrips. + // 1 Lock the row for update + req := sqlResourceVersionRequest{ + SQLTemplate: sqltemplate.New(d), + Group: key.Group, + Resource: key.Resource, + resourceVersion: new(resourceVersion), + } + rv, err := queryRow(ctx, x, sqlResourceVersionGet, req) + + if errors.Is(err, sql.ErrNoRows) { + // if there wasn't a row associated with the given resource, we create one with + // version 1 + if _, err = exec(ctx, x, sqlResourceVersionInsert, sqlResourceVersionRequest{ + SQLTemplate: sqltemplate.New(d), + Group: key.Group, + Resource: key.Resource, + }); err != nil { + return 0, fmt.Errorf("insert into resource_version: %w", err) + } + return 1, nil + } + if err != nil { + return 0, fmt.Errorf("increase resource version: %w", err) + } + nextRV := rv.ResourceVersion + 1 + // 2. Increment the resource version + res, err := exec(ctx, x, sqlResourceVersionInc, sqlResourceVersionRequest{ + SQLTemplate: sqltemplate.New(d), + Group: key.Group, + Resource: key.Resource, + resourceVersion: &resourceVersion{ + ResourceVersion: nextRV, + }, + }) + if err != nil { + return 0, fmt.Errorf("increase resource version: %w", err) + } + + if count, err := res.RowsAffected(); err != nil || count == 0 { + return 0, fmt.Errorf("increase resource version did not affect any rows: %w", err) + } + + // 3. Retun the incremended value + return nextRV, nil +} + +// exec uses `req` as input for a non-data returning query generated with +// `tmpl`, and executed in `x`. 
// exec uses `req` as input for a non-data returning query generated with
// `tmpl`, and executed in `x`. Any execution error is wrapped in a SQLError
// carrying the rendered query and its arguments for debugging.
func exec(ctx context.Context, x db.ContextExecer, tmpl *template.Template, req sqltemplate.SQLTemplateIface) (sql.Result, error) {
	if err := req.Validate(); err != nil {
		return nil, fmt.Errorf("exec: invalid request for template %q: %w",
			tmpl.Name(), err)
	}

	rawQuery, err := sqltemplate.Execute(tmpl, req)
	if err != nil {
		return nil, fmt.Errorf("execute template: %w", err)
	}
	query := sqltemplate.FormatSQL(rawQuery)

	res, err := x.ExecContext(ctx, query, req.GetArgs()...)
	if err != nil {
		return nil, SQLError{
			Err:          err,
			CallType:     "Exec",
			TemplateName: tmpl.Name(),
			arguments:    req.GetArgs(),
			Query:        query,
			RawQuery:     rawQuery,
		}
	}

	return res, nil
}

// queryRow uses `req` as input and output for a single-row returning query
// generated with `tmpl`, and executed in `x`. The scanned row is converted to
// the result type T via req.Results().
func queryRow[T any](ctx context.Context, x db.ContextExecer, tmpl *template.Template, req sqltemplate.WithResults[T]) (T, error) {
	var zero T

	if err := req.Validate(); err != nil {
		return zero, fmt.Errorf("query: invalid request for template %q: %w",
			tmpl.Name(), err)
	}

	rawQuery, err := sqltemplate.Execute(tmpl, req)
	if err != nil {
		return zero, fmt.Errorf("execute template: %w", err)
	}
	query := sqltemplate.FormatSQL(rawQuery)

	row := x.QueryRowContext(ctx, query, req.GetArgs()...)
	if err := row.Err(); err != nil {
		return zero, SQLError{
			Err:          err,
			CallType:     "QueryRow",
			TemplateName: tmpl.Name(),
			arguments:    req.GetArgs(),
			ScanDest:     req.GetScanDest(),
			Query:        query,
			RawQuery:     rawQuery,
		}
	}

	return scanRow(row, req)
}

// scanner abstracts the Scan method shared by *sql.Row and *sql.Rows so that
// scanRow can accept either.
type scanner interface {
	Scan(dest ...any) error
}

// scanRow is used on *sql.Row and *sql.Rows, and is factored out here not to
+func scanRow[T any](sc scanner, req sqltemplate.WithResults[T]) (zero T, err error) { + if err = sc.Scan(req.GetScanDest()...); err != nil { + return zero, fmt.Errorf("row scan: %w", err) + } + + res, err := req.Results() + if err != nil { + return zero, fmt.Errorf("row results: %w", err) + } + + return res, nil +} diff --git a/pkg/storage/unified/sql/backend_test.go b/pkg/storage/unified/sql/backend_test.go new file mode 100644 index 00000000000..42aa24e8034 --- /dev/null +++ b/pkg/storage/unified/sql/backend_test.go @@ -0,0 +1,254 @@ +package sql + +import ( + "context" + "testing" + + "github.com/grafana/grafana/pkg/infra/db" + "github.com/grafana/grafana/pkg/services/featuremgmt" + "github.com/grafana/grafana/pkg/setting" + "github.com/grafana/grafana/pkg/storage/unified/resource" + "github.com/grafana/grafana/pkg/storage/unified/sql/db/dbimpl" + "github.com/grafana/grafana/pkg/tests/testsuite" + "github.com/stretchr/testify/assert" +) + +func TestMain(m *testing.M) { + testsuite.Run(m) +} + +func TestBackendHappyPath(t *testing.T) { + ctx := context.Background() + dbstore := db.InitTestDB(t) + + rdb, err := dbimpl.ProvideResourceDB(dbstore, setting.NewCfg(), featuremgmt.WithFeatures(featuremgmt.FlagUnifiedStorage), nil) + assert.NoError(t, err) + store, err := NewBackendStore(backendOptions{ + DB: rdb, + }) + + assert.NoError(t, err) + assert.NotNil(t, store) + + stream, err := store.WatchWriteEvents(ctx) + assert.NoError(t, err) + + t.Run("Add 3 resources", func(t *testing.T) { + rv, err := writeEvent(ctx, store, "item1", resource.WatchEvent_ADDED) + assert.NoError(t, err) + assert.Equal(t, int64(1), rv) + + rv, err = writeEvent(ctx, store, "item2", resource.WatchEvent_ADDED) + assert.NoError(t, err) + assert.Equal(t, int64(2), rv) + + rv, err = writeEvent(ctx, store, "item3", resource.WatchEvent_ADDED) + assert.NoError(t, err) + assert.Equal(t, int64(3), rv) + }) + + t.Run("Update item2", func(t *testing.T) { + rv, err := writeEvent(ctx, store, "item2", 
resource.WatchEvent_MODIFIED) + assert.NoError(t, err) + assert.Equal(t, int64(4), rv) + }) + + t.Run("Delete item1", func(t *testing.T) { + rv, err := writeEvent(ctx, store, "item1", resource.WatchEvent_DELETED) + assert.NoError(t, err) + assert.Equal(t, int64(5), rv) + }) + + t.Run("Read latest item 2", func(t *testing.T) { + resp, err := store.Read(ctx, &resource.ReadRequest{Key: resourceKey("item2")}) + assert.NoError(t, err) + assert.Equal(t, int64(4), resp.ResourceVersion) + assert.Equal(t, "item2 MODIFIED", string(resp.Value)) + }) + + t.Run("Read early verion of item2", func(t *testing.T) { + resp, err := store.Read(ctx, &resource.ReadRequest{ + Key: resourceKey("item2"), + ResourceVersion: 3, // item2 was created at rv=2 and updated at rv=4 + }) + assert.NoError(t, err) + assert.Equal(t, int64(2), resp.ResourceVersion) + assert.Equal(t, "item2 ADDED", string(resp.Value)) + }) + + t.Run("PrepareList latest", func(t *testing.T) { + resp, err := store.PrepareList(ctx, &resource.ListRequest{}) + assert.NoError(t, err) + assert.Len(t, resp.Items, 2) + assert.Equal(t, "item2 MODIFIED", string(resp.Items[0].Value)) + assert.Equal(t, "item3 ADDED", string(resp.Items[1].Value)) + assert.Equal(t, int64(4), resp.ResourceVersion) + }) + + t.Run("Watch events", func(t *testing.T) { + event := <-stream + assert.Equal(t, "item1", event.Key.Name) + assert.Equal(t, int64(1), event.ResourceVersion) + assert.Equal(t, resource.WatchEvent_ADDED, event.Type) + event = <-stream + assert.Equal(t, "item2", event.Key.Name) + assert.Equal(t, int64(2), event.ResourceVersion) + assert.Equal(t, resource.WatchEvent_ADDED, event.Type) + + event = <-stream + assert.Equal(t, "item3", event.Key.Name) + assert.Equal(t, int64(3), event.ResourceVersion) + assert.Equal(t, resource.WatchEvent_ADDED, event.Type) + + event = <-stream + assert.Equal(t, "item2", event.Key.Name) + assert.Equal(t, int64(4), event.ResourceVersion) + assert.Equal(t, resource.WatchEvent_MODIFIED, event.Type) + + event = 
<-stream + assert.Equal(t, "item1", event.Key.Name) + assert.Equal(t, int64(5), event.ResourceVersion) + assert.Equal(t, resource.WatchEvent_DELETED, event.Type) + }) +} + +func TestBackendWatchWriteEventsFromLastest(t *testing.T) { + ctx := context.Background() + dbstore := db.InitTestDB(t) + + rdb, err := dbimpl.ProvideResourceDB(dbstore, setting.NewCfg(), featuremgmt.WithFeatures(featuremgmt.FlagUnifiedStorage), nil) + assert.NoError(t, err) + store, err := NewBackendStore(backendOptions{ + DB: rdb, + }) + + assert.NoError(t, err) + assert.NotNil(t, store) + + // Create a few resources before initing the watch + _, err = writeEvent(ctx, store, "item1", resource.WatchEvent_ADDED) + assert.NoError(t, err) + + // Start the watch + stream, err := store.WatchWriteEvents(ctx) + assert.NoError(t, err) + + // Create one more event + _, err = writeEvent(ctx, store, "item2", resource.WatchEvent_ADDED) + assert.NoError(t, err) + assert.Equal(t, "item2", (<-stream).Key.Name) +} + +func TestBackendPrepareList(t *testing.T) { + ctx := context.Background() + dbstore := db.InitTestDB(t) + + rdb, err := dbimpl.ProvideResourceDB(dbstore, setting.NewCfg(), featuremgmt.WithFeatures(featuremgmt.FlagUnifiedStorage), nil) + assert.NoError(t, err) + store, err := NewBackendStore(backendOptions{ + DB: rdb, + }) + + assert.NoError(t, err) + assert.NotNil(t, store) + + // Create a few resources before initing the watch + _, _ = writeEvent(ctx, store, "item1", resource.WatchEvent_ADDED) // rv=1 + _, _ = writeEvent(ctx, store, "item2", resource.WatchEvent_ADDED) // rv=2 - will be modified at rv=6 + _, _ = writeEvent(ctx, store, "item3", resource.WatchEvent_ADDED) // rv=3 - will be deleted at rv=7 + _, _ = writeEvent(ctx, store, "item4", resource.WatchEvent_ADDED) // rv=4 + _, _ = writeEvent(ctx, store, "item5", resource.WatchEvent_ADDED) // rv=5 + _, _ = writeEvent(ctx, store, "item2", resource.WatchEvent_MODIFIED) // rv=6 + _, _ = writeEvent(ctx, store, "item3", 
resource.WatchEvent_DELETED) // rv=7 + _, _ = writeEvent(ctx, store, "item6", resource.WatchEvent_ADDED) // rv=8 + t.Run("fetch all latest", func(t *testing.T) { + res, err := store.PrepareList(ctx, &resource.ListRequest{}) + assert.NoError(t, err) + assert.Len(t, res.Items, 5) + assert.Empty(t, res.NextPageToken) + }) + + t.Run("list latest first page ", func(t *testing.T) { + res, err := store.PrepareList(ctx, &resource.ListRequest{ + Limit: 3, + }) + assert.NoError(t, err) + assert.Len(t, res.Items, 3) + continueToken, err := GetContinueToken(res.NextPageToken) + assert.NoError(t, err) + assert.Equal(t, int64(8), continueToken.ResourceVersion) + assert.Equal(t, int64(3), continueToken.StartOffset) + }) + + t.Run("list at revision", func(t *testing.T) { + res, err := store.PrepareList(ctx, &resource.ListRequest{ + ResourceVersion: 4, + }) + assert.NoError(t, err) + assert.Len(t, res.Items, 4) + assert.Equal(t, "item1 ADDED", string(res.Items[0].Value)) + assert.Equal(t, "item2 ADDED", string(res.Items[1].Value)) + assert.Equal(t, "item3 ADDED", string(res.Items[2].Value)) + assert.Equal(t, "item4 ADDED", string(res.Items[3].Value)) + assert.Empty(t, res.NextPageToken) + }) + + t.Run("fetch first page at revision with limit", func(t *testing.T) { + res, err := store.PrepareList(ctx, &resource.ListRequest{ + Limit: 3, + ResourceVersion: 7, + }) + assert.NoError(t, err) + assert.Len(t, res.Items, 3) + assert.Equal(t, "item1 ADDED", string(res.Items[0].Value)) + assert.Equal(t, "item4 ADDED", string(res.Items[1].Value)) + assert.Equal(t, "item5 ADDED", string(res.Items[2].Value)) + + continueToken, err := GetContinueToken(res.NextPageToken) + assert.NoError(t, err) + assert.Equal(t, int64(7), continueToken.ResourceVersion) + assert.Equal(t, int64(3), continueToken.StartOffset) + }) + + t.Run("fetch second page at revision", func(t *testing.T) { + continueToken := &ContinueToken{ + ResourceVersion: 8, + StartOffset: 2, + } + res, err := store.PrepareList(ctx, 
&resource.ListRequest{ + NextPageToken: continueToken.String(), + Limit: 2, + }) + assert.NoError(t, err) + assert.Len(t, res.Items, 2) + assert.Equal(t, "item5 ADDED", string(res.Items[0].Value)) + assert.Equal(t, "item2 MODIFIED", string(res.Items[1].Value)) + + continueToken, err = GetContinueToken(res.NextPageToken) + assert.NoError(t, err) + assert.Equal(t, int64(8), continueToken.ResourceVersion) + assert.Equal(t, int64(4), continueToken.StartOffset) + }) +} + +func writeEvent(ctx context.Context, store *backend, name string, action resource.WatchEvent_Type) (int64, error) { + return store.WriteEvent(ctx, resource.WriteEvent{ + Type: action, + Value: []byte(name + " " + resource.WatchEvent_Type_name[int32(action)]), + Key: &resource.ResourceKey{ + Namespace: "namespace", + Group: "group", + Resource: "resource", + Name: name, + }, + }) +} + +func resourceKey(name string) *resource.ResourceKey { + return &resource.ResourceKey{ + Namespace: "namespace", + Group: "group", + Resource: "resource", + Name: name, + } +} diff --git a/pkg/storage/unified/sql/continue.go b/pkg/storage/unified/sql/continue.go new file mode 100644 index 00000000000..9e92e8eedd0 --- /dev/null +++ b/pkg/storage/unified/sql/continue.go @@ -0,0 +1,32 @@ +package sql + +import ( + "encoding/base64" + "encoding/json" + "fmt" +) + +type ContinueToken struct { + StartOffset int64 `json:"o"` + ResourceVersion int64 `json:"v"` +} + +func (c *ContinueToken) String() string { + b, _ := json.Marshal(c) + return base64.StdEncoding.EncodeToString(b) +} + +func GetContinueToken(token string) (*ContinueToken, error) { + continueVal, err := base64.StdEncoding.DecodeString(token) + if err != nil { + return nil, fmt.Errorf("error decoding continue token") + } + + t := &ContinueToken{} + err = json.Unmarshal(continueVal, t) + if err != nil { + return nil, err + } + + return t, nil +} diff --git a/pkg/storage/unified/sql/data/resource_delete.sql b/pkg/storage/unified/sql/data/resource_delete.sql new file mode 
100644 index 00000000000..d28ae687f87 --- /dev/null +++ b/pkg/storage/unified/sql/data/resource_delete.sql @@ -0,0 +1,7 @@ +DELETE FROM {{ .Ident "resource" }} + WHERE 1 = 1 + AND {{ .Ident "namespace" }} = {{ .Arg .WriteEvent.Key.Namespace }} + AND {{ .Ident "group" }} = {{ .Arg .WriteEvent.Key.Group }} + AND {{ .Ident "resource" }} = {{ .Arg .WriteEvent.Key.Resource }} + AND {{ .Ident "name" }} = {{ .Arg .WriteEvent.Key.Name }} +; diff --git a/pkg/storage/unified/sql/data/resource_history_insert.sql b/pkg/storage/unified/sql/data/resource_history_insert.sql new file mode 100644 index 00000000000..018b65739d8 --- /dev/null +++ b/pkg/storage/unified/sql/data/resource_history_insert.sql @@ -0,0 +1,23 @@ +INSERT INTO {{ .Ident "resource_history" }} + ( + {{ .Ident "guid" }}, + {{ .Ident "group" }}, + {{ .Ident "resource" }}, + {{ .Ident "namespace" }}, + {{ .Ident "name" }}, + + {{ .Ident "value" }}, + {{ .Ident "action" }} + ) + + VALUES ( + {{ .Arg .GUID }}, + {{ .Arg .WriteEvent.Key.Group }}, + {{ .Arg .WriteEvent.Key.Resource }}, + {{ .Arg .WriteEvent.Key.Namespace }}, + {{ .Arg .WriteEvent.Key.Name }}, + + {{ .Arg .WriteEvent.Value }}, + {{ .Arg .WriteEvent.Type }} + ) +; diff --git a/pkg/storage/unified/sql/data/resource_history_list.sql b/pkg/storage/unified/sql/data/resource_history_list.sql new file mode 100644 index 00000000000..9974863a5bb --- /dev/null +++ b/pkg/storage/unified/sql/data/resource_history_list.sql @@ -0,0 +1,32 @@ +SELECT + kv.{{ .Ident "resource_version" | .Into .Response.ResourceVersion }}, + {{ .Ident "value" | .Into .Response.Value }} + FROM {{ .Ident "resource_history" }} as kv + JOIN ( + SELECT {{ .Ident "guid" }}, max({{ .Ident "resource_version" }}) AS {{ .Ident "resource_version" }} + FROM {{ .Ident "resource_history" }} AS mkv + WHERE 1 = 1 + AND {{ .Ident "resource_version" }} <= {{ .Arg .Request.ResourceVersion }} + {{ if and .Request.Options .Request.Options.Key }} + {{ if .Request.Options.Key.Namespace }} + AND {{ .Ident 
"namespace" }} = {{ .Arg .Request.Options.Key.Namespace }} + {{ end }} + {{ if .Request.Options.Key.Group }} + AND {{ .Ident "group" }} = {{ .Arg .Request.Options.Key.Group }} + {{ end }} + {{ if .Request.Options.Key.Resource }} + AND {{ .Ident "resource" }} = {{ .Arg .Request.Options.Key.Resource }} + {{ end }} + {{ if .Request.Options.Key.Name }} + AND {{ .Ident "name" }} = {{ .Arg .Request.Options.Key.Name }} + {{ end }} + {{ end }} + GROUP BY mkv.{{ .Ident "namespace" }}, mkv.{{ .Ident "group" }}, mkv.{{ .Ident "resource" }}, mkv.{{ .Ident "name" }} + ) AS maxkv + ON maxkv.{{ .Ident "guid" }} = kv.{{ .Ident "guid" }} + WHERE kv.{{ .Ident "action" }} != 3 + ORDER BY kv.{{ .Ident "resource_version" }} ASC + {{ if (gt .Request.Limit 0) }} + LIMIT {{ .Arg .Request.Offset }}, {{ .Arg .Request.Limit }} + {{ end }} +; diff --git a/pkg/storage/unified/sql/data/resource_history_poll.sql b/pkg/storage/unified/sql/data/resource_history_poll.sql new file mode 100644 index 00000000000..d2b6dca8e2b --- /dev/null +++ b/pkg/storage/unified/sql/data/resource_history_poll.sql @@ -0,0 +1,12 @@ +SELECT + {{ .Ident "resource_version" | .Into .Response.ResourceVersion }}, + {{ .Ident "namespace" | .Into .Response.Key.Namespace }}, + {{ .Ident "group" | .Into .Response.Key.Group }}, + {{ .Ident "resource" | .Into .Response.Key.Resource }}, + {{ .Ident "name" | .Into .Response.Key.Name }}, + {{ .Ident "value" | .Into .Response.Value }}, + {{ .Ident "action" | .Into .Response.Action }} + + FROM {{ .Ident "resource_history" }} + WHERE {{ .Ident "resource_version" }} > {{ .Arg .SinceResourceVersion }} +; diff --git a/pkg/storage/unified/sql/data/resource_history_read.sql b/pkg/storage/unified/sql/data/resource_history_read.sql new file mode 100644 index 00000000000..e7e4e0561a5 --- /dev/null +++ b/pkg/storage/unified/sql/data/resource_history_read.sql @@ -0,0 +1,17 @@ +SELECT + {{ .Ident "resource_version" | .Into .ResourceVersion }}, + {{ .Ident "value" | .Into .Value }} + + FROM {{ 
.Ident "resource_history" }} + + WHERE 1 = 1 + AND {{ .Ident "namespace" }} = {{ .Arg .Request.Key.Namespace }} + AND {{ .Ident "group" }} = {{ .Arg .Request.Key.Group }} + AND {{ .Ident "resource" }} = {{ .Arg .Request.Key.Resource }} + AND {{ .Ident "name" }} = {{ .Arg .Request.Key.Name }} + {{ if gt .Request.ResourceVersion 0 }} + AND {{ .Ident "resource_version" }} <= {{ .Arg .Request.ResourceVersion }} + {{ end }} + ORDER BY {{ .Ident "resource_version" }} DESC + LIMIT 1 +; diff --git a/pkg/storage/unified/sql/data/resource_history_update_rv.sql b/pkg/storage/unified/sql/data/resource_history_update_rv.sql new file mode 100644 index 00000000000..c37ed2fbe8f --- /dev/null +++ b/pkg/storage/unified/sql/data/resource_history_update_rv.sql @@ -0,0 +1,4 @@ +UPDATE {{ .Ident "resource_history" }} + SET {{ .Ident "resource_version" }} = {{ .Arg .ResourceVersion }} + WHERE {{ .Ident "guid" }} = {{ .Arg .GUID }} +; diff --git a/pkg/storage/unified/sql/data/resource_insert.sql b/pkg/storage/unified/sql/data/resource_insert.sql new file mode 100644 index 00000000000..e127901ae50 --- /dev/null +++ b/pkg/storage/unified/sql/data/resource_insert.sql @@ -0,0 +1,23 @@ +INSERT INTO {{ .Ident "resource" }} + + ( + {{ .Ident "guid" }}, + {{ .Ident "group" }}, + {{ .Ident "resource" }}, + {{ .Ident "namespace" }}, + {{ .Ident "name" }}, + + {{ .Ident "value" }}, + {{ .Ident "action" }} + ) + VALUES ( + {{ .Arg .GUID }}, + {{ .Arg .WriteEvent.Key.Group }}, + {{ .Arg .WriteEvent.Key.Resource }}, + {{ .Arg .WriteEvent.Key.Namespace }}, + {{ .Arg .WriteEvent.Key.Name }}, + + {{ .Arg .WriteEvent.Value }}, + {{ .Arg .WriteEvent.Type }} + ) +; diff --git a/pkg/storage/unified/sql/data/resource_list.sql b/pkg/storage/unified/sql/data/resource_list.sql new file mode 100644 index 00000000000..7b24980e6a9 --- /dev/null +++ b/pkg/storage/unified/sql/data/resource_list.sql @@ -0,0 +1,24 @@ +SELECT + {{ .Ident "resource_version" | .Into .Response.ResourceVersion }}, + {{ .Ident "value" | .Into 
.Response.Value }} + FROM {{ .Ident "resource" }} + WHERE 1 = 1 + {{ if and .Request.Options .Request.Options.Key }} + {{ if .Request.Options.Key.Namespace }} + AND {{ .Ident "namespace" }} = {{ .Arg .Request.Options.Key.Namespace }} + {{ end }} + {{ if .Request.Options.Key.Group }} + AND {{ .Ident "group" }} = {{ .Arg .Request.Options.Key.Group }} + {{ end }} + {{ if .Request.Options.Key.Resource }} + AND {{ .Ident "resource" }} = {{ .Arg .Request.Options.Key.Resource }} + {{ end }} + {{ if .Request.Options.Key.Name }} + AND {{ .Ident "name" }} = {{ .Arg .Request.Options.Key.Name }} + {{ end }} + {{ end }} + ORDER BY {{ .Ident "resource_version" }} DESC + {{ if (gt .Request.Limit 0) }} + LIMIT {{ .Arg .Request.Limit }} + {{ end }} +; diff --git a/pkg/storage/unified/sql/data/resource_read.sql b/pkg/storage/unified/sql/data/resource_read.sql new file mode 100644 index 00000000000..410bf5f1e74 --- /dev/null +++ b/pkg/storage/unified/sql/data/resource_read.sql @@ -0,0 +1,10 @@ +SELECT + {{ .Ident "resource_version" | .Into .ResourceVersion }}, + {{ .Ident "value" | .Into .Value }} + FROM {{ .Ident "resource" }} + WHERE 1 = 1 + AND {{ .Ident "namespace" }} = {{ .Arg .Request.Key.Namespace }} + AND {{ .Ident "group" }} = {{ .Arg .Request.Key.Group }} + AND {{ .Ident "resource" }} = {{ .Arg .Request.Key.Resource }} + AND {{ .Ident "name" }} = {{ .Arg .Request.Key.Name }} +; diff --git a/pkg/storage/unified/sql/data/resource_update.sql b/pkg/storage/unified/sql/data/resource_update.sql new file mode 100644 index 00000000000..47ab9fabd5e --- /dev/null +++ b/pkg/storage/unified/sql/data/resource_update.sql @@ -0,0 +1,11 @@ +UPDATE {{ .Ident "resource" }} + SET + {{ .Ident "guid" }} = {{ .Arg .GUID }}, + {{ .Ident "value" }} = {{ .Arg .WriteEvent.Value }}, + {{ .Ident "action" }} = {{ .Arg .WriteEvent.Type }} + WHERE 1 = 1 + AND {{ .Ident "group" }} = {{ .Arg .WriteEvent.Key.Group }} + AND {{ .Ident "resource" }} = {{ .Arg .WriteEvent.Key.Resource }} + AND {{ .Ident 
"namespace" }} = {{ .Arg .WriteEvent.Key.Namespace }} + AND {{ .Ident "name" }} = {{ .Arg .WriteEvent.Key.Name }} +; diff --git a/pkg/storage/unified/sql/data/resource_update_rv.sql b/pkg/storage/unified/sql/data/resource_update_rv.sql new file mode 100644 index 00000000000..33c97a9e17a --- /dev/null +++ b/pkg/storage/unified/sql/data/resource_update_rv.sql @@ -0,0 +1,4 @@ +UPDATE {{ .Ident "resource" }} + SET {{ .Ident "resource_version" }} = {{ .Arg .ResourceVersion }} + WHERE {{ .Ident "guid" }} = {{ .Arg .GUID }} +; diff --git a/pkg/storage/unified/sql/data/resource_version_get.sql b/pkg/storage/unified/sql/data/resource_version_get.sql new file mode 100644 index 00000000000..95d10d915a0 --- /dev/null +++ b/pkg/storage/unified/sql/data/resource_version_get.sql @@ -0,0 +1,8 @@ +SELECT + {{ .Ident "resource_version" | .Into .ResourceVersion }} + FROM {{ .Ident "resource_version" }} + WHERE 1 = 1 + AND {{ .Ident "group" }} = {{ .Arg .Group }} + AND {{ .Ident "resource" }} = {{ .Arg .Resource }} + {{ .SelectFor "UPDATE" }} +; diff --git a/pkg/storage/unified/sql/data/resource_version_inc.sql b/pkg/storage/unified/sql/data/resource_version_inc.sql new file mode 100644 index 00000000000..e7bf52fd1eb --- /dev/null +++ b/pkg/storage/unified/sql/data/resource_version_inc.sql @@ -0,0 +1,7 @@ +UPDATE {{ .Ident "resource_version" }} +SET + {{ .Ident "resource_version" }} = {{ .Arg .ResourceVersion}} +WHERE 1 = 1 + AND {{ .Ident "group" }} = {{ .Arg .Group }} + AND {{ .Ident "resource" }} = {{ .Arg .Resource }} +; diff --git a/pkg/storage/unified/sql/data/resource_version_insert.sql b/pkg/storage/unified/sql/data/resource_version_insert.sql new file mode 100644 index 00000000000..6c2342905da --- /dev/null +++ b/pkg/storage/unified/sql/data/resource_version_insert.sql @@ -0,0 +1,13 @@ +INSERT INTO {{ .Ident "resource_version" }} + ( + {{ .Ident "group" }}, + {{ .Ident "resource" }}, + {{ .Ident "resource_version" }} + ) + + VALUES ( + {{ .Arg .Group }}, + {{ .Arg .Resource 
}}, + 1 + ) +; diff --git a/pkg/storage/unified/sql/db/dbimpl/db.go b/pkg/storage/unified/sql/db/dbimpl/db.go new file mode 100644 index 00000000000..625ddd9bcec --- /dev/null +++ b/pkg/storage/unified/sql/db/dbimpl/db.go @@ -0,0 +1,59 @@ +package dbimpl + +import ( + "context" + "database/sql" + "fmt" + + resourcedb "github.com/grafana/grafana/pkg/storage/unified/sql/db" +) + +func NewDB(d *sql.DB, driverName string) resourcedb.DB { + return sqldb{ + DB: d, + driverName: driverName, + } +} + +type sqldb struct { + *sql.DB + driverName string +} + +func (d sqldb) DriverName() string { + return d.driverName +} + +func (d sqldb) BeginTx(ctx context.Context, opts *sql.TxOptions) (resourcedb.Tx, error) { + t, err := d.DB.BeginTx(ctx, opts) + if err != nil { + return nil, err + } + return tx{ + Tx: t, + }, nil +} + +func (d sqldb) WithTx(ctx context.Context, opts *sql.TxOptions, f resourcedb.TxFunc) error { + t, err := d.BeginTx(ctx, opts) + if err != nil { + return fmt.Errorf("begin tx: %w", err) + } + + if err := f(ctx, t); err != nil { + if rollbackErr := t.Rollback(); rollbackErr != nil { + return fmt.Errorf("tx err: %w; rollback err: %w", err, rollbackErr) + } + return fmt.Errorf("tx err: %w", err) + } + + if err = t.Commit(); err != nil { + return fmt.Errorf("commit err: %w", err) + } + + return nil +} + +type tx struct { + *sql.Tx +} diff --git a/pkg/storage/unified/sql/db/dbimpl/dbEngine.go b/pkg/storage/unified/sql/db/dbimpl/dbEngine.go new file mode 100644 index 00000000000..5b1fb09462d --- /dev/null +++ b/pkg/storage/unified/sql/db/dbimpl/dbEngine.go @@ -0,0 +1,105 @@ +package dbimpl + +import ( + "cmp" + "fmt" + "strings" + "time" + + "github.com/go-sql-driver/mysql" + "xorm.io/xorm" + + "github.com/grafana/grafana/pkg/infra/tracing" + "github.com/grafana/grafana/pkg/services/store/entity/db" +) + +func getEngineMySQL(getter *sectionGetter, _ tracing.Tracer) (*xorm.Engine, error) { + config := mysql.NewConfig() + config.User = getter.String("db_user") + 
config.Passwd = getter.String("db_pass") + config.Net = "tcp" + config.Addr = getter.String("db_host") + config.DBName = getter.String("db_name") + config.Params = map[string]string{ + // See: https://dev.mysql.com/doc/refman/en/sql-mode.html + "@@SESSION.sql_mode": "ANSI", + } + config.Collation = "utf8mb4_unicode_ci" + config.Loc = time.UTC + config.AllowNativePasswords = true + config.ClientFoundRows = true + + // TODO: do we want to support these? + // config.ServerPubKey = getter.String("db_server_pub_key") + // config.TLSConfig = getter.String("db_tls_config_name") + + if err := getter.Err(); err != nil { + return nil, fmt.Errorf("config error: %w", err) + } + + if strings.HasPrefix(config.Addr, "/") { + config.Net = "unix" + } + + // FIXME: get rid of xorm + engine, err := xorm.NewEngine(db.DriverMySQL, config.FormatDSN()) + if err != nil { + return nil, fmt.Errorf("open database: %w", err) + } + + engine.SetMaxOpenConns(0) + engine.SetMaxIdleConns(2) + engine.SetConnMaxLifetime(4 * time.Hour) + + return engine, nil +} + +func getEnginePostgres(getter *sectionGetter, _ tracing.Tracer) (*xorm.Engine, error) { + dsnKV := map[string]string{ + "user": getter.String("db_user"), + "password": getter.String("db_pass"), + "dbname": getter.String("db_name"), + "sslmode": cmp.Or(getter.String("db_sslmode"), "disable"), + } + + // TODO: probably interesting: + // "passfile", "statement_timeout", "lock_timeout", "connect_timeout" + + // TODO: for CockroachDB, we probably need to use the following: + // dsnKV["options"] = "-c enable_experimental_alter_column_type_general=true" + // Or otherwise specify it as: + // dsnKV["enable_experimental_alter_column_type_general"] = "true" + + // TODO: do we want to support these options in the DSN as well? 
+ // "sslkey", "sslcert", "sslrootcert", "sslpassword", "sslsni", "krbspn", + // "krbsrvname", "target_session_attrs", "service", "servicefile" + + // More on Postgres connection string parameters: + // https://www.postgresql.org/docs/current/libpq-connect.html#LIBPQ-CONNSTRING + + hostport := getter.String("db_host") + + if err := getter.Err(); err != nil { + return nil, fmt.Errorf("config error: %w", err) + } + + host, port, err := splitHostPortDefault(hostport, "127.0.0.1", "5432") + if err != nil { + return nil, fmt.Errorf("invalid db_host: %w", err) + } + dsnKV["host"] = host + dsnKV["port"] = port + + dsn, err := MakeDSN(dsnKV) + if err != nil { + return nil, fmt.Errorf("error building DSN: %w", err) + } + + // FIXME: get rid of xorm + engine, err := xorm.NewEngine(db.DriverPostgres, dsn) + if err != nil { + return nil, fmt.Errorf("open database: %w", err) + } + + return engine, nil +} diff --git a/pkg/storage/unified/sql/db/dbimpl/dbEngine_test.go b/pkg/storage/unified/sql/db/dbimpl/dbEngine_test.go new file mode 100644 index 00000000000..8c3a77412c0 --- /dev/null +++ b/pkg/storage/unified/sql/db/dbimpl/dbEngine_test.go @@ -0,0 +1,92 @@ +package dbimpl + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestGetEngineMySQLFromConfig(t *testing.T) { + t.Parallel() + + t.Run("happy path", func(t *testing.T) { + t.Parallel() + + getter := newTestSectionGetter(map[string]string{ + "db_type": "mysql", + "db_host": "/var/run/mysql.socket", + "db_name": "grafana", + "db_user": "user", + "db_password": "password", + }) + engine, err := getEngineMySQL(getter, nil) + assert.NotNil(t, engine) + assert.NoError(t, err) + }) + + t.Run("invalid string", func(t *testing.T) { + t.Parallel() + + getter := newTestSectionGetter(map[string]string{ + "db_type": "mysql", + "db_host": "/var/run/mysql.socket", + "db_name": string(invalidUTF8ByteSequence), + "db_user": "user", + "db_password": "password", + }) + engine, err := getEngineMySQL(getter, nil) + 
assert.Nil(t, engine) + assert.Error(t, err) + assert.ErrorIs(t, err, ErrInvalidUTF8Sequence) + }) +} + +func TestGetEnginePostgresFromConfig(t *testing.T) { + t.Parallel() + + t.Run("happy path", func(t *testing.T) { + t.Parallel() + getter := newTestSectionGetter(map[string]string{ + "db_type": "mysql", + "db_host": "localhost", + "db_name": "grafana", + "db_user": "user", + "db_password": "password", + }) + engine, err := getEnginePostgres(getter, nil) + + assert.NotNil(t, engine) + assert.NoError(t, err) + }) + + t.Run("invalid string", func(t *testing.T) { + t.Parallel() + getter := newTestSectionGetter(map[string]string{ + "db_type": "mysql", + "db_host": string(invalidUTF8ByteSequence), + "db_name": "grafana", + "db_user": "user", + "db_password": "password", + }) + engine, err := getEnginePostgres(getter, nil) + + assert.Nil(t, engine) + assert.Error(t, err) + assert.ErrorIs(t, err, ErrInvalidUTF8Sequence) + }) + + t.Run("invalid hostport", func(t *testing.T) { + t.Parallel() + getter := newTestSectionGetter(map[string]string{ + "db_type": "mysql", + "db_host": "1:1:1", + "db_name": "grafana", + "db_user": "user", + "db_password": "password", + }) + engine, err := getEnginePostgres(getter, nil) + + assert.Nil(t, engine) + assert.Error(t, err) + }) +} diff --git a/pkg/storage/unified/sql/db/dbimpl/db_test.go b/pkg/storage/unified/sql/db/dbimpl/db_test.go new file mode 100644 index 00000000000..7e7ff659bbb --- /dev/null +++ b/pkg/storage/unified/sql/db/dbimpl/db_test.go @@ -0,0 +1,154 @@ +package dbimpl + +import ( + "context" + "errors" + "testing" + "time" + + sqlmock "github.com/DATA-DOG/go-sqlmock" + "github.com/stretchr/testify/require" + + resourcedb "github.com/grafana/grafana/pkg/storage/unified/sql/db" +) + +func newCtx(t *testing.T) context.Context { + t.Helper() + + d, ok := t.Deadline() + if !ok { + // provide a default timeout for tests + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + t.Cleanup(cancel) + + return ctx + 
} + + ctx, cancel := context.WithDeadline(context.Background(), d) + t.Cleanup(cancel) + + return ctx +} + +var errTest = errors.New("because of reasons") + +const driverName = "sqlmock" + +func TestDB_BeginTx(t *testing.T) { + t.Parallel() + + t.Run("happy path", func(t *testing.T) { + t.Parallel() + + sqldb, mock, err := sqlmock.New() + require.NoError(t, err) + db := NewDB(sqldb, driverName) + require.Equal(t, driverName, db.DriverName()) + + mock.ExpectBegin() + tx, err := db.BeginTx(newCtx(t), nil) + + require.NoError(t, err) + require.NotNil(t, tx) + }) + + t.Run("fail begin", func(t *testing.T) { + t.Parallel() + + sqldb, mock, err := sqlmock.New() + require.NoError(t, err) + db := NewDB(sqldb, "sqlmock") + + mock.ExpectBegin().WillReturnError(errTest) + tx, err := db.BeginTx(newCtx(t), nil) + + require.Nil(t, tx) + require.Error(t, err) + require.ErrorIs(t, err, errTest) + }) +} + +func TestDB_WithTx(t *testing.T) { + t.Parallel() + + newTxFunc := func(err error) resourcedb.TxFunc { + return func(context.Context, resourcedb.Tx) error { + return err + } + } + + t.Run("happy path", func(t *testing.T) { + t.Parallel() + + sqldb, mock, err := sqlmock.New() + require.NoError(t, err) + db := NewDB(sqldb, "sqlmock") + + mock.ExpectBegin() + mock.ExpectCommit() + err = db.WithTx(newCtx(t), nil, newTxFunc(nil)) + + require.NoError(t, err) + }) + + t.Run("fail begin", func(t *testing.T) { + t.Parallel() + + sqldb, mock, err := sqlmock.New() + require.NoError(t, err) + db := NewDB(sqldb, "sqlmock") + + mock.ExpectBegin().WillReturnError(errTest) + err = db.WithTx(newCtx(t), nil, newTxFunc(nil)) + + require.Error(t, err) + require.ErrorIs(t, err, errTest) + }) + + t.Run("fail tx", func(t *testing.T) { + t.Parallel() + + sqldb, mock, err := sqlmock.New() + require.NoError(t, err) + db := NewDB(sqldb, "sqlmock") + + mock.ExpectBegin() + mock.ExpectRollback() + err = db.WithTx(newCtx(t), nil, newTxFunc(errTest)) + + require.Error(t, err) + require.ErrorIs(t, err, errTest) 
+ }) + + t.Run("fail tx; fail rollback", func(t *testing.T) { + t.Parallel() + + sqldb, mock, err := sqlmock.New() + require.NoError(t, err) + db := NewDB(sqldb, "sqlmock") + errTest2 := errors.New("yet another err") + + mock.ExpectBegin() + mock.ExpectRollback().WillReturnError(errTest) + err = db.WithTx(newCtx(t), nil, newTxFunc(errTest2)) + + require.Error(t, err) + require.ErrorIs(t, err, errTest) + require.ErrorIs(t, err, errTest2) + }) + + t.Run("fail commit", func(t *testing.T) { + t.Parallel() + + sqldb, mock, err := sqlmock.New() + require.NoError(t, err) + db := NewDB(sqldb, "sqlmock") + + mock.ExpectBegin() + mock.ExpectCommit().WillReturnError(errTest) + err = db.WithTx(newCtx(t), nil, newTxFunc(nil)) + + require.Error(t, err) + require.ErrorIs(t, err, errTest) + }) +} diff --git a/pkg/storage/unified/sql/db/dbimpl/dbimpl.go b/pkg/storage/unified/sql/db/dbimpl/dbimpl.go new file mode 100644 index 00000000000..b6c5703a490 --- /dev/null +++ b/pkg/storage/unified/sql/db/dbimpl/dbimpl.go @@ -0,0 +1,166 @@ +package dbimpl + +import ( + "fmt" + "sync" + + "github.com/dlmiddlecote/sqlstats" + "github.com/jmoiron/sqlx" + "github.com/prometheus/client_golang/prometheus" + "xorm.io/xorm" + + "github.com/grafana/grafana/pkg/infra/db" + "github.com/grafana/grafana/pkg/infra/log" + "github.com/grafana/grafana/pkg/infra/tracing" + "github.com/grafana/grafana/pkg/services/featuremgmt" + "github.com/grafana/grafana/pkg/services/sqlstore/session" + "github.com/grafana/grafana/pkg/setting" + resourcedb "github.com/grafana/grafana/pkg/storage/unified/sql/db" + "github.com/grafana/grafana/pkg/storage/unified/sql/db/migrations" +) + +var _ resourcedb.ResourceDBInterface = (*ResourceDB)(nil) + +func ProvideResourceDB(db db.DB, cfg *setting.Cfg, features featuremgmt.FeatureToggles, tracer tracing.Tracer) (*ResourceDB, error) { + return &ResourceDB{ + db: db, + cfg: cfg, + features: features, + log: log.New("entity-db"), + tracer: tracer, + }, nil +} + +type ResourceDB struct 
{ + once sync.Once + onceErr error + + db db.DB + features featuremgmt.FeatureToggles + engine *xorm.Engine + cfg *setting.Cfg + log log.Logger + tracer tracing.Tracer +} + +func (db *ResourceDB) Init() error { + db.once.Do(func() { + db.onceErr = db.init() + }) + + return db.onceErr +} + +func (db *ResourceDB) GetEngine() (*xorm.Engine, error) { + if err := db.Init(); err != nil { + return nil, err + } + + return db.engine, db.onceErr +} + +func (db *ResourceDB) init() error { + if db.engine != nil { + return nil + } + + var engine *xorm.Engine + var err error + + // TODO: This should be renamed resource_api + getter := §ionGetter{ + DynamicSection: db.cfg.SectionWithEnvOverrides("resource_api"), + } + + dbType := getter.Key("db_type").MustString("") + + // if explicit connection settings are provided, use them + if dbType != "" { + if dbType == "postgres" { + engine, err = getEnginePostgres(getter, db.tracer) + if err != nil { + return err + } + + // FIXME: this config option is cockroachdb-specific, it's not supported by postgres + // FIXME: this only sets this option for the session that we get + // from the pool right now. 
A *sql.DB is a pool of connections, + // there is no guarantee that the session where this is run will be + // the same where we need to change the type of a column + _, err = engine.Exec("SET SESSION enable_experimental_alter_column_type_general=true") + if err != nil { + db.log.Error("error connecting to postgres", "msg", err.Error()) + // FIXME: return nil, err + } + } else if dbType == "mysql" { + engine, err = getEngineMySQL(getter, db.tracer) + if err != nil { + return err + } + + if err = engine.Ping(); err != nil { + return err + } + } else { + // TODO: sqlite support + return fmt.Errorf("invalid db type specified: %s", dbType) + } + + // register sql stat metrics + if err := prometheus.Register(sqlstats.NewStatsCollector("unified_storage", engine.DB().DB)); err != nil { + db.log.Warn("Failed to register unified storage sql stats collector", "error", err) + } + + // configure sql logging + debugSQL := getter.Key("log_queries").MustBool(false) + if !debugSQL { + engine.SetLogger(&xorm.DiscardLogger{}) + } else { + // add stack to database calls to be able to see what repository initiated queries. Top 7 items from the stack as they are likely in the xorm library. 
+ // engine.SetLogger(sqlstore.NewXormLogger(log.LvlInfo, log.WithSuffix(log.New("sqlstore.xorm"), log.CallerContextKey, log.StackCaller(log.DefaultCallerDepth)))) + engine.ShowSQL(true) + engine.ShowExecTime(true) + } + + // otherwise, try to use the grafana db connection + } else { + if db.db == nil { + return fmt.Errorf("no db connection provided") + } + + engine = db.db.GetEngine() + } + + db.engine = engine + + if err := migrations.MigrateResourceStore(engine, db.cfg, db.features); err != nil { + db.engine = nil + return fmt.Errorf("run migrations: %w", err) + } + + return nil +} + +func (db *ResourceDB) GetSession() (*session.SessionDB, error) { + engine, err := db.GetEngine() + if err != nil { + return nil, err + } + + return session.GetSession(sqlx.NewDb(engine.DB().DB, engine.DriverName())), nil +} + +func (db *ResourceDB) GetCfg() *setting.Cfg { + return db.cfg +} + +func (db *ResourceDB) GetDB() (resourcedb.DB, error) { + engine, err := db.GetEngine() + if err != nil { + return nil, err + } + + ret := NewDB(engine.DB().DB, engine.Dialect().DriverName()) + + return ret, nil +} diff --git a/pkg/storage/unified/sql/db/dbimpl/util.go b/pkg/storage/unified/sql/db/dbimpl/util.go new file mode 100644 index 00000000000..44ff1138236 --- /dev/null +++ b/pkg/storage/unified/sql/db/dbimpl/util.go @@ -0,0 +1,111 @@ +package dbimpl + +import ( + "cmp" + "errors" + "fmt" + "net" + "sort" + "strings" + "unicode/utf8" + + "github.com/grafana/grafana/pkg/setting" +) + +var ( + ErrInvalidUTF8Sequence = errors.New("invalid UTF-8 sequence") +) + +type sectionGetter struct { + *setting.DynamicSection + err error +} + +func (g *sectionGetter) Err() error { + return g.err +} + +func (g *sectionGetter) String(key string) string { + v := g.DynamicSection.Key(key).MustString("") + if !utf8.ValidString(v) { + g.err = fmt.Errorf("value for key %q: %w", key, ErrInvalidUTF8Sequence) + + return "" + } + + return v +} + +// MakeDSN creates a DSN from the given key/value pair. 
It validates the strings +// form valid UTF-8 sequences and escapes values if needed. +func MakeDSN(m map[string]string) (string, error) { + b := new(strings.Builder) + + ks := keys(m) + sort.Strings(ks) // provide deterministic behaviour + for _, k := range ks { + v := m[k] + if !utf8.ValidString(v) { + return "", fmt.Errorf("value for DSN key %q: %w", k, + ErrInvalidUTF8Sequence) + } + if v == "" { + continue + } + + if b.Len() > 0 { + _ = b.WriteByte(' ') + } + _, _ = b.WriteString(k) + _ = b.WriteByte('=') + writeDSNValue(b, v) + } + + return b.String(), nil +} + +func keys(m map[string]string) []string { + ret := make([]string, 0, len(m)) + for k := range m { + ret = append(ret, k) + } + return ret +} + +func writeDSNValue(b *strings.Builder, v string) { + numq := strings.Count(v, `'`) + numb := strings.Count(v, `\`) + if numq+numb == 0 && v != "" { + b.WriteString(v) + + return + } + b.Grow(2 + numq + numb + len(v)) + + _ = b.WriteByte('\'') + for _, r := range v { + if r == '\\' || r == '\'' { + _ = b.WriteByte('\\') + } + _, _ = b.WriteRune(r) + } + _ = b.WriteByte('\'') +} + +// splitHostPortDefault is similar to net.SplitHostPort, but will also accept a +// specification with no port and apply the default port instead. It also +// applies the given defaults if the results are empty strings. 
+func splitHostPortDefault(hostport, defaultHost, defaultPort string) (string, string, error) { + host, port, err := net.SplitHostPort(hostport) + if err != nil { + // try appending the port + host, port, err = net.SplitHostPort(hostport + ":" + defaultPort) + if err != nil { + return "", "", fmt.Errorf("invalid hostport: %q", hostport) + } + } + host = cmp.Or(host, defaultHost) + port = cmp.Or(port, defaultPort) + + return host, port, nil +} diff --git a/pkg/storage/unified/sql/db/dbimpl/util_test.go b/pkg/storage/unified/sql/db/dbimpl/util_test.go new file mode 100644 index 00000000000..cb3a76aeee7 --- /dev/null +++ b/pkg/storage/unified/sql/db/dbimpl/util_test.go @@ -0,0 +1,108 @@ +package dbimpl + +import ( + "fmt" + "testing" + + "github.com/grafana/grafana/pkg/setting" + "github.com/stretchr/testify/require" +) + +var invalidUTF8ByteSequence = []byte{0xff, 0xfe, 0xfd} + +func setSectionKeyValues(section *setting.DynamicSection, m map[string]string) { + for k, v := range m { + section.Key(k).SetValue(v) + } +} + +func newTestSectionGetter(m map[string]string) *sectionGetter { + section := setting.NewCfg().SectionWithEnvOverrides("entity_api") + setSectionKeyValues(section, m) + + return §ionGetter{ + DynamicSection: section, + } +} + +func TestSectionGetter(t *testing.T) { + t.Parallel() + + var ( + key = "the key" + val = string(invalidUTF8ByteSequence) + ) + + g := newTestSectionGetter(map[string]string{ + key: val, + }) + + v := g.String("whatever") + require.Empty(t, v) + require.NoError(t, g.Err()) + + v = g.String(key) + require.Empty(t, v) + require.Error(t, g.Err()) + require.ErrorIs(t, g.Err(), ErrInvalidUTF8Sequence) +} + +func TestMakeDSN(t *testing.T) { + t.Parallel() + + s, err := MakeDSN(map[string]string{ + "db_name": string(invalidUTF8ByteSequence), + }) + require.Empty(t, s) + require.Error(t, err) + require.ErrorIs(t, err, ErrInvalidUTF8Sequence) + + s, err = MakeDSN(map[string]string{ + "skip": "", + "user": `shou'ld esc\ape`, + "pass": 
"noescape", + }) + require.NoError(t, err) + require.Equal(t, `pass=noescape user='shou\'ld esc\\ape'`, s) +} + +func TestSplitHostPort(t *testing.T) { + t.Parallel() + + testCases := []struct { + hostport string + defaultHost string + defaultPort string + fails bool + + host string + port string + }{ + {hostport: "192.168.0.140:456", defaultHost: "", defaultPort: "", host: "192.168.0.140", port: "456"}, + {hostport: "192.168.0.140", defaultHost: "", defaultPort: "123", host: "192.168.0.140", port: "123"}, + {hostport: "[::1]:456", defaultHost: "", defaultPort: "", host: "::1", port: "456"}, + {hostport: "[::1]", defaultHost: "", defaultPort: "123", host: "::1", port: "123"}, + {hostport: ":456", defaultHost: "1.2.3.4", defaultPort: "", host: "1.2.3.4", port: "456"}, + {hostport: "xyz.rds.amazonaws.com", defaultHost: "", defaultPort: "123", host: "xyz.rds.amazonaws.com", port: "123"}, + {hostport: "xyz.rds.amazonaws.com:123", defaultHost: "", defaultPort: "", host: "xyz.rds.amazonaws.com", port: "123"}, + {hostport: "", defaultHost: "localhost", defaultPort: "1433", host: "localhost", port: "1433"}, + {hostport: "1:1:1", fails: true}, + } + + for i, tc := range testCases { + t.Run(fmt.Sprintf("test index #%d", i), func(t *testing.T) { + t.Parallel() + + host, port, err := splitHostPortDefault(tc.hostport, tc.defaultHost, tc.defaultPort) + if tc.fails { + require.Error(t, err) + require.Empty(t, host) + require.Empty(t, port) + } else { + require.NoError(t, err) + require.Equal(t, tc.host, host) + require.Equal(t, tc.port, port) + } + }) + } +} diff --git a/pkg/storage/unified/sql/db/migrations/migrator.go b/pkg/storage/unified/sql/db/migrations/migrator.go new file mode 100644 index 00000000000..f362342639f --- /dev/null +++ b/pkg/storage/unified/sql/db/migrations/migrator.go @@ -0,0 +1,24 @@ +package migrations + +import ( + "xorm.io/xorm" + + "github.com/grafana/grafana/pkg/services/featuremgmt" + "github.com/grafana/grafana/pkg/services/sqlstore/migrator" + 
"github.com/grafana/grafana/pkg/setting" +) + +func MigrateResourceStore(engine *xorm.Engine, cfg *setting.Cfg, features featuremgmt.FeatureToggles) error { + // Skip if feature flag is not enabled + if !features.IsEnabledGlobally(featuremgmt.FlagUnifiedStorage) { + return nil + } + + mg := migrator.NewScopedMigrator(engine, cfg, "resource") + mg.AddCreateMigration() + + initResourceTables(mg) + + // since it's a new feature enable migration locking by default + return mg.Start(true, 0) +} diff --git a/pkg/storage/unified/sql/db/migrations/resource_mig.go b/pkg/storage/unified/sql/db/migrations/resource_mig.go new file mode 100644 index 00000000000..adfd75a0b73 --- /dev/null +++ b/pkg/storage/unified/sql/db/migrations/resource_mig.go @@ -0,0 +1,101 @@ +package migrations + +import ( + "fmt" + + "github.com/grafana/grafana/pkg/services/sqlstore/migrator" +) + +func initResourceTables(mg *migrator.Migrator) string { + marker := "Initialize resource tables" + mg.AddMigration(marker, &migrator.RawSQLMigration{}) + + tables := []migrator.Table{} + tables = append(tables, migrator.Table{ + Name: "resource", + Columns: []*migrator.Column{ + // primary identifier + {Name: "guid", Type: migrator.DB_NVarchar, Length: 36, Nullable: false, IsPrimaryKey: true}, + + {Name: "resource_version", Type: migrator.DB_BigInt, Nullable: true}, + + // K8s Identity group+(version)+namespace+resource+name + {Name: "group", Type: migrator.DB_NVarchar, Length: 190, Nullable: false}, + {Name: "resource", Type: migrator.DB_NVarchar, Length: 190, Nullable: false}, + {Name: "namespace", Type: migrator.DB_NVarchar, Length: 63, Nullable: false}, + {Name: "name", Type: migrator.DB_NVarchar, Length: 190, Nullable: false}, + {Name: "value", Type: migrator.DB_LongText, Nullable: true}, + {Name: "action", Type: migrator.DB_Int, Nullable: false}, // 1: create, 2: update, 3: delete + + // Hashed label set + {Name: "label_set", Type: migrator.DB_NVarchar, Length: 64, Nullable: true}, // null is no labels + 
}, + Indices: []*migrator.Index{ + {Cols: []string{"namespace", "group", "resource", "name"}, Type: migrator.UniqueIndex}, + }, + }) + + tables = append(tables, migrator.Table{ + Name: "resource_history", + Columns: []*migrator.Column{ + // primary identifier + {Name: "guid", Type: migrator.DB_NVarchar, Length: 36, Nullable: false, IsPrimaryKey: true}, + {Name: "resource_version", Type: migrator.DB_BigInt, Nullable: true}, + + // K8s Identity group+(version)+namespace+resource+name + {Name: "group", Type: migrator.DB_NVarchar, Length: 190, Nullable: false}, + {Name: "resource", Type: migrator.DB_NVarchar, Length: 190, Nullable: false}, + {Name: "namespace", Type: migrator.DB_NVarchar, Length: 63, Nullable: false}, + {Name: "name", Type: migrator.DB_NVarchar, Length: 190, Nullable: false}, + {Name: "value", Type: migrator.DB_LongText, Nullable: true}, + {Name: "action", Type: migrator.DB_Int, Nullable: false}, // 1: create, 2: update, 3: delete + + // Hashed label set + {Name: "label_set", Type: migrator.DB_NVarchar, Length: 64, Nullable: true}, // null is no labels + }, + Indices: []*migrator.Index{ + { + Cols: []string{"namespace", "group", "resource", "name", "resource_version"}, + Type: migrator.UniqueIndex, + Name: "UQE_resource_history_namespace_group_name_version", + }, + // index to support watch poller + {Cols: []string{"resource_version"}, Type: migrator.IndexType}, + }, + }) + + // tables = append(tables, migrator.Table{ + // Name: "resource_label_set", + // Columns: []*migrator.Column{ + // {Name: "label_set", Type: migrator.DB_NVarchar, Length: 64, Nullable: false}, + // {Name: "label", Type: migrator.DB_NVarchar, Length: 190, Nullable: false}, + // {Name: "value", Type: migrator.DB_Text, Nullable: false}, + // }, + // Indices: []*migrator.Index{ + // {Cols: []string{"label_set", "label"}, Type: migrator.UniqueIndex}, + // }, + // }) + + tables = append(tables, migrator.Table{ + Name: "resource_version", + Columns: []*migrator.Column{ + {Name: "group", 
Type: migrator.DB_NVarchar, Length: 190, Nullable: false}, + {Name: "resource", Type: migrator.DB_NVarchar, Length: 190, Nullable: false}, + {Name: "resource_version", Type: migrator.DB_BigInt, Nullable: false}, + }, + Indices: []*migrator.Index{ + {Cols: []string{"group", "resource"}, Type: migrator.UniqueIndex}, + }, + }) + + // Initialize all tables + for t := range tables { + mg.AddMigration("drop table "+tables[t].Name, migrator.NewDropTableMigration(tables[t].Name)) + mg.AddMigration("create table "+tables[t].Name, migrator.NewAddTableMigration(tables[t])) + for i := range tables[t].Indices { + mg.AddMigration(fmt.Sprintf("create table %s, index: %d", tables[t].Name, i), migrator.NewAddIndexMigration(tables[t], tables[t].Indices[i])) + } + } + + return marker +} diff --git a/pkg/storage/unified/sql/db/service.go b/pkg/storage/unified/sql/db/service.go new file mode 100755 index 00000000000..aee367fae7c --- /dev/null +++ b/pkg/storage/unified/sql/db/service.go @@ -0,0 +1,71 @@ +package db + +import ( + "context" + "database/sql" + + "xorm.io/xorm" + + "github.com/grafana/grafana/pkg/services/sqlstore/session" + "github.com/grafana/grafana/pkg/setting" +) + +const ( + DriverPostgres = "postgres" + DriverMySQL = "mysql" + DriverSQLite = "sqlite" + DriverSQLite3 = "sqlite3" +) + +// ResourceDBInterface provides access to a database capable of supporting the +// Entity Server. +type ResourceDBInterface interface { + Init() error + GetCfg() *setting.Cfg + GetDB() (DB, error) + + // TODO: deprecate. + GetSession() (*session.SessionDB, error) + GetEngine() (*xorm.Engine, error) +} + +// DB is a thin abstraction on *sql.DB to allow mocking to provide better unit +// testing. We purposefully hide database operation methods that would use +// context.Background(). 
+type DB interface { + ContextExecer + BeginTx(context.Context, *sql.TxOptions) (Tx, error) + WithTx(context.Context, *sql.TxOptions, TxFunc) error + PingContext(context.Context) error + Stats() sql.DBStats + DriverName() string +} + +// TxFunc is a function that executes with access to a transaction. The context +// it receives is the same context used to create the transaction, and is +// provided so that a general purpose TxFunc is able to retrieve information +// from that context, and derive other contexts that may be used to run database +// operation methods accepting a context. A derived context can be used to +// request a specific database operation to take no more than a specific +// fraction of the remaining timeout of the transaction context, or to enrich +// the downstream observability layer with relevant information regarding the +// specific operation being carried out. +type TxFunc = func(context.Context, Tx) error + +// Tx is a thin abstraction on *sql.Tx to allow mocking to provide better unit +// testing. We allow database operation methods that do not take a +// context.Context here since a Tx can only be obtained with DB.BeginTx, which +// already takes a context.Context. +type Tx interface { + ContextExecer + Commit() error + Rollback() error +} + +// ContextExecer is a set of database operation methods that take +// context.Context. 
+type ContextExecer interface { + ExecContext(ctx context.Context, query string, args ...any) (sql.Result, error) + QueryContext(ctx context.Context, query string, args ...any) (*sql.Rows, error) + QueryRowContext(ctx context.Context, query string, args ...any) *sql.Row +} diff --git a/pkg/storage/unified/sql/queries.go b/pkg/storage/unified/sql/queries.go new file mode 100644 index 00000000000..5c7bdf108ef --- /dev/null +++ b/pkg/storage/unified/sql/queries.go @@ -0,0 +1,191 @@ +package sql + +import ( + "database/sql" + "embed" + "fmt" + "text/template" + + "github.com/grafana/grafana/pkg/storage/unified/resource" + "github.com/grafana/grafana/pkg/storage/unified/sql/sqltemplate" +) + +// Templates setup. +var ( + //go:embed data/*.sql + sqlTemplatesFS embed.FS + + sqlTemplates = template.Must(template.New("sql").ParseFS(sqlTemplatesFS, `data/*.sql`)) +) + +func mustTemplate(filename string) *template.Template { + if t := sqlTemplates.Lookup(filename); t != nil { + return t + } + panic(fmt.Sprintf("template file not found: %s", filename)) +} + +// Templates. 
+var ( + sqlResourceDelete = mustTemplate("resource_delete.sql") + sqlResourceInsert = mustTemplate("resource_insert.sql") + sqlResourceUpdate = mustTemplate("resource_update.sql") + sqlResourceRead = mustTemplate("resource_read.sql") + sqlResourceList = mustTemplate("resource_list.sql") + sqlResourceHistoryList = mustTemplate("resource_history_list.sql") + sqlResourceUpdateRV = mustTemplate("resource_update_rv.sql") + sqlResourceHistoryRead = mustTemplate("resource_history_read.sql") + sqlResourceHistoryUpdateRV = mustTemplate("resource_history_update_rv.sql") + sqlResourceHistoryInsert = mustTemplate("resource_history_insert.sql") + sqlResourceHistoryPoll = mustTemplate("resource_history_poll.sql") + + // sqlResourceLabelsInsert = mustTemplate("resource_labels_insert.sql") + sqlResourceVersionGet = mustTemplate("resource_version_get.sql") + sqlResourceVersionInc = mustTemplate("resource_version_inc.sql") + sqlResourceVersionInsert = mustTemplate("resource_version_insert.sql") +) + +// TxOptions. +var ( + ReadCommitted = &sql.TxOptions{ + Isolation: sql.LevelReadCommitted, + } + ReadCommittedRO = &sql.TxOptions{ + Isolation: sql.LevelReadCommitted, + ReadOnly: true, + } +) + +// SQLError is an error returned by the database, which includes additionally +// debugging information about what was sent to the database. +type SQLError struct { + Err error + CallType string // either Query, QueryRow or Exec + TemplateName string + Query string + RawQuery string + ScanDest []any + + // potentially regulated information is not exported and only directly + // available for local testing and local debugging purposes, making sure it + // is never marshaled to JSON or any other serialization. 
+ + arguments []any +} + +func (e SQLError) Unwrap() error { + return e.Err +} + +func (e SQLError) Error() string { + return fmt.Sprintf("%s: %s with %d input arguments and %d output "+ + "destination arguments: %v", e.TemplateName, e.CallType, + len(e.arguments), len(e.ScanDest), e.Err) +} + +type sqlResourceRequest struct { + *sqltemplate.SQLTemplate + GUID string + WriteEvent resource.WriteEvent +} + +func (r sqlResourceRequest) Validate() error { + return nil // TODO +} + +type historyPollResponse struct { + Key resource.ResourceKey + ResourceVersion int64 + Value []byte + Action int +} + +func (r *historyPollResponse) Results() (*historyPollResponse, error) { + return r, nil +} + +type sqlResourceHistoryPollRequest struct { + *sqltemplate.SQLTemplate + SinceResourceVersion int64 + Response *historyPollResponse +} + +func (r sqlResourceHistoryPollRequest) Validate() error { + return nil // TODO +} + +// sqlResourceReadRequest can be used to retrieve a row fromthe "resource" tables. 
+ +type readResponse struct { + resource.ReadResponse +} + +func (r *readResponse) Results() (*readResponse, error) { + return r, nil +} + +type sqlResourceReadRequest struct { + *sqltemplate.SQLTemplate + Request *resource.ReadRequest + *readResponse +} + +func (r sqlResourceReadRequest) Validate() error { + return nil // TODO +} + +// List +type sqlResourceListRequest struct { + *sqltemplate.SQLTemplate + Request *resource.ListRequest + Response *resource.ResourceWrapper +} + +func (r sqlResourceListRequest) Validate() error { + return nil // TODO +} + +type historyListRequest struct { + ResourceVersion, Limit, Offset int64 + Options *resource.ListOptions +} +type sqlResourceHistoryListRequest struct { + *sqltemplate.SQLTemplate + Request *historyListRequest + Response *resource.ResourceWrapper +} + +func (r sqlResourceHistoryListRequest) Validate() error { + return nil // TODO +} + +// update RV + +type sqlResourceUpdateRVRequest struct { + *sqltemplate.SQLTemplate + GUID string + ResourceVersion int64 +} + +func (r sqlResourceUpdateRVRequest) Validate() error { + return nil // TODO +} + +// resource_version table requests. 
+type resourceVersion struct { + ResourceVersion int64 +} + +func (r *resourceVersion) Results() (*resourceVersion, error) { + return r, nil +} + +type sqlResourceVersionRequest struct { + *sqltemplate.SQLTemplate + Group, Resource string + *resourceVersion +} + +func (r sqlResourceVersionRequest) Validate() error { + return nil // TODO +} diff --git a/pkg/storage/unified/sql/queries_test.go b/pkg/storage/unified/sql/queries_test.go new file mode 100644 index 00000000000..6c4b066173c --- /dev/null +++ b/pkg/storage/unified/sql/queries_test.go @@ -0,0 +1,364 @@ +package sql + +import ( + "embed" + "errors" + "testing" + "text/template" + + "github.com/stretchr/testify/require" + + "github.com/grafana/grafana/pkg/storage/unified/resource" + "github.com/grafana/grafana/pkg/storage/unified/sql/sqltemplate" +) + +// debug is meant to provide greater debugging detail about certain errors. The +// returned error will either provide more detailed information or be the same +// original error, suitable only for local debugging. The details provided are +// not meant to be logged, since they could include PII or otherwise +// sensitive/confidential information. These information should only be used for +// local debugging with fake or otherwise non-regulated information. +func debug(err error) error { + var d interface{ Debug() string } + if errors.As(err, &d) { + return errors.New(d.Debug()) + } + + return err +} + +var _ = debug // silence the `unused` linter + +//go:embed testdata/* +var testdataFS embed.FS + +func testdata(t *testing.T, filename string) []byte { + t.Helper() + b, err := testdataFS.ReadFile(`testdata/` + filename) + require.NoError(t, err) + + return b +} + +func TestQueries(t *testing.T) { + t.Parallel() + + // Each template has one or more test cases, each identified with a + // descriptive name (e.g. "happy path", "error twiddling the frobb"). 
Each + // of them will test that for the same input data they must produce a result + // that will depend on the Dialect. Expected queries should be defined in + // separate files in the testdata directory. This improves the testing + // experience by separating test data from test code, since mixing both + // tends to make it more difficult to reason about what is being done, + // especially as we want testing code to scale and make it easy to add + // tests. + type ( + // type aliases to make code more semantic and self-documenting + resultSQLFilename = string + dialects = []sqltemplate.Dialect + expected map[resultSQLFilename]dialects + + testCase = struct { + Name string + + // Data should be the struct passed to the template. + Data sqltemplate.SQLTemplateIface + + // Expected maps the filename containing the expected result query + // to the list of dialects that would produce it. For simple + // queries, it is possible that more than one dialect produce the + // same output. The filename is expected to be in the `testdata` + // directory. + Expected expected + } + ) + + // Define tests cases. Most templates are trivial and testing that they + // generate correct code for a single Dialect is fine, since the one thing + // that always changes is how SQL placeholder arguments are passed (most + // Dialects use `?` while PostgreSQL uses `$1`, `$2`, etc.), and that is + // something that should be tested in the Dialect implementation instead of + // here. We will ask to have at least one test per SQL template, and we will + // lean to test MySQL. Templates containing branching (conditionals, loops, + // etc.) should be exercised at least once in each of their branches. + // + // NOTE: in the Data field, make sure to have pointers populated to simulate + // data is set as it would be in a real request. The data being correctly + // populated in each case should be tested in integration tests, where the + // data will actually flow to and from a real database. 
In this tests we + // only care about producing the correct SQL. + testCases := map[*template.Template][]*testCase{ + sqlResourceDelete: { + { + Name: "single path", + Data: &sqlResourceRequest{ + SQLTemplate: new(sqltemplate.SQLTemplate), + WriteEvent: resource.WriteEvent{ + Key: &resource.ResourceKey{}, + }, + }, + Expected: expected{ + "resource_delete_mysql_sqlite.sql": dialects{ + sqltemplate.MySQL, + sqltemplate.SQLite, + }, + "resource_delete_postgres.sql": dialects{ + sqltemplate.PostgreSQL, + }, + }, + }, + }, + + sqlResourceInsert: { + { + Name: "insert into resource", + Data: &sqlResourceRequest{ + SQLTemplate: new(sqltemplate.SQLTemplate), + WriteEvent: resource.WriteEvent{ + Key: &resource.ResourceKey{}, + }, + }, + Expected: expected{ + "resource_insert_mysql_sqlite.sql": dialects{ + sqltemplate.MySQL, + sqltemplate.SQLite, + }, + }, + }, + }, + sqlResourceUpdate: { + { + Name: "single path", + Data: &sqlResourceRequest{ + SQLTemplate: new(sqltemplate.SQLTemplate), + WriteEvent: resource.WriteEvent{ + Key: &resource.ResourceKey{}, + }, + }, + Expected: expected{ + "resource_update_mysql_sqlite.sql": dialects{ + sqltemplate.MySQL, + sqltemplate.SQLite, + }, + }, + }, + }, + + sqlResourceRead: { + { + Name: "without resource version", + Data: &sqlResourceReadRequest{ + SQLTemplate: new(sqltemplate.SQLTemplate), + Request: &resource.ReadRequest{ + Key: &resource.ResourceKey{}, + }, + readResponse: new(readResponse), + }, + Expected: expected{ + "resource_read_mysql_sqlite.sql": dialects{ + sqltemplate.MySQL, + sqltemplate.SQLite, + }, + }, + }, + }, + sqlResourceList: { + { + Name: "filter on namespace", + Data: &sqlResourceListRequest{ + SQLTemplate: new(sqltemplate.SQLTemplate), + Request: &resource.ListRequest{ + Limit: 10, + Options: &resource.ListOptions{ + Key: &resource.ResourceKey{ + Namespace: "ns", + }, + }, + }, + Response: new(resource.ResourceWrapper), + }, + Expected: expected{ + "resource_list_mysql_sqlite.sql": dialects{ + 
sqltemplate.MySQL, + sqltemplate.SQLite, + }, + }, + }, + }, + sqlResourceHistoryList: { + { + Name: "single path", + Data: &sqlResourceHistoryListRequest{ + SQLTemplate: new(sqltemplate.SQLTemplate), + Request: &historyListRequest{ + Limit: 10, + Options: &resource.ListOptions{ + Key: &resource.ResourceKey{ + Namespace: "ns", + }, + }, + }, + Response: new(resource.ResourceWrapper), + }, + Expected: expected{ + "resource_history_list_mysql_sqlite.sql": dialects{ + sqltemplate.MySQL, + sqltemplate.SQLite, + }, + }, + }, + }, + sqlResourceUpdateRV: { + { + Name: "single path", + Data: &sqlResourceUpdateRVRequest{ + SQLTemplate: new(sqltemplate.SQLTemplate), + }, + Expected: expected{ + "resource_update_rv_mysql_sqlite.sql": dialects{ + sqltemplate.MySQL, + sqltemplate.SQLite, + }, + }, + }, + }, + sqlResourceHistoryRead: { + { + Name: "single path", + Data: &sqlResourceReadRequest{ + SQLTemplate: new(sqltemplate.SQLTemplate), + Request: &resource.ReadRequest{ + ResourceVersion: 123, + Key: &resource.ResourceKey{}, + }, + readResponse: new(readResponse), + }, + Expected: expected{ + "resource_history_read_mysql_sqlite.sql": dialects{ + sqltemplate.MySQL, + sqltemplate.SQLite, + }, + }, + }, + }, + sqlResourceHistoryUpdateRV: { + { + Name: "single path", + Data: &sqlResourceUpdateRVRequest{ + SQLTemplate: new(sqltemplate.SQLTemplate), + }, + Expected: expected{ + "resource_history_update_rv_mysql_sqlite.sql": dialects{ + sqltemplate.MySQL, + sqltemplate.SQLite, + }, + }, + }, + }, + sqlResourceHistoryInsert: { + { + Name: "insert into resource_history", + Data: &sqlResourceRequest{ + SQLTemplate: new(sqltemplate.SQLTemplate), + WriteEvent: resource.WriteEvent{ + Key: &resource.ResourceKey{}, + }, + }, + Expected: expected{ + "resource_history_insert_mysql_sqlite.sql": dialects{ + sqltemplate.MySQL, + sqltemplate.SQLite, + }, + }, + }, + }, + + sqlResourceVersionGet: { + { + Name: "single path", + Data: &sqlResourceVersionRequest{ + SQLTemplate: 
new(sqltemplate.SQLTemplate), + resourceVersion: new(resourceVersion), + }, + Expected: expected{ + "resource_version_get_mysql.sql": dialects{ + sqltemplate.MySQL, + }, + "resource_version_get_sqlite.sql": dialects{ + sqltemplate.SQLite, + }, + }, + }, + }, + + sqlResourceVersionInc: { + { + Name: "increment resource version", + Data: &sqlResourceVersionRequest{ + SQLTemplate: new(sqltemplate.SQLTemplate), + resourceVersion: &resourceVersion{ + ResourceVersion: 123, + }, + }, + Expected: expected{ + "resource_version_inc_mysql_sqlite.sql": dialects{ + sqltemplate.MySQL, + sqltemplate.SQLite, + }, + }, + }, + }, + + sqlResourceVersionInsert: { + { + Name: "single path", + Data: &sqlResourceVersionRequest{ + SQLTemplate: new(sqltemplate.SQLTemplate), + }, + Expected: expected{ + "resource_version_insert_mysql_sqlite.sql": dialects{ + sqltemplate.MySQL, + sqltemplate.SQLite, + }, + }, + }, + }, + } + + // Execute test cases + for tmpl, tcs := range testCases { + t.Run(tmpl.Name(), func(t *testing.T) { + t.Parallel() + + for _, tc := range tcs { + t.Run(tc.Name, func(t *testing.T) { + t.Parallel() + + for filename, ds := range tc.Expected { + t.Run(filename, func(t *testing.T) { + // not parallel because we're sharing tc.Data, not + // worth it deep cloning + + rawQuery := string(testdata(t, filename)) + expectedQuery := sqltemplate.FormatSQL(rawQuery) + + for _, d := range ds { + t.Run(d.Name(), func(t *testing.T) { + // not parallel for the same reason + + tc.Data.SetDialect(d) + err := tc.Data.Validate() + require.NoError(t, err) + got, err := sqltemplate.Execute(tmpl, tc.Data) + require.NoError(t, err) + got = sqltemplate.FormatSQL(got) + require.Equal(t, expectedQuery, got) + }) + } + }) + } + }) + } + }) + } +} diff --git a/pkg/storage/unified/sql/server.go b/pkg/storage/unified/sql/server.go new file mode 100644 index 00000000000..eac94aba2b7 --- /dev/null +++ b/pkg/storage/unified/sql/server.go @@ -0,0 +1,31 @@ +package sql + +import ( + 
"github.com/grafana/grafana/pkg/infra/db" + "github.com/grafana/grafana/pkg/infra/tracing" + "github.com/grafana/grafana/pkg/services/featuremgmt" + "github.com/grafana/grafana/pkg/setting" + "github.com/grafana/grafana/pkg/storage/unified/resource" + "github.com/grafana/grafana/pkg/storage/unified/sql/db/dbimpl" +) + +// Creates a ResourceServer +func ProvideResourceServer(db db.DB, cfg *setting.Cfg, features featuremgmt.FeatureToggles, tracer tracing.Tracer) (resource.ResourceServer, error) { + opts := resource.ResourceServerOptions{ + Tracer: tracer, + } + + eDB, err := dbimpl.ProvideResourceDB(db, cfg, features, tracer) + if err != nil { + return nil, err + } + store, err := NewBackendStore(backendOptions{DB: eDB, Tracer: tracer}) + if err != nil { + return nil, err + } + opts.Backend = store + opts.Diagnostics = store + opts.Lifecycle = store + + return resource.NewResourceServer(opts) +} diff --git a/pkg/services/store/entity/sqlstash/sqltemplate/args.go b/pkg/storage/unified/sql/sqltemplate/args.go similarity index 100% rename from pkg/services/store/entity/sqlstash/sqltemplate/args.go rename to pkg/storage/unified/sql/sqltemplate/args.go diff --git a/pkg/services/store/entity/sqlstash/sqltemplate/args_test.go b/pkg/storage/unified/sql/sqltemplate/args_test.go similarity index 100% rename from pkg/services/store/entity/sqlstash/sqltemplate/args_test.go rename to pkg/storage/unified/sql/sqltemplate/args_test.go diff --git a/pkg/services/store/entity/sqlstash/sqltemplate/dialect.go b/pkg/storage/unified/sql/sqltemplate/dialect.go similarity index 100% rename from pkg/services/store/entity/sqlstash/sqltemplate/dialect.go rename to pkg/storage/unified/sql/sqltemplate/dialect.go diff --git a/pkg/services/store/entity/sqlstash/sqltemplate/dialect_mysql.go b/pkg/storage/unified/sql/sqltemplate/dialect_mysql.go similarity index 100% rename from pkg/services/store/entity/sqlstash/sqltemplate/dialect_mysql.go rename to 
pkg/storage/unified/sql/sqltemplate/dialect_mysql.go diff --git a/pkg/services/store/entity/sqlstash/sqltemplate/dialect_postgresql.go b/pkg/storage/unified/sql/sqltemplate/dialect_postgresql.go similarity index 100% rename from pkg/services/store/entity/sqlstash/sqltemplate/dialect_postgresql.go rename to pkg/storage/unified/sql/sqltemplate/dialect_postgresql.go diff --git a/pkg/services/store/entity/sqlstash/sqltemplate/dialect_postgresql_test.go b/pkg/storage/unified/sql/sqltemplate/dialect_postgresql_test.go similarity index 100% rename from pkg/services/store/entity/sqlstash/sqltemplate/dialect_postgresql_test.go rename to pkg/storage/unified/sql/sqltemplate/dialect_postgresql_test.go diff --git a/pkg/services/store/entity/sqlstash/sqltemplate/dialect_sqlite.go b/pkg/storage/unified/sql/sqltemplate/dialect_sqlite.go similarity index 100% rename from pkg/services/store/entity/sqlstash/sqltemplate/dialect_sqlite.go rename to pkg/storage/unified/sql/sqltemplate/dialect_sqlite.go diff --git a/pkg/services/store/entity/sqlstash/sqltemplate/dialect_test.go b/pkg/storage/unified/sql/sqltemplate/dialect_test.go similarity index 100% rename from pkg/services/store/entity/sqlstash/sqltemplate/dialect_test.go rename to pkg/storage/unified/sql/sqltemplate/dialect_test.go diff --git a/pkg/services/store/entity/sqlstash/sqltemplate/example_test.go b/pkg/storage/unified/sql/sqltemplate/example_test.go similarity index 100% rename from pkg/services/store/entity/sqlstash/sqltemplate/example_test.go rename to pkg/storage/unified/sql/sqltemplate/example_test.go diff --git a/pkg/services/store/entity/sqlstash/sqltemplate/into.go b/pkg/storage/unified/sql/sqltemplate/into.go similarity index 100% rename from pkg/services/store/entity/sqlstash/sqltemplate/into.go rename to pkg/storage/unified/sql/sqltemplate/into.go diff --git a/pkg/services/store/entity/sqlstash/sqltemplate/into_test.go b/pkg/storage/unified/sql/sqltemplate/into_test.go similarity index 100% rename from 
pkg/services/store/entity/sqlstash/sqltemplate/into_test.go rename to pkg/storage/unified/sql/sqltemplate/into_test.go diff --git a/pkg/services/store/entity/sqlstash/sqltemplate/mocks/SQLTemplateIface.go b/pkg/storage/unified/sql/sqltemplate/mocks/SQLTemplateIface.go similarity index 99% rename from pkg/services/store/entity/sqlstash/sqltemplate/mocks/SQLTemplateIface.go rename to pkg/storage/unified/sql/sqltemplate/mocks/SQLTemplateIface.go index d41f66f97c3..4461b349a31 100644 --- a/pkg/services/store/entity/sqlstash/sqltemplate/mocks/SQLTemplateIface.go +++ b/pkg/storage/unified/sql/sqltemplate/mocks/SQLTemplateIface.go @@ -7,7 +7,7 @@ import ( mock "github.com/stretchr/testify/mock" - sqltemplate "github.com/grafana/grafana/pkg/services/store/entity/sqlstash/sqltemplate" + sqltemplate "github.com/grafana/grafana/pkg/storage/unified/sql/sqltemplate" ) // SQLTemplateIface is an autogenerated mock type for the SQLTemplateIface type diff --git a/pkg/services/store/entity/sqlstash/sqltemplate/mocks/WithResults.go b/pkg/storage/unified/sql/sqltemplate/mocks/WithResults.go similarity index 99% rename from pkg/services/store/entity/sqlstash/sqltemplate/mocks/WithResults.go rename to pkg/storage/unified/sql/sqltemplate/mocks/WithResults.go index 7c98a0c3433..dea246cd777 100644 --- a/pkg/services/store/entity/sqlstash/sqltemplate/mocks/WithResults.go +++ b/pkg/storage/unified/sql/sqltemplate/mocks/WithResults.go @@ -7,7 +7,7 @@ import ( mock "github.com/stretchr/testify/mock" - sqltemplate "github.com/grafana/grafana/pkg/services/store/entity/sqlstash/sqltemplate" + sqltemplate "github.com/grafana/grafana/pkg/storage/unified/sql/sqltemplate" ) // WithResults is an autogenerated mock type for the WithResults type diff --git a/pkg/services/store/entity/sqlstash/sqltemplate/sqltemplate.go b/pkg/storage/unified/sql/sqltemplate/sqltemplate.go similarity index 100% rename from pkg/services/store/entity/sqlstash/sqltemplate/sqltemplate.go rename to 
pkg/storage/unified/sql/sqltemplate/sqltemplate.go diff --git a/pkg/services/store/entity/sqlstash/sqltemplate/sqltemplate_test.go b/pkg/storage/unified/sql/sqltemplate/sqltemplate_test.go similarity index 100% rename from pkg/services/store/entity/sqlstash/sqltemplate/sqltemplate_test.go rename to pkg/storage/unified/sql/sqltemplate/sqltemplate_test.go diff --git a/pkg/storage/unified/sql/testdata/resource_delete_mysql_sqlite.sql b/pkg/storage/unified/sql/testdata/resource_delete_mysql_sqlite.sql new file mode 100644 index 00000000000..0c4e634b02c --- /dev/null +++ b/pkg/storage/unified/sql/testdata/resource_delete_mysql_sqlite.sql @@ -0,0 +1 @@ +DELETE FROM "resource" WHERE 1 = 1 AND "namespace" = ? AND "group" = ? AND "resource" = ? AND "name" = ?; diff --git a/pkg/storage/unified/sql/testdata/resource_delete_postgres.sql b/pkg/storage/unified/sql/testdata/resource_delete_postgres.sql new file mode 100644 index 00000000000..47f39a82e5d --- /dev/null +++ b/pkg/storage/unified/sql/testdata/resource_delete_postgres.sql @@ -0,0 +1 @@ +DELETE FROM "resource" WHERE 1 = 1 AND "namespace" = $1 AND "group" = $2 AND "resource" = $3 AND "name" = $4; diff --git a/pkg/storage/unified/sql/testdata/resource_history_insert_mysql_sqlite.sql b/pkg/storage/unified/sql/testdata/resource_history_insert_mysql_sqlite.sql new file mode 100644 index 00000000000..d267d8d31bd --- /dev/null +++ b/pkg/storage/unified/sql/testdata/resource_history_insert_mysql_sqlite.sql @@ -0,0 +1,3 @@ +INSERT INTO "resource_history" + ("guid", "group", "resource", "namespace", "name", "value", "action") + VALUES (?, ?, ?, ?, ?, ?, ?); diff --git a/pkg/storage/unified/sql/testdata/resource_history_list_mysql_sqlite.sql b/pkg/storage/unified/sql/testdata/resource_history_list_mysql_sqlite.sql new file mode 100644 index 00000000000..f8fcb9535b4 --- /dev/null +++ b/pkg/storage/unified/sql/testdata/resource_history_list_mysql_sqlite.sql @@ -0,0 +1,12 @@ +SELECT kv."resource_version", "value" +FROM 
"resource_history" as kv +JOIN ( + SELECT "guid", max("resource_version") AS "resource_version" + FROM "resource_history" AS mkv + WHERE 1 = 1 AND "resource_version" <= ? AND "namespace" = ? + GROUP BY mkv."namespace", mkv."group", mkv."resource", mkv."name" +) AS maxkv ON maxkv."guid" = kv."guid" +WHERE kv."action" != 3 +ORDER BY kv."resource_version" ASC +LIMIT ?, ? +; diff --git a/pkg/storage/unified/sql/testdata/resource_history_read_mysql_sqlite.sql b/pkg/storage/unified/sql/testdata/resource_history_read_mysql_sqlite.sql new file mode 100644 index 00000000000..228b54f7e7e --- /dev/null +++ b/pkg/storage/unified/sql/testdata/resource_history_read_mysql_sqlite.sql @@ -0,0 +1,6 @@ +SELECT "resource_version", "value" + FROM "resource_history" + WHERE 1 = 1 AND "namespace" = ? AND "group" = ? AND "resource" = ? AND "name" = ? AND "resource_version" <= ? + ORDER BY "resource_version" DESC + LIMIT 1 +; diff --git a/pkg/storage/unified/sql/testdata/resource_history_update_rv_mysql_sqlite.sql b/pkg/storage/unified/sql/testdata/resource_history_update_rv_mysql_sqlite.sql new file mode 100644 index 00000000000..e11a0b5f092 --- /dev/null +++ b/pkg/storage/unified/sql/testdata/resource_history_update_rv_mysql_sqlite.sql @@ -0,0 +1,3 @@ +UPDATE "resource_history" SET "resource_version" = ? +WHERE "guid" = ? +; diff --git a/pkg/storage/unified/sql/testdata/resource_insert_mysql_sqlite.sql b/pkg/storage/unified/sql/testdata/resource_insert_mysql_sqlite.sql new file mode 100644 index 00000000000..b61dff00552 --- /dev/null +++ b/pkg/storage/unified/sql/testdata/resource_insert_mysql_sqlite.sql @@ -0,0 +1,4 @@ +INSERT INTO "resource" + ("guid", "group", "resource", "namespace", "name", "value", "action") + VALUES (?, ?, ?, ?, ?, ?, ?) 
+; diff --git a/pkg/storage/unified/sql/testdata/resource_list_mysql_sqlite.sql b/pkg/storage/unified/sql/testdata/resource_list_mysql_sqlite.sql new file mode 100644 index 00000000000..0b2edae1e31 --- /dev/null +++ b/pkg/storage/unified/sql/testdata/resource_list_mysql_sqlite.sql @@ -0,0 +1,6 @@ +SELECT "resource_version", "value" + FROM "resource" + WHERE 1 = 1 AND "namespace" = ? + ORDER BY "resource_version" DESC + LIMIT ? +; diff --git a/pkg/storage/unified/sql/testdata/resource_read_mysql_sqlite.sql b/pkg/storage/unified/sql/testdata/resource_read_mysql_sqlite.sql new file mode 100644 index 00000000000..80d06fca4d5 --- /dev/null +++ b/pkg/storage/unified/sql/testdata/resource_read_mysql_sqlite.sql @@ -0,0 +1,4 @@ +SELECT "resource_version", "value" + FROM "resource" + WHERE 1 = 1 AND "namespace" = ? AND "group" = ? AND "resource" = ? AND "name" = ? +; diff --git a/pkg/storage/unified/sql/testdata/resource_update_mysql_sqlite.sql b/pkg/storage/unified/sql/testdata/resource_update_mysql_sqlite.sql new file mode 100644 index 00000000000..79cb0546a0f --- /dev/null +++ b/pkg/storage/unified/sql/testdata/resource_update_mysql_sqlite.sql @@ -0,0 +1,4 @@ +UPDATE "resource" SET "guid" = ?, "value" = ?, "action" = ? +WHERE 1 =1 AND "group" = ? AND "resource" = ? AND "namespace" = ? AND "name" = ? +; + \ No newline at end of file diff --git a/pkg/storage/unified/sql/testdata/resource_update_rv_mysql_sqlite.sql b/pkg/storage/unified/sql/testdata/resource_update_rv_mysql_sqlite.sql new file mode 100644 index 00000000000..71a1128c233 --- /dev/null +++ b/pkg/storage/unified/sql/testdata/resource_update_rv_mysql_sqlite.sql @@ -0,0 +1,4 @@ +UPDATE "resource" SET "resource_version" = ? +WHERE "guid" = ? 
+; + \ No newline at end of file diff --git a/pkg/storage/unified/sql/testdata/resource_version_get_mysql.sql b/pkg/storage/unified/sql/testdata/resource_version_get_mysql.sql new file mode 100644 index 00000000000..0815e484f2f --- /dev/null +++ b/pkg/storage/unified/sql/testdata/resource_version_get_mysql.sql @@ -0,0 +1,4 @@ +SELECT "resource_version" + FROM "resource_version" + WHERE 1 = 1 AND "group" = ? AND "resource" = ? +FOR UPDATE; diff --git a/pkg/storage/unified/sql/testdata/resource_version_get_sqlite.sql b/pkg/storage/unified/sql/testdata/resource_version_get_sqlite.sql new file mode 100644 index 00000000000..329fc0f6d04 --- /dev/null +++ b/pkg/storage/unified/sql/testdata/resource_version_get_sqlite.sql @@ -0,0 +1,4 @@ +SELECT "resource_version" + FROM "resource_version" + WHERE 1 = 1 AND "group" = ? AND "resource" = ? +; diff --git a/pkg/storage/unified/sql/testdata/resource_version_inc_mysql_sqlite.sql b/pkg/storage/unified/sql/testdata/resource_version_inc_mysql_sqlite.sql new file mode 100644 index 00000000000..2d4c6b37574 --- /dev/null +++ b/pkg/storage/unified/sql/testdata/resource_version_inc_mysql_sqlite.sql @@ -0,0 +1,4 @@ +UPDATE "resource_version" + SET "resource_version" = ? + WHERE 1 = 1 AND "group" = ? AND "resource" = ? +; diff --git a/pkg/storage/unified/sql/testdata/resource_version_insert_mysql_sqlite.sql b/pkg/storage/unified/sql/testdata/resource_version_insert_mysql_sqlite.sql new file mode 100644 index 00000000000..ca378d00b30 --- /dev/null +++ b/pkg/storage/unified/sql/testdata/resource_version_insert_mysql_sqlite.sql @@ -0,0 +1,3 @@ +INSERT INTO "resource_version" + ("group", "resource", "resource_version") + VALUES (?, ?, 1);