diff --git a/pkg/services/annotations/annotationsimpl/annotations.go b/pkg/services/annotations/annotationsimpl/annotations.go index f9640a51e97..3cf88288d27 100644 --- a/pkg/services/annotations/annotationsimpl/annotations.go +++ b/pkg/services/annotations/annotationsimpl/annotations.go @@ -38,7 +38,7 @@ func ProvideService( historianStore := loki.NewLokiHistorianStore(cfg.UnifiedAlerting.StateHistory, features, db, log.New("annotations.loki")) if historianStore != nil { l.Debug("Using composite read store") - read = NewCompositeStore(xormStore, historianStore) + read = NewCompositeStore(log.New("annotations.composite"), xormStore, historianStore) } else { l.Debug("Using xorm read store") read = write diff --git a/pkg/services/annotations/annotationsimpl/composite_store.go b/pkg/services/annotations/annotationsimpl/composite_store.go index 4db6e1c9ece..3bcf0724b52 100644 --- a/pkg/services/annotations/annotationsimpl/composite_store.go +++ b/pkg/services/annotations/annotationsimpl/composite_store.go @@ -2,8 +2,11 @@ package annotationsimpl import ( "context" + "fmt" "sort" + "github.com/grafana/grafana/pkg/infra/log" + "github.com/grafana/dskit/concurrency" "github.com/grafana/grafana/pkg/services/annotations" "github.com/grafana/grafana/pkg/services/annotations/accesscontrol" @@ -11,20 +14,29 @@ import ( // CompositeStore is a read store that combines two or more read stores, and queries all stores in parallel. type CompositeStore struct { + logger log.Logger readers []readStore } -func NewCompositeStore(readers ...readStore) *CompositeStore { +func NewCompositeStore(logger log.Logger, readers ...readStore) *CompositeStore { return &CompositeStore{ + logger: logger, readers: readers, } } +// Satisfy the commonStore interface, in practice this is not used. +func (c *CompositeStore) Type() string { + return "composite" +} + // Get returns annotations from all stores, and combines the results. 
func (c *CompositeStore) Get(ctx context.Context, query *annotations.ItemQuery, accessResources *accesscontrol.AccessResources) ([]*annotations.ItemDTO, error) { itemCh := make(chan []*annotations.ItemDTO, len(c.readers)) - err := concurrency.ForEachJob(ctx, len(c.readers), len(c.readers), func(ctx context.Context, i int) error { + err := concurrency.ForEachJob(ctx, len(c.readers), len(c.readers), func(ctx context.Context, i int) (err error) { + defer handleJobPanic(c.logger, c.readers[i].Type(), &err) + items, err := c.readers[i].Get(ctx, query, accessResources) itemCh <- items return err @@ -47,7 +59,9 @@ func (c *CompositeStore) Get(ctx context.Context, query *annotations.ItemQuery, func (c *CompositeStore) GetTags(ctx context.Context, query *annotations.TagsQuery) (annotations.FindTagsResult, error) { resCh := make(chan annotations.FindTagsResult, len(c.readers)) - err := concurrency.ForEachJob(ctx, len(c.readers), len(c.readers), func(ctx context.Context, i int) error { + err := concurrency.ForEachJob(ctx, len(c.readers), len(c.readers), func(ctx context.Context, i int) (err error) { + defer handleJobPanic(c.logger, c.readers[i].Type(), &err) + res, err := c.readers[i].GetTags(ctx, query) resCh <- res return err @@ -65,3 +79,20 @@ func (c *CompositeStore) GetTags(ctx context.Context, query *annotations.TagsQue return annotations.FindTagsResult{Tags: res}, nil } + +// handleJobPanic is a helper function that recovers from a panic in a concurrent job. +// It logs the panic and, if jobErr is non-nil, sets the job error through it. 
+func handleJobPanic(logger log.Logger, storeType string, jobErr *error) { + if r := recover(); r != nil { + logger.Error("Annotation store panic", "error", r, "store", storeType, "stack", log.Stack(1)) + errMsg := "concurrent job panic" + + if jobErr != nil { + err := fmt.Errorf("%s: %v", errMsg, r) // constant format string; keeps non-error panic values in the message + if panicErr, ok := r.(error); ok { + err = fmt.Errorf("%s: %w", errMsg, panicErr) + } + *jobErr = err + } + } +} diff --git a/pkg/services/annotations/annotationsimpl/composite_store_test.go b/pkg/services/annotations/annotationsimpl/composite_store_test.go index e7800d5dbdc..b5aa1d22d8c 100644 --- a/pkg/services/annotations/annotationsimpl/composite_store_test.go +++ b/pkg/services/annotations/annotationsimpl/composite_store_test.go @@ -7,6 +7,7 @@ import ( "testing" "time" + "github.com/grafana/grafana/pkg/infra/log" "github.com/grafana/grafana/pkg/services/annotations" "github.com/grafana/grafana/pkg/services/annotations/accesscontrol" "github.com/stretchr/testify/require" @@ -18,6 +19,22 @@ var ( ) func TestCompositeStore(t *testing.T) { + t.Run("should handle panic", func(t *testing.T) { + r1 := newFakeReader() + getPanic := func(context.Context, *annotations.ItemQuery, *accesscontrol.AccessResources) ([]*annotations.ItemDTO, error) { + panic("ohno") + } + r2 := newFakeReader(withGetFn(getPanic)) + store := &CompositeStore{ + log.NewNopLogger(), + []readStore{r1, r2}, + } + + _, err := store.Get(context.Background(), nil, nil) + require.Error(t, err) + require.Contains(t, err.Error(), "concurrent job panic") + }) + t.Run("should return first error", func(t *testing.T) { err1 := errors.New("error 1") r1 := newFakeReader(withError(err1)) @@ -25,6 +42,7 @@ func TestCompositeStore(t *testing.T) { r2 := newFakeReader(withError(err2), withWait(10*time.Millisecond)) store := &CompositeStore{ + log.NewNopLogger(), []readStore{r1, r2}, } @@ -64,6 +82,7 @@ func TestCompositeStore(t *testing.T) { r2 := newFakeReader(withItems(items2)) store := &CompositeStore{ + log.NewNopLogger(), 
[]readStore{r1, r2}, } @@ -92,6 +111,7 @@ func TestCompositeStore(t *testing.T) { r2 := newFakeReader(withTags(tags2)) store := &CompositeStore{ + log.NewNopLogger(), []readStore{r1, r2}, } @@ -108,13 +128,23 @@ func TestCompositeStore(t *testing.T) { } type fakeReader struct { - items []*annotations.ItemDTO - tagRes annotations.FindTagsResult - wait time.Duration - err error + items []*annotations.ItemDTO + tagRes annotations.FindTagsResult + getFn func(context.Context, *annotations.ItemQuery, *accesscontrol.AccessResources) ([]*annotations.ItemDTO, error) + getTagFn func(context.Context, *annotations.TagsQuery) (annotations.FindTagsResult, error) + wait time.Duration + err error +} + +func (f *fakeReader) Type() string { + return "fake" } func (f *fakeReader) Get(ctx context.Context, query *annotations.ItemQuery, accessResources *accesscontrol.AccessResources) ([]*annotations.ItemDTO, error) { + if f.getFn != nil { + return f.getFn(ctx, query, accessResources) + } + if f.wait > 0 { time.Sleep(f.wait) } @@ -128,6 +158,10 @@ func (f *fakeReader) Get(ctx context.Context, query *annotations.ItemQuery, acce } func (f *fakeReader) GetTags(ctx context.Context, query *annotations.TagsQuery) (annotations.FindTagsResult, error) { + if f.getTagFn != nil { + return f.getTagFn(ctx, query) + } + if f.wait > 0 { time.Sleep(f.wait) } @@ -164,6 +198,12 @@ func withTags(tags []*annotations.TagsDTO) func(*fakeReader) { } } +func withGetFn(fn func(context.Context, *annotations.ItemQuery, *accesscontrol.AccessResources) ([]*annotations.ItemDTO, error)) func(*fakeReader) { + return func(f *fakeReader) { + f.getFn = fn + } +} + func newFakeReader(opts ...func(*fakeReader)) *fakeReader { f := &fakeReader{} for _, opt := range opts { diff --git a/pkg/services/annotations/annotationsimpl/loki/historian_store.go b/pkg/services/annotations/annotationsimpl/loki/historian_store.go index 451375b5212..3d12623bd02 100644 --- a/pkg/services/annotations/annotationsimpl/loki/historian_store.go +++ 
b/pkg/services/annotations/annotationsimpl/loki/historian_store.go @@ -69,6 +69,10 @@ func NewLokiHistorianStore(cfg setting.UnifiedAlertingStateHistorySettings, ft f } } +func (r *LokiHistorianStore) Type() string { + return "loki" +} + func (r *LokiHistorianStore) Get(ctx context.Context, query *annotations.ItemQuery, accessResources *accesscontrol.AccessResources) ([]*annotations.ItemDTO, error) { if query.Type == "annotation" { return make([]*annotations.ItemDTO, 0), nil @@ -124,6 +128,7 @@ func (r *LokiHistorianStore) annotationsFromStream(stream historian.Stream, ac a err := json.Unmarshal([]byte(sample.V), &entry) if err != nil { // bad data, skip + r.log.Debug("failed to unmarshal loki entry", "error", err, "entry", sample.V) continue } @@ -135,6 +140,7 @@ func (r *LokiHistorianStore) annotationsFromStream(stream historian.Stream, ac a transition, err := buildTransition(entry) if err != nil { // bad data, skip + r.log.Debug("failed to build transition", "error", err, "entry", entry) continue } @@ -207,6 +213,10 @@ type number interface { // numericMap converts a simplejson map[string]any to a map[string]N, where N is numeric (int or float). 
func numericMap[N number](j *simplejson.Json) (map[string]N, error) { + if j == nil { + return nil, fmt.Errorf("unexpected nil value") + } + m, err := j.Map() if err != nil { return nil, err diff --git a/pkg/services/annotations/annotationsimpl/loki/historian_store_test.go b/pkg/services/annotations/annotationsimpl/loki/historian_store_test.go index 1123a7bbddc..6ed3933de88 100644 --- a/pkg/services/annotations/annotationsimpl/loki/historian_store_test.go +++ b/pkg/services/annotations/annotationsimpl/loki/historian_store_test.go @@ -374,7 +374,23 @@ func TestHasAccess(t *testing.T) { }) } -func TestFloat64Map(t *testing.T) { +func TestNumericMap(t *testing.T) { + t.Run("should return error for nil value", func(t *testing.T) { + var jsonMap *simplejson.Json + _, err := numericMap[float64](jsonMap) + require.Error(t, err) + require.Contains(t, err.Error(), "unexpected nil value") + }) + + t.Run("should return error for nil interface value", func(t *testing.T) { + jsonMap := simplejson.NewFromAny(map[string]any{ + "key1": nil, + }) + _, err := numericMap[float64](jsonMap) + require.Error(t, err) + require.Contains(t, err.Error(), "unexpected value type") + }) + t.Run(`should convert json string:float kv to Golang map[string]float64`, func(t *testing.T) { jsonMap := simplejson.NewFromAny(map[string]any{ "key1": json.Number("1.0"), diff --git a/pkg/services/annotations/annotationsimpl/store.go b/pkg/services/annotations/annotationsimpl/store.go index 1a3d3991416..b1c3643a9c3 100644 --- a/pkg/services/annotations/annotationsimpl/store.go +++ b/pkg/services/annotations/annotationsimpl/store.go @@ -14,12 +14,18 @@ type store interface { writeStore } +type commonStore interface { + Type() string +} + type readStore interface { + commonStore Get(ctx context.Context, query *annotations.ItemQuery, accessResources *accesscontrol.AccessResources) ([]*annotations.ItemDTO, error) GetTags(ctx context.Context, query *annotations.TagsQuery) (annotations.FindTagsResult, error) } type 
writeStore interface { + commonStore Add(ctx context.Context, items *annotations.Item) error AddMany(ctx context.Context, items []annotations.Item) error Update(ctx context.Context, item *annotations.Item) error diff --git a/pkg/services/annotations/annotationsimpl/xorm_store.go b/pkg/services/annotations/annotationsimpl/xorm_store.go index a9edcebbee2..6ae9ddc3ce1 100644 --- a/pkg/services/annotations/annotationsimpl/xorm_store.go +++ b/pkg/services/annotations/annotationsimpl/xorm_store.go @@ -54,6 +54,10 @@ func NewXormStore(cfg *setting.Cfg, l log.Logger, db db.DB, tagService tag.Servi } } +func (r *xormRepositoryImpl) Type() string { + return "sql" +} + func (r *xormRepositoryImpl) Add(ctx context.Context, item *annotations.Item) error { tags := tag.ParseTagPairs(item.Tags) item.Tags = tag.JoinTagPairs(tags)