Live: optionally send queries over websocket connection (#41653)

Co-authored-by: ArturWierzbicki <artur.wierzbicki@grafana.com>
Co-authored-by: Ryan McKinley <ryantxu@gmail.com>
This commit is contained in:
Alexander Emelin 2022-01-05 19:02:12 +03:00 committed by GitHub
parent bfecbdc0bd
commit b4204628e4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 103 additions and 15 deletions

View File

@ -52,6 +52,7 @@ export interface FeatureToggles {
recordedQueries: boolean; recordedQueries: boolean;
newNavigation: boolean; newNavigation: boolean;
fullRangeLogsVolume: boolean; fullRangeLogsVolume: boolean;
queryOverLive: boolean;
dashboardPreviews: boolean; dashboardPreviews: boolean;
} }

View File

@ -69,6 +69,7 @@ export class GrafanaBootConfig implements GrafanaConfig {
recordedQueries: false, recordedQueries: false,
newNavigation: false, newNavigation: false,
fullRangeLogsVolume: false, fullRangeLogsVolume: false,
queryOverLive: false,
dashboardPreviews: false, dashboardPreviews: false,
}; };
licenseInfo: LicenseInfo = {} as LicenseInfo; licenseInfo: LicenseInfo = {} as LicenseInfo;

View File

