GoogleCloudMonitoring: Refactor metricType input (#59369)

* GoogleCloudMonitoring: Refactor metricType input

* Remove preprocessor in favor of secondary inputs (#59384)
Andres Martinez Gotor 2022-11-29 12:39:45 +01:00 committed by GitHub
parent 5011b259c7
commit 6127409ab3
3 changed files with 138 additions and 58 deletions


@@ -206,6 +206,46 @@ func newInstanceSettings(httpClientProvider httpclient.Provider) datasource.Inst
 	}
 }
 
+func migrateMetricTypeFilter(metricTypeFilter string, prevFilters interface{}) []string {
+	metricTypeFilterArray := []string{"metric.type", "=", metricTypeFilter}
+	if prevFilters != nil {
+		filtersIface := prevFilters.([]interface{})
+		filters := []string{}
+		for _, f := range filtersIface {
+			filters = append(filters, f.(string))
+		}
+		metricTypeFilterArray = append([]string{"AND"}, metricTypeFilterArray...)
+		return append(filters, metricTypeFilterArray...)
+	}
+	return metricTypeFilterArray
+}
+
+func migratePreprocessor(tsl *timeSeriesList, preprocessor string) {
+	// In case a preprocessor is defined, the preprocessor becomes the primary aggregation
+	// and the aggregation that is specified in the UI becomes the secondary aggregation.
+	// The rules are specified in https://github.com/grafana/grafana/issues/30866
+	t := toPreprocessorType(preprocessor)
+	if t != PreprocessorTypeNone {
+		// Move the aggregation to the secondary aggregation
+		tsl.SecondaryAlignmentPeriod = tsl.AlignmentPeriod
+		tsl.SecondaryCrossSeriesReducer = tsl.CrossSeriesReducer
+		tsl.SecondaryPerSeriesAligner = tsl.PerSeriesAligner
+		tsl.SecondaryGroupBys = tsl.GroupBys
+
+		// Set a default cross series reducer if there is no grouping
+		if len(tsl.GroupBys) == 0 {
+			tsl.CrossSeriesReducer = crossSeriesReducerDefault
+		}
+
+		// Set the aligner based on the preprocessor type
+		aligner := "ALIGN_RATE"
+		if t == PreprocessorTypeDelta {
+			aligner = "ALIGN_DELTA"
+		}
+		tsl.PerSeriesAligner = aligner
+	}
+}
+
 func migrateRequest(req *backend.QueryDataRequest) error {
 	for i, q := range req.Queries {
 		var rawQuery map[string]interface{}
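For orientation, a minimal sketch of what migrateMetricTypeFilter returns, with invented inputs; it follows directly from the code above (assuming the cloudmonitoring package context):

    // No pre-existing filters: the metric type becomes the whole filter array.
    migrateMetricTypeFilter("a/metric/type", nil)
    // -> ["metric.type", "=", "a/metric/type"]

    // With pre-existing filters, the metric type clause is appended after an "AND".
    prev := []interface{}{"zone", "=", "us-central1-a"}
    migrateMetricTypeFilter("a/metric/type", prev)
    // -> ["zone", "=", "us-central1-a", "AND", "metric.type", "=", "a/metric/type"]

migratePreprocessor, in turn, shifts the aggregation configured in the UI into the Secondary* fields and installs ALIGN_RATE (or ALIGN_DELTA for the delta preprocessor) as the primary aligner.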
@@ -228,6 +268,13 @@ func migrateRequest(req *backend.QueryDataRequest) error {
 		if rawQuery["aliasBy"] != nil {
 			gq.AliasBy = rawQuery["aliasBy"].(string)
 		}
+		if rawQuery["metricType"] != nil {
+			// metricType should be a filter
+			gq.TimeSeriesList.Filters = migrateMetricTypeFilter(rawQuery["metricType"].(string), rawQuery["filters"])
+		}
+		if rawQuery["preprocessor"] != nil {
+			migratePreprocessor(gq.TimeSeriesList, rawQuery["preprocessor"].(string))
+		}
 
 		b, err := json.Marshal(gq)
 		if err != nil {
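A hypothetical legacy payload that would exercise both new branches (all values made up, and "delta" assumed to be the serialized preprocessor name):

    rawJSON := json.RawMessage(`{
        "metricType": "a/metric/type",
        "filters": ["zone", "=", "us-central1-a"],
        "preprocessor": "delta",
        "aliasBy": "testalias"
    }`)
    // After migration, gq.TimeSeriesList.Filters is
    // ["zone", "=", "us-central1-a", "AND", "metric.type", "=", "a/metric/type"]
    // and the UI aggregation has moved into the secondary* fields,
    // with ALIGN_DELTA as the primary aligner.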
@@ -255,7 +302,23 @@ func migrateRequest(req *backend.QueryDataRequest) error {
 				GraphPeriod: toString(metricQuery["graphPeriod"]),
 			}
 		} else {
-			rawQuery["timeSeriesList"] = metricQuery
+			tslb, err := json.Marshal(metricQuery)
+			if err != nil {
+				return err
+			}
+			tsl := &timeSeriesList{}
+			err = json.Unmarshal(tslb, tsl)
+			if err != nil {
+				return err
+			}
+			if metricQuery["metricType"] != nil {
+				// metricType should be a filter
+				tsl.Filters = migrateMetricTypeFilter(metricQuery["metricType"].(string), metricQuery["filters"])
+			}
+			if rawQuery["preprocessor"] != nil {
+				migratePreprocessor(tsl, rawQuery["preprocessor"].(string))
+			}
+			rawQuery["timeSeriesList"] = tsl
 		}
 		if metricQuery["aliasBy"] != nil {
 			rawQuery["aliasBy"] = metricQuery["aliasBy"]
@ -273,18 +336,18 @@ func migrateRequest(req *backend.QueryDataRequest) error {
// SloQuery was merged into timeSeriesList // SloQuery was merged into timeSeriesList
if rawQuery["sloQuery"] != nil { if rawQuery["sloQuery"] != nil {
if rawQuery["timeSeriesList"] == nil { if rawQuery["timeSeriesList"] == nil {
rawQuery["timeSeriesList"] = map[string]interface{}{} rawQuery["timeSeriesList"] = &timeSeriesList{}
} }
tsl := rawQuery["timeSeriesList"].(map[string]interface{}) tsl := rawQuery["timeSeriesList"].(*timeSeriesList)
sloq := rawQuery["sloQuery"].(map[string]interface{}) sloq := rawQuery["sloQuery"].(map[string]interface{})
if sloq["projectName"] != nil { if sloq["projectName"] != nil {
tsl["projectName"] = sloq["projectName"] tsl.ProjectName = sloq["projectName"].(string)
} }
if sloq["alignmentPeriod"] != nil { if sloq["alignmentPeriod"] != nil {
tsl["alignmentPeriod"] = sloq["alignmentPeriod"] tsl.AlignmentPeriod = sloq["alignmentPeriod"].(string)
} }
if sloq["perSeriesAligner"] != nil { if sloq["perSeriesAligner"] != nil {
tsl["perSeriesAligner"] = sloq["perSeriesAligner"] tsl.PerSeriesAligner = sloq["perSeriesAligner"].(string)
} }
rawQuery["timeSeriesList"] = tsl rawQuery["timeSeriesList"] = tsl
b, err := json.Marshal(rawQuery) b, err := json.Marshal(rawQuery)
@@ -399,7 +462,7 @@ func (s *Service) buildQueryExecutors(logger log.Logger, req *backend.QueryDataR
 			q.TimeSeriesList.View = "FULL"
 		}
 		cmtsf.parameters = q.TimeSeriesList
-		params.Add("filter", buildFilterString(q.TimeSeriesList.MetricType, q.TimeSeriesList.Filters))
+		params.Add("filter", buildFilterString(q.TimeSeriesList.Filters))
 		params.Add("view", q.TimeSeriesList.View)
 		setMetricAggParams(&params, q.TimeSeriesList, durationSeconds, query.Interval.Milliseconds())
 		queryInterface = cmtsf
@@ -452,7 +515,7 @@ func interpolateFilterWildcards(value string) string {
 	return value
 }
 
-func buildFilterString(metricType string, filterParts []string) string {
+func buildFilterString(filterParts []string) string {
 	filterString := ""
 	for i, part := range filterParts {
 		mod := i % 4
@@ -475,7 +538,7 @@ func buildFilterString(metricType string, filterParts []string) string {
 		}
 	}
-	return strings.Trim(fmt.Sprintf(`metric.type="%s" %s`, metricType, filterString), " ")
+	return strings.Trim(filterString, " ")
 }
 
 func buildSLOFilterExpression(projectName string, q *sloQuery) string {
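Since the metric type is no longer prepended separately, the parts array alone describes the whole filter, walked in groups of four (key, operator, value, conjunction) via the i % 4 switch; a sketch with invented values, mirroring the updated tests below:

    parts := []string{"metric.type", "=", "a/metric/type", "AND", "zone", "=", "*-central1*"}
    buildFilterString(parts)
    // -> `metric.type="a/metric/type" zone=has_substring("-central1")`

One visible consequence: for migrated queries the metric.type clause is appended after any existing filters, so it now ends the filter string instead of starting it, which is what the changed expectations in the tests reflect.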
@@ -498,41 +561,30 @@ func setMetricAggParams(params *url.Values, query *timeSeriesList, durationSecon
 	}
 
 	alignmentPeriod := calculateAlignmentPeriod(query.AlignmentPeriod, intervalMs, durationSeconds)
-	params.Add("aggregation.alignmentPeriod", alignmentPeriod)
-
-	// In case a preprocessor is defined, the preprocessor becomes the primary aggregation
-	// and the aggregation that is specified in the UI becomes the secondary aggregation
-	// Rules are specified in this issue: https://github.com/grafana/grafana/issues/30866
-	t := toPreprocessorType(query.Preprocessor)
-	if t != PreprocessorTypeNone {
-		params.Add("secondaryAggregation.alignmentPeriod", alignmentPeriod)
-		params.Add("secondaryAggregation.crossSeriesReducer", query.CrossSeriesReducer)
-		params.Add("secondaryAggregation.perSeriesAligner", query.PerSeriesAligner)
-		primaryCrossSeriesReducer := crossSeriesReducerDefault
-		if len(query.GroupBys) > 0 {
-			primaryCrossSeriesReducer = query.CrossSeriesReducer
-		}
-		params.Add("aggregation.crossSeriesReducer", primaryCrossSeriesReducer)
-		aligner := "ALIGN_RATE"
-		if t == PreprocessorTypeDelta {
-			aligner = "ALIGN_DELTA"
-		}
-		params.Add("aggregation.perSeriesAligner", aligner)
-		for _, groupBy := range query.GroupBys {
-			params.Add("secondaryAggregation.groupByFields", groupBy)
-		}
-	} else {
+	if query.CrossSeriesReducer != "" {
 		params.Add("aggregation.crossSeriesReducer", query.CrossSeriesReducer)
+	}
+	if query.PerSeriesAligner != "" {
 		params.Add("aggregation.perSeriesAligner", query.PerSeriesAligner)
 	}
+	params.Add("aggregation.alignmentPeriod", alignmentPeriod)
 	for _, groupBy := range query.GroupBys {
 		params.Add("aggregation.groupByFields", groupBy)
 	}
+	if query.SecondaryAlignmentPeriod != "" {
+		secondaryAlignmentPeriod := calculateAlignmentPeriod(query.AlignmentPeriod, intervalMs, durationSeconds)
+		params.Add("secondaryAggregation.alignmentPeriod", secondaryAlignmentPeriod)
+	}
+	if query.SecondaryCrossSeriesReducer != "" {
+		params.Add("secondaryAggregation.crossSeriesReducer", query.SecondaryCrossSeriesReducer)
+	}
+	if query.SecondaryPerSeriesAligner != "" {
+		params.Add("secondaryAggregation.perSeriesAligner", query.SecondaryPerSeriesAligner)
+	}
+	for _, groupBy := range query.SecondaryGroupBys {
+		params.Add("secondaryAggregation.groupByFields", groupBy)
+	}
 }
 
 func setSloAggParams(params *url.Values, query *sloQuery, alignmentPeriod string, durationSeconds int, intervalMs int64) {
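A sketch of the parameters the new version emits for a query with both primary and secondary aggregation set (field values invented, net/url assumed imported):

    params := url.Values{}
    q := &timeSeriesList{
        CrossSeriesReducer:          "REDUCE_MEAN",
        PerSeriesAligner:            "ALIGN_MEAN",
        SecondaryCrossSeriesReducer: "REDUCE_NONE",
        SecondaryPerSeriesAligner:   "ALIGN_MEAN",
    }
    setMetricAggParams(&params, q, 3600, 60000)
    // params now holds aggregation.crossSeriesReducer, aggregation.perSeriesAligner,
    // aggregation.alignmentPeriod, secondaryAggregation.crossSeriesReducer and
    // secondaryAggregation.perSeriesAligner; empty fields contribute nothing.

Note that SecondaryAlignmentPeriod only gates whether secondaryAggregation.alignmentPeriod is emitted; the value itself is recomputed from query.AlignmentPeriod via calculateAlignmentPeriod, just like the primary one.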


@@ -80,6 +80,34 @@ func TestCloudMonitoring(t *testing.T) {
 		assert.Equal(t, "testalias", queries[0].aliasBy)
 	})
 
+	t.Run("parses a time series list with secondary inputs", func(t *testing.T) {
+		req := baseTimeSeriesList()
+		req.Queries[0].JSON = json.RawMessage(`{
+			"timeSeriesList": {
+				"filters": ["metric.type=\"a/metric/type\""],
+				"view": "FULL",
+				"secondaryAlignmentPeriod": "60s",
+				"secondaryCrossSeriesReducer": "REDUCE_NONE",
+				"secondaryPerSeriesAligner": "ALIGN_MEAN",
+				"secondaryGroupBys": ["metric.label.group"]
+			},
+			"aliasBy": "testalias"
+		}`)
+		qes, err := service.buildQueryExecutors(slog, req)
+		require.NoError(t, err)
+		queries := getCloudMonitoringListFromInterface(t, qes)
+		require.Len(t, queries, 1)
+		assert.Equal(t, "A", queries[0].refID)
+		assert.Equal(t, "+60s", queries[0].params["secondaryAggregation.alignmentPeriod"][0])
+		assert.Equal(t, "REDUCE_NONE", queries[0].params["secondaryAggregation.crossSeriesReducer"][0])
+		assert.Equal(t, "ALIGN_MEAN", queries[0].params["secondaryAggregation.perSeriesAligner"][0])
+		assert.Equal(t, "metric.label.group", queries[0].params["secondaryAggregation.groupByFields"][0])
+		assert.Equal(t, "FULL", queries[0].params["view"][0])
+		assert.Equal(t, "testalias", queries[0].aliasBy)
+	})
+
 	t.Run("Parse migrated queries from frontend and build Google Cloud Monitoring API queries", func(t *testing.T) {
 		t.Run("and query has no aggregation set", func(t *testing.T) {
 			req := deprecatedReq()
@@ -132,7 +160,7 @@ func TestCloudMonitoring(t *testing.T) {
 			require.NoError(t, err)
 			queries := getCloudMonitoringListFromInterface(t, qes)
 			assert.Equal(t, 1, len(queries))
-			assert.Equal(t, `metric.type="a/metric/type" key="value" key2="value2" resource.type="another/resource/type"`, queries[0].params["filter"][0])
+			assert.Equal(t, `key="value" key2="value2" resource.type="another/resource/type" metric.type="a/metric/type"`, queries[0].params["filter"][0])
 
 			// assign a resource type to query parameters
 			// in the actual workflow this information comes from the response of the Monitoring API
@@ -146,7 +174,7 @@ func TestCloudMonitoring(t *testing.T) {
 				"end": "2018-03-15T13:34:00Z",
 			}
 			expectedTimeSeriesFilter := map[string]interface{}{
-				"filter": `metric.type="a/metric/type" key="value" key2="value2" resource.type="another/resource/type"`,
+				"filter": `key="value" key2="value2" resource.type="another/resource/type" metric.type="a/metric/type"`,
 			}
 			verifyDeepLink(t, dl, expectedTimeSelection, expectedTimeSeriesFilter)
 		})
@@ -773,21 +801,21 @@ func TestCloudMonitoring(t *testing.T) {
 	t.Run("when building filter string", func(t *testing.T) {
 		t.Run("and there's no regex operator", func(t *testing.T) {
 			t.Run("and there are wildcards in a filter value", func(t *testing.T) {
-				filterParts := []string{"zone", "=", "*-central1*"}
-				value := buildFilterString("somemetrictype", filterParts)
+				filterParts := []string{"metric.type", "=", "somemetrictype", "AND", "zone", "=", "*-central1*"}
+				value := buildFilterString(filterParts)
 				assert.Equal(t, `metric.type="somemetrictype" zone=has_substring("-central1")`, value)
 			})
 
 			t.Run("and there are no wildcards in any filter value", func(t *testing.T) {
-				filterParts := []string{"zone", "!=", "us-central1-a"}
-				value := buildFilterString("somemetrictype", filterParts)
+				filterParts := []string{"metric.type", "=", "somemetrictype", "AND", "zone", "!=", "us-central1-a"}
+				value := buildFilterString(filterParts)
 				assert.Equal(t, `metric.type="somemetrictype" zone!="us-central1-a"`, value)
 			})
 		})
 
 		t.Run("and there is a regex operator", func(t *testing.T) {
-			filterParts := []string{"zone", "=~", "us-central1-a~"}
-			value := buildFilterString("somemetrictype", filterParts)
+			filterParts := []string{"metric.type", "=", "somemetrictype", "AND", "zone", "=~", "us-central1-a~"}
+			value := buildFilterString(filterParts)
 			assert.NotContains(t, value, `=~`)
 			assert.Contains(t, value, `zone=`)
@@ -1091,7 +1119,7 @@ func baseTimeSeriesList() *backend.QueryDataRequest {
 				QueryType: "metrics",
 				JSON: json.RawMessage(`{
 					"timeSeriesList": {
-						"metricType": "a/metric/type",
+						"filters": ["metric.type=\"a/metric/type\""],
 						"view": "FULL"
 					},
 					"aliasBy": "testalias"


@@ -33,17 +33,17 @@ type (
 	// These should reflect GCM APIs
 	// timeSeries.list https://cloud.google.com/monitoring/api/ref_v3/rest/v3/projects.timeSeries/list
 	timeSeriesList struct {
 		ProjectName        string   `json:"projectName"`
 		CrossSeriesReducer string   `json:"crossSeriesReducer"`
 		AlignmentPeriod    string   `json:"alignmentPeriod"`
 		PerSeriesAligner   string   `json:"perSeriesAligner"`
 		GroupBys           []string `json:"groupBys"`
 		Filters            []string `json:"filters"`
 		View               string   `json:"view"`
-		// Not part of the GCM API
-		// TODO: Use API fields instead
-		MetricType   string `json:"metricType"`
-		Preprocessor string `json:"preprocessor"`
+		SecondaryAlignmentPeriod    string   `json:"secondaryAlignmentPeriod"`
+		SecondaryCrossSeriesReducer string   `json:"secondaryCrossSeriesReducer"`
+		SecondaryPerSeriesAligner   string   `json:"secondaryPerSeriesAligner"`
+		SecondaryGroupBys           []string `json:"secondaryGroupBys"`
 	}
 
 	// TODO: sloQuery can be specified as timeSeriesList parameters
 	sloQuery struct {
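Put together, a query in the new model looks roughly like this hand-written sketch (values invented, not taken from the commit):

    json.RawMessage(`{
        "timeSeriesList": {
            "projectName": "my-project",
            "filters": ["metric.type", "=", "a/metric/type"],
            "view": "FULL",
            "crossSeriesReducer": "REDUCE_MEAN",
            "perSeriesAligner": "ALIGN_RATE",
            "groupBys": ["resource.label.zone"],
            "secondaryCrossSeriesReducer": "REDUCE_MEAN",
            "secondaryPerSeriesAligner": "ALIGN_MEAN"
        },
        "aliasBy": "testalias"
    }`)

Everything the backend needs is now expressed through fields that mirror the timeSeries.list API, and the legacy metricType and preprocessor inputs survive only as migrations.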