Compare commits

...

11 Commits

Author SHA1 Message Date
Jocelyn Collado-Kuri b359875619 Merge branch 'main' into jck/tempo-forward-team-headers 2026-01-12 13:20:35 -08:00
Jocelyn Collado-Kuri a8a16a0c0c use helper functions to extract team headers instead of core functions 2026-01-12 13:09:59 -08:00
Jocelyn Collado-Kuri 36ef5e3994 rename utils for clarity 2026-01-12 13:09:58 -08:00
Jocelyn Collado-Kuri f136b23a7e remove redundant assignments 2026-01-09 15:04:57 -08:00
Jocelyn Collado-Kuri 9f28da85e2 tests 2026-01-09 14:41:44 -08:00
Jocelyn Collado-Kuri 9be719c9aa clean up code into helper functions and utils file 2026-01-09 12:56:42 -08:00
Jocelyn Collado-Kuri f1e388ff4e Merge branch 'main' into jck/tempo-fix-streaming 2026-01-09 09:28:58 -08:00
Jocelyn Collado-Kuri a30a5f2022 Merge branch 'main' into jck/tempo-fix-streaming 2026-01-08 11:53:45 -08:00
Jocelyn Collado-Kuri a3b1c68ac0 Merge branch 'main' into jck/tempo-fix-streaming 2026-01-07 13:34:47 -08:00
Jocelyn Collado-Kuri e347d500d0 grab team headers from JSON data in datasource instance settings 2026-01-07 13:34:09 -08:00
Jocelyn Collado-Kuri 88bc65d383 add headers to streaming 2026-01-07 06:54:37 -08:00
5 changed files with 287 additions and 17 deletions
+2 -5
View File
@@ -8,7 +8,7 @@ import (
"io"
"github.com/grafana/grafana/pkg/tsdb/tempo/traceql"
"google.golang.org/grpc/metadata"
stream_utils "github.com/grafana/grafana/pkg/tsdb/tempo/utils"
"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana-plugin-sdk-go/backend/tracing"
@@ -64,10 +64,7 @@ func (s *Service) runMetricsStream(ctx context.Context, req *backend.RunStreamRe
qrr.Start = uint64(backendQuery.TimeRange.From.UnixNano())
qrr.End = uint64(backendQuery.TimeRange.To.UnixNano())
// Setting the user agent for the gRPC call. When DS is decoupled we don't recreate instance when grafana config
// changes or updates, so we have to get it from context.
// Ideally this would be pushed higher, so it's set once for all rpc calls, but we have only one now.
ctx = metadata.AppendToOutgoingContext(ctx, "User-Agent", backend.UserAgentFromContext(ctx).String())
ctx = stream_utils.AppendHeadersToOutgoingContext(ctx, req)
if isInstantQuery(tempoQuery.MetricsQueryType) {
instantQuery := &tempopb.QueryInstantRequest{
+2 -7
View File
@@ -7,12 +7,11 @@ import (
"fmt"
"io"
"google.golang.org/grpc/metadata"
"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana-plugin-sdk-go/backend/tracing"
"github.com/grafana/grafana-plugin-sdk-go/data"
"github.com/grafana/grafana/pkg/tsdb/tempo/kinds/dataquery"
stream_utils "github.com/grafana/grafana/pkg/tsdb/tempo/utils"
"github.com/grafana/tempo/pkg/tempopb"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
@@ -34,7 +33,6 @@ type StreamSender interface {
func (s *Service) runSearchStream(ctx context.Context, req *backend.RunStreamRequest, sender *backend.StreamSender, datasource *DatasourceInfo) error {
ctx, span := tracing.DefaultTracer().Start(ctx, "datasource.tempo.runSearchStream")
defer span.End()
response := &backend.DataResponse{}
var backendQuery *backend.DataQuery
@@ -62,10 +60,7 @@ func (s *Service) runSearchStream(ctx context.Context, req *backend.RunStreamReq
sr.Start = uint32(backendQuery.TimeRange.From.Unix())
sr.End = uint32(backendQuery.TimeRange.To.Unix())
// Setting the user agent for the gRPC call. When DS is decoupled we don't recreate instance when grafana config
// changes or updates, so we have to get it from context.
// Ideally this would be pushed higher, so it's set once for all rpc calls, but we have only one now.
ctx = metadata.AppendToOutgoingContext(ctx, "User-Agent", backend.UserAgentFromContext(ctx).String())
ctx = stream_utils.AppendHeadersToOutgoingContext(ctx, req)
stream, err := datasource.StreamingClient.Search(ctx, sr)
if err != nil {
+13 -5
View File
@@ -6,6 +6,7 @@ import (
"strings"
"github.com/grafana/grafana-plugin-sdk-go/backend"
stream_utils "github.com/grafana/grafana/pkg/tsdb/tempo/utils"
)
func (s *Service) SubscribeStream(_ context.Context, req *backend.SubscribeStreamRequest) (*backend.SubscribeStreamResponse, error) {
@@ -39,11 +40,18 @@ func (s *Service) PublishStream(_ context.Context, _ *backend.PublishStreamReque
func (s *Service) RunStream(ctx context.Context, request *backend.RunStreamRequest, sender *backend.StreamSender) error {
s.logger.Debug("New stream call", "path", request.Path)
tempoDatasource, err := s.getDSInfo(ctx, request.PluginContext)
tempoDatasource, dsInfoErr := s.getDSInfo(ctx, request.PluginContext)
// get incoming and team http headers and append to stream request.
headers, err := stream_utils.SetHeadersFromIncomingContext(ctx)
if err != nil {
return err
}
request.Headers = headers
if strings.HasPrefix(request.Path, SearchPathPrefix) {
if err != nil {
return backend.DownstreamErrorf("failed to get datasource information: %w", err)
if dsInfoErr != nil {
return backend.DownstreamErrorf("failed to get datasource information: %w", dsInfoErr)
}
if err = s.runSearchStream(ctx, request, sender, tempoDatasource); err != nil {
return sendError(err, sender)
@@ -52,8 +60,8 @@ func (s *Service) RunStream(ctx context.Context, request *backend.RunStreamReque
}
}
if strings.HasPrefix(request.Path, MetricsPathPrefix) {
if err != nil {
return backend.DownstreamErrorf("failed to get datasource information: %w", err)
if dsInfoErr != nil {
return backend.DownstreamErrorf("failed to get datasource information: %w", dsInfoErr)
}
if err = s.runMetricsStream(ctx, request, sender, tempoDatasource); err != nil {
return sendError(err, sender)
+121
View File
@@ -0,0 +1,121 @@
package stream_utils
import (
"context"
"encoding/json"
"fmt"
"github.com/grafana/grafana-plugin-sdk-go/backend"
"google.golang.org/grpc/metadata"
)
// Appends incoming request headers to the outgoing context to make sure none are lost when we make the request to tempo.
func AppendHeadersToOutgoingContext(ctx context.Context, req *backend.RunStreamRequest) context.Context {
// append all incoming headers
for key, value := range req.Headers {
ctx = metadata.AppendToOutgoingContext(ctx, key, value)
}
// Setting the user agent for the gRPC call. When DS is decoupled we don't recreate instance when grafana config
// changes or updates, so we have to get it from context.
// Ideally this would be pushed higher, so it's set once for all rpc calls, but we have only one now.
ctx = metadata.AppendToOutgoingContext(ctx, "User-Agent", backend.UserAgentFromContext(ctx).String())
return ctx
}
// When we receive a new query request we should make sure that all incoming HTTP headers are being forwarding to the grpc stream request
// this is to make sure that no headers are lost when we make the actual call to Tempo later on.
func SetHeadersFromIncomingContext(ctx context.Context) (map[string]string, error) {
// get the plugin from context
plugin := backend.PluginConfigFromContext(ctx)
// get the HTTP headers
teamHeaders, error := getTeamHTTPHeaders(plugin)
if error != nil {
return nil, error
}
// get the rest of the incoming headers
headers, err := getClientOptionsHeaders(ctx, plugin)
if err != nil {
return nil, err
}
for key, value := range teamHeaders {
headers[key] = value
}
return headers, nil
}
func getTeamHTTPHeaders(plugin backend.PluginContext) (map[string]string, error) {
headers := map[string]string{}
// Grab the JSON data from the datasource instance settings
jsonData := plugin.DataSourceInstanceSettings.JSONData
var data map[string]interface{}
err := json.Unmarshal(jsonData, &data)
if err != nil {
return nil, err
}
// fetch team http headers
if teamHttpHeaders, ok := data["teamHttpHeaders"]; ok {
// team headers have the following structure
// headers: [<team_id>: [{header: <header_name>, value: <header_value>}]]
// header_value is whatever the user has set under LBAC permissions for their given rule.
if lbacHeaders, ok := teamHttpHeaders.(map[string]interface{})["headers"]; ok {
headerMap := lbacHeaders.(map[string]interface{})
labelPolicyKey, labelPolicyValue := getLabelPolicyKeyValue(headerMap)
if labelPolicyKey != "" && labelPolicyValue != "" {
headers[labelPolicyKey] = labelPolicyValue
}
}
}
return headers, nil
}
func getLabelPolicyKeyValue(headerWithRules map[string]interface{}) (string, string) {
labelPolicyKey := ""
labelPolicyValue := ""
// we go through each teams' rule and ignoring the team, go through their set rules and prepare them to be all appended for the X-Prom-Label-Policy header value
// the result will be a comma separated list of the rules:
// "<rule_num>:<rule_value>, <rule_num>:<rule_value>"
for _, accessRuleValue := range headerWithRules {
rules := accessRuleValue.([]interface{})
for _, accessRule := range rules {
header := accessRule.(map[string]interface{})
for key, value := range header {
// for now, team headers only contain a single header key value, but in case in the future more are introduced, we make sure we only set the one we care about.
if key == "header" && value == "X-Prom-Label-Policy" {
labelPolicyKey = value.(string)
continue
}
if key == "value" {
if valueStr, ok := value.(string); ok {
if labelPolicyValue == "" {
labelPolicyValue = valueStr
} else {
labelPolicyValue += "," + valueStr
}
}
}
}
}
}
return labelPolicyKey, labelPolicyValue
}
func getClientOptionsHeaders(ctx context.Context, plugin backend.PluginContext) (map[string]string, error) {
headers := map[string]string{}
opts, err := plugin.DataSourceInstanceSettings.HTTPClientOptions(ctx)
if err != nil {
return nil, fmt.Errorf("failed to get HTTP client options: %w", err)
}
for name, values := range opts.Header {
for _, value := range values {
headers[name] = value
}
}
return headers, nil
}
+149
View File
@@ -0,0 +1,149 @@
package stream_utils
import (
"context"
"testing"
"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana-plugin-sdk-go/backend/useragent"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/grpc/metadata"
)
func TestAppendHeadersToOutgoingContext_AppendsHeadersAndUserAgent(t *testing.T) {
ctx := context.TODO()
ua, err := useragent.New("10.0.0", "linux", "amd64")
require.NoError(t, err)
ctx = backend.WithUserAgent(ctx, ua)
ctx = metadata.NewOutgoingContext(ctx, metadata.Pairs("Existing", "one"))
req := &backend.RunStreamRequest{
Headers: map[string]string{
"X-Test": "value",
},
}
out := AppendHeadersToOutgoingContext(ctx, req)
outgoingMD, ok := metadata.FromOutgoingContext(out)
require.True(t, ok)
assert.Equal(t, []string{"value"}, outgoingMD.Get("x-test"))
assert.Equal(t, []string{ua.String()}, outgoingMD.Get("user-agent"))
assert.Equal(t, []string{"one"}, outgoingMD.Get("existing"))
}
func TestSetHeadersFromIncomingContext_MergesTeamAndClientHeaders(t *testing.T) {
jsonData := []byte(`{
"teamHttpHeaders": {
"headers": {
"101": [
{"header": "X-Prom-Label-Policy", "value": "1:team-value"},
{"header": "X-Prom-Label-Policy", "value": "2:team-wins"}
]
}
},
"httpHeaderName1": "X-Client",
"httpHeaderName2": "X-Shared"
}`)
pluginCtx := backend.PluginContext{
DataSourceInstanceSettings: &backend.DataSourceInstanceSettings{
JSONData: jsonData,
DecryptedSecureJSONData: map[string]string{
"httpHeaderValue1": "client-value",
"httpHeaderValue2": "client-overridden",
},
},
}
ctx := backend.WithPluginContext(context.Background(), pluginCtx)
headers, err := SetHeadersFromIncomingContext(ctx)
require.NoError(t, err)
expected := map[string]string{
"X-Client": "client-value",
"X-Prom-Label-Policy": "1:team-value,2:team-wins",
"X-Shared": "client-overridden",
}
assert.Equal(t, expected, headers)
}
func TestGetTeamHTTPHeaders_NoTeamHeaders(t *testing.T) {
pluginCtx := backend.PluginContext{
DataSourceInstanceSettings: &backend.DataSourceInstanceSettings{
JSONData: []byte(`{"httpHeaderName1": "X-Client"}`),
},
}
headers, err := getTeamHTTPHeaders(pluginCtx)
require.NoError(t, err)
assert.Empty(t, headers)
}
func TestGetTeamHTTPHeaders_LabelPolicyValue(t *testing.T) {
pluginCtx := backend.PluginContext{
DataSourceInstanceSettings: &backend.DataSourceInstanceSettings{
JSONData: []byte(`{
"teamHttpHeaders": {
"headers": {
"101": [
{"header": "X-Prom-Label-Policy", "value": "1:team-value"},
{"header": "X-Prom-Label-Policy", "value": "2:team-wins"}
]
}
}
}`),
},
}
headers, err := getTeamHTTPHeaders(pluginCtx)
require.NoError(t, err)
assert.Equal(t, map[string]string{
"X-Prom-Label-Policy": "1:team-value,2:team-wins",
}, headers)
}
func TestGetLabelPolicyKeyValue_AppendsValues(t *testing.T) {
headerWithRules := map[string]interface{}{
"101": []interface{}{
map[string]interface{}{
"header": "X-Prom-Label-Policy",
"value": "1:alpha",
},
map[string]interface{}{
"header": "X-Prom-Label-Policy",
"value": "2:beta",
},
},
}
key, value := getLabelPolicyKeyValue(headerWithRules)
assert.Equal(t, "X-Prom-Label-Policy", key)
assert.Equal(t, "1:alpha,2:beta", value)
}
func TestGetClientOptionsHeaders_ParsesHeaders(t *testing.T) {
pluginCtx := backend.PluginContext{
DataSourceInstanceSettings: &backend.DataSourceInstanceSettings{
JSONData: []byte(`{"httpHeaderName1": "X-Client"}`),
DecryptedSecureJSONData: map[string]string{
"httpHeaderValue1": "client-value",
},
},
}
headers, err := getClientOptionsHeaders(context.Background(), pluginCtx)
require.NoError(t, err)
assert.Equal(t, map[string]string{"X-Client": "client-value"}, headers)
}
func TestGetClientOptionsHeaders_InvalidJSON(t *testing.T) {
pluginCtx := backend.PluginContext{
DataSourceInstanceSettings: &backend.DataSourceInstanceSettings{
JSONData: []byte("{"),
},
}
_, err := getClientOptionsHeaders(context.Background(), pluginCtx)
require.Error(t, err)
}