PostgreSQL: Remove feature toggle postgresDSUsePGX (#113675)

* PostgreSQL: Remove feature toggle `postgresDSUsePGX`

* Fix tests and linting

* Address review comments
Author: Zoltán Bedi
Date: 2025-11-24 10:26:41 +01:00 (committed by GitHub)
Parent: 39dc659ad8
Commit: 8d75d79313
34 changed files with 1,176 additions and 5,165 deletions
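Background for reviewers: with the toggle gone, pgx/v5 is the only driver path, so the handler is built around a `*pgxpool.Pool` instead of a `*sql.DB`, and every database call now takes a `context.Context`. A minimal standalone sketch of the pool lifecycle (the DSN and error handling are illustrative, not from this commit):

```go
package main

import (
	"context"
	"fmt"
	"log"

	"github.com/jackc/pgx/v5/pgxpool"
)

func main() {
	ctx := context.Background()
	// Placeholder connection string; Grafana builds its own from the
	// data source settings.
	pool, err := pgxpool.New(ctx, "postgres://user:secret@localhost:5432/grafana")
	if err != nil {
		log.Fatal(err)
	}
	defer pool.Close()

	// Unlike database/sql, pgx has no context-free Ping.
	if err := pool.Ping(ctx); err != nil {
		log.Fatal(err)
	}
	fmt.Println("connected")
}
```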
@@ -2,7 +2,6 @@ package sqleng
 import (
 	"context"
-	"database/sql"
 	"encoding/json"
 	"errors"
 	"fmt"
@@ -19,6 +18,9 @@ import (
 	"github.com/grafana/grafana-plugin-sdk-go/backend/log"
 	"github.com/grafana/grafana-plugin-sdk-go/data"
 	"github.com/grafana/grafana-plugin-sdk-go/data/sqlutil"
+	"github.com/jackc/pgx/v5/pgconn"
+	"github.com/jackc/pgx/v5/pgtype"
+	"github.com/jackc/pgx/v5/pgxpool"
 )
 // MetaKeyExecutedQueryString is the key where the executed query should get stored
@@ -81,13 +83,13 @@ type DataPluginConfiguration struct {
 type DataSourceHandler struct {
 	macroEngine            SQLMacroEngine
 	queryResultTransformer SqlQueryResultTransformer
-	db                     *sql.DB
 	timeColumnNames        []string
 	metricColumnTypes      []string
 	log                    log.Logger
 	dsInfo                 DataSourceInfo
 	rowLimit               int64
 	userError              string
+	pool                   *pgxpool.Pool
 }
 type QueryJson struct {
@@ -112,7 +114,7 @@ func (e *DataSourceHandler) TransformQueryError(logger log.Logger, err error) er
 	return e.queryResultTransformer.TransformQueryError(logger, err)
 }
-func NewQueryDataHandler(userFacingDefaultError string, db *sql.DB, config DataPluginConfiguration, queryResultTransformer SqlQueryResultTransformer,
+func NewQueryDataHandler(userFacingDefaultError string, p *pgxpool.Pool, config DataPluginConfiguration, queryResultTransformer SqlQueryResultTransformer,
 	macroEngine SQLMacroEngine, log log.Logger) (*DataSourceHandler, error) {
 	queryDataHandler := DataSourceHandler{
 		queryResultTransformer: queryResultTransformer,
@@ -132,7 +134,7 @@ func NewQueryDataHandler(userFacingDefaultError string, db *sql.DB, config DataP
 		queryDataHandler.metricColumnTypes = config.MetricColumnTypes
 	}
-	queryDataHandler.db = db
+	queryDataHandler.pool = p
 	return &queryDataHandler, nil
 }
@@ -143,16 +145,16 @@ type DBDataResponse struct {
 func (e *DataSourceHandler) Dispose() {
 	e.log.Debug("Disposing DB...")
-	if e.db != nil {
-		if err := e.db.Close(); err != nil {
-			e.log.Error("Failed to dispose db", "error", err)
-		}
+	if e.pool != nil {
+		e.pool.Close()
 	}
 	e.log.Debug("DB disposed")
 }
-func (e *DataSourceHandler) Ping() error {
-	return e.db.Ping()
+func (e *DataSourceHandler) Ping(ctx context.Context) error {
+	return e.pool.Ping(ctx)
 }
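The Dispose/Ping hunk is mostly a signature story: `(*pgxpool.Pool).Close` blocks until connections are released and returns nothing, so the old error-logging branch has nothing left to log, while `Ping` must now thread a context. A tiny sketch of the two method shapes (hypothetical snippet, not in the commit):

```go
package main

import (
	"database/sql"
	"fmt"

	"github.com/jackc/pgx/v5/pgxpool"
)

func main() {
	var db *sql.DB
	var pool *pgxpool.Pool
	dbClose := db.Close     // method value: func() error
	poolClose := pool.Close // method value: func(), no error to handle
	fmt.Printf("%T\n%T\n", dbClose, poolClose)
}
```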
 func (e *DataSourceHandler) QueryData(ctx context.Context, req *backend.QueryDataRequest) (*backend.QueryDataResponse, error) {
@@ -167,13 +169,13 @@ func (e *DataSourceHandler) QueryData(ctx context.Context, req *backend.QueryDat
 		}
 		err := json.Unmarshal(query.JSON, &queryjson)
 		if err != nil {
-			return nil, fmt.Errorf("error unmarshal query json: %w", err)
+			return nil, backend.DownstreamErrorf("error unmarshal query json: %s", err.Error())
 		}
 		// the fill-params are only stored inside this function, during query-interpolation. we do not support
 		// sending them in "from the outside"
 		if queryjson.Fill || queryjson.FillInterval != 0.0 || queryjson.FillMode != "" || queryjson.FillValue != 0.0 {
-			return nil, fmt.Errorf("query fill-parameters not supported")
+			return nil, backend.DownstreamErrorf("query fill-parameters not supported")
 		}
 		if queryjson.RawSql == "" {
@@ -181,7 +183,7 @@ func (e *DataSourceHandler) QueryData(ctx context.Context, req *backend.QueryDat
 		}
 		wg.Add(1)
-		go e.executeQuery(query, &wg, ctx, ch, queryjson)
+		go e.executeQuery(ctx, query, &wg, ch, queryjson)
 	}
 	wg.Wait()
@@ -196,8 +198,21 @@ func (e *DataSourceHandler) QueryData(ctx context.Context, req *backend.QueryDat
 	return result, nil
 }
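`backend.DownstreamErrorf` replaces `fmt.Errorf` here so that malformed queries are attributed to the data source or user input rather than counted as plugin errors. Roughly, and assuming a recent grafana-plugin-sdk-go:

```go
package main

import (
	"fmt"

	"github.com/grafana/grafana-plugin-sdk-go/backend"
)

func main() {
	err := backend.DownstreamErrorf("query fill-parameters not supported")
	// The SDK can then classify the failure as downstream when reporting
	// error sources, instead of blaming the plugin.
	fmt.Println(backend.IsDownstreamError(err)) // true
}
```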
-func (e *DataSourceHandler) executeQuery(query backend.DataQuery, wg *sync.WaitGroup, queryContext context.Context,
-	ch chan DBDataResponse, queryJson QueryJson) {
+func (e *DataSourceHandler) execQuery(ctx context.Context, query string) ([]*pgconn.Result, error) {
+	c, err := e.pool.Acquire(ctx)
+	if err != nil {
+		return nil, backend.DownstreamErrorf("failed to acquire connection: %w", err)
+	}
+	defer c.Release()
+	mrr := c.Conn().PgConn().Exec(ctx, query)
+	// Close returns the first error that occurred during the MultiResultReader's use. We will log that later.
+	defer mrr.Close() //nolint:errcheck
+	return mrr.ReadAll()
+}
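The new `execQuery` drops down to pgconn's simple query protocol, so a single raw SQL string may carry several statements, and `ReadAll` drains every result set before returning. Outside the handler, the same pattern looks roughly like this (DSN and queries are placeholders):

```go
package main

import (
	"context"
	"fmt"
	"log"

	"github.com/jackc/pgx/v5/pgxpool"
)

func main() {
	ctx := context.Background()
	pool, err := pgxpool.New(ctx, "postgres://user:secret@localhost:5432/grafana") // illustrative DSN
	if err != nil {
		log.Fatal(err)
	}
	defer pool.Close()

	conn, err := pool.Acquire(ctx)
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Release()

	// One string, two statements; ReadAll returns one pgconn.Result per
	// statement, each with its own field descriptions and raw rows.
	results, err := conn.Conn().PgConn().Exec(ctx, "SELECT 1; SELECT now()").ReadAll()
	if err != nil {
		log.Fatal(err)
	}
	for _, r := range results {
		fmt.Println(r.CommandTag, len(r.Rows))
	}
}
```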
+func (e *DataSourceHandler) executeQuery(queryContext context.Context, query backend.DataQuery, wg *sync.WaitGroup,
+	ch chan DBDataResponse, queryJSON QueryJson) {
 	defer wg.Done()
 	queryResult := DBDataResponse{
 		dataResponse: backend.DataResponse{},
@@ -205,84 +220,77 @@ func (e *DataSourceHandler) executeQuery(query backend.DataQuery, wg *sync.WaitG
 	}
 	logger := e.log.FromContext(queryContext)
-	defer func() {
-		if r := recover(); r != nil {
-			logger.Error("ExecuteQuery panic", "error", r, "stack", string(debug.Stack()))
-			if theErr, ok := r.(error); ok {
-				queryResult.dataResponse.Error = theErr
-				queryResult.dataResponse.ErrorSource = backend.ErrorSourcePlugin
-			} else if theErrString, ok := r.(string); ok {
-				queryResult.dataResponse.Error = errors.New(theErrString)
-				queryResult.dataResponse.ErrorSource = backend.ErrorSourcePlugin
-			} else {
-				queryResult.dataResponse.Error = fmt.Errorf("unexpected error - %s", e.userError)
-				queryResult.dataResponse.ErrorSource = backend.ErrorSourceDownstream
-			}
-			ch <- queryResult
-		}
-	}()
+	defer e.handlePanic(logger, &queryResult, ch)
-	if queryJson.RawSql == "" {
+	if queryJSON.RawSql == "" {
 		panic("Query model property rawSql should not be empty at this point")
 	}
-	timeRange := query.TimeRange
-	errAppendDebug := func(frameErr string, err error, query string, source backend.ErrorSource) {
-		var emptyFrame data.Frame
-		emptyFrame.SetMeta(&data.FrameMeta{
-			ExecutedQueryString: query,
-		})
-		if isDownstreamError(err) {
-			source = backend.ErrorSourceDownstream
-		}
-		queryResult.dataResponse.Error = fmt.Errorf("%s: %w", frameErr, err)
-		queryResult.dataResponse.ErrorSource = source
-		queryResult.dataResponse.Frames = data.Frames{&emptyFrame}
-		ch <- queryResult
-	}
 	// global substitutions
-	interpolatedQuery := Interpolate(query, timeRange, e.dsInfo.JsonData.TimeInterval, queryJson.RawSql)
+	interpolatedQuery := Interpolate(query, query.TimeRange, e.dsInfo.JsonData.TimeInterval, queryJSON.RawSql)
 	// data source specific substitutions
-	interpolatedQuery, err := e.macroEngine.Interpolate(&query, timeRange, interpolatedQuery)
+	interpolatedQuery, err := e.macroEngine.Interpolate(&query, query.TimeRange, interpolatedQuery)
 	if err != nil {
-		errAppendDebug("interpolation failed", e.TransformQueryError(logger, err), interpolatedQuery, backend.ErrorSourcePlugin)
+		e.handleQueryError("interpolation failed", e.TransformQueryError(logger, err), interpolatedQuery, backend.ErrorSourceDownstream, ch, queryResult)
 		return
 	}
-	rows, err := e.db.QueryContext(queryContext, interpolatedQuery)
+	results, err := e.execQuery(queryContext, interpolatedQuery)
 	if err != nil {
-		errAppendDebug("db query error", e.TransformQueryError(logger, err), interpolatedQuery, backend.ErrorSourceDownstream)
+		e.handleQueryError("db query error", e.TransformQueryError(logger, err), interpolatedQuery, backend.ErrorSourceDownstream, ch, queryResult)
 		return
 	}
-	defer func() {
-		if err := rows.Close(); err != nil {
-			logger.Warn("Failed to close rows", "err", err)
-		}
-	}()
-	qm, err := e.newProcessCfg(query, queryContext, rows, interpolatedQuery)
+	qm, err := e.newProcessCfg(queryContext, query, results, interpolatedQuery)
 	if err != nil {
-		errAppendDebug("failed to get configurations", err, interpolatedQuery, backend.ErrorSourcePlugin)
+		e.handleQueryError("failed to get configurations", err, interpolatedQuery, backend.ErrorSourceDownstream, ch, queryResult)
 		return
 	}
-	// Convert row.Rows to dataframe
-	stringConverters := e.queryResultTransformer.GetConverterList()
-	frame, err := sqlutil.FrameFromRows(rows, e.rowLimit, sqlutil.ToConverters(stringConverters...)...)
+	frame, err := convertResultsToFrame(results, e.rowLimit)
 	if err != nil {
-		errAppendDebug("convert frame from rows error", err, interpolatedQuery, backend.ErrorSourcePlugin)
+		e.handleQueryError("convert frame from rows error", err, interpolatedQuery, backend.ErrorSourceDownstream, ch, queryResult)
 		return
 	}
+	e.processFrame(frame, qm, queryResult, ch, logger)
+}
+func (e *DataSourceHandler) handleQueryError(frameErr string, err error, query string, source backend.ErrorSource, ch chan DBDataResponse, queryResult DBDataResponse) {
+	var emptyFrame data.Frame
+	emptyFrame.SetMeta(&data.FrameMeta{ExecutedQueryString: query})
+	if isDownstreamError(err) {
+		source = backend.ErrorSourceDownstream
+	}
+	queryResult.dataResponse.Error = fmt.Errorf("%s: %w", frameErr, err)
+	queryResult.dataResponse.ErrorSource = source
+	queryResult.dataResponse.Frames = data.Frames{&emptyFrame}
+	ch <- queryResult
+}
+func (e *DataSourceHandler) handlePanic(logger log.Logger, queryResult *DBDataResponse, ch chan DBDataResponse) {
+	if r := recover(); r != nil {
+		logger.Error("ExecuteQuery panic", "error", r, "stack", string(debug.Stack()))
+		if theErr, ok := r.(error); ok {
+			queryResult.dataResponse.Error = theErr
+			queryResult.dataResponse.ErrorSource = backend.ErrorSourcePlugin
+		} else if theErrString, ok := r.(string); ok {
+			queryResult.dataResponse.Error = errors.New(theErrString)
+			queryResult.dataResponse.ErrorSource = backend.ErrorSourcePlugin
+		} else {
+			queryResult.dataResponse.Error = fmt.Errorf("unexpected error - %s", e.userError)
+			queryResult.dataResponse.ErrorSource = backend.ErrorSourceDownstream
+		}
+		ch <- *queryResult
+	}
+}
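Extracting the recover logic into `handlePanic` works because `recover` only has effect when called directly by a deferred function, and `defer e.handlePanic(...)` makes the method itself that deferred function. A self-contained illustration of the pattern (names are mine, not the commit's):

```go
package main

import "fmt"

type worker struct{}

// Deferring this method is equivalent to the old inline defer func() { ... }():
// recover runs directly inside the deferred call, so it catches the panic.
func (w *worker) handlePanic(out chan<- string) {
	if r := recover(); r != nil {
		out <- fmt.Sprintf("recovered: %v", r)
	}
}

func (w *worker) run(out chan<- string) {
	defer w.handlePanic(out)
	panic("boom")
}

func main() {
	out := make(chan string, 1)
	(&worker{}).run(out)
	fmt.Println(<-out) // recovered: boom
}
```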
+func (e *DataSourceHandler) processFrame(frame *data.Frame, qm *dataQueryModel, queryResult DBDataResponse, ch chan DBDataResponse, logger log.Logger) {
 	if frame.Meta == nil {
 		frame.Meta = &data.FrameMeta{}
 	}
-	frame.Meta.ExecutedQueryString = interpolatedQuery
+	frame.Meta.ExecutedQueryString = qm.InterpolatedQuery
 	// If no rows were returned, clear any previously set `Fields` with a single empty `data.Field` slice.
 	// Then assign `queryResult.dataResponse.Frames` the current single frame with that single empty Field.
@@ -296,14 +304,14 @@ func (e *DataSourceHandler) executeQuery(query backend.DataQuery, wg *sync.WaitG
 	}
 	if err := convertSQLTimeColumnsToEpochMS(frame, qm); err != nil {
-		errAppendDebug("converting time columns failed", err, interpolatedQuery, backend.ErrorSourcePlugin)
+		e.handleQueryError("converting time columns failed", err, qm.InterpolatedQuery, backend.ErrorSourceDownstream, ch, queryResult)
 		return
 	}
 	if qm.Format == dataQueryFormatSeries {
 		// time series has to have time column
 		if qm.timeIndex == -1 {
-			errAppendDebug("db has no time column", errors.New("time column is missing; make sure your data includes a time column for time series format or switch to a table format that doesn't require it"), interpolatedQuery, backend.ErrorSourceDownstream)
+			e.handleQueryError("db has no time column", errors.New("time column is missing; make sure your data includes a time column for time series format or switch to a table format that doesn't require it"), qm.InterpolatedQuery, backend.ErrorSourceDownstream, ch, queryResult)
 			return
 		}
@@ -321,7 +329,7 @@ func (e *DataSourceHandler) executeQuery(query backend.DataQuery, wg *sync.WaitG
 			var err error
 			if frame, err = convertSQLValueColumnToFloat(frame, i); err != nil {
-				errAppendDebug("convert value to float failed", err, interpolatedQuery, backend.ErrorSourcePlugin)
+				e.handleQueryError("convert value to float failed", err, qm.InterpolatedQuery, backend.ErrorSourceDownstream, ch, queryResult)
 				return
 			}
 		}
@@ -332,7 +340,7 @@ func (e *DataSourceHandler) executeQuery(query backend.DataQuery, wg *sync.WaitG
 		originalData := frame
 		frame, err = data.LongToWide(frame, qm.FillMissing)
 		if err != nil {
-			errAppendDebug("failed to convert long to wide series when converting from dataframe", err, interpolatedQuery, backend.ErrorSourcePlugin)
+			e.handleQueryError("failed to convert long to wide series when converting from dataframe", err, qm.InterpolatedQuery, backend.ErrorSourceDownstream, ch, queryResult)
 			return
 		}
@@ -364,6 +372,7 @@ func (e *DataSourceHandler) executeQuery(query backend.DataQuery, wg *sync.WaitG
 			if err != nil {
 				logger.Error("Failed to resample dataframe", "err", err)
 				frame.AppendNotices(data.Notice{Text: "Failed to resample dataframe", Severity: data.NoticeSeverityWarning})
+				return
 			}
 		}
 	}
@@ -384,15 +393,38 @@ var Interpolate = func(query backend.DataQuery, timeRange backend.TimeRange, tim
 	return sql
 }
-func (e *DataSourceHandler) newProcessCfg(query backend.DataQuery, queryContext context.Context,
-	rows *sql.Rows, interpolatedQuery string) (*dataQueryModel, error) {
-	columnNames, err := rows.Columns()
-	if err != nil {
-		return nil, err
-	}
-	columnTypes, err := rows.ColumnTypes()
-	if err != nil {
-		return nil, err
-	}
+func (e *DataSourceHandler) newProcessCfg(queryContext context.Context, query backend.DataQuery,
+	results []*pgconn.Result, interpolatedQuery string) (*dataQueryModel, error) {
+	// Calculate total number of fields to preallocate slices
+	totalFields := 0
+	for _, result := range results {
+		totalFields += len(result.FieldDescriptions)
+	}
+	columnNames := make([]string, 0, totalFields)
+	columnTypes := make([]string, 0, totalFields)
+	// The results will contain column information in the metadata
+	for _, result := range results {
+		// Get column names from the result metadata
+		for _, field := range result.FieldDescriptions {
+			columnNames = append(columnNames, field.Name)
+			pqtype, ok := pgtype.NewMap().TypeForOID(field.DataTypeOID)
+			if !ok {
+				// Handle special cases for field types
+				switch field.DataTypeOID {
+				case pgtype.TimetzOID:
+					columnTypes = append(columnTypes, "timetz")
+				// money type is 790
+				case 790:
+					columnTypes = append(columnTypes, "money")
+				default:
+					columnTypes = append(columnTypes, "unknown")
+				}
+			} else {
+				columnTypes = append(columnTypes, pqtype.Name)
+			}
+		}
+	}
 	qm := &dataQueryModel{
@@ -405,23 +437,23 @@ func (e *DataSourceHandler) newProcessCfg(query backend.DataQuery, queryContext
 		queryContext: queryContext,
 	}
-	queryJson := QueryJson{}
-	err = json.Unmarshal(query.JSON, &queryJson)
+	queryJSON := QueryJson{}
+	err := json.Unmarshal(query.JSON, &queryJSON)
 	if err != nil {
 		return nil, err
 	}
-	if queryJson.Fill {
+	if queryJSON.Fill {
 		qm.FillMissing = &data.FillMissing{}
-		qm.Interval = time.Duration(queryJson.FillInterval * float64(time.Second))
-		switch strings.ToLower(queryJson.FillMode) {
+		qm.Interval = time.Duration(queryJSON.FillInterval * float64(time.Second))
+		switch strings.ToLower(queryJSON.FillMode) {
 		case "null":
 			qm.FillMissing.Mode = data.FillModeNull
 		case "previous":
 			qm.FillMissing.Mode = data.FillModePrevious
 		case "value":
 			qm.FillMissing.Mode = data.FillModeValue
-			qm.FillMissing.Value = queryJson.FillValue
+			qm.FillMissing.Value = queryJSON.FillValue
 		default:
 		}
 	}
@@ -429,13 +461,14 @@ func (e *DataSourceHandler) newProcessCfg(query backend.DataQuery, queryContext
 	qm.TimeRange.From = query.TimeRange.From.UTC()
 	qm.TimeRange.To = query.TimeRange.To.UTC()
-	switch queryJson.Format {
-	case "time_series":
-		qm.Format = dataQueryFormatSeries
+	// Default to time_series if no format is provided
+	switch queryJSON.Format {
 	case "table":
 		qm.Format = dataQueryFormatTable
+	case "time_series":
+		fallthrough
 	default:
-		panic(fmt.Sprintf("Unrecognized query model format: %q", queryJson.Format))
+		qm.Format = dataQueryFormatSeries
 	}
 	for i, col := range qm.columnNames {
@@ -456,7 +489,7 @@ func (e *DataSourceHandler) newProcessCfg(query backend.DataQuery, queryContext
 			qm.metricIndex = i
 		default:
 			if qm.metricIndex == -1 {
-				columnType := qm.columnTypes[i].DatabaseTypeName()
+				columnType := qm.columnTypes[i]
 				for _, mct := range e.metricColumnTypes {
 					if columnType == mct {
 						qm.metricIndex = i
@@ -487,7 +520,7 @@ type dataQueryModel struct {
 	FillMissing  *data.FillMissing // property not set until after Interpolate()
 	Interval     time.Duration
 	columnNames  []string
-	columnTypes  []*sql.ColumnType
+	columnTypes  []string
 	timeIndex    int
 	timeEndIndex int
 	metricIndex  int
@@ -511,6 +544,219 @@ func convertSQLTimeColumnsToEpochMS(frame *data.Frame, qm *dataQueryModel) error
 	return nil
 }
+func convertResultsToFrame(results []*pgconn.Result, rowLimit int64) (*data.Frame, error) {
+	m := pgtype.NewMap()
+	// Find the first SELECT result to establish the frame structure
+	var firstSelectResult *pgconn.Result
+	for _, result := range results {
+		if result.CommandTag.Select() {
+			firstSelectResult = result
+			break
+		}
+	}
+	// If no SELECT results found, return empty frame
+	if firstSelectResult == nil {
+		return data.NewFrame(""), nil
+	}
+	// Create frame structure based on the first SELECT result
+	fields := make(data.Fields, len(firstSelectResult.FieldDescriptions))
+	fieldTypes, err := getFieldTypesFromDescriptions(firstSelectResult.FieldDescriptions, m)
+	if err != nil {
+		return nil, err
+	}
+	for i, v := range firstSelectResult.FieldDescriptions {
+		fields[i] = data.NewFieldFromFieldType(fieldTypes[i], 0)
+		fields[i].Name = v.Name
+	}
+	frame := *data.NewFrame("", fields...)
+	// Process all SELECT results, but validate column compatibility
+	for _, result := range results {
+		// Skip non-select statements
+		if !result.CommandTag.Select() {
+			continue
+		}
+		// Validate that this result has the same structure as the frame
+		if len(result.FieldDescriptions) != len(frame.Fields) {
+			return nil, fmt.Errorf("incompatible result structure: expected %d columns, got %d columns",
+				len(frame.Fields), len(result.FieldDescriptions))
+		}
+		// Validate column names and types match
+		for i, fd := range result.FieldDescriptions {
+			if fd.Name != frame.Fields[i].Name {
+				return nil, fmt.Errorf("column name mismatch at position %d: expected %q, got %q",
+					i, frame.Fields[i].Name, fd.Name)
+			}
+		}
+		fieldDescriptions := result.FieldDescriptions
+		for rowIdx := range result.Rows {
+			if rowIdx == int(rowLimit) {
+				frame.AppendNotices(data.Notice{
+					Severity: data.NoticeSeverityWarning,
+					Text:     fmt.Sprintf("Results have been limited to %v because the SQL row limit was reached", rowLimit),
+				})
+				break
+			}
+			row := make([]any, len(fieldDescriptions))
+			for colIdx, fd := range fieldDescriptions {
+				rawValue := result.Rows[rowIdx][colIdx]
+				if rawValue == nil {
+					row[colIdx] = nil
+					continue
+				}
+				convertedValue, err := convertPostgresValue(rawValue, fd, m)
+				if err != nil {
+					return nil, err
+				}
+				row[colIdx] = convertedValue
+			}
+			// Validate row length matches frame field count before appending
+			if len(row) != len(frame.Fields) {
+				return nil, fmt.Errorf("row data length mismatch: expected %d values, got %d values",
+					len(frame.Fields), len(row))
+			}
+			frame.AppendRow(row...)
+		}
+	}
+	return &frame, nil
+}
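`convertResultsToFrame` builds the frame from the first SELECT result and then requires every later SELECT to match its column layout; non-SELECT results (e.g. SET) are skipped, and hitting the row limit surfaces as a frame notice rather than an error. A hypothetical in-package test sketching that contract (not part of this commit):

```go
package sqleng

import (
	"testing"

	"github.com/jackc/pgx/v5/pgconn"
	"github.com/jackc/pgx/v5/pgtype"
)

func TestConvertResultsToFrameSketch(t *testing.T) {
	// Fabricate text-format results the way pgconn would return them.
	sel := func(v string) *pgconn.Result {
		return &pgconn.Result{
			FieldDescriptions: []pgconn.FieldDescription{
				{Name: "value", DataTypeOID: pgtype.Int8OID},
			},
			Rows:       [][][]byte{{[]byte(v)}},
			CommandTag: pgconn.NewCommandTag("SELECT 1"),
		}
	}
	set := &pgconn.Result{CommandTag: pgconn.NewCommandTag("SET")}

	// The SET result is skipped; both SELECTs share one layout and are concatenated.
	frame, err := convertResultsToFrame([]*pgconn.Result{set, sel("1"), sel("2")}, 100)
	if err != nil {
		t.Fatal(err)
	}
	if frame.Rows() != 2 {
		t.Fatalf("expected 2 rows, got %d", frame.Rows())
	}
}
```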
+// convertPostgresValue converts a raw PostgreSQL value to the appropriate Go type
+func convertPostgresValue(rawValue []byte, fd pgconn.FieldDescription, m *pgtype.Map) (interface{}, error) {
+	dataTypeOID := fd.DataTypeOID
+	format := fd.Format
+	// Convert based on type
+	switch fd.DataTypeOID {
+	case pgtype.Int2OID:
+		var d *int16
+		scanPlan := m.PlanScan(dataTypeOID, format, &d)
+		err := scanPlan.Scan(rawValue, &d)
+		if err != nil {
+			return nil, err
+		}
+		return d, nil
+	case pgtype.Int4OID:
+		var d *int32
+		scanPlan := m.PlanScan(dataTypeOID, format, &d)
+		err := scanPlan.Scan(rawValue, &d)
+		if err != nil {
+			return nil, err
+		}
+		return d, nil
+	case pgtype.Int8OID:
+		var d *int64
+		scanPlan := m.PlanScan(dataTypeOID, format, &d)
+		err := scanPlan.Scan(rawValue, &d)
+		if err != nil {
+			return nil, err
+		}
+		return d, nil
+	case pgtype.NumericOID, pgtype.Float8OID, pgtype.Float4OID:
+		var d *float64
+		scanPlan := m.PlanScan(dataTypeOID, format, &d)
+		err := scanPlan.Scan(rawValue, &d)
+		if err != nil {
+			return nil, err
+		}
+		return d, nil
+	case pgtype.BoolOID:
+		var d *bool
+		scanPlan := m.PlanScan(dataTypeOID, format, &d)
+		err := scanPlan.Scan(rawValue, &d)
+		if err != nil {
+			return nil, err
+		}
+		return d, nil
+	case pgtype.ByteaOID:
+		d, err := pgtype.ByteaCodec.DecodeValue(pgtype.ByteaCodec{}, m, dataTypeOID, format, rawValue)
+		if err != nil {
+			return nil, err
+		}
+		str := string(d.([]byte))
+		return &str, nil
+	case pgtype.TimestampOID, pgtype.TimestamptzOID, pgtype.DateOID:
+		var d *time.Time
+		scanPlan := m.PlanScan(dataTypeOID, format, &d)
+		err := scanPlan.Scan(rawValue, &d)
+		if err != nil {
+			return nil, err
+		}
+		return d, nil
+	case pgtype.TimeOID, pgtype.TimetzOID:
+		var d *string
+		scanPlan := m.PlanScan(dataTypeOID, format, &d)
+		err := scanPlan.Scan(rawValue, &d)
+		if err != nil {
+			return nil, err
+		}
+		return d, nil
+	case pgtype.JSONOID, pgtype.JSONBOID:
+		var d *string
+		scanPlan := m.PlanScan(dataTypeOID, format, &d)
+		err := scanPlan.Scan(rawValue, &d)
+		if err != nil {
+			return nil, err
+		}
+		// Handle null JSON values
+		if d == nil {
+			return nil, nil
+		}
+		j := json.RawMessage(*d)
+		return &j, nil
+	default:
+		var d *string
+		scanPlan := m.PlanScan(dataTypeOID, format, &d)
+		err := scanPlan.Scan(rawValue, &d)
+		if err != nil {
+			return nil, err
+		}
+		return d, nil
+	}
+}
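`convertPostgresValue` leans on pgtype scan plans with pointer-to-pointer targets, which is what makes every field nullable for free: a SQL NULL scans to a nil pointer. The pattern in isolation (a sketch assuming pgx/v5's pgtype API):

```go
package main

import (
	"fmt"
	"log"

	"github.com/jackc/pgx/v5/pgtype"
)

func main() {
	m := pgtype.NewMap()

	// Scan a text-format int8 the same way convertPostgresValue does.
	var d *int64
	plan := m.PlanScan(pgtype.Int8OID, pgtype.TextFormatCode, &d)
	if err := plan.Scan([]byte("42"), &d); err != nil {
		log.Fatal(err)
	}
	fmt.Println(*d) // 42; a NULL would have left d == nil
}
```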
+func getFieldTypesFromDescriptions(fieldDescriptions []pgconn.FieldDescription, m *pgtype.Map) ([]data.FieldType, error) {
+	fieldTypes := make([]data.FieldType, len(fieldDescriptions))
+	for i, v := range fieldDescriptions {
+		typeName, ok := m.TypeForOID(v.DataTypeOID)
+		if !ok {
+			fieldTypes[i] = data.FieldTypeNullableString
+		} else {
+			switch typeName.Name {
+			case "int2":
+				fieldTypes[i] = data.FieldTypeNullableInt16
+			case "int4":
+				fieldTypes[i] = data.FieldTypeNullableInt32
+			case "int8":
+				fieldTypes[i] = data.FieldTypeNullableInt64
+			case "float4", "float8", "numeric":
+				fieldTypes[i] = data.FieldTypeNullableFloat64
+			case "bool":
+				fieldTypes[i] = data.FieldTypeNullableBool
+			case "timestamptz", "timestamp", "date":
+				fieldTypes[i] = data.FieldTypeNullableTime
+			case "json", "jsonb":
+				fieldTypes[i] = data.FieldTypeNullableJSON
+			default:
+				fieldTypes[i] = data.FieldTypeNullableString
+			}
+		}
+	}
+	return fieldTypes, nil
+}
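Both `getFieldTypesFromDescriptions` and the column-type loop in `newProcessCfg` reduce to `pgtype.Map.TypeForOID`, which knows the built-in OIDs but not, for example, `money` (OID 790); hence the special cases above and the nullable-string fallback. A quick probe of that behavior (sketch, assuming pgx/v5's default type map):

```go
package main

import (
	"fmt"

	"github.com/jackc/pgx/v5/pgtype"
)

func main() {
	m := pgtype.NewMap()
	for _, oid := range []uint32{pgtype.Int8OID, pgtype.TimestamptzOID, 790 /* money */} {
		if t, ok := m.TypeForOID(oid); ok {
			fmt.Println(oid, t.Name)
		} else {
			// Unregistered OIDs fall back to the nullable-string path.
			fmt.Println(oid, "unknown -> nullable string")
		}
	}
}
```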
 // convertSQLTimeColumnToEpochMS converts column named time to unix timestamp in milliseconds
 // to make native datetime types and epoch dates work in annotation and table queries.
 func convertSQLTimeColumnToEpochMS(frame *data.Frame, timeIndex int) error {