Nested Folders: Fix /api/folders pagination (#79447)

* Nested Folders: Fix /api/folders pagination

We used to check access to the root folders after fetching them from the DB with pagination.
This fix splits logic for fetching folders in:
- fetching subfolders
- fetching root folders
and refactors the query for the latter so that is filters by folders with permissions

* Add tests

* Update benchmarks
This commit is contained in:
Sofia Papagiannaki
2023-12-15 19:34:08 +02:00
committed by GitHub
parent cf8e8852c3
commit d89a8a3a82
9 changed files with 434 additions and 55 deletions
+80 -41
View File
@@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"runtime"
"strings"
"sync"
"time"
@@ -11,6 +12,7 @@ import (
"github.com/prometheus/client_golang/prometheus"
"golang.org/x/exp/slices"
"github.com/grafana/dskit/concurrency"
"github.com/grafana/grafana/pkg/bus"
"github.com/grafana/grafana/pkg/events"
"github.com/grafana/grafana/pkg/infra/db"
@@ -149,32 +151,36 @@ func (s *Service) Get(ctx context.Context, cmd *folder.GetFolderQuery) (*folder.
return f, err
}
func (s *Service) GetChildren(ctx context.Context, cmd *folder.GetChildrenQuery) ([]*folder.Folder, error) {
if cmd.SignedInUser == nil {
func (s *Service) GetChildren(ctx context.Context, q *folder.GetChildrenQuery) ([]*folder.Folder, error) {
if q.SignedInUser == nil {
return nil, folder.ErrBadRequest.Errorf("missing signed in user")
}
if s.features.IsEnabled(ctx, featuremgmt.FlagNestedFolders) && cmd.UID == folder.SharedWithMeFolderUID {
return s.GetSharedWithMe(ctx, cmd)
if s.features.IsEnabled(ctx, featuremgmt.FlagNestedFolders) && q.UID == folder.SharedWithMeFolderUID {
return s.GetSharedWithMe(ctx, q)
}
if cmd.UID != "" {
g, err := guardian.NewByUID(ctx, cmd.UID, cmd.OrgID, cmd.SignedInUser)
if err != nil {
return nil, err
}
canView, err := g.CanView()
if err != nil {
return nil, err
}
if !canView {
return nil, dashboards.ErrFolderAccessDenied
}
if q.UID == "" {
return s.getRootFolders(ctx, q)
}
children, err := s.store.GetChildren(ctx, *cmd)
// we only need to check access to the folder
// if the parent is accessible then the subfolders are accessible as well (due to inheritance)
g, err := guardian.NewByUID(ctx, q.UID, q.OrgID, q.SignedInUser)
if err != nil {
return nil, err
}
canView, err := g.CanView()
if err != nil {
return nil, err
}
if !canView {
return nil, dashboards.ErrFolderAccessDenied
}
children, err := s.store.GetChildren(ctx, *q)
if err != nil {
return nil, err
}
@@ -184,12 +190,11 @@ func (s *Service) GetChildren(ctx context.Context, cmd *folder.GetChildrenQuery)
childrenUIDs = append(childrenUIDs, f.UID)
}
dashFolders, err := s.dashboardFolderStore.GetFolders(ctx, cmd.OrgID, childrenUIDs)
dashFolders, err := s.dashboardFolderStore.GetFolders(ctx, q.OrgID, childrenUIDs)
if err != nil {
return nil, folder.ErrInternal.Errorf("failed to fetch subfolders from dashboard store: %w", err)
}
filtered := make([]*folder.Folder, 0, len(children))
for _, f := range children {
// fetch folder from dashboard store
dashFolder, ok := dashFolders[f.UID]
@@ -201,33 +206,67 @@ func (s *Service) GetChildren(ctx context.Context, cmd *folder.GetChildrenQuery)
// always expose the dashboard store sequential ID
// nolint:staticcheck
f.ID = dashFolder.ID
}
if cmd.UID != "" {
// parent access has been checked already
// the subfolder must be accessible as well (due to inheritance)
filtered = append(filtered, f)
continue
}
return children, nil
}
g, err := guardian.NewByFolder(ctx, dashFolder, dashFolder.OrgID, cmd.SignedInUser)
if err != nil {
return nil, err
func (s *Service) getRootFolders(ctx context.Context, q *folder.GetChildrenQuery) ([]*folder.Folder, error) {
permissions := q.SignedInUser.GetPermissions()
folderPermissions := permissions[dashboards.ActionFoldersRead]
folderPermissions = append(folderPermissions, permissions[dashboards.ActionDashboardsRead]...)
q.FolderUIDs = make([]string, 0, len(folderPermissions))
for _, p := range folderPermissions {
if p == dashboards.ScopeFoldersAll {
// no need to query for folders with permissions
// the user has permission to access all folders
q.FolderUIDs = nil
break
}
canView, err := g.CanView()
if err != nil {
return nil, err
}
if canView {
filtered = append(filtered, f)
if folderUid, found := strings.CutPrefix(p, dashboards.ScopeFoldersPrefix); found {
if !slices.Contains(q.FolderUIDs, folderUid) {
q.FolderUIDs = append(q.FolderUIDs, folderUid)
}
}
}
if len(filtered) < len(children) {
// add "shared with me" folder
filtered = append(filtered, &folder.SharedWithMeFolder)
children, err := s.store.GetChildren(ctx, *q)
if err != nil {
return nil, err
}
return filtered, nil
childrenUIDs := make([]string, 0, len(children))
for _, f := range children {
childrenUIDs = append(childrenUIDs, f.UID)
}
dashFolders, err := s.dashboardFolderStore.GetFolders(ctx, q.OrgID, childrenUIDs)
if err != nil {
return nil, folder.ErrInternal.Errorf("failed to fetch subfolders from dashboard store: %w", err)
}
if err := concurrency.ForEachJob(ctx, len(children), runtime.NumCPU(), func(ctx context.Context, i int) error {
f := children[i]
// fetch folder from dashboard store
dashFolder, ok := dashFolders[f.UID]
if !ok {
s.log.Error("failed to fetch folder by UID from dashboard store", "orgID", f.OrgID, "uid", f.UID)
}
// always expose the dashboard store sequential ID
// nolint:staticcheck
f.ID = dashFolder.ID
return nil
}); err != nil {
return nil, folder.ErrInternal.Errorf("failed to assign folder sequential ID: %w", err)
}
// add "shared with me" folder on the 1st page
if (q.Page == 0 || q.Page == 1) && len(q.FolderUIDs) != 0 {
children = append(children, &folder.SharedWithMeFolder)
}
return children, nil
}
// GetSharedWithMe returns folders available to user, which cannot be accessed from the root folders
@@ -253,7 +292,7 @@ func (s *Service) getAvailableNonRootFolders(ctx context.Context, orgID int64, u
folderPermissions := permissions[dashboards.ActionFoldersRead]
folderPermissions = append(folderPermissions, permissions[dashboards.ActionDashboardsRead]...)
nonRootFolders := make([]*folder.Folder, 0)
folderUids := make([]string, 0)
folderUids := make([]string, 0, len(folderPermissions))
for _, p := range folderPermissions {
if folderUid, found := strings.CutPrefix(p, dashboards.ScopeFoldersPrefix); found {
if !slices.Contains(folderUids, folderUid) {