Prometheus: Handle errors in buffered client (#58504)

* Handle prometheus errors in buffered client

* Handle prometheus warnings

* Fix tests

* Add unit test for warnings
This commit is contained in:
ismail simsek
2022-11-11 17:49:46 +03:00
committed by GitHub
parent 0bd120e01b
commit 8edeb1aa22
2 changed files with 145 additions and 44 deletions
@@ -3,6 +3,7 @@ package buffered
import (
"context"
"encoding/json"
"errors"
"fmt"
"math"
"net/http"
@@ -66,6 +67,11 @@ type Buffered struct {
TimeInterval string
}
type bufferedResponse struct {
Response interface{}
Warnings apiv1.Warnings
}
// New creates and object capable of executing and parsing a Prometheus queries. It's "buffered" because there is
// another implementation capable of streaming parse the response.
func New(roundTripper http.RoundTripper, tracer tracing.Tracer, settings backend.DataSourceInstanceSettings, plog log.Logger) (*Buffered, error) {
@@ -143,7 +149,7 @@ func (b *Buffered) runQuery(ctx context.Context, query *PrometheusQuery) (backen
logger := b.log.FromContext(ctx) // read trace-id and other info from the context
logger.Debug("Sending query", "start", query.Start, "end", query.End, "step", query.Step, "query", query.Expr)
response := make(map[TimeSeriesQueryType]interface{})
response := make(map[TimeSeriesQueryType]bufferedResponse)
timeRange := apiv1.Range{
Step: query.Step,
@@ -153,21 +159,39 @@ func (b *Buffered) runQuery(ctx context.Context, query *PrometheusQuery) (backen
}
if query.RangeQuery {
rangeResponse, _, err := b.client.QueryRange(ctx, query.Expr, timeRange)
rangeResponse, warnings, err := b.client.QueryRange(ctx, query.Expr, timeRange)
if err != nil {
var promErr *apiv1.Error
if errors.As(err, &promErr) {
logger.Error("Range query failed", "query", query.Expr, "error", err, "detail", promErr.Detail)
return backend.DataResponse{Error: fmt.Errorf("%w: details: %s", err, promErr.Detail)}, nil
}
logger.Error("Range query failed", "query", query.Expr, "err", err)
return backend.DataResponse{Error: err}, nil
}
response[RangeQueryType] = rangeResponse
response[RangeQueryType] = bufferedResponse{
Response: rangeResponse,
Warnings: warnings,
}
}
if query.InstantQuery {
instantResponse, _, err := b.client.Query(ctx, query.Expr, query.End)
instantResponse, warnings, err := b.client.Query(ctx, query.Expr, query.End)
if err != nil {
var promErr *apiv1.Error
if errors.As(err, &promErr) {
logger.Error("Instant query failed", "query", query.Expr, "error", err, "detail", promErr.Detail)
return backend.DataResponse{Error: fmt.Errorf("%w: details: %s", err, promErr.Detail)}, nil
}
logger.Error("Instant query failed", "query", query.Expr, "err", err)
return backend.DataResponse{Error: err}, nil
}
response[InstantQueryType] = instantResponse
response[InstantQueryType] = bufferedResponse{
Response: instantResponse,
Warnings: warnings,
}
}
// This is a special case
@@ -177,7 +201,10 @@ func (b *Buffered) runQuery(ctx context.Context, query *PrometheusQuery) (backen
if err != nil {
logger.Error("Exemplar query failed", "query", query.Expr, "err", err)
} else {
response[ExemplarQueryType] = exemplarResponse
response[ExemplarQueryType] = bufferedResponse{
Response: exemplarResponse,
Warnings: nil,
}
}
}
@@ -270,17 +297,17 @@ func (b *Buffered) parseTimeSeriesQuery(req *backend.QueryDataRequest) ([]*Prome
return qs, nil
}
func parseTimeSeriesResponse(value map[TimeSeriesQueryType]interface{}, query *PrometheusQuery) (data.Frames, error) {
func parseTimeSeriesResponse(value map[TimeSeriesQueryType]bufferedResponse, query *PrometheusQuery) (data.Frames, error) {
var (
frames = data.Frames{}
nextFrames = data.Frames{}
)
for _, value := range value {
for _, val := range value {
// Zero out the slice to prevent data corruption.
nextFrames = nextFrames[:0]
switch v := value.(type) {
switch v := val.Response.(type) {
case model.Matrix:
nextFrames = matrixToDataFrames(v, query, nextFrames)
case model.Vector:
@@ -293,12 +320,35 @@ func parseTimeSeriesResponse(value map[TimeSeriesQueryType]interface{}, query *P
return nil, fmt.Errorf("unexpected result type: %s query: %s", v, query.Expr)
}
if len(val.Warnings) > 0 {
for _, frame := range nextFrames {
if frame.Meta == nil {
frame.Meta = &data.FrameMeta{}
}
frame.Meta.Notices = readWarnings(val.Warnings)
}
}
frames = append(frames, nextFrames...)
}
return frames, nil
}
func readWarnings(warnings apiv1.Warnings) []data.Notice {
notices := []data.Notice{}
for _, w := range warnings {
notice := data.Notice{
Severity: data.NoticeSeverityWarning,
Text: w,
}
notices = append(notices, notice)
}
return notices
}
func calculatePrometheusInterval(model *QueryModel, timeInterval string, query backend.DataQuery, intervalCalculator intervalv2.Calculator) (time.Duration, error) {
queryInterval := model.Interval
@@ -598,7 +598,7 @@ func TestPrometheus_timeSeriesQuery_parseTimeSeriesQuery(t *testing.T) {
func TestPrometheus_parseTimeSeriesResponse(t *testing.T) {
t.Run("exemplars response should be sampled and parsed normally", func(t *testing.T) {
value := make(map[TimeSeriesQueryType]interface{})
value := make(map[TimeSeriesQueryType]bufferedResponse)
exemplars := []apiv1.ExemplarQueryResult{
{
SeriesLabels: p.LabelSet{
@@ -631,7 +631,10 @@ func TestPrometheus_parseTimeSeriesResponse(t *testing.T) {
},
}
value[ExemplarQueryType] = exemplars
value[ExemplarQueryType] = bufferedResponse{
Response: exemplars,
Warnings: nil,
}
query := &PrometheusQuery{
LegendFormat: "legend {{app}}",
}
@@ -652,7 +655,7 @@ func TestPrometheus_parseTimeSeriesResponse(t *testing.T) {
})
t.Run("exemplars response with inconsistent labels should marshal json ok", func(t *testing.T) {
value := make(map[TimeSeriesQueryType]interface{})
value := make(map[TimeSeriesQueryType]bufferedResponse)
exemplars := []apiv1.ExemplarQueryResult{
{
SeriesLabels: p.LabelSet{
@@ -685,7 +688,10 @@ func TestPrometheus_parseTimeSeriesResponse(t *testing.T) {
},
}
value[ExemplarQueryType] = exemplars
value[ExemplarQueryType] = bufferedResponse{
Response: exemplars,
Warnings: nil,
}
query := &PrometheusQuery{
LegendFormat: "legend {{app}}",
}
@@ -723,12 +729,15 @@ func TestPrometheus_parseTimeSeriesResponse(t *testing.T) {
{Value: 4, Timestamp: 4000},
{Value: 5, Timestamp: 5000},
}
value := make(map[TimeSeriesQueryType]interface{})
value[RangeQueryType] = p.Matrix{
&p.SampleStream{
Metric: p.Metric{"app": "Application", "tag2": "tag2"},
Values: values,
value := make(map[TimeSeriesQueryType]bufferedResponse)
value[RangeQueryType] = bufferedResponse{
Response: p.Matrix{
&p.SampleStream{
Metric: p.Metric{"app": "Application", "tag2": "tag2"},
Values: values,
},
},
Warnings: nil,
}
query := &PrometheusQuery{
LegendFormat: "legend {{app}}",
@@ -760,12 +769,15 @@ func TestPrometheus_parseTimeSeriesResponse(t *testing.T) {
{Value: 1, Timestamp: 1000},
{Value: 4, Timestamp: 4000},
}
value := make(map[TimeSeriesQueryType]interface{})
value[RangeQueryType] = p.Matrix{
&p.SampleStream{
Metric: p.Metric{"app": "Application", "tag2": "tag2"},
Values: values,
value := make(map[TimeSeriesQueryType]bufferedResponse)
value[RangeQueryType] = bufferedResponse{
Response: p.Matrix{
&p.SampleStream{
Metric: p.Metric{"app": "Application", "tag2": "tag2"},
Values: values,
},
},
Warnings: nil,
}
query := &PrometheusQuery{
LegendFormat: "",
@@ -791,12 +803,15 @@ func TestPrometheus_parseTimeSeriesResponse(t *testing.T) {
{Value: 1, Timestamp: 1000},
{Value: 4, Timestamp: 4000},
}
value := make(map[TimeSeriesQueryType]interface{})
value[RangeQueryType] = p.Matrix{
&p.SampleStream{
Metric: p.Metric{"app": "Application", "tag2": "tag2"},
Values: values,
value := make(map[TimeSeriesQueryType]bufferedResponse)
value[RangeQueryType] = bufferedResponse{
Response: p.Matrix{
&p.SampleStream{
Metric: p.Metric{"app": "Application", "tag2": "tag2"},
Values: values,
},
},
Warnings: nil,
}
query := &PrometheusQuery{
LegendFormat: "",
@@ -820,14 +835,17 @@ func TestPrometheus_parseTimeSeriesResponse(t *testing.T) {
})
t.Run("matrix response with NaN value should be changed to null", func(t *testing.T) {
value := make(map[TimeSeriesQueryType]interface{})
value[RangeQueryType] = p.Matrix{
&p.SampleStream{
Metric: p.Metric{"app": "Application"},
Values: []p.SamplePair{
{Value: p.SampleValue(math.NaN()), Timestamp: 1000},
value := make(map[TimeSeriesQueryType]bufferedResponse)
value[RangeQueryType] = bufferedResponse{
Response: p.Matrix{
&p.SampleStream{
Metric: p.Metric{"app": "Application"},
Values: []p.SamplePair{
{Value: p.SampleValue(math.NaN()), Timestamp: 1000},
},
},
},
Warnings: nil,
}
query := &PrometheusQuery{
LegendFormat: "",
@@ -844,13 +862,16 @@ func TestPrometheus_parseTimeSeriesResponse(t *testing.T) {
})
t.Run("vector response should be parsed normally", func(t *testing.T) {
value := make(map[TimeSeriesQueryType]interface{})
value[RangeQueryType] = p.Vector{
&p.Sample{
Metric: p.Metric{"app": "Application", "tag2": "tag2"},
Value: 1,
Timestamp: 123,
value := make(map[TimeSeriesQueryType]bufferedResponse)
value[RangeQueryType] = bufferedResponse{
Response: p.Vector{
&p.Sample{
Metric: p.Metric{"app": "Application", "tag2": "tag2"},
Value: 1,
Timestamp: 123,
},
},
Warnings: nil,
}
query := &PrometheusQuery{
LegendFormat: "legend {{app}}",
@@ -876,10 +897,13 @@ func TestPrometheus_parseTimeSeriesResponse(t *testing.T) {
})
t.Run("scalar response should be parsed normally", func(t *testing.T) {
value := make(map[TimeSeriesQueryType]interface{})
value[RangeQueryType] = &p.Scalar{
Value: 1,
Timestamp: 123,
value := make(map[TimeSeriesQueryType]bufferedResponse)
value[RangeQueryType] = bufferedResponse{
Response: &p.Scalar{
Value: 1,
Timestamp: 123,
},
Warnings: nil,
}
query := &PrometheusQuery{}
@@ -899,6 +923,33 @@ func TestPrometheus_parseTimeSeriesResponse(t *testing.T) {
require.Equal(t, "UTC", testValue.(time.Time).Location().String())
require.Equal(t, int64(123), testValue.(time.Time).UnixMilli())
})
t.Run("warnings, if there is any, should be added to each frame",
func(t *testing.T) {
value := make(map[TimeSeriesQueryType]bufferedResponse)
value[RangeQueryType] = bufferedResponse{
Response: &p.Scalar{
Value: 1,
Timestamp: 123,
},
Warnings: []string{"warning1", "warning2"},
}
query := &PrometheusQuery{}
res, err := parseTimeSeriesResponse(value, query)
require.NoError(t, err)
require.Len(t, res, 1)
require.Equal(t, res[0].Name, "1")
require.Len(t, res[0].Fields, 2)
require.Len(t, res[0].Fields[0].Labels, 0)
require.Equal(t, res[0].Fields[0].Name, "Time")
require.Equal(t, res[0].Fields[1].Name, "Value")
require.Equal(t, res[0].Fields[1].Config.DisplayNameFromDS, "1")
require.Equal(t, res[0].Meta.Notices[0].Text, "warning1")
require.Equal(t, res[0].Meta.Notices[1].Text, "warning2")
})
}
func queryContext(json string, timeRange backend.TimeRange) *backend.QueryDataRequest {