Elasticsearch: data query refactor (#113360)
* split up data_query.go
* split up response_parser
* remove white space
@@ -0,0 +1,173 @@
package elasticsearch

import (
	"regexp"

	"github.com/grafana/grafana/pkg/components/simplejson"
	es "github.com/grafana/grafana/pkg/tsdb/elasticsearch/client"
)

// addDateHistogramAgg adds a date histogram aggregation to the aggregation builder
func addDateHistogramAgg(aggBuilder es.AggBuilder, bucketAgg *BucketAgg, timeFrom, timeTo int64, timeField string) es.AggBuilder {
	// If no field is specified, use the time field
	field := bucketAgg.Field
	if field == "" {
		field = timeField
	}
	aggBuilder.DateHistogram(bucketAgg.ID, field, func(a *es.DateHistogramAgg, b es.AggBuilder) {
		var interval = bucketAgg.Settings.Get("interval").MustString("auto")
		if isCalendarInterval(interval) {
			a.CalendarInterval = interval
		} else {
			if interval == "auto" {
				// Note: this is not really valid Grafana variable handling, because
				// normally this would not match `$__interval_ms`. Because of how we
				// apply these variables in the Go code, though, it works correctly
				// and becomes something like `500ms`. A nicer way would be to use
				// `${__interval_ms}ms`, but that format is not recognized where we
				// apply these variables in the Elasticsearch datasource.
				a.FixedInterval = "$__interval_msms"
			} else {
				a.FixedInterval = interval
			}
		}
		a.MinDocCount = bucketAgg.Settings.Get("min_doc_count").MustInt(0)
		a.ExtendedBounds = &es.ExtendedBounds{Min: timeFrom, Max: timeTo}
		a.Format = bucketAgg.Settings.Get("format").MustString(es.DateFormatEpochMS)

		if offset, err := bucketAgg.Settings.Get("offset").String(); err == nil {
			a.Offset = offset
		}

		if missing, err := bucketAgg.Settings.Get("missing").String(); err == nil {
			a.Missing = &missing
		}

		if timezone, err := bucketAgg.Settings.Get("timeZone").String(); err == nil {
			if timezone != "utc" {
				a.TimeZone = timezone
			}
		}

		aggBuilder = b
	})

	return aggBuilder
}

// addHistogramAgg adds a histogram aggregation to the aggregation builder
func addHistogramAgg(aggBuilder es.AggBuilder, bucketAgg *BucketAgg) es.AggBuilder {
	aggBuilder.Histogram(bucketAgg.ID, bucketAgg.Field, func(a *es.HistogramAgg, b es.AggBuilder) {
		a.Interval = stringToFloatWithDefaultValue(bucketAgg.Settings.Get("interval").MustString(), 1000)
		a.MinDocCount = bucketAgg.Settings.Get("min_doc_count").MustInt(0)

		if missing, err := bucketAgg.Settings.Get("missing").Int(); err == nil {
			a.Missing = &missing
		}

		aggBuilder = b
	})

	return aggBuilder
}

// addTermsAgg adds a terms aggregation to the aggregation builder
func addTermsAgg(aggBuilder es.AggBuilder, bucketAgg *BucketAgg, metrics []*MetricAgg) es.AggBuilder {
	aggBuilder.Terms(bucketAgg.ID, bucketAgg.Field, func(a *es.TermsAggregation, b es.AggBuilder) {
		if size, err := bucketAgg.Settings.Get("size").Int(); err == nil {
			a.Size = size
		} else {
			a.Size = stringToIntWithDefaultValue(bucketAgg.Settings.Get("size").MustString(), defaultSize)
		}

		if minDocCount, err := bucketAgg.Settings.Get("min_doc_count").Int(); err == nil {
			a.MinDocCount = &minDocCount
		}
		if missing, err := bucketAgg.Settings.Get("missing").String(); err == nil {
			a.Missing = &missing
		}

		if orderBy, err := bucketAgg.Settings.Get("orderBy").String(); err == nil {
			/*
				The format for extended stats and percentiles is {metricId}[bucket_path];
				for everything else it is just {metricId}, _count, _term, or _key.
			*/
			metricIdRegex := regexp.MustCompile(`^(\d+)`)
			metricId := metricIdRegex.FindString(orderBy)

			if len(metricId) > 0 {
				for _, m := range metrics {
					if m.ID == metricId {
						if m.Type == "count" {
							a.Order["_count"] = bucketAgg.Settings.Get("order").MustString("desc")
						} else {
							a.Order[orderBy] = bucketAgg.Settings.Get("order").MustString("desc")
							b.Metric(m.ID, m.Type, m.Field, nil)
						}
						break
					}
				}
			} else {
				a.Order[orderBy] = bucketAgg.Settings.Get("order").MustString("desc")
			}
		}

		aggBuilder = b
	})

	return aggBuilder
}

// addNestedAgg adds a nested aggregation to the aggregation builder
func addNestedAgg(aggBuilder es.AggBuilder, bucketAgg *BucketAgg) es.AggBuilder {
	aggBuilder.Nested(bucketAgg.ID, bucketAgg.Field, func(a *es.NestedAggregation, b es.AggBuilder) {
		aggBuilder = b
	})

	return aggBuilder
}

// addFiltersAgg adds a filters aggregation to the aggregation builder
func addFiltersAgg(aggBuilder es.AggBuilder, bucketAgg *BucketAgg) es.AggBuilder {
	filters := make(map[string]any)
	for _, filter := range bucketAgg.Settings.Get("filters").MustArray() {
		json := simplejson.NewFromAny(filter)
		query := json.Get("query").MustString()
		label := json.Get("label").MustString()
		if label == "" {
			label = query
		}
		filters[label] = &es.QueryStringFilter{Query: query, AnalyzeWildcard: true}
	}

	if len(filters) > 0 {
		aggBuilder.Filters(bucketAgg.ID, func(a *es.FiltersAggregation, b es.AggBuilder) {
			a.Filters = filters
			aggBuilder = b
		})
	}

	return aggBuilder
}

// addGeoHashGridAgg adds a geohash grid aggregation to the aggregation builder
func addGeoHashGridAgg(aggBuilder es.AggBuilder, bucketAgg *BucketAgg) es.AggBuilder {
	aggBuilder.GeoHashGrid(bucketAgg.ID, bucketAgg.Field, func(a *es.GeoHashGridAggregation, b es.AggBuilder) {
		a.Precision = stringToIntWithDefaultValue(bucketAgg.Settings.Get("precision").MustString(), es.DefaultGeoHashPrecision)
		aggBuilder = b
	})

	return aggBuilder
}

// isCalendarInterval checks if the interval is a calendar interval
func isCalendarInterval(interval string) bool {
	calendarIntervals := []string{"1w", "1M", "1q", "1y"}
	for _, ci := range calendarIntervals {
		if interval == ci {
			return true
		}
	}
	return false
}
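For illustration only (not part of the commit): a minimal, standalone sketch of how the interval setting above is routed, with the four calendar units going to `calendar_interval` and everything else, including the interpolated `auto` case, to `fixed_interval`.

package main

import "fmt"

// isCalendarInterval copied from above for a standalone run: only these
// four units are calendar-aware in Elasticsearch date histograms.
func isCalendarInterval(interval string) bool {
	for _, ci := range []string{"1w", "1M", "1q", "1y"} {
		if interval == ci {
			return true
		}
	}
	return false
}

func main() {
	for _, interval := range []string{"1M", "30s", "auto", "1y"} {
		if isCalendarInterval(interval) {
			fmt.Printf("%-4s -> calendar_interval\n", interval)
		} else {
			// "auto" is later rewritten to "$__interval_msms", which the
			// variable interpolation turns into e.g. "500ms".
			fmt.Printf("%-4s -> fixed_interval\n", interval)
		}
	}
}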
@@ -6,16 +6,12 @@ import (
	"errors"
	"fmt"
	"net/url"
	"regexp"
	"slices"
	"strconv"
	"strings"
	"time"

	"github.com/grafana/grafana-plugin-sdk-go/backend"
	"github.com/grafana/grafana-plugin-sdk-go/backend/log"

	"github.com/grafana/grafana/pkg/components/simplejson"
	es "github.com/grafana/grafana/pkg/tsdb/elasticsearch/client"
)
@@ -108,436 +104,3 @@ func (e *elasticsearchDataQuery) execute() (*backend.QueryDataResponse, error) {
	return parseResponse(e.ctx, res.Responses, queries, e.client.GetConfiguredFields(), e.keepLabelsInResponse, e.logger)
}

func (e *elasticsearchDataQuery) processQuery(q *Query, ms *es.MultiSearchRequestBuilder, from, to int64) error {
	err := isQueryWithError(q)
	if err != nil {
		return backend.DownstreamError(fmt.Errorf("received invalid query. %w", err))
	}

	defaultTimeField := e.client.GetConfiguredFields().TimeField
	b := ms.Search(q.Interval, q.TimeRange)
	b.Size(0)
	filters := b.Query().Bool().Filter()
	filters.AddDateRangeFilter(defaultTimeField, to, from, es.DateFormatEpochMS)
	filters.AddQueryStringFilter(q.RawQuery, true)

	if isLogsQuery(q) {
		processLogsQuery(q, b, from, to, defaultTimeField)
	} else if isDocumentQuery(q) {
		processDocumentQuery(q, b, from, to, defaultTimeField)
	} else {
		// Otherwise it is a time series query, so process it as such
		processTimeSeriesQuery(q, b, from, to, defaultTimeField)
	}

	return nil
}

func setFloatPath(settings *simplejson.Json, path ...string) {
	if stringValue, err := settings.GetPath(path...).String(); err == nil {
		if value, err := strconv.ParseFloat(stringValue, 64); err == nil {
			settings.SetPath(path, value)
		}
	}
}

func setIntPath(settings *simplejson.Json, path ...string) {
	if stringValue, err := settings.GetPath(path...).String(); err == nil {
		if value, err := strconv.ParseInt(stringValue, 10, 64); err == nil {
			settings.SetPath(path, value)
		}
	}
}

// Casts values to float when required by Elastic's query DSL
func (metricAggregation MetricAgg) generateSettingsForDSL() map[string]any {
	switch metricAggregation.Type {
	case "moving_avg":
		setFloatPath(metricAggregation.Settings, "window")
		setFloatPath(metricAggregation.Settings, "predict")
		setFloatPath(metricAggregation.Settings, "settings", "alpha")
		setFloatPath(metricAggregation.Settings, "settings", "beta")
		setFloatPath(metricAggregation.Settings, "settings", "gamma")
		setFloatPath(metricAggregation.Settings, "settings", "period")
	case "serial_diff":
		setFloatPath(metricAggregation.Settings, "lag")
	}

	if isMetricAggregationWithInlineScriptSupport(metricAggregation.Type) {
		scriptValue, err := metricAggregation.Settings.GetPath("script").String()
		if err != nil {
			// the script is stored using the old format: `script:{inline: "value"}`, or is not set
			scriptValue, err = metricAggregation.Settings.GetPath("script", "inline").String()
		}

		if err == nil {
			metricAggregation.Settings.SetPath([]string{"script"}, scriptValue)
		}
	}

	return metricAggregation.Settings.MustMap()
}

func (bucketAgg BucketAgg) generateSettingsForDSL() map[string]any {
	setIntPath(bucketAgg.Settings, "min_doc_count")

	return bucketAgg.Settings.MustMap()
}

func addDateHistogramAgg(aggBuilder es.AggBuilder, bucketAgg *BucketAgg, timeFrom, timeTo int64, timeField string) es.AggBuilder {
	// If no field is specified, use the time field
	field := bucketAgg.Field
	if field == "" {
		field = timeField
	}
	aggBuilder.DateHistogram(bucketAgg.ID, field, func(a *es.DateHistogramAgg, b es.AggBuilder) {
		var interval = bucketAgg.Settings.Get("interval").MustString("auto")
		if slices.Contains(es.GetCalendarIntervals(), interval) {
			a.CalendarInterval = interval
		} else {
			if interval == "auto" {
				// Note: this is not really valid Grafana variable handling, because
				// normally this would not match `$__interval_ms`. Because of how we
				// apply these variables in the Go code, though, it works correctly
				// and becomes something like `500ms`. A nicer way would be to use
				// `${__interval_ms}ms`, but that format is not recognized where we
				// apply these variables in the Elasticsearch datasource.
				a.FixedInterval = "$__interval_msms"
			} else {
				a.FixedInterval = interval
			}
		}
		a.MinDocCount = bucketAgg.Settings.Get("min_doc_count").MustInt(0)
		a.ExtendedBounds = &es.ExtendedBounds{Min: timeFrom, Max: timeTo}
		a.Format = bucketAgg.Settings.Get("format").MustString(es.DateFormatEpochMS)

		if offset, err := bucketAgg.Settings.Get("offset").String(); err == nil {
			a.Offset = offset
		}

		if missing, err := bucketAgg.Settings.Get("missing").String(); err == nil {
			a.Missing = &missing
		}

		if timezone, err := bucketAgg.Settings.Get("timeZone").String(); err == nil {
			if timezone != "utc" {
				a.TimeZone = timezone
			}
		}

		aggBuilder = b
	})

	return aggBuilder
}

func addHistogramAgg(aggBuilder es.AggBuilder, bucketAgg *BucketAgg) es.AggBuilder {
	aggBuilder.Histogram(bucketAgg.ID, bucketAgg.Field, func(a *es.HistogramAgg, b es.AggBuilder) {
		a.Interval = stringToFloatWithDefaultValue(bucketAgg.Settings.Get("interval").MustString(), 1000)
		a.MinDocCount = bucketAgg.Settings.Get("min_doc_count").MustInt(0)

		if missing, err := bucketAgg.Settings.Get("missing").Int(); err == nil {
			a.Missing = &missing
		}

		aggBuilder = b
	})

	return aggBuilder
}

func addTermsAgg(aggBuilder es.AggBuilder, bucketAgg *BucketAgg, metrics []*MetricAgg) es.AggBuilder {
	aggBuilder.Terms(bucketAgg.ID, bucketAgg.Field, func(a *es.TermsAggregation, b es.AggBuilder) {
		if size, err := bucketAgg.Settings.Get("size").Int(); err == nil {
			a.Size = size
		} else {
			a.Size = stringToIntWithDefaultValue(bucketAgg.Settings.Get("size").MustString(), defaultSize)
		}

		if minDocCount, err := bucketAgg.Settings.Get("min_doc_count").Int(); err == nil {
			a.MinDocCount = &minDocCount
		}
		if missing, err := bucketAgg.Settings.Get("missing").String(); err == nil {
			a.Missing = &missing
		}

		if orderBy, err := bucketAgg.Settings.Get("orderBy").String(); err == nil {
			/*
				The format for extended stats and percentiles is {metricId}[bucket_path];
				for everything else it is just {metricId}, _count, _term, or _key.
			*/
			metricIdRegex := regexp.MustCompile(`^(\d+)`)
			metricId := metricIdRegex.FindString(orderBy)

			if len(metricId) > 0 {
				for _, m := range metrics {
					if m.ID == metricId {
						if m.Type == "count" {
							a.Order["_count"] = bucketAgg.Settings.Get("order").MustString("desc")
						} else {
							a.Order[orderBy] = bucketAgg.Settings.Get("order").MustString("desc")
							b.Metric(m.ID, m.Type, m.Field, nil)
						}
						break
					}
				}
			} else {
				a.Order[orderBy] = bucketAgg.Settings.Get("order").MustString("desc")
			}
		}

		aggBuilder = b
	})

	return aggBuilder
}

func addNestedAgg(aggBuilder es.AggBuilder, bucketAgg *BucketAgg) es.AggBuilder {
	aggBuilder.Nested(bucketAgg.ID, bucketAgg.Field, func(a *es.NestedAggregation, b es.AggBuilder) {
		aggBuilder = b
	})

	return aggBuilder
}

func addFiltersAgg(aggBuilder es.AggBuilder, bucketAgg *BucketAgg) es.AggBuilder {
	filters := make(map[string]any)
	for _, filter := range bucketAgg.Settings.Get("filters").MustArray() {
		json := simplejson.NewFromAny(filter)
		query := json.Get("query").MustString()
		label := json.Get("label").MustString()
		if label == "" {
			label = query
		}
		filters[label] = &es.QueryStringFilter{Query: query, AnalyzeWildcard: true}
	}

	if len(filters) > 0 {
		aggBuilder.Filters(bucketAgg.ID, func(a *es.FiltersAggregation, b es.AggBuilder) {
			a.Filters = filters
			aggBuilder = b
		})
	}

	return aggBuilder
}

func addGeoHashGridAgg(aggBuilder es.AggBuilder, bucketAgg *BucketAgg) es.AggBuilder {
	aggBuilder.GeoHashGrid(bucketAgg.ID, bucketAgg.Field, func(a *es.GeoHashGridAggregation, b es.AggBuilder) {
		a.Precision = stringToIntWithDefaultValue(bucketAgg.Settings.Get("precision").MustString(), es.DefaultGeoHashPrecision)
		aggBuilder = b
	})

	return aggBuilder
}

func getPipelineAggField(m *MetricAgg) string {
	// In the frontend we use Field as pipelineAggField.
	// There might be a historical reason why the backend used PipelineAggregate as pipelineAggField,
	// so for now we check Field first and then PipelineAggregate to ensure we are not breaking anything.
	// TODO: Investigate if we can remove the check for PipelineAggregate
	pipelineAggField := m.Field

	if pipelineAggField == "" {
		pipelineAggField = m.PipelineAggregate
	}
	return pipelineAggField
}

func isQueryWithError(query *Query) error {
	if len(query.BucketAggs) == 0 {
		// If no aggregations, only document and logs queries are valid
		if len(query.Metrics) == 0 || (!isLogsQuery(query) && !isDocumentQuery(query)) {
			return fmt.Errorf("invalid query, missing metrics and aggregations")
		}
	}
	return nil
}

func isLogsQuery(query *Query) bool {
	return query.Metrics[0].Type == logsType
}

func isDocumentQuery(query *Query) bool {
	return isRawDataQuery(query) || isRawDocumentQuery(query)
}

func isRawDataQuery(query *Query) bool {
	return query.Metrics[0].Type == rawDataType
}

func isRawDocumentQuery(query *Query) bool {
	return query.Metrics[0].Type == rawDocumentType
}

func processLogsQuery(q *Query, b *es.SearchRequestBuilder, from, to int64, defaultTimeField string) {
	metric := q.Metrics[0]
	sort := es.SortOrderDesc
	if metric.Settings.Get("sortDirection").MustString() == "asc" {
		// This is currently used only for the log context query
		sort = es.SortOrderAsc
	}
	b.Sort(sort, defaultTimeField, "boolean")
	b.Sort(sort, "_doc", "")
	b.AddDocValueField(defaultTimeField)
	// We need to add timeField as a field with a standardized time format, to avoid receiving
	// formats that Elasticsearch can parse but our frontend can't (e.g. yyyy_MM_dd_HH_mm_ss)
	b.AddTimeFieldWithStandardizedFormat(defaultTimeField)
	b.Size(stringToIntWithDefaultValue(metric.Settings.Get("limit").MustString(), defaultSize))
	b.AddHighlight()

	// This is currently used only for the log context query, to get
	// log lines before and after the selected log line
	searchAfter := metric.Settings.Get("searchAfter").MustArray()
	for _, value := range searchAfter {
		b.AddSearchAfter(value)
	}

	// For a log query, we add a date histogram aggregation
	aggBuilder := b.Agg()
	q.BucketAggs = append(q.BucketAggs, &BucketAgg{
		Type:  dateHistType,
		Field: defaultTimeField,
		ID:    "1",
		Settings: simplejson.NewFromAny(map[string]any{
			"interval": "auto",
		}),
	})
	bucketAgg := q.BucketAggs[0]
	bucketAgg.Settings = simplejson.NewFromAny(
		bucketAgg.generateSettingsForDSL(),
	)
	_ = addDateHistogramAgg(aggBuilder, bucketAgg, from, to, defaultTimeField)
}

func processDocumentQuery(q *Query, b *es.SearchRequestBuilder, from, to int64, defaultTimeField string) {
	metric := q.Metrics[0]
	b.Sort(es.SortOrderDesc, defaultTimeField, "boolean")
	b.Sort(es.SortOrderDesc, "_doc", "")
	b.AddDocValueField(defaultTimeField)
	if isRawDataQuery(q) {
		// For raw_data queries we need to add timeField as a field with a standardized time format,
		// to avoid receiving formats that Elasticsearch can parse but our frontend can't (e.g. yyyy_MM_dd_HH_mm_ss)
		b.AddTimeFieldWithStandardizedFormat(defaultTimeField)
	}
	b.Size(stringToIntWithDefaultValue(metric.Settings.Get("size").MustString(), defaultSize))
}

func processTimeSeriesQuery(q *Query, b *es.SearchRequestBuilder, from, to int64, defaultTimeField string) {
	aggBuilder := b.Agg()
	// Process buckets
	// iterate backwards to create aggregations bottom-down
	for _, bucketAgg := range q.BucketAggs {
		bucketAgg.Settings = simplejson.NewFromAny(
			bucketAgg.generateSettingsForDSL(),
		)
		switch bucketAgg.Type {
		case dateHistType:
			aggBuilder = addDateHistogramAgg(aggBuilder, bucketAgg, from, to, defaultTimeField)
		case histogramType:
			aggBuilder = addHistogramAgg(aggBuilder, bucketAgg)
		case filtersType:
			aggBuilder = addFiltersAgg(aggBuilder, bucketAgg)
		case termsType:
			aggBuilder = addTermsAgg(aggBuilder, bucketAgg, q.Metrics)
		case geohashGridType:
			aggBuilder = addGeoHashGridAgg(aggBuilder, bucketAgg)
		case nestedType:
			aggBuilder = addNestedAgg(aggBuilder, bucketAgg)
		}
	}

	// Process metrics
	for _, m := range q.Metrics {
		m := m

		if m.Type == countType {
			continue
		}

		if isPipelineAgg(m.Type) {
			if isPipelineAggWithMultipleBucketPaths(m.Type) {
				if len(m.PipelineVariables) > 0 {
					bucketPaths := map[string]any{}
					for name, pipelineAgg := range m.PipelineVariables {
						if _, err := strconv.Atoi(pipelineAgg); err == nil {
							var appliedAgg *MetricAgg
							for _, pipelineMetric := range q.Metrics {
								if pipelineMetric.ID == pipelineAgg {
									appliedAgg = pipelineMetric
									break
								}
							}
							if appliedAgg != nil {
								if appliedAgg.Type == countType {
									bucketPaths[name] = "_count"
								} else {
									bucketPaths[name] = pipelineAgg
								}
							}
						}
					}

					aggBuilder.Pipeline(m.ID, m.Type, bucketPaths, func(a *es.PipelineAggregation) {
						a.Settings = m.generateSettingsForDSL()
					})
				} else {
					continue
				}
			} else {
				pipelineAggField := getPipelineAggField(m)
				if _, err := strconv.Atoi(pipelineAggField); err == nil {
					var appliedAgg *MetricAgg
					for _, pipelineMetric := range q.Metrics {
						if pipelineMetric.ID == pipelineAggField {
							appliedAgg = pipelineMetric
							break
						}
					}
					if appliedAgg != nil {
						bucketPath := pipelineAggField
						if appliedAgg.Type == countType {
							bucketPath = "_count"
						}

						aggBuilder.Pipeline(m.ID, m.Type, bucketPath, func(a *es.PipelineAggregation) {
							a.Settings = m.generateSettingsForDSL()
						})
					}
				} else {
					continue
				}
			}
		} else {
			aggBuilder.Metric(m.ID, m.Type, m.Field, func(a *es.MetricAggregation) {
				a.Settings = m.generateSettingsForDSL()
			})
		}
	}
}

func stringToIntWithDefaultValue(valueStr string, defaultValue int) int {
	value, err := strconv.Atoi(valueStr)
	if err != nil {
		value = defaultValue
	}
	// 0 is not a valid value in our case, so we fall back to defaultValue as well
	if value == 0 {
		value = defaultValue
	}
	return value
}

func stringToFloatWithDefaultValue(valueStr string, defaultValue float64) float64 {
	value, err := strconv.ParseFloat(valueStr, 64)
	if err != nil {
		value = defaultValue
	}
	// 0 is not a valid value in our case, so we fall back to defaultValue as well
	if value == 0 {
		value = defaultValue
	}
	return value
}
@@ -0,0 +1,199 @@
package elasticsearch

import (
	"fmt"
	"strconv"

	"github.com/grafana/grafana-plugin-sdk-go/backend"
	"github.com/grafana/grafana/pkg/components/simplejson"
	es "github.com/grafana/grafana/pkg/tsdb/elasticsearch/client"
)

// processQuery processes a single query and adds it to the multi-search request builder
func (e *elasticsearchDataQuery) processQuery(q *Query, ms *es.MultiSearchRequestBuilder, from, to int64) error {
	err := isQueryWithError(q)
	if err != nil {
		return backend.DownstreamError(fmt.Errorf("received invalid query. %w", err))
	}

	defaultTimeField := e.client.GetConfiguredFields().TimeField
	b := ms.Search(q.Interval, q.TimeRange)
	b.Size(0)
	filters := b.Query().Bool().Filter()
	filters.AddDateRangeFilter(defaultTimeField, to, from, es.DateFormatEpochMS)
	filters.AddQueryStringFilter(q.RawQuery, true)

	if isLogsQuery(q) {
		processLogsQuery(q, b, from, to, defaultTimeField)
	} else if isDocumentQuery(q) {
		processDocumentQuery(q, b, from, to, defaultTimeField)
	} else {
		// Otherwise it is a time series query, so process it as such
		processTimeSeriesQuery(q, b, from, to, defaultTimeField)
	}

	return nil
}

// processLogsQuery processes a logs query and configures the search request accordingly
func processLogsQuery(q *Query, b *es.SearchRequestBuilder, from, to int64, defaultTimeField string) {
	metric := q.Metrics[0]
	sort := es.SortOrderDesc
	if metric.Settings.Get("sortDirection").MustString() == "asc" {
		// This is currently used only for the log context query
		sort = es.SortOrderAsc
	}
	b.Sort(sort, defaultTimeField, "boolean")
	b.Sort(sort, "_doc", "")
	b.AddDocValueField(defaultTimeField)
	// We need to add timeField as a field with a standardized time format, to avoid receiving
	// formats that Elasticsearch can parse but our frontend can't (e.g. yyyy_MM_dd_HH_mm_ss)
	b.AddTimeFieldWithStandardizedFormat(defaultTimeField)
	b.Size(stringToIntWithDefaultValue(metric.Settings.Get("limit").MustString(), defaultSize))
	b.AddHighlight()

	// This is currently used only for the log context query, to get
	// log lines before and after the selected log line
	searchAfter := metric.Settings.Get("searchAfter").MustArray()
	for _, value := range searchAfter {
		b.AddSearchAfter(value)
	}

	// For a log query, we add a date histogram aggregation
	aggBuilder := b.Agg()
	q.BucketAggs = append(q.BucketAggs, &BucketAgg{
		Type:  dateHistType,
		Field: defaultTimeField,
		ID:    "1",
		Settings: simplejson.NewFromAny(map[string]any{
			"interval": "auto",
		}),
	})
	bucketAgg := q.BucketAggs[0]
	bucketAgg.Settings = simplejson.NewFromAny(
		bucketAgg.generateSettingsForDSL(),
	)
	_ = addDateHistogramAgg(aggBuilder, bucketAgg, from, to, defaultTimeField)
}

// processDocumentQuery processes a document query (raw_data or raw_document) and configures the search request
func processDocumentQuery(q *Query, b *es.SearchRequestBuilder, from, to int64, defaultTimeField string) {
	metric := q.Metrics[0]
	b.Sort(es.SortOrderDesc, defaultTimeField, "boolean")
	b.Sort(es.SortOrderDesc, "_doc", "")
	b.AddDocValueField(defaultTimeField)
	if isRawDataQuery(q) {
		// For raw_data queries we need to add timeField as a field with a standardized time format,
		// to avoid receiving formats that Elasticsearch can parse but our frontend can't (e.g. yyyy_MM_dd_HH_mm_ss)
		b.AddTimeFieldWithStandardizedFormat(defaultTimeField)
	}
	b.Size(stringToIntWithDefaultValue(metric.Settings.Get("size").MustString(), defaultSize))
}

// processTimeSeriesQuery processes a time series query with aggregations and metrics
func processTimeSeriesQuery(q *Query, b *es.SearchRequestBuilder, from, to int64, defaultTimeField string) {
	aggBuilder := b.Agg()
	// Process buckets
	// iterate backwards to create aggregations bottom-down
	for _, bucketAgg := range q.BucketAggs {
		bucketAgg.Settings = simplejson.NewFromAny(
			bucketAgg.generateSettingsForDSL(),
		)
		switch bucketAgg.Type {
		case dateHistType:
			aggBuilder = addDateHistogramAgg(aggBuilder, bucketAgg, from, to, defaultTimeField)
		case histogramType:
			aggBuilder = addHistogramAgg(aggBuilder, bucketAgg)
		case filtersType:
			aggBuilder = addFiltersAgg(aggBuilder, bucketAgg)
		case termsType:
			aggBuilder = addTermsAgg(aggBuilder, bucketAgg, q.Metrics)
		case geohashGridType:
			aggBuilder = addGeoHashGridAgg(aggBuilder, bucketAgg)
		case nestedType:
			aggBuilder = addNestedAgg(aggBuilder, bucketAgg)
		}
	}

	// Process metrics
	for _, m := range q.Metrics {
		m := m

		if m.Type == countType {
			continue
		}

		if isPipelineAgg(m.Type) {
			if isPipelineAggWithMultipleBucketPaths(m.Type) {
				if len(m.PipelineVariables) > 0 {
					bucketPaths := map[string]any{}
					for name, pipelineAgg := range m.PipelineVariables {
						if _, err := strconv.Atoi(pipelineAgg); err == nil {
							var appliedAgg *MetricAgg
							for _, pipelineMetric := range q.Metrics {
								if pipelineMetric.ID == pipelineAgg {
									appliedAgg = pipelineMetric
									break
								}
							}
							if appliedAgg != nil {
								if appliedAgg.Type == countType {
									bucketPaths[name] = "_count"
								} else {
									bucketPaths[name] = pipelineAgg
								}
							}
						}
					}

					aggBuilder.Pipeline(m.ID, m.Type, bucketPaths, func(a *es.PipelineAggregation) {
						a.Settings = m.generateSettingsForDSL()
					})
				} else {
					continue
				}
			} else {
				pipelineAggField := getPipelineAggField(m)
				if _, err := strconv.Atoi(pipelineAggField); err == nil {
					var appliedAgg *MetricAgg
					for _, pipelineMetric := range q.Metrics {
						if pipelineMetric.ID == pipelineAggField {
							appliedAgg = pipelineMetric
							break
						}
					}
					if appliedAgg != nil {
						bucketPath := pipelineAggField
						if appliedAgg.Type == countType {
							bucketPath = "_count"
						}

						aggBuilder.Pipeline(m.ID, m.Type, bucketPath, func(a *es.PipelineAggregation) {
							a.Settings = m.generateSettingsForDSL()
						})
					}
				} else {
					continue
				}
			}
		} else {
			aggBuilder.Metric(m.ID, m.Type, m.Field, func(a *es.MetricAggregation) {
				a.Settings = m.generateSettingsForDSL()
			})
		}
	}
}

// getPipelineAggField returns the pipeline aggregation field
func getPipelineAggField(m *MetricAgg) string {
	// In the frontend we use Field as pipelineAggField.
	// There might be a historical reason why the backend used PipelineAggregate as pipelineAggField,
	// so for now we check Field first and then PipelineAggregate to ensure we are not breaking anything.
	// TODO: Investigate if we can remove the check for PipelineAggregate
	pipelineAggField := m.Field

	if pipelineAggField == "" {
		pipelineAggField = m.PipelineAggregate
	}
	return pipelineAggField
}
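For illustration only (not part of the commit): the pipeline-metric branch above resolves a numeric field reference to another metric's ID, mapping count metrics to the special `_count` bucket path. A self-contained sketch with simplified stand-in types:

package main

import (
	"fmt"
	"strconv"
)

// metricAgg is a simplified stand-in for the package's MetricAgg type.
type metricAgg struct {
	ID, Type, Field string
}

// resolveBucketPath mirrors the pipeline lookup above: a numeric Field is
// treated as the ID of another metric, and count metrics resolve to the
// special "_count" bucket path.
func resolveBucketPath(m metricAgg, all []metricAgg) (string, bool) {
	if _, err := strconv.Atoi(m.Field); err != nil {
		return "", false // not a reference to another metric's ID
	}
	for _, candidate := range all {
		if candidate.ID == m.Field {
			if candidate.Type == "count" {
				return "_count", true
			}
			return m.Field, true
		}
	}
	return "", false
}

func main() {
	metrics := []metricAgg{
		{ID: "1", Type: "count"},
		{ID: "2", Type: "avg", Field: "bytes"},
		{ID: "3", Type: "derivative", Field: "1"},
		{ID: "4", Type: "derivative", Field: "2"},
	}
	for _, m := range metrics[2:] {
		if path, ok := resolveBucketPath(m, metrics); ok {
			fmt.Printf("metric %s -> buckets_path %q\n", m.ID, path) // 3 -> "_count", 4 -> "2"
		}
	}
}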
@@ -0,0 +1,87 @@
package elasticsearch

import (
	"strconv"

	"github.com/grafana/grafana/pkg/components/simplejson"
)

// setFloatPath converts a string value at the specified path to float64
func setFloatPath(settings *simplejson.Json, path ...string) {
	if stringValue, err := settings.GetPath(path...).String(); err == nil {
		if value, err := strconv.ParseFloat(stringValue, 64); err == nil {
			settings.SetPath(path, value)
		}
	}
}

// setIntPath converts a string value at the specified path to int64
func setIntPath(settings *simplejson.Json, path ...string) {
	if stringValue, err := settings.GetPath(path...).String(); err == nil {
		if value, err := strconv.ParseInt(stringValue, 10, 64); err == nil {
			settings.SetPath(path, value)
		}
	}
}

// generateSettingsForDSL casts values to float when required by Elastic's query DSL for MetricAgg
func (metricAggregation MetricAgg) generateSettingsForDSL() map[string]any {
	switch metricAggregation.Type {
	case "moving_avg":
		setFloatPath(metricAggregation.Settings, "window")
		setFloatPath(metricAggregation.Settings, "predict")
		setFloatPath(metricAggregation.Settings, "settings", "alpha")
		setFloatPath(metricAggregation.Settings, "settings", "beta")
		setFloatPath(metricAggregation.Settings, "settings", "gamma")
		setFloatPath(metricAggregation.Settings, "settings", "period")
	case "serial_diff":
		setFloatPath(metricAggregation.Settings, "lag")
	}

	if isMetricAggregationWithInlineScriptSupport(metricAggregation.Type) {
		scriptValue, err := metricAggregation.Settings.GetPath("script").String()
		if err != nil {
			// the script is stored using the old format: `script:{inline: "value"}`, or is not set
			scriptValue, err = metricAggregation.Settings.GetPath("script", "inline").String()
		}

		if err == nil {
			metricAggregation.Settings.SetPath([]string{"script"}, scriptValue)
		}
	}

	return metricAggregation.Settings.MustMap()
}

// generateSettingsForDSL converts bucket aggregation settings to DSL format
func (bucketAgg BucketAgg) generateSettingsForDSL() map[string]any {
	setIntPath(bucketAgg.Settings, "min_doc_count")

	return bucketAgg.Settings.MustMap()
}

// stringToIntWithDefaultValue converts a string to int with a default fallback value
func stringToIntWithDefaultValue(valueStr string, defaultValue int) int {
	value, err := strconv.Atoi(valueStr)
	if err != nil {
		value = defaultValue
	}
	// 0 is not a valid value in our case, so we fall back to defaultValue as well
	if value == 0 {
		value = defaultValue
	}
	return value
}

// stringToFloatWithDefaultValue converts a string to float64 with a default fallback value
func stringToFloatWithDefaultValue(valueStr string, defaultValue float64) float64 {
	value, err := strconv.ParseFloat(valueStr, 64)
	if err != nil {
		value = defaultValue
	}
	// 0 is not a valid value in our case, so we fall back to defaultValue as well
	if value == 0 {
		value = defaultValue
	}
	return value
}
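For illustration only (not part of the commit): the default-value helpers treat both unparsable strings and an explicit 0 as unset. A runnable sketch:

package main

import (
	"fmt"
	"strconv"
)

// stringToIntWithDefaultValue mirrors the helper above: parse failures and
// an explicit 0 both fall back to the default, since 0 is never a valid
// size or limit here.
func stringToIntWithDefaultValue(valueStr string, defaultValue int) int {
	value, err := strconv.Atoi(valueStr)
	if err != nil || value == 0 {
		value = defaultValue
	}
	return value
}

func main() {
	fmt.Println(stringToIntWithDefaultValue("250", 500)) // 250
	fmt.Println(stringToIntWithDefaultValue("", 500))    // 500 (parse error)
	fmt.Println(stringToIntWithDefaultValue("0", 500))   // 500 (0 treated as unset)
}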
@@ -0,0 +1,36 @@
package elasticsearch

import (
	"fmt"
)

// isQueryWithError validates the query and returns an error if invalid
func isQueryWithError(query *Query) error {
	if len(query.BucketAggs) == 0 {
		// If no aggregations, only document and logs queries are valid
		if len(query.Metrics) == 0 || (!isLogsQuery(query) && !isDocumentQuery(query)) {
			return fmt.Errorf("invalid query, missing metrics and aggregations")
		}
	}
	return nil
}

// isLogsQuery checks if the query is a logs query
func isLogsQuery(query *Query) bool {
	return len(query.Metrics) > 0 && query.Metrics[0].Type == logsType
}

// isDocumentQuery checks if the query is a document query (raw_data or raw_document)
func isDocumentQuery(query *Query) bool {
	return isRawDataQuery(query) || isRawDocumentQuery(query)
}

// isRawDataQuery checks if the query is a raw_data query
func isRawDataQuery(query *Query) bool {
	return len(query.Metrics) > 0 && query.Metrics[0].Type == rawDataType
}

// isRawDocumentQuery checks if the query is a raw_document query
func isRawDocumentQuery(query *Query) bool {
	return len(query.Metrics) > 0 && query.Metrics[0].Type == rawDocumentType
}
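For illustration only (not part of the commit): taken together, the predicates above classify a query by the type of its first metric, consulted in the order logs, then document, then time series. A simplified sketch, assuming the type constants carry their conventional string values ("logs", "raw_data", "raw_document"):

package main

import "fmt"

// classify mirrors the order in which processQuery consults the predicates above.
func classify(firstMetricType string) string {
	switch firstMetricType {
	case "logs":
		return "logs query"
	case "raw_data", "raw_document":
		return "document query"
	default:
		return "time series query"
	}
}

func main() {
	for _, t := range []string{"logs", "raw_data", "avg"} {
		fmt.Printf("%s -> %s\n", t, classify(t))
	}
}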
@@ -0,0 +1,155 @@
package elasticsearch

import (
	"encoding/json"
	"sort"
	"time"

	"github.com/grafana/grafana-plugin-sdk-go/data"

	es "github.com/grafana/grafana/pkg/tsdb/elasticsearch/client"
)

// processDocsToDataFrameFields converts documents to data frame fields
func processDocsToDataFrameFields(docs []map[string]interface{}, propNames []string, configuredFields es.ConfiguredFields) []*data.Field {
	size := len(docs)
	isFilterable := true
	allFields := make([]*data.Field, len(propNames))
	timeString := ""
	timeStringOk := false

	for propNameIdx, propName := range propNames {
		// Special handling for time field
		if propName == configuredFields.TimeField {
			timeVector := make([]*time.Time, size)
			for i, doc := range docs {
				// Check if the time field is a string
				timeString, timeStringOk = doc[configuredFields.TimeField].(string)
				// If not, it might be an array with one time string
				if !timeStringOk {
					timeList, ok := doc[configuredFields.TimeField].([]interface{})
					if !ok || len(timeList) != 1 {
						continue
					}
					// Check if the first element is a string
					timeString, timeStringOk = timeList[0].(string)
					if !timeStringOk {
						continue
					}
				}
				timeValue, err := time.Parse(time.RFC3339Nano, timeString)
				if err != nil {
					// We skip time values that cannot be parsed
					continue
				} else {
					timeVector[i] = &timeValue
				}
			}
			field := data.NewField(configuredFields.TimeField, nil, timeVector)
			field.Config = &data.FieldConfig{Filterable: &isFilterable}
			allFields[propNameIdx] = field
			continue
		}

		propNameValue := findTheFirstNonNilDocValueForPropName(docs, propName)
		switch propNameValue.(type) {
		// We check for the default data type values (float64, int, bool, string)
		// and fall back to json.RawMessage if we cannot find any of them
		case float64:
			allFields[propNameIdx] = createFieldOfType[float64](docs, propName, size, isFilterable)
		case int:
			allFields[propNameIdx] = createFieldOfType[int](docs, propName, size, isFilterable)
		case string:
			allFields[propNameIdx] = createFieldOfType[string](docs, propName, size, isFilterable)
		case bool:
			allFields[propNameIdx] = createFieldOfType[bool](docs, propName, size, isFilterable)
		default:
			fieldVector := make([]*json.RawMessage, size)
			for i, doc := range docs {
				bytes, err := json.Marshal(doc[propName])
				if err != nil {
					// We skip values that cannot be marshalled
					continue
				}
				value := json.RawMessage(bytes)
				fieldVector[i] = &value
			}
			field := data.NewField(propName, nil, fieldVector)
			field.Config = &data.FieldConfig{Filterable: &isFilterable}
			allFields[propNameIdx] = field
		}
	}

	return allFields
}

// findTheFirstNonNilDocValueForPropName finds the first non-nil value for propName in docs.
// If none of the values are non-nil, it returns the value of propName in the first doc.
func findTheFirstNonNilDocValueForPropName(docs []map[string]interface{}, propName string) interface{} {
	for _, doc := range docs {
		if doc[propName] != nil {
			return doc[propName]
		}
	}
	return docs[0][propName]
}

// createFieldOfType creates a data field of the specified type
func createFieldOfType[T int | float64 | bool | string](docs []map[string]interface{}, propName string, size int, isFilterable bool) *data.Field {
	fieldVector := make([]*T, size)
	for i, doc := range docs {
		value, ok := doc[propName].(T)
		if !ok {
			continue
		}
		fieldVector[i] = &value
	}
	field := data.NewField(propName, nil, fieldVector)
	field.Config = &data.FieldConfig{Filterable: &isFilterable}
	return field
}

// createFields creates data fields from existing frames or from propKeys
func createFields(frames data.Frames, propKeys []string) []*data.Field {
	var fields []*data.Field
	have := map[string]bool{}

	// collect existing fields
	for _, frame := range frames {
		for _, f := range frame.Fields {
			fields = append(fields, f)
			have[f.Name] = true
		}
	}

	// add missing prop fields
	for _, pk := range propKeys {
		if !have[pk] {
			fields = append(fields, data.NewField(pk, nil, []*string{}))
		}
	}

	return fields
}

// createPropKeys creates a sorted list of property keys from a map
func createPropKeys(props map[string]string) []string {
	propKeys := make([]string, 0)
	for k := range props {
		propKeys = append(propKeys, k)
	}
	sort.Strings(propKeys)
	return propKeys
}

// getSortedKeys returns sorted keys from a map
func getSortedKeys(data map[string]interface{}) []string {
	keys := make([]string, 0, len(data))

	for k := range data {
		keys = append(keys, k)
	}

	sort.Strings(keys)
	return keys
}
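For illustration only (not part of the commit): a trimmed-down, runnable version of `createFieldOfType` showing the nullable-vector pattern, where missing or mistyped document values stay nil so rows keep their alignment (assumes the grafana-plugin-sdk-go module is available):

package main

import (
	"fmt"

	"github.com/grafana/grafana-plugin-sdk-go/data"
)

// createFieldOfType, trimmed from the version above: values that are missing
// from a document, or of the wrong type, are left nil so every row keeps its
// position in the column.
func createFieldOfType[T int | float64 | bool | string](docs []map[string]interface{}, propName string, size int) *data.Field {
	fieldVector := make([]*T, size)
	for i, doc := range docs {
		if value, ok := doc[propName].(T); ok {
			fieldVector[i] = &value
		}
	}
	return data.NewField(propName, nil, fieldVector)
}

func main() {
	docs := []map[string]interface{}{
		{"level": "error"},
		{},             // missing value -> nil entry
		{"level": 404}, // wrong type -> nil entry
	}
	field := createFieldOfType[string](docs, "level", len(docs))
	fmt.Println(field.Len()) // 3: one value, two nils
}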
@@ -0,0 +1,170 @@
package elasticsearch

import (
	"sort"
	"strings"

	"github.com/grafana/grafana-plugin-sdk-go/backend"
	"github.com/grafana/grafana-plugin-sdk-go/data"
)

// nameFields applies naming logic to data frame fields based on query configuration
func nameFields(queryResult backend.DataResponse, target *Query, keepLabelsInResponse bool) {
	set := make(map[string]struct{})
	frames := queryResult.Frames
	for _, v := range frames {
		for _, vv := range v.Fields {
			if metricType, exists := vv.Labels["metric"]; exists {
				if _, ok := set[metricType]; !ok {
					set[metricType] = struct{}{}
				}
			}
		}
	}
	metricTypeCount := len(set)
	for _, frame := range frames {
		if frame.Meta != nil && frame.Meta.Type == data.FrameTypeTimeSeriesMulti {
			// if it is a time-series-multi frame, it has two columns: one is "time",
			// the other is "number"
			valueField := frame.Fields[1]
			fieldName := getFieldName(*valueField, target, metricTypeCount)
			// If we need to keep the labels in the response, we use DisplayNameFromDS
			// to prevent duplication in names and to stay backward compatible with
			// alerting and expressions
			if keepLabelsInResponse {
				if valueField.Config == nil {
					valueField.Config = &data.FieldConfig{}
				}
				valueField.Config.DisplayNameFromDS = fieldName
				// If we don't need to keep labels (how frontend mode worked), we use frame.Name and remove labels
			} else {
				valueField.Labels = nil
				frame.Name = fieldName
			}
		}
	}
}

// getFieldName generates a field name based on the data field and target configuration
func getFieldName(dataField data.Field, target *Query, metricTypeCount int) string {
	metricType := dataField.Labels["metric"]
	metricName := getMetricName(metricType)
	delete(dataField.Labels, "metric")

	field := ""
	if v, ok := dataField.Labels["field"]; ok {
		field = v
		delete(dataField.Labels, "field")
	}

	if target.Alias != "" {
		frameName := target.Alias

		subMatches := aliasPatternRegex.FindAllStringSubmatch(target.Alias, -1)
		for _, subMatch := range subMatches {
			group := subMatch[0]

			if len(subMatch) > 1 {
				group = subMatch[1]
			}

			if strings.Index(group, "term ") == 0 {
				frameName = strings.Replace(frameName, subMatch[0], dataField.Labels[group[5:]], 1)
			}
			if v, ok := dataField.Labels[group]; ok {
				frameName = strings.Replace(frameName, subMatch[0], v, 1)
			}
			if group == "metric" {
				frameName = strings.Replace(frameName, subMatch[0], metricName, 1)
			}
			if group == "field" {
				frameName = strings.Replace(frameName, subMatch[0], field, 1)
			}
		}

		return frameName
	}
	// TODO: handle the case where both field and pipelineAgg are set
	if isPipelineAgg(metricType) {
		if metricType != "" && isPipelineAggWithMultipleBucketPaths(metricType) {
			metricID := ""
			if v, ok := dataField.Labels["metricId"]; ok {
				metricID = v
			}

			for _, metric := range target.Metrics {
				if metric.ID == metricID {
					metricName = metric.Settings.Get("script").MustString()
					for name, pipelineAgg := range metric.PipelineVariables {
						for _, m := range target.Metrics {
							if m.ID == pipelineAgg {
								metricName = strings.ReplaceAll(metricName, "params."+name, describeMetric(m.Type, m.Field))
							}
						}
					}
				}
			}
		} else {
			if field != "" {
				found := false
				for _, metric := range target.Metrics {
					if metric.ID == field {
						metricName += " " + describeMetric(metric.Type, metric.Field)
						found = true
					}
				}
				if !found {
					metricName = "Unset"
				}
			}
		}
	} else if field != "" {
		metricName += " " + field
	}

	delete(dataField.Labels, "metricId")

	if len(dataField.Labels) == 0 {
		return metricName
	}

	name := ""
	for _, v := range getSortedLabelValues(dataField.Labels) {
		name += v + " "
	}

	if metricTypeCount == 1 {
		return strings.TrimSpace(name)
	}

	return strings.TrimSpace(name) + " " + metricName
}

// getSortedLabelValues sorts label keys and returns the label values in sorted order
func getSortedLabelValues(labels data.Labels) []string {
	keys := make([]string, 0, len(labels))
	for key := range labels {
		keys = append(keys, key)
	}

	sort.Strings(keys)

	values := make([]string, len(keys))
	for i, key := range keys {
		values[i] = labels[key]
	}

	return values
}

// getMetricName returns the display name for a metric type
func getMetricName(metric string) string {
	if text, ok := metricAggType[metric]; ok {
		return text
	}

	if text, ok := extendedStats[metric]; ok {
		return text
	}

	return metric
}
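For illustration only (not part of the commit): `aliasPatternRegex` is defined elsewhere in the package; assuming the usual `{{...}}` placeholder shape (an assumption, not confirmed by this diff), the alias substitution above behaves roughly like this sketch:

package main

import (
	"fmt"
	"regexp"
	"strings"
)

// Assumed shape of aliasPatternRegex: placeholders such as {{term server}},
// {{metric}} or {{field}} inside the user-provided alias.
var aliasPatternRegex = regexp.MustCompile(`\{\{([\s\S]+?)\}\}`)

func main() {
	alias := "{{term server}} {{metric}}"
	labels := map[string]string{"server": "backend-01"}
	metricName := "Average"

	frameName := alias
	for _, subMatch := range aliasPatternRegex.FindAllStringSubmatch(alias, -1) {
		group := subMatch[1]
		switch {
		case strings.HasPrefix(group, "term "):
			// "term X" pulls label X from the field's labels
			frameName = strings.Replace(frameName, subMatch[0], labels[group[5:]], 1)
		case group == "metric":
			frameName = strings.Replace(frameName, subMatch[0], metricName, 1)
		}
	}
	fmt.Println(frameName) // backend-01 Average
}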
@@ -0,0 +1,143 @@
package elasticsearch

import (
	"encoding/json"
	"fmt"
	"sort"

	"github.com/grafana/grafana-plugin-sdk-go/backend"
	"github.com/grafana/grafana-plugin-sdk-go/backend/log"
	"github.com/grafana/grafana-plugin-sdk-go/data"

	es "github.com/grafana/grafana/pkg/tsdb/elasticsearch/client"
)

// logsResponseProcessor handles processing of logs query responses
type logsResponseProcessor struct {
	logger log.Logger
}

// newLogsResponseProcessor creates a new logs response processor
func newLogsResponseProcessor(logger log.Logger) *logsResponseProcessor {
	return &logsResponseProcessor{
		logger: logger,
	}
}

// processLogsResponse processes logs query responses
func (p *logsResponseProcessor) processLogsResponse(res *es.SearchResponse, target *Query, configuredFields es.ConfiguredFields, queryRes *backend.DataResponse) error {
	propNames := make(map[string]bool)
	docs := make([]map[string]interface{}, len(res.Hits.Hits))
	searchWords := make(map[string]bool)

	for hitIdx, hit := range res.Hits.Hits {
		var flattened map[string]interface{}
		var sourceString string
		if hit["_source"] != nil {
			flattened = flatten(hit["_source"].(map[string]interface{}), 10)
			sourceMarshalled, err := json.Marshal(flattened)
			if err != nil {
				return err
			}
			sourceString = string(sourceMarshalled)
		}

		doc := map[string]interface{}{
			"_id":       hit["_id"],
			"_type":     hit["_type"],
			"_index":    hit["_index"],
			"sort":      hit["sort"],
			"highlight": hit["highlight"],
			// In case of a logs query we want the raw source as a string field so it can be visualized in the logs panel
			"_source": sourceString,
		}

		for k, v := range flattened {
			if configuredFields.LogLevelField != "" && k == configuredFields.LogLevelField {
				doc["level"] = v
			} else {
				doc[k] = v
			}
		}

		if hit["fields"] != nil {
			source, ok := hit["fields"].(map[string]interface{})
			if ok {
				for k, v := range source {
					doc[k] = v
				}
			}
		}

		// we are going to add an `id` field with the concatenation of `_index` and `_id`
		_, ok := doc["id"]
		if !ok {
			doc["id"] = fmt.Sprintf("%v#%v", doc["_index"], doc["_id"])
		}

		for key := range doc {
			propNames[key] = true
		}

		// Process highlight to searchWords
		if highlights, ok := doc["highlight"].(map[string]interface{}); ok {
			for _, highlight := range highlights {
				if highlightList, ok := highlight.([]interface{}); ok {
					for _, highlightValue := range highlightList {
						str := fmt.Sprintf("%v", highlightValue)
						matches := searchWordsRegex.FindAllStringSubmatch(str, -1)

						for _, v := range matches {
							searchWords[v[1]] = true
						}
					}
				}
			}
		}

		docs[hitIdx] = doc
	}

	sortedPropNames := sortPropNames(propNames, configuredFields, true)
	fields := processDocsToDataFrameFields(docs, sortedPropNames, configuredFields)

	frames := data.Frames{}
	frame := data.NewFrame("", fields...)
	setPreferredVisType(frame, data.VisTypeLogs)

	var total int
	if res.Hits.Total != nil {
		total = res.Hits.Total.Value
	}
	setLogsCustomMeta(frame, searchWords, stringToIntWithDefaultValue(target.Metrics[0].Settings.Get("limit").MustString(), defaultSize), total)
	frames = append(frames, frame)
	queryRes.Frames = frames

	p.logger.Debug("Processed log query response", "fieldsLength", len(frame.Fields))
	return nil
}

// setLogsCustomMeta sets custom metadata for logs frames
func setLogsCustomMeta(frame *data.Frame, searchWords map[string]bool, limit int, total int) {
	i := 0
	searchWordsList := make([]string, len(searchWords))
	for searchWord := range searchWords {
		searchWordsList[i] = searchWord
		i++
	}
	sort.Strings(searchWordsList)

	if frame.Meta == nil {
		frame.Meta = &data.FrameMeta{}
	}

	if frame.Meta.Custom == nil {
		frame.Meta.Custom = map[string]interface{}{}
	}

	frame.Meta.Custom = map[string]interface{}{
		"searchWords": searchWordsList,
		"limit":       limit,
		"total":       total,
	}
}
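For illustration only (not part of the commit): `searchWordsRegex` is defined elsewhere in the package; assuming highlight fragments are delimited with `@HIGHLIGHT@`/`@/HIGHLIGHT@` tags (an assumption — the real regex must match whatever pre/post tags the highlight request sends), the extraction above reduces to:

package main

import (
	"fmt"
	"regexp"
	"sort"
)

// Assumed delimiters; the real searchWordsRegex must match the pre/post tags
// configured on the highlight request.
var searchWordsRegex = regexp.MustCompile(`@HIGHLIGHT@(.+?)@/HIGHLIGHT@`)

func main() {
	highlightValue := "an @HIGHLIGHT@error@/HIGHLIGHT@ in @HIGHLIGHT@payments@/HIGHLIGHT@"
	searchWords := map[string]bool{}
	for _, v := range searchWordsRegex.FindAllStringSubmatch(highlightValue, -1) {
		searchWords[v[1]] = true
	}

	// Same de-dup + sort as setLogsCustomMeta applies before the words reach frame meta.
	words := make([]string, 0, len(searchWords))
	for w := range searchWords {
		words = append(words, w)
	}
	sort.Strings(words)
	fmt.Println(words) // [error payments]
}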
@@ -0,0 +1,725 @@
|
||||
package elasticsearch
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"sort"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"github.com/grafana/grafana-plugin-sdk-go/backend"
|
||||
"github.com/grafana/grafana-plugin-sdk-go/data"
|
||||
|
||||
"github.com/grafana/grafana/pkg/components/simplejson"
|
||||
)
|
||||
|
||||
// metricsResponseProcessor handles processing of metrics query responses
|
||||
type metricsResponseProcessor struct{}
|
||||
|
||||
// newMetricsResponseProcessor creates a new metrics response processor
|
||||
func newMetricsResponseProcessor() *metricsResponseProcessor {
|
||||
return &metricsResponseProcessor{}
|
||||
}
|
||||
|
||||
// processBuckets processes aggregation buckets recursively
|
||||
func (p *metricsResponseProcessor) processBuckets(aggs map[string]interface{}, target *Query,
|
||||
queryResult *backend.DataResponse, props map[string]string, depth int) error {
|
||||
var err error
|
||||
maxDepth := len(target.BucketAggs) - 1
|
||||
|
||||
aggIDs := make([]string, 0)
|
||||
for k := range aggs {
|
||||
aggIDs = append(aggIDs, k)
|
||||
}
|
||||
sort.Strings(aggIDs)
|
||||
for _, aggID := range aggIDs {
|
||||
v := aggs[aggID]
|
||||
aggDef, _ := findAgg(target, aggID)
|
||||
esAgg := simplejson.NewFromAny(v)
|
||||
if aggDef == nil {
|
||||
continue
|
||||
}
|
||||
if aggDef.Type == nestedType {
|
||||
err = p.processBuckets(esAgg.MustMap(), target, queryResult, props, depth+1)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
if depth == maxDepth {
|
||||
if aggDef.Type == dateHistType {
|
||||
err = p.processMetrics(esAgg, target, queryResult, props)
|
||||
} else {
|
||||
err = p.processAggregationDocs(esAgg, aggDef, target, queryResult, props)
|
||||
}
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
} else {
|
||||
for _, b := range esAgg.Get("buckets").MustArray() {
|
||||
bucket := simplejson.NewFromAny(b)
|
||||
newProps := make(map[string]string)
|
||||
|
||||
for k, v := range props {
|
||||
newProps[k] = v
|
||||
}
|
||||
|
||||
if key, err := bucket.Get("key").String(); err == nil {
|
||||
newProps[aggDef.Field] = key
|
||||
} else if key, err := bucket.Get("key").Int64(); err == nil {
|
||||
newProps[aggDef.Field] = strconv.FormatInt(key, 10)
|
||||
}
|
||||
|
||||
if key, err := bucket.Get("key_as_string").String(); err == nil {
|
||||
newProps[aggDef.Field] = key
|
||||
}
|
||||
err = p.processBuckets(bucket.MustMap(), target, queryResult, newProps, depth+1)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
buckets := esAgg.Get("buckets").MustMap()
|
||||
bucketKeys := make([]string, 0)
|
||||
for k := range buckets {
|
||||
bucketKeys = append(bucketKeys, k)
|
||||
}
|
||||
sort.Strings(bucketKeys)
|
||||
|
||||
for _, bucketKey := range bucketKeys {
|
||||
bucket := simplejson.NewFromAny(buckets[bucketKey])
|
||||
newProps := make(map[string]string)
|
||||
|
||||
for k, v := range props {
|
||||
newProps[k] = v
|
||||
}
|
||||
|
||||
newProps["filter"] = bucketKey
|
||||
|
||||
err = p.processBuckets(bucket.MustMap(), target, queryResult, newProps, depth+1)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// processMetrics processes metric aggregations from date histogram buckets
func (p *metricsResponseProcessor) processMetrics(esAgg *simplejson.Json, target *Query, query *backend.DataResponse,
	props map[string]string) error {
	frames := data.Frames{}
	esAggBuckets := esAgg.Get("buckets").MustArray()

	jsonBuckets := make([]*simplejson.Json, len(esAggBuckets))

	for i, v := range esAggBuckets {
		jsonBuckets[i] = simplejson.NewFromAny(v)
	}

	for _, metric := range target.Metrics {
		if metric.Hide {
			continue
		}

		switch metric.Type {
		case countType:
			countFrames, err := p.processCountMetric(jsonBuckets, props)
			if err != nil {
				return fmt.Errorf("error processing count metric: %w", err)
			}
			frames = append(frames, countFrames...)
		case percentilesType:
			percentileFrames, err := p.processPercentilesMetric(metric, jsonBuckets, props)
			if err != nil {
				return fmt.Errorf("error processing percentiles metric: %w", err)
			}
			frames = append(frames, percentileFrames...)
		case topMetricsType:
			topMetricsFrames, err := p.processTopMetricsMetric(metric, jsonBuckets, props)
			if err != nil {
				return fmt.Errorf("error processing top metrics metric: %w", err)
			}
			frames = append(frames, topMetricsFrames...)
		case extendedStatsType:
			extendedStatsFrames, err := p.processExtendedStatsMetric(metric, jsonBuckets, props)
			if err != nil {
				return fmt.Errorf("error processing extended stats metric: %w", err)
			}
			frames = append(frames, extendedStatsFrames...)
		default:
			defaultFrames, err := p.processDefaultMetric(metric, jsonBuckets, props)
			if err != nil {
				return fmt.Errorf("error processing default metric: %w", err)
			}
			frames = append(frames, defaultFrames...)
		}
	}
	if query.Frames != nil {
		frames = append(query.Frames, frames...)
	}
	query.Frames = frames
	return nil
}

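// For orientation (illustrative, abridged): processMetrics expects a date
// histogram aggregation of the shape
//
//	"buckets": [
//		{ "key": 1700000000000, "doc_count": 42, "<metricID>": { "value": 3.14 } },
//		...
//	]
//
// where "key" is the bucket timestamp in epoch milliseconds; each enabled
// metric contributes one or more frames built from these buckets.
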
// processCountMetric processes count metric aggregations
func (p *metricsResponseProcessor) processCountMetric(buckets []*simplejson.Json, props map[string]string) (data.Frames, error) {
	tags := make(map[string]string, len(props))
	timeVector := make([]time.Time, 0, len(buckets))
	values := make([]*float64, 0, len(buckets))

	for _, bucket := range buckets {
		value := castToFloat(bucket.Get("doc_count"))
		timeValue, err := getAsTime(bucket.Get("key"))
		if err != nil {
			return nil, err
		}
		timeVector = append(timeVector, timeValue)
		values = append(values, value)
	}

	for k, v := range props {
		tags[k] = v
	}
	tags["metric"] = countType
	return data.Frames{newTimeSeriesFrame(timeVector, tags, values)}, nil
}

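// A minimal sketch (illustrative, not part of the original change): two
// buckets {key: 1000, doc_count: 2} and {key: 2000, doc_count: 5} become a
// single frame with time [1970-01-01T00:00:01Z, 1970-01-01T00:00:02Z],
// values [2, 5], and the tag metric=count.
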
// processPercentilesMetric processes percentiles metric aggregations
func (p *metricsResponseProcessor) processPercentilesMetric(metric *MetricAgg, buckets []*simplejson.Json, props map[string]string) (data.Frames, error) {
	if len(buckets) == 0 {
		return data.Frames{}, nil
	}

	firstBucket := buckets[0]
	percentiles := firstBucket.GetPath(metric.ID, "values").MustMap()

	percentileKeys := make([]string, 0)
	for k := range percentiles {
		percentileKeys = append(percentileKeys, k)
	}
	sort.Strings(percentileKeys)

	frames := data.Frames{}

	for _, percentileName := range percentileKeys {
		tags := make(map[string]string, len(props))
		timeVector := make([]time.Time, 0, len(buckets))
		values := make([]*float64, 0, len(buckets))

		for k, v := range props {
			tags[k] = v
		}
		tags["metric"] = "p" + percentileName
		tags["field"] = metric.Field
		for _, bucket := range buckets {
			value := castToFloat(bucket.GetPath(metric.ID, "values", percentileName))
			key := bucket.Get("key")
			timeValue, err := getAsTime(key)
			if err != nil {
				return nil, err
			}
			timeVector = append(timeVector, timeValue)
			values = append(values, value)
		}
		frames = append(frames, newTimeSeriesFrame(timeVector, tags, values))
	}

	return frames, nil
}

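// Note that percentile keys are sorted lexicographically, not numerically,
// so a hypothetical key set {"5.0", "25.0", "75.0"} yields frames in the
// order p25.0, p5.0, p75.0; each frame is tagged metric="p"+key and
// field=metric.Field.
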
// processTopMetricsMetric processes top_metrics metric aggregations
func (p *metricsResponseProcessor) processTopMetricsMetric(metric *MetricAgg, buckets []*simplejson.Json, props map[string]string) (data.Frames, error) {
	metrics := metric.Settings.Get("metrics").MustArray()

	frames := data.Frames{}

	for _, metricField := range metrics {
		tags := make(map[string]string, len(props))
		timeVector := make([]time.Time, 0, len(buckets))
		values := make([]*float64, 0, len(buckets))
		for k, v := range props {
			tags[k] = v
		}

		tags["field"] = metricField.(string)
		tags["metric"] = "top_metrics"

		for _, bucket := range buckets {
			stats := bucket.GetPath(metric.ID, "top")
			timeValue, err := getAsTime(bucket.Get("key"))
			if err != nil {
				return nil, err
			}
			timeVector = append(timeVector, timeValue)

			for _, stat := range stats.MustArray() {
				stat := stat.(map[string]interface{})

				metrics, hasMetrics := stat["metrics"]
				if hasMetrics {
					metrics := metrics.(map[string]interface{})
					metricValue, hasMetricValue := metrics[metricField.(string)]

					if hasMetricValue && metricValue != nil {
						v := metricValue.(float64)
						values = append(values, &v)
					}
				}
			}
		}

		frames = append(frames, newTimeSeriesFrame(timeVector, tags, values))
	}

	return frames, nil
}

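// For orientation (illustrative, abridged): a top_metrics bucket entry has
// the shape
//
//	"<metricID>": { "top": [ { "sort": [...], "metrics": { "<field>": 12.3 } } ] }
//
// and the loop above emits one frame per requested field, tagged
// metric=top_metrics.
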
// processExtendedStatsMetric processes extended_stats metric aggregations
func (p *metricsResponseProcessor) processExtendedStatsMetric(metric *MetricAgg, buckets []*simplejson.Json, props map[string]string) (data.Frames, error) {
	metaKeys := make([]string, 0)
	meta := metric.Meta.MustMap()
	for k := range meta {
		metaKeys = append(metaKeys, k)
	}
	sort.Strings(metaKeys)

	frames := data.Frames{}

	for _, statName := range metaKeys {
		v := meta[statName]
		if enabled, ok := v.(bool); !ok || !enabled {
			continue
		}

		tags := make(map[string]string, len(props))
		timeVector := make([]time.Time, 0, len(buckets))
		values := make([]*float64, 0, len(buckets))

		for k, v := range props {
			tags[k] = v
		}
		tags["metric"] = statName
		tags["field"] = metric.Field

		for _, bucket := range buckets {
			timeValue, err := getAsTime(bucket.Get("key"))
			if err != nil {
				return nil, err
			}
			var value *float64
			switch statName {
			case "std_deviation_bounds_upper":
				value = castToFloat(bucket.GetPath(metric.ID, "std_deviation_bounds", "upper"))
			case "std_deviation_bounds_lower":
				value = castToFloat(bucket.GetPath(metric.ID, "std_deviation_bounds", "lower"))
			default:
				value = castToFloat(bucket.GetPath(metric.ID, statName))
			}
			timeVector = append(timeVector, timeValue)
			values = append(values, value)
		}
		frames = append(frames, newTimeSeriesFrame(timeVector, tags, values))
	}

	return frames, nil
}

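// A minimal sketch (illustrative): with metric.Meta of
//
//	{ "avg": true, "max": false, "std_deviation_bounds_upper": true }
//
// only "avg" and "std_deviation_bounds_upper" produce frames; the bounds
// stats are read from the nested "std_deviation_bounds" object, all other
// stats from the key matching their name.
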
// processDefaultMetric processes default metric aggregations
func (p *metricsResponseProcessor) processDefaultMetric(metric *MetricAgg, buckets []*simplejson.Json, props map[string]string) (data.Frames, error) {
	tags := make(map[string]string, len(props))
	timeVector := make([]time.Time, 0, len(buckets))
	values := make([]*float64, 0, len(buckets))

	for k, v := range props {
		tags[k] = v
	}

	tags["metric"] = metric.Type
	tags["field"] = metric.Field
	tags["metricId"] = metric.ID
	for _, bucket := range buckets {
		timeValue, err := getAsTime(bucket.Get("key"))
		if err != nil {
			return nil, err
		}
		valueObj, err := bucket.Get(metric.ID).Map()
		if err != nil {
			continue
		}
		var value *float64
		if _, ok := valueObj["normalized_value"]; ok {
			value = castToFloat(bucket.GetPath(metric.ID, "normalized_value"))
		} else {
			value = castToFloat(bucket.GetPath(metric.ID, "value"))
		}
		timeVector = append(timeVector, timeValue)
		values = append(values, value)
	}
	return data.Frames{newTimeSeriesFrame(timeVector, tags, values)}, nil
}

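// A minimal sketch (illustrative): a bucket entry such as
//
//	"<metricID>": { "value": 10, "normalized_value": 5 }
//
// yields 5 for that bucket, since a present "normalized_value" takes
// precedence over "value"; buckets whose metric entry is not an object are
// skipped entirely.
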
// ensurePropFields guarantees all property columns exist even if prior frames lacked them
func ensurePropFields(fields *[]*data.Field, keys []string) {
	have := map[string]bool{}
	for _, f := range *fields {
		have[f.Name] = true
	}
	for _, k := range keys {
		if !have[k] {
			d := ""
			f := extractDataField(k, &d)
			*fields = append(*fields, f)
		}
	}
}

// appendPropsRow appends one row of property values; skipKey avoids double-append
func appendPropsRow(fields *[]*data.Field, props map[string]string, propKeys []string, skipKey string) {
	for _, f := range *fields {
		for _, pk := range propKeys {
			if pk == skipKey {
				continue
			}
			if f.Name == pk {
				val := props[pk]
				f.Append(&val)
			}
		}
	}
}

// appendMetrics appends all metric values for a single bucket/row
func appendMetrics(fields *[]*data.Field, bucket *simplejson.Json, target *Query) {
	var values []interface{}
	for _, metric := range target.Metrics {
		switch metric.Type {
		case countType:
			addMetricValueToFields(fields, values, getMetricName(metric.Type), castToFloat(bucket.Get("doc_count")))
		case extendedStatsType:
			addExtendedStatsToFields(fields, bucket, metric, values)
		case percentilesType:
			addPercentilesToFields(fields, bucket, metric, values)
		case topMetricsType:
			addTopMetricsToFields(fields, bucket, metric, values)
		default:
			addOtherMetricsToFields(fields, bucket, metric, values, target)
		}
	}
}

// appendKeyColumnString appends a string key to an existing field or creates it
func appendKeyColumnString(fields *[]*data.Field, fieldName, key string) {
	for _, f := range *fields {
		if f.Name == fieldName {
			k := key
			f.Append(&k)
			return
		}
	}
	k := key
	f := extractDataField(fieldName, &k)
	f.Append(&k)
	*fields = append(*fields, f)
}

// appendBucketKeyValue appends the bucket's "key" (string or number) to fieldName
func appendBucketKeyValue(fields *[]*data.Field, fieldName string, bucket *simplejson.Json) error {
	for _, f := range *fields {
		if f.Name == fieldName {
			if s, err := bucket.Get("key").String(); err == nil {
				f.Append(&s)
				return nil
			}
			num, err := bucket.Get("key").Float64()
			if err != nil {
				return fmt.Errorf("error appending bucket key to existing field %q: %w", fieldName, err)
			}
			f.Append(&num)
			return nil
		}
	}

	// field not present yet
	if s, err := bucket.Get("key").String(); err == nil {
		f := extractDataField(fieldName, &s)
		f.Append(&s)
		*fields = append(*fields, f)
		return nil
	}

	num, err := bucket.Get("key").Float64()
	if err != nil {
		return fmt.Errorf("error appending bucket key to new field %q: %w", fieldName, err)
	}

	f := extractDataField(fieldName, &num)
	f.Append(&num)
	*fields = append(*fields, f)

	return nil
}

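// A minimal sketch (illustrative): the bucket key keeps its native JSON
// type, so a terms bucket {"key": "eu-west"} produces a nullable-string key
// column while {"key": 404} produces a nullable-float64 one, both named
// after the aggregation's field.
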
func (p *metricsResponseProcessor) processAggregationDocs(
	esAgg *simplejson.Json,
	aggDef *BucketAgg,
	target *Query,
	queryResult *backend.DataResponse,
	props map[string]string,
) error {
	propKeys := createPropKeys(props)
	buckets := esAgg.Get("buckets")

	if arr := buckets.MustArray(); len(arr) > 0 {
		fields := createFields(queryResult.Frames, propKeys)
		ensurePropFields(&fields, propKeys)

		for _, v := range arr {
			bucket := simplejson.NewFromAny(v)

			appendPropsRow(&fields, props, propKeys, "")
			if aggDef.Field != "" {
				if err := appendBucketKeyValue(&fields, aggDef.Field, bucket); err != nil {
					return err
				}
			}
			appendMetrics(&fields, bucket, target)
		}

		queryResult.Frames = data.Frames{&data.Frame{Fields: fields}}
		return nil
	}

	if m := buckets.MustMap(); len(m) > 0 {
		// default key column to "filter" for leaf filters
		keyFieldName := aggDef.Field
		if keyFieldName == "" {
			keyFieldName = "filter"
		}

		// ensure "filter" exists among props
		hasFilter := false
		for _, pk := range propKeys {
			if pk == "filter" {
				hasFilter = true
				break
			}
		}
		if !hasFilter {
			propKeys = append(propKeys, "filter")
		}

		fields := createFields(queryResult.Frames, propKeys)
		ensurePropFields(&fields, propKeys)

		keys := make([]string, 0, len(m))
		for k := range m {
			keys = append(keys, k)
		}
		sort.Strings(keys)

		for _, k := range keys {
			bucket := simplejson.NewFromAny(m[k])

			locProps := make(map[string]string, len(props)+1)
			for kk, vv := range props {
				locProps[kk] = vv
			}
			locProps["filter"] = k

			// avoid double-append when the key column is "filter"
			skip := ""
			if keyFieldName == "filter" {
				skip = "filter"
			}

			appendPropsRow(&fields, locProps, propKeys, skip)
			appendKeyColumnString(&fields, keyFieldName, k)
			appendMetrics(&fields, bucket, target)
		}

		queryResult.Frames = data.Frames{&data.Frame{Fields: fields}}
		return nil
	}

	// no buckets present
	queryResult.Frames = data.Frames{}
	return nil
}

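// For orientation (illustrative): "buckets" arrives either as an array
// (e.g. a terms aggregation) or as a map keyed by filter name (e.g. a
// filters aggregation). The array branch keys each row by the bucket's
// "key"; the map branch sorts the keys and exposes them through a "filter"
// column, which also becomes the key column when the aggregation has no
// field of its own.
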
// newTimeSeriesFrame creates a new time series frame
func newTimeSeriesFrame(timeData []time.Time, tags map[string]string, values []*float64) *data.Frame {
	frame := data.NewFrame("",
		data.NewField(data.TimeSeriesTimeFieldName, nil, timeData),
		data.NewField(data.TimeSeriesValueFieldName, tags, values))
	frame.Meta = &data.FrameMeta{
		Type: data.FrameTypeTimeSeriesMulti,
	}
	return frame
}

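// exampleTimeSeriesFrame is an illustrative sketch (not part of the original
// change) of how the processors above assemble a frame: one time field plus
// one value field carrying the tags as labels.
func exampleTimeSeriesFrame() *data.Frame {
	t0 := time.Unix(0, 0).UTC()
	v := 42.0
	return newTimeSeriesFrame(
		[]time.Time{t0, t0.Add(time.Minute)},
		map[string]string{"metric": "count"},
		[]*float64{&v, nil}, // nil models a missing datapoint
	)
}
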
// trimDatapoints trims datapoints from the beginning and end of the results
func trimDatapoints(queryResult backend.DataResponse, target *Query) {
	var histogram *BucketAgg
	for _, bucketAgg := range target.BucketAggs {
		if bucketAgg.Type == dateHistType {
			histogram = bucketAgg
			break
		}
	}

	if histogram == nil {
		return
	}

	trimEdges, err := castToInt(histogram.Settings.Get("trimEdges"))
	if err != nil {
		return
	}

	frames := queryResult.Frames

	for _, frame := range frames {
		for _, field := range frame.Fields {
			if field.Len() > trimEdges*2 {
				// first, delete the first trimEdges items
				for i := 0; i < trimEdges; i++ {
					field.Delete(0)
				}

				// then delete the last trimEdges items
				for i := 0; i < trimEdges; i++ {
					field.Delete(field.Len() - 1)
				}
			}
		}
	}
}

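// A minimal sketch (illustrative): with trimEdges=1 a five-point series
// [a b c d e] becomes [b c d]; fields holding 2*trimEdges points or fewer
// are left untouched.
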
// Helper functions for adding metrics to fields

func addMetricValueToFields(fields *[]*data.Field, values []interface{}, metricName string, value *float64) {
	index := -1
	for i, f := range *fields {
		if f.Name == metricName {
			index = i
			break
		}
	}

	var field data.Field
	if index == -1 {
		field = *data.NewField(metricName, nil, []*float64{})
		*fields = append(*fields, &field)
	} else {
		field = *(*fields)[index]
	}
	field.Append(value)
}

func addPercentilesToFields(fields *[]*data.Field, bucket *simplejson.Json, metric *MetricAgg, values []interface{}) {
	percentiles := bucket.GetPath(metric.ID, "values")
	for _, percentileName := range getSortedKeys(percentiles.MustMap()) {
		percentileValue := percentiles.Get(percentileName).MustFloat64()
		addMetricValueToFields(fields, values, fmt.Sprintf("p%v %v", percentileName, metric.Field), &percentileValue)
	}
}

func addExtendedStatsToFields(fields *[]*data.Field, bucket *simplejson.Json, metric *MetricAgg, values []interface{}) {
	metaKeys := make([]string, 0)
	meta := metric.Meta.MustMap()
	for k := range meta {
		metaKeys = append(metaKeys, k)
	}
	sort.Strings(metaKeys)
	for _, statName := range metaKeys {
		v := meta[statName]
		if enabled, ok := v.(bool); !ok || !enabled {
			continue
		}
		var value *float64
		switch statName {
		case "std_deviation_bounds_upper":
			value = castToFloat(bucket.GetPath(metric.ID, "std_deviation_bounds", "upper"))
		case "std_deviation_bounds_lower":
			value = castToFloat(bucket.GetPath(metric.ID, "std_deviation_bounds", "lower"))
		default:
			value = castToFloat(bucket.GetPath(metric.ID, statName))
		}

		addMetricValueToFields(fields, values, getMetricName(metric.Type), value)
		break
	}
}

func addTopMetricsToFields(fields *[]*data.Field, bucket *simplejson.Json, metric *MetricAgg, values []interface{}) {
	baseName := getMetricName(metric.Type)
	metrics := metric.Settings.Get("metrics").MustStringArray()
	for _, metricField := range metrics {
		// If more than one metric field is selected, append the field name to the metric name
		metricName := baseName
		if len(metrics) > 1 {
			metricName += " " + metricField
		}
		top := bucket.GetPath(metric.ID, "top").MustArray()
		if len(top) == 0 {
			// guard against empty top_metrics results to avoid indexing out of range
			continue
		}
		first, ok := top[0].(map[string]interface{})
		if !ok {
			continue
		}
		metrics, hasMetrics := first["metrics"]
		if hasMetrics {
			metrics := metrics.(map[string]interface{})
			metricValue, hasMetricValue := metrics[metricField]
			if hasMetricValue && metricValue != nil {
				v := metricValue.(float64)
				addMetricValueToFields(fields, values, metricName, &v)
			}
		}
	}
}

func addOtherMetricsToFields(fields *[]*data.Field, bucket *simplejson.Json, metric *MetricAgg, values []interface{}, target *Query) {
	metricName := getMetricName(metric.Type)
	otherMetrics := make([]*MetricAgg, 0)

	for _, m := range target.Metrics {
		// collect metrics of the same type that are not the current metric
		if m.ID != metric.ID && m.Type == metric.Type {
			otherMetrics = append(otherMetrics, m)
		}
	}

	if len(otherMetrics) > 0 {
		metricName += " " + metric.Field

		// If another metric has the same type and the same field name,
		// append metric.ID to disambiguate the column name
		for _, m := range otherMetrics {
			if m.Field == metric.Field {
				metricName += " " + metric.ID
				break
			}
		}

		if metric.Type == "bucket_script" {
			// Use the formula as the column name
			metricName = metric.Settings.Get("script").MustString("")
		}
	}
	addMetricValueToFields(fields, values, metricName, castToFloat(bucket.GetPath(metric.ID, "value")))
}

func extractDataField(name string, v interface{}) *data.Field {
	var field *data.Field
	switch v.(type) {
	case *string:
		field = data.NewField(name, nil, []*string{})
	case *float64:
		field = data.NewField(name, nil, []*float64{})
	default:
		field = &data.Field{}
	}
	isFilterable := true
	field.Config = &data.FieldConfig{Filterable: &isFilterable}
	return field
}
@@ -0,0 +1,133 @@
package elasticsearch

import (
	"encoding/json"

	"github.com/grafana/grafana-plugin-sdk-go/backend"
	"github.com/grafana/grafana-plugin-sdk-go/backend/log"
	"github.com/grafana/grafana-plugin-sdk-go/data"

	es "github.com/grafana/grafana/pkg/tsdb/elasticsearch/client"
)

// rawResponseProcessor handles processing of raw data and raw document query responses
type rawResponseProcessor struct {
	logger log.Logger
}

// newRawResponseProcessor creates a new raw response processor
func newRawResponseProcessor(logger log.Logger) *rawResponseProcessor {
	return &rawResponseProcessor{
		logger: logger,
	}
}

// processRawDataResponse processes raw data query responses
func (p *rawResponseProcessor) processRawDataResponse(res *es.SearchResponse, target *Query, configuredFields es.ConfiguredFields, queryRes *backend.DataResponse) error {
	propNames := make(map[string]bool)
	docs := make([]map[string]interface{}, len(res.Hits.Hits))

	for hitIdx, hit := range res.Hits.Hits {
		var flattened map[string]interface{}
		// use a checked type assertion so a malformed _source cannot panic
		if source, ok := hit["_source"].(map[string]interface{}); ok {
			flattened = flatten(source, 10)
		}

		doc := map[string]interface{}{
			"_id":       hit["_id"],
			"_type":     hit["_type"],
			"_index":    hit["_index"],
			"sort":      hit["sort"],
			"highlight": hit["highlight"],
		}

		for k, v := range flattened {
			doc[k] = v
		}

		if hit["fields"] != nil {
			source, ok := hit["fields"].(map[string]interface{})
			if ok {
				for k, v := range source {
					doc[k] = v
				}
			}
		}

		for key := range doc {
			propNames[key] = true
		}

		docs[hitIdx] = doc
	}

	sortedPropNames := sortPropNames(propNames, configuredFields, false)
	fields := processDocsToDataFrameFields(docs, sortedPropNames, configuredFields)

	frames := data.Frames{}
	frame := data.NewFrame("", fields...)

	frames = append(frames, frame)
	queryRes.Frames = frames

	p.logger.Debug("Processed raw data query response", "fieldsLength", len(frame.Fields))
	return nil
}

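// A minimal sketch (illustrative; the field names are made up): a hit with
//
//	"_source": { "geo": { "lat": 1.0 } }, "fields": { "took_ms": [7] }
//
// becomes a doc holding _id/_type/_index/sort/highlight plus the flattened
// "geo.lat" and "took_ms", and every key seen across all hits contributes
// one column to the resulting frame.
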
// processRawDocumentResponse processes raw document query responses
func (p *rawResponseProcessor) processRawDocumentResponse(res *es.SearchResponse, target *Query, queryRes *backend.DataResponse) error {
	docs := make([]map[string]interface{}, len(res.Hits.Hits))
	for hitIdx, hit := range res.Hits.Hits {
		doc := map[string]interface{}{
			"_id":       hit["_id"],
			"_type":     hit["_type"],
			"_index":    hit["_index"],
			"sort":      hit["sort"],
			"highlight": hit["highlight"],
		}

		if hit["_source"] != nil {
			source, ok := hit["_source"].(map[string]interface{})
			if ok {
				for k, v := range source {
					doc[k] = v
				}
			}
		}

		if hit["fields"] != nil {
			source, ok := hit["fields"].(map[string]interface{})
			if ok {
				for k, v := range source {
					doc[k] = v
				}
			}
		}

		docs[hitIdx] = doc
	}

	fieldVector := make([]*json.RawMessage, len(res.Hits.Hits))
	for i, doc := range docs {
		bytes, err := json.Marshal(doc)
		if err != nil {
			// skip docs that cannot be marshalled; this should not happen
			continue
		}
		value := json.RawMessage(bytes)
		fieldVector[i] = &value
	}

	isFilterable := true
	field := data.NewField(target.RefID, nil, fieldVector)
	field.Config = &data.FieldConfig{Filterable: &isFilterable}

	frames := data.Frames{}
	frame := data.NewFrame(target.RefID, field)
	frames = append(frames, frame)

	queryRes.Frames = frames
	p.logger.Debug("Processed raw document query response", "fieldsLength", len(frame.Fields))
	return nil
}

File diff suppressed because it is too large
@@ -0,0 +1,127 @@
package elasticsearch

import (
	"errors"
	"sort"
	"strconv"
	"strings"
	"time"

	"github.com/grafana/grafana/pkg/components/simplejson"
	es "github.com/grafana/grafana/pkg/tsdb/elasticsearch/client"
)

// flatten flattens multi-level objects to single level objects. It uses dot notation to join keys.
func flatten(target map[string]interface{}, maxDepth int) map[string]interface{} {
	// The frontend implementation had no depth limit, but since we now process
	// on the backend, we cap the recursion depth. 10 was chosen arbitrarily.
	output := make(map[string]interface{})
	step(0, maxDepth, target, "", output)
	return output
}

// step is a recursive helper for flatten
func step(currentDepth, maxDepth int, target map[string]interface{}, prev string, output map[string]interface{}) {
	nextDepth := currentDepth + 1
	for key, value := range target {
		newKey := strings.Trim(prev+"."+key, ".")

		v, ok := value.(map[string]interface{})
		if ok && len(v) > 0 && currentDepth < maxDepth {
			step(nextDepth, maxDepth, v, newKey, output)
		} else {
			output[newKey] = value
		}
	}
}

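// An illustrative sketch (not part of the original change):
//
//	flatten(map[string]interface{}{
//		"geo": map[string]interface{}{"lat": 1.0, "lon": 2.0},
//		"msg": "hello",
//	}, 10)
//
// returns {"geo.lat": 1.0, "geo.lon": 2.0, "msg": "hello"}; maps nested
// deeper than maxDepth are stored as-is under their joined key.
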
// sortPropNames orders propNames so that the time field comes first (if it
// exists), the log message field comes second (if shouldSortLogMessageField
// is true), and the remaining prop names are ordered alphabetically
func sortPropNames(propNames map[string]bool, configuredFields es.ConfiguredFields, shouldSortLogMessageField bool) []string {
	hasTimeField := false
	hasLogMessageField := false

	var sortedPropNames []string
	for k := range propNames {
		if configuredFields.TimeField != "" && k == configuredFields.TimeField {
			hasTimeField = true
		} else if shouldSortLogMessageField && configuredFields.LogMessageField != "" && k == configuredFields.LogMessageField {
			hasLogMessageField = true
		} else {
			sortedPropNames = append(sortedPropNames, k)
		}
	}

	sort.Strings(sortedPropNames)

	if hasLogMessageField {
		sortedPropNames = append([]string{configuredFields.LogMessageField}, sortedPropNames...)
	}

	if hasTimeField {
		sortedPropNames = append([]string{configuredFields.TimeField}, sortedPropNames...)
	}

	return sortedPropNames
}

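// A minimal sketch (illustrative): with TimeField "@timestamp" and
// LogMessageField "message", the prop names {"b", "@timestamp", "a",
// "message"} sort to ["@timestamp", "message", "a", "b"] when
// shouldSortLogMessageField is true, and to ["@timestamp", "a", "b",
// "message"] otherwise.
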
// castToInt casts a simplejson.Json value to int
func castToInt(j *simplejson.Json) (int, error) {
	i, err := j.Int()
	if err == nil {
		return i, nil
	}

	s, err := j.String()
	if err != nil {
		return 0, err
	}

	v, err := strconv.Atoi(s)
	if err != nil {
		return 0, err
	}

	return v, nil
}

// castToFloat casts a simplejson.Json value to float64
func castToFloat(j *simplejson.Json) *float64 {
	f, err := j.Float64()
	if err == nil {
		return &f
	}

	if s, err := j.String(); err == nil {
		if strings.ToLower(s) == "nan" {
			return nil
		}

		if v, err := strconv.ParseFloat(s, 64); err == nil {
			return &v
		}
	}

	return nil
}

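// exampleCastToFloat is an illustrative sketch (not part of the original
// change) of the input shapes castToFloat accepts.
func exampleCastToFloat() {
	a := castToFloat(simplejson.NewFromAny(1.5))   // numeric JSON -> pointer to 1.5
	b := castToFloat(simplejson.NewFromAny("2.5")) // numeric string -> pointer to 2.5
	c := castToFloat(simplejson.NewFromAny("NaN")) // "NaN" in any case -> nil
	_, _, _ = a, b, c
}
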
// getAsTime converts a simplejson.Json value to time.Time
func getAsTime(j *simplejson.Json) (time.Time, error) {
	// these are stored as numbers
	number, err := j.Float64()
	if err != nil {
		return time.Time{}, err
	}

	return time.UnixMilli(int64(number)).UTC(), nil
}

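// A minimal sketch (illustrative): date histogram keys are epoch
// milliseconds, so getAsTime(simplejson.NewFromAny(1700000000000.0))
// returns 2023-11-14T22:13:20Z; non-numeric keys return an error.
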
// findAgg finds a bucket aggregation by ID
func findAgg(target *Query, aggID string) (*BucketAgg, error) {
	for _, v := range target.BucketAggs {
		if aggID == v.ID {
			return v, nil
		}
	}
	return nil, errors.New("cannot find aggDef, aggID: " + aggID)
}