Cloudwatch: Fix duplicated time series (#35433)

* make sure queries are only ran once

* test aliases

* use correct dates
This commit is contained in:
Erik Sundell
2021-06-10 10:23:17 +02:00
committed by GitHub
parent e3afb63e62
commit eff2410bae
4 changed files with 193 additions and 97 deletions
+80 -95
View File
@@ -5,120 +5,105 @@ import (
"fmt"
"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana-plugin-sdk-go/data"
"github.com/grafana/grafana/pkg/infra/log"
"golang.org/x/sync/errgroup"
)
type responseWrapper struct {
DataResponse *backend.DataResponse
RefId string
}
func (e *cloudWatchExecutor) executeTimeSeriesQuery(ctx context.Context, req *backend.QueryDataRequest) (*backend.QueryDataResponse, error) {
plog.Debug("Executing time series query")
resp := backend.NewQueryDataResponse()
for _, q := range req.Queries {
startTime := q.TimeRange.From
endTime := q.TimeRange.To
if !startTime.Before(endTime) {
return nil, fmt.Errorf("invalid time range: start time must be before end time")
}
if len(req.Queries) == 0 {
return nil, fmt.Errorf("request contains no queries")
}
requestQueriesByRegion, err := e.parseQueries(req, startTime, endTime)
if err != nil {
return nil, err
}
// startTime and endTime are always the same for all queries
startTime := req.Queries[0].TimeRange.From
endTime := req.Queries[0].TimeRange.To
if !startTime.Before(endTime) {
return nil, fmt.Errorf("invalid time range: start time must be before end time")
}
if len(requestQueriesByRegion) == 0 {
return backend.NewQueryDataResponse(), nil
}
requestQueriesByRegion, err := e.parseQueries(req.Queries, startTime, endTime)
if err != nil {
return nil, err
}
resultChan := make(chan *backend.DataResponse, len(req.Queries))
eg, ectx := errgroup.WithContext(ctx)
for r, q := range requestQueriesByRegion {
requestQueries := q
region := r
eg.Go(func() error {
defer func() {
if err := recover(); err != nil {
plog.Error("Execute Get Metric Data Query Panic", "error", err, "stack", log.Stack(1))
if theErr, ok := err.(error); ok {
resultChan <- &backend.DataResponse{
if len(requestQueriesByRegion) == 0 {
return backend.NewQueryDataResponse(), nil
}
resultChan := make(chan *responseWrapper, len(req.Queries))
eg, ectx := errgroup.WithContext(ctx)
for r, q := range requestQueriesByRegion {
requestQueries := q
region := r
eg.Go(func() error {
defer func() {
if err := recover(); err != nil {
plog.Error("Execute Get Metric Data Query Panic", "error", err, "stack", log.Stack(1))
if theErr, ok := err.(error); ok {
resultChan <- &responseWrapper{
DataResponse: &backend.DataResponse{
Error: theErr,
}
},
}
}
}()
client, err := e.getCWClient(region, req.PluginContext)
if err != nil {
return err
}
}()
queries, err := e.transformRequestQueriesToCloudWatchQueries(requestQueries)
if err != nil {
for _, query := range requestQueries {
resultChan <- &backend.DataResponse{
Frames: data.Frames{data.NewFrame(query.RefId)},
Error: err,
}
}
return nil
client, err := e.getCWClient(region, req.PluginContext)
if err != nil {
return err
}
queries, err := e.transformRequestQueriesToCloudWatchQueries(requestQueries)
if err != nil {
return err
}
metricDataInput, err := e.buildMetricDataInput(startTime, endTime, queries)
if err != nil {
return err
}
mdo, err := e.executeRequest(ectx, client, metricDataInput)
if err != nil {
return err
}
responses, err := e.parseResponse(mdo, queries)
if err != nil {
return err
}
res, err := e.transformQueryResponsesToQueryResult(responses, requestQueries, startTime, endTime)
if err != nil {
return err
}
for refID, queryRes := range res {
resultChan <- &responseWrapper{
DataResponse: queryRes,
RefId: refID,
}
}
return nil
})
}
metricDataInput, err := e.buildMetricDataInput(startTime, endTime, queries)
if err != nil {
return err
}
if err := eg.Wait(); err != nil {
return nil, err
}
close(resultChan)
cloudwatchResponses := make([]*cloudwatchResponse, 0)
mdo, err := e.executeRequest(ectx, client, metricDataInput)
if err != nil {
for _, query := range requestQueries {
resultChan <- &backend.DataResponse{
Frames: data.Frames{data.NewFrame(query.RefId)},
Error: err,
}
}
return nil
}
responses, err := e.parseResponse(mdo, queries)
if err != nil {
for _, query := range requestQueries {
resultChan <- &backend.DataResponse{
Frames: data.Frames{data.NewFrame(query.RefId)},
Error: err,
}
}
return nil
}
cloudwatchResponses = append(cloudwatchResponses, responses...)
res, err := e.transformQueryResponsesToQueryResult(cloudwatchResponses, requestQueries, startTime, endTime)
if err != nil {
for _, query := range requestQueries {
resultChan <- &backend.DataResponse{
Frames: data.Frames{data.NewFrame(query.RefId)},
Error: err,
}
}
return nil
}
for _, queryRes := range res {
resultChan <- queryRes
}
return nil
})
}
if err := eg.Wait(); err != nil {
return nil, err
}
close(resultChan)
for result := range resultChan {
resp.Responses[q.RefID] = *result
}
for result := range resultChan {
resp.Responses[result.RefId] = *result.DataResponse
}
return resp, nil