Provisioning: Watch file system for changes (#112184)
* trigger sync on any change * better comments * add deletes to test * Update apps/provisioning/pkg/repository/local/watch.go * Update pkg/services/provisioning/dashboards/file_reader.go * Update apps/provisioning/pkg/repository/local/watch.go --------- Co-authored-by: Stephanie Hingtgen <stephanie.hingtgen@grafana.com>
This commit is contained in:
@@ -11,6 +11,7 @@ import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/grafana/grafana/apps/provisioning/pkg/repository/local"
|
||||
"github.com/grafana/grafana/pkg/apimachinery/identity"
|
||||
"github.com/grafana/grafana/pkg/components/simplejson"
|
||||
"github.com/grafana/grafana/pkg/infra/log"
|
||||
@@ -80,12 +81,22 @@ func NewDashboardFileReader(cfg *config, log log.Logger, service dashboards.Dash
|
||||
|
||||
// pollChanges periodically runs walkDisk based on interval specified in the config.
|
||||
func (fr *FileReader) pollChanges(ctx context.Context) {
|
||||
ticker := time.NewTicker(time.Duration(int64(time.Second) * fr.Cfg.UpdateIntervalSeconds))
|
||||
interval := fr.Cfg.UpdateIntervalSeconds
|
||||
if interval <= 10 { // the minimum time
|
||||
err := fr.watchChanges(ctx)
|
||||
if err == nil {
|
||||
return // finished
|
||||
}
|
||||
fr.log.Warn("error watching folder: %w", err)
|
||||
interval = 30
|
||||
}
|
||||
|
||||
ticker := time.NewTicker(time.Duration(int64(time.Second) * interval))
|
||||
for {
|
||||
select {
|
||||
case <-ticker.C:
|
||||
if err := fr.walkDisk(ctx); err != nil {
|
||||
fr.log.Error("failed to search for dashboards", "error", err)
|
||||
fr.log.Error("failed to walk provisioned dashboards", "error", err)
|
||||
}
|
||||
case <-ctx.Done():
|
||||
return
|
||||
@@ -93,6 +104,41 @@ func (fr *FileReader) pollChanges(ctx context.Context) {
|
||||
}
|
||||
}
|
||||
|
||||
func (fr *FileReader) watchChanges(ctx context.Context) error {
|
||||
watcher, err := local.NewFileWatcher(fr.resolvedPath(), func(name string) bool {
|
||||
return strings.HasSuffix(name, ".json")
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
changed := false
|
||||
events := make(chan string, 10)
|
||||
go func() {
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case _, ok := <-events:
|
||||
// channel closed
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
changed = true
|
||||
case <-time.After(time.Second * 5): // 5s maximum refresh
|
||||
if changed {
|
||||
if err := fr.walkDisk(ctx); err != nil {
|
||||
fr.log.Error("failed to walk provisioned dashboards", "error", err)
|
||||
}
|
||||
changed = false
|
||||
}
|
||||
}
|
||||
}
|
||||
}()
|
||||
watcher.Watch(ctx, events)
|
||||
return nil
|
||||
}
|
||||
|
||||
// walkDisk traverses the file system for the defined path, reading dashboard definition files,
|
||||
// and applies any change to the database.
|
||||
func (fr *FileReader) walkDisk(ctx context.Context) error {
|
||||
|
||||
Reference in New Issue
Block a user