Plugins: Add synchronous CDN plugin loader (#99096)
* WIP * Run plugin validations and validation steps sequentially if feature is off * Remove dependency between sources.Service and pluginscdn.Service * lint * Parallelize validation only if class is CDN * re-generate feature toggles * remove waitgroup usage * PR review: Add loader concurrency limit setting * re-generate feature toggles * pr review feedback * fix const name * Skip module.js validation for cdn plugins * do not run validation steps in parallel * lint * reduce diff * re-generate feature toggles * lint * pr review feedback * remove leftover config.PluginManagementCfg from sources.Service
This commit is contained in:
@@ -8,6 +8,7 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/grafana/grafana/pkg/plugins"
|
||||
pluginsCfg "github.com/grafana/grafana/pkg/plugins/config"
|
||||
"github.com/grafana/grafana/pkg/plugins/log"
|
||||
"github.com/grafana/grafana/pkg/plugins/manager/pipeline/bootstrap"
|
||||
"github.com/grafana/grafana/pkg/plugins/manager/pipeline/discovery"
|
||||
@@ -17,7 +18,10 @@ import (
|
||||
"github.com/grafana/grafana/pkg/services/pluginsintegration/pluginerrs"
|
||||
)
|
||||
|
||||
const concurrencyLimit = 32
|
||||
|
||||
type Loader struct {
|
||||
cfg *pluginsCfg.PluginManagementCfg
|
||||
discovery discovery.Discoverer
|
||||
bootstrap bootstrap.Bootstrapper
|
||||
initializer initialization.Initializer
|
||||
@@ -27,9 +31,13 @@ type Loader struct {
|
||||
log log.Logger
|
||||
}
|
||||
|
||||
func New(discovery discovery.Discoverer, bootstrap bootstrap.Bootstrapper, validation validation.Validator,
|
||||
initializer initialization.Initializer, termination termination.Terminator, errorTracker pluginerrs.ErrorTracker) *Loader {
|
||||
func New(
|
||||
cfg *pluginsCfg.PluginManagementCfg,
|
||||
discovery discovery.Discoverer, bootstrap bootstrap.Bootstrapper, validation validation.Validator,
|
||||
initializer initialization.Initializer, termination termination.Terminator, errorTracker pluginerrs.ErrorTracker,
|
||||
) *Loader {
|
||||
return &Loader{
|
||||
cfg: cfg,
|
||||
discovery: discovery,
|
||||
bootstrap: bootstrap,
|
||||
validation: validation,
|
||||
@@ -55,11 +63,14 @@ func (l *Loader) recordError(ctx context.Context, p *plugins.Plugin, err error)
|
||||
func (l *Loader) Load(ctx context.Context, src plugins.PluginSource) ([]*plugins.Plugin, error) {
|
||||
end := l.instrumentLoad(ctx, src)
|
||||
|
||||
st := time.Now()
|
||||
discoveredPlugins, err := l.discovery.Discover(ctx, src)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
l.log.Debug("Discovered", "class", src.PluginClass(ctx), "duration", time.Since(st))
|
||||
|
||||
st = time.Now()
|
||||
bootstrappedPlugins := []*plugins.Plugin{}
|
||||
for _, foundBundle := range discoveredPlugins {
|
||||
bootstrappedPlugin, err := l.bootstrap.Bootstrap(ctx, src, foundBundle)
|
||||
@@ -72,17 +83,47 @@ func (l *Loader) Load(ctx context.Context, src plugins.PluginSource) ([]*plugins
|
||||
}
|
||||
bootstrappedPlugins = append(bootstrappedPlugins, bootstrappedPlugin...)
|
||||
}
|
||||
l.log.Debug("Bootstrapped", "class", src.PluginClass(ctx), "duration", time.Since(st))
|
||||
|
||||
st = time.Now()
|
||||
validatedPlugins := []*plugins.Plugin{}
|
||||
type validateResult struct {
|
||||
bootstrappedPlugin *plugins.Plugin
|
||||
err error
|
||||
}
|
||||
validateResults := make(chan validateResult, len(bootstrappedPlugins))
|
||||
|
||||
// If the PluginsCDNSyncLoaderEnabled feature is enabled, validate plugins in parallel.
|
||||
// Otherwise, validate plugins sequentially.
|
||||
var limitSize int
|
||||
if l.cfg.Features.PluginsCDNSyncLoaderEnabled && src.PluginClass(ctx) == plugins.ClassCDN {
|
||||
limitSize = min(len(bootstrappedPlugins), concurrencyLimit)
|
||||
} else {
|
||||
limitSize = 1
|
||||
}
|
||||
limit := make(chan struct{}, limitSize)
|
||||
for _, bootstrappedPlugin := range bootstrappedPlugins {
|
||||
err := l.validation.Validate(ctx, bootstrappedPlugin)
|
||||
if err != nil {
|
||||
l.recordError(ctx, bootstrappedPlugin, err)
|
||||
limit <- struct{}{}
|
||||
go func(p *plugins.Plugin) {
|
||||
err := l.validation.Validate(ctx, p)
|
||||
validateResults <- validateResult{
|
||||
bootstrappedPlugin: bootstrappedPlugin,
|
||||
err: err,
|
||||
}
|
||||
<-limit
|
||||
}(bootstrappedPlugin)
|
||||
}
|
||||
for i := 0; i < len(bootstrappedPlugins); i++ {
|
||||
r := <-validateResults
|
||||
if r.err != nil {
|
||||
l.recordError(ctx, r.bootstrappedPlugin, r.err)
|
||||
continue
|
||||
}
|
||||
validatedPlugins = append(validatedPlugins, bootstrappedPlugin)
|
||||
validatedPlugins = append(validatedPlugins, r.bootstrappedPlugin)
|
||||
}
|
||||
l.log.Debug("Validated", "class", src.PluginClass(ctx), "duration", time.Since(st), "total", len(validatedPlugins))
|
||||
|
||||
st = time.Now()
|
||||
initializedPlugins := []*plugins.Plugin{}
|
||||
for _, validatedPlugin := range validatedPlugins {
|
||||
initializedPlugin, err := l.initializer.Initialize(ctx, validatedPlugin)
|
||||
@@ -92,6 +133,7 @@ func (l *Loader) Load(ctx context.Context, src plugins.PluginSource) ([]*plugins
|
||||
}
|
||||
initializedPlugins = append(initializedPlugins, initializedPlugin)
|
||||
}
|
||||
l.log.Debug("Initialized", "class", src.PluginClass(ctx), "duration", time.Since(st))
|
||||
|
||||
// Clean errors from registry for initialized plugins
|
||||
for _, p := range initializedPlugins {
|
||||
|
||||
@@ -58,6 +58,7 @@ func TestLoader_Load(t *testing.T) {
|
||||
t.Errorf("could not construct absolute path of current dir")
|
||||
return
|
||||
}
|
||||
zeroCfg := &config.PluginManagementCfg{}
|
||||
tests := []struct {
|
||||
name string
|
||||
class plugins.Class
|
||||
@@ -420,7 +421,7 @@ func TestLoader_Load(t *testing.T) {
|
||||
require.NoError(t, err)
|
||||
et := pluginerrs.ProvideErrorTracker()
|
||||
|
||||
l := New(discovery.New(tt.cfg, discovery.Opts{}), bootstrap.New(tt.cfg, bootstrap.Opts{}),
|
||||
l := New(zeroCfg, discovery.New(tt.cfg, discovery.Opts{}), bootstrap.New(tt.cfg, bootstrap.Opts{}),
|
||||
validation.New(tt.cfg, validation.Opts{}), initialization.New(tt.cfg, initialization.Opts{}),
|
||||
terminationStage, et)
|
||||
|
||||
@@ -455,6 +456,7 @@ func TestLoader_Load(t *testing.T) {
|
||||
|
||||
var steps []string
|
||||
l := New(
|
||||
zeroCfg,
|
||||
&fakes.FakeDiscoverer{
|
||||
DiscoverFunc: func(ctx context.Context, s plugins.PluginSource) ([]*plugins.FoundBundle, error) {
|
||||
require.Equal(t, src, s)
|
||||
@@ -512,6 +514,7 @@ func TestLoader_Load(t *testing.T) {
|
||||
|
||||
var steps []string
|
||||
l := New(
|
||||
zeroCfg,
|
||||
&fakes.FakeDiscoverer{
|
||||
DiscoverFunc: func(ctx context.Context, s plugins.PluginSource) ([]*plugins.FoundBundle, error) {
|
||||
require.Equal(t, src, s)
|
||||
@@ -574,6 +577,7 @@ func TestLoader_Load(t *testing.T) {
|
||||
|
||||
var steps []string
|
||||
l := New(
|
||||
zeroCfg,
|
||||
&fakes.FakeDiscoverer{
|
||||
DiscoverFunc: func(ctx context.Context, s plugins.PluginSource) ([]*plugins.FoundBundle, error) {
|
||||
require.Equal(t, src, s)
|
||||
@@ -629,7 +633,9 @@ func TestLoader_Unload(t *testing.T) {
|
||||
}
|
||||
|
||||
for _, tc := range tcs {
|
||||
l := New(&fakes.FakeDiscoverer{},
|
||||
l := New(
|
||||
&config.PluginManagementCfg{},
|
||||
&fakes.FakeDiscoverer{},
|
||||
&fakes.FakeBootstrapper{},
|
||||
&fakes.FakeValidator{},
|
||||
&fakes.FakeInitializer{},
|
||||
|
||||
Reference in New Issue
Block a user