From acd843303ebb27c9423ea7bb4a77fa45258ad1d8 Mon Sep 17 00:00:00 2001 From: Sarah Zinger Date: Thu, 10 Apr 2025 14:51:44 -0400 Subject: [PATCH] SQL Expression: Add instrumentation for sql expressions (#103758) --- pkg/expr/classic/classic.go | 3 +- pkg/expr/classic/classic_test.go | 2 +- pkg/expr/commands.go | 10 ++- pkg/expr/commands_test.go | 12 +-- pkg/expr/converter_test.go | 3 +- pkg/expr/dataplane_test.go | 3 +- pkg/expr/hysteresis.go | 13 +-- pkg/expr/hysteresis_test.go | 2 +- pkg/expr/metrics.go | 47 ----------- pkg/expr/metrics/metrics.go | 125 ++++++++++++++++++++++++++++ pkg/expr/ml.go | 2 +- pkg/expr/ml_test.go | 5 +- pkg/expr/nodes.go | 6 +- pkg/expr/service.go | 5 +- pkg/expr/service_test.go | 3 +- pkg/expr/sql_command.go | 25 ++++-- pkg/expr/sql_command_test.go | 26 +++++- pkg/expr/threshold.go | 3 +- pkg/expr/threshold_bench_test.go | 10 +-- pkg/expr/threshold_test.go | 2 +- pkg/expr/transform.go | 2 +- pkg/registry/apis/query/metrics.go | 48 ----------- pkg/registry/apis/query/query.go | 4 +- pkg/registry/apis/query/register.go | 5 +- 24 files changed, 223 insertions(+), 143 deletions(-) delete mode 100644 pkg/expr/metrics.go create mode 100644 pkg/expr/metrics/metrics.go delete mode 100644 pkg/registry/apis/query/metrics.go diff --git a/pkg/expr/classic/classic.go b/pkg/expr/classic/classic.go index 0f42ac45d6f..63cd0cc6ef9 100644 --- a/pkg/expr/classic/classic.go +++ b/pkg/expr/classic/classic.go @@ -10,6 +10,7 @@ import ( "github.com/grafana/grafana-plugin-sdk-go/data" "github.com/grafana/grafana/pkg/expr/mathexp" + "github.com/grafana/grafana/pkg/expr/metrics" "github.com/grafana/grafana/pkg/infra/tracing" ) @@ -69,7 +70,7 @@ func (cmd *ConditionsCmd) NeedsVars() []string { // Execute runs the command and returns the results or an error if the command // failed to execute. -func (cmd *ConditionsCmd) Execute(ctx context.Context, t time.Time, vars mathexp.Vars, tracer tracing.Tracer) (mathexp.Results, error) { +func (cmd *ConditionsCmd) Execute(ctx context.Context, t time.Time, vars mathexp.Vars, tracer tracing.Tracer, _ *metrics.ExprMetrics) (mathexp.Results, error) { ctx, span := tracer.Start(ctx, "SSE.ExecuteClassicConditions") defer span.End() // isFiring and isNoData contains the outcome of ConditionsCmd, and is derived from the diff --git a/pkg/expr/classic/classic_test.go b/pkg/expr/classic/classic_test.go index 14db0059f0f..a05e63d4f50 100644 --- a/pkg/expr/classic/classic_test.go +++ b/pkg/expr/classic/classic_test.go @@ -639,7 +639,7 @@ func TestConditionsCmd(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - res, err := tt.cmd.Execute(context.Background(), time.Now(), tt.vars, tracing.InitializeTracerForTest()) + res, err := tt.cmd.Execute(context.Background(), time.Now(), tt.vars, tracing.InitializeTracerForTest(), nil) require.NoError(t, err) require.Equal(t, tt.expected(), res) }) diff --git a/pkg/expr/commands.go b/pkg/expr/commands.go index cc8f1f9fda6..5b18ce1a104 100644 --- a/pkg/expr/commands.go +++ b/pkg/expr/commands.go @@ -12,13 +12,15 @@ import ( "go.opentelemetry.io/otel/attribute" "github.com/grafana/grafana/pkg/expr/mathexp" + "github.com/grafana/grafana/pkg/expr/metrics" + "github.com/grafana/grafana/pkg/infra/tracing" ) // Command is an interface for all expression commands. type Command interface { NeedsVars() []string - Execute(ctx context.Context, now time.Time, vars mathexp.Vars, tracer tracing.Tracer) (mathexp.Results, error) + Execute(ctx context.Context, now time.Time, vars mathexp.Vars, tracer tracing.Tracer, metrics *metrics.ExprMetrics) (mathexp.Results, error) Type() string } @@ -69,7 +71,7 @@ func (gm *MathCommand) NeedsVars() []string { // Execute runs the command and returns the results or an error if the command // failed to execute. -func (gm *MathCommand) Execute(ctx context.Context, _ time.Time, vars mathexp.Vars, tracer tracing.Tracer) (mathexp.Results, error) { +func (gm *MathCommand) Execute(ctx context.Context, _ time.Time, vars mathexp.Vars, tracer tracing.Tracer, _ *metrics.ExprMetrics) (mathexp.Results, error) { _, span := tracer.Start(ctx, "SSE.ExecuteMath") span.SetAttributes(attribute.String("expression", gm.RawExpression)) defer span.End() @@ -165,7 +167,7 @@ func (gr *ReduceCommand) NeedsVars() []string { // Execute runs the command and returns the results or an error if the command // failed to execute. -func (gr *ReduceCommand) Execute(ctx context.Context, _ time.Time, vars mathexp.Vars, tracer tracing.Tracer) (mathexp.Results, error) { +func (gr *ReduceCommand) Execute(ctx context.Context, _ time.Time, vars mathexp.Vars, tracer tracing.Tracer, _ *metrics.ExprMetrics) (mathexp.Results, error) { _, span := tracer.Start(ctx, "SSE.ExecuteReduce") defer span.End() @@ -295,7 +297,7 @@ func (gr *ResampleCommand) NeedsVars() []string { // Execute runs the command and returns the results or an error if the command // failed to execute. -func (gr *ResampleCommand) Execute(ctx context.Context, now time.Time, vars mathexp.Vars, tracer tracing.Tracer) (mathexp.Results, error) { +func (gr *ResampleCommand) Execute(ctx context.Context, now time.Time, vars mathexp.Vars, tracer tracing.Tracer, _ *metrics.ExprMetrics) (mathexp.Results, error) { _, span := tracer.Start(ctx, "SSE.ExecuteResample") defer span.End() newRes := mathexp.Results{} diff --git a/pkg/expr/commands_test.go b/pkg/expr/commands_test.go index 7fd58329171..dfe80046c90 100644 --- a/pkg/expr/commands_test.go +++ b/pkg/expr/commands_test.go @@ -119,7 +119,7 @@ func TestReduceExecute(t *testing.T) { }, } - execute, err := cmd.Execute(context.Background(), time.Now(), vars, tracing.InitializeTracerForTest()) + execute, err := cmd.Execute(context.Background(), time.Now(), vars, tracing.InitializeTracerForTest(), nil) require.NoError(t, err) require.Len(t, execute.Values, len(numbers)) @@ -163,7 +163,7 @@ func TestReduceExecute(t *testing.T) { t.Run("drop all non numbers if mapper is DropNonNumber", func(t *testing.T) { cmd, err := NewReduceCommand(util.GenerateShortUID(), randomReduceFunc(), varToReduce, &mathexp.DropNonNumber{}) require.NoError(t, err) - execute, err := cmd.Execute(context.Background(), time.Now(), vars, tracing.InitializeTracerForTest()) + execute, err := cmd.Execute(context.Background(), time.Now(), vars, tracing.InitializeTracerForTest(), nil) require.NoError(t, err) require.Len(t, execute.Values, 2) }) @@ -171,7 +171,7 @@ func TestReduceExecute(t *testing.T) { t.Run("replace all non numbers if mapper is ReplaceNonNumberWithValue", func(t *testing.T) { cmd, err := NewReduceCommand(util.GenerateShortUID(), randomReduceFunc(), varToReduce, &mathexp.ReplaceNonNumberWithValue{Value: 1}) require.NoError(t, err) - execute, err := cmd.Execute(context.Background(), time.Now(), vars, tracing.InitializeTracerForTest()) + execute, err := cmd.Execute(context.Background(), time.Now(), vars, tracing.InitializeTracerForTest(), nil) require.NoError(t, err) require.Len(t, execute.Values, len(numbers)) for _, value := range execute.Values[1 : len(numbers)-1] { @@ -194,7 +194,7 @@ func TestReduceExecute(t *testing.T) { } cmd, err := NewReduceCommand(util.GenerateShortUID(), randomReduceFunc(), varToReduce, nil) require.NoError(t, err) - results, err := cmd.Execute(context.Background(), time.Now(), vars, tracing.InitializeTracerForTest()) + results, err := cmd.Execute(context.Background(), time.Now(), vars, tracing.InitializeTracerForTest(), nil) require.NoError(t, err) require.Len(t, results.Values, 1) @@ -253,7 +253,7 @@ func TestResampleCommand_Execute(t *testing.T) { t.Run(test.name, func(t *testing.T) { result, err := cmd.Execute(context.Background(), time.Now(), mathexp.Vars{ varToReduce: mathexp.Results{Values: mathexp.Values{test.vals}}, - }, tracing.InitializeTracerForTest()) + }, tracing.InitializeTracerForTest(), nil) if test.isError { require.Error(t, err) } else { @@ -268,7 +268,7 @@ func TestResampleCommand_Execute(t *testing.T) { t.Run("should return empty result if input is nil Value", func(t *testing.T) { result, err := cmd.Execute(context.Background(), time.Now(), mathexp.Vars{ varToReduce: mathexp.Results{Values: mathexp.Values{nil}}, - }, tracing.InitializeTracerForTest()) + }, tracing.InitializeTracerForTest(), nil) require.Empty(t, result.Values) require.NoError(t, err) }) diff --git a/pkg/expr/converter_test.go b/pkg/expr/converter_test.go index d5bb8e64f74..45d04169c32 100644 --- a/pkg/expr/converter_test.go +++ b/pkg/expr/converter_test.go @@ -10,6 +10,7 @@ import ( "github.com/stretchr/testify/require" "github.com/grafana/grafana/pkg/expr/mathexp" + "github.com/grafana/grafana/pkg/expr/metrics" "github.com/grafana/grafana/pkg/infra/tracing" "github.com/grafana/grafana/pkg/services/datasources" "github.com/grafana/grafana/pkg/services/featuremgmt" @@ -21,7 +22,7 @@ func TestConvertDataFramesToResults(t *testing.T) { cfg: setting.NewCfg(), features: featuremgmt.WithFeatures(), tracer: tracing.InitializeTracerForTest(), - metrics: newMetrics(nil), + metrics: metrics.NewSSEMetrics(nil), } converter := &ResultConverter{Features: s.features, Tracer: s.tracer} diff --git a/pkg/expr/dataplane_test.go b/pkg/expr/dataplane_test.go index 53094212f75..56039aa867c 100644 --- a/pkg/expr/dataplane_test.go +++ b/pkg/expr/dataplane_test.go @@ -11,6 +11,7 @@ import ( "github.com/grafana/grafana-plugin-sdk-go/data" "github.com/stretchr/testify/require" + "github.com/grafana/grafana/pkg/expr/metrics" "github.com/grafana/grafana/pkg/infra/log" "github.com/grafana/grafana/pkg/infra/tracing" "github.com/grafana/grafana/pkg/plugins" @@ -64,7 +65,7 @@ func framesPassThroughService(t *testing.T, frames data.Frames) (data.Frames, er &datafakes.FakeCacheService{}, &datafakes.FakeDataSourceService{}, nil, pluginconfig.NewFakePluginRequestConfigProvider()), tracer: tracing.InitializeTracerForTest(), - metrics: newMetrics(nil), + metrics: metrics.NewSSEMetrics(nil), converter: &ResultConverter{ Features: features, Tracer: tracing.InitializeTracerForTest(), diff --git a/pkg/expr/hysteresis.go b/pkg/expr/hysteresis.go index d55c78a801d..0911225dfdd 100644 --- a/pkg/expr/hysteresis.go +++ b/pkg/expr/hysteresis.go @@ -9,6 +9,7 @@ import ( "go.opentelemetry.io/otel/attribute" "github.com/grafana/grafana/pkg/expr/mathexp" + "github.com/grafana/grafana/pkg/expr/metrics" "github.com/grafana/grafana/pkg/infra/tracing" ) @@ -32,7 +33,7 @@ func (h *HysteresisCommand) NeedsVars() []string { return []string{h.ReferenceVar} } -func (h *HysteresisCommand) Execute(ctx context.Context, now time.Time, vars mathexp.Vars, tracer tracing.Tracer) (mathexp.Results, error) { +func (h *HysteresisCommand) Execute(ctx context.Context, now time.Time, vars mathexp.Vars, tracer tracing.Tracer, metrics *metrics.ExprMetrics) (mathexp.Results, error) { results := vars[h.ReferenceVar] logger := logger.FromContext(ctx) @@ -46,7 +47,7 @@ func (h *HysteresisCommand) Execute(ctx context.Context, now time.Time, vars mat return mathexp.Results{Values: mathexp.Values{mathexp.NewNoData()}}, nil } if len(h.LoadedDimensions) == 0 { - return h.LoadingThresholdFunc.Execute(traceCtx, now, vars, tracer) + return h.LoadingThresholdFunc.Execute(traceCtx, now, vars, tracer, metrics) } var loadedVals, unloadedVals mathexp.Values for _, value := range results.Values { @@ -62,10 +63,10 @@ func (h *HysteresisCommand) Execute(ctx context.Context, now time.Time, vars mat logger.Debug("Evaluating thresholds", "unloadingThresholdDimensions", len(loadedVals), "loadingThresholdDimensions", len(unloadedVals)) if len(loadedVals) == 0 { // if all values are unloaded - return h.LoadingThresholdFunc.Execute(traceCtx, now, vars, tracer) + return h.LoadingThresholdFunc.Execute(traceCtx, now, vars, tracer, metrics) } if len(unloadedVals) == 0 { // if all values are loaded - return h.UnloadingThresholdFunc.Execute(traceCtx, now, vars, tracer) + return h.UnloadingThresholdFunc.Execute(traceCtx, now, vars, tracer, metrics) } defer func() { @@ -74,12 +75,12 @@ func (h *HysteresisCommand) Execute(ctx context.Context, now time.Time, vars mat }() vars[h.ReferenceVar] = mathexp.Results{Values: unloadedVals} - loadingResults, err := h.LoadingThresholdFunc.Execute(traceCtx, now, vars, tracer) + loadingResults, err := h.LoadingThresholdFunc.Execute(traceCtx, now, vars, tracer, metrics) if err != nil { return mathexp.Results{}, fmt.Errorf("failed to execute loading threshold: %w", err) } vars[h.ReferenceVar] = mathexp.Results{Values: loadedVals} - unloadingResults, err := h.UnloadingThresholdFunc.Execute(traceCtx, now, vars, tracer) + unloadingResults, err := h.UnloadingThresholdFunc.Execute(traceCtx, now, vars, tracer, metrics) if err != nil { return mathexp.Results{}, fmt.Errorf("failed to execute unloading threshold: %w", err) } diff --git a/pkg/expr/hysteresis_test.go b/pkg/expr/hysteresis_test.go index 31fd875213a..2f4f3181a5d 100644 --- a/pkg/expr/hysteresis_test.go +++ b/pkg/expr/hysteresis_test.go @@ -108,7 +108,7 @@ func TestHysteresisExecute(t *testing.T) { result, err := cmd.Execute(context.Background(), time.Now(), mathexp.Vars{ "A": mathexp.Results{Values: tc.input}, - }, tracer) + }, tracer, nil) if tc.expectedError != nil { require.ErrorIs(t, err, tc.expectedError) return diff --git a/pkg/expr/metrics.go b/pkg/expr/metrics.go deleted file mode 100644 index 4cdbdbbd7cd..00000000000 --- a/pkg/expr/metrics.go +++ /dev/null @@ -1,47 +0,0 @@ -package expr - -import ( - "github.com/prometheus/client_golang/prometheus" -) - -const ( - metricsSubSystem = "sse" - metricsNamespace = "grafana" -) - -type metrics struct { - dsRequests *prometheus.CounterVec - - // older metric - expressionsQuerySummary *prometheus.SummaryVec -} - -func newMetrics(reg prometheus.Registerer) *metrics { - m := &metrics{ - dsRequests: prometheus.NewCounterVec(prometheus.CounterOpts{ - Namespace: metricsNamespace, - Subsystem: metricsSubSystem, - Name: "ds_queries_total", - Help: "Number of datasource queries made via server side expression requests", - }, []string{"error", "dataplane", "datasource_type"}), - - // older (No Namespace or Subsystem) - expressionsQuerySummary: prometheus.NewSummaryVec( - prometheus.SummaryOpts{ - Name: "expressions_queries_duration_milliseconds", - Help: "Expressions query summary", - Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001}, - }, - []string{"status"}, - ), - } - - if reg != nil { - reg.MustRegister( - m.dsRequests, - m.expressionsQuerySummary, - ) - } - - return m -} diff --git a/pkg/expr/metrics/metrics.go b/pkg/expr/metrics/metrics.go new file mode 100644 index 00000000000..7f798941878 --- /dev/null +++ b/pkg/expr/metrics/metrics.go @@ -0,0 +1,125 @@ +package metrics + +import ( + "github.com/prometheus/client_golang/prometheus" +) + +// ExprMetrics is a struct that contains all the metrics for an implementation of the expressions service +// shared between multiple versions of expressions service, which are delineated by the subsystem string +type ExprMetrics struct { + DSRequests *prometheus.CounterVec + ExpressionsQuerySummary *prometheus.SummaryVec + SqlCommandDuration *prometheus.HistogramVec + SqlCommandErrorCount *prometheus.CounterVec + SqlCommandCellCount *prometheus.HistogramVec +} + +func newExprMetrics(subsystem string) *ExprMetrics { + return &ExprMetrics{ + DSRequests: prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: "grafana", + Subsystem: subsystem, + Name: "ds_queries_total", + Help: "Number of datasource queries made via server side expression requests", + }, []string{"error", "dataplane", "datasource_type"}), + + ExpressionsQuerySummary: prometheus.NewSummaryVec( + prometheus.SummaryOpts{ + Namespace: "grafana", + Subsystem: subsystem, + Name: "expressions_queries_duration_milliseconds", + Help: "Expressions query summary", + Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001}, + }, + []string{"status"}, + ), + + SqlCommandDuration: prometheus.NewHistogramVec(prometheus.HistogramOpts{ + Namespace: "grafana", + Subsystem: subsystem, + Name: "sql_command_duration_seconds", + Help: "Duration of SQL command execution", + Buckets: prometheus.DefBuckets, + }, []string{"status"}), + + SqlCommandErrorCount: prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: "grafana", + Subsystem: subsystem, + Name: "sql_command_errors_total", + Help: "Total number of SQL command execution errors", + }, []string{}), + + SqlCommandCellCount: prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Namespace: "grafana", + Subsystem: subsystem, + Name: "sql_command_cell_count", + Help: "Distribution of the total number of cells in each SQL command execution", + Buckets: prometheus.ExponentialBuckets(100, 2, 10), + }, + []string{"status"}, + ), + } +} + +// NewSSEMetrics creates a new ExprMetrics struct for the ST implementation of the expressions service +func NewSSEMetrics(reg prometheus.Registerer) *ExprMetrics { + metricsSubSystem := "sse" + + m := &ExprMetrics{ + DSRequests: newExprMetrics(metricsSubSystem).DSRequests, + + ExpressionsQuerySummary: newExprMetrics(metricsSubSystem).ExpressionsQuerySummary, + + SqlCommandDuration: newExprMetrics(metricsSubSystem).SqlCommandDuration, + + SqlCommandErrorCount: newExprMetrics(metricsSubSystem).SqlCommandErrorCount, + + SqlCommandCellCount: newExprMetrics(metricsSubSystem).SqlCommandCellCount, + } + + if reg != nil { + reg.MustRegister( + m.DSRequests, + m.ExpressionsQuerySummary, + m.SqlCommandDuration, + m.SqlCommandErrorCount, + m.SqlCommandCellCount, + ) + } + + return m +} + +// NewQueryServiceExpressionsMetrics creates a new ExprMetrics struct for the query service implementation of the expressions service +func NewQueryServiceExpressionsMetrics(reg prometheus.Registerer) *ExprMetrics { + metricsSubSystem := "queryservice" + + m := &ExprMetrics{ + DSRequests: newExprMetrics(metricsSubSystem).DSRequests, + + ExpressionsQuerySummary: newExprMetrics(metricsSubSystem).ExpressionsQuerySummary, + + SqlCommandDuration: newExprMetrics(metricsSubSystem).SqlCommandDuration, + + SqlCommandErrorCount: newExprMetrics(metricsSubSystem).SqlCommandErrorCount, + + SqlCommandCellCount: newExprMetrics(metricsSubSystem).SqlCommandCellCount, + } + + if reg != nil { + reg.MustRegister( + m.DSRequests, + m.ExpressionsQuerySummary, + m.SqlCommandDuration, + m.SqlCommandErrorCount, + m.SqlCommandCellCount, + ) + } + + return m +} + +func NewTestMetrics() *ExprMetrics { + return newExprMetrics("test") +} diff --git a/pkg/expr/ml.go b/pkg/expr/ml.go index 9efa143e805..5903857ae2b 100644 --- a/pkg/expr/ml.go +++ b/pkg/expr/ml.go @@ -86,7 +86,7 @@ func (m *MLNode) Execute(ctx context.Context, now time.Time, _ mathexp.Vars, s * } logger.Debug("Data source queried", "responseType", responseType) useDataplane := strings.HasPrefix("dataplane-", responseType) - s.metrics.dsRequests.WithLabelValues(respStatus, fmt.Sprintf("%t", useDataplane), mlPluginID).Inc() + s.metrics.DSRequests.WithLabelValues(respStatus, fmt.Sprintf("%t", useDataplane), mlPluginID).Inc() }() // Execute the command and provide callback function for sending a request via plugin API. diff --git a/pkg/expr/ml_test.go b/pkg/expr/ml_test.go index e7d6768c466..045b7f1a34b 100644 --- a/pkg/expr/ml_test.go +++ b/pkg/expr/ml_test.go @@ -12,6 +12,7 @@ import ( "github.com/grafana/grafana-plugin-sdk-go/data" "github.com/stretchr/testify/require" + "github.com/grafana/grafana/pkg/expr/metrics" "github.com/grafana/grafana/pkg/expr/ml" "github.com/grafana/grafana/pkg/plugins" "github.com/grafana/grafana/pkg/services/user" @@ -61,7 +62,7 @@ func TestMLNodeExecute(t *testing.T) { features: nil, pluginsClient: pluginsClient, tracer: nil, - metrics: newMetrics(nil), + metrics: metrics.NewSSEMetrics(nil), } cmdResponse := data.NewFrame("test", @@ -198,7 +199,7 @@ func TestMLNodeExecute(t *testing.T) { features: nil, pluginsClient: pluginsClient, tracer: nil, - metrics: newMetrics(nil), + metrics: metrics.NewSSEMetrics(nil), } cmd := &ml.FakeCommand{ diff --git a/pkg/expr/nodes.go b/pkg/expr/nodes.go index e18bad3a7cb..b39e17a1384 100644 --- a/pkg/expr/nodes.go +++ b/pkg/expr/nodes.go @@ -103,7 +103,7 @@ func (gn *CMDNode) NeedsVars() []string { // other nodes they must have already been executed and their results must // already by in vars. func (gn *CMDNode) Execute(ctx context.Context, now time.Time, vars mathexp.Vars, s *Service) (mathexp.Results, error) { - return gn.Command.Execute(ctx, now, vars, s.tracer) + return gn.Command.Execute(ctx, now, vars, s.tracer, s.metrics) } func buildCMDNode(rn *rawNode, toggles featuremgmt.FeatureToggles, sqlExpressionCellLimit int64) (*CMDNode, error) { @@ -320,7 +320,7 @@ func executeDSNodesGrouped(ctx context.Context, now time.Time, vars mathexp.Vars } logger.Debug("Data source queried", "responseType", responseType) useDataplane := strings.HasPrefix(responseType, "dataplane-") - s.metrics.dsRequests.WithLabelValues(respStatus, fmt.Sprintf("%t", useDataplane), firstNode.datasource.Type).Inc() + s.metrics.DSRequests.WithLabelValues(respStatus, fmt.Sprintf("%t", useDataplane), firstNode.datasource.Type).Inc() } resp, err := s.dataService.QueryData(ctx, req) @@ -395,7 +395,7 @@ func (dn *DSNode) Execute(ctx context.Context, now time.Time, _ mathexp.Vars, s } logger.Debug("Data source queried", "responseType", responseType) useDataplane := strings.HasPrefix(responseType, "dataplane-") - s.metrics.dsRequests.WithLabelValues(respStatus, fmt.Sprintf("%t", useDataplane), dn.datasource.Type).Inc() + s.metrics.DSRequests.WithLabelValues(respStatus, fmt.Sprintf("%t", useDataplane), dn.datasource.Type).Inc() }() resp, err := s.dataService.QueryData(ctx, req) diff --git a/pkg/expr/service.go b/pkg/expr/service.go index 1b04ce41c3f..e3ea54ea056 100644 --- a/pkg/expr/service.go +++ b/pkg/expr/service.go @@ -11,6 +11,7 @@ import ( "github.com/grafana/grafana/pkg/apimachinery/identity" "github.com/grafana/grafana/pkg/components/simplejson" + "github.com/grafana/grafana/pkg/expr/metrics" "github.com/grafana/grafana/pkg/infra/tracing" "github.com/grafana/grafana/pkg/plugins" "github.com/grafana/grafana/pkg/services/datasources" @@ -65,7 +66,7 @@ type Service struct { pluginsClient backend.CallResourceHandler tracer tracing.Tracer - metrics *metrics + metrics *metrics.ExprMetrics } type pluginContextProvider interface { @@ -81,7 +82,7 @@ func ProvideService(cfg *setting.Cfg, pluginClient plugins.Client, pCtxProvider pCtxProvider: pCtxProvider, features: features, tracer: tracer, - metrics: newMetrics(registerer), + metrics: metrics.NewSSEMetrics(registerer), pluginsClient: pluginClient, converter: &ResultConverter{ Features: features, diff --git a/pkg/expr/service_test.go b/pkg/expr/service_test.go index a343cdaf7b4..306113079db 100644 --- a/pkg/expr/service_test.go +++ b/pkg/expr/service_test.go @@ -14,6 +14,7 @@ import ( "github.com/stretchr/testify/require" "github.com/grafana/grafana/pkg/apimachinery/errutil" + "github.com/grafana/grafana/pkg/expr/metrics" "github.com/grafana/grafana/pkg/infra/tracing" "github.com/grafana/grafana/pkg/plugins" "github.com/grafana/grafana/pkg/services/datasources" @@ -249,7 +250,7 @@ func newMockQueryService(responses map[string]backend.DataResponse, queries []Qu pCtxProvider: pCtxProvider, features: featuremgmt.WithFeatures(), tracer: tracing.InitializeTracerForTest(), - metrics: newMetrics(nil), + metrics: metrics.NewSSEMetrics(nil), converter: &ResultConverter{ Features: features, Tracer: tracing.InitializeTracerForTest(), diff --git a/pkg/expr/sql_command.go b/pkg/expr/sql_command.go index 270ccc3530c..25207accf4a 100644 --- a/pkg/expr/sql_command.go +++ b/pkg/expr/sql_command.go @@ -10,6 +10,7 @@ import ( "github.com/grafana/grafana/pkg/apimachinery/errutil" "github.com/grafana/grafana/pkg/expr/mathexp" + "github.com/grafana/grafana/pkg/expr/metrics" "github.com/grafana/grafana/pkg/expr/sql" "github.com/grafana/grafana/pkg/infra/tracing" ) @@ -99,9 +100,22 @@ func (gr *SQLCommand) NeedsVars() []string { // Execute runs the command and returns the results or an error if the command // failed to execute. -func (gr *SQLCommand) Execute(ctx context.Context, now time.Time, vars mathexp.Vars, tracer tracing.Tracer) (mathexp.Results, error) { +func (gr *SQLCommand) Execute(ctx context.Context, now time.Time, vars mathexp.Vars, tracer tracing.Tracer, metrics *metrics.ExprMetrics) (mathExprResult mathexp.Results, resultError error) { _, span := tracer.Start(ctx, "SSE.ExecuteSQL") - defer span.End() + start := time.Now() + tc := int64(0) + + defer func() { + span.End() + statusLabel := "ok" + duration := float64(time.Since(start).Nanoseconds()) / float64(time.Millisecond) + if resultError != nil { + statusLabel = "error" + metrics.SqlCommandErrorCount.WithLabelValues().Inc() + } + metrics.SqlCommandDuration.WithLabelValues(statusLabel).Observe(duration) + metrics.SqlCommandCellCount.WithLabelValues(statusLabel).Observe(float64(tc)) + }() allFrames := []*data.Frame{} for _, ref := range gr.varsToQuery { @@ -114,14 +128,15 @@ func (gr *SQLCommand) Execute(ctx context.Context, now time.Time, vars mathexp.V allFrames = append(allFrames, frames...) } - totalCells := totalCells(allFrames) + tc = totalCells(allFrames) + // limit of 0 or less means no limit (following convention) - if gr.limit > 0 && totalCells > gr.limit { + if gr.limit > 0 && tc > gr.limit { return mathexp.Results{}, fmt.Errorf( "SQL expression: total cell count across all input tables exceeds limit of %d. Total cells: %d", gr.limit, - totalCells, + tc, ) } diff --git a/pkg/expr/sql_command_test.go b/pkg/expr/sql_command_test.go index 64654b34900..32ed9ee9934 100644 --- a/pkg/expr/sql_command_test.go +++ b/pkg/expr/sql_command_test.go @@ -10,6 +10,8 @@ import ( "github.com/grafana/grafana-plugin-sdk-go/data" "github.com/grafana/grafana/pkg/expr/mathexp" + "github.com/grafana/grafana/pkg/expr/metrics" + "github.com/prometheus/client_golang/prometheus/testutil" "github.com/stretchr/testify/require" "go.opentelemetry.io/otel/trace" ) @@ -134,7 +136,7 @@ func TestSQLCommandCellLimits(t *testing.T) { } } - _, err = cmd.Execute(context.Background(), time.Now(), vars, &testTracer{}) + _, err = cmd.Execute(context.Background(), time.Now(), vars, &testTracer{}, metrics.NewTestMetrics()) if tt.expectError { require.Error(t, err) @@ -146,6 +148,28 @@ func TestSQLCommandCellLimits(t *testing.T) { } } +func TestSQLCommandMetrics(t *testing.T) { + // Create test metrics + m := metrics.NewTestMetrics() + + // Create a command + cmd, err := NewSQLCommand("A", "someformat", "select * from foo", 0) + require.NoError(t, err) + + // Execute successful command + _, err = cmd.Execute(context.Background(), time.Now(), mathexp.Vars{}, &testTracer{}, m) + require.NoError(t, err) + + // Verify error count was not incremented + require.Equal(t, 0, testutil.CollectAndCount(m.SqlCommandErrorCount), "Expected error metric not to be recorded") + + // Verify duration was recorded + require.Equal(t, 1, testutil.CollectAndCount(m.SqlCommandDuration), "Expected duration metric to be recorded") + + // Verify cell count was recorded + require.Equal(t, 1, testutil.CollectAndCount(m.SqlCommandCellCount), "Expected cell count metric to be recorded") +} + type testTracer struct { trace.Tracer } diff --git a/pkg/expr/threshold.go b/pkg/expr/threshold.go index d07a5d7676e..f9e9656e03e 100644 --- a/pkg/expr/threshold.go +++ b/pkg/expr/threshold.go @@ -11,6 +11,7 @@ import ( "github.com/grafana/grafana-plugin-sdk-go/data" "github.com/grafana/grafana/pkg/expr/mathexp" + "github.com/grafana/grafana/pkg/expr/metrics" "github.com/grafana/grafana/pkg/infra/tracing" "github.com/grafana/grafana/pkg/services/featuremgmt" "github.com/grafana/grafana/pkg/util" @@ -174,7 +175,7 @@ func (tc *ThresholdCommand) NeedsVars() []string { return []string{tc.ReferenceVar} } -func (tc *ThresholdCommand) Execute(_ context.Context, _ time.Time, vars mathexp.Vars, _ tracing.Tracer) (mathexp.Results, error) { +func (tc *ThresholdCommand) Execute(_ context.Context, _ time.Time, vars mathexp.Vars, _ tracing.Tracer, _ *metrics.ExprMetrics) (mathexp.Results, error) { eval := func(maybeValue *float64) *float64 { if maybeValue == nil { return nil diff --git a/pkg/expr/threshold_bench_test.go b/pkg/expr/threshold_bench_test.go index ddda45eba97..13b42eb44b2 100644 --- a/pkg/expr/threshold_bench_test.go +++ b/pkg/expr/threshold_bench_test.go @@ -33,7 +33,7 @@ func BenchmarkThreshold(b *testing.B) { } b.ResetTimer() for i := 0; i < b.N; i++ { - _, _ = greater.Execute(ctx, timeNow, vars, trace) + _, _ = greater.Execute(ctx, timeNow, vars, trace, nil) } }) b.Run("less than", func(b *testing.B) { @@ -43,7 +43,7 @@ func BenchmarkThreshold(b *testing.B) { } b.ResetTimer() for i := 0; i < b.N; i++ { - _, _ = greater.Execute(ctx, timeNow, vars, trace) + _, _ = greater.Execute(ctx, timeNow, vars, trace, nil) } }) b.Run("within range", func(b *testing.B) { @@ -53,7 +53,7 @@ func BenchmarkThreshold(b *testing.B) { } b.ResetTimer() for i := 0; i < b.N; i++ { - _, _ = greater.Execute(ctx, timeNow, vars, trace) + _, _ = greater.Execute(ctx, timeNow, vars, trace, nil) } }) b.Run("within range, no labels", func(b *testing.B) { @@ -72,7 +72,7 @@ func BenchmarkThreshold(b *testing.B) { } b.ResetTimer() for i := 0; i < b.N; i++ { - _, _ = greater.Execute(ctx, timeNow, vars, trace) + _, _ = greater.Execute(ctx, timeNow, vars, trace, nil) } }) b.Run("outside range", func(b *testing.B) { @@ -82,7 +82,7 @@ func BenchmarkThreshold(b *testing.B) { } b.ResetTimer() for i := 0; i < b.N; i++ { - _, _ = greater.Execute(ctx, timeNow, vars, trace) + _, _ = greater.Execute(ctx, timeNow, vars, trace, nil) } }) } diff --git a/pkg/expr/threshold_test.go b/pkg/expr/threshold_test.go index 2cfcadab18f..0bdc695e7ed 100644 --- a/pkg/expr/threshold_test.go +++ b/pkg/expr/threshold_test.go @@ -639,7 +639,7 @@ func TestThresholdExecute(t *testing.T) { t.Run(name, func(t *testing.T) { result, err := cmd.Execute(context.Background(), time.Now(), mathexp.Vars{ "A": newResults(input[name]), - }, tracing.InitializeTracerForTest()) + }, tracing.InitializeTracerForTest(), nil) require.NoError(t, err) require.Equal(t, newResults(tc.expected[name]), result) }) diff --git a/pkg/expr/transform.go b/pkg/expr/transform.go index 57925e1da91..db49522eff1 100644 --- a/pkg/expr/transform.go +++ b/pkg/expr/transform.go @@ -81,7 +81,7 @@ func (s *Service) TransformData(ctx context.Context, now time.Time, req *Request respStatus = "failure" } duration := float64(time.Since(start).Nanoseconds()) / float64(time.Millisecond) - s.metrics.expressionsQuerySummary.WithLabelValues(respStatus).Observe(duration) + s.metrics.ExpressionsQuerySummary.WithLabelValues(respStatus).Observe(duration) span.End() }() diff --git a/pkg/registry/apis/query/metrics.go b/pkg/registry/apis/query/metrics.go deleted file mode 100644 index f3b8212f9db..00000000000 --- a/pkg/registry/apis/query/metrics.go +++ /dev/null @@ -1,48 +0,0 @@ -package query - -import ( - "github.com/prometheus/client_golang/prometheus" -) - -const ( - metricsSubSystem = "queryservice" - metricsNamespace = "grafana" -) - -type queryMetrics struct { - dsRequests *prometheus.CounterVec - - // older metric - expressionsQuerySummary *prometheus.SummaryVec -} - -func newQueryMetrics(reg prometheus.Registerer) *queryMetrics { - m := &queryMetrics{ - dsRequests: prometheus.NewCounterVec(prometheus.CounterOpts{ - Namespace: metricsNamespace, - Subsystem: metricsSubSystem, - Name: "ds_queries_total", - Help: "Number of datasource queries made from the query service", - }, []string{"error", "dataplane", "datasource_type"}), - - expressionsQuerySummary: prometheus.NewSummaryVec( - prometheus.SummaryOpts{ - Namespace: metricsNamespace, - Subsystem: metricsSubSystem, - Name: "expressions_queries_duration_milliseconds", - Help: "Expressions query summary", - Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001}, - }, - []string{"status"}, - ), - } - - if reg != nil { - reg.MustRegister( - m.dsRequests, - m.expressionsQuerySummary, - ) - } - - return m -} diff --git a/pkg/registry/apis/query/query.go b/pkg/registry/apis/query/query.go index f019f058044..da73b222fab 100644 --- a/pkg/registry/apis/query/query.go +++ b/pkg/registry/apis/query/query.go @@ -360,7 +360,7 @@ func (b *QueryAPIBuilder) handleExpressions(ctx context.Context, req parsedReque respStatus = "failure" } duration := float64(time.Since(start).Nanoseconds()) / float64(time.Millisecond) - b.metrics.expressionsQuerySummary.WithLabelValues(respStatus).Observe(duration) + b.metrics.ExpressionsQuerySummary.WithLabelValues(respStatus).Observe(duration) span.End() }() @@ -403,7 +403,7 @@ func (b *QueryAPIBuilder) handleExpressions(ctx context.Context, req parsedReque } refId := expression.RefID - results, err := expression.Command.Execute(ctx, now, vars, b.tracer) + results, err := expression.Command.Execute(ctx, now, vars, b.tracer, b.metrics) if err != nil { expressionsLogger.Error("error executing expression", "error", err) results.Error = err diff --git a/pkg/registry/apis/query/register.go b/pkg/registry/apis/query/register.go index 83ca935a662..9692c21d2c7 100644 --- a/pkg/registry/apis/query/register.go +++ b/pkg/registry/apis/query/register.go @@ -17,6 +17,7 @@ import ( claims "github.com/grafana/authlib/types" query "github.com/grafana/grafana/pkg/apis/query/v0alpha1" "github.com/grafana/grafana/pkg/expr" + "github.com/grafana/grafana/pkg/expr/metrics" "github.com/grafana/grafana/pkg/infra/log" "github.com/grafana/grafana/pkg/infra/tracing" "github.com/grafana/grafana/pkg/plugins" @@ -43,7 +44,7 @@ type QueryAPIBuilder struct { authorizer authorizer.Authorizer tracer tracing.Tracer - metrics *queryMetrics + metrics *metrics.ExprMetrics parser *queryParser client clientapi.DataSourceClientSupplier registry query.DataSourceApiServerRegistry @@ -83,7 +84,7 @@ func NewQueryAPIBuilder(features featuremgmt.FeatureToggles, authorizer: ar, registry: registry, parser: newQueryParser(reader, legacy, tracer, log.New("query_parser")), - metrics: newQueryMetrics(registerer), + metrics: metrics.NewQueryServiceExpressionsMetrics(registerer), tracer: tracer, features: features, queryTypes: queryTypes,