mirror of
https://github.com/grafana/grafana.git
synced 2025-02-16 10:24:54 -06:00
Live: fix field filtering and survive reconnect (#35122)
This commit is contained in:
parent
ba7aca69f6
commit
f7893ca5cf
@ -73,26 +73,33 @@ export class StreamingDataFrame implements DataFrame {
|
||||
|
||||
const niceSchemaFields = this.pushMode === PushMode.labels ? schema.fields.slice(1) : schema.fields;
|
||||
|
||||
// create new fields from the schema
|
||||
const newFields = niceSchemaFields.map((f, idx) => {
|
||||
return {
|
||||
config: f.config ?? {},
|
||||
name: f.name,
|
||||
labels: f.labels,
|
||||
type: f.type ?? FieldType.other,
|
||||
// transfer old values by type & name, unless we relied on labels to match fields
|
||||
values:
|
||||
this.pushMode === PushMode.wide
|
||||
? this.fields.find((of) => of.name === f.name && f.type === of.type)?.values ?? new ArrayVector()
|
||||
: new ArrayVector(),
|
||||
};
|
||||
});
|
||||
|
||||
this.name = schema.name;
|
||||
this.refId = schema.refId;
|
||||
this.meta = schema.meta;
|
||||
|
||||
if (hasSameStructure(this.schemaFields, niceSchemaFields)) {
|
||||
const len = niceSchemaFields.length;
|
||||
this.fields.forEach((f, idx) => {
|
||||
const sf = niceSchemaFields[idx % len];
|
||||
f.config = sf.config ?? {};
|
||||
f.labels = sf.labels;
|
||||
});
|
||||
} else {
|
||||
const isWide = this.pushMode === PushMode.wide;
|
||||
this.fields = niceSchemaFields.map((f) => {
|
||||
return {
|
||||
config: f.config ?? {},
|
||||
name: f.name,
|
||||
labels: f.labels,
|
||||
type: f.type ?? FieldType.other,
|
||||
// transfer old values by type & name, unless we relied on labels to match fields
|
||||
values: isWide
|
||||
? this.fields.find((of) => of.name === f.name && f.type === of.type)?.values ?? new ArrayVector()
|
||||
: new ArrayVector(),
|
||||
};
|
||||
});
|
||||
}
|
||||
|
||||
this.schemaFields = niceSchemaFields;
|
||||
this.fields = newFields;
|
||||
}
|
||||
|
||||
if (data && data.values.length && data.values[0].length) {
|
||||
@ -131,7 +138,11 @@ export class StreamingDataFrame implements DataFrame {
|
||||
|
||||
if (values.length !== this.fields.length) {
|
||||
if (this.fields.length) {
|
||||
throw new Error(`push message mismatch. Expected: ${this.fields.length}, recieved: ${values.length}`);
|
||||
throw new Error(
|
||||
`push message mismatch. Expected: ${this.fields.length}, recieved: ${values.length} (labels=${
|
||||
this.pushMode === PushMode.labels
|
||||
})`
|
||||
);
|
||||
}
|
||||
|
||||
this.fields = values.map((vals, idx) => {
|
||||
@ -286,3 +297,17 @@ function circPush(data: number[][], newData: number[][], maxLength = Infinity, d
|
||||
|
||||
return data;
|
||||
}
|
||||
|
||||
function hasSameStructure(a: FieldSchema[], b: FieldSchema[]): boolean {
|
||||
if (a?.length !== b.length) {
|
||||
return false;
|
||||
}
|
||||
for (let i = 0; i < a.length; i++) {
|
||||
const fA = a[i];
|
||||
const fB = b[i];
|
||||
if (fA.name !== fB.name || fA.type !== fB.type) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
@ -219,6 +219,7 @@ export class CentrifugeSrv implements GrafanaLiveSrv {
|
||||
let filtered: DataFrame | undefined = undefined;
|
||||
let state = LoadingState.Streaming;
|
||||
let last = perf.last;
|
||||
let lastWidth = -1;
|
||||
|
||||
const process = (msg: DataFrameJSON) => {
|
||||
if (!data) {
|
||||
@ -227,9 +228,11 @@ export class CentrifugeSrv implements GrafanaLiveSrv {
|
||||
data.push(msg);
|
||||
}
|
||||
state = LoadingState.Streaming;
|
||||
const sameWidth = lastWidth === data.fields.length;
|
||||
lastWidth = data.fields.length;
|
||||
|
||||
// Filter out fields
|
||||
if (!filtered || msg.schema) {
|
||||
if (!filtered || msg.schema || !sameWidth) {
|
||||
filtered = data;
|
||||
if (options.filter) {
|
||||
const { fields } = options.filter;
|
||||
|
Loading…
Reference in New Issue
Block a user