2021-08-18 23:38:31 -05:00
import { cloneDeep , find , first as _first , isNumber , isObject , isString , map as _map } from 'lodash' ;
import { generate , lastValueFrom , Observable , of , throwError } from 'rxjs' ;
import { catchError , first , map , mergeMap , skipWhile , throwIfEmpty } from 'rxjs/operators' ;
import { gte , lt , satisfies } from 'semver' ;
import { BackendSrvRequest , getBackendSrv , getDataSourceSrv } from '@grafana/runtime' ;
2019-12-11 10:40:56 -06:00
import {
DataFrame ,
2020-07-01 02:45:21 -05:00
DataLink ,
2020-09-08 05:34:11 -05:00
DataQuery ,
2021-01-07 04:26:56 -06:00
DataQueryRequest ,
DataQueryResponse ,
DataSourceApi ,
DataSourceInstanceSettings ,
DateTime ,
dateTime ,
2020-12-10 05:19:14 -06:00
Field ,
2021-01-07 04:26:56 -06:00
getDefaultTimeRange ,
LogRowModel ,
2020-12-04 08:29:40 -06:00
MetricFindValue ,
2021-01-07 04:26:56 -06:00
ScopedVars ,
2020-12-17 05:24:20 -06:00
TimeRange ,
2021-01-07 04:26:56 -06:00
toUtc ,
2019-12-11 10:40:56 -06:00
} from '@grafana/data' ;
2020-09-08 05:34:11 -05:00
import LanguageProvider from './language_provider' ;
2017-12-20 05:33:33 -06:00
import { ElasticResponse } from './elastic_response' ;
2019-03-26 10:15:23 -05:00
import { IndexPattern } from './index_pattern' ;
import { ElasticQueryBuilder } from './query_builder' ;
2020-12-04 08:29:40 -06:00
import { defaultBucketAgg , hasMetricOfType } from './query_def' ;
2020-10-01 12:51:23 -05:00
import { getTemplateSrv , TemplateSrv } from 'app/features/templating/template_srv' ;
2019-12-11 10:40:56 -06:00
import { DataLinkConfig , ElasticsearchOptions , ElasticsearchQuery } from './types' ;
2020-12-10 05:19:14 -06:00
import { RowContextOptions } from '@grafana/ui/src/components/Logs/LogRowContextProvider' ;
2020-12-04 08:29:40 -06:00
import { metricAggregationConfig } from './components/QueryEditor/MetricAggregationsEditor/utils' ;
import {
isMetricAggregationWithField ,
isPipelineAggregationWithMultipleBucketPaths ,
2021-04-13 11:39:07 -05:00
Logs ,
2020-12-04 08:29:40 -06:00
} from './components/QueryEditor/MetricAggregationsEditor/aggregations' ;
import { bucketAggregationConfig } from './components/QueryEditor/BucketAggregationsEditor/utils' ;
2021-04-26 10:54:23 -05:00
import {
BucketAggregation ,
isBucketAggregationWithField ,
} from './components/QueryEditor/BucketAggregationsEditor/aggregations' ;
2021-05-11 03:44:00 -05:00
import { coerceESVersion , getScriptValue } from './utils' ;
2017-09-28 05:52:39 -05:00
2020-09-15 03:14:47 -05:00
// Metadata field names, as listed in
// https://www.elastic.co/guide/en/elasticsearch/reference/current/mapping-fields.html#_identity_metadata_fields.
// Custom fields may also start with an underscore, so it is not safe to exclude everything with that prefix;
// only these exact names are treated as metadata.
const ELASTIC_META_FIELDS: string[] = ['_index', '_type', '_id', '_source', '_size', '_field_names', '_ignored', '_routing', '_meta'];
2019-06-24 15:15:03 -05:00
export class ElasticDatasource extends DataSourceApi < ElasticsearchQuery , ElasticsearchOptions > {
2020-07-08 04:05:20 -05:00
/** Value of `instanceSettings.basicAuth`; when set, `request` sends it as the `Authorization` header. */
basicAuth?: string;
/** Value of `instanceSettings.withCredentials`. */
withCredentials?: boolean;
/** Base URL every request path is appended to. */
url: string;
/** Datasource instance name. */
name: string;
/** Index name or pattern, taken from `instanceSettings.database` (empty string when unset). */
index: string;
/** Name of the time field configured in the datasource settings. */
timeField: string;
/** Configured version after `coerceESVersion`; compared with semver helpers throughout the class. */
esVersion: string;
/** `jsonData.xpack` coerced to a boolean. */
xpack: boolean;
/** Value of `jsonData.timeInterval`. */
interval: string;
/** Optional `max_concurrent_shard_requests` value from the datasource settings. */
maxConcurrentShardRequests?: number;
/** Query builder created with this datasource's time field and version. */
queryBuilder: ElasticQueryBuilder;
/** Index pattern built from `index` and `jsonData.interval`. */
indexPattern: IndexPattern;
/** Configured log message field; `undefined` when not configured. */
logMessageField?: string;
/** Configured log level field; `undefined` when not configured. */
logLevelField?: string;
/** Data link configuration from the settings (empty array when unset). */
dataLinks: DataLinkConfig[];
/** Language provider bound to this datasource instance. */
languageProvider: LanguageProvider;
/** Value of `jsonData.includeFrozen` (defaults to `false`). */
includeFrozen: boolean;
2017-09-28 05:52:39 -05:00
2019-06-24 15:15:03 -05:00
/**
 * Initialises the datasource from its saved instance settings.
 *
 * @param instanceSettings saved settings; `jsonData` may be missing, in which case an empty object is used.
 * @param templateSrv template service used for variable interpolation (defaults to the global one).
 */
constructor(
  instanceSettings: DataSourceInstanceSettings<ElasticsearchOptions>,
  private readonly templateSrv: TemplateSrv = getTemplateSrv()
) {
  super(instanceSettings);

  const { basicAuth, withCredentials, url, name, database } = instanceSettings;
  const jsonData = instanceSettings.jsonData || ({} as ElasticsearchOptions);

  this.basicAuth = basicAuth;
  this.withCredentials = withCredentials;
  this.url = url!;
  this.name = name;
  this.index = database ?? '';

  this.timeField = jsonData.timeField;
  this.esVersion = coerceESVersion(jsonData.esVersion);
  this.xpack = Boolean(jsonData.xpack);
  this.indexPattern = new IndexPattern(this.index, jsonData.interval);
  this.interval = jsonData.timeInterval;
  this.maxConcurrentShardRequests = jsonData.maxConcurrentShardRequests;
  this.queryBuilder = new ElasticQueryBuilder({ timeField: this.timeField, esVersion: this.esVersion });

  // A missing or empty field name is stored as undefined.
  this.logMessageField = jsonData.logMessageField || undefined;
  this.logLevelField = jsonData.logLevelField || undefined;

  this.dataLinks = jsonData.dataLinks || [];
  this.includeFrozen = jsonData.includeFrozen ?? false;

  this.languageProvider = new LanguageProvider(this);
}
2021-03-29 17:41:45 -05:00
/**
 * Sends a request through the backend service and returns the response body.
 *
 * The emitted body has the request config attached as `$$config`. When the failure carries a
 * `data` payload, the error is re-thrown as `{ message, error }` with an
 * "Elasticsearch error: " prefix; any other failure is re-thrown unchanged.
 *
 * @param method HTTP method.
 * @param url path appended to the datasource base URL.
 * @param data optional request body.
 * @param headers optional extra request headers.
 */
private request(
  method: string,
  url: string,
  data?: undefined,
  headers?: BackendSrvRequest['headers']
): Observable<any> {
  const options: BackendSrvRequest = {
    url: this.url + '/' + url,
    method,
    data,
    headers,
  };

  if (this.basicAuth || this.withCredentials) {
    options.withCredentials = true;
  }

  if (this.basicAuth) {
    // Merge rather than replace, so caller-supplied headers (e.g. Content-Type) are not lost.
    options.headers = {
      ...headers,
      Authorization: this.basicAuth,
    };
  }

  return getBackendSrv()
    .fetch<any>(options)
    .pipe(
      map((results) => {
        results.data.$$config = results.config;
        return results.data;
      }),
      catchError((err) => {
        if (err.data) {
          const message = err.data.error?.reason ?? err.data.message ?? 'Unknown error';

          return throwError({
            message: 'Elasticsearch error: ' + message,
            error: err.data.error,
          });
        }

        return throwError(err);
      })
    );
}
2021-05-06 02:26:26 -05:00
/**
 * Converts queries coming from another datasource by delegating to the language provider,
 * passing along the id of the originating datasource plugin.
 */
async importQueries(queries: DataQuery[], originDataSource: DataSourceApi): Promise<ElasticsearchQuery[]> {
  const originPluginId = originDataSource.meta.id;
  return this.languageProvider.importQueries(queries, originPluginId);
}
2020-03-02 03:45:31 -06:00
/**
 * Sends a GET request for `url` against the indices that match the given time range.
 *
 * The index list comes from the index pattern; when the pattern does not yield an array,
 * today's index is used as the only candidate. The candidates are handed to
 * `requestAllIndices`, which reads the list starting from its last entry.
 *
 * @param url the url to query the index on, for example `/_mapping`.
 * @param range time range used to resolve the index list (defaults to the default time range).
 */
private get(url: string, range = getDefaultTimeRange()): Observable<any> {
  const resolved = this.indexPattern.getIndexList(range.from, range.to);
  const indexList = Array.isArray(resolved) ? resolved : [this.indexPattern.getIndexForToday()];

  return this.requestAllIndices(indexList.map((index) => index + url));
}
2021-01-10 23:47:51 -06:00
/**
 * Requests the given index URLs one at a time, starting from the last entry of `indexList`
 * and moving backwards, until one answers with something other than a 404.
 *
 * At most `maxTraversals` entries are tried. Requests are issued sequentially
 * (`mergeMap` with a concurrency of 1), so the first non-404 result always belongs to the
 * latest list entry that is available, and no further requests are sent once it is found.
 *
 * @param indexList index URLs to try.
 * @returns the first non-404 response; errors with any non-404 failure, or with the string
 *   'Could not find an available index for this time range.' when every attempt was a 404
 *   (or the list is empty).
 */
private requestAllIndices(indexList: string[]): Observable<any> {
  const maxTraversals = 7; // do not go beyond one week (for a daily pattern)
  const listLen = indexList.length;

  return generate({
    initialState: 0,
    condition: (i) => i < Math.min(listLen, maxTraversals),
    iterate: (i) => i + 1,
  }).pipe(
    mergeMap(
      (index) =>
        // catch all errors and emit an object with an err property to simplify checks later in the pipeline
        this.request('GET', indexList[listLen - index - 1]).pipe(catchError((err) => of({ err }))),
      1 // one request in flight at a time keeps the responses in list order
    ),
    skipWhile((resp) => resp?.err?.status === 404), // skip all requests that fail because of a missing Elastic index
    throwIfEmpty(() => 'Could not find an available index for this time range.'), // nothing left means every attempt was a 404
    first(), // take the first value that isn't skipped
    map((resp) => {
      if (resp.err) {
        throw resp.err; // if there is some other error except 404 then we must throw it
      }

      return resp;
    })
  );
}
2021-01-10 23:47:51 -06:00
/**
 * Sends a POST request with an `application/x-ndjson` content type.
 *
 * @param url path appended to the datasource base URL.
 * @param data request body.
 */
private post(url: string, data: any): Observable<any> {
  const ndjsonHeaders = { 'Content-Type': 'application/x-ndjson' };
  return this.request('POST', url, data, ndjsonHeaders);
}
2020-01-21 03:08:07 -06:00
/**
 * Runs an annotation query through `_msearch` and converts every hit into an annotation event.
 *
 * A hit matches when its time field, or its end-time field when one is configured, falls inside
 * `options.range`, and it also matches the Lucene query string. Field names may be dotted paths
 * into `_source`.
 *
 * @param options annotation request; reads `options.annotation` and `options.range`.
 * @returns a promise of events shaped `{ annotation, time, timeEnd?, text, tags }`.
 */
annotationQuery(options: any): Promise<any> {
  const annotation = options.annotation;
  const timeField = annotation.timeField || '@timestamp';
  const timeEndField = annotation.timeEndField || null;
  const queryString = annotation.query || '*';
  const tagsField = annotation.tagsField || 'tags';
  const textField = annotation.textField || null;

  const dateRanges = [];
  const rangeStart: any = {};
  rangeStart[timeField] = {
    from: options.range.from.valueOf(),
    to: options.range.to.valueOf(),
    format: 'epoch_millis',
  };
  dateRanges.push({ range: rangeStart });

  if (timeEndField) {
    const rangeEnd: any = {};
    rangeEnd[timeEndField] = {
      from: options.range.from.valueOf(),
      to: options.range.to.valueOf(),
      format: 'epoch_millis',
    };
    dateRanges.push({ range: rangeEnd });
  }

  const queryInterpolated = this.templateSrv.replace(queryString, {}, 'lucene');
  const query = {
    bool: {
      filter: [
        {
          bool: {
            should: dateRanges,
            minimum_should_match: 1,
          },
        },
        {
          query_string: {
            query: queryInterpolated,
          },
        },
      ],
    },
  };

  const data: any = {
    query,
    size: 10000,
  };

  // fields field not supported on ES 5.x
  if (lt(this.esVersion, '5.0.0')) {
    data['fields'] = [timeField, '_source'];
  }

  const header: any = {
    search_type: 'query_then_fetch',
    ignore_unavailable: true,
  };

  // old elastic annotations had index specified on them
  if (annotation.index) {
    header.index = annotation.index;
  } else {
    header.index = this.indexPattern.getIndexList(options.range.from, options.range.to);
  }

  const payload = JSON.stringify(header) + '\n' + JSON.stringify(data) + '\n';

  return lastValueFrom(
    this.post('_msearch', payload).pipe(
      map((res) => {
        const list = [];
        const hits = res.responses[0].hits.hits;

        // Resolves a (possibly dotted) field path inside a hit's _source.
        // Returns undefined when no field name is given and '' when the path cannot be resolved.
        const getFieldFromSource = (source: any, fieldName: any) => {
          if (!fieldName) {
            return;
          }

          const fieldNames = fieldName.split('.');
          let fieldValue = source;

          for (let i = 0; i < fieldNames.length; i++) {
            fieldValue = fieldValue[fieldNames[i]];
            if (!fieldValue) {
              console.log('could not find field in annotation: ', fieldName);
              return '';
            }
          }

          return fieldValue;
        };

        for (let i = 0; i < hits.length; i++) {
          const source = hits[i]._source;
          let time = getFieldFromSource(source, timeField);
          if (typeof hits[i].fields !== 'undefined') {
            const fields = hits[i].fields;
            if (isString(fields[timeField]) || isNumber(fields[timeField])) {
              time = fields[timeField];
            }
          }

          const event: {
            annotation: any;
            time: number;
            timeEnd?: number;
            text: string;
            tags: string | string[];
          } = {
            annotation: annotation,
            time: toUtc(time).valueOf(),
            text: getFieldFromSource(source, textField),
            tags: getFieldFromSource(source, tagsField),
          };

          if (timeEndField) {
            const timeEnd = getFieldFromSource(source, timeEndField);
            if (timeEnd) {
              event.timeEnd = toUtc(timeEnd).valueOf();
            }
          }

          // legacy support for title field
          if (annotation.titleField) {
            const title = getFieldFromSource(source, annotation.titleField);
            if (title) {
              // Without body text (no text field configured, or not found) use the title alone
              // instead of producing "<title>\nundefined".
              event.text = event.text ? title + '\n' + event.text : title;
            }
          }

          if (typeof event.tags === 'string') {
            event.tags = event.tags.split(',');
          }

          list.push(event);
        }

        return list;
      })
    )
  );
}
2017-09-28 05:52:39 -05:00
2020-11-11 06:56:43 -06:00
/**
 * Interpolates template variables into a Lucene query string using the 'lucene' format.
 * An empty result is replaced with '*', since Elasticsearch expects a non-empty query string.
 */
private interpolateLuceneQuery(queryString: string, scopedVars: ScopedVars) {
  const interpolated = this.templateSrv.replace(queryString, scopedVars, 'lucene');
  return interpolated || '*';
}
2020-01-24 02:50:09 -06:00
/**
 * Returns copies of the given queries with template variables replaced.
 *
 * Lucene query strings (the main query and the queries of `filters` bucket aggregations) need
 * the 'lucene' interpolation format, so they are handled first. Everything else is then
 * interpolated in one pass over the JSON-serialised queries.
 */
interpolateVariablesInQueries(queries: ElasticsearchQuery[], scopedVars: ScopedVars): ElasticsearchQuery[] {
  const withLuceneFilters = (bucketAgg: BucketAggregation): BucketAggregation => {
    if (bucketAgg.type !== 'filters') {
      return bucketAgg;
    }

    const filters = bucketAgg.settings?.filters?.map((filter) => ({
      ...filter,
      query: this.interpolateLuceneQuery(filter.query || '', scopedVars),
    }));

    return {
      ...bucketAgg,
      settings: {
        ...bucketAgg.settings,
        filters,
      },
    };
  };

  const luceneInterpolated = queries.map(
    (query): ElasticsearchQuery => ({
      ...query,
      datasource: this.name,
      query: this.interpolateLuceneQuery(query.query || '', scopedVars),
      bucketAggs: query.bucketAggs?.map(withLuceneFilters),
    })
  );

  const serialized = JSON.stringify(luceneInterpolated);
  const finalQueries: ElasticsearchQuery[] = JSON.parse(this.templateSrv.replace(serialized, scopedVars));

  return finalQueries;
}
2017-09-28 05:52:39 -05:00
/**
 * Checks that the index exists and that it has a date field named like the configured time field.
 *
 * @returns a promise of `{ status, message }`; failures are reported as `status: 'error'`
 *   rather than rejected.
 */
testDatasource() {
  // validate that the index exists and has a date field
  return lastValueFrom(
    this.getFields(['date']).pipe(
      map((dateFields) => {
        const timeField = find(dateFields, { text: this.timeField });
        if (!timeField) {
          return { status: 'error', message: 'No date field named ' + this.timeField + ' found' };
        }
        return { status: 'success', message: 'Index OK. Time field name OK.' };
      }),
      catchError((err) => {
        console.error(err);
        // Errors can be plain strings (e.g. when no index is available for the time range),
        // objects with a message, or raw responses that only carry a status.
        const message = typeof err === 'string' ? err : err?.message || err?.status || 'Unknown error';
        return of({ status: 'error', message });
      })
    )
  );
}
2020-12-17 05:24:20 -06:00
/**
 * Builds the JSON header line of a multi-search request.
 *
 * @param searchType value for `search_type`.
 * @param timeFrom optional start of the range used to resolve the index list.
 * @param timeTo optional end of the range used to resolve the index list.
 * @returns the serialised header; `max_concurrent_shard_requests` is included only for
 *   versions >=5.6.0 and <7.0.0.
 */
getQueryHeader(searchType: any, timeFrom?: DateTime, timeTo?: DateTime): string {
  const indexList = this.indexPattern.getIndexList(timeFrom, timeTo);
  const queryHeader: any = {
    search_type: searchType,
    ignore_unavailable: true,
    index: indexList,
  };

  const shardLimitGoesInHeader = satisfies(this.esVersion, '>=5.6.0 <7.0.0');
  if (shardLimitGoesInHeader) {
    queryHeader.max_concurrent_shard_requests = this.maxConcurrentShardRequests;
  }

  return JSON.stringify(queryHeader);
}
/**
 * Builds a human-readable one-line summary of a query: the query string, its metric
 * aggregations, its bucket aggregations ("Group by") and its alias.
 *
 * A query without metrics or bucket aggregations contributes nothing for that section.
 */
getQueryDisplayText(query: ElasticsearchQuery) {
  // TODO: This might be refactored a bit.
  // Default to empty lists so a missing section does not append the literal text "undefined".
  const metricAggs = query.metrics ?? [];
  const bucketAggs = query.bucketAggs ?? [];
  let text = '';

  if (query.query) {
    text += 'Query: ' + query.query + ', ';
  }

  text += 'Metrics: ';

  text += metricAggs.reduce((acc, metric) => {
    const metricConfig = metricAggregationConfig[metric.type];

    let text = metricConfig.label + '(';

    if (isMetricAggregationWithField(metric)) {
      text += metric.field;
    }
    if (isPipelineAggregationWithMultipleBucketPaths(metric)) {
      // Strip the literal "params." prefix; the dot is escaped so it cannot match any character.
      text += getScriptValue(metric).replace(/params\./g, '');
    }
    text += '), ';

    return `${acc} ${text}`;
  }, '');

  text += bucketAggs.reduce((acc, bucketAgg, index) => {
    const bucketConfig = bucketAggregationConfig[bucketAgg.type];

    let text = '';
    if (index === 0) {
      text += ' Group by: ';
    }

    text += bucketConfig.label + '(';
    if (isBucketAggregationWithField(bucketAgg)) {
      text += bucketAgg.field;
    }

    return `${acc} ${text}), `;
  }, '');

  if (query.alias) {
    text += 'Alias: ' + query.alias;
  }

  return text;
}
2020-12-10 05:19:14 -06:00
/**
 * Reports whether the configured cluster version is 5.0 or newer. This is necessary
 * because the query used by getLogRowContext relies on the search_after feature.
 */
showContextToggle(): boolean {
  const supportsSearchAfter = gte(this.esVersion, '5.0.0');
  return supportsSearchAfter;
}
/**
 * Fetches the log lines surrounding a given row.
 *
 * With `options.direction === 'FORWARD'` rows at or after the row's timestamp are requested in
 * ascending order; otherwise rows at or before it in descending order. Paging uses
 * `search_after`, seeded from the row's `sort` field value when present, or from its timestamp.
 *
 * @param row the log row to fetch context for.
 * @param options direction and limit (limit defaults to 10).
 */
getLogRowContext = async (row: LogRowModel, options?: RowContextOptions): Promise<{ data: DataFrame[] }> => {
  const isForward = options?.direction === 'FORWARD';
  const sort = isForward ? 'asc' : 'desc';
  const rowTime = row.timeEpochMs;

  const sortField = row.dataFrame.fields.find((f) => f.name === 'sort');
  const searchAfter = sortField?.values.get(row.rowIndex) || [rowTime];

  // The row's timestamp bounds the index list on one side, depending on the direction.
  const header = isForward
    ? this.getQueryHeader('query_then_fetch', dateTime(rowTime))
    : this.getQueryHeader('query_then_fetch', undefined, dateTime(rowTime));

  const esQuery = JSON.stringify({
    size: options?.limit ?? 10,
    query: {
      bool: {
        filter: [
          {
            range: {
              [this.timeField]: {
                [isForward ? 'gte' : 'lte']: rowTime,
                format: 'epoch_millis',
              },
            },
          },
        ],
      },
    },
    sort: [{ [this.timeField]: sort }, { _doc: sort }],
    search_after: searchAfter,
  });

  const payload = header + '\n' + esQuery + '\n';
  const response = await lastValueFrom(this.post(this.getMultiSearchUrl(), payload));

  const targets: ElasticsearchQuery[] = [{ refId: `${row.dataFrame.refId}`, metrics: [{ type: 'logs', id: '1' }] }];
  const elasticResponse = new ElasticResponse(targets, transformHitsBasedOnDirection(response, sort));
  const logResponse = elasticResponse.getLogs(this.logMessageField, this.logLevelField);

  const dataFrame = _first(logResponse.data);
  if (!dataFrame) {
    return { data: [] };
  }

  /**
   * The LogRowContextProvider requires a field named `ts` for the timestamp and a field named
   * `line` for the log line to display. Those names are hardcoded there, so the fields matching
   * this.timeField and this.logMessageField are copied under those names into a new data frame.
   */
  const timestampField = dataFrame.fields.find((f: Field) => f.name === this.timeField);
  const lineField = dataFrame.fields.find((f: Field) => f.name === this.logMessageField);
  if (!timestampField || !lineField) {
    return logResponse;
  }

  const tsCopy = { ...timestampField, name: 'ts' };
  const lineCopy = { ...lineField, name: 'line' };
  return {
    data: [
      {
        ...dataFrame,
        fields: [...dataFrame.fields, tsCopy, lineCopy],
      },
    ],
  };
};
2021-01-10 23:47:51 -06:00
/**
 * Runs all visible targets of a request as one multi-search call.
 *
 * Each non-hidden target contributes a header line and a query line to the payload. When any
 * target is a logs query the whole response is processed as logs; otherwise as time series.
 *
 * @param options the query request (targets, range and scoped variables).
 * @returns the query response; `{ data: [] }` when every target is hidden.
 */
query(options: DataQueryRequest<ElasticsearchQuery>): Observable<DataQueryResponse> {
  let payload = '';
  const targets = this.interpolateVariablesInQueries(cloneDeep(options.targets), options.scopedVars);
  const sentTargets: ElasticsearchQuery[] = [];
  const targetsContainsLogsQuery = targets.some((target) => hasMetricOfType(target, 'logs'));

  // add global adhoc filters to timeFilter
  const adhocFilters = this.templateSrv.getAdhocFilters(this.name);

  // One entry per sent target, in the same order as sentTargets; undefined for non-log targets.
  const logLimits: Array<number | undefined> = [];

  for (const target of targets) {
    if (target.hide) {
      continue;
    }

    let queryObj;
    if (hasMetricOfType(target, 'logs')) {
      // FIXME: All this logic here should be in the query builder.
      // When moving to the BE-only implementation we should remove this and let the BE
      // handle this.
      // TODO: defaultBucketAgg creates a date_histogram aggregation without a field, so it falls back to
      // the configured timeField. We should allow people to use a different time field here.
      target.bucketAggs = [defaultBucketAgg()];

      const log = target.metrics?.find((m) => m.type === 'logs') as Logs;
      const limit = log.settings?.limit ? parseInt(log.settings?.limit, 10) : 500;
      logLimits.push(limit);

      target.metrics = [];
      // Setting this for metrics queries that are typed as logs
      queryObj = this.queryBuilder.getLogsQuery(target, limit, adhocFilters, target.query);
    } else {
      // push() with no argument appends nothing; an explicit placeholder keeps the
      // limits aligned with the sent targets.
      logLimits.push(undefined);
      if (target.alias) {
        target.alias = this.templateSrv.replace(target.alias, options.scopedVars, 'lucene');
      }

      queryObj = this.queryBuilder.build(target, adhocFilters, target.query);
    }

    const esQuery = JSON.stringify(queryObj);

    const searchType = queryObj.size === 0 && lt(this.esVersion, '5.0.0') ? 'count' : 'query_then_fetch';
    const header = this.getQueryHeader(searchType, options.range.from, options.range.to);
    payload += header + '\n';

    payload += esQuery + '\n';

    sentTargets.push(target);
  }

  if (sentTargets.length === 0) {
    return of({ data: [] });
  }

  // We replace the range here for actual values. We need to replace it together with enclosing "" so that we replace
  // it as an integer not as string with digits. This is because elastic will convert the string only if the time
  // field is specified as type date (which probably should) but can also be specified as integer (millisecond epoch)
  // and then sending string will error out.
  payload = payload.replace(/"\$timeFrom"/g, options.range.from.valueOf().toString());
  payload = payload.replace(/"\$timeTo"/g, options.range.to.valueOf().toString());
  payload = this.templateSrv.replace(payload, options.scopedVars);

  const url = this.getMultiSearchUrl();

  return this.post(url, payload).pipe(
    map((res) => {
      const er = new ElasticResponse(sentTargets, res);

      // TODO: This needs to be revisited, it seems wrong to process ALL the sent queries as logs if only one of them was a log query
      if (targetsContainsLogsQuery) {
        const response = er.getLogs(this.logMessageField, this.logLevelField);

        response.data.forEach((dataFrame, index) => {
          enhanceDataFrame(dataFrame, this.dataLinks, logLimits[index]);
        });
        return response;
      }

      return er.getTimeSeries();
    })
  );
}
2017-09-28 05:52:39 -05:00
2020-09-15 03:14:47 -05:00
/** Returns true when `fieldName` is exactly one of the names in ELASTIC_META_FIELDS. */
isMetadataField(fieldName: string) {
  const isReservedName = ELASTIC_META_FIELDS.includes(fieldName);
  return isReservedName;
}
2020-12-04 08:29:40 -06:00
// TODO: instead of being a string, this could be a custom type representing all the elastic types
// FIXME: This doesn't seem to return actual MetricFindValues, we should either change the return type
// or fix the implementation.
/**
 * Reads the index mapping and returns its fields as `{ text, type }` entries, where `text` is the
 * dotted field path and `type` the mapping type.
 *
 * @param type optional type filter; a field is kept when its mapping type, or the category that
 *   type maps to in `typeMap`, is listed. With no filter every non-metadata field is returned.
 * @param range optional time range used to pick the index the mapping is read from.
 */
getFields(type?: string[], range?: TimeRange): Observable<MetricFindValue[]> {
  const typeMap: Record<string, string> = {
    float: 'number',
    double: 'number',
    integer: 'number',
    long: 'number',
    date: 'date',
    date_nanos: 'date',
    string: 'string',
    text: 'string',
    scaled_float: 'number',
    nested: 'nested',
    histogram: 'number',
  };

  return this.get('/_mapping', range).pipe(
    map((result) => {
      // Hide meta-fields and apply the type filter (direct match, or via typeMap translation).
      const isWanted = (mapping: any, key: string): boolean => {
        if (this.isMetadataField(key)) {
          return false;
        }
        if (!type || type.length === 0) {
          return true;
        }
        return type.includes(mapping.type) || type.includes(typeMap[mapping.type]);
      };

      // Keyed by full field name, so a field seen in several indices appears only once.
      const fields: any = {};

      // Walks a `properties`/`fields` object; `path` holds the parent names,
      // e.g. [system, process, cpu] + total -> system.process.cpu.total
      const collect = (entries: any, path: string[]): void => {
        for (const key in entries) {
          const mapping = entries[key];
          const nestedPath = [...path, key];

          // Nested object properties
          if (isObject(mapping.properties)) {
            collect(mapping.properties, nestedPath);
          }

          // Multi-fields
          if (isObject(mapping.fields)) {
            collect(mapping.fields, nestedPath);
          }

          if (isString(mapping.type)) {
            const fieldName = nestedPath.join('.');
            if (isWanted(mapping, key)) {
              fields[fieldName] = {
                text: fieldName,
                type: mapping.type,
              };
            }
          }
        }
      };

      for (const indexName in result) {
        const index = result[indexName];
        if (!index || !index.mappings) {
          continue;
        }

        const mappings = index.mappings;
        if (lt(this.esVersion, '7.0.0')) {
          // Before 7.0 the mappings are keyed by type name.
          for (const typeName in mappings) {
            collect(mappings[typeName].properties, []);
          }
        } else {
          collect(mappings.properties, []);
        }
      }

      // transform to array
      return Object.values(fields);
    })
  );
}
2021-01-10 23:47:51 -06:00
/**
 * Runs a terms query and returns the resulting buckets as `{ text, value }` pairs.
 *
 * @param queryDef terms query definition handed to the query builder.
 * @param range time range substituted for `$timeFrom` / `$timeTo` (defaults to the default range).
 * @returns one entry per bucket of aggregation '1', or an empty list when the response has no
 *   aggregations.
 */
getTerms(queryDef: any, range = getDefaultTimeRange()): Observable<MetricFindValue[]> {
  const searchType = gte(this.esVersion, '5.0.0') ? 'query_then_fetch' : 'count';
  const header = this.getQueryHeader(searchType, range.from, range.to);
  let esQuery = JSON.stringify(this.queryBuilder.getTermsQuery(queryDef));

  const from = range.from.valueOf().toString();
  const to = range.to.valueOf().toString();

  // Same as in query(): a placeholder that is a whole JSON string value is replaced together with
  // its enclosing quotes, so the bound is sent as a number. A string bound fails when the time
  // field is mapped as an integer (millisecond epoch) instead of a date.
  esQuery = esQuery.replace(/"\$timeFrom"/g, from);
  esQuery = esQuery.replace(/"\$timeTo"/g, to);
  // Any placeholder embedded in a larger string is still replaced in place.
  esQuery = esQuery.replace(/\$timeFrom/g, from);
  esQuery = esQuery.replace(/\$timeTo/g, to);
  esQuery = header + '\n' + esQuery + '\n';

  const url = this.getMultiSearchUrl();

  return this.post(url, esQuery).pipe(
    map((res) => {
      if (!res.responses[0].aggregations) {
        return [];
      }

      const buckets = res.responses[0].aggregations['1'].buckets;
      return _map(buckets, (bucket) => {
        return {
          text: bucket.key_as_string || bucket.key,
          value: bucket.key,
        };
      });
    })
  );
}
2019-04-25 02:41:13 -05:00
/**
 * Builds the relative `_msearch` URL, appending version-dependent query parameters.
 *
 * - `max_concurrent_shard_requests` is added when the version is >= 7.0.0 and a
 *   truthy `maxConcurrentShardRequests` is set.
 * - `ignore_throttled=false` is added when the version is >= 6.6.0 and both
 *   `xpack` and `includeFrozen` are truthy.
 *
 * @returns `_msearch` followed by `?<params>` only when at least one parameter was added.
 */
getMultiSearchUrl() {
  const params = new URLSearchParams();

  const sendMaxConcurrentShardRequests = gte(this.esVersion, '7.0.0') && this.maxConcurrentShardRequests;
  if (sendMaxConcurrentShardRequests) {
    params.append('max_concurrent_shard_requests', `${this.maxConcurrentShardRequests}`);
  }

  const sendIgnoreThrottled = gte(this.esVersion, '6.6.0') && this.xpack && this.includeFrozen;
  if (sendIgnoreThrottled) {
    params.append('ignore_throttled', 'false');
  }

  const urlWithQuery = '_msearch?' + params.toString();

  // Drop the dangling '?' left behind when no parameter was appended.
  return urlWithQuery.replace(/\?$/, '');
}
2020-12-17 05:24:20 -06:00
/**
 * Resolves a JSON-encoded lookup query into a list of values.
 *
 * The parsed query's `find` property selects the lookup:
 * - `'fields'`: `type` is passed through `templateSrv.replace` (format `'lucene'`)
 *   and the result of `getFields` is returned.
 * - `'terms'`: `field` and `query` (defaulting to `'*'`) are passed through
 *   `templateSrv.replace` (format `'lucene'`) and the result of `getTerms` is returned.
 *
 * @param query - JSON string describing the lookup. An empty query resolves to `[]`.
 * @param options - Optional settings; only `options.range` is read and forwarded.
 * @returns Promise of the found values, or `[]` when `find` is neither `'fields'` nor `'terms'`.
 * @throws SyntaxError (synchronously) when `query` is non-empty but not valid JSON.
 */
metricFindQuery(query: string, options?: any): Promise<MetricFindValue[]> {
  // Check for an empty query BEFORE parsing: JSON.parse('') throws, which made
  // the emptiness guard that used to follow the parse unreachable for that case.
  if (!query) {
    return Promise.resolve([]);
  }

  const range = options?.range;
  const parsedQuery = JSON.parse(query);

  // `?.` tolerates valid JSON that parses to null (e.g. the literal 'null').
  if (parsedQuery?.find === 'fields') {
    parsedQuery.type = this.templateSrv.replace(parsedQuery.type, {}, 'lucene');
    return lastValueFrom(this.getFields(parsedQuery.type, range));
  }

  if (parsedQuery?.find === 'terms') {
    parsedQuery.field = this.templateSrv.replace(parsedQuery.field, {}, 'lucene');
    parsedQuery.query = this.templateSrv.replace(parsedQuery.query || '*', {}, 'lucene');
    return lastValueFrom(this.getTerms(parsedQuery, range));
  }

  return Promise.resolve([]);
}
/**
 * Returns tag keys as a promise by taking the last value emitted by
 * `getFields()` called without arguments.
 */
getTagKeys() {
  const fields$ = this.getFields();
  return lastValueFrom(fields$);
}
2019-07-11 10:05:45 -05:00
/**
 * Returns the values of a tag key as a promise by running `getTerms` on the
 * field named by `options.key` with the match-all query `'*'`.
 *
 * @param options - Only `options.key` is read; it is used as the field name.
 */
getTagValues(options: any) {
  const termsQuery = { field: options.key, query: '*' };
  return lastValueFrom(this.getTerms(termsQuery));
}
2018-05-28 12:45:18 -05:00
2019-07-11 10:05:45 -05:00
/**
 * Reports whether a query target references a template variable.
 *
 * Checks, in order: the target's `query` and `alias`, every bucket
 * aggregation's `field` and `settings`, and every metric's `field`,
 * `settings` and `meta`.
 *
 * @param target - Query target to inspect. `bucketAggs` and `metrics` may be
 *   absent; a missing list is treated as empty instead of throwing.
 * @returns `true` as soon as any inspected value contains a variable, otherwise `false`.
 */
targetContainsTemplate(target: any) {
  if (this.templateSrv.variableExists(target.query) || this.templateSrv.variableExists(target.alias)) {
    return true;
  }

  // `?? []` guards against targets without bucket aggregations; iterating
  // undefined with for...of would throw a TypeError.
  for (const bucketAgg of target.bucketAggs ?? []) {
    if (this.templateSrv.variableExists(bucketAgg.field) || this.objectContainsTemplate(bucketAgg.settings)) {
      return true;
    }
  }

  // Same guard for targets without metrics.
  for (const metric of target.metrics ?? []) {
    if (
      this.templateSrv.variableExists(metric.field) ||
      this.objectContainsTemplate(metric.settings) ||
      this.objectContainsTemplate(metric.meta)
    ) {
      return true;
    }
  }

  return false;
}
2019-07-11 10:05:45 -05:00
/**
 * Tells whether a value is `null`, `undefined`, a string, a number or a boolean.
 *
 * @param obj - Value to classify.
 * @returns `true` for the primitive kinds listed above, `false` for everything
 *   else (objects, arrays, functions, ...).
 */
private isPrimitive(obj: any) {
  if (obj === null || obj === undefined) {
    return true;
  }

  // Compare against the type of the VALUE. The previous check used
  // `typeof true`, which is always 'boolean', so every input was reported as
  // primitive and nested objects/arrays were never recognised as such.
  const valueType = typeof obj;
  return ['string', 'number', 'boolean'].some((type) => type === valueType);
}
2019-07-11 10:05:45 -05:00
/**
 * Recursively searches the property values of `obj` for a template variable.
 *
 * Values classified as primitive by `isPrimitive` are tested with
 * `templateSrv.variableExists`; array values are searched item by item and any
 * other value is searched as a nested object.
 *
 * @param obj - Object to search; a falsy value yields `false`.
 * @returns `true` when any visited value contains a variable, otherwise `false`.
 */
private objectContainsTemplate(obj: any) {
  if (!obj) {
    return false;
  }

  for (const value of Object.values(obj)) {
    if (this.isPrimitive(value)) {
      if (this.templateSrv.variableExists(value)) {
        return true;
      }
      continue;
    }

    // Arrays are searched per item; a single nested object is searched as-is.
    const candidates = Array.isArray(value) ? value : [value];
    if (candidates.some((candidate) => this.objectContainsTemplate(candidate))) {
      return true;
    }
  }

  return false;
}
2017-09-28 05:52:39 -05:00
}
2020-07-01 02:45:21 -05:00
/**
 * Modifies the data frame in place: records `limit` in its meta (when truthy)
 * and appends a data link to every field matched by a data link config.
 * Exported for tests.
 *
 * For each field, the first config whose `field` pattern matches the field
 * name (via `String.prototype.match`) is used. A config with a `datasourceUid`
 * produces an internal link carrying the target data source's uid and name;
 * otherwise a plain URL link is produced.
 *
 * @param dataFrame - Frame whose `meta` and field configs are mutated.
 * @param dataLinks - Data link configs; when empty no field is touched.
 * @param limit - Optional limit stored on `dataFrame.meta.limit` when truthy.
 */
export function enhanceDataFrame(dataFrame: DataFrame, dataLinks: DataLinkConfig[], limit?: number) {
  const dataSourceSrv = getDataSourceSrv();

  if (limit) {
    dataFrame.meta = { ...dataFrame.meta, limit };
  }

  if (!dataLinks.length) {
    return;
  }

  for (const field of dataFrame.fields) {
    const matchedConfig = dataLinks.find((candidate) => field.name && field.name.match(candidate.field));

    if (matchedConfig) {
      const { datasourceUid, url, urlDisplayLabel } = matchedConfig;
      const title = urlDisplayLabel || '';

      // Internal links point at another data source; external links keep the raw URL.
      const link: DataLink = datasourceUid
        ? {
            title,
            url: '',
            internal: {
              query: { query: url },
              datasourceUid,
              datasourceName: dataSourceSrv.getInstanceSettings(datasourceUid)?.name ?? 'Data source not found',
            },
          }
        : { title, url };

      // Append after any links already configured on the field.
      field.config = field.config || {};
      field.config.links = [...(field.config.links || []), link];
    }
  }
}
2020-12-10 05:19:14 -06:00
/**
 * Returns the response with the hits of its first entry ordered for the
 * requested direction.
 *
 * For `'desc'` the response is returned untouched (same reference). For
 * `'asc'` a new response object is built whose `responses` array holds only
 * the first response, with its `hits.hits` reversed.
 *
 * @param response - Multi-search style response; `responses[0].hits.hits` is read for `'asc'`.
 * @param direction - Requested ordering.
 * @returns The original response for `'desc'`, otherwise a shallow copy with reversed hits.
 */
function transformHitsBasedOnDirection(response: any, direction: 'asc' | 'desc') {
  if (direction === 'desc') {
    return response;
  }

  const actualResponse = response.responses[0];
  return {
    ...response,
    responses: [
      {
        ...actualResponse,
        hits: {
          ...actualResponse.hits,
          // Array.prototype.reverse works in place: reverse a copy so the
          // caller's response object is not mutated behind its back.
          hits: [...actualResponse.hits.hits].reverse(),
        },
      },
    ],
  };
}