Compare commits
11 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| b359875619 | |||
| a8a16a0c0c | |||
| 36ef5e3994 | |||
| f136b23a7e | |||
| 9f28da85e2 | |||
| 9be719c9aa | |||
| f1e388ff4e | |||
| a30a5f2022 | |||
| a3b1c68ac0 | |||
| e347d500d0 | |||
| 88bc65d383 |
@@ -8,7 +8,7 @@ import (
|
||||
"io"
|
||||
|
||||
"github.com/grafana/grafana/pkg/tsdb/tempo/traceql"
|
||||
"google.golang.org/grpc/metadata"
|
||||
stream_utils "github.com/grafana/grafana/pkg/tsdb/tempo/utils"
|
||||
|
||||
"github.com/grafana/grafana-plugin-sdk-go/backend"
|
||||
"github.com/grafana/grafana-plugin-sdk-go/backend/tracing"
|
||||
@@ -64,10 +64,7 @@ func (s *Service) runMetricsStream(ctx context.Context, req *backend.RunStreamRe
|
||||
qrr.Start = uint64(backendQuery.TimeRange.From.UnixNano())
|
||||
qrr.End = uint64(backendQuery.TimeRange.To.UnixNano())
|
||||
|
||||
// Setting the user agent for the gRPC call. When DS is decoupled we don't recreate instance when grafana config
|
||||
// changes or updates, so we have to get it from context.
|
||||
// Ideally this would be pushed higher, so it's set once for all rpc calls, but we have only one now.
|
||||
ctx = metadata.AppendToOutgoingContext(ctx, "User-Agent", backend.UserAgentFromContext(ctx).String())
|
||||
ctx = stream_utils.AppendHeadersToOutgoingContext(ctx, req)
|
||||
|
||||
if isInstantQuery(tempoQuery.MetricsQueryType) {
|
||||
instantQuery := &tempopb.QueryInstantRequest{
|
||||
|
||||
@@ -7,12 +7,11 @@ import (
|
||||
"fmt"
|
||||
"io"
|
||||
|
||||
"google.golang.org/grpc/metadata"
|
||||
|
||||
"github.com/grafana/grafana-plugin-sdk-go/backend"
|
||||
"github.com/grafana/grafana-plugin-sdk-go/backend/tracing"
|
||||
"github.com/grafana/grafana-plugin-sdk-go/data"
|
||||
"github.com/grafana/grafana/pkg/tsdb/tempo/kinds/dataquery"
|
||||
stream_utils "github.com/grafana/grafana/pkg/tsdb/tempo/utils"
|
||||
"github.com/grafana/tempo/pkg/tempopb"
|
||||
"go.opentelemetry.io/otel/attribute"
|
||||
"go.opentelemetry.io/otel/codes"
|
||||
@@ -34,7 +33,6 @@ type StreamSender interface {
|
||||
func (s *Service) runSearchStream(ctx context.Context, req *backend.RunStreamRequest, sender *backend.StreamSender, datasource *DatasourceInfo) error {
|
||||
ctx, span := tracing.DefaultTracer().Start(ctx, "datasource.tempo.runSearchStream")
|
||||
defer span.End()
|
||||
|
||||
response := &backend.DataResponse{}
|
||||
|
||||
var backendQuery *backend.DataQuery
|
||||
@@ -62,10 +60,7 @@ func (s *Service) runSearchStream(ctx context.Context, req *backend.RunStreamReq
|
||||
sr.Start = uint32(backendQuery.TimeRange.From.Unix())
|
||||
sr.End = uint32(backendQuery.TimeRange.To.Unix())
|
||||
|
||||
// Setting the user agent for the gRPC call. When DS is decoupled we don't recreate instance when grafana config
|
||||
// changes or updates, so we have to get it from context.
|
||||
// Ideally this would be pushed higher, so it's set once for all rpc calls, but we have only one now.
|
||||
ctx = metadata.AppendToOutgoingContext(ctx, "User-Agent", backend.UserAgentFromContext(ctx).String())
|
||||
ctx = stream_utils.AppendHeadersToOutgoingContext(ctx, req)
|
||||
|
||||
stream, err := datasource.StreamingClient.Search(ctx, sr)
|
||||
if err != nil {
|
||||
|
||||
@@ -6,6 +6,7 @@ import (
|
||||
"strings"
|
||||
|
||||
"github.com/grafana/grafana-plugin-sdk-go/backend"
|
||||
stream_utils "github.com/grafana/grafana/pkg/tsdb/tempo/utils"
|
||||
)
|
||||
|
||||
func (s *Service) SubscribeStream(_ context.Context, req *backend.SubscribeStreamRequest) (*backend.SubscribeStreamResponse, error) {
|
||||
@@ -39,11 +40,18 @@ func (s *Service) PublishStream(_ context.Context, _ *backend.PublishStreamReque
|
||||
|
||||
func (s *Service) RunStream(ctx context.Context, request *backend.RunStreamRequest, sender *backend.StreamSender) error {
|
||||
s.logger.Debug("New stream call", "path", request.Path)
|
||||
tempoDatasource, err := s.getDSInfo(ctx, request.PluginContext)
|
||||
tempoDatasource, dsInfoErr := s.getDSInfo(ctx, request.PluginContext)
|
||||
|
||||
// get incoming and team http headers and append to stream request.
|
||||
headers, err := stream_utils.SetHeadersFromIncomingContext(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
request.Headers = headers
|
||||
|
||||
if strings.HasPrefix(request.Path, SearchPathPrefix) {
|
||||
if err != nil {
|
||||
return backend.DownstreamErrorf("failed to get datasource information: %w", err)
|
||||
if dsInfoErr != nil {
|
||||
return backend.DownstreamErrorf("failed to get datasource information: %w", dsInfoErr)
|
||||
}
|
||||
if err = s.runSearchStream(ctx, request, sender, tempoDatasource); err != nil {
|
||||
return sendError(err, sender)
|
||||
@@ -52,8 +60,8 @@ func (s *Service) RunStream(ctx context.Context, request *backend.RunStreamReque
|
||||
}
|
||||
}
|
||||
if strings.HasPrefix(request.Path, MetricsPathPrefix) {
|
||||
if err != nil {
|
||||
return backend.DownstreamErrorf("failed to get datasource information: %w", err)
|
||||
if dsInfoErr != nil {
|
||||
return backend.DownstreamErrorf("failed to get datasource information: %w", dsInfoErr)
|
||||
}
|
||||
if err = s.runMetricsStream(ctx, request, sender, tempoDatasource); err != nil {
|
||||
return sendError(err, sender)
|
||||
|
||||
@@ -0,0 +1,121 @@
|
||||
package stream_utils
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
|
||||
"github.com/grafana/grafana-plugin-sdk-go/backend"
|
||||
"google.golang.org/grpc/metadata"
|
||||
)
|
||||
|
||||
// Appends incoming request headers to the outgoing context to make sure none are lost when we make the request to tempo.
|
||||
func AppendHeadersToOutgoingContext(ctx context.Context, req *backend.RunStreamRequest) context.Context {
|
||||
// append all incoming headers
|
||||
for key, value := range req.Headers {
|
||||
ctx = metadata.AppendToOutgoingContext(ctx, key, value)
|
||||
}
|
||||
// Setting the user agent for the gRPC call. When DS is decoupled we don't recreate instance when grafana config
|
||||
// changes or updates, so we have to get it from context.
|
||||
// Ideally this would be pushed higher, so it's set once for all rpc calls, but we have only one now.
|
||||
ctx = metadata.AppendToOutgoingContext(ctx, "User-Agent", backend.UserAgentFromContext(ctx).String())
|
||||
return ctx
|
||||
}
|
||||
|
||||
// When we receive a new query request we should make sure that all incoming HTTP headers are being forwarding to the grpc stream request
|
||||
// this is to make sure that no headers are lost when we make the actual call to Tempo later on.
|
||||
func SetHeadersFromIncomingContext(ctx context.Context) (map[string]string, error) {
|
||||
// get the plugin from context
|
||||
plugin := backend.PluginConfigFromContext(ctx)
|
||||
|
||||
// get the HTTP headers
|
||||
teamHeaders, error := getTeamHTTPHeaders(plugin)
|
||||
if error != nil {
|
||||
return nil, error
|
||||
}
|
||||
|
||||
// get the rest of the incoming headers
|
||||
headers, err := getClientOptionsHeaders(ctx, plugin)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
for key, value := range teamHeaders {
|
||||
headers[key] = value
|
||||
}
|
||||
return headers, nil
|
||||
}
|
||||
|
||||
func getTeamHTTPHeaders(plugin backend.PluginContext) (map[string]string, error) {
|
||||
headers := map[string]string{}
|
||||
// Grab the JSON data from the datasource instance settings
|
||||
jsonData := plugin.DataSourceInstanceSettings.JSONData
|
||||
var data map[string]interface{}
|
||||
err := json.Unmarshal(jsonData, &data)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// fetch team http headers
|
||||
if teamHttpHeaders, ok := data["teamHttpHeaders"]; ok {
|
||||
// team headers have the following structure
|
||||
// headers: [<team_id>: [{header: <header_name>, value: <header_value>}]]
|
||||
// header_value is whatever the user has set under LBAC permissions for their given rule.
|
||||
if lbacHeaders, ok := teamHttpHeaders.(map[string]interface{})["headers"]; ok {
|
||||
headerMap := lbacHeaders.(map[string]interface{})
|
||||
labelPolicyKey, labelPolicyValue := getLabelPolicyKeyValue(headerMap)
|
||||
|
||||
if labelPolicyKey != "" && labelPolicyValue != "" {
|
||||
headers[labelPolicyKey] = labelPolicyValue
|
||||
}
|
||||
}
|
||||
}
|
||||
return headers, nil
|
||||
}
|
||||
|
||||
func getLabelPolicyKeyValue(headerWithRules map[string]interface{}) (string, string) {
|
||||
labelPolicyKey := ""
|
||||
labelPolicyValue := ""
|
||||
// we go through each teams' rule and ignoring the team, go through their set rules and prepare them to be all appended for the X-Prom-Label-Policy header value
|
||||
// the result will be a comma separated list of the rules:
|
||||
// "<rule_num>:<rule_value>, <rule_num>:<rule_value>"
|
||||
for _, accessRuleValue := range headerWithRules {
|
||||
rules := accessRuleValue.([]interface{})
|
||||
for _, accessRule := range rules {
|
||||
header := accessRule.(map[string]interface{})
|
||||
for key, value := range header {
|
||||
// for now, team headers only contain a single header key value, but in case in the future more are introduced, we make sure we only set the one we care about.
|
||||
if key == "header" && value == "X-Prom-Label-Policy" {
|
||||
labelPolicyKey = value.(string)
|
||||
continue
|
||||
}
|
||||
if key == "value" {
|
||||
if valueStr, ok := value.(string); ok {
|
||||
if labelPolicyValue == "" {
|
||||
labelPolicyValue = valueStr
|
||||
} else {
|
||||
labelPolicyValue += "," + valueStr
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return labelPolicyKey, labelPolicyValue
|
||||
}
|
||||
|
||||
func getClientOptionsHeaders(ctx context.Context, plugin backend.PluginContext) (map[string]string, error) {
|
||||
headers := map[string]string{}
|
||||
opts, err := plugin.DataSourceInstanceSettings.HTTPClientOptions(ctx)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to get HTTP client options: %w", err)
|
||||
}
|
||||
|
||||
for name, values := range opts.Header {
|
||||
for _, value := range values {
|
||||
headers[name] = value
|
||||
}
|
||||
}
|
||||
return headers, nil
|
||||
}
|
||||
@@ -0,0 +1,149 @@
|
||||
package stream_utils
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
|
||||
"github.com/grafana/grafana-plugin-sdk-go/backend"
|
||||
"github.com/grafana/grafana-plugin-sdk-go/backend/useragent"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
"google.golang.org/grpc/metadata"
|
||||
)
|
||||
|
||||
func TestAppendHeadersToOutgoingContext_AppendsHeadersAndUserAgent(t *testing.T) {
|
||||
ctx := context.TODO()
|
||||
ua, err := useragent.New("10.0.0", "linux", "amd64")
|
||||
require.NoError(t, err)
|
||||
ctx = backend.WithUserAgent(ctx, ua)
|
||||
ctx = metadata.NewOutgoingContext(ctx, metadata.Pairs("Existing", "one"))
|
||||
|
||||
req := &backend.RunStreamRequest{
|
||||
Headers: map[string]string{
|
||||
"X-Test": "value",
|
||||
},
|
||||
}
|
||||
|
||||
out := AppendHeadersToOutgoingContext(ctx, req)
|
||||
outgoingMD, ok := metadata.FromOutgoingContext(out)
|
||||
require.True(t, ok)
|
||||
assert.Equal(t, []string{"value"}, outgoingMD.Get("x-test"))
|
||||
assert.Equal(t, []string{ua.String()}, outgoingMD.Get("user-agent"))
|
||||
assert.Equal(t, []string{"one"}, outgoingMD.Get("existing"))
|
||||
}
|
||||
|
||||
func TestSetHeadersFromIncomingContext_MergesTeamAndClientHeaders(t *testing.T) {
|
||||
jsonData := []byte(`{
|
||||
"teamHttpHeaders": {
|
||||
"headers": {
|
||||
"101": [
|
||||
{"header": "X-Prom-Label-Policy", "value": "1:team-value"},
|
||||
{"header": "X-Prom-Label-Policy", "value": "2:team-wins"}
|
||||
]
|
||||
}
|
||||
},
|
||||
"httpHeaderName1": "X-Client",
|
||||
"httpHeaderName2": "X-Shared"
|
||||
}`)
|
||||
|
||||
pluginCtx := backend.PluginContext{
|
||||
DataSourceInstanceSettings: &backend.DataSourceInstanceSettings{
|
||||
JSONData: jsonData,
|
||||
DecryptedSecureJSONData: map[string]string{
|
||||
"httpHeaderValue1": "client-value",
|
||||
"httpHeaderValue2": "client-overridden",
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
ctx := backend.WithPluginContext(context.Background(), pluginCtx)
|
||||
headers, err := SetHeadersFromIncomingContext(ctx)
|
||||
require.NoError(t, err)
|
||||
|
||||
expected := map[string]string{
|
||||
"X-Client": "client-value",
|
||||
"X-Prom-Label-Policy": "1:team-value,2:team-wins",
|
||||
"X-Shared": "client-overridden",
|
||||
}
|
||||
assert.Equal(t, expected, headers)
|
||||
}
|
||||
|
||||
func TestGetTeamHTTPHeaders_NoTeamHeaders(t *testing.T) {
|
||||
pluginCtx := backend.PluginContext{
|
||||
DataSourceInstanceSettings: &backend.DataSourceInstanceSettings{
|
||||
JSONData: []byte(`{"httpHeaderName1": "X-Client"}`),
|
||||
},
|
||||
}
|
||||
|
||||
headers, err := getTeamHTTPHeaders(pluginCtx)
|
||||
require.NoError(t, err)
|
||||
assert.Empty(t, headers)
|
||||
}
|
||||
|
||||
func TestGetTeamHTTPHeaders_LabelPolicyValue(t *testing.T) {
|
||||
pluginCtx := backend.PluginContext{
|
||||
DataSourceInstanceSettings: &backend.DataSourceInstanceSettings{
|
||||
JSONData: []byte(`{
|
||||
"teamHttpHeaders": {
|
||||
"headers": {
|
||||
"101": [
|
||||
{"header": "X-Prom-Label-Policy", "value": "1:team-value"},
|
||||
{"header": "X-Prom-Label-Policy", "value": "2:team-wins"}
|
||||
]
|
||||
}
|
||||
}
|
||||
}`),
|
||||
},
|
||||
}
|
||||
|
||||
headers, err := getTeamHTTPHeaders(pluginCtx)
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, map[string]string{
|
||||
"X-Prom-Label-Policy": "1:team-value,2:team-wins",
|
||||
}, headers)
|
||||
}
|
||||
|
||||
func TestGetLabelPolicyKeyValue_AppendsValues(t *testing.T) {
|
||||
headerWithRules := map[string]interface{}{
|
||||
"101": []interface{}{
|
||||
map[string]interface{}{
|
||||
"header": "X-Prom-Label-Policy",
|
||||
"value": "1:alpha",
|
||||
},
|
||||
map[string]interface{}{
|
||||
"header": "X-Prom-Label-Policy",
|
||||
"value": "2:beta",
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
key, value := getLabelPolicyKeyValue(headerWithRules)
|
||||
assert.Equal(t, "X-Prom-Label-Policy", key)
|
||||
assert.Equal(t, "1:alpha,2:beta", value)
|
||||
}
|
||||
|
||||
func TestGetClientOptionsHeaders_ParsesHeaders(t *testing.T) {
|
||||
pluginCtx := backend.PluginContext{
|
||||
DataSourceInstanceSettings: &backend.DataSourceInstanceSettings{
|
||||
JSONData: []byte(`{"httpHeaderName1": "X-Client"}`),
|
||||
DecryptedSecureJSONData: map[string]string{
|
||||
"httpHeaderValue1": "client-value",
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
headers, err := getClientOptionsHeaders(context.Background(), pluginCtx)
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, map[string]string{"X-Client": "client-value"}, headers)
|
||||
}
|
||||
|
||||
func TestGetClientOptionsHeaders_InvalidJSON(t *testing.T) {
|
||||
pluginCtx := backend.PluginContext{
|
||||
DataSourceInstanceSettings: &backend.DataSourceInstanceSettings{
|
||||
JSONData: []byte("{"),
|
||||
},
|
||||
}
|
||||
|
||||
_, err := getClientOptionsHeaders(context.Background(), pluginCtx)
|
||||
require.Error(t, err)
|
||||
}
|
||||
Reference in New Issue
Block a user