Loki: add backend-forward mode to queries, update log-row-context (#47726)

* loki: add helper function to sort dataframe by time

* loki: add direction-attribute to queries

* loki: make log-row-context code backward-compatible

* better comment

Co-authored-by: Ivana Huckova <30407135+ivanahuckova@users.noreply.github.com>

* fixed test

* simplified code

Co-authored-by: Ivana Huckova <30407135+ivanahuckova@users.noreply.github.com>
This commit is contained in:
Gábor Farkas 2022-04-20 13:52:15 +02:00 committed by GitHub
parent e8fc6637ec
commit 79c06fdddc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 250 additions and 72 deletions

View File

@ -29,6 +29,8 @@ func makeRequest(ctx context.Context, lokiDsUrl string, query lokiQuery) (*http.
qs := url.Values{}
qs.Set("query", query.Expr)
qs.Set("direction", string(query.Direction))
// MaxLines defaults to zero when not received,
// and Loki does not like limit=0, even when it is not needed
// (for example for metric queries), so we

View File

@ -22,9 +22,9 @@ import (
// but i wanted to test for all of them, to be sure.
func TestSuccessResponse(t *testing.T) {
matrixQuery := lokiQuery{Expr: "up(ALERTS)", Step: time.Second * 42, QueryType: QueryTypeRange}
vectorQuery := lokiQuery{Expr: "query1", QueryType: QueryTypeInstant}
streamsQuery := lokiQuery{Expr: "query1", QueryType: QueryTypeRange}
matrixQuery := lokiQuery{Expr: "up(ALERTS)", Step: time.Second * 42, QueryType: QueryTypeRange, Direction: DirectionBackward}
vectorQuery := lokiQuery{Expr: "query1", QueryType: QueryTypeInstant, Direction: DirectionBackward}
streamsQuery := lokiQuery{Expr: "query1", QueryType: QueryTypeRange, Direction: DirectionBackward}
tt := []struct {
name string
@ -119,7 +119,7 @@ func TestErrorResponse(t *testing.T) {
for _, test := range tt {
t.Run(test.name, func(t *testing.T) {
frames, err := runQuery(context.Background(), makeMockedAPI(400, test.contentType, test.body), &lokiQuery{QueryType: QueryTypeRange})
frames, err := runQuery(context.Background(), makeMockedAPI(400, test.contentType, test.body), &lokiQuery{QueryType: QueryTypeRange, Direction: DirectionBackward})
require.Len(t, frames, 0)
require.Error(t, err)

View File

@ -53,6 +53,7 @@ type datasourceInfo struct {
type QueryJSONModel struct {
QueryType string `json:"queryType"`
Expr string `json:"expr"`
Direction string `json:"direction"`
LegendFormat string `json:"legendFormat"`
Interval string `json:"interval"`
IntervalMS int `json:"intervalMS"`

View File

@ -67,6 +67,21 @@ func parseQueryType(jsonValue string) (QueryType, error) {
}
}
func parseDirection(jsonValue string) (Direction, error) {
switch jsonValue {
case "backward":
return DirectionBackward, nil
case "forward":
return DirectionForward, nil
case "":
// there are older queries stored in alerting that did not have queryDirection,
// we default to "backward"
return DirectionBackward, nil
default:
return DirectionBackward, fmt.Errorf("invalid queryDirection: %s", jsonValue)
}
}
func parseQuery(queryContext *backend.QueryDataRequest) ([]*lokiQuery, error) {
qs := []*lokiQuery{}
for _, query := range queryContext.Queries {
@ -95,9 +110,15 @@ func parseQuery(queryContext *backend.QueryDataRequest) ([]*lokiQuery, error) {
return nil, err
}
direction, err := parseDirection(model.Direction)
if err != nil {
return nil, err
}
qs = append(qs, &lokiQuery{
Expr: expr,
QueryType: queryType,
Direction: direction,
Step: step,
MaxLines: model.MaxLines,
LegendFormat: model.LegendFormat,

View File

@ -9,9 +9,17 @@ const (
QueryTypeInstant QueryType = "instant"
)
type Direction string
const (
DirectionBackward Direction = "backward"
DirectionForward Direction = "forward"
)
type lokiQuery struct {
Expr string
QueryType QueryType
Direction Direction
Step time.Duration
MaxLines int
LegendFormat string

View File

@ -943,9 +943,9 @@ describe('LokiDatasource', () => {
dataFrame: new MutableDataFrame({
fields: [
{
name: 'tsNs',
type: FieldType.string,
values: ['0'],
name: 'ts',
type: FieldType.time,
values: [0],
},
],
}),
@ -957,8 +957,8 @@ describe('LokiDatasource', () => {
jest.spyOn(ds.languageProvider, 'getLabelKeys').mockImplementation(() => ['bar']);
const contextQuery = ds.prepareLogRowContextQueryTarget(row, 10, 'BACKWARD');
expect(contextQuery.expr).toContain('baz');
expect(contextQuery.expr).not.toContain('uniqueParsedLabel');
expect(contextQuery.query.expr).toContain('baz');
expect(contextQuery.query.expr).not.toContain('uniqueParsedLabel');
});
});

View File

@ -33,24 +33,21 @@ import {
ScopedVars,
TimeRange,
rangeUtil,
toUtc,
} from '@grafana/data';
import { BackendSrvRequest, FetchError, getBackendSrv, config, DataSourceWithBackend } from '@grafana/runtime';
import { getTemplateSrv, TemplateSrv } from 'app/features/templating/template_srv';
import { addLabelToQuery } from './add_label_to_query';
import { getTimeSrv, TimeSrv } from 'app/features/dashboard/services/TimeSrv';
import { convertToWebSocketUrl } from 'app/core/utils/explore';
import {
lokiResultsToTableModel,
lokiStreamsToDataFrames,
lokiStreamsToRawDataFrame,
processRangeQueryResponse,
} from './result_transformer';
import { lokiResultsToTableModel, lokiStreamsToDataFrames, processRangeQueryResponse } from './result_transformer';
import { transformBackendResult } from './backendResultTransformer';
import { addParsedLabelToQuery, getNormalizedLokiQuery, queryHasPipeParser } from './query_utils';
import {
LokiOptions,
LokiQuery,
LokiQueryDirection,
LokiQueryType,
LokiRangeQueryRequest,
LokiResultType,
@ -65,6 +62,7 @@ import { DEFAULT_RESOLUTION } from './components/LokiOptionFields';
import { queryLogsVolume } from 'app/core/logs_model';
import { doLokiChannelStream } from './streaming';
import { renderLegendFormat } from '../prometheus/legend';
import { sortDataFrameByTime } from './sortDataFrame';
export type RangeQueryOptions = DataQueryRequest<LokiQuery> | AnnotationQueryRequest<LokiQuery>;
export const DEFAULT_MAX_LINES = 1000;
@ -75,7 +73,6 @@ const RANGE_QUERY_ENDPOINT = `${LOKI_ENDPOINT}/query_range`;
const INSTANT_QUERY_ENDPOINT = `${LOKI_ENDPOINT}/query`;
const DEFAULT_QUERY_PARAMS: Partial<LokiRangeQueryRequest> = {
direction: 'BACKWARD',
limit: DEFAULT_MAX_LINES,
query: '',
};
@ -252,6 +249,7 @@ export class LokiDatasource
query: target.expr,
time: `${timeNs + (1e9 - (timeNs % 1e9))}`,
limit: Math.min(queryLimit || Infinity, this.maxLines),
direction: target.direction === LokiQueryDirection.Forward ? 'FORWARD' : 'BACKWARD',
};
/** Used only for results of metrics instant queries */
@ -311,6 +309,7 @@ export class LokiDatasource
...range,
query,
limit,
direction: target.direction === LokiQueryDirection.Forward ? 'FORWARD' : 'BACKWARD',
};
}
@ -554,15 +553,29 @@ export class LokiDatasource
}
getLogRowContext = (row: LogRowModel, options?: RowContextOptions): Promise<{ data: DataFrame[] }> => {
const target = this.prepareLogRowContextQueryTarget(
row,
(options && options.limit) || 10,
(options && options.direction) || 'BACKWARD'
);
const direction = (options && options.direction) || 'BACKWARD';
const limit = (options && options.limit) || 10;
const { query, range } = this.prepareLogRowContextQueryTarget(row, limit, direction);
const sortResults = (result: DataQueryResponse): DataQueryResponse => {
return {
...result,
data: result.data.map((frame: DataFrame) => {
const timestampFieldIndex = frame.fields.findIndex((field) => field.type === FieldType.time);
if (timestampFieldIndex === -1) {
return frame;
}
return sortDataFrameByTime(frame, 'DESCENDING');
}),
};
};
// this can only be called from explore currently
const app = CoreApp.Explore;
const reverse = options && options.direction === 'FORWARD';
return lastValueFrom(
this._request(RANGE_QUERY_ENDPOINT, target).pipe(
this.query(makeRequest(query, range, app, `log-row-context-query-${direction}`)).pipe(
catchError((err) => {
const error: DataQueryError = {
message: 'Error during context query. Please check JS console logs.',
@ -571,18 +584,18 @@ export class LokiDatasource
};
throw error;
}),
switchMap((res) =>
of({
data: res.data ? [lokiStreamsToRawDataFrame(res.data.data.result, reverse)] : [],
})
)
switchMap((res) => of(sortResults(res)))
)
);
};
prepareLogRowContextQueryTarget = (row: LogRowModel, limit: number, direction: 'BACKWARD' | 'FORWARD') => {
prepareLogRowContextQueryTarget = (
row: LogRowModel,
limit: number,
direction: 'BACKWARD' | 'FORWARD'
): { query: LokiQuery; range: TimeRange } => {
const labels = this.languageProvider.getLabelKeys();
const query = Object.keys(row.labels)
const expr = Object.keys(row.labels)
.map((label: string) => {
if (labels.includes(label)) {
// escape backslashes in label as users can't escape them by themselves
@ -595,36 +608,49 @@ export class LokiDatasource
.join(',');
const contextTimeBuffer = 2 * 60 * 60 * 1000; // 2h buffer
const commonTargetOptions = {
limit,
query: `{${query}}`,
expr: `{${query}}`,
direction,
const queryDirection = direction === 'FORWARD' ? LokiQueryDirection.Forward : LokiQueryDirection.Backward;
const query: LokiQuery = {
expr: `{${expr}}`,
queryType: LokiQueryType.Range,
refId: '',
maxLines: limit,
direction: queryDirection,
};
const fieldCache = new FieldCache(row.dataFrame);
const nsField = fieldCache.getFieldByName('tsNs')!;
const nsTimestamp = nsField.values.get(row.rowIndex);
if (direction === 'BACKWARD') {
return {
...commonTargetOptions,
// convert to ns, we loose some precision here but it is not that important at the far points of the context
start: row.timeEpochMs - contextTimeBuffer + '000000',
end: nsTimestamp,
direction,
};
} else {
return {
...commonTargetOptions,
// start param in Loki API is inclusive so we'll have to filter out the row that this request is based from
// and any other that were logged in the same ns but before the row. Right now these rows will be lost
// because the are before but came it he response that should return only rows after.
start: nsTimestamp,
// convert to ns, we loose some precision here but it is not that important at the far points of the context
end: row.timeEpochMs + contextTimeBuffer + '000000',
};
const tsField = fieldCache.getFirstFieldOfType(FieldType.time);
if (tsField === undefined) {
throw new Error('loki: dataframe missing time-field, should never happen');
}
const tsValue = tsField.values.get(row.rowIndex);
const timestamp = toUtc(tsValue);
const range =
queryDirection === LokiQueryDirection.Forward
? {
// start param in Loki API is inclusive so we'll have to filter out the row that this request is based from
// and any other that were logged in the same ns but before the row. Right now these rows will be lost
// because the are before but came it he response that should return only rows after.
from: timestamp,
// convert to ns, we loose some precision here but it is not that important at the far points of the context
to: toUtc(row.timeEpochMs + contextTimeBuffer),
}
: {
// convert to ns, we loose some precision here but it is not that important at the far points of the context
from: toUtc(row.timeEpochMs - contextTimeBuffer),
to: timestamp,
};
return {
query,
range: {
from: range.from,
to: range.to,
raw: range,
},
};
};
testDatasource() {

View File

@ -112,7 +112,7 @@ describe('loki result transformer', () => {
});
it('should append refId to the unique ids if refId is provided', () => {
const data = ResultTransformer.lokiStreamsToRawDataFrame(streamResult, false, 'B');
const data = ResultTransformer.lokiStreamsToRawDataFrame(streamResult, 'B');
expect(data.fields[4].values.get(0)).toEqual('4b79cb43-81ce-52f7-b1e9-a207fff144dc_B');
expect(data.fields[4].values.get(1)).toEqual('73d144f6-57f2-5a45-a49c-eb998e2006b1_B');
});
@ -121,7 +121,7 @@ describe('loki result transformer', () => {
describe('lokiStreamsToDataFrames', () => {
it('should enhance data frames', () => {
jest.spyOn(ResultTransformer, 'enhanceDataFrame');
const dataFrames = ResultTransformer.lokiStreamsToDataFrames(lokiResponse, { refId: 'B' }, 500, {
const dataFrames = ResultTransformer.lokiStreamsToDataFrames(lokiResponse, { refId: 'B', expr: '' }, 500, {
derivedFields: [
{
matcherRegex: 'trace=(w+)',

View File

@ -45,7 +45,7 @@ const UUID_NAMESPACE = '6ec946da-0f49-47a8-983a-1d76d17e7c92';
/**
* Transforms LokiStreamResult structure into a dataFrame. Used when doing standard queries
*/
export function lokiStreamsToRawDataFrame(streams: LokiStreamResult[], reverse?: boolean, refId?: string): DataFrame {
export function lokiStreamsToRawDataFrame(streams: LokiStreamResult[], refId?: string): DataFrame {
const labels = new ArrayVector<{}>([]);
const times = new ArrayVector<string>([]);
const timesNs = new ArrayVector<string>([]);
@ -72,11 +72,11 @@ export function lokiStreamsToRawDataFrame(streams: LokiStreamResult[], reverse?:
}
}
return constructDataFrame(times, timesNs, lines, uids, labels, reverse, refId);
return constructDataFrame(times, timesNs, lines, uids, labels, refId);
}
/**
* Constructs dataFrame with supplied fields and other data. Also makes sure it is properly reversed if needed.
* Constructs dataFrame with supplied fields and other data.
*/
function constructDataFrame(
times: ArrayVector<string>,
@ -84,7 +84,6 @@ function constructDataFrame(
lines: ArrayVector<string>,
uids: ArrayVector<string>,
labels: ArrayVector<{}>,
reverse?: boolean,
refId?: string
) {
const dataFrame = {
@ -99,12 +98,6 @@ function constructDataFrame(
length: times.length,
};
if (reverse) {
const mutableDataFrame = new MutableDataFrame(dataFrame);
mutableDataFrame.reverse();
return mutableDataFrame;
}
return dataFrame;
}
@ -330,10 +323,9 @@ function lokiStatsToMetaStat(stats: LokiStats | undefined): QueryResultMetaStat[
export function lokiStreamsToDataFrames(
response: LokiStreamResponse,
target: { refId: string; expr?: string },
target: LokiQuery,
limit: number,
config: LokiOptions,
reverse = false
config: LokiOptions
): DataFrame[] {
const data = limit > 0 ? response.data.result : [];
const stats: QueryResultMetaStat[] = lokiStatsToMetaStat(response.data.stats);
@ -350,7 +342,7 @@ export function lokiStreamsToDataFrames(
preferredVisualisationType: 'logs',
};
const dataFrame = lokiStreamsToRawDataFrame(data, reverse, target.refId);
const dataFrame = lokiStreamsToRawDataFrame(data, target.refId);
enhanceDataFrame(dataFrame, config);
if (meta.custom && dataFrame.fields.some((f) => f.labels && Object.keys(f.labels).some((l) => l === '__error__'))) {

View File

@ -0,0 +1,52 @@
import { ArrayVector, DataFrame, FieldType } from '@grafana/data';
import { sortDataFrameByTime } from './sortDataFrame';
const inputFrame: DataFrame = {
refId: 'A',
fields: [
{
name: 'time',
type: FieldType.time,
config: {},
values: new ArrayVector([1005, 1001, 1004, 1002, 1003]),
},
{
name: 'value',
type: FieldType.string,
config: {},
values: new ArrayVector(['line5', 'line1', 'line4', 'line2', 'line3']),
},
{
name: 'tsNs',
type: FieldType.time,
config: {},
values: new ArrayVector([`1005000000`, `1001000000`, `1004000000`, `1002000000`, `1003000000`]),
},
],
length: 5,
};
describe('loki sortDataFrame', () => {
it('sorts a dataframe ascending', () => {
const sortedFrame = sortDataFrameByTime(inputFrame, 'ASCENDING');
expect(sortedFrame.length).toBe(5);
const timeValues = sortedFrame.fields[0].values.toArray();
const lineValues = sortedFrame.fields[1].values.toArray();
const tsNsValues = sortedFrame.fields[2].values.toArray();
expect(timeValues).toStrictEqual([1001, 1002, 1003, 1004, 1005]);
expect(lineValues).toStrictEqual(['line1', 'line2', 'line3', 'line4', 'line5']);
expect(tsNsValues).toStrictEqual([`1001000000`, `1002000000`, `1003000000`, `1004000000`, `1005000000`]);
});
it('sorts a dataframe descending', () => {
const sortedFrame = sortDataFrameByTime(inputFrame, 'DESCENDING');
expect(sortedFrame.length).toBe(5);
const timeValues = sortedFrame.fields[0].values.toArray();
const lineValues = sortedFrame.fields[1].values.toArray();
const tsNsValues = sortedFrame.fields[2].values.toArray();
expect(timeValues).toStrictEqual([1005, 1004, 1003, 1002, 1001]);
expect(lineValues).toStrictEqual(['line5', 'line4', 'line3', 'line2', 'line1']);
expect(tsNsValues).toStrictEqual([`1005000000`, `1004000000`, `1003000000`, `1002000000`, `1001000000`]);
});
});

View File

@ -0,0 +1,70 @@
import { DataFrame, Field, SortedVector } from '@grafana/data';
type SortDirection = 'ASCENDING' | 'DESCENDING';
// creates the `index` for the sorting.
// this is needed by the `SortedVector`.
// the index is an array of numbers, and it defines an order.
// at every slot in the index the values is the position of
// the sorted item.
// for example, an index of [3,1,2] means that
// in the dataframe, that has 3 rows, after sorting:
// - the third row will become the first
// - the first row will become the second
// - the second row will become the third
function makeIndex(field: Field<string>, dir: SortDirection): number[] {
const fieldValues: string[] = field.values.toArray();
// we first build an array which is [0,1,2,3....]
const index = Array(fieldValues.length);
for (let i = 0; i < index.length; i++) {
index[i] = i;
}
const isAsc = dir === 'ASCENDING';
index.sort((a: number, b: number): number => {
// we need to answer this question:
// in the field-used-for-sorting, how would we compare value-at-index-a to value-at-index-b?
const valA = fieldValues[a];
const valB = fieldValues[b];
if (valA < valB) {
return isAsc ? -1 : 1;
}
if (valA > valB) {
return isAsc ? 1 : -1;
}
return 0;
});
return index;
}
// sort a dataframe that is in the Loki format ascending or descending,
// based on the nanosecond-timestamp
export function sortDataFrameByTime(frame: DataFrame, dir: SortDirection): DataFrame {
const { fields, ...rest } = frame;
// we use the approach used in @grafana/data/sortDataframe.
// we cannot use it directly, because our tsNs field has a type=time,
// so we have to build the `index` manually.
const tsNsField = fields.find((field) => field.name === 'tsNs');
if (tsNsField === undefined) {
throw new Error('missing nanosecond-timestamp field. should never happen');
}
const index = makeIndex(tsNsField, dir);
return {
...rest,
fields: fields.map((field) => ({
...field,
values: new SortedVector(field.values, index),
})),
};
return frame;
}

View File

@ -29,9 +29,15 @@ export enum LokiQueryType {
Stream = 'stream',
}
export enum LokiQueryDirection {
Backward = 'backward',
Forward = 'forward',
}
export interface LokiQuery extends DataQuery {
queryType?: LokiQueryType;
expr: string;
direction?: LokiQueryDirection;
legendFormat?: string;
maxLines?: number;
resolution?: number;