Elasticsearch: Run log context queries through backend (#65805)

* Elasticsearch: Run context queries through backend

* Fix typing

* Update

* Add possibility to run context query through backend and frontend

* Correctly sort
Ivana Huckova 2023-04-05 18:32:02 +02:00 committed by GitHub
parent 7e312a6aa6
commit abc11d1dcf
7 changed files with 281 additions and 66 deletions


@@ -73,10 +73,21 @@ func (b *SearchRequestBuilder) Size(size int) *SearchRequestBuilder {
return b
}
// SortDesc adds a sort to the search request
func (b *SearchRequestBuilder) SortDesc(field, unmappedType string) *SearchRequestBuilder {
type SortOrder string
const (
SortOrderAsc SortOrder = "asc"
SortOrderDesc SortOrder = "desc"
)
// Sort adds an "asc" | "desc" sort to the search request
func (b *SearchRequestBuilder) Sort(order SortOrder, field string, unmappedType string) *SearchRequestBuilder {
if order != SortOrderAsc && order != SortOrderDesc {
return b
}
props := map[string]string{
"order": "desc",
"order": string(order),
}
if unmappedType != "" {
@@ -110,6 +121,16 @@ func (b *SearchRequestBuilder) AddHighlight() *SearchRequestBuilder {
return b
}
func (b *SearchRequestBuilder) AddSearchAfter(value interface{}) *SearchRequestBuilder {
if b.customProps["search_after"] == nil {
b.customProps["search_after"] = []interface{}{value}
} else {
b.customProps["search_after"] = append(b.customProps["search_after"].([]interface{}), value)
}
return b
}
// Query creates and returns a query builder
func (b *SearchRequestBuilder) Query() *QueryBuilder {
if b.queryBuilder == nil {

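Note on the builder changes above: `Sort` now validates the direction and `AddSearchAfter` accumulates the sort values of the anchor document. A minimal sketch of the search body this is expected to produce for a backward context query, written as an illustrative TypeScript object (field names and values are hypothetical, not taken from this commit):

```ts
// Illustrative only: a backward ("desc") log-context search body. Both sort
// clauses and the search_after seed come from the builder calls shown above.
const contextSearchBody = {
  size: 10,
  sort: [
    { '@timestamp': { order: 'desc', unmapped_type: 'boolean' } },
    { _doc: { order: 'desc' } },
  ],
  // search_after carries the sort values of the selected log row (timestamp
  // plus _doc tiebreaker); Elasticsearch then returns only documents that
  // come after that position in the sort order.
  search_after: [1680712322000, 8],
};
```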

@@ -45,7 +45,7 @@ func TestSearchRequest(t *testing.T) {
t.Run("When adding size, sort, filters", func(t *testing.T) {
b := setup()
b.Size(200)
b.SortDesc(timeField, "boolean")
b.Sort(SortOrderDesc, timeField, "boolean")
filters := b.Query().Bool().Filter()
filters.AddDateRangeFilter(timeField, 10, 5, DateFormatEpochMS)
filters.AddQueryStringFilter("test", true)


@@ -317,12 +317,24 @@ func isRawDocumentQuery(query *Query) bool {
func processLogsQuery(q *Query, b *es.SearchRequestBuilder, from, to int64, defaultTimeField string) {
metric := q.Metrics[0]
b.SortDesc(defaultTimeField, "boolean")
b.SortDesc("_doc", "")
sort := es.SortOrderDesc
if metric.Settings.Get("sortDirection").MustString() == "asc" {
// This is currently used only for the log context query
sort = es.SortOrderAsc
}
b.Sort(sort, defaultTimeField, "boolean")
b.Sort(sort, "_doc", "")
b.AddDocValueField(defaultTimeField)
b.Size(stringToIntWithDefaultValue(metric.Settings.Get("limit").MustString(), defaultSize))
b.AddHighlight()
// This is currently used only for the log context query to get
// log lines before and after the selected log line
searchAfter := metric.Settings.Get("searchAfter").MustArray()
for _, value := range searchAfter {
b.AddSearchAfter(value)
}
// For log queries, we add a date histogram aggregation
aggBuilder := b.Agg()
q.BucketAggs = append(q.BucketAggs, &BucketAgg{
@@ -342,8 +354,8 @@
func processDocumentQuery(q *Query, b *es.SearchRequestBuilder, from, to int64, defaultTimeField string) {
metric := q.Metrics[0]
b.SortDesc(defaultTimeField, "boolean")
b.SortDesc("_doc", "")
b.Sort(es.SortOrderDesc, defaultTimeField, "boolean")
b.Sort(es.SortOrderDesc, "_doc", "")
b.AddDocValueField(defaultTimeField)
b.Size(stringToIntWithDefaultValue(metric.Settings.Get("size").MustString(), defaultSize))
}

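The new branches in `processLogsQuery` are driven by two metric settings, `sortDirection` and `searchAfter`. A hedged sketch of a query target that exercises both, mirroring the Go test below (refId and values are hypothetical):

```ts
// Hypothetical backend query target: sortDirection flips both sort clauses
// to "asc" for forward context queries, and every searchAfter entry is fed
// to AddSearchAfter on the Go side.
const logContextTarget = {
  refId: 'log-context-A-FORWARD',
  query: '',
  metrics: [
    {
      type: 'logs',
      id: '1',
      settings: {
        limit: '10',
        sortDirection: 'asc',
        searchAfter: [1680712322000, 8], // sort values of the selected row
      },
    },
  ],
};
```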

@@ -1354,6 +1354,25 @@ func TestExecuteElasticsearchDataQuery(t *testing.T) {
})
})
t.Run("With log context query with sortDirection and searchAfter should return correct query", func(t *testing.T) {
c := newFakeClient()
_, err := executeElasticsearchDataQuery(c, `{
"metrics": [{ "type": "logs", "id": "1", "settings": { "limit": "1000", "sortDirection": "asc", "searchAfter": [1, "2"] }}]
}`, from, to)
require.NoError(t, err)
sr := c.multisearchRequests[0].Requests[0]
require.Equal(t, sr.Sort["@timestamp"], map[string]string{"order": "asc", "unmapped_type": "boolean"})
require.Equal(t, sr.Sort["_doc"], map[string]string{"order": "asc"})
searchAfter := sr.CustomProps["search_after"].([]interface{})
firstSearchAfter, err := searchAfter[0].(json.Number).Int64()
require.NoError(t, err)
require.Equal(t, firstSearchAfter, int64(1))
secondSearchAfter := searchAfter[1].(string)
require.NoError(t, err)
require.Equal(t, secondSearchAfter, "2")
})
t.Run("With invalid query should return error", (func(t *testing.T) {
c := newFakeClient()
_, err := executeElasticsearchDataQuery(c, `{


@@ -10,7 +10,7 @@ type IntervalMap = Record<
}
>;
const intervalMap: IntervalMap = {
export const intervalMap: IntervalMap = {
Hourly: { startOf: 'hour', amount: 'hours' },
Daily: { startOf: 'day', amount: 'days' },
Weekly: { startOf: 'isoWeek', amount: 'weeks' },
@@ -36,6 +36,7 @@ export class IndexPattern {
// for the provided index pattern.
// This is useful when requesting log context where the only time data we have is the log
// timestamp.
// TODO: Remove when enableBackendMigration toggle is removed
const indexOffset = 7;
if (!this.interval) {
return this.pattern;

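One detail worth calling out before the datasource diff below: the `search_after` seed is read from the row's `sort` field, which holds the Elasticsearch sort values (timestamp plus `_doc` tiebreaker) for each hit. A minimal sketch, assuming a hypothetical frame layout:

```ts
import { FieldType, MutableDataFrame } from '@grafana/data';

// Hypothetical logs frame; the "sort" field stores, per row, the sort values
// Elasticsearch returned for that hit.
const frame = new MutableDataFrame({
  refId: 'A',
  fields: [
    { name: '@timestamp', type: FieldType.time, values: [1680712322000] },
    { name: 'line', type: FieldType.string, values: ['error: boom'] },
    { name: 'sort', type: FieldType.other, values: [[1680712322000, 8]] },
  ],
});

// The same lookup makeLogContextDataRequest performs, with the row timestamp
// as a fallback when no sort field is present.
const rowIndex = 0;
const searchAfter =
  frame.fields.find((f) => f.name === 'sort')?.values.get(rowIndex) ?? [1680712322000];
```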

@@ -1,6 +1,6 @@
import { cloneDeep, find, first as _first, isNumber, isObject, isString, map as _map } from 'lodash';
import { generate, lastValueFrom, Observable, of, throwError } from 'rxjs';
import { catchError, first, map, mergeMap, skipWhile, throwIfEmpty, tap } from 'rxjs/operators';
import { catchError, first, map, mergeMap, skipWhile, throwIfEmpty, tap, switchMap } from 'rxjs/operators';
import { SemVer } from 'semver';
import {
@@ -14,7 +14,6 @@ import {
DataSourceWithSupplementaryQueriesSupport,
DateTime,
dateTime,
Field,
getDefaultTimeRange,
AbstractQuery,
LogLevel,
@@ -26,6 +25,12 @@ import {
QueryFixAction,
CoreApp,
SupplementaryQueryType,
DataQueryError,
FieldCache,
FieldType,
rangeUtil,
Field,
sortDataFrame,
} from '@grafana/data';
import { BackendSrvRequest, DataSourceWithBackend, getBackendSrv, getDataSourceSrv, config } from '@grafana/runtime';
import { queryLogsVolume } from 'app/core/logsModel';
@@ -36,7 +41,7 @@ import { RowContextOptions } from '../../../features/logs/components/LogRowConte
import { getLogLevelFromKey } from '../../../features/logs/utils';
import { ElasticResponse } from './ElasticResponse';
import { IndexPattern } from './IndexPattern';
import { IndexPattern, intervalMap } from './IndexPattern';
import LanguageProvider from './LanguageProvider';
import { ElasticQueryBuilder } from './QueryBuilder';
import { ElasticsearchAnnotationsQueryEditor } from './components/QueryEditor/AnnotationQueryEditor';
@@ -49,7 +54,15 @@ import {
import { metricAggregationConfig } from './components/QueryEditor/MetricAggregationsEditor/utils';
import { defaultBucketAgg, hasMetricOfType } from './queryDef';
import { trackQuery } from './tracking';
import { Logs, BucketAggregation, DataLinkConfig, ElasticsearchOptions, ElasticsearchQuery, TermsQuery } from './types';
import {
Logs,
BucketAggregation,
DataLinkConfig,
ElasticsearchOptions,
ElasticsearchQuery,
TermsQuery,
Interval,
} from './types';
import { getScriptValue, isSupportedVersion, unsupportedVersionMessage } from './utils';
export const REF_ID_STARTER_LOG_VOLUME = 'log-volume-';
@@ -85,6 +98,7 @@ export class ElasticDatasource
maxConcurrentShardRequests?: number;
queryBuilder: ElasticQueryBuilder;
indexPattern: IndexPattern;
intervalPattern?: Interval;
logMessageField?: string;
logLevelField?: string;
dataLinks: DataLinkConfig[];
@@ -110,6 +124,7 @@
this.timeField = settingsData.timeField;
this.xpack = Boolean(settingsData.xpack);
this.indexPattern = new IndexPattern(this.index, settingsData.interval);
this.intervalPattern = settingsData.interval;
this.interval = settingsData.timeInterval;
this.maxConcurrentShardRequests = settingsData.maxConcurrentShardRequests;
this.queryBuilder = new ElasticQueryBuilder({
@@ -486,65 +501,86 @@
}
getLogRowContext = async (row: LogRowModel, options?: RowContextOptions): Promise<{ data: DataFrame[] }> => {
const sortField = row.dataFrame.fields.find((f) => f.name === 'sort');
const searchAfter = sortField?.values.get(row.rowIndex) || [row.timeEpochMs];
const sort = options?.direction === 'FORWARD' ? 'asc' : 'desc';
const { disableElasticsearchBackendExploreQuery, elasticsearchBackendMigration } = config.featureToggles;
if (!disableElasticsearchBackendExploreQuery || elasticsearchBackendMigration) {
const contextRequest = this.makeLogContextDataRequest(row, options);
const header =
options?.direction === 'FORWARD'
? this.getQueryHeader('query_then_fetch', dateTime(row.timeEpochMs))
: this.getQueryHeader('query_then_fetch', undefined, dateTime(row.timeEpochMs));
return lastValueFrom(
this.query(contextRequest).pipe(
catchError((err) => {
const error: DataQueryError = {
message: 'Error during context query. Please check JS console logs.',
status: err.status,
statusText: err.statusText,
};
throw error;
}),
switchMap((res) => {
return of(processToLogContextDataFrames(res));
})
)
);
} else {
const sortField = row.dataFrame.fields.find((f) => f.name === 'sort');
const searchAfter = sortField?.values.get(row.rowIndex) || [row.timeEpochMs];
const sort = options?.direction === 'FORWARD' ? 'asc' : 'desc';
const limit = options?.limit ?? 10;
const esQuery = JSON.stringify({
size: limit,
query: {
bool: {
filter: [
{
range: {
[this.timeField]: {
[options?.direction === 'FORWARD' ? 'gte' : 'lte']: row.timeEpochMs,
format: 'epoch_millis',
const header =
options?.direction === 'FORWARD'
? this.getQueryHeader('query_then_fetch', dateTime(row.timeEpochMs))
: this.getQueryHeader('query_then_fetch', undefined, dateTime(row.timeEpochMs));
const limit = options?.limit ?? 10;
const esQuery = JSON.stringify({
size: limit,
query: {
bool: {
filter: [
{
range: {
[this.timeField]: {
[options?.direction === 'FORWARD' ? 'gte' : 'lte']: row.timeEpochMs,
format: 'epoch_millis',
},
},
},
],
},
},
sort: [{ [this.timeField]: sort }, { _doc: sort }],
search_after: searchAfter,
});
const payload = [header, esQuery].join('\n') + '\n';
const url = this.getMultiSearchUrl();
const response = await lastValueFrom(this.post(url, payload));
const targets: ElasticsearchQuery[] = [{ refId: `${row.dataFrame.refId}`, metrics: [{ type: 'logs', id: '1' }] }];
const elasticResponse = new ElasticResponse(targets, transformHitsBasedOnDirection(response, sort));
const logResponse = elasticResponse.getLogs(this.logMessageField, this.logLevelField);
const dataFrame = _first(logResponse.data);
if (!dataFrame) {
return { data: [] };
}
/**
* The LogRowContextProvider requires that there is a field in the dataFrame.fields
* named `ts` for timestamp and `line` for the actual log line to display.
* Unfortunately these fields are hardcoded and are required for the lines to
* be properly displayed. This code just copies the fields based on this.timeField
* and this.logMessageField and recreates the dataFrame so it works.
*/
const timestampField = dataFrame.fields.find((f: Field) => f.name === this.timeField);
const lineField = dataFrame.fields.find((f: Field) => f.name === this.logMessageField);
if (timestampField && lineField) {
return {
data: [
{
...dataFrame,
fields: [...dataFrame.fields, { ...timestampField, name: 'ts' }, { ...lineField, name: 'line' }],
},
],
},
},
sort: [{ [this.timeField]: sort }, { _doc: sort }],
search_after: searchAfter,
});
const payload = [header, esQuery].join('\n') + '\n';
const url = this.getMultiSearchUrl();
const response = await lastValueFrom(this.post(url, payload));
const targets: ElasticsearchQuery[] = [{ refId: `${row.dataFrame.refId}`, metrics: [{ type: 'logs', id: '1' }] }];
const elasticResponse = new ElasticResponse(targets, transformHitsBasedOnDirection(response, sort));
const logResponse = elasticResponse.getLogs(this.logMessageField, this.logLevelField);
const dataFrame = _first(logResponse.data);
if (!dataFrame) {
return { data: [] };
};
}
return logResponse;
}
/**
* The LogRowContextProvider requires that there is a field in the dataFrame.fields
* named `ts` for timestamp and `line` for the actual log line to display.
* Unfortunately these fields are hardcoded and are required for the lines to
* be properly displayed. This code just copies the fields based on this.timeField
* and this.logMessageField and recreates the dataFrame so it works.
*/
const timestampField = dataFrame.fields.find((f: Field) => f.name === this.timeField);
const lineField = dataFrame.fields.find((f: Field) => f.name === this.logMessageField);
if (timestampField && lineField) {
return {
data: [
{
...dataFrame,
fields: [...dataFrame.fields, { ...timestampField, name: 'ts' }, { ...lineField, name: 'line' }],
},
],
};
}
return logResponse;
};
getDataProvider(
@@ -1077,6 +1113,50 @@
this.databaseVersion = freshDatabaseVersion;
return freshDatabaseVersion;
}
private makeLogContextDataRequest = (row: LogRowModel, options?: RowContextOptions) => {
const direction = options?.direction || 'BACKWARD';
const logQuery: Logs = {
type: 'logs',
id: '1',
settings: {
limit: options?.limit ? options?.limit.toString() : '10',
// Sorting of results in the context query
sortDirection: direction === 'BACKWARD' ? 'desc' : 'asc',
// Used to get the next log lines before/after the current log line using the sort field of the selected log line
searchAfter: row.dataFrame.fields.find((f) => f.name === 'sort')?.values.get(row.rowIndex) ?? [row.timeEpochMs],
},
};
const query: ElasticsearchQuery = {
refId: `log-context-${row.dataFrame.refId}-${direction}`,
metrics: [logQuery],
query: '',
};
const timeRange = createContextTimeRange(row.timeEpochMs, direction, this.intervalPattern);
const range = {
from: timeRange.from,
to: timeRange.to,
raw: timeRange,
};
const interval = rangeUtil.calculateInterval(range, 1);
const contextRequest: DataQueryRequest<ElasticsearchQuery> = {
requestId: `log-context-request-${row.dataFrame.refId}-${options?.direction}`,
targets: [query],
interval: interval.interval,
intervalMs: interval.intervalMs,
range,
scopedVars: {},
timezone: 'UTC',
app: CoreApp.Explore,
startTime: Date.now(),
hideFromInspector: true,
};
return contextRequest;
};
}
/**
@@ -1129,7 +1209,6 @@ function generateDataLink(linkConfig: DataLinkConfig): DataLink {
};
}
}
function transformHitsBasedOnDirection(response: any, direction: 'asc' | 'desc') {
if (direction === 'desc') {
return response;
@@ -1148,3 +1227,73 @@ function transformHitsBasedOnDirection(response: any, direction: 'asc' | 'desc')
],
};
}
function processToLogContextDataFrames(result: DataQueryResponse): DataQueryResponse {
const frames = result.data.map((frame) => sortDataFrame(frame, 0, true));
const processedFrames = frames.map((frame) => {
// log-row-context requires specific field-names to work, so we set them here: "ts", "line", "id"
const cache = new FieldCache(frame);
const timestampField = cache.getFirstFieldOfType(FieldType.time);
const lineField = cache.getFirstFieldOfType(FieldType.string);
const idField = cache.getFieldByName('_id');
if (!timestampField || !lineField || !idField) {
return { ...frame, fields: [] };
}
return {
...frame,
fields: [
{
...timestampField,
name: 'ts',
},
{
...lineField,
name: 'line',
},
{
...idField,
name: 'id',
},
],
};
});
return {
...result,
data: processedFrames,
};
}
function createContextTimeRange(rowTimeEpochMs: number, direction: string, intervalPattern: Interval | undefined) {
const offset = 7;
// For log context, we want to request data from 7 subsequent/previous indices
if (intervalPattern) {
const intervalInfo = intervalMap[intervalPattern];
if (direction === 'FORWARD') {
return {
from: dateTime(rowTimeEpochMs).utc(),
to: dateTime(rowTimeEpochMs).add(offset, intervalInfo.amount).utc().startOf(intervalInfo.startOf),
};
} else {
return {
from: dateTime(rowTimeEpochMs).subtract(offset, intervalInfo.amount).utc().startOf(intervalInfo.startOf),
to: dateTime(rowTimeEpochMs).utc(),
};
}
// If we don't have an interval pattern, we can't do this, so we just request data from 7h before/after
} else {
if (direction === 'FORWARD') {
return {
from: dateTime(rowTimeEpochMs).utc(),
to: dateTime(rowTimeEpochMs).add(offset, 'hours').utc(),
};
} else {
return {
from: dateTime(rowTimeEpochMs).subtract(offset, 'hours').utc(),
to: dateTime(rowTimeEpochMs).utc(),
};
}
}
}
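In effect, `processToLogContextDataFrames` above only renames fields: the first time field becomes `ts`, the first string field becomes `line`, and `_id` becomes `id`, since those are the names `LogRowContextProvider` hardcodes. A small sketch of the lookups it performs (field names and values are hypothetical):

```ts
import { FieldCache, FieldType, toDataFrame } from '@grafana/data';

// A frame shaped like a backend logs response (hypothetical names/values).
const backendFrame = toDataFrame({
  fields: [
    { name: '@timestamp', type: FieldType.time, values: [1680712322000] },
    { name: 'message', type: FieldType.string, values: ['error: boom'] },
    { name: '_id', type: FieldType.string, values: ['doc-1'] },
  ],
});

// After processing, these three fields are re-emitted under the hardcoded
// names "ts", "line" and "id"; if any of them is missing, the frame's fields
// are dropped entirely.
const cache = new FieldCache(backendFrame);
const timestampField = cache.getFirstFieldOfType(FieldType.time); // -> "ts"
const lineField = cache.getFirstFieldOfType(FieldType.string); // -> "line"
const idField = cache.getFieldByName('_id'); // -> "id"
```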

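And a worked example of `createContextTimeRange` above: with a `Daily` interval pattern and a BACKWARD context, the window is widened to cover the seven previous daily indices (row timestamp is hypothetical):

```ts
import { dateTime } from '@grafana/data';

// Hypothetical row timestamp: 2023-04-05T18:32:00Z.
const rowTimeEpochMs = Date.UTC(2023, 3, 5, 18, 32);

// intervalMap.Daily = { startOf: 'day', amount: 'days' }, so BACKWARD yields:
const from = dateTime(rowTimeEpochMs).subtract(7, 'days').utc().startOf('day'); // 2023-03-29T00:00:00Z
const to = dateTime(rowTimeEpochMs).utc(); // 2023-04-05T18:32:00Z

// Without an interval pattern the fallback is a plain 7-hour window on the
// relevant side of the row.
```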

@@ -13,11 +13,24 @@ import {
ExtendedStats,
MovingAverage as SchemaMovingAverage,
BucketAggregation,
Logs as SchemaLogs,
} from './dataquery.gen';
export * from './dataquery.gen';
export { Elasticsearch as ElasticsearchQuery } from './dataquery.gen';
// We want to extend the settings of the Logs query with additional properties that
// are not part of the schema. This is a workaround, because exporting LogsSettings
// from dataquery.gen.ts and extending that produces an error in SettingKeyOf.
type ExtendedLogsSettings = SchemaLogs['settings'] & {
searchAfter?: unknown[];
sortDirection?: 'asc' | 'desc';
};
export interface Logs extends SchemaLogs {
settings?: ExtendedLogsSettings;
}
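A quick type-level check of the extension (values are hypothetical):

```ts
// Compiles against ExtendedLogsSettings; with the generated LogsSettings
// alone, sortDirection and searchAfter would be rejected.
const contextLogsMetric: Logs = {
  type: 'logs',
  id: '1',
  settings: { limit: '10', sortDirection: 'desc', searchAfter: [1680712322000, 8] },
};
```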
export type MetricAggregationWithMeta = ExtendedStats;
export type MovingAverageModelSettings<T extends MovingAverageModel = MovingAverageModel> = Partial<