From e1dfec49f94471faaef5a4eef68159acb7f685a0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jean-Philippe=20Qu=C3=A9m=C3=A9ner?= Date: Tue, 12 Oct 2021 12:05:02 +0200 Subject: [PATCH] Alerting: cleanup alert resources on org removal (#39938) --- pkg/services/ngalert/notifier/file_store.go | 51 ++++++++++--------- .../ngalert/notifier/multiorg_alertmanager.go | 41 +++++++++++++++ .../notifier/multiorg_alertmanager_test.go | 35 +++++++++++++ pkg/services/sqlstore/org.go | 11 ++++ 4 files changed, 113 insertions(+), 25 deletions(-) diff --git a/pkg/services/ngalert/notifier/file_store.go b/pkg/services/ngalert/notifier/file_store.go index 98c3759f260..42ec6e207bf 100644 --- a/pkg/services/ngalert/notifier/file_store.go +++ b/pkg/services/ngalert/notifier/file_store.go @@ -8,6 +8,7 @@ import ( "path/filepath" "github.com/grafana/grafana/pkg/infra/kvstore" + "github.com/grafana/grafana/pkg/infra/log" ) const KVNamespace = "alertmanager" @@ -24,6 +25,7 @@ type FileStore struct { kv *kvstore.NamespacedKVStore orgID int64 workingDirPath string + logger log.Logger } func NewFileStore(orgID int64, store kvstore.KVStore, workingDirPath string) *FileStore { @@ -31,6 +33,7 @@ func NewFileStore(orgID int64, store kvstore.KVStore, workingDirPath string) *Fi workingDirPath: workingDirPath, orgID: orgID, kv: kvstore.WithNamespace(store, orgID, KVNamespace), + logger: log.New("filestore", "org", orgID), } } @@ -38,22 +41,16 @@ func NewFileStore(orgID int64, store kvstore.KVStore, workingDirPath string) *Fi // If the file is already present on disk it no-ops. // If not, it tries to read the database and if there's no file it no-ops. // If there is a file in the database, it decodes it and writes to disk for Alertmanager consumption. -func (fs *FileStore) FilepathFor(ctx context.Context, filename string) (string, error) { - // If a file is already present, we'll use that one and eventually save it to the database. - // We don't need to do anything else. 
- if fs.IsExists(filename) { - return fs.pathFor(filename), nil - } - +func (fileStore *FileStore) FilepathFor(ctx context.Context, filename string) (string, error) { // Then, let's attempt to read it from the database. - content, exists, err := fs.kv.Get(ctx, filename) + content, exists, err := fileStore.kv.Get(ctx, filename) if err != nil { return "", fmt.Errorf("error reading file '%s' from database: %w", filename, err) } // if it doesn't exist, let's no-op and let the Alertmanager create one. We'll eventually save it to the database. if !exists { - return fs.pathFor(filename), nil + return fileStore.pathFor(filename), nil } // If we have a file stored in the database, let's decode it and write it to disk to perform that initial load to memory. @@ -62,15 +59,15 @@ func (fs *FileStore) FilepathFor(ctx context.Context, filename string) (string, return "", fmt.Errorf("error decoding file '%s': %w", filename, err) } - if err := fs.WriteFileToDisk(filename, bytes); err != nil { + if err := fileStore.WriteFileToDisk(filename, bytes); err != nil { return "", fmt.Errorf("error writing file %s: %w", filename, err) } - return fs.pathFor(filename), err + return fileStore.pathFor(filename), err } // Persist takes care of persisting the binary representation of internal state to the database as a base64 encoded string. -func (fs *FileStore) Persist(ctx context.Context, filename string, st State) (int64, error) { +func (fileStore *FileStore) Persist(ctx context.Context, filename string, st State) (int64, error) { var size int64 bytes, err := st.MarshalBinary() @@ -78,32 +75,36 @@ func (fs *FileStore) Persist(ctx context.Context, filename string, st State) (in return size, err } - if err = fs.kv.Set(ctx, filename, encode(bytes)); err != nil { + if err = fileStore.kv.Set(ctx, filename, encode(bytes)); err != nil { return size, err } return int64(len(bytes)), err } -// IsExists verifies if the file exists or not. 
-func (fs *FileStore) IsExists(fn string) bool { - _, err := os.Stat(fs.pathFor(fn)) - return os.IsExist(err) -} - // WriteFileToDisk writes a file with the provided name and contents to the Alertmanager working directory with the default grafana permission. -func (fs *FileStore) WriteFileToDisk(fn string, content []byte) error { +func (fileStore *FileStore) WriteFileToDisk(fn string, content []byte) error { // Ensure the working directory is created - err := os.MkdirAll(fs.workingDirPath, 0750) + err := os.MkdirAll(fileStore.workingDirPath, 0750) if err != nil { - return fmt.Errorf("unable to create the working directory %q: %s", fs.workingDirPath, err) + return fmt.Errorf("unable to create the working directory %q: %s", fileStore.workingDirPath, err) } - return os.WriteFile(fs.pathFor(fn), content, 0644) + return os.WriteFile(fileStore.pathFor(fn), content, 0644) } -func (fs *FileStore) pathFor(fn string) string { - return filepath.Join(fs.workingDirPath, fn) +// CleanUp will remove the working directory from disk. 
+func (fileStore *FileStore) CleanUp() { + if err := os.RemoveAll(fileStore.workingDirPath); err != nil { + fileStore.logger.Warn("unable to delete the local working directory", "dir", fileStore.workingDirPath, + "err", err) + return + } + fileStore.logger.Info("successfully deleted working directory", "dir", fileStore.workingDirPath) +} + +func (fileStore *FileStore) pathFor(fn string) string { + return filepath.Join(fileStore.workingDirPath, fn) } func decode(s string) ([]byte, error) { diff --git a/pkg/services/ngalert/notifier/multiorg_alertmanager.go b/pkg/services/ngalert/notifier/multiorg_alertmanager.go index 4f2aa506c71..304b4f7b306 100644 --- a/pkg/services/ngalert/notifier/multiorg_alertmanager.go +++ b/pkg/services/ngalert/notifier/multiorg_alertmanager.go @@ -3,6 +3,9 @@ package notifier import ( "context" "fmt" + "io/ioutil" + "path/filepath" + "strconv" "sync" "time" @@ -213,6 +216,44 @@ func (moa *MultiOrgAlertmanager) SyncAlertmanagersForOrgs(ctx context.Context, o moa.logger.Info("stopping Alertmanager", "org", orgID) am.StopAndWait() moa.logger.Info("stopped Alertmanager", "org", orgID) + // Cleanup all the remaining resources from this alertmanager. + am.fileStore.CleanUp() + } + + // We look for orphan directories and remove them. Orphan directories can + // occur when an organization is deleted and the node running Grafana is + // shutdown before the next sync is executed. + moa.cleanupOrphanLocalOrgState(orgsFound) +} + +// cleanupOrphanLocalOrgState will check if there is any organization on +// disk that is not part of the active organizations. If this is the case +// it will delete the local state from disk. 
+func (moa *MultiOrgAlertmanager) cleanupOrphanLocalOrgState(activeOrganizations map[int64]struct{}) { + dataDir := filepath.Join(moa.settings.DataPath, workingDir) + files, err := ioutil.ReadDir(dataDir) + if err != nil { + moa.logger.Error("failed to list local working directory", "dir", dataDir, "err", err) + return + } + for _, file := range files { + if !file.IsDir() { + moa.logger.Warn("ignoring unexpected file while scanning local working directory", "filename", filepath.Join(dataDir, file.Name())) + continue + } + orgID, err := strconv.ParseInt(file.Name(), 10, 64) + if err != nil { + moa.logger.Error("unable to parse orgID from directory name", "name", file.Name(), "err", err) + continue + } + _, exists := activeOrganizations[orgID] + if !exists { + moa.logger.Info("found orphan organization directory", "orgID", orgID) + workingDirPath := filepath.Join(dataDir, strconv.FormatInt(orgID, 10)) + fileStore := NewFileStore(orgID, moa.kvStore, workingDirPath) + // Cleanup all the remaining resources from this alertmanager. + fileStore.CleanUp() + } } } diff --git a/pkg/services/ngalert/notifier/multiorg_alertmanager_test.go b/pkg/services/ngalert/notifier/multiorg_alertmanager_test.go index d2b1331b9aa..3b57b048bbc 100644 --- a/pkg/services/ngalert/notifier/multiorg_alertmanager_test.go +++ b/pkg/services/ngalert/notifier/multiorg_alertmanager_test.go @@ -3,8 +3,11 @@ package notifier import ( "bytes" "context" + "errors" + "io/fs" "io/ioutil" "os" + "path/filepath" "testing" "time" @@ -93,6 +96,38 @@ grafana_alerting_discovered_configurations 4 require.NoError(t, mam.LoadAndSyncAlertmanagersForOrgs(ctx)) require.Len(t, mam.alertmanagers, 4) } + + // Orphaned local state should be removed. + { + // First we create a directory and two files for an organization that + // does not exist in the current state.
+ orphanDir := filepath.Join(tmpDir, "alerting", "6") + err := os.Mkdir(orphanDir, 0750) + require.NoError(t, err) + + silencesPath := filepath.Join(orphanDir, silencesFilename) + err = os.WriteFile(silencesPath, []byte("file_1"), 0644) + require.NoError(t, err) + + notificationPath := filepath.Join(orphanDir, notificationLogFilename) + err = os.WriteFile(notificationPath, []byte("file_2"), 0644) + require.NoError(t, err) + + // We make sure that both files are on disk. + info, err := os.Stat(silencesPath) + require.NoError(t, err) + require.Equal(t, info.Name(), silencesFilename) + info, err = os.Stat(notificationPath) + require.NoError(t, err) + require.Equal(t, info.Name(), notificationLogFilename) + + // Now re run the sync job once. + require.NoError(t, mam.LoadAndSyncAlertmanagersForOrgs(ctx)) + + // The organization directory should be gone by now. + _, err = os.Stat(orphanDir) + require.True(t, errors.Is(err, fs.ErrNotExist)) + } } func TestMultiOrgAlertmanager_AlertmanagerFor(t *testing.T) { diff --git a/pkg/services/sqlstore/org.go b/pkg/services/sqlstore/org.go index d190068590e..e16a369b02d 100644 --- a/pkg/services/sqlstore/org.go +++ b/pkg/services/sqlstore/org.go @@ -241,6 +241,17 @@ func DeleteOrg(cmd *models.DeleteOrgCommand) error { "DELETE FROM org_user WHERE org_id = ?", "DELETE FROM org WHERE id = ?", "DELETE FROM temp_user WHERE org_id = ?", + "DELETE FROM ngalert_configuration WHERE org_id = ?", + "DELETE FROM alert_configuration WHERE org_id = ?", + "DELETE FROM alert_instance WHERE rule_org_id = ?", + "DELETE FROM alert_notification WHERE org_id = ?", + "DELETE FROM alert_notification_state WHERE org_id = ?", + "DELETE FROM alert_rule WHERE org_id = ?", + "DELETE FROM alert_rule_tag WHERE EXISTS (SELECT 1 FROM alert WHERE alert.org_id = ? 
AND alert.id = alert_rule_tag.alert_id)", + "DELETE FROM alert_rule_version WHERE rule_org_id = ?", + "DELETE FROM alert WHERE org_id = ?", + "DELETE FROM annotation WHERE org_id = ?", + "DELETE FROM kv_store WHERE org_id = ?", } for _, sql := range deletes {