Compare commits

...

1 Commits

Author SHA1 Message Date
Alexander Akhmetov 27fe7f642e Alerting: Fix writing recording rules with query offset 2026-01-08 20:08:30 +01:00
4 changed files with 239 additions and 2 deletions
+19
View File
@@ -849,6 +849,25 @@ func (alertRule *AlertRule) Type() RuleType {
return RuleTypeAlerting
}
// GetRecordingRuleEvaluationOffset reports how far back in time a recording rule's
// source query is shifted. The value is the RelativeTimeRange.To of the first query
// whose RefID equals Record.From.
//
// It returns 0 when the rule has no Record (i.e. it is not a recording rule), when no
// query matches Record.From, or when the matching query is an expression.
func (alertRule *AlertRule) GetRecordingRuleEvaluationOffset() time.Duration {
	record := alertRule.Record
	if record == nil {
		return 0
	}

	for _, candidate := range alertRule.Data {
		if candidate.RefID != record.From {
			continue
		}

		// The error from IsExpression is intentionally discarded: only the boolean
		// result decides whether the query's own time range is used.
		fromExpression, _ := candidate.IsExpression()
		if fromExpression {
			return 0
		}
		return time.Duration(candidate.RelativeTimeRange.To)
	}

	return 0
}
// Copy creates and returns a deep copy of the AlertRule instance, duplicating all fields and nested data structures.
func (alertRule *AlertRule) Copy() *AlertRule {
if alertRule == nil {
@@ -18,6 +18,7 @@ import (
"go.yaml.in/yaml/v3"
"golang.org/x/exp/maps"
"github.com/grafana/grafana/pkg/expr"
"github.com/grafana/grafana/pkg/setting"
"github.com/grafana/grafana/pkg/util"
"github.com/grafana/grafana/pkg/util/cmputil"
@@ -1448,3 +1449,107 @@ func TestWithoutPrivateLabels(t *testing.T) {
})
}
}
func TestAlertRule_GetRecordingRuleEvaluationOffset(t *testing.T) {
tests := []struct {
name string
rule *AlertRule
expected time.Duration
}{
{
name: "recording rule with offset",
rule: &AlertRule{
Record: &Record{
From: "A",
Metric: "test_metric",
TargetDatasourceUID: "prometheus-uid",
},
Data: []AlertQuery{
{
RefID: "A",
DatasourceUID: "prometheus-uid",
RelativeTimeRange: RelativeTimeRange{
From: Duration(10 * time.Minute),
To: Duration(5 * time.Minute),
},
},
},
},
expected: 5 * time.Minute,
},
{
name: "recording rule without offset",
rule: &AlertRule{
Record: &Record{
From: "A",
Metric: "test_metric",
TargetDatasourceUID: "prometheus-uid",
},
Data: []AlertQuery{
{
RefID: "A",
DatasourceUID: "prometheus-uid",
RelativeTimeRange: RelativeTimeRange{
From: Duration(10 * time.Minute),
To: Duration(0),
},
},
},
},
expected: 0,
},
{
name: "recording rule with expression source",
rule: &AlertRule{
Record: &Record{
From: "B",
Metric: "test_metric",
TargetDatasourceUID: "prometheus-uid",
},
Data: []AlertQuery{
{
RefID: "A",
DatasourceUID: "prometheus-uid",
RelativeTimeRange: RelativeTimeRange{
From: Duration(10 * time.Minute),
To: Duration(5 * time.Minute),
},
},
{
RefID: "B",
DatasourceUID: expr.DatasourceUID,
RelativeTimeRange: RelativeTimeRange{
From: Duration(10 * time.Minute),
To: Duration(5 * time.Minute),
},
},
},
},
expected: 0,
},
{
name: "not a recording rule",
rule: &AlertRule{
Record: nil,
Data: []AlertQuery{
{
RefID: "A",
DatasourceUID: "prometheus-uid",
RelativeTimeRange: RelativeTimeRange{
From: Duration(10 * time.Minute),
To: Duration(5 * time.Minute),
},
},
},
},
expected: 0,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got := tt.rule.GetRecordingRuleEvaluationOffset()
require.Equal(t, tt.expected, got)
})
}
}
@@ -298,8 +298,15 @@ func (r *recordingRule) tryEvaluation(ctx context.Context, ev *Evaluation, logge
}
filteredLabels := ngmodels.WithoutPrivateLabels(ev.rule.Labels)
// Calculate write timestamp: scheduledAt - evaluation offset
// This matches Prometheus behavior where the metric timestamp reflects
// when the data was actually queried, not when the evaluation was scheduled.
evaluationOffset := ev.rule.GetRecordingRuleEvaluationOffset()
writeTimestamp := ev.scheduledAt.Add(-evaluationOffset)
writeStart := r.clock.Now()
err = r.writer.WriteDatasource(ctx, ev.rule.Record.TargetDatasourceUID, ev.rule.Record.Metric, ev.scheduledAt, frames, ev.rule.OrgID, filteredLabels)
err = r.writer.WriteDatasource(ctx, ev.rule.Record.TargetDatasourceUID, ev.rule.Record.Metric, writeTimestamp, frames, ev.rule.OrgID, filteredLabels)
writeDur := r.clock.Now().Sub(writeStart)
if err != nil {
@@ -308,7 +315,7 @@ func (r *recordingRule) tryEvaluation(ctx context.Context, ev *Evaluation, logge
return fmt.Errorf("remote write failed: %w", err)
}
logger.Debug("Metrics written", "duration", writeDur)
logger.Debug("Metrics written", "duration", writeDur, "evaluationOffset", evaluationOffset, "writeTimestamp", writeTimestamp)
span.AddEvent("metrics written", trace.WithAttributes(
attribute.Int64("frames", int64(len(frames))),
))
@@ -17,11 +17,13 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/testutil"
"github.com/prometheus/prometheus/prompb"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"go.uber.org/atomic"
"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana-plugin-sdk-go/backend/httpclient"
"github.com/grafana/grafana-plugin-sdk-go/data"
"github.com/grafana/grafana/pkg/apimachinery/identity"
"github.com/grafana/grafana/pkg/components/simplejson"
@@ -29,6 +31,8 @@ import (
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/services/datasources"
dsfakes "github.com/grafana/grafana/pkg/services/datasources/fakes"
"github.com/grafana/grafana/pkg/services/ngalert/eval"
"github.com/grafana/grafana/pkg/services/ngalert/eval/eval_mocks"
"github.com/grafana/grafana/pkg/services/ngalert/metrics"
models "github.com/grafana/grafana/pkg/services/ngalert/models"
"github.com/grafana/grafana/pkg/services/ngalert/writer"
@@ -832,6 +836,74 @@ func testRecordingRule_Integration(t *testing.T, writeTarget *writer.TestRemoteW
}
})
})
// testWriteTimestamp runs one evaluation of a recording rule whose source query "A"
// has RelativeTimeRange.To set to evaluationOffset, then asserts that the first sample
// in the captured remote-write request is timestamped scheduledAt - evaluationOffset.
testWriteTimestamp := func(t *testing.T, evaluationOffset time.Duration) {
// Presumably clears any request captured by an earlier subtest — confirm against
// writer.TestRemoteWriteTarget.
writeTarget.Reset()
// Fixed scheduling instant so the expected write timestamp is deterministic.
scheduledAt := time.Date(2024, 1, 15, 12, 0, 0, 0, time.UTC)
// Evaluator mock that answers with a single frame under refID "A".
mockEvalFactory := createMockEvaluatorFactory(t, "A")
testRuleStore := newFakeRulesStore()
testReg := prometheus.NewPedanticRegistry()
testSch := setupScheduler(t, testRuleStore, nil, testReg, nil, mockEvalFactory, nil, withSchedulerClock(clk))
testSch.recordingWriter = writer
// Build a recording rule that records from query "A"; the query's
// RelativeTimeRange.To carries the evaluation offset under test.
rule := gen.GenerateRef()
rule.Record.TargetDatasourceUID = dsUID
rule.Record.From = "A"
rule.Data = []models.AlertQuery{
{
RefID: "A",
DatasourceUID: "prometheus-uid",
Model: json.RawMessage(`{"expr": "up"}`),
RelativeTimeRange: models.RelativeTimeRange{
From: models.Duration(10 * time.Minute),
To: models.Duration(evaluationOffset),
},
},
}
testRuleStore.PutRule(context.Background(), rule)
folderTitle := testRuleStore.getNamespaceTitle(rule.NamespaceUID)
ruleFactory := ruleFactoryFromScheduler(testSch)
// NOTE(review): the factory receives an empty folderTitle while the Evaluation below
// receives the looked-up folderTitle — confirm this difference is intentional.
process := ruleFactory.new(context.Background(), ruleWithFolder{rule: rule, folderTitle: ""})
// The hook forwards its time argument on an unbuffered channel so the test can
// wait for it. Note that the hook's parameter t shadows the outer *testing.T
// inside the hook body.
evalDoneChan := make(chan time.Time)
process.(*recordingRule).evalAppliedHook = func(_ models.AlertRuleKey, t time.Time) {
evalDoneChan <- t
}
// Run the rule routine in the background; its return value is ignored.
go func() {
_ = process.Run()
}()
process.Eval(&Evaluation{
scheduledAt: scheduledAt,
rule: rule,
folderTitle: folderTitle,
})
// Wait for the hook to fire before stopping the rule routine and inspecting
// what was written.
_ = waitForTimeChannel(t, evalDoneChan)
process.Stop(nil)
require.NotEmpty(t, writeTarget.LastRequestBody)
writeRequest := decodePrometheusWriteRequest(t, writeTarget.LastRequestBody)
require.NotEmpty(t, writeRequest.Timeseries)
// The first sample's timestamp must equal scheduledAt - offset, compared at
// millisecond precision.
expectedWriteTimestamp := scheduledAt.Add(-evaluationOffset)
actualTimestamp, found := getFirstSampleTimestamp(t, writeRequest)
require.True(t, found)
require.Equal(t, expectedWriteTimestamp.UnixMilli(), actualTimestamp.UnixMilli())
}
t.Run("write timestamp without evaluation offset", func(t *testing.T) {
testWriteTimestamp(t, 0)
})
t.Run("write timestamp with evaluation offset", func(t *testing.T) {
testWriteTimestamp(t, 5*time.Minute)
})
}
func withQueryForHealth(health string) models.AlertRuleMutator {
@@ -922,6 +994,40 @@ func decodePrometheusWriteRequest(t *testing.T, data string) *prompb.WriteReques
return &writeReq
}
// getFirstSampleTimestamp returns the timestamp of the first sample found in req,
// scanning the time series in order and skipping those without samples. The sample's
// millisecond timestamp is converted with time.UnixMilli. The boolean is false (and the
// time is the zero value) when no series contains a sample.
func getFirstSampleTimestamp(t *testing.T, req *prompb.WriteRequest) (time.Time, bool) {
	t.Helper()

	for i := range req.Timeseries {
		samples := req.Timeseries[i].Samples
		if len(samples) == 0 {
			continue
		}
		return time.UnixMilli(samples[0].Timestamp), true
	}

	return time.Time{}, false
}
// createMockEvaluatorFactory returns an evaluator factory backed by a mock whose
// EvaluateRaw, for any arguments, returns a nil error and a response holding one frame
// under refID. The frame is named "test", has a single float64 field "value" containing
// 42, and is tagged as numeric-wide.
func createMockEvaluatorFactory(t *testing.T, refID string) eval.EvaluatorFactory {
	t.Helper()

	// Frame metadata is set explicitly so the writer can process the frame.
	valueField := data.NewField("value", nil, []float64{42})
	frame := data.NewFrame("test", valueField)
	frame.SetMeta(&data.FrameMeta{Type: data.FrameTypeNumericWide})

	response := &backend.QueryDataResponse{
		Responses: map[string]backend.DataResponse{
			refID: {Frames: data.Frames{frame}},
		},
	}

	evaluator := &eval_mocks.ConditionEvaluatorMock{}
	evaluator.EXPECT().EvaluateRaw(mock.Anything, mock.Anything).Return(response, nil)

	return eval_mocks.NewEvaluatorFactory(evaluator)
}
func getLabel(req *prompb.WriteRequest, labelName string) (string, bool) {
for _, ts := range req.Timeseries {
for _, label := range ts.Labels {