Live: streaming labels field (#34031)

Co-authored-by: Ryan McKinley <ryantxu@gmail.com>
This commit is contained in:
Leon Sorokin 2021-05-13 02:33:11 -05:00 committed by GitHub
parent 601455190e
commit 4aaf141ddb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 437 additions and 132 deletions

View File

@ -214,4 +214,213 @@ describe('Streaming JSON', () => {
const copy = ({ ...stream } as any) as DataFrame;
expect(copy.length).toEqual(2);
});
// labels mode: a schema whose first field is a string field named "labels",
// followed by a time field, makes StreamingDataFrame fan each pushed row out
// into per-label fields (see the push() schema detection in StreamingDataFrame)
// NOTE(review): the expect() below runs directly in the describe body with no
// it() wrapper — consider wrapping it so failures report as a test case
describe('streaming labels column', () => {
const stream = new StreamingDataFrame(
{
schema: {
fields: [
{ name: 'labels', type: FieldType.string },
{ name: 'time', type: FieldType.time },
{ name: 'speed', type: FieldType.number },
{ name: 'light', type: FieldType.number },
],
},
},
{
maxLength: 4,
}
);
// each push carries vertical records: [labels, time, speed, light] columns
stream.push({
data: {
values: [
['sensor=A', 'sensor=B'],
[100, 100],
[10, 15],
[1, 2],
],
},
});
stream.push({
data: {
values: [
['sensor=B', 'sensor=C'],
[200, 200],
[20, 25],
[3, 4],
],
},
});
// third push introduces no new labels but fills rows for A and C only
stream.push({
data: {
values: [
['sensor=A', 'sensor=C'],
[300, 400],
[30, 40],
[5, 6],
],
},
});
// expected result: one shared time field plus a (speed, light) field pair per
// label, with undefined holes where a label had no value at that timestamp
expect(stream.fields.map((f) => ({ name: f.name, labels: f.labels, values: f.values.buffer })))
.toMatchInlineSnapshot(`
Array [
Object {
"labels": undefined,
"name": "time",
"values": Array [
100,
200,
300,
400,
],
},
Object {
"labels": Object {
"sensor": "A",
},
"name": "speed",
"values": Array [
10,
undefined,
30,
undefined,
],
},
Object {
"labels": Object {
"sensor": "A",
},
"name": "light",
"values": Array [
1,
undefined,
5,
undefined,
],
},
Object {
"labels": Object {
"sensor": "B",
},
"name": "speed",
"values": Array [
15,
20,
undefined,
undefined,
],
},
Object {
"labels": Object {
"sensor": "B",
},
"name": "light",
"values": Array [
2,
3,
undefined,
undefined,
],
},
Object {
"labels": Object {
"sensor": "C",
},
"name": "speed",
"values": Array [
undefined,
25,
undefined,
40,
],
},
Object {
"labels": Object {
"sensor": "C",
},
"name": "light",
"values": Array [
undefined,
4,
undefined,
6,
],
},
]
`);
});
/*
describe('transpose vertical records', () => {
let vrecsA = [
['sensor=A', 'sensor=B'],
[100, 100],
[10, 15],
];
let vrecsB = [
['sensor=B', 'sensor=C'],
[200, 200],
[20, 25],
];
let vrecsC = [
['sensor=A', 'sensor=C'],
[300, 400],
[30, 40],
];
let cTables = transpose(vrecsC);
expect(cTables).toMatchInlineSnapshot(`
Array [
Array [
"sensor=A",
"sensor=C",
],
Array [
Array [
Array [
300,
],
Array [
30,
],
],
Array [
Array [
400,
],
Array [
40,
],
],
],
]
`);
let cJoined = join(cTables[1]);
expect(cJoined).toMatchInlineSnapshot(`
Array [
Array [
300,
400,
],
Array [
30,
undefined,
],
Array [
undefined,
40,
],
]
`);
});
*/
});

View File

