Prometheus: Add support to make parallel queries (#90316)

* Add support for prometheus datasource to make parallel queries

* Incorporate review comments

* Update pkg/promlib/querydata/request.go

Co-authored-by: ismail simsek <ismailsimsek09@gmail.com>

* Fix lint

* Add parallel queries behind feature flag

* Fixing lint issue

* Update go.mod

* Update pkg/promlib/querydata/request.go

* Update pkg/promlib/querydata/request.go

---------

Co-authored-by: ismail simsek <ismailsimsek09@gmail.com>
Co-authored-by: Charandas <542168+charandas@users.noreply.github.com>
This commit is contained in:
Vijay Samuel
2024-08-12 18:01:39 +05:30
committed by GitHub
parent 369cd6f81e
commit c9ddc688a2
9 changed files with 168 additions and 105 deletions
+109 -37
View File
@@ -5,21 +5,21 @@ import (
"fmt"
"net/http"
"regexp"
"sync"
"time"
"github.com/grafana/dskit/concurrency"
"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana-plugin-sdk-go/data"
"github.com/grafana/grafana-plugin-sdk-go/data/utils/maputil"
"go.opentelemetry.io/otel/trace"
"github.com/grafana/grafana-plugin-sdk-go/backend/log"
"github.com/grafana/grafana-plugin-sdk-go/backend/tracing"
"github.com/grafana/grafana-plugin-sdk-go/data"
"github.com/grafana/grafana-plugin-sdk-go/data/utils/maputil"
"github.com/grafana/grafana/pkg/promlib/client"
"github.com/grafana/grafana/pkg/promlib/intervalv2"
"github.com/grafana/grafana/pkg/promlib/models"
"github.com/grafana/grafana/pkg/promlib/querydata/exemplar"
"github.com/grafana/grafana/pkg/promlib/utils"
"go.opentelemetry.io/otel/trace"
)
const legendFormatAuto = "__auto"
@@ -88,22 +88,49 @@ func (s *QueryData) Execute(ctx context.Context, req *backend.QueryDataRequest)
Responses: backend.Responses{},
}
cfg := backend.GrafanaConfigFromContext(ctx)
hasPromQLScopeFeatureFlag := cfg.FeatureToggles().IsEnabled("promQLScope")
hasPrometheusDataplaneFeatureFlag := cfg.FeatureToggles().IsEnabled("prometheusDataplane")
var (
cfg = backend.GrafanaConfigFromContext(ctx)
hasPromQLScopeFeatureFlag = cfg.FeatureToggles().IsEnabled("promQLScope")
hasPrometheusDataplaneFeatureFlag = cfg.FeatureToggles().IsEnabled("prometheusDataplane")
hasPrometheusRunQueriesInParallel = cfg.FeatureToggles().IsEnabled("prometheusRunQueriesInParallel")
)
for _, q := range req.Queries {
r := s.handleQuery(ctx, q, fromAlert, hasPromQLScopeFeatureFlag, hasPrometheusDataplaneFeatureFlag)
if r == nil {
continue
if hasPrometheusRunQueriesInParallel {
var (
m sync.Mutex
)
concurrentQueryCount, err := req.PluginContext.GrafanaConfig.ConcurrentQueryCount()
if err != nil {
logger := s.log.FromContext(ctx)
logger.Debug(fmt.Sprintf("Concurrent Query Count read/parse error: %v", err), "prometheusRunQueriesInParallel")
concurrentQueryCount = 10
}
_ = concurrency.ForEachJob(ctx, len(req.Queries), concurrentQueryCount, func(ctx context.Context, idx int) error {
query := req.Queries[idx]
r := s.handleQuery(ctx, query, fromAlert, hasPromQLScopeFeatureFlag, hasPrometheusDataplaneFeatureFlag, true)
if r != nil {
m.Lock()
result.Responses[query.RefID] = *r
m.Unlock()
}
return nil
})
} else {
for _, q := range req.Queries {
r := s.handleQuery(ctx, q, fromAlert, hasPromQLScopeFeatureFlag, hasPrometheusDataplaneFeatureFlag, false)
if r != nil {
result.Responses[q.RefID] = *r
}
}
result.Responses[q.RefID] = *r
}
return &result, nil
}
func (s *QueryData) handleQuery(ctx context.Context, bq backend.DataQuery, fromAlert, hasPromQLScopeFeatureFlag, hasPrometheusDataplaneFeatureFlag bool) *backend.DataResponse {
func (s *QueryData) handleQuery(ctx context.Context, bq backend.DataQuery, fromAlert,
hasPromQLScopeFeatureFlag, hasPrometheusDataplaneFeatureFlag, hasPrometheusRunQueriesInParallel bool) *backend.DataResponse {
traceCtx, span := s.tracer.Start(ctx, "datasource.prometheus")
defer span.End()
query, err := models.Parse(span, bq, s.TimeInterval, s.intervalCalculator, fromAlert, hasPromQLScopeFeatureFlag)
@@ -113,14 +140,15 @@ func (s *QueryData) handleQuery(ctx context.Context, bq backend.DataQuery, fromA
}
}
r := s.fetch(traceCtx, s.client, query, hasPrometheusDataplaneFeatureFlag)
r := s.fetch(traceCtx, s.client, query, hasPrometheusDataplaneFeatureFlag, hasPrometheusRunQueriesInParallel)
if r == nil {
s.log.FromContext(ctx).Debug("Received nil response from runQuery", "query", query.Expr)
}
return r
}
func (s *QueryData) fetch(traceCtx context.Context, client *client.Client, q *models.Query, enablePrometheusDataplane bool) *backend.DataResponse {
func (s *QueryData) fetch(traceCtx context.Context, client *client.Client, q *models.Query,
enablePrometheusDataplane, hasPrometheusRunQueriesInParallel bool) *backend.DataResponse {
logger := s.log.FromContext(traceCtx)
logger.Debug("Sending query", "start", q.Start, "end", q.End, "step", q.Step, "query", q.Expr)
@@ -129,37 +157,69 @@ func (s *QueryData) fetch(traceCtx context.Context, client *client.Client, q *mo
Error: nil,
}
var (
wg sync.WaitGroup
m sync.Mutex
)
if q.InstantQuery {
res := s.instantQuery(traceCtx, client, q, enablePrometheusDataplane)
dr.Error = res.Error
dr.Frames = res.Frames
dr.Status = res.Status
if hasPrometheusRunQueriesInParallel {
wg.Add(1)
go func() {
defer wg.Done()
res := s.instantQuery(traceCtx, client, q, enablePrometheusDataplane)
m.Lock()
addDataResponse(&res, dr)
m.Unlock()
}()
} else {
res := s.instantQuery(traceCtx, client, q, enablePrometheusDataplane)
addDataResponse(&res, dr)
}
}
if q.RangeQuery {
res := s.rangeQuery(traceCtx, client, q, enablePrometheusDataplane)
if res.Error != nil {
if dr.Error == nil {
dr.Error = res.Error
} else {
dr.Error = fmt.Errorf("%v %w", dr.Error, res.Error)
}
// When both instant and range are true, we may overwrite the status code.
// To fix this (and other things) they should come in separate http requests.
dr.Status = res.Status
if hasPrometheusRunQueriesInParallel {
wg.Add(1)
go func() {
defer wg.Done()
res := s.rangeQuery(traceCtx, client, q, enablePrometheusDataplane)
m.Lock()
addDataResponse(&res, dr)
m.Unlock()
}()
} else {
res := s.rangeQuery(traceCtx, client, q, enablePrometheusDataplane)
addDataResponse(&res, dr)
}
dr.Frames = append(dr.Frames, res.Frames...)
}
if q.ExemplarQuery {
res := s.exemplarQuery(traceCtx, client, q, enablePrometheusDataplane)
if res.Error != nil {
// If exemplar query returns error, we want to only log it and
// continue with other results processing
logger.Error("Exemplar query failed", "query", q.Expr, "err", res.Error)
if hasPrometheusRunQueriesInParallel {
wg.Add(1)
go func() {
defer wg.Done()
res := s.exemplarQuery(traceCtx, client, q, enablePrometheusDataplane)
m.Lock()
if res.Error != nil {
// If exemplar query returns error, we want to only log it and
// continue with other results processing
logger.Error("Exemplar query failed", "query", q.Expr, "err", res.Error)
}
dr.Frames = append(dr.Frames, res.Frames...)
m.Unlock()
}()
} else {
res := s.exemplarQuery(traceCtx, client, q, enablePrometheusDataplane)
if res.Error != nil {
// If exemplar query returns error, we want to only log it and
// continue with other results processing
logger.Error("Exemplar query failed", "query", q.Expr, "err", res.Error)
}
dr.Frames = append(dr.Frames, res.Frames...)
}
dr.Frames = append(dr.Frames, res.Frames...)
}
wg.Wait()
return dr
}
@@ -225,3 +285,15 @@ func (s *QueryData) exemplarQuery(ctx context.Context, c *client.Client, q *mode
}()
return s.parseResponse(ctx, q, res, enablePrometheusDataplaneFlag)
}
func addDataResponse(res *backend.DataResponse, dr *backend.DataResponse) {
if res.Error != nil {
if dr.Error == nil {
dr.Error = res.Error
} else {
dr.Error = fmt.Errorf("%v %w", dr.Error, res.Error)
}
dr.Status = res.Status
}
dr.Frames = append(dr.Frames, res.Frames...)
}