refactor(unified-storage): remove resource_server config option (#107649)
This commit is contained in:
committed by
GitHub
parent
73ab088804
commit
602b7826c4
@@ -1,24 +1,14 @@
|
||||
package dbimpl
|
||||
|
||||
import (
|
||||
"cmp"
|
||||
"fmt"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/go-sql-driver/mysql"
|
||||
"github.com/grafana/dskit/crypto/tls"
|
||||
|
||||
"github.com/grafana/grafana/pkg/util/xorm"
|
||||
|
||||
"github.com/grafana/grafana/pkg/services/sqlstore"
|
||||
"github.com/grafana/grafana/pkg/storage/unified/sql/db"
|
||||
)
|
||||
|
||||
// tlsConfigName is the name of the TLS config that we register with the MySQL
|
||||
// driver.
|
||||
const tlsConfigName = "db_engine_tls"
|
||||
|
||||
func getEngine(config *sqlstore.DatabaseConfig) (*xorm.Engine, error) {
|
||||
switch config.Type {
|
||||
case dbTypeMySQL, dbTypePostgres, dbTypeSQLite:
|
||||
@@ -36,156 +26,3 @@ func getEngine(config *sqlstore.DatabaseConfig) (*xorm.Engine, error) {
|
||||
return nil, fmt.Errorf("unsupported database type: %s", config.Type)
|
||||
}
|
||||
}
|
||||
|
||||
// Deprecated: use getEngine instead
|
||||
func getEngineMySQL(getter confGetter) (*xorm.Engine, error) {
|
||||
config := mysql.NewConfig()
|
||||
config.User = getter.String("user")
|
||||
// accept the core Grafana jargon of `password` as well, originally Unified
|
||||
// Storage used `pass`
|
||||
config.Passwd = cmp.Or(getter.String("pass"), getter.String("password"))
|
||||
config.Net = "tcp"
|
||||
config.Addr = getter.String("host")
|
||||
config.DBName = getter.String("name")
|
||||
config.Params = map[string]string{
|
||||
// See: https://dev.mysql.com/doc/refman/en/sql-mode.html
|
||||
"@@SESSION.sql_mode": "ANSI",
|
||||
}
|
||||
config.Collation = "utf8mb4_unicode_ci"
|
||||
config.Loc = time.UTC
|
||||
config.AllowNativePasswords = true
|
||||
config.ClientFoundRows = true
|
||||
config.ParseTime = true
|
||||
|
||||
// Setup TLS for the database connection if configured.
|
||||
if err := configureTLS(getter, config); err != nil {
|
||||
return nil, fmt.Errorf("failed to configure TLS: %w", err)
|
||||
}
|
||||
|
||||
// allow executing multiple SQL statements in a single roundtrip, and also
|
||||
// enable executing the CALL statement to run stored procedures that execute
|
||||
// multiple SQL statements.
|
||||
//config.MultiStatements = true
|
||||
|
||||
if err := getter.Err(); err != nil {
|
||||
return nil, fmt.Errorf("config error: %w", err)
|
||||
}
|
||||
|
||||
if strings.HasPrefix(config.Addr, "/") {
|
||||
config.Net = "unix"
|
||||
}
|
||||
|
||||
engine, err := xorm.NewEngine(db.DriverMySQL, config.FormatDSN())
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("open database: %w", err)
|
||||
}
|
||||
|
||||
engine.SetMaxOpenConns(getter.Int("max_open_conn", 0))
|
||||
engine.SetMaxIdleConns(getter.Int("max_idle_conn", 4))
|
||||
maxLifetime := time.Duration(getter.Int("conn_max_lifetime", 14400)) * time.Second
|
||||
engine.SetConnMaxLifetime(maxLifetime)
|
||||
|
||||
return engine, nil
|
||||
}
|
||||
|
||||
func configureTLS(getter confGetter, config *mysql.Config) error {
|
||||
sslMode := getter.String("ssl_mode")
|
||||
|
||||
if sslMode == "true" || sslMode == "skip-verify" {
|
||||
tlsCfg := tls.ClientConfig{
|
||||
CAPath: getter.String("ca_cert_path"),
|
||||
CertPath: getter.String("client_cert_path"),
|
||||
KeyPath: getter.String("client_key_path"),
|
||||
ServerName: getter.String("server_cert_name"),
|
||||
}
|
||||
|
||||
rawTLSCfg, err := tlsCfg.GetTLSConfig()
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to get TLS config for mysql: %w", err)
|
||||
}
|
||||
|
||||
if sslMode == "skip-verify" {
|
||||
rawTLSCfg.InsecureSkipVerify = true
|
||||
}
|
||||
|
||||
if err := mysql.RegisterTLSConfig(tlsConfigName, rawTLSCfg); err != nil {
|
||||
return fmt.Errorf("failed to register TLS config for mysql: %w", err)
|
||||
}
|
||||
|
||||
config.TLSConfig = tlsConfigName
|
||||
}
|
||||
|
||||
// If the TLS mode is set in the database config, we need to set it here.
|
||||
if tls := getter.String("tls"); tls != "" {
|
||||
// If the user has provided TLS certs, we don't want to use the tls=<value>, as
|
||||
// they would override the TLS config that we set above. They both use the same
|
||||
// parameter, so we need to check for that.
|
||||
if sslMode == "true" {
|
||||
return fmt.Errorf("cannot provide tls certs and tls=<value> at the same time")
|
||||
}
|
||||
config.Params["tls"] = tls
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Deprecated: use getEngine instead
|
||||
func getEnginePostgres(getter confGetter) (*xorm.Engine, error) {
|
||||
dsnKV := map[string]string{
|
||||
"user": getter.String("user"),
|
||||
// accept the core Grafana jargon of `password` as well, originally
|
||||
// Unified Storage used `pass`
|
||||
"password": cmp.Or(getter.String("pass"), getter.String("password")),
|
||||
"dbname": getter.String("name"),
|
||||
"sslmode": cmp.Or(getter.String("ssl_mode"), "disable"),
|
||||
"sslsni": getter.String("ssl_sni"),
|
||||
"sslrootcert": getter.String("ca_cert_path"),
|
||||
"sslkey": getter.String("client_key_path"),
|
||||
"sslcert": getter.String("client_cert_path"),
|
||||
}
|
||||
|
||||
// TODO: probably interesting:
|
||||
// "passfile", "statement_timeout", "lock_timeout", "connect_timeout"
|
||||
|
||||
// TODO: for CockroachDB, we probably need to use the following:
|
||||
// dsnKV["options"] = "-c enable_experimental_alter_column_type_general=true"
|
||||
// Or otherwise specify it as:
|
||||
// dsnKV["enable_experimental_alter_column_type_general"] = "true"
|
||||
|
||||
// TODO: do we want to support these options in the DSN as well?
|
||||
// "sslpassword", "krbspn", "krbsrvname", "target_session_attrs", "service", "servicefile"
|
||||
|
||||
// More on Postgres connection string parameters:
|
||||
// https://www.postgresql.org/docs/current/libpq-connect.html#LIBPQ-CONNSTRING
|
||||
|
||||
hostport := getter.String("host")
|
||||
|
||||
if err := getter.Err(); err != nil {
|
||||
return nil, fmt.Errorf("config error: %w", err)
|
||||
}
|
||||
|
||||
host, port, err := splitHostPortDefault(hostport, "127.0.0.1", "5432")
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("invalid host: %w", err)
|
||||
}
|
||||
dsnKV["host"] = host
|
||||
dsnKV["port"] = port
|
||||
|
||||
dsn, err := MakeDSN(dsnKV)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error building DSN: %w", err)
|
||||
}
|
||||
|
||||
// FIXME: get rid of xorm
|
||||
engine, err := xorm.NewEngine(db.DriverPostgres, dsn)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("open database: %w", err)
|
||||
}
|
||||
|
||||
engine.SetMaxOpenConns(getter.Int("max_open_conn", 0))
|
||||
engine.SetMaxIdleConns(getter.Int("max_idle_conn", 4))
|
||||
maxLifetime := time.Duration(getter.Int("conn_max_lifetime", 14400)) * time.Second
|
||||
engine.SetConnMaxLifetime(maxLifetime)
|
||||
|
||||
return engine, nil
|
||||
}
|
||||
|
||||
@@ -1,17 +1,10 @@
|
||||
package dbimpl
|
||||
|
||||
import (
|
||||
"crypto/rand"
|
||||
"crypto/rsa"
|
||||
"crypto/x509"
|
||||
"crypto/x509/pkix"
|
||||
"encoding/pem"
|
||||
"math/big"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
@@ -19,20 +12,6 @@ import (
|
||||
"github.com/grafana/grafana/pkg/setting"
|
||||
)
|
||||
|
||||
func newValidMySQLGetter(withKeyPrefix bool) confGetter {
|
||||
var prefix string
|
||||
if withKeyPrefix {
|
||||
prefix = "db_"
|
||||
}
|
||||
return newTestConfGetter(map[string]string{
|
||||
prefix + "type": dbTypeMySQL,
|
||||
prefix + "host": "/var/run/mysql.socket",
|
||||
prefix + "name": "grafana",
|
||||
prefix + "user": "user",
|
||||
prefix + "password": "password",
|
||||
}, prefix)
|
||||
}
|
||||
|
||||
func TestNewResourceDbProvider(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
@@ -146,274 +125,3 @@ password = password
|
||||
require.Equal(t, dbTypeMySQL, engine.engine.Dialect().DriverName())
|
||||
require.Contains(t, engine.engine.DataSourceName(), "overthere:3306")
|
||||
}
|
||||
|
||||
func TestGetEngineMySQLFromConfig(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
t.Run("happy path - with key prefix", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
engine, err := getEngineMySQL(newValidMySQLGetter(true))
|
||||
require.NotNil(t, engine)
|
||||
require.NoError(t, err)
|
||||
})
|
||||
|
||||
t.Run("happy path - without key prefix", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
engine, err := getEngineMySQL(newValidMySQLGetter(false))
|
||||
require.NotNil(t, engine)
|
||||
require.NoError(t, err)
|
||||
})
|
||||
|
||||
t.Run("invalid string", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
getter := newTestConfGetter(map[string]string{
|
||||
"db_type": dbTypeMySQL,
|
||||
"db_host": "/var/run/mysql.socket",
|
||||
"db_name": string(invalidUTF8ByteSequence),
|
||||
"db_user": "user",
|
||||
"db_password": "password",
|
||||
}, "db_")
|
||||
engine, err := getEngineMySQL(getter)
|
||||
require.Nil(t, engine)
|
||||
require.Error(t, err)
|
||||
require.ErrorIs(t, err, errInvalidUTF8Sequence)
|
||||
})
|
||||
}
|
||||
|
||||
func newValidPostgresGetter(withKeyPrefix bool) confGetter {
|
||||
var prefix string
|
||||
if withKeyPrefix {
|
||||
prefix = "db_"
|
||||
}
|
||||
return newTestConfGetter(map[string]string{
|
||||
prefix + "type": dbTypePostgres,
|
||||
prefix + "host": "localhost",
|
||||
prefix + "name": "grafana",
|
||||
prefix + "user": "user",
|
||||
prefix + "password": "password",
|
||||
}, prefix)
|
||||
}
|
||||
|
||||
func TestGetEnginePostgresFromConfig(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
t.Run("happy path - with key prefix", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
engine, err := getEnginePostgres(newValidPostgresGetter(true))
|
||||
require.NotNil(t, engine)
|
||||
require.NoError(t, err)
|
||||
})
|
||||
|
||||
t.Run("happy path - without key prefix", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
engine, err := getEnginePostgres(newValidPostgresGetter(false))
|
||||
require.NotNil(t, engine)
|
||||
require.NoError(t, err)
|
||||
})
|
||||
|
||||
t.Run("invalid string", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
getter := newTestConfGetter(map[string]string{
|
||||
"db_type": dbTypePostgres,
|
||||
"db_host": string(invalidUTF8ByteSequence),
|
||||
"db_name": "grafana",
|
||||
"db_user": "user",
|
||||
"db_password": "password",
|
||||
}, "db_")
|
||||
engine, err := getEnginePostgres(getter)
|
||||
|
||||
require.Nil(t, engine)
|
||||
require.Error(t, err)
|
||||
})
|
||||
|
||||
t.Run("invalid hostport", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
getter := newTestConfGetter(map[string]string{
|
||||
"db_type": dbTypePostgres,
|
||||
"db_host": "1:1:1",
|
||||
"db_name": "grafana",
|
||||
"db_user": "user",
|
||||
"db_password": "password",
|
||||
}, "db_")
|
||||
engine, err := getEnginePostgres(getter)
|
||||
|
||||
require.Nil(t, engine)
|
||||
require.Error(t, err)
|
||||
})
|
||||
}
|
||||
|
||||
func TestGetEngineMySQLTLS(t *testing.T) {
|
||||
certs := generateTestCerts(t)
|
||||
|
||||
tests := []struct {
|
||||
name string
|
||||
config map[string]string
|
||||
shouldErr bool
|
||||
}{
|
||||
{
|
||||
name: "with TLS disabled",
|
||||
config: map[string]string{
|
||||
"type": "mysql",
|
||||
"user": "user",
|
||||
"pass": "pass",
|
||||
"host": "localhost",
|
||||
"name": "dbname",
|
||||
"ssl_mode": "disable",
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "with TLS skip-verify",
|
||||
config: map[string]string{
|
||||
"type": "mysql",
|
||||
"user": "user",
|
||||
"pass": "pass",
|
||||
"host": "localhost",
|
||||
"name": "dbname",
|
||||
"ssl_mode": "skip-verify",
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "with valid TLS certificates",
|
||||
config: map[string]string{
|
||||
"type": "mysql",
|
||||
"user": "user",
|
||||
"pass": "pass",
|
||||
"host": "localhost",
|
||||
"name": "dbname",
|
||||
"ssl_mode": "true",
|
||||
"ca_cert_path": certs.caFile,
|
||||
"client_cert_path": certs.certFile,
|
||||
"client_key_path": certs.keyFile,
|
||||
"server_cert_name": "mysql.example.com",
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "with invalid cert paths",
|
||||
config: map[string]string{
|
||||
"type": "mysql",
|
||||
"user": "user",
|
||||
"pass": "pass",
|
||||
"host": "localhost",
|
||||
"name": "dbname",
|
||||
"ssl_mode": "true",
|
||||
"ca_cert_path": "nonexistent/ca.pem",
|
||||
"client_cert_path": "nonexistent/client-cert.pem",
|
||||
"client_key_path": "nonexistent/client-key.pem",
|
||||
"server_cert_name": "mysql.example.com",
|
||||
},
|
||||
shouldErr: true,
|
||||
},
|
||||
{
|
||||
name: "with TLS certs and tls parameter",
|
||||
config: map[string]string{
|
||||
"type": "mysql",
|
||||
"user": "user",
|
||||
"pass": "pass",
|
||||
"host": "localhost",
|
||||
"name": "dbname",
|
||||
"ssl_mode": "true",
|
||||
"ca_cert_path": certs.caFile,
|
||||
"client_cert_path": certs.certFile,
|
||||
"client_key_path": certs.keyFile,
|
||||
"server_cert_name": "mysql.example.com",
|
||||
"tls": "preferred",
|
||||
},
|
||||
shouldErr: true,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
getter := newTestConfGetter(tt.config, "")
|
||||
engine, err := getEngineMySQL(getter)
|
||||
|
||||
if tt.shouldErr {
|
||||
require.Error(t, err)
|
||||
return
|
||||
}
|
||||
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, engine)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
type testCerts struct {
|
||||
caFile string
|
||||
certFile string
|
||||
keyFile string
|
||||
}
|
||||
|
||||
func generateTestCerts(t *testing.T) testCerts {
|
||||
t.Helper()
|
||||
tempDir := t.TempDir()
|
||||
|
||||
// Generate CA private key
|
||||
caKey, err := rsa.GenerateKey(rand.Reader, 2048)
|
||||
require.NoError(t, err)
|
||||
|
||||
// Generate CA certificate
|
||||
ca := &x509.Certificate{
|
||||
SerialNumber: big.NewInt(1),
|
||||
Subject: pkix.Name{
|
||||
CommonName: "Test CA",
|
||||
},
|
||||
NotBefore: time.Now(),
|
||||
NotAfter: time.Now().AddDate(1, 0, 0),
|
||||
IsCA: true,
|
||||
KeyUsage: x509.KeyUsageCertSign | x509.KeyUsageDigitalSignature,
|
||||
BasicConstraintsValid: true,
|
||||
}
|
||||
|
||||
caBytes, err := x509.CreateCertificate(rand.Reader, ca, ca, &caKey.PublicKey, caKey)
|
||||
require.NoError(t, err)
|
||||
|
||||
clientKey, err := rsa.GenerateKey(rand.Reader, 2048)
|
||||
require.NoError(t, err)
|
||||
|
||||
client := &x509.Certificate{
|
||||
SerialNumber: big.NewInt(2),
|
||||
Subject: pkix.Name{
|
||||
CommonName: "Test Client",
|
||||
},
|
||||
NotBefore: time.Now(),
|
||||
NotAfter: time.Now().AddDate(1, 0, 0),
|
||||
KeyUsage: x509.KeyUsageDigitalSignature,
|
||||
ExtKeyUsage: []x509.ExtKeyUsage{x509.ExtKeyUsageClientAuth},
|
||||
SubjectKeyId: []byte{1, 2, 3, 4, 5},
|
||||
}
|
||||
|
||||
clientBytes, err := x509.CreateCertificate(rand.Reader, client, ca, &clientKey.PublicKey, caKey)
|
||||
require.NoError(t, err)
|
||||
|
||||
// Write certificates and keys to temporary files
|
||||
caFile := filepath.Join(tempDir, "ca.pem")
|
||||
certFile := filepath.Join(tempDir, "cert.pem")
|
||||
keyFile := filepath.Join(tempDir, "key.pem")
|
||||
|
||||
writePEMFile(t, caFile, "CERTIFICATE", caBytes)
|
||||
writePEMFile(t, certFile, "CERTIFICATE", clientBytes)
|
||||
writePEMFile(t, keyFile, "RSA PRIVATE KEY", x509.MarshalPKCS1PrivateKey(clientKey))
|
||||
|
||||
return testCerts{
|
||||
caFile: caFile,
|
||||
certFile: certFile,
|
||||
keyFile: keyFile,
|
||||
}
|
||||
}
|
||||
|
||||
func writePEMFile(t *testing.T, filename string, blockType string, bytes []byte) {
|
||||
t.Helper()
|
||||
//nolint:gosec
|
||||
file, err := os.Create(filename)
|
||||
require.NoError(t, err)
|
||||
//nolint:errcheck
|
||||
defer file.Close()
|
||||
|
||||
err = pem.Encode(file, &pem.Block{
|
||||
Type: blockType,
|
||||
Bytes: bytes,
|
||||
})
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
@@ -62,62 +62,37 @@ type resourceDBProvider struct {
|
||||
initErr error
|
||||
}
|
||||
|
||||
func newResourceDBProvider(grafanaDB infraDB.DB, cfg *setting.Cfg, tracer trace.Tracer) (p *resourceDBProvider, err error) {
|
||||
// Resource API has other configs in its section besides database ones, so
|
||||
// we prefix them with "db_". We use the database config from core Grafana
|
||||
// as fallback, and as it uses a dedicated INI section, then keys are not
|
||||
// prefixed with "db_"
|
||||
getter := newConfGetter(cfg.SectionWithEnvOverrides("resource_api"), "db_")
|
||||
fallbackConfig, fallbackErr := sqlstore.NewDatabaseConfig(cfg, nil)
|
||||
if fallbackErr != nil {
|
||||
// Ignore error here and keep going.
|
||||
fallbackConfig = nil
|
||||
}
|
||||
|
||||
logger := log.New("entity-db")
|
||||
p = &resourceDBProvider{
|
||||
func newResourceDBProvider(grafanaDB infraDB.DB, cfg *setting.Cfg, tracer trace.Tracer) (*resourceDBProvider, error) {
|
||||
logger := log.New("resource-db")
|
||||
p := &resourceDBProvider{
|
||||
cfg: cfg,
|
||||
log: logger,
|
||||
logQueries: getter.Bool("log_queries"),
|
||||
migrateFunc: migrations.MigrateResourceStore,
|
||||
tracer: tracer,
|
||||
}
|
||||
|
||||
dbType := getter.String("type")
|
||||
switch {
|
||||
// Deprecated: First try with the config in the "resource_api" section, which is specific to Unified Storage
|
||||
case dbType == dbTypePostgres:
|
||||
logger.Info("Using resource_api section", "db_type", dbType)
|
||||
p.registerMetrics = true
|
||||
p.engine, err = getEnginePostgres(getter)
|
||||
return p, err
|
||||
|
||||
case dbType == dbTypeMySQL:
|
||||
logger.Info("Using resource_api section", "db_type", dbType)
|
||||
p.registerMetrics = true
|
||||
p.engine, err = getEngineMySQL(getter)
|
||||
return p, err
|
||||
|
||||
case dbType != "":
|
||||
return p, fmt.Errorf("invalid db type specified: %s", dbType)
|
||||
|
||||
// If we have an empty Resource API db config, try with the core Grafana database config
|
||||
case fallbackConfig != nil && fallbackConfig.Type != "":
|
||||
logger.Info("Using database section", "db_type", fallbackConfig.Type)
|
||||
p.registerMetrics = true
|
||||
p.engine, err = getEngine(fallbackConfig)
|
||||
return p, err
|
||||
case grafanaDB != nil:
|
||||
// try to use the grafana db connection (should only happen in tests)
|
||||
// Try to use the grafana db connection, should only happen in tests.
|
||||
if grafanaDB != nil {
|
||||
if newConfGetter(cfg.SectionWithEnvOverrides("database"), "").Bool(grafanaDBInstrumentQueriesKey) {
|
||||
return nil, errGrafanaDBInstrumentedNotSupported
|
||||
}
|
||||
p.engine = grafanaDB.GetEngine()
|
||||
p.logQueries = cfg.SectionWithEnvOverrides("database").Key("log_queries").MustBool(false)
|
||||
return p, nil
|
||||
}
|
||||
|
||||
// If we don't provide a DB, lets build it from the config.
|
||||
dbCfg, err := sqlstore.NewDatabaseConfig(cfg, nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
switch {
|
||||
case dbCfg.Type != "":
|
||||
logger.Info("Using database section", "db_type", dbCfg.Type)
|
||||
p.registerMetrics = true
|
||||
p.engine, err = getEngine(dbCfg)
|
||||
return p, err
|
||||
default:
|
||||
if fallbackErr != nil {
|
||||
return nil, fallbackErr
|
||||
}
|
||||
return nil, fmt.Errorf("no database type specified")
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user