fix(unified): in-proc SQLite data migration (#114537)

* feat: unified storage migrations integration tests

* chore: add comment and adjust db path name

* chore: refactor test cases into interface

* fix: unified SQLite migration with SQLStore migrator

* revert changes to newResourceDBProvider
This commit is contained in:
Rafael Bortolon Paulovic
2025-11-28 13:13:35 +01:00
committed by GitHub
parent 11a27ab870
commit 12c6d7e83f
7 changed files with 286 additions and 132 deletions
+143 -107
View File
@@ -8,6 +8,7 @@ import (
"sync"
"time"
"github.com/fullstorydev/grpchan/inprocgrpc"
"github.com/google/uuid"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -20,6 +21,7 @@ import (
"github.com/grafana/grafana/pkg/storage/unified/resource"
"github.com/grafana/grafana/pkg/storage/unified/resourcepb"
"github.com/grafana/grafana/pkg/storage/unified/sql/db"
"github.com/grafana/grafana/pkg/storage/unified/sql/db/dbimpl"
"github.com/grafana/grafana/pkg/storage/unified/sql/dbutil"
"github.com/grafana/grafana/pkg/storage/unified/sql/sqltemplate"
)
@@ -111,6 +113,19 @@ func (b *backend) ProcessBulk(ctx context.Context, setting resource.BulkSettings
}
defer b.bulkLock.Finish(setting.Collection)
// If provided, reuse the inproc transaction for SQLite
if clientCtx := inprocgrpc.ClientContext(ctx); clientCtx != nil && b.dialect.DialectName() == "sqlite" {
if externalTx := resource.TransactionFromContext(clientCtx); externalTx != nil {
b.log.Info("Using SQLite transaction from client context")
rsp := &resourcepb.BulkResponse{}
err := b.processBulkWithTx(ctx, dbimpl.NewTx(externalTx), setting, iter, rsp)
if err != nil {
rsp.Error = resource.AsErrorResult(err)
}
return rsp
}
}
// We may want to first write parquet, then read parquet
if b.dialect.DialectName() == "sqlite" {
file, err := os.CreateTemp("", "grafana-bulk-export-*.parquet")
@@ -151,109 +166,134 @@ func (b *backend) ProcessBulk(ctx context.Context, setting resource.BulkSettings
func (b *backend) processBulk(ctx context.Context, setting resource.BulkSettings, iter resource.BulkRequestIterator) *resourcepb.BulkResponse {
rsp := &resourcepb.BulkResponse{}
err := b.db.WithTx(ctx, ReadCommitted, func(ctx context.Context, tx db.Tx) error {
rollbackWithError := func(err error) error {
txerr := tx.Rollback()
if txerr != nil {
b.log.Warn("rollback", "error", txerr)
} else {
b.log.Info("rollback")
return b.processBulkWithTx(ctx, tx, setting, iter, rsp)
})
if err != nil {
rsp.Error = resource.AsErrorResult(err)
}
return rsp
}
// processBulkWithTx performs the bulk operation using the provided transaction.
// This is used both when creating our own transaction and when reusing an external one.
func (b *backend) processBulkWithTx(ctx context.Context, tx db.Tx, setting resource.BulkSettings, iter resource.BulkRequestIterator, rsp *resourcepb.BulkResponse) error {
rollbackWithError := func(err error) error {
txerr := tx.Rollback()
if txerr != nil {
b.log.Warn("rollback", "error", txerr)
} else {
b.log.Info("rollback")
}
return err
}
bulk := &bulkWroker{
ctx: ctx,
tx: tx,
dialect: b.dialect,
logger: logging.FromContext(ctx),
}
// Calculate the RV based on incoming request timestamps
rv := newBulkRV()
summaries := make(map[string]*resourcepb.BulkResponse_Summary, len(setting.Collection))
// First clear everything in the transaction
if setting.RebuildCollection {
for _, key := range setting.Collection {
summary, err := bulk.deleteCollection(key)
if err != nil {
return rollbackWithError(err)
}
summaries[resource.NSGR(key)] = summary
rsp.Summary = append(rsp.Summary, summary)
}
} else {
for _, key := range setting.Collection {
summaries[resource.NSGR(key)] = &resourcepb.BulkResponse_Summary{
Namespace: key.Namespace,
Group: key.Group,
Resource: key.Resource,
}
}
}
obj := &unstructured.Unstructured{}
// Write each event into the history
for iter.Next() {
if iter.RollbackRequested() {
return rollbackWithError(nil)
}
req := iter.Request()
if req == nil {
return rollbackWithError(fmt.Errorf("missing request"))
}
rsp.Processed++
if req.Action == resourcepb.BulkRequest_UNKNOWN {
rsp.Rejected = append(rsp.Rejected, &resourcepb.BulkResponse_Rejected{
Key: req.Key,
Action: req.Action,
Error: "unknown action",
})
continue
}
err := obj.UnmarshalJSON(req.Value)
if err != nil {
rsp.Rejected = append(rsp.Rejected, &resourcepb.BulkResponse_Rejected{
Key: req.Key,
Action: req.Action,
Error: "unable to unmarshal json",
})
continue
}
// Write the event to history
if _, err := dbutil.Exec(ctx, tx, sqlResourceHistoryInsert, sqlResourceRequest{
SQLTemplate: sqltemplate.New(b.dialect),
WriteEvent: resource.WriteEvent{
Key: req.Key,
Type: resourcepb.WatchEvent_Type(req.Action),
Value: req.Value,
PreviousRV: -1, // Used for WATCH, but we want to skip watch events
},
Folder: req.Folder,
GUID: uuid.New().String(),
ResourceVersion: rv.next(obj),
}); err != nil {
return rollbackWithError(fmt.Errorf("insert into resource history: %w", err))
}
}
// Now update the resource table from history
for _, key := range setting.Collection {
k := fmt.Sprintf("%s/%s/%s", key.Namespace, key.Group, key.Resource)
summary := summaries[k]
if summary == nil {
return rollbackWithError(fmt.Errorf("missing summary key for: %s", k))
}
err := bulk.syncCollection(key, summary)
if err != nil {
return err
}
bulk := &bulkWroker{
ctx: ctx,
tx: tx,
dialect: b.dialect,
logger: logging.FromContext(ctx),
}
// Calculate the RV based on incoming request timestamps
rv := newBulkRV()
summaries := make(map[string]*resourcepb.BulkResponse_Summary, len(setting.Collection))
// First clear everything in the transaction
if setting.RebuildCollection {
for _, key := range setting.Collection {
summary, err := bulk.deleteCollection(key)
if err != nil {
return rollbackWithError(err)
if b.dialect.DialectName() == "sqlite" {
nextRV, err := b.rvManager.lock(ctx, tx, key.Group, key.Resource)
if err != nil {
b.log.Error("error locking RV", "error", err, "key", resource.NSGR(key))
} else {
b.log.Info("successfully locked RV", "nextRV", nextRV, "key", resource.NSGR(key))
// Save the incremented RV
if err := b.rvManager.saveRV(ctx, tx, key.Group, key.Resource, nextRV); err != nil {
b.log.Error("error saving RV", "error", err, "key", resource.NSGR(key))
} else {
b.log.Info("successfully saved RV", "rv", nextRV, "key", resource.NSGR(key))
}
summaries[resource.NSGR(key)] = summary
rsp.Summary = append(rsp.Summary, summary)
}
} else {
for _, key := range setting.Collection {
summaries[resource.NSGR(key)] = &resourcepb.BulkResponse_Summary{
Namespace: key.Namespace,
Group: key.Group,
Resource: key.Resource,
}
}
}
obj := &unstructured.Unstructured{}
// Write each event into the history
for iter.Next() {
if iter.RollbackRequested() {
return rollbackWithError(nil)
}
req := iter.Request()
if req == nil {
return rollbackWithError(fmt.Errorf("missing request"))
}
rsp.Processed++
if req.Action == resourcepb.BulkRequest_UNKNOWN {
rsp.Rejected = append(rsp.Rejected, &resourcepb.BulkResponse_Rejected{
Key: req.Key,
Action: req.Action,
Error: "unknown action",
})
continue
}
err := obj.UnmarshalJSON(req.Value)
if err != nil {
rsp.Rejected = append(rsp.Rejected, &resourcepb.BulkResponse_Rejected{
Key: req.Key,
Action: req.Action,
Error: "unable to unmarshal json",
})
continue
}
// Write the event to history
if _, err := dbutil.Exec(ctx, tx, sqlResourceHistoryInsert, sqlResourceRequest{
SQLTemplate: sqltemplate.New(b.dialect),
WriteEvent: resource.WriteEvent{
Key: req.Key,
Type: resourcepb.WatchEvent_Type(req.Action),
Value: req.Value,
PreviousRV: -1, // Used for WATCH, but we want to skip watch events
},
Folder: req.Folder,
GUID: uuid.New().String(),
ResourceVersion: rv.next(obj),
}); err != nil {
return rollbackWithError(fmt.Errorf("insert into resource history: %w", err))
}
}
// Now update the resource table from history
for _, key := range setting.Collection {
k := fmt.Sprintf("%s/%s/%s", key.Namespace, key.Group, key.Resource)
summary := summaries[k]
if summary == nil {
return rollbackWithError(fmt.Errorf("missing summary key for: %s", k))
}
err := bulk.syncCollection(key, summary)
if err != nil {
return err
}
// Make sure the collection RV is above our last written event
_, err = b.rvManager.ExecWithRV(ctx, key, func(tx db.Tx) (string, error) {
return "", nil
@@ -261,19 +301,15 @@ func (b *backend) processBulk(ctx context.Context, setting resource.BulkSettings
if err != nil {
b.log.Warn("error increasing RV", "error", err)
}
// Update the last import time. This is important to trigger reindexing
// of the resource for a given namespace.
if err := b.updateLastImportTime(ctx, tx, key, time.Now()); err != nil {
return rollbackWithError(err)
}
}
return nil
})
if err != nil {
rsp.Error = resource.AsErrorResult(err)
// Update the last import time. This is important to trigger reindexing
// of the resource for a given namespace.
if err := b.updateLastImportTime(ctx, tx, key, time.Now()); err != nil {
return rollbackWithError(err)
}
}
return rsp
return nil
}
func (b *backend) updateLastImportTime(ctx context.Context, tx db.Tx, key *resourcepb.ResourceKey, now time.Time) error {
+5
View File
@@ -48,6 +48,11 @@ type sqlTx struct {
*sql.Tx
}
// NewTx wraps an existing *sql.Tx with sqlTx
func NewTx(tx *sql.Tx) db.Tx {
return sqlTx{tx}
}
func (tx sqlTx) QueryContext(ctx context.Context, query string, args ...any) (db.Rows, error) {
// // codeql-suppress go/sql-query-built-from-user-controlled-sources "The query comes from a safe template source
// and the parameters are passed as arguments."