diff --git a/pkg/expr/converter.go b/pkg/expr/converter.go new file mode 100644 index 00000000000..3b080f42e59 --- /dev/null +++ b/pkg/expr/converter.go @@ -0,0 +1,361 @@ +package expr + +import ( + "context" + "fmt" + + "github.com/grafana/grafana-plugin-sdk-go/backend" + "github.com/grafana/grafana-plugin-sdk-go/data" + + "github.com/grafana/grafana/pkg/expr/mathexp" + "github.com/grafana/grafana/pkg/infra/tracing" + "github.com/grafana/grafana/pkg/services/datasources" + "github.com/grafana/grafana/pkg/services/featuremgmt" +) + +type ResultConverter struct { + Features featuremgmt.FeatureToggles + Tracer tracing.Tracer +} + +func (c *ResultConverter) Convert(ctx context.Context, + datasourceType string, + frames data.Frames, + allowLongFrames bool, +) (string, mathexp.Results, error) { + if len(frames) == 0 { + return "no-data", mathexp.Results{Values: mathexp.Values{mathexp.NewNoData()}}, nil + } + + var dt data.FrameType + dt, useDataplane, _ := shouldUseDataplane(frames, logger, c.Features.IsEnabled(ctx, featuremgmt.FlagDisableSSEDataplane)) + if useDataplane { + logger.Debug("Handling SSE data source query through dataplane", "datatype", dt) + result, err := handleDataplaneFrames(ctx, c.Tracer, dt, frames) + return fmt.Sprintf("dataplane-%s", dt), result, err + } + + if isAllFrameVectors(datasourceType, frames) { // Prometheus Specific Handling + vals, err := framesToNumbers(frames) + if err != nil { + return "", mathexp.Results{}, fmt.Errorf("failed to read frames as numbers: %w", err) + } + return "vector", mathexp.Results{Values: vals}, nil + } + + if len(frames) == 1 { + frame := frames[0] + // Handle Untyped NoData + if len(frame.Fields) == 0 { + return "no-data", mathexp.Results{Values: mathexp.Values{mathexp.NoData{Frame: frame}}}, nil + } + + // Handle Numeric Table + if frame.TimeSeriesSchema().Type == data.TimeSeriesTypeNot && isNumberTable(frame) { + numberSet, err := extractNumberSet(frame) + if err != nil { + return "", mathexp.Results{}, err + } + vals := make([]mathexp.Value, 0, len(numberSet)) + for _, n := range numberSet { + vals = append(vals, n) + } + return "number set", mathexp.Results{ + Values: vals, + }, nil + } + } + + filtered := make([]*data.Frame, 0, len(frames)) + totalLen := 0 + for _, frame := range frames { + schema := frame.TimeSeriesSchema() + // Check for TimeSeriesTypeNot in InfluxDB queries. A data frame of this type will cause + // the WideToMany() function to error out, which results in unhealthy alerts. + // This check should be removed once inconsistencies in data source responses are solved. + if schema.Type == data.TimeSeriesTypeNot && datasourceType == datasources.DS_INFLUXDB { + logger.Warn("Ignoring InfluxDB data frame due to missing numeric fields") + continue + } + + if schema.Type != data.TimeSeriesTypeWide && !allowLongFrames { + return "", mathexp.Results{}, fmt.Errorf("input data must be a wide series but got type %s (input refid)", schema.Type) + } + filtered = append(filtered, frame) + totalLen += len(schema.ValueIndices) + } + + if len(filtered) == 0 { + return "no data", mathexp.Results{Values: mathexp.Values{mathexp.NoData{Frame: frames[0]}}}, nil + } + + maybeFixerFn := checkIfSeriesNeedToBeFixed(filtered, datasourceType) + + dataType := "single frame series" + if len(filtered) > 1 { + dataType = "multi frame series" + } + + vals := make([]mathexp.Value, 0, totalLen) + for _, frame := range filtered { + schema := frame.TimeSeriesSchema() + if schema.Type == data.TimeSeriesTypeWide { + series, err := WideToMany(frame, maybeFixerFn) + if err != nil { + return "", mathexp.Results{}, err + } + for _, ser := range series { + vals = append(vals, ser) + } + } else { + v := mathexp.TableData{Frame: frame} + vals = append(vals, v) + dataType = "single frame" + } + } + + return dataType, mathexp.Results{ + Values: vals, + }, nil +} + +func getResponseFrame(resp *backend.QueryDataResponse, refID string) (data.Frames, error) { + response, ok := resp.Responses[refID] + if !ok { + // This indicates that the RefID of the request was not included to the response, i.e. some problem in the data source plugin + keys := make([]string, 0, len(resp.Responses)) + for refID := range resp.Responses { + keys = append(keys, refID) + } + logger.Warn("Can't find response by refID. Return nodata", "responseRefIds", keys) + return nil, nil + } + + if response.Error != nil { + return nil, response.Error + } + return response.Frames, nil +} + +func isAllFrameVectors(datasourceType string, frames data.Frames) bool { + if datasourceType != datasources.DS_PROMETHEUS { + return false + } + allVector := false + for i, frame := range frames { + if frame.Meta != nil && frame.Meta.Custom != nil { + if sMap, ok := frame.Meta.Custom.(map[string]string); ok { + if sMap != nil { + if sMap["resultType"] == "vector" { + if i != 0 && !allVector { + break + } + allVector = true + } + } + } + } + } + return allVector +} + +func framesToNumbers(frames data.Frames) ([]mathexp.Value, error) { + vals := make([]mathexp.Value, 0, len(frames)) + for _, frame := range frames { + if frame == nil { + continue + } + if len(frame.Fields) == 2 && frame.Fields[0].Len() == 1 { + // Can there be zero Len Field results that are being skipped? + valueField := frame.Fields[1] + if valueField.Type().Numeric() { // should be []float64 + val, err := valueField.FloatAt(0) // FloatAt should not err if numeric + if err != nil { + return nil, fmt.Errorf("failed to read value of frame [%v] (RefID %v) of type [%v] as float: %w", frame.Name, frame.RefID, valueField.Type(), err) + } + n := mathexp.NewNumber(frame.Name, valueField.Labels) + n.SetValue(&val) + vals = append(vals, n) + } + } + } + return vals, nil +} + +func isNumberTable(frame *data.Frame) bool { + if frame == nil || frame.Fields == nil { + return false + } + numericCount := 0 + stringCount := 0 + otherCount := 0 + for _, field := range frame.Fields { + fType := field.Type() + switch { + case fType.Numeric(): + numericCount++ + case fType == data.FieldTypeString || fType == data.FieldTypeNullableString: + stringCount++ + default: + otherCount++ + } + } + return numericCount == 1 && otherCount == 0 +} + +func extractNumberSet(frame *data.Frame) ([]mathexp.Number, error) { + numericField := 0 + stringFieldIdxs := []int{} + stringFieldNames := []string{} + for i, field := range frame.Fields { + fType := field.Type() + switch { + case fType.Numeric(): + numericField = i + case fType == data.FieldTypeString || fType == data.FieldTypeNullableString: + stringFieldIdxs = append(stringFieldIdxs, i) + stringFieldNames = append(stringFieldNames, field.Name) + } + } + numbers := make([]mathexp.Number, frame.Rows()) + + for rowIdx := 0; rowIdx < frame.Rows(); rowIdx++ { + val, _ := frame.FloatAt(numericField, rowIdx) + var labels data.Labels + for i := 0; i < len(stringFieldIdxs); i++ { + if i == 0 { + labels = make(data.Labels) + } + key := stringFieldNames[i] // TODO check for duplicate string column names + val, _ := frame.ConcreteAt(stringFieldIdxs[i], rowIdx) + labels[key] = val.(string) // TODO check assertion / return error + } + + n := mathexp.NewNumber(frame.Fields[numericField].Name, labels) + + // The new value fields' configs gets pointed to the one in the original frame + n.Frame.Fields[0].Config = frame.Fields[numericField].Config + n.SetValue(&val) + + numbers[rowIdx] = n + } + return numbers, nil +} + +// WideToMany converts a data package wide type Frame to one or multiple Series. A series +// is created for each value type column of wide frame. +// +// This might not be a good idea long term, but works now as an adapter/shim. +func WideToMany(frame *data.Frame, fixSeries func(series mathexp.Series, valueField *data.Field)) ([]mathexp.Series, error) { + tsSchema := frame.TimeSeriesSchema() + if tsSchema.Type != data.TimeSeriesTypeWide { + return nil, fmt.Errorf("input data must be a wide series but got type %s", tsSchema.Type) + } + + if len(tsSchema.ValueIndices) == 1 { + s, err := mathexp.SeriesFromFrame(frame) + if err != nil { + return nil, err + } + if fixSeries != nil { + fixSeries(s, frame.Fields[tsSchema.ValueIndices[0]]) + } + return []mathexp.Series{s}, nil + } + + series := make([]mathexp.Series, 0, len(tsSchema.ValueIndices)) + for _, valIdx := range tsSchema.ValueIndices { + l := frame.Rows() + f := data.NewFrameOfFieldTypes(frame.Name, l, frame.Fields[tsSchema.TimeIndex].Type(), frame.Fields[valIdx].Type()) + f.Fields[0].Name = frame.Fields[tsSchema.TimeIndex].Name + f.Fields[1].Name = frame.Fields[valIdx].Name + + // The new value fields' configs gets pointed to the one in the original frame + f.Fields[1].Config = frame.Fields[valIdx].Config + + if frame.Fields[valIdx].Labels != nil { + f.Fields[1].Labels = frame.Fields[valIdx].Labels.Copy() + } + for i := 0; i < l; i++ { + f.SetRow(i, frame.Fields[tsSchema.TimeIndex].CopyAt(i), frame.Fields[valIdx].CopyAt(i)) + } + s, err := mathexp.SeriesFromFrame(f) + if err != nil { + return nil, err + } + if fixSeries != nil { + fixSeries(s, frame.Fields[valIdx]) + } + series = append(series, s) + } + + return series, nil +} + +// checkIfSeriesNeedToBeFixed scans all value fields of all provided frames and determines whether the resulting mathexp.Series +// needs to be updated so each series could be identifiable by labels. +// NOTE: applicable only to only datasources.DS_GRAPHITE and datasources.DS_TESTDATA data sources +// returns a function that patches the mathexp.Series with information from data.Field from which it was created if the all series need to be fixed. Otherwise, returns nil +func checkIfSeriesNeedToBeFixed(frames []*data.Frame, datasourceType string) func(series mathexp.Series, valueField *data.Field) { + if !(datasourceType == datasources.DS_GRAPHITE || datasourceType == datasources.DS_TESTDATA) { + return nil + } + + // get all value fields + var valueFields []*data.Field + for _, frame := range frames { + tsSchema := frame.TimeSeriesSchema() + for _, index := range tsSchema.ValueIndices { + field := frame.Fields[index] + // if at least one value field contains labels, the result does not need to be fixed. + if len(field.Labels) > 0 { + return nil + } + if valueFields == nil { + valueFields = make([]*data.Field, 0, len(frames)*len(tsSchema.ValueIndices)) + } + valueFields = append(valueFields, field) + } + } + + // selectors are in precedence order. + nameSelectors := []func(f *data.Field) string{ + func(f *data.Field) string { + if f == nil || f.Config == nil { + return "" + } + return f.Config.DisplayNameFromDS + }, + func(f *data.Field) string { + if f == nil || f.Config == nil { + return "" + } + return f.Config.DisplayName + }, + func(f *data.Field) string { + return f.Name + }, + } + + // now look for the first selector that would make all value fields be unique + for _, selector := range nameSelectors { + names := make(map[string]struct{}, len(valueFields)) + good := true + for _, field := range valueFields { + name := selector(field) + if _, ok := names[name]; ok || name == "" { + good = false + break + } + names[name] = struct{}{} + } + if good { + return func(series mathexp.Series, valueField *data.Field) { + series.SetLabels(data.Labels{ + nameLabelName: selector(valueField), + }) + } + } + } + return nil +} diff --git a/pkg/expr/converter_test.go b/pkg/expr/converter_test.go new file mode 100644 index 00000000000..2a484ea9673 --- /dev/null +++ b/pkg/expr/converter_test.go @@ -0,0 +1,121 @@ +package expr + +import ( + "context" + "testing" + "time" + + "github.com/grafana/grafana-plugin-sdk-go/data" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/grafana/grafana/pkg/expr/mathexp" + "github.com/grafana/grafana/pkg/infra/tracing" + "github.com/grafana/grafana/pkg/services/datasources" + "github.com/grafana/grafana/pkg/services/featuremgmt" + "github.com/grafana/grafana/pkg/setting" +) + +func TestConvertDataFramesToResults(t *testing.T) { + s := &Service{ + cfg: setting.NewCfg(), + features: &featuremgmt.FeatureManager{}, + tracer: tracing.InitializeTracerForTest(), + metrics: newMetrics(nil), + } + converter := &ResultConverter{Features: s.features, Tracer: s.tracer} + + t.Run("should add name label if no labels and specific data source", func(t *testing.T) { + supported := []string{datasources.DS_GRAPHITE, datasources.DS_TESTDATA} + t.Run("when only field name is specified", func(t *testing.T) { + t.Run("use value field names if one frame - many series", func(t *testing.T) { + supported := []string{datasources.DS_GRAPHITE, datasources.DS_TESTDATA} + + frames := []*data.Frame{ + data.NewFrame("test", + data.NewField("time", nil, []time.Time{time.Unix(1, 0)}), + data.NewField("test-value1", nil, []*float64{fp(2)}), + data.NewField("test-value2", nil, []*float64{fp(2)})), + } + + for _, dtype := range supported { + t.Run(dtype, func(t *testing.T) { + resultType, res, err := converter.Convert(context.Background(), dtype, frames, s.allowLongFrames) + require.NoError(t, err) + assert.Equal(t, "single frame series", resultType) + require.Len(t, res.Values, 2) + + var names []string + for _, value := range res.Values { + require.IsType(t, mathexp.Series{}, value) + lbls := value.GetLabels() + require.Contains(t, lbls, nameLabelName) + names = append(names, lbls[nameLabelName]) + } + require.EqualValues(t, []string{"test-value1", "test-value2"}, names) + }) + } + }) + t.Run("should use frame name if one frame - one series", func(t *testing.T) { + frames := []*data.Frame{ + data.NewFrame("test-frame1", + data.NewField("time", nil, []time.Time{time.Unix(1, 0)}), + data.NewField("test-value1", nil, []*float64{fp(2)})), + data.NewFrame("test-frame2", + data.NewField("time", nil, []time.Time{time.Unix(1, 0)}), + data.NewField("test-value2", nil, []*float64{fp(2)})), + } + + for _, dtype := range supported { + t.Run(dtype, func(t *testing.T) { + resultType, res, err := converter.Convert(context.Background(), dtype, frames, s.allowLongFrames) + require.NoError(t, err) + assert.Equal(t, "multi frame series", resultType) + require.Len(t, res.Values, 2) + + var names []string + for _, value := range res.Values { + require.IsType(t, mathexp.Series{}, value) + lbls := value.GetLabels() + require.Contains(t, lbls, nameLabelName) + names = append(names, lbls[nameLabelName]) + } + require.EqualValues(t, []string{"test-frame1", "test-frame2"}, names) + }) + } + }) + }) + t.Run("should use fields DisplayNameFromDS when it is unique", func(t *testing.T) { + f1 := data.NewField("test-value1", nil, []*float64{fp(2)}) + f1.Config = &data.FieldConfig{DisplayNameFromDS: "test-value1"} + f2 := data.NewField("test-value2", nil, []*float64{fp(2)}) + f2.Config = &data.FieldConfig{DisplayNameFromDS: "test-value2"} + frames := []*data.Frame{ + data.NewFrame("test-frame1", + data.NewField("time", nil, []time.Time{time.Unix(1, 0)}), + f1), + data.NewFrame("test-frame2", + data.NewField("time", nil, []time.Time{time.Unix(1, 0)}), + f2), + } + + for _, dtype := range supported { + t.Run(dtype, func(t *testing.T) { + resultType, res, err := converter.Convert(context.Background(), dtype, frames, s.allowLongFrames) + require.NoError(t, err) + assert.Equal(t, "multi frame series", resultType) + require.Len(t, res.Values, 2) + + var names []string + for _, value := range res.Values { + require.IsType(t, mathexp.Series{}, value) + lbls := value.GetLabels() + require.Contains(t, lbls, nameLabelName) + names = append(names, lbls[nameLabelName]) + } + require.EqualValues(t, []string{"test-value1", "test-value2"}, names) + }) + } + }) + }) +} diff --git a/pkg/expr/dataplane_test.go b/pkg/expr/dataplane_test.go index 186f0add4e8..674d4432f0a 100644 --- a/pkg/expr/dataplane_test.go +++ b/pkg/expr/dataplane_test.go @@ -50,12 +50,13 @@ func framesPassThroughService(t *testing.T, frames data.Frames) (data.Frames, er map[string]backend.DataResponse{"A": {Frames: frames}}, } + features := featuremgmt.WithFeatures() cfg := setting.NewCfg() s := Service{ cfg: cfg, dataService: me, - features: &featuremgmt.FeatureManager{}, + features: features, pCtxProvider: plugincontext.ProvideService(cfg, nil, &pluginstore.FakePluginStore{ PluginList: []pluginstore.Plugin{ {JSONData: plugins.JSONData{ID: "test"}}, @@ -64,6 +65,10 @@ func framesPassThroughService(t *testing.T, frames data.Frames) (data.Frames, er nil, pluginconfig.NewFakePluginRequestConfigProvider()), tracer: tracing.InitializeTracerForTest(), metrics: newMetrics(nil), + converter: &ResultConverter{ + Features: features, + Tracer: tracing.InitializeTracerForTest(), + }, } queries := []Query{{ RefID: "A", diff --git a/pkg/expr/ml.go b/pkg/expr/ml.go index 185b265474e..affdad77a95 100644 --- a/pkg/expr/ml.go +++ b/pkg/expr/ml.go @@ -130,7 +130,7 @@ func (m *MLNode) Execute(ctx context.Context, now time.Time, _ mathexp.Vars, s * } // process the response the same way DSNode does. Use plugin ID as data source type. Semantically, they are the same. - responseType, result, err = convertDataFramesToResults(ctx, dataFrames, mlPluginID, s, logger) + responseType, result, err = s.converter.Convert(ctx, mlPluginID, dataFrames, s.allowLongFrames) return result, err } diff --git a/pkg/expr/models.go b/pkg/expr/models.go index 6e7f0919adf..6fffd7d116c 100644 --- a/pkg/expr/models.go +++ b/pkg/expr/models.go @@ -50,8 +50,8 @@ type ResampleQuery struct { // The math expression Expression string `json:"expression" jsonschema:"minLength=1,example=$A + 1,example=$A"` - // The time durration - Window string `json:"window" jsonschema:"minLength=1,example=1w,example=10m"` + // The time duration + Window string `json:"window" jsonschema:"minLength=1,example=1d,example=10m"` // The downsample function Downsampler mathexp.ReducerID `json:"downsampler"` diff --git a/pkg/expr/nodes.go b/pkg/expr/nodes.go index 8ecf303cc4f..f5defd3b61e 100644 --- a/pkg/expr/nodes.go +++ b/pkg/expr/nodes.go @@ -9,9 +9,7 @@ import ( "time" "github.com/grafana/grafana-plugin-sdk-go/backend" - "github.com/grafana/grafana-plugin-sdk-go/data" - jsonitersdk "github.com/grafana/grafana-plugin-sdk-go/data/utils/jsoniter" - jsoniter "github.com/json-iterator/go" + "github.com/grafana/grafana-plugin-sdk-go/data/utils/jsoniter" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/codes" "gonum.org/v1/gonum/graph/simple" @@ -130,13 +128,12 @@ func buildCMDNode(rn *rawNode, toggles featuremgmt.FeatureToggles) (*CMDNode, er // NOTE: this structure of this is weird now, because it is targeting a structure // where this is actually run in the root loop, however we want to verify the individual // node parsing before changing the full tree parser - reader, err := NewExpressionQueryReader(toggles) + reader := NewExpressionQueryReader(toggles) + iter, err := jsoniter.ParseBytes(jsoniter.ConfigDefault, rn.QueryRaw) if err != nil { return nil, err } - - iter := jsoniter.ParseBytes(jsoniter.ConfigDefault, rn.QueryRaw) - q, err := reader.ReadQuery(rn, jsonitersdk.NewIterator(iter)) + q, err := reader.ReadQuery(rn, iter) if err != nil { return nil, err } @@ -325,7 +322,7 @@ func executeDSNodesGrouped(ctx context.Context, now time.Time, vars mathexp.Vars } var result mathexp.Results - responseType, result, err := convertDataFramesToResults(ctx, dataFrames, dn.datasource.Type, s, logger) + responseType, result, err := s.converter.Convert(ctx, dn.datasource.Type, dataFrames, s.allowLongFrames) if err != nil { result.Error = makeConversionError(dn.RefID(), err) } @@ -393,347 +390,9 @@ func (dn *DSNode) Execute(ctx context.Context, now time.Time, _ mathexp.Vars, s } var result mathexp.Results - responseType, result, err = convertDataFramesToResults(ctx, dataFrames, dn.datasource.Type, s, logger) + responseType, result, err = s.converter.Convert(ctx, dn.datasource.Type, dataFrames, s.allowLongFrames) if err != nil { err = makeConversionError(dn.refID, err) } return result, err } - -func getResponseFrame(resp *backend.QueryDataResponse, refID string) (data.Frames, error) { - response, ok := resp.Responses[refID] - if !ok { - // This indicates that the RefID of the request was not included to the response, i.e. some problem in the data source plugin - keys := make([]string, 0, len(resp.Responses)) - for refID := range resp.Responses { - keys = append(keys, refID) - } - logger.Warn("Can't find response by refID. Return nodata", "responseRefIds", keys) - return nil, nil - } - - if response.Error != nil { - return nil, response.Error - } - return response.Frames, nil -} - -func convertDataFramesToResults(ctx context.Context, frames data.Frames, datasourceType string, s *Service, logger log.Logger) (string, mathexp.Results, error) { - if len(frames) == 0 { - return "no-data", mathexp.Results{Values: mathexp.Values{mathexp.NewNoData()}}, nil - } - - var dt data.FrameType - dt, useDataplane, _ := shouldUseDataplane(frames, logger, s.features.IsEnabled(ctx, featuremgmt.FlagDisableSSEDataplane)) - if useDataplane { - logger.Debug("Handling SSE data source query through dataplane", "datatype", dt) - result, err := handleDataplaneFrames(ctx, s.tracer, dt, frames) - return fmt.Sprintf("dataplane-%s", dt), result, err - } - - if isAllFrameVectors(datasourceType, frames) { // Prometheus Specific Handling - vals, err := framesToNumbers(frames) - if err != nil { - return "", mathexp.Results{}, fmt.Errorf("failed to read frames as numbers: %w", err) - } - return "vector", mathexp.Results{Values: vals}, nil - } - - if len(frames) == 1 { - frame := frames[0] - // Handle Untyped NoData - if len(frame.Fields) == 0 { - return "no-data", mathexp.Results{Values: mathexp.Values{mathexp.NoData{Frame: frame}}}, nil - } - - // Handle Numeric Table - if frame.TimeSeriesSchema().Type == data.TimeSeriesTypeNot && isNumberTable(frame) { - numberSet, err := extractNumberSet(frame) - if err != nil { - return "", mathexp.Results{}, err - } - vals := make([]mathexp.Value, 0, len(numberSet)) - for _, n := range numberSet { - vals = append(vals, n) - } - return "number set", mathexp.Results{ - Values: vals, - }, nil - } - } - - filtered := make([]*data.Frame, 0, len(frames)) - totalLen := 0 - for _, frame := range frames { - schema := frame.TimeSeriesSchema() - // Check for TimeSeriesTypeNot in InfluxDB queries. A data frame of this type will cause - // the WideToMany() function to error out, which results in unhealthy alerts. - // This check should be removed once inconsistencies in data source responses are solved. - if schema.Type == data.TimeSeriesTypeNot && datasourceType == datasources.DS_INFLUXDB { - logger.Warn("Ignoring InfluxDB data frame due to missing numeric fields") - continue - } - - if schema.Type != data.TimeSeriesTypeWide && !s.allowLongFrames { - return "", mathexp.Results{}, fmt.Errorf("input data must be a wide series but got type %s (input refid)", schema.Type) - } - filtered = append(filtered, frame) - totalLen += len(schema.ValueIndices) - } - - if len(filtered) == 0 { - return "no data", mathexp.Results{Values: mathexp.Values{mathexp.NoData{Frame: frames[0]}}}, nil - } - - maybeFixerFn := checkIfSeriesNeedToBeFixed(filtered, datasourceType) - - dataType := "single frame series" - if len(filtered) > 1 { - dataType = "multi frame series" - } - - vals := make([]mathexp.Value, 0, totalLen) - for _, frame := range filtered { - schema := frame.TimeSeriesSchema() - if schema.Type == data.TimeSeriesTypeWide { - series, err := WideToMany(frame, maybeFixerFn) - if err != nil { - return "", mathexp.Results{}, err - } - for _, ser := range series { - vals = append(vals, ser) - } - } else { - v := mathexp.TableData{Frame: frame} - vals = append(vals, v) - dataType = "single frame" - } - } - - return dataType, mathexp.Results{ - Values: vals, - }, nil -} - -func isAllFrameVectors(datasourceType string, frames data.Frames) bool { - if datasourceType != datasources.DS_PROMETHEUS { - return false - } - allVector := false - for i, frame := range frames { - if frame.Meta != nil && frame.Meta.Custom != nil { - if sMap, ok := frame.Meta.Custom.(map[string]string); ok { - if sMap != nil { - if sMap["resultType"] == "vector" { - if i != 0 && !allVector { - break - } - allVector = true - } - } - } - } - } - return allVector -} - -func framesToNumbers(frames data.Frames) ([]mathexp.Value, error) { - vals := make([]mathexp.Value, 0, len(frames)) - for _, frame := range frames { - if frame == nil { - continue - } - if len(frame.Fields) == 2 && frame.Fields[0].Len() == 1 { - // Can there be zero Len Field results that are being skipped? - valueField := frame.Fields[1] - if valueField.Type().Numeric() { // should be []float64 - val, err := valueField.FloatAt(0) // FloatAt should not err if numeric - if err != nil { - return nil, fmt.Errorf("failed to read value of frame [%v] (RefID %v) of type [%v] as float: %w", frame.Name, frame.RefID, valueField.Type(), err) - } - n := mathexp.NewNumber(frame.Name, valueField.Labels) - n.SetValue(&val) - vals = append(vals, n) - } - } - } - return vals, nil -} - -func isNumberTable(frame *data.Frame) bool { - if frame == nil || frame.Fields == nil { - return false - } - numericCount := 0 - stringCount := 0 - otherCount := 0 - for _, field := range frame.Fields { - fType := field.Type() - switch { - case fType.Numeric(): - numericCount++ - case fType == data.FieldTypeString || fType == data.FieldTypeNullableString: - stringCount++ - default: - otherCount++ - } - } - return numericCount == 1 && otherCount == 0 -} - -func extractNumberSet(frame *data.Frame) ([]mathexp.Number, error) { - numericField := 0 - stringFieldIdxs := []int{} - stringFieldNames := []string{} - for i, field := range frame.Fields { - fType := field.Type() - switch { - case fType.Numeric(): - numericField = i - case fType == data.FieldTypeString || fType == data.FieldTypeNullableString: - stringFieldIdxs = append(stringFieldIdxs, i) - stringFieldNames = append(stringFieldNames, field.Name) - } - } - numbers := make([]mathexp.Number, frame.Rows()) - - for rowIdx := 0; rowIdx < frame.Rows(); rowIdx++ { - val, _ := frame.FloatAt(numericField, rowIdx) - var labels data.Labels - for i := 0; i < len(stringFieldIdxs); i++ { - if i == 0 { - labels = make(data.Labels) - } - key := stringFieldNames[i] // TODO check for duplicate string column names - val, _ := frame.ConcreteAt(stringFieldIdxs[i], rowIdx) - labels[key] = val.(string) // TODO check assertion / return error - } - - n := mathexp.NewNumber(frame.Fields[numericField].Name, labels) - - // The new value fields' configs gets pointed to the one in the original frame - n.Frame.Fields[0].Config = frame.Fields[numericField].Config - n.SetValue(&val) - - numbers[rowIdx] = n - } - return numbers, nil -} - -// WideToMany converts a data package wide type Frame to one or multiple Series. A series -// is created for each value type column of wide frame. -// -// This might not be a good idea long term, but works now as an adapter/shim. -func WideToMany(frame *data.Frame, fixSeries func(series mathexp.Series, valueField *data.Field)) ([]mathexp.Series, error) { - tsSchema := frame.TimeSeriesSchema() - if tsSchema.Type != data.TimeSeriesTypeWide { - return nil, fmt.Errorf("input data must be a wide series but got type %s", tsSchema.Type) - } - - if len(tsSchema.ValueIndices) == 1 { - s, err := mathexp.SeriesFromFrame(frame) - if err != nil { - return nil, err - } - if fixSeries != nil { - fixSeries(s, frame.Fields[tsSchema.ValueIndices[0]]) - } - return []mathexp.Series{s}, nil - } - - series := make([]mathexp.Series, 0, len(tsSchema.ValueIndices)) - for _, valIdx := range tsSchema.ValueIndices { - l := frame.Rows() - f := data.NewFrameOfFieldTypes(frame.Name, l, frame.Fields[tsSchema.TimeIndex].Type(), frame.Fields[valIdx].Type()) - f.Fields[0].Name = frame.Fields[tsSchema.TimeIndex].Name - f.Fields[1].Name = frame.Fields[valIdx].Name - - // The new value fields' configs gets pointed to the one in the original frame - f.Fields[1].Config = frame.Fields[valIdx].Config - - if frame.Fields[valIdx].Labels != nil { - f.Fields[1].Labels = frame.Fields[valIdx].Labels.Copy() - } - for i := 0; i < l; i++ { - f.SetRow(i, frame.Fields[tsSchema.TimeIndex].CopyAt(i), frame.Fields[valIdx].CopyAt(i)) - } - s, err := mathexp.SeriesFromFrame(f) - if err != nil { - return nil, err - } - if fixSeries != nil { - fixSeries(s, frame.Fields[valIdx]) - } - series = append(series, s) - } - - return series, nil -} - -// checkIfSeriesNeedToBeFixed scans all value fields of all provided frames and determines whether the resulting mathexp.Series -// needs to be updated so each series could be identifiable by labels. -// NOTE: applicable only to only datasources.DS_GRAPHITE and datasources.DS_TESTDATA data sources -// returns a function that patches the mathexp.Series with information from data.Field from which it was created if the all series need to be fixed. Otherwise, returns nil -func checkIfSeriesNeedToBeFixed(frames []*data.Frame, datasourceType string) func(series mathexp.Series, valueField *data.Field) { - if !(datasourceType == datasources.DS_GRAPHITE || datasourceType == datasources.DS_TESTDATA) { - return nil - } - - // get all value fields - var valueFields []*data.Field - for _, frame := range frames { - tsSchema := frame.TimeSeriesSchema() - for _, index := range tsSchema.ValueIndices { - field := frame.Fields[index] - // if at least one value field contains labels, the result does not need to be fixed. - if len(field.Labels) > 0 { - return nil - } - if valueFields == nil { - valueFields = make([]*data.Field, 0, len(frames)*len(tsSchema.ValueIndices)) - } - valueFields = append(valueFields, field) - } - } - - // selectors are in precedence order. - nameSelectors := []func(f *data.Field) string{ - func(f *data.Field) string { - if f == nil || f.Config == nil { - return "" - } - return f.Config.DisplayNameFromDS - }, - func(f *data.Field) string { - if f == nil || f.Config == nil { - return "" - } - return f.Config.DisplayName - }, - func(f *data.Field) string { - return f.Name - }, - } - - // now look for the first selector that would make all value fields be unique - for _, selector := range nameSelectors { - names := make(map[string]struct{}, len(valueFields)) - good := true - for _, field := range valueFields { - name := selector(field) - if _, ok := names[name]; ok || name == "" { - good = false - break - } - names[name] = struct{}{} - } - if good { - return func(series mathexp.Series, valueField *data.Field) { - series.SetLabels(data.Labels{ - nameLabelName: selector(valueField), - }) - } - } - } - return nil -} diff --git a/pkg/expr/nodes_test.go b/pkg/expr/nodes_test.go index f889e817c2e..ada215a8ad9 100644 --- a/pkg/expr/nodes_test.go +++ b/pkg/expr/nodes_test.go @@ -1,7 +1,6 @@ package expr import ( - "context" "errors" "fmt" "testing" @@ -12,11 +11,7 @@ import ( "github.com/stretchr/testify/require" "github.com/grafana/grafana/pkg/expr/mathexp" - "github.com/grafana/grafana/pkg/infra/log/logtest" - "github.com/grafana/grafana/pkg/infra/tracing" "github.com/grafana/grafana/pkg/services/datasources" - "github.com/grafana/grafana/pkg/services/featuremgmt" - "github.com/grafana/grafana/pkg/setting" ) type expectedError struct{} @@ -169,106 +164,3 @@ func TestCheckIfSeriesNeedToBeFixed(t *testing.T) { }) } } - -func TestConvertDataFramesToResults(t *testing.T) { - s := &Service{ - cfg: setting.NewCfg(), - features: &featuremgmt.FeatureManager{}, - tracer: tracing.InitializeTracerForTest(), - metrics: newMetrics(nil), - } - - t.Run("should add name label if no labels and specific data source", func(t *testing.T) { - supported := []string{datasources.DS_GRAPHITE, datasources.DS_TESTDATA} - t.Run("when only field name is specified", func(t *testing.T) { - t.Run("use value field names if one frame - many series", func(t *testing.T) { - supported := []string{datasources.DS_GRAPHITE, datasources.DS_TESTDATA} - - frames := []*data.Frame{ - data.NewFrame("test", - data.NewField("time", nil, []time.Time{time.Unix(1, 0)}), - data.NewField("test-value1", nil, []*float64{fp(2)}), - data.NewField("test-value2", nil, []*float64{fp(2)})), - } - - for _, dtype := range supported { - t.Run(dtype, func(t *testing.T) { - resultType, res, err := convertDataFramesToResults(context.Background(), frames, dtype, s, &logtest.Fake{}) - require.NoError(t, err) - assert.Equal(t, "single frame series", resultType) - require.Len(t, res.Values, 2) - - var names []string - for _, value := range res.Values { - require.IsType(t, mathexp.Series{}, value) - lbls := value.GetLabels() - require.Contains(t, lbls, nameLabelName) - names = append(names, lbls[nameLabelName]) - } - require.EqualValues(t, []string{"test-value1", "test-value2"}, names) - }) - } - }) - t.Run("should use frame name if one frame - one series", func(t *testing.T) { - frames := []*data.Frame{ - data.NewFrame("test-frame1", - data.NewField("time", nil, []time.Time{time.Unix(1, 0)}), - data.NewField("test-value1", nil, []*float64{fp(2)})), - data.NewFrame("test-frame2", - data.NewField("time", nil, []time.Time{time.Unix(1, 0)}), - data.NewField("test-value2", nil, []*float64{fp(2)})), - } - - for _, dtype := range supported { - t.Run(dtype, func(t *testing.T) { - resultType, res, err := convertDataFramesToResults(context.Background(), frames, dtype, s, &logtest.Fake{}) - require.NoError(t, err) - assert.Equal(t, "multi frame series", resultType) - require.Len(t, res.Values, 2) - - var names []string - for _, value := range res.Values { - require.IsType(t, mathexp.Series{}, value) - lbls := value.GetLabels() - require.Contains(t, lbls, nameLabelName) - names = append(names, lbls[nameLabelName]) - } - require.EqualValues(t, []string{"test-frame1", "test-frame2"}, names) - }) - } - }) - }) - t.Run("should use fields DisplayNameFromDS when it is unique", func(t *testing.T) { - f1 := data.NewField("test-value1", nil, []*float64{fp(2)}) - f1.Config = &data.FieldConfig{DisplayNameFromDS: "test-value1"} - f2 := data.NewField("test-value2", nil, []*float64{fp(2)}) - f2.Config = &data.FieldConfig{DisplayNameFromDS: "test-value2"} - frames := []*data.Frame{ - data.NewFrame("test-frame1", - data.NewField("time", nil, []time.Time{time.Unix(1, 0)}), - f1), - data.NewFrame("test-frame2", - data.NewField("time", nil, []time.Time{time.Unix(1, 0)}), - f2), - } - - for _, dtype := range supported { - t.Run(dtype, func(t *testing.T) { - resultType, res, err := convertDataFramesToResults(context.Background(), frames, dtype, s, &logtest.Fake{}) - require.NoError(t, err) - assert.Equal(t, "multi frame series", resultType) - require.Len(t, res.Values, 2) - - var names []string - for _, value := range res.Values { - require.IsType(t, mathexp.Series{}, value) - lbls := value.GetLabels() - require.Contains(t, lbls, nameLabelName) - names = append(names, lbls[nameLabelName]) - } - require.EqualValues(t, []string{"test-value1", "test-value2"}, names) - }) - } - }) - }) -} diff --git a/pkg/expr/reader.go b/pkg/expr/reader.go index f4ca1cde5fd..9227b19c13b 100644 --- a/pkg/expr/reader.go +++ b/pkg/expr/reader.go @@ -14,38 +14,52 @@ import ( // Once we are comfortable with the parsing logic, this struct will // be merged/replace the existing Query struct in grafana/pkg/expr/transform.go type ExpressionQuery struct { - RefID string - Command Command + GraphID int64 `json:"id,omitempty"` + RefID string `json:"refId"` + QueryType QueryType `json:"queryType"` + + // The typed query parameters + Properties any `json:"properties"` + + // Hidden in debug JSON + Command Command `json:"-"` +} + +// ID is used to identify nodes in the directed graph +func (q ExpressionQuery) ID() int64 { + return q.GraphID } type ExpressionQueryReader struct { features featuremgmt.FeatureToggles } -func NewExpressionQueryReader(features featuremgmt.FeatureToggles) (*ExpressionQueryReader, error) { - h := &ExpressionQueryReader{ +func NewExpressionQueryReader(features featuremgmt.FeatureToggles) *ExpressionQueryReader { + return &ExpressionQueryReader{ features: features, } - return h, nil } -// ReadQuery implements query.TypedQueryHandler. // nolint:gocyclo func (h *ExpressionQueryReader) ReadQuery( // Properties that have been parsed off the same node - common *rawNode, // common query.CommonQueryProperties + common *rawNode, // An iterator with context for the full node (include common values) iter *jsoniter.Iterator, ) (eq ExpressionQuery, err error) { referenceVar := "" eq.RefID = common.RefID - qt := QueryType(common.QueryType) - switch qt { + if common.QueryType == "" { + return eq, fmt.Errorf("missing queryType") + } + eq.QueryType = QueryType(common.QueryType) + switch eq.QueryType { case QueryTypeMath: q := &MathQuery{} err = iter.ReadVal(q) if err == nil { eq.Command, err = NewMathCommand(common.RefID, q.Expression) + eq.Properties = q } case QueryTypeReduce: @@ -54,6 +68,7 @@ func (h *ExpressionQueryReader) ReadQuery( err = iter.ReadVal(q) if err == nil { referenceVar, err = getReferenceVar(q.Expression, common.RefID) + eq.Properties = q } if err == nil && q.Settings != nil { switch q.Settings.Mode { @@ -69,6 +84,7 @@ func (h *ExpressionQueryReader) ReadQuery( } } if err == nil { + eq.Properties = q eq.Command, err = NewReduceCommand(common.RefID, q.Reducer, referenceVar, mapper) } @@ -83,23 +99,21 @@ func (h *ExpressionQueryReader) ReadQuery( referenceVar, err = getReferenceVar(q.Expression, common.RefID) } if err == nil { - // tr := legacydata.NewDataTimeRange(common.TimeRange.From, common.TimeRange.To) - // AbsoluteTimeRange{ - // From: tr.GetFromAsTimeUTC(), - // To: tr.GetToAsTimeUTC(), - // }) + eq.Properties = q eq.Command, err = NewResampleCommand(common.RefID, q.Window, referenceVar, q.Downsampler, q.Upsampler, - common.TimeRange) + common.TimeRange, + ) } case QueryTypeClassic: q := &ClassicQuery{} err = iter.ReadVal(q) if err == nil { + eq.Properties = q eq.Command, err = classic.NewConditionCmd(common.RefID, q.Conditions) } @@ -107,7 +121,8 @@ func (h *ExpressionQueryReader) ReadQuery( q := &SQLExpression{} err = iter.ReadVal(q) if err == nil { - eq.Command, err = NewSQLCommand(common.RefID, q.Expression, common.TimeRange) + eq.Properties = q + eq.Command, err = NewSQLCommand(common.RefID, q.Expression) } case QueryTypeThreshold: @@ -128,6 +143,7 @@ func (h *ExpressionQueryReader) ReadQuery( return eq, fmt.Errorf("invalid condition: %w", err) } eq.Command = threshold + eq.Properties = q if firstCondition.UnloadEvaluator != nil && h.features.IsEnabledGlobally(featuremgmt.FlagRecoveryThreshold) { unloading, err := NewThresholdCommand(common.RefID, referenceVar, firstCondition.UnloadEvaluator.Type, firstCondition.UnloadEvaluator.Params) diff --git a/pkg/expr/service.go b/pkg/expr/service.go index 978ee147211..01c88be979b 100644 --- a/pkg/expr/service.go +++ b/pkg/expr/service.go @@ -60,6 +60,7 @@ type Service struct { dataService backend.QueryDataHandler pCtxProvider pluginContextProvider features featuremgmt.FeatureToggles + converter *ResultConverter pluginsClient backend.CallResourceHandler @@ -83,6 +84,10 @@ func ProvideService(cfg *setting.Cfg, pluginClient plugins.Client, pCtxProvider tracer: tracer, metrics: newMetrics(registerer), pluginsClient: pluginClient, + converter: &ResultConverter{ + Features: features, + Tracer: tracer, + }, } } diff --git a/pkg/expr/service_test.go b/pkg/expr/service_test.go index 3492b1121dc..2ee7ff5d357 100644 --- a/pkg/expr/service_test.go +++ b/pkg/expr/service_test.go @@ -43,13 +43,18 @@ func TestService(t *testing.T) { }, }, &datafakes.FakeCacheService{}, &datafakes.FakeDataSourceService{}, nil, pluginconfig.NewFakePluginRequestConfigProvider()) + features := featuremgmt.WithFeatures() s := Service{ cfg: setting.NewCfg(), dataService: me, pCtxProvider: pCtxProvider, - features: &featuremgmt.FeatureManager{}, + features: features, tracer: tracing.InitializeTracerForTest(), metrics: newMetrics(nil), + converter: &ResultConverter{ + Features: features, + Tracer: tracing.InitializeTracerForTest(), + }, } queries := []Query{ diff --git a/pkg/expr/sql_command.go b/pkg/expr/sql_command.go index ce69bd550d5..44bbe559862 100644 --- a/pkg/expr/sql_command.go +++ b/pkg/expr/sql_command.go @@ -19,12 +19,11 @@ import ( type SQLCommand struct { query string varsToQuery []string - timeRange TimeRange refID string } // NewSQLCommand creates a new SQLCommand. -func NewSQLCommand(refID, rawSQL string, tr TimeRange) (*SQLCommand, error) { +func NewSQLCommand(refID, rawSQL string) (*SQLCommand, error) { if rawSQL == "" { return nil, errutil.BadRequest("sql-missing-query", errutil.WithPublicMessage("missing SQL query")) @@ -39,7 +38,6 @@ func NewSQLCommand(refID, rawSQL string, tr TimeRange) (*SQLCommand, error) { return &SQLCommand{ query: rawSQL, varsToQuery: tables, - timeRange: tr, refID: refID, }, nil } @@ -59,7 +57,7 @@ func UnmarshalSQLCommand(rn *rawNode) (*SQLCommand, error) { return nil, fmt.Errorf("expected sql expression to be type string, but got type %T", expressionRaw) } - return NewSQLCommand(rn.RefID, expression, rn.TimeRange) + return NewSQLCommand(rn.RefID, expression) } // NeedsVars returns the variable names (refIds) that are dependencies diff --git a/pkg/expr/sql_command_test.go b/pkg/expr/sql_command_test.go index 90ba470ae07..7bd9d3c06e2 100644 --- a/pkg/expr/sql_command_test.go +++ b/pkg/expr/sql_command_test.go @@ -6,7 +6,7 @@ import ( ) func TestNewCommand(t *testing.T) { - cmd, err := NewSQLCommand("a", "select a from foo, bar", nil) + cmd, err := NewSQLCommand("a", "select a from foo, bar") if err != nil && strings.Contains(err.Error(), "feature is not enabled") { return } diff --git a/pkg/services/ngalert/backtesting/eval_data.go b/pkg/services/ngalert/backtesting/eval_data.go index 12b8b872bd5..999c0bc6302 100644 --- a/pkg/services/ngalert/backtesting/eval_data.go +++ b/pkg/services/ngalert/backtesting/eval_data.go @@ -32,8 +32,8 @@ func newDataEvaluator(refID string, frame *data.Frame) (*dataEvaluator, error) { return &dataEvaluator{ refID: refID, data: series, - downsampleFunction: "last", - upsampleFunction: "pad", + downsampleFunction: mathexp.ReducerLast, + upsampleFunction: mathexp.UpsamplerPad, }, nil }