@ -1,8 +1,233 @@
import { Field, DataFrame, FieldType } from '../types/dataFrame';
import { QueryResultMeta } from '../types';
import { Labels, QueryResultMeta } from '../types';
import { ArrayVector } from '../vector';
import { DataFrameJSON, decodeFieldValueEntities } from './DataFrameJSON';
import { DataFrameJSON, decodeFieldValueEntities, FieldSchema } from './DataFrameJSON';
import { guessFieldTypeFromValue } from './processDataFrame';
import { join } from '../transformations/transformers/joinDataFrames';
import { AlignedData } from 'uplot';
/**
* @alpha
*/
export interface StreamingFrameOptions {
maxLength?: number; // maximum rows kept per field buffer; defaults to 1000 (see constructor)
maxDelta?: number; // max time-field span to retain, older rows sliced off; defaults to Infinity (see constructor)
}
// How pushed data.values are interpreted relative to the schema
enum PushMode {
wide, // value arrays map 1:1 onto schema fields
labels, // first array holds label strings; rows fan out into per-label fields
// long
}
/**
* Unlike a circular buffer, this will append and periodically slice the front
*
* @alpha
*/
export class StreamingDataFrame implements DataFrame {
  name?: string;
  refId?: string;
  meta?: QueryResultMeta;

  fields: Array<Field<any, ArrayVector<any>>> = [];
  length = 0;

  options: StreamingFrameOptions;

  // schema fields with the leading "labels" field stripped (in labels mode)
  private schemaFields: FieldSchema[] = [];
  // index of the time field within `fields`; used by circPush() for maxDelta trimming
  private timeFieldIndex = -1;
  private pushMode = PushMode.wide;

  // current labels
  private labels: Set<string> = new Set();

  constructor(frame: DataFrameJSON, opts?: StreamingFrameOptions) {
    // defaults: keep up to 1000 rows, never trim by time span
    this.options = {
      maxLength: 1000,
      maxDelta: Infinity,
      ...opts,
    };
    this.push(frame);
  }

  /**
   * apply the new message to the existing data. This will replace the existing schema
   * if a new schema is included in the message, or append data matching the current schema
   */
  push(msg: DataFrameJSON) {
    const { schema, data } = msg;
    if (schema) {
      this.pushMode = PushMode.wide;
      this.timeFieldIndex = schema.fields.findIndex((f) => f.type === FieldType.time);
      // a leading string field named "labels" immediately before the time field
      // switches this frame into label fan-out mode
      if (
        this.timeFieldIndex === 1 &&
        schema.fields[0].name === 'labels' &&
        schema.fields[0].type === FieldType.string
      ) {
        this.pushMode = PushMode.labels;
        this.timeFieldIndex = 0; // after labels are removed!
      }

      const niceSchemaFields = this.pushMode === PushMode.labels ? schema.fields.slice(1) : schema.fields;

      // create new fields from the schema
      const newFields = niceSchemaFields.map((f, idx) => {
        return {
          config: f.config ?? {},
          name: f.name,
          labels: f.labels,
          type: f.type ?? FieldType.other,
          // transfer old values by type & name, unless we relied on labels to match fields
          values:
            this.pushMode === PushMode.wide
              ? this.fields.find((of) => of.name === f.name && f.type === of.type)?.values ?? new ArrayVector()
              : new ArrayVector(),
        };
      });

      this.name = schema.name;
      this.refId = schema.refId;
      this.meta = schema.meta;
      this.schemaFields = niceSchemaFields;
      this.fields = newFields;
    }

    if (data && data.values.length && data.values[0].length) {
      let { values, entities } = data;
      if (entities) {
        // expand encoded entities (NaN/Inf/undefined placeholders) in place
        entities.forEach((ents, i) => {
          if (ents) {
            decodeFieldValueEntities(ents, values[i]);
            // TODO: append replacements to field
          }
        });
      }

      if (this.pushMode === PushMode.labels) {
        // augment and transform data to match current schema for standard circPush() path
        const labeledTables = transpose(values);

        // make sure fields are initialized for each label
        for (const label of labeledTables.keys()) {
          if (!this.labels.has(label)) {
            this.addLabel(label);
          }
        }

        // TODO: cache higher up
        let dummyTable = Array(this.schemaFields.length).fill([]);

        let tables: AlignedData[] = [];
        this.labels.forEach((label) => {
          tables.push(labeledTables.get(label) ?? dummyTable);
        });

        // outer-join per-label tables on the time column, leaving undefined holes
        values = join(tables);
      }

      if (values.length !== this.fields.length) {
        if (this.fields.length) {
          throw new Error(`push message mismatch. Expected: ${this.fields.length}, received: ${values.length}`);
        }
        // no schema arrived yet -- derive field names/types from the first data payload
        this.fields = values.map((vals, idx) => {
          let name = `Field ${idx}`;
          let type = guessFieldTypeFromValue(vals[0]);
          // heuristic: a large number in the first column looks like an epoch-ms timestamp
          const isTime = idx === 0 && type === FieldType.number && vals[0] > 1600016688632;
          if (isTime) {
            type = FieldType.time;
            name = 'Time';
          }
          return {
            name,
            type,
            config: {},
            values: new ArrayVector([]),
          };
        });
      }

      let curValues = this.fields.map((f) => f.values.buffer);
      let appended = circPush(curValues, values, this.options.maxLength, this.timeFieldIndex, this.options.maxDelta);

      appended.forEach((v, i) => {
        const { state, values } = this.fields[i];
        values.buffer = v;
        if (state) {
          state.calcs = undefined; // invalidate cached calcs after buffer change
        }
      });

      // Update the frame length
      this.length = appended[0].length;
    }
  }

