Alerting: Notification history (#107644)
* Add unified_alerting.notification_history to ini files
* Parse notification history settings
* Move Loki client to a separate package
* Loki client: add params for metrics and traces
* add NotificationHistorian
* rm writeDuration
* remove RangeQuery stuff
* wip
* wip
* wip
* wip
* pass notification historian in tests
* unify loki settings
* unify loki settings
* add test
* update grafana/alerting
* make update-workspace
* add feature toggle
* fix configureNotificationHistorian
* Revert "add feature toggle"
This reverts commit de7af8f7
* add feature toggle
* more tests
* RuleUID
* fix metrics test
* met.Info.Set(0)
This commit is contained in:
@@ -8,6 +8,7 @@ import (
|
||||
"sort"
|
||||
"time"
|
||||
|
||||
"github.com/grafana/grafana/pkg/services/ngalert/lokiclient"
|
||||
"golang.org/x/exp/constraints"
|
||||
|
||||
"github.com/grafana/grafana/pkg/components/simplejson"
|
||||
@@ -44,7 +45,7 @@ type RuleStore interface {
|
||||
}
|
||||
|
||||
type lokiQueryClient interface {
|
||||
RangeQuery(ctx context.Context, query string, start, end, limit int64) (historian.QueryRes, error)
|
||||
RangeQuery(ctx context.Context, query string, start, end, limit int64) (lokiclient.QueryRes, error)
|
||||
MaxQuerySize() int
|
||||
}
|
||||
|
||||
@@ -60,14 +61,15 @@ func NewLokiHistorianStore(cfg setting.UnifiedAlertingStateHistorySettings, db d
|
||||
if !useStore(cfg) {
|
||||
return nil
|
||||
}
|
||||
lokiCfg, err := historian.NewLokiConfig(cfg)
|
||||
lokiCfg, err := lokiclient.NewLokiConfig(cfg.LokiSettings)
|
||||
if err != nil {
|
||||
// this config error is already handled elsewhere
|
||||
return nil
|
||||
}
|
||||
|
||||
metrics := ngmetrics.NewHistorianMetrics(reg, subsystem)
|
||||
return &LokiHistorianStore{
|
||||
client: historian.NewLokiClient(lokiCfg, historian.NewRequester(), ngmetrics.NewHistorianMetrics(reg, subsystem), log, tracer),
|
||||
client: lokiclient.NewLokiClient(lokiCfg, lokiclient.NewRequester(), metrics.BytesWritten, metrics.WriteDuration, log, tracer, historian.LokiClientSpanName),
|
||||
db: db,
|
||||
log: log,
|
||||
ruleStore: ruleStore,
|
||||
@@ -142,7 +144,7 @@ func (r *LokiHistorianStore) Get(ctx context.Context, query annotations.ItemQuer
|
||||
return items, err
|
||||
}
|
||||
|
||||
func (r *LokiHistorianStore) annotationsFromStream(stream historian.Stream, ac accesscontrol.AccessResources) []*annotations.ItemDTO {
|
||||
func (r *LokiHistorianStore) annotationsFromStream(stream lokiclient.Stream, ac accesscontrol.AccessResources) []*annotations.ItemDTO {
|
||||
items := make([]*annotations.ItemDTO, 0, len(stream.Values))
|
||||
for _, sample := range stream.Values {
|
||||
entry := historian.LokiEntry{}
|
||||
|
||||
@@ -10,6 +10,7 @@ import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/grafana/grafana/pkg/services/ngalert/lokiclient"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
"github.com/stretchr/testify/require"
|
||||
"golang.org/x/exp/maps"
|
||||
@@ -84,7 +85,7 @@ func TestIntegrationAlertStateHistoryStore(t *testing.T) {
|
||||
t.Run("can query history by alert id", func(t *testing.T) {
|
||||
rule := dashboardRules[dashboard1.UID][0]
|
||||
|
||||
fakeLokiClient.rangeQueryRes = []historian.Stream{
|
||||
fakeLokiClient.rangeQueryRes = []lokiclient.Stream{
|
||||
historian.StatesToStream(ruleMetaFromRule(t, rule), transitions, map[string]string{}, log.NewNopLogger()),
|
||||
}
|
||||
|
||||
@@ -111,7 +112,7 @@ func TestIntegrationAlertStateHistoryStore(t *testing.T) {
|
||||
t.Run("can query history by alert uid", func(t *testing.T) {
|
||||
rule := dashboardRules[dashboard1.UID][0]
|
||||
|
||||
fakeLokiClient.rangeQueryRes = []historian.Stream{
|
||||
fakeLokiClient.rangeQueryRes = []lokiclient.Stream{
|
||||
historian.StatesToStream(ruleMetaFromRule(t, rule), transitions, map[string]string{}, log.NewNopLogger()),
|
||||
}
|
||||
|
||||
@@ -186,7 +187,7 @@ func TestIntegrationAlertStateHistoryStore(t *testing.T) {
|
||||
})
|
||||
|
||||
t.Run("can query history by dashboard id", func(t *testing.T) {
|
||||
fakeLokiClient.rangeQueryRes = []historian.Stream{
|
||||
fakeLokiClient.rangeQueryRes = []lokiclient.Stream{
|
||||
historian.StatesToStream(ruleMetaFromRule(t, dashboardRules[dashboard1.UID][0]), transitions, map[string]string{}, log.NewNopLogger()),
|
||||
historian.StatesToStream(ruleMetaFromRule(t, dashboardRules[dashboard1.UID][1]), transitions, map[string]string{}, log.NewNopLogger()),
|
||||
}
|
||||
@@ -212,7 +213,7 @@ func TestIntegrationAlertStateHistoryStore(t *testing.T) {
|
||||
})
|
||||
|
||||
t.Run("should return empty results when type is annotation", func(t *testing.T) {
|
||||
fakeLokiClient.rangeQueryRes = []historian.Stream{
|
||||
fakeLokiClient.rangeQueryRes = []lokiclient.Stream{
|
||||
historian.StatesToStream(ruleMetaFromRule(t, dashboardRules[dashboard1.UID][0]), transitions, map[string]string{}, log.NewNopLogger()),
|
||||
historian.StatesToStream(ruleMetaFromRule(t, dashboardRules[dashboard1.UID][1]), transitions, map[string]string{}, log.NewNopLogger()),
|
||||
}
|
||||
@@ -236,7 +237,7 @@ func TestIntegrationAlertStateHistoryStore(t *testing.T) {
|
||||
})
|
||||
|
||||
t.Run("should return empty results when history is outside time range", func(t *testing.T) {
|
||||
fakeLokiClient.rangeQueryRes = []historian.Stream{
|
||||
fakeLokiClient.rangeQueryRes = []lokiclient.Stream{
|
||||
historian.StatesToStream(ruleMetaFromRule(t, dashboardRules[dashboard1.UID][0]), transitions, map[string]string{}, log.NewNopLogger()),
|
||||
historian.StatesToStream(ruleMetaFromRule(t, dashboardRules[dashboard1.UID][1]), transitions, map[string]string{}, log.NewNopLogger()),
|
||||
}
|
||||
@@ -262,7 +263,7 @@ func TestIntegrationAlertStateHistoryStore(t *testing.T) {
|
||||
})
|
||||
|
||||
t.Run("should return partial results when history is partly outside clamped time range", func(t *testing.T) {
|
||||
fakeLokiClient.rangeQueryRes = []historian.Stream{
|
||||
fakeLokiClient.rangeQueryRes = []lokiclient.Stream{
|
||||
historian.StatesToStream(ruleMetaFromRule(t, dashboardRules[dashboard1.UID][0]), transitions, map[string]string{}, log.NewNopLogger()),
|
||||
historian.StatesToStream(ruleMetaFromRule(t, dashboardRules[dashboard1.UID][1]), transitions, map[string]string{}, log.NewNopLogger()),
|
||||
}
|
||||
@@ -295,7 +296,7 @@ func TestIntegrationAlertStateHistoryStore(t *testing.T) {
|
||||
})
|
||||
|
||||
t.Run("should sort history by time and be able to query by dashboard uid", func(t *testing.T) {
|
||||
fakeLokiClient.rangeQueryRes = []historian.Stream{
|
||||
fakeLokiClient.rangeQueryRes = []lokiclient.Stream{
|
||||
historian.StatesToStream(ruleMetaFromRule(t, dashboardRules[dashboard1.UID][0]), transitions, map[string]string{}, log.NewNopLogger()),
|
||||
historian.StatesToStream(ruleMetaFromRule(t, dashboardRules[dashboard1.UID][1]), transitions, map[string]string{}, log.NewNopLogger()),
|
||||
}
|
||||
@@ -329,7 +330,7 @@ func TestIntegrationAlertStateHistoryStore(t *testing.T) {
|
||||
})
|
||||
|
||||
t.Run("should return nothing if query is for tags only", func(t *testing.T) {
|
||||
fakeLokiClient.rangeQueryRes = []historian.Stream{
|
||||
fakeLokiClient.rangeQueryRes = []lokiclient.Stream{
|
||||
historian.StatesToStream(ruleMetaFromRule(t, dashboardRules[dashboard1.UID][0]), transitions, map[string]string{}, log.NewNopLogger()),
|
||||
historian.StatesToStream(ruleMetaFromRule(t, dashboardRules[dashboard1.UID][1]), transitions, map[string]string{}, log.NewNopLogger()),
|
||||
}
|
||||
@@ -360,13 +361,13 @@ func TestIntegrationAlertStateHistoryStore(t *testing.T) {
|
||||
store := createTestLokiStore(t, sql, fakeLokiClient)
|
||||
|
||||
t.Run("should return empty list when no streams", func(t *testing.T) {
|
||||
items := store.annotationsFromStream(historian.Stream{}, annotation_ac.AccessResources{})
|
||||
items := store.annotationsFromStream(lokiclient.Stream{}, annotation_ac.AccessResources{})
|
||||
require.Empty(t, items)
|
||||
})
|
||||
|
||||
t.Run("should return empty list when no entries", func(t *testing.T) {
|
||||
items := store.annotationsFromStream(historian.Stream{
|
||||
Values: []historian.Sample{},
|
||||
items := store.annotationsFromStream(lokiclient.Stream{
|
||||
Values: []lokiclient.Sample{},
|
||||
}, annotation_ac.AccessResources{})
|
||||
require.Empty(t, items)
|
||||
})
|
||||
@@ -419,7 +420,7 @@ func TestIntegrationAlertStateHistoryStore(t *testing.T) {
|
||||
rule = createAlertRule(t, sql, "Test rule", gen)
|
||||
stream2 := historian.StatesToStream(ruleMetaFromRule(t, rule), transitions, map[string]string{}, log.NewNopLogger())
|
||||
|
||||
stream := historian.Stream{
|
||||
stream := lokiclient.Stream{
|
||||
Values: append(stream1.Values, stream2.Values...),
|
||||
Stream: stream1.Stream,
|
||||
}
|
||||
@@ -450,7 +451,7 @@ func TestIntegrationAlertStateHistoryStore(t *testing.T) {
|
||||
rule.DashboardUID = nil
|
||||
stream2 := historian.StatesToStream(ruleMetaFromRule(t, rule), transitions, map[string]string{}, log.NewNopLogger())
|
||||
|
||||
stream := historian.Stream{
|
||||
stream := lokiclient.Stream{
|
||||
Values: append(stream1.Values, stream2.Values...),
|
||||
Stream: stream1.Stream,
|
||||
}
|
||||
@@ -811,23 +812,23 @@ func compareAnnotationItem(t *testing.T, expected, actual *annotations.ItemDTO)
|
||||
|
||||
type FakeLokiClient struct {
|
||||
client client.Requester
|
||||
cfg historian.LokiConfig
|
||||
cfg lokiclient.LokiConfig
|
||||
metrics *metrics.Historian
|
||||
log log.Logger
|
||||
rangeQueryRes []historian.Stream
|
||||
rangeQueryRes []lokiclient.Stream
|
||||
}
|
||||
|
||||
func NewFakeLokiClient() *FakeLokiClient {
|
||||
url, _ := url.Parse("http://some.url")
|
||||
req := historian.NewFakeRequester()
|
||||
req := lokiclient.NewFakeRequester()
|
||||
metrics := metrics.NewHistorianMetrics(prometheus.NewRegistry(), "annotations_test")
|
||||
|
||||
return &FakeLokiClient{
|
||||
client: client.NewTimedClient(req, metrics.WriteDuration),
|
||||
cfg: historian.LokiConfig{
|
||||
cfg: lokiclient.LokiConfig{
|
||||
WritePathURL: url,
|
||||
ReadPathURL: url,
|
||||
Encoder: historian.JsonEncoder{},
|
||||
Encoder: lokiclient.JsonEncoder{},
|
||||
MaxQueryLength: 721 * time.Hour,
|
||||
MaxQuerySize: 65536,
|
||||
},
|
||||
@@ -836,15 +837,15 @@ func NewFakeLokiClient() *FakeLokiClient {
|
||||
}
|
||||
}
|
||||
|
||||
func (c *FakeLokiClient) RangeQuery(ctx context.Context, query string, from, to, limit int64) (historian.QueryRes, error) {
|
||||
streams := make([]historian.Stream, len(c.rangeQueryRes))
|
||||
func (c *FakeLokiClient) RangeQuery(ctx context.Context, query string, from, to, limit int64) (lokiclient.QueryRes, error) {
|
||||
streams := make([]lokiclient.Stream, len(c.rangeQueryRes))
|
||||
|
||||
// clamp time range using logic from historian
|
||||
from, to = historian.ClampRange(from, to, c.cfg.MaxQueryLength.Nanoseconds())
|
||||
from, to = lokiclient.ClampRange(from, to, c.cfg.MaxQueryLength.Nanoseconds())
|
||||
|
||||
for n, stream := range c.rangeQueryRes {
|
||||
streams[n].Stream = stream.Stream
|
||||
streams[n].Values = []historian.Sample{}
|
||||
streams[n].Values = []lokiclient.Sample{}
|
||||
for _, sample := range stream.Values {
|
||||
if sample.T.UnixNano() < from || sample.T.UnixNano() >= to { // matches Loki behavior
|
||||
continue
|
||||
@@ -853,14 +854,14 @@ func (c *FakeLokiClient) RangeQuery(ctx context.Context, query string, from, to,
|
||||
}
|
||||
}
|
||||
|
||||
res := historian.QueryRes{
|
||||
Data: historian.QueryData{
|
||||
res := lokiclient.QueryRes{
|
||||
Data: lokiclient.QueryData{
|
||||
Result: streams,
|
||||
},
|
||||
}
|
||||
|
||||
// reset expected streams on read
|
||||
c.rangeQueryRes = []historian.Stream{}
|
||||
c.rangeQueryRes = []lokiclient.Stream{}
|
||||
return res, nil
|
||||
}
|
||||
|
||||
|
||||
@@ -1800,6 +1800,15 @@ var (
|
||||
FrontendOnly: true,
|
||||
Owner: grafanaObservabilityLogsSquad,
|
||||
},
|
||||
{
|
||||
Name: "alertingNotificationHistory",
|
||||
Description: "Enables the notification history feature",
|
||||
Stage: FeatureStageExperimental,
|
||||
Owner: grafanaAlertingSquad,
|
||||
HideFromAdminPage: true,
|
||||
HideFromDocs: true,
|
||||
Expression: "false",
|
||||
},
|
||||
}
|
||||
)
|
||||
|
||||
|
||||
@@ -233,3 +233,4 @@ enableAppChromeExtensions,experimental,@grafana/plugins-platform-backend,false,f
|
||||
foldersAppPlatformAPI,experimental,@grafana/grafana-search-navigate-organise,false,false,true
|
||||
enablePluginImporter,experimental,@grafana/plugins-platform-backend,false,false,true
|
||||
otelLogsFormatting,experimental,@grafana/observability-logs,false,false,true
|
||||
alertingNotificationHistory,experimental,@grafana/alerting-squad,false,false,false
|
||||
|
||||
|
@@ -942,4 +942,8 @@ const (
|
||||
// FlagOtelLogsFormatting
|
||||
// Applies OTel formatting templates to displayed logs
|
||||
FlagOtelLogsFormatting = "otelLogsFormatting"
|
||||
|
||||
// FlagAlertingNotificationHistory
|
||||
// Enables the notification history feature
|
||||
FlagAlertingNotificationHistory = "alertingNotificationHistory"
|
||||
)
|
||||
|
||||
@@ -294,6 +294,21 @@
|
||||
"expression": "true"
|
||||
}
|
||||
},
|
||||
{
|
||||
"metadata": {
|
||||
"name": "alertingNotificationHistory",
|
||||
"resourceVersion": "1752682072771",
|
||||
"creationTimestamp": "2025-07-16T16:07:52Z"
|
||||
},
|
||||
"spec": {
|
||||
"description": "Enables the notification history feature",
|
||||
"stage": "experimental",
|
||||
"codeowner": "@grafana/alerting-squad",
|
||||
"hideFromAdminPage": true,
|
||||
"hideFromDocs": true,
|
||||
"expression": "false"
|
||||
}
|
||||
},
|
||||
{
|
||||
"metadata": {
|
||||
"name": "alertingNotificationsStepMode",
|
||||
|
||||
@@ -611,6 +611,7 @@ func createMultiOrgAlertmanager(t *testing.T, configs map[int64]*ngmodels.AlertC
|
||||
log.New("testlogger"),
|
||||
secretsService,
|
||||
featuremgmt.WithManager(),
|
||||
nil,
|
||||
)
|
||||
require.NoError(t, err)
|
||||
err = mam.LoadAndSyncAlertmanagersForOrgs(context.Background())
|
||||
|
||||
+18
-17
@@ -1,4 +1,4 @@
|
||||
package historian
|
||||
package lokiclient
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
@@ -11,11 +11,12 @@ import (
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"github.com/grafana/dskit/instrument"
|
||||
"github.com/grafana/grafana/pkg/infra/log"
|
||||
"github.com/grafana/grafana/pkg/infra/tracing"
|
||||
"github.com/grafana/grafana/pkg/services/ngalert/client"
|
||||
"github.com/grafana/grafana/pkg/services/ngalert/metrics"
|
||||
"github.com/grafana/grafana/pkg/setting"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
)
|
||||
|
||||
const defaultPageSize = 1000
|
||||
@@ -45,7 +46,7 @@ type LokiConfig struct {
|
||||
MaxQuerySize int
|
||||
}
|
||||
|
||||
func NewLokiConfig(cfg setting.UnifiedAlertingStateHistorySettings) (LokiConfig, error) {
|
||||
func NewLokiConfig(cfg setting.UnifiedAlertingLokiSettings) (LokiConfig, error) {
|
||||
read, write := cfg.LokiReadURL, cfg.LokiWriteURL
|
||||
if read == "" {
|
||||
read = cfg.LokiRemoteURL
|
||||
@@ -85,11 +86,11 @@ func NewLokiConfig(cfg setting.UnifiedAlertingStateHistorySettings) (LokiConfig,
|
||||
}
|
||||
|
||||
type HttpLokiClient struct {
|
||||
client client.Requester
|
||||
encoder encoder
|
||||
cfg LokiConfig
|
||||
metrics *metrics.Historian
|
||||
log log.Logger
|
||||
client client.Requester
|
||||
encoder encoder
|
||||
cfg LokiConfig
|
||||
bytesWritten prometheus.Counter
|
||||
log log.Logger
|
||||
}
|
||||
|
||||
// Kind of Operation (=, !=, =~, !~)
|
||||
@@ -106,15 +107,15 @@ const (
|
||||
NeqRegEx Operator = "!~"
|
||||
)
|
||||
|
||||
func NewLokiClient(cfg LokiConfig, req client.Requester, metrics *metrics.Historian, logger log.Logger, tracer tracing.Tracer) *HttpLokiClient {
|
||||
tc := client.NewTimedClient(req, metrics.WriteDuration)
|
||||
trc := client.NewTracedClient(tc, tracer, "ngalert.historian.client")
|
||||
func NewLokiClient(cfg LokiConfig, req client.Requester, bytesWritten prometheus.Counter, writeDuration *instrument.HistogramCollector, logger log.Logger, tracer tracing.Tracer, spanName string) *HttpLokiClient {
|
||||
tc := client.NewTimedClient(req, writeDuration)
|
||||
trc := client.NewTracedClient(tc, tracer, spanName)
|
||||
return &HttpLokiClient{
|
||||
client: trc,
|
||||
encoder: cfg.Encoder,
|
||||
cfg: cfg,
|
||||
metrics: metrics,
|
||||
log: logger.New("protocol", "http"),
|
||||
client: trc,
|
||||
encoder: cfg.Encoder,
|
||||
cfg: cfg,
|
||||
bytesWritten: bytesWritten,
|
||||
log: logger.New("protocol", "http"),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -198,7 +199,7 @@ func (c *HttpLokiClient) Push(ctx context.Context, s []Stream) error {
|
||||
req.Header.Add(k, v)
|
||||
}
|
||||
|
||||
c.metrics.BytesWritten.Add(float64(len(enc)))
|
||||
c.bytesWritten.Add(float64(len(enc)))
|
||||
req = req.WithContext(ctx)
|
||||
resp, err := c.client.Do(req)
|
||||
if err != nil {
|
||||
+22
-18
@@ -1,4 +1,4 @@
|
||||
package historian
|
||||
package lokiclient
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
@@ -21,11 +21,13 @@ import (
|
||||
"github.com/grafana/grafana/pkg/infra/tracing"
|
||||
)
|
||||
|
||||
const lokiClientSpanName = "testLokiClientSpanName"
|
||||
|
||||
func TestLokiConfig(t *testing.T) {
|
||||
t.Run("test URL options", func(t *testing.T) {
|
||||
type testCase struct {
|
||||
name string
|
||||
in setting.UnifiedAlertingStateHistorySettings
|
||||
in setting.UnifiedAlertingLokiSettings
|
||||
expRead string
|
||||
expWrite string
|
||||
expErr string
|
||||
@@ -34,7 +36,7 @@ func TestLokiConfig(t *testing.T) {
|
||||
cases := []testCase{
|
||||
{
|
||||
name: "remote url only",
|
||||
in: setting.UnifiedAlertingStateHistorySettings{
|
||||
in: setting.UnifiedAlertingLokiSettings{
|
||||
LokiRemoteURL: "http://url.com",
|
||||
},
|
||||
expRead: "http://url.com",
|
||||
@@ -42,7 +44,7 @@ func TestLokiConfig(t *testing.T) {
|
||||
},
|
||||
{
|
||||
name: "separate urls",
|
||||
in: setting.UnifiedAlertingStateHistorySettings{
|
||||
in: setting.UnifiedAlertingLokiSettings{
|
||||
LokiReadURL: "http://read.url.com",
|
||||
LokiWriteURL: "http://write.url.com",
|
||||
},
|
||||
@@ -51,7 +53,7 @@ func TestLokiConfig(t *testing.T) {
|
||||
},
|
||||
{
|
||||
name: "single fallback",
|
||||
in: setting.UnifiedAlertingStateHistorySettings{
|
||||
in: setting.UnifiedAlertingLokiSettings{
|
||||
LokiRemoteURL: "http://url.com",
|
||||
LokiReadURL: "http://read.url.com",
|
||||
},
|
||||
@@ -60,21 +62,21 @@ func TestLokiConfig(t *testing.T) {
|
||||
},
|
||||
{
|
||||
name: "missing read",
|
||||
in: setting.UnifiedAlertingStateHistorySettings{
|
||||
in: setting.UnifiedAlertingLokiSettings{
|
||||
LokiWriteURL: "http://url.com",
|
||||
},
|
||||
expErr: "either read path URL or remote",
|
||||
},
|
||||
{
|
||||
name: "missing write",
|
||||
in: setting.UnifiedAlertingStateHistorySettings{
|
||||
in: setting.UnifiedAlertingLokiSettings{
|
||||
LokiReadURL: "http://url.com",
|
||||
},
|
||||
expErr: "either write path URL or remote",
|
||||
},
|
||||
{
|
||||
name: "invalid",
|
||||
in: setting.UnifiedAlertingStateHistorySettings{
|
||||
in: setting.UnifiedAlertingLokiSettings{
|
||||
LokiRemoteURL: "://://",
|
||||
},
|
||||
expErr: "failed to parse",
|
||||
@@ -95,7 +97,7 @@ func TestLokiConfig(t *testing.T) {
|
||||
})
|
||||
|
||||
t.Run("captures external labels", func(t *testing.T) {
|
||||
set := setting.UnifiedAlertingStateHistorySettings{
|
||||
set := setting.UnifiedAlertingLokiSettings{
|
||||
LokiRemoteURL: "http://url.com",
|
||||
ExternalLabels: map[string]string{"a": "b"},
|
||||
}
|
||||
@@ -127,8 +129,8 @@ func TestLokiHTTPClient(t *testing.T) {
|
||||
err := client.Push(context.Background(), data)
|
||||
|
||||
require.NoError(t, err)
|
||||
require.Contains(t, "/loki/api/v1/push", req.lastRequest.URL.Path)
|
||||
sent := reqBody(t, req.lastRequest)
|
||||
require.Contains(t, "/loki/api/v1/push", req.LastRequest.URL.Path)
|
||||
sent := reqBody(t, req.LastRequest)
|
||||
exp := fmt.Sprintf(`{"streams": [{"stream": {}, "values": [["%d", "some line"]]}]}`, now.UnixNano())
|
||||
require.JSONEq(t, exp, sent)
|
||||
})
|
||||
@@ -149,7 +151,7 @@ func TestLokiHTTPClient(t *testing.T) {
|
||||
_, err := client.RangeQuery(context.Background(), q, now-100, now, 1100)
|
||||
|
||||
require.NoError(t, err)
|
||||
params := req.lastRequest.URL.Query()
|
||||
params := req.LastRequest.URL.Query()
|
||||
require.True(t, params.Has("limit"), "query params did not contain 'limit': %#v", params)
|
||||
require.Equal(t, fmt.Sprint(1100), params.Get("limit"))
|
||||
})
|
||||
@@ -169,7 +171,7 @@ func TestLokiHTTPClient(t *testing.T) {
|
||||
_, err := client.RangeQuery(context.Background(), q, now-100, now, 0)
|
||||
|
||||
require.NoError(t, err)
|
||||
params := req.lastRequest.URL.Query()
|
||||
params := req.LastRequest.URL.Query()
|
||||
require.True(t, params.Has("limit"), "query params did not contain 'limit': %#v", params)
|
||||
require.Equal(t, fmt.Sprint(defaultPageSize), params.Get("limit"))
|
||||
})
|
||||
@@ -189,7 +191,7 @@ func TestLokiHTTPClient(t *testing.T) {
|
||||
_, err := client.RangeQuery(context.Background(), q, now-100, now, -100)
|
||||
|
||||
require.NoError(t, err)
|
||||
params := req.lastRequest.URL.Query()
|
||||
params := req.LastRequest.URL.Query()
|
||||
require.True(t, params.Has("limit"), "query params did not contain 'limit': %#v", params)
|
||||
require.Equal(t, fmt.Sprint(defaultPageSize), params.Get("limit"))
|
||||
})
|
||||
@@ -209,7 +211,7 @@ func TestLokiHTTPClient(t *testing.T) {
|
||||
_, err := client.RangeQuery(context.Background(), q, now-100, now, maximumPageSize+1000)
|
||||
|
||||
require.NoError(t, err)
|
||||
params := req.lastRequest.URL.Query()
|
||||
params := req.LastRequest.URL.Query()
|
||||
require.True(t, params.Has("limit"), "query params did not contain 'limit': %#v", params)
|
||||
require.Equal(t, fmt.Sprint(maximumPageSize), params.Get("limit"))
|
||||
})
|
||||
@@ -224,11 +226,12 @@ func TestLokiHTTPClient_Manual(t *testing.T) {
|
||||
url, err := url.Parse("https://logs-prod-eu-west-0.grafana.net")
|
||||
require.NoError(t, err)
|
||||
|
||||
metrics := metrics.NewHistorianMetrics(prometheus.NewRegistry(), metrics.Subsystem)
|
||||
client := NewLokiClient(LokiConfig{
|
||||
ReadPathURL: url,
|
||||
WritePathURL: url,
|
||||
Encoder: JsonEncoder{},
|
||||
}, NewRequester(), metrics.NewHistorianMetrics(prometheus.NewRegistry(), metrics.Subsystem), log.NewNopLogger(), tracing.InitializeTracerForTest())
|
||||
}, NewRequester(), metrics.BytesWritten, metrics.WriteDuration, log.NewNopLogger(), tracing.InitializeTracerForTest(), lokiClientSpanName)
|
||||
|
||||
// Unauthorized request should fail against Grafana Cloud.
|
||||
err = client.Ping(context.Background())
|
||||
@@ -250,13 +253,14 @@ func TestLokiHTTPClient_Manual(t *testing.T) {
|
||||
url, err := url.Parse("https://logs-prod-eu-west-0.grafana.net")
|
||||
require.NoError(t, err)
|
||||
|
||||
metrics := metrics.NewHistorianMetrics(prometheus.NewRegistry(), metrics.Subsystem)
|
||||
client := NewLokiClient(LokiConfig{
|
||||
ReadPathURL: url,
|
||||
WritePathURL: url,
|
||||
BasicAuthUser: "<your_username>",
|
||||
BasicAuthPassword: "<your_password>",
|
||||
Encoder: JsonEncoder{},
|
||||
}, NewRequester(), metrics.NewHistorianMetrics(prometheus.NewRegistry(), metrics.Subsystem), log.NewNopLogger(), tracing.InitializeTracerForTest())
|
||||
}, NewRequester(), metrics.BytesWritten, metrics.WriteDuration, log.NewNopLogger(), tracing.InitializeTracerForTest(), lokiClientSpanName)
|
||||
|
||||
// When running on prem, you might need to set the tenant id,
|
||||
// so the x-scope-orgid header is set.
|
||||
@@ -390,7 +394,7 @@ func createTestLokiClient(req client.Requester) *HttpLokiClient {
|
||||
Encoder: JsonEncoder{},
|
||||
}
|
||||
met := metrics.NewHistorianMetrics(prometheus.NewRegistry(), metrics.Subsystem)
|
||||
return NewLokiClient(cfg, req, met, log.NewNopLogger(), tracing.InitializeTracerForTest())
|
||||
return NewLokiClient(cfg, req, met.BytesWritten, met.WriteDuration, log.NewNopLogger(), tracing.InitializeTracerForTest(), lokiClientSpanName)
|
||||
}
|
||||
|
||||
func reqBody(t *testing.T, req *http.Request) string {
|
||||
+1
-1
@@ -1,4 +1,4 @@
|
||||
package historian
|
||||
package lokiclient
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
@@ -0,0 +1,45 @@
|
||||
package lokiclient
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"io"
|
||||
"net/http"
|
||||
)
|
||||
|
||||
type FakeRequester struct {
|
||||
LastRequest *http.Request
|
||||
Resp *http.Response
|
||||
}
|
||||
|
||||
func NewFakeRequester() *FakeRequester {
|
||||
return &FakeRequester{
|
||||
Resp: &http.Response{
|
||||
Status: "200 OK",
|
||||
StatusCode: 200,
|
||||
Body: io.NopCloser(bytes.NewBufferString("")),
|
||||
ContentLength: int64(0),
|
||||
Header: make(http.Header, 0),
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func (f *FakeRequester) WithResponse(resp *http.Response) *FakeRequester {
|
||||
f.Resp = resp
|
||||
return f
|
||||
}
|
||||
|
||||
func (f *FakeRequester) Do(req *http.Request) (*http.Response, error) {
|
||||
f.LastRequest = req
|
||||
f.Resp.Request = req // Not concurrency-safe!
|
||||
return f.Resp, nil
|
||||
}
|
||||
|
||||
func BadResponse() *http.Response {
|
||||
return &http.Response{
|
||||
Status: "400 Bad Request",
|
||||
StatusCode: http.StatusBadRequest,
|
||||
Body: io.NopCloser(bytes.NewBufferString("")),
|
||||
ContentLength: int64(0),
|
||||
Header: make(http.Header, 0),
|
||||
}
|
||||
}
|
||||
@@ -25,26 +25,28 @@ type NGAlert struct {
|
||||
// Registerer is used by subcomponents which register their own metrics.
|
||||
Registerer prometheus.Registerer
|
||||
|
||||
schedulerMetrics *Scheduler
|
||||
stateMetrics *State
|
||||
multiOrgAlertmanagerMetrics *MultiOrgAlertmanager
|
||||
apiMetrics *API
|
||||
historianMetrics *Historian
|
||||
remoteAlertmanagerMetrics *RemoteAlertmanager
|
||||
remoteWriterMetrics *RemoteWriter
|
||||
schedulerMetrics *Scheduler
|
||||
stateMetrics *State
|
||||
multiOrgAlertmanagerMetrics *MultiOrgAlertmanager
|
||||
apiMetrics *API
|
||||
historianMetrics *Historian
|
||||
notificationHistorianMetrics *NotificationHistorian
|
||||
remoteAlertmanagerMetrics *RemoteAlertmanager
|
||||
remoteWriterMetrics *RemoteWriter
|
||||
}
|
||||
|
||||
// NewNGAlert manages the metrics of all the alerting components.
|
||||
func NewNGAlert(r prometheus.Registerer) *NGAlert {
|
||||
return &NGAlert{
|
||||
Registerer: r,
|
||||
schedulerMetrics: NewSchedulerMetrics(r),
|
||||
stateMetrics: NewStateMetrics(r),
|
||||
multiOrgAlertmanagerMetrics: NewMultiOrgAlertmanagerMetrics(r),
|
||||
apiMetrics: NewAPIMetrics(r),
|
||||
historianMetrics: NewHistorianMetrics(r, Subsystem),
|
||||
remoteAlertmanagerMetrics: NewRemoteAlertmanagerMetrics(r),
|
||||
remoteWriterMetrics: NewRemoteWriterMetrics(r),
|
||||
Registerer: r,
|
||||
schedulerMetrics: NewSchedulerMetrics(r),
|
||||
stateMetrics: NewStateMetrics(r),
|
||||
multiOrgAlertmanagerMetrics: NewMultiOrgAlertmanagerMetrics(r),
|
||||
apiMetrics: NewAPIMetrics(r),
|
||||
historianMetrics: NewHistorianMetrics(r, Subsystem),
|
||||
notificationHistorianMetrics: NewNotificationHistorianMetrics(r),
|
||||
remoteAlertmanagerMetrics: NewRemoteAlertmanagerMetrics(r),
|
||||
remoteWriterMetrics: NewRemoteWriterMetrics(r),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -68,6 +70,10 @@ func (ng *NGAlert) GetHistorianMetrics() *Historian {
|
||||
return ng.historianMetrics
|
||||
}
|
||||
|
||||
func (ng *NGAlert) GetNotificationHistorianMetrics() *NotificationHistorian {
|
||||
return ng.notificationHistorianMetrics
|
||||
}
|
||||
|
||||
func (ng *NGAlert) GetRemoteAlertmanagerMetrics() *RemoteAlertmanager {
|
||||
return ng.remoteAlertmanagerMetrics
|
||||
}
|
||||
|
||||
@@ -0,0 +1,51 @@
|
||||
package metrics
|
||||
|
||||
import (
|
||||
"github.com/grafana/dskit/instrument"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
"github.com/prometheus/client_golang/prometheus/promauto"
|
||||
)
|
||||
|
||||
type NotificationHistorian struct {
|
||||
Info prometheus.Gauge
|
||||
WritesTotal prometheus.Counter
|
||||
WritesFailed prometheus.Counter
|
||||
WriteDuration *instrument.HistogramCollector
|
||||
BytesWritten prometheus.Counter
|
||||
}
|
||||
|
||||
func NewNotificationHistorianMetrics(r prometheus.Registerer) *NotificationHistorian {
|
||||
return &NotificationHistorian{
|
||||
Info: promauto.With(r).NewGauge(prometheus.GaugeOpts{
|
||||
Namespace: Namespace,
|
||||
Subsystem: Subsystem,
|
||||
Name: "notification_history_info",
|
||||
Help: "Information about the notification history store.",
|
||||
}),
|
||||
WritesTotal: promauto.With(r).NewCounter(prometheus.CounterOpts{
|
||||
Namespace: Namespace,
|
||||
Subsystem: Subsystem,
|
||||
Name: "notification_history_writes_total",
|
||||
Help: "The total number of notification history batches that were attempted to be written.",
|
||||
}),
|
||||
WritesFailed: promauto.With(r).NewCounter(prometheus.CounterOpts{
|
||||
Namespace: Namespace,
|
||||
Subsystem: Subsystem,
|
||||
Name: "notification_history_writes_failed_total",
|
||||
Help: "The total number of failed writes of notification history batches.",
|
||||
}),
|
||||
WriteDuration: instrument.NewHistogramCollector(promauto.With(r).NewHistogramVec(prometheus.HistogramOpts{
|
||||
Namespace: Namespace,
|
||||
Subsystem: Subsystem,
|
||||
Name: "notification_history_request_duration_seconds",
|
||||
Help: "Histogram of request durations to the notification history store.",
|
||||
Buckets: instrument.DefBuckets,
|
||||
}, instrument.HistogramCollectorBuckets)),
|
||||
BytesWritten: promauto.With(r).NewCounter(prometheus.CounterOpts{
|
||||
Namespace: Namespace,
|
||||
Subsystem: Subsystem,
|
||||
Name: "notification_history_writes_bytes_total",
|
||||
Help: "The total number of bytes sent within a batch to the notification history store.",
|
||||
}),
|
||||
}
|
||||
}
|
||||
@@ -7,6 +7,8 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/benbjohnson/clock"
|
||||
"github.com/grafana/alerting/notify/nfstatus"
|
||||
"github.com/grafana/grafana/pkg/services/ngalert/lokiclient"
|
||||
"github.com/prometheus/alertmanager/featurecontrol"
|
||||
"github.com/prometheus/alertmanager/matchers/compat"
|
||||
"golang.org/x/sync/errgroup"
|
||||
@@ -284,6 +286,18 @@ func (ng *AlertNG) init() error {
|
||||
overrides = append(overrides, override)
|
||||
}
|
||||
|
||||
notificationHistorian, err := configureNotificationHistorian(
|
||||
initCtx,
|
||||
ng.FeatureToggles,
|
||||
ng.Cfg.UnifiedAlerting.NotificationHistory,
|
||||
ng.Metrics.GetNotificationHistorianMetrics(),
|
||||
ng.Log,
|
||||
ng.tracer,
|
||||
)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
decryptFn := ng.SecretsService.GetDecryptedValue
|
||||
multiOrgMetrics := ng.Metrics.GetMultiOrgAlertmanagerMetrics()
|
||||
moa, err := notifier.NewMultiOrgAlertmanager(
|
||||
@@ -299,6 +313,7 @@ func (ng *AlertNG) init() error {
|
||||
moaLogger,
|
||||
ng.SecretsService,
|
||||
ng.FeatureToggles,
|
||||
notificationHistorian,
|
||||
overrides...,
|
||||
)
|
||||
if err != nil {
|
||||
@@ -678,11 +693,11 @@ func configureHistorianBackend(
|
||||
return historian.NewAnnotationBackend(annotationBackendLogger, store, rs, met, ac), nil
|
||||
}
|
||||
if backend == historian.BackendTypeLoki {
|
||||
lcfg, err := historian.NewLokiConfig(cfg)
|
||||
lcfg, err := lokiclient.NewLokiConfig(cfg.LokiSettings)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("invalid remote loki configuration: %w", err)
|
||||
}
|
||||
req := historian.NewRequester()
|
||||
req := lokiclient.NewRequester()
|
||||
logCtx := log.WithContextualAttributes(ctx, []any{"backend", "loki"})
|
||||
lokiBackendLogger := log.New("ngalert.state.historian").FromContext(logCtx)
|
||||
backend := historian.NewRemoteLokiBackend(lokiBackendLogger, lcfg, req, met, tracer, rs, ac)
|
||||
@@ -717,6 +732,36 @@ func configureHistorianBackend(
|
||||
return nil, fmt.Errorf("unrecognized state history backend: %s", backend)
|
||||
}
|
||||
|
||||
func configureNotificationHistorian(
|
||||
ctx context.Context,
|
||||
featureToggles featuremgmt.FeatureToggles,
|
||||
cfg setting.UnifiedAlertingNotificationHistorySettings,
|
||||
met *metrics.NotificationHistorian,
|
||||
l log.Logger,
|
||||
tracer tracing.Tracer,
|
||||
) (nfstatus.NotificationHistorian, error) {
|
||||
if !featureToggles.IsEnabled(ctx, featuremgmt.FlagAlertingNotificationHistory) || !cfg.Enabled {
|
||||
met.Info.Set(0)
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
met.Info.Set(1)
|
||||
lcfg, err := lokiclient.NewLokiConfig(cfg.LokiSettings)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("invalid remote loki configuration: %w", err)
|
||||
}
|
||||
req := lokiclient.NewRequester()
|
||||
logger := log.New("ngalert.notifier.historian").FromContext(ctx)
|
||||
notificationHistorian := notifier.NewNotificationHistorian(logger, lcfg, req, met, tracer)
|
||||
|
||||
testConnCtx, cancelFunc := context.WithTimeout(ctx, 10*time.Second)
|
||||
defer cancelFunc()
|
||||
if err := notificationHistorian.TestConnection(testConnCtx); err != nil {
|
||||
l.Error("Failed to communicate with configured remote Loki backend, notification history may not be persisted", "error", err)
|
||||
}
|
||||
return notificationHistorian, nil
|
||||
}
|
||||
|
||||
func createRemoteAlertmanager(ctx context.Context, cfg remote.AlertmanagerConfig, kvstore kvstore.KVStore, crypto remote.Crypto, autogenFn remote.AutogenFn, m *metrics.RemoteAlertmanager, tracer tracing.Tracer) (*remote.Alertmanager, error) {
|
||||
return remote.NewAlertmanager(ctx, cfg, notifier.NewFileStore(cfg.OrgID, kvstore), crypto, autogenFn, m, tracer)
|
||||
}
|
||||
|
||||
@@ -137,9 +137,11 @@ func TestConfigureHistorianBackend(t *testing.T) {
|
||||
cfg := setting.UnifiedAlertingStateHistorySettings{
|
||||
Enabled: true,
|
||||
Backend: "loki",
|
||||
// Should never resolve at the DNS level: https://www.rfc-editor.org/rfc/rfc6761#section-6.4
|
||||
LokiReadURL: "http://gone.invalid",
|
||||
LokiWriteURL: "http://gone.invalid",
|
||||
LokiSettings: setting.UnifiedAlertingLokiSettings{
|
||||
// Should never resolve at the DNS level: https://www.rfc-editor.org/rfc/rfc6761#section-6.4
|
||||
LokiReadURL: "http://gone.invalid",
|
||||
LokiWriteURL: "http://gone.invalid",
|
||||
},
|
||||
}
|
||||
ac := &acfakes.FakeRuleService{}
|
||||
|
||||
@@ -232,6 +234,75 @@ grafana_alerting_state_history_info{backend="noop"} 0
|
||||
})
|
||||
}
|
||||
|
||||
func TestConfigureNotificationHistorian(t *testing.T) {
|
||||
t.Run("do not fail initialization if pinging Loki fails", func(t *testing.T) {
|
||||
reg := prometheus.NewRegistry()
|
||||
met := metrics.NewNotificationHistorianMetrics(reg)
|
||||
logger := log.NewNopLogger()
|
||||
tracer := tracing.InitializeTracerForTest()
|
||||
ft := featuremgmt.WithFeatures(featuremgmt.FlagAlertingNotificationHistory)
|
||||
cfg := setting.UnifiedAlertingNotificationHistorySettings{
|
||||
Enabled: true,
|
||||
LokiSettings: setting.UnifiedAlertingLokiSettings{
|
||||
// Should never resolve at the DNS level: https://www.rfc-editor.org/rfc/rfc6761#section-6.4
|
||||
LokiRemoteURL: "http://gone.invalid",
|
||||
},
|
||||
}
|
||||
|
||||
h, err := configureNotificationHistorian(context.Background(), ft, cfg, met, logger, tracer)
|
||||
require.NotNil(t, h)
|
||||
require.NoError(t, err)
|
||||
|
||||
// Verify that the metric value is set to 1, indicating that notification history is enabled.
|
||||
exp := bytes.NewBufferString(`
|
||||
# HELP grafana_alerting_notification_history_info Information about the notification history store.
|
||||
# TYPE grafana_alerting_notification_history_info gauge
|
||||
grafana_alerting_notification_history_info 1
|
||||
`)
|
||||
err = testutil.GatherAndCompare(reg, exp, "grafana_alerting_notification_history_info")
|
||||
require.NoError(t, err)
|
||||
})
|
||||
|
||||
t.Run("emit special zero metric if notification history disabled", func(t *testing.T) {
|
||||
testCases := []struct {
|
||||
name string
|
||||
ft featuremgmt.FeatureToggles
|
||||
cfg setting.UnifiedAlertingNotificationHistorySettings
|
||||
}{
|
||||
{
|
||||
"disabled via config",
|
||||
featuremgmt.WithFeatures(featuremgmt.FlagAlertingNotificationHistory),
|
||||
setting.UnifiedAlertingNotificationHistorySettings{Enabled: false},
|
||||
},
|
||||
{
|
||||
"disabled via feature toggle",
|
||||
featuremgmt.WithFeatures(),
|
||||
setting.UnifiedAlertingNotificationHistorySettings{Enabled: true},
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range testCases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
reg := prometheus.NewRegistry()
|
||||
met := metrics.NewNotificationHistorianMetrics(reg)
|
||||
logger := log.NewNopLogger()
|
||||
tracer := tracing.InitializeTracerForTest()
|
||||
h, err := configureNotificationHistorian(context.Background(), tc.ft, tc.cfg, met, logger, tracer)
|
||||
require.Nil(t, h)
|
||||
require.NoError(t, err)
|
||||
|
||||
exp := bytes.NewBufferString(`
|
||||
# HELP grafana_alerting_notification_history_info Information about the notification history store.
|
||||
# TYPE grafana_alerting_notification_history_info gauge
|
||||
grafana_alerting_notification_history_info 0
|
||||
`)
|
||||
err = testutil.GatherAndCompare(reg, exp, "grafana_alerting_notification_history_info")
|
||||
require.NoError(t, err)
|
||||
})
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
type mockDB struct {
|
||||
db.DB
|
||||
}
|
||||
|
||||
@@ -11,6 +11,7 @@ import (
|
||||
"time"
|
||||
|
||||
alertingNotify "github.com/grafana/alerting/notify"
|
||||
"github.com/grafana/alerting/notify/nfstatus"
|
||||
"github.com/prometheus/alertmanager/config"
|
||||
|
||||
amv2 "github.com/prometheus/alertmanager/api/v2/models"
|
||||
@@ -87,7 +88,7 @@ func (m maintenanceOptions) MaintenanceFunc(state alertingNotify.State) (int64,
|
||||
|
||||
func NewAlertmanager(ctx context.Context, orgID int64, cfg *setting.Cfg, store AlertingStore, stateStore stateStore,
|
||||
peer alertingNotify.ClusterPeer, decryptFn alertingNotify.GetDecryptedValueFn, ns notifications.Service,
|
||||
m *metrics.Alertmanager, featureToggles featuremgmt.FeatureToggles, crypto Crypto,
|
||||
m *metrics.Alertmanager, featureToggles featuremgmt.FeatureToggles, crypto Crypto, notificationHistorian nfstatus.NotificationHistorian,
|
||||
) (*alertmanager, error) {
|
||||
nflog, err := stateStore.GetNotificationLog(ctx)
|
||||
if err != nil {
|
||||
@@ -129,15 +130,16 @@ func NewAlertmanager(ctx context.Context, orgID int64, cfg *setting.Cfg, store A
|
||||
MaxSilences: cfg.UnifiedAlerting.AlertmanagerMaxSilencesCount,
|
||||
MaxSilenceSizeBytes: cfg.UnifiedAlerting.AlertmanagerMaxSilenceSizeBytes,
|
||||
},
|
||||
EmailSender: &emailSender{ns},
|
||||
ImageProvider: newImageProvider(store, l.New("component", "image-provider")),
|
||||
Decrypter: decryptFn,
|
||||
Version: setting.BuildVersion,
|
||||
TenantKey: "orgID",
|
||||
TenantID: orgID,
|
||||
Peer: peer,
|
||||
Logger: l,
|
||||
Metrics: alertingNotify.NewGrafanaAlertmanagerMetrics(m.Registerer, l),
|
||||
EmailSender: &emailSender{ns},
|
||||
ImageProvider: newImageProvider(store, l.New("component", "image-provider")),
|
||||
Decrypter: decryptFn,
|
||||
Version: setting.BuildVersion,
|
||||
TenantKey: "orgID",
|
||||
TenantID: orgID,
|
||||
Peer: peer,
|
||||
Logger: l,
|
||||
Metrics: alertingNotify.NewGrafanaAlertmanagerMetrics(m.Registerer, l),
|
||||
NotificationHistorian: notificationHistorian,
|
||||
}
|
||||
|
||||
gam, err := alertingNotify.NewGrafanaAlertmanager(opts)
|
||||
|
||||
@@ -58,7 +58,7 @@ func setupAMTest(t *testing.T) *alertmanager {
|
||||
stateStore := NewFileStore(int64(orgID), kvStore)
|
||||
crypto := NewCrypto(secretsService, s, l)
|
||||
|
||||
am, err := NewAlertmanager(context.Background(), 1, cfg, s, stateStore, &NilPeer{}, decryptFn, nil, m, featuremgmt.WithFeatures(), crypto)
|
||||
am, err := NewAlertmanager(context.Background(), 1, cfg, s, stateStore, &NilPeer{}, decryptFn, nil, m, featuremgmt.WithFeatures(), crypto, nil)
|
||||
require.NoError(t, err)
|
||||
return am
|
||||
}
|
||||
|
||||
@@ -0,0 +1,190 @@
|
||||
package notifier
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
alertingModels "github.com/grafana/alerting/models"
|
||||
"github.com/grafana/grafana/pkg/infra/log"
|
||||
"github.com/grafana/grafana/pkg/infra/tracing"
|
||||
"github.com/grafana/grafana/pkg/services/ngalert/client"
|
||||
"github.com/grafana/grafana/pkg/services/ngalert/lokiclient"
|
||||
"github.com/grafana/grafana/pkg/services/ngalert/metrics"
|
||||
"github.com/prometheus/alertmanager/notify"
|
||||
"github.com/prometheus/alertmanager/types"
|
||||
prometheusModel "github.com/prometheus/common/model"
|
||||
"go.opentelemetry.io/otel/trace"
|
||||
)
|
||||
|
||||
const LokiClientSpanName = "ngalert.notification-historian.client"
|
||||
const NotificationHistoryWriteTimeout = time.Minute
|
||||
const NotificationHistoryKey = "from"
|
||||
const NotificationHistoryLabelValue = "notify-history"
|
||||
|
||||
type NotificationHistoryLokiEntry struct {
|
||||
SchemaVersion int `json:"schemaVersion"`
|
||||
Receiver string `json:"receiver"`
|
||||
Status string `json:"status"`
|
||||
GroupLabels map[string]string `json:"groupLabels"`
|
||||
Alerts []NotificationHistoryLokiEntryAlert `json:"alerts"`
|
||||
Retry bool `json:"retry"`
|
||||
Error string `json:"error,omitempty"`
|
||||
Duration int64 `json:"duration"`
|
||||
}
|
||||
|
||||
type NotificationHistoryLokiEntryAlert struct {
|
||||
Status string `json:"status"`
|
||||
Labels map[string]string `json:"labels"`
|
||||
Annotations map[string]string `json:"annotations"`
|
||||
StartsAt time.Time `json:"startsAt"`
|
||||
EndsAt time.Time `json:"endsAt"`
|
||||
RuleUID string `json:"ruleUID"`
|
||||
}
|
||||
|
||||
type remoteLokiClient interface {
|
||||
Ping(context.Context) error
|
||||
Push(context.Context, []lokiclient.Stream) error
|
||||
}
|
||||
|
||||
type NotificationHistorian struct {
|
||||
client remoteLokiClient
|
||||
externalLabels map[string]string
|
||||
metrics *metrics.NotificationHistorian
|
||||
log log.Logger
|
||||
}
|
||||
|
||||
func NewNotificationHistorian(logger log.Logger, cfg lokiclient.LokiConfig, req client.Requester, metrics *metrics.NotificationHistorian, tracer tracing.Tracer) *NotificationHistorian {
|
||||
return &NotificationHistorian{
|
||||
client: lokiclient.NewLokiClient(cfg, req, metrics.BytesWritten, metrics.WriteDuration, logger, tracer, LokiClientSpanName),
|
||||
externalLabels: cfg.ExternalLabels,
|
||||
metrics: metrics,
|
||||
log: logger,
|
||||
}
|
||||
}
|
||||
|
||||
func (h *NotificationHistorian) TestConnection(ctx context.Context) error {
|
||||
return h.client.Ping(ctx)
|
||||
}
|
||||
|
||||
func (h *NotificationHistorian) Record(ctx context.Context, alerts []*types.Alert, retry bool, notificationErr error, duration time.Duration) <-chan error {
|
||||
stream, err := h.prepareStream(ctx, alerts, retry, notificationErr, duration)
|
||||
logger := h.log.FromContext(ctx)
|
||||
errCh := make(chan error, 1)
|
||||
if err != nil {
|
||||
logger.Error("Failed to convert notification history to stream", "error", err)
|
||||
errCh <- fmt.Errorf("failed to convert notification history to stream: %w", err)
|
||||
close(errCh)
|
||||
return errCh
|
||||
}
|
||||
|
||||
// This is a new background job, so let's create a new context for it.
|
||||
// We want it to be isolated, i.e. we don't want grafana shutdowns to interrupt this work
|
||||
// immediately but rather try to flush writes.
|
||||
// This also prevents timeouts or other lingering objects (like transactions) from being
|
||||
// incorrectly propagated here from other areas.
|
||||
writeCtx := context.Background()
|
||||
writeCtx, cancel := context.WithTimeout(writeCtx, NotificationHistoryWriteTimeout)
|
||||
writeCtx = trace.ContextWithSpan(writeCtx, trace.SpanFromContext(ctx))
|
||||
|
||||
go func(ctx context.Context) {
|
||||
defer cancel()
|
||||
defer close(errCh)
|
||||
logger := h.log.FromContext(ctx)
|
||||
logger.Debug("Saving notification history")
|
||||
h.metrics.WritesTotal.Inc()
|
||||
|
||||
if err := h.recordStream(ctx, stream, logger); err != nil {
|
||||
logger.Error("Failed to save notification history", "error", err)
|
||||
h.metrics.WritesFailed.Inc()
|
||||
errCh <- fmt.Errorf("failed to save notification history: %w", err)
|
||||
}
|
||||
}(writeCtx)
|
||||
return errCh
|
||||
}
|
||||
|
||||
func (h *NotificationHistorian) prepareStream(ctx context.Context, alerts []*types.Alert, retry bool, notificationErr error, duration time.Duration) (lokiclient.Stream, error) {
|
||||
receiverName, ok := notify.ReceiverName(ctx)
|
||||
if !ok {
|
||||
return lokiclient.Stream{}, fmt.Errorf("receiver name not found in context")
|
||||
}
|
||||
groupLabels, ok := notify.GroupLabels(ctx)
|
||||
if !ok {
|
||||
return lokiclient.Stream{}, fmt.Errorf("group labels not found in context")
|
||||
}
|
||||
now, ok := notify.Now(ctx)
|
||||
if !ok {
|
||||
return lokiclient.Stream{}, fmt.Errorf("now not found in context")
|
||||
}
|
||||
|
||||
entryAlerts := make([]NotificationHistoryLokiEntryAlert, len(alerts))
|
||||
for i, alert := range alerts {
|
||||
labels := prepareLabels(alert.Labels)
|
||||
annotations := prepareLabels(alert.Annotations)
|
||||
entryAlerts[i] = NotificationHistoryLokiEntryAlert{
|
||||
Labels: labels,
|
||||
Annotations: annotations,
|
||||
Status: string(alert.StatusAt(now)),
|
||||
StartsAt: alert.StartsAt,
|
||||
EndsAt: alert.EndsAt,
|
||||
RuleUID: string(alert.Labels[alertingModels.RuleUIDLabel]),
|
||||
}
|
||||
}
|
||||
|
||||
notificationErrStr := ""
|
||||
if notificationErr != nil {
|
||||
notificationErrStr = notificationErr.Error()
|
||||
}
|
||||
|
||||
entry := NotificationHistoryLokiEntry{
|
||||
SchemaVersion: 1,
|
||||
Receiver: receiverName,
|
||||
Status: string(types.Alerts(alerts...).StatusAt(now)),
|
||||
GroupLabels: prepareLabels(groupLabels),
|
||||
Alerts: entryAlerts,
|
||||
Retry: retry,
|
||||
Error: notificationErrStr,
|
||||
Duration: duration.Milliseconds(),
|
||||
}
|
||||
|
||||
entryJSON, err := json.Marshal(entry)
|
||||
if err != nil {
|
||||
return lokiclient.Stream{}, err
|
||||
}
|
||||
|
||||
streamLabels := make(map[string]string)
|
||||
streamLabels[NotificationHistoryKey] = NotificationHistoryLabelValue
|
||||
for k, v := range h.externalLabels {
|
||||
streamLabels[k] = v
|
||||
}
|
||||
|
||||
return lokiclient.Stream{
|
||||
Stream: streamLabels,
|
||||
Values: []lokiclient.Sample{
|
||||
{
|
||||
T: now,
|
||||
V: string(entryJSON),
|
||||
}},
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (h *NotificationHistorian) recordStream(ctx context.Context, stream lokiclient.Stream, logger log.Logger) error {
|
||||
if err := h.client.Push(ctx, []lokiclient.Stream{stream}); err != nil {
|
||||
return err
|
||||
}
|
||||
logger.Debug("Done saving notification history")
|
||||
return nil
|
||||
}
|
||||
|
||||
func prepareLabels(labels prometheusModel.LabelSet) map[string]string {
|
||||
result := make(map[string]string)
|
||||
for k, v := range labels {
|
||||
// Remove private labels
|
||||
if !strings.HasPrefix(string(k), "__") && !strings.HasSuffix(string(k), "__") {
|
||||
result[string(k)] = string(v)
|
||||
}
|
||||
}
|
||||
return result
|
||||
}
|
||||
@@ -0,0 +1,126 @@
|
||||
package notifier
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"errors"
|
||||
"io"
|
||||
"net/url"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
alertingModels "github.com/grafana/alerting/models"
|
||||
"github.com/grafana/grafana/pkg/infra/log"
|
||||
"github.com/grafana/grafana/pkg/infra/tracing"
|
||||
"github.com/grafana/grafana/pkg/services/ngalert/client"
|
||||
"github.com/grafana/grafana/pkg/services/ngalert/lokiclient"
|
||||
"github.com/grafana/grafana/pkg/services/ngalert/metrics"
|
||||
"github.com/prometheus/alertmanager/notify"
|
||||
"github.com/prometheus/alertmanager/types"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
"github.com/prometheus/client_golang/prometheus/testutil"
|
||||
"github.com/prometheus/common/model"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
var testNow = time.Date(2025, time.July, 15, 16, 55, 0, 0, time.UTC)
|
||||
var testAlerts = []*types.Alert{
|
||||
{
|
||||
Alert: model.Alert{
|
||||
Labels: model.LabelSet{"alertname": "Alert1", alertingModels.RuleUIDLabel: "testRuleUID"},
|
||||
Annotations: model.LabelSet{"foo": "bar", "__private__": "baz"},
|
||||
StartsAt: testNow,
|
||||
EndsAt: testNow,
|
||||
GeneratorURL: "http://localhost/test",
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
func TestRecord(t *testing.T) {
|
||||
t.Run("write notification history to Loki", func(t *testing.T) {
|
||||
testCases := []struct {
|
||||
name string
|
||||
retry bool
|
||||
notificationErr error
|
||||
expected string
|
||||
}{
|
||||
{
|
||||
"successful notification",
|
||||
false,
|
||||
nil,
|
||||
"{\"streams\":[{\"stream\":{\"externalLabelKey\":\"externalLabelValue\",\"from\":\"notify-history\"},\"values\":[[\"1752598500000000000\",\"{\\\"schemaVersion\\\":1,\\\"receiver\\\":\\\"testReceiverName\\\",\\\"status\\\":\\\"resolved\\\",\\\"groupLabels\\\":{\\\"foo\\\":\\\"bar\\\"},\\\"alerts\\\":[{\\\"status\\\":\\\"resolved\\\",\\\"labels\\\":{\\\"alertname\\\":\\\"Alert1\\\"},\\\"annotations\\\":{\\\"foo\\\":\\\"bar\\\"},\\\"startsAt\\\":\\\"2025-07-15T16:55:00Z\\\",\\\"endsAt\\\":\\\"2025-07-15T16:55:00Z\\\",\\\"ruleUID\\\":\\\"testRuleUID\\\"}],\\\"retry\\\":false,\\\"duration\\\":1000}\"]]}]}",
|
||||
},
|
||||
{
|
||||
"failed notification",
|
||||
true,
|
||||
errors.New("test notification error"),
|
||||
"{\"streams\":[{\"stream\":{\"externalLabelKey\":\"externalLabelValue\",\"from\":\"notify-history\"},\"values\":[[\"1752598500000000000\",\"{\\\"schemaVersion\\\":1,\\\"receiver\\\":\\\"testReceiverName\\\",\\\"status\\\":\\\"resolved\\\",\\\"groupLabels\\\":{\\\"foo\\\":\\\"bar\\\"},\\\"alerts\\\":[{\\\"status\\\":\\\"resolved\\\",\\\"labels\\\":{\\\"alertname\\\":\\\"Alert1\\\"},\\\"annotations\\\":{\\\"foo\\\":\\\"bar\\\"},\\\"startsAt\\\":\\\"2025-07-15T16:55:00Z\\\",\\\"endsAt\\\":\\\"2025-07-15T16:55:00Z\\\",\\\"ruleUID\\\":\\\"testRuleUID\\\"}],\\\"retry\\\":true,\\\"error\\\":\\\"test notification error\\\",\\\"duration\\\":1000}\"]]}]}",
|
||||
},
|
||||
}
|
||||
for _, tc := range testCases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
req := lokiclient.NewFakeRequester()
|
||||
met := metrics.NewNotificationHistorianMetrics(prometheus.NewRegistry())
|
||||
h := createTestNotificationHistorian(req, met)
|
||||
|
||||
err := <-h.Record(recordCtx(), testAlerts, tc.retry, tc.notificationErr, time.Second)
|
||||
require.NoError(t, err)
|
||||
|
||||
reqBody, err := io.ReadAll(req.LastRequest.Body)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, tc.expected, string(reqBody))
|
||||
})
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("emits expected write metrics", func(t *testing.T) {
|
||||
reg := prometheus.NewRegistry()
|
||||
met := metrics.NewNotificationHistorianMetrics(reg)
|
||||
goodHistorian := createTestNotificationHistorian(lokiclient.NewFakeRequester(), met)
|
||||
badHistorian := createTestNotificationHistorian(lokiclient.NewFakeRequester().WithResponse(lokiclient.BadResponse()), met)
|
||||
|
||||
<-goodHistorian.Record(recordCtx(), testAlerts, false, nil, time.Second)
|
||||
<-badHistorian.Record(recordCtx(), testAlerts, false, nil, time.Second)
|
||||
|
||||
exp := bytes.NewBufferString(`
|
||||
# HELP grafana_alerting_notification_history_writes_failed_total The total number of failed writes of notification history batches.
|
||||
# TYPE grafana_alerting_notification_history_writes_failed_total counter
|
||||
grafana_alerting_notification_history_writes_failed_total 1
|
||||
# HELP grafana_alerting_notification_history_writes_total The total number of notification history batches that were attempted to be written.
|
||||
# TYPE grafana_alerting_notification_history_writes_total counter
|
||||
grafana_alerting_notification_history_writes_total 2
|
||||
`)
|
||||
err := testutil.GatherAndCompare(reg, exp,
|
||||
"grafana_alerting_notification_history_writes_total",
|
||||
"grafana_alerting_notification_history_writes_failed_total",
|
||||
)
|
||||
require.NoError(t, err)
|
||||
})
|
||||
|
||||
t.Run("returns error when context is missing required fields", func(t *testing.T) {
|
||||
req := lokiclient.NewFakeRequester()
|
||||
met := metrics.NewNotificationHistorianMetrics(prometheus.NewRegistry())
|
||||
h := createTestNotificationHistorian(req, met)
|
||||
|
||||
err := <-h.Record(context.Background(), testAlerts, false, nil, time.Second)
|
||||
require.Error(t, err)
|
||||
})
|
||||
}
|
||||
|
||||
func createTestNotificationHistorian(req client.Requester, met *metrics.NotificationHistorian) *NotificationHistorian {
|
||||
writePathURL, _ := url.Parse("http://some.url")
|
||||
cfg := lokiclient.LokiConfig{
|
||||
WritePathURL: writePathURL,
|
||||
ExternalLabels: map[string]string{"externalLabelKey": "externalLabelValue"},
|
||||
Encoder: lokiclient.JsonEncoder{},
|
||||
}
|
||||
tracer := tracing.InitializeTracerForTest()
|
||||
return NewNotificationHistorian(log.NewNopLogger(), cfg, req, met, tracer)
|
||||
}
|
||||
|
||||
func recordCtx() context.Context {
|
||||
ctx := notify.WithReceiverName(context.Background(), "testReceiverName")
|
||||
ctx = notify.WithGroupLabels(ctx, model.LabelSet{"foo": "bar"})
|
||||
ctx = notify.WithNow(ctx, testNow)
|
||||
return ctx
|
||||
}
|
||||
@@ -7,6 +7,7 @@ import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/grafana/alerting/notify/nfstatus"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
|
||||
alertingCluster "github.com/grafana/alerting/cluster"
|
||||
@@ -134,6 +135,7 @@ func NewMultiOrgAlertmanager(
|
||||
l log.Logger,
|
||||
s secrets.Service,
|
||||
featureManager featuremgmt.FeatureToggles,
|
||||
notificationHistorian nfstatus.NotificationHistorian,
|
||||
opts ...Option,
|
||||
) (*MultiOrgAlertmanager, error) {
|
||||
moa := &MultiOrgAlertmanager{
|
||||
@@ -166,7 +168,7 @@ func NewMultiOrgAlertmanager(
|
||||
moa.factory = func(ctx context.Context, orgID int64) (Alertmanager, error) {
|
||||
m := metrics.NewAlertmanagerMetrics(moa.metrics.GetOrCreateOrgRegistry(orgID), l)
|
||||
stateStore := NewFileStore(orgID, kvStore)
|
||||
return NewAlertmanager(ctx, orgID, moa.settings, moa.configStore, stateStore, moa.peer, moa.decryptFn, moa.ns, m, featureManager, moa.Crypto)
|
||||
return NewAlertmanager(ctx, orgID, moa.settings, moa.configStore, stateStore, moa.peer, moa.decryptFn, moa.ns, m, featureManager, moa.Crypto, notificationHistorian)
|
||||
}
|
||||
|
||||
for _, opt := range opts {
|
||||
|
||||
@@ -102,6 +102,7 @@ func TestMultiorgAlertmanager_RemoteSecondaryMode(t *testing.T) {
|
||||
nopLogger,
|
||||
secretsService,
|
||||
featuremgmt.WithFeatures(),
|
||||
nil,
|
||||
override,
|
||||
)
|
||||
require.NoError(t, err)
|
||||
|
||||
@@ -396,6 +396,7 @@ func setupMam(t *testing.T, cfg *setting.Cfg) *MultiOrgAlertmanager {
|
||||
log.New("testlogger"),
|
||||
secretsService,
|
||||
featuremgmt.WithFeatures(),
|
||||
nil,
|
||||
)
|
||||
require.NoError(t, err)
|
||||
return mam
|
||||
|
||||
@@ -504,6 +504,7 @@ func createMultiOrgAlertmanager(t *testing.T, orgs []int64) *notifier.MultiOrgAl
|
||||
log.New("testlogger"),
|
||||
secretsService,
|
||||
featuremgmt.WithFeatures(),
|
||||
nil,
|
||||
)
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, moa.LoadAndSyncAlertmanagersForOrgs(context.Background()))
|
||||
|
||||
@@ -13,6 +13,7 @@ import (
|
||||
|
||||
"github.com/benbjohnson/clock"
|
||||
"github.com/grafana/grafana-plugin-sdk-go/data"
|
||||
"github.com/grafana/grafana/pkg/services/ngalert/lokiclient"
|
||||
"go.opentelemetry.io/otel/trace"
|
||||
|
||||
"github.com/grafana/grafana/pkg/apimachinery/errutil"
|
||||
@@ -42,6 +43,7 @@ const (
|
||||
const (
|
||||
StateHistoryLabelKey = "from"
|
||||
StateHistoryLabelValue = "state-history"
|
||||
LokiClientSpanName = "ngalert.historian.client"
|
||||
)
|
||||
|
||||
const defaultQueryRange = 6 * time.Hour
|
||||
@@ -67,8 +69,8 @@ func NewErrLokiQueryTooLong(query string, maxLimit int) error {
|
||||
|
||||
type remoteLokiClient interface {
|
||||
Ping(context.Context) error
|
||||
Push(context.Context, []Stream) error
|
||||
RangeQuery(ctx context.Context, logQL string, start, end, limit int64) (QueryRes, error)
|
||||
Push(context.Context, []lokiclient.Stream) error
|
||||
RangeQuery(ctx context.Context, logQL string, start, end, limit int64) (lokiclient.QueryRes, error)
|
||||
MaxQuerySize() int
|
||||
}
|
||||
|
||||
@@ -83,9 +85,9 @@ type RemoteLokiBackend struct {
|
||||
ruleStore RuleStore
|
||||
}
|
||||
|
||||
func NewRemoteLokiBackend(logger log.Logger, cfg LokiConfig, req client.Requester, metrics *metrics.Historian, tracer tracing.Tracer, ruleStore RuleStore, ac AccessControl) *RemoteLokiBackend {
|
||||
func NewRemoteLokiBackend(logger log.Logger, cfg lokiclient.LokiConfig, req client.Requester, metrics *metrics.Historian, tracer tracing.Tracer, ruleStore RuleStore, ac AccessControl) *RemoteLokiBackend {
|
||||
return &RemoteLokiBackend{
|
||||
client: NewLokiClient(cfg, req, metrics, logger, tracer),
|
||||
client: lokiclient.NewLokiClient(cfg, req, metrics.BytesWritten, metrics.WriteDuration, logger, tracer, LokiClientSpanName),
|
||||
externalLabels: cfg.ExternalLabels,
|
||||
clock: clock.New(),
|
||||
metrics: metrics,
|
||||
@@ -161,7 +163,7 @@ func (h *RemoteLokiBackend) Query(ctx context.Context, query models.HistoryQuery
|
||||
if query.From.IsZero() {
|
||||
query.From = now.Add(-defaultQueryRange)
|
||||
}
|
||||
var res []Stream
|
||||
var res []lokiclient.Stream
|
||||
for _, logQL := range queries {
|
||||
// Timestamps are expected in RFC3339Nano.
|
||||
// Apply user-defined limit to every request. Multiple batches is a very rare case, and therefore we can tolerate getting more data than needed.
|
||||
@@ -176,7 +178,7 @@ func (h *RemoteLokiBackend) Query(ctx context.Context, query models.HistoryQuery
|
||||
}
|
||||
|
||||
// merge will put all the results in one array sorted by timestamp.
|
||||
func merge(res []Stream, folderUIDToFilter []string) (*data.Frame, error) {
|
||||
func merge(res []lokiclient.Stream, folderUIDToFilter []string) (*data.Frame, error) {
|
||||
filterByFolderUIDMap := make(map[string]struct{}, len(folderUIDToFilter))
|
||||
for _, uid := range folderUIDToFilter {
|
||||
filterByFolderUIDMap[uid] = struct{}{}
|
||||
@@ -207,7 +209,7 @@ func merge(res []Stream, folderUIDToFilter []string) (*data.Frame, error) {
|
||||
pointers := make([]int, len(res))
|
||||
for {
|
||||
minTime := int64(math.MaxInt64)
|
||||
minEl := Sample{}
|
||||
minEl := lokiclient.Sample{}
|
||||
minElStreamIdx := -1
|
||||
// Find the element with the earliest time among all arrays.
|
||||
for i, stream := range res {
|
||||
@@ -269,7 +271,7 @@ func merge(res []Stream, folderUIDToFilter []string) (*data.Frame, error) {
|
||||
return frame, nil
|
||||
}
|
||||
|
||||
func StatesToStream(rule history_model.RuleMeta, states []state.StateTransition, externalLabels map[string]string, logger log.Logger) Stream {
|
||||
func StatesToStream(rule history_model.RuleMeta, states []state.StateTransition, externalLabels map[string]string, logger log.Logger) lokiclient.Stream {
|
||||
labels := mergeLabels(make(map[string]string), externalLabels)
|
||||
// System-defined labels take precedence over user-defined external labels.
|
||||
labels[StateHistoryLabelKey] = StateHistoryLabelValue
|
||||
@@ -277,7 +279,7 @@ func StatesToStream(rule history_model.RuleMeta, states []state.StateTransition,
|
||||
labels[GroupLabel] = fmt.Sprint(rule.Group)
|
||||
labels[FolderUIDLabel] = fmt.Sprint(rule.NamespaceUID)
|
||||
|
||||
samples := make([]Sample, 0, len(states))
|
||||
samples := make([]lokiclient.Sample, 0, len(states))
|
||||
for _, state := range states {
|
||||
if !shouldRecord(state) {
|
||||
continue
|
||||
@@ -309,20 +311,20 @@ func StatesToStream(rule history_model.RuleMeta, states []state.StateTransition,
|
||||
}
|
||||
line := string(jsn)
|
||||
|
||||
samples = append(samples, Sample{
|
||||
samples = append(samples, lokiclient.Sample{
|
||||
T: state.LastEvaluationTime,
|
||||
V: line,
|
||||
})
|
||||
}
|
||||
|
||||
return Stream{
|
||||
return lokiclient.Stream{
|
||||
Stream: labels,
|
||||
Values: samples,
|
||||
}
|
||||
}
|
||||
|
||||
func (h *RemoteLokiBackend) recordStreams(ctx context.Context, stream Stream, logger log.Logger) error {
|
||||
if err := h.client.Push(ctx, []Stream{stream}); err != nil {
|
||||
func (h *RemoteLokiBackend) recordStreams(ctx context.Context, stream lokiclient.Stream, logger log.Logger) error {
|
||||
if err := h.client.Push(ctx, []lokiclient.Stream{stream}); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
|
||||
@@ -14,6 +14,7 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/grafana/grafana-plugin-sdk-go/data"
|
||||
"github.com/grafana/grafana/pkg/services/ngalert/lokiclient"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
"github.com/prometheus/client_golang/prometheus/testutil"
|
||||
"github.com/stretchr/testify/assert"
|
||||
@@ -348,15 +349,15 @@ func TestBuildLogQuery(t *testing.T) {
|
||||
func TestMerge(t *testing.T) {
|
||||
testCases := []struct {
|
||||
name string
|
||||
res QueryRes
|
||||
res lokiclient.QueryRes
|
||||
expected *data.Frame
|
||||
folderUIDs []string
|
||||
}{
|
||||
{
|
||||
name: "Should return values from multiple streams in right order",
|
||||
res: QueryRes{
|
||||
Data: QueryData{
|
||||
Result: []Stream{
|
||||
res: lokiclient.QueryRes{
|
||||
Data: lokiclient.QueryData{
|
||||
Result: []lokiclient.Stream{
|
||||
{
|
||||
Stream: map[string]string{
|
||||
"from": "state-history",
|
||||
@@ -365,8 +366,8 @@ func TestMerge(t *testing.T) {
|
||||
"folderUID": "test-folder-1",
|
||||
"extra": "label",
|
||||
},
|
||||
Values: []Sample{
|
||||
{time.Unix(1, 0), `{"schemaVersion": 1, "previous": "normal", "current": "pending", "values":{"a": 1.5}, "ruleUID": "test-rule-1"}`},
|
||||
Values: []lokiclient.Sample{
|
||||
{T: time.Unix(1, 0), V: `{"schemaVersion": 1, "previous": "normal", "current": "pending", "values":{"a": 1.5}, "ruleUID": "test-rule-1"}`},
|
||||
},
|
||||
},
|
||||
{
|
||||
@@ -376,8 +377,8 @@ func TestMerge(t *testing.T) {
|
||||
"group": "test-group-2",
|
||||
"folderUID": "test-folder-1",
|
||||
},
|
||||
Values: []Sample{
|
||||
{time.Unix(2, 0), `{"schemaVersion": 1, "previous": "pending", "current": "firing", "values":{"a": 2.5}, "ruleUID": "test-rule-2"}`},
|
||||
Values: []lokiclient.Sample{
|
||||
{T: time.Unix(2, 0), V: `{"schemaVersion": 1, "previous": "pending", "current": "firing", "values":{"a": 2.5}, "ruleUID": "test-rule-2"}`},
|
||||
},
|
||||
},
|
||||
},
|
||||
@@ -411,14 +412,14 @@ func TestMerge(t *testing.T) {
|
||||
},
|
||||
{
|
||||
name: "Should handle empty values",
|
||||
res: QueryRes{
|
||||
Data: QueryData{
|
||||
Result: []Stream{
|
||||
res: lokiclient.QueryRes{
|
||||
Data: lokiclient.QueryData{
|
||||
Result: []lokiclient.Stream{
|
||||
{
|
||||
Stream: map[string]string{
|
||||
"extra": "labels",
|
||||
},
|
||||
Values: []Sample{},
|
||||
Values: []lokiclient.Sample{},
|
||||
},
|
||||
},
|
||||
},
|
||||
@@ -431,9 +432,9 @@ func TestMerge(t *testing.T) {
|
||||
},
|
||||
{
|
||||
name: "Should handle multiple values in one stream",
|
||||
res: QueryRes{
|
||||
Data: QueryData{
|
||||
Result: []Stream{
|
||||
res: lokiclient.QueryRes{
|
||||
Data: lokiclient.QueryData{
|
||||
Result: []lokiclient.Stream{
|
||||
{
|
||||
Stream: map[string]string{
|
||||
"from": "state-history",
|
||||
@@ -441,9 +442,9 @@ func TestMerge(t *testing.T) {
|
||||
"group": "test-group-1",
|
||||
"folderUID": "test-folder-1",
|
||||
},
|
||||
Values: []Sample{
|
||||
{time.Unix(1, 0), `{"schemaVersion": 1, "previous": "normal", "current": "pending", "values":{"a": 1.5}, "ruleUID": "test-rule-1"}`},
|
||||
{time.Unix(5, 0), `{"schemaVersion": 1, "previous": "pending", "current": "normal", "values":{"a": 0.5}, "ruleUID": "test-rule-2"}`},
|
||||
Values: []lokiclient.Sample{
|
||||
{T: time.Unix(1, 0), V: `{"schemaVersion": 1, "previous": "normal", "current": "pending", "values":{"a": 1.5}, "ruleUID": "test-rule-1"}`},
|
||||
{T: time.Unix(5, 0), V: `{"schemaVersion": 1, "previous": "pending", "current": "normal", "values":{"a": 0.5}, "ruleUID": "test-rule-2"}`},
|
||||
},
|
||||
},
|
||||
{
|
||||
@@ -453,8 +454,8 @@ func TestMerge(t *testing.T) {
|
||||
"group": "test-group-2",
|
||||
"folderUID": "test-folder-1",
|
||||
},
|
||||
Values: []Sample{
|
||||
{time.Unix(2, 0), `{"schemaVersion": 1, "previous": "pending", "current": "firing", "values":{"a": 2.5}, "ruleUID": "test-rule-3"}`},
|
||||
Values: []lokiclient.Sample{
|
||||
{T: time.Unix(2, 0), V: `{"schemaVersion": 1, "previous": "pending", "current": "firing", "values":{"a": 2.5}, "ruleUID": "test-rule-3"}`},
|
||||
},
|
||||
},
|
||||
},
|
||||
@@ -496,9 +497,9 @@ func TestMerge(t *testing.T) {
|
||||
{
|
||||
name: "should filter streams by folder UID",
|
||||
folderUIDs: []string{"test-folder-1"},
|
||||
res: QueryRes{
|
||||
Data: QueryData{
|
||||
Result: []Stream{
|
||||
res: lokiclient.QueryRes{
|
||||
Data: lokiclient.QueryData{
|
||||
Result: []lokiclient.Stream{
|
||||
{
|
||||
Stream: map[string]string{
|
||||
"from": "state-history",
|
||||
@@ -506,9 +507,9 @@ func TestMerge(t *testing.T) {
|
||||
"group": "test-group-1",
|
||||
"folderUID": "test-folder-1",
|
||||
},
|
||||
Values: []Sample{
|
||||
{time.Unix(1, 0), `{"schemaVersion": 1, "previous": "normal", "current": "pending", "values":{"a": 1.5}, "ruleUID": "test-rule-1"}`},
|
||||
{time.Unix(5, 0), `{"schemaVersion": 1, "previous": "pending", "current": "normal", "values":{"a": 0.5}, "ruleUID": "test-rule-2"}`},
|
||||
Values: []lokiclient.Sample{
|
||||
{T: time.Unix(1, 0), V: `{"schemaVersion": 1, "previous": "normal", "current": "pending", "values":{"a": 1.5}, "ruleUID": "test-rule-1"}`},
|
||||
{T: time.Unix(5, 0), V: `{"schemaVersion": 1, "previous": "pending", "current": "normal", "values":{"a": 0.5}, "ruleUID": "test-rule-2"}`},
|
||||
},
|
||||
},
|
||||
{
|
||||
@@ -518,8 +519,8 @@ func TestMerge(t *testing.T) {
|
||||
"group": "test-group-2",
|
||||
"folderUID": "test-folder-2",
|
||||
},
|
||||
Values: []Sample{
|
||||
{time.Unix(2, 0), `{"schemaVersion": 1, "previous": "pending", "current": "firing", "values":{"a": 2.5}, "ruleUID": "test-rule-3"}`},
|
||||
Values: []lokiclient.Sample{
|
||||
{T: time.Unix(2, 0), V: `{"schemaVersion": 1, "previous": "pending", "current": "firing", "values":{"a": 2.5}, "ruleUID": "test-rule-3"}`},
|
||||
},
|
||||
},
|
||||
},
|
||||
@@ -553,16 +554,16 @@ func TestMerge(t *testing.T) {
|
||||
{
|
||||
name: "should skip streams without folder UID if filter is specified",
|
||||
folderUIDs: []string{"test-folder-1"},
|
||||
res: QueryRes{
|
||||
Data: QueryData{
|
||||
Result: []Stream{
|
||||
res: lokiclient.QueryRes{
|
||||
Data: lokiclient.QueryData{
|
||||
Result: []lokiclient.Stream{
|
||||
{
|
||||
Stream: map[string]string{
|
||||
"group": "test-group-1",
|
||||
},
|
||||
Values: []Sample{
|
||||
{time.Unix(1, 0), `{"schemaVersion": 1, "previous": "normal", "current": "pending", "values":{"a": 1.5}, "ruleUID": "test-rule-1"}`},
|
||||
{time.Unix(5, 0), `{"schemaVersion": 1, "previous": "pending", "current": "normal", "values":{"a": 0.5}, "ruleUID": "test-rule-2"}`},
|
||||
Values: []lokiclient.Sample{
|
||||
{T: time.Unix(1, 0), V: `{"schemaVersion": 1, "previous": "normal", "current": "pending", "values":{"a": 1.5}, "ruleUID": "test-rule-1"}`},
|
||||
{T: time.Unix(5, 0), V: `{"schemaVersion": 1, "previous": "pending", "current": "normal", "values":{"a": 0.5}, "ruleUID": "test-rule-2"}`},
|
||||
},
|
||||
},
|
||||
},
|
||||
@@ -577,15 +578,15 @@ func TestMerge(t *testing.T) {
|
||||
{
|
||||
name: "should return streams without folder UID if filter is not specified",
|
||||
folderUIDs: []string{},
|
||||
res: QueryRes{
|
||||
Data: QueryData{
|
||||
Result: []Stream{
|
||||
res: lokiclient.QueryRes{
|
||||
Data: lokiclient.QueryData{
|
||||
Result: []lokiclient.Stream{
|
||||
{
|
||||
Stream: map[string]string{
|
||||
"group": "test-group-1",
|
||||
},
|
||||
Values: []Sample{
|
||||
{time.Unix(1, 0), `{"schemaVersion": 1, "previous": "normal", "current": "pending", "values":{"a": 1.5}, "ruleUID": "test-rule-1"}`},
|
||||
Values: []lokiclient.Sample{
|
||||
{T: time.Unix(1, 0), V: `{"schemaVersion": 1, "previous": "normal", "current": "pending", "values":{"a": 1.5}, "ruleUID": "test-rule-1"}`},
|
||||
},
|
||||
},
|
||||
},
|
||||
@@ -624,7 +625,7 @@ func TestMerge(t *testing.T) {
|
||||
|
||||
func TestRecordStates(t *testing.T) {
|
||||
t.Run("writes state transitions to loki", func(t *testing.T) {
|
||||
req := NewFakeRequester()
|
||||
req := lokiclient.NewFakeRequester()
|
||||
loki := createTestLokiBackend(t, req, metrics.NewHistorianMetrics(prometheus.NewRegistry(), metrics.Subsystem))
|
||||
rule := createTestRule()
|
||||
states := singleFromNormal(&state.State{
|
||||
@@ -635,14 +636,14 @@ func TestRecordStates(t *testing.T) {
|
||||
err := <-loki.Record(context.Background(), rule, states)
|
||||
|
||||
require.NoError(t, err)
|
||||
require.Contains(t, "/loki/api/v1/push", req.lastRequest.URL.Path)
|
||||
require.Contains(t, "/loki/api/v1/push", req.LastRequest.URL.Path)
|
||||
})
|
||||
|
||||
t.Run("emits expected write metrics", func(t *testing.T) {
|
||||
reg := prometheus.NewRegistry()
|
||||
met := metrics.NewHistorianMetrics(reg, metrics.Subsystem)
|
||||
loki := createTestLokiBackend(t, NewFakeRequester(), met)
|
||||
errLoki := createTestLokiBackend(t, NewFakeRequester().WithResponse(badResponse()), met) //nolint:bodyclose
|
||||
loki := createTestLokiBackend(t, lokiclient.NewFakeRequester(), met)
|
||||
errLoki := createTestLokiBackend(t, lokiclient.NewFakeRequester().WithResponse(lokiclient.BadResponse()), met) //nolint:bodyclose
|
||||
rule := createTestRule()
|
||||
states := singleFromNormal(&state.State{
|
||||
State: eval.Alerting,
|
||||
@@ -676,7 +677,7 @@ grafana_alerting_state_history_writes_total{backend="loki",org="1"} 2
|
||||
})
|
||||
|
||||
t.Run("elides request if nothing to send", func(t *testing.T) {
|
||||
req := NewFakeRequester()
|
||||
req := lokiclient.NewFakeRequester()
|
||||
loki := createTestLokiBackend(t, req, metrics.NewHistorianMetrics(prometheus.NewRegistry(), metrics.Subsystem))
|
||||
rule := createTestRule()
|
||||
states := []state.StateTransition{}
|
||||
@@ -684,11 +685,11 @@ grafana_alerting_state_history_writes_total{backend="loki",org="1"} 2
|
||||
err := <-loki.Record(context.Background(), rule, states)
|
||||
|
||||
require.NoError(t, err)
|
||||
require.Nil(t, req.lastRequest)
|
||||
require.Nil(t, req.LastRequest)
|
||||
})
|
||||
|
||||
t.Run("succeeds with special chars in labels", func(t *testing.T) {
|
||||
req := NewFakeRequester()
|
||||
req := lokiclient.NewFakeRequester()
|
||||
loki := createTestLokiBackend(t, req, metrics.NewHistorianMetrics(prometheus.NewRegistry(), metrics.Subsystem))
|
||||
rule := createTestRule()
|
||||
states := singleFromNormal(&state.State{
|
||||
@@ -703,15 +704,15 @@ grafana_alerting_state_history_writes_total{backend="loki",org="1"} 2
|
||||
err := <-loki.Record(context.Background(), rule, states)
|
||||
|
||||
require.NoError(t, err)
|
||||
require.Contains(t, "/loki/api/v1/push", req.lastRequest.URL.Path)
|
||||
sent := string(readBody(t, req.lastRequest))
|
||||
require.Contains(t, "/loki/api/v1/push", req.LastRequest.URL.Path)
|
||||
sent := string(readBody(t, req.LastRequest))
|
||||
require.Contains(t, sent, "contains.dot")
|
||||
require.Contains(t, sent, "contains=equals")
|
||||
require.Contains(t, sent, "contains🤔emoji")
|
||||
})
|
||||
|
||||
t.Run("adds external labels to log lines", func(t *testing.T) {
|
||||
req := NewFakeRequester()
|
||||
req := lokiclient.NewFakeRequester()
|
||||
loki := createTestLokiBackend(t, req, metrics.NewHistorianMetrics(prometheus.NewRegistry(), metrics.Subsystem))
|
||||
rule := createTestRule()
|
||||
states := singleFromNormal(&state.State{
|
||||
@@ -721,8 +722,8 @@ grafana_alerting_state_history_writes_total{backend="loki",org="1"} 2
|
||||
err := <-loki.Record(context.Background(), rule, states)
|
||||
|
||||
require.NoError(t, err)
|
||||
require.Contains(t, "/loki/api/v1/push", req.lastRequest.URL.Path)
|
||||
sent := string(readBody(t, req.lastRequest))
|
||||
require.Contains(t, "/loki/api/v1/push", req.LastRequest.URL.Path)
|
||||
sent := string(readBody(t, req.LastRequest))
|
||||
require.Contains(t, sent, "externalLabelKey")
|
||||
require.Contains(t, sent, "externalLabelValue")
|
||||
})
|
||||
@@ -739,7 +740,7 @@ func TestGetFolderUIDsForFilter(t *testing.T) {
|
||||
usr := accesscontrol.BackgroundUser("test", 1, org.RoleNone, nil)
|
||||
|
||||
createLoki := func(ac AccessControl) *RemoteLokiBackend {
|
||||
req := NewFakeRequester()
|
||||
req := lokiclient.NewFakeRequester()
|
||||
loki := createTestLokiBackend(t, req, metrics.NewHistorianMetrics(prometheus.NewRegistry(), metrics.Subsystem))
|
||||
rules := fakes.NewRuleStore(t)
|
||||
f := make([]*folder.Folder, 0, len(folders))
|
||||
@@ -881,10 +882,10 @@ func TestGetFolderUIDsForFilter(t *testing.T) {
|
||||
|
||||
func createTestLokiBackend(t *testing.T, req client.Requester, met *metrics.Historian) *RemoteLokiBackend {
|
||||
url, _ := url.Parse("http://some.url")
|
||||
cfg := LokiConfig{
|
||||
cfg := lokiclient.LokiConfig{
|
||||
WritePathURL: url,
|
||||
ReadPathURL: url,
|
||||
Encoder: JsonEncoder{},
|
||||
Encoder: lokiclient.JsonEncoder{},
|
||||
ExternalLabels: map[string]string{"externalLabelKey": "externalLabelValue"},
|
||||
}
|
||||
lokiBackendLogger := log.New("ngalert.state.historian", "backend", "loki")
|
||||
@@ -915,12 +916,12 @@ func createTestRule() history_model.RuleMeta {
|
||||
}
|
||||
}
|
||||
|
||||
func requireSingleEntry(t *testing.T, res Stream) LokiEntry {
|
||||
func requireSingleEntry(t *testing.T, res lokiclient.Stream) LokiEntry {
|
||||
require.Len(t, res.Values, 1)
|
||||
return requireEntry(t, res.Values[0])
|
||||
}
|
||||
|
||||
func requireEntry(t *testing.T, row Sample) LokiEntry {
|
||||
func requireEntry(t *testing.T, row lokiclient.Sample) LokiEntry {
|
||||
t.Helper()
|
||||
|
||||
var entry LokiEntry
|
||||
@@ -929,16 +930,6 @@ func requireEntry(t *testing.T, row Sample) LokiEntry {
|
||||
return entry
|
||||
}
|
||||
|
||||
func badResponse() *http.Response {
|
||||
return &http.Response{
|
||||
Status: "400 Bad Request",
|
||||
StatusCode: http.StatusBadRequest,
|
||||
Body: io.NopCloser(bytes.NewBufferString("")),
|
||||
ContentLength: int64(0),
|
||||
Header: make(http.Header, 0),
|
||||
}
|
||||
}
|
||||
|
||||
func readBody(t *testing.T, req *http.Request) []byte {
|
||||
t.Helper()
|
||||
|
||||
|
||||
@@ -1,43 +1,12 @@
|
||||
package historian
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
|
||||
"github.com/grafana/grafana/pkg/services/annotations"
|
||||
)
|
||||
|
||||
type fakeRequester struct {
|
||||
lastRequest *http.Request
|
||||
resp *http.Response
|
||||
}
|
||||
|
||||
func NewFakeRequester() *fakeRequester {
|
||||
return &fakeRequester{
|
||||
resp: &http.Response{
|
||||
Status: "200 OK",
|
||||
StatusCode: 200,
|
||||
Body: io.NopCloser(bytes.NewBufferString("")),
|
||||
ContentLength: int64(0),
|
||||
Header: make(http.Header, 0),
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func (f *fakeRequester) WithResponse(resp *http.Response) *fakeRequester {
|
||||
f.resp = resp
|
||||
return f
|
||||
}
|
||||
|
||||
func (f *fakeRequester) Do(req *http.Request) (*http.Response, error) {
|
||||
f.lastRequest = req
|
||||
f.resp.Request = req // Not concurrency-safe!
|
||||
return f.resp, nil
|
||||
}
|
||||
|
||||
type failingAnnotationRepo struct{}
|
||||
|
||||
func (f *failingAnnotationRepo) SaveMany(_ context.Context, _ []annotations.Item) error {
|
||||
|
||||
Reference in New Issue
Block a user