Files
grafana/pkg/services/pluginsintegration/installsync/syncer.go
2025-12-17 16:25:54 +01:00

229 lines
6.8 KiB
Go

package installsync
import (
"context"
"time"
"github.com/grafana/dskit/services"
"github.com/grafana/grafana-app-sdk/logging"
"github.com/grafana/grafana-app-sdk/resource"
pluginsv0alpha1 "github.com/grafana/grafana/apps/plugins/pkg/apis/plugins/v0alpha1"
"github.com/grafana/grafana/apps/plugins/pkg/app/install"
"github.com/grafana/grafana/pkg/apimachinery/identity"
"github.com/grafana/grafana/pkg/configprovider"
"github.com/grafana/grafana/pkg/registry"
"github.com/grafana/grafana/pkg/services/apiserver"
"github.com/grafana/grafana/pkg/services/apiserver/client"
"github.com/grafana/grafana/pkg/services/apiserver/endpoints/request"
"github.com/grafana/grafana/pkg/services/featuremgmt"
"github.com/grafana/grafana/pkg/services/org"
"github.com/grafana/grafana/pkg/services/pluginsintegration/pluginstore"
)
const (
// ServiceName is the name given to the syncer's underlying service
// (see newSyncer, which passes it to WithName).
ServiceName = "plugins.installsync"
// syncerLockActionName is the action name passed to
// ServerLock.LockExecuteAndRelease when a sync is run.
syncerLockActionName = "plugin-install-api-sync"
)
var (
// lockTimeout is passed as the maxInterval argument to
// ServerLock.LockExecuteAndRelease when a sync is run.
// NOTE(review): declared as a var rather than a const, presumably so tests
// can override it; confirm.
lockTimeout = 10 * time.Minute
)
// Syncer is the interface for syncing plugin installations to the Kubernetes-style API.
//
// A Syncer is also a registry.BackgroundService and a registry.CanBeDisabled,
// as both interfaces are embedded.
type Syncer interface {
registry.BackgroundService
registry.CanBeDisabled
// Sync syncs installedPlugins, attributed to source, to the API and
// returns any error encountered while doing so.
Sync(ctx context.Context, source install.Source, installedPlugins []pluginstore.Plugin) error
}
// ServerLock is the interface for acquiring distributed locks.
type ServerLock interface {
// LockExecuteAndRelease runs fn under the lock identified by actionName.
// NOTE(review): the exact meaning of maxInterval, and whether fn is skipped
// (with a nil error) when the lock is held elsewhere, are defined by the
// implementation; confirm before relying on either.
LockExecuteAndRelease(ctx context.Context, actionName string, maxInterval time.Duration, fn func(ctx context.Context)) error
}
// syncer is the Syncer implementation in this package. It embeds a
// services.NamedService (assigned in newSyncer) and holds the dependencies
// used while syncing.
type syncer struct {
services.NamedService
// featureToggles is consulted by IsDisabled.
featureToggles featuremgmt.FeatureToggles
// clientGenerator is stored by newSyncer.
clientGenerator resource.ClientGenerator
// installRegistrar provides the API client and the Register/Unregister
// calls used by syncNamespace.
installRegistrar *install.InstallRegistrar
// orgService is searched by syncAllNamespaces to enumerate orgs.
orgService org.Service
// namespaceMapper maps an org ID to the namespace that is synced for it.
namespaceMapper request.NamespaceMapper
// serverLock wraps each sync run in Sync.
serverLock ServerLock
// restConfigProvider supplies the REST config used to build the discovery client.
restConfigProvider apiserver.RestConfigProvider
// pluginsStoreService supplies the installed plugins synced by running.
pluginsStoreService pluginstore.Store
}
// Compile-time checks that *syncer satisfies the interfaces it is used as.
var _ Syncer = (*syncer)(nil)
var _ registry.BackgroundService = (*syncer)(nil)
var _ registry.CanBeDisabled = (*syncer)(nil)
var _ services.NamedService = (*syncer)(nil)
// newSyncer builds a syncer from the given dependencies and installs a basic
// service named ServiceName whose running phase is the syncer's running method.
func newSyncer(
	featureToggles featuremgmt.FeatureToggles,
	clientGenerator resource.ClientGenerator,
	installRegistrar *install.InstallRegistrar,
	orgService org.Service,
	namespaceMapper request.NamespaceMapper,
	serverLock ServerLock,
	restConfigProvider apiserver.RestConfigProvider,
	pluginsStoreService pluginstore.Store,
) *syncer {
	svc := &syncer{
		featureToggles:      featureToggles,
		clientGenerator:     clientGenerator,
		installRegistrar:    installRegistrar,
		orgService:          orgService,
		namespaceMapper:     namespaceMapper,
		serverLock:          serverLock,
		restConfigProvider:  restConfigProvider,
		pluginsStoreService: pluginsStoreService,
	}

	// Only the running phase is supplied; the starting and stopping hooks are nil.
	basic := services.NewBasicService(nil, svc.running, nil)
	svc.NamedService = basic.WithName(ServiceName)

	return svc
}
// ProvideSyncer builds the Syncer used to sync plugin installations to the API.
//
// It reads the configuration from cfgProvider to derive the namespace mapper
// and creates an install registrar from clientGenerator. The configuration
// lookup error, if any, is returned unchanged.
func ProvideSyncer(
	featureToggles featuremgmt.FeatureToggles,
	clientGenerator resource.ClientGenerator,
	orgService org.Service,
	cfgProvider configprovider.ConfigProvider,
	serverLock ServerLock,
	restConfigProvider apiserver.RestConfigProvider,
	pluginsStoreService pluginstore.Store,
) (Syncer, error) {
	cfg, cfgErr := cfgProvider.Get(context.Background())
	if cfgErr != nil {
		return nil, cfgErr
	}

	registrar := install.NewInstallRegistrar(clientGenerator)
	mapper := request.GetNamespaceMapper(cfg)

	built := newSyncer(
		featureToggles,
		clientGenerator,
		registrar,
		orgService,
		mapper,
		serverLock,
		restConfigProvider,
		pluginsStoreService,
	)
	return built, nil
}
// IsDisabled reports whether the syncer should not run. The syncer is enabled
// only when both the FlagPluginInstallAPISync and FlagPluginStoreServiceLoading
// feature toggles are enabled.
func (s *syncer) IsDisabled() bool {
	bg := context.Background()

	//nolint:staticcheck // not yet migrated to OpenFeature
	apiSyncOn := s.featureToggles.IsEnabled(bg, featuremgmt.FlagPluginInstallAPISync)
	//nolint:staticcheck // not yet migrated to OpenFeature
	storeLoadingOn := s.featureToggles.IsEnabled(bg, featuremgmt.FlagPluginStoreServiceLoading)

	return !(apiSyncOn && storeLoadingOn)
}
// Run starts the embedded service with ctx and then blocks until the service
// has terminated. A failure to start is returned immediately.
func (s *syncer) Run(ctx context.Context) error {
	startErr := s.StartAsync(ctx)
	if startErr != nil {
		return startErr
	}

	// The wait uses a background context, so it is not itself bounded by ctx.
	return s.AwaitTerminated(context.Background())
}
// running is the running phase of the syncer's service. It waits for the
// plugins API group/version to become available, performs a single sync of the
// plugins reported by the plugin store, and then blocks until ctx is done.
//
// Only a failure to obtain the REST config is returned as an error. If the
// discovery client cannot be created, or the plugins API does not become
// available, a warning is logged and the sync is skipped. A failed sync is
// logged as a warning. In all of these cases the method keeps blocking until
// ctx is done and then returns nil.
func (s *syncer) running(ctx context.Context) error {
	ctxLog := logging.FromContext(ctx)

	restConfig, err := s.restConfigProvider.GetRestConfig(ctx)
	if err != nil {
		return err
	}

	// Each step runs only if the previous one succeeded. In particular the
	// discovery client is never used after its construction failed, and no
	// sync is attempted when the log message says it is being skipped.
	discoveryClient, err := client.NewDiscoveryClient(restConfig)
	if err != nil {
		ctxLog.Warn("Failed to create discovery client, skipping plugin sync", "error", err)
	} else if err := discoveryClient.WaitForAvailability(ctx, pluginsv0alpha1.PluginKind().GroupVersionKind().GroupVersion()); err != nil {
		ctxLog.Warn("Failed to wait for plugin API availability, skipping plugin sync", "error", err)
	} else if err := s.Sync(ctx, install.SourcePluginStore, s.pluginsStoreService.Plugins(ctx)); err != nil {
		ctxLog.Warn("Failed to sync plugins", "error", err)
	}

	// Keep the service in its running state until it is asked to stop.
	<-ctx.Done()
	return nil
}
// Sync syncs installedPlugins, attributed to source, to the API for every org
// namespace. It does nothing and returns nil when the syncer is disabled or
// installedPlugins is empty.
//
// The work runs inside serverLock.LockExecuteAndRelease. An error from the lock
// call takes precedence; otherwise the error from the sync itself is returned.
func (s *syncer) Sync(ctx context.Context, source install.Source, installedPlugins []pluginstore.Plugin) error {
	if s.IsDisabled() || len(installedPlugins) == 0 {
		return nil
	}

	// The lock callback has no return value, so the result is captured here.
	var result error
	underLock := func(lockedCtx context.Context) {
		result = s.syncAllNamespaces(lockedCtx, source, installedPlugins)
	}

	if err := s.serverLock.LockExecuteAndRelease(ctx, syncerLockActionName, lockTimeout, underLock); err != nil {
		return err
	}
	return result
}
// syncAllNamespaces runs syncNamespace once for every org returned by the org
// service, using the namespace that namespaceMapper derives from the org ID.
//
// Each namespace is synced with its own context, derived from ctx via
// identity.WithServiceIdentityForSingleNamespaceContext. The first error,
// from the org search or from any namespace sync, stops the loop and is
// returned; remaining orgs are not processed.
func (s *syncer) syncAllNamespaces(ctx context.Context, source install.Source, installedPlugins []pluginstore.Plugin) error {
	orgs, err := s.orgService.Search(ctx, &org.SearchOrgsQuery{})
	if err != nil {
		return err
	}

	for _, o := range orgs {
		namespace := s.namespaceMapper(o.ID)
		// Derive from the parent ctx on every iteration rather than reassigning
		// ctx, so one org's identity context does not wrap the previous org's.
		nsCtx := identity.WithServiceIdentityForSingleNamespaceContext(ctx, namespace)
		if err := s.syncNamespace(nsCtx, namespace, source, installedPlugins); err != nil {
			return err
		}
	}
	return nil
}
// syncNamespace syncs a single namespace against installedPlugins.
//
// It lists the plugins currently in the API for namespace, calls Unregister
// for each one whose Spec.Id is not among installedPlugins, and then calls
// Register for every entry in installedPlugins. The first error encountered is
// returned and stops further processing.
// NOTE(review): Register is called for plugins that are already listed too, so
// it is presumably idempotent; confirm against InstallRegistrar.
func (s *syncer) syncNamespace(ctx context.Context, namespace string, source install.Source, installedPlugins []pluginstore.Plugin) error {
	installClient, err := s.installRegistrar.GetClient()
	if err != nil {
		return err
	}

	listed, err := installClient.ListAll(ctx, namespace, resource.ListOptions{})
	if err != nil {
		return err
	}

	// Set of plugin IDs that are installed locally.
	wanted := make(map[string]struct{})
	for i := range installedPlugins {
		wanted[installedPlugins[i].ID] = struct{}{}
	}

	// unregister plugins that are not installed
	for _, existing := range listed.Items {
		id := existing.Spec.Id
		if _, ok := wanted[id]; ok {
			continue
		}
		if err := s.installRegistrar.Unregister(ctx, namespace, id, source); err != nil {
			return err
		}
	}

	// register plugins that are installed
	for i := range installedPlugins {
		plugin := installedPlugins[i]
		entry := &install.PluginInstall{
			ID:      plugin.ID,
			Version: plugin.Info.Version,
			Source:  source,
		}
		if err := s.installRegistrar.Register(ctx, namespace, entry); err != nil {
			return err
		}
	}

	return nil
}