Live: keep stream history on 'refresh' (#41492)

Co-authored-by: Ryan McKinley <ryantxu@gmail.com>
Artur Wierzbicki 2021-12-15 23:01:55 +04:00 committed by GitHub
parent bb3b5c10e7
commit 84ae13fe5d
21 changed files with 2060 additions and 291 deletions

View File

@ -6,6 +6,5 @@ export * from './processDataFrame';
export * from './dimensions';
export * from './ArrayDataFrame';
export * from './DataFrameJSON';
export { StreamingDataFrame, StreamingFrameOptions, StreamingFrameAction } from './StreamingDataFrame';
export * from './frameComparisons';
export { anySeriesWithTimeField } from './utils';

View File

@ -470,7 +470,12 @@ export function getDataFrameRow(data: DataFrame, row: number): any[] {
* Returns a copy that does not include functions
*/
export function toDataFrameDTO(data: DataFrame): DataFrameDTO {
const fields: FieldDTO[] = data.fields.map((f) => {
return toFilteredDataFrameDTO(data);
}
export function toFilteredDataFrameDTO(data: DataFrame, fieldPredicate?: (f: Field) => boolean): DataFrameDTO {
const filteredFields = fieldPredicate ? data.fields.filter(fieldPredicate) : data.fields;
const fields: FieldDTO[] = filteredFields.map((f) => {
let values = f.values.toArray();
// The byte buffers serialize like objects
if (values instanceof Float64Array) {

View File

@ -125,6 +125,11 @@ export interface LiveChannelPresenceStatus {
users: any; // @alpha -- experimental -- will be filled in when we improve the UI
}
/**
* @alpha -- experimental
*/
export type LiveChannelId = string;
/**
* @alpha -- experimental
*/
@ -174,7 +179,7 @@ export function isValidLiveChannelAddress(addr?: LiveChannelAddress): addr is Li
*
* @alpha -- experimental
*/
export function toLiveChannelId(addr: LiveChannelAddress): string {
export function toLiveChannelId(addr: LiveChannelAddress): LiveChannelId {
if (!addr.scope) {
return '';
}
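
A quick illustration of what these ids look like in practice; a sketch that assumes the id is the slash-joined scope/namespace/path (only the empty-scope guard is visible in this hunk):

import { LiveChannelScope, toLiveChannelId } from '@grafana/data';

const id = toLiveChannelId({
  scope: LiveChannelScope.Grafana, // 'grafana'
  namespace: 'stream',
  path: 'abc',
});
// id === 'grafana/stream/abc' (assumed format); '' when the scope is missing, as shown above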

View File

@ -4,7 +4,6 @@ import {
LiveChannelAddress,
LiveChannelEvent,
LiveChannelPresenceStatus,
StreamingFrameOptions,
} from '@grafana/data';
import { Observable } from 'rxjs';
@ -15,6 +14,25 @@ export interface LiveDataFilter {
fields?: string[];
}
/**
* Indicates whether the frame is appended to or replaced
*
* @alpha
*/
export enum StreamingFrameAction {
Append = 'append',
Replace = 'replace',
}
/**
* @alpha
*/
export interface StreamingFrameOptions {
maxLength: number; // maximum number of rows to keep (default: 1000)
maxDelta: number; // how long to keep values, in ms, measured against the time field (default: Infinity)
action: StreamingFrameAction; // defaults to StreamingFrameAction.Append
}
/**
* @alpha
*/
@ -22,7 +40,7 @@ export interface LiveDataStreamOptions {
addr: LiveChannelAddress;
frame?: DataFrame; // initial results
key?: string;
buffer?: StreamingFrameOptions;
buffer?: Partial<StreamingFrameOptions>;
filter?: LiveDataFilter;
}
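
With `buffer` now a `Partial<StreamingFrameOptions>`, callers set only the fields they care about; the rest falls back to the defaults (`maxLength: 1000`, `maxDelta: Infinity`, `action: Append`) that `getStreamingFrameOptions` fills in later in this commit. A minimal sketch of that merge:

import { StreamingFrameAction, StreamingFrameOptions } from '@grafana/runtime';

// caller only cares about history length
const buffer: Partial<StreamingFrameOptions> = { maxLength: 200 };

// effective options after applying defaults (mirrors what getStreamingFrameOptions does)
const effective: StreamingFrameOptions = {
  maxLength: buffer.maxLength ?? 1000,
  maxDelta: buffer.maxDelta ?? Infinity,
  action: buffer.action ?? StreamingFrameAction.Append,
};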

View File

@ -9,14 +9,18 @@ import {
makeClassES5Compatible,
DataFrame,
parseLiveChannelAddress,
StreamingFrameOptions,
StreamingFrameAction,
getDataSourceRef,
DataSourceRef,
} from '@grafana/data';
import { merge, Observable, of } from 'rxjs';
import { catchError, switchMap } from 'rxjs/operators';
import { getBackendSrv, getDataSourceSrv, getGrafanaLiveSrv } from '../services';
import {
getBackendSrv,
getDataSourceSrv,
getGrafanaLiveSrv,
StreamingFrameOptions,
StreamingFrameAction,
} from '../services';
import { BackendDataSourceResponse, toDataQueryResponse } from './queryResponse';
/**
@ -250,7 +254,7 @@ class DataSourceWithBackend<
export function toStreamingDataResponse<TQuery extends DataQuery = DataQuery>(
rsp: DataQueryResponse,
req: DataQueryRequest<TQuery>,
getter: (req: DataQueryRequest<TQuery>, frame: DataFrame) => StreamingFrameOptions
getter: (req: DataQueryRequest<TQuery>, frame: DataFrame) => Partial<StreamingFrameOptions>
): Observable<DataQueryResponse> {
const live = getGrafanaLiveSrv();
if (!live) {
@ -291,22 +295,22 @@ export function toStreamingDataResponse<TQuery extends DataQuery = DataQuery>(
export type StreamOptionsProvider<TQuery extends DataQuery = DataQuery> = (
request: DataQueryRequest<TQuery>,
frame: DataFrame
) => StreamingFrameOptions;
) => Partial<StreamingFrameOptions>;
/**
* @public
*/
export const standardStreamOptionsProvider: StreamOptionsProvider = (request: DataQueryRequest, frame: DataFrame) => {
const buffer: StreamingFrameOptions = {
const opts: Partial<StreamingFrameOptions> = {
maxLength: request.maxDataPoints ?? 500,
action: StreamingFrameAction.Append,
};
// For recent queries, clamp to the current time range
if (request.rangeRaw?.to === 'now') {
buffer.maxDelta = request.range.to.valueOf() - request.range.from.valueOf();
opts.maxDelta = request.range.to.valueOf() - request.range.from.valueOf();
}
return buffer;
return opts;
};
//@ts-ignore
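
A worked example of the clamp above, using illustrative request values: a "last 15 minutes" query ending at `now` yields a buffer that never holds more history than the visible window.

const from = Date.parse('2021-12-15T10:45:00Z');
const to = Date.parse('2021-12-15T11:00:00Z'); // rangeRaw.to === 'now'

const buffer = {
  maxLength: 300,            // request.maxDataPoints
  action: 'append' as const, // StreamingFrameAction.Append
  maxDelta: to - from,       // 900000 ms: keep only the visible 15 minutes of history
};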

View File

@ -0,0 +1,777 @@
import {
DataFrameJSON,
DataQueryResponse,
FieldType,
LiveChannelAddress,
LiveChannelConnectionState,
LiveChannelEvent,
LiveChannelEventType,
LiveChannelLeaveEvent,
LiveChannelScope,
LoadingState,
} from '@grafana/data';
import { Observable, Subject, Subscription, Unsubscribable } from 'rxjs';
import { DataStreamHandlerDeps, LiveDataStream } from './LiveDataStream';
import { mapValues } from 'lodash';
import { StreamingFrameAction } from '@grafana/runtime';
import { isStreamingResponseData, StreamingResponseData, StreamingResponseDataType } from '../data/utils';
import { StreamingDataFrame } from '../data/StreamingDataFrame';
type SubjectsInsteadOfObservables<T> = {
[key in keyof T]: T[key] extends Observable<infer U> ? Subject<U> : T[key];
};
type DepsWithSubjectsInsteadOfObservables<T = any> = SubjectsInsteadOfObservables<DataStreamHandlerDeps<T>>;
const createDeps = <T = any>(
overrides?: Partial<DepsWithSubjectsInsteadOfObservables<T>>
): DepsWithSubjectsInsteadOfObservables<T> => {
return {
channelId: 'channel-1',
liveEventsObservable: new Subject(),
onShutdown: jest.fn(),
subscriberReadiness: new Subject(),
defaultStreamingFrameOptions: { maxLength: 100, maxDelta: Infinity, action: StreamingFrameAction.Append },
shutdownDelayInMs: 1000,
...(overrides ?? {}),
};
};
class ValuesCollection<T> implements Unsubscribable {
values: T[] = [];
errors: any[] = [];
receivedComplete = false;
subscription: Subscription | undefined;
valuesCount = () => this.values.length;
subscribeTo = (obs: Observable<T>) => {
if (this.subscription) {
throw new Error(`can't subscribe twice!`);
}
this.subscription = obs.subscribe({
next: (n) => {
this.values.push(n);
},
error: (err) => {
this.errors.push(err);
},
complete: () => {
this.receivedComplete = true;
},
});
};
get complete() {
return this.receivedComplete || this.subscription?.closed;
}
unsubscribe = () => {
this.subscription?.unsubscribe();
};
lastValue = () => {
if (!this.values.length) {
throw new Error(`no values available in ${JSON.stringify(this)}`);
}
return this.values[this.values.length - 1];
};
lastError = () => {
if (!this.errors.length) {
throw new Error(`no errors available in ${JSON.stringify(this)}`);
}
return this.errors[this.errors.length - 1];
};
}
const liveChannelMessageEvent = <T extends DataFrameJSON>(message: T): LiveChannelEvent<T> => ({
type: LiveChannelEventType.Message,
message,
});
const liveChannelLeaveEvent = (): LiveChannelLeaveEvent => ({
type: LiveChannelEventType.Leave,
user: '',
});
const liveChannelStatusEvent = (state: LiveChannelConnectionState, error?: Error): LiveChannelEvent => ({
type: LiveChannelEventType.Status,
state,
error,
id: '',
timestamp: 1,
});
const fieldsOf = (data: StreamingResponseData<StreamingResponseDataType.FullFrame>) => {
return data.frame.fields.map((f) => ({
name: f.name,
values: f.values,
}));
};
const dummyErrorMessage = 'dummy-error';
describe('LiveDataStream', () => {
jest.useFakeTimers();
const expectValueCollectionState = <T>(
valuesCollection: ValuesCollection<T>,
state: { errors: number; values: number; complete: boolean }
) => {
expect(valuesCollection.values).toHaveLength(state.values);
expect(valuesCollection.errors).toHaveLength(state.errors);
expect(valuesCollection.complete).toEqual(state.complete);
};
const expectResponse = <T extends StreamingResponseDataType>(state: LoadingState) => (
res: DataQueryResponse,
streamingDataType: T
) => {
expect(res.state).toEqual(state);
expect(res.data).toHaveLength(1);
const firstData = res.data[0];
expect(isStreamingResponseData(firstData, streamingDataType)).toEqual(true);
};
const expectStreamingResponse = expectResponse(LoadingState.Streaming);
const expectErrorResponse = expectResponse(LoadingState.Error);
const dummyLiveChannelAddress: LiveChannelAddress = {
scope: LiveChannelScope.Grafana,
namespace: 'stream',
path: 'abc',
};
const subscriptionKey = 'subKey';
const liveDataStreamOptions = {
withTimeBFilter: {
addr: dummyLiveChannelAddress,
buffer: {
maxLength: 2,
maxDelta: 10,
action: StreamingFrameAction.Append,
},
filter: {
fields: ['time', 'b'],
},
},
withTimeAFilter: {
addr: dummyLiveChannelAddress,
buffer: {
maxLength: 3,
maxDelta: 10,
action: StreamingFrameAction.Append,
},
filter: {
fields: ['time', 'a'],
},
},
withoutFilter: {
addr: dummyLiveChannelAddress,
buffer: {
maxLength: 4,
maxDelta: 10,
action: StreamingFrameAction.Append,
},
},
};
const dataFrameJsons = {
schema1: () => ({
schema: {
fields: [
{ name: 'time', type: FieldType.time },
{ name: 'a', type: FieldType.string },
{ name: 'b', type: FieldType.number },
],
},
data: {
values: [
[100, 101],
['a', 'b'],
[1, 2],
],
},
}),
schema1newValues: () => ({
data: {
values: [[102], ['c'], [3]],
},
}),
schema2: () => ({
schema: {
fields: [
{ name: 'time', type: FieldType.time },
{ name: 'a', type: FieldType.string },
{ name: 'b', type: FieldType.string },
],
},
data: {
values: [[103], ['x'], ['y']],
},
}),
schema2newValues: () => ({
data: {
values: [[104], ['w'], ['o']],
},
}),
};
describe('happy path with a single subscriber', () => {
let deps: ReturnType<typeof createDeps>;
let liveDataStream: LiveDataStream<any>;
const valuesCollection = new ValuesCollection<DataQueryResponse>();
beforeAll(() => {
deps = createDeps();
expect(deps.liveEventsObservable.observed).toBeFalsy();
expect(deps.subscriberReadiness.observed).toBeFalsy();
liveDataStream = new LiveDataStream(deps);
});
it('should subscribe to live events observable immediately after creation', async () => {
expect(deps.liveEventsObservable.observed).toBeTruthy();
});
it('should not subscribe to subscriberReadiness observable until first subscription', async () => {
expect(deps.subscriberReadiness.observed).toBeFalsy();
});
it('should subscribe to subscriberReadiness observable on first subscription and return observable without any values', async () => {
const observable = liveDataStream.get(liveDataStreamOptions.withTimeBFilter, subscriptionKey);
valuesCollection.subscribeTo(observable);
//then
expect(deps.subscriberReadiness.observed).toBeTruthy();
expectValueCollectionState(valuesCollection, { errors: 0, values: 0, complete: false });
});
it('should emit the first live channel message event as a serialized streamingDataFrame', async () => {
const valuesCount = valuesCollection.valuesCount();
deps.liveEventsObservable.next(liveChannelMessageEvent(dataFrameJsons.schema1()));
expectValueCollectionState(valuesCollection, { errors: 0, values: valuesCount + 1, complete: false });
const response = valuesCollection.lastValue();
expectStreamingResponse(response, StreamingResponseDataType.FullFrame);
const data = response.data[0] as StreamingResponseData<StreamingResponseDataType.FullFrame>;
expect(data.frame.options).toEqual(liveDataStreamOptions.withTimeBFilter.buffer);
const deserializedFrame = StreamingDataFrame.deserialize(data.frame);
expect(deserializedFrame.fields).toEqual([
{
config: {},
name: 'time',
type: 'time',
values: {
buffer: [100, 101],
},
},
{
config: {},
name: 'b',
type: 'number',
values: {
buffer: [1, 2],
},
},
]);
expect(deserializedFrame.length).toEqual(dataFrameJsons.schema1().data.values[0].length);
});
it('should emit subsequent messages as deltas if the schema stays the same', async () => {
const valuesCount = valuesCollection.valuesCount();
deps.liveEventsObservable.next(liveChannelMessageEvent(dataFrameJsons.schema1newValues()));
expectValueCollectionState(valuesCollection, { errors: 0, values: valuesCount + 1, complete: false });
const response = valuesCollection.lastValue();
expectStreamingResponse(response, StreamingResponseDataType.NewValuesSameSchema);
const data = response.data[0] as StreamingResponseData<StreamingResponseDataType.NewValuesSameSchema>;
expect(data.values).toEqual([[102], [3]]);
});
it('should emit a full frame if schema changes', async () => {
const valuesCount = valuesCollection.valuesCount();
deps.liveEventsObservable.next(liveChannelMessageEvent(dataFrameJsons.schema2()));
expectValueCollectionState(valuesCollection, { errors: 0, values: valuesCount + 1, complete: false });
const response = valuesCollection.lastValue();
expectStreamingResponse(response, StreamingResponseDataType.FullFrame);
const data = response.data[0] as StreamingResponseData<StreamingResponseDataType.FullFrame>;
expect(fieldsOf(data)).toEqual([
{
name: 'time',
values: [102, 103],
},
{
name: 'b',
values: [undefined, 'y'], // bug in streamingDataFrame - fix!
},
]);
});
it('should emit a full frame if received a status live channel event with error', async () => {
const valuesCount = valuesCollection.valuesCount();
const error = new Error(`oh no!`);
deps.liveEventsObservable.next(liveChannelStatusEvent(LiveChannelConnectionState.Connected, error));
expectValueCollectionState(valuesCollection, {
errors: 0,
values: valuesCount + 1,
complete: false,
});
const response = valuesCollection.lastValue();
expectErrorResponse(response, StreamingResponseDataType.FullFrame);
});
it('should buffer new values until subscriber is ready', async () => {
const valuesCount = valuesCollection.valuesCount();
deps.subscriberReadiness.next(false);
deps.liveEventsObservable.next(liveChannelMessageEvent(dataFrameJsons.schema2newValues()));
expectValueCollectionState(valuesCollection, { errors: 0, values: valuesCount, complete: false });
deps.liveEventsObservable.next(liveChannelMessageEvent(dataFrameJsons.schema2newValues()));
expectValueCollectionState(valuesCollection, { errors: 0, values: valuesCount, complete: false });
deps.liveEventsObservable.next(liveChannelMessageEvent(dataFrameJsons.schema2newValues()));
expectValueCollectionState(valuesCollection, { errors: 0, values: valuesCount, complete: false });
deps.subscriberReadiness.next(true);
expectValueCollectionState(valuesCollection, { errors: 0, values: valuesCount + 1, complete: false });
const response = valuesCollection.lastValue();
expectStreamingResponse(response, StreamingResponseDataType.NewValuesSameSchema);
const data = response.data[0] as StreamingResponseData<StreamingResponseDataType.NewValuesSameSchema>;
expect(data.values).toEqual([
[104, 104, 104],
['o', 'o', 'o'],
]);
});
it(`should reduce buffer to a full frame if schema changed at any point during subscriber's unavailability`, async () => {
const valuesCount = valuesCollection.valuesCount();
deps.subscriberReadiness.next(false);
deps.liveEventsObservable.next(liveChannelMessageEvent(dataFrameJsons.schema2newValues()));
expectValueCollectionState(valuesCollection, { errors: 0, values: valuesCount, complete: false });
deps.liveEventsObservable.next(liveChannelMessageEvent(dataFrameJsons.schema2newValues()));
expectValueCollectionState(valuesCollection, { errors: 0, values: valuesCount, complete: false });
deps.liveEventsObservable.next(liveChannelMessageEvent(dataFrameJsons.schema1()));
expectValueCollectionState(valuesCollection, { errors: 0, values: valuesCount, complete: false });
deps.liveEventsObservable.next(liveChannelMessageEvent(dataFrameJsons.schema1newValues()));
expectValueCollectionState(valuesCollection, { errors: 0, values: valuesCount, complete: false });
deps.subscriberReadiness.next(true);
expectValueCollectionState(valuesCollection, { errors: 0, values: valuesCount + 1, complete: false });
const response = valuesCollection.lastValue();
expectStreamingResponse(response, StreamingResponseDataType.FullFrame);
expect(fieldsOf(response.data[0])).toEqual([
{
name: 'time',
values: [101, 102],
},
{
name: 'b',
values: [2, 3],
},
]);
});
it(`should reduce buffer to a full frame with last error if one or more errors occur during subscriber's unavailability`, async () => {
const firstError = new Error('first error');
const secondError = new Error(dummyErrorMessage);
const valuesCount = valuesCollection.valuesCount();
deps.subscriberReadiness.next(false);
deps.liveEventsObservable.next(liveChannelMessageEvent(dataFrameJsons.schema1newValues()));
deps.liveEventsObservable.next(liveChannelMessageEvent(dataFrameJsons.schema1newValues()));
deps.liveEventsObservable.next(liveChannelStatusEvent(LiveChannelConnectionState.Connected, firstError));
deps.liveEventsObservable.next(liveChannelMessageEvent(dataFrameJsons.schema1newValues()));
deps.liveEventsObservable.next(liveChannelStatusEvent(LiveChannelConnectionState.Connected, secondError));
deps.subscriberReadiness.next(true);
expectValueCollectionState(valuesCollection, { errors: 0, values: valuesCount + 1, complete: false });
const response = valuesCollection.lastValue();
expectErrorResponse(response, StreamingResponseDataType.FullFrame);
const errorMessage = response?.error?.message;
expect(errorMessage?.includes(dummyErrorMessage)).toBeTruthy();
expect(fieldsOf(response.data[0])).toEqual([
{
name: 'time',
values: [102, 102],
},
{
name: 'b',
values: [3, 3],
},
]);
});
it('should ignore messages without payload', async () => {
const valuesCount = valuesCollection.valuesCount();
deps.liveEventsObservable.next(liveChannelStatusEvent(LiveChannelConnectionState.Connected));
deps.liveEventsObservable.next(liveChannelStatusEvent(LiveChannelConnectionState.Pending));
deps.liveEventsObservable.next(liveChannelStatusEvent(LiveChannelConnectionState.Pending));
deps.liveEventsObservable.next(liveChannelLeaveEvent());
expectValueCollectionState(valuesCollection, { errors: 0, values: valuesCount, complete: false });
});
it(`should shutdown when source observable completes`, async () => {
expect(deps.onShutdown).not.toHaveBeenCalled();
expect(deps.subscriberReadiness.observed).toBeTruthy();
expect(deps.liveEventsObservable.observed).toBeTruthy();
deps.liveEventsObservable.complete();
expectValueCollectionState(valuesCollection, {
errors: 0,
values: valuesCollection.valuesCount(),
complete: true,
});
expect(deps.subscriberReadiness.observed).toBeFalsy();
expect(deps.liveEventsObservable.observed).toBeFalsy();
expect(deps.onShutdown).toHaveBeenCalled();
});
});
describe('single subscriber with initial frame', () => {
it('should emit the initial frame right after subscribe', async () => {
const deps = createDeps();
const liveDataStream = new LiveDataStream(deps);
const valuesCollection = new ValuesCollection<DataQueryResponse>();
const initialFrame = StreamingDataFrame.fromDataFrameJSON(dataFrameJsons.schema2());
const observable = liveDataStream.get(
{ ...liveDataStreamOptions.withTimeBFilter, frame: initialFrame },
subscriptionKey
);
valuesCollection.subscribeTo(observable);
//then
expect(deps.subscriberReadiness.observed).toBeTruthy();
expectValueCollectionState(valuesCollection, { errors: 0, values: 1, complete: false });
const response = valuesCollection.lastValue();
expectStreamingResponse(response, StreamingResponseDataType.FullFrame);
const data = response.data[0] as StreamingResponseData<StreamingResponseDataType.FullFrame>;
expect(fieldsOf(data)).toEqual([
{
name: 'time',
values: [103],
},
{
name: 'b',
values: ['y'], // bug in streamingDataFrame - fix!
},
]);
});
});
describe('two subscribers with initial frames', () => {
it('should ignore initial frame from second subscriber', async () => {
const deps = createDeps();
const liveDataStream = new LiveDataStream(deps);
const valuesCollection = new ValuesCollection<DataQueryResponse>();
const valuesCollection2 = new ValuesCollection<DataQueryResponse>();
valuesCollection.subscribeTo(
liveDataStream.get(
{
...liveDataStreamOptions.withTimeBFilter,
frame: StreamingDataFrame.fromDataFrameJSON(dataFrameJsons.schema1()),
},
subscriptionKey
)
);
expectValueCollectionState(valuesCollection, { errors: 0, values: 1, complete: false });
valuesCollection2.subscribeTo(
liveDataStream.get(
{
...liveDataStreamOptions.withTimeBFilter,
frame: StreamingDataFrame.fromDataFrameJSON(dataFrameJsons.schema2()),
},
subscriptionKey
)
);
// no extra emits for initial subscriber
expectValueCollectionState(valuesCollection, { errors: 0, values: 1, complete: false });
expectValueCollectionState(valuesCollection2, { errors: 0, values: 1, complete: false });
const frame1 = fieldsOf(valuesCollection.lastValue().data[0]);
const frame2 = fieldsOf(valuesCollection2.lastValue().data[0]);
expect(frame1).toEqual(frame2);
});
});
describe('source observable emits completed event', () => {
it('should shutdown', async () => {
const deps = createDeps();
const liveDataStream = new LiveDataStream(deps);
const valuesCollection = new ValuesCollection<DataQueryResponse>();
const observable = liveDataStream.get(liveDataStreamOptions.withTimeAFilter, subscriptionKey);
valuesCollection.subscribeTo(observable);
expect(deps.subscriberReadiness.observed).toBeTruthy();
expect(deps.liveEventsObservable.observed).toBeTruthy();
expect(deps.onShutdown).not.toHaveBeenCalled();
deps.liveEventsObservable.complete();
expectValueCollectionState(valuesCollection, {
errors: 0,
values: 0,
complete: true,
});
expect(deps.subscriberReadiness.observed).toBeFalsy();
expect(deps.liveEventsObservable.observed).toBeFalsy();
expect(deps.onShutdown).toHaveBeenCalled();
});
});
describe('source observable emits error event', () => {
it('should shutdown', async () => {
const deps = createDeps();
const liveDataStream = new LiveDataStream(deps);
const valuesCollection = new ValuesCollection<DataQueryResponse>();
const observable = liveDataStream.get(liveDataStreamOptions.withTimeAFilter, subscriptionKey);
valuesCollection.subscribeTo(observable);
expect(deps.subscriberReadiness.observed).toBeTruthy();
expect(deps.liveEventsObservable.observed).toBeTruthy();
expect(deps.onShutdown).not.toHaveBeenCalled();
deps.liveEventsObservable.error(new Error(dummyErrorMessage));
expectValueCollectionState(valuesCollection, {
errors: 0,
values: 1,
complete: true,
});
const response = valuesCollection.lastValue();
expectErrorResponse(response, StreamingResponseDataType.FullFrame);
expect(response?.error?.message?.includes(dummyErrorMessage)).toBeTruthy();
expect(deps.subscriberReadiness.observed).toBeFalsy();
expect(deps.liveEventsObservable.observed).toBeFalsy();
expect(deps.onShutdown).toHaveBeenCalled();
});
});
describe('happy path with multiple subscribers', () => {
let deps: ReturnType<typeof createDeps>;
let liveDataStream: LiveDataStream<any>;
const valuesCollections = {
withTimeBFilter: new ValuesCollection<DataQueryResponse>(),
withTimeAFilter: new ValuesCollection<DataQueryResponse>(),
withoutFilter: new ValuesCollection<DataQueryResponse>(),
};
beforeAll(() => {
deps = createDeps();
liveDataStream = new LiveDataStream(deps);
});
it('should emit the last value as full frame to new subscribers', async () => {
valuesCollections.withTimeAFilter.subscribeTo(
liveDataStream.get(liveDataStreamOptions.withTimeAFilter, subscriptionKey)
);
deps.liveEventsObservable.next(liveChannelMessageEvent(dataFrameJsons.schema1()));
deps.liveEventsObservable.next(liveChannelMessageEvent(dataFrameJsons.schema1newValues()));
expectValueCollectionState(valuesCollections.withTimeAFilter, { errors: 0, values: 2, complete: false });
valuesCollections.withTimeBFilter.subscribeTo(
liveDataStream.get(liveDataStreamOptions.withTimeBFilter, subscriptionKey)
);
valuesCollections.withoutFilter.subscribeTo(
liveDataStream.get(liveDataStreamOptions.withoutFilter, subscriptionKey)
);
console.log(JSON.stringify(valuesCollections.withTimeAFilter, null, 2));
expectValueCollectionState(valuesCollections.withTimeAFilter, { errors: 0, values: 2, complete: false });
expectValueCollectionState(valuesCollections.withTimeBFilter, { errors: 0, values: 1, complete: false });
expectValueCollectionState(valuesCollections.withoutFilter, { errors: 0, values: 1, complete: false });
});
it('should emit filtered data to each subscriber', async () => {
deps.liveEventsObservable.next(liveChannelMessageEvent(dataFrameJsons.schema1newValues()));
expect(
mapValues(valuesCollections, (collection) =>
collection.values.map((response) => {
const data = response.data[0];
return isStreamingResponseData(data, StreamingResponseDataType.FullFrame)
? fieldsOf(data)
: isStreamingResponseData(data, StreamingResponseDataType.NewValuesSameSchema)
? data.values
: response;
})
)
).toEqual({
withTimeAFilter: [
[
{
name: 'time',
values: [100, 101],
},
{
name: 'a',
values: ['a', 'b'],
},
],
[[102], ['c']],
[[102], ['c']],
],
withTimeBFilter: [
[
{
name: 'time',
values: [101, 102],
},
{
name: 'b',
values: [2, 3],
},
],
[[102], [3]],
],
withoutFilter: [
[
{
name: 'time',
values: [100, 101, 102],
},
{
name: 'a',
values: ['a', 'b', 'c'],
},
{
name: 'b',
values: [1, 2, 3],
},
],
[[102], ['c'], [3]],
],
});
});
it('should not unsubscribe the source observable unless all subscribers unsubscribe', async () => {
valuesCollections.withTimeAFilter.unsubscribe();
jest.advanceTimersByTime(deps.shutdownDelayInMs + 1);
expect(mapValues(valuesCollections, (coll) => coll.complete)).toEqual({
withTimeAFilter: true,
withTimeBFilter: false,
withoutFilter: false,
});
expect(deps.subscriberReadiness.observed).toBeTruthy();
expect(deps.liveEventsObservable.observed).toBeTruthy();
expect(deps.onShutdown).not.toHaveBeenCalled();
});
it('should emit complete event to all subscribers during shutdown', async () => {
deps.liveEventsObservable.complete();
expect(mapValues(valuesCollections, (coll) => coll.complete)).toEqual({
withTimeAFilter: true,
withTimeBFilter: true,
withoutFilter: true,
});
expect(deps.subscriberReadiness.observed).toBeFalsy();
expect(deps.liveEventsObservable.observed).toBeFalsy();
expect(deps.onShutdown).toHaveBeenCalled();
});
});
describe('shutdown after unsubscribe', () => {
it('should shutdown if no other subscriber subscribed during shutdown delay', async () => {
const deps = createDeps();
const liveDataStream = new LiveDataStream(deps);
const valuesCollection = new ValuesCollection<DataQueryResponse>();
valuesCollection.subscribeTo(liveDataStream.get(liveDataStreamOptions.withTimeAFilter, subscriptionKey));
expect(deps.subscriberReadiness.observed).toBeTruthy();
expect(deps.liveEventsObservable.observed).toBeTruthy();
expect(deps.onShutdown).not.toHaveBeenCalled();
valuesCollection.unsubscribe();
jest.advanceTimersByTime(deps.shutdownDelayInMs - 1);
// delay not finished - should still be subscribed
expect(deps.subscriberReadiness.observed).toBeFalsy();
expect(deps.liveEventsObservable.observed).toBeTruthy();
expect(deps.onShutdown).not.toHaveBeenCalled();
jest.advanceTimersByTime(2);
// delay finished - should have shut down
expect(deps.subscriberReadiness.observed).toBeFalsy();
expect(deps.liveEventsObservable.observed).toBeFalsy();
expect(deps.onShutdown).toHaveBeenCalled();
});
it('should not shutdown after unsubscribe if another subscriber subscribes during shutdown delay', async () => {
const deps = createDeps();
const liveDataStream = new LiveDataStream(deps);
const valuesCollection1 = new ValuesCollection<DataQueryResponse>();
const valuesCollection2 = new ValuesCollection<DataQueryResponse>();
valuesCollection1.subscribeTo(liveDataStream.get(liveDataStreamOptions.withTimeAFilter, subscriptionKey));
expect(deps.subscriberReadiness.observed).toBeTruthy();
expect(deps.liveEventsObservable.observed).toBeTruthy();
expect(deps.onShutdown).not.toHaveBeenCalled();
valuesCollection1.unsubscribe();
jest.advanceTimersByTime(deps.shutdownDelayInMs - 1);
valuesCollection2.subscribeTo(liveDataStream.get(liveDataStreamOptions.withTimeAFilter, subscriptionKey));
jest.advanceTimersByTime(deps.shutdownDelayInMs);
expect(deps.subscriberReadiness.observed).toBeTruthy();
expect(deps.liveEventsObservable.observed).toBeTruthy();
expect(deps.onShutdown).not.toHaveBeenCalled();
});
});
});

View File

@ -0,0 +1,325 @@
import type { LiveDataStreamOptions, StreamingFrameOptions } from '@grafana/runtime/src/services/live';
import { toDataQueryError } from '@grafana/runtime/src/utils/toDataQueryError';
import {
DataFrameJSON,
dataFrameToJSON,
DataQueryError,
Field,
isLiveChannelMessageEvent,
isLiveChannelStatusEvent,
LiveChannelConnectionState,
LiveChannelEvent,
LiveChannelId,
LoadingState,
} from '@grafana/data';
import { map, Observable, ReplaySubject, Subject, Subscriber, Subscription } from 'rxjs';
import { DataStreamSubscriptionKey, StreamingDataQueryResponse } from './service';
import { getStreamingFrameOptions, StreamingDataFrame } from '../data/StreamingDataFrame';
import { StreamingResponseDataType } from '../data/utils';
const bufferIfNot = (canEmitObservable: Observable<boolean>) => <T>(source: Observable<T>): Observable<T[]> => {
return new Observable((subscriber: Subscriber<T[]>) => {
let buffer: T[] = [];
let canEmit = true;
const emitBuffer = () => {
subscriber.next(buffer);
buffer = [];
};
const canEmitSub = canEmitObservable.subscribe({
next: (val) => {
canEmit = val;
if (canEmit && buffer.length) {
emitBuffer();
}
},
});
const sourceSub = source.subscribe({
next(value) {
if (canEmit) {
if (!buffer.length) {
subscriber.next([value]);
} else {
emitBuffer();
}
} else {
buffer.push(value);
}
},
error(error) {
subscriber.error(error);
},
complete() {
subscriber.complete();
},
});
return () => {
sourceSub.unsubscribe();
canEmitSub.unsubscribe();
};
});
};
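// Behavior sketch for bufferIfNot (illustrative usage, not part of this module):
// while the readiness observable reports false, source values are buffered; when it
// flips back to true, the buffer is flushed as a single batch.
//
//   const ready$ = new BehaviorSubject<boolean>(true);   // rxjs
//   const source$ = new Subject<number>();
//   source$.pipe(bufferIfNot(ready$)).subscribe((batch) => console.log(batch));
//   source$.next(1);     // logs [1] - emitted immediately while ready
//   ready$.next(false);
//   source$.next(2);
//   source$.next(3);     // nothing logged yet - values are buffered
//   ready$.next(true);   // logs [2, 3] - buffered values flushed together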
export type DataStreamHandlerDeps<T> = {
channelId: LiveChannelId;
liveEventsObservable: Observable<LiveChannelEvent<T>>;
onShutdown: () => void;
subscriberReadiness: Observable<boolean>;
defaultStreamingFrameOptions: Readonly<StreamingFrameOptions>;
shutdownDelayInMs: number;
};
enum InternalStreamMessageType {
Error,
NewValuesSameSchema,
ChangedSchema,
}
type InternalStreamMessageTypeToData = {
[InternalStreamMessageType.Error]: {
error: DataQueryError;
};
[InternalStreamMessageType.ChangedSchema]: {};
[InternalStreamMessageType.NewValuesSameSchema]: {
values: unknown[][];
};
};
type InternalStreamMessage<T = InternalStreamMessageType> = T extends InternalStreamMessageType
? {
type: T;
} & InternalStreamMessageTypeToData[T]
: never;
const reduceNewValuesSameSchemaMessages = (
packets: Array<InternalStreamMessage<InternalStreamMessageType.NewValuesSameSchema>>
) => ({
values: packets.reduce((acc, { values }) => {
for (let i = 0; i < values.length; i++) {
if (!acc[i]) {
acc[i] = [];
}
for (let j = 0; j < values[i].length; j++) {
acc[i].push(values[i][j]);
}
}
return acc;
}, [] as unknown[][]),
type: InternalStreamMessageType.NewValuesSameSchema,
});
const filterMessages = <T extends InternalStreamMessageType>(
packets: InternalStreamMessage[],
type: T
): Array<InternalStreamMessage<T>> => packets.filter((p) => p.type === type) as Array<InternalStreamMessage<T>>;
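// Worked example for the reducer above: two buffered NewValuesSameSchema packets
//   { values: [[100], ['a']] } and { values: [[101, 102], ['b', 'c']] }
// reduce to a single packet whose columns are concatenated in arrival order:
//   { type: InternalStreamMessageType.NewValuesSameSchema, values: [[100, 101, 102], ['a', 'b', 'c']] }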
export class LiveDataStream<T = unknown> {
private frameBuffer: StreamingDataFrame;
private liveEventsSubscription: Subscription;
private stream: Subject<InternalStreamMessage> = new ReplaySubject(1);
private shutdownTimeoutId: ReturnType<typeof setTimeout> | undefined;
constructor(private deps: DataStreamHandlerDeps<T>) {
this.frameBuffer = StreamingDataFrame.empty(deps.defaultStreamingFrameOptions);
this.liveEventsSubscription = deps.liveEventsObservable.subscribe({
error: this.onError,
complete: this.onComplete,
next: this.onNext,
});
}
private shutdown = () => {
this.stream.complete();
this.liveEventsSubscription.unsubscribe();
this.deps.onShutdown();
};
private shutdownIfNoSubscribers = () => {
if (!this.stream.observed) {
this.shutdown();
}
};
private onError = (err: any) => {
console.log('LiveQuery [error]', { err }, this.deps.channelId);
this.stream.next({
type: InternalStreamMessageType.Error,
error: toDataQueryError(err),
});
this.shutdown();
};
private onComplete = () => {
console.log('LiveQuery [complete]', this.deps.channelId);
this.shutdown();
};
private onNext = (evt: LiveChannelEvent) => {
if (isLiveChannelMessageEvent(evt)) {
this.process(evt.message);
return;
}
const liveChannelStatusEvent = isLiveChannelStatusEvent(evt);
if (liveChannelStatusEvent && evt.error) {
this.stream.next({
type: InternalStreamMessageType.Error,
error: {
...toDataQueryError(evt.error),
message: `Streaming channel error: ${evt.error.message}`,
},
});
return;
}
if (
liveChannelStatusEvent &&
(evt.state === LiveChannelConnectionState.Connected || evt.state === LiveChannelConnectionState.Pending) &&
evt.message
) {
this.process(evt.message);
}
};
private process = (msg: DataFrameJSON) => {
const packetInfo = this.frameBuffer.push(msg);
if (packetInfo.schemaChanged) {
this.stream.next({
type: InternalStreamMessageType.ChangedSchema,
});
} else {
this.stream.next({
type: InternalStreamMessageType.NewValuesSameSchema,
values: this.frameBuffer.getValuesFromLastPacket(),
});
}
};
private resizeBuffer = (bufferOptions: StreamingFrameOptions) => {
if (bufferOptions && this.frameBuffer.needsResizing(bufferOptions)) {
this.frameBuffer.resize(bufferOptions);
}
};
private prepareInternalStreamForNewSubscription = (options: LiveDataStreamOptions): void => {
if (!this.frameBuffer.hasAtLeastOnePacket() && options.frame) {
// will skip initial frames from subsequent subscribers
this.process(dataFrameToJSON(options.frame));
}
};
private clearShutdownTimeout = () => {
if (this.shutdownTimeoutId) {
clearTimeout(this.shutdownTimeoutId);
this.shutdownTimeoutId = undefined;
}
};
get = (options: LiveDataStreamOptions, subKey: DataStreamSubscriptionKey): Observable<StreamingDataQueryResponse> => {
this.clearShutdownTimeout();
const buffer = getStreamingFrameOptions(options.buffer);
this.resizeBuffer(buffer);
this.prepareInternalStreamForNewSubscription(options);
const fieldsNamesFilter = options.filter?.fields;
const dataNeedsFiltering = fieldsNamesFilter?.length;
const fieldFilterPredicate = dataNeedsFiltering ? ({ name }: Field) => fieldsNamesFilter.includes(name) : undefined;
let matchingFieldIndexes: number[] | undefined = undefined;
const getFullFrameResponseData = (error?: DataQueryError): StreamingDataQueryResponse => {
matchingFieldIndexes = fieldFilterPredicate
? this.frameBuffer.getMatchingFieldIndexes(fieldFilterPredicate)
: undefined;
return {
key: subKey,
state: error ? LoadingState.Error : LoadingState.Streaming,
data: [
{
type: StreamingResponseDataType.FullFrame,
frame: this.frameBuffer.serialize(fieldFilterPredicate, buffer),
},
],
error,
};
};
const getNewValuesSameSchemaResponseData = (values: unknown[][]): StreamingDataQueryResponse => {
const filteredValues = matchingFieldIndexes
? values.filter((v, i) => (matchingFieldIndexes as number[]).includes(i))
: values;
return {
key: subKey,
state: LoadingState.Streaming,
data: [
{
type: StreamingResponseDataType.NewValuesSameSchema,
values: filteredValues,
},
],
};
};
let shouldSendFullFrame = true;
const transformedInternalStream = this.stream.pipe(
bufferIfNot(this.deps.subscriberReadiness),
map((messages, i) => {
const errors = filterMessages(messages, InternalStreamMessageType.Error);
const lastError = errors.length ? errors[errors.length - 1].error : undefined;
if (shouldSendFullFrame) {
shouldSendFullFrame = false;
return getFullFrameResponseData(lastError);
}
if (errors.length) {
// send the latest frame with the last error, discard everything else
return getFullFrameResponseData(lastError);
}
const schemaChanged = messages.some((n) => n.type === InternalStreamMessageType.ChangedSchema);
if (schemaChanged) {
// send the latest frame, discard intermediate appends
return getFullFrameResponseData();
}
const newValueSameSchemaMessages = filterMessages(messages, InternalStreamMessageType.NewValuesSameSchema);
if (newValueSameSchemaMessages.length !== messages.length) {
console.warn(`unsupported message type ${messages.map(({ type }) => type)}`);
}
return getNewValuesSameSchemaResponseData(reduceNewValuesSameSchemaMessages(newValueSameSchemaMessages).values);
})
);
return new Observable<StreamingDataQueryResponse>((subscriber) => {
const sub = transformedInternalStream.subscribe({
next: (n) => {
subscriber.next(n);
},
error: (err) => {
subscriber.error(err);
},
complete: () => {
subscriber.complete();
},
});
return () => {
// TODO: potentially resize (downsize) the buffer on unsubscribe
sub.unsubscribe();
if (!this.stream.observed) {
this.clearShutdownTimeout();
this.shutdownTimeoutId = setTimeout(this.shutdownIfNoSubscribers, this.deps.shutdownDelayInMs);
}
};
});
};
}

View File

@ -140,7 +140,14 @@ export class CentrifugeLiveChannel<T = any> {
*/
getStream() {
return new Observable((subscriber) => {
subscriber.next({ ...this.currentStatus });
const initialMessage = { ...this.currentStatus };
if (this.lastMessageWithSchema?.schema) {
// send just schema instead of schema+data to avoid having data gaps
initialMessage.message = { schema: this.lastMessageWithSchema?.schema };
}
subscriber.next(initialMessage);
const sub = this.stream.subscribe(subscriber);
return () => {
sub.unsubscribe();

View File

@ -1,23 +1,21 @@
import Centrifuge from 'centrifuge/dist/centrifuge';
import { LiveDataStreamOptions } from '@grafana/runtime';
import { toDataQueryError } from '@grafana/runtime/src/utils/toDataQueryError';
import { BehaviorSubject, Observable } from 'rxjs';
import {
DataFrame,
DataFrameJSON,
dataFrameToJSON,
GrafanaLiveSrv,
LiveDataStreamOptions,
StreamingFrameAction,
StreamingFrameOptions,
} from '@grafana/runtime/src/services/live';
import { BehaviorSubject, Observable, share, startWith } from 'rxjs';
import {
DataQueryResponse,
isLiveChannelMessageEvent,
isLiveChannelStatusEvent,
LiveChannelAddress,
LiveChannelConnectionState,
LiveChannelEvent,
LiveChannelPresenceStatus,
LoadingState,
StreamingDataFrame,
toDataFrameDTO,
LiveChannelId,
toLiveChannelId,
} from '@grafana/data';
import { CentrifugeLiveChannel } from './channel';
import { LiveDataStream } from './LiveDataStream';
import { StreamingResponseData } from '../data/utils';
export type CentrifugeSrvDeps = {
appUrl: string;
@ -28,39 +26,32 @@ export type CentrifugeSrvDeps = {
dataStreamSubscriberReadiness: Observable<boolean>;
};
export interface CentrifugeSrv {
/**
* Listen for changes to the connection state
*/
getConnectionState(): Observable<boolean>;
export type StreamingDataQueryResponse = Omit<DataQueryResponse, 'data'> & { data: [StreamingResponseData] };
/**
* Watch for messages in a channel
*/
getStream<T>(address: LiveChannelAddress): Observable<LiveChannelEvent<T>>;
export type CentrifugeSrv = Omit<GrafanaLiveSrv, 'publish' | 'getDataStream'> & {
getDataStream: (options: LiveDataStreamOptions) => Observable<StreamingDataQueryResponse>;
};
/**
* Connect to a channel and return results as DataFrames
*/
getDataStream(options: LiveDataStreamOptions): Observable<DataQueryResponse>;
export type DataStreamSubscriptionKey = string;
/**
* For channels that support presence, this will request the current state from the server.
*
* Join and leave messages will be sent to the open stream
*/
getPresence(address: LiveChannelAddress): Promise<LiveChannelPresenceStatus>;
}
const defaultStreamingFrameOptions: Readonly<StreamingFrameOptions> = {
maxLength: 100,
maxDelta: Infinity,
action: StreamingFrameAction.Append,
};
const dataStreamShutdownDelayInMs = 5000;
export class CentrifugeService implements CentrifugeSrv {
readonly open = new Map<string, CentrifugeLiveChannel>();
private readonly liveDataStreamByChannelId: Record<LiveChannelId, LiveDataStream> = {};
readonly centrifuge: Centrifuge;
readonly connectionState: BehaviorSubject<boolean>;
readonly connectionBlocker: Promise<void>;
private dataStreamSubscriberReady = true;
private readonly dataStreamSubscriberReadiness: Observable<boolean>;
constructor(private deps: CentrifugeSrvDeps) {
deps.dataStreamSubscriberReadiness.subscribe((next) => (this.dataStreamSubscriberReady = next));
this.dataStreamSubscriberReadiness = deps.dataStreamSubscriberReadiness.pipe(share(), startWith(true));
const liveUrl = `${deps.appUrl.replace(/^http/, 'ws')}/api/live/ws`;
this.centrifuge = new Centrifuge(liveUrl, {});
this.centrifuge.setConnectData({
@ -154,130 +145,59 @@ export class CentrifugeService implements CentrifugeSrv {
/**
* Listen for changes to the connection state
*/
getConnectionState() {
getConnectionState = () => {
return this.connectionState.asObservable();
}
};
/**
* Watch for messages in a channel
*/
getStream<T>(address: LiveChannelAddress): Observable<LiveChannelEvent<T>> {
getStream: CentrifugeSrv['getStream'] = <T>(address: LiveChannelAddress) => {
return this.getChannel<T>(address).getStream();
}
};
private createSubscriptionKey = (options: LiveDataStreamOptions): DataStreamSubscriptionKey =>
options.key ?? `xstr/${streamCounter++}`;
private getLiveDataStream = (options: LiveDataStreamOptions): LiveDataStream => {
const channelId = toLiveChannelId(options.addr);
const existingStream = this.liveDataStreamByChannelId[channelId];
if (existingStream) {
return existingStream;
}
const channel = this.getChannel(options.addr);
this.liveDataStreamByChannelId[channelId] = new LiveDataStream({
channelId,
onShutdown: () => {
delete this.liveDataStreamByChannelId[channelId];
},
liveEventsObservable: channel.getStream(),
subscriberReadiness: this.dataStreamSubscriberReadiness,
defaultStreamingFrameOptions,
shutdownDelayInMs: dataStreamShutdownDelayInMs,
});
return this.liveDataStreamByChannelId[channelId];
};
/**
* Connect to a channel and return results as DataFrames
*/
getDataStream(options: LiveDataStreamOptions): Observable<DataQueryResponse> {
return new Observable<DataQueryResponse>((subscriber) => {
const channel = this.getChannel(options.addr);
const key = options.key ?? `xstr/${streamCounter++}`;
let data: StreamingDataFrame | undefined = undefined;
let filtered: DataFrame | undefined = undefined;
let state = LoadingState.Streaming;
let lastWidth = -1;
getDataStream: CentrifugeSrv['getDataStream'] = (options) => {
const subscriptionKey = this.createSubscriptionKey(options);
const process = (msg: DataFrameJSON) => {
if (!data) {
data = new StreamingDataFrame(msg, options.buffer);
} else {
data.push(msg);
}
state = LoadingState.Streaming;
const sameWidth = lastWidth === data.fields.length;
lastWidth = data.fields.length;
// Filter out fields
if (!filtered || msg.schema || !sameWidth) {
filtered = data;
if (options.filter) {
const { fields } = options.filter;
if (fields?.length) {
filtered = {
...data,
fields: data.fields.filter((f) => fields.includes(f.name)),
};
}
}
}
if (this.dataStreamSubscriberReady) {
filtered.length = data.length; // make sure they stay up-to-date
subscriber.next({
state,
data: [
// workaround for serializing issues when sending DataFrame from web worker to the main thread
// DataFrame is making use of ArrayVectors which are es6 classes and thus not cloneable out of the box
// `toDataFrameDTO` converts ArrayVectors into native arrays.
toDataFrameDTO(filtered),
],
key,
});
}
};
if (options.frame) {
process(dataFrameToJSON(options.frame));
} else if (channel.lastMessageWithSchema) {
process(channel.lastMessageWithSchema);
}
const sub = channel.getStream().subscribe({
error: (err: any) => {
console.log('LiveQuery [error]', { err }, options.addr);
state = LoadingState.Error;
subscriber.next({ state, data: [data], key, error: toDataQueryError(err) });
sub.unsubscribe(); // close after error
},
complete: () => {
console.log('LiveQuery [complete]', options.addr);
if (state !== LoadingState.Error) {
state = LoadingState.Done;
}
// or track errors? subscriber.next({ state, data: [data], key });
subscriber.complete();
sub.unsubscribe();
},
next: (evt: LiveChannelEvent) => {
if (isLiveChannelMessageEvent(evt)) {
process(evt.message);
return;
}
if (isLiveChannelStatusEvent(evt)) {
if (evt.error) {
let error = toDataQueryError(evt.error);
error.message = `Streaming channel error: ${error.message}`;
state = LoadingState.Error;
subscriber.next({ state, data: [data], key, error });
return;
} else if (
evt.state === LiveChannelConnectionState.Connected ||
evt.state === LiveChannelConnectionState.Pending
) {
if (evt.message) {
process(evt.message);
}
return;
}
console.log('ignore state', evt);
}
},
});
return () => {
sub.unsubscribe();
};
});
}
const stream = this.getLiveDataStream(options);
return stream.get(options, subscriptionKey);
};
/**
* For channels that support presence, this will request the current state from the server.
*
* Join and leave messages will be sent to the open stream
*/
getPresence(address: LiveChannelAddress): Promise<LiveChannelPresenceStatus> {
getPresence: CentrifugeSrv['getPresence'] = (address) => {
return this.getChannel(address).getPresence();
}
};
}
// This is used to give a unique key for each stream. The actual value does not matter
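
For context on how these `StreamingDataQueryResponse` payloads are consumed: a full frame is deserialized into a fresh `StreamingDataFrame`, while same-schema deltas are pushed into the existing one. A hedged sketch (the actual call sites live in the consumers elsewhere in this PR; `centrifugeSrv` and `addr` are placeholders):

import { StreamingDataFrame } from '../data/StreamingDataFrame';
import { isStreamingResponseData, StreamingResponseDataType } from '../data/utils';

let frame: StreamingDataFrame | undefined;

centrifugeSrv.getDataStream({ addr, buffer: { maxLength: 500 } }).subscribe((rsp) => {
  const data = rsp.data[0];
  if (isStreamingResponseData(data, StreamingResponseDataType.FullFrame)) {
    // first emit, schema change, or error: replace the local frame with the serialized snapshot
    frame = StreamingDataFrame.deserialize(data.frame);
  } else if (isStreamingResponseData(data, StreamingResponseDataType.NewValuesSameSchema) && frame) {
    // cheap delta: append the new columnar values to the existing frame
    frame.pushNewValues(data.values);
  }
});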

View File

@ -1,8 +1,6 @@
import { reduceField, ReducerID } from '..';
import { getFieldDisplayName } from '../field';
import { DataFrame, FieldType } from '../types/dataFrame';
import { DataFrameJSON } from './DataFrameJSON';
import { StreamingDataFrame } from './StreamingDataFrame';
import { reduceField, ReducerID, getFieldDisplayName, DataFrame, FieldType, DataFrameJSON } from '@grafana/data';
import { StreamingFrameAction, StreamingFrameOptions } from '@grafana/runtime';
import { getStreamingFrameOptions, StreamingDataFrame } from './StreamingDataFrame';
describe('Streaming JSON', () => {
describe('when called with a DataFrame', () => {
@ -23,7 +21,7 @@ describe('Streaming JSON', () => {
},
};
const stream = new StreamingDataFrame(json, {
const stream = StreamingDataFrame.fromDataFrameJSON(json, {
maxLength: 5,
maxDelta: 300,
});
@ -186,10 +184,306 @@ describe('Streaming JSON', () => {
]
`);
});
it('should append data with new schema and fill missed values with undefined', () => {
stream.push({
schema: {
fields: [
{ name: 'time', type: FieldType.time },
{ name: 'name', type: FieldType.string },
{ name: 'value', type: FieldType.number },
{ name: 'value2', type: FieldType.number },
],
},
data: {
values: [[601], ['i'], [10], [-10]],
},
});
expect(stream.fields.map((f) => ({ name: f.name, value: f.values.buffer }))).toMatchInlineSnapshot(`
Array [
Object {
"name": "time",
"value": Array [
500,
501,
502,
503,
601,
],
},
Object {
"name": "name",
"value": Array [
"e",
"f",
"g",
"h",
"i",
],
},
Object {
"name": "value",
"value": Array [
5,
6,
7,
8,
9,
10,
],
},
Object {
"name": "value2",
"value": Array [
undefined,
undefined,
undefined,
undefined,
-10,
],
},
]
`);
});
it('should be able to return values from previous packet', function () {
stream.push({
data: {
values: [
[602, 603],
['j', 'k'],
[11, 12],
[-11, -12],
],
},
});
expect(stream.getValuesFromLastPacket()).toEqual([
[602, 603],
['j', 'k'],
[11, 12],
[-11, -12],
]);
});
});
describe('serialization', function () {
const json: DataFrameJSON = {
schema: {
fields: [
{ name: 'time', type: FieldType.time },
{ name: 'name', type: FieldType.string },
{ name: 'value', type: FieldType.number },
],
},
data: {
values: [
[100, 200, 300],
['a', 'b', 'c'],
[1, 2, 3],
],
},
};
const frame = StreamingDataFrame.fromDataFrameJSON(json, {
maxLength: 5,
maxDelta: 300,
});
it('should filter fields', function () {
const serializedFrame = frame.serialize((f) => ['time'].includes(f.name));
expect(serializedFrame.fields).toEqual([
{
config: {},
name: 'time',
type: 'time',
values: [100, 200, 300],
},
]);
});
it('should resize the buffer', function () {
const serializedFrame = frame.serialize((f) => ['time', 'name'].includes(f.name), { maxLength: 2 });
expect(serializedFrame.fields).toEqual([
{
config: {},
name: 'time',
type: 'time',
values: [200, 300],
},
{
config: {},
name: 'name',
type: 'string',
values: ['b', 'c'],
},
]);
});
});
describe('resizing', function () {
it.each([
[
{
existing: {
maxLength: 10,
maxDelta: 5,
action: StreamingFrameAction.Replace,
},
newOptions: {},
expected: {
maxLength: 10,
maxDelta: 5,
action: StreamingFrameAction.Replace,
},
},
],
[
{
existing: {
maxLength: 10,
maxDelta: 5,
action: StreamingFrameAction.Replace,
},
newOptions: {
maxLength: 9,
maxDelta: 4,
},
expected: {
maxLength: 10,
maxDelta: 5,
action: StreamingFrameAction.Replace,
},
},
],
[
{
existing: {
maxLength: 10,
maxDelta: 5,
action: StreamingFrameAction.Replace,
},
newOptions: {
maxLength: 11,
maxDelta: 6,
},
expected: {
maxLength: 11,
maxDelta: 6,
action: StreamingFrameAction.Replace,
},
},
],
])(
'should always resize to a bigger buffer',
({
existing,
expected,
newOptions,
}: {
existing: StreamingFrameOptions;
newOptions: Partial<StreamingFrameOptions>;
expected: StreamingFrameOptions;
}) => {
const frame = StreamingDataFrame.empty(existing);
frame.resize(newOptions);
expect(frame.getOptions()).toEqual(expected);
}
);
it('should override infinity maxDelta', function () {
const frame = StreamingDataFrame.empty({
maxLength: 10,
maxDelta: Infinity,
action: StreamingFrameAction.Replace,
});
frame.resize({
maxLength: 9,
maxDelta: 4,
});
expect(frame.getOptions()).toEqual({
maxLength: 10,
maxDelta: 4,
action: StreamingFrameAction.Replace,
});
});
});
describe('options with defaults', function () {
it('should provide defaults', function () {
expect(getStreamingFrameOptions()).toEqual({
action: StreamingFrameAction.Append,
maxDelta: Infinity,
maxLength: 1000,
});
});
});
describe('when deserialized', function () {
const json: DataFrameJSON = {
schema: {
fields: [
{ name: 'time', type: FieldType.time },
{ name: 'name', type: FieldType.string },
{ name: 'value', type: FieldType.number },
],
},
data: {
values: [
[100, 200, 300],
['a', 'b', 'c'],
[1, 2, 3],
],
},
};
const serializedFrame = StreamingDataFrame.fromDataFrameJSON(json, {
maxLength: 5,
maxDelta: 300,
}).serialize();
it('should support pushing new values matching the existing schema', function () {
const frame = StreamingDataFrame.deserialize(serializedFrame);
frame.pushNewValues([[601], ['x'], [10]]);
expect(frame.fields.map((f) => ({ name: f.name, value: f.values.buffer }))).toMatchInlineSnapshot(`
Array [
Object {
"name": "time",
"value": Array [
300,
601,
],
},
Object {
"name": "name",
"value": Array [
"c",
"x",
],
},
Object {
"name": "value",
"value": Array [
3,
10,
],
},
]
`);
});
});
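// What the serialize/deserialize pair is for (sketch): the serialized frame is a plain,
// structured-clone-friendly object, so it can cross the web-worker boundary via postMessage
// and be revived on the other side, where streaming continues with the same options.
//   const transferable = StreamingDataFrame.fromDataFrameJSON(json, { maxLength: 5 }).serialize();
//   // worker: postMessage(transferable)  ->  main thread:
//   const restored = StreamingDataFrame.deserialize(transferable);
//   restored.pushNewValues([[601], ['x'], [10]]); // keeps appending, as the test above shows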
describe('when created empty', function () {
it('should have no packets', function () {
const streamingDataFrame = StreamingDataFrame.empty();
expect(streamingDataFrame.hasAtLeastOnePacket()).toEqual(false);
expect(streamingDataFrame.fields).toHaveLength(0);
});
});
describe('lengths property is accurate', () => {
const stream = new StreamingDataFrame(
const stream = StreamingDataFrame.fromDataFrameJSON(
{
schema: {
fields: [{ name: 'simple', type: FieldType.number }],
@ -217,7 +511,7 @@ describe('Streaming JSON', () => {
});
describe('streaming labels column', () => {
const stream = new StreamingDataFrame(
const stream = StreamingDataFrame.fromDataFrameJSON(
{
schema: {
fields: [
@ -392,7 +686,7 @@ describe('Streaming JSON', () => {
},
};
const stream = new StreamingDataFrame(json, {
const stream = StreamingDataFrame.fromDataFrameJSON(json, {
maxLength: 4,
maxDelta: 300,
});
@ -410,6 +704,7 @@ describe('Streaming JSON', () => {
"action": "replace",
"length": 3,
"number": 1,
"schemaChanged": true,
},
"values": Array [
1,
@ -433,6 +728,7 @@ describe('Streaming JSON', () => {
"action": "append",
"length": 2,
"number": 2,
"schemaChanged": false,
},
"values": Array [
2,
@ -454,6 +750,7 @@ describe('Streaming JSON', () => {
"action": "append",
"length": 1,
"number": 3,
"schemaChanged": false,
},
"values": Array [
3,

View File

@ -1,40 +1,34 @@
import { Field, DataFrame, FieldType, Labels, QueryResultMeta } from '../types';
import { ArrayVector } from '../vector';
import { DataFrameJSON, decodeFieldValueEntities, FieldSchema } from './DataFrameJSON';
import { guessFieldTypeFromValue } from './processDataFrame';
import { join } from '../transformations/transformers/joinDataFrames';
import {
DataFrame,
Field,
FieldDTO,
FieldType,
Labels,
QueryResultMeta,
DataFrameJSON,
decodeFieldValueEntities,
FieldSchema,
guessFieldTypeFromValue,
ArrayVector,
toFilteredDataFrameDTO,
} from '@grafana/data';
import { join } from '@grafana/data/src/transformations/transformers/joinDataFrames';
import {
StreamingFrameAction,
StreamingFrameOptions,
} from '@grafana/runtime/src/services/live';
import { AlignedData } from 'uplot';
/**
* Indicates whether the frame is appended to or replaced
*
* @public -- but runtime
*/
export enum StreamingFrameAction {
Append = 'append',
Replace = 'replace',
}
/**
* Stream packet info is attached to StreamingDataFrames and indicates how many
* rows were added to the end of the frame. The number of discarded rows can be
* calculated from the previous state
*
* @public -- but runtime
*/
export interface StreamPacketInfo {
number: number;
action: StreamingFrameAction;
length: number;
schemaChanged: boolean;
}
/**
* @alpha
*/
export interface StreamingFrameOptions {
maxLength?: number; // 1000
maxDelta?: number; // how long to keep things
action?: StreamingFrameAction; // default will append
}
enum PushMode {
@ -43,10 +37,22 @@ enum PushMode {
// long
}
export type SerializedStreamingDataFrame = {
name?: string;
fields: FieldDTO[];
refId?: string;
meta: QueryResultMeta;
schemaFields: FieldSchema[];
timeFieldIndex: number;
pushMode: PushMode;
length: number;
packetInfo: StreamPacketInfo;
options: StreamingFrameOptions;
labels: Set<string>;
};
/**
* Unlike a circular buffer, this will append and periodically slice the front
*
* @alpha
*/
export class StreamingDataFrame implements DataFrame {
name?: string;
@ -56,40 +62,142 @@ export class StreamingDataFrame implements DataFrame {
fields: Array<Field<any, ArrayVector<any>>> = [];
length = 0;
options: StreamingFrameOptions;
private schemaFields: FieldSchema[] = [];
private timeFieldIndex = -1;
private pushMode = PushMode.wide;
private alwaysReplace = false;
// current labels
private labels: Set<string> = new Set();
readonly packetInfo: StreamPacketInfo = {
schemaChanged: true,
number: 0,
action: StreamingFrameAction.Replace,
length: 0,
};
constructor(frame: DataFrameJSON, opts?: StreamingFrameOptions) {
this.options = {
maxLength: 1000,
maxDelta: Infinity,
...opts,
};
this.alwaysReplace = this.options.action === StreamingFrameAction.Replace;
private constructor(public options: StreamingFrameOptions) {
// Get Length to show up if you use spread
Object.defineProperty(this, 'length', {
enumerable: true,
});
this.push(frame);
// Get fields to show up if you use spread
Object.defineProperty(this, 'fields', {
enumerable: true,
});
}
serialize = (
fieldPredicate?: (f: Field) => boolean,
optionsOverride?: Partial<StreamingFrameOptions>
): SerializedStreamingDataFrame => {
const options = optionsOverride ? Object.assign({}, { ...this.options, ...optionsOverride }) : this.options;
const dataFrameDTO = toFilteredDataFrameDTO(this, fieldPredicate);
const numberOfItemsToRemove = getNumberOfItemsToRemove(
dataFrameDTO.fields.map((f) => f.values) as unknown[][],
options.maxLength,
this.timeFieldIndex,
options.maxDelta
);
dataFrameDTO.fields = dataFrameDTO.fields.map((f) => ({
...f,
values: (f.values as unknown[]).slice(numberOfItemsToRemove),
}));
return {
...dataFrameDTO,
// TODO: Labels and schema are not filtered by field
labels: this.labels,
schemaFields: this.schemaFields,
name: this.name,
refId: this.refId,
meta: this.meta,
length: this.length,
timeFieldIndex: this.timeFieldIndex,
pushMode: this.pushMode,
packetInfo: this.packetInfo,
options,
};
};
private initFromSerialized = (serialized: Omit<SerializedStreamingDataFrame, 'options'>) => {
this.name = serialized.name;
this.refId = serialized.refId;
this.meta = serialized.meta;
this.length = serialized.length;
this.labels = serialized.labels;
this.schemaFields = serialized.schemaFields;
this.timeFieldIndex = serialized.timeFieldIndex;
this.pushMode = serialized.pushMode;
this.packetInfo.length = serialized.packetInfo.length;
this.packetInfo.number = serialized.packetInfo.number;
this.packetInfo.action = StreamingFrameAction.Replace;
this.packetInfo.schemaChanged = true;
this.fields = serialized.fields.map((f) => ({
...f,
type: f.type ?? FieldType.other,
config: f.config ?? {},
values: Array.isArray(f.values) ? new ArrayVector(f.values) : new ArrayVector(),
}));
assureValuesAreWithinLengthLimit(
this.fields.map((f) => f.values.buffer),
this.options.maxLength,
this.timeFieldIndex,
this.options.maxDelta
);
};
static deserialize = (serialized: SerializedStreamingDataFrame) => {
const frame = new StreamingDataFrame(serialized.options);
frame.initFromSerialized(serialized);
return frame;
};
static empty = (opts?: Partial<StreamingFrameOptions>): StreamingDataFrame =>
new StreamingDataFrame(getStreamingFrameOptions(opts));
static fromDataFrameJSON = (frame: DataFrameJSON, opts?: Partial<StreamingFrameOptions>): StreamingDataFrame => {
const streamingDataFrame = new StreamingDataFrame(getStreamingFrameOptions(opts));
streamingDataFrame.push(frame);
return streamingDataFrame;
};
private get alwaysReplace() {
return this.options.action === StreamingFrameAction.Replace;
}
needsResizing = ({ maxLength, maxDelta }: StreamingFrameOptions) => {
const needsMoreLength = maxLength && this.options.maxLength < maxLength;
const needsBiggerDelta = maxDelta && this.options.maxDelta < maxDelta;
const needsToOverrideDefaultInfinityDelta = maxDelta && this.options.maxDelta === Infinity;
return Boolean(needsMoreLength || needsBiggerDelta || needsToOverrideDefaultInfinityDelta);
};
resize = ({ maxLength, maxDelta }: Partial<StreamingFrameOptions>) => {
if (maxDelta) {
if (this.options.maxDelta === Infinity) {
this.options.maxDelta = maxDelta;
} else {
this.options.maxDelta = Math.max(maxDelta, this.options.maxDelta);
}
}
this.options.maxLength = Math.max(this.options.maxLength, maxLength ?? 0);
};
/**
* Apply the new message to the existing data. This will replace the existing schema
* if a new schema is included in the message, or append data that matches the current schema.
*/
push(msg: DataFrameJSON) {
push(msg: DataFrameJSON): StreamPacketInfo {
const { schema, data } = msg;
this.packetInfo.number++;
this.packetInfo.length = 0;
this.packetInfo.schemaChanged = false;
if (schema) {
this.pushMode = PushMode.wide;
@ -118,6 +226,7 @@ export class StreamingDataFrame implements DataFrame {
f.labels = sf.labels;
});
} else {
this.packetInfo.schemaChanged = true;
const isWide = this.pushMode === PushMode.wide;
this.fields = niceSchemaFields.map((f) => {
return {
@ -127,7 +236,8 @@ export class StreamingDataFrame implements DataFrame {
type: f.type ?? FieldType.other,
// transfer old values by type & name, unless we relied on labels to match fields
values: isWide
? this.fields.find((of) => of.name === f.name && f.type === of.type)?.values ?? new ArrayVector()
? this.fields.find((of) => of.name === f.name && f.type === of.type)?.values ??
new ArrayVector(Array(this.length).fill(undefined))
: new ArrayVector(),
};
});
@ -155,6 +265,7 @@ export class StreamingDataFrame implements DataFrame {
// make sure fields are initialized for each label
for (const label of labeledTables.keys()) {
if (!this.labels.has(label)) {
this.packetInfo.schemaChanged = true;
this.addLabel(label);
}
}
@ -221,8 +332,54 @@ export class StreamingDataFrame implements DataFrame {
// Update the frame length
this.length = appended[0].length;
}
return {
...this.packetInfo,
};
}
pushNewValues = (values: unknown[][]) => {
if (!values?.length) {
return;
}
this.packetInfo.action = StreamingFrameAction.Append;
this.packetInfo.number++;
this.packetInfo.length = values[0].length;
this.packetInfo.schemaChanged = false;
circPush(
this.fields.map((f) => f.values.buffer),
values,
this.options.maxLength,
this.timeFieldIndex,
this.options.maxDelta
);
};
resetStateCalculations = () => {
this.fields.forEach((f) => {
f.state = {
...(f.state ?? {}),
calcs: undefined,
range: undefined,
};
});
};
getMatchingFieldIndexes = (fieldPredicate: (f: Field) => boolean): number[] =>
this.fields
.map((f, index) => (fieldPredicate(f) ? index : undefined))
.filter((val) => val !== undefined) as number[];
getValuesFromLastPacket = (): unknown[][] =>
this.fields.map((f) => {
const values = f.values.buffer;
return values.slice(Math.max(0, values.length - this.packetInfo.length));
});
hasAtLeastOnePacket = () => Boolean(this.packetInfo.length);
// adds a set of fields for a new label
private addLabel(label: string) {
let labelCount = this.labels.size;
@ -258,6 +415,16 @@ export class StreamingDataFrame implements DataFrame {
this.labels.add(label);
}
getOptions = (): Readonly<StreamingFrameOptions> => this.options;
}
export function getStreamingFrameOptions(opts?: Partial<StreamingFrameOptions>): StreamingFrameOptions {
return {
maxLength: opts?.maxLength ?? 1000,
maxDelta: opts?.maxDelta ?? Infinity,
action: opts?.action ?? StreamingFrameAction.Append,
};
}
// converts vertical insertion records with table keys in [0] and column values in [1...N]
@ -317,9 +484,31 @@ export function getLastStreamingDataFramePacket(frame: DataFrame) {
}
// mutable circular push
function circPush(data: number[][], newData: number[][], maxLength = Infinity, deltaIdx = 0, maxDelta = Infinity) {
function circPush(data: unknown[][], newData: unknown[][], maxLength = Infinity, deltaIdx = 0, maxDelta = Infinity) {
for (let i = 0; i < data.length; i++) {
data[i] = data[i].concat(newData[i]);
for (let k = 0; k < newData[i].length; k++) {
data[i].push(newData[i][k]);
}
}
return assureValuesAreWithinLengthLimit(data, maxLength, deltaIdx, maxDelta);
}
function assureValuesAreWithinLengthLimit(data: unknown[][], maxLength = Infinity, deltaIdx = 0, maxDelta = Infinity) {
const count = getNumberOfItemsToRemove(data, maxLength, deltaIdx, maxDelta);
if (count) {
for (let i = 0; i < data.length; i++) {
data[i].splice(0, count);
}
}
return count;
}
function getNumberOfItemsToRemove(data: unknown[][], maxLength = Infinity, deltaIdx = 0, maxDelta = Infinity) {
if (!data[0]?.length) {
return 0;
}
const nlen = data[0].length;
@ -331,7 +520,7 @@ function circPush(data: number[][], newData: number[][], maxLength = Infinity, d
}
if (maxDelta !== Infinity && deltaIdx >= 0) {
const deltaLookup = data[deltaIdx];
const deltaLookup = data[deltaIdx] as number[];
const low = deltaLookup[sliceIdx];
const high = deltaLookup[nlen - 1];
@ -341,12 +530,6 @@ function circPush(data: number[][], newData: number[][], maxLength = Infinity, d
}
}
if (sliceIdx) {
for (let i = 0; i < data.length; i++) {
data[i] = data[i].slice(sliceIdx);
}
}
return sliceIdx;
}
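For orientation, a minimal usage sketch of the API introduced above — the static constructors, push, and the serialize/deserialize round trip. The schema, values, and the 100-row limit are illustrative only, not taken from the commit.
import { FieldType } from '@grafana/data';
import { StreamingDataFrame } from 'app/features/live/data/StreamingDataFrame';
// Build a streaming frame from a DataFrameJSON schema (illustrative schema).
const frame = StreamingDataFrame.fromDataFrameJSON(
  {
    schema: {
      fields: [
        { name: 'time', type: FieldType.time },
        { name: 'value', type: FieldType.number },
      ],
    },
  },
  { maxLength: 100 } // keep at most 100 rows; maxDelta stays Infinity by default
);
// Append a packet that matches the current schema: one column of values per field.
frame.push({ data: { values: [[Date.now()], [42]] } });
// Snapshot the buffer (e.g. to cross a worker boundary) and rebuild it on the other side.
const serialized = frame.serialize();
const copy = StreamingDataFrame.deserialize(serialized);
// Later packets with an unchanged schema can be applied as raw column values.
copy.pushNewValues([[Date.now() + 1000], [43]]);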

View File

@ -0,0 +1,55 @@
import { DataQueryResponseData, isDataFrame } from '@grafana/data';
import { StreamingDataFrame } from './StreamingDataFrame';
/**
* @alpha -- experimental
*/
export enum StreamingResponseDataType {
NewValuesSameSchema = 'NewValuesSameSchema',
FullFrame = 'FullFrame',
}
/**
* @alpha -- experimental
*/
export type StreamingResponseDataTypeToData = {
[StreamingResponseDataType.NewValuesSameSchema]: {
values: unknown[][];
};
[StreamingResponseDataType.FullFrame]: {
frame: ReturnType<StreamingDataFrame['serialize']>;
};
};
/**
* @alpha -- experimental
*/
export type StreamingResponseData<T = StreamingResponseDataType> = T extends StreamingResponseDataType
? {
type: T;
} & StreamingResponseDataTypeToData[T]
: never;
/**
* @alpha -- experimental
*/
export const isStreamingResponseData = <T extends StreamingResponseDataType>(
responseData: DataQueryResponseData,
type: T
): responseData is StreamingResponseData<T> => 'type' in responseData && responseData.type === type;
const AllStreamingResponseDataTypes = Object.values(StreamingResponseDataType);
/**
* @alpha -- experimental
*/
export const isAnyStreamingResponseData = (
responseData: DataQueryResponseData
): responseData is StreamingResponseData =>
'type' in responseData && AllStreamingResponseDataTypes.includes(responseData.type);
/**
* @alpha -- experimental
*/
export const isStreamingDataFrame = (data: DataQueryResponseData): data is StreamingDataFrame =>
isDataFrame(data) && 'packetInfo' in data;
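A short, hedged sketch of how these guards might be used to narrow an incoming packet before touching the buffer; applyPacket is a hypothetical helper, but the branching mirrors the GrafanaLiveService.getDataStream change further below.
import { DataQueryResponseData } from '@grafana/data';
import { StreamingDataFrame } from 'app/features/live/data/StreamingDataFrame';
import { isStreamingResponseData, StreamingResponseDataType } from 'app/features/live/data/utils';
function applyPacket(
  buffer: StreamingDataFrame | undefined,
  packet: DataQueryResponseData
): StreamingDataFrame | undefined {
  if (isStreamingResponseData(packet, StreamingResponseDataType.FullFrame)) {
    // A full frame carries the serialized buffer and replaces whatever we had.
    return StreamingDataFrame.deserialize(packet.frame);
  }
  if (buffer && isStreamingResponseData(packet, StreamingResponseDataType.NewValuesSameSchema)) {
    // A partial packet only carries new column values for the existing schema.
    buffer.pushNewValues(packet.values);
  }
  return buffer;
}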

View File

@ -0,0 +1,148 @@
import { GrafanaLiveService } from './live';
import { StreamingDataQueryResponse } from './centrifuge/service';
import { Subject } from 'rxjs';
import { DataQueryResponse, FieldType, LiveChannelScope } from '@grafana/data';
import { StreamingDataFrame } from './data/StreamingDataFrame';
import { StreamingResponseDataType } from './data/utils';
describe('GrafanaLiveService', () => {
const deps = {
backendSrv: {},
centrifugeSrv: {
getDataStream: jest.fn(),
},
};
const liveService = new GrafanaLiveService(deps as any);
const liveDataStreamOptions = {
addr: {
scope: LiveChannelScope.Grafana,
namespace: 'abc',
path: 'abc',
},
};
beforeEach(() => {
jest.clearAllMocks();
});
it('should map response from Centrifuge Service to a streaming data frame', async () => {
const dummySubject = new Subject<StreamingDataQueryResponse>();
deps.centrifugeSrv.getDataStream.mockReturnValueOnce(dummySubject);
let response: DataQueryResponse | undefined;
liveService.getDataStream(liveDataStreamOptions).subscribe((next) => {
response = next;
});
dummySubject.next({
data: [
{
type: StreamingResponseDataType.FullFrame,
frame: StreamingDataFrame.empty().serialize(),
},
],
});
expect(response).not.toBeUndefined();
expect(response?.data[0]).toBeInstanceOf(StreamingDataFrame);
});
it('should add partial streaming data to the buffer', async () => {
const dummySubject = new Subject<StreamingDataQueryResponse>();
deps.centrifugeSrv.getDataStream.mockReturnValueOnce(dummySubject);
let response: DataQueryResponse | undefined;
liveService.getDataStream(liveDataStreamOptions).subscribe((next) => {
response = next;
});
dummySubject.next({
data: [
{
type: StreamingResponseDataType.FullFrame,
frame: StreamingDataFrame.fromDataFrameJSON({
schema: {
fields: [
{ name: 'time', type: FieldType.time },
{ name: 'a', type: FieldType.string },
{ name: 'b', type: FieldType.number },
],
},
}).serialize(),
},
],
});
dummySubject.next({
data: [
{
type: StreamingResponseDataType.NewValuesSameSchema,
values: [
[100, 101],
['a', 'b'],
[1, 2],
],
},
],
});
expect(response).not.toBeUndefined();
const frame: StreamingDataFrame = response?.data[0];
expect(frame).toBeInstanceOf(StreamingDataFrame);
expect(frame.fields).toEqual([
{
config: {},
name: 'time',
type: FieldType.time,
values: {
buffer: [100, 101],
},
},
{
config: {},
name: 'a',
type: FieldType.string,
values: {
buffer: ['a', 'b'],
},
},
{
config: {},
name: 'b',
type: FieldType.number,
values: {
buffer: [1, 2],
},
},
]);
});
it('should return an empty frame if first message was not a full frame', async () => {
const dummySubject = new Subject<StreamingDataQueryResponse>();
deps.centrifugeSrv.getDataStream.mockReturnValueOnce(dummySubject);
let response: DataQueryResponse | undefined;
liveService.getDataStream(liveDataStreamOptions).subscribe((next) => {
response = next;
});
dummySubject.next({
data: [
{
type: StreamingResponseDataType.NewValuesSameSchema,
values: [
[100, 101],
['a', 'b'],
[1, 2],
],
},
],
});
expect(response).not.toBeUndefined();
const frame: StreamingDataFrame = response?.data[0];
expect(frame).toBeInstanceOf(StreamingDataFrame);
expect(frame.fields).toEqual([]);
});
});

View File

@ -1,14 +1,10 @@
import { BackendSrv, GrafanaLiveSrv, LiveDataStreamOptions } from '@grafana/runtime';
import { CentrifugeSrv } from './centrifuge/service';
import { BackendSrv, GrafanaLiveSrv } from '@grafana/runtime';
import { CentrifugeSrv, StreamingDataQueryResponse } from './centrifuge/service';
import { Observable } from 'rxjs';
import {
DataQueryResponse,
LiveChannelAddress,
LiveChannelEvent,
LiveChannelPresenceStatus,
toLiveChannelId,
} from '@grafana/data';
import { toLiveChannelId } from '@grafana/data';
import { StreamingDataFrame } from './data/StreamingDataFrame';
import { isStreamingResponseData, StreamingResponseDataType } from './data/utils';
import { map } from 'rxjs';
type GrafanaLiveServiceDeps = {
centrifugeSrv: CentrifugeSrv;
@ -21,42 +17,71 @@ export class GrafanaLiveService implements GrafanaLiveSrv {
/**
* Listen for changes to the connection state
*/
getConnectionState(): Observable<boolean> {
getConnectionState = () => {
return this.deps.centrifugeSrv.getConnectionState();
}
};
/**
* Connect to a channel and return results as DataFrames
*/
getDataStream(options: LiveDataStreamOptions): Observable<DataQueryResponse> {
return this.deps.centrifugeSrv.getDataStream(options);
}
getDataStream: GrafanaLiveSrv['getDataStream'] = (options) => {
let buffer: StreamingDataFrame;
const updateBuffer = (next: StreamingDataQueryResponse): void => {
const data = next.data[0];
if (!buffer && !isStreamingResponseData(data, StreamingResponseDataType.FullFrame)) {
console.warn(`expected first packet to contain a full frame, received ${data?.type}`);
return;
}
switch (data.type) {
case StreamingResponseDataType.FullFrame: {
buffer = StreamingDataFrame.deserialize(data.frame);
return;
}
case StreamingResponseDataType.NewValuesSameSchema: {
buffer.pushNewValues(data.values);
return;
}
}
};
return this.deps.centrifugeSrv.getDataStream(options).pipe(
map((next) => {
updateBuffer(next);
return {
...next,
data: [buffer ?? StreamingDataFrame.empty()],
};
})
);
};
/**
* Watch for messages in a channel
*/
getStream<T>(address: LiveChannelAddress): Observable<LiveChannelEvent<T>> {
return this.deps.centrifugeSrv.getStream<T>(address);
}
getStream: GrafanaLiveSrv['getStream'] = (address) => {
return this.deps.centrifugeSrv.getStream(address);
};
/**
* Publish into a channel
*
* @alpha -- experimental
*/
async publish(address: LiveChannelAddress, data: any): Promise<any> {
publish: GrafanaLiveSrv['publish'] = async (address, data) => {
return this.deps.backendSrv.post(`api/live/publish`, {
channel: toLiveChannelId(address), // orgId is from user
data,
});
}
};
/**
* For channels that support presence, this will request the current state from the server.
*
* Join and leave messages will be sent to the open stream
*/
async getPresence(address: LiveChannelAddress): Promise<LiveChannelPresenceStatus> {
getPresence: GrafanaLiveSrv['getPresence'] = (address) => {
return this.deps.centrifugeSrv.getPresence(address);
}
};
}
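A rough consumer-side sketch of the reworked getDataStream; the channel address and buffer size are assumptions for illustration, not part of the commit.
import { LiveChannelScope } from '@grafana/data';
import { getGrafanaLiveSrv } from '@grafana/runtime';
const subscription = getGrafanaLiveSrv()
  .getDataStream({
    addr: { scope: LiveChannelScope.Grafana, namespace: 'testdata', path: 'random-2s-stream' },
    buffer: { maxLength: 500 },
  })
  .subscribe((response) => {
    // data[0] is the StreamingDataFrame buffer that now survives across packets
    console.log('rows buffered:', response.data[0].length);
  });
// subscription.unsubscribe() when the consumer goes away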

View File

@ -3,7 +3,7 @@ import { Button, CodeEditor, Table, useStyles, Field } from '@grafana/ui';
import { ChannelFrame, Rule } from './types';
import { getBackendSrv, config } from '@grafana/runtime';
import { css } from '@emotion/css';
import { getDisplayProcessor, GrafanaTheme, StreamingDataFrame } from '@grafana/data';
import { dataFrameFromJSON, getDisplayProcessor, GrafanaTheme } from '@grafana/data';
interface Props {
rule: Rule;
@ -30,7 +30,7 @@ export const RuleTest: React.FC<Props> = (props) => {
if (t) {
setResponse(
t.map((f) => {
const frame = new StreamingDataFrame(f.frame);
const frame = dataFrameFromJSON(f.frame);
for (const field of frame.fields) {
field.display = getDisplayProcessor({ field, theme: config.theme2 });
}

View File

@ -36,6 +36,8 @@ import {
import { getDashboardQueryRunner } from './DashboardQueryRunner/DashboardQueryRunner';
import { mergePanelAndDashData } from './mergePanelAndDashData';
import { PanelModel } from '../../dashboard/state';
import { isStreamingDataFrame } from 'app/features/live/data/utils';
import { StreamingDataFrame } from 'app/features/live/data/StreamingDataFrame';
export interface QueryRunnerOptions<
TQuery extends DataQuery = DataQuery,
@ -56,6 +58,7 @@ export interface QueryRunnerOptions<
}
let counter = 100;
export function getNextRequestId() {
return 'Q' + counter++;
}
@ -83,11 +86,8 @@ export class PanelQueryRunner {
const { withFieldConfig, withTransforms } = options;
let structureRev = 1;
let lastData: DataFrame[] = [];
let processedCount = 0;
let isFirstPacket = true;
let lastConfigRev = -1;
const fastCompare = (a: DataFrame, b: DataFrame) => {
return compareDataFrameStructures(a, b, true);
};
if (this.dataConfigSource.snapshotData) {
const snapshotPanelData: PanelData = {
@ -102,24 +102,21 @@ export class PanelQueryRunner {
this.getTransformationsStream(withTransforms),
map((data: PanelData) => {
let processedData = data;
let sameStructure = false;
let streamingPacketWithSameSchema = false;
if (withFieldConfig && data.series?.length) {
// Apply field defaults and overrides
let fieldConfig = this.dataConfigSource.getFieldOverrideOptions();
let processFields = fieldConfig != null;
if (lastConfigRev === this.dataConfigSource.configRev) {
const streamingDataFrame = data.series.find((data) => isStreamingDataFrame(data)) as
| StreamingDataFrame
| undefined;
// If the shape is the same, we can skip field overrides
if (
data.state === LoadingState.Streaming &&
processFields &&
processedCount > 0 &&
lastData.length &&
lastConfigRev === this.dataConfigSource.configRev
) {
const sameTypes = compareArrayValues(lastData, processedData.series, fastCompare);
if (sameTypes) {
// Keep the previous field config settings
if (
streamingDataFrame &&
!streamingDataFrame.packetInfo.schemaChanged &&
// TODO: remove the condition below after fixing
// https://github.com/grafana/grafana/pull/41492#issuecomment-970281430
lastData[0].fields.length === streamingDataFrame.fields.length
) {
processedData = {
...processedData,
series: lastData.map((frame, frameIndex) => ({
@ -131,20 +128,21 @@ export class PanelQueryRunner {
state: {
...field.state,
calcs: undefined,
// add global range calculation here? (not optimal for streaming)
range: undefined,
},
})),
})),
};
processFields = false;
sameStructure = true;
streamingPacketWithSameSchema = true;
}
}
if (processFields) {
// Apply field defaults and overrides
let fieldConfig = this.dataConfigSource.getFieldOverrideOptions();
if (fieldConfig != null && (isFirstPacket || !streamingPacketWithSameSchema)) {
lastConfigRev = this.dataConfigSource.configRev!;
processedCount++; // results with data
processedData = {
...processedData,
series: applyFieldOverrides({
@ -153,16 +151,16 @@ export class PanelQueryRunner {
...fieldConfig!,
}),
};
isFirstPacket = false;
}
}
if (!sameStructure) {
sameStructure = compareArrayValues(lastData, processedData.series, compareDataFrameStructures);
}
if (!sameStructure) {
if (
!streamingPacketWithSameSchema &&
!compareArrayValues(lastData, processedData.series, compareDataFrameStructures)
) {
structureRev++;
}
lastData = processedData.series;
return { ...processedData, structureRev };

View File

@ -181,6 +181,19 @@ export function callQueryMethod(
return from(returnVal);
}
function getProcessedDataFrame(data: DataQueryResponseData): DataFrame {
const dataFrame = guessFieldTypes(toDataFrame(data));
if (dataFrame.fields && dataFrame.fields.length) {
// clear out the cached info
for (const field of dataFrame.fields) {
field.state = null;
}
}
return dataFrame;
}
/**
* All panels will be passed tables that have our best guess at column type set
*
@ -191,22 +204,7 @@ export function getProcessedDataFrames(results?: DataQueryResponseData[]): DataF
return [];
}
const dataFrames: DataFrame[] = [];
for (const result of results) {
const dataFrame = guessFieldTypes(toDataFrame(result));
if (dataFrame.fields && dataFrame.fields.length) {
// clear out the cached info
for (const field of dataFrame.fields) {
field.state = null;
}
}
dataFrames.push(dataFrame);
}
return dataFrames;
return results.map((data) => getProcessedDataFrame(data));
}
export function preProcessPanelData(data: PanelData, lastResult?: PanelData): PanelData {
@ -227,7 +225,7 @@ export function preProcessPanelData(data: PanelData, lastResult?: PanelData): Pa
// Make sure the data frames are properly formatted
const STARTTIME = performance.now();
const processedDataFrames = getProcessedDataFrames(series);
const processedDataFrames = series.map((data) => getProcessedDataFrame(data));
const annotationsProcessed = getProcessedDataFrames(annotations);
const STOPTIME = performance.now();

View File

@ -1,5 +1,11 @@
import { from, merge, Observable, of } from 'rxjs';
import { DataSourceWithBackend, getBackendSrv, getGrafanaLiveSrv, getTemplateSrv } from '@grafana/runtime';
import {
DataSourceWithBackend,
getBackendSrv,
getGrafanaLiveSrv,
getTemplateSrv,
StreamingFrameOptions,
} from '@grafana/runtime';
import {
AnnotationQuery,
AnnotationQueryRequest,
@ -10,7 +16,6 @@ import {
DataSourceRef,
isValidLiveChannelAddress,
parseLiveChannelAddress,
StreamingFrameOptions,
toDataFrame,
} from '@grafana/data';
@ -88,7 +93,7 @@ export class GrafanaDatasource extends DataSourceWithBackend<GrafanaQuery> {
if (!isValidLiveChannelAddress(addr)) {
continue;
}
const buffer: StreamingFrameOptions = {
const buffer: Partial<StreamingFrameOptions> = {
maxLength: request.maxDataPoints ?? 500,
};
if (target.buffer) {

View File

@ -9,7 +9,6 @@ import {
CSVReader,
Field,
LoadingState,
StreamingDataFrame,
DataFrameSchema,
DataFrameData,
} from '@grafana/data';
@ -17,6 +16,7 @@ import {
import { TestDataQuery, StreamingQuery } from './types';
import { getRandomLine } from './LogIpsum';
import { liveTimer } from 'app/features/dashboard/dashgrid/liveTimer';
import { StreamingDataFrame } from 'app/features/live/data/StreamingDataFrame';
export const defaultStreamQuery: StreamingQuery = {
type: 'signal',
@ -65,7 +65,7 @@ export function runSignalStream(
schema.fields.push({ name: 'Max' + suffix, type: FieldType.number });
}
const frame = new StreamingDataFrame({ schema }, { maxLength: maxDataPoints });
const frame = StreamingDataFrame.fromDataFrameJSON({ schema }, { maxLength: maxDataPoints });
let value = Math.random() * 100;
let timeoutId: any = null;

View File

@ -13,7 +13,6 @@ import {
PanelData,
LoadingState,
applyFieldOverrides,
StreamingDataFrame,
LiveChannelAddress,
} from '@grafana/data';
import { TablePanel } from '../table/TablePanel';
@ -21,6 +20,7 @@ import { LivePanelOptions, MessageDisplayMode } from './types';
import { config, getGrafanaLiveSrv } from '@grafana/runtime';
import { css, cx } from '@emotion/css';
import { isEqual } from 'lodash';
import { StreamingDataFrame } from 'app/features/live/data/StreamingDataFrame';
interface Props extends PanelProps<LivePanelOptions> {}

View File

@ -5,7 +5,7 @@ import { TimelineMode, TimelineOptions } from './types';
import { TimelineChart } from './TimelineChart';
import { prepareTimelineFields, prepareTimelineLegendItems } from './utils';
import { StateTimelineTooltip } from './StateTimelineTooltip';
import { getLastStreamingDataFramePacket } from '@grafana/data/src/dataframe/StreamingDataFrame';
import { getLastStreamingDataFramePacket } from 'app/features/live/data/StreamingDataFrame';
interface TimelinePanelProps extends PanelProps<TimelineOptions> {}