Live: fix StreamingDataFrame length on replace action and schema change (#53796)

* streaming-data-frame

* add more expect.length in tests
This commit is contained in:
Artur Wierzbicki 2022-08-18 00:58:38 +04:00 committed by GitHub
parent 070393075a
commit 2a0ae74f96
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 13 additions and 1 deletions

View File

@ -353,6 +353,7 @@ describe('LiveDataStream', () => {
values: [undefined, 'y'], // bug in streamingDataFrame - fix!
},
]);
expect(StreamingDataFrame.deserialize(data.frame).length).toEqual(2);
});
it('should emit a full frame if received a status live channel event with error', async () => {
@ -369,6 +370,7 @@ describe('LiveDataStream', () => {
const response = valuesCollection.lastValue();
expectErrorResponse(response, StreamingResponseDataType.FullFrame);
expect(StreamingDataFrame.deserialize(response.data[0].frame).length).toEqual(2); // contains previously populated values
});
it('should buffer new values until subscriber is ready', async () => {
@ -431,6 +433,7 @@ describe('LiveDataStream', () => {
values: [2, 3],
},
]);
expect(StreamingDataFrame.deserialize(response.data[0].frame).length).toEqual(2);
});
it(`should reduce buffer to a full frame with last error if one or more errors occur during subscriber's unavailability`, async () => {
@ -579,6 +582,8 @@ describe('LiveDataStream', () => {
values: ['y'],
},
]);
const deserializedFrame = StreamingDataFrame.deserialize(data.frame);
expect(deserializedFrame.length).toEqual(1);
});
it('should emit a full frame if received a status live channel event with error', async () => {
@ -595,6 +600,7 @@ describe('LiveDataStream', () => {
const response = valuesCollection.lastValue();
expectErrorResponse(response, StreamingResponseDataType.FullFrame);
expect(StreamingDataFrame.deserialize(response.data[0].frame).length).toEqual(0);
});
it('should buffer new values until subscriber is ready', async () => {
@ -654,6 +660,8 @@ describe('LiveDataStream', () => {
values: [3],
},
]);
const data = response.data[0] as StreamingResponseData<StreamingResponseDataType.FullFrame>;
expect(StreamingDataFrame.deserialize(data.frame).length).toEqual(1);
});
it(`should reduce buffer to an empty full frame with last error if one or more errors occur during subscriber's unavailability`, async () => {
@ -688,6 +696,7 @@ describe('LiveDataStream', () => {
values: [],
},
]);
expect(StreamingDataFrame.deserialize(response.data[0].frame).length).toEqual(0);
});
it('should ignore messages without payload', async () => {
@ -751,6 +760,7 @@ describe('LiveDataStream', () => {
values: ['y'], // bug in streamingDataFrame - fix!
},
]);
expect(StreamingDataFrame.deserialize(response.data[0].frame).length).toEqual(1);
});
});

View File

@ -110,6 +110,8 @@ export class StreamingDataFrame implements DataFrame {
values: (f.values as unknown[]).slice(numberOfItemsToRemove),
}));
const length = dataFrameDTO.fields[0]?.values?.length ?? 0
return {
...dataFrameDTO,
// TODO: Labels and schema are not filtered by field
@ -119,7 +121,7 @@ export class StreamingDataFrame implements DataFrame {
name: this.name,
refId: this.refId,
meta: this.meta,
length: this.length,
length,
timeFieldIndex: this.timeFieldIndex,
pushMode: this.pushMode,
packetInfo: this.packetInfo,