  // adds a set of fields for a new label
  private addLabel(label: string) {
    let labelCount = this.labels.size;

    // parse labels of the form "key1=val1,key2=val2" into a Labels map
    const parsedLabels: Labels = {};
    label.split(',').forEach((kv) => {
      const [key, val] = kv.trim().split('=');
      parsedLabels[key] = val;
    });

    if (labelCount === 0) {
      // mutate existing fields and add labels
      this.fields.forEach((f, i) => {
        if (i > 0) {
          f.labels = parsedLabels;
        }
      });
    } else {
      // clone the schema fields (skipping time at index 0) for the new label,
      // backfilled with undefined so all columns stay the same length
      for (let i = 1; i < this.schemaFields.length; i++) {
        let proto = this.schemaFields[i] as Field;
        this.fields.push({
          ...proto,
          config: proto.config ?? {},
          labels: parsedLabels,
          values: new ArrayVector(Array(this.length).fill(undefined)),
        });
      }
    }

    this.labels.add(label);
  }
}
// Converts vertical insertion records — table keys in row [0], column values in
// rows [1...N] — into a Map of join()-able tables, each a list of column arrays.
export function transpose(vrecs: any[][]) {
  const keyCol = vrecs[0];
  const numValueCols = vrecs.length - 1;
  const tables = new Map();

  // allocate one empty column set per distinct key (first-seen order preserved)
  for (const key of new Set(keyCol)) {
    tables.set(
      key,
      Array.from({ length: numValueCols }, () => [] as any[])
    );
  }

  // distribute each record's values into the columns of its key's table
  keyCol.forEach((key, r) => {
    const cols = tables.get(key);
    for (let c = 0; c < numValueCols; c++) {
      cols[c].push(vrecs[c + 1][r]);
    }
  });

  return tables;
}
// binary search for index of closest value
function closestIdx(num: number, arr: number[], lo?: number, hi?: number) {
@ -61,132 +286,3 @@ function circPush(data: number[][], newData: number[][], maxLength = Infinity, d
return data;
}
/**
* @alpha
*/
export interface StreamingFrameOptions {
maxLength?: number; // maximum rows kept per field buffer; defaults to 1000 (see constructor)
maxDelta?: number; // max time-field span to retain, older rows sliced off; defaults to Infinity (see constructor)
}
/**
* Unlike a circular buffer, this will append and periodically slice the front
*
* @alpha
*/
export class StreamingDataFrame implements DataFrame {
name?: string;
refId?: string;
meta?: QueryResultMeta;
// raw field buffers
fields: Array<Field<any, ArrayVector<any>>> = [];
options: StreamingFrameOptions;
length = 0;
// index of the time field within `fields`; used by circPush() for maxDelta trimming
private timeFieldIndex = -1;
constructor(frame: DataFrameJSON, opts?: StreamingFrameOptions) {
// defaults: keep up to 1000 rows, never trim by time span
this.options = {
maxLength: 1000,
maxDelta: Infinity,
...opts,
};
this.push(frame);
}
/**
* apply the new message to the existing data. This will replace the existing schema
* if a new schema is included in the message, or append data matching the current schema
*/
push(msg: DataFrameJSON) {
const { schema, data } = msg;
if (schema) {
// Keep old values if they are the same shape
let oldValues: ArrayVector[] | undefined;
if (schema.fields.length === this.fields.length) {
let same = true;
oldValues = this.fields.map((f, idx) => {
const oldField = this.fields[idx];
if (f.name !== oldField.name || f.type !== oldField.type) {
same = false;
}
return f.values;
});
if (!same) {
oldValues = undefined;
}
}
this.name = schema.name;
this.refId = schema.refId;
this.meta = schema.meta;
// Create new fields from the schema
this.fields = schema.fields.map((f, idx) => {
return {
config: f.config ?? {},
name: f.name,
labels: f.labels,
type: f.type ?? FieldType.other,
values: oldValues ? oldValues[idx] : new ArrayVector(),
};
});
this.timeFieldIndex = this.fields.findIndex((f) => f.type === FieldType.time);
}
if (data && data.values.length && data.values[0].length) {
const { values, entities } = data;
if (values.length !== this.fields.length) {
if (this.fields.length) {
// NOTE(review): "recieved" is misspelled; should read "received"
throw new Error(`push message mismatch. Expected: ${this.fields.length}, recieved: ${values.length}`);
}
// no schema arrived yet -- derive field names/types from the first data payload
this.fields = values.map((vals, idx) => {
let name = `Field ${idx}`;
let type = guessFieldTypeFromValue(vals[0]);
// heuristic: a large number in the first column looks like an epoch-ms timestamp
const isTime = idx === 0 && type === FieldType.number && vals[0] > 1600016688632;
if (isTime) {
type = FieldType.time;
name = 'Time';
}
return {
name,
type,
config: {},
values: new ArrayVector([]),
};
});
}
if (entities) {
// expand encoded entities (NaN/Inf/undefined placeholders) in place
entities.forEach((ents, i) => {
if (ents) {
decodeFieldValueEntities(ents, values[i]);
// TODO: append replacements to field
}
});
}
let curValues = this.fields.map((f) => f.values.buffer);
let appended = circPush(curValues, values, this.options.maxLength, this.timeFieldIndex, this.options.maxDelta);
appended.forEach((v, i) => {
const { state, values } = this.fields[i];
values.buffer = v;
if (state) {
state.calcs = undefined; // invalidate cached calcs after buffer change
}
});
// Update the frame length
this.length = appended[0].length;
}
}
}

View File

@ -6,6 +6,6 @@ export * from './processDataFrame';
export * from './dimensions';
export * from './ArrayDataFrame';
export * from './DataFrameJSON';
export * from './StreamingDataFrame';
// named re-export of the streaming frame API (replaces the star export above)
// NOTE(review): StreamingFrameOptions is an interface — consider `export type`
// for it so this compiles under --isolatedModules
export { StreamingDataFrame, StreamingFrameOptions } from './StreamingDataFrame';
export * from './frameComparisons';
export { anySeriesWithTimeField } from './utils';