Files
grafana/pkg/services/live/features/testdata.go

114 lines
2.8 KiB
Go

package features
import (
"encoding/json"
"fmt"
"math/rand"
"time"
"github.com/centrifugal/centrifuge"
"github.com/grafana/grafana/pkg/models"
)
// testDataRunner is the handler for a single `grafana/testdata/*` channel.
// Once started it publishes randomly generated measurements to that channel
// from a background goroutine.
type testDataRunner struct {
	// publisher is called with the channel name and the encoded payload.
	publisher models.ChannelPublisher
	// running records whether the background generator has been started.
	running bool
	// speedMillis is the ticker period, in milliseconds.
	speedMillis int
	// dropPercent is compared against rand.Float64() on every tick to decide
	// whether that tick is skipped (0 for the steady stream, .6 for the
	// flakey one).
	dropPercent float64
	// channel is the full channel name that measurements are published to.
	channel string
	// name is used as the Name of every generated measurement.
	name string
}
// TestDataSupplier hands out handlers for the `grafana/testdata/*` channels.
type TestDataSupplier struct {
	// Publisher is passed on to every handler this supplier creates; the
	// handlers use it to push generated data to their channel.
	Publisher models.ChannelPublisher
}
// GetHandlerForPath gets the channel handler for a path.
// Called on init.
//
// Two paths are recognized:
//   - "random-2s-stream": ticks every 2000ms with a drop rate of 0.
//   - "random-flakey-stream": ticks every 400ms with a drop rate of .6.
//
// The returned handler publishes to "grafana/testdata/<path>" and names its
// measurements after the path. Any other path yields a nil handler and an
// "unknown channel" error.
func (s *TestDataSupplier) GetHandlerForPath(path string) (models.ChannelHandler, error) {
	runner := &testDataRunner{
		publisher: s.Publisher,
		running:   false,
		channel:   "grafana/testdata/" + path,
		// Set the name for every stream: the flakey stream used to leave it
		// empty, so its measurements were published without a Name.
		name: path,
	}

	switch path {
	case "random-2s-stream":
		runner.speedMillis = 2000
		runner.dropPercent = 0
	case "random-flakey-stream":
		runner.speedMillis = 400
		runner.dropPercent = .6
	default:
		return nil, fmt.Errorf("unknown channel")
	}
	return runner, nil
}
// OnSubscribe accepts every subscription to the path. The first call also
// launches the background generator; later calls only return the empty reply.
//
// NOTE(review): r.running is read and written here without any
// synchronization visible in this file - confirm that callers never invoke
// OnSubscribe concurrently for the same handler.
func (r *testDataRunner) OnSubscribe(c *centrifuge.Client, e centrifuge.SubscribeEvent) (centrifuge.SubscribeReply, error) {
	var reply centrifuge.SubscribeReply
	if r.running {
		return reply, nil
	}

	// Mark as started before spawning so the generator is launched only once.
	r.running = true
	go r.runRandomCSV()
	return reply, nil
}
// OnPublish is invoked when a websocket client tries to broadcast on this
// channel. It always refuses: every call returns an empty reply together with
// an error.
func (r *testDataRunner) OnPublish(c *centrifuge.Client, e centrifuge.PublishEvent) (centrifuge.PublishReply, error) {
	var reply centrifuge.PublishReply
	return reply, fmt.Errorf("can not publish to testdata")
}
// runRandomCSV is just for an example. Despite the name, it publishes JSON:
// on every tick (period r.speedMillis milliseconds) it advances a random walk
// and publishes a models.MeasurementBatch holding one measurement with
// "value", "min" and "max" entries to r.channel.
//
// A tick is skipped when a fresh rand.Float64() draw is below r.dropPercent,
// so a drop rate of 0 never skips. Marshal and publish failures are logged
// and the loop carries on with the next tick.
//
// NOTE(review): the loop has no exit condition and the ticker is never
// stopped, so once started this goroutine runs for the life of the process.
func (r *testDataRunner) runRandomCSV() {
	spread := 50.0
	walker := rand.Float64() * 100
	ticker := time.NewTicker(time.Duration(r.speedMillis) * time.Millisecond)

	msg := models.MeasurementBatch{
		Measurements: []models.Measurement{{ // always a single measurement
			Name:   r.name,
			Time:   0,
			Values: make(map[string]interface{}, 5),
		}},
	}
	// Mutate the element stored inside the batch. Updating a separate local
	// struct would leave the batched copy's Time at 0, because the slice
	// holds its own copy of the struct (only the Values map is shared).
	measurement := &msg.Measurements[0]

	for t := range ticker.C {
		// Strict comparison: rand.Float64() can return exactly 0, which must
		// not count as a drop when dropPercent is 0.
		if rand.Float64() < r.dropPercent {
			continue
		}

		walker += rand.Float64() - 0.5

		// Timestamp in milliseconds since the Unix epoch.
		measurement.Time = t.UnixNano() / int64(time.Millisecond)
		measurement.Values["value"] = walker
		measurement.Values["min"] = walker - ((rand.Float64() * spread) + 0.01)
		measurement.Values["max"] = walker + ((rand.Float64() * spread) + 0.01)

		payload, err := json.Marshal(&msg)
		if err != nil {
			logger.Warn("unable to marshal line", "error", err)
			continue
		}

		if err := r.publisher(r.channel, payload); err != nil {
			logger.Warn("write", "channel", r.channel, "measurement", *measurement, "error", err)
		}
	}
}