Compare commits
1 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 27fe7f642e |
@@ -849,6 +849,25 @@ func (alertRule *AlertRule) Type() RuleType {
|
||||
return RuleTypeAlerting
|
||||
}
|
||||
|
||||
// GetRecordingRuleEvaluationOffset returns the evaluation offset for a recording rule
|
||||
// derived from the source query's RelativeTimeRange.To.
|
||||
func (alertRule *AlertRule) GetRecordingRuleEvaluationOffset() time.Duration {
|
||||
if alertRule.Record == nil {
|
||||
return 0
|
||||
}
|
||||
|
||||
for _, query := range alertRule.Data {
|
||||
if query.RefID == alertRule.Record.From {
|
||||
if isExpr, _ := query.IsExpression(); isExpr {
|
||||
return 0
|
||||
}
|
||||
return time.Duration(query.RelativeTimeRange.To)
|
||||
}
|
||||
}
|
||||
|
||||
return 0
|
||||
}
|
||||
|
||||
// Copy creates and returns a deep copy of the AlertRule instance, duplicating all fields and nested data structures.
|
||||
func (alertRule *AlertRule) Copy() *AlertRule {
|
||||
if alertRule == nil {
|
||||
|
||||
@@ -18,6 +18,7 @@ import (
|
||||
"go.yaml.in/yaml/v3"
|
||||
"golang.org/x/exp/maps"
|
||||
|
||||
"github.com/grafana/grafana/pkg/expr"
|
||||
"github.com/grafana/grafana/pkg/setting"
|
||||
"github.com/grafana/grafana/pkg/util"
|
||||
"github.com/grafana/grafana/pkg/util/cmputil"
|
||||
@@ -1448,3 +1449,107 @@ func TestWithoutPrivateLabels(t *testing.T) {
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestAlertRule_GetRecordingRuleEvaluationOffset(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
rule *AlertRule
|
||||
expected time.Duration
|
||||
}{
|
||||
{
|
||||
name: "recording rule with offset",
|
||||
rule: &AlertRule{
|
||||
Record: &Record{
|
||||
From: "A",
|
||||
Metric: "test_metric",
|
||||
TargetDatasourceUID: "prometheus-uid",
|
||||
},
|
||||
Data: []AlertQuery{
|
||||
{
|
||||
RefID: "A",
|
||||
DatasourceUID: "prometheus-uid",
|
||||
RelativeTimeRange: RelativeTimeRange{
|
||||
From: Duration(10 * time.Minute),
|
||||
To: Duration(5 * time.Minute),
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
expected: 5 * time.Minute,
|
||||
},
|
||||
{
|
||||
name: "recording rule without offset",
|
||||
rule: &AlertRule{
|
||||
Record: &Record{
|
||||
From: "A",
|
||||
Metric: "test_metric",
|
||||
TargetDatasourceUID: "prometheus-uid",
|
||||
},
|
||||
Data: []AlertQuery{
|
||||
{
|
||||
RefID: "A",
|
||||
DatasourceUID: "prometheus-uid",
|
||||
RelativeTimeRange: RelativeTimeRange{
|
||||
From: Duration(10 * time.Minute),
|
||||
To: Duration(0),
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
expected: 0,
|
||||
},
|
||||
{
|
||||
name: "recording rule with expression source",
|
||||
rule: &AlertRule{
|
||||
Record: &Record{
|
||||
From: "B",
|
||||
Metric: "test_metric",
|
||||
TargetDatasourceUID: "prometheus-uid",
|
||||
},
|
||||
Data: []AlertQuery{
|
||||
{
|
||||
RefID: "A",
|
||||
DatasourceUID: "prometheus-uid",
|
||||
RelativeTimeRange: RelativeTimeRange{
|
||||
From: Duration(10 * time.Minute),
|
||||
To: Duration(5 * time.Minute),
|
||||
},
|
||||
},
|
||||
{
|
||||
RefID: "B",
|
||||
DatasourceUID: expr.DatasourceUID,
|
||||
RelativeTimeRange: RelativeTimeRange{
|
||||
From: Duration(10 * time.Minute),
|
||||
To: Duration(5 * time.Minute),
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
expected: 0,
|
||||
},
|
||||
{
|
||||
name: "not a recording rule",
|
||||
rule: &AlertRule{
|
||||
Record: nil,
|
||||
Data: []AlertQuery{
|
||||
{
|
||||
RefID: "A",
|
||||
DatasourceUID: "prometheus-uid",
|
||||
RelativeTimeRange: RelativeTimeRange{
|
||||
From: Duration(10 * time.Minute),
|
||||
To: Duration(5 * time.Minute),
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
expected: 0,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
got := tt.rule.GetRecordingRuleEvaluationOffset()
|
||||
require.Equal(t, tt.expected, got)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@@ -298,8 +298,15 @@ func (r *recordingRule) tryEvaluation(ctx context.Context, ev *Evaluation, logge
|
||||
}
|
||||
|
||||
filteredLabels := ngmodels.WithoutPrivateLabels(ev.rule.Labels)
|
||||
|
||||
// Calculate write timestamp: scheduledAt - evaluation offset
|
||||
// This matches Prometheus behavior where the metric timestamp reflects
|
||||
// when the data was actually queried, not when the evaluation was scheduled.
|
||||
evaluationOffset := ev.rule.GetRecordingRuleEvaluationOffset()
|
||||
writeTimestamp := ev.scheduledAt.Add(-evaluationOffset)
|
||||
|
||||
writeStart := r.clock.Now()
|
||||
err = r.writer.WriteDatasource(ctx, ev.rule.Record.TargetDatasourceUID, ev.rule.Record.Metric, ev.scheduledAt, frames, ev.rule.OrgID, filteredLabels)
|
||||
err = r.writer.WriteDatasource(ctx, ev.rule.Record.TargetDatasourceUID, ev.rule.Record.Metric, writeTimestamp, frames, ev.rule.OrgID, filteredLabels)
|
||||
writeDur := r.clock.Now().Sub(writeStart)
|
||||
|
||||
if err != nil {
|
||||
@@ -308,7 +315,7 @@ func (r *recordingRule) tryEvaluation(ctx context.Context, ev *Evaluation, logge
|
||||
return fmt.Errorf("remote write failed: %w", err)
|
||||
}
|
||||
|
||||
logger.Debug("Metrics written", "duration", writeDur)
|
||||
logger.Debug("Metrics written", "duration", writeDur, "evaluationOffset", evaluationOffset, "writeTimestamp", writeTimestamp)
|
||||
span.AddEvent("metrics written", trace.WithAttributes(
|
||||
attribute.Int64("frames", int64(len(frames))),
|
||||
))
|
||||
|
||||
@@ -17,11 +17,13 @@ import (
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
"github.com/prometheus/client_golang/prometheus/testutil"
|
||||
"github.com/prometheus/prometheus/prompb"
|
||||
"github.com/stretchr/testify/mock"
|
||||
"github.com/stretchr/testify/require"
|
||||
"go.uber.org/atomic"
|
||||
|
||||
"github.com/grafana/grafana-plugin-sdk-go/backend"
|
||||
"github.com/grafana/grafana-plugin-sdk-go/backend/httpclient"
|
||||
"github.com/grafana/grafana-plugin-sdk-go/data"
|
||||
|
||||
"github.com/grafana/grafana/pkg/apimachinery/identity"
|
||||
"github.com/grafana/grafana/pkg/components/simplejson"
|
||||
@@ -29,6 +31,8 @@ import (
|
||||
"github.com/grafana/grafana/pkg/infra/log"
|
||||
"github.com/grafana/grafana/pkg/services/datasources"
|
||||
dsfakes "github.com/grafana/grafana/pkg/services/datasources/fakes"
|
||||
"github.com/grafana/grafana/pkg/services/ngalert/eval"
|
||||
"github.com/grafana/grafana/pkg/services/ngalert/eval/eval_mocks"
|
||||
"github.com/grafana/grafana/pkg/services/ngalert/metrics"
|
||||
models "github.com/grafana/grafana/pkg/services/ngalert/models"
|
||||
"github.com/grafana/grafana/pkg/services/ngalert/writer"
|
||||
@@ -832,6 +836,74 @@ func testRecordingRule_Integration(t *testing.T, writeTarget *writer.TestRemoteW
|
||||
}
|
||||
})
|
||||
})
|
||||
|
||||
testWriteTimestamp := func(t *testing.T, evaluationOffset time.Duration) {
|
||||
writeTarget.Reset()
|
||||
scheduledAt := time.Date(2024, 1, 15, 12, 0, 0, 0, time.UTC)
|
||||
|
||||
mockEvalFactory := createMockEvaluatorFactory(t, "A")
|
||||
|
||||
testRuleStore := newFakeRulesStore()
|
||||
testReg := prometheus.NewPedanticRegistry()
|
||||
testSch := setupScheduler(t, testRuleStore, nil, testReg, nil, mockEvalFactory, nil, withSchedulerClock(clk))
|
||||
testSch.recordingWriter = writer
|
||||
|
||||
// Create rule with evaluation offset
|
||||
rule := gen.GenerateRef()
|
||||
rule.Record.TargetDatasourceUID = dsUID
|
||||
rule.Record.From = "A"
|
||||
rule.Data = []models.AlertQuery{
|
||||
{
|
||||
RefID: "A",
|
||||
DatasourceUID: "prometheus-uid",
|
||||
Model: json.RawMessage(`{"expr": "up"}`),
|
||||
RelativeTimeRange: models.RelativeTimeRange{
|
||||
From: models.Duration(10 * time.Minute),
|
||||
To: models.Duration(evaluationOffset),
|
||||
},
|
||||
},
|
||||
}
|
||||
testRuleStore.PutRule(context.Background(), rule)
|
||||
|
||||
folderTitle := testRuleStore.getNamespaceTitle(rule.NamespaceUID)
|
||||
ruleFactory := ruleFactoryFromScheduler(testSch)
|
||||
|
||||
process := ruleFactory.new(context.Background(), ruleWithFolder{rule: rule, folderTitle: ""})
|
||||
evalDoneChan := make(chan time.Time)
|
||||
process.(*recordingRule).evalAppliedHook = func(_ models.AlertRuleKey, t time.Time) {
|
||||
evalDoneChan <- t
|
||||
}
|
||||
|
||||
go func() {
|
||||
_ = process.Run()
|
||||
}()
|
||||
|
||||
process.Eval(&Evaluation{
|
||||
scheduledAt: scheduledAt,
|
||||
rule: rule,
|
||||
folderTitle: folderTitle,
|
||||
})
|
||||
_ = waitForTimeChannel(t, evalDoneChan)
|
||||
process.Stop(nil)
|
||||
|
||||
require.NotEmpty(t, writeTarget.LastRequestBody)
|
||||
writeRequest := decodePrometheusWriteRequest(t, writeTarget.LastRequestBody)
|
||||
require.NotEmpty(t, writeRequest.Timeseries)
|
||||
|
||||
// Verify timestamp on first sample equals scheduledAt - offset
|
||||
expectedWriteTimestamp := scheduledAt.Add(-evaluationOffset)
|
||||
actualTimestamp, found := getFirstSampleTimestamp(t, writeRequest)
|
||||
require.True(t, found)
|
||||
require.Equal(t, expectedWriteTimestamp.UnixMilli(), actualTimestamp.UnixMilli())
|
||||
}
|
||||
|
||||
t.Run("write timestamp without evaluation offset", func(t *testing.T) {
|
||||
testWriteTimestamp(t, 0)
|
||||
})
|
||||
|
||||
t.Run("write timestamp with evaluation offset", func(t *testing.T) {
|
||||
testWriteTimestamp(t, 5*time.Minute)
|
||||
})
|
||||
}
|
||||
|
||||
func withQueryForHealth(health string) models.AlertRuleMutator {
|
||||
@@ -922,6 +994,40 @@ func decodePrometheusWriteRequest(t *testing.T, data string) *prompb.WriteReques
|
||||
return &writeReq
|
||||
}
|
||||
|
||||
func getFirstSampleTimestamp(t *testing.T, req *prompb.WriteRequest) (time.Time, bool) {
|
||||
t.Helper()
|
||||
|
||||
for _, ts := range req.Timeseries {
|
||||
if len(ts.Samples) > 0 {
|
||||
return time.UnixMilli(ts.Samples[0].Timestamp), true
|
||||
}
|
||||
}
|
||||
return time.Time{}, false
|
||||
}
|
||||
|
||||
func createMockEvaluatorFactory(t *testing.T, refID string) eval.EvaluatorFactory {
|
||||
t.Helper()
|
||||
|
||||
// Create a frame with proper metadata for the writer to process
|
||||
frame := data.NewFrame("test",
|
||||
data.NewField("value", nil, []float64{42}),
|
||||
)
|
||||
frame.SetMeta(&data.FrameMeta{
|
||||
Type: data.FrameTypeNumericWide,
|
||||
})
|
||||
|
||||
mockEval := &eval_mocks.ConditionEvaluatorMock{}
|
||||
mockEval.EXPECT().EvaluateRaw(mock.Anything, mock.Anything).Return(&backend.QueryDataResponse{
|
||||
Responses: map[string]backend.DataResponse{
|
||||
refID: {
|
||||
Frames: data.Frames{frame},
|
||||
},
|
||||
},
|
||||
}, nil)
|
||||
|
||||
return eval_mocks.NewEvaluatorFactory(mockEval)
|
||||
}
|
||||
|
||||
func getLabel(req *prompb.WriteRequest, labelName string) (string, bool) {
|
||||
for _, ts := range req.Timeseries {
|
||||
for _, label := range ts.Labels {
|
||||
|
||||
Reference in New Issue
Block a user