diff --git a/docs/sources/datasources/cloudmonitoring.md b/docs/sources/datasources/cloudmonitoring.md index 2d80e29b553..38a97978eba 100644 --- a/docs/sources/datasources/cloudmonitoring.md +++ b/docs/sources/datasources/cloudmonitoring.md @@ -222,6 +222,27 @@ The Alias By field allows you to control the format of the legend keys for SLO q SLO queries use the same [alignment period functionality as metric queries]({{< relref "#metric-queries" >}}). +### MQL (Monitoring Query Language) queries + +> **Note:** Only available in Grafana v7.4+. + +The MQL query builder in the Google Cloud Monitoring data source allows you to display MQL results in time series format. To get an understanding of the basic concepts in MQL, refer to [Introduction to Monitoring Query Language](https://cloud.google.com/monitoring/mql). + +#### Create an MQL query + +To create an MQL query, follow these steps: + +1. In the **Query Type** list, select **Metrics**. +2. Click **<> Edit MQL** right next to the **Query Type** field. This will toggle the metric query builder mode so that raw MQL queries can be used. +3. Choose a project from the **Project** list. +4. Add the [MQL](https://cloud.google.com/monitoring/mql/query-language) query of your choice in the text area. + +#### Alias patterns for MQL queries + +MQL queries use the same alias patterns as [metric queries]({{< relref "#metric-queries" >}}). + +`{{metric.service}}` is not supported. `{{metric.type}}` and `{{metric.name}}` show the time series key in the response. + ## Templating Instead of hard-coding things like server, application and sensor name in your metric queries you can use variables in their place. diff --git a/pkg/tsdb/cloudmonitoring/annotation_query.go b/pkg/tsdb/cloudmonitoring/annotation_query.go index 3ad31965c0f..0aa5cea4a2f 100644 --- a/pkg/tsdb/cloudmonitoring/annotation_query.go +++ b/pkg/tsdb/cloudmonitoring/annotation_query.go @@ -2,9 +2,7 @@ package cloudmonitoring import ( "context" - "strconv" "strings" - "time" "github.com/grafana/grafana/pkg/tsdb" ) @@ -16,12 +14,12 @@ func (e *CloudMonitoringExecutor) executeAnnotationQuery(ctx context.Context, ts firstQuery := tsdbQuery.Queries[0] - queries, err := e.buildQueries(tsdbQuery) + queries, err := e.buildQueryExecutors(tsdbQuery) if err != nil { return nil, err } - queryRes, resp, err := e.executeQuery(ctx, queries[0], tsdbQuery) + queryRes, resp, _, err := queries[0].run(ctx, tsdbQuery, e) if err != nil { return nil, err } @@ -30,36 +28,12 @@ func (e *CloudMonitoringExecutor) executeAnnotationQuery(ctx context.Context, ts title := metricQuery.Get("title").MustString() text := metricQuery.Get("text").MustString() tags := metricQuery.Get("tags").MustString() - err = e.parseToAnnotations(queryRes, resp, queries[0], title, text, tags) + err = queries[0].parseToAnnotations(queryRes, resp, title, text, tags) result.Results[firstQuery.RefId] = queryRes return result, err } -func (e *CloudMonitoringExecutor) parseToAnnotations(queryRes *tsdb.QueryResult, data cloudMonitoringResponse, query *cloudMonitoringQuery, title string, text string, tags string) error { - annotations := make([]map[string]string, 0) - - for _, series := range data.TimeSeries { - // reverse the order to be ascending - for i := len(series.Points) - 1; i >= 0; i-- { - point := series.Points[i] - value := strconv.FormatFloat(point.Value.DoubleValue, 'f', 6, 64) - if series.ValueType == "STRING" { - value = point.Value.StringValue - } - annotation := make(map[string]string) - annotation["time"] = point.Interval.EndTime.UTC().Format(time.RFC3339) - annotation["title"] = formatAnnotationText(title, value, series.Metric.Type, series.Metric.Labels, series.Resource.Labels) - annotation["tags"] = tags - annotation["text"] = formatAnnotationText(text, value, series.Metric.Type, series.Metric.Labels, series.Resource.Labels) - annotations = append(annotations, annotation) - } - } - - transformAnnotationToTable(annotations, queryRes) - return nil -} - func transformAnnotationToTable(data []map[string]string, result *tsdb.QueryResult) { table := &tsdb.Table{ Columns: make([]tsdb.TableColumn, 4), diff --git a/pkg/tsdb/cloudmonitoring/annotation_query_test.go b/pkg/tsdb/cloudmonitoring/annotation_query_test.go index 091471ee82f..b0d50acf140 100644 --- a/pkg/tsdb/cloudmonitoring/annotation_query_test.go +++ b/pkg/tsdb/cloudmonitoring/annotation_query_test.go @@ -15,10 +15,9 @@ func TestCloudMonitoringExecutor_parseToAnnotations(t *testing.T) { require.Len(t, data.TimeSeries, 3) res := &tsdb.QueryResult{Meta: simplejson.New(), RefId: "annotationQuery"} - query := &cloudMonitoringQuery{} + query := &cloudMonitoringTimeSeriesFilter{} - executor := &CloudMonitoringExecutor{} - err = executor.parseToAnnotations(res, data, query, "atitle {{metric.label.instance_name}} {{metric.value}}", "atext {{resource.label.zone}}", "atag") + err = query.parseToAnnotations(res, data, "atitle {{metric.label.instance_name}} {{metric.value}}", "atext {{resource.label.zone}}", "atag") require.NoError(t, err) require.Len(t, res.Tables, 1) diff --git a/pkg/tsdb/cloudmonitoring/cloudmonitoring.go b/pkg/tsdb/cloudmonitoring/cloudmonitoring.go index 7065521e4f1..d716334b8eb 100644 --- a/pkg/tsdb/cloudmonitoring/cloudmonitoring.go +++ b/pkg/tsdb/cloudmonitoring/cloudmonitoring.go @@ -5,6 +5,7 @@ import ( "encoding/json" "errors" "fmt" + "io" "io/ioutil" "math" "net/http" @@ -23,9 +24,6 @@ import ( "github.com/grafana/grafana/pkg/plugins" "github.com/grafana/grafana/pkg/setting" "github.com/grafana/grafana/pkg/tsdb" - "github.com/grafana/grafana/pkg/tsdb/sqleng" - "github.com/opentracing/opentracing-go" - "golang.org/x/net/context/ctxhttp" "golang.org/x/oauth2/google" ) @@ -61,6 +59,7 @@ const ( jwtAuthentication string = "jwt" metricQueryType string = "metrics" sloQueryType string = "slo" + mqlEditorMode string = "mql" ) // CloudMonitoringExecutor executes queries for the CloudMonitoring datasource @@ -127,117 +126,30 @@ func (e *CloudMonitoringExecutor) getGCEDefaultProject(ctx context.Context, tsdb return result, nil } -func (query *cloudMonitoringQuery) isSLO() bool { - return query.Slo != "" -} - -func (query *cloudMonitoringQuery) buildDeepLink() string { - if query.isSLO() { - return "" - } - - filter := query.Params.Get("filter") - if !strings.Contains(filter, "resource.type=") { - resourceType := query.Params.Get("resourceType") - if resourceType == "" { - slog.Error("Failed to generate deep link: no resource type found", "ProjectName", query.ProjectName, "query", query.RefID) - return "" - } - filter = fmt.Sprintf(`resource.type="%s" %s`, resourceType, filter) - } - - u, err := url.Parse("https://console.cloud.google.com/monitoring/metrics-explorer") - if err != nil { - slog.Error("Failed to generate deep link: unable to parse metrics explorer URL", "ProjectName", query.ProjectName, "query", query.RefID) - return "" - } - - q := u.Query() - q.Set("project", query.ProjectName) - q.Set("Grafana_deeplink", "true") - - pageState := map[string]interface{}{ - "xyChart": map[string]interface{}{ - "constantLines": []string{}, - "dataSets": []map[string]interface{}{ - { - "timeSeriesFilter": map[string]interface{}{ - "aggregations": []string{}, - "crossSeriesReducer": query.Params.Get("aggregation.crossSeriesReducer"), - "filter": filter, - "groupByFields": query.Params["aggregation.groupByFields"], - "minAlignmentPeriod": strings.TrimPrefix(query.Params.Get("aggregation.alignmentPeriod"), "+"), // get rid of leading + - "perSeriesAligner": query.Params.Get("aggregation.perSeriesAligner"), - "secondaryGroupByFields": []string{}, - "unitOverride": "1", - }, - }, - }, - "timeshiftDuration": "0s", - "y1Axis": map[string]string{ - "label": "y1Axis", - "scale": "LINEAR", - }, - }, - "timeSelection": map[string]string{ - "timeRange": "custom", - "start": query.Params.Get("interval.startTime"), - "end": query.Params.Get("interval.endTime"), - }, - } - - blob, err := json.Marshal(pageState) - if err != nil { - slog.Error("Failed to generate deep link", "pageState", pageState, "ProjectName", query.ProjectName, "query", query.RefID) - return "" - } - - q.Set("pageState", string(blob)) - u.RawQuery = q.Encode() - - accountChooserURL, err := url.Parse("https://accounts.google.com/AccountChooser") - if err != nil { - slog.Error("Failed to generate deep link: unable to parse account chooser URL", "ProjectName", query.ProjectName, "query", query.RefID) - return "" - } - accountChooserQuery := accountChooserURL.Query() - accountChooserQuery.Set("continue", u.String()) - accountChooserURL.RawQuery = accountChooserQuery.Encode() - - return accountChooserURL.String() -} - func (e *CloudMonitoringExecutor) executeTimeSeriesQuery(ctx context.Context, tsdbQuery *tsdb.TsdbQuery) (*tsdb.Response, error) { result := &tsdb.Response{ Results: make(map[string]*tsdb.QueryResult), } - queries, err := e.buildQueries(tsdbQuery) + queryExecutors, err := e.buildQueryExecutors(tsdbQuery) if err != nil { return nil, err } - unit := e.resolvePanelUnitFromQueries(queries) + unit := e.resolvePanelUnitFromQueries(queryExecutors) - for _, query := range queries { - queryRes, resp, err := e.executeQuery(ctx, query, tsdbQuery) + for _, queryExecutor := range queryExecutors { + queryRes, resp, executedQueryString, err := queryExecutor.run(ctx, tsdbQuery, e) if err != nil { return nil, err } - - resourceType := "" - for _, s := range resp.TimeSeries { - resourceType = s.Resource.Type - // set the first resource type found - break - } - query.Params.Set("resourceType", resourceType) - - err = e.parseResponse(queryRes, resp, query) + err = queryExecutor.parseResponse(queryRes, resp, executedQueryString) if err != nil { queryRes.Error = err } + result.Results[queryExecutor.getRefID()] = queryRes + if len(unit) > 0 { frames, _ := queryRes.Dataframes.Decoded() for i := range frames { @@ -248,20 +160,20 @@ func (e *CloudMonitoringExecutor) executeTimeSeriesQuery(ctx context.Context, ts } queryRes.Dataframes = tsdb.NewDecodedDataFrames(frames) } - result.Results[query.RefID] = queryRes + result.Results[queryExecutor.getRefID()] = queryRes } return result, nil } -func (e *CloudMonitoringExecutor) resolvePanelUnitFromQueries(queries []*cloudMonitoringQuery) string { - if len(queries) == 0 { +func (e *CloudMonitoringExecutor) resolvePanelUnitFromQueries(executors []cloudMonitoringQueryExecutor) string { + if len(executors) == 0 { return "" } - unit := queries[0].Unit - if len(queries) > 1 { - for _, query := range queries[1:] { - if query.Unit != unit { + unit := executors[0].getUnit() + if len(executors) > 1 { + for _, query := range executors[1:] { + if query.getUnit() != unit { return "" } } @@ -274,8 +186,8 @@ func (e *CloudMonitoringExecutor) resolvePanelUnitFromQueries(queries []*cloudMo return "" } -func (e *CloudMonitoringExecutor) buildQueries(tsdbQuery *tsdb.TsdbQuery) ([]*cloudMonitoringQuery, error) { - cloudMonitoringQueries := []*cloudMonitoringQuery{} +func (e *CloudMonitoringExecutor) buildQueryExecutors(tsdbQuery *tsdb.TsdbQuery) ([]cloudMonitoringQueryExecutor, error) { + cloudMonitoringQueryExecutors := []cloudMonitoringQueryExecutor{} startTime, err := tsdbQuery.TimeRange.ParseFrom() if err != nil { @@ -301,43 +213,59 @@ func (e *CloudMonitoringExecutor) buildQueries(tsdbQuery *tsdb.TsdbQuery) ([]*cl params.Add("interval.startTime", startTime.UTC().Format(time.RFC3339)) params.Add("interval.endTime", endTime.UTC().Format(time.RFC3339)) - sq := &cloudMonitoringQuery{ + var queryInterface cloudMonitoringQueryExecutor + cmtsf := &cloudMonitoringTimeSeriesFilter{ RefID: query.RefId, GroupBys: []string{}, } - if q.QueryType == metricQueryType { - sq.AliasBy = q.MetricQuery.AliasBy - sq.GroupBys = append(sq.GroupBys, q.MetricQuery.GroupBys...) - sq.ProjectName = q.MetricQuery.ProjectName - if q.MetricQuery.View == "" { - q.MetricQuery.View = "FULL" + switch q.QueryType { + case metricQueryType: + if q.MetricQuery.EditorMode == mqlEditorMode { + queryInterface = &cloudMonitoringTimeSeriesQuery{ + RefID: query.RefId, + ProjectName: q.MetricQuery.ProjectName, + Query: q.MetricQuery.Query, + IntervalMS: query.IntervalMs, + AliasBy: q.MetricQuery.AliasBy, + timeRange: tsdbQuery.TimeRange, + } + } else { + cmtsf.AliasBy = q.MetricQuery.AliasBy + cmtsf.ProjectName = q.MetricQuery.ProjectName + cmtsf.GroupBys = append(cmtsf.GroupBys, q.MetricQuery.GroupBys...) + if q.MetricQuery.View == "" { + q.MetricQuery.View = "FULL" + } + params.Add("filter", buildFilterString(q.MetricQuery.MetricType, q.MetricQuery.Filters)) + params.Add("view", q.MetricQuery.View) + setMetricAggParams(¶ms, &q.MetricQuery, durationSeconds, query.IntervalMs) + queryInterface = cmtsf } - params.Add("filter", buildFilterString(q.MetricQuery.MetricType, q.MetricQuery.Filters)) - params.Add("view", q.MetricQuery.View) - setMetricAggParams(¶ms, &q.MetricQuery, durationSeconds, query.IntervalMs) - } else if q.QueryType == sloQueryType { - sq.AliasBy = q.SloQuery.AliasBy - sq.ProjectName = q.SloQuery.ProjectName - sq.Selector = q.SloQuery.SelectorName - sq.Service = q.SloQuery.ServiceId - sq.Slo = q.SloQuery.SloId + case sloQueryType: + cmtsf.AliasBy = q.SloQuery.AliasBy + cmtsf.ProjectName = q.SloQuery.ProjectName + cmtsf.Selector = q.SloQuery.SelectorName + cmtsf.Service = q.SloQuery.ServiceId + cmtsf.Slo = q.SloQuery.SloId params.Add("filter", buildSLOFilterExpression(q.SloQuery)) setSloAggParams(¶ms, &q.SloQuery, durationSeconds, query.IntervalMs) + queryInterface = cmtsf } target = params.Encode() - sq.Target = target - sq.Params = params - sq.Unit = q.MetricQuery.Unit + cmtsf.Target = target + cmtsf.Params = params + cmtsf.Unit = q.MetricQuery.Unit + if setting.Env == setting.Dev { slog.Debug("CloudMonitoring request", "params", params) } - cloudMonitoringQueries = append(cloudMonitoringQueries, sq) + cloudMonitoringQueryExecutors = append(cloudMonitoringQueryExecutors, queryInterface) } - return cloudMonitoringQueries, nil + return cloudMonitoringQueryExecutors, nil } func migrateLegacyQueryModel(query *tsdb.Query) { @@ -461,289 +389,6 @@ func calculateAlignmentPeriod(alignmentPeriod string, intervalMs int64, duration return alignmentPeriod } -func (e *CloudMonitoringExecutor) executeQuery(ctx context.Context, query *cloudMonitoringQuery, tsdbQuery *tsdb.TsdbQuery) (*tsdb.QueryResult, cloudMonitoringResponse, error) { - queryResult := &tsdb.QueryResult{Meta: simplejson.New(), RefId: query.RefID} - projectName := query.ProjectName - if projectName == "" { - defaultProject, err := e.getDefaultProject(ctx) - if err != nil { - queryResult.Error = err - return queryResult, cloudMonitoringResponse{}, nil - } - projectName = defaultProject - slog.Info("No project name set on query, using project name from datasource", "projectName", projectName) - } - - req, err := e.createRequest(ctx, e.dsInfo, query, fmt.Sprintf("cloudmonitoring%s", "v3/projects/"+projectName+"/timeSeries")) - if err != nil { - queryResult.Error = err - return queryResult, cloudMonitoringResponse{}, nil - } - - req.URL.RawQuery = query.Params.Encode() - queryResult.Meta.Set(sqleng.MetaKeyExecutedQueryString, req.URL.RawQuery) - alignmentPeriod, ok := req.URL.Query()["aggregation.alignmentPeriod"] - - if ok { - seconds, err := strconv.ParseInt(alignmentPeriodRe.FindString(alignmentPeriod[0]), 10, 64) - if err == nil { - queryResult.Meta.Set("alignmentPeriod", seconds) - } - } - - span, ctx := opentracing.StartSpanFromContext(ctx, "cloudMonitoring query") - span.SetTag("target", query.Target) - span.SetTag("from", tsdbQuery.TimeRange.From) - span.SetTag("until", tsdbQuery.TimeRange.To) - span.SetTag("datasource_id", e.dsInfo.Id) - span.SetTag("org_id", e.dsInfo.OrgId) - - defer span.Finish() - - if err := opentracing.GlobalTracer().Inject( - span.Context(), - opentracing.HTTPHeaders, - opentracing.HTTPHeadersCarrier(req.Header)); err != nil { - queryResult.Error = err - return queryResult, cloudMonitoringResponse{}, nil - } - - res, err := ctxhttp.Do(ctx, e.httpClient, req) - if err != nil { - queryResult.Error = err - return queryResult, cloudMonitoringResponse{}, nil - } - - data, err := e.unmarshalResponse(res) - if err != nil { - queryResult.Error = err - return queryResult, cloudMonitoringResponse{}, nil - } - - return queryResult, data, nil -} - -func (e *CloudMonitoringExecutor) unmarshalResponse(res *http.Response) (cloudMonitoringResponse, error) { - body, err := ioutil.ReadAll(res.Body) - if err != nil { - return cloudMonitoringResponse{}, err - } - defer func() { - if err := res.Body.Close(); err != nil { - slog.Warn("Failed to close response body", "err", err) - } - }() - - if res.StatusCode/100 != 2 { - slog.Error("Request failed", "status", res.Status, "body", string(body)) - return cloudMonitoringResponse{}, fmt.Errorf(string(body)) - } - - var data cloudMonitoringResponse - err = json.Unmarshal(body, &data) - if err != nil { - slog.Error("Failed to unmarshal CloudMonitoring response", "error", err, "status", res.Status, "body", string(body)) - return cloudMonitoringResponse{}, err - } - - return data, nil -} - -func handleDistributionSeries(series timeSeries, defaultMetricName string, seriesLabels map[string]string, - query *cloudMonitoringQuery, queryRes *tsdb.QueryResult, frame *data.Frame) { - for i := 0; i < len(series.Points); i++ { - point := series.Points[i] - value := point.Value.DoubleValue - - if series.ValueType == "INT64" { - parsedValue, err := strconv.ParseFloat(point.Value.IntValue, 64) - if err == nil { - value = parsedValue - } - } - - if series.ValueType == "BOOL" { - if point.Value.BoolValue { - value = 1 - } else { - value = 0 - } - } - frame.SetRow(len(series.Points)-1-i, point.Interval.EndTime, value) - } - - metricName := formatLegendKeys(series.Metric.Type, defaultMetricName, seriesLabels, nil, query) - dataField := frame.Fields[1] - dataField.Name = metricName -} - -func (e *CloudMonitoringExecutor) parseResponse(queryRes *tsdb.QueryResult, cmr cloudMonitoringResponse, query *cloudMonitoringQuery) error { - labels := make(map[string]map[string]bool) - frames := data.Frames{} - for _, series := range cmr.TimeSeries { - seriesLabels := data.Labels{} - defaultMetricName := series.Metric.Type - - labels["resource.type"] = map[string]bool{series.Resource.Type: true} - seriesLabels["resource.type"] = series.Resource.Type - - frame := data.NewFrameOfFieldTypes("", len(series.Points), data.FieldTypeTime, data.FieldTypeFloat64) - frame.RefID = query.RefID - - for key, value := range series.Metric.Labels { - if _, ok := labels["metric.label."+key]; !ok { - labels["metric.label."+key] = map[string]bool{} - } - labels["metric.label."+key][value] = true - seriesLabels["metric.label."+key] = value - - if len(query.GroupBys) == 0 || containsLabel(query.GroupBys, "metric.label."+key) { - defaultMetricName += " " + value - } - } - - for key, value := range series.Resource.Labels { - if _, ok := labels["resource.label."+key]; !ok { - labels["resource.label."+key] = map[string]bool{} - } - labels["resource.label."+key][value] = true - seriesLabels["resource.label."+key] = value - - if containsLabel(query.GroupBys, "resource.label."+key) { - defaultMetricName += " " + value - } - } - - for labelType, labelTypeValues := range series.MetaData { - for labelKey, labelValue := range labelTypeValues { - key := toSnakeCase(fmt.Sprintf("metadata.%s.%s", labelType, labelKey)) - if _, ok := labels[key]; !ok { - labels[key] = map[string]bool{} - } - - switch v := labelValue.(type) { - case string: - labels[key][v] = true - seriesLabels[key] = v - case bool: - strVal := strconv.FormatBool(v) - labels[key][strVal] = true - seriesLabels[key] = strVal - case []interface{}: - for _, v := range v { - strVal := v.(string) - labels[key][strVal] = true - if len(seriesLabels[key]) > 0 { - strVal = fmt.Sprintf("%s, %s", seriesLabels[key], strVal) - } - seriesLabels[key] = strVal - } - } - } - } - - // reverse the order to be ascending - if series.ValueType != "DISTRIBUTION" { - handleDistributionSeries( - series, defaultMetricName, seriesLabels, query, queryRes, frame) - frames = append(frames, frame) - } else { - buckets := make(map[int]*data.Frame) - for i := len(series.Points) - 1; i >= 0; i-- { - point := series.Points[i] - if len(point.Value.DistributionValue.BucketCounts) == 0 { - continue - } - maxKey := 0 - for i := 0; i < len(point.Value.DistributionValue.BucketCounts); i++ { - value, err := strconv.ParseFloat(point.Value.DistributionValue.BucketCounts[i], 64) - if err != nil { - continue - } - if _, ok := buckets[i]; !ok { - // set lower bounds - // https://cloud.google.com/monitoring/api/ref_v3/rest/v3/TimeSeries#Distribution - bucketBound := calcBucketBound(point.Value.DistributionValue.BucketOptions, i) - additionalLabels := map[string]string{"bucket": bucketBound} - - timeField := data.NewField(data.TimeSeriesTimeFieldName, nil, []time.Time{}) - valueField := data.NewField(data.TimeSeriesValueFieldName, nil, []float64{}) - - frameName := formatLegendKeys(series.Metric.Type, defaultMetricName, nil, additionalLabels, query) - valueField.Name = frameName - buckets[i] = &data.Frame{ - Name: frameName, - Fields: []*data.Field{ - timeField, - valueField, - }, - RefID: query.RefID, - } - - if maxKey < i { - maxKey = i - } - } - buckets[i].AppendRow(point.Interval.EndTime, value) - } - for i := 0; i < maxKey; i++ { - if _, ok := buckets[i]; !ok { - bucketBound := calcBucketBound(point.Value.DistributionValue.BucketOptions, i) - additionalLabels := data.Labels{"bucket": bucketBound} - timeField := data.NewField(data.TimeSeriesTimeFieldName, nil, []time.Time{}) - valueField := data.NewField(data.TimeSeriesValueFieldName, nil, []float64{}) - frameName := formatLegendKeys(series.Metric.Type, defaultMetricName, seriesLabels, additionalLabels, query) - valueField.Name = frameName - buckets[i] = &data.Frame{ - Name: frameName, - Fields: []*data.Field{ - timeField, - valueField, - }, - RefID: query.RefID, - } - } - } - } - for i := 0; i < len(buckets); i++ { - frames = append(frames, buckets[i]) - } - } - } - if len(cmr.TimeSeries) > 0 { - frames = addConfigData(frames, query) - } - - queryRes.Dataframes = tsdb.NewDecodedDataFrames(frames) - - labelsByKey := make(map[string][]string) - for key, values := range labels { - for value := range values { - labelsByKey[key] = append(labelsByKey[key], value) - } - } - - queryRes.Meta.Set("labels", labelsByKey) - queryRes.Meta.Set("groupBys", query.GroupBys) - return nil -} - -func addConfigData(frames data.Frames, query *cloudMonitoringQuery) data.Frames { - dl := query.buildDeepLink() - for i := range frames { - if frames[i].Fields[1].Config == nil { - frames[i].Fields[1].Config = &data.FieldConfig{} - } - deepLink := data.DataLink{ - Title: "View in Metrics Explorer", - TargetBlank: true, - URL: dl, - } - frames[i].Fields[1].Config.Links = append(frames[i].Fields[1].Config.Links, deepLink) - } - return frames -} - func toSnakeCase(str string) string { return strings.ToLower(matchAllCap.ReplaceAllString(str, "${1}_${2}")) } @@ -757,7 +402,7 @@ func containsLabel(labels []string, newLabel string) bool { return false } -func formatLegendKeys(metricType string, defaultMetricName string, labels map[string]string, additionalLabels map[string]string, query *cloudMonitoringQuery) string { +func formatLegendKeys(metricType string, defaultMetricName string, labels map[string]string, additionalLabels map[string]string, query *cloudMonitoringTimeSeriesFilter) string { if query.AliasBy == "" { return defaultMetricName } @@ -843,14 +488,18 @@ func calcBucketBound(bucketOptions cloudMonitoringBucketOptions, n int) string { return bucketBound } -func (e *CloudMonitoringExecutor) createRequest(ctx context.Context, dsInfo *models.DataSource, query *cloudMonitoringQuery, proxyPass string) (*http.Request, error) { +func (e *CloudMonitoringExecutor) createRequest(ctx context.Context, dsInfo *models.DataSource, proxyPass string, body io.Reader) (*http.Request, error) { u, err := url.Parse(dsInfo.Url) if err != nil { return nil, err } u.Path = path.Join(u.Path, "render") - req, err := http.NewRequest(http.MethodGet, "https://monitoring.googleapis.com/", nil) + method := http.MethodGet + if body != nil { + method = http.MethodPost + } + req, err := http.NewRequest(method, "https://monitoring.googleapis.com/", body) if err != nil { slog.Error("Failed to create request", "error", err) return nil, fmt.Errorf("failed to create request: %w", err) @@ -897,3 +546,45 @@ func (e *CloudMonitoringExecutor) getDefaultProject(ctx context.Context) (string } return e.dsInfo.JsonData.Get("defaultProject").MustString(), nil } + +func unmarshalResponse(res *http.Response) (cloudMonitoringResponse, error) { + body, err := ioutil.ReadAll(res.Body) + if err != nil { + return cloudMonitoringResponse{}, err + } + + defer func() { + if err := res.Body.Close(); err != nil { + slog.Warn("Failed to close response body", "err", err) + } + }() + + if res.StatusCode/100 != 2 { + slog.Error("Request failed", "status", res.Status, "body", string(body)) + return cloudMonitoringResponse{}, fmt.Errorf("query failed: %s", string(body)) + } + + var data cloudMonitoringResponse + err = json.Unmarshal(body, &data) + if err != nil { + slog.Error("Failed to unmarshal CloudMonitoring response", "error", err, "status", res.Status, "body", string(body)) + return cloudMonitoringResponse{}, fmt.Errorf("failed to unmarshal query response: %w", err) + } + + return data, nil +} + +func addConfigData(frames data.Frames, dl string) data.Frames { + for i := range frames { + if frames[i].Fields[1].Config == nil { + frames[i].Fields[1].Config = &data.FieldConfig{} + } + deepLink := data.DataLink{ + Title: "View in Metrics Explorer", + TargetBlank: true, + URL: dl, + } + frames[i].Fields[1].Config.Links = append(frames[i].Fields[1].Config.Links, deepLink) + } + return frames +} diff --git a/pkg/tsdb/cloudmonitoring/cloudmonitoring_test.go b/pkg/tsdb/cloudmonitoring/cloudmonitoring_test.go index 2766413b338..0caebfbde56 100644 --- a/pkg/tsdb/cloudmonitoring/cloudmonitoring_test.go +++ b/pkg/tsdb/cloudmonitoring/cloudmonitoring_test.go @@ -40,8 +40,9 @@ func TestCloudMonitoring(t *testing.T) { } Convey("and query has no aggregation set", func() { - queries, err := executor.buildQueries(tsdbQuery) + qes, err := executor.buildQueryExecutors(tsdbQuery) So(err, ShouldBeNil) + queries := getCloudMonitoringQueriesFromInterface(qes) So(len(queries), ShouldEqual, 1) So(queries[0].RefID, ShouldEqual, "A") @@ -55,13 +56,10 @@ func TestCloudMonitoring(t *testing.T) { So(queries[0].AliasBy, ShouldEqual, "testalias") Convey("and generated deep link has correct parameters", func() { - dl := queries[0].buildDeepLink() - So(dl, ShouldBeEmpty) // no resource type found - // assign resource type to query parameters to be included in the deep link filter // in the actual workflow this information comes from the response of the Monitoring API queries[0].Params.Set("resourceType", "a/resource/type") - dl = queries[0].buildDeepLink() + dl := queries[0].buildDeepLink() expectedTimeSelection := map[string]string{ "timeRange": "custom", @@ -82,8 +80,9 @@ func TestCloudMonitoring(t *testing.T) { "filters": []interface{}{"key", "=", "value", "AND", "key2", "=", "value2", "AND", "resource.type", "=", "another/resource/type"}, }) - queries, err := executor.buildQueries(tsdbQuery) + qes, err := executor.buildQueryExecutors(tsdbQuery) So(err, ShouldBeNil) + queries := getCloudMonitoringQueriesFromInterface(qes) So(len(queries), ShouldEqual, 1) So(queries[0].Params["filter"][0], ShouldEqual, `metric.type="a/metric/type" key="value" key2="value2" resource.type="another/resource/type"`) @@ -114,8 +113,9 @@ func TestCloudMonitoring(t *testing.T) { "filters": []interface{}{"key", "=", "value", "AND", "key2", "=", "value2"}, }) - queries, err := executor.buildQueries(tsdbQuery) + qes, err := executor.buildQueryExecutors(tsdbQuery) So(err, ShouldBeNil) + queries := getCloudMonitoringQueriesFromInterface(qes) So(queries[0].Params["aggregation.alignmentPeriod"][0], ShouldEqual, `+1000s`) Convey("and generated deep link has correct parameters", func() { @@ -142,8 +142,9 @@ func TestCloudMonitoring(t *testing.T) { "filters": []interface{}{"key", "=", "value", "AND", "key2", "=", "value2"}, }) - queries, err := executor.buildQueries(tsdbQuery) + qes, err := executor.buildQueryExecutors(tsdbQuery) So(err, ShouldBeNil) + queries := getCloudMonitoringQueriesFromInterface(qes) So(queries[0].Params["aggregation.alignmentPeriod"][0], ShouldEqual, `+60s`) Convey("and generated deep link has correct parameters", func() { @@ -174,8 +175,9 @@ func TestCloudMonitoring(t *testing.T) { "alignmentPeriod": "cloud-monitoring-auto", }) - queries, err := executor.buildQueries(tsdbQuery) + qes, err := executor.buildQueryExecutors(tsdbQuery) So(err, ShouldBeNil) + queries := getCloudMonitoringQueriesFromInterface(qes) So(queries[0].Params["aggregation.alignmentPeriod"][0], ShouldEqual, `+60s`) }) @@ -187,8 +189,9 @@ func TestCloudMonitoring(t *testing.T) { "alignmentPeriod": "cloud-monitoring-auto", }) - queries, err := executor.buildQueries(tsdbQuery) + qes, err := executor.buildQueryExecutors(tsdbQuery) So(err, ShouldBeNil) + queries := getCloudMonitoringQueriesFromInterface(qes) So(queries[0].Params["aggregation.alignmentPeriod"][0], ShouldEqual, `+60s`) }) @@ -200,8 +203,9 @@ func TestCloudMonitoring(t *testing.T) { "alignmentPeriod": "cloud-monitoring-auto", }) - queries, err := executor.buildQueries(tsdbQuery) + qes, err := executor.buildQueryExecutors(tsdbQuery) So(err, ShouldBeNil) + queries := getCloudMonitoringQueriesFromInterface(qes) So(queries[0].Params["aggregation.alignmentPeriod"][0], ShouldEqual, `+300s`) }) @@ -213,8 +217,9 @@ func TestCloudMonitoring(t *testing.T) { "alignmentPeriod": "cloud-monitoring-auto", }) - queries, err := executor.buildQueries(tsdbQuery) + qes, err := executor.buildQueryExecutors(tsdbQuery) So(err, ShouldBeNil) + queries := getCloudMonitoringQueriesFromInterface(qes) So(queries[0].Params["aggregation.alignmentPeriod"][0], ShouldEqual, `+3600s`) }) }) @@ -228,8 +233,9 @@ func TestCloudMonitoring(t *testing.T) { "alignmentPeriod": "stackdriver-auto", }) - queries, err := executor.buildQueries(tsdbQuery) + qes, err := executor.buildQueryExecutors(tsdbQuery) So(err, ShouldBeNil) + queries := getCloudMonitoringQueriesFromInterface(qes) So(queries[0].Params["aggregation.alignmentPeriod"][0], ShouldEqual, `+60s`) Convey("and generated deep link has correct parameters", func() { @@ -258,8 +264,9 @@ func TestCloudMonitoring(t *testing.T) { "alignmentPeriod": "stackdriver-auto", }) - queries, err := executor.buildQueries(tsdbQuery) + qes, err := executor.buildQueryExecutors(tsdbQuery) So(err, ShouldBeNil) + queries := getCloudMonitoringQueriesFromInterface(qes) So(queries[0].Params["aggregation.alignmentPeriod"][0], ShouldEqual, `+60s`) Convey("and generated deep link has correct parameters", func() { @@ -288,8 +295,9 @@ func TestCloudMonitoring(t *testing.T) { "alignmentPeriod": "stackdriver-auto", }) - queries, err := executor.buildQueries(tsdbQuery) + qes, err := executor.buildQueryExecutors(tsdbQuery) So(err, ShouldBeNil) + queries := getCloudMonitoringQueriesFromInterface(qes) So(queries[0].Params["aggregation.alignmentPeriod"][0], ShouldEqual, `+300s`) Convey("and generated deep link has correct parameters", func() { @@ -318,8 +326,9 @@ func TestCloudMonitoring(t *testing.T) { "alignmentPeriod": "stackdriver-auto", }) - queries, err := executor.buildQueries(tsdbQuery) + qes, err := executor.buildQueryExecutors(tsdbQuery) So(err, ShouldBeNil) + queries := getCloudMonitoringQueriesFromInterface(qes) So(queries[0].Params["aggregation.alignmentPeriod"][0], ShouldEqual, `+3600s`) Convey("and generated deep link has correct parameters", func() { @@ -348,8 +357,9 @@ func TestCloudMonitoring(t *testing.T) { "alignmentPeriod": "+600s", }) - queries, err := executor.buildQueries(tsdbQuery) + qes, err := executor.buildQueryExecutors(tsdbQuery) So(err, ShouldBeNil) + queries := getCloudMonitoringQueriesFromInterface(qes) So(queries[0].Params["aggregation.alignmentPeriod"][0], ShouldEqual, `+600s`) Convey("and generated deep link has correct parameters", func() { @@ -378,8 +388,9 @@ func TestCloudMonitoring(t *testing.T) { "view": "FULL", }) - queries, err := executor.buildQueries(tsdbQuery) + qes, err := executor.buildQueryExecutors(tsdbQuery) So(err, ShouldBeNil) + queries := getCloudMonitoringQueriesFromInterface(qes) So(len(queries), ShouldEqual, 1) So(queries[0].RefID, ShouldEqual, "A") @@ -422,8 +433,9 @@ func TestCloudMonitoring(t *testing.T) { "view": "FULL", }) - queries, err := executor.buildQueries(tsdbQuery) + qes, err := executor.buildQueryExecutors(tsdbQuery) So(err, ShouldBeNil) + queries := getCloudMonitoringQueriesFromInterface(qes) So(len(queries), ShouldEqual, 1) So(queries[0].RefID, ShouldEqual, "A") @@ -484,8 +496,9 @@ func TestCloudMonitoring(t *testing.T) { } Convey("and query type is metrics", func() { - queries, err := executor.buildQueries(tsdbQuery) + qes, err := executor.buildQueryExecutors(tsdbQuery) So(err, ShouldBeNil) + queries := getCloudMonitoringQueriesFromInterface(qes) So(len(queries), ShouldEqual, 1) So(queries[0].RefID, ShouldEqual, "A") @@ -520,6 +533,34 @@ func TestCloudMonitoring(t *testing.T) { } verifyDeepLink(dl, expectedTimeSelection, expectedTimeSeriesFilter) }) + + Convey("and editor mode is MQL", func() { + tsdbQuery.Queries[0].Model = simplejson.NewFromAny(map[string]interface{}{ + "queryType": metricQueryType, + "metricQuery": map[string]interface{}{ + "editorMode": mqlEditorMode, + "projectName": "test-proj", + "query": "test-query", + "aliasBy": "test-alias", + }, + "sloQuery": map[string]interface{}{}, + }) + + qes, err := executor.buildQueryExecutors(tsdbQuery) + So(err, ShouldBeNil) + queries := make([]*cloudMonitoringTimeSeriesQuery, 0) + for _, qi := range qes { + q, ok := qi.(*cloudMonitoringTimeSeriesQuery) + So(ok, ShouldBeTrue) + queries = append(queries, q) + } + + So(len(queries), ShouldEqual, 1) + So(queries[0].RefID, ShouldEqual, "A") + So(queries[0].ProjectName, ShouldEqual, "test-proj") + So(queries[0].Query, ShouldEqual, "test-query") + So(queries[0].AliasBy, ShouldEqual, "test-alias") + }) }) Convey("and query type is SLOs", func() { @@ -537,8 +578,9 @@ func TestCloudMonitoring(t *testing.T) { }, }) - queries, err := executor.buildQueries(tsdbQuery) + qes, err := executor.buildQueryExecutors(tsdbQuery) So(err, ShouldBeNil) + queries := getCloudMonitoringQueriesFromInterface(qes) So(len(queries), ShouldEqual, 1) So(queries[0].RefID, ShouldEqual, "A") @@ -565,8 +607,9 @@ func TestCloudMonitoring(t *testing.T) { }, }) - queries, err := executor.buildQueries(tsdbQuery) + qes, err := executor.buildQueryExecutors(tsdbQuery) So(err, ShouldBeNil) + queries := getCloudMonitoringQueriesFromInterface(qes) So(queries[0].Params["aggregation.perSeriesAligner"][0], ShouldEqual, "ALIGN_NEXT_OLDER") Convey("and empty deep link", func() { @@ -584,8 +627,8 @@ func TestCloudMonitoring(t *testing.T) { So(len(data.TimeSeries), ShouldEqual, 1) res := &tsdb.QueryResult{Meta: simplejson.New(), RefId: "A"} - query := &cloudMonitoringQuery{} - err = executor.parseResponse(res, data, query) + query := &cloudMonitoringTimeSeriesFilter{Params: url.Values{}} + err = query.parseResponse(res, data, "") So(err, ShouldBeNil) frames, _ := res.Dataframes.Decoded() So(len(frames), ShouldEqual, 1) @@ -610,8 +653,8 @@ func TestCloudMonitoring(t *testing.T) { So(len(data.TimeSeries), ShouldEqual, 3) res := &tsdb.QueryResult{Meta: simplejson.New(), RefId: "A"} - query := &cloudMonitoringQuery{} - err = executor.parseResponse(res, data, query) + query := &cloudMonitoringTimeSeriesFilter{Params: url.Values{}} + err = query.parseResponse(res, data, "") So(err, ShouldBeNil) frames, _ := res.Dataframes.Decoded() @@ -653,8 +696,8 @@ func TestCloudMonitoring(t *testing.T) { So(len(data.TimeSeries), ShouldEqual, 3) res := &tsdb.QueryResult{Meta: simplejson.New(), RefId: "A"} - query := &cloudMonitoringQuery{GroupBys: []string{"metric.label.instance_name", "resource.label.zone"}} - err = executor.parseResponse(res, data, query) + query := &cloudMonitoringTimeSeriesFilter{Params: url.Values{}, GroupBys: []string{"metric.label.instance_name", "resource.label.zone"}} + err = query.parseResponse(res, data, "") So(err, ShouldBeNil) frames, _ := res.Dataframes.Decoded() Convey("Should add instance name and zone labels to metric name", func() { @@ -673,8 +716,8 @@ func TestCloudMonitoring(t *testing.T) { res := &tsdb.QueryResult{Meta: simplejson.New(), RefId: "A"} Convey("and the alias pattern is for metric type, a metric label and a resource label", func() { - query := &cloudMonitoringQuery{AliasBy: "{{metric.type}} - {{metric.label.instance_name}} - {{resource.label.zone}}", GroupBys: []string{"metric.label.instance_name", "resource.label.zone"}} - err = executor.parseResponse(res, data, query) + query := &cloudMonitoringTimeSeriesFilter{Params: url.Values{}, AliasBy: "{{metric.type}} - {{metric.label.instance_name}} - {{resource.label.zone}}", GroupBys: []string{"metric.label.instance_name", "resource.label.zone"}} + err = query.parseResponse(res, data, "") So(err, ShouldBeNil) frames, _ := res.Dataframes.Decoded() Convey("Should use alias by formatting and only show instance name", func() { @@ -686,8 +729,8 @@ func TestCloudMonitoring(t *testing.T) { }) Convey("and the alias pattern is for metric name", func() { - query := &cloudMonitoringQuery{AliasBy: "metric {{metric.name}} service {{metric.service}}", GroupBys: []string{"metric.label.instance_name", "resource.label.zone"}} - err = executor.parseResponse(res, data, query) + query := &cloudMonitoringTimeSeriesFilter{Params: url.Values{}, AliasBy: "metric {{metric.name}} service {{metric.service}}", GroupBys: []string{"metric.label.instance_name", "resource.label.zone"}} + err = query.parseResponse(res, data, "") So(err, ShouldBeNil) frames, _ := res.Dataframes.Decoded() Convey("Should use alias by formatting and only show instance name", func() { @@ -705,8 +748,8 @@ func TestCloudMonitoring(t *testing.T) { So(len(data.TimeSeries), ShouldEqual, 1) res := &tsdb.QueryResult{Meta: simplejson.New(), RefId: "A"} - query := &cloudMonitoringQuery{AliasBy: "{{bucket}}"} - err = executor.parseResponse(res, data, query) + query := &cloudMonitoringTimeSeriesFilter{Params: url.Values{}, AliasBy: "{{bucket}}"} + err = query.parseResponse(res, data, "") So(err, ShouldBeNil) frames, _ := res.Dataframes.Decoded() So(len(frames), ShouldEqual, 11) @@ -752,8 +795,8 @@ func TestCloudMonitoring(t *testing.T) { So(len(data.TimeSeries), ShouldEqual, 1) res := &tsdb.QueryResult{Meta: simplejson.New(), RefId: "A"} - query := &cloudMonitoringQuery{AliasBy: "{{bucket}}"} - err = executor.parseResponse(res, data, query) + query := &cloudMonitoringTimeSeriesFilter{Params: url.Values{}, AliasBy: "{{bucket}}"} + err = query.parseResponse(res, data, "") So(err, ShouldBeNil) frames, _ := res.Dataframes.Decoded() So(len(frames), ShouldEqual, 33) @@ -792,8 +835,8 @@ func TestCloudMonitoring(t *testing.T) { So(len(data.TimeSeries), ShouldEqual, 3) res := &tsdb.QueryResult{Meta: simplejson.New(), RefId: "A"} - query := &cloudMonitoringQuery{AliasBy: "{{bucket}}"} - err = executor.parseResponse(res, data, query) + query := &cloudMonitoringTimeSeriesFilter{Params: url.Values{}, AliasBy: "{{bucket}}"} + err = query.parseResponse(res, data, "") labels := res.Meta.Get("labels").Interface().(map[string][]string) So(err, ShouldBeNil) frames, _ := res.Dataframes.Decoded() @@ -831,8 +874,8 @@ func TestCloudMonitoring(t *testing.T) { Convey("and systemlabel contains key with array of string", func() { res := &tsdb.QueryResult{Meta: simplejson.New(), RefId: "A"} - query := &cloudMonitoringQuery{AliasBy: "{{metadata.system_labels.test}}"} - err = executor.parseResponse(res, data, query) + query := &cloudMonitoringTimeSeriesFilter{Params: url.Values{}, AliasBy: "{{metadata.system_labels.test}}"} + err = query.parseResponse(res, data, "") So(err, ShouldBeNil) frames, _ := res.Dataframes.Decoded() So(len(frames), ShouldEqual, 3) @@ -844,8 +887,8 @@ func TestCloudMonitoring(t *testing.T) { Convey("and systemlabel contains key with array of string2", func() { res := &tsdb.QueryResult{Meta: simplejson.New(), RefId: "A"} - query := &cloudMonitoringQuery{AliasBy: "{{metadata.system_labels.test2}}"} - err = executor.parseResponse(res, data, query) + query := &cloudMonitoringTimeSeriesFilter{Params: url.Values{}, AliasBy: "{{metadata.system_labels.test2}}"} + err = query.parseResponse(res, data, "") So(err, ShouldBeNil) frames, _ := res.Dataframes.Decoded() So(len(frames), ShouldEqual, 3) @@ -860,14 +903,15 @@ func TestCloudMonitoring(t *testing.T) { Convey("and alias by is expanded", func() { res := &tsdb.QueryResult{Meta: simplejson.New(), RefId: "A"} - query := &cloudMonitoringQuery{ + query := &cloudMonitoringTimeSeriesFilter{ + Params: url.Values{}, ProjectName: "test-proj", Selector: "select_slo_compliance", Service: "test-service", Slo: "test-slo", AliasBy: "{{project}} - {{service}} - {{slo}} - {{selector}}", } - err = executor.parseResponse(res, data, query) + err = query.parseResponse(res, data, "") frames, _ := res.Dataframes.Decoded() So(err, ShouldBeNil) So(frames[0].Fields[1].Name, ShouldEqual, "test-proj - test-service - test-slo - select_slo_compliance") @@ -881,13 +925,14 @@ func TestCloudMonitoring(t *testing.T) { Convey("and alias by is expanded", func() { res := &tsdb.QueryResult{Meta: simplejson.New(), RefId: "A"} - query := &cloudMonitoringQuery{ + query := &cloudMonitoringTimeSeriesFilter{ + Params: url.Values{}, ProjectName: "test-proj", Selector: "select_slo_compliance", Service: "test-service", Slo: "test-slo", } - err = executor.parseResponse(res, data, query) + err = query.parseResponse(res, data, "") frames, _ := res.Dataframes.Decoded() So(err, ShouldBeNil) So(frames[0].Fields[1].Name, ShouldEqual, "select_slo_compliance(\"projects/test-proj/services/test-service/serviceLevelObjectives/test-slo\")") @@ -897,18 +942,20 @@ func TestCloudMonitoring(t *testing.T) { Convey("Parse cloud monitoring unit", func() { Convey("when there is only one query", func() { Convey("and cloud monitoring unit does not have a corresponding grafana unit", func() { - queries := []*cloudMonitoringQuery{ - {ProjectName: "test-proj", Selector: "select_slo_compliance", Service: "test-service", - Slo: "test-slo", Unit: "megaseconds"}} - unit := executor.resolvePanelUnitFromQueries(queries) + executors := []cloudMonitoringQueryExecutor{ + &cloudMonitoringTimeSeriesFilter{Params: url.Values{}, ProjectName: "test-proj", Selector: "select_slo_compliance", Service: "test-service", + Slo: "test-slo", Unit: "megaseconds"}, + } + unit := executor.resolvePanelUnitFromQueries(executors) So(unit, ShouldEqual, "") }) Convey("and cloud monitoring unit has a corresponding grafana unit", func() { for key, element := range cloudMonitoringUnitMappings { - queries := []*cloudMonitoringQuery{ - {ProjectName: "test-proj", Selector: "select_slo_compliance", Service: "test-service", - Slo: "test-slo", Unit: key}} + queries := []cloudMonitoringQueryExecutor{ + &cloudMonitoringTimeSeriesFilter{Params: url.Values{}, ProjectName: "test-proj", Selector: "select_slo_compliance", Service: "test-service", + Slo: "test-slo", Unit: key}, + } unit := executor.resolvePanelUnitFromQueries(queries) So(unit, ShouldEqual, element) } @@ -918,10 +965,10 @@ func TestCloudMonitoring(t *testing.T) { Convey("when there are more than one query", func() { Convey("and all target units are the same", func() { for key, element := range cloudMonitoringUnitMappings { - queries := []*cloudMonitoringQuery{ - {ProjectName: "test-proj", Selector: "select_slo_compliance", Service: "test-service1", + queries := []cloudMonitoringQueryExecutor{ + &cloudMonitoringTimeSeriesFilter{Params: url.Values{}, ProjectName: "test-proj", Selector: "select_slo_compliance", Service: "test-service1", Slo: "test-slo", Unit: key}, - {ProjectName: "test-proj", Selector: "select_slo_compliance", Service: "test-service2", + &cloudMonitoringTimeSeriesFilter{Params: url.Values{}, ProjectName: "test-proj", Selector: "select_slo_compliance", Service: "test-service2", Slo: "test-slo", Unit: key}, } unit := executor.resolvePanelUnitFromQueries(queries) @@ -930,10 +977,10 @@ func TestCloudMonitoring(t *testing.T) { }) Convey("and all target units are the same but does not have grafana mappings", func() { - queries := []*cloudMonitoringQuery{ - {ProjectName: "test-proj", Selector: "select_slo_compliance", Service: "test-service1", + queries := []cloudMonitoringQueryExecutor{ + &cloudMonitoringTimeSeriesFilter{Params: url.Values{}, ProjectName: "test-proj", Selector: "select_slo_compliance", Service: "test-service1", Slo: "test-slo", Unit: "megaseconds"}, - {ProjectName: "test-proj", Selector: "select_slo_compliance", Service: "test-service2", + &cloudMonitoringTimeSeriesFilter{Params: url.Values{}, ProjectName: "test-proj", Selector: "select_slo_compliance", Service: "test-service2", Slo: "test-slo", Unit: "megaseconds"}, } unit := executor.resolvePanelUnitFromQueries(queries) @@ -941,16 +988,41 @@ func TestCloudMonitoring(t *testing.T) { }) Convey("and all target units are not the same", func() { - queries := []*cloudMonitoringQuery{ - {ProjectName: "test-proj", Selector: "select_slo_compliance", Service: "test-service1", + queries := []cloudMonitoringQueryExecutor{ + &cloudMonitoringTimeSeriesFilter{Params: url.Values{}, ProjectName: "test-proj", Selector: "select_slo_compliance", Service: "test-service1", Slo: "test-slo", Unit: "bit"}, - {ProjectName: "test-proj", Selector: "select_slo_compliance", Service: "test-service2", + &cloudMonitoringTimeSeriesFilter{Params: url.Values{}, ProjectName: "test-proj", Selector: "select_slo_compliance", Service: "test-service2", Slo: "test-slo", Unit: "min"}, } unit := executor.resolvePanelUnitFromQueries(queries) So(unit, ShouldEqual, "") }) }) + + Convey("when data from query returns MQL and alias by is defined", func() { + data, err := loadTestFile("./test-data/7-series-response-mql.json") + So(err, ShouldBeNil) + So(len(data.TimeSeries), ShouldEqual, 0) + So(len(data.TimeSeriesData), ShouldEqual, 1) + + Convey("and alias by is expanded", func() { + fromStart := time.Date(2018, 3, 15, 13, 0, 0, 0, time.UTC).In(time.Local) + res := &tsdb.QueryResult{Meta: simplejson.New(), RefId: "A"} + query := &cloudMonitoringTimeSeriesQuery{ + ProjectName: "test-proj", + Query: "test-query", + AliasBy: "{{project}} - {{resource.label.zone}} - {{resource.label.instance_id}}", + timeRange: &tsdb.TimeRange{ + From: fmt.Sprintf("%v", fromStart.Unix()*1000), + To: fmt.Sprintf("%v", fromStart.Add(34*time.Minute).Unix()*1000), + }, + } + err = query.parseResponse(res, data, "") + So(err, ShouldBeNil) + frames, _ := res.Dataframes.Decoded() + So(frames[0].Fields[1].Name, ShouldEqual, "test-proj - asia-northeast1-c - 6724404429462225363") + }) + }) }) Convey("when interpolating filter wildcards", func() { @@ -1059,6 +1131,16 @@ func loadTestFile(path string) (cloudMonitoringResponse, error) { return data, err } +func getCloudMonitoringQueriesFromInterface(qes []cloudMonitoringQueryExecutor) []*cloudMonitoringTimeSeriesFilter { + queries := make([]*cloudMonitoringTimeSeriesFilter, 0) + for _, qi := range qes { + q, ok := qi.(*cloudMonitoringTimeSeriesFilter) + So(ok, ShouldBeTrue) + queries = append(queries, q) + } + return queries +} + func verifyDeepLink(dl string, expectedTimeSelection map[string]string, expectedTimeSeriesFilter map[string]interface{}) { u, err := url.Parse(dl) So(err, ShouldBeNil) diff --git a/pkg/tsdb/cloudmonitoring/test-data/7-series-response-mql.json b/pkg/tsdb/cloudmonitoring/test-data/7-series-response-mql.json new file mode 100644 index 00000000000..0bb57fc4116 --- /dev/null +++ b/pkg/tsdb/cloudmonitoring/test-data/7-series-response-mql.json @@ -0,0 +1,61 @@ +{ + "timeSeriesDescriptor": { + "labelDescriptors": [ + { + "key": "resource.project_id" + }, + { + "key": "resource.zone" + }, + { + "key": "resource.instance_id" + } + ], + "pointDescriptors": [ + { + "key": "value.read_bytes_count", + "valueType": "INT64", + "metricKind": "DELTA" + } + ] + }, + "timeSeriesData": [ + { + "labelValues": [ + { + "stringValue": "grafana-prod" + }, + { + "stringValue": "asia-northeast1-c" + }, + { + "stringValue": "6724404429462225363" + } + ], + "pointData": [ + { + "values": [ + { + "int64Value": "0" + } + ], + "timeInterval": { + "startTime": "2020-05-18T09:47:00Z", + "endTime": "2020-05-18T09:48:00Z" + } + }, + { + "values": [ + { + "int64Value": "0" + } + ], + "timeInterval": { + "startTime": "2020-05-18T09:46:00Z", + "endTime": "2020-05-18T09:47:00Z" + } + } + ] + } + ] +} diff --git a/pkg/tsdb/cloudmonitoring/time_series_filter.go b/pkg/tsdb/cloudmonitoring/time_series_filter.go new file mode 100644 index 00000000000..152fca3c633 --- /dev/null +++ b/pkg/tsdb/cloudmonitoring/time_series_filter.go @@ -0,0 +1,367 @@ +package cloudmonitoring + +import ( + "context" + "encoding/json" + "fmt" + "net/url" + "path" + "strconv" + "strings" + "time" + + "github.com/grafana/grafana-plugin-sdk-go/data" + "github.com/grafana/grafana/pkg/components/simplejson" + "github.com/grafana/grafana/pkg/tsdb" + "github.com/opentracing/opentracing-go" + "golang.org/x/net/context/ctxhttp" +) + +func (timeSeriesFilter *cloudMonitoringTimeSeriesFilter) run(ctx context.Context, tsdbQuery *tsdb.TsdbQuery, e *CloudMonitoringExecutor) (*tsdb.QueryResult, cloudMonitoringResponse, string, error) { + queryResult := &tsdb.QueryResult{Meta: simplejson.New(), RefId: timeSeriesFilter.RefID} + projectName := timeSeriesFilter.ProjectName + if projectName == "" { + defaultProject, err := e.getDefaultProject(ctx) + if err != nil { + queryResult.Error = err + return queryResult, cloudMonitoringResponse{}, "", nil + } + projectName = defaultProject + slog.Info("No project name set on query, using project name from datasource", "projectName", projectName) + } + + req, err := e.createRequest(ctx, e.dsInfo, path.Join("cloudmonitoringv3/projects", projectName, "timeSeries"), nil) + if err != nil { + queryResult.Error = err + return queryResult, cloudMonitoringResponse{}, "", nil + } + + req.URL.RawQuery = timeSeriesFilter.Params.Encode() + alignmentPeriod, ok := req.URL.Query()["aggregation.alignmentPeriod"] + + if ok { + seconds, err := strconv.ParseInt(alignmentPeriodRe.FindString(alignmentPeriod[0]), 10, 64) + if err == nil { + queryResult.Meta.Set("alignmentPeriod", seconds) + } + } + + span, ctx := opentracing.StartSpanFromContext(ctx, "cloudMonitoring query") + span.SetTag("target", timeSeriesFilter.Target) + span.SetTag("from", tsdbQuery.TimeRange.From) + span.SetTag("until", tsdbQuery.TimeRange.To) + span.SetTag("datasource_id", e.dsInfo.Id) + span.SetTag("org_id", e.dsInfo.OrgId) + + defer span.Finish() + + if err := opentracing.GlobalTracer().Inject( + span.Context(), + opentracing.HTTPHeaders, + opentracing.HTTPHeadersCarrier(req.Header)); err != nil { + queryResult.Error = err + return queryResult, cloudMonitoringResponse{}, "", nil + } + + res, err := ctxhttp.Do(ctx, e.httpClient, req) + if err != nil { + queryResult.Error = err + return queryResult, cloudMonitoringResponse{}, "", nil + } + + data, err := unmarshalResponse(res) + if err != nil { + queryResult.Error = err + return queryResult, cloudMonitoringResponse{}, "", nil + } + + return queryResult, data, req.URL.RawQuery, nil +} + +func (timeSeriesFilter *cloudMonitoringTimeSeriesFilter) parseResponse(queryRes *tsdb.QueryResult, response cloudMonitoringResponse, executedQueryString string) error { + labels := make(map[string]map[string]bool) + frames := data.Frames{} + for _, series := range response.TimeSeries { + seriesLabels := data.Labels{} + defaultMetricName := series.Metric.Type + labels["resource.type"] = map[string]bool{series.Resource.Type: true} + seriesLabels["resource.type"] = series.Resource.Type + + frame := data.NewFrameOfFieldTypes("", len(series.Points), data.FieldTypeTime, data.FieldTypeFloat64) + frame.RefID = timeSeriesFilter.RefID + frame.Meta = &data.FrameMeta{ + ExecutedQueryString: executedQueryString, + } + + for key, value := range series.Metric.Labels { + if _, ok := labels["metric.label."+key]; !ok { + labels["metric.label."+key] = map[string]bool{} + } + labels["metric.label."+key][value] = true + seriesLabels["metric.label."+key] = value + + if len(timeSeriesFilter.GroupBys) == 0 || containsLabel(timeSeriesFilter.GroupBys, "metric.label."+key) { + defaultMetricName += " " + value + } + } + + for key, value := range series.Resource.Labels { + if _, ok := labels["resource.label."+key]; !ok { + labels["resource.label."+key] = map[string]bool{} + } + labels["resource.label."+key][value] = true + seriesLabels["resource.label."+key] = value + + if containsLabel(timeSeriesFilter.GroupBys, "resource.label."+key) { + defaultMetricName += " " + value + } + } + + for labelType, labelTypeValues := range series.MetaData { + for labelKey, labelValue := range labelTypeValues { + key := toSnakeCase(fmt.Sprintf("metadata.%s.%s", labelType, labelKey)) + if _, ok := labels[key]; !ok { + labels[key] = map[string]bool{} + } + + switch v := labelValue.(type) { + case string: + labels[key][v] = true + seriesLabels[key] = v + case bool: + strVal := strconv.FormatBool(v) + labels[key][strVal] = true + seriesLabels[key] = strVal + case []interface{}: + for _, v := range v { + strVal := v.(string) + labels[key][strVal] = true + if len(seriesLabels[key]) > 0 { + strVal = fmt.Sprintf("%s, %s", seriesLabels[key], strVal) + } + seriesLabels[key] = strVal + } + } + } + } + + // reverse the order to be ascending + if series.ValueType != "DISTRIBUTION" { + timeSeriesFilter.handleNonDistributionSeries( + series, defaultMetricName, seriesLabels, queryRes, frame) + frames = append(frames, frame) + continue + } + buckets := make(map[int]*data.Frame) + for i := len(series.Points) - 1; i >= 0; i-- { + point := series.Points[i] + if len(point.Value.DistributionValue.BucketCounts) == 0 { + continue + } + maxKey := 0 + for i := 0; i < len(point.Value.DistributionValue.BucketCounts); i++ { + value, err := strconv.ParseFloat(point.Value.DistributionValue.BucketCounts[i], 64) + if err != nil { + continue + } + if _, ok := buckets[i]; !ok { + // set lower bounds + // https://cloud.google.com/monitoring/api/ref_v3/rest/v3/TimeSeries#Distribution + bucketBound := calcBucketBound(point.Value.DistributionValue.BucketOptions, i) + additionalLabels := map[string]string{"bucket": bucketBound} + + timeField := data.NewField(data.TimeSeriesTimeFieldName, nil, []time.Time{}) + valueField := data.NewField(data.TimeSeriesValueFieldName, nil, []float64{}) + + frameName := formatLegendKeys(series.Metric.Type, defaultMetricName, nil, additionalLabels, timeSeriesFilter) + valueField.Name = frameName + buckets[i] = &data.Frame{ + Name: frameName, + Fields: []*data.Field{ + timeField, + valueField, + }, + RefID: timeSeriesFilter.RefID, + } + + if maxKey < i { + maxKey = i + } + } + buckets[i].AppendRow(point.Interval.EndTime, value) + } + for i := 0; i < maxKey; i++ { + if _, ok := buckets[i]; !ok { + bucketBound := calcBucketBound(point.Value.DistributionValue.BucketOptions, i) + additionalLabels := data.Labels{"bucket": bucketBound} + timeField := data.NewField(data.TimeSeriesTimeFieldName, nil, []time.Time{}) + valueField := data.NewField(data.TimeSeriesValueFieldName, nil, []float64{}) + frameName := formatLegendKeys(series.Metric.Type, defaultMetricName, seriesLabels, additionalLabels, timeSeriesFilter) + valueField.Name = frameName + buckets[i] = &data.Frame{ + Name: frameName, + Fields: []*data.Field{ + timeField, + valueField, + }, + RefID: timeSeriesFilter.RefID, + } + } + } + } + for i := 0; i < len(buckets); i++ { + frames = append(frames, buckets[i]) + } + } + if len(response.TimeSeries) > 0 { + dl := timeSeriesFilter.buildDeepLink() + frames = addConfigData(frames, dl) + } + + queryRes.Dataframes = tsdb.NewDecodedDataFrames(frames) + + labelsByKey := make(map[string][]string) + for key, values := range labels { + for value := range values { + labelsByKey[key] = append(labelsByKey[key], value) + } + } + + queryRes.Meta.Set("labels", labelsByKey) + queryRes.Meta.Set("groupBys", timeSeriesFilter.GroupBys) + return nil +} + +func (timeSeriesFilter *cloudMonitoringTimeSeriesFilter) handleNonDistributionSeries(series timeSeries, defaultMetricName string, seriesLabels map[string]string, + queryRes *tsdb.QueryResult, frame *data.Frame) { + for i := 0; i < len(series.Points); i++ { + point := series.Points[i] + value := point.Value.DoubleValue + + if series.ValueType == "INT64" { + parsedValue, err := strconv.ParseFloat(point.Value.IntValue, 64) + if err == nil { + value = parsedValue + } + } + + if series.ValueType == "BOOL" { + if point.Value.BoolValue { + value = 1 + } else { + value = 0 + } + } + frame.SetRow(len(series.Points)-1-i, point.Interval.EndTime, value) + } + + metricName := formatLegendKeys(series.Metric.Type, defaultMetricName, seriesLabels, nil, timeSeriesFilter) + dataField := frame.Fields[1] + dataField.Name = metricName +} + +func (timeSeriesFilter *cloudMonitoringTimeSeriesFilter) parseToAnnotations(queryRes *tsdb.QueryResult, data cloudMonitoringResponse, title string, text string, tags string) error { + annotations := make([]map[string]string, 0) + + for _, series := range data.TimeSeries { + // reverse the order to be ascending + for i := len(series.Points) - 1; i >= 0; i-- { + point := series.Points[i] + value := strconv.FormatFloat(point.Value.DoubleValue, 'f', 6, 64) + if series.ValueType == "STRING" { + value = point.Value.StringValue + } + annotation := make(map[string]string) + annotation["time"] = point.Interval.EndTime.UTC().Format(time.RFC3339) + annotation["title"] = formatAnnotationText(title, value, series.Metric.Type, series.Metric.Labels, series.Resource.Labels) + annotation["tags"] = tags + annotation["text"] = formatAnnotationText(text, value, series.Metric.Type, series.Metric.Labels, series.Resource.Labels) + annotations = append(annotations, annotation) + } + } + + transformAnnotationToTable(annotations, queryRes) + return nil +} + +func (timeSeriesFilter *cloudMonitoringTimeSeriesFilter) buildDeepLink() string { + if timeSeriesFilter.Slo != "" { + return "" + } + + filter := timeSeriesFilter.Params.Get("filter") + if !strings.Contains(filter, "resource.type=") { + resourceType := timeSeriesFilter.Params.Get("resourceType") + if resourceType != "" { + filter = fmt.Sprintf(`resource.type="%s" %s`, resourceType, filter) + } + } + + u, err := url.Parse("https://console.cloud.google.com/monitoring/metrics-explorer") + if err != nil { + slog.Error("Failed to generate deep link: unable to parse metrics explorer URL", "ProjectName", timeSeriesFilter.ProjectName, "query", timeSeriesFilter.RefID) + return "" + } + + rawQuery := u.Query() + rawQuery.Set("project", timeSeriesFilter.ProjectName) + rawQuery.Set("Grafana_deeplink", "true") + + pageState := map[string]interface{}{ + "xyChart": map[string]interface{}{ + "constantLines": []string{}, + "dataSets": []map[string]interface{}{ + { + "timeSeriesFilter": map[string]interface{}{ + "aggregations": []string{}, + "crossSeriesReducer": timeSeriesFilter.Params.Get("aggregation.crossSeriesReducer"), + "filter": filter, + "groupByFields": timeSeriesFilter.Params["aggregation.groupByFields"], + "minAlignmentPeriod": strings.TrimPrefix(timeSeriesFilter.Params.Get("aggregation.alignmentPeriod"), "+"), // get rid of leading + + "perSeriesAligner": timeSeriesFilter.Params.Get("aggregation.perSeriesAligner"), + "secondaryGroupByFields": []string{}, + "unitOverride": "1", + }, + }, + }, + "timeshiftDuration": "0s", + "y1Axis": map[string]string{ + "label": "y1Axis", + "scale": "LINEAR", + }, + }, + "timeSelection": map[string]string{ + "timeRange": "custom", + "start": timeSeriesFilter.Params.Get("interval.startTime"), + "end": timeSeriesFilter.Params.Get("interval.endTime"), + }, + } + + blob, err := json.Marshal(pageState) + if err != nil { + slog.Error("Failed to generate deep link", "pageState", pageState, "ProjectName", timeSeriesFilter.ProjectName, "query", timeSeriesFilter.RefID) + return "" + } + + rawQuery.Set("pageState", string(blob)) + u.RawQuery = rawQuery.Encode() + + accountChooserURL, err := url.Parse("https://accounts.google.com/AccountChooser") + if err != nil { + slog.Error("Failed to generate deep link: unable to parse account chooser URL", "ProjectName", timeSeriesFilter.ProjectName, "query", timeSeriesFilter.RefID) + return "" + } + accountChooserQuery := accountChooserURL.Query() + accountChooserQuery.Set("continue", u.String()) + accountChooserURL.RawQuery = accountChooserQuery.Encode() + + return accountChooserURL.String() +} + +func (timeSeriesFilter *cloudMonitoringTimeSeriesFilter) getRefID() string { + return timeSeriesFilter.RefID +} + +func (timeSeriesFilter *cloudMonitoringTimeSeriesFilter) getUnit() string { + return timeSeriesFilter.Unit +} diff --git a/pkg/tsdb/cloudmonitoring/time_series_query.go b/pkg/tsdb/cloudmonitoring/time_series_query.go new file mode 100644 index 00000000000..91087ad7820 --- /dev/null +++ b/pkg/tsdb/cloudmonitoring/time_series_query.go @@ -0,0 +1,366 @@ +package cloudmonitoring + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "net/url" + "path" + "strconv" + "strings" + "time" + + "github.com/grafana/grafana-plugin-sdk-go/data" + "github.com/grafana/grafana/pkg/components/simplejson" + "github.com/grafana/grafana/pkg/tsdb" + "github.com/opentracing/opentracing-go" + "golang.org/x/net/context/ctxhttp" +) + +func (timeSeriesQuery cloudMonitoringTimeSeriesQuery) run(ctx context.Context, tsdbQuery *tsdb.TsdbQuery, e *CloudMonitoringExecutor) (*tsdb.QueryResult, cloudMonitoringResponse, string, error) { + queryResult := &tsdb.QueryResult{Meta: simplejson.New(), RefId: timeSeriesQuery.RefID} + projectName := timeSeriesQuery.ProjectName + if projectName == "" { + defaultProject, err := e.getDefaultProject(ctx) + if err != nil { + queryResult.Error = err + return queryResult, cloudMonitoringResponse{}, "", nil + } + projectName = defaultProject + slog.Info("No project name set on query, using project name from datasource", "projectName", projectName) + } + + from, err := tsdbQuery.TimeRange.ParseFrom() + if err != nil { + queryResult.Error = err + return queryResult, cloudMonitoringResponse{}, "", nil + } + to, err := tsdbQuery.TimeRange.ParseTo() + if err != nil { + queryResult.Error = err + return queryResult, cloudMonitoringResponse{}, "", nil + } + intervalCalculator := tsdb.NewIntervalCalculator(&tsdb.IntervalOptions{}) + interval := intervalCalculator.Calculate(tsdbQuery.TimeRange, time.Duration(timeSeriesQuery.IntervalMS/1000)*time.Second) + timeFormat := "2006/01/02-15:04:05" + timeSeriesQuery.Query += fmt.Sprintf(" | graph_period %s | within d'%s', d'%s'", interval.Text, from.UTC().Format(timeFormat), to.UTC().Format(timeFormat)) + + buf, err := json.Marshal(map[string]interface{}{ + "query": timeSeriesQuery.Query, + }) + if err != nil { + queryResult.Error = err + return queryResult, cloudMonitoringResponse{}, "", nil + } + req, err := e.createRequest(ctx, e.dsInfo, path.Join("cloudmonitoringv3/projects", projectName, "timeSeries:query"), bytes.NewBuffer(buf)) + if err != nil { + queryResult.Error = err + return queryResult, cloudMonitoringResponse{}, "", nil + } + + span, ctx := opentracing.StartSpanFromContext(ctx, "cloudMonitoring MQL query") + span.SetTag("query", timeSeriesQuery.Query) + span.SetTag("from", tsdbQuery.TimeRange.From) + span.SetTag("until", tsdbQuery.TimeRange.To) + span.SetTag("datasource_id", e.dsInfo.Id) + span.SetTag("org_id", e.dsInfo.OrgId) + + defer span.Finish() + + if err := opentracing.GlobalTracer().Inject( + span.Context(), + opentracing.HTTPHeaders, + opentracing.HTTPHeadersCarrier(req.Header)); err != nil { + queryResult.Error = err + return queryResult, cloudMonitoringResponse{}, "", nil + } + + res, err := ctxhttp.Do(ctx, e.httpClient, req) + if err != nil { + queryResult.Error = err + return queryResult, cloudMonitoringResponse{}, "", nil + } + + data, err := unmarshalResponse(res) + + if err != nil { + queryResult.Error = err + return queryResult, cloudMonitoringResponse{}, "", nil + } + + return queryResult, data, timeSeriesQuery.Query, nil +} + +func (timeSeriesQuery cloudMonitoringTimeSeriesQuery) parseResponse(queryRes *tsdb.QueryResult, response cloudMonitoringResponse, executedQueryString string) error { + labels := make(map[string]map[string]bool) + frames := data.Frames{} + for _, series := range response.TimeSeriesData { + seriesLabels := make(map[string]string) + frame := data.NewFrameOfFieldTypes("", len(series.PointData), data.FieldTypeTime, data.FieldTypeFloat64) + frame.RefID = timeSeriesQuery.RefID + frame.Meta = &data.FrameMeta{ + ExecutedQueryString: executedQueryString, + } + + for n, d := range response.TimeSeriesDescriptor.LabelDescriptors { + key := toSnakeCase(d.Key) + key = strings.Replace(key, ".", ".label.", 1) + if _, ok := labels[key]; !ok { + labels[key] = map[string]bool{} + } + + labelValue := series.LabelValues[n] + switch d.ValueType { + case "BOOL": + strVal := strconv.FormatBool(labelValue.BoolValue) + labels[key][strVal] = true + seriesLabels[key] = strVal + case "INT64": + intVal := strconv.FormatInt(labelValue.Int64Value, 10) + labels[key][intVal] = true + seriesLabels[key] = intVal + default: + labels[key][labelValue.StringValue] = true + seriesLabels[key] = labelValue.StringValue + } + } + + for n, d := range response.TimeSeriesDescriptor.PointDescriptors { + if _, ok := labels["metric.name"]; !ok { + labels["metric.name"] = map[string]bool{} + } + labels["metric.name"][d.Key] = true + seriesLabels["metric.name"] = d.Key + defaultMetricName := d.Key + + // process non-distribution series + if d.ValueType != "DISTRIBUTION" { + // reverse the order to be ascending + for i := len(series.PointData) - 1; i >= 0; i-- { + point := series.PointData[i] + value := point.Values[n].DoubleValue + + if d.ValueType == "INT64" { + parsedValue, err := strconv.ParseFloat(point.Values[n].Int64Value, 64) + if err == nil { + value = parsedValue + } + } else if d.ValueType == "BOOL" { + if point.Values[n].BoolValue { + value = 1 + } else { + value = 0 + } + } + + frame.SetRow(len(series.PointData)-1-i, series.PointData[i].TimeInterval.EndTime, value) + } + + metricName := formatLegendKeys(d.Key, defaultMetricName, seriesLabels, nil, &cloudMonitoringTimeSeriesFilter{ProjectName: timeSeriesQuery.ProjectName, AliasBy: timeSeriesQuery.AliasBy}) + dataField := frame.Fields[1] + dataField.Name = metricName + + frames = append(frames, frame) + continue + } + + // process distribution series + buckets := make(map[int]*data.Frame) + // reverse the order to be ascending + for i := len(series.PointData) - 1; i >= 0; i-- { + point := series.PointData[i] + if len(point.Values[n].DistributionValue.BucketCounts) == 0 { + continue + } + maxKey := 0 + for i := 0; i < len(point.Values[n].DistributionValue.BucketCounts); i++ { + value, err := strconv.ParseFloat(point.Values[n].DistributionValue.BucketCounts[i], 64) + if err != nil { + continue + } + if _, ok := buckets[i]; !ok { + // set lower bounds + // https://cloud.google.com/monitoring/api/ref_v3/rest/v3/TimeSeries#Distribution + bucketBound := calcBucketBound(point.Values[n].DistributionValue.BucketOptions, i) + additionalLabels := map[string]string{"bucket": bucketBound} + + timeField := data.NewField(data.TimeSeriesTimeFieldName, nil, []time.Time{}) + valueField := data.NewField(data.TimeSeriesValueFieldName, nil, []float64{}) + + frameName := formatLegendKeys(d.Key, defaultMetricName, nil, additionalLabels, &cloudMonitoringTimeSeriesFilter{ProjectName: timeSeriesQuery.ProjectName, AliasBy: timeSeriesQuery.AliasBy}) + valueField.Name = frameName + buckets[i] = &data.Frame{ + Name: frameName, + Fields: []*data.Field{ + timeField, + valueField, + }, + RefID: timeSeriesQuery.RefID, + } + + if maxKey < i { + maxKey = i + } + } + buckets[i].AppendRow(point.TimeInterval.EndTime, value) + } + + // fill empty bucket + for i := 0; i < maxKey; i++ { + if _, ok := buckets[i]; !ok { + bucketBound := calcBucketBound(point.Values[n].DistributionValue.BucketOptions, i) + additionalLabels := data.Labels{"bucket": bucketBound} + timeField := data.NewField(data.TimeSeriesTimeFieldName, nil, []time.Time{}) + valueField := data.NewField(data.TimeSeriesValueFieldName, nil, []float64{}) + frameName := formatLegendKeys(d.Key, defaultMetricName, seriesLabels, additionalLabels, &cloudMonitoringTimeSeriesFilter{ProjectName: timeSeriesQuery.ProjectName, AliasBy: timeSeriesQuery.AliasBy}) + valueField.Name = frameName + buckets[i] = &data.Frame{ + Name: frameName, + Fields: []*data.Field{ + timeField, + valueField, + }, + RefID: timeSeriesQuery.RefID, + } + } + } + } + for i := 0; i < len(buckets); i++ { + frames = append(frames, buckets[i]) + } + } + } + if len(response.TimeSeriesData) > 0 { + dl := timeSeriesQuery.buildDeepLink() + frames = addConfigData(frames, dl) + } + + queryRes.Dataframes = tsdb.NewDecodedDataFrames(frames) + + labelsByKey := make(map[string][]string) + for key, values := range labels { + for value := range values { + labelsByKey[key] = append(labelsByKey[key], value) + } + } + + queryRes.Meta.Set("labels", labelsByKey) + + return nil +} + +func (timeSeriesQuery cloudMonitoringTimeSeriesQuery) parseToAnnotations(queryRes *tsdb.QueryResult, data cloudMonitoringResponse, title string, text string, tags string) error { + annotations := make([]map[string]string, 0) + + for _, series := range data.TimeSeriesData { + metricLabels := make(map[string]string) + resourceLabels := make(map[string]string) + + for n, d := range data.TimeSeriesDescriptor.LabelDescriptors { + key := toSnakeCase(d.Key) + labelValue := series.LabelValues[n] + value := "" + switch d.ValueType { + case "BOOL": + strVal := strconv.FormatBool(labelValue.BoolValue) + value = strVal + case "INT64": + intVal := strconv.FormatInt(labelValue.Int64Value, 10) + value = intVal + default: + value = labelValue.StringValue + } + if strings.Index(key, "metric.") == 0 { + key = key[len("metric."):] + metricLabels[key] = value + } else if strings.Index(key, "resource.") == 0 { + key = key[len("resource."):] + resourceLabels[key] = value + } + } + + for n, d := range data.TimeSeriesDescriptor.PointDescriptors { + // reverse the order to be ascending + for i := len(series.PointData) - 1; i >= 0; i-- { + point := series.PointData[i] + value := strconv.FormatFloat(point.Values[n].DoubleValue, 'f', 6, 64) + if d.ValueType == "STRING" { + value = point.Values[n].StringValue + } + annotation := make(map[string]string) + annotation["time"] = point.TimeInterval.EndTime.UTC().Format(time.RFC3339) + annotation["title"] = formatAnnotationText(title, value, d.MetricKind, metricLabels, resourceLabels) + annotation["tags"] = tags + annotation["text"] = formatAnnotationText(text, value, d.MetricKind, metricLabels, resourceLabels) + annotations = append(annotations, annotation) + } + } + } + + transformAnnotationToTable(annotations, queryRes) + return nil +} + +func (timeSeriesQuery cloudMonitoringTimeSeriesQuery) buildDeepLink() string { + u, err := url.Parse("https://console.cloud.google.com/monitoring/metrics-explorer") + if err != nil { + slog.Error("Failed to generate deep link: unable to parse metrics explorer URL", "projectName", timeSeriesQuery.ProjectName, "query", timeSeriesQuery.RefID) + return "" + } + + q := u.Query() + q.Set("project", timeSeriesQuery.ProjectName) + q.Set("Grafana_deeplink", "true") + + pageState := map[string]interface{}{ + "xyChart": map[string]interface{}{ + "constantLines": []string{}, + "dataSets": []map[string]interface{}{ + { + "timeSeriesQuery": timeSeriesQuery.Query, + "targetAxis": "Y1", + "plotType": "LINE", + }, + }, + "timeshiftDuration": "0s", + "y1Axis": map[string]string{ + "label": "y1Axis", + "scale": "LINEAR", + }, + }, + "timeSelection": map[string]string{ + "timeRange": "custom", + "start": timeSeriesQuery.timeRange.MustGetFrom().Format(time.RFC3339Nano), + "end": timeSeriesQuery.timeRange.MustGetTo().Format(time.RFC3339Nano), + }, + } + + blob, err := json.Marshal(pageState) + if err != nil { + slog.Error("Failed to generate deep link", "pageState", pageState, "ProjectName", timeSeriesQuery.ProjectName, "query", timeSeriesQuery.RefID) + return "" + } + + q.Set("pageState", string(blob)) + u.RawQuery = q.Encode() + + accountChooserURL, err := url.Parse("https://accounts.google.com/AccountChooser") + if err != nil { + slog.Error("Failed to generate deep link: unable to parse account chooser URL", "ProjectName", timeSeriesQuery.ProjectName, "query", timeSeriesQuery.RefID) + return "" + } + accountChooserQuery := accountChooserURL.Query() + accountChooserQuery.Set("continue", u.String()) + accountChooserURL.RawQuery = accountChooserQuery.Encode() + + return accountChooserURL.String() +} + +func (timeSeriesQuery cloudMonitoringTimeSeriesQuery) getRefID() string { + return timeSeriesQuery.RefID +} + +func (timeSeriesQuery *cloudMonitoringTimeSeriesQuery) getUnit() string { + return timeSeriesQuery.Unit +} diff --git a/pkg/tsdb/cloudmonitoring/types.go b/pkg/tsdb/cloudmonitoring/types.go index 704d657f662..7c43ff87c11 100644 --- a/pkg/tsdb/cloudmonitoring/types.go +++ b/pkg/tsdb/cloudmonitoring/types.go @@ -1,12 +1,25 @@ package cloudmonitoring import ( + "context" "net/url" "time" + + "github.com/grafana/grafana/pkg/tsdb" ) type ( - cloudMonitoringQuery struct { + cloudMonitoringQueryExecutor interface { + run(ctx context.Context, tsdbQuery *tsdb.TsdbQuery, e *CloudMonitoringExecutor) (*tsdb.QueryResult, cloudMonitoringResponse, string, error) + parseResponse(queryRes *tsdb.QueryResult, data cloudMonitoringResponse, executedQueryString string) error + parseToAnnotations(queryRes *tsdb.QueryResult, data cloudMonitoringResponse, title string, text string, tags string) error + buildDeepLink() string + getRefID() string + getUnit() string + } + + // Used to build time series filters + cloudMonitoringTimeSeriesFilter struct { Target string Params url.Values RefID string @@ -19,6 +32,17 @@ type ( Unit string } + // Used to build MQL queries + cloudMonitoringTimeSeriesQuery struct { + RefID string + ProjectName string + Query string + IntervalMS int64 + AliasBy string + timeRange *tsdb.TimeRange + Unit string + } + metricQuery struct { ProjectName string MetricType string @@ -29,6 +53,8 @@ type ( Filters []string AliasBy string View string + EditorMode string + Query string Unit string } @@ -67,10 +93,61 @@ type ( } cloudMonitoringResponse struct { - TimeSeries []timeSeries `json:"timeSeries"` + TimeSeries []timeSeries `json:"timeSeries"` + TimeSeriesDescriptor timeSeriesDescriptor `json:"timeSeriesDescriptor"` + TimeSeriesData timeSeriesData `json:"timeSeriesData"` } ) +type timeSeriesDescriptor struct { + LabelDescriptors []struct { + Key string `json:"key"` + ValueType string `json:"valueType"` + Description string `json:"description"` + } `json:"labelDescriptors"` + PointDescriptors []struct { + Key string `json:"key"` + ValueType string `json:"valueType"` + MetricKind string `json:"metricKind"` + } `json:"pointDescriptors"` +} + +type timeSeriesData []struct { + LabelValues []struct { + BoolValue bool `json:"boolValue"` + Int64Value int64 `json:"int64Value"` + StringValue string `json:"stringValue"` + } `json:"labelValues"` + PointData []struct { + Values []struct { + BoolValue bool `json:"boolValue"` + Int64Value string `json:"int64Value"` + DoubleValue float64 `json:"doubleValue"` + StringValue string `json:"stringValue"` + DistributionValue struct { + Count string `json:"count"` + Mean float64 `json:"mean"` + SumOfSquaredDeviation float64 `json:"sumOfSquaredDeviation"` + Range struct { + Min int `json:"min"` + Max int `json:"max"` + } `json:"range"` + BucketOptions cloudMonitoringBucketOptions `json:"bucketOptions"` + BucketCounts []string `json:"bucketCounts"` + Examplars []struct { + Value float64 `json:"value"` + Timestamp string `json:"timestamp"` + // attachments + } `json:"examplars"` + } `json:"distributionValue"` + } `json:"values"` + TimeInterval struct { + EndTime time.Time `json:"endTime"` + StartTime time.Time `json:"startTime"` + } `json:"timeInterval"` + } `json:"pointData"` +} + type timeSeries struct { Metric struct { Labels map[string]string `json:"labels"` diff --git a/public/app/plugins/datasource/cloud-monitoring/components/AnnotationQueryEditor.tsx b/public/app/plugins/datasource/cloud-monitoring/components/AnnotationQueryEditor.tsx index 4cbc38753c1..de7ed3671da 100644 --- a/public/app/plugins/datasource/cloud-monitoring/components/AnnotationQueryEditor.tsx +++ b/public/app/plugins/datasource/cloud-monitoring/components/AnnotationQueryEditor.tsx @@ -6,7 +6,7 @@ import { SelectableValue } from '@grafana/data'; import CloudMonitoringDatasource from '../datasource'; import { AnnotationsHelp, LabelFilter, Metrics, Project } from './'; import { toOption } from '../functions'; -import { AnnotationTarget, MetricDescriptor } from '../types'; +import { AnnotationTarget, EditorMode, MetricDescriptor } from '../types'; const { Input } = LegacyForms; @@ -25,6 +25,7 @@ interface State extends AnnotationTarget { } const DefaultTarget: State = { + editorMode: EditorMode.Visual, projectName: '', projects: [], metricType: '', @@ -42,7 +43,7 @@ const DefaultTarget: State = { export class AnnotationQueryEditor extends React.Component { state: State = DefaultTarget; - async UNSAFE_UNSAFE_componentWillMount() { + async UNSAFE_componentWillMount() { // Unfortunately, migrations like this need to go UNSAFE_componentWillMount. As soon as there's // migration hook for this module.ts, we can do the migrations there instead. const { target, datasource } = this.props; diff --git a/public/app/plugins/datasource/cloud-monitoring/components/MQLQueryEditor.tsx b/public/app/plugins/datasource/cloud-monitoring/components/MQLQueryEditor.tsx new file mode 100644 index 00000000000..a5d8c92c531 --- /dev/null +++ b/public/app/plugins/datasource/cloud-monitoring/components/MQLQueryEditor.tsx @@ -0,0 +1,32 @@ +import React from 'react'; +import { TextArea } from '@grafana/ui'; + +export interface Props { + onChange: (query: string) => void; + onRunQuery: () => void; + query: string; +} + +export function MQLQueryEditor({ query, onChange, onRunQuery }: React.PropsWithChildren) { + const onKeyDown = (event: any) => { + if (event.key === 'Enter' && (event.shiftKey || event.ctrlKey)) { + event.preventDefault(); + onRunQuery(); + } + }; + + return ( + <> +