From 1a8a19a9edeb2896f04409382eb72c95ebdc2eae Mon Sep 17 00:00:00 2001 From: Andres Martinez Gotor Date: Tue, 12 Sep 2023 10:34:12 +0200 Subject: [PATCH] Chore: Avoid stopping plugin (#74654) --- pkg/plugins/manager/fakes/fakes.go | 8 ++- pkg/plugins/manager/process/process.go | 57 ++++++++++----------- pkg/plugins/manager/process/process_test.go | 26 ++++++++++ 3 files changed, 60 insertions(+), 31 deletions(-) diff --git a/pkg/plugins/manager/fakes/fakes.go b/pkg/plugins/manager/fakes/fakes.go index 7e0914cbd8a..5a50d8e9ef2 100644 --- a/pkg/plugins/manager/fakes/fakes.go +++ b/pkg/plugins/manager/fakes/fakes.go @@ -496,13 +496,17 @@ type FakeBackendPlugin struct { Decommissioned bool Running bool + // ExitedCheckDoneOrStopped is used to signal that the Exited() or Stop() method has been called. + ExitedCheckDoneOrStopped chan struct{} + mutex sync.RWMutex backendplugin.Plugin } func NewFakeBackendPlugin(managed bool) *FakeBackendPlugin { return &FakeBackendPlugin{ - Managed: managed, + Managed: managed, + ExitedCheckDoneOrStopped: make(chan struct{}), } } @@ -519,6 +523,7 @@ func (p *FakeBackendPlugin) Stop(_ context.Context) error { defer p.mutex.Unlock() p.Running = false p.StopCount++ + go func() { p.ExitedCheckDoneOrStopped <- struct{}{} }() return nil } @@ -544,6 +549,7 @@ func (p *FakeBackendPlugin) IsManaged() bool { func (p *FakeBackendPlugin) Exited() bool { p.mutex.RLock() defer p.mutex.RUnlock() + go func() { p.ExitedCheckDoneOrStopped <- struct{}{} }() return !p.Running } diff --git a/pkg/plugins/manager/process/process.go b/pkg/plugins/manager/process/process.go index 5d727de4f9f..a663690bc4d 100644 --- a/pkg/plugins/manager/process/process.go +++ b/pkg/plugins/manager/process/process.go @@ -2,12 +2,15 @@ package process import ( "context" - "errors" "time" "github.com/grafana/grafana/pkg/plugins" ) +var ( + keepPluginAliveTickerDuration = time.Second * 1 +) + type Service struct{} func ProvideService() *Service { @@ -19,7 +22,7 @@ func (*Service) Start(ctx context.Context, p *plugins.Plugin) error { return nil } - if err := startPluginAndRestartKilledProcesses(ctx, p); err != nil { + if err := startPluginAndKeepItAlive(ctx, p); err != nil { return err } @@ -40,7 +43,7 @@ func (*Service) Stop(ctx context.Context, p *plugins.Plugin) error { return nil } -func startPluginAndRestartKilledProcesses(ctx context.Context, p *plugins.Plugin) error { +func startPluginAndKeepItAlive(ctx context.Context, p *plugins.Plugin) error { if err := p.Start(ctx); err != nil { return err } @@ -49,41 +52,35 @@ func startPluginAndRestartKilledProcesses(ctx context.Context, p *plugins.Plugin return nil } - go func(ctx context.Context, p *plugins.Plugin) { - if err := restartKilledProcess(ctx, p); err != nil { + go func(p *plugins.Plugin) { + if err := keepPluginAlive(p); err != nil { p.Logger().Error("Attempt to restart killed plugin process failed", "error", err) } - }(ctx, p) + }(p) return nil } -func restartKilledProcess(ctx context.Context, p *plugins.Plugin) error { - ticker := time.NewTicker(time.Second * 1) +// keepPluginAlive will restart the plugin if the process is killed or exits +func keepPluginAlive(p *plugins.Plugin) error { + ticker := time.NewTicker(keepPluginAliveTickerDuration) for { - select { - case <-ctx.Done(): - if err := ctx.Err(); err != nil && !errors.Is(err, context.Canceled) { - return err - } - return p.Stop(ctx) - case <-ticker.C: - if p.IsDecommissioned() { - p.Logger().Debug("Plugin decommissioned") - return nil - } - - if !p.Exited() { - continue - } - - p.Logger().Debug("Restarting plugin") - if err := p.Start(ctx); err != nil { - p.Logger().Error("Failed to restart plugin", "error", err) - continue - } - p.Logger().Debug("Plugin restarted") + <-ticker.C + if p.IsDecommissioned() { + p.Logger().Debug("Plugin decommissioned") + return nil } + + if !p.Exited() { + continue + } + + p.Logger().Debug("Restarting plugin") + if err := p.Start(context.Background()); err != nil { + p.Logger().Error("Failed to restart plugin", "error", err) + continue + } + p.Logger().Debug("Plugin restarted") } } diff --git a/pkg/plugins/manager/process/process_test.go b/pkg/plugins/manager/process/process_test.go index 7dd7cedf36a..db9641dcc25 100644 --- a/pkg/plugins/manager/process/process_test.go +++ b/pkg/plugins/manager/process/process_test.go @@ -4,6 +4,7 @@ import ( "context" "sync" "testing" + "time" "github.com/stretchr/testify/require" @@ -71,6 +72,31 @@ func TestProcessManager_Start(t *testing.T) { }) } }) + + t.Run("Won't stop the plugin if the context is cancelled", func(t *testing.T) { + bp := fakes.NewFakeBackendPlugin(true) + p := createPlugin(t, bp, func(plugin *plugins.Plugin) { + plugin.Backend = true + }) + + tickerDuration := keepPluginAliveTickerDuration + keepPluginAliveTickerDuration = 1 * time.Millisecond + defer func() { + keepPluginAliveTickerDuration = tickerDuration + }() + + m := &Service{} + ctx := context.Background() + ctx, cancel := context.WithCancel(ctx) + err := m.Start(ctx, p) + require.NoError(t, err) + require.Equal(t, 1, bp.StartCount) + cancel() + + <-bp.ExitedCheckDoneOrStopped + require.False(t, p.Exited()) + require.Equal(t, 0, bp.StopCount) + }) } func TestProcessManager_Stop(t *testing.T) {