Files
grafana/pkg/tsdb/azuremonitor/loganalytics/azure-log-analytics-datasource.go
T
Andreas Christou 63383ef545 AzureMonitor: Application Insights Traces (#64859)
* Build out barebones Traces editor

- Add Traces query type and operation ID prop to query type
- Add necessary header types
- Update resource picker to appropriately work with traces query type
- Build out TracesQueryEditor component
- Include logic to retrieve operationId's for AI Workspaces
- Add backend route mapping
- Update macro to use timestamp as default time field for traces

* AzureMonitor: Traces - Response parsing (#65442)

* Update FormatAsField component

- Add trace ResultFormat type
- Generalise FormatAsField component
- Add component to TracesQueryEditor
- Remove duplicate code in setQueryValue

* Add custom filter function to improve performance

* Add basic conversion for logs to trace

- Add serviceTags converter
- Pass through required parameters (queryType and resultFormat)
- Appropriately set visualisation

* Update parsing to also fill trace tags

- Add constant values for each table schema (include legacy mapping for now if needed)
- Add constant for list of table tags
- Set the foundation for dynamic query building
- Update query to build tags value
- Appropriately set operationName
- Update tagsConverter to filter empty values

* Fix lint and test issues

* AzureMonitor: Traces - Data links (#65566)

* Add portal link for traces

- Pull out necessary values (itemId and itemType)
- Appropriately construct
- Fix ordering

* Set default format as value

- Also set default visualisation

* Fix event schema

* Set default formatAsField value

* Include logs link on traces results

- Adapt config links to allow custom title to be set

* Correctly set operationId for query

* Update backend types

- Include OperationID in query
- Pass forward datasource name and UID

* Ensure setTime doesn't consistently get called if operationID is defined

* Add explore link

- Update util functions to allow setting custom datalinks

* Fix tests

* AzureMonitor: Traces - Query and Editor updates (#66076)

* Add initial query

- Will query the resource as soon as a resource has been selected
- Updates the data links for the query without operationId
- Remove initial operationId query and timeRange dependency
- Update query building

* Add entirely separate traces query property

- Update shared types (also including future types for Azure traces)
- Update backend log analytics datasource to accept both azureLogAnalytics and azureTraces queries
- Update backend specific types
- Update frontend datasource for new properties
- Update mock query

* Update FormatAsField to be entirely generic

* Update query building to be done in backend

- Add required mappings in backend
- Update frontend querying

* Fix query and explore data link

* Add trace type selection

* Better method for setting explore link

* Fix operationId updating

* Run go mod tidy

* Unnecessary changes

* Fix tests

* AzureMonitor: Traces - Add correlation API support (#65855)

Add correlation API support

- Add necessary types
- Add correlation API request when conditions are met
- Update query

* Fix property from merge

* AzureMonitor: Traces - Filtering (#66303)

* Add initial query

- Will query the resource as soon as a resource has been selected
- Updates the data links for the query without operationId
- Remove initial operationId query and timeRange dependency
- Update query building

* Add entirely separate traces query property

- Update shared types (also including future types for Azure traces)
- Update backend log analytics datasource to accept both azureLogAnalytics and azureTraces queries
- Update backend specific types
- Update frontend datasource for new properties
- Update mock query

* Update FormatAsField to be entirely generic

* Update query building to be done in backend

- Add required mappings in backend
- Update frontend querying

* Fix query and explore data link

* Add trace type selection

* Better method for setting explore link

* Fix operationId updating

* Run go mod tidy

* Unnecessary changes

* Fix tests

* Start building out Filters component

- Configure component to query for Filter property values when a filter property is set
- Add setFilters function
- Add typing to tablesSchema
- Use component in TracesQueryEditor

* Update Filters

- Asynchronously pull property options
- Setup list of Filter components

* Update filters component

- Remove unused imports
- Have local filters state and query filters
- Correctly set filters values
- Don't update query every time a filter property changes (not performant)

* Update properties query

- Use current timeRange
- Get count to provide informative labels

* Reset map when time changes

* Add operation selection

* Reset filters when property changes

* Appropriate label name for empty values

* Add filtering to query

* Update filter components

- Fix rendering issue
- Correctly compare and update timeRange
- Split out files for simplicity

* Add checkbox option to multiselect

- Add custom option component
- Correctly call onChange
- Add variableOptionGroup for template variable selection

* Fix adding template vars

* Improve labels and refresh labels on query prop changes

* AzureMonitor: Traces - Testing (#66474)

* Select ds for template variable interpolation

* Update az logs ds tests

- Add templateVariables test
- Add filter test
- Update mock
- Remove anys

* Update QueryEditor test

- Update mocks with timeSrv for log analytics datasource
- Fix query mock
- Use appropriate and consistent selectors

* Add TracesQueryEditor test

- Update resourcePickerRows mock to include app insights resources
- Remove comments and extra new line

* Add FormatAsField test

- Remove unneeded condition

* Update resourcePicker utils test

* Don't hide selected options in filters

* Fix multi-selection on filters

* Add TraceTypeField test

- Add test file
- Update selectors (remove copy/paste mistake)
- Update placeholder text for select and add label

* Add basic filters test

* Begin filters test

* Update filters test

* Add final tests and simplify/generalise addFilter helper

* Minor update to datasource test

* Update macros test

* Update selectors in tests

* Add response-table-frame tests

* Add datasource tests

- Use sorting where JSON models are inconsistent
- Update filters clause
- Dedupe tags
- Correct operationId conditions

* Don't set a default value for blurInputOnSelect

* Simplify datasource test

* Update to use CheckGoldenJSON utils

- Update with generated frame files
- Remove redundant expected frame code
- Update all usages

* Fix lint

* AzureMonitor: Traces feedback (#67292)

* Filter traces if the visualisation is set to trace

- Update build query logic
- Added additional test cases
- Return an error if the traces type is set by itself with the trace visualisation
- Add descriptions to event types
- Update tests

* Fix bug for error displaying traces

* Update mappings and add error field

- Update tests
- Remove unnecessary comments

* Switch location of Operation ID field

* Re-order fields

* Update link title

* Update label for event type selection

* Update correct link title

* Update logs datalink to link to Azure Logs in explore

* Fix lint
2023-04-27 20:24:11 +01:00

752 lines
27 KiB
Go

package loganalytics
import (
"bytes"
"compress/gzip"
"context"
"encoding/base64"
"encoding/json"
"fmt"
"io"
"net/http"
"net/url"
"path"
"regexp"
"sort"
"strings"
"time"
"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana-plugin-sdk-go/data"
"go.opentelemetry.io/otel/attribute"
"k8s.io/utils/strings/slices"
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/infra/tracing"
"github.com/grafana/grafana/pkg/tsdb/azuremonitor/kinds/dataquery"
"github.com/grafana/grafana/pkg/tsdb/azuremonitor/macros"
"github.com/grafana/grafana/pkg/tsdb/azuremonitor/types"
)
// AzureLogAnalyticsDatasource calls the Azure Log Analytics API's
type AzureLogAnalyticsDatasource struct {
Proxy types.ServiceProxy
}
// AzureLogAnalyticsQuery is the query request that is built from the saved values for
// from the UI
type AzureLogAnalyticsQuery struct {
RefID string
ResultFormat string
URL string
TraceExploreQuery string
TraceLogsExploreQuery string
JSON json.RawMessage
TimeRange backend.TimeRange
Query string
Resources []string
QueryType string
}
func (e *AzureLogAnalyticsDatasource) ResourceRequest(rw http.ResponseWriter, req *http.Request, cli *http.Client) {
e.Proxy.Do(rw, req, cli)
}
// executeTimeSeriesQuery does the following:
// 1. build the AzureMonitor url and querystring for each query
// 2. executes each query by calling the Azure Monitor API
// 3. parses the responses for each query into data frames
func (e *AzureLogAnalyticsDatasource) ExecuteTimeSeriesQuery(ctx context.Context, logger log.Logger, originalQueries []backend.DataQuery, dsInfo types.DatasourceInfo, client *http.Client, url string, tracer tracing.Tracer) (*backend.QueryDataResponse, error) {
result := backend.NewQueryDataResponse()
ctxLogger := logger.FromContext(ctx)
queries, err := e.buildQueries(ctxLogger, originalQueries, dsInfo)
if err != nil {
return nil, err
}
for _, query := range queries {
result.Responses[query.RefID] = e.executeQuery(ctx, ctxLogger, query, dsInfo, client, url, tracer)
}
return result, nil
}
func getApiURL(resourceOrWorkspace string) string {
matchesResourceURI, _ := regexp.MatchString("^/subscriptions/", resourceOrWorkspace)
if matchesResourceURI {
return fmt.Sprintf("v1%s/query", resourceOrWorkspace)
} else {
return fmt.Sprintf("v1/workspaces/%s/query", resourceOrWorkspace)
}
}
func (e *AzureLogAnalyticsDatasource) buildQueries(logger log.Logger, queries []backend.DataQuery, dsInfo types.DatasourceInfo) ([]*AzureLogAnalyticsQuery, error) {
azureLogAnalyticsQueries := []*AzureLogAnalyticsQuery{}
for _, query := range queries {
resources := []string{}
var resourceOrWorkspace string
var queryString string
var resultFormat string
traceExploreQuery := ""
traceLogsExploreQuery := ""
if query.QueryType == string(dataquery.AzureQueryTypeAzureLogAnalytics) {
queryJSONModel := types.LogJSONQuery{}
err := json.Unmarshal(query.JSON, &queryJSONModel)
if err != nil {
return nil, fmt.Errorf("failed to decode the Azure Log Analytics query object from JSON: %w", err)
}
azureLogAnalyticsTarget := queryJSONModel.AzureLogAnalytics
logger.Debug("AzureLogAnalytics", "target", azureLogAnalyticsTarget)
resultFormat = azureLogAnalyticsTarget.ResultFormat
if resultFormat == "" {
resultFormat = types.TimeSeries
}
// Legacy queries only specify a Workspace GUID, which we need to use the old workspace-centric
// API URL for, and newer queries specifying a resource URI should use resource-centric API.
// However, legacy workspace queries using a `workspaces()` template variable will be resolved
// to a resource URI, so they should use the new resource-centric.
if len(azureLogAnalyticsTarget.Resources) > 0 {
resources = azureLogAnalyticsTarget.Resources
resourceOrWorkspace = azureLogAnalyticsTarget.Resources[0]
} else if azureLogAnalyticsTarget.Resource != "" {
resources = []string{azureLogAnalyticsTarget.Resource}
resourceOrWorkspace = azureLogAnalyticsTarget.Resource
} else {
resourceOrWorkspace = azureLogAnalyticsTarget.Workspace
}
queryString = azureLogAnalyticsTarget.Query
}
if query.QueryType == string(dataquery.AzureQueryTypeAzureTraces) {
queryJSONModel := types.TracesJSONQuery{}
err := json.Unmarshal(query.JSON, &queryJSONModel)
if err != nil {
return nil, fmt.Errorf("failed to decode the Azure Traces query object from JSON: %w", err)
}
azureTracesTarget := queryJSONModel.AzureTraces
logger.Debug("AzureTraces", "target", azureTracesTarget)
if azureTracesTarget.ResultFormat == nil {
resultFormat = types.Table
} else {
resultFormat = string(*azureTracesTarget.ResultFormat)
if resultFormat == "" {
resultFormat = types.Table
}
}
resources = azureTracesTarget.Resources
resourceOrWorkspace = azureTracesTarget.Resources[0]
operationId := ""
if queryJSONModel.AzureTraces.OperationId != nil {
operationId = *queryJSONModel.AzureTraces.OperationId
}
queryString = buildTracesQuery(operationId, queryJSONModel.AzureTraces.TraceTypes, queryJSONModel.AzureTraces.Filters, &resultFormat)
traceIdVariable := "${__data.fields.traceID}"
if operationId == "" {
traceExploreQuery = buildTracesQuery(traceIdVariable, queryJSONModel.AzureTraces.TraceTypes, queryJSONModel.AzureTraces.Filters, nil)
traceLogsExploreQuery = buildTracesLogsQuery(&traceIdVariable)
} else {
traceExploreQuery = queryString
traceLogsExploreQuery = buildTracesLogsQuery(&operationId)
}
traceExploreQuery, err = macros.KqlInterpolate(logger, query, dsInfo, traceExploreQuery, "TimeGenerated")
if err != nil {
return nil, fmt.Errorf("failed to create traces explore query: %s", err)
}
traceLogsExploreQuery, err = macros.KqlInterpolate(logger, query, dsInfo, traceLogsExploreQuery, "TimeGenerated")
if err != nil {
return nil, fmt.Errorf("failed to create traces logs explore query: %s", err)
}
}
apiURL := getApiURL(resourceOrWorkspace)
rawQuery, err := macros.KqlInterpolate(logger, query, dsInfo, queryString, "TimeGenerated")
if err != nil {
return nil, err
}
azureLogAnalyticsQueries = append(azureLogAnalyticsQueries, &AzureLogAnalyticsQuery{
RefID: query.RefID,
ResultFormat: resultFormat,
URL: apiURL,
JSON: query.JSON,
TimeRange: query.TimeRange,
Query: rawQuery,
Resources: resources,
QueryType: query.QueryType,
TraceExploreQuery: traceExploreQuery,
TraceLogsExploreQuery: traceLogsExploreQuery,
})
}
return azureLogAnalyticsQueries, nil
}
func (e *AzureLogAnalyticsDatasource) executeQuery(ctx context.Context, logger log.Logger, query *AzureLogAnalyticsQuery, dsInfo types.DatasourceInfo, client *http.Client,
url string, tracer tracing.Tracer) backend.DataResponse {
dataResponse := backend.DataResponse{}
dataResponseErrorWithExecuted := func(err error) backend.DataResponse {
dataResponse.Error = err
dataResponse.Frames = data.Frames{
&data.Frame{
RefID: query.RefID,
Meta: &data.FrameMeta{
ExecutedQueryString: query.Query,
},
},
}
return dataResponse
}
// If azureLogAnalyticsSameAs is defined and set to false, return an error
if sameAs, ok := dsInfo.JSONData["azureLogAnalyticsSameAs"]; ok && !sameAs.(bool) {
return dataResponseErrorWithExecuted(fmt.Errorf("credentials for Log Analytics are no longer supported. Go to the data source configuration to update Azure Monitor credentials"))
}
queryJSONModel := dataquery.AzureMonitorQuery{}
err := json.Unmarshal(query.JSON, &queryJSONModel)
if err != nil {
dataResponse.Error = err
return dataResponse
}
if query.QueryType == string(dataquery.AzureQueryTypeAzureTraces) {
if queryJSONModel.AzureTraces.OperationId != nil && *queryJSONModel.AzureTraces.OperationId != "" {
query, err = getCorrelationWorkspaces(ctx, logger, query, dsInfo, *queryJSONModel.AzureTraces.OperationId, tracer)
if err != nil {
dataResponse.Error = err
return dataResponse
}
}
if dataquery.ResultFormat(query.ResultFormat) == (dataquery.ResultFormatTrace) && query.Query == "" {
return dataResponseErrorWithExecuted(fmt.Errorf("cannot visualise trace events using the trace visualiser"))
}
}
req, err := e.createRequest(ctx, logger, url, query)
if err != nil {
dataResponse.Error = err
return dataResponse
}
ctx, span := tracer.Start(ctx, "azure log analytics query")
span.SetAttributes("target", query.Query, attribute.Key("target").String(query.Query))
span.SetAttributes("from", query.TimeRange.From.UnixNano()/int64(time.Millisecond), attribute.Key("from").Int64(query.TimeRange.From.UnixNano()/int64(time.Millisecond)))
span.SetAttributes("until", query.TimeRange.To.UnixNano()/int64(time.Millisecond), attribute.Key("until").Int64(query.TimeRange.To.UnixNano()/int64(time.Millisecond)))
span.SetAttributes("datasource_id", dsInfo.DatasourceID, attribute.Key("datasource_id").Int64(dsInfo.DatasourceID))
span.SetAttributes("org_id", dsInfo.OrgID, attribute.Key("org_id").Int64(dsInfo.OrgID))
defer span.End()
tracer.Inject(ctx, req.Header, span)
logger.Debug("AzureLogAnalytics", "Request ApiURL", req.URL.String())
res, err := client.Do(req)
if err != nil {
return dataResponseErrorWithExecuted(err)
}
defer func() {
err := res.Body.Close()
if err != nil {
logger.Warn("failed to close response body", "error", err)
}
}()
logResponse, err := e.unmarshalResponse(logger, res)
if err != nil {
return dataResponseErrorWithExecuted(err)
}
t, err := logResponse.GetPrimaryResultTable()
if err != nil {
return dataResponseErrorWithExecuted(err)
}
frame, err := ResponseTableToFrame(t, query.RefID, query.Query, dataquery.AzureQueryType(query.QueryType), dataquery.ResultFormat(query.ResultFormat))
if err != nil {
return dataResponseErrorWithExecuted(err)
}
frame = appendErrorNotice(frame, logResponse.Error)
if frame == nil {
return dataResponse
}
azurePortalBaseUrl, err := GetAzurePortalUrl(dsInfo.Cloud)
if err != nil {
dataResponse.Error = err
return dataResponse
}
if query.QueryType == string(dataquery.AzureQueryTypeAzureTraces) && query.ResultFormat == string(dataquery.ResultFormatTrace) {
frame.Meta.PreferredVisualization = "trace"
}
if query.ResultFormat == string(dataquery.ResultFormatTable) {
frame.Meta.PreferredVisualization = "table"
}
if query.ResultFormat == types.TimeSeries {
tsSchema := frame.TimeSeriesSchema()
if tsSchema.Type == data.TimeSeriesTypeLong {
wideFrame, err := data.LongToWide(frame, nil)
if err == nil {
frame = wideFrame
} else {
frame.AppendNotices(data.Notice{Severity: data.NoticeSeverityWarning, Text: "could not convert frame to time series, returning raw table: " + err.Error()})
}
}
}
queryUrl, err := getQueryUrl(query.Query, query.Resources, azurePortalBaseUrl)
if err != nil {
dataResponse.Error = err
return dataResponse
}
if query.QueryType == string(dataquery.AzureQueryTypeAzureTraces) {
tracesUrl, err := getTracesQueryUrl(query.Resources, azurePortalBaseUrl)
if err != nil {
dataResponse.Error = err
return dataResponse
}
linkTitle := "Explore Trace in Azure Portal"
AddConfigLinks(*frame, tracesUrl, &linkTitle)
queryJSONModel := dataquery.AzureMonitorQuery{}
err = json.Unmarshal(query.JSON, &queryJSONModel)
if err != nil {
dataResponse.Error = err
return dataResponse
}
traceIdVariable := "${__data.fields.traceID}"
resultFormat := dataquery.AzureMonitorQueryAzureTracesResultFormatTrace
queryJSONModel.AzureTraces.ResultFormat = &resultFormat
queryJSONModel.AzureTraces.Query = &query.TraceExploreQuery
if queryJSONModel.AzureTraces.OperationId == nil || *queryJSONModel.AzureTraces.OperationId == "" {
queryJSONModel.AzureTraces.OperationId = &traceIdVariable
}
logsQueryType := string(dataquery.AzureQueryTypeAzureLogAnalytics)
logsJSONModel := dataquery.AzureMonitorQuery{
AzureLogAnalytics: &struct {
Query *string "json:\"query,omitempty\""
Resource *string "json:\"resource,omitempty\""
Resources []string "json:\"resources,omitempty\""
ResultFormat *dataquery.AzureMonitorQueryAzureLogAnalyticsResultFormat "json:\"resultFormat,omitempty\""
Workspace *string "json:\"workspace,omitempty\""
}{
Resources: queryJSONModel.AzureTraces.Resources,
Query: &query.TraceLogsExploreQuery,
},
QueryType: &logsQueryType,
}
AddCustomDataLink(*frame, data.DataLink{
Title: "Explore Trace: ${__data.fields.traceID}",
URL: "",
Internal: &data.InternalDataLink{
DatasourceUID: dsInfo.DatasourceUID,
DatasourceName: dsInfo.DatasourceName,
Query: queryJSONModel,
},
})
AddCustomDataLink(*frame, data.DataLink{
Title: "Explore Trace Logs",
URL: "",
Internal: &data.InternalDataLink{
DatasourceUID: dsInfo.DatasourceUID,
DatasourceName: dsInfo.DatasourceName,
Query: logsJSONModel,
},
})
} else {
AddConfigLinks(*frame, queryUrl, nil)
}
dataResponse.Frames = data.Frames{frame}
return dataResponse
}
func appendErrorNotice(frame *data.Frame, err *AzureLogAnalyticsAPIError) *data.Frame {
if err == nil {
return frame
}
if frame == nil {
frame = &data.Frame{}
}
frame.AppendNotices(apiErrorToNotice(err))
return frame
}
func (e *AzureLogAnalyticsDatasource) createRequest(ctx context.Context, logger log.Logger, queryURL string, query *AzureLogAnalyticsQuery) (*http.Request, error) {
body := map[string]interface{}{
"query": query.Query,
}
if len(query.Resources) > 1 {
body["resources"] = query.Resources
}
jsonValue, err := json.Marshal(body)
if err != nil {
return nil, fmt.Errorf("%v: %w", "failed to create request", err)
}
req, err := http.NewRequestWithContext(ctx, http.MethodPost, queryURL, bytes.NewBuffer(jsonValue))
if err != nil {
logger.Debug("Failed to create request", "error", err)
return nil, fmt.Errorf("%v: %w", "failed to create request", err)
}
req.URL.Path = "/"
req.Header.Set("Content-Type", "application/json")
req.URL.Path = path.Join(req.URL.Path, query.URL)
return req, nil
}
type AzureLogAnalyticsURLResources struct {
Resources []AzureLogAnalyticsURLResource `json:"resources"`
}
type AzureLogAnalyticsURLResource struct {
ResourceID string `json:"resourceId"`
}
func getQueryUrl(query string, resources []string, azurePortalUrl string) (string, error) {
encodedQuery, err := encodeQuery(query)
if err != nil {
return "", fmt.Errorf("failed to encode the query: %s", err)
}
portalUrl := azurePortalUrl
if err != nil {
return "", fmt.Errorf("failed to parse base portal URL: %s", err)
}
portalUrl += "/#blade/Microsoft_OperationsManagementSuite_Workspace/AnalyticsBlade/initiator/AnalyticsShareLinkToQuery/isQueryEditorVisible/true/scope/"
resourcesJson := AzureLogAnalyticsURLResources{
Resources: make([]AzureLogAnalyticsURLResource, 0),
}
for _, resource := range resources {
resourcesJson.Resources = append(resourcesJson.Resources, AzureLogAnalyticsURLResource{
ResourceID: resource,
})
}
resourcesMarshalled, err := json.Marshal(resourcesJson)
if err != nil {
return "", fmt.Errorf("failed to marshal log analytics resources: %s", err)
}
portalUrl += url.QueryEscape(string(resourcesMarshalled))
portalUrl += "/query/" + url.PathEscape(encodedQuery) + "/isQueryBase64Compressed/true/timespanInIsoFormat/P1D"
return portalUrl, nil
}
func getTracesQueryUrl(resources []string, azurePortalUrl string) (string, error) {
portalUrl := azurePortalUrl
portalUrl += "/#view/AppInsightsExtension/DetailsV2Blade/ComponentId~/"
resource := struct {
ResourceId string `json:"ResourceId"`
}{
resources[0],
}
resourceMarshalled, err := json.Marshal(resource)
if err != nil {
return "", fmt.Errorf("failed to marshal application insights resource: %s", err)
}
portalUrl += url.PathEscape(string(resourceMarshalled))
portalUrl += "/DataModel~/"
// We're making use of data link variables to select the necessary fields in the frontend
eventId := "%22eventId%22%3A%22${__data.fields.itemId}%22%2C"
timestamp := "%22timestamp%22%3A%22${__data.fields.startTime}%22%2C"
eventTable := "%22eventTable%22%3A%22${__data.fields.itemType}%22"
traceObject := fmt.Sprintf("%%7B%s%s%s%%7D", eventId, timestamp, eventTable)
portalUrl += traceObject
return portalUrl, nil
}
func getCorrelationWorkspaces(ctx context.Context, logger log.Logger, query *AzureLogAnalyticsQuery, dsInfo types.DatasourceInfo, operationId string, tracer tracing.Tracer) (*AzureLogAnalyticsQuery, error) {
azMonService := dsInfo.Services["Azure Monitor"]
correlationUrl := azMonService.URL + fmt.Sprintf("%s/providers/microsoft.insights/transactions/%s", query.Resources[0], operationId)
callCorrelationAPI := func(url string) (AzureCorrelationAPIResponse, error) {
req, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewBuffer([]byte{}))
if err != nil {
logger.Debug("Failed to create request", "error", err)
return AzureCorrelationAPIResponse{}, fmt.Errorf("%v: %w", "failed to create request", err)
}
req.URL.Path = url
req.Header.Set("Content-Type", "application/json")
values := req.URL.Query()
values.Add("api-version", "2019-10-17-preview")
req.URL.RawQuery = values.Encode()
req.Method = "GET"
ctx, span := tracer.Start(ctx, "azure traces correlation request")
span.SetAttributes("target", req.URL, attribute.Key("target").String(req.URL.String()))
span.SetAttributes("datasource_id", dsInfo.DatasourceID, attribute.Key("datasource_id").Int64(dsInfo.DatasourceID))
span.SetAttributes("org_id", dsInfo.OrgID, attribute.Key("org_id").Int64(dsInfo.OrgID))
defer span.End()
tracer.Inject(ctx, req.Header, span)
logger.Debug("AzureLogAnalytics", "Traces Correlation ApiURL", req.URL.String())
res, err := azMonService.HTTPClient.Do(req)
if err != nil {
return AzureCorrelationAPIResponse{}, err
}
body, err := io.ReadAll(res.Body)
if err != nil {
return AzureCorrelationAPIResponse{}, err
}
defer func() {
err := res.Body.Close()
if err != nil {
logger.Warn("failed to close response body", "error", err)
}
}()
if res.StatusCode/100 != 2 {
logger.Debug("Request failed", "status", res.Status, "body", string(body))
return AzureCorrelationAPIResponse{}, fmt.Errorf("request failed, status: %s, body: %s", res.Status, string(body))
}
var data AzureCorrelationAPIResponse
d := json.NewDecoder(bytes.NewReader(body))
d.UseNumber()
err = d.Decode(&data)
if err != nil {
logger.Debug("Failed to unmarshal Azure Traces correlation API response", "error", err, "status", res.Status, "body", string(body))
return AzureCorrelationAPIResponse{}, err
}
return data, nil
}
var nextLink *string
var correlationResponse AzureCorrelationAPIResponse
correlationResponse, err := callCorrelationAPI(correlationUrl)
if err != nil {
return query, err
}
nextLink = correlationResponse.Properties.NextLink
query.Resources = correlationResponse.Properties.Resources
for nextLink != nil {
correlationResponse, err := callCorrelationAPI(correlationUrl)
if err != nil {
return query, err
}
query.Resources = append(query.Resources, correlationResponse.Properties.Resources...)
nextLink = correlationResponse.Properties.NextLink
}
return query, nil
}
// Error definition has been inferred from real data and other model definitions like
// https://github.com/Azure/azure-sdk-for-go/blob/3640559afddbad452d265b54fb1c20b30be0b062/services/preview/virtualmachineimagebuilder/mgmt/2019-05-01-preview/virtualmachineimagebuilder/models.go
type AzureLogAnalyticsAPIError struct {
Details *[]AzureLogAnalyticsAPIErrorBase `json:"details,omitempty"`
Code *string `json:"code,omitempty"`
Message *string `json:"message,omitempty"`
}
type AzureLogAnalyticsAPIErrorBase struct {
Code *string `json:"code,omitempty"`
Message *string `json:"message,omitempty"`
Innererror *AzureLogAnalyticsInnerError `json:"innererror,omitempty"`
}
type AzureLogAnalyticsInnerError struct {
Code *string `json:"code,omitempty"`
Message *string `json:"message,omitempty"`
Severity *int `json:"severity,omitempty"`
SeverityName *string `json:"severityName,omitempty"`
}
// AzureLogAnalyticsResponse is the json response object from the Azure Log Analytics API.
type AzureLogAnalyticsResponse struct {
Tables []types.AzureResponseTable `json:"tables"`
Error *AzureLogAnalyticsAPIError `json:"error,omitempty"`
}
type AzureCorrelationAPIResponse struct {
ID string `json:"id"`
Name string `json:"name"`
Type string `json:"type"`
Properties AzureCorrelationAPIResponseProperties `json:"properties"`
Error *AzureLogAnalyticsAPIError `json:"error,omitempty"`
}
type AzureCorrelationAPIResponseProperties struct {
Resources []string `json:"resources"`
NextLink *string `json:"nextLink,omitempty"`
}
// GetPrimaryResultTable returns the first table in the response named "PrimaryResult", or an
// error if there is no table by that name.
func (ar *AzureLogAnalyticsResponse) GetPrimaryResultTable() (*types.AzureResponseTable, error) {
for _, t := range ar.Tables {
if t.Name == "PrimaryResult" {
return &t, nil
}
}
return nil, fmt.Errorf("no data as PrimaryResult table is missing from the response")
}
func (e *AzureLogAnalyticsDatasource) unmarshalResponse(logger log.Logger, res *http.Response) (AzureLogAnalyticsResponse, error) {
body, err := io.ReadAll(res.Body)
if err != nil {
return AzureLogAnalyticsResponse{}, err
}
defer func() {
if err := res.Body.Close(); err != nil {
logger.Warn("Failed to close response body", "err", err)
}
}()
if res.StatusCode/100 != 2 {
logger.Debug("Request failed", "status", res.Status, "body", string(body))
return AzureLogAnalyticsResponse{}, fmt.Errorf("request failed, status: %s, body: %s", res.Status, string(body))
}
var data AzureLogAnalyticsResponse
d := json.NewDecoder(bytes.NewReader(body))
d.UseNumber()
err = d.Decode(&data)
if err != nil {
logger.Debug("Failed to unmarshal Azure Log Analytics response", "error", err, "status", res.Status, "body", string(body))
return AzureLogAnalyticsResponse{}, err
}
return data, nil
}
// LogAnalyticsMeta is a type for the a Frame's Meta's Custom property.
type LogAnalyticsMeta struct {
ColumnTypes []string `json:"azureColumnTypes"`
}
// encodeQuery encodes the query in gzip so the frontend can build links.
func encodeQuery(rawQuery string) (string, error) {
var b bytes.Buffer
gz := gzip.NewWriter(&b)
if _, err := gz.Write([]byte(rawQuery)); err != nil {
return "", err
}
if err := gz.Close(); err != nil {
return "", err
}
return base64.StdEncoding.EncodeToString(b.Bytes()), nil
}
func buildTracesQuery(operationId string, traceTypes []string, filters []types.TracesFilters, resultFormat *string) string {
types := traceTypes
if len(types) == 0 {
types = Tables
}
filteredTypes := make([]string, 0)
// If the result format is set to trace then we filter out all events that are of the type traces as they don't make sense when visualised as a span
if resultFormat != nil && dataquery.ResultFormat(*resultFormat) == dataquery.ResultFormatTrace {
filteredTypes = slices.Filter(filteredTypes, types, func(s string) bool { return s != "traces" })
} else {
filteredTypes = types
}
sort.Strings(filteredTypes)
if len(filteredTypes) == 0 {
return ""
}
tagsMap := make(map[string]bool)
var tags []string
for _, t := range filteredTypes {
tableTags := getTagsForTable(t)
for _, i := range tableTags {
if tagsMap[i] {
continue
}
if i == "cloud_RoleInstance" || i == "cloud_RoleName" || i == "customDimensions" || i == "customMeasurements" {
continue
}
tags = append(tags, i)
tagsMap[i] = true
}
}
sort.Strings(tags)
whereClause := ""
if operationId != "" {
whereClause = fmt.Sprintf("| where (operation_Id != '' and operation_Id == '%s') or (customDimensions.ai_legacyRootId != '' and customDimensions.ai_legacyRootId == '%s')", operationId, operationId)
}
filtersClause := ""
if len(filters) > 0 {
for _, filter := range filters {
if len(filter.Filters) == 0 {
continue
}
operation := "in"
if filter.Operation == "ne" {
operation = "!in"
}
filterValues := []string{}
for _, val := range filter.Filters {
filterValues = append(filterValues, fmt.Sprintf(`"%s"`, val))
}
filtersClause += fmt.Sprintf("| where %s %s (%s)", filter.Property, operation, strings.Join(filterValues, ","))
}
}
propertiesFunc := "bag_merge(customDimensions, customMeasurements)"
if len(tags) > 0 {
propertiesFunc = fmt.Sprintf("bag_merge(bag_pack_columns(%s), customDimensions, customMeasurements)", strings.Join(tags, ","))
}
errorProperty := ""
if slices.Contains(filteredTypes, "exceptions") {
errorProperty = `| extend error = iff(itemType == "exceptions", true, false)`
}
baseQuery := fmt.Sprintf(`set truncationmaxrecords=10000; set truncationmaxsize=67108864; union isfuzzy=true %s | where $__timeFilter()`, strings.Join(filteredTypes, ","))
propertiesStaticQuery := `| extend duration = iff(isnull(column_ifexists("duration", real(null))), toreal(0), column_ifexists("duration", real(null)))` +
`| extend spanID = iff(itemType == "pageView" or isempty(column_ifexists("id", "")), tostring(new_guid()), column_ifexists("id", ""))` +
`| extend operationName = iff(isempty(column_ifexists("name", "")), column_ifexists("problemId", ""), column_ifexists("name", ""))` +
`| extend serviceName = cloud_RoleName` +
`| extend serviceTags = bag_pack_columns(cloud_RoleInstance, cloud_RoleName)`
propertiesQuery := fmt.Sprintf(`| extend tags = %s`, propertiesFunc)
projectClause := `| project-rename traceID = operation_Id, parentSpanID = operation_ParentId, startTime = timestamp` +
`| project startTime, itemType, serviceName, duration, traceID, spanID, parentSpanID, operationName, serviceTags, tags, itemId` +
`| order by startTime asc`
return baseQuery + whereClause + propertiesStaticQuery + propertiesQuery + errorProperty + filtersClause + projectClause
}
func buildTracesLogsQuery(operationId *string) string {
tables := `union *, traces, customEvents, pageViews, requests, dependencies, exceptions, customMetrics, availabilityResults`
time := `| where $__timeFilter()`
operationIdFilter := fmt.Sprintf(`| where operation_Id == "%s"`, *operationId)
return strings.Join([]string{tables, time, operationIdFilter}, " \n")
}