Mirror of https://github.com/grafana/grafana.git
Chore: Query endpoint refactor (#41637)
Stop using legacydata.RequestHandler in HTTPServer, /api/tsdb/query and pkg/expr, with the goal of deprecating /api/tsdb/query and eventually removing it completely. This is the first step in cleaning up the HTTP API query endpoint.
Co-authored-by: Marcus Efraimsson <marcus.efraimsson@gmail.com>
Co-authored-by: Alexander Emelin <frvzmb@gmail.com>
Parent: 6e8cc426d6
Commit: 8927a3ca20
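The heart of this change is a new internal queryMetrics path in pkg/api plus the ErrBadQuery type it introduces: the parsing and datasource-resolution helpers now return plain Go errors, and a single handleQueryMetricsError helper maps them onto HTTP status codes with errors.Is / errors.As. The snippet below is a minimal, self-contained sketch of that error-mapping pattern, not the actual Grafana code: statusForQueryError and errDataSourceAccessDenied are illustrative stand-ins for handleQueryMetricsError, models.ErrDataSourceAccessDenied and the response helpers.

package main

import (
	"errors"
	"fmt"
	"net/http"
)

// ErrBadQuery mirrors the type added in pkg/api: a malformed query carries a
// message that is safe to echo back in the API response.
type ErrBadQuery struct {
	Message string
}

func NewErrBadQuery(msg string) *ErrBadQuery {
	return &ErrBadQuery{Message: msg}
}

func (e ErrBadQuery) Error() string {
	return fmt.Sprintf("bad query: %s", e.Message)
}

// errDataSourceAccessDenied is an illustrative stand-in for
// models.ErrDataSourceAccessDenied.
var errDataSourceAccessDenied = errors.New("access denied to data source")

// statusForQueryError sketches the mapping done by handleQueryMetricsError:
// access-denied errors become 403, bad queries become 400, everything else 500.
func statusForQueryError(err error) int {
	if errors.Is(err, errDataSourceAccessDenied) {
		return http.StatusForbidden
	}
	var badQuery *ErrBadQuery
	if errors.As(err, &badQuery) {
		return http.StatusBadRequest
	}
	return http.StatusInternalServerError
}

func main() {
	fmt.Println(statusForQueryError(NewErrBadQuery("no queries found"))) // 400
	fmt.Println(statusForQueryError(errDataSourceAccessDenied))          // 403
	fmt.Println(statusForQueryError(errors.New("plugin crashed")))       // 500
}

Returning typed errors instead of response.Response values is what lets both POST /api/ds/query and the deprecated POST /api/tsdb/query share the same queryMetrics/parseMetricRequest code and decide on the response shape only at the handler edge.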
@@ -55,7 +55,6 @@ import (
 "github.com/grafana/grafana/pkg/services/sqlstore"
 "github.com/grafana/grafana/pkg/services/updatechecker"
 "github.com/grafana/grafana/pkg/setting"
-"github.com/grafana/grafana/pkg/tsdb/legacydata"
 "github.com/grafana/grafana/pkg/util/errutil"
 "github.com/grafana/grafana/pkg/web"
 "github.com/prometheus/client_golang/prometheus"
@@ -98,7 +97,6 @@ type HTTPServer struct {
 LivePushGateway *pushhttp.Gateway
 ContextHandler *contexthandler.ContextHandler
 SQLStore *sqlstore.SQLStore
-legacyDataRequestHandler legacydata.RequestHandler
 AlertEngine *alerting.AlertEngine
 LoadSchemaService *schemaloader.SchemaLoaderService
 AlertNG *ngalert.AlertNG
@@ -125,8 +123,7 @@ type ServerOptions struct {

 func ProvideHTTPServer(opts ServerOptions, cfg *setting.Cfg, routeRegister routing.RouteRegister, bus bus.Bus,
 renderService rendering.Service, licensing models.Licensing, hooksService *hooks.HooksService,
-cacheService *localcache.CacheService, sqlStore *sqlstore.SQLStore,
-legacyDataRequestHandler legacydata.RequestHandler, alertEngine *alerting.AlertEngine,
+cacheService *localcache.CacheService, sqlStore *sqlstore.SQLStore, alertEngine *alerting.AlertEngine,
 pluginRequestValidator models.PluginRequestValidator, pluginStaticRouteResolver plugins.StaticRouteResolver,
 pluginDashboardManager plugins.PluginDashboardManager, pluginStore plugins.Store, pluginClient plugins.Client,
 pluginErrorResolver plugins.ErrorResolver, settingsProvider setting.Provider,
@@ -156,7 +153,6 @@ func ProvideHTTPServer(opts ServerOptions, cfg *setting.Cfg, routeRegister routi
 HooksService: hooksService,
 CacheService: cacheService,
 SQLStore: sqlStore,
-legacyDataRequestHandler: legacyDataRequestHandler,
 AlertEngine: alertEngine,
 PluginRequestValidator: pluginRequestValidator,
 pluginClient: pluginClient,
@@ -16,9 +16,35 @@ import (
 "github.com/grafana/grafana/pkg/plugins/adapters"
 "github.com/grafana/grafana/pkg/tsdb/grafanads"
 "github.com/grafana/grafana/pkg/tsdb/legacydata"
+"github.com/grafana/grafana/pkg/util"
 "github.com/grafana/grafana/pkg/web"
 )

+// ErrBadQuery returned whenever request is malformed and must contain a message
+// suitable to return in API response.
+type ErrBadQuery struct {
+Message string
+}
+
+func NewErrBadQuery(msg string) *ErrBadQuery {
+return &ErrBadQuery{Message: msg}
+}
+
+func (e ErrBadQuery) Error() string {
+return fmt.Sprintf("bad query: %s", e.Message)
+}
+
+func (hs *HTTPServer) handleQueryMetricsError(err error) *response.NormalResponse {
+if errors.Is(err, models.ErrDataSourceAccessDenied) {
+return response.Error(http.StatusForbidden, "Access denied to data source", err)
+}
+var badQuery *ErrBadQuery
+if errors.As(err, &badQuery) {
+return response.Error(http.StatusBadRequest, util.Capitalize(badQuery.Message), err)
+}
+return response.Error(http.StatusInternalServerError, "Query data error", err)
+}
+
 // QueryMetricsV2 returns query metrics.
 // POST /api/ds/query DataSource query w/ expressions
 func (hs *HTTPServer) QueryMetricsV2(c *models.ReqContext) response.Response {
@@ -26,80 +52,214 @@ func (hs *HTTPServer) QueryMetricsV2(c *models.ReqContext) response.Response {
 if err := web.Bind(c.Req, &reqDTO); err != nil {
 return response.Error(http.StatusBadRequest, "bad request data", err)
 }
+
+resp, err := hs.queryMetrics(c.Req.Context(), c.SignedInUser, c.SkipCache, reqDTO, true)
+if err != nil {
+return hs.handleQueryMetricsError(err)
+}
+return toJsonStreamingResponse(resp)
+}
+
+// QueryMetrics returns query metrics
+// POST /api/tsdb/query
+//nolint: staticcheck // legacydata.DataResponse deprecated
+//nolint: staticcheck // legacydata.DataQueryResult deprecated
+func (hs *HTTPServer) QueryMetrics(c *models.ReqContext) response.Response {
+reqDto := dtos.MetricRequest{}
+if err := web.Bind(c.Req, &reqDto); err != nil {
+return response.Error(http.StatusBadRequest, "bad request data", err)
+}
+
+sdkResp, err := hs.queryMetrics(c.Req.Context(), c.SignedInUser, c.SkipCache, reqDto, false)
+if err != nil {
+return hs.handleQueryMetricsError(err)
+}
+
+legacyResp := legacydata.DataResponse{
+Results: map[string]legacydata.DataQueryResult{},
+}
+
+for refID, res := range sdkResp.Responses {
+dqr := legacydata.DataQueryResult{
+RefID: refID,
+}
+
+if res.Error != nil {
+dqr.Error = res.Error
+}
+
+if res.Frames != nil {
+dqr.Dataframes = legacydata.NewDecodedDataFrames(res.Frames)
+}
+
+legacyResp.Results[refID] = dqr
+}
+
+statusCode := http.StatusOK
+for _, res := range legacyResp.Results {
+if res.Error != nil {
+res.ErrorString = res.Error.Error()
+legacyResp.Message = res.ErrorString
+statusCode = http.StatusBadRequest
+}
+}
+
+return response.JSON(statusCode, &legacyResp)
+}
+
+func (hs *HTTPServer) queryMetrics(ctx context.Context, user *models.SignedInUser, skipCache bool, reqDTO dtos.MetricRequest, handleExpressions bool) (*backend.QueryDataResponse, error) {
+parsedReq, err := hs.parseMetricRequest(user, skipCache, reqDTO)
+if err != nil {
+return nil, err
+}
+if handleExpressions && parsedReq.hasExpression {
+return hs.handleExpressions(ctx, user, parsedReq)
+}
+return hs.handleQueryData(ctx, user, parsedReq)
+}
+
+// handleExpressions handles POST /api/ds/query when there is an expression.
+func (hs *HTTPServer) handleExpressions(ctx context.Context, user *models.SignedInUser, parsedReq *parsedRequest) (*backend.QueryDataResponse, error) {
+exprReq := expr.Request{
+OrgId: user.OrgId,
+Queries: []expr.Query{},
+}
+
+for _, pq := range parsedReq.parsedQueries {
+if pq.datasource == nil {
+return nil, NewErrBadQuery(fmt.Sprintf("query mising datasource info: %s", pq.query.RefID))
+}
+
+exprReq.Queries = append(exprReq.Queries, expr.Query{
+JSON: pq.query.JSON,
+Interval: pq.query.Interval,
+RefID: pq.query.RefID,
+MaxDataPoints: pq.query.MaxDataPoints,
+QueryType: pq.query.QueryType,
+Datasource: expr.DataSourceRef{
+Type: pq.datasource.Type,
+UID: pq.datasource.Uid,
+},
+TimeRange: expr.TimeRange{
+From: pq.query.TimeRange.From,
+To: pq.query.TimeRange.To,
+},
+})
+}
+
+qdr, err := hs.expressionService.TransformData(ctx, &exprReq)
+if err != nil {
+return nil, fmt.Errorf("expression request error: %w", err)
+}
+return qdr, nil
+}
+
+func (hs *HTTPServer) handleQueryData(ctx context.Context, user *models.SignedInUser, parsedReq *parsedRequest) (*backend.QueryDataResponse, error) {
+ds := parsedReq.parsedQueries[0].datasource
+if err := hs.PluginRequestValidator.Validate(ds.Url, nil); err != nil {
+return nil, models.ErrDataSourceAccessDenied
+}
+
+instanceSettings, err := adapters.ModelToInstanceSettings(ds, hs.decryptSecureJsonDataFn())
+if err != nil {
+return nil, fmt.Errorf("failed to convert data source to instance settings")
+}
+
+req := &backend.QueryDataRequest{
+PluginContext: backend.PluginContext{
+OrgID: ds.OrgId,
+PluginID: ds.Type,
+User: adapters.BackendUserFromSignedInUser(user),
+DataSourceInstanceSettings: instanceSettings,
+},
+Headers: map[string]string{},
+Queries: []backend.DataQuery{},
+}
+
+if hs.OAuthTokenService.IsOAuthPassThruEnabled(ds) {
+if token := hs.OAuthTokenService.GetCurrentOAuthToken(ctx, user); token != nil {
+req.Headers["Authorization"] = fmt.Sprintf("%s %s", token.Type(), token.AccessToken)
+}
+}
+
+for _, q := range parsedReq.parsedQueries {
+req.Queries = append(req.Queries, q.query)
+}
+
+return hs.pluginClient.QueryData(ctx, req)
+}
+
+type parsedQuery struct {
+datasource *models.DataSource
+query backend.DataQuery
+}
+
+type parsedRequest struct {
+hasExpression bool
+parsedQueries []parsedQuery
+}
+
+func (hs *HTTPServer) parseMetricRequest(user *models.SignedInUser, skipCache bool, reqDTO dtos.MetricRequest) (*parsedRequest, error) {
 if len(reqDTO.Queries) == 0 {
-return response.Error(http.StatusBadRequest, "No queries found in query", nil)
+return nil, NewErrBadQuery("no queries found")
 }

 timeRange := legacydata.NewDataTimeRange(reqDTO.From, reqDTO.To)
-request := legacydata.DataQuery{
-TimeRange: &timeRange,
-Debug: reqDTO.Debug,
-User: c.SignedInUser,
-Queries: make([]legacydata.DataSubQuery, 0, len(reqDTO.Queries)),
+req := &parsedRequest{
+hasExpression: false,
+parsedQueries: []parsedQuery{},
 }

 // Parse the queries
-hasExpression := false
-datasources := make(map[string]*models.DataSource, len(reqDTO.Queries))
+datasources := map[string]*models.DataSource{}
 for _, query := range reqDTO.Queries {
-ds, errRsp := hs.getDataSourceFromQuery(c, query, datasources)
-if errRsp != nil {
-return errRsp
+ds, err := hs.getDataSourceFromQuery(user, skipCache, query, datasources)
+if err != nil {
+return nil, err
 }
 if ds == nil {
-return response.Error(http.StatusBadRequest, "Datasource not found for query", nil)
+return nil, NewErrBadQuery("invalid data source ID")
 }

 datasources[ds.Uid] = ds
 if expr.IsDataSource(ds.Uid) {
-hasExpression = true
+req.hasExpression = true
 }

 hs.log.Debug("Processing metrics query", "query", query)

-request.Queries = append(request.Queries, legacydata.DataSubQuery{
+modelJSON, err := query.MarshalJSON()
+if err != nil {
+return nil, err
+}
+
+req.parsedQueries = append(req.parsedQueries, parsedQuery{
+datasource: ds,
+query: backend.DataQuery{
+TimeRange: backend.TimeRange{
+From: timeRange.GetFromAsTimeUTC(),
+To: timeRange.GetToAsTimeUTC(),
+},
 RefID: query.Get("refId").MustString("A"),
 MaxDataPoints: query.Get("maxDataPoints").MustInt64(100),
-IntervalMS: query.Get("intervalMs").MustInt64(1000),
+Interval: time.Duration(query.Get("intervalMs").MustInt64(1000)) * time.Millisecond,
 QueryType: query.Get("queryType").MustString(""),
-Model: query,
-DataSource: ds,
+JSON: modelJSON,
+},
 })
 }

-if hasExpression {
-qdr, err := hs.expressionService.WrapTransformData(c.Req.Context(), request)
-if err != nil {
-return response.Error(500, "expression request error", err)
-}
-return toMacronResponse(qdr)
-}
-
-ds := request.Queries[0].DataSource
+if !req.hasExpression {
 if len(datasources) > 1 {
 // We do not (yet) support mixed query type
-return response.Error(http.StatusBadRequest, "All queries must use the same datasource", nil)
+return nil, NewErrBadQuery("all queries must use the same datasource")
+}
 }

-err := hs.PluginRequestValidator.Validate(ds.Url, nil)
-if err != nil {
-return response.Error(http.StatusForbidden, "Access denied", err)
+return req, nil
 }

-req, err := hs.createRequest(c.Req.Context(), ds, request)
-if err != nil {
-return response.Error(http.StatusBadRequest, "Request formation error", err)
-}
-
-resp, err := hs.pluginClient.QueryData(c.Req.Context(), req)
-if err != nil {
-return response.Error(http.StatusInternalServerError, "Metric request error", err)
-}
-
-return toMacronResponse(resp)
-}
-
-func (hs *HTTPServer) getDataSourceFromQuery(c *models.ReqContext, query *simplejson.Json, history map[string]*models.DataSource) (*models.DataSource, response.Response) {
+func (hs *HTTPServer) getDataSourceFromQuery(user *models.SignedInUser, skipCache bool, query *simplejson.Json, history map[string]*models.DataSource) (*models.DataSource, error) {
 var err error
 uid := query.Get("datasource").Get("uid").MustString()

@@ -119,31 +279,31 @@ func (hs *HTTPServer) getDataSourceFromQuery(c *models.ReqContext, query *simple
 }

 if uid == grafanads.DatasourceUID {
-return grafanads.DataSourceModel(c.OrgId), nil
+return grafanads.DataSourceModel(user.OrgId), nil
 }

 // use datasourceId if it exists
 id := query.Get("datasourceId").MustInt64(0)
 if id > 0 {
-ds, err = hs.DataSourceCache.GetDatasource(id, c.SignedInUser, c.SkipCache)
+ds, err = hs.DataSourceCache.GetDatasource(id, user, skipCache)
 if err != nil {
-return nil, hs.handleGetDataSourceError(err, id)
+return nil, err
 }
 return ds, nil
 }

 if uid != "" {
-ds, err = hs.DataSourceCache.GetDatasourceByUID(uid, c.SignedInUser, c.SkipCache)
+ds, err = hs.DataSourceCache.GetDatasourceByUID(uid, user, skipCache)
 if err != nil {
-return nil, hs.handleGetDataSourceError(err, uid)
+return nil, err
 }
 return ds, nil
 }

-return nil, response.Error(http.StatusBadRequest, "Query missing data source ID/UID", nil)
+return nil, NewErrBadQuery("missing data source ID/UID")
 }

-func toMacronResponse(qdr *backend.QueryDataResponse) response.Response {
+func toJsonStreamingResponse(qdr *backend.QueryDataResponse) response.Response {
 statusCode := http.StatusOK
 for _, res := range qdr.Responses {
 if res.Error != nil {
@@ -153,123 +313,3 @@ func toMacronResponse(qdr *backend.QueryDataResponse) response.Response {
 return response.JSONStreaming(statusCode, qdr)
 }

-func (hs *HTTPServer) handleGetDataSourceError(err error, datasourceRef interface{}) *response.NormalResponse {
-hs.log.Debug("Encountered error getting data source", "err", err, "ref", datasourceRef)
-if errors.Is(err, models.ErrDataSourceAccessDenied) {
-return response.Error(403, "Access denied to data source", err)
-}
-if errors.Is(err, models.ErrDataSourceNotFound) {
-return response.Error(400, "Invalid data source ID", err)
-}
-return response.Error(500, "Unable to load data source metadata", err)
-}
-
-// QueryMetrics returns query metrics
-// POST /api/tsdb/query
-func (hs *HTTPServer) QueryMetrics(c *models.ReqContext) response.Response {
-reqDto := dtos.MetricRequest{}
-if err := web.Bind(c.Req, &reqDto); err != nil {
-return response.Error(http.StatusBadRequest, "bad request data", err)
-}
-if len(reqDto.Queries) == 0 {
-return response.Error(http.StatusBadRequest, "No queries found in query", nil)
-}
-
-datasourceId, err := reqDto.Queries[0].Get("datasourceId").Int64()
-if err != nil {
-return response.Error(http.StatusBadRequest, "Query missing datasourceId", nil)
-}
-
-ds, err := hs.DataSourceCache.GetDatasource(datasourceId, c.SignedInUser, c.SkipCache)
-if err != nil {
-return hs.handleGetDataSourceError(err, datasourceId)
-}
-
-err = hs.PluginRequestValidator.Validate(ds.Url, nil)
-if err != nil {
-return response.Error(http.StatusForbidden, "Access denied", err)
-}
-
-timeRange := legacydata.NewDataTimeRange(reqDto.From, reqDto.To)
-request := legacydata.DataQuery{
-TimeRange: &timeRange,
-Debug: reqDto.Debug,
-User: c.SignedInUser,
-}
-
-for _, query := range reqDto.Queries {
-request.Queries = append(request.Queries, legacydata.DataSubQuery{
-RefID: query.Get("refId").MustString("A"),
-MaxDataPoints: query.Get("maxDataPoints").MustInt64(100),
-IntervalMS: query.Get("intervalMs").MustInt64(1000),
-Model: query,
-DataSource: ds,
-})
-}
-
-resp, err := hs.legacyDataRequestHandler.HandleRequest(c.Req.Context(), ds, request)
-if err != nil {
-return response.Error(http.StatusInternalServerError, "Metric request error", err)
-}
-
-statusCode := http.StatusOK
-for _, res := range resp.Results {
-if res.Error != nil {
-res.ErrorString = res.Error.Error()
-resp.Message = res.ErrorString
-statusCode = http.StatusBadRequest
-}
-}
-
-return response.JSON(statusCode, &resp)
-}
-
-func (hs *HTTPServer) createRequest(ctx context.Context, ds *models.DataSource, query legacydata.DataQuery) (*backend.QueryDataRequest, error) {
-instanceSettings, err := adapters.ModelToInstanceSettings(ds, hs.decryptSecureJsonDataFn())
-if err != nil {
-return nil, err
-}
-
-if query.Headers == nil {
-query.Headers = make(map[string]string)
-}
-
-if hs.OAuthTokenService.IsOAuthPassThruEnabled(ds) {
-if token := hs.OAuthTokenService.GetCurrentOAuthToken(ctx, query.User); token != nil {
-delete(query.Headers, "Authorization")
-query.Headers["Authorization"] = fmt.Sprintf("%s %s", token.Type(), token.AccessToken)
-}
-}
-
-req := &backend.QueryDataRequest{
-PluginContext: backend.PluginContext{
-OrgID: ds.OrgId,
-PluginID: ds.Type,
-User: adapters.BackendUserFromSignedInUser(query.User),
-DataSourceInstanceSettings: instanceSettings,
-},
-Queries: []backend.DataQuery{},
-Headers: query.Headers,
-}
-
-for _, q := range query.Queries {
-modelJSON, err := q.Model.MarshalJSON()
-if err != nil {
-return nil, err
-}
-req.Queries = append(req.Queries, backend.DataQuery{
-RefID: q.RefID,
-Interval: time.Duration(q.IntervalMS) * time.Millisecond,
-MaxDataPoints: q.MaxDataPoints,
-TimeRange: backend.TimeRange{
-From: query.TimeRange.GetFromAsTimeUTC(),
-To: query.TimeRange.GetToAsTimeUTC(),
-},
-QueryType: q.QueryType,
-JSON: modelJSON,
-})
-}
-
-return req, nil
-}
@@ -9,7 +9,6 @@ import (
 "github.com/grafana/grafana/pkg/bus"
 "github.com/grafana/grafana/pkg/models"
 "github.com/grafana/grafana/pkg/plugins/adapters"
-"github.com/grafana/grafana/pkg/tsdb/legacydata"
 "github.com/grafana/grafana/pkg/util/errutil"
 "github.com/prometheus/client_golang/prometheus"
 "golang.org/x/net/context"
@@ -32,40 +31,6 @@ func init() {
 prometheus.MustRegister(expressionsQuerySummary)
 }

-// WrapTransformData creates and executes transform requests
-func (s *Service) WrapTransformData(ctx context.Context, query legacydata.DataQuery) (*backend.QueryDataResponse, error) {
-req := Request{
-OrgId: query.User.OrgId,
-Queries: []Query{},
-}
-
-for _, q := range query.Queries {
-if q.DataSource == nil {
-return nil, fmt.Errorf("mising datasource info: " + q.RefID)
-}
-modelJSON, err := q.Model.MarshalJSON()
-if err != nil {
-return nil, err
-}
-req.Queries = append(req.Queries, Query{
-JSON: modelJSON,
-Interval: time.Duration(q.IntervalMS) * time.Millisecond,
-RefID: q.RefID,
-MaxDataPoints: q.MaxDataPoints,
-QueryType: q.QueryType,
-Datasource: DataSourceRef{
-Type: q.DataSource.Type,
-UID: q.DataSource.Uid,
-},
-TimeRange: TimeRange{
-From: query.TimeRange.GetFromAsTimeUTC(),
-To: query.TimeRange.GetToAsTimeUTC(),
-},
-})
-}
-return s.TransformData(ctx, &req)
-}
-
 // Request is similar to plugins.DataQuery but with the Time Ranges is per Query.
 type Request struct {
 Headers map[string]string
@@ -2,7 +2,6 @@ package azuremonitor

 import (
 "context"
-"fmt"
 "net/http"
 "testing"
 "time"
@@ -13,7 +12,6 @@ import (
 "github.com/grafana/grafana-plugin-sdk-go/data"
 "github.com/grafana/grafana/pkg/components/simplejson"
 "github.com/grafana/grafana/pkg/setting"
-"github.com/grafana/grafana/pkg/tsdb/legacydata"
 "github.com/stretchr/testify/assert"
 "github.com/stretchr/testify/require"
 )
@@ -25,15 +23,15 @@ func TestBuildingAzureResourceGraphQueries(t *testing.T) {
 tests := []struct {
 name string
 queryModel []backend.DataQuery
-timeRange legacydata.DataTimeRange
+timeRange backend.TimeRange
 azureResourceGraphQueries []*AzureResourceGraphQuery
 Err require.ErrorAssertionFunc
 }{
 {
 name: "Query with macros should be interpolated",
-timeRange: legacydata.DataTimeRange{
-From: fmt.Sprintf("%v", fromStart.Unix()*1000),
-To: fmt.Sprintf("%v", fromStart.Add(34*time.Minute).Unix()*1000),
+timeRange: backend.TimeRange{
+From: fromStart,
+To: fromStart.Add(34 * time.Minute),
 },
 queryModel: []backend.DataQuery{
 {
@@ -6,7 +6,6 @@ import (
 "encoding/json"
 "fmt"

-"github.com/grafana/grafana-plugin-sdk-go/backend"
 "github.com/grafana/grafana-plugin-sdk-go/data"
 "github.com/grafana/grafana/pkg/components/null"
 "github.com/grafana/grafana/pkg/components/simplejson"
@@ -190,41 +189,3 @@ type DataResponse struct {
 Results map[string]DataQueryResult `json:"results"`
 Message string `json:"message,omitempty"`
 }
-
-// ToBackendDataResponse converts the legacy format to the standard SDK format
-func (r DataResponse) ToBackendDataResponse() (*backend.QueryDataResponse, error) {
-qdr := &backend.QueryDataResponse{
-Responses: make(map[string]backend.DataResponse, len(r.Results)),
-}
-
-// Convert tsdb results (map) to plugin-model/datasource (slice) results.
-// Only error, Series, and encoded Dataframes responses are mapped.
-for refID, res := range r.Results {
-pRes := backend.DataResponse{}
-if res.Error != nil {
-pRes.Error = res.Error
-}
-
-if res.Dataframes != nil {
-decoded, err := res.Dataframes.Decoded()
-if err != nil {
-return qdr, err
-}
-pRes.Frames = decoded
-qdr.Responses[refID] = pRes
-continue
-}
-
-for _, series := range res.Series {
-frame, err := SeriesToFrame(series)
-if err != nil {
-return nil, err
-}
-frame.RefID = refID
-pRes.Frames = append(pRes.Frames, frame)
-}
-
-qdr.Responses[refID] = pRes
-}
-return qdr, nil
-}
@@ -37,18 +37,6 @@ func NewDecodedDataFrames(decodedFrames data.Frames) DataFrames {
 }
 }

-// NewEncodedDataFrames instantiates DataFrames from encoded frames.
-//
-// This one is primarily used for creating DataFrames when receiving encoded data frames from an external
-// plugin or similar. This may allow the encoded data frames to be returned to Grafana UI without any additional
-// decoding/encoding required. In Grafana alerting scenario it needs to operate on decoded data frames why encoded
-// frames needs to be decoded before usage.
-func NewEncodedDataFrames(encodedFrames [][]byte) DataFrames {
-return &dataFrames{
-encoded: encodedFrames,
-}
-}
-
 func (df *dataFrames) Encoded() ([][]byte, error) {
 if df.encoded == nil {
 encoded, err := df.decoded.MarshalArrow()
@@ -1,37 +0,0 @@
-package legacydata
-
-import (
-"time"
-
-"github.com/grafana/grafana-plugin-sdk-go/data"
-)
-
-// SeriesToFrame converts a DataTimeSeries to an SDK frame.
-func SeriesToFrame(series DataTimeSeries) (*data.Frame, error) {
-timeVec := make([]*time.Time, len(series.Points))
-floatVec := make([]*float64, len(series.Points))
-for idx, point := range series.Points {
-timeVec[idx], floatVec[idx] = convertDataTimePoint(point)
-}
-frame := data.NewFrame(series.Name,
-data.NewField("time", nil, timeVec),
-data.NewField("value", data.Labels(series.Tags), floatVec),
-)
-
-return frame, nil
-}
-
-// convertDataTimePoint converts a DataTimePoint into two values appropriate
-// for Series values.
-func convertDataTimePoint(point DataTimePoint) (t *time.Time, f *float64) {
-timeIdx, valueIdx := 1, 0
-if point[timeIdx].Valid { // Assuming valid is null?
-tI := int64(point[timeIdx].Float64)
-uT := time.Unix(tI/int64(1e+3), (tI%int64(1e+3))*int64(1e+6)) // time.Time from millisecond unix ts
-t = &uT
-}
-if point[valueIdx].Valid {
-f = &point[valueIdx].Float64
-}
-return
-}
@@ -1,7 +1,6 @@
 package sqleng

 import (
-"database/sql"
 "fmt"
 "net"
 "testing"
@@ -11,11 +10,9 @@ import (
 "github.com/grafana/grafana-plugin-sdk-go/data"
 "github.com/grafana/grafana-plugin-sdk-go/data/sqlutil"
 "github.com/grafana/grafana/pkg/infra/log"
-"github.com/grafana/grafana/pkg/tsdb/legacydata"
 "github.com/stretchr/testify/assert"
 "github.com/stretchr/testify/require"
 "github.com/xorcare/pointer"
-"xorm.io/core"
 )

 func TestSQLEngine(t *testing.T) {
@@ -420,10 +417,6 @@ type testQueryResultTransformer struct {
 transformQueryErrorWasCalled bool
 }

-func (t *testQueryResultTransformer) TransformQueryResult(columnTypes []*sql.ColumnType, rows *core.Rows) (legacydata.DataRowValues, error) {
-return nil, nil
-}
-
 func (t *testQueryResultTransformer) TransformQueryError(err error) error {
 t.transformQueryErrorWasCalled = true
 return err
@@ -6,6 +6,7 @@ import (
 "regexp"
 "strings"
 "time"
+"unicode"
 )

 // StringsFallback2 returns the first of two not empty strings.
@@ -108,3 +109,12 @@ func ToCamelCase(str string) string {

 return strings.Join(finalParts, "")
 }
+
+func Capitalize(s string) string {
+if len(s) == 0 {
+return s
+}
+r := []rune(s)
+r[0] = unicode.ToUpper(r[0])
+return string(r)
+}
@@ -88,8 +88,20 @@ func TestToCamelCase(t *testing.T) {
 "snake_case_string": "snakeCaseString",
 "mixed-case_string": "mixedCaseString",
 "alreadyCamelCase": "alreadyCamelCase",
+"": "",
 }
 for input, expected := range tests {
 assert.Equal(t, expected, ToCamelCase(input))
 }
 }
+
+func TestCapitalize(t *testing.T) {
+tests := map[string]string{
+"properly capitalizes": "Properly capitalizes",
+"Already capitalized": "Already capitalized",
+"": "",
+}
+for input, expected := range tests {
+assert.Equal(t, expected, Capitalize(input))
+}
+}