Loki: remove frontend mode code (#50185)

* feature-flag removed

* loki: remove frontend-mode
This commit is contained in:
Gábor Farkas 2022-06-08 08:14:34 +02:00 committed by GitHub
parent fd664e4beb
commit 50145e1617
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 262 additions and 1406 deletions

View File

@ -28,7 +28,6 @@ export interface FeatureToggles {
queryOverLive?: boolean;
panelTitleSearch?: boolean;
tempoServiceGraph?: boolean;
lokiBackendMode?: boolean;
prometheus_azure_auth?: boolean;
prometheusAzureOverrideAudience?: boolean;
influxdbBackendMigration?: boolean;

View File

@ -72,13 +72,6 @@ var (
State: FeatureStateBeta,
FrontendOnly: true,
},
{
Name: "lokiBackendMode",
Description: "Loki datasource works as backend datasource",
State: FeatureStateAlpha,
FrontendOnly: true,
Expression: "true", // Enabled by default
},
{
Name: "prometheus_azure_auth",
Description: "Experimental. Azure authentication for Prometheus datasource",

View File

@ -55,10 +55,6 @@ const (
// show service
FlagTempoServiceGraph = "tempoServiceGraph"
// FlagLokiBackendMode
// Loki datasource works as backend datasource
FlagLokiBackendMode = "lokiBackendMode"
// FlagPrometheusAzureAuth
// Experimental. Azure authentication for Prometheus datasource
FlagPrometheusAzureAuth = "prometheus_azure_auth"

View File

@ -28,20 +28,20 @@ const inputFrame: DataFrame = {
},
fields: [
{
name: 'time',
name: 'Time',
type: FieldType.time,
config: {},
values: new ArrayVector([1645030244810, 1645030247027]),
},
{
name: 'value',
name: 'Line',
type: FieldType.string,
config: {},
values: new ArrayVector(['line1', 'line2']),
},
{
name: 'labels',
type: FieldType.string,
type: FieldType.other,
config: {},
values: new ArrayVector([
{ level: 'info', code: '41🌙' },

View File

@ -1,37 +1,30 @@
import { lastValueFrom, of, throwError } from 'rxjs';
import { of } from 'rxjs';
import { take } from 'rxjs/operators';
import { createFetchResponse } from 'test/helpers/createFetchResponse';
import { getQueryOptions } from 'test/helpers/getQueryOptions';
import {
AbstractLabelOperator,
AnnotationQueryRequest,
CoreApp,
ArrayVector,
DataFrame,
dataFrameToJSON,
DataQueryResponse,
dateTime,
FieldCache,
FieldType,
LogRowModel,
MutableDataFrame,
toUtc,
} from '@grafana/data';
import { BackendSrvRequest, FetchResponse } from '@grafana/runtime';
import { backendSrv } from 'app/core/services/backend_srv';
import { BackendSrvRequest, FetchResponse, setBackendSrv, getBackendSrv, BackendSrv } from '@grafana/runtime';
import { TimeSrv } from 'app/features/dashboard/services/TimeSrv';
import { TemplateSrv } from 'app/features/templating/template_srv';
import { initialCustomVariableModelState } from '../../../features/variables/custom/reducer';
import { CustomVariableModel } from '../../../features/variables/types';
import { isMetricsQuery, LokiDatasource, RangeQueryOptions } from './datasource';
import { isMetricsQuery, LokiDatasource } from './datasource';
import { makeMockLokiDatasource } from './mocks';
import { LokiQuery, LokiResponse, LokiResultType } from './types';
jest.mock('@grafana/runtime', () => ({
// @ts-ignore
...jest.requireActual('@grafana/runtime'),
getBackendSrv: () => backendSrv,
}));
import { LokiQuery } from './types';
const rawRange = {
from: toUtc('2018-04-25 10:00'),
@ -51,18 +44,60 @@ const templateSrvStub = {
replace: jest.fn((a: string, ...rest: any) => a),
};
const testLogsResponse: FetchResponse<LokiResponse> = {
data: {
data: {
resultType: LokiResultType.Stream,
result: [
{
stream: {},
values: [['1573646419522934000', 'hello']],
},
],
const testFrame: DataFrame = {
refId: 'A',
fields: [
{
name: 'Time',
type: FieldType.time,
config: {},
values: new ArrayVector([1, 2]),
},
{
name: 'Line',
type: FieldType.string,
config: {},
values: new ArrayVector(['line1', 'line2']),
},
{
name: 'labels',
type: FieldType.other,
config: {},
values: new ArrayVector([
{
label: 'value',
label2: 'value ',
},
{
label: '',
label2: 'value2',
label3: ' ',
},
]),
},
{
name: 'tsNs',
type: FieldType.string,
config: {},
values: new ArrayVector(['1000000', '2000000']),
},
{
name: 'id',
type: FieldType.string,
config: {},
values: new ArrayVector(['id1', 'id2']),
},
],
length: 2,
};
const testLogsResponse: FetchResponse = {
data: {
results: {
A: {
frames: [dataFrameToJSON(testFrame)],
},
},
status: 'success',
},
ok: true,
headers: {} as unknown as Headers,
@ -74,29 +109,6 @@ const testLogsResponse: FetchResponse<LokiResponse> = {
config: {} as unknown as BackendSrvRequest,
};
const testMetricsResponse: FetchResponse<LokiResponse> = {
data: {
data: {
resultType: LokiResultType.Matrix,
result: [
{
metric: {},
values: [[1605715380, '1.1']],
},
],
},
status: 'success',
},
ok: true,
headers: {} as unknown as Headers,
redirected: false,
status: 200,
statusText: 'OK',
type: 'basic',
url: '',
config: {} as unknown as BackendSrvRequest,
};
interface AdHocFilter {
condition: string;
key: string;
@ -105,113 +117,22 @@ interface AdHocFilter {
}
describe('LokiDatasource', () => {
const fetchMock = jest.spyOn(backendSrv, 'fetch');
let origBackendSrv: BackendSrv;
beforeEach(() => {
jest.clearAllMocks();
fetchMock.mockImplementation(() => of(createFetchResponse({})));
origBackendSrv = getBackendSrv();
});
describe('when creating range query', () => {
let ds: LokiDatasource;
let adjustIntervalSpy: jest.SpyInstance;
beforeEach(() => {
ds = createLokiDSForTests();
adjustIntervalSpy = jest.spyOn(ds, 'adjustInterval');
});
it('should use default intervalMs if one is not provided', () => {
const target = { expr: '{job="grafana"}', refId: 'B' };
const raw = { from: 'now', to: 'now-1h' };
const range = { from: dateTime(), to: dateTime(), raw: raw };
const options = {
range,
};
const req = ds.createRangeQuery(target, options as any, 1000);
expect(req.start).toBeDefined();
expect(req.end).toBeDefined();
expect(adjustIntervalSpy).toHaveBeenCalledWith(1000, 1, expect.anything());
});
it('should use provided intervalMs', () => {
const target = { expr: '{job="grafana"}', refId: 'B' };
const raw = { from: 'now', to: 'now-1h' };
const range = { from: dateTime(), to: dateTime(), raw: raw };
const options = {
range,
intervalMs: 2000,
};
const req = ds.createRangeQuery(target, options as any, 1000);
expect(req.start).toBeDefined();
expect(req.end).toBeDefined();
expect(adjustIntervalSpy).toHaveBeenCalledWith(2000, 1, expect.anything());
});
it('should set the minimal step to 1ms', () => {
const target = { expr: '{job="grafana"}', refId: 'B' };
const raw = { from: 'now', to: 'now-1h' };
const range = { from: dateTime('2020-10-14T00:00:00'), to: dateTime('2020-10-14T00:00:01'), raw: raw };
const options = {
range,
intervalMs: 0.0005,
};
const req = ds.createRangeQuery(target, options as any, 1000);
expect(req.start).toBeDefined();
expect(req.end).toBeDefined();
expect(adjustIntervalSpy).toHaveBeenCalledWith(0.0005, expect.anything(), 1000);
// Step is in seconds (1 ms === 0.001 s)
expect(req.step).toEqual(0.001);
});
describe('log volume hint', () => {
let options: RangeQueryOptions;
beforeEach(() => {
const raw = { from: 'now', to: 'now-1h' };
const range = { from: dateTime(), to: dateTime(), raw: raw };
options = {
range,
} as unknown as RangeQueryOptions;
});
it('should add volume hint param for log volume queries', () => {
const target = { expr: '{job="grafana"}', refId: 'B', volumeQuery: true };
ds.runRangeQuery(target, options);
expect(backendSrv.fetch).toBeCalledWith(
expect.objectContaining({
headers: {
'X-Query-Tags': 'Source=logvolhist',
},
})
);
});
it('should not add volume hint param for regular queries', () => {
const target = { expr: '{job="grafana"}', refId: 'B', volumeQuery: false };
ds.runRangeQuery(target, options);
expect(backendSrv.fetch).not.toBeCalledWith(
expect.objectContaining({
headers: {
'X-Query-Tags': 'Source=logvolhist',
},
})
);
});
});
afterEach(() => {
setBackendSrv(origBackendSrv);
});
describe('when doing logs queries with limits', () => {
const runLimitTest = async ({
maxDataPoints = 123,
queryMaxLines,
dsMaxLines = 456,
expectedLimit,
expr = '{label="val"}',
}: any) => {
const runTest = async (
queryMaxLines: number | undefined,
dsMaxLines: number | undefined,
expectedMaxLines: number
) => {
let settings: any = {
url: 'myloggingurl',
jsonData: {
@ -226,264 +147,85 @@ describe('LokiDatasource', () => {
const ds = new LokiDatasource(settings, templateSrvMock, timeSrvStub as any);
const options = getQueryOptions<LokiQuery>({ targets: [{ expr, refId: 'B', maxLines: queryMaxLines }] });
options.maxDataPoints = maxDataPoints;
// we need to check the final query before it is sent out,
// and applyTemplateVariables is a convenient place to do that.
const spy = jest.spyOn(ds, 'applyTemplateVariables');
fetchMock.mockImplementation(() => of(testLogsResponse));
const options = getQueryOptions<LokiQuery>({
targets: [{ expr: '{a="b"}', refId: 'B', maxLines: queryMaxLines }],
});
const fetchMock = jest.fn().mockReturnValue(of({ data: testLogsResponse }));
setBackendSrv({ ...origBackendSrv, fetch: fetchMock });
await expect(ds.query(options).pipe(take(1))).toEmitValuesWith(() => {
expect(fetchMock.mock.calls.length).toBe(1);
expect(fetchMock.mock.calls[0][0].url).toContain(`limit=${expectedLimit}`);
expect(spy.mock.calls[0][0].maxLines).toBe(expectedMaxLines);
});
};
it('should use datasource max lines when no limit given and it is log query', async () => {
await runLimitTest({ expectedLimit: 456 });
it('should use datasource max lines when no query max lines', async () => {
await runTest(undefined, 40, 40);
});
it('should use custom max lines from query if set and it is logs query', async () => {
await runLimitTest({ queryMaxLines: 20, expectedLimit: 20 });
it('should use query max lines, if exists', async () => {
await runTest(80, undefined, 80);
});
it('should use custom max lines from query if set and it is logs query even if it is higher than data source limit', async () => {
await runLimitTest({ queryMaxLines: 500, expectedLimit: 500 });
});
it('should use maxDataPoints if it is metrics query', async () => {
await runLimitTest({ expr: 'rate({label="val"}[10m])', expectedLimit: 123 });
});
it('should use maxDataPoints if it is metrics query and using search', async () => {
await runLimitTest({ expr: 'rate({label="val"}[10m])', expectedLimit: 123 });
it('should use query max lines, if both exist, even if it is higher than ds max lines', async () => {
await runTest(80, 40, 80);
});
});
describe('when querying', () => {
function setup(expr: string, app: CoreApp, instant?: boolean, range?: boolean) {
const ds = createLokiDSForTests();
const options = getQueryOptions<LokiQuery>({
targets: [{ expr, refId: 'B', instant, range }],
app,
});
ds.runInstantQuery = jest.fn(() => of({ data: [] }));
ds.runRangeQuery = jest.fn(() => of({ data: [] }));
return { ds, options };
}
describe('When using adhoc filters', () => {
const DEFAULT_EXPR = 'rate({bar="baz", job="foo"} |= "bar" [5m])';
const query: LokiQuery = { expr: DEFAULT_EXPR, refId: 'A' };
const originalAdhocFiltersMock = templateSrvStub.getAdhocFilters();
const ds = new LokiDatasource({} as any, templateSrvStub as any, timeSrvStub as any);
const metricsQuery = 'rate({job="grafana"}[10m])';
const logsQuery = '{job="grafana"} |= "foo"';
it('should run logs instant if only instant is selected', async () => {
const { ds, options } = setup(logsQuery, CoreApp.Explore, true, false);
await lastValueFrom(ds.query(options));
expect(ds.runInstantQuery).toBeCalled();
expect(ds.runRangeQuery).not.toBeCalled();
afterAll(() => {
templateSrvStub.getAdhocFilters.mockReturnValue(originalAdhocFiltersMock);
});
it('should run metrics instant if only instant is selected', async () => {
const { ds, options } = setup(metricsQuery, CoreApp.Explore, true, false);
lastValueFrom(await ds.query(options));
expect(ds.runInstantQuery).toBeCalled();
expect(ds.runRangeQuery).not.toBeCalled();
it('should not modify expression with no filters', async () => {
expect(ds.applyTemplateVariables(query, {}).expr).toBe(DEFAULT_EXPR);
});
it('should run only logs range query if only range is selected', async () => {
const { ds, options } = setup(logsQuery, CoreApp.Explore, false, true);
lastValueFrom(await ds.query(options));
expect(ds.runInstantQuery).not.toBeCalled();
expect(ds.runRangeQuery).toBeCalled();
});
it('should run only metrics range query if only range is selected', async () => {
const { ds, options } = setup(metricsQuery, CoreApp.Explore, false, true);
lastValueFrom(await ds.query(options));
expect(ds.runInstantQuery).not.toBeCalled();
expect(ds.runRangeQuery).toBeCalled();
});
it('should run only logs range query if no query type is selected in Explore', async () => {
const { ds, options } = setup(logsQuery, CoreApp.Explore);
lastValueFrom(await ds.query(options));
expect(ds.runInstantQuery).not.toBeCalled();
expect(ds.runRangeQuery).toBeCalled();
});
it('should run only metrics range query if no query type is selected in Explore', async () => {
const { ds, options } = setup(metricsQuery, CoreApp.Explore);
lastValueFrom(await ds.query(options));
expect(ds.runInstantQuery).not.toBeCalled();
expect(ds.runRangeQuery).toBeCalled();
});
it('should run only logs range query in Dashboard', async () => {
const { ds, options } = setup(logsQuery, CoreApp.Dashboard);
lastValueFrom(await ds.query(options));
expect(ds.runInstantQuery).not.toBeCalled();
expect(ds.runRangeQuery).toBeCalled();
});
it('should run only metrics range query in Dashboard', async () => {
const { ds, options } = setup(metricsQuery, CoreApp.Dashboard);
lastValueFrom(await ds.query(options));
expect(ds.runInstantQuery).not.toBeCalled();
expect(ds.runRangeQuery).toBeCalled();
});
it('should return dataframe data for metrics range queries', async () => {
const ds = createLokiDSForTests();
const options = getQueryOptions<LokiQuery>({
targets: [{ expr: metricsQuery, refId: 'B', range: true }],
app: CoreApp.Explore,
});
fetchMock.mockImplementation(() => of(testMetricsResponse));
await expect(ds.query(options)).toEmitValuesWith((received) => {
const result = received[0];
const frame = result.data[0] as DataFrame;
expect(frame.meta?.preferredVisualisationType).toBe('graph');
expect(frame.refId).toBe('B');
frame.fields.forEach((field) => {
const value = field.values.get(0);
if (field.type === FieldType.time) {
expect(value).toBe(1605715380000);
} else {
expect(value).toBe(1.1);
}
});
});
});
it('should return series data for logs range query', async () => {
const ds = createLokiDSForTests();
const options = getQueryOptions<LokiQuery>({
targets: [{ expr: logsQuery, refId: 'B' }],
});
fetchMock.mockImplementation(() => of(testLogsResponse));
await expect(ds.query(options)).toEmitValuesWith((received) => {
const result = received[0];
const dataFrame = result.data[0] as DataFrame;
const fieldCache = new FieldCache(dataFrame);
expect(fieldCache.getFieldByName('Line')?.values.get(0)).toBe('hello');
expect(dataFrame.meta?.limit).toBe(20);
expect(dataFrame.meta?.searchWords).toEqual(['foo']);
});
});
it('should return custom error message when Loki returns escaping error', async () => {
const ds = createLokiDSForTests();
const options = getQueryOptions<LokiQuery>({
targets: [{ expr: '{job="gra\\fana"}', refId: 'B' }],
});
fetchMock.mockImplementation(() =>
throwError({
data: {
message: 'parse error at line 1, col 6: invalid char escape',
},
status: 400,
statusText: 'Bad Request',
})
);
await expect(ds.query(options)).toEmitValuesWith((received) => {
const err: any = received[0];
expect(err.data.message).toBe(
'Error: parse error at line 1, col 6: invalid char escape. Make sure that all special characters are escaped with \\. For more information on escaping of special characters visit LogQL documentation at https://grafana.com/docs/loki/latest/logql/.'
);
});
});
describe('When using adhoc filters', () => {
const DEFAULT_EXPR = 'rate({bar="baz", job="foo"} |= "bar" [5m])';
const options = {
targets: [{ expr: DEFAULT_EXPR }],
};
const originalAdhocFiltersMock = templateSrvStub.getAdhocFilters();
const ds = new LokiDatasource({} as any, templateSrvStub as any, timeSrvStub as any);
ds.runRangeQuery = jest.fn(() => of({ data: [] }));
afterAll(() => {
templateSrvStub.getAdhocFilters.mockReturnValue(originalAdhocFiltersMock);
});
it('should not modify expression with no filters', async () => {
await lastValueFrom(ds.query(options as any));
expect(ds.runRangeQuery).toBeCalledWith({ expr: DEFAULT_EXPR }, expect.anything());
});
it('should add filters to expression', async () => {
templateSrvStub.getAdhocFilters.mockReturnValue([
{
key: 'k1',
operator: '=',
value: 'v1',
},
{
key: 'k2',
operator: '!=',
value: 'v2',
},
]);
await lastValueFrom(ds.query(options as any));
expect(ds.runRangeQuery).toBeCalledWith(
{ expr: 'rate({bar="baz",job="foo",k1="v1",k2!="v2"} |= "bar" [5m])' },
expect.anything()
);
});
it('should add escaping if needed to regex filter expressions', async () => {
templateSrvStub.getAdhocFilters.mockReturnValue([
{
key: 'k1',
operator: '=~',
value: 'v.*',
},
{
key: 'k2',
operator: '=~',
value: `v'.*`,
},
]);
await lastValueFrom(ds.query(options as any));
expect(ds.runRangeQuery).toBeCalledWith(
{ expr: 'rate({bar="baz",job="foo",k1=~"v\\\\.\\\\*",k2=~"v\'\\\\.\\\\*"} |= "bar" [5m])' },
expect.anything()
);
});
});
describe('__range, __range_s and __range_ms variables', () => {
const options = {
targets: [{ expr: 'rate(process_cpu_seconds_total[$__range])', refId: 'A', stepInterval: '2s' }],
range: {
from: rawRange.from,
to: rawRange.to,
raw: rawRange,
it('should add filters to expression', async () => {
templateSrvStub.getAdhocFilters.mockReturnValue([
{
key: 'k1',
operator: '=',
value: 'v1',
},
};
{
key: 'k2',
operator: '!=',
value: 'v2',
},
]);
const ds = new LokiDatasource({} as any, templateSrvStub as any, timeSrvStub as any);
expect(ds.applyTemplateVariables(query, {}).expr).toBe(
'rate({bar="baz",job="foo",k1="v1",k2!="v2"} |= "bar" [5m])'
);
});
beforeEach(() => {
templateSrvStub.replace.mockClear();
});
it('should be correctly interpolated', () => {
ds.query(options as any);
const range = templateSrvStub.replace.mock.calls[0][1].__range;
const rangeMs = templateSrvStub.replace.mock.calls[0][1].__range_ms;
const rangeS = templateSrvStub.replace.mock.calls[0][1].__range_s;
expect(range).toEqual({ text: '3600s', value: '3600s' });
expect(rangeMs).toEqual({ text: 3600000, value: 3600000 });
expect(rangeS).toEqual({ text: 3600, value: 3600 });
});
it('should add escaping if needed to regex filter expressions', async () => {
templateSrvStub.getAdhocFilters.mockReturnValue([
{
key: 'k1',
operator: '=~',
value: 'v.*',
},
{
key: 'k2',
operator: '=~',
value: `v'.*`,
},
]);
expect(ds.applyTemplateVariables(query, {}).expr).toBe(
'rate({bar="baz",job="foo",k1=~"v\\\\.\\\\*",k2=~"v\'\\\\.\\\\*"} |= "bar" [5m])'
);
});
});
@ -589,45 +331,65 @@ describe('LokiDatasource', () => {
});
describe('when calling annotationQuery', () => {
const getTestContext = (response: any, options: any = []) => {
const getTestContext = (frame: DataFrame, options: any = []) => {
const query = makeAnnotationQueryRequest(options);
fetchMock.mockImplementation(() => of(response));
const ds = createLokiDSForTests();
const promise = ds.annotationQuery(query);
return { promise };
const response: DataQueryResponse = {
data: [frame],
};
ds.query = () => of(response);
return ds.annotationQuery(query);
};
it('should transform the loki data to annotation response', async () => {
const response: FetchResponse = {
data: {
data: {
resultType: LokiResultType.Stream,
result: [
{
stream: {
label: 'value',
label2: 'value ',
},
values: [['1549016857498000000', 'hello']],
},
{
stream: {
label: '', // empty value gets filtered
label2: 'value2',
label3: ' ', // whitespace value gets trimmed then filtered
},
values: [['1549024057498000000', 'hello 2']],
},
],
const testFrame: DataFrame = {
refId: 'A',
fields: [
{
name: 'Time',
type: FieldType.time,
config: {},
values: new ArrayVector([1, 2]),
},
status: 'success',
},
} as unknown as FetchResponse;
const { promise } = getTestContext(response, { stepInterval: '15s' });
const res = await promise;
{
name: 'Line',
type: FieldType.string,
config: {},
values: new ArrayVector(['hello', 'hello 2']),
},
{
name: 'labels',
type: FieldType.other,
config: {},
values: new ArrayVector([
{
label: 'value',
label2: 'value ',
},
{
label: '',
label2: 'value2',
label3: ' ',
},
]),
},
{
name: 'tsNs',
type: FieldType.string,
config: {},
values: new ArrayVector(['1000000', '2000000']),
},
{
name: 'id',
type: FieldType.string,
config: {},
values: new ArrayVector(['id1', 'id2']),
},
],
length: 2,
};
const res = await getTestContext(testFrame, { stepInterval: '15s' });
expect(res.length).toBe(2);
expect(res[0].text).toBe('hello');
@ -636,30 +398,53 @@ describe('LokiDatasource', () => {
expect(res[1].text).toBe('hello 2');
expect(res[1].tags).toEqual(['value2']);
});
describe('Formatting', () => {
const response: FetchResponse = {
data: {
data: {
resultType: LokiResultType.Stream,
result: [
{
stream: {
label: 'value',
label2: 'value2',
label3: 'value3',
},
values: [['1549016857498000000', 'hello']],
},
],
const testFrame: DataFrame = {
refId: 'A',
fields: [
{
name: 'Time',
type: FieldType.time,
config: {},
values: new ArrayVector([1]),
},
status: 'success',
},
} as unknown as FetchResponse;
{
name: 'Line',
type: FieldType.string,
config: {},
values: new ArrayVector(['hello']),
},
{
name: 'labels',
type: FieldType.other,
config: {},
values: new ArrayVector([
{
label: 'value',
label2: 'value2',
label3: 'value3',
},
]),
},
{
name: 'tsNs',
type: FieldType.string,
config: {},
values: new ArrayVector(['1000000']),
},
{
name: 'id',
type: FieldType.string,
config: {},
values: new ArrayVector(['id1']),
},
],
length: 1,
};
describe('When tagKeys is set', () => {
it('should only include selected labels', async () => {
const { promise } = getTestContext(response, { tagKeys: 'label2,label3', stepInterval: '15s' });
const res = await promise;
const res = await getTestContext(testFrame, { tagKeys: 'label2,label3', stepInterval: '15s' });
expect(res.length).toBe(1);
expect(res[0].text).toBe('hello');
@ -668,9 +453,7 @@ describe('LokiDatasource', () => {
});
describe('When textFormat is set', () => {
it('should fromat the text accordingly', async () => {
const { promise } = getTestContext(response, { textFormat: 'hello {{label2}}', stepInterval: '15s' });
const res = await promise;
const res = await getTestContext(testFrame, { textFormat: 'hello {{label2}}', stepInterval: '15s' });
expect(res.length).toBe(1);
expect(res[0].text).toBe('hello value2');
@ -678,9 +461,7 @@ describe('LokiDatasource', () => {
});
describe('When titleFormat is set', () => {
it('should fromat the title accordingly', async () => {
const { promise } = getTestContext(response, { titleFormat: 'Title {{label2}}', stepInterval: '15s' });
const res = await promise;
const res = await getTestContext(testFrame, { titleFormat: 'Title {{label2}}', stepInterval: '15s' });
expect(res.length).toBe(1);
expect(res[0].title).toBe('Title value2');
@ -920,26 +701,6 @@ describe('LokiDatasource', () => {
});
});
describe('adjustInterval', () => {
const dynamicInterval = 15;
const range = 1642;
const resolution = 1;
const ds = createLokiDSForTests();
it('should return the interval as a factor of dynamicInterval and resolution', () => {
let interval = ds.adjustInterval(dynamicInterval, resolution, range);
expect(interval).toBe(resolution * dynamicInterval);
});
it('should not return a value less than the safe interval', () => {
let safeInterval = range / 11000;
if (safeInterval > 1) {
safeInterval = Math.ceil(safeInterval);
}
const unsafeInterval = safeInterval - 0.01;
let interval = ds.adjustInterval(unsafeInterval, resolution, range);
expect(interval).toBeGreaterThanOrEqual(safeInterval);
});
});
describe('prepareLogRowContextQueryTarget', () => {
const ds = createLokiDSForTests();
it('creates query with only labels from /labels API', () => {
@ -1002,7 +763,7 @@ describe('LokiDatasource', () => {
describe('importing queries', () => {
it('keeps all labels when no labels are loaded', async () => {
const ds = createLokiDSForTests();
fetchMock.mockImplementation(() => of(createFetchResponse({ data: [] })));
ds.getResource = () => Promise.resolve({ data: [] });
const queries = await ds.importFromAbstractQueries([
{
refId: 'A',
@ -1017,7 +778,7 @@ describe('LokiDatasource', () => {
it('filters out non existing labels', async () => {
const ds = createLokiDSForTests();
fetchMock.mockImplementation(() => of(createFetchResponse({ data: ['foo'] })));
ds.getResource = () => Promise.resolve({ data: ['foo'] });
const queries = await ds.importFromAbstractQueries([
{
refId: 'A',

View File

@ -1,5 +1,5 @@
// Libraries
import { cloneDeep, isEmpty, map as lodashMap } from 'lodash';
import { cloneDeep, map as lodashMap } from 'lodash';
import Prism from 'prismjs';
import { lastValueFrom, merge, Observable, of, throwError } from 'rxjs';
import { catchError, map, switchMap } from 'rxjs/operators';
@ -29,13 +29,12 @@ import {
LoadingState,
LogLevel,
LogRowModel,
QueryResultMeta,
ScopedVars,
TimeRange,
rangeUtil,
toUtc,
} from '@grafana/data';
import { BackendSrvRequest, FetchError, getBackendSrv, config, DataSourceWithBackend } from '@grafana/runtime';
import { FetchError, config, DataSourceWithBackend } from '@grafana/runtime';
import { RowContextOptions } from '@grafana/ui/src/components/Logs/LogRowContextProvider';
import { queryLogsVolume } from 'app/core/logs_model';
import { convertToWebSocketUrl } from 'app/core/utils/explore';
@ -48,38 +47,20 @@ import { renderLegendFormat } from '../prometheus/legend';
import { addLabelToQuery } from './add_label_to_query';
import { transformBackendResult } from './backendResultTransformer';
import { LokiAnnotationsQueryEditor } from './components/AnnotationsQueryEditor';
import { DEFAULT_RESOLUTION } from './components/LokiOptionFields';
import LanguageProvider from './language_provider';
import { escapeLabelValueInSelector } from './language_utils';
import { LiveStreams, LokiLiveTarget } from './live_streams';
import { addParsedLabelToQuery, getNormalizedLokiQuery, queryHasPipeParser } from './query_utils';
import { lokiResultsToTableModel, lokiStreamsToDataFrames, processRangeQueryResponse } from './result_transformer';
import { sortDataFrameByTime } from './sortDataFrame';
import { doLokiChannelStream } from './streaming';
import syntax from './syntax';
import {
LokiOptions,
LokiQuery,
LokiQueryDirection,
LokiQueryType,
LokiRangeQueryRequest,
LokiResultType,
LokiStreamResponse,
} from './types';
import { LokiOptions, LokiQuery, LokiQueryDirection, LokiQueryType } from './types';
export type RangeQueryOptions = DataQueryRequest<LokiQuery> | AnnotationQueryRequest<LokiQuery>;
export const DEFAULT_MAX_LINES = 1000;
export const LOKI_ENDPOINT = '/loki/api/v1';
const NS_IN_MS = 1000000;
const RANGE_QUERY_ENDPOINT = `${LOKI_ENDPOINT}/query_range`;
const INSTANT_QUERY_ENDPOINT = `${LOKI_ENDPOINT}/query`;
const DEFAULT_QUERY_PARAMS: Partial<LokiRangeQueryRequest> = {
limit: DEFAULT_MAX_LINES,
query: '',
};
function makeRequest(query: LokiQuery, range: TimeRange, app: CoreApp, requestId: string): DataQueryRequest<LokiQuery> {
const intervalInfo = rangeUtil.calculateInterval(range, 1);
return {
@ -106,7 +87,6 @@ export class LokiDatasource
private streams = new LiveStreams();
languageProvider: LanguageProvider;
maxLines: number;
useBackendMode: boolean;
constructor(
private instanceSettings: DataSourceInstanceSettings<LokiOptions>,
@ -118,30 +98,11 @@ export class LokiDatasource
this.languageProvider = new LanguageProvider(this);
const settingsData = instanceSettings.jsonData || {};
this.maxLines = parseInt(settingsData.maxLines ?? '0', 10) || DEFAULT_MAX_LINES;
this.useBackendMode = config.featureToggles.lokiBackendMode ?? false;
this.annotations = {
QueryEditor: LokiAnnotationsQueryEditor,
};
}
_request(apiUrl: string, data?: any, options?: Partial<BackendSrvRequest>): Observable<Record<string, any>> {
const baseUrl = this.instanceSettings.url;
const params = data ? serializeParams(data) : '';
const url = `${baseUrl}${apiUrl}${params.length ? `?${params}` : ''}`;
if (this.instanceSettings.withCredentials || this.instanceSettings.basicAuth) {
options = { ...options, withCredentials: true };
if (this.instanceSettings.basicAuth) {
options.headers = { ...options.headers, Authorization: this.instanceSettings.basicAuth };
}
}
const req = {
...options,
url,
};
return getBackendSrv().fetch<Record<string, any>>(req);
}
getLogsVolumeDataProvider(request: DataQueryRequest<LokiQuery>): Observable<DataQueryResponse> | undefined {
const isLogsVolumeAvailable = request.targets.some((target) => target.expr && !isMetricsQuery(target.expr));
if (!isLogsVolumeAvailable) {
@ -168,79 +129,37 @@ export class LokiDatasource
}
query(request: DataQueryRequest<LokiQuery>): Observable<DataQueryResponse> {
const subQueries: Array<Observable<DataQueryResponse>> = [];
const scopedVars = {
...request.scopedVars,
...this.getRangeScopedVars(request.range),
const queries = request.targets
.map(getNormalizedLokiQuery) // "fix" the `.queryType` prop
.map((q) => ({ ...q, maxLines: q.maxLines || this.maxLines })); // set maxLines if not set
const fixedRequest = {
...request,
targets: queries,
};
if (this.useBackendMode) {
const queries = request.targets
.map(getNormalizedLokiQuery) // "fix" the `.queryType` prop
.map((q) => ({ ...q, maxLines: q.maxLines || this.maxLines })); // set maxLines if not set
const fixedRequest = {
...request,
targets: queries,
const streamQueries = fixedRequest.targets.filter((q) => q.queryType === LokiQueryType.Stream);
if (config.featureToggles.lokiLive && streamQueries.length > 0 && fixedRequest.rangeRaw?.to === 'now') {
// this is still an in-development feature,
// we do not support mixing stream-queries with normal-queries for now.
const streamRequest = {
...fixedRequest,
targets: streamQueries,
};
const streamQueries = fixedRequest.targets.filter((q) => q.queryType === LokiQueryType.Stream);
if (config.featureToggles.lokiLive && streamQueries.length > 0 && fixedRequest.rangeRaw?.to === 'now') {
// this is still an in-development feature,
// we do not support mixing stream-queries with normal-queries for now.
const streamRequest = {
...fixedRequest,
targets: streamQueries,
};
return merge(...streamQueries.map((q) => doLokiChannelStream(q, this, streamRequest)));
}
if (fixedRequest.liveStreaming) {
return this.runLiveQueryThroughBackend(fixedRequest);
} else {
return super
.query(fixedRequest)
.pipe(
map((response) =>
transformBackendResult(response, fixedRequest.targets, this.instanceSettings.jsonData.derivedFields ?? [])
)
);
}
return merge(...streamQueries.map((q) => doLokiChannelStream(q, this, streamRequest)));
}
const filteredTargets = request.targets
.filter((target) => target.expr && !target.hide)
.map((target) => {
const expr = this.addAdHocFilters(target.expr);
return {
...target,
expr: this.templateSrv.replace(expr, scopedVars, this.interpolateQueryExpr),
};
});
for (const target of filteredTargets) {
if (target.instant || target.queryType === LokiQueryType.Instant) {
subQueries.push(this.runInstantQuery(target, request, filteredTargets.length));
} else if (
config.featureToggles.lokiLive &&
target.queryType === LokiQueryType.Stream &&
request.rangeRaw?.to === 'now'
) {
subQueries.push(doLokiChannelStream(target, this, request));
} else {
subQueries.push(this.runRangeQuery(target, request));
}
if (fixedRequest.liveStreaming) {
return this.runLiveQueryThroughBackend(fixedRequest);
} else {
return super
.query(fixedRequest)
.pipe(
map((response) =>
transformBackendResult(response, fixedRequest.targets, this.instanceSettings.jsonData.derivedFields ?? [])
)
);
}
// No valid targets, return the empty result to save a round trip.
if (isEmpty(subQueries)) {
return of({
data: [],
state: LoadingState.Done,
});
}
return merge(...subQueries);
}
runLiveQueryThroughBackend(request: DataQueryRequest<LokiQuery>): Observable<DataQueryResponse> {
@ -264,119 +183,6 @@ export class LokiDatasource
return merge(...subQueries);
}
runInstantQuery = (
target: LokiQuery,
options: DataQueryRequest<LokiQuery>,
responseListLength = 1
): Observable<DataQueryResponse> => {
const timeNs = this.getTime(options.range.to, true);
const queryLimit = isMetricsQuery(target.expr) ? options.maxDataPoints : target.maxLines;
const query = {
query: target.expr,
time: `${timeNs + (1e9 - (timeNs % 1e9))}`,
limit: Math.min(queryLimit || Infinity, this.maxLines),
direction: target.direction === LokiQueryDirection.Forward ? 'FORWARD' : 'BACKWARD',
};
/** Used only for results of metrics instant queries */
const meta: QueryResultMeta = {
preferredVisualisationType: 'table',
};
return this._request(INSTANT_QUERY_ENDPOINT, query).pipe(
map((response) => {
if (response.data.data.resultType === LokiResultType.Stream) {
return {
data: response.data
? lokiStreamsToDataFrames(
response.data as LokiStreamResponse,
target,
query.limit,
this.instanceSettings.jsonData
)
: [],
key: `${target.refId}_instant`,
};
}
return {
data: [lokiResultsToTableModel(response.data.data.result, responseListLength, target.refId, meta)],
key: `${target.refId}_instant`,
};
}),
catchError((err) => throwError(() => this.processError(err, target)))
);
};
createRangeQuery(target: LokiQuery, options: RangeQueryOptions, limit: number): LokiRangeQueryRequest {
const query = target.expr;
let range: { start?: number; end?: number; step?: number } = {};
if (options.range) {
const startNs = this.getTime(options.range.from, false);
const endNs = this.getTime(options.range.to, true);
const rangeMs = Math.ceil((endNs - startNs) / 1e6);
const resolution = target.resolution || (DEFAULT_RESOLUTION.value as number);
const adjustedInterval =
this.adjustInterval((options as DataQueryRequest<LokiQuery>).intervalMs || 1000, resolution, rangeMs) / 1000;
// We want to ceil to 3 decimal places
const step = Math.ceil(adjustedInterval * 1000) / 1000;
range = {
start: startNs,
end: endNs,
step,
};
}
return {
...DEFAULT_QUERY_PARAMS,
...range,
query,
limit,
direction: target.direction === LokiQueryDirection.Forward ? 'FORWARD' : 'BACKWARD',
};
}
/**
* Attempts to send a query to /loki/api/v1/query_range
*/
runRangeQuery = (target: LokiQuery, options: RangeQueryOptions): Observable<DataQueryResponse> => {
// For metric query we use maxDataPoints from the request options which should be something like width of the
// visualisation in pixels. In case of logs request we either use lines limit defined in the query target or
// global limit defined for the data source which ever is lower.
let maxDataPoints = isMetricsQuery(target.expr)
? // We fallback to maxLines here because maxDataPoints is defined as possibly undefined. Not sure that can
// actually happen both Dashboards and Explore should send some value here. If not maxLines does not make that
// much sense but nor any other arbitrary value.
(options as DataQueryRequest<LokiQuery>).maxDataPoints || this.maxLines
: // If user wants maxLines 0 we still fallback to data source limit. I think that makes sense as why would anyone
// want to do a query and not see any results?
target.maxLines || this.maxLines;
if ((options as DataQueryRequest<LokiQuery>).liveStreaming) {
return this.runLiveQuery(target, maxDataPoints);
}
const query = this.createRangeQuery(target, options, maxDataPoints);
const headers = target.volumeQuery ? { 'X-Query-Tags': 'Source=logvolhist' } : undefined;
return this._request(RANGE_QUERY_ENDPOINT, query, { headers }).pipe(
catchError((err) => throwError(() => this.processError(err, target))),
switchMap((response) =>
processRangeQueryResponse(
response.data,
target,
query,
maxDataPoints,
this.instanceSettings.jsonData,
(options as DataQueryRequest<LokiQuery>).scopedVars
)
)
);
};
createLiveTarget(target: LokiQuery, maxDataPoints: number): LokiLiveTarget {
const query = target.expr;
const baseUrl = this.instanceSettings.url;
@ -470,14 +276,8 @@ export class LokiDatasource
throw new Error(`invalid metadata request url: ${url}`);
}
if (this.useBackendMode) {
const res = await this.getResource(url, params);
return res.data || [];
} else {
const lokiURL = `${LOKI_ENDPOINT}/${url}`;
const res = await lastValueFrom(this._request(lokiURL, params, { hideFromInspector: true }));
return res.data.data || [];
}
const res = await this.getResource(url, params);
return res.data || [];
}
async metricFindQuery(query: string) {
@ -829,18 +629,6 @@ export class LokiDatasource
return error;
}
adjustInterval(dynamicInterval: number, resolution: number, range: number) {
// Loki will drop queries that might return more than 11000 data points.
// Calibrate interval if it is too small.
let safeInterval = range / 11000;
if (safeInterval > 1) {
safeInterval = Math.ceil(safeInterval);
}
let adjustedInterval = Math.max(resolution * dynamicInterval, safeInterval);
return adjustedInterval;
}
addAdHocFilters(queryExpr: string) {
const adhocFilters = this.templateSrv.getAdhocFilters(this.name);
let expr = queryExpr;

View File

@ -1,146 +1,9 @@
import { CircularDataFrame, FieldCache, FieldType, MutableDataFrame } from '@grafana/data';
import { setTemplateSrv } from '@grafana/runtime';
import { TemplateSrv } from 'app/features/templating/template_srv';
import { CircularDataFrame, FieldType } from '@grafana/data';
import * as ResultTransformer from './result_transformer';
import {
LokiStreamResult,
LokiTailResponse,
LokiStreamResponse,
LokiResultType,
TransformerOptions,
LokiMatrixResult,
} from './types';
const streamResult: LokiStreamResult[] = [
{
stream: {
foo: 'bar',
},
values: [['1579857562021616000', "foo: 'bar'"]],
},
{
stream: {
bar: 'foo',
},
values: [['1579857562031616000', "bar: 'foo'"]],
},
];
const lokiResponse: LokiStreamResponse = {
status: 'success',
data: {
result: streamResult,
resultType: LokiResultType.Stream,
stats: {
summary: {
bytesTotal: 900,
},
},
},
};
jest.mock('@grafana/runtime', () => ({
// @ts-ignore
...jest.requireActual('@grafana/runtime'),
getDataSourceSrv: () => {
return {
getInstanceSettings: () => {
return { name: 'Loki1' };
},
};
},
}));
import { LokiTailResponse } from './types';
describe('loki result transformer', () => {
beforeAll(() => {
setTemplateSrv(new TemplateSrv());
});
afterAll(() => {
jest.restoreAllMocks();
});
afterEach(() => {
jest.clearAllMocks();
});
describe('lokiStreamsToRawDataFrame', () => {
it('converts streams to series', () => {
const data = ResultTransformer.lokiStreamsToRawDataFrame(streamResult);
expect(data.fields[0].values.get(0)).toStrictEqual({ foo: 'bar' });
expect(data.fields[1].values.get(0)).toEqual('2020-01-24T09:19:22.021Z');
expect(data.fields[2].values.get(0)).toEqual(streamResult[0].values[0][1]);
expect(data.fields[3].values.get(0)).toEqual(streamResult[0].values[0][0]);
expect(data.fields[4].values.get(0)).toEqual('4b79cb43-81ce-52f7-b1e9-a207fff144dc');
expect(data.fields[0].values.get(1)).toStrictEqual({ bar: 'foo' });
expect(data.fields[1].values.get(1)).toEqual('2020-01-24T09:19:22.031Z');
expect(data.fields[2].values.get(1)).toEqual(streamResult[1].values[0][1]);
expect(data.fields[3].values.get(1)).toEqual(streamResult[1].values[0][0]);
expect(data.fields[4].values.get(1)).toEqual('73d144f6-57f2-5a45-a49c-eb998e2006b1');
});
it('should always generate unique ids for logs', () => {
const streamResultWithDuplicateLogs: LokiStreamResult[] = [
{
stream: {
foo: 'bar',
},
values: [
['1579857562021616000', 't=2020-02-12T15:04:51+0000 lvl=info msg="Duplicated"'],
['1579857562021616000', 't=2020-02-12T15:04:51+0000 lvl=info msg="Duplicated"'],
['1579857562021616000', 't=2020-02-12T15:04:51+0000 lvl=info msg="Non-duplicated"'],
['1579857562021616000', 't=2020-02-12T15:04:51+0000 lvl=info msg="Duplicated"'],
],
},
{
stream: {
bar: 'foo',
},
values: [['1579857562021617000', 't=2020-02-12T15:04:51+0000 lvl=info msg="Non-dupliicated"']],
},
];
const data = ResultTransformer.lokiStreamsToRawDataFrame(streamResultWithDuplicateLogs);
expect(data.fields[4].values.get(0)).toEqual('b48fe7dc-36aa-5d37-bfba-087ef810d8fa');
expect(data.fields[4].values.get(1)).toEqual('b48fe7dc-36aa-5d37-bfba-087ef810d8fa_1');
expect(data.fields[4].values.get(2)).not.toEqual('b48fe7dc-36aa-5d37-bfba-087ef810d8fa_2');
expect(data.fields[4].values.get(3)).toEqual('b48fe7dc-36aa-5d37-bfba-087ef810d8fa_2');
expect(data.fields[4].values.get(4)).not.toEqual('b48fe7dc-36aa-5d37-bfba-087ef810d8fa_3');
});
it('should append refId to the unique ids if refId is provided', () => {
const data = ResultTransformer.lokiStreamsToRawDataFrame(streamResult, 'B');
expect(data.fields[4].values.get(0)).toEqual('4b79cb43-81ce-52f7-b1e9-a207fff144dc_B');
expect(data.fields[4].values.get(1)).toEqual('73d144f6-57f2-5a45-a49c-eb998e2006b1_B');
});
});
describe('lokiStreamsToDataFrames', () => {
it('should enhance data frames', () => {
jest.spyOn(ResultTransformer, 'enhanceDataFrame');
const dataFrames = ResultTransformer.lokiStreamsToDataFrames(lokiResponse, { refId: 'B', expr: '' }, 500, {
derivedFields: [
{
matcherRegex: 'trace=(w+)',
name: 'test',
url: 'example.com',
},
],
});
expect(ResultTransformer.enhanceDataFrame).toBeCalled();
dataFrames.forEach((frame) => {
expect(
frame.fields.filter((field) => field.name === 'test' && field.type === 'string').length
).toBeGreaterThanOrEqual(1);
});
});
});
describe('appendResponseToBufferedData', () => {
it('should return a dataframe with ts in iso format', () => {
const tailResponse: LokiTailResponse = {
@ -214,104 +77,4 @@ describe('loki result transformer', () => {
expect(data.get(5).id).toEqual('3ca99d6b-3ab5-5970-93c0-eb3c9449088e_1_C');
});
});
describe('createMetricLabel', () => {
it('should create correct label based on passed variables', () => {
const label = ResultTransformer.createMetricLabel({}, {
scopedVars: { testLabel: { selected: true, text: 'label1', value: 'label1' } },
legendFormat: '{{$testLabel}}',
} as unknown as TransformerOptions);
expect(label).toBe('label1');
});
});
describe('lokiResultsToTableModel', () => {
it('should correctly set the type of the label column to be a string', () => {
const lokiResultWithIntLabel = [
{ metric: { test: 1 }, value: [1610367143, 10] },
{ metric: { test: 2 }, value: [1610367144, 20] },
] as unknown as LokiMatrixResult[];
const table = ResultTransformer.lokiResultsToTableModel(lokiResultWithIntLabel, 1, 'A', {});
expect(table.columns[0].type).toBe('time');
expect(table.columns[1].type).toBe('string');
expect(table.columns[2].type).toBe('number');
});
});
});
describe('enhanceDataFrame', () => {
it('adds links to fields', () => {
const df = new MutableDataFrame({ fields: [{ name: 'Line', values: ['nothing', 'trace1=1234', 'trace2=foo'] }] });
ResultTransformer.enhanceDataFrame(df, {
derivedFields: [
{
matcherRegex: 'trace1=(\\w+)',
name: 'trace1',
url: 'http://localhost/${__value.raw}',
},
{
matcherRegex: 'trace2=(\\w+)',
name: 'trace2',
url: 'test',
datasourceUid: 'uid',
},
{
matcherRegex: 'trace2=(\\w+)',
name: 'trace2',
url: 'test',
datasourceUid: 'uid2',
urlDisplayLabel: 'Custom Label',
},
],
});
expect(df.fields.length).toBe(3);
const fc = new FieldCache(df);
expect(fc.getFieldByName('trace1')!.values.toArray()).toEqual([null, '1234', null]);
expect(fc.getFieldByName('trace1')!.config.links![0]).toEqual({
url: 'http://localhost/${__value.raw}',
title: '',
});
expect(fc.getFieldByName('trace2')!.values.toArray()).toEqual([null, null, 'foo']);
expect(fc.getFieldByName('trace2')!.config.links!.length).toBe(2);
expect(fc.getFieldByName('trace2')!.config.links![0]).toEqual({
title: '',
internal: { datasourceName: 'Loki1', datasourceUid: 'uid', query: { query: 'test' } },
url: '',
});
expect(fc.getFieldByName('trace2')!.config.links![1]).toEqual({
title: 'Custom Label',
internal: { datasourceName: 'Loki1', datasourceUid: 'uid2', query: { query: 'test' } },
url: '',
});
});
describe('lokiPointsToTimeseriesPoints()', () => {
/**
* NOTE on time parameters:
* - Input time series data has timestamps in sec (like Prometheus)
* - Output time series has timestamps in ms (as expected for the chart lib)
*/
const data: Array<[number, string]> = [
[1, '1'],
[2, '0'],
[4, '1'],
[7, 'NaN'],
[8, '+Inf'],
[9, '-Inf'],
];
it('returns data as is if step, start, and end align', () => {
const result = ResultTransformer.lokiPointsToTimeseriesPoints(data);
expect(result).toEqual([
[1, 1000],
[0, 2000],
[1, 4000],
[NaN, 7000],
[Infinity, 8000],
[-Infinity, 9000],
]);
});
});
});

View File

@ -1,108 +1,11 @@
import { capitalize, groupBy, isEmpty } from 'lodash';
import { of } from 'rxjs';
import { v5 as uuidv5 } from 'uuid';
import {
FieldType,
TimeSeries,
Labels,
DataFrame,
ArrayVector,
MutableDataFrame,
findUniqueLabels,
DataFrameView,
DataLink,
Field,
QueryResultMetaStat,
QueryResultMeta,
TimeSeriesValue,
ScopedVars,
toDataFrame,
} from '@grafana/data';
import { getTemplateSrv, getDataSourceSrv } from '@grafana/runtime';
import TableModel from 'app/core/table_model';
import { FieldType, Labels, MutableDataFrame, findUniqueLabels } from '@grafana/data';
import { renderLegendFormat } from '../prometheus/legend';
import { formatQuery, getHighlighterExpressionsFromQuery } from './query_utils';
import { dataFrameHasLokiError } from './responseUtils';
import {
LokiRangeQueryRequest,
LokiResponse,
LokiMatrixResult,
LokiVectorResult,
TransformerOptions,
LokiResultType,
LokiStreamResult,
LokiTailResponse,
LokiQuery,
LokiOptions,
DerivedFieldConfig,
LokiStreamResponse,
LokiStats,
} from './types';
import { LokiStreamResult, LokiTailResponse } from './types';
const UUID_NAMESPACE = '6ec946da-0f49-47a8-983a-1d76d17e7c92';
/**
* Transforms LokiStreamResult structure into a dataFrame. Used when doing standard queries
*/
export function lokiStreamsToRawDataFrame(streams: LokiStreamResult[], refId?: string): DataFrame {
const labels = new ArrayVector<{}>([]);
const times = new ArrayVector<string>([]);
const timesNs = new ArrayVector<string>([]);
const lines = new ArrayVector<string>([]);
const uids = new ArrayVector<string>([]);
// We need to store and track all used uids to ensure that uids are unique
const usedUids: { string?: number } = {};
for (const stream of streams) {
const streamLabels: Labels = stream.stream;
const labelsString = Object.entries(streamLabels)
.map(([key, val]) => `${key}="${val}"`)
.sort()
.join('');
for (const [ts, line] of stream.values) {
labels.add(streamLabels);
// num ns epoch in string, we convert it to iso string here so it matches old format
times.add(new Date(parseInt(ts.slice(0, -6), 10)).toISOString());
timesNs.add(ts);
lines.add(line);
uids.add(createUid(ts, labelsString, line, usedUids, refId));
}
}
return constructDataFrame(times, timesNs, lines, uids, labels, refId);
}
/**
* Constructs dataFrame with supplied fields and other data.
*/
function constructDataFrame(
times: ArrayVector<string>,
timesNs: ArrayVector<string>,
lines: ArrayVector<string>,
uids: ArrayVector<string>,
labels: ArrayVector<{}>,
refId?: string
) {
const dataFrame = {
refId,
fields: [
{ name: 'labels', type: FieldType.other, config: {}, values: labels },
{ name: 'Time', type: FieldType.time, config: {}, values: times }, // Time
{ name: 'Line', type: FieldType.string, config: {}, values: lines }, // Line - needs to be the first field with string type
{ name: 'tsNs', type: FieldType.time, config: {}, values: timesNs }, // Time
{ name: 'id', type: FieldType.string, config: {}, values: uids },
],
length: times.length,
};
return dataFrame;
}
/**
* Transform LokiResponse data and appends it to MutableDataFrame. Used for streaming where the dataFrame can be
* a CircularDataFrame creating a fixed size rolling buffer.
@ -179,350 +82,3 @@ function createUid(ts: string, labelsString: string, line: string, usedUids: any
}
return id;
}
function lokiMatrixToTimeSeries(matrixResult: LokiMatrixResult, options: TransformerOptions): TimeSeries {
const name = createMetricLabel(matrixResult.metric, options);
return {
target: name,
title: name,
datapoints: lokiPointsToTimeseriesPoints(matrixResult.values),
tags: matrixResult.metric,
meta: options.meta,
refId: options.refId,
};
}
function parsePrometheusFormatSampleValue(value: string): number {
switch (value) {
case '+Inf':
return Number.POSITIVE_INFINITY;
case '-Inf':
return Number.NEGATIVE_INFINITY;
default:
return parseFloat(value);
}
}
export function lokiPointsToTimeseriesPoints(data: Array<[number, string]>): TimeSeriesValue[][] {
const datapoints: TimeSeriesValue[][] = [];
for (const [time, value] of data) {
let datapointValue: TimeSeriesValue = parsePrometheusFormatSampleValue(value);
const timestamp = time * 1000;
datapoints.push([datapointValue, timestamp]);
}
return datapoints;
}
export function lokiResultsToTableModel(
lokiResults: Array<LokiMatrixResult | LokiVectorResult>,
resultCount: number,
refId: string,
meta: QueryResultMeta
): TableModel {
if (!lokiResults || lokiResults.length === 0) {
return new TableModel();
}
// Collect all labels across all metrics
const metricLabels: Set<string> = new Set<string>(
lokiResults.reduce((acc, cur) => acc.concat(Object.keys(cur.metric)), [] as string[])
);
// Sort metric labels, create columns for them and record their index
const sortedLabels = [...metricLabels.values()].sort();
const table = new TableModel();
table.refId = refId;
table.meta = meta;
table.columns = [
{ text: 'Time', type: FieldType.time },
...sortedLabels.map((label) => ({ text: label, filterable: true, type: FieldType.string })),
{ text: `Value #${refId}`, type: FieldType.number },
];
// Populate rows, set value to empty string when label not present.
lokiResults.forEach((series) => {
const newSeries: LokiMatrixResult = {
metric: series.metric,
values: (series as LokiVectorResult).value
? [(series as LokiVectorResult).value]
: (series as LokiMatrixResult).values,
};
if (!newSeries.values) {
return;
}
if (!newSeries.metric) {
table.rows.concat(newSeries.values.map(([a, b]) => [a * 1000, parseFloat(b)]));
} else {
table.rows.push(
...newSeries.values.map(([a, b]) => [
a * 1000,
...sortedLabels.map((label) => newSeries.metric[label] || ''),
parseFloat(b),
])
);
}
});
return table;
}
export function createMetricLabel(labelData: { [key: string]: string }, options?: TransformerOptions) {
let label =
options === undefined || isEmpty(options.legendFormat)
? getOriginalMetricName(labelData)
: renderLegendFormat(getTemplateSrv().replace(options.legendFormat ?? '', options.scopedVars), labelData);
if (!label && options) {
label = options.query;
}
return label;
}
function getOriginalMetricName(labelData: { [key: string]: string }) {
const labelPart = Object.entries(labelData)
.map((label) => `${label[0]}="${label[1]}"`)
.join(',');
return `{${labelPart}}`;
}
export function decamelize(s: string): string {
return s.replace(/[A-Z]/g, (m) => ` ${m.toLowerCase()}`);
}
// Turn loki stats { metric: value } into meta stat { title: metric, value: value }
function lokiStatsToMetaStat(stats: LokiStats | undefined): QueryResultMetaStat[] {
const result: QueryResultMetaStat[] = [];
if (!stats) {
return result;
}
for (const section in stats) {
const values = stats[section];
for (const label in values) {
const value = values[label];
let unit;
if (/time/i.test(label) && value) {
unit = 's';
} else if (/bytes.*persecond/i.test(label)) {
unit = 'Bps';
} else if (/bytes/i.test(label)) {
unit = 'decbytes';
}
const title = `${capitalize(section)}: ${decamelize(label)}`;
result.push({ displayName: title, value, unit });
}
}
return result;
}
export function lokiStreamsToDataFrames(
response: LokiStreamResponse,
target: LokiQuery,
limit: number,
config: LokiOptions
): DataFrame[] {
const data = limit > 0 ? response.data.result : [];
const stats: QueryResultMetaStat[] = lokiStatsToMetaStat(response.data.stats);
// Use custom mechanism to identify which stat we want to promote to label
const custom = {
lokiQueryStatKey: 'Summary: total bytes processed',
// TODO: when we get a real frame-type in @grafana/data
// move this to frame.meta.type
frameType: 'LabeledTimeValues',
};
const meta: QueryResultMeta = {
searchWords: getHighlighterExpressionsFromQuery(formatQuery(target.expr)),
limit,
stats,
custom,
preferredVisualisationType: 'logs',
};
const dataFrame = lokiStreamsToRawDataFrame(data, target.refId);
enhanceDataFrame(dataFrame, config);
if (meta.custom && dataFrameHasLokiError(dataFrame)) {
meta.custom.error = 'Error when parsing some of the logs';
}
if (stats.length && !data.length) {
return [
{
fields: [],
length: 0,
refId: target.refId,
meta,
},
];
}
return [
{
...dataFrame,
refId: target.refId,
meta,
},
];
}
/**
* Adds new fields and DataLinks to DataFrame based on DataSource instance config.
*/
export const enhanceDataFrame = (dataFrame: DataFrame, config: LokiOptions | null): void => {
if (!config) {
return;
}
const derivedFields = config.derivedFields ?? [];
if (!derivedFields.length) {
return;
}
const derivedFieldsGrouped = groupBy(derivedFields, 'name');
const newFields = Object.values(derivedFieldsGrouped).map(fieldFromDerivedFieldConfig);
const view = new DataFrameView(dataFrame);
view.forEach((row: { Line: string }) => {
for (const field of newFields) {
const logMatch = row.Line.match(derivedFieldsGrouped[field.name][0].matcherRegex);
field.values.add(logMatch && logMatch[1]);
}
});
dataFrame.fields = [...dataFrame.fields, ...newFields];
};
/**
* Transform derivedField config into dataframe field with config that contains link.
*/
function fieldFromDerivedFieldConfig(derivedFieldConfigs: DerivedFieldConfig[]): Field<any, ArrayVector> {
const dataSourceSrv = getDataSourceSrv();
const dataLinks = derivedFieldConfigs.reduce((acc, derivedFieldConfig) => {
// Having field.datasourceUid means it is an internal link.
if (derivedFieldConfig.datasourceUid) {
const dsSettings = dataSourceSrv.getInstanceSettings(derivedFieldConfig.datasourceUid);
acc.push({
// Will be filled out later
title: derivedFieldConfig.urlDisplayLabel || '',
url: '',
// This is hardcoded for Jaeger or Zipkin not way right now to specify datasource specific query object
internal: {
query: { query: derivedFieldConfig.url },
datasourceUid: derivedFieldConfig.datasourceUid,
datasourceName: dsSettings?.name ?? 'Data source not found',
},
});
} else if (derivedFieldConfig.url) {
acc.push({
// We do not know what title to give here so we count on presentation layer to create a title from metadata.
title: derivedFieldConfig.urlDisplayLabel || '',
// This is hardcoded for Jaeger or Zipkin not way right now to specify datasource specific query object
url: derivedFieldConfig.url,
});
}
return acc;
}, [] as DataLink[]);
return {
name: derivedFieldConfigs[0].name,
type: FieldType.string,
config: {
links: dataLinks,
},
// We are adding values later on
values: new ArrayVector<string>([]),
};
}
function rangeQueryResponseToTimeSeries(
response: LokiResponse,
query: LokiRangeQueryRequest,
target: LokiQuery,
scopedVars: ScopedVars
): TimeSeries[] {
/** Show results of Loki metric queries only in graph */
const meta: QueryResultMeta = {
preferredVisualisationType: 'graph',
};
const transformerOptions: TransformerOptions = {
legendFormat: target.legendFormat ?? '',
query: query.query,
refId: target.refId,
meta,
scopedVars,
};
switch (response.data.resultType) {
case LokiResultType.Vector:
return response.data.result.map((vecResult) =>
lokiMatrixToTimeSeries({ metric: vecResult.metric, values: [vecResult.value] }, transformerOptions)
);
case LokiResultType.Matrix:
return response.data.result.map((matrixResult) => lokiMatrixToTimeSeries(matrixResult, transformerOptions));
default:
return [];
}
}
export function rangeQueryResponseToDataFrames(
response: LokiResponse,
query: LokiRangeQueryRequest,
target: LokiQuery,
scopedVars: ScopedVars
): DataFrame[] {
const series = rangeQueryResponseToTimeSeries(response, query, target, scopedVars);
const frames = series.map((s) => toDataFrame(s));
const { step } = query;
if (step != null) {
const intervalMs = step * 1000;
frames.forEach((frame) => {
frame.fields.forEach((field) => {
if (field.type === FieldType.time) {
field.config.interval = intervalMs;
}
});
});
}
return frames;
}
export function processRangeQueryResponse(
response: LokiResponse,
target: LokiQuery,
query: LokiRangeQueryRequest,
limit: number,
config: LokiOptions,
scopedVars: ScopedVars
) {
switch (response.data.resultType) {
case LokiResultType.Stream:
return of({
data: lokiStreamsToDataFrames(response as LokiStreamResponse, target, limit, config),
key: `${target.refId}_log`,
});
case LokiResultType.Vector:
case LokiResultType.Matrix:
return of({
data: rangeQueryResponseToDataFrames(response, query, target, scopedVars),
key: target.refId,
});
default:
throw new Error(`Unknown result type "${(response.data as any).resultType}".`);
}
}