Loki: use the same dataframe-format for both live and normal queries (#47153)

* loki: use single dataframe-format for both live and normal queries

* better comment

* better function-name

* simplified code

* logs_model: added test
Authored by Gábor Farkas on 2022-04-08 09:57:06 +02:00, committed by GitHub
parent 7ab910af48
commit e1438990ae
6 changed files with 290 additions and 118 deletions
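To orient the per-file diffs, here is a minimal sketch of the unified log-frame shape this commit converges on, reconstructed from the hunks and tests below (field names, types, and meta come from the diffs; the sample values are invented). Consumers such as logs_model look fields up by name and type via FieldCache, so the exact position of the trailing tsNs/id pair, which the live path appends in the opposite order, does not matter:

import { FieldType, MutableDataFrame } from '@grafana/data';

// Unified Loki log frame: per-row labels live in a dedicated 'labels' field
// instead of hanging off the 'line' field's `.labels` attribute.
const frame = new MutableDataFrame({
  refId: 'A',
  fields: [
    { name: 'labels', type: FieldType.other, values: [{ job: 'grafana' }] }, // one labels object per row
    { name: 'ts', type: FieldType.time, values: ['2019-04-26T09:28:11.352Z'] }, // ISO timestamp
    { name: 'line', type: FieldType.string, values: ['lvl=info msg="hello"'] }, // the log line
    { name: 'tsNs', type: FieldType.time, values: ['1556270891352440161'] }, // nanosecond-precision timestamp
    { name: 'id', type: FieldType.string, values: ['a-unique-row-uid'] }, // unique row id
  ],
  meta: { preferredVisualisationType: 'logs' },
});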

public/app/core/logs_model.test.ts

@@ -332,7 +332,10 @@ describe('dataFrameToLogsModel', () => {
expect(logsModel.meta).toHaveLength(2);
expect(logsModel.meta![0]).toMatchObject({
label: COMMON_LABELS,
value: series[0].fields[1].labels,
value: {
filename: '/var/log/grafana/grafana.log',
job: 'grafana',
},
kind: LogsMetaKind.LabelsMap,
});
expect(logsModel.meta![1]).toMatchObject({
@@ -342,6 +345,147 @@
});
});
it('given one series with labels-field should return expected logs model', () => {
const series: DataFrame[] = [
new MutableDataFrame({
fields: [
{
name: 'labels',
type: FieldType.other,
values: [
{
filename: '/var/log/grafana/grafana.log',
job: 'grafana',
},
{
filename: '/var/log/grafana/grafana.log',
job: 'grafana',
},
],
},
{
name: 'time',
type: FieldType.time,
values: ['2019-04-26T09:28:11.352440161Z', '2019-04-26T14:42:50.991981292Z'],
},
{
name: 'message',
type: FieldType.string,
values: [
't=2019-04-26T11:05:28+0200 lvl=info msg="Initializing DatasourceCacheService" logger=server',
't=2019-04-26T16:42:50+0200 lvl=eror msg="new token…t unhashed token=56d9fdc5c8b7400bd51b060eea8ca9d7',
],
},
{
name: 'id',
type: FieldType.string,
values: ['foo', 'bar'],
},
],
meta: {
limit: 1000,
},
}),
];
const logsModel = dataFrameToLogsModel(series, 1);
expect(logsModel.hasUniqueLabels).toBeFalsy();
expect(logsModel.rows).toHaveLength(2);
expect(logsModel.rows).toMatchObject([
{
entry: 't=2019-04-26T11:05:28+0200 lvl=info msg="Initializing DatasourceCacheService" logger=server',
labels: { filename: '/var/log/grafana/grafana.log', job: 'grafana' },
logLevel: 'info',
uniqueLabels: {},
uid: 'foo',
},
{
entry: 't=2019-04-26T16:42:50+0200 lvl=eror msg="new token…t unhashed token=56d9fdc5c8b7400bd51b060eea8ca9d7',
labels: { filename: '/var/log/grafana/grafana.log', job: 'grafana' },
logLevel: 'error',
uniqueLabels: {},
uid: 'bar',
},
]);
expect(logsModel.series).toHaveLength(2);
expect(logsModel.series).toMatchObject([
{
name: 'info',
fields: [
{ type: 'time', values: new ArrayVector([1556270891000, 1556289770000]) },
{ type: 'number', values: new ArrayVector([1, 0]) },
],
},
{
name: 'error',
fields: [
{ type: 'time', values: new ArrayVector([1556289770000]) },
{ type: 'number', values: new ArrayVector([1]) },
],
},
]);
expect(logsModel.meta).toHaveLength(2);
expect(logsModel.meta![0]).toMatchObject({
label: COMMON_LABELS,
value: { filename: '/var/log/grafana/grafana.log', job: 'grafana' },
kind: LogsMetaKind.LabelsMap,
});
expect(logsModel.meta![1]).toMatchObject({
label: LIMIT_LABEL,
value: `1000 (2 returned)`,
kind: LogsMetaKind.String,
});
});
it('given one series with labels-field it should work regardless of the labels-field position', () => {
const labels = {
name: 'labels',
type: FieldType.other,
values: [
{
node: 'first',
mode: 'slow',
},
],
};
const time = {
name: 'time',
type: FieldType.time,
values: ['2019-04-26T09:28:11.352440161Z'],
};
const line = {
name: 'line',
type: FieldType.string,
values: ['line1'],
};
const frame1 = new MutableDataFrame({
fields: [labels, time, line],
});
const frame2 = new MutableDataFrame({
fields: [time, labels, line],
});
const frame3 = new MutableDataFrame({
fields: [time, line, labels],
});
const logsModel1 = dataFrameToLogsModel([frame1], 1);
expect(logsModel1.rows).toHaveLength(1);
expect(logsModel1.rows[0].labels).toStrictEqual({ mode: 'slow', node: 'first' });
const logsModel2 = dataFrameToLogsModel([frame2], 1);
expect(logsModel2.rows).toHaveLength(1);
expect(logsModel2.rows[0].labels).toStrictEqual({ mode: 'slow', node: 'first' });
const logsModel3 = dataFrameToLogsModel([frame3], 1);
expect(logsModel3.rows).toHaveLength(1);
expect(logsModel3.rows[0].labels).toStrictEqual({ mode: 'slow', node: 'first' });
});
it('given one series with error should return expected logs model', () => {
const series: DataFrame[] = [
new MutableDataFrame({

public/app/core/logs_model.ts

@@ -303,11 +303,38 @@ interface LogFields {
timeField: FieldWithIndex;
stringField: FieldWithIndex;
labelsField?: FieldWithIndex;
timeNanosecondField?: FieldWithIndex;
logLevelField?: FieldWithIndex;
idField?: FieldWithIndex;
}
function getAllLabels(fields: LogFields): Labels[] {
// there are two types of dataframes we handle:
// 1. labels are in a separate field (more efficient when labels change with every log-row)
// 2. labels are in the string-field's `.labels` attribute
const { stringField, labelsField } = fields;
const fieldLabels = stringField.labels !== undefined ? [stringField.labels] : [];
const labelsFieldLabels: Labels[] = labelsField !== undefined ? labelsField.values.toArray() : [];
return [...fieldLabels, ...labelsFieldLabels];
}
function getLabelsForFrameRow(fields: LogFields, index: number): Labels {
// there are two types of dataframes we handle.
// either labels-on-the-string-field, or labels-in-the-labels-field
const { stringField, labelsField } = fields;
return {
...stringField.labels,
...labelsField?.values.get(index),
};
}
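A usage sketch of the two frame shapes these helpers cover (a hedged example; the frames and values are invented, mirroring the logs_model tests above):

import { FieldType, MutableDataFrame } from '@grafana/data';

// Shape 1: shared labels attached to the string field (the same for every row)
const frameWithFieldLabels = new MutableDataFrame({
  fields: [
    { name: 'time', type: FieldType.time, values: ['2019-04-26T09:28:11Z'] },
    { name: 'line', type: FieldType.string, labels: { job: 'grafana' }, values: ['line1'] },
  ],
});

// Shape 2: a dedicated labels-field, so labels can differ per row
const frameWithLabelsField = new MutableDataFrame({
  fields: [
    { name: 'labels', type: FieldType.other, values: [{ job: 'grafana' }, { job: 'loki' }] },
    { name: 'time', type: FieldType.time, values: ['2019-04-26T09:28:11Z', '2019-04-26T14:42:50Z'] },
    { name: 'line', type: FieldType.string, values: ['line1', 'line2'] },
  ],
});

// getLabelsForFrameRow merges both sources for one row; since the labels-field
// value is spread last, it wins over string-field labels on conflicting keys.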
/**
* Converts dataFrames into LogsModel. This involves merging them into one list, sorting them and computing metadata
* like common labels.
@@ -316,7 +343,7 @@ export function logSeriesToLogsModel(logSeries: DataFrame[]): LogsModel | undefi
if (logSeries.length === 0) {
return undefined;
}
const allLabels: Labels[] = [];
const allLabels: Labels[][] = [];
// Find the fields we care about and collect all labels
let allSeries: LogFields[] = [];
@@ -330,43 +357,39 @@ export function logSeriesToLogsModel(logSeries: DataFrame[]): LogsModel | undefi
const fieldCache = new FieldCache(series);
const stringField = fieldCache.getFirstFieldOfType(FieldType.string);
const timeField = fieldCache.getFirstFieldOfType(FieldType.time);
const labelsField = fieldCache.getFieldByName('labels');
if (stringField !== undefined && timeField !== undefined) {
if (stringField?.labels) {
allLabels.push(stringField.labels);
}
allSeries.push({
const info = {
series,
timeField,
labelsField,
timeNanosecondField: fieldCache.hasFieldWithNameAndType('tsNs', FieldType.time)
? fieldCache.getFieldByName('tsNs')
: undefined,
stringField,
logLevelField: fieldCache.getFieldByName('level'),
idField: getIdField(fieldCache),
});
};
allSeries.push(info);
const labels = getAllLabels(info);
if (labels.length > 0) {
allLabels.push(labels);
}
}
});
}
const commonLabels = allLabels.length > 0 ? findCommonLabels(allLabels) : {};
const flatAllLabels = allLabels.flat();
const commonLabels = flatAllLabels.length > 0 ? findCommonLabels(flatAllLabels) : {};
const rows: LogRowModel[] = [];
let hasUniqueLabels = false;
for (const info of allSeries) {
const { timeField, timeNanosecondField, stringField, logLevelField, idField, series } = info;
const labels = stringField.labels;
const uniqueLabels = findUniqueLabels(labels, commonLabels);
if (Object.keys(uniqueLabels).length > 0) {
hasUniqueLabels = true;
}
let seriesLogLevel: LogLevel | undefined = undefined;
if (labels && Object.keys(labels).indexOf('level') !== -1) {
seriesLogLevel = getLogLevelFromKey(labels['level']);
}
for (let j = 0; j < series.length; j++) {
const ts = timeField.values.get(j);
@@ -386,14 +409,20 @@ export function logSeriesToLogsModel(logSeries: DataFrame[]): LogsModel | undefi
const searchWords = series.meta && series.meta.searchWords ? series.meta.searchWords : [];
const entry = hasAnsi ? ansicolor.strip(message) : message;
const labels = getLabelsForFrameRow(info, j);
const uniqueLabels = findUniqueLabels(labels, commonLabels);
if (Object.keys(uniqueLabels).length > 0) {
hasUniqueLabels = true;
}
let logLevel = LogLevel.unknown;
if (logLevelField && logLevelField.values.get(j)) {
logLevel = getLogLevelFromKey(logLevelField.values.get(j));
} else if (seriesLogLevel) {
logLevel = seriesLogLevel;
const logLevelKey = (logLevelField && logLevelField.values.get(j)) || (labels && labels['level']);
if (logLevelKey) {
logLevel = getLogLevelFromKey(logLevelKey);
} else {
logLevel = getLogLevel(entry);
}
rows.push({
entryFieldIndex: stringField.index,
rowIndex: j,
@@ -410,7 +439,7 @@ export function logSeriesToLogsModel(logSeries: DataFrame[]): LogsModel | undefi
searchWords,
entry,
raw: message,
labels: stringField.labels || {},
labels: labels || {},
uid: idField ? idField.values.get(j) : j.toString(),
});
}
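The per-row level resolution above follows a three-step precedence; a compact sketch of just that logic (getLogLevelFromKey, getLogLevel, and LogLevel are the real @grafana/data exports this file uses; the wrapper function is illustrative):

import { getLogLevel, getLogLevelFromKey, LogLevel } from '@grafana/data';

function resolveRowLogLevel(
  levelFieldValue: string | undefined, // value of a dedicated 'level' field for this row, if any
  rowLabels: { [key: string]: string }, // labels for this row, as returned by getLabelsForFrameRow
  entry: string // the log line itself
): LogLevel {
  // 1. an explicit level-field value wins; 2. fall back to labels['level'];
  // 3. otherwise parse the level out of the log line.
  const key = levelFieldValue || rowLabels['level'];
  return key ? getLogLevelFromKey(key) : getLogLevel(entry);
}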

public/app/plugins/datasource/loki/datasource.ts

@@ -40,8 +40,8 @@ import { getTimeSrv, TimeSrv } from 'app/features/dashboard/services/TimeSrv';
import { convertToWebSocketUrl } from 'app/core/utils/explore';
import {
lokiResultsToTableModel,
lokiStreamResultToDataFrame,
lokiStreamsToDataFrames,
lokiStreamsToRawDataFrame,
processRangeQueryResponse,
} from './result_transformer';
import { transformBackendResult } from './backendResultTransformer';
@@ -54,7 +54,6 @@ import {
LokiRangeQueryRequest,
LokiResultType,
LokiStreamResponse,
LokiStreamResult,
} from './types';
import { LiveStreams, LokiLiveTarget } from './live_streams';
import LanguageProvider from './language_provider';
@@ -538,9 +537,7 @@
}),
switchMap((res) =>
of({
data: res.data
? res.data.data.result.map((stream: LokiStreamResult) => lokiStreamResultToDataFrame(stream, reverse))
: [],
data: res.data ? [lokiStreamsToRawDataFrame(res.data.data.result, reverse)] : [],
})
)
)
@@ -667,33 +664,31 @@
const splitKeys: string[] = tagKeys.split(',').filter((v: string) => v !== '');
for (const frame of data) {
const labels: { [key: string]: string } = {};
for (const field of frame.fields) {
if (field.labels) {
for (const [key, value] of Object.entries(field.labels)) {
labels[key] = String(value).trim();
}
}
}
const tags: string[] = [
...new Set(
Object.entries(labels).reduce((acc: string[], [key, val]) => {
if (val === '') {
return acc;
}
if (splitKeys.length && !splitKeys.includes(key)) {
return acc;
}
acc.push.apply(acc, [val]);
return acc;
}, [])
),
];
const view = new DataFrameView<{ ts: string; line: string }>(frame);
const view = new DataFrameView<{ ts: string; line: string; labels: Labels }>(frame);
view.forEach((row) => {
const { labels } = row;
const maybeDuplicatedTags = Object.entries(labels)
.map(([key, val]) => [key, val.trim()]) // trim all label-values
.filter(([key, val]) => {
if (val === '') {
// remove empty
return false;
}
// if tags are specified, remove the label if it does not match the tags
if (splitKeys.length && !splitKeys.includes(key)) {
return false;
}
return true;
})
.map(([key, val]) => val); // keep only the label-value
// remove duplicates
const tags = Array.from(new Set(maybeDuplicatedTags));
annotations.push({
time: new Date(row.ts).valueOf(),
title: renderLegendFormat(titleFormat, labels),

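A worked example of the per-row tag extraction above (the label values are invented):

// One annotation row with labels { job: 'grafana', env: ' prod ', region: '', host: 'a' }
// queried with tagKeys = 'job,env', i.e. splitKeys = ['job', 'env']:
const labels: { [key: string]: string } = { job: 'grafana', env: ' prod ', region: '', host: 'a' };
const splitKeys = ['job', 'env'];
const tags = Array.from(
  new Set(
    Object.entries(labels)
      .map(([key, val]) => [key, val.trim()]) // trim: ' prod ' -> 'prod'
      .filter(([key, val]) => val !== '' && (!splitKeys.length || splitKeys.includes(key)))
      .map(([, val]) => val) // keep only the label-value
  )
);
// => ['grafana', 'prod'] — region dropped (empty), host dropped (not in tagKeys)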
public/app/plugins/datasource/loki/live_streams.ts

@@ -30,11 +30,11 @@ export class LiveStreams {
}
const data = new CircularDataFrame({ capacity: target.size });
data.addField({ name: 'ts', type: FieldType.time, config: { displayName: 'Time' } });
data.addField({ name: 'tsNs', type: FieldType.time, config: { displayName: 'Time ns' } });
data.addField({ name: 'line', type: FieldType.string }).labels = parseLabels(target.query);
data.addField({ name: 'labels', type: FieldType.other }); // The labels for each line
data.addField({ name: 'ts', type: FieldType.time, config: { displayName: 'Time' } });
data.addField({ name: 'line', type: FieldType.string }).labels = parseLabels(target.query);
data.addField({ name: 'id', type: FieldType.string });
data.addField({ name: 'tsNs', type: FieldType.time, config: { displayName: 'Time ns' } });
data.meta = { ...data.meta, preferredVisualisationType: 'logs' };
data.refId = target.refId;

public/app/plugins/datasource/loki/result_transformer.test.ts

@@ -64,18 +64,20 @@ describe('loki result transformer', () => {
jest.clearAllMocks();
});
describe('lokiStreamResultToDataFrame', () => {
describe('lokiStreamsToRawDataFrame', () => {
it('converts streams to series', () => {
const data = streamResult.map((stream) => ResultTransformer.lokiStreamResultToDataFrame(stream));
const data = ResultTransformer.lokiStreamsToRawDataFrame(streamResult);
expect(data.length).toBe(2);
expect(data[0].fields[1].labels!['foo']).toEqual('bar');
expect(data[0].fields[0].values.get(0)).toEqual('2020-01-24T09:19:22.021Z');
expect(data[0].fields[1].values.get(0)).toEqual(streamResult[0].values[0][1]);
expect(data[0].fields[2].values.get(0)).toEqual('4b79cb43-81ce-52f7-b1e9-a207fff144dc');
expect(data[1].fields[0].values.get(0)).toEqual('2020-01-24T09:19:22.031Z');
expect(data[1].fields[1].values.get(0)).toEqual(streamResult[1].values[0][1]);
expect(data[1].fields[2].values.get(0)).toEqual('73d144f6-57f2-5a45-a49c-eb998e2006b1');
expect(data.fields[0].values.get(0)).toStrictEqual({ foo: 'bar' });
expect(data.fields[1].values.get(0)).toEqual('2020-01-24T09:19:22.021Z');
expect(data.fields[2].values.get(0)).toEqual(streamResult[0].values[0][1]);
expect(data.fields[3].values.get(0)).toEqual(streamResult[0].values[0][0]);
expect(data.fields[4].values.get(0)).toEqual('4b79cb43-81ce-52f7-b1e9-a207fff144dc');
expect(data.fields[0].values.get(1)).toStrictEqual({ bar: 'foo' });
expect(data.fields[1].values.get(1)).toEqual('2020-01-24T09:19:22.031Z');
expect(data.fields[2].values.get(1)).toEqual(streamResult[1].values[0][1]);
expect(data.fields[3].values.get(1)).toEqual(streamResult[1].values[0][0]);
expect(data.fields[4].values.get(1)).toEqual('73d144f6-57f2-5a45-a49c-eb998e2006b1');
});
it('should always generate unique ids for logs', () => {
@@ -100,20 +102,19 @@ describe('loki result transformer', () => {
},
];
const data = streamResultWithDuplicateLogs.map((stream) => ResultTransformer.lokiStreamResultToDataFrame(stream));
const data = ResultTransformer.lokiStreamsToRawDataFrame(streamResultWithDuplicateLogs);
expect(data[0].fields[2].values.get(0)).toEqual('b48fe7dc-36aa-5d37-bfba-087ef810d8fa');
expect(data[0].fields[2].values.get(1)).toEqual('b48fe7dc-36aa-5d37-bfba-087ef810d8fa_1');
expect(data[0].fields[2].values.get(2)).not.toEqual('b48fe7dc-36aa-5d37-bfba-087ef810d8fa_2');
expect(data[0].fields[2].values.get(3)).toEqual('b48fe7dc-36aa-5d37-bfba-087ef810d8fa_2');
expect(data[1].fields[2].values.get(0)).not.toEqual('b48fe7dc-36aa-5d37-bfba-087ef810d8fa_3');
expect(data.fields[4].values.get(0)).toEqual('b48fe7dc-36aa-5d37-bfba-087ef810d8fa');
expect(data.fields[4].values.get(1)).toEqual('b48fe7dc-36aa-5d37-bfba-087ef810d8fa_1');
expect(data.fields[4].values.get(2)).not.toEqual('b48fe7dc-36aa-5d37-bfba-087ef810d8fa_2');
expect(data.fields[4].values.get(3)).toEqual('b48fe7dc-36aa-5d37-bfba-087ef810d8fa_2');
expect(data.fields[4].values.get(4)).not.toEqual('b48fe7dc-36aa-5d37-bfba-087ef810d8fa_3');
});
it('should append refId to the unique ids if refId is provided', () => {
const data = streamResult.map((stream) => ResultTransformer.lokiStreamResultToDataFrame(stream, false, 'B'));
expect(data.length).toBe(2);
expect(data[0].fields[2].values.get(0)).toEqual('4b79cb43-81ce-52f7-b1e9-a207fff144dc_B');
expect(data[1].fields[2].values.get(0)).toEqual('73d144f6-57f2-5a45-a49c-eb998e2006b1_B');
const data = ResultTransformer.lokiStreamsToRawDataFrame(streamResult, false, 'B');
expect(data.fields[4].values.get(0)).toEqual('4b79cb43-81ce-52f7-b1e9-a207fff144dc_B');
expect(data.fields[4].values.get(1)).toEqual('73d144f6-57f2-5a45-a49c-eb998e2006b1_B');
});
});
@@ -159,11 +160,11 @@
};
const data = new CircularDataFrame({ capacity: 1 });
data.addField({ name: 'ts', type: FieldType.time, config: { displayName: 'Time' } });
data.addField({ name: 'tsNs', type: FieldType.time, config: { displayName: 'Time ns' } });
data.addField({ name: 'line', type: FieldType.string }).labels = { job: 'grafana' };
data.addField({ name: 'labels', type: FieldType.other });
data.addField({ name: 'ts', type: FieldType.time, config: { displayName: 'Time' } });
data.addField({ name: 'line', type: FieldType.string }).labels = { job: 'grafana' };
data.addField({ name: 'id', type: FieldType.string });
data.addField({ name: 'tsNs', type: FieldType.time, config: { displayName: 'Time ns' } });
ResultTransformer.appendResponseToBufferedData(tailResponse, data);
expect(data.get(0)).toEqual({
@@ -196,11 +197,11 @@
};
const data = new CircularDataFrame({ capacity: 6 });
data.addField({ name: 'ts', type: FieldType.time, config: { displayName: 'Time' } });
data.addField({ name: 'tsNs', type: FieldType.time, config: { displayName: 'Time ns' } });
data.addField({ name: 'line', type: FieldType.string }).labels = { job: 'grafana' };
data.addField({ name: 'labels', type: FieldType.other });
data.addField({ name: 'ts', type: FieldType.time, config: { displayName: 'Time' } });
data.addField({ name: 'line', type: FieldType.string }).labels = { job: 'grafana' };
data.addField({ name: 'id', type: FieldType.string });
data.addField({ name: 'tsNs', type: FieldType.time, config: { displayName: 'Time ns' } });
data.refId = 'C';
ResultTransformer.appendResponseToBufferedData(tailResponse, data);

public/app/plugins/datasource/loki/result_transformer.ts

@@ -43,15 +43,10 @@ import { renderLegendFormat } from '../prometheus/legend';
const UUID_NAMESPACE = '6ec946da-0f49-47a8-983a-1d76d17e7c92';
/**
* Transforms LokiStreamResult structure into a dataFrame. Used when doing standard queries and newer version of Loki.
* Transforms LokiStreamResult structure into a dataFrame. Used when doing standard queries.
*/
export function lokiStreamResultToDataFrame(stream: LokiStreamResult, reverse?: boolean, refId?: string): DataFrame {
const labels: Labels = stream.stream;
const labelsString = Object.entries(labels)
.map(([key, val]) => `${key}="${val}"`)
.sort()
.join('');
export function lokiStreamsToRawDataFrame(streams: LokiStreamResult[], reverse?: boolean, refId?: string): DataFrame {
const labels = new ArrayVector<{}>([]);
const times = new ArrayVector<string>([]);
const timesNs = new ArrayVector<string>([]);
const lines = new ArrayVector<string>([]);
@@ -60,12 +55,21 @@ export function lokiStreamResultToDataFrame(stream: LokiStreamResult, reverse?:
// We need to store and track all used uids to ensure that uids are unique
const usedUids: { string?: number } = {};
for (const [ts, line] of stream.values) {
// nanosecond epoch as a string; we convert it to an ISO string here so it matches the old format
times.add(new Date(parseInt(ts.slice(0, -6), 10)).toISOString());
timesNs.add(ts);
lines.add(line);
uids.add(createUid(ts, labelsString, line, usedUids, refId));
for (const stream of streams) {
const streamLabels: Labels = stream.stream;
const labelsString = Object.entries(streamLabels)
.map(([key, val]) => `${key}="${val}"`)
.sort()
.join('');
for (const [ts, line] of stream.values) {
labels.add(streamLabels);
// nanosecond epoch as a string; we convert it to an ISO string here so it matches the old format
times.add(new Date(parseInt(ts.slice(0, -6), 10)).toISOString());
timesNs.add(ts);
lines.add(line);
uids.add(createUid(ts, labelsString, line, usedUids, refId));
}
}
return constructDataFrame(times, timesNs, lines, uids, labels, reverse, refId);
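createUid itself is unchanged by this commit and not shown here; a plausible sketch of its behavior, inferred from the UUID_NAMESPACE constant above and the uniqueness tests in result_transformer.test.ts (the uuidv5 call and exact suffix rules are assumptions):

import { v5 as uuidv5 } from 'uuid';

function createUid(
  ts: string,
  labelsString: string,
  line: string,
  usedUids: { [key: string]: number },
  refId?: string
): string {
  // Assumption: hash timestamp + labels + line into a deterministic v5 UUID
  let id = uuidv5(`${ts}_${labelsString}_${line}`, UUID_NAMESPACE);
  if (usedUids[id] === undefined) {
    usedUids[id] = 0; // first occurrence keeps the bare uuid
  } else {
    usedUids[id] += 1;
    id = `${id}_${usedUids[id]}`; // duplicates get _1, _2, ... suffixes
  }
  // With a refId, the id additionally gets a _<refId> suffix (see the '_B' test)
  return refId ? `${id}_${refId}` : id;
}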
@@ -79,17 +83,18 @@ function constructDataFrame(
timesNs: ArrayVector<string>,
lines: ArrayVector<string>,
uids: ArrayVector<string>,
labels: Labels,
labels: ArrayVector<{}>,
reverse?: boolean,
refId?: string
) {
const dataFrame = {
refId,
fields: [
{ name: 'labels', type: FieldType.other, config: {}, values: labels },
{ name: 'ts', type: FieldType.time, config: { displayName: 'Time' }, values: times }, // Time
{ name: 'line', type: FieldType.string, config: {}, values: lines, labels }, // Line - needs to be the first field with string type
{ name: 'id', type: FieldType.string, config: {}, values: uids },
{ name: 'line', type: FieldType.string, config: {}, values: lines }, // Line - needs to be the first field with string type
{ name: 'tsNs', type: FieldType.time, config: { displayName: 'Time ns' }, values: timesNs }, // Time ns
{ name: 'id', type: FieldType.string, config: {}, values: uids },
],
length: times.length,
};
@@ -128,11 +133,11 @@ export function appendResponseToBufferedData(response: LokiTailResponse, data: M
}
}
const tsField = data.fields[0];
const tsNsField = data.fields[1];
const labelsField = data.fields[0];
const tsField = data.fields[1];
const lineField = data.fields[2];
const labelsField = data.fields[3];
const idField = data.fields[4];
const idField = data.fields[3];
const tsNsField = data.fields[4];
// We are comparing used ids only within the received stream. This could be a problem if the same line + labels + nanosecond timestamp came in 2 separate batches.
// As this is very unlikely, and the result would only affect the live-tailing CSS animation, we have decided not to compare all received uids from the data param, as this would slow down processing.
@@ -346,20 +351,12 @@ export function lokiStreamsToDataFrames(
preferredVisualisationType: 'logs',
};
const series: DataFrame[] = data.map((stream) => {
const dataFrame = lokiStreamResultToDataFrame(stream, reverse, target.refId);
enhanceDataFrame(dataFrame, config);
const dataFrame = lokiStreamsToRawDataFrame(data, reverse, target.refId);
enhanceDataFrame(dataFrame, config);
if (meta.custom && dataFrame.fields.some((f) => f.labels && Object.keys(f.labels).some((l) => l === '__error__'))) {
meta.custom.error = 'Error when parsing some of the logs';
}
return {
...dataFrame,
refId: target.refId,
meta,
};
});
if (meta.custom && dataFrame.fields.some((f) => f.labels && Object.keys(f.labels).some((l) => l === '__error__'))) {
meta.custom.error = 'Error when parsing some of the logs';
}
if (stats.length && !data.length) {
return [
@@ -372,7 +369,13 @@ export function lokiStreamsToDataFrames(
];
}
return series;
return [
{
...dataFrame,
refId: target.refId,
meta,
},
];
}
/**