From 0b8252fd7c07f57f6bb9d9ae39a51c2317de4ba2 Mon Sep 17 00:00:00 2001 From: Aleksandar Petrov <8142643+aleks-p@users.noreply.github.com> Date: Wed, 28 May 2025 05:42:19 -0300 Subject: [PATCH] Pyroscope: Annotation support for series queries (#104130) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Pyroscope: Add annotations frame to series response * Adapt to API change, add tests * Run make lint-go * Fix conflicts after rebase * Add annotation via a separate data frame * Process annotations fully at the datasource * Add mod owner for go-humanize * Pyroscope: Annotations in Query Response can be optional --------- Co-authored-by: Piotr Jamróz --- go.mod | 6 +- go.sum | 8 +- .../x/GrafanaPyroscopeDataQuery_types.gen.ts | 4 + .../annotations.go | 133 +++++++++++++ .../annotations_test.go | 188 ++++++++++++++++++ .../kinds/dataquery/types_dataquery_gen.go | 2 + .../pyroscopeClient.go | 8 +- .../grafana-pyroscope-datasource/query.go | 48 ++++- .../query_test.go | 149 +++++++++++++- .../dataquery.cue | 2 + .../dataquery.gen.ts | 4 + 11 files changed, 535 insertions(+), 17 deletions(-) create mode 100644 pkg/tsdb/grafana-pyroscope-datasource/annotations.go create mode 100644 pkg/tsdb/grafana-pyroscope-datasource/annotations_test.go diff --git a/go.mod b/go.mod index 094bec5e3ac..9f8ba8bd3ab 100644 --- a/go.mod +++ b/go.mod @@ -7,7 +7,7 @@ require ( buf.build/gen/go/parca-dev/parca/protocolbuffers/go v1.34.2-20240902100956-02fd72488966.2 // @grafana/observability-traces-and-profiling cloud.google.com/go/kms v1.20.5 // @grafana/grafana-backend-group cloud.google.com/go/storage v1.50.0 // @grafana/grafana-backend-group - connectrpc.com/connect v1.17.0 // @grafana/observability-traces-and-profiling + connectrpc.com/connect v1.18.1 // @grafana/observability-traces-and-profiling cuelang.org/go v0.11.1 // @grafana/grafana-as-code filippo.io/age v1.2.1 // @grafana/identity-access-team github.com/1NCE-GmbH/grpc-go-pool v0.0.0-20231117122434-2a5bb974daa2 // @grafana/grafana-search-and-storage @@ -97,7 +97,7 @@ require ( github.com/grafana/loki/v3 v3.2.1 // @grafana/observability-logs github.com/grafana/otel-profiling-go v0.5.1 // @grafana/grafana-backend-group github.com/grafana/pyroscope-go/godeltaprof v0.1.8 // @grafana/observability-traces-and-profiling - github.com/grafana/pyroscope/api v1.0.0 // @grafana/observability-traces-and-profiling + github.com/grafana/pyroscope/api v1.2.1-0.20250415190842-3ff7247547ae // @grafana/observability-traces-and-profiling github.com/grafana/tempo v1.5.1-0.20241001135150-ed943d7a56b2 // @grafana/observability-traces-and-profiling github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus v1.0.1 // @grafana/plugins-platform-backend github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.3.2 // @grafana/grafana-backend-group @@ -337,7 +337,7 @@ require ( github.com/dolthub/go-icu-regex v0.0.0-20250327004329-6799764f2dad // indirect github.com/dolthub/jsonpath v0.0.2-0.20240227200619-19675ab05c71 // indirect github.com/dolthub/maphash v0.1.0 // indirect - github.com/dustin/go-humanize v1.0.1 // indirect + github.com/dustin/go-humanize v1.0.1 // @grafana/observability-traces-and-profiling github.com/edsrzf/mmap-go v1.2.0 // indirect github.com/elazarl/goproxy v1.7.2 // indirect github.com/emicklei/go-restful/v3 v3.11.0 // indirect diff --git a/go.sum b/go.sum index 6d4c78f21a2..26f60296de0 100644 --- a/go.sum +++ b/go.sum @@ -626,8 +626,8 @@ cloud.google.com/go/workflows v1.7.0/go.mod h1:JhSrZuVZWuiDfKEFxU0/F1PQjmpnpcoIS cloud.google.com/go/workflows v1.8.0/go.mod h1:ysGhmEajwZxGn1OhGOGKsTXc5PyxOc0vfKf5Af+to4M= cloud.google.com/go/workflows v1.9.0/go.mod h1:ZGkj1aFIOd9c8Gerkjjq7OW7I5+l6cSvT3ujaO/WwSA= cloud.google.com/go/workflows v1.10.0/go.mod h1:fZ8LmRmZQWacon9UCX1r/g/DfAXx5VcPALq2CxzdePw= -connectrpc.com/connect v1.17.0 h1:W0ZqMhtVzn9Zhn2yATuUokDLO5N+gIuBWMOnsQrfmZk= -connectrpc.com/connect v1.17.0/go.mod h1:0292hj1rnx8oFrStN7cB4jjVBeqs+Yx5yDIC2prWDO8= +connectrpc.com/connect v1.18.1 h1:PAg7CjSAGvscaf6YZKUefjoih5Z/qYkyaTrBW8xvYPw= +connectrpc.com/connect v1.18.1/go.mod h1:0292hj1rnx8oFrStN7cB4jjVBeqs+Yx5yDIC2prWDO8= contrib.go.opencensus.io/exporter/ocagent v0.6.0/go.mod h1:zmKjrJcdo0aYcVS7bmEeSEBLPA9YJp5bjrofdU3pIXs= cuelabs.dev/go/oci/ociregistry v0.0.0-20240906074133-82eb438dd565 h1:R5wwEcbEZSBmeyg91MJZTxfd7WpBo2jPof3AYjRbxwY= cuelabs.dev/go/oci/ociregistry v0.0.0-20240906074133-82eb438dd565/go.mod h1:5A4xfTzHTXfeVJBU6RAUf+QrlfTCW+017q/QiW+sMLg= @@ -1629,8 +1629,8 @@ github.com/grafana/prometheus-alertmanager v0.25.1-0.20250417181314-6d0f5436a1fb github.com/grafana/prometheus-alertmanager v0.25.1-0.20250417181314-6d0f5436a1fb/go.mod h1:FGdGvhI40Dq+CTQaSzK9evuve774cgOUdGfVO04OXkw= github.com/grafana/pyroscope-go/godeltaprof v0.1.8 h1:iwOtYXeeVSAeYefJNaxDytgjKtUuKQbJqgAIjlnicKg= github.com/grafana/pyroscope-go/godeltaprof v0.1.8/go.mod h1:2+l7K7twW49Ct4wFluZD3tZ6e0SjanjcUUBPVD/UuGU= -github.com/grafana/pyroscope/api v1.0.0 h1:RWK3kpv8EAnB7JpOqnf//xwE84DdKF03N/iFxpFAoHY= -github.com/grafana/pyroscope/api v1.0.0/go.mod h1:CUrgOgSZDnx4M1mlRoxhrVKkTuKIse9p4FtuPbrGA04= +github.com/grafana/pyroscope/api v1.2.1-0.20250415190842-3ff7247547ae h1:35W3Wjp9KWnSoV/DuymmyIj5aHE0CYlDQ5m2KeXUPAc= +github.com/grafana/pyroscope/api v1.2.1-0.20250415190842-3ff7247547ae/go.mod h1:6CJ1uXmLZ13ufpO9xE4pST+DyaBt0uszzrV0YnoaVLQ= github.com/grafana/regexp v0.0.0-20240518133315-a468a5bfb3bc h1:GN2Lv3MGO7AS6PrRoT6yV5+wkrOpcszoIsO4+4ds248= github.com/grafana/regexp v0.0.0-20240518133315-a468a5bfb3bc/go.mod h1:+JKpmjMGhpgPL+rXZ5nsZieVzvarn86asRlBg4uNGnk= github.com/grafana/saml v0.4.15-0.20240917091248-ae3bbdad8a56 h1:SDGrP81Vcd102L3UJEryRd1eestRw73wt+b8vnVEFe0= diff --git a/packages/grafana-schema/src/raw/composable/grafanapyroscope/dataquery/x/GrafanaPyroscopeDataQuery_types.gen.ts b/packages/grafana-schema/src/raw/composable/grafanapyroscope/dataquery/x/GrafanaPyroscopeDataQuery_types.gen.ts index 756f2c575cc..f323c53b037 100644 --- a/packages/grafana-schema/src/raw/composable/grafanapyroscope/dataquery/x/GrafanaPyroscopeDataQuery_types.gen.ts +++ b/packages/grafana-schema/src/raw/composable/grafanapyroscope/dataquery/x/GrafanaPyroscopeDataQuery_types.gen.ts @@ -17,6 +17,10 @@ export type PyroscopeQueryType = ('metrics' | 'profile' | 'both'); export const defaultPyroscopeQueryType: PyroscopeQueryType = 'both'; export interface GrafanaPyroscopeDataQuery extends common.DataQuery { + /** + * If set to true, the response will contain annotations + */ + annotations?: boolean; /** * Allows to group the results. */ diff --git a/pkg/tsdb/grafana-pyroscope-datasource/annotations.go b/pkg/tsdb/grafana-pyroscope-datasource/annotations.go new file mode 100644 index 00000000000..aac2778895d --- /dev/null +++ b/pkg/tsdb/grafana-pyroscope-datasource/annotations.go @@ -0,0 +1,133 @@ +package pyroscope + +import ( + "encoding/json" + "fmt" + "time" + + "github.com/dustin/go-humanize" + "github.com/grafana/grafana-plugin-sdk-go/data" +) + +// profileAnnotationKey represents the key for different types of annotations +type profileAnnotationKey string + +const ( + // profileAnnotationKeyThrottled is the key for throttling annotations + profileAnnotationKeyThrottled profileAnnotationKey = "pyroscope.ingest.throttled" +) + +// ProfileAnnotation represents the parsed annotation data +type ProfileAnnotation struct { + Body ProfileThrottledAnnotation `json:"body"` +} + +// ProfileThrottledAnnotation contains throttling information +type ProfileThrottledAnnotation struct { + PeriodType string `json:"periodType"` + PeriodLimitMb float64 `json:"periodLimitMb"` + LimitResetTime int64 `json:"limitResetTime"` + SamplingPeriodSec float64 `json:"samplingPeriodSec"` + SamplingRequests int64 `json:"samplingRequests"` + UsageGroup string `json:"usageGroup"` +} + +// processedProfileAnnotation represents a processed annotation ready for display +type processedProfileAnnotation struct { + text string + time int64 + timeEnd int64 + isRegion bool + duplicateTracker int64 +} + +// grafanaAnnotationData holds slices of processed annotation data +type grafanaAnnotationData struct { + times []time.Time + timeEnds []time.Time + texts []string + isRegions []bool +} + +// convertAnnotation converts a Pyroscope profile annotation into a Grafana annotation +func convertAnnotation(timedAnnotation *TimedAnnotation, duplicateTracker int64) (*processedProfileAnnotation, error) { + if timedAnnotation.getKey() != string(profileAnnotationKeyThrottled) { + // Currently we only support throttling annotations + return nil, nil + } + + var profileAnnotation ProfileAnnotation + err := json.Unmarshal([]byte(timedAnnotation.getValue()), &profileAnnotation) + if err != nil { + return nil, fmt.Errorf("error parsing annotation data: %w", err) + } + + throttlingInfo := profileAnnotation.Body + + if duplicateTracker == throttlingInfo.LimitResetTime { + return nil, nil + } + + limit := humanize.IBytes(uint64(throttlingInfo.PeriodLimitMb * 1024 * 1024)) + return &processedProfileAnnotation{ + text: fmt.Sprintf("Ingestion limit (%s/%s) reached", limit, throttlingInfo.PeriodType), + time: timedAnnotation.Timestamp, + timeEnd: throttlingInfo.LimitResetTime * 1000, + isRegion: throttlingInfo.LimitResetTime < time.Now().Unix(), + duplicateTracker: throttlingInfo.LimitResetTime, + }, nil +} + +// processAnnotations processes a slice of TimedAnnotation and returns grafanaAnnotationData +func processAnnotations(timedAnnotations []*TimedAnnotation) (*grafanaAnnotationData, error) { + result := &grafanaAnnotationData{ + times: []time.Time{}, + timeEnds: []time.Time{}, + texts: []string{}, + isRegions: []bool{}, + } + + var duplicateTracker int64 + + for _, timedAnnotation := range timedAnnotations { + if timedAnnotation == nil || timedAnnotation.Annotation == nil { + continue + } + processed, err := convertAnnotation(timedAnnotation, duplicateTracker) + if err != nil { + return nil, err + } + + if processed != nil { + result.times = append(result.times, time.UnixMilli(processed.time)) + result.timeEnds = append(result.timeEnds, time.UnixMilli(processed.timeEnd)) + result.isRegions = append(result.isRegions, processed.isRegion) + result.texts = append(result.texts, processed.text) + duplicateTracker = processed.duplicateTracker + } + } + + return result, nil +} + +// createAnnotationFrame creates a data frame for annotations +func createAnnotationFrame(annotations []*TimedAnnotation) (*data.Frame, error) { + annotationData, err := processAnnotations(annotations) + if err != nil { + return nil, err + } + + timeField := data.NewField("time", nil, annotationData.times) + timeEndField := data.NewField("timeEnd", nil, annotationData.timeEnds) + textField := data.NewField("text", nil, annotationData.texts) + isRegionField := data.NewField("isRegion", nil, annotationData.isRegions) + colorField := data.NewField("color", nil, make([]string, len(annotationData.times))) + + frame := data.NewFrame("annotations") + frame.Fields = data.Fields{timeField, timeEndField, textField, isRegionField, colorField} + frame.SetMeta(&data.FrameMeta{ + DataTopic: data.DataTopicAnnotations, + }) + + return frame, nil +} diff --git a/pkg/tsdb/grafana-pyroscope-datasource/annotations_test.go b/pkg/tsdb/grafana-pyroscope-datasource/annotations_test.go new file mode 100644 index 00000000000..3ca7d7433f1 --- /dev/null +++ b/pkg/tsdb/grafana-pyroscope-datasource/annotations_test.go @@ -0,0 +1,188 @@ +package pyroscope + +import ( + "testing" + "time" + + "github.com/grafana/grafana-plugin-sdk-go/data" + typesv1 "github.com/grafana/pyroscope/api/gen/proto/go/types/v1" + "github.com/stretchr/testify/require" +) + +func TestConvertAnnotation(t *testing.T) { + rawAnnotation := `{"body":{"periodType":"day","periodLimitMb":1024,"limitResetTime":1609459200}}` + + t.Run("processes valid annotation", func(t *testing.T) { + timedAnnotation := &TimedAnnotation{ + Timestamp: 1609455600000, + Annotation: &typesv1.ProfileAnnotation{ + Key: string(profileAnnotationKeyThrottled), + Value: rawAnnotation, + }, + } + + processed, err := convertAnnotation(timedAnnotation, 0) + require.NoError(t, err) + require.NotNil(t, processed) + require.Contains(t, processed.text, "Ingestion limit (1.0 GiB/day) reached") + require.Contains(t, processed.text, "day") + require.Equal(t, int64(1609455600000), processed.time) + require.Equal(t, int64(1609459200000), processed.timeEnd) // LimitResetTime * 1000 + require.Equal(t, int64(1609459200), processed.duplicateTracker) + }) + + t.Run("ignores non-throttling annotations", func(t *testing.T) { + timedAnnotation := &TimedAnnotation{ + Timestamp: 1000, + Annotation: &typesv1.ProfileAnnotation{ + Key: "some.other.key", + Value: `{"test":"value"}`, + }, + } + + processed, err := convertAnnotation(timedAnnotation, 0) + require.NoError(t, err) + require.Nil(t, processed) + }) + + t.Run("handles invalid annotation data", func(t *testing.T) { + timedAnnotation := &TimedAnnotation{ + Timestamp: 1000, + Annotation: &typesv1.ProfileAnnotation{ + Key: string(profileAnnotationKeyThrottled), + Value: `invalid json`, + }, + } + + processed, err := convertAnnotation(timedAnnotation, 0) + require.Error(t, err) + require.Nil(t, processed) + require.Contains(t, err.Error(), "error parsing annotation data") + }) + + t.Run("skips duplicate annotations", func(t *testing.T) { + timedAnnotation := &TimedAnnotation{ + Timestamp: 1000, + Annotation: &typesv1.ProfileAnnotation{ + Key: string(profileAnnotationKeyThrottled), + Value: rawAnnotation, + }, + } + + // First call should process the annotation + processed1, err := convertAnnotation(timedAnnotation, 0) + require.NoError(t, err) + require.NotNil(t, processed1) + + // Second call with the same duplicateTracker should skip + processed2, err := convertAnnotation(timedAnnotation, processed1.duplicateTracker) + require.NoError(t, err) + require.Nil(t, processed2) + }) +} + +func TestProcessAnnotations(t *testing.T) { + rawAnnotation := `{"body":{"periodType":"day","periodLimitMb":1024,"limitResetTime":1609459200}}` + + t.Run("processes multiple annotations", func(t *testing.T) { + annotations := []*TimedAnnotation{ + { + Timestamp: 1609455600000, + Annotation: &typesv1.ProfileAnnotation{ + Key: string(profileAnnotationKeyThrottled), + Value: rawAnnotation, + }, + }, + { + Timestamp: 1609459200000, + Annotation: &typesv1.ProfileAnnotation{ + Key: string(profileAnnotationKeyThrottled), + Value: rawAnnotation, + }, + }, + } + + result, err := processAnnotations(annotations) + require.NoError(t, err) + require.Equal(t, 1, len(result.times)) + require.Equal(t, 1, len(result.timeEnds)) + require.Equal(t, 1, len(result.texts)) + require.Equal(t, 1, len(result.isRegions)) + }) + + t.Run("handles empty annotations list", func(t *testing.T) { + result, err := processAnnotations([]*TimedAnnotation{}) + require.NoError(t, err) + require.Equal(t, 0, len(result.times)) + require.Equal(t, 0, len(result.timeEnds)) + require.Equal(t, 0, len(result.texts)) + require.Equal(t, 0, len(result.isRegions)) + }) + + t.Run("handles nil annotations", func(t *testing.T) { + annotations := []*TimedAnnotation{nil} + result, err := processAnnotations(annotations) + require.NoError(t, err) + require.Equal(t, 0, len(result.times)) + }) + + t.Run("handles invalid annotation data", func(t *testing.T) { + annotations := []*TimedAnnotation{ + { + Timestamp: 1000, + Annotation: &typesv1.ProfileAnnotation{ + Key: string(profileAnnotationKeyThrottled), + Value: `invalid json`, + }, + }, + } + + result, err := processAnnotations(annotations) + require.Error(t, err) + require.Nil(t, result) + require.Contains(t, err.Error(), "error parsing annotation data") + }) +} + +func TestCreateAnnotationFrame(t *testing.T) { + rawAnnotation := `{"body":{"periodType":"day","periodLimitMb":1024,"limitResetTime":1609459200}}` + + t.Run("creates frame with correct fields", func(t *testing.T) { + annotations := []*TimedAnnotation{ + { + Timestamp: 1609455600000, + Annotation: &typesv1.ProfileAnnotation{ + Key: string(profileAnnotationKeyThrottled), + Value: rawAnnotation, + }, + }, + } + + frame, err := createAnnotationFrame(annotations) + require.NoError(t, err) + require.NotNil(t, frame) + + require.Equal(t, "annotations", frame.Name) + require.Equal(t, data.DataTopicAnnotations, frame.Meta.DataTopic) + + require.Equal(t, 5, len(frame.Fields)) + require.Equal(t, "time", frame.Fields[0].Name) + require.Equal(t, "timeEnd", frame.Fields[1].Name) + require.Equal(t, "text", frame.Fields[2].Name) + require.Equal(t, "isRegion", frame.Fields[3].Name) + require.Equal(t, "color", frame.Fields[4].Name) + + require.Equal(t, 1, frame.Fields[0].Len()) + require.Equal(t, time.UnixMilli(1609455600000), frame.Fields[0].At(0)) + require.Equal(t, time.UnixMilli(1609459200000), frame.Fields[1].At(0)) + require.Contains(t, frame.Fields[2].At(0).(string), "Ingestion limit") + }) + + t.Run("handles empty annotations list", func(t *testing.T) { + frame, err := createAnnotationFrame([]*TimedAnnotation{}) + require.NoError(t, err) + require.NotNil(t, frame) + require.Equal(t, 5, len(frame.Fields)) + require.Equal(t, 0, frame.Fields[0].Len()) + }) +} diff --git a/pkg/tsdb/grafana-pyroscope-datasource/kinds/dataquery/types_dataquery_gen.go b/pkg/tsdb/grafana-pyroscope-datasource/kinds/dataquery/types_dataquery_gen.go index d52e0124db5..29f27facdd8 100644 --- a/pkg/tsdb/grafana-pyroscope-datasource/kinds/dataquery/types_dataquery_gen.go +++ b/pkg/tsdb/grafana-pyroscope-datasource/kinds/dataquery/types_dataquery_gen.go @@ -41,6 +41,8 @@ type GrafanaPyroscopeDataQuery struct { // Specify the query flavor // TODO make this required and give it a default QueryType *string `json:"queryType,omitempty"` + // If set to true, the response will contain annotations + Annotations *bool `json:"annotations,omitempty"` // For mixed data sources the selected datasource is on the query level. // For non mixed scenarios this is undefined. // TODO find a better way to do this ^ that's friendly to schema diff --git a/pkg/tsdb/grafana-pyroscope-datasource/pyroscopeClient.go b/pkg/tsdb/grafana-pyroscope-datasource/pyroscopeClient.go index 74195ef7919..e742af31d30 100644 --- a/pkg/tsdb/grafana-pyroscope-datasource/pyroscopeClient.go +++ b/pkg/tsdb/grafana-pyroscope-datasource/pyroscopeClient.go @@ -46,7 +46,8 @@ type LabelPair struct { type Point struct { Value float64 // Milliseconds unix timestamp - Timestamp int64 + Timestamp int64 + Annotations []*typesv1.ProfileAnnotation } type ProfileResponse struct { @@ -133,8 +134,9 @@ func (c *PyroscopeClient) GetSeries(ctx context.Context, profileTypeID string, l points := make([]*Point, len(s.Points)) for i, p := range s.Points { points[i] = &Point{ - Value: p.Value, - Timestamp: p.Timestamp, + Value: p.Value, + Timestamp: p.Timestamp, + Annotations: p.Annotations, } } diff --git a/pkg/tsdb/grafana-pyroscope-datasource/query.go b/pkg/tsdb/grafana-pyroscope-datasource/query.go index f69dc1633be..c2243c7af05 100644 --- a/pkg/tsdb/grafana-pyroscope-datasource/query.go +++ b/pkg/tsdb/grafana-pyroscope-datasource/query.go @@ -14,6 +14,7 @@ import ( "github.com/grafana/grafana-plugin-sdk-go/data" "github.com/grafana/grafana-plugin-sdk-go/live" "github.com/grafana/grafana/pkg/tsdb/grafana-pyroscope-datasource/kinds/dataquery" + typesv1 "github.com/grafana/pyroscope/api/gen/proto/go/types/v1" "github.com/xlab/treeprint" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/codes" @@ -94,7 +95,15 @@ func (d *PyroscopeDatasource) query(ctx context.Context, pCtx backend.PluginCont } // add the frames to the response. responseMutex.Lock() - response.Frames = append(response.Frames, seriesToDataFrames(seriesResp)...) + withAnnotations := qm.Annotations != nil && *qm.Annotations + frames, err := seriesToDataFrames(seriesResp, withAnnotations) + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + logger.Error("Querying SelectSeries()", "err", err, "function", logEntrypoint()) + return err + } + response.Frames = append(response.Frames, frames...) responseMutex.Unlock() return nil }) @@ -411,8 +420,22 @@ func walkTree(tree *ProfileTree, fn func(tree *ProfileTree)) { } } -func seriesToDataFrames(resp *SeriesResponse) []*data.Frame { +type TimedAnnotation struct { + Timestamp int64 `json:"timestamp"` + Annotation *typesv1.ProfileAnnotation `json:"annotation"` +} + +func (ta *TimedAnnotation) getKey() string { + return ta.Annotation.Key +} + +func (ta *TimedAnnotation) getValue() string { + return ta.Annotation.Value +} + +func seriesToDataFrames(resp *SeriesResponse, withAnnotations bool) ([]*data.Frame, error) { frames := make([]*data.Frame, 0, len(resp.Series)) + annotations := make([]*TimedAnnotation, 0) for _, series := range resp.Series { // We create separate data frames as the series may not have the same length @@ -430,15 +453,32 @@ func seriesToDataFrames(resp *SeriesResponse) []*data.Frame { valueField := data.NewField(resp.Label, labels, []float64{}) valueField.Config = &data.FieldConfig{Unit: resp.Units} + fields = append(fields, valueField) for _, point := range series.Points { timeField.Append(time.UnixMilli(point.Timestamp)) valueField.Append(point.Value) + if withAnnotations { + for _, a := range point.Annotations { + annotations = append(annotations, &TimedAnnotation{ + Timestamp: point.Timestamp, + Annotation: a, + }) + } + } } - fields = append(fields, valueField) frame.Fields = fields frames = append(frames, frame) } - return frames + + if len(annotations) > 0 { + frame, err := createAnnotationFrame(annotations) + if err != nil { + return nil, err + } + frames = append(frames, frame) + } + + return frames, nil } diff --git a/pkg/tsdb/grafana-pyroscope-datasource/query_test.go b/pkg/tsdb/grafana-pyroscope-datasource/query_test.go index 9cf8126dfeb..def783b917f 100644 --- a/pkg/tsdb/grafana-pyroscope-datasource/query_test.go +++ b/pkg/tsdb/grafana-pyroscope-datasource/query_test.go @@ -7,6 +7,7 @@ import ( "github.com/grafana/grafana-plugin-sdk-go/backend" "github.com/grafana/grafana-plugin-sdk-go/data" + typesv1 "github.com/grafana/pyroscope/api/gen/proto/go/types/v1" "github.com/stretchr/testify/require" ) @@ -226,6 +227,145 @@ func Test_treeToNestedDataFrame(t *testing.T) { }) } +func Test_seriesToDataFrameAnnotations(t *testing.T) { + t.Run("annotations field is not added when no annotations are present", func(t *testing.T) { + series := &SeriesResponse{ + Series: []*Series{ + { + Labels: []*LabelPair{}, + Points: []*Point{ + { + Timestamp: int64(1000), + Value: 30, + }, + { + Timestamp: int64(2000), + Value: 20, + }, + { + Timestamp: int64(3000), + Value: 10, + }, + }, + }, + }, + Units: "short", + Label: "samples", + } + + frames, err := seriesToDataFrames(series, true) + require.NoError(t, err) + require.Equal(t, 1, len(frames)) + require.Equal(t, 2, len(frames[0].Fields)) + }) + + t.Run("annotations frame can be skipped", func(t *testing.T) { + rawAnnotation := `{"body":{"periodType":"day","periodLimitMb":1024,"limitResetTime":1609459200}}` + + series := &SeriesResponse{ + Series: []*Series{ + { + Points: []*Point{ + { + Timestamp: int64(1609455600000), + Value: 30, + Annotations: []*typesv1.ProfileAnnotation{ + {Key: string(profileAnnotationKeyThrottled), Value: rawAnnotation}, + }, + }, + }, + }, + }, + } + + frames, err := seriesToDataFrames(series, false) + require.NoError(t, err) + require.Equal(t, 1, len(frames)) + }) + + t.Run("throttling annotations are correctly processed", func(t *testing.T) { + rawAnnotation := `{"body":{"periodType":"day","periodLimitMb":1024,"limitResetTime":1609459200}}` + + series := &SeriesResponse{ + Series: []*Series{ + { + Points: []*Point{ + { + Timestamp: int64(1609455600000), + Value: 30, + Annotations: []*typesv1.ProfileAnnotation{ + {Key: string(profileAnnotationKeyThrottled), Value: rawAnnotation}, + }, + }, + }, + }, + }, + } + + frames, err := seriesToDataFrames(series, true) + require.NoError(t, err) + require.Equal(t, 2, len(frames)) + + annotationsFrame := frames[1] + require.Equal(t, "annotations", annotationsFrame.Name) + require.Equal(t, data.DataTopicAnnotations, annotationsFrame.Meta.DataTopic) + + require.Equal(t, 5, len(annotationsFrame.Fields)) + require.Equal(t, "time", annotationsFrame.Fields[0].Name) + require.Equal(t, "timeEnd", annotationsFrame.Fields[1].Name) + require.Equal(t, "text", annotationsFrame.Fields[2].Name) + require.Equal(t, "isRegion", annotationsFrame.Fields[3].Name) + require.Equal(t, "color", annotationsFrame.Fields[4].Name) + + require.Equal(t, 1, annotationsFrame.Fields[0].Len()) + require.Equal(t, time.UnixMilli(1609455600000), annotationsFrame.Fields[0].At(0)) + require.Equal(t, time.UnixMilli(1609459200000), annotationsFrame.Fields[1].At(0)) + require.Contains(t, annotationsFrame.Fields[2].At(0).(string), "Ingestion limit") + }) + + t.Run("non-throttling annotations are ignored", func(t *testing.T) { + series := &SeriesResponse{ + Series: []*Series{ + { + Points: []*Point{ + { + Timestamp: int64(1000), + Value: 30, + Annotations: []*typesv1.ProfileAnnotation{ + {Key: "key1", Value: "value1"}, + {Key: "key2", Value: "value2"}, + }, + }, + { + Timestamp: int64(2000), + Value: 20, + Annotations: []*typesv1.ProfileAnnotation{ + {Key: "key3", Value: "value3"}, + }, + }, + }, + }, + }, + } + + frames, err := seriesToDataFrames(series, true) + require.NoError(t, err) + require.Equal(t, 2, len(frames)) + + annotationsFrame := frames[1] + require.Equal(t, "annotations", annotationsFrame.Name) + require.Equal(t, data.DataTopicAnnotations, annotationsFrame.Meta.DataTopic) + + require.Equal(t, 5, len(annotationsFrame.Fields)) + + require.Equal(t, 0, annotationsFrame.Fields[0].Len()) + require.Equal(t, 0, annotationsFrame.Fields[1].Len()) + require.Equal(t, 0, annotationsFrame.Fields[2].Len()) + require.Equal(t, 0, annotationsFrame.Fields[3].Len()) + require.Equal(t, 0, annotationsFrame.Fields[4].Len()) + }) +} + func Test_seriesToDataFrame(t *testing.T) { t.Run("single series", func(t *testing.T) { series := &SeriesResponse{ @@ -235,7 +375,8 @@ func Test_seriesToDataFrame(t *testing.T) { Units: "short", Label: "samples", } - frames := seriesToDataFrames(series) + frames, err := seriesToDataFrames(series, true) + require.NoError(t, err) require.Equal(t, 2, len(frames[0].Fields)) require.Equal(t, data.NewField("time", nil, []time.Time{time.UnixMilli(1000), time.UnixMilli(2000)}), frames[0].Fields[0]) require.Equal(t, data.NewField("samples", map[string]string{}, []float64{30, 10}).SetConfig(&data.FieldConfig{Unit: "short"}), frames[0].Fields[1]) @@ -249,7 +390,8 @@ func Test_seriesToDataFrame(t *testing.T) { Label: "samples", } - frames = seriesToDataFrames(series) + frames, err = seriesToDataFrames(series, true) + require.NoError(t, err) require.Equal(t, data.NewField("samples", map[string]string{"app": "bar"}, []float64{30, 10}).SetConfig(&data.FieldConfig{Unit: "short"}), frames[0].Fields[1]) }) @@ -262,7 +404,8 @@ func Test_seriesToDataFrame(t *testing.T) { Units: "short", Label: "samples", } - frames := seriesToDataFrames(resp) + frames, err := seriesToDataFrames(resp, true) + require.NoError(t, err) require.Equal(t, 2, len(frames)) require.Equal(t, 2, len(frames[0].Fields)) require.Equal(t, 2, len(frames[1].Fields)) diff --git a/public/app/plugins/datasource/grafana-pyroscope-datasource/dataquery.cue b/public/app/plugins/datasource/grafana-pyroscope-datasource/dataquery.cue index 67a62cc71e4..8e277aa6265 100644 --- a/public/app/plugins/datasource/grafana-pyroscope-datasource/dataquery.cue +++ b/public/app/plugins/datasource/grafana-pyroscope-datasource/dataquery.cue @@ -42,6 +42,8 @@ composableKinds: DataQuery: { // Sets the maximum number of nodes in the flamegraph. maxNodes?: int64 #PyroscopeQueryType: "metrics" | "profile" | *"both" @cuetsy(kind="type") + // If set to true, the response will contain annotations + annotations?: bool } }] lenses: [] diff --git a/public/app/plugins/datasource/grafana-pyroscope-datasource/dataquery.gen.ts b/public/app/plugins/datasource/grafana-pyroscope-datasource/dataquery.gen.ts index 2995e242acb..8513d2f7d8e 100644 --- a/public/app/plugins/datasource/grafana-pyroscope-datasource/dataquery.gen.ts +++ b/public/app/plugins/datasource/grafana-pyroscope-datasource/dataquery.gen.ts @@ -15,6 +15,10 @@ export type PyroscopeQueryType = ('metrics' | 'profile' | 'both'); export const defaultPyroscopeQueryType: PyroscopeQueryType = 'both'; export interface GrafanaPyroscopeDataQuery extends common.DataQuery { + /** + * If set to true, the response will contain annotations + */ + annotations?: boolean; /** * Allows to group the results. */