Cloud Monitoring: Fix missing data when result is paginated (#56270)

* Added nextPageToken prop

* Adding first and pageToken condition to while loop

* clean up

* revert gitignore

* fix go lint

* Added logic to builder too

* Removed pageSize - was for local testing

* gofmt

* extracted doRequest function

* extracted doRequest in query too

* Adressed filter comments

* Adressed query comments

* go fmt

* removed pageSize added for testing

* go fmt again
This commit is contained in:
Yaelle Chaudy 2022-10-13 10:58:07 +02:00 committed by GitHub
parent b9d449529a
commit 0295177bb0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 71 additions and 32 deletions

View File

@ -4,6 +4,7 @@ import (
"context"
"encoding/json"
"fmt"
"net/http"
"net/url"
"path"
"strconv"
@ -17,6 +18,22 @@ import (
"github.com/grafana/grafana/pkg/infra/tracing"
)
func (timeSeriesFilter *cloudMonitoringTimeSeriesFilter) doRequestFilterPage(ctx context.Context, r *http.Request, dsInfo datasourceInfo) (cloudMonitoringResponse, error) {
r.URL.RawQuery = timeSeriesFilter.Params.Encode()
r = r.WithContext(ctx)
res, err := dsInfo.services[cloudMonitor].client.Do(r)
if err != nil {
return cloudMonitoringResponse{}, err
}
dnext, err := unmarshalResponse(res)
if err != nil {
return cloudMonitoringResponse{}, err
}
return dnext, nil
}
func (timeSeriesFilter *cloudMonitoringTimeSeriesFilter) run(ctx context.Context, req *backend.QueryDataRequest,
s *Service, dsInfo datasourceInfo, tracer tracing.Tracer) (*backend.DataResponse, cloudMonitoringResponse, string, error) {
dr := &backend.DataResponse{}
@ -30,16 +47,12 @@ func (timeSeriesFilter *cloudMonitoringTimeSeriesFilter) run(ctx context.Context
}
slog.Info("No project name set on query, using project name from datasource", "projectName", projectName)
}
r, err := s.createRequest(ctx, &dsInfo, path.Join("/v3/projects", projectName, "timeSeries"), nil)
if err != nil {
dr.Error = err
return dr, cloudMonitoringResponse{}, "", nil
}
r.URL.RawQuery = timeSeriesFilter.Params.Encode()
alignmentPeriod, ok := r.URL.Query()["aggregation.alignmentPeriod"]
if ok {
seconds, err := strconv.ParseInt(alignmentPeriodRe.FindString(alignmentPeriod[0]), 10, 64)
if err == nil {
@ -63,21 +76,24 @@ func (timeSeriesFilter *cloudMonitoringTimeSeriesFilter) run(ctx context.Context
span.SetAttributes("until", req.Queries[0].TimeRange.To, attribute.Key("until").String(req.Queries[0].TimeRange.To.String()))
span.SetAttributes("datasource_id", dsInfo.id, attribute.Key("datasource_id").Int64(dsInfo.id))
span.SetAttributes("org_id", req.PluginContext.OrgID, attribute.Key("org_id").Int64(req.PluginContext.OrgID))
defer span.End()
tracer.Inject(ctx, r.Header, span)
r = r.WithContext(ctx)
res, err := dsInfo.services[cloudMonitor].client.Do(r)
d, err := timeSeriesFilter.doRequestFilterPage(ctx, r, dsInfo)
if err != nil {
dr.Error = err
return dr, cloudMonitoringResponse{}, "", nil
}
d, err := unmarshalResponse(res)
if err != nil {
dr.Error = err
return dr, cloudMonitoringResponse{}, "", nil
nextPageToken := d.NextPageToken
for nextPageToken != "" {
timeSeriesFilter.Params["pageToken"] = []string{d.NextPageToken}
nextPage, err := timeSeriesFilter.doRequestFilterPage(ctx, r, dsInfo)
if err != nil {
dr.Error = err
return dr, cloudMonitoringResponse{}, "", nil
}
d.TimeSeries = append(d.TimeSeries, nextPage.TimeSeries...)
nextPageToken = nextPage.NextPageToken
}
return dr, d, r.URL.RawQuery, nil

View File

@ -5,16 +5,17 @@ import (
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"net/url"
"path"
"strconv"
"strings"
"time"
"go.opentelemetry.io/otel/attribute"
"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana-plugin-sdk-go/data"
"go.opentelemetry.io/otel/attribute"
"github.com/grafana/grafana/pkg/infra/tracing"
"github.com/grafana/grafana/pkg/tsdb/intervalv2"
@ -35,6 +36,24 @@ func (timeSeriesQuery cloudMonitoringTimeSeriesQuery) appendGraphPeriod(req *bac
return ""
}
func doRequestQueryPage(requestBody map[string]interface{}, r *http.Request, dsInfo datasourceInfo) (cloudMonitoringResponse, error) {
buf, err := json.Marshal(requestBody)
if err != nil {
return cloudMonitoringResponse{}, err
}
r.Body = io.NopCloser(bytes.NewBuffer(buf))
res, err := dsInfo.services[cloudMonitor].client.Do(r)
if err != nil {
return cloudMonitoringResponse{}, err
}
dnext, err := unmarshalResponse(res)
if err != nil {
return cloudMonitoringResponse{}, err
}
return dnext, nil
}
func (timeSeriesQuery cloudMonitoringTimeSeriesQuery) run(ctx context.Context, req *backend.QueryDataRequest,
s *Service, dsInfo datasourceInfo, tracer tracing.Tracer) (*backend.DataResponse, cloudMonitoringResponse, string, error) {
dr := &backend.DataResponse{}
@ -55,40 +74,43 @@ func (timeSeriesQuery cloudMonitoringTimeSeriesQuery) run(ctx context.Context, r
to := req.Queries[0].TimeRange.To
timeFormat := "2006/01/02-15:04:05"
timeSeriesQuery.Query += fmt.Sprintf(" | within d'%s', d'%s'", from.UTC().Format(timeFormat), to.UTC().Format(timeFormat))
buf, err := json.Marshal(map[string]interface{}{
"query": timeSeriesQuery.Query,
})
if err != nil {
dr.Error = err
return dr, cloudMonitoringResponse{}, "", nil
}
r, err := s.createRequest(ctx, &dsInfo, path.Join("/v3/projects", projectName, "timeSeries:query"), bytes.NewBuffer(buf))
if err != nil {
dr.Error = err
return dr, cloudMonitoringResponse{}, "", nil
}
p := path.Join("/v3/projects", projectName, "timeSeries:query")
ctx, span := tracer.Start(ctx, "cloudMonitoring MQL query")
span.SetAttributes("query", timeSeriesQuery.Query, attribute.Key("query").String(timeSeriesQuery.Query))
span.SetAttributes("from", req.Queries[0].TimeRange.From, attribute.Key("from").String(req.Queries[0].TimeRange.From.String()))
span.SetAttributes("until", req.Queries[0].TimeRange.To, attribute.Key("until").String(req.Queries[0].TimeRange.To.String()))
defer span.End()
tracer.Inject(ctx, r.Header, span)
r = r.WithContext(ctx)
res, err := dsInfo.services[cloudMonitor].client.Do(r)
requestBody := map[string]interface{}{
"query": timeSeriesQuery.Query,
}
r, err := s.createRequest(ctx, &dsInfo, p, bytes.NewBuffer([]byte{}))
if err != nil {
dr.Error = err
return dr, cloudMonitoringResponse{}, "", nil
}
tracer.Inject(ctx, r.Header, span)
r = r.WithContext(ctx)
d, err := unmarshalResponse(res)
d, err := doRequestQueryPage(requestBody, r, dsInfo)
if err != nil {
dr.Error = err
return dr, cloudMonitoringResponse{}, "", nil
}
for d.NextPageToken != "" {
requestBody := map[string]interface{}{
"query": timeSeriesQuery.Query,
"pageToken": d.NextPageToken,
}
nextPage, err := doRequestQueryPage(requestBody, r, dsInfo)
if err != nil {
dr.Error = err
return dr, cloudMonitoringResponse{}, "", nil
}
d.TimeSeriesData = append(d.TimeSeriesData, nextPage.TimeSeriesData...)
d.NextPageToken = nextPage.NextPageToken
}
return dr, d, timeSeriesQuery.Query, nil
}

View File

@ -100,6 +100,7 @@ type (
TimeSeriesDescriptor timeSeriesDescriptor `json:"timeSeriesDescriptor"`
TimeSeriesData timeSeriesData `json:"timeSeriesData"`
Unit string `json:"unit"`
NextPageToken string `json:"nextPageToken"`
}
)