From 2e64ea652e7c23f6cd78e063fde919923ddda01f Mon Sep 17 00:00:00 2001 From: Andres Martinez Gotor Date: Wed, 30 Nov 2022 17:23:05 +0100 Subject: [PATCH] GoogleCloudMonitoring: Refactor SLO queries (#59421) * GoogleCloudMonitoring: Refactor metricType input * Remove preprocessor in favor of secondary inputs * GoogleCloudMonitoring: Refactor SLO queries * GoogleCloudMonitoring: Refactor util functions (#59482) * GoogleCloudMonitoring: Remove unnecessary util functions * GoogleCloudMonitoring: Refactor run function (#59505) --- pkg/tsdb/cloudmonitoring/cloudmonitoring.go | 199 ++-------------- .../cloudmonitoring/cloudmonitoring_test.go | 43 ++-- pkg/tsdb/cloudmonitoring/slo_query.go | 75 ++++++ pkg/tsdb/cloudmonitoring/slo_query_test.go | 75 ++++++ .../cloudmonitoring/time_series_filter.go | 216 +++++++++--------- .../time_series_filter_test.go | 85 ++----- pkg/tsdb/cloudmonitoring/time_series_query.go | 90 ++------ pkg/tsdb/cloudmonitoring/types.go | 30 ++- pkg/tsdb/cloudmonitoring/utils.go | 145 ++++++++++-- 9 files changed, 487 insertions(+), 471 deletions(-) create mode 100644 pkg/tsdb/cloudmonitoring/slo_query.go create mode 100644 pkg/tsdb/cloudmonitoring/slo_query_test.go diff --git a/pkg/tsdb/cloudmonitoring/cloudmonitoring.go b/pkg/tsdb/cloudmonitoring/cloudmonitoring.go index b40524a3636..c48edd5d52e 100644 --- a/pkg/tsdb/cloudmonitoring/cloudmonitoring.go +++ b/pkg/tsdb/cloudmonitoring/cloudmonitoring.go @@ -7,14 +7,13 @@ import ( "io" "math" "net/http" - "net/url" - "path" "regexp" "strconv" "strings" "time" "github.com/grafana/grafana-google-sdk-go/pkg/utils" + "github.com/huandu/xstrings" "github.com/grafana/grafana-plugin-sdk-go/backend" "github.com/grafana/grafana-plugin-sdk-go/backend/datasource" @@ -25,7 +24,6 @@ import ( "github.com/grafana/grafana/pkg/infra/httpclient" "github.com/grafana/grafana/pkg/infra/log" "github.com/grafana/grafana/pkg/infra/tracing" - "github.com/grafana/grafana/pkg/setting" ) var ( @@ -33,11 +31,9 @@ var ( ) var ( - matchAllCap = regexp.MustCompile("(.)([A-Z][a-z]*)") legendKeyFormat = regexp.MustCompile(`\{\{\s*(.+?)\s*\}\}`) metricNameFormat = regexp.MustCompile(`([\w\d_]+)\.(googleapis\.com|io)/(.+)`) wildcardRegexRe = regexp.MustCompile(`[-\/^$+?.()|[\]{}]`) - alignmentPeriodRe = regexp.MustCompile("[0-9]+") cloudMonitoringUnitMappings = map[string]string{ "bit": "bits", "By": "bytes", @@ -333,33 +329,6 @@ func migrateRequest(req *backend.QueryDataRequest) error { q.JSON = b } - // SloQuery was merged into timeSeriesList - if rawQuery["sloQuery"] != nil { - if rawQuery["timeSeriesList"] == nil { - rawQuery["timeSeriesList"] = &timeSeriesList{} - } - tsl := rawQuery["timeSeriesList"].(*timeSeriesList) - sloq := rawQuery["sloQuery"].(map[string]interface{}) - if sloq["projectName"] != nil { - tsl.ProjectName = sloq["projectName"].(string) - } - if sloq["alignmentPeriod"] != nil { - tsl.AlignmentPeriod = sloq["alignmentPeriod"].(string) - } - if sloq["perSeriesAligner"] != nil { - tsl.PerSeriesAligner = sloq["perSeriesAligner"].(string) - } - rawQuery["timeSeriesList"] = tsl - b, err := json.Marshal(rawQuery) - if err != nil { - return err - } - if q.QueryType == "" { - q.QueryType = sloQueryType - } - q.JSON = b - } - req.Queries[i] = q } @@ -437,16 +406,7 @@ func (s *Service) buildQueryExecutors(logger log.Logger, req *backend.QueryDataR return nil, fmt.Errorf("could not unmarshal CloudMonitoringQuery json: %w", err) } - params := url.Values{} - params.Add("interval.startTime", startTime.UTC().Format(time.RFC3339)) - params.Add("interval.endTime", endTime.UTC().Format(time.RFC3339)) - var queryInterface cloudMonitoringQueryExecutor - cmtsf := &cloudMonitoringTimeSeriesList{ - refID: query.RefID, - logger: logger, - aliasBy: q.AliasBy, - } switch query.QueryType { case metricQueryType, annotationQueryType: if q.TimeSeriesQuery != nil { @@ -458,33 +418,33 @@ func (s *Service) buildQueryExecutors(logger log.Logger, req *backend.QueryDataR timeRange: req.Queries[0].TimeRange, } } else if q.TimeSeriesList != nil { + cmtsf := &cloudMonitoringTimeSeriesList{ + refID: query.RefID, + logger: logger, + aliasBy: q.AliasBy, + } if q.TimeSeriesList.View == "" { q.TimeSeriesList.View = "FULL" } cmtsf.parameters = q.TimeSeriesList - params.Add("filter", buildFilterString(q.TimeSeriesList.Filters)) - params.Add("view", q.TimeSeriesList.View) - setMetricAggParams(¶ms, q.TimeSeriesList, durationSeconds, query.Interval.Milliseconds()) + cmtsf.setParams(startTime, endTime, durationSeconds, query.Interval.Milliseconds()) queryInterface = cmtsf } else { return nil, fmt.Errorf("missing query info") } case sloQueryType: - cmtsf.sloQ = q.SloQuery - cmtsf.parameters = q.TimeSeriesList - params.Add("filter", buildSLOFilterExpression(q.TimeSeriesList.ProjectName, q.SloQuery)) - setSloAggParams(¶ms, q.SloQuery, q.TimeSeriesList.AlignmentPeriod, durationSeconds, query.Interval.Milliseconds()) - queryInterface = cmtsf + cmslo := &cloudMonitoringSLO{ + refID: query.RefID, + logger: logger, + aliasBy: q.AliasBy, + parameters: q.SloQuery, + } + cmslo.setParams(startTime, endTime, durationSeconds, query.Interval.Milliseconds()) + queryInterface = cmslo default: return nil, fmt.Errorf("unrecognized query type %q", query.QueryType) } - cmtsf.params = params - - if setting.Env == setting.Dev { - logger.Debug("CloudMonitoring request", "params", params) - } - cloudMonitoringQueryExecutors = append(cloudMonitoringQueryExecutors, queryInterface) } @@ -501,7 +461,7 @@ func interpolateFilterWildcards(value string) string { value = strings.Replace(value, "*", "", 1) value = fmt.Sprintf(`ends_with("%s")`, value) case matches == 1 && strings.HasSuffix(value, "*"): - value = reverse(strings.Replace(reverse(value), "*", "", 1)) + value = xstrings.Reverse(strings.Replace(xstrings.Reverse(value), "*", "", 1)) value = fmt.Sprintf(`starts_with("%s")`, value) case matches != 0: value = string(wildcardRegexRe.ReplaceAllFunc([]byte(value), func(in []byte) []byte { @@ -515,87 +475,6 @@ func interpolateFilterWildcards(value string) string { return value } -func buildFilterString(filterParts []string) string { - filterString := "" - for i, part := range filterParts { - mod := i % 4 - switch { - case part == "AND": - filterString += " " - case mod == 2: - operator := filterParts[i-1] - switch { - case operator == "=~" || operator == "!=~": - filterString = reverse(strings.Replace(reverse(filterString), "~", "", 1)) - filterString += fmt.Sprintf(`monitoring.regex.full_match("%s")`, part) - case strings.Contains(part, "*"): - filterString += interpolateFilterWildcards(part) - default: - filterString += fmt.Sprintf(`"%s"`, part) - } - default: - filterString += part - } - } - - return strings.Trim(filterString, " ") -} - -func buildSLOFilterExpression(projectName string, q *sloQuery) string { - sloName := fmt.Sprintf("projects/%s/services/%s/serviceLevelObjectives/%s", projectName, q.ServiceId, q.SloId) - - if q.SelectorName == "select_slo_burn_rate" { - return fmt.Sprintf(`%s("%s", "%s")`, q.SelectorName, sloName, q.LookbackPeriod) - } else { - return fmt.Sprintf(`%s("%s")`, q.SelectorName, sloName) - } -} - -func setMetricAggParams(params *url.Values, query *timeSeriesList, durationSeconds int, intervalMs int64) { - if query.CrossSeriesReducer == "" { - query.CrossSeriesReducer = crossSeriesReducerDefault - } - - if query.PerSeriesAligner == "" { - query.PerSeriesAligner = perSeriesAlignerDefault - } - - alignmentPeriod := calculateAlignmentPeriod(query.AlignmentPeriod, intervalMs, durationSeconds) - params.Add("aggregation.alignmentPeriod", alignmentPeriod) - if query.CrossSeriesReducer != "" { - params.Add("aggregation.crossSeriesReducer", query.CrossSeriesReducer) - } - if query.PerSeriesAligner != "" { - params.Add("aggregation.perSeriesAligner", query.PerSeriesAligner) - } - for _, groupBy := range query.GroupBys { - params.Add("aggregation.groupByFields", groupBy) - } - - if query.SecondaryAlignmentPeriod != "" { - secondaryAlignmentPeriod := calculateAlignmentPeriod(query.AlignmentPeriod, intervalMs, durationSeconds) - params.Add("secondaryAggregation.alignmentPeriod", secondaryAlignmentPeriod) - } - if query.SecondaryCrossSeriesReducer != "" { - params.Add("secondaryAggregation.crossSeriesReducer", query.SecondaryCrossSeriesReducer) - } - if query.SecondaryPerSeriesAligner != "" { - params.Add("secondaryAggregation.perSeriesAligner", query.SecondaryPerSeriesAligner) - } - for _, groupBy := range query.SecondaryGroupBys { - params.Add("secondaryAggregation.groupByFields", groupBy) - } -} - -func setSloAggParams(params *url.Values, query *sloQuery, alignmentPeriod string, durationSeconds int, intervalMs int64) { - params.Add("aggregation.alignmentPeriod", calculateAlignmentPeriod(alignmentPeriod, intervalMs, durationSeconds)) - if query.SelectorName == "select_slo_health" { - params.Add("aggregation.perSeriesAligner", "ALIGN_MEAN") - } else { - params.Add("aggregation.perSeriesAligner", "ALIGN_NEXT_OLDER") - } -} - func calculateAlignmentPeriod(alignmentPeriod string, intervalMs int64, durationSeconds int) string { if alignmentPeriod == "grafana-auto" || alignmentPeriod == "" { alignmentPeriodValue := int(math.Max(float64(intervalMs)/1000, 60.0)) @@ -618,12 +497,12 @@ func calculateAlignmentPeriod(alignmentPeriod string, intervalMs int64, duration } func formatLegendKeys(metricType string, defaultMetricName string, labels map[string]string, - additionalLabels map[string]string, query *cloudMonitoringTimeSeriesList) string { - if query.aliasBy == "" { + additionalLabels map[string]string, query cloudMonitoringQueryExecutor) string { + if query.getAliasBy() == "" { return defaultMetricName } - result := legendKeyFormat.ReplaceAllFunc([]byte(query.aliasBy), func(in []byte) []byte { + result := legendKeyFormat.ReplaceAllFunc([]byte(query.getAliasBy()), func(in []byte) []byte { metaPartName := strings.Replace(string(in), "{{", "", 1) metaPartName = strings.Replace(metaPartName, "}}", "", 1) metaPartName = strings.TrimSpace(metaPartName) @@ -646,20 +525,8 @@ func formatLegendKeys(metricType string, defaultMetricName string, labels map[st return []byte(val) } - if metaPartName == "project" && query.parameters.ProjectName != "" { - return []byte(query.parameters.ProjectName) - } - - if metaPartName == "service" && query.sloQ.ServiceId != "" { - return []byte(query.sloQ.ServiceId) - } - - if metaPartName == "slo" && query.sloQ.SloId != "" { - return []byte(query.sloQ.SloId) - } - - if metaPartName == "selector" && query.sloQ.SelectorName != "" { - return []byte(query.sloQ.SelectorName) + if query.getParameter(metaPartName) != "" { + return []byte(query.getParameter(metaPartName)) } return in @@ -709,27 +576,11 @@ func calcBucketBound(bucketOptions cloudMonitoringBucketOptions, n int) string { return bucketBound } -func (s *Service) createRequest(logger log.Logger, dsInfo *datasourceInfo, proxyPass string, body io.Reader) (*http.Request, error) { - u, err := url.Parse(dsInfo.url) - if err != nil { - return nil, err +func (s *Service) ensureProject(ctx context.Context, dsInfo datasourceInfo, projectName string) (string, error) { + if projectName != "" { + return projectName, nil } - u.Path = path.Join(u.Path, "render") - - method := http.MethodGet - if body != nil { - method = http.MethodPost - } - req, err := http.NewRequest(method, dsInfo.services[cloudMonitor].url, body) - if err != nil { - logger.Error("Failed to create request", "error", err) - return nil, fmt.Errorf("failed to create request: %w", err) - } - - req.Header.Set("Content-Type", "application/json") - req.URL.Path = proxyPass - - return req, nil + return s.getDefaultProject(ctx, dsInfo) } func (s *Service) getDefaultProject(ctx context.Context, dsInfo datasourceInfo) (string, error) { diff --git a/pkg/tsdb/cloudmonitoring/cloudmonitoring_test.go b/pkg/tsdb/cloudmonitoring/cloudmonitoring_test.go index df0f6109438..272a64b007c 100644 --- a/pkg/tsdb/cloudmonitoring/cloudmonitoring_test.go +++ b/pkg/tsdb/cloudmonitoring/cloudmonitoring_test.go @@ -676,7 +676,7 @@ func TestCloudMonitoring(t *testing.T) { qes, err := service.buildQueryExecutors(slog, req) require.NoError(t, err) - queries := getCloudMonitoringListFromInterface(t, qes) + queries := getCloudMonitoringSLOFromInterface(t, qes) assert.Equal(t, 1, len(queries)) assert.Equal(t, "A", queries[0].refID) @@ -706,7 +706,7 @@ func TestCloudMonitoring(t *testing.T) { qes, err = service.buildQueryExecutors(slog, req) require.NoError(t, err) - qqueries := getCloudMonitoringListFromInterface(t, qes) + qqueries := getCloudMonitoringSLOFromInterface(t, qes) assert.Equal(t, "ALIGN_NEXT_OLDER", qqueries[0].params["aggregation.perSeriesAligner"][0]) dl := qqueries[0].buildDeepLink() @@ -731,7 +731,7 @@ func TestCloudMonitoring(t *testing.T) { qes, err = service.buildQueryExecutors(slog, req) require.NoError(t, err) - qqqueries := getCloudMonitoringListFromInterface(t, qes) + qqqueries := getCloudMonitoringSLOFromInterface(t, qes) assert.Equal(t, `aggregation.alignmentPeriod=%2B60s&aggregation.perSeriesAligner=ALIGN_NEXT_OLDER&filter=select_slo_burn_rate%28%22projects%2Ftest-proj%2Fservices%2Ftest-service%2FserviceLevelObjectives%2Ftest-slo%22%2C+%221h%22%29&interval.endTime=2018-03-15T13%3A34%3A00Z&interval.startTime=2018-03-15T13%3A00%3A00Z`, qqqueries[0].params.Encode()) }) }) @@ -798,31 +798,6 @@ func TestCloudMonitoring(t *testing.T) { }) }) - t.Run("when building filter string", func(t *testing.T) { - t.Run("and there's no regex operator", func(t *testing.T) { - t.Run("and there are wildcards in a filter value", func(t *testing.T) { - filterParts := []string{"metric.type", "=", "somemetrictype", "AND", "zone", "=", "*-central1*"} - value := buildFilterString(filterParts) - assert.Equal(t, `metric.type="somemetrictype" zone=has_substring("-central1")`, value) - }) - - t.Run("and there are no wildcards in any filter value", func(t *testing.T) { - filterParts := []string{"metric.type", "=", "somemetrictype", "AND", "zone", "!=", "us-central1-a"} - value := buildFilterString(filterParts) - assert.Equal(t, `metric.type="somemetrictype" zone!="us-central1-a"`, value) - }) - }) - - t.Run("and there is a regex operator", func(t *testing.T) { - filterParts := []string{"metric.type", "=", "somemetrictype", "AND", "zone", "=~", "us-central1-a~"} - value := buildFilterString(filterParts) - assert.NotContains(t, value, `=~`) - assert.Contains(t, value, `zone=`) - - assert.Contains(t, value, `zone=monitoring.regex.full_match("us-central1-a~")`) - }) - }) - t.Run("and query preprocessor is not defined", func(t *testing.T) { req := deprecatedReq() req.Queries[0].JSON = json.RawMessage(`{ @@ -1011,6 +986,18 @@ func getCloudMonitoringListFromInterface(t *testing.T, qes []cloudMonitoringQuer return queries } +func getCloudMonitoringSLOFromInterface(t *testing.T, qes []cloudMonitoringQueryExecutor) []*cloudMonitoringSLO { + t.Helper() + + queries := make([]*cloudMonitoringSLO, 0) + for _, qi := range qes { + q, ok := qi.(*cloudMonitoringSLO) + require.Truef(t, ok, "Received wrong type %T", qi) + queries = append(queries, q) + } + return queries +} + func getCloudMonitoringQueryFromInterface(t *testing.T, qes []cloudMonitoringQueryExecutor) []*cloudMonitoringTimeSeriesQuery { t.Helper() diff --git a/pkg/tsdb/cloudmonitoring/slo_query.go b/pkg/tsdb/cloudmonitoring/slo_query.go new file mode 100644 index 00000000000..a989449e6a1 --- /dev/null +++ b/pkg/tsdb/cloudmonitoring/slo_query.go @@ -0,0 +1,75 @@ +package cloudmonitoring + +import ( + "context" + "fmt" + "net/url" + "time" + + "github.com/grafana/grafana-plugin-sdk-go/backend" + + "github.com/grafana/grafana/pkg/infra/tracing" +) + +func (sloQ *cloudMonitoringSLO) run(ctx context.Context, req *backend.QueryDataRequest, + s *Service, dsInfo datasourceInfo, tracer tracing.Tracer) (*backend.DataResponse, cloudMonitoringResponse, string, error) { + return runTimeSeriesRequest(ctx, sloQ.logger, req, s, dsInfo, tracer, sloQ.parameters.ProjectName, sloQ.params, nil) +} + +func (sloQ *cloudMonitoringSLO) parseResponse(queryRes *backend.DataResponse, + response cloudMonitoringResponse, executedQueryString string) error { + return parseTimeSeriesResponse(queryRes, response, executedQueryString, sloQ, sloQ.params, []string{}) +} + +func (sloQ *cloudMonitoringSLO) buildDeepLink() string { + return "" +} + +func (sloQ *cloudMonitoringSLO) getRefID() string { + return sloQ.refID +} + +func (sloQ *cloudMonitoringSLO) getAliasBy() string { + return sloQ.aliasBy +} + +func (sloQ *cloudMonitoringSLO) getParameter(i string) string { + switch i { + case "project": + return sloQ.parameters.ProjectName + case "service": + return sloQ.parameters.ServiceId + case "slo": + return sloQ.parameters.SloId + case "selector": + return sloQ.parameters.SelectorName + default: + return "" + } +} + +func (sloQ *cloudMonitoringSLO) getFilter() string { + sloName := fmt.Sprintf("projects/%s/services/%s/serviceLevelObjectives/%s", sloQ.parameters.ProjectName, sloQ.parameters.ServiceId, sloQ.parameters.SloId) + + if sloQ.parameters.SelectorName == "select_slo_burn_rate" { + return fmt.Sprintf(`%s("%s", "%s")`, sloQ.parameters.SelectorName, sloName, sloQ.parameters.LookbackPeriod) + } else { + return fmt.Sprintf(`%s("%s")`, sloQ.parameters.SelectorName, sloName) + } +} + +func (sloQ *cloudMonitoringSLO) setParams(startTime time.Time, endTime time.Time, durationSeconds int, intervalMs int64) { + params := url.Values{} + + params.Add("interval.startTime", startTime.UTC().Format(time.RFC3339)) + params.Add("interval.endTime", endTime.UTC().Format(time.RFC3339)) + + params.Add("filter", sloQ.getFilter()) + params.Add("aggregation.alignmentPeriod", calculateAlignmentPeriod(sloQ.parameters.AlignmentPeriod, intervalMs, durationSeconds)) + if sloQ.parameters.SelectorName == "select_slo_health" { + params.Add("aggregation.perSeriesAligner", "ALIGN_MEAN") + } else { + params.Add("aggregation.perSeriesAligner", "ALIGN_NEXT_OLDER") + } + sloQ.params = params +} diff --git a/pkg/tsdb/cloudmonitoring/slo_query_test.go b/pkg/tsdb/cloudmonitoring/slo_query_test.go new file mode 100644 index 00000000000..907b0075b29 --- /dev/null +++ b/pkg/tsdb/cloudmonitoring/slo_query_test.go @@ -0,0 +1,75 @@ +package cloudmonitoring + +import ( + "net/url" + "testing" + + "github.com/grafana/grafana-plugin-sdk-go/backend" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func SLOQuery(t *testing.T) { + t.Run("when data from query returns slo and alias by is defined", func(t *testing.T) { + data, err := loadTestFile("./test-data/6-series-response-slo.json") + require.NoError(t, err) + assert.Equal(t, 1, len(data.TimeSeries)) + + t.Run("and alias by is expanded", func(t *testing.T) { + res := &backend.DataResponse{} + query := &cloudMonitoringSLO{ + params: url.Values{}, + parameters: &sloQuery{ + ProjectName: "test-proj", + SelectorName: "select_slo_compliance", + ServiceId: "test-service", + SloId: "test-slo", + }, + aliasBy: "{{project}} - {{service}} - {{slo}} - {{selector}}", + } + err = query.parseResponse(res, data, "") + require.NoError(t, err) + frames := res.Frames + require.NoError(t, err) + assert.Equal(t, "test-proj - test-service - test-slo - select_slo_compliance", frames[0].Fields[1].Name) + }) + }) + + t.Run("when data from query returns slo and alias by is not defined", func(t *testing.T) { + data, err := loadTestFile("./test-data/6-series-response-slo.json") + require.NoError(t, err) + assert.Equal(t, 1, len(data.TimeSeries)) + + t.Run("and alias by is expanded", func(t *testing.T) { + res := &backend.DataResponse{} + query := &cloudMonitoringSLO{ + params: url.Values{}, + parameters: &sloQuery{ + ProjectName: "test-proj", + SelectorName: "select_slo_compliance", + ServiceId: "test-service", + SloId: "test-slo", + }, + } + err = query.parseResponse(res, data, "") + require.NoError(t, err) + frames := res.Frames + require.NoError(t, err) + assert.Equal(t, "select_slo_compliance(\"projects/test-proj/services/test-service/serviceLevelObjectives/test-slo\")", frames[0].Fields[1].Name) + }) + }) + + t.Run("when data comes from a slo query, it should skip the link", func(t *testing.T) { + data, err := loadTestFile("./test-data/3-series-response-distribution-exponential.json") + require.NoError(t, err) + assert.Equal(t, 1, len(data.TimeSeries)) + + res := &backend.DataResponse{} + query := &cloudMonitoringSLO{params: url.Values{}, parameters: &sloQuery{SloId: "yes"}} + err = query.parseResponse(res, data, "") + require.NoError(t, err) + frames := res.Frames + assert.Equal(t, len(frames[0].Fields[1].Config.Links), 0) + }) +} diff --git a/pkg/tsdb/cloudmonitoring/time_series_filter.go b/pkg/tsdb/cloudmonitoring/time_series_filter.go index b2878e3230e..2e1ac9813e4 100644 --- a/pkg/tsdb/cloudmonitoring/time_series_filter.go +++ b/pkg/tsdb/cloudmonitoring/time_series_filter.go @@ -4,104 +4,25 @@ import ( "context" "encoding/json" "fmt" - "net/http" "net/url" - "path" "strconv" "strings" "time" "github.com/grafana/grafana-plugin-sdk-go/backend" "github.com/grafana/grafana-plugin-sdk-go/data" - "go.opentelemetry.io/otel/attribute" + "github.com/huandu/xstrings" "github.com/grafana/grafana/pkg/infra/tracing" ) -func (timeSeriesFilter *cloudMonitoringTimeSeriesList) doRequestFilterPage(ctx context.Context, r *http.Request, dsInfo datasourceInfo) (cloudMonitoringResponse, error) { - r.URL.RawQuery = timeSeriesFilter.params.Encode() - r = r.WithContext(ctx) - res, err := dsInfo.services[cloudMonitor].client.Do(r) - if err != nil { - return cloudMonitoringResponse{}, err - } - - dnext, err := unmarshalResponse(timeSeriesFilter.logger, res) - if err != nil { - return cloudMonitoringResponse{}, err - } - - return dnext, nil -} - func (timeSeriesFilter *cloudMonitoringTimeSeriesList) run(ctx context.Context, req *backend.QueryDataRequest, s *Service, dsInfo datasourceInfo, tracer tracing.Tracer) (*backend.DataResponse, cloudMonitoringResponse, string, error) { - dr := &backend.DataResponse{} - projectName := timeSeriesFilter.parameters.ProjectName - if projectName == "" { - var err error - projectName, err = s.getDefaultProject(ctx, dsInfo) - if err != nil { - dr.Error = err - return dr, cloudMonitoringResponse{}, "", nil - } - timeSeriesFilter.logger.Info("No project name set on query, using project name from datasource", "projectName", projectName) - } - r, err := s.createRequest(timeSeriesFilter.logger, &dsInfo, path.Join("/v3/projects", projectName, "timeSeries"), nil) - if err != nil { - dr.Error = err - return dr, cloudMonitoringResponse{}, "", nil - } - alignmentPeriod, ok := r.URL.Query()["aggregation.alignmentPeriod"] - if ok { - seconds, err := strconv.ParseInt(alignmentPeriodRe.FindString(alignmentPeriod[0]), 10, 64) - if err == nil { - if len(dr.Frames) == 0 { - dr.Frames = append(dr.Frames, data.NewFrame("")) - } - firstFrame := dr.Frames[0] - if firstFrame.Meta == nil { - firstFrame.SetMeta(&data.FrameMeta{ - Custom: map[string]interface{}{ - "alignmentPeriod": seconds, - }, - }) - } - } - } - - ctx, span := tracer.Start(ctx, "cloudMonitoring query") - span.SetAttributes("target", timeSeriesFilter.params.Encode(), attribute.Key("target").String(timeSeriesFilter.params.Encode())) - span.SetAttributes("from", req.Queries[0].TimeRange.From, attribute.Key("from").String(req.Queries[0].TimeRange.From.String())) - span.SetAttributes("until", req.Queries[0].TimeRange.To, attribute.Key("until").String(req.Queries[0].TimeRange.To.String())) - span.SetAttributes("datasource_id", dsInfo.id, attribute.Key("datasource_id").Int64(dsInfo.id)) - span.SetAttributes("org_id", req.PluginContext.OrgID, attribute.Key("org_id").Int64(req.PluginContext.OrgID)) - defer span.End() - tracer.Inject(ctx, r.Header, span) - - d, err := timeSeriesFilter.doRequestFilterPage(ctx, r, dsInfo) - if err != nil { - dr.Error = err - return dr, cloudMonitoringResponse{}, "", nil - } - nextPageToken := d.NextPageToken - for nextPageToken != "" { - timeSeriesFilter.params["pageToken"] = []string{d.NextPageToken} - nextPage, err := timeSeriesFilter.doRequestFilterPage(ctx, r, dsInfo) - if err != nil { - dr.Error = err - return dr, cloudMonitoringResponse{}, "", nil - } - d.TimeSeries = append(d.TimeSeries, nextPage.TimeSeries...) - nextPageToken = nextPage.NextPageToken - } - - return dr, d, r.URL.RawQuery, nil + return runTimeSeriesRequest(ctx, timeSeriesFilter.logger, req, s, dsInfo, tracer, timeSeriesFilter.parameters.ProjectName, timeSeriesFilter.params, nil) } -//nolint:gocyclo -func (timeSeriesFilter *cloudMonitoringTimeSeriesList) parseResponse(queryRes *backend.DataResponse, - response cloudMonitoringResponse, executedQueryString string) error { +func parseTimeSeriesResponse(queryRes *backend.DataResponse, + response cloudMonitoringResponse, executedQueryString string, query cloudMonitoringQueryExecutor, params url.Values, groupBys []string) error { frames := data.Frames{} for _, series := range response.TimeSeries { @@ -110,9 +31,13 @@ func (timeSeriesFilter *cloudMonitoringTimeSeriesList) parseResponse(queryRes *b labels := make(map[string]string) labels["resource.type"] = series.Resource.Type seriesLabels["resource.type"] = series.Resource.Type + groupBysMap := make(map[string]bool) + for _, groupBy := range groupBys { + groupBysMap[groupBy] = true + } frame := data.NewFrameOfFieldTypes("", len(series.Points), data.FieldTypeTime, data.FieldTypeFloat64) - frame.RefID = timeSeriesFilter.refID + frame.RefID = query.getRefID() frame.Meta = &data.FrameMeta{ ExecutedQueryString: executedQueryString, } @@ -121,7 +46,7 @@ func (timeSeriesFilter *cloudMonitoringTimeSeriesList) parseResponse(queryRes *b labels["metric.label."+key] = value seriesLabels["metric.label."+key] = value - if len(timeSeriesFilter.parameters.GroupBys) == 0 || containsLabel(timeSeriesFilter.parameters.GroupBys, "metric.label."+key) { + if len(groupBys) == 0 || groupBysMap["metric.label."+key] { defaultMetricName += " " + value } } @@ -130,14 +55,14 @@ func (timeSeriesFilter *cloudMonitoringTimeSeriesList) parseResponse(queryRes *b labels["resource.label."+key] = value seriesLabels["resource.label."+key] = value - if containsLabel(timeSeriesFilter.parameters.GroupBys, "resource.label."+key) { + if groupBysMap["resource.label."+key] { defaultMetricName += " " + value } } for labelType, labelTypeValues := range series.MetaData { for labelKey, labelValue := range labelTypeValues { - key := toSnakeCase(fmt.Sprintf("metadata.%s.%s", labelType, labelKey)) + key := xstrings.ToSnakeCase(fmt.Sprintf("metadata.%s.%s", labelType, labelKey)) switch v := labelValue.(type) { case string: @@ -161,10 +86,10 @@ func (timeSeriesFilter *cloudMonitoringTimeSeriesList) parseResponse(queryRes *b } customFrameMeta := map[string]interface{}{} - customFrameMeta["alignmentPeriod"] = timeSeriesFilter.params.Get("aggregation.alignmentPeriod") - customFrameMeta["perSeriesAligner"] = timeSeriesFilter.params.Get("aggregation.perSeriesAligner") + customFrameMeta["alignmentPeriod"] = params.Get("aggregation.alignmentPeriod") + customFrameMeta["perSeriesAligner"] = params.Get("aggregation.perSeriesAligner") customFrameMeta["labels"] = labels - customFrameMeta["groupBys"] = timeSeriesFilter.parameters.GroupBys + customFrameMeta["groupBys"] = groupBys if frame.Meta != nil { frame.Meta.Custom = customFrameMeta } else { @@ -173,7 +98,7 @@ func (timeSeriesFilter *cloudMonitoringTimeSeriesList) parseResponse(queryRes *b // reverse the order to be ascending if series.ValueType != "DISTRIBUTION" { - timeSeriesFilter.handleNonDistributionSeries(series, defaultMetricName, seriesLabels, frame) + handleNonDistributionSeries(series, defaultMetricName, seriesLabels, frame, query) frames = append(frames, frame) continue } @@ -197,7 +122,7 @@ func (timeSeriesFilter *cloudMonitoringTimeSeriesList) parseResponse(queryRes *b timeField := data.NewField(data.TimeSeriesTimeFieldName, nil, []time.Time{}) valueField := data.NewField(data.TimeSeriesValueFieldName, nil, []float64{}) - frameName := formatLegendKeys(series.Metric.Type, defaultMetricName, nil, additionalLabels, timeSeriesFilter) + frameName := formatLegendKeys(series.Metric.Type, defaultMetricName, nil, additionalLabels, query) valueField.Name = frameName valueField.Labels = seriesLabels setDisplayNameAsFieldName(valueField) @@ -208,7 +133,7 @@ func (timeSeriesFilter *cloudMonitoringTimeSeriesList) parseResponse(queryRes *b timeField, valueField, }, - RefID: timeSeriesFilter.refID, + RefID: query.getRefID(), Meta: &data.FrameMeta{ ExecutedQueryString: executedQueryString, }, @@ -226,8 +151,8 @@ func (timeSeriesFilter *cloudMonitoringTimeSeriesList) parseResponse(queryRes *b } } if len(response.TimeSeries) > 0 { - dl := timeSeriesFilter.buildDeepLink() - frames = addConfigData(frames, dl, response.Unit, timeSeriesFilter.params.Get("aggregation.alignmentPeriod")) + dl := query.buildDeepLink() + frames = addConfigData(frames, dl, response.Unit, params.Get("aggregation.alignmentPeriod")) } queryRes.Frames = frames @@ -235,8 +160,13 @@ func (timeSeriesFilter *cloudMonitoringTimeSeriesList) parseResponse(queryRes *b return nil } -func (timeSeriesFilter *cloudMonitoringTimeSeriesList) handleNonDistributionSeries(series timeSeries, - defaultMetricName string, seriesLabels map[string]string, frame *data.Frame) { +func (timeSeriesFilter *cloudMonitoringTimeSeriesList) parseResponse(queryRes *backend.DataResponse, + response cloudMonitoringResponse, executedQueryString string) error { + return parseTimeSeriesResponse(queryRes, response, executedQueryString, timeSeriesFilter, timeSeriesFilter.params, timeSeriesFilter.parameters.GroupBys) +} + +func handleNonDistributionSeries(series timeSeries, + defaultMetricName string, seriesLabels map[string]string, frame *data.Frame, query cloudMonitoringQueryExecutor) { for i := 0; i < len(series.Points); i++ { point := series.Points[i] value := point.Value.DoubleValue @@ -258,7 +188,7 @@ func (timeSeriesFilter *cloudMonitoringTimeSeriesList) handleNonDistributionSeri frame.SetRow(len(series.Points)-1-i, point.Interval.EndTime, value) } - metricName := formatLegendKeys(series.Metric.Type, defaultMetricName, seriesLabels, nil, timeSeriesFilter) + metricName := formatLegendKeys(series.Metric.Type, defaultMetricName, seriesLabels, nil, query) dataField := frame.Fields[1] dataField.Name = metricName dataField.Labels = seriesLabels @@ -266,10 +196,6 @@ func (timeSeriesFilter *cloudMonitoringTimeSeriesList) handleNonDistributionSeri } func (timeSeriesFilter *cloudMonitoringTimeSeriesList) buildDeepLink() string { - if timeSeriesFilter.sloQ != nil && timeSeriesFilter.sloQ.SloId != "" { - return "" - } - filter := timeSeriesFilter.params.Get("filter") if !strings.Contains(filter, "resource.type=") { resourceType := timeSeriesFilter.params.Get("resourceType") @@ -352,3 +278,89 @@ func setDisplayNameAsFieldName(f *data.Field) { func (timeSeriesFilter *cloudMonitoringTimeSeriesList) getRefID() string { return timeSeriesFilter.refID } + +func (timeSeriesFilter *cloudMonitoringTimeSeriesList) getAliasBy() string { + return timeSeriesFilter.aliasBy +} + +func (timeSeriesFilter *cloudMonitoringTimeSeriesList) getParameter(i string) string { + switch i { + case "project": + return timeSeriesFilter.parameters.ProjectName + default: + return "" + } +} + +func (timeSeriesFilter *cloudMonitoringTimeSeriesList) getFilter() string { + filterString := "" + for i, part := range timeSeriesFilter.parameters.Filters { + mod := i % 4 + switch { + case part == "AND": + filterString += " " + case mod == 2: + operator := timeSeriesFilter.parameters.Filters[i-1] + switch { + case operator == "=~" || operator == "!=~": + filterString = xstrings.Reverse(strings.Replace(xstrings.Reverse(filterString), "~", "", 1)) + filterString += fmt.Sprintf(`monitoring.regex.full_match("%s")`, part) + case strings.Contains(part, "*"): + filterString += interpolateFilterWildcards(part) + default: + filterString += fmt.Sprintf(`"%s"`, part) + } + default: + filterString += part + } + } + + return strings.Trim(filterString, " ") +} + +func (timeSeriesFilter *cloudMonitoringTimeSeriesList) setParams(startTime time.Time, endTime time.Time, durationSeconds int, intervalMs int64) { + params := url.Values{} + query := timeSeriesFilter.parameters + + params.Add("interval.startTime", startTime.UTC().Format(time.RFC3339)) + params.Add("interval.endTime", endTime.UTC().Format(time.RFC3339)) + + params.Add("filter", timeSeriesFilter.getFilter()) + params.Add("view", query.View) + + if query.CrossSeriesReducer == "" { + query.CrossSeriesReducer = crossSeriesReducerDefault + } + + if query.PerSeriesAligner == "" { + query.PerSeriesAligner = perSeriesAlignerDefault + } + + alignmentPeriod := calculateAlignmentPeriod(query.AlignmentPeriod, intervalMs, durationSeconds) + params.Add("aggregation.alignmentPeriod", alignmentPeriod) + if query.CrossSeriesReducer != "" { + params.Add("aggregation.crossSeriesReducer", query.CrossSeriesReducer) + } + if query.PerSeriesAligner != "" { + params.Add("aggregation.perSeriesAligner", query.PerSeriesAligner) + } + for _, groupBy := range query.GroupBys { + params.Add("aggregation.groupByFields", groupBy) + } + + if query.SecondaryAlignmentPeriod != "" { + secondaryAlignmentPeriod := calculateAlignmentPeriod(query.AlignmentPeriod, intervalMs, durationSeconds) + params.Add("secondaryAggregation.alignmentPeriod", secondaryAlignmentPeriod) + } + if query.SecondaryCrossSeriesReducer != "" { + params.Add("secondaryAggregation.crossSeriesReducer", query.SecondaryCrossSeriesReducer) + } + if query.SecondaryPerSeriesAligner != "" { + params.Add("secondaryAggregation.perSeriesAligner", query.SecondaryPerSeriesAligner) + } + for _, groupBy := range query.SecondaryGroupBys { + params.Add("secondaryAggregation.groupByFields", groupBy) + } + + timeSeriesFilter.params = params +} diff --git a/pkg/tsdb/cloudmonitoring/time_series_filter_test.go b/pkg/tsdb/cloudmonitoring/time_series_filter_test.go index e73ceae0c70..ef379a7d970 100644 --- a/pkg/tsdb/cloudmonitoring/time_series_filter_test.go +++ b/pkg/tsdb/cloudmonitoring/time_series_filter_test.go @@ -304,59 +304,6 @@ func TestTimeSeriesFilter(t *testing.T) { }) }) - t.Run("when data from query returns slo and alias by is defined", func(t *testing.T) { - data, err := loadTestFile("./test-data/6-series-response-slo.json") - require.NoError(t, err) - assert.Equal(t, 1, len(data.TimeSeries)) - - t.Run("and alias by is expanded", func(t *testing.T) { - res := &backend.DataResponse{} - query := &cloudMonitoringTimeSeriesList{ - params: url.Values{}, - parameters: &timeSeriesList{ - ProjectName: "test-proj", - }, - aliasBy: "{{project}} - {{service}} - {{slo}} - {{selector}}", - sloQ: &sloQuery{ - SelectorName: "select_slo_compliance", - ServiceId: "test-service", - SloId: "test-slo", - }, - } - err = query.parseResponse(res, data, "") - require.NoError(t, err) - frames := res.Frames - require.NoError(t, err) - assert.Equal(t, "test-proj - test-service - test-slo - select_slo_compliance", frames[0].Fields[1].Name) - }) - }) - - t.Run("when data from query returns slo and alias by is not defined", func(t *testing.T) { - data, err := loadTestFile("./test-data/6-series-response-slo.json") - require.NoError(t, err) - assert.Equal(t, 1, len(data.TimeSeries)) - - t.Run("and alias by is expanded", func(t *testing.T) { - res := &backend.DataResponse{} - query := &cloudMonitoringTimeSeriesList{ - params: url.Values{}, - parameters: &timeSeriesList{ - ProjectName: "test-proj", - }, - sloQ: &sloQuery{ - SelectorName: "select_slo_compliance", - ServiceId: "test-service", - SloId: "test-slo", - }, - } - err = query.parseResponse(res, data, "") - require.NoError(t, err) - frames := res.Frames - require.NoError(t, err) - assert.Equal(t, "select_slo_compliance(\"projects/test-proj/services/test-service/serviceLevelObjectives/test-slo\")", frames[0].Fields[1].Name) - }) - }) - t.Run("Parse cloud monitoring unit", func(t *testing.T) { t.Run("when mapping is found a unit should be specified on the field config", func(t *testing.T) { data, err := loadTestFile("./test-data/1-series-response-agg-one-metric.json") @@ -515,17 +462,29 @@ func TestTimeSeriesFilter(t *testing.T) { }) }) - t.Run("when data comes from a slo query, it should skip the link", func(t *testing.T) { - data, err := loadTestFile("./test-data/3-series-response-distribution-exponential.json") - require.NoError(t, err) - assert.Equal(t, 1, len(data.TimeSeries)) + t.Run("when building filter string", func(t *testing.T) { + t.Run("and there's no regex operator", func(t *testing.T) { + t.Run("and there are wildcards in a filter value", func(t *testing.T) { + tsl := &cloudMonitoringTimeSeriesList{parameters: &timeSeriesList{Filters: []string{"metric.type", "=", "somemetrictype", "AND", "zone", "=", "*-central1*"}}} + value := tsl.getFilter() + assert.Equal(t, `metric.type="somemetrictype" zone=has_substring("-central1")`, value) + }) - res := &backend.DataResponse{} - query := &cloudMonitoringTimeSeriesList{params: url.Values{}, sloQ: &sloQuery{SloId: "yes"}, parameters: &timeSeriesList{}} - err = query.parseResponse(res, data, "") - require.NoError(t, err) - frames := res.Frames - assert.Equal(t, len(frames[0].Fields[1].Config.Links), 0) + t.Run("and there are no wildcards in any filter value", func(t *testing.T) { + tsl := &cloudMonitoringTimeSeriesList{parameters: &timeSeriesList{Filters: []string{"metric.type", "=", "somemetrictype", "AND", "zone", "!=", "us-central1-a"}}} + value := tsl.getFilter() + assert.Equal(t, `metric.type="somemetrictype" zone!="us-central1-a"`, value) + }) + }) + + t.Run("and there is a regex operator", func(t *testing.T) { + tsl := &cloudMonitoringTimeSeriesList{parameters: &timeSeriesList{Filters: []string{"metric.type", "=", "somemetrictype", "AND", "zone", "=~", "us-central1-a~"}}} + value := tsl.getFilter() + assert.NotContains(t, value, `=~`) + assert.Contains(t, value, `zone=`) + + assert.Contains(t, value, `zone=monitoring.regex.full_match("us-central1-a~")`) + }) }) } diff --git a/pkg/tsdb/cloudmonitoring/time_series_query.go b/pkg/tsdb/cloudmonitoring/time_series_query.go index c3181d45149..3f9b7b7fb5c 100644 --- a/pkg/tsdb/cloudmonitoring/time_series_query.go +++ b/pkg/tsdb/cloudmonitoring/time_series_query.go @@ -1,23 +1,18 @@ package cloudmonitoring import ( - "bytes" "context" "encoding/json" "fmt" - "io" - "net/http" "net/url" - "path" "strconv" "strings" "time" "github.com/grafana/grafana-plugin-sdk-go/backend" "github.com/grafana/grafana-plugin-sdk-go/data" - "go.opentelemetry.io/otel/attribute" + "github.com/huandu/xstrings" - "github.com/grafana/grafana/pkg/infra/log" "github.com/grafana/grafana/pkg/infra/tracing" "github.com/grafana/grafana/pkg/tsdb/intervalv2" ) @@ -36,83 +31,17 @@ func (timeSeriesQuery *cloudMonitoringTimeSeriesQuery) appendGraphPeriod(req *ba return "" } -func doRequestQueryPage(log log.Logger, requestBody map[string]interface{}, r *http.Request, dsInfo datasourceInfo) (cloudMonitoringResponse, error) { - buf, err := json.Marshal(requestBody) - if err != nil { - return cloudMonitoringResponse{}, err - } - r.Body = io.NopCloser(bytes.NewBuffer(buf)) - res, err := dsInfo.services[cloudMonitor].client.Do(r) - if err != nil { - return cloudMonitoringResponse{}, err - } - - dnext, err := unmarshalResponse(log, res) - if err != nil { - return cloudMonitoringResponse{}, err - } - return dnext, nil -} - func (timeSeriesQuery *cloudMonitoringTimeSeriesQuery) run(ctx context.Context, req *backend.QueryDataRequest, s *Service, dsInfo datasourceInfo, tracer tracing.Tracer) (*backend.DataResponse, cloudMonitoringResponse, string, error) { - dr := &backend.DataResponse{} - projectName := timeSeriesQuery.parameters.ProjectName - - if projectName == "" { - var err error - projectName, err = s.getDefaultProject(ctx, dsInfo) - if err != nil { - dr.Error = err - return dr, cloudMonitoringResponse{}, "", nil - } - timeSeriesQuery.logger.Info("No project name set on query, using project name from datasource", "projectName", projectName) - } - timeSeriesQuery.parameters.Query += timeSeriesQuery.appendGraphPeriod(req) from := req.Queries[0].TimeRange.From to := req.Queries[0].TimeRange.To timeFormat := "2006/01/02-15:04:05" timeSeriesQuery.parameters.Query += fmt.Sprintf(" | within d'%s', d'%s'", from.UTC().Format(timeFormat), to.UTC().Format(timeFormat)) - p := path.Join("/v3/projects", projectName, "timeSeries:query") - - ctx, span := tracer.Start(ctx, "cloudMonitoring MQL query") - span.SetAttributes("query", timeSeriesQuery.parameters.Query, attribute.Key("query").String(timeSeriesQuery.parameters.Query)) - span.SetAttributes("from", req.Queries[0].TimeRange.From, attribute.Key("from").String(req.Queries[0].TimeRange.From.String())) - span.SetAttributes("until", req.Queries[0].TimeRange.To, attribute.Key("until").String(req.Queries[0].TimeRange.To.String())) - defer span.End() - requestBody := map[string]interface{}{ "query": timeSeriesQuery.parameters.Query, } - r, err := s.createRequest(timeSeriesQuery.logger, &dsInfo, p, bytes.NewBuffer([]byte{})) - if err != nil { - dr.Error = err - return dr, cloudMonitoringResponse{}, "", nil - } - tracer.Inject(ctx, r.Header, span) - r = r.WithContext(ctx) - - d, err := doRequestQueryPage(timeSeriesQuery.logger, requestBody, r, dsInfo) - if err != nil { - dr.Error = err - return dr, cloudMonitoringResponse{}, "", nil - } - for d.NextPageToken != "" { - requestBody := map[string]interface{}{ - "query": timeSeriesQuery.parameters.Query, - "pageToken": d.NextPageToken, - } - nextPage, err := doRequestQueryPage(timeSeriesQuery.logger, requestBody, r, dsInfo) - if err != nil { - dr.Error = err - return dr, cloudMonitoringResponse{}, "", nil - } - d.TimeSeriesData = append(d.TimeSeriesData, nextPage.TimeSeriesData...) - d.NextPageToken = nextPage.NextPageToken - } - - return dr, d, timeSeriesQuery.parameters.Query, nil + return runTimeSeriesRequest(ctx, timeSeriesQuery.logger, req, s, dsInfo, tracer, timeSeriesQuery.parameters.ProjectName, nil, requestBody) } func (timeSeriesQuery *cloudMonitoringTimeSeriesQuery) parseResponse(queryRes *backend.DataResponse, @@ -129,7 +58,7 @@ func (timeSeriesQuery *cloudMonitoringTimeSeriesQuery) parseResponse(queryRes *b labels := make(map[string]string) for n, d := range response.TimeSeriesDescriptor.LabelDescriptors { - key := toSnakeCase(d.Key) + key := xstrings.ToSnakeCase(d.Key) key = strings.Replace(key, ".", ".label.", 1) labelValue := series.LabelValues[n] @@ -356,3 +285,16 @@ func (timeSeriesQuery *cloudMonitoringTimeSeriesQuery) buildDeepLink() string { func (timeSeriesQuery *cloudMonitoringTimeSeriesQuery) getRefID() string { return timeSeriesQuery.refID } + +func (timeSeriesQuery *cloudMonitoringTimeSeriesQuery) getAliasBy() string { + return timeSeriesQuery.aliasBy +} + +func (timeSeriesQuery *cloudMonitoringTimeSeriesQuery) getParameter(i string) string { + switch i { + case "project": + return timeSeriesQuery.parameters.ProjectName + default: + return "" + } +} diff --git a/pkg/tsdb/cloudmonitoring/types.go b/pkg/tsdb/cloudmonitoring/types.go index 925e064dba9..cebbc385e39 100644 --- a/pkg/tsdb/cloudmonitoring/types.go +++ b/pkg/tsdb/cloudmonitoring/types.go @@ -18,6 +18,8 @@ type ( parseResponse(dr *backend.DataResponse, data cloudMonitoringResponse, executedQueryString string) error buildDeepLink() string getRefID() string + getAliasBy() string + getParameter(i string) string } // Plugin API query data request used to generate @@ -26,8 +28,7 @@ type ( AliasBy string `json:"aliasBy"` TimeSeriesList *timeSeriesList `json:"timeSeriesList,omitempty"` TimeSeriesQuery *timeSeriesQuery `json:"timeSeriesQuery,omitempty"` - // TODO: Merge SloQuery into TimeSeriesList - SloQuery *sloQuery `json:"sloQuery,omitempty"` + SloQuery *sloQuery `json:"sloQuery,omitempty"` } // These should reflect GCM APIs @@ -45,12 +46,15 @@ type ( SecondaryPerSeriesAligner string `json:"secondaryPerSeriesAligner"` SecondaryGroupBys []string `json:"secondaryGroupBys"` } - // TODO: sloQuery can be specified as timeSeriesList parameters + + // sloQuery is an internal convention but the API is the same as timeSeriesList sloQuery struct { - SelectorName string `json:"selectorName"` - ServiceId string `json:"serviceId"` - SloId string `json:"sloId"` - LookbackPeriod string `json:"lookbackPeriod"` + ProjectName string `json:"projectName"` + SelectorName string `json:"selectorName"` + ServiceId string `json:"serviceId"` + SloId string `json:"sloId"` + AlignmentPeriod string `json:"alignmentPeriod"` + LookbackPeriod string `json:"lookbackPeriod"` } // timeSeries.query https://cloud.google.com/monitoring/api/ref_v3/rest/v3/projects.timeSeries/query @@ -68,11 +72,19 @@ type ( aliasBy string logger log.Logger parameters *timeSeriesList - // TODO: Merge SloQuery into TimeSeriesList - sloQ *sloQuery // Processed properties params url.Values } + // cloudMonitoringSLO is used to build time series with a filter but for the SLO case + cloudMonitoringSLO struct { + refID string + aliasBy string + logger log.Logger + parameters *sloQuery + // Processed properties + params url.Values + } + // cloudMonitoringTimeSeriesQuery is used to build MQL queries cloudMonitoringTimeSeriesQuery struct { refID string diff --git a/pkg/tsdb/cloudmonitoring/utils.go b/pkg/tsdb/cloudmonitoring/utils.go index 67eb54df84e..695b17a7b49 100644 --- a/pkg/tsdb/cloudmonitoring/utils.go +++ b/pkg/tsdb/cloudmonitoring/utils.go @@ -1,33 +1,24 @@ package cloudmonitoring import ( + "bytes" + "context" + "encoding/json" + "fmt" + "io" + "net/http" + "net/url" + "path" "strings" + "github.com/grafana/grafana-plugin-sdk-go/backend" "github.com/grafana/grafana-plugin-sdk-go/data" + "github.com/grafana/grafana/pkg/infra/log" + "github.com/grafana/grafana/pkg/infra/tracing" "github.com/grafana/grafana/pkg/tsdb/intervalv2" + "go.opentelemetry.io/otel/attribute" ) -func reverse(s string) string { - chars := []rune(s) - for i, j := 0, len(chars)-1; i < j; i, j = i+1, j-1 { - chars[i], chars[j] = chars[j], chars[i] - } - return string(chars) -} - -func toSnakeCase(str string) string { - return strings.ToLower(matchAllCap.ReplaceAllString(str, "${1}_${2}")) -} - -func containsLabel(labels []string, newLabel string) bool { - for _, val := range labels { - if val == newLabel { - return true - } - } - return false -} - func addInterval(period string, field *data.Field) error { period = strings.TrimPrefix(period, "+") p, err := intervalv2.ParseIntervalStringToTimeDuration(period) @@ -52,3 +43,115 @@ func toString(v interface{}) string { } return v.(string) } + +func createRequest(ctx context.Context, logger log.Logger, dsInfo *datasourceInfo, proxyPass string, body io.Reader) (*http.Request, error) { + u, err := url.Parse(dsInfo.url) + if err != nil { + return nil, err + } + u.Path = path.Join(u.Path, "render") + + method := http.MethodGet + if body != nil { + method = http.MethodPost + } + req, err := http.NewRequestWithContext(ctx, method, dsInfo.services[cloudMonitor].url, body) + if err != nil { + logger.Error("Failed to create request", "error", err) + return nil, fmt.Errorf("failed to create request: %w", err) + } + + req.Header.Set("Content-Type", "application/json") + req.URL.Path = proxyPass + + return req, nil +} + +func doRequestPage(ctx context.Context, logger log.Logger, r *http.Request, dsInfo datasourceInfo, params url.Values, body map[string]interface{}) (cloudMonitoringResponse, error) { + if params != nil { + r.URL.RawQuery = params.Encode() + } + if body != nil { + buf, err := json.Marshal(body) + if err != nil { + return cloudMonitoringResponse{}, err + } + r.Body = io.NopCloser(bytes.NewBuffer(buf)) + r.Method = http.MethodPost + } + res, err := dsInfo.services[cloudMonitor].client.Do(r) + if err != nil { + return cloudMonitoringResponse{}, err + } + + dnext, err := unmarshalResponse(logger, res) + if err != nil { + return cloudMonitoringResponse{}, err + } + + return dnext, nil +} + +func doRequestWithPagination(ctx context.Context, logger log.Logger, r *http.Request, dsInfo datasourceInfo, params url.Values, body map[string]interface{}) (cloudMonitoringResponse, error) { + d, err := doRequestPage(ctx, logger, r, dsInfo, params, body) + if err != nil { + return cloudMonitoringResponse{}, err + } + for d.NextPageToken != "" { + if params != nil { + params["pageToken"] = []string{d.NextPageToken} + } + if body != nil { + body["pageToken"] = d.NextPageToken + } + nextPage, err := doRequestPage(ctx, logger, r, dsInfo, params, body) + if err != nil { + return cloudMonitoringResponse{}, err + } + d.TimeSeries = append(d.TimeSeries, nextPage.TimeSeries...) + d.TimeSeriesData = append(d.TimeSeriesData, nextPage.TimeSeriesData...) + d.NextPageToken = nextPage.NextPageToken + } + return d, nil +} + +func traceReq(ctx context.Context, tracer tracing.Tracer, req *backend.QueryDataRequest, dsInfo datasourceInfo, r *http.Request, target string) tracing.Span { + ctx, span := tracer.Start(ctx, "cloudMonitoring query") + span.SetAttributes("target", target, attribute.Key("target").String(target)) + span.SetAttributes("from", req.Queries[0].TimeRange.From, attribute.Key("from").String(req.Queries[0].TimeRange.From.String())) + span.SetAttributes("until", req.Queries[0].TimeRange.To, attribute.Key("until").String(req.Queries[0].TimeRange.To.String())) + span.SetAttributes("datasource_id", dsInfo.id, attribute.Key("datasource_id").Int64(dsInfo.id)) + span.SetAttributes("org_id", req.PluginContext.OrgID, attribute.Key("org_id").Int64(req.PluginContext.OrgID)) + tracer.Inject(ctx, r.Header, span) + return span +} + +func runTimeSeriesRequest(ctx context.Context, logger log.Logger, req *backend.QueryDataRequest, + s *Service, dsInfo datasourceInfo, tracer tracing.Tracer, projectName string, params url.Values, body map[string]interface{}) (*backend.DataResponse, cloudMonitoringResponse, string, error) { + dr := &backend.DataResponse{} + projectName, err := s.ensureProject(ctx, dsInfo, projectName) + if err != nil { + dr.Error = err + return dr, cloudMonitoringResponse{}, "", nil + } + timeSeriesMethod := "timeSeries" + if body != nil { + timeSeriesMethod += ":query" + } + r, err := createRequest(ctx, logger, &dsInfo, path.Join("/v3/projects", projectName, timeSeriesMethod), nil) + if err != nil { + dr.Error = err + return dr, cloudMonitoringResponse{}, "", nil + } + + span := traceReq(ctx, tracer, req, dsInfo, r, params.Encode()) + defer span.End() + + d, err := doRequestWithPagination(ctx, logger, r, dsInfo, params, body) + if err != nil { + dr.Error = err + return dr, cloudMonitoringResponse{}, "", nil + } + + return dr, d, r.URL.RawQuery, nil +}