diff --git a/apps/provisioning/go.mod b/apps/provisioning/go.mod index f8a84dca296..9b144b4060c 100644 --- a/apps/provisioning/go.mod +++ b/apps/provisioning/go.mod @@ -3,6 +3,7 @@ module github.com/grafana/grafana/apps/provisioning go 1.25.2 require ( + github.com/fsnotify/fsnotify v1.9.0 github.com/google/go-github/v70 v70.0.0 github.com/google/uuid v1.6.0 github.com/grafana/authlib v0.0.0-20250930082137-a40e2c2b094f diff --git a/apps/provisioning/go.sum b/apps/provisioning/go.sum index a2f85bf8eda..ce659fcf611 100644 --- a/apps/provisioning/go.sum +++ b/apps/provisioning/go.sum @@ -10,6 +10,8 @@ github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1 github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/emicklei/go-restful/v3 v3.13.0 h1:C4Bl2xDndpU6nJ4bc1jXd+uTmYPVUwkD6bFY/oTyCes= github.com/emicklei/go-restful/v3 v3.13.0/go.mod h1:6n3XBCmQQb25CM2LCACGz8ukIrRry+4bhvbpWn3mrbc= +github.com/fsnotify/fsnotify v1.9.0 h1:2Ml+OJNzbYCTzsxtv8vKSFD9PbJjmhYF14k/jKC7S9k= +github.com/fsnotify/fsnotify v1.9.0/go.mod h1:8jBTzvmWwFyi3Pb8djgCCO5IBqzKJ/Jwo8TRcHyHii0= github.com/fxamacker/cbor/v2 v2.9.0 h1:NpKPmjDBgUfBms6tr6JZkTHtfFGcMKsw3eGcmD/sapM= github.com/fxamacker/cbor/v2 v2.9.0/go.mod h1:vM4b+DJCtHn+zz7h3FFp/hDAI9WNWCsZj23V5ytsSxQ= github.com/go-jose/go-jose/v3 v3.0.4 h1:Wp5HA7bLQcKnf6YYao/4kpRpVMp/yf6+pJKV8WFSaNY= diff --git a/apps/provisioning/pkg/repository/local/watch.go b/apps/provisioning/pkg/repository/local/watch.go new file mode 100644 index 00000000000..91caed9a1d3 --- /dev/null +++ b/apps/provisioning/pkg/repository/local/watch.go @@ -0,0 +1,130 @@ +package local + +import ( + "context" + "fmt" + "io/fs" + "math" + "os" + "path/filepath" + "strings" + "sync" + "time" + + "github.com/fsnotify/fsnotify" + + "github.com/grafana/grafana-app-sdk/logging" +) + +type FileWatcher interface { + Watch(ctx context.Context, events chan<- string) +} + +type fileWatcher struct { + prefix string + accept func(string) bool + waitFor time.Duration + timersMu sync.Mutex + timers map[string]*time.Timer + watcher *fsnotify.Watcher + logger logging.Logger +} + +// File watcher that buffers events for 100ms before actually firing them +// this is helpful because editing a file may often update the same file many many times +// for what seems like a single operation. +// See: https://github.com/fsnotify/fsnotify/blob/main/cmd/fsnotify/dedup.go +func NewFileWatcher(path string, accept func(string) bool) (FileWatcher, error) { + info, _ := os.Stat(path) + if info == nil || !info.IsDir() { + return nil, fmt.Errorf("expecting to watch a folder") + } + + w, err := fsnotify.NewWatcher() + if err != nil { + return nil, err + } + + if err := w.Add(path); err != nil { + _ = w.Close() + return nil, err + } + + if err = filepath.WalkDir(path, func(path string, d fs.DirEntry, err error) error { + if err != nil { + return err + } + if d.IsDir() { + if err = w.Add(path); err != nil { + return err + } + } + return nil + }); err != nil { + _ = w.Close() + return nil, err + } + + return &fileWatcher{ + prefix: path + "/", + accept: accept, + waitFor: 100 * time.Millisecond, + timers: make(map[string]*time.Timer), + watcher: w, + logger: logging.DefaultLogger.With("watch", path), + }, nil +} + +// Keep watching for changes until the context is done +func (f *fileWatcher) Watch(ctx context.Context, events chan<- string) { + for { + select { + case <-ctx.Done(): + close(events) + return + + case _, ok := <-f.watcher.Errors: + if !ok { // Channel was closed (i.e. Watcher.Close() was called). + close(events) + return + } + + // Read from Events. + case e, ok := <-f.watcher.Events: + if !ok { // Channel was closed (i.e. Watcher.Close() was called). + close(events) + return + } + name := filepath.Base(e.Name) + if strings.HasPrefix(name, ".") { + continue // ignore hidden files+folders + } + if !f.accept(name) { + info, _ := os.Stat(e.Name) + if info != nil && info.IsDir() { + if err := f.watcher.Add(e.Name); err != nil { + f.logger.Warn("error adding folder", "folder", e.Name, "error", err) + } + } + continue + } + + f.timersMu.Lock() + t, ok := f.timers[e.Name] + if !ok { + nameCopy := e.Name + t = time.AfterFunc(math.MaxInt64, func() { + path, _ := strings.CutPrefix(nameCopy, f.prefix) + events <- path + + f.timersMu.Lock() + delete(f.timers, nameCopy) + f.timersMu.Unlock() + }) + f.timers[e.Name] = t + } + f.timersMu.Unlock() + t.Reset(f.waitFor) + } + } +} diff --git a/apps/provisioning/pkg/repository/local/watch_test.go b/apps/provisioning/pkg/repository/local/watch_test.go new file mode 100644 index 00000000000..ef271d345a9 --- /dev/null +++ b/apps/provisioning/pkg/repository/local/watch_test.go @@ -0,0 +1,84 @@ +package local + +import ( + "context" + "os" + "path/filepath" + "slices" + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +func TestWatch_TempFiles(t *testing.T) { + tmpdir := t.TempDir() + sub1 := filepath.Join(tmpdir, "sub1") + err := os.MkdirAll(sub1, 0700) + require.NoError(t, err) + + watcher, err := NewFileWatcher(tmpdir, func(name string) bool { + return filepath.Ext(name) == ".txt" + }) + require.NoError(t, err) + + ctx, cancel := context.WithCancel(context.Background()) + events := make(chan string, 10) + go func() { + watcher.Watch(ctx, events) + }() + + go func() { + time.Sleep(20 * time.Millisecond) + err = os.WriteFile(filepath.Join(tmpdir, "aaa.txt"), []byte("aaa"), 0600) + require.NoError(t, err) + err = os.WriteFile(filepath.Join(tmpdir, ".hidden.txt"), []byte("hidden"), 0600) + require.NoError(t, err) + err = os.WriteFile(filepath.Join(tmpdir, "bbb.txt"), []byte("bbb"), 0600) + require.NoError(t, err) + err = os.WriteFile(filepath.Join(tmpdir, "xxx.json"), []byte("ignore"), 0600) + require.NoError(t, err) + err = os.WriteFile(filepath.Join(sub1, "ccc.txt"), []byte("ccc"), 0600) + require.NoError(t, err) + + // make a sub folder + sub2 := filepath.Join(tmpdir, "sub2") + err = os.MkdirAll(sub2, 0700) + require.NoError(t, err) + time.Sleep(50 * time.Millisecond) + err = os.WriteFile(filepath.Join(sub2, "ddd.txt"), []byte("ddd"), 0600) + require.NoError(t, err) + + // Check all the paths we are watching + w, ok := watcher.(*fileWatcher) + require.True(t, ok, "explicit cast") + watching := w.watcher.WatchList() + slices.Sort(watching) + require.Equal(t, []string{ + tmpdir, sub1, sub2, + }, watching) + + // Removing the subfolder should trigger the event again + time.Sleep(time.Millisecond * 150) + err = os.RemoveAll(sub2) + require.NoError(t, err) + + // Finish all the events + time.Sleep(time.Millisecond * 250) + cancel() // stops the context + }() + + received := []string{} + for event := range events { + received = append(received, event) + } + slices.Sort(received) + + require.Equal(t, []string{ + "aaa.txt", + "bbb.txt", + "sub1/ccc.txt", + "sub2/ddd.txt", // first time because we added it + "sub2/ddd.txt", // second time because we removed it + }, received) +} diff --git a/pkg/services/provisioning/dashboards/file_reader.go b/pkg/services/provisioning/dashboards/file_reader.go index 73cd50fef90..e85f8ce1b69 100644 --- a/pkg/services/provisioning/dashboards/file_reader.go +++ b/pkg/services/provisioning/dashboards/file_reader.go @@ -11,6 +11,7 @@ import ( "sync" "time" + "github.com/grafana/grafana/apps/provisioning/pkg/repository/local" "github.com/grafana/grafana/pkg/apimachinery/identity" "github.com/grafana/grafana/pkg/components/simplejson" "github.com/grafana/grafana/pkg/infra/log" @@ -80,12 +81,22 @@ func NewDashboardFileReader(cfg *config, log log.Logger, service dashboards.Dash // pollChanges periodically runs walkDisk based on interval specified in the config. func (fr *FileReader) pollChanges(ctx context.Context) { - ticker := time.NewTicker(time.Duration(int64(time.Second) * fr.Cfg.UpdateIntervalSeconds)) + interval := fr.Cfg.UpdateIntervalSeconds + if interval <= 10 { // the minimum time + err := fr.watchChanges(ctx) + if err == nil { + return // finished + } + fr.log.Warn("error watching folder: %w", err) + interval = 30 + } + + ticker := time.NewTicker(time.Duration(int64(time.Second) * interval)) for { select { case <-ticker.C: if err := fr.walkDisk(ctx); err != nil { - fr.log.Error("failed to search for dashboards", "error", err) + fr.log.Error("failed to walk provisioned dashboards", "error", err) } case <-ctx.Done(): return @@ -93,6 +104,41 @@ func (fr *FileReader) pollChanges(ctx context.Context) { } } +func (fr *FileReader) watchChanges(ctx context.Context) error { + watcher, err := local.NewFileWatcher(fr.resolvedPath(), func(name string) bool { + return strings.HasSuffix(name, ".json") + }) + if err != nil { + return err + } + + changed := false + events := make(chan string, 10) + go func() { + for { + select { + case <-ctx.Done(): + return + case _, ok := <-events: + // channel closed + if !ok { + return + } + changed = true + case <-time.After(time.Second * 5): // 5s maximum refresh + if changed { + if err := fr.walkDisk(ctx); err != nil { + fr.log.Error("failed to walk provisioned dashboards", "error", err) + } + changed = false + } + } + } + }() + watcher.Watch(ctx, events) + return nil +} + // walkDisk traverses the file system for the defined path, reading dashboard definition files, // and applies any change to the database. func (fr *FileReader) walkDisk(ctx context.Context) error {