PostgreSQL: PGX fix multiple results handling (#110452)

* PostgreSQL: FIx multiple results handling

- Added tests for handling multiple result sets, including compatible and incompatible structures, ensuring no panics occur.
- Improved `convertResultsToFrame` function to validate column compatibility and handle null values correctly.
- Introduced a new helper function `convertPostgresValue` for converting raw PostgreSQL values to appropriate Go types.
- Added comprehensive unit tests for `convertResultsToFrame` covering various scenarios including row limits and mixed result types.

* Add more test case
This commit is contained in:
Zoltán Bedi
2025-09-08 10:37:54 +02:00
committed by GitHub
parent 7bccabfb1f
commit d9f0d642cc
3 changed files with 530 additions and 102 deletions
@@ -388,35 +388,56 @@ func (e *DataSourceHandler) newProcessCfgPGX(queryContext context.Context, query
}
func convertResultsToFrame(results []*pgconn.Result, rowLimit int64) (*data.Frame, error) {
frame := data.Frame{}
m := pgtype.NewMap()
// Find the first SELECT result to establish the frame structure
var firstSelectResult *pgconn.Result
for _, result := range results {
// Skip non-select statements
if !result.CommandTag.Select() {
continue
if result.CommandTag.Select() {
firstSelectResult = result
break
}
fields := make(data.Fields, len(result.FieldDescriptions))
fieldTypes, err := getFieldTypesFromDescriptions(result.FieldDescriptions, m)
if err != nil {
return nil, err
}
for i, v := range result.FieldDescriptions {
fields[i] = data.NewFieldFromFieldType(fieldTypes[i], 0)
fields[i].Name = v.Name
}
// Create a new frame
frame = *data.NewFrame("", fields...)
}
// Add rows to the frame
// If no SELECT results found, return empty frame
if firstSelectResult == nil {
return data.NewFrame(""), nil
}
// Create frame structure based on the first SELECT result
fields := make(data.Fields, len(firstSelectResult.FieldDescriptions))
fieldTypes, err := getFieldTypesFromDescriptions(firstSelectResult.FieldDescriptions, m)
if err != nil {
return nil, err
}
for i, v := range firstSelectResult.FieldDescriptions {
fields[i] = data.NewFieldFromFieldType(fieldTypes[i], 0)
fields[i].Name = v.Name
}
frame := *data.NewFrame("", fields...)
// Process all SELECT results, but validate column compatibility
for _, result := range results {
// Skip non-select statements
if !result.CommandTag.Select() {
continue
}
// Validate that this result has the same structure as the frame
if len(result.FieldDescriptions) != len(frame.Fields) {
return nil, fmt.Errorf("incompatible result structure: expected %d columns, got %d columns",
len(frame.Fields), len(result.FieldDescriptions))
}
// Validate column names and types match
for i, fd := range result.FieldDescriptions {
if fd.Name != frame.Fields[i].Name {
return nil, fmt.Errorf("column name mismatch at position %d: expected %q, got %q",
i, frame.Fields[i].Name, fd.Name)
}
}
fieldDescriptions := result.FieldDescriptions
for rowIdx := range result.Rows {
if rowIdx == int(rowLimit) {
@@ -429,98 +450,25 @@ func convertResultsToFrame(results []*pgconn.Result, rowLimit int64) (*data.Fram
row := make([]any, len(fieldDescriptions))
for colIdx, fd := range fieldDescriptions {
rawValue := result.Rows[rowIdx][colIdx]
dataTypeOID := fd.DataTypeOID
format := fd.Format
if rawValue == nil {
row[colIdx] = nil
continue
}
// Convert based on type
switch fd.DataTypeOID {
case pgtype.Int2OID:
var d *int16
scanPlan := m.PlanScan(dataTypeOID, format, &d)
err := scanPlan.Scan(rawValue, &d)
if err != nil {
return nil, err
}
row[colIdx] = d
case pgtype.Int4OID:
var d *int32
scanPlan := m.PlanScan(dataTypeOID, format, &d)
err := scanPlan.Scan(rawValue, &d)
if err != nil {
return nil, err
}
row[colIdx] = d
case pgtype.Int8OID:
var d *int64
scanPlan := m.PlanScan(dataTypeOID, format, &d)
err := scanPlan.Scan(rawValue, &d)
if err != nil {
return nil, err
}
row[colIdx] = d
case pgtype.NumericOID, pgtype.Float8OID, pgtype.Float4OID:
var d *float64
scanPlan := m.PlanScan(dataTypeOID, format, &d)
err := scanPlan.Scan(rawValue, &d)
if err != nil {
return nil, err
}
row[colIdx] = d
case pgtype.BoolOID:
var d *bool
scanPlan := m.PlanScan(dataTypeOID, format, &d)
err := scanPlan.Scan(rawValue, &d)
if err != nil {
return nil, err
}
row[colIdx] = d
case pgtype.ByteaOID:
d, err := pgtype.ByteaCodec.DecodeValue(pgtype.ByteaCodec{}, m, dataTypeOID, format, rawValue)
if err != nil {
return nil, err
}
str := string(d.([]byte))
row[colIdx] = &str
case pgtype.TimestampOID, pgtype.TimestamptzOID, pgtype.DateOID:
var d *time.Time
scanPlan := m.PlanScan(dataTypeOID, format, &d)
err := scanPlan.Scan(rawValue, &d)
if err != nil {
return nil, err
}
row[colIdx] = d
case pgtype.TimeOID, pgtype.TimetzOID:
var d *string
scanPlan := m.PlanScan(dataTypeOID, format, &d)
err := scanPlan.Scan(rawValue, &d)
if err != nil {
return nil, err
}
row[colIdx] = d
case pgtype.JSONOID, pgtype.JSONBOID:
var d *string
scanPlan := m.PlanScan(dataTypeOID, format, &d)
err := scanPlan.Scan(rawValue, &d)
if err != nil {
return nil, err
}
j := json.RawMessage(*d)
row[colIdx] = &j
default:
var d *string
scanPlan := m.PlanScan(dataTypeOID, format, &d)
err := scanPlan.Scan(rawValue, &d)
if err != nil {
return nil, err
}
row[colIdx] = d
convertedValue, err := convertPostgresValue(rawValue, fd, m)
if err != nil {
return nil, err
}
row[colIdx] = convertedValue
}
// Validate row length matches frame field count before appending
if len(row) != len(frame.Fields) {
return nil, fmt.Errorf("row data length mismatch: expected %d values, got %d values",
len(frame.Fields), len(row))
}
frame.AppendRow(row...)
}
}
@@ -528,6 +476,96 @@ func convertResultsToFrame(results []*pgconn.Result, rowLimit int64) (*data.Fram
return &frame, nil
}
// convertPostgresValue converts a raw PostgreSQL value to the appropriate Go type
func convertPostgresValue(rawValue []byte, fd pgconn.FieldDescription, m *pgtype.Map) (interface{}, error) {
dataTypeOID := fd.DataTypeOID
format := fd.Format
// Convert based on type
switch fd.DataTypeOID {
case pgtype.Int2OID:
var d *int16
scanPlan := m.PlanScan(dataTypeOID, format, &d)
err := scanPlan.Scan(rawValue, &d)
if err != nil {
return nil, err
}
return d, nil
case pgtype.Int4OID:
var d *int32
scanPlan := m.PlanScan(dataTypeOID, format, &d)
err := scanPlan.Scan(rawValue, &d)
if err != nil {
return nil, err
}
return d, nil
case pgtype.Int8OID:
var d *int64
scanPlan := m.PlanScan(dataTypeOID, format, &d)
err := scanPlan.Scan(rawValue, &d)
if err != nil {
return nil, err
}
return d, nil
case pgtype.NumericOID, pgtype.Float8OID, pgtype.Float4OID:
var d *float64
scanPlan := m.PlanScan(dataTypeOID, format, &d)
err := scanPlan.Scan(rawValue, &d)
if err != nil {
return nil, err
}
return d, nil
case pgtype.BoolOID:
var d *bool
scanPlan := m.PlanScan(dataTypeOID, format, &d)
err := scanPlan.Scan(rawValue, &d)
if err != nil {
return nil, err
}
return d, nil
case pgtype.ByteaOID:
d, err := pgtype.ByteaCodec.DecodeValue(pgtype.ByteaCodec{}, m, dataTypeOID, format, rawValue)
if err != nil {
return nil, err
}
str := string(d.([]byte))
return &str, nil
case pgtype.TimestampOID, pgtype.TimestamptzOID, pgtype.DateOID:
var d *time.Time
scanPlan := m.PlanScan(dataTypeOID, format, &d)
err := scanPlan.Scan(rawValue, &d)
if err != nil {
return nil, err
}
return d, nil
case pgtype.TimeOID, pgtype.TimetzOID:
var d *string
scanPlan := m.PlanScan(dataTypeOID, format, &d)
err := scanPlan.Scan(rawValue, &d)
if err != nil {
return nil, err
}
return d, nil
case pgtype.JSONOID, pgtype.JSONBOID:
var d *string
scanPlan := m.PlanScan(dataTypeOID, format, &d)
err := scanPlan.Scan(rawValue, &d)
if err != nil {
return nil, err
}
j := json.RawMessage(*d)
return &j, nil
default:
var d *string
scanPlan := m.PlanScan(dataTypeOID, format, &d)
err := scanPlan.Scan(rawValue, &d)
if err != nil {
return nil, err
}
return d, nil
}
}
func getFieldTypesFromDescriptions(fieldDescriptions []pgconn.FieldDescription, m *pgtype.Map) ([]data.FieldType, error) {
fieldTypes := make([]data.FieldType, len(fieldDescriptions))
for i, v := range fieldDescriptions {