diff --git a/.circleci/config.yml b/.circleci/config.yml index d61a223e149..1ebc6586599 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -828,9 +828,11 @@ jobs: -E varcheck -E goconst -E errcheck -E staticcheck ./pkg/... ./scripts/go/bin/revive -formatter stylish -config ./scripts/go/configs/revive.toml ./pkg/... ./scripts/go/bin/revive -formatter stylish -config ./scripts/go/configs/revive-strict.toml \ + -exclude ./pkg/plugins/backendplugin/pluginextensionv2/... \ ./pkg/services/alerting/... \ ./pkg/services/provisioning/datasources/... \ - ./pkg/services/provisioning/dashboards/... + ./pkg/services/provisioning/dashboards/... \ + ./pkg/plugins/backendplugin/... ./scripts/go/bin/gosec -quiet -exclude=G104,G107,G108,G201,G202,G204,G301,G304,G401,G402,G501 \ -conf=./scripts/go/configs/gosec.json ./pkg/... diff --git a/Makefile b/Makefile index 5f60a7a714c..49ff6b2d552 100644 --- a/Makefile +++ b/Makefile @@ -4,7 +4,7 @@ -include local/Makefile -.PHONY: all deps-go deps-js deps build-go build-server build-cli build-js build build-docker-dev build-docker-full lint-go gosec revive golangci-lint go-vet test-go test-js test run run-frontend clean devenv devenv-down revive-alerting protobuf help +.PHONY: all deps-go deps-js deps build-go build-server build-cli build-js build build-docker-dev build-docker-full lint-go gosec revive golangci-lint go-vet test-go test-js test run run-frontend clean devenv devenv-down revive-strict protobuf help GO = GO111MODULE=on go GO_FILES ?= ./pkg/... @@ -81,14 +81,16 @@ revive: scripts/go/bin/revive -config ./scripts/go/configs/revive.toml \ $(GO_FILES) -revive-alerting: scripts/go/bin/revive - @echo "lint alerting via revive" +revive-strict: scripts/go/bin/revive + @echo "lint via revive (strict)" @scripts/go/bin/revive \ -formatter stylish \ -config ./scripts/go/configs/revive-strict.toml \ + -exclude ./pkg/plugins/backendplugin/pluginextensionv2/... \ ./pkg/services/alerting/... \ ./pkg/services/provisioning/datasources/... \ - ./pkg/services/provisioning/dashboards/... + ./pkg/services/provisioning/dashboards/... \ + ./pkg/plugins/backendplugin/... scripts/go/bin/golangci-lint: scripts/go/go.mod @cd scripts/go; \ @@ -116,7 +118,7 @@ go-vet: @echo "lint via go vet" @$(GO) vet $(GO_FILES) -lint-go: go-vet golangci-lint revive revive-alerting gosec ## Run all code checks for backend. +lint-go: go-vet golangci-lint revive revive-strict gosec ## Run all code checks for backend. # with disabled SC1071 we are ignored some TCL,Expect `/usr/bin/env expect` scripts shellcheck: $(SH_FILES) ## Run checks for shell scripts. diff --git a/go.mod b/go.mod index 78e47ce2d83..a7b2dfd913c 100644 --- a/go.mod +++ b/go.mod @@ -30,7 +30,7 @@ require ( github.com/gorilla/websocket v1.4.1 github.com/gosimple/slug v1.4.2 github.com/grafana/grafana-plugin-model v0.0.0-20190930120109-1fc953a61fb4 - github.com/grafana/grafana-plugin-sdk-go v0.66.0 + github.com/grafana/grafana-plugin-sdk-go v0.67.0 github.com/hashicorp/go-hclog v0.0.0-20180709165350-ff2cf002a8dd github.com/hashicorp/go-plugin v1.2.2 github.com/hashicorp/go-version v1.1.0 diff --git a/go.sum b/go.sum index 35d101ccd22..0e6ce6f4043 100644 --- a/go.sum +++ b/go.sum @@ -155,8 +155,8 @@ github.com/gosimple/slug v1.4.2 h1:jDmprx3q/9Lfk4FkGZtvzDQ9Cj9eAmsjzeQGp24PeiQ= github.com/gosimple/slug v1.4.2/go.mod h1:ER78kgg1Mv0NQGlXiDe57DpCyfbNywXXZ9mIorhxAf0= github.com/grafana/grafana-plugin-model v0.0.0-20190930120109-1fc953a61fb4 h1:SPdxCL9BChFTlyi0Khv64vdCW4TMna8+sxL7+Chx+Ag= github.com/grafana/grafana-plugin-model v0.0.0-20190930120109-1fc953a61fb4/go.mod h1:nc0XxBzjeGcrMltCDw269LoWF9S8ibhgxolCdA1R8To= -github.com/grafana/grafana-plugin-sdk-go v0.66.0 h1:Tx8pchA74QUtxtN8moavf0FJoAZbnbW3Fe8RVfSKUoY= -github.com/grafana/grafana-plugin-sdk-go v0.66.0/go.mod h1:w855JyiC5PDP3naWUJP0h/vY8RlzlE4+4fodyoXph+4= +github.com/grafana/grafana-plugin-sdk-go v0.67.0 h1:2kvI9kROmp/pXRrDQSEvcpR7Zonle7HjXgOdH70P2bw= +github.com/grafana/grafana-plugin-sdk-go v0.67.0/go.mod h1:w855JyiC5PDP3naWUJP0h/vY8RlzlE4+4fodyoXph+4= github.com/grpc-ecosystem/go-grpc-middleware v1.2.0 h1:0IKlLyQ3Hs9nDaiK5cSHAGmcQEIC8l2Ts1u6x5Dfrqg= github.com/grpc-ecosystem/go-grpc-middleware v1.2.0/go.mod h1:mJzapYve32yjrKlk9GbyCZHuPgZsrbyIbyKhSzOpg6s= github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 h1:Ovs26xHkKqVztRpIrF/92BcuyuQ/YW4NSIpoGtfXNho= diff --git a/pkg/api/api.go b/pkg/api/api.go index d5d169eaa08..d2fd5dc499a 100644 --- a/pkg/api/api.go +++ b/pkg/api/api.go @@ -266,7 +266,7 @@ func (hs *HTTPServer) registerRoutes() { apiRoute.Any("/datasources/proxy/:id", reqSignedIn, hs.ProxyDataSourceRequest) apiRoute.Any("/datasources/:id/resources", hs.CallDatasourceResource) apiRoute.Any("/datasources/:id/resources/*", hs.CallDatasourceResource) - apiRoute.Any("/datasources/:id/health", hs.CheckDatasourceHealth) + apiRoute.Any("/datasources/:id/health", Wrap(hs.CheckDatasourceHealth)) // Folders apiRoute.Group("/folders", func(folderRoute routing.RouteRegister) { diff --git a/pkg/api/datasources.go b/pkg/api/datasources.go index e99cdd6f670..bfffa904a6c 100644 --- a/pkg/api/datasources.go +++ b/pkg/api/datasources.go @@ -11,7 +11,6 @@ import ( "github.com/grafana/grafana/pkg/bus" "github.com/grafana/grafana/pkg/models" "github.com/grafana/grafana/pkg/plugins" - "github.com/grafana/grafana/pkg/plugins/backendplugin" "github.com/grafana/grafana/pkg/plugins/datasource/wrapper" "github.com/grafana/grafana/pkg/util" ) @@ -342,28 +341,25 @@ func convertModelToDtos(ds *models.DataSource) dtos.DataSource { // CheckDatasourceHealth sends a health check request to the plugin datasource // /api/datasource/:id/health -func (hs *HTTPServer) CheckDatasourceHealth(c *models.ReqContext) { +func (hs *HTTPServer) CheckDatasourceHealth(c *models.ReqContext) Response { datasourceID := c.ParamsInt64("id") ds, err := hs.DatasourceCache.GetDatasource(datasourceID, c.SignedInUser, c.SkipCache) if err != nil { if err == models.ErrDataSourceAccessDenied { - c.JsonApiErr(403, "Access denied to datasource", err) - return + return Error(403, "Access denied to datasource", err) } - c.JsonApiErr(500, "Unable to load datasource metadata", err) - return + return Error(500, "Unable to load datasource metadata", err) } plugin, ok := hs.PluginManager.GetDatasource(ds.Type) if !ok { - c.JsonApiErr(500, "Unable to find datasource plugin", err) - return + return Error(500, "Unable to find datasource plugin", err) } dsInstanceSettings, err := wrapper.ModelToInstanceSettings(ds) if err != nil { - c.JsonApiErr(500, "Unable to get datasource model", err) + return Error(500, "Unable to get datasource model", err) } pCtx := backend.PluginContext{ User: wrapper.BackendUserFromSignedInUser(c.SignedInUser), @@ -374,25 +370,7 @@ func (hs *HTTPServer) CheckDatasourceHealth(c *models.ReqContext) { resp, err := hs.BackendPluginManager.CheckHealth(c.Req.Context(), pCtx) if err != nil { - if err == backendplugin.ErrPluginNotRegistered { - c.JsonApiErr(404, "Plugin not found", err) - return - } - - // Return status unknown instead? - if err == backendplugin.ErrDiagnosticsNotSupported { - c.JsonApiErr(404, "Health check not implemented", err) - return - } - - // Return status unknown or error instead? - if err == backendplugin.ErrHealthCheckFailed { - c.JsonApiErr(500, "Plugin health check failed", err) - return - } - - c.JsonApiErr(500, "Plugin healthcheck returned an unknown error", err) - return + return translatePluginRequestErrorToAPIError(err) } payload := map[string]interface{}{ @@ -405,17 +383,15 @@ func (hs *HTTPServer) CheckDatasourceHealth(c *models.ReqContext) { var jsonDetails map[string]interface{} err = json.Unmarshal(resp.JSONDetails, &jsonDetails) if err != nil { - c.JsonApiErr(500, "Failed to unmarshal detailed response from backend plugin", err) - return + return Error(500, "Failed to unmarshal detailed response from backend plugin", err) } payload["details"] = jsonDetails } - if resp.Status != backendplugin.HealthStatusOk { - c.JSON(503, payload) - return + if resp.Status != backend.HealthStatusOk { + return JSON(503, payload) } - c.JSON(200, payload) + return JSON(200, payload) } diff --git a/pkg/api/plugins.go b/pkg/api/plugins.go index 00c23585f14..bdbbc373140 100644 --- a/pkg/api/plugins.go +++ b/pkg/api/plugins.go @@ -266,20 +266,12 @@ func (hs *HTTPServer) CollectPluginMetrics(c *models.ReqContext) Response { pluginID := c.Params("pluginId") plugin, exists := plugins.Plugins[pluginID] if !exists { - return Error(404, "Plugin not found, no installed plugin with that id", nil) + return Error(404, "Plugin not found", nil) } resp, err := hs.BackendPluginManager.CollectMetrics(c.Req.Context(), plugin.Id) if err != nil { - if err == backendplugin.ErrPluginNotRegistered { - return Error(404, "Plugin not found", err) - } - - if err == backendplugin.ErrDiagnosticsNotSupported { - return Error(404, "Health check not implemented", err) - } - - return Error(500, "Collect plugin metrics failed", err) + return translatePluginRequestErrorToAPIError(err) } headers := make(http.Header) @@ -300,7 +292,7 @@ func (hs *HTTPServer) CheckHealth(c *models.ReqContext) Response { pCtx, err := hs.getPluginContext(pluginID, c.SignedInUser) if err != nil { if err == ErrPluginNotFound { - return Error(404, "Plugin not found, no installed plugin with that id", nil) + return Error(404, "Plugin not found", nil) } return Error(500, "Failed to get plugin settings", err) @@ -308,21 +300,7 @@ func (hs *HTTPServer) CheckHealth(c *models.ReqContext) Response { resp, err := hs.BackendPluginManager.CheckHealth(c.Req.Context(), pCtx) if err != nil { - if err == backendplugin.ErrPluginNotRegistered { - return Error(404, "Plugin not found", err) - } - - // Return status unknown instead? - if err == backendplugin.ErrDiagnosticsNotSupported { - return Error(404, "Health check not implemented", err) - } - - // Return status unknown or error instead? - if err == backendplugin.ErrHealthCheckFailed { - return Error(500, "Plugin health check failed", err) - } - - return Error(500, "Plugin healthcheck returned an unknown error", err) + return translatePluginRequestErrorToAPIError(err) } payload := map[string]interface{}{ @@ -341,7 +319,7 @@ func (hs *HTTPServer) CheckHealth(c *models.ReqContext) Response { payload["details"] = jsonDetails } - if resp.Status != backendplugin.HealthStatusOk { + if resp.Status != backend.HealthStatusOk { return JSON(503, payload) } @@ -357,7 +335,7 @@ func (hs *HTTPServer) CallResource(c *models.ReqContext) { pCtx, err := hs.getPluginContext(pluginID, c.SignedInUser) if err != nil { if err == ErrPluginNotFound { - c.JsonApiErr(404, "Plugin not found, no installed plugin with that id", nil) + c.JsonApiErr(404, "Plugin not found", nil) return } @@ -385,3 +363,23 @@ func (hs *HTTPServer) getCachedPluginSettings(pluginID string, user *models.Sign hs.CacheService.Set(cacheKey, query.Result, time.Second*5) return query.Result, nil } + +func translatePluginRequestErrorToAPIError(err error) Response { + if errors.Is(err, backendplugin.ErrPluginNotRegistered) { + return Error(404, "Plugin not found", err) + } + + if errors.Is(err, backendplugin.ErrMethodNotImplemented) { + return Error(404, "Not found", err) + } + + if errors.Is(err, backendplugin.ErrHealthCheckFailed) { + return Error(500, "Plugin health check failed", err) + } + + if errors.Is(err, backendplugin.ErrPluginUnavailable) { + return Error(503, "Plugin unavailable", err) + } + + return Error(500, "Plugin request failed", err) +} diff --git a/pkg/plugins/backendplugin/backend_plugin.go b/pkg/plugins/backendplugin/backend_plugin.go deleted file mode 100644 index fdeb12a9d26..00000000000 --- a/pkg/plugins/backendplugin/backend_plugin.go +++ /dev/null @@ -1,240 +0,0 @@ -package backendplugin - -import ( - "context" - "errors" - "net/http" - - datasourceV1 "github.com/grafana/grafana-plugin-model/go/datasource" - rendererV1 "github.com/grafana/grafana-plugin-model/go/renderer" - "github.com/grafana/grafana-plugin-sdk-go/backend" - "github.com/grafana/grafana-plugin-sdk-go/genproto/pluginv2" - "github.com/grafana/grafana/pkg/infra/log" - "github.com/grafana/grafana/pkg/plugins/backendplugin/pluginextensionv2" - "github.com/grafana/grafana/pkg/util/errutil" - plugin "github.com/hashicorp/go-plugin" - "google.golang.org/grpc/codes" - "google.golang.org/grpc/status" -) - -// BackendPlugin a registered backend plugin. -type BackendPlugin struct { - id string - executablePath string - managed bool - clientFactory func() *plugin.Client - client *plugin.Client - logger log.Logger - startFns PluginStartFuncs - diagnostics DiagnosticsPlugin - resource ResourcePlugin -} - -func (p *BackendPlugin) start(ctx context.Context) error { - p.client = p.clientFactory() - rpcClient, err := p.client.Client() - if err != nil { - return err - } - - var legacyClient *LegacyClient - var client *Client - - if p.client.NegotiatedVersion() > 1 { - rawDiagnostics, err := rpcClient.Dispense("diagnostics") - if err != nil { - return err - } - - rawResource, err := rpcClient.Dispense("resource") - if err != nil { - return err - } - - rawData, err := rpcClient.Dispense("data") - if err != nil { - return err - } - - rawTransform, err := rpcClient.Dispense("transform") - if err != nil { - return err - } - - rawRenderer, err := rpcClient.Dispense("renderer") - if err != nil { - return err - } - - if rawDiagnostics != nil { - if plugin, ok := rawDiagnostics.(DiagnosticsPlugin); ok { - p.diagnostics = plugin - } - } - - client = &Client{} - if rawResource != nil { - if plugin, ok := rawResource.(ResourcePlugin); ok { - p.resource = plugin - client.ResourcePlugin = plugin - } - } - - if rawData != nil { - if plugin, ok := rawData.(DataPlugin); ok { - client.DataPlugin = plugin - } - } - - if rawTransform != nil { - if plugin, ok := rawTransform.(TransformPlugin); ok { - client.TransformPlugin = plugin - } - } - - if rawRenderer != nil { - if plugin, ok := rawRenderer.(pluginextensionv2.RendererPlugin); ok { - client.RendererPlugin = plugin - } - } - } else { - p.logger.Warn("Plugin uses a deprecated version of Grafana's backend plugin system which will be removed in a future release. " + - "Consider upgrading to a newer plugin version or reach out to the plugin repository/developer and request an upgrade.") - raw, err := rpcClient.Dispense(p.id) - if err != nil { - return err - } - - legacyClient = &LegacyClient{} - if plugin, ok := raw.(datasourceV1.DatasourcePlugin); ok { - legacyClient.DatasourcePlugin = plugin - } - - if plugin, ok := raw.(rendererV1.RendererPlugin); ok { - legacyClient.RendererPlugin = plugin - } - } - - if legacyClient == nil && client == nil { - return errors.New("no compatible plugin implementation found") - } - - if legacyClient != nil && p.startFns.OnLegacyStart != nil { - if err := p.startFns.OnLegacyStart(p.id, legacyClient, p.logger); err != nil { - return err - } - } - - if client != nil && p.startFns.OnStart != nil { - if err := p.startFns.OnStart(p.id, client, p.logger); err != nil { - return err - } - } - - return nil -} - -func (p *BackendPlugin) stop() error { - if p.client != nil { - p.client.Kill() - } - return nil -} - -// supportsDiagnostics return whether backend plugin supports diagnostics like metrics and health check. -func (p *BackendPlugin) supportsDiagnostics() bool { - return p.diagnostics != nil -} - -// CollectMetrics implements the collector.Collector interface. -func (p *BackendPlugin) CollectMetrics(ctx context.Context) (*pluginv2.CollectMetricsResponse, error) { - if p.diagnostics == nil || p.client == nil || p.client.Exited() { - return &pluginv2.CollectMetricsResponse{ - Metrics: &pluginv2.CollectMetricsResponse_Payload{}, - }, nil - } - - var res *pluginv2.CollectMetricsResponse - err := InstrumentPluginRequest(p.id, "metrics", func() error { - var innerErr error - res, innerErr = p.diagnostics.CollectMetrics(ctx, &pluginv2.CollectMetricsRequest{}) - - return innerErr - }) - - if err != nil { - if st, ok := status.FromError(err); ok { - if st.Code() == codes.Unimplemented { - return &pluginv2.CollectMetricsResponse{ - Metrics: &pluginv2.CollectMetricsResponse_Payload{}, - }, nil - } - } - - return nil, err - } - - return res, nil -} - -var toProto = backend.ToProto() - -func (p *BackendPlugin) checkHealth(ctx context.Context, pCtx backend.PluginContext) (*pluginv2.CheckHealthResponse, error) { - if p.diagnostics == nil || p.client == nil || p.client.Exited() { - return &pluginv2.CheckHealthResponse{ - Status: pluginv2.CheckHealthResponse_UNKNOWN, - }, nil - } - - protoContext := toProto.PluginContext(pCtx) - - var res *pluginv2.CheckHealthResponse - err := InstrumentPluginRequest(p.id, "checkhealth", func() error { - var innerErr error - res, innerErr = p.diagnostics.CheckHealth(ctx, &pluginv2.CheckHealthRequest{PluginContext: protoContext}) - return innerErr - }) - - if err != nil { - if st, ok := status.FromError(err); ok { - if st.Code() == codes.Unimplemented { - return &pluginv2.CheckHealthResponse{ - Status: pluginv2.CheckHealthResponse_UNKNOWN, - Message: "Health check not implemented", - }, nil - } - } - return nil, err - } - - return res, nil -} - -func (p *BackendPlugin) callResource(ctx context.Context, req *backend.CallResourceRequest) (callResourceResultStream, error) { - p.logger.Debug("Calling resource", "path", req.Path, "method", req.Method) - - if p.resource == nil || p.client == nil || p.client.Exited() { - return nil, errors.New("plugin not running, cannot call resource") - } - - protoReq := toProto.CallResourceRequest(req) - - protoStream, err := p.resource.CallResource(ctx, protoReq) - if err != nil { - if st, ok := status.FromError(err); ok { - if st.Code() == codes.Unimplemented { - return &singleCallResourceResult{ - result: &CallResourceResult{ - Status: http.StatusNotImplemented, - }, - }, nil - } - } - - return nil, errutil.Wrap("Failed to call resource", err) - } - - return &callResourceResultStreamImpl{ - stream: protoStream, - }, nil -} diff --git a/pkg/plugins/backendplugin/contracts.go b/pkg/plugins/backendplugin/contracts.go deleted file mode 100644 index 37ae086de99..00000000000 --- a/pkg/plugins/backendplugin/contracts.go +++ /dev/null @@ -1,123 +0,0 @@ -package backendplugin - -import ( - "strconv" - - "github.com/grafana/grafana-plugin-sdk-go/genproto/pluginv2" -) - -// HealthStatus is the status of the plugin. -type HealthStatus int - -const ( - // HealthStatusUnknown means the status of the plugin is unknown. - HealthStatusUnknown HealthStatus = iota - // HealthStatusOk means the status of the plugin is good. - HealthStatusOk - // HealthStatusError means the plugin is in an error state. - HealthStatusError -) - -var healthStatusNames = map[int]string{ - 0: "UNKNOWN", - 1: "OK", - 2: "ERROR", -} - -func (hs HealthStatus) String() string { - s, exists := healthStatusNames[int(hs)] - if exists { - return s - } - return strconv.Itoa(int(hs)) -} - -// CheckHealthResult check health result. -type CheckHealthResult struct { - Status HealthStatus - Message string - JSONDetails []byte -} - -func checkHealthResultFromProto(protoResp *pluginv2.CheckHealthResponse) *CheckHealthResult { - status := HealthStatusUnknown - switch protoResp.Status { - case pluginv2.CheckHealthResponse_ERROR: - status = HealthStatusError - case pluginv2.CheckHealthResponse_OK: - status = HealthStatusOk - } - - return &CheckHealthResult{ - Status: status, - Message: protoResp.Message, - JSONDetails: protoResp.JsonDetails, - } -} - -func collectMetricsResultFromProto(protoResp *pluginv2.CollectMetricsResponse) *CollectMetricsResult { - var prometheusMetrics []byte - - if protoResp.Metrics != nil { - prometheusMetrics = protoResp.Metrics.Prometheus - } - - return &CollectMetricsResult{ - PrometheusMetrics: prometheusMetrics, - } -} - -// CollectMetricsResult collect metrics result. -type CollectMetricsResult struct { - PrometheusMetrics []byte -} - -// CallResourceResult call resource result. -type CallResourceResult struct { - Status int - Headers map[string][]string - Body []byte -} - -type callResourceResultStream interface { - Recv() (*CallResourceResult, error) - Close() error -} - -type callResourceResultStreamImpl struct { - stream pluginv2.Resource_CallResourceClient -} - -func (s *callResourceResultStreamImpl) Recv() (*CallResourceResult, error) { - protoResp, err := s.stream.Recv() - if err != nil { - return nil, err - } - - respHeaders := map[string][]string{} - for key, values := range protoResp.Headers { - respHeaders[key] = values.Values - } - - return &CallResourceResult{ - Headers: respHeaders, - Body: protoResp.Body, - Status: int(protoResp.Code), - }, nil -} - -func (s *callResourceResultStreamImpl) Close() error { - return s.stream.CloseSend() -} - -type singleCallResourceResult struct { - result *CallResourceResult -} - -func (s *singleCallResourceResult) Recv() (*CallResourceResult, error) { - return s.result, nil -} - -func (s *singleCallResourceResult) Close() error { - return nil -} diff --git a/pkg/plugins/backendplugin/coreplugin/core_plugin.go b/pkg/plugins/backendplugin/coreplugin/core_plugin.go new file mode 100644 index 00000000000..e3168539e96 --- /dev/null +++ b/pkg/plugins/backendplugin/coreplugin/core_plugin.go @@ -0,0 +1,81 @@ +package coreplugin + +import ( + "context" + + "github.com/grafana/grafana-plugin-sdk-go/backend" + "github.com/grafana/grafana/pkg/infra/log" + "github.com/grafana/grafana/pkg/models" + "github.com/grafana/grafana/pkg/plugins/backendplugin" + "github.com/grafana/grafana/pkg/tsdb" +) + +type corePlugin struct { + pluginID string + logger log.Logger + backend.CheckHealthHandler + backend.CallResourceHandler + backend.QueryDataHandler +} + +// New returns a new backendplugin.PluginFactoryFunc for creating a core (built-in) backendplugin.Plugin. +func New(opts backend.ServeOpts) backendplugin.PluginFactoryFunc { + return backendplugin.PluginFactoryFunc(func(pluginID string, logger log.Logger, env []string) (backendplugin.Plugin, error) { + return &corePlugin{ + pluginID: pluginID, + logger: logger, + CheckHealthHandler: opts.CheckHealthHandler, + CallResourceHandler: opts.CallResourceHandler, + QueryDataHandler: opts.QueryDataHandler, + }, nil + }) +} + +func (cp *corePlugin) PluginID() string { + return cp.pluginID +} + +func (cp *corePlugin) Logger() log.Logger { + return cp.logger +} + +func (cp *corePlugin) Start(ctx context.Context) error { + if cp.QueryDataHandler != nil { + tsdb.RegisterTsdbQueryEndpoint(cp.pluginID, func(dsInfo *models.DataSource) (tsdb.TsdbQueryEndpoint, error) { + return newQueryEndpointAdapter(cp.pluginID, cp.logger, backendplugin.InstrumentQueryDataHandler(cp.QueryDataHandler)), nil + }) + } + return nil +} + +func (cp *corePlugin) Stop(ctx context.Context) error { + return nil +} + +func (cp *corePlugin) IsManaged() bool { + return false +} + +func (cp *corePlugin) Exited() bool { + return false +} + +func (cp *corePlugin) CollectMetrics(ctx context.Context) (*backend.CollectMetricsResult, error) { + return nil, backendplugin.ErrMethodNotImplemented +} + +func (cp *corePlugin) CheckHealth(ctx context.Context, req *backend.CheckHealthRequest) (*backend.CheckHealthResult, error) { + if cp.CheckHealthHandler != nil { + return cp.CheckHealthHandler.CheckHealth(ctx, req) + } + + return nil, backendplugin.ErrMethodNotImplemented +} + +func (cp *corePlugin) CallResource(ctx context.Context, req *backend.CallResourceRequest, sender backend.CallResourceResponseSender) error { + if cp.CallResourceHandler != nil { + return cp.CallResourceHandler.CallResource(ctx, req, sender) + } + + return backendplugin.ErrMethodNotImplemented +} diff --git a/pkg/plugins/backendplugin/coreplugin/core_plugin_test.go b/pkg/plugins/backendplugin/coreplugin/core_plugin_test.go new file mode 100644 index 00000000000..d9fa8be7f2a --- /dev/null +++ b/pkg/plugins/backendplugin/coreplugin/core_plugin_test.go @@ -0,0 +1,67 @@ +package coreplugin_test + +import ( + "context" + "testing" + + "github.com/grafana/grafana-plugin-sdk-go/backend" + "github.com/grafana/grafana/pkg/infra/log" + "github.com/grafana/grafana/pkg/plugins/backendplugin" + "github.com/grafana/grafana/pkg/plugins/backendplugin/coreplugin" + "github.com/stretchr/testify/require" +) + +func TestCorePlugin(t *testing.T) { + t.Run("New core plugin with empty opts should return expected values", func(t *testing.T) { + factory := coreplugin.New(backend.ServeOpts{}) + p, err := factory("plugin", log.New("test"), nil) + require.NoError(t, err) + require.NotNil(t, p) + require.NoError(t, p.Start(context.Background())) + require.NoError(t, p.Stop(context.Background())) + require.False(t, p.IsManaged()) + require.False(t, p.Exited()) + + _, err = p.CollectMetrics(context.Background()) + require.Equal(t, backendplugin.ErrMethodNotImplemented, err) + + _, err = p.CheckHealth(context.Background(), nil) + require.Equal(t, backendplugin.ErrMethodNotImplemented, err) + + err = p.CallResource(context.Background(), nil, nil) + require.Equal(t, backendplugin.ErrMethodNotImplemented, err) + }) + + t.Run("New core plugin with handlers set in opts should return expected values", func(t *testing.T) { + checkHealthCalled := false + callResourceCalled := false + factory := coreplugin.New(backend.ServeOpts{ + CheckHealthHandler: backend.CheckHealthHandlerFunc(func(ctx context.Context, req *backend.CheckHealthRequest) (*backend.CheckHealthResult, error) { + checkHealthCalled = true + return nil, nil + }), + CallResourceHandler: backend.CallResourceHandlerFunc(func(ctx context.Context, req *backend.CallResourceRequest, sender backend.CallResourceResponseSender) error { + callResourceCalled = true + return nil + }), + }) + p, err := factory("plugin", log.New("test"), nil) + require.NoError(t, err) + require.NotNil(t, p) + require.NoError(t, p.Start(context.Background())) + require.NoError(t, p.Stop(context.Background())) + require.False(t, p.IsManaged()) + require.False(t, p.Exited()) + + _, err = p.CollectMetrics(context.Background()) + require.Equal(t, backendplugin.ErrMethodNotImplemented, err) + + _, err = p.CheckHealth(context.Background(), &backend.CheckHealthRequest{}) + require.NoError(t, err) + require.True(t, checkHealthCalled) + + err = p.CallResource(context.Background(), &backend.CallResourceRequest{}, nil) + require.NoError(t, err) + require.True(t, callResourceCalled) + }) +} diff --git a/pkg/plugins/backendplugin/coreplugin/query_endpoint_adapter.go b/pkg/plugins/backendplugin/coreplugin/query_endpoint_adapter.go new file mode 100644 index 00000000000..de48040d31c --- /dev/null +++ b/pkg/plugins/backendplugin/coreplugin/query_endpoint_adapter.go @@ -0,0 +1,112 @@ +package coreplugin + +import ( + "context" + "time" + + "github.com/grafana/grafana-plugin-sdk-go/backend" + "github.com/grafana/grafana/pkg/infra/log" + "github.com/grafana/grafana/pkg/models" + "github.com/grafana/grafana/pkg/plugins/datasource/wrapper" + "github.com/grafana/grafana/pkg/tsdb" +) + +func newQueryEndpointAdapter(pluginID string, logger log.Logger, handler backend.QueryDataHandler) tsdb.TsdbQueryEndpoint { + return &queryEndpointAdapter{ + pluginID: pluginID, + logger: logger, + handler: handler, + } +} + +type queryEndpointAdapter struct { + pluginID string + logger log.Logger + handler backend.QueryDataHandler +} + +func modelToInstanceSettings(ds *models.DataSource) (*backend.DataSourceInstanceSettings, error) { + jsonDataBytes, err := ds.JsonData.MarshalJSON() + if err != nil { + return nil, err + } + + return &backend.DataSourceInstanceSettings{ + ID: ds.Id, + Name: ds.Name, + URL: ds.Url, + Database: ds.Database, + User: ds.User, + BasicAuthEnabled: ds.BasicAuth, + BasicAuthUser: ds.BasicAuthUser, + JSONData: jsonDataBytes, + DecryptedSecureJSONData: ds.DecryptedValues(), + Updated: ds.Updated, + }, nil +} + +func (a *queryEndpointAdapter) Query(ctx context.Context, ds *models.DataSource, query *tsdb.TsdbQuery) (*tsdb.Response, error) { + instanceSettings, err := modelToInstanceSettings(ds) + if err != nil { + return nil, err + } + + req := &backend.QueryDataRequest{ + PluginContext: backend.PluginContext{ + OrgID: ds.OrgId, + PluginID: a.pluginID, + User: wrapper.BackendUserFromSignedInUser(query.User), + DataSourceInstanceSettings: instanceSettings, + }, + Queries: []backend.DataQuery{}, + } + + for _, q := range query.Queries { + modelJSON, err := q.Model.MarshalJSON() + if err != nil { + return nil, err + } + req.Queries = append(req.Queries, backend.DataQuery{ + RefID: q.RefId, + Interval: time.Duration(q.IntervalMs) * time.Millisecond, + MaxDataPoints: q.MaxDataPoints, + TimeRange: backend.TimeRange{ + From: query.TimeRange.GetFromAsTimeUTC(), + To: query.TimeRange.GetToAsTimeUTC(), + }, + QueryType: q.QueryType, + JSON: modelJSON, + }) + } + + resp, err := a.handler.QueryData(ctx, req) + if err != nil { + return nil, err + } + + tR := &tsdb.Response{ + Results: make(map[string]*tsdb.QueryResult, len(resp.Responses)), + } + + for refID, r := range resp.Responses { + qr := &tsdb.QueryResult{ + RefId: refID, + } + + for _, f := range r.Frames { + if f.RefID == "" { + f.RefID = refID + } + } + + qr.Dataframes = tsdb.NewDecodedDataFrames(r.Frames) + + if r.Error != nil { + qr.Error = r.Error + } + + tR.Results[refID] = qr + } + + return tR, nil +} diff --git a/pkg/plugins/backendplugin/client.go b/pkg/plugins/backendplugin/grpcplugin/client.go similarity index 72% rename from pkg/plugins/backendplugin/client.go rename to pkg/plugins/backendplugin/grpcplugin/client.go index 10a52876c7d..dc8897dcc34 100644 --- a/pkg/plugins/backendplugin/client.go +++ b/pkg/plugins/backendplugin/grpcplugin/client.go @@ -1,4 +1,4 @@ -package backendplugin +package grpcplugin import ( "os/exec" @@ -6,7 +6,9 @@ import ( datasourceV1 "github.com/grafana/grafana-plugin-model/go/datasource" rendererV1 "github.com/grafana/grafana-plugin-model/go/renderer" "github.com/grafana/grafana-plugin-sdk-go/backend/grpcplugin" + sdkgrpcplugin "github.com/grafana/grafana-plugin-sdk-go/backend/grpcplugin" "github.com/grafana/grafana/pkg/infra/log" + "github.com/grafana/grafana/pkg/plugins/backendplugin" "github.com/grafana/grafana/pkg/plugins/backendplugin/pluginextensionv2" goplugin "github.com/hashicorp/go-plugin" ) @@ -26,8 +28,8 @@ var handshake = goplugin.HandshakeConfig{ ProtocolVersion: DefaultProtocolVersion, // The magic cookie values should NEVER be changed. - MagicCookieKey: grpcplugin.MagicCookieKey, - MagicCookieValue: grpcplugin.MagicCookieValue, + MagicCookieKey: sdkgrpcplugin.MagicCookieKey, + MagicCookieValue: sdkgrpcplugin.MagicCookieValue, } func newClientConfig(executablePath string, env []string, logger log.Logger, versionedPlugins map[int]goplugin.PluginSet) *goplugin.ClientConfig { @@ -72,18 +74,17 @@ func (pd PluginDescriptor) PluginID() string { // getV2PluginSet returns list of plugins supported on v2. func getV2PluginSet() goplugin.PluginSet { return goplugin.PluginSet{ - "diagnostics": &grpcplugin.DiagnosticsGRPCPlugin{}, - "resource": &grpcplugin.ResourceGRPCPlugin{}, - "data": &grpcplugin.DataGRPCPlugin{}, - "transform": &grpcplugin.TransformGRPCPlugin{}, + "diagnostics": &sdkgrpcplugin.DiagnosticsGRPCPlugin{}, + "resource": &sdkgrpcplugin.ResourceGRPCPlugin{}, + "data": &sdkgrpcplugin.DataGRPCPlugin{}, + "transform": &sdkgrpcplugin.TransformGRPCPlugin{}, "renderer": &pluginextensionv2.RendererGRPCPlugin{}, } } -// NewBackendPluginDescriptor creates a new backend plugin descriptor -// used for registering a backend datasource plugin. -func NewBackendPluginDescriptor(pluginID, executablePath string, startFns PluginStartFuncs) PluginDescriptor { - return PluginDescriptor{ +// NewBackendPlugin creates a new backend plugin factory used for registering a backend plugin. +func NewBackendPlugin(pluginID, executablePath string, startFns PluginStartFuncs) backendplugin.PluginFactoryFunc { + return New(PluginDescriptor{ pluginID: pluginID, executablePath: executablePath, managed: true, @@ -91,16 +92,15 @@ func NewBackendPluginDescriptor(pluginID, executablePath string, startFns Plugin DefaultProtocolVersion: { pluginID: &datasourceV1.DatasourcePluginImpl{}, }, - grpcplugin.ProtocolVersion: getV2PluginSet(), + sdkgrpcplugin.ProtocolVersion: getV2PluginSet(), }, startFns: startFns, - } + }) } -// NewRendererPluginDescriptor creates a new renderer plugin descriptor -// used for registering a backend renderer plugin. -func NewRendererPluginDescriptor(pluginID, executablePath string, startFns PluginStartFuncs) PluginDescriptor { - return PluginDescriptor{ +// NewRendererPlugin creates a new renderer plugin factory used for registering a backend renderer plugin. +func NewRendererPlugin(pluginID, executablePath string, startFns PluginStartFuncs) backendplugin.PluginFactoryFunc { + return New(PluginDescriptor{ pluginID: pluginID, executablePath: executablePath, managed: false, @@ -108,38 +108,21 @@ func NewRendererPluginDescriptor(pluginID, executablePath string, startFns Plugi DefaultProtocolVersion: { pluginID: &rendererV1.RendererPluginImpl{}, }, - grpcplugin.ProtocolVersion: getV2PluginSet(), + sdkgrpcplugin.ProtocolVersion: getV2PluginSet(), }, startFns: startFns, - } + }) } -type DiagnosticsPlugin interface { - grpcplugin.DiagnosticsClient -} - -type ResourcePlugin interface { - grpcplugin.ResourceClient -} - -type DataPlugin interface { - grpcplugin.DataClient -} - -type TransformPlugin interface { - grpcplugin.TransformClient -} - -// LegacyClient client for communicating with a plugin using the old plugin protocol. +// LegacyClient client for communicating with a plugin using the v1 plugin protocol. type LegacyClient struct { DatasourcePlugin datasourceV1.DatasourcePlugin RendererPlugin rendererV1.RendererPlugin } -// Client client for communicating with a plugin using the current plugin protocol. +// Client client for communicating with a plugin using the current (v2) plugin protocol. type Client struct { - ResourcePlugin ResourcePlugin - DataPlugin DataPlugin - TransformPlugin TransformPlugin + DataPlugin grpcplugin.DataClient + TransformPlugin grpcplugin.TransformClient RendererPlugin pluginextensionv2.RendererPlugin } diff --git a/pkg/plugins/backendplugin/grpcplugin/client_v1.go b/pkg/plugins/backendplugin/grpcplugin/client_v1.go new file mode 100644 index 00000000000..57ad97f1350 --- /dev/null +++ b/pkg/plugins/backendplugin/grpcplugin/client_v1.go @@ -0,0 +1,84 @@ +package grpcplugin + +import ( + "context" + + datasourceV1 "github.com/grafana/grafana-plugin-model/go/datasource" + rendererV1 "github.com/grafana/grafana-plugin-model/go/renderer" + "github.com/grafana/grafana-plugin-sdk-go/backend" + "github.com/grafana/grafana/pkg/infra/log" + "github.com/grafana/grafana/pkg/plugins/backendplugin" + "github.com/hashicorp/go-plugin" +) + +type clientV1 struct { + logger log.Logger + datasourceV1.DatasourcePlugin + rendererV1.RendererPlugin +} + +func newClientV1(descriptor PluginDescriptor, logger log.Logger, rpcClient plugin.ClientProtocol) (pluginClient, error) { + logger.Warn("Plugin uses a deprecated version of Grafana's backend plugin system which will be removed in a future release. " + + "Consider upgrading to a newer plugin version or reach out to the plugin repository/developer and request an upgrade.") + + raw, err := rpcClient.Dispense(descriptor.pluginID) + if err != nil { + return nil, err + } + + c := clientV1{ + logger: logger, + } + if plugin, ok := raw.(datasourceV1.DatasourcePlugin); ok { + c.DatasourcePlugin = instrumentDatasourcePluginV1(plugin) + } + + if plugin, ok := raw.(rendererV1.RendererPlugin); ok { + c.RendererPlugin = plugin + } + + if descriptor.startFns.OnLegacyStart != nil { + legacyClient := &LegacyClient{ + DatasourcePlugin: c.DatasourcePlugin, + RendererPlugin: c.RendererPlugin, + } + if err := descriptor.startFns.OnLegacyStart(descriptor.pluginID, legacyClient, logger); err != nil { + return nil, err + } + } + + return &c, nil +} + +func (c *clientV1) CollectMetrics(ctx context.Context) (*backend.CollectMetricsResult, error) { + return nil, backendplugin.ErrMethodNotImplemented +} + +func (c *clientV1) CheckHealth(ctx context.Context, req *backend.CheckHealthRequest) (*backend.CheckHealthResult, error) { + return nil, backendplugin.ErrMethodNotImplemented +} + +func (c *clientV1) CallResource(ctx context.Context, req *backend.CallResourceRequest, sender backend.CallResourceResponseSender) error { + return backendplugin.ErrMethodNotImplemented +} + +type datasourceV1QueryFunc func(ctx context.Context, req *datasourceV1.DatasourceRequest) (*datasourceV1.DatasourceResponse, error) + +func (fn datasourceV1QueryFunc) Query(ctx context.Context, req *datasourceV1.DatasourceRequest) (*datasourceV1.DatasourceResponse, error) { + return fn(ctx, req) +} + +func instrumentDatasourcePluginV1(plugin datasourceV1.DatasourcePlugin) datasourceV1.DatasourcePlugin { + if plugin == nil { + return nil + } + + return datasourceV1QueryFunc(func(ctx context.Context, req *datasourceV1.DatasourceRequest) (*datasourceV1.DatasourceResponse, error) { + var resp *datasourceV1.DatasourceResponse + err := backendplugin.InstrumentQueryDataRequest(req.Datasource.Type, func() (innerErr error) { + resp, innerErr = plugin.Query(ctx, req) + return + }) + return resp, err + }) +} diff --git a/pkg/plugins/backendplugin/grpcplugin/client_v2.go b/pkg/plugins/backendplugin/grpcplugin/client_v2.go new file mode 100644 index 00000000000..3a2c75665a5 --- /dev/null +++ b/pkg/plugins/backendplugin/grpcplugin/client_v2.go @@ -0,0 +1,212 @@ +package grpcplugin + +import ( + "context" + "io" + + "github.com/grafana/grafana-plugin-sdk-go/backend" + "github.com/grafana/grafana-plugin-sdk-go/backend/grpcplugin" + "github.com/grafana/grafana-plugin-sdk-go/genproto/pluginv2" + "github.com/grafana/grafana/pkg/infra/log" + "github.com/grafana/grafana/pkg/plugins/backendplugin" + "github.com/grafana/grafana/pkg/plugins/backendplugin/pluginextensionv2" + "github.com/grafana/grafana/pkg/util/errutil" + "github.com/hashicorp/go-plugin" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) + +type clientV2 struct { + grpcplugin.DiagnosticsClient + grpcplugin.ResourceClient + grpcplugin.DataClient + grpcplugin.TransformClient + pluginextensionv2.RendererPlugin +} + +func newClientV2(descriptor PluginDescriptor, logger log.Logger, rpcClient plugin.ClientProtocol) (pluginClient, error) { + rawDiagnostics, err := rpcClient.Dispense("diagnostics") + if err != nil { + return nil, err + } + + rawResource, err := rpcClient.Dispense("resource") + if err != nil { + return nil, err + } + + rawData, err := rpcClient.Dispense("data") + if err != nil { + return nil, err + } + + rawTransform, err := rpcClient.Dispense("transform") + if err != nil { + return nil, err + } + + rawRenderer, err := rpcClient.Dispense("renderer") + if err != nil { + return nil, err + } + + c := clientV2{} + if rawDiagnostics != nil { + if plugin, ok := rawDiagnostics.(grpcplugin.DiagnosticsClient); ok { + c.DiagnosticsClient = plugin + } + } + + if rawResource != nil { + if plugin, ok := rawResource.(grpcplugin.ResourceClient); ok { + c.ResourceClient = plugin + } + } + + if rawData != nil { + if plugin, ok := rawData.(grpcplugin.DataClient); ok { + c.DataClient = instrumentDataClient(plugin) + } + } + + if rawTransform != nil { + if plugin, ok := rawTransform.(grpcplugin.TransformClient); ok { + c.TransformClient = instrumentTransformPlugin(plugin) + } + } + + if rawRenderer != nil { + if plugin, ok := rawRenderer.(pluginextensionv2.RendererPlugin); ok { + c.RendererPlugin = plugin + } + } + + if descriptor.startFns.OnStart != nil { + client := &Client{ + DataPlugin: c.DataClient, + TransformPlugin: c.TransformClient, + RendererPlugin: c.RendererPlugin, + } + if err := descriptor.startFns.OnStart(descriptor.pluginID, client, logger); err != nil { + return nil, err + } + } + + return &c, nil +} + +func (c *clientV2) CollectMetrics(ctx context.Context) (*backend.CollectMetricsResult, error) { + if c.DiagnosticsClient == nil { + return &backend.CollectMetricsResult{}, nil + } + + protoResp, err := c.DiagnosticsClient.CollectMetrics(ctx, &pluginv2.CollectMetricsRequest{}) + if err != nil { + if status.Code(err) == codes.Unimplemented { + return &backend.CollectMetricsResult{}, nil + } + + return nil, err + } + + return backend.FromProto().CollectMetricsResponse(protoResp), nil +} + +func (c *clientV2) CheckHealth(ctx context.Context, req *backend.CheckHealthRequest) (*backend.CheckHealthResult, error) { + if c.DiagnosticsClient == nil { + return nil, backendplugin.ErrMethodNotImplemented + } + + protoContext := backend.ToProto().PluginContext(req.PluginContext) + protoResp, err := c.DiagnosticsClient.CheckHealth(ctx, &pluginv2.CheckHealthRequest{PluginContext: protoContext}) + + if err != nil { + if status.Code(err) == codes.Unimplemented { + return &backend.CheckHealthResult{ + Status: backend.HealthStatusUnknown, + Message: "Health check not implemented", + }, nil + } + return nil, err + } + + return backend.FromProto().CheckHealthResponse(protoResp), nil +} + +func (c *clientV2) CallResource(ctx context.Context, req *backend.CallResourceRequest, sender backend.CallResourceResponseSender) error { + if c.ResourceClient == nil { + return backendplugin.ErrMethodNotImplemented + } + + protoReq := backend.ToProto().CallResourceRequest(req) + protoStream, err := c.ResourceClient.CallResource(ctx, protoReq) + if err != nil { + if status.Code(err) == codes.Unimplemented { + return backendplugin.ErrMethodNotImplemented + } + + return errutil.Wrap("Failed to call resource", err) + } + + for { + protoResp, err := protoStream.Recv() + if err != nil { + if status.Code(err) == codes.Unimplemented { + return backendplugin.ErrMethodNotImplemented + } + + if err == io.EOF { + return nil + } + + return errutil.Wrap("Failed to receive call resource response", err) + } + + if err := sender.Send(backend.FromProto().CallResourceResponse(protoResp)); err != nil { + return err + } + } +} + +type dataClientQueryDataFunc func(ctx context.Context, req *pluginv2.QueryDataRequest, opts ...grpc.CallOption) (*pluginv2.QueryDataResponse, error) + +func (fn dataClientQueryDataFunc) QueryData(ctx context.Context, req *pluginv2.QueryDataRequest, opts ...grpc.CallOption) (*pluginv2.QueryDataResponse, error) { + return fn(ctx, req, opts...) +} + +func instrumentDataClient(plugin grpcplugin.DataClient) grpcplugin.DataClient { + if plugin == nil { + return nil + } + + return dataClientQueryDataFunc(func(ctx context.Context, req *pluginv2.QueryDataRequest, opts ...grpc.CallOption) (*pluginv2.QueryDataResponse, error) { + var resp *pluginv2.QueryDataResponse + err := backendplugin.InstrumentQueryDataRequest(req.PluginContext.PluginId, func() (innerErr error) { + resp, innerErr = plugin.QueryData(ctx, req) + return + }) + return resp, err + }) +} + +type transformPluginTransformDataFunc func(ctx context.Context, req *pluginv2.QueryDataRequest, callback grpcplugin.TransformDataCallBack) (*pluginv2.QueryDataResponse, error) + +func (fn transformPluginTransformDataFunc) TransformData(ctx context.Context, req *pluginv2.QueryDataRequest, callback grpcplugin.TransformDataCallBack) (*pluginv2.QueryDataResponse, error) { + return fn(ctx, req, callback) +} + +func instrumentTransformPlugin(plugin grpcplugin.TransformClient) grpcplugin.TransformClient { + if plugin == nil { + return nil + } + + return transformPluginTransformDataFunc(func(ctx context.Context, req *pluginv2.QueryDataRequest, callback grpcplugin.TransformDataCallBack) (*pluginv2.QueryDataResponse, error) { + var resp *pluginv2.QueryDataResponse + err := backendplugin.InstrumentTransformDataRequest(req.PluginContext.PluginId, func() (innerErr error) { + resp, innerErr = plugin.TransformData(ctx, req, callback) + return + }) + return resp, err + }) +} diff --git a/pkg/plugins/backendplugin/grpcplugin/grpc_plugin.go b/pkg/plugins/backendplugin/grpcplugin/grpc_plugin.go new file mode 100644 index 00000000000..54277e8adf4 --- /dev/null +++ b/pkg/plugins/backendplugin/grpcplugin/grpc_plugin.go @@ -0,0 +1,136 @@ +package grpcplugin + +import ( + "context" + "errors" + "sync" + + "github.com/grafana/grafana-plugin-sdk-go/backend" + "github.com/grafana/grafana/pkg/infra/log" + "github.com/grafana/grafana/pkg/plugins/backendplugin" + "github.com/hashicorp/go-plugin" +) + +type pluginClient interface { + backend.CollectMetricsHandler + backend.CheckHealthHandler + backend.CallResourceHandler +} + +type grpcPlugin struct { + descriptor PluginDescriptor + clientFactory func() *plugin.Client + client *plugin.Client + pluginClient pluginClient + logger log.Logger + mutex sync.RWMutex +} + +// New allocates and returns a new gRPC (external) backendplugin.Plugin. +func New(descriptor PluginDescriptor) backendplugin.PluginFactoryFunc { + return backendplugin.PluginFactoryFunc(func(pluginID string, logger log.Logger, env []string) (backendplugin.Plugin, error) { + return &grpcPlugin{ + descriptor: descriptor, + logger: logger, + clientFactory: func() *plugin.Client { + return plugin.NewClient(newClientConfig(descriptor.executablePath, env, logger, descriptor.versionedPlugins)) + }, + }, nil + }) +} + +func (p *grpcPlugin) PluginID() string { + return p.descriptor.pluginID +} + +func (p *grpcPlugin) Logger() log.Logger { + return p.logger +} + +func (p *grpcPlugin) Start(ctx context.Context) error { + p.mutex.Lock() + defer p.mutex.Unlock() + + p.client = p.clientFactory() + rpcClient, err := p.client.Client() + if err != nil { + return err + } + + if p.client.NegotiatedVersion() > 1 { + p.pluginClient, err = newClientV2(p.descriptor, p.logger, rpcClient) + if err != nil { + return err + } + } else { + p.pluginClient, err = newClientV1(p.descriptor, p.logger, rpcClient) + if err != nil { + return err + } + } + + if p.pluginClient == nil { + return errors.New("no compatible plugin implementation found") + } + + return nil +} + +func (p *grpcPlugin) Stop(ctx context.Context) error { + p.mutex.Lock() + defer p.mutex.Unlock() + + if p.client != nil { + p.client.Kill() + } + return nil +} + +func (p *grpcPlugin) IsManaged() bool { + return p.descriptor.managed +} + +func (p *grpcPlugin) Exited() bool { + p.mutex.RLock() + defer p.mutex.RUnlock() + if p.client != nil { + return p.client.Exited() + } + return true +} + +func (p *grpcPlugin) CollectMetrics(ctx context.Context) (*backend.CollectMetricsResult, error) { + p.mutex.RLock() + if p.client == nil || p.client.Exited() || p.pluginClient == nil { + p.mutex.RUnlock() + return nil, backendplugin.ErrPluginUnavailable + } + pluginClient := p.pluginClient + p.mutex.RUnlock() + + return pluginClient.CollectMetrics(ctx) +} + +func (p *grpcPlugin) CheckHealth(ctx context.Context, req *backend.CheckHealthRequest) (*backend.CheckHealthResult, error) { + p.mutex.RLock() + if p.client == nil || p.client.Exited() || p.pluginClient == nil { + p.mutex.RUnlock() + return nil, backendplugin.ErrPluginUnavailable + } + pluginClient := p.pluginClient + p.mutex.RUnlock() + + return pluginClient.CheckHealth(ctx, req) +} + +func (p *grpcPlugin) CallResource(ctx context.Context, req *backend.CallResourceRequest, sender backend.CallResourceResponseSender) error { + p.mutex.RLock() + if p.client == nil || p.client.Exited() || p.pluginClient == nil { + p.mutex.RUnlock() + return backendplugin.ErrPluginUnavailable + } + pluginClient := p.pluginClient + p.mutex.RUnlock() + + return pluginClient.CallResource(ctx, req, sender) +} diff --git a/pkg/plugins/backendplugin/log_wrapper.go b/pkg/plugins/backendplugin/grpcplugin/log_wrapper.go similarity index 99% rename from pkg/plugins/backendplugin/log_wrapper.go rename to pkg/plugins/backendplugin/grpcplugin/log_wrapper.go index 28087286325..6b9fd6ff34d 100644 --- a/pkg/plugins/backendplugin/log_wrapper.go +++ b/pkg/plugins/backendplugin/grpcplugin/log_wrapper.go @@ -1,4 +1,4 @@ -package backendplugin +package grpcplugin import ( "io" diff --git a/pkg/plugins/backendplugin/log_wrapper_test.go b/pkg/plugins/backendplugin/grpcplugin/log_wrapper_test.go similarity index 97% rename from pkg/plugins/backendplugin/log_wrapper_test.go rename to pkg/plugins/backendplugin/grpcplugin/log_wrapper_test.go index 751da1d2e8a..e1ffc650faf 100644 --- a/pkg/plugins/backendplugin/log_wrapper_test.go +++ b/pkg/plugins/backendplugin/grpcplugin/log_wrapper_test.go @@ -1,4 +1,4 @@ -package backendplugin +package grpcplugin import ( "fmt" diff --git a/pkg/plugins/backendplugin/instrumentation.go b/pkg/plugins/backendplugin/instrumentation.go index ac5ce9ce4c5..7aba01f883c 100644 --- a/pkg/plugins/backendplugin/instrumentation.go +++ b/pkg/plugins/backendplugin/instrumentation.go @@ -1,8 +1,10 @@ package backendplugin import ( + "context" "time" + "github.com/grafana/grafana-plugin-sdk-go/backend" "github.com/prometheus/client_golang/prometheus" ) @@ -28,8 +30,8 @@ func init() { prometheus.MustRegister(pluginRequestCounter, pluginRequestDuration) } -// InstrumentPluginRequest instruments success rate and latency of `fn` -func InstrumentPluginRequest(pluginID string, endpoint string, fn func() error) error { +// instrumentPluginRequest instruments success rate and latency of `fn` +func instrumentPluginRequest(pluginID string, endpoint string, fn func() error) error { status := "ok" start := time.Now() @@ -45,3 +47,41 @@ func InstrumentPluginRequest(pluginID string, endpoint string, fn func() error) return err } + +func instrumentCollectMetrics(pluginID string, fn func() error) error { + return instrumentPluginRequest(pluginID, "collectMetrics", fn) +} + +func instrumentCheckHealthRequest(pluginID string, fn func() error) error { + return instrumentPluginRequest(pluginID, "checkHealth", fn) +} + +func instrumentCallResourceRequest(pluginID string, fn func() error) error { + return instrumentPluginRequest(pluginID, "callResource", fn) +} + +// InstrumentQueryDataRequest instruments success rate and latency of query data request. +func InstrumentQueryDataRequest(pluginID string, fn func() error) error { + return instrumentPluginRequest(pluginID, "queryData", fn) +} + +// InstrumentTransformDataRequest instruments success rate and latency of transform data request. +func InstrumentTransformDataRequest(pluginID string, fn func() error) error { + return instrumentPluginRequest(pluginID, "transformData", fn) +} + +// InstrumentQueryDataHandler wraps a backend.QueryDataHandler with instrumentation of success rate and latency. +func InstrumentQueryDataHandler(handler backend.QueryDataHandler) backend.QueryDataHandler { + if handler == nil { + return nil + } + + return backend.QueryDataHandlerFunc(func(ctx context.Context, req *backend.QueryDataRequest) (*backend.QueryDataResponse, error) { + var resp *backend.QueryDataResponse + err := InstrumentQueryDataRequest(req.PluginContext.PluginID, func() (innerErr error) { + resp, innerErr = handler.QueryData(ctx, req) + return + }) + return resp, err + }) +} diff --git a/pkg/plugins/backendplugin/manager.go b/pkg/plugins/backendplugin/manager.go index 677d95ced62..b58ae5bf2b3 100644 --- a/pkg/plugins/backendplugin/manager.go +++ b/pkg/plugins/backendplugin/manager.go @@ -6,28 +6,31 @@ import ( "errors" "fmt" "io" + "io/ioutil" + "net/http" + "net/url" "sync" "time" "github.com/grafana/grafana-plugin-sdk-go/backend" + "github.com/grafana/grafana/pkg/infra/log" "github.com/grafana/grafana/pkg/models" + "github.com/grafana/grafana/pkg/registry" "github.com/grafana/grafana/pkg/setting" "github.com/grafana/grafana/pkg/util/errutil" "github.com/grafana/grafana/pkg/util/proxyutil" - - "github.com/grafana/grafana/pkg/infra/log" - "github.com/grafana/grafana/pkg/registry" - plugin "github.com/hashicorp/go-plugin" "golang.org/x/xerrors" ) var ( // ErrPluginNotRegistered error returned when plugin not registered. ErrPluginNotRegistered = errors.New("Plugin not registered") - // ErrDiagnosticsNotSupported error returned when plugin doesn't support diagnostics. - ErrDiagnosticsNotSupported = errors.New("Plugin diagnostics not supported") // ErrHealthCheckFailed error returned when health check failed. ErrHealthCheckFailed = errors.New("Health check failed") + // ErrPluginUnavailable error returned when plugin is unavailable. + ErrPluginUnavailable = errors.New("Plugin unavailable") + // ErrMethodNotImplemented error returned when plugin method not implemented. + ErrMethodNotImplemented = errors.New("method not implemented") ) func init() { @@ -41,13 +44,13 @@ func init() { // Manager manages backend plugins. type Manager interface { // Register registers a backend plugin - Register(descriptor PluginDescriptor) error + Register(pluginID string, factory PluginFactoryFunc) error // StartPlugin starts a non-managed backend plugin StartPlugin(ctx context.Context, pluginID string) error // CollectMetrics collects metrics from a registered backend plugin. - CollectMetrics(ctx context.Context, pluginID string) (*CollectMetricsResult, error) + CollectMetrics(ctx context.Context, pluginID string) (*backend.CollectMetricsResult, error) // CheckHealth checks the health of a registered backend plugin. - CheckHealth(ctx context.Context, pCtx backend.PluginContext) (*CheckHealthResult, error) + CheckHealth(ctx context.Context, pCtx backend.PluginContext) (*backend.CheckHealthResult, error) // CallResource calls a plugin resource. CallResource(pluginConfig backend.PluginContext, ctx *models.ReqContext, path string) } @@ -56,13 +59,13 @@ type manager struct { Cfg *setting.Cfg `inject:""` License models.Licensing `inject:""` pluginsMu sync.RWMutex - plugins map[string]*BackendPlugin + plugins map[string]Plugin logger log.Logger pluginSettings map[string]pluginSettings } func (m *manager) Init() error { - m.plugins = make(map[string]*BackendPlugin) + m.plugins = make(map[string]Plugin) m.logger = log.New("plugins.backend") m.pluginSettings = extractPluginSettings(m.Cfg) @@ -72,27 +75,27 @@ func (m *manager) Init() error { func (m *manager) Run(ctx context.Context) error { m.start(ctx) <-ctx.Done() - m.stop() + m.stop(ctx) return ctx.Err() } // Register registers a backend plugin -func (m *manager) Register(descriptor PluginDescriptor) error { - m.logger.Debug("Registering backend plugin", "pluginId", descriptor.pluginID, "executablePath", descriptor.executablePath) +func (m *manager) Register(pluginID string, factory PluginFactoryFunc) error { + m.logger.Debug("Registering backend plugin", "pluginId", pluginID) m.pluginsMu.Lock() defer m.pluginsMu.Unlock() - if _, exists := m.plugins[descriptor.pluginID]; exists { + if _, exists := m.plugins[pluginID]; exists { return errors.New("Backend plugin already registered") } pluginSettings := pluginSettings{} - if ps, exists := m.pluginSettings[descriptor.pluginID]; exists { + if ps, exists := m.pluginSettings[pluginID]; exists { pluginSettings = ps } hostEnv := []string{ - fmt.Sprintf("GF_VERSION=%s", setting.BuildVersion), + fmt.Sprintf("GF_VERSION=%s", m.Cfg.BuildVersion), fmt.Sprintf("GF_EDITION=%s", m.License.Edition()), } @@ -102,20 +105,14 @@ func (m *manager) Register(descriptor PluginDescriptor) error { env := pluginSettings.ToEnv("GF_PLUGIN", hostEnv) - pluginLogger := m.logger.New("pluginId", descriptor.pluginID) - plugin := &BackendPlugin{ - id: descriptor.pluginID, - executablePath: descriptor.executablePath, - managed: descriptor.managed, - clientFactory: func() *plugin.Client { - return plugin.NewClient(newClientConfig(descriptor.executablePath, env, pluginLogger, descriptor.versionedPlugins)) - }, - startFns: descriptor.startFns, - logger: pluginLogger, + pluginLogger := m.logger.New("pluginId", pluginID) + plugin, err := factory(pluginID, pluginLogger, env) + if err != nil { + return err } - m.plugins[descriptor.pluginID] = plugin - m.logger.Debug("Backend plugin registered", "pluginId", descriptor.pluginID, "executablePath", descriptor.executablePath) + m.plugins[pluginID] = plugin + m.logger.Debug("Backend plugin registered", "pluginId", pluginID) return nil } @@ -124,12 +121,12 @@ func (m *manager) start(ctx context.Context) { m.pluginsMu.RLock() defer m.pluginsMu.RUnlock() for _, p := range m.plugins { - if !p.managed { + if !p.IsManaged() { continue } if err := startPluginAndRestartKilledProcesses(ctx, p); err != nil { - p.logger.Error("Failed to start plugin", "error", err) + p.Logger().Error("Failed to start plugin", "error", err) continue } } @@ -141,10 +138,10 @@ func (m *manager) StartPlugin(ctx context.Context, pluginID string) error { p, registered := m.plugins[pluginID] m.pluginsMu.RUnlock() if !registered { - return errors.New("Backend plugin not registered") + return ErrPluginNotRegistered } - if p.managed { + if p.IsManaged() { return errors.New("Backend plugin is managed and cannot be manually started") } @@ -152,22 +149,26 @@ func (m *manager) StartPlugin(ctx context.Context, pluginID string) error { } // stop stops all managed backend plugins -func (m *manager) stop() { +func (m *manager) stop(ctx context.Context) { m.pluginsMu.RLock() defer m.pluginsMu.RUnlock() + var wg sync.WaitGroup for _, p := range m.plugins { - go func(p *BackendPlugin) { - p.logger.Debug("Stopping plugin") - if err := p.stop(); err != nil { - p.logger.Error("Failed to stop plugin", "error", err) + wg.Add(1) + go func(p Plugin, ctx context.Context) { + defer wg.Done() + p.Logger().Debug("Stopping plugin") + if err := p.Stop(ctx); err != nil { + p.Logger().Error("Failed to stop plugin", "error", err) } - p.logger.Debug("Plugin stopped") - }(p) + p.Logger().Debug("Plugin stopped") + }(p, ctx) } + wg.Wait() } // CollectMetrics collects metrics from a registered backend plugin. -func (m *manager) CollectMetrics(ctx context.Context, pluginID string) (*CollectMetricsResult, error) { +func (m *manager) CollectMetrics(ctx context.Context, pluginID string) (*backend.CollectMetricsResult, error) { m.pluginsMu.RLock() p, registered := m.plugins[pluginID] m.pluginsMu.RUnlock() @@ -176,98 +177,139 @@ func (m *manager) CollectMetrics(ctx context.Context, pluginID string) (*Collect return nil, ErrPluginNotRegistered } - if !p.supportsDiagnostics() { - return nil, ErrDiagnosticsNotSupported - } - - res, err := p.CollectMetrics(ctx) + var resp *backend.CollectMetricsResult + err := instrumentCollectMetrics(p.PluginID(), func() (innerErr error) { + resp, innerErr = p.CollectMetrics(ctx) + return + }) if err != nil { return nil, err } - return collectMetricsResultFromProto(res), nil + return resp, nil } // CheckHealth checks the health of a registered backend plugin. -func (m *manager) CheckHealth(ctx context.Context, pluginConfig backend.PluginContext) (*CheckHealthResult, error) { +func (m *manager) CheckHealth(ctx context.Context, pluginContext backend.PluginContext) (*backend.CheckHealthResult, error) { m.pluginsMu.RLock() - p, registered := m.plugins[pluginConfig.PluginID] + p, registered := m.plugins[pluginContext.PluginID] m.pluginsMu.RUnlock() if !registered { return nil, ErrPluginNotRegistered } - if !p.supportsDiagnostics() { - return nil, ErrDiagnosticsNotSupported - } + var resp *backend.CheckHealthResult + err := instrumentCheckHealthRequest(p.PluginID(), func() (innerErr error) { + resp, innerErr = p.CheckHealth(ctx, &backend.CheckHealthRequest{PluginContext: pluginContext}) + return + }) - res, err := p.checkHealth(ctx, pluginConfig) if err != nil { - p.logger.Error("Failed to check plugin health", "error", err) - return nil, ErrHealthCheckFailed + if errors.Is(err, ErrMethodNotImplemented) { + return nil, err + } + + return nil, errutil.Wrap("Failed to check plugin health", ErrHealthCheckFailed) } - return checkHealthResultFromProto(res), nil + return resp, nil } type keepCookiesJSONModel struct { KeepCookies []string `json:"keepCookies"` } -// CallResource calls a plugin resource. -func (m *manager) CallResource(pCtx backend.PluginContext, reqCtx *models.ReqContext, path string) { +func (m *manager) callResourceInternal(w http.ResponseWriter, req *http.Request, pCtx backend.PluginContext) error { m.pluginsMu.RLock() p, registered := m.plugins[pCtx.PluginID] m.pluginsMu.RUnlock() if !registered { - reqCtx.JsonApiErr(404, "Plugin not registered", nil) - return + return ErrPluginNotRegistered } - clonedReq := reqCtx.Req.Clone(reqCtx.Req.Context()) keepCookieModel := keepCookiesJSONModel{} if dis := pCtx.DataSourceInstanceSettings; dis != nil { err := json.Unmarshal(dis.JSONData, &keepCookieModel) if err != nil { - p.logger.Error("Failed to to unpack JSONData in datasource instance settings", "error", err) + p.Logger().Error("Failed to to unpack JSONData in datasource instance settings", "error", err) } } - proxyutil.ClearCookieHeader(clonedReq, keepCookieModel.KeepCookies) - proxyutil.PrepareProxyRequest(clonedReq) + proxyutil.ClearCookieHeader(req, keepCookieModel.KeepCookies) + proxyutil.PrepareProxyRequest(req) - body, err := reqCtx.Req.Body().Bytes() + body, err := ioutil.ReadAll(req.Body) if err != nil { - reqCtx.JsonApiErr(500, "Failed to read request body", err) - return + return errors.New("Failed to read request body") } - req := &backend.CallResourceRequest{ + crReq := &backend.CallResourceRequest{ PluginContext: pCtx, - Path: path, - Method: clonedReq.Method, - URL: clonedReq.URL.String(), - Headers: clonedReq.Header, + Path: req.URL.Path, + Method: req.Method, + URL: req.URL.String(), + Headers: req.Header, Body: body, } - err = InstrumentPluginRequest(p.id, "resource", func() error { - stream, err := p.callResource(clonedReq.Context(), req) - if err != nil { - return errutil.Wrap("Failed to call resource", err) + return instrumentCallResourceRequest(p.PluginID(), func() error { + childCtx, cancel := context.WithCancel(req.Context()) + defer cancel() + stream := newCallResourceResponseStream(childCtx) + var wg sync.WaitGroup + wg.Add(1) + var flushStreamErr error + go func() { + flushStreamErr = flushStream(p, stream, w) + wg.Done() + }() + + innerErr := p.CallResource(req.Context(), crReq, stream) + stream.Close() + if innerErr != nil { + return innerErr } - - return flushStream(p, stream, reqCtx) + wg.Wait() + return flushStreamErr }) +} +// CallResource calls a plugin resource. +func (m *manager) CallResource(pCtx backend.PluginContext, reqCtx *models.ReqContext, path string) { + clonedReq := reqCtx.Req.Clone(reqCtx.Req.Context()) + rawURL := path + if clonedReq.URL.RawQuery != "" { + rawURL += "?" + clonedReq.URL.RawQuery + } + urlPath, err := url.Parse(rawURL) if err != nil { - reqCtx.JsonApiErr(500, "Failed to ", err) + handleCallResourceError(err, reqCtx) + return + } + clonedReq.URL = urlPath + err = m.callResourceInternal(reqCtx.Resp, clonedReq, pCtx) + if err != nil { + handleCallResourceError(err, reqCtx) } } -func flushStream(plugin *BackendPlugin, stream callResourceResultStream, reqCtx *models.ReqContext) error { +func handleCallResourceError(err error, reqCtx *models.ReqContext) { + if errors.Is(err, ErrPluginUnavailable) { + reqCtx.JsonApiErr(503, "Plugin unavailable", err) + return + } + + if errors.Is(err, ErrMethodNotImplemented) { + reqCtx.JsonApiErr(404, "Not found", err) + return + } + + reqCtx.JsonApiErr(500, "Failed to call resource", err) +} + +func flushStream(plugin Plugin, stream CallResourceClientResponseStream, w http.ResponseWriter) error { processedStreams := 0 for { @@ -283,12 +325,12 @@ func flushStream(plugin *BackendPlugin, stream callResourceResultStream, reqCtx return errutil.Wrap("Failed to receive response from resource call", err) } - plugin.logger.Error("Failed to receive response from resource call", "error", err) - return nil + plugin.Logger().Error("Failed to receive response from resource call", "error", err) + return stream.Close() } // Expected that headers and status are only part of first stream - if processedStreams == 0 { + if processedStreams == 0 && resp.Headers != nil { // Make sure a content type always is returned in response if _, exists := resp.Headers["Content-Type"]; !exists { resp.Headers["Content-Type"] = []string{"application/json"} @@ -302,37 +344,39 @@ func flushStream(plugin *BackendPlugin, stream callResourceResultStream, reqCtx } for _, v := range values { - reqCtx.Resp.Header().Add(k, v) + w.Header().Add(k, v) } } - reqCtx.WriteHeader(resp.Status) + w.WriteHeader(resp.Status) } - if _, err := reqCtx.Write(resp.Body); err != nil { - plugin.logger.Error("Failed to write resource response", "error", err) + if _, err := w.Write(resp.Body); err != nil { + plugin.Logger().Error("Failed to write resource response", "error", err) } - reqCtx.Resp.Flush() + if flusher, ok := w.(http.Flusher); ok { + flusher.Flush() + } processedStreams++ } } -func startPluginAndRestartKilledProcesses(ctx context.Context, p *BackendPlugin) error { - if err := p.start(ctx); err != nil { +func startPluginAndRestartKilledProcesses(ctx context.Context, p Plugin) error { + if err := p.Start(ctx); err != nil { return err } - go func(ctx context.Context, p *BackendPlugin) { + go func(ctx context.Context, p Plugin) { if err := restartKilledProcess(ctx, p); err != nil { - p.logger.Error("Attempt to restart killed plugin process failed", "error", err) + p.Logger().Error("Attempt to restart killed plugin process failed", "error", err) } }(ctx, p) return nil } -func restartKilledProcess(ctx context.Context, p *BackendPlugin) error { +func restartKilledProcess(ctx context.Context, p Plugin) error { ticker := time.NewTicker(time.Second * 1) for { @@ -343,16 +387,16 @@ func restartKilledProcess(ctx context.Context, p *BackendPlugin) error { } return nil case <-ticker.C: - if !p.client.Exited() { + if !p.Exited() { continue } - p.logger.Debug("Restarting plugin") - if err := p.start(ctx); err != nil { - p.logger.Error("Failed to restart plugin", "error", err) + p.Logger().Debug("Restarting plugin") + if err := p.Start(ctx); err != nil { + p.Logger().Error("Failed to restart plugin", "error", err) continue } - p.logger.Debug("Plugin restarted") + p.Logger().Debug("Plugin restarted") } } } diff --git a/pkg/plugins/backendplugin/manager_test.go b/pkg/plugins/backendplugin/manager_test.go new file mode 100644 index 00000000000..ec62a1778ff --- /dev/null +++ b/pkg/plugins/backendplugin/manager_test.go @@ -0,0 +1,410 @@ +package backendplugin + +import ( + "bytes" + "context" + "net/http" + "net/http/httptest" + "sync" + "testing" + "time" + + "github.com/grafana/grafana-plugin-sdk-go/backend" + "github.com/grafana/grafana/pkg/infra/log" + "github.com/grafana/grafana/pkg/models" + "github.com/grafana/grafana/pkg/setting" + "github.com/stretchr/testify/require" +) + +const testPluginID = "test-plugin" + +func TestManager(t *testing.T) { + newManagerScenario(t, false, func(t *testing.T, ctx *managerScenarioCtx) { + t.Run("Unregistered plugin scenario", func(t *testing.T) { + err := ctx.manager.StartPlugin(context.Background(), testPluginID) + require.Equal(t, ErrPluginNotRegistered, err) + + _, err = ctx.manager.CollectMetrics(context.Background(), testPluginID) + require.Equal(t, ErrPluginNotRegistered, err) + + _, err = ctx.manager.CheckHealth(context.Background(), backend.PluginContext{PluginID: testPluginID}) + require.Equal(t, ErrPluginNotRegistered, err) + + req, err := http.NewRequest(http.MethodGet, "/test", nil) + require.NoError(t, err) + w := httptest.NewRecorder() + err = ctx.manager.callResourceInternal(w, req, backend.PluginContext{PluginID: testPluginID}) + require.Equal(t, ErrPluginNotRegistered, err) + }) + }) + + newManagerScenario(t, true, func(t *testing.T, ctx *managerScenarioCtx) { + t.Run("Managed plugin scenario", func(t *testing.T) { + ctx.license.edition = "Open Source" + ctx.license.hasLicense = false + ctx.cfg.BuildVersion = "7.0.0" + + t.Run("Should be able to register plugin", func(t *testing.T) { + err := ctx.manager.Register(testPluginID, ctx.factory) + require.NoError(t, err) + require.NotNil(t, ctx.plugin) + require.Equal(t, testPluginID, ctx.plugin.pluginID) + require.NotNil(t, ctx.plugin.logger) + + t.Run("Should not be able to register an already registered plugin", func(t *testing.T) { + err := ctx.manager.Register(testPluginID, ctx.factory) + require.Error(t, err) + }) + + t.Run("Should provide expected host environment variables", func(t *testing.T) { + require.Len(t, ctx.env, 2) + require.EqualValues(t, []string{"GF_VERSION=7.0.0", "GF_EDITION=Open Source"}, ctx.env) + }) + + t.Run("When manager runs should start and stop plugin", func(t *testing.T) { + pCtx := context.Background() + cCtx, cancel := context.WithCancel(pCtx) + var wg sync.WaitGroup + wg.Add(1) + var runErr error + go func() { + runErr = ctx.manager.Run(cCtx) + wg.Done() + }() + time.Sleep(time.Millisecond) + cancel() + wg.Wait() + require.Equal(t, context.Canceled, runErr) + require.Equal(t, 1, ctx.plugin.startCount) + require.Equal(t, 1, ctx.plugin.stopCount) + }) + + t.Run("When manager runs should restart plugin process when killed", func(t *testing.T) { + ctx.plugin.stopCount = 0 + ctx.plugin.startCount = 0 + pCtx := context.Background() + cCtx, cancel := context.WithCancel(pCtx) + var wgRun sync.WaitGroup + wgRun.Add(1) + var runErr error + go func() { + runErr = ctx.manager.Run(cCtx) + wgRun.Done() + }() + + time.Sleep(time.Millisecond) + + var wgKill sync.WaitGroup + wgKill.Add(1) + go func() { + ctx.plugin.kill() + for { + if !ctx.plugin.Exited() { + break + } + } + cancel() + wgKill.Done() + }() + wgKill.Wait() + wgRun.Wait() + require.Equal(t, context.Canceled, runErr) + require.Equal(t, 1, ctx.plugin.stopCount) + require.Equal(t, 2, ctx.plugin.startCount) + }) + + t.Run("Shouldn't be able to start managed plugin", func(t *testing.T) { + err := ctx.manager.StartPlugin(context.Background(), testPluginID) + require.NotNil(t, err) + }) + + t.Run("Unimplemented handlers", func(t *testing.T) { + t.Run("Collect metrics should return method not implemented error", func(t *testing.T) { + _, err = ctx.manager.CollectMetrics(context.Background(), testPluginID) + require.Equal(t, ErrMethodNotImplemented, err) + }) + + t.Run("Check health should return method not implemented error", func(t *testing.T) { + _, err = ctx.manager.CheckHealth(context.Background(), backend.PluginContext{PluginID: testPluginID}) + require.Equal(t, ErrMethodNotImplemented, err) + }) + + t.Run("Call resource should return method not implemented error", func(t *testing.T) { + req, err := http.NewRequest(http.MethodGet, "/test", bytes.NewReader([]byte{})) + require.NoError(t, err) + w := httptest.NewRecorder() + err = ctx.manager.callResourceInternal(w, req, backend.PluginContext{PluginID: testPluginID}) + require.Equal(t, ErrMethodNotImplemented, err) + }) + }) + + t.Run("Implemented handlers", func(t *testing.T) { + t.Run("Collect metrics should return expected result", func(t *testing.T) { + ctx.plugin.CollectMetricsHandlerFunc = backend.CollectMetricsHandlerFunc(func(ctx context.Context) (*backend.CollectMetricsResult, error) { + return &backend.CollectMetricsResult{ + PrometheusMetrics: []byte("hello"), + }, nil + }) + + res, err := ctx.manager.CollectMetrics(context.Background(), testPluginID) + require.NoError(t, err) + require.NotNil(t, res) + require.Equal(t, "hello", string(res.PrometheusMetrics)) + }) + + t.Run("Check health should return expected result", func(t *testing.T) { + json := []byte(`{ + "key": "value" + }`) + ctx.plugin.CheckHealthHandlerFunc = backend.CheckHealthHandlerFunc(func(ctx context.Context, req *backend.CheckHealthRequest) (*backend.CheckHealthResult, error) { + return &backend.CheckHealthResult{ + Status: backend.HealthStatusOk, + Message: "All good", + JSONDetails: json, + }, nil + }) + + res, err := ctx.manager.CheckHealth(context.Background(), backend.PluginContext{PluginID: testPluginID}) + require.NoError(t, err) + require.NotNil(t, res) + require.Equal(t, backend.HealthStatusOk, res.Status) + require.Equal(t, "All good", res.Message) + require.Equal(t, json, res.JSONDetails) + }) + + t.Run("Call resource should return expected response", func(t *testing.T) { + ctx.plugin.CallResourceHandlerFunc = backend.CallResourceHandlerFunc(func(ctx context.Context, req *backend.CallResourceRequest, sender backend.CallResourceResponseSender) error { + return sender.Send(&backend.CallResourceResponse{ + Status: http.StatusOK, + }) + }) + + req, err := http.NewRequest(http.MethodGet, "/test", bytes.NewReader([]byte{})) + require.NoError(t, err) + w := httptest.NewRecorder() + err = ctx.manager.callResourceInternal(w, req, backend.PluginContext{PluginID: testPluginID}) + require.NoError(t, err) + require.Equal(t, http.StatusOK, w.Code) + }) + }) + }) + }) + }) + + newManagerScenario(t, false, func(t *testing.T, ctx *managerScenarioCtx) { + t.Run("Unmanaged plugin scenario", func(t *testing.T) { + ctx.license.edition = "Open Source" + ctx.license.hasLicense = false + ctx.cfg.BuildVersion = "7.0.0" + + t.Run("Should be able to register plugin", func(t *testing.T) { + err := ctx.manager.Register(testPluginID, ctx.factory) + require.NoError(t, err) + require.False(t, ctx.plugin.managed) + + t.Run("When manager runs should not start plugin", func(t *testing.T) { + pCtx := context.Background() + cCtx, cancel := context.WithCancel(pCtx) + var wg sync.WaitGroup + wg.Add(1) + var runErr error + go func() { + runErr = ctx.manager.Run(cCtx) + wg.Done() + }() + go func() { + cancel() + }() + wg.Wait() + require.Equal(t, context.Canceled, runErr) + require.Equal(t, 0, ctx.plugin.startCount) + require.Equal(t, 1, ctx.plugin.stopCount) + }) + + t.Run("Should be able to start unmanaged plugin and be restarted when process is killed", func(t *testing.T) { + pCtx := context.Background() + cCtx, cancel := context.WithCancel(pCtx) + defer cancel() + err := ctx.manager.StartPlugin(cCtx, testPluginID) + require.Nil(t, err) + require.Equal(t, 1, ctx.plugin.startCount) + + var wg sync.WaitGroup + wg.Add(1) + go func() { + ctx.plugin.kill() + for { + if !ctx.plugin.Exited() { + break + } + } + wg.Done() + }() + wg.Wait() + require.Equal(t, 2, ctx.plugin.startCount) + }) + }) + }) + }) + + newManagerScenario(t, true, func(t *testing.T, ctx *managerScenarioCtx) { + t.Run("Plugin registration scenario when Grafana is licensed", func(t *testing.T) { + ctx.license.edition = "Enterprise" + ctx.license.hasLicense = true + ctx.cfg.BuildVersion = "7.0.0" + ctx.cfg.EnterpriseLicensePath = "/license.txt" + + err := ctx.manager.Register(testPluginID, ctx.factory) + require.NoError(t, err) + + t.Run("Should provide expected host environment variables", func(t *testing.T) { + require.Len(t, ctx.env, 3) + require.EqualValues(t, []string{"GF_VERSION=7.0.0", "GF_EDITION=Enterprise", "GF_ENTERPRISE_LICENSE_PATH=/license.txt"}, ctx.env) + }) + }) + }) +} + +type managerScenarioCtx struct { + cfg *setting.Cfg + license *testLicensingService + manager *manager + factory PluginFactoryFunc + plugin *testPlugin + env []string +} + +func newManagerScenario(t *testing.T, managed bool, fn func(t *testing.T, ctx *managerScenarioCtx)) { + t.Helper() + cfg := setting.NewCfg() + license := &testLicensingService{} + ctx := &managerScenarioCtx{ + cfg: cfg, + license: license, + manager: &manager{ + Cfg: cfg, + License: license, + }, + } + + err := ctx.manager.Init() + require.NoError(t, err) + + ctx.factory = PluginFactoryFunc(func(pluginID string, logger log.Logger, env []string) (Plugin, error) { + ctx.plugin = &testPlugin{ + pluginID: pluginID, + logger: logger, + managed: managed, + } + ctx.env = env + + return ctx.plugin, nil + }) + + fn(t, ctx) +} + +type testPlugin struct { + pluginID string + logger log.Logger + startCount int + stopCount int + managed bool + exited bool + backend.CollectMetricsHandlerFunc + backend.CheckHealthHandlerFunc + backend.CallResourceHandlerFunc + mutex sync.RWMutex +} + +func (tp *testPlugin) PluginID() string { + return tp.pluginID +} + +func (tp *testPlugin) Logger() log.Logger { + return tp.logger +} + +func (tp *testPlugin) Start(ctx context.Context) error { + tp.mutex.Lock() + defer tp.mutex.Unlock() + tp.exited = false + tp.startCount++ + return nil +} + +func (tp *testPlugin) Stop(ctx context.Context) error { + tp.mutex.Lock() + defer tp.mutex.Unlock() + tp.stopCount++ + return nil +} + +func (tp *testPlugin) IsManaged() bool { + return tp.managed +} + +func (tp *testPlugin) Exited() bool { + tp.mutex.RLock() + defer tp.mutex.RUnlock() + return tp.exited +} + +func (tp *testPlugin) kill() { + tp.mutex.Lock() + defer tp.mutex.Unlock() + tp.exited = true +} + +func (tp *testPlugin) CollectMetrics(ctx context.Context) (*backend.CollectMetricsResult, error) { + if tp.CollectMetricsHandlerFunc != nil { + return tp.CollectMetricsHandlerFunc(ctx) + } + + return nil, ErrMethodNotImplemented +} + +func (tp *testPlugin) CheckHealth(ctx context.Context, req *backend.CheckHealthRequest) (*backend.CheckHealthResult, error) { + if tp.CheckHealthHandlerFunc != nil { + return tp.CheckHealthHandlerFunc(ctx, req) + } + + return nil, ErrMethodNotImplemented +} + +func (tp *testPlugin) CallResource(ctx context.Context, req *backend.CallResourceRequest, sender backend.CallResourceResponseSender) error { + if tp.CallResourceHandlerFunc != nil { + return tp.CallResourceHandlerFunc(ctx, req, sender) + } + + return ErrMethodNotImplemented +} + +type testLicensingService struct { + edition string + hasLicense bool +} + +func (t *testLicensingService) HasLicense() bool { + return t.hasLicense +} + +func (t *testLicensingService) Expiry() int64 { + return 0 +} + +func (t *testLicensingService) Edition() string { + return t.edition +} + +func (t *testLicensingService) StateInfo() string { + return "" +} + +func (t *testLicensingService) LicenseURL(user *models.SignedInUser) string { + return "" +} + +func (t *testLicensingService) HasValidLicense() bool { + return false +} diff --git a/pkg/plugins/backendplugin/plugin.go b/pkg/plugins/backendplugin/plugin.go new file mode 100644 index 00000000000..db2ae1c8c58 --- /dev/null +++ b/pkg/plugins/backendplugin/plugin.go @@ -0,0 +1,30 @@ +package backendplugin + +import ( + "context" + + "github.com/grafana/grafana-plugin-sdk-go/backend" + "github.com/grafana/grafana/pkg/infra/log" +) + +// Plugin backend plugin interface. +type Plugin interface { + PluginID() string + Logger() log.Logger + Start(ctx context.Context) error + Stop(ctx context.Context) error + IsManaged() bool + Exited() bool + backend.CollectMetricsHandler + backend.CheckHealthHandler + backend.CallResourceHandler +} + +// PluginFactoryFunc factory for creating a Plugin. +type PluginFactoryFunc func(pluginID string, logger log.Logger, env []string) (Plugin, error) + +// CallResourceClientResponseStream is used for receiving resource call responses. +type CallResourceClientResponseStream interface { + Recv() (*backend.CallResourceResponse, error) + Close() error +} diff --git a/pkg/plugins/backendplugin/resource_response_stream.go b/pkg/plugins/backendplugin/resource_response_stream.go new file mode 100644 index 00000000000..3b6617c9d1a --- /dev/null +++ b/pkg/plugins/backendplugin/resource_response_stream.go @@ -0,0 +1,57 @@ +package backendplugin + +import ( + "context" + "errors" + "io" + + "github.com/grafana/grafana-plugin-sdk-go/backend" +) + +func newCallResourceResponseStream(ctx context.Context) *callResourceResponseStream { + return &callResourceResponseStream{ + ctx: ctx, + stream: make(chan *backend.CallResourceResponse), + } +} + +type callResourceResponseStream struct { + ctx context.Context + stream chan *backend.CallResourceResponse + closed bool +} + +func (s *callResourceResponseStream) Send(res *backend.CallResourceResponse) error { + if s.closed { + return errors.New("Cannot send to a closed stream") + } + + select { + case <-s.ctx.Done(): + return errors.New("cancelled") + case s.stream <- res: + return nil + } +} + +func (s *callResourceResponseStream) Recv() (*backend.CallResourceResponse, error) { + select { + case <-s.ctx.Done(): + return nil, s.ctx.Err() + case res, ok := <-s.stream: + if !ok { + return nil, io.EOF + } + return res, nil + } +} + +func (s *callResourceResponseStream) Close() error { + if s.closed { + return errors.New("Cannot close a closed stream") + } + + close(s.stream) + s.closed = true + return nil +} diff --git a/pkg/plugins/datasource/wrapper/datasource_plugin_wrapper_v2.go b/pkg/plugins/datasource/wrapper/datasource_plugin_wrapper_v2.go index 761397bad56..5f41bffd391 100644 --- a/pkg/plugins/datasource/wrapper/datasource_plugin_wrapper_v2.go +++ b/pkg/plugins/datasource/wrapper/datasource_plugin_wrapper_v2.go @@ -4,22 +4,22 @@ import ( "context" "fmt" - "github.com/grafana/grafana/pkg/components/simplejson" - "github.com/grafana/grafana/pkg/plugins/backendplugin" + "github.com/grafana/grafana-plugin-sdk-go/backend/grpcplugin" "github.com/grafana/grafana-plugin-sdk-go/backend" "github.com/grafana/grafana-plugin-sdk-go/genproto/pluginv2" + "github.com/grafana/grafana/pkg/components/simplejson" "github.com/grafana/grafana/pkg/infra/log" "github.com/grafana/grafana/pkg/models" "github.com/grafana/grafana/pkg/tsdb" ) -func NewDatasourcePluginWrapperV2(log log.Logger, pluginId, pluginType string, plugin backendplugin.DataPlugin) *DatasourcePluginWrapperV2 { - return &DatasourcePluginWrapperV2{DataPlugin: plugin, logger: log, pluginId: pluginId, pluginType: pluginType} +func NewDatasourcePluginWrapperV2(log log.Logger, pluginId, pluginType string, client grpcplugin.DataClient) *DatasourcePluginWrapperV2 { + return &DatasourcePluginWrapperV2{DataClient: client, logger: log, pluginId: pluginId, pluginType: pluginType} } type DatasourcePluginWrapperV2 struct { - backendplugin.DataPlugin + grpcplugin.DataClient logger log.Logger pluginId string pluginType string @@ -79,14 +79,7 @@ func (tw *DatasourcePluginWrapperV2) Query(ctx context.Context, ds *models.DataS }) } - var pbRes *pluginv2.QueryDataResponse - err = backendplugin.InstrumentPluginRequest(ds.Type, "dataquery", func() error { - var err error - pbRes, err = tw.DataPlugin.QueryData(ctx, pbQuery) - - return err - }) - + pbRes, err := tw.DataClient.QueryData(ctx, pbQuery) if err != nil { return nil, err } diff --git a/pkg/plugins/datasource_plugin.go b/pkg/plugins/datasource_plugin.go index 33efa1aa521..39a30c9cbf6 100644 --- a/pkg/plugins/datasource_plugin.go +++ b/pkg/plugins/datasource_plugin.go @@ -4,14 +4,13 @@ import ( "encoding/json" "path" - "github.com/grafana/grafana/pkg/plugins/backendplugin" - - "github.com/grafana/grafana/pkg/util/errutil" - "github.com/grafana/grafana/pkg/infra/log" "github.com/grafana/grafana/pkg/models" + "github.com/grafana/grafana/pkg/plugins/backendplugin" + "github.com/grafana/grafana/pkg/plugins/backendplugin/grpcplugin" "github.com/grafana/grafana/pkg/plugins/datasource/wrapper" "github.com/grafana/grafana/pkg/tsdb" + "github.com/grafana/grafana/pkg/util/errutil" ) // DataSourcePlugin contains all metadata about a datasource plugin @@ -47,11 +46,11 @@ func (p *DataSourcePlugin) Load(decoder *json.Decoder, pluginDir string, backend if p.Backend { cmd := ComposePluginStartCommand(p.Executable) fullpath := path.Join(p.PluginDir, cmd) - descriptor := backendplugin.NewBackendPluginDescriptor(p.Id, fullpath, backendplugin.PluginStartFuncs{ + factory := grpcplugin.NewBackendPlugin(p.Id, fullpath, grpcplugin.PluginStartFuncs{ OnLegacyStart: p.onLegacyPluginStart, OnStart: p.onPluginStart, }) - if err := backendPluginManager.Register(descriptor); err != nil { + if err := backendPluginManager.Register(p.Id, factory); err != nil { return errutil.Wrapf(err, "Failed to register backend plugin") } } @@ -60,7 +59,7 @@ func (p *DataSourcePlugin) Load(decoder *json.Decoder, pluginDir string, backend return nil } -func (p *DataSourcePlugin) onLegacyPluginStart(pluginID string, client *backendplugin.LegacyClient, logger log.Logger) error { +func (p *DataSourcePlugin) onLegacyPluginStart(pluginID string, client *grpcplugin.LegacyClient, logger log.Logger) error { tsdb.RegisterTsdbQueryEndpoint(pluginID, func(dsInfo *models.DataSource) (tsdb.TsdbQueryEndpoint, error) { return wrapper.NewDatasourcePluginWrapper(logger, client.DatasourcePlugin), nil }) @@ -68,7 +67,7 @@ func (p *DataSourcePlugin) onLegacyPluginStart(pluginID string, client *backendp return nil } -func (p *DataSourcePlugin) onPluginStart(pluginID string, client *backendplugin.Client, logger log.Logger) error { +func (p *DataSourcePlugin) onPluginStart(pluginID string, client *grpcplugin.Client, logger log.Logger) error { if client.DataPlugin != nil { tsdb.RegisterTsdbQueryEndpoint(pluginID, func(dsInfo *models.DataSource) (tsdb.TsdbQueryEndpoint, error) { return wrapper.NewDatasourcePluginWrapperV2(logger, p.Id, p.Type, client.DataPlugin), nil diff --git a/pkg/plugins/plugins_test.go b/pkg/plugins/plugins_test.go index 7b6b828f752..94a4d1fd4ea 100644 --- a/pkg/plugins/plugins_test.go +++ b/pkg/plugins/plugins_test.go @@ -189,8 +189,8 @@ type fakeBackendPluginManager struct { registeredPlugins []string } -func (f *fakeBackendPluginManager) Register(descriptor backendplugin.PluginDescriptor) error { - f.registeredPlugins = append(f.registeredPlugins, descriptor.PluginID()) +func (f *fakeBackendPluginManager) Register(pluginID string, factory backendplugin.PluginFactoryFunc) error { + f.registeredPlugins = append(f.registeredPlugins, pluginID) return nil } @@ -198,11 +198,11 @@ func (f *fakeBackendPluginManager) StartPlugin(ctx context.Context, pluginID str return nil } -func (f *fakeBackendPluginManager) CollectMetrics(ctx context.Context, pluginID string) (*backendplugin.CollectMetricsResult, error) { +func (f *fakeBackendPluginManager) CollectMetrics(ctx context.Context, pluginID string) (*backend.CollectMetricsResult, error) { return nil, nil } -func (f *fakeBackendPluginManager) CheckHealth(ctx context.Context, pCtx backend.PluginContext) (*backendplugin.CheckHealthResult, error) { +func (f *fakeBackendPluginManager) CheckHealth(ctx context.Context, pCtx backend.PluginContext) (*backend.CheckHealthResult, error) { return nil, nil } diff --git a/pkg/plugins/renderer_plugin.go b/pkg/plugins/renderer_plugin.go index fb55e6feb43..fcf7829ddde 100644 --- a/pkg/plugins/renderer_plugin.go +++ b/pkg/plugins/renderer_plugin.go @@ -8,6 +8,7 @@ import ( pluginModel "github.com/grafana/grafana-plugin-model/go/renderer" "github.com/grafana/grafana/pkg/infra/log" "github.com/grafana/grafana/pkg/plugins/backendplugin" + "github.com/grafana/grafana/pkg/plugins/backendplugin/grpcplugin" "github.com/grafana/grafana/pkg/plugins/backendplugin/pluginextensionv2" "github.com/grafana/grafana/pkg/util/errutil" ) @@ -34,11 +35,11 @@ func (r *RendererPlugin) Load(decoder *json.Decoder, pluginDir string, backendPl cmd := ComposePluginStartCommand("plugin_start") fullpath := path.Join(r.PluginDir, cmd) - descriptor := backendplugin.NewRendererPluginDescriptor(r.Id, fullpath, backendplugin.PluginStartFuncs{ + factory := grpcplugin.NewRendererPlugin(r.Id, fullpath, grpcplugin.PluginStartFuncs{ OnLegacyStart: r.onLegacyPluginStart, OnStart: r.onPluginStart, }) - if err := backendPluginManager.Register(descriptor); err != nil { + if err := backendPluginManager.Register(r.Id, factory); err != nil { return errutil.Wrapf(err, "Failed to register backend plugin") } @@ -54,12 +55,12 @@ func (r *RendererPlugin) Start(ctx context.Context) error { return nil } -func (r *RendererPlugin) onLegacyPluginStart(pluginID string, client *backendplugin.LegacyClient, logger log.Logger) error { +func (r *RendererPlugin) onLegacyPluginStart(pluginID string, client *grpcplugin.LegacyClient, logger log.Logger) error { r.GrpcPluginV1 = client.RendererPlugin return nil } -func (r *RendererPlugin) onPluginStart(pluginID string, client *backendplugin.Client, logger log.Logger) error { +func (r *RendererPlugin) onPluginStart(pluginID string, client *grpcplugin.Client, logger log.Logger) error { r.GrpcPluginV2 = client.RendererPlugin return nil } diff --git a/pkg/plugins/transform_plugin.go b/pkg/plugins/transform_plugin.go index 2f864b82c43..b62cc3e5258 100644 --- a/pkg/plugins/transform_plugin.go +++ b/pkg/plugins/transform_plugin.go @@ -7,12 +7,14 @@ import ( "path" "strconv" + sdkgrpcplugin "github.com/grafana/grafana-plugin-sdk-go/backend/grpcplugin" "github.com/grafana/grafana-plugin-sdk-go/genproto/pluginv2" "github.com/grafana/grafana/pkg/bus" "github.com/grafana/grafana/pkg/components/simplejson" "github.com/grafana/grafana/pkg/infra/log" "github.com/grafana/grafana/pkg/models" "github.com/grafana/grafana/pkg/plugins/backendplugin" + "github.com/grafana/grafana/pkg/plugins/backendplugin/grpcplugin" "github.com/grafana/grafana/pkg/plugins/datasource/wrapper" "github.com/grafana/grafana/pkg/tsdb" "github.com/grafana/grafana/pkg/util/errutil" @@ -37,10 +39,10 @@ func (p *TransformPlugin) Load(decoder *json.Decoder, pluginDir string, backendP cmd := ComposePluginStartCommand(p.Executable) fullpath := path.Join(p.PluginDir, cmd) - descriptor := backendplugin.NewBackendPluginDescriptor(p.Id, fullpath, backendplugin.PluginStartFuncs{ + factory := grpcplugin.NewBackendPlugin(p.Id, fullpath, grpcplugin.PluginStartFuncs{ OnStart: p.onPluginStart, }) - if err := backendPluginManager.Register(descriptor); err != nil { + if err := backendPluginManager.Register(p.Id, factory); err != nil { return errutil.Wrapf(err, "Failed to register backend plugin") } @@ -49,7 +51,7 @@ func (p *TransformPlugin) Load(decoder *json.Decoder, pluginDir string, backendP return nil } -func (p *TransformPlugin) onPluginStart(pluginID string, client *backendplugin.Client, logger log.Logger) error { +func (p *TransformPlugin) onPluginStart(pluginID string, client *grpcplugin.Client, logger log.Logger) error { p.TransformWrapper = NewTransformWrapper(logger, client.TransformPlugin) if client.DataPlugin != nil { @@ -65,12 +67,12 @@ func (p *TransformPlugin) onPluginStart(pluginID string, client *backendplugin.C // Wrapper Code // ... -func NewTransformWrapper(log log.Logger, plugin backendplugin.TransformPlugin) *TransformWrapper { +func NewTransformWrapper(log log.Logger, plugin sdkgrpcplugin.TransformClient) *TransformWrapper { return &TransformWrapper{plugin, log, &transformCallback{log}} } type TransformWrapper struct { - backendplugin.TransformPlugin + sdkgrpcplugin.TransformClient logger log.Logger callback *transformCallback } @@ -100,7 +102,7 @@ func (tw *TransformWrapper) Transform(ctx context.Context, query *tsdb.TsdbQuery }, }) } - pbRes, err := tw.TransformPlugin.TransformData(ctx, pbQuery, tw.callback) + pbRes, err := tw.TransformClient.TransformData(ctx, pbQuery, tw.callback) if err != nil { return nil, err } diff --git a/pkg/setting/setting.go b/pkg/setting/setting.go index b8d568894a7..7845f46b051 100644 --- a/pkg/setting/setting.go +++ b/pkg/setting/setting.go @@ -228,6 +228,16 @@ type Cfg struct { AppSubUrl string ServeFromSubPath bool + // build + BuildVersion string + BuildCommit string + BuildBranch string + BuildStamp int64 + IsEnterprise bool + + // packaging + Packaging string + // Paths ProvisioningPath string DataPath string @@ -607,6 +617,13 @@ func (cfg *Cfg) Load(args *CommandLineArgs) error { // Temporary keep global, to make refactor in steps Raw = cfg.Raw + cfg.BuildVersion = BuildVersion + cfg.BuildCommit = BuildCommit + cfg.BuildStamp = BuildStamp + cfg.BuildBranch = BuildBranch + cfg.IsEnterprise = IsEnterprise + cfg.Packaging = Packaging + ApplicationName = APP_NAME Env, err = valueAsString(iniFile.Section(""), "app_mode", "development")