StreamingDataFrame: support legend format and prometheus style labels (#43637)

This commit is contained in:
Ryan McKinley 2022-01-05 11:56:23 -05:00 committed by GitHub
parent b07bd55343
commit e77b4abcc8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 70 additions and 27 deletions

View File

@ -32,6 +32,9 @@ export interface StreamingFrameOptions {
maxLength: number; // 1000
maxDelta: number; // how long to keep things
action: StreamingFrameAction; // default will append
/** optionally format field names based on labels */
displayNameFormat?: string;
}
/**

View File

@ -524,13 +524,16 @@ describe('Streaming JSON', () => {
},
{
maxLength: 4,
displayNameFormat: '{{__name__}}: {{sensor}}',
}
);
stream.push({
data: {
values: [
['sensor=A', 'sensor=B'],
// A = influxStyle, B = prometheus style labels
// key must be constatnt for join to work
['sensor=A', '{sensor="B"}'],
[100, 100],
[10, 15],
[1, 2],
@ -541,7 +544,7 @@ describe('Streaming JSON', () => {
stream.push({
data: {
values: [
['sensor=B', 'sensor=C'],
['{sensor="B"}', 'sensor=C'],
[200, 200],
[20, 25],
[3, 4],
@ -655,19 +658,20 @@ describe('Streaming JSON', () => {
},
});
// names are based on legend format
expect(stream.fields.map((f) => getFieldDisplayName(f, stream, [stream]))).toMatchInlineSnapshot(`
Array [
"time",
"speed A",
"light A",
"speed B",
"light B",
"speed C",
"light C",
"speed 4",
"light 4",
"time: sensor",
"speed: A",
"light: A",
"speed: B",
"light: B",
"speed: C",
"light: C",
"speed: sensor",
"light: sensor",
]
`); // speed+light 4 ¯\_(ツ)_/¯ better than undefined labels
`);
});
describe('keep track of packets', () => {

View File

@ -11,12 +11,14 @@ import {
guessFieldTypeFromValue,
ArrayVector,
toFilteredDataFrameDTO,
parseLabels,
} from '@grafana/data';
import { join } from '@grafana/data/src/transformations/transformers/joinDataFrames';
import {
StreamingFrameAction,
StreamingFrameOptions,
} from '@grafana/runtime/src/services/live';
import { renderLegendFormat } from 'app/plugins/datasource/prometheus/legend';
import { AlignedData } from 'uplot';
/**
@ -31,6 +33,8 @@ export interface StreamPacketInfo {
schemaChanged: boolean;
}
const PROM_STYLE_METRIC_LABEL = '__name__';
enum PushMode {
wide,
labels,
@ -202,10 +206,11 @@ export class StreamingDataFrame implements DataFrame {
if (schema) {
this.pushMode = PushMode.wide;
this.timeFieldIndex = schema.fields.findIndex((f) => f.type === FieldType.time);
const firstField = schema.fields[0];
if (
this.timeFieldIndex === 1 &&
schema.fields[0].name === 'labels' &&
schema.fields[0].type === FieldType.string
firstField.type === FieldType.string &&
(firstField.name === 'labels' || firstField.name === 'Labels')
) {
this.pushMode = PushMode.labels;
this.timeFieldIndex = 0; // after labels are removed!
@ -218,19 +223,31 @@ export class StreamingDataFrame implements DataFrame {
this.meta = { ...schema.meta };
}
const { displayNameFormat } = this.options;
if (hasSameStructure(this.schemaFields, niceSchemaFields)) {
const len = niceSchemaFields.length;
this.fields.forEach((f, idx) => {
const sf = niceSchemaFields[idx % len];
f.config = sf.config ?? {};
f.labels = sf.labels;
});
});
if (displayNameFormat) {
this.fields.forEach((f) => {
const labels = {[PROM_STYLE_METRIC_LABEL]:f.name, ...f.labels};
f.config.displayNameFromDS = renderLegendFormat(displayNameFormat, labels);
});
}
} else {
this.packetInfo.schemaChanged = true;
const isWide = this.pushMode === PushMode.wide;
this.fields = niceSchemaFields.map((f) => {
const config = f.config ?? {};
if (displayNameFormat) {
const labels = {[PROM_STYLE_METRIC_LABEL]:f.name, ...f.labels};
config.displayNameFromDS = renderLegendFormat(displayNameFormat, labels);
}
return {
config: f.config ?? {},
config,
name: f.name,
labels: f.labels,
type: f.type ?? FieldType.other,
@ -382,31 +399,34 @@ export class StreamingDataFrame implements DataFrame {
// adds a set of fields for a new label
private addLabel(label: string) {
let labelCount = this.labels.size;
const { displayNameFormat } = this.options;
const labelCount = this.labels.size;
// parse labels
const parsedLabels: Labels = {};
if (label.length) {
label.split(',').forEach((kv) => {
const [key, val] = kv.trim().split('=');
parsedLabels[key] = val;
});
}
const parsedLabels = parseLabelsFromField(label);
if (labelCount === 0) {
// mutate existing fields and add labels
this.fields.forEach((f, i) => {
if (i > 0) {
f.labels = parsedLabels;
if (displayNameFormat) {
const labels = {[PROM_STYLE_METRIC_LABEL]:f.name, ...parsedLabels};
f.config.displayNameFromDS = renderLegendFormat(displayNameFormat, labels);
}
}
});
} else {
for (let i = 1; i < this.schemaFields.length; i++) {
let proto = this.schemaFields[i] as Field;
const config = proto.config ?? {};
if (displayNameFormat) {
const labels = {[PROM_STYLE_METRIC_LABEL]:proto.name, ...parsedLabels};
config.displayNameFromDS = renderLegendFormat(displayNameFormat, labels);
}
this.fields.push({
...proto,
config: proto.config ?? {},
config,
labels: parsedLabels,
values: new ArrayVector(Array(this.length).fill(undefined)),
});
@ -414,7 +434,7 @@ export class StreamingDataFrame implements DataFrame {
}
this.labels.add(label);
}
};
getOptions = (): Readonly<StreamingFrameOptions> => this.options;
}
@ -424,6 +444,7 @@ export function getStreamingFrameOptions(opts?: Partial<StreamingFrameOptions>):
maxLength: opts?.maxLength ?? 1000,
maxDelta: opts?.maxDelta ?? Infinity,
action: opts?.action ?? StreamingFrameAction.Append,
displayNameFormat: opts?.displayNameFormat,
};
}
@ -475,6 +496,21 @@ function closestIdx(num: number, arr: number[], lo?: number, hi?: number) {
return hi;
}
export function parseLabelsFromField(str: string): Labels {
if (!str.length) {
return {};
}
if (str.charAt(0) === '{') {
return parseLabels(str);
}
const parsedLabels: Labels = {};
str.split(',').forEach((kv) => {
const [key, val] = kv.trim().split('=');
parsedLabels[key] = val;
});
return parsedLabels;
}
/**
* @internal // not exported in yet
*/