DataFrameJSON: introduce a json format that supports same semantics as arrow (#31010)

This commit is contained in:
Ryan McKinley 2021-03-05 12:24:43 -08:00 committed by GitHub
parent 04e82add68
commit a61e636bcd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 487 additions and 1 deletion

View File

@ -0,0 +1,86 @@
import { FieldType } from '../types/dataFrame';
import { DataFrameJSON, dataFrameFromJSON } from './DataFrameJSON';
// Covers decoding of the DataFrameJSON transfer format — specifically the
// `entities` lookup that restores NaN / Infinity / -Infinity / undefined
// after JSON.parse(), since JSON cannot represent those values natively.
describe('DataFrame JSON', () => {
describe('when called with a DataFrame', () => {
it('should decode values not supported natively in JSON (e.g. NaN, Infinity)', () => {
const json: DataFrameJSON = {
schema: {
fields: [
{ name: 'time', type: FieldType.time },
{ name: 'name', type: FieldType.string },
{ name: 'value', type: FieldType.number },
],
},
data: {
values: [
[100, 200, 300],
['a', 'b', 'c'],
[1, 2, 3],
],
// One entry per field; entries are replaced at the listed indices.
entities: [
null, // nothing to replace, but keeps the index
{ NaN: [0], Inf: [1], Undef: [2] }, // applies to the 'name' field values
{ NegInf: [2] }, // applies to the 'value' field values
],
},
};
// NOTE: dataFrameFromJSON mutates json.data.values in place while decoding
const frame = dataFrameFromJSON(json);
expect(frame).toMatchInlineSnapshot(`
Object {
"fields": Array [
Object {
"config": Object {},
"entities": Object {},
"name": "time",
"type": "time",
"values": Array [
100,
200,
300,
],
},
Object {
"config": Object {},
"entities": Object {
"Inf": Array [
1,
],
"NaN": Array [
0,
],
"Undef": Array [
2,
],
},
"name": "name",
"type": "string",
"values": Array [
NaN,
Infinity,
undefined,
],
},
Object {
"config": Object {},
"entities": Object {
"NegInf": Array [
2,
],
},
"name": "value",
"type": "number",
"values": Array [
1,
2,
-Infinity,
],
},
],
"length": 3,
}
`);
});
});
});

View File

@ -0,0 +1,196 @@
import { DataFrame, FieldType, FieldConfig, Labels, QueryResultMeta } from '../types';
import { ArrayVector } from '../vector';
import { guessFieldTypeFromNameAndValue } from './processDataFrame';
/**
 * The JSON transfer object for DataFrames. Values are stored in simple JSON
 *
 * @alpha
 */
export interface DataFrameJSON {
/**
 * HACK: this will get removed, but will help transition telegraf streaming
 *
 * In telegraf, this will be: ${name}${labels}
 */
key?: string;
/**
 * The schema defines the field type and configuration.
 */
schema?: DataFrameSchema;
/**
 * The field data
 */
data?: DataFrameData;
}
/**
 * The columnar value payload of a DataFrameJSON, plus optional compact
 * encodings (entities/bases/factors/enums) applied per field.
 *
 * @alpha
 */
export interface DataFrameData {
/**
 * A columnar store that matches fields defined by schema.
 */
values: any[][];
/**
 * Since JSON cannot encode NaN, Inf, -Inf, and undefined, these entities
 * are decoded after JSON.parse() using this struct. One entry per field;
 * a null entry keeps the index aligned when a field needs no replacements.
 */
entities?: Array<FieldValueEntityLookup | null>;
/**
 * Holds value bases per field so we can encode numbers from fixed points
 * e.g. [1612900958, 1612900959, 1612900960] -> 1612900958 + [0, 1, 2]
 * (decoding not implemented yet — see the TODO in dataFrameFromJSON)
 */
bases?: number[];
/**
 * Holds value multipliers per field so we can encode large numbers concisely
 * e.g. [4900000000, 35000000000] -> 1e9 + [4.9, 35]
 * (decoding not implemented yet — see the TODO in dataFrameFromJSON)
 */
factors?: number[];
/**
 * Holds enums per field so we can encode recurring values as ints
 * e.g. ["foo", "foo", "baz", "foo"] -> ["foo", "baz"] + [0,0,1,0]
 * (decoding not implemented yet — see the TODO in dataFrameFromJSON)
 */
enums?: any[][];
}
/**
 * Describes the structure of a DataFrame transferred as JSON: the frame
 * identity (refId, name, meta) and the per-field definitions.
 *
 * @alpha
 */
export interface DataFrameSchema {
/**
 * Matches the query target refId
 */
refId?: string;
/**
 * Initial response global metadata
 */
meta?: QueryResultMeta;
/**
 * Frame name
 */
name?: string;
/**
 * Field definition without any metadata
 */
fields?: FieldSchema[];
}
/**
 * Field object passed over JSON. Values for the field live separately in
 * DataFrameData, matched up by array index.
 *
 * @alpha
 */
export interface FieldSchema {
name: string; // The column name
type?: FieldType; // when omitted, the type is guessed from name + first value
config?: FieldConfig;
labels?: Labels;
}
/**
 * Since JSON cannot encode NaN, Inf, -Inf, and undefined, the locations
 * of these entities in field value arrays are stored here for restoration
 * after JSON.parse()
 *
 * @alpha
 */
export interface FieldValueEntityLookup {
  NaN?: number[];
  Undef?: number[]; // Missing because of absence or join
  Inf?: number[];
  NegInf?: number[];
}

// Runtime value that each entity key stands in for.
const ENTITY_MAP: Record<keyof FieldValueEntityLookup, any> = {
  Inf: Infinity,
  NegInf: -Infinity,
  Undef: undefined,
  NaN: NaN,
};

/**
 * Restores special values (NaN, ±Infinity, undefined) into `values` at the
 * indices recorded in `lookup`. Mutates `values` in place; indices beyond
 * the end of the array are ignored.
 *
 * @internal use locally
 */
export function decodeFieldValueEntities(lookup: FieldValueEntityLookup, values: any[]) {
  if (!lookup || !values) {
    return;
  }
  (Object.keys(lookup) as Array<keyof FieldValueEntityLookup>).forEach((key) => {
    const replacement = ENTITY_MAP[key];
    (lookup[key] ?? []).forEach((idx) => {
      if (idx < values.length) {
        values[idx] = replacement;
      }
    });
  });
}
// Infers the field type from the first defined value; columns that are empty
// or entirely null/undefined fall back to FieldType.other.
function guessFieldType(name: string, values: any[]): FieldType {
  const sample = values.find((v) => v != null);
  return sample === undefined ? FieldType.other : guessFieldTypeFromNameAndValue(name, sample);
}
/**
 * Builds a DataFrame from its JSON transfer object, padding all fields to a
 * common length and restoring special values recorded in data.entities.
 *
 * NOTE: dto.data.values will be mutated and decoded/inflated using entities,bases,factors,enums
 *
 * @throws Error when the schema or its field definitions are missing
 * @alpha
 */
export function dataFrameFromJSON(dto: DataFrameJSON): DataFrame {
  const { schema, data } = dto;
  if (!schema || !schema.fields) {
    throw new Error('JSON needs a fields definition');
  }

  // Find the longest field length; shorter fields get padded up to it below
  const length = data ? data.values.reduce((max, vals) => Math.max(max, vals.length), 0) : 0;

  const fields = schema.fields.map((f, index) => {
    // Guard against data.values having fewer columns than the schema declares
    const buffer = data?.values[index] ?? [];
    const origLen = buffer.length;
    if (origLen !== length) {
      buffer.length = length;
      // avoid sparse arrays
      buffer.fill(undefined, origLen);
    }

    // Restore NaN/±Infinity/undefined placeholders for this column (if any)
    const entities = data?.entities?.[index];
    if (entities) {
      decodeFieldValueEntities(entities, buffer);
    }

    // TODO: expand arrays further using bases,factors,enums
    return {
      ...f,
      type: f.type ?? guessFieldType(f.name, buffer),
      config: f.config ?? {},
      values: new ArrayVector(buffer),
      // the presence of this prop is an optimization signal & lookup for consumers
      entities: entities ?? {},
    };
  });

  return {
    ...schema,
    fields,
    length,
  };
}

View File

@ -0,0 +1,94 @@
import { FieldType } from '../types/dataFrame';
import { DataFrameJSON, dataFrameFromJSON } from './DataFrameJSON';
import { StreamingDataFrame } from './StreamingDataFrame';
// Verifies StreamingDataFrame: initial construction from a decoded frame,
// then appending rows via update() with a data-only DataFrameJSON message.
describe('Streaming JSON', () => {
describe('when called with a DataFrame', () => {
it('should decode values not supported natively in JSON (e.g. NaN, Infinity)', () => {
const json: DataFrameJSON = {
schema: {
fields: [
{ name: 'time', type: FieldType.time },
{ name: 'name', type: FieldType.string },
{ name: 'value', type: FieldType.number },
],
},
data: {
values: [
[100, 200, 300],
['a', 'b', 'c'],
[1, 2, 3],
],
},
};
const stream = new StreamingDataFrame(dataFrameFromJSON(json));
// Initial state: buffers mirror the source frame
expect(stream.fields.map((f) => ({ name: f.name, value: f.values.buffer }))).toMatchInlineSnapshot(`
Array [
Object {
"name": "time",
"value": Array [
100,
200,
300,
],
},
Object {
"name": "name",
"value": Array [
"a",
"b",
"c",
],
},
Object {
"name": "value",
"value": Array [
1,
2,
3,
],
},
]
`);
// Append one row per field (schema omitted — fields stay as-is)
stream.update({
data: {
values: [[400], ['d'], [4]],
},
});
// After update: each buffer has the new row appended at the end
expect(stream.fields.map((f) => ({ name: f.name, value: f.values.buffer }))).toMatchInlineSnapshot(`
Array [
Object {
"name": "time",
"value": Array [
100,
200,
300,
400,
],
},
Object {
"name": "name",
"value": Array [
"a",
"b",
"c",
"d",
],
},
Object {
"name": "value",
"value": Array [
1,
2,
3,
4,
],
},
]
`);
});
});
});

View File

@ -0,0 +1,104 @@
import { Field, DataFrame, FieldType } from '../types/dataFrame';
import { QueryResultMeta } from '../types';
import { ArrayVector } from '../vector';
import { DataFrameJSON, decodeFieldValueEntities } from './DataFrameJSON';
/**
 * Controls how much history a StreamingDataFrame retains.
 *
 * @alpha
 */
export interface StreamingFrameOptions {
maxLength?: number; // maximum number of rows kept; defaults to 1000
maxSeconds?: number; // how long to keep things (not yet enforced — see TODO in update())
}
/**
 * Unlike a circular buffer, this will append and periodically slice the front
 *
 * @alpha
 */
export class StreamingDataFrame implements DataFrame {
  name?: string;
  refId?: string;
  meta?: QueryResultMeta;

  // raw field buffers
  fields: Array<Field<any, ArrayVector<any>>> = [];

  options: StreamingFrameOptions;

  // Timestamp (ms) of the last update(); used to throttle how often we trim
  private lastUpdateTime = 0;
  private timeFieldIndex = -1;

  constructor(frame: DataFrame, opts?: StreamingFrameOptions) {
    this.name = frame.name;
    this.refId = frame.refId;
    this.meta = frame.meta;
    this.options = {
      maxLength: 1000,
      ...opts,
    };

    // Keep the existing fields, copying values into ArrayVectors when needed
    this.fields = frame.fields.map((f) => {
      if (f.values instanceof ArrayVector) {
        return f as Field<any, ArrayVector<any>>;
      }
      return {
        ...f,
        values: new ArrayVector(f.values.toArray()),
      };
    });

    this.timeFieldIndex = this.fields.findIndex((f) => f.type === FieldType.time);
  }

  /** Row count — every field buffer is kept at the same length */
  get length() {
    if (!this.fields.length) {
      return 0;
    }
    return this.fields[0].values.length;
  }

  /**
   * Apply a streaming message: decode special-value entities, append the new
   * rows to each field buffer, and occasionally trim the front so the frame
   * honors options.maxLength.
   */
  update(msg: DataFrameJSON) {
    if (msg.schema) {
      // TODO, replace the existing fields
    }

    if (msg.data) {
      const { values, entities } = msg.data;
      if (entities) {
        entities.forEach((ents, i) => {
          if (ents) {
            decodeFieldValueEntities(ents, values[i]);
            // TODO: append replacements to field
          }
        });
      }

      this.fields.forEach((f, i) => {
        // Append element-by-element: push(...arr) passes the whole batch as
        // call arguments, which overflows the stack for very large batches
        // and throws when the column is missing. Missing columns are skipped.
        const buffer = f.values.buffer;
        for (const v of values[i] ?? []) {
          buffer.push(v);
        }
      });

      // Shorten the array less frequently than we append
      const elapsed = Date.now() - this.lastUpdateTime;
      if (elapsed > 5000) {
        if (this.options.maxSeconds && this.timeFieldIndex >= 0) {
          // TODO -- check time length
        }
        if (this.options.maxLength) {
          const delta = this.length - this.options.maxLength;
          if (delta > 0) {
            this.fields.forEach((f) => {
              f.values.buffer = f.values.buffer.slice(delta);
            });
          }
        }
      }
      this.lastUpdateTime = Date.now();
    }
  }
}

View File

@ -6,4 +6,6 @@ export * from './processDataFrame';
export * from './dimensions';
export * from './ArrowDataFrame';
export * from './ArrayDataFrame';
export * from './DataFrameJSON';
export * from './StreamingDataFrame';
export * from './frameComparisons';

View File

@ -22,6 +22,9 @@ import { UPlotChart } from '../uPlot/Plot';
import { LegendDisplayMode, VizLegendOptions } from '../VizLegend/types';
import { VizLayout } from '../VizLayout/VizLayout';
/**
* @internal -- not a public API
*/
export const FIXED_UNIT = '__fixed';
export interface GraphNGProps extends Themeable {

View File

@ -19,6 +19,7 @@ export interface GraphNGLegendEvent {
mode: GraphNGLegendEventMode;
}
/** @alpha */
export interface XYFieldMatchers {
x: FieldMatcher; // first match
y: FieldMatcher;

View File

@ -144,7 +144,7 @@ export class LivePanel extends PureComponent<Props, State> {
const json = this.props.options?.json;
if (json) {
const rsp = await channel.publish(json);
console.log('GOT', rsp);
console.log('onPublishClicked (response from publish)', rsp);
} else {
console.log('nothing to publish');
}