Files
grafana/pkg/tsdb/cloudwatch/cloudwatch.go
Will Browne b80fbe03f0 Plugins: Refactor Plugin Management (#40477)
* add core plugin flow

* add instrumentation

* move func

* remove cruft

* support external backend plugins

* refactor + clean up

* remove comments

* refactor loader

* simplify core plugin path arg

* cleanup loggers

* move signature validator to plugins package

* fix sig packaging

* cleanup plugin model

* remove unnecessary plugin field

* add start+stop for pm

* fix failures

* add decommissioned state

* export fields just to get things flowing

* fix comments

* set static routes

* make image loading idempotent

* merge with backend plugin manager

* re-use funcs

* reorder imports + remove unnecessary interface

* add some TODOs + remove unused func

* remove unused instrumentation func

* simplify client usage

* remove import alias

* re-use backendplugin.Plugin interface

* re order funcs

* improve var name

* fix log statements

* refactor data model

* add logic for dupe check during loading

* cleanup state setting

* refactor loader

* cleanup manager interface

* add rendering flow

* refactor loading + init

* add renderer support

* fix renderer plugin

* reformat imports

* track errors

* fix plugin signature inheritance

* name param in interface

* update func comment

* fix func arg name

* introduce class concept

* remove func

* fix external plugin check

* apply changes from pm-experiment

* fix core plugins

* fix imports

* rename interface

* comment API interface

* add support for testdata plugin

* enable alerting + use correct core plugin contracts

* slim manager API

* fix param name

* fix filter

* support static routes

* fix rendering

* tidy rendering

* get tests compiling

* fix install+uninstall

* start finder test

* add finder test coverage

* start loader tests

* add test for core plugins

* load core + bundled test

* add test for nested plugin loading

* add test files

* clean interface + fix registering some core plugins

* refactoring

* reformat and create sub packages

* simplify core plugin init

* fix ctx cancel scenario

* migrate initializer

* remove Init() funcs

* add test starter

* new logger

* flesh out initializer tests

* refactoring

* remove unused svc

* refactor rendering flow

* fixup loader tests

* add enabled helper func

* fix logger name

* fix data fetchers

* fix case where plugin dir doesn't exist

* improve coverage + move dupe checking to loader

* remove noisy debug logs

* register core plugins automagically

* add support for renderer in catalog

* make private func + fix req validation

* use interface

* re-add check for renderer in catalog

* tidy up from moving to auto reg core plugins

* core plugin registrar

* guards

* copy over core plugins for test infra

* all tests green

* renames

* propagate new interfaces

* kill old manager

* get compiling

* tidy up

* update naming

* refactor manager test + cleanup

* add more cases to finder test

* migrate validator to field

* more coverage

* refactor dupe checking

* add test for plugin class

* add coverage for initializer

* split out rendering

* move

* fixup tests

* fix uss test

* fix frontend settings

* fix grafanads test

* add check when checking sig errors

* fix enabled map

* fixup

* allow manual setup of CM

* rename to cloud-monitoring

* remove TODO

* add installer interface for testing

* loader interface returns

* tests passing

* refactor + add more coverage

* support 'stackdriver'

* fix frontend settings loading

* improve naming based on package name

* small tidy

* refactor test

* fix renderer start

* make cloud-monitoring plugin ID clearer

* add plugin update test

* add integration tests

* don't break all if sig can't be calculated

* add root URL check test

* add more signature verification tests

* update DTO name

* update enabled plugins comment

* update comments

* fix linter

* revert fe naming change

* fix errors endpoint

* reset error code field name

* re-order test to help verify

* assert -> require

* pm check

* add missing entry + re-order

* re-check

* dump icon log

* verify manager contents first

* reformat

* apply PR feedback

* apply style changes

* fix one vs all loading err

* improve log output

* only start when no signature error

* move log

* rework plugin update check

* fix test

* fix multi loading from cfg.PluginSettings

* improve log output #2

* add error abstraction to capture errors without registering a plugin

* add debug log

* add unsigned warning

* e2e test attempt

* fix logger

* set home path

* prevent panic

* alternate

* ugh.. fix home path

* return renderer even if not started

* make renderer plugin managed

* add fallback renderer icon, update renderer badge + prevent changes when renderer is installed

* fix icon loading

* rollback renderer changes

* use correct field

* remove unnecessary block

* remove newline

* remove unused func

* fix bundled plugins base + module fields

* remove unused field since refactor

* add authorizer abstraction

* loader only returns plugins expected to run

* fix multi log output
2021-11-01 10:53:33 +01:00

416 lines
12 KiB
Go

package cloudwatch
import (
"context"
"encoding/json"
"fmt"
"regexp"
"time"
"github.com/aws/aws-sdk-go/aws/client"
"github.com/aws/aws-sdk-go/aws/request"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/cloudwatch"
"github.com/aws/aws-sdk-go/service/cloudwatch/cloudwatchiface"
"github.com/aws/aws-sdk-go/service/cloudwatchlogs"
"github.com/aws/aws-sdk-go/service/cloudwatchlogs/cloudwatchlogsiface"
"github.com/aws/aws-sdk-go/service/ec2"
"github.com/aws/aws-sdk-go/service/ec2/ec2iface"
"github.com/aws/aws-sdk-go/service/resourcegroupstaggingapi"
"github.com/aws/aws-sdk-go/service/resourcegroupstaggingapi/resourcegroupstaggingapiiface"
"github.com/grafana/grafana-aws-sdk/pkg/awsds"
"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana-plugin-sdk-go/backend/datasource"
"github.com/grafana/grafana-plugin-sdk-go/backend/instancemgmt"
"github.com/grafana/grafana-plugin-sdk-go/data"
"github.com/grafana/grafana/pkg/components/simplejson"
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/plugins"
"github.com/grafana/grafana/pkg/plugins/backendplugin/coreplugin"
"github.com/grafana/grafana/pkg/setting"
)
// datasourceInfo holds the per-datasource settings decoded by the factory
// returned from NewInstanceSettings. It is the value that getDSInfo expects
// to read back from the instance manager.
type datasourceInfo struct {
// profile comes from the "profile" JSON setting, falling back to the legacy
// settings.Database value when empty; it is passed to the session cache as Profile.
profile string
// region comes from the "defaultRegion" JSON setting; newSession substitutes it
// when a caller asks for the placeholder region "default".
region string
// authType is derived from the "authType" JSON setting.
authType awsds.AuthType
// assumeRoleARN comes from the "assumeRoleArn" JSON setting.
assumeRoleARN string
// externalID comes from the "externalId" JSON setting.
externalID string
// namespace comes from the "customMetricsNamespaces" JSON setting.
namespace string
// endpoint comes from the "endpoint" JSON setting.
endpoint string
// accessKey is read from the decrypted secure JSON data under "accessKey".
accessKey string
// secretKey is read from the decrypted secure JSON data under "secretKey".
secretKey string
// datasourceID is the ID of the datasource instance settings.
datasourceID int64
}
const (
	// cloudWatchTSFormat is a Go time layout with millisecond precision.
	cloudWatchTSFormat = "2006-01-02 15:04:05.000"

	// defaultRegion is the placeholder region name; newSession swaps it for
	// the region configured on the datasource.
	defaultRegion = "default"

	// Constants also defined in datasource/cloudwatch/datasource.ts
	logIdentifierInternal       = "__log__grafana_internal__"
	logStreamIdentifierInternal = "__logstream__grafana_internal__"
)
var (
	// plog is the package-level logger, created with the name "tsdb.cloudwatch".
	plog = log.New("tsdb.cloudwatch")

	// aliasFormat matches double-curly-brace placeholders such as "{{ name }}",
	// capturing the inner text without its surrounding whitespace.
	aliasFormat = regexp.MustCompile(`\{\{\s*(.+?)\s*\}\}`)
)
// ProvideService builds the CloudWatch executor, wraps it in a core plugin
// factory as the query-data handler, and registers that factory under the
// plugin ID "cloudwatch".
//
// It returns the assembled CloudWatchService, or the registration error
// (which is also logged) if the registrar rejects the plugin.
func ProvideService(cfg *setting.Cfg, logsService *LogsService, registrar plugins.CoreBackendRegistrar) (*CloudWatchService, error) {
	plog.Debug("initing")

	instanceManager := datasource.NewInstanceManager(NewInstanceSettings())
	executor := newExecutor(logsService, instanceManager, cfg, awsds.NewSessionCache())

	serveOpts := backend.ServeOpts{QueryDataHandler: executor}
	err := registrar.LoadAndRegister("cloudwatch", coreplugin.New(serveOpts))
	if err != nil {
		plog.Error("Failed to register plugin", "error", err)
		return nil, err
	}

	service := &CloudWatchService{
		LogsService: logsService,
		Cfg:         cfg,
		Executor:    executor,
	}
	return service, nil
}
// CloudWatchService bundles the dependencies wired together by ProvideService.
type CloudWatchService struct {
// LogsService is the logs service handed to ProvideService and shared with the executor.
LogsService *LogsService
// Cfg is the Grafana configuration handed to ProvideService.
Cfg *setting.Cfg
// Executor is the query executor that ProvideService registered as the plugin's query-data handler.
Executor *cloudWatchExecutor
}
// SessionCache is the source of AWS sessions used by the executor.
// ProvideService supplies the value returned by awsds.NewSessionCache();
// the interface lets callers of newExecutor substitute another implementation.
type SessionCache interface {
// GetSession returns a session for the given region and datasource settings.
GetSession(region string, s awsds.AWSDatasourceSettings) (*session.Session, error)
}
// newExecutor assembles a cloudWatchExecutor from its four dependencies:
// the logs service, the datasource instance manager, the Grafana config and
// the AWS session cache.
func newExecutor(logsService *LogsService, im instancemgmt.InstanceManager, cfg *setting.Cfg, sessions SessionCache) *cloudWatchExecutor {
	executor := new(cloudWatchExecutor)
	executor.logsService = logsService
	executor.im = im
	executor.cfg = cfg
	executor.sessions = sessions
	return executor
}
// NewInstanceSettings returns the instance factory used by the datasource
// instance manager. The factory decodes a datasource's JSON settings and
// decrypted secrets into a datasourceInfo value.
//
// The factory returns an error only when the JSON settings cannot be
// unmarshalled. An unknown or deprecated "authType" is logged as a warning
// and treated as the default authentication type.
func NewInstanceSettings() datasource.InstanceFactoryFunc {
	return func(settings backend.DataSourceInstanceSettings) (instancemgmt.Instance, error) {
		var raw struct {
			Profile       string `json:"profile"`
			Region        string `json:"defaultRegion"`
			AssumeRoleARN string `json:"assumeRoleArn"`
			ExternalID    string `json:"externalId"`
			Endpoint      string `json:"endpoint"`
			Namespace     string `json:"customMetricsNamespaces"`
			AuthType      string `json:"authType"`
		}
		if err := json.Unmarshal(settings.JSONData, &raw); err != nil {
			return nil, fmt.Errorf("error reading settings: %w", err)
		}

		// Start from the default and override only for the recognized types.
		authType := awsds.AuthTypeDefault
		switch raw.AuthType {
		case "credentials":
			authType = awsds.AuthTypeSharedCreds
		case "keys":
			authType = awsds.AuthTypeKeys
		case "ec2_iam_role":
			authType = awsds.AuthTypeEC2IAMRole
		case "default":
			// Already the default; nothing to change.
		case "arn":
			plog.Warn("Authentication type \"arn\" is deprecated, falling back to default")
		default:
			plog.Warn("Unrecognized AWS authentication type", "type", raw.AuthType)
		}

		profile := raw.Profile
		if profile == "" {
			profile = settings.Database // legacy support
		}

		secrets := settings.DecryptedSecureJSONData
		info := datasourceInfo{
			profile:       profile,
			region:        raw.Region,
			authType:      authType,
			assumeRoleARN: raw.AssumeRoleARN,
			externalID:    raw.ExternalID,
			namespace:     raw.Namespace,
			endpoint:      raw.Endpoint,
			accessKey:     secrets["accessKey"],
			secretKey:     secrets["secretKey"],
			datasourceID:  settings.ID,
		}
		return info, nil
	}
}
// cloudWatchExecutor executes CloudWatch requests.
//
// It is constructed by newExecutor and is registered by ProvideService as
// the plugin's query-data handler.
type cloudWatchExecutor struct {
// logsService is the logs service supplied to newExecutor.
logsService *LogsService
// im resolves a plugin context to the datasource instance (see getDSInfo).
im instancemgmt.InstanceManager
// cfg is the Grafana configuration supplied to newExecutor.
cfg *setting.Cfg
// sessions provides AWS sessions (see newSession).
sessions SessionCache
}
// newSession obtains an AWS session for the datasource identified by
// pluginCtx. When region equals the placeholder defaultRegion, the
// datasource's configured region is used instead.
//
// It returns the error from getDSInfo or from the session cache unchanged.
func (e *cloudWatchExecutor) newSession(region string, pluginCtx backend.PluginContext) (*session.Session, error) {
	info, err := e.getDSInfo(pluginCtx)
	if err != nil {
		return nil, err
	}

	effectiveRegion := region
	if effectiveRegion == defaultRegion {
		effectiveRegion = info.region
	}

	awsSettings := awsds.AWSDatasourceSettings{
		Profile:       info.profile,
		Region:        effectiveRegion,
		AuthType:      info.authType,
		AssumeRoleARN: info.assumeRoleARN,
		ExternalID:    info.externalID,
		Endpoint:      info.endpoint,
		DefaultRegion: info.region,
		AccessKey:     info.accessKey,
		SecretKey:     info.secretKey,
	}
	return e.sessions.GetSession(effectiveRegion, awsSettings)
}
// getCWClient creates a CloudWatch client for the given region, using a
// session obtained from newSession and the NewCWClient factory.
func (e *cloudWatchExecutor) getCWClient(region string, pluginCtx backend.PluginContext) (cloudwatchiface.CloudWatchAPI, error) {
	awsSession, err := e.newSession(region, pluginCtx)
	if err != nil {
		return nil, err
	}

	cwClient := NewCWClient(awsSession)
	return cwClient, nil
}
// getCWLogsClient creates a CloudWatch Logs client for the given region,
// using a session obtained from newSession and the NewCWLogsClient factory.
func (e *cloudWatchExecutor) getCWLogsClient(region string, pluginCtx backend.PluginContext) (cloudwatchlogsiface.CloudWatchLogsAPI, error) {
	awsSession, err := e.newSession(region, pluginCtx)
	if err != nil {
		return nil, err
	}
	return NewCWLogsClient(awsSession), nil
}
// getEC2Client creates an EC2 client for the given region, using a session
// obtained from newSession and the newEC2Client factory.
func (e *cloudWatchExecutor) getEC2Client(region string, pluginCtx backend.PluginContext) (ec2iface.EC2API, error) {
	awsSession, err := e.newSession(region, pluginCtx)
	if err != nil {
		return nil, err
	}

	ec2Client := newEC2Client(awsSession)
	return ec2Client, nil
}
// getRGTAClient creates a Resource Groups Tagging API client for the given
// region, using a session obtained from newSession and the newRGTAClient
// factory.
func (e *cloudWatchExecutor) getRGTAClient(region string, pluginCtx backend.PluginContext) (resourcegroupstaggingapiiface.ResourceGroupsTaggingAPIAPI,
	error) {
	awsSession, err := e.newSession(region, pluginCtx)
	if err != nil {
		return nil, err
	}

	taggingClient := newRGTAClient(awsSession)
	return taggingClient, nil
}
// alertQuery starts a CloudWatch Logs query and polls for its results until
// the query reaches a terminal status (see isTerminated).
//
// Polling happens once per pollPeriod, at most maxAttempts times. It returns:
//   - the results and a nil error once the query has terminated;
//   - the last (possibly partial) results and an error when maxAttempts is
//     exhausted;
//   - nil and the underlying error when starting or fetching fails;
//   - nil and ctx.Err() when ctx is cancelled while waiting for the next poll.
func (e *cloudWatchExecutor) alertQuery(ctx context.Context, logsClient cloudwatchlogsiface.CloudWatchLogsAPI,
	queryContext backend.DataQuery, model *simplejson.Json) (*cloudwatchlogs.GetQueryResultsOutput, error) {
	const maxAttempts = 8
	const pollPeriod = 1000 * time.Millisecond

	startQueryOutput, err := e.executeStartQuery(ctx, logsClient, model, queryContext.TimeRange)
	if err != nil {
		return nil, err
	}
	// Guard the pointer dereference below: a response without a query ID
	// cannot be polled.
	if startQueryOutput == nil || startQueryOutput.QueryId == nil {
		return nil, fmt.Errorf("start query response did not contain a query ID")
	}

	requestParams := simplejson.NewFromAny(map[string]interface{}{
		"region":  model.Get("region").MustString(""),
		"queryId": *startQueryOutput.QueryId,
	})

	ticker := time.NewTicker(pollPeriod)
	defer ticker.Stop()

	for attempt := 1; ; attempt++ {
		// Wait for the next tick, but give up immediately if the caller
		// cancels instead of sleeping out the rest of the poll period.
		select {
		case <-ctx.Done():
			return nil, ctx.Err()
		case <-ticker.C:
		}

		res, err := e.executeGetQueryResults(ctx, logsClient, requestParams)
		if err != nil {
			return nil, err
		}

		// A missing status is treated as "not finished yet" rather than
		// dereferencing a nil pointer.
		if res != nil && res.Status != nil && isTerminated(*res.Status) {
			return res, nil
		}

		if attempt >= maxAttempts {
			return res, fmt.Errorf("fetching of query results exceeded max number of attempts")
		}
	}
}
// QueryData dispatches a query-data request to the matching executor method.
//
// The first query in the request decides the route: a request carrying the
// "FromAlert" header whose first query has queryMode "Logs" goes to
// executeLogAlertQuery; otherwise the first query's "type" field selects the
// handler, with time-series queries as the fallback for unknown or missing
// types.
//
// It returns an error when the request contains no queries, when the first
// query's JSON cannot be parsed, or when the selected handler fails.
func (e *cloudWatchExecutor) QueryData(ctx context.Context, req *backend.QueryDataRequest) (*backend.QueryDataResponse, error) {
	/*
		Unlike many other data sources, with Cloudwatch Logs query requests don't receive the results as the response
		to the query, but rather an ID is first returned. Following this, a client is expected to send requests along
		with the ID until the status of the query is complete, receiving (possibly partial) results each time. For
		queries made via dashboards and Explore, the logic of making these repeated queries is handled on the
		frontend, but because alerts are executed on the backend the logic needs to be reimplemented here.
	*/

	// Routing inspects Queries[0]; reject empty requests instead of panicking
	// on the index below.
	if req == nil || len(req.Queries) == 0 {
		return nil, fmt.Errorf("request contains no queries")
	}

	q := req.Queries[0]
	model, err := simplejson.NewJson(q.JSON)
	if err != nil {
		return nil, err
	}

	_, fromAlert := req.Headers["FromAlert"]
	isLogAlertQuery := fromAlert && model.Get("queryMode").MustString("") == "Logs"
	if isLogAlertQuery {
		return e.executeLogAlertQuery(ctx, req)
	}

	switch model.Get("type").MustString("") {
	case "metricFindQuery":
		return e.executeMetricFindQuery(ctx, model, q, req.PluginContext)
	case "annotationQuery":
		return e.executeAnnotationQuery(ctx, model, q, req.PluginContext)
	case "logAction":
		return e.executeLogActions(ctx, req)
	case "liveLogAction":
		return e.executeLiveLogQuery(ctx, req)
	default:
		// Covers "timeSeriesQuery" as well as unknown or missing types.
		return e.executeTimeSeriesQuery(ctx, req)
	}
}
// executeLogAlertQuery runs every query in req as a CloudWatch Logs query on
// behalf of an alert and returns the resulting frames.
//
// For each query it copies "expression" into "queryString", resolves the
// placeholder region to the datasource's region, lets alertQuery start the
// query and poll it to completion, converts the results to a data frame and,
// when "statsGroups" is set and the frame has fields, groups the frame by
// those fields.
//
// Results are stored under the query's RefID ("A" when the RefID is empty).
// A query whose JSON cannot be parsed is logged and skipped; any other
// failure aborts the whole request with that error.
func (e *cloudWatchExecutor) executeLogAlertQuery(ctx context.Context, req *backend.QueryDataRequest) (*backend.QueryDataResponse, error) {
	resp := backend.NewQueryDataResponse()

	for _, q := range req.Queries {
		model, err := simplejson.NewJson(q.JSON)
		if err != nil {
			// Best-effort: skip the broken query but leave a trace of why.
			plog.Warn("Skipping log alert query with invalid JSON", "refId", q.RefID, "error", err)
			continue
		}

		model.Set("subtype", "StartQuery")
		model.Set("queryString", model.Get("expression").MustString(""))

		region := model.Get("region").MustString(defaultRegion)
		if region == defaultRegion {
			dsInfo, err := e.getDSInfo(req.PluginContext)
			if err != nil {
				return nil, err
			}
			model.Set("region", dsInfo.region)
		}

		logsClient, err := e.getCWLogsClient(region, req.PluginContext)
		if err != nil {
			return nil, err
		}

		// alertQuery starts the query itself before polling, so it must not
		// be started here as well: that would launch a second, discarded
		// query on every alert evaluation.
		getQueryResultsOutput, err := e.alertQuery(ctx, logsClient, q, model)
		if err != nil {
			return nil, err
		}

		dataframe, err := logsResultsToDataframes(getQueryResultsOutput)
		if err != nil {
			return nil, err
		}

		var frames []*data.Frame
		statsGroups := model.Get("statsGroups").MustStringArray()
		if len(statsGroups) > 0 && len(dataframe.Fields) > 0 {
			frames, err = groupResults(dataframe, statsGroups)
			if err != nil {
				return nil, err
			}
		} else {
			frames = data.Frames{dataframe}
		}

		// Key the response by the query's own RefID so several queries do not
		// overwrite each other; keep the historical "A" when no RefID is set.
		refID := q.RefID
		if refID == "" {
			refID = "A"
		}
		respD := resp.Responses[refID]
		respD.Frames = frames
		resp.Responses[refID] = respD
	}

	return resp, nil
}
// getDSInfo looks up the datasource instance for pluginCtx in the instance
// manager and returns a pointer to a copy of its datasourceInfo.
//
// It returns the instance manager's error unchanged, or an error naming the
// actual type when the stored instance is not a datasourceInfo value.
func (e *cloudWatchExecutor) getDSInfo(pluginCtx backend.PluginContext) (*datasourceInfo, error) {
	i, err := e.im.Get(pluginCtx)
	if err != nil {
		return nil, err
	}

	// Checked assertion: an instance manager built with a different factory
	// yields an error here instead of a panic.
	instance, ok := i.(datasourceInfo)
	if !ok {
		return nil, fmt.Errorf("unexpected datasource instance type %T", i)
	}

	return &instance, nil
}
// isTerminated reports whether queryStatus is one of the terminal statuses
// "Complete", "Cancelled", "Failed" or "Timeout". The comparison is exact
// and case-sensitive.
func isTerminated(queryStatus string) bool {
	switch queryStatus {
	case "Complete", "Cancelled", "Failed", "Timeout":
		return true
	default:
		return false
	}
}
// NewCWClient is a CloudWatch client factory.
//
// The returned client has a handler at the front of its Send handler list
// that sets the User-Agent header of each outgoing request to
// "Grafana/<build version>".
//
// Stubbable by tests.
var NewCWClient = func(sess *session.Session) cloudwatchiface.CloudWatchAPI {
	cwClient := cloudwatch.New(sess)

	setUserAgent := func(r *request.Request) {
		userAgent := fmt.Sprintf("Grafana/%s", setting.BuildVersion)
		r.HTTPRequest.Header.Set("User-Agent", userAgent)
	}
	cwClient.Handlers.Send.PushFront(setUserAgent)

	return cwClient
}
// NewCWLogsClient is a CloudWatch logs client factory.
//
// The returned client has a handler at the front of its Send handler list
// that sets the User-Agent header of each outgoing request to
// "Grafana/<build version>".
//
// Stubbable by tests.
var NewCWLogsClient = func(sess *session.Session) cloudwatchlogsiface.CloudWatchLogsAPI {
	logsClient := cloudwatchlogs.New(sess)

	setUserAgent := func(r *request.Request) {
		userAgent := fmt.Sprintf("Grafana/%s", setting.BuildVersion)
		r.HTTPRequest.Header.Set("User-Agent", userAgent)
	}
	logsClient.Handlers.Send.PushFront(setUserAgent)

	return logsClient
}
// newEC2Client is an EC2 client factory; it builds the client from the
// given config provider.
//
// Stubbable by tests.
var newEC2Client = func(provider client.ConfigProvider) ec2iface.EC2API {
	ec2Client := ec2.New(provider)
	return ec2Client
}
// newRGTAClient is a Resource Groups Tagging API (RGTA) client factory; it
// builds the client from the given config provider.
//
// Stubbable by tests.
var newRGTAClient = func(provider client.ConfigProvider) resourcegroupstaggingapiiface.ResourceGroupsTaggingAPIAPI {
	taggingClient := resourcegroupstaggingapi.New(provider)
	return taggingClient
}