Cloudwatch Logs: Ignore non-time grouping fields in expressions and alerts (#67608)

This commit is contained in:
Isabella Siu 2023-05-16 08:58:04 -04:00 committed by GitHub
parent 82114cb316
commit 0612f1f87a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 115 additions and 6 deletions

View File

@ -333,7 +333,7 @@ func groupResponseFrame(frame *data.Frame, statsGroups []string) (data.Frames, e
// Check if we have time field though as it makes sense to split only for time series. // Check if we have time field though as it makes sense to split only for time series.
if hasTimeField(frame) { if hasTimeField(frame) {
if len(statsGroups) > 0 && len(frame.Fields) > 0 { if len(statsGroups) > 0 && len(frame.Fields) > 0 {
groupedFrames, err := groupResults(frame, statsGroups) groupedFrames, err := groupResults(frame, statsGroups, false)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@ -163,8 +163,9 @@ func changeToStringField(lengthOfValues int, rows [][]*cloudwatchlogs.ResultFiel
return fieldValuesAsStrings return fieldValuesAsStrings
} }
func groupResults(results *data.Frame, groupingFieldNames []string) ([]*data.Frame, error) { func groupResults(results *data.Frame, groupingFieldNames []string, removeNonTime bool) ([]*data.Frame, error) {
groupingFields := make([]*data.Field, 0) groupingFields := make([]*data.Field, 0)
removeFieldIndices := make([]int, 0)
for i, field := range results.Fields { for i, field := range results.Fields {
for _, groupingField := range groupingFieldNames { for _, groupingField := range groupingFieldNames {
@ -178,6 +179,10 @@ func groupResults(results *data.Frame, groupingFieldNames []string) ([]*data.Fra
results.Fields[i] = newField results.Fields[i] = newField
field = newField field = newField
} }
// For expressions and alerts to work properly we need to remove non-time grouping fields
if removeNonTime && !field.Type().Time() {
removeFieldIndices = append(removeFieldIndices, i)
}
groupingFields = append(groupingFields, field) groupingFields = append(groupingFields, field)
} }
@ -192,14 +197,19 @@ func groupResults(results *data.Frame, groupingFieldNames []string) ([]*data.Fra
groupedDataFrames := make(map[string]*data.Frame) groupedDataFrames := make(map[string]*data.Frame)
for i := 0; i < rowLength; i++ { for i := 0; i < rowLength; i++ {
groupKey := generateGroupKey(groupingFields, i) groupKey := generateGroupKey(groupingFields, i)
// if group key doesn't exist create it
if _, exists := groupedDataFrames[groupKey]; !exists { if _, exists := groupedDataFrames[groupKey]; !exists {
newFrame := results.EmptyCopy() newFrame := results.EmptyCopy()
newFrame.Name = groupKey newFrame.Name = groupKey
newFrame.Meta = results.Meta newFrame.Meta = results.Meta
// remove grouping indices
newFrame.Fields = removeFieldsByIndex(newFrame.Fields, removeFieldIndices)
groupedDataFrames[groupKey] = newFrame groupedDataFrames[groupKey] = newFrame
} }
groupedDataFrames[groupKey].AppendRow(results.RowCopy(i)...) // add row to frame
row := copyRowWithoutValues(results, i, removeFieldIndices)
groupedDataFrames[groupKey].AppendRow(row...)
} }
newDataFrames := make([]*data.Frame, 0, len(groupedDataFrames)) newDataFrames := make([]*data.Frame, 0, len(groupedDataFrames))
@ -210,6 +220,40 @@ func groupResults(results *data.Frame, groupingFieldNames []string) ([]*data.Fra
return newDataFrames, nil return newDataFrames, nil
} }
// remove fields at the listed indices
func removeFieldsByIndex(fields []*data.Field, removeIndices []int) []*data.Field {
newGroupingFields := make([]*data.Field, 0)
removeIndicesIndex := 0
for i, field := range fields {
if removeIndicesIndex < len(removeIndices) && i == removeIndices[removeIndicesIndex] {
removeIndicesIndex++
if removeIndicesIndex > len(removeIndices) {
newGroupingFields = append(newGroupingFields, fields[i+1:]...)
break
}
continue
}
newGroupingFields = append(newGroupingFields, field)
}
return newGroupingFields
}
// copy a row without the listed values
func copyRowWithoutValues(f *data.Frame, rowIdx int, removeIndices []int) []interface{} {
vals := make([]interface{}, len(f.Fields)-len(removeIndices))
valsIdx := 0
removeIndicesIndex := 0
for i := range f.Fields {
if removeIndicesIndex < len(removeIndices) && i == removeIndices[removeIndicesIndex] {
removeIndicesIndex++
continue
}
vals[valsIdx] = f.CopyAt(i, rowIdx)
valsIdx++
}
return vals
}
func generateGroupKey(fields []*data.Field, row int) string { func generateGroupKey(fields []*data.Field, row int) string {
groupKey := "" groupKey := ""
for _, field := range fields { for _, field := range fields {

View File

@ -417,7 +417,7 @@ func TestGroupingResults(t *testing.T) {
}, },
} }
groupedResults, err := groupResults(fakeDataFrame, []string{"@log"}) groupedResults, err := groupResults(fakeDataFrame, []string{"@log"}, false)
require.NoError(t, err) require.NoError(t, err)
assert.ElementsMatch(t, expectedGroupedFrames, groupedResults) assert.ElementsMatch(t, expectedGroupedFrames, groupedResults)
} }
@ -538,7 +538,72 @@ func TestGroupingResultsWithNumericField(t *testing.T) {
}, },
} }
groupedResults, err := groupResults(fakeDataFrame, []string{"httpresponse"}) groupedResults, err := groupResults(fakeDataFrame, []string{"httpresponse"}, false)
require.NoError(t, err)
assert.ElementsMatch(t, expectedGroupedFrames, groupedResults)
}
func TestGroupingResultsWithRemoveNonTimeTrue(t *testing.T) {
logField := data.NewField("@log", data.Labels{}, []*string{
aws.String("fakelog-a"),
aws.String("fakelog-b"),
aws.String("fakelog-a"),
aws.String("fakelog-b"),
})
streamField := data.NewField("stream", data.Labels{}, []*int32{
aws.Int32(1),
aws.Int32(1),
aws.Int32(1),
aws.Int32(1),
})
countField := data.NewField("count", data.Labels{}, []*string{
aws.String("100"),
aws.String("150"),
aws.String("57"),
aws.String("62"),
})
timeA := time.Time{}
timeB := time.Time{}.Add(1 * time.Minute)
fakeDataFrame := &data.Frame{
Name: "CloudWatchLogsResponse",
Fields: []*data.Field{
data.NewField("@timestamp", data.Labels{}, []*time.Time{&timeA, &timeA, &timeB, &timeB}),
logField,
streamField,
countField,
},
RefID: "",
}
expectedGroupedFrames := []*data.Frame{
{
Name: "fakelog-a1",
Fields: []*data.Field{
data.NewField("@timestamp", data.Labels{}, []*time.Time{&timeA, &timeB}),
data.NewField("count", data.Labels{}, []*string{
aws.String("100"),
aws.String("57"),
}),
},
RefID: "",
},
{
Name: "fakelog-b1",
Fields: []*data.Field{
data.NewField("@timestamp", data.Labels{}, []*time.Time{&timeA, &timeB}),
data.NewField("count", data.Labels{}, []*string{
aws.String("150"),
aws.String("62"),
}),
},
RefID: "",
},
}
groupedResults, err := groupResults(fakeDataFrame, []string{"@log", "stream"}, true)
require.NoError(t, err) require.NoError(t, err)
assert.ElementsMatch(t, expectedGroupedFrames, groupedResults) assert.ElementsMatch(t, expectedGroupedFrames, groupedResults)
} }

View File

@ -60,7 +60,7 @@ var executeSyncLogQuery = func(ctx context.Context, e *cloudWatchExecutor, req *
var frames []*data.Frame var frames []*data.Frame
if len(logsQuery.StatsGroups) > 0 && len(dataframe.Fields) > 0 { if len(logsQuery.StatsGroups) > 0 && len(dataframe.Fields) > 0 {
frames, err = groupResults(dataframe, logsQuery.StatsGroups) frames, err = groupResults(dataframe, logsQuery.StatsGroups, true)
if err != nil { if err != nil {
return nil, err return nil, err
} }