@ -1,5 +1,6 @@
import { import {
DataFrame, DataFrameJSON,
DataQueryRequest,
DataQueryResponse, DataQueryResponse,
LiveChannelAddress, LiveChannelAddress,
LiveChannelEvent, LiveChannelEvent,
@ -38,12 +39,20 @@ export interface StreamingFrameOptions {
*/ */
export interface LiveDataStreamOptions { export interface LiveDataStreamOptions {
addr: LiveChannelAddress; addr: LiveChannelAddress;
frame?: DataFrame; // initial results frame?: DataFrameJSON; // initial results
key?: string; key?: string;
buffer?: Partial<StreamingFrameOptions>; buffer?: Partial<StreamingFrameOptions>;
filter?: LiveDataFilter; filter?: LiveDataFilter;
} }
/**
* @alpha -- experimental: send a normal query request over websockt
*/
export interface LiveQueryDataOptions {
request: DataQueryRequest;
body: any; // processed queries, same as sent to `/api/query/ds`
}
/** /**
* @alpha -- experimental * @alpha -- experimental
*/ */
@ -63,6 +72,15 @@ export interface GrafanaLiveSrv {
*/ */
getDataStream(options: LiveDataStreamOptions): Observable<DataQueryResponse>; getDataStream(options: LiveDataStreamOptions): Observable<DataQueryResponse>;
/**
* Execute a query over the live websocket and potentiall subscribe to a live channel.
*
* Since the initial request and subscription are on the same socket, this will support HA setups
*
* @alpha -- this function requires the feature toggle `queryOverLive` to be set
*/
getQueryData(options: LiveQueryDataOptions): Observable<DataQueryResponse>;
/** /**
* For channels that support presence, this will request the current state from the server. * For channels that support presence, this will request the current state from the server.
* *

View File

@ -11,6 +11,7 @@ import {
parseLiveChannelAddress, parseLiveChannelAddress,
getDataSourceRef, getDataSourceRef,
DataSourceRef, DataSourceRef,
dataFrameToJSON,
} from '@grafana/data'; } from '@grafana/data';
import { merge, Observable, of } from 'rxjs'; import { merge, Observable, of } from 'rxjs';
import { catchError, switchMap } from 'rxjs/operators'; import { catchError, switchMap } from 'rxjs/operators';
@ -21,6 +22,7 @@ import {
StreamingFrameOptions, StreamingFrameOptions,
StreamingFrameAction, StreamingFrameAction,
} from '../services'; } from '../services';
import { config } from '../config';
import { BackendDataSourceResponse, toDataQueryResponse } from './queryResponse'; import { BackendDataSourceResponse, toDataQueryResponse } from './queryResponse';
/** /**
@ -155,6 +157,13 @@ class DataSourceWithBackend<
body.to = range.to.valueOf().toString(); body.to = range.to.valueOf().toString();
} }
if (config.featureToggles.queryOverLive) {
return getGrafanaLiveSrv().getQueryData({
request,
body,
});
}
return getBackendSrv() return getBackendSrv()
.fetch<BackendDataSourceResponse>({ .fetch<BackendDataSourceResponse>({
url: '/api/ds/query', url: '/api/ds/query',
@ -271,7 +280,7 @@ export function toStreamingDataResponse<TQuery extends DataQuery = DataQuery>(
live.getDataStream({ live.getDataStream({
addr, addr,
buffer: getter(req, frame), buffer: getter(req, frame),
frame, frame: dataFrameToJSON(f),
}) })
); );
} else { } else {

View File

@ -1,5 +1,6 @@
import { import {
DataFrameJSON, DataFrameJSON,
dataFrameToJSON,
DataQueryResponse, DataQueryResponse,
FieldType, FieldType,
LiveChannelAddress, LiveChannelAddress,
@ -472,7 +473,7 @@ describe('LiveDataStream', () => {
const liveDataStream = new LiveDataStream(deps); const liveDataStream = new LiveDataStream(deps);
const valuesCollection = new ValuesCollection<DataQueryResponse>(); const valuesCollection = new ValuesCollection<DataQueryResponse>();
const initialFrame = StreamingDataFrame.fromDataFrameJSON(dataFrameJsons.schema2()); const initialFrame = dataFrameJsons.schema2();
const observable = liveDataStream.get( const observable = liveDataStream.get(
{ ...liveDataStreamOptions.withTimeBFilter, frame: initialFrame }, { ...liveDataStreamOptions.withTimeBFilter, frame: initialFrame },
subscriptionKey subscriptionKey
@ -512,7 +513,7 @@ describe('LiveDataStream', () => {
liveDataStream.get( liveDataStream.get(
{ {
...liveDataStreamOptions.withTimeBFilter, ...liveDataStreamOptions.withTimeBFilter,
frame: StreamingDataFrame.fromDataFrameJSON(dataFrameJsons.schema1()), frame: dataFrameToJSON(StreamingDataFrame.fromDataFrameJSON(dataFrameJsons.schema1())),
}, },
subscriptionKey subscriptionKey
) )
@ -524,7 +525,7 @@ describe('LiveDataStream', () => {
liveDataStream.get( liveDataStream.get(
{ {
...liveDataStreamOptions.withTimeBFilter, ...liveDataStreamOptions.withTimeBFilter,
frame: StreamingDataFrame.fromDataFrameJSON(dataFrameJsons.schema2()), frame: dataFrameJsons.schema2(),
}, },
subscriptionKey subscriptionKey
) )

View File

@ -2,7 +2,6 @@ import type { LiveDataStreamOptions, StreamingFrameOptions } from '@grafana/runt
import { toDataQueryError } from '@grafana/runtime/src/utils/toDataQueryError'; import { toDataQueryError } from '@grafana/runtime/src/utils/toDataQueryError';
import { import {
DataFrameJSON, DataFrameJSON,
dataFrameToJSON,
DataQueryError, DataQueryError,
Field, Field,
isLiveChannelMessageEvent, isLiveChannelMessageEvent,
@ -209,7 +208,7 @@ export class LiveDataStream<T = unknown> {
private prepareInternalStreamForNewSubscription = (options: LiveDataStreamOptions): void => { private prepareInternalStreamForNewSubscription = (options: LiveDataStreamOptions): void => {
if (!this.frameBuffer.hasAtLeastOnePacket() && options.frame) { if (!this.frameBuffer.hasAtLeastOnePacket() && options.frame) {
// will skip initial frames from subsequent subscribers // will skip initial frames from subsequent subscribers
this.process(dataFrameToJSON(options.frame)); this.process(options.frame);
} }
}; };

View File

@ -2,11 +2,13 @@ import Centrifuge from 'centrifuge/dist/centrifuge';
import { import {
GrafanaLiveSrv, GrafanaLiveSrv,
LiveDataStreamOptions, LiveDataStreamOptions,
LiveQueryDataOptions,
StreamingFrameAction, StreamingFrameAction,
StreamingFrameOptions, StreamingFrameOptions,
} from '@grafana/runtime/src/services/live'; } from '@grafana/runtime/src/services/live';
import { BehaviorSubject, Observable, share, startWith } from 'rxjs'; import { BehaviorSubject, Observable, share, startWith } from 'rxjs';
import { import {
DataQueryError,
DataQueryResponse, DataQueryResponse,
LiveChannelAddress, LiveChannelAddress,
LiveChannelConnectionState, LiveChannelConnectionState,
@ -16,6 +18,8 @@ import {
import { CentrifugeLiveChannel } from './channel'; import { CentrifugeLiveChannel } from './channel';
import { LiveDataStream } from './LiveDataStream'; import { LiveDataStream } from './LiveDataStream';
import { StreamingResponseData } from '../data/utils'; import { StreamingResponseData } from '../data/utils';
import { BackendDataSourceResponse } from '@grafana/runtime/src/utils/queryResponse';
import { FetchResponse } from '@grafana/runtime/src/services/backendSrv';
export type CentrifugeSrvDeps = { export type CentrifugeSrvDeps = {
appUrl: string; appUrl: string;
@ -28,8 +32,15 @@ export type CentrifugeSrvDeps = {
export type StreamingDataQueryResponse = Omit<DataQueryResponse, 'data'> & { data: [StreamingResponseData] }; export type StreamingDataQueryResponse = Omit<DataQueryResponse, 'data'> & { data: [StreamingResponseData] };
export type CentrifugeSrv = Omit<GrafanaLiveSrv, 'publish' | 'getDataStream'> & { export type CentrifugeSrv = Omit<GrafanaLiveSrv, 'publish' | 'getDataStream' | 'getQueryData'> & {
getDataStream: (options: LiveDataStreamOptions) => Observable<StreamingDataQueryResponse>; getDataStream: (options: LiveDataStreamOptions) => Observable<StreamingDataQueryResponse>;
getQueryData: (
options: LiveQueryDataOptions
) => Promise<
| { data: BackendDataSourceResponse | undefined }
| FetchResponse<BackendDataSourceResponse | undefined>
| DataQueryError
>;
}; };
export type DataStreamSubscriptionKey = string; export type DataStreamSubscriptionKey = string;
@ -53,7 +64,9 @@ export class CentrifugeService implements CentrifugeSrv {
constructor(private deps: CentrifugeSrvDeps) { constructor(private deps: CentrifugeSrvDeps) {
this.dataStreamSubscriberReadiness = deps.dataStreamSubscriberReadiness.pipe(share(), startWith(true)); this.dataStreamSubscriberReadiness = deps.dataStreamSubscriberReadiness.pipe(share(), startWith(true));
const liveUrl = `${deps.appUrl.replace(/^http/, 'ws')}/api/live/ws`; const liveUrl = `${deps.appUrl.replace(/^http/, 'ws')}/api/live/ws`;
this.centrifuge = new Centrifuge(liveUrl, {}); this.centrifuge = new Centrifuge(liveUrl, {
timeout: 30000,
});
this.centrifuge.setConnectData({ this.centrifuge.setConnectData({
sessionId: deps.sessionId, sessionId: deps.sessionId,
orgId: deps.orgId, orgId: deps.orgId,
@ -125,7 +138,7 @@ export class CentrifugeService implements CentrifugeSrv {
this.open.delete(id); this.open.delete(id);
}); });
// return the not-yet initalized channel // return the not-yet initialized channel
return channel; return channel;
} }
@ -190,6 +203,15 @@ export class CentrifugeService implements CentrifugeSrv {
return stream.get(options, subscriptionKey); return stream.get(options, subscriptionKey);
}; };
/**
* Executes a query over the live websocket. Query response can contain live channels we can subscribe to for further updates
*
* Since the initial request and subscription are on the same socket, this will support HA setups
*/
getQueryData: CentrifugeSrv['getQueryData'] = async (options) => {
return this.centrifuge.namedRPC('grafana.query', options.body);
};
/** /**
* For channels that support presence, this will request the current state from the server. * For channels that support presence, this will request the current state from the server.
* *

View File

@ -3,7 +3,7 @@ import * as comlink from 'comlink';
import './transferHandlers'; import './transferHandlers';
import { remoteObservableAsObservable } from './remoteObservable'; import { remoteObservableAsObservable } from './remoteObservable';
import { LiveChannelAddress } from '@grafana/data'; import { LiveChannelAddress } from '@grafana/data';
import { LiveDataStreamOptions } from '@grafana/runtime'; import { LiveDataStreamOptions, LiveQueryDataOptions } from '@grafana/runtime';
let centrifuge: CentrifugeService; let centrifuge: CentrifugeService;
@ -27,6 +27,10 @@ const getDataStream = (options: LiveDataStreamOptions) => {
return comlink.proxy(centrifuge.getDataStream(options)); return comlink.proxy(centrifuge.getDataStream(options));
}; };
const getQueryData = async (options: LiveQueryDataOptions) => {
return await centrifuge.getQueryData(options);
};
const getStream = (address: LiveChannelAddress) => { const getStream = (address: LiveChannelAddress) => {
return comlink.proxy(centrifuge.getStream(address)); return comlink.proxy(centrifuge.getStream(address));
}; };
@ -40,6 +44,7 @@ const workObj = {
getConnectionState, getConnectionState,
getDataStream, getDataStream,
getStream, getStream,
getQueryData,
getPresence, getPresence,
}; };

View File

@ -28,6 +28,14 @@ export class CentrifugeServiceWorkerProxy implements CentrifugeSrv {
); );
}; };
/**
* Query over websocket
*/
getQueryData: CentrifugeSrv['getQueryData'] = async (options) => {
const optionsAsPlainSerializableObject = JSON.parse(JSON.stringify(options));
return this.centrifugeWorker.getQueryData(optionsAsPlainSerializableObject);
};
getPresence: CentrifugeSrv['getPresence'] = (address) => { getPresence: CentrifugeSrv['getPresence'] = (address) => {
return this.centrifugeWorker.getPresence(address); return this.centrifugeWorker.getPresence(address);
}; };

View File

@ -1,10 +1,14 @@
import { BackendSrv, GrafanaLiveSrv } from '@grafana/runtime'; import { BackendSrv, GrafanaLiveSrv, toDataQueryResponse } from '@grafana/runtime';
import { CentrifugeSrv, StreamingDataQueryResponse } from './centrifuge/service'; import { CentrifugeSrv, StreamingDataQueryResponse } from './centrifuge/service';
import { toLiveChannelId } from '@grafana/data'; import { DataFrame, toLiveChannelId } from '@grafana/data';
import { StreamingDataFrame } from './data/StreamingDataFrame'; import { StreamingDataFrame } from './data/StreamingDataFrame';
import { isStreamingResponseData, StreamingResponseDataType } from './data/utils'; import { isStreamingResponseData, StreamingResponseDataType } from './data/utils';
import { map } from 'rxjs'; import { from, map, of, switchMap } from 'rxjs';
import {
standardStreamOptionsProvider,
toStreamingDataResponse,
} from '@grafana/runtime/src/utils/DataSourceWithBackend';
type GrafanaLiveServiceDeps = { type GrafanaLiveServiceDeps = {
centrifugeSrv: CentrifugeSrv; centrifugeSrv: CentrifugeSrv;
@ -64,6 +68,26 @@ export class GrafanaLiveService implements GrafanaLiveSrv {
return this.deps.centrifugeSrv.getStream(address); return this.deps.centrifugeSrv.getStream(address);
}; };
/**
* Execute a query over the live websocket and potentially subscribe to a live channel.
*
* Since the initial request and subscription are on the same socket, this will support HA setups
*/
getQueryData: GrafanaLiveSrv['getQueryData'] = (options) => {
return from(this.deps.centrifugeSrv.getQueryData(options)).pipe(
switchMap((rawResponse) => {
const parsedResponse = toDataQueryResponse(rawResponse, options.request.targets);
const isSubscribable =
parsedResponse.data?.length && parsedResponse.data.find((f: DataFrame) => f.meta?.channel);
return isSubscribable
? toStreamingDataResponse(parsedResponse, options.request, standardStreamOptionsProvider)
: of(parsedResponse);
})
);
};
/** /**
* Publish into a channel * Publish into a channel
* *