From b4204628e4b41ea2fdecf85d2c704bddcbc64d97 Mon Sep 17 00:00:00 2001 From: Alexander Emelin Date: Wed, 5 Jan 2022 19:02:12 +0300 Subject: [PATCH] Live: optionally send queries over websocket connection (#41653) Co-authored-by: ArturWierzbicki Co-authored-by: Ryan McKinley --- packages/grafana-data/src/types/config.ts | 1 + packages/grafana-runtime/src/config.ts | 1 + packages/grafana-runtime/src/services/live.ts | 22 ++++++++++++-- .../src/utils/DataSourceWithBackend.ts | 11 ++++++- .../live/centrifuge/LiveDataStream.test.ts | 7 +++-- .../live/centrifuge/LiveDataStream.ts | 3 +- .../app/features/live/centrifuge/service.ts | 28 +++++++++++++++-- .../live/centrifuge/service.worker.ts | 7 ++++- .../live/centrifuge/serviceWorkerProxy.ts | 8 +++++ public/app/features/live/live.ts | 30 +++++++++++++++++-- 10 files changed, 103 insertions(+), 15 deletions(-) diff --git a/packages/grafana-data/src/types/config.ts b/packages/grafana-data/src/types/config.ts index 4f6260e1af8..640281cfc90 100644 --- a/packages/grafana-data/src/types/config.ts +++ b/packages/grafana-data/src/types/config.ts @@ -52,6 +52,7 @@ export interface FeatureToggles { recordedQueries: boolean; newNavigation: boolean; fullRangeLogsVolume: boolean; + queryOverLive: boolean; dashboardPreviews: boolean; } diff --git a/packages/grafana-runtime/src/config.ts b/packages/grafana-runtime/src/config.ts index de25fd25d12..8d6ce2ae0d6 100644 --- a/packages/grafana-runtime/src/config.ts +++ b/packages/grafana-runtime/src/config.ts @@ -69,6 +69,7 @@ export class GrafanaBootConfig implements GrafanaConfig { recordedQueries: false, newNavigation: false, fullRangeLogsVolume: false, + queryOverLive: false, dashboardPreviews: false, }; licenseInfo: LicenseInfo = {} as LicenseInfo; diff --git a/packages/grafana-runtime/src/services/live.ts b/packages/grafana-runtime/src/services/live.ts index 0b81e319083..955273f926d 100644 --- a/packages/grafana-runtime/src/services/live.ts +++ b/packages/grafana-runtime/src/services/live.ts @@ -1,5 +1,6 @@ import { - DataFrame, + DataFrameJSON, + DataQueryRequest, DataQueryResponse, LiveChannelAddress, LiveChannelEvent, @@ -38,12 +39,20 @@ export interface StreamingFrameOptions { */ export interface LiveDataStreamOptions { addr: LiveChannelAddress; - frame?: DataFrame; // initial results + frame?: DataFrameJSON; // initial results key?: string; buffer?: Partial; filter?: LiveDataFilter; } +/** + * @alpha -- experimental: send a normal query request over websockt + */ +export interface LiveQueryDataOptions { + request: DataQueryRequest; + body: any; // processed queries, same as sent to `/api/query/ds` +} + /** * @alpha -- experimental */ @@ -63,6 +72,15 @@ export interface GrafanaLiveSrv { */ getDataStream(options: LiveDataStreamOptions): Observable; + /** + * Execute a query over the live websocket and potentiall subscribe to a live channel. + * + * Since the initial request and subscription are on the same socket, this will support HA setups + * + * @alpha -- this function requires the feature toggle `queryOverLive` to be set + */ + getQueryData(options: LiveQueryDataOptions): Observable; + /** * For channels that support presence, this will request the current state from the server. * diff --git a/packages/grafana-runtime/src/utils/DataSourceWithBackend.ts b/packages/grafana-runtime/src/utils/DataSourceWithBackend.ts index 115f545059e..9040bd70796 100644 --- a/packages/grafana-runtime/src/utils/DataSourceWithBackend.ts +++ b/packages/grafana-runtime/src/utils/DataSourceWithBackend.ts @@ -11,6 +11,7 @@ import { parseLiveChannelAddress, getDataSourceRef, DataSourceRef, + dataFrameToJSON, } from '@grafana/data'; import { merge, Observable, of } from 'rxjs'; import { catchError, switchMap } from 'rxjs/operators'; @@ -21,6 +22,7 @@ import { StreamingFrameOptions, StreamingFrameAction, } from '../services'; +import { config } from '../config'; import { BackendDataSourceResponse, toDataQueryResponse } from './queryResponse'; /** @@ -155,6 +157,13 @@ class DataSourceWithBackend< body.to = range.to.valueOf().toString(); } + if (config.featureToggles.queryOverLive) { + return getGrafanaLiveSrv().getQueryData({ + request, + body, + }); + } + return getBackendSrv() .fetch({ url: '/api/ds/query', @@ -271,7 +280,7 @@ export function toStreamingDataResponse( live.getDataStream({ addr, buffer: getter(req, frame), - frame, + frame: dataFrameToJSON(f), }) ); } else { diff --git a/public/app/features/live/centrifuge/LiveDataStream.test.ts b/public/app/features/live/centrifuge/LiveDataStream.test.ts index 08e470fb72c..cd75bfc7a2f 100644 --- a/public/app/features/live/centrifuge/LiveDataStream.test.ts +++ b/public/app/features/live/centrifuge/LiveDataStream.test.ts @@ -1,5 +1,6 @@ import { DataFrameJSON, + dataFrameToJSON, DataQueryResponse, FieldType, LiveChannelAddress, @@ -472,7 +473,7 @@ describe('LiveDataStream', () => { const liveDataStream = new LiveDataStream(deps); const valuesCollection = new ValuesCollection(); - const initialFrame = StreamingDataFrame.fromDataFrameJSON(dataFrameJsons.schema2()); + const initialFrame = dataFrameJsons.schema2(); const observable = liveDataStream.get( { ...liveDataStreamOptions.withTimeBFilter, frame: initialFrame }, subscriptionKey @@ -512,7 +513,7 @@ describe('LiveDataStream', () => { liveDataStream.get( { ...liveDataStreamOptions.withTimeBFilter, - frame: StreamingDataFrame.fromDataFrameJSON(dataFrameJsons.schema1()), + frame: dataFrameToJSON(StreamingDataFrame.fromDataFrameJSON(dataFrameJsons.schema1())), }, subscriptionKey ) @@ -524,7 +525,7 @@ describe('LiveDataStream', () => { liveDataStream.get( { ...liveDataStreamOptions.withTimeBFilter, - frame: StreamingDataFrame.fromDataFrameJSON(dataFrameJsons.schema2()), + frame: dataFrameJsons.schema2(), }, subscriptionKey ) diff --git a/public/app/features/live/centrifuge/LiveDataStream.ts b/public/app/features/live/centrifuge/LiveDataStream.ts index 5dd32976999..3ff4fb44d16 100644 --- a/public/app/features/live/centrifuge/LiveDataStream.ts +++ b/public/app/features/live/centrifuge/LiveDataStream.ts @@ -2,7 +2,6 @@ import type { LiveDataStreamOptions, StreamingFrameOptions } from '@grafana/runt import { toDataQueryError } from '@grafana/runtime/src/utils/toDataQueryError'; import { DataFrameJSON, - dataFrameToJSON, DataQueryError, Field, isLiveChannelMessageEvent, @@ -209,7 +208,7 @@ export class LiveDataStream { private prepareInternalStreamForNewSubscription = (options: LiveDataStreamOptions): void => { if (!this.frameBuffer.hasAtLeastOnePacket() && options.frame) { // will skip initial frames from subsequent subscribers - this.process(dataFrameToJSON(options.frame)); + this.process(options.frame); } }; diff --git a/public/app/features/live/centrifuge/service.ts b/public/app/features/live/centrifuge/service.ts index c003e5f3f54..9c52e87c2ff 100644 --- a/public/app/features/live/centrifuge/service.ts +++ b/public/app/features/live/centrifuge/service.ts @@ -2,11 +2,13 @@ import Centrifuge from 'centrifuge/dist/centrifuge'; import { GrafanaLiveSrv, LiveDataStreamOptions, + LiveQueryDataOptions, StreamingFrameAction, StreamingFrameOptions, } from '@grafana/runtime/src/services/live'; import { BehaviorSubject, Observable, share, startWith } from 'rxjs'; import { + DataQueryError, DataQueryResponse, LiveChannelAddress, LiveChannelConnectionState, @@ -16,6 +18,8 @@ import { import { CentrifugeLiveChannel } from './channel'; import { LiveDataStream } from './LiveDataStream'; import { StreamingResponseData } from '../data/utils'; +import { BackendDataSourceResponse } from '@grafana/runtime/src/utils/queryResponse'; +import { FetchResponse } from '@grafana/runtime/src/services/backendSrv'; export type CentrifugeSrvDeps = { appUrl: string; @@ -28,8 +32,15 @@ export type CentrifugeSrvDeps = { export type StreamingDataQueryResponse = Omit & { data: [StreamingResponseData] }; -export type CentrifugeSrv = Omit & { +export type CentrifugeSrv = Omit & { getDataStream: (options: LiveDataStreamOptions) => Observable; + getQueryData: ( + options: LiveQueryDataOptions + ) => Promise< + | { data: BackendDataSourceResponse | undefined } + | FetchResponse + | DataQueryError + >; }; export type DataStreamSubscriptionKey = string; @@ -53,7 +64,9 @@ export class CentrifugeService implements CentrifugeSrv { constructor(private deps: CentrifugeSrvDeps) { this.dataStreamSubscriberReadiness = deps.dataStreamSubscriberReadiness.pipe(share(), startWith(true)); const liveUrl = `${deps.appUrl.replace(/^http/, 'ws')}/api/live/ws`; - this.centrifuge = new Centrifuge(liveUrl, {}); + this.centrifuge = new Centrifuge(liveUrl, { + timeout: 30000, + }); this.centrifuge.setConnectData({ sessionId: deps.sessionId, orgId: deps.orgId, @@ -125,7 +138,7 @@ export class CentrifugeService implements CentrifugeSrv { this.open.delete(id); }); - // return the not-yet initalized channel + // return the not-yet initialized channel return channel; } @@ -190,6 +203,15 @@ export class CentrifugeService implements CentrifugeSrv { return stream.get(options, subscriptionKey); }; + /** + * Executes a query over the live websocket. Query response can contain live channels we can subscribe to for further updates + * + * Since the initial request and subscription are on the same socket, this will support HA setups + */ + getQueryData: CentrifugeSrv['getQueryData'] = async (options) => { + return this.centrifuge.namedRPC('grafana.query', options.body); + }; + /** * For channels that support presence, this will request the current state from the server. * diff --git a/public/app/features/live/centrifuge/service.worker.ts b/public/app/features/live/centrifuge/service.worker.ts index 6d9a1d3705c..e47681175f3 100644 --- a/public/app/features/live/centrifuge/service.worker.ts +++ b/public/app/features/live/centrifuge/service.worker.ts @@ -3,7 +3,7 @@ import * as comlink from 'comlink'; import './transferHandlers'; import { remoteObservableAsObservable } from './remoteObservable'; import { LiveChannelAddress } from '@grafana/data'; -import { LiveDataStreamOptions } from '@grafana/runtime'; +import { LiveDataStreamOptions, LiveQueryDataOptions } from '@grafana/runtime'; let centrifuge: CentrifugeService; @@ -27,6 +27,10 @@ const getDataStream = (options: LiveDataStreamOptions) => { return comlink.proxy(centrifuge.getDataStream(options)); }; +const getQueryData = async (options: LiveQueryDataOptions) => { + return await centrifuge.getQueryData(options); +}; + const getStream = (address: LiveChannelAddress) => { return comlink.proxy(centrifuge.getStream(address)); }; @@ -40,6 +44,7 @@ const workObj = { getConnectionState, getDataStream, getStream, + getQueryData, getPresence, }; diff --git a/public/app/features/live/centrifuge/serviceWorkerProxy.ts b/public/app/features/live/centrifuge/serviceWorkerProxy.ts index ac5290796da..c53d2e49507 100644 --- a/public/app/features/live/centrifuge/serviceWorkerProxy.ts +++ b/public/app/features/live/centrifuge/serviceWorkerProxy.ts @@ -28,6 +28,14 @@ export class CentrifugeServiceWorkerProxy implements CentrifugeSrv { ); }; + /** + * Query over websocket + */ + getQueryData: CentrifugeSrv['getQueryData'] = async (options) => { + const optionsAsPlainSerializableObject = JSON.parse(JSON.stringify(options)); + return this.centrifugeWorker.getQueryData(optionsAsPlainSerializableObject); + }; + getPresence: CentrifugeSrv['getPresence'] = (address) => { return this.centrifugeWorker.getPresence(address); }; diff --git a/public/app/features/live/live.ts b/public/app/features/live/live.ts index 77407ff10f6..d541a282500 100644 --- a/public/app/features/live/live.ts +++ b/public/app/features/live/live.ts @@ -1,10 +1,14 @@ -import { BackendSrv, GrafanaLiveSrv } from '@grafana/runtime'; +import { BackendSrv, GrafanaLiveSrv, toDataQueryResponse } from '@grafana/runtime'; import { CentrifugeSrv, StreamingDataQueryResponse } from './centrifuge/service'; -import { toLiveChannelId } from '@grafana/data'; +import { DataFrame, toLiveChannelId } from '@grafana/data'; import { StreamingDataFrame } from './data/StreamingDataFrame'; import { isStreamingResponseData, StreamingResponseDataType } from './data/utils'; -import { map } from 'rxjs'; +import { from, map, of, switchMap } from 'rxjs'; +import { + standardStreamOptionsProvider, + toStreamingDataResponse, +} from '@grafana/runtime/src/utils/DataSourceWithBackend'; type GrafanaLiveServiceDeps = { centrifugeSrv: CentrifugeSrv; @@ -64,6 +68,26 @@ export class GrafanaLiveService implements GrafanaLiveSrv { return this.deps.centrifugeSrv.getStream(address); }; + /** + * Execute a query over the live websocket and potentially subscribe to a live channel. + * + * Since the initial request and subscription are on the same socket, this will support HA setups + */ + getQueryData: GrafanaLiveSrv['getQueryData'] = (options) => { + return from(this.deps.centrifugeSrv.getQueryData(options)).pipe( + switchMap((rawResponse) => { + const parsedResponse = toDataQueryResponse(rawResponse, options.request.targets); + + const isSubscribable = + parsedResponse.data?.length && parsedResponse.data.find((f: DataFrame) => f.meta?.channel); + + return isSubscribable + ? toStreamingDataResponse(parsedResponse, options.request, standardStreamOptionsProvider) + : of(parsedResponse); + }) + ); + }; + /** * Publish into a channel *