MSSQL: Current-user authentication (#113977)
* Moving things around
* Update frontend to support CUA
* Add CUA support to backend
* Copy parseURL function to where it's used
* Update test
* Remove experimental-strip-types
* Docs
* A bit more of a refactor to reduce complexity
* Revert "Remove experimental-strip-types"
This reverts commit 70fbc1c0cd.
* Review
* Docs updates
* Another docs fix
This commit is contained in:
@@ -2,6 +2,7 @@ package sqleng
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/sha256"
|
||||
"database/sql"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
@@ -16,9 +17,11 @@ import (
|
||||
|
||||
"github.com/grafana/grafana-azure-sdk-go/v2/azcredentials"
|
||||
"github.com/grafana/grafana-azure-sdk-go/v2/azsettings"
|
||||
"github.com/grafana/grafana-azure-sdk-go/v2/azusercontext"
|
||||
"github.com/grafana/grafana-plugin-sdk-go/backend"
|
||||
"github.com/grafana/grafana-plugin-sdk-go/backend/gtime"
|
||||
"github.com/grafana/grafana-plugin-sdk-go/backend/log"
|
||||
"github.com/grafana/grafana-plugin-sdk-go/backend/proxy"
|
||||
"github.com/grafana/grafana-plugin-sdk-go/data"
|
||||
"github.com/grafana/grafana-plugin-sdk-go/data/sqlutil"
|
||||
"github.com/grafana/grafana/pkg/cmd/grafana-cli/logger"
|
||||
@@ -74,6 +77,7 @@ type DataSourceInfo struct {
|
||||
Updated time.Time
|
||||
UID string
|
||||
DecryptedSecureJSONData map[string]string
|
||||
OrgID int64
|
||||
}
|
||||
|
||||
type DataPluginConfiguration struct {
|
||||
@@ -97,6 +101,8 @@ type DataSourceHandler struct {
|
||||
azureCredentials azcredentials.AzureCredentials
|
||||
kerberosAuth kerberos.KerberosAuth
|
||||
driverName string
|
||||
proxyClient proxy.Client
|
||||
dbConnections sync.Map
|
||||
}
|
||||
|
||||
type QueryJson struct {
|
||||
@@ -142,6 +148,12 @@ func NewQueryDataHandler(ctx context.Context, settings backend.DataSourceInstanc
|
||||
return nil, fmt.Errorf("error getting kerberos settings: %w", err)
|
||||
}
|
||||
|
||||
proxyClient, err := settings.ProxyClient(ctx)
|
||||
if err != nil {
|
||||
logger.Error("mssql proxy creation failed", "error", err)
|
||||
return nil, fmt.Errorf("mssql proxy creation failed")
|
||||
}
|
||||
|
||||
queryDataHandler := DataSourceHandler{
|
||||
queryResultTransformer: &queryResultTransformer,
|
||||
macroEngine: newMssqlMacroEngine(),
|
||||
@@ -154,6 +166,7 @@ func NewQueryDataHandler(ctx context.Context, settings backend.DataSourceInstanc
|
||||
azureCredentials: azureCredentials,
|
||||
kerberosAuth: kerberosAuth,
|
||||
driverName: driverName,
|
||||
proxyClient: proxyClient,
|
||||
}
|
||||
|
||||
if len(config.TimeColumnNames) > 0 {
|
||||
@@ -164,18 +177,21 @@ func NewQueryDataHandler(ctx context.Context, settings backend.DataSourceInstanc
|
||||
queryDataHandler.metricColumnTypes = config.MetricColumnTypes
|
||||
}
|
||||
|
||||
cnnstr, err := generateConnectionString(config.DSInfo, azureSettings.ManagedIdentityClientId, azureSettings.AzureEntraPasswordCredentialsEnabled, azureCredentials, kerberosAuth, log)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
// Every auth method besides Azure AD Current User Identity can use a persistent DB connection
|
||||
if config.DSInfo.JsonData.AuthenticationType != azureAuthentication || azureCredentials.AzureAuthType() != azcredentials.AzureAuthCurrentUserIdentity {
|
||||
cnnstr, err := generateConnectionString(config.DSInfo, azureCredentials, kerberosAuth, log, azureSettings, "")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
db, err := newMSSQL(ctx, driverName, config.RowLimit, config.DSInfo, cnnstr, log, settings)
|
||||
if err != nil {
|
||||
logger.Error("Failed connecting to MSSQL", "err", err)
|
||||
return nil, err
|
||||
}
|
||||
db, err := newMSSQL(driverName, config.RowLimit, config.DSInfo, cnnstr, log, proxyClient)
|
||||
if err != nil {
|
||||
logger.Error("Failed connecting to MSSQL", "err", err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
queryDataHandler.db = db
|
||||
queryDataHandler.db = db
|
||||
}
|
||||
|
||||
return &queryDataHandler, nil
|
||||
}
|
||||
@@ -192,9 +208,52 @@ func (e *DataSourceHandler) Dispose() {
|
||||
e.log.Error("Failed to dispose db", "error", err)
|
||||
}
|
||||
}
|
||||
|
||||
// Clear any cached user-specific connections
|
||||
e.dbConnections.Range(func(_, conn interface{}) bool {
|
||||
_ = conn.(*sql.DB).Close()
|
||||
return true
|
||||
})
|
||||
e.dbConnections.Clear()
|
||||
|
||||
e.log.Debug("DB disposed")
|
||||
}
|
||||
|
||||
func (e *DataSourceHandler) getDB(ctx context.Context) (*sql.DB, error) {
|
||||
e.log.Debug("Getting DB...")
|
||||
if e.dsInfo.JsonData.AuthenticationType != azureAuthentication || e.azureCredentials.AzureAuthType() != azcredentials.AzureAuthCurrentUserIdentity {
|
||||
if e.db == nil {
|
||||
return nil, fmt.Errorf("database connection is not initialized")
|
||||
}
|
||||
return e.db, nil
|
||||
}
|
||||
|
||||
userCtx, ok := azusercontext.GetCurrentUser(ctx)
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("failed to get user from context for Azure Current User authentication")
|
||||
}
|
||||
cacheKey := fmt.Sprintf("mssql-%d-%x-%x-%s", e.dsInfo.OrgID, sha256.Sum256([]byte(userCtx.User.Email)), sha256.Sum256([]byte(userCtx.IdToken)), e.dsInfo.UID)
|
||||
|
||||
conn, ok := e.dbConnections.Load(cacheKey)
|
||||
if ok {
|
||||
return conn.(*sql.DB), nil
|
||||
}
|
||||
|
||||
cnnstr, err := generateConnectionString(e.dsInfo, e.azureCredentials, e.kerberosAuth, e.log, e.azureSettings, userCtx.IdToken)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
db, err := newMSSQL(e.driverName, e.rowLimit, e.dsInfo, cnnstr, e.log, e.proxyClient)
|
||||
if err != nil {
|
||||
logger.Error("Failed connecting to MSSQL", "err", err)
|
||||
return nil, err
|
||||
}
|
||||
e.dbConnections.Store(cacheKey, db)
|
||||
|
||||
return db, nil
|
||||
}
|
||||
|
||||
func (e *DataSourceHandler) QueryData(ctx context.Context, req *backend.QueryDataRequest) (*backend.QueryDataResponse, error) {
|
||||
result := backend.NewQueryDataResponse()
|
||||
ch := make(chan DBDataResponse, len(req.Queries))
|
||||
@@ -293,7 +352,12 @@ func (e *DataSourceHandler) executeQuery(query backend.DataQuery, wg *sync.WaitG
|
||||
return
|
||||
}
|
||||
|
||||
rows, err := e.db.QueryContext(queryContext, interpolatedQuery)
|
||||
db, err := e.getDB(queryContext)
|
||||
if err != nil {
|
||||
errAppendDebug("retrieving database connection failed", e.TransformQueryError(logger, err), interpolatedQuery, backend.ErrorSourcePlugin)
|
||||
return
|
||||
}
|
||||
rows, err := db.QueryContext(queryContext, interpolatedQuery)
|
||||
if err != nil {
|
||||
errAppendDebug("db query error", e.TransformQueryError(logger, err), interpolatedQuery, backend.ErrorSourceDownstream)
|
||||
return
|
||||
@@ -310,12 +374,19 @@ func (e *DataSourceHandler) executeQuery(query backend.DataQuery, wg *sync.WaitG
|
||||
return
|
||||
}
|
||||
|
||||
frame := e.processResponse(qm, rows, interpolatedQuery, errAppendDebug)
|
||||
|
||||
queryResult.dataResponse.Frames = data.Frames{frame}
|
||||
ch <- queryResult
|
||||
}
|
||||
|
||||
func (e *DataSourceHandler) processResponse(qm *dataQueryModel, rows *sql.Rows, interpolatedQuery string, errAppendDebug func(string, error, string, backend.ErrorSource)) *data.Frame {
|
||||
// Convert row.Rows to dataframe
|
||||
stringConverters := e.queryResultTransformer.GetConverterList()
|
||||
frame, err := sqlutil.FrameFromRows(rows, e.rowLimit, sqlutil.ToConverters(stringConverters...)...)
|
||||
if err != nil {
|
||||
errAppendDebug("convert frame from rows error", err, interpolatedQuery, backend.ErrorSourcePlugin)
|
||||
return
|
||||
return nil
|
||||
}
|
||||
|
||||
if frame.Meta == nil {
|
||||
@@ -330,21 +401,19 @@ func (e *DataSourceHandler) executeQuery(query backend.DataQuery, wg *sync.WaitG
|
||||
// additionally-needed frame data stays intact and is correctly passed to our visulization.
|
||||
if frame.Rows() == 0 {
|
||||
frame.Fields = []*data.Field{}
|
||||
queryResult.dataResponse.Frames = data.Frames{frame}
|
||||
ch <- queryResult
|
||||
return
|
||||
return frame
|
||||
}
|
||||
|
||||
if err := convertSQLTimeColumnsToEpochMS(frame, qm); err != nil {
|
||||
errAppendDebug("converting time columns failed", err, interpolatedQuery, backend.ErrorSourcePlugin)
|
||||
return
|
||||
return nil
|
||||
}
|
||||
|
||||
if qm.Format == dataQueryFormatSeries {
|
||||
// time series has to have time column
|
||||
if qm.timeIndex == -1 {
|
||||
errAppendDebug("db has no time column", errors.New("time column is missing; make sure your data includes a time column for time series format or switch to a table format that doesn't require it"), interpolatedQuery, backend.ErrorSourceDownstream)
|
||||
return
|
||||
return nil
|
||||
}
|
||||
|
||||
// Make sure to name the time field 'Time' to be backward compatible with Grafana pre-v8.
|
||||
@@ -362,7 +431,7 @@ func (e *DataSourceHandler) executeQuery(query backend.DataQuery, wg *sync.WaitG
|
||||
var err error
|
||||
if frame, err = convertSQLValueColumnToFloat(frame, i); err != nil {
|
||||
errAppendDebug("convert value to float failed", err, interpolatedQuery, backend.ErrorSourcePlugin)
|
||||
return
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
@@ -373,7 +442,7 @@ func (e *DataSourceHandler) executeQuery(query backend.DataQuery, wg *sync.WaitG
|
||||
frame, err = data.LongToWide(frame, qm.FillMissing)
|
||||
if err != nil {
|
||||
errAppendDebug("failed to convert long to wide series when converting from dataframe", err, interpolatedQuery, backend.ErrorSourcePlugin)
|
||||
return
|
||||
return nil
|
||||
}
|
||||
|
||||
// Before 8x, a special metric column was used to name time series. The LongToWide transforms that into a metric label on the value field.
|
||||
@@ -408,8 +477,7 @@ func (e *DataSourceHandler) executeQuery(query backend.DataQuery, wg *sync.WaitG
|
||||
}
|
||||
}
|
||||
|
||||
queryResult.dataResponse.Frames = data.Frames{frame}
|
||||
ch <- queryResult
|
||||
return frame
|
||||
}
|
||||
|
||||
// Interpolate provides global macros/substitutions for all sql datasources.
|
||||
|
||||
Reference in New Issue
Block a user