StreamingDataFrame: use concat/slice, add maxDelta support (#32047)

This commit is contained in:
Leon Sorokin 2021-03-22 18:44:05 -05:00 committed by GitHub
parent fbe5f1ce4b
commit 8c4cbd39aa
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 215 additions and 58 deletions

View File

@ -1,28 +1,32 @@
import { FieldType } from '../types/dataFrame';
import { DataFrame, FieldType } from '../types/dataFrame';
import { DataFrameJSON } from './DataFrameJSON';
import { StreamingDataFrame } from './StreamingDataFrame';
describe('Streaming JSON', () => {
describe('when called with a DataFrame', () => {
it('should decode values not supported natively in JSON (e.g. NaN, Infinity)', () => {
const json: DataFrameJSON = {
schema: {
fields: [
{ name: 'time', type: FieldType.time },
{ name: 'name', type: FieldType.string },
{ name: 'value', type: FieldType.number },
],
},
data: {
values: [
[100, 200, 300],
['a', 'b', 'c'],
[1, 2, 3],
],
},
};
const json: DataFrameJSON = {
schema: {
fields: [
{ name: 'time', type: FieldType.time },
{ name: 'name', type: FieldType.string },
{ name: 'value', type: FieldType.number },
],
},
data: {
values: [
[100, 200, 300],
['a', 'b', 'c'],
[1, 2, 3],
],
},
};
const stream = new StreamingDataFrame(json);
const stream = new StreamingDataFrame(json, {
maxLength: 5,
maxDelta: 300,
});
it('should create frame with schema & data', () => {
expect(stream.fields.map((f) => ({ name: f.name, value: f.values.buffer }))).toMatchInlineSnapshot(`
Array [
Object {
@ -51,8 +55,10 @@ describe('Streaming JSON', () => {
},
]
`);
});
stream.update({
it('should append new data to frame', () => {
stream.push({
data: {
values: [[400], ['d'], [4]],
},
@ -90,5 +96,117 @@ describe('Streaming JSON', () => {
]
`);
});
it('should append new data and slice based on maxDelta', () => {
stream.push({
data: {
values: [[500], ['e'], [5]],
},
});
expect(stream.fields.map((f) => ({ name: f.name, value: f.values.buffer }))).toMatchInlineSnapshot(`
Array [
Object {
"name": "time",
"value": Array [
200,
300,
400,
500,
],
},
Object {
"name": "name",
"value": Array [
"b",
"c",
"d",
"e",
],
},
Object {
"name": "value",
"value": Array [
2,
3,
4,
5,
],
},
]
`);
});
it('should append new data and slice based on maxLength', () => {
stream.push({
data: {
values: [
[501, 502, 503],
['f', 'g', 'h'],
[6, 7, 8, 9],
],
},
});
expect(stream.fields.map((f) => ({ name: f.name, value: f.values.buffer }))).toMatchInlineSnapshot(`
Array [
Object {
"name": "time",
"value": Array [
400,
500,
501,
502,
503,
],
},
Object {
"name": "name",
"value": Array [
"d",
"e",
"f",
"g",
"h",
],
},
Object {
"name": "value",
"value": Array [
4,
5,
6,
7,
8,
9,
],
},
]
`);
});
});
describe('lengths property is accurate', () => {
const stream = new StreamingDataFrame(
{
schema: {
fields: [{ name: 'simple', type: FieldType.number }],
},
data: {
values: [[100]],
},
},
{
maxLength: 5,
}
);
expect(stream.length).toEqual(1);
stream.push({
data: { values: [[200]] },
});
expect(stream.length).toEqual(2);
const copy = ({ ...stream } as any) as DataFrame;
expect(copy.length).toEqual(2);
});
});

View File

@ -3,12 +3,70 @@ import { QueryResultMeta } from '../types';
import { ArrayVector } from '../vector';
import { DataFrameJSON, decodeFieldValueEntities } from './DataFrameJSON';
// Binary search for the index of the value in `arr` closest to `num`.
// Assumes `arr` is sorted ascending. `lo`/`hi` optionally bound the search
// (inclusive); both default to the full array. Ties go to the lower index.
function closestIdx(num: number, arr: number[], lo?: number, hi?: number) {
  let mid;
  // Explicit undefined checks: the previous `lo || 0` / `hi || arr.length - 1`
  // pattern treated an explicitly-passed 0 as "unset", so hi = 0 was ignored.
  lo = lo === undefined ? 0 : lo;
  hi = hi === undefined ? arr.length - 1 : hi;
  // Bitwise >> 1 is faster but only safe while indices fit in int32.
  let bitwise = hi <= 2147483647;
  while (hi - lo > 1) {
    mid = bitwise ? (lo + hi) >> 1 : Math.floor((lo + hi) / 2);
    if (arr[mid] < num) {
      lo = mid;
    } else {
      hi = mid;
    }
  }
  // Converged to two neighbors; return whichever is closer (lower wins ties).
  if (num - arr[lo] <= arr[hi] - num) {
    return lo;
  }
  return hi;
}
// Mutable circular push: appends newData columns onto data columns, then
// trims the oldest rows so that (a) at most `maxLength` rows remain and
// (b) the delta column (index `deltaIdx`, typically time) spans at most
// `maxDelta` from its newest value. Mutates and returns `data`.
function circPush(data: number[][], newData: number[][], maxLength = Infinity, deltaIdx = 0, maxDelta = Infinity) {
  // Append incoming values column-by-column (reassigns data[i] in place).
  data.forEach((col, i) => {
    data[i] = col.concat(newData[i]);
  });

  const totalLen = data[0].length;

  // First trim candidate: enforce the row-count cap.
  let trimFrom = totalLen > maxLength ? totalLen - maxLength : 0;

  // Second trim candidate: enforce the max span of the delta column,
  // measured from its newest value back.
  if (maxDelta !== Infinity) {
    const deltaCol = data[deltaIdx];
    const newest = deltaCol[totalLen - 1];

    if (newest - deltaCol[trimFrom] > maxDelta) {
      trimFrom = closestIdx(newest - maxDelta, deltaCol, trimFrom);
    }
  }

  // Drop everything before the trim point from every column.
  if (trimFrom > 0) {
    data.forEach((col, i) => {
      data[i] = col.slice(trimFrom);
    });
  }

  return data;
}
/**
 * Options controlling how much history a StreamingDataFrame retains on push.
 *
 * @alpha
 */
export interface StreamingFrameOptions {
  maxLength?: number; // maximum number of rows kept after a push (default: 1000)
  maxSeconds?: number; // NOTE(review): looks like intended seconds-of-history cap, but not obviously enforced — confirm
  maxDelta?: number; // maximum span (newest - oldest) of the time field; older rows are sliced off on push
}
/**
@ -25,29 +83,25 @@ export class StreamingDataFrame implements DataFrame {
fields: Array<Field<any, ArrayVector<any>>> = [];
options: StreamingFrameOptions;
private lastUpdateTime = 0;
length = 0;
private timeFieldIndex = -1;
constructor(frame: DataFrameJSON, opts?: StreamingFrameOptions) {
this.options = {
maxLength: 1000,
maxDelta: Infinity,
...opts,
};
this.update(frame);
}
get length() {
if (!this.fields.length) {
return 0;
}
return this.fields[0].values.length;
this.push(frame);
}
/**
* apply the new message to the existing data. This will replace the existing schema
* if a new schema is included in the message, or append data matching the current schema
*/
update(msg: DataFrameJSON) {
push(msg: DataFrameJSON) {
const { schema, data } = msg;
if (schema) {
// Keep old values if they are the same shape
@ -87,7 +141,7 @@ export class StreamingDataFrame implements DataFrame {
if (data && data.values.length && data.values[0].length) {
const { values, entities } = data;
if (values.length !== this.fields.length) {
throw new Error('update message mismatch');
throw new Error('push message mismatch');
}
if (entities) {
@ -99,31 +153,16 @@ export class StreamingDataFrame implements DataFrame {
});
}
this.fields.forEach((f, i) => {
f.values.buffer.push(...values[i]);
let curValues = this.fields.map((f) => f.values.buffer);
let appended = circPush(curValues, values, this.options.maxLength, this.timeFieldIndex, this.options.maxDelta);
appended.forEach((v, i) => {
this.fields[i].values.buffer = v;
});
// Shorten the array less frequently than we append
const now = Date.now();
const elapsed = now - this.lastUpdateTime;
if (elapsed > 5000) {
if (this.options.maxSeconds && this.timeFieldIndex >= 0 && this.length > 2) {
// TODO -- check time length
const tf = this.fields[this.timeFieldIndex].values.buffer;
const elapsed = tf[tf.length - 1] - tf[0];
console.log('Check elapsed time: ', elapsed);
}
if (this.options.maxLength) {
const delta = this.length - this.options.maxLength;
if (delta > 0) {
this.fields.forEach((f) => {
f.values.buffer = f.values.buffer.slice(delta);
});
}
}
this.lastUpdateTime = now;
}
// Update the frame length
this.length = appended[0].length;
}
}
}

View File

@ -34,7 +34,6 @@ describe('MeasurementCollector', () => {
const frames = collector.getData();
expect(frames.length).toEqual(1);
(frames[0] as any).lastUpdateTime = 0;
expect(frames[0]).toMatchInlineSnapshot(`
StreamingDataFrame {
"fields": Array [
@ -63,10 +62,11 @@ describe('MeasurementCollector', () => {
],
},
],
"lastUpdateTime": 0,
"length": 4,
"meta": undefined,
"name": undefined,
"options": Object {
"maxDelta": Infinity,
"maxLength": 600,
},
"refId": undefined,

View File

@ -75,7 +75,7 @@ export class MeasurementCollector implements LiveMeasurements {
let s = this.measurements.get(key);
if (s) {
s.update(measure);
s.push(measure);
} else {
s = new StreamingDataFrame(measure, this.config); //
this.measurements.set(key, s);

View File

@ -88,7 +88,7 @@ export function runSignalStream(
}
const event = { data };
return frame.update(event);
return frame.push(event);
};
// Fill the buffer on init