[v11.2.x] Annotations: Fix composite store read (#94182)

Annotations: Fix composite store read (#94158)

* Annotations: Fix composite store read

* Add test

* check error

(cherry picked from commit bd1741653d)
This commit is contained in:
Alexander Zobnin
2024-10-03 17:12:27 +02:00
committed by GitHub
parent 2e16baaa96
commit d0df683dd5
10 changed files with 82 additions and 57 deletions
@@ -77,7 +77,7 @@ func (r *RepositoryImpl) Find(ctx context.Context, query *annotations.ItemQuery)
// Search without dashboard UID filter is expensive, so check without access control first
if query.DashboardID == 0 && query.DashboardUID == "" {
// Return early if no annotations found, it's not necessary to perform expensive access control filtering
res, err := r.reader.Get(ctx, query, &accesscontrol.AccessResources{
res, err := r.reader.Get(ctx, *query, &accesscontrol.AccessResources{
SkipAccessControlFilter: true,
})
if err != nil || len(res) == 0 {
@@ -95,12 +95,12 @@ func (r *RepositoryImpl) Find(ctx context.Context, query *annotations.ItemQuery)
// Iterate over available annotations until query limit is reached
// or all available dashboards are checked
for len(results) < int(query.Limit) {
resources, err := r.authZ.Authorize(ctx, query)
resources, err := r.authZ.Authorize(ctx, *query)
if err != nil {
return nil, err
}
res, err := r.reader.Get(ctx, query, resources)
res, err := r.reader.Get(ctx, *query, resources)
if err != nil {
return nil, err
}
@@ -121,5 +121,5 @@ func (r *RepositoryImpl) Delete(ctx context.Context, params *annotations.DeleteP
}
func (r *RepositoryImpl) FindTags(ctx context.Context, query *annotations.TagsQuery) (annotations.FindTagsResult, error) {
return r.reader.GetTags(ctx, query)
return r.reader.GetTags(ctx, *query)
}
@@ -31,7 +31,7 @@ func (c *CompositeStore) Type() string {
}
// Get returns annotations from all stores, and combines the results.
func (c *CompositeStore) Get(ctx context.Context, query *annotations.ItemQuery, accessResources *accesscontrol.AccessResources) ([]*annotations.ItemDTO, error) {
func (c *CompositeStore) Get(ctx context.Context, query annotations.ItemQuery, accessResources *accesscontrol.AccessResources) ([]*annotations.ItemDTO, error) {
itemCh := make(chan []*annotations.ItemDTO, len(c.readers))
err := concurrency.ForEachJob(ctx, len(c.readers), len(c.readers), func(ctx context.Context, i int) (err error) {
@@ -56,7 +56,7 @@ func (c *CompositeStore) Get(ctx context.Context, query *annotations.ItemQuery,
}
// GetTags returns tags from all stores, and combines the results.
func (c *CompositeStore) GetTags(ctx context.Context, query *annotations.TagsQuery) (annotations.FindTagsResult, error) {
func (c *CompositeStore) GetTags(ctx context.Context, query annotations.TagsQuery) (annotations.FindTagsResult, error) {
resCh := make(chan annotations.FindTagsResult, len(c.readers))
err := concurrency.ForEachJob(ctx, len(c.readers), len(c.readers), func(ctx context.Context, i int) (err error) {
@@ -7,10 +7,11 @@ import (
"testing"
"time"
"github.com/stretchr/testify/require"
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/services/annotations"
"github.com/grafana/grafana/pkg/services/annotations/accesscontrol"
"github.com/stretchr/testify/require"
)
var (
@@ -21,7 +22,7 @@ var (
func TestCompositeStore(t *testing.T) {
t.Run("should handle panic", func(t *testing.T) {
r1 := newFakeReader()
getPanic := func(context.Context, *annotations.ItemQuery, *accesscontrol.AccessResources) ([]*annotations.ItemDTO, error) {
getPanic := func(context.Context, annotations.ItemQuery, *accesscontrol.AccessResources) ([]*annotations.ItemDTO, error) {
panic("ohno")
}
r2 := newFakeReader(withGetFn(getPanic))
@@ -30,7 +31,7 @@ func TestCompositeStore(t *testing.T) {
[]readStore{r1, r2},
}
_, err := store.Get(context.Background(), nil, nil)
_, err := store.Get(context.Background(), annotations.ItemQuery{}, nil)
require.Error(t, err)
require.Contains(t, err.Error(), "concurrent job panic")
})
@@ -51,11 +52,11 @@ func TestCompositeStore(t *testing.T) {
err error
}{
{
f: func() (any, error) { return store.Get(context.Background(), nil, nil) },
f: func() (any, error) { return store.Get(context.Background(), annotations.ItemQuery{}, nil) },
err: errGet,
},
{
f: func() (any, error) { return store.GetTags(context.Background(), nil) },
f: func() (any, error) { return store.GetTags(context.Background(), annotations.TagsQuery{}) },
err: errGetTags,
},
}
@@ -93,7 +94,7 @@ func TestCompositeStore(t *testing.T) {
{TimeEnd: 1, Time: 1},
}
items, _ := store.Get(context.Background(), nil, nil)
items, _ := store.Get(context.Background(), annotations.ItemQuery{}, nil)
require.Equal(t, expected, items)
})
@@ -122,16 +123,40 @@ func TestCompositeStore(t *testing.T) {
{Tag: "key2:val2"},
}
res, _ := store.GetTags(context.Background(), nil)
res, _ := store.GetTags(context.Background(), annotations.TagsQuery{})
require.Equal(t, expected, res.Tags)
})
// Check if reader is not modifying query since it might cause a race condition in case of composite store
t.Run("should not modify query", func(t *testing.T) {
getFn1 := func(ctx context.Context, query annotations.ItemQuery, resources *accesscontrol.AccessResources) ([]*annotations.ItemDTO, error) {
query.From = 1
return []*annotations.ItemDTO{}, nil
}
getFn2 := func(ctx context.Context, query annotations.ItemQuery, resources *accesscontrol.AccessResources) ([]*annotations.ItemDTO, error) {
return []*annotations.ItemDTO{}, nil
}
r1 := newFakeReader(withGetFn(getFn1))
r2 := newFakeReader(withGetFn(getFn2))
store := &CompositeStore{
log.NewNopLogger(),
[]readStore{r1, r2},
}
query := annotations.ItemQuery{}
_, err := store.Get(context.Background(), query, nil)
require.NoError(t, err)
require.Equal(t, int64(0), query.From)
})
}
type fakeReader struct {
items []*annotations.ItemDTO
tagRes annotations.FindTagsResult
getFn func(context.Context, *annotations.ItemQuery, *accesscontrol.AccessResources) ([]*annotations.ItemDTO, error)
getTagFn func(context.Context, *annotations.TagsQuery) (annotations.FindTagsResult, error)
getFn func(context.Context, annotations.ItemQuery, *accesscontrol.AccessResources) ([]*annotations.ItemDTO, error)
getTagFn func(context.Context, annotations.TagsQuery) (annotations.FindTagsResult, error)
wait time.Duration
err error
}
@@ -140,7 +165,7 @@ func (f *fakeReader) Type() string {
return "fake"
}
func (f *fakeReader) Get(ctx context.Context, query *annotations.ItemQuery, accessResources *accesscontrol.AccessResources) ([]*annotations.ItemDTO, error) {
func (f *fakeReader) Get(ctx context.Context, query annotations.ItemQuery, accessResources *accesscontrol.AccessResources) ([]*annotations.ItemDTO, error) {
if f.getFn != nil {
return f.getFn(ctx, query, accessResources)
}
@@ -157,7 +182,7 @@ func (f *fakeReader) Get(ctx context.Context, query *annotations.ItemQuery, acce
return f.items, nil
}
func (f *fakeReader) GetTags(ctx context.Context, query *annotations.TagsQuery) (annotations.FindTagsResult, error) {
func (f *fakeReader) GetTags(ctx context.Context, query annotations.TagsQuery) (annotations.FindTagsResult, error) {
if f.getTagFn != nil {
return f.getTagFn(ctx, query)
}
@@ -198,7 +223,7 @@ func withTags(tags []*annotations.TagsDTO) func(*fakeReader) {
}
}
func withGetFn(fn func(context.Context, *annotations.ItemQuery, *accesscontrol.AccessResources) ([]*annotations.ItemDTO, error)) func(*fakeReader) {
func withGetFn(fn func(context.Context, annotations.ItemQuery, *accesscontrol.AccessResources) ([]*annotations.ItemDTO, error)) func(*fakeReader) {
return func(f *fakeReader) {
f.getFn = fn
}
@@ -76,7 +76,7 @@ func (r *LokiHistorianStore) Type() string {
return "loki"
}
func (r *LokiHistorianStore) Get(ctx context.Context, query *annotations.ItemQuery, accessResources *accesscontrol.AccessResources) ([]*annotations.ItemDTO, error) {
func (r *LokiHistorianStore) Get(ctx context.Context, query annotations.ItemQuery, accessResources *accesscontrol.AccessResources) ([]*annotations.ItemDTO, error) {
if query.Type == "annotation" {
return make([]*annotations.ItemDTO, 0), nil
}
@@ -100,7 +100,7 @@ func (r *LokiHistorianStore) Get(ctx context.Context, query *annotations.ItemQue
}
// No folders in the filter because it filter by Dashboard UID, and the request is already authorized.
logQL, err := historian.BuildLogQuery(buildHistoryQuery(query, accessResources.Dashboards, rule.UID), nil, r.client.MaxQuerySize())
logQL, err := historian.BuildLogQuery(buildHistoryQuery(&query, accessResources.Dashboards, rule.UID), nil, r.client.MaxQuerySize())
if err != nil {
grafanaErr := errutil.Error{}
if errors.As(err, &grafanaErr) {
@@ -188,7 +188,7 @@ func (r *LokiHistorianStore) annotationsFromStream(stream historian.Stream, ac a
return items
}
func (r *LokiHistorianStore) GetTags(ctx context.Context, query *annotations.TagsQuery) (annotations.FindTagsResult, error) {
func (r *LokiHistorianStore) GetTags(ctx context.Context, query annotations.TagsQuery) (annotations.FindTagsResult, error) {
return annotations.FindTagsResult{Tags: []*annotations.TagsDTO{}}, nil
}
@@ -93,7 +93,7 @@ func TestIntegrationAlertStateHistoryStore(t *testing.T) {
}
res, err := store.Get(
context.Background(),
&query,
query,
&annotation_ac.AccessResources{
Dashboards: map[string]int64{
dashboard1.UID: dashboard1.ID,
@@ -119,7 +119,7 @@ func TestIntegrationAlertStateHistoryStore(t *testing.T) {
}
res, err := store.Get(
context.Background(),
&query,
query,
&annotation_ac.AccessResources{
Dashboards: map[string]int64{
dashboard1.UID: dashboard1.ID,
@@ -143,7 +143,7 @@ func TestIntegrationAlertStateHistoryStore(t *testing.T) {
}
res, err := store.Get(
context.Background(),
&query,
query,
&annotation_ac.AccessResources{
Dashboards: map[string]int64{
dashboard1.UID: dashboard1.ID,
@@ -169,7 +169,7 @@ func TestIntegrationAlertStateHistoryStore(t *testing.T) {
}
res, err := store.Get(
context.Background(),
&query,
query,
&annotation_ac.AccessResources{
Dashboards: map[string]int64{
dashboard1.UID: dashboard1.ID,
@@ -199,7 +199,7 @@ func TestIntegrationAlertStateHistoryStore(t *testing.T) {
}
res, err := store.Get(
context.Background(),
&query,
query,
&annotation_ac.AccessResources{
Dashboards: map[string]int64{
dashboard1.UID: dashboard1.ID,
@@ -228,7 +228,7 @@ func TestIntegrationAlertStateHistoryStore(t *testing.T) {
}
res, err := store.Get(
context.Background(),
&query,
query,
&annotation_ac.AccessResources{
Dashboards: map[string]int64{
dashboard1.UID: dashboard1.ID,
@@ -262,7 +262,7 @@ func TestIntegrationAlertStateHistoryStore(t *testing.T) {
}
res, err := store.Get(
context.Background(),
&query,
query,
&annotation_ac.AccessResources{
Dashboards: map[string]int64{
dashboard1.UID: dashboard1.ID,
@@ -20,8 +20,8 @@ type commonStore interface {
type readStore interface {
commonStore
Get(ctx context.Context, query *annotations.ItemQuery, accessResources *accesscontrol.AccessResources) ([]*annotations.ItemDTO, error)
GetTags(ctx context.Context, query *annotations.TagsQuery) (annotations.FindTagsResult, error)
Get(ctx context.Context, query annotations.ItemQuery, accessResources *accesscontrol.AccessResources) ([]*annotations.ItemDTO, error)
GetTags(ctx context.Context, query annotations.TagsQuery) (annotations.FindTagsResult, error)
}
type writeStore interface {
@@ -245,7 +245,7 @@ func tagSet[T any](fn func(T) int64, list []T) map[int64]struct{} {
return set
}
func (r *xormRepositoryImpl) Get(ctx context.Context, query *annotations.ItemQuery, accessResources *accesscontrol.AccessResources) ([]*annotations.ItemDTO, error) {
func (r *xormRepositoryImpl) Get(ctx context.Context, query annotations.ItemQuery, accessResources *accesscontrol.AccessResources) ([]*annotations.ItemDTO, error) {
var sql bytes.Buffer
params := make([]interface{}, 0)
items := make([]*annotations.ItemDTO, 0)
@@ -446,7 +446,7 @@ func (r *xormRepositoryImpl) Delete(ctx context.Context, params *annotations.Del
})
}
func (r *xormRepositoryImpl) GetTags(ctx context.Context, query *annotations.TagsQuery) (annotations.FindTagsResult, error) {
func (r *xormRepositoryImpl) GetTags(ctx context.Context, query annotations.TagsQuery) (annotations.FindTagsResult, error) {
var items []*annotations.Tag
err := r.db.WithDbSession(ctx, func(dbSession *db.Session) error {
if query.Limit == 0 {
@@ -133,7 +133,7 @@ func TestIntegrationAnnotations(t *testing.T) {
assert.Greater(t, organizationAnnotation2.ID, int64(0))
t.Run("Can query for annotation by dashboard id", func(t *testing.T) {
items, err := store.Get(context.Background(), &annotations.ItemQuery{
items, err := store.Get(context.Background(), annotations.ItemQuery{
OrgID: 1,
DashboardID: dashboard.ID,
From: 0,
@@ -182,7 +182,7 @@ func TestIntegrationAnnotations(t *testing.T) {
err := store.AddMany(context.Background(), items)
require.NoError(t, err)
query := &annotations.ItemQuery{OrgID: 100, SignedInUser: testUser}
query := annotations.ItemQuery{OrgID: 100, SignedInUser: testUser}
accRes := &annotation_ac.AccessResources{CanAccessOrgAnnotations: true}
inserted, err := store.Get(context.Background(), query, accRes)
require.NoError(t, err)
@@ -209,7 +209,7 @@ func TestIntegrationAnnotations(t *testing.T) {
err := store.AddMany(context.Background(), items)
require.NoError(t, err)
query := &annotations.ItemQuery{OrgID: 101, SignedInUser: testUser}
query := annotations.ItemQuery{OrgID: 101, SignedInUser: testUser}
accRes := &annotation_ac.AccessResources{CanAccessOrgAnnotations: true}
inserted, err := store.Get(context.Background(), query, accRes)
require.NoError(t, err)
@@ -217,7 +217,7 @@ func TestIntegrationAnnotations(t *testing.T) {
})
t.Run("Can query for annotation by id", func(t *testing.T) {
items, err := store.Get(context.Background(), &annotations.ItemQuery{
items, err := store.Get(context.Background(), annotations.ItemQuery{
OrgID: 1,
AnnotationID: annotation2.ID,
SignedInUser: testUser,
@@ -237,7 +237,7 @@ func TestIntegrationAnnotations(t *testing.T) {
Dashboards: map[string]int64{"foo": 1},
CanAccessDashAnnotations: true,
}
items, err := store.Get(context.Background(), &annotations.ItemQuery{
items, err := store.Get(context.Background(), annotations.ItemQuery{
OrgID: 1,
DashboardID: 1,
From: 12,
@@ -253,7 +253,7 @@ func TestIntegrationAnnotations(t *testing.T) {
Dashboards: map[string]int64{"foo": 1},
CanAccessDashAnnotations: true,
}
items, err := store.Get(context.Background(), &annotations.ItemQuery{
items, err := store.Get(context.Background(), annotations.ItemQuery{
OrgID: 1,
DashboardID: 1,
From: 1,
@@ -270,7 +270,7 @@ func TestIntegrationAnnotations(t *testing.T) {
Dashboards: map[string]int64{"foo": 1},
CanAccessDashAnnotations: true,
}
items, err := store.Get(context.Background(), &annotations.ItemQuery{
items, err := store.Get(context.Background(), annotations.ItemQuery{
OrgID: 1,
DashboardID: 1,
From: 1,
@@ -287,7 +287,7 @@ func TestIntegrationAnnotations(t *testing.T) {
Dashboards: map[string]int64{"foo": 1},
CanAccessDashAnnotations: true,
}
items, err := store.Get(context.Background(), &annotations.ItemQuery{
items, err := store.Get(context.Background(), annotations.ItemQuery{
OrgID: 1,
DashboardID: 1,
From: 1,
@@ -301,7 +301,7 @@ func TestIntegrationAnnotations(t *testing.T) {
t.Run("Should find two annotations using partial match", func(t *testing.T) {
accRes := &annotation_ac.AccessResources{CanAccessOrgAnnotations: true}
items, err := store.Get(context.Background(), &annotations.ItemQuery{
items, err := store.Get(context.Background(), annotations.ItemQuery{
OrgID: 1,
From: 1,
To: 25,
@@ -318,7 +318,7 @@ func TestIntegrationAnnotations(t *testing.T) {
Dashboards: map[string]int64{"foo": 1},
CanAccessDashAnnotations: true,
}
items, err := store.Get(context.Background(), &annotations.ItemQuery{
items, err := store.Get(context.Background(), annotations.ItemQuery{
OrgID: 1,
DashboardID: 1,
From: 1,
@@ -331,7 +331,7 @@ func TestIntegrationAnnotations(t *testing.T) {
})
t.Run("Can update annotation and remove all tags", func(t *testing.T) {
query := &annotations.ItemQuery{
query := annotations.ItemQuery{
OrgID: 1,
DashboardID: 1,
From: 0,
@@ -366,7 +366,7 @@ func TestIntegrationAnnotations(t *testing.T) {
})
t.Run("Can update annotation with new tags", func(t *testing.T) {
query := &annotations.ItemQuery{
query := annotations.ItemQuery{
OrgID: 1,
DashboardID: 1,
From: 0,
@@ -399,7 +399,7 @@ func TestIntegrationAnnotations(t *testing.T) {
})
t.Run("Can update annotation with additional tags", func(t *testing.T) {
query := &annotations.ItemQuery{
query := annotations.ItemQuery{
OrgID: 1,
DashboardID: 1,
From: 0,
@@ -432,7 +432,7 @@ func TestIntegrationAnnotations(t *testing.T) {
})
t.Run("Can update annotations with data", func(t *testing.T) {
query := &annotations.ItemQuery{
query := annotations.ItemQuery{
OrgID: 1,
DashboardID: 1,
From: 0,
@@ -468,7 +468,7 @@ func TestIntegrationAnnotations(t *testing.T) {
})
t.Run("Can delete annotation", func(t *testing.T) {
query := &annotations.ItemQuery{
query := annotations.ItemQuery{
OrgID: 1,
DashboardID: 1,
From: 0,
@@ -512,7 +512,7 @@ func TestIntegrationAnnotations(t *testing.T) {
CanAccessDashAnnotations: true,
}
query := &annotations.ItemQuery{
query := annotations.ItemQuery{
OrgID: 1,
AnnotationID: annotation3.ID,
SignedInUser: testUser,
@@ -531,7 +531,7 @@ func TestIntegrationAnnotations(t *testing.T) {
})
t.Run("Should find tags by key", func(t *testing.T) {
result, err := store.GetTags(context.Background(), &annotations.TagsQuery{
result, err := store.GetTags(context.Background(), annotations.TagsQuery{
OrgID: 1,
Tag: "server",
})
@@ -542,7 +542,7 @@ func TestIntegrationAnnotations(t *testing.T) {
})
t.Run("Should find tags by value", func(t *testing.T) {
result, err := store.GetTags(context.Background(), &annotations.TagsQuery{
result, err := store.GetTags(context.Background(), annotations.TagsQuery{
OrgID: 1,
Tag: "outage",
})
@@ -555,7 +555,7 @@ func TestIntegrationAnnotations(t *testing.T) {
})
t.Run("Should not find tags in other org", func(t *testing.T) {
result, err := store.GetTags(context.Background(), &annotations.TagsQuery{
result, err := store.GetTags(context.Background(), annotations.TagsQuery{
OrgID: 0,
Tag: "server-1",
})
@@ -564,7 +564,7 @@ func TestIntegrationAnnotations(t *testing.T) {
})
t.Run("Should not find tags that do not exist", func(t *testing.T) {
result, err := store.GetTags(context.Background(), &annotations.TagsQuery{
result, err := store.GetTags(context.Background(), annotations.TagsQuery{
OrgID: 0,
Tag: "unknown:tag",
})
@@ -650,7 +650,7 @@ func benchmarkFindTags(b *testing.B, numAnnotations int) {
b.ResetTimer()
for i := 0; i < b.N; i++ {
result, err := store.GetTags(context.Background(), &annotations.TagsQuery{
result, err := store.GetTags(context.Background(), annotations.TagsQuery{
OrgID: 1,
Tag: "outage",
})