diff --git a/go.mod b/go.mod index 2a7ed6f3dbd..97af42462d6 100644 --- a/go.mod +++ b/go.mod @@ -102,6 +102,10 @@ require ( github.com/yudai/gojsondiff v1.0.0 go.opentelemetry.io/collector v0.31.0 go.opentelemetry.io/collector/model v0.31.0 + go.opentelemetry.io/otel v1.0.0 + go.opentelemetry.io/otel/exporters/jaeger v1.0.0 + go.opentelemetry.io/otel/sdk v1.0.0 + go.opentelemetry.io/otel/trace v1.0.0 golang.org/x/crypto v0.0.0-20210616213533-5ff15b29337e golang.org/x/exp v0.0.0-20210220032938-85be41e4509f // indirect golang.org/x/net v0.0.0-20210903162142-ad29c8ab022f diff --git a/go.sum b/go.sum index 53b47528756..138498a333a 100644 --- a/go.sum +++ b/go.sum @@ -2380,11 +2380,19 @@ go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.21.0/go.mod h1: go.opentelemetry.io/contrib/zpages v0.0.0-20210722161726-7668016acb73/go.mod h1:NAkejuYm41lpyL43Fu1XdnCOYxN5NVV80/MJ03JQ/X8= go.opentelemetry.io/otel v0.11.0/go.mod h1:G8UCk+KooF2HLkgo8RHX9epABH/aRGYET7gQOqBVdB0= go.opentelemetry.io/otel v1.0.0-RC1/go.mod h1:x9tRa9HK4hSSq7jf2TKbqFbtt58/TGk0f9XiEYISI1I= +go.opentelemetry.io/otel v1.0.0 h1:qTTn6x71GVBvoafHK/yaRUmFzI4LcONZD0/kXxl5PHI= +go.opentelemetry.io/otel v1.0.0/go.mod h1:AjRVh9A5/5DE7S+mZtTR6t8vpKKryam+0lREnfmS4cg= +go.opentelemetry.io/otel/exporters/jaeger v1.0.0 h1:cLhx8llHw02h5JTqGqaRbYn+QVKHmrzD9vEbKnSPk5U= +go.opentelemetry.io/otel/exporters/jaeger v1.0.0/go.mod h1:q10N1AolE1JjqKrFJK2tYw0iZpmX+HBaXBtuCzRnBGQ= go.opentelemetry.io/otel/internal/metric v0.21.0/go.mod h1:iOfAaY2YycsXfYD4kaRSbLx2LKmfpKObWBEv9QK5zFo= go.opentelemetry.io/otel/metric v0.21.0/go.mod h1:JWCt1bjivC4iCrz/aCrM1GSw+ZcvY44KCbaeeRhzHnc= go.opentelemetry.io/otel/oteltest v1.0.0-RC1/go.mod h1:+eoIG0gdEOaPNftuy1YScLr1Gb4mL/9lpDkZ0JjMRq4= go.opentelemetry.io/otel/sdk v1.0.0-RC1/go.mod h1:kj6yPn7Pgt5ByRuwesbaWcRLA+V7BSDg3Hf8xRvsvf8= +go.opentelemetry.io/otel/sdk v1.0.0 h1:BNPMYUONPNbLneMttKSjQhOTlFLOD9U22HNG1KrIN2Y= +go.opentelemetry.io/otel/sdk v1.0.0/go.mod h1:PCrDHlSy5x1kjezSdL37PhbFUMjrsLRshJ2zCzeXwbM= go.opentelemetry.io/otel/trace v1.0.0-RC1/go.mod h1:86UHmyHWFEtWjfWPSbu0+d0Pf9Q6e1U+3ViBOc+NXAg= +go.opentelemetry.io/otel/trace v1.0.0 h1:TSBr8GTEtKevYMG/2d21M989r5WJYVimhTHBKVEZuh4= +go.opentelemetry.io/otel/trace v1.0.0/go.mod h1:PXTWqayeFUlJV1YDNhsJYB184+IvAH814St6o6ajzIs= go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqeYNgFYFoEGnI= go.starlark.net v0.0.0-20200901195727-6e684ef5eeee/go.mod h1:f0znQkUKRrkk36XxWbGjMqQM8wGv/xHBVE2qc3B5oFU= go.uber.org/atomic v1.3.2/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= @@ -2768,6 +2776,7 @@ golang.org/x/sys v0.0.0-20210324051608-47abb6519492/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210403161142-5e06dd20ab57/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210423185535-09eb48e85fd7/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210514084401-e8d321eab015/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210603081109-ebe580a85c40/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= diff --git a/pkg/services/live/live.go b/pkg/services/live/live.go index aff5ea83753..ad8b2241eb0 100644 --- a/pkg/services/live/live.go +++ b/pkg/services/live/live.go @@ -192,6 +192,22 @@ func ProvideService(plugCtxProvider *plugincontext.Provider, cfg *setting.Cfg, r } } channelRuleGetter := pipeline.NewCacheSegmentedTree(builder) + + // Pre-build/validate channel rules for all organizations on start. + // This can be unreasonable to have in production scenario with many + // organizations. + query := &models.SearchOrgsQuery{} + err := sqlstore.SearchOrgs(query) + if err != nil { + return nil, fmt.Errorf("can't get org list: %w", err) + } + for _, org := range query.Result { + _, _, err := channelRuleGetter.Get(org.Id, "") + if err != nil { + return nil, fmt.Errorf("error building channel rules for org %d: %w", org.Id, err) + } + } + g.Pipeline, err = pipeline.New(channelRuleGetter) if err != nil { return nil, err @@ -537,28 +553,47 @@ func (g *GrafanaLive) handleOnSubscribe(client *centrifuge.Client, e centrifuge. var reply models.SubscribeReply var status backend.SubscribeStreamStatus + var ruleFound bool - var subscribeRuleFound bool if g.Pipeline != nil { rule, ok, err := g.Pipeline.Get(user.OrgId, channel) if err != nil { logger.Error("Error getting channel rule", "user", client.UserID(), "client", client.ID(), "channel", e.Channel, "error", err) return centrifuge.SubscribeReply{}, centrifuge.ErrorInternal } - if ok && rule.Subscriber != nil { - subscribeRuleFound = true - var err error - reply, status, err = rule.Subscriber.Subscribe(client.Context(), pipeline.Vars{ - OrgID: orgID, - Channel: channel, - }) - if err != nil { - logger.Error("Error channel rule subscribe", "user", client.UserID(), "client", client.ID(), "channel", e.Channel, "error", err) - return centrifuge.SubscribeReply{}, centrifuge.ErrorInternal + ruleFound = ok + if ok { + if rule.SubscribeAuth != nil { + ok, err := rule.SubscribeAuth.CanSubscribe(client.Context(), user) + if err != nil { + logger.Error("Error checking subscribe permissions", "user", client.UserID(), "client", client.ID(), "channel", e.Channel, "error", err) + return centrifuge.SubscribeReply{}, centrifuge.ErrorInternal + } + if !ok { + // using HTTP error codes for WS errors too. + code, text := subscribeStatusToHTTPError(backend.SubscribeStreamStatusPermissionDenied) + return centrifuge.SubscribeReply{}, ¢rifuge.Error{Code: uint32(code), Message: text} + } + } + if len(rule.Subscribers) > 0 { + var err error + for _, sub := range rule.Subscribers { + reply, status, err = sub.Subscribe(client.Context(), pipeline.Vars{ + OrgID: orgID, + Channel: channel, + }) + if err != nil { + logger.Error("Error channel rule subscribe", "user", client.UserID(), "client", client.ID(), "channel", e.Channel, "error", err) + return centrifuge.SubscribeReply{}, centrifuge.ErrorInternal + } + if status != backend.SubscribeStreamStatusOK { + break + } + } } } } - if !subscribeRuleFound { + if !ruleFound { handler, addr, err := g.GetChannelHandler(user, channel) if err != nil { if errors.Is(err, live.ErrInvalidChannelID) { @@ -615,6 +650,42 @@ func (g *GrafanaLive) handleOnPublish(client *centrifuge.Client, e centrifuge.Pu return centrifuge.PublishReply{}, centrifuge.ErrorPermissionDenied } + if g.Pipeline != nil { + rule, ok, err := g.Pipeline.Get(user.OrgId, channel) + if err != nil { + logger.Error("Error getting channel rule", "user", client.UserID(), "client", client.ID(), "channel", e.Channel, "error", err) + return centrifuge.PublishReply{}, centrifuge.ErrorInternal + } + if ok { + if rule.PublishAuth != nil { + ok, err := rule.PublishAuth.CanPublish(client.Context(), user) + if err != nil { + logger.Error("Error checking publish permissions", "user", client.UserID(), "client", client.ID(), "channel", e.Channel, "error", err) + return centrifuge.PublishReply{}, centrifuge.ErrorInternal + } + if !ok { + // using HTTP error codes for WS errors too. + code, text := publishStatusToHTTPError(backend.PublishStreamStatusPermissionDenied) + return centrifuge.PublishReply{}, ¢rifuge.Error{Code: uint32(code), Message: text} + } + } else { + if !user.HasRole(models.ROLE_ADMIN) { + // using HTTP error codes for WS errors too. + code, text := publishStatusToHTTPError(backend.PublishStreamStatusPermissionDenied) + return centrifuge.PublishReply{}, ¢rifuge.Error{Code: uint32(code), Message: text} + } + } + _, err := g.Pipeline.ProcessInput(client.Context(), user.OrgId, channel, e.Data) + if err != nil { + logger.Error("Error processing input", "user", client.UserID(), "client", client.ID(), "channel", e.Channel, "error", err) + return centrifuge.PublishReply{}, centrifuge.ErrorInternal + } + return centrifuge.PublishReply{ + Result: ¢rifuge.PublishResult{}, + }, nil + } + } + handler, addr, err := g.GetChannelHandler(user, channel) if err != nil { if errors.Is(err, live.ErrInvalidChannelID) { @@ -633,6 +704,7 @@ func (g *GrafanaLive) handleOnPublish(client *centrifuge.Client, e centrifuge.Pu logger.Error("Error calling channel handler publish", "user", client.UserID(), "client", client.ID(), "channel", e.Channel, "error", err) return centrifuge.PublishReply{}, centrifuge.ErrorInternal } + if status != backend.PublishStreamStatusOK { // using HTTP error codes for WS errors too. code, text := publishStatusToHTTPError(status) @@ -812,6 +884,38 @@ func (g *GrafanaLive) HandleHTTPPublish(ctx *models.ReqContext, cmd dtos.LivePub } logger.Debug("Publish API cmd", "user", ctx.SignedInUser.UserId, "channel", cmd.Channel) + user := ctx.SignedInUser + channel := cmd.Channel + + if g.Pipeline != nil { + rule, ok, err := g.Pipeline.Get(user.OrgId, channel) + if err != nil { + logger.Error("Error getting channel rule", "user", user, "channel", channel, "error", err) + return response.Error(http.StatusInternalServerError, http.StatusText(http.StatusInternalServerError), nil) + } + if ok { + if rule.PublishAuth != nil { + ok, err := rule.PublishAuth.CanPublish(ctx.Req.Context(), user) + if err != nil { + logger.Error("Error checking publish permissions", "user", user, "channel", channel, "error", err) + return response.Error(http.StatusInternalServerError, http.StatusText(http.StatusInternalServerError), nil) + } + if !ok { + return response.Error(http.StatusForbidden, http.StatusText(http.StatusForbidden), nil) + } + } else { + if !user.HasRole(models.ROLE_ADMIN) { + return response.Error(http.StatusForbidden, http.StatusText(http.StatusForbidden), nil) + } + } + _, err := g.Pipeline.ProcessInput(ctx.Req.Context(), user.OrgId, channel, cmd.Data) + if err != nil { + logger.Error("Error processing input", "user", user, "channel", channel, "error", err) + return response.Error(http.StatusInternalServerError, http.StatusText(http.StatusInternalServerError), nil) + } + return response.JSON(http.StatusOK, dtos.LivePublishResponse{}) + } + } channelHandler, addr, err := g.GetChannelHandler(ctx.SignedInUser, cmd.Channel) if err != nil { @@ -824,6 +928,7 @@ func (g *GrafanaLive) HandleHTTPPublish(ctx *models.ReqContext, cmd dtos.LivePub logger.Error("Error calling OnPublish", "error", err, "channel", cmd.Channel) return response.Error(http.StatusInternalServerError, http.StatusText(http.StatusInternalServerError), nil) } + if status != backend.PublishStreamStatusOK { code, text := publishStatusToHTTPError(status) return response.Error(code, text, nil) @@ -952,13 +1057,13 @@ func (g *GrafanaLive) HandlePipelineConvertTestHTTP(c *models.ReqContext) respon if !ok { return response.Error(http.StatusNotFound, "No rule found", nil) } - channelFrames, ok, err := pipe.DataToChannelFrames(c.Req.Context(), *rule, c.OrgId, req.Channel, []byte(req.Data)) + if rule.Converter == nil { + return response.Error(http.StatusNotFound, "No converter found", nil) + } + channelFrames, err := pipe.DataToChannelFrames(c.Req.Context(), *rule, c.OrgId, req.Channel, []byte(req.Data)) if err != nil { return response.Error(http.StatusInternalServerError, "Error converting data", err) } - if !ok { - return response.Error(http.StatusNotFound, "No converter found", nil) - } return response.JSON(http.StatusOK, ConvertDryRunResponse{ ChannelFrames: channelFrames, }) @@ -1031,10 +1136,11 @@ func (g *GrafanaLive) HandleChannelRulesDeleteHTTP(c *models.ReqContext) respons // HandlePipelineEntitiesListHTTP ... func (g *GrafanaLive) HandlePipelineEntitiesListHTTP(_ *models.ReqContext) response.Response { return response.JSON(http.StatusOK, util.DynMap{ - "subscribers": pipeline.SubscribersRegistry, - "outputs": pipeline.OutputsRegistry, - "converters": pipeline.ConvertersRegistry, - "processors": pipeline.ProcessorsRegistry, + "subscribers": pipeline.SubscribersRegistry, + "dataOutputs": pipeline.DataOutputsRegistry, + "converters": pipeline.ConvertersRegistry, + "frameProcessors": pipeline.FrameProcessorsRegistry, + "frameOutputs": pipeline.FrameOutputsRegistry, }) } diff --git a/pkg/services/live/pipeline/auth.go b/pkg/services/live/pipeline/auth.go new file mode 100644 index 00000000000..ae949c0b109 --- /dev/null +++ b/pkg/services/live/pipeline/auth.go @@ -0,0 +1,23 @@ +package pipeline + +import ( + "context" + + "github.com/grafana/grafana/pkg/models" +) + +type RoleCheckAuthorizer struct { + role models.RoleType +} + +func NewRoleCheckAuthorizer(role models.RoleType) *RoleCheckAuthorizer { + return &RoleCheckAuthorizer{role: role} +} + +func (s *RoleCheckAuthorizer) CanSubscribe(_ context.Context, u *models.SignedInUser) (bool, error) { + return u.HasRole(s.role), nil +} + +func (s *RoleCheckAuthorizer) CanPublish(_ context.Context, u *models.SignedInUser) (bool, error) { + return u.HasRole(s.role), nil +} diff --git a/pkg/services/live/pipeline/compare.go b/pkg/services/live/pipeline/compare.go new file mode 100644 index 00000000000..d61559f1ec6 --- /dev/null +++ b/pkg/services/live/pipeline/compare.go @@ -0,0 +1,14 @@ +package pipeline + +// NumberCompareOp is an comparison operator. +type NumberCompareOp string + +// Known NumberCompareOp types. +const ( + NumberCompareOpLt NumberCompareOp = "lt" + NumberCompareOpGt NumberCompareOp = "gt" + NumberCompareOpLte NumberCompareOp = "lte" + NumberCompareOpGte NumberCompareOp = "gte" + NumberCompareOpEq NumberCompareOp = "eq" + NumberCompareOpNe NumberCompareOp = "ne" +) diff --git a/pkg/services/live/pipeline/condition_checker.go b/pkg/services/live/pipeline/condition_checker.go deleted file mode 100644 index a50fd67ec0d..00000000000 --- a/pkg/services/live/pipeline/condition_checker.go +++ /dev/null @@ -1,13 +0,0 @@ -package pipeline - -import ( - "context" - - "github.com/grafana/grafana-plugin-sdk-go/data" -) - -// ConditionChecker checks conditions in context of data.Frame being processed. -type ConditionChecker interface { - Type() string - CheckCondition(ctx context.Context, frame *data.Frame) (bool, error) -} diff --git a/pkg/services/live/pipeline/condition_checker_multiple.go b/pkg/services/live/pipeline/condition_checker_multiple.go deleted file mode 100644 index 83e7c9ba069..00000000000 --- a/pkg/services/live/pipeline/condition_checker_multiple.go +++ /dev/null @@ -1,51 +0,0 @@ -package pipeline - -import ( - "context" - - "github.com/grafana/grafana-plugin-sdk-go/data" -) - -// ConditionType represents multiple condition operator type. -type ConditionType string - -const ( - ConditionAll ConditionType = "all" - ConditionAny ConditionType = "any" -) - -// MultipleConditionChecker can check multiple conditions according to ConditionType. -type MultipleConditionChecker struct { - ConditionType ConditionType - Conditions []ConditionChecker -} - -const ConditionCheckerTypeMultiple = "multiple" - -func (c *MultipleConditionChecker) Type() string { - return ConditionCheckerTypeMultiple -} - -func (c *MultipleConditionChecker) CheckCondition(ctx context.Context, frame *data.Frame) (bool, error) { - for _, cond := range c.Conditions { - ok, err := cond.CheckCondition(ctx, frame) - if err != nil { - return false, err - } - if ok && c.ConditionType == ConditionAny { - return true, nil - } - if !ok && c.ConditionType == ConditionAll { - return false, nil - } - } - if c.ConditionType == ConditionAny { - return false, nil - } - return true, nil -} - -// NewMultipleConditionChecker creates new MultipleConditionChecker. -func NewMultipleConditionChecker(conditionType ConditionType, conditions ...ConditionChecker) *MultipleConditionChecker { - return &MultipleConditionChecker{ConditionType: conditionType, Conditions: conditions} -} diff --git a/pkg/services/live/pipeline/condition_type.go b/pkg/services/live/pipeline/condition_type.go new file mode 100644 index 00000000000..e02ad55e133 --- /dev/null +++ b/pkg/services/live/pipeline/condition_type.go @@ -0,0 +1,10 @@ +package pipeline + +// ConditionType represents multiple condition operator type. +type ConditionType string + +// Known condition types. +const ( + ConditionAll ConditionType = "all" + ConditionAny ConditionType = "any" +) diff --git a/pkg/services/live/pipeline/config.go b/pkg/services/live/pipeline/config.go index ccec43b698a..5b4511db31d 100644 --- a/pkg/services/live/pipeline/config.go +++ b/pkg/services/live/pipeline/config.go @@ -4,10 +4,10 @@ import ( "context" "fmt" - "github.com/grafana/grafana/pkg/services/live/pipeline/tree" - + "github.com/grafana/grafana/pkg/models" "github.com/grafana/grafana/pkg/services/live/managedstream" "github.com/grafana/grafana/pkg/services/live/pipeline/pattern" + "github.com/grafana/grafana/pkg/services/live/pipeline/tree" "github.com/centrifugal/centrifuge" ) @@ -22,33 +22,33 @@ type ConverterConfig struct { JsonFrameConverterConfig *JsonFrameConverterConfig `json:"jsonFrame,omitempty"` } -type ProcessorConfig struct { - Type string `json:"type"` - DropFieldsProcessorConfig *DropFieldsProcessorConfig `json:"dropFields,omitempty"` - KeepFieldsProcessorConfig *KeepFieldsProcessorConfig `json:"keepFields,omitempty"` - MultipleProcessorConfig *MultipleProcessorConfig `json:"multiple,omitempty"` +type FrameProcessorConfig struct { + Type string `json:"type"` + DropFieldsProcessorConfig *DropFieldsFrameProcessorConfig `json:"dropFields,omitempty"` + KeepFieldsProcessorConfig *KeepFieldsFrameProcessorConfig `json:"keepFields,omitempty"` + MultipleProcessorConfig *MultipleFrameProcessorConfig `json:"multiple,omitempty"` } -type MultipleProcessorConfig struct { - Processors []ProcessorConfig `json:"processors"` +type MultipleFrameProcessorConfig struct { + Processors []FrameProcessorConfig `json:"processors"` } type MultipleOutputterConfig struct { - Outputters []OutputterConfig `json:"outputs"` + Outputters []FrameOutputterConfig `json:"outputs"` } type ManagedStreamOutputConfig struct{} type ConditionalOutputConfig struct { - Condition *ConditionCheckerConfig `json:"condition"` - Outputter *OutputterConfig `json:"output"` + Condition *FrameConditionCheckerConfig `json:"condition"` + Outputter *FrameOutputterConfig `json:"output"` } type RemoteWriteOutputConfig struct { UID string `json:"uid"` } -type OutputterConfig struct { +type FrameOutputterConfig struct { Type string `json:"type"` ManagedStreamConfig *ManagedStreamOutputConfig `json:"managedStream,omitempty"` MultipleOutputterConfig *MultipleOutputterConfig `json:"multiple,omitempty"` @@ -59,21 +59,40 @@ type OutputterConfig struct { ChangeLogOutputConfig *ChangeLogOutputConfig `json:"changeLog,omitempty"` } +type DataOutputterConfig struct { + Type string `json:"type"` + RedirectDataOutputConfig *RedirectDataOutputConfig `json:"redirect,omitempty"` +} + type MultipleSubscriberConfig struct { Subscribers []SubscriberConfig `json:"subscribers"` } type SubscriberConfig struct { - Type string `json:"type"` - MultipleSubscriberConfig *MultipleSubscriberConfig `json:"multiple,omitempty"` - AuthorizeRoleSubscriberConfig *AuthorizeRoleSubscriberConfig `json:"authorizeRole,omitempty"` + Type string `json:"type"` + MultipleSubscriberConfig *MultipleSubscriberConfig `json:"multiple,omitempty"` +} + +// ChannelAuthCheckConfig is used to define auth rules for a channel. +type ChannelAuthCheckConfig struct { + RequireRole models.RoleType `json:"role,omitempty"` +} + +type ChannelAuthConfig struct { + // By default anyone can subscribe. + Subscribe *ChannelAuthCheckConfig `json:"subscribe,omitempty"` + + // By default HTTP and WS require admin permissions to publish. + Publish *ChannelAuthCheckConfig `json:"publish,omitempty"` } type ChannelRuleSettings struct { - Subscriber *SubscriberConfig `json:"subscriber,omitempty"` - Converter *ConverterConfig `json:"converter,omitempty"` - Processor *ProcessorConfig `json:"processor,omitempty"` - Outputter *OutputterConfig `json:"output,omitempty"` + Auth *ChannelAuthConfig `json:"auth,omitempty"` + Subscribers []*SubscriberConfig `json:"subscribers,omitempty"` + DataOutputters []*DataOutputterConfig `json:"dataOutputs,omitempty"` + Converter *ConverterConfig `json:"converter,omitempty"` + FrameProcessors []*FrameProcessorConfig `json:"frameProcessors,omitempty"` + FrameOutputters []*FrameOutputterConfig `json:"frameOutputs,omitempty"` } type ChannelRule struct { @@ -92,19 +111,25 @@ func (r ChannelRule) Valid() (bool, string) { return false, fmt.Sprintf("unknown converter type: %s", r.Settings.Converter.Type) } } - if r.Settings.Subscriber != nil { - if !typeRegistered(r.Settings.Subscriber.Type, SubscribersRegistry) { - return false, fmt.Sprintf("unknown subscriber type: %s", r.Settings.Subscriber.Type) + if len(r.Settings.Subscribers) > 0 { + for _, sub := range r.Settings.Subscribers { + if !typeRegistered(sub.Type, SubscribersRegistry) { + return false, fmt.Sprintf("unknown subscriber type: %s", sub.Type) + } } } - if r.Settings.Processor != nil { - if !typeRegistered(r.Settings.Processor.Type, ProcessorsRegistry) { - return false, fmt.Sprintf("unknown processor type: %s", r.Settings.Processor.Type) + if len(r.Settings.FrameProcessors) > 0 { + for _, proc := range r.Settings.FrameProcessors { + if !typeRegistered(proc.Type, FrameProcessorsRegistry) { + return false, fmt.Sprintf("unknown processor type: %s", proc.Type) + } } } - if r.Settings.Outputter != nil { - if !typeRegistered(r.Settings.Outputter.Type, OutputsRegistry) { - return false, fmt.Sprintf("unknown output type: %s", r.Settings.Outputter.Type) + if len(r.Settings.FrameOutputters) > 0 { + for _, out := range r.Settings.FrameOutputters { + if !typeRegistered(out.Type, FrameOutputsRegistry) { + return false, fmt.Sprintf("unknown output type: %s", out.Type) + } } } return true, "" @@ -150,21 +175,21 @@ func checkRulesValid(orgID int64, rules []ChannelRule) (ok bool, reason string) return ok, reason } -type MultipleConditionCheckerConfig struct { - Type ConditionType `json:"type"` - Conditions []ConditionCheckerConfig `json:"conditions"` +type MultipleFrameConditionCheckerConfig struct { + Type ConditionType `json:"type"` + Conditions []FrameConditionCheckerConfig `json:"conditions"` } -type NumberCompareConditionConfig struct { +type NumberCompareFrameConditionConfig struct { FieldName string `json:"fieldName"` Op NumberCompareOp `json:"op"` Value float64 `json:"value"` } -type ConditionCheckerConfig struct { - Type string `json:"type"` - MultipleConditionCheckerConfig *MultipleConditionCheckerConfig `json:"multiple,omitempty"` - NumberCompareConditionConfig *NumberCompareConditionConfig `json:"numberCompare,omitempty"` +type FrameConditionCheckerConfig struct { + Type string `json:"type"` + MultipleConditionCheckerConfig *MultipleFrameConditionCheckerConfig `json:"multiple,omitempty"` + NumberCompareConditionConfig *NumberCompareFrameConditionConfig `json:"numberCompare,omitempty"` } type RuleStorage interface { @@ -193,11 +218,6 @@ func (f *StorageRuleBuilder) extractSubscriber(config *SubscriberConfig) (Subscr return NewBuiltinSubscriber(f.ChannelHandlerGetter), nil case SubscriberTypeManagedStream: return NewManagedStreamSubscriber(f.ManagedStream), nil - case SubscriberTypeAuthorizeRole: - if config.AuthorizeRoleSubscriberConfig == nil { - return nil, missingConfiguration - } - return NewAuthorizeRoleSubscriber(*config.AuthorizeRoleSubscriberConfig), nil case SubscriberTypeMultiple: if config.MultipleSubscriberConfig == nil { return nil, missingConfiguration @@ -225,7 +245,7 @@ func (f *StorageRuleBuilder) extractConverter(config *ConverterConfig) (Converte switch config.Type { case ConverterTypeJsonAuto: if config.AutoJsonConverterConfig == nil { - return nil, missingConfiguration + config.AutoJsonConverterConfig = &AutoJsonConverterConfig{} } return NewAutoJsonConverter(*config.AutoJsonConverterConfig), nil case ConverterTypeJsonExact: @@ -235,7 +255,7 @@ func (f *StorageRuleBuilder) extractConverter(config *ConverterConfig) (Converte return NewExactJsonConverter(*config.ExactJsonConverterConfig), nil case ConverterTypeJsonFrame: if config.JsonFrameConverterConfig == nil { - return nil, missingConfiguration + config.JsonFrameConverterConfig = &JsonFrameConverterConfig{} } return NewJsonFrameConverter(*config.JsonFrameConverterConfig), nil case ConverterTypeInfluxAuto: @@ -248,120 +268,120 @@ func (f *StorageRuleBuilder) extractConverter(config *ConverterConfig) (Converte } } -func (f *StorageRuleBuilder) extractProcessor(config *ProcessorConfig) (Processor, error) { +func (f *StorageRuleBuilder) extractFrameProcessor(config *FrameProcessorConfig) (FrameProcessor, error) { if config == nil { return nil, nil } missingConfiguration := fmt.Errorf("missing configuration for %s", config.Type) switch config.Type { - case ProcessorTypeDropFields: + case FrameProcessorTypeDropFields: if config.DropFieldsProcessorConfig == nil { return nil, missingConfiguration } - return NewDropFieldsProcessor(*config.DropFieldsProcessorConfig), nil - case ProcessorTypeKeepFields: + return NewDropFieldsFrameProcessor(*config.DropFieldsProcessorConfig), nil + case FrameProcessorTypeKeepFields: if config.KeepFieldsProcessorConfig == nil { return nil, missingConfiguration } - return NewKeepFieldsProcessor(*config.KeepFieldsProcessorConfig), nil - case ProcessorTypeMultiple: + return NewKeepFieldsFrameProcessor(*config.KeepFieldsProcessorConfig), nil + case FrameProcessorTypeMultiple: if config.MultipleProcessorConfig == nil { return nil, missingConfiguration } - var processors []Processor + var processors []FrameProcessor for _, outConf := range config.MultipleProcessorConfig.Processors { out := outConf - proc, err := f.extractProcessor(&out) + proc, err := f.extractFrameProcessor(&out) if err != nil { return nil, err } processors = append(processors, proc) } - return NewMultipleProcessor(processors...), nil + return NewMultipleFrameProcessor(processors...), nil default: return nil, fmt.Errorf("unknown processor type: %s", config.Type) } } -func (f *StorageRuleBuilder) extractConditionChecker(config *ConditionCheckerConfig) (ConditionChecker, error) { +func (f *StorageRuleBuilder) extractFrameConditionChecker(config *FrameConditionCheckerConfig) (FrameConditionChecker, error) { if config == nil { return nil, nil } missingConfiguration := fmt.Errorf("missing configuration for %s", config.Type) switch config.Type { - case ConditionCheckerTypeNumberCompare: + case FrameConditionCheckerTypeNumberCompare: if config.NumberCompareConditionConfig == nil { return nil, missingConfiguration } c := *config.NumberCompareConditionConfig - return NewNumberCompareCondition(c.FieldName, c.Op, c.Value), nil - case ConditionCheckerTypeMultiple: - var conditions []ConditionChecker + return NewFrameNumberCompareCondition(c.FieldName, c.Op, c.Value), nil + case FrameConditionCheckerTypeMultiple: + var conditions []FrameConditionChecker if config.MultipleConditionCheckerConfig == nil { return nil, missingConfiguration } for _, outConf := range config.MultipleConditionCheckerConfig.Conditions { out := outConf - cond, err := f.extractConditionChecker(&out) + cond, err := f.extractFrameConditionChecker(&out) if err != nil { return nil, err } conditions = append(conditions, cond) } - return NewMultipleConditionChecker(config.MultipleConditionCheckerConfig.Type, conditions...), nil + return NewMultipleFrameConditionChecker(config.MultipleConditionCheckerConfig.Type, conditions...), nil default: return nil, fmt.Errorf("unknown condition type: %s", config.Type) } } -func (f *StorageRuleBuilder) extractOutputter(config *OutputterConfig, remoteWriteBackends []RemoteWriteBackend) (Outputter, error) { +func (f *StorageRuleBuilder) extractFrameOutputter(config *FrameOutputterConfig, remoteWriteBackends []RemoteWriteBackend) (FrameOutputter, error) { if config == nil { return nil, nil } missingConfiguration := fmt.Errorf("missing configuration for %s", config.Type) switch config.Type { - case OutputTypeRedirect: + case FrameOutputTypeRedirect: if config.RedirectOutputConfig == nil { return nil, missingConfiguration } - return NewRedirectOutput(*config.RedirectOutputConfig), nil - case OutputTypeMultiple: + return NewRedirectFrameOutput(*config.RedirectOutputConfig), nil + case FrameOutputTypeMultiple: if config.MultipleOutputterConfig == nil { return nil, missingConfiguration } - var outputters []Outputter + var outputters []FrameOutputter for _, outConf := range config.MultipleOutputterConfig.Outputters { out := outConf - outputter, err := f.extractOutputter(&out, remoteWriteBackends) + outputter, err := f.extractFrameOutputter(&out, remoteWriteBackends) if err != nil { return nil, err } outputters = append(outputters, outputter) } - return NewMultipleOutput(outputters...), nil - case OutputTypeManagedStream: - return NewManagedStreamOutput(f.ManagedStream), nil - case OutputTypeLocalSubscribers: - return NewLocalSubscribersOutput(f.Node), nil - case OutputTypeConditional: + return NewMultipleFrameOutput(outputters...), nil + case FrameOutputTypeManagedStream: + return NewManagedStreamFrameOutput(f.ManagedStream), nil + case FrameOutputTypeLocalSubscribers: + return NewLocalSubscribersFrameOutput(f.Node), nil + case FrameOutputTypeConditional: if config.ConditionalOutputConfig == nil { return nil, missingConfiguration } - condition, err := f.extractConditionChecker(config.ConditionalOutputConfig.Condition) + condition, err := f.extractFrameConditionChecker(config.ConditionalOutputConfig.Condition) if err != nil { return nil, err } - outputter, err := f.extractOutputter(config.ConditionalOutputConfig.Outputter, remoteWriteBackends) + outputter, err := f.extractFrameOutputter(config.ConditionalOutputConfig.Outputter, remoteWriteBackends) if err != nil { return nil, err } return NewConditionalOutput(condition, outputter), nil - case OutputTypeThreshold: + case FrameOutputTypeThreshold: if config.ThresholdOutputConfig == nil { return nil, missingConfiguration } return NewThresholdOutput(f.FrameStorage, *config.ThresholdOutputConfig), nil - case OutputTypeRemoteWrite: + case FrameOutputTypeRemoteWrite: if config.RemoteWriteOutputConfig == nil { return nil, missingConfiguration } @@ -369,17 +389,37 @@ func (f *StorageRuleBuilder) extractOutputter(config *OutputterConfig, remoteWri if !ok { return nil, fmt.Errorf("unknown remote write backend uid: %s", config.RemoteWriteOutputConfig.UID) } - return NewRemoteWriteOutput(*remoteWriteConfig), nil - case OutputTypeChangeLog: + return NewRemoteWriteFrameOutput(*remoteWriteConfig), nil + case FrameOutputTypeChangeLog: if config.ChangeLogOutputConfig == nil { return nil, missingConfiguration } - return NewChangeLogOutput(f.FrameStorage, *config.ChangeLogOutputConfig), nil + return NewChangeLogFrameOutput(f.FrameStorage, *config.ChangeLogOutputConfig), nil default: return nil, fmt.Errorf("unknown output type: %s", config.Type) } } +func (f *StorageRuleBuilder) extractDataOutputter(config *DataOutputterConfig) (DataOutputter, error) { + if config == nil { + return nil, nil + } + missingConfiguration := fmt.Errorf("missing configuration for %s", config.Type) + switch config.Type { + case DataOutputTypeRedirect: + if config.RedirectDataOutputConfig == nil { + return nil, missingConfiguration + } + return NewRedirectDataOutput(*config.RedirectDataOutputConfig), nil + case DataOutputTypeBuiltin: + return NewBuiltinDataOutput(f.ChannelHandlerGetter), nil + case DataOutputTypeLocalSubscribers: + return NewLocalSubscribersDataOutput(f.Node), nil + default: + return nil, fmt.Errorf("unknown data output type: %s", config.Type) + } +} + func (f *StorageRuleBuilder) getRemoteWriteConfig(uid string, remoteWriteBackends []RemoteWriteBackend) (*RemoteWriteConfig, bool) { for _, rwb := range remoteWriteBackends { if rwb.UID == uid { @@ -407,23 +447,62 @@ func (f *StorageRuleBuilder) BuildRules(ctx context.Context, orgID int64) ([]*Li OrgId: orgID, Pattern: ruleConfig.Pattern, } - var err error - rule.Subscriber, err = f.extractSubscriber(ruleConfig.Settings.Subscriber) - if err != nil { - return nil, err + + if ruleConfig.Settings.Auth != nil && ruleConfig.Settings.Auth.Subscribe != nil { + rule.SubscribeAuth = NewRoleCheckAuthorizer(ruleConfig.Settings.Auth.Subscribe.RequireRole) } + + if ruleConfig.Settings.Auth != nil && ruleConfig.Settings.Auth.Publish != nil { + rule.PublishAuth = NewRoleCheckAuthorizer(ruleConfig.Settings.Auth.Publish.RequireRole) + } + + var err error + rule.Converter, err = f.extractConverter(ruleConfig.Settings.Converter) if err != nil { - return nil, err + return nil, fmt.Errorf("error building converter for %s: %w", rule.Pattern, err) } - rule.Processor, err = f.extractProcessor(ruleConfig.Settings.Processor) - if err != nil { - return nil, err + + var processors []FrameProcessor + for _, procConfig := range ruleConfig.Settings.FrameProcessors { + proc, err := f.extractFrameProcessor(procConfig) + if err != nil { + return nil, fmt.Errorf("error building processor for %s: %w", rule.Pattern, err) + } + processors = append(processors, proc) } - rule.Outputter, err = f.extractOutputter(ruleConfig.Settings.Outputter, remoteWriteBackends) - if err != nil { - return nil, err + rule.FrameProcessors = processors + + var dataOutputters []DataOutputter + for _, outConfig := range ruleConfig.Settings.DataOutputters { + out, err := f.extractDataOutputter(outConfig) + if err != nil { + return nil, fmt.Errorf("error building data outputter for %s: %w", rule.Pattern, err) + } + dataOutputters = append(dataOutputters, out) } + rule.DataOutputters = dataOutputters + + var outputters []FrameOutputter + for _, outConfig := range ruleConfig.Settings.FrameOutputters { + out, err := f.extractFrameOutputter(outConfig, remoteWriteBackends) + if err != nil { + return nil, fmt.Errorf("error building frame outputter for %s: %w", rule.Pattern, err) + } + outputters = append(outputters, out) + } + rule.FrameOutputters = outputters + + var subscribers []Subscriber + for _, subConfig := range ruleConfig.Settings.Subscribers { + sub, err := f.extractSubscriber(subConfig) + if err != nil { + return nil, fmt.Errorf("error building subscriber for %s: %w", rule.Pattern, err) + } + subscribers = append(subscribers, sub) + } + rule.Subscribers = subscribers + rules = append(rules, rule) } diff --git a/pkg/services/live/pipeline/converter_influx_auto.go b/pkg/services/live/pipeline/converter_influx_auto.go index 18ac1c8e9e2..c87a0fdb649 100644 --- a/pkg/services/live/pipeline/converter_influx_auto.go +++ b/pkg/services/live/pipeline/converter_influx_auto.go @@ -6,6 +6,7 @@ import ( "github.com/grafana/grafana/pkg/services/live/convert" ) +// AutoInfluxConverterConfig ... type AutoInfluxConverterConfig struct { FrameFormat string `json:"frameFormat"` } @@ -18,6 +19,7 @@ type AutoInfluxConverter struct { converter *convert.Converter } +// NewAutoInfluxConverter creates new AutoInfluxConverter. func NewAutoInfluxConverter(config AutoInfluxConverterConfig) *AutoInfluxConverter { return &AutoInfluxConverter{config: config, converter: convert.NewConverter()} } diff --git a/pkg/services/live/pipeline/converter_json_auto.go b/pkg/services/live/pipeline/converter_json_auto.go index 7b599eea33d..f9307652854 100644 --- a/pkg/services/live/pipeline/converter_json_auto.go +++ b/pkg/services/live/pipeline/converter_json_auto.go @@ -28,8 +28,8 @@ func (c *AutoJsonConverter) Type() string { // * Time added automatically // * Nulls dropped // To preserve nulls we need FieldTips from a user. -// Custom time can be injected on Processor stage theoretically. -// Custom labels can be injected on Processor stage theoretically. +// Custom time can be injected on FrameProcessor stage theoretically. +// Custom labels can be injected on FrameProcessor stage theoretically. func (c *AutoJsonConverter) Convert(_ context.Context, vars Vars, body []byte) ([]*ChannelFrame, error) { nowTimeFunc := c.nowTimeFunc if nowTimeFunc == nil { diff --git a/pkg/services/live/pipeline/converter_json_exact_test.go b/pkg/services/live/pipeline/converter_json_exact_test.go index 6bb9845fd17..e1bfb90bbc1 100644 --- a/pkg/services/live/pipeline/converter_json_exact_test.go +++ b/pkg/services/live/pipeline/converter_json_exact_test.go @@ -6,9 +6,8 @@ import ( "testing" "time" - "github.com/grafana/grafana-plugin-sdk-go/data" - "github.com/grafana/grafana-plugin-sdk-go/backend" + "github.com/grafana/grafana-plugin-sdk-go/data" "github.com/grafana/grafana-plugin-sdk-go/experimental" "github.com/stretchr/testify/require" ) diff --git a/pkg/services/live/pipeline/data_output_builtin.go b/pkg/services/live/pipeline/data_output_builtin.go new file mode 100644 index 00000000000..4b2d4b20b1b --- /dev/null +++ b/pkg/services/live/pipeline/data_output_builtin.go @@ -0,0 +1,47 @@ +package pipeline + +import ( + "context" + "errors" + + "github.com/grafana/grafana/pkg/models" + "github.com/grafana/grafana/pkg/services/live/livecontext" + + "github.com/grafana/grafana-plugin-sdk-go/backend" +) + +type BuiltinDataOutput struct { + channelHandlerGetter ChannelHandlerGetter +} + +const DataOutputTypeBuiltin = "builtin" + +func NewBuiltinDataOutput(channelHandlerGetter ChannelHandlerGetter) *BuiltinDataOutput { + return &BuiltinDataOutput{channelHandlerGetter: channelHandlerGetter} +} + +func (s *BuiltinDataOutput) Type() string { + return DataOutputTypeBuiltin +} + +func (s *BuiltinDataOutput) OutputData(ctx context.Context, vars Vars, data []byte) ([]*ChannelData, error) { + u, ok := livecontext.GetContextSignedUser(ctx) + if !ok { + return nil, errors.New("user not found in context") + } + handler, _, err := s.channelHandlerGetter.GetChannelHandler(u, vars.Channel) + if err != nil { + return nil, err + } + _, status, err := handler.OnPublish(ctx, u, models.PublishEvent{ + Channel: vars.Channel, + Data: data, + }) + if err != nil { + return nil, err + } + if status != backend.PublishStreamStatusOK { + return nil, errors.New("unauthorized publish") + } + return nil, nil +} diff --git a/pkg/services/live/pipeline/data_output_local_subscribers.go b/pkg/services/live/pipeline/data_output_local_subscribers.go new file mode 100644 index 00000000000..186d4b19b1a --- /dev/null +++ b/pkg/services/live/pipeline/data_output_local_subscribers.go @@ -0,0 +1,38 @@ +package pipeline + +import ( + "context" + "fmt" + + "github.com/grafana/grafana/pkg/services/live/orgchannel" + + "github.com/centrifugal/centrifuge" +) + +type LocalSubscribersDataOutput struct { + // TODO: refactor to depend on interface (avoid Centrifuge dependency here). + node *centrifuge.Node +} + +func NewLocalSubscribersDataOutput(node *centrifuge.Node) *LocalSubscribersDataOutput { + return &LocalSubscribersDataOutput{node: node} +} + +const DataOutputTypeLocalSubscribers = "localSubscribers" + +func (out *LocalSubscribersDataOutput) Type() string { + return DataOutputTypeLocalSubscribers +} + +func (out *LocalSubscribersDataOutput) OutputData(_ context.Context, vars Vars, data []byte) ([]*ChannelData, error) { + channelID := vars.Channel + channel := orgchannel.PrependOrgID(vars.OrgID, channelID) + pub := ¢rifuge.Publication{ + Data: data, + } + err := out.node.Hub().BroadcastPublication(channel, pub, centrifuge.StreamPosition{}) + if err != nil { + return nil, fmt.Errorf("error publishing %s: %w", string(data), err) + } + return nil, nil +} diff --git a/pkg/services/live/pipeline/data_output_redirect.go b/pkg/services/live/pipeline/data_output_redirect.go new file mode 100644 index 00000000000..abb5d3b3c34 --- /dev/null +++ b/pkg/services/live/pipeline/data_output_redirect.go @@ -0,0 +1,37 @@ +package pipeline + +import ( + "context" + "fmt" +) + +// RedirectDataOutputConfig ... +type RedirectDataOutputConfig struct { + Channel string `json:"channel"` +} + +// RedirectDataOutput passes processing control to the rule defined +// for a configured channel. +type RedirectDataOutput struct { + config RedirectDataOutputConfig +} + +func NewRedirectDataOutput(config RedirectDataOutputConfig) *RedirectDataOutput { + return &RedirectDataOutput{config: config} +} + +const DataOutputTypeRedirect = "redirect" + +func (out *RedirectDataOutput) Type() string { + return DataOutputTypeRedirect +} + +func (out *RedirectDataOutput) OutputData(_ context.Context, vars Vars, data []byte) ([]*ChannelData, error) { + if vars.Channel == out.config.Channel { + return nil, fmt.Errorf("redirect to the same channel: %s", out.config.Channel) + } + return []*ChannelData{{ + Channel: out.config.Channel, + Data: data, + }}, nil +} diff --git a/pkg/services/live/pipeline/devdata.go b/pkg/services/live/pipeline/devdata.go index 1c7bd369d53..1c1afdb67cf 100644 --- a/pkg/services/live/pipeline/devdata.go +++ b/pkg/services/live/pipeline/devdata.go @@ -99,27 +99,31 @@ type DevRuleBuilder struct { func (f *DevRuleBuilder) BuildRules(_ context.Context, _ int64) ([]*LiveChannelRule, error) { return []*LiveChannelRule{ { - Pattern: "plugin/testdata/random-20Hz-stream", - Subscriber: NewMultipleSubscriber( - NewBuiltinSubscriber(f.ChannelHandlerGetter), - NewManagedStreamSubscriber(f.ManagedStream), - ), + Pattern: "plugin/testdata/random-20Hz-stream", Converter: NewJsonFrameConverter(JsonFrameConverterConfig{}), - Outputter: NewMultipleOutput( - NewManagedStreamOutput(f.ManagedStream), - NewRemoteWriteOutput(RemoteWriteConfig{ + FrameOutputters: []FrameOutputter{ + NewManagedStreamFrameOutput(f.ManagedStream), + NewRemoteWriteFrameOutput(RemoteWriteConfig{ Endpoint: os.Getenv("GF_LIVE_REMOTE_WRITE_ENDPOINT"), User: os.Getenv("GF_LIVE_REMOTE_WRITE_USER"), Password: os.Getenv("GF_LIVE_REMOTE_WRITE_PASSWORD"), }), - ), + }, + Subscribers: []Subscriber{ + NewBuiltinSubscriber(f.ChannelHandlerGetter), + NewManagedStreamSubscriber(f.ManagedStream), + }, }, { Pattern: "stream/testdata/random-20Hz-stream", - Processor: NewKeepFieldsProcessor(KeepFieldsProcessorConfig{ - FieldNames: []string{"Time", "Min", "Max"}, - }), - Outputter: NewManagedStreamOutput(f.ManagedStream), + FrameProcessors: []FrameProcessor{ + NewKeepFieldsFrameProcessor(KeepFieldsFrameProcessorConfig{ + FieldNames: []string{"Time", "Min", "Max"}, + }), + }, + FrameOutputters: []FrameOutputter{ + NewManagedStreamFrameOutput(f.ManagedStream), + }, }, { OrgId: 1, @@ -129,9 +133,11 @@ func (f *DevRuleBuilder) BuildRules(_ context.Context, _ int64) ([]*LiveChannelR }), }, { - OrgId: 1, - Pattern: "stream/influx/input/:rest", - Outputter: NewManagedStreamOutput(f.ManagedStream), + OrgId: 1, + Pattern: "stream/influx/input/:rest", + FrameOutputters: []FrameOutputter{ + NewManagedStreamFrameOutput(f.ManagedStream), + }, }, { OrgId: 1, @@ -140,29 +146,31 @@ func (f *DevRuleBuilder) BuildRules(_ context.Context, _ int64) ([]*LiveChannelR // since there are cases when labels attached to a field, and cases where labels // set in a first frame column (in Influx converter). For example, this will allow // to leave only "total-cpu" data while dropping individual CPUs. - Processor: NewKeepFieldsProcessor(KeepFieldsProcessorConfig{ - FieldNames: []string{"labels", "time", "usage_user"}, - }), - Outputter: NewMultipleOutput( - NewManagedStreamOutput(f.ManagedStream), + FrameProcessors: []FrameProcessor{ + NewKeepFieldsFrameProcessor(KeepFieldsFrameProcessorConfig{ + FieldNames: []string{"labels", "time", "usage_user"}, + }), + }, + FrameOutputters: []FrameOutputter{ + NewManagedStreamFrameOutput(f.ManagedStream), NewConditionalOutput( - NewNumberCompareCondition("usage_user", "gte", 50), - NewRedirectOutput(RedirectOutputConfig{ + NewFrameNumberCompareCondition("usage_user", "gte", 50), + NewRedirectFrameOutput(RedirectOutputConfig{ Channel: "stream/influx/input/cpu/spikes", }), ), - ), + }, }, { - OrgId: 1, - Pattern: "stream/influx/input/cpu/spikes", - Outputter: NewManagedStreamOutput(f.ManagedStream), + OrgId: 1, + Pattern: "stream/influx/input/cpu/spikes", + FrameOutputters: []FrameOutputter{NewManagedStreamFrameOutput(f.ManagedStream)}, }, { - OrgId: 1, - Pattern: "stream/json/auto", - Converter: NewAutoJsonConverter(AutoJsonConverterConfig{}), - Outputter: NewManagedStreamOutput(f.ManagedStream), + OrgId: 1, + Pattern: "stream/json/auto", + Converter: NewAutoJsonConverter(AutoJsonConverterConfig{}), + FrameOutputters: []FrameOutputter{NewManagedStreamFrameOutput(f.ManagedStream)}, }, { OrgId: 1, @@ -179,10 +187,14 @@ func (f *DevRuleBuilder) BuildRules(_ context.Context, _ int64) ([]*LiveChannelR }, }, }), - Processor: NewDropFieldsProcessor(DropFieldsProcessorConfig{ - FieldNames: []string{"value2"}, - }), - Outputter: NewManagedStreamOutput(f.ManagedStream), + FrameProcessors: []FrameProcessor{ + NewDropFieldsFrameProcessor(DropFieldsFrameProcessorConfig{ + FieldNames: []string{"value2"}, + }), + }, + FrameOutputters: []FrameOutputter{ + NewManagedStreamFrameOutput(f.ManagedStream), + }, }, { OrgId: 1, @@ -274,28 +286,28 @@ func (f *DevRuleBuilder) BuildRules(_ context.Context, _ int64) ([]*LiveChannelR }, }, }), - Outputter: NewMultipleOutput( - NewManagedStreamOutput(f.ManagedStream), - NewRemoteWriteOutput(RemoteWriteConfig{ + FrameOutputters: []FrameOutputter{ + NewManagedStreamFrameOutput(f.ManagedStream), + NewRemoteWriteFrameOutput(RemoteWriteConfig{ Endpoint: os.Getenv("GF_LIVE_REMOTE_WRITE_ENDPOINT"), User: os.Getenv("GF_LIVE_REMOTE_WRITE_USER"), Password: os.Getenv("GF_LIVE_REMOTE_WRITE_PASSWORD"), }), - NewChangeLogOutput(f.FrameStorage, ChangeLogOutputConfig{ + NewChangeLogFrameOutput(f.FrameStorage, ChangeLogOutputConfig{ FieldName: "value3", Channel: "stream/json/exact/value3/changes", }), - NewChangeLogOutput(f.FrameStorage, ChangeLogOutputConfig{ + NewChangeLogFrameOutput(f.FrameStorage, ChangeLogOutputConfig{ FieldName: "annotation", Channel: "stream/json/exact/annotation/changes", }), NewConditionalOutput( - NewMultipleConditionChecker( + NewMultipleFrameConditionChecker( ConditionAll, - NewNumberCompareCondition("value1", "gte", 3.0), - NewNumberCompareCondition("value2", "gte", 3.0), + NewFrameNumberCompareCondition("value1", "gte", 3.0), + NewFrameNumberCompareCondition("value2", "gte", 3.0), ), - NewRedirectOutput(RedirectOutputConfig{ + NewRedirectFrameOutput(RedirectOutputConfig{ Channel: "stream/json/exact/condition", }), ), @@ -303,34 +315,40 @@ func (f *DevRuleBuilder) BuildRules(_ context.Context, _ int64) ([]*LiveChannelR FieldName: "value4", Channel: "stream/json/exact/value4/state", }), - ), + }, }, { OrgId: 1, Pattern: "stream/json/exact/value3/changes", - Outputter: NewMultipleOutput( - NewManagedStreamOutput(f.ManagedStream), - NewRemoteWriteOutput(RemoteWriteConfig{ + FrameOutputters: []FrameOutputter{ + NewManagedStreamFrameOutput(f.ManagedStream), + NewRemoteWriteFrameOutput(RemoteWriteConfig{ Endpoint: os.Getenv("GF_LIVE_REMOTE_WRITE_ENDPOINT"), User: os.Getenv("GF_LIVE_REMOTE_WRITE_USER"), Password: os.Getenv("GF_LIVE_REMOTE_WRITE_PASSWORD"), }), - ), + }, }, { - OrgId: 1, - Pattern: "stream/json/exact/annotation/changes", - Outputter: NewManagedStreamOutput(f.ManagedStream), + OrgId: 1, + Pattern: "stream/json/exact/annotation/changes", + FrameOutputters: []FrameOutputter{ + NewManagedStreamFrameOutput(f.ManagedStream), + }, }, { - OrgId: 1, - Pattern: "stream/json/exact/condition", - Outputter: NewManagedStreamOutput(f.ManagedStream), + OrgId: 1, + Pattern: "stream/json/exact/condition", + FrameOutputters: []FrameOutputter{ + NewManagedStreamFrameOutput(f.ManagedStream), + }, }, { - OrgId: 1, - Pattern: "stream/json/exact/value4/state", - Outputter: NewManagedStreamOutput(f.ManagedStream), + OrgId: 1, + Pattern: "stream/json/exact/value4/state", + FrameOutputters: []FrameOutputter{ + NewManagedStreamFrameOutput(f.ManagedStream), + }, }, }, nil } diff --git a/pkg/services/live/pipeline/frame_condition_checker.go b/pkg/services/live/pipeline/frame_condition_checker.go new file mode 100644 index 00000000000..64683e3125b --- /dev/null +++ b/pkg/services/live/pipeline/frame_condition_checker.go @@ -0,0 +1,13 @@ +package pipeline + +import ( + "context" + + "github.com/grafana/grafana-plugin-sdk-go/data" +) + +// FrameConditionChecker checks conditions in context of data.Frame being processed. +type FrameConditionChecker interface { + Type() string + CheckFrameCondition(ctx context.Context, frame *data.Frame) (bool, error) +} diff --git a/pkg/services/live/pipeline/frame_condition_checker_multiple.go b/pkg/services/live/pipeline/frame_condition_checker_multiple.go new file mode 100644 index 00000000000..7d2afe63dae --- /dev/null +++ b/pkg/services/live/pipeline/frame_condition_checker_multiple.go @@ -0,0 +1,43 @@ +package pipeline + +import ( + "context" + + "github.com/grafana/grafana-plugin-sdk-go/data" +) + +// MultipleFrameConditionChecker can check multiple conditions according to ConditionType. +type MultipleFrameConditionChecker struct { + ConditionType ConditionType + Conditions []FrameConditionChecker +} + +const FrameConditionCheckerTypeMultiple = "multiple" + +func (c *MultipleFrameConditionChecker) Type() string { + return FrameConditionCheckerTypeMultiple +} + +func (c *MultipleFrameConditionChecker) CheckFrameCondition(ctx context.Context, frame *data.Frame) (bool, error) { + for _, cond := range c.Conditions { + ok, err := cond.CheckFrameCondition(ctx, frame) + if err != nil { + return false, err + } + if ok && c.ConditionType == ConditionAny { + return true, nil + } + if !ok && c.ConditionType == ConditionAll { + return false, nil + } + } + if c.ConditionType == ConditionAny { + return false, nil + } + return true, nil +} + +// NewMultipleFrameConditionChecker creates new MultipleFrameConditionChecker. +func NewMultipleFrameConditionChecker(conditionType ConditionType, conditions ...FrameConditionChecker) *MultipleFrameConditionChecker { + return &MultipleFrameConditionChecker{ConditionType: conditionType, Conditions: conditions} +} diff --git a/pkg/services/live/pipeline/condition_number_compare.go b/pkg/services/live/pipeline/frame_condition_number_compare.go similarity index 52% rename from pkg/services/live/pipeline/condition_number_compare.go rename to pkg/services/live/pipeline/frame_condition_number_compare.go index afcefe03c2a..11cec2e0028 100644 --- a/pkg/services/live/pipeline/condition_number_compare.go +++ b/pkg/services/live/pipeline/frame_condition_number_compare.go @@ -7,33 +7,20 @@ import ( "github.com/grafana/grafana-plugin-sdk-go/data" ) -// NumberCompareCondition can compare numbers. -type NumberCompareCondition struct { +// FrameNumberCompareCondition can compare numbers. +type FrameNumberCompareCondition struct { FieldName string Op NumberCompareOp Value float64 } -// NumberCompareOp is an comparison operator. -type NumberCompareOp string +const FrameConditionCheckerTypeNumberCompare = "numberCompare" -// Known NumberCompareOp types. -const ( - NumberCompareOpLt NumberCompareOp = "lt" - NumberCompareOpGt NumberCompareOp = "gt" - NumberCompareOpLte NumberCompareOp = "lte" - NumberCompareOpGte NumberCompareOp = "gte" - NumberCompareOpEq NumberCompareOp = "eq" - NumberCompareOpNe NumberCompareOp = "ne" -) - -const ConditionCheckerTypeNumberCompare = "numberCompare" - -func (c *NumberCompareCondition) Type() string { - return ConditionCheckerTypeNumberCompare +func (c *FrameNumberCompareCondition) Type() string { + return FrameConditionCheckerTypeNumberCompare } -func (c *NumberCompareCondition) CheckCondition(_ context.Context, frame *data.Frame) (bool, error) { +func (c *FrameNumberCompareCondition) CheckFrameCondition(_ context.Context, frame *data.Frame) (bool, error) { for _, field := range frame.Fields { // TODO: support other numeric types. if field.Name == c.FieldName && (field.Type() == data.FieldTypeNullableFloat64) { @@ -65,6 +52,6 @@ func (c *NumberCompareCondition) CheckCondition(_ context.Context, frame *data.F return false, nil } -func NewNumberCompareCondition(fieldName string, op NumberCompareOp, value float64) *NumberCompareCondition { - return &NumberCompareCondition{FieldName: fieldName, Op: op, Value: value} +func NewFrameNumberCompareCondition(fieldName string, op NumberCompareOp, value float64) *FrameNumberCompareCondition { + return &FrameNumberCompareCondition{FieldName: fieldName, Op: op, Value: value} } diff --git a/pkg/services/live/pipeline/output_changelog.go b/pkg/services/live/pipeline/frame_output_changelog.go similarity index 78% rename from pkg/services/live/pipeline/output_changelog.go rename to pkg/services/live/pipeline/frame_output_changelog.go index f2769806677..58cad760d7a 100644 --- a/pkg/services/live/pipeline/output_changelog.go +++ b/pkg/services/live/pipeline/frame_output_changelog.go @@ -13,24 +13,24 @@ type ChangeLogOutputConfig struct { Channel string `json:"channel"` } -// ChangeLogOutput can monitor value changes of the specified field and output +// ChangeLogFrameOutput can monitor value changes of the specified field and output // special change frame to the configured channel. -type ChangeLogOutput struct { +type ChangeLogFrameOutput struct { frameStorage FrameGetSetter config ChangeLogOutputConfig } -func NewChangeLogOutput(frameStorage FrameGetSetter, config ChangeLogOutputConfig) *ChangeLogOutput { - return &ChangeLogOutput{frameStorage: frameStorage, config: config} +func NewChangeLogFrameOutput(frameStorage FrameGetSetter, config ChangeLogOutputConfig) *ChangeLogFrameOutput { + return &ChangeLogFrameOutput{frameStorage: frameStorage, config: config} } -const OutputTypeChangeLog = "changeLog" +const FrameOutputTypeChangeLog = "changeLog" -func (out *ChangeLogOutput) Type() string { - return OutputTypeChangeLog +func (out *ChangeLogFrameOutput) Type() string { + return FrameOutputTypeChangeLog } -func (out *ChangeLogOutput) Output(_ context.Context, vars OutputVars, frame *data.Frame) ([]*ChannelFrame, error) { +func (out *ChangeLogFrameOutput) OutputFrame(_ context.Context, vars Vars, frame *data.Frame) ([]*ChannelFrame, error) { previousFrame, previousFrameOK, err := out.frameStorage.Get(vars.OrgID, out.config.Channel) if err != nil { return nil, err diff --git a/pkg/services/live/pipeline/output_changelog_test.go b/pkg/services/live/pipeline/frame_output_changelog_test.go similarity index 88% rename from pkg/services/live/pipeline/output_changelog_test.go rename to pkg/services/live/pipeline/frame_output_changelog_test.go index 13c88dac948..713a3171f49 100644 --- a/pkg/services/live/pipeline/output_changelog_test.go +++ b/pkg/services/live/pipeline/frame_output_changelog_test.go @@ -22,7 +22,7 @@ func TestChangeLogOutput_NoPreviousFrame_SingleRow(t *testing.T) { mockStorage.EXPECT().Set(gomock.Any(), gomock.Any(), gomock.Any()).Times(1) - outputter := NewChangeLogOutput(mockStorage, ChangeLogOutputConfig{ + outputter := NewChangeLogFrameOutput(mockStorage, ChangeLogOutputConfig{ FieldName: "test", Channel: "stream/test/no_previous_frame", }) @@ -35,7 +35,7 @@ func TestChangeLogOutput_NoPreviousFrame_SingleRow(t *testing.T) { frame := data.NewFrame("test", f1, f2) - channelFrames, err := outputter.Output(context.Background(), OutputVars{}, frame) + channelFrames, err := outputter.OutputFrame(context.Background(), Vars{}, frame) require.NoError(t, err) require.Len(t, channelFrames, 1) @@ -59,7 +59,7 @@ func TestChangeLogOutput_NoPreviousFrame_MultipleRows(t *testing.T) { mockStorage.EXPECT().Set(gomock.Any(), gomock.Any(), gomock.Any()).Times(1) - outputter := NewChangeLogOutput(mockStorage, ChangeLogOutputConfig{ + outputter := NewChangeLogFrameOutput(mockStorage, ChangeLogOutputConfig{ FieldName: "test", Channel: "stream/test/no_previous_frame", }) @@ -74,7 +74,7 @@ func TestChangeLogOutput_NoPreviousFrame_MultipleRows(t *testing.T) { frame := data.NewFrame("test", f1, f2) - channelFrames, err := outputter.Output(context.Background(), OutputVars{}, frame) + channelFrames, err := outputter.OutputFrame(context.Background(), Vars{}, frame) require.NoError(t, err) require.Len(t, channelFrames, 1) changeFrame := channelFrames[0].Frame diff --git a/pkg/services/live/pipeline/frame_output_conditional.go b/pkg/services/live/pipeline/frame_output_conditional.go new file mode 100644 index 00000000000..045dd27dd17 --- /dev/null +++ b/pkg/services/live/pipeline/frame_output_conditional.go @@ -0,0 +1,33 @@ +package pipeline + +import ( + "context" + + "github.com/grafana/grafana-plugin-sdk-go/data" +) + +type ConditionalOutput struct { + Condition FrameConditionChecker + Outputter FrameOutputter +} + +func NewConditionalOutput(condition FrameConditionChecker, outputter FrameOutputter) *ConditionalOutput { + return &ConditionalOutput{Condition: condition, Outputter: outputter} +} + +const FrameOutputTypeConditional = "conditional" + +func (out *ConditionalOutput) Type() string { + return FrameOutputTypeConditional +} + +func (out ConditionalOutput) OutputFrame(ctx context.Context, vars Vars, frame *data.Frame) ([]*ChannelFrame, error) { + ok, err := out.Condition.CheckFrameCondition(ctx, frame) + if err != nil { + return nil, err + } + if !ok { + return nil, nil + } + return out.Outputter.OutputFrame(ctx, vars, frame) +} diff --git a/pkg/services/live/pipeline/output_local_subscribers.go b/pkg/services/live/pipeline/frame_output_local_subscribers.go similarity index 61% rename from pkg/services/live/pipeline/output_local_subscribers.go rename to pkg/services/live/pipeline/frame_output_local_subscribers.go index ed1adf4788f..1f4179be7e8 100644 --- a/pkg/services/live/pipeline/output_local_subscribers.go +++ b/pkg/services/live/pipeline/frame_output_local_subscribers.go @@ -11,22 +11,22 @@ import ( "github.com/grafana/grafana-plugin-sdk-go/data" ) -type LocalSubscribersOutput struct { +type LocalSubscribersFrameOutput struct { // TODO: refactor to depend on interface (avoid Centrifuge dependency here). node *centrifuge.Node } -func NewLocalSubscribersOutput(node *centrifuge.Node) *LocalSubscribersOutput { - return &LocalSubscribersOutput{node: node} +func NewLocalSubscribersFrameOutput(node *centrifuge.Node) *LocalSubscribersFrameOutput { + return &LocalSubscribersFrameOutput{node: node} } -const OutputTypeLocalSubscribers = "localSubscribers" +const FrameOutputTypeLocalSubscribers = "localSubscribers" -func (out *LocalSubscribersOutput) Type() string { - return OutputTypeLocalSubscribers +func (out *LocalSubscribersFrameOutput) Type() string { + return FrameOutputTypeLocalSubscribers } -func (out *LocalSubscribersOutput) Output(_ context.Context, vars OutputVars, frame *data.Frame) ([]*ChannelFrame, error) { +func (out *LocalSubscribersFrameOutput) OutputFrame(_ context.Context, vars Vars, frame *data.Frame) ([]*ChannelFrame, error) { channelID := vars.Channel channel := orgchannel.PrependOrgID(vars.OrgID, channelID) frameJSON, err := json.Marshal(frame) diff --git a/pkg/services/live/pipeline/frame_output_managed_stream.go b/pkg/services/live/pipeline/frame_output_managed_stream.go new file mode 100644 index 00000000000..cbc0393ed7a --- /dev/null +++ b/pkg/services/live/pipeline/frame_output_managed_stream.go @@ -0,0 +1,32 @@ +package pipeline + +import ( + "context" + + "github.com/grafana/grafana/pkg/services/live/managedstream" + + "github.com/grafana/grafana-plugin-sdk-go/data" +) + +type ManagedStreamFrameOutput struct { + managedStream *managedstream.Runner +} + +func NewManagedStreamFrameOutput(managedStream *managedstream.Runner) *ManagedStreamFrameOutput { + return &ManagedStreamFrameOutput{managedStream: managedStream} +} + +const FrameOutputTypeManagedStream = "managedStream" + +func (out *ManagedStreamFrameOutput) Type() string { + return FrameOutputTypeManagedStream +} + +func (out *ManagedStreamFrameOutput) OutputFrame(_ context.Context, vars Vars, frame *data.Frame) ([]*ChannelFrame, error) { + stream, err := out.managedStream.GetOrCreateStream(vars.OrgID, vars.Scope, vars.Namespace) + if err != nil { + logger.Error("Error getting stream", "error", err) + return nil, err + } + return nil, stream.Push(vars.Path, frame) +} diff --git a/pkg/services/live/pipeline/frame_output_multiple.go b/pkg/services/live/pipeline/frame_output_multiple.go new file mode 100644 index 00000000000..e5ffef7efa7 --- /dev/null +++ b/pkg/services/live/pipeline/frame_output_multiple.go @@ -0,0 +1,36 @@ +package pipeline + +import ( + "context" + + "github.com/grafana/grafana-plugin-sdk-go/data" +) + +// MultipleFrameOutput can combine several FrameOutputter and +// execute them sequentially. +type MultipleFrameOutput struct { + Outputters []FrameOutputter +} + +const FrameOutputTypeMultiple = "multiple" + +func (out *MultipleFrameOutput) Type() string { + return FrameOutputTypeMultiple +} + +func (out MultipleFrameOutput) OutputFrame(ctx context.Context, vars Vars, frame *data.Frame) ([]*ChannelFrame, error) { + var frames []*ChannelFrame + for _, out := range out.Outputters { + f, err := out.OutputFrame(ctx, vars, frame) + if err != nil { + logger.Error("Error outputting frame", "error", err) + return nil, err + } + frames = append(frames, f...) + } + return frames, nil +} + +func NewMultipleFrameOutput(outputters ...FrameOutputter) *MultipleFrameOutput { + return &MultipleFrameOutput{Outputters: outputters} +} diff --git a/pkg/services/live/pipeline/output_redirect.go b/pkg/services/live/pipeline/frame_output_redirect.go similarity index 50% rename from pkg/services/live/pipeline/output_redirect.go rename to pkg/services/live/pipeline/frame_output_redirect.go index 803a14f27f2..7edf15bbc5f 100644 --- a/pkg/services/live/pipeline/output_redirect.go +++ b/pkg/services/live/pipeline/frame_output_redirect.go @@ -12,23 +12,23 @@ type RedirectOutputConfig struct { Channel string `json:"channel"` } -// RedirectOutput passes processing control to the rule defined +// RedirectFrameOutput passes processing control to the rule defined // for a configured channel. -type RedirectOutput struct { +type RedirectFrameOutput struct { config RedirectOutputConfig } -func NewRedirectOutput(config RedirectOutputConfig) *RedirectOutput { - return &RedirectOutput{config: config} +func NewRedirectFrameOutput(config RedirectOutputConfig) *RedirectFrameOutput { + return &RedirectFrameOutput{config: config} } -const OutputTypeRedirect = "redirect" +const FrameOutputTypeRedirect = "redirect" -func (out *RedirectOutput) Type() string { - return OutputTypeRedirect +func (out *RedirectFrameOutput) Type() string { + return FrameOutputTypeRedirect } -func (out *RedirectOutput) Output(_ context.Context, vars OutputVars, frame *data.Frame) ([]*ChannelFrame, error) { +func (out *RedirectFrameOutput) OutputFrame(_ context.Context, vars Vars, frame *data.Frame) ([]*ChannelFrame, error) { if vars.Channel == out.config.Channel { return nil, fmt.Errorf("redirect to the same channel: %s", out.config.Channel) } diff --git a/pkg/services/live/pipeline/output_remote_write.go b/pkg/services/live/pipeline/frame_output_remote_write.go similarity index 83% rename from pkg/services/live/pipeline/output_remote_write.go rename to pkg/services/live/pipeline/frame_output_remote_write.go index 3ee0643b2c5..19fa3642041 100644 --- a/pkg/services/live/pipeline/output_remote_write.go +++ b/pkg/services/live/pipeline/frame_output_remote_write.go @@ -23,15 +23,15 @@ type RemoteWriteConfig struct { Password string `json:"password"` } -type RemoteWriteOutput struct { +type RemoteWriteFrameOutput struct { mu sync.Mutex config RemoteWriteConfig httpClient *http.Client buffer []prompb.TimeSeries } -func NewRemoteWriteOutput(config RemoteWriteConfig) *RemoteWriteOutput { - out := &RemoteWriteOutput{ +func NewRemoteWriteFrameOutput(config RemoteWriteConfig) *RemoteWriteFrameOutput { + out := &RemoteWriteFrameOutput{ config: config, httpClient: &http.Client{Timeout: 2 * time.Second}, } @@ -41,13 +41,13 @@ func NewRemoteWriteOutput(config RemoteWriteConfig) *RemoteWriteOutput { return out } -const OutputTypeRemoteWrite = "remoteWrite" +const FrameOutputTypeRemoteWrite = "remoteWrite" -func (out *RemoteWriteOutput) Type() string { - return OutputTypeRemoteWrite +func (out *RemoteWriteFrameOutput) Type() string { + return FrameOutputTypeRemoteWrite } -func (out *RemoteWriteOutput) flushPeriodically() { +func (out *RemoteWriteFrameOutput) flushPeriodically() { for range time.NewTicker(15 * time.Second).C { out.mu.Lock() if len(out.buffer) == 0 { @@ -70,7 +70,7 @@ func (out *RemoteWriteOutput) flushPeriodically() { } } -func (out *RemoteWriteOutput) flush(timeSeries []prompb.TimeSeries) error { +func (out *RemoteWriteFrameOutput) flush(timeSeries []prompb.TimeSeries) error { logger.Debug("Remote write flush", "num time series", len(timeSeries)) remoteWriteData, err := remotewrite.TimeSeriesToBytes(timeSeries) if err != nil { @@ -100,7 +100,7 @@ func (out *RemoteWriteOutput) flush(timeSeries []prompb.TimeSeries) error { return nil } -func (out *RemoteWriteOutput) Output(_ context.Context, _ OutputVars, frame *data.Frame) ([]*ChannelFrame, error) { +func (out *RemoteWriteFrameOutput) OutputFrame(_ context.Context, _ Vars, frame *data.Frame) ([]*ChannelFrame, error) { if out.config.Endpoint == "" { logger.Debug("Skip sending to remote write: no url") return nil, nil diff --git a/pkg/services/live/pipeline/output_threshold.go b/pkg/services/live/pipeline/frame_output_threshold.go similarity index 91% rename from pkg/services/live/pipeline/output_threshold.go rename to pkg/services/live/pipeline/frame_output_threshold.go index 48a26b2e678..30e1f523f12 100644 --- a/pkg/services/live/pipeline/output_threshold.go +++ b/pkg/services/live/pipeline/frame_output_threshold.go @@ -13,7 +13,7 @@ type ThresholdOutputConfig struct { Channel string `json:"channel"` } -//go:generate mockgen -destination=output_threshold_mock.go -package=pipeline github.com/grafana/grafana/pkg/services/live/pipeline FrameGetSetter +//go:generate mockgen -destination=frame_output_threshold_mock.go -package=pipeline github.com/grafana/grafana/pkg/services/live/pipeline FrameGetSetter type FrameGetSetter interface { Get(orgID int64, channel string) (*data.Frame, bool, error) @@ -31,13 +31,13 @@ func NewThresholdOutput(frameStorage FrameGetSetter, config ThresholdOutputConfi return &ThresholdOutput{frameStorage: frameStorage, config: config} } -const OutputTypeThreshold = "threshold" +const FrameOutputTypeThreshold = "threshold" func (out *ThresholdOutput) Type() string { - return OutputTypeThreshold + return FrameOutputTypeThreshold } -func (out *ThresholdOutput) Output(_ context.Context, vars OutputVars, frame *data.Frame) ([]*ChannelFrame, error) { +func (out *ThresholdOutput) OutputFrame(_ context.Context, vars Vars, frame *data.Frame) ([]*ChannelFrame, error) { if frame == nil { return nil, nil } diff --git a/pkg/services/live/pipeline/output_threshold_mock.go b/pkg/services/live/pipeline/frame_output_threshold_mock.go similarity index 100% rename from pkg/services/live/pipeline/output_threshold_mock.go rename to pkg/services/live/pipeline/frame_output_threshold_mock.go diff --git a/pkg/services/live/pipeline/output_threshold_test.go b/pkg/services/live/pipeline/frame_output_threshold_test.go similarity index 91% rename from pkg/services/live/pipeline/output_threshold_test.go rename to pkg/services/live/pipeline/frame_output_threshold_test.go index 6f063ea41ba..9dd9ac26d31 100644 --- a/pkg/services/live/pipeline/output_threshold_test.go +++ b/pkg/services/live/pipeline/frame_output_threshold_test.go @@ -17,7 +17,7 @@ func TestThresholdOutput_Output(t *testing.T) { } type args struct { in0 context.Context - vars OutputVars + vars Vars frame *data.Frame } tests := []struct { @@ -34,7 +34,7 @@ func TestThresholdOutput_Output(t *testing.T) { Channel: "test", }, }, - args: args{in0: context.Background(), vars: OutputVars{}, frame: nil}, + args: args{in0: context.Background(), vars: Vars{}, frame: nil}, wantErr: false, }, } @@ -44,8 +44,8 @@ func TestThresholdOutput_Output(t *testing.T) { frameStorage: tt.fields.frameStorage, config: tt.fields.config, } - if _, err := l.Output(tt.args.in0, tt.args.vars, tt.args.frame); (err != nil) != tt.wantErr { - t.Errorf("Output() error = %v, wantErr %v", err, tt.wantErr) + if _, err := l.OutputFrame(tt.args.in0, tt.args.vars, tt.args.frame); (err != nil) != tt.wantErr { + t.Errorf("OutputFrame() error = %v, wantErr %v", err, tt.wantErr) } }) } @@ -88,7 +88,7 @@ func TestThresholdOutput_NoPreviousFrame_SingleRow(t *testing.T) { frame := data.NewFrame("test", f1, f2) - channelFrames, err := outputter.Output(context.Background(), OutputVars{}, frame) + channelFrames, err := outputter.OutputFrame(context.Background(), Vars{}, frame) require.NoError(t, err) require.Len(t, channelFrames, 1) @@ -139,7 +139,7 @@ func TestThresholdOutput_NoPreviousFrame_MultipleRows(t *testing.T) { frame := data.NewFrame("test", f1, f2) - channelFrames, err := outputter.Output(context.Background(), OutputVars{}, frame) + channelFrames, err := outputter.OutputFrame(context.Background(), Vars{}, frame) require.NoError(t, err) require.Len(t, channelFrames, 1) @@ -198,7 +198,7 @@ func TestThresholdOutput_WithPreviousFrame_SingleRow(t *testing.T) { frame := data.NewFrame("test", f1, f2) - channelFrames, err := outputter.Output(context.Background(), OutputVars{}, frame) + channelFrames, err := outputter.OutputFrame(context.Background(), Vars{}, frame) require.NoError(t, err) require.Len(t, channelFrames, 0) } @@ -248,7 +248,7 @@ func TestThresholdOutput_WithPreviousFrame_MultipleRows(t *testing.T) { frame := data.NewFrame("test", f1, f2) - channelFrames, err := outputter.Output(context.Background(), OutputVars{}, frame) + channelFrames, err := outputter.OutputFrame(context.Background(), Vars{}, frame) require.NoError(t, err) require.Len(t, channelFrames, 1) } diff --git a/pkg/services/live/pipeline/frame_processor_drop_field.go b/pkg/services/live/pipeline/frame_processor_drop_field.go new file mode 100644 index 00000000000..a1ba48d5c0d --- /dev/null +++ b/pkg/services/live/pipeline/frame_processor_drop_field.go @@ -0,0 +1,43 @@ +package pipeline + +import ( + "context" + + "github.com/grafana/grafana-plugin-sdk-go/data" +) + +type DropFieldsFrameProcessorConfig struct { + FieldNames []string `json:"fieldNames"` +} + +// DropFieldsFrameProcessor can drop specified fields from a data.Frame. +type DropFieldsFrameProcessor struct { + config DropFieldsFrameProcessorConfig +} + +func removeIndex(s []*data.Field, index int) []*data.Field { + return append(s[:index], s[index+1:]...) +} + +func NewDropFieldsFrameProcessor(config DropFieldsFrameProcessorConfig) *DropFieldsFrameProcessor { + return &DropFieldsFrameProcessor{config: config} +} + +const FrameProcessorTypeDropFields = "dropFields" + +func (p *DropFieldsFrameProcessor) Type() string { + return FrameProcessorTypeDropFields +} + +func (p *DropFieldsFrameProcessor) ProcessFrame(_ context.Context, _ Vars, frame *data.Frame) (*data.Frame, error) { + for _, f := range p.config.FieldNames { + inner: + for i, field := range frame.Fields { + if f == field.Name { + frame.Fields = removeIndex(frame.Fields, i) + continue inner + } + } + } + return frame, nil +} diff --git a/pkg/services/live/pipeline/frame_processor_keep_field.go b/pkg/services/live/pipeline/frame_processor_keep_field.go new file mode 100644 index 00000000000..ea771dee2c9 --- /dev/null +++ b/pkg/services/live/pipeline/frame_processor_keep_field.go @@ -0,0 +1,46 @@ +package pipeline + +import ( + "context" + + "github.com/grafana/grafana-plugin-sdk-go/data" +) + +type KeepFieldsFrameProcessorConfig struct { + FieldNames []string `json:"fieldNames"` +} + +// KeepFieldsFrameProcessor can keep specified fields in a data.Frame dropping all other fields. +type KeepFieldsFrameProcessor struct { + config KeepFieldsFrameProcessorConfig +} + +func NewKeepFieldsFrameProcessor(config KeepFieldsFrameProcessorConfig) *KeepFieldsFrameProcessor { + return &KeepFieldsFrameProcessor{config: config} +} + +func stringInSlice(str string, slice []string) bool { + for _, s := range slice { + if s == str { + return true + } + } + return false +} + +const FrameProcessorTypeKeepFields = "keepFields" + +func (p *KeepFieldsFrameProcessor) Type() string { + return FrameProcessorTypeKeepFields +} + +func (p *KeepFieldsFrameProcessor) ProcessFrame(_ context.Context, _ Vars, frame *data.Frame) (*data.Frame, error) { + var fieldsToKeep []*data.Field + for _, field := range frame.Fields { + if stringInSlice(field.Name, p.config.FieldNames) { + fieldsToKeep = append(fieldsToKeep, field) + } + } + f := data.NewFrame(frame.Name, fieldsToKeep...) + return f, nil +} diff --git a/pkg/services/live/pipeline/frame_processor_multiple.go b/pkg/services/live/pipeline/frame_processor_multiple.go new file mode 100644 index 00000000000..d581b7ae6b1 --- /dev/null +++ b/pkg/services/live/pipeline/frame_processor_multiple.go @@ -0,0 +1,35 @@ +package pipeline + +import ( + "context" + + "github.com/grafana/grafana-plugin-sdk-go/data" +) + +// MultipleFrameProcessor can combine several FrameProcessor and +// execute them sequentially. +type MultipleFrameProcessor struct { + Processors []FrameProcessor +} + +const FrameProcessorTypeMultiple = "multiple" + +func (p *MultipleFrameProcessor) Type() string { + return FrameProcessorTypeMultiple +} + +func (p *MultipleFrameProcessor) ProcessFrame(ctx context.Context, vars Vars, frame *data.Frame) (*data.Frame, error) { + for _, p := range p.Processors { + var err error + frame, err = p.ProcessFrame(ctx, vars, frame) + if err != nil { + logger.Error("Error processing frame", "error", err) + return nil, err + } + } + return frame, nil +} + +func NewMultipleFrameProcessor(processors ...FrameProcessor) *MultipleFrameProcessor { + return &MultipleFrameProcessor{Processors: processors} +} diff --git a/pkg/services/live/pipeline/output_conditional.go b/pkg/services/live/pipeline/output_conditional.go deleted file mode 100644 index fef42cadebc..00000000000 --- a/pkg/services/live/pipeline/output_conditional.go +++ /dev/null @@ -1,33 +0,0 @@ -package pipeline - -import ( - "context" - - "github.com/grafana/grafana-plugin-sdk-go/data" -) - -type ConditionalOutput struct { - Condition ConditionChecker - Outputter Outputter -} - -func NewConditionalOutput(condition ConditionChecker, outputter Outputter) *ConditionalOutput { - return &ConditionalOutput{Condition: condition, Outputter: outputter} -} - -const OutputTypeConditional = "conditional" - -func (out *ConditionalOutput) Type() string { - return OutputTypeConditional -} - -func (out ConditionalOutput) Output(ctx context.Context, vars OutputVars, frame *data.Frame) ([]*ChannelFrame, error) { - ok, err := out.Condition.CheckCondition(ctx, frame) - if err != nil { - return nil, err - } - if !ok { - return nil, nil - } - return out.Outputter.Output(ctx, vars, frame) -} diff --git a/pkg/services/live/pipeline/output_managed_stream.go b/pkg/services/live/pipeline/output_managed_stream.go deleted file mode 100644 index 5ec929ec487..00000000000 --- a/pkg/services/live/pipeline/output_managed_stream.go +++ /dev/null @@ -1,32 +0,0 @@ -package pipeline - -import ( - "context" - - "github.com/grafana/grafana/pkg/services/live/managedstream" - - "github.com/grafana/grafana-plugin-sdk-go/data" -) - -type ManagedStreamOutput struct { - managedStream *managedstream.Runner -} - -func NewManagedStreamOutput(managedStream *managedstream.Runner) *ManagedStreamOutput { - return &ManagedStreamOutput{managedStream: managedStream} -} - -const OutputTypeManagedStream = "managedStream" - -func (out *ManagedStreamOutput) Type() string { - return OutputTypeManagedStream -} - -func (out *ManagedStreamOutput) Output(_ context.Context, vars OutputVars, frame *data.Frame) ([]*ChannelFrame, error) { - stream, err := out.managedStream.GetOrCreateStream(vars.OrgID, vars.Scope, vars.Namespace) - if err != nil { - logger.Error("Error getting stream", "error", err) - return nil, err - } - return nil, stream.Push(vars.Path, frame) -} diff --git a/pkg/services/live/pipeline/output_multiple.go b/pkg/services/live/pipeline/output_multiple.go deleted file mode 100644 index bd21934b438..00000000000 --- a/pkg/services/live/pipeline/output_multiple.go +++ /dev/null @@ -1,36 +0,0 @@ -package pipeline - -import ( - "context" - - "github.com/grafana/grafana-plugin-sdk-go/data" -) - -// MultipleOutput can combine several Outputter and -// execute them sequentially. -type MultipleOutput struct { - Outputters []Outputter -} - -const OutputTypeMultiple = "multiple" - -func (out *MultipleOutput) Type() string { - return OutputTypeMultiple -} - -func (out MultipleOutput) Output(ctx context.Context, vars OutputVars, frame *data.Frame) ([]*ChannelFrame, error) { - var frames []*ChannelFrame - for _, out := range out.Outputters { - f, err := out.Output(ctx, vars, frame) - if err != nil { - logger.Error("Error outputting frame", "error", err) - return nil, err - } - frames = append(frames, f...) - } - return frames, nil -} - -func NewMultipleOutput(outputters ...Outputter) *MultipleOutput { - return &MultipleOutput{Outputters: outputters} -} diff --git a/pkg/services/live/pipeline/pipeline.go b/pkg/services/live/pipeline/pipeline.go index ae881e8691a..3b7df5fb3e5 100644 --- a/pkg/services/live/pipeline/pipeline.go +++ b/pkg/services/live/pipeline/pipeline.go @@ -6,19 +6,63 @@ import ( "fmt" "os" - "github.com/grafana/grafana-plugin-sdk-go/backend" - "github.com/grafana/grafana/pkg/models" + "github.com/grafana/grafana-plugin-sdk-go/backend" "github.com/grafana/grafana-plugin-sdk-go/data" "github.com/grafana/grafana-plugin-sdk-go/live" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" + "go.opentelemetry.io/otel/exporters/jaeger" + "go.opentelemetry.io/otel/sdk/resource" + tracesdk "go.opentelemetry.io/otel/sdk/trace" + semconv "go.opentelemetry.io/otel/semconv/v1.4.0" + "go.opentelemetry.io/otel/trace" ) +const ( + service = "grafana" + environment = "dev" + id = 1 +) + +// tracerProvider returns an OpenTelemetry TracerProvider configured to use +// the Jaeger exporter that will send spans to the provided url. The returned +// TracerProvider will also use a Resource configured with all the information +// about the application. +func tracerProvider(url string) (*tracesdk.TracerProvider, error) { + // Create the Jaeger exporter + exp, err := jaeger.New(jaeger.WithCollectorEndpoint(jaeger.WithEndpoint(url))) + if err != nil { + return nil, err + } + tp := tracesdk.NewTracerProvider( + // Always be sure to batch in production. + tracesdk.WithBatcher(exp), + // Record information about this application in an Resource. + tracesdk.WithResource(resource.NewWithAttributes( + semconv.SchemaURL, + semconv.ServiceNameKey.String(service), + attribute.String("environment", environment), + attribute.Int64("ID", id), + )), + ) + return tp, nil +} + +// ChannelData is a wrapper over raw data with additional channel information. +// Channel is used for rule routing, if the channel is empty then data processing +// stops. If channel is not empty then data processing will be redirected to a +// corresponding channel rule. +type ChannelData struct { + Channel string + Data []byte +} + // ChannelFrame is a wrapper over data.Frame with additional channel information. // Channel is used for rule routing, if the channel is empty then frame processing -// will try to take current rule Processor and Outputter. If channel is not empty +// will try to take current rule FrameProcessor and FrameOutputter. If channel is not empty // then frame processing will be redirected to a corresponding channel rule. -// TODO: avoid recursion, increment a counter while frame travels over pipeline steps, make it configurable. type ChannelFrame struct { Channel string `json:"channel"` Frame *data.Frame `json:"frame"` @@ -33,14 +77,10 @@ type Vars struct { Path string } -// ProcessorVars has some helpful things Processor entities could use. -type ProcessorVars struct { - Vars -} - -// OutputVars has some helpful things Outputter entities could use. -type OutputVars struct { - ProcessorVars +// DataOutputter can output incoming data before conversion to frames. +type DataOutputter interface { + Type() string + OutputData(ctx context.Context, vars Vars, data []byte) ([]*ChannelData, error) } // Converter converts raw bytes to slice of ChannelFrame. Each element @@ -51,17 +91,17 @@ type Converter interface { Convert(ctx context.Context, vars Vars, body []byte) ([]*ChannelFrame, error) } -// Processor can modify data.Frame in a custom way before it will be outputted. -type Processor interface { +// FrameProcessor can modify data.Frame in a custom way before it will be outputted. +type FrameProcessor interface { Type() string - Process(ctx context.Context, vars ProcessorVars, frame *data.Frame) (*data.Frame, error) + ProcessFrame(ctx context.Context, vars Vars, frame *data.Frame) (*data.Frame, error) } -// Outputter outputs data.Frame to a custom destination. Or simply +// FrameOutputter outputs data.Frame to a custom destination. Or simply // do nothing if some conditions not met. -type Outputter interface { +type FrameOutputter interface { Type() string - Output(ctx context.Context, vars OutputVars, frame *data.Frame) ([]*ChannelFrame, error) + OutputFrame(ctx context.Context, vars Vars, frame *data.Frame) ([]*ChannelFrame, error) } // Subscriber can handle channel subscribe events. @@ -70,15 +110,28 @@ type Subscriber interface { Subscribe(ctx context.Context, vars Vars) (models.SubscribeReply, backend.SubscribeStreamStatus, error) } -// LiveChannelRule is an in-memory representation of each specific rule, with Converter, Processor -// and Outputter to be executed by Pipeline. +// PublishAuthChecker checks whether current user can publish to a channel. +type PublishAuthChecker interface { + CanPublish(ctx context.Context, u *models.SignedInUser) (bool, error) +} + +// SubscribeAuthChecker checks whether current user can subscribe to a channel. +type SubscribeAuthChecker interface { + CanSubscribe(ctx context.Context, u *models.SignedInUser) (bool, error) +} + +// LiveChannelRule is an in-memory representation of each specific rule, with Converter, FrameProcessor +// and FrameOutputter to be executed by Pipeline. type LiveChannelRule struct { - OrgId int64 - Pattern string - Subscriber Subscriber - Converter Converter - Processor Processor - Outputter Outputter + OrgId int64 + Pattern string + PublishAuth PublishAuthChecker + SubscribeAuth SubscribeAuthChecker + Subscribers []Subscriber + DataOutputters []DataOutputter + Converter Converter + FrameProcessors []FrameProcessor + FrameOutputters []FrameOutputter } // Label ... @@ -107,6 +160,7 @@ type ChannelRuleGetter interface { // * output resulting frames to various destinations. type Pipeline struct { ruleGetter ChannelRuleGetter + tracer trace.Tracer } // New creates new Pipeline. @@ -114,9 +168,24 @@ func New(ruleGetter ChannelRuleGetter) (*Pipeline, error) { p := &Pipeline{ ruleGetter: ruleGetter, } + + if os.Getenv("GF_LIVE_PIPELINE_TRACE") != "" { + // Traces for development only at the moment. + // Start local Jaeger and then run Grafana with GF_LIVE_PIPELINE_TRACE: + // docker run --rm -it --name jaeger -e COLLECTOR_ZIPKIN_HOST_PORT=:9411 -p 5775:5775/udp -p 6831:6831/udp -p 6832:6832/udp -p 5778:5778 -p 16686:16686 -p 14268:14268 -p 14250:14250 -p 9411:9411 jaegertracing/all-in-one:1.26 + // Then visit http://localhost:16686/ where Jaeger UI is served. + tp, err := tracerProvider("http://localhost:14268/api/traces") + if err != nil { + return nil, err + } + tracer := tp.Tracer("gf.live.pipeline") + p.tracer = tracer + } + if os.Getenv("GF_LIVE_PIPELINE_DEV") != "" { go postTestData() // TODO: temporary for development, remove before merge. } + return p, nil } @@ -125,6 +194,37 @@ func (p *Pipeline) Get(orgID int64, channel string) (*LiveChannelRule, bool, err } func (p *Pipeline) ProcessInput(ctx context.Context, orgID int64, channelID string, body []byte) (bool, error) { + var span trace.Span + if p.tracer != nil { + ctx, span = p.tracer.Start(ctx, "live.pipeline.process_input") + span.SetAttributes( + attribute.Int64("orgId", orgID), + attribute.String("channel", channelID), + attribute.String("body", string(body)), + ) + defer span.End() + } + ok, err := p.processInput(ctx, orgID, channelID, body, nil) + if err != nil { + if p.tracer != nil && span != nil { + span.SetStatus(codes.Error, err.Error()) + } + return ok, err + } + return ok, err +} + +func (p *Pipeline) processInput(ctx context.Context, orgID int64, channelID string, body []byte, visitedChannels map[string]struct{}) (bool, error) { + var span trace.Span + if p.tracer != nil { + ctx, span = p.tracer.Start(ctx, "live.pipeline.process_input_"+channelID) + span.SetAttributes( + attribute.Int64("orgId", orgID), + attribute.String("channel", channelID), + attribute.String("body", string(body)), + ) + defer span.End() + } rule, ok, err := p.ruleGetter.Get(orgID, channelID) if err != nil { return false, err @@ -132,13 +232,23 @@ func (p *Pipeline) ProcessInput(ctx context.Context, orgID int64, channelID stri if !ok { return false, nil } - channelFrames, ok, err := p.DataToChannelFrames(ctx, *rule, orgID, channelID, body) + if visitedChannels == nil { + visitedChannels = map[string]struct{}{} + } + if len(rule.DataOutputters) > 0 { + channelDataList := []*ChannelData{{Channel: channelID, Data: body}} + err = p.processChannelDataList(ctx, orgID, channelID, channelDataList, visitedChannels) + if err != nil { + return false, err + } + } + if rule.Converter == nil { + return false, nil + } + channelFrames, err := p.DataToChannelFrames(ctx, *rule, orgID, channelID, body) if err != nil { return false, err } - if !ok { - return false, nil - } err = p.processChannelFrames(ctx, orgID, channelID, channelFrames, nil) if err != nil { return false, fmt.Errorf("error processing frame: %w", err) @@ -146,15 +256,21 @@ func (p *Pipeline) ProcessInput(ctx context.Context, orgID int64, channelID stri return true, nil } -func (p *Pipeline) DataToChannelFrames(ctx context.Context, rule LiveChannelRule, orgID int64, channelID string, body []byte) ([]*ChannelFrame, bool, error) { - if rule.Converter == nil { - return nil, false, nil +func (p *Pipeline) DataToChannelFrames(ctx context.Context, rule LiveChannelRule, orgID int64, channelID string, body []byte) ([]*ChannelFrame, error) { + var span trace.Span + if p.tracer != nil { + ctx, span = p.tracer.Start(ctx, "live.pipeline.convert_"+rule.Converter.Type()) + span.SetAttributes( + attribute.Int64("orgId", orgID), + attribute.String("channel", channelID), + ) + defer span.End() } channel, err := live.ParseChannel(channelID) if err != nil { logger.Error("Error parsing channel", "error", err, "channel", channelID) - return nil, false, err + return nil, err } vars := Vars{ @@ -168,14 +284,40 @@ func (p *Pipeline) DataToChannelFrames(ctx context.Context, rule LiveChannelRule frames, err := rule.Converter.Convert(ctx, vars, body) if err != nil { logger.Error("Error converting data", "error", err) - return nil, false, err + return nil, err } - return frames, true, nil + return frames, nil } var errChannelRecursion = errors.New("channel recursion") +func (p *Pipeline) processChannelDataList(ctx context.Context, orgID int64, channelID string, channelDataList []*ChannelData, visitedChannels map[string]struct{}) error { + for _, channelData := range channelDataList { + var nextChannel = channelID + if channelData.Channel != "" { + nextChannel = channelData.Channel + } + if _, ok := visitedChannels[nextChannel]; ok { + return fmt.Errorf("%w: %s", errChannelRecursion, nextChannel) + } + visitedChannels[nextChannel] = struct{}{} + newChannelDataList, err := p.processData(ctx, orgID, nextChannel, channelData.Data) + if err != nil { + return err + } + if len(newChannelDataList) > 0 { + for _, cd := range newChannelDataList { + _, err := p.processInput(ctx, orgID, cd.Channel, cd.Data, visitedChannels) + if err != nil { + return err + } + } + } + } + return nil +} + func (p *Pipeline) processChannelFrames(ctx context.Context, orgID int64, channelID string, channelFrames []*ChannelFrame, visitedChannels map[string]struct{}) error { if visitedChannels == nil { visitedChannels = map[string]struct{}{} @@ -204,6 +346,20 @@ func (p *Pipeline) processChannelFrames(ctx context.Context, orgID int64, channe } func (p *Pipeline) processFrame(ctx context.Context, orgID int64, channelID string, frame *data.Frame) ([]*ChannelFrame, error) { + var span trace.Span + if p.tracer != nil { + table, err := frame.StringTable(32, 32) + if err != nil { + return nil, err + } + ctx, span = p.tracer.Start(ctx, "live.pipeline.process_frame_"+channelID) + span.SetAttributes( + attribute.Int64("orgId", orgID), + attribute.String("channel", channelID), + attribute.String("frame", table), + ) + defer span.End() + } rule, ruleOk, err := p.ruleGetter.Get(orgID, channelID) if err != nil { logger.Error("Error getting rule", "error", err) @@ -220,39 +376,144 @@ func (p *Pipeline) processFrame(ctx context.Context, orgID int64, channelID stri return nil, err } - vars := ProcessorVars{ - Vars: Vars{ - OrgID: orgID, - Channel: channelID, - Scope: ch.Scope, - Namespace: ch.Namespace, - Path: ch.Path, - }, + vars := Vars{ + OrgID: orgID, + Channel: channelID, + Scope: ch.Scope, + Namespace: ch.Namespace, + Path: ch.Path, } - if rule.Processor != nil { - frame, err = rule.Processor.Process(ctx, vars, frame) - if err != nil { - logger.Error("Error processing frame", "error", err) - return nil, err - } - if frame == nil { - return nil, nil + if len(rule.FrameProcessors) > 0 { + for _, proc := range rule.FrameProcessors { + frame, err = p.execProcessor(ctx, proc, vars, frame) + if err != nil { + logger.Error("Error processing frame", "error", err) + return nil, err + } + if frame == nil { + return nil, nil + } } } - outputVars := OutputVars{ - ProcessorVars: vars, - } - - if rule.Outputter != nil { - frames, err := rule.Outputter.Output(ctx, outputVars, frame) - if err != nil { - logger.Error("Error outputting frame", "error", err) - return nil, err + if len(rule.FrameOutputters) > 0 { + var resultingFrames []*ChannelFrame + for _, out := range rule.FrameOutputters { + frames, err := p.processFrameOutput(ctx, out, vars, frame) + if err != nil { + logger.Error("Error outputting frame", "error", err) + return nil, err + } + resultingFrames = append(resultingFrames, frames...) } - return frames, nil + return resultingFrames, nil } return nil, nil } + +func (p *Pipeline) execProcessor(ctx context.Context, proc FrameProcessor, vars Vars, frame *data.Frame) (*data.Frame, error) { + var span trace.Span + if p.tracer != nil { + ctx, span = p.tracer.Start(ctx, "live.pipeline.apply_processor_"+proc.Type()) + table, err := frame.StringTable(32, 32) + if err != nil { + return nil, err + } + span.SetAttributes( + attribute.Int64("orgId", vars.OrgID), + attribute.String("channel", vars.Channel), + attribute.String("frame", table), + attribute.String("processor", proc.Type()), + ) + // Note: we can also visualize resulting frame here. + defer span.End() + } + return proc.ProcessFrame(ctx, vars, frame) +} + +func (p *Pipeline) processFrameOutput(ctx context.Context, out FrameOutputter, vars Vars, frame *data.Frame) ([]*ChannelFrame, error) { + var span trace.Span + if p.tracer != nil { + ctx, span = p.tracer.Start(ctx, "live.pipeline.frame_output_"+out.Type()) + table, err := frame.StringTable(32, 32) + if err != nil { + return nil, err + } + span.SetAttributes( + attribute.Int64("orgId", vars.OrgID), + attribute.String("channel", vars.Channel), + attribute.String("frame", table), + attribute.String("output", out.Type()), + ) + defer span.End() + } + return out.OutputFrame(ctx, vars, frame) +} + +func (p *Pipeline) processData(ctx context.Context, orgID int64, channelID string, data []byte) ([]*ChannelData, error) { + var span trace.Span + if p.tracer != nil { + ctx, span = p.tracer.Start(ctx, "live.pipeline.process_data_"+channelID) + span.SetAttributes( + attribute.Int64("orgId", orgID), + attribute.String("channel", channelID), + attribute.String("data", string(data)), + ) + defer span.End() + } + rule, ruleOk, err := p.ruleGetter.Get(orgID, channelID) + if err != nil { + logger.Error("Error getting rule", "error", err) + return nil, err + } + if !ruleOk { + logger.Debug("Rule not found", "channel", channelID) + return nil, err + } + + ch, err := live.ParseChannel(channelID) + if err != nil { + logger.Error("Error parsing channel", "error", err, "channel", channelID) + return nil, err + } + + vars := Vars{ + OrgID: orgID, + Channel: channelID, + Scope: ch.Scope, + Namespace: ch.Namespace, + Path: ch.Path, + } + + if len(rule.DataOutputters) > 0 { + var resultingChannelDataList []*ChannelData + for _, out := range rule.DataOutputters { + channelDataList, err := p.processDataOutput(ctx, out, vars, data) + if err != nil { + logger.Error("Error outputting frame", "error", err) + return nil, err + } + resultingChannelDataList = append(resultingChannelDataList, channelDataList...) + } + return resultingChannelDataList, nil + } + + return nil, nil +} + +func (p *Pipeline) processDataOutput(ctx context.Context, out DataOutputter, vars Vars, data []byte) ([]*ChannelData, error) { + var span trace.Span + if p.tracer != nil { + ctx, span = p.tracer.Start(ctx, "live.pipeline.data_output_"+out.Type()) + span.SetAttributes( + attribute.Int64("orgId", vars.OrgID), + attribute.String("channel", vars.Channel), + attribute.String("data", string(data)), + attribute.String("output", out.Type()), + ) + defer span.End() + } + return out.OutputData(ctx, vars, data) +} diff --git a/pkg/services/live/pipeline/pipeline_test.go b/pkg/services/live/pipeline/pipeline_test.go index 95b49571b7d..fc3bb61b326 100644 --- a/pkg/services/live/pipeline/pipeline_test.go +++ b/pkg/services/live/pipeline/pipeline_test.go @@ -62,7 +62,7 @@ func (t *testProcessor) Type() string { return "test" } -func (t *testProcessor) Process(_ context.Context, _ ProcessorVars, frame *data.Frame) (*data.Frame, error) { +func (t *testProcessor) ProcessFrame(_ context.Context, _ Vars, frame *data.Frame) (*data.Frame, error) { return frame, nil } @@ -75,7 +75,7 @@ func (t *testOutputter) Type() string { return "test" } -func (t *testOutputter) Output(_ context.Context, _ OutputVars, frame *data.Frame) ([]*ChannelFrame, error) { +func (t *testOutputter) OutputFrame(_ context.Context, _ Vars, frame *data.Frame) ([]*ChannelFrame, error) { if t.err != nil { return nil, t.err } @@ -88,9 +88,9 @@ func TestPipeline(t *testing.T) { p, err := New(&testRuleGetter{ rules: map[string]*LiveChannelRule{ "stream/test/xxx": { - Converter: &testConverter{"", data.NewFrame("test")}, - Processor: &testProcessor{}, - Outputter: outputter, + Converter: &testConverter{"", data.NewFrame("test")}, + FrameProcessors: []FrameProcessor{&testProcessor{}}, + FrameOutputters: []FrameOutputter{outputter}, }, }, }) @@ -107,9 +107,9 @@ func TestPipeline_OutputError(t *testing.T) { p, err := New(&testRuleGetter{ rules: map[string]*LiveChannelRule{ "stream/test/xxx": { - Converter: &testConverter{"", data.NewFrame("test")}, - Processor: &testProcessor{}, - Outputter: outputter, + Converter: &testConverter{"", data.NewFrame("test")}, + FrameProcessors: []FrameProcessor{&testProcessor{}}, + FrameOutputters: []FrameOutputter{outputter}, }, }, }) @@ -123,15 +123,19 @@ func TestPipeline_Recursion(t *testing.T) { rules: map[string]*LiveChannelRule{ "stream/test/xxx": { Converter: &testConverter{"", data.NewFrame("test")}, - Outputter: NewRedirectOutput(RedirectOutputConfig{ - Channel: "stream/test/yyy", - }), + FrameOutputters: []FrameOutputter{ + NewRedirectFrameOutput(RedirectOutputConfig{ + Channel: "stream/test/yyy", + }), + }, }, "stream/test/yyy": { Converter: &testConverter{"", data.NewFrame("test")}, - Outputter: NewRedirectOutput(RedirectOutputConfig{ - Channel: "stream/test/xxx", - }), + FrameOutputters: []FrameOutputter{ + NewRedirectFrameOutput(RedirectOutputConfig{ + Channel: "stream/test/xxx", + }), + }, }, }, }) diff --git a/pkg/services/live/pipeline/processor_drop_field.go b/pkg/services/live/pipeline/processor_drop_field.go deleted file mode 100644 index e0917269cea..00000000000 --- a/pkg/services/live/pipeline/processor_drop_field.go +++ /dev/null @@ -1,43 +0,0 @@ -package pipeline - -import ( - "context" - - "github.com/grafana/grafana-plugin-sdk-go/data" -) - -type DropFieldsProcessorConfig struct { - FieldNames []string `json:"fieldNames"` -} - -// DropFieldsProcessor can drop specified fields from a data.Frame. -type DropFieldsProcessor struct { - config DropFieldsProcessorConfig -} - -func removeIndex(s []*data.Field, index int) []*data.Field { - return append(s[:index], s[index+1:]...) -} - -func NewDropFieldsProcessor(config DropFieldsProcessorConfig) *DropFieldsProcessor { - return &DropFieldsProcessor{config: config} -} - -const ProcessorTypeDropFields = "dropFields" - -func (p *DropFieldsProcessor) Type() string { - return ProcessorTypeDropFields -} - -func (p *DropFieldsProcessor) Process(_ context.Context, _ ProcessorVars, frame *data.Frame) (*data.Frame, error) { - for _, f := range p.config.FieldNames { - inner: - for i, field := range frame.Fields { - if f == field.Name { - frame.Fields = removeIndex(frame.Fields, i) - continue inner - } - } - } - return frame, nil -} diff --git a/pkg/services/live/pipeline/processor_keep_field.go b/pkg/services/live/pipeline/processor_keep_field.go deleted file mode 100644 index a4943142808..00000000000 --- a/pkg/services/live/pipeline/processor_keep_field.go +++ /dev/null @@ -1,46 +0,0 @@ -package pipeline - -import ( - "context" - - "github.com/grafana/grafana-plugin-sdk-go/data" -) - -type KeepFieldsProcessorConfig struct { - FieldNames []string `json:"fieldNames"` -} - -// KeepFieldsProcessor can keep specified fields in a data.Frame dropping all other fields. -type KeepFieldsProcessor struct { - config KeepFieldsProcessorConfig -} - -func NewKeepFieldsProcessor(config KeepFieldsProcessorConfig) *KeepFieldsProcessor { - return &KeepFieldsProcessor{config: config} -} - -func stringInSlice(str string, slice []string) bool { - for _, s := range slice { - if s == str { - return true - } - } - return false -} - -const ProcessorTypeKeepFields = "keepFields" - -func (p *KeepFieldsProcessor) Type() string { - return ProcessorTypeKeepFields -} - -func (p *KeepFieldsProcessor) Process(_ context.Context, _ ProcessorVars, frame *data.Frame) (*data.Frame, error) { - var fieldsToKeep []*data.Field - for _, field := range frame.Fields { - if stringInSlice(field.Name, p.config.FieldNames) { - fieldsToKeep = append(fieldsToKeep, field) - } - } - f := data.NewFrame(frame.Name, fieldsToKeep...) - return f, nil -} diff --git a/pkg/services/live/pipeline/processor_multiple.go b/pkg/services/live/pipeline/processor_multiple.go deleted file mode 100644 index 91f228db632..00000000000 --- a/pkg/services/live/pipeline/processor_multiple.go +++ /dev/null @@ -1,35 +0,0 @@ -package pipeline - -import ( - "context" - - "github.com/grafana/grafana-plugin-sdk-go/data" -) - -// MultipleProcessor can combine several Processor and -// execute them sequentially. -type MultipleProcessor struct { - Processors []Processor -} - -const ProcessorTypeMultiple = "multiple" - -func (p *MultipleProcessor) Type() string { - return ProcessorTypeMultiple -} - -func (p *MultipleProcessor) Process(ctx context.Context, vars ProcessorVars, frame *data.Frame) (*data.Frame, error) { - for _, p := range p.Processors { - var err error - frame, err = p.Process(ctx, vars, frame) - if err != nil { - logger.Error("Error processing frame", "error", err) - return nil, err - } - } - return frame, nil -} - -func NewMultipleProcessor(processors ...Processor) *MultipleProcessor { - return &MultipleProcessor{Processors: processors} -} diff --git a/pkg/services/live/pipeline/registry.go b/pkg/services/live/pipeline/registry.go index 18c0ab17474..351031315f2 100644 --- a/pkg/services/live/pipeline/registry.go +++ b/pkg/services/live/pipeline/registry.go @@ -15,52 +15,45 @@ var SubscribersRegistry = []EntityInfo{ Type: SubscriberTypeManagedStream, Description: "apply managed stream subscribe logic", }, - { - Type: SubscriberTypeMultiple, - Description: "apply multiple subscribers", - }, - { - Type: SubscriberTypeAuthorizeRole, - Description: "authorize user role", - }, } -var OutputsRegistry = []EntityInfo{ +var FrameOutputsRegistry = []EntityInfo{ { - Type: OutputTypeManagedStream, - Description: "Only send schema when structure changes. Note this also requires a matching subscriber", + Type: FrameOutputTypeManagedStream, + Description: "only send schema when structure changes (note this also requires a matching subscriber)", Example: ManagedStreamOutputConfig{}, }, { - Type: OutputTypeMultiple, - Description: "Send the output to multiple destinations", - Example: MultipleOutputterConfig{}, - }, - { - Type: OutputTypeConditional, + Type: FrameOutputTypeConditional, Description: "send to an output depending on frame values", Example: ConditionalOutputConfig{}, }, { - Type: OutputTypeRedirect, + Type: FrameOutputTypeRedirect, + Description: "redirect for processing by another channel rule", }, { - Type: OutputTypeThreshold, + Type: FrameOutputTypeThreshold, + Description: "output field threshold boundaries cross into new channel", }, { - Type: OutputTypeChangeLog, + Type: FrameOutputTypeChangeLog, + Description: "output field changes into new channel", }, { - Type: OutputTypeRemoteWrite, + Type: FrameOutputTypeRemoteWrite, + Description: "output to remote write endpoint", }, } var ConvertersRegistry = []EntityInfo{ { - Type: ConverterTypeJsonAuto, + Type: ConverterTypeJsonAuto, + Description: "automatic recursive JSON to Frame conversion", }, { - Type: ConverterTypeJsonExact, + Type: ConverterTypeJsonExact, + Description: "JSON to Frame conversion according to exact list of fields", }, { Type: ConverterTypeInfluxAuto, @@ -68,24 +61,31 @@ var ConvertersRegistry = []EntityInfo{ Example: AutoInfluxConverterConfig{}, }, { - Type: ConverterTypeJsonFrame, + Type: ConverterTypeJsonFrame, + Description: "JSON-encoded Grafana data frame", }, } -var ProcessorsRegistry = []EntityInfo{ +var FrameProcessorsRegistry = []EntityInfo{ { - Type: ProcessorTypeKeepFields, + Type: FrameProcessorTypeKeepFields, Description: "list the fields that should stay", - Example: KeepFieldsProcessorConfig{}, + Example: KeepFieldsFrameProcessorConfig{}, }, { - Type: ProcessorTypeDropFields, + Type: FrameProcessorTypeDropFields, Description: "list the fields that should be removed", - Example: DropFieldsProcessorConfig{}, - }, - { - Type: ProcessorTypeMultiple, - Description: "apply multiple processors", - Example: MultipleProcessorConfig{}, + Example: DropFieldsFrameProcessorConfig{}, + }, +} + +var DataOutputsRegistry = []EntityInfo{ + { + Type: DataOutputTypeBuiltin, + Description: "use builtin publish handler", + }, + { + Type: DataOutputTypeRedirect, + Description: "redirect data processing to another channel rule", }, } diff --git a/pkg/services/live/pipeline/rule_cache_segmented.go b/pkg/services/live/pipeline/rule_cache_segmented.go index 26bb1fcb4fa..499f13c43d2 100644 --- a/pkg/services/live/pipeline/rule_cache_segmented.go +++ b/pkg/services/live/pipeline/rule_cache_segmented.go @@ -2,6 +2,7 @@ package pipeline import ( "context" + "fmt" "sync" "time" @@ -65,7 +66,7 @@ func (s *CacheSegmentedTree) Get(orgID int64, channel string) (*LiveChannelRule, if !ok { err := s.fillOrg(orgID) if err != nil { - return nil, false, err + return nil, false, fmt.Errorf("error filling org: %w", err) } } s.radixMu.RLock() diff --git a/pkg/services/live/pipeline/rule_cache_segmented_test.go b/pkg/services/live/pipeline/rule_cache_segmented_test.go index 23462274c79..a03ec9b368d 100644 --- a/pkg/services/live/pipeline/rule_cache_segmented_test.go +++ b/pkg/services/live/pipeline/rule_cache_segmented_test.go @@ -21,9 +21,11 @@ func (t *testBuilder) BuildRules(_ context.Context, _ int64) ([]*LiveChannelRule Pattern: "stream/telegraf/:metric", }, { - OrgId: 1, - Pattern: "stream/telegraf/:metric/:extra", - Outputter: NewRedirectOutput(RedirectOutputConfig{}), + OrgId: 1, + Pattern: "stream/telegraf/:metric/:extra", + FrameOutputters: []FrameOutputter{ + NewRedirectFrameOutput(RedirectOutputConfig{}), + }, }, { OrgId: 1, @@ -48,7 +50,7 @@ func TestStorage_Get(t *testing.T) { rule, ok, err = s.Get(1, "stream/telegraf/mem/rss") require.NoError(t, err) require.True(t, ok) - require.Equal(t, OutputTypeRedirect, rule.Outputter.Type()) + require.Equal(t, FrameOutputTypeRedirect, rule.FrameOutputters[0].Type()) rule, ok, err = s.Get(1, "stream/booms") require.NoError(t, err) diff --git a/pkg/services/live/pipeline/subscribe_authorize_role.go b/pkg/services/live/pipeline/subscribe_authorize_role.go deleted file mode 100644 index 069825b15d6..00000000000 --- a/pkg/services/live/pipeline/subscribe_authorize_role.go +++ /dev/null @@ -1,39 +0,0 @@ -package pipeline - -import ( - "context" - - "github.com/grafana/grafana/pkg/services/live/livecontext" - - "github.com/grafana/grafana-plugin-sdk-go/backend" - "github.com/grafana/grafana/pkg/models" -) - -type AuthorizeRoleSubscriberConfig struct { - Role models.RoleType `json:"role,omitempty"` -} - -type AuthorizeRoleSubscriber struct { - config AuthorizeRoleSubscriberConfig -} - -func NewAuthorizeRoleSubscriber(config AuthorizeRoleSubscriberConfig) *AuthorizeRoleSubscriber { - return &AuthorizeRoleSubscriber{config: config} -} - -const SubscriberTypeAuthorizeRole = "authorizeRole" - -func (s *AuthorizeRoleSubscriber) Type() string { - return SubscriberTypeAuthorizeRole -} - -func (s *AuthorizeRoleSubscriber) Subscribe(ctx context.Context, _ Vars) (models.SubscribeReply, backend.SubscribeStreamStatus, error) { - u, ok := livecontext.GetContextSignedUser(ctx) - if !ok { - return models.SubscribeReply{}, backend.SubscribeStreamStatusPermissionDenied, nil - } - if u.HasRole(s.config.Role) { - return models.SubscribeReply{}, backend.SubscribeStreamStatusOK, nil - } - return models.SubscribeReply{}, backend.SubscribeStreamStatusPermissionDenied, nil -} diff --git a/pkg/services/live/pipeline/subscribe_authorize_role_test.go b/pkg/services/live/pipeline/subscribe_authorize_role_test.go deleted file mode 100644 index 1d87d396e20..00000000000 --- a/pkg/services/live/pipeline/subscribe_authorize_role_test.go +++ /dev/null @@ -1,34 +0,0 @@ -package pipeline - -import ( - "context" - "testing" - - "github.com/grafana/grafana/pkg/models" - "github.com/grafana/grafana/pkg/services/live/livecontext" - - "github.com/grafana/grafana-plugin-sdk-go/backend" - "github.com/stretchr/testify/require" -) - -func TestAuthorizeRoleSubscriber_Subscribe_PermissionDenied(t *testing.T) { - ctx := context.Background() - ctx = livecontext.SetContextSignedUser(ctx, &models.SignedInUser{OrgRole: models.ROLE_EDITOR}) - s := NewAuthorizeRoleSubscriber(AuthorizeRoleSubscriberConfig{ - Role: models.ROLE_ADMIN, - }) - _, status, err := s.Subscribe(ctx, Vars{}) - require.NoError(t, err) - require.Equal(t, backend.SubscribeStreamStatusPermissionDenied, status) -} - -func TestAuthorizeRoleSubscriber_Subscribe_OK(t *testing.T) { - ctx := context.Background() - ctx = livecontext.SetContextSignedUser(ctx, &models.SignedInUser{OrgRole: models.ROLE_ADMIN}) - s := NewAuthorizeRoleSubscriber(AuthorizeRoleSubscriberConfig{ - Role: models.ROLE_ADMIN, - }) - _, status, err := s.Subscribe(ctx, Vars{}) - require.NoError(t, err) - require.Equal(t, backend.SubscribeStreamStatusOK, status) -}