Prometheus: Implement Streaming JSON Parser (#48477)
use `prometheusStreamingJSONParser` feature toggle to enable
This commit is contained in:
@@ -0,0 +1,143 @@
|
||||
package querydata_test
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/grafana/grafana-plugin-sdk-go/backend"
|
||||
"github.com/grafana/grafana-plugin-sdk-go/experimental"
|
||||
"github.com/grafana/grafana/pkg/infra/log"
|
||||
"github.com/grafana/grafana/pkg/tsdb/prometheus/models"
|
||||
)
|
||||
|
||||
var update = false
|
||||
|
||||
func TestMatrixResponses(t *testing.T) {
|
||||
tt := []struct {
|
||||
name string
|
||||
filepath string
|
||||
}{
|
||||
{name: "parse a simple matrix response", filepath: "range_simple"},
|
||||
{name: "parse a simple matrix response with value missing steps", filepath: "range_missing"},
|
||||
{name: "parse a response with Infinity", filepath: "range_infinity"},
|
||||
{name: "parse a response with NaN", filepath: "range_nan"},
|
||||
}
|
||||
|
||||
for _, test := range tt {
|
||||
t.Run(test.name, func(t *testing.T) {
|
||||
queryFileName := filepath.Join("../testdata", test.filepath+".query.json")
|
||||
responseFileName := filepath.Join("../testdata", test.filepath+".result.json")
|
||||
goldenFileName := filepath.Join("../testdata", test.filepath+".result.streaming.golden")
|
||||
|
||||
query, err := loadStoredQuery(queryFileName)
|
||||
require.NoError(t, err)
|
||||
|
||||
responseBytes, err := os.ReadFile(responseFileName)
|
||||
require.NoError(t, err)
|
||||
|
||||
result, err := runQuery(responseBytes, query)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, result.Responses, 1)
|
||||
|
||||
dr, found := result.Responses["A"]
|
||||
require.True(t, found)
|
||||
|
||||
actual, err := json.MarshalIndent(&dr, "", " ")
|
||||
require.NoError(t, err)
|
||||
|
||||
// nolint:gosec
|
||||
// We can ignore the gosec G304 because this is a test with static defined paths
|
||||
expected, err := ioutil.ReadFile(goldenFileName + ".json")
|
||||
if err != nil || update {
|
||||
err = os.WriteFile(goldenFileName+".json", actual, 0600)
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
require.JSONEq(t, string(expected), string(actual))
|
||||
|
||||
require.NoError(t, experimental.CheckGoldenDataResponse(goldenFileName+".txt", &dr, update))
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// we store the prometheus query data in a json file, here is some minimal code
|
||||
// to be able to read it back. unfortunately we cannot use the models.Query
|
||||
// struct here, because it has `time.time` and `time.duration` fields that
|
||||
// cannot be unmarshalled from JSON automatically.
|
||||
type storedPrometheusQuery struct {
|
||||
RefId string
|
||||
RangeQuery bool
|
||||
Start int64
|
||||
End int64
|
||||
Step int64
|
||||
Expr string
|
||||
}
|
||||
|
||||
func loadStoredQuery(fileName string) (*backend.QueryDataRequest, error) {
|
||||
bytes, err := os.ReadFile(fileName)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var sq storedPrometheusQuery
|
||||
|
||||
err = json.Unmarshal(bytes, &sq)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
qm := models.QueryModel{
|
||||
RangeQuery: sq.RangeQuery,
|
||||
Expr: sq.Expr,
|
||||
Interval: fmt.Sprintf("%ds", sq.Step),
|
||||
IntervalMS: sq.Step * 1000,
|
||||
}
|
||||
|
||||
data, err := json.Marshal(&qm)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &backend.QueryDataRequest{
|
||||
Queries: []backend.DataQuery{
|
||||
{
|
||||
TimeRange: backend.TimeRange{
|
||||
From: time.Unix(sq.Start, 0),
|
||||
To: time.Unix(sq.End, 0),
|
||||
},
|
||||
RefID: sq.RefId,
|
||||
Interval: time.Second * time.Duration(sq.Step),
|
||||
JSON: json.RawMessage(data),
|
||||
},
|
||||
},
|
||||
}, nil
|
||||
}
|
||||
|
||||
func runQuery(response []byte, q *backend.QueryDataRequest) (*backend.QueryDataResponse, error) {
|
||||
tCtx := setup()
|
||||
res := &http.Response{
|
||||
StatusCode: 200,
|
||||
Body: ioutil.NopCloser(bytes.NewReader(response)),
|
||||
}
|
||||
tCtx.httpProvider.setResponse(res)
|
||||
return tCtx.queryData.Execute(context.Background(), q)
|
||||
}
|
||||
|
||||
type fakeLogger struct {
|
||||
log.Logger
|
||||
}
|
||||
|
||||
func (fl *fakeLogger) Debug(testMessage string, ctx ...interface{}) {}
|
||||
func (fl *fakeLogger) Info(testMessage string, ctx ...interface{}) {}
|
||||
func (fl *fakeLogger) Warn(testMessage string, ctx ...interface{}) {}
|
||||
func (fl *fakeLogger) Error(testMessage string, ctx ...interface{}) {}
|
||||
@@ -0,0 +1,100 @@
|
||||
package querydata_test
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"math/rand"
|
||||
"net/http"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/grafana/grafana-plugin-sdk-go/backend"
|
||||
"github.com/grafana/grafana/pkg/tsdb/prometheus/models"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
// when memory-profiling this benchmark, these commands are recommended:
|
||||
// - go test -benchmem -run=^$ -benchtime 1x -memprofile memprofile.out -memprofilerate 1 -bench ^BenchmarkJson$ github.com/grafana/grafana/pkg/tsdb/prometheus
|
||||
// - go tool pprof -http=localhost:6061 memprofile.out
|
||||
func BenchmarkJson(b *testing.B) {
|
||||
body, q := createJsonTestData(1642000000, 1, 300, 400)
|
||||
tCtx := setup()
|
||||
b.ResetTimer()
|
||||
for n := 0; n < b.N; n++ {
|
||||
res := http.Response{
|
||||
StatusCode: 200,
|
||||
Body: ioutil.NopCloser(bytes.NewReader(body)),
|
||||
}
|
||||
tCtx.httpProvider.setResponse(&res)
|
||||
_, err := tCtx.queryData.Execute(context.Background(), q)
|
||||
require.NoError(b, err)
|
||||
}
|
||||
}
|
||||
|
||||
const nanRate = 0.002
|
||||
|
||||
// we build the JSON file from strings,
|
||||
// it was easier to write it this way.
|
||||
func makeJsonTestMetric(index int) string {
|
||||
return fmt.Sprintf(`{"server":"main","category":"maintenance","case":"%v"}`, index)
|
||||
}
|
||||
|
||||
// return a value between -100 and +100, sometimes NaN, in string
|
||||
func makeJsonTestValue(r *rand.Rand) string {
|
||||
if r.Float64() < nanRate {
|
||||
return "NaN"
|
||||
} else {
|
||||
return fmt.Sprintf("%f", (r.Float64()*200)-100)
|
||||
}
|
||||
}
|
||||
|
||||
// create one time-series
|
||||
func makeJsonTestSeries(start int64, step int64, timestampCount int, r *rand.Rand, seriesIndex int) string {
|
||||
var values []string
|
||||
for i := 0; i < timestampCount; i++ {
|
||||
value := fmt.Sprintf(`[%d,"%v"]`, start+(int64(i)*step), makeJsonTestValue(r))
|
||||
values = append(values, value)
|
||||
}
|
||||
return fmt.Sprintf(`{"metric":%v,"values":[%v]}`, makeJsonTestMetric(seriesIndex), strings.Join(values, ","))
|
||||
}
|
||||
|
||||
func createJsonTestData(start int64, step int64, timestampCount int, seriesCount int) ([]byte, *backend.QueryDataRequest) {
|
||||
// we use random numbers as values, but they have to be the same numbers
|
||||
// every time we call this, so we create a random source.
|
||||
r := rand.New(rand.NewSource(42))
|
||||
var allSeries []string
|
||||
for i := 0; i < seriesCount; i++ {
|
||||
allSeries = append(allSeries, makeJsonTestSeries(start, step, timestampCount, r, i))
|
||||
}
|
||||
bytes := []byte(fmt.Sprintf(`{"status":"success","data":{"resultType":"matrix","result":[%v]}}`, strings.Join(allSeries, ",")))
|
||||
|
||||
qm := models.QueryModel{
|
||||
RangeQuery: true,
|
||||
Expr: "test",
|
||||
}
|
||||
|
||||
data, err := json.Marshal(&qm)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
res := backend.QueryDataRequest{
|
||||
Queries: []backend.DataQuery{
|
||||
{
|
||||
RefID: "A",
|
||||
TimeRange: backend.TimeRange{
|
||||
From: time.Unix(start, 0),
|
||||
To: time.Unix(start+((int64(timestampCount)-1)*step), 0),
|
||||
},
|
||||
Interval: time.Second * time.Duration(step),
|
||||
JSON: data,
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
return bytes, &res
|
||||
}
|
||||
@@ -0,0 +1,183 @@
|
||||
package querydata
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"regexp"
|
||||
"time"
|
||||
|
||||
"github.com/grafana/grafana-plugin-sdk-go/backend"
|
||||
"github.com/grafana/grafana-plugin-sdk-go/data"
|
||||
"github.com/grafana/grafana/pkg/infra/httpclient"
|
||||
"github.com/grafana/grafana/pkg/infra/log"
|
||||
"github.com/grafana/grafana/pkg/infra/tracing"
|
||||
"github.com/grafana/grafana/pkg/services/featuremgmt"
|
||||
"github.com/grafana/grafana/pkg/setting"
|
||||
"github.com/grafana/grafana/pkg/tsdb/intervalv2"
|
||||
"github.com/grafana/grafana/pkg/tsdb/prometheus/client"
|
||||
"github.com/grafana/grafana/pkg/tsdb/prometheus/models"
|
||||
"github.com/grafana/grafana/pkg/util/maputil"
|
||||
"go.opentelemetry.io/otel/attribute"
|
||||
)
|
||||
|
||||
const legendFormatAuto = "__auto"
|
||||
|
||||
var legendFormatRegexp = regexp.MustCompile(`\{\{\s*(.+?)\s*\}\}`)
|
||||
|
||||
type clientGetter func(map[string]string) (*client.Client, error)
|
||||
|
||||
type ExemplarEvent struct {
|
||||
Time time.Time
|
||||
Value float64
|
||||
Labels map[string]string
|
||||
}
|
||||
|
||||
type QueryData struct {
|
||||
intervalCalculator intervalv2.Calculator
|
||||
tracer tracing.Tracer
|
||||
getClient clientGetter
|
||||
log log.Logger
|
||||
ID int64
|
||||
URL string
|
||||
TimeInterval string
|
||||
}
|
||||
|
||||
func New(
|
||||
httpClientProvider httpclient.Provider,
|
||||
cfg *setting.Cfg,
|
||||
features featuremgmt.FeatureToggles,
|
||||
tracer tracing.Tracer,
|
||||
settings backend.DataSourceInstanceSettings,
|
||||
plog log.Logger,
|
||||
) (*QueryData, error) {
|
||||
var jsonData map[string]interface{}
|
||||
if err := json.Unmarshal(settings.JSONData, &jsonData); err != nil {
|
||||
return nil, fmt.Errorf("error reading settings: %w", err)
|
||||
}
|
||||
|
||||
timeInterval, err := maputil.GetStringOptional(jsonData, "timeInterval")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
p := client.NewProvider(settings, jsonData, httpClientProvider, cfg, features, plog)
|
||||
pc, err := client.NewProviderCache(p)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &QueryData{
|
||||
intervalCalculator: intervalv2.NewCalculator(),
|
||||
tracer: tracer,
|
||||
log: plog,
|
||||
getClient: pc.GetClient,
|
||||
TimeInterval: timeInterval,
|
||||
ID: settings.ID,
|
||||
URL: settings.URL,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (s *QueryData) Execute(ctx context.Context, req *backend.QueryDataRequest) (*backend.QueryDataResponse, error) {
|
||||
fromAlert := req.Headers["FromAlert"] == "true"
|
||||
result := backend.QueryDataResponse{
|
||||
Responses: backend.Responses{},
|
||||
}
|
||||
|
||||
client, err := s.getClient(req.Headers)
|
||||
if err != nil {
|
||||
return &result, err
|
||||
}
|
||||
|
||||
for _, q := range req.Queries {
|
||||
query, err := models.Parse(q, s.TimeInterval, s.intervalCalculator, fromAlert)
|
||||
if err != nil {
|
||||
return &result, err
|
||||
}
|
||||
r, err := s.fetch(ctx, client, query)
|
||||
if err != nil {
|
||||
return &result, err
|
||||
}
|
||||
if r == nil {
|
||||
s.log.Debug("Received nilresponse from runQuery", "query", query.Expr)
|
||||
continue
|
||||
}
|
||||
result.Responses[q.RefID] = *r
|
||||
}
|
||||
|
||||
return &result, nil
|
||||
}
|
||||
|
||||
func (s *QueryData) fetch(ctx context.Context, client *client.Client, q *models.Query) (*backend.DataResponse, error) {
|
||||
s.log.Debug("Sending query", "start", q.Start, "end", q.End, "step", q.Step, "query", q.Expr)
|
||||
|
||||
traceCtx, span := s.trace(ctx, q)
|
||||
defer span.End()
|
||||
|
||||
response := &backend.DataResponse{
|
||||
Frames: data.Frames{},
|
||||
Error: nil,
|
||||
}
|
||||
|
||||
if q.RangeQuery {
|
||||
res, err := s.rangeQuery(traceCtx, client, q)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
response.Frames = res.Frames
|
||||
}
|
||||
|
||||
if q.InstantQuery {
|
||||
res, err := s.instantQuery(traceCtx, client, q)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
response.Frames = append(response.Frames, res.Frames...)
|
||||
}
|
||||
|
||||
if q.ExemplarQuery {
|
||||
res, err := s.exemplarQuery(traceCtx, client, q)
|
||||
if err != nil {
|
||||
// If exemplar query returns error, we want to only log it and
|
||||
// continue with other results processing
|
||||
s.log.Error("Exemplar query failed", "query", q.Expr, "err", err)
|
||||
}
|
||||
if res != nil {
|
||||
response.Frames = append(response.Frames, res.Frames...)
|
||||
}
|
||||
}
|
||||
|
||||
return response, nil
|
||||
}
|
||||
|
||||
func (s *QueryData) rangeQuery(ctx context.Context, c *client.Client, q *models.Query) (*backend.DataResponse, error) {
|
||||
res, err := c.QueryRange(ctx, q)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return s.parseResponse(ctx, q, res)
|
||||
}
|
||||
|
||||
func (s *QueryData) instantQuery(ctx context.Context, c *client.Client, q *models.Query) (*backend.DataResponse, error) {
|
||||
res, err := c.QueryInstant(ctx, q)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return s.parseResponse(ctx, q, res)
|
||||
}
|
||||
|
||||
func (s *QueryData) exemplarQuery(ctx context.Context, c *client.Client, q *models.Query) (*backend.DataResponse, error) {
|
||||
res, err := c.QueryExemplars(ctx, q)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return s.parseResponse(ctx, q, res)
|
||||
}
|
||||
|
||||
func (s *QueryData) trace(ctx context.Context, q *models.Query) (context.Context, tracing.Span) {
|
||||
traceCtx, span := s.tracer.Start(ctx, "datasource.prometheus")
|
||||
span.SetAttributes("expr", q.Expr, attribute.Key("expr").String(q.Expr))
|
||||
span.SetAttributes("start_unixnano", q.Start, attribute.Key("start_unixnano").Int64(q.Start.UnixNano()))
|
||||
span.SetAttributes("stop_unixnano", q.End, attribute.Key("stop_unixnano").Int64(q.End.UnixNano()))
|
||||
return traceCtx, span
|
||||
}
|
||||
@@ -0,0 +1,464 @@
|
||||
package querydata_test
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"io/ioutil"
|
||||
"math"
|
||||
"net/http"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/grafana/grafana-plugin-sdk-go/backend"
|
||||
sdkhttpclient "github.com/grafana/grafana-plugin-sdk-go/backend/httpclient"
|
||||
"github.com/grafana/grafana-plugin-sdk-go/data"
|
||||
"github.com/grafana/grafana/pkg/infra/httpclient"
|
||||
"github.com/grafana/grafana/pkg/infra/tracing"
|
||||
"github.com/grafana/grafana/pkg/setting"
|
||||
"github.com/grafana/grafana/pkg/tsdb/prometheus/models"
|
||||
"github.com/grafana/grafana/pkg/tsdb/prometheus/querydata"
|
||||
apiv1 "github.com/prometheus/client_golang/api/prometheus/v1"
|
||||
p "github.com/prometheus/common/model"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestPrometheus_parseTimeSeriesResponse(t *testing.T) {
|
||||
t.Run("exemplars response should be sampled and parsed normally", func(t *testing.T) {
|
||||
t.Skip()
|
||||
exemplars := []apiv1.ExemplarQueryResult{
|
||||
{
|
||||
SeriesLabels: p.LabelSet{
|
||||
"__name__": "tns_request_duration_seconds_bucket",
|
||||
"instance": "app:80",
|
||||
"job": "tns/app",
|
||||
},
|
||||
Exemplars: []apiv1.Exemplar{
|
||||
{
|
||||
Labels: p.LabelSet{"traceID": "test1"},
|
||||
Value: 0.003535405,
|
||||
Timestamp: p.TimeFromUnixNano(time.Now().Add(-2 * time.Minute).UnixNano()),
|
||||
},
|
||||
{
|
||||
Labels: p.LabelSet{"traceID": "test2"},
|
||||
Value: 0.005555605,
|
||||
Timestamp: p.TimeFromUnixNano(time.Now().Add(-4 * time.Minute).UnixNano()),
|
||||
},
|
||||
{
|
||||
Labels: p.LabelSet{"traceID": "test3"},
|
||||
Value: 0.007545445,
|
||||
Timestamp: p.TimeFromUnixNano(time.Now().Add(-6 * time.Minute).UnixNano()),
|
||||
},
|
||||
{
|
||||
Labels: p.LabelSet{"traceID": "test4"},
|
||||
Value: 0.009545445,
|
||||
Timestamp: p.TimeFromUnixNano(time.Now().Add(-7 * time.Minute).UnixNano()),
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
tctx := setup()
|
||||
|
||||
qm := models.QueryModel{
|
||||
LegendFormat: "legend {{app}}",
|
||||
UtcOffsetSec: 0,
|
||||
ExemplarQuery: true,
|
||||
}
|
||||
b, err := json.Marshal(&qm)
|
||||
require.NoError(t, err)
|
||||
query := backend.DataQuery{
|
||||
RefID: "A",
|
||||
JSON: b,
|
||||
}
|
||||
res, err := execute(tctx, query, exemplars)
|
||||
require.NoError(t, err)
|
||||
|
||||
// Test fields
|
||||
require.Len(t, res, 1)
|
||||
// require.Equal(t, res[0].Name, "exemplar")
|
||||
require.Equal(t, res[0].Fields[0].Name, "Time")
|
||||
require.Equal(t, res[0].Fields[1].Name, "Value")
|
||||
require.Len(t, res[0].Fields, 6)
|
||||
|
||||
// Test correct values (sampled to 2)
|
||||
require.Equal(t, res[0].Fields[1].Len(), 2)
|
||||
require.Equal(t, res[0].Fields[1].At(0), 0.009545445)
|
||||
require.Equal(t, res[0].Fields[1].At(1), 0.003535405)
|
||||
})
|
||||
|
||||
t.Run("matrix response should be parsed normally", func(t *testing.T) {
|
||||
values := []p.SamplePair{
|
||||
{Value: 1, Timestamp: 1000},
|
||||
{Value: 2, Timestamp: 2000},
|
||||
{Value: 3, Timestamp: 3000},
|
||||
{Value: 4, Timestamp: 4000},
|
||||
{Value: 5, Timestamp: 5000},
|
||||
}
|
||||
result := queryResult{
|
||||
Type: p.ValMatrix,
|
||||
Result: p.Matrix{
|
||||
&p.SampleStream{
|
||||
Metric: p.Metric{"app": "Application", "tag2": "tag2"},
|
||||
Values: values,
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
qm := models.QueryModel{
|
||||
LegendFormat: "legend {{app}}",
|
||||
UtcOffsetSec: 0,
|
||||
RangeQuery: true,
|
||||
}
|
||||
b, err := json.Marshal(&qm)
|
||||
require.NoError(t, err)
|
||||
query := backend.DataQuery{
|
||||
TimeRange: backend.TimeRange{
|
||||
From: time.Unix(1, 0).UTC(),
|
||||
To: time.Unix(5, 0).UTC(),
|
||||
},
|
||||
JSON: b,
|
||||
}
|
||||
tctx := setup()
|
||||
res, err := execute(tctx, query, result)
|
||||
require.NoError(t, err)
|
||||
|
||||
require.Len(t, res, 1)
|
||||
//require.Equal(t, "legend Application", res[0].Name)
|
||||
require.Len(t, res[0].Fields, 2)
|
||||
require.Len(t, res[0].Fields[0].Labels, 0)
|
||||
require.Equal(t, "Time", res[0].Fields[0].Name)
|
||||
require.Len(t, res[0].Fields[1].Labels, 2)
|
||||
require.Equal(t, "app=Application, tag2=tag2", res[0].Fields[1].Labels.String())
|
||||
require.Equal(t, "Value", res[0].Fields[1].Name)
|
||||
require.Equal(t, "legend Application", res[0].Fields[1].Config.DisplayNameFromDS)
|
||||
|
||||
// Ensure the timestamps are UTC zoned
|
||||
testValue := res[0].Fields[0].At(0)
|
||||
require.Equal(t, "UTC", testValue.(time.Time).Location().String())
|
||||
})
|
||||
|
||||
t.Run("matrix response with missed data points should be parsed correctly", func(t *testing.T) {
|
||||
values := []p.SamplePair{
|
||||
{Value: 1, Timestamp: 1000},
|
||||
{Value: 4, Timestamp: 4000},
|
||||
}
|
||||
result := queryResult{
|
||||
Type: p.ValMatrix,
|
||||
Result: p.Matrix{
|
||||
&p.SampleStream{
|
||||
Metric: p.Metric{"app": "Application", "tag2": "tag2"},
|
||||
Values: values,
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
qm := models.QueryModel{
|
||||
LegendFormat: "",
|
||||
UtcOffsetSec: 0,
|
||||
RangeQuery: true,
|
||||
}
|
||||
b, err := json.Marshal(&qm)
|
||||
require.NoError(t, err)
|
||||
query := backend.DataQuery{
|
||||
TimeRange: backend.TimeRange{
|
||||
From: time.Unix(1, 0).UTC(),
|
||||
To: time.Unix(4, 0).UTC(),
|
||||
},
|
||||
JSON: b,
|
||||
}
|
||||
tctx := setup()
|
||||
res, err := execute(tctx, query, result)
|
||||
|
||||
require.NoError(t, err)
|
||||
require.Len(t, res, 1)
|
||||
require.Equal(t, res[0].Fields[0].Len(), 2)
|
||||
require.Equal(t, time.Unix(1, 0).UTC(), res[0].Fields[0].At(0))
|
||||
require.Equal(t, time.Unix(4, 0).UTC(), res[0].Fields[0].At(1))
|
||||
require.Equal(t, res[0].Fields[1].Len(), 2)
|
||||
require.Equal(t, float64(1), res[0].Fields[1].At(0).(float64))
|
||||
require.Equal(t, float64(4), res[0].Fields[1].At(1).(float64))
|
||||
})
|
||||
|
||||
t.Run("matrix response with from alerting missed data points should be parsed correctly", func(t *testing.T) {
|
||||
values := []p.SamplePair{
|
||||
{Value: 1, Timestamp: 1000},
|
||||
{Value: 4, Timestamp: 4000},
|
||||
}
|
||||
result := queryResult{
|
||||
Type: p.ValMatrix,
|
||||
Result: p.Matrix{
|
||||
&p.SampleStream{
|
||||
Metric: p.Metric{"app": "Application", "tag2": "tag2"},
|
||||
Values: values,
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
qm := models.QueryModel{
|
||||
LegendFormat: "",
|
||||
UtcOffsetSec: 0,
|
||||
RangeQuery: true,
|
||||
}
|
||||
b, err := json.Marshal(&qm)
|
||||
require.NoError(t, err)
|
||||
query := backend.DataQuery{
|
||||
TimeRange: backend.TimeRange{
|
||||
From: time.Unix(1, 0).UTC(),
|
||||
To: time.Unix(4, 0).UTC(),
|
||||
},
|
||||
JSON: b,
|
||||
}
|
||||
tctx := setup()
|
||||
res, err := execute(tctx, query, result)
|
||||
|
||||
require.NoError(t, err)
|
||||
require.Len(t, res, 1)
|
||||
require.Equal(t, res[0].Name, "{app=\"Application\", tag2=\"tag2\"}")
|
||||
require.Len(t, res[0].Fields, 2)
|
||||
require.Len(t, res[0].Fields[0].Labels, 0)
|
||||
require.Equal(t, res[0].Fields[0].Name, "Time")
|
||||
require.Len(t, res[0].Fields[1].Labels, 2)
|
||||
require.Equal(t, res[0].Fields[1].Labels.String(), "app=Application, tag2=tag2")
|
||||
require.Equal(t, res[0].Fields[1].Name, "Value")
|
||||
require.Equal(t, res[0].Fields[1].Config.DisplayNameFromDS, "{app=\"Application\", tag2=\"tag2\"}")
|
||||
})
|
||||
|
||||
t.Run("matrix response with NaN value should be changed to null", func(t *testing.T) {
|
||||
result := queryResult{
|
||||
Type: p.ValMatrix,
|
||||
Result: p.Matrix{
|
||||
&p.SampleStream{
|
||||
Metric: p.Metric{"app": "Application"},
|
||||
Values: []p.SamplePair{
|
||||
{Value: p.SampleValue(math.NaN()), Timestamp: 1000},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
qm := models.QueryModel{
|
||||
LegendFormat: "",
|
||||
UtcOffsetSec: 0,
|
||||
RangeQuery: true,
|
||||
}
|
||||
b, err := json.Marshal(&qm)
|
||||
require.NoError(t, err)
|
||||
query := backend.DataQuery{
|
||||
TimeRange: backend.TimeRange{
|
||||
From: time.Unix(1, 0).UTC(),
|
||||
To: time.Unix(4, 0).UTC(),
|
||||
},
|
||||
JSON: b,
|
||||
}
|
||||
|
||||
tctx := setup()
|
||||
res, err := execute(tctx, query, result)
|
||||
require.NoError(t, err)
|
||||
|
||||
require.Equal(t, res[0].Fields[1].Name, "Value")
|
||||
require.True(t, math.IsNaN(res[0].Fields[1].At(0).(float64)))
|
||||
})
|
||||
|
||||
t.Run("vector response should be parsed normally", func(t *testing.T) {
|
||||
qr := queryResult{
|
||||
Type: p.ValVector,
|
||||
Result: p.Vector{
|
||||
&p.Sample{
|
||||
Metric: p.Metric{"app": "Application", "tag2": "tag2"},
|
||||
Value: 1,
|
||||
Timestamp: 123,
|
||||
},
|
||||
},
|
||||
}
|
||||
qm := models.QueryModel{
|
||||
LegendFormat: "legend {{app}}",
|
||||
UtcOffsetSec: 0,
|
||||
InstantQuery: true,
|
||||
}
|
||||
b, err := json.Marshal(&qm)
|
||||
require.NoError(t, err)
|
||||
query := backend.DataQuery{
|
||||
JSON: b,
|
||||
}
|
||||
tctx := setup()
|
||||
res, err := execute(tctx, query, qr)
|
||||
require.NoError(t, err)
|
||||
|
||||
require.Len(t, res, 1)
|
||||
require.Equal(t, res[0].Name, "legend Application")
|
||||
require.Len(t, res[0].Fields, 2)
|
||||
require.Len(t, res[0].Fields[0].Labels, 0)
|
||||
require.Equal(t, res[0].Fields[0].Name, "Time")
|
||||
require.Equal(t, res[0].Fields[0].Name, "Time")
|
||||
require.Len(t, res[0].Fields[1].Labels, 2)
|
||||
require.Equal(t, res[0].Fields[1].Labels.String(), "app=Application, tag2=tag2")
|
||||
require.Equal(t, res[0].Fields[1].Name, "Value")
|
||||
require.Equal(t, res[0].Fields[1].Config.DisplayNameFromDS, "legend Application")
|
||||
|
||||
// Ensure the timestamps are UTC zoned
|
||||
testValue := res[0].Fields[0].At(0)
|
||||
require.Equal(t, "UTC", testValue.(time.Time).Location().String())
|
||||
require.Equal(t, int64(123), testValue.(time.Time).UnixMilli())
|
||||
})
|
||||
|
||||
t.Run("scalar response should be parsed normally", func(t *testing.T) {
|
||||
t.Skip("TODO: implement scalar responses")
|
||||
qr := queryResult{
|
||||
Type: p.ValScalar,
|
||||
Result: &p.Scalar{
|
||||
Value: 1,
|
||||
Timestamp: 123,
|
||||
},
|
||||
}
|
||||
qm := models.QueryModel{
|
||||
LegendFormat: "",
|
||||
UtcOffsetSec: 0,
|
||||
InstantQuery: true,
|
||||
}
|
||||
b, err := json.Marshal(&qm)
|
||||
require.NoError(t, err)
|
||||
query := backend.DataQuery{
|
||||
JSON: b,
|
||||
}
|
||||
tctx := setup()
|
||||
res, err := execute(tctx, query, qr)
|
||||
require.NoError(t, err)
|
||||
|
||||
require.Len(t, res, 1)
|
||||
require.Equal(t, res[0].Name, "1")
|
||||
require.Len(t, res[0].Fields, 2)
|
||||
require.Len(t, res[0].Fields[0].Labels, 0)
|
||||
require.Equal(t, res[0].Fields[0].Name, "Time")
|
||||
require.Equal(t, res[0].Fields[1].Name, "Value")
|
||||
require.Equal(t, res[0].Fields[1].Config.DisplayNameFromDS, "1")
|
||||
|
||||
// Ensure the timestamps are UTC zoned
|
||||
testValue := res[0].Fields[0].At(0)
|
||||
require.Equal(t, "UTC", testValue.(time.Time).Location().String())
|
||||
require.Equal(t, int64(123), testValue.(time.Time).UnixMilli())
|
||||
})
|
||||
}
|
||||
|
||||
type queryResult struct {
|
||||
Type p.ValueType `json:"resultType"`
|
||||
Result interface{} `json:"result"`
|
||||
}
|
||||
|
||||
func execute(tctx *testContext, query backend.DataQuery, qr interface{}) (data.Frames, error) {
|
||||
req := backend.QueryDataRequest{
|
||||
Queries: []backend.DataQuery{query},
|
||||
Headers: map[string]string{},
|
||||
}
|
||||
|
||||
promRes, err := toAPIResponse(qr)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
tctx.httpProvider.setResponse(promRes)
|
||||
|
||||
res, err := tctx.queryData.Execute(context.Background(), &req)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return res.Responses[req.Queries[0].RefID].Frames, nil
|
||||
}
|
||||
|
||||
type apiResponse struct {
|
||||
Status string `json:"status"`
|
||||
Data json.RawMessage `json:"data"`
|
||||
}
|
||||
|
||||
func toAPIResponse(d interface{}) (*http.Response, error) {
|
||||
b, err := json.Marshal(d)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
res := apiResponse{
|
||||
Status: "success",
|
||||
Data: json.RawMessage(b),
|
||||
}
|
||||
|
||||
raw, err := json.Marshal(&res)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &http.Response{
|
||||
StatusCode: 200,
|
||||
Body: ioutil.NopCloser(bytes.NewReader(raw)),
|
||||
}, nil
|
||||
}
|
||||
|
||||
type testContext struct {
|
||||
httpProvider *fakeHttpClientProvider
|
||||
queryData *querydata.QueryData
|
||||
}
|
||||
|
||||
func setup() *testContext {
|
||||
tracer, err := tracing.InitializeTracerForTest()
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
httpProvider := &fakeHttpClientProvider{
|
||||
opts: sdkhttpclient.Options{
|
||||
Timeouts: &sdkhttpclient.DefaultTimeoutOptions,
|
||||
},
|
||||
res: &http.Response{
|
||||
StatusCode: 200,
|
||||
Body: ioutil.NopCloser(bytes.NewReader([]byte(`{}`))),
|
||||
},
|
||||
}
|
||||
queryData, _ := querydata.New(
|
||||
httpProvider,
|
||||
setting.NewCfg(),
|
||||
&fakeFeatureToggles{enabled: true},
|
||||
tracer,
|
||||
backend.DataSourceInstanceSettings{URL: "http://localhost:9090", JSONData: json.RawMessage(`{"timeInterval": "15s"}`)},
|
||||
&fakeLogger{},
|
||||
)
|
||||
|
||||
return &testContext{
|
||||
httpProvider: httpProvider,
|
||||
queryData: queryData,
|
||||
}
|
||||
}
|
||||
|
||||
type fakeFeatureToggles struct {
|
||||
enabled bool
|
||||
}
|
||||
|
||||
func (f *fakeFeatureToggles) IsEnabled(feature string) bool {
|
||||
return f.enabled
|
||||
}
|
||||
|
||||
type fakeHttpClientProvider struct {
|
||||
httpclient.Provider
|
||||
opts sdkhttpclient.Options
|
||||
res *http.Response
|
||||
}
|
||||
|
||||
func (p *fakeHttpClientProvider) New(opts ...sdkhttpclient.Options) (*http.Client, error) {
|
||||
p.opts = opts[0]
|
||||
c, err := sdkhttpclient.New(opts[0])
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
c.Transport = p
|
||||
return c, nil
|
||||
}
|
||||
|
||||
func (p *fakeHttpClientProvider) GetTransport(opts ...sdkhttpclient.Options) (http.RoundTripper, error) {
|
||||
p.opts = opts[0]
|
||||
return http.DefaultTransport, nil
|
||||
}
|
||||
|
||||
func (p *fakeHttpClientProvider) setResponse(res *http.Response) {
|
||||
p.res = res
|
||||
}
|
||||
|
||||
func (p *fakeHttpClientProvider) RoundTrip(req *http.Request) (*http.Response, error) {
|
||||
return p.res, nil
|
||||
}
|
||||
@@ -0,0 +1,111 @@
|
||||
package querydata
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"sort"
|
||||
"strings"
|
||||
|
||||
"github.com/grafana/grafana-plugin-sdk-go/backend"
|
||||
"github.com/grafana/grafana-plugin-sdk-go/data"
|
||||
"github.com/grafana/grafana/pkg/tsdb/prometheus/models"
|
||||
"github.com/grafana/grafana/pkg/util/converter"
|
||||
jsoniter "github.com/json-iterator/go"
|
||||
)
|
||||
|
||||
func (s *QueryData) parseResponse(ctx context.Context, q *models.Query, res *http.Response) (*backend.DataResponse, error) {
|
||||
defer func() {
|
||||
if err := res.Body.Close(); err != nil {
|
||||
s.log.Error("Failed to close response body", "err", err)
|
||||
}
|
||||
}()
|
||||
|
||||
iter := jsoniter.Parse(jsoniter.ConfigDefault, res.Body, 1024)
|
||||
r := converter.ReadPrometheusStyleResult(iter)
|
||||
if r == nil {
|
||||
return nil, fmt.Errorf("received empty response from prometheus")
|
||||
}
|
||||
|
||||
// The ExecutedQueryString can be viewed in QueryInspector in UI
|
||||
for _, frame := range r.Frames {
|
||||
addMetadataToFrame(q, frame)
|
||||
}
|
||||
|
||||
return r, nil
|
||||
}
|
||||
|
||||
func addMetadataToFrame(q *models.Query, frame *data.Frame) {
|
||||
if frame.Meta == nil {
|
||||
frame.Meta = &data.FrameMeta{}
|
||||
}
|
||||
frame.Meta.ExecutedQueryString = executedQueryString(q)
|
||||
if len(frame.Fields) < 2 {
|
||||
return
|
||||
}
|
||||
frame.Name = getName(q, frame)
|
||||
frame.Fields[0].Config = &data.FieldConfig{Interval: float64(q.Step.Milliseconds())}
|
||||
if frame.Name != "" {
|
||||
frame.Fields[1].Config = &data.FieldConfig{DisplayNameFromDS: frame.Name}
|
||||
}
|
||||
}
|
||||
|
||||
// this is based on the logic from the String() function in github.com/prometheus/common/model.go
|
||||
func metricNameFromLabels(f *data.Frame) string {
|
||||
labels := f.Fields[1].Labels
|
||||
metricName, hasName := labels["__name__"]
|
||||
numLabels := len(labels) - 1
|
||||
if !hasName {
|
||||
numLabels = len(labels)
|
||||
}
|
||||
labelStrings := make([]string, 0, numLabels)
|
||||
for label, value := range labels {
|
||||
if label != "__name__" {
|
||||
labelStrings = append(labelStrings, fmt.Sprintf("%s=%q", label, value))
|
||||
}
|
||||
}
|
||||
|
||||
switch numLabels {
|
||||
case 0:
|
||||
if hasName {
|
||||
return metricName
|
||||
}
|
||||
return "{}"
|
||||
default:
|
||||
sort.Strings(labelStrings)
|
||||
return fmt.Sprintf("%s{%s}", metricName, strings.Join(labelStrings, ", "))
|
||||
}
|
||||
}
|
||||
|
||||
func executedQueryString(q *models.Query) string {
|
||||
return "Expr: " + q.Expr + "\n" + "Step: " + q.Step.String()
|
||||
}
|
||||
|
||||
func getName(q *models.Query, frame *data.Frame) string {
|
||||
labels := frame.Fields[1].Labels
|
||||
legend := metricNameFromLabels(frame)
|
||||
|
||||
if q.LegendFormat == legendFormatAuto && len(labels) > 0 {
|
||||
return ""
|
||||
}
|
||||
|
||||
if q.LegendFormat != "" {
|
||||
result := legendFormatRegexp.ReplaceAllFunc([]byte(q.LegendFormat), func(in []byte) []byte {
|
||||
labelName := strings.Replace(string(in), "{{", "", 1)
|
||||
labelName = strings.Replace(labelName, "}}", "", 1)
|
||||
labelName = strings.TrimSpace(labelName)
|
||||
if val, exists := labels[labelName]; exists {
|
||||
return []byte(val)
|
||||
}
|
||||
return []byte{}
|
||||
})
|
||||
legend = string(result)
|
||||
}
|
||||
|
||||
// If legend is empty brackets, use query expression
|
||||
if legend == "{}" {
|
||||
return q.Expr
|
||||
}
|
||||
|
||||
return legend
|
||||
}
|
||||
Reference in New Issue
Block a user