c35c689a96
Automatically forward core plugin request HTTP headers in outgoing HTTP requests. Core datasource plugin authors don't have to specifically handle forwarding of HTTP headers, e.g. do not have to "hardcode" the header-names in the datasource plugin, if not having custom needs. Fixes #57065
432 lines
13 KiB
Go
432 lines
13 KiB
Go
package conditions
|
|
|
|
import (
|
|
"errors"
|
|
"fmt"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/grafana/grafana/pkg/tsdb/legacydata"
|
|
"github.com/grafana/grafana/pkg/tsdb/legacydata/interval"
|
|
"github.com/grafana/grafana/pkg/tsdb/prometheus"
|
|
|
|
gocontext "context"
|
|
|
|
"github.com/grafana/grafana-plugin-sdk-go/data"
|
|
|
|
"github.com/grafana/grafana/pkg/components/null"
|
|
"github.com/grafana/grafana/pkg/components/simplejson"
|
|
"github.com/grafana/grafana/pkg/services/alerting"
|
|
"github.com/grafana/grafana/pkg/services/datasources"
|
|
ngalertmodels "github.com/grafana/grafana/pkg/services/ngalert/models"
|
|
)
|
|
|
|
func init() {
|
|
alerting.RegisterCondition("query", func(model *simplejson.Json, index int) (alerting.Condition, error) {
|
|
return newQueryCondition(model, index)
|
|
})
|
|
}
|
|
|
|
// QueryCondition is responsible for issue and query, reduce the
|
|
// timeseries into single values and evaluate if they are firing or not.
|
|
type QueryCondition struct {
|
|
Index int
|
|
Query AlertQuery
|
|
Reducer *queryReducer
|
|
Evaluator AlertEvaluator
|
|
Operator string
|
|
}
|
|
|
|
// AlertQuery contains information about what datasource a query
|
|
// should be sent to and the query object.
|
|
type AlertQuery struct {
|
|
Model *simplejson.Json
|
|
DatasourceID int64
|
|
From string
|
|
To string
|
|
}
|
|
|
|
// Eval evaluates the `QueryCondition`.
|
|
func (c *QueryCondition) Eval(context *alerting.EvalContext, requestHandler legacydata.RequestHandler) (*alerting.ConditionResult, error) {
|
|
timeRange := legacydata.NewDataTimeRange(c.Query.From, c.Query.To)
|
|
|
|
seriesList, err := c.executeQuery(context, timeRange, requestHandler)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
emptySeriesCount := 0
|
|
evalMatchCount := 0
|
|
|
|
// matches represents all the series that violate the alert condition
|
|
var matches []*alerting.EvalMatch
|
|
// allMatches capture all evaluation matches irregardless on whether the condition is met or not
|
|
var allMatches []*alerting.EvalMatch
|
|
|
|
for _, series := range seriesList {
|
|
reducedValue := c.Reducer.Reduce(series)
|
|
evalMatch := c.Evaluator.Eval(reducedValue)
|
|
|
|
if !reducedValue.Valid {
|
|
emptySeriesCount++
|
|
}
|
|
|
|
if context.IsTestRun {
|
|
context.Logs = append(context.Logs, &alerting.ResultLogEntry{
|
|
Message: fmt.Sprintf("Condition[%d]: Eval: %v, Metric: %s, Value: %s", c.Index, evalMatch, series.Name, reducedValue),
|
|
})
|
|
}
|
|
|
|
em := alerting.EvalMatch{
|
|
Metric: series.Name,
|
|
Value: reducedValue,
|
|
Tags: series.Tags,
|
|
}
|
|
|
|
allMatches = append(allMatches, &em)
|
|
|
|
if evalMatch {
|
|
evalMatchCount++
|
|
matches = append(matches, &em)
|
|
}
|
|
}
|
|
|
|
// handle no series special case
|
|
if len(seriesList) == 0 {
|
|
// eval condition for null value
|
|
evalMatch := c.Evaluator.Eval(null.FloatFromPtr(nil))
|
|
|
|
if context.IsTestRun {
|
|
context.Logs = append(context.Logs, &alerting.ResultLogEntry{
|
|
Message: fmt.Sprintf("Condition: Eval: %v, Query Returned No Series (reduced to null/no value)", evalMatch),
|
|
})
|
|
}
|
|
|
|
if evalMatch {
|
|
evalMatchCount++
|
|
matches = append(matches, &alerting.EvalMatch{Metric: "NoData", Value: null.FloatFromPtr(nil)})
|
|
}
|
|
}
|
|
|
|
return &alerting.ConditionResult{
|
|
Firing: evalMatchCount > 0,
|
|
NoDataFound: emptySeriesCount == len(seriesList),
|
|
Operator: c.Operator,
|
|
EvalMatches: matches,
|
|
AllMatches: allMatches,
|
|
}, nil
|
|
}
|
|
|
|
func calculateInterval(timeRange legacydata.DataTimeRange, model *simplejson.Json, dsInfo *datasources.DataSource) (time.Duration, error) {
|
|
// if there is no min-interval specified in the datasource or in the dashboard-panel,
|
|
// the value of 1ms is used (this is how it is done in the dashboard-interval-calculation too,
|
|
// see https://github.com/grafana/grafana/blob/9a0040c0aeaae8357c650cec2ee644a571dddf3d/packages/grafana-data/src/datetime/rangeutil.ts#L264)
|
|
defaultMinInterval := time.Millisecond * 1
|
|
|
|
// interval.GetIntervalFrom has two problems (but they do not affect us here):
|
|
// - it returns the min-interval, so it should be called interval.GetMinIntervalFrom
|
|
// - it falls back to model.intervalMs. it should not, because that one is the real final
|
|
// interval-value calculated by the browser. but, in this specific case (old-alert),
|
|
// that value is not set, so the fallback never happens.
|
|
minInterval, err := interval.GetIntervalFrom(dsInfo, model, defaultMinInterval)
|
|
|
|
if err != nil {
|
|
return time.Duration(0), err
|
|
}
|
|
|
|
calc := interval.NewCalculator()
|
|
|
|
interval := calc.Calculate(timeRange, minInterval)
|
|
|
|
return interval.Value, nil
|
|
}
|
|
|
|
func (c *QueryCondition) executeQuery(context *alerting.EvalContext, timeRange legacydata.DataTimeRange,
|
|
requestHandler legacydata.RequestHandler) (legacydata.DataTimeSeriesSlice, error) {
|
|
getDsInfo := &datasources.GetDataSourceQuery{
|
|
Id: c.Query.DatasourceID,
|
|
OrgId: context.Rule.OrgID,
|
|
}
|
|
|
|
if err := context.GetDataSource(context.Ctx, getDsInfo); err != nil {
|
|
return nil, fmt.Errorf("could not find datasource: %w", err)
|
|
}
|
|
|
|
err := context.RequestValidator.Validate(getDsInfo.Result.Url, nil)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("access denied: %w", err)
|
|
}
|
|
|
|
req, err := c.getRequestForAlertRule(getDsInfo.Result, timeRange, context.IsDebug)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("interval calculation failed: %w", err)
|
|
}
|
|
result := make(legacydata.DataTimeSeriesSlice, 0)
|
|
|
|
if context.IsDebug {
|
|
data := simplejson.New()
|
|
if req.TimeRange != nil {
|
|
data.Set("from", req.TimeRange.GetFromAsMsEpoch())
|
|
data.Set("to", req.TimeRange.GetToAsMsEpoch())
|
|
}
|
|
|
|
type queryDto struct {
|
|
RefID string `json:"refId"`
|
|
Model *simplejson.Json `json:"model"`
|
|
Datasource *simplejson.Json `json:"datasource"`
|
|
MaxDataPoints int64 `json:"maxDataPoints"`
|
|
IntervalMS int64 `json:"intervalMs"`
|
|
}
|
|
|
|
queries := []*queryDto{}
|
|
for _, q := range req.Queries {
|
|
queries = append(queries, &queryDto{
|
|
RefID: q.RefID,
|
|
Model: q.Model,
|
|
Datasource: simplejson.NewFromAny(map[string]interface{}{
|
|
"id": q.DataSource.Id,
|
|
"name": q.DataSource.Name,
|
|
}),
|
|
MaxDataPoints: q.MaxDataPoints,
|
|
IntervalMS: q.IntervalMS,
|
|
})
|
|
}
|
|
|
|
data.Set("queries", queries)
|
|
|
|
context.Logs = append(context.Logs, &alerting.ResultLogEntry{
|
|
Message: fmt.Sprintf("Condition[%d]: Query", c.Index),
|
|
Data: data,
|
|
})
|
|
}
|
|
|
|
resp, err := requestHandler.HandleRequest(context.Ctx, getDsInfo.Result, req)
|
|
if err != nil {
|
|
return nil, toCustomError(err)
|
|
}
|
|
|
|
for _, v := range resp.Results {
|
|
if v.Error != nil {
|
|
return nil, fmt.Errorf("request handler response error %v", v)
|
|
}
|
|
|
|
// If there are dataframes but no series on the result
|
|
useDataframes := v.Dataframes != nil && (v.Series == nil || len(v.Series) == 0)
|
|
|
|
if useDataframes { // convert the dataframes to plugins.DataTimeSeries
|
|
frames, err := v.Dataframes.Decoded()
|
|
if err != nil {
|
|
return nil, fmt.Errorf("%v: %w", "request handler failed to unmarshal arrow dataframes from bytes", err)
|
|
}
|
|
|
|
for _, frame := range frames {
|
|
ss, err := FrameToSeriesSlice(frame)
|
|
if err != nil {
|
|
return nil, fmt.Errorf(
|
|
`request handler failed to convert dataframe "%v" to plugins.DataTimeSeriesSlice: %w`, frame.Name, err)
|
|
}
|
|
result = append(result, ss...)
|
|
}
|
|
} else {
|
|
result = append(result, v.Series...)
|
|
}
|
|
|
|
queryResultData := map[string]interface{}{}
|
|
|
|
if context.IsTestRun {
|
|
queryResultData["series"] = result
|
|
}
|
|
|
|
if context.IsDebug && v.Meta != nil {
|
|
queryResultData["meta"] = v.Meta
|
|
}
|
|
|
|
if context.IsTestRun || context.IsDebug {
|
|
if useDataframes {
|
|
queryResultData["fromDataframe"] = true
|
|
}
|
|
context.Logs = append(context.Logs, &alerting.ResultLogEntry{
|
|
Message: fmt.Sprintf("Condition[%d]: Query Result", c.Index),
|
|
Data: simplejson.NewFromAny(queryResultData),
|
|
})
|
|
}
|
|
}
|
|
|
|
return result, nil
|
|
}
|
|
|
|
func (c *QueryCondition) getRequestForAlertRule(datasource *datasources.DataSource, timeRange legacydata.DataTimeRange,
|
|
debug bool) (legacydata.DataQuery, error) {
|
|
queryModel := c.Query.Model
|
|
|
|
calculatedInterval, err := calculateInterval(timeRange, queryModel, datasource)
|
|
if err != nil {
|
|
return legacydata.DataQuery{}, err
|
|
}
|
|
|
|
req := legacydata.DataQuery{
|
|
TimeRange: &timeRange,
|
|
Queries: []legacydata.DataSubQuery{
|
|
{
|
|
RefID: "A",
|
|
Model: queryModel,
|
|
DataSource: datasource,
|
|
QueryType: queryModel.Get("queryType").MustString(""),
|
|
MaxDataPoints: interval.DefaultRes,
|
|
IntervalMS: calculatedInterval.Milliseconds(),
|
|
},
|
|
},
|
|
Headers: map[string]string{
|
|
ngalertmodels.FromAlertHeaderName: "true",
|
|
ngalertmodels.CacheSkipHeaderName: "true",
|
|
},
|
|
Debug: debug,
|
|
}
|
|
|
|
return req, nil
|
|
}
|
|
|
|
func newQueryCondition(model *simplejson.Json, index int) (*QueryCondition, error) {
|
|
condition := QueryCondition{}
|
|
condition.Index = index
|
|
|
|
queryJSON := model.Get("query")
|
|
|
|
condition.Query.Model = queryJSON.Get("model")
|
|
condition.Query.From = queryJSON.Get("params").MustArray()[1].(string)
|
|
condition.Query.To = queryJSON.Get("params").MustArray()[2].(string)
|
|
|
|
if err := validateFromValue(condition.Query.From); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if err := validateToValue(condition.Query.To); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
condition.Query.DatasourceID = queryJSON.Get("datasourceId").MustInt64()
|
|
|
|
reducerJSON := model.Get("reducer")
|
|
condition.Reducer = newSimpleReducer(reducerJSON.Get("type").MustString())
|
|
|
|
evaluatorJSON := model.Get("evaluator")
|
|
evaluator, err := NewAlertEvaluator(evaluatorJSON)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("error in condition %v: %v", index, err)
|
|
}
|
|
condition.Evaluator = evaluator
|
|
|
|
operatorJSON := model.Get("operator")
|
|
operator := operatorJSON.Get("type").MustString("and")
|
|
condition.Operator = operator
|
|
|
|
return &condition, nil
|
|
}
|
|
|
|
func validateFromValue(from string) error {
|
|
fromRaw := strings.Replace(from, "now-", "", 1)
|
|
|
|
_, err := time.ParseDuration("-" + fromRaw)
|
|
return err
|
|
}
|
|
|
|
func validateToValue(to string) error {
|
|
if to == "now" {
|
|
return nil
|
|
} else if strings.HasPrefix(to, "now-") {
|
|
withoutNow := strings.Replace(to, "now-", "", 1)
|
|
|
|
_, err := time.ParseDuration("-" + withoutNow)
|
|
if err == nil {
|
|
return nil
|
|
}
|
|
}
|
|
|
|
_, err := time.ParseDuration(to)
|
|
return err
|
|
}
|
|
|
|
// FrameToSeriesSlice converts a frame that is a valid time series as per data.TimeSeriesSchema()
|
|
// to a DataTimeSeriesSlice.
|
|
func FrameToSeriesSlice(frame *data.Frame) (legacydata.DataTimeSeriesSlice, error) {
|
|
tsSchema := frame.TimeSeriesSchema()
|
|
if tsSchema.Type == data.TimeSeriesTypeNot {
|
|
// If no fields, or only a time field, create an empty plugins.DataTimeSeriesSlice with a single
|
|
// time series in order to trigger "no data" in alerting.
|
|
if frame.Rows() == 0 || (len(frame.Fields) == 1 && frame.Fields[0].Type().Time()) {
|
|
return legacydata.DataTimeSeriesSlice{{
|
|
Name: frame.Name,
|
|
Points: make(legacydata.DataTimeSeriesPoints, 0),
|
|
}}, nil
|
|
}
|
|
return nil, fmt.Errorf("input frame is not recognized as a time series")
|
|
}
|
|
seriesCount := len(tsSchema.ValueIndices)
|
|
seriesSlice := make(legacydata.DataTimeSeriesSlice, 0, seriesCount)
|
|
timeField := frame.Fields[tsSchema.TimeIndex]
|
|
timeNullFloatSlice := make([]null.Float, timeField.Len())
|
|
|
|
for i := 0; i < timeField.Len(); i++ { // built slice of time as epoch ms in null floats
|
|
tStamp, err := timeField.FloatAt(i)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
timeNullFloatSlice[i] = null.FloatFrom(tStamp)
|
|
}
|
|
|
|
for _, fieldIdx := range tsSchema.ValueIndices { // create a TimeSeries for each value Field
|
|
field := frame.Fields[fieldIdx]
|
|
ts := legacydata.DataTimeSeries{
|
|
Points: make(legacydata.DataTimeSeriesPoints, field.Len()),
|
|
}
|
|
|
|
if len(field.Labels) > 0 {
|
|
ts.Tags = field.Labels.Copy()
|
|
}
|
|
|
|
switch {
|
|
case field.Config != nil && field.Config.DisplayName != "":
|
|
ts.Name = field.Config.DisplayName
|
|
case field.Config != nil && field.Config.DisplayNameFromDS != "":
|
|
ts.Name = field.Config.DisplayNameFromDS
|
|
case len(field.Labels) > 0:
|
|
// Tags are appended to the name so they are eventually included in EvalMatch's Metric property
|
|
// for display in notifications.
|
|
ts.Name = fmt.Sprintf("%v {%v}", field.Name, field.Labels.String())
|
|
default:
|
|
ts.Name = field.Name
|
|
}
|
|
|
|
for rowIdx := 0; rowIdx < field.Len(); rowIdx++ { // for each value in the field, make a TimePoint
|
|
val, err := field.FloatAt(rowIdx)
|
|
if err != nil {
|
|
return nil, fmt.Errorf(
|
|
"failed to convert frame to DataTimeSeriesSlice, can not convert value %v to float: %w", field.At(rowIdx), err)
|
|
}
|
|
ts.Points[rowIdx] = legacydata.DataTimePoint{
|
|
null.FloatFrom(val),
|
|
timeNullFloatSlice[rowIdx],
|
|
}
|
|
}
|
|
|
|
seriesSlice = append(seriesSlice, ts)
|
|
}
|
|
|
|
return seriesSlice, nil
|
|
}
|
|
|
|
func toCustomError(err error) error {
|
|
// is context timeout
|
|
if errors.Is(err, gocontext.DeadlineExceeded) {
|
|
return fmt.Errorf("alert execution exceeded the timeout")
|
|
}
|
|
|
|
// is Prometheus error
|
|
if prometheus.IsAPIError(err) {
|
|
return prometheus.ConvertAPIError(err)
|
|
}
|
|
|
|
// generic fallback
|
|
return fmt.Errorf("request handler error: %w", err)
|
|
}
|