Plugins: Externalize Cloud Monitoring data source (#80181)

This commit is contained in:
Alyssa Bull
2024-01-29 09:24:23 -07:00
committed by GitHub
parent de662810cf
commit 2d432d6ff3
43 changed files with 1838 additions and 172 deletions

View File

@@ -8,6 +8,7 @@ import (
"time"
"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana-plugin-sdk-go/backend/log"
"github.com/grafana/grafana-plugin-sdk-go/data"
)
@@ -18,10 +19,10 @@ type annotationEvent struct {
Text string
}
func (s *Service) executeAnnotationQuery(ctx context.Context, req *backend.QueryDataRequest, dsInfo datasourceInfo, queries []cloudMonitoringQueryExecutor) (
func (s *Service) executeAnnotationQuery(ctx context.Context, req *backend.QueryDataRequest, dsInfo datasourceInfo, queries []cloudMonitoringQueryExecutor, logger log.Logger) (
*backend.QueryDataResponse, error) {
resp := backend.NewQueryDataResponse()
queryRes, dr, _, err := queries[0].run(ctx, req, s, dsInfo, s.tracer)
queryRes, dr, _, err := queries[0].run(ctx, req, s, dsInfo, logger)
if err != nil {
return resp, err
}

View File

@@ -17,20 +17,15 @@ import (
"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana-plugin-sdk-go/backend/datasource"
"github.com/grafana/grafana-plugin-sdk-go/backend/httpclient"
"github.com/grafana/grafana-plugin-sdk-go/backend/instancemgmt"
"github.com/grafana/grafana-plugin-sdk-go/backend/log"
"github.com/grafana/grafana-plugin-sdk-go/backend/resource/httpadapter"
"github.com/grafana/grafana-plugin-sdk-go/data"
"github.com/grafana/grafana/pkg/infra/httpclient"
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/infra/tracing"
"github.com/grafana/grafana/pkg/tsdb/cloud-monitoring/kinds/dataquery"
)
var (
slog = log.New("tsdb.cloudMonitoring")
)
var (
legendKeyFormat = regexp.MustCompile(`\{\{\s*(.+?)\s*\}\}`)
metricNameFormat = regexp.MustCompile(`([\w\d_]+)\.(googleapis\.com|io)/(.+)`)
@@ -65,11 +60,11 @@ const (
perSeriesAlignerDefault = "ALIGN_MEAN"
)
func ProvideService(httpClientProvider httpclient.Provider, tracer tracing.Tracer) *Service {
func ProvideService(httpClientProvider *httpclient.Provider) *Service {
s := &Service{
tracer: tracer,
httpClientProvider: httpClientProvider,
im: datasource.NewInstanceManager(newInstanceSettings(httpClientProvider)),
httpClientProvider: *httpClientProvider,
im: datasource.NewInstanceManager(newInstanceSettings(*httpClientProvider)),
logger: backend.NewLoggerWith("logger", "tsdb.cloudmonitoring"),
gceDefaultProjectGetter: utils.GCEDefaultProject,
}
@@ -109,7 +104,7 @@ func (s *Service) CheckHealth(ctx context.Context, req *backend.CheckHealthReque
}
defer func() {
if err := res.Body.Close(); err != nil {
slog.Warn("Failed to close response body", "err", err)
s.logger.Warn("Failed to close response body", "err", err)
}
}()
@@ -128,7 +123,7 @@ func (s *Service) CheckHealth(ctx context.Context, req *backend.CheckHealthReque
type Service struct {
httpClientProvider httpclient.Provider
im instancemgmt.InstanceManager
tracer tracing.Tracer
logger log.Logger
resourceHandler backend.CallResourceHandler
@@ -194,7 +189,7 @@ func newInstanceSettings(httpClientProvider httpclient.Provider) datasource.Inst
}
for name, info := range routes {
client, err := newHTTPClient(dsInfo, opts, httpClientProvider, name)
client, err := newHTTPClient(dsInfo, opts, &httpClientProvider, name)
if err != nil {
return nil, err
}
@@ -332,7 +327,7 @@ func migrateRequest(req *backend.QueryDataRequest) error {
// QueryData takes in the frontend queries, parses them into the CloudMonitoring query format
// executes the queries against the CloudMonitoring API and parses the response into data frames
func (s *Service) QueryData(ctx context.Context, req *backend.QueryDataRequest) (*backend.QueryDataResponse, error) {
logger := slog.FromContext(ctx)
logger := s.logger.FromContext(ctx)
if len(req.Queries) == 0 {
return nil, fmt.Errorf("query contains no queries")
}
@@ -354,21 +349,21 @@ func (s *Service) QueryData(ctx context.Context, req *backend.QueryDataRequest)
switch req.Queries[0].QueryType {
case string(dataquery.QueryTypeAnnotation):
return s.executeAnnotationQuery(ctx, req, *dsInfo, queries)
return s.executeAnnotationQuery(ctx, req, *dsInfo, queries, logger)
default:
return s.executeTimeSeriesQuery(ctx, req, *dsInfo, queries)
return s.executeTimeSeriesQuery(ctx, req, *dsInfo, queries, logger)
}
}
func (s *Service) executeTimeSeriesQuery(ctx context.Context, req *backend.QueryDataRequest, dsInfo datasourceInfo, queries []cloudMonitoringQueryExecutor) (
func (s *Service) executeTimeSeriesQuery(ctx context.Context, req *backend.QueryDataRequest, dsInfo datasourceInfo, queries []cloudMonitoringQueryExecutor, logger log.Logger) (
*backend.QueryDataResponse, error) {
resp := backend.NewQueryDataResponse()
for _, queryExecutor := range queries {
queryRes, dr, executedQueryString, err := queryExecutor.run(ctx, req, s, dsInfo, s.tracer)
queryRes, dr, executedQueryString, err := queryExecutor.run(ctx, req, s, dsInfo, logger)
if err != nil {
return resp, err
}
err = queryExecutor.parseResponse(queryRes, dr, executedQueryString)
err = queryExecutor.parseResponse(queryRes, dr, executedQueryString, logger)
if err != nil {
queryRes.Error = err
}
@@ -405,7 +400,6 @@ func (s *Service) buildQueryExecutors(logger log.Logger, req *backend.QueryDataR
case string(dataquery.QueryTypeTimeSeriesList), string(dataquery.QueryTypeAnnotation):
cmtsf := &cloudMonitoringTimeSeriesList{
refID: query.RefID,
logger: logger,
aliasBy: q.AliasBy,
}
if q.TimeSeriesList.View == nil || *q.TimeSeriesList.View == "" {
@@ -427,7 +421,6 @@ func (s *Service) buildQueryExecutors(logger log.Logger, req *backend.QueryDataR
case string(dataquery.QueryTypeSlo):
cmslo := &cloudMonitoringSLO{
refID: query.RefID,
logger: logger,
aliasBy: q.AliasBy,
parameters: q.SloQuery,
}
@@ -436,10 +429,10 @@ func (s *Service) buildQueryExecutors(logger log.Logger, req *backend.QueryDataR
case string(dataquery.QueryTypePromQL):
cmp := &cloudMonitoringProm{
refID: query.RefID,
logger: logger,
aliasBy: q.AliasBy,
parameters: q.PromQLQuery,
timeRange: req.Queries[0].TimeRange,
logger: logger,
}
queryInterface = cmp
default:
@@ -595,7 +588,7 @@ func (s *Service) getDefaultProject(ctx context.Context, dsInfo datasourceInfo)
return dsInfo.defaultProject, nil
}
func unmarshalResponse(logger log.Logger, res *http.Response) (cloudMonitoringResponse, error) {
func unmarshalResponse(res *http.Response, logger log.Logger) (cloudMonitoringResponse, error) {
body, err := io.ReadAll(res.Body)
if err != nil {
return cloudMonitoringResponse{}, err
@@ -646,7 +639,7 @@ func addConfigData(frames data.Frames, dl string, unit string, period *string) d
if period != nil && *period != "" {
err := addInterval(*period, frames[i].Fields[0])
if err != nil {
slog.Error("Failed to add interval", "error", err)
backend.Logger.Error("Failed to add interval", "error", err)
}
}
}

View File

@@ -12,8 +12,8 @@ import (
"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana-plugin-sdk-go/backend/datasource"
"github.com/grafana/grafana-plugin-sdk-go/backend/httpclient"
"github.com/grafana/grafana-plugin-sdk-go/backend/instancemgmt"
"github.com/grafana/grafana/pkg/infra/httpclient"
"github.com/grafana/grafana/pkg/tsdb/cloud-monitoring/kinds/dataquery"
"github.com/stretchr/testify/assert"
@@ -23,7 +23,7 @@ import (
func TestNewInstanceSettings(t *testing.T) {
t.Run("should create a new instance with empty settings", func(t *testing.T) {
cli := httpclient.NewProvider()
f := newInstanceSettings(cli)
f := newInstanceSettings(*cli)
dsInfo, err := f(context.Background(), backend.DataSourceInstanceSettings{
JSONData: json.RawMessage(`{}`),
})
@@ -34,7 +34,7 @@ func TestNewInstanceSettings(t *testing.T) {
t.Run("should create a new instance parsing settings", func(t *testing.T) {
cli := httpclient.NewProvider()
f := newInstanceSettings(cli)
f := newInstanceSettings(*cli)
dsInfo, err := f(context.Background(), backend.DataSourceInstanceSettings{
JSONData: json.RawMessage(`{"authenticationType": "test", "defaultProject": "test", "clientEmail": "test", "tokenUri": "test"}`),
})
@@ -53,7 +53,7 @@ func TestCloudMonitoring(t *testing.T) {
t.Run("parses a time series list query", func(t *testing.T) {
req := baseTimeSeriesList()
qes, err := service.buildQueryExecutors(slog, req)
qes, err := service.buildQueryExecutors(service.logger, req)
require.NoError(t, err)
queries := getCloudMonitoringListFromInterface(t, qes)
@@ -71,7 +71,7 @@ func TestCloudMonitoring(t *testing.T) {
t.Run("parses a time series query", func(t *testing.T) {
req := baseTimeSeriesQuery()
qes, err := service.buildQueryExecutors(slog, req)
qes, err := service.buildQueryExecutors(service.logger, req)
require.NoError(t, err)
queries := getCloudMonitoringQueryFromInterface(t, qes)
@@ -95,7 +95,7 @@ func TestCloudMonitoring(t *testing.T) {
"aliasBy": "testalias"
}`)
qes, err := service.buildQueryExecutors(slog, req)
qes, err := service.buildQueryExecutors(service.logger, req)
require.NoError(t, err)
queries := getCloudMonitoringListFromInterface(t, qes)
@@ -114,7 +114,7 @@ func TestCloudMonitoring(t *testing.T) {
req := deprecatedReq()
err := migrateRequest(req)
require.NoError(t, err)
qes, err := service.buildQueryExecutors(slog, req)
qes, err := service.buildQueryExecutors(service.logger, req)
require.NoError(t, err)
queries := getCloudMonitoringListFromInterface(t, qes)
@@ -157,7 +157,7 @@ func TestCloudMonitoring(t *testing.T) {
err := migrateRequest(query)
require.NoError(t, err)
qes, err := service.buildQueryExecutors(slog, query)
qes, err := service.buildQueryExecutors(service.logger, query)
require.NoError(t, err)
queries := getCloudMonitoringListFromInterface(t, qes)
assert.Equal(t, 1, len(queries))
@@ -191,7 +191,7 @@ func TestCloudMonitoring(t *testing.T) {
err := migrateRequest(req)
require.NoError(t, err)
qes, err := service.buildQueryExecutors(slog, req)
qes, err := service.buildQueryExecutors(service.logger, req)
require.NoError(t, err)
queries := getCloudMonitoringListFromInterface(t, qes)
assert.Equal(t, `+1000s`, queries[0].params["aggregation.alignmentPeriod"][0])
@@ -221,7 +221,7 @@ func TestCloudMonitoring(t *testing.T) {
err := migrateRequest(req)
require.NoError(t, err)
qes, err := service.buildQueryExecutors(slog, req)
qes, err := service.buildQueryExecutors(service.logger, req)
require.NoError(t, err)
queries := getCloudMonitoringListFromInterface(t, qes)
assert.Equal(t, `+60s`, queries[0].params["aggregation.alignmentPeriod"][0])
@@ -257,7 +257,7 @@ func TestCloudMonitoring(t *testing.T) {
err := migrateRequest(req)
require.NoError(t, err)
qes, err := service.buildQueryExecutors(slog, req)
qes, err := service.buildQueryExecutors(service.logger, req)
require.NoError(t, err)
queries := getCloudMonitoringListFromInterface(t, qes)
assert.Equal(t, `+60s`, queries[0].params["aggregation.alignmentPeriod"][0])
@@ -274,7 +274,7 @@ func TestCloudMonitoring(t *testing.T) {
err := migrateRequest(req)
require.NoError(t, err)
qes, err := service.buildQueryExecutors(slog, req)
qes, err := service.buildQueryExecutors(service.logger, req)
require.NoError(t, err)
queries := getCloudMonitoringListFromInterface(t, qes)
assert.Equal(t, `+60s`, queries[0].params["aggregation.alignmentPeriod"][0])
@@ -291,7 +291,7 @@ func TestCloudMonitoring(t *testing.T) {
err := migrateRequest(req)
require.NoError(t, err)
qes, err := service.buildQueryExecutors(slog, req)
qes, err := service.buildQueryExecutors(service.logger, req)
require.NoError(t, err)
queries := getCloudMonitoringListFromInterface(t, qes)
assert.Equal(t, `+300s`, queries[0].params["aggregation.alignmentPeriod"][0])
@@ -308,7 +308,7 @@ func TestCloudMonitoring(t *testing.T) {
err := migrateRequest(req)
require.NoError(t, err)
qes, err := service.buildQueryExecutors(slog, req)
qes, err := service.buildQueryExecutors(service.logger, req)
require.NoError(t, err)
queries := getCloudMonitoringListFromInterface(t, qes)
assert.Equal(t, `+3600s`, queries[0].params["aggregation.alignmentPeriod"][0])
@@ -329,7 +329,7 @@ func TestCloudMonitoring(t *testing.T) {
err := migrateRequest(req)
require.NoError(t, err)
qes, err := service.buildQueryExecutors(slog, req)
qes, err := service.buildQueryExecutors(service.logger, req)
require.NoError(t, err)
queries := getCloudMonitoringListFromInterface(t, qes)
assert.Equal(t, `+60s`, queries[0].params["aggregation.alignmentPeriod"][0])
@@ -361,7 +361,7 @@ func TestCloudMonitoring(t *testing.T) {
err := migrateRequest(req)
require.NoError(t, err)
qes, err := service.buildQueryExecutors(slog, req)
qes, err := service.buildQueryExecutors(service.logger, req)
require.NoError(t, err)
queries := getCloudMonitoringListFromInterface(t, qes)
assert.Equal(t, `+60s`, queries[0].params["aggregation.alignmentPeriod"][0])
@@ -393,7 +393,7 @@ func TestCloudMonitoring(t *testing.T) {
err := migrateRequest(req)
require.NoError(t, err)
qes, err := service.buildQueryExecutors(slog, req)
qes, err := service.buildQueryExecutors(service.logger, req)
require.NoError(t, err)
queries := getCloudMonitoringListFromInterface(t, qes)
assert.Equal(t, `+300s`, queries[0].params["aggregation.alignmentPeriod"][0])
@@ -425,7 +425,7 @@ func TestCloudMonitoring(t *testing.T) {
err := migrateRequest(req)
require.NoError(t, err)
qes, err := service.buildQueryExecutors(slog, req)
qes, err := service.buildQueryExecutors(service.logger, req)
require.NoError(t, err)
queries := getCloudMonitoringListFromInterface(t, qes)
assert.Equal(t, `+3600s`, queries[0].params["aggregation.alignmentPeriod"][0])
@@ -457,7 +457,7 @@ func TestCloudMonitoring(t *testing.T) {
err := migrateRequest(req)
require.NoError(t, err)
qes, err := service.buildQueryExecutors(slog, req)
qes, err := service.buildQueryExecutors(service.logger, req)
require.NoError(t, err)
queries := getCloudMonitoringListFromInterface(t, qes)
assert.Equal(t, `+600s`, queries[0].params["aggregation.alignmentPeriod"][0])
@@ -489,7 +489,7 @@ func TestCloudMonitoring(t *testing.T) {
err := migrateRequest(req)
require.NoError(t, err)
qes, err := service.buildQueryExecutors(slog, req)
qes, err := service.buildQueryExecutors(service.logger, req)
require.NoError(t, err)
queries := getCloudMonitoringListFromInterface(t, qes)
@@ -535,7 +535,7 @@ func TestCloudMonitoring(t *testing.T) {
err := migrateRequest(req)
require.NoError(t, err)
qes, err := service.buildQueryExecutors(slog, req)
qes, err := service.buildQueryExecutors(service.logger, req)
require.NoError(t, err)
queries := getCloudMonitoringListFromInterface(t, qes)
@@ -598,7 +598,7 @@ func TestCloudMonitoring(t *testing.T) {
require.NoError(t, err)
t.Run("and query type is metrics", func(t *testing.T) {
qes, err := service.buildQueryExecutors(slog, req)
qes, err := service.buildQueryExecutors(service.logger, req)
require.NoError(t, err)
queries := getCloudMonitoringListFromInterface(t, qes)
@@ -647,7 +647,7 @@ func TestCloudMonitoring(t *testing.T) {
err = migrateRequest(req)
require.NoError(t, err)
qes, err = service.buildQueryExecutors(slog, req)
qes, err = service.buildQueryExecutors(service.logger, req)
require.NoError(t, err)
tqueries := getCloudMonitoringQueryFromInterface(t, qes)
@@ -675,7 +675,7 @@ func TestCloudMonitoring(t *testing.T) {
err := migrateRequest(req)
require.NoError(t, err)
qes, err := service.buildQueryExecutors(slog, req)
qes, err := service.buildQueryExecutors(service.logger, req)
require.NoError(t, err)
queries := getCloudMonitoringSLOFromInterface(t, qes)
@@ -705,7 +705,7 @@ func TestCloudMonitoring(t *testing.T) {
err = migrateRequest(req)
require.NoError(t, err)
qes, err = service.buildQueryExecutors(slog, req)
qes, err = service.buildQueryExecutors(service.logger, req)
require.NoError(t, err)
qqueries := getCloudMonitoringSLOFromInterface(t, qes)
assert.Equal(t, "ALIGN_NEXT_OLDER", qqueries[0].params["aggregation.perSeriesAligner"][0])
@@ -730,7 +730,7 @@ func TestCloudMonitoring(t *testing.T) {
err = migrateRequest(req)
require.NoError(t, err)
qes, err = service.buildQueryExecutors(slog, req)
qes, err = service.buildQueryExecutors(service.logger, req)
require.NoError(t, err)
qqqueries := getCloudMonitoringSLOFromInterface(t, qes)
assert.Equal(t, `aggregation.alignmentPeriod=%2B60s&aggregation.perSeriesAligner=ALIGN_NEXT_OLDER&filter=select_slo_burn_rate%28%22projects%2Ftest-proj%2Fservices%2Ftest-service%2FserviceLevelObjectives%2Ftest-slo%22%2C+%221h%22%29&interval.endTime=2018-03-15T13%3A34%3A00Z&interval.startTime=2018-03-15T13%3A00%3A00Z`, qqqueries[0].params.Encode())
@@ -812,7 +812,7 @@ func TestCloudMonitoring(t *testing.T) {
err := migrateRequest(req)
require.NoError(t, err)
qes, err := service.buildQueryExecutors(slog, req)
qes, err := service.buildQueryExecutors(service.logger, req)
require.NoError(t, err)
queries := getCloudMonitoringListFromInterface(t, qes)
@@ -842,7 +842,7 @@ func TestCloudMonitoring(t *testing.T) {
err := migrateRequest(req)
require.NoError(t, err)
qes, err := service.buildQueryExecutors(slog, req)
qes, err := service.buildQueryExecutors(service.logger, req)
require.NoError(t, err)
queries := getCloudMonitoringListFromInterface(t, qes)
@@ -872,7 +872,7 @@ func TestCloudMonitoring(t *testing.T) {
err := migrateRequest(req)
require.NoError(t, err)
qes, err := service.buildQueryExecutors(slog, req)
qes, err := service.buildQueryExecutors(service.logger, req)
require.NoError(t, err)
queries := getCloudMonitoringListFromInterface(t, qes)
@@ -900,7 +900,7 @@ func TestCloudMonitoring(t *testing.T) {
err := migrateRequest(req)
require.NoError(t, err)
qes, err := service.buildQueryExecutors(slog, req)
qes, err := service.buildQueryExecutors(service.logger, req)
require.NoError(t, err)
queries := getCloudMonitoringListFromInterface(t, qes)
@@ -930,7 +930,7 @@ func TestCloudMonitoring(t *testing.T) {
err := migrateRequest(req)
require.NoError(t, err)
qes, err := service.buildQueryExecutors(slog, req)
qes, err := service.buildQueryExecutors(service.logger, req)
require.NoError(t, err)
queries := getCloudMonitoringListFromInterface(t, qes)
@@ -958,7 +958,7 @@ func TestCloudMonitoring(t *testing.T) {
err := migrateRequest(req)
require.NoError(t, err)
qes, err := service.buildQueryExecutors(slog, req)
qes, err := service.buildQueryExecutors(service.logger, req)
require.NoError(t, err)
queries := getCloudMonitoringListFromInterface(t, qes)

File diff suppressed because it is too large Load Diff

View File

@@ -5,7 +5,6 @@ import (
"github.com/grafana/grafana-google-sdk-go/pkg/tokenprovider"
"github.com/grafana/grafana-plugin-sdk-go/backend/httpclient"
infrahttp "github.com/grafana/grafana/pkg/infra/httpclient"
)
const (
@@ -59,7 +58,7 @@ func getMiddleware(model *datasourceInfo, routePath string) (httpclient.Middlewa
return tokenprovider.AuthMiddleware(provider), nil
}
func newHTTPClient(model *datasourceInfo, opts httpclient.Options, clientProvider infrahttp.Provider, route string) (*http.Client, error) {
func newHTTPClient(model *datasourceInfo, opts httpclient.Options, clientProvider *httpclient.Provider, route string) (*http.Client, error) {
m, err := getMiddleware(model, route)
if err != nil {
return nil, err

View File

@@ -0,0 +1,66 @@
// Package jsonitere wraps json-iterator/go's Iterator methods with error returns
// so linting can catch unchecked errors.
// The underlying iterator's Error property is returned and not reset.
// See json-iterator/go for method documentation and additional methods that
// can be added to this library.
package jsonitere
import (
j "github.com/json-iterator/go"
)
// Iterator decorates a json-iterator *j.Iterator so that each read
// method also returns the underlying iterator's Error value, letting
// linters catch unchecked errors.
type Iterator struct {
	// named property instead of embedded so there is no
	// confusion about which method or property is called
	i *j.Iterator
}
// NewIterator wraps the given json-iterator iterator in an
// error-returning Iterator.
func NewIterator(i *j.Iterator) *Iterator {
	return &Iterator{i: i}
}
// Read consumes the next value from the stream and reports the
// iterator's current error state.
func (it *Iterator) Read() (any, error) {
	v := it.i.Read()
	return v, it.i.Error
}
// ReadAny reads the next value as a lazily-parsed j.Any and reports
// the iterator's current error state.
func (it *Iterator) ReadAny() (j.Any, error) {
	a := it.i.ReadAny()
	return a, it.i.Error
}
// ReadArray advances over an array element, reporting whether more
// elements remain along with the iterator's error state.
func (it *Iterator) ReadArray() (bool, error) {
	more := it.i.ReadArray()
	return more, it.i.Error
}
// ReadObject advances over an object field, returning the field name
// (empty at the end of the object) and the iterator's error state.
func (it *Iterator) ReadObject() (string, error) {
	field := it.i.ReadObject()
	return field, it.i.Error
}
// ReadString reads the next value as a string and reports the
// iterator's current error state.
func (it *Iterator) ReadString() (string, error) {
	s := it.i.ReadString()
	return s, it.i.Error
}
// WhatIsNext peeks at the type of the next value without consuming it,
// reporting the iterator's current error state.
func (it *Iterator) WhatIsNext() (j.ValueType, error) {
	vt := it.i.WhatIsNext()
	return vt, it.i.Error
}
// Skip discards the next whole value and reports the iterator's
// current error state.
func (it *Iterator) Skip() error {
	it.i.Skip()
	return it.i.Error
}
// SkipAndReturnBytes discards the next whole value and returns its raw
// bytes.
// NOTE(review): unlike the other wrappers in this package, this method
// does not surface it.i.Error; callers should check errors via a
// subsequent call — confirm this asymmetry is intentional.
func (it *Iterator) SkipAndReturnBytes() []byte {
	return it.i.SkipAndReturnBytes()
}
// ReadVal unmarshals the next value into obj and reports the
// iterator's current error state.
func (it *Iterator) ReadVal(obj any) error {
	it.i.ReadVal(obj)
	return it.i.Error
}
// ReadFloat64 reads the next value as a float64 and reports the
// iterator's current error state.
func (it *Iterator) ReadFloat64() (float64, error) {
	f := it.i.ReadFloat64()
	return f, it.i.Error
}
// ReadInt8 reads the next value as an int8 and reports the iterator's
// current error state.
func (it *Iterator) ReadInt8() (int8, error) {
	n := it.i.ReadInt8()
	return n, it.i.Error
}

View File

@@ -11,28 +11,27 @@ import (
"time"
"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana-plugin-sdk-go/backend/log"
"github.com/grafana/grafana-plugin-sdk-go/data"
"github.com/grafana/grafana/pkg/tsdb/cloud-monitoring/converter"
jsoniter "github.com/json-iterator/go"
"github.com/grafana/grafana/pkg/infra/tracing"
"github.com/grafana/grafana/pkg/util/converter"
)
func (promQLQ *cloudMonitoringProm) run(ctx context.Context, req *backend.QueryDataRequest,
s *Service, dsInfo datasourceInfo, tracer tracing.Tracer) (*backend.DataResponse, any, string, error) {
s *Service, dsInfo datasourceInfo, logger log.Logger) (*backend.DataResponse, any, string, error) {
dr := &backend.DataResponse{}
projectName, err := s.ensureProject(ctx, dsInfo, promQLQ.parameters.ProjectName)
if err != nil {
dr.Error = err
return dr, promResponse{}, "", nil
}
r, err := createRequest(ctx, promQLQ.logger, &dsInfo, path.Join("/v1/projects", projectName, "location/global/prometheus/api/v1/query_range"), nil)
r, err := createRequest(ctx, &dsInfo, path.Join("/v1/projects", projectName, "location/global/prometheus/api/v1/query_range"), nil)
if err != nil {
dr.Error = err
return dr, promResponse{}, "", nil
}
span := traceReq(ctx, tracer, req, dsInfo, r, "")
span := traceReq(ctx, req, dsInfo, r, "")
defer span.End()
requestBody := map[string]any{
@@ -45,7 +44,7 @@ func (promQLQ *cloudMonitoringProm) run(ctx context.Context, req *backend.QueryD
res, err := doRequestProm(r, dsInfo, requestBody)
defer func() {
if err := res.Body.Close(); err != nil {
promQLQ.logger.Error("Failed to close response body", "err", err)
s.logger.Error("Failed to close response body", "err", err)
}
}()
if err != nil {
@@ -83,7 +82,7 @@ func parseProm(res *http.Response) backend.DataResponse {
// We are not parsing the response in this function. ReadPrometheusStyleResult needs an open reader and we cannot
// pass an open reader to this function because lint complains as it is unsafe
func (promQLQ *cloudMonitoringProm) parseResponse(queryRes *backend.DataResponse,
response any, executedQueryString string) error {
response any, executedQueryString string, logger log.Logger) error {
r := response.(backend.DataResponse)
// Add frame to attach metadata
if len(r.Frames) == 0 {

View File

@@ -13,6 +13,7 @@ import (
)
func TestPromqlQuery(t *testing.T) {
service := &Service{}
t.Run("parseResponse is returned", func(t *testing.T) {
fileData, err := os.ReadFile("./test-data/11-prom-response.json")
reader := strings.NewReader(string(fileData))
@@ -24,7 +25,7 @@ func TestPromqlQuery(t *testing.T) {
dataRes := &backend.DataResponse{}
query := &cloudMonitoringProm{}
parsedProm := parseProm(&res)
err = query.parseResponse(dataRes, parsedProm, "")
err = query.parseResponse(dataRes, parsedProm, "", service.logger)
require.NoError(t, err)
frame := dataRes.Frames[0]
experimental.CheckGoldenJSONFrame(t, "test-data", "parse-response-is-returned", frame, false)

View File

@@ -15,6 +15,7 @@ import (
"time"
"github.com/andybalholm/brotli"
"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana-plugin-sdk-go/backend/resource/httpadapter"
)
@@ -199,14 +200,14 @@ func decode(encoding string, original io.ReadCloser) ([]byte, error) {
}
defer func() {
if err := reader.(io.ReadCloser).Close(); err != nil {
slog.Warn("Failed to close reader body", "err", err)
backend.Logger.Warn("Failed to close reader body", "err", err)
}
}()
case "deflate":
reader = flate.NewReader(original)
defer func() {
if err := reader.(io.ReadCloser).Close(); err != nil {
slog.Warn("Failed to close reader body", "err", err)
backend.Logger.Warn("Failed to close reader body", "err", err)
}
}()
case "br":
@@ -246,7 +247,7 @@ func encode(encoding string, body []byte) ([]byte, error) {
_, err = writer.Write(body)
if writeCloser, ok := writer.(io.WriteCloser); ok {
if err := writeCloser.Close(); err != nil {
slog.Warn("Failed to close writer body", "err", err)
backend.Logger.Warn("Failed to close writer body", "err", err)
}
}
if err != nil {
@@ -284,7 +285,7 @@ func doRequest(req *http.Request, cli *http.Client, responseFn processResponse)
}
defer func() {
if err := res.Body.Close(); err != nil {
slog.Warn("Failed to close response body", "err", err)
backend.Logger.Warn("Failed to close response body", "err", err)
}
}()
encoding := res.Header.Get("Content-Encoding")
@@ -346,7 +347,7 @@ func buildResponse(responses []json.RawMessage, encoding string) ([]byte, error)
}
func (s *Service) setRequestVariables(req *http.Request, subDataSource string) (*http.Client, int, error) {
slog.Debug("Received resource call", "url", req.URL.String(), "method", req.Method)
s.logger.Debug("Received resource call", "url", req.URL.String(), "method", req.Method)
newPath, err := getTarget(req.URL.Path)
if err != nil {
@@ -386,7 +387,7 @@ func writeResponseBytes(rw http.ResponseWriter, code int, msg []byte) {
rw.WriteHeader(code)
_, err := rw.Write(msg)
if err != nil {
slog.Error("Unable to write HTTP response", "error", err)
backend.Logger.Error("Unable to write HTTP response", "error", err)
}
}

View File

@@ -11,6 +11,7 @@ import (
"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana-plugin-sdk-go/backend/instancemgmt"
"github.com/grafana/grafana-plugin-sdk-go/backend/log"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
@@ -115,6 +116,7 @@ func Test_setRequestVariables(t *testing.T) {
},
},
},
logger: log.DefaultLogger,
}
req, err := http.NewRequest(http.MethodGet, "http://foo/cloudmonitoring/v3/projects/bar/metricDescriptors", nil)
if err != nil {

View File

@@ -7,18 +7,17 @@ import (
"time"
"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana/pkg/infra/tracing"
"github.com/grafana/grafana-plugin-sdk-go/backend/log"
)
func (sloQ *cloudMonitoringSLO) run(ctx context.Context, req *backend.QueryDataRequest,
s *Service, dsInfo datasourceInfo, tracer tracing.Tracer) (*backend.DataResponse, any, string, error) {
return runTimeSeriesRequest(ctx, sloQ.logger, req, s, dsInfo, tracer, sloQ.parameters.ProjectName, sloQ.params, nil)
s *Service, dsInfo datasourceInfo, logger log.Logger) (*backend.DataResponse, any, string, error) {
return runTimeSeriesRequest(ctx, req, s, dsInfo, sloQ.parameters.ProjectName, sloQ.params, nil, logger)
}
func (sloQ *cloudMonitoringSLO) parseResponse(queryRes *backend.DataResponse,
response any, executedQueryString string) error {
return parseTimeSeriesResponse(queryRes, response.(cloudMonitoringResponse), executedQueryString, sloQ, sloQ.params, []string{})
response any, executedQueryString string, logger log.Logger) error {
return parseTimeSeriesResponse(queryRes, response.(cloudMonitoringResponse), executedQueryString, sloQ, sloQ.params, []string{}, logger)
}
func (sloQ *cloudMonitoringSLO) buildDeepLink() string {

View File

@@ -12,6 +12,7 @@ import (
)
func SLOQuery(t *testing.T) {
service := &Service{}
t.Run("when data from query returns slo and alias by is defined", func(t *testing.T) {
data, err := loadTestFile("./test-data/6-series-response-slo.json")
require.NoError(t, err)
@@ -29,7 +30,7 @@ func SLOQuery(t *testing.T) {
},
aliasBy: "{{project}} - {{service}} - {{slo}} - {{selector}}",
}
err = query.parseResponse(res, data, "")
err = query.parseResponse(res, data, "", service.logger)
require.NoError(t, err)
frames := res.Frames
require.NoError(t, err)
@@ -53,7 +54,7 @@ func SLOQuery(t *testing.T) {
SloId: "test-slo",
},
}
err = query.parseResponse(res, data, "")
err = query.parseResponse(res, data, "", service.logger)
require.NoError(t, err)
frames := res.Frames
require.NoError(t, err)
@@ -68,7 +69,7 @@ func SLOQuery(t *testing.T) {
res := &backend.DataResponse{}
query := &cloudMonitoringSLO{params: url.Values{}, parameters: &dataquery.SLOQuery{SloId: "yes"}}
err = query.parseResponse(res, data, "")
err = query.parseResponse(res, data, "", service.logger)
require.NoError(t, err)
frames := res.Frames
assert.Equal(t, len(frames[0].Fields[1].Config.Links), 0)

View File

@@ -0,0 +1,258 @@
// Copied from https://github.com/grafana/grafana/blob/main/pkg/tsdb/intervalv2/intervalv2.go
// We're copying this to not block ourselves from decoupling until the conversation here is resolved
// https://raintank-corp.slack.com/archives/C05QFJUHUQ6/p1700064431005089
package time
import (
"fmt"
"regexp"
"strings"
"time"
"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana-plugin-sdk-go/backend/gtime"
)
var (
	// DefaultRes is the fallback resolution (target number of data
	// points) used by Calculate when maxDataPoints is zero.
	DefaultRes int64 = 1500
	// defaultMinInterval is the floor applied when no minimum
	// interval option is supplied.
	defaultMinInterval = time.Millisecond * 1
	year               = time.Hour * 24 * 365
	day                = time.Hour * 24
)

// Interval is a calculated query interval, carried both as a
// kbn-formatted string (e.g. "1m") and as a duration.
type Interval struct {
	Text  string
	Value time.Duration
}

// intervalCalculator implements Calculator with a configured minimum
// interval floor.
type intervalCalculator struct {
	minInterval time.Duration
}

// Calculator computes query intervals from a backend time range.
type Calculator interface {
	Calculate(timerange backend.TimeRange, minInterval time.Duration, maxDataPoints int64) Interval
	CalculateSafeInterval(timerange backend.TimeRange, resolution int64) Interval
}

// CalculatorOptions configures NewCalculator.
type CalculatorOptions struct {
	MinInterval time.Duration
}
// NewCalculator returns an interval calculator. When no options are
// supplied, or an option leaves MinInterval at zero, the calculator
// falls back to defaultMinInterval.
func NewCalculator(opts ...CalculatorOptions) *intervalCalculator {
	// Start from the default so that a zero-argument call does not
	// leave minInterval at 0 — the original only applied the default
	// inside the options loop, so NewCalculator() produced a
	// calculator that never clamped to a minimum.
	calc := &intervalCalculator{minInterval: defaultMinInterval}
	for _, o := range opts {
		if o.MinInterval != 0 {
			calc.minInterval = o.MinInterval
		}
	}
	return calc
}
// Milliseconds returns the interval's duration as a whole number of
// milliseconds (truncated toward zero).
func (i *Interval) Milliseconds() int64 {
	return i.Value.Milliseconds()
}
// Calculate derives an interval for the given time range such that the
// range divided by the interval yields roughly maxDataPoints points
// (DefaultRes when zero), clamped below by minInterval and rounded to
// a "nice" value.
func (ic *intervalCalculator) Calculate(timerange backend.TimeRange, minInterval time.Duration, maxDataPoints int64) Interval {
	points := maxDataPoints
	if points == 0 {
		points = DefaultRes
	}

	span := timerange.To.UnixNano() - timerange.From.UnixNano()
	raw := time.Duration(span / points)
	if raw < minInterval {
		return Interval{Text: FormatDuration(minInterval), Value: minInterval}
	}

	nice := roundInterval(raw)
	return Interval{Text: FormatDuration(nice), Value: nice}
}
// CalculateSafeInterval returns the interval obtained by dividing the
// time range into safeRes slices, rounded to a "nice" value. No
// minimum-interval clamping is applied.
func (ic *intervalCalculator) CalculateSafeInterval(timerange backend.TimeRange, safeRes int64) Interval {
	span := timerange.To.UnixNano() - timerange.From.UnixNano()
	rounded := roundInterval(time.Duration(span / safeRes))
	return Interval{Text: FormatDuration(rounded), Value: rounded}
}
// GetIntervalFrom returns the minimum interval.
// dsInterval is the string representation of data source min interval, if configured.
// queryInterval is the string representation of query interval (min interval), e.g. "10ms" or "10s".
// queryIntervalMS is a pre-calculated numeric representation of the query interval in milliseconds.
func GetIntervalFrom(dsInterval, queryInterval string, queryIntervalMS int64, defaultInterval time.Duration) (time.Duration, error) {
	// "0s" is treated the same as an unset query interval.
	interval := queryInterval
	if interval == "0s" {
		interval = ""
	}

	switch {
	case interval == "" && queryIntervalMS != 0:
		// The pre-calculated millisecond value wins when no string
		// interval was supplied.
		return time.Duration(queryIntervalMS) * time.Millisecond, nil
	case interval == "" && dsInterval != "":
		interval = dsInterval
	}

	if interval == "" {
		return defaultInterval, nil
	}

	return ParseIntervalStringToTimeDuration(interval)
}
func ParseIntervalStringToTimeDuration(interval string) (time.Duration, error) {
formattedInterval := strings.Replace(strings.Replace(interval, "<", "", 1), ">", "", 1)
isPureNum, err := regexp.MatchString(`^\d+$`, formattedInterval)
if err != nil {
return time.Duration(0), err
}
if isPureNum {
formattedInterval += "s"
}
parsedInterval, err := gtime.ParseDuration(formattedInterval)
if err != nil {
return time.Duration(0), err
}
return parsedInterval, nil
}
// FormatDuration converts a duration into the kbn format, e.g. "1m", "2h" or
// "3d". The largest unit that fits at least once is used and the value is
// truncated to a whole number of that unit; anything below 1ms becomes "1ms".
func FormatDuration(inter time.Duration) string {
	units := []struct {
		size   time.Duration
		suffix string
	}{
		{year, "y"},
		{day, "d"},
		{time.Hour, "h"},
		{time.Minute, "m"},
		{time.Second, "s"},
		{time.Millisecond, "ms"},
	}
	for _, u := range units {
		if inter >= u.size {
			return fmt.Sprintf("%d%s", int64(inter/u.size), u.suffix)
		}
	}
	return "1ms"
}
//nolint:gocyclo
func roundInterval(interval time.Duration) time.Duration {
switch {
// 0.01s
case interval <= 10*time.Millisecond:
return time.Millisecond * 1 // 0.001s
// 0.015s
case interval <= 15*time.Millisecond:
return time.Millisecond * 10 // 0.01s
// 0.035s
case interval <= 35*time.Millisecond:
return time.Millisecond * 20 // 0.02s
// 0.075s
case interval <= 75*time.Millisecond:
return time.Millisecond * 50 // 0.05s
// 0.15s
case interval <= 150*time.Millisecond:
return time.Millisecond * 100 // 0.1s
// 0.35s
case interval <= 350*time.Millisecond:
return time.Millisecond * 200 // 0.2s
// 0.75s
case interval <= 750*time.Millisecond:
return time.Millisecond * 500 // 0.5s
// 1.5s
case interval <= 1500*time.Millisecond:
return time.Millisecond * 1000 // 1s
// 3.5s
case interval <= 3500*time.Millisecond:
return time.Millisecond * 2000 // 2s
// 7.5s
case interval <= 7500*time.Millisecond:
return time.Millisecond * 5000 // 5s
// 12.5s
case interval <= 12500*time.Millisecond:
return time.Millisecond * 10000 // 10s
// 17.5s
case interval <= 17500*time.Millisecond:
return time.Millisecond * 15000 // 15s
// 25s
case interval <= 25000*time.Millisecond:
return time.Millisecond * 20000 // 20s
// 45s
case interval <= 45000*time.Millisecond:
return time.Millisecond * 30000 // 30s
// 1.5m
case interval <= 90000*time.Millisecond:
return time.Millisecond * 60000 // 1m
// 3.5m
case interval <= 210000*time.Millisecond:
return time.Millisecond * 120000 // 2m
// 7.5m
case interval <= 450000*time.Millisecond:
return time.Millisecond * 300000 // 5m
// 12.5m
case interval <= 750000*time.Millisecond:
return time.Millisecond * 600000 // 10m
// 17.5m
case interval <= 1050000*time.Millisecond:
return time.Millisecond * 900000 // 15m
// 25m
case interval <= 1500000*time.Millisecond:
return time.Millisecond * 1200000 // 20m
// 45m
case interval <= 2700000*time.Millisecond:
return time.Millisecond * 1800000 // 30m
// 1.5h
case interval <= 5400000*time.Millisecond:
return time.Millisecond * 3600000 // 1h
// 2.5h
case interval <= 9000000*time.Millisecond:
return time.Millisecond * 7200000 // 2h
// 4.5h
case interval <= 16200000*time.Millisecond:
return time.Millisecond * 10800000 // 3h
// 9h
case interval <= 32400000*time.Millisecond:
return time.Millisecond * 21600000 // 6h
// 24h
case interval <= 86400000*time.Millisecond:
return time.Millisecond * 43200000 // 12h
// 48h
case interval <= 172800000*time.Millisecond:
return time.Millisecond * 86400000 // 24h
// 1w
case interval <= 604800000*time.Millisecond:
return time.Millisecond * 86400000 // 24h
// 3w
case interval <= 1814400000*time.Millisecond:
return time.Millisecond * 604800000 // 1w
// 2y
case interval < 3628800000*time.Millisecond:
return time.Millisecond * 2592000000 // 30d
default:
return time.Millisecond * 31536000000 // 1y
}
}

View File

@@ -8,20 +8,20 @@ import (
"time"
"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana-plugin-sdk-go/backend/log"
"github.com/grafana/grafana-plugin-sdk-go/data"
"github.com/huandu/xstrings"
"github.com/grafana/grafana/pkg/infra/tracing"
"github.com/grafana/grafana/pkg/tsdb/cloud-monitoring/kinds/dataquery"
)
func (timeSeriesFilter *cloudMonitoringTimeSeriesList) run(ctx context.Context, req *backend.QueryDataRequest,
s *Service, dsInfo datasourceInfo, tracer tracing.Tracer) (*backend.DataResponse, any, string, error) {
return runTimeSeriesRequest(ctx, timeSeriesFilter.logger, req, s, dsInfo, tracer, timeSeriesFilter.parameters.ProjectName, timeSeriesFilter.params, nil)
s *Service, dsInfo datasourceInfo, logger log.Logger) (*backend.DataResponse, any, string, error) {
return runTimeSeriesRequest(ctx, req, s, dsInfo, timeSeriesFilter.parameters.ProjectName, timeSeriesFilter.params, nil, logger)
}
func parseTimeSeriesResponse(queryRes *backend.DataResponse,
response cloudMonitoringResponse, executedQueryString string, query cloudMonitoringQueryExecutor, params url.Values, groupBys []string) error {
response cloudMonitoringResponse, executedQueryString string, query cloudMonitoringQueryExecutor, params url.Values, groupBys []string, logger log.Logger) error {
frames := data.Frames{}
for _, series := range response.TimeSeries {
@@ -56,8 +56,8 @@ func parseTimeSeriesResponse(queryRes *backend.DataResponse,
}
func (timeSeriesFilter *cloudMonitoringTimeSeriesList) parseResponse(queryRes *backend.DataResponse,
response any, executedQueryString string) error {
return parseTimeSeriesResponse(queryRes, response.(cloudMonitoringResponse), executedQueryString, timeSeriesFilter, timeSeriesFilter.params, timeSeriesFilter.parameters.GroupBys)
response any, executedQueryString string, logger log.Logger) error {
return parseTimeSeriesResponse(queryRes, response.(cloudMonitoringResponse), executedQueryString, timeSeriesFilter, timeSeriesFilter.params, timeSeriesFilter.parameters.GroupBys, logger)
}
func (timeSeriesFilter *cloudMonitoringTimeSeriesList) buildDeepLink() string {
@@ -89,7 +89,7 @@ func (timeSeriesFilter *cloudMonitoringTimeSeriesList) buildDeepLink() string {
timeSeriesFilter.params.Get("interval.endTime"),
)
if err != nil {
slog.Error(
backend.Logger.Error(
"Failed to generate deep link: unable to parse metrics explorer URL",
"ProjectName", timeSeriesFilter.parameters.ProjectName,
"error", err,

View File

@@ -19,6 +19,7 @@ import (
)
func TestTimeSeriesFilter(t *testing.T) {
service := &Service{}
t.Run("parses params", func(t *testing.T) {
query := &cloudMonitoringTimeSeriesList{parameters: &dataquery.TimeSeriesList{}}
query.setParams(time.Time{}, time.Time{}, 0, 0)
@@ -63,7 +64,7 @@ func TestTimeSeriesFilter(t *testing.T) {
res := &backend.DataResponse{}
query := &cloudMonitoringTimeSeriesList{params: url.Values{}, parameters: &dataquery.TimeSeriesList{}}
err = query.parseResponse(res, data, "")
err = query.parseResponse(res, data, "", service.logger)
require.NoError(t, err)
frames := res.Frames
require.Len(t, frames, 1)
@@ -86,7 +87,7 @@ func TestTimeSeriesFilter(t *testing.T) {
assert.Equal(t, 3, len(data.TimeSeries))
res := &backend.DataResponse{}
query := &cloudMonitoringTimeSeriesList{params: url.Values{}, parameters: &dataquery.TimeSeriesList{}}
err = query.parseResponse(res, data, "")
err = query.parseResponse(res, data, "", service.logger)
require.NoError(t, err)
field := res.Frames[0].Fields[1]
@@ -128,7 +129,7 @@ func TestTimeSeriesFilter(t *testing.T) {
query := &cloudMonitoringTimeSeriesList{params: url.Values{}, parameters: &dataquery.TimeSeriesList{GroupBys: []string{
"metric.label.instance_name", "resource.label.zone",
}}}
err = query.parseResponse(res, data, "")
err = query.parseResponse(res, data, "", service.logger)
require.NoError(t, err)
frames := res.Frames
require.NoError(t, err)
@@ -153,7 +154,7 @@ func TestTimeSeriesFilter(t *testing.T) {
},
aliasBy: "{{metric.type}} - {{metric.label.instance_name}} - {{resource.label.zone}}",
}
err = query.parseResponse(res, data, "")
err = query.parseResponse(res, data, "", service.logger)
require.NoError(t, err)
frames := res.Frames
require.NoError(t, err)
@@ -170,7 +171,7 @@ func TestTimeSeriesFilter(t *testing.T) {
parameters: &dataquery.TimeSeriesList{GroupBys: []string{"metric.label.instance_name", "resource.label.zone"}},
aliasBy: "metric {{metric.name}} service {{metric.service}}",
}
err = query.parseResponse(res, data, "")
err = query.parseResponse(res, data, "", service.logger)
require.NoError(t, err)
frames := res.Frames
require.NoError(t, err)
@@ -192,7 +193,7 @@ func TestTimeSeriesFilter(t *testing.T) {
parameters: &dataquery.TimeSeriesList{},
aliasBy: "{{bucket}}",
}
err = query.parseResponse(res, data, "")
err = query.parseResponse(res, data, "", service.logger)
require.NoError(t, err)
frames := res.Frames
require.NoError(t, err)
@@ -237,7 +238,7 @@ func TestTimeSeriesFilter(t *testing.T) {
parameters: &dataquery.TimeSeriesList{},
aliasBy: "{{bucket}}",
}
err = query.parseResponse(res, data, "")
err = query.parseResponse(res, data, "", service.logger)
require.NoError(t, err)
frames := res.Frames
require.NoError(t, err)
@@ -275,7 +276,7 @@ func TestTimeSeriesFilter(t *testing.T) {
parameters: &dataquery.TimeSeriesList{},
aliasBy: "{{bucket}}",
}
err = query.parseResponse(res, data, "")
err = query.parseResponse(res, data, "", service.logger)
require.NoError(t, err)
require.NoError(t, err)
assert.Equal(t, 3, len(res.Frames))
@@ -315,7 +316,7 @@ func TestTimeSeriesFilter(t *testing.T) {
parameters: &dataquery.TimeSeriesList{},
aliasBy: "{{metadata.system_labels.test}}",
}
err = query.parseResponse(res, data, "")
err = query.parseResponse(res, data, "", service.logger)
require.NoError(t, err)
frames := res.Frames
require.NoError(t, err)
@@ -333,7 +334,7 @@ func TestTimeSeriesFilter(t *testing.T) {
parameters: &dataquery.TimeSeriesList{},
aliasBy: "{{metadata.system_labels.test2}}",
}
err = query.parseResponse(res, data, "")
err = query.parseResponse(res, data, "", service.logger)
require.NoError(t, err)
frames := res.Frames
require.NoError(t, err)
@@ -349,7 +350,7 @@ func TestTimeSeriesFilter(t *testing.T) {
assert.Equal(t, 1, len(data.TimeSeries))
res := &backend.DataResponse{}
query := &cloudMonitoringTimeSeriesList{params: url.Values{}, parameters: &dataquery.TimeSeriesList{}}
err = query.parseResponse(res, data, "")
err = query.parseResponse(res, data, "", service.logger)
require.NoError(t, err)
frames := res.Frames
require.NoError(t, err)
@@ -362,7 +363,7 @@ func TestTimeSeriesFilter(t *testing.T) {
assert.Equal(t, 3, len(data.TimeSeries))
res := &backend.DataResponse{}
query := &cloudMonitoringTimeSeriesList{params: url.Values{}, parameters: &dataquery.TimeSeriesList{}}
err = query.parseResponse(res, data, "")
err = query.parseResponse(res, data, "", service.logger)
require.NoError(t, err)
frames := res.Frames
require.NoError(t, err)
@@ -391,7 +392,7 @@ func TestTimeSeriesFilter(t *testing.T) {
To: fromStart.Add(34 * time.Minute),
},
}
err = query.parseResponse(res, data, "")
err = query.parseResponse(res, data, "", service.logger)
require.NoError(t, err)
frames := res.Frames
assert.Equal(t, "test-proj - asia-northeast1-c - 6724404429462225363 - 200", frames[0].Fields[1].Name)
@@ -412,7 +413,7 @@ func TestTimeSeriesFilter(t *testing.T) {
GraphPeriod: strPtr("60s"),
},
}
err = query.parseResponse(res, data, "")
err = query.parseResponse(res, data, "", service.logger)
require.NoError(t, err)
assert.Equal(t, "value_utilization_sum", res.Frames[0].Fields[1].Name)
})
@@ -423,7 +424,7 @@ func TestTimeSeriesFilter(t *testing.T) {
assert.Equal(t, 3, len(data.TimeSeries))
res := &backend.DataResponse{}
query := &cloudMonitoringTimeSeriesList{params: url.Values{}, parameters: &dataquery.TimeSeriesList{}}
err = query.parseResponse(res, data, "")
err = query.parseResponse(res, data, "", service.logger)
require.NoError(t, err)
frames := res.Frames
custom, ok := frames[0].Meta.Custom.(map[string]any)
@@ -441,7 +442,7 @@ func TestTimeSeriesFilter(t *testing.T) {
query := &cloudMonitoringTimeSeriesList{params: url.Values{
"aggregation.alignmentPeriod": []string{"+60s"},
}, parameters: &dataquery.TimeSeriesList{}}
err = query.parseResponse(res, data, "")
err = query.parseResponse(res, data, "", service.logger)
require.NoError(t, err)
frames := res.Frames
timeField := frames[0].Fields[0]
@@ -455,7 +456,7 @@ func TestTimeSeriesFilter(t *testing.T) {
assert.Equal(t, 1, len(data.TimeSeries))
res := &backend.DataResponse{}
require.NoError(t, (&cloudMonitoringTimeSeriesList{parameters: &dataquery.TimeSeriesList{GroupBys: []string{"test_group_by"}}}).parseResponse(res, data, "test_query"))
require.NoError(t, (&cloudMonitoringTimeSeriesList{parameters: &dataquery.TimeSeriesList{GroupBys: []string{"test_group_by"}}}).parseResponse(res, data, "test_query", service.logger))
require.NotNil(t, res.Frames[0].Meta)
assert.Equal(t, sdkdata.FrameMeta{
@@ -478,7 +479,7 @@ func TestTimeSeriesFilter(t *testing.T) {
assert.Equal(t, 1, len(data.TimeSeries))
res := &backend.DataResponse{}
require.NoError(t, (&cloudMonitoringTimeSeriesList{parameters: &dataquery.TimeSeriesList{GroupBys: []string{"test_group_by"}}}).parseResponse(res, data, "test_query"))
require.NoError(t, (&cloudMonitoringTimeSeriesList{parameters: &dataquery.TimeSeriesList{GroupBys: []string{"test_group_by"}}}).parseResponse(res, data, "test_query", service.logger))
require.NotNil(t, res.Frames[0].Meta)
assert.Equal(t, sdkdata.FrameMeta{
@@ -501,7 +502,7 @@ func TestTimeSeriesFilter(t *testing.T) {
assert.Equal(t, 1, len(data.TimeSeries))
res := &backend.DataResponse{}
require.NoError(t, (&cloudMonitoringTimeSeriesList{parameters: &dataquery.TimeSeriesList{GroupBys: []string{"test_group_by"}}}).parseResponse(res, data, "test_query"))
require.NoError(t, (&cloudMonitoringTimeSeriesList{parameters: &dataquery.TimeSeriesList{GroupBys: []string{"test_group_by"}}}).parseResponse(res, data, "test_query", service.logger))
require.NotNil(t, res.Frames[0].Meta)
assert.Equal(t, sdkdata.FrameMeta{

View File

@@ -7,10 +7,9 @@ import (
"time"
"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana-plugin-sdk-go/backend/log"
"github.com/grafana/grafana-plugin-sdk-go/data"
"github.com/grafana/grafana/pkg/infra/tracing"
"github.com/grafana/grafana/pkg/tsdb/intervalv2"
gcmTime "github.com/grafana/grafana/pkg/tsdb/cloud-monitoring/time"
)
func (timeSeriesQuery *cloudMonitoringTimeSeriesQuery) appendGraphPeriod(req *backend.QueryDataRequest) string {
@@ -18,7 +17,7 @@ func (timeSeriesQuery *cloudMonitoringTimeSeriesQuery) appendGraphPeriod(req *ba
// If not set, the default behavior is to set an automatic value
if timeSeriesQuery.parameters.GraphPeriod == nil || *timeSeriesQuery.parameters.GraphPeriod != "disabled" {
if timeSeriesQuery.parameters.GraphPeriod == nil || *timeSeriesQuery.parameters.GraphPeriod == "auto" || *timeSeriesQuery.parameters.GraphPeriod == "" {
intervalCalculator := intervalv2.NewCalculator(intervalv2.CalculatorOptions{})
intervalCalculator := gcmTime.NewCalculator(gcmTime.CalculatorOptions{})
interval := intervalCalculator.Calculate(req.Queries[0].TimeRange, time.Duration(timeSeriesQuery.IntervalMS/1000)*time.Second, req.Queries[0].MaxDataPoints)
timeSeriesQuery.parameters.GraphPeriod = &interval.Text
}
@@ -28,7 +27,7 @@ func (timeSeriesQuery *cloudMonitoringTimeSeriesQuery) appendGraphPeriod(req *ba
}
func (timeSeriesQuery *cloudMonitoringTimeSeriesQuery) run(ctx context.Context, req *backend.QueryDataRequest,
s *Service, dsInfo datasourceInfo, tracer tracing.Tracer) (*backend.DataResponse, any, string, error) {
s *Service, dsInfo datasourceInfo, logger log.Logger) (*backend.DataResponse, any, string, error) {
timeSeriesQuery.parameters.Query += timeSeriesQuery.appendGraphPeriod(req)
from := req.Queries[0].TimeRange.From
to := req.Queries[0].TimeRange.To
@@ -37,11 +36,11 @@ func (timeSeriesQuery *cloudMonitoringTimeSeriesQuery) run(ctx context.Context,
requestBody := map[string]any{
"query": timeSeriesQuery.parameters.Query,
}
return runTimeSeriesRequest(ctx, timeSeriesQuery.logger, req, s, dsInfo, tracer, timeSeriesQuery.parameters.ProjectName, nil, requestBody)
return runTimeSeriesRequest(ctx, req, s, dsInfo, timeSeriesQuery.parameters.ProjectName, nil, requestBody, logger)
}
func (timeSeriesQuery *cloudMonitoringTimeSeriesQuery) parseResponse(queryRes *backend.DataResponse,
res any, executedQueryString string) error {
res any, executedQueryString string, logger log.Logger) error {
response := res.(cloudMonitoringResponse)
frames := data.Frames{}
@@ -103,7 +102,7 @@ func (timeSeriesQuery *cloudMonitoringTimeSeriesQuery) buildDeepLink() string {
timeSeriesQuery.timeRange.To.Format(time.RFC3339Nano),
)
if err != nil {
slog.Error(
backend.Logger.Error(
"Failed to generate deep link: unable to parse metrics explorer URL",
"ProjectName", timeSeriesQuery.parameters.Query,
"error", err,

View File

@@ -13,6 +13,7 @@ import (
)
func TestTimeSeriesQuery(t *testing.T) {
service := &Service{}
t.Run("multiple point descriptor is returned", func(t *testing.T) {
data, err := loadTestFile("./test-data/8-series-response-mql-multiple-point-descriptors.json")
require.NoError(t, err)
@@ -33,7 +34,7 @@ func TestTimeSeriesQuery(t *testing.T) {
To: fromStart.Add(34 * time.Minute),
},
}
err = query.parseResponse(res, data, "")
err = query.parseResponse(res, data, "", service.logger)
frames := res.Frames
assert.Equal(t, "grafana-prod asia-northeast1-c 6724404429462225363 200", frames[0].Fields[1].Name)
assert.Equal(t, 843302441.9, frames[0].Fields[1].At(0))
@@ -52,7 +53,7 @@ func TestTimeSeriesQuery(t *testing.T) {
To: fromStart.Add(34 * time.Minute),
},
}
err = query.parseResponse(res, data, "")
err = query.parseResponse(res, data, "", service.logger)
frames := res.Frames
assert.Equal(t, "test-proj - asia-northeast1-c - 6724404429462225363 - 200", frames[0].Fields[1].Name)
})
@@ -79,7 +80,7 @@ func TestTimeSeriesQuery(t *testing.T) {
To: fromStart.Add(34 * time.Minute),
},
}
err = query.parseResponse(res, data, "")
err = query.parseResponse(res, data, "", service.logger)
require.NoError(t, err)
frames := res.Frames
assert.Equal(t, 1, len(res.Frames))
@@ -103,7 +104,7 @@ func TestTimeSeriesQuery(t *testing.T) {
To: fromStart.Add(34 * time.Minute),
},
}
err = query.parseResponse(res, data, "")
err = query.parseResponse(res, data, "", service.logger)
require.NoError(t, err)
frames := res.Frames
custom, ok := frames[0].Meta.Custom.(map[string]any)
@@ -131,7 +132,7 @@ func TestTimeSeriesQuery(t *testing.T) {
To: fromStart.Add(34 * time.Minute),
},
}
err = query.parseResponse(res, data, "")
err = query.parseResponse(res, data, "", service.logger)
require.NoError(t, err)
frames := res.Frames
timeField := frames[0].Fields[0]

View File

@@ -9,19 +9,18 @@ import (
"time"
"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana-plugin-sdk-go/backend/log"
"github.com/grafana/grafana-plugin-sdk-go/data"
"github.com/huandu/xstrings"
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/infra/tracing"
"github.com/grafana/grafana/pkg/tsdb/cloud-monitoring/kinds/dataquery"
)
type (
cloudMonitoringQueryExecutor interface {
run(ctx context.Context, req *backend.QueryDataRequest, s *Service, dsInfo datasourceInfo, tracer tracing.Tracer) (
run(ctx context.Context, req *backend.QueryDataRequest, s *Service, dsInfo datasourceInfo, logger log.Logger) (
*backend.DataResponse, any, string, error)
parseResponse(dr *backend.DataResponse, data any, executedQueryString string) error
parseResponse(dr *backend.DataResponse, data any, executedQueryString string, logger log.Logger) error
buildDeepLink() string
getRefID() string
getAliasBy() string
@@ -41,7 +40,6 @@ type (
cloudMonitoringTimeSeriesList struct {
refID string
aliasBy string
logger log.Logger
parameters *dataquery.TimeSeriesList
// Processed properties
params url.Values
@@ -50,7 +48,6 @@ type (
cloudMonitoringSLO struct {
refID string
aliasBy string
logger log.Logger
parameters *dataquery.SLOQuery
// Processed properties
params url.Values
@@ -59,8 +56,8 @@ type (
// cloudMonitoringProm is used to build a promQL queries
cloudMonitoringProm struct {
refID string
aliasBy string
logger log.Logger
aliasBy string
parameters *dataquery.PromQLQuery
timeRange backend.TimeRange
IntervalMS int64
@@ -69,8 +66,8 @@ type (
// cloudMonitoringTimeSeriesQuery is used to build MQL queries
cloudMonitoringTimeSeriesQuery struct {
refID string
aliasBy string
logger log.Logger
aliasBy string
parameters *dataquery.TimeSeriesQuery
// Processed properties
timeRange backend.TimeRange

View File

@@ -14,18 +14,17 @@ import (
"time"
"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana-plugin-sdk-go/backend/log"
"github.com/grafana/grafana-plugin-sdk-go/backend/tracing"
"github.com/grafana/grafana-plugin-sdk-go/data"
gcmTime "github.com/grafana/grafana/pkg/tsdb/cloud-monitoring/time"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/infra/tracing"
"github.com/grafana/grafana/pkg/tsdb/intervalv2"
)
func addInterval(period string, field *data.Field) error {
period = strings.TrimPrefix(period, "+")
p, err := intervalv2.ParseIntervalStringToTimeDuration(period)
p, err := gcmTime.ParseIntervalStringToTimeDuration(period)
if err != nil {
return err
}
@@ -48,7 +47,7 @@ func toString(v any) string {
return v.(string)
}
func createRequest(ctx context.Context, logger log.Logger, dsInfo *datasourceInfo, proxyPass string, body io.Reader) (*http.Request, error) {
func createRequest(ctx context.Context, dsInfo *datasourceInfo, proxyPass string, body io.Reader) (*http.Request, error) {
u, err := url.Parse(dsInfo.url)
if err != nil {
return nil, err
@@ -61,7 +60,7 @@ func createRequest(ctx context.Context, logger log.Logger, dsInfo *datasourceInf
}
req, err := http.NewRequestWithContext(ctx, method, dsInfo.services[cloudMonitor].url, body)
if err != nil {
logger.Error("Failed to create request", "error", err)
backend.Logger.Error("Failed to create request", "error", err)
return nil, fmt.Errorf("failed to create request: %w", err)
}
@@ -71,7 +70,7 @@ func createRequest(ctx context.Context, logger log.Logger, dsInfo *datasourceInf
return req, nil
}
func doRequestPage(ctx context.Context, logger log.Logger, r *http.Request, dsInfo datasourceInfo, params url.Values, body map[string]any) (cloudMonitoringResponse, error) {
func doRequestPage(ctx context.Context, r *http.Request, dsInfo datasourceInfo, params url.Values, body map[string]any, logger log.Logger) (cloudMonitoringResponse, error) {
if params != nil {
r.URL.RawQuery = params.Encode()
}
@@ -90,11 +89,11 @@ func doRequestPage(ctx context.Context, logger log.Logger, r *http.Request, dsIn
defer func() {
if err = res.Body.Close(); err != nil {
logger.Warn("Failed to close response body", "error", err)
backend.Logger.Warn("Failed to close response body", "error", err)
}
}()
dnext, err := unmarshalResponse(logger, res)
dnext, err := unmarshalResponse(res, logger)
if err != nil {
return cloudMonitoringResponse{}, err
}
@@ -102,8 +101,8 @@ func doRequestPage(ctx context.Context, logger log.Logger, r *http.Request, dsIn
return dnext, nil
}
func doRequestWithPagination(ctx context.Context, logger log.Logger, r *http.Request, dsInfo datasourceInfo, params url.Values, body map[string]any) (cloudMonitoringResponse, error) {
d, err := doRequestPage(ctx, logger, r, dsInfo, params, body)
func doRequestWithPagination(ctx context.Context, r *http.Request, dsInfo datasourceInfo, params url.Values, body map[string]any, logger log.Logger) (cloudMonitoringResponse, error) {
d, err := doRequestPage(ctx, r, dsInfo, params, body, logger)
if err != nil {
return cloudMonitoringResponse{}, err
}
@@ -114,7 +113,7 @@ func doRequestWithPagination(ctx context.Context, logger log.Logger, r *http.Req
if body != nil {
body["pageToken"] = d.NextPageToken
}
nextPage, err := doRequestPage(ctx, logger, r, dsInfo, params, body)
nextPage, err := doRequestPage(ctx, r, dsInfo, params, body, logger)
if err != nil {
return cloudMonitoringResponse{}, err
}
@@ -125,20 +124,20 @@ func doRequestWithPagination(ctx context.Context, logger log.Logger, r *http.Req
return d, nil
}
func traceReq(ctx context.Context, tracer tracing.Tracer, req *backend.QueryDataRequest, dsInfo datasourceInfo, r *http.Request, target string) trace.Span {
ctx, span := tracer.Start(ctx, "cloudMonitoring query", trace.WithAttributes(
func traceReq(ctx context.Context, req *backend.QueryDataRequest, dsInfo datasourceInfo, r *http.Request, target string) trace.Span {
_, span := tracing.DefaultTracer().Start(ctx, "cloudMonitoring query", trace.WithAttributes(
attribute.String("target", target),
attribute.String("from", req.Queries[0].TimeRange.From.String()),
attribute.String("until", req.Queries[0].TimeRange.To.String()),
attribute.Int64("datasource_id", dsInfo.id),
attribute.Int64("org_id", req.PluginContext.OrgID),
))
tracer.Inject(ctx, r.Header, span)
defer span.End()
return span
}
func runTimeSeriesRequest(ctx context.Context, logger log.Logger, req *backend.QueryDataRequest,
s *Service, dsInfo datasourceInfo, tracer tracing.Tracer, projectName string, params url.Values, body map[string]any) (*backend.DataResponse, cloudMonitoringResponse, string, error) {
func runTimeSeriesRequest(ctx context.Context, req *backend.QueryDataRequest,
s *Service, dsInfo datasourceInfo, projectName string, params url.Values, body map[string]any, logger log.Logger) (*backend.DataResponse, cloudMonitoringResponse, string, error) {
dr := &backend.DataResponse{}
projectName, err := s.ensureProject(ctx, dsInfo, projectName)
if err != nil {
@@ -149,16 +148,16 @@ func runTimeSeriesRequest(ctx context.Context, logger log.Logger, req *backend.Q
if body != nil {
timeSeriesMethod += ":query"
}
r, err := createRequest(ctx, logger, &dsInfo, path.Join("/v3/projects", projectName, timeSeriesMethod), nil)
r, err := createRequest(ctx, &dsInfo, path.Join("/v3/projects", projectName, timeSeriesMethod), nil)
if err != nil {
dr.Error = err
return dr, cloudMonitoringResponse{}, "", nil
}
span := traceReq(ctx, tracer, req, dsInfo, r, params.Encode())
span := traceReq(ctx, req, dsInfo, r, params.Encode())
defer span.End()
d, err := doRequestWithPagination(ctx, logger, r, dsInfo, params, body)
d, err := doRequestWithPagination(ctx, r, dsInfo, params, body, logger)
if err != nil {
dr.Error = err
return dr, cloudMonitoringResponse{}, "", nil