QueryService: Use types from sdk (#84029)

This commit is contained in:
Ryan McKinley
2024-03-08 08:12:59 -08:00
committed by GitHub
parent f11b10a10c
commit d82f3be6f7
36 changed files with 1555 additions and 879 deletions

View File

@@ -3,8 +3,12 @@ package datasource
import (
"context"
"fmt"
"net/http"
"time"
"github.com/grafana/grafana-plugin-sdk-go/backend"
data "github.com/grafana/grafana-plugin-sdk-go/experimental/apis/data/v0alpha1"
"github.com/grafana/grafana-plugin-sdk-go/experimental/schemabuilder"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
@@ -15,10 +19,9 @@ import (
genericapiserver "k8s.io/apiserver/pkg/server"
openapi "k8s.io/kube-openapi/pkg/common"
"k8s.io/kube-openapi/pkg/spec3"
"k8s.io/kube-openapi/pkg/validation/spec"
"k8s.io/utils/strings/slices"
"github.com/grafana/grafana-plugin-sdk-go/backend"
common "github.com/grafana/grafana/pkg/apimachinery/apis/common/v0alpha1"
datasource "github.com/grafana/grafana/pkg/apis/datasource/v0alpha1"
query "github.com/grafana/grafana/pkg/apis/query/v0alpha1"
@@ -30,6 +33,9 @@ import (
"github.com/grafana/grafana/pkg/services/pluginsintegration/pluginstore"
)
const QueryRequestSchemaKey = "QueryRequestSchema"
const QueryPayloadSchemaKey = "QueryPayloadSchema"
var _ builder.APIGroupBuilder = (*DataSourceAPIBuilder)(nil)
// DataSourceAPIBuilder is used just so wire has something unique to return
@@ -41,6 +47,7 @@ type DataSourceAPIBuilder struct {
datasources PluginDatasourceProvider
contextProvider PluginContextWrapper
accessControl accesscontrol.AccessControl
queryTypes *query.QueryTypeDefinitionList
}
func RegisterAPIService(
@@ -62,6 +69,7 @@ func RegisterAPIService(
all := pluginStore.Plugins(context.Background(), plugins.TypeDataSource)
ids := []string{
"grafana-testdata-datasource",
// "prometheus",
}
for _, ds := range all {
@@ -123,6 +131,7 @@ func addKnownTypes(scheme *runtime.Scheme, gv schema.GroupVersion) {
&datasource.HealthCheckResult{},
&unstructured.Unstructured{},
// Query handler
&query.QueryDataRequest{},
&query.QueryDataResponse{},
&metav1.Status{},
)
@@ -238,15 +247,108 @@ func (b *DataSourceAPIBuilder) PostProcessOpenAPI(oas *spec3.OpenAPI) (*spec3.Op
// Hide the ability to list all connections across tenants
delete(oas.Paths.Paths, root+b.connectionResourceInfo.GroupResource().Resource)
var err error
opts := schemabuilder.QuerySchemaOptions{
PluginID: []string{b.pluginJSON.ID},
QueryTypes: []data.QueryTypeDefinition{},
Mode: schemabuilder.SchemaTypeQueryPayload,
}
if b.pluginJSON.AliasIDs != nil {
opts.PluginID = append(opts.PluginID, b.pluginJSON.AliasIDs...)
}
if b.queryTypes != nil {
for _, qt := range b.queryTypes.Items {
// The SDK type and api type are not the same so we recreate it here
opts.QueryTypes = append(opts.QueryTypes, data.QueryTypeDefinition{
ObjectMeta: data.ObjectMeta{
Name: qt.Name,
},
Spec: qt.Spec,
})
}
}
oas.Components.Schemas[QueryPayloadSchemaKey], err = schemabuilder.GetQuerySchema(opts)
if err != nil {
return oas, err
}
opts.Mode = schemabuilder.SchemaTypeQueryRequest
oas.Components.Schemas[QueryRequestSchemaKey], err = schemabuilder.GetQuerySchema(opts)
if err != nil {
return oas, err
}
// Update the request object
sub := oas.Paths.Paths[root+"namespaces/{namespace}/connections/{name}/query"]
if sub != nil && sub.Post != nil {
sub.Post.Description = "Execute queries"
sub.Post.RequestBody = &spec3.RequestBody{
RequestBodyProps: spec3.RequestBodyProps{
Required: true,
Content: map[string]*spec3.MediaType{
"application/json": {
MediaTypeProps: spec3.MediaTypeProps{
Schema: spec.RefSchema("#/components/schemas/" + QueryRequestSchemaKey),
Examples: getExamples(b.queryTypes),
},
},
},
},
}
okrsp, ok := sub.Post.Responses.StatusCodeResponses[200]
if ok {
sub.Post.Responses.StatusCodeResponses[http.StatusMultiStatus] = &spec3.Response{
ResponseProps: spec3.ResponseProps{
Description: "Query executed, but errors may exist in the datasource. See the payload for more details.",
Content: okrsp.Content,
},
}
}
}
// The root API discovery list
sub := oas.Paths.Paths[root]
sub = oas.Paths.Paths[root]
if sub != nil && sub.Get != nil {
sub.Get.Tags = []string{"API Discovery"} // sorts first in the list
}
return oas, nil
return oas, err
}
// Register additional routes with the server
func (b *DataSourceAPIBuilder) GetAPIRoutes() *builder.APIRoutes {
return nil
}
func getExamples(queryTypes *query.QueryTypeDefinitionList) map[string]*spec3.Example {
if queryTypes == nil {
return nil
}
tr := data.TimeRange{From: "now-1h", To: "now"}
examples := map[string]*spec3.Example{}
for _, queryType := range queryTypes.Items {
for idx, example := range queryType.Spec.Examples {
q := data.NewDataQuery(example.SaveModel.Object)
q.RefID = "A"
for _, dis := range queryType.Spec.Discriminators {
_ = q.Set(dis.Field, dis.Value)
}
if q.MaxDataPoints < 1 {
q.MaxDataPoints = 1000
}
if q.IntervalMS < 1 {
q.IntervalMS = 5000 // 5s
}
examples[fmt.Sprintf("%s-%d", example.Name, idx)] = &spec3.Example{
ExampleProps: spec3.ExampleProps{
Summary: example.Name,
Description: example.Description,
Value: data.QueryDataRequest{
TimeRange: tr,
Queries: []data.DataQuery{q},
},
},
}
}
}
return examples
}

View File

@@ -2,16 +2,15 @@ package datasource
import (
"context"
"encoding/json"
"fmt"
"net/http"
"github.com/grafana/grafana-plugin-sdk-go/backend"
data "github.com/grafana/grafana-plugin-sdk-go/experimental/apis/data/v0alpha1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apiserver/pkg/registry/rest"
query "github.com/grafana/grafana/pkg/apis/query/v0alpha1"
"github.com/grafana/grafana/pkg/middleware/requestmeta"
"github.com/grafana/grafana/pkg/tsdb/legacydata"
"github.com/grafana/grafana/pkg/web"
)
@@ -20,28 +19,33 @@ type subQueryREST struct {
builder *DataSourceAPIBuilder
}
var _ = rest.Connecter(&subQueryREST{})
var (
_ rest.Storage = (*subQueryREST)(nil)
_ rest.Connecter = (*subQueryREST)(nil)
_ rest.StorageMetadata = (*subQueryREST)(nil)
)
func (r *subQueryREST) New() runtime.Object {
// This is added as the "ResponseType" regarless what ProducesObject() says :)
return &query.QueryDataResponse{}
}
func (r *subQueryREST) Destroy() {}
func (r *subQueryREST) ProducesMIMETypes(verb string) []string {
return []string{"application/json"} // and parquet!
}
func (r *subQueryREST) ProducesObject(verb string) interface{} {
return &query.QueryDataResponse{}
}
func (r *subQueryREST) ConnectMethods() []string {
return []string{"POST"}
}
func (r *subQueryREST) NewConnectOptions() (runtime.Object, bool, string) {
return nil, false, ""
}
func (r *subQueryREST) readQueries(req *http.Request) ([]backend.DataQuery, *query.DataSourceRef, error) {
reqDTO := query.GenericQueryRequest{}
if err := web.Bind(req, &reqDTO); err != nil {
return nil, nil, err
}
return legacydata.ToDataSourceQueries(reqDTO)
return nil, false, "" // true means you can use the trailing path as a variable
}
func (r *subQueryREST) Connect(ctx context.Context, name string, opts runtime.Object, responder rest.Responder) (http.Handler, error) {
@@ -49,59 +53,35 @@ func (r *subQueryREST) Connect(ctx context.Context, name string, opts runtime.Ob
if err != nil {
return nil, err
}
ctx = backend.WithGrafanaConfig(ctx, pluginCtx.GrafanaConfig)
return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
queries, dsRef, err := r.readQueries(req)
dqr := data.QueryDataRequest{}
err := web.Bind(req, &dqr)
if err != nil {
responder.Error(err)
return
}
queries, dsRef, err := legacydata.ToDataSourceQueries(dqr)
if err != nil {
responder.Error(err)
return
}
if dsRef != nil && dsRef.UID != name {
responder.Error(fmt.Errorf("expected the datasource in the request url and body to match"))
return
responder.Error(fmt.Errorf("expected query body datasource and request to match"))
}
qdr, err := r.builder.client.QueryData(ctx, &backend.QueryDataRequest{
PluginContext: pluginCtx,
ctx = backend.WithGrafanaConfig(ctx, pluginCtx.GrafanaConfig)
rsp, err := r.builder.client.QueryData(ctx, &backend.QueryDataRequest{
Queries: queries,
PluginContext: pluginCtx,
})
if err != nil {
responder.Error(err)
return
}
statusCode := http.StatusOK
for _, res := range qdr.Responses {
if res.Error != nil {
statusCode = http.StatusMultiStatus
}
}
if statusCode != http.StatusOK {
requestmeta.WithDownstreamStatusSource(ctx)
}
// TODO... someday :) can return protobuf for machine-machine communication
// will avoid some hops the current response workflow (for external plugins)
// 1. Plugin:
// creates: golang structs
// returns: arrow + protobuf |
// 2. Client: | direct when local/non grpc
// reads: protobuf+arrow V
// returns: golang structs
// 3. Datasource Server (eg right here):
// reads: golang structs
// returns: JSON
// 4. Query service (alerting etc):
// reads: JSON? (TODO! raw output from 1???)
// returns: JSON (after more operations)
// 5. Browser
// reads: JSON
w.WriteHeader(statusCode)
w.Header().Set("Content-Type", "application/json")
err = json.NewEncoder(w).Encode(qdr)
if err != nil {
responder.Error(err)
}
responder.Object(query.GetResponseCode(rsp),
&query.QueryDataResponse{QueryDataResponse: *rsp},
)
}), nil
}

View File

@@ -2,8 +2,8 @@ package peakq
import (
"github.com/grafana/grafana-plugin-sdk-go/data"
apidata "github.com/grafana/grafana-plugin-sdk-go/experimental/apis/data/v0alpha1"
query "github.com/grafana/grafana/pkg/apis/query/v0alpha1"
"github.com/grafana/grafana/pkg/apis/query/v0alpha1/template"
)
@@ -38,7 +38,7 @@ var basicTemplateSpec = template.QueryTemplate{
},
},
Properties: query.NewGenericDataQuery(map[string]any{
Properties: apidata.NewDataQuery(map[string]any{
"refId": "A", // TODO: Set when Where?
"datasource": map[string]any{
"type": "prometheus",
@@ -58,7 +58,7 @@ var basicTemplateRenderedTargets = []template.Target{
{
DataType: data.FrameTypeUnknown,
//DataTypeVersion: data.FrameTypeVersion{0, 0},
Properties: query.NewGenericDataQuery(map[string]any{
Properties: apidata.NewDataQuery(map[string]any{
"refId": "A", // TODO: Set when Where?
"datasource": map[string]any{
"type": "prometheus",

View File

@@ -14,8 +14,8 @@ func TestRender(t *testing.T) {
rT, err := template.RenderTemplate(basicTemplateSpec, map[string][]string{"metricName": {"up"}})
require.NoError(t, err)
require.Equal(t,
basicTemplateRenderedTargets[0].Properties.AdditionalProperties()["expr"],
rT[0].Properties.AdditionalProperties()["expr"])
basicTemplateRenderedTargets[0].Properties.GetString("expr"),
rT[0].Properties.GetString("expr"))
b, _ := json.MarshalIndent(basicTemplateSpec, "", " ")
fmt.Println(string(b))
}

View File

@@ -0,0 +1,22 @@
package query
import (
"context"
data "github.com/grafana/grafana-plugin-sdk-go/experimental/apis/data/v0alpha1"
)
// The query runner interface
type DataSourceClientSupplier interface {
// Get a client for a given datasource
// NOTE: authorization headers are not yet added and the client may be shared across multiple users
GetDataSourceClient(ctx context.Context, ref data.DataSourceRef) (data.QueryDataClient, error)
}
type CommonDataSourceClientSupplier struct {
Client data.QueryDataClient
}
func (s *CommonDataSourceClientSupplier) GetDataSourceClient(ctx context.Context, ref data.DataSourceRef) (data.QueryDataClient, error) {
return s.Client, nil
}

View File

@@ -1,17 +1,18 @@
package runner
package client
import (
"context"
"fmt"
"net/http"
"sync"
"time"
"github.com/grafana/grafana-plugin-sdk-go/backend"
data "github.com/grafana/grafana-plugin-sdk-go/experimental/apis/data/v0alpha1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"github.com/grafana/grafana/pkg/apis/query/v0alpha1"
query "github.com/grafana/grafana/pkg/apis/query/v0alpha1"
"github.com/grafana/grafana/pkg/plugins"
"github.com/grafana/grafana/pkg/services/datasources"
"github.com/grafana/grafana/pkg/services/pluginsintegration/plugincontext"
@@ -20,14 +21,14 @@ import (
"github.com/grafana/grafana/pkg/tsdb/legacydata"
)
type directRunner struct {
type pluginClient struct {
pluginClient plugins.Client
pCtxProvider *plugincontext.Provider
}
type directRegistry struct {
type pluginRegistry struct {
pluginsMu sync.Mutex
plugins *v0alpha1.DataSourceApiServerList
plugins *query.DataSourceApiServerList
apis map[string]schema.GroupVersion
groupToPlugin map[string]string
pluginStore pluginstore.Store
@@ -36,68 +37,67 @@ type directRegistry struct {
dataSourcesService datasources.DataSourceService
}
var _ v0alpha1.QueryRunner = (*directRunner)(nil)
var _ v0alpha1.DataSourceApiServerRegistry = (*directRegistry)(nil)
var _ data.QueryDataClient = (*pluginClient)(nil)
var _ query.DataSourceApiServerRegistry = (*pluginRegistry)(nil)
// NewDummyTestRunner creates a runner that only works with testdata
func NewDirectQueryRunner(
pluginClient plugins.Client,
pCtxProvider *plugincontext.Provider) v0alpha1.QueryRunner {
return &directRunner{
pluginClient: pluginClient,
pCtxProvider: pCtxProvider,
func NewQueryClientForPluginClient(p plugins.Client, ctx *plugincontext.Provider) data.QueryDataClient {
return &pluginClient{
pluginClient: p,
pCtxProvider: ctx,
}
}
func NewDirectRegistry(pluginStore pluginstore.Store,
func NewDataSourceRegistryFromStore(pluginStore pluginstore.Store,
dataSourcesService datasources.DataSourceService,
) v0alpha1.DataSourceApiServerRegistry {
return &directRegistry{
) query.DataSourceApiServerRegistry {
return &pluginRegistry{
pluginStore: pluginStore,
dataSourcesService: dataSourcesService,
}
}
// ExecuteQueryData implements QueryHelper.
func (d *directRunner) ExecuteQueryData(ctx context.Context,
// The k8s group for the datasource (pluginId)
datasource schema.GroupVersion,
// The datasource name/uid
name string,
// The raw backend query objects
query []v0alpha1.GenericDataQuery,
) (*backend.QueryDataResponse, error) {
queries, dsRef, err := legacydata.ToDataSourceQueries(v0alpha1.GenericQueryRequest{
Queries: query,
})
func (d *pluginClient) QueryData(ctx context.Context, req data.QueryDataRequest) (int, *backend.QueryDataResponse, error) {
queries, dsRef, err := legacydata.ToDataSourceQueries(req)
if err != nil {
return nil, err
return http.StatusBadRequest, nil, err
}
if dsRef != nil && dsRef.UID != name {
return nil, fmt.Errorf("expected query body datasource and request to match")
if dsRef == nil {
return http.StatusBadRequest, nil, fmt.Errorf("expected single datasource request")
}
// NOTE: this depends on uid unique across datasources
settings, err := d.pCtxProvider.GetDataSourceInstanceSettings(ctx, name)
settings, err := d.pCtxProvider.GetDataSourceInstanceSettings(ctx, dsRef.UID)
if err != nil {
return nil, err
return http.StatusBadRequest, nil, err
}
pCtx, err := d.pCtxProvider.PluginContextForDataSource(ctx, settings)
qdr := &backend.QueryDataRequest{
Queries: queries,
}
qdr.PluginContext, err = d.pCtxProvider.PluginContextForDataSource(ctx, settings)
if err != nil {
return nil, err
return http.StatusBadRequest, nil, err
}
return d.pluginClient.QueryData(ctx, &backend.QueryDataRequest{
PluginContext: pCtx,
Queries: queries,
})
code := http.StatusOK
rsp, err := d.pluginClient.QueryData(ctx, qdr)
if err == nil {
for _, v := range rsp.Responses {
if v.Error != nil {
code = http.StatusMultiStatus
break
}
}
} else {
code = http.StatusInternalServerError
}
return code, rsp, err
}
// GetDatasourceAPI implements DataSourceRegistry.
func (d *directRegistry) GetDatasourceGroupVersion(pluginId string) (schema.GroupVersion, error) {
func (d *pluginRegistry) GetDatasourceGroupVersion(pluginId string) (schema.GroupVersion, error) {
d.pluginsMu.Lock()
defer d.pluginsMu.Unlock()
@@ -117,7 +117,7 @@ func (d *directRegistry) GetDatasourceGroupVersion(pluginId string) (schema.Grou
}
// GetDatasourcePlugins no namespace? everything that is available
func (d *directRegistry) GetDatasourceApiServers(ctx context.Context) (*v0alpha1.DataSourceApiServerList, error) {
func (d *pluginRegistry) GetDatasourceApiServers(ctx context.Context) (*query.DataSourceApiServerList, error) {
d.pluginsMu.Lock()
defer d.pluginsMu.Unlock()
@@ -132,10 +132,10 @@ func (d *directRegistry) GetDatasourceApiServers(ctx context.Context) (*v0alpha1
}
// This should be called when plugins change
func (d *directRegistry) updatePlugins() error {
func (d *pluginRegistry) updatePlugins() error {
groupToPlugin := map[string]string{}
apis := map[string]schema.GroupVersion{}
result := &v0alpha1.DataSourceApiServerList{
result := &query.DataSourceApiServerList{
ListMeta: metav1.ListMeta{
ResourceVersion: fmt.Sprintf("%d", time.Now().UnixMilli()),
},
@@ -159,7 +159,7 @@ func (d *directRegistry) updatePlugins() error {
}
groupToPlugin[group] = dsp.ID
ds := v0alpha1.DataSourceApiServer{
ds := query.DataSourceApiServer{
ObjectMeta: metav1.ObjectMeta{
Name: dsp.ID,
CreationTimestamp: metav1.NewTime(time.UnixMilli(ts)),

View File

@@ -1,59 +1,46 @@
package runner
package client
import (
"context"
"fmt"
"net/http"
"time"
"github.com/grafana/grafana-plugin-sdk-go/backend"
data "github.com/grafana/grafana-plugin-sdk-go/experimental/apis/data/v0alpha1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"github.com/grafana/grafana/pkg/apis/query/v0alpha1"
query "github.com/grafana/grafana/pkg/apis/query/v0alpha1"
testdata "github.com/grafana/grafana/pkg/tsdb/grafana-testdata-datasource"
"github.com/grafana/grafana/pkg/tsdb/legacydata"
)
type testdataDummy struct{}
var _ v0alpha1.QueryRunner = (*testdataDummy)(nil)
var _ v0alpha1.DataSourceApiServerRegistry = (*testdataDummy)(nil)
var _ data.QueryDataClient = (*testdataDummy)(nil)
var _ query.DataSourceApiServerRegistry = (*testdataDummy)(nil)
// NewDummyTestRunner creates a runner that only works with testdata
func NewDummyTestRunner() v0alpha1.QueryRunner {
// NewTestDataClient creates a runner that only works with testdata
func NewTestDataClient() data.QueryDataClient {
return &testdataDummy{}
}
func NewDummyRegistry() v0alpha1.DataSourceApiServerRegistry {
// NewTestDataRegistry returns a registry that only knows about testdata
func NewTestDataRegistry() query.DataSourceApiServerRegistry {
return &testdataDummy{}
}
// ExecuteQueryData implements QueryHelper.
func (d *testdataDummy) ExecuteQueryData(ctx context.Context,
// The k8s group for the datasource (pluginId)
datasource schema.GroupVersion,
// The datasource name/uid
name string,
// The raw backend query objects
query []v0alpha1.GenericDataQuery,
) (*backend.QueryDataResponse, error) {
if datasource.Group != "testdata.datasource.grafana.app" {
return nil, fmt.Errorf("expecting testdata requests")
}
queries, _, err := legacydata.ToDataSourceQueries(v0alpha1.GenericQueryRequest{
Queries: query,
})
func (d *testdataDummy) QueryData(ctx context.Context, req data.QueryDataRequest) (int, *backend.QueryDataResponse, error) {
queries, _, err := legacydata.ToDataSourceQueries(req)
if err != nil {
return nil, err
return http.StatusBadRequest, nil, err
}
return testdata.ProvideService().QueryData(ctx, &backend.QueryDataRequest{
Queries: queries,
})
qdr := &backend.QueryDataRequest{Queries: queries}
rsp, err := testdata.ProvideService().QueryData(ctx, qdr)
return query.GetResponseCode(rsp), rsp, err
}
// GetDatasourceAPI implements DataSourceRegistry.
@@ -68,12 +55,12 @@ func (*testdataDummy) GetDatasourceGroupVersion(pluginId string) (schema.GroupVe
}
// GetDatasourcePlugins implements QueryHelper.
func (d *testdataDummy) GetDatasourceApiServers(ctx context.Context) (*v0alpha1.DataSourceApiServerList, error) {
return &v0alpha1.DataSourceApiServerList{
func (d *testdataDummy) GetDatasourceApiServers(ctx context.Context) (*query.DataSourceApiServerList, error) {
return &query.DataSourceApiServerList{
ListMeta: metav1.ListMeta{
ResourceVersion: fmt.Sprintf("%d", time.Now().UnixMilli()),
},
Items: []v0alpha1.DataSourceApiServer{
Items: []query.DataSourceApiServer{
{
ObjectMeta: metav1.ObjectMeta{
Name: "grafana-testdata-datasource",

View File

@@ -0,0 +1,48 @@
package query
import (
"github.com/prometheus/client_golang/prometheus"
)
const (
metricsSubSystem = "queryservice"
metricsNamespace = "grafana"
)
type metrics struct {
dsRequests *prometheus.CounterVec
// older metric
expressionsQuerySummary *prometheus.SummaryVec
}
func newMetrics(reg prometheus.Registerer) *metrics {
m := &metrics{
dsRequests: prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: metricsNamespace,
Subsystem: metricsSubSystem,
Name: "ds_queries_total",
Help: "Number of datasource queries made from the query service",
}, []string{"error", "dataplane", "datasource_type"}),
expressionsQuerySummary: prometheus.NewSummaryVec(
prometheus.SummaryOpts{
Namespace: metricsNamespace,
Subsystem: metricsSubSystem,
Name: "expressions_queries_duration_milliseconds",
Help: "Expressions query summary",
Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001},
},
[]string{"status"},
),
}
if reg != nil {
reg.MustRegister(
m.dsRequests,
m.expressionsQuerySummary,
)
}
return m
}

View File

@@ -1,83 +1,216 @@
package query
import (
"context"
"encoding/json"
"fmt"
"github.com/grafana/grafana/pkg/apis/query/v0alpha1"
"github.com/grafana/grafana-plugin-sdk-go/data/utils/jsoniter"
data "github.com/grafana/grafana-plugin-sdk-go/experimental/apis/data/v0alpha1"
"gonum.org/v1/gonum/graph/simple"
"gonum.org/v1/gonum/graph/topo"
query "github.com/grafana/grafana/pkg/apis/query/v0alpha1"
"github.com/grafana/grafana/pkg/expr"
"github.com/grafana/grafana/pkg/infra/tracing"
"github.com/grafana/grafana/pkg/services/datasources/service"
)
type parsedQueryRequest struct {
// The queries broken into requests
Requests []groupedQueries
type datasourceRequest struct {
// The type
PluginId string `json:"pluginId"`
// The UID
UID string `json:"uid"`
// Optionally show the additional query properties
Expressions []v0alpha1.GenericDataQuery
Request *data.QueryDataRequest `json:"request"`
// Headers that should be forwarded to the next request
Headers map[string]string `json:"headers,omitempty"`
}
type groupedQueries struct {
// the plugin type
pluginId string
type parsedRequestInfo struct {
// Datasource queries, one for each datasource
Requests []datasourceRequest `json:"requests,omitempty"`
// The datasource name/uid
uid string
// Expressions in required execution order
Expressions []expr.ExpressionQuery `json:"expressions,omitempty"`
// The raw backend query objects
query []v0alpha1.GenericDataQuery
// Expressions include explicit hacks for influx+prometheus
RefIDTypes map[string]string `json:"types,omitempty"`
// Hidden queries used as dependencies
HideBeforeReturn []string `json:"hide,omitempty"`
}
// Internally define what makes this request unique (eventually may include the apiVersion)
func (d *groupedQueries) key() string {
return fmt.Sprintf("%s/%s", d.pluginId, d.uid)
type queryParser struct {
legacy service.LegacyDataSourceLookup
reader *expr.ExpressionQueryReader
tracer tracing.Tracer
}
func parseQueryRequest(raw v0alpha1.GenericQueryRequest) (parsedQueryRequest, error) {
mixed := make(map[string]*groupedQueries)
parsed := parsedQueryRequest{}
refIds := make(map[string]bool)
func newQueryParser(reader *expr.ExpressionQueryReader, legacy service.LegacyDataSourceLookup, tracer tracing.Tracer) *queryParser {
return &queryParser{
reader: reader,
legacy: legacy,
tracer: tracer,
}
}
for _, original := range raw.Queries {
if refIds[original.RefID] {
return parsed, fmt.Errorf("invalid query, duplicate refId: " + original.RefID)
// Split the main query into multiple
func (p *queryParser) parseRequest(ctx context.Context, input *query.QueryDataRequest) (parsedRequestInfo, error) {
ctx, span := p.tracer.Start(ctx, "QueryService.parseRequest")
defer span.End()
queryRefIDs := make(map[string]*data.DataQuery, len(input.Queries))
expressions := make(map[string]*expr.ExpressionQuery)
index := make(map[string]int) // index lookup
rsp := parsedRequestInfo{
RefIDTypes: make(map[string]string, len(input.Queries)),
}
// Ensure a valid time range
if input.From == "" {
input.From = "now-6h"
}
if input.To == "" {
input.To = "now"
}
for _, q := range input.Queries {
_, found := queryRefIDs[q.RefID]
if found {
return rsp, fmt.Errorf("multiple queries found for refId: %s", q.RefID)
}
_, found = expressions[q.RefID]
if found {
return rsp, fmt.Errorf("multiple queries found for refId: %s", q.RefID)
}
refIds[original.RefID] = true
q := original
ds, err := p.getValidDataSourceRef(ctx, q.Datasource, q.DatasourceID)
if err != nil {
return rsp, err
}
if q.TimeRange == nil && raw.From != "" {
q.TimeRange = &v0alpha1.TimeRange{
From: raw.From,
To: raw.To,
// Process each query
if expr.IsDataSource(ds.UID) {
// In order to process the query as a typed expression query, we
// are writing it back to JSON and parsing again. Alternatively we
// could construct it from the untyped map[string]any additional properties
// but this approach lets us focus on well typed behavior first
raw, err := json.Marshal(q)
if err != nil {
return rsp, err
}
iter, err := jsoniter.ParseBytes(jsoniter.ConfigDefault, raw)
if err != nil {
return rsp, err
}
exp, err := p.reader.ReadQuery(q, iter)
if err != nil {
return rsp, err
}
exp.GraphID = int64(len(expressions) + 1)
expressions[q.RefID] = &exp
} else {
key := fmt.Sprintf("%s/%s", ds.Type, ds.UID)
idx, ok := index[key]
if !ok {
idx = len(index)
index[key] = idx
rsp.Requests = append(rsp.Requests, datasourceRequest{
PluginId: ds.Type,
UID: ds.UID,
Request: &data.QueryDataRequest{
TimeRange: input.TimeRange,
Debug: input.Debug,
// no queries
},
})
}
req := rsp.Requests[idx].Request
req.Queries = append(req.Queries, q)
queryRefIDs[q.RefID] = &req.Queries[len(req.Queries)-1]
}
// Mark all the queries that should be hidden ()
if q.Hide {
rsp.HideBeforeReturn = append(rsp.HideBeforeReturn, q.RefID)
}
}
// Make sure all referenced variables exist and the expression order is stable
if len(expressions) > 0 {
queryNode := &expr.ExpressionQuery{
GraphID: -1,
}
// Build the graph for a request
dg := simple.NewDirectedGraph()
dg.AddNode(queryNode)
for _, exp := range expressions {
dg.AddNode(exp)
}
for _, exp := range expressions {
vars := exp.Command.NeedsVars()
for _, refId := range vars {
target := queryNode
q, ok := queryRefIDs[refId]
if !ok {
target, ok = expressions[refId]
if !ok {
return rsp, fmt.Errorf("expression [%s] is missing variable [%s]", exp.RefID, refId)
}
}
// Do not hide queries used in variables
if q != nil && q.Hide {
q.Hide = false
}
if target.ID() == exp.ID() {
return rsp, fmt.Errorf("expression [%s] can not depend on itself", exp.RefID)
}
dg.SetEdge(dg.NewEdge(target, exp))
}
}
// Extract out the expressions queries earlier
if expr.IsDataSource(q.Datasource.Type) || expr.IsDataSource(q.Datasource.UID) {
parsed.Expressions = append(parsed.Expressions, q)
continue
}
g := &groupedQueries{pluginId: q.Datasource.Type, uid: q.Datasource.UID}
group, ok := mixed[g.key()]
if !ok || group == nil {
group = g
mixed[g.key()] = g
}
group.query = append(group.query, q)
}
for _, q := range parsed.Expressions {
// TODO: parse and build tree, for now just fail fast on unknown commands
_, err := expr.GetExpressionCommandType(q.AdditionalProperties())
// Add the sorted expressions
sortedNodes, err := topo.SortStabilized(dg, nil)
if err != nil {
return parsed, err
return rsp, fmt.Errorf("cyclic references in query")
}
for _, v := range sortedNodes {
if v.ID() > 0 {
rsp.Expressions = append(rsp.Expressions, *v.(*expr.ExpressionQuery))
}
}
}
// Add each request
for _, v := range mixed {
parsed.Requests = append(parsed.Requests, *v)
}
return parsed, nil
return rsp, nil
}
func (p *queryParser) getValidDataSourceRef(ctx context.Context, ds *data.DataSourceRef, id int64) (*data.DataSourceRef, error) {
if ds == nil {
if id == 0 {
return nil, fmt.Errorf("missing datasource reference or id")
}
if p.legacy == nil {
return nil, fmt.Errorf("legacy datasource lookup unsupported (id:%d)", id)
}
return p.legacy.GetDataSourceFromDeprecatedFields(ctx, "", id)
}
if ds.Type == "" {
if ds.UID == "" {
return nil, fmt.Errorf("missing name/uid in data source reference")
}
if ds.UID == expr.DatasourceType {
return ds, nil
}
if p.legacy == nil {
return nil, fmt.Errorf("legacy datasource lookup unsupported (name:%s)", ds.UID)
}
return p.legacy.GetDataSourceFromDeprecatedFields(ctx, ds.UID, 0)
}
return ds, nil
}

View File

@@ -0,0 +1,131 @@
package query
import (
"context"
"encoding/json"
"fmt"
"os"
"path"
"strings"
"testing"
data "github.com/grafana/grafana-plugin-sdk-go/experimental/apis/data/v0alpha1"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
query "github.com/grafana/grafana/pkg/apis/query/v0alpha1"
"github.com/grafana/grafana/pkg/expr"
"github.com/grafana/grafana/pkg/infra/tracing"
"github.com/grafana/grafana/pkg/services/featuremgmt"
)
type parserTestObject struct {
Description string `json:"description,omitempty"`
Request query.QueryDataRequest `json:"input"`
Expect parsedRequestInfo `json:"expect"`
Error string `json:"error,omitempty"`
}
func TestQuerySplitting(t *testing.T) {
ctx := context.Background()
parser := newQueryParser(expr.NewExpressionQueryReader(featuremgmt.WithFeatures()),
&legacyDataSourceRetriever{}, tracing.InitializeTracerForTest())
t.Run("missing datasource flavors", func(t *testing.T) {
split, err := parser.parseRequest(ctx, &query.QueryDataRequest{
QueryDataRequest: data.QueryDataRequest{
Queries: []data.DataQuery{{
CommonQueryProperties: data.CommonQueryProperties{
RefID: "A",
},
}},
},
})
require.Error(t, err) // Missing datasource
require.Empty(t, split.Requests)
})
t.Run("applies default time range", func(t *testing.T) {
split, err := parser.parseRequest(ctx, &query.QueryDataRequest{
QueryDataRequest: data.QueryDataRequest{
TimeRange: data.TimeRange{}, // missing
Queries: []data.DataQuery{{
CommonQueryProperties: data.CommonQueryProperties{
RefID: "A",
Datasource: &data.DataSourceRef{
Type: "x",
UID: "abc",
},
},
}},
},
})
require.NoError(t, err)
require.Len(t, split.Requests, 1)
require.Equal(t, "now-6h", split.Requests[0].Request.From)
require.Equal(t, "now", split.Requests[0].Request.To)
})
t.Run("verify tests", func(t *testing.T) {
files, err := os.ReadDir("testdata")
require.NoError(t, err)
for _, file := range files {
if !strings.HasSuffix(file.Name(), ".json") {
continue
}
fpath := path.Join("testdata", file.Name())
// nolint:gosec
body, err := os.ReadFile(fpath)
require.NoError(t, err)
harness := &parserTestObject{}
err = json.Unmarshal(body, harness)
require.NoError(t, err)
changed := false
parsed, err := parser.parseRequest(ctx, &harness.Request)
if err != nil {
if !assert.Equal(t, harness.Error, err.Error(), "File %s", file) {
changed = true
}
} else {
x, _ := json.Marshal(parsed)
y, _ := json.Marshal(harness.Expect)
if !assert.JSONEq(t, string(y), string(x), "File %s", file) {
changed = true
}
}
if changed {
harness.Error = ""
harness.Expect = parsed
if err != nil {
harness.Error = err.Error()
}
jj, err := json.MarshalIndent(harness, "", " ")
require.NoError(t, err)
err = os.WriteFile(fpath, jj, 0600)
require.NoError(t, err)
}
}
})
}
type legacyDataSourceRetriever struct{}
func (s *legacyDataSourceRetriever) GetDataSourceFromDeprecatedFields(ctx context.Context, name string, id int64) (*data.DataSourceRef, error) {
if id == 100 {
return &data.DataSourceRef{
Type: "plugin-aaaa",
UID: "AAA",
}, nil
}
if name != "" {
return &data.DataSourceRef{
Type: "plugin-bbb",
UID: name,
}, nil
}
return nil, fmt.Errorf("missing parameter")
}

View File

@@ -3,62 +3,71 @@ package query
import (
"context"
"encoding/json"
"errors"
"fmt"
"net/http"
"time"
"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana-plugin-sdk-go/experimental/apis/data/v0alpha1"
"go.opentelemetry.io/otel/attribute"
"golang.org/x/sync/errgroup"
"github.com/grafana/grafana/pkg/apis/query/v0alpha1"
query "github.com/grafana/grafana/pkg/apis/query/v0alpha1"
"github.com/grafana/grafana/pkg/expr/mathexp"
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/middleware/requestmeta"
"github.com/grafana/grafana/pkg/services/datasources"
"github.com/grafana/grafana/pkg/util/errutil"
"github.com/grafana/grafana/pkg/util/errutil/errhttp"
"github.com/grafana/grafana/pkg/web"
)
func (b *QueryAPIBuilder) handleQuery(w http.ResponseWriter, r *http.Request) {
reqDTO := v0alpha1.GenericQueryRequest{}
if err := web.Bind(r, &reqDTO); err != nil {
errhttp.Write(r.Context(), err, w)
return
}
// The query method (not really a create)
func (b *QueryAPIBuilder) doQuery(w http.ResponseWriter, r *http.Request) {
ctx, span := b.tracer.Start(r.Context(), "QueryService.Query")
defer span.End()
parsed, err := parseQueryRequest(reqDTO)
raw := &query.QueryDataRequest{}
err := web.Bind(r, raw)
if err != nil {
errhttp.Write(r.Context(), err, w)
errhttp.Write(ctx, errutil.BadRequest(
"query.bind",
errutil.WithPublicMessage("Error reading query")).
Errorf("error reading: %w", err), w)
return
}
ctx := r.Context()
qdr, err := b.processRequest(ctx, parsed)
// Parses the request and splits it into multiple sub queries (if necessary)
req, err := b.parser.parseRequest(ctx, raw)
if err != nil {
errhttp.Write(r.Context(), err, w)
return
}
statusCode := http.StatusOK
for _, res := range qdr.Responses {
if res.Error != nil {
statusCode = http.StatusBadRequest
if b.returnMultiStatus {
statusCode = http.StatusMultiStatus
}
if errors.Is(err, datasources.ErrDataSourceNotFound) {
errhttp.Write(ctx, errutil.BadRequest(
"query.datasource.notfound",
errutil.WithPublicMessage(err.Error())), w)
return
}
}
if statusCode != http.StatusOK {
requestmeta.WithDownstreamStatusSource(ctx)
errhttp.Write(ctx, errutil.BadRequest(
"query.parse",
errutil.WithPublicMessage("Error parsing query")).
Errorf("error parsing: %w", err), w)
return
}
w.Header().Set("Content-Type", "application/json")
err = json.NewEncoder(w).Encode(qdr)
// Actually run the query
rsp, err := b.execute(ctx, req)
if err != nil {
errhttp.Write(r.Context(), err, w)
errhttp.Write(ctx, errutil.Internal(
"query.execution",
errutil.WithPublicMessage("Error executing query")).
Errorf("execution error: %w", err), w)
return
}
w.WriteHeader(query.GetResponseCode(rsp))
_ = json.NewEncoder(w).Encode(rsp)
}
// See:
// https://github.com/grafana/grafana/blob/v10.2.3/pkg/services/query/query.go#L88
func (b *QueryAPIBuilder) processRequest(ctx context.Context, req parsedQueryRequest) (qdr *backend.QueryDataResponse, err error) {
func (b *QueryAPIBuilder) execute(ctx context.Context, req parsedRequestInfo) (qdr *backend.QueryDataResponse, err error) {
switch len(req.Requests) {
case 0:
break // nothing to do
@@ -69,25 +78,73 @@ func (b *QueryAPIBuilder) processRequest(ctx context.Context, req parsedQueryReq
}
if len(req.Expressions) > 0 {
return b.handleExpressions(ctx, qdr, req.Expressions)
qdr, err = b.handleExpressions(ctx, req, qdr)
}
return qdr, err
// Remove hidden results
for _, refId := range req.HideBeforeReturn {
r, ok := qdr.Responses[refId]
if ok && r.Error == nil {
delete(qdr.Responses, refId)
}
}
return
}
// Process a single request
// See: https://github.com/grafana/grafana/blob/v10.2.3/pkg/services/query/query.go#L242
func (b *QueryAPIBuilder) handleQuerySingleDatasource(ctx context.Context, req groupedQueries) (*backend.QueryDataResponse, error) {
gv, err := b.registry.GetDatasourceGroupVersion(req.pluginId)
func (b *QueryAPIBuilder) handleQuerySingleDatasource(ctx context.Context, req datasourceRequest) (*backend.QueryDataResponse, error) {
ctx, span := b.tracer.Start(ctx, "Query.handleQuerySingleDatasource")
defer span.End()
span.SetAttributes(
attribute.String("datasource.type", req.PluginId),
attribute.String("datasource.uid", req.UID),
)
allHidden := true
for idx := range req.Request.Queries {
if !req.Request.Queries[idx].Hide {
allHidden = false
break
}
}
if allHidden {
return &backend.QueryDataResponse{}, nil
}
// headers?
client, err := b.client.GetDataSourceClient(ctx, v0alpha1.DataSourceRef{
Type: req.PluginId,
UID: req.UID,
})
if err != nil {
return nil, err
}
return b.runner.ExecuteQueryData(ctx, gv, req.uid, req.query)
// headers?
_, rsp, err := client.QueryData(ctx, *req.Request)
if err == nil {
for _, q := range req.Request.Queries {
if q.ResultAssertions != nil {
result, ok := rsp.Responses[q.RefID]
if ok && result.Error == nil {
err = q.ResultAssertions.Validate(result.Frames)
if err != nil {
result.Error = err
result.ErrorSource = backend.ErrorSourceDownstream
rsp.Responses[q.RefID] = result
}
}
}
}
}
return rsp, err
}
// buildErrorResponses applies the provided error to each query response in the list. These queries should all belong to the same datasource.
func buildErrorResponse(err error, req groupedQueries) *backend.QueryDataResponse {
func buildErrorResponse(err error, req datasourceRequest) *backend.QueryDataResponse {
rsp := backend.NewQueryDataResponse()
for _, query := range req.query {
for _, query := range req.Request.Queries {
rsp.Responses[query.RefID] = backend.DataResponse{
Error: err,
}
@@ -96,13 +153,16 @@ func buildErrorResponse(err error, req groupedQueries) *backend.QueryDataRespons
}
// executeConcurrentQueries executes queries to multiple datasources concurrently and returns the aggregate result.
func (b *QueryAPIBuilder) executeConcurrentQueries(ctx context.Context, requests []groupedQueries) (*backend.QueryDataResponse, error) {
func (b *QueryAPIBuilder) executeConcurrentQueries(ctx context.Context, requests []datasourceRequest) (*backend.QueryDataResponse, error) {
ctx, span := b.tracer.Start(ctx, "Query.executeConcurrentQueries")
defer span.End()
g, ctx := errgroup.WithContext(ctx)
g.SetLimit(b.concurrentQueryLimit) // prevent too many concurrent requests
rchan := make(chan *backend.QueryDataResponse, len(requests))
// Create panic recovery function for loop below
recoveryFn := func(req groupedQueries) {
recoveryFn := func(req datasourceRequest) {
if r := recover(); r != nil {
var err error
b.log.Error("query datasource panic", "error", r, "stack", log.Stack(1))
@@ -150,8 +210,63 @@ func (b *QueryAPIBuilder) executeConcurrentQueries(ctx context.Context, requests
return resp, nil
}
// NOTE the upstream queries have already been executed
// https://github.com/grafana/grafana/blob/v10.2.3/pkg/services/query/query.go#L242
func (b *QueryAPIBuilder) handleExpressions(ctx context.Context, qdr *backend.QueryDataResponse, expressions []v0alpha1.GenericDataQuery) (*backend.QueryDataResponse, error) {
return qdr, fmt.Errorf("expressions are not implemented yet")
// Unlike the implementation in expr/node.go, all datasource queries have been processed first
func (b *QueryAPIBuilder) handleExpressions(ctx context.Context, req parsedRequestInfo, data *backend.QueryDataResponse) (qdr *backend.QueryDataResponse, err error) {
start := time.Now()
ctx, span := b.tracer.Start(ctx, "SSE.handleExpressions")
defer func() {
var respStatus string
switch {
case err == nil:
respStatus = "success"
default:
respStatus = "failure"
}
duration := float64(time.Since(start).Nanoseconds()) / float64(time.Millisecond)
b.metrics.expressionsQuerySummary.WithLabelValues(respStatus).Observe(duration)
span.End()
}()
qdr = data
if qdr == nil {
qdr = &backend.QueryDataResponse{}
}
now := start // <<< this should come from the original query parser
vars := make(mathexp.Vars)
for _, expression := range req.Expressions {
// Setup the variables
for _, refId := range expression.Command.NeedsVars() {
_, ok := vars[refId]
if !ok {
dr, ok := qdr.Responses[refId]
if ok {
allowLongFrames := false // TODO -- depends on input type and only if SQL?
_, res, err := b.converter.Convert(ctx, req.RefIDTypes[refId], dr.Frames, allowLongFrames)
if err != nil {
res.Error = err
}
vars[refId] = res
} else {
// This should error in the parsing phase
err := fmt.Errorf("missing variable %s for %s", refId, expression.RefID)
qdr.Responses[refId] = backend.DataResponse{
Error: err,
}
return qdr, err
}
}
}
refId := expression.RefID
results, err := expression.Command.Execute(ctx, now, vars, b.tracer)
if err != nil {
results.Error = err
}
qdr.Responses[refId] = backend.DataResponse{
Error: results.Error,
Frames: results.Values.AsDataFrames(refId),
}
}
return qdr, nil
}

View File

@@ -1,9 +1,9 @@
package query
import (
"encoding/json"
"net/http"
data "github.com/grafana/grafana-plugin-sdk-go/experimental/apis/data/v0alpha1"
"github.com/grafana/grafana-plugin-sdk-go/experimental/schemabuilder"
"github.com/prometheus/client_golang/prometheus"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
@@ -16,13 +16,17 @@ import (
"k8s.io/kube-openapi/pkg/spec3"
"k8s.io/kube-openapi/pkg/validation/spec"
example "github.com/grafana/grafana/pkg/apis/example/v0alpha1"
"github.com/grafana/grafana/pkg/apis/query/v0alpha1"
"github.com/grafana/grafana/pkg/apiserver/builder"
"github.com/grafana/grafana/pkg/expr"
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/infra/tracing"
"github.com/grafana/grafana/pkg/plugins"
"github.com/grafana/grafana/pkg/registry/apis/query/runner"
"github.com/grafana/grafana/pkg/registry/apis/query/client"
"github.com/grafana/grafana/pkg/services/accesscontrol"
"github.com/grafana/grafana/pkg/services/datasources"
"github.com/grafana/grafana/pkg/services/datasources/service"
"github.com/grafana/grafana/pkg/services/featuremgmt"
"github.com/grafana/grafana/pkg/services/pluginsintegration/plugincontext"
"github.com/grafana/grafana/pkg/services/pluginsintegration/pluginstore"
@@ -35,22 +39,39 @@ type QueryAPIBuilder struct {
concurrentQueryLimit int
userFacingDefaultError string
returnMultiStatus bool // from feature toggle
features featuremgmt.FeatureToggles
runner v0alpha1.QueryRunner
registry v0alpha1.DataSourceApiServerRegistry
tracer tracing.Tracer
metrics *metrics
parser *queryParser
client DataSourceClientSupplier
registry v0alpha1.DataSourceApiServerRegistry
converter *expr.ResultConverter
}
func NewQueryAPIBuilder(features featuremgmt.FeatureToggles,
runner v0alpha1.QueryRunner,
client DataSourceClientSupplier,
registry v0alpha1.DataSourceApiServerRegistry,
) *QueryAPIBuilder {
legacy service.LegacyDataSourceLookup,
registerer prometheus.Registerer,
tracer tracing.Tracer,
) (*QueryAPIBuilder, error) {
reader := expr.NewExpressionQueryReader(features)
return &QueryAPIBuilder{
concurrentQueryLimit: 4, // from config?
concurrentQueryLimit: 4,
log: log.New("query_apiserver"),
returnMultiStatus: features.IsEnabledGlobally(featuremgmt.FlagDatasourceQueryMultiStatus),
runner: runner,
client: client,
registry: registry,
}
parser: newQueryParser(reader, legacy, tracer),
metrics: newMetrics(registerer),
tracer: tracer,
features: features,
converter: &expr.ResultConverter{
Features: features,
Tracer: tracer,
},
}, nil
}
func RegisterAPIService(features featuremgmt.FeatureToggles,
@@ -60,28 +81,24 @@ func RegisterAPIService(features featuremgmt.FeatureToggles,
accessControl accesscontrol.AccessControl,
pluginClient plugins.Client,
pCtxProvider *plugincontext.Provider,
) *QueryAPIBuilder {
registerer prometheus.Registerer,
tracer tracing.Tracer,
legacy service.LegacyDataSourceLookup,
) (*QueryAPIBuilder, error) {
if !features.IsEnabledGlobally(featuremgmt.FlagGrafanaAPIServerWithExperimentalAPIs) {
return nil // skip registration unless opting into experimental apis
return nil, nil // skip registration unless opting into experimental apis
}
builder := NewQueryAPIBuilder(
builder, err := NewQueryAPIBuilder(
features,
runner.NewDirectQueryRunner(pluginClient, pCtxProvider),
runner.NewDirectRegistry(pluginStore, dataSourcesService),
&CommonDataSourceClientSupplier{
Client: client.NewQueryClientForPluginClient(pluginClient, pCtxProvider),
},
client.NewDataSourceRegistryFromStore(pluginStore, dataSourcesService),
legacy, registerer, tracer,
)
// ONLY testdata...
if false {
builder = NewQueryAPIBuilder(
features,
runner.NewDummyTestRunner(),
runner.NewDummyRegistry(),
)
}
apiregistration.RegisterAPI(builder)
return builder
return builder, err
}
func (b *QueryAPIBuilder) GetGroupVersion() schema.GroupVersion {
@@ -92,7 +109,11 @@ func addKnownTypes(scheme *runtime.Scheme, gv schema.GroupVersion) {
scheme.AddKnownTypes(gv,
&v0alpha1.DataSourceApiServer{},
&v0alpha1.DataSourceApiServerList{},
&v0alpha1.QueryDataRequest{},
&v0alpha1.QueryDataResponse{},
&v0alpha1.QueryTypeDefinition{},
&v0alpha1.QueryTypeDefinitionList{},
&example.DummySubresource{},
)
}
@@ -126,50 +147,7 @@ func (b *QueryAPIBuilder) GetOpenAPIDefinitions() common.GetOpenAPIDefinitions {
// Register additional routes with the server
func (b *QueryAPIBuilder) GetAPIRoutes() *builder.APIRoutes {
defs := v0alpha1.GetOpenAPIDefinitions(func(path string) spec.Ref { return spec.Ref{} })
querySchema := defs["github.com/grafana/grafana/pkg/apis/query/v0alpha1.QueryRequest"].Schema
responseSchema := defs["github.com/grafana/grafana/pkg/apis/query/v0alpha1.QueryDataResponse"].Schema
var randomWalkQuery any
var randomWalkTable any
_ = json.Unmarshal([]byte(`{
"queries": [
{
"refId": "A",
"scenarioId": "random_walk",
"seriesCount": 1,
"datasource": {
"type": "grafana-testdata-datasource",
"uid": "PD8C576611E62080A"
},
"intervalMs": 60000,
"maxDataPoints": 20
}
],
"from": "1704893381544",
"to": "1704914981544"
}`), &randomWalkQuery)
_ = json.Unmarshal([]byte(`{
"queries": [
{
"refId": "A",
"scenarioId": "random_walk_table",
"seriesCount": 1,
"datasource": {
"type": "grafana-testdata-datasource",
"uid": "PD8C576611E62080A"
},
"intervalMs": 60000,
"maxDataPoints": 20
}
],
"from": "1704893381544",
"to": "1704914981544"
}`), &randomWalkTable)
return &builder.APIRoutes{
Root: []builder.APIRouteHandler{},
routes := &builder.APIRoutes{
Namespace: []builder.APIRouteHandler{
{
Path: "query",
@@ -177,38 +155,81 @@ func (b *QueryAPIBuilder) GetAPIRoutes() *builder.APIRoutes {
Post: &spec3.Operation{
OperationProps: spec3.OperationProps{
Tags: []string{"query"},
Description: "query across multiple datasources with expressions. This api matches the legacy /ds/query endpoint",
Summary: "Query",
Description: "longer description here?",
Parameters: []*spec3.Parameter{
{
ParameterProps: spec3.ParameterProps{
Name: "namespace",
Description: "object name and auth scope, such as for teams and projects",
In: "path",
Required: true,
Schema: spec.StringProperty(),
Example: "default",
Description: "workspace",
Schema: spec.StringProperty(),
},
},
},
RequestBody: &spec3.RequestBody{
RequestBodyProps: spec3.RequestBodyProps{
Required: true,
Description: "the query array",
Content: map[string]*spec3.MediaType{
"application/json": {
MediaTypeProps: spec3.MediaTypeProps{
Schema: querySchema.WithExample(randomWalkQuery),
Schema: spec.RefSchema("#/components/schemas/" + QueryRequestSchemaKey),
Examples: map[string]*spec3.Example{
"random_walk": {
"A": {
ExampleProps: spec3.ExampleProps{
Summary: "random walk",
Value: randomWalkQuery,
Summary: "Random walk (testdata)",
Description: "Use testdata to execute a random walk query",
Value: `{
"queries": [
{
"refId": "A",
"scenarioId": "random_walk_table",
"seriesCount": 1,
"datasource": {
"type": "grafana-testdata-datasource",
"uid": "PD8C576611E62080A"
},
"intervalMs": 60000,
"maxDataPoints": 20
}
],
"from": "now-6h",
"to": "now"
}`,
},
},
"random_walk_table": {
"B": {
ExampleProps: spec3.ExampleProps{
Summary: "random walk (table)",
Value: randomWalkTable,
Summary: "With deprecated datasource name",
Description: "Includes an old style string for datasource reference",
Value: `{
"queries": [
{
"refId": "A",
"datasource": {
"type": "grafana-googlesheets-datasource",
"uid": "b1808c48-9fc9-4045-82d7-081781f8a553"
},
"cacheDurationSeconds": 300,
"spreadsheet": "spreadsheetID",
"datasourceId": 4,
"intervalMs": 30000,
"maxDataPoints": 794
},
{
"refId": "Z",
"datasource": "old",
"maxDataPoints": 10,
"timeRange": {
"from": "100",
"to": "200"
}
}
],
"from": "now-6h",
"to": "now"
}`,
},
},
},
@@ -220,25 +241,12 @@ func (b *QueryAPIBuilder) GetAPIRoutes() *builder.APIRoutes {
Responses: &spec3.Responses{
ResponsesProps: spec3.ResponsesProps{
StatusCodeResponses: map[int]*spec3.Response{
http.StatusOK: {
200: {
ResponseProps: spec3.ResponseProps{
Description: "Query results",
Content: map[string]*spec3.MediaType{
"application/json": {
MediaTypeProps: spec3.MediaTypeProps{
Schema: &responseSchema,
},
},
},
},
},
http.StatusMultiStatus: {
ResponseProps: spec3.ResponseProps{
Description: "Errors exist in the downstream results",
Content: map[string]*spec3.MediaType{
"application/json": {
MediaTypeProps: spec3.MediaTypeProps{
Schema: &responseSchema,
Schema: spec.StringProperty(), // TODO!!!
},
},
},
@@ -250,12 +258,47 @@ func (b *QueryAPIBuilder) GetAPIRoutes() *builder.APIRoutes {
},
},
},
Handler: b.handleQuery,
Handler: b.doQuery,
},
},
}
return routes
}
func (b *QueryAPIBuilder) GetAuthorizer() authorizer.Authorizer {
return nil // default is OK
}
const QueryRequestSchemaKey = "QueryRequestSchema"
const QueryPayloadSchemaKey = "QueryPayloadSchema"
func (b *QueryAPIBuilder) PostProcessOpenAPI(oas *spec3.OpenAPI) (*spec3.OpenAPI, error) {
// The plugin description
oas.Info.Description = "Query service"
// The root api URL
root := "/apis/" + b.GetGroupVersion().String() + "/"
var err error
opts := schemabuilder.QuerySchemaOptions{
PluginID: []string{""},
QueryTypes: []data.QueryTypeDefinition{},
Mode: schemabuilder.SchemaTypeQueryPayload,
}
oas.Components.Schemas[QueryPayloadSchemaKey], err = schemabuilder.GetQuerySchema(opts)
if err != nil {
return oas, err
}
opts.Mode = schemabuilder.SchemaTypeQueryRequest
oas.Components.Schemas[QueryRequestSchemaKey], err = schemabuilder.GetQuerySchema(opts)
if err != nil {
return oas, err
}
// The root API discovery list
sub := oas.Paths.Paths[root]
if sub != nil && sub.Get != nil {
sub.Get.Tags = []string{"API Discovery"} // sorts first in the list
}
return oas, nil
}

View File

@@ -0,0 +1,29 @@
{
"description": "self dependencies",
"input": {
"from": "now-6",
"to": "now",
"queries": [
{
"refId": "A",
"datasource": {
"type": "",
"uid": "__expr__"
},
"expression": "$B",
"type": "math"
},
{
"refId": "B",
"datasource": {
"type": "",
"uid": "__expr__"
},
"type": "math",
"expression": "$A"
}
]
},
"expect": {},
"error": "cyclic references in query"
}

View File

@@ -0,0 +1,60 @@
{
"input": {
"from": "now-6",
"to": "now",
"queries": [
{
"refId": "A",
"datasource": {
"type": "plugin-x",
"uid": "123"
}
},
{
"refId": "B",
"datasource": {
"type": "plugin-x",
"uid": "456"
}
}
]
},
"expect": {
"requests": [
{
"pluginId": "plugin-x",
"uid": "123",
"request": {
"from": "now-6",
"to": "now",
"queries": [
{
"refId": "A",
"datasource": {
"type": "plugin-x",
"uid": "123"
}
}
]
}
},
{
"pluginId": "plugin-x",
"uid": "456",
"request": {
"from": "now-6",
"to": "now",
"queries": [
{
"refId": "B",
"datasource": {
"type": "plugin-x",
"uid": "456"
}
}
]
}
}
]
}
}

View File

@@ -0,0 +1,20 @@
{
"description": "self dependencies",
"input": {
"from": "now-6",
"to": "now",
"queries": [
{
"refId": "A",
"datasource": {
"type": "",
"uid": "__expr__"
},
"type": "math",
"expression": "$A"
}
]
},
"expect": {},
"error": "expression [A] can not depend on itself"
}

View File

@@ -0,0 +1,79 @@
{
"description": "one hidden query with two expressions that start out-of-order",
"input": {
"from": "now-6",
"to": "now",
"queries": [
{
"refId": "C",
"datasource": {
"type": "",
"uid": "__expr__"
},
"type": "reduce",
"expression": "$B",
"reducer": "last"
},
{
"refId": "A",
"datasource": {
"type": "sql",
"uid": "123"
},
"hide": true
},
{
"refId": "B",
"datasource": {
"type": "",
"uid": "-100"
},
"type": "math",
"expression": "$A + 10"
}
]
},
"expect": {
"requests": [
{
"pluginId": "sql",
"uid": "123",
"request": {
"from": "now-6",
"to": "now",
"queries": [
{
"refId": "A",
"datasource": {
"type": "sql",
"uid": "123"
}
}
]
}
}
],
"expressions": [
{
"id": 2,
"refId": "B",
"type": "math",
"properties": {
"expression": "$A + 10"
}
},
{
"id": 1,
"refId": "C",
"type": "reduce",
"properties": {
"expression": "$B",
"reducer": "last"
}
}
],
"hide": [
"A"
]
}
}