Chore: Move StreamingDataFrame to @grafana/data (#72014)

This commit is contained in:
Andres Martinez Gotor
2023-07-24 13:30:52 +02:00
committed by GitHub
parent 3c48701f08
commit 987624f8cf
22 changed files with 75 additions and 69 deletions

View File

@@ -13,10 +13,10 @@ import {
LiveChannelLeaveEvent,
LiveChannelScope,
LoadingState,
StreamingDataFrame,
} from '@grafana/data';
import { StreamingFrameAction } from '@grafana/runtime';
import { StreamingDataFrame } from '../data/StreamingDataFrame';
import { isStreamingResponseData, StreamingResponseData, StreamingResponseDataType } from '../data/utils';
import { DataStreamHandlerDeps, LiveDataStream } from './LiveDataStream';

View File

@@ -10,11 +10,12 @@ import {
LiveChannelEvent,
LiveChannelId,
LoadingState,
StreamingDataFrame,
} from '@grafana/data';
import { getStreamingFrameOptions } from '@grafana/data/src/dataframe/StreamingDataFrame';
import { LiveDataStreamOptions, StreamingFrameAction, StreamingFrameOptions } from '@grafana/runtime/src/services/live';
import { toDataQueryError } from '@grafana/runtime/src/utils/toDataQueryError';
import { getStreamingFrameOptions, StreamingDataFrame } from '../data/StreamingDataFrame';
import { StreamingResponseDataType } from '../data/utils';
import { DataStreamSubscriptionKey, StreamingDataQueryResponse } from './service';

View File

@@ -1,996 +0,0 @@
import { DataFrame, DataFrameJSON, FieldType, getFieldDisplayName, reduceField, ReducerID } from '@grafana/data';
import { StreamingFrameAction, StreamingFrameOptions } from '@grafana/runtime';
import { closestIdx, getStreamingFrameOptions, StreamingDataFrame } from './StreamingDataFrame';
describe('Streaming JSON', () => {
describe('closestIdx', () => {
// Each case pairs a target number with a sorted array and asserts the index
// of the closest element; ties favor the smaller neighbour.
const cases = [
{
num: 10,
arr: [2, 3, 4, 5, 6],
expected: 4,
descr: 'bigger than all in array',
},
{
num: 10,
arr: [2, 3, 4, 5, 11, 12, 13],
expected: 4,
descr: 'bigger than some in array #1 - smaller difference to bigger number',
},
{
num: 10,
arr: [2, 3, 4, 5, 16, 17, 18],
expected: 3,
descr: 'bigger than some in array #2 - smaller difference to smaller number',
},
{
num: 10,
arr: [2, 3, 4, 9, 11, 12, 13],
expected: 3,
descr: 'bigger than some in array #3 - same difference between smaller and bigger number - favors smaller',
},
{
num: 10,
arr: [9, 10, 11, 12, 13, 14],
expected: 1,
descr: 'present in the array',
},
{
num: 10,
arr: [10, 11, 12, 13, 14],
expected: 0,
descr: 'present in the array on first position',
},
{
num: 10,
arr: [5, 6, 7, 8, 9, 10],
expected: 5,
descr: 'present in the array on last position',
},
{
num: 10,
arr: [11, 12, 13, 14, 15],
expected: 0,
descr: 'smaller than all in array',
},
{
num: 10,
arr: [],
expected: -1,
descr: 'empty array',
},
];
for (const { num, arr, expected, descr } of cases) {
it(descr, () => {
expect(closestIdx(num, arr)).toBe(expected);
});
}
});
describe('when called with a DataFrame', () => {
// NOTE: these it() blocks share one mutable `stream` and rely on running in
// declaration order — each test builds on the pushes made by the previous one.
const json: DataFrameJSON = {
schema: {
fields: [
{ name: 'time', type: FieldType.time },
{ name: 'name', type: FieldType.string },
{ name: 'value', type: FieldType.number },
],
},
data: {
values: [
[100, 200, 300],
['a', 'b', 'c'],
[1, 2, 3],
],
},
};
// buffer holds at most 5 rows and at most a 300-unit time window (maxDelta)
const stream = StreamingDataFrame.fromDataFrameJSON(json, {
maxLength: 5,
maxDelta: 300,
});
it('should create frame with schema & data', () => {
expect(stream.fields.map((f) => ({ name: f.name, value: f.values }))).toMatchInlineSnapshot(`
[
{
"name": "time",
"value": [
100,
200,
300,
],
},
{
"name": "name",
"value": [
"a",
"b",
"c",
],
},
{
"name": "value",
"value": [
1,
2,
3,
],
},
]
`);
});
it('should append new data to frame', () => {
stream.push({
data: {
values: [[400], ['d'], [4]],
},
});
expect(stream.fields.map((f) => ({ name: f.name, value: f.values }))).toMatchInlineSnapshot(`
[
{
"name": "time",
"value": [
100,
200,
300,
400,
],
},
{
"name": "name",
"value": [
"a",
"b",
"c",
"d",
],
},
{
"name": "value",
"value": [
1,
2,
3,
4,
],
},
]
`);
});
it('should append new data and slice based on maxDelta', () => {
stream.push({
data: {
values: [[500], ['e'], [5]],
},
});
expect(stream.fields.map((f) => ({ name: f.name, value: f.values }))).toMatchInlineSnapshot(`
[
{
"name": "time",
"value": [
200,
300,
400,
500,
],
},
{
"name": "name",
"value": [
"b",
"c",
"d",
"e",
],
},
{
"name": "value",
"value": [
2,
3,
4,
5,
],
},
]
`);
});
it('should append new data and slice based on maxLength', () => {
// NOTE(review): the pushed `value` column has 4 entries while time/name have 3;
// the snapshot below documents the resulting ragged column lengths — presumably
// intentional, verify.
stream.push({
data: {
values: [
[501, 502, 503],
['f', 'g', 'h'],
[6, 7, 8, 9],
],
},
});
expect(stream.fields.map((f) => ({ name: f.name, value: f.values }))).toMatchInlineSnapshot(`
[
{
"name": "time",
"value": [
400,
500,
501,
502,
503,
],
},
{
"name": "name",
"value": [
"d",
"e",
"f",
"g",
"h",
],
},
{
"name": "value",
"value": [
4,
5,
6,
7,
8,
9,
],
},
]
`);
});
it('should append data with new schema and fill missed values with undefined', () => {
stream.push({
schema: {
fields: [
{ name: 'time', type: FieldType.time },
{ name: 'name', type: FieldType.string },
{ name: 'value', type: FieldType.number },
{ name: 'value2', type: FieldType.number },
],
},
data: {
values: [[601], ['i'], [10], [-10]],
},
});
expect(stream.fields.map((f) => ({ name: f.name, value: f.values }))).toMatchInlineSnapshot(`
[
{
"name": "time",
"value": [
500,
501,
502,
503,
601,
],
},
{
"name": "name",
"value": [
"e",
"f",
"g",
"h",
"i",
],
},
{
"name": "value",
"value": [
5,
6,
7,
8,
9,
10,
],
},
{
"name": "value2",
"value": [
undefined,
undefined,
undefined,
undefined,
-10,
],
},
]
`);
});
it('should be able to return values from previous packet', function () {
stream.push({
data: {
values: [
[602, 603],
['j', 'k'],
[11, 12],
[-11, -12],
],
},
});
expect(stream.getValuesFromLastPacket()).toEqual([
[602, 603],
['j', 'k'],
[11, 12],
[-11, -12],
]);
});
});
describe('serialization', function () {
// serialize() does not mutate the frame, so these tests can safely share one instance.
const json: DataFrameJSON = {
schema: {
fields: [
{ name: 'time', type: FieldType.time },
{ name: 'name', type: FieldType.string },
{ name: 'value', type: FieldType.number },
],
},
data: {
values: [
[100, 200, 300],
['a', 'b', 'c'],
[1, 2, 3],
],
},
};
const frame = StreamingDataFrame.fromDataFrameJSON(json, {
maxLength: 5,
maxDelta: 300,
});
it('should filter fields', function () {
const serializedFrame = frame.serialize((f) => ['time'].includes(f.name));
expect(serializedFrame.fields).toEqual([
{
config: {},
name: 'time',
type: 'time',
values: [100, 200, 300],
},
]);
});
it('should resize the buffer', function () {
const options = { maxLength: 2 };
const serializedFrame = frame.serialize((f) => ['time', 'name'].includes(f.name), options);
expect(serializedFrame.fields).toEqual([
{
config: {},
name: 'time',
type: 'time',
values: [200, 300],
},
{
config: {},
name: 'name',
type: 'string',
values: ['b', 'c'],
},
]);
});
it('should trim values and retain option override values', function () {
const options = { maxLength: 2 };
const trimValues = { maxLength: 1 };
const serializedFrame = frame.serialize((f) => ['time', 'name'].includes(f.name), options, trimValues);
expect(serializedFrame.fields).toEqual([
{
config: {},
name: 'time',
type: 'time',
values: [300],
},
{
config: {},
name: 'name',
type: 'string',
values: ['c'],
},
]);
expect(serializedFrame.options.maxLength).toEqual(options.maxLength);
});
it('should use maxLength from options if its lower than maxLength from trimValues', function () {
const options = { maxLength: 1 };
const trimValues = { maxLength: 2 };
const serializedFrame = frame.serialize((f) => ['time', 'name'].includes(f.name), options, trimValues);
expect(serializedFrame.fields).toEqual([
{
config: {},
name: 'time',
type: 'time',
values: [300],
},
{
config: {},
name: 'name',
type: 'string',
values: ['c'],
},
]);
expect(serializedFrame.options.maxLength).toEqual(options.maxLength);
});
});
describe('resizing', function () {
// resize() only ever grows the buffer: smaller maxLength/maxDelta requests are ignored,
// except that a default Infinity maxDelta can always be overridden.
it.each([
[
{
existing: {
maxLength: 10,
maxDelta: 5,
action: StreamingFrameAction.Replace,
},
newOptions: {},
expected: {
maxLength: 10,
maxDelta: 5,
action: StreamingFrameAction.Replace,
},
},
],
[
{
existing: {
maxLength: 10,
maxDelta: 5,
action: StreamingFrameAction.Replace,
},
newOptions: {
maxLength: 9,
maxDelta: 4,
},
expected: {
maxLength: 10,
maxDelta: 5,
action: StreamingFrameAction.Replace,
},
},
],
[
{
existing: {
maxLength: 10,
maxDelta: 5,
action: StreamingFrameAction.Replace,
},
newOptions: {
maxLength: 11,
maxDelta: 6,
},
expected: {
maxLength: 11,
maxDelta: 6,
action: StreamingFrameAction.Replace,
},
},
],
])(
'should always resize to a bigger buffer',
({
existing,
expected,
newOptions,
}: {
existing: StreamingFrameOptions;
newOptions: Partial<StreamingFrameOptions>;
expected: StreamingFrameOptions;
}) => {
const frame = StreamingDataFrame.empty(existing);
frame.resize(newOptions);
expect(frame.getOptions()).toEqual(expected);
}
);
it('should override infinity maxDelta', function () {
const frame = StreamingDataFrame.empty({
maxLength: 10,
maxDelta: Infinity,
action: StreamingFrameAction.Replace,
});
frame.resize({
maxLength: 9,
maxDelta: 4,
});
expect(frame.getOptions()).toEqual({
maxLength: 10,
maxDelta: 4,
action: StreamingFrameAction.Replace,
});
});
});
describe('options with defaults', () => {
it('should provide defaults', () => {
// Calling with no arguments must fall back to the documented defaults.
expect(getStreamingFrameOptions()).toEqual({
action: StreamingFrameAction.Append,
maxDelta: Infinity,
maxLength: 1000,
});
});
});
describe('when deserialized', function () {
// Round-trips a frame through serialize()/deserialize(), then pushes raw column
// values and checks the buffer behaviour for both append and replace modes.
const json: DataFrameJSON = {
schema: {
fields: [
{ name: 'time', type: FieldType.time },
{ name: 'name', type: FieldType.string },
{ name: 'value', type: FieldType.number },
],
},
data: {
values: [
[100, 200, 300],
['a', 'b', 'c'],
[1, 2, 3],
],
},
};
it('should support pushing new values matching the existing schema in `append` mode', function () {
const frame = StreamingDataFrame.deserialize(
StreamingDataFrame.fromDataFrameJSON(json, {
maxLength: 5,
maxDelta: 300,
}).serialize()
);
expect(frame.length).toEqual(3);
frame.pushNewValues([
[601, 602],
['x', 'y'],
[10, 11],
]);
// maxDelta (300) trims the oldest rows, keeping the length at 3
expect(frame.length).toEqual(3);
expect(frame.fields.map((f) => ({ name: f.name, value: f.values }))).toMatchInlineSnapshot(`
[
{
"name": "time",
"value": [
300,
601,
602,
],
},
{
"name": "name",
"value": [
"c",
"x",
"y",
],
},
{
"name": "value",
"value": [
3,
10,
11,
],
},
]
`);
});
it('should support pushing new values matching the existing schema in `replace` mode', function () {
const frame = StreamingDataFrame.deserialize(
StreamingDataFrame.fromDataFrameJSON(json, {
maxLength: 5,
maxDelta: 300,
action: StreamingFrameAction.Replace,
}).serialize()
);
expect(frame.length).toEqual(3);
frame.pushNewValues([
[601, 602],
['x', 'y'],
[10, 11],
]);
// replace mode discards the old rows entirely
expect(frame.length).toEqual(2);
expect(frame.fields.map((f) => ({ name: f.name, value: f.values }))).toMatchInlineSnapshot(`
[
{
"name": "time",
"value": [
601,
602,
],
},
{
"name": "name",
"value": [
"x",
"y",
],
},
{
"name": "value",
"value": [
10,
11,
],
},
]
`);
});
});
describe('when created empty', function () {
it('should have no packets', function () {
const streamingDataFrame = StreamingDataFrame.empty();
expect(streamingDataFrame.hasAtLeastOnePacket()).toEqual(false);
expect(streamingDataFrame.fields).toHaveLength(0);
});
});
describe('lengths property is accurate', () => {
const stream = StreamingDataFrame.fromDataFrameJSON(
{
schema: {
fields: [{ name: 'simple', type: FieldType.number }],
},
data: {
values: [[100]],
},
},
{
maxLength: 5,
}
);
let val = reduceField({ field: stream.fields[0], reducers: [ReducerID.lastNotNull] })[ReducerID.lastNotNull];
expect(val).toEqual(100);
expect(stream.length).toEqual(1);
stream.push({
data: { values: [[200]] },
});
val = reduceField({ field: stream.fields[0], reducers: [ReducerID.lastNotNull] })[ReducerID.lastNotNull];
expect(val).toEqual(200);
expect(stream.length).toEqual(2);
const copy = { ...stream } as unknown as DataFrame;
expect(copy.length).toEqual(2);
});
describe('streaming labels column', () => {
const stream = StreamingDataFrame.fromDataFrameJSON(
{
schema: {
fields: [
{ name: 'labels', type: FieldType.string },
{ name: 'time', type: FieldType.time },
{ name: 'speed', type: FieldType.number },
{ name: 'light', type: FieldType.number },
],
},
},
{
maxLength: 4,
displayNameFormat: '{{__name__}}: {{sensor}}',
}
);
stream.push({
data: {
values: [
// A = influxStyle, B = prometheus style labels
// key must be constatnt for join to work
['sensor=A', '{sensor="B"}'],
[100, 100],
[10, 15],
[1, 2],
],
},
});
stream.push({
data: {
values: [
['{sensor="B"}', 'sensor=C'],
[200, 200],
[20, 25],
[3, 4],
],
},
});
stream.push({
data: {
values: [
['sensor=A', 'sensor=C'],
[300, 400],
[30, 40],
[5, 6],
],
},
});
expect(stream.fields.map((f) => ({ name: f.name, labels: f.labels, values: f.values })))
.toMatchInlineSnapshot(`
[
{
"labels": undefined,
"name": "time",
"values": [
100,
200,
300,
400,
],
},
{
"labels": {
"sensor": "A",
},
"name": "speed",
"values": [
10,
undefined,
30,
undefined,
],
},
{
"labels": {
"sensor": "A",
},
"name": "light",
"values": [
1,
undefined,
5,
undefined,
],
},
{
"labels": {
"sensor": "B",
},
"name": "speed",
"values": [
15,
20,
undefined,
undefined,
],
},
{
"labels": {
"sensor": "B",
},
"name": "light",
"values": [
2,
3,
undefined,
undefined,
],
},
{
"labels": {
"sensor": "C",
},
"name": "speed",
"values": [
undefined,
25,
undefined,
40,
],
},
{
"labels": {
"sensor": "C",
},
"name": "light",
"values": [
undefined,
4,
undefined,
6,
],
},
]
`);
// Push value with empty labels
stream.push({
data: {
values: [[''], [500], [50], [7]],
},
});
// names are based on legend format
expect(stream.fields.map((f) => getFieldDisplayName(f, stream, [stream]))).toMatchInlineSnapshot(`
[
"time: sensor",
"speed: A",
"light: A",
"speed: B",
"light: B",
"speed: C",
"light: C",
"speed: sensor",
"light: sensor",
]
`);
});
describe('keep track of packets', () => {
const json: DataFrameJSON = {
schema: {
fields: [
{ name: 'time', type: FieldType.time },
{ name: 'value', type: FieldType.number },
],
},
data: {
values: [
[100, 200, 300],
[1, 2, 3],
],
},
};
const stream = StreamingDataFrame.fromDataFrameJSON(json, {
maxLength: 4,
maxDelta: 300,
});
const getSnapshot = (f: StreamingDataFrame) => {
return {
values: f.fields[1].values,
info: f.packetInfo,
};
};
expect(getSnapshot(stream)).toMatchInlineSnapshot(`
{
"info": {
"action": "replace",
"length": 3,
"number": 1,
"schemaChanged": true,
},
"values": [
1,
2,
3,
],
}
`);
stream.push({
data: {
values: [
[400, 500],
[4, 5],
],
},
});
expect(getSnapshot(stream)).toMatchInlineSnapshot(`
{
"info": {
"action": "append",
"length": 2,
"number": 2,
"schemaChanged": false,
},
"values": [
2,
3,
4,
5,
],
}
`);
stream.push({
data: {
values: [[600], [6]],
},
});
expect(getSnapshot(stream)).toMatchInlineSnapshot(`
{
"info": {
"action": "append",
"length": 1,
"number": 3,
"schemaChanged": false,
},
"values": [
3,
4,
5,
6,
],
}
`);
});
/*
describe('transpose vertical records', () => {
let vrecsA = [
['sensor=A', 'sensor=B'],
[100, 100],
[10, 15],
];
let vrecsB = [
['sensor=B', 'sensor=C'],
[200, 200],
[20, 25],
];
let vrecsC = [
['sensor=A', 'sensor=C'],
[300, 400],
[30, 40],
];
let cTables = transpose(vrecsC);
expect(cTables).toMatchInlineSnapshot(`
[
[
"sensor=A",
"sensor=C",
],
[
[
[
300,
],
[
30,
],
],
[
[
400,
],
[
40,
],
],
],
]
`);
let cJoined = join(cTables[1]);
expect(cJoined).toMatchInlineSnapshot(`
[
[
300,
400,
],
[
30,
undefined,
],
[
undefined,
40,
],
]
`);
});
*/
});

View File

@@ -1,606 +0,0 @@
import { AlignedData } from 'uplot';
import {
DataFrame,
DataFrameJSON,
decodeFieldValueEntities,
Field,
FieldDTO,
FieldSchema,
FieldType,
guessFieldTypeFromValue,
Labels,
parseLabels,
QueryResultMeta,
toFilteredDataFrameDTO,
} from '@grafana/data';
import { join } from '@grafana/data/src/transformations/transformers/joinDataFrames';
import { StreamingFrameAction, StreamingFrameOptions } from '@grafana/runtime/src/services/live';
import { renderLegendFormat } from 'app/plugins/datasource/prometheus/legend';
/**
* Stream packet info is attached to StreamingDataFrames and indicate how many
* rows were added to the end of the frame. The number of discarded rows can be
* calculated from previous state
*/
export interface StreamPacketInfo {
// monotonically increasing packet counter (starts at 1 after the first push)
number: number;
// whether the packet replaced the buffer or was appended to it
action: StreamingFrameAction;
// number of rows carried by this packet
length: number;
// true when the packet included a schema different from the previous one
schemaChanged: boolean;
}
// label key used to inject the metric name into prometheus-style label sets
const PROM_STYLE_METRIC_LABEL = '__name__';
// how incoming packets are interpreted: plain wide rows, or rows keyed by a labels column
enum PushMode {
wide,
labels,
// long
}
// plain-object form of a StreamingDataFrame, suitable for structured cloning
// across worker boundaries (see StreamingDataFrame.serialize/deserialize)
export type SerializedStreamingDataFrame = {
name?: string;
fields: FieldDTO[];
refId?: string;
meta: QueryResultMeta;
schemaFields: FieldSchema[];
timeFieldIndex: number;
pushMode: PushMode;
length: number;
packetInfo: StreamPacketInfo;
options: StreamingFrameOptions;
labels: Set<string>;
};
/**
* Unlike a circular buffer, this will append and periodically slice the front
*/
export class StreamingDataFrame implements DataFrame {
name?: string;
refId?: string;
meta: QueryResultMeta = {};
fields: Field[] = [];
length = 0;
private schemaFields: FieldSchema[] = [];
// index of the time field within `fields` (-1 when there is none)
private timeFieldIndex = -1;
private pushMode = PushMode.wide;
// current labels
private labels: Set<string> = new Set();
// info about the most recently applied packet; mutated in place by push()
readonly packetInfo: StreamPacketInfo = {
schemaChanged: true,
number: 0,
action: StreamingFrameAction.Replace,
length: 0,
};
private constructor(public options: StreamingFrameOptions) {
// Get Length to show up if you use spread
Object.defineProperty(this, 'length', {
enumerable: true,
});
// Get fields to show up if you use spread
Object.defineProperty(this, 'fields', {
enumerable: true,
});
}
/**
 * Serialize the frame into a plain object suitable for structured cloning.
 * Fields can be filtered with `fieldPredicate`; `optionsOverride` is merged into
 * the stored options; when trimValues.maxLength is set, the smaller of it and
 * options.maxLength bounds the emitted values.
 */
serialize = (
fieldPredicate?: (f: Field) => boolean,
optionsOverride?: Partial<StreamingFrameOptions>,
trimValues?: {
maxLength?: number;
}
): SerializedStreamingDataFrame => {
const options = optionsOverride ? Object.assign({}, { ...this.options, ...optionsOverride }) : this.options;
const dataFrameDTO = toFilteredDataFrameDTO(this, fieldPredicate);
const numberOfItemsToRemove = getNumberOfItemsToRemove(
dataFrameDTO.fields.map((f) => f.values) as unknown[][],
typeof trimValues?.maxLength === 'number' ? Math.min(trimValues.maxLength, options.maxLength) : options.maxLength,
this.timeFieldIndex,
options.maxDelta
);
dataFrameDTO.fields = dataFrameDTO.fields.map((f) => ({
...f,
values: (f.values as unknown[]).slice(numberOfItemsToRemove),
}));
const length = dataFrameDTO.fields[0]?.values?.length ?? 0
return {
...dataFrameDTO,
// TODO: Labels and schema are not filtered by field
labels: this.labels,
schemaFields: this.schemaFields,
name: this.name,
refId: this.refId,
meta: this.meta,
length,
timeFieldIndex: this.timeFieldIndex,
pushMode: this.pushMode,
packetInfo: this.packetInfo,
options,
};
};
// restore state from a serialized frame; the restored packet is reported as a
// schema-changing replace so consumers fully re-render
private initFromSerialized = (serialized: Omit<SerializedStreamingDataFrame, 'options'>) => {
this.name = serialized.name;
this.refId = serialized.refId;
this.meta = serialized.meta;
this.length = serialized.length;
this.labels = serialized.labels;
this.schemaFields = serialized.schemaFields;
this.timeFieldIndex = serialized.timeFieldIndex;
this.pushMode = serialized.pushMode;
this.packetInfo.length = serialized.packetInfo.length;
this.packetInfo.number = serialized.packetInfo.number;
this.packetInfo.action = StreamingFrameAction.Replace;
this.packetInfo.schemaChanged = true;
this.fields = serialized.fields.map((f) => ({
...f,
type: f.type ?? FieldType.other,
config: f.config ?? {},
values: f.values ?? [],
}));
assureValuesAreWithinLengthLimit(
this.fields.map((f) => f.values),
this.options.maxLength,
this.timeFieldIndex,
this.options.maxDelta
);
};
// rebuild a StreamingDataFrame from the output of serialize()
static deserialize = (serialized: SerializedStreamingDataFrame) => {
const frame = new StreamingDataFrame(serialized.options);
frame.initFromSerialized(serialized);
return frame;
};
// create an empty frame with the given (defaulted) options
static empty = (opts?: Partial<StreamingFrameOptions>): StreamingDataFrame =>
new StreamingDataFrame(getStreamingFrameOptions(opts));
// create a frame and apply the first packet immediately
static fromDataFrameJSON = (frame: DataFrameJSON, opts?: Partial<StreamingFrameOptions>): StreamingDataFrame => {
const streamingDataFrame = new StreamingDataFrame(getStreamingFrameOptions(opts));
streamingDataFrame.push(frame);
return streamingDataFrame;
};
private get alwaysReplace() {
return this.options.action === StreamingFrameAction.Replace;
}
// true when `resize` with these options would actually grow the buffer
// (bigger length or delta), or override the default Infinity maxDelta
needsResizing = ({ maxLength, maxDelta }: StreamingFrameOptions) => {
const needsMoreLength = maxLength && this.options.maxLength < maxLength;
const needsBiggerDelta = maxDelta && this.options.maxDelta < maxDelta;
const needsToOverrideDefaultInfinityDelta = maxDelta && this.options.maxDelta === Infinity;
return Boolean(needsMoreLength || needsBiggerDelta || needsToOverrideDefaultInfinityDelta);
};
// grow (never shrink) the buffer limits; an Infinity maxDelta is always replaced
resize = ({ maxLength, maxDelta }: Partial<StreamingFrameOptions>) => {
if (maxDelta) {
if (this.options.maxDelta === Infinity) {
this.options.maxDelta = maxDelta;
} else {
this.options.maxDelta = Math.max(maxDelta, this.options.maxDelta);
}
}
this.options.maxLength = Math.max(this.options.maxLength, maxLength ?? 0);
};
/**
 * apply the new message to the existing data. This will replace the existing schema
 * if a new schema is included in the message, or append data matching the current schema
 */
push(msg: DataFrameJSON): StreamPacketInfo {
const { schema, data } = msg;
this.packetInfo.number++;
this.packetInfo.length = 0;
this.packetInfo.schemaChanged = false;
if (schema) {
this.pushMode = PushMode.wide;
this.timeFieldIndex = schema.fields.findIndex((f) => f.type === FieldType.time);
const firstField = schema.fields[0];
if (
this.timeFieldIndex === 1 &&
firstField.type === FieldType.string &&
(firstField.name === 'labels' || firstField.name === 'Labels')
) {
this.pushMode = PushMode.labels;
this.timeFieldIndex = 0; // after labels are removed!
}
const niceSchemaFields = this.pushMode === PushMode.labels ? schema.fields.slice(1) : schema.fields;
this.refId = schema.refId;
if (schema.meta) {
this.meta = { ...schema.meta };
}
const { displayNameFormat } = this.options;
if (hasSameStructure(this.schemaFields, niceSchemaFields)) {
const len = niceSchemaFields.length;
this.fields.forEach((f, idx) => {
const sf = niceSchemaFields[idx % len];
f.config = sf.config ?? {};
f.labels = sf.labels;
});
if (displayNameFormat) {
this.fields.forEach((f) => {
const labels = { [PROM_STYLE_METRIC_LABEL]: f.name, ...f.labels };
f.config.displayNameFromDS = renderLegendFormat(displayNameFormat, labels);
});
}
} else {
this.packetInfo.schemaChanged = true;
const isWide = this.pushMode === PushMode.wide;
this.fields = niceSchemaFields.map((f) => {
const config = f.config ?? {};
if (displayNameFormat) {
const labels = { [PROM_STYLE_METRIC_LABEL]: f.name, ...f.labels };
config.displayNameFromDS = renderLegendFormat(displayNameFormat, labels);
}
return {
config,
name: f.name,
labels: f.labels,
type: f.type ?? FieldType.other,
// transfer old values by type & name, unless we relied on labels to match fields
values: isWide
? this.fields.find((of) => of.name === f.name && f.type === of.type)?.values ??
Array(this.length).fill(undefined)
: [],
};
});
}
this.schemaFields = niceSchemaFields;
}
if (data && data.values.length && data.values[0].length) {
let { values, entities } = data;
if (entities) {
entities.forEach((ents, i) => {
if (ents) {
decodeFieldValueEntities(ents, values[i]);
// TODO: append replacements to field
}
});
}
if (this.pushMode === PushMode.labels) {
// augment and transform data to match current schema for standard circPush() path
const labeledTables = transpose(values);
// make sure fields are initalized for each label
for (const label of labeledTables.keys()) {
if (!this.labels.has(label)) {
this.packetInfo.schemaChanged = true;
this.addLabel(label);
}
}
// TODO: cache higher up
let dummyTable = Array(this.schemaFields.length).fill([]);
let tables: AlignedData[] = [];
this.labels.forEach((label) => {
tables.push(labeledTables.get(label) ?? dummyTable);
});
values = join(tables);
}
if (values.length !== this.fields.length) {
if (this.fields.length) {
throw new Error(
`push message mismatch. Expected: ${this.fields.length}, received: ${values.length} (labels=${
this.pushMode === PushMode.labels
})`
);
}
// no schema was pushed; guess field names & types from the first values
this.fields = values.map((vals, idx) => {
let name = `Field ${idx}`;
let type = guessFieldTypeFromValue(vals[0]);
const isTime = idx === 0 && type === FieldType.number && (vals as number[])[0] > 1600016688632;
if (isTime) {
type = FieldType.time;
name = 'Time';
}
return {
name,
type,
config: {},
values: [],
};
});
}
let appended = values;
this.packetInfo.length = values[0].length;
if (this.alwaysReplace || !this.length) {
this.packetInfo.action = StreamingFrameAction.Replace;
} else {
this.packetInfo.action = StreamingFrameAction.Append;
// mutates appended
appended = this.fields.map((f) => f.values);
circPush(appended, values, this.options.maxLength, this.timeFieldIndex, this.options.maxDelta);
}
appended.forEach((v, i) => {
const field = this.fields[i];
const { state } = field;
field.values = v;
if (state) {
// invalidate cached calcs since the values changed
state.calcs = undefined;
}
});
// Update the frame length
this.length = appended[0].length;
}
return {
...this.packetInfo,
};
}
/**
 * Push raw column values that already match the current schema.
 * Append mode feeds the circular buffer; replace mode swaps values wholesale.
 */
pushNewValues = (values: unknown[][]) => {
if (!values?.length) {
return;
}
this.packetInfo.action = this.options.action;
this.packetInfo.number++;
this.packetInfo.length = values[0].length;
this.packetInfo.schemaChanged = false;
if (this.options.action === StreamingFrameAction.Append) {
circPush(
this.fields.map((f) => f.values),
values,
this.options.maxLength,
this.timeFieldIndex,
this.options.maxDelta
);
} else {
values.forEach((v, i) => {
if (this.fields[i]) {
this.fields[i].values = v;
}
});
assureValuesAreWithinLengthLimit(
this.fields.map((f) => f.values),
this.options.maxLength,
this.timeFieldIndex,
this.options.maxDelta
);
}
const newLength = this.fields?.[0]?.values.length;
if (newLength !== undefined) {
this.length = newLength;
}
};
// drop cached calcs/range on every field so consumers recompute them
resetStateCalculations = () => {
this.fields.forEach((f) => {
f.state = {
...(f.state ?? {}),
calcs: undefined,
range: undefined,
};
});
};
// indexes of the fields accepted by the predicate
getMatchingFieldIndexes = (fieldPredicate: (f: Field) => boolean): number[] =>
this.fields
.map((f, index) => (fieldPredicate(f) ? index : undefined))
.filter((val) => val !== undefined) as number[];
// per-field slice of the values appended by the most recent packet
getValuesFromLastPacket = (): unknown[][] =>
this.fields.map((f) => {
const values = f.values;
return values.slice(Math.max(values.length - this.packetInfo.length));
});
hasAtLeastOnePacket = () => Boolean(this.packetInfo.length);
// adds a set of fields for a new label
private addLabel(label: string) {
const { displayNameFormat } = this.options;
const labelCount = this.labels.size;
// parse labels
const parsedLabels = parseLabelsFromField(label);
if (labelCount === 0) {
// mutate existing fields and add labels
this.fields.forEach((f, i) => {
if (i > 0) {
f.labels = parsedLabels;
if (displayNameFormat) {
const labels = { [PROM_STYLE_METRIC_LABEL]: f.name, ...parsedLabels };
f.config.displayNameFromDS = renderLegendFormat(displayNameFormat, labels);
}
}
});
} else {
// clone the schema fields (minus the labels column) for the new label
for (let i = 1; i < this.schemaFields.length; i++) {
let proto = this.schemaFields[i] as Field;
const config = proto.config ?? {};
if (displayNameFormat) {
const labels = { [PROM_STYLE_METRIC_LABEL]: proto.name, ...parsedLabels };
config.displayNameFromDS = renderLegendFormat(displayNameFormat, labels);
}
this.fields.push({
...proto,
config,
labels: parsedLabels,
values: Array(this.length).fill(undefined),
});
}
}
this.labels.add(label);
}
// options are exposed read-only; use resize() to change the limits
getOptions = (): Readonly<StreamingFrameOptions> => this.options;
}
/**
 * Build a complete StreamingFrameOptions object, filling every unset option
 * with its default (maxLength 1000, maxDelta Infinity, action Append).
 */
export function getStreamingFrameOptions(opts?: Partial<StreamingFrameOptions>): StreamingFrameOptions {
  const { maxLength = 1000, maxDelta = Infinity, action = StreamingFrameAction.Append, displayNameFormat } =
    opts ?? {};
  return { maxLength, maxDelta, action, displayNameFormat };
}
// Converts vertical insertion records — table keys in row [0], column values in
// rows [1..N] — into a Map of join()-able tables (one array of columns per key).
export function transpose(vrecs: any[][]) {
  const tables = new Map<any, any[][]>();
  // one empty column set per distinct key
  for (const key of new Set(vrecs[0])) {
    tables.set(
      key,
      Array.from({ length: vrecs.length - 1 }, () => [])
    );
  }
  // distribute each row's values into the columns of its key's table
  vrecs[0].forEach((key, rowIdx) => {
    const cols = tables.get(key);
    if (cols) {
      for (let c = 1; c < vrecs.length; c++) {
        cols[c - 1].push(vrecs[c][rowIdx]);
      }
    }
  });
  return tables;
}
// binary search for index of closest value
/**
 * Returns the index of the element in `arr` (sorted ascending) closest to `num`,
 * searching within [lo, hi]. Ties favor the lower index. Returns -1 for an empty
 * array (hi defaults to arr.length - 1 === -1).
 */
export function closestIdx(num: number, arr: number[], lo?: number, hi?: number) {
  let mid;
  // use ?? (not ||) so an explicitly passed lo/hi of 0 is respected
  lo = lo ?? 0;
  hi = hi ?? arr.length - 1;
  // the bit shift is faster but only valid for values below 2^31
  let bitwise = hi <= 2147483647;
  while (hi - lo > 1) {
    mid = bitwise ? (lo + hi) >> 1 : Math.floor((lo + hi) / 2);
    if (arr[mid] < num) {
      lo = mid;
    } else {
      hi = mid;
    }
  }
  // pick whichever neighbour is nearer; ties go to the lower index
  if (num - arr[lo] <= arr[hi] - num) {
    return lo;
  }
  return hi;
}
export function parseLabelsFromField(str: string): Labels {
if (!str.length) {
return {};
}
if (str.charAt(0) === '{') {
return parseLabels(str);
}
const parsedLabels: Labels = {};
str.split(',').forEach((kv) => {
const [key, val] = kv.trim().split('=');
parsedLabels[key] = val;
});
return parsedLabels;
}
/**
 * @internal // not exported in yet
 * Returns the frame's packetInfo when it is a StreamingDataFrame that has
 * applied at least one packet; undefined otherwise.
 */
export function getLastStreamingDataFramePacket(frame: DataFrame) {
  const { packetInfo } = frame as StreamingDataFrame;
  if (packetInfo?.action) {
    return packetInfo;
  }
  return undefined;
}
// Mutable circular push: appends newData column-wise onto data, then trims the
// front so the buffer respects maxLength/maxDelta. Returns the trimmed count.
function circPush(data: unknown[][], newData: unknown[][], maxLength = Infinity, deltaIdx = 0, maxDelta = Infinity) {
  data.forEach((column, i) => {
    for (const value of newData[i]) {
      column.push(value);
    }
  });
  return assureValuesAreWithinLengthLimit(data, maxLength, deltaIdx, maxDelta);
}
// Trims the oldest rows from every column so the buffer satisfies the
// maxLength/maxDelta constraints. Mutates `data`; returns how many rows were removed.
function assureValuesAreWithinLengthLimit(data: unknown[][], maxLength = Infinity, deltaIdx = 0, maxDelta = Infinity) {
  const itemsToRemove = getNumberOfItemsToRemove(data, maxLength, deltaIdx, maxDelta);
  if (itemsToRemove) {
    for (const column of data) {
      column.splice(0, itemsToRemove);
    }
  }
  return itemsToRemove;
}
// Computes how many rows must be dropped from the front of the buffer so that
// (a) at most maxLength rows remain and (b) the time span between the oldest
// retained row and the newest is at most maxDelta. Does not mutate `data`.
function getNumberOfItemsToRemove(data: unknown[][], maxLength = Infinity, deltaIdx = 0, maxDelta = Infinity) {
  const rowCount = data[0]?.length;
  if (!rowCount) {
    return 0;
  }
  let removeCount = rowCount > maxLength ? rowCount - maxLength : 0;
  if (maxDelta !== Infinity && deltaIdx >= 0) {
    const timestamps = data[deltaIdx] as number[];
    const oldest = timestamps[removeCount];
    const newest = timestamps[rowCount - 1];
    if (newest - oldest > maxDelta) {
      removeCount = closestIdx(newest - maxDelta, timestamps, removeCount);
    }
  }
  return removeCount;
}
// True when both schema lists have the same length and matching name/type pairs
// at every position (config and labels are ignored).
function hasSameStructure(a: FieldSchema[], b: FieldSchema[]): boolean {
  if (a?.length !== b.length) {
    return false;
  }
  return a.every((fieldA, i) => fieldA.name === b[i].name && fieldA.type === b[i].type);
}

View File

@@ -1,4 +1,4 @@
import { closestIdx } from "./StreamingDataFrame";
import { closestIdx } from '@grafana/data/src/dataframe/StreamingDataFrame';
export type Table = [times: number[], ...values: any[][]];

View File

@@ -1,6 +1,4 @@
import { DataQueryResponseData, isDataFrame } from '@grafana/data';
import { StreamingDataFrame } from './StreamingDataFrame';
import { DataQueryResponseData, isDataFrame, StreamingDataFrame } from '@grafana/data';
/**
* @alpha -- experimental

View File

@@ -1,10 +1,9 @@
import { Subject } from 'rxjs';
import { DataQueryResponse, FieldType, LiveChannelScope } from '@grafana/data';
import { DataQueryResponse, FieldType, LiveChannelScope, StreamingDataFrame } from '@grafana/data';
import { BackendSrv } from '@grafana/runtime';
import { CentrifugeSrv, StreamingDataQueryResponse } from './centrifuge/service';
import { StreamingDataFrame } from './data/StreamingDataFrame';
import { StreamingResponseDataType } from './data/utils';
import { GrafanaLiveService } from './live';

View File

@@ -1,6 +1,6 @@
import { from, map, of, switchMap } from 'rxjs';
import { DataFrame, toLiveChannelId } from '@grafana/data';
import { DataFrame, toLiveChannelId, StreamingDataFrame } from '@grafana/data';
import { BackendSrv, GrafanaLiveSrv, toDataQueryResponse } from '@grafana/runtime';
import {
standardStreamOptionsProvider,
@@ -8,7 +8,6 @@ import {
} from '@grafana/runtime/src/utils/DataSourceWithBackend';
import { CentrifugeSrv, StreamingDataQueryResponse } from './centrifuge/service';
import { StreamingDataFrame } from './data/StreamingDataFrame';
import { isStreamingResponseData, StreamingResponseDataType } from './data/utils';
type GrafanaLiveServiceDeps = {

View File

@@ -27,10 +27,10 @@ import {
transformDataFrame,
preProcessPanelData,
ApplyFieldOverrideOptions,
StreamingDataFrame,
} from '@grafana/data';
import { getTemplateSrv, toDataQueryError } from '@grafana/runtime';
import { ExpressionDatasourceRef } from '@grafana/runtime/src/utils/DataSourceWithBackend';
import { StreamingDataFrame } from 'app/features/live/data/StreamingDataFrame';
import { isStreamingDataFrame } from 'app/features/live/data/utils';
import { getDatasourceSrv } from 'app/features/plugins/datasource_srv';

View File

@@ -36,6 +36,7 @@ import {
DataSourceWithToggleableQueryFiltersSupport,
ToggleFilterAction,
QueryFilterOptions,
renderLegendFormat,
} from '@grafana/data';
import { BackendSrvRequest, config, DataSourceWithBackend, FetchError } from '@grafana/runtime';
import { DataQuery } from '@grafana/schema';
@@ -46,7 +47,6 @@ import { getTemplateSrv, TemplateSrv } from 'app/features/templating/template_sr
import { serializeParams } from '../../../core/utils/fetch';
import { queryLogsSample, queryLogsVolume } from '../../../features/logs/logsModel';
import { getLogLevelFromKey } from '../../../features/logs/utils';
import { renderLegendFormat } from '../prometheus/legend';
import { replaceVariables, returnVariables } from '../prometheus/querybuilder/shared/parsingUtils';
import LanguageProvider from './LanguageProvider';

View File

@@ -1,8 +1,14 @@
import { map, Observable, defer, mergeMap } from 'rxjs';
import { DataFrameJSON, DataQueryRequest, DataQueryResponse, LiveChannelScope, LoadingState } from '@grafana/data';
import {
DataFrameJSON,
DataQueryRequest,
DataQueryResponse,
LiveChannelScope,
LoadingState,
StreamingDataFrame,
} from '@grafana/data';
import { getGrafanaLiveSrv } from '@grafana/runtime';
import { StreamingDataFrame } from 'app/features/live/data/StreamingDataFrame';
import { LokiDatasource } from './datasource';
import { LokiQuery } from './types';

View File

@@ -23,6 +23,7 @@ import {
rangeUtil,
ScopedVars,
TimeRange,
renderLegendFormat,
} from '@grafana/data';
import {
BackendDataSourceResponse,
@@ -52,7 +53,6 @@ import {
getPrometheusTime,
getRangeSnapInterval,
} from './language_utils';
import { renderLegendFormat } from './legend';
import PrometheusMetricFindQuery from './metric_find_query';
import { getInitHints, getQueryHints } from './query_hints';
import { QueryEditorMode } from './querybuilder/shared/types';

View File

@@ -1,30 +0,0 @@
import { renderLegendFormat } from './legend';
// Unit tests for renderLegendFormat(): substitution of {{label}} placeholders
// in a legend pattern with values from a label set.
describe('renderLegendFormat()', () => {
  const sampleLabels = {
    a: 'AAA',
    b: 'BBB',
    'with space': 'CCC',
  };

  it('works without any labels', () => {
    // A pattern with no placeholders passes through untouched, with or
    // without labels available.
    expect(renderLegendFormat('hello', sampleLabels)).toEqual('hello');
    expect(renderLegendFormat('hello', {})).toEqual('hello');
  });

  it('Simple replace', () => {
    expect(renderLegendFormat('value: {{a}}', sampleLabels)).toEqual('value: AAA');
    expect(renderLegendFormat('{{a}} {{with space}}', sampleLabels)).toEqual('AAA CCC');
    // Whitespace inside the braces is tolerated.
    // not sure if this is expected... but current behavior
    expect(renderLegendFormat('{{ a }}', sampleLabels)).toEqual('AAA');
  });

  it('Bad syntax', () => {
    // Unbalanced brace pairs are left as-is.
    expect(renderLegendFormat('value: {{a}', sampleLabels)).toEqual('value: {{a}');
    expect(renderLegendFormat('value: {a}}}', sampleLabels)).toEqual('value: {a}}}');
    // Current behavior -- not sure if expected or not
    expect(renderLegendFormat('value: {{{a}}}', sampleLabels)).toEqual('value: {a}');
  });
});

View File

@@ -1,7 +0,0 @@
import { Labels } from '@grafana/data';
/**
 * Replaces `{{label}}` placeholders in a legend pattern with the matching
 * label values. Used for the Loki and Prometheus legend formats.
 *
 * Placeholders may contain surrounding whitespace (`{{ a }}`). When a label
 * has no (truthy) value, the bare label name is substituted instead.
 */
export function renderLegendFormat(aliasPattern: string, aliasData: Labels): string {
  // Lazily match the shortest `{{ ... }}` span; capture group 1 is the
  // label name with surrounding whitespace trimmed by the regex itself.
  const placeholderRegex = /\{\{\s*(.+?)\s*\}\}/g;
  return aliasPattern.replace(placeholderRegex, (_match, labelName) => {
    const value = aliasData[labelName];
    // Falsy values (including empty string) fall back to the label name,
    // matching the original ternary's behavior.
    return value ? value : labelName;
  });
}

View File

@@ -19,11 +19,11 @@ import {
ScopedVars,
TIME_SERIES_TIME_FIELD_NAME,
TIME_SERIES_VALUE_FIELD_NAME,
renderLegendFormat,
} from '@grafana/data';
import { calculateFieldDisplayName } from '@grafana/data/src/field/fieldState';
import { config, FetchResponse, getDataSourceSrv, getTemplateSrv } from '@grafana/runtime';
import { renderLegendFormat } from './legend';
import {
ExemplarTraceIdDestination,
isExemplarData,

View File

@@ -11,9 +11,8 @@ import {
LoadingState,
DataFrameSchema,
DataFrameData,
StreamingDataFrame,
} from '@grafana/data';
// eslint-disable-next-line no-restricted-imports -- In the process from being removed
import { StreamingDataFrame } from 'app/features/live/data/StreamingDataFrame';
import { getRandomLine } from './LogIpsum';
import { TestData, StreamingQuery } from './dataquery.gen';

View File

@@ -16,10 +16,10 @@ import {
LoadingState,
applyFieldOverrides,
LiveChannelAddress,
StreamingDataFrame,
} from '@grafana/data';
import { config, getGrafanaLiveSrv } from '@grafana/runtime';
import { Alert, stylesFactory, Button, JSONFormatter, CustomScrollbar, CodeEditor } from '@grafana/ui';
import { StreamingDataFrame } from 'app/features/live/data/StreamingDataFrame';
import { TablePanel } from '../table/TablePanel';

View File

@@ -1,6 +1,7 @@
import React, { useCallback, useMemo, useRef, useState } from 'react';
import { CartesianCoords2D, DashboardCursorSync, DataFrame, FieldType, PanelProps } from '@grafana/data';
import { getLastStreamingDataFramePacket } from '@grafana/data/src/dataframe/StreamingDataFrame';
import {
Portal,
TooltipDisplayMode,
@@ -18,7 +19,6 @@ import {
prepareTimelineLegendItems,
TimelineMode,
} from 'app/core/components/TimelineChart/utils';
import { getLastStreamingDataFramePacket } from 'app/features/live/data/StreamingDataFrame';
import { AnnotationEditorPlugin } from '../timeseries/plugins/AnnotationEditorPlugin';
import { AnnotationsPlugin } from '../timeseries/plugins/AnnotationsPlugin';