Unified-storage: sql backend key path backfill (#115033)
* unified-storage: add migration to backfill key_path in resource_history
This commit is contained in:
@@ -2,8 +2,11 @@ package migrations
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"strings"
|
||||
|
||||
"github.com/bwmarrin/snowflake"
|
||||
"github.com/grafana/grafana/pkg/services/sqlstore/migrator"
|
||||
"github.com/grafana/grafana/pkg/util/xorm"
|
||||
)
|
||||
|
||||
func initResourceTables(mg *migrator.Migrator) string {
|
||||
@@ -204,5 +207,142 @@ func initResourceTables(mg *migrator.Migrator) string {
|
||||
Name: "IDX_resource_history_key_path",
|
||||
}))
|
||||
|
||||
mg.AddMigration("resource_history key_path backfill", &ResourceHistoryKeyPathBackfillMigration{})
|
||||
|
||||
return marker
|
||||
}
|
||||
|
||||
type ResourceHistoryKeyPathBackfillMigration struct {
|
||||
migrator.MigrationBase
|
||||
}
|
||||
|
||||
func (m *ResourceHistoryKeyPathBackfillMigration) SQL(_ migrator.Dialect) string {
|
||||
return "resource_history key_path backfill code migration"
|
||||
}
|
||||
|
||||
func (m *ResourceHistoryKeyPathBackfillMigration) Exec(sess *xorm.Session, mg *migrator.Migrator) error {
|
||||
rows, err := getResourceHistoryRows(sess, mg, resourceHistoryRow{})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for len(rows) > 0 {
|
||||
if err := updateResourceHistoryKeyPath(sess, rows); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
rows, err = getResourceHistoryRows(sess, mg, rows[len(rows)-1])
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func updateResourceHistoryKeyPath(sess *xorm.Session, rows []resourceHistoryRow) error {
|
||||
if len(rows) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
updates := []resourceHistoryRow{}
|
||||
|
||||
for _, row := range rows {
|
||||
if row.KeyPath == "" {
|
||||
row.KeyPath = parseKeyPath(row)
|
||||
updates = append(updates, row)
|
||||
}
|
||||
}
|
||||
|
||||
if len(updates) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
guids := ""
|
||||
setCases := "CASE"
|
||||
for _, row := range updates {
|
||||
guids += fmt.Sprintf("'%s',", row.GUID)
|
||||
setCases += fmt.Sprintf(" WHEN guid = '%s' THEN '%s'", row.GUID, row.KeyPath)
|
||||
}
|
||||
|
||||
guids = strings.TrimRight(guids, ",")
|
||||
setCases += " ELSE key_path END "
|
||||
|
||||
// the query will look like this
|
||||
// UPDATE resource_history
|
||||
// SET key_path = CASE
|
||||
// WHEN guid = '1402de51-669b-4206-8a6c-005a00eee6e3' then 'unified/data/folder.grafana.app/folders/default/cf6lylpvls000c/1998492888241012800~created~'
|
||||
// WHEN guid = '8842cc56-f22b-45e1-82b1-99759cd443b3' then 'unified/data/dashboard.grafana.app/dashboards/default/adzvfhp/1998492902577144677~created~cf6lylpvls000c'
|
||||
// ELSE key_path END
|
||||
// WHERE guid IN ('1402de51-669b-4206-8a6c-005a00eee6e3', '8842cc56-f22b-45e1-82b1-99759cd443b3')
|
||||
// AND key_path = '';
|
||||
sql := fmt.Sprintf(`
|
||||
UPDATE resource_history
|
||||
SET key_path = %s
|
||||
WHERE guid IN (%s)
|
||||
AND key_path = '';
|
||||
`, setCases, guids)
|
||||
|
||||
if _, err := sess.Exec(sql); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func parseKeyPath(row resourceHistoryRow) string {
|
||||
var action string
|
||||
switch row.Action {
|
||||
case 1:
|
||||
action = "created"
|
||||
case 2:
|
||||
action = "updated"
|
||||
case 3:
|
||||
action = "deleted"
|
||||
}
|
||||
return fmt.Sprintf("unified/data/%s/%s/%s/%s/%d~%s~%s", row.Group, row.Resource, row.Namespace, row.Name, snowflakeFromRv(row.ResourceVersion), action, row.Folder)
|
||||
}
|
||||
|
||||
func snowflakeFromRv(rv int64) int64 {
|
||||
return (((rv / 1000) - snowflake.Epoch) << (snowflake.NodeBits + snowflake.StepBits)) + (rv % 1000)
|
||||
}
|
||||
|
||||
type resourceHistoryRow struct {
|
||||
GUID string `xorm:"guid"`
|
||||
Group string `xorm:"group"`
|
||||
Resource string `xorm:"resource"`
|
||||
Namespace string `xorm:"namespace"`
|
||||
Name string `xorm:"name"`
|
||||
ResourceVersion int64 `xorm:"resource_version"`
|
||||
Action int64 `xorm:"action"`
|
||||
Folder string `xorm:"folder"`
|
||||
KeyPath string `xorm:"key_path"`
|
||||
}
|
||||
|
||||
func getResourceHistoryRows(sess *xorm.Session, mg *migrator.Migrator, continueRow resourceHistoryRow) ([]resourceHistoryRow, error) {
|
||||
var rows []resourceHistoryRow
|
||||
cols := fmt.Sprintf(
|
||||
"%s, %s, %s, %s, %s, %s, %s, %s, %s",
|
||||
mg.Dialect.Quote("guid"),
|
||||
mg.Dialect.Quote("group"),
|
||||
mg.Dialect.Quote("resource"),
|
||||
mg.Dialect.Quote("namespace"),
|
||||
mg.Dialect.Quote("name"),
|
||||
mg.Dialect.Quote("resource_version"),
|
||||
mg.Dialect.Quote("action"),
|
||||
mg.Dialect.Quote("folder"),
|
||||
mg.Dialect.Quote("key_path"))
|
||||
sql := fmt.Sprintf(`
|
||||
SELECT %s
|
||||
FROM resource_history
|
||||
WHERE (resource_version > %d OR (resource_version = %d AND guid > '%s'))
|
||||
AND key_path = ''
|
||||
ORDER BY resource_version ASC, guid ASC
|
||||
LIMIT 1000;
|
||||
`, cols, continueRow.ResourceVersion, continueRow.ResourceVersion, continueRow.GUID)
|
||||
if err := sess.SQL(sql).Find(&rows); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return rows, nil
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user