diff --git a/pkg/expr/dataplane_test.go b/pkg/expr/dataplane_test.go index ac3e7c42f95..a8247a2bb9b 100644 --- a/pkg/expr/dataplane_test.go +++ b/pkg/expr/dataplane_test.go @@ -92,7 +92,7 @@ func framesPassThroughService(t *testing.T, frames data.Frames) (data.Frames, er User: &user.SignedInUser{}, } - pl, err := s.BuildPipeline(req) + pl, err := s.BuildPipeline(t.Context(), req) require.NoError(t, err) res, err := s.ExecutePipeline(context.Background(), time.Now(), pl) diff --git a/pkg/expr/graph.go b/pkg/expr/graph.go index e19b73dd15e..ad60027d988 100644 --- a/pkg/expr/graph.go +++ b/pkg/expr/graph.go @@ -175,12 +175,12 @@ func (dp *DataPipeline) GetCommandTypes() []string { // BuildPipeline builds a graph of the nodes, and returns the nodes in an // executable order. -func (s *Service) buildPipeline(req *Request) (DataPipeline, error) { +func (s *Service) buildPipeline(ctx context.Context, req *Request) (DataPipeline, error) { if req != nil && len(req.Headers) == 0 { req.Headers = map[string]string{} } - graph, err := s.buildDependencyGraph(req) + graph, err := s.buildDependencyGraph(ctx, req) if err != nil { return nil, err } @@ -194,8 +194,8 @@ func (s *Service) buildPipeline(req *Request) (DataPipeline, error) { } // buildDependencyGraph returns a dependency graph for a set of queries. -func (s *Service) buildDependencyGraph(req *Request) (*simple.DirectedGraph, error) { - graph, err := s.buildGraph(req) +func (s *Service) buildDependencyGraph(ctx context.Context, req *Request) (*simple.DirectedGraph, error) { + graph, err := s.buildGraph(ctx, req) if err != nil { return nil, err } @@ -252,7 +252,7 @@ func buildNodeRegistry(g *simple.DirectedGraph) map[string]Node { } // buildGraph creates a new graph populated with nodes for every query. -func (s *Service) buildGraph(req *Request) (*simple.DirectedGraph, error) { +func (s *Service) buildGraph(ctx context.Context, req *Request) (*simple.DirectedGraph, error) { dp := simple.NewDirectedGraph() for i, query := range req.Queries { @@ -287,7 +287,7 @@ func (s *Service) buildGraph(req *Request) (*simple.DirectedGraph, error) { case TypeDatasourceNode: node, err = s.buildDSNode(dp, rn, req) case TypeCMDNode: - node, err = buildCMDNode(rn, s.features, s.cfg) + node, err = buildCMDNode(ctx, rn, s.features, s.cfg) case TypeMLNode: if s.features.IsEnabledGlobally(featuremgmt.FlagMlExpressions) { node, err = s.buildMLNode(dp, rn, req) diff --git a/pkg/expr/graph_test.go b/pkg/expr/graph_test.go index dfa9f5f5b0a..bcfee8fb306 100644 --- a/pkg/expr/graph_test.go +++ b/pkg/expr/graph_test.go @@ -239,7 +239,7 @@ func TestServicebuildPipeLine(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - nodes, err := s.buildPipeline(tt.req) + nodes, err := s.buildPipeline(t.Context(), tt.req) if tt.expectErrContains != "" { require.Error(t, err) require.Contains(t, err.Error(), tt.expectErrContains) diff --git a/pkg/expr/nodes.go b/pkg/expr/nodes.go index 300488ae643..f80ddf77db8 100644 --- a/pkg/expr/nodes.go +++ b/pkg/expr/nodes.go @@ -107,7 +107,7 @@ func (gn *CMDNode) Execute(ctx context.Context, now time.Time, vars mathexp.Vars return gn.Command.Execute(ctx, now, vars, s.tracer, s.metrics) } -func buildCMDNode(rn *rawNode, toggles featuremgmt.FeatureToggles, cfg *setting.Cfg) (*CMDNode, error) { +func buildCMDNode(ctx context.Context, rn *rawNode, toggles featuremgmt.FeatureToggles, cfg *setting.Cfg) (*CMDNode, error) { commandType, err := GetExpressionCommandType(rn.Query) if err != nil { return nil, fmt.Errorf("invalid command type in expression '%v': %w", rn.RefID, err) @@ -141,7 +141,7 @@ func buildCMDNode(rn *rawNode, toggles featuremgmt.FeatureToggles, cfg *setting. if err != nil { return nil, err } - q, err := reader.ReadQuery(data.NewDataQuery(map[string]any{ + q, err := reader.ReadQuery(ctx, data.NewDataQuery(map[string]any{ "refId": rn.RefID, "type": rn.QueryType, }), iter) @@ -164,7 +164,7 @@ func buildCMDNode(rn *rawNode, toggles featuremgmt.FeatureToggles, cfg *setting. case TypeThreshold: node.Command, err = UnmarshalThresholdCommand(rn) case TypeSQL: - node.Command, err = UnmarshalSQLCommand(rn, cfg) + node.Command, err = UnmarshalSQLCommand(ctx, rn, cfg) default: return nil, fmt.Errorf("expression command type '%v' in expression '%v' not implemented", commandType, rn.RefID) } diff --git a/pkg/expr/reader.go b/pkg/expr/reader.go index 62cd6af4539..5c85b9d61fb 100644 --- a/pkg/expr/reader.go +++ b/pkg/expr/reader.go @@ -1,6 +1,7 @@ package expr import ( + "context" "fmt" "strings" @@ -44,6 +45,7 @@ func NewExpressionQueryReader(features featuremgmt.FeatureToggles) *ExpressionQu // nolint:gocyclo func (h *ExpressionQueryReader) ReadQuery( + ctx context.Context, // Properties that have been parsed off the same node common data.DataQuery, // An iterator with context for the full node (include common values) @@ -135,7 +137,7 @@ func (h *ExpressionQueryReader) ReadQuery( eq.Properties = q // TODO: Cascade limit from Grafana config in this (new Expression Parser) branch of the code cellLimit := 0 // zero means no limit - eq.Command, err = NewSQLCommand(common.RefID, q.Format, q.Expression, int64(cellLimit), 0, 0) + eq.Command, err = NewSQLCommand(ctx, common.RefID, q.Format, q.Expression, int64(cellLimit), 0, 0) } case QueryTypeThreshold: diff --git a/pkg/expr/reader_test.go b/pkg/expr/reader_test.go index ee0ec2265f4..39b99a21ac0 100644 --- a/pkg/expr/reader_test.go +++ b/pkg/expr/reader_test.go @@ -141,7 +141,7 @@ func TestReaderReduceMode(t *testing.T) { reader := NewExpressionQueryReader(featuremgmt.WithFeatures()) - eq, err := reader.ReadQuery(q, iter) + eq, err := reader.ReadQuery(t.Context(), q, iter) if test.expectError { require.Error(t, err) diff --git a/pkg/expr/service.go b/pkg/expr/service.go index 19b124b70d2..b624685c38e 100644 --- a/pkg/expr/service.go +++ b/pkg/expr/service.go @@ -102,8 +102,8 @@ func (s *Service) isDisabled() bool { } // BuildPipeline builds a pipeline from a request. -func (s *Service) BuildPipeline(req *Request) (DataPipeline, error) { - return s.buildPipeline(req) +func (s *Service) BuildPipeline(ctx context.Context, req *Request) (DataPipeline, error) { + return s.buildPipeline(ctx, req) } // ExecutePipeline executes an expression pipeline and returns all the results. diff --git a/pkg/expr/service_sql_test.go b/pkg/expr/service_sql_test.go index 0a1d991da68..714c1b98d7b 100644 --- a/pkg/expr/service_sql_test.go +++ b/pkg/expr/service_sql_test.go @@ -56,14 +56,14 @@ func TestSQLService(t *testing.T) { t.Run("no feature flag no queries for you", func(t *testing.T) { s, req := newMockQueryService(resp, newABSQLQueries("")) - _, err := s.BuildPipeline(req) + _, err := s.BuildPipeline(t.Context(), req) require.Error(t, err, "should not be able to build pipeline without feature flag") }) t.Run("with feature flag basic select works", func(t *testing.T) { s, req := newMockQueryService(resp, newABSQLQueries("SELECT * FROM A")) s.features = featuremgmt.WithFeatures(featuremgmt.FlagSqlExpressions) - pl, err := s.BuildPipeline(req) + pl, err := s.BuildPipeline(t.Context(), req) require.NoError(t, err) res, err := s.ExecutePipeline(context.Background(), time.Now(), pl) @@ -83,7 +83,7 @@ func TestSQLService(t *testing.T) { s.features = featuremgmt.WithFeatures(featuremgmt.FlagSqlExpressions) - pl, err := s.BuildPipeline(req) + pl, err := s.BuildPipeline(t.Context(), req) require.NoError(t, err) rsp, err := s.ExecutePipeline(context.Background(), time.Now(), pl) @@ -100,7 +100,7 @@ func TestSQLService(t *testing.T) { s.features = featuremgmt.WithFeatures(featuremgmt.FlagSqlExpressions) - pl, err := s.BuildPipeline(req) + pl, err := s.BuildPipeline(t.Context(), req) require.NoError(t, err) rsp, err := s.ExecutePipeline(context.Background(), time.Now(), pl) diff --git a/pkg/expr/service_test.go b/pkg/expr/service_test.go index 779791a7812..42f495d67ac 100644 --- a/pkg/expr/service_test.go +++ b/pkg/expr/service_test.go @@ -61,7 +61,7 @@ func TestService(t *testing.T) { s, req := newMockQueryService(resp, queries) - pl, err := s.BuildPipeline(req) + pl, err := s.BuildPipeline(t.Context(), req) require.NoError(t, err) res, err := s.ExecutePipeline(context.Background(), time.Now(), pl) @@ -135,7 +135,7 @@ func TestDSQueryError(t *testing.T) { s, req := newMockQueryService(resp, queries) - pl, err := s.BuildPipeline(req) + pl, err := s.BuildPipeline(t.Context(), req) require.NoError(t, err) res, err := s.ExecutePipeline(context.Background(), time.Now(), pl) @@ -200,7 +200,7 @@ func TestSQLExpressionCellLimitFromConfig(t *testing.T) { req := &Request{Queries: queries, User: &user.SignedInUser{}} // Build the pipeline - pipeline, err := s.BuildPipeline(req) + pipeline, err := s.BuildPipeline(t.Context(), req) require.NoError(t, err) node := pipeline[0] diff --git a/pkg/expr/sql/parser.go b/pkg/expr/sql/parser.go index 0e401c94a15..f2308ece9ae 100644 --- a/pkg/expr/sql/parser.go +++ b/pkg/expr/sql/parser.go @@ -1,19 +1,19 @@ package sql import ( + "context" "fmt" "sort" "strings" "github.com/dolthub/vitess/go/vt/sqlparser" - "github.com/grafana/grafana/pkg/infra/log" + "github.com/grafana/grafana-plugin-sdk-go/backend" ) -var logger = log.New("sql_expr") - // TablesList returns a list of tables for the sql statement excluding // CTEs and the 'dual' table. The list is sorted alphabetically. -func TablesList(rawSQL string) ([]string, error) { +func TablesList(ctx context.Context, rawSQL string) ([]string, error) { + logger := backend.NewLoggerWith("logger", "expr.sql").FromContext(ctx) stmt, err := sqlparser.Parse(rawSQL) if err != nil { logger.Error("error parsing sql", "error", err.Error(), "sql", rawSQL) diff --git a/pkg/expr/sql/parser_test.go b/pkg/expr/sql/parser_test.go index 44ac1ccb102..02d7ce344f7 100644 --- a/pkg/expr/sql/parser_test.go +++ b/pkg/expr/sql/parser_test.go @@ -121,7 +121,7 @@ func TestTablesList(t *testing.T) { for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { - tables, err := TablesList(tc.sql) + tables, err := TablesList(t.Context(), tc.sql) if tc.expectError { require.NotNil(t, err, "expected error for SQL: %s", tc.sql) } else { diff --git a/pkg/expr/sql_command.go b/pkg/expr/sql_command.go index 6c80ac717d6..016b1c87f5e 100644 --- a/pkg/expr/sql_command.go +++ b/pkg/expr/sql_command.go @@ -6,6 +6,7 @@ import ( "fmt" "time" + "github.com/grafana/grafana-plugin-sdk-go/backend" "github.com/grafana/grafana-plugin-sdk-go/data" "go.opentelemetry.io/otel/codes" @@ -41,11 +42,11 @@ type SQLCommand struct { } // NewSQLCommand creates a new SQLCommand. -func NewSQLCommand(refID, format, rawSQL string, intputLimit, outputLimit int64, timeout time.Duration) (*SQLCommand, error) { +func NewSQLCommand(ctx context.Context, refID, format, rawSQL string, intputLimit, outputLimit int64, timeout time.Duration) (*SQLCommand, error) { if rawSQL == "" { return nil, ErrMissingSQLQuery } - tables, err := sql.TablesList(rawSQL) + tables, err := sql.TablesList(ctx, rawSQL) if err != nil { logger.Warn("invalid sql query", "sql", rawSQL, "error", err) return nil, ErrInvalidSQLQuery.Build(errutil.TemplateData{ @@ -77,7 +78,8 @@ func NewSQLCommand(refID, format, rawSQL string, intputLimit, outputLimit int64, } // UnmarshalSQLCommand creates a SQLCommand from Grafana's frontend query. -func UnmarshalSQLCommand(rn *rawNode, cfg *setting.Cfg) (*SQLCommand, error) { +func UnmarshalSQLCommand(ctx context.Context, rn *rawNode, cfg *setting.Cfg) (*SQLCommand, error) { + sqlLogger := backend.NewLoggerWith("logger", "expr.sql").FromContext(ctx) if rn.TimeRange == nil { logger.Error("time range must be specified for refID", "refID", rn.RefID) return nil, fmt.Errorf("time range must be specified for refID %s", rn.RefID) @@ -85,19 +87,19 @@ func UnmarshalSQLCommand(rn *rawNode, cfg *setting.Cfg) (*SQLCommand, error) { expressionRaw, ok := rn.Query["expression"] if !ok { - logger.Error("no expression in the query", "query", rn.Query) + sqlLogger.Error("no expression in the query", "query", rn.Query) return nil, errors.New("no expression in the query") } expression, ok := expressionRaw.(string) if !ok { - logger.Error("expected sql expression to be type string", "expression", expressionRaw) + sqlLogger.Error("expected sql expression to be type string", "expression", expressionRaw) return nil, fmt.Errorf("expected sql expression to be type string, but got type %T", expressionRaw) } formatRaw := rn.Query["format"] format, _ := formatRaw.(string) - return NewSQLCommand(rn.RefID, format, expression, cfg.SQLExpressionCellLimit, cfg.SQLExpressionOutputCellLimit, cfg.SQLExpressionTimeout) + return NewSQLCommand(ctx, rn.RefID, format, expression, cfg.SQLExpressionCellLimit, cfg.SQLExpressionOutputCellLimit, cfg.SQLExpressionTimeout) } // NeedsVars returns the variable names (refIds) that are dependencies @@ -111,6 +113,7 @@ func (gr *SQLCommand) NeedsVars() []string { func (gr *SQLCommand) Execute(ctx context.Context, now time.Time, vars mathexp.Vars, tracer tracing.Tracer, metrics *metrics.ExprMetrics) (mathexp.Results, error) { _, span := tracer.Start(ctx, "SSE.ExecuteSQL") start := time.Now() + sqlLogger := backend.NewLoggerWith("logger", "expr.sql").FromContext(ctx) tc := int64(0) rsp := mathexp.Results{} @@ -122,6 +125,7 @@ func (gr *SQLCommand) Execute(ctx context.Context, now time.Time, vars mathexp.V statusLabel = "error" span.RecordError(rsp.Error) span.SetStatus(codes.Error, rsp.Error.Error()) + sqlLogger.Error("SQL command execution failed", "error", rsp.Error.Error()) } span.End() @@ -134,7 +138,7 @@ func (gr *SQLCommand) Execute(ctx context.Context, now time.Time, vars mathexp.V for _, ref := range gr.varsToQuery { results, ok := vars[ref] if !ok { - logger.Warn("no results found for", "ref", ref) + sqlLogger.Warn("no results found for", "ref", ref) continue } frames := results.Values.AsDataFrames(ref) @@ -153,16 +157,16 @@ func (gr *SQLCommand) Execute(ctx context.Context, now time.Time, vars mathexp.V return rsp, nil } - logger.Debug("Executing query", "query", gr.query, "frames", len(allFrames)) + sqlLogger.Debug("Executing query", "query", gr.query, "frames", len(allFrames)) db := sql.DB{} frame, err := db.QueryFrames(ctx, tracer, gr.refID, gr.query, allFrames, sql.WithMaxOutputCells(gr.outputLimit), sql.WithTimeout(gr.timeout)) if err != nil { - logger.Error("Failed to query frames", "error", err.Error()) rsp.Error = err return rsp, nil } - logger.Debug("Done Executing query", "query", gr.query, "rows", frame.Rows()) + + sqlLogger.Debug("Done Executing query", "query", gr.query, "rows", frame.Rows()) if frame.Rows() == 0 { rsp.Values = mathexp.Values{ diff --git a/pkg/expr/sql_command_test.go b/pkg/expr/sql_command_test.go index c4ecdc480f6..dd9b274a8cd 100644 --- a/pkg/expr/sql_command_test.go +++ b/pkg/expr/sql_command_test.go @@ -18,7 +18,7 @@ import ( ) func TestNewCommand(t *testing.T) { - cmd, err := NewSQLCommand("a", "", "select a from foo, bar", 0, 0, 0) + cmd, err := NewSQLCommand(t.Context(), "a", "", "select a from foo, bar", 0, 0, 0) if err != nil && strings.Contains(err.Error(), "feature is not enabled") { return } @@ -126,7 +126,7 @@ func TestSQLCommandCellLimits(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - cmd, err := NewSQLCommand("a", "", "select a from foo, bar", tt.limit, 0, 0) + cmd, err := NewSQLCommand(t.Context(), "a", "", "select a from foo, bar", tt.limit, 0, 0) require.NoError(t, err, "Failed to create SQL command") vars := mathexp.Vars{} @@ -154,7 +154,7 @@ func TestSQLCommandMetrics(t *testing.T) { m := metrics.NewTestMetrics() // Create a command - cmd, err := NewSQLCommand("A", "someformat", "select * from foo", 0, 0, 0) + cmd, err := NewSQLCommand(t.Context(), "A", "someformat", "select * from foo", 0, 0, 0) require.NoError(t, err) // Execute successful command diff --git a/pkg/expr/transform.go b/pkg/expr/transform.go index db49522eff1..049bcb117a4 100644 --- a/pkg/expr/transform.go +++ b/pkg/expr/transform.go @@ -88,7 +88,7 @@ func (s *Service) TransformData(ctx context.Context, now time.Time, req *Request // Build the pipeline from the request, checking for ordering issues (e.g. loops) // and parsing graph nodes from the queries. - pipeline, err := s.BuildPipeline(req) + pipeline, err := s.BuildPipeline(ctx, req) if err != nil { return nil, err } diff --git a/pkg/services/ngalert/eval/eval.go b/pkg/services/ngalert/eval/eval.go index fec321ddd11..67aa263b24a 100644 --- a/pkg/services/ngalert/eval/eval.go +++ b/pkg/services/ngalert/eval/eval.go @@ -45,7 +45,7 @@ type expressionExecutor interface { type expressionBuilder interface { expressionExecutor - BuildPipeline(req *expr.Request) (expr.DataPipeline, error) + BuildPipeline(ctx context.Context, req *expr.Request) (expr.DataPipeline, error) } type conditionEvaluator struct { @@ -849,11 +849,11 @@ func (e *evaluatorImpl) Create(ctx EvaluationContext, condition models.Condition if err != nil { return nil, err } - return e.create(condition, req) + return e.create(ctx.Ctx, condition, req) } -func (e *evaluatorImpl) create(condition models.Condition, req *expr.Request) (ConditionEvaluator, error) { - pipeline, err := e.expressionService.BuildPipeline(req) +func (e *evaluatorImpl) create(ctx context.Context, condition models.Condition, req *expr.Request) (ConditionEvaluator, error) { + pipeline, err := e.expressionService.BuildPipeline(ctx, req) if err != nil { return nil, err } diff --git a/pkg/services/ngalert/eval/eval_test.go b/pkg/services/ngalert/eval/eval_test.go index 15a9d464b15..a93ecacd64d 100644 --- a/pkg/services/ngalert/eval/eval_test.go +++ b/pkg/services/ngalert/eval/eval_test.go @@ -1539,7 +1539,7 @@ func TestCreate(t *testing.T) { factory := evaluatorImpl{ expressionService: fakeExpressionService{ - buildHook: func(req *expr.Request) (expr.DataPipeline, error) { + buildHook: func(ctx context.Context, req *expr.Request) (expr.DataPipeline, error) { if request != nil { assert.Fail(t, "BuildPipeline was called twice but should be only once") } @@ -1562,15 +1562,15 @@ func TestCreate(t *testing.T) { type fakeExpressionService struct { hook func(ctx context.Context, now time.Time, pipeline expr.DataPipeline) (*backend.QueryDataResponse, error) - buildHook func(req *expr.Request) (expr.DataPipeline, error) + buildHook func(ctx context.Context, req *expr.Request) (expr.DataPipeline, error) } func (f fakeExpressionService) ExecutePipeline(ctx context.Context, now time.Time, pipeline expr.DataPipeline) (*backend.QueryDataResponse, error) { return f.hook(ctx, now, pipeline) } -func (f fakeExpressionService) BuildPipeline(req *expr.Request) (expr.DataPipeline, error) { - return f.buildHook(req) +func (f fakeExpressionService) BuildPipeline(ctx context.Context, req *expr.Request) (expr.DataPipeline, error) { + return f.buildHook(ctx, req) } type fakeNode struct { diff --git a/pkg/services/ngalert/eval/validate.go b/pkg/services/ngalert/eval/validate.go index 37c58385102..573426f06c1 100644 --- a/pkg/services/ngalert/eval/validate.go +++ b/pkg/services/ngalert/eval/validate.go @@ -50,7 +50,7 @@ func (e *ConditionValidator) Validate(ctx EvaluationContext, condition models.Co case expr.TypeCMDNode: } } - pipeline, err := e.expressionService.BuildPipeline(req) + pipeline, err := e.expressionService.BuildPipeline(ctx.Ctx, req) if err != nil { return err }