diff --git a/pkg/storage/unified/sql/backend.go b/pkg/storage/unified/sql/backend.go index ebd04ce4a30..a129a01727a 100644 --- a/pkg/storage/unified/sql/backend.go +++ b/pkg/storage/unified/sql/backend.go @@ -31,6 +31,7 @@ import ( "github.com/grafana/grafana/pkg/storage/unified/resourcepb" "github.com/grafana/grafana/pkg/storage/unified/sql/db" "github.com/grafana/grafana/pkg/storage/unified/sql/dbutil" + "github.com/grafana/grafana/pkg/storage/unified/sql/rvmanager" "github.com/grafana/grafana/pkg/storage/unified/sql/sqltemplate" "github.com/grafana/grafana/pkg/util/debouncer" ) @@ -126,7 +127,7 @@ type backend struct { notifier eventNotifier // resource version manager - rvManager *resourceVersionManager + rvManager *rvmanager.ResourceVersionManager // testing simulatedNetworkLatency time.Duration @@ -163,7 +164,7 @@ func (b *backend) initLocked(ctx context.Context) error { } // Initialize ResourceVersionManager - rvManager, err := NewResourceVersionManager(ResourceManagerOptions{ + rvManager, err := rvmanager.NewResourceVersionManager(rvmanager.ResourceManagerOptions{ Dialect: b.dialect, DB: b.db, }) @@ -928,12 +929,12 @@ func (b *backend) listLatestRVs(ctx context.Context) (groupResourceRV, error) { func (b *backend) fetchLatestRV(ctx context.Context, x db.ContextExecer, d sqltemplate.Dialect, group, resource string) (int64, error) { ctx, span := tracer.Start(ctx, "sql.backend.fetchLatestRV") defer span.End() - res, err := dbutil.QueryRow(ctx, x, sqlResourceVersionGet, sqlResourceVersionGetRequest{ + res, err := dbutil.QueryRow(ctx, x, rvmanager.SqlResourceVersionGet, rvmanager.SqlResourceVersionGetRequest{ SQLTemplate: sqltemplate.New(d), Group: group, Resource: resource, ReadOnly: true, - Response: new(resourceVersionResponse), + Response: new(rvmanager.ResourceVersionResponse), }) if errors.Is(err, sql.ErrNoRows) { return 1, nil diff --git a/pkg/storage/unified/sql/backend_test.go b/pkg/storage/unified/sql/backend_test.go index f7ecd755f32..2f8f9fcb060 100644 --- a/pkg/storage/unified/sql/backend_test.go +++ b/pkg/storage/unified/sql/backend_test.go @@ -40,6 +40,26 @@ type testBackend struct { test.TestDBProvider } +func expectSuccessfulResourceVersionLock(t *testing.T, dbp test.TestDBProvider, rv int64, timestamp int64) { + dbp.SQLMock.ExpectQuery("select resource_version, unix_timestamp for update"). + WillReturnRows(sqlmock.NewRows([]string{"resource_version", "unix_timestamp"}). + AddRow(rv, timestamp)) +} + +func expectSuccessfulResourceVersionSaveRV(t *testing.T, dbp test.TestDBProvider) { + dbp.SQLMock.ExpectExec("update resource set resource_version").WillReturnResult(sqlmock.NewResult(1, 1)) + dbp.SQLMock.ExpectExec("update resource_history set resource_version").WillReturnResult(sqlmock.NewResult(1, 1)) + dbp.SQLMock.ExpectExec("update resource_version set resource_version").WillReturnResult(sqlmock.NewResult(1, 1)) +} + +func expectSuccessfulResourceVersionExec(t *testing.T, dbp test.TestDBProvider, cbs ...func()) { + for _, cb := range cbs { + cb() + } + expectSuccessfulResourceVersionLock(t, dbp, 100, 200) + expectSuccessfulResourceVersionSaveRV(t, dbp) +} + func (b testBackend) ExecWithResult(expectedSQL string, lastInsertID int64, rowsAffected int64) { b.SQLMock.ExpectExec(expectedSQL).WillReturnResult(sqlmock.NewResult(lastInsertID, rowsAffected)) } diff --git a/pkg/storage/unified/sql/bulk.go b/pkg/storage/unified/sql/bulk.go index 6580975a764..4037c74f0dd 100644 --- a/pkg/storage/unified/sql/bulk.go +++ b/pkg/storage/unified/sql/bulk.go @@ -281,13 +281,13 @@ func (b *backend) processBulkWithTx(ctx context.Context, tx db.Tx, setting resou } if b.dialect.DialectName() == "sqlite" { - nextRV, err := b.rvManager.lock(ctx, tx, key.Group, key.Resource) + nextRV, err := b.rvManager.Lock(ctx, tx, key.Group, key.Resource) if err != nil { b.log.Error("error locking RV", "error", err, "key", resource.NSGR(key)) } else { b.log.Info("successfully locked RV", "nextRV", nextRV, "key", resource.NSGR(key)) // Save the incremented RV - if err := b.rvManager.saveRV(ctx, tx, key.Group, key.Resource, nextRV); err != nil { + if err := b.rvManager.SaveRV(ctx, tx, key.Group, key.Resource, nextRV); err != nil { b.log.Error("error saving RV", "error", err, "key", resource.NSGR(key)) } else { b.log.Info("successfully saved RV", "rv", nextRV, "key", resource.NSGR(key)) diff --git a/pkg/storage/unified/sql/list_iterator_test.go b/pkg/storage/unified/sql/list_iterator_test.go index 3b567ca0f3e..e049e06525e 100644 --- a/pkg/storage/unified/sql/list_iterator_test.go +++ b/pkg/storage/unified/sql/list_iterator_test.go @@ -17,6 +17,7 @@ import ( dbsql "github.com/grafana/grafana/pkg/storage/unified/sql/db" "github.com/grafana/grafana/pkg/storage/unified/sql/db/dbimpl" "github.com/grafana/grafana/pkg/storage/unified/sql/dbutil" + "github.com/grafana/grafana/pkg/storage/unified/sql/rvmanager" "github.com/grafana/grafana/pkg/storage/unified/sql/sqltemplate" "github.com/grafana/grafana/pkg/tests/testsuite" "github.com/grafana/grafana/pkg/util/testutil" @@ -94,7 +95,7 @@ func TestIntegrationListIter(t *testing.T) { return fmt.Errorf("failed to insert test data: %w", err) } - if _, err = dbutil.Exec(ctx, tx, sqlResourceUpdateRV, sqlResourceUpdateRVRequest{ + if _, err = dbutil.Exec(ctx, tx, rvmanager.SqlResourceUpdateRV, rvmanager.SqlResourceUpdateRVRequest{ SQLTemplate: sqltemplate.New(dialect), GUIDToRV: map[string]int64{ item.guid: item.resourceVersion, diff --git a/pkg/storage/unified/sql/queries.go b/pkg/storage/unified/sql/queries.go index 5b9a177e17c..725c4617403 100644 --- a/pkg/storage/unified/sql/queries.go +++ b/pkg/storage/unified/sql/queries.go @@ -38,10 +38,8 @@ var ( sqlResourceList = mustTemplate("resource_list.sql") sqlResourceHistoryList = mustTemplate("resource_history_list.sql") sqlResourceHistoryListModifiedSince = mustTemplate("resource_history_list_since_modified.sql") - sqlResourceUpdateRV = mustTemplate("resource_update_rv.sql") sqlResourceHistoryRead = mustTemplate("resource_history_read.sql") sqlResourceHistoryReadLatestRV = mustTemplate("resource_history_read_latest_rv.sql") - sqlResourceHistoryUpdateRV = mustTemplate("resource_history_update_rv.sql") sqlResourceHistoryInsert = mustTemplate("resource_history_insert.sql") sqlResourceHistoryPoll = mustTemplate("resource_history_poll.sql") sqlResourceHistoryGet = mustTemplate("resource_history_get.sql") @@ -51,10 +49,7 @@ var ( sqlResourceInsertFromHistory = mustTemplate("resource_insert_from_history.sql") // sqlResourceLabelsInsert = mustTemplate("resource_labels_insert.sql") - sqlResourceVersionGet = mustTemplate("resource_version_get.sql") - sqlResourceVersionUpdate = mustTemplate("resource_version_update.sql") - sqlResourceVersionInsert = mustTemplate("resource_version_insert.sql") - sqlResourceVersionList = mustTemplate("resource_version_list.sql") + sqlResourceVersionList = mustTemplate("resource_version_list.sql") sqlResourceBlobInsert = mustTemplate("resource_blob_insert.sql") sqlResourceBlobQuery = mustTemplate("resource_blob_query.sql") @@ -365,76 +360,11 @@ func (r sqlResourceBlobQueryRequest) Validate() error { return nil } -// update RV - -type sqlResourceUpdateRVRequest struct { - sqltemplate.SQLTemplate - GUIDToRV map[string]int64 - GUIDToSnowflakeRV map[string]int64 -} - -func (r sqlResourceUpdateRVRequest) Validate() error { - return nil // TODO -} - -func (r sqlResourceUpdateRVRequest) SlashFunc() string { - if r.DialectName() == "postgres" { - return "CHR(47)" - } - - return "CHAR(47)" -} - -func (r sqlResourceUpdateRVRequest) TildeFunc() string { - if r.DialectName() == "postgres" { - return "CHR(126)" - } - - return "CHAR(126)" -} - -// resource_version table requests. -type resourceVersionResponse struct { - ResourceVersion int64 - CurrentEpoch int64 -} - -func (r *resourceVersionResponse) Results() (*resourceVersionResponse, error) { - return r, nil -} - type groupResourceVersion struct { Group, Resource string ResourceVersion int64 } -type sqlResourceVersionUpsertRequest struct { - sqltemplate.SQLTemplate - Group, Resource string - ResourceVersion int64 -} - -func (r sqlResourceVersionUpsertRequest) Validate() error { - return nil // TODO -} - -type sqlResourceVersionGetRequest struct { - sqltemplate.SQLTemplate - Group, Resource string - ReadOnly bool - Response *resourceVersionResponse -} - -func (r sqlResourceVersionGetRequest) Validate() error { - return nil // TODO -} -func (r sqlResourceVersionGetRequest) Results() (*resourceVersionResponse, error) { - return &resourceVersionResponse{ - ResourceVersion: r.Response.ResourceVersion, - CurrentEpoch: r.Response.CurrentEpoch, - }, nil -} - type sqlResourceVersionListRequest struct { sqltemplate.SQLTemplate *groupResourceVersion diff --git a/pkg/storage/unified/sql/queries_test.go b/pkg/storage/unified/sql/queries_test.go index 5673bfad1e8..93e9b1f5af4 100644 --- a/pkg/storage/unified/sql/queries_test.go +++ b/pkg/storage/unified/sql/queries_test.go @@ -8,6 +8,7 @@ import ( "github.com/grafana/grafana/pkg/apimachinery/utils" "github.com/grafana/grafana/pkg/storage/unified/resource" "github.com/grafana/grafana/pkg/storage/unified/resourcepb" + "github.com/grafana/grafana/pkg/storage/unified/sql/rvmanager" "github.com/grafana/grafana/pkg/storage/unified/sql/sqltemplate/mocks" ) @@ -162,10 +163,10 @@ func TestUnifiedStorageQueries(t *testing.T) { }, }, - sqlResourceUpdateRV: { + rvmanager.SqlResourceUpdateRV: { { Name: "single path", - Data: &sqlResourceUpdateRVRequest{ + Data: &rvmanager.SqlResourceUpdateRVRequest{ SQLTemplate: mocks.NewTestingSQLTemplate(), GUIDToRV: map[string]int64{ "guid1": 123, @@ -228,10 +229,10 @@ func TestUnifiedStorageQueries(t *testing.T) { }, }, - sqlResourceHistoryUpdateRV: { + rvmanager.SqlResourceHistoryUpdateRV: { { Name: "single path", - Data: &sqlResourceUpdateRVRequest{ + Data: &rvmanager.SqlResourceUpdateRVRequest{ SQLTemplate: mocks.NewTestingSQLTemplate(), GUIDToRV: map[string]int64{ "guid1": 123, @@ -334,23 +335,23 @@ func TestUnifiedStorageQueries(t *testing.T) { }, }, - sqlResourceVersionGet: { + rvmanager.SqlResourceVersionGet: { { Name: "single path", - Data: &sqlResourceVersionGetRequest{ + Data: &rvmanager.SqlResourceVersionGetRequest{ SQLTemplate: mocks.NewTestingSQLTemplate(), Resource: "resource", Group: "group", - Response: new(resourceVersionResponse), + Response: new(rvmanager.ResourceVersionResponse), ReadOnly: false, }, }, }, - sqlResourceVersionUpdate: { + rvmanager.SqlResourceVersionUpdate: { { Name: "increment resource version", - Data: &sqlResourceVersionUpsertRequest{ + Data: &rvmanager.SqlResourceVersionUpsertRequest{ SQLTemplate: mocks.NewTestingSQLTemplate(), Resource: "resource", Group: "group", @@ -359,10 +360,10 @@ func TestUnifiedStorageQueries(t *testing.T) { }, }, - sqlResourceVersionInsert: { + rvmanager.SqlResourceVersionInsert: { { Name: "single path", - Data: &sqlResourceVersionUpsertRequest{ + Data: &rvmanager.SqlResourceVersionUpsertRequest{ SQLTemplate: mocks.NewTestingSQLTemplate(), ResourceVersion: int64(12354), }, diff --git a/pkg/storage/unified/sql/data/resource_history_update_rv.sql b/pkg/storage/unified/sql/rvmanager/data/resource_history_update_rv.sql similarity index 100% rename from pkg/storage/unified/sql/data/resource_history_update_rv.sql rename to pkg/storage/unified/sql/rvmanager/data/resource_history_update_rv.sql diff --git a/pkg/storage/unified/sql/data/resource_update_rv.sql b/pkg/storage/unified/sql/rvmanager/data/resource_update_rv.sql similarity index 100% rename from pkg/storage/unified/sql/data/resource_update_rv.sql rename to pkg/storage/unified/sql/rvmanager/data/resource_update_rv.sql diff --git a/pkg/storage/unified/sql/data/resource_version_get.sql b/pkg/storage/unified/sql/rvmanager/data/resource_version_get.sql similarity index 100% rename from pkg/storage/unified/sql/data/resource_version_get.sql rename to pkg/storage/unified/sql/rvmanager/data/resource_version_get.sql diff --git a/pkg/storage/unified/sql/data/resource_version_insert.sql b/pkg/storage/unified/sql/rvmanager/data/resource_version_insert.sql similarity index 100% rename from pkg/storage/unified/sql/data/resource_version_insert.sql rename to pkg/storage/unified/sql/rvmanager/data/resource_version_insert.sql diff --git a/pkg/storage/unified/sql/data/resource_version_update.sql b/pkg/storage/unified/sql/rvmanager/data/resource_version_update.sql similarity index 100% rename from pkg/storage/unified/sql/data/resource_version_update.sql rename to pkg/storage/unified/sql/rvmanager/data/resource_version_update.sql diff --git a/pkg/storage/unified/sql/rvmanager/queries.go b/pkg/storage/unified/sql/rvmanager/queries.go new file mode 100644 index 00000000000..c0411cdf704 --- /dev/null +++ b/pkg/storage/unified/sql/rvmanager/queries.go @@ -0,0 +1,84 @@ +package rvmanager + +import ( + "github.com/grafana/grafana/pkg/storage/unified/sql/sqltemplate" +) + +type SqlResourceUpdateRVRequest struct { + sqltemplate.SQLTemplate + GUIDToRV map[string]int64 + GUIDToSnowflakeRV map[string]int64 +} + +func (r SqlResourceUpdateRVRequest) Validate() error { + return nil // TODO +} + +func (r SqlResourceUpdateRVRequest) SlashFunc() string { + if r.DialectName() == "postgres" { + return "CHR(47)" + } + + return "CHAR(47)" +} + +func (r SqlResourceUpdateRVRequest) TildeFunc() string { + if r.DialectName() == "postgres" { + return "CHR(126)" + } + + return "CHAR(126)" +} + +type ResourceVersionResponse struct { + ResourceVersion int64 + CurrentEpoch int64 +} + +func (r *ResourceVersionResponse) Results() (*ResourceVersionResponse, error) { + return r, nil +} + +type sqlResourceVersionGetRequest struct { + sqltemplate.SQLTemplate + Group, Resource string + ReadOnly bool + Response *ResourceVersionResponse +} + +func (r sqlResourceVersionGetRequest) Validate() error { + return nil // TODO +} +func (r sqlResourceVersionGetRequest) Results() (*ResourceVersionResponse, error) { + return &ResourceVersionResponse{ + ResourceVersion: r.Response.ResourceVersion, + CurrentEpoch: r.Response.CurrentEpoch, + }, nil +} + +type SqlResourceVersionUpsertRequest struct { + sqltemplate.SQLTemplate + Group, Resource string + ResourceVersion int64 +} + +func (r SqlResourceVersionUpsertRequest) Validate() error { + return nil // TODO +} + +type SqlResourceVersionGetRequest struct { + sqltemplate.SQLTemplate + Group, Resource string + ReadOnly bool + Response *ResourceVersionResponse +} + +func (r SqlResourceVersionGetRequest) Validate() error { + return nil // TODO +} +func (r SqlResourceVersionGetRequest) Results() (*ResourceVersionResponse, error) { + return &ResourceVersionResponse{ + ResourceVersion: r.Response.ResourceVersion, + CurrentEpoch: r.Response.CurrentEpoch, + }, nil +} diff --git a/pkg/storage/unified/sql/rv_manager.go b/pkg/storage/unified/sql/rvmanager/rv_manager.go similarity index 89% rename from pkg/storage/unified/sql/rv_manager.go rename to pkg/storage/unified/sql/rvmanager/rv_manager.go index 858345b1fc2..b4f3b5de596 100644 --- a/pkg/storage/unified/sql/rv_manager.go +++ b/pkg/storage/unified/sql/rvmanager/rv_manager.go @@ -1,4 +1,4 @@ -package sql +package rvmanager import ( "context" @@ -11,6 +11,7 @@ import ( "github.com/bwmarrin/snowflake" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" + "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/trace" @@ -20,6 +21,8 @@ import ( "github.com/grafana/grafana/pkg/storage/unified/sql/sqltemplate" ) +var tracer = otel.Tracer("github.com/grafana/grafana/pkg/storage/unified/sql/rvmanager") + var ( rvmWriteDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{ Name: "rvmanager_write_duration_seconds", @@ -62,8 +65,8 @@ const ( defaultBatchTimeout = 5 * time.Second ) -// resourceVersionManager handles resource version operations -type resourceVersionManager struct { +// ResourceVersionManager handles resource version operations +type ResourceVersionManager struct { dialect sqltemplate.Dialect db db.DB batchMu sync.RWMutex @@ -100,7 +103,7 @@ type ResourceManagerOptions struct { } // NewResourceVersionManager creates a new ResourceVersionManager -func NewResourceVersionManager(opts ResourceManagerOptions) (*resourceVersionManager, error) { +func NewResourceVersionManager(opts ResourceManagerOptions) (*ResourceVersionManager, error) { if opts.MaxBatchSize == 0 { opts.MaxBatchSize = defaultMaxBatchSize } @@ -113,7 +116,7 @@ func NewResourceVersionManager(opts ResourceManagerOptions) (*resourceVersionMan if opts.DB == nil { return nil, errors.New("db is required") } - return &resourceVersionManager{ + return &ResourceVersionManager{ dialect: opts.Dialect, db: opts.DB, batchChMap: make(map[string]chan *writeOp), @@ -123,7 +126,7 @@ func NewResourceVersionManager(opts ResourceManagerOptions) (*resourceVersionMan } // ExecWithRV executes the given function with an incremented resource version -func (m *resourceVersionManager) ExecWithRV(ctx context.Context, key *resourcepb.ResourceKey, fn WriteEventFunc) (rv int64, err error) { +func (m *ResourceVersionManager) ExecWithRV(ctx context.Context, key *resourcepb.ResourceKey, fn WriteEventFunc) (rv int64, err error) { rvmInflightWrites.WithLabelValues(key.Group, key.Resource).Inc() defer rvmInflightWrites.WithLabelValues(key.Group, key.Resource).Dec() @@ -179,7 +182,7 @@ func (m *resourceVersionManager) ExecWithRV(ctx context.Context, key *resourcepb } // startBatchProcessor is responsible for processing batches of write operations -func (m *resourceVersionManager) startBatchProcessor(group, resource string) { +func (m *ResourceVersionManager) startBatchProcessor(group, resource string) { ctx := context.TODO() batchKey := fmt.Sprintf("%s/%s", group, resource) @@ -216,7 +219,11 @@ func (m *resourceVersionManager) startBatchProcessor(group, resource string) { } } -func (m *resourceVersionManager) execBatch(ctx context.Context, group, resource string, batch []writeOp) { +var readCommitted = &sql.TxOptions{ + Isolation: sql.LevelReadCommitted, +} + +func (m *ResourceVersionManager) execBatch(ctx context.Context, group, resource string, batch []writeOp) { ctx, span := tracer.Start(ctx, "sql.resourceVersionManager.execBatch") defer span.End() @@ -245,7 +252,7 @@ func (m *resourceVersionManager) execBatch(ctx context.Context, group, resource guids := make([]string, len(batch)) // The GUIDs of the created resources in the same order as the batch rvs := make([]int64, len(batch)) // The RVs of the created resources in the same order as the batch - err = m.db.WithTx(ctx, ReadCommitted, func(ctx context.Context, tx db.Tx) error { + err = m.db.WithTx(ctx, readCommitted, func(ctx context.Context, tx db.Tx) error { span.AddEvent("starting_batch_transaction") writeTimer := prometheus.NewTimer(prometheus.ObserverFunc(func(v float64) { @@ -268,7 +275,7 @@ func (m *resourceVersionManager) execBatch(ctx context.Context, group, resource lockTimer := prometheus.NewTimer(prometheus.ObserverFunc(func(v float64) { rvmExecBatchPhaseDuration.WithLabelValues(group, resource, "waiting_for_lock").Observe(v) })) - rv, err := m.lock(ctx, tx, group, resource) + rv, err := m.Lock(ctx, tx, group, resource) lockTimer.ObserveDuration() if err != nil { span.AddEvent("resource_version_lock_failed", trace.WithAttributes( @@ -292,7 +299,7 @@ func (m *resourceVersionManager) execBatch(ctx context.Context, group, resource rv++ } // Update the resource version for the created resources in both the resource and the resource history - if _, err := dbutil.Exec(ctx, tx, sqlResourceUpdateRV, sqlResourceUpdateRVRequest{ + if _, err := dbutil.Exec(ctx, tx, SqlResourceUpdateRV, SqlResourceUpdateRVRequest{ SQLTemplate: sqltemplate.New(m.dialect), GUIDToRV: guidToRV, }); err != nil { @@ -303,7 +310,7 @@ func (m *resourceVersionManager) execBatch(ctx context.Context, group, resource } span.AddEvent("resource_versions_updated") - if _, err := dbutil.Exec(ctx, tx, sqlResourceHistoryUpdateRV, sqlResourceUpdateRVRequest{ + if _, err := dbutil.Exec(ctx, tx, SqlResourceHistoryUpdateRV, SqlResourceUpdateRVRequest{ SQLTemplate: sqltemplate.New(m.dialect), GUIDToRV: guidToRV, GUIDToSnowflakeRV: guidToSnowflakeRV, @@ -316,7 +323,7 @@ func (m *resourceVersionManager) execBatch(ctx context.Context, group, resource span.AddEvent("resource_history_versions_updated") // Record the latest RV in the resource version table - err = m.saveRV(ctx, tx, group, resource, rv) + err = m.SaveRV(ctx, tx, group, resource, rv) if err != nil { span.AddEvent("save_rv_failed", trace.WithAttributes( attribute.String("error", err.Error()), @@ -350,20 +357,20 @@ func snowflakeFromRv(rv int64) int64 { return (((rv / 1000) - snowflake.Epoch) << (snowflake.NodeBits + snowflake.StepBits)) + (rv % 1000) } -// lock locks the resource version for the given key -func (m *resourceVersionManager) lock(ctx context.Context, x db.ContextExecer, group, resource string) (nextRV int64, err error) { +// Lock locks the resource version for the given key +func (m *ResourceVersionManager) Lock(ctx context.Context, x db.ContextExecer, group, resource string) (nextRV int64, err error) { // 1. Lock the row and prevent concurrent updates until the transaction is committed - res, err := dbutil.QueryRow(ctx, x, sqlResourceVersionGet, sqlResourceVersionGetRequest{ + res, err := dbutil.QueryRow(ctx, x, SqlResourceVersionGet, sqlResourceVersionGetRequest{ SQLTemplate: sqltemplate.New(m.dialect), Group: group, Resource: resource, - Response: new(resourceVersionResponse), + Response: new(ResourceVersionResponse), ReadOnly: false, // Lock the row for update }) if errors.Is(err, sql.ErrNoRows) { // If there wasn't a row for this resource, create it - if _, err = dbutil.Exec(ctx, x, sqlResourceVersionInsert, sqlResourceVersionUpsertRequest{ + if _, err = dbutil.Exec(ctx, x, SqlResourceVersionInsert, SqlResourceVersionUpsertRequest{ SQLTemplate: sqltemplate.New(m.dialect), Group: group, Resource: resource, @@ -372,11 +379,11 @@ func (m *resourceVersionManager) lock(ctx context.Context, x db.ContextExecer, g } // Fetch the newly created resource version - res, err = dbutil.QueryRow(ctx, x, sqlResourceVersionGet, sqlResourceVersionGetRequest{ + res, err = dbutil.QueryRow(ctx, x, SqlResourceVersionGet, sqlResourceVersionGetRequest{ SQLTemplate: sqltemplate.New(m.dialect), Group: group, Resource: resource, - Response: new(resourceVersionResponse), + Response: new(ResourceVersionResponse), ReadOnly: true, }) if err != nil { @@ -390,8 +397,8 @@ func (m *resourceVersionManager) lock(ctx context.Context, x db.ContextExecer, g return max(res.CurrentEpoch, res.ResourceVersion+1), nil } -func (m *resourceVersionManager) saveRV(ctx context.Context, x db.ContextExecer, group, resource string, rv int64) error { - _, err := dbutil.Exec(ctx, x, sqlResourceVersionUpdate, sqlResourceVersionUpsertRequest{ +func (m *ResourceVersionManager) SaveRV(ctx context.Context, x db.ContextExecer, group, resource string, rv int64) error { + _, err := dbutil.Exec(ctx, x, SqlResourceVersionUpdate, SqlResourceVersionUpsertRequest{ SQLTemplate: sqltemplate.New(m.dialect), Group: group, Resource: resource, diff --git a/pkg/storage/unified/sql/rv_manager_test.go b/pkg/storage/unified/sql/rvmanager/rv_manager_test.go similarity index 99% rename from pkg/storage/unified/sql/rv_manager_test.go rename to pkg/storage/unified/sql/rvmanager/rv_manager_test.go index 46d23d26a59..9a2e105aa2f 100644 --- a/pkg/storage/unified/sql/rv_manager_test.go +++ b/pkg/storage/unified/sql/rvmanager/rv_manager_test.go @@ -1,4 +1,4 @@ -package sql +package rvmanager import ( "testing" diff --git a/pkg/storage/unified/sql/rvmanager/templates.go b/pkg/storage/unified/sql/rvmanager/templates.go new file mode 100644 index 00000000000..65c91a273af --- /dev/null +++ b/pkg/storage/unified/sql/rvmanager/templates.go @@ -0,0 +1,30 @@ +package rvmanager + +import ( + "embed" + "fmt" + "text/template" +) + +// Templates setup. +var ( + //go:embed data/*.sql + sqlTemplatesFS embed.FS + + sqlTemplates = template.Must(template.New("sql").ParseFS(sqlTemplatesFS, `data/*.sql`)) +) + +func mustTemplate(filename string) *template.Template { + if t := sqlTemplates.Lookup(filename); t != nil { + return t + } + panic(fmt.Sprintf("template file not found: %s", filename)) +} + +var ( + SqlResourceUpdateRV = mustTemplate("resource_update_rv.sql") + SqlResourceHistoryUpdateRV = mustTemplate("resource_history_update_rv.sql") + SqlResourceVersionGet = mustTemplate("resource_version_get.sql") + SqlResourceVersionUpdate = mustTemplate("resource_version_update.sql") + SqlResourceVersionInsert = mustTemplate("resource_version_insert.sql") +)