GoogleCloudMonitoring: Refactor SLO queries (#59421)

* GoogleCloudMonitoring: Refactor metricType input

* Remove preprocessor in favor of secondary inputs

* GoogleCloudMonitoring: Refactor SLO queries

* GoogleCloudMonitoring: Refactor util functions (#59482)

* GoogleCloudMonitoring: Remove unnecessary util functions

* GoogleCloudMonitoring: Refactor run function (#59505)
Andres Martinez Gotor 2022-11-30 17:23:05 +01:00 committed by GitHub
parent 4ec08c4317
commit 2e64ea652e
9 changed files with 487 additions and 471 deletions
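For orientation before the per-file hunks: every query flavor (filter list, SLO, MQL) now implements one executor interface. The two getters are added in this commit (see the type-definitions hunk near the end); the earlier methods, including run, are reconstructed here from the concrete implementations below, so treat this as a sketch rather than the verbatim declaration:

type cloudMonitoringQueryExecutor interface {
	run(ctx context.Context, req *backend.QueryDataRequest, s *Service,
		dsInfo datasourceInfo, tracer tracing.Tracer) (*backend.DataResponse, cloudMonitoringResponse, string, error)
	parseResponse(dr *backend.DataResponse, data cloudMonitoringResponse, executedQueryString string) error
	buildDeepLink() string
	getRefID() string
	getAliasBy() string
	getParameter(i string) string
}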

View File

@@ -7,14 +7,13 @@ import (
"io"
"math"
"net/http"
"net/url"
"path"
"regexp"
"strconv"
"strings"
"time"
"github.com/grafana/grafana-google-sdk-go/pkg/utils"
"github.com/huandu/xstrings"
"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana-plugin-sdk-go/backend/datasource"
@@ -25,7 +24,6 @@ import (
"github.com/grafana/grafana/pkg/infra/httpclient"
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/infra/tracing"
"github.com/grafana/grafana/pkg/setting"
)
var (
@@ -33,11 +31,9 @@ var (
)
var (
matchAllCap = regexp.MustCompile("(.)([A-Z][a-z]*)")
legendKeyFormat = regexp.MustCompile(`\{\{\s*(.+?)\s*\}\}`)
metricNameFormat = regexp.MustCompile(`([\w\d_]+)\.(googleapis\.com|io)/(.+)`)
wildcardRegexRe = regexp.MustCompile(`[-\/^$+?.()|[\]{}]`)
alignmentPeriodRe = regexp.MustCompile("[0-9]+")
cloudMonitoringUnitMappings = map[string]string{
"bit": "bits",
"By": "bytes",
@@ -333,33 +329,6 @@ func migrateRequest(req *backend.QueryDataRequest) error {
q.JSON = b
}
// SloQuery was merged into timeSeriesList
if rawQuery["sloQuery"] != nil {
if rawQuery["timeSeriesList"] == nil {
rawQuery["timeSeriesList"] = &timeSeriesList{}
}
tsl := rawQuery["timeSeriesList"].(*timeSeriesList)
sloq := rawQuery["sloQuery"].(map[string]interface{})
if sloq["projectName"] != nil {
tsl.ProjectName = sloq["projectName"].(string)
}
if sloq["alignmentPeriod"] != nil {
tsl.AlignmentPeriod = sloq["alignmentPeriod"].(string)
}
if sloq["perSeriesAligner"] != nil {
tsl.PerSeriesAligner = sloq["perSeriesAligner"].(string)
}
rawQuery["timeSeriesList"] = tsl
b, err := json.Marshal(rawQuery)
if err != nil {
return err
}
if q.QueryType == "" {
q.QueryType = sloQueryType
}
q.JSON = b
}
req.Queries[i] = q
}
@@ -437,16 +406,7 @@ func (s *Service) buildQueryExecutors(logger log.Logger, req *backend.QueryDataR
return nil, fmt.Errorf("could not unmarshal CloudMonitoringQuery json: %w", err)
}
params := url.Values{}
params.Add("interval.startTime", startTime.UTC().Format(time.RFC3339))
params.Add("interval.endTime", endTime.UTC().Format(time.RFC3339))
var queryInterface cloudMonitoringQueryExecutor
cmtsf := &cloudMonitoringTimeSeriesList{
refID: query.RefID,
logger: logger,
aliasBy: q.AliasBy,
}
switch query.QueryType {
case metricQueryType, annotationQueryType:
if q.TimeSeriesQuery != nil {
@@ -458,33 +418,33 @@ func (s *Service) buildQueryExecutors(logger log.Logger, req *backend.QueryDataR
timeRange: req.Queries[0].TimeRange,
}
} else if q.TimeSeriesList != nil {
cmtsf := &cloudMonitoringTimeSeriesList{
refID: query.RefID,
logger: logger,
aliasBy: q.AliasBy,
}
if q.TimeSeriesList.View == "" {
q.TimeSeriesList.View = "FULL"
}
cmtsf.parameters = q.TimeSeriesList
params.Add("filter", buildFilterString(q.TimeSeriesList.Filters))
params.Add("view", q.TimeSeriesList.View)
setMetricAggParams(&params, q.TimeSeriesList, durationSeconds, query.Interval.Milliseconds())
cmtsf.setParams(startTime, endTime, durationSeconds, query.Interval.Milliseconds())
queryInterface = cmtsf
} else {
return nil, fmt.Errorf("missing query info")
}
case sloQueryType:
cmtsf.sloQ = q.SloQuery
cmtsf.parameters = q.TimeSeriesList
params.Add("filter", buildSLOFilterExpression(q.TimeSeriesList.ProjectName, q.SloQuery))
setSloAggParams(&params, q.SloQuery, q.TimeSeriesList.AlignmentPeriod, durationSeconds, query.Interval.Milliseconds())
queryInterface = cmtsf
cmslo := &cloudMonitoringSLO{
refID: query.RefID,
logger: logger,
aliasBy: q.AliasBy,
parameters: q.SloQuery,
}
cmslo.setParams(startTime, endTime, durationSeconds, query.Interval.Milliseconds())
queryInterface = cmslo
default:
return nil, fmt.Errorf("unrecognized query type %q", query.QueryType)
}
cmtsf.params = params
if setting.Env == setting.Dev {
logger.Debug("CloudMonitoring request", "params", params)
}
cloudMonitoringQueryExecutors = append(cloudMonitoringQueryExecutors, queryInterface)
}
@@ -501,7 +461,7 @@ func interpolateFilterWildcards(value string) string {
value = strings.Replace(value, "*", "", 1)
value = fmt.Sprintf(`ends_with("%s")`, value)
case matches == 1 && strings.HasSuffix(value, "*"):
value = reverse(strings.Replace(reverse(value), "*", "", 1))
value = xstrings.Reverse(strings.Replace(xstrings.Reverse(value), "*", "", 1))
value = fmt.Sprintf(`starts_with("%s")`, value)
case matches != 0:
value = string(wildcardRegexRe.ReplaceAllFunc([]byte(value), func(in []byte) []byte {
@@ -515,87 +475,6 @@ func interpolateFilterWildcards(value string) string {
return value
}
func buildFilterString(filterParts []string) string {
filterString := ""
for i, part := range filterParts {
mod := i % 4
switch {
case part == "AND":
filterString += " "
case mod == 2:
operator := filterParts[i-1]
switch {
case operator == "=~" || operator == "!=~":
filterString = reverse(strings.Replace(reverse(filterString), "~", "", 1))
filterString += fmt.Sprintf(`monitoring.regex.full_match("%s")`, part)
case strings.Contains(part, "*"):
filterString += interpolateFilterWildcards(part)
default:
filterString += fmt.Sprintf(`"%s"`, part)
}
default:
filterString += part
}
}
return strings.Trim(filterString, " ")
}
func buildSLOFilterExpression(projectName string, q *sloQuery) string {
sloName := fmt.Sprintf("projects/%s/services/%s/serviceLevelObjectives/%s", projectName, q.ServiceId, q.SloId)
if q.SelectorName == "select_slo_burn_rate" {
return fmt.Sprintf(`%s("%s", "%s")`, q.SelectorName, sloName, q.LookbackPeriod)
} else {
return fmt.Sprintf(`%s("%s")`, q.SelectorName, sloName)
}
}
func setMetricAggParams(params *url.Values, query *timeSeriesList, durationSeconds int, intervalMs int64) {
if query.CrossSeriesReducer == "" {
query.CrossSeriesReducer = crossSeriesReducerDefault
}
if query.PerSeriesAligner == "" {
query.PerSeriesAligner = perSeriesAlignerDefault
}
alignmentPeriod := calculateAlignmentPeriod(query.AlignmentPeriod, intervalMs, durationSeconds)
params.Add("aggregation.alignmentPeriod", alignmentPeriod)
if query.CrossSeriesReducer != "" {
params.Add("aggregation.crossSeriesReducer", query.CrossSeriesReducer)
}
if query.PerSeriesAligner != "" {
params.Add("aggregation.perSeriesAligner", query.PerSeriesAligner)
}
for _, groupBy := range query.GroupBys {
params.Add("aggregation.groupByFields", groupBy)
}
if query.SecondaryAlignmentPeriod != "" {
secondaryAlignmentPeriod := calculateAlignmentPeriod(query.AlignmentPeriod, intervalMs, durationSeconds)
params.Add("secondaryAggregation.alignmentPeriod", secondaryAlignmentPeriod)
}
if query.SecondaryCrossSeriesReducer != "" {
params.Add("secondaryAggregation.crossSeriesReducer", query.SecondaryCrossSeriesReducer)
}
if query.SecondaryPerSeriesAligner != "" {
params.Add("secondaryAggregation.perSeriesAligner", query.SecondaryPerSeriesAligner)
}
for _, groupBy := range query.SecondaryGroupBys {
params.Add("secondaryAggregation.groupByFields", groupBy)
}
}
func setSloAggParams(params *url.Values, query *sloQuery, alignmentPeriod string, durationSeconds int, intervalMs int64) {
params.Add("aggregation.alignmentPeriod", calculateAlignmentPeriod(alignmentPeriod, intervalMs, durationSeconds))
if query.SelectorName == "select_slo_health" {
params.Add("aggregation.perSeriesAligner", "ALIGN_MEAN")
} else {
params.Add("aggregation.perSeriesAligner", "ALIGN_NEXT_OLDER")
}
}
func calculateAlignmentPeriod(alignmentPeriod string, intervalMs int64, durationSeconds int) string {
if alignmentPeriod == "grafana-auto" || alignmentPeriod == "" {
alignmentPeriodValue := int(math.Max(float64(intervalMs)/1000, 60.0))
@@ -618,12 +497,12 @@ func calculateAlignmentPeriod(alignmentPeriod string, intervalMs int64, duration
}
func formatLegendKeys(metricType string, defaultMetricName string, labels map[string]string,
additionalLabels map[string]string, query *cloudMonitoringTimeSeriesList) string {
if query.aliasBy == "" {
additionalLabels map[string]string, query cloudMonitoringQueryExecutor) string {
if query.getAliasBy() == "" {
return defaultMetricName
}
result := legendKeyFormat.ReplaceAllFunc([]byte(query.aliasBy), func(in []byte) []byte {
result := legendKeyFormat.ReplaceAllFunc([]byte(query.getAliasBy()), func(in []byte) []byte {
metaPartName := strings.Replace(string(in), "{{", "", 1)
metaPartName = strings.Replace(metaPartName, "}}", "", 1)
metaPartName = strings.TrimSpace(metaPartName)
@@ -646,20 +525,8 @@ func formatLegendKeys(metricType string, defaultMetricName string, labels map[st
return []byte(val)
}
if metaPartName == "project" && query.parameters.ProjectName != "" {
return []byte(query.parameters.ProjectName)
}
if metaPartName == "service" && query.sloQ.ServiceId != "" {
return []byte(query.sloQ.ServiceId)
}
if metaPartName == "slo" && query.sloQ.SloId != "" {
return []byte(query.sloQ.SloId)
}
if metaPartName == "selector" && query.sloQ.SelectorName != "" {
return []byte(query.sloQ.SelectorName)
if query.getParameter(metaPartName) != "" {
return []byte(query.getParameter(metaPartName))
}
return in
@@ -709,27 +576,11 @@ func calcBucketBound(bucketOptions cloudMonitoringBucketOptions, n int) string {
return bucketBound
}
func (s *Service) createRequest(logger log.Logger, dsInfo *datasourceInfo, proxyPass string, body io.Reader) (*http.Request, error) {
u, err := url.Parse(dsInfo.url)
if err != nil {
return nil, err
func (s *Service) ensureProject(ctx context.Context, dsInfo datasourceInfo, projectName string) (string, error) {
if projectName != "" {
return projectName, nil
}
u.Path = path.Join(u.Path, "render")
method := http.MethodGet
if body != nil {
method = http.MethodPost
}
req, err := http.NewRequest(method, dsInfo.services[cloudMonitor].url, body)
if err != nil {
logger.Error("Failed to create request", "error", err)
return nil, fmt.Errorf("failed to create request: %w", err)
}
req.Header.Set("Content-Type", "application/json")
req.URL.Path = proxyPass
return req, nil
return s.getDefaultProject(ctx, dsInfo)
}
func (s *Service) getDefaultProject(ctx context.Context, dsInfo datasourceInfo) (string, error) {
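The removed migration above used to fold sloQuery into timeSeriesList; with the dedicated executor, an SLO query now arrives as its own block and hits the sloQueryType branch directly. A hypothetical request body in the new shape (the "slo" query-type string and all values are illustrative; the field names come from the sloQuery struct in the type-definitions hunk near the end):

req.Queries[0].QueryType = "slo" // assumption: the value of the sloQueryType constant
req.Queries[0].JSON = json.RawMessage(`{
	"aliasBy": "{{project}} - {{slo}}",
	"sloQuery": {
		"projectName": "test-proj",
		"serviceId": "test-service",
		"sloId": "test-slo",
		"selectorName": "select_slo_burn_rate",
		"lookbackPeriod": "1h",
		"alignmentPeriod": "grafana-auto"
	}
}`)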

View File

@@ -676,7 +676,7 @@ func TestCloudMonitoring(t *testing.T) {
qes, err := service.buildQueryExecutors(slog, req)
require.NoError(t, err)
queries := getCloudMonitoringListFromInterface(t, qes)
queries := getCloudMonitoringSLOFromInterface(t, qes)
assert.Equal(t, 1, len(queries))
assert.Equal(t, "A", queries[0].refID)
@@ -706,7 +706,7 @@ func TestCloudMonitoring(t *testing.T) {
qes, err = service.buildQueryExecutors(slog, req)
require.NoError(t, err)
qqueries := getCloudMonitoringListFromInterface(t, qes)
qqueries := getCloudMonitoringSLOFromInterface(t, qes)
assert.Equal(t, "ALIGN_NEXT_OLDER", qqueries[0].params["aggregation.perSeriesAligner"][0])
dl := qqueries[0].buildDeepLink()
@@ -731,7 +731,7 @@ func TestCloudMonitoring(t *testing.T) {
qes, err = service.buildQueryExecutors(slog, req)
require.NoError(t, err)
qqqueries := getCloudMonitoringListFromInterface(t, qes)
qqqueries := getCloudMonitoringSLOFromInterface(t, qes)
assert.Equal(t, `aggregation.alignmentPeriod=%2B60s&aggregation.perSeriesAligner=ALIGN_NEXT_OLDER&filter=select_slo_burn_rate%28%22projects%2Ftest-proj%2Fservices%2Ftest-service%2FserviceLevelObjectives%2Ftest-slo%22%2C+%221h%22%29&interval.endTime=2018-03-15T13%3A34%3A00Z&interval.startTime=2018-03-15T13%3A00%3A00Z`, qqqueries[0].params.Encode())
})
})
@@ -798,31 +798,6 @@ func TestCloudMonitoring(t *testing.T) {
})
})
t.Run("when building filter string", func(t *testing.T) {
t.Run("and there's no regex operator", func(t *testing.T) {
t.Run("and there are wildcards in a filter value", func(t *testing.T) {
filterParts := []string{"metric.type", "=", "somemetrictype", "AND", "zone", "=", "*-central1*"}
value := buildFilterString(filterParts)
assert.Equal(t, `metric.type="somemetrictype" zone=has_substring("-central1")`, value)
})
t.Run("and there are no wildcards in any filter value", func(t *testing.T) {
filterParts := []string{"metric.type", "=", "somemetrictype", "AND", "zone", "!=", "us-central1-a"}
value := buildFilterString(filterParts)
assert.Equal(t, `metric.type="somemetrictype" zone!="us-central1-a"`, value)
})
})
t.Run("and there is a regex operator", func(t *testing.T) {
filterParts := []string{"metric.type", "=", "somemetrictype", "AND", "zone", "=~", "us-central1-a~"}
value := buildFilterString(filterParts)
assert.NotContains(t, value, `=~`)
assert.Contains(t, value, `zone=`)
assert.Contains(t, value, `zone=monitoring.regex.full_match("us-central1-a~")`)
})
})
t.Run("and query preprocessor is not defined", func(t *testing.T) {
req := deprecatedReq()
req.Queries[0].JSON = json.RawMessage(`{
@@ -1011,6 +986,18 @@ func getCloudMonitoringListFromInterface(t *testing.T, qes []cloudMonitoringQuer
return queries
}
func getCloudMonitoringSLOFromInterface(t *testing.T, qes []cloudMonitoringQueryExecutor) []*cloudMonitoringSLO {
t.Helper()
queries := make([]*cloudMonitoringSLO, 0)
for _, qi := range qes {
q, ok := qi.(*cloudMonitoringSLO)
require.Truef(t, ok, "Received wrong type %T", qi)
queries = append(queries, q)
}
return queries
}
func getCloudMonitoringQueryFromInterface(t *testing.T, qes []cloudMonitoringQueryExecutor) []*cloudMonitoringTimeSeriesQuery {
t.Helper()

View File

@@ -0,0 +1,75 @@
package cloudmonitoring
import (
"context"
"fmt"
"net/url"
"time"
"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana/pkg/infra/tracing"
)
func (sloQ *cloudMonitoringSLO) run(ctx context.Context, req *backend.QueryDataRequest,
s *Service, dsInfo datasourceInfo, tracer tracing.Tracer) (*backend.DataResponse, cloudMonitoringResponse, string, error) {
return runTimeSeriesRequest(ctx, sloQ.logger, req, s, dsInfo, tracer, sloQ.parameters.ProjectName, sloQ.params, nil)
}
func (sloQ *cloudMonitoringSLO) parseResponse(queryRes *backend.DataResponse,
response cloudMonitoringResponse, executedQueryString string) error {
return parseTimeSeriesResponse(queryRes, response, executedQueryString, sloQ, sloQ.params, []string{})
}
func (sloQ *cloudMonitoringSLO) buildDeepLink() string {
return ""
}
func (sloQ *cloudMonitoringSLO) getRefID() string {
return sloQ.refID
}
func (sloQ *cloudMonitoringSLO) getAliasBy() string {
return sloQ.aliasBy
}
func (sloQ *cloudMonitoringSLO) getParameter(i string) string {
switch i {
case "project":
return sloQ.parameters.ProjectName
case "service":
return sloQ.parameters.ServiceId
case "slo":
return sloQ.parameters.SloId
case "selector":
return sloQ.parameters.SelectorName
default:
return ""
}
}
func (sloQ *cloudMonitoringSLO) getFilter() string {
sloName := fmt.Sprintf("projects/%s/services/%s/serviceLevelObjectives/%s", sloQ.parameters.ProjectName, sloQ.parameters.ServiceId, sloQ.parameters.SloId)
if sloQ.parameters.SelectorName == "select_slo_burn_rate" {
return fmt.Sprintf(`%s("%s", "%s")`, sloQ.parameters.SelectorName, sloName, sloQ.parameters.LookbackPeriod)
} else {
return fmt.Sprintf(`%s("%s")`, sloQ.parameters.SelectorName, sloName)
}
}
func (sloQ *cloudMonitoringSLO) setParams(startTime time.Time, endTime time.Time, durationSeconds int, intervalMs int64) {
params := url.Values{}
params.Add("interval.startTime", startTime.UTC().Format(time.RFC3339))
params.Add("interval.endTime", endTime.UTC().Format(time.RFC3339))
params.Add("filter", sloQ.getFilter())
params.Add("aggregation.alignmentPeriod", calculateAlignmentPeriod(sloQ.parameters.AlignmentPeriod, intervalMs, durationSeconds))
if sloQ.parameters.SelectorName == "select_slo_health" {
params.Add("aggregation.perSeriesAligner", "ALIGN_MEAN")
} else {
params.Add("aggregation.perSeriesAligner", "ALIGN_NEXT_OLDER")
}
sloQ.params = params
}
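To see what setParams produces end to end, a minimal sketch using the burn-rate fixture values from the test hunks above (startTime, endTime, durationSeconds and intervalMs stand in for the tests' fixed 2018-03-15 13:00:00 to 13:34:00 UTC range):

sloQ := &cloudMonitoringSLO{
	refID: "A",
	parameters: &sloQuery{
		ProjectName:    "test-proj",
		ServiceId:      "test-service",
		SloId:          "test-slo",
		SelectorName:   "select_slo_burn_rate",
		LookbackPeriod: "1h",
	},
}
sloQ.setParams(startTime, endTime, durationSeconds, intervalMs)
// An empty AlignmentPeriod falls through calculateAlignmentPeriod to the 60s floor, so
// sloQ.params.Encode() yields the exact string asserted in the burn-rate test above:
// aggregation.alignmentPeriod=%2B60s&aggregation.perSeriesAligner=ALIGN_NEXT_OLDER&filter=select_slo_burn_rate%28...%29&interval.endTime=2018-03-15T13%3A34%3A00Z&interval.startTime=2018-03-15T13%3A00%3A00Z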

View File

@@ -0,0 +1,75 @@
package cloudmonitoring
import (
"net/url"
"testing"
"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestSLOQuery(t *testing.T) {
t.Run("when data from query returns slo and alias by is defined", func(t *testing.T) {
data, err := loadTestFile("./test-data/6-series-response-slo.json")
require.NoError(t, err)
assert.Equal(t, 1, len(data.TimeSeries))
t.Run("and alias by is expanded", func(t *testing.T) {
res := &backend.DataResponse{}
query := &cloudMonitoringSLO{
params: url.Values{},
parameters: &sloQuery{
ProjectName: "test-proj",
SelectorName: "select_slo_compliance",
ServiceId: "test-service",
SloId: "test-slo",
},
aliasBy: "{{project}} - {{service}} - {{slo}} - {{selector}}",
}
err = query.parseResponse(res, data, "")
require.NoError(t, err)
frames := res.Frames
require.NoError(t, err)
assert.Equal(t, "test-proj - test-service - test-slo - select_slo_compliance", frames[0].Fields[1].Name)
})
})
t.Run("when data from query returns slo and alias by is not defined", func(t *testing.T) {
data, err := loadTestFile("./test-data/6-series-response-slo.json")
require.NoError(t, err)
assert.Equal(t, 1, len(data.TimeSeries))
t.Run("and alias by is expanded", func(t *testing.T) {
res := &backend.DataResponse{}
query := &cloudMonitoringSLO{
params: url.Values{},
parameters: &sloQuery{
ProjectName: "test-proj",
SelectorName: "select_slo_compliance",
ServiceId: "test-service",
SloId: "test-slo",
},
}
err = query.parseResponse(res, data, "")
require.NoError(t, err)
frames := res.Frames
require.NoError(t, err)
assert.Equal(t, "select_slo_compliance(\"projects/test-proj/services/test-service/serviceLevelObjectives/test-slo\")", frames[0].Fields[1].Name)
})
})
t.Run("when data comes from a slo query, it should skip the link", func(t *testing.T) {
data, err := loadTestFile("./test-data/3-series-response-distribution-exponential.json")
require.NoError(t, err)
assert.Equal(t, 1, len(data.TimeSeries))
res := &backend.DataResponse{}
query := &cloudMonitoringSLO{params: url.Values{}, parameters: &sloQuery{SloId: "yes"}}
err = query.parseResponse(res, data, "")
require.NoError(t, err)
frames := res.Frames
assert.Equal(t, len(frames[0].Fields[1].Config.Links), 0)
})
}

View File

@@ -4,104 +4,25 @@ import (
"context"
"encoding/json"
"fmt"
"net/http"
"net/url"
"path"
"strconv"
"strings"
"time"
"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana-plugin-sdk-go/data"
"go.opentelemetry.io/otel/attribute"
"github.com/huandu/xstrings"
"github.com/grafana/grafana/pkg/infra/tracing"
)
func (timeSeriesFilter *cloudMonitoringTimeSeriesList) doRequestFilterPage(ctx context.Context, r *http.Request, dsInfo datasourceInfo) (cloudMonitoringResponse, error) {
r.URL.RawQuery = timeSeriesFilter.params.Encode()
r = r.WithContext(ctx)
res, err := dsInfo.services[cloudMonitor].client.Do(r)
if err != nil {
return cloudMonitoringResponse{}, err
}
dnext, err := unmarshalResponse(timeSeriesFilter.logger, res)
if err != nil {
return cloudMonitoringResponse{}, err
}
return dnext, nil
}
func (timeSeriesFilter *cloudMonitoringTimeSeriesList) run(ctx context.Context, req *backend.QueryDataRequest,
s *Service, dsInfo datasourceInfo, tracer tracing.Tracer) (*backend.DataResponse, cloudMonitoringResponse, string, error) {
dr := &backend.DataResponse{}
projectName := timeSeriesFilter.parameters.ProjectName
if projectName == "" {
var err error
projectName, err = s.getDefaultProject(ctx, dsInfo)
if err != nil {
dr.Error = err
return dr, cloudMonitoringResponse{}, "", nil
}
timeSeriesFilter.logger.Info("No project name set on query, using project name from datasource", "projectName", projectName)
}
r, err := s.createRequest(timeSeriesFilter.logger, &dsInfo, path.Join("/v3/projects", projectName, "timeSeries"), nil)
if err != nil {
dr.Error = err
return dr, cloudMonitoringResponse{}, "", nil
}
alignmentPeriod, ok := r.URL.Query()["aggregation.alignmentPeriod"]
if ok {
seconds, err := strconv.ParseInt(alignmentPeriodRe.FindString(alignmentPeriod[0]), 10, 64)
if err == nil {
if len(dr.Frames) == 0 {
dr.Frames = append(dr.Frames, data.NewFrame(""))
}
firstFrame := dr.Frames[0]
if firstFrame.Meta == nil {
firstFrame.SetMeta(&data.FrameMeta{
Custom: map[string]interface{}{
"alignmentPeriod": seconds,
},
})
}
}
}
ctx, span := tracer.Start(ctx, "cloudMonitoring query")
span.SetAttributes("target", timeSeriesFilter.params.Encode(), attribute.Key("target").String(timeSeriesFilter.params.Encode()))
span.SetAttributes("from", req.Queries[0].TimeRange.From, attribute.Key("from").String(req.Queries[0].TimeRange.From.String()))
span.SetAttributes("until", req.Queries[0].TimeRange.To, attribute.Key("until").String(req.Queries[0].TimeRange.To.String()))
span.SetAttributes("datasource_id", dsInfo.id, attribute.Key("datasource_id").Int64(dsInfo.id))
span.SetAttributes("org_id", req.PluginContext.OrgID, attribute.Key("org_id").Int64(req.PluginContext.OrgID))
defer span.End()
tracer.Inject(ctx, r.Header, span)
d, err := timeSeriesFilter.doRequestFilterPage(ctx, r, dsInfo)
if err != nil {
dr.Error = err
return dr, cloudMonitoringResponse{}, "", nil
}
nextPageToken := d.NextPageToken
for nextPageToken != "" {
timeSeriesFilter.params["pageToken"] = []string{d.NextPageToken}
nextPage, err := timeSeriesFilter.doRequestFilterPage(ctx, r, dsInfo)
if err != nil {
dr.Error = err
return dr, cloudMonitoringResponse{}, "", nil
}
d.TimeSeries = append(d.TimeSeries, nextPage.TimeSeries...)
nextPageToken = nextPage.NextPageToken
}
return dr, d, r.URL.RawQuery, nil
return runTimeSeriesRequest(ctx, timeSeriesFilter.logger, req, s, dsInfo, tracer, timeSeriesFilter.parameters.ProjectName, timeSeriesFilter.params, nil)
}
//nolint:gocyclo
func (timeSeriesFilter *cloudMonitoringTimeSeriesList) parseResponse(queryRes *backend.DataResponse,
response cloudMonitoringResponse, executedQueryString string) error {
func parseTimeSeriesResponse(queryRes *backend.DataResponse,
response cloudMonitoringResponse, executedQueryString string, query cloudMonitoringQueryExecutor, params url.Values, groupBys []string) error {
frames := data.Frames{}
for _, series := range response.TimeSeries {
@@ -110,9 +31,13 @@ func (timeSeriesFilter *cloudMonitoringTimeSeriesList) parseResponse(queryRes *b
labels := make(map[string]string)
labels["resource.type"] = series.Resource.Type
seriesLabels["resource.type"] = series.Resource.Type
groupBysMap := make(map[string]bool)
for _, groupBy := range groupBys {
groupBysMap[groupBy] = true
}
frame := data.NewFrameOfFieldTypes("", len(series.Points), data.FieldTypeTime, data.FieldTypeFloat64)
frame.RefID = timeSeriesFilter.refID
frame.RefID = query.getRefID()
frame.Meta = &data.FrameMeta{
ExecutedQueryString: executedQueryString,
}
@@ -121,7 +46,7 @@ func (timeSeriesFilter *cloudMonitoringTimeSeriesList) parseResponse(queryRes *b
labels["metric.label."+key] = value
seriesLabels["metric.label."+key] = value
if len(timeSeriesFilter.parameters.GroupBys) == 0 || containsLabel(timeSeriesFilter.parameters.GroupBys, "metric.label."+key) {
if len(groupBys) == 0 || groupBysMap["metric.label."+key] {
defaultMetricName += " " + value
}
}
@@ -130,14 +55,14 @@ func (timeSeriesFilter *cloudMonitoringTimeSeriesList) parseResponse(queryRes *b
labels["resource.label."+key] = value
seriesLabels["resource.label."+key] = value
if containsLabel(timeSeriesFilter.parameters.GroupBys, "resource.label."+key) {
if groupBysMap["resource.label."+key] {
defaultMetricName += " " + value
}
}
for labelType, labelTypeValues := range series.MetaData {
for labelKey, labelValue := range labelTypeValues {
key := toSnakeCase(fmt.Sprintf("metadata.%s.%s", labelType, labelKey))
key := xstrings.ToSnakeCase(fmt.Sprintf("metadata.%s.%s", labelType, labelKey))
switch v := labelValue.(type) {
case string:
@@ -161,10 +86,10 @@ func (timeSeriesFilter *cloudMonitoringTimeSeriesList) parseResponse(queryRes *b
}
customFrameMeta := map[string]interface{}{}
customFrameMeta["alignmentPeriod"] = timeSeriesFilter.params.Get("aggregation.alignmentPeriod")
customFrameMeta["perSeriesAligner"] = timeSeriesFilter.params.Get("aggregation.perSeriesAligner")
customFrameMeta["alignmentPeriod"] = params.Get("aggregation.alignmentPeriod")
customFrameMeta["perSeriesAligner"] = params.Get("aggregation.perSeriesAligner")
customFrameMeta["labels"] = labels
customFrameMeta["groupBys"] = timeSeriesFilter.parameters.GroupBys
customFrameMeta["groupBys"] = groupBys
if frame.Meta != nil {
frame.Meta.Custom = customFrameMeta
} else {
@@ -173,7 +98,7 @@ func (timeSeriesFilter *cloudMonitoringTimeSeriesList) parseResponse(queryRes *b
// reverse the order to be ascending
if series.ValueType != "DISTRIBUTION" {
timeSeriesFilter.handleNonDistributionSeries(series, defaultMetricName, seriesLabels, frame)
handleNonDistributionSeries(series, defaultMetricName, seriesLabels, frame, query)
frames = append(frames, frame)
continue
}
@@ -197,7 +122,7 @@ func (timeSeriesFilter *cloudMonitoringTimeSeriesList) parseResponse(queryRes *b
timeField := data.NewField(data.TimeSeriesTimeFieldName, nil, []time.Time{})
valueField := data.NewField(data.TimeSeriesValueFieldName, nil, []float64{})
frameName := formatLegendKeys(series.Metric.Type, defaultMetricName, nil, additionalLabels, timeSeriesFilter)
frameName := formatLegendKeys(series.Metric.Type, defaultMetricName, nil, additionalLabels, query)
valueField.Name = frameName
valueField.Labels = seriesLabels
setDisplayNameAsFieldName(valueField)
@@ -208,7 +133,7 @@ func (timeSeriesFilter *cloudMonitoringTimeSeriesList) parseResponse(queryRes *b
timeField,
valueField,
},
RefID: timeSeriesFilter.refID,
RefID: query.getRefID(),
Meta: &data.FrameMeta{
ExecutedQueryString: executedQueryString,
},
@@ -226,8 +151,8 @@ func (timeSeriesFilter *cloudMonitoringTimeSeriesList) parseResponse(queryRes *b
}
}
if len(response.TimeSeries) > 0 {
dl := timeSeriesFilter.buildDeepLink()
frames = addConfigData(frames, dl, response.Unit, timeSeriesFilter.params.Get("aggregation.alignmentPeriod"))
dl := query.buildDeepLink()
frames = addConfigData(frames, dl, response.Unit, params.Get("aggregation.alignmentPeriod"))
}
queryRes.Frames = frames
@@ -235,8 +160,13 @@ func (timeSeriesFilter *cloudMonitoringTimeSeriesList) parseResponse(queryRes *b
return nil
}
func (timeSeriesFilter *cloudMonitoringTimeSeriesList) handleNonDistributionSeries(series timeSeries,
defaultMetricName string, seriesLabels map[string]string, frame *data.Frame) {
func (timeSeriesFilter *cloudMonitoringTimeSeriesList) parseResponse(queryRes *backend.DataResponse,
response cloudMonitoringResponse, executedQueryString string) error {
return parseTimeSeriesResponse(queryRes, response, executedQueryString, timeSeriesFilter, timeSeriesFilter.params, timeSeriesFilter.parameters.GroupBys)
}
func handleNonDistributionSeries(series timeSeries,
defaultMetricName string, seriesLabels map[string]string, frame *data.Frame, query cloudMonitoringQueryExecutor) {
for i := 0; i < len(series.Points); i++ {
point := series.Points[i]
value := point.Value.DoubleValue
@@ -258,7 +188,7 @@ func (timeSeriesFilter *cloudMonitoringTimeSeriesList) handleNonDistributionSeri
frame.SetRow(len(series.Points)-1-i, point.Interval.EndTime, value)
}
metricName := formatLegendKeys(series.Metric.Type, defaultMetricName, seriesLabels, nil, timeSeriesFilter)
metricName := formatLegendKeys(series.Metric.Type, defaultMetricName, seriesLabels, nil, query)
dataField := frame.Fields[1]
dataField.Name = metricName
dataField.Labels = seriesLabels
@@ -266,10 +196,6 @@ func (timeSeriesFilter *cloudMonitoringTimeSeriesList) handleNonDistributionSeri
}
func (timeSeriesFilter *cloudMonitoringTimeSeriesList) buildDeepLink() string {
if timeSeriesFilter.sloQ != nil && timeSeriesFilter.sloQ.SloId != "" {
return ""
}
filter := timeSeriesFilter.params.Get("filter")
if !strings.Contains(filter, "resource.type=") {
resourceType := timeSeriesFilter.params.Get("resourceType")
@@ -352,3 +278,89 @@ func setDisplayNameAsFieldName(f *data.Field) {
func (timeSeriesFilter *cloudMonitoringTimeSeriesList) getRefID() string {
return timeSeriesFilter.refID
}
func (timeSeriesFilter *cloudMonitoringTimeSeriesList) getAliasBy() string {
return timeSeriesFilter.aliasBy
}
func (timeSeriesFilter *cloudMonitoringTimeSeriesList) getParameter(i string) string {
switch i {
case "project":
return timeSeriesFilter.parameters.ProjectName
default:
return ""
}
}
func (timeSeriesFilter *cloudMonitoringTimeSeriesList) getFilter() string {
filterString := ""
for i, part := range timeSeriesFilter.parameters.Filters {
mod := i % 4
switch {
case part == "AND":
filterString += " "
case mod == 2:
operator := timeSeriesFilter.parameters.Filters[i-1]
switch {
case operator == "=~" || operator == "!=~":
filterString = xstrings.Reverse(strings.Replace(xstrings.Reverse(filterString), "~", "", 1))
filterString += fmt.Sprintf(`monitoring.regex.full_match("%s")`, part)
case strings.Contains(part, "*"):
filterString += interpolateFilterWildcards(part)
default:
filterString += fmt.Sprintf(`"%s"`, part)
}
default:
filterString += part
}
}
return strings.Trim(filterString, " ")
}
func (timeSeriesFilter *cloudMonitoringTimeSeriesList) setParams(startTime time.Time, endTime time.Time, durationSeconds int, intervalMs int64) {
params := url.Values{}
query := timeSeriesFilter.parameters
params.Add("interval.startTime", startTime.UTC().Format(time.RFC3339))
params.Add("interval.endTime", endTime.UTC().Format(time.RFC3339))
params.Add("filter", timeSeriesFilter.getFilter())
params.Add("view", query.View)
if query.CrossSeriesReducer == "" {
query.CrossSeriesReducer = crossSeriesReducerDefault
}
if query.PerSeriesAligner == "" {
query.PerSeriesAligner = perSeriesAlignerDefault
}
alignmentPeriod := calculateAlignmentPeriod(query.AlignmentPeriod, intervalMs, durationSeconds)
params.Add("aggregation.alignmentPeriod", alignmentPeriod)
if query.CrossSeriesReducer != "" {
params.Add("aggregation.crossSeriesReducer", query.CrossSeriesReducer)
}
if query.PerSeriesAligner != "" {
params.Add("aggregation.perSeriesAligner", query.PerSeriesAligner)
}
for _, groupBy := range query.GroupBys {
params.Add("aggregation.groupByFields", groupBy)
}
if query.SecondaryAlignmentPeriod != "" {
secondaryAlignmentPeriod := calculateAlignmentPeriod(query.SecondaryAlignmentPeriod, intervalMs, durationSeconds)
params.Add("secondaryAggregation.alignmentPeriod", secondaryAlignmentPeriod)
}
if query.SecondaryCrossSeriesReducer != "" {
params.Add("secondaryAggregation.crossSeriesReducer", query.SecondaryCrossSeriesReducer)
}
if query.SecondaryPerSeriesAligner != "" {
params.Add("secondaryAggregation.perSeriesAligner", query.SecondaryPerSeriesAligner)
}
for _, groupBy := range query.SecondaryGroupBys {
params.Add("secondaryAggregation.groupByFields", groupBy)
}
timeSeriesFilter.params = params
}
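getFilter keeps the old buildFilterString contract: the Filters slice is a flat 4-token cycle (key, operator, value, "AND" separator), which is why the loop switches on i % 4. A minimal sketch mirroring the wildcard case in the test hunks below:

filters := []string{"metric.type", "=", "somemetrictype", "AND", "zone", "=", "*-central1*"}
tsl := &cloudMonitoringTimeSeriesList{parameters: &timeSeriesList{Filters: filters}}
// i%4 == 2 is the value slot: regex operators are rewritten to
// monitoring.regex.full_match(...) and wildcards to has_substring/starts_with/ends_with.
fmt.Println(tsl.getFilter())
// Output: metric.type="somemetrictype" zone=has_substring("-central1")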

View File

@@ -304,59 +304,6 @@ func TestTimeSeriesFilter(t *testing.T) {
})
})
t.Run("when data from query returns slo and alias by is defined", func(t *testing.T) {
data, err := loadTestFile("./test-data/6-series-response-slo.json")
require.NoError(t, err)
assert.Equal(t, 1, len(data.TimeSeries))
t.Run("and alias by is expanded", func(t *testing.T) {
res := &backend.DataResponse{}
query := &cloudMonitoringTimeSeriesList{
params: url.Values{},
parameters: &timeSeriesList{
ProjectName: "test-proj",
},
aliasBy: "{{project}} - {{service}} - {{slo}} - {{selector}}",
sloQ: &sloQuery{
SelectorName: "select_slo_compliance",
ServiceId: "test-service",
SloId: "test-slo",
},
}
err = query.parseResponse(res, data, "")
require.NoError(t, err)
frames := res.Frames
require.NoError(t, err)
assert.Equal(t, "test-proj - test-service - test-slo - select_slo_compliance", frames[0].Fields[1].Name)
})
})
t.Run("when data from query returns slo and alias by is not defined", func(t *testing.T) {
data, err := loadTestFile("./test-data/6-series-response-slo.json")
require.NoError(t, err)
assert.Equal(t, 1, len(data.TimeSeries))
t.Run("and alias by is expanded", func(t *testing.T) {
res := &backend.DataResponse{}
query := &cloudMonitoringTimeSeriesList{
params: url.Values{},
parameters: &timeSeriesList{
ProjectName: "test-proj",
},
sloQ: &sloQuery{
SelectorName: "select_slo_compliance",
ServiceId: "test-service",
SloId: "test-slo",
},
}
err = query.parseResponse(res, data, "")
require.NoError(t, err)
frames := res.Frames
require.NoError(t, err)
assert.Equal(t, "select_slo_compliance(\"projects/test-proj/services/test-service/serviceLevelObjectives/test-slo\")", frames[0].Fields[1].Name)
})
})
t.Run("Parse cloud monitoring unit", func(t *testing.T) {
t.Run("when mapping is found a unit should be specified on the field config", func(t *testing.T) {
data, err := loadTestFile("./test-data/1-series-response-agg-one-metric.json")
@@ -515,17 +462,29 @@ func TestTimeSeriesFilter(t *testing.T) {
})
})
t.Run("when data comes from a slo query, it should skip the link", func(t *testing.T) {
data, err := loadTestFile("./test-data/3-series-response-distribution-exponential.json")
require.NoError(t, err)
assert.Equal(t, 1, len(data.TimeSeries))
t.Run("when building filter string", func(t *testing.T) {
t.Run("and there's no regex operator", func(t *testing.T) {
t.Run("and there are wildcards in a filter value", func(t *testing.T) {
tsl := &cloudMonitoringTimeSeriesList{parameters: &timeSeriesList{Filters: []string{"metric.type", "=", "somemetrictype", "AND", "zone", "=", "*-central1*"}}}
value := tsl.getFilter()
assert.Equal(t, `metric.type="somemetrictype" zone=has_substring("-central1")`, value)
})
res := &backend.DataResponse{}
query := &cloudMonitoringTimeSeriesList{params: url.Values{}, sloQ: &sloQuery{SloId: "yes"}, parameters: &timeSeriesList{}}
err = query.parseResponse(res, data, "")
require.NoError(t, err)
frames := res.Frames
assert.Equal(t, len(frames[0].Fields[1].Config.Links), 0)
t.Run("and there are no wildcards in any filter value", func(t *testing.T) {
tsl := &cloudMonitoringTimeSeriesList{parameters: &timeSeriesList{Filters: []string{"metric.type", "=", "somemetrictype", "AND", "zone", "!=", "us-central1-a"}}}
value := tsl.getFilter()
assert.Equal(t, `metric.type="somemetrictype" zone!="us-central1-a"`, value)
})
})
t.Run("and there is a regex operator", func(t *testing.T) {
tsl := &cloudMonitoringTimeSeriesList{parameters: &timeSeriesList{Filters: []string{"metric.type", "=", "somemetrictype", "AND", "zone", "=~", "us-central1-a~"}}}
value := tsl.getFilter()
assert.NotContains(t, value, `=~`)
assert.Contains(t, value, `zone=`)
assert.Contains(t, value, `zone=monitoring.regex.full_match("us-central1-a~")`)
})
})
}

View File

@@ -1,23 +1,18 @@
package cloudmonitoring
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"net/url"
"path"
"strconv"
"strings"
"time"
"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana-plugin-sdk-go/data"
"go.opentelemetry.io/otel/attribute"
"github.com/huandu/xstrings"
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/infra/tracing"
"github.com/grafana/grafana/pkg/tsdb/intervalv2"
)
@@ -36,83 +31,17 @@ func (timeSeriesQuery *cloudMonitoringTimeSeriesQuery) appendGraphPeriod(req *ba
return ""
}
func doRequestQueryPage(log log.Logger, requestBody map[string]interface{}, r *http.Request, dsInfo datasourceInfo) (cloudMonitoringResponse, error) {
buf, err := json.Marshal(requestBody)
if err != nil {
return cloudMonitoringResponse{}, err
}
r.Body = io.NopCloser(bytes.NewBuffer(buf))
res, err := dsInfo.services[cloudMonitor].client.Do(r)
if err != nil {
return cloudMonitoringResponse{}, err
}
dnext, err := unmarshalResponse(log, res)
if err != nil {
return cloudMonitoringResponse{}, err
}
return dnext, nil
}
func (timeSeriesQuery *cloudMonitoringTimeSeriesQuery) run(ctx context.Context, req *backend.QueryDataRequest,
s *Service, dsInfo datasourceInfo, tracer tracing.Tracer) (*backend.DataResponse, cloudMonitoringResponse, string, error) {
dr := &backend.DataResponse{}
projectName := timeSeriesQuery.parameters.ProjectName
if projectName == "" {
var err error
projectName, err = s.getDefaultProject(ctx, dsInfo)
if err != nil {
dr.Error = err
return dr, cloudMonitoringResponse{}, "", nil
}
timeSeriesQuery.logger.Info("No project name set on query, using project name from datasource", "projectName", projectName)
}
timeSeriesQuery.parameters.Query += timeSeriesQuery.appendGraphPeriod(req)
from := req.Queries[0].TimeRange.From
to := req.Queries[0].TimeRange.To
timeFormat := "2006/01/02-15:04:05"
timeSeriesQuery.parameters.Query += fmt.Sprintf(" | within d'%s', d'%s'", from.UTC().Format(timeFormat), to.UTC().Format(timeFormat))
p := path.Join("/v3/projects", projectName, "timeSeries:query")
ctx, span := tracer.Start(ctx, "cloudMonitoring MQL query")
span.SetAttributes("query", timeSeriesQuery.parameters.Query, attribute.Key("query").String(timeSeriesQuery.parameters.Query))
span.SetAttributes("from", req.Queries[0].TimeRange.From, attribute.Key("from").String(req.Queries[0].TimeRange.From.String()))
span.SetAttributes("until", req.Queries[0].TimeRange.To, attribute.Key("until").String(req.Queries[0].TimeRange.To.String()))
defer span.End()
requestBody := map[string]interface{}{
"query": timeSeriesQuery.parameters.Query,
}
r, err := s.createRequest(timeSeriesQuery.logger, &dsInfo, p, bytes.NewBuffer([]byte{}))
if err != nil {
dr.Error = err
return dr, cloudMonitoringResponse{}, "", nil
}
tracer.Inject(ctx, r.Header, span)
r = r.WithContext(ctx)
d, err := doRequestQueryPage(timeSeriesQuery.logger, requestBody, r, dsInfo)
if err != nil {
dr.Error = err
return dr, cloudMonitoringResponse{}, "", nil
}
for d.NextPageToken != "" {
requestBody := map[string]interface{}{
"query": timeSeriesQuery.parameters.Query,
"pageToken": d.NextPageToken,
}
nextPage, err := doRequestQueryPage(timeSeriesQuery.logger, requestBody, r, dsInfo)
if err != nil {
dr.Error = err
return dr, cloudMonitoringResponse{}, "", nil
}
d.TimeSeriesData = append(d.TimeSeriesData, nextPage.TimeSeriesData...)
d.NextPageToken = nextPage.NextPageToken
}
return dr, d, timeSeriesQuery.parameters.Query, nil
return runTimeSeriesRequest(ctx, timeSeriesQuery.logger, req, s, dsInfo, tracer, timeSeriesQuery.parameters.ProjectName, nil, requestBody)
}
func (timeSeriesQuery *cloudMonitoringTimeSeriesQuery) parseResponse(queryRes *backend.DataResponse,
@@ -129,7 +58,7 @@ func (timeSeriesQuery *cloudMonitoringTimeSeriesQuery) parseResponse(queryRes *b
labels := make(map[string]string)
for n, d := range response.TimeSeriesDescriptor.LabelDescriptors {
key := toSnakeCase(d.Key)
key := xstrings.ToSnakeCase(d.Key)
key = strings.Replace(key, ".", ".label.", 1)
labelValue := series.LabelValues[n]
@@ -356,3 +285,16 @@ func (timeSeriesQuery *cloudMonitoringTimeSeriesQuery) buildDeepLink() string {
func (timeSeriesQuery *cloudMonitoringTimeSeriesQuery) getRefID() string {
return timeSeriesQuery.refID
}
func (timeSeriesQuery *cloudMonitoringTimeSeriesQuery) getAliasBy() string {
return timeSeriesQuery.aliasBy
}
func (timeSeriesQuery *cloudMonitoringTimeSeriesQuery) getParameter(i string) string {
switch i {
case "project":
return timeSeriesQuery.parameters.ProjectName
default:
return ""
}
}
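The MQL path keeps stitching the dashboard time range into the query text before handing off to the shared request helper; a small sketch of the " | within" suffix it appends (the fetch expression and the time range are hypothetical examples):

from := time.Date(2018, 3, 15, 13, 0, 0, 0, time.UTC) // illustrative range
to := time.Date(2018, 3, 15, 13, 34, 0, 0, time.UTC)
q := "fetch gce_instance" // hypothetical MQL
timeFormat := "2006/01/02-15:04:05"
q += fmt.Sprintf(" | within d'%s', d'%s'", from.UTC().Format(timeFormat), to.UTC().Format(timeFormat))
// q == "fetch gce_instance | within d'2018/03/15-13:00:00', d'2018/03/15-13:34:00'"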

View File

@@ -18,6 +18,8 @@ type (
parseResponse(dr *backend.DataResponse, data cloudMonitoringResponse, executedQueryString string) error
buildDeepLink() string
getRefID() string
getAliasBy() string
getParameter(i string) string
}
// Plugin API query data request used to generate
@@ -26,8 +28,7 @@ type (
AliasBy string `json:"aliasBy"`
TimeSeriesList *timeSeriesList `json:"timeSeriesList,omitempty"`
TimeSeriesQuery *timeSeriesQuery `json:"timeSeriesQuery,omitempty"`
// TODO: Merge SloQuery into TimeSeriesList
SloQuery *sloQuery `json:"sloQuery,omitempty"`
SloQuery *sloQuery `json:"sloQuery,omitempty"`
}
// These should reflect GCM APIs
@@ -45,12 +46,15 @@ type (
SecondaryPerSeriesAligner string `json:"secondaryPerSeriesAligner"`
SecondaryGroupBys []string `json:"secondaryGroupBys"`
}
// TODO: sloQuery can be specified as timeSeriesList parameters
// sloQuery is an internal convention but the API is the same as timeSeriesList
sloQuery struct {
SelectorName string `json:"selectorName"`
ServiceId string `json:"serviceId"`
SloId string `json:"sloId"`
LookbackPeriod string `json:"lookbackPeriod"`
ProjectName string `json:"projectName"`
SelectorName string `json:"selectorName"`
ServiceId string `json:"serviceId"`
SloId string `json:"sloId"`
AlignmentPeriod string `json:"alignmentPeriod"`
LookbackPeriod string `json:"lookbackPeriod"`
}
// timeSeries.query https://cloud.google.com/monitoring/api/ref_v3/rest/v3/projects.timeSeries/query
@@ -68,11 +72,19 @@ type (
aliasBy string
logger log.Logger
parameters *timeSeriesList
// TODO: Merge SloQuery into TimeSeriesList
sloQ *sloQuery
// Processed properties
params url.Values
}
// cloudMonitoringSLO is used to build time series with a filter but for the SLO case
cloudMonitoringSLO struct {
refID string
aliasBy string
logger log.Logger
parameters *sloQuery
// Processed properties
params url.Values
}
// cloudMonitoringTimeSeriesQuery is used to build MQL queries
cloudMonitoringTimeSeriesQuery struct {
refID string
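The getParameter indirection exists so formatLegendKeys can expand alias templates without knowing the concrete executor type; for the SLO executor every template key is backed by a sloQuery field. A sketch matching the alias test earlier in this diff:

query := &cloudMonitoringSLO{
	aliasBy: "{{project}} - {{service}} - {{slo}} - {{selector}}",
	parameters: &sloQuery{
		ProjectName:  "test-proj",
		ServiceId:    "test-service",
		SloId:        "test-slo",
		SelectorName: "select_slo_compliance",
	},
}
// formatLegendKeys resolves each {{...}} token through getParameter, producing:
// "test-proj - test-service - test-slo - select_slo_compliance"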

View File

@@ -1,33 +1,24 @@
package cloudmonitoring
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"net/url"
"path"
"strings"
"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana-plugin-sdk-go/data"
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/infra/tracing"
"github.com/grafana/grafana/pkg/tsdb/intervalv2"
"go.opentelemetry.io/otel/attribute"
)
func reverse(s string) string {
chars := []rune(s)
for i, j := 0, len(chars)-1; i < j; i, j = i+1, j-1 {
chars[i], chars[j] = chars[j], chars[i]
}
return string(chars)
}
func toSnakeCase(str string) string {
return strings.ToLower(matchAllCap.ReplaceAllString(str, "${1}_${2}"))
}
func containsLabel(labels []string, newLabel string) bool {
for _, val := range labels {
if val == newLabel {
return true
}
}
return false
}
func addInterval(period string, field *data.Field) error {
period = strings.TrimPrefix(period, "+")
p, err := intervalv2.ParseIntervalStringToTimeDuration(period)
@@ -52,3 +43,115 @@ func toString(v interface{}) string {
}
return v.(string)
}
func createRequest(ctx context.Context, logger log.Logger, dsInfo *datasourceInfo, proxyPass string, body io.Reader) (*http.Request, error) {
u, err := url.Parse(dsInfo.url)
if err != nil {
return nil, err
}
u.Path = path.Join(u.Path, "render")
method := http.MethodGet
if body != nil {
method = http.MethodPost
}
req, err := http.NewRequestWithContext(ctx, method, dsInfo.services[cloudMonitor].url, body)
if err != nil {
logger.Error("Failed to create request", "error", err)
return nil, fmt.Errorf("failed to create request: %w", err)
}
req.Header.Set("Content-Type", "application/json")
req.URL.Path = proxyPass
return req, nil
}
func doRequestPage(ctx context.Context, logger log.Logger, r *http.Request, dsInfo datasourceInfo, params url.Values, body map[string]interface{}) (cloudMonitoringResponse, error) {
if params != nil {
r.URL.RawQuery = params.Encode()
}
if body != nil {
buf, err := json.Marshal(body)
if err != nil {
return cloudMonitoringResponse{}, err
}
r.Body = io.NopCloser(bytes.NewBuffer(buf))
r.Method = http.MethodPost
}
res, err := dsInfo.services[cloudMonitor].client.Do(r)
if err != nil {
return cloudMonitoringResponse{}, err
}
dnext, err := unmarshalResponse(logger, res)
if err != nil {
return cloudMonitoringResponse{}, err
}
return dnext, nil
}
func doRequestWithPagination(ctx context.Context, logger log.Logger, r *http.Request, dsInfo datasourceInfo, params url.Values, body map[string]interface{}) (cloudMonitoringResponse, error) {
d, err := doRequestPage(ctx, logger, r, dsInfo, params, body)
if err != nil {
return cloudMonitoringResponse{}, err
}
for d.NextPageToken != "" {
if params != nil {
params["pageToken"] = []string{d.NextPageToken}
}
if body != nil {
body["pageToken"] = d.NextPageToken
}
nextPage, err := doRequestPage(ctx, logger, r, dsInfo, params, body)
if err != nil {
return cloudMonitoringResponse{}, err
}
d.TimeSeries = append(d.TimeSeries, nextPage.TimeSeries...)
d.TimeSeriesData = append(d.TimeSeriesData, nextPage.TimeSeriesData...)
d.NextPageToken = nextPage.NextPageToken
}
return d, nil
}
func traceReq(ctx context.Context, tracer tracing.Tracer, req *backend.QueryDataRequest, dsInfo datasourceInfo, r *http.Request, target string) tracing.Span {
ctx, span := tracer.Start(ctx, "cloudMonitoring query")
span.SetAttributes("target", target, attribute.Key("target").String(target))
span.SetAttributes("from", req.Queries[0].TimeRange.From, attribute.Key("from").String(req.Queries[0].TimeRange.From.String()))
span.SetAttributes("until", req.Queries[0].TimeRange.To, attribute.Key("until").String(req.Queries[0].TimeRange.To.String()))
span.SetAttributes("datasource_id", dsInfo.id, attribute.Key("datasource_id").Int64(dsInfo.id))
span.SetAttributes("org_id", req.PluginContext.OrgID, attribute.Key("org_id").Int64(req.PluginContext.OrgID))
tracer.Inject(ctx, r.Header, span)
return span
}
func runTimeSeriesRequest(ctx context.Context, logger log.Logger, req *backend.QueryDataRequest,
s *Service, dsInfo datasourceInfo, tracer tracing.Tracer, projectName string, params url.Values, body map[string]interface{}) (*backend.DataResponse, cloudMonitoringResponse, string, error) {
dr := &backend.DataResponse{}
projectName, err := s.ensureProject(ctx, dsInfo, projectName)
if err != nil {
dr.Error = err
return dr, cloudMonitoringResponse{}, "", nil
}
timeSeriesMethod := "timeSeries"
if body != nil {
timeSeriesMethod += ":query"
}
r, err := createRequest(ctx, logger, &dsInfo, path.Join("/v3/projects", projectName, timeSeriesMethod), nil)
if err != nil {
dr.Error = err
return dr, cloudMonitoringResponse{}, "", nil
}
span := traceReq(ctx, tracer, req, dsInfo, r, params.Encode())
defer span.End()
d, err := doRequestWithPagination(ctx, logger, r, dsInfo, params, body)
if err != nil {
dr.Error = err
return dr, cloudMonitoringResponse{}, "", nil
}
return dr, d, r.URL.RawQuery, nil
}
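runTimeSeriesRequest is now the single funnel for all three executors; the call sites from the hunks above, side by side (URL params for the GET-style list and SLO queries, a JSON body for the MQL POST):

// timeSeriesList:
return runTimeSeriesRequest(ctx, timeSeriesFilter.logger, req, s, dsInfo, tracer, timeSeriesFilter.parameters.ProjectName, timeSeriesFilter.params, nil)
// SLO:
return runTimeSeriesRequest(ctx, sloQ.logger, req, s, dsInfo, tracer, sloQ.parameters.ProjectName, sloQ.params, nil)
// MQL (nil params; the non-nil body switches the method to POST and the path to timeSeries:query):
return runTimeSeriesRequest(ctx, timeSeriesQuery.logger, req, s, dsInfo, tracer, timeSeriesQuery.parameters.ProjectName, nil, requestBody)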