mirror of
https://github.com/grafana/grafana.git
synced 2025-02-25 18:55:37 -06:00
CloudMonitoring: Migrate to use backend plugin SDK contracts (#38650)
* Use SDK contracts for cloudmonitoring * Get build running, tests passing and do some refactoring (#38754) * fix build+tests and refactor * remove alerting stuff * remove unused field * fix plugin fetch * end to end * resp rename * tidy annotations * reformatting * update refID * reformat imports * fix styling * clean up unmarshalling * uncomment + fix tests * appease linter * remove spaces * remove old cruft * add check for empty queries * update tests * remove pm as dep * adjust proxy route contract * fix service loading * use UNIX val * fix endpoint + resp * h@ckz for frontend * fix resp * fix interval * always set custom meta * remove unused param * fix labels fetch * fix linter * fix test + remove unused field * apply pr feedback * fix grafana-auto intervals * fix tests * resolve conflicts * fix bad merge * fix conflicts * remove bad logger import Co-authored-by: Will Browne <wbrowne@users.noreply.github.com> Co-authored-by: Will Browne <will.browne@grafana.com>
This commit is contained in:
parent
d13c799aa6
commit
e822c8a24d
@ -6,28 +6,28 @@ import (
|
||||
"net/http"
|
||||
"net/url"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/grafana/grafana/pkg/models"
|
||||
"github.com/grafana/grafana/pkg/plugins"
|
||||
"github.com/grafana/grafana/pkg/services/encryption"
|
||||
"github.com/grafana/grafana/pkg/setting"
|
||||
"github.com/grafana/grafana/pkg/util"
|
||||
)
|
||||
|
||||
type DSInfo struct {
|
||||
ID int64
|
||||
Updated time.Time
|
||||
JSONData map[string]interface{}
|
||||
DecryptedSecureJSONData map[string]string
|
||||
}
|
||||
|
||||
// ApplyRoute should use the plugin route data to set auth headers and custom headers.
|
||||
func ApplyRoute(ctx context.Context, req *http.Request, proxyPath string, route *plugins.AppPluginRoute,
|
||||
ds *models.DataSource, cfg *setting.Cfg, encryptionService encryption.Service) {
|
||||
ds DSInfo, cfg *setting.Cfg) {
|
||||
proxyPath = strings.TrimPrefix(proxyPath, route.Path)
|
||||
|
||||
secureJsonData, err := encryptionService.DecryptJsonData(ctx, ds.SecureJsonData, setting.SecretKey)
|
||||
if err != nil {
|
||||
logger.Error("Error interpolating proxy url", "error", err)
|
||||
return
|
||||
}
|
||||
|
||||
data := templateData{
|
||||
JsonData: ds.JsonData.Interface().(map[string]interface{}),
|
||||
SecureJsonData: secureJsonData,
|
||||
JsonData: ds.JSONData,
|
||||
SecureJsonData: ds.DecryptedSecureJSONData,
|
||||
}
|
||||
|
||||
if len(route.URL) > 0 {
|
||||
@ -76,12 +76,12 @@ func ApplyRoute(ctx context.Context, req *http.Request, proxyPath string, route
|
||||
}
|
||||
}
|
||||
|
||||
func getTokenProvider(ctx context.Context, cfg *setting.Cfg, ds *models.DataSource, pluginRoute *plugins.AppPluginRoute,
|
||||
func getTokenProvider(ctx context.Context, cfg *setting.Cfg, ds DSInfo, pluginRoute *plugins.AppPluginRoute,
|
||||
data templateData) (accessTokenProvider, error) {
|
||||
authType := pluginRoute.AuthType
|
||||
|
||||
// Plugin can override authentication type specified in route configuration
|
||||
if authTypeOverride := ds.JsonData.Get("authenticationType").MustString(); authTypeOverride != "" {
|
||||
if authTypeOverride, ok := ds.JSONData["authenticationType"].(string); ok && authTypeOverride != "" {
|
||||
authType = authTypeOverride
|
||||
}
|
||||
|
||||
|
@ -238,16 +238,32 @@ func (proxy *DataSourceProxy) director(req *http.Request) {
|
||||
|
||||
req.Header.Set("User-Agent", fmt.Sprintf("Grafana/%s", setting.BuildVersion))
|
||||
|
||||
// Clear Origin and Referer to avoir CORS issues
|
||||
// Clear Origin and Referer to avoid CORS issues
|
||||
req.Header.Del("Origin")
|
||||
req.Header.Del("Referer")
|
||||
|
||||
jsonData := make(map[string]interface{})
|
||||
if proxy.ds.JsonData != nil {
|
||||
jsonData, err = proxy.ds.JsonData.Map()
|
||||
if err != nil {
|
||||
logger.Error("Failed to get json data as map", "jsonData", proxy.ds.JsonData, "error", err)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
secureJsonData, err := proxy.dataSourcesService.EncryptionService.DecryptJsonData(req.Context(), proxy.ds.SecureJsonData, setting.SecretKey)
|
||||
if err != nil {
|
||||
logger.Error("Error interpolating proxy url", "error", err)
|
||||
return
|
||||
}
|
||||
|
||||
if proxy.route != nil {
|
||||
ApplyRoute(
|
||||
proxy.ctx.Req.Context(), req, proxy.proxyPath,
|
||||
proxy.route, proxy.ds, proxy.cfg,
|
||||
proxy.dataSourcesService.EncryptionService,
|
||||
)
|
||||
ApplyRoute(proxy.ctx.Req.Context(), req, proxy.proxyPath, proxy.route, DSInfo{
|
||||
ID: proxy.ds.Id,
|
||||
Updated: proxy.ds.Updated,
|
||||
JSONData: jsonData,
|
||||
DecryptedSecureJSONData: secureJsonData,
|
||||
}, proxy.cfg)
|
||||
}
|
||||
|
||||
if proxy.oAuthTokenService.IsOAuthPassThruEnabled(proxy.ds) {
|
||||
|
@ -99,6 +99,17 @@ func TestDataSourceProxy_routeRule(t *testing.T) {
|
||||
},
|
||||
}
|
||||
|
||||
jd, err := ds.JsonData.Map()
|
||||
require.NoError(t, err)
|
||||
dsInfo := DSInfo{
|
||||
ID: ds.Id,
|
||||
Updated: ds.Updated,
|
||||
JSONData: jd,
|
||||
DecryptedSecureJSONData: map[string]string{
|
||||
"key": "123",
|
||||
},
|
||||
}
|
||||
|
||||
setUp := func() (*models.ReqContext, *http.Request) {
|
||||
req, err := http.NewRequest("GET", "http://localhost/asd", nil)
|
||||
require.NoError(t, err)
|
||||
@ -117,7 +128,7 @@ func TestDataSourceProxy_routeRule(t *testing.T) {
|
||||
proxy, err := NewDataSourceProxy(ds, plugin, ctx, "api/v4/some/method", cfg, httpClientProvider, &oauthtoken.Service{}, dsService)
|
||||
require.NoError(t, err)
|
||||
proxy.route = plugin.Routes[0]
|
||||
ApplyRoute(proxy.ctx.Req.Context(), req, proxy.proxyPath, proxy.route, proxy.ds, cfg, ossencryption.ProvideService())
|
||||
ApplyRoute(proxy.ctx.Req.Context(), req, proxy.proxyPath, proxy.route, dsInfo, cfg)
|
||||
|
||||
assert.Equal(t, "https://www.google.com/some/method", req.URL.String())
|
||||
assert.Equal(t, "my secret 123", req.Header.Get("x-header"))
|
||||
@ -129,7 +140,7 @@ func TestDataSourceProxy_routeRule(t *testing.T) {
|
||||
proxy, err := NewDataSourceProxy(ds, plugin, ctx, "api/common/some/method", cfg, httpClientProvider, &oauthtoken.Service{}, dsService)
|
||||
require.NoError(t, err)
|
||||
proxy.route = plugin.Routes[3]
|
||||
ApplyRoute(proxy.ctx.Req.Context(), req, proxy.proxyPath, proxy.route, proxy.ds, cfg, ossencryption.ProvideService())
|
||||
ApplyRoute(proxy.ctx.Req.Context(), req, proxy.proxyPath, proxy.route, dsInfo, cfg)
|
||||
|
||||
assert.Equal(t, "https://dynamic.grafana.com/some/method?apiKey=123", req.URL.String())
|
||||
assert.Equal(t, "my secret 123", req.Header.Get("x-header"))
|
||||
@ -141,7 +152,7 @@ func TestDataSourceProxy_routeRule(t *testing.T) {
|
||||
proxy, err := NewDataSourceProxy(ds, plugin, ctx, "", cfg, httpClientProvider, &oauthtoken.Service{}, dsService)
|
||||
require.NoError(t, err)
|
||||
proxy.route = plugin.Routes[4]
|
||||
ApplyRoute(proxy.ctx.Req.Context(), req, proxy.proxyPath, proxy.route, proxy.ds, cfg, ossencryption.ProvideService())
|
||||
ApplyRoute(proxy.ctx.Req.Context(), req, proxy.proxyPath, proxy.route, dsInfo, cfg)
|
||||
|
||||
assert.Equal(t, "http://localhost/asd", req.URL.String())
|
||||
})
|
||||
@ -152,7 +163,7 @@ func TestDataSourceProxy_routeRule(t *testing.T) {
|
||||
proxy, err := NewDataSourceProxy(ds, plugin, ctx, "api/body", cfg, httpClientProvider, &oauthtoken.Service{}, dsService)
|
||||
require.NoError(t, err)
|
||||
proxy.route = plugin.Routes[5]
|
||||
ApplyRoute(proxy.ctx.Req.Context(), req, proxy.proxyPath, proxy.route, proxy.ds, cfg, ossencryption.ProvideService())
|
||||
ApplyRoute(proxy.ctx.Req.Context(), req, proxy.proxyPath, proxy.route, dsInfo, cfg)
|
||||
|
||||
content, err := ioutil.ReadAll(req.Body)
|
||||
require.NoError(t, err)
|
||||
@ -262,10 +273,21 @@ func TestDataSourceProxy_routeRule(t *testing.T) {
|
||||
|
||||
cfg := &setting.Cfg{}
|
||||
|
||||
jd, err := ds.JsonData.Map()
|
||||
require.NoError(t, err)
|
||||
dsInfo := DSInfo{
|
||||
ID: ds.Id,
|
||||
Updated: ds.Updated,
|
||||
JSONData: jd,
|
||||
DecryptedSecureJSONData: map[string]string{
|
||||
"clientSecret": "123",
|
||||
},
|
||||
}
|
||||
|
||||
dsService := datasources.ProvideService(bus.New(), nil, ossencryption.ProvideService())
|
||||
proxy, err := NewDataSourceProxy(ds, plugin, ctx, "pathwithtoken1", cfg, httpClientProvider, &oauthtoken.Service{}, dsService)
|
||||
require.NoError(t, err)
|
||||
ApplyRoute(proxy.ctx.Req.Context(), req, proxy.proxyPath, plugin.Routes[0], proxy.ds, cfg, ossencryption.ProvideService())
|
||||
ApplyRoute(proxy.ctx.Req.Context(), req, proxy.proxyPath, plugin.Routes[0], dsInfo, cfg)
|
||||
|
||||
authorizationHeaderCall1 = req.Header.Get("Authorization")
|
||||
assert.Equal(t, "https://api.nr1.io/some/path", req.URL.String())
|
||||
@ -281,7 +303,7 @@ func TestDataSourceProxy_routeRule(t *testing.T) {
|
||||
dsService := datasources.ProvideService(bus.New(), nil, ossencryption.ProvideService())
|
||||
proxy, err := NewDataSourceProxy(ds, plugin, ctx, "pathwithtoken2", cfg, httpClientProvider, &oauthtoken.Service{}, dsService)
|
||||
require.NoError(t, err)
|
||||
ApplyRoute(proxy.ctx.Req.Context(), req, proxy.proxyPath, plugin.Routes[1], proxy.ds, cfg, ossencryption.ProvideService())
|
||||
ApplyRoute(proxy.ctx.Req.Context(), req, proxy.proxyPath, plugin.Routes[1], dsInfo, cfg)
|
||||
|
||||
authorizationHeaderCall2 = req.Header.Get("Authorization")
|
||||
|
||||
@ -298,7 +320,7 @@ func TestDataSourceProxy_routeRule(t *testing.T) {
|
||||
dsService := datasources.ProvideService(bus.New(), nil, ossencryption.ProvideService())
|
||||
proxy, err := NewDataSourceProxy(ds, plugin, ctx, "pathwithtoken1", cfg, httpClientProvider, &oauthtoken.Service{}, dsService)
|
||||
require.NoError(t, err)
|
||||
ApplyRoute(proxy.ctx.Req.Context(), req, proxy.proxyPath, plugin.Routes[0], proxy.ds, cfg, ossencryption.ProvideService())
|
||||
ApplyRoute(proxy.ctx.Req.Context(), req, proxy.proxyPath, plugin.Routes[0], dsInfo, cfg)
|
||||
|
||||
authorizationHeaderCall3 := req.Header.Get("Authorization")
|
||||
assert.Equal(t, "https://api.nr1.io/some/path", req.URL.String())
|
||||
|
@ -2,25 +2,25 @@ package pluginproxy
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"github.com/grafana/grafana/pkg/models"
|
||||
"github.com/grafana/grafana/pkg/plugins"
|
||||
"golang.org/x/oauth2/google"
|
||||
)
|
||||
|
||||
type gceAccessTokenProvider struct {
|
||||
datasourceId int64
|
||||
datasourceVersion int
|
||||
datasourceUpdated time.Time
|
||||
ctx context.Context
|
||||
route *plugins.AppPluginRoute
|
||||
authParams *plugins.JwtTokenAuth
|
||||
}
|
||||
|
||||
func newGceAccessTokenProvider(ctx context.Context, ds *models.DataSource, pluginRoute *plugins.AppPluginRoute,
|
||||
func newGceAccessTokenProvider(ctx context.Context, ds DSInfo, pluginRoute *plugins.AppPluginRoute,
|
||||
authParams *plugins.JwtTokenAuth) *gceAccessTokenProvider {
|
||||
return &gceAccessTokenProvider{
|
||||
datasourceId: ds.Id,
|
||||
datasourceVersion: ds.Version,
|
||||
datasourceId: ds.ID,
|
||||
datasourceUpdated: ds.Updated,
|
||||
ctx: ctx,
|
||||
route: pluginRoute,
|
||||
authParams: authParams,
|
||||
|
@ -10,7 +10,6 @@ import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/grafana/grafana/pkg/models"
|
||||
"github.com/grafana/grafana/pkg/plugins"
|
||||
)
|
||||
|
||||
@ -27,7 +26,7 @@ type tokenCacheType struct {
|
||||
|
||||
type genericAccessTokenProvider struct {
|
||||
datasourceId int64
|
||||
datasourceVersion int
|
||||
datasourceUpdated time.Time
|
||||
route *plugins.AppPluginRoute
|
||||
authParams *plugins.JwtTokenAuth
|
||||
}
|
||||
@ -68,11 +67,11 @@ func (token *jwtToken) UnmarshalJSON(b []byte) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func newGenericAccessTokenProvider(ds *models.DataSource, pluginRoute *plugins.AppPluginRoute,
|
||||
func newGenericAccessTokenProvider(ds DSInfo, pluginRoute *plugins.AppPluginRoute,
|
||||
authParams *plugins.JwtTokenAuth) *genericAccessTokenProvider {
|
||||
return &genericAccessTokenProvider{
|
||||
datasourceId: ds.Id,
|
||||
datasourceVersion: ds.Version,
|
||||
datasourceId: ds.ID,
|
||||
datasourceUpdated: ds.Updated,
|
||||
route: pluginRoute,
|
||||
authParams: authParams,
|
||||
}
|
||||
@ -124,5 +123,5 @@ func (provider *genericAccessTokenProvider) GetAccessToken() (string, error) {
|
||||
}
|
||||
|
||||
func (provider *genericAccessTokenProvider) getAccessTokenCacheKey() string {
|
||||
return fmt.Sprintf("%v_%v_%v_%v", provider.datasourceId, provider.datasourceVersion, provider.route.Path, provider.route.Method)
|
||||
return fmt.Sprintf("%v_%v_%v_%v", provider.datasourceId, provider.datasourceUpdated.Unix(), provider.route.Path, provider.route.Method)
|
||||
}
|
||||
|
@ -6,7 +6,6 @@ import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/grafana/grafana/pkg/models"
|
||||
"github.com/grafana/grafana/pkg/plugins"
|
||||
"golang.org/x/oauth2"
|
||||
"golang.org/x/oauth2/jwt"
|
||||
@ -25,17 +24,17 @@ type oauthJwtTokenCacheType struct {
|
||||
|
||||
type jwtAccessTokenProvider struct {
|
||||
datasourceId int64
|
||||
datasourceVersion int
|
||||
datasourceUpdated time.Time
|
||||
ctx context.Context
|
||||
route *plugins.AppPluginRoute
|
||||
authParams *plugins.JwtTokenAuth
|
||||
}
|
||||
|
||||
func newJwtAccessTokenProvider(ctx context.Context, ds *models.DataSource, pluginRoute *plugins.AppPluginRoute,
|
||||
func newJwtAccessTokenProvider(ctx context.Context, ds DSInfo, pluginRoute *plugins.AppPluginRoute,
|
||||
authParams *plugins.JwtTokenAuth) *jwtAccessTokenProvider {
|
||||
return &jwtAccessTokenProvider{
|
||||
datasourceId: ds.Id,
|
||||
datasourceVersion: ds.Version,
|
||||
datasourceId: ds.ID,
|
||||
datasourceUpdated: ds.Updated,
|
||||
ctx: ctx,
|
||||
route: pluginRoute,
|
||||
authParams: authParams,
|
||||
@ -93,5 +92,5 @@ var getTokenSource = func(conf *jwt.Config, ctx context.Context) (*oauth2.Token,
|
||||
}
|
||||
|
||||
func (provider *jwtAccessTokenProvider) getAccessTokenCacheKey() string {
|
||||
return fmt.Sprintf("%v_%v_%v_%v", provider.datasourceId, provider.datasourceVersion, provider.route.Path, provider.route.Method)
|
||||
return fmt.Sprintf("%v_%v_%v_%v", provider.datasourceId, provider.datasourceUpdated.Unix(), provider.route.Path, provider.route.Method)
|
||||
}
|
||||
|
@ -12,7 +12,6 @@ import (
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/grafana/grafana/pkg/models"
|
||||
"github.com/grafana/grafana/pkg/plugins"
|
||||
"golang.org/x/oauth2"
|
||||
"golang.org/x/oauth2/jwt"
|
||||
@ -63,7 +62,7 @@ func TestAccessToken_pluginWithJWTTokenAuthRoute(t *testing.T) {
|
||||
getTokenSource = fn
|
||||
}
|
||||
|
||||
ds := &models.DataSource{Id: 1, Version: 2}
|
||||
ds := DSInfo{ID: 1, Updated: time.Now()}
|
||||
|
||||
t.Run("should fetch token using JWT private key", func(t *testing.T) {
|
||||
setUp(t, func(conf *jwt.Config, ctx context.Context) (*oauth2.Token, error) {
|
||||
@ -170,7 +169,7 @@ func TestAccessToken_pluginWithTokenAuthRoute(t *testing.T) {
|
||||
|
||||
mockTimeNow(time.Now())
|
||||
defer resetTimeNow()
|
||||
provider := newGenericAccessTokenProvider(&models.DataSource{}, pluginRoute, authParams)
|
||||
provider := newGenericAccessTokenProvider(DSInfo{}, pluginRoute, authParams)
|
||||
|
||||
testCases := []tokenTestDescription{
|
||||
{
|
||||
@ -252,7 +251,7 @@ func TestAccessToken_pluginWithTokenAuthRoute(t *testing.T) {
|
||||
|
||||
mockTimeNow(time.Now())
|
||||
defer resetTimeNow()
|
||||
provider := newGenericAccessTokenProvider(&models.DataSource{}, pluginRoute, authParams)
|
||||
provider := newGenericAccessTokenProvider(DSInfo{}, pluginRoute, authParams)
|
||||
|
||||
token = map[string]interface{}{
|
||||
"access_token": "2YotnFZFEjr1zCsicMWpAA",
|
||||
|
@ -23,6 +23,7 @@ import (
|
||||
"github.com/grafana/grafana/pkg/services/rendering"
|
||||
"github.com/grafana/grafana/pkg/services/secrets"
|
||||
"github.com/grafana/grafana/pkg/tsdb/azuremonitor"
|
||||
"github.com/grafana/grafana/pkg/tsdb/cloudmonitoring"
|
||||
"github.com/grafana/grafana/pkg/tsdb/cloudwatch"
|
||||
"github.com/grafana/grafana/pkg/tsdb/elasticsearch"
|
||||
"github.com/grafana/grafana/pkg/tsdb/grafanads"
|
||||
@ -49,7 +50,7 @@ func ProvideBackgroundServiceRegistry(
|
||||
_ *azuremonitor.Service, _ *cloudwatch.CloudWatchService, _ *elasticsearch.Service, _ *graphite.Service,
|
||||
_ *influxdb.Service, _ *loki.Service, _ *opentsdb.Service, _ *prometheus.Service, _ *tempo.Service,
|
||||
_ *testdatasource.TestDataPlugin, _ *plugindashboards.Service, _ *dashboardsnapshots.Service, _ secrets.SecretsService,
|
||||
_ *postgres.Service, _ *mysql.Service, _ *mssql.Service, _ *grafanads.Service,
|
||||
_ *postgres.Service, _ *mysql.Service, _ *mssql.Service, _ *grafanads.Service, _ *cloudmonitoring.Service,
|
||||
_ *pluginsettings.Service, _ *alerting.AlertNotificationService,
|
||||
) *BackgroundServiceRegistry {
|
||||
return NewBackgroundServiceRegistry(
|
||||
|
@ -2,63 +2,65 @@ package cloudmonitoring
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"strings"
|
||||
|
||||
"github.com/grafana/grafana/pkg/plugins"
|
||||
"github.com/grafana/grafana-plugin-sdk-go/backend"
|
||||
"github.com/grafana/grafana-plugin-sdk-go/data"
|
||||
)
|
||||
|
||||
//nolint: staticcheck // plugins.DataPlugin deprecated
|
||||
func (e *Executor) executeAnnotationQuery(ctx context.Context, tsdbQuery plugins.DataQuery) (
|
||||
plugins.DataResponse, error) {
|
||||
result := plugins.DataResponse{
|
||||
Results: make(map[string]plugins.DataQueryResult),
|
||||
}
|
||||
func (s *Service) executeAnnotationQuery(ctx context.Context, req *backend.QueryDataRequest, dsInfo datasourceInfo) (
|
||||
*backend.QueryDataResponse, error) {
|
||||
resp := backend.NewQueryDataResponse()
|
||||
|
||||
firstQuery := tsdbQuery.Queries[0]
|
||||
|
||||
queries, err := e.buildQueryExecutors(tsdbQuery)
|
||||
queries, err := s.buildQueryExecutors(req)
|
||||
if err != nil {
|
||||
return plugins.DataResponse{}, err
|
||||
return resp, err
|
||||
}
|
||||
|
||||
queryRes, resp, _, err := queries[0].run(ctx, tsdbQuery, e)
|
||||
queryRes, dr, _, err := queries[0].run(ctx, req, s, dsInfo)
|
||||
if err != nil {
|
||||
return plugins.DataResponse{}, err
|
||||
return resp, err
|
||||
}
|
||||
|
||||
metricQuery := firstQuery.Model.Get("metricQuery")
|
||||
title := metricQuery.Get("title").MustString()
|
||||
text := metricQuery.Get("text").MustString()
|
||||
tags := metricQuery.Get("tags").MustString()
|
||||
mq := struct {
|
||||
Title string `json:"title"`
|
||||
Text string `json:"text"`
|
||||
Tags string `json:"tags"`
|
||||
}{}
|
||||
|
||||
err = queries[0].parseToAnnotations(&queryRes, resp, title, text, tags)
|
||||
result.Results[firstQuery.RefID] = queryRes
|
||||
firstQuery := req.Queries[0]
|
||||
err = json.Unmarshal(firstQuery.JSON, &mq)
|
||||
if err != nil {
|
||||
return resp, nil
|
||||
}
|
||||
err = queries[0].parseToAnnotations(queryRes, dr, mq.Title, mq.Text, mq.Tags)
|
||||
resp.Responses[firstQuery.RefID] = *queryRes
|
||||
|
||||
return result, err
|
||||
return resp, err
|
||||
}
|
||||
|
||||
//nolint: staticcheck // plugins.DataPlugin deprecated
|
||||
func transformAnnotationToTable(data []map[string]string, result *plugins.DataQueryResult) {
|
||||
table := plugins.DataTable{
|
||||
Columns: make([]plugins.DataTableColumn, 4),
|
||||
Rows: make([]plugins.DataRowValues, 0),
|
||||
func (timeSeriesQuery cloudMonitoringTimeSeriesQuery) transformAnnotationToFrame(annotations []map[string]string, result *backend.DataResponse) {
|
||||
frames := data.Frames{}
|
||||
for _, a := range annotations {
|
||||
frame := &data.Frame{
|
||||
RefID: timeSeriesQuery.getRefID(),
|
||||
Fields: []*data.Field{
|
||||
data.NewField("time", nil, a["time"]),
|
||||
data.NewField("title", nil, a["title"]),
|
||||
data.NewField("tags", nil, a["tags"]),
|
||||
data.NewField("text", nil, a["text"]),
|
||||
},
|
||||
Meta: &data.FrameMeta{
|
||||
Custom: map[string]interface{}{
|
||||
"rowCount": len(a),
|
||||
},
|
||||
},
|
||||
}
|
||||
frames = append(frames, frame)
|
||||
}
|
||||
table.Columns[0].Text = "time"
|
||||
table.Columns[1].Text = "title"
|
||||
table.Columns[2].Text = "tags"
|
||||
table.Columns[3].Text = "text"
|
||||
|
||||
for _, r := range data {
|
||||
values := make([]interface{}, 4)
|
||||
values[0] = r["time"]
|
||||
values[1] = r["title"]
|
||||
values[2] = r["tags"]
|
||||
values[3] = r["text"]
|
||||
table.Rows = append(table.Rows, values)
|
||||
}
|
||||
result.Tables = append(result.Tables, table)
|
||||
result.Meta.Set("rowCount", len(data))
|
||||
slog.Info("anno", "len", len(data))
|
||||
result.Frames = frames
|
||||
slog.Info("anno", "len", len(annotations))
|
||||
}
|
||||
|
||||
func formatAnnotationText(annotationText string, pointValue string, metricType string, metricLabels map[string]string, resourceLabels map[string]string) string {
|
||||
|
@ -3,8 +3,7 @@ package cloudmonitoring
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/grafana/grafana/pkg/components/simplejson"
|
||||
"github.com/grafana/grafana/pkg/plugins"
|
||||
"github.com/grafana/grafana-plugin-sdk-go/backend"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
@ -14,25 +13,21 @@ func TestExecutor_parseToAnnotations(t *testing.T) {
|
||||
require.NoError(t, err)
|
||||
require.Len(t, d.TimeSeries, 3)
|
||||
|
||||
//nolint: staticcheck // plugins.DataPlugin deprecated
|
||||
res := &plugins.DataQueryResult{Meta: simplejson.New(), RefID: "annotationQuery"}
|
||||
res := &backend.DataResponse{}
|
||||
query := &cloudMonitoringTimeSeriesFilter{}
|
||||
|
||||
err = query.parseToAnnotations(res, d, "atitle {{metric.label.instance_name}} {{metric.value}}",
|
||||
"atext {{resource.label.zone}}", "atag")
|
||||
require.NoError(t, err)
|
||||
|
||||
decoded, err := res.Dataframes.Decoded()
|
||||
require.NoError(t, err)
|
||||
require.Len(t, decoded, 3)
|
||||
assert.Equal(t, "title", decoded[0].Fields[1].Name)
|
||||
assert.Equal(t, "tags", decoded[0].Fields[2].Name)
|
||||
assert.Equal(t, "text", decoded[0].Fields[3].Name)
|
||||
require.Len(t, res.Frames, 3)
|
||||
assert.Equal(t, "title", res.Frames[0].Fields[1].Name)
|
||||
assert.Equal(t, "tags", res.Frames[0].Fields[2].Name)
|
||||
assert.Equal(t, "text", res.Frames[0].Fields[3].Name)
|
||||
}
|
||||
|
||||
func TestCloudMonitoringExecutor_parseToAnnotations_emptyTimeSeries(t *testing.T) {
|
||||
//nolint: staticcheck // plugins.DataPlugin deprecated
|
||||
res := &plugins.DataQueryResult{Meta: simplejson.New(), RefID: "annotationQuery"}
|
||||
res := &backend.DataResponse{}
|
||||
query := &cloudMonitoringTimeSeriesFilter{}
|
||||
|
||||
response := cloudMonitoringResponse{
|
||||
@ -42,14 +37,11 @@ func TestCloudMonitoringExecutor_parseToAnnotations_emptyTimeSeries(t *testing.T
|
||||
err := query.parseToAnnotations(res, response, "atitle", "atext", "atag")
|
||||
require.NoError(t, err)
|
||||
|
||||
decoded, err := res.Dataframes.Decoded()
|
||||
require.NoError(t, err)
|
||||
require.Len(t, decoded, 0)
|
||||
require.Len(t, res.Frames, 0)
|
||||
}
|
||||
|
||||
func TestCloudMonitoringExecutor_parseToAnnotations_noPointsInSeries(t *testing.T) {
|
||||
//nolint: staticcheck // plugins.DataPlugin deprecated
|
||||
res := &plugins.DataQueryResult{Meta: simplejson.New(), RefID: "annotationQuery"}
|
||||
res := &backend.DataResponse{}
|
||||
query := &cloudMonitoringTimeSeriesFilter{}
|
||||
|
||||
response := cloudMonitoringResponse{
|
||||
@ -61,6 +53,5 @@ func TestCloudMonitoringExecutor_parseToAnnotations_noPointsInSeries(t *testing.
|
||||
err := query.parseToAnnotations(res, response, "atitle", "atext", "atag")
|
||||
require.NoError(t, err)
|
||||
|
||||
decoded, _ := res.Dataframes.Decoded()
|
||||
require.Len(t, decoded, 0)
|
||||
require.Len(t, res.Frames, 0)
|
||||
}
|
||||
|
@ -16,18 +16,22 @@ import (
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/grafana/grafana/pkg/services/encryption"
|
||||
"golang.org/x/oauth2/google"
|
||||
|
||||
"github.com/grafana/grafana-plugin-sdk-go/backend"
|
||||
"github.com/grafana/grafana-plugin-sdk-go/backend/datasource"
|
||||
"github.com/grafana/grafana-plugin-sdk-go/backend/instancemgmt"
|
||||
"github.com/grafana/grafana-plugin-sdk-go/data"
|
||||
|
||||
"github.com/grafana/grafana/pkg/api/pluginproxy"
|
||||
"github.com/grafana/grafana/pkg/components/simplejson"
|
||||
"github.com/grafana/grafana/pkg/infra/httpclient"
|
||||
"github.com/grafana/grafana/pkg/infra/log"
|
||||
"github.com/grafana/grafana/pkg/models"
|
||||
"github.com/grafana/grafana/pkg/plugins"
|
||||
"github.com/grafana/grafana/pkg/plugins/backendplugin"
|
||||
"github.com/grafana/grafana/pkg/plugins/backendplugin/coreplugin"
|
||||
"github.com/grafana/grafana/pkg/services/datasources"
|
||||
|
||||
"github.com/grafana/grafana/pkg/setting"
|
||||
"golang.org/x/oauth2/google"
|
||||
)
|
||||
|
||||
var (
|
||||
@ -68,150 +72,213 @@ const (
|
||||
perSeriesAlignerDefault string = "ALIGN_MEAN"
|
||||
)
|
||||
|
||||
func ProvideService(cfg *setting.Cfg, pluginManager plugins.Manager, httpClientProvider httpclient.Provider,
|
||||
dsService *datasources.Service) *Service {
|
||||
return &Service{
|
||||
PluginManager: pluginManager,
|
||||
HTTPClientProvider: httpClientProvider,
|
||||
Cfg: cfg,
|
||||
dsService: dsService,
|
||||
func ProvideService(cfg *setting.Cfg, httpClientProvider httpclient.Provider, pluginManager plugins.Manager,
|
||||
backendPluginManager backendplugin.Manager, dsService *datasources.Service) *Service {
|
||||
s := &Service{
|
||||
pluginManager: pluginManager,
|
||||
backendPluginManager: backendPluginManager,
|
||||
httpClientProvider: httpClientProvider,
|
||||
cfg: cfg,
|
||||
im: datasource.NewInstanceManager(newInstanceSettings(httpClientProvider)),
|
||||
dsService: dsService,
|
||||
}
|
||||
|
||||
factory := coreplugin.New(backend.ServeOpts{
|
||||
QueryDataHandler: s,
|
||||
})
|
||||
|
||||
if err := s.backendPluginManager.Register("stackdriver", factory); err != nil {
|
||||
slog.Error("Failed to register plugin", "error", err)
|
||||
}
|
||||
return s
|
||||
}
|
||||
|
||||
type Service struct {
|
||||
PluginManager plugins.Manager
|
||||
HTTPClientProvider httpclient.Provider
|
||||
Cfg *setting.Cfg
|
||||
dsService *datasources.Service
|
||||
pluginManager plugins.Manager
|
||||
backendPluginManager backendplugin.Manager
|
||||
httpClientProvider httpclient.Provider
|
||||
cfg *setting.Cfg
|
||||
im instancemgmt.InstanceManager
|
||||
dsService *datasources.Service
|
||||
}
|
||||
|
||||
// Executor executes queries for the CloudMonitoring datasource.
|
||||
type Executor struct {
|
||||
httpClient *http.Client
|
||||
dsInfo *models.DataSource
|
||||
pluginManager plugins.Manager
|
||||
encryptionService encryption.Service
|
||||
cfg *setting.Cfg
|
||||
type QueryModel struct {
|
||||
Type string `json:"type"`
|
||||
}
|
||||
|
||||
// NewExecutor returns an Executor.
|
||||
//nolint: staticcheck // plugins.DataPlugin deprecated
|
||||
func (s *Service) NewExecutor(dsInfo *models.DataSource) (plugins.DataPlugin, error) {
|
||||
httpClient, err := s.dsService.GetHTTPClient(dsInfo, s.HTTPClientProvider)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
type datasourceInfo struct {
|
||||
id int64
|
||||
updated time.Time
|
||||
url string
|
||||
authenticationType string
|
||||
defaultProject string
|
||||
client *http.Client
|
||||
|
||||
jsonData map[string]interface{}
|
||||
decryptedSecureJSONData map[string]string
|
||||
}
|
||||
|
||||
func newInstanceSettings(httpClientProvider httpclient.Provider) datasource.InstanceFactoryFunc {
|
||||
return func(settings backend.DataSourceInstanceSettings) (instancemgmt.Instance, error) {
|
||||
var jsonData map[string]interface{}
|
||||
err := json.Unmarshal(settings.JSONData, &jsonData)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error reading settings: %w", err)
|
||||
}
|
||||
|
||||
opts, err := settings.HTTPClientOptions()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
client, err := httpClientProvider.New(opts)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
authType := jwtAuthentication
|
||||
if authTypeOverride, ok := jsonData["authenticationType"].(string); ok && authTypeOverride != "" {
|
||||
authType = authTypeOverride
|
||||
}
|
||||
|
||||
var defaultProject string
|
||||
if jsonData["defaultProject"] != nil {
|
||||
defaultProject = jsonData["defaultProject"].(string)
|
||||
}
|
||||
|
||||
return &datasourceInfo{
|
||||
id: settings.ID,
|
||||
updated: settings.Updated,
|
||||
url: settings.URL,
|
||||
authenticationType: authType,
|
||||
defaultProject: defaultProject,
|
||||
client: client,
|
||||
jsonData: jsonData,
|
||||
decryptedSecureJSONData: settings.DecryptedSecureJSONData,
|
||||
}, nil
|
||||
}
|
||||
|
||||
return &Executor{
|
||||
httpClient: httpClient,
|
||||
dsInfo: dsInfo,
|
||||
pluginManager: s.PluginManager,
|
||||
encryptionService: s.dsService.EncryptionService,
|
||||
cfg: s.Cfg,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// Query takes in the frontend queries, parses them into the CloudMonitoring query format
|
||||
// executes the queries against the CloudMonitoring API and parses the response into
|
||||
// the time series or table format
|
||||
//nolint: staticcheck // plugins.DataPlugin deprecated
|
||||
func (e *Executor) DataQuery(ctx context.Context, dsInfo *models.DataSource, tsdbQuery plugins.DataQuery) (
|
||||
plugins.DataResponse, error) {
|
||||
var result plugins.DataResponse
|
||||
var err error
|
||||
queryType := tsdbQuery.Queries[0].Model.Get("type").MustString("")
|
||||
// the data frames
|
||||
func (s *Service) QueryData(ctx context.Context, req *backend.QueryDataRequest) (*backend.QueryDataResponse, error) {
|
||||
resp := backend.NewQueryDataResponse()
|
||||
if len(req.Queries) == 0 {
|
||||
return resp, fmt.Errorf("query contains no queries")
|
||||
}
|
||||
|
||||
switch queryType {
|
||||
model := &QueryModel{}
|
||||
err := json.Unmarshal(req.Queries[0].JSON, model)
|
||||
if err != nil {
|
||||
return resp, err
|
||||
}
|
||||
|
||||
dsInfo, err := s.getDSInfo(req.PluginContext)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
switch model.Type {
|
||||
case "annotationQuery":
|
||||
result, err = e.executeAnnotationQuery(ctx, tsdbQuery)
|
||||
resp, err = s.executeAnnotationQuery(ctx, req, *dsInfo)
|
||||
case "getGCEDefaultProject":
|
||||
result, err = e.getGCEDefaultProject(ctx, tsdbQuery)
|
||||
resp, err = s.getGCEDefaultProject(ctx, req, *dsInfo)
|
||||
case "timeSeriesQuery":
|
||||
fallthrough
|
||||
default:
|
||||
result, err = e.executeTimeSeriesQuery(ctx, tsdbQuery)
|
||||
resp, err = s.executeTimeSeriesQuery(ctx, req, *dsInfo)
|
||||
}
|
||||
|
||||
return result, err
|
||||
return resp, err
|
||||
}
|
||||
|
||||
//nolint: staticcheck // plugins.DataPlugin deprecated
|
||||
func (e *Executor) getGCEDefaultProject(ctx context.Context, tsdbQuery plugins.DataQuery) (plugins.DataResponse, error) {
|
||||
result := plugins.DataResponse{
|
||||
//nolint: staticcheck // plugins.DataPlugin deprecated
|
||||
Results: make(map[string]plugins.DataQueryResult),
|
||||
}
|
||||
refID := tsdbQuery.Queries[0].RefID
|
||||
//nolint: staticcheck // plugins.DataPlugin deprecated
|
||||
queryResult := plugins.DataQueryResult{Meta: simplejson.New(), RefID: refID}
|
||||
|
||||
gceDefaultProject, err := e.getDefaultProject(ctx)
|
||||
func (s *Service) getGCEDefaultProject(ctx context.Context, req *backend.QueryDataRequest, dsInfo datasourceInfo) (*backend.QueryDataResponse, error) {
|
||||
gceDefaultProject, err := s.getDefaultProject(ctx, dsInfo)
|
||||
if err != nil {
|
||||
return plugins.DataResponse{}, fmt.Errorf(
|
||||
return backend.NewQueryDataResponse(), fmt.Errorf(
|
||||
"failed to retrieve default project from GCE metadata server, error: %w", err)
|
||||
}
|
||||
|
||||
queryResult.Meta.Set("defaultProject", gceDefaultProject)
|
||||
result.Results[refID] = queryResult
|
||||
|
||||
return result, nil
|
||||
return &backend.QueryDataResponse{
|
||||
Responses: backend.Responses{
|
||||
req.Queries[0].RefID: {
|
||||
Frames: data.Frames{data.NewFrame("").SetMeta(&data.FrameMeta{
|
||||
Custom: map[string]interface{}{
|
||||
"defaultProject": gceDefaultProject,
|
||||
},
|
||||
})},
|
||||
},
|
||||
},
|
||||
}, nil
|
||||
}
|
||||
|
||||
//nolint: staticcheck // plugins.DataPlugin deprecated
|
||||
func (e *Executor) executeTimeSeriesQuery(ctx context.Context, tsdbQuery plugins.DataQuery) (
|
||||
plugins.DataResponse, error) {
|
||||
result := plugins.DataResponse{
|
||||
//nolint: staticcheck // plugins.DataPlugin deprecated
|
||||
Results: make(map[string]plugins.DataQueryResult),
|
||||
}
|
||||
|
||||
queryExecutors, err := e.buildQueryExecutors(tsdbQuery)
|
||||
func (s *Service) executeTimeSeriesQuery(ctx context.Context, req *backend.QueryDataRequest, dsInfo datasourceInfo) (
|
||||
*backend.QueryDataResponse, error) {
|
||||
resp := backend.NewQueryDataResponse()
|
||||
queryExecutors, err := s.buildQueryExecutors(req)
|
||||
if err != nil {
|
||||
return plugins.DataResponse{}, err
|
||||
return resp, err
|
||||
}
|
||||
|
||||
for _, queryExecutor := range queryExecutors {
|
||||
queryRes, resp, executedQueryString, err := queryExecutor.run(ctx, tsdbQuery, e)
|
||||
queryRes, dr, executedQueryString, err := queryExecutor.run(ctx, req, s, dsInfo)
|
||||
if err != nil {
|
||||
return plugins.DataResponse{}, err
|
||||
return resp, err
|
||||
}
|
||||
err = queryExecutor.parseResponse(&queryRes, resp, executedQueryString)
|
||||
err = queryExecutor.parseResponse(queryRes, dr, executedQueryString)
|
||||
if err != nil {
|
||||
queryRes.Error = err
|
||||
}
|
||||
|
||||
result.Results[queryExecutor.getRefID()] = queryRes
|
||||
resp.Responses[queryExecutor.getRefID()] = *queryRes
|
||||
}
|
||||
|
||||
return result, nil
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
func (e *Executor) buildQueryExecutors(tsdbQuery plugins.DataQuery) ([]cloudMonitoringQueryExecutor, error) {
|
||||
cloudMonitoringQueryExecutors := []cloudMonitoringQueryExecutor{}
|
||||
|
||||
startTime, err := tsdbQuery.TimeRange.ParseFrom()
|
||||
func queryModel(query backend.DataQuery) (grafanaQuery, error) {
|
||||
var rawQuery map[string]interface{}
|
||||
err := json.Unmarshal(query.JSON, &rawQuery)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return grafanaQuery{}, err
|
||||
}
|
||||
|
||||
endTime, err := tsdbQuery.TimeRange.ParseTo()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
if rawQuery["metricQuery"] == nil {
|
||||
// migrate legacy query
|
||||
var mq metricQuery
|
||||
err = json.Unmarshal(query.JSON, &mq)
|
||||
if err != nil {
|
||||
return grafanaQuery{}, err
|
||||
}
|
||||
|
||||
return grafanaQuery{
|
||||
QueryType: metricQueryType,
|
||||
MetricQuery: mq,
|
||||
}, nil
|
||||
}
|
||||
|
||||
var q grafanaQuery
|
||||
err = json.Unmarshal(query.JSON, &q)
|
||||
if err != nil {
|
||||
return grafanaQuery{}, err
|
||||
}
|
||||
|
||||
return q, nil
|
||||
}
|
||||
|
||||
func (s *Service) buildQueryExecutors(req *backend.QueryDataRequest) ([]cloudMonitoringQueryExecutor, error) {
|
||||
var cloudMonitoringQueryExecutors []cloudMonitoringQueryExecutor
|
||||
startTime := req.Queries[0].TimeRange.From
|
||||
endTime := req.Queries[0].TimeRange.To
|
||||
durationSeconds := int(endTime.Sub(startTime).Seconds())
|
||||
|
||||
for i := range tsdbQuery.Queries {
|
||||
migrateLegacyQueryModel(&tsdbQuery.Queries[i])
|
||||
query := tsdbQuery.Queries[i]
|
||||
q := grafanaQuery{}
|
||||
model, err := query.Model.MarshalJSON()
|
||||
for _, query := range req.Queries {
|
||||
q, err := queryModel(query)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err := json.Unmarshal(model, &q); err != nil {
|
||||
return nil, fmt.Errorf("could not unmarshal CloudMonitoringQuery json: %w", err)
|
||||
}
|
||||
|
||||
q.MetricQuery.PreprocessorType = toPreprocessorType(q.MetricQuery.Preprocessor)
|
||||
var target string
|
||||
params := url.Values{}
|
||||
@ -230,9 +297,9 @@ func (e *Executor) buildQueryExecutors(tsdbQuery plugins.DataQuery) ([]cloudMoni
|
||||
RefID: query.RefID,
|
||||
ProjectName: q.MetricQuery.ProjectName,
|
||||
Query: q.MetricQuery.Query,
|
||||
IntervalMS: query.IntervalMS,
|
||||
IntervalMS: query.Interval.Milliseconds(),
|
||||
AliasBy: q.MetricQuery.AliasBy,
|
||||
timeRange: *tsdbQuery.TimeRange,
|
||||
timeRange: req.Queries[0].TimeRange,
|
||||
}
|
||||
} else {
|
||||
cmtsf.AliasBy = q.MetricQuery.AliasBy
|
||||
@ -243,7 +310,7 @@ func (e *Executor) buildQueryExecutors(tsdbQuery plugins.DataQuery) ([]cloudMoni
|
||||
}
|
||||
params.Add("filter", buildFilterString(q.MetricQuery.MetricType, q.MetricQuery.Filters))
|
||||
params.Add("view", q.MetricQuery.View)
|
||||
setMetricAggParams(¶ms, &q.MetricQuery, durationSeconds, query.IntervalMS)
|
||||
setMetricAggParams(¶ms, &q.MetricQuery, durationSeconds, query.Interval.Milliseconds())
|
||||
queryInterface = cmtsf
|
||||
}
|
||||
case sloQueryType:
|
||||
@ -253,7 +320,7 @@ func (e *Executor) buildQueryExecutors(tsdbQuery plugins.DataQuery) ([]cloudMoni
|
||||
cmtsf.Service = q.SloQuery.ServiceId
|
||||
cmtsf.Slo = q.SloQuery.SloId
|
||||
params.Add("filter", buildSLOFilterExpression(q.SloQuery))
|
||||
setSloAggParams(¶ms, &q.SloQuery, durationSeconds, query.IntervalMS)
|
||||
setSloAggParams(¶ms, &q.SloQuery, durationSeconds, query.Interval.Milliseconds())
|
||||
queryInterface = cmtsf
|
||||
default:
|
||||
panic(fmt.Sprintf("Unrecognized query type %q", q.QueryType))
|
||||
@ -273,17 +340,6 @@ func (e *Executor) buildQueryExecutors(tsdbQuery plugins.DataQuery) ([]cloudMoni
|
||||
return cloudMonitoringQueryExecutors, nil
|
||||
}
|
||||
|
||||
func migrateLegacyQueryModel(query *plugins.DataSubQuery) {
|
||||
mq := query.Model.Get("metricQuery").MustMap()
|
||||
if mq == nil {
|
||||
migratedModel := simplejson.NewFromAny(map[string]interface{}{
|
||||
"queryType": metricQueryType,
|
||||
"metricQuery": query.Model.MustMap(),
|
||||
})
|
||||
query.Model = migratedModel
|
||||
}
|
||||
}
|
||||
|
||||
func reverse(s string) string {
|
||||
chars := []rune(s)
|
||||
for i, j := 0, len(chars)-1; i < j; i, j = i+1, j-1 {
|
||||
@ -522,8 +578,8 @@ func calcBucketBound(bucketOptions cloudMonitoringBucketOptions, n int) string {
|
||||
return bucketBound
|
||||
}
|
||||
|
||||
func (e *Executor) createRequest(ctx context.Context, dsInfo *models.DataSource, proxyPass string, body io.Reader) (*http.Request, error) {
|
||||
u, err := url.Parse(dsInfo.Url)
|
||||
func (s *Service) createRequest(ctx context.Context, pluginCtx backend.PluginContext, dsInfo *datasourceInfo, proxyPass string, body io.Reader) (*http.Request, error) {
|
||||
u, err := url.Parse(dsInfo.url)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -542,7 +598,7 @@ func (e *Executor) createRequest(ctx context.Context, dsInfo *models.DataSource,
|
||||
req.Header.Set("Content-Type", "application/json")
|
||||
|
||||
// find plugin
|
||||
plugin := e.pluginManager.GetDataSource(dsInfo.Type)
|
||||
plugin := s.pluginManager.GetDataSource(pluginCtx.PluginID)
|
||||
if plugin == nil {
|
||||
return nil, errors.New("unable to find datasource plugin CloudMonitoring")
|
||||
}
|
||||
@ -555,14 +611,18 @@ func (e *Executor) createRequest(ctx context.Context, dsInfo *models.DataSource,
|
||||
}
|
||||
}
|
||||
|
||||
pluginproxy.ApplyRoute(ctx, req, proxyPass, cloudMonitoringRoute, dsInfo, e.cfg, e.encryptionService)
|
||||
pluginproxy.ApplyRoute(ctx, req, proxyPass, cloudMonitoringRoute, pluginproxy.DSInfo{
|
||||
ID: dsInfo.id,
|
||||
Updated: dsInfo.updated,
|
||||
JSONData: dsInfo.jsonData,
|
||||
DecryptedSecureJSONData: dsInfo.decryptedSecureJSONData,
|
||||
}, s.cfg)
|
||||
|
||||
return req, nil
|
||||
}
|
||||
|
||||
func (e *Executor) getDefaultProject(ctx context.Context) (string, error) {
|
||||
authenticationType := e.dsInfo.JsonData.Get("authenticationType").MustString(jwtAuthentication)
|
||||
if authenticationType == gceAuthentication {
|
||||
func (s *Service) getDefaultProject(ctx context.Context, dsInfo datasourceInfo) (string, error) {
|
||||
if dsInfo.authenticationType == gceAuthentication {
|
||||
defaultCredentials, err := google.FindDefaultCredentials(ctx, "https://www.googleapis.com/auth/monitoring.read")
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("failed to retrieve default project from GCE metadata server: %w", err)
|
||||
@ -577,7 +637,7 @@ func (e *Executor) getDefaultProject(ctx context.Context) (string, error) {
|
||||
|
||||
return defaultCredentials.ProjectID, nil
|
||||
}
|
||||
return e.dsInfo.JsonData.Get("defaultProject").MustString(), nil
|
||||
return dsInfo.defaultProject, nil
|
||||
}
|
||||
|
||||
func unmarshalResponse(res *http.Response) (cloudMonitoringResponse, error) {
|
||||
@ -626,3 +686,17 @@ func addConfigData(frames data.Frames, dl string, unit string) data.Frames {
|
||||
}
|
||||
return frames
|
||||
}
|
||||
|
||||
func (s *Service) getDSInfo(pluginCtx backend.PluginContext) (*datasourceInfo, error) {
|
||||
i, err := s.im.Get(pluginCtx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
instance, ok := i.(*datasourceInfo)
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("failed to cast datsource info")
|
||||
}
|
||||
|
||||
return instance, nil
|
||||
}
|
||||
|
@ -12,18 +12,18 @@ import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/grafana/grafana/pkg/components/simplejson"
|
||||
"github.com/grafana/grafana/pkg/plugins"
|
||||
"github.com/grafana/grafana-plugin-sdk-go/backend"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestCloudMonitoring(t *testing.T) {
|
||||
executor := &Executor{}
|
||||
service := &Service{}
|
||||
|
||||
t.Run("Parse migrated queries from frontend and build Google Cloud Monitoring API queries", func(t *testing.T) {
|
||||
t.Run("and query has no aggregation set", func(t *testing.T) {
|
||||
qes, err := executor.buildQueryExecutors(getBaseQuery())
|
||||
qes, err := service.buildQueryExecutors(baseReq())
|
||||
require.NoError(t, err)
|
||||
queries := getCloudMonitoringQueriesFromInterface(t, qes)
|
||||
|
||||
@ -58,13 +58,13 @@ func TestCloudMonitoring(t *testing.T) {
|
||||
})
|
||||
|
||||
t.Run("and query has filters", func(t *testing.T) {
|
||||
query := getBaseQuery()
|
||||
query.Queries[0].Model = simplejson.NewFromAny(map[string]interface{}{
|
||||
query := baseReq()
|
||||
query.Queries[0].JSON = json.RawMessage(`{
|
||||
"metricType": "a/metric/type",
|
||||
"filters": []interface{}{"key", "=", "value", "AND", "key2", "=", "value2", "AND", "resource.type", "=", "another/resource/type"},
|
||||
})
|
||||
"filters": ["key", "=", "value", "AND", "key2", "=", "value2", "AND", "resource.type", "=", "another/resource/type"]
|
||||
}`)
|
||||
|
||||
qes, err := executor.buildQueryExecutors(query)
|
||||
qes, err := service.buildQueryExecutors(query)
|
||||
require.NoError(t, err)
|
||||
queries := getCloudMonitoringQueriesFromInterface(t, qes)
|
||||
assert.Equal(t, 1, len(queries))
|
||||
@ -89,14 +89,14 @@ func TestCloudMonitoring(t *testing.T) {
|
||||
|
||||
t.Run("and alignmentPeriod is set to grafana-auto", func(t *testing.T) {
|
||||
t.Run("and IntervalMS is larger than 60000", func(t *testing.T) {
|
||||
tsdbQuery := getBaseQuery()
|
||||
tsdbQuery.Queries[0].IntervalMS = 1000000
|
||||
tsdbQuery.Queries[0].Model = simplejson.NewFromAny(map[string]interface{}{
|
||||
req := baseReq()
|
||||
req.Queries[0].Interval = 1000000 * time.Millisecond
|
||||
req.Queries[0].JSON = json.RawMessage(`{
|
||||
"alignmentPeriod": "grafana-auto",
|
||||
"filters": []interface{}{"key", "=", "value", "AND", "key2", "=", "value2"},
|
||||
})
|
||||
"filters": ["key", "=", "value", "AND", "key2", "=", "value2"]
|
||||
}`)
|
||||
|
||||
qes, err := executor.buildQueryExecutors(tsdbQuery)
|
||||
qes, err := service.buildQueryExecutors(req)
|
||||
require.NoError(t, err)
|
||||
queries := getCloudMonitoringQueriesFromInterface(t, qes)
|
||||
assert.Equal(t, `+1000s`, queries[0].Params["aggregation.alignmentPeriod"][0])
|
||||
@ -117,14 +117,14 @@ func TestCloudMonitoring(t *testing.T) {
|
||||
verifyDeepLink(t, dl, expectedTimeSelection, expectedTimeSeriesFilter)
|
||||
})
|
||||
t.Run("and IntervalMS is less than 60000", func(t *testing.T) {
|
||||
tsdbQuery := getBaseQuery()
|
||||
tsdbQuery.Queries[0].IntervalMS = 30000
|
||||
tsdbQuery.Queries[0].Model = simplejson.NewFromAny(map[string]interface{}{
|
||||
req := baseReq()
|
||||
req.Queries[0].Interval = 30000 * time.Millisecond
|
||||
req.Queries[0].JSON = json.RawMessage(`{
|
||||
"alignmentPeriod": "grafana-auto",
|
||||
"filters": []interface{}{"key", "=", "value", "AND", "key2", "=", "value2"},
|
||||
})
|
||||
"filters": ["key", "=", "value", "AND", "key2", "=", "value2"]
|
||||
}`)
|
||||
|
||||
qes, err := executor.buildQueryExecutors(tsdbQuery)
|
||||
qes, err := service.buildQueryExecutors(req)
|
||||
require.NoError(t, err)
|
||||
queries := getCloudMonitoringQueriesFromInterface(t, qes)
|
||||
assert.Equal(t, `+60s`, queries[0].Params["aggregation.alignmentPeriod"][0])
|
||||
@ -147,61 +147,63 @@ func TestCloudMonitoring(t *testing.T) {
|
||||
})
|
||||
|
||||
t.Run("and alignmentPeriod is set to cloud-monitoring-auto", func(t *testing.T) { // legacy
|
||||
t.Run("and range is two hours", func(t *testing.T) {
|
||||
tsdbQuery := getBaseQuery()
|
||||
tsdbQuery.TimeRange.From = "1538033322461"
|
||||
tsdbQuery.TimeRange.To = "1538040522461"
|
||||
tsdbQuery.Queries[0].Model = simplejson.NewFromAny(map[string]interface{}{
|
||||
"target": "target",
|
||||
"alignmentPeriod": "cloud-monitoring-auto",
|
||||
})
|
||||
now := time.Now().UTC()
|
||||
|
||||
qes, err := executor.buildQueryExecutors(tsdbQuery)
|
||||
t.Run("and range is two hours", func(t *testing.T) {
|
||||
req := baseReq()
|
||||
req.Queries[0].TimeRange.From = now.Add(-(time.Hour * 2))
|
||||
req.Queries[0].TimeRange.To = now
|
||||
req.Queries[0].JSON = json.RawMessage(`{
|
||||
"target": "target",
|
||||
"alignmentPeriod": "cloud-monitoring-auto"
|
||||
}`)
|
||||
|
||||
qes, err := service.buildQueryExecutors(req)
|
||||
require.NoError(t, err)
|
||||
queries := getCloudMonitoringQueriesFromInterface(t, qes)
|
||||
assert.Equal(t, `+60s`, queries[0].Params["aggregation.alignmentPeriod"][0])
|
||||
})
|
||||
|
||||
t.Run("and range is 22 hours", func(t *testing.T) {
|
||||
tsdbQuery := getBaseQuery()
|
||||
tsdbQuery.TimeRange.From = "1538034524922"
|
||||
tsdbQuery.TimeRange.To = "1538113724922"
|
||||
tsdbQuery.Queries[0].Model = simplejson.NewFromAny(map[string]interface{}{
|
||||
"target": "target",
|
||||
"alignmentPeriod": "cloud-monitoring-auto",
|
||||
})
|
||||
req := baseReq()
|
||||
req.Queries[0].TimeRange.From = now.Add(-(time.Hour * 22))
|
||||
req.Queries[0].TimeRange.To = now
|
||||
req.Queries[0].JSON = json.RawMessage(`{
|
||||
"target": "target",
|
||||
"alignmentPeriod": "cloud-monitoring-auto"
|
||||
}`)
|
||||
|
||||
qes, err := executor.buildQueryExecutors(tsdbQuery)
|
||||
qes, err := service.buildQueryExecutors(req)
|
||||
require.NoError(t, err)
|
||||
queries := getCloudMonitoringQueriesFromInterface(t, qes)
|
||||
assert.Equal(t, `+60s`, queries[0].Params["aggregation.alignmentPeriod"][0])
|
||||
})
|
||||
|
||||
t.Run("and range is 23 hours", func(t *testing.T) {
|
||||
tsdbQuery := getBaseQuery()
|
||||
tsdbQuery.TimeRange.From = "1538034567985"
|
||||
tsdbQuery.TimeRange.To = "1538117367985"
|
||||
tsdbQuery.Queries[0].Model = simplejson.NewFromAny(map[string]interface{}{
|
||||
"target": "target",
|
||||
"alignmentPeriod": "cloud-monitoring-auto",
|
||||
})
|
||||
req := baseReq()
|
||||
req.Queries[0].TimeRange.From = now.Add(-(time.Hour * 23))
|
||||
req.Queries[0].TimeRange.To = now
|
||||
req.Queries[0].JSON = json.RawMessage(`{
|
||||
"target": "target",
|
||||
"alignmentPeriod": "cloud-monitoring-auto"
|
||||
}`)
|
||||
|
||||
qes, err := executor.buildQueryExecutors(tsdbQuery)
|
||||
qes, err := service.buildQueryExecutors(req)
|
||||
require.NoError(t, err)
|
||||
queries := getCloudMonitoringQueriesFromInterface(t, qes)
|
||||
assert.Equal(t, `+300s`, queries[0].Params["aggregation.alignmentPeriod"][0])
|
||||
})
|
||||
|
||||
t.Run("and range is 7 days", func(t *testing.T) {
|
||||
tsdbQuery := getBaseQuery()
|
||||
tsdbQuery.TimeRange.From = "1538036324073"
|
||||
tsdbQuery.TimeRange.To = "1538641124073"
|
||||
tsdbQuery.Queries[0].Model = simplejson.NewFromAny(map[string]interface{}{
|
||||
"target": "target",
|
||||
"alignmentPeriod": "cloud-monitoring-auto",
|
||||
})
|
||||
req := baseReq()
|
||||
req.Queries[0].TimeRange.From = now
|
||||
req.Queries[0].TimeRange.To = now.AddDate(0, 0, 7)
|
||||
req.Queries[0].JSON = json.RawMessage(`{
|
||||
"target": "target",
|
||||
"alignmentPeriod": "cloud-monitoring-auto"
|
||||
}`)
|
||||
|
||||
qes, err := executor.buildQueryExecutors(tsdbQuery)
|
||||
qes, err := service.buildQueryExecutors(req)
|
||||
require.NoError(t, err)
|
||||
queries := getCloudMonitoringQueriesFromInterface(t, qes)
|
||||
assert.Equal(t, `+3600s`, queries[0].Params["aggregation.alignmentPeriod"][0])
|
||||
@ -209,16 +211,18 @@ func TestCloudMonitoring(t *testing.T) {
|
||||
})
|
||||
|
||||
t.Run("and alignmentPeriod is set to stackdriver-auto", func(t *testing.T) { // legacy
|
||||
t.Run("and range is two hours", func(t *testing.T) {
|
||||
tsdbQuery := getBaseQuery()
|
||||
tsdbQuery.TimeRange.From = "1538033322461"
|
||||
tsdbQuery.TimeRange.To = "1538040522461"
|
||||
tsdbQuery.Queries[0].Model = simplejson.NewFromAny(map[string]interface{}{
|
||||
"target": "target",
|
||||
"alignmentPeriod": "stackdriver-auto",
|
||||
})
|
||||
now := time.Now().UTC()
|
||||
|
||||
qes, err := executor.buildQueryExecutors(tsdbQuery)
|
||||
t.Run("and range is two hours", func(t *testing.T) {
|
||||
req := baseReq()
|
||||
req.Queries[0].TimeRange.From = now.Add(-(time.Hour * 2))
|
||||
req.Queries[0].TimeRange.To = now
|
||||
req.Queries[0].JSON = json.RawMessage(`{
|
||||
"target": "target",
|
||||
"alignmentPeriod": "stackdriver-auto"
|
||||
}`)
|
||||
|
||||
qes, err := service.buildQueryExecutors(req)
|
||||
require.NoError(t, err)
|
||||
queries := getCloudMonitoringQueriesFromInterface(t, qes)
|
||||
assert.Equal(t, `+60s`, queries[0].Params["aggregation.alignmentPeriod"][0])
|
||||
@ -230,8 +234,8 @@ func TestCloudMonitoring(t *testing.T) {
|
||||
|
||||
expectedTimeSelection := map[string]string{
|
||||
"timeRange": "custom",
|
||||
"start": "2018-09-27T07:28:42Z",
|
||||
"end": "2018-09-27T09:28:42Z",
|
||||
"start": req.Queries[0].TimeRange.From.Format(time.RFC3339),
|
||||
"end": req.Queries[0].TimeRange.To.Format(time.RFC3339),
|
||||
}
|
||||
expectedTimeSeriesFilter := map[string]interface{}{
|
||||
"minAlignmentPeriod": `60s`,
|
||||
@ -240,15 +244,15 @@ func TestCloudMonitoring(t *testing.T) {
|
||||
})
|
||||
|
||||
t.Run("and range is 22 hours", func(t *testing.T) {
|
||||
tsdbQuery := getBaseQuery()
|
||||
tsdbQuery.TimeRange.From = "1538034524922"
|
||||
tsdbQuery.TimeRange.To = "1538113724922"
|
||||
tsdbQuery.Queries[0].Model = simplejson.NewFromAny(map[string]interface{}{
|
||||
"target": "target",
|
||||
"alignmentPeriod": "stackdriver-auto",
|
||||
})
|
||||
req := baseReq()
|
||||
req.Queries[0].TimeRange.From = now.Add(-(time.Hour * 22))
|
||||
req.Queries[0].TimeRange.To = now
|
||||
req.Queries[0].JSON = json.RawMessage(`{
|
||||
"target": "target",
|
||||
"alignmentPeriod": "stackdriver-auto"
|
||||
}`)
|
||||
|
||||
qes, err := executor.buildQueryExecutors(tsdbQuery)
|
||||
qes, err := service.buildQueryExecutors(req)
|
||||
require.NoError(t, err)
|
||||
queries := getCloudMonitoringQueriesFromInterface(t, qes)
|
||||
assert.Equal(t, `+60s`, queries[0].Params["aggregation.alignmentPeriod"][0])
|
||||
@ -260,8 +264,8 @@ func TestCloudMonitoring(t *testing.T) {
|
||||
|
||||
expectedTimeSelection := map[string]string{
|
||||
"timeRange": "custom",
|
||||
"start": "2018-09-27T07:48:44Z",
|
||||
"end": "2018-09-28T05:48:44Z",
|
||||
"start": req.Queries[0].TimeRange.From.Format(time.RFC3339),
|
||||
"end": req.Queries[0].TimeRange.To.Format(time.RFC3339),
|
||||
}
|
||||
expectedTimeSeriesFilter := map[string]interface{}{
|
||||
"minAlignmentPeriod": `60s`,
|
||||
@ -270,15 +274,15 @@ func TestCloudMonitoring(t *testing.T) {
|
||||
})
|
||||
|
||||
t.Run("and range is 23 hours", func(t *testing.T) {
|
||||
tsdbQuery := getBaseQuery()
|
||||
tsdbQuery.TimeRange.From = "1538034567985"
|
||||
tsdbQuery.TimeRange.To = "1538117367985"
|
||||
tsdbQuery.Queries[0].Model = simplejson.NewFromAny(map[string]interface{}{
|
||||
"target": "target",
|
||||
"alignmentPeriod": "stackdriver-auto",
|
||||
})
|
||||
req := baseReq()
|
||||
req.Queries[0].TimeRange.From = now.Add(-(time.Hour * 23))
|
||||
req.Queries[0].TimeRange.To = now
|
||||
req.Queries[0].JSON = json.RawMessage(`{
|
||||
"target": "target",
|
||||
"alignmentPeriod": "stackdriver-auto"
|
||||
}`)
|
||||
|
||||
qes, err := executor.buildQueryExecutors(tsdbQuery)
|
||||
qes, err := service.buildQueryExecutors(req)
|
||||
require.NoError(t, err)
|
||||
queries := getCloudMonitoringQueriesFromInterface(t, qes)
|
||||
assert.Equal(t, `+300s`, queries[0].Params["aggregation.alignmentPeriod"][0])
|
||||
@ -290,8 +294,8 @@ func TestCloudMonitoring(t *testing.T) {
|
||||
|
||||
expectedTimeSelection := map[string]string{
|
||||
"timeRange": "custom",
|
||||
"start": "2018-09-27T07:49:27Z",
|
||||
"end": "2018-09-28T06:49:27Z",
|
||||
"start": req.Queries[0].TimeRange.From.Format(time.RFC3339),
|
||||
"end": req.Queries[0].TimeRange.To.Format(time.RFC3339),
|
||||
}
|
||||
expectedTimeSeriesFilter := map[string]interface{}{
|
||||
"minAlignmentPeriod": `300s`,
|
||||
@ -300,15 +304,15 @@ func TestCloudMonitoring(t *testing.T) {
|
||||
})
|
||||
|
||||
t.Run("and range is 7 days", func(t *testing.T) {
|
||||
tsdbQuery := getBaseQuery()
|
||||
tsdbQuery.TimeRange.From = "1538036324073"
|
||||
tsdbQuery.TimeRange.To = "1538641124073"
|
||||
tsdbQuery.Queries[0].Model = simplejson.NewFromAny(map[string]interface{}{
|
||||
"target": "target",
|
||||
"alignmentPeriod": "stackdriver-auto",
|
||||
})
|
||||
req := baseReq()
|
||||
req.Queries[0].TimeRange.From = now.AddDate(0, 0, -7)
|
||||
req.Queries[0].TimeRange.To = now
|
||||
req.Queries[0].JSON = json.RawMessage(`{
|
||||
"target": "target",
|
||||
"alignmentPeriod": "stackdriver-auto"
|
||||
}`)
|
||||
|
||||
qes, err := executor.buildQueryExecutors(tsdbQuery)
|
||||
qes, err := service.buildQueryExecutors(req)
|
||||
require.NoError(t, err)
|
||||
queries := getCloudMonitoringQueriesFromInterface(t, qes)
|
||||
assert.Equal(t, `+3600s`, queries[0].Params["aggregation.alignmentPeriod"][0])
|
||||
@ -320,8 +324,8 @@ func TestCloudMonitoring(t *testing.T) {
|
||||
|
||||
expectedTimeSelection := map[string]string{
|
||||
"timeRange": "custom",
|
||||
"start": "2018-09-27T08:18:44Z",
|
||||
"end": "2018-10-04T08:18:44Z",
|
||||
"start": req.Queries[0].TimeRange.From.Format(time.RFC3339),
|
||||
"end": req.Queries[0].TimeRange.To.Format(time.RFC3339),
|
||||
}
|
||||
expectedTimeSeriesFilter := map[string]interface{}{
|
||||
"minAlignmentPeriod": `3600s`,
|
||||
@ -332,13 +336,13 @@ func TestCloudMonitoring(t *testing.T) {
|
||||
|
||||
t.Run("and alignmentPeriod is set in frontend", func(t *testing.T) {
|
||||
t.Run("and alignment period is within accepted range", func(t *testing.T) {
|
||||
tsdbQuery := getBaseQuery()
|
||||
tsdbQuery.Queries[0].IntervalMS = 1000
|
||||
tsdbQuery.Queries[0].Model = simplejson.NewFromAny(map[string]interface{}{
|
||||
"alignmentPeriod": "+600s",
|
||||
})
|
||||
req := baseReq()
|
||||
req.Queries[0].Interval = 1000
|
||||
req.Queries[0].JSON = json.RawMessage(`{
|
||||
"alignmentPeriod": "+600s"
|
||||
}`)
|
||||
|
||||
qes, err := executor.buildQueryExecutors(tsdbQuery)
|
||||
qes, err := service.buildQueryExecutors(req)
|
||||
require.NoError(t, err)
|
||||
queries := getCloudMonitoringQueriesFromInterface(t, qes)
|
||||
assert.Equal(t, `+600s`, queries[0].Params["aggregation.alignmentPeriod"][0])
|
||||
@ -361,14 +365,14 @@ func TestCloudMonitoring(t *testing.T) {
|
||||
})
|
||||
|
||||
t.Run("and query has aggregation mean set", func(t *testing.T) {
|
||||
tsdbQuery := getBaseQuery()
|
||||
tsdbQuery.Queries[0].Model = simplejson.NewFromAny(map[string]interface{}{
|
||||
req := baseReq()
|
||||
req.Queries[0].JSON = json.RawMessage(`{
|
||||
"metricType": "a/metric/type",
|
||||
"crossSeriesReducer": "REDUCE_SUM",
|
||||
"view": "FULL",
|
||||
})
|
||||
"view": "FULL"
|
||||
}`)
|
||||
|
||||
qes, err := executor.buildQueryExecutors(tsdbQuery)
|
||||
qes, err := service.buildQueryExecutors(req)
|
||||
require.NoError(t, err)
|
||||
queries := getCloudMonitoringQueriesFromInterface(t, qes)
|
||||
|
||||
@ -404,15 +408,15 @@ func TestCloudMonitoring(t *testing.T) {
|
||||
})
|
||||
|
||||
t.Run("and query has group bys", func(t *testing.T) {
|
||||
tsdbQuery := getBaseQuery()
|
||||
tsdbQuery.Queries[0].Model = simplejson.NewFromAny(map[string]interface{}{
|
||||
req := baseReq()
|
||||
req.Queries[0].JSON = json.RawMessage(`{
|
||||
"metricType": "a/metric/type",
|
||||
"crossSeriesReducer": "REDUCE_NONE",
|
||||
"groupBys": []interface{}{"metric.label.group1", "metric.label.group2"},
|
||||
"view": "FULL",
|
||||
})
|
||||
"groupBys": ["metric.label.group1", "metric.label.group2"],
|
||||
"view": "FULL"
|
||||
}`)
|
||||
|
||||
qes, err := executor.buildQueryExecutors(tsdbQuery)
|
||||
qes, err := service.buildQueryExecutors(req)
|
||||
require.NoError(t, err)
|
||||
queries := getCloudMonitoringQueriesFromInterface(t, qes)
|
||||
|
||||
@ -450,30 +454,29 @@ func TestCloudMonitoring(t *testing.T) {
|
||||
|
||||
t.Run("Parse queries from frontend and build Google Cloud Monitoring API queries", func(t *testing.T) {
|
||||
fromStart := time.Date(2018, 3, 15, 13, 0, 0, 0, time.UTC).In(time.Local)
|
||||
tsdbQuery := plugins.DataQuery{
|
||||
TimeRange: &plugins.DataTimeRange{
|
||||
From: fmt.Sprintf("%v", fromStart.Unix()*1000),
|
||||
To: fmt.Sprintf("%v", fromStart.Add(34*time.Minute).Unix()*1000),
|
||||
},
|
||||
Queries: []plugins.DataSubQuery{
|
||||
req := &backend.QueryDataRequest{
|
||||
Queries: []backend.DataQuery{
|
||||
{
|
||||
Model: simplejson.NewFromAny(map[string]interface{}{
|
||||
"queryType": metricQueryType,
|
||||
"metricQuery": map[string]interface{}{
|
||||
"metricType": "a/metric/type",
|
||||
"view": "FULL",
|
||||
"aliasBy": "testalias",
|
||||
"type": "timeSeriesQuery",
|
||||
"groupBys": []interface{}{"metric.label.group1", "metric.label.group2"},
|
||||
},
|
||||
}),
|
||||
RefID: "A",
|
||||
TimeRange: backend.TimeRange{
|
||||
From: fromStart,
|
||||
To: fromStart.Add(34 * time.Minute),
|
||||
},
|
||||
JSON: json.RawMessage(`{
|
||||
"queryType": "metrics",
|
||||
"metricQuery": {
|
||||
"metricType": "a/metric/type",
|
||||
"view": "FULL",
|
||||
"aliasBy": "testalias",
|
||||
"type": "timeSeriesQuery",
|
||||
"groupBys": ["metric.label.group1", "metric.label.group2"]
|
||||
}
|
||||
}`),
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
t.Run("and query type is metrics", func(t *testing.T) {
|
||||
qes, err := executor.buildQueryExecutors(tsdbQuery)
|
||||
qes, err := service.buildQueryExecutors(req)
|
||||
require.NoError(t, err)
|
||||
queries := getCloudMonitoringQueriesFromInterface(t, qes)
|
||||
|
||||
@ -509,18 +512,18 @@ func TestCloudMonitoring(t *testing.T) {
|
||||
}
|
||||
verifyDeepLink(t, dl, expectedTimeSelection, expectedTimeSeriesFilter)
|
||||
|
||||
tsdbQuery.Queries[0].Model = simplejson.NewFromAny(map[string]interface{}{
|
||||
"queryType": metricQueryType,
|
||||
"metricQuery": map[string]interface{}{
|
||||
"editorMode": mqlEditorMode,
|
||||
req.Queries[0].JSON = json.RawMessage(`{
|
||||
"queryType": "metrics",
|
||||
"metricQuery": {
|
||||
"editorMode": "mql",
|
||||
"projectName": "test-proj",
|
||||
"query": "test-query",
|
||||
"aliasBy": "test-alias",
|
||||
"aliasBy": "test-alias"
|
||||
},
|
||||
"sloQuery": map[string]interface{}{},
|
||||
})
|
||||
"sloQuery": {}
|
||||
}`)
|
||||
|
||||
qes, err = executor.buildQueryExecutors(tsdbQuery)
|
||||
qes, err = service.buildQueryExecutors(req)
|
||||
require.NoError(t, err)
|
||||
tqueries := make([]*cloudMonitoringTimeSeriesQuery, 0)
|
||||
for _, qi := range qes {
|
||||
@ -537,21 +540,21 @@ func TestCloudMonitoring(t *testing.T) {
|
||||
})
|
||||
|
||||
t.Run("and query type is SLOs", func(t *testing.T) {
|
||||
tsdbQuery.Queries[0].Model = simplejson.NewFromAny(map[string]interface{}{
|
||||
"queryType": sloQueryType,
|
||||
"metricQuery": map[string]interface{}{},
|
||||
"sloQuery": map[string]interface{}{
|
||||
req.Queries[0].JSON = json.RawMessage(`{
|
||||
"queryType": "slo",
|
||||
"sloQuery": {
|
||||
"projectName": "test-proj",
|
||||
"alignmentPeriod": "stackdriver-auto",
|
||||
"perSeriesAligner": "ALIGN_NEXT_OLDER",
|
||||
"aliasBy": "",
|
||||
"selectorName": "select_slo_health",
|
||||
"serviceId": "test-service",
|
||||
"sloId": "test-slo",
|
||||
"sloId": "test-slo"
|
||||
},
|
||||
})
|
||||
"metricQuery": {}
|
||||
}`)
|
||||
|
||||
qes, err := executor.buildQueryExecutors(tsdbQuery)
|
||||
qes, err := service.buildQueryExecutors(req)
|
||||
require.NoError(t, err)
|
||||
queries := getCloudMonitoringQueriesFromInterface(t, qes)
|
||||
|
||||
@ -565,21 +568,21 @@ func TestCloudMonitoring(t *testing.T) {
|
||||
assert.Equal(t, `aggregation.alignmentPeriod=%2B60s&aggregation.perSeriesAligner=ALIGN_MEAN&filter=select_slo_health%28%22projects%2Ftest-proj%2Fservices%2Ftest-service%2FserviceLevelObjectives%2Ftest-slo%22%29&interval.endTime=2018-03-15T13%3A34%3A00Z&interval.startTime=2018-03-15T13%3A00%3A00Z`, queries[0].Target)
|
||||
assert.Equal(t, 5, len(queries[0].Params))
|
||||
|
||||
tsdbQuery.Queries[0].Model = simplejson.NewFromAny(map[string]interface{}{
|
||||
"queryType": sloQueryType,
|
||||
"metricQuery": map[string]interface{}{},
|
||||
"sloQuery": map[string]interface{}{
|
||||
req.Queries[0].JSON = json.RawMessage(`{
|
||||
"queryType": "slo",
|
||||
"sloQuery": {
|
||||
"projectName": "test-proj",
|
||||
"alignmentPeriod": "stackdriver-auto",
|
||||
"perSeriesAligner": "ALIGN_NEXT_OLDER",
|
||||
"aliasBy": "",
|
||||
"selectorName": "select_slo_compliance",
|
||||
"serviceId": "test-service",
|
||||
"sloId": "test-slo",
|
||||
"sloId": "test-slo"
|
||||
},
|
||||
})
|
||||
"metricQuery": {}
|
||||
}`)
|
||||
|
||||
qes, err = executor.buildQueryExecutors(tsdbQuery)
|
||||
qes, err = service.buildQueryExecutors(req)
|
||||
require.NoError(t, err)
|
||||
qqueries := getCloudMonitoringQueriesFromInterface(t, qes)
|
||||
assert.Equal(t, "ALIGN_NEXT_OLDER", qqueries[0].Params["aggregation.perSeriesAligner"][0])
|
||||
@ -595,13 +598,11 @@ func TestCloudMonitoring(t *testing.T) {
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, 1, len(data.TimeSeries))
|
||||
|
||||
//nolint: staticcheck // plugins.DataPlugin deprecated
|
||||
res := &plugins.DataQueryResult{Meta: simplejson.New(), RefID: "A"}
|
||||
res := &backend.DataResponse{}
|
||||
query := &cloudMonitoringTimeSeriesFilter{Params: url.Values{}}
|
||||
err = query.parseResponse(res, data, "")
|
||||
require.NoError(t, err)
|
||||
frames, err := res.Dataframes.Decoded()
|
||||
require.NoError(t, err)
|
||||
frames := res.Frames
|
||||
require.Len(t, frames, 1)
|
||||
assert.Equal(t, "serviceruntime.googleapis.com/api/request_count", frames[0].Fields[1].Name)
|
||||
assert.Equal(t, 3, frames[0].Fields[1].Len())
|
||||
@ -620,52 +621,53 @@ func TestCloudMonitoring(t *testing.T) {
|
||||
data, err := loadTestFile("./test-data/2-series-response-no-agg.json")
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, 3, len(data.TimeSeries))
|
||||
//nolint: staticcheck // plugins.DataPlugin deprecated
|
||||
res := &plugins.DataQueryResult{Meta: simplejson.New(), RefID: "A"}
|
||||
res := &backend.DataResponse{}
|
||||
query := &cloudMonitoringTimeSeriesFilter{Params: url.Values{}}
|
||||
err = query.parseResponse(res, data, "")
|
||||
require.NoError(t, err)
|
||||
frames, err := res.Dataframes.Decoded()
|
||||
require.NoError(t, err)
|
||||
|
||||
assert.Equal(t, 3, len(frames))
|
||||
assert.Equal(t, "compute.googleapis.com/instance/cpu/usage_time collector-asia-east-1", frames[0].Fields[1].Name)
|
||||
assert.Equal(t, "compute.googleapis.com/instance/cpu/usage_time collector-europe-west-1", frames[1].Fields[1].Name)
|
||||
assert.Equal(t, "compute.googleapis.com/instance/cpu/usage_time collector-us-east-1", frames[2].Fields[1].Name)
|
||||
field := res.Frames[0].Fields[1]
|
||||
assert.Equal(t, 3, field.Len())
|
||||
assert.Equal(t, 9.8566497180145, field.At(0))
|
||||
assert.Equal(t, 9.7323568146676, field.At(1))
|
||||
assert.Equal(t, 9.7730520330369, field.At(2))
|
||||
assert.Equal(t, "compute.googleapis.com/instance/cpu/usage_time collector-asia-east-1", field.Name)
|
||||
assert.Equal(t, "collector-asia-east-1", field.Labels["metric.label.instance_name"])
|
||||
assert.Equal(t, "asia-east1-a", field.Labels["resource.label.zone"])
|
||||
assert.Equal(t, "grafana-prod", field.Labels["resource.label.project_id"])
|
||||
|
||||
assert.Equal(t, 3, frames[0].Fields[1].Len())
|
||||
assert.Equal(t, 9.8566497180145, frames[0].Fields[1].At(0))
|
||||
assert.Equal(t, 9.7323568146676, frames[0].Fields[1].At(1))
|
||||
assert.Equal(t, 9.7730520330369, frames[0].Fields[1].At(2))
|
||||
field = res.Frames[1].Fields[1]
|
||||
assert.Equal(t, 3, field.Len())
|
||||
assert.Equal(t, 9.0238475054502, field.At(0))
|
||||
assert.Equal(t, 8.9689492364414, field.At(1))
|
||||
assert.Equal(t, 8.8210971239023, field.At(2))
|
||||
assert.Equal(t, "compute.googleapis.com/instance/cpu/usage_time collector-europe-west-1", field.Name)
|
||||
assert.Equal(t, "collector-europe-west-1", field.Labels["metric.label.instance_name"])
|
||||
assert.Equal(t, "europe-west1-b", field.Labels["resource.label.zone"])
|
||||
assert.Equal(t, "grafana-prod", field.Labels["resource.label.project_id"])
|
||||
|
||||
labels := res.Meta.Get("labels").Interface().(map[string][]string)
|
||||
require.NotNil(t, labels)
|
||||
assert.Equal(t, 3, len(labels["metric.label.instance_name"]))
|
||||
assert.Contains(t, labels["metric.label.instance_name"], "collector-asia-east-1")
|
||||
assert.Contains(t, labels["metric.label.instance_name"], "collector-europe-west-1")
|
||||
assert.Contains(t, labels["metric.label.instance_name"], "collector-us-east-1")
|
||||
|
||||
assert.Equal(t, 3, len(labels["resource.label.zone"]))
|
||||
assert.Contains(t, labels["resource.label.zone"], "asia-east1-a")
|
||||
assert.Contains(t, labels["resource.label.zone"], "europe-west1-b")
|
||||
assert.Contains(t, labels["resource.label.zone"], "us-east1-b")
|
||||
|
||||
assert.Equal(t, 1, len(labels["resource.label.project_id"]))
|
||||
assert.Equal(t, "grafana-prod", labels["resource.label.project_id"][0])
|
||||
field = res.Frames[2].Fields[1]
|
||||
assert.Equal(t, 3, field.Len())
|
||||
assert.Equal(t, 30.829426143318, field.At(0))
|
||||
assert.Equal(t, 30.903974115849, field.At(1))
|
||||
assert.Equal(t, 30.807846801355, field.At(2))
|
||||
assert.Equal(t, "compute.googleapis.com/instance/cpu/usage_time collector-us-east-1", field.Name)
|
||||
assert.Equal(t, "collector-us-east-1", field.Labels["metric.label.instance_name"])
|
||||
assert.Equal(t, "us-east1-b", field.Labels["resource.label.zone"])
|
||||
assert.Equal(t, "grafana-prod", field.Labels["resource.label.project_id"])
|
||||
})
|
||||
|
||||
t.Run("when data from query with no aggregation and group bys", func(t *testing.T) {
|
||||
data, err := loadTestFile("./test-data/2-series-response-no-agg.json")
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, 3, len(data.TimeSeries))
|
||||
//nolint: staticcheck // plugins.DataPlugin deprecated
|
||||
res := &plugins.DataQueryResult{Meta: simplejson.New(), RefID: "A"}
|
||||
res := &backend.DataResponse{}
|
||||
query := &cloudMonitoringTimeSeriesFilter{Params: url.Values{}, GroupBys: []string{
|
||||
"metric.label.instance_name", "resource.label.zone",
|
||||
}}
|
||||
err = query.parseResponse(res, data, "")
|
||||
require.NoError(t, err)
|
||||
frames, err := res.Dataframes.Decoded()
|
||||
frames := res.Frames
|
||||
require.NoError(t, err)
|
||||
|
||||
assert.Equal(t, 3, len(frames))
|
||||
@ -678,14 +680,13 @@ func TestCloudMonitoring(t *testing.T) {
|
||||
data, err := loadTestFile("./test-data/2-series-response-no-agg.json")
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, 3, len(data.TimeSeries))
|
||||
//nolint: staticcheck // plugins.DataPlugin deprecated
|
||||
res := &plugins.DataQueryResult{Meta: simplejson.New(), RefID: "A"}
|
||||
res := &backend.DataResponse{}
|
||||
|
||||
t.Run("and the alias pattern is for metric type, a metric label and a resource label", func(t *testing.T) {
|
||||
query := &cloudMonitoringTimeSeriesFilter{Params: url.Values{}, AliasBy: "{{metric.type}} - {{metric.label.instance_name}} - {{resource.label.zone}}", GroupBys: []string{"metric.label.instance_name", "resource.label.zone"}}
|
||||
err = query.parseResponse(res, data, "")
|
||||
require.NoError(t, err)
|
||||
frames, err := res.Dataframes.Decoded()
|
||||
frames := res.Frames
|
||||
require.NoError(t, err)
|
||||
|
||||
assert.Equal(t, 3, len(frames))
|
||||
@ -698,7 +699,7 @@ func TestCloudMonitoring(t *testing.T) {
|
||||
query := &cloudMonitoringTimeSeriesFilter{Params: url.Values{}, AliasBy: "metric {{metric.name}} service {{metric.service}}", GroupBys: []string{"metric.label.instance_name", "resource.label.zone"}}
|
||||
err = query.parseResponse(res, data, "")
|
||||
require.NoError(t, err)
|
||||
frames, err := res.Dataframes.Decoded()
|
||||
frames := res.Frames
|
||||
require.NoError(t, err)
|
||||
|
||||
assert.Equal(t, 3, len(frames))
|
||||
@ -712,12 +713,11 @@ func TestCloudMonitoring(t *testing.T) {
|
||||
data, err := loadTestFile("./test-data/3-series-response-distribution-exponential.json")
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, 1, len(data.TimeSeries))
|
||||
//nolint: staticcheck // plugins.DataPlugin deprecated
|
||||
res := &plugins.DataQueryResult{Meta: simplejson.New(), RefID: "A"}
|
||||
res := &backend.DataResponse{}
|
||||
query := &cloudMonitoringTimeSeriesFilter{Params: url.Values{}, AliasBy: "{{bucket}}"}
|
||||
err = query.parseResponse(res, data, "")
|
||||
require.NoError(t, err)
|
||||
frames, err := res.Dataframes.Decoded()
|
||||
frames := res.Frames
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, 11, len(frames))
|
||||
for i := 0; i < 11; i++ {
|
||||
@ -754,12 +754,11 @@ func TestCloudMonitoring(t *testing.T) {
|
||||
data, err := loadTestFile("./test-data/4-series-response-distribution-explicit.json")
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, 1, len(data.TimeSeries))
|
||||
//nolint: staticcheck // plugins.DataPlugin deprecated
|
||||
res := &plugins.DataQueryResult{Meta: simplejson.New(), RefID: "A"}
|
||||
res := &backend.DataResponse{}
|
||||
query := &cloudMonitoringTimeSeriesFilter{Params: url.Values{}, AliasBy: "{{bucket}}"}
|
||||
err = query.parseResponse(res, data, "")
|
||||
require.NoError(t, err)
|
||||
frames, err := res.Dataframes.Decoded()
|
||||
frames := res.Frames
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, 33, len(frames))
|
||||
for i := 0; i < 33; i++ {
|
||||
@ -789,34 +788,34 @@ func TestCloudMonitoring(t *testing.T) {
|
||||
data, err := loadTestFile("./test-data/5-series-response-meta-data.json")
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, 3, len(data.TimeSeries))
|
||||
//nolint: staticcheck // plugins.DataPlugin deprecated
|
||||
res := &plugins.DataQueryResult{Meta: simplejson.New(), RefID: "A"}
|
||||
res := &backend.DataResponse{}
|
||||
query := &cloudMonitoringTimeSeriesFilter{Params: url.Values{}, AliasBy: "{{bucket}}"}
|
||||
err = query.parseResponse(res, data, "")
|
||||
require.NoError(t, err)
|
||||
labels := res.Meta.Get("labels").Interface().(map[string][]string)
|
||||
frames, err := res.Dataframes.Decoded()
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, 3, len(frames))
|
||||
assert.Equal(t, 3, len(res.Frames))
|
||||
|
||||
assert.Equal(t, 5, len(labels["metadata.system_labels.test"]))
|
||||
assert.Contains(t, labels["metadata.system_labels.test"], "value1")
|
||||
assert.Contains(t, labels["metadata.system_labels.test"], "value2")
|
||||
assert.Contains(t, labels["metadata.system_labels.test"], "value3")
|
||||
assert.Contains(t, labels["metadata.system_labels.test"], "value4")
|
||||
assert.Contains(t, labels["metadata.system_labels.test"], "value5")
|
||||
field := res.Frames[0].Fields[1]
|
||||
assert.Equal(t, "diana-debian9", field.Labels["metadata.system_labels.name"])
|
||||
assert.Equal(t, "value1, value2", field.Labels["metadata.system_labels.test"])
|
||||
assert.Equal(t, "us-west1", field.Labels["metadata.system_labels.region"])
|
||||
assert.Equal(t, "false", field.Labels["metadata.system_labels.spot_instance"])
|
||||
assert.Equal(t, "name1", field.Labels["metadata.user_labels.name"])
|
||||
assert.Equal(t, "region1", field.Labels["metadata.user_labels.region"])
|
||||
|
||||
assert.Equal(t, 2, len(labels["metadata.system_labels.region"]))
|
||||
assert.Contains(t, labels["metadata.system_labels.region"], "us-central1")
|
||||
assert.Contains(t, labels["metadata.system_labels.region"], "us-west1")
|
||||
field = res.Frames[1].Fields[1]
|
||||
assert.Equal(t, "diana-ubuntu1910", field.Labels["metadata.system_labels.name"])
|
||||
assert.Equal(t, "value1, value2, value3", field.Labels["metadata.system_labels.test"])
|
||||
assert.Equal(t, "us-west1", field.Labels["metadata.system_labels.region"])
|
||||
assert.Equal(t, "false", field.Labels["metadata.system_labels.spot_instance"])
|
||||
|
||||
assert.Equal(t, 2, len(labels["metadata.user_labels.region"]))
|
||||
assert.Contains(t, labels["metadata.user_labels.region"], "region1")
|
||||
assert.Contains(t, labels["metadata.user_labels.region"], "region3")
|
||||
|
||||
assert.Equal(t, 2, len(labels["metadata.user_labels.name"]))
|
||||
assert.Contains(t, labels["metadata.user_labels.name"], "name1")
|
||||
assert.Contains(t, labels["metadata.user_labels.name"], "name3")
|
||||
field = res.Frames[2].Fields[1]
|
||||
assert.Equal(t, "premium-plugin-staging", field.Labels["metadata.system_labels.name"])
|
||||
assert.Equal(t, "value1, value2, value4, value5", field.Labels["metadata.system_labels.test"])
|
||||
assert.Equal(t, "us-central1", field.Labels["metadata.system_labels.region"])
|
||||
assert.Equal(t, "true", field.Labels["metadata.system_labels.spot_instance"])
|
||||
assert.Equal(t, "name3", field.Labels["metadata.user_labels.name"])
|
||||
assert.Equal(t, "region3", field.Labels["metadata.user_labels.region"])
|
||||
})
|
||||
|
||||
t.Run("when data from query returns metadata system labels and alias by is defined", func(t *testing.T) {
|
||||
@ -825,12 +824,11 @@ func TestCloudMonitoring(t *testing.T) {
|
||||
assert.Equal(t, 3, len(data.TimeSeries))
|
||||
|
||||
t.Run("and systemlabel contains key with array of string", func(t *testing.T) {
|
||||
//nolint: staticcheck // plugins.DataPlugin deprecated
|
||||
res := &plugins.DataQueryResult{Meta: simplejson.New(), RefID: "A"}
|
||||
res := &backend.DataResponse{}
|
||||
query := &cloudMonitoringTimeSeriesFilter{Params: url.Values{}, AliasBy: "{{metadata.system_labels.test}}"}
|
||||
err = query.parseResponse(res, data, "")
|
||||
require.NoError(t, err)
|
||||
frames, err := res.Dataframes.Decoded()
|
||||
frames := res.Frames
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, 3, len(frames))
|
||||
fmt.Println(frames[0].Fields[1].Name)
|
||||
@ -840,12 +838,11 @@ func TestCloudMonitoring(t *testing.T) {
|
||||
})
|
||||
|
||||
t.Run("and systemlabel contains key with array of string2", func(t *testing.T) {
|
||||
//nolint: staticcheck // plugins.DataPlugin deprecated
|
||||
res := &plugins.DataQueryResult{Meta: simplejson.New(), RefID: "A"}
|
||||
res := &backend.DataResponse{}
|
||||
query := &cloudMonitoringTimeSeriesFilter{Params: url.Values{}, AliasBy: "{{metadata.system_labels.test2}}"}
|
||||
err = query.parseResponse(res, data, "")
|
||||
require.NoError(t, err)
|
||||
frames, err := res.Dataframes.Decoded()
|
||||
frames := res.Frames
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, 3, len(frames))
|
||||
assert.Equal(t, "testvalue", frames[2].Fields[1].Name)
|
||||
@ -858,8 +855,7 @@ func TestCloudMonitoring(t *testing.T) {
|
||||
assert.Equal(t, 1, len(data.TimeSeries))
|
||||
|
||||
t.Run("and alias by is expanded", func(t *testing.T) {
|
||||
//nolint: staticcheck // plugins.DataPlugin deprecated
|
||||
res := &plugins.DataQueryResult{Meta: simplejson.New(), RefID: "A"}
|
||||
res := &backend.DataResponse{}
|
||||
query := &cloudMonitoringTimeSeriesFilter{
|
||||
Params: url.Values{},
|
||||
ProjectName: "test-proj",
|
||||
@ -870,7 +866,7 @@ func TestCloudMonitoring(t *testing.T) {
|
||||
}
|
||||
err = query.parseResponse(res, data, "")
|
||||
require.NoError(t, err)
|
||||
frames, err := res.Dataframes.Decoded()
|
||||
frames := res.Frames
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, "test-proj - test-service - test-slo - select_slo_compliance", frames[0].Fields[1].Name)
|
||||
})
|
||||
@ -882,8 +878,7 @@ func TestCloudMonitoring(t *testing.T) {
|
||||
assert.Equal(t, 1, len(data.TimeSeries))
|
||||
|
||||
t.Run("and alias by is expanded", func(t *testing.T) {
|
||||
//nolint: staticcheck // plugins.DataPlugin deprecated
|
||||
res := &plugins.DataQueryResult{Meta: simplejson.New(), RefID: "A"}
|
||||
res := &backend.DataResponse{}
|
||||
query := &cloudMonitoringTimeSeriesFilter{
|
||||
Params: url.Values{},
|
||||
ProjectName: "test-proj",
|
||||
@ -893,7 +888,7 @@ func TestCloudMonitoring(t *testing.T) {
|
||||
}
|
||||
err = query.parseResponse(res, data, "")
|
||||
require.NoError(t, err)
|
||||
frames, err := res.Dataframes.Decoded()
|
||||
frames := res.Frames
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, "select_slo_compliance(\"projects/test-proj/services/test-service/serviceLevelObjectives/test-slo\")", frames[0].Fields[1].Name)
|
||||
})
|
||||
@ -904,12 +899,11 @@ func TestCloudMonitoring(t *testing.T) {
|
||||
data, err := loadTestFile("./test-data/1-series-response-agg-one-metric.json")
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, 1, len(data.TimeSeries))
|
||||
//nolint: staticcheck // plugins.DataPlugin deprecated
|
||||
res := &plugins.DataQueryResult{Meta: simplejson.New(), RefID: "A"}
|
||||
res := &backend.DataResponse{}
|
||||
query := &cloudMonitoringTimeSeriesFilter{Params: url.Values{}}
|
||||
err = query.parseResponse(res, data, "")
|
||||
require.NoError(t, err)
|
||||
frames, err := res.Dataframes.Decoded()
|
||||
frames := res.Frames
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, "Bps", frames[0].Fields[1].Config.Unit)
|
||||
})
|
||||
@ -918,12 +912,11 @@ func TestCloudMonitoring(t *testing.T) {
|
||||
data, err := loadTestFile("./test-data/2-series-response-no-agg.json")
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, 3, len(data.TimeSeries))
|
||||
//nolint: staticcheck // plugins.DataPlugin deprecated
|
||||
res := &plugins.DataQueryResult{Meta: simplejson.New(), RefID: "A"}
|
||||
res := &backend.DataResponse{}
|
||||
query := &cloudMonitoringTimeSeriesFilter{Params: url.Values{}}
|
||||
err = query.parseResponse(res, data, "")
|
||||
require.NoError(t, err)
|
||||
frames, err := res.Dataframes.Decoded()
|
||||
frames := res.Frames
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, "", frames[0].Fields[1].Config.Unit)
|
||||
})
|
||||
@ -937,21 +930,20 @@ func TestCloudMonitoring(t *testing.T) {
|
||||
|
||||
t.Run("and alias by is expanded", func(t *testing.T) {
|
||||
fromStart := time.Date(2018, 3, 15, 13, 0, 0, 0, time.UTC).In(time.Local)
|
||||
//nolint: staticcheck // plugins.DataPlugin deprecated
|
||||
res := &plugins.DataQueryResult{Meta: simplejson.New(), RefID: "A"}
|
||||
|
||||
res := &backend.DataResponse{}
|
||||
query := &cloudMonitoringTimeSeriesQuery{
|
||||
ProjectName: "test-proj",
|
||||
Query: "test-query",
|
||||
AliasBy: "{{project}} - {{resource.label.zone}} - {{resource.label.instance_id}} - {{metric.label.response_code_class}}",
|
||||
timeRange: plugins.DataTimeRange{
|
||||
From: fmt.Sprintf("%v", fromStart.Unix()*1000),
|
||||
To: fmt.Sprintf("%v", fromStart.Add(34*time.Minute).Unix()*1000),
|
||||
timeRange: backend.TimeRange{
|
||||
From: fromStart,
|
||||
To: fromStart.Add(34 * time.Minute),
|
||||
},
|
||||
}
|
||||
err = query.parseResponse(res, data, "")
|
||||
require.NoError(t, err)
|
||||
frames, err := res.Dataframes.Decoded()
|
||||
require.NoError(t, err)
|
||||
frames := res.Frames
|
||||
assert.Equal(t, "test-proj - asia-northeast1-c - 6724404429462225363 - 200", frames[0].Fields[1].Name)
|
||||
})
|
||||
})
|
||||
@ -1045,17 +1037,17 @@ func TestCloudMonitoring(t *testing.T) {
|
||||
})
|
||||
|
||||
t.Run("and query preprocessor is not defined", func(t *testing.T) {
|
||||
tsdbQuery := getBaseQuery()
|
||||
tsdbQuery.Queries[0].Model = simplejson.NewFromAny(map[string]interface{}{
|
||||
req := baseReq()
|
||||
req.Queries[0].JSON = json.RawMessage(`{
|
||||
"metricType": "a/metric/type",
|
||||
"crossSeriesReducer": "REDUCE_MIN",
|
||||
"perSeriesAligner": "REDUCE_SUM",
|
||||
"alignmentPeriod": "+60s",
|
||||
"groupBys": []string{"labelname"},
|
||||
"view": "FULL",
|
||||
})
|
||||
"groupBys": ["labelname"],
|
||||
"view": "FULL"
|
||||
}`)
|
||||
|
||||
qes, err := executor.buildQueryExecutors(tsdbQuery)
|
||||
qes, err := service.buildQueryExecutors(req)
|
||||
require.NoError(t, err)
|
||||
queries := getCloudMonitoringQueriesFromInterface(t, qes)
|
||||
|
||||
@ -1072,18 +1064,18 @@ func TestCloudMonitoring(t *testing.T) {
|
||||
})
|
||||
|
||||
t.Run("and query preprocessor is set to none", func(t *testing.T) {
|
||||
tsdbQuery := getBaseQuery()
|
||||
tsdbQuery.Queries[0].Model = simplejson.NewFromAny(map[string]interface{}{
|
||||
req := baseReq()
|
||||
req.Queries[0].JSON = json.RawMessage(`{
|
||||
"metricType": "a/metric/type",
|
||||
"crossSeriesReducer": "REDUCE_MIN",
|
||||
"perSeriesAligner": "REDUCE_SUM",
|
||||
"alignmentPeriod": "+60s",
|
||||
"groupBys": []string{"labelname"},
|
||||
"groupBys": ["labelname"],
|
||||
"view": "FULL",
|
||||
"preprocessor": "none",
|
||||
})
|
||||
"preprocessor": "none"
|
||||
}`)
|
||||
|
||||
qes, err := executor.buildQueryExecutors(tsdbQuery)
|
||||
qes, err := service.buildQueryExecutors(req)
|
||||
require.NoError(t, err)
|
||||
queries := getCloudMonitoringQueriesFromInterface(t, qes)
|
||||
|
||||
@ -1100,18 +1092,18 @@ func TestCloudMonitoring(t *testing.T) {
|
||||
})
|
||||
|
||||
t.Run("and query preprocessor is set to rate and there's no group bys", func(t *testing.T) {
|
||||
tsdbQuery := getBaseQuery()
|
||||
tsdbQuery.Queries[0].Model = simplejson.NewFromAny(map[string]interface{}{
|
||||
req := baseReq()
|
||||
req.Queries[0].JSON = json.RawMessage(`{
|
||||
"metricType": "a/metric/type",
|
||||
"crossSeriesReducer": "REDUCE_SUM",
|
||||
"perSeriesAligner": "REDUCE_MIN",
|
||||
"alignmentPeriod": "+60s",
|
||||
"groupBys": []string{},
|
||||
"groupBys": [],
|
||||
"view": "FULL",
|
||||
"preprocessor": "rate",
|
||||
})
|
||||
"preprocessor": "rate"
|
||||
}`)
|
||||
|
||||
qes, err := executor.buildQueryExecutors(tsdbQuery)
|
||||
qes, err := service.buildQueryExecutors(req)
|
||||
require.NoError(t, err)
|
||||
queries := getCloudMonitoringQueriesFromInterface(t, qes)
|
||||
|
||||
@ -1126,18 +1118,18 @@ func TestCloudMonitoring(t *testing.T) {
|
||||
})
|
||||
|
||||
t.Run("and query preprocessor is set to rate and group bys exist", func(t *testing.T) {
|
||||
tsdbQuery := getBaseQuery()
|
||||
tsdbQuery.Queries[0].Model = simplejson.NewFromAny(map[string]interface{}{
|
||||
req := baseReq()
|
||||
req.Queries[0].JSON = json.RawMessage(`{
|
||||
"metricType": "a/metric/type",
|
||||
"crossSeriesReducer": "REDUCE_SUM",
|
||||
"perSeriesAligner": "REDUCE_MIN",
|
||||
"alignmentPeriod": "+60s",
|
||||
"groupBys": []string{"labelname"},
|
||||
"groupBys": ["labelname"],
|
||||
"view": "FULL",
|
||||
"preprocessor": "rate",
|
||||
})
|
||||
"preprocessor": "rate"
|
||||
}`)
|
||||
|
||||
qes, err := executor.buildQueryExecutors(tsdbQuery)
|
||||
qes, err := service.buildQueryExecutors(req)
|
||||
require.NoError(t, err)
|
||||
queries := getCloudMonitoringQueriesFromInterface(t, qes)
|
||||
|
||||
@ -1154,18 +1146,18 @@ func TestCloudMonitoring(t *testing.T) {
|
||||
})
|
||||
|
||||
t.Run("and query preprocessor is set to delta and there's no group bys", func(t *testing.T) {
|
||||
tsdbQuery := getBaseQuery()
|
||||
tsdbQuery.Queries[0].Model = simplejson.NewFromAny(map[string]interface{}{
|
||||
req := baseReq()
|
||||
req.Queries[0].JSON = json.RawMessage(`{
|
||||
"metricType": "a/metric/type",
|
||||
"crossSeriesReducer": "REDUCE_MIN",
|
||||
"perSeriesAligner": "REDUCE_SUM",
|
||||
"alignmentPeriod": "+60s",
|
||||
"groupBys": []string{},
|
||||
"groupBys": [],
|
||||
"view": "FULL",
|
||||
"preprocessor": "delta",
|
||||
})
|
||||
"preprocessor": "delta"
|
||||
}`)
|
||||
|
||||
qes, err := executor.buildQueryExecutors(tsdbQuery)
|
||||
qes, err := service.buildQueryExecutors(req)
|
||||
require.NoError(t, err)
|
||||
queries := getCloudMonitoringQueriesFromInterface(t, qes)
|
||||
|
||||
@ -1180,18 +1172,18 @@ func TestCloudMonitoring(t *testing.T) {
|
||||
})
|
||||
|
||||
t.Run("and query preprocessor is set to delta and group bys exist", func(t *testing.T) {
|
||||
tsdbQuery := getBaseQuery()
|
||||
tsdbQuery.Queries[0].Model = simplejson.NewFromAny(map[string]interface{}{
|
||||
req := baseReq()
|
||||
req.Queries[0].JSON = json.RawMessage(`{
|
||||
"metricType": "a/metric/type",
|
||||
"crossSeriesReducer": "REDUCE_MIN",
|
||||
"perSeriesAligner": "REDUCE_SUM",
|
||||
"alignmentPeriod": "+60s",
|
||||
"groupBys": []string{"labelname"},
|
||||
"groupBys": ["labelname"],
|
||||
"view": "FULL",
|
||||
"preprocessor": "delta",
|
||||
})
|
||||
"preprocessor": "delta"
|
||||
}`)
|
||||
|
||||
qes, err := executor.buildQueryExecutors(tsdbQuery)
|
||||
qes, err := service.buildQueryExecutors(req)
|
||||
require.NoError(t, err)
|
||||
queries := getCloudMonitoringQueriesFromInterface(t, qes)
|
||||
|
||||
@ -1294,22 +1286,22 @@ func verifyDeepLink(t *testing.T, dl string, expectedTimeSelection map[string]st
|
||||
}
|
||||
}
|
||||
|
||||
func getBaseQuery() plugins.DataQuery {
|
||||
func baseReq() *backend.QueryDataRequest {
|
||||
fromStart := time.Date(2018, 3, 15, 13, 0, 0, 0, time.UTC).In(time.Local)
|
||||
query := plugins.DataQuery{
|
||||
TimeRange: &plugins.DataTimeRange{
|
||||
From: fmt.Sprintf("%v", fromStart.Unix()*1000),
|
||||
To: fmt.Sprintf("%v", fromStart.Add(34*time.Minute).Unix()*1000),
|
||||
},
|
||||
Queries: []plugins.DataSubQuery{
|
||||
query := &backend.QueryDataRequest{
|
||||
Queries: []backend.DataQuery{
|
||||
{
|
||||
Model: simplejson.NewFromAny(map[string]interface{}{
|
||||
RefID: "A",
|
||||
TimeRange: backend.TimeRange{
|
||||
From: fromStart,
|
||||
To: fromStart.Add(34 * time.Minute),
|
||||
},
|
||||
JSON: json.RawMessage(`{
|
||||
"metricType": "a/metric/type",
|
||||
"view": "FULL",
|
||||
"aliasBy": "testalias",
|
||||
"type": "timeSeriesQuery",
|
||||
}),
|
||||
RefID: "A",
|
||||
"type": "timeSeriesQuery"
|
||||
}`),
|
||||
},
|
||||
},
|
||||
}
|
||||
|
@ -10,81 +10,93 @@ import (
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/grafana/grafana-plugin-sdk-go/backend"
|
||||
"github.com/grafana/grafana-plugin-sdk-go/data"
|
||||
"github.com/grafana/grafana/pkg/components/simplejson"
|
||||
"github.com/grafana/grafana/pkg/plugins"
|
||||
"github.com/opentracing/opentracing-go"
|
||||
"golang.org/x/net/context/ctxhttp"
|
||||
)
|
||||
|
||||
//nolint: staticcheck // plugins.DataPlugin deprecated
|
||||
func (timeSeriesFilter *cloudMonitoringTimeSeriesFilter) run(ctx context.Context, tsdbQuery plugins.DataQuery,
|
||||
e *Executor) (plugins.DataQueryResult, cloudMonitoringResponse, string, error) {
|
||||
queryResult := plugins.DataQueryResult{Meta: simplejson.New(), RefID: timeSeriesFilter.RefID}
|
||||
func (timeSeriesFilter *cloudMonitoringTimeSeriesFilter) run(ctx context.Context, req *backend.QueryDataRequest,
|
||||
s *Service, dsInfo datasourceInfo) (*backend.DataResponse, cloudMonitoringResponse, string, error) {
|
||||
dr := &backend.DataResponse{}
|
||||
projectName := timeSeriesFilter.ProjectName
|
||||
if projectName == "" {
|
||||
defaultProject, err := e.getDefaultProject(ctx)
|
||||
var err error
|
||||
projectName, err = s.getDefaultProject(ctx, dsInfo)
|
||||
if err != nil {
|
||||
queryResult.Error = err
|
||||
return queryResult, cloudMonitoringResponse{}, "", nil
|
||||
dr.Error = err
|
||||
return dr, cloudMonitoringResponse{}, "", nil
|
||||
}
|
||||
projectName = defaultProject
|
||||
slog.Info("No project name set on query, using project name from datasource", "projectName", projectName)
|
||||
}
|
||||
|
||||
req, err := e.createRequest(ctx, e.dsInfo, path.Join("cloudmonitoringv3/projects", projectName, "timeSeries"), nil)
|
||||
r, err := s.createRequest(ctx, req.PluginContext, &dsInfo, path.Join("cloudmonitoringv3/projects", projectName, "timeSeries"), nil)
|
||||
if err != nil {
|
||||
queryResult.Error = err
|
||||
return queryResult, cloudMonitoringResponse{}, "", nil
|
||||
dr.Error = err
|
||||
return dr, cloudMonitoringResponse{}, "", nil
|
||||
}
|
||||
|
||||
req.URL.RawQuery = timeSeriesFilter.Params.Encode()
|
||||
alignmentPeriod, ok := req.URL.Query()["aggregation.alignmentPeriod"]
|
||||
r.URL.RawQuery = timeSeriesFilter.Params.Encode()
|
||||
alignmentPeriod, ok := r.URL.Query()["aggregation.alignmentPeriod"]
|
||||
|
||||
if ok {
|
||||
seconds, err := strconv.ParseInt(alignmentPeriodRe.FindString(alignmentPeriod[0]), 10, 64)
|
||||
if err == nil {
|
||||
queryResult.Meta.Set("alignmentPeriod", seconds)
|
||||
if len(dr.Frames) == 0 {
|
||||
dr.Frames = append(dr.Frames, data.NewFrame(""))
|
||||
}
|
||||
firstFrame := dr.Frames[0]
|
||||
if firstFrame.Meta == nil {
|
||||
firstFrame.SetMeta(&data.FrameMeta{
|
||||
Custom: map[string]interface{}{
|
||||
"alignmentPeriod": seconds,
|
||||
},
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
span, ctx := opentracing.StartSpanFromContext(ctx, "cloudMonitoring query")
|
||||
span.SetTag("target", timeSeriesFilter.Target)
|
||||
span.SetTag("from", tsdbQuery.TimeRange.From)
|
||||
span.SetTag("until", tsdbQuery.TimeRange.To)
|
||||
span.SetTag("datasource_id", e.dsInfo.Id)
|
||||
span.SetTag("org_id", e.dsInfo.OrgId)
|
||||
span.SetTag("from", req.Queries[0].TimeRange.From)
|
||||
span.SetTag("until", req.Queries[0].TimeRange.To)
|
||||
span.SetTag("datasource_id", dsInfo.id)
|
||||
span.SetTag("org_id", req.PluginContext.OrgID)
|
||||
|
||||
defer span.Finish()
|
||||
|
||||
if err := opentracing.GlobalTracer().Inject(
|
||||
span.Context(),
|
||||
opentracing.HTTPHeaders,
|
||||
opentracing.HTTPHeadersCarrier(req.Header)); err != nil {
|
||||
queryResult.Error = err
|
||||
return queryResult, cloudMonitoringResponse{}, "", nil
|
||||
opentracing.HTTPHeadersCarrier(r.Header)); err != nil {
|
||||
dr.Error = err
|
||||
return dr, cloudMonitoringResponse{}, "", nil
|
||||
}
|
||||
|
||||
res, err := ctxhttp.Do(ctx, e.httpClient, req)
|
||||
r = r.WithContext(ctx)
|
||||
res, err := dsInfo.client.Do(r)
|
||||
if err != nil {
|
||||
queryResult.Error = err
|
||||
return queryResult, cloudMonitoringResponse{}, "", nil
|
||||
dr.Error = err
|
||||
return dr, cloudMonitoringResponse{}, "", nil
|
||||
}
|
||||
|
||||
data, err := unmarshalResponse(res)
|
||||
d, err := unmarshalResponse(res)
|
||||
if err != nil {
|
||||
queryResult.Error = err
|
||||
return queryResult, cloudMonitoringResponse{}, "", nil
|
||||
dr.Error = err
|
||||
return dr, cloudMonitoringResponse{}, "", nil
|
||||
}
|
||||
|
||||
return queryResult, data, req.URL.RawQuery, nil
|
||||
return dr, d, r.URL.RawQuery, nil
|
||||
}
|
||||
|
||||
//nolint: staticcheck // plugins.DataPlugin deprecated
|
||||
func (timeSeriesFilter *cloudMonitoringTimeSeriesFilter) parseResponse(queryRes *plugins.DataQueryResult,
|
||||
//nolint: gocyclo
|
||||
func (timeSeriesFilter *cloudMonitoringTimeSeriesFilter) parseResponse(queryRes *backend.DataResponse,
|
||||
response cloudMonitoringResponse, executedQueryString string) error {
|
||||
labels := make(map[string]map[string]bool)
|
||||
frames := data.Frames{}
|
||||
|
||||
customFrameMeta := map[string]interface{}{}
|
||||
customFrameMeta["alignmentPeriod"] = timeSeriesFilter.Params.Get("aggregation.alignmentPeriod")
|
||||
customFrameMeta["perSeriesAligner"] = timeSeriesFilter.Params.Get("aggregation.perSeriesAligner")
|
||||
for _, series := range response.TimeSeries {
|
||||
seriesLabels := data.Labels{}
|
||||
defaultMetricName := series.Metric.Type
|
||||
@ -95,10 +107,6 @@ func (timeSeriesFilter *cloudMonitoringTimeSeriesFilter) parseResponse(queryRes
|
||||
frame.RefID = timeSeriesFilter.RefID
|
||||
frame.Meta = &data.FrameMeta{
|
||||
ExecutedQueryString: executedQueryString,
|
||||
Custom: map[string]interface{}{
|
||||
"alignmentPeriod": timeSeriesFilter.Params.Get("aggregation.alignmentPeriod"),
|
||||
"perSeriesAligner": timeSeriesFilter.Params.Get("aggregation.perSeriesAligner"),
|
||||
},
|
||||
}
|
||||
|
||||
for key, value := range series.Metric.Labels {
|
||||
@ -155,8 +163,7 @@ func (timeSeriesFilter *cloudMonitoringTimeSeriesFilter) parseResponse(queryRes
|
||||
|
||||
// reverse the order to be ascending
|
||||
if series.ValueType != "DISTRIBUTION" {
|
||||
timeSeriesFilter.handleNonDistributionSeries(
|
||||
series, defaultMetricName, seriesLabels, queryRes, frame)
|
||||
timeSeriesFilter.handleNonDistributionSeries(series, defaultMetricName, seriesLabels, frame)
|
||||
frames = append(frames, frame)
|
||||
continue
|
||||
}
|
||||
@ -214,12 +221,12 @@ func (timeSeriesFilter *cloudMonitoringTimeSeriesFilter) parseResponse(queryRes
|
||||
setDisplayNameAsFieldName(valueField)
|
||||
|
||||
buckets[i] = &data.Frame{
|
||||
Name: frameName,
|
||||
Name: frameName,
|
||||
RefID: timeSeriesFilter.RefID,
|
||||
Fields: []*data.Field{
|
||||
timeField,
|
||||
valueField,
|
||||
},
|
||||
RefID: timeSeriesFilter.RefID,
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -233,24 +240,30 @@ func (timeSeriesFilter *cloudMonitoringTimeSeriesFilter) parseResponse(queryRes
|
||||
frames = addConfigData(frames, dl, response.Unit)
|
||||
}
|
||||
|
||||
queryRes.Dataframes = plugins.NewDecodedDataFrames(frames)
|
||||
|
||||
labelsByKey := make(map[string][]string)
|
||||
for key, values := range labels {
|
||||
for value := range values {
|
||||
labelsByKey[key] = append(labelsByKey[key], value)
|
||||
}
|
||||
}
|
||||
customFrameMeta["labels"] = labelsByKey
|
||||
customFrameMeta["groupBys"] = timeSeriesFilter.GroupBys
|
||||
|
||||
for _, frame := range frames {
|
||||
if frame.Meta != nil {
|
||||
frame.Meta.Custom = customFrameMeta
|
||||
} else {
|
||||
frame.SetMeta(&data.FrameMeta{Custom: customFrameMeta})
|
||||
}
|
||||
}
|
||||
|
||||
queryRes.Frames = frames
|
||||
|
||||
queryRes.Meta.Set("labels", labelsByKey)
|
||||
queryRes.Meta.Set("groupBys", timeSeriesFilter.GroupBys)
|
||||
return nil
|
||||
}
|
||||
|
||||
//nolint: staticcheck // plugins.DataPlugin deprecated
|
||||
func (timeSeriesFilter *cloudMonitoringTimeSeriesFilter) handleNonDistributionSeries(series timeSeries,
|
||||
defaultMetricName string, seriesLabels map[string]string, queryRes *plugins.DataQueryResult,
|
||||
frame *data.Frame) {
|
||||
defaultMetricName string, seriesLabels map[string]string, frame *data.Frame) {
|
||||
for i := 0; i < len(series.Points); i++ {
|
||||
point := series.Points[i]
|
||||
value := point.Value.DoubleValue
|
||||
@ -279,9 +292,8 @@ func (timeSeriesFilter *cloudMonitoringTimeSeriesFilter) handleNonDistributionSe
|
||||
setDisplayNameAsFieldName(dataField)
|
||||
}
|
||||
|
||||
//nolint: staticcheck // plugins.DataPlugin deprecated
|
||||
func (timeSeriesFilter *cloudMonitoringTimeSeriesFilter) parseToAnnotations(queryRes *plugins.DataQueryResult,
|
||||
response cloudMonitoringResponse, title string, text string, tags string) error {
|
||||
func (timeSeriesFilter *cloudMonitoringTimeSeriesFilter) parseToAnnotations(dr *backend.DataResponse,
|
||||
response cloudMonitoringResponse, title, text, tags string) error {
|
||||
frames := data.Frames{}
|
||||
for _, series := range response.TimeSeries {
|
||||
if len(series.Points) == 0 {
|
||||
@ -301,14 +313,14 @@ func (timeSeriesFilter *cloudMonitoringTimeSeriesFilter) parseToAnnotations(quer
|
||||
annotation["text"] = append(annotation["text"], formatAnnotationText(text, value, series.Metric.Type,
|
||||
series.Metric.Labels, series.Resource.Labels))
|
||||
}
|
||||
frames = append(frames, data.NewFrame(queryRes.RefID,
|
||||
frames = append(frames, data.NewFrame(timeSeriesFilter.getRefID(),
|
||||
data.NewField("time", nil, annotation["time"]),
|
||||
data.NewField("title", nil, annotation["title"]),
|
||||
data.NewField("tags", nil, annotation["tags"]),
|
||||
data.NewField("text", nil, annotation["text"]),
|
||||
))
|
||||
}
|
||||
queryRes.Dataframes = plugins.NewDecodedDataFrames(frames)
|
||||
dr.Frames = frames
|
||||
|
||||
return nil
|
||||
}
|
||||
|
@ -11,42 +11,34 @@ import (
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/grafana/grafana-plugin-sdk-go/data"
|
||||
"github.com/grafana/grafana/pkg/components/simplejson"
|
||||
"github.com/grafana/grafana/pkg/plugins"
|
||||
"github.com/grafana/grafana/pkg/tsdb/interval"
|
||||
"github.com/opentracing/opentracing-go"
|
||||
"golang.org/x/net/context/ctxhttp"
|
||||
|
||||
"github.com/grafana/grafana-plugin-sdk-go/backend"
|
||||
"github.com/grafana/grafana-plugin-sdk-go/data"
|
||||
|
||||
"github.com/grafana/grafana/pkg/tsdb/intervalv2"
|
||||
)
|
||||
|
||||
//nolint: staticcheck // plugins.DataPlugin deprecated
|
||||
func (timeSeriesQuery cloudMonitoringTimeSeriesQuery) run(ctx context.Context, tsdbQuery plugins.DataQuery,
|
||||
e *Executor) (plugins.DataQueryResult, cloudMonitoringResponse, string, error) {
|
||||
queryResult := plugins.DataQueryResult{Meta: simplejson.New(), RefID: timeSeriesQuery.RefID}
|
||||
func (timeSeriesQuery cloudMonitoringTimeSeriesQuery) run(ctx context.Context, req *backend.QueryDataRequest,
|
||||
s *Service, dsInfo datasourceInfo) (*backend.DataResponse, cloudMonitoringResponse, string, error) {
|
||||
dr := &backend.DataResponse{}
|
||||
projectName := timeSeriesQuery.ProjectName
|
||||
|
||||
if projectName == "" {
|
||||
defaultProject, err := e.getDefaultProject(ctx)
|
||||
var err error
|
||||
projectName, err = s.getDefaultProject(ctx, dsInfo)
|
||||
if err != nil {
|
||||
queryResult.Error = err
|
||||
return queryResult, cloudMonitoringResponse{}, "", nil
|
||||
dr.Error = err
|
||||
return dr, cloudMonitoringResponse{}, "", nil
|
||||
}
|
||||
projectName = defaultProject
|
||||
slog.Info("No project name set on query, using project name from datasource", "projectName", projectName)
|
||||
}
|
||||
|
||||
from, err := tsdbQuery.TimeRange.ParseFrom()
|
||||
if err != nil {
|
||||
queryResult.Error = err
|
||||
return queryResult, cloudMonitoringResponse{}, "", nil
|
||||
}
|
||||
to, err := tsdbQuery.TimeRange.ParseTo()
|
||||
if err != nil {
|
||||
queryResult.Error = err
|
||||
return queryResult, cloudMonitoringResponse{}, "", nil
|
||||
}
|
||||
intervalCalculator := interval.NewCalculator(interval.CalculatorOptions{})
|
||||
interval := intervalCalculator.Calculate(*tsdbQuery.TimeRange, time.Duration(timeSeriesQuery.IntervalMS/1000)*time.Second)
|
||||
intervalCalculator := intervalv2.NewCalculator(intervalv2.CalculatorOptions{})
|
||||
interval := intervalCalculator.Calculate(req.Queries[0].TimeRange, time.Duration(timeSeriesQuery.IntervalMS/1000)*time.Second, req.Queries[0].MaxDataPoints)
|
||||
|
||||
from := req.Queries[0].TimeRange.From
|
||||
to := req.Queries[0].TimeRange.To
|
||||
timeFormat := "2006/01/02-15:04:05"
|
||||
timeSeriesQuery.Query += fmt.Sprintf(" | graph_period %s | within d'%s', d'%s'", interval.Text, from.UTC().Format(timeFormat), to.UTC().Format(timeFormat))
|
||||
|
||||
@ -54,53 +46,54 @@ func (timeSeriesQuery cloudMonitoringTimeSeriesQuery) run(ctx context.Context, t
|
||||
"query": timeSeriesQuery.Query,
|
||||
})
|
||||
if err != nil {
|
||||
queryResult.Error = err
|
||||
return queryResult, cloudMonitoringResponse{}, "", nil
|
||||
dr.Error = err
|
||||
return dr, cloudMonitoringResponse{}, "", nil
|
||||
}
|
||||
req, err := e.createRequest(ctx, e.dsInfo, path.Join("cloudmonitoringv3/projects", projectName, "timeSeries:query"), bytes.NewBuffer(buf))
|
||||
r, err := s.createRequest(ctx, req.PluginContext, &dsInfo, path.Join("cloudmonitoringv3/projects", projectName, "timeSeries:query"), bytes.NewBuffer(buf))
|
||||
if err != nil {
|
||||
queryResult.Error = err
|
||||
return queryResult, cloudMonitoringResponse{}, "", nil
|
||||
dr.Error = err
|
||||
return dr, cloudMonitoringResponse{}, "", nil
|
||||
}
|
||||
|
||||
span, ctx := opentracing.StartSpanFromContext(ctx, "cloudMonitoring MQL query")
|
||||
span.SetTag("query", timeSeriesQuery.Query)
|
||||
span.SetTag("from", tsdbQuery.TimeRange.From)
|
||||
span.SetTag("until", tsdbQuery.TimeRange.To)
|
||||
span.SetTag("datasource_id", e.dsInfo.Id)
|
||||
span.SetTag("org_id", e.dsInfo.OrgId)
|
||||
span.SetTag("from", req.Queries[0].TimeRange.From)
|
||||
span.SetTag("until", req.Queries[0].TimeRange.To)
|
||||
span.SetTag("datasource_id", dsInfo.id)
|
||||
span.SetTag("org_id", req.PluginContext.OrgID)
|
||||
|
||||
defer span.Finish()
|
||||
|
||||
if err := opentracing.GlobalTracer().Inject(
|
||||
span.Context(),
|
||||
opentracing.HTTPHeaders,
|
||||
opentracing.HTTPHeadersCarrier(req.Header)); err != nil {
|
||||
queryResult.Error = err
|
||||
return queryResult, cloudMonitoringResponse{}, "", nil
|
||||
opentracing.HTTPHeadersCarrier(r.Header)); err != nil {
|
||||
dr.Error = err
|
||||
return dr, cloudMonitoringResponse{}, "", nil
|
||||
}
|
||||
|
||||
res, err := ctxhttp.Do(ctx, e.httpClient, req)
|
||||
r = r.WithContext(ctx)
|
||||
res, err := dsInfo.client.Do(r)
|
||||
if err != nil {
|
||||
queryResult.Error = err
|
||||
return queryResult, cloudMonitoringResponse{}, "", nil
|
||||
dr.Error = err
|
||||
return dr, cloudMonitoringResponse{}, "", nil
|
||||
}
|
||||
|
||||
data, err := unmarshalResponse(res)
|
||||
|
||||
d, err := unmarshalResponse(res)
|
||||
if err != nil {
|
||||
queryResult.Error = err
|
||||
return queryResult, cloudMonitoringResponse{}, "", nil
|
||||
dr.Error = err
|
||||
return dr, cloudMonitoringResponse{}, "", nil
|
||||
}
|
||||
|
||||
return queryResult, data, timeSeriesQuery.Query, nil
|
||||
return dr, d, timeSeriesQuery.Query, nil
|
||||
}
|
||||
|
||||
//nolint: staticcheck // plugins.DataPlugin deprecated
|
||||
func (timeSeriesQuery cloudMonitoringTimeSeriesQuery) parseResponse(queryRes *plugins.DataQueryResult,
|
||||
func (timeSeriesQuery cloudMonitoringTimeSeriesQuery) parseResponse(queryRes *backend.DataResponse,
|
||||
response cloudMonitoringResponse, executedQueryString string) error {
|
||||
labels := make(map[string]map[string]bool)
|
||||
frames := data.Frames{}
|
||||
|
||||
customFrameMeta := map[string]interface{}{}
|
||||
for _, series := range response.TimeSeriesData {
|
||||
seriesLabels := make(map[string]string)
|
||||
frame := data.NewFrameOfFieldTypes("", len(series.PointData), data.FieldTypeTime, data.FieldTypeFloat64)
|
||||
@ -252,23 +245,29 @@ func (timeSeriesQuery cloudMonitoringTimeSeriesQuery) parseResponse(queryRes *pl
|
||||
frames = addConfigData(frames, dl, response.Unit)
|
||||
}
|
||||
|
||||
queryRes.Dataframes = plugins.NewDecodedDataFrames(frames)
|
||||
|
||||
labelsByKey := make(map[string][]string)
|
||||
for key, values := range labels {
|
||||
for value := range values {
|
||||
labelsByKey[key] = append(labelsByKey[key], value)
|
||||
}
|
||||
}
|
||||
customFrameMeta["labels"] = labelsByKey
|
||||
|
||||
queryRes.Meta.Set("labels", labelsByKey)
|
||||
for _, frame := range frames {
|
||||
if frame.Meta != nil {
|
||||
frame.Meta.Custom = customFrameMeta
|
||||
} else {
|
||||
frame.SetMeta(&data.FrameMeta{Custom: customFrameMeta})
|
||||
}
|
||||
}
|
||||
|
||||
queryRes.Frames = frames
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
//nolint: staticcheck // plugins.DataPlugin deprecated
|
||||
func (timeSeriesQuery cloudMonitoringTimeSeriesQuery) parseToAnnotations(queryRes *plugins.DataQueryResult,
|
||||
data cloudMonitoringResponse, title string, text string, tags string) error {
|
||||
func (timeSeriesQuery cloudMonitoringTimeSeriesQuery) parseToAnnotations(queryRes *backend.DataResponse,
|
||||
data cloudMonitoringResponse, title, text, tags string) error {
|
||||
annotations := make([]map[string]string, 0)
|
||||
|
||||
for _, series := range data.TimeSeriesData {
|
||||
@ -315,7 +314,7 @@ func (timeSeriesQuery cloudMonitoringTimeSeriesQuery) parseToAnnotations(queryRe
|
||||
}
|
||||
}
|
||||
|
||||
transformAnnotationToTable(annotations, queryRes)
|
||||
timeSeriesQuery.transformAnnotationToFrame(annotations, queryRes)
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -348,8 +347,8 @@ func (timeSeriesQuery cloudMonitoringTimeSeriesQuery) buildDeepLink() string {
|
||||
},
|
||||
"timeSelection": map[string]string{
|
||||
"timeRange": "custom",
|
||||
"start": timeSeriesQuery.timeRange.MustGetFrom().Format(time.RFC3339Nano),
|
||||
"end": timeSeriesQuery.timeRange.MustGetTo().Format(time.RFC3339Nano),
|
||||
"start": timeSeriesQuery.timeRange.From.Format(time.RFC3339Nano),
|
||||
"end": timeSeriesQuery.timeRange.To.Format(time.RFC3339Nano),
|
||||
},
|
||||
}
|
||||
|
||||
|
@ -5,18 +5,15 @@ import (
|
||||
"net/url"
|
||||
"time"
|
||||
|
||||
"github.com/grafana/grafana/pkg/plugins"
|
||||
"github.com/grafana/grafana-plugin-sdk-go/backend"
|
||||
)
|
||||
|
||||
type (
|
||||
cloudMonitoringQueryExecutor interface {
|
||||
//nolint: staticcheck // plugins.DataPlugin deprecated
|
||||
run(ctx context.Context, tsdbQuery plugins.DataQuery, e *Executor) (
|
||||
plugins.DataQueryResult, cloudMonitoringResponse, string, error)
|
||||
//nolint: staticcheck // plugins.DataPlugin deprecated
|
||||
parseResponse(queryRes *plugins.DataQueryResult, data cloudMonitoringResponse, executedQueryString string) error
|
||||
//nolint: staticcheck // plugins.DataPlugin deprecated
|
||||
parseToAnnotations(queryRes *plugins.DataQueryResult, data cloudMonitoringResponse, title string, text string, tags string) error
|
||||
run(ctx context.Context, req *backend.QueryDataRequest, s *Service, dsInfo datasourceInfo) (
|
||||
*backend.DataResponse, cloudMonitoringResponse, string, error)
|
||||
parseResponse(dr *backend.DataResponse, data cloudMonitoringResponse, executedQueryString string) error
|
||||
parseToAnnotations(dr *backend.DataResponse, data cloudMonitoringResponse, title, text, tags string) error
|
||||
buildDeepLink() string
|
||||
getRefID() string
|
||||
}
|
||||
@ -41,7 +38,7 @@ type (
|
||||
Query string
|
||||
IntervalMS int64
|
||||
AliasBy string
|
||||
timeRange plugins.DataTimeRange
|
||||
timeRange backend.TimeRange
|
||||
}
|
||||
|
||||
metricQuery struct {
|
||||
|
@ -2,7 +2,6 @@ package tsdb
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
"github.com/grafana/grafana/pkg/models"
|
||||
"github.com/grafana/grafana/pkg/plugins"
|
||||
@ -10,72 +9,35 @@ import (
|
||||
"github.com/grafana/grafana/pkg/services/datasources"
|
||||
"github.com/grafana/grafana/pkg/services/oauthtoken"
|
||||
"github.com/grafana/grafana/pkg/setting"
|
||||
"github.com/grafana/grafana/pkg/tsdb/cloudmonitoring"
|
||||
_ "github.com/grafana/grafana/pkg/tsdb/postgres"
|
||||
)
|
||||
|
||||
// NewService returns a new Service.
|
||||
func NewService(
|
||||
cfg *setting.Cfg,
|
||||
pluginManager plugins.Manager,
|
||||
backendPluginManager backendplugin.Manager,
|
||||
oauthTokenService *oauthtoken.Service,
|
||||
dataSourcesService *datasources.Service,
|
||||
cloudMonitoringService *cloudmonitoring.Service,
|
||||
) *Service {
|
||||
s := newService(cfg, pluginManager, backendPluginManager, oauthTokenService, dataSourcesService)
|
||||
|
||||
// register backend data sources using legacy plugin
|
||||
// contracts/non-SDK contracts
|
||||
s.registry["stackdriver"] = cloudMonitoringService.NewExecutor
|
||||
|
||||
return s
|
||||
cfg *setting.Cfg, backendPluginManager backendplugin.Manager,
|
||||
oauthTokenService *oauthtoken.Service, dataSourcesService *datasources.Service) *Service {
|
||||
return newService(cfg, backendPluginManager, oauthTokenService, dataSourcesService)
|
||||
}
|
||||
|
||||
func newService(cfg *setting.Cfg, manager plugins.Manager, backendPluginManager backendplugin.Manager,
|
||||
func newService(cfg *setting.Cfg, backendPluginManager backendplugin.Manager,
|
||||
oauthTokenService oauthtoken.OAuthTokenService, dataSourcesService *datasources.Service) *Service {
|
||||
return &Service{
|
||||
Cfg: cfg,
|
||||
PluginManager: manager,
|
||||
BackendPluginManager: backendPluginManager,
|
||||
// nolint:staticcheck // plugins.DataPlugin deprecated
|
||||
registry: map[string]func(*models.DataSource) (plugins.DataPlugin, error){},
|
||||
OAuthTokenService: oauthTokenService,
|
||||
DataSourcesService: dataSourcesService,
|
||||
OAuthTokenService: oauthTokenService,
|
||||
DataSourcesService: dataSourcesService,
|
||||
}
|
||||
}
|
||||
|
||||
// Service handles data requests to data sources.
|
||||
type Service struct {
|
||||
Cfg *setting.Cfg
|
||||
PluginManager plugins.Manager
|
||||
BackendPluginManager backendplugin.Manager
|
||||
OAuthTokenService oauthtoken.OAuthTokenService
|
||||
DataSourcesService *datasources.Service
|
||||
//nolint: staticcheck // plugins.DataPlugin deprecated
|
||||
registry map[string]func(*models.DataSource) (plugins.DataPlugin, error)
|
||||
}
|
||||
|
||||
//nolint: staticcheck // plugins.DataPlugin deprecated
|
||||
func (s *Service) HandleRequest(ctx context.Context, ds *models.DataSource, query plugins.DataQuery) (plugins.DataResponse, error) {
|
||||
if factory, exists := s.registry[ds.Type]; exists {
|
||||
var err error
|
||||
plugin, err := factory(ds)
|
||||
if err != nil {
|
||||
//nolint: staticcheck // plugins.DataPlugin deprecated
|
||||
return plugins.DataResponse{}, fmt.Errorf("could not instantiate endpoint for data plugin %q: %w",
|
||||
ds.Type, err)
|
||||
}
|
||||
|
||||
return plugin.DataQuery(ctx, ds, query)
|
||||
}
|
||||
return dataPluginQueryAdapter(ds.Type, s.BackendPluginManager, s.OAuthTokenService, s.DataSourcesService).
|
||||
DataQuery(ctx, ds, query)
|
||||
}
|
||||
|
||||
// RegisterQueryHandler registers a query handler factory.
|
||||
// This is only exposed for tests!
|
||||
//nolint: staticcheck // plugins.DataPlugin deprecated
|
||||
func (s *Service) RegisterQueryHandler(name string, factory func(*models.DataSource) (plugins.DataPlugin, error)) {
|
||||
s.registry[name] = factory
|
||||
return dataPluginQueryAdapter(ds.Type, s.BackendPluginManager, s.OAuthTokenService, s.DataSourcesService).DataQuery(ctx, ds, query)
|
||||
}
|
||||
|
@ -10,7 +10,6 @@ import (
|
||||
"github.com/grafana/grafana/pkg/models"
|
||||
"github.com/grafana/grafana/pkg/plugins"
|
||||
"github.com/grafana/grafana/pkg/plugins/backendplugin"
|
||||
"github.com/grafana/grafana/pkg/plugins/manager"
|
||||
"github.com/grafana/grafana/pkg/services/datasources"
|
||||
"github.com/grafana/grafana/pkg/services/encryption/ossencryption"
|
||||
"github.com/grafana/grafana/pkg/setting"
|
||||
@ -19,59 +18,20 @@ import (
|
||||
)
|
||||
|
||||
func TestHandleRequest(t *testing.T) {
|
||||
t.Run("Should return query result when handling request for query", func(t *testing.T) {
|
||||
req := plugins.DataQuery{
|
||||
Queries: []plugins.DataSubQuery{
|
||||
{RefID: "A", DataSource: &models.DataSource{Id: 1, Type: "test"}},
|
||||
},
|
||||
}
|
||||
|
||||
svc, exe, _ := createService()
|
||||
exe.Return("A", plugins.DataTimeSeriesSlice{plugins.DataTimeSeries{Name: "argh"}})
|
||||
|
||||
res, err := svc.HandleRequest(context.TODO(), &models.DataSource{Id: 1, Type: "test"}, req)
|
||||
require.NoError(t, err)
|
||||
require.NotEmpty(t, res.Results["A"].Series)
|
||||
require.Equal(t, "argh", res.Results["A"].Series[0].Name)
|
||||
})
|
||||
|
||||
t.Run("Should return query results when handling request for two queries with same data source", func(t *testing.T) {
|
||||
req := plugins.DataQuery{
|
||||
Queries: []plugins.DataSubQuery{
|
||||
{RefID: "A", DataSource: &models.DataSource{Id: 1, Type: "test"}},
|
||||
{RefID: "B", DataSource: &models.DataSource{Id: 1, Type: "test"}},
|
||||
},
|
||||
}
|
||||
|
||||
svc, exe, _ := createService()
|
||||
exe.Return("A", plugins.DataTimeSeriesSlice{plugins.DataTimeSeries{Name: "argh"}})
|
||||
exe.Return("B", plugins.DataTimeSeriesSlice{plugins.DataTimeSeries{Name: "barg"}})
|
||||
|
||||
res, err := svc.HandleRequest(context.TODO(), &models.DataSource{Id: 1, Type: "test"}, req)
|
||||
require.NoError(t, err)
|
||||
|
||||
require.Len(t, res.Results, 2)
|
||||
require.Equal(t, "argh", res.Results["A"].Series[0].Name)
|
||||
require.Equal(t, "barg", res.Results["B"].Series[0].Name)
|
||||
})
|
||||
|
||||
t.Run("Should fallback to backend plugin manager when handling request for query with unregistered type", func(t *testing.T) {
|
||||
svc, _, manager := createService()
|
||||
t.Run("Should invoke plugin manager QueryData when handling request for query", func(t *testing.T) {
|
||||
svc, _, pm := createService()
|
||||
backendPluginManagerCalled := false
|
||||
manager.QueryDataHandlerFunc = backend.QueryDataHandlerFunc(func(ctx context.Context, req *backend.QueryDataRequest) (*backend.QueryDataResponse, error) {
|
||||
pm.QueryDataHandlerFunc = func(ctx context.Context, req *backend.QueryDataRequest) (*backend.QueryDataResponse, error) {
|
||||
backendPluginManagerCalled = true
|
||||
return &backend.QueryDataResponse{}, nil
|
||||
})
|
||||
return backend.NewQueryDataResponse(), nil
|
||||
}
|
||||
|
||||
ds := &models.DataSource{Id: 12, Type: "unregisteredType", JsonData: simplejson.New()}
|
||||
req := plugins.DataQuery{
|
||||
TimeRange: &plugins.DataTimeRange{},
|
||||
Queries: []plugins.DataSubQuery{
|
||||
{
|
||||
RefID: "A",
|
||||
DataSource: ds,
|
||||
Model: simplejson.New(),
|
||||
},
|
||||
{RefID: "A", DataSource: &models.DataSource{Id: 1, Type: "test"}, Model: simplejson.New()},
|
||||
{RefID: "B", DataSource: &models.DataSource{Id: 1, Type: "test"}, Model: simplejson.New()},
|
||||
},
|
||||
}
|
||||
_, err := svc.HandleRequest(context.Background(), ds, req)
|
||||
@ -142,21 +102,18 @@ func (s *fakeOAuthTokenService) IsOAuthPassThruEnabled(*models.DataSource) bool
|
||||
|
||||
func createService() (*Service, *fakeExecutor, *fakeBackendPM) {
|
||||
fakeBackendPM := &fakeBackendPM{}
|
||||
manager := &manager.PluginManager{
|
||||
BackendPluginManager: fakeBackendPM,
|
||||
}
|
||||
dsService := datasources.ProvideService(bus.New(), nil, ossencryption.ProvideService())
|
||||
|
||||
s := newService(setting.NewCfg(), manager, fakeBackendPM, &fakeOAuthTokenService{}, dsService)
|
||||
s := newService(
|
||||
setting.NewCfg(),
|
||||
fakeBackendPM,
|
||||
&fakeOAuthTokenService{},
|
||||
dsService,
|
||||
)
|
||||
e := &fakeExecutor{
|
||||
//nolint: staticcheck // plugins.DataPlugin deprecated
|
||||
results: make(map[string]plugins.DataQueryResult),
|
||||
resultsFn: make(map[string]resultsFn),
|
||||
}
|
||||
//nolint: staticcheck // plugins.DataPlugin deprecated
|
||||
s.registry["test"] = func(*models.DataSource) (plugins.DataPlugin, error) {
|
||||
return e, nil
|
||||
}
|
||||
|
||||
return s, e, fakeBackendPM
|
||||
}
|
||||
|
@ -70,7 +70,7 @@ export default class Api {
|
||||
|
||||
post(data: Record<string, any>): Observable<FetchResponse<PostResponse>> {
|
||||
return getBackendSrv().fetch<PostResponse>({
|
||||
url: '/api/tsdb/query',
|
||||
url: '/api/ds/query',
|
||||
method: 'POST',
|
||||
data,
|
||||
});
|
||||
|
@ -163,11 +163,10 @@ export default class CloudMonitoringDatasource extends DataSourceWithBackend<
|
||||
});
|
||||
}),
|
||||
map(({ data }) => {
|
||||
return data;
|
||||
}),
|
||||
map((response) => {
|
||||
const result = response.results[refId];
|
||||
return result && result.meta ? result.meta.labels : {};
|
||||
const dataQueryResponse = toDataQueryResponse({
|
||||
data: data,
|
||||
});
|
||||
return dataQueryResponse?.data[0]?.meta?.custom?.labels ?? {};
|
||||
})
|
||||
)
|
||||
);
|
||||
@ -219,9 +218,10 @@ export default class CloudMonitoringDatasource extends DataSourceWithBackend<
|
||||
})
|
||||
.pipe(
|
||||
map(({ data }) => {
|
||||
return data && data.results && data.results.getGCEDefaultProject && data.results.getGCEDefaultProject.meta
|
||||
? data.results.getGCEDefaultProject.meta.defaultProject
|
||||
: '';
|
||||
const dataQueryResponse = toDataQueryResponse({
|
||||
data: data,
|
||||
});
|
||||
return dataQueryResponse?.data[0]?.meta?.custom?.defaultProject ?? '';
|
||||
}),
|
||||
catchError((err) => {
|
||||
return throwError(err.data.error);
|
||||
|
Loading…
Reference in New Issue
Block a user