diff --git a/conf/defaults.ini b/conf/defaults.ini index 319c7f4d8ab..5ca7494f1ec 100644 --- a/conf/defaults.ini +++ b/conf/defaults.ini @@ -901,6 +901,17 @@ plugin_catalog_url = https://grafana.com/grafana/plugins/ # tuning. 0 disables Live, -1 means unlimited connections. max_connections = 100 +# engine defines an HA (high availability) engine to use for Grafana Live. By default no engine used - in +# this case Live features work only on a single Grafana server. +# Available options: "redis". +# Setting ha_engine is an EXPERIMENTAL feature. +ha_engine = + +# ha_engine_address sets a connection address for Live HA engine. Depending on engine type address format can differ. +# For now we only support Redis connection address in "host:port" format. +# This option is EXPERIMENTAL. +ha_engine_address = "127.0.0.1:6379" + #################################### Grafana Image Renderer Plugin ########################## [plugin.grafana-image-renderer] # Instruct headless browser instance to use a default timezone when not provided by Grafana, e.g. when rendering panel image of alert. diff --git a/conf/sample.ini b/conf/sample.ini index e61f94ca5d1..3930f1764d7 100644 --- a/conf/sample.ini +++ b/conf/sample.ini @@ -887,6 +887,16 @@ # tuning. 0 disables Live, -1 means unlimited connections. ;max_connections = 100 +# engine defines an HA (high availability) engine to use for Grafana Live. By default no engine used - in +# this case Live features work only on a single Grafana server. Available options: "redis". +# Setting ha_engine is an EXPERIMENTAL feature. +;ha_engine = + +# ha_engine_address sets a connection address for Live HA engine. Depending on engine type address format can differ. +# For now we only support Redis connection address in "host:port" format. +# This option is EXPERIMENTAL. +;ha_engine_address = "127.0.0.1:6379" + #################################### Grafana Image Renderer Plugin ########################## [plugin.grafana-image-renderer] # Instruct headless browser instance to use a default timezone when not provided by Grafana, e.g. when rendering panel image of alert. diff --git a/go.mod b/go.mod index 125ae067671..9e0ac60b30b 100644 --- a/go.mod +++ b/go.mod @@ -23,7 +23,7 @@ require ( github.com/beevik/etree v1.1.0 github.com/benbjohnson/clock v1.1.0 github.com/bradfitz/gomemcache v0.0.0-20190913173617-a41fca850d0b - github.com/centrifugal/centrifuge v0.17.0 + github.com/centrifugal/centrifuge v0.17.1 github.com/cortexproject/cortex v1.8.2-0.20210428155238-d382e1d80eaf github.com/crewjam/saml v0.4.6-0.20201227203850-bca570abb2ce github.com/davecgh/go-spew v1.1.1 diff --git a/go.sum b/go.sum index 52c53419d59..fc4ca881eb2 100644 --- a/go.sum +++ b/go.sum @@ -292,8 +292,8 @@ github.com/cenkalti/backoff/v4 v4.1.0 h1:c8LkOFQTzuO0WBM/ae5HdGQuZPfPxp7lqBRwQRm github.com/cenkalti/backoff/v4 v4.1.0/go.mod h1:scbssz8iZGpm3xbr14ovlUdkxfGXNInqkPWOWmG2CLw= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= github.com/census-instrumentation/opencensus-proto v0.3.0/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= -github.com/centrifugal/centrifuge v0.17.0 h1:ANZMhcR8pFbRUPdv45nrIhhZcsSOdtshT3YM4v1/NHY= -github.com/centrifugal/centrifuge v0.17.0/go.mod h1:AEFs3KPGRpvX1jCe24NDlGWQu7DPa7vdzeY/aUluOm0= +github.com/centrifugal/centrifuge v0.17.1 h1:UWORzEE5SEhJSy8omW50AVKbpKaqDBpUFp3kDnMgsXs= +github.com/centrifugal/centrifuge v0.17.1/go.mod h1:AEFs3KPGRpvX1jCe24NDlGWQu7DPa7vdzeY/aUluOm0= github.com/centrifugal/protocol v0.5.0 h1:h71u2Q53yhplftmUk1tjc+Mu6TKJ/eO3YRD3h7Qjvj4= github.com/centrifugal/protocol v0.5.0/go.mod h1:ru2N4pwiND/jE+XLtiLYbUo3YmgqgniGNW9f9aRgoVI= github.com/certifi/gocertifi v0.0.0-20191021191039-0944d244cd40/go.mod h1:sGbDF6GwGcLpkNXPUTkMRoywsNa/ol15pxFe6ERfguA= diff --git a/pkg/services/live/live.go b/pkg/services/live/live.go index 9d1b08d652b..ded3d3c45f8 100644 --- a/pkg/services/live/live.go +++ b/pkg/services/live/live.go @@ -2,12 +2,10 @@ package live import ( "context" - "encoding/json" "errors" "fmt" "net/http" "net/url" - "strconv" "strings" "sync" "time" @@ -26,10 +24,12 @@ import ( "github.com/grafana/grafana/pkg/services/live/database" "github.com/grafana/grafana/pkg/services/live/features" "github.com/grafana/grafana/pkg/services/live/livecontext" + "github.com/grafana/grafana/pkg/services/live/liveplugin" "github.com/grafana/grafana/pkg/services/live/managedstream" "github.com/grafana/grafana/pkg/services/live/orgchannel" "github.com/grafana/grafana/pkg/services/live/pushws" "github.com/grafana/grafana/pkg/services/live/runstream" + "github.com/grafana/grafana/pkg/services/live/survey" "github.com/grafana/grafana/pkg/services/sqlstore" "github.com/grafana/grafana/pkg/services/sqlstore/migrator" "github.com/grafana/grafana/pkg/setting" @@ -38,8 +38,8 @@ import ( "github.com/centrifugal/centrifuge" "github.com/grafana/grafana-plugin-sdk-go/backend" - "github.com/grafana/grafana-plugin-sdk-go/data" "github.com/grafana/grafana-plugin-sdk-go/live" + "gopkg.in/redis.v5" ) var ( @@ -84,7 +84,8 @@ type GrafanaLive struct { DatasourceCache datasources.CacheService `inject:""` SQLStore *sqlstore.SQLStore `inject:""` - node *centrifuge.Node + node *centrifuge.Node + surveyCaller *survey.Caller // Websocket handlers websocketHandler interface{} @@ -99,7 +100,7 @@ type GrafanaLive struct { ManagedStreamRunner *managedstream.Runner - contextGetter *pluginContextGetter + contextGetter *liveplugin.ContextGetter runStreamManager *runstream.Manager storage *database.Storage } @@ -128,17 +129,22 @@ func (g *GrafanaLive) AddMigration(mg *migrator.Migrator) { func (g *GrafanaLive) Run(ctx context.Context) error { if g.runStreamManager != nil { // Only run stream manager if GrafanaLive properly initialized. - return g.runStreamManager.Run(ctx) + _ = g.runStreamManager.Run(ctx) + return g.node.Shutdown(context.Background()) } return nil } var clientConcurrency = 8 +func (g *GrafanaLive) IsHA() bool { + return g.Cfg != nil && g.Cfg.LiveHAEngine != "" +} + // Init initializes Live service. // Required to implement the registry.Service interface. func (g *GrafanaLive) Init() error { - logger.Debug("GrafanaLive initialization") + logger.Debug("GrafanaLive initialization", "ha", g.IsHA()) // We use default config here as starting point. Default config contains // reasonable values for available options. @@ -158,10 +164,57 @@ func (g *GrafanaLive) Init() error { } g.node = node - g.contextGetter = newPluginContextGetter(g.PluginContextProvider) - channelSender := newPluginChannelSender(node) - presenceGetter := newPluginPresenceGetter(node) - g.runStreamManager = runstream.NewManager(channelSender, presenceGetter, g.contextGetter) + if g.IsHA() { + // Configure HA with Redis. In this case Centrifuge nodes + // will be connected over Redis PUB/SUB. Presence will work + // globally since kept inside Redis. + redisAddress := g.Cfg.LiveHAEngineAddress + redisShardConfigs := []centrifuge.RedisShardConfig{ + {Address: redisAddress}, + } + var redisShards []*centrifuge.RedisShard + for _, redisConf := range redisShardConfigs { + redisShard, err := centrifuge.NewRedisShard(node, redisConf) + if err != nil { + return fmt.Errorf("error connecting to Live Redis: %v", err) + } + redisShards = append(redisShards, redisShard) + } + + broker, err := centrifuge.NewRedisBroker(node, centrifuge.RedisBrokerConfig{ + Prefix: "gf_live", + + // We are using Redis streams here for history. Require Redis >= 5. + UseStreams: true, + + // Use reasonably large expiration interval for stream meta key, + // much bigger than maximum HistoryLifetime value in Node config. + // This way stream meta data will expire, in some cases you may want + // to prevent its expiration setting this to zero value. + HistoryMetaTTL: 7 * 24 * time.Hour, + + // And configure a couple of shards to use. + Shards: redisShards, + }) + if err != nil { + return fmt.Errorf("error creating Live Redis broker: %v", err) + } + node.SetBroker(broker) + + presenceManager, err := centrifuge.NewRedisPresenceManager(node, centrifuge.RedisPresenceManagerConfig{ + Prefix: "gf_live", + Shards: redisShards, + }) + if err != nil { + return fmt.Errorf("error creating Live Redis presence manager: %v", err) + } + node.SetPresenceManager(presenceManager) + } + + g.contextGetter = liveplugin.NewContextGetter(g.PluginContextProvider) + channelLocalPublisher := liveplugin.NewChannelLocalPublisher(node) + numLocalSubscribersGetter := liveplugin.NewNumLocalSubscribersGetter(node) + g.runStreamManager = runstream.NewManager(channelLocalPublisher, numLocalSubscribersGetter, g.contextGetter) // Initialize the main features dash := &features.DashboardHandler{ @@ -173,7 +226,32 @@ func (g *GrafanaLive) Init() error { g.GrafanaScope.Features["dashboard"] = dash g.GrafanaScope.Features["broadcast"] = features.NewBroadcastRunner(g.storage) - g.ManagedStreamRunner = managedstream.NewRunner(g.Publish) + var managedStreamRunner *managedstream.Runner + if g.IsHA() { + redisClient := redis.NewClient(&redis.Options{ + Addr: g.Cfg.LiveHAEngineAddress, + }) + cmd := redisClient.Ping() + if _, err := cmd.Result(); err != nil { + return fmt.Errorf("error pinging Redis: %v", err) + } + managedStreamRunner = managedstream.NewRunner( + g.Publish, + managedstream.NewRedisFrameCache(redisClient), + ) + } else { + managedStreamRunner = managedstream.NewRunner( + g.Publish, + managedstream.NewMemoryFrameCache(), + ) + } + + g.ManagedStreamRunner = managedStreamRunner + g.surveyCaller = survey.NewCaller(managedStreamRunner, node) + err = g.surveyCaller.SetupHandlers() + if err != nil { + return err + } // Set ConnectHandler called when client successfully connected to Node. Your code // inside handler must be synchronized since it will be called concurrently from @@ -222,6 +300,9 @@ func (g *GrafanaLive) Init() error { reason := "normal" if e.Disconnect != nil { reason = e.Disconnect.Reason + if e.Disconnect.Code == 3001 { // Shutdown + return + } } logger.Debug("Client disconnected", "user", client.UserID(), "client", client.ID(), "reason", reason, "elapsed", time.Since(connectedAt)) }) @@ -581,14 +662,7 @@ func (g *GrafanaLive) handleStreamScope(u *models.SignedInUser, namespace string func (g *GrafanaLive) handleDatasourceScope(user *models.SignedInUser, namespace string) (models.ChannelHandlerFactory, error) { ds, err := g.DatasourceCache.GetDatasourceByUID(namespace, user, false) if err != nil { - // the namespace may be an ID - id, _ := strconv.ParseInt(namespace, 10, 64) - if id > 0 { - ds, err = g.DatasourceCache.GetDatasource(id, user, false) - } - if err != nil { - return nil, fmt.Errorf("error getting datasource: %w", err) - } + return nil, fmt.Errorf("error getting datasource: %w", err) } streamHandler, err := g.getStreamPlugin(ds.Type) if err != nil { @@ -603,13 +677,13 @@ func (g *GrafanaLive) handleDatasourceScope(user *models.SignedInUser, namespace ), nil } -// Publish sends the data to the channel without checking permissions etc +// Publish sends the data to the channel without checking permissions etc. func (g *GrafanaLive) Publish(orgID int64, channel string, data []byte) error { _, err := g.node.Publish(orgchannel.PrependOrgID(orgID, channel), data) return err } -// ClientCount returns the number of clients +// ClientCount returns the number of clients. func (g *GrafanaLive) ClientCount(orgID int64, channel string) (int, error) { p, err := g.node.Presence(orgchannel.PrependOrgID(orgID, channel)) if err != nil { @@ -652,35 +726,25 @@ func (g *GrafanaLive) HandleHTTPPublish(ctx *models.ReqContext, cmd dtos.LivePub return response.JSON(http.StatusOK, dtos.LivePublishResponse{}) } +type streamChannelListResponse struct { + Channels []*managedstream.ManagedChannel `json:"channels"` +} + // HandleListHTTP returns metadata so the UI can build a nice form func (g *GrafanaLive) HandleListHTTP(c *models.ReqContext) response.Response { - info := util.DynMap{} - channels := make([]util.DynMap, 0) - for k, v := range g.ManagedStreamRunner.Streams(c.SignedInUser.OrgId) { - channels = append(channels, v.ListChannels(c.SignedInUser.OrgId, "stream/"+k+"/")...) + var channels []*managedstream.ManagedChannel + var err error + if g.IsHA() { + channels, err = g.surveyCaller.CallManagedStreams(c.SignedInUser.OrgId) + } else { + channels, err = g.ManagedStreamRunner.GetManagedChannels(c.SignedInUser.OrgId) } - - // Hardcode sample streams - frameJSON, err := data.FrameToJSON(data.NewFrame("testdata", - data.NewField("Time", nil, make([]time.Time, 0)), - data.NewField("Value", nil, make([]float64, 0)), - data.NewField("Min", nil, make([]float64, 0)), - data.NewField("Max", nil, make([]float64, 0)), - ), data.IncludeSchemaOnly) - if err == nil { - channels = append(channels, util.DynMap{ - "channel": "plugin/testdata/random-2s-stream", - "data": json.RawMessage(frameJSON), - }, util.DynMap{ - "channel": "plugin/testdata/random-flakey-stream", - "data": json.RawMessage(frameJSON), - }, util.DynMap{ - "channel": "plugin/testdata/random-20Hz-stream", - "data": json.RawMessage(frameJSON), - }) + if err != nil { + return response.Error(http.StatusInternalServerError, http.StatusText(http.StatusInternalServerError), err) + } + info := streamChannelListResponse{ + Channels: channels, } - - info["channels"] = channels return response.JSONStreaming(200, info) } @@ -696,3 +760,27 @@ func (g *GrafanaLive) HandleInfoHTTP(ctx *models.ReqContext) response.Response { "message": "Info is not supported for this channel", }) } + +// Write to the standard log15 logger +func handleLog(msg centrifuge.LogEntry) { + arr := make([]interface{}, 0) + for k, v := range msg.Fields { + if v == nil { + v = "" + } else if v == "" { + v = "" + } + arr = append(arr, k, v) + } + + switch msg.Level { + case centrifuge.LogLevelDebug: + loggerCF.Debug(msg.Message, arr...) + case centrifuge.LogLevelError: + loggerCF.Error(msg.Message, arr...) + case centrifuge.LogLevelInfo: + loggerCF.Info(msg.Message, arr...) + default: + loggerCF.Debug(msg.Message, arr...) + } +} diff --git a/pkg/services/live/liveplugin/plugin.go b/pkg/services/live/liveplugin/plugin.go new file mode 100644 index 00000000000..3c7939d0d1a --- /dev/null +++ b/pkg/services/live/liveplugin/plugin.go @@ -0,0 +1,56 @@ +package liveplugin + +import ( + "fmt" + + "github.com/grafana/grafana/pkg/models" + "github.com/grafana/grafana/pkg/plugins/plugincontext" + + "github.com/centrifugal/centrifuge" + "github.com/grafana/grafana-plugin-sdk-go/backend" +) + +type ChannelLocalPublisher struct { + node *centrifuge.Node +} + +func NewChannelLocalPublisher(node *centrifuge.Node) *ChannelLocalPublisher { + return &ChannelLocalPublisher{node: node} +} + +func (p *ChannelLocalPublisher) PublishLocal(channel string, data []byte) error { + pub := ¢rifuge.Publication{ + Data: data, + } + err := p.node.Hub().BroadcastPublication(channel, pub, centrifuge.StreamPosition{}) + if err != nil { + return fmt.Errorf("error publishing %s: %w", string(data), err) + } + return nil +} + +type NumLocalSubscribersGetter struct { + node *centrifuge.Node +} + +func NewNumLocalSubscribersGetter(node *centrifuge.Node) *NumLocalSubscribersGetter { + return &NumLocalSubscribersGetter{node: node} +} + +func (p *NumLocalSubscribersGetter) GetNumLocalSubscribers(channelID string) (int, error) { + return p.node.Hub().NumSubscribers(channelID), nil +} + +type ContextGetter struct { + PluginContextProvider *plugincontext.Provider +} + +func NewContextGetter(pluginContextProvider *plugincontext.Provider) *ContextGetter { + return &ContextGetter{ + PluginContextProvider: pluginContextProvider, + } +} + +func (g *ContextGetter) GetPluginContext(user *models.SignedInUser, pluginID string, datasourceUID string, skipCache bool) (backend.PluginContext, bool, error) { + return g.PluginContextProvider.Get(pluginID, datasourceUID, user, skipCache) +} diff --git a/pkg/services/live/log.go b/pkg/services/live/log.go deleted file mode 100644 index 8c97094a4b4..00000000000 --- a/pkg/services/live/log.go +++ /dev/null @@ -1,27 +0,0 @@ -package live - -import "github.com/centrifugal/centrifuge" - -// Write to the standard log15 logger -func handleLog(msg centrifuge.LogEntry) { - arr := make([]interface{}, 0) - for k, v := range msg.Fields { - if v == nil { - v = "" - } else if v == "" { - v = "" - } - arr = append(arr, k, v) - } - - switch msg.Level { - case centrifuge.LogLevelDebug: - loggerCF.Debug(msg.Message, arr...) - case centrifuge.LogLevelError: - loggerCF.Error(msg.Message, arr...) - case centrifuge.LogLevelInfo: - loggerCF.Info(msg.Message, arr...) - default: - loggerCF.Debug(msg.Message, arr...) - } -} diff --git a/pkg/services/live/managedstream/cache.go b/pkg/services/live/managedstream/cache.go new file mode 100644 index 00000000000..9f84437efee --- /dev/null +++ b/pkg/services/live/managedstream/cache.go @@ -0,0 +1,17 @@ +package managedstream + +import ( + "encoding/json" + + "github.com/grafana/grafana-plugin-sdk-go/data" +) + +// FrameCache allows updating frame schema. Returns true is schema not changed. +type FrameCache interface { + // GetActiveChannels returns active managed stream channels with JSON schema. + GetActiveChannels(orgID int64) (map[string]json.RawMessage, error) + // GetFrame returns full JSON frame for a path. + GetFrame(orgID int64, channel string) (json.RawMessage, bool, error) + // Update updates frame cache and returns true if schema changed. + Update(orgID int64, channel string, frameJson data.FrameJSONCache) (bool, error) +} diff --git a/pkg/services/live/managedstream/cache_memory.go b/pkg/services/live/managedstream/cache_memory.go new file mode 100644 index 00000000000..520c4ee079d --- /dev/null +++ b/pkg/services/live/managedstream/cache_memory.go @@ -0,0 +1,54 @@ +package managedstream + +import ( + "encoding/json" + "sync" + + "github.com/grafana/grafana-plugin-sdk-go/data" +) + +// MemoryFrameCache ... +type MemoryFrameCache struct { + mu sync.RWMutex + frames map[int64]map[string]data.FrameJSONCache +} + +// NewMemoryFrameCache ... +func NewMemoryFrameCache() *MemoryFrameCache { + return &MemoryFrameCache{ + frames: map[int64]map[string]data.FrameJSONCache{}, + } +} + +func (c *MemoryFrameCache) GetActiveChannels(orgID int64) (map[string]json.RawMessage, error) { + c.mu.RLock() + defer c.mu.RUnlock() + frames, ok := c.frames[orgID] + if !ok { + return nil, nil + } + info := make(map[string]json.RawMessage, len(frames)) + for k, v := range frames { + info[k] = v.Bytes(data.IncludeSchemaOnly) + } + return info, nil +} + +func (c *MemoryFrameCache) GetFrame(orgID int64, channel string) (json.RawMessage, bool, error) { + c.mu.RLock() + defer c.mu.RUnlock() + cachedFrame, ok := c.frames[orgID][channel] + return cachedFrame.Bytes(data.IncludeAll), ok, nil +} + +func (c *MemoryFrameCache) Update(orgID int64, channel string, jsonFrame data.FrameJSONCache) (bool, error) { + c.mu.Lock() + defer c.mu.Unlock() + if _, ok := c.frames[orgID]; !ok { + c.frames[orgID] = map[string]data.FrameJSONCache{} + } + cachedJsonFrame, exists := c.frames[orgID][channel] + schemaUpdated := !exists || !cachedJsonFrame.SameSchema(&jsonFrame) + c.frames[orgID][channel] = jsonFrame + return schemaUpdated, nil +} diff --git a/pkg/services/live/managedstream/cache_memory_test.go b/pkg/services/live/managedstream/cache_memory_test.go new file mode 100644 index 00000000000..9f58969a8b4 --- /dev/null +++ b/pkg/services/live/managedstream/cache_memory_test.go @@ -0,0 +1,69 @@ +package managedstream + +import ( + "encoding/json" + "testing" + + "github.com/grafana/grafana-plugin-sdk-go/data" + + "github.com/stretchr/testify/require" +) + +func testFrameCache(t *testing.T, c FrameCache) { + // Create new frame and update cache. + frame := data.NewFrame("hello") + frameJsonCache, err := data.FrameToJSONCache(frame) + require.NoError(t, err) + + updated, err := c.Update(1, "test", frameJsonCache) + require.NoError(t, err) + require.True(t, updated) + + // Make sure channel is active. + channels, err := c.GetActiveChannels(1) + require.NoError(t, err) + schema, ok := channels["test"] + require.True(t, ok) + require.NotZero(t, schema) + + // Make sure the same frame does not update schema. + updated, err = c.Update(1, "test", frameJsonCache) + require.NoError(t, err) + require.False(t, updated) + + // Now construct new frame with updated schema. + newFrame := data.NewFrame("hello", data.NewField("new_field", nil, []int64{})) + frameJsonCache, err = data.FrameToJSONCache(newFrame) + require.NoError(t, err) + + // Make sure schema updated. + updated, err = c.Update(1, "test", frameJsonCache) + require.NoError(t, err) + require.True(t, updated) + + // Add the same with another orgID and make sure schema updated. + updated, err = c.Update(2, "test", frameJsonCache) + require.NoError(t, err) + require.True(t, updated) + + // Make sure that the last frame successfully saved in cache. + frameJSON, ok, err := c.GetFrame(1, "test") + require.NoError(t, err) + require.True(t, ok) + + var f data.Frame + err = json.Unmarshal(frameJSON, &f) + require.NoError(t, err) + require.Equal(t, "new_field", f.Fields[0].Name) + + // Make sure channel has updated schema. + channels, err = c.GetActiveChannels(1) + require.NoError(t, err) + require.NotEqual(t, string(channels["test"]), string(schema)) +} + +func TestMemoryFrameCache(t *testing.T) { + c := NewMemoryFrameCache() + require.NotNil(t, c) + testFrameCache(t, c) +} diff --git a/pkg/services/live/managedstream/cache_redis.go b/pkg/services/live/managedstream/cache_redis.go new file mode 100644 index 00000000000..e080a59f275 --- /dev/null +++ b/pkg/services/live/managedstream/cache_redis.go @@ -0,0 +1,111 @@ +package managedstream + +import ( + "encoding/json" + "errors" + "sync" + "time" + + "github.com/grafana/grafana/pkg/services/live/orgchannel" + + "github.com/grafana/grafana-plugin-sdk-go/data" + "gopkg.in/redis.v5" +) + +// RedisFrameCache ... +type RedisFrameCache struct { + mu sync.RWMutex + redisClient *redis.Client + frames map[int64]map[string]data.FrameJSONCache +} + +// NewRedisFrameCache ... +func NewRedisFrameCache(redisClient *redis.Client) *RedisFrameCache { + return &RedisFrameCache{ + frames: map[int64]map[string]data.FrameJSONCache{}, + redisClient: redisClient, + } +} + +func (c *RedisFrameCache) GetActiveChannels(orgID int64) (map[string]json.RawMessage, error) { + c.mu.RLock() + defer c.mu.RUnlock() + frames, ok := c.frames[orgID] + if !ok { + return nil, nil + } + info := make(map[string]json.RawMessage, len(frames)) + for k, v := range frames { + info[k] = v.Bytes(data.IncludeSchemaOnly) + } + return info, nil +} + +func (c *RedisFrameCache) GetFrame(orgID int64, channel string) (json.RawMessage, bool, error) { + key := getCacheKey(orgchannel.PrependOrgID(orgID, channel)) + cmd := c.redisClient.HGetAll(key) + result, err := cmd.Result() + if err != nil { + return nil, false, err + } + if len(result) == 0 { + return nil, false, nil + } + return json.RawMessage(result["frame"]), true, nil +} + +const ( + frameCacheTTL = 7 * 24 * time.Hour +) + +func (c *RedisFrameCache) Update(orgID int64, channel string, jsonFrame data.FrameJSONCache) (bool, error) { + c.mu.Lock() + if _, ok := c.frames[orgID]; !ok { + c.frames[orgID] = map[string]data.FrameJSONCache{} + } + c.frames[orgID][channel] = jsonFrame + c.mu.Unlock() + + stringSchema := string(jsonFrame.Bytes(data.IncludeSchemaOnly)) + + key := getCacheKey(orgchannel.PrependOrgID(orgID, channel)) + + pipe := c.redisClient.TxPipeline() + defer func() { _ = pipe.Close() }() + + pipe.HGetAll(key) + pipe.HMSet(key, map[string]string{ + "schema": stringSchema, + "frame": string(jsonFrame.Bytes(data.IncludeAll)), + }) + pipe.Expire(key, frameCacheTTL) + + replies, err := pipe.Exec() + if err != nil { + return false, err + } + if len(replies) == 0 { + return false, errors.New("no replies in response") + } + reply := replies[0] + + if reply.Err() != nil { + return false, err + } + + if mapReply, ok := reply.(*redis.StringStringMapCmd); ok { + result, err := mapReply.Result() + if err != nil { + return false, err + } + if len(result) == 0 { + return true, nil + } + return result["schema"] != stringSchema, nil + } + return true, nil +} + +func getCacheKey(channelID string) string { + return "gf_live.managed_stream." + channelID +} diff --git a/pkg/services/live/managedstream/cache_redis_test.go b/pkg/services/live/managedstream/cache_redis_test.go new file mode 100644 index 00000000000..05f029352e0 --- /dev/null +++ b/pkg/services/live/managedstream/cache_redis_test.go @@ -0,0 +1,19 @@ +// +build redis + +package managedstream + +import ( + "testing" + + "github.com/stretchr/testify/require" + "gopkg.in/redis.v5" +) + +func TestRedisCacheStorage(t *testing.T) { + redisClient := redis.NewClient(&redis.Options{ + Addr: "localhost:6379", + }) + c := NewRedisFrameCache(redisClient) + require.NotNil(t, c) + testFrameCache(t, c) +} diff --git a/pkg/services/live/managedstream/runner.go b/pkg/services/live/managedstream/runner.go index f98d08accd9..b98a63c36d7 100644 --- a/pkg/services/live/managedstream/runner.go +++ b/pkg/services/live/managedstream/runner.go @@ -3,6 +3,7 @@ package managedstream import ( "context" "encoding/json" + "fmt" "sync" "time" @@ -11,7 +12,6 @@ import ( "github.com/grafana/grafana-plugin-sdk-go/live" "github.com/grafana/grafana/pkg/infra/log" "github.com/grafana/grafana/pkg/models" - "github.com/grafana/grafana/pkg/util" ) var ( @@ -20,19 +20,53 @@ var ( // Runner keeps ManagedStream per streamID. type Runner struct { - mu sync.RWMutex - streams map[int64]map[string]*ManagedStream - publisher models.ChannelPublisher + mu sync.RWMutex + streams map[int64]map[string]*ManagedStream + publisher models.ChannelPublisher + frameCache FrameCache } // NewRunner creates new Runner. -func NewRunner(publisher models.ChannelPublisher) *Runner { +func NewRunner(publisher models.ChannelPublisher, frameCache FrameCache) *Runner { return &Runner{ - publisher: publisher, - streams: map[int64]map[string]*ManagedStream{}, + publisher: publisher, + streams: map[int64]map[string]*ManagedStream{}, + frameCache: frameCache, } } +func (r *Runner) GetManagedChannels(orgID int64) ([]*ManagedChannel, error) { + channels := make([]*ManagedChannel, 0) + for _, v := range r.Streams(orgID) { + streamChannels, err := v.ListChannels(orgID) + if err != nil { + return nil, err + } + channels = append(channels, streamChannels...) + } + + // Hardcode sample streams + frameJSON, err := data.FrameToJSON(data.NewFrame("testdata", + data.NewField("Time", nil, make([]time.Time, 0)), + data.NewField("Value", nil, make([]float64, 0)), + data.NewField("Min", nil, make([]float64, 0)), + data.NewField("Max", nil, make([]float64, 0)), + ), data.IncludeSchemaOnly) + if err == nil { + channels = append(channels, &ManagedChannel{ + Channel: "plugin/testdata/random-2s-stream", + Data: frameJSON, + }, &ManagedChannel{ + Channel: "plugin/testdata/random-flakey-stream", + Data: frameJSON, + }, &ManagedChannel{ + Channel: "plugin/testdata/random-20Hz-stream", + Data: frameJSON, + }) + } + return channels, nil +} + // Streams returns a map of active managed streams (per streamID). func (r *Runner) Streams(orgID int64) map[string]*ManagedStream { r.mu.RLock() @@ -58,7 +92,7 @@ func (r *Runner) GetOrCreateStream(orgID int64, streamID string) (*ManagedStream } s, ok := r.streams[orgID][streamID] if !ok { - s = NewManagedStream(streamID, r.publisher) + s = NewManagedStream(streamID, r.publisher, r.frameCache) r.streams[orgID][streamID] = s } return s, nil @@ -66,112 +100,90 @@ func (r *Runner) GetOrCreateStream(orgID int64, streamID string) (*ManagedStream // ManagedStream holds the state of a managed stream. type ManagedStream struct { - mu sync.RWMutex - id string - start time.Time - last map[int64]map[string]data.FrameJSONCache - publisher models.ChannelPublisher + id string + start time.Time + publisher models.ChannelPublisher + frameCache FrameCache } // NewManagedStream creates new ManagedStream. -func NewManagedStream(id string, publisher models.ChannelPublisher) *ManagedStream { +func NewManagedStream(id string, publisher models.ChannelPublisher, schemaUpdater FrameCache) *ManagedStream { return &ManagedStream{ - id: id, - start: time.Now(), - last: map[int64]map[string]data.FrameJSONCache{}, - publisher: publisher, + id: id, + start: time.Now(), + publisher: publisher, + frameCache: schemaUpdater, } } +// ManagedChannel represents a managed stream. +type ManagedChannel struct { + Channel string `json:"channel"` + Data json.RawMessage `json:"data"` +} + // ListChannels returns info for the UI about this stream. -func (s *ManagedStream) ListChannels(orgID int64, prefix string) []util.DynMap { - s.mu.RLock() - defer s.mu.RUnlock() - - if _, ok := s.last[orgID]; !ok { - return []util.DynMap{} +func (s *ManagedStream) ListChannels(orgID int64) ([]*ManagedChannel, error) { + paths, err := s.frameCache.GetActiveChannels(orgID) + if err != nil { + return []*ManagedChannel{}, fmt.Errorf("error getting active managed stream paths: %v", err) } - - info := make([]util.DynMap, 0, len(s.last[orgID])) - for k, v := range s.last[orgID] { - ch := util.DynMap{} - ch["channel"] = prefix + k - ch["data"] = json.RawMessage(v.Bytes(data.IncludeSchemaOnly)) - info = append(info, ch) + info := make([]*ManagedChannel, 0, len(paths)) + for k, v := range paths { + managedChannel := &ManagedChannel{ + Channel: k, + Data: v, + } + info = append(info, managedChannel) } - return info + return info, nil } // Push sends frame to the stream and saves it for later retrieval by subscribers. // unstableSchema flag can be set to disable schema caching for a path. func (s *ManagedStream) Push(orgID int64, path string, frame *data.Frame) error { - // Keep schema + data for last packet. - msg, err := data.FrameToJSONCache(frame) + jsonFrameCache, err := data.FrameToJSONCache(frame) if err != nil { - logger.Error("Error marshaling frame with data", "error", err) return err } - s.mu.Lock() - if _, ok := s.last[orgID]; !ok { - s.last[orgID] = map[string]data.FrameJSONCache{} - } - last, exists := s.last[orgID][path] - s.last[orgID][path] = msg - s.mu.Unlock() - - include := data.IncludeAll - if exists && last.SameSchema(&msg) { - // When the schema has not changed, just send the data. - include = data.IncludeDataOnly - } - frameJSON := msg.Bytes(include) - // The channel this will be posted into. channel := live.Channel{Scope: live.ScopeStream, Namespace: s.id, Path: path}.String() + + isUpdated, err := s.frameCache.Update(orgID, channel, jsonFrameCache) + if err != nil { + logger.Error("Error updating managed stream schema", "error", err) + return err + } + + // When the schema has not changed, just send the data. + include := data.IncludeDataOnly + if isUpdated { + // When the schema has been changed, send all. + include = data.IncludeAll + } + frameJSON := jsonFrameCache.Bytes(include) + logger.Debug("Publish data to channel", "channel", channel, "dataLength", len(frameJSON)) return s.publisher(orgID, channel, frameJSON) } -// getLastPacket retrieves last packet channel. -func (s *ManagedStream) getLastPacket(orgId int64, path string) (json.RawMessage, bool) { - s.mu.RLock() - defer s.mu.RUnlock() - _, ok := s.last[orgId] - if !ok { - return nil, false - } - msg, ok := s.last[orgId][path] - if ok { - return msg.Bytes(data.IncludeAll), ok - } - return nil, ok -} - func (s *ManagedStream) GetHandlerForPath(_ string) (models.ChannelHandler, error) { return s, nil } func (s *ManagedStream) OnSubscribe(_ context.Context, u *models.SignedInUser, e models.SubscribeEvent) (models.SubscribeReply, backend.SubscribeStreamStatus, error) { reply := models.SubscribeReply{} - packet, ok := s.getLastPacket(u.OrgId, e.Path) + frameJSON, ok, err := s.frameCache.GetFrame(u.OrgId, e.Channel) + if err != nil { + return reply, 0, err + } if ok { - reply.Data = packet + reply.Data = frameJSON } return reply, backend.SubscribeStreamStatusOK, nil } -func (s *ManagedStream) OnPublish(_ context.Context, u *models.SignedInUser, evt models.PublishEvent) (models.PublishReply, backend.PublishStreamStatus, error) { - var frame data.Frame - err := json.Unmarshal(evt.Data, &frame) - if err != nil { - // Stream scope only deals with data frames. - return models.PublishReply{}, 0, err - } - err = s.Push(u.OrgId, evt.Path, &frame) - if err != nil { - // Stream scope only deals with data frames. - return models.PublishReply{}, 0, err - } - return models.PublishReply{}, backend.PublishStreamStatusOK, nil +func (s *ManagedStream) OnPublish(_ context.Context, _ *models.SignedInUser, _ models.PublishEvent) (models.PublishReply, backend.PublishStreamStatus, error) { + return models.PublishReply{}, backend.PublishStreamStatusPermissionDenied, nil } diff --git a/pkg/services/live/managedstream/runner_test.go b/pkg/services/live/managedstream/runner_test.go index 6982ae29f76..1eca4fe7004 100644 --- a/pkg/services/live/managedstream/runner_test.go +++ b/pkg/services/live/managedstream/runner_test.go @@ -3,7 +3,6 @@ package managedstream import ( "testing" - "github.com/grafana/grafana-plugin-sdk-go/data" "github.com/stretchr/testify/require" ) @@ -19,21 +18,6 @@ func (p *testPublisher) publish(orgID int64, _ string, _ []byte) error { func TestNewManagedStream(t *testing.T) { publisher := &testPublisher{orgID: 1, t: t} - c := NewManagedStream("a", publisher.publish) + c := NewManagedStream("a", publisher.publish, NewMemoryFrameCache()) require.NotNil(t, c) } - -func TestManagedStream_GetLastPacket(t *testing.T) { - var orgID int64 = 1 - publisher := &testPublisher{orgID: orgID, t: t} - c := NewManagedStream("a", publisher.publish) - _, ok := c.getLastPacket(orgID, "test") - require.False(t, ok) - err := c.Push(orgID, "test", data.NewFrame("hello")) - require.NoError(t, err) - - s, ok := c.getLastPacket(orgID, "test") - require.NoError(t, err) - require.True(t, ok) - require.JSONEq(t, `{"schema":{"name":"hello","fields":[]},"data":{"values":[]}}`, string(s)) -} diff --git a/pkg/services/live/plugin_helpers.go b/pkg/services/live/plugin_helpers.go deleted file mode 100644 index d90b79dcaf6..00000000000 --- a/pkg/services/live/plugin_helpers.go +++ /dev/null @@ -1,57 +0,0 @@ -package live - -import ( - "fmt" - - "github.com/grafana/grafana/pkg/models" - - "github.com/centrifugal/centrifuge" - "github.com/grafana/grafana-plugin-sdk-go/backend" - "github.com/grafana/grafana/pkg/plugins/plugincontext" -) - -type pluginChannelSender struct { - node *centrifuge.Node -} - -func newPluginChannelSender(node *centrifuge.Node) *pluginChannelSender { - return &pluginChannelSender{node: node} -} - -func (p *pluginChannelSender) Send(channel string, data []byte) error { - _, err := p.node.Publish(channel, data) - if err != nil { - return fmt.Errorf("error publishing %s: %w", string(data), err) - } - return nil -} - -type pluginPresenceGetter struct { - node *centrifuge.Node -} - -func newPluginPresenceGetter(node *centrifuge.Node) *pluginPresenceGetter { - return &pluginPresenceGetter{node: node} -} - -func (p *pluginPresenceGetter) GetNumSubscribers(channel string) (int, error) { - res, err := p.node.PresenceStats(channel) - if err != nil { - return 0, err - } - return res.NumClients, nil -} - -type pluginContextGetter struct { - PluginContextProvider *plugincontext.Provider -} - -func newPluginContextGetter(pluginContextProvider *plugincontext.Provider) *pluginContextGetter { - return &pluginContextGetter{ - PluginContextProvider: pluginContextProvider, - } -} - -func (g *pluginContextGetter) GetPluginContext(user *models.SignedInUser, pluginID string, datasourceUID string, skipCache bool) (backend.PluginContext, bool, error) { - return g.PluginContextProvider.Get(pluginID, datasourceUID, user, skipCache) -} diff --git a/pkg/services/live/runstream/manager.go b/pkg/services/live/runstream/manager.go index 74179d8c6bf..e8c1949a0b5 100644 --- a/pkg/services/live/runstream/manager.go +++ b/pkg/services/live/runstream/manager.go @@ -18,18 +18,19 @@ var ( logger = log.New("live.runstream") ) -//go:generate mockgen -destination=mock.go -package=runstream github.com/grafana/grafana/pkg/services/live/runstream ChannelSender,PresenceGetter,StreamRunner,PluginContextGetter +//go:generate mockgen -destination=mock.go -package=runstream github.com/grafana/grafana/pkg/services/live/runstream ChannelLocalPublisher,NumLocalSubscribersGetter,StreamRunner,PluginContextGetter -type ChannelSender interface { - Send(channel string, data []byte) error +type ChannelLocalPublisher interface { + PublishLocal(channel string, data []byte) error } type PluginContextGetter interface { GetPluginContext(user *models.SignedInUser, pluginID string, datasourceUID string, skipCache bool) (backend.PluginContext, bool, error) } -type PresenceGetter interface { - GetNumSubscribers(channel string) (int, error) +type NumLocalSubscribersGetter interface { + // GetNumSubscribers returns number of channel subscribers throughout all nodes. + GetNumLocalSubscribers(channel string) (int, error) } type StreamRunner interface { @@ -37,12 +38,12 @@ type StreamRunner interface { } type packetSender struct { - channelSender ChannelSender - channel string + channelLocalPublisher ChannelLocalPublisher + channel string } func (p *packetSender) Send(packet *backend.StreamPacket) error { - return p.channelSender.Send(p.channel, packet.Data) + return p.channelLocalPublisher.PublishLocal(p.channel, packet.Data) } // Manager manages streams from Grafana to plugins (i.e. RunStream method). @@ -51,9 +52,9 @@ type Manager struct { baseCtx context.Context streams map[string]streamContext datasourceStreams map[string]map[string]struct{} - presenceGetter PresenceGetter + presenceGetter NumLocalSubscribersGetter pluginContextGetter PluginContextGetter - channelSender ChannelSender + channelSender ChannelLocalPublisher registerCh chan submitRequest closedCh chan struct{} checkInterval time.Duration @@ -79,7 +80,7 @@ const ( ) // NewManager creates new Manager. -func NewManager(channelSender ChannelSender, presenceGetter PresenceGetter, pluginContextGetter PluginContextGetter, opts ...ManagerOption) *Manager { +func NewManager(channelSender ChannelLocalPublisher, presenceGetter NumLocalSubscribersGetter, pluginContextGetter PluginContextGetter, opts ...ManagerOption) *Manager { sm := &Manager{ streams: make(map[string]streamContext), datasourceStreams: map[string]map[string]struct{}{}, @@ -201,9 +202,9 @@ func (s *Manager) watchStream(ctx context.Context, cancelFn func(), sr streamReq } } case <-presenceTicker.C: - numSubscribers, err := s.presenceGetter.GetNumSubscribers(sr.Channel) + numSubscribers, err := s.presenceGetter.GetNumLocalSubscribers(sr.Channel) if err != nil { - logger.Error("Error checking num subscribers", "channel", sr.Channel, "path", sr.Path) + logger.Error("Error checking num subscribers", "channel", sr.Channel, "path", sr.Path, "error", err) continue } if numSubscribers > 0 { @@ -301,7 +302,7 @@ func (s *Manager) runStream(ctx context.Context, cancelFn func(), sr streamReque PluginContext: pluginCtx, Path: sr.Path, }, - backend.NewStreamSender(&packetSender{channelSender: s.channelSender, channel: sr.Channel}), + backend.NewStreamSender(&packetSender{channelLocalPublisher: s.channelSender, channel: sr.Channel}), ) if err != nil { if errors.Is(ctx.Err(), context.Canceled) { diff --git a/pkg/services/live/runstream/manager_test.go b/pkg/services/live/runstream/manager_test.go index fd1881f45ac..2cd329bc6a1 100644 --- a/pkg/services/live/runstream/manager_test.go +++ b/pkg/services/live/runstream/manager_test.go @@ -27,11 +27,11 @@ func TestStreamManager_Run(t *testing.T) { mockCtrl := gomock.NewController(t) defer mockCtrl.Finish() - mockChannelSender := NewMockChannelSender(mockCtrl) - mockPresenceGetter := NewMockPresenceGetter(mockCtrl) + mockChannelPublisher := NewMockChannelLocalPublisher(mockCtrl) + mockNumSubscribersGetter := NewMockNumLocalSubscribersGetter(mockCtrl) mockContextGetter := NewMockPluginContextGetter(mockCtrl) - manager := NewManager(mockChannelSender, mockPresenceGetter, mockContextGetter) + manager := NewManager(mockChannelPublisher, mockNumSubscribersGetter, mockContextGetter) ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -48,11 +48,11 @@ func TestStreamManager_SubmitStream_Send(t *testing.T) { mockCtrl := gomock.NewController(t) defer mockCtrl.Finish() - mockPacketSender := NewMockChannelSender(mockCtrl) - mockPresenceGetter := NewMockPresenceGetter(mockCtrl) + mockPacketSender := NewMockChannelLocalPublisher(mockCtrl) + mockNumSubscribersGetter := NewMockNumLocalSubscribersGetter(mockCtrl) mockContextGetter := NewMockPluginContextGetter(mockCtrl) - manager := NewManager(mockPacketSender, mockPresenceGetter, mockContextGetter) + manager := NewManager(mockPacketSender, mockNumSubscribersGetter, mockContextGetter) ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -79,7 +79,7 @@ func TestStreamManager_SubmitStream_Send(t *testing.T) { return testPluginContext, true, nil }).Times(0) - mockPacketSender.EXPECT().Send("1/test", gomock.Any()).Times(1) + mockPacketSender.EXPECT().PublishLocal("1/test", gomock.Any()).Times(1) mockStreamRunner := NewMockStreamRunner(mockCtrl) mockStreamRunner.EXPECT().RunStream( @@ -113,11 +113,11 @@ func TestStreamManager_SubmitStream_DifferentOrgID(t *testing.T) { mockCtrl := gomock.NewController(t) defer mockCtrl.Finish() - mockPacketSender := NewMockChannelSender(mockCtrl) - mockPresenceGetter := NewMockPresenceGetter(mockCtrl) + mockPacketSender := NewMockChannelLocalPublisher(mockCtrl) + mockNumSubscribersGetter := NewMockNumLocalSubscribersGetter(mockCtrl) mockContextGetter := NewMockPluginContextGetter(mockCtrl) - manager := NewManager(mockPacketSender, mockPresenceGetter, mockContextGetter) + manager := NewManager(mockPacketSender, mockNumSubscribersGetter, mockContextGetter) ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -130,8 +130,8 @@ func TestStreamManager_SubmitStream_DifferentOrgID(t *testing.T) { doneCh1 := make(chan struct{}) doneCh2 := make(chan struct{}) - mockPacketSender.EXPECT().Send("1/test", gomock.Any()).Times(1) - mockPacketSender.EXPECT().Send("2/test", gomock.Any()).Times(1) + mockPacketSender.EXPECT().PublishLocal("1/test", gomock.Any()).Times(1) + mockPacketSender.EXPECT().PublishLocal("2/test", gomock.Any()).Times(1) mockContextGetter.EXPECT().GetPluginContext(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn(func(user *models.SignedInUser, pluginID string, datasourceUID string, skipCache bool) (backend.PluginContext, bool, error) { return backend.PluginContext{}, true, nil @@ -184,14 +184,14 @@ func TestStreamManager_SubmitStream_CloseNoSubscribers(t *testing.T) { mockCtrl := gomock.NewController(t) defer mockCtrl.Finish() - mockPacketSender := NewMockChannelSender(mockCtrl) - mockPresenceGetter := NewMockPresenceGetter(mockCtrl) + mockPacketSender := NewMockChannelLocalPublisher(mockCtrl) + mockNumSubscribersGetter := NewMockNumLocalSubscribersGetter(mockCtrl) mockContextGetter := NewMockPluginContextGetter(mockCtrl) // Create manager with very fast num subscribers checks. manager := NewManager( mockPacketSender, - mockPresenceGetter, + mockNumSubscribersGetter, mockContextGetter, WithCheckConfig(10*time.Millisecond, 3), ) @@ -209,7 +209,7 @@ func TestStreamManager_SubmitStream_CloseNoSubscribers(t *testing.T) { return backend.PluginContext{}, true, nil }).Times(0) - mockPresenceGetter.EXPECT().GetNumSubscribers("1/test").Return(0, nil).Times(3) + mockNumSubscribersGetter.EXPECT().GetNumLocalSubscribers("1/test").Return(0, nil).Times(3) mockStreamRunner := NewMockStreamRunner(mockCtrl) mockStreamRunner.EXPECT().RunStream(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn(func(ctx context.Context, req *backend.RunStreamRequest, sender *backend.StreamSender) error { @@ -231,11 +231,11 @@ func TestStreamManager_SubmitStream_ErrorRestartsRunStream(t *testing.T) { mockCtrl := gomock.NewController(t) defer mockCtrl.Finish() - mockPacketSender := NewMockChannelSender(mockCtrl) - mockPresenceGetter := NewMockPresenceGetter(mockCtrl) + mockPacketSender := NewMockChannelLocalPublisher(mockCtrl) + mockNumSubscribersGetter := NewMockNumLocalSubscribersGetter(mockCtrl) mockContextGetter := NewMockPluginContextGetter(mockCtrl) - manager := NewManager(mockPacketSender, mockPresenceGetter, mockContextGetter) + manager := NewManager(mockPacketSender, mockNumSubscribersGetter, mockContextGetter) ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -284,11 +284,11 @@ func TestStreamManager_SubmitStream_NilErrorStopsRunStream(t *testing.T) { mockCtrl := gomock.NewController(t) defer mockCtrl.Finish() - mockPacketSender := NewMockChannelSender(mockCtrl) - mockPresenceGetter := NewMockPresenceGetter(mockCtrl) + mockPacketSender := NewMockChannelLocalPublisher(mockCtrl) + mockNumSubscribersGetter := NewMockNumLocalSubscribersGetter(mockCtrl) mockContextGetter := NewMockPluginContextGetter(mockCtrl) - manager := NewManager(mockPacketSender, mockPresenceGetter, mockContextGetter) + manager := NewManager(mockPacketSender, mockNumSubscribersGetter, mockContextGetter) ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -317,11 +317,11 @@ func TestStreamManager_HandleDatasourceUpdate(t *testing.T) { mockCtrl := gomock.NewController(t) defer mockCtrl.Finish() - mockPacketSender := NewMockChannelSender(mockCtrl) - mockPresenceGetter := NewMockPresenceGetter(mockCtrl) + mockPacketSender := NewMockChannelLocalPublisher(mockCtrl) + mockNumSubscribersGetter := NewMockNumLocalSubscribersGetter(mockCtrl) mockContextGetter := NewMockPluginContextGetter(mockCtrl) - manager := NewManager(mockPacketSender, mockPresenceGetter, mockContextGetter) + manager := NewManager(mockPacketSender, mockNumSubscribersGetter, mockContextGetter) ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -383,11 +383,11 @@ func TestStreamManager_HandleDatasourceDelete(t *testing.T) { mockCtrl := gomock.NewController(t) defer mockCtrl.Finish() - mockPacketSender := NewMockChannelSender(mockCtrl) - mockPresenceGetter := NewMockPresenceGetter(mockCtrl) + mockPacketSender := NewMockChannelLocalPublisher(mockCtrl) + mockNumSubscribersGetter := NewMockNumLocalSubscribersGetter(mockCtrl) mockContextGetter := NewMockPluginContextGetter(mockCtrl) - manager := NewManager(mockPacketSender, mockPresenceGetter, mockContextGetter) + manager := NewManager(mockPacketSender, mockNumSubscribersGetter, mockContextGetter) ctx, cancel := context.WithCancel(context.Background()) defer cancel() diff --git a/pkg/services/live/runstream/mock.go b/pkg/services/live/runstream/mock.go index e8cbfa40eb5..43a7af93021 100644 --- a/pkg/services/live/runstream/mock.go +++ b/pkg/services/live/runstream/mock.go @@ -1,5 +1,5 @@ // Code generated by MockGen. DO NOT EDIT. -// Source: github.com/grafana/grafana/pkg/services/live/runstream (interfaces: ChannelSender,PresenceGetter,StreamRunner,PluginContextGetter) +// Source: github.com/grafana/grafana/pkg/services/live/runstream (interfaces: ChannelLocalPublisher,NumLocalSubscribersGetter,StreamRunner,PluginContextGetter) // Package runstream is a generated GoMock package. package runstream @@ -13,79 +13,79 @@ import ( models "github.com/grafana/grafana/pkg/models" ) -// MockChannelSender is a mock of ChannelSender interface. -type MockChannelSender struct { +// MockChannelLocalPublisher is a mock of ChannelLocalPublisher interface. +type MockChannelLocalPublisher struct { ctrl *gomock.Controller - recorder *MockChannelSenderMockRecorder + recorder *MockChannelLocalPublisherMockRecorder } -// MockChannelSenderMockRecorder is the mock recorder for MockChannelSender. -type MockChannelSenderMockRecorder struct { - mock *MockChannelSender +// MockChannelLocalPublisherMockRecorder is the mock recorder for MockChannelLocalPublisher. +type MockChannelLocalPublisherMockRecorder struct { + mock *MockChannelLocalPublisher } -// NewMockChannelSender creates a new mock instance. -func NewMockChannelSender(ctrl *gomock.Controller) *MockChannelSender { - mock := &MockChannelSender{ctrl: ctrl} - mock.recorder = &MockChannelSenderMockRecorder{mock} +// NewMockChannelLocalPublisher creates a new mock instance. +func NewMockChannelLocalPublisher(ctrl *gomock.Controller) *MockChannelLocalPublisher { + mock := &MockChannelLocalPublisher{ctrl: ctrl} + mock.recorder = &MockChannelLocalPublisherMockRecorder{mock} return mock } // EXPECT returns an object that allows the caller to indicate expected use. -func (m *MockChannelSender) EXPECT() *MockChannelSenderMockRecorder { +func (m *MockChannelLocalPublisher) EXPECT() *MockChannelLocalPublisherMockRecorder { return m.recorder } -// Send mocks base method. -func (m *MockChannelSender) Send(arg0 string, arg1 []byte) error { +// PublishLocal mocks base method. +func (m *MockChannelLocalPublisher) PublishLocal(arg0 string, arg1 []byte) error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Send", arg0, arg1) + ret := m.ctrl.Call(m, "PublishLocal", arg0, arg1) ret0, _ := ret[0].(error) return ret0 } -// Send indicates an expected call of Send. -func (mr *MockChannelSenderMockRecorder) Send(arg0, arg1 interface{}) *gomock.Call { +// PublishLocal indicates an expected call of PublishLocal. +func (mr *MockChannelLocalPublisherMockRecorder) PublishLocal(arg0, arg1 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Send", reflect.TypeOf((*MockChannelSender)(nil).Send), arg0, arg1) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PublishLocal", reflect.TypeOf((*MockChannelLocalPublisher)(nil).PublishLocal), arg0, arg1) } -// MockPresenceGetter is a mock of PresenceGetter interface. -type MockPresenceGetter struct { +// MockNumLocalSubscribersGetter is a mock of NumLocalSubscribersGetter interface. +type MockNumLocalSubscribersGetter struct { ctrl *gomock.Controller - recorder *MockPresenceGetterMockRecorder + recorder *MockNumLocalSubscribersGetterMockRecorder } -// MockPresenceGetterMockRecorder is the mock recorder for MockPresenceGetter. -type MockPresenceGetterMockRecorder struct { - mock *MockPresenceGetter +// MockNumLocalSubscribersGetterMockRecorder is the mock recorder for MockNumLocalSubscribersGetter. +type MockNumLocalSubscribersGetterMockRecorder struct { + mock *MockNumLocalSubscribersGetter } -// NewMockPresenceGetter creates a new mock instance. -func NewMockPresenceGetter(ctrl *gomock.Controller) *MockPresenceGetter { - mock := &MockPresenceGetter{ctrl: ctrl} - mock.recorder = &MockPresenceGetterMockRecorder{mock} +// NewMockNumLocalSubscribersGetter creates a new mock instance. +func NewMockNumLocalSubscribersGetter(ctrl *gomock.Controller) *MockNumLocalSubscribersGetter { + mock := &MockNumLocalSubscribersGetter{ctrl: ctrl} + mock.recorder = &MockNumLocalSubscribersGetterMockRecorder{mock} return mock } // EXPECT returns an object that allows the caller to indicate expected use. -func (m *MockPresenceGetter) EXPECT() *MockPresenceGetterMockRecorder { +func (m *MockNumLocalSubscribersGetter) EXPECT() *MockNumLocalSubscribersGetterMockRecorder { return m.recorder } -// GetNumSubscribers mocks base method. -func (m *MockPresenceGetter) GetNumSubscribers(arg0 string) (int, error) { +// GetNumLocalSubscribers mocks base method. +func (m *MockNumLocalSubscribersGetter) GetNumLocalSubscribers(arg0 string) (int, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "GetNumSubscribers", arg0) + ret := m.ctrl.Call(m, "GetNumLocalSubscribers", arg0) ret0, _ := ret[0].(int) ret1, _ := ret[1].(error) return ret0, ret1 } -// GetNumSubscribers indicates an expected call of GetNumSubscribers. -func (mr *MockPresenceGetterMockRecorder) GetNumSubscribers(arg0 interface{}) *gomock.Call { +// GetNumLocalSubscribers indicates an expected call of GetNumLocalSubscribers. +func (mr *MockNumLocalSubscribersGetterMockRecorder) GetNumLocalSubscribers(arg0 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetNumSubscribers", reflect.TypeOf((*MockPresenceGetter)(nil).GetNumSubscribers), arg0) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetNumLocalSubscribers", reflect.TypeOf((*MockNumLocalSubscribersGetter)(nil).GetNumLocalSubscribers), arg0) } // MockStreamRunner is a mock of StreamRunner interface. diff --git a/pkg/services/live/survey/survey.go b/pkg/services/live/survey/survey.go new file mode 100644 index 00000000000..e4743aab623 --- /dev/null +++ b/pkg/services/live/survey/survey.go @@ -0,0 +1,117 @@ +package survey + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "time" + + "github.com/centrifugal/centrifuge" + "github.com/grafana/grafana/pkg/services/live/managedstream" +) + +type Caller struct { + managedStreamRunner *managedstream.Runner + node *centrifuge.Node +} + +const ( + managedStreamsCall = "managed_streams" +) + +func NewCaller(managedStreamRunner *managedstream.Runner, node *centrifuge.Node) *Caller { + return &Caller{managedStreamRunner: managedStreamRunner, node: node} +} + +func (c *Caller) SetupHandlers() error { + c.node.OnSurvey(c.handleSurvey) + return nil +} + +type NodeManagedChannelsRequest struct { + OrgID int64 `json:"orgId"` +} + +type NodeManagedChannelsResponse struct { + Channels []*managedstream.ManagedChannel `json:"channels"` +} + +func (c *Caller) handleSurvey(e centrifuge.SurveyEvent, cb centrifuge.SurveyCallback) { + var ( + resp interface{} + err error + ) + switch e.Op { + case managedStreamsCall: + resp, err = c.handleManagedStreams(e.Data) + default: + err = errors.New("method not found") + } + if err != nil { + cb(centrifuge.SurveyReply{Code: 1}) + return + } + jsonData, err := json.Marshal(resp) + if err != nil { + cb(centrifuge.SurveyReply{Code: 1}) + return + } + cb(centrifuge.SurveyReply{ + Code: 0, + Data: jsonData, + }) +} + +func (c *Caller) handleManagedStreams(data []byte) (interface{}, error) { + var req NodeManagedChannelsRequest + err := json.Unmarshal(data, &req) + if err != nil { + return nil, err + } + channels, err := c.managedStreamRunner.GetManagedChannels(req.OrgID) + if err != nil { + return nil, err + } + return NodeManagedChannelsResponse{ + Channels: channels, + }, nil +} + +func (c *Caller) CallManagedStreams(orgID int64) ([]*managedstream.ManagedChannel, error) { + req := NodeManagedChannelsRequest{OrgID: orgID} + jsonData, err := json.Marshal(req) + if err != nil { + return nil, err + } + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + + resp, err := c.node.Survey(ctx, managedStreamsCall, jsonData) + if err != nil { + return nil, err + } + + channels := make([]*managedstream.ManagedChannel, 0) + duplicatesCheck := map[string]struct{}{} + + for _, result := range resp { + if result.Code != 0 { + return nil, fmt.Errorf("unexpected survey code: %d", result.Code) + } + var res NodeManagedChannelsResponse + err := json.Unmarshal(result.Data, &res) + if err != nil { + return nil, err + } + for _, ch := range res.Channels { + if _, ok := duplicatesCheck[ch.Channel]; ok { + continue + } + channels = append(channels, ch) + duplicatesCheck[ch.Channel] = struct{}{} + } + } + + return channels, nil +} diff --git a/pkg/setting/setting.go b/pkg/setting/setting.go index f87bcfdff7f..2232d88b970 100644 --- a/pkg/setting/setting.go +++ b/pkg/setting/setting.go @@ -386,6 +386,11 @@ type Cfg struct { // Grafana Live ws endpoint (per Grafana server instance). 0 disables // Live, -1 means unlimited connections. LiveMaxConnections int + // LiveHAEngine is a type of engine to use to achieve HA with Grafana Live. + // Zero value means in-memory single node setup. + LiveHAEngine string + // LiveHAEngineAddress is a connection address for Live HA engine. + LiveHAEngineAddress string // Grafana.com URL GrafanaComURL string @@ -1447,5 +1452,12 @@ func (cfg *Cfg) readLiveSettings(iniFile *ini.File) error { if cfg.LiveMaxConnections < -1 { return fmt.Errorf("unexpected value %d for [live] max_connections", cfg.LiveMaxConnections) } + cfg.LiveHAEngine = section.Key("ha_engine").MustString("") + switch cfg.LiveHAEngine { + case "", "redis": + default: + return fmt.Errorf("unsupported live HA engine type: %s", cfg.LiveHAEngine) + } + cfg.LiveHAEngineAddress = section.Key("ha_engine_address").MustString("127.0.0.1:6379") return nil }