xorm introduced some changes in
https://github.com/go-xorm/xorm/pull/824 and
https://github.com/go-xorm/xorm/pull/876 which by default will use
public as the postgres schema and this was a breaking change compared
to before. Grafana has implemented a custom postgres dialect so above
changes wasn't a problem here. However, Grafana's custom database
migration was using xorm dialect to check if the migration table exists
or not.
For those using a custom search_path (schema) in postgres configured on
server, database or user level the migration table check would not find
the migration table since it was looking in public schema due to xorm
changes above. This had the consequence that Grafana's database
migration failed the second time since migration had already run
migrations in another schema.
This change will make xorm use an empty default schema for postgres and
by that mimic the functionality of how it was functioning before
xorm's changes above.
Fixes #16720
Co-Authored-By: Carl Bergquist <carl@grafana.com>
(cherry picked from commit b7a9533476)
435 lines
11 KiB
Go
435 lines
11 KiB
Go
package sqlstore
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"net/url"
|
|
"os"
|
|
"path"
|
|
"path/filepath"
|
|
"strings"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/go-sql-driver/mysql"
|
|
"github.com/go-xorm/xorm"
|
|
"github.com/grafana/grafana/pkg/bus"
|
|
"github.com/grafana/grafana/pkg/log"
|
|
m "github.com/grafana/grafana/pkg/models"
|
|
"github.com/grafana/grafana/pkg/registry"
|
|
"github.com/grafana/grafana/pkg/services/annotations"
|
|
"github.com/grafana/grafana/pkg/services/cache"
|
|
"github.com/grafana/grafana/pkg/services/sqlstore/migrations"
|
|
"github.com/grafana/grafana/pkg/services/sqlstore/migrator"
|
|
"github.com/grafana/grafana/pkg/services/sqlstore/sqlutil"
|
|
"github.com/grafana/grafana/pkg/setting"
|
|
_ "github.com/grafana/grafana/pkg/tsdb/mssql"
|
|
"github.com/grafana/grafana/pkg/util"
|
|
_ "github.com/lib/pq"
|
|
sqlite3 "github.com/mattn/go-sqlite3"
|
|
)
|
|
|
|
var (
|
|
x *xorm.Engine
|
|
dialect migrator.Dialect
|
|
|
|
sqlog log.Logger = log.New("sqlstore")
|
|
)
|
|
|
|
const ContextSessionName = "db-session"
|
|
|
|
func init() {
|
|
// This change will make xorm use an empty default schema for postgres and
|
|
// by that mimic the functionality of how it was functioning before
|
|
// xorm's changes above.
|
|
xorm.DefaultPostgresSchema = ""
|
|
|
|
registry.Register(®istry.Descriptor{
|
|
Name: "SqlStore",
|
|
Instance: &SqlStore{},
|
|
InitPriority: registry.High,
|
|
})
|
|
}
|
|
|
|
type SqlStore struct {
|
|
Cfg *setting.Cfg `inject:""`
|
|
Bus bus.Bus `inject:""`
|
|
CacheService *cache.CacheService `inject:""`
|
|
|
|
dbCfg DatabaseConfig
|
|
engine *xorm.Engine
|
|
log log.Logger
|
|
Dialect migrator.Dialect
|
|
skipEnsureAdmin bool
|
|
}
|
|
|
|
// NewSession returns a new DBSession
|
|
func (ss *SqlStore) NewSession() *DBSession {
|
|
return &DBSession{Session: ss.engine.NewSession()}
|
|
}
|
|
|
|
// WithDbSession calls the callback with an session attached to the context.
|
|
func (ss *SqlStore) WithDbSession(ctx context.Context, callback dbTransactionFunc) error {
|
|
sess, err := startSession(ctx, ss.engine, false)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
return callback(sess)
|
|
}
|
|
|
|
// WithTransactionalDbSession calls the callback with an session within a transaction
|
|
func (ss *SqlStore) WithTransactionalDbSession(ctx context.Context, callback dbTransactionFunc) error {
|
|
return ss.inTransactionWithRetryCtx(ctx, callback, 0)
|
|
}
|
|
|
|
func (ss *SqlStore) inTransactionWithRetryCtx(ctx context.Context, callback dbTransactionFunc, retry int) error {
|
|
sess, err := startSession(ctx, ss.engine, true)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
defer sess.Close()
|
|
|
|
err = callback(sess)
|
|
|
|
// special handling of database locked errors for sqlite, then we can retry 5 times
|
|
if sqlError, ok := err.(sqlite3.Error); ok && retry < 5 {
|
|
if sqlError.Code == sqlite3.ErrLocked || sqlError.Code == sqlite3.ErrBusy {
|
|
sess.Rollback()
|
|
time.Sleep(time.Millisecond * time.Duration(10))
|
|
sqlog.Info("Database locked, sleeping then retrying", "error", err, "retry", retry)
|
|
return ss.inTransactionWithRetryCtx(ctx, callback, retry+1)
|
|
}
|
|
}
|
|
|
|
if err != nil {
|
|
sess.Rollback()
|
|
return err
|
|
} else if err = sess.Commit(); err != nil {
|
|
return err
|
|
}
|
|
|
|
if len(sess.events) > 0 {
|
|
for _, e := range sess.events {
|
|
if err = bus.Publish(e); err != nil {
|
|
log.Error(3, "Failed to publish event after commit. error: %v", err)
|
|
}
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (ss *SqlStore) Init() error {
|
|
ss.log = log.New("sqlstore")
|
|
ss.readConfig()
|
|
|
|
engine, err := ss.getEngine()
|
|
|
|
if err != nil {
|
|
return fmt.Errorf("Fail to connect to database: %v", err)
|
|
}
|
|
|
|
ss.engine = engine
|
|
ss.Dialect = migrator.NewDialect(ss.engine)
|
|
|
|
// temporarily still set global var
|
|
x = engine
|
|
dialect = ss.Dialect
|
|
|
|
migrator := migrator.NewMigrator(x)
|
|
migrations.AddMigrations(migrator)
|
|
|
|
for _, descriptor := range registry.GetServices() {
|
|
sc, ok := descriptor.Instance.(registry.DatabaseMigrator)
|
|
if ok {
|
|
sc.AddMigration(migrator)
|
|
}
|
|
}
|
|
|
|
if err := migrator.Start(); err != nil {
|
|
return fmt.Errorf("Migration failed err: %v", err)
|
|
}
|
|
|
|
// Init repo instances
|
|
annotations.SetRepository(&SqlAnnotationRepo{})
|
|
ss.Bus.SetTransactionManager(ss)
|
|
|
|
// Register handlers
|
|
ss.addUserQueryAndCommandHandlers()
|
|
|
|
// ensure admin user
|
|
if ss.skipEnsureAdmin {
|
|
return nil
|
|
}
|
|
|
|
return ss.ensureAdminUser()
|
|
}
|
|
|
|
func (ss *SqlStore) ensureAdminUser() error {
|
|
systemUserCountQuery := m.GetSystemUserCountStatsQuery{}
|
|
|
|
err := ss.InTransaction(context.Background(), func(ctx context.Context) error {
|
|
|
|
err := bus.DispatchCtx(ctx, &systemUserCountQuery)
|
|
if err != nil {
|
|
return fmt.Errorf("Could not determine if admin user exists: %v", err)
|
|
}
|
|
|
|
if systemUserCountQuery.Result.Count > 0 {
|
|
return nil
|
|
}
|
|
|
|
cmd := m.CreateUserCommand{}
|
|
cmd.Login = setting.AdminUser
|
|
cmd.Email = setting.AdminUser + "@localhost"
|
|
cmd.Password = setting.AdminPassword
|
|
cmd.IsAdmin = true
|
|
|
|
if err := bus.DispatchCtx(ctx, &cmd); err != nil {
|
|
return fmt.Errorf("Failed to create admin user: %v", err)
|
|
}
|
|
|
|
ss.log.Info("Created default admin", "user", setting.AdminUser)
|
|
|
|
return nil
|
|
})
|
|
|
|
return err
|
|
}
|
|
|
|
func (ss *SqlStore) buildExtraConnectionString(sep rune) string {
|
|
if ss.dbCfg.UrlQueryParams == nil {
|
|
return ""
|
|
}
|
|
|
|
var sb strings.Builder
|
|
for key, values := range ss.dbCfg.UrlQueryParams {
|
|
for _, value := range values {
|
|
sb.WriteRune(sep)
|
|
sb.WriteString(key)
|
|
sb.WriteRune('=')
|
|
sb.WriteString(value)
|
|
}
|
|
}
|
|
return sb.String()
|
|
}
|
|
|
|
func (ss *SqlStore) buildConnectionString() (string, error) {
|
|
cnnstr := ss.dbCfg.ConnectionString
|
|
|
|
// special case used by integration tests
|
|
if cnnstr != "" {
|
|
return cnnstr, nil
|
|
}
|
|
|
|
switch ss.dbCfg.Type {
|
|
case migrator.MYSQL:
|
|
protocol := "tcp"
|
|
if strings.HasPrefix(ss.dbCfg.Host, "/") {
|
|
protocol = "unix"
|
|
}
|
|
|
|
cnnstr = fmt.Sprintf("%s:%s@%s(%s)/%s?collation=utf8mb4_unicode_ci&allowNativePasswords=true",
|
|
ss.dbCfg.User, ss.dbCfg.Pwd, protocol, ss.dbCfg.Host, ss.dbCfg.Name)
|
|
|
|
if ss.dbCfg.SslMode == "true" || ss.dbCfg.SslMode == "skip-verify" {
|
|
tlsCert, err := makeCert("custom", ss.dbCfg)
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
mysql.RegisterTLSConfig("custom", tlsCert)
|
|
cnnstr += "&tls=custom"
|
|
}
|
|
|
|
cnnstr += ss.buildExtraConnectionString('&')
|
|
case migrator.POSTGRES:
|
|
host, port := util.SplitHostPortDefault(ss.dbCfg.Host, "127.0.0.1", "5432")
|
|
if ss.dbCfg.Pwd == "" {
|
|
ss.dbCfg.Pwd = "''"
|
|
}
|
|
if ss.dbCfg.User == "" {
|
|
ss.dbCfg.User = "''"
|
|
}
|
|
cnnstr = fmt.Sprintf("user=%s password=%s host=%s port=%s dbname=%s sslmode=%s sslcert=%s sslkey=%s sslrootcert=%s", ss.dbCfg.User, ss.dbCfg.Pwd, host, port, ss.dbCfg.Name, ss.dbCfg.SslMode, ss.dbCfg.ClientCertPath, ss.dbCfg.ClientKeyPath, ss.dbCfg.CaCertPath)
|
|
|
|
cnnstr += ss.buildExtraConnectionString(' ')
|
|
case migrator.SQLITE:
|
|
// special case for tests
|
|
if !filepath.IsAbs(ss.dbCfg.Path) {
|
|
ss.dbCfg.Path = filepath.Join(ss.Cfg.DataPath, ss.dbCfg.Path)
|
|
}
|
|
os.MkdirAll(path.Dir(ss.dbCfg.Path), os.ModePerm)
|
|
cnnstr = fmt.Sprintf("file:%s?cache=%s&mode=rwc", ss.dbCfg.Path, ss.dbCfg.CacheMode)
|
|
cnnstr += ss.buildExtraConnectionString('&')
|
|
default:
|
|
return "", fmt.Errorf("Unknown database type: %s", ss.dbCfg.Type)
|
|
}
|
|
|
|
return cnnstr, nil
|
|
}
|
|
|
|
func (ss *SqlStore) getEngine() (*xorm.Engine, error) {
|
|
connectionString, err := ss.buildConnectionString()
|
|
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
sqlog.Info("Connecting to DB", "dbtype", ss.dbCfg.Type)
|
|
engine, err := xorm.NewEngine(ss.dbCfg.Type, connectionString)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
engine.SetMaxOpenConns(ss.dbCfg.MaxOpenConn)
|
|
engine.SetMaxIdleConns(ss.dbCfg.MaxIdleConn)
|
|
engine.SetConnMaxLifetime(time.Second * time.Duration(ss.dbCfg.ConnMaxLifetime))
|
|
|
|
// configure sql logging
|
|
debugSql := ss.Cfg.Raw.Section("database").Key("log_queries").MustBool(false)
|
|
if !debugSql {
|
|
engine.SetLogger(&xorm.DiscardLogger{})
|
|
} else {
|
|
engine.SetLogger(NewXormLogger(log.LvlInfo, log.New("sqlstore.xorm")))
|
|
engine.ShowSQL(true)
|
|
engine.ShowExecTime(true)
|
|
}
|
|
|
|
return engine, nil
|
|
}
|
|
|
|
func (ss *SqlStore) readConfig() {
|
|
sec := ss.Cfg.Raw.Section("database")
|
|
|
|
cfgURL := sec.Key("url").String()
|
|
if len(cfgURL) != 0 {
|
|
dbURL, _ := url.Parse(cfgURL)
|
|
ss.dbCfg.Type = dbURL.Scheme
|
|
ss.dbCfg.Host = dbURL.Host
|
|
|
|
pathSplit := strings.Split(dbURL.Path, "/")
|
|
if len(pathSplit) > 1 {
|
|
ss.dbCfg.Name = pathSplit[1]
|
|
}
|
|
|
|
userInfo := dbURL.User
|
|
if userInfo != nil {
|
|
ss.dbCfg.User = userInfo.Username()
|
|
ss.dbCfg.Pwd, _ = userInfo.Password()
|
|
}
|
|
|
|
ss.dbCfg.UrlQueryParams = dbURL.Query()
|
|
} else {
|
|
ss.dbCfg.Type = sec.Key("type").String()
|
|
ss.dbCfg.Host = sec.Key("host").String()
|
|
ss.dbCfg.Name = sec.Key("name").String()
|
|
ss.dbCfg.User = sec.Key("user").String()
|
|
ss.dbCfg.ConnectionString = sec.Key("connection_string").String()
|
|
ss.dbCfg.Pwd = sec.Key("password").String()
|
|
}
|
|
|
|
ss.dbCfg.MaxOpenConn = sec.Key("max_open_conn").MustInt(0)
|
|
ss.dbCfg.MaxIdleConn = sec.Key("max_idle_conn").MustInt(2)
|
|
ss.dbCfg.ConnMaxLifetime = sec.Key("conn_max_lifetime").MustInt(14400)
|
|
|
|
ss.dbCfg.SslMode = sec.Key("ssl_mode").String()
|
|
ss.dbCfg.CaCertPath = sec.Key("ca_cert_path").String()
|
|
ss.dbCfg.ClientKeyPath = sec.Key("client_key_path").String()
|
|
ss.dbCfg.ClientCertPath = sec.Key("client_cert_path").String()
|
|
ss.dbCfg.ServerCertName = sec.Key("server_cert_name").String()
|
|
ss.dbCfg.Path = sec.Key("path").MustString("data/grafana.db")
|
|
|
|
ss.dbCfg.CacheMode = sec.Key("cache_mode").MustString("private")
|
|
}
|
|
|
|
func InitTestDB(t *testing.T) *SqlStore {
|
|
t.Helper()
|
|
sqlstore := &SqlStore{}
|
|
sqlstore.skipEnsureAdmin = true
|
|
sqlstore.Bus = bus.New()
|
|
sqlstore.CacheService = cache.New(5*time.Minute, 10*time.Minute)
|
|
|
|
dbType := migrator.SQLITE
|
|
|
|
// environment variable present for test db?
|
|
if db, present := os.LookupEnv("GRAFANA_TEST_DB"); present {
|
|
dbType = db
|
|
}
|
|
|
|
// set test db config
|
|
sqlstore.Cfg = setting.NewCfg()
|
|
sec, _ := sqlstore.Cfg.Raw.NewSection("database")
|
|
sec.NewKey("type", dbType)
|
|
|
|
switch dbType {
|
|
case "mysql":
|
|
sec.NewKey("connection_string", sqlutil.TestDB_Mysql.ConnStr)
|
|
case "postgres":
|
|
sec.NewKey("connection_string", sqlutil.TestDB_Postgres.ConnStr)
|
|
default:
|
|
sec.NewKey("connection_string", sqlutil.TestDB_Sqlite3.ConnStr)
|
|
}
|
|
|
|
// need to get engine to clean db before we init
|
|
engine, err := xorm.NewEngine(dbType, sec.Key("connection_string").String())
|
|
if err != nil {
|
|
t.Fatalf("Failed to init test database: %v", err)
|
|
}
|
|
|
|
sqlstore.Dialect = migrator.NewDialect(engine)
|
|
|
|
// temp global var until we get rid of global vars
|
|
dialect = sqlstore.Dialect
|
|
|
|
if err := dialect.CleanDB(); err != nil {
|
|
t.Fatalf("Failed to clean test db %v", err)
|
|
}
|
|
|
|
if err := sqlstore.Init(); err != nil {
|
|
t.Fatalf("Failed to init test database: %v", err)
|
|
}
|
|
|
|
sqlstore.engine.DatabaseTZ = time.UTC
|
|
sqlstore.engine.TZLocation = time.UTC
|
|
|
|
return sqlstore
|
|
}
|
|
|
|
func IsTestDbMySql() bool {
|
|
if db, present := os.LookupEnv("GRAFANA_TEST_DB"); present {
|
|
return db == migrator.MYSQL
|
|
}
|
|
|
|
return false
|
|
}
|
|
|
|
func IsTestDbPostgres() bool {
|
|
if db, present := os.LookupEnv("GRAFANA_TEST_DB"); present {
|
|
return db == migrator.POSTGRES
|
|
}
|
|
|
|
return false
|
|
}
|
|
|
|
type DatabaseConfig struct {
|
|
Type string
|
|
Host string
|
|
Name string
|
|
User string
|
|
Pwd string
|
|
Path string
|
|
SslMode string
|
|
CaCertPath string
|
|
ClientKeyPath string
|
|
ClientCertPath string
|
|
ServerCertName string
|
|
ConnectionString string
|
|
MaxOpenConn int
|
|
MaxIdleConn int
|
|
ConnMaxLifetime int
|
|
CacheMode string
|
|
UrlQueryParams map[string][]string
|
|
}
|