diff --git a/pkg/expr/convert_to_full_long.go b/pkg/expr/convert_to_full_long.go index 863e45b39df..6c17badb1d8 100644 --- a/pkg/expr/convert_to_full_long.go +++ b/pkg/expr/convert_to_full_long.go @@ -27,6 +27,7 @@ func ConvertToFullLong(frames data.Frames) (data.Frames, error) { if frames[0].Meta != nil && frames[0].Meta.Type != "" { inputType = frames[0].Meta.Type } else { + // shouldn't hit this when calling from handleSqlInput as supportedToLongConversion is called first return nil, fmt.Errorf("input frame missing FrameMeta.Type") } @@ -40,6 +41,7 @@ func ConvertToFullLong(frames data.Frames) (data.Frames, error) { case data.FrameTypeTimeSeriesWide: return convertTimeSeriesWideToFullLong(frames) default: + // Shouldn't hit this when calling from handleSqlInput as supportedToLongConversion is called first return nil, fmt.Errorf("unsupported input type %s for full long conversion", inputType) } } diff --git a/pkg/expr/converter.go b/pkg/expr/converter.go index e4c444c5ce7..3017afaeef1 100644 --- a/pkg/expr/converter.go +++ b/pkg/expr/converter.go @@ -23,17 +23,11 @@ type ResultConverter struct { func (c *ResultConverter) Convert(ctx context.Context, datasourceType string, frames data.Frames, - forSqlInput bool, ) (string, mathexp.Results, error) { if len(frames) == 0 { return "no-data", mathexp.Results{Values: mathexp.Values{mathexp.NewNoData()}}, nil } - if forSqlInput { - results := handleSqlInput(frames) - return "sql input", results, nil - } - var dt data.FrameType dt, useDataplane, _ := shouldUseDataplane(frames, logger, c.Features.IsEnabled(ctx, featuremgmt.FlagDisableSSEDataplane)) if useDataplane { @@ -126,73 +120,6 @@ func (c *ResultConverter) Convert(ctx context.Context, }, nil } -// handleSqlInput normalizes input DataFrames into a single dataframe with no labels for use with SQL expressions. -// -// It handles three cases: -// 1. 
If the input declares a supported time series or numeric kind in the wide or multi format (via FrameMeta.Type), it converts to a full-long formatted table using ConvertToFullLong. -// 2. If the input is a single frame (no labels, no declared type), it passes through as-is. -// 3. If the input has multiple frames or label metadata but lacks a supported type, it returns an error. -func handleSqlInput(dataFrames data.Frames) mathexp.Results { - var result mathexp.Results - - // dataframes len > 0 is checked in the caller -- Convert - first := dataFrames[0] - - // Single Frame no data case - // Note: In the case of a support Frame Type, we may want to return the matching schema - // with no rows (e.g. include the `__value__` column). But not sure about this at this time. - if len(dataFrames) == 1 && len(first.Fields) == 0 { - result.Values = mathexp.Values{ - mathexp.TableData{Frame: first}, - } - - return result - } - - var metaType data.FrameType - if first.Meta != nil { - metaType = first.Meta.Type - } - - if supportedToLongConversion(metaType) { - convertedFrames, err := ConvertToFullLong(dataFrames) - if err != nil { - result.Error = fmt.Errorf("failed to convert data frames to long format for SQL: %w", err) - } - - if len(convertedFrames) == 0 { - result.Error = fmt.Errorf("conversion succeeded but returned no frames") - return result - } - - result.Values = mathexp.Values{ - mathexp.TableData{Frame: convertedFrames[0]}, - } - - return result - } - - // If Meta.Type is not supported, but there are labels or more than 1 frame, fail fast - if len(dataFrames) > 1 { - result.Error = fmt.Errorf("response has more than one frame but frame type is missing or unsupported for sql conversion") - return result - } - for _, frame := range dataFrames { - for _, field := range frame.Fields { - if len(field.Labels) > 0 { - result.Error = fmt.Errorf("frame has labels but frame type is missing or unsupported for sql conversion") - return result - } - } - } - - // Can pass through 
as table without conversion - result.Values = mathexp.Values{ - mathexp.TableData{Frame: first}, - } - return result -} - func getResponseFrame(logger *log.ConcreteLogger, resp *backend.QueryDataResponse, refID string) (data.Frames, error) { response, ok := resp.Responses[refID] if !ok { diff --git a/pkg/expr/converter_test.go b/pkg/expr/converter_test.go index 15c45ebf0e2..692573728a7 100644 --- a/pkg/expr/converter_test.go +++ b/pkg/expr/converter_test.go @@ -41,7 +41,7 @@ func TestConvertDataFramesToResults(t *testing.T) { for _, dtype := range supported { t.Run(dtype, func(t *testing.T) { - resultType, res, err := converter.Convert(context.Background(), dtype, frames, false) + resultType, res, err := converter.Convert(context.Background(), dtype, frames) require.NoError(t, err) assert.Equal(t, "single frame series", resultType) require.Len(t, res.Values, 2) @@ -69,7 +69,7 @@ func TestConvertDataFramesToResults(t *testing.T) { for _, dtype := range supported { t.Run(dtype, func(t *testing.T) { - resultType, res, err := converter.Convert(context.Background(), dtype, frames, false) + resultType, res, err := converter.Convert(context.Background(), dtype, frames) require.NoError(t, err) assert.Equal(t, "multi frame series", resultType) require.Len(t, res.Values, 2) @@ -102,7 +102,7 @@ func TestConvertDataFramesToResults(t *testing.T) { for _, dtype := range supported { t.Run(dtype, func(t *testing.T) { - resultType, res, err := converter.Convert(context.Background(), dtype, frames, false) + resultType, res, err := converter.Convert(context.Background(), dtype, frames) require.NoError(t, err) assert.Equal(t, "multi frame series", resultType) require.Len(t, res.Values, 2) @@ -120,85 +120,3 @@ func TestConvertDataFramesToResults(t *testing.T) { }) }) } - -func TestHandleSqlInput(t *testing.T) { - tests := []struct { - name string - frames data.Frames - expectErr string - expectFrame bool - }{ - { - name: "single frame with no fields and no type is passed through", - 
frames: data.Frames{data.NewFrame("")}, - expectFrame: true, - }, - { - name: "single frame with no fields but type timeseries-multi is passed through", - frames: data.Frames{data.NewFrame("").SetMeta(&data.FrameMeta{Type: data.FrameTypeTimeSeriesMulti})}, - expectFrame: true, - }, - { - name: "single frame, no labels, no type → passes through", - frames: data.Frames{ - data.NewFrame("", - data.NewField("time", nil, []time.Time{time.Unix(1, 0)}), - data.NewField("value", nil, []*float64{fp(2)}), - ), - }, - expectFrame: true, - }, - { - name: "single frame with labels, but missing FrameMeta.Type → error", - frames: data.Frames{ - data.NewFrame("", - data.NewField("time", nil, []time.Time{time.Unix(1, 0)}), - data.NewField("value", data.Labels{"foo": "bar"}, []*float64{fp(2)}), - ), - }, - expectErr: "frame has labels but frame type is missing or unsupported", - }, - { - name: "multiple frames, no type → error", - frames: data.Frames{ - data.NewFrame("", - data.NewField("time", nil, []time.Time{time.Unix(1, 0)}), - data.NewField("value", nil, []*float64{fp(2)}), - ), - data.NewFrame("", - data.NewField("time", nil, []time.Time{time.Unix(1, 0)}), - data.NewField("value", nil, []*float64{fp(2)}), - ), - }, - expectErr: "response has more than one frame but frame type is missing or unsupported", - }, - { - name: "supported type (timeseries-multi) triggers ConvertToFullLong", - frames: data.Frames{ - data.NewFrame("", - data.NewField("time", nil, []time.Time{time.Unix(1, 0)}), - data.NewField("value", data.Labels{"host": "a"}, []*float64{fp(2)}), - ).SetMeta(&data.FrameMeta{Type: data.FrameTypeTimeSeriesMulti}), - }, - expectFrame: true, - }, - } - - for _, tc := range tests { - t.Run(tc.name, func(t *testing.T) { - res := handleSqlInput(tc.frames) - - if tc.expectErr != "" { - require.Error(t, res.Error) - require.ErrorContains(t, res.Error, tc.expectErr) - } else { - require.NoError(t, res.Error) - if tc.expectFrame { - require.Len(t, res.Values, 1) - 
require.IsType(t, mathexp.TableData{}, res.Values[0]) - assert.NotNil(t, res.Values[0].(mathexp.TableData).Frame) - } - } - }) - } -} diff --git a/pkg/expr/errors.go b/pkg/expr/errors.go index c14e550f972..072a2d27f4f 100644 --- a/pkg/expr/errors.go +++ b/pkg/expr/errors.go @@ -3,8 +3,6 @@ package expr import ( "errors" "fmt" - "sort" - "strings" "github.com/grafana/grafana/pkg/apimachinery/errutil" ) @@ -95,31 +93,3 @@ func makeUnexpectedNodeTypeError(refID, nodeType string) error { return UnexpectedNodeTypeError.Build(data) } - -var DuplicateStringColumnError = errutil.NewBase( - errutil.StatusBadRequest, "sse.duplicateStringColumns").MustTemplate( - "your SQL query returned {{ .Public.count }} rows with duplicate values across the string columns, which is not allowed for alerting. Examples: ({{ .Public.examples }}). Hint: use GROUP BY or aggregation (e.g. MAX(), AVG()) to return one row per unique combination.", - errutil.WithPublic("SQL query returned duplicate combinations of string column values. Use GROUP BY or aggregation to return one row per combination."), -) - -func makeDuplicateStringColumnError(examples []string) error { - const limit = 5 - sort.Strings(examples) - exampleStr := strings.Join(truncateExamples(examples, limit), ", ") - - return DuplicateStringColumnError.Build(errutil.TemplateData{ - Public: map[string]any{ - "examples": exampleStr, - "count": len(examples), - }, - }) -} - -func truncateExamples(examples []string, limit int) []string { - if len(examples) <= limit { - return examples - } - truncated := examples[:limit] - truncated = append(truncated, fmt.Sprintf("... 
and %d more", len(examples)-limit)) - return truncated -} diff --git a/pkg/expr/graph.go b/pkg/expr/graph.go index ad60027d988..8e5fa8abea1 100644 --- a/pkg/expr/graph.go +++ b/pkg/expr/graph.go @@ -3,6 +3,7 @@ package expr import ( "context" "encoding/json" + "errors" "fmt" "slices" "time" @@ -13,6 +14,7 @@ import ( "gonum.org/v1/gonum/graph/topo" "github.com/grafana/grafana/pkg/expr/mathexp" + "github.com/grafana/grafana/pkg/expr/sql" "github.com/grafana/grafana/pkg/services/featuremgmt" ) @@ -48,6 +50,8 @@ type Node interface { RefID() string String() string NeedsVars() []string + SetInputTo(refID string) + IsInputTo() map[string]struct{} } type ExecutableNode interface { @@ -87,8 +91,26 @@ func (dp *DataPipeline) execute(c context.Context, now time.Time, s *Service) (m for _, neededVar := range node.NeedsVars() { if res, ok := vars[neededVar]; ok { if res.Error != nil { + var depErr error + // IF SQL expression dependency error + if node.NodeType() == TypeCMDNode && node.(*CMDNode).CMDType == TypeSQL { + e := sql.MakeSQLDependencyError(node.RefID(), neededVar) + + // although the SQL expression won't be executed, + // we track a dependency error on the metric. + eType := e.Category() + var errWithType *sql.ErrorWithCategory + if errors.As(res.Error, &errWithType) { + // If it is already SQL error with type (e.g. 
limit exceeded, input conversion, capture the type as that) + eType = errWithType.Category() + } + s.metrics.SqlCommandCount.WithLabelValues("error", eType) + depErr = e + } else { // general SSE dependency error + depErr = MakeDependencyError(node.RefID(), neededVar) + } errResult := mathexp.Results{ - Error: MakeDependencyError(node.RefID(), neededVar), + Error: depErr, } vars[node.RefID()] = errResult hasDepError = true @@ -202,7 +224,7 @@ func (s *Service) buildDependencyGraph(ctx context.Context, req *Request) (*simp registry := buildNodeRegistry(graph) - if err := buildGraphEdges(graph, registry); err != nil { + if err := s.buildGraphEdges(graph, registry); err != nil { return nil, err } @@ -311,7 +333,7 @@ func (s *Service) buildGraph(ctx context.Context, req *Request) (*simple.Directe } // buildGraphEdges generates graph edges based on each node's dependencies. -func buildGraphEdges(dp *simple.DirectedGraph, registry map[string]Node) error { +func (s *Service) buildGraphEdges(dp *simple.DirectedGraph, registry map[string]Node) error { nodeIt := dp.Nodes() for nodeIt.Next() { @@ -328,6 +350,14 @@ func buildGraphEdges(dp *simple.DirectedGraph, registry map[string]Node) error { for _, neededVar := range cmdNode.Command.NeedsVars() { neededNode, ok := registry[neededVar] if !ok { + if cmdNode.CMDType == TypeSQL { + // With the current flow, the SQL expression won't be executed with + // this missing dependency. But we collection the metric as there was an + // attempt to execute a SQL expression. 
+ e := sql.MakeTableNotFoundError(cmdNode.refID, neededVar) + s.metrics.SqlCommandCount.WithLabelValues("error", e.Category()).Inc() + return e + } return fmt.Errorf("unable to find dependent node '%v'", neededVar) } @@ -365,6 +395,7 @@ func buildGraphEdges(dp *simple.DirectedGraph, registry map[string]Node) error { } edge := dp.NewEdge(neededNode, cmdNode) + neededNode.SetInputTo(cmdNode.RefID()) dp.SetEdge(edge) } diff --git a/pkg/expr/metrics/metrics.go b/pkg/expr/metrics/metrics.go index ee5c99ce405..8040b2ede91 100644 --- a/pkg/expr/metrics/metrics.go +++ b/pkg/expr/metrics/metrics.go @@ -12,6 +12,7 @@ type ExprMetrics struct { SqlCommandDuration *prometheus.HistogramVec SqlCommandCount *prometheus.CounterVec SqlCommandCellCount *prometheus.HistogramVec + SqlCommandInputCount *prometheus.CounterVec } func newExprMetrics(subsystem string) *ExprMetrics { @@ -46,8 +47,8 @@ func newExprMetrics(subsystem string) *ExprMetrics { Namespace: "grafana", Subsystem: subsystem, Name: "sql_command_count", - Help: "Total number of SQL command executions with a status label", - }, []string{"status"}), + Help: "Total number of SQL command executions with a status label and error_type for more detailed categorization of errors. When there is no error, error_type is 'none'. The two types of error_types that are unhandled are 'general_gms_error', and and 'unknown'", + }, []string{"status", "error_type"}), SqlCommandCellCount: prometheus.NewHistogramVec( prometheus.HistogramOpts{ @@ -59,6 +60,13 @@ func newExprMetrics(subsystem string) *ExprMetrics { }, []string{"status"}, ), + + SqlCommandInputCount: prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: "grafana", + Subsystem: subsystem, + Name: "sql_command_input_count", + Help: "Total number of inputs to the SQL command. Errors here are also counted in the sql_command_count metric but without the datasource_type and input_frame_type. 
The attempted_conversion label indicates if the input was converted from another format (e.g. from labeled time series) or passed through as a table. Since a single SQL expression can have multiple inputs, this can count higher than sql_command_count.", + }, []string{"status", "attempted_conversion", "datasource_type", "input_frame_type"}), } } @@ -76,6 +84,8 @@ func NewSSEMetrics(reg prometheus.Registerer) *ExprMetrics { SqlCommandCount: newExprMetrics(metricsSubSystem).SqlCommandCount, SqlCommandCellCount: newExprMetrics(metricsSubSystem).SqlCommandCellCount, + + SqlCommandInputCount: newExprMetrics(metricsSubSystem).SqlCommandInputCount, } if reg != nil { @@ -85,6 +95,7 @@ func NewSSEMetrics(reg prometheus.Registerer) *ExprMetrics { m.SqlCommandDuration, m.SqlCommandCount, m.SqlCommandCellCount, + m.SqlCommandInputCount, ) } @@ -105,6 +116,8 @@ func NewQueryServiceExpressionsMetrics(reg prometheus.Registerer) *ExprMetrics { SqlCommandCount: newExprMetrics(metricsSubSystem).SqlCommandCount, SqlCommandCellCount: newExprMetrics(metricsSubSystem).SqlCommandCellCount, + + SqlCommandInputCount: newExprMetrics(metricsSubSystem).SqlCommandInputCount, } if reg != nil { @@ -114,6 +127,7 @@ func NewQueryServiceExpressionsMetrics(reg prometheus.Registerer) *ExprMetrics { m.SqlCommandDuration, m.SqlCommandCount, m.SqlCommandCellCount, + m.SqlCommandInputCount, ) } diff --git a/pkg/expr/ml.go b/pkg/expr/ml.go index bee5d18ad22..54fdbc798a9 100644 --- a/pkg/expr/ml.go +++ b/pkg/expr/ml.go @@ -130,7 +130,7 @@ func (m *MLNode) Execute(ctx context.Context, now time.Time, _ mathexp.Vars, s * } // process the response the same way DSNode does. Use plugin ID as data source type. Semantically, they are the same. 
- responseType, result, err = s.converter.Convert(ctx, mlPluginID, dataFrames, false) + responseType, result, err = s.converter.Convert(ctx, mlPluginID, dataFrames) return result, err } diff --git a/pkg/expr/nodes.go b/pkg/expr/nodes.go index 8d2c257f6ad..2ea0e36c40e 100644 --- a/pkg/expr/nodes.go +++ b/pkg/expr/nodes.go @@ -32,8 +32,9 @@ var ( // baseNode includes common properties used across DPNodes. type baseNode struct { - id int64 - refID string + id int64 + refID string + isInputTo map[string]struct{} } type rawNode struct { @@ -91,6 +92,17 @@ func (b *baseNode) RefID() string { return b.refID } +func (b *baseNode) SetInputTo(refID string) { + if b.isInputTo == nil { + b.isInputTo = make(map[string]struct{}) + } + b.isInputTo[refID] = struct{}{} +} + +func (b *baseNode) IsInputTo() map[string]struct{} { + return b.isInputTo +} + // NodeType returns the data pipeline node type. func (gn *CMDNode) NodeType() NodeType { return TypeCMDNode @@ -367,7 +379,7 @@ func executeDSNodesGrouped(ctx context.Context, now time.Time, vars mathexp.Vars } var result mathexp.Results - responseType, result, err := s.converter.Convert(ctx, dn.datasource.Type, dataFrames, dn.isInputToSQLExpr) + responseType, result, err := s.converter.Convert(ctx, dn.datasource.Type, dataFrames) if err != nil { result.Error = makeConversionError(dn.RefID(), err) } @@ -457,10 +469,22 @@ func (dn *DSNode) Execute(ctx context.Context, now time.Time, _ mathexp.Vars, s var result mathexp.Results - responseType, result, err = s.converter.Convert(ctx, dn.datasource.Type, dataFrames, dn.isInputToSQLExpr) + if dn.isInputToSQLExpr { + var converted bool + dataType := categorizeFrameInputType(dataFrames) - if err != nil { - err = makeConversionError(dn.refID, err) + result, converted = handleSqlInput(ctx, s.tracer, dn.RefID(), dn.IsInputTo(), dn.datasource.Type, dataFrames) + status := "ok" + if result.Error != nil { + status = "error" + } + s.metrics.SqlCommandInputCount.WithLabelValues(status, 
fmt.Sprintf("%t", converted), dn.datasource.Type, dataType).Inc() + } else { + responseType, result, err = s.converter.Convert(ctx, dn.datasource.Type, dataFrames) + if err != nil { + err = makeConversionError(dn.refID, err) + } } + return result, err } diff --git a/pkg/expr/reader.go b/pkg/expr/reader.go index 5c85b9d61fb..ddabbf3cb04 100644 --- a/pkg/expr/reader.go +++ b/pkg/expr/reader.go @@ -5,6 +5,7 @@ import ( "fmt" "strings" + "github.com/grafana/grafana-plugin-sdk-go/backend" "github.com/grafana/grafana-plugin-sdk-go/backend/gtime" "github.com/grafana/grafana-plugin-sdk-go/data/utils/jsoniter" data "github.com/grafana/grafana-plugin-sdk-go/experimental/apis/data/v0alpha1" @@ -137,7 +138,8 @@ func (h *ExpressionQueryReader) ReadQuery( eq.Properties = q // TODO: Cascade limit from Grafana config in this (new Expression Parser) branch of the code cellLimit := 0 // zero means no limit - eq.Command, err = NewSQLCommand(ctx, common.RefID, q.Format, q.Expression, int64(cellLimit), 0, 0) + sqlLogger := backend.NewLoggerWith("logger", SQLLoggerName).FromContext(ctx) + eq.Command, err = NewSQLCommand(ctx, sqlLogger, common.RefID, q.Format, q.Expression, int64(cellLimit), 0, 0) } case QueryTypeThreshold: diff --git a/pkg/expr/service_sql_test.go b/pkg/expr/service_sql_test.go index 714c1b98d7b..d65702a9d45 100644 --- a/pkg/expr/service_sql_test.go +++ b/pkg/expr/service_sql_test.go @@ -90,7 +90,7 @@ func TestSQLService(t *testing.T) { require.NoError(t, err) require.Error(t, rsp.Responses["B"].Error, "should return invalid sql error") - require.ErrorContains(t, rsp.Responses["B"].Error, "blocked function load_file") + require.ErrorContains(t, rsp.Responses["B"].Error, "not in the allowed list of") }) t.Run("parse error should be returned", func(t *testing.T) { @@ -110,3 +110,93 @@ func TestSQLService(t *testing.T) { require.ErrorContains(t, rsp.Responses["B"].Error, "limit expression expected to be numeric") }) } + +func TestSQLServiceErrors(t *testing.T) { + tsMulti 
:= data.NewFrame("", + data.NewField("time", nil, []time.Time{time.Unix(1, 0)}), + data.NewField("value", data.Labels{"testLabelKey": "testLabelValue"}, []*float64{fp(2)}), + ).SetMeta(&data.FrameMeta{Type: data.FrameTypeTimeSeriesMulti}) + + tsMultiNoType := data.NewFrame("", + data.NewField("time", nil, []time.Time{time.Unix(1, 0)}), + data.NewField("value", data.Labels{"testLabelKey": "testLabelValue"}, []*float64{fp(2)}), + ) + + resp := map[string]backend.DataResponse{ + "tsMulti": {Frames: data.Frames{tsMulti}}, + "tsMultiNoType": {Frames: data.Frames{tsMultiNoType}}, + } + + newABSQLQueries := func(q string) []Query { + escaped, err := json.Marshal(q) + require.NoError(t, err) + return []Query{ + { + RefID: "tsMulti", + DataSource: &datasources.DataSource{ + OrgID: 1, + UID: "test", + Type: "test", + }, + JSON: json.RawMessage(`{ "datasource": { "uid": "1" }, "intervalMs": 1000, "maxDataPoints": 1000 }`), + TimeRange: AbsoluteTimeRange{ + From: time.Time{}, + To: time.Time{}, + }, + }, + { + RefID: "tsMultiNoType", + DataSource: &datasources.DataSource{ + OrgID: 1, + UID: "test", + Type: "test", + }, + JSON: json.RawMessage(`{ "datasource": { "uid": "1" }, "intervalMs": 1000, "maxDataPoints": 1000 }`), + TimeRange: AbsoluteTimeRange{ + From: time.Time{}, + To: time.Time{}, + }, + }, + { + RefID: "sqlExpression", + DataSource: dataSourceModel(), + JSON: json.RawMessage(fmt.Sprintf(`{ "datasource": { "uid": "__expr__", "type": "__expr__"}, "type": "sql", "expression": %s }`, escaped)), + TimeRange: AbsoluteTimeRange{ + From: time.Time{}, + To: time.Time{}, + }, + }, + } + } + + t.Run("conversion failure (and therefore dependency error)", func(t *testing.T) { + s, req := newMockQueryService(resp, + newABSQLQueries(`SELECT * FROM tsMultiNoType`), + ) + + s.features = featuremgmt.WithFeatures(featuremgmt.FlagSqlExpressions) + + pl, err := s.BuildPipeline(t.Context(), req) + require.NoError(t, err) + + rsp, err := s.ExecutePipeline(context.Background(), 
time.Now(), pl) + require.NoError(t, err) + + require.Error(t, rsp.Responses["tsMultiNoType"].Error, "should return conversion error on DS response") + require.ErrorContains(t, rsp.Responses["tsMultiNoType"].Error, "missing the data type") + + require.Error(t, rsp.Responses["sqlExpression"].Error, "should return dependency error") + require.ErrorContains(t, rsp.Responses["sqlExpression"].Error, "dependency") + }) + + t.Run("pipeline (expressions and DS queries) will fail if the table is not found, before execution of the sql expression", func(t *testing.T) { + s, req := newMockQueryService(resp, + newABSQLQueries(`SELECT * FROM nonExisting`), + ) + + s.features = featuremgmt.WithFeatures(featuremgmt.FlagSqlExpressions) + + _, err := s.BuildPipeline(t.Context(), req) + require.Error(t, err, "whole pipeline fails when selecting a dependency that does not exist") + }) +} diff --git a/pkg/expr/sql/db.go b/pkg/expr/sql/db.go index d8a8934f81e..8d98e3dea51 100644 --- a/pkg/expr/sql/db.go +++ b/pkg/expr/sql/db.go @@ -19,42 +19,6 @@ import ( // DB is a database that can execute SQL queries against a set of Frames. 
type DB struct{} -// GoMySQLServerError represents an error from the underlying Go MySQL Server -type GoMySQLServerError struct { - Err error -} - -// Error implements the error interface -func (e *GoMySQLServerError) Error() string { - return fmt.Sprintf("error in go-mysql-server: %v", e.Err) -} - -// Unwrap provides the original error for errors.Is/As -func (e *GoMySQLServerError) Unwrap() error { - return e.Err -} - -// WrapGoMySQLServerError wraps errors from Go MySQL Server with additional context -func WrapGoMySQLServerError(err error) error { - // Don't wrap nil errors - if err == nil { - return nil - } - - // Check if it's a function not found error or other specific GMS errors - if isFunctionNotFoundError(err) { - return &GoMySQLServerError{Err: err} - } - - // Return original error if it's not one we want to wrap - return err -} - -// isFunctionNotFoundError checks if the error is related to a function not being found -func isFunctionNotFoundError(err error) bool { - return mysql.ErrFunctionNotFound.Is(err) -} - type QueryOption func(*QueryOptions) type QueryOptions struct { @@ -80,7 +44,7 @@ func WithMaxOutputCells(n int64) QueryOption { // The name becomes the name and RefID of the returned frame. func (db *DB) QueryFrames(ctx context.Context, tracer tracing.Tracer, name string, query string, frames []*data.Frame, opts ...QueryOption) (*data.Frame, error) { // We are parsing twice due to TablesList, but don't care fow now. We can save the parsed query and reuse it later if we want. 
- if allow, err := AllowQuery(query); err != nil || !allow { + if allow, err := AllowQuery(name, query); err != nil || !allow { if err != nil { return nil, err } @@ -122,9 +86,9 @@ func (db *DB) QueryFrames(ctx context.Context, tracer tracing.Tracer, name strin contextErr := func(err error) error { switch { case errors.Is(err, context.DeadlineExceeded): - return fmt.Errorf("SQL expression for refId %v did not complete within the timeout of %v: %w", name, QueryOptions.Timeout, err) + return MakeTimeOutError(err, name, QueryOptions.Timeout) case errors.Is(err, context.Canceled): - return fmt.Errorf("SQL expression for refId %v was cancelled before it completed: %w", name, err) + return MakeCancelError(err, name) default: return fmt.Errorf("SQL expression for refId %v ended unexpectedly: %w", name, err) } @@ -136,7 +100,7 @@ func (db *DB) QueryFrames(ctx context.Context, tracer tracing.Tracer, name strin if ctx.Err() != nil { return nil, contextErr(ctx.Err()) } - return nil, WrapGoMySQLServerError(err) + return nil, MakeGMSError(name, err) } // Convert the iterator into a Grafana data.Frame diff --git a/pkg/expr/sql/db_test.go b/pkg/expr/sql/db_test.go index 5eb9cb069c8..ac4448ed1b5 100644 --- a/pkg/expr/sql/db_test.go +++ b/pkg/expr/sql/db_test.go @@ -206,7 +206,7 @@ func TestErrorsFromGoMySQLServerAreFlagged(t *testing.T) { _, err := db.QueryFrames(context.Background(), &testTracer{}, "sqlExpressionRefId", query, nil) require.Error(t, err) - require.Contains(t, err.Error(), "error in go-mysql-server") + require.Contains(t, err.Error(), "error from the sql expression engine") } func TestFrameToSQLAndBack_JSONRoundtrip(t *testing.T) { @@ -308,7 +308,7 @@ func TestQueryFrames_Limits(t *testing.T) { CROSS JOIN (SELECT 1 AS val UNION ALL SELECT 2 UNION ALL SELECT 3 UNION ALL SELECT 4 UNION ALL SELECT 5) b `, opts: []QueryOption{WithTimeout(1 * time.Nanosecond)}, - expectError: "did not complete within the timeout", + expectError: "timed out", }, } diff --git 
a/pkg/expr/sql/errors.go b/pkg/expr/sql/errors.go new file mode 100644 index 00000000000..24953ff8d30 --- /dev/null +++ b/pkg/expr/sql/errors.go @@ -0,0 +1,391 @@ +package sql + +import ( + "errors" + "fmt" + "sort" + "strings" + "time" + + mysql "github.com/dolthub/go-mysql-server/sql" + "github.com/grafana/grafana/pkg/apimachinery/errutil" +) + +const sseErrBase = "sse.sql." + +// GoMySQLServerError represents an error from the underlying Go MySQL Server +type GoMySQLServerError struct { + err error + category string +} + +// CategorizedError is an Error with a Category string for use with metrics, logs, and traces. +type CategorizedError interface { + error + Category() string +} + +// ErrorWithCategory is a concrete implementation of CategorizedError that holds an error and its category. +type ErrorWithCategory struct { + category string + err error +} + +func (e *ErrorWithCategory) Error() string { + return e.err.Error() +} + +func (e *ErrorWithCategory) Category() string { + return e.category +} + +// Unwrap provides the original error for errors.Is/As +func (e *ErrorWithCategory) Unwrap() error { + return e.err +} + +// Error implements the error interface +func (e *GoMySQLServerError) Error() string { + return e.err.Error() +} + +// Unwrap provides the original error for errors.Is/As +func (e *GoMySQLServerError) Unwrap() error { + return e.err +} + +func (e *GoMySQLServerError) Category() string { + return e.category +} + +// MakeGMSError creates a GoMySQLServerError with the given refID and error. +// It also used to wrap GMS errors into a GeneralGMSError or specific CategorizedError. 
+func MakeGMSError(refID string, err error) error { + err = WrapGoMySQLServerError(refID, err) + + gmsError := &GoMySQLServerError{} + if errors.As(err, &gmsError) { + return MakeGeneralGMSError(gmsError, refID) + } + + return err +} + +const ErrCategoryGMSFunctionNotFound = "gms_function_not_found" +const ErrCategoryGMSTableNotFound = "gms_table_not_found" + +// WrapGoMySQLServerError wraps errors from Go MySQL Server with additional context +// and a category. +func WrapGoMySQLServerError(refID string, err error) error { + // Don't wrap nil errors + if err == nil { + return nil + } + + switch { + case mysql.ErrFunctionNotFound.Is(err): + return &GoMySQLServerError{err: err, category: ErrCategoryGMSFunctionNotFound} + case mysql.ErrTableNotFound.Is(err): + // This is different from the TableNotFoundError, which is used when the engine can't find the dependency before it gets to the SQL engine. + return &GoMySQLServerError{err: err, category: ErrCategoryGMSTableNotFound} + case mysql.ErrColumnNotFound.Is(err): + return MakeColumnNotFoundError(refID, err) + default: + // For all other errors, wrap them as a general GMS error + return MakeGeneralGMSError(&GoMySQLServerError{ + err: err, + category: ErrCategoryGeneralGMSError, + }, refID) + } +} + +const ErrCategoryGeneralGMSError = "general_gms_error" + +var generalGMSErrorStr = "sql expression failed due to error from the sql expression engine: {{ .Error }}" + +var GeneralGMSError = errutil.NewBase( + errutil.StatusBadRequest, sseErrBase+ErrCategoryGeneralGMSError).MustTemplate( + generalGMSErrorStr, + errutil.WithPublic(generalGMSErrorStr)) + +// MakeGeneralGMSError is for errors returned from the GMS engine that we have not make a more specific error for. 
+func MakeGeneralGMSError(err *GoMySQLServerError, refID string) CategorizedError { + data := errutil.TemplateData{ + Public: map[string]interface{}{ + "refId": refID, + }, + Error: err, + } + + return &ErrorWithCategory{category: err.Category(), err: GeneralGMSError.Build(data)} +} + +const ErrCategoryInputLimitExceeded = "input_limit_exceeded" + +var inputLimitExceededStr = "sql expression [{{ .Public.refId }}] was not run because the number of input cells (columns*rows) to the sql expression exceeded the configured limit of {{ .Public.inputLimit }}" + +var InputLimitExceededError = errutil.NewBase( + errutil.StatusBadRequest, sseErrBase+ErrCategoryInputLimitExceeded).MustTemplate( + inputLimitExceededStr, + errutil.WithPublic(inputLimitExceededStr)) + +func MakeInputLimitExceededError(refID string, inputLimit int64) CategorizedError { + data := errutil.TemplateData{ + Public: map[string]interface{}{ + "refId": refID, + "inputLimit": inputLimit, + }, + } + + return &ErrorWithCategory{category: ErrCategoryInputLimitExceeded, err: InputLimitExceededError.Build(data)} +} + +const ErrCategoryDuplicateStringColumns = "duplicate_string_columns" + +var duplicateStringColumnErrorStr = "sql expression [{{ .Public.refId }}] failed because it returned duplicate values across the string columns, which is not allowed for alerting. Examples: ({{ .Public.examples }}). Hint: use GROUP BY or aggregation (e.g. MAX(), AVG()) to return one row per unique combination." 
+ +var DuplicateStringColumnError = errutil.NewBase( + errutil.StatusBadRequest, sseErrBase+ErrCategoryDuplicateStringColumns).MustTemplate( + duplicateStringColumnErrorStr, + errutil.WithPublic(duplicateStringColumnErrorStr), +) + +func MakeDuplicateStringColumnError(examples []string) CategorizedError { + const limit = 5 + sort.Strings(examples) + exampleStr := strings.Join(truncateExamples(examples, limit), ", ") + + data := errutil.TemplateData{ + Public: map[string]interface{}{ + "examples": exampleStr, + "count": len(examples), + }, + } + + return &ErrorWithCategory{ + category: ErrCategoryDuplicateStringColumns, + err: DuplicateStringColumnError.Build(data), + } +} + +func truncateExamples(examples []string, limit int) []string { + if len(examples) <= limit { + return examples + } + truncated := examples[:limit] + truncated = append(truncated, fmt.Sprintf("... and %d more", len(examples)-limit)) + return truncated +} + +const ErrCategoryTimeout = "timeout" + +var timeoutStr = "sql expression [{{ .Public.refId }}] timed out after {{ .Public.timeout }}" + +var TimeoutError = errutil.NewBase( + errutil.StatusTimeout, sseErrBase+ErrCategoryTimeout).MustTemplate( + timeoutStr, + errutil.WithPublic(timeoutStr)) + +// MakeTimeOutError creates an error for when a query times out because it took longer that the configured timeout. 
+func MakeTimeOutError(err error, refID string, timeout time.Duration) CategorizedError {
+	data := errutil.TemplateData{
+		Public: map[string]interface{}{
+			"refId":   refID,
+			"timeout": timeout.String(),
+		},
+
+		Error: err,
+	}
+
+	return &ErrorWithCategory{category: ErrCategoryTimeout, err: TimeoutError.Build(data)}
+}
+
+const ErrCategoryCancelled = "cancelled"
+
+var cancelStr = "sql expression [{{ .Public.refId }}] was cancelled before completion"
+
+var CancelError = errutil.NewBase(
+	errutil.StatusClientClosedRequest, sseErrBase+ErrCategoryCancelled).MustTemplate(
+	cancelStr,
+	errutil.WithPublic(cancelStr))
+
+// MakeCancelError creates an error for when a query is cancelled before completion.
+// Users won't see this error in the browser, rather an empty response when the browser cancels the connection.
+func MakeCancelError(err error, refID string) CategorizedError {
+	data := errutil.TemplateData{
+		Public: map[string]interface{}{
+			"refId": refID,
+		},
+
+		Error: err,
+	}
+
+	return &ErrorWithCategory{category: ErrCategoryCancelled, err: CancelError.Build(data)}
+}
+
+const ErrCategoryTableNotFound = "table_not_found"
+
+var tableNotFoundStr = "failed to run sql expression [{{ .Public.refId }}] because it selects from table (refId/query) [{{ .Public.table }}] and that table was not found"
+
+var TableNotFoundError = errutil.NewBase(
+	errutil.StatusBadRequest, sseErrBase+ErrCategoryTableNotFound).MustTemplate(
+	tableNotFoundStr,
+	errutil.WithPublic(tableNotFoundStr))
+
+// MakeTableNotFoundError creates an error for when a referenced table
+// does not exist.
+func MakeTableNotFoundError(refID, table string) CategorizedError {
+	data := errutil.TemplateData{
+		Public: map[string]interface{}{
+			"refId": refID,
+			"table": table,
+		},
+
+		Error: fmt.Errorf("sql expression [%s] failed: table (refId) '%s' not found", refID, table),
+	}
+
+	return &ErrorWithCategory{category: ErrCategoryTableNotFound, err: TableNotFoundError.Build(data)}
+}
+
+const ErrCategoryDependency = "failed_dependency"
+
+var sqlDepErrStr = "could not run sql expression [{{ .Public.refId }}] because it selects from the results of query [{{.Public.depRefId }}] which has an error"
+
+var DependencyError = errutil.NewBase(
+	errutil.StatusBadRequest, sseErrBase+ErrCategoryDependency).MustTemplate(
+	sqlDepErrStr,
+	errutil.WithPublic(sqlDepErrStr))
+
+func MakeSQLDependencyError(refID, depRefID string) CategorizedError {
+	data := errutil.TemplateData{
+		Public: map[string]interface{}{
+			"refId":    refID,
+			"depRefId": depRefID,
+		},
+		Error: fmt.Errorf("could not run sql expression %v because it selects from the results of query %v which has an error", refID, depRefID),
+	}
+
+	return &ErrorWithCategory{category: ErrCategoryDependency, err: DependencyError.Build(data)}
+}
+
+const ErrCategoryInputConversion = "input_conversion"
+
+var sqlInputConvertErrorStr = "failed to convert the results of query [{{.Public.refId}}] (Datasource Type: [{{.Public.dsType}}]) into a SQL/Tabular format for sql expression {{ .Public.forRefID }}: {{ .Error }}"
+
+var InputConvertError = errutil.NewBase(
+	errutil.StatusBadRequest, sseErrBase+ErrCategoryInputConversion).MustTemplate(
+	sqlInputConvertErrorStr,
+	errutil.WithPublic(sqlInputConvertErrorStr))
+
+// MakeInputConvertError creates an error for when the input conversion to a table for a SQL expression fails.
+func MakeInputConvertError(err error, refID string, forRefIDs map[string]struct{}, dsType string) CategorizedError {
+	forRefIdsSlice := make([]string, 0, len(forRefIDs))
+	for k := range forRefIDs {
+		forRefIdsSlice = append(forRefIdsSlice, k)
+	}
+	data := errutil.TemplateData{
+		Public: map[string]interface{}{
+			"refId":    refID,
+			"forRefID": forRefIdsSlice,
+			"dsType":   dsType,
+		},
+		Error: err,
+	}
+
+	return &ErrorWithCategory{category: ErrCategoryInputConversion, err: InputConvertError.Build(data)}
+}
+
+const ErrCategoryEmptyQuery = "empty_query"
+
+var errEmptyQueryString = "sql expression [{{.Public.refId}}] failed because it has an empty SQL query"
+
+var ErrEmptySQLQuery = errutil.NewBase(
+	errutil.StatusBadRequest, sseErrBase+ErrCategoryEmptyQuery).MustTemplate(
+	errEmptyQueryString,
+	errutil.WithPublic(errEmptyQueryString))
+
+// MakeErrEmptyQuery creates an error for when a SQL expression
+// is submitted with an empty query string.
+func MakeErrEmptyQuery(refID string) CategorizedError {
+	data := errutil.TemplateData{
+		Public: map[string]interface{}{
+			"refId": refID,
+		},
+
+		Error: fmt.Errorf("sql expression [%s] failed because it has an empty SQL query", refID),
+	}
+
+	return &ErrorWithCategory{category: ErrCategoryEmptyQuery, err: ErrEmptySQLQuery.Build(data)}
+}
+
+const ErrCategoryInvalidQuery = "invalid_query"
+
+var invalidQueryStr = "sql expression [{{.Public.refId}}] failed because it has an invalid SQL query: {{ .Public.error }}"
+
+var ErrInvalidQuery = errutil.NewBase(
+	errutil.StatusBadRequest, sseErrBase+ErrCategoryInvalidQuery).MustTemplate(
+	invalidQueryStr,
+	errutil.WithPublic(invalidQueryStr))
+
+func MakeErrInvalidQuery(refID string, err error) CategorizedError {
+	data := errutil.TemplateData{
+		Public: map[string]interface{}{
+			"refId": refID,
+			"error": err.Error(),
+		},
+
+		Error: fmt.Errorf("sql expression [%s] failed because it has an invalid SQL query: %w", refID, err),
+	}
+
+	return &ErrorWithCategory{category: 
ErrCategoryInvalidQuery, err: ErrInvalidQuery.Build(data)} +} + +var ErrCategoryBlockedNodeOrFunc = "blocked_node_or_func" + +var blockedNodeOrFuncStr = "did not execute the SQL expression {{.Public.refId}} because the sql {{.Public.tokenType}} '{{.Public.token}}' is not in the allowed list of {{.Public.tokenType}}s" + +var BlockedNodeOrFuncError = errutil.NewBase( + errutil.StatusBadRequest, sseErrBase+ErrCategoryBlockedNodeOrFunc).MustTemplate( + blockedNodeOrFuncStr, + errutil.WithPublic(blockedNodeOrFuncStr)) + +// MakeBlockedNodeOrFuncError creates an error for when a sql function or keyword is not allowed. +func MakeBlockedNodeOrFuncError(refID, token string, isFunction bool) CategorizedError { + tokenType := "keyword" + if isFunction { + tokenType = "function" + } + data := errutil.TemplateData{ + Public: map[string]interface{}{ + "refId": refID, + "token": token, + "tokenType": tokenType, + }, + + Error: fmt.Errorf("sql expression [%s] failed because the sql function or keyword '%s' is not in the allowed list of keywords and functions", refID, token), + } + + return &ErrorWithCategory{category: ErrCategoryBlockedNodeOrFunc, err: BlockedNodeOrFuncError.Build(data)} +} + +const ErrCategoryColumnNotFound = "column_not_found" + +var columnNotFoundStr = `sql expression [{{.Public.refId}}] failed because it selects from a column (refId/query) that does not exist: {{ .Error }}. 
+If this happens on a previously working query, it might mean that the query has returned no data, or the resulting schema of the query has changed.` + +var ColumnNotFoundError = errutil.NewBase( + errutil.StatusBadRequest, sseErrBase+ErrCategoryColumnNotFound).MustTemplate( + columnNotFoundStr, + errutil.WithPublic(columnNotFoundStr)) + +func MakeColumnNotFoundError(refID string, err error) CategorizedError { + data := errutil.TemplateData{ + Public: map[string]interface{}{ + "refId": refID, + }, + + Error: err, + } + + return &ErrorWithCategory{category: ErrCategoryColumnNotFound, err: ColumnNotFoundError.Build(data)} +} diff --git a/pkg/expr/sql/parser_allow.go b/pkg/expr/sql/parser_allow.go index ad04529ba86..6f8cfc1d063 100644 --- a/pkg/expr/sql/parser_allow.go +++ b/pkg/expr/sql/parser_allow.go @@ -1,6 +1,7 @@ package sql import ( + "errors" "fmt" "strings" @@ -9,7 +10,7 @@ import ( // AllowQuery parses the query and checks it against an allow list of allowed SQL nodes // and functions. 
-func AllowQuery(rawSQL string) (bool, error) { +func AllowQuery(refID, rawSQL string) (bool, error) { s, err := sqlparser.Parse(rawSQL) if err != nil { return false, fmt.Errorf("error parsing sql: %s", err.Error()) @@ -19,15 +20,19 @@ func AllowQuery(rawSQL string) (bool, error) { err := sqlparser.Walk(func(node sqlparser.SQLNode) (bool, error) { if !allowedNode(node) { if fT, ok := node.(*sqlparser.FuncExpr); ok { - return false, fmt.Errorf("blocked function %s - not supported in queries", fT.Name) + return false, MakeBlockedNodeOrFuncError(refID, fT.Name.String(), true) } - return false, fmt.Errorf("blocked node %T - not supported in queries", node) + return false, MakeBlockedNodeOrFuncError(refID, fmt.Sprintf("%T", node), false) } return true, nil }, node) if err != nil { - return fmt.Errorf("failed to parse SQL expression: %w", err) + var bn *ErrorWithCategory + if !errors.As(err, &bn) { + return fmt.Errorf("failed to parse SQL expression: %w", err) + } + return err } return nil diff --git a/pkg/expr/sql/parser_allow_test.go b/pkg/expr/sql/parser_allow_test.go index f03b3ff2cd1..9a36b3ad8e8 100644 --- a/pkg/expr/sql/parser_allow_test.go +++ b/pkg/expr/sql/parser_allow_test.go @@ -95,7 +95,7 @@ func TestAllowQuery(t *testing.T) { } for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { - _, err := AllowQuery(tc.q) + _, err := AllowQuery("A", tc.q) if tc.err != nil { require.Error(t, err) } else { diff --git a/pkg/expr/sql_command.go b/pkg/expr/sql_command.go index 016b1c87f5e..8a93400a9f8 100644 --- a/pkg/expr/sql_command.go +++ b/pkg/expr/sql_command.go @@ -8,9 +8,12 @@ import ( "github.com/grafana/grafana-plugin-sdk-go/backend" "github.com/grafana/grafana-plugin-sdk-go/data" + "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/codes" + semconv "go.opentelemetry.io/otel/semconv/v1.17.0" + "go.opentelemetry.io/otel/trace" - "github.com/grafana/grafana/pkg/apimachinery/errutil" + "github.com/grafana/grafana-plugin-sdk-go/backend/log" 
"github.com/grafana/grafana/pkg/expr/mathexp" "github.com/grafana/grafana/pkg/expr/metrics" "github.com/grafana/grafana/pkg/expr/sql" @@ -18,15 +21,7 @@ import ( "github.com/grafana/grafana/pkg/setting" ) -var ( - ErrMissingSQLQuery = errutil.BadRequest("sql-missing-query").Errorf("missing SQL query") - ErrInvalidSQLQuery = errutil.BadRequest("sql-invalid-sql").MustTemplate( - "invalid SQL query: {{ .Private.query }} err: {{ .Error }}", - errutil.WithPublic( - "Invalid SQL query: {{ .Public.error }}", - ), - ) -) +const SQLLoggerName = "expr.sql" // SQLCommand is an expression to run SQL over results type SQLCommand struct { @@ -39,31 +34,25 @@ type SQLCommand struct { inputLimit int64 outputLimit int64 timeout time.Duration + logger log.Logger } // NewSQLCommand creates a new SQLCommand. -func NewSQLCommand(ctx context.Context, refID, format, rawSQL string, intputLimit, outputLimit int64, timeout time.Duration) (*SQLCommand, error) { +func NewSQLCommand(ctx context.Context, logger log.Logger, refID, format, rawSQL string, intputLimit, outputLimit int64, timeout time.Duration) (*SQLCommand, error) { + sqlLogger := backend.NewLoggerWith("logger", SQLLoggerName).FromContext(ctx) if rawSQL == "" { - return nil, ErrMissingSQLQuery + return nil, sql.MakeErrEmptyQuery(refID) } tables, err := sql.TablesList(ctx, rawSQL) if err != nil { - logger.Warn("invalid sql query", "sql", rawSQL, "error", err) - return nil, ErrInvalidSQLQuery.Build(errutil.TemplateData{ - Error: err, - Public: map[string]any{ - "error": err.Error(), - }, - Private: map[string]any{ - "query": rawSQL, - }, - }) + sqlLogger.Warn("invalid sql query", "sql", rawSQL, "error", err) + return nil, sql.MakeErrInvalidQuery(refID, err) } if len(tables) == 0 { - logger.Warn("no tables found in SQL query", "sql", rawSQL) + sqlLogger.Warn("no tables found in SQL query", "sql", rawSQL) } if tables != nil { - logger.Debug("REF tables", "tables", tables, "sql", rawSQL) + sqlLogger.Debug("REF tables", "tables", tables, 
"sql", rawSQL) } return &SQLCommand{ @@ -74,14 +63,15 @@ func NewSQLCommand(ctx context.Context, refID, format, rawSQL string, intputLimi outputLimit: outputLimit, timeout: timeout, format: format, + logger: sqlLogger, }, nil } // UnmarshalSQLCommand creates a SQLCommand from Grafana's frontend query. func UnmarshalSQLCommand(ctx context.Context, rn *rawNode, cfg *setting.Cfg) (*SQLCommand, error) { - sqlLogger := backend.NewLoggerWith("logger", "expr.sql").FromContext(ctx) + sqlLogger := backend.NewLoggerWith("logger", SQLLoggerName).FromContext(ctx) if rn.TimeRange == nil { - logger.Error("time range must be specified for refID", "refID", rn.RefID) + sqlLogger.Error("time range must be specified for refID", "refID", rn.RefID) return nil, fmt.Errorf("time range must be specified for refID %s", rn.RefID) } @@ -99,7 +89,7 @@ func UnmarshalSQLCommand(ctx context.Context, rn *rawNode, cfg *setting.Cfg) (*S formatRaw := rn.Query["format"] format, _ := formatRaw.(string) - return NewSQLCommand(ctx, rn.RefID, format, expression, cfg.SQLExpressionCellLimit, cfg.SQLExpressionOutputCellLimit, cfg.SQLExpressionTimeout) + return NewSQLCommand(ctx, sqlLogger, rn.RefID, format, expression, cfg.SQLExpressionCellLimit, cfg.SQLExpressionOutputCellLimit, cfg.SQLExpressionTimeout) } // NeedsVars returns the variable names (refIds) that are dependencies @@ -113,23 +103,32 @@ func (gr *SQLCommand) NeedsVars() []string { func (gr *SQLCommand) Execute(ctx context.Context, now time.Time, vars mathexp.Vars, tracer tracing.Tracer, metrics *metrics.ExprMetrics) (mathexp.Results, error) { _, span := tracer.Start(ctx, "SSE.ExecuteSQL") start := time.Now() - sqlLogger := backend.NewLoggerWith("logger", "expr.sql").FromContext(ctx) tc := int64(0) rsp := mathexp.Results{} + errorType := "none" defer func() { duration := float64(time.Since(start).Milliseconds()) - statusLabel := "ok" if rsp.Error != nil { + e := &sql.ErrorWithCategory{} + if errors.As(rsp.Error, &e) { + errorType = e.Category() + 
} else { + errorType = "unknown" + } statusLabel = "error" - span.RecordError(rsp.Error) - span.SetStatus(codes.Error, rsp.Error.Error()) - sqlLogger.Error("SQL command execution failed", "error", rsp.Error.Error()) + span.AddEvent("exception", trace.WithAttributes( + semconv.ExceptionType(errorType), + semconv.ExceptionMessage(rsp.Error.Error()), + )) + span.SetAttributes(attribute.String("error.category", errorType)) + span.SetStatus(codes.Error, errorType) + gr.logger.Error("SQL command execution failed", "error", rsp.Error.Error(), "error_type", errorType) } span.End() - metrics.SqlCommandCount.WithLabelValues(statusLabel).Inc() + metrics.SqlCommandCount.WithLabelValues(statusLabel, errorType).Inc() metrics.SqlCommandDuration.WithLabelValues(statusLabel).Observe(duration) metrics.SqlCommandCellCount.WithLabelValues(statusLabel).Observe(float64(tc)) }() @@ -138,7 +137,7 @@ func (gr *SQLCommand) Execute(ctx context.Context, now time.Time, vars mathexp.V for _, ref := range gr.varsToQuery { results, ok := vars[ref] if !ok { - sqlLogger.Warn("no results found for", "ref", ref) + gr.logger.Warn("no results found for", "ref", ref) continue } frames := results.Values.AsDataFrames(ref) @@ -149,15 +148,11 @@ func (gr *SQLCommand) Execute(ctx context.Context, now time.Time, vars mathexp.V // limit of 0 or less means no limit (following convention) if gr.inputLimit > 0 && tc > gr.inputLimit { - rsp.Error = fmt.Errorf( - "SQL expression: total cell count across all input tables exceeds limit of %d. 
Total cells: %d", - gr.inputLimit, - tc, - ) + rsp.Error = sql.MakeInputLimitExceededError(gr.refID, gr.inputLimit) return rsp, nil } - sqlLogger.Debug("Executing query", "query", gr.query, "frames", len(allFrames)) + gr.logger.Debug("Executing query", "query", gr.query, "frames", len(allFrames)) db := sql.DB{} frame, err := db.QueryFrames(ctx, tracer, gr.refID, gr.query, allFrames, sql.WithMaxOutputCells(gr.outputLimit), sql.WithTimeout(gr.timeout)) @@ -166,7 +161,7 @@ func (gr *SQLCommand) Execute(ctx context.Context, now time.Time, vars mathexp.V return rsp, nil } - sqlLogger.Debug("Done Executing query", "query", gr.query, "rows", frame.Rows()) + gr.logger.Debug("Done Executing query", "query", gr.query, "rows", frame.Rows()) if frame.Rows() == 0 { rsp.Values = mathexp.Values{ @@ -293,7 +288,7 @@ func extractNumberSetFromSQLForAlerting(frame *data.Frame) ([]mathexp.Number, er } if len(duplicates) > 0 { - return nil, makeDuplicateStringColumnError(duplicates) + return nil, sql.MakeDuplicateStringColumnError(duplicates) } // Build final result @@ -307,3 +302,132 @@ func extractNumberSetFromSQLForAlerting(frame *data.Frame) ([]mathexp.Number, er return numbers, nil } + +// handleSqlInput normalizes input DataFrames into a single dataframe with no labels so it can represent a table for use with SQL expressions. +// +// It handles three cases: +// 1. If the input declares a supported time series or numeric kind in the wide or multi format (via FrameMeta.Type), it converts to a full-long formatted table using ConvertToFullLong. +// 2. If the input is a single frame (no labels, no declared type), it passes through as-is. +// 3. If the input has multiple frames or label metadata but lacks a supported type, it returns an error. +// +// The returned bool indicates if the input was (attempted to be) converted or passed through as-is. 
+func handleSqlInput(ctx context.Context, tracer trace.Tracer, refID string, forRefIDs map[string]struct{}, dsType string, dataFrames data.Frames) (mathexp.Results, bool) { + _, span := tracer.Start(ctx, "SSE.HandleConvertSQLInput") + start := time.Now() + var result mathexp.Results + errorType := "none" + var metaType data.FrameType + + defer func() { + duration := float64(time.Since(start).Milliseconds()) + statusLabel := "ok" + if result.Error != nil { + statusLabel = "error" + } + dataType := categorizeFrameInputType(dataFrames) + span.SetAttributes( + attribute.String("status", statusLabel), + attribute.Float64("duration", duration), + attribute.String("data.type", dataType), + attribute.String("datasource.type", dsType), + ) + + if result.Error != nil { + e := &sql.ErrorWithCategory{} + if errors.As(result.Error, &e) { + errorType = e.Category() + } else { + errorType = "unknown" + } + span.AddEvent("exception", trace.WithAttributes( + semconv.ExceptionType(errorType), + semconv.ExceptionMessage(result.Error.Error()), + )) + span.SetAttributes(attribute.String("error.category", errorType)) + span.SetStatus(codes.Error, errorType) + } + span.End() + }() + + if len(dataFrames) == 0 { + return mathexp.Results{Values: mathexp.Values{mathexp.NewNoData()}}, false + } + + first := dataFrames[0] + + // Single Frame no data case + // Note: In the case of a support Frame Type, we may want to return the matching schema + // with no rows (e.g. include the `__value__` column). But not sure about this at this time. 
+	if len(dataFrames) == 1 && len(first.Fields) == 0 {
+		result.Values = mathexp.Values{
+			mathexp.TableData{Frame: first},
+		}
+
+		return result, false
+	}
+
+	if first.Meta != nil {
+		metaType = first.Meta.Type
+	}
+
+	if supportedToLongConversion(metaType) {
+		convertedFrames, err := ConvertToFullLong(dataFrames)
+		if err != nil {
+			result.Error = sql.MakeInputConvertError(err, refID, forRefIDs, dsType)
+			return result, true
+		}
+		if len(convertedFrames) == 0 {
+			result.Error = fmt.Errorf("conversion succeeded but returned no frames")
+			return result, true
+		}
+
+		result.Values = mathexp.Values{
+			mathexp.TableData{Frame: convertedFrames[0]},
+		}
+
+		return result, true
+	}
+
+	// If we don't have a supported type for conversion, see if we can pass through as a table (no labels, and only a single frame)
+	var frameTypeIssue string
+	if metaType == "" {
+		frameTypeIssue = "is missing the data type (frame.meta.type)"
+	} else {
+		frameTypeIssue = fmt.Sprintf("has an unsupported data type [%s]", metaType)
+	}
+
+	// If meta.type is not supported, but there are labels or more than 1 frame error
+	if len(dataFrames) > 1 {
+		result.Error = sql.MakeInputConvertError(fmt.Errorf("can not convert because the response %s and has more than one dataframe that can not be automatically mapped to a single table", frameTypeIssue), refID, forRefIDs, dsType)
+		return result, false
+	}
+	for _, frame := range dataFrames {
+		for _, field := range frame.Fields {
+			if len(field.Labels) > 0 {
+				result.Error = sql.MakeInputConvertError(fmt.Errorf("can not convert because the response %s and has labels in the response that can not be mapped to a table", frameTypeIssue), refID, forRefIDs, dsType)
+				return result, false
+			}
+		}
+	}
+
+	// Can pass through as table without conversion
+	result.Values = mathexp.Values{
+		mathexp.TableData{Frame: first},
+	}
+	return result, false
+}
+
+func categorizeFrameInputType(dataFrames data.Frames) string {
+	switch {
+	case len(dataFrames) == 0:
+		return "missing"
+	case 
dataFrames[0].Meta == nil: + return "missing" + case dataFrames[0].Meta.Type == "": + return "missing" + case dataFrames[0].Meta.Type.IsKnownType(): + return string(dataFrames[0].Meta.Type) + default: + return "unknown" + } +} diff --git a/pkg/expr/sql_command_test.go b/pkg/expr/sql_command_test.go index dd9b274a8cd..bc4faa5ec0f 100644 --- a/pkg/expr/sql_command_test.go +++ b/pkg/expr/sql_command_test.go @@ -8,17 +8,19 @@ import ( "testing" "time" + "github.com/grafana/grafana-plugin-sdk-go/backend/log" "github.com/grafana/grafana-plugin-sdk-go/data" "github.com/grafana/grafana/pkg/expr/mathexp" "github.com/grafana/grafana/pkg/expr/metrics" "github.com/prometheus/client_golang/prometheus/testutil" "github.com/stretchr/testify/require" + "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/codes" "go.opentelemetry.io/otel/trace" ) func TestNewCommand(t *testing.T) { - cmd, err := NewSQLCommand(t.Context(), "a", "", "select a from foo, bar", 0, 0, 0) + cmd, err := NewSQLCommand(t.Context(), log.NewNullLogger(), "a", "", "select a from foo, bar", 0, 0, 0) if err != nil && strings.Contains(err.Error(), "feature is not enabled") { return } @@ -91,7 +93,7 @@ func TestSQLCommandCellLimits(t *testing.T) { }, vars: []string{"foo"}, expectError: true, - errorContains: "exceeds limit", + errorContains: "exceeded the configured limit", }, { name: "single (wide) frame exceeds cell limit", @@ -101,7 +103,7 @@ func TestSQLCommandCellLimits(t *testing.T) { }, vars: []string{"foo"}, expectError: true, - errorContains: "exceeds limit", + errorContains: "exceeded the configured limit", }, { name: "multiple frames exceed cell limit", @@ -112,7 +114,7 @@ func TestSQLCommandCellLimits(t *testing.T) { }, vars: []string{"foo", "bar"}, expectError: true, - errorContains: "exceeds limit", + errorContains: "exceeded the configured limit", }, { name: "limit of 0 means no limit: allow large frame", @@ -126,7 +128,7 @@ func TestSQLCommandCellLimits(t *testing.T) { for _, tt := range 
tests { t.Run(tt.name, func(t *testing.T) { - cmd, err := NewSQLCommand(t.Context(), "a", "", "select a from foo, bar", tt.limit, 0, 0) + cmd, err := NewSQLCommand(t.Context(), log.New(), "a", "", "select a from foo, bar", tt.limit, 0, 0) require.NoError(t, err, "Failed to create SQL command") vars := mathexp.Vars{} @@ -154,15 +156,15 @@ func TestSQLCommandMetrics(t *testing.T) { m := metrics.NewTestMetrics() // Create a command - cmd, err := NewSQLCommand(t.Context(), "A", "someformat", "select * from foo", 0, 0, 0) + cmd, err := NewSQLCommand(t.Context(), log.NewNullLogger(), "A", "someformat", "select * from foo", 0, 0, 0) require.NoError(t, err) // Execute successful command _, err = cmd.Execute(context.Background(), time.Now(), mathexp.Vars{}, &testTracer{}, m) require.NoError(t, err) - // Verify error count was not incremented - require.Equal(t, 1, testutil.CollectAndCount(m.SqlCommandCount), "Expected error metric not to be recorded") + // Verify count metric was recorded + require.Equal(t, 1, testutil.CollectAndCount(m.SqlCommandCount), "Expected count metric to be recorded") // Verify duration was recorded require.Equal(t, 1, testutil.CollectAndCount(m.SqlCommandDuration), "Expected duration metric to be recorded") @@ -171,6 +173,90 @@ func TestSQLCommandMetrics(t *testing.T) { require.Equal(t, 1, testutil.CollectAndCount(m.SqlCommandCellCount), "Expected cell count metric to be recorded") } +func TestHandleSqlInput(t *testing.T) { + tests := []struct { + name string + frames data.Frames + expectErr string + expectFrame bool + converted bool + }{ + { + name: "single frame with no fields and no type is passed through", + frames: data.Frames{data.NewFrame("")}, + expectFrame: true, + }, + { + name: "single frame with no fields but type timeseries-multi is passed through", + frames: data.Frames{data.NewFrame("").SetMeta(&data.FrameMeta{Type: data.FrameTypeTimeSeriesMulti})}, + expectFrame: true, + }, + { + name: "single frame, no labels, no type → passes 
through", + frames: data.Frames{ + data.NewFrame("", + data.NewField("time", nil, []time.Time{time.Unix(1, 0)}), + data.NewField("value", nil, []*float64{fp(2)}), + ), + }, + expectFrame: true, + }, + { + name: "single frame with labels, but missing FrameMeta.Type → error", + frames: data.Frames{ + data.NewFrame("", + data.NewField("time", nil, []time.Time{time.Unix(1, 0)}), + data.NewField("value", data.Labels{"foo": "bar"}, []*float64{fp(2)}), + ), + }, + expectErr: "labels in the response that can not be mapped to a table", + }, + { + name: "multiple frames, no type → error", + frames: data.Frames{ + data.NewFrame("", + data.NewField("time", nil, []time.Time{time.Unix(1, 0)}), + data.NewField("value", nil, []*float64{fp(2)}), + ), + data.NewFrame("", + data.NewField("time", nil, []time.Time{time.Unix(1, 0)}), + data.NewField("value", nil, []*float64{fp(2)}), + ), + }, + expectErr: "more than one dataframe that can not be automatically mapped to a single table", + }, + { + name: "supported type (timeseries-multi) triggers ConvertToFullLong", + frames: data.Frames{ + data.NewFrame("", + data.NewField("time", nil, []time.Time{time.Unix(1, 0)}), + data.NewField("value", data.Labels{"host": "a"}, []*float64{fp(2)}), + ).SetMeta(&data.FrameMeta{Type: data.FrameTypeTimeSeriesMulti}), + }, + expectFrame: true, + converted: true, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + res, c := handleSqlInput(t.Context(), &testTracer{}, "a", map[string]struct{}{"b": {}}, "fakeDS", tc.frames) + require.Equal(t, tc.converted, c, "conversion bool mismatch") + if tc.expectErr != "" { + require.Error(t, res.Error) + require.ErrorContains(t, res.Error, tc.expectErr) + } else { + require.NoError(t, res.Error) + if tc.expectFrame { + require.Len(t, res.Values, 1) + require.IsType(t, mathexp.TableData{}, res.Values[0]) + require.NotNil(t, res.Values[0].(mathexp.TableData).Frame) + } + } + }) + } +} + type testTracer struct { trace.Tracer } @@ -193,3 +279,7 
@@ func (ts *testSpan) RecordError(err error, opt ...trace.EventOption) { } func (ts *testSpan) SetStatus(code codes.Code, msg string) {} + +func (ts *testSpan) AddEvent(name string, opts ...trace.EventOption) {} + +func (ts *testSpan) SetAttributes(kv ...attribute.KeyValue) {} diff --git a/pkg/services/ngalert/eval/eval_test.go b/pkg/services/ngalert/eval/eval_test.go index abc66238702..b66ee008739 100644 --- a/pkg/services/ngalert/eval/eval_test.go +++ b/pkg/services/ngalert/eval/eval_test.go @@ -1596,3 +1596,10 @@ func (f fakeNode) String() string { func (f fakeNode) NeedsVars() []string { return nil } + +func (f fakeNode) IsInputTo() map[string]struct{} { + return nil +} + +func (f fakeNode) SetInputTo(a string) { +}