Datasources: Support mixed datasources in a single query (#56832)
* initial cut at refactor - need to run more tests * fix unit tests * change newly unused function to test helper * create unit tests for parsing query requests that cover a range of cases * add some comments * rename function to avoid dev confusion
This commit is contained in:
+76
-61
@@ -16,7 +16,6 @@ import (
|
||||
"github.com/grafana/grafana/pkg/plugins/adapters"
|
||||
"github.com/grafana/grafana/pkg/services/datasources"
|
||||
"github.com/grafana/grafana/pkg/services/oauthtoken"
|
||||
publicDashboards "github.com/grafana/grafana/pkg/services/publicdashboards/queries"
|
||||
"github.com/grafana/grafana/pkg/services/user"
|
||||
"github.com/grafana/grafana/pkg/setting"
|
||||
"github.com/grafana/grafana/pkg/tsdb/grafanads"
|
||||
@@ -69,58 +68,57 @@ func (s *Service) Run(ctx context.Context) error {
|
||||
return ctx.Err()
|
||||
}
|
||||
|
||||
// QueryData can process queries and return query responses.
|
||||
func (s *Service) QueryData(ctx context.Context, user *user.SignedInUser, skipCache bool, reqDTO dtos.MetricRequest, handleExpressions bool) (*backend.QueryDataResponse, error) {
|
||||
// QueryData processes queries and returns query responses. It handles queries to single or mixed datasources, as well as expressions.
|
||||
func (s *Service) QueryData(ctx context.Context, user *user.SignedInUser, skipCache bool, reqDTO dtos.MetricRequest) (*backend.QueryDataResponse, error) {
|
||||
// Parse the request into parsed queries grouped by datasource uid
|
||||
parsedReq, err := s.parseMetricRequest(ctx, user, skipCache, reqDTO)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if handleExpressions && parsedReq.hasExpression {
|
||||
// If there are expressions, handle them and return
|
||||
if parsedReq.hasExpression {
|
||||
return s.handleExpressions(ctx, user, parsedReq)
|
||||
}
|
||||
return s.handleQueryData(ctx, user, parsedReq)
|
||||
}
|
||||
|
||||
// QueryData can process queries and return query responses.
|
||||
func (s *Service) QueryDataMultipleSources(ctx context.Context, user *user.SignedInUser, skipCache bool, reqDTO dtos.MetricRequest, handleExpressions bool) (*backend.QueryDataResponse, error) {
|
||||
byDataSource := publicDashboards.GroupQueriesByDataSource(reqDTO.Queries)
|
||||
|
||||
// The expression service will handle mixed datasources, so we don't need to group them when an expression is present.
|
||||
if publicDashboards.HasExpressionQuery(reqDTO.Queries) || len(byDataSource) == 1 {
|
||||
return s.QueryData(ctx, user, skipCache, reqDTO, handleExpressions)
|
||||
} else {
|
||||
resp := backend.NewQueryDataResponse()
|
||||
|
||||
g, ctx := errgroup.WithContext(ctx)
|
||||
results := make([]backend.Responses, len(byDataSource))
|
||||
|
||||
for _, queries := range byDataSource {
|
||||
dataSourceQueries := queries
|
||||
g.Go(func() error {
|
||||
subDTO := reqDTO.CloneWithQueries(dataSourceQueries)
|
||||
|
||||
subResp, err := s.QueryData(ctx, user, skipCache, subDTO, handleExpressions)
|
||||
|
||||
if err == nil {
|
||||
results = append(results, subResp.Responses)
|
||||
}
|
||||
|
||||
return err
|
||||
})
|
||||
}
|
||||
|
||||
if err := g.Wait(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
for _, result := range results {
|
||||
for refId, dataResponse := range result {
|
||||
resp.Responses[refId] = dataResponse
|
||||
}
|
||||
}
|
||||
|
||||
return resp, nil
|
||||
// If there is only one datasource, query it and return
|
||||
if len(parsedReq.parsedQueries) == 1 {
|
||||
return s.handleQuerySingleDatasource(ctx, user, parsedReq)
|
||||
}
|
||||
// If there are multiple datasources, handle their queries concurrently and return the aggregate result
|
||||
byDataSource := parsedReq.parsedQueries
|
||||
resp := backend.NewQueryDataResponse()
|
||||
|
||||
g, ctx := errgroup.WithContext(ctx)
|
||||
results := make([]backend.Responses, len(byDataSource))
|
||||
|
||||
for _, queries := range byDataSource {
|
||||
rawQueries := make([]*simplejson.Json, len(queries))
|
||||
for i := 0; i < len(queries); i++ {
|
||||
rawQueries[i] = queries[i].rawQuery
|
||||
}
|
||||
g.Go(func() error {
|
||||
subDTO := reqDTO.CloneWithQueries(rawQueries)
|
||||
|
||||
subResp, err := s.QueryData(ctx, user, skipCache, subDTO)
|
||||
|
||||
if err == nil {
|
||||
results = append(results, subResp.Responses)
|
||||
}
|
||||
|
||||
return err
|
||||
})
|
||||
}
|
||||
|
||||
if err := g.Wait(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
for _, result := range results {
|
||||
for refId, dataResponse := range result {
|
||||
resp.Responses[refId] = dataResponse
|
||||
}
|
||||
}
|
||||
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
// handleExpressions handles POST /api/ds/query when there is an expression.
|
||||
@@ -130,7 +128,7 @@ func (s *Service) handleExpressions(ctx context.Context, user *user.SignedInUser
|
||||
Queries: []expr.Query{},
|
||||
}
|
||||
|
||||
for _, pq := range parsedReq.parsedQueries {
|
||||
for _, pq := range parsedReq.getFlattenedQueries() {
|
||||
if pq.datasource == nil {
|
||||
return nil, ErrMissingDataSourceInfo.Build(errutil.TemplateData{
|
||||
Public: map[string]interface{}{
|
||||
@@ -160,12 +158,21 @@ func (s *Service) handleExpressions(ctx context.Context, user *user.SignedInUser
|
||||
return qdr, nil
|
||||
}
|
||||
|
||||
func (s *Service) handleQueryData(ctx context.Context, user *user.SignedInUser, parsedReq *parsedRequest) (*backend.QueryDataResponse, error) {
|
||||
ds := parsedReq.parsedQueries[0].datasource
|
||||
// handleQuerySingleDatasource handles one or more queries to a single datasource
|
||||
func (s *Service) handleQuerySingleDatasource(ctx context.Context, user *user.SignedInUser, parsedReq *parsedRequest) (*backend.QueryDataResponse, error) {
|
||||
queries := parsedReq.getFlattenedQueries()
|
||||
ds := queries[0].datasource
|
||||
if err := s.pluginRequestValidator.Validate(ds.Url, nil); err != nil {
|
||||
return nil, datasources.ErrDataSourceAccessDenied
|
||||
}
|
||||
|
||||
// ensure that each query passed to this function has the same datasource
|
||||
for _, pq := range queries {
|
||||
if ds.Uid != pq.datasource.Uid {
|
||||
return nil, fmt.Errorf("all queries must have the same datasource - found %s and %s", ds.Uid, pq.datasource.Uid)
|
||||
}
|
||||
}
|
||||
|
||||
instanceSettings, err := adapters.ModelToInstanceSettings(ds, s.decryptSecureJsonDataFn(ctx))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@@ -208,7 +215,7 @@ func (s *Service) handleQueryData(ctx context.Context, user *user.SignedInUser,
|
||||
}
|
||||
}
|
||||
|
||||
for _, q := range parsedReq.parsedQueries {
|
||||
for _, q := range queries {
|
||||
req.Queries = append(req.Queries, q.query)
|
||||
}
|
||||
|
||||
@@ -220,14 +227,24 @@ func (s *Service) handleQueryData(ctx context.Context, user *user.SignedInUser,
|
||||
type parsedQuery struct {
|
||||
datasource *datasources.DataSource
|
||||
query backend.DataQuery
|
||||
rawQuery *simplejson.Json
|
||||
}
|
||||
|
||||
type parsedRequest struct {
|
||||
hasExpression bool
|
||||
parsedQueries []parsedQuery
|
||||
parsedQueries map[string][]parsedQuery
|
||||
httpRequest *http.Request
|
||||
}
|
||||
|
||||
func (pr parsedRequest) getFlattenedQueries() []parsedQuery {
|
||||
queries := make([]parsedQuery, 0)
|
||||
for _, pq := range pr.parsedQueries {
|
||||
queries = append(queries, pq...)
|
||||
}
|
||||
return queries
|
||||
}
|
||||
|
||||
// parseRequest parses a request into parsed queries grouped by datasource uid
|
||||
func (s *Service) parseMetricRequest(ctx context.Context, user *user.SignedInUser, skipCache bool, reqDTO dtos.MetricRequest) (*parsedRequest, error) {
|
||||
if len(reqDTO.Queries) == 0 {
|
||||
return nil, ErrNoQueriesFound
|
||||
@@ -236,10 +253,10 @@ func (s *Service) parseMetricRequest(ctx context.Context, user *user.SignedInUse
|
||||
timeRange := legacydata.NewDataTimeRange(reqDTO.From, reqDTO.To)
|
||||
req := &parsedRequest{
|
||||
hasExpression: false,
|
||||
parsedQueries: []parsedQuery{},
|
||||
parsedQueries: make(map[string][]parsedQuery),
|
||||
}
|
||||
|
||||
// Parse the queries
|
||||
// Parse the queries and store them by datasource
|
||||
datasourcesByUid := map[string]*datasources.DataSource{}
|
||||
for _, query := range reqDTO.Queries {
|
||||
ds, err := s.getDataSourceFromQuery(ctx, user, skipCache, query, datasourcesByUid)
|
||||
@@ -255,6 +272,10 @@ func (s *Service) parseMetricRequest(ctx context.Context, user *user.SignedInUse
|
||||
req.hasExpression = true
|
||||
}
|
||||
|
||||
if _, ok := req.parsedQueries[ds.Uid]; !ok {
|
||||
req.parsedQueries[ds.Uid] = []parsedQuery{}
|
||||
}
|
||||
|
||||
s.log.Debug("Processing metrics query", "query", query)
|
||||
|
||||
modelJSON, err := query.MarshalJSON()
|
||||
@@ -262,7 +283,7 @@ func (s *Service) parseMetricRequest(ctx context.Context, user *user.SignedInUse
|
||||
return nil, err
|
||||
}
|
||||
|
||||
req.parsedQueries = append(req.parsedQueries, parsedQuery{
|
||||
req.parsedQueries[ds.Uid] = append(req.parsedQueries[ds.Uid], parsedQuery{
|
||||
datasource: ds,
|
||||
query: backend.DataQuery{
|
||||
TimeRange: backend.TimeRange{
|
||||
@@ -275,16 +296,10 @@ func (s *Service) parseMetricRequest(ctx context.Context, user *user.SignedInUse
|
||||
QueryType: query.Get("queryType").MustString(""),
|
||||
JSON: modelJSON,
|
||||
},
|
||||
rawQuery: query,
|
||||
})
|
||||
}
|
||||
|
||||
if !req.hasExpression {
|
||||
if len(datasourcesByUid) > 1 {
|
||||
// We do not (yet) support mixed query type
|
||||
return nil, ErrMultipleDatasources
|
||||
}
|
||||
}
|
||||
|
||||
if reqDTO.HTTPRequest != nil {
|
||||
req.httpRequest = reqDTO.HTTPRequest
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user