Files
grafana/pkg/tsdb/influxdb/influxql/buffered/response_parser.go
T
ismail simsek c088d003f2 InfluxDB: Implement InfluxQL json streaming parser (#76934)
* Have the first iteration

* Prepare bench testing

* rename the test files

* Remove unnecessary test file

* Introduce influxqlStreamingParser feature flag

* Apply streaming parser feature flag

* Add new tests

* More tests

* return executedQueryString only in first frame

* add frame meta and config

* Update golden json files

* Support tags/labels

* more tests

* more tests

* Don't change original response_parser.go

* provide context

* create util package

* don't pass the row

* update converter with formatted frameName

* add executedQueryString info only to first frame

* update golden files

* rename

* update test file

* use pointer values

* update testdata

* update parsing

* update converter for null values

* prepare converter for table response

* clean up

* return timeField in fields

* handle no time column responses

* better nil field handling

* refactor the code

* add table tests

* fix config for table

* table response format

* fix value

* if there is no time column set name

* linting

* refactoring

* handle the status code

* add tracing

* Update pkg/tsdb/influxdb/influxql/converter/converter_test.go

Co-authored-by: İnanç Gümüş <m@inanc.io>

* fix import

* update test data

* sanity

* sanity

* linting

* simplicity

* return empty rsp

* rename to prevent confusion

* nullableJson field type for null values

* better handling null values

* remove duplicate test file

* fix healthcheck

* use util for pointer

* move bench test to root

* provide fake feature manager

* add more tests

* partial fix for null values in table response format

* handle partial null fields

* comments for easy testing

* move frameName allocation in readSeries

* one less append operation

* performance improvement by making string conversion once

pkg: github.com/grafana/grafana/pkg/tsdb/influxdb/influxql
             │ stream2.txt │            stream3.txt             │
             │   sec/op    │   sec/op     vs base               │
ParseJson-10   314.4m ± 1%   303.9m ± 1%  -3.34% (p=0.000 n=10)

             │ stream2.txt  │             stream3.txt              │
             │     B/op     │     B/op      vs base                │
ParseJson-10   425.2Mi ± 0%   382.7Mi ± 0%  -10.00% (p=0.000 n=10)

             │ stream2.txt │            stream3.txt             │
             │  allocs/op  │  allocs/op   vs base               │
ParseJson-10   7.224M ± 0%   6.689M ± 0%  -7.41% (p=0.000 n=10)

* add comment lines

---------

Co-authored-by: İnanç Gümüş <m@inanc.io>
2023-12-06 12:39:05 +01:00

346 lines
9.6 KiB
Go

package buffered
import (
"encoding/json"
"fmt"
"io"
"strings"
"time"
"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana-plugin-sdk-go/data"
"github.com/grafana/grafana/pkg/tsdb/influxdb/influxql/util"
"github.com/grafana/grafana/pkg/tsdb/influxdb/models"
)
func ResponseParse(buf io.ReadCloser, statusCode int, query *models.Query) *backend.DataResponse {
return parse(buf, statusCode, query)
}
// parse is the same as Parse, but without the io.ReadCloser (we don't need to
// close the buffer)
func parse(buf io.Reader, statusCode int, query *models.Query) *backend.DataResponse {
response, jsonErr := parseJSON(buf)
if statusCode/100 != 2 {
return &backend.DataResponse{Error: fmt.Errorf("InfluxDB returned error: %s", response.Error)}
}
if jsonErr != nil {
return &backend.DataResponse{Error: jsonErr}
}
if response.Error != "" {
return &backend.DataResponse{Error: fmt.Errorf(response.Error)}
}
result := response.Results[0]
if result.Error != "" {
return &backend.DataResponse{Error: fmt.Errorf(result.Error)}
}
if query.ResultFormat == "table" {
return &backend.DataResponse{Frames: transformRowsForTable(result.Series, *query)}
}
return &backend.DataResponse{Frames: transformRowsForTimeSeries(result.Series, *query)}
}
func parseJSON(buf io.Reader) (models.Response, error) {
var response models.Response
dec := json.NewDecoder(buf)
dec.UseNumber()
err := dec.Decode(&response)
return response, err
}
func transformRowsForTable(rows []models.Row, query models.Query) data.Frames {
if len(rows) == 0 {
return make([]*data.Frame, 0)
}
frames := make([]*data.Frame, 0, 1)
newFrame := data.NewFrame(rows[0].Name)
newFrame.Meta = &data.FrameMeta{
ExecutedQueryString: query.RawQuery,
PreferredVisualization: util.GetVisType(query.ResultFormat),
}
conLen := len(rows[0].Columns)
if rows[0].Columns[0] == "time" {
newFrame.Fields = append(newFrame.Fields, newTimeField(rows))
} else {
newFrame.Fields = append(newFrame.Fields, newValueFields(rows, nil, 0, 1)...)
}
newFrame.Fields = append(newFrame.Fields, newTagField(rows, nil)...)
newFrame.Fields = append(newFrame.Fields, newValueFields(rows, nil, 1, conLen)...)
frames = append(frames, newFrame)
return frames
}
func newTimeField(rows []models.Row) *data.Field {
var timeArray []time.Time
for _, row := range rows {
for _, valuePair := range row.Values {
timestamp, timestampErr := util.ParseTimestamp(valuePair[0])
// we only add this row if the timestamp is valid
if timestampErr != nil {
continue
}
timeArray = append(timeArray, timestamp)
}
}
timeField := data.NewField("Time", nil, timeArray)
return timeField
}
func newTagField(rows []models.Row, labels data.Labels) []*data.Field {
fields := make([]*data.Field, 0, len(rows[0].Tags))
for key := range rows[0].Tags {
tagField := data.NewField(key, labels, []*string{})
for _, row := range rows {
for range row.Values {
value := row.Tags[key]
tagField.Append(&value)
}
}
tagField.SetConfig(&data.FieldConfig{DisplayNameFromDS: key})
fields = append(fields, tagField)
}
return fields
}
func newValueFields(rows []models.Row, labels data.Labels, colIdxStart, colIdxEnd int) []*data.Field {
fields := make([]*data.Field, 0)
for colIdx := colIdxStart; colIdx < colIdxEnd; colIdx++ {
var valueField *data.Field
var floatArray []*float64
var stringArray []*string
var boolArray []*bool
for _, row := range rows {
valType := util.Typeof(row.Values, colIdx)
for _, valuePair := range row.Values {
switch valType {
case "string":
value, ok := valuePair[colIdx].(string)
if ok {
// we handle null values by adding nil to floatArray
// if then we see the valueField should be a sting field
// we append those nil elements into the stringArray
// then we clear the floatArray
// these steps are necessary for the responses like in string_column_with_null_value.json
for range floatArray {
stringArray = append(stringArray, nil)
}
floatArray = nil
stringArray = append(stringArray, &value)
} else {
stringArray = append(stringArray, nil)
}
case "json.Number":
value := util.ParseNumber(valuePair[colIdx])
floatArray = append(floatArray, value)
case "bool":
value, ok := valuePair[colIdx].(bool)
if ok {
// we handle null values by adding nil to floatArray
// if then we see the valueField should be a bool field
// we append those nil elements into the boolArray
// then we clear the floatArray
for range floatArray {
boolArray = append(boolArray, nil)
}
floatArray = nil
boolArray = append(boolArray, &value)
} else {
boolArray = append(boolArray, nil)
}
case "null":
// If there is already a valueField, instead of adding nil to floatArray
// we add nil to the valueField and to the array of valueField constructed from
if valueField != nil {
valueFieldType := valueField.Type()
switch valueFieldType {
case data.FieldTypeNullableString:
stringArray = append(stringArray, nil)
case data.FieldTypeNullableBool:
boolArray = append(boolArray, nil)
default:
floatArray = append(floatArray, nil)
}
valueField.Append(nil)
} else {
// If there is no valueField created before we add the nil value to floatArray
// when we have the real value of the field these will be appended to the field
floatArray = append(floatArray, nil)
}
}
}
switch valType {
case "string":
valueField = data.NewField(row.Columns[colIdx], labels, stringArray)
case "json.Number":
valueField = data.NewField(row.Columns[colIdx], labels, floatArray)
case "bool":
valueField = data.NewField(row.Columns[colIdx], labels, boolArray)
case "null":
if valueField == nil {
valueField = data.NewField(row.Columns[colIdx], labels, floatArray)
}
}
valueField.SetConfig(&data.FieldConfig{DisplayNameFromDS: row.Columns[colIdx]})
}
fields = append(fields, valueField)
}
return fields
}
func transformRowsForTimeSeries(rows []models.Row, query models.Query) data.Frames {
// pre-allocate frames - this can save many allocations
cols := 0
for _, row := range rows {
cols += len(row.Columns)
}
if len(rows) == 0 {
return make([]*data.Frame, 0)
}
// Preallocate for the worst-case scenario
frames := make([]*data.Frame, 0, len(rows)*len(rows[0].Columns))
// frameName is pre-allocated. So we can reuse it, saving memory.
// It's sized for a reasonably-large name, but will grow if needed.
frameName := make([]byte, 0, 128)
for _, row := range rows {
var hasTimeCol = false
for _, column := range row.Columns {
if strings.ToLower(column) == "time" {
hasTimeCol = true
}
}
if !hasTimeCol {
newFrame := newFrameWithoutTimeField(row, query)
frames = append(frames, newFrame)
} else {
for colIndex, column := range row.Columns {
if column == "time" {
continue
}
newFrame := newFrameWithTimeField(row, column, colIndex, query, frameName)
if len(frames) == 0 {
newFrame.Meta = &data.FrameMeta{
ExecutedQueryString: query.RawQuery,
PreferredVisualization: util.GetVisType(query.ResultFormat),
}
}
frames = append(frames, newFrame)
}
}
}
return frames
}
func newFrameWithTimeField(row models.Row, column string, colIndex int, query models.Query, frameName []byte) *data.Frame {
var timeArray []time.Time
var floatArray []*float64
var stringArray []*string
var boolArray []*bool
valType := util.Typeof(row.Values, colIndex)
for _, valuePair := range row.Values {
timestamp, timestampErr := util.ParseTimestamp(valuePair[0])
// we only add this row if the timestamp is valid
if timestampErr != nil {
continue
}
timeArray = append(timeArray, timestamp)
switch valType {
case "string":
value, ok := valuePair[colIndex].(string)
if ok {
stringArray = append(stringArray, &value)
} else {
stringArray = append(stringArray, nil)
}
case "json.Number":
value := util.ParseNumber(valuePair[colIndex])
floatArray = append(floatArray, value)
case "bool":
value, ok := valuePair[colIndex].(bool)
if ok {
boolArray = append(boolArray, &value)
} else {
boolArray = append(boolArray, nil)
}
case "null":
floatArray = append(floatArray, nil)
}
}
timeField := data.NewField("Time", nil, timeArray)
var valueField *data.Field
switch valType {
case "string":
valueField = data.NewField("Value", row.Tags, stringArray)
case "json.Number":
valueField = data.NewField("Value", row.Tags, floatArray)
case "bool":
valueField = data.NewField("Value", row.Tags, boolArray)
case "null":
valueField = data.NewField("Value", row.Tags, floatArray)
}
name := string(util.FormatFrameName(row.Name, column, row.Tags, query, frameName[:]))
valueField.SetConfig(&data.FieldConfig{DisplayNameFromDS: name})
return data.NewFrame(name, timeField, valueField)
}
func newFrameWithoutTimeField(row models.Row, query models.Query) *data.Frame {
var values []*string
for _, valuePair := range row.Values {
if strings.Contains(strings.ToLower(query.RawQuery), strings.ToLower("SHOW TAG VALUES")) {
if len(valuePair) >= 2 {
values = append(values, util.ToPtr(valuePair[1].(string)))
}
} else {
if len(valuePair) >= 1 {
values = append(values, util.ToPtr(valuePair[0].(string)))
}
}
}
field := data.NewField("Value", nil, values)
frame := data.NewFrame(row.Name, field)
frame.Meta = &data.FrameMeta{
ExecutedQueryString: query.RawQuery,
PreferredVisualization: util.GetVisType(query.ResultFormat),
}
return frame
}