Cloudwatch Logs: Fix bug where we did not return errors to user (#87190)

This commit is contained in:
Sarah Zinger
2024-06-04 08:43:36 -04:00
committed by GitHub
parent 2090270a13
commit 4d002d85f9
10 changed files with 701 additions and 767 deletions
-3
View File
@@ -6240,9 +6240,6 @@ exports[`better eslint`] = {
[0, 0, 0, "Do not use any type assertions.", "0"],
[0, 0, 0, "Do not use any type assertions.", "1"]
],
"public/app/plugins/datasource/cloudwatch/utils/logsRetry.ts:5381": [
[0, 0, 0, "Unexpected any. Specify a different type.", "0"]
],
"public/app/plugins/datasource/dashboard/index.ts:5381": [
[0, 0, 0, "Do not re-export imported variable (\`./runSharedRequest\`)", "0"],
[0, 0, 0, "Do not re-export imported variable (\`./DashboardQueryEditor\`)", "1"],
+3 -7
View File
@@ -57,14 +57,10 @@ func (e *cloudWatchExecutor) executeLogActions(ctx context.Context, req *backend
eg.Go(func() error {
dataframe, err := e.executeLogAction(ectx, logsQuery, query, req.PluginContext)
if err != nil {
var AWSError *AWSError
if errors.As(err, &AWSError) {
resultChan <- backend.Responses{
query.RefID: backend.DataResponse{Frames: data.Frames{}, Error: AWSError},
}
return nil
resultChan <- backend.Responses{
query.RefID: backend.DataResponse{Frames: data.Frames{}, Error: err},
}
return err
return nil
}
groupedFrames, err := groupResponseFrame(dataframe, logsQuery.StatsGroups)
+6 -3
View File
@@ -177,6 +177,8 @@ func TestQuery_StartQuery(t *testing.T) {
}
t.Run("invalid time range", func(t *testing.T) {
const refID = "A"
cli = fakeCWLogsClient{
logGroupFields: cloudwatchlogs.GetLogGroupFieldsOutput{
LogGroupFields: []*cloudwatchlogs.LogGroupField{
@@ -210,12 +212,13 @@ func TestQuery_StartQuery(t *testing.T) {
})
executor := newExecutor(im, log.NewNullLogger())
_, err := executor.QueryData(context.Background(), &backend.QueryDataRequest{
resp, err := executor.QueryData(context.Background(), &backend.QueryDataRequest{
PluginContext: backend.PluginContext{
DataSourceInstanceSettings: &backend.DataSourceInstanceSettings{},
},
Queries: []backend.DataQuery{
{
RefID: refID,
TimeRange: timeRange,
JSON: json.RawMessage(`{
"type": "logAction",
@@ -227,9 +230,9 @@ func TestQuery_StartQuery(t *testing.T) {
},
},
})
require.Error(t, err)
require.NoError(t, err)
assert.Contains(t, err.Error(), "invalid time range: start time must be before end time")
assert.Equal(t, "failed to execute log action with subtype: StartQuery: invalid time range: start time must be before end time", resp.Responses[refID].Error.Error())
})
t.Run("valid time range", func(t *testing.T) {
@@ -13,12 +13,10 @@ export function setupMockedLogsQueryRunner({
results: {},
},
variables,
mockGetVariableName = true,
settings = CloudWatchSettings,
}: {
data?: BackendDataSourceResponse;
variables?: CustomVariableModel[];
mockGetVariableName?: boolean;
settings?: DataSourceInstanceSettings<CloudWatchJsonData>;
} = {}) {
let templateService = setupMockedTemplateService(variables);
@@ -19,9 +19,8 @@ export interface CloudWatchLogsQueryFieldProps
query: CloudWatchLogsQuery;
}
export const CloudWatchLogsQueryField = (props: CloudWatchLogsQueryFieldProps) => {
const { query, datasource, onChange, ExtraFieldElement, data } = props;
const { query, datasource, onChange, ExtraFieldElement } = props;
const showError = data?.error?.refId === query.refId;
const monacoRef = useRef<Monaco>();
const disposalRef = useRef<monacoType.IDisposable>();
@@ -132,11 +131,6 @@ export const CloudWatchLogsQueryField = (props: CloudWatchLogsQueryFieldProps) =
</div>
{ExtraFieldElement}
</div>
{showError ? (
<div className="query-row-break">
<div className="prom-query-field-info text-error">{data?.error?.message}</div>
</div>
) : null}
</>
);
};
@@ -3,7 +3,6 @@ import { merge, Observable, of } from 'rxjs';
import {
CoreApp,
DataFrame,
DataQueryRequest,
DataQueryResponse,
DataSourceInstanceSettings,
@@ -80,6 +79,11 @@ export class CloudWatchDatasource
return query.hide !== true || (isCloudWatchMetricsQuery(query) && query.id !== '');
}
// reminder: when queries are made on the backend through alerting they will not go through this function
// we have duplicated code here to retry queries on the frontend so that we can show partial results to users
// but ultimately anytime we add special error handling or logic retrying here we should ask ourselves
// could it only live in the backend? if so let's implement it there. If not, should it also live in the backend or just in the frontend?
// another note that at the end of the day all of these queries call super.query which is what forwards the request to the backend through /query
query(options: DataQueryRequest<CloudWatchQuery>): Observable<DataQueryResponse> {
options = cloneDeep(options);
@@ -142,11 +146,11 @@ export class CloudWatchDatasource
}));
}
getLogRowContext(
row: LogRowModel,
context?: LogRowContextOptions,
query?: CloudWatchLogsQuery
): Promise<{ data: DataFrame[] }> {
/**
* Get log row context for a given log row. This is called when the user clicks on a log row in the logs visualization and the "show context button"
* it shows the surrounding logs.
*/
getLogRowContext(row: LogRowModel, context?: LogRowContextOptions, query?: CloudWatchLogsQuery) {
return this.logsQueryRunner.getLogRowContext(row, context, super.query.bind(this), query);
}
@@ -1,38 +1,20 @@
import { interval, lastValueFrom, of } from 'rxjs';
import { lastValueFrom, of } from 'rxjs';
import {
DataQueryErrorType,
DataQueryRequest,
FieldType,
LogLevel,
LogRowContextQueryDirection,
LogRowModel,
MutableDataFrame,
dateTime,
DataQueryRequest,
LogRowContextQueryDirection,
} from '@grafana/data';
import {
CloudWatchSettings,
limitVariable,
logGroupNamesVariable,
regionVariable,
} from '../__mocks__/CloudWatchDataSource';
import { genMockFrames, genMockCloudWatchLogsRequest, setupMockedLogsQueryRunner } from '../__mocks__/LogsQueryRunner';
import { setupMockedLogsQueryRunner } from '../__mocks__/LogsQueryRunner';
import { LogsRequestMock } from '../__mocks__/Request';
import { validLogsQuery } from '../__mocks__/queries';
import { CloudWatchLogsQuery, LogAction, StartQueryRequest } from '../types';
import * as rxjsUtils from '../utils/rxjs/increasingInterval';
import { CloudWatchLogsQuery } from '../types';
import { LOG_IDENTIFIER_INTERNAL, LOGSTREAM_IDENTIFIER_INTERNAL } from './CloudWatchLogsQueryRunner';
jest.mock('@grafana/data', () => ({
...jest.requireActual('@grafana/data'),
getDefaultTimeRange: jest.fn().mockImplementation(() => {
const from = dateTime(1111);
const to = dateTime(2222);
return { from, to, raw: { from, to } };
}),
}));
import { LOGSTREAM_IDENTIFIER_INTERNAL, LOG_IDENTIFIER_INTERNAL } from './CloudWatchLogsQueryRunner';
describe('CloudWatchLogsQueryRunner', () => {
beforeEach(() => {
@@ -79,254 +61,499 @@ describe('CloudWatchLogsQueryRunner', () => {
});
});
describe('logs query', () => {
beforeEach(() => {
jest.spyOn(rxjsUtils, 'increasingInterval').mockImplementation(() => interval(100));
});
it('should stop querying when timed out', async () => {
const { runner, queryMock } = setupMockedLogsQueryRunner();
const fakeFrames = genMockFrames(20);
const initialRecordsMatched = fakeFrames[0].meta!.stats!.find(
(stat) => stat.displayName === 'Records scanned'
)!.value!;
for (let i = 1; i < 4; i++) {
fakeFrames[i].meta!.stats = [
{
displayName: 'Records scanned',
value: initialRecordsMatched,
},
];
}
const finalRecordsMatched = fakeFrames[9].meta!.stats!.find(
(stat) => stat.displayName === 'Records scanned'
)!.value!;
for (let i = 10; i < fakeFrames.length; i++) {
fakeFrames[i].meta!.stats = [
{
displayName: 'Records scanned',
value: finalRecordsMatched,
},
];
}
let i = 0;
jest.spyOn(runner, 'makeLogActionRequest').mockImplementation((subtype: LogAction) => {
if (subtype === 'GetQueryResults') {
const mockObservable = of([fakeFrames[i]]);
i++;
return mockObservable;
} else {
return of([]);
}
});
const iterations = 15;
// Times out after 15 passes for consistent testing
const timeoutFunc = () => {
return i >= iterations;
};
const myResponse = await lastValueFrom(
runner.logsQuery([{ queryId: 'fake-query-id', region: 'default', refId: 'A' }], timeoutFunc, queryMock)
);
const expectedData = [
{
...fakeFrames[14],
meta: {
custom: {
Status: 'Cancelled',
},
stats: fakeFrames[14].meta!.stats,
},
},
];
expect(myResponse).toEqual({
data: expectedData,
key: 'test-key',
state: 'Done',
error: {
type: DataQueryErrorType.Timeout,
message: `error: query timed out after 5 attempts`,
},
});
expect(i).toBe(iterations);
});
it('should continue querying as long as new data is being received', async () => {
const { runner, queryMock } = setupMockedLogsQueryRunner();
const fakeFrames = genMockFrames(15);
let i = 0;
jest.spyOn(runner, 'makeLogActionRequest').mockImplementation((subtype: LogAction) => {
if (subtype === 'GetQueryResults') {
const mockObservable = of([fakeFrames[i]]);
i++;
return mockObservable;
} else {
return of([]);
}
});
const startTime = new Date();
const timeoutFunc = () => {
return Date.now() >= startTime.valueOf() + 6000;
};
const myResponse = await lastValueFrom(
runner.logsQuery([{ queryId: 'fake-query-id', region: 'default', refId: 'A' }], timeoutFunc, queryMock)
);
expect(myResponse).toEqual({
data: [fakeFrames[fakeFrames.length - 1]],
key: 'test-key',
state: 'Done',
});
expect(i).toBe(15);
});
it('should stop querying when results come back with status "Complete"', async () => {
const { runner, queryMock } = setupMockedLogsQueryRunner();
const fakeFrames = genMockFrames(3);
let i = 0;
jest.spyOn(runner, 'makeLogActionRequest').mockImplementation((subtype: LogAction) => {
if (subtype === 'GetQueryResults') {
const mockObservable = of([fakeFrames[i]]);
i++;
return mockObservable;
} else {
return of([]);
}
});
const startTime = new Date();
const timeoutFunc = () => {
return Date.now() >= startTime.valueOf() + 6000;
};
const myResponse = await lastValueFrom(
runner.logsQuery([{ queryId: 'fake-query-id', region: 'default', refId: 'A' }], timeoutFunc, queryMock)
);
expect(myResponse).toEqual({
data: [fakeFrames[2]],
key: 'test-key',
state: 'Done',
});
expect(i).toBe(3);
});
});
const legacyLogGroupNamesQuery: CloudWatchLogsQuery = {
queryMode: 'Logs',
logGroupNames: ['group-A', 'templatedGroup-1', `$${logGroupNamesVariable.name}`],
hide: false,
id: '',
region: 'us-east-2',
refId: 'A',
expression: `fields @timestamp, @message | sort @timestamp desc | limit $${limitVariable.name}`,
};
const logGroupNamesQuery: CloudWatchLogsQuery = {
queryMode: 'Logs',
logGroups: [
{ arn: 'arn:aws:logs:us-east-2:123456789012:log-group:group-A:*', name: 'group-A' },
{ arn: `$${logGroupNamesVariable.name}`, name: logGroupNamesVariable.name },
],
hide: false,
id: '',
region: '$' + regionVariable.name,
refId: 'A',
expression: `fields @timestamp, @message | sort @timestamp desc | limit 1`,
};
const logsScopedVarQuery: CloudWatchLogsQuery = {
queryMode: 'Logs',
logGroups: [{ arn: `$${logGroupNamesVariable.name}`, name: logGroupNamesVariable.name }],
hide: false,
id: '',
region: '$' + regionVariable.name,
refId: 'A',
expression: `stats count(*) by queryType, bin(20s)`,
};
describe('handleLogQueries', () => {
it('should map log queries to start query requests correctly', async () => {
const { runner, queryMock } = setupMockedLogsQueryRunner({
variables: [logGroupNamesVariable, regionVariable, limitVariable],
settings: {
...CloudWatchSettings,
jsonData: {
...CloudWatchSettings.jsonData,
logsTimeout: '500ms',
},
},
mockGetVariableName: false,
});
const spy = jest.spyOn(runner, 'makeLogActionRequest');
await lastValueFrom(
runner.handleLogQueries(
[legacyLogGroupNamesQuery, logGroupNamesQuery, logsScopedVarQuery],
LogsRequestMock,
queryMock
)
);
const startQueryRequests: StartQueryRequest[] = [
{
queryString: `fields @timestamp, @message | sort @timestamp desc | limit ${limitVariable.current.value}`,
logGroupNames: ['group-A', ...logGroupNamesVariable.current.text],
logGroups: [],
refId: legacyLogGroupNamesQuery.refId,
region: legacyLogGroupNamesQuery.region,
},
{
queryString: logGroupNamesQuery.expression!,
logGroupNames: [],
logGroups: [
{
arn: 'arn:aws:logs:us-east-2:123456789012:log-group:group-A:*',
name: 'arn:aws:logs:us-east-2:123456789012:log-group:group-A:*',
},
...(logGroupNamesVariable.current.value as string[]).map((v) => ({ arn: v, name: v })),
],
refId: legacyLogGroupNamesQuery.refId,
region: regionVariable.current.value as string,
},
{
queryString: `stats count(*) by queryType, bin(20s)`,
logGroupNames: [],
logGroups: [...(logGroupNamesVariable.current.value as string[]).map((v) => ({ arn: v, name: v }))],
refId: legacyLogGroupNamesQuery.refId,
region: regionVariable.current.value as string,
},
];
expect(spy).toHaveBeenNthCalledWith(1, 'StartQuery', startQueryRequests, queryMock, LogsRequestMock);
});
});
it('should request to start each query and then request to get the query results', async () => {
const { runner } = setupMockedLogsQueryRunner();
describe('makeLogActionRequest', () => {
it('should use the time range from the options if it is available', async () => {
const { runner, queryMock } = setupMockedLogsQueryRunner();
const from = dateTime(0);
const to = dateTime(1000);
const options: DataQueryRequest<CloudWatchLogsQuery> = {
...LogsRequestMock,
range: { from, to, raw: { from, to } },
targets: rawLogQueriesStub,
};
await lastValueFrom(
runner.makeLogActionRequest('StartQuery', [genMockCloudWatchLogsRequest()], queryMock, options)
const queryFn = jest
.fn()
.mockReturnValueOnce(of(startQuerySuccessResponseStub))
.mockReturnValueOnce(of(getQuerySuccessResponseStub));
const response = runner.handleLogQueries(rawLogQueriesStub, options, queryFn);
const results = await lastValueFrom(response);
expect(queryFn).toHaveBeenCalledTimes(2);
expect(queryFn).toHaveBeenNthCalledWith(
1,
expect.objectContaining({
targets: expect.arrayContaining([expect.objectContaining({ subtype: 'StartQuery' })]),
})
);
expect(queryMock.mock.calls[0][0].skipQueryCache).toBe(true);
expect(queryMock.mock.calls[0][0]).toEqual(expect.objectContaining({ range: { from, to, raw: { from, to } } }));
expect(queryFn).toHaveBeenNthCalledWith(
2,
expect.objectContaining({
targets: expect.arrayContaining([expect.objectContaining({ subtype: 'GetQueryResults' })]),
})
);
expect(results).toEqual({
...getQuerySuccessResponseStub,
errors: [],
key: 'test-key',
});
});
it('should use the default time range if the time range in the options is not available', async () => {
const from = dateTime(1111);
const to = dateTime(2222);
const { runner, queryMock } = setupMockedLogsQueryRunner();
await lastValueFrom(runner.makeLogActionRequest('StartQuery', [genMockCloudWatchLogsRequest()], queryMock));
it('should call getQueryResults until the query returns with a status of complete', async () => {
const { runner } = setupMockedLogsQueryRunner();
expect(queryMock.mock.calls[0][0].skipQueryCache).toBe(true);
expect(queryMock.mock.calls[0][0]).toEqual(expect.objectContaining({ range: { from, to, raw: { from, to } } }));
const options: DataQueryRequest<CloudWatchLogsQuery> = {
...LogsRequestMock,
targets: rawLogQueriesStub,
};
const queryFn = jest
.fn()
.mockReturnValueOnce(of(startQuerySuccessResponseStub))
.mockReturnValueOnce(of(getQueryLoadingResponseStub))
.mockReturnValueOnce(of(getQueryLoadingResponseStub))
.mockReturnValueOnce(of(getQueryLoadingResponseStub))
.mockReturnValueOnce(of(getQuerySuccessResponseStub));
const response = runner.handleLogQueries(rawLogQueriesStub, options, queryFn);
const results = await lastValueFrom(response);
expect(queryFn).toHaveBeenCalledTimes(5);
// first call to start query
expect(queryFn).toHaveBeenNthCalledWith(
1,
expect.objectContaining({
targets: expect.arrayContaining([expect.objectContaining({ subtype: 'StartQuery' })]),
})
);
// second call we try to get the results
expect(queryFn).toHaveBeenNthCalledWith(
2,
expect.objectContaining({
targets: expect.arrayContaining([expect.objectContaining({ subtype: 'GetQueryResults' })]),
})
);
// after getting a loading response we wait and try again
expect(queryFn).toHaveBeenNthCalledWith(
3,
expect.objectContaining({
targets: expect.arrayContaining([expect.objectContaining({ subtype: 'GetQueryResults' })]),
})
);
// after getting a loading response we wait and try again
expect(queryFn).toHaveBeenNthCalledWith(
4,
expect.objectContaining({
targets: expect.arrayContaining([expect.objectContaining({ subtype: 'GetQueryResults' })]),
})
);
// after getting a loading response we wait and try again
expect(queryFn).toHaveBeenNthCalledWith(
5,
expect.objectContaining({
targets: expect.arrayContaining([expect.objectContaining({ subtype: 'GetQueryResults' })]),
})
);
expect(results).toEqual({
...getQuerySuccessResponseStub,
errors: [],
key: 'test-key',
});
});
it('should call getQueryResults until the query returns even if it the startQuery gets a rate limiting error from aws', async () => {
const { runner } = setupMockedLogsQueryRunner();
const options: DataQueryRequest<CloudWatchLogsQuery> = {
...LogsRequestMock,
targets: rawLogQueriesStub,
};
const queryFn = jest
.fn()
.mockReturnValueOnce(of(startQueryErrorWhenRateLimitedResponseStub))
.mockReturnValueOnce(of(startQuerySuccessResponseStub))
.mockReturnValueOnce(of(getQuerySuccessResponseStub));
const response = runner.handleLogQueries(rawLogQueriesStub, options, queryFn);
const results = await lastValueFrom(response);
expect(queryFn).toHaveBeenCalledTimes(3);
// first call
expect(queryFn).toHaveBeenNthCalledWith(
1,
expect.objectContaining({
targets: expect.arrayContaining([expect.objectContaining({ subtype: 'StartQuery' })]),
})
);
// we retry because the first call failed with the rate limiting error
expect(queryFn).toHaveBeenNthCalledWith(
2,
expect.objectContaining({
targets: expect.arrayContaining([expect.objectContaining({ subtype: 'StartQuery' })]),
})
);
// we get results because second call was successful
expect(queryFn).toHaveBeenNthCalledWith(
3,
expect.objectContaining({
targets: expect.arrayContaining([expect.objectContaining({ subtype: 'GetQueryResults' })]),
})
);
expect(results).toEqual({
...getQuerySuccessResponseStub,
errors: [],
key: 'test-key',
});
});
it('should return an error if it timesout before the start queries can get past a rate limiting error', async () => {
const { runner } = setupMockedLogsQueryRunner();
// first time timeout is called it will not be timed out, second time it will be timed out
const timeoutFunc = jest
.fn()
.mockImplementationOnce(() => false)
.mockImplementationOnce(() => true);
runner.createTimeoutFn = jest.fn(() => timeoutFunc);
const options: DataQueryRequest<CloudWatchLogsQuery> = {
...LogsRequestMock,
targets: rawLogQueriesStub,
};
// running query fn will always return the rate limit
const queryFn = jest.fn().mockReturnValue(of(startQueryErrorWhenRateLimitedResponseStub));
const response = runner.handleLogQueries(rawLogQueriesStub, options, queryFn);
const results = await lastValueFrom(response);
expect(queryFn).toHaveBeenCalledTimes(2);
// first call starts the query, but it fails with rate limiting error
expect(queryFn).toHaveBeenNthCalledWith(
1,
expect.objectContaining({
targets: expect.arrayContaining([expect.objectContaining({ subtype: 'StartQuery' })]),
})
);
// we retry because the first call failed with the rate limiting error and we haven't timed out yet
expect(queryFn).toHaveBeenNthCalledWith(
2,
expect.objectContaining({
targets: expect.arrayContaining([expect.objectContaining({ subtype: 'StartQuery' })]),
})
);
expect(results).toEqual({
...startQueryErrorWhenRateLimitedResponseStub,
key: 'test-key',
state: 'Done',
});
});
it('should return an error if the start query fails with an error that is not a rate limiting error', async () => {
const { runner } = setupMockedLogsQueryRunner();
const options: DataQueryRequest<CloudWatchLogsQuery> = {
...LogsRequestMock,
targets: rawLogQueriesStub,
};
const queryFn = jest.fn().mockReturnValueOnce(of(startQueryErrorWhenBadSyntaxResponseStub));
const response = runner.handleLogQueries(rawLogQueriesStub, options, queryFn);
const results = await lastValueFrom(response);
// only one query is made, it gets the error and returns the error
expect(queryFn).toHaveBeenCalledTimes(1);
expect(queryFn).toHaveBeenNthCalledWith(
1,
expect.objectContaining({
targets: expect.arrayContaining([expect.objectContaining({ subtype: 'StartQuery' })]),
})
);
expect(results).toEqual({
...startQueryErrorWhenBadSyntaxResponseStub,
key: 'test-key',
state: 'Done',
});
});
it('should return an error and stop querying if get query results has finished with errors', async () => {
const { runner } = setupMockedLogsQueryRunner();
const options: DataQueryRequest<CloudWatchLogsQuery> = {
...LogsRequestMock,
targets: rawLogQueriesStub,
};
const queryFn = jest
.fn()
.mockReturnValueOnce(of(startQuerySuccessResponseStub))
.mockReturnValueOnce(of(getQueryLoadingResponseStub))
.mockReturnValueOnce(of(getQueryErrorResponseStub))
.mockReturnValueOnce(of(stopQueryResponseStub));
const response = runner.handleLogQueries(rawLogQueriesStub, options, queryFn);
const results = await lastValueFrom(response);
expect(queryFn).toHaveBeenCalledTimes(4);
expect(queryFn).toHaveBeenNthCalledWith(
1,
expect.objectContaining({
targets: expect.arrayContaining([expect.objectContaining({ subtype: 'StartQuery' })]),
})
);
expect(queryFn).toHaveBeenNthCalledWith(
2,
expect.objectContaining({
targets: expect.arrayContaining([expect.objectContaining({ subtype: 'GetQueryResults' })]),
})
);
expect(queryFn).toHaveBeenNthCalledWith(
3,
expect.objectContaining({
targets: expect.arrayContaining([expect.objectContaining({ subtype: 'GetQueryResults' })]),
})
);
expect(queryFn).toHaveBeenNthCalledWith(
4,
expect.objectContaining({
targets: expect.arrayContaining([expect.objectContaining({ subtype: 'StopQuery' })]),
})
);
expect(results).toEqual({
...getQueryErrorResponseStub,
key: 'test-key',
state: 'Done',
});
});
it('should return an error and any partial data if it timesout before getting back all the results', async () => {
const { runner } = setupMockedLogsQueryRunner();
// mocking running for a while and then timing out
const timeoutFunc = jest
.fn()
.mockImplementationOnce(() => false)
.mockImplementationOnce(() => false)
.mockImplementationOnce(() => false)
.mockImplementationOnce(() => true);
runner.createTimeoutFn = jest.fn(() => timeoutFunc);
const queryFn = jest
.fn()
.mockReturnValueOnce(of(startQuerySuccessResponseStub))
.mockReturnValueOnce(of(getQueryLoadingResponseStub))
.mockReturnValueOnce(of(getQueryLoadingResponseStub))
.mockReturnValueOnce(of(getQueryLoadingResponseStub))
.mockReturnValueOnce(of(getQueryLoadingResponseStub))
.mockReturnValueOnce(of(stopQueryResponseStub));
const options: DataQueryRequest<CloudWatchLogsQuery> = {
...LogsRequestMock,
targets: rawLogQueriesStub,
};
const response = runner.handleLogQueries(rawLogQueriesStub, options, queryFn);
const results = await lastValueFrom(response);
expect(queryFn).toHaveBeenCalledTimes(6);
expect(queryFn).toHaveBeenNthCalledWith(
1,
expect.objectContaining({
targets: expect.arrayContaining([expect.objectContaining({ subtype: 'StartQuery' })]),
})
);
expect(queryFn).toHaveBeenNthCalledWith(
2,
expect.objectContaining({
targets: expect.arrayContaining([expect.objectContaining({ subtype: 'GetQueryResults' })]),
})
);
expect(queryFn).toHaveBeenNthCalledWith(
3,
expect.objectContaining({
targets: expect.arrayContaining([expect.objectContaining({ subtype: 'GetQueryResults' })]),
})
);
expect(queryFn).toHaveBeenNthCalledWith(
4,
expect.objectContaining({
targets: expect.arrayContaining([expect.objectContaining({ subtype: 'GetQueryResults' })]),
})
);
expect(queryFn).toHaveBeenNthCalledWith(
5,
expect.objectContaining({
targets: expect.arrayContaining([expect.objectContaining({ subtype: 'GetQueryResults' })]),
})
);
expect(queryFn).toHaveBeenNthCalledWith(
6,
expect.objectContaining({
targets: expect.arrayContaining([expect.objectContaining({ subtype: 'StopQuery' })]),
})
);
expect(results).toEqual({
...getQueryLoadingResponseStub,
errors: [
{
message:
'Error: Query hit timeout before completing after 3 attempts, partial results may be shown. To increase the timeout window update your datasource configuration.',
refId: 'A',
type: 'timeout',
},
],
key: 'test-key',
state: 'Done',
});
});
});
});
const rawLogQueriesStub: CloudWatchLogsQuery[] = [
{
refId: 'A',
id: '',
region: 'us-east-2',
logGroups: [
{
accountId: 'accountId',
arn: 'somearn',
name: 'nameOfLogGroup',
},
],
queryMode: 'Logs',
expression: 'fields @timestamp, @message |\n sort @timestamp desc |\n limit 20',
datasource: {
type: 'cloudwatch',
uid: 'ff87aa43-7618-42ee-ae9c-4a405378728b',
},
},
];
const startQuerySuccessResponseStub = {
data: [
{
name: 'A',
refId: 'A',
meta: {
typeVersion: [0, 0],
custom: { Region: 'us-east-2' },
},
fields: [
{
name: 'queryId',
type: 'string',
typeInfo: { frame: 'string' },
config: {},
values: ['123'],
entities: {},
},
],
length: 1,
state: 'Done',
},
],
};
const startQueryErrorWhenRateLimitedResponseStub = {
data: [],
errors: [
{
refId: 'A',
message:
'failed to execute log action with subtype: StartQuery: LimitExceededException: LimitExceededException: Account maximum query concurrency limit of [30] reached.',
status: 500,
},
],
};
const startQueryErrorWhenBadSyntaxResponseStub = {
data: [],
state: 'Error',
errors: [
{
refId: 'A',
message:
'failed to execute log action with subtype: StartQuery: MalformedQueryException: unexpected symbol found bad at line 1 and position 843',
status: 500,
},
],
};
const getQuerySuccessResponseStub = {
data: [
{
name: 'A',
refId: 'A',
meta: {
custom: { Status: 'Complete' },
typeVersion: [0, 0],
stats: [
{ displayName: 'Bytes scanned', value: 1000 },
{ displayName: 'Records scanned', value: 1000 },
{ displayName: 'Records matched', value: 1000 },
],
},
fields: [
{
name: '@message',
type: 'string',
typeInfo: { frame: 'string' },
config: {},
values: ['some log'],
},
],
length: 1,
state: 'Done',
},
],
state: 'Done',
};
const getQueryLoadingResponseStub = {
data: [
{
name: 'A',
refId: 'A',
meta: {
custom: { Status: 'Running' },
typeVersion: [0, 0],
stats: [
{ displayName: 'Bytes scanned', value: 1 },
{ displayName: 'Records scanned', value: 1 },
{ displayName: 'Records matched', value: 1 },
],
},
fields: [
{
name: '@message',
type: 'string',
typeInfo: { frame: 'string' },
config: {},
values: ['some log'],
},
],
length: 1,
state: 'Done',
},
],
state: 'Done',
};
const getQueryErrorResponseStub = {
data: [],
errors: [
{
refId: 'A',
message: 'failed to execute log action with subtype: GetQueryResults: AWS is down',
status: 500,
},
],
state: 'Error',
};
const stopQueryResponseStub = {
state: 'Done',
};
@@ -1,6 +1,5 @@
import { set, uniq } from 'lodash';
import {
catchError,
concatMap,
finalize,
from,
@@ -31,7 +30,7 @@ import {
getDefaultTimeRange,
rangeUtil,
} from '@grafana/data';
import { config, FetchError, TemplateSrv } from '@grafana/runtime';
import { TemplateSrv } from '@grafana/runtime';
import {
CloudWatchJsonData,
@@ -67,67 +66,23 @@ export class CloudWatchLogsQueryRunner extends CloudWatchRequest {
this.logsTimeout = instanceSettings.jsonData.logsTimeout || '30m';
}
/**
* Check if the query is complete and returns results if it is. Otherwise it will poll for results.
*/
getQueryResults = ({
frames,
error,
logQueries,
timeoutFunc,
queryFn,
}: {
frames: DataFrame[];
logQueries: CloudWatchLogsQuery[];
timeoutFunc: () => boolean;
queryFn: (request: DataQueryRequest<CloudWatchQuery>) => Observable<DataQueryResponse>;
error?: DataQueryError;
}) => {
// If every frame is already finished, we can return the result as the
// query was run synchronously. Otherwise, we return `this.logsQuery`
// which will poll for the results.
if (
frames.every((frame) =>
[
CloudWatchLogsQueryStatus.Complete,
CloudWatchLogsQueryStatus.Cancelled,
CloudWatchLogsQueryStatus.Failed,
].includes(frame.meta?.custom?.['Status'])
)
) {
return of({
data: frames,
key: 'test-key',
state: LoadingState.Done,
});
}
return this.logsQuery(
frames.map((dataFrame) => ({
queryId: dataFrame.fields[0].values[0],
region: dataFrame.meta?.custom?.['Region'] ?? 'default',
refId: dataFrame.refId!,
statsGroups: logQueries.find((target) => target.refId === dataFrame.refId)?.statsGroups,
})),
timeoutFunc,
queryFn
).pipe(
map((response: DataQueryResponse) => {
if (!response.error && error) {
response.error = error;
}
return response;
})
);
// only public so that it is easy to mock out in tests
public createTimeoutFn = () => {
const startTime = new Date();
return () => {
return Date.now() >= startTime.valueOf() + rangeUtil.intervalToMs(this.logsTimeout);
};
};
/**
* Handle log query. The log query works by starting the query on the CloudWatch and then periodically polling for
* results.
* @param logQueries
* @param options
* Where all frontend log queries start. Log Queries are started and then we poll for the results.
* There is a timeout set in the ds configuration that will stop the query if it takes too long.
* We automatically retry logs queries that hit rate limits from aws.
* @param logQueries the raw log queries as created by the user
* @param options the full raw query request which might contain other queries
* @param queryFn the inherited query function from the datasource that calls /query endpoint
*/
handleLogQueries = (
public handleLogQueries = (
logQueries: CloudWatchLogsQuery[],
options: DataQueryRequest<CloudWatchQuery>,
queryFn: (request: DataQueryRequest<CloudWatchQuery>) => Observable<DataQueryResponse>
@@ -163,21 +118,19 @@ export class CloudWatchLogsQueryRunner extends CloudWatchRequest {
};
});
const startTime = new Date();
const timeoutFunc = () => {
return Date.now() >= startTime.valueOf() + rangeUtil.intervalToMs(this.logsTimeout);
};
const timeoutFunc = this.createTimeoutFn();
// run with retry will retry any failed start queries due to rate limiting
return runWithRetry(
(targets) => {
return this.makeLogActionRequest('StartQuery', targets, queryFn, options);
},
(targets) => this.makeLogActionRequest('StartQuery', targets, queryFn, options),
startQueryRequests,
timeoutFunc
).pipe(
mergeMap(({ frames, error }: { frames: DataFrame[]; error?: DataQueryError }) =>
this.getQueryResults({ frames, logQueries, timeoutFunc, error, queryFn })
),
// once we've started the query, we need to poll for the results
mergeMap((startQueryResponse) => {
return this.getQueryResults({ logQueries, timeoutFunc, queryFn, startQueryResponse });
}),
// once we get the results, we add data links to the logs
mergeMap((dataQueryResponse) => {
return from(
(async () => {
@@ -197,14 +150,102 @@ export class CloudWatchLogsQueryRunner extends CloudWatchRequest {
);
};
/**
* Called by datasource.ts, invoked when user clicks on a log row in the logs visualization and the "show context button"
*/
public getLogRowContext = async (
row: LogRowModel,
{ limit = 10, direction = LogRowContextQueryDirection.Backward }: LogRowContextOptions = {},
queryFn: (request: DataQueryRequest<CloudWatchQuery>) => Observable<DataQueryResponse>,
query?: CloudWatchLogsQuery
) => {
let logStreamField = null;
let logField = null;
for (const field of row.dataFrame.fields) {
if (field.name === LOGSTREAM_IDENTIFIER_INTERNAL) {
logStreamField = field;
if (logField !== null) {
break;
}
} else if (field.name === LOG_IDENTIFIER_INTERNAL) {
logField = field;
if (logStreamField !== null) {
break;
}
}
}
const requestParams: GetLogEventsRequest = {
refId: query?.refId || 'A', // dummy
limit,
startFromHead: direction !== LogRowContextQueryDirection.Backward,
region: query?.region || '',
logGroupName: parseLogGroupName(logField!.values[row.rowIndex]),
logStreamName: logStreamField!.values[row.rowIndex],
};
if (direction === LogRowContextQueryDirection.Backward) {
requestParams.endTime = row.timeEpochMs;
} else {
requestParams.startTime = row.timeEpochMs;
}
return await lastValueFrom(this.makeLogActionRequest('GetLogEvents', [requestParams], queryFn));
};
/**
* Check if an already started query is complete and returns results if it is. Otherwise it will start polling for results.
*/
/**
 * Check if an already started query is complete and returns results if it is.
 * Otherwise it will start polling for results.
 */
private getQueryResults = ({
  logQueries,
  timeoutFunc,
  queryFn,
  startQueryResponse,
}: {
  logQueries: CloudWatchLogsQuery[];
  timeoutFunc: () => boolean;
  queryFn: (request: DataQueryRequest<CloudWatchQuery>) => Observable<DataQueryResponse>;
  startQueryResponse: DataQueryResponse;
}) => {
  // Statuses in which a query will produce no further data, so polling is unnecessary.
  const terminalStatuses = [
    CloudWatchLogsQueryStatus.Complete,
    CloudWatchLogsQueryStatus.Cancelled,
    CloudWatchLogsQueryStatus.Failed,
  ];
  const allQueriesFinished = startQueryResponse.data.every((frame) =>
    terminalStatuses.includes(frame.meta?.custom?.['Status'])
  );
  if (allQueriesFinished) {
    return of({
      key: 'test-key',
      state: LoadingState.Done,
      ...startQueryResponse,
    });
  }

  // Still running: build one polling descriptor per started query and poll until done.
  const queryParams = startQueryResponse.data.map((dataFrame) => ({
    queryId: dataFrame.fields[0].values[0],
    region: dataFrame.meta?.custom?.['Region'] ?? 'default',
    refId: dataFrame.refId!,
    statsGroups: logQueries.find((target) => target.refId === dataFrame.refId)?.statsGroups,
  }));
  return this.pollForLogQueryResults(queryParams, timeoutFunc, queryFn, startQueryResponse.errors || []);
};
/**
* Checks progress and polls data of a started logs query with some retry logic.
* @param queryParams
*/
logsQuery(
private pollForLogQueryResults(
queryParams: QueryParam[],
timeoutFunc: () => boolean,
queryFn: (request: DataQueryRequest<CloudWatchQuery>) => Observable<DataQueryResponse>
queryFn: (request: DataQueryRequest<CloudWatchQuery>) => Observable<DataQueryResponse>,
errorsFromStartQuery: DataQueryError[]
): Observable<DataQueryResponse> {
this.logQueries = {};
queryParams.forEach((param) => {
@@ -215,16 +256,30 @@ export class CloudWatchLogsQueryRunner extends CloudWatchRequest {
};
});
const dataFrames = increasingInterval({ startPeriod: 100, endPeriod: 1000, step: 300 }).pipe(
const responses = increasingInterval({ startPeriod: 100, endPeriod: 1000, step: 300 }).pipe(
concatMap((_) => this.makeLogActionRequest('GetQueryResults', queryParams, queryFn)),
repeat(),
share()
);
let errorsFromGetQuery: DataQueryError[] = [];
const dataFrames: Observable<DataFrame[]> = responses.pipe(
map((response) => {
// TODO: it's not entirely clear to me why but this map gets called twice, but the responses are the same
// I think it has something to do with lingering subscriptions being opened, it feels like a bug here.
// In an ideal world we'd push the errors to an array, not reset it
if (response.errors) {
errorsFromGetQuery = response.errors;
}
return response.data;
})
);
const initialValue: { failures: number; prevRecordsMatched: Record<string, number> } = {
failures: 0,
prevRecordsMatched: {},
};
const consecutiveFailedAttempts = dataFrames.pipe(
scan(({ failures, prevRecordsMatched }, frames) => {
failures++;
@@ -258,9 +313,16 @@ export class CloudWatchLogsQueryRunner extends CloudWatchRequest {
}
}),
map(([dataFrames, failedAttempts]) => {
// if we've timed out, we set a status of cancel which will stop the query from being retried again in getQueryResults
const errors = [...errorsFromStartQuery, ...errorsFromGetQuery];
if (timeoutFunc()) {
for (const frame of dataFrames) {
set(frame, 'meta.custom.Status', CloudWatchLogsQueryStatus.Cancelled);
errors.push({
message: `Error: Query hit timeout before completing after ${failedAttempts} attempts, partial results may be shown. To increase the timeout window update your datasource configuration.`,
type: DataQueryErrorType.Timeout,
refId: frame.refId,
});
}
}
@@ -276,12 +338,7 @@ export class CloudWatchLogsQueryRunner extends CloudWatchRequest {
)
? LoadingState.Done
: LoadingState.Loading,
error: timeoutFunc()
? {
message: `error: query timed out after ${failedAttempts} attempts`,
type: DataQueryErrorType.Timeout,
}
: undefined,
errors: errors,
};
}),
takeWhile(({ state }) => state !== LoadingState.Error && state !== LoadingState.Done, true)
@@ -290,7 +347,7 @@ export class CloudWatchLogsQueryRunner extends CloudWatchRequest {
return withTeardown(queryResponse, () => this.stopQueries(queryFn));
}
stopQueries(queryFn: (request: DataQueryRequest<CloudWatchQuery>) => Observable<DataQueryResponse>) {
private stopQueries(queryFn: (request: DataQueryRequest<CloudWatchQuery>) => Observable<DataQueryResponse>) {
if (Object.keys(this.logQueries).length > 0) {
this.makeLogActionRequest(
'StopQuery',
@@ -309,12 +366,12 @@ export class CloudWatchLogsQueryRunner extends CloudWatchRequest {
}
}
makeLogActionRequest(
private makeLogActionRequest(
subtype: LogAction,
queryParams: CloudWatchLogsRequest[],
queryFn: (request: DataQueryRequest<CloudWatchQuery>) => Observable<DataQueryResponse>,
options?: DataQueryRequest<CloudWatchQuery>
): Observable<DataFrame[]> {
): Observable<DataQueryResponse> {
const range = options?.range || getDefaultTimeRange();
const requestParams: DataQueryRequest<CloudWatchLogsQuery> = {
@@ -341,74 +398,9 @@ export class CloudWatchLogsQueryRunner extends CloudWatchRequest {
})),
};
return queryFn(requestParams).pipe(
map((response) => response.data),
catchError((err: FetchError) => {
if (config.featureToggles.datasourceQueryMultiStatus && err.status === 207) {
throw err;
}
if (err.status === 400) {
throw err;
}
if (err.data?.error) {
throw err.data.error;
} else if (err.data?.message) {
// In PROD we do not supply .error
throw err.data.message;
}
throw err;
})
);
return queryFn(requestParams);
}
/**
 * Invoked when the user clicks "show context" on a log row: fetches the log
 * events surrounding the clicked row from its log group/stream via a
 * GetLogEvents request and returns them wrapped in `{ data }`.
 */
getLogRowContext = async (
  row: LogRowModel,
  { limit = 10, direction = LogRowContextQueryDirection.Backward }: LogRowContextOptions = {},
  queryFn: (request: DataQueryRequest<CloudWatchQuery>) => Observable<DataQueryResponse>,
  query?: CloudWatchLogsQuery
): Promise<{ data: DataFrame[] }> => {
  // Locate the hidden fields carrying the row's log identifier and log stream
  // identifier; stop scanning as soon as both have been found.
  let logStreamField = null;
  let logField = null;
  for (const field of row.dataFrame.fields) {
    if (field.name === LOGSTREAM_IDENTIFIER_INTERNAL) {
      logStreamField = field;
      if (logField !== null) {
        break;
      }
    } else if (field.name === LOG_IDENTIFIER_INTERNAL) {
      logField = field;
      if (logStreamField !== null) {
        break;
      }
    }
  }
  const requestParams: GetLogEventsRequest = {
    refId: query?.refId || 'A', // dummy
    limit,
    startFromHead: direction !== LogRowContextQueryDirection.Backward,
    region: query?.region || '',
    logGroupName: parseLogGroupName(logField!.values[row.rowIndex]),
    logStreamName: logStreamField!.values[row.rowIndex],
  };
  // Backward context is bounded above by the clicked row's timestamp, forward
  // context is bounded below.
  if (direction === LogRowContextQueryDirection.Backward) {
    requestParams.endTime = row.timeEpochMs;
  } else {
    requestParams.startTime = row.timeEpochMs;
  }
  const dataFrames = await lastValueFrom(this.makeLogActionRequest('GetLogEvents', [requestParams], queryFn));
  return {
    data: dataFrames,
  };
};
private filterQuery(query: CloudWatchLogsQuery) {
const hasMissingLegacyLogGroupNames = !query.logGroupNames?.length;
const hasMissingLogGroups = !query.logGroups?.length;
@@ -1,231 +0,0 @@
import { lastValueFrom, of, throwError } from 'rxjs';
import { toArray } from 'rxjs/operators';
import { dataFrameToJSON, MutableDataFrame } from '@grafana/data';
import { DataResponse, FetchError } from '@grafana/runtime';
import { StartQueryRequest } from '../types';
import { runWithRetry } from './logsRetry';
// Unit tests for runWithRetry: verifies that throttled queries
// (LimitExceededException) are retried, that partial results collected across
// retries are merged, and that the timeout function controls when retrying stops.
describe('runWithRetry', () => {
  // Canned timeout callbacks: never time out / time out immediately.
  const timeoutPass = () => false;
  const timeoutFail = () => true;

  // Happy path: a single successful response is forwarded untouched.
  it('returns results if no retry is needed', async () => {
    const queryFunc = jest.fn();
    const mockFrames = [createResponseFrame('A')];
    queryFunc.mockReturnValueOnce(of(mockFrames));
    const targets = [targetA];
    const values = await lastValueFrom(runWithRetry(queryFunc, targets, timeoutPass).pipe(toArray()));
    expect(queryFunc).toBeCalledTimes(1);
    expect(queryFunc).toBeCalledWith(targets);
    expect(values).toEqual([{ frames: mockFrames }]);
  });

  // A limit error on the first attempt triggers one retry with the same targets.
  it('retries if error', async () => {
    jest.useFakeTimers();
    const targets = [targetA];
    const queryFunc = jest.fn();
    const mockFrames = [createResponseFrame('A')];
    queryFunc.mockReturnValueOnce(throwError(() => createErrorResponse(targets)));
    queryFunc.mockReturnValueOnce(of(mockFrames));
    const valuesPromise = lastValueFrom(runWithRetry(queryFunc, targets, timeoutPass).pipe(toArray()));
    jest.runAllTimers();
    const values = await valuesPromise;
    expect(queryFunc).toBeCalledTimes(2);
    expect(queryFunc).nthCalledWith(1, targets);
    expect(queryFunc).nthCalledWith(2, targets);
    expect(values).toEqual([{ frames: mockFrames }]);
  });

  // If the timeout fires before any data arrived, the error is surfaced instead of data.
  it('fails if reaching timeout and no data was retrieved', async () => {
    jest.useFakeTimers();
    const targets = [targetA];
    const queryFunc = jest.fn();
    queryFunc.mockReturnValueOnce(throwError(() => createErrorResponse(targets)));
    queryFunc.mockReturnValueOnce(of([createResponseFrame('A')]));
    const valuesPromise = lastValueFrom(runWithRetry(queryFunc, targets, timeoutFail).pipe(toArray()));
    jest.runAllTimers();
    let error;
    try {
      await valuesPromise;
    } catch (e) {
      error = e;
    }
    expect(queryFunc).toBeCalledTimes(1);
    expect(queryFunc).nthCalledWith(1, targets);
    expect(error).toEqual({ message: 'LimitExceededException', refId: 'A' });
  });

  // Errors without the expected structure are passed through without retrying.
  it('fails if we get unexpected error', async () => {
    jest.useFakeTimers();
    const targets = [targetA];
    const queryFunc = jest.fn();
    queryFunc.mockReturnValueOnce(throwError(() => 'random error'));
    const valuesPromise = lastValueFrom(runWithRetry(queryFunc, targets, timeoutPass).pipe(toArray()));
    jest.runAllTimers();
    let error;
    try {
      await valuesPromise;
    } catch (e) {
      error = e;
    }
    expect(queryFunc).toBeCalledTimes(1);
    expect(queryFunc).nthCalledWith(1, targets);
    expect(error).toEqual('random error');
  });

  it('works with multiple queries if there is no error', async () => {
    const targets = [targetA, targetB];
    const queryFunc = jest.fn();
    const mockFrames = [createResponseFrame('A'), createResponseFrame('B')];
    queryFunc.mockReturnValueOnce(of(mockFrames));
    const values = await lastValueFrom(runWithRetry(queryFunc, targets, timeoutPass).pipe(toArray()));
    expect(queryFunc).toBeCalledTimes(1);
    expect(queryFunc).nthCalledWith(1, targets);
    expect(values).toEqual([{ frames: mockFrames }]);
  });

  // Only the failed query (B) is retried; A's partial result is kept and merged in.
  it('works with multiple queries only one errors out', async () => {
    jest.useFakeTimers();
    const targets = [targetA, targetB];
    const queryFunc = jest.fn();
    queryFunc.mockReturnValueOnce(
      throwError(() =>
        createErrorResponse(targets, {
          A: { frames: [dataFrameToJSON(createResponseFrame('A'))] },
          B: { error: 'LimitExceededException' },
        })
      )
    );
    queryFunc.mockReturnValueOnce(of([createResponseFrame('B')]));
    const valuesPromise = lastValueFrom(runWithRetry(queryFunc, targets, timeoutPass).pipe(toArray()));
    jest.runAllTimers();
    const values = await valuesPromise;
    expect(queryFunc).toBeCalledTimes(2);
    expect(queryFunc).nthCalledWith(1, targets);
    expect(queryFunc).nthCalledWith(2, [targetB]);
    // Bit more involved because dataFrameToJSON and dataFrameFromJSON are not symmetrical and add some attributes to the
    // dataframe fields
    expect(values.length).toBe(1);
    expect(values[0].frames.length).toBe(2);
    expect(values[0].frames[0].fields[0].values[0]).toBe('A');
    expect(values[0].frames[1].fields[0].values[0]).toBe('B');
  });

  // On timeout with partial success, the collected data is emitted together with an error.
  it('sends data and also error if only one query gets limit error', async () => {
    jest.useFakeTimers();
    const targets = [targetA, targetB];
    const queryFunc = jest.fn();
    queryFunc.mockReturnValueOnce(
      throwError(() =>
        createErrorResponse(targets, {
          A: { frames: [dataFrameToJSON(createResponseFrame('A'))] },
          B: { error: 'LimitExceededException' },
        })
      )
    );
    const valuesPromise = lastValueFrom(runWithRetry(queryFunc, targets, timeoutFail).pipe(toArray()));
    jest.runAllTimers();
    const values = await valuesPromise;
    expect(queryFunc).toBeCalledTimes(1);
    expect(queryFunc).nthCalledWith(1, targets);
    expect(values.length).toBe(1);
    expect(values[0].frames.length).toBe(1);
    expect(values[0].frames[0].fields[0].values[0]).toBe('A');
    expect(values[0].error).toEqual({ message: 'Some queries timed out: LimitExceededException' });
  });

  // Successful responses accumulated over several retries are all emitted once the
  // retry budget (retry >= 2) is exhausted, alongside the timeout error.
  it('sends all collected successful responses on timeout', async () => {
    jest.useFakeTimers();
    const targets = [targetA, targetB, targetC];
    const queryFunc = jest.fn();
    queryFunc.mockReturnValueOnce(
      throwError(() =>
        createErrorResponse(targets, {
          A: { frames: [dataFrameToJSON(createResponseFrame('A'))] },
          B: { error: 'LimitExceededException' },
          C: { error: 'LimitExceededException' },
        })
      )
    );
    queryFunc.mockReturnValueOnce(
      throwError(() =>
        createErrorResponse(targets, {
          B: { frames: [dataFrameToJSON(createResponseFrame('B'))] },
          C: { error: 'LimitExceededException' },
        })
      )
    );
    queryFunc.mockReturnValueOnce(
      throwError(() =>
        createErrorResponse(targets, {
          C: { error: 'LimitExceededException' },
        })
      )
    );
    const valuesPromise = lastValueFrom(runWithRetry(queryFunc, targets, (retry) => retry >= 2).pipe(toArray()));
    jest.runAllTimers();
    const values = await valuesPromise;
    expect(queryFunc).toBeCalledTimes(3);
    expect(queryFunc).nthCalledWith(1, targets);
    expect(queryFunc).nthCalledWith(2, [targetB, targetC]);
    expect(queryFunc).nthCalledWith(3, [targetC]);
    expect(values.length).toBe(1);
    expect(values[0].frames.length).toBe(2);
    expect(values[0].frames[0].fields[0].values[0]).toBe('A');
    expect(values[0].frames[1].fields[0].values[0]).toBe('B');
    expect(values[0].error).toEqual({ message: 'Some queries timed out: LimitExceededException' });
  });
});
// Builds a minimal StartQueryRequest-shaped target for the given refId.
const makeTarget = (refId: string) => ({ queryString: `query ${refId}`, refId, region: 'test' });

// Reusable targets shared across the test cases.
const targetA = makeTarget('A');
const targetB = makeTarget('B');
const targetC = makeTarget('C');
/**
 * Builds a one-field data frame whose `queryId` value and refId are both `ref`,
 * mimicking the frame shape a StartQuery response carries.
 */
function createResponseFrame(ref: string) {
  const frame = new MutableDataFrame({
    refId: ref,
    fields: [{ name: 'queryId', values: [ref] }],
  });
  return frame;
}
/**
 * Fabricates a 400 FetchError shaped like the backend's StartQuery failure.
 * When `results` is not supplied, a single LimitExceededException for refId A
 * is used; the original targets are echoed back under `config.data.queries`.
 */
function createErrorResponse(targets: StartQueryRequest[], results?: Record<string, DataResponse>): FetchError {
  const defaultResults: Record<string, DataResponse> = {
    A: {
      error: 'LimitExceededException',
    },
  };
  return {
    status: 400,
    data: {
      results: results || defaultResults,
    },
    config: {
      url: '',
      data: {
        queries: targets,
      },
    },
  };
}
@@ -1,7 +1,7 @@
import { Observable, Subscription } from 'rxjs';
import { DataFrame, DataFrameJSON, DataQueryError } from '@grafana/data';
import { FetchError, toDataQueryResponse } from '@grafana/runtime';
import { DataFrame, DataFrameJSON, DataQueryError, DataQueryResponse } from '@grafana/data';
import { FetchError } from '@grafana/runtime';
import { StartQueryRequest } from '../types';
@@ -21,16 +21,16 @@ type Result = { frames: DataFrameJSON[]; error?: string };
* @param options
*/
export function runWithRetry(
queryFun: (targets: StartQueryRequest[]) => Observable<DataFrame[]>,
queryFun: (targets: StartQueryRequest[]) => Observable<DataQueryResponse>,
targets: StartQueryRequest[],
timeoutFunc: (retry: number, startTime: number) => boolean
): Observable<{ frames: DataFrame[]; error?: DataQueryError }> {
): Observable<DataQueryResponse> {
const startTime = new Date();
let retries = 0;
let timerID: ReturnType<typeof setTimeout>;
let subscription: Subscription;
let collected = {};
let collected: { data: DataFrame[]; errors: DataQueryError[] } = { data: [], errors: [] };
// This function is used to calculate the time to wait before retrying the query.
const retryWaitFunction = (retry: number) => {
return Math.pow(2, retry) * 1000 + Math.random() * 100;
};
@@ -39,83 +39,47 @@ export function runWithRetry(
// Run function is where the logic takes place. We have it in a function so we can call it recursively.
function run(currentQueryParams: StartQueryRequest[]) {
subscription = queryFun(currentQueryParams).subscribe({
next(frames) {
// In case we successfully finished, merge the current response with whatever we already collected.
const collectedPreviously = toDataQueryResponse({ data: { results: collected } }).data || [];
observer.next({ frames: [...collectedPreviously, ...frames] });
next(response: DataQueryResponse) {
if (response.errors) {
const { refIdsForRequestsToRetry, errorsNotToRetry } = splitErrorsData(response.errors);
if (refIdsForRequestsToRetry.length > 0) {
if (!timeoutFunc(retries, startTime.valueOf())) {
// store the responses we are not retrying
collected.data = [...collected.data, ...response.data];
collected.errors = [...collected.errors, ...errorsNotToRetry];
// We retry only the failed queries
timerID = setTimeout(
() => {
retries++;
run(currentQueryParams.filter((query) => refIdsForRequestsToRetry.includes(query.refId)));
},
// We want to know how long to wait for the next retry. First time this will be 0.
retryWaitFunction(retries + 1)
);
// We return early. The observer.next will be called either when the timeout finishes or when there are no errors.
return;
}
}
}
// if the timeout is done or it was never called we take what we have from past retries and the current round
collected.data = [...collected.data, ...response.data];
collected.errors = [
...collected.errors,
...(response.errors && response.errors.length > 0 ? response.errors : []),
];
observer.next(collected);
observer.complete();
},
// if the server returns a raw string 5xx error, something is very unexpectedly wrong and we just forward it
error(error: FetchError<{ results?: Record<string, Result> }> | string) {
// In case of error we first try to figure out what kind of error it is
// This means it was a generic 500 error probably so we just pass it on
if (typeof error === 'string') {
observer.error(error);
return;
}
// In case of multiple queries this some can error while some may be ok
const errorData = splitErrorData(error);
if (!errorData) {
// Not sure what happened but the error structure wasn't what we expected
observer.error(error);
return;
}
if (!errorData!.errors.length) {
// So there is no limit error but some other errors so nothing to retry so we just pass it as it would be
// otherwise.
observer.error(error);
return;
}
if (timeoutFunc(retries, startTime.valueOf())) {
// We timed out but we could have started some queries
if (Object.keys(collected).length || Object.keys(errorData.good).length) {
const dataResponse = toDataQueryResponse({
data: {
results: {
...(errorData.good ?? {}),
...(collected ?? {}),
},
},
});
dataResponse.error = {
...(dataResponse.error ?? {}),
message: `Some queries timed out: ${errorData.errorMessage}`,
};
// So we consider this a partial success and pass the data forward but also with error to be shown to
// the user.
observer.next({
error: dataResponse.error,
frames: dataResponse.data,
});
observer.complete();
} else {
// So we timed out and there was no data to pass forward so we just pass the error
const dataResponse = toDataQueryResponse({ data: { results: error.data?.results ?? {} } });
observer.error(dataResponse.error);
}
return;
}
collected = {
...collected,
...errorData!.good,
};
timerID = setTimeout(
() => {
retries++;
run(errorData!.errors);
},
// We want to know how long to wait for the next retry. First time this will be 0.
retryWaitFunction(retries + 1)
);
observer.error(error);
},
});
}
run(targets);
return () => {
// We clear only the latest timer and subscription but the observable should complete after one response so
@@ -126,25 +90,15 @@ export function runWithRetry(
});
}
function splitErrorData(error: FetchError<{ results?: Record<string, Result> }>) {
const results = error.data?.results;
if (!results) {
return undefined;
}
return Object.keys(results).reduce<{
errors: StartQueryRequest[];
good: Record<string, Result>;
errorMessage: string;
}>(
(acc, refId) => {
if (results[refId].error?.startsWith('LimitExceededException')) {
acc.errorMessage = results[refId].error!;
acc.errors.push(error.config.data.queries.find((q: any) => q.refId === refId));
} else {
acc.good[refId] = results[refId];
}
return acc;
},
{ errors: [], good: {}, errorMessage: '' }
);
/**
 * Partitions StartQuery errors into queries worth retrying and errors to surface as-is.
 *
 * Only `LimitExceededException` errors that carry a refId are retriable — they indicate
 * AWS throttled the concurrent-query limit, so re-issuing the query later may succeed.
 * Everything else (including limit errors without a refId, which cannot be mapped back
 * to a query) is collected unchanged in `errorsNotToRetry`.
 *
 * @param errors errors attached to a DataQueryResponse
 * @returns refIds to retry plus the errors that should be shown to the user
 */
function splitErrorsData(errors: DataQueryError[]) {
  const refIdsForRequestsToRetry: string[] = [];
  const errorsNotToRetry: DataQueryError[] = [];
  // for..of rather than .map: we iterate purely for the side effect of bucketing.
  for (const err of errors) {
    if (err?.message?.includes('LimitExceededException') && err.refId) {
      refIdsForRequestsToRetry.push(err.refId);
    } else {
      errorsNotToRetry.push(err);
    }
  }
  return { refIdsForRequestsToRetry, errorsNotToRetry };
}