grafana/pkg/tsdb/cloudwatch/cloudwatch.go
Erik Sundell 254577ba56 CloudWatch: Cross-account querying support (#59362)
* Lattice: Point to private prerelease of aws-sdk-go (#515)

* point to private prerelease of aws-sdk-go

* fix build issue

* Lattice: Adding a feature toggle (#549)

* Adding a feature toggle for lattice

* Change name of feature toggle

* Lattice: List accounts (#543)

* Separate layers

* Introduce testify/mock library

Co-authored-by: Shirley Leu <4163034+fridgepoet@users.noreply.github.com>

* point to version that includes metric api changes (#574)

* add accounts component (#575)

* Test refactor: remove unneeded clientFactoryMock (#581)

* Lattice: Add monitoring badge (#576)

* add monitoring badge

* fix tests

* solve conflict

* Lattice: Add dynamic label for account display name (#579)

* Build: Automatically sync lattice-main with OSS

* add account label

Co-authored-by: Shirley Leu <4163034+fridgepoet@users.noreply.github.com>
Co-authored-by: Sarah Zinger <sarah.zinger@grafana.com>

* fix import

* solve merge related problem

* add account info (#608)

* add back namespaces handler

* Lattice: Parse account id and return it to frontend (#609)

* parse account id and return to frontend

* fix route test

* only show badge when feature toggle is enabled (#615)

* Lattice: Refactor resource response type and return account (#613)

* refactor resource response type

* remove not used file.

* go lint

* fix tests

* remove commented code

* Lattice: Use account as input when listing metric names and dimensions (#611)

* use account in resource requests

* add account to response

* revert accountInfo to accountId

* PR feedback

* unit test account in list metrics response

* remove not used asserts

* don't assert on response that is not relevant to the test

* removed dupe test

* pr feedback

* rename request package (#626)

* Lattice: Move account component and add tooltip (#630)

* move accounts component to the top of metric stat editor

* add tooltip

* CloudWatch: add account to GetMetricData queries (#627)

* Add AccountId to metric stat query

* Lattice: Account variable support  (#625)

* add variable support in accounts component

* add account variable query type

* update variables

* interpolate variable before its sent to backend

* handle variable change in hooks

* remove not used import

* Update public/app/plugins/datasource/cloudwatch/components/Account.tsx

Co-authored-by: Sarah Zinger <sarah.zinger@grafana.com>

* Update public/app/plugins/datasource/cloudwatch/hooks.ts

Co-authored-by: Sarah Zinger <sarah.zinger@grafana.com>

* add one more unit test

Co-authored-by: Sarah Zinger <sarah.zinger@grafana.com>

* cleanup (#629)

* Set account Id according to crossAccountQuerying feature flag in backend (#632)

* CloudWatch: Change spelling of feature-toggle (#634)

* Lattice Logs (#631)

* Lattice Logs

* Fixes after CR

* Lattice: Bug: fix dimension keys request (#644)

* fix dimension keys

* fix lint

* more lint

* CloudWatch: Add tests for QueryData with AccountId (#637)

* Update from breaking change (#645)

* Update from breaking change

* Remove extra interface and methods

Co-authored-by: Shirley Leu <4163034+fridgepoet@users.noreply.github.com>

* CloudWatch: Add business logic layer for getting log groups (#642)



Co-authored-by: Sarah Zinger <sarah.zinger@grafana.com>

* Lattice: Fix - unset account id in region change handler (#646)

* move reset of account to region change handler

* fix broken test

* Lattice: Add account id to metric stat query deep link (#656)

add account id to metric stat link

* CloudWatch: Add new log groups handler for cross-account querying (#643)

* Lattice: Add feature tracking (#660)

* add tracking for account id prescense in metrics query

* also check feature toggle

* fix broken test

* CloudWatch: Add route for DescribeLogGroups for cross-account querying (#647)

Co-authored-by: Erik Sundell <erik.sundell87@gmail.com>

* Lattice: Handle account id default value (#662)

* make sure right type is returned

* set right default values

* Suggestions to lattice changes (#663)

* Change ListMetricsWithPageLimit response to slice of non-pointers

* Change GetAccountsForCurrentUserOrRole response to be not pointer

* Clean test Cleanup calls in test

* Remove CloudWatchAPI as part of mock

* Resolve conflicts

* Add Latest SDK (#672)

* add tooltip (#674)

* Docs: Add documentation for CloudWatch cross account querying (#676)

* wip docs

* change wordings

* add sections about metrics and logs

* change from monitoring to observability

* Update docs/sources/datasources/aws-cloudwatch/_index.md

Co-authored-by: Sarah Zinger <sarah.zinger@grafana.com>

* Update docs/sources/datasources/aws-cloudwatch/query-editor/index.md

Co-authored-by: Fiona Artiaga <89225282+GrafanaWriter@users.noreply.github.com>

* Update docs/sources/datasources/aws-cloudwatch/query-editor/index.md

Co-authored-by: Fiona Artiaga <89225282+GrafanaWriter@users.noreply.github.com>

* Update docs/sources/datasources/aws-cloudwatch/query-editor/index.md

Co-authored-by: Sarah Zinger <sarah.zinger@grafana.com>

* Update docs/sources/datasources/aws-cloudwatch/query-editor/index.md

Co-authored-by: Fiona Artiaga <89225282+GrafanaWriter@users.noreply.github.com>

* apply pr feedback

* fix file name

* more pr feedback

* pr feedback

Co-authored-by: Sarah Zinger <sarah.zinger@grafana.com>
Co-authored-by: Fiona Artiaga <89225282+GrafanaWriter@users.noreply.github.com>

* use latest version of the aws-sdk-go

* Fix tests' mock response type

* Remove change in Azure Monitor

Co-authored-by: Sarah Zinger <sarah.zinger@grafana.com>
Co-authored-by: Shirley Leu <4163034+fridgepoet@users.noreply.github.com>
Co-authored-by: Fiona Artiaga <89225282+GrafanaWriter@users.noreply.github.com>
2022-11-28 12:39:12 +01:00

package cloudwatch

import (
	"context"
	"encoding/json"
	"fmt"
	"net/http"
	"regexp"
	"time"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/client"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/cloudwatch"
	"github.com/aws/aws-sdk-go/service/cloudwatch/cloudwatchiface"
	"github.com/aws/aws-sdk-go/service/cloudwatchlogs"
	"github.com/aws/aws-sdk-go/service/cloudwatchlogs/cloudwatchlogsiface"
	"github.com/aws/aws-sdk-go/service/ec2"
	"github.com/aws/aws-sdk-go/service/ec2/ec2iface"
	"github.com/aws/aws-sdk-go/service/oam"
	"github.com/aws/aws-sdk-go/service/resourcegroupstaggingapi"
	"github.com/aws/aws-sdk-go/service/resourcegroupstaggingapi/resourcegroupstaggingapiiface"
	"github.com/grafana/grafana-aws-sdk/pkg/awsds"
	"github.com/grafana/grafana-plugin-sdk-go/backend"
	"github.com/grafana/grafana-plugin-sdk-go/backend/datasource"
	"github.com/grafana/grafana-plugin-sdk-go/backend/instancemgmt"
	"github.com/grafana/grafana-plugin-sdk-go/backend/resource/httpadapter"
	"github.com/grafana/grafana-plugin-sdk-go/data"

	"github.com/grafana/grafana/pkg/infra/httpclient"
	"github.com/grafana/grafana/pkg/infra/log"
	"github.com/grafana/grafana/pkg/services/featuremgmt"
	"github.com/grafana/grafana/pkg/setting"
	"github.com/grafana/grafana/pkg/tsdb/cloudwatch/clients"
	"github.com/grafana/grafana/pkg/tsdb/cloudwatch/models"
)

type DataQueryJson struct {
	QueryType       string `json:"type,omitempty"`
	QueryMode       string
	PrefixMatching  bool
	Region          string
	Namespace       string
	MetricName      string
	Dimensions      map[string]interface{}
	Statistic       *string
	Period          string
	ActionPrefix    string
	AlarmNamePrefix string
}

type DataSource struct {
	Settings   models.CloudWatchSettings
	HTTPClient *http.Client
}

const (
	cloudWatchTSFormat = "2006-01-02 15:04:05.000"
	defaultRegion      = "default"

	// Constants also defined in datasource/cloudwatch/datasource.ts
	logIdentifierInternal       = "__log__grafana_internal__"
	logStreamIdentifierInternal = "__logstream__grafana_internal__"

	alertMaxAttempts = 8
	alertPollPeriod  = time.Second
	logsQueryMode    = "Logs"

	// QueryTypes
	annotationQuery = "annotationQuery"
	logAction       = "logAction"
	timeSeriesQuery = "timeSeriesQuery"
)

var logger = log.New("tsdb.cloudwatch")
var aliasFormat = regexp.MustCompile(`\{\{\s*(.+?)\s*\}\}`)

func ProvideService(cfg *setting.Cfg, httpClientProvider httpclient.Provider, features featuremgmt.FeatureToggles) *CloudWatchService {
	logger.Debug("Initializing")

	executor := newExecutor(datasource.NewInstanceManager(NewInstanceSettings(httpClientProvider)), cfg, awsds.NewSessionCache(), features)

	return &CloudWatchService{
		Cfg:      cfg,
		Executor: executor,
	}
}

type CloudWatchService struct {
	Cfg      *setting.Cfg
	Executor *cloudWatchExecutor
}

type SessionCache interface {
	GetSession(c awsds.SessionConfig) (*session.Session, error)
}

func newExecutor(im instancemgmt.InstanceManager, cfg *setting.Cfg, sessions SessionCache, features featuremgmt.FeatureToggles) *cloudWatchExecutor {
	e := &cloudWatchExecutor{
		im:       im,
		cfg:      cfg,
		sessions: sessions,
		features: features,
	}

	e.resourceHandler = httpadapter.New(e.newResourceMux())
	return e
}

func (e *cloudWatchExecutor) getRequestContext(pluginCtx backend.PluginContext, region string) (models.RequestContext, error) {
	// The instance is needed both to resolve the "default" region and to populate
	// Settings below, so a lookup failure must be handled before either use.
	instance, err := e.getInstance(pluginCtx)
	if err != nil {
		return models.RequestContext{}, err
	}

	r := region
	if region == defaultRegion {
		r = instance.Settings.Region
	}

	sess, err := e.newSession(pluginCtx, r)
	if err != nil {
		return models.RequestContext{}, err
	}

	return models.RequestContext{
		OAMClientProvider:     NewOAMAPI(sess),
		MetricsClientProvider: clients.NewMetricsClient(NewMetricsAPI(sess), e.cfg),
		LogsAPIProvider:       NewLogsAPI(sess),
		Settings:              instance.Settings,
		Features:              e.features,
	}, nil
}

func NewInstanceSettings(httpClientProvider httpclient.Provider) datasource.InstanceFactoryFunc {
	return func(settings backend.DataSourceInstanceSettings) (instancemgmt.Instance, error) {
		instanceSettings, err := models.LoadCloudWatchSettings(settings)
		if err != nil {
			return nil, fmt.Errorf("error reading settings: %w", err)
		}

		httpClient, err := httpClientProvider.New()
		if err != nil {
			return nil, fmt.Errorf("error creating http client: %w", err)
		}

		return DataSource{
			Settings:   instanceSettings,
			HTTPClient: httpClient,
		}, nil
	}
}

// cloudWatchExecutor executes CloudWatch requests.
type cloudWatchExecutor struct {
	im              instancemgmt.InstanceManager
	cfg             *setting.Cfg
	sessions        SessionCache
	features        featuremgmt.FeatureToggles
	resourceHandler backend.CallResourceHandler
}

func (e *cloudWatchExecutor) CallResource(ctx context.Context, req *backend.CallResourceRequest, sender backend.CallResourceResponseSender) error {
	return e.resourceHandler.CallResource(ctx, req, sender)
}

func (e *cloudWatchExecutor) checkHealthMetrics(pluginCtx backend.PluginContext) error {
	namespace := "AWS/Billing"
	metric := "EstimatedCharges"
	params := &cloudwatch.ListMetricsInput{
		Namespace:  &namespace,
		MetricName: &metric,
	}

	sess, err := e.newSession(pluginCtx, defaultRegion)
	if err != nil {
		return err
	}
	metricClient := clients.NewMetricsClient(NewMetricsAPI(sess), e.cfg)
	_, err = metricClient.ListMetricsWithPageLimit(params)
	return err
}

func (e *cloudWatchExecutor) checkHealthLogs(pluginCtx backend.PluginContext) error {
	sess, err := e.newSession(pluginCtx, defaultRegion)
	if err != nil {
		return err
	}
	logsClient := NewLogsAPI(sess)
	_, err = logsClient.DescribeLogGroups(&cloudwatchlogs.DescribeLogGroupsInput{Limit: aws.Int64(1)})
	return err
}

func (e *cloudWatchExecutor) CheckHealth(ctx context.Context, req *backend.CheckHealthRequest) (*backend.CheckHealthResult, error) {
	status := backend.HealthStatusOk
	metricsTest := "Successfully queried the CloudWatch metrics API."
	logsTest := "Successfully queried the CloudWatch logs API."

	err := e.checkHealthMetrics(req.PluginContext)
	if err != nil {
		status = backend.HealthStatusError
		metricsTest = fmt.Sprintf("CloudWatch metrics query failed: %s", err.Error())
	}

	err = e.checkHealthLogs(req.PluginContext)
	if err != nil {
		status = backend.HealthStatusError
		logsTest = fmt.Sprintf("CloudWatch logs query failed: %s", err.Error())
	}

	return &backend.CheckHealthResult{
		Status:  status,
		Message: fmt.Sprintf("1. %s\n2. %s", metricsTest, logsTest),
	}, nil
}

func (e *cloudWatchExecutor) newSession(pluginCtx backend.PluginContext, region string) (*session.Session, error) {
	instance, err := e.getInstance(pluginCtx)
	if err != nil {
		return nil, err
	}

	if region == defaultRegion {
		region = instance.Settings.Region
	}

	return e.sessions.GetSession(awsds.SessionConfig{
		// https://github.com/grafana/grafana/issues/46365
		// HTTPClient: dsInfo.HTTPClient,
		Settings: awsds.AWSDatasourceSettings{
			Profile:       instance.Settings.Profile,
			Region:        region,
			AuthType:      instance.Settings.AuthType,
			AssumeRoleARN: instance.Settings.AssumeRoleARN,
			ExternalID:    instance.Settings.ExternalID,
			Endpoint:      instance.Settings.Endpoint,
			DefaultRegion: instance.Settings.Region,
			AccessKey:     instance.Settings.AccessKey,
			SecretKey:     instance.Settings.SecretKey,
		},
		UserAgentName: aws.String("Cloudwatch"),
	})
}

func (e *cloudWatchExecutor) getCWClient(pluginCtx backend.PluginContext, region string) (cloudwatchiface.CloudWatchAPI, error) {
	sess, err := e.newSession(pluginCtx, region)
	if err != nil {
		return nil, err
	}
	return NewCWClient(sess), nil
}

func (e *cloudWatchExecutor) getCWLogsClient(pluginCtx backend.PluginContext, region string) (cloudwatchlogsiface.CloudWatchLogsAPI, error) {
	sess, err := e.newSession(pluginCtx, region)
	if err != nil {
		return nil, err
	}

	logsClient := NewCWLogsClient(sess)

	return logsClient, nil
}

func (e *cloudWatchExecutor) getEC2Client(pluginCtx backend.PluginContext, region string) (ec2iface.EC2API, error) {
	sess, err := e.newSession(pluginCtx, region)
	if err != nil {
		return nil, err
	}

	return newEC2Client(sess), nil
}

func (e *cloudWatchExecutor) getRGTAClient(pluginCtx backend.PluginContext, region string) (resourcegroupstaggingapiiface.ResourceGroupsTaggingAPIAPI,
	error) {
	sess, err := e.newSession(pluginCtx, region)
	if err != nil {
		return nil, err
	}

	return newRGTAClient(sess), nil
}

func (e *cloudWatchExecutor) alertQuery(ctx context.Context, logsClient cloudwatchlogsiface.CloudWatchLogsAPI,
	queryContext backend.DataQuery, model LogQueryJson) (*cloudwatchlogs.GetQueryResultsOutput, error) {
	startQueryOutput, err := e.executeStartQuery(ctx, logsClient, model, queryContext.TimeRange)
	if err != nil {
		return nil, err
	}

	requestParams := LogQueryJson{
		Region:  model.Region,
		QueryId: *startQueryOutput.QueryId,
	}

	ticker := time.NewTicker(alertPollPeriod)
	defer ticker.Stop()

	attemptCount := 1
	for range ticker.C {
		res, err := e.executeGetQueryResults(ctx, logsClient, requestParams)
		if err != nil {
			return nil, err
		}
		if isTerminated(*res.Status) {
			return res, err
		}
		if attemptCount >= alertMaxAttempts {
			return res, fmt.Errorf("fetching of query results exceeded max number of attempts")
		}

		attemptCount++
	}

	return nil, nil
}

func (e *cloudWatchExecutor) QueryData(ctx context.Context, req *backend.QueryDataRequest) (*backend.QueryDataResponse, error) {
	logger := logger.FromContext(ctx)
	/*
		Unlike many other data sources, CloudWatch Logs does not return query results in the response to the
		query request. Instead, it first returns a query ID; the client must then keep sending requests with
		that ID until the query status is complete, receiving (possibly partial) results each time. For
		queries made from dashboards and Explore, this polling is handled on the frontend, but because alerts
		are executed on the backend, the logic has to be reimplemented here.
	*/
	q := req.Queries[0]
	var model DataQueryJson
	err := json.Unmarshal(q.JSON, &model)
	if err != nil {
		return nil, err
	}

	_, fromAlert := req.Headers["FromAlert"]
	isLogAlertQuery := fromAlert && model.QueryMode == logsQueryMode

	if isLogAlertQuery {
		return e.executeLogAlertQuery(ctx, req)
	}

	var result *backend.QueryDataResponse
	switch model.QueryType {
	case annotationQuery:
		result, err = e.executeAnnotationQuery(req.PluginContext, model, q)
	case logAction:
		result, err = e.executeLogActions(ctx, logger, req)
	case timeSeriesQuery:
		fallthrough
	default:
		result, err = e.executeTimeSeriesQuery(ctx, logger, req)
	}

	return result, err
}

func (e *cloudWatchExecutor) executeLogAlertQuery(ctx context.Context, req *backend.QueryDataRequest) (*backend.QueryDataResponse, error) {
	resp := backend.NewQueryDataResponse()

	for _, q := range req.Queries {
		var model LogQueryJson
		err := json.Unmarshal(q.JSON, &model)
		if err != nil {
			continue
		}

		model.Subtype = "StartQuery"
		model.QueryString = model.Expression

		// Resolve an empty or "default" region to the data source's configured region,
		// and use the resolved value both in the query model and for the logs client.
		if model.Region == "" || model.Region == defaultRegion {
			instance, err := e.getInstance(req.PluginContext)
			if err != nil {
				return nil, err
			}
			model.Region = instance.Settings.Region
		}

		logsClient, err := e.getCWLogsClient(req.PluginContext, model.Region)
		if err != nil {
			return nil, err
		}

		getQueryResultsOutput, err := e.alertQuery(ctx, logsClient, q, model)
		if err != nil {
			return nil, err
		}

		dataframe, err := logsResultsToDataframes(getQueryResultsOutput)
		if err != nil {
			return nil, err
		}

		var frames []*data.Frame
		if len(model.StatsGroups) > 0 && len(dataframe.Fields) > 0 {
			frames, err = groupResults(dataframe, model.StatsGroups)
			if err != nil {
				return nil, err
			}
		} else {
			frames = data.Frames{dataframe}
		}

		// Key the response by the query's RefID so that multiple alert queries
		// do not overwrite each other's frames.
		respD := resp.Responses[q.RefID]
		respD.Frames = frames
		resp.Responses[q.RefID] = respD
	}

	return resp, nil
}

func (e *cloudWatchExecutor) getInstance(pluginCtx backend.PluginContext) (*DataSource, error) {
	i, err := e.im.Get(pluginCtx)
	if err != nil {
		return nil, err
	}

	instance := i.(DataSource)
	return &instance, nil
}

func isTerminated(queryStatus string) bool {
	return queryStatus == "Complete" || queryStatus == "Cancelled" || queryStatus == "Failed" || queryStatus == "Timeout"
}

// NewMetricsAPI is a CloudWatch metrics API factory.
//
// Stubbable by tests.
var NewMetricsAPI = func(sess *session.Session) models.CloudWatchMetricsAPIProvider {
	return cloudwatch.New(sess)
}

// NewLogsAPI is a CloudWatch logs API factory.
//
// Stubbable by tests.
var NewLogsAPI = func(sess *session.Session) models.CloudWatchLogsAPIProvider {
	return cloudwatchlogs.New(sess)
}

// NewOAMAPI is a CloudWatch Observability Access Manager (OAM) API factory.
//
// Stubbable by tests.
var NewOAMAPI = func(sess *session.Session) models.OAMClientProvider {
	return oam.New(sess)
}

// NewCWClient is a CloudWatch client factory.
//
// Stubbable by tests.
var NewCWClient = func(sess *session.Session) cloudwatchiface.CloudWatchAPI {
	return cloudwatch.New(sess)
}

// NewCWLogsClient is a CloudWatch logs client factory.
//
// Stubbable by tests.
var NewCWLogsClient = func(sess *session.Session) cloudwatchlogsiface.CloudWatchLogsAPI {
	return cloudwatchlogs.New(sess)
}

// newEC2Client is an EC2 client factory.
//
// Stubbable by tests.
var newEC2Client = func(provider client.ConfigProvider) ec2iface.EC2API {
	return ec2.New(provider)
}

// newRGTAClient is a Resource Groups Tagging API (RGTA) client factory.
//
// Stubbable by tests.
var newRGTAClient = func(provider client.ConfigProvider) resourcegroupstaggingapiiface.ResourceGroupsTaggingAPIAPI {
	return resourcegroupstaggingapi.New(provider)
}
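
Several functions above (newSession, getRequestContext, executeLogAlertQuery) treat the region string "default" as a sentinel that means "use the region configured on the data source". The helper below is a hypothetical consolidation of that rule, not a function in this package. Note that it also maps an empty region to the configured one, which matches executeLogAlertQuery but not newSession, where only the literal "default" is replaced.

```go
package main

import "fmt"

// defaultRegion mirrors the sentinel constant in cloudwatch.go.
const defaultRegion = "default"

// resolveRegion (hypothetical) returns the data source's configured region when
// the requested region is empty or "default", and the requested region otherwise.
func resolveRegion(requested, configured string) string {
	if requested == "" || requested == defaultRegion {
		return configured
	}
	return requested
}

func main() {
	fmt.Println(resolveRegion("default", "eu-west-1"))   // eu-west-1
	fmt.Println(resolveRegion("", "eu-west-1"))          // eu-west-1
	fmt.Println(resolveRegion("us-east-2", "eu-west-1")) // us-east-2
}
```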
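
The client factories at the end of the file (NewMetricsAPI, NewLogsAPI, NewOAMAPI, NewCWClient, and so on) are package-level function variables marked "Stubbable by tests": production code calls the variable, and a test reassigns it to return a fake. The sketch below shows that pattern in isolation. The logGroupLister interface, realLister, fakeLister, newLister and checkLogs are all hypothetical stand-ins, not AWS SDK or Grafana types; checkLogs only mirrors the shape of checkHealthLogs.

```go
package main

import (
	"errors"
	"fmt"
)

// logGroupLister is a hypothetical, trimmed-down stand-in for the logs API interface.
type logGroupLister interface {
	ListLogGroups(limit int) ([]string, error)
}

// realLister stands in for the AWS-backed client; here it always fails.
type realLister struct{}

func (realLister) ListLogGroups(limit int) ([]string, error) {
	return nil, errors.New("no AWS credentials in this sketch")
}

// newLister follows the same pattern as NewLogsAPI: a function variable tests can swap.
var newLister = func(region string) logGroupLister { return realLister{} }

// checkLogs builds a client through the factory and makes one cheap call.
func checkLogs(region string) error {
	_, err := newLister(region).ListLogGroups(1)
	return err
}

// fakeLister is what a test would install instead of the real client.
type fakeLister struct{ groups []string }

func (f fakeLister) ListLogGroups(limit int) ([]string, error) {
	if limit < len(f.groups) {
		return f.groups[:limit], nil
	}
	return f.groups, nil
}

func main() {
	// Swap the factory and restore it afterwards, as a test would do with t.Cleanup.
	orig := newLister
	newLister = func(string) logGroupLister { return fakeLister{groups: []string{"/aws/lambda/a"}} }
	fmt.Println(checkLogs("us-east-1")) // <nil>
	newLister = orig
	fmt.Println(checkLogs("us-east-1") != nil) // true
}
```

The trade-off of this design is that the swap is global to the package, so tests that replace a factory should restore it and should not run in parallel with other tests that depend on the same variable.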
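
The file declares aliasFormat, the regular expression `\{\{\s*(.+?)\s*\}\}`, which matches `{{ name }}` placeholders (with optional inner whitespace) and captures the name. The actual alias substitution lives elsewhere in the package and is not shown here, so formatAlias below is a hypothetical sketch of how such a regex can be applied with the standard library's regexp.ReplaceAllStringFunc, leaving unknown placeholders untouched.

```go
package main

import (
	"fmt"
	"regexp"
)

// aliasFormat is copied verbatim from cloudwatch.go.
var aliasFormat = regexp.MustCompile(`\{\{\s*(.+?)\s*\}\}`)

// formatAlias (hypothetical) replaces each {{ key }} placeholder with values[key]
// and keeps placeholders whose key is not present in values.
func formatAlias(alias string, values map[string]string) string {
	return aliasFormat.ReplaceAllStringFunc(alias, func(match string) string {
		// Re-run the regex on the match to extract the captured key.
		key := aliasFormat.FindStringSubmatch(match)[1]
		if v, ok := values[key]; ok {
			return v
		}
		return match
	})
}

func main() {
	out := formatAlias("{{ metric }} on {{InstanceId}} ({{ unknown }})",
		map[string]string{"metric": "CPUUtilization", "InstanceId": "i-123"})
	fmt.Println(out) // CPUUtilization on i-123 ({{ unknown }})
}
```

The lazy `(.+?)` combined with the trailing `\s*` is what strips surrounding spaces from the captured key, so `{{ metric }}` and `{{metric}}` resolve to the same entry.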