Compare commits
3 Commits
sriram/SQL
...
gareth/ope
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
045b33c040 | ||
|
|
32023fa243 | ||
|
|
6a0f6eee30 |
@@ -84,7 +84,7 @@ func TestIntegrationOpenTSDB(t *testing.T) {
|
|||||||
// nolint:gosec
|
// nolint:gosec
|
||||||
resp, err := http.Post(u, "application/json", buf1)
|
resp, err := http.Post(u, "application/json", buf1)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.Equal(t, http.StatusInternalServerError, resp.StatusCode)
|
require.Equal(t, http.StatusBadRequest, resp.StatusCode)
|
||||||
t.Cleanup(func() {
|
t.Cleanup(func() {
|
||||||
err := resp.Body.Close()
|
err := resp.Body.Close()
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|||||||
@@ -3,10 +3,12 @@ package opentsdb
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"net/http"
|
"net/http"
|
||||||
"net/url"
|
"net/url"
|
||||||
"path"
|
"path"
|
||||||
|
"strings"
|
||||||
|
|
||||||
"github.com/grafana/grafana-plugin-sdk-go/backend"
|
"github.com/grafana/grafana-plugin-sdk-go/backend"
|
||||||
"github.com/grafana/grafana-plugin-sdk-go/backend/datasource"
|
"github.com/grafana/grafana-plugin-sdk-go/backend/datasource"
|
||||||
@@ -165,28 +167,52 @@ func (s *Service) QueryData(ctx context.Context, req *backend.QueryDataRequest)
|
|||||||
|
|
||||||
dsInfo, err := s.getDSInfo(ctx, req.PluginContext)
|
dsInfo, err := s.getDSInfo(ctx, req.PluginContext)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
resp := backend.NewQueryDataResponse()
|
||||||
|
for _, q := range req.Queries {
|
||||||
|
resp.Responses[q.RefID] = backend.ErrorResponseWithErrorSource(backend.PluginError(err))
|
||||||
|
}
|
||||||
|
return resp, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
result := backend.NewQueryDataResponse()
|
result := backend.NewQueryDataResponse()
|
||||||
|
|
||||||
for _, query := range req.Queries {
|
for _, query := range req.Queries {
|
||||||
|
metric, err := BuildMetric(query)
|
||||||
|
if err != nil {
|
||||||
|
result.Responses[query.RefID] = backend.ErrorResponseWithErrorSource(backend.PluginError(err))
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
tsdbQuery := OpenTsdbQuery{
|
tsdbQuery := OpenTsdbQuery{
|
||||||
Start: query.TimeRange.From.Unix(),
|
Start: query.TimeRange.From.Unix(),
|
||||||
End: query.TimeRange.To.Unix(),
|
End: query.TimeRange.To.Unix(),
|
||||||
Queries: []map[string]any{
|
Queries: []map[string]any{
|
||||||
BuildMetric(query),
|
metric,
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
httpReq, err := CreateRequest(ctx, logger, dsInfo, tsdbQuery)
|
httpReq, err := CreateRequest(ctx, logger, dsInfo, tsdbQuery)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
var urlErr *url.Error
|
||||||
|
if errors.As(err, &urlErr) {
|
||||||
|
result.Responses[query.RefID] = backend.ErrorResponseWithErrorSource(backend.DownstreamError(err))
|
||||||
|
} else {
|
||||||
|
result.Responses[query.RefID] = backend.ErrorResponseWithErrorSource(backend.PluginError(err))
|
||||||
|
}
|
||||||
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
httpRes, err := dsInfo.HTTPClient.Do(httpReq)
|
httpRes, err := dsInfo.HTTPClient.Do(httpReq)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
if backend.IsDownstreamHTTPError(err) {
|
||||||
|
err = backend.DownstreamError(err)
|
||||||
|
}
|
||||||
|
var urlErr *url.Error
|
||||||
|
if errors.As(err, &urlErr) && urlErr.Err != nil && strings.HasPrefix(urlErr.Err.Error(), "unsupported protocol scheme") {
|
||||||
|
err = backend.DownstreamError(err)
|
||||||
|
}
|
||||||
|
result.Responses[query.RefID] = backend.ErrorResponseWithErrorSource(err)
|
||||||
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
defer func() {
|
defer func() {
|
||||||
@@ -197,7 +223,8 @@ func (s *Service) QueryData(ctx context.Context, req *backend.QueryDataRequest)
|
|||||||
|
|
||||||
queryRes, err := ParseResponse(logger, httpRes, query.RefID, dsInfo.TSDBVersion)
|
queryRes, err := ParseResponse(logger, httpRes, query.RefID, dsInfo.TSDBVersion)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
result.Responses[query.RefID] = backend.ErrorResponseWithErrorSource(backend.DownstreamError(err))
|
||||||
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
result.Responses[query.RefID] = queryRes.Responses[query.RefID]
|
result.Responses[query.RefID] = queryRes.Responses[query.RefID]
|
||||||
|
|||||||
@@ -86,7 +86,7 @@ func TestBuildMetric(t *testing.T) {
|
|||||||
Interval: 30 * time.Second,
|
Interval: 30 * time.Second,
|
||||||
}
|
}
|
||||||
|
|
||||||
metric := BuildMetric(query)
|
metric, _ := BuildMetric(query)
|
||||||
require.Equal(t, "30s-avg", metric["downsample"], "should use query interval formatted as seconds")
|
require.Equal(t, "30s-avg", metric["downsample"], "should use query interval formatted as seconds")
|
||||||
})
|
})
|
||||||
|
|
||||||
@@ -104,7 +104,7 @@ func TestBuildMetric(t *testing.T) {
|
|||||||
),
|
),
|
||||||
}
|
}
|
||||||
|
|
||||||
metric := BuildMetric(query)
|
metric, _ := BuildMetric(query)
|
||||||
require.Equal(t, "500ms-avg", metric["downsample"], "should convert 0.5s to 500ms")
|
require.Equal(t, "500ms-avg", metric["downsample"], "should convert 0.5s to 500ms")
|
||||||
})
|
})
|
||||||
|
|
||||||
@@ -123,7 +123,7 @@ func TestBuildMetric(t *testing.T) {
|
|||||||
Interval: 500 * time.Millisecond,
|
Interval: 500 * time.Millisecond,
|
||||||
}
|
}
|
||||||
|
|
||||||
metric := BuildMetric(query)
|
metric, _ := BuildMetric(query)
|
||||||
require.Equal(t, "500ms-avg", metric["downsample"], "should use query interval formatted as milliseconds")
|
require.Equal(t, "500ms-avg", metric["downsample"], "should use query interval formatted as milliseconds")
|
||||||
})
|
})
|
||||||
|
|
||||||
@@ -142,7 +142,7 @@ func TestBuildMetric(t *testing.T) {
|
|||||||
Interval: 5 * time.Minute,
|
Interval: 5 * time.Minute,
|
||||||
}
|
}
|
||||||
|
|
||||||
metric := BuildMetric(query)
|
metric, _ := BuildMetric(query)
|
||||||
require.Equal(t, "5m-sum", metric["downsample"], "should use query interval formatted as minutes")
|
require.Equal(t, "5m-sum", metric["downsample"], "should use query interval formatted as minutes")
|
||||||
})
|
})
|
||||||
|
|
||||||
@@ -161,7 +161,7 @@ func TestBuildMetric(t *testing.T) {
|
|||||||
Interval: 2 * time.Hour,
|
Interval: 2 * time.Hour,
|
||||||
}
|
}
|
||||||
|
|
||||||
metric := BuildMetric(query)
|
metric, _ := BuildMetric(query)
|
||||||
require.Equal(t, "2h-max", metric["downsample"], "should use query interval formatted as hours")
|
require.Equal(t, "2h-max", metric["downsample"], "should use query interval formatted as hours")
|
||||||
})
|
})
|
||||||
|
|
||||||
@@ -180,7 +180,7 @@ func TestBuildMetric(t *testing.T) {
|
|||||||
Interval: 48 * time.Hour,
|
Interval: 48 * time.Hour,
|
||||||
}
|
}
|
||||||
|
|
||||||
metric := BuildMetric(query)
|
metric, _ := BuildMetric(query)
|
||||||
require.Equal(t, "2d-min", metric["downsample"], "should use query interval formatted as days")
|
require.Equal(t, "2d-min", metric["downsample"], "should use query interval formatted as days")
|
||||||
})
|
})
|
||||||
|
|
||||||
@@ -199,7 +199,7 @@ func TestBuildMetric(t *testing.T) {
|
|||||||
),
|
),
|
||||||
}
|
}
|
||||||
|
|
||||||
metric := BuildMetric(query)
|
metric, _ := BuildMetric(query)
|
||||||
require.True(t, metric["explicitTags"].(bool), "explicitTags should be true")
|
require.True(t, metric["explicitTags"].(bool), "explicitTags should be true")
|
||||||
|
|
||||||
metricTags := metric["tags"].(map[string]any)
|
metricTags := metric["tags"].(map[string]any)
|
||||||
@@ -221,7 +221,7 @@ func TestBuildMetric(t *testing.T) {
|
|||||||
),
|
),
|
||||||
}
|
}
|
||||||
|
|
||||||
metric := BuildMetric(query)
|
metric, _ := BuildMetric(query)
|
||||||
require.Nil(t, metric["explicitTags"], "explicitTags should not be present when false")
|
require.Nil(t, metric["explicitTags"], "explicitTags should not be present when false")
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
@@ -501,7 +501,7 @@ func TestOpenTsdbExecutor(t *testing.T) {
|
|||||||
),
|
),
|
||||||
}
|
}
|
||||||
|
|
||||||
metric := BuildMetric(query)
|
metric, _ := BuildMetric(query)
|
||||||
|
|
||||||
require.Len(t, metric, 3)
|
require.Len(t, metric, 3)
|
||||||
require.Equal(t, "cpu.average.percent", metric["metric"])
|
require.Equal(t, "cpu.average.percent", metric["metric"])
|
||||||
@@ -523,7 +523,7 @@ func TestOpenTsdbExecutor(t *testing.T) {
|
|||||||
),
|
),
|
||||||
}
|
}
|
||||||
|
|
||||||
metric := BuildMetric(query)
|
metric, _ := BuildMetric(query)
|
||||||
|
|
||||||
require.Len(t, metric, 2)
|
require.Len(t, metric, 2)
|
||||||
require.Equal(t, "cpu.average.percent", metric["metric"])
|
require.Equal(t, "cpu.average.percent", metric["metric"])
|
||||||
@@ -544,7 +544,7 @@ func TestOpenTsdbExecutor(t *testing.T) {
|
|||||||
),
|
),
|
||||||
}
|
}
|
||||||
|
|
||||||
metric := BuildMetric(query)
|
metric, _ := BuildMetric(query)
|
||||||
|
|
||||||
require.Len(t, metric, 3)
|
require.Len(t, metric, 3)
|
||||||
require.Equal(t, "cpu.average.percent", metric["metric"])
|
require.Equal(t, "cpu.average.percent", metric["metric"])
|
||||||
@@ -570,7 +570,7 @@ func TestOpenTsdbExecutor(t *testing.T) {
|
|||||||
),
|
),
|
||||||
}
|
}
|
||||||
|
|
||||||
metric := BuildMetric(query)
|
metric, _ := BuildMetric(query)
|
||||||
|
|
||||||
require.Len(t, metric, 3)
|
require.Len(t, metric, 3)
|
||||||
require.Equal(t, "cpu.average.percent", metric["metric"])
|
require.Equal(t, "cpu.average.percent", metric["metric"])
|
||||||
@@ -601,7 +601,7 @@ func TestOpenTsdbExecutor(t *testing.T) {
|
|||||||
),
|
),
|
||||||
}
|
}
|
||||||
|
|
||||||
metric := BuildMetric(query)
|
metric, _ := BuildMetric(query)
|
||||||
|
|
||||||
require.Len(t, metric, 5)
|
require.Len(t, metric, 5)
|
||||||
require.Equal(t, "cpu.average.percent", metric["metric"])
|
require.Equal(t, "cpu.average.percent", metric["metric"])
|
||||||
@@ -636,7 +636,7 @@ func TestOpenTsdbExecutor(t *testing.T) {
|
|||||||
),
|
),
|
||||||
}
|
}
|
||||||
|
|
||||||
metric := BuildMetric(query)
|
metric, _ := BuildMetric(query)
|
||||||
t.Log(metric)
|
t.Log(metric)
|
||||||
|
|
||||||
require.Len(t, metric, 5)
|
require.Len(t, metric, 5)
|
||||||
|
|||||||
@@ -44,12 +44,12 @@ func FormatDownsampleInterval(ms int64) string {
|
|||||||
return strconv.FormatInt(days, 10) + "d"
|
return strconv.FormatInt(days, 10) + "d"
|
||||||
}
|
}
|
||||||
|
|
||||||
func BuildMetric(query backend.DataQuery) map[string]any {
|
func BuildMetric(query backend.DataQuery) (map[string]any, error) {
|
||||||
metric := make(map[string]any)
|
metric := make(map[string]any)
|
||||||
|
|
||||||
var model QueryModel
|
var model QueryModel
|
||||||
if err := json.Unmarshal(query.JSON, &model); err != nil {
|
if err := json.Unmarshal(query.JSON, &model); err != nil {
|
||||||
return nil
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
// Setting metric and aggregator
|
// Setting metric and aggregator
|
||||||
@@ -126,7 +126,7 @@ func BuildMetric(query backend.DataQuery) map[string]any {
|
|||||||
metric["explicitTags"] = true
|
metric["explicitTags"] = true
|
||||||
}
|
}
|
||||||
|
|
||||||
return metric
|
return metric, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func CreateRequest(ctx context.Context, logger log.Logger, dsInfo *datasourceInfo, data OpenTsdbQuery) (*http.Request, error) {
|
func CreateRequest(ctx context.Context, logger log.Logger, dsInfo *datasourceInfo, data OpenTsdbQuery) (*http.Request, error) {
|
||||||
|
|||||||
Reference in New Issue
Block a user