SSE: DSNode to parse only one response (#61644)

* do not iterate through responses
* log type of the response in all exit cases
This commit is contained in:
Yuri Tseretyan 2023-01-18 13:06:10 -05:00 committed by GitHub
parent 593e8ae17c
commit 5e8866ed5a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -201,8 +201,8 @@ func (s *Service) buildDSNode(dp *simple.DirectedGraph, rn *rawNode, req *Reques
// Execute runs the node and adds the results to vars. If the node requires
// other nodes they must have already been executed and their results must
// already by in vars.
func (dn *DSNode) Execute(ctx context.Context, now time.Time, _ mathexp.Vars, s *Service) (mathexp.Results, error) {
logger := logger.FromContext(ctx).New("datasourceType", dn.datasource.Type)
func (dn *DSNode) Execute(ctx context.Context, now time.Time, _ mathexp.Vars, s *Service) (r mathexp.Results, e error) {
logger := logger.FromContext(ctx).New("datasourceType", dn.datasource.Type, "queryRefId", dn.refID, "datasourceUid", dn.datasource.Uid, "datasourceVersion", dn.datasource.Version)
dsInstanceSettings, err := adapters.ModelToInstanceSettings(dn.datasource, s.decryptSecureJsonDataFn(ctx))
if err != nil {
return mathexp.Results{}, fmt.Errorf("%v: %w", "failed to convert datasource instance settings", err)
@ -229,28 +229,48 @@ func (dn *DSNode) Execute(ctx context.Context, now time.Time, _ mathexp.Vars, s
Headers: dn.request.Headers,
}
responseType := "unknown"
defer func() {
if e != nil {
responseType = "error"
}
logger.Debug("Data source queried", "responseType", responseType)
}()
resp, err := s.dataService.QueryData(ctx, req)
if err != nil {
return mathexp.Results{}, err
}
vals := make([]mathexp.Value, 0)
for refID, qr := range resp.Responses {
if qr.Error != nil {
return mathexp.Results{}, QueryError{RefID: refID, Err: qr.Error}
response, ok := resp.Responses[dn.refID]
if !ok {
if len(resp.Responses) > 0 {
keys := make([]string, 0, len(resp.Responses))
for refID := range resp.Responses {
keys = append(keys, refID)
}
logger.Warn("Can't find response by refID. Return nodata", "responseRefIds", keys)
}
return mathexp.Results{Values: mathexp.Values{mathexp.NoData{}.New()}}, nil
}
if response.Error != nil {
return mathexp.Results{}, QueryError{RefID: dn.refID, Err: response.Error}
}
dataSource := dn.datasource.Type
if isAllFrameVectors(dataSource, qr.Frames) { // Prometheus Specific Handling
vals, err = framesToNumbers(qr.Frames)
if isAllFrameVectors(dataSource, response.Frames) { // Prometheus Specific Handling
vals, err = framesToNumbers(response.Frames)
if err != nil {
return mathexp.Results{}, fmt.Errorf("failed to read frames as numbers: %w", err)
}
responseType = "vector"
return mathexp.Results{Values: vals}, nil
}
if len(qr.Frames) == 1 {
frame := qr.Frames[0]
if len(response.Frames) == 1 {
frame := response.Frames[0]
// Handle Untyped NoData
if len(frame.Fields) == 0 {
return mathexp.Results{Values: mathexp.Values{mathexp.NoData{Frame: frame}}}, nil
@ -258,7 +278,6 @@ func (dn *DSNode) Execute(ctx context.Context, now time.Time, _ mathexp.Vars, s
// Handle Numeric Table
if frame.TimeSeriesSchema().Type == data.TimeSeriesTypeNot && isNumberTable(frame) {
logger.Debug("expression datasource query (numberSet)", "query", refID)
numberSet, err := extractNumberSet(frame)
if err != nil {
return mathexp.Results{}, err
@ -266,20 +285,19 @@ func (dn *DSNode) Execute(ctx context.Context, now time.Time, _ mathexp.Vars, s
for _, n := range numberSet {
vals = append(vals, n)
}
responseType = "number set"
return mathexp.Results{
Values: vals,
}, nil
}
}
for _, frame := range qr.Frames {
logger.Debug("expression datasource query (seriesSet)", "query", refID)
for _, frame := range response.Frames {
// Check for TimeSeriesTypeNot in InfluxDB queries. A data frame of this type will cause
// the WideToMany() function to error out, which results in unhealthy alerts.
// This check should be removed once inconsistencies in data source responses are solved.
if frame.TimeSeriesSchema().Type == data.TimeSeriesTypeNot && dataSource == datasources.DS_INFLUXDB {
logger.Warn("ignoring InfluxDB data frame due to missing numeric fields", "frame", frame)
logger.Warn("Ignoring InfluxDB data frame due to missing numeric fields")
continue
}
series, err := WideToMany(frame)
@ -290,9 +308,10 @@ func (dn *DSNode) Execute(ctx context.Context, now time.Time, _ mathexp.Vars, s
vals = append(vals, s)
}
}
}
responseType = "series set"
return mathexp.Results{
Values: vals,
Values: vals, // TODO vals can be empty. Should we replace with no-data?
}, nil
}