From 84ae13fe5dfcd4f8d0c25b9e5c9a0f6d60bda090 Mon Sep 17 00:00:00 2001 From: Artur Wierzbicki Date: Wed, 15 Dec 2021 23:01:55 +0400 Subject: [PATCH] Live: keep stream history on 'refresh' (#41492) Co-authored-by: Ryan McKinley --- packages/grafana-data/src/dataframe/index.ts | 1 - .../src/dataframe/processDataFrame.ts | 7 +- packages/grafana-data/src/types/live.ts | 7 +- packages/grafana-runtime/src/services/live.ts | 22 +- .../src/utils/DataSourceWithBackend.ts | 20 +- .../live/centrifuge/LiveDataStream.test.ts | 777 ++++++++++++++++++ .../live/centrifuge/LiveDataStream.ts | 325 ++++++++ .../app/features/live/centrifuge/channel.ts | 9 +- .../app/features/live/centrifuge/service.ts | 202 ++--- .../live/data}/StreamingDataFrame.test.ts | 315 ++++++- .../features/live/data}/StreamingDataFrame.ts | 283 +++++-- public/app/features/live/data/utils.ts | 55 ++ public/app/features/live/live.test.ts | 148 ++++ public/app/features/live/live.ts | 69 +- public/app/features/live/pages/RuleTest.tsx | 4 +- .../features/query/state/PanelQueryRunner.ts | 56 +- public/app/features/query/state/runRequest.ts | 32 +- .../plugins/datasource/grafana/datasource.ts | 11 +- .../plugins/datasource/testdata/runStreams.ts | 4 +- public/app/plugins/panel/live/LivePanel.tsx | 2 +- .../state-timeline/StateTimelinePanel.tsx | 2 +- 21 files changed, 2060 insertions(+), 291 deletions(-) create mode 100644 public/app/features/live/centrifuge/LiveDataStream.test.ts create mode 100644 public/app/features/live/centrifuge/LiveDataStream.ts rename {packages/grafana-data/src/dataframe => public/app/features/live/data}/StreamingDataFrame.test.ts (57%) rename {packages/grafana-data/src/dataframe => public/app/features/live/data}/StreamingDataFrame.ts (53%) create mode 100644 public/app/features/live/data/utils.ts create mode 100644 public/app/features/live/live.test.ts diff --git a/packages/grafana-data/src/dataframe/index.ts b/packages/grafana-data/src/dataframe/index.ts index 1839b8af10f..3267885b2c0 100644 --- a/packages/grafana-data/src/dataframe/index.ts +++ b/packages/grafana-data/src/dataframe/index.ts @@ -6,6 +6,5 @@ export * from './processDataFrame'; export * from './dimensions'; export * from './ArrayDataFrame'; export * from './DataFrameJSON'; -export { StreamingDataFrame, StreamingFrameOptions, StreamingFrameAction } from './StreamingDataFrame'; export * from './frameComparisons'; export { anySeriesWithTimeField } from './utils'; diff --git a/packages/grafana-data/src/dataframe/processDataFrame.ts b/packages/grafana-data/src/dataframe/processDataFrame.ts index 4f19314cb61..954e6f29780 100644 --- a/packages/grafana-data/src/dataframe/processDataFrame.ts +++ b/packages/grafana-data/src/dataframe/processDataFrame.ts @@ -470,7 +470,12 @@ export function getDataFrameRow(data: DataFrame, row: number): any[] { * Returns a copy that does not include functions */ export function toDataFrameDTO(data: DataFrame): DataFrameDTO { - const fields: FieldDTO[] = data.fields.map((f) => { + return toFilteredDataFrameDTO(data); +} + +export function toFilteredDataFrameDTO(data: DataFrame, fieldPredicate?: (f: Field) => boolean): DataFrameDTO { + const filteredFields = fieldPredicate ? data.fields.filter(fieldPredicate) : data.fields; + const fields: FieldDTO[] = filteredFields.map((f) => { let values = f.values.toArray(); // The byte buffers serialize like objects if (values instanceof Float64Array) { diff --git a/packages/grafana-data/src/types/live.ts b/packages/grafana-data/src/types/live.ts index e4ad453d5bb..f4cb0618e33 100644 --- a/packages/grafana-data/src/types/live.ts +++ b/packages/grafana-data/src/types/live.ts @@ -125,6 +125,11 @@ export interface LiveChannelPresenceStatus { users: any; // @alpha -- experimental -- will be filled in when we improve the UI } +/** + * @alpha -- experimental + */ +export type LiveChannelId = string; + /** * @alpha -- experimental */ @@ -174,7 +179,7 @@ export function isValidLiveChannelAddress(addr?: LiveChannelAddress): addr is Li * * @alpha -- experimental */ -export function toLiveChannelId(addr: LiveChannelAddress): string { +export function toLiveChannelId(addr: LiveChannelAddress): LiveChannelId { if (!addr.scope) { return ''; } diff --git a/packages/grafana-runtime/src/services/live.ts b/packages/grafana-runtime/src/services/live.ts index 199eb3d17af..0b81e319083 100644 --- a/packages/grafana-runtime/src/services/live.ts +++ b/packages/grafana-runtime/src/services/live.ts @@ -4,7 +4,6 @@ import { LiveChannelAddress, LiveChannelEvent, LiveChannelPresenceStatus, - StreamingFrameOptions, } from '@grafana/data'; import { Observable } from 'rxjs'; @@ -15,6 +14,25 @@ export interface LiveDataFilter { fields?: string[]; } +/** + * Indicate if the frame is appened or replace + * + * @alpha + */ +export enum StreamingFrameAction { + Append = 'append', + Replace = 'replace', +} + +/** + * @alpha + */ +export interface StreamingFrameOptions { + maxLength: number; // 1000 + maxDelta: number; // how long to keep things + action: StreamingFrameAction; // default will append +} + /** * @alpha */ @@ -22,7 +40,7 @@ export interface LiveDataStreamOptions { addr: LiveChannelAddress; frame?: DataFrame; // initial results key?: string; - buffer?: StreamingFrameOptions; + buffer?: Partial; filter?: LiveDataFilter; } diff --git a/packages/grafana-runtime/src/utils/DataSourceWithBackend.ts b/packages/grafana-runtime/src/utils/DataSourceWithBackend.ts index 844e2a7ee0c..115f545059e 100644 --- a/packages/grafana-runtime/src/utils/DataSourceWithBackend.ts +++ b/packages/grafana-runtime/src/utils/DataSourceWithBackend.ts @@ -9,14 +9,18 @@ import { makeClassES5Compatible, DataFrame, parseLiveChannelAddress, - StreamingFrameOptions, - StreamingFrameAction, getDataSourceRef, DataSourceRef, } from '@grafana/data'; import { merge, Observable, of } from 'rxjs'; import { catchError, switchMap } from 'rxjs/operators'; -import { getBackendSrv, getDataSourceSrv, getGrafanaLiveSrv } from '../services'; +import { + getBackendSrv, + getDataSourceSrv, + getGrafanaLiveSrv, + StreamingFrameOptions, + StreamingFrameAction, +} from '../services'; import { BackendDataSourceResponse, toDataQueryResponse } from './queryResponse'; /** @@ -250,7 +254,7 @@ class DataSourceWithBackend< export function toStreamingDataResponse( rsp: DataQueryResponse, req: DataQueryRequest, - getter: (req: DataQueryRequest, frame: DataFrame) => StreamingFrameOptions + getter: (req: DataQueryRequest, frame: DataFrame) => Partial ): Observable { const live = getGrafanaLiveSrv(); if (!live) { @@ -291,22 +295,22 @@ export function toStreamingDataResponse( export type StreamOptionsProvider = ( request: DataQueryRequest, frame: DataFrame -) => StreamingFrameOptions; +) => Partial; /** * @public */ export const standardStreamOptionsProvider: StreamOptionsProvider = (request: DataQueryRequest, frame: DataFrame) => { - const buffer: StreamingFrameOptions = { + const opts: Partial = { maxLength: request.maxDataPoints ?? 500, action: StreamingFrameAction.Append, }; // For recent queries, clamp to the current time range if (request.rangeRaw?.to === 'now') { - buffer.maxDelta = request.range.to.valueOf() - request.range.from.valueOf(); + opts.maxDelta = request.range.to.valueOf() - request.range.from.valueOf(); } - return buffer; + return opts; }; //@ts-ignore diff --git a/public/app/features/live/centrifuge/LiveDataStream.test.ts b/public/app/features/live/centrifuge/LiveDataStream.test.ts new file mode 100644 index 00000000000..08e470fb72c --- /dev/null +++ b/public/app/features/live/centrifuge/LiveDataStream.test.ts @@ -0,0 +1,777 @@ +import { + DataFrameJSON, + DataQueryResponse, + FieldType, + LiveChannelAddress, + LiveChannelConnectionState, + LiveChannelEvent, + LiveChannelEventType, + LiveChannelLeaveEvent, + LiveChannelScope, + LoadingState, +} from '@grafana/data'; +import { Observable, Subject, Subscription, Unsubscribable } from 'rxjs'; +import { DataStreamHandlerDeps, LiveDataStream } from './LiveDataStream'; +import { mapValues } from 'lodash'; +import { StreamingFrameAction } from '@grafana/runtime'; +import { isStreamingResponseData, StreamingResponseData, StreamingResponseDataType } from '../data/utils'; +import { StreamingDataFrame } from '../data/StreamingDataFrame'; + +type SubjectsInsteadOfObservables = { + [key in keyof T]: T[key] extends Observable ? Subject : T[key]; +}; + +type DepsWithSubjectsInsteadOfObservables = SubjectsInsteadOfObservables>; + +const createDeps = ( + overrides?: Partial> +): DepsWithSubjectsInsteadOfObservables => { + return { + channelId: 'channel-1', + liveEventsObservable: new Subject(), + onShutdown: jest.fn(), + subscriberReadiness: new Subject(), + defaultStreamingFrameOptions: { maxLength: 100, maxDelta: Infinity, action: StreamingFrameAction.Append }, + shutdownDelayInMs: 1000, + ...(overrides ?? {}), + }; +}; + +class ValuesCollection implements Unsubscribable { + values: T[] = []; + errors: any[] = []; + receivedComplete = false; + subscription: Subscription | undefined; + + valuesCount = () => this.values.length; + + subscribeTo = (obs: Observable) => { + if (this.subscription) { + throw new Error(`can't subscribe twice!`); + } + + this.subscription = obs.subscribe({ + next: (n) => { + this.values.push(n); + }, + error: (err) => { + this.errors.push(err); + }, + complete: () => { + this.receivedComplete = true; + }, + }); + }; + + get complete() { + return this.receivedComplete || this.subscription?.closed; + } + + unsubscribe = () => { + this.subscription?.unsubscribe(); + }; + + lastValue = () => { + if (!this.values.length) { + throw new Error(`no values available in ${JSON.stringify(this)}`); + } + + return this.values[this.values.length - 1]; + }; + + lastError = () => { + if (!this.errors.length) { + throw new Error(`no errors available in ${JSON.stringify(this)}`); + } + + return this.errors[this.errors.length - 1]; + }; +} + +const liveChannelMessageEvent = (message: T): LiveChannelEvent => ({ + type: LiveChannelEventType.Message, + message, +}); + +const liveChannelLeaveEvent = (): LiveChannelLeaveEvent => ({ + type: LiveChannelEventType.Leave, + user: '', +}); + +const liveChannelStatusEvent = (state: LiveChannelConnectionState, error?: Error): LiveChannelEvent => ({ + type: LiveChannelEventType.Status, + state, + error, + id: '', + timestamp: 1, +}); + +const fieldsOf = (data: StreamingResponseData) => { + return data.frame.fields.map((f) => ({ + name: f.name, + values: f.values, + })); +}; + +const dummyErrorMessage = 'dummy-error'; + +describe('LiveDataStream', () => { + jest.useFakeTimers(); + + const expectValueCollectionState = ( + valuesCollection: ValuesCollection, + state: { errors: number; values: number; complete: boolean } + ) => { + expect(valuesCollection.values).toHaveLength(state.values); + expect(valuesCollection.errors).toHaveLength(state.errors); + expect(valuesCollection.complete).toEqual(state.complete); + }; + + const expectResponse = (state: LoadingState) => ( + res: DataQueryResponse, + streamingDataType: T + ) => { + expect(res.state).toEqual(state); + + expect(res.data).toHaveLength(1); + + const firstData = res.data[0]; + expect(isStreamingResponseData(firstData, streamingDataType)).toEqual(true); + }; + + const expectStreamingResponse = expectResponse(LoadingState.Streaming); + const expectErrorResponse = expectResponse(LoadingState.Error); + + const dummyLiveChannelAddress: LiveChannelAddress = { + scope: LiveChannelScope.Grafana, + namespace: 'stream', + path: 'abc', + }; + + const subscriptionKey = 'subKey'; + + const liveDataStreamOptions = { + withTimeBFilter: { + addr: dummyLiveChannelAddress, + buffer: { + maxLength: 2, + maxDelta: 10, + action: StreamingFrameAction.Append, + }, + filter: { + fields: ['time', 'b'], + }, + }, + withTimeAFilter: { + addr: dummyLiveChannelAddress, + buffer: { + maxLength: 3, + maxDelta: 10, + action: StreamingFrameAction.Append, + }, + filter: { + fields: ['time', 'a'], + }, + }, + withoutFilter: { + addr: dummyLiveChannelAddress, + buffer: { + maxLength: 4, + maxDelta: 10, + action: StreamingFrameAction.Append, + }, + }, + }; + + const dataFrameJsons = { + schema1: () => ({ + schema: { + fields: [ + { name: 'time', type: FieldType.time }, + { name: 'a', type: FieldType.string }, + { name: 'b', type: FieldType.number }, + ], + }, + data: { + values: [ + [100, 101], + ['a', 'b'], + [1, 2], + ], + }, + }), + schema1newValues: () => ({ + data: { + values: [[102], ['c'], [3]], + }, + }), + schema2: () => ({ + schema: { + fields: [ + { name: 'time', type: FieldType.time }, + { name: 'a', type: FieldType.string }, + { name: 'b', type: FieldType.string }, + ], + }, + data: { + values: [[103], ['x'], ['y']], + }, + }), + schema2newValues: () => ({ + data: { + values: [[104], ['w'], ['o']], + }, + }), + }; + + describe('happy path with a single subscriber', () => { + let deps: ReturnType; + let liveDataStream: LiveDataStream; + const valuesCollection = new ValuesCollection(); + + beforeAll(() => { + deps = createDeps(); + + expect(deps.liveEventsObservable.observed).toBeFalsy(); + expect(deps.subscriberReadiness.observed).toBeFalsy(); + liveDataStream = new LiveDataStream(deps); + }); + + it('should subscribe to live events observable immediately after creation', async () => { + expect(deps.liveEventsObservable.observed).toBeTruthy(); + }); + + it('should not subscribe to subscriberReadiness observable until first subscription', async () => { + expect(deps.subscriberReadiness.observed).toBeFalsy(); + }); + + it('should subscribe to subscriberReadiness observable on first subscription and return observable without any values', async () => { + const observable = liveDataStream.get(liveDataStreamOptions.withTimeBFilter, subscriptionKey); + valuesCollection.subscribeTo(observable); + + //then + expect(deps.subscriberReadiness.observed).toBeTruthy(); + expectValueCollectionState(valuesCollection, { errors: 0, values: 0, complete: false }); + }); + + it('should emit the first live channel message event as a serialized streamingDataFrame', async () => { + const valuesCount = valuesCollection.valuesCount(); + + deps.liveEventsObservable.next(liveChannelMessageEvent(dataFrameJsons.schema1())); + + expectValueCollectionState(valuesCollection, { errors: 0, values: valuesCount + 1, complete: false }); + const response = valuesCollection.lastValue(); + + expectStreamingResponse(response, StreamingResponseDataType.FullFrame); + const data = response.data[0] as StreamingResponseData; + + expect(data.frame.options).toEqual(liveDataStreamOptions.withTimeBFilter.buffer); + + const deserializedFrame = StreamingDataFrame.deserialize(data.frame); + expect(deserializedFrame.fields).toEqual([ + { + config: {}, + name: 'time', + type: 'time', + values: { + buffer: [100, 101], + }, + }, + { + config: {}, + name: 'b', + type: 'number', + values: { + buffer: [1, 2], + }, + }, + ]); + expect(deserializedFrame.length).toEqual(dataFrameJsons.schema1().data.values[0].length); + }); + + it('should emit subsequent messages as deltas if the schema stays the same', async () => { + const valuesCount = valuesCollection.valuesCount(); + + deps.liveEventsObservable.next(liveChannelMessageEvent(dataFrameJsons.schema1newValues())); + + expectValueCollectionState(valuesCollection, { errors: 0, values: valuesCount + 1, complete: false }); + const response = valuesCollection.lastValue(); + + expectStreamingResponse(response, StreamingResponseDataType.NewValuesSameSchema); + const data = response.data[0] as StreamingResponseData; + + expect(data.values).toEqual([[102], [3]]); + }); + + it('should emit a full frame if schema changes', async () => { + const valuesCount = valuesCollection.valuesCount(); + deps.liveEventsObservable.next(liveChannelMessageEvent(dataFrameJsons.schema2())); + + expectValueCollectionState(valuesCollection, { errors: 0, values: valuesCount + 1, complete: false }); + const response = valuesCollection.lastValue(); + + expectStreamingResponse(response, StreamingResponseDataType.FullFrame); + const data = response.data[0] as StreamingResponseData; + + expect(fieldsOf(data)).toEqual([ + { + name: 'time', + values: [102, 103], + }, + { + name: 'b', + values: [undefined, 'y'], // bug in streamingDataFrame - fix! + }, + ]); + }); + + it('should emit a full frame if received a status live channel event with error', async () => { + const valuesCount = valuesCollection.valuesCount(); + + const error = new Error(`oh no!`); + deps.liveEventsObservable.next(liveChannelStatusEvent(LiveChannelConnectionState.Connected, error)); + + expectValueCollectionState(valuesCollection, { + errors: 0, + values: valuesCount + 1, + complete: false, + }); + const response = valuesCollection.lastValue(); + + expectErrorResponse(response, StreamingResponseDataType.FullFrame); + }); + + it('should buffer new values until subscriber is ready', async () => { + const valuesCount = valuesCollection.valuesCount(); + + deps.subscriberReadiness.next(false); + deps.liveEventsObservable.next(liveChannelMessageEvent(dataFrameJsons.schema2newValues())); + expectValueCollectionState(valuesCollection, { errors: 0, values: valuesCount, complete: false }); + + deps.liveEventsObservable.next(liveChannelMessageEvent(dataFrameJsons.schema2newValues())); + expectValueCollectionState(valuesCollection, { errors: 0, values: valuesCount, complete: false }); + + deps.liveEventsObservable.next(liveChannelMessageEvent(dataFrameJsons.schema2newValues())); + expectValueCollectionState(valuesCollection, { errors: 0, values: valuesCount, complete: false }); + + deps.subscriberReadiness.next(true); + expectValueCollectionState(valuesCollection, { errors: 0, values: valuesCount + 1, complete: false }); + + const response = valuesCollection.lastValue(); + + expectStreamingResponse(response, StreamingResponseDataType.NewValuesSameSchema); + const data = response.data[0] as StreamingResponseData; + + expect(data.values).toEqual([ + [104, 104, 104], + ['o', 'o', 'o'], + ]); + }); + + it(`should reduce buffer to a full frame if schema changed at any point during subscriber's unavailability`, async () => { + const valuesCount = valuesCollection.valuesCount(); + + deps.subscriberReadiness.next(false); + + deps.liveEventsObservable.next(liveChannelMessageEvent(dataFrameJsons.schema2newValues())); + expectValueCollectionState(valuesCollection, { errors: 0, values: valuesCount, complete: false }); + + deps.liveEventsObservable.next(liveChannelMessageEvent(dataFrameJsons.schema2newValues())); + expectValueCollectionState(valuesCollection, { errors: 0, values: valuesCount, complete: false }); + + deps.liveEventsObservable.next(liveChannelMessageEvent(dataFrameJsons.schema1())); + expectValueCollectionState(valuesCollection, { errors: 0, values: valuesCount, complete: false }); + + deps.liveEventsObservable.next(liveChannelMessageEvent(dataFrameJsons.schema1newValues())); + expectValueCollectionState(valuesCollection, { errors: 0, values: valuesCount, complete: false }); + + deps.subscriberReadiness.next(true); + expectValueCollectionState(valuesCollection, { errors: 0, values: valuesCount + 1, complete: false }); + + const response = valuesCollection.lastValue(); + + expectStreamingResponse(response, StreamingResponseDataType.FullFrame); + expect(fieldsOf(response.data[0])).toEqual([ + { + name: 'time', + values: [101, 102], + }, + { + name: 'b', + values: [2, 3], + }, + ]); + }); + + it(`should reduce buffer to a full frame with last error if one or more errors occur during subscriber's unavailability`, async () => { + const firstError = new Error('first error'); + const secondError = new Error(dummyErrorMessage); + const valuesCount = valuesCollection.valuesCount(); + + deps.subscriberReadiness.next(false); + + deps.liveEventsObservable.next(liveChannelMessageEvent(dataFrameJsons.schema1newValues())); + deps.liveEventsObservable.next(liveChannelMessageEvent(dataFrameJsons.schema1newValues())); + deps.liveEventsObservable.next(liveChannelStatusEvent(LiveChannelConnectionState.Connected, firstError)); + deps.liveEventsObservable.next(liveChannelMessageEvent(dataFrameJsons.schema1newValues())); + deps.liveEventsObservable.next(liveChannelStatusEvent(LiveChannelConnectionState.Connected, secondError)); + + deps.subscriberReadiness.next(true); + expectValueCollectionState(valuesCollection, { errors: 0, values: valuesCount + 1, complete: false }); + + const response = valuesCollection.lastValue(); + expectErrorResponse(response, StreamingResponseDataType.FullFrame); + + const errorMessage = response?.error?.message; + expect(errorMessage?.includes(dummyErrorMessage)).toBeTruthy(); + + expect(fieldsOf(response.data[0])).toEqual([ + { + name: 'time', + values: [102, 102], + }, + { + name: 'b', + values: [3, 3], + }, + ]); + }); + + it('should ignore messages without payload', async () => { + const valuesCount = valuesCollection.valuesCount(); + + deps.liveEventsObservable.next(liveChannelStatusEvent(LiveChannelConnectionState.Connected)); + deps.liveEventsObservable.next(liveChannelStatusEvent(LiveChannelConnectionState.Pending)); + deps.liveEventsObservable.next(liveChannelStatusEvent(LiveChannelConnectionState.Pending)); + deps.liveEventsObservable.next(liveChannelLeaveEvent()); + + expectValueCollectionState(valuesCollection, { errors: 0, values: valuesCount, complete: false }); + }); + + it(`should shutdown when source observable completes`, async () => { + expect(deps.onShutdown).not.toHaveBeenCalled(); + expect(deps.subscriberReadiness.observed).toBeTruthy(); + expect(deps.liveEventsObservable.observed).toBeTruthy(); + + deps.liveEventsObservable.complete(); + expectValueCollectionState(valuesCollection, { + errors: 0, + values: valuesCollection.valuesCount(), + complete: true, + }); + + expect(deps.subscriberReadiness.observed).toBeFalsy(); + expect(deps.liveEventsObservable.observed).toBeFalsy(); + expect(deps.onShutdown).toHaveBeenCalled(); + }); + }); + + describe('single subscriber with initial frame', () => { + it('should emit the initial frame right after subscribe', async () => { + const deps = createDeps(); + const liveDataStream = new LiveDataStream(deps); + const valuesCollection = new ValuesCollection(); + + const initialFrame = StreamingDataFrame.fromDataFrameJSON(dataFrameJsons.schema2()); + const observable = liveDataStream.get( + { ...liveDataStreamOptions.withTimeBFilter, frame: initialFrame }, + subscriptionKey + ); + valuesCollection.subscribeTo(observable); + + //then + expect(deps.subscriberReadiness.observed).toBeTruthy(); + expectValueCollectionState(valuesCollection, { errors: 0, values: 1, complete: false }); + + const response = valuesCollection.lastValue(); + + expectStreamingResponse(response, StreamingResponseDataType.FullFrame); + const data = response.data[0] as StreamingResponseData; + + expect(fieldsOf(data)).toEqual([ + { + name: 'time', + values: [103], + }, + { + name: 'b', + values: ['y'], // bug in streamingDataFrame - fix! + }, + ]); + }); + }); + + describe('two subscribers with initial frames', () => { + it('should ignore initial frame from second subscriber', async () => { + const deps = createDeps(); + const liveDataStream = new LiveDataStream(deps); + const valuesCollection = new ValuesCollection(); + const valuesCollection2 = new ValuesCollection(); + + valuesCollection.subscribeTo( + liveDataStream.get( + { + ...liveDataStreamOptions.withTimeBFilter, + frame: StreamingDataFrame.fromDataFrameJSON(dataFrameJsons.schema1()), + }, + subscriptionKey + ) + ); + + expectValueCollectionState(valuesCollection, { errors: 0, values: 1, complete: false }); + + valuesCollection2.subscribeTo( + liveDataStream.get( + { + ...liveDataStreamOptions.withTimeBFilter, + frame: StreamingDataFrame.fromDataFrameJSON(dataFrameJsons.schema2()), + }, + subscriptionKey + ) + ); + // no extra emits for initial subscriber + expectValueCollectionState(valuesCollection, { errors: 0, values: 1, complete: false }); + expectValueCollectionState(valuesCollection2, { errors: 0, values: 1, complete: false }); + + const frame1 = fieldsOf(valuesCollection.lastValue().data[0]); + const frame2 = fieldsOf(valuesCollection2.lastValue().data[0]); + expect(frame1).toEqual(frame2); + }); + }); + + describe('source observable emits completed event', () => { + it('should shutdown', async () => { + const deps = createDeps(); + const liveDataStream = new LiveDataStream(deps); + const valuesCollection = new ValuesCollection(); + + const observable = liveDataStream.get(liveDataStreamOptions.withTimeAFilter, subscriptionKey); + valuesCollection.subscribeTo(observable); + + expect(deps.subscriberReadiness.observed).toBeTruthy(); + expect(deps.liveEventsObservable.observed).toBeTruthy(); + expect(deps.onShutdown).not.toHaveBeenCalled(); + + deps.liveEventsObservable.complete(); + + expectValueCollectionState(valuesCollection, { + errors: 0, + values: 0, + complete: true, + }); + expect(deps.subscriberReadiness.observed).toBeFalsy(); + expect(deps.liveEventsObservable.observed).toBeFalsy(); + expect(deps.onShutdown).toHaveBeenCalled(); + }); + }); + + describe('source observable emits error event', () => { + it('should shutdown', async () => { + const deps = createDeps(); + const liveDataStream = new LiveDataStream(deps); + const valuesCollection = new ValuesCollection(); + + const observable = liveDataStream.get(liveDataStreamOptions.withTimeAFilter, subscriptionKey); + valuesCollection.subscribeTo(observable); + + expect(deps.subscriberReadiness.observed).toBeTruthy(); + expect(deps.liveEventsObservable.observed).toBeTruthy(); + expect(deps.onShutdown).not.toHaveBeenCalled(); + + deps.liveEventsObservable.error(new Error(dummyErrorMessage)); + + expectValueCollectionState(valuesCollection, { + errors: 0, + values: 1, + complete: true, + }); + const response = valuesCollection.lastValue(); + expectErrorResponse(response, StreamingResponseDataType.FullFrame); + + expect(response?.error?.message?.includes(dummyErrorMessage)).toBeTruthy(); + + expect(deps.subscriberReadiness.observed).toBeFalsy(); + expect(deps.liveEventsObservable.observed).toBeFalsy(); + expect(deps.onShutdown).toHaveBeenCalled(); + }); + }); + + describe('happy path with multiple subscribers', () => { + let deps: ReturnType; + let liveDataStream: LiveDataStream; + const valuesCollections = { + withTimeBFilter: new ValuesCollection(), + withTimeAFilter: new ValuesCollection(), + withoutFilter: new ValuesCollection(), + }; + + beforeAll(() => { + deps = createDeps(); + liveDataStream = new LiveDataStream(deps); + }); + + it('should emit the last value as full frame to new subscribers', async () => { + valuesCollections.withTimeAFilter.subscribeTo( + liveDataStream.get(liveDataStreamOptions.withTimeAFilter, subscriptionKey) + ); + + deps.liveEventsObservable.next(liveChannelMessageEvent(dataFrameJsons.schema1())); + deps.liveEventsObservable.next(liveChannelMessageEvent(dataFrameJsons.schema1newValues())); + + expectValueCollectionState(valuesCollections.withTimeAFilter, { errors: 0, values: 2, complete: false }); + + valuesCollections.withTimeBFilter.subscribeTo( + liveDataStream.get(liveDataStreamOptions.withTimeBFilter, subscriptionKey) + ); + valuesCollections.withoutFilter.subscribeTo( + liveDataStream.get(liveDataStreamOptions.withoutFilter, subscriptionKey) + ); + + console.log(JSON.stringify(valuesCollections.withTimeAFilter, null, 2)); + + expectValueCollectionState(valuesCollections.withTimeAFilter, { errors: 0, values: 2, complete: false }); + expectValueCollectionState(valuesCollections.withTimeBFilter, { errors: 0, values: 1, complete: false }); + expectValueCollectionState(valuesCollections.withoutFilter, { errors: 0, values: 1, complete: false }); + }); + + it('should emit filtered data to each subscriber', async () => { + deps.liveEventsObservable.next(liveChannelMessageEvent(dataFrameJsons.schema1newValues())); + expect( + mapValues(valuesCollections, (collection) => + collection.values.map((response) => { + const data = response.data[0]; + return isStreamingResponseData(data, StreamingResponseDataType.FullFrame) + ? fieldsOf(data) + : isStreamingResponseData(data, StreamingResponseDataType.NewValuesSameSchema) + ? data.values + : response; + }) + ) + ).toEqual({ + withTimeAFilter: [ + [ + { + name: 'time', + values: [100, 101], + }, + { + name: 'a', + values: ['a', 'b'], + }, + ], + [[102], ['c']], + [[102], ['c']], + ], + withTimeBFilter: [ + [ + { + name: 'time', + values: [101, 102], + }, + { + name: 'b', + values: [2, 3], + }, + ], + [[102], [3]], + ], + withoutFilter: [ + [ + { + name: 'time', + values: [100, 101, 102], + }, + { + name: 'a', + values: ['a', 'b', 'c'], + }, + { + name: 'b', + values: [1, 2, 3], + }, + ], + [[102], ['c'], [3]], + ], + }); + }); + + it('should not unsubscribe the source observable unless all subscribers unsubscribe', async () => { + valuesCollections.withTimeAFilter.unsubscribe(); + jest.advanceTimersByTime(deps.shutdownDelayInMs + 1); + + expect(mapValues(valuesCollections, (coll) => coll.complete)).toEqual({ + withTimeAFilter: true, + withTimeBFilter: false, + withoutFilter: false, + }); + expect(deps.subscriberReadiness.observed).toBeTruthy(); + expect(deps.liveEventsObservable.observed).toBeTruthy(); + expect(deps.onShutdown).not.toHaveBeenCalled(); + }); + + it('should emit complete event to all subscribers during shutdown', async () => { + deps.liveEventsObservable.complete(); + + expect(mapValues(valuesCollections, (coll) => coll.complete)).toEqual({ + withTimeAFilter: true, + withTimeBFilter: true, + withoutFilter: true, + }); + expect(deps.subscriberReadiness.observed).toBeFalsy(); + expect(deps.liveEventsObservable.observed).toBeFalsy(); + expect(deps.onShutdown).toHaveBeenCalled(); + }); + }); + + describe('shutdown after unsubscribe', () => { + it('should shutdown if no other subscriber subscribed during shutdown delay', async () => { + const deps = createDeps(); + const liveDataStream = new LiveDataStream(deps); + const valuesCollection = new ValuesCollection(); + + valuesCollection.subscribeTo(liveDataStream.get(liveDataStreamOptions.withTimeAFilter, subscriptionKey)); + + expect(deps.subscriberReadiness.observed).toBeTruthy(); + expect(deps.liveEventsObservable.observed).toBeTruthy(); + expect(deps.onShutdown).not.toHaveBeenCalled(); + + valuesCollection.unsubscribe(); + jest.advanceTimersByTime(deps.shutdownDelayInMs - 1); + + // delay not finished - should still be subscribed + expect(deps.subscriberReadiness.observed).toBeFalsy(); + expect(deps.liveEventsObservable.observed).toBeTruthy(); + expect(deps.onShutdown).not.toHaveBeenCalled(); + + jest.advanceTimersByTime(2); + + // delay not finished - shut still be subscribed + expect(deps.subscriberReadiness.observed).toBeFalsy(); + expect(deps.liveEventsObservable.observed).toBeFalsy(); + expect(deps.onShutdown).toHaveBeenCalled(); + }); + + it('should not shutdown after unsubscribe if another subscriber subscribes during shutdown delay', async () => { + const deps = createDeps(); + const liveDataStream = new LiveDataStream(deps); + const valuesCollection1 = new ValuesCollection(); + const valuesCollection2 = new ValuesCollection(); + + valuesCollection1.subscribeTo(liveDataStream.get(liveDataStreamOptions.withTimeAFilter, subscriptionKey)); + + expect(deps.subscriberReadiness.observed).toBeTruthy(); + expect(deps.liveEventsObservable.observed).toBeTruthy(); + expect(deps.onShutdown).not.toHaveBeenCalled(); + + valuesCollection1.unsubscribe(); + jest.advanceTimersByTime(deps.shutdownDelayInMs - 1); + + valuesCollection2.subscribeTo(liveDataStream.get(liveDataStreamOptions.withTimeAFilter, subscriptionKey)); + jest.advanceTimersByTime(deps.shutdownDelayInMs); + + expect(deps.subscriberReadiness.observed).toBeTruthy(); + expect(deps.liveEventsObservable.observed).toBeTruthy(); + expect(deps.onShutdown).not.toHaveBeenCalled(); + }); + }); +}); diff --git a/public/app/features/live/centrifuge/LiveDataStream.ts b/public/app/features/live/centrifuge/LiveDataStream.ts new file mode 100644 index 00000000000..5dd32976999 --- /dev/null +++ b/public/app/features/live/centrifuge/LiveDataStream.ts @@ -0,0 +1,325 @@ +import type { LiveDataStreamOptions, StreamingFrameOptions } from '@grafana/runtime/src/services/live'; +import { toDataQueryError } from '@grafana/runtime/src/utils/toDataQueryError'; +import { + DataFrameJSON, + dataFrameToJSON, + DataQueryError, + Field, + isLiveChannelMessageEvent, + isLiveChannelStatusEvent, + LiveChannelConnectionState, + LiveChannelEvent, + LiveChannelId, + LoadingState, +} from '@grafana/data'; +import { map, Observable, ReplaySubject, Subject, Subscriber, Subscription } from 'rxjs'; +import { DataStreamSubscriptionKey, StreamingDataQueryResponse } from './service'; +import { getStreamingFrameOptions, StreamingDataFrame } from '../data/StreamingDataFrame'; +import { StreamingResponseDataType } from '../data/utils'; + +const bufferIfNot = (canEmitObservable: Observable) => (source: Observable): Observable => { + return new Observable((subscriber: Subscriber) => { + let buffer: T[] = []; + let canEmit = true; + + const emitBuffer = () => { + subscriber.next(buffer); + buffer = []; + }; + + const canEmitSub = canEmitObservable.subscribe({ + next: (val) => { + canEmit = val; + + if (canEmit && buffer.length) { + emitBuffer(); + } + }, + }); + + const sourceSub = source.subscribe({ + next(value) { + if (canEmit) { + if (!buffer.length) { + subscriber.next([value]); + } else { + emitBuffer(); + } + } else { + buffer.push(value); + } + }, + error(error) { + subscriber.error(error); + }, + complete() { + subscriber.complete(); + }, + }); + + return () => { + sourceSub.unsubscribe(); + canEmitSub.unsubscribe(); + }; + }); +}; + +export type DataStreamHandlerDeps = { + channelId: LiveChannelId; + liveEventsObservable: Observable>; + onShutdown: () => void; + subscriberReadiness: Observable; + defaultStreamingFrameOptions: Readonly; + shutdownDelayInMs: number; +}; + +enum InternalStreamMessageType { + Error, + NewValuesSameSchema, + ChangedSchema, +} + +type InternalStreamMessageTypeToData = { + [InternalStreamMessageType.Error]: { + error: DataQueryError; + }; + [InternalStreamMessageType.ChangedSchema]: {}; + [InternalStreamMessageType.NewValuesSameSchema]: { + values: unknown[][]; + }; +}; + +type InternalStreamMessage = T extends InternalStreamMessageType + ? { + type: T; + } & InternalStreamMessageTypeToData[T] + : never; + +const reduceNewValuesSameSchemaMessages = ( + packets: Array> +) => ({ + values: packets.reduce((acc, { values }) => { + for (let i = 0; i < values.length; i++) { + if (!acc[i]) { + acc[i] = []; + } + for (let j = 0; j < values[i].length; j++) { + acc[i].push(values[i][j]); + } + } + return acc; + }, [] as unknown[][]), + type: InternalStreamMessageType.NewValuesSameSchema, +}); + +const filterMessages = ( + packets: InternalStreamMessage[], + type: T +): Array> => packets.filter((p) => p.type === type) as Array>; + +export class LiveDataStream { + private frameBuffer: StreamingDataFrame; + private liveEventsSubscription: Subscription; + private stream: Subject = new ReplaySubject(1); + private shutdownTimeoutId: ReturnType | undefined; + + constructor(private deps: DataStreamHandlerDeps) { + this.frameBuffer = StreamingDataFrame.empty(deps.defaultStreamingFrameOptions); + this.liveEventsSubscription = deps.liveEventsObservable.subscribe({ + error: this.onError, + complete: this.onComplete, + next: this.onNext, + }); + } + + private shutdown = () => { + this.stream.complete(); + this.liveEventsSubscription.unsubscribe(); + this.deps.onShutdown(); + }; + + private shutdownIfNoSubscribers = () => { + if (!this.stream.observed) { + this.shutdown(); + } + }; + + private onError = (err: any) => { + console.log('LiveQuery [error]', { err }, this.deps.channelId); + this.stream.next({ + type: InternalStreamMessageType.Error, + error: toDataQueryError(err), + }); + this.shutdown(); + }; + + private onComplete = () => { + console.log('LiveQuery [complete]', this.deps.channelId); + this.shutdown(); + }; + + private onNext = (evt: LiveChannelEvent) => { + if (isLiveChannelMessageEvent(evt)) { + this.process(evt.message); + return; + } + + const liveChannelStatusEvent = isLiveChannelStatusEvent(evt); + if (liveChannelStatusEvent && evt.error) { + this.stream.next({ + type: InternalStreamMessageType.Error, + error: { + ...toDataQueryError(evt.error), + message: `Streaming channel error: ${evt.error.message}`, + }, + }); + return; + } + + if ( + liveChannelStatusEvent && + (evt.state === LiveChannelConnectionState.Connected || evt.state === LiveChannelConnectionState.Pending) && + evt.message + ) { + this.process(evt.message); + } + }; + + private process = (msg: DataFrameJSON) => { + const packetInfo = this.frameBuffer.push(msg); + + if (packetInfo.schemaChanged) { + this.stream.next({ + type: InternalStreamMessageType.ChangedSchema, + }); + } else { + this.stream.next({ + type: InternalStreamMessageType.NewValuesSameSchema, + values: this.frameBuffer.getValuesFromLastPacket(), + }); + } + }; + + private resizeBuffer = (bufferOptions: StreamingFrameOptions) => { + if (bufferOptions && this.frameBuffer.needsResizing(bufferOptions)) { + this.frameBuffer.resize(bufferOptions); + } + }; + + private prepareInternalStreamForNewSubscription = (options: LiveDataStreamOptions): void => { + if (!this.frameBuffer.hasAtLeastOnePacket() && options.frame) { + // will skip initial frames from subsequent subscribers + this.process(dataFrameToJSON(options.frame)); + } + }; + + private clearShutdownTimeout = () => { + if (this.shutdownTimeoutId) { + clearTimeout(this.shutdownTimeoutId); + this.shutdownTimeoutId = undefined; + } + }; + + get = (options: LiveDataStreamOptions, subKey: DataStreamSubscriptionKey): Observable => { + this.clearShutdownTimeout(); + const buffer = getStreamingFrameOptions(options.buffer); + + this.resizeBuffer(buffer); + this.prepareInternalStreamForNewSubscription(options); + + const fieldsNamesFilter = options.filter?.fields; + const dataNeedsFiltering = fieldsNamesFilter?.length; + const fieldFilterPredicate = dataNeedsFiltering ? ({ name }: Field) => fieldsNamesFilter.includes(name) : undefined; + let matchingFieldIndexes: number[] | undefined = undefined; + + const getFullFrameResponseData = (error?: DataQueryError): StreamingDataQueryResponse => { + matchingFieldIndexes = fieldFilterPredicate + ? this.frameBuffer.getMatchingFieldIndexes(fieldFilterPredicate) + : undefined; + + return { + key: subKey, + state: error ? LoadingState.Error : LoadingState.Streaming, + data: [ + { + type: StreamingResponseDataType.FullFrame, + frame: this.frameBuffer.serialize(fieldFilterPredicate, buffer), + }, + ], + error, + }; + }; + + const getNewValuesSameSchemaResponseData = (values: unknown[][]): StreamingDataQueryResponse => { + const filteredValues = matchingFieldIndexes + ? values.filter((v, i) => (matchingFieldIndexes as number[]).includes(i)) + : values; + + return { + key: subKey, + state: LoadingState.Streaming, + data: [ + { + type: StreamingResponseDataType.NewValuesSameSchema, + values: filteredValues, + }, + ], + }; + }; + + let shouldSendFullFrame = true; + const transformedInternalStream = this.stream.pipe( + bufferIfNot(this.deps.subscriberReadiness), + map((messages, i) => { + const errors = filterMessages(messages, InternalStreamMessageType.Error); + const lastError = errors.length ? errors[errors.length - 1].error : undefined; + + if (shouldSendFullFrame) { + shouldSendFullFrame = false; + return getFullFrameResponseData(lastError); + } + + if (errors.length) { + // send the latest frame with the last error, discard everything else + return getFullFrameResponseData(lastError); + } + + const schemaChanged = messages.some((n) => n.type === InternalStreamMessageType.ChangedSchema); + if (schemaChanged) { + // send the latest frame, discard intermediate appends + return getFullFrameResponseData(); + } + + const newValueSameSchemaMessages = filterMessages(messages, InternalStreamMessageType.NewValuesSameSchema); + if (newValueSameSchemaMessages.length !== messages.length) { + console.warn(`unsupported message type ${messages.map(({ type }) => type)}`); + } + + return getNewValuesSameSchemaResponseData(reduceNewValuesSameSchemaMessages(newValueSameSchemaMessages).values); + }) + ); + + return new Observable((subscriber) => { + const sub = transformedInternalStream.subscribe({ + next: (n) => { + subscriber.next(n); + }, + error: (err) => { + subscriber.error(err); + }, + complete: () => { + subscriber.complete(); + }, + }); + + return () => { + // TODO: potentially resize (downsize) the buffer on unsubscribe + sub.unsubscribe(); + if (!this.stream.observed) { + this.clearShutdownTimeout(); + this.shutdownTimeoutId = setTimeout(this.shutdownIfNoSubscribers, this.deps.shutdownDelayInMs); + } + }; + }); + }; +} diff --git a/public/app/features/live/centrifuge/channel.ts b/public/app/features/live/centrifuge/channel.ts index c505a972891..92bae084747 100644 --- a/public/app/features/live/centrifuge/channel.ts +++ b/public/app/features/live/centrifuge/channel.ts @@ -140,7 +140,14 @@ export class CentrifugeLiveChannel { */ getStream() { return new Observable((subscriber) => { - subscriber.next({ ...this.currentStatus }); + const initialMessage = { ...this.currentStatus }; + if (this.lastMessageWithSchema?.schema) { + // send just schema instead of schema+data to avoid having data gaps + initialMessage.message = { schema: this.lastMessageWithSchema?.schema }; + } + + subscriber.next({ ...this.currentStatus, message: this.lastMessageWithSchema }); + const sub = this.stream.subscribe(subscriber); return () => { sub.unsubscribe(); diff --git a/public/app/features/live/centrifuge/service.ts b/public/app/features/live/centrifuge/service.ts index 515d0628154..c003e5f3f54 100644 --- a/public/app/features/live/centrifuge/service.ts +++ b/public/app/features/live/centrifuge/service.ts @@ -1,23 +1,21 @@ import Centrifuge from 'centrifuge/dist/centrifuge'; -import { LiveDataStreamOptions } from '@grafana/runtime'; -import { toDataQueryError } from '@grafana/runtime/src/utils/toDataQueryError'; -import { BehaviorSubject, Observable } from 'rxjs'; import { - DataFrame, - DataFrameJSON, - dataFrameToJSON, + GrafanaLiveSrv, + LiveDataStreamOptions, + StreamingFrameAction, + StreamingFrameOptions, +} from '@grafana/runtime/src/services/live'; +import { BehaviorSubject, Observable, share, startWith } from 'rxjs'; +import { DataQueryResponse, - isLiveChannelMessageEvent, - isLiveChannelStatusEvent, LiveChannelAddress, LiveChannelConnectionState, - LiveChannelEvent, - LiveChannelPresenceStatus, - LoadingState, - StreamingDataFrame, - toDataFrameDTO, + LiveChannelId, + toLiveChannelId, } from '@grafana/data'; import { CentrifugeLiveChannel } from './channel'; +import { LiveDataStream } from './LiveDataStream'; +import { StreamingResponseData } from '../data/utils'; export type CentrifugeSrvDeps = { appUrl: string; @@ -28,39 +26,32 @@ export type CentrifugeSrvDeps = { dataStreamSubscriberReadiness: Observable; }; -export interface CentrifugeSrv { - /** - * Listen for changes to the connection state - */ - getConnectionState(): Observable; +export type StreamingDataQueryResponse = Omit & { data: [StreamingResponseData] }; - /** - * Watch for messages in a channel - */ - getStream(address: LiveChannelAddress): Observable>; +export type CentrifugeSrv = Omit & { + getDataStream: (options: LiveDataStreamOptions) => Observable; +}; - /** - * Connect to a channel and return results as DataFrames - */ - getDataStream(options: LiveDataStreamOptions): Observable; +export type DataStreamSubscriptionKey = string; - /** - * For channels that support presence, this will request the current state from the server. - * - * Join and leave messages will be sent to the open stream - */ - getPresence(address: LiveChannelAddress): Promise; -} +const defaultStreamingFrameOptions: Readonly = { + maxLength: 100, + maxDelta: Infinity, + action: StreamingFrameAction.Append, +}; + +const dataStreamShutdownDelayInMs = 5000; export class CentrifugeService implements CentrifugeSrv { readonly open = new Map(); + private readonly liveDataStreamByChannelId: Record = {}; readonly centrifuge: Centrifuge; readonly connectionState: BehaviorSubject; readonly connectionBlocker: Promise; - private dataStreamSubscriberReady = true; + private readonly dataStreamSubscriberReadiness: Observable; constructor(private deps: CentrifugeSrvDeps) { - deps.dataStreamSubscriberReadiness.subscribe((next) => (this.dataStreamSubscriberReady = next)); + this.dataStreamSubscriberReadiness = deps.dataStreamSubscriberReadiness.pipe(share(), startWith(true)); const liveUrl = `${deps.appUrl.replace(/^http/, 'ws')}/api/live/ws`; this.centrifuge = new Centrifuge(liveUrl, {}); this.centrifuge.setConnectData({ @@ -154,130 +145,59 @@ export class CentrifugeService implements CentrifugeSrv { /** * Listen for changes to the connection state */ - getConnectionState() { + getConnectionState = () => { return this.connectionState.asObservable(); - } + }; /** * Watch for messages in a channel */ - getStream(address: LiveChannelAddress): Observable> { + getStream: CentrifugeSrv['getStream'] = (address: LiveChannelAddress) => { return this.getChannel(address).getStream(); - } + }; + private createSubscriptionKey = (options: LiveDataStreamOptions): DataStreamSubscriptionKey => + options.key ?? `xstr/${streamCounter++}`; + + private getLiveDataStream = (options: LiveDataStreamOptions): LiveDataStream => { + const channelId = toLiveChannelId(options.addr); + const existingStream = this.liveDataStreamByChannelId[channelId]; + + if (existingStream) { + return existingStream; + } + + const channel = this.getChannel(options.addr); + this.liveDataStreamByChannelId[channelId] = new LiveDataStream({ + channelId, + onShutdown: () => { + delete this.liveDataStreamByChannelId[channelId]; + }, + liveEventsObservable: channel.getStream(), + subscriberReadiness: this.dataStreamSubscriberReadiness, + defaultStreamingFrameOptions, + shutdownDelayInMs: dataStreamShutdownDelayInMs, + }); + return this.liveDataStreamByChannelId[channelId]; + }; /** * Connect to a channel and return results as DataFrames */ - getDataStream(options: LiveDataStreamOptions): Observable { - return new Observable((subscriber) => { - const channel = this.getChannel(options.addr); - const key = options.key ?? `xstr/${streamCounter++}`; - let data: StreamingDataFrame | undefined = undefined; - let filtered: DataFrame | undefined = undefined; - let state = LoadingState.Streaming; - let lastWidth = -1; + getDataStream: CentrifugeSrv['getDataStream'] = (options) => { + const subscriptionKey = this.createSubscriptionKey(options); - const process = (msg: DataFrameJSON) => { - if (!data) { - data = new StreamingDataFrame(msg, options.buffer); - } else { - data.push(msg); - } - state = LoadingState.Streaming; - const sameWidth = lastWidth === data.fields.length; - lastWidth = data.fields.length; - - // Filter out fields - if (!filtered || msg.schema || !sameWidth) { - filtered = data; - if (options.filter) { - const { fields } = options.filter; - if (fields?.length) { - filtered = { - ...data, - fields: data.fields.filter((f) => fields.includes(f.name)), - }; - } - } - } - - if (this.dataStreamSubscriberReady) { - filtered.length = data.length; // make sure they stay up-to-date - subscriber.next({ - state, - data: [ - // workaround for serializing issues when sending DataFrame from web worker to the main thread - // DataFrame is making use of ArrayVectors which are es6 classes and thus not cloneable out of the box - // `toDataFrameDTO` converts ArrayVectors into native arrays. - toDataFrameDTO(filtered), - ], - key, - }); - } - }; - - if (options.frame) { - process(dataFrameToJSON(options.frame)); - } else if (channel.lastMessageWithSchema) { - process(channel.lastMessageWithSchema); - } - - const sub = channel.getStream().subscribe({ - error: (err: any) => { - console.log('LiveQuery [error]', { err }, options.addr); - state = LoadingState.Error; - subscriber.next({ state, data: [data], key, error: toDataQueryError(err) }); - sub.unsubscribe(); // close after error - }, - complete: () => { - console.log('LiveQuery [complete]', options.addr); - if (state !== LoadingState.Error) { - state = LoadingState.Done; - } - // or track errors? subscriber.next({ state, data: [data], key }); - subscriber.complete(); - sub.unsubscribe(); - }, - next: (evt: LiveChannelEvent) => { - if (isLiveChannelMessageEvent(evt)) { - process(evt.message); - return; - } - if (isLiveChannelStatusEvent(evt)) { - if (evt.error) { - let error = toDataQueryError(evt.error); - error.message = `Streaming channel error: ${error.message}`; - state = LoadingState.Error; - subscriber.next({ state, data: [data], key, error }); - return; - } else if ( - evt.state === LiveChannelConnectionState.Connected || - evt.state === LiveChannelConnectionState.Pending - ) { - if (evt.message) { - process(evt.message); - } - return; - } - console.log('ignore state', evt); - } - }, - }); - - return () => { - sub.unsubscribe(); - }; - }); - } + const stream = this.getLiveDataStream(options); + return stream.get(options, subscriptionKey); + }; /** * For channels that support presence, this will request the current state from the server. * * Join and leave messages will be sent to the open stream */ - getPresence(address: LiveChannelAddress): Promise { + getPresence: CentrifugeSrv['getPresence'] = (address) => { return this.getChannel(address).getPresence(); - } + }; } // This is used to give a unique key for each stream. The actual value does not matter diff --git a/packages/grafana-data/src/dataframe/StreamingDataFrame.test.ts b/public/app/features/live/data/StreamingDataFrame.test.ts similarity index 57% rename from packages/grafana-data/src/dataframe/StreamingDataFrame.test.ts rename to public/app/features/live/data/StreamingDataFrame.test.ts index 36248bf4268..4e736eadd08 100644 --- a/packages/grafana-data/src/dataframe/StreamingDataFrame.test.ts +++ b/public/app/features/live/data/StreamingDataFrame.test.ts @@ -1,8 +1,6 @@ -import { reduceField, ReducerID } from '..'; -import { getFieldDisplayName } from '../field'; -import { DataFrame, FieldType } from '../types/dataFrame'; -import { DataFrameJSON } from './DataFrameJSON'; -import { StreamingDataFrame } from './StreamingDataFrame'; +import { reduceField, ReducerID, getFieldDisplayName, DataFrame, FieldType, DataFrameJSON } from '@grafana/data'; +import { StreamingFrameAction, StreamingFrameOptions } from '@grafana/runtime'; +import { getStreamingFrameOptions, StreamingDataFrame } from './StreamingDataFrame'; describe('Streaming JSON', () => { describe('when called with a DataFrame', () => { @@ -23,7 +21,7 @@ describe('Streaming JSON', () => { }, }; - const stream = new StreamingDataFrame(json, { + const stream = StreamingDataFrame.fromDataFrameJSON(json, { maxLength: 5, maxDelta: 300, }); @@ -186,10 +184,306 @@ describe('Streaming JSON', () => { ] `); }); + + it('should append data with new schema and fill missed values with undefined', () => { + stream.push({ + schema: { + fields: [ + { name: 'time', type: FieldType.time }, + { name: 'name', type: FieldType.string }, + { name: 'value', type: FieldType.number }, + { name: 'value2', type: FieldType.number }, + ], + }, + data: { + values: [[601], ['i'], [10], [-10]], + }, + }); + + expect(stream.fields.map((f) => ({ name: f.name, value: f.values.buffer }))).toMatchInlineSnapshot(` + Array [ + Object { + "name": "time", + "value": Array [ + 500, + 501, + 502, + 503, + 601, + ], + }, + Object { + "name": "name", + "value": Array [ + "e", + "f", + "g", + "h", + "i", + ], + }, + Object { + "name": "value", + "value": Array [ + 5, + 6, + 7, + 8, + 9, + 10, + ], + }, + Object { + "name": "value2", + "value": Array [ + undefined, + undefined, + undefined, + undefined, + -10, + ], + }, + ] + `); + }); + + it('should be able to return values from previous packet', function () { + stream.push({ + data: { + values: [ + [602, 603], + ['j', 'k'], + [11, 12], + [-11, -12], + ], + }, + }); + + expect(stream.getValuesFromLastPacket()).toEqual([ + [602, 603], + ['j', 'k'], + [11, 12], + [-11, -12], + ]); + }); + }); + + describe('serialization', function () { + const json: DataFrameJSON = { + schema: { + fields: [ + { name: 'time', type: FieldType.time }, + { name: 'name', type: FieldType.string }, + { name: 'value', type: FieldType.number }, + ], + }, + data: { + values: [ + [100, 200, 300], + ['a', 'b', 'c'], + [1, 2, 3], + ], + }, + }; + + const frame = StreamingDataFrame.fromDataFrameJSON(json, { + maxLength: 5, + maxDelta: 300, + }); + + it('should filter fields', function () { + const serializedFrame = frame.serialize((f) => ['time'].includes(f.name)); + expect(serializedFrame.fields).toEqual([ + { + config: {}, + name: 'time', + type: 'time', + values: [100, 200, 300], + }, + ]); + }); + + it('should resize the buffer', function () { + const serializedFrame = frame.serialize((f) => ['time', 'name'].includes(f.name), { maxLength: 2 }); + expect(serializedFrame.fields).toEqual([ + { + config: {}, + name: 'time', + type: 'time', + values: [200, 300], + }, + { + config: {}, + name: 'name', + type: 'string', + values: ['b', 'c'], + }, + ]); + }); + }); + + describe('resizing', function () { + it.each([ + [ + { + existing: { + maxLength: 10, + maxDelta: 5, + action: StreamingFrameAction.Replace, + }, + newOptions: {}, + expected: { + maxLength: 10, + maxDelta: 5, + action: StreamingFrameAction.Replace, + }, + }, + ], + [ + { + existing: { + maxLength: 10, + maxDelta: 5, + action: StreamingFrameAction.Replace, + }, + newOptions: { + maxLength: 9, + maxDelta: 4, + }, + expected: { + maxLength: 10, + maxDelta: 5, + action: StreamingFrameAction.Replace, + }, + }, + ], + [ + { + existing: { + maxLength: 10, + maxDelta: 5, + action: StreamingFrameAction.Replace, + }, + newOptions: { + maxLength: 11, + maxDelta: 6, + }, + expected: { + maxLength: 11, + maxDelta: 6, + action: StreamingFrameAction.Replace, + }, + }, + ], + ])( + 'should always resize to a bigger buffer', + ({ + existing, + expected, + newOptions, + }: { + existing: StreamingFrameOptions; + newOptions: Partial; + expected: StreamingFrameOptions; + }) => { + const frame = StreamingDataFrame.empty(existing); + frame.resize(newOptions); + expect(frame.getOptions()).toEqual(expected); + } + ); + + it('should override infinity maxDelta', function () { + const frame = StreamingDataFrame.empty({ + maxLength: 10, + maxDelta: Infinity, + action: StreamingFrameAction.Replace, + }); + frame.resize({ + maxLength: 9, + maxDelta: 4, + }); + expect(frame.getOptions()).toEqual({ + maxLength: 10, + maxDelta: 4, + action: StreamingFrameAction.Replace, + }); + }); + }); + + describe('options with defaults', function () { + it('should provide defaults', function () { + expect(getStreamingFrameOptions()).toEqual({ + action: StreamingFrameAction.Append, + maxDelta: Infinity, + maxLength: 1000, + }); + }); + }); + + describe('when deserialized', function () { + const json: DataFrameJSON = { + schema: { + fields: [ + { name: 'time', type: FieldType.time }, + { name: 'name', type: FieldType.string }, + { name: 'value', type: FieldType.number }, + ], + }, + data: { + values: [ + [100, 200, 300], + ['a', 'b', 'c'], + [1, 2, 3], + ], + }, + }; + + const serializedFrame = StreamingDataFrame.fromDataFrameJSON(json, { + maxLength: 5, + maxDelta: 300, + }).serialize(); + + it('should support pushing new values matching the existing schema', function () { + const frame = StreamingDataFrame.deserialize(serializedFrame); + frame.pushNewValues([[601], ['x'], [10]]); + expect(frame.fields.map((f) => ({ name: f.name, value: f.values.buffer }))).toMatchInlineSnapshot(` + Array [ + Object { + "name": "time", + "value": Array [ + 300, + 601, + ], + }, + Object { + "name": "name", + "value": Array [ + "c", + "x", + ], + }, + Object { + "name": "value", + "value": Array [ + 3, + 10, + ], + }, + ] + `); + }); + }); + + describe('when created empty', function () { + it('should have no packets', function () { + const streamingDataFrame = StreamingDataFrame.empty(); + expect(streamingDataFrame.hasAtLeastOnePacket()).toEqual(false); + expect(streamingDataFrame.fields).toHaveLength(0); + }); }); describe('lengths property is accurate', () => { - const stream = new StreamingDataFrame( + const stream = StreamingDataFrame.fromDataFrameJSON( { schema: { fields: [{ name: 'simple', type: FieldType.number }], @@ -217,7 +511,7 @@ describe('Streaming JSON', () => { }); describe('streaming labels column', () => { - const stream = new StreamingDataFrame( + const stream = StreamingDataFrame.fromDataFrameJSON( { schema: { fields: [ @@ -392,7 +686,7 @@ describe('Streaming JSON', () => { }, }; - const stream = new StreamingDataFrame(json, { + const stream = StreamingDataFrame.fromDataFrameJSON(json, { maxLength: 4, maxDelta: 300, }); @@ -410,6 +704,7 @@ describe('Streaming JSON', () => { "action": "replace", "length": 3, "number": 1, + "schemaChanged": true, }, "values": Array [ 1, @@ -433,6 +728,7 @@ describe('Streaming JSON', () => { "action": "append", "length": 2, "number": 2, + "schemaChanged": false, }, "values": Array [ 2, @@ -454,6 +750,7 @@ describe('Streaming JSON', () => { "action": "append", "length": 1, "number": 3, + "schemaChanged": false, }, "values": Array [ 3, diff --git a/packages/grafana-data/src/dataframe/StreamingDataFrame.ts b/public/app/features/live/data/StreamingDataFrame.ts similarity index 53% rename from packages/grafana-data/src/dataframe/StreamingDataFrame.ts rename to public/app/features/live/data/StreamingDataFrame.ts index dbcfe20e916..38a17b9c409 100644 --- a/packages/grafana-data/src/dataframe/StreamingDataFrame.ts +++ b/public/app/features/live/data/StreamingDataFrame.ts @@ -1,40 +1,34 @@ -import { Field, DataFrame, FieldType, Labels, QueryResultMeta } from '../types'; -import { ArrayVector } from '../vector'; -import { DataFrameJSON, decodeFieldValueEntities, FieldSchema } from './DataFrameJSON'; -import { guessFieldTypeFromValue } from './processDataFrame'; -import { join } from '../transformations/transformers/joinDataFrames'; +import { + DataFrame, + Field, + FieldDTO, + FieldType, + Labels, + QueryResultMeta, + DataFrameJSON, + decodeFieldValueEntities, + FieldSchema, + guessFieldTypeFromValue, + ArrayVector, + toFilteredDataFrameDTO, +} from '@grafana/data'; +import { join } from '@grafana/data/src/transformations/transformers/joinDataFrames'; +import { + StreamingFrameAction, + StreamingFrameOptions, +} from '@grafana/runtime/src/services/live'; import { AlignedData } from 'uplot'; -/** - * Indicate if the frame is appened or replace - * - * @public -- but runtime - */ -export enum StreamingFrameAction { - Append = 'append', - Replace = 'replace', -} - /** * Stream packet info is attached to StreamingDataFrames and indicate how many * rows were added to the end of the frame. The number of discarded rows can be * calculated from previous state - * - * @public -- but runtime */ export interface StreamPacketInfo { number: number; action: StreamingFrameAction; length: number; -} - -/** - * @alpha - */ -export interface StreamingFrameOptions { - maxLength?: number; // 1000 - maxDelta?: number; // how long to keep things - action?: StreamingFrameAction; // default will append + schemaChanged: boolean; } enum PushMode { @@ -43,10 +37,22 @@ enum PushMode { // long } +export type SerializedStreamingDataFrame = { + name?: string; + fields: FieldDTO[]; + refId?: string; + meta: QueryResultMeta; + schemaFields: FieldSchema[]; + timeFieldIndex: number; + pushMode: PushMode; + length: number; + packetInfo: StreamPacketInfo; + options: StreamingFrameOptions; + labels: Set; +}; + /** * Unlike a circular buffer, this will append and periodically slice the front - * - * @alpha */ export class StreamingDataFrame implements DataFrame { name?: string; @@ -56,40 +62,142 @@ export class StreamingDataFrame implements DataFrame { fields: Array>> = []; length = 0; - options: StreamingFrameOptions; - private schemaFields: FieldSchema[] = []; private timeFieldIndex = -1; private pushMode = PushMode.wide; - private alwaysReplace = false; // current labels private labels: Set = new Set(); readonly packetInfo: StreamPacketInfo = { + schemaChanged: true, number: 0, action: StreamingFrameAction.Replace, length: 0, }; - constructor(frame: DataFrameJSON, opts?: StreamingFrameOptions) { - this.options = { - maxLength: 1000, - maxDelta: Infinity, - ...opts, - }; - this.alwaysReplace = this.options.action === StreamingFrameAction.Replace; + private constructor(public options: StreamingFrameOptions) { + // Get Length to show up if you use spread + Object.defineProperty(this, 'length', { + enumerable: true, + }); - this.push(frame); + // Get fields to show up if you use spread + Object.defineProperty(this, 'fields', { + enumerable: true, + }); } + serialize = ( + fieldPredicate?: (f: Field) => boolean, + optionsOverride?: Partial + ): SerializedStreamingDataFrame => { + const options = optionsOverride ? Object.assign({}, { ...this.options, ...optionsOverride }) : this.options; + const dataFrameDTO = toFilteredDataFrameDTO(this, fieldPredicate); + + const numberOfItemsToRemove = getNumberOfItemsToRemove( + dataFrameDTO.fields.map((f) => f.values) as unknown[][], + options.maxLength, + this.timeFieldIndex, + options.maxDelta + ); + + dataFrameDTO.fields = dataFrameDTO.fields.map((f) => ({ + ...f, + values: (f.values as unknown[]).slice(numberOfItemsToRemove), + })); + + return { + ...dataFrameDTO, + // TODO: Labels and schema are not filtered by field + labels: this.labels, + schemaFields: this.schemaFields, + + name: this.name, + refId: this.refId, + meta: this.meta, + length: this.length, + timeFieldIndex: this.timeFieldIndex, + pushMode: this.pushMode, + packetInfo: this.packetInfo, + options, + }; + }; + + private initFromSerialized = (serialized: Omit) => { + this.name = serialized.name; + this.refId = serialized.refId; + this.meta = serialized.meta; + this.length = serialized.length; + this.labels = serialized.labels; + this.schemaFields = serialized.schemaFields; + this.timeFieldIndex = serialized.timeFieldIndex; + this.pushMode = serialized.pushMode; + this.packetInfo.length = serialized.packetInfo.length; + this.packetInfo.number = serialized.packetInfo.number; + this.packetInfo.action = StreamingFrameAction.Replace; + this.packetInfo.schemaChanged = true; + this.fields = serialized.fields.map((f) => ({ + ...f, + type: f.type ?? FieldType.other, + config: f.config ?? {}, + values: Array.isArray(f.values) ? new ArrayVector(f.values) : new ArrayVector(), + })); + + assureValuesAreWithinLengthLimit( + this.fields.map((f) => f.values.buffer), + this.options.maxLength, + this.timeFieldIndex, + this.options.maxDelta + ); + }; + + static deserialize = (serialized: SerializedStreamingDataFrame) => { + const frame = new StreamingDataFrame(serialized.options); + frame.initFromSerialized(serialized); + return frame; + }; + + static empty = (opts?: Partial): StreamingDataFrame => + new StreamingDataFrame(getStreamingFrameOptions(opts)); + + static fromDataFrameJSON = (frame: DataFrameJSON, opts?: Partial): StreamingDataFrame => { + const streamingDataFrame = new StreamingDataFrame(getStreamingFrameOptions(opts)); + streamingDataFrame.push(frame); + return streamingDataFrame; + }; + + private get alwaysReplace() { + return this.options.action === StreamingFrameAction.Replace; + } + + needsResizing = ({ maxLength, maxDelta }: StreamingFrameOptions) => { + const needsMoreLength = maxLength && this.options.maxLength < maxLength; + const needsBiggerDelta = maxDelta && this.options.maxDelta < maxDelta; + const needsToOverrideDefaultInfinityDelta = maxDelta && this.options.maxDelta === Infinity; + return Boolean(needsMoreLength || needsBiggerDelta || needsToOverrideDefaultInfinityDelta); + }; + + resize = ({ maxLength, maxDelta }: Partial) => { + if (maxDelta) { + if (this.options.maxDelta === Infinity) { + this.options.maxDelta = maxDelta; + } else { + this.options.maxDelta = Math.max(maxDelta, this.options.maxDelta); + } + } + this.options.maxLength = Math.max(this.options.maxLength, maxLength ?? 0); + }; + /** * apply the new message to the existing data. This will replace the existing schema * if a new schema is included in the message, or append data matching the current schema */ - push(msg: DataFrameJSON) { + push(msg: DataFrameJSON): StreamPacketInfo { const { schema, data } = msg; this.packetInfo.number++; + this.packetInfo.length = 0; + this.packetInfo.schemaChanged = false; if (schema) { this.pushMode = PushMode.wide; @@ -118,6 +226,7 @@ export class StreamingDataFrame implements DataFrame { f.labels = sf.labels; }); } else { + this.packetInfo.schemaChanged = true; const isWide = this.pushMode === PushMode.wide; this.fields = niceSchemaFields.map((f) => { return { @@ -127,7 +236,8 @@ export class StreamingDataFrame implements DataFrame { type: f.type ?? FieldType.other, // transfer old values by type & name, unless we relied on labels to match fields values: isWide - ? this.fields.find((of) => of.name === f.name && f.type === of.type)?.values ?? new ArrayVector() + ? this.fields.find((of) => of.name === f.name && f.type === of.type)?.values ?? + new ArrayVector(Array(this.length).fill(undefined)) : new ArrayVector(), }; }); @@ -155,6 +265,7 @@ export class StreamingDataFrame implements DataFrame { // make sure fields are initalized for each label for (const label of labeledTables.keys()) { if (!this.labels.has(label)) { + this.packetInfo.schemaChanged = true; this.addLabel(label); } } @@ -221,8 +332,54 @@ export class StreamingDataFrame implements DataFrame { // Update the frame length this.length = appended[0].length; } + + return { + ...this.packetInfo, + }; } + pushNewValues = (values: unknown[][]) => { + if (!values?.length) { + return; + } + + this.packetInfo.action = StreamingFrameAction.Append; + this.packetInfo.number++; + this.packetInfo.length = values[0].length; + this.packetInfo.schemaChanged = false; + + circPush( + this.fields.map((f) => f.values.buffer), + values, + this.options.maxLength, + this.timeFieldIndex, + this.options.maxDelta + ); + }; + + resetStateCalculations = () => { + this.fields.forEach((f) => { + f.state = { + ...(f.state ?? {}), + calcs: undefined, + range: undefined, + }; + }); + }; + + getMatchingFieldIndexes = (fieldPredicate: (f: Field) => boolean): number[] => + this.fields + .map((f, index) => (fieldPredicate(f) ? index : undefined)) + .filter((val) => val !== undefined) as number[]; + + getValuesFromLastPacket = (): unknown[][] => + this.fields.map((f) => { + const values = f.values.buffer; + return values.slice(Math.max(values.length - this.packetInfo.length)); + }); + + hasAtLeastOnePacket = () => Boolean(this.packetInfo.length); + // adds a set of fields for a new label private addLabel(label: string) { let labelCount = this.labels.size; @@ -258,6 +415,16 @@ export class StreamingDataFrame implements DataFrame { this.labels.add(label); } + + getOptions = (): Readonly => this.options; +} + +export function getStreamingFrameOptions(opts?: Partial): StreamingFrameOptions { + return { + maxLength: opts?.maxLength ?? 1000, + maxDelta: opts?.maxDelta ?? Infinity, + action: opts?.action ?? StreamingFrameAction.Append, + }; } // converts vertical insertion records with table keys in [0] and column values in [1...N] @@ -317,9 +484,31 @@ export function getLastStreamingDataFramePacket(frame: DataFrame) { } // mutable circular push -function circPush(data: number[][], newData: number[][], maxLength = Infinity, deltaIdx = 0, maxDelta = Infinity) { +function circPush(data: unknown[][], newData: unknown[][], maxLength = Infinity, deltaIdx = 0, maxDelta = Infinity) { for (let i = 0; i < data.length; i++) { - data[i] = data[i].concat(newData[i]); + for (let k = 0; k < newData[i].length; k++) { + data[i].push(newData[i][k]); + } + } + + return assureValuesAreWithinLengthLimit(data, maxLength, deltaIdx, maxDelta); +} + +function assureValuesAreWithinLengthLimit(data: unknown[][], maxLength = Infinity, deltaIdx = 0, maxDelta = Infinity) { + const count = getNumberOfItemsToRemove(data, maxLength, deltaIdx, maxDelta); + + if (count) { + for (let i = 0; i < data.length; i++) { + data[i].splice(0, count); + } + } + + return count; +} + +function getNumberOfItemsToRemove(data: unknown[][], maxLength = Infinity, deltaIdx = 0, maxDelta = Infinity) { + if (!data[0]?.length) { + return 0; } const nlen = data[0].length; @@ -331,7 +520,7 @@ function circPush(data: number[][], newData: number[][], maxLength = Infinity, d } if (maxDelta !== Infinity && deltaIdx >= 0) { - const deltaLookup = data[deltaIdx]; + const deltaLookup = data[deltaIdx] as number[]; const low = deltaLookup[sliceIdx]; const high = deltaLookup[nlen - 1]; @@ -341,12 +530,6 @@ function circPush(data: number[][], newData: number[][], maxLength = Infinity, d } } - if (sliceIdx) { - for (let i = 0; i < data.length; i++) { - data[i] = data[i].slice(sliceIdx); - } - } - return sliceIdx; } diff --git a/public/app/features/live/data/utils.ts b/public/app/features/live/data/utils.ts new file mode 100644 index 00000000000..5ffdc6c6380 --- /dev/null +++ b/public/app/features/live/data/utils.ts @@ -0,0 +1,55 @@ +import { DataQueryResponseData, isDataFrame } from '@grafana/data'; +import { StreamingDataFrame } from './StreamingDataFrame'; + +/** + * @alpha -- experimental + */ +export enum StreamingResponseDataType { + NewValuesSameSchema = 'NewValuesSameSchema', + FullFrame = 'FullFrame', +} + +/** + * @alpha -- experimental + */ +export type StreamingResponseDataTypeToData = { + [StreamingResponseDataType.NewValuesSameSchema]: { + values: unknown[][]; + }; + [StreamingResponseDataType.FullFrame]: { + frame: ReturnType; + }; +}; + +/** + * @alpha -- experimental + */ +export type StreamingResponseData = T extends StreamingResponseDataType + ? { + type: T; + } & StreamingResponseDataTypeToData[T] + : never; + +/** + * @alpha -- experimental + */ +export const isStreamingResponseData = ( + responseData: DataQueryResponseData, + type: T +): responseData is StreamingResponseData => 'type' in responseData && responseData.type === type; + +const AllStreamingResponseDataTypes = Object.values(StreamingResponseDataType); + +/** + * @alpha -- experimental + */ +export const isAnyStreamingResponseData = ( + responseData: DataQueryResponseData +): responseData is StreamingResponseData => + 'type' in responseData && AllStreamingResponseDataTypes.includes(responseData.type); + +/** + * @alpha -- experimental + */ +export const isStreamingDataFrame = (data: DataQueryResponseData): data is StreamingDataFrame => + isDataFrame(data) && 'packetInfo' in data; diff --git a/public/app/features/live/live.test.ts b/public/app/features/live/live.test.ts new file mode 100644 index 00000000000..490b7f9aa56 --- /dev/null +++ b/public/app/features/live/live.test.ts @@ -0,0 +1,148 @@ +import { GrafanaLiveService } from './live'; +import { StreamingDataQueryResponse } from './centrifuge/service'; +import { Subject } from 'rxjs'; +import { DataQueryResponse, FieldType, LiveChannelScope } from '@grafana/data'; +import { StreamingDataFrame } from './data/StreamingDataFrame'; +import { StreamingResponseDataType } from './data/utils'; + +describe('GrafanaLiveService', () => { + const deps = { + backendSrv: {}, + centrifugeSrv: { + getDataStream: jest.fn(), + }, + }; + + const liveService = new GrafanaLiveService(deps as any); + + const liveDataStreamOptions = { + addr: { + scope: LiveChannelScope.Grafana, + namespace: ' abc', + path: 'abc', + }, + }; + + beforeEach(() => { + jest.clearAllMocks(); + }); + + it('should map response from Centrifuge Service to a streaming data frame', async () => { + const dummySubject = new Subject(); + deps.centrifugeSrv.getDataStream.mockReturnValueOnce(dummySubject); + + let response: DataQueryResponse | undefined; + liveService.getDataStream(liveDataStreamOptions).subscribe((next) => { + response = next; + }); + + dummySubject.next({ + data: [ + { + type: StreamingResponseDataType.FullFrame, + frame: StreamingDataFrame.empty().serialize(), + }, + ], + }); + + expect(response).not.toBeUndefined(); + expect(response?.data[0]).toBeInstanceOf(StreamingDataFrame); + }); + + it('should add partial streaming data to the buffer', async () => { + const dummySubject = new Subject(); + deps.centrifugeSrv.getDataStream.mockReturnValueOnce(dummySubject); + + let response: DataQueryResponse | undefined; + liveService.getDataStream(liveDataStreamOptions).subscribe((next) => { + response = next; + }); + + dummySubject.next({ + data: [ + { + type: StreamingResponseDataType.FullFrame, + frame: StreamingDataFrame.fromDataFrameJSON({ + schema: { + fields: [ + { name: 'time', type: FieldType.time }, + { name: 'a', type: FieldType.string }, + { name: 'b', type: FieldType.number }, + ], + }, + }).serialize(), + }, + ], + }); + dummySubject.next({ + data: [ + { + type: StreamingResponseDataType.NewValuesSameSchema, + values: [ + [100, 101], + ['a', 'b'], + [1, 2], + ], + }, + ], + }); + + expect(response).not.toBeUndefined(); + const frame: StreamingDataFrame = response?.data[0]; + expect(frame).toBeInstanceOf(StreamingDataFrame); + expect(frame.fields).toEqual([ + { + config: {}, + name: 'time', + type: FieldType.time, + values: { + buffer: [100, 101], + }, + }, + { + config: {}, + name: 'a', + type: FieldType.string, + values: { + buffer: ['a', 'b'], + }, + }, + { + config: {}, + name: 'b', + type: FieldType.number, + values: { + buffer: [1, 2], + }, + }, + ]); + }); + + it('should return an empty frame if first message was not a full frame', async () => { + const dummySubject = new Subject(); + deps.centrifugeSrv.getDataStream.mockReturnValueOnce(dummySubject); + + let response: DataQueryResponse | undefined; + liveService.getDataStream(liveDataStreamOptions).subscribe((next) => { + response = next; + }); + + dummySubject.next({ + data: [ + { + type: StreamingResponseDataType.NewValuesSameSchema, + values: [ + [100, 101], + ['a', 'b'], + [1, 2], + ], + }, + ], + }); + + expect(response).not.toBeUndefined(); + const frame: StreamingDataFrame = response?.data[0]; + expect(frame).toBeInstanceOf(StreamingDataFrame); + expect(frame.fields).toEqual([]); + }); +}); diff --git a/public/app/features/live/live.ts b/public/app/features/live/live.ts index f9fd1afdda1..77407ff10f6 100644 --- a/public/app/features/live/live.ts +++ b/public/app/features/live/live.ts @@ -1,14 +1,10 @@ -import { BackendSrv, GrafanaLiveSrv, LiveDataStreamOptions } from '@grafana/runtime'; -import { CentrifugeSrv } from './centrifuge/service'; +import { BackendSrv, GrafanaLiveSrv } from '@grafana/runtime'; +import { CentrifugeSrv, StreamingDataQueryResponse } from './centrifuge/service'; -import { Observable } from 'rxjs'; -import { - DataQueryResponse, - LiveChannelAddress, - LiveChannelEvent, - LiveChannelPresenceStatus, - toLiveChannelId, -} from '@grafana/data'; +import { toLiveChannelId } from '@grafana/data'; +import { StreamingDataFrame } from './data/StreamingDataFrame'; +import { isStreamingResponseData, StreamingResponseDataType } from './data/utils'; +import { map } from 'rxjs'; type GrafanaLiveServiceDeps = { centrifugeSrv: CentrifugeSrv; @@ -21,42 +17,71 @@ export class GrafanaLiveService implements GrafanaLiveSrv { /** * Listen for changes to the connection state */ - getConnectionState(): Observable { + getConnectionState = () => { return this.deps.centrifugeSrv.getConnectionState(); - } + }; /** * Connect to a channel and return results as DataFrames */ - getDataStream(options: LiveDataStreamOptions): Observable { - return this.deps.centrifugeSrv.getDataStream(options); - } + getDataStream: GrafanaLiveSrv['getDataStream'] = (options) => { + let buffer: StreamingDataFrame; + + const updateBuffer = (next: StreamingDataQueryResponse): void => { + const data = next.data[0]; + if (!buffer && !isStreamingResponseData(data, StreamingResponseDataType.FullFrame)) { + console.warn(`expected first packet to contain a full frame, received ${data?.type}`); + return; + } + + switch (data.type) { + case StreamingResponseDataType.FullFrame: { + buffer = StreamingDataFrame.deserialize(data.frame); + return; + } + case StreamingResponseDataType.NewValuesSameSchema: { + buffer.pushNewValues(data.values); + return; + } + } + }; + + return this.deps.centrifugeSrv.getDataStream(options).pipe( + map((next) => { + updateBuffer(next); + return { + ...next, + data: [buffer ?? StreamingDataFrame.empty()], + }; + }) + ); + }; /** * Watch for messages in a channel */ - getStream(address: LiveChannelAddress): Observable> { - return this.deps.centrifugeSrv.getStream(address); - } + getStream: GrafanaLiveSrv['getStream'] = (address) => { + return this.deps.centrifugeSrv.getStream(address); + }; /** * Publish into a channel * * @alpha -- experimental */ - async publish(address: LiveChannelAddress, data: any): Promise { + publish: GrafanaLiveSrv['publish'] = async (address, data) => { return this.deps.backendSrv.post(`api/live/publish`, { channel: toLiveChannelId(address), // orgId is from user data, }); - } + }; /** * For channels that support presence, this will request the current state from the server. * * Join and leave messages will be sent to the open stream */ - async getPresence(address: LiveChannelAddress): Promise { + getPresence: GrafanaLiveSrv['getPresence'] = (address) => { return this.deps.centrifugeSrv.getPresence(address); - } + }; } diff --git a/public/app/features/live/pages/RuleTest.tsx b/public/app/features/live/pages/RuleTest.tsx index 7a64bbe0c4b..d6494ccf82e 100644 --- a/public/app/features/live/pages/RuleTest.tsx +++ b/public/app/features/live/pages/RuleTest.tsx @@ -3,7 +3,7 @@ import { Button, CodeEditor, Table, useStyles, Field } from '@grafana/ui'; import { ChannelFrame, Rule } from './types'; import { getBackendSrv, config } from '@grafana/runtime'; import { css } from '@emotion/css'; -import { getDisplayProcessor, GrafanaTheme, StreamingDataFrame } from '@grafana/data'; +import { dataFrameFromJSON, getDisplayProcessor, GrafanaTheme } from '@grafana/data'; interface Props { rule: Rule; @@ -30,7 +30,7 @@ export const RuleTest: React.FC = (props) => { if (t) { setResponse( t.map((f) => { - const frame = new StreamingDataFrame(f.frame); + const frame = dataFrameFromJSON(f.frame); for (const field of frame.fields) { field.display = getDisplayProcessor({ field, theme: config.theme2 }); } diff --git a/public/app/features/query/state/PanelQueryRunner.ts b/public/app/features/query/state/PanelQueryRunner.ts index d6a0a22a81f..020f2a32d10 100644 --- a/public/app/features/query/state/PanelQueryRunner.ts +++ b/public/app/features/query/state/PanelQueryRunner.ts @@ -36,6 +36,8 @@ import { import { getDashboardQueryRunner } from './DashboardQueryRunner/DashboardQueryRunner'; import { mergePanelAndDashData } from './mergePanelAndDashData'; import { PanelModel } from '../../dashboard/state'; +import { isStreamingDataFrame } from 'app/features/live/data/utils'; +import { StreamingDataFrame } from 'app/features/live/data/StreamingDataFrame'; export interface QueryRunnerOptions< TQuery extends DataQuery = DataQuery, @@ -56,6 +58,7 @@ export interface QueryRunnerOptions< } let counter = 100; + export function getNextRequestId() { return 'Q' + counter++; } @@ -83,11 +86,8 @@ export class PanelQueryRunner { const { withFieldConfig, withTransforms } = options; let structureRev = 1; let lastData: DataFrame[] = []; - let processedCount = 0; + let isFirstPacket = true; let lastConfigRev = -1; - const fastCompare = (a: DataFrame, b: DataFrame) => { - return compareDataFrameStructures(a, b, true); - }; if (this.dataConfigSource.snapshotData) { const snapshotPanelData: PanelData = { @@ -102,24 +102,21 @@ export class PanelQueryRunner { this.getTransformationsStream(withTransforms), map((data: PanelData) => { let processedData = data; - let sameStructure = false; + let streamingPacketWithSameSchema = false; if (withFieldConfig && data.series?.length) { - // Apply field defaults and overrides - let fieldConfig = this.dataConfigSource.getFieldOverrideOptions(); - let processFields = fieldConfig != null; + if (lastConfigRev === this.dataConfigSource.configRev) { + const streamingDataFrame = data.series.find((data) => isStreamingDataFrame(data)) as + | StreamingDataFrame + | undefined; - // If the shape is the same, we can skip field overrides - if ( - data.state === LoadingState.Streaming && - processFields && - processedCount > 0 && - lastData.length && - lastConfigRev === this.dataConfigSource.configRev - ) { - const sameTypes = compareArrayValues(lastData, processedData.series, fastCompare); - if (sameTypes) { - // Keep the previous field config settings + if ( + streamingDataFrame && + !streamingDataFrame.packetInfo.schemaChanged && + // TODO: remove the condition below after fixing + // https://github.com/grafana/grafana/pull/41492#issuecomment-970281430 + lastData[0].fields.length === streamingDataFrame.fields.length + ) { processedData = { ...processedData, series: lastData.map((frame, frameIndex) => ({ @@ -131,20 +128,21 @@ export class PanelQueryRunner { state: { ...field.state, calcs: undefined, - // add global range calculation here? (not optimal for streaming) range: undefined, }, })), })), }; - processFields = false; - sameStructure = true; + + streamingPacketWithSameSchema = true; } } - if (processFields) { + // Apply field defaults and overrides + let fieldConfig = this.dataConfigSource.getFieldOverrideOptions(); + + if (fieldConfig != null && (isFirstPacket || !streamingPacketWithSameSchema)) { lastConfigRev = this.dataConfigSource.configRev!; - processedCount++; // results with data processedData = { ...processedData, series: applyFieldOverrides({ @@ -153,16 +151,16 @@ export class PanelQueryRunner { ...fieldConfig!, }), }; + isFirstPacket = false; } } - if (!sameStructure) { - sameStructure = compareArrayValues(lastData, processedData.series, compareDataFrameStructures); - } - if (!sameStructure) { + if ( + !streamingPacketWithSameSchema && + !compareArrayValues(lastData, processedData.series, compareDataFrameStructures) + ) { structureRev++; } - lastData = processedData.series; return { ...processedData, structureRev }; diff --git a/public/app/features/query/state/runRequest.ts b/public/app/features/query/state/runRequest.ts index 6f47d4acb95..1569a56735a 100644 --- a/public/app/features/query/state/runRequest.ts +++ b/public/app/features/query/state/runRequest.ts @@ -181,6 +181,19 @@ export function callQueryMethod( return from(returnVal); } +function getProcessedDataFrame(data: DataQueryResponseData): DataFrame { + const dataFrame = guessFieldTypes(toDataFrame(data)); + + if (dataFrame.fields && dataFrame.fields.length) { + // clear out the cached info + for (const field of dataFrame.fields) { + field.state = null; + } + } + + return dataFrame; +} + /** * All panels will be passed tables that have our best guess at column type set * @@ -191,22 +204,7 @@ export function getProcessedDataFrames(results?: DataQueryResponseData[]): DataF return []; } - const dataFrames: DataFrame[] = []; - - for (const result of results) { - const dataFrame = guessFieldTypes(toDataFrame(result)); - - if (dataFrame.fields && dataFrame.fields.length) { - // clear out the cached info - for (const field of dataFrame.fields) { - field.state = null; - } - } - - dataFrames.push(dataFrame); - } - - return dataFrames; + return results.map((data) => getProcessedDataFrame(data)); } export function preProcessPanelData(data: PanelData, lastResult?: PanelData): PanelData { @@ -227,7 +225,7 @@ export function preProcessPanelData(data: PanelData, lastResult?: PanelData): Pa // Make sure the data frames are properly formatted const STARTTIME = performance.now(); - const processedDataFrames = getProcessedDataFrames(series); + const processedDataFrames = series.map((data) => getProcessedDataFrame(data)); const annotationsProcessed = getProcessedDataFrames(annotations); const STOPTIME = performance.now(); diff --git a/public/app/plugins/datasource/grafana/datasource.ts b/public/app/plugins/datasource/grafana/datasource.ts index feff2953581..f7ba1a33289 100644 --- a/public/app/plugins/datasource/grafana/datasource.ts +++ b/public/app/plugins/datasource/grafana/datasource.ts @@ -1,5 +1,11 @@ import { from, merge, Observable, of } from 'rxjs'; -import { DataSourceWithBackend, getBackendSrv, getGrafanaLiveSrv, getTemplateSrv } from '@grafana/runtime'; +import { + DataSourceWithBackend, + getBackendSrv, + getGrafanaLiveSrv, + getTemplateSrv, + StreamingFrameOptions, +} from '@grafana/runtime'; import { AnnotationQuery, AnnotationQueryRequest, @@ -10,7 +16,6 @@ import { DataSourceRef, isValidLiveChannelAddress, parseLiveChannelAddress, - StreamingFrameOptions, toDataFrame, } from '@grafana/data'; @@ -88,7 +93,7 @@ export class GrafanaDatasource extends DataSourceWithBackend { if (!isValidLiveChannelAddress(addr)) { continue; } - const buffer: StreamingFrameOptions = { + const buffer: Partial = { maxLength: request.maxDataPoints ?? 500, }; if (target.buffer) { diff --git a/public/app/plugins/datasource/testdata/runStreams.ts b/public/app/plugins/datasource/testdata/runStreams.ts index e7a2bf3f316..943406fe74c 100644 --- a/public/app/plugins/datasource/testdata/runStreams.ts +++ b/public/app/plugins/datasource/testdata/runStreams.ts @@ -9,7 +9,6 @@ import { CSVReader, Field, LoadingState, - StreamingDataFrame, DataFrameSchema, DataFrameData, } from '@grafana/data'; @@ -17,6 +16,7 @@ import { import { TestDataQuery, StreamingQuery } from './types'; import { getRandomLine } from './LogIpsum'; import { liveTimer } from 'app/features/dashboard/dashgrid/liveTimer'; +import { StreamingDataFrame } from 'app/features/live/data/StreamingDataFrame'; export const defaultStreamQuery: StreamingQuery = { type: 'signal', @@ -65,7 +65,7 @@ export function runSignalStream( schema.fields.push({ name: 'Max' + suffix, type: FieldType.number }); } - const frame = new StreamingDataFrame({ schema }, { maxLength: maxDataPoints }); + const frame = StreamingDataFrame.fromDataFrameJSON({ schema }, { maxLength: maxDataPoints }); let value = Math.random() * 100; let timeoutId: any = null; diff --git a/public/app/plugins/panel/live/LivePanel.tsx b/public/app/plugins/panel/live/LivePanel.tsx index 33ec45ca85e..b84332f7729 100755 --- a/public/app/plugins/panel/live/LivePanel.tsx +++ b/public/app/plugins/panel/live/LivePanel.tsx @@ -13,7 +13,6 @@ import { PanelData, LoadingState, applyFieldOverrides, - StreamingDataFrame, LiveChannelAddress, } from '@grafana/data'; import { TablePanel } from '../table/TablePanel'; @@ -21,6 +20,7 @@ import { LivePanelOptions, MessageDisplayMode } from './types'; import { config, getGrafanaLiveSrv } from '@grafana/runtime'; import { css, cx } from '@emotion/css'; import { isEqual } from 'lodash'; +import { StreamingDataFrame } from 'app/features/live/data/StreamingDataFrame'; interface Props extends PanelProps {} diff --git a/public/app/plugins/panel/state-timeline/StateTimelinePanel.tsx b/public/app/plugins/panel/state-timeline/StateTimelinePanel.tsx index 6c2fb9100e9..f998bbaf660 100755 --- a/public/app/plugins/panel/state-timeline/StateTimelinePanel.tsx +++ b/public/app/plugins/panel/state-timeline/StateTimelinePanel.tsx @@ -5,7 +5,7 @@ import { TimelineMode, TimelineOptions } from './types'; import { TimelineChart } from './TimelineChart'; import { prepareTimelineFields, prepareTimelineLegendItems } from './utils'; import { StateTimelineTooltip } from './StateTimelineTooltip'; -import { getLastStreamingDataFramePacket } from '@grafana/data/src/dataframe/StreamingDataFrame'; +import { getLastStreamingDataFramePacket } from 'app/features/live/data/StreamingDataFrame'; interface TimelinePanelProps extends PanelProps {}