package elasticsearch import ( "context" "encoding/json" "errors" "fmt" "regexp" "sort" "strconv" "strings" "time" "github.com/grafana/grafana-plugin-sdk-go/backend" "github.com/grafana/grafana-plugin-sdk-go/data" "github.com/grafana/grafana-plugin-sdk-go/experimental/errorsource" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/codes" "go.opentelemetry.io/otel/trace" "github.com/grafana/grafana/pkg/components/simplejson" "github.com/grafana/grafana/pkg/infra/log" "github.com/grafana/grafana/pkg/infra/tracing" es "github.com/grafana/grafana/pkg/tsdb/elasticsearch/client" "github.com/grafana/grafana/pkg/tsdb/elasticsearch/instrumentation" ) const ( // Metric types countType = "count" percentilesType = "percentiles" extendedStatsType = "extended_stats" topMetricsType = "top_metrics" // Bucket types dateHistType = "date_histogram" nestedType = "nested" histogramType = "histogram" filtersType = "filters" termsType = "terms" geohashGridType = "geohash_grid" // Document types rawDocumentType = "raw_document" rawDataType = "raw_data" // Logs type logsType = "logs" ) var searchWordsRegex = regexp.MustCompile(regexp.QuoteMeta(es.HighlightPreTagsString) + `(.*?)` + regexp.QuoteMeta(es.HighlightPostTagsString)) func parseResponse(ctx context.Context, responses []*es.SearchResponse, targets []*Query, configuredFields es.ConfiguredFields, keepLabelsInResponse bool, logger log.Logger, tracer tracing.Tracer) (*backend.QueryDataResponse, error) { result := backend.QueryDataResponse{ Responses: backend.Responses{}, } if responses == nil { return &result, nil } ctx, span := tracer.Start(ctx, "datasource.elastic.parseResponse", trace.WithAttributes( attribute.Int("responseLength", len(responses)), )) defer span.End() for i, res := range responses { _, resSpan := tracer.Start(ctx, "datasource.elastic.parseResponse.response", trace.WithAttributes( attribute.String("queryMetricType", targets[i].Metrics[0].Type), )) start := time.Now() target := targets[i] if res.Error != nil { mt, _ := json.Marshal(target) me, _ := json.Marshal(res.Error) resSpan.RecordError(errors.New(string(me))) resSpan.SetStatus(codes.Error, string(me)) resSpan.End() logger.Error("Processing error response from Elasticsearch", "error", string(me), "query", string(mt)) errResult := getErrorFromElasticResponse(res) result.Responses[target.RefID] = errorsource.Response(errorsource.PluginError(errors.New(errResult), false)) continue } queryRes := backend.DataResponse{} if isRawDataQuery(target) { err := processRawDataResponse(res, target, configuredFields, &queryRes, logger) if err != nil { // TODO: This error never happens so we should remove it return &backend.QueryDataResponse{}, err } result.Responses[target.RefID] = queryRes } else if isRawDocumentQuery(target) { err := processRawDocumentResponse(res, target, &queryRes, logger) if err != nil { // TODO: This error never happens so we should remove it return &backend.QueryDataResponse{}, err } result.Responses[target.RefID] = queryRes } else if isLogsQuery(target) { err := processLogsResponse(res, target, configuredFields, &queryRes, logger) if err != nil { // TODO: This error never happens so we should remove it return &backend.QueryDataResponse{}, err } result.Responses[target.RefID] = queryRes } else { // Process as metric query result props := make(map[string]string) err := processBuckets(res.Aggregations, target, &queryRes, props, 0) logger.Debug("Processed metric query response") if err != nil { mt, _ := json.Marshal(target) span.RecordError(err) span.SetStatus(codes.Error, err.Error()) resSpan.RecordError(err) resSpan.SetStatus(codes.Error, err.Error()) logger.Error("Error processing buckets", "error", err, "query", string(mt), "aggregationsLength", len(res.Aggregations), "stage", es.StageParseResponse) instrumentation.UpdatePluginParsingResponseDurationSeconds(ctx, time.Since(start), "error") resSpan.End() return &backend.QueryDataResponse{}, err } nameFields(queryRes, target, keepLabelsInResponse) trimDatapoints(queryRes, target) result.Responses[target.RefID] = queryRes } instrumentation.UpdatePluginParsingResponseDurationSeconds(ctx, time.Since(start), "ok") logger.Info("Finished processing of response", "duration", time.Since(start), "stage", es.StageParseResponse) resSpan.End() } return &result, nil } func processLogsResponse(res *es.SearchResponse, target *Query, configuredFields es.ConfiguredFields, queryRes *backend.DataResponse, logger log.Logger) error { propNames := make(map[string]bool) docs := make([]map[string]interface{}, len(res.Hits.Hits)) searchWords := make(map[string]bool) for hitIdx, hit := range res.Hits.Hits { var flattened map[string]interface{} var sourceString string if hit["_source"] != nil { flattened = flatten(hit["_source"].(map[string]interface{}), 10) sourceMarshalled, err := json.Marshal(flattened) if err != nil { return err } sourceString = string(sourceMarshalled) } doc := map[string]interface{}{ "_id": hit["_id"], "_type": hit["_type"], "_index": hit["_index"], "sort": hit["sort"], "highlight": hit["highlight"], // In case of logs query we want to have the raw source as a string field so it can be visualized in logs panel "_source": sourceString, } for k, v := range flattened { if configuredFields.LogLevelField != "" && k == configuredFields.LogLevelField { doc["level"] = v } else { doc[k] = v } } if hit["fields"] != nil { source, ok := hit["fields"].(map[string]interface{}) if ok { for k, v := range source { doc[k] = v } } } // we are going to add an `id` field with the concatenation of `_id` and `_index` _, ok := doc["id"] if !ok { doc["id"] = fmt.Sprintf("%v#%v", doc["_index"], doc["_id"]) } for key := range doc { propNames[key] = true } // Process highlight to searchWords if highlights, ok := doc["highlight"].(map[string]interface{}); ok { for _, highlight := range highlights { if highlightList, ok := highlight.([]interface{}); ok { for _, highlightValue := range highlightList { str := fmt.Sprintf("%v", highlightValue) matches := searchWordsRegex.FindAllStringSubmatch(str, -1) for _, v := range matches { searchWords[v[1]] = true } } } } } docs[hitIdx] = doc } sortedPropNames := sortPropNames(propNames, configuredFields, true) fields := processDocsToDataFrameFields(docs, sortedPropNames, configuredFields) frames := data.Frames{} frame := data.NewFrame("", fields...) setPreferredVisType(frame, data.VisTypeLogs) setLogsCustomMeta(frame, searchWords, stringToIntWithDefaultValue(target.Metrics[0].Settings.Get("limit").MustString(), defaultSize)) frames = append(frames, frame) queryRes.Frames = frames logger.Debug("Processed log query response", "fieldsLength", len(frame.Fields)) return nil } func processRawDataResponse(res *es.SearchResponse, target *Query, configuredFields es.ConfiguredFields, queryRes *backend.DataResponse, logger log.Logger) error { propNames := make(map[string]bool) docs := make([]map[string]interface{}, len(res.Hits.Hits)) for hitIdx, hit := range res.Hits.Hits { var flattened map[string]interface{} if hit["_source"] != nil { flattened = flatten(hit["_source"].(map[string]interface{}), 10) } doc := map[string]interface{}{ "_id": hit["_id"], "_type": hit["_type"], "_index": hit["_index"], "sort": hit["sort"], "highlight": hit["highlight"], } for k, v := range flattened { doc[k] = v } if hit["fields"] != nil { source, ok := hit["fields"].(map[string]interface{}) if ok { for k, v := range source { doc[k] = v } } } for key := range doc { propNames[key] = true } docs[hitIdx] = doc } sortedPropNames := sortPropNames(propNames, configuredFields, false) fields := processDocsToDataFrameFields(docs, sortedPropNames, configuredFields) frames := data.Frames{} frame := data.NewFrame("", fields...) frames = append(frames, frame) queryRes.Frames = frames logger.Debug("Processed raw data query response", "fieldsLength", len(frame.Fields)) return nil } func processRawDocumentResponse(res *es.SearchResponse, target *Query, queryRes *backend.DataResponse, logger log.Logger) error { docs := make([]map[string]interface{}, len(res.Hits.Hits)) for hitIdx, hit := range res.Hits.Hits { doc := map[string]interface{}{ "_id": hit["_id"], "_type": hit["_type"], "_index": hit["_index"], "sort": hit["sort"], "highlight": hit["highlight"], } if hit["_source"] != nil { source, ok := hit["_source"].(map[string]interface{}) if ok { for k, v := range source { doc[k] = v } } } if hit["fields"] != nil { source, ok := hit["fields"].(map[string]interface{}) if ok { for k, v := range source { doc[k] = v } } } docs[hitIdx] = doc } fieldVector := make([]*json.RawMessage, len(res.Hits.Hits)) for i, doc := range docs { bytes, err := json.Marshal(doc) if err != nil { // We skip docs that can't be marshalled // should not happen continue } value := json.RawMessage(bytes) fieldVector[i] = &value } isFilterable := true field := data.NewField(target.RefID, nil, fieldVector) field.Config = &data.FieldConfig{Filterable: &isFilterable} frames := data.Frames{} frame := data.NewFrame(target.RefID, field) frames = append(frames, frame) queryRes.Frames = frames logger.Debug("Processed raw document query response", "fieldsLength", len(frame.Fields)) return nil } func processDocsToDataFrameFields(docs []map[string]interface{}, propNames []string, configuredFields es.ConfiguredFields) []*data.Field { size := len(docs) isFilterable := true allFields := make([]*data.Field, len(propNames)) timeString := "" timeStringOk := false for propNameIdx, propName := range propNames { // Special handling for time field if propName == configuredFields.TimeField { timeVector := make([]*time.Time, size) for i, doc := range docs { // Check if time field is a string timeString, timeStringOk = doc[configuredFields.TimeField].(string) // If not, it might be an array with one time string if !timeStringOk { timeList, ok := doc[configuredFields.TimeField].([]interface{}) if !ok || len(timeList) != 1 { continue } // Check if the first element is a string timeString, timeStringOk = timeList[0].(string) if !timeStringOk { continue } } timeValue, err := time.Parse(time.RFC3339Nano, timeString) if err != nil { // We skip time values that cannot be parsed continue } else { timeVector[i] = &timeValue } } field := data.NewField(configuredFields.TimeField, nil, timeVector) field.Config = &data.FieldConfig{Filterable: &isFilterable} allFields[propNameIdx] = field continue } propNameValue := findTheFirstNonNilDocValueForPropName(docs, propName) switch propNameValue.(type) { // We are checking for default data types values (float64, int, bool, string) // and default to json.RawMessage if we cannot find any of them case float64: allFields[propNameIdx] = createFieldOfType[float64](docs, propName, size, isFilterable) case int: allFields[propNameIdx] = createFieldOfType[int](docs, propName, size, isFilterable) case string: allFields[propNameIdx] = createFieldOfType[string](docs, propName, size, isFilterable) case bool: allFields[propNameIdx] = createFieldOfType[bool](docs, propName, size, isFilterable) default: fieldVector := make([]*json.RawMessage, size) for i, doc := range docs { bytes, err := json.Marshal(doc[propName]) if err != nil { // We skip values that cannot be marshalled continue } value := json.RawMessage(bytes) fieldVector[i] = &value } field := data.NewField(propName, nil, fieldVector) field.Config = &data.FieldConfig{Filterable: &isFilterable} allFields[propNameIdx] = field } } return allFields } func processBuckets(aggs map[string]interface{}, target *Query, queryResult *backend.DataResponse, props map[string]string, depth int) error { var err error maxDepth := len(target.BucketAggs) - 1 aggIDs := make([]string, 0) for k := range aggs { aggIDs = append(aggIDs, k) } sort.Strings(aggIDs) for _, aggID := range aggIDs { v := aggs[aggID] aggDef, _ := findAgg(target, aggID) esAgg := simplejson.NewFromAny(v) if aggDef == nil { continue } if aggDef.Type == nestedType { err = processBuckets(esAgg.MustMap(), target, queryResult, props, depth+1) if err != nil { return err } continue } if depth == maxDepth { if aggDef.Type == dateHistType { err = processMetrics(esAgg, target, queryResult, props) } else { err = processAggregationDocs(esAgg, aggDef, target, queryResult, props) } if err != nil { return err } } else { for _, b := range esAgg.Get("buckets").MustArray() { bucket := simplejson.NewFromAny(b) newProps := make(map[string]string) for k, v := range props { newProps[k] = v } if key, err := bucket.Get("key").String(); err == nil { newProps[aggDef.Field] = key } else if key, err := bucket.Get("key").Int64(); err == nil { newProps[aggDef.Field] = strconv.FormatInt(key, 10) } if key, err := bucket.Get("key_as_string").String(); err == nil { newProps[aggDef.Field] = key } err = processBuckets(bucket.MustMap(), target, queryResult, newProps, depth+1) if err != nil { return err } } buckets := esAgg.Get("buckets").MustMap() bucketKeys := make([]string, 0) for k := range buckets { bucketKeys = append(bucketKeys, k) } sort.Strings(bucketKeys) for _, bucketKey := range bucketKeys { bucket := simplejson.NewFromAny(buckets[bucketKey]) newProps := make(map[string]string) for k, v := range props { newProps[k] = v } newProps["filter"] = bucketKey err = processBuckets(bucket.MustMap(), target, queryResult, newProps, depth+1) if err != nil { return err } } } } return nil } func newTimeSeriesFrame(timeData []time.Time, tags map[string]string, values []*float64) *data.Frame { frame := data.NewFrame("", data.NewField(data.TimeSeriesTimeFieldName, nil, timeData), data.NewField(data.TimeSeriesValueFieldName, tags, values)) frame.Meta = &data.FrameMeta{ Type: data.FrameTypeTimeSeriesMulti, } return frame } func processCountMetric(buckets []*simplejson.Json, props map[string]string) (data.Frames, error) { tags := make(map[string]string, len(props)) timeVector := make([]time.Time, 0, len(buckets)) values := make([]*float64, 0, len(buckets)) for _, bucket := range buckets { value := castToFloat(bucket.Get("doc_count")) timeValue, err := getAsTime(bucket.Get("key")) if err != nil { return nil, err } timeVector = append(timeVector, timeValue) values = append(values, value) } for k, v := range props { tags[k] = v } tags["metric"] = countType return data.Frames{newTimeSeriesFrame(timeVector, tags, values)}, nil } func processPercentilesMetric(metric *MetricAgg, buckets []*simplejson.Json, props map[string]string) (data.Frames, error) { if len(buckets) == 0 { return data.Frames{}, nil } firstBucket := buckets[0] percentiles := firstBucket.GetPath(metric.ID, "values").MustMap() percentileKeys := make([]string, 0) for k := range percentiles { percentileKeys = append(percentileKeys, k) } sort.Strings(percentileKeys) frames := data.Frames{} for _, percentileName := range percentileKeys { tags := make(map[string]string, len(props)) timeVector := make([]time.Time, 0, len(buckets)) values := make([]*float64, 0, len(buckets)) for k, v := range props { tags[k] = v } tags["metric"] = "p" + percentileName tags["field"] = metric.Field for _, bucket := range buckets { value := castToFloat(bucket.GetPath(metric.ID, "values", percentileName)) key := bucket.Get("key") timeValue, err := getAsTime(key) if err != nil { return nil, err } timeVector = append(timeVector, timeValue) values = append(values, value) } frames = append(frames, newTimeSeriesFrame(timeVector, tags, values)) } return frames, nil } func processTopMetricsMetric(metric *MetricAgg, buckets []*simplejson.Json, props map[string]string) (data.Frames, error) { metrics := metric.Settings.Get("metrics").MustArray() frames := data.Frames{} for _, metricField := range metrics { tags := make(map[string]string, len(props)) timeVector := make([]time.Time, 0, len(buckets)) values := make([]*float64, 0, len(buckets)) for k, v := range props { tags[k] = v } tags["field"] = metricField.(string) tags["metric"] = "top_metrics" for _, bucket := range buckets { stats := bucket.GetPath(metric.ID, "top") timeValue, err := getAsTime(bucket.Get("key")) if err != nil { return nil, err } timeVector = append(timeVector, timeValue) for _, stat := range stats.MustArray() { stat := stat.(map[string]interface{}) metrics, hasMetrics := stat["metrics"] if hasMetrics { metrics := metrics.(map[string]interface{}) metricValue, hasMetricValue := metrics[metricField.(string)] if hasMetricValue && metricValue != nil { v := metricValue.(float64) values = append(values, &v) } } } } frames = append(frames, newTimeSeriesFrame(timeVector, tags, values)) } return frames, nil } func processExtendedStatsMetric(metric *MetricAgg, buckets []*simplejson.Json, props map[string]string) (data.Frames, error) { metaKeys := make([]string, 0) meta := metric.Meta.MustMap() for k := range meta { metaKeys = append(metaKeys, k) } sort.Strings(metaKeys) frames := data.Frames{} for _, statName := range metaKeys { v := meta[statName] if enabled, ok := v.(bool); !ok || !enabled { continue } tags := make(map[string]string, len(props)) timeVector := make([]time.Time, 0, len(buckets)) values := make([]*float64, 0, len(buckets)) for k, v := range props { tags[k] = v } tags["metric"] = statName tags["field"] = metric.Field for _, bucket := range buckets { timeValue, err := getAsTime(bucket.Get("key")) if err != nil { return nil, err } var value *float64 switch statName { case "std_deviation_bounds_upper": value = castToFloat(bucket.GetPath(metric.ID, "std_deviation_bounds", "upper")) case "std_deviation_bounds_lower": value = castToFloat(bucket.GetPath(metric.ID, "std_deviation_bounds", "lower")) default: value = castToFloat(bucket.GetPath(metric.ID, statName)) } timeVector = append(timeVector, timeValue) values = append(values, value) } labels := tags frames = append(frames, newTimeSeriesFrame(timeVector, labels, values)) } return frames, nil } func processDefaultMetric(metric *MetricAgg, buckets []*simplejson.Json, props map[string]string) (data.Frames, error) { tags := make(map[string]string, len(props)) timeVector := make([]time.Time, 0, len(buckets)) values := make([]*float64, 0, len(buckets)) for k, v := range props { tags[k] = v } tags["metric"] = metric.Type tags["field"] = metric.Field tags["metricId"] = metric.ID for _, bucket := range buckets { timeValue, err := getAsTime(bucket.Get("key")) if err != nil { return nil, err } valueObj, err := bucket.Get(metric.ID).Map() if err != nil { continue } var value *float64 if _, ok := valueObj["normalized_value"]; ok { value = castToFloat(bucket.GetPath(metric.ID, "normalized_value")) } else { value = castToFloat(bucket.GetPath(metric.ID, "value")) } timeVector = append(timeVector, timeValue) values = append(values, value) } return data.Frames{newTimeSeriesFrame(timeVector, tags, values)}, nil } // nolint:gocyclo func processMetrics(esAgg *simplejson.Json, target *Query, query *backend.DataResponse, props map[string]string) error { frames := data.Frames{} esAggBuckets := esAgg.Get("buckets").MustArray() jsonBuckets := make([]*simplejson.Json, len(esAggBuckets)) for i, v := range esAggBuckets { jsonBuckets[i] = simplejson.NewFromAny(v) } for _, metric := range target.Metrics { if metric.Hide { continue } switch metric.Type { case countType: countFrames, err := processCountMetric(jsonBuckets, props) if err != nil { return fmt.Errorf("error processing count metric: %w", err) } frames = append(frames, countFrames...) case percentilesType: percentileFrames, err := processPercentilesMetric(metric, jsonBuckets, props) if err != nil { return fmt.Errorf("error processing percentiles metric: %w", err) } frames = append(frames, percentileFrames...) case topMetricsType: topMetricsFrames, err := processTopMetricsMetric(metric, jsonBuckets, props) if err != nil { return fmt.Errorf("error processing top metrics metric: %w", err) } frames = append(frames, topMetricsFrames...) case extendedStatsType: extendedStatsFrames, err := processExtendedStatsMetric(metric, jsonBuckets, props) if err != nil { return fmt.Errorf("error processing extended stats metric: %w", err) } frames = append(frames, extendedStatsFrames...) default: defaultFrames, err := processDefaultMetric(metric, jsonBuckets, props) if err != nil { return fmt.Errorf("error processing default metric: %w", err) } frames = append(frames, defaultFrames...) } } if query.Frames != nil { oldFrames := query.Frames frames = append(oldFrames, frames...) } query.Frames = frames return nil } func processAggregationDocs(esAgg *simplejson.Json, aggDef *BucketAgg, target *Query, queryResult *backend.DataResponse, props map[string]string) error { propKeys := createPropKeys(props) frames := data.Frames{} fields := createFields(queryResult.Frames, propKeys) for _, v := range esAgg.Get("buckets").MustArray() { bucket := simplejson.NewFromAny(v) var values []interface{} found := false for _, field := range fields { for _, propKey := range propKeys { if field.Name == propKey { value := props[propKey] field.Append(&value) } } if field.Name == aggDef.Field { found = true if key, err := bucket.Get("key").String(); err == nil { field.Append(&key) } else { f, err := bucket.Get("key").Float64() if err != nil { return fmt.Errorf("error appending bucket key to existing field with name %s: %w", field.Name, err) } field.Append(&f) } } } if !found { var aggDefField *data.Field if key, err := bucket.Get("key").String(); err == nil { aggDefField = extractDataField(aggDef.Field, &key) aggDefField.Append(&key) } else { f, err := bucket.Get("key").Float64() if err != nil { return fmt.Errorf("error appending bucket key to new field with name %s: %w", aggDef.Field, err) } aggDefField = extractDataField(aggDef.Field, &f) aggDefField.Append(&f) } fields = append(fields, aggDefField) } for _, metric := range target.Metrics { switch metric.Type { case countType: addMetricValueToFields(&fields, values, getMetricName(metric.Type), castToFloat(bucket.Get("doc_count"))) case extendedStatsType: addExtendedStatsToFields(&fields, bucket, metric, values) case percentilesType: addPercentilesToFields(&fields, bucket, metric, values) case topMetricsType: addTopMetricsToFields(&fields, bucket, metric, values) default: addOtherMetricsToFields(&fields, bucket, metric, values, target) } } var dataFields []*data.Field dataFields = append(dataFields, fields...) frames = data.Frames{ &data.Frame{ Fields: dataFields, }} } queryResult.Frames = frames return nil } func extractDataField(name string, v interface{}) *data.Field { var field *data.Field switch v.(type) { case *string: field = data.NewField(name, nil, []*string{}) case *float64: field = data.NewField(name, nil, []*float64{}) default: field = &data.Field{} } isFilterable := true field.Config = &data.FieldConfig{Filterable: &isFilterable} return field } func trimDatapoints(queryResult backend.DataResponse, target *Query) { var histogram *BucketAgg for _, bucketAgg := range target.BucketAggs { if bucketAgg.Type == dateHistType { histogram = bucketAgg break } } if histogram == nil { return } trimEdges, err := castToInt(histogram.Settings.Get("trimEdges")) if err != nil { return } frames := queryResult.Frames for _, frame := range frames { for _, field := range frame.Fields { if field.Len() > trimEdges*2 { // first we delete the first "trim" items for i := 0; i < trimEdges; i++ { field.Delete(0) } // then we delete the last "trim" items for i := 0; i < trimEdges; i++ { field.Delete(field.Len() - 1) } } } } } // we sort the label's pairs by the label-key, // and return the label-values func getSortedLabelValues(labels data.Labels) []string { keys := make([]string, 0, len(labels)) for key := range labels { keys = append(keys, key) } sort.Strings(keys) values := make([]string, len(keys)) for i, key := range keys { values[i] = labels[key] } return values } func nameFields(queryResult backend.DataResponse, target *Query, keepLabelsInResponse bool) { set := make(map[string]struct{}) frames := queryResult.Frames for _, v := range frames { for _, vv := range v.Fields { if metricType, exists := vv.Labels["metric"]; exists { if _, ok := set[metricType]; !ok { set[metricType] = struct{}{} } } } } metricTypeCount := len(set) for _, frame := range frames { if frame.Meta != nil && frame.Meta.Type == data.FrameTypeTimeSeriesMulti { // if it is a time-series-multi, it means it has two columns, one is "time", // another is "number" valueField := frame.Fields[1] fieldName := getFieldName(*valueField, target, metricTypeCount) // If we need to keep the labels in the response, to prevent duplication in names and to keep // backward compatibility with alerting and expressions we use DisplayNameFromDS if keepLabelsInResponse { if valueField.Config == nil { valueField.Config = &data.FieldConfig{} } valueField.Config.DisplayNameFromDS = fieldName // If we don't need to keep labels (how frontend mode worked), we use frame.Name and remove labels } else { valueField.Labels = nil frame.Name = fieldName } } } } var aliasPatternRegex = regexp.MustCompile(`\{\{([\s\S]+?)\}\}`) func getFieldName(dataField data.Field, target *Query, metricTypeCount int) string { metricType := dataField.Labels["metric"] metricName := getMetricName(metricType) delete(dataField.Labels, "metric") field := "" if v, ok := dataField.Labels["field"]; ok { field = v delete(dataField.Labels, "field") } if target.Alias != "" { frameName := target.Alias subMatches := aliasPatternRegex.FindAllStringSubmatch(target.Alias, -1) for _, subMatch := range subMatches { group := subMatch[0] if len(subMatch) > 1 { group = subMatch[1] } if strings.Index(group, "term ") == 0 { frameName = strings.Replace(frameName, subMatch[0], dataField.Labels[group[5:]], 1) } if v, ok := dataField.Labels[group]; ok { frameName = strings.Replace(frameName, subMatch[0], v, 1) } if group == "metric" { frameName = strings.Replace(frameName, subMatch[0], metricName, 1) } if group == "field" { frameName = strings.Replace(frameName, subMatch[0], field, 1) } } return frameName } // todo, if field and pipelineAgg if isPipelineAgg(metricType) { if metricType != "" && isPipelineAggWithMultipleBucketPaths(metricType) { metricID := "" if v, ok := dataField.Labels["metricId"]; ok { metricID = v } for _, metric := range target.Metrics { if metric.ID == metricID { metricName = metric.Settings.Get("script").MustString() for name, pipelineAgg := range metric.PipelineVariables { for _, m := range target.Metrics { if m.ID == pipelineAgg { metricName = strings.ReplaceAll(metricName, "params."+name, describeMetric(m.Type, m.Field)) } } } } } } else { if field != "" { found := false for _, metric := range target.Metrics { if metric.ID == field { metricName += " " + describeMetric(metric.Type, metric.Field) found = true } } if !found { metricName = "Unset" } } } } else if field != "" { metricName += " " + field } delete(dataField.Labels, "metricId") if len(dataField.Labels) == 0 { return metricName } name := "" for _, v := range getSortedLabelValues(dataField.Labels) { name += v + " " } if metricTypeCount == 1 { return strings.TrimSpace(name) } return strings.TrimSpace(name) + " " + metricName } func getMetricName(metric string) string { if text, ok := metricAggType[metric]; ok { return text } if text, ok := extendedStats[metric]; ok { return text } return metric } func castToInt(j *simplejson.Json) (int, error) { i, err := j.Int() if err == nil { return i, nil } s, err := j.String() if err != nil { return 0, err } v, err := strconv.Atoi(s) if err != nil { return 0, err } return v, nil } func castToFloat(j *simplejson.Json) *float64 { f, err := j.Float64() if err == nil { return &f } if s, err := j.String(); err == nil { if strings.ToLower(s) == "nan" { return nil } if v, err := strconv.ParseFloat(s, 64); err == nil { return &v } } return nil } func getAsTime(j *simplejson.Json) (time.Time, error) { // these are stored as numbers number, err := j.Float64() if err != nil { return time.Time{}, err } return time.UnixMilli(int64(number)).UTC(), nil } func findAgg(target *Query, aggID string) (*BucketAgg, error) { for _, v := range target.BucketAggs { if aggID == v.ID { return v, nil } } return nil, errors.New("can't found aggDef, aggID:" + aggID) } func getErrorFromElasticResponse(response *es.SearchResponse) string { var errorString string json := simplejson.NewFromAny(response.Error) reason := json.Get("reason").MustString() rootCauseReason := json.Get("root_cause").GetIndex(0).Get("reason").MustString() causedByReason := json.Get("caused_by").Get("reason").MustString() switch { case rootCauseReason != "": errorString = rootCauseReason case reason != "": errorString = reason case causedByReason != "": errorString = causedByReason default: errorString = "Unknown elasticsearch error response" } return errorString } // flatten flattens multi-level objects to single level objects. It uses dot notation to join keys. func flatten(target map[string]interface{}, maxDepth int) map[string]interface{} { // On frontend maxDepth wasn't used but as we are processing on backend // let's put a limit to avoid infinite loop. 10 was chosen arbitrary. output := make(map[string]interface{}) step(0, maxDepth, target, "", output) return output } func step(currentDepth, maxDepth int, target map[string]interface{}, prev string, output map[string]interface{}) { nextDepth := currentDepth + 1 for key, value := range target { newKey := strings.Trim(prev+"."+key, ".") v, ok := value.(map[string]interface{}) if ok && len(v) > 0 && currentDepth < maxDepth { step(nextDepth, maxDepth, v, newKey, output) } else { output[newKey] = value } } } // sortPropNames orders propNames so that timeField is first (if it exists), log message field is second // if shouldSortLogMessageField is true, and rest of propNames are ordered alphabetically func sortPropNames(propNames map[string]bool, configuredFields es.ConfiguredFields, shouldSortLogMessageField bool) []string { hasTimeField := false hasLogMessageField := false var sortedPropNames []string for k := range propNames { if configuredFields.TimeField != "" && k == configuredFields.TimeField { hasTimeField = true } else if shouldSortLogMessageField && configuredFields.LogMessageField != "" && k == configuredFields.LogMessageField { hasLogMessageField = true } else { sortedPropNames = append(sortedPropNames, k) } } sort.Strings(sortedPropNames) if hasLogMessageField { sortedPropNames = append([]string{configuredFields.LogMessageField}, sortedPropNames...) } if hasTimeField { sortedPropNames = append([]string{configuredFields.TimeField}, sortedPropNames...) } return sortedPropNames } // findTheFirstNonNilDocValueForPropName finds the first non-nil value for propName in docs. If none of the values are non-nil, it returns the value of propName in the first doc. func findTheFirstNonNilDocValueForPropName(docs []map[string]interface{}, propName string) interface{} { for _, doc := range docs { if doc[propName] != nil { return doc[propName] } } return docs[0][propName] } func createFieldOfType[T int | float64 | bool | string](docs []map[string]interface{}, propName string, size int, isFilterable bool) *data.Field { fieldVector := make([]*T, size) for i, doc := range docs { value, ok := doc[propName].(T) if !ok { continue } fieldVector[i] = &value } field := data.NewField(propName, nil, fieldVector) field.Config = &data.FieldConfig{Filterable: &isFilterable} return field } func setPreferredVisType(frame *data.Frame, visType data.VisType) { if frame.Meta == nil { frame.Meta = &data.FrameMeta{} } frame.Meta.PreferredVisualization = visType } func setLogsCustomMeta(frame *data.Frame, searchWords map[string]bool, limit int) { i := 0 searchWordsList := make([]string, len(searchWords)) for searchWord := range searchWords { searchWordsList[i] = searchWord i++ } sort.Strings(searchWordsList) if frame.Meta == nil { frame.Meta = &data.FrameMeta{} } if frame.Meta.Custom == nil { frame.Meta.Custom = map[string]interface{}{} } frame.Meta.Custom = map[string]interface{}{ "searchWords": searchWordsList, "limit": limit, } } func createFields(frames data.Frames, propKeys []string) []*data.Field { var fields []*data.Field // Otherwise use the fields from frames if frames != nil { for _, frame := range frames { fields = append(fields, frame.Fields...) } // If we have no frames, we create fields from propKeys } else { for _, propKey := range propKeys { fields = append(fields, data.NewField(propKey, nil, []*string{})) } } return fields } func getSortedKeys(data map[string]interface{}) []string { keys := make([]string, 0, len(data)) for k := range data { keys = append(keys, k) } sort.Strings(keys) return keys } func createPropKeys(props map[string]string) []string { propKeys := make([]string, 0) for k := range props { propKeys = append(propKeys, k) } sort.Strings(propKeys) return propKeys } func addMetricValueToFields(fields *[]*data.Field, values []interface{}, metricName string, value *float64) { index := -1 for i, f := range *fields { if f.Name == metricName { index = i break } } var field data.Field if index == -1 { field = *data.NewField(metricName, nil, []*float64{}) *fields = append(*fields, &field) } else { field = *(*fields)[index] } field.Append(value) } func addPercentilesToFields(fields *[]*data.Field, bucket *simplejson.Json, metric *MetricAgg, values []interface{}) { percentiles := bucket.GetPath(metric.ID, "values") for _, percentileName := range getSortedKeys(percentiles.MustMap()) { percentileValue := percentiles.Get(percentileName).MustFloat64() addMetricValueToFields(fields, values, fmt.Sprintf("p%v %v", percentileName, metric.Field), &percentileValue) } } func addExtendedStatsToFields(fields *[]*data.Field, bucket *simplejson.Json, metric *MetricAgg, values []interface{}) { metaKeys := make([]string, 0) meta := metric.Meta.MustMap() for k := range meta { metaKeys = append(metaKeys, k) } sort.Strings(metaKeys) for _, statName := range metaKeys { v := meta[statName] if enabled, ok := v.(bool); !ok || !enabled { continue } var value *float64 switch statName { case "std_deviation_bounds_upper": value = castToFloat(bucket.GetPath(metric.ID, "std_deviation_bounds", "upper")) case "std_deviation_bounds_lower": value = castToFloat(bucket.GetPath(metric.ID, "std_deviation_bounds", "lower")) default: value = castToFloat(bucket.GetPath(metric.ID, statName)) } addMetricValueToFields(fields, values, getMetricName(metric.Type), value) break } } func addTopMetricsToFields(fields *[]*data.Field, bucket *simplejson.Json, metric *MetricAgg, values []interface{}) { baseName := getMetricName(metric.Type) metrics := metric.Settings.Get("metrics").MustStringArray() for _, metricField := range metrics { // If we selected more than one metric we also add each metric name metricName := baseName if len(metrics) > 1 { metricName += " " + metricField } top := bucket.GetPath(metric.ID, "top").MustArray() metrics, hasMetrics := top[0].(map[string]interface{})["metrics"] if hasMetrics { metrics := metrics.(map[string]interface{}) metricValue, hasMetricValue := metrics[metricField] if hasMetricValue && metricValue != nil { v := metricValue.(float64) addMetricValueToFields(fields, values, metricName, &v) } } } } func addOtherMetricsToFields(fields *[]*data.Field, bucket *simplejson.Json, metric *MetricAgg, values []interface{}, target *Query) { metricName := getMetricName(metric.Type) otherMetrics := make([]*MetricAgg, 0) for _, m := range target.Metrics { // To other metrics we add metric of the same type that are not the current metric if m.ID != metric.ID && m.Type == metric.Type { otherMetrics = append(otherMetrics, m) } } if len(otherMetrics) > 0 { metricName += " " + metric.Field // We check if we have metric with the same type and same field name // If so, append metric.ID to the metric name for _, m := range otherMetrics { if m.Field == metric.Field { metricName += " " + metric.ID break } } if metric.Type == "bucket_script" { // Use the formula in the column name metricName = metric.Settings.Get("script").MustString("") } } addMetricValueToFields(fields, values, metricName, castToFloat(bucket.GetPath(metric.ID, "value"))) }