SQL Expressions: Rework backend errors and error instrumentation (#109633)
* Capture the error_type label on metrics/traces
* Make error messages more helpful to the user
* Use errutil and categorized errors, and tie them to error_type (the category in code)
* Misc trace fixes
* Add a metric to track SQL input conversion
This commit is contained in:
+166
-42
@@ -8,9 +8,12 @@ import (
|
||||
|
||||
"github.com/grafana/grafana-plugin-sdk-go/backend"
|
||||
"github.com/grafana/grafana-plugin-sdk-go/data"
|
||||
"go.opentelemetry.io/otel/attribute"
|
||||
"go.opentelemetry.io/otel/codes"
|
||||
semconv "go.opentelemetry.io/otel/semconv/v1.17.0"
|
||||
"go.opentelemetry.io/otel/trace"
|
||||
|
||||
"github.com/grafana/grafana/pkg/apimachinery/errutil"
|
||||
"github.com/grafana/grafana-plugin-sdk-go/backend/log"
|
||||
"github.com/grafana/grafana/pkg/expr/mathexp"
|
||||
"github.com/grafana/grafana/pkg/expr/metrics"
|
||||
"github.com/grafana/grafana/pkg/expr/sql"
|
||||
@@ -18,15 +21,7 @@ import (
|
||||
"github.com/grafana/grafana/pkg/setting"
|
||||
)
|
||||
|
||||
var (
|
||||
ErrMissingSQLQuery = errutil.BadRequest("sql-missing-query").Errorf("missing SQL query")
|
||||
ErrInvalidSQLQuery = errutil.BadRequest("sql-invalid-sql").MustTemplate(
|
||||
"invalid SQL query: {{ .Private.query }} err: {{ .Error }}",
|
||||
errutil.WithPublic(
|
||||
"Invalid SQL query: {{ .Public.error }}",
|
||||
),
|
||||
)
|
||||
)
|
||||
const SQLLoggerName = "expr.sql"
|
||||
|
||||
// SQLCommand is an expression to run SQL over results
|
||||
type SQLCommand struct {
|
||||
@@ -39,31 +34,25 @@ type SQLCommand struct {
|
||||
inputLimit int64
|
||||
outputLimit int64
|
||||
timeout time.Duration
|
||||
logger log.Logger
|
||||
}
|
||||
|
||||
// NewSQLCommand creates a new SQLCommand.
|
||||
func NewSQLCommand(ctx context.Context, refID, format, rawSQL string, intputLimit, outputLimit int64, timeout time.Duration) (*SQLCommand, error) {
|
||||
func NewSQLCommand(ctx context.Context, logger log.Logger, refID, format, rawSQL string, intputLimit, outputLimit int64, timeout time.Duration) (*SQLCommand, error) {
|
||||
sqlLogger := backend.NewLoggerWith("logger", SQLLoggerName).FromContext(ctx)
|
||||
if rawSQL == "" {
|
||||
return nil, ErrMissingSQLQuery
|
||||
return nil, sql.MakeErrEmptyQuery(refID)
|
||||
}
|
||||
tables, err := sql.TablesList(ctx, rawSQL)
|
||||
if err != nil {
|
||||
logger.Warn("invalid sql query", "sql", rawSQL, "error", err)
|
||||
return nil, ErrInvalidSQLQuery.Build(errutil.TemplateData{
|
||||
Error: err,
|
||||
Public: map[string]any{
|
||||
"error": err.Error(),
|
||||
},
|
||||
Private: map[string]any{
|
||||
"query": rawSQL,
|
||||
},
|
||||
})
|
||||
sqlLogger.Warn("invalid sql query", "sql", rawSQL, "error", err)
|
||||
return nil, sql.MakeErrInvalidQuery(refID, err)
|
||||
}
|
||||
if len(tables) == 0 {
|
||||
logger.Warn("no tables found in SQL query", "sql", rawSQL)
|
||||
sqlLogger.Warn("no tables found in SQL query", "sql", rawSQL)
|
||||
}
|
||||
if tables != nil {
|
||||
logger.Debug("REF tables", "tables", tables, "sql", rawSQL)
|
||||
sqlLogger.Debug("REF tables", "tables", tables, "sql", rawSQL)
|
||||
}
|
||||
|
||||
return &SQLCommand{
|
||||
@@ -74,14 +63,15 @@ func NewSQLCommand(ctx context.Context, refID, format, rawSQL string, intputLimi
|
||||
outputLimit: outputLimit,
|
||||
timeout: timeout,
|
||||
format: format,
|
||||
logger: sqlLogger,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// UnmarshalSQLCommand creates a SQLCommand from Grafana's frontend query.
|
||||
func UnmarshalSQLCommand(ctx context.Context, rn *rawNode, cfg *setting.Cfg) (*SQLCommand, error) {
|
||||
sqlLogger := backend.NewLoggerWith("logger", "expr.sql").FromContext(ctx)
|
||||
sqlLogger := backend.NewLoggerWith("logger", SQLLoggerName).FromContext(ctx)
|
||||
if rn.TimeRange == nil {
|
||||
logger.Error("time range must be specified for refID", "refID", rn.RefID)
|
||||
sqlLogger.Error("time range must be specified for refID", "refID", rn.RefID)
|
||||
return nil, fmt.Errorf("time range must be specified for refID %s", rn.RefID)
|
||||
}
|
||||
|
||||
@@ -99,7 +89,7 @@ func UnmarshalSQLCommand(ctx context.Context, rn *rawNode, cfg *setting.Cfg) (*S
|
||||
formatRaw := rn.Query["format"]
|
||||
format, _ := formatRaw.(string)
|
||||
|
||||
return NewSQLCommand(ctx, rn.RefID, format, expression, cfg.SQLExpressionCellLimit, cfg.SQLExpressionOutputCellLimit, cfg.SQLExpressionTimeout)
|
||||
return NewSQLCommand(ctx, sqlLogger, rn.RefID, format, expression, cfg.SQLExpressionCellLimit, cfg.SQLExpressionOutputCellLimit, cfg.SQLExpressionTimeout)
|
||||
}
|
||||
|
||||
// NeedsVars returns the variable names (refIds) that are dependencies
|
||||
@@ -113,23 +103,32 @@ func (gr *SQLCommand) NeedsVars() []string {
|
||||
func (gr *SQLCommand) Execute(ctx context.Context, now time.Time, vars mathexp.Vars, tracer tracing.Tracer, metrics *metrics.ExprMetrics) (mathexp.Results, error) {
|
||||
_, span := tracer.Start(ctx, "SSE.ExecuteSQL")
|
||||
start := time.Now()
|
||||
sqlLogger := backend.NewLoggerWith("logger", "expr.sql").FromContext(ctx)
|
||||
tc := int64(0)
|
||||
rsp := mathexp.Results{}
|
||||
errorType := "none"
|
||||
|
||||
defer func() {
|
||||
duration := float64(time.Since(start).Milliseconds())
|
||||
|
||||
statusLabel := "ok"
|
||||
if rsp.Error != nil {
|
||||
e := &sql.ErrorWithCategory{}
|
||||
if errors.As(rsp.Error, &e) {
|
||||
errorType = e.Category()
|
||||
} else {
|
||||
errorType = "unknown"
|
||||
}
|
||||
statusLabel = "error"
|
||||
span.RecordError(rsp.Error)
|
||||
span.SetStatus(codes.Error, rsp.Error.Error())
|
||||
sqlLogger.Error("SQL command execution failed", "error", rsp.Error.Error())
|
||||
span.AddEvent("exception", trace.WithAttributes(
|
||||
semconv.ExceptionType(errorType),
|
||||
semconv.ExceptionMessage(rsp.Error.Error()),
|
||||
))
|
||||
span.SetAttributes(attribute.String("error.category", errorType))
|
||||
span.SetStatus(codes.Error, errorType)
|
||||
gr.logger.Error("SQL command execution failed", "error", rsp.Error.Error(), "error_type", errorType)
|
||||
}
|
||||
span.End()
|
||||
|
||||
metrics.SqlCommandCount.WithLabelValues(statusLabel).Inc()
|
||||
metrics.SqlCommandCount.WithLabelValues(statusLabel, errorType).Inc()
|
||||
metrics.SqlCommandDuration.WithLabelValues(statusLabel).Observe(duration)
|
||||
metrics.SqlCommandCellCount.WithLabelValues(statusLabel).Observe(float64(tc))
|
||||
}()
|
||||
@@ -138,7 +137,7 @@ func (gr *SQLCommand) Execute(ctx context.Context, now time.Time, vars mathexp.V
|
||||
for _, ref := range gr.varsToQuery {
|
||||
results, ok := vars[ref]
|
||||
if !ok {
|
||||
sqlLogger.Warn("no results found for", "ref", ref)
|
||||
gr.logger.Warn("no results found for", "ref", ref)
|
||||
continue
|
||||
}
|
||||
frames := results.Values.AsDataFrames(ref)
|
||||
@@ -149,15 +148,11 @@ func (gr *SQLCommand) Execute(ctx context.Context, now time.Time, vars mathexp.V
|
||||
|
||||
// limit of 0 or less means no limit (following convention)
|
||||
if gr.inputLimit > 0 && tc > gr.inputLimit {
|
||||
rsp.Error = fmt.Errorf(
|
||||
"SQL expression: total cell count across all input tables exceeds limit of %d. Total cells: %d",
|
||||
gr.inputLimit,
|
||||
tc,
|
||||
)
|
||||
rsp.Error = sql.MakeInputLimitExceededError(gr.refID, gr.inputLimit)
|
||||
return rsp, nil
|
||||
}
|
||||
|
||||
sqlLogger.Debug("Executing query", "query", gr.query, "frames", len(allFrames))
|
||||
gr.logger.Debug("Executing query", "query", gr.query, "frames", len(allFrames))
|
||||
|
||||
db := sql.DB{}
|
||||
frame, err := db.QueryFrames(ctx, tracer, gr.refID, gr.query, allFrames, sql.WithMaxOutputCells(gr.outputLimit), sql.WithTimeout(gr.timeout))
|
||||
@@ -166,7 +161,7 @@ func (gr *SQLCommand) Execute(ctx context.Context, now time.Time, vars mathexp.V
|
||||
return rsp, nil
|
||||
}
|
||||
|
||||
sqlLogger.Debug("Done Executing query", "query", gr.query, "rows", frame.Rows())
|
||||
gr.logger.Debug("Done Executing query", "query", gr.query, "rows", frame.Rows())
|
||||
|
||||
if frame.Rows() == 0 {
|
||||
rsp.Values = mathexp.Values{
|
||||
@@ -293,7 +288,7 @@ func extractNumberSetFromSQLForAlerting(frame *data.Frame) ([]mathexp.Number, er
|
||||
}
|
||||
|
||||
if len(duplicates) > 0 {
|
||||
return nil, makeDuplicateStringColumnError(duplicates)
|
||||
return nil, sql.MakeDuplicateStringColumnError(duplicates)
|
||||
}
|
||||
|
||||
// Build final result
|
||||
@@ -307,3 +302,132 @@ func extractNumberSetFromSQLForAlerting(frame *data.Frame) ([]mathexp.Number, er
|
||||
|
||||
return numbers, nil
|
||||
}
|
||||
|
||||
// handleSqlInput normalizes input DataFrames into a single dataframe with no labels so it can represent a table for use with SQL expressions.
|
||||
//
|
||||
// It handles three cases:
|
||||
// 1. If the input declares a supported time series or numeric kind in the wide or multi format (via FrameMeta.Type), it converts to a full-long formatted table using ConvertToFullLong.
|
||||
// 2. If the input is a single frame (no labels, no declared type), it passes through as-is.
|
||||
// 3. If the input has multiple frames or label metadata but lacks a supported type, it returns an error.
|
||||
//
|
||||
// The returned bool indicates if the input was (attempted to be) converted or passed through as-is.
|
||||
func handleSqlInput(ctx context.Context, tracer trace.Tracer, refID string, forRefIDs map[string]struct{}, dsType string, dataFrames data.Frames) (mathexp.Results, bool) {
|
||||
_, span := tracer.Start(ctx, "SSE.HandleConvertSQLInput")
|
||||
start := time.Now()
|
||||
var result mathexp.Results
|
||||
errorType := "none"
|
||||
var metaType data.FrameType
|
||||
|
||||
defer func() {
|
||||
duration := float64(time.Since(start).Milliseconds())
|
||||
statusLabel := "ok"
|
||||
if result.Error != nil {
|
||||
statusLabel = "error"
|
||||
}
|
||||
dataType := categorizeFrameInputType(dataFrames)
|
||||
span.SetAttributes(
|
||||
attribute.String("status", statusLabel),
|
||||
attribute.Float64("duration", duration),
|
||||
attribute.String("data.type", dataType),
|
||||
attribute.String("datasource.type", dsType),
|
||||
)
|
||||
|
||||
if result.Error != nil {
|
||||
e := &sql.ErrorWithCategory{}
|
||||
if errors.As(result.Error, &e) {
|
||||
errorType = e.Category()
|
||||
} else {
|
||||
errorType = "unknown"
|
||||
}
|
||||
span.AddEvent("exception", trace.WithAttributes(
|
||||
semconv.ExceptionType(errorType),
|
||||
semconv.ExceptionMessage(result.Error.Error()),
|
||||
))
|
||||
span.SetAttributes(attribute.String("error.category", errorType))
|
||||
span.SetStatus(codes.Error, errorType)
|
||||
}
|
||||
span.End()
|
||||
}()
|
||||
|
||||
if len(dataFrames) == 0 {
|
||||
return mathexp.Results{Values: mathexp.Values{mathexp.NewNoData()}}, false
|
||||
}
|
||||
|
||||
first := dataFrames[0]
|
||||
|
||||
// Single Frame no data case
|
||||
// Note: In the case of a support Frame Type, we may want to return the matching schema
|
||||
// with no rows (e.g. include the `__value__` column). But not sure about this at this time.
|
||||
if len(dataFrames) == 1 && len(first.Fields) == 0 {
|
||||
result.Values = mathexp.Values{
|
||||
mathexp.TableData{Frame: first},
|
||||
}
|
||||
|
||||
return result, false
|
||||
}
|
||||
|
||||
if first.Meta != nil {
|
||||
metaType = first.Meta.Type
|
||||
}
|
||||
|
||||
if supportedToLongConversion(metaType) {
|
||||
convertedFrames, err := ConvertToFullLong(dataFrames)
|
||||
if err != nil {
|
||||
result.Error = sql.MakeInputConvertError(err, refID, forRefIDs, dsType)
|
||||
}
|
||||
|
||||
if len(convertedFrames) == 0 {
|
||||
result.Error = fmt.Errorf("conversion succeeded but returned no frames")
|
||||
return result, true
|
||||
}
|
||||
|
||||
result.Values = mathexp.Values{
|
||||
mathexp.TableData{Frame: convertedFrames[0]},
|
||||
}
|
||||
|
||||
return result, true
|
||||
}
|
||||
|
||||
// If we don't have a supported type for conversion, see if we can pass through as a table (no labels, and only a single frame)
|
||||
var frameTypeIssue string
|
||||
if metaType == "" {
|
||||
frameTypeIssue = "is missing the data type (frame.meta.type)"
|
||||
} else {
|
||||
frameTypeIssue = fmt.Sprintf("has an unsupported data type [%s]", metaType)
|
||||
}
|
||||
|
||||
// If meta.type is not supported, but there are labels or more than 1 frame error
|
||||
if len(dataFrames) > 1 {
|
||||
result.Error = sql.MakeInputConvertError(fmt.Errorf("can not convert because the response %s and has more than one dataframe that can not be automatically mapped to a single table", frameTypeIssue), refID, forRefIDs, dsType)
|
||||
return result, false
|
||||
}
|
||||
for _, frame := range dataFrames {
|
||||
for _, field := range frame.Fields {
|
||||
if len(field.Labels) > 0 {
|
||||
result.Error = sql.MakeInputConvertError(fmt.Errorf("can not convert because the response %s and has labels in the response that can not be mapped to a table", frameTypeIssue), refID, forRefIDs, dsType)
|
||||
return result, false
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Can pass through as table without conversion
|
||||
result.Values = mathexp.Values{
|
||||
mathexp.TableData{Frame: first},
|
||||
}
|
||||
return result, false
|
||||
}
|
||||
|
||||
func categorizeFrameInputType(dataFrames data.Frames) string {
|
||||
switch {
|
||||
case len(dataFrames) == 0:
|
||||
return "missing"
|
||||
case dataFrames[0].Meta == nil:
|
||||
return "missing"
|
||||
case dataFrames[0].Meta.Type == "":
|
||||
return "missing"
|
||||
case dataFrames[0].Meta.Type.IsKnownType():
|
||||
return string(dataFrames[0].Meta.Type)
|
||||
default:
|
||||
return "unknown"
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user