mirror of
https://github.com/grafana/grafana.git
synced 2025-02-25 18:55:37 -06:00
SSE: Refactor DSNode and extract function to convert backend response to mathexp.Results (#70098)
This commit is contained in:
parent
21f8dd9599
commit
cc72fe17d5
@ -4,6 +4,7 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/grafana/grafana-plugin-sdk-go/backend"
|
"github.com/grafana/grafana-plugin-sdk-go/backend"
|
||||||
@ -229,14 +230,13 @@ func (dn *DSNode) Execute(ctx context.Context, now time.Time, _ mathexp.Vars, s
|
|||||||
|
|
||||||
responseType := "unknown"
|
responseType := "unknown"
|
||||||
respStatus := "success"
|
respStatus := "success"
|
||||||
var useDataplane bool
|
|
||||||
defer func() {
|
defer func() {
|
||||||
if e != nil {
|
if e != nil {
|
||||||
responseType = "error"
|
responseType = "error"
|
||||||
respStatus = "failure"
|
respStatus = "failure"
|
||||||
}
|
}
|
||||||
logger.Debug("Data source queried", "responseType", responseType)
|
logger.Debug("Data source queried", "responseType", responseType)
|
||||||
|
useDataplane := strings.HasPrefix(responseType, "dataplane-")
|
||||||
s.metrics.dsRequests.WithLabelValues(respStatus, fmt.Sprintf("%t", useDataplane)).Inc()
|
s.metrics.dsRequests.WithLabelValues(respStatus, fmt.Sprintf("%t", useDataplane)).Inc()
|
||||||
}()
|
}()
|
||||||
|
|
||||||
@ -245,8 +245,14 @@ func (dn *DSNode) Execute(ctx context.Context, now time.Time, _ mathexp.Vars, s
|
|||||||
return mathexp.Results{}, err
|
return mathexp.Results{}, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var result mathexp.Results
|
||||||
|
responseType, result, err = queryDataResponseToResults(ctx, resp, dn.refID, dn.datasource.Type, s)
|
||||||
|
return result, err
|
||||||
|
}
|
||||||
|
|
||||||
|
func queryDataResponseToResults(ctx context.Context, resp *backend.QueryDataResponse, refID string, datasourceType string, s *Service) (string, mathexp.Results, error) {
|
||||||
vals := make([]mathexp.Value, 0)
|
vals := make([]mathexp.Value, 0)
|
||||||
response, ok := resp.Responses[dn.refID]
|
response, ok := resp.Responses[refID]
|
||||||
if !ok {
|
if !ok {
|
||||||
if len(resp.Responses) > 0 {
|
if len(resp.Responses) > 0 {
|
||||||
keys := make([]string, 0, len(resp.Responses))
|
keys := make([]string, 0, len(resp.Responses))
|
||||||
@ -255,48 +261,46 @@ func (dn *DSNode) Execute(ctx context.Context, now time.Time, _ mathexp.Vars, s
|
|||||||
}
|
}
|
||||||
logger.Warn("Can't find response by refID. Return nodata", "responseRefIds", keys)
|
logger.Warn("Can't find response by refID. Return nodata", "responseRefIds", keys)
|
||||||
}
|
}
|
||||||
return mathexp.Results{Values: mathexp.Values{mathexp.NoData{}.New()}}, nil
|
return "no-data", mathexp.Results{Values: mathexp.Values{mathexp.NoData{}.New()}}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
if response.Error != nil {
|
if response.Error != nil {
|
||||||
return mathexp.Results{}, QueryError{RefID: dn.refID, Err: response.Error}
|
return "", mathexp.Results{}, QueryError{RefID: refID, Err: response.Error}
|
||||||
}
|
}
|
||||||
|
|
||||||
var dt data.FrameType
|
var dt data.FrameType
|
||||||
dt, useDataplane, _ = shouldUseDataplane(response.Frames, logger, s.features.IsEnabled(featuremgmt.FlagDisableSSEDataplane))
|
dt, useDataplane, _ := shouldUseDataplane(response.Frames, logger, s.features.IsEnabled(featuremgmt.FlagDisableSSEDataplane))
|
||||||
if useDataplane {
|
if useDataplane {
|
||||||
logger.Debug("Handling SSE data source query through dataplane", "datatype", dt)
|
logger.Debug("Handling SSE data source query through dataplane", "datatype", dt)
|
||||||
return handleDataplaneFrames(ctx, s.tracer, dt, response.Frames)
|
result, err := handleDataplaneFrames(ctx, s.tracer, dt, response.Frames)
|
||||||
|
return fmt.Sprintf("dataplane-%s", dt), result, err
|
||||||
}
|
}
|
||||||
|
|
||||||
dataSource := dn.datasource.Type
|
if isAllFrameVectors(datasourceType, response.Frames) { // Prometheus Specific Handling
|
||||||
if isAllFrameVectors(dataSource, response.Frames) { // Prometheus Specific Handling
|
vals, err := framesToNumbers(response.Frames)
|
||||||
vals, err = framesToNumbers(response.Frames)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return mathexp.Results{}, fmt.Errorf("failed to read frames as numbers: %w", err)
|
return "", mathexp.Results{}, fmt.Errorf("failed to read frames as numbers: %w", err)
|
||||||
}
|
}
|
||||||
responseType = "vector"
|
return "vector", mathexp.Results{Values: vals}, nil
|
||||||
return mathexp.Results{Values: vals}, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(response.Frames) == 1 {
|
if len(response.Frames) == 1 {
|
||||||
frame := response.Frames[0]
|
frame := response.Frames[0]
|
||||||
// Handle Untyped NoData
|
// Handle Untyped NoData
|
||||||
if len(frame.Fields) == 0 {
|
if len(frame.Fields) == 0 {
|
||||||
return mathexp.Results{Values: mathexp.Values{mathexp.NoData{Frame: frame}}}, nil
|
return "no-data", mathexp.Results{Values: mathexp.Values{mathexp.NoData{Frame: frame}}}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Handle Numeric Table
|
// Handle Numeric Table
|
||||||
if frame.TimeSeriesSchema().Type == data.TimeSeriesTypeNot && isNumberTable(frame) {
|
if frame.TimeSeriesSchema().Type == data.TimeSeriesTypeNot && isNumberTable(frame) {
|
||||||
numberSet, err := extractNumberSet(frame)
|
numberSet, err := extractNumberSet(frame)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return mathexp.Results{}, err
|
return "", mathexp.Results{}, err
|
||||||
}
|
}
|
||||||
for _, n := range numberSet {
|
for _, n := range numberSet {
|
||||||
vals = append(vals, n)
|
vals = append(vals, n)
|
||||||
}
|
}
|
||||||
responseType = "number set"
|
return "number set", mathexp.Results{
|
||||||
return mathexp.Results{
|
|
||||||
Values: vals,
|
Values: vals,
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
@ -306,21 +310,21 @@ func (dn *DSNode) Execute(ctx context.Context, now time.Time, _ mathexp.Vars, s
|
|||||||
// Check for TimeSeriesTypeNot in InfluxDB queries. A data frame of this type will cause
|
// Check for TimeSeriesTypeNot in InfluxDB queries. A data frame of this type will cause
|
||||||
// the WideToMany() function to error out, which results in unhealthy alerts.
|
// the WideToMany() function to error out, which results in unhealthy alerts.
|
||||||
// This check should be removed once inconsistencies in data source responses are solved.
|
// This check should be removed once inconsistencies in data source responses are solved.
|
||||||
if frame.TimeSeriesSchema().Type == data.TimeSeriesTypeNot && dataSource == datasources.DS_INFLUXDB {
|
if frame.TimeSeriesSchema().Type == data.TimeSeriesTypeNot && datasourceType == datasources.DS_INFLUXDB {
|
||||||
logger.Warn("Ignoring InfluxDB data frame due to missing numeric fields")
|
logger.Warn("Ignoring InfluxDB data frame due to missing numeric fields")
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
var series []mathexp.Series
|
||||||
series, err := WideToMany(frame)
|
series, err := WideToMany(frame)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return mathexp.Results{}, err
|
return "", mathexp.Results{}, err
|
||||||
}
|
}
|
||||||
for _, s := range series {
|
for _, ser := range series {
|
||||||
vals = append(vals, s)
|
vals = append(vals, ser)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
responseType = "series set"
|
return "series set", mathexp.Results{
|
||||||
return mathexp.Results{
|
|
||||||
Values: vals, // TODO vals can be empty. Should we replace with no-data?
|
Values: vals, // TODO vals can be empty. Should we replace with no-data?
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user