Live: update Streaming plugin definitions, put frame schema in subscribe result data (#32561)

Co-authored-by: Torkel Ödegaard <torkel@grafana.org>
Co-authored-by: Ryan McKinley <ryantxu@gmail.com>
This commit is contained in:
Alexander Emelin
2021-04-02 19:41:45 +03:00
committed by GitHub
parent 7fcb6ecb91
commit 93292f6eef
21 changed files with 408 additions and 205 deletions
+6 -4
View File
@@ -4,6 +4,8 @@ import (
"context"
"time"
"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana/pkg/models"
)
@@ -17,18 +19,18 @@ func (b *BroadcastRunner) GetHandlerForPath(path string) (models.ChannelHandler,
}
// OnSubscribe will let anyone connect to the path
func (b *BroadcastRunner) OnSubscribe(ctx context.Context, _ *models.SignedInUser, e models.SubscribeEvent) (models.SubscribeReply, bool, error) {
func (b *BroadcastRunner) OnSubscribe(ctx context.Context, _ *models.SignedInUser, e models.SubscribeEvent) (models.SubscribeReply, backend.SubscribeStreamStatus, error) {
return models.SubscribeReply{
Presence: true,
JoinLeave: true,
Recover: true, // loads the saved value from history
}, true, nil
}, backend.SubscribeStreamStatusOK, nil
}
// OnPublish is called when a client wants to broadcast on the websocket
func (b *BroadcastRunner) OnPublish(ctx context.Context, _ *models.SignedInUser, e models.PublishEvent) (models.PublishReply, bool, error) {
func (b *BroadcastRunner) OnPublish(ctx context.Context, _ *models.SignedInUser, e models.PublishEvent) (models.PublishReply, backend.PublishStreamStatus, error) {
return models.PublishReply{
HistorySize: 1, // The last message is saved for 10 min.
HistoryTTL: 10 * time.Minute,
}, true, nil
}, backend.PublishStreamStatusOK, nil
}
+7 -5
View File
@@ -4,6 +4,8 @@ import (
"context"
"encoding/json"
"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana/pkg/models"
)
@@ -26,16 +28,16 @@ func (h *DashboardHandler) GetHandlerForPath(path string) (models.ChannelHandler
}
// OnSubscribe for now allows anyone to subscribe to any dashboard
func (h *DashboardHandler) OnSubscribe(ctx context.Context, _ *models.SignedInUser, e models.SubscribeEvent) (models.SubscribeReply, bool, error) {
func (h *DashboardHandler) OnSubscribe(ctx context.Context, _ *models.SignedInUser, e models.SubscribeEvent) (models.SubscribeReply, backend.SubscribeStreamStatus, error) {
return models.SubscribeReply{
Presence: true,
JoinLeave: true,
}, true, nil
}, backend.SubscribeStreamStatusOK, nil
}
// OnPublish is called when someone begins to edit a dashoard
func (h *DashboardHandler) OnPublish(ctx context.Context, _ *models.SignedInUser, e models.PublishEvent) (models.PublishReply, bool, error) {
return models.PublishReply{}, true, nil
// OnPublish is called when someone begins to edit a dashboard
func (h *DashboardHandler) OnPublish(ctx context.Context, _ *models.SignedInUser, e models.PublishEvent) (models.PublishReply, backend.PublishStreamStatus, error) {
return models.PublishReply{}, backend.PublishStreamStatusOK, nil
}
// DashboardSaved should broadcast to the appropriate stream
+6 -4
View File
@@ -3,6 +3,8 @@ package features
import (
"context"
"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/models"
)
@@ -23,12 +25,12 @@ func (m *MeasurementsRunner) GetHandlerForPath(path string) (models.ChannelHandl
}
// OnSubscribe will let anyone connect to the path
func (m *MeasurementsRunner) OnSubscribe(ctx context.Context, _ *models.SignedInUser, e models.SubscribeEvent) (models.SubscribeReply, bool, error) {
return models.SubscribeReply{}, true, nil
func (m *MeasurementsRunner) OnSubscribe(ctx context.Context, _ *models.SignedInUser, e models.SubscribeEvent) (models.SubscribeReply, backend.SubscribeStreamStatus, error) {
return models.SubscribeReply{}, backend.SubscribeStreamStatusOK, nil
}
// OnPublish is called when a client wants to broadcast on the websocket
// Currently this sends measurements over websocket -- should be replaced with the HTTP interface
func (m *MeasurementsRunner) OnPublish(ctx context.Context, _ *models.SignedInUser, e models.PublishEvent) (models.PublishReply, bool, error) {
return models.PublishReply{}, true, nil
func (m *MeasurementsRunner) OnPublish(ctx context.Context, _ *models.SignedInUser, e models.PublishEvent) (models.PublishReply, backend.PublishStreamStatus, error) {
return models.PublishReply{}, backend.PublishStreamStatusOK, nil
}
+20 -19
View File
@@ -1,5 +1,5 @@
// Code generated by MockGen. DO NOT EDIT.
// Source: github.com/grafana/grafana/pkg/services/live/features (interfaces: ChannelPublisher,PresenceGetter,PluginContextGetter,StreamRunner)
// Source: github.com/grafana/grafana/pkg/services/live/features (interfaces: StreamPacketSender,PresenceGetter,PluginContextGetter,StreamRunner)
// Package features is a generated GoMock package.
package features
@@ -10,43 +10,44 @@ import (
gomock "github.com/golang/mock/gomock"
backend "github.com/grafana/grafana-plugin-sdk-go/backend"
models "github.com/grafana/grafana/pkg/models"
)
// MockChannelPublisher is a mock of ChannelPublisher interface.
type MockChannelPublisher struct {
// MockStreamPacketSender is a mock of StreamPacketSender interface.
type MockStreamPacketSender struct {
ctrl *gomock.Controller
recorder *MockChannelPublisherMockRecorder
recorder *MockStreamPacketSenderMockRecorder
}
// MockChannelPublisherMockRecorder is the mock recorder for MockChannelPublisher.
type MockChannelPublisherMockRecorder struct {
mock *MockChannelPublisher
// MockStreamPacketSenderMockRecorder is the mock recorder for MockStreamPacketSender.
type MockStreamPacketSenderMockRecorder struct {
mock *MockStreamPacketSender
}
// NewMockChannelPublisher creates a new mock instance.
func NewMockChannelPublisher(ctrl *gomock.Controller) *MockChannelPublisher {
mock := &MockChannelPublisher{ctrl: ctrl}
mock.recorder = &MockChannelPublisherMockRecorder{mock}
// NewMockStreamPacketSender creates a new mock instance.
func NewMockStreamPacketSender(ctrl *gomock.Controller) *MockStreamPacketSender {
mock := &MockStreamPacketSender{ctrl: ctrl}
mock.recorder = &MockStreamPacketSenderMockRecorder{mock}
return mock
}
// EXPECT returns an object that allows the caller to indicate expected use.
func (m *MockChannelPublisher) EXPECT() *MockChannelPublisherMockRecorder {
func (m *MockStreamPacketSender) EXPECT() *MockStreamPacketSenderMockRecorder {
return m.recorder
}
// Publish mocks base method.
func (m *MockChannelPublisher) Publish(arg0 string, arg1 []byte) error {
// Send mocks base method.
func (m *MockStreamPacketSender) Send(arg0 string, arg1 *backend.StreamPacket) error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "Publish", arg0, arg1)
ret := m.ctrl.Call(m, "Send", arg0, arg1)
ret0, _ := ret[0].(error)
return ret0
}
// Publish indicates an expected call of Publish.
func (mr *MockChannelPublisherMockRecorder) Publish(arg0, arg1 interface{}) *gomock.Call {
// Send indicates an expected call of Send.
func (mr *MockStreamPacketSenderMockRecorder) Send(arg0, arg1 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Publish", reflect.TypeOf((*MockChannelPublisher)(nil).Publish), arg0, arg1)
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Send", reflect.TypeOf((*MockStreamPacketSender)(nil).Send), arg0, arg1)
}
// MockPresenceGetter is a mock of PresenceGetter interface.
@@ -111,7 +112,7 @@ func (m *MockPluginContextGetter) EXPECT() *MockPluginContextGetterMockRecorder
}
// GetPluginContext mocks base method.
func (m *MockPluginContextGetter) GetPluginContext(arg0 context.Context, arg1, arg2 string) (backend.PluginContext, bool, error) {
func (m *MockPluginContextGetter) GetPluginContext(arg0 *models.SignedInUser, arg1, arg2 string) (backend.PluginContext, bool, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "GetPluginContext", arg0, arg1, arg2)
ret0, _ := ret[0].(backend.PluginContext)
+59 -26
View File
@@ -2,17 +2,16 @@ package features
import (
"context"
"fmt"
"github.com/centrifugal/centrifuge"
"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana/pkg/models"
)
//go:generate mockgen -destination=mock.go -package=features github.com/grafana/grafana/pkg/services/live/features ChannelPublisher,PresenceGetter,PluginContextGetter,StreamRunner
//go:generate mockgen -destination=mock.go -package=features github.com/grafana/grafana/pkg/services/live/features StreamPacketSender,PresenceGetter,PluginContextGetter,StreamRunner
type ChannelPublisher interface {
Publish(channel string, data []byte) error
type StreamPacketSender interface {
Send(channel string, packet *backend.StreamPacket) error
}
type PresenceGetter interface {
@@ -28,16 +27,19 @@ type StreamRunner interface {
}
type streamSender struct {
channel string
channelPublisher ChannelPublisher
channel string
packetSender StreamPacketSender
}
func newStreamSender(channel string, publisher ChannelPublisher) *streamSender {
return &streamSender{channel: channel, channelPublisher: publisher}
func newStreamSender(channel string, packetSender StreamPacketSender) *streamSender {
return &streamSender{
channel: channel,
packetSender: packetSender,
}
}
func (p *streamSender) Send(packet *backend.StreamPacket) error {
return p.channelPublisher.Publish(p.channel, packet.Payload)
return p.packetSender.Send(p.channel, packet)
}
// PluginRunner can handle streaming operations for channels belonging to plugins.
@@ -83,39 +85,70 @@ type PluginPathRunner struct {
}
// OnSubscribe passes control to a plugin.
func (r *PluginPathRunner) OnSubscribe(ctx context.Context, user *models.SignedInUser, e models.SubscribeEvent) (models.SubscribeReply, bool, error) {
func (r *PluginPathRunner) OnSubscribe(ctx context.Context, user *models.SignedInUser, e models.SubscribeEvent) (models.SubscribeReply, backend.SubscribeStreamStatus, error) {
pCtx, found, err := r.pluginContextGetter.GetPluginContext(user, r.pluginID, r.datasourceUID)
if err != nil {
logger.Error("Get plugin context error", "error", err, "path", r.path)
return models.SubscribeReply{}, false, err
return models.SubscribeReply{}, 0, err
}
if !found {
logger.Error("Plugin context not found", "path", r.path)
return models.SubscribeReply{}, false, centrifuge.ErrorInternal
return models.SubscribeReply{}, 0, centrifuge.ErrorInternal
}
resp, err := r.handler.CanSubscribeToStream(ctx, &backend.SubscribeToStreamRequest{
resp, err := r.handler.SubscribeStream(ctx, &backend.SubscribeStreamRequest{
PluginContext: pCtx,
Path: r.path,
})
if err != nil {
logger.Error("Plugin CanSubscribeToStream call error", "error", err, "path", r.path)
return models.SubscribeReply{}, false, err
return models.SubscribeReply{}, 0, err
}
if !resp.OK {
return models.SubscribeReply{}, false, nil
if resp.Status != backend.SubscribeStreamStatusOK {
return models.SubscribeReply{}, resp.Status, nil
}
err = r.streamManager.SubmitStream(e.Channel, r.path, pCtx, r.handler)
if err != nil {
logger.Error("Error submitting stream to manager", "error", err, "path", r.path)
return models.SubscribeReply{}, false, centrifuge.ErrorInternal
if resp.UseRunStream {
submitResult, err := r.streamManager.SubmitStream(ctx, e.Channel, r.path, pCtx, r.handler)
if err != nil {
logger.Error("Error submitting stream to manager", "error", err, "path", r.path)
return models.SubscribeReply{}, 0, centrifuge.ErrorInternal
}
if submitResult.StreamExists {
logger.Debug("Skip running new stream (already exists)", "path", r.path)
} else {
logger.Debug("Running a new keepalive stream", "path", r.path)
}
}
return models.SubscribeReply{
Presence: true,
}, true, nil
reply := models.SubscribeReply{
Presence: resp.UseRunStream, // only enable presence for streams with UseRunStream on at the moment.
Data: resp.Data,
}
return reply, backend.SubscribeStreamStatusOK, nil
}
// OnPublish passes control to a plugin.
func (r *PluginPathRunner) OnPublish(_ context.Context, _ *models.SignedInUser, _ models.PublishEvent) (models.PublishReply, bool, error) {
// TODO: pass control to a plugin.
return models.PublishReply{}, false, fmt.Errorf("not implemented yet")
func (r *PluginPathRunner) OnPublish(ctx context.Context, user *models.SignedInUser, e models.PublishEvent) (models.PublishReply, backend.PublishStreamStatus, error) {
pCtx, found, err := r.pluginContextGetter.GetPluginContext(user, r.pluginID, r.datasourceUID)
if err != nil {
logger.Error("Get plugin context error", "error", err, "path", r.path)
return models.PublishReply{}, 0, err
}
if !found {
logger.Error("Plugin context not found", "path", r.path)
return models.PublishReply{}, 0, centrifuge.ErrorInternal
}
resp, err := r.handler.PublishStream(ctx, &backend.PublishStreamRequest{
PluginContext: pCtx,
Path: r.path,
Data: e.Data,
})
if err != nil {
logger.Error("Plugin CanSubscribeToStream call error", "error", err, "path", r.path)
return models.PublishReply{}, 0, err
}
if resp.Status != backend.PublishStreamStatusOK {
return models.PublishReply{}, resp.Status, nil
}
return models.PublishReply{Data: resp.Data}, backend.PublishStreamStatusOK, nil
}
+71 -37
View File
@@ -11,14 +11,14 @@ import (
// StreamManager manages streams from Grafana to plugins.
type StreamManager struct {
mu sync.RWMutex
streams map[string]struct{}
presenceGetter PresenceGetter
channelPublisher ChannelPublisher
registerCh chan streamRequest
closedCh chan struct{}
checkInterval time.Duration
maxChecks int
mu sync.RWMutex
streams map[string]struct{}
presenceGetter PresenceGetter
packetSender StreamPacketSender
registerCh chan submitRequest
closedCh chan struct{}
checkInterval time.Duration
maxChecks int
}
// StreamManagerOption modifies StreamManager behavior (used for tests for example).
@@ -38,15 +38,15 @@ const (
)
// NewStreamManager creates new StreamManager.
func NewStreamManager(chPublisher ChannelPublisher, presenceGetter PresenceGetter, opts ...StreamManagerOption) *StreamManager {
func NewStreamManager(packetSender StreamPacketSender, presenceGetter PresenceGetter, opts ...StreamManagerOption) *StreamManager {
sm := &StreamManager{
streams: make(map[string]struct{}),
channelPublisher: chPublisher,
presenceGetter: presenceGetter,
registerCh: make(chan streamRequest),
closedCh: make(chan struct{}),
checkInterval: defaultCheckInterval,
maxChecks: defaultMaxChecks,
streams: make(map[string]struct{}),
packetSender: packetSender,
presenceGetter: presenceGetter,
registerCh: make(chan submitRequest),
closedCh: make(chan struct{}),
checkInterval: defaultCheckInterval,
maxChecks: defaultMaxChecks,
}
for _, opt := range opts {
opt(sm)
@@ -80,7 +80,7 @@ func (s *StreamManager) watchStream(ctx context.Context, cancelFn func(), sr str
}
numNoSubscribersChecks++
if numNoSubscribersChecks >= s.maxChecks {
logger.Info("Stop stream since no active subscribers", "channel", sr.Channel, "path", sr.Path)
logger.Debug("Stop stream since no active subscribers", "channel", sr.Channel, "path", sr.Path)
s.stopStream(sr, cancelFn)
return
}
@@ -102,11 +102,11 @@ func (s *StreamManager) runStream(ctx context.Context, sr streamRequest) {
PluginContext: sr.PluginContext,
Path: sr.Path,
},
newStreamSender(sr.Channel, s.channelPublisher),
newStreamSender(sr.Channel, s.packetSender),
)
if err != nil {
if errors.Is(ctx.Err(), context.Canceled) {
logger.Info("Stream cleanly finished", "path", sr.Path)
logger.Debug("Stream cleanly finished", "path", sr.Path)
return
}
logger.Error("Error running stream, retrying", "path", sr.Path, "error", err)
@@ -117,20 +117,22 @@ func (s *StreamManager) runStream(ctx context.Context, sr streamRequest) {
}
}
func (s *StreamManager) registerStream(ctx context.Context, sr streamRequest) {
var errClosed = errors.New("stream manager closed")
func (s *StreamManager) registerStream(ctx context.Context, sr submitRequest) {
s.mu.Lock()
if _, ok := s.streams[sr.Channel]; ok {
logger.Debug("Skip running new stream (already exists)", "path", sr.Path)
if _, ok := s.streams[sr.streamRequest.Channel]; ok {
s.mu.Unlock()
sr.responseCh <- submitResponse{Result: submitResult{StreamExists: true}}
return
}
ctx, cancel := context.WithCancel(ctx)
defer cancel()
s.streams[sr.Channel] = struct{}{}
s.streams[sr.streamRequest.Channel] = struct{}{}
s.mu.Unlock()
go s.watchStream(ctx, cancel, sr)
s.runStream(ctx, sr)
sr.responseCh <- submitResponse{Result: submitResult{StreamExists: false}}
go s.watchStream(ctx, cancel, sr.streamRequest)
s.runStream(ctx, sr.streamRequest)
}
// Run StreamManager till context canceled.
@@ -153,21 +155,53 @@ type streamRequest struct {
StreamRunner StreamRunner
}
type submitRequest struct {
responseCh chan submitResponse
streamRequest streamRequest
}
type submitResult struct {
StreamExists bool
}
type submitResponse struct {
Error error
Result submitResult
}
// SubmitStream submits stream handler in StreamManager to manage.
// The stream will be opened and kept till channel has active subscribers.
func (s *StreamManager) SubmitStream(channel string, path string, pCtx backend.PluginContext, streamRunner StreamRunner) error {
func (s *StreamManager) SubmitStream(ctx context.Context, channel string, path string, pCtx backend.PluginContext, streamRunner StreamRunner) (*submitResult, error) {
req := submitRequest{
responseCh: make(chan submitResponse, 1),
streamRequest: streamRequest{
Channel: channel,
Path: path,
PluginContext: pCtx,
StreamRunner: streamRunner,
},
}
// Send submit request.
select {
case s.registerCh <- req:
case <-s.closedCh:
close(s.registerCh)
return nil
case s.registerCh <- streamRequest{
Channel: channel,
Path: path,
PluginContext: pCtx,
StreamRunner: streamRunner,
}:
case <-time.After(time.Second):
return errors.New("timeout")
return nil, errClosed
case <-ctx.Done():
return nil, ctx.Err()
}
// Wait for submit response.
select {
case resp := <-req.responseCh:
if resp.Error != nil {
return nil, resp.Error
}
return &resp.Result, nil
case <-s.closedCh:
return nil, errClosed
case <-ctx.Done():
return nil, ctx.Err()
}
return nil
}
+13 -11
View File
@@ -24,10 +24,10 @@ func TestStreamManager_Run(t *testing.T) {
mockCtrl := gomock.NewController(t)
defer mockCtrl.Finish()
mockChannelPublisher := NewMockChannelPublisher(mockCtrl)
mockPacketSender := NewMockStreamPacketSender(mockCtrl)
mockPresenceGetter := NewMockPresenceGetter(mockCtrl)
manager := NewStreamManager(mockChannelPublisher, mockPresenceGetter)
manager := NewStreamManager(mockPacketSender, mockPresenceGetter)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
@@ -44,10 +44,10 @@ func TestStreamManager_SubmitStream_Send(t *testing.T) {
mockCtrl := gomock.NewController(t)
defer mockCtrl.Finish()
mockChannelPublisher := NewMockChannelPublisher(mockCtrl)
mockPacketSender := NewMockStreamPacketSender(mockCtrl)
mockPresenceGetter := NewMockPresenceGetter(mockCtrl)
manager := NewStreamManager(mockChannelPublisher, mockPresenceGetter)
manager := NewStreamManager(mockPacketSender, mockPresenceGetter)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
@@ -58,7 +58,7 @@ func TestStreamManager_SubmitStream_Send(t *testing.T) {
startedCh := make(chan struct{})
doneCh := make(chan struct{})
mockChannelPublisher.EXPECT().Publish("test", []byte("test")).Times(1)
mockPacketSender.EXPECT().Send("test", gomock.Any()).Times(1)
mockStreamRunner := NewMockStreamRunner(mockCtrl)
mockStreamRunner.EXPECT().RunStream(
@@ -67,7 +67,7 @@ func TestStreamManager_SubmitStream_Send(t *testing.T) {
require.Equal(t, "test", req.Path)
close(startedCh)
err := sender.Send(&backend.StreamPacket{
Payload: []byte("test"),
Data: []byte("test"),
})
require.NoError(t, err)
<-ctx.Done()
@@ -75,12 +75,14 @@ func TestStreamManager_SubmitStream_Send(t *testing.T) {
return ctx.Err()
}).Times(1)
err := manager.SubmitStream("test", "test", backend.PluginContext{}, mockStreamRunner)
result, err := manager.SubmitStream(context.Background(), "test", "test", backend.PluginContext{}, mockStreamRunner)
require.NoError(t, err)
require.False(t, result.StreamExists)
// try submit the same.
err = manager.SubmitStream("test", "test", backend.PluginContext{}, mockStreamRunner)
result, err = manager.SubmitStream(context.Background(), "test", "test", backend.PluginContext{}, mockStreamRunner)
require.NoError(t, err)
require.True(t, result.StreamExists)
waitWithTimeout(t, startedCh, time.Second)
require.Len(t, manager.streams, 1)
@@ -92,11 +94,11 @@ func TestStreamManager_SubmitStream_CloseNoSubscribers(t *testing.T) {
mockCtrl := gomock.NewController(t)
defer mockCtrl.Finish()
mockChannelPublisher := NewMockChannelPublisher(mockCtrl)
mockPacketSender := NewMockStreamPacketSender(mockCtrl)
mockPresenceGetter := NewMockPresenceGetter(mockCtrl)
manager := NewStreamManager(
mockChannelPublisher,
mockPacketSender,
mockPresenceGetter,
WithCheckConfig(10*time.Millisecond, 3),
)
@@ -120,7 +122,7 @@ func TestStreamManager_SubmitStream_CloseNoSubscribers(t *testing.T) {
return ctx.Err()
}).Times(1)
err := manager.SubmitStream("test", "test", backend.PluginContext{}, mockStreamRunner)
_, err := manager.SubmitStream(context.Background(), "test", "test", backend.PluginContext{}, mockStreamRunner)
require.NoError(t, err)
waitWithTimeout(t, startedCh, time.Second)