Live: Fix support for StreamingFrameAction.Replace (#46086)

* fix StreamingFrameAction.replace

* simplify

* beautify

* cover `streamingDataFrame.ts` changes with unit tests

* formatting

* cover `LiveDataStream.ts` changes with unit tests

* update frame length after pushing new values
Artur Wierzbicki 2022-03-04 14:04:30 +04:00 committed by GitHub
parent 1dca39fb91
commit bdbb9ed54a
4 changed files with 468 additions and 35 deletions
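
For context when reading the diffs below: `StreamingFrameAction.Replace` is supposed to make each incoming packet replace the buffered values, while `Append` keeps accumulating them up to `maxLength`. A minimal sketch of that distinction, using simplified stand-in types rather than the real `StreamingDataFrame` (the enum string values and the `pushNewValues` helper here are illustrative assumptions):

```typescript
// Illustrative stand-ins only; the real types live in the Grafana packages.
enum StreamingFrameAction {
  Append = 'append',
  Replace = 'replace',
}

interface BufferOptions {
  maxLength: number;
  action: StreamingFrameAction;
}

// Push one packet of column-oriented values into per-field buffers.
function pushNewValues(buffers: unknown[][], packet: unknown[][], options: BufferOptions): unknown[][] {
  return buffers.map((buffer, i) => {
    const incoming = packet[i] ?? [];
    if (options.action === StreamingFrameAction.Append) {
      // Append mode: keep history, drop the oldest values beyond maxLength.
      return [...buffer, ...incoming].slice(-options.maxLength);
    }
    // Replace mode: the newest packet becomes the whole buffer.
    return incoming.slice(-options.maxLength);
  });
}

// Matches the behaviour the new unit tests assert: a 3-value buffer that
// receives a 2-value packet keeps all five values in append mode but only
// the two new ones in replace mode.
const appended = pushNewValues([[100, 200, 300]], [[601, 602]], { maxLength: 5, action: StreamingFrameAction.Append });
const replaced = pushNewValues([[100, 200, 300]], [[601, 602]], { maxLength: 5, action: StreamingFrameAction.Replace });
console.log(appended[0]); // [100, 200, 300, 601, 602]
console.log(replaced[0]); // [601, 602]
```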

View File

@ -192,6 +192,17 @@ describe('LiveDataStream', () => {
action: StreamingFrameAction.Append,
},
},
withReplaceMode: {
addr: dummyLiveChannelAddress,
buffer: {
maxLength: 5,
maxDelta: 10,
action: StreamingFrameAction.Replace,
},
filter: {
fields: ['time', 'b'],
},
},
};
const dataFrameJsons = {
@ -216,6 +227,11 @@ describe('LiveDataStream', () => {
values: [[102], ['c'], [3]],
},
}),
schema1newValues2: () => ({
data: {
values: [[103], ['d'], [4]],
},
}),
schema2: () => ({
schema: {
fields: [
@ -235,7 +251,7 @@ describe('LiveDataStream', () => {
}),
};
describe('happy path with a single subscriber', () => {
describe('happy path with a single subscriber in `append` mode', () => {
let deps: ReturnType<typeof createDeps>;
let liveDataStream: LiveDataStream<any>;
const valuesCollection = new ValuesCollection<DataQueryResponse>();
@ -477,6 +493,229 @@ describe('LiveDataStream', () => {
});
});
describe('happy path with a single subscriber in `replace` mode', () => {
let deps: ReturnType<typeof createDeps>;
let liveDataStream: LiveDataStream<any>;
const valuesCollection = new ValuesCollection<DataQueryResponse>();
beforeAll(() => {
deps = createDeps();
expect(deps.liveEventsObservable.observed).toBeFalsy();
expect(deps.subscriberReadiness.observed).toBeFalsy();
liveDataStream = new LiveDataStream(deps);
valuesCollection.subscribeTo(liveDataStream.get(liveDataStreamOptions.withReplaceMode, subscriptionKey));
});
it('should emit the first live channel message event as a serialized streamingDataFrame', async () => {
const valuesCount = valuesCollection.valuesCount();
deps.liveEventsObservable.next(liveChannelMessageEvent(dataFrameJsons.schema1()));
expectValueCollectionState(valuesCollection, { errors: 0, values: valuesCount + 1, complete: false });
const response = valuesCollection.lastValue();
expectStreamingResponse(response, StreamingResponseDataType.FullFrame);
const data = response.data[0] as StreamingResponseData<StreamingResponseDataType.FullFrame>;
expect(data.frame.options).toEqual(liveDataStreamOptions.withReplaceMode.buffer);
const deserializedFrame = StreamingDataFrame.deserialize(data.frame);
expect(deserializedFrame.fields).toEqual([
{
config: {},
name: 'time',
type: 'time',
values: {
buffer: [100, 101],
},
},
{
config: {},
name: 'b',
type: 'number',
values: {
buffer: [1, 2],
},
},
]);
expect(deserializedFrame.length).toEqual(dataFrameJsons.schema1().data.values[0].length);
});
it('should emit subsequent messages as deltas if the schema stays the same', async () => {
const valuesCount = valuesCollection.valuesCount();
deps.liveEventsObservable.next(liveChannelMessageEvent(dataFrameJsons.schema1newValues()));
expectValueCollectionState(valuesCollection, { errors: 0, values: valuesCount + 1, complete: false });
const response = valuesCollection.lastValue();
expectStreamingResponse(response, StreamingResponseDataType.NewValuesSameSchema);
const data = response.data[0] as StreamingResponseData<StreamingResponseDataType.NewValuesSameSchema>;
expect(data.values).toEqual([[102], [3]]);
});
it('should emit a full frame if schema changes', async () => {
const valuesCount = valuesCollection.valuesCount();
deps.liveEventsObservable.next(liveChannelMessageEvent(dataFrameJsons.schema2()));
expectValueCollectionState(valuesCollection, { errors: 0, values: valuesCount + 1, complete: false });
const response = valuesCollection.lastValue();
expectStreamingResponse(response, StreamingResponseDataType.FullFrame);
const data = response.data[0] as StreamingResponseData<StreamingResponseDataType.FullFrame>;
expect(fieldsOf(data)).toEqual([
{
name: 'time',
values: [103],
},
{
name: 'b',
values: ['y'],
},
]);
});
it('should emit a full frame if received a status live channel event with error', async () => {
const valuesCount = valuesCollection.valuesCount();
const error = new Error(`oh no!`);
deps.liveEventsObservable.next(liveChannelStatusEvent(LiveChannelConnectionState.Connected, error));
expectValueCollectionState(valuesCollection, {
errors: 0,
values: valuesCount + 1,
complete: false,
});
const response = valuesCollection.lastValue();
expectErrorResponse(response, StreamingResponseDataType.FullFrame);
});
it('should buffer new values until subscriber is ready', async () => {
const valuesCount = valuesCollection.valuesCount();
deps.subscriberReadiness.next(false);
deps.liveEventsObservable.next(liveChannelMessageEvent(dataFrameJsons.schema2newValues()));
expectValueCollectionState(valuesCollection, { errors: 0, values: valuesCount, complete: false });
deps.liveEventsObservable.next(liveChannelMessageEvent(dataFrameJsons.schema2newValues()));
expectValueCollectionState(valuesCollection, { errors: 0, values: valuesCount, complete: false });
deps.liveEventsObservable.next(liveChannelMessageEvent(dataFrameJsons.schema2newValues()));
expectValueCollectionState(valuesCollection, { errors: 0, values: valuesCount, complete: false });
deps.subscriberReadiness.next(true);
expectValueCollectionState(valuesCollection, { errors: 0, values: valuesCount + 1, complete: false });
const response = valuesCollection.lastValue();
expectStreamingResponse(response, StreamingResponseDataType.NewValuesSameSchema);
const data = response.data[0] as StreamingResponseData<StreamingResponseDataType.NewValuesSameSchema>;
expect(data.values).toEqual([[104], ['o']]);
});
it(`should reduce buffer to a full frame if schema changed at any point during subscriber's unavailability`, async () => {
const valuesCount = valuesCollection.valuesCount();
deps.subscriberReadiness.next(false);
deps.liveEventsObservable.next(liveChannelMessageEvent(dataFrameJsons.schema2newValues()));
expectValueCollectionState(valuesCollection, { errors: 0, values: valuesCount, complete: false });
deps.liveEventsObservable.next(liveChannelMessageEvent(dataFrameJsons.schema2newValues()));
expectValueCollectionState(valuesCollection, { errors: 0, values: valuesCount, complete: false });
deps.liveEventsObservable.next(liveChannelMessageEvent(dataFrameJsons.schema1()));
expectValueCollectionState(valuesCollection, { errors: 0, values: valuesCount, complete: false });
deps.liveEventsObservable.next(liveChannelMessageEvent(dataFrameJsons.schema1newValues()));
expectValueCollectionState(valuesCollection, { errors: 0, values: valuesCount, complete: false });
deps.subscriberReadiness.next(true);
expectValueCollectionState(valuesCollection, { errors: 0, values: valuesCount + 1, complete: false });
const response = valuesCollection.lastValue();
expectStreamingResponse(response, StreamingResponseDataType.FullFrame);
expect(fieldsOf(response.data[0])).toEqual([
{
name: 'time',
values: [102],
},
{
name: 'b',
values: [3],
},
]);
});
it(`should reduce buffer to an empty full frame with last error if one or more errors occur during subscriber's unavailability`, async () => {
const firstError = new Error('first error');
const secondError = new Error(dummyErrorMessage);
const valuesCount = valuesCollection.valuesCount();
deps.subscriberReadiness.next(false);
deps.liveEventsObservable.next(liveChannelMessageEvent(dataFrameJsons.schema1newValues()));
deps.liveEventsObservable.next(liveChannelMessageEvent(dataFrameJsons.schema1newValues()));
deps.liveEventsObservable.next(liveChannelStatusEvent(LiveChannelConnectionState.Connected, firstError));
deps.liveEventsObservable.next(liveChannelMessageEvent(dataFrameJsons.schema1newValues()));
deps.liveEventsObservable.next(liveChannelStatusEvent(LiveChannelConnectionState.Connected, secondError));
deps.subscriberReadiness.next(true);
expectValueCollectionState(valuesCollection, { errors: 0, values: valuesCount + 1, complete: false });
const response = valuesCollection.lastValue();
expectErrorResponse(response, StreamingResponseDataType.FullFrame);
const errorMessage = response?.error?.message;
expect(errorMessage?.includes(dummyErrorMessage)).toBeTruthy();
expect(fieldsOf(response.data[0])).toEqual([
{
name: 'time',
values: [],
},
{
name: 'b',
values: [],
},
]);
});
it('should ignore messages without payload', async () => {
const valuesCount = valuesCollection.valuesCount();
deps.liveEventsObservable.next(liveChannelStatusEvent(LiveChannelConnectionState.Connected));
deps.liveEventsObservable.next(liveChannelStatusEvent(LiveChannelConnectionState.Pending));
deps.liveEventsObservable.next(liveChannelStatusEvent(LiveChannelConnectionState.Pending));
deps.liveEventsObservable.next(liveChannelLeaveEvent());
expectValueCollectionState(valuesCollection, { errors: 0, values: valuesCount, complete: false });
});
it(`should shutdown when source observable completes`, async () => {
expect(deps.onShutdown).not.toHaveBeenCalled();
expect(deps.subscriberReadiness.observed).toBeTruthy();
expect(deps.liveEventsObservable.observed).toBeTruthy();
deps.liveEventsObservable.complete();
expectValueCollectionState(valuesCollection, {
errors: 0,
values: valuesCollection.valuesCount(),
complete: true,
});
expect(deps.subscriberReadiness.observed).toBeFalsy();
expect(deps.liveEventsObservable.observed).toBeFalsy();
expect(deps.onShutdown).toHaveBeenCalled();
});
});
describe('single subscriber with initial frame', () => {
it('should emit the initial frame right after subscribe', async () => {
const deps = createDeps();
@ -614,6 +853,7 @@ describe('LiveDataStream', () => {
withTimeBFilter: new ValuesCollection<DataQueryResponse>(),
withTimeAFilter: new ValuesCollection<DataQueryResponse>(),
withoutFilter: new ValuesCollection<DataQueryResponse>(),
withReplaceMode: new ValuesCollection<DataQueryResponse>(),
};
beforeAll(() => {
@ -637,16 +877,18 @@ describe('LiveDataStream', () => {
valuesCollections.withoutFilter.subscribeTo(
liveDataStream.get(liveDataStreamOptions.withoutFilter, subscriptionKey)
);
console.log(JSON.stringify(valuesCollections.withTimeAFilter, null, 2));
valuesCollections.withReplaceMode.subscribeTo(
liveDataStream.get(liveDataStreamOptions.withReplaceMode, subscriptionKey)
);
expectValueCollectionState(valuesCollections.withTimeAFilter, { errors: 0, values: 2, complete: false });
expectValueCollectionState(valuesCollections.withTimeBFilter, { errors: 0, values: 1, complete: false });
expectValueCollectionState(valuesCollections.withoutFilter, { errors: 0, values: 1, complete: false });
expectValueCollectionState(valuesCollections.withReplaceMode, { errors: 0, values: 1, complete: false });
});
it('should emit filtered data to each subscriber', async () => {
deps.liveEventsObservable.next(liveChannelMessageEvent(dataFrameJsons.schema1newValues()));
deps.liveEventsObservable.next(liveChannelMessageEvent(dataFrameJsons.schema1newValues2()));
expect(
mapValues(valuesCollections, (collection) =>
collection.values.map((response) => {
@ -671,7 +913,7 @@ describe('LiveDataStream', () => {
},
],
[[102], ['c']],
[[102], ['c']],
[[103], ['d']],
],
withTimeBFilter: [
[
@ -684,7 +926,7 @@ describe('LiveDataStream', () => {
values: [2, 3],
},
],
[[102], [3]],
[[103], [4]],
],
withoutFilter: [
[
@ -701,7 +943,21 @@ describe('LiveDataStream', () => {
values: [1, 2, 3],
},
],
[[102], ['c'], [3]],
[[103], ['d'], [4]],
],
withReplaceMode: [
// only last packet
[
{
name: 'time',
values: [102],
},
{
name: 'b',
values: [3],
},
],
[[103], [4]],
],
});
});
@ -714,6 +970,7 @@ describe('LiveDataStream', () => {
withTimeAFilter: true,
withTimeBFilter: false,
withoutFilter: false,
withReplaceMode: false,
});
expect(deps.subscriberReadiness.observed).toBeTruthy();
expect(deps.liveEventsObservable.observed).toBeTruthy();
@ -727,6 +984,7 @@ describe('LiveDataStream', () => {
withTimeAFilter: true,
withTimeBFilter: true,
withoutFilter: true,
withReplaceMode: true,
});
expect(deps.subscriberReadiness.observed).toBeFalsy();
expect(deps.liveEventsObservable.observed).toBeFalsy();

View File

@ -1,4 +1,4 @@
import type { LiveDataStreamOptions, StreamingFrameOptions } from '@grafana/runtime/src/services/live';
import { LiveDataStreamOptions, StreamingFrameAction, StreamingFrameOptions } from '@grafana/runtime/src/services/live';
import { toDataQueryError } from '@grafana/runtime/src/utils/toDataQueryError';
import {
DataFrameJSON,
@ -228,30 +228,89 @@ export class LiveDataStream<T = unknown> {
this.resizeBuffer(buffer);
this.prepareInternalStreamForNewSubscription(options);
const shouldSendLastPacketOnly = options?.buffer?.action === StreamingFrameAction.Replace;
const fieldsNamesFilter = options.filter?.fields;
const dataNeedsFiltering = fieldsNamesFilter?.length;
const fieldFilterPredicate = dataNeedsFiltering ? ({ name }: Field) => fieldsNamesFilter.includes(name) : undefined;
let matchingFieldIndexes: number[] | undefined = undefined;
const getFullFrameResponseData = (error?: DataQueryError): StreamingDataQueryResponse => {
const getFullFrameResponseData = <T>(
messages: InternalStreamMessage[],
error?: DataQueryError
): StreamingDataQueryResponse => {
matchingFieldIndexes = fieldFilterPredicate
? this.frameBuffer.getMatchingFieldIndexes(fieldFilterPredicate)
: undefined;
if (!shouldSendLastPacketOnly) {
return {
key: subKey,
state: error ? LoadingState.Error : LoadingState.Streaming,
data: [
{
type: StreamingResponseDataType.FullFrame,
frame: this.frameBuffer.serialize(fieldFilterPredicate, buffer),
},
],
error,
};
}
if (error) {
// send empty frame with error
return {
key: subKey,
state: LoadingState.Error,
data: [
{
type: StreamingResponseDataType.FullFrame,
frame: this.frameBuffer.serialize(fieldFilterPredicate, buffer, { maxLength: 0 }),
},
],
error,
};
}
if (!messages.length) {
console.warn(`expected to find at least one non error message ${messages.map(({ type }) => type)}`);
// send empty frame
return {
key: subKey,
state: LoadingState.Streaming,
data: [
{
type: StreamingResponseDataType.FullFrame,
frame: this.frameBuffer.serialize(fieldFilterPredicate, buffer, { maxLength: 0 }),
},
],
error,
};
}
return {
key: subKey,
state: error ? LoadingState.Error : LoadingState.Streaming,
state: LoadingState.Streaming,
data: [
{
type: StreamingResponseDataType.FullFrame,
frame: this.frameBuffer.serialize(fieldFilterPredicate, buffer),
frame: this.frameBuffer.serialize(fieldFilterPredicate, buffer, {
maxLength: this.frameBuffer.packetInfo.length,
}),
},
],
error,
};
};
const getNewValuesSameSchemaResponseData = (values: unknown[][]): StreamingDataQueryResponse => {
const getNewValuesSameSchemaResponseData = (
messages: Array<InternalStreamMessage<InternalStreamMessageType.NewValuesSameSchema>>
): StreamingDataQueryResponse => {
const lastMessage = messages.length ? messages[messages.length - 1] : undefined;
const values =
shouldSendLastPacketOnly && lastMessage
? lastMessage.values
: reduceNewValuesSameSchemaMessages(messages).values;
const filteredValues = matchingFieldIndexes
? values.filter((v, i) => (matchingFieldIndexes as number[]).includes(i))
: values;
@ -277,18 +336,18 @@ export class LiveDataStream<T = unknown> {
if (shouldSendFullFrame) {
shouldSendFullFrame = false;
return getFullFrameResponseData(lastError);
return getFullFrameResponseData(messages, lastError);
}
if (errors.length) {
// send the latest frame with the last error, discard everything else
return getFullFrameResponseData(lastError);
return getFullFrameResponseData(messages, lastError);
}
const schemaChanged = messages.some((n) => n.type === InternalStreamMessageType.ChangedSchema);
if (schemaChanged) {
// send the latest frame, discard intermediate appends
return getFullFrameResponseData();
return getFullFrameResponseData(messages, undefined);
}
const newValueSameSchemaMessages = filterMessages(messages, InternalStreamMessageType.NewValuesSameSchema);
@ -296,7 +355,7 @@ export class LiveDataStream<T = unknown> {
console.warn(`unsupported message type ${messages.map(({ type }) => type)}`);
}
return getNewValuesSameSchemaResponseData(reduceNewValuesSameSchemaMessages(newValueSameSchemaMessages).values);
return getNewValuesSameSchemaResponseData(newValueSameSchemaMessages);
})
);
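
The `LiveDataStream.ts` hunks above implement the subscriber-facing half of the change: while a subscriber is not ready, incoming messages are buffered, and on flush the buffered `NewValuesSameSchema` messages are reduced differently per mode; replace mode forwards only the most recent packet. A rough sketch of the two reductions, with a simplified message shape standing in for the real `InternalStreamMessage`:

```typescript
// Hypothetical, trimmed-down message shape for illustration.
interface NewValuesMessage {
  values: unknown[][]; // column-oriented values, one array per field
}

// Append mode: concatenate buffered packets column by column.
function reduceAppend(messages: NewValuesMessage[]): unknown[][] {
  return messages.reduce<unknown[][]>((acc, { values }) => {
    values.forEach((column, i) => {
      acc[i] = [...(acc[i] ?? []), ...column];
    });
    return acc;
  }, []);
}

// Replace mode: intermediate packets are obsolete, only the last one counts.
function reduceReplace(messages: NewValuesMessage[]): unknown[][] {
  const last = messages[messages.length - 1];
  return last ? last.values : [];
}

const buffered = [{ values: [[102], [3]] }, { values: [[103], [4]] }];
console.log(reduceAppend(buffered));  // [[102, 103], [3, 4]]
console.log(reduceReplace(buffered)); // [[103], [4]]
```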

View File

@ -367,7 +367,8 @@ describe('Streaming JSON', () => {
});
it('should resize the buffer', function () {
const serializedFrame = frame.serialize((f) => ['time', 'name'].includes(f.name), { maxLength: 2 });
const options = { maxLength: 2 };
const serializedFrame = frame.serialize((f) => ['time', 'name'].includes(f.name), options);
expect(serializedFrame.fields).toEqual([
{
config: {},
@ -383,6 +384,48 @@ describe('Streaming JSON', () => {
},
]);
});
it('should trim values and retain option override values', function () {
const options = { maxLength: 2 };
const trimValues = { maxLength: 1 };
const serializedFrame = frame.serialize((f) => ['time', 'name'].includes(f.name), options, trimValues);
expect(serializedFrame.fields).toEqual([
{
config: {},
name: 'time',
type: 'time',
values: [300],
},
{
config: {},
name: 'name',
type: 'string',
values: ['c'],
},
]);
expect(serializedFrame.options.maxLength).toEqual(options.maxLength);
});
it('should use maxLength from options if its lower than maxLength from trimValues', function () {
const options = { maxLength: 1 };
const trimValues = { maxLength: 2 };
const serializedFrame = frame.serialize((f) => ['time', 'name'].includes(f.name), options, trimValues);
expect(serializedFrame.fields).toEqual([
{
config: {},
name: 'time',
type: 'time',
values: [300],
},
{
config: {},
name: 'name',
type: 'string',
values: ['c'],
},
]);
expect(serializedFrame.options.maxLength).toEqual(options.maxLength);
});
});
describe('resizing', function () {
@ -501,14 +544,20 @@ describe('Streaming JSON', () => {
},
};
const serializedFrame = StreamingDataFrame.fromDataFrameJSON(json, {
maxLength: 5,
maxDelta: 300,
}).serialize();
it('should support pushing new values matching the existing schema', function () {
const frame = StreamingDataFrame.deserialize(serializedFrame);
frame.pushNewValues([[601], ['x'], [10]]);
it('should support pushing new values matching the existing schema in `append` mode', function () {
const frame = StreamingDataFrame.deserialize(
StreamingDataFrame.fromDataFrameJSON(json, {
maxLength: 5,
maxDelta: 300,
}).serialize()
);
expect(frame.length).toEqual(3);
frame.pushNewValues([
[601, 602],
['x', 'y'],
[10, 11],
]);
expect(frame.length).toEqual(3);
expect(frame.fields.map((f) => ({ name: f.name, value: f.values.buffer }))).toMatchInlineSnapshot(`
Array [
Object {
@ -516,6 +565,7 @@ describe('Streaming JSON', () => {
"value": Array [
300,
601,
602,
],
},
Object {
@ -523,6 +573,7 @@ describe('Streaming JSON', () => {
"value": Array [
"c",
"x",
"y",
],
},
Object {
@ -530,6 +581,49 @@ describe('Streaming JSON', () => {
"value": Array [
3,
10,
11,
],
},
]
`);
});
it('should support pushing new values matching the existing schema in `replace` mode', function () {
const frame = StreamingDataFrame.deserialize(
StreamingDataFrame.fromDataFrameJSON(json, {
maxLength: 5,
maxDelta: 300,
action: StreamingFrameAction.Replace,
}).serialize()
);
expect(frame.length).toEqual(3);
frame.pushNewValues([
[601, 602],
['x', 'y'],
[10, 11],
]);
expect(frame.length).toEqual(2);
expect(frame.fields.map((f) => ({ name: f.name, value: f.values.buffer }))).toMatchInlineSnapshot(`
Array [
Object {
"name": "time",
"value": Array [
601,
602,
],
},
Object {
"name": "name",
"value": Array [
"x",
"y",
],
},
Object {
"name": "value",
"value": Array [
10,
11,
],
},
]
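
Among the tests above, the two new `serialize()` cases pin down the trimming rule: the number of serialized values is capped by the lower of `trimValues.maxLength` and `options.maxLength`, while the serialized `options.maxLength` itself is left untouched. The rule reduces to a one-liner, mirroring the ternary added in `streamingDataFrame.ts` below:

```typescript
// Effective cap applied when trimming serialized values.
function effectiveMaxLength(optionsMaxLength: number, trimMaxLength?: number): number {
  return typeof trimMaxLength === 'number' ? Math.min(trimMaxLength, optionsMaxLength) : optionsMaxLength;
}

console.log(effectiveMaxLength(2, 1)); // 1 -> only the newest value survives
console.log(effectiveMaxLength(1, 2)); // 1 -> options.maxLength still wins when lower
console.log(effectiveMaxLength(5));    // 5 -> no trimValues, no extra trimming
```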

View File

@ -90,14 +90,17 @@ export class StreamingDataFrame implements DataFrame {
serialize = (
fieldPredicate?: (f: Field) => boolean,
optionsOverride?: Partial<StreamingFrameOptions>
optionsOverride?: Partial<StreamingFrameOptions>,
trimValues?: {
maxLength?: number;
}
): SerializedStreamingDataFrame => {
const options = optionsOverride ? Object.assign({}, { ...this.options, ...optionsOverride }) : this.options;
const dataFrameDTO = toFilteredDataFrameDTO(this, fieldPredicate);
const numberOfItemsToRemove = getNumberOfItemsToRemove(
dataFrameDTO.fields.map((f) => f.values) as unknown[][],
options.maxLength,
typeof trimValues?.maxLength === 'number' ? Math.min(trimValues.maxLength, options.maxLength) : options.maxLength,
this.timeFieldIndex,
options.maxDelta
);
@ -357,18 +360,37 @@ export class StreamingDataFrame implements DataFrame {
return;
}
this.packetInfo.action = StreamingFrameAction.Append;
this.packetInfo.action = this.options.action;
this.packetInfo.number++;
this.packetInfo.length = values[0].length;
this.packetInfo.schemaChanged = false;
circPush(
this.fields.map((f) => f.values.buffer),
values,
this.options.maxLength,
this.timeFieldIndex,
this.options.maxDelta
);
if (this.options.action === StreamingFrameAction.Append) {
circPush(
this.fields.map((f) => f.values.buffer),
values,
this.options.maxLength,
this.timeFieldIndex,
this.options.maxDelta
);
} else {
values.forEach((v, i) => {
if (this.fields[i]?.values) {
this.fields[i].values.buffer = v;
}
});
assureValuesAreWithinLengthLimit(
this.fields.map((f) => f.values.buffer),
this.options.maxLength,
this.timeFieldIndex,
this.options.maxDelta
);
}
const newLength = this.fields?.[0]?.values?.buffer?.length;
if (newLength !== undefined) {
this.length = newLength;
}
};
resetStateCalculations = () => {
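
The closing hunk also recomputes `length` from the field buffers instead of trusting the incoming packet, since replace mode and the `maxLength`/`maxDelta` trimming can leave fewer values in the buffer than were pushed. A small illustration of why the buffer, not the packet, is the source of truth (hypothetical helper, not code from the PR):

```typescript
// After pushNewValues, the frame length must reflect what remains in the
// buffers; trimming may have dropped part of the incoming packet.
function frameLengthAfterPush(fieldBuffers: unknown[][]): number {
  // All field buffers are kept in sync, so the first one defines the length.
  return fieldBuffers[0]?.length ?? 0;
}

// Example: a 2-value packet pushed into a replace-mode frame with maxLength = 1
// leaves a single value per field, so the frame length becomes 1, not 2.
console.log(frameLengthAfterPush([[602], ['y'], [11]])); // 1
```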