From c0f3b2929c8f62104745adf65968973c42e96237 Mon Sep 17 00:00:00 2001 From: Marcus Efraimsson Date: Thu, 11 Jun 2020 16:14:05 +0200 Subject: [PATCH] Backend plugins: Refactor to allow shared contract between core and external backend plugins (#25472) Refactor to allow shared contract between core and external backend plugins allowing core backend data sources in Grafana to be implemented in same way as an external backend plugin. Use v0.67.0 of sdk. Add tests for verifying plugin is restarted when process is killed. Enable strict linting for backendplugin packages --- .circleci/config.yml | 4 +- Makefile | 12 +- go.mod | 2 +- go.sum | 4 +- pkg/api/api.go | 2 +- pkg/api/datasources.go | 44 +- pkg/api/plugins.go | 54 ++- pkg/plugins/backendplugin/backend_plugin.go | 240 ---------- pkg/plugins/backendplugin/contracts.go | 123 ------ .../backendplugin/coreplugin/core_plugin.go | 81 ++++ .../coreplugin/core_plugin_test.go | 67 +++ .../coreplugin/query_endpoint_adapter.go | 112 +++++ .../backendplugin/{ => grpcplugin}/client.go | 63 +-- .../backendplugin/grpcplugin/client_v1.go | 84 ++++ .../backendplugin/grpcplugin/client_v2.go | 212 +++++++++ .../backendplugin/grpcplugin/grpc_plugin.go | 136 ++++++ .../{ => grpcplugin}/log_wrapper.go | 2 +- .../{ => grpcplugin}/log_wrapper_test.go | 2 +- pkg/plugins/backendplugin/instrumentation.go | 44 +- pkg/plugins/backendplugin/manager.go | 240 +++++----- pkg/plugins/backendplugin/manager_test.go | 410 ++++++++++++++++++ pkg/plugins/backendplugin/plugin.go | 30 ++ .../backendplugin/resource_response_stream.go | 57 +++ .../wrapper/datasource_plugin_wrapper_v2.go | 19 +- pkg/plugins/datasource_plugin.go | 15 +- pkg/plugins/plugins_test.go | 8 +- pkg/plugins/renderer_plugin.go | 9 +- pkg/plugins/transform_plugin.go | 14 +- pkg/setting/setting.go | 17 + 29 files changed, 1495 insertions(+), 612 deletions(-) delete mode 100644 pkg/plugins/backendplugin/backend_plugin.go delete mode 100644 pkg/plugins/backendplugin/contracts.go create mode 100644 pkg/plugins/backendplugin/coreplugin/core_plugin.go create mode 100644 pkg/plugins/backendplugin/coreplugin/core_plugin_test.go create mode 100644 pkg/plugins/backendplugin/coreplugin/query_endpoint_adapter.go rename pkg/plugins/backendplugin/{ => grpcplugin}/client.go (72%) create mode 100644 pkg/plugins/backendplugin/grpcplugin/client_v1.go create mode 100644 pkg/plugins/backendplugin/grpcplugin/client_v2.go create mode 100644 pkg/plugins/backendplugin/grpcplugin/grpc_plugin.go rename pkg/plugins/backendplugin/{ => grpcplugin}/log_wrapper.go (99%) rename pkg/plugins/backendplugin/{ => grpcplugin}/log_wrapper_test.go (97%) create mode 100644 pkg/plugins/backendplugin/manager_test.go create mode 100644 pkg/plugins/backendplugin/plugin.go create mode 100644 pkg/plugins/backendplugin/resource_response_stream.go diff --git a/.circleci/config.yml b/.circleci/config.yml index d61a223e149..1ebc6586599 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -828,9 +828,11 @@ jobs: -E varcheck -E goconst -E errcheck -E staticcheck ./pkg/... ./scripts/go/bin/revive -formatter stylish -config ./scripts/go/configs/revive.toml ./pkg/... ./scripts/go/bin/revive -formatter stylish -config ./scripts/go/configs/revive-strict.toml \ + -exclude ./pkg/plugins/backendplugin/pluginextensionv2/... \ ./pkg/services/alerting/... \ ./pkg/services/provisioning/datasources/... \ - ./pkg/services/provisioning/dashboards/... + ./pkg/services/provisioning/dashboards/... \ + ./pkg/plugins/backendplugin/... ./scripts/go/bin/gosec -quiet -exclude=G104,G107,G108,G201,G202,G204,G301,G304,G401,G402,G501 \ -conf=./scripts/go/configs/gosec.json ./pkg/... diff --git a/Makefile b/Makefile index 5f60a7a714c..49ff6b2d552 100644 --- a/Makefile +++ b/Makefile @@ -4,7 +4,7 @@ -include local/Makefile -.PHONY: all deps-go deps-js deps build-go build-server build-cli build-js build build-docker-dev build-docker-full lint-go gosec revive golangci-lint go-vet test-go test-js test run run-frontend clean devenv devenv-down revive-alerting protobuf help +.PHONY: all deps-go deps-js deps build-go build-server build-cli build-js build build-docker-dev build-docker-full lint-go gosec revive golangci-lint go-vet test-go test-js test run run-frontend clean devenv devenv-down revive-strict protobuf help GO = GO111MODULE=on go GO_FILES ?= ./pkg/... @@ -81,14 +81,16 @@ revive: scripts/go/bin/revive -config ./scripts/go/configs/revive.toml \ $(GO_FILES) -revive-alerting: scripts/go/bin/revive - @echo "lint alerting via revive" +revive-strict: scripts/go/bin/revive + @echo "lint via revive (strict)" @scripts/go/bin/revive \ -formatter stylish \ -config ./scripts/go/configs/revive-strict.toml \ + -exclude ./pkg/plugins/backendplugin/pluginextensionv2/... \ ./pkg/services/alerting/... \ ./pkg/services/provisioning/datasources/... \ - ./pkg/services/provisioning/dashboards/... + ./pkg/services/provisioning/dashboards/... \ + ./pkg/plugins/backendplugin/... scripts/go/bin/golangci-lint: scripts/go/go.mod @cd scripts/go; \ @@ -116,7 +118,7 @@ go-vet: @echo "lint via go vet" @$(GO) vet $(GO_FILES) -lint-go: go-vet golangci-lint revive revive-alerting gosec ## Run all code checks for backend. +lint-go: go-vet golangci-lint revive revive-strict gosec ## Run all code checks for backend. # with disabled SC1071 we are ignored some TCL,Expect `/usr/bin/env expect` scripts shellcheck: $(SH_FILES) ## Run checks for shell scripts. diff --git a/go.mod b/go.mod index 78e47ce2d83..a7b2dfd913c 100644 --- a/go.mod +++ b/go.mod @@ -30,7 +30,7 @@ require ( github.com/gorilla/websocket v1.4.1 github.com/gosimple/slug v1.4.2 github.com/grafana/grafana-plugin-model v0.0.0-20190930120109-1fc953a61fb4 - github.com/grafana/grafana-plugin-sdk-go v0.66.0 + github.com/grafana/grafana-plugin-sdk-go v0.67.0 github.com/hashicorp/go-hclog v0.0.0-20180709165350-ff2cf002a8dd github.com/hashicorp/go-plugin v1.2.2 github.com/hashicorp/go-version v1.1.0 diff --git a/go.sum b/go.sum index 35d101ccd22..0e6ce6f4043 100644 --- a/go.sum +++ b/go.sum @@ -155,8 +155,8 @@ github.com/gosimple/slug v1.4.2 h1:jDmprx3q/9Lfk4FkGZtvzDQ9Cj9eAmsjzeQGp24PeiQ= github.com/gosimple/slug v1.4.2/go.mod h1:ER78kgg1Mv0NQGlXiDe57DpCyfbNywXXZ9mIorhxAf0= github.com/grafana/grafana-plugin-model v0.0.0-20190930120109-1fc953a61fb4 h1:SPdxCL9BChFTlyi0Khv64vdCW4TMna8+sxL7+Chx+Ag= github.com/grafana/grafana-plugin-model v0.0.0-20190930120109-1fc953a61fb4/go.mod h1:nc0XxBzjeGcrMltCDw269LoWF9S8ibhgxolCdA1R8To= -github.com/grafana/grafana-plugin-sdk-go v0.66.0 h1:Tx8pchA74QUtxtN8moavf0FJoAZbnbW3Fe8RVfSKUoY= -github.com/grafana/grafana-plugin-sdk-go v0.66.0/go.mod h1:w855JyiC5PDP3naWUJP0h/vY8RlzlE4+4fodyoXph+4= +github.com/grafana/grafana-plugin-sdk-go v0.67.0 h1:2kvI9kROmp/pXRrDQSEvcpR7Zonle7HjXgOdH70P2bw= +github.com/grafana/grafana-plugin-sdk-go v0.67.0/go.mod h1:w855JyiC5PDP3naWUJP0h/vY8RlzlE4+4fodyoXph+4= github.com/grpc-ecosystem/go-grpc-middleware v1.2.0 h1:0IKlLyQ3Hs9nDaiK5cSHAGmcQEIC8l2Ts1u6x5Dfrqg= github.com/grpc-ecosystem/go-grpc-middleware v1.2.0/go.mod h1:mJzapYve32yjrKlk9GbyCZHuPgZsrbyIbyKhSzOpg6s= github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 h1:Ovs26xHkKqVztRpIrF/92BcuyuQ/YW4NSIpoGtfXNho= diff --git a/pkg/api/api.go b/pkg/api/api.go index d5d169eaa08..d2fd5dc499a 100644 --- a/pkg/api/api.go +++ b/pkg/api/api.go @@ -266,7 +266,7 @@ func (hs *HTTPServer) registerRoutes() { apiRoute.Any("/datasources/proxy/:id", reqSignedIn, hs.ProxyDataSourceRequest) apiRoute.Any("/datasources/:id/resources", hs.CallDatasourceResource) apiRoute.Any("/datasources/:id/resources/*", hs.CallDatasourceResource) - apiRoute.Any("/datasources/:id/health", hs.CheckDatasourceHealth) + apiRoute.Any("/datasources/:id/health", Wrap(hs.CheckDatasourceHealth)) // Folders apiRoute.Group("/folders", func(folderRoute routing.RouteRegister) { diff --git a/pkg/api/datasources.go b/pkg/api/datasources.go index e99cdd6f670..bfffa904a6c 100644 --- a/pkg/api/datasources.go +++ b/pkg/api/datasources.go @@ -11,7 +11,6 @@ import ( "github.com/grafana/grafana/pkg/bus" "github.com/grafana/grafana/pkg/models" "github.com/grafana/grafana/pkg/plugins" - "github.com/grafana/grafana/pkg/plugins/backendplugin" "github.com/grafana/grafana/pkg/plugins/datasource/wrapper" "github.com/grafana/grafana/pkg/util" ) @@ -342,28 +341,25 @@ func convertModelToDtos(ds *models.DataSource) dtos.DataSource { // CheckDatasourceHealth sends a health check request to the plugin datasource // /api/datasource/:id/health -func (hs *HTTPServer) CheckDatasourceHealth(c *models.ReqContext) { +func (hs *HTTPServer) CheckDatasourceHealth(c *models.ReqContext) Response { datasourceID := c.ParamsInt64("id") ds, err := hs.DatasourceCache.GetDatasource(datasourceID, c.SignedInUser, c.SkipCache) if err != nil { if err == models.ErrDataSourceAccessDenied { - c.JsonApiErr(403, "Access denied to datasource", err) - return + return Error(403, "Access denied to datasource", err) } - c.JsonApiErr(500, "Unable to load datasource metadata", err) - return + return Error(500, "Unable to load datasource metadata", err) } plugin, ok := hs.PluginManager.GetDatasource(ds.Type) if !ok { - c.JsonApiErr(500, "Unable to find datasource plugin", err) - return + return Error(500, "Unable to find datasource plugin", err) } dsInstanceSettings, err := wrapper.ModelToInstanceSettings(ds) if err != nil { - c.JsonApiErr(500, "Unable to get datasource model", err) + return Error(500, "Unable to get datasource model", err) } pCtx := backend.PluginContext{ User: wrapper.BackendUserFromSignedInUser(c.SignedInUser), @@ -374,25 +370,7 @@ func (hs *HTTPServer) CheckDatasourceHealth(c *models.ReqContext) { resp, err := hs.BackendPluginManager.CheckHealth(c.Req.Context(), pCtx) if err != nil { - if err == backendplugin.ErrPluginNotRegistered { - c.JsonApiErr(404, "Plugin not found", err) - return - } - - // Return status unknown instead? - if err == backendplugin.ErrDiagnosticsNotSupported { - c.JsonApiErr(404, "Health check not implemented", err) - return - } - - // Return status unknown or error instead? - if err == backendplugin.ErrHealthCheckFailed { - c.JsonApiErr(500, "Plugin health check failed", err) - return - } - - c.JsonApiErr(500, "Plugin healthcheck returned an unknown error", err) - return + return translatePluginRequestErrorToAPIError(err) } payload := map[string]interface{}{ @@ -405,17 +383,15 @@ func (hs *HTTPServer) CheckDatasourceHealth(c *models.ReqContext) { var jsonDetails map[string]interface{} err = json.Unmarshal(resp.JSONDetails, &jsonDetails) if err != nil { - c.JsonApiErr(500, "Failed to unmarshal detailed response from backend plugin", err) - return + return Error(500, "Failed to unmarshal detailed response from backend plugin", err) } payload["details"] = jsonDetails } - if resp.Status != backendplugin.HealthStatusOk { - c.JSON(503, payload) - return + if resp.Status != backend.HealthStatusOk { + return JSON(503, payload) } - c.JSON(200, payload) + return JSON(200, payload) } diff --git a/pkg/api/plugins.go b/pkg/api/plugins.go index 00c23585f14..bdbbc373140 100644 --- a/pkg/api/plugins.go +++ b/pkg/api/plugins.go @@ -266,20 +266,12 @@ func (hs *HTTPServer) CollectPluginMetrics(c *models.ReqContext) Response { pluginID := c.Params("pluginId") plugin, exists := plugins.Plugins[pluginID] if !exists { - return Error(404, "Plugin not found, no installed plugin with that id", nil) + return Error(404, "Plugin not found", nil) } resp, err := hs.BackendPluginManager.CollectMetrics(c.Req.Context(), plugin.Id) if err != nil { - if err == backendplugin.ErrPluginNotRegistered { - return Error(404, "Plugin not found", err) - } - - if err == backendplugin.ErrDiagnosticsNotSupported { - return Error(404, "Health check not implemented", err) - } - - return Error(500, "Collect plugin metrics failed", err) + return translatePluginRequestErrorToAPIError(err) } headers := make(http.Header) @@ -300,7 +292,7 @@ func (hs *HTTPServer) CheckHealth(c *models.ReqContext) Response { pCtx, err := hs.getPluginContext(pluginID, c.SignedInUser) if err != nil { if err == ErrPluginNotFound { - return Error(404, "Plugin not found, no installed plugin with that id", nil) + return Error(404, "Plugin not found", nil) } return Error(500, "Failed to get plugin settings", err) @@ -308,21 +300,7 @@ func (hs *HTTPServer) CheckHealth(c *models.ReqContext) Response { resp, err := hs.BackendPluginManager.CheckHealth(c.Req.Context(), pCtx) if err != nil { - if err == backendplugin.ErrPluginNotRegistered { - return Error(404, "Plugin not found", err) - } - - // Return status unknown instead? - if err == backendplugin.ErrDiagnosticsNotSupported { - return Error(404, "Health check not implemented", err) - } - - // Return status unknown or error instead? - if err == backendplugin.ErrHealthCheckFailed { - return Error(500, "Plugin health check failed", err) - } - - return Error(500, "Plugin healthcheck returned an unknown error", err) + return translatePluginRequestErrorToAPIError(err) } payload := map[string]interface{}{ @@ -341,7 +319,7 @@ func (hs *HTTPServer) CheckHealth(c *models.ReqContext) Response { payload["details"] = jsonDetails } - if resp.Status != backendplugin.HealthStatusOk { + if resp.Status != backend.HealthStatusOk { return JSON(503, payload) } @@ -357,7 +335,7 @@ func (hs *HTTPServer) CallResource(c *models.ReqContext) { pCtx, err := hs.getPluginContext(pluginID, c.SignedInUser) if err != nil { if err == ErrPluginNotFound { - c.JsonApiErr(404, "Plugin not found, no installed plugin with that id", nil) + c.JsonApiErr(404, "Plugin not found", nil) return } @@ -385,3 +363,23 @@ func (hs *HTTPServer) getCachedPluginSettings(pluginID string, user *models.Sign hs.CacheService.Set(cacheKey, query.Result, time.Second*5) return query.Result, nil } + +func translatePluginRequestErrorToAPIError(err error) Response { + if errors.Is(err, backendplugin.ErrPluginNotRegistered) { + return Error(404, "Plugin not found", err) + } + + if errors.Is(err, backendplugin.ErrMethodNotImplemented) { + return Error(404, "Not found", err) + } + + if errors.Is(err, backendplugin.ErrHealthCheckFailed) { + return Error(500, "Plugin health check failed", err) + } + + if errors.Is(err, backendplugin.ErrPluginUnavailable) { + return Error(503, "Plugin unavailable", err) + } + + return Error(500, "Plugin request failed", err) +} diff --git a/pkg/plugins/backendplugin/backend_plugin.go b/pkg/plugins/backendplugin/backend_plugin.go deleted file mode 100644 index fdeb12a9d26..00000000000 --- a/pkg/plugins/backendplugin/backend_plugin.go +++ /dev/null @@ -1,240 +0,0 @@ -package backendplugin - -import ( - "context" - "errors" - "net/http" - - datasourceV1 "github.com/grafana/grafana-plugin-model/go/datasource" - rendererV1 "github.com/grafana/grafana-plugin-model/go/renderer" - "github.com/grafana/grafana-plugin-sdk-go/backend" - "github.com/grafana/grafana-plugin-sdk-go/genproto/pluginv2" - "github.com/grafana/grafana/pkg/infra/log" - "github.com/grafana/grafana/pkg/plugins/backendplugin/pluginextensionv2" - "github.com/grafana/grafana/pkg/util/errutil" - plugin "github.com/hashicorp/go-plugin" - "google.golang.org/grpc/codes" - "google.golang.org/grpc/status" -) - -// BackendPlugin a registered backend plugin. -type BackendPlugin struct { - id string - executablePath string - managed bool - clientFactory func() *plugin.Client - client *plugin.Client - logger log.Logger - startFns PluginStartFuncs - diagnostics DiagnosticsPlugin - resource ResourcePlugin -} - -func (p *BackendPlugin) start(ctx context.Context) error { - p.client = p.clientFactory() - rpcClient, err := p.client.Client() - if err != nil { - return err - } - - var legacyClient *LegacyClient - var client *Client - - if p.client.NegotiatedVersion() > 1 { - rawDiagnostics, err := rpcClient.Dispense("diagnostics") - if err != nil { - return err - } - - rawResource, err := rpcClient.Dispense("resource") - if err != nil { - return err - } - - rawData, err := rpcClient.Dispense("data") - if err != nil { - return err - } - - rawTransform, err := rpcClient.Dispense("transform") - if err != nil { - return err - } - - rawRenderer, err := rpcClient.Dispense("renderer") - if err != nil { - return err - } - - if rawDiagnostics != nil { - if plugin, ok := rawDiagnostics.(DiagnosticsPlugin); ok { - p.diagnostics = plugin - } - } - - client = &Client{} - if rawResource != nil { - if plugin, ok := rawResource.(ResourcePlugin); ok { - p.resource = plugin - client.ResourcePlugin = plugin - } - } - - if rawData != nil { - if plugin, ok := rawData.(DataPlugin); ok { - client.DataPlugin = plugin - } - } - - if rawTransform != nil { - if plugin, ok := rawTransform.(TransformPlugin); ok { - client.TransformPlugin = plugin - } - } - - if rawRenderer != nil { - if plugin, ok := rawRenderer.(pluginextensionv2.RendererPlugin); ok { - client.RendererPlugin = plugin - } - } - } else { - p.logger.Warn("Plugin uses a deprecated version of Grafana's backend plugin system which will be removed in a future release. " + - "Consider upgrading to a newer plugin version or reach out to the plugin repository/developer and request an upgrade.") - raw, err := rpcClient.Dispense(p.id) - if err != nil { - return err - } - - legacyClient = &LegacyClient{} - if plugin, ok := raw.(datasourceV1.DatasourcePlugin); ok { - legacyClient.DatasourcePlugin = plugin - } - - if plugin, ok := raw.(rendererV1.RendererPlugin); ok { - legacyClient.RendererPlugin = plugin - } - } - - if legacyClient == nil && client == nil { - return errors.New("no compatible plugin implementation found") - } - - if legacyClient != nil && p.startFns.OnLegacyStart != nil { - if err := p.startFns.OnLegacyStart(p.id, legacyClient, p.logger); err != nil { - return err - } - } - - if client != nil && p.startFns.OnStart != nil { - if err := p.startFns.OnStart(p.id, client, p.logger); err != nil { - return err - } - } - - return nil -} - -func (p *BackendPlugin) stop() error { - if p.client != nil { - p.client.Kill() - } - return nil -} - -// supportsDiagnostics return whether backend plugin supports diagnostics like metrics and health check. -func (p *BackendPlugin) supportsDiagnostics() bool { - return p.diagnostics != nil -} - -// CollectMetrics implements the collector.Collector interface. -func (p *BackendPlugin) CollectMetrics(ctx context.Context) (*pluginv2.CollectMetricsResponse, error) { - if p.diagnostics == nil || p.client == nil || p.client.Exited() { - return &pluginv2.CollectMetricsResponse{ - Metrics: &pluginv2.CollectMetricsResponse_Payload{}, - }, nil - } - - var res *pluginv2.CollectMetricsResponse - err := InstrumentPluginRequest(p.id, "metrics", func() error { - var innerErr error - res, innerErr = p.diagnostics.CollectMetrics(ctx, &pluginv2.CollectMetricsRequest{}) - - return innerErr - }) - - if err != nil { - if st, ok := status.FromError(err); ok { - if st.Code() == codes.Unimplemented { - return &pluginv2.CollectMetricsResponse{ - Metrics: &pluginv2.CollectMetricsResponse_Payload{}, - }, nil - } - } - - return nil, err - } - - return res, nil -} - -var toProto = backend.ToProto() - -func (p *BackendPlugin) checkHealth(ctx context.Context, pCtx backend.PluginContext) (*pluginv2.CheckHealthResponse, error) { - if p.diagnostics == nil || p.client == nil || p.client.Exited() { - return &pluginv2.CheckHealthResponse{ - Status: pluginv2.CheckHealthResponse_UNKNOWN, - }, nil - } - - protoContext := toProto.PluginContext(pCtx) - - var res *pluginv2.CheckHealthResponse - err := InstrumentPluginRequest(p.id, "checkhealth", func() error { - var innerErr error - res, innerErr = p.diagnostics.CheckHealth(ctx, &pluginv2.CheckHealthRequest{PluginContext: protoContext}) - return innerErr - }) - - if err != nil { - if st, ok := status.FromError(err); ok { - if st.Code() == codes.Unimplemented { - return &pluginv2.CheckHealthResponse{ - Status: pluginv2.CheckHealthResponse_UNKNOWN, - Message: "Health check not implemented", - }, nil - } - } - return nil, err - } - - return res, nil -} - -func (p *BackendPlugin) callResource(ctx context.Context, req *backend.CallResourceRequest) (callResourceResultStream, error) { - p.logger.Debug("Calling resource", "path", req.Path, "method", req.Method) - - if p.resource == nil || p.client == nil || p.client.Exited() { - return nil, errors.New("plugin not running, cannot call resource") - } - - protoReq := toProto.CallResourceRequest(req) - - protoStream, err := p.resource.CallResource(ctx, protoReq) - if err != nil { - if st, ok := status.FromError(err); ok { - if st.Code() == codes.Unimplemented { - return &singleCallResourceResult{ - result: &CallResourceResult{ - Status: http.StatusNotImplemented, - }, - }, nil - } - } - - return nil, errutil.Wrap("Failed to call resource", err) - } - - return &callResourceResultStreamImpl{ - stream: protoStream, - }, nil -} diff --git a/pkg/plugins/backendplugin/contracts.go b/pkg/plugins/backendplugin/contracts.go deleted file mode 100644 index 37ae086de99..00000000000 --- a/pkg/plugins/backendplugin/contracts.go +++ /dev/null @@ -1,123 +0,0 @@ -package backendplugin - -import ( - "strconv" - - "github.com/grafana/grafana-plugin-sdk-go/genproto/pluginv2" -) - -// HealthStatus is the status of the plugin. -type HealthStatus int - -const ( - // HealthStatusUnknown means the status of the plugin is unknown. - HealthStatusUnknown HealthStatus = iota - // HealthStatusOk means the status of the plugin is good. - HealthStatusOk - // HealthStatusError means the plugin is in an error state. - HealthStatusError -) - -var healthStatusNames = map[int]string{ - 0: "UNKNOWN", - 1: "OK", - 2: "ERROR", -} - -func (hs HealthStatus) String() string { - s, exists := healthStatusNames[int(hs)] - if exists { - return s - } - return strconv.Itoa(int(hs)) -} - -// CheckHealthResult check health result. -type CheckHealthResult struct { - Status HealthStatus - Message string - JSONDetails []byte -} - -func checkHealthResultFromProto(protoResp *pluginv2.CheckHealthResponse) *CheckHealthResult { - status := HealthStatusUnknown - switch protoResp.Status { - case pluginv2.CheckHealthResponse_ERROR: - status = HealthStatusError - case pluginv2.CheckHealthResponse_OK: - status = HealthStatusOk - } - - return &CheckHealthResult{ - Status: status, - Message: protoResp.Message, - JSONDetails: protoResp.JsonDetails, - } -} - -func collectMetricsResultFromProto(protoResp *pluginv2.CollectMetricsResponse) *CollectMetricsResult { - var prometheusMetrics []byte - - if protoResp.Metrics != nil { - prometheusMetrics = protoResp.Metrics.Prometheus - } - - return &CollectMetricsResult{ - PrometheusMetrics: prometheusMetrics, - } -} - -// CollectMetricsResult collect metrics result. -type CollectMetricsResult struct { - PrometheusMetrics []byte -} - -// CallResourceResult call resource result. -type CallResourceResult struct { - Status int - Headers map[string][]string - Body []byte -} - -type callResourceResultStream interface { - Recv() (*CallResourceResult, error) - Close() error -} - -type callResourceResultStreamImpl struct { - stream pluginv2.Resource_CallResourceClient -} - -func (s *callResourceResultStreamImpl) Recv() (*CallResourceResult, error) { - protoResp, err := s.stream.Recv() - if err != nil { - return nil, err - } - - respHeaders := map[string][]string{} - for key, values := range protoResp.Headers { - respHeaders[key] = values.Values - } - - return &CallResourceResult{ - Headers: respHeaders, - Body: protoResp.Body, - Status: int(protoResp.Code), - }, nil -} - -func (s *callResourceResultStreamImpl) Close() error { - return s.stream.CloseSend() -} - -type singleCallResourceResult struct { - result *CallResourceResult -} - -func (s *singleCallResourceResult) Recv() (*CallResourceResult, error) { - return s.result, nil -} - -func (s *singleCallResourceResult) Close() error { - return nil -} diff --git a/pkg/plugins/backendplugin/coreplugin/core_plugin.go b/pkg/plugins/backendplugin/coreplugin/core_plugin.go new file mode 100644 index 00000000000..e3168539e96 --- /dev/null +++ b/pkg/plugins/backendplugin/coreplugin/core_plugin.go @@ -0,0 +1,81 @@ +package coreplugin + +import ( + "context" + + "github.com/grafana/grafana-plugin-sdk-go/backend" + "github.com/grafana/grafana/pkg/infra/log" + "github.com/grafana/grafana/pkg/models" + "github.com/grafana/grafana/pkg/plugins/backendplugin" + "github.com/grafana/grafana/pkg/tsdb" +) + +type corePlugin struct { + pluginID string + logger log.Logger + backend.CheckHealthHandler + backend.CallResourceHandler + backend.QueryDataHandler +} + +// New returns a new backendplugin.PluginFactoryFunc for creating a core (built-in) backendplugin.Plugin. +func New(opts backend.ServeOpts) backendplugin.PluginFactoryFunc { + return backendplugin.PluginFactoryFunc(func(pluginID string, logger log.Logger, env []string) (backendplugin.Plugin, error) { + return &corePlugin{ + pluginID: pluginID, + logger: logger, + CheckHealthHandler: opts.CheckHealthHandler, + CallResourceHandler: opts.CallResourceHandler, + QueryDataHandler: opts.QueryDataHandler, + }, nil + }) +} + +func (cp *corePlugin) PluginID() string { + return cp.pluginID +} + +func (cp *corePlugin) Logger() log.Logger { + return cp.logger +} + +func (cp *corePlugin) Start(ctx context.Context) error { + if cp.QueryDataHandler != nil { + tsdb.RegisterTsdbQueryEndpoint(cp.pluginID, func(dsInfo *models.DataSource) (tsdb.TsdbQueryEndpoint, error) { + return newQueryEndpointAdapter(cp.pluginID, cp.logger, backendplugin.InstrumentQueryDataHandler(cp.QueryDataHandler)), nil + }) + } + return nil +} + +func (cp *corePlugin) Stop(ctx context.Context) error { + return nil +} + +func (cp *corePlugin) IsManaged() bool { + return false +} + +func (cp *corePlugin) Exited() bool { + return false +} + +func (cp *corePlugin) CollectMetrics(ctx context.Context) (*backend.CollectMetricsResult, error) { + return nil, backendplugin.ErrMethodNotImplemented +} + +func (cp *corePlugin) CheckHealth(ctx context.Context, req *backend.CheckHealthRequest) (*backend.CheckHealthResult, error) { + if cp.CheckHealthHandler != nil { + return cp.CheckHealthHandler.CheckHealth(ctx, req) + } + + return nil, backendplugin.ErrMethodNotImplemented +} + +func (cp *corePlugin) CallResource(ctx context.Context, req *backend.CallResourceRequest, sender backend.CallResourceResponseSender) error { + if cp.CallResourceHandler != nil { + return cp.CallResourceHandler.CallResource(ctx, req, sender) + } + + return backendplugin.ErrMethodNotImplemented +} diff --git a/pkg/plugins/backendplugin/coreplugin/core_plugin_test.go b/pkg/plugins/backendplugin/coreplugin/core_plugin_test.go new file mode 100644 index 00000000000..d9fa8be7f2a --- /dev/null +++ b/pkg/plugins/backendplugin/coreplugin/core_plugin_test.go @@ -0,0 +1,67 @@ +package coreplugin_test + +import ( + "context" + "testing" + + "github.com/grafana/grafana-plugin-sdk-go/backend" + "github.com/grafana/grafana/pkg/infra/log" + "github.com/grafana/grafana/pkg/plugins/backendplugin" + "github.com/grafana/grafana/pkg/plugins/backendplugin/coreplugin" + "github.com/stretchr/testify/require" +) + +func TestCorePlugin(t *testing.T) { + t.Run("New core plugin with empty opts should return expected values", func(t *testing.T) { + factory := coreplugin.New(backend.ServeOpts{}) + p, err := factory("plugin", log.New("test"), nil) + require.NoError(t, err) + require.NotNil(t, p) + require.NoError(t, p.Start(context.Background())) + require.NoError(t, p.Stop(context.Background())) + require.False(t, p.IsManaged()) + require.False(t, p.Exited()) + + _, err = p.CollectMetrics(context.Background()) + require.Equal(t, backendplugin.ErrMethodNotImplemented, err) + + _, err = p.CheckHealth(context.Background(), nil) + require.Equal(t, backendplugin.ErrMethodNotImplemented, err) + + err = p.CallResource(context.Background(), nil, nil) + require.Equal(t, backendplugin.ErrMethodNotImplemented, err) + }) + + t.Run("New core plugin with handlers set in opts should return expected values", func(t *testing.T) { + checkHealthCalled := false + callResourceCalled := false + factory := coreplugin.New(backend.ServeOpts{ + CheckHealthHandler: backend.CheckHealthHandlerFunc(func(ctx context.Context, req *backend.CheckHealthRequest) (*backend.CheckHealthResult, error) { + checkHealthCalled = true + return nil, nil + }), + CallResourceHandler: backend.CallResourceHandlerFunc(func(ctx context.Context, req *backend.CallResourceRequest, sender backend.CallResourceResponseSender) error { + callResourceCalled = true + return nil + }), + }) + p, err := factory("plugin", log.New("test"), nil) + require.NoError(t, err) + require.NotNil(t, p) + require.NoError(t, p.Start(context.Background())) + require.NoError(t, p.Stop(context.Background())) + require.False(t, p.IsManaged()) + require.False(t, p.Exited()) + + _, err = p.CollectMetrics(context.Background()) + require.Equal(t, backendplugin.ErrMethodNotImplemented, err) + + _, err = p.CheckHealth(context.Background(), &backend.CheckHealthRequest{}) + require.NoError(t, err) + require.True(t, checkHealthCalled) + + err = p.CallResource(context.Background(), &backend.CallResourceRequest{}, nil) + require.NoError(t, err) + require.True(t, callResourceCalled) + }) +} diff --git a/pkg/plugins/backendplugin/coreplugin/query_endpoint_adapter.go b/pkg/plugins/backendplugin/coreplugin/query_endpoint_adapter.go new file mode 100644 index 00000000000..de48040d31c --- /dev/null +++ b/pkg/plugins/backendplugin/coreplugin/query_endpoint_adapter.go @@ -0,0 +1,112 @@ +package coreplugin + +import ( + "context" + "time" + + "github.com/grafana/grafana-plugin-sdk-go/backend" + "github.com/grafana/grafana/pkg/infra/log" + "github.com/grafana/grafana/pkg/models" + "github.com/grafana/grafana/pkg/plugins/datasource/wrapper" + "github.com/grafana/grafana/pkg/tsdb" +) + +func newQueryEndpointAdapter(pluginID string, logger log.Logger, handler backend.QueryDataHandler) tsdb.TsdbQueryEndpoint { + return &queryEndpointAdapter{ + pluginID: pluginID, + logger: logger, + handler: handler, + } +} + +type queryEndpointAdapter struct { + pluginID string + logger log.Logger + handler backend.QueryDataHandler +} + +func modelToInstanceSettings(ds *models.DataSource) (*backend.DataSourceInstanceSettings, error) { + jsonDataBytes, err := ds.JsonData.MarshalJSON() + if err != nil { + return nil, err + } + + return &backend.DataSourceInstanceSettings{ + ID: ds.Id, + Name: ds.Name, + URL: ds.Url, + Database: ds.Database, + User: ds.User, + BasicAuthEnabled: ds.BasicAuth, + BasicAuthUser: ds.BasicAuthUser, + JSONData: jsonDataBytes, + DecryptedSecureJSONData: ds.DecryptedValues(), + Updated: ds.Updated, + }, nil +} + +func (a *queryEndpointAdapter) Query(ctx context.Context, ds *models.DataSource, query *tsdb.TsdbQuery) (*tsdb.Response, error) { + instanceSettings, err := modelToInstanceSettings(ds) + if err != nil { + return nil, err + } + + req := &backend.QueryDataRequest{ + PluginContext: backend.PluginContext{ + OrgID: ds.OrgId, + PluginID: a.pluginID, + User: wrapper.BackendUserFromSignedInUser(query.User), + DataSourceInstanceSettings: instanceSettings, + }, + Queries: []backend.DataQuery{}, + } + + for _, q := range query.Queries { + modelJSON, err := q.Model.MarshalJSON() + if err != nil { + return nil, err + } + req.Queries = append(req.Queries, backend.DataQuery{ + RefID: q.RefId, + Interval: time.Duration(q.IntervalMs) * time.Millisecond, + MaxDataPoints: q.MaxDataPoints, + TimeRange: backend.TimeRange{ + From: query.TimeRange.GetFromAsTimeUTC(), + To: query.TimeRange.GetToAsTimeUTC(), + }, + QueryType: q.QueryType, + JSON: modelJSON, + }) + } + + resp, err := a.handler.QueryData(ctx, req) + if err != nil { + return nil, err + } + + tR := &tsdb.Response{ + Results: make(map[string]*tsdb.QueryResult, len(resp.Responses)), + } + + for refID, r := range resp.Responses { + qr := &tsdb.QueryResult{ + RefId: refID, + } + + for _, f := range r.Frames { + if f.RefID == "" { + f.RefID = refID + } + } + + qr.Dataframes = tsdb.NewDecodedDataFrames(r.Frames) + + if r.Error != nil { + qr.Error = r.Error + } + + tR.Results[refID] = qr + } + + return tR, nil +} diff --git a/pkg/plugins/backendplugin/client.go b/pkg/plugins/backendplugin/grpcplugin/client.go similarity index 72% rename from pkg/plugins/backendplugin/client.go rename to pkg/plugins/backendplugin/grpcplugin/client.go index 10a52876c7d..dc8897dcc34 100644 --- a/pkg/plugins/backendplugin/client.go +++ b/pkg/plugins/backendplugin/grpcplugin/client.go @@ -1,4 +1,4 @@ -package backendplugin +package grpcplugin import ( "os/exec" @@ -6,7 +6,9 @@ import ( datasourceV1 "github.com/grafana/grafana-plugin-model/go/datasource" rendererV1 "github.com/grafana/grafana-plugin-model/go/renderer" "github.com/grafana/grafana-plugin-sdk-go/backend/grpcplugin" + sdkgrpcplugin "github.com/grafana/grafana-plugin-sdk-go/backend/grpcplugin" "github.com/grafana/grafana/pkg/infra/log" + "github.com/grafana/grafana/pkg/plugins/backendplugin" "github.com/grafana/grafana/pkg/plugins/backendplugin/pluginextensionv2" goplugin "github.com/hashicorp/go-plugin" ) @@ -26,8 +28,8 @@ var handshake = goplugin.HandshakeConfig{ ProtocolVersion: DefaultProtocolVersion, // The magic cookie values should NEVER be changed. - MagicCookieKey: grpcplugin.MagicCookieKey, - MagicCookieValue: grpcplugin.MagicCookieValue, + MagicCookieKey: sdkgrpcplugin.MagicCookieKey, + MagicCookieValue: sdkgrpcplugin.MagicCookieValue, } func newClientConfig(executablePath string, env []string, logger log.Logger, versionedPlugins map[int]goplugin.PluginSet) *goplugin.ClientConfig { @@ -72,18 +74,17 @@ func (pd PluginDescriptor) PluginID() string { // getV2PluginSet returns list of plugins supported on v2. func getV2PluginSet() goplugin.PluginSet { return goplugin.PluginSet{ - "diagnostics": &grpcplugin.DiagnosticsGRPCPlugin{}, - "resource": &grpcplugin.ResourceGRPCPlugin{}, - "data": &grpcplugin.DataGRPCPlugin{}, - "transform": &grpcplugin.TransformGRPCPlugin{}, + "diagnostics": &sdkgrpcplugin.DiagnosticsGRPCPlugin{}, + "resource": &sdkgrpcplugin.ResourceGRPCPlugin{}, + "data": &sdkgrpcplugin.DataGRPCPlugin{}, + "transform": &sdkgrpcplugin.TransformGRPCPlugin{}, "renderer": &pluginextensionv2.RendererGRPCPlugin{}, } } -// NewBackendPluginDescriptor creates a new backend plugin descriptor -// used for registering a backend datasource plugin. -func NewBackendPluginDescriptor(pluginID, executablePath string, startFns PluginStartFuncs) PluginDescriptor { - return PluginDescriptor{ +// NewBackendPlugin creates a new backend plugin factory used for registering a backend plugin. +func NewBackendPlugin(pluginID, executablePath string, startFns PluginStartFuncs) backendplugin.PluginFactoryFunc { + return New(PluginDescriptor{ pluginID: pluginID, executablePath: executablePath, managed: true, @@ -91,16 +92,15 @@ func NewBackendPluginDescriptor(pluginID, executablePath string, startFns Plugin DefaultProtocolVersion: { pluginID: &datasourceV1.DatasourcePluginImpl{}, }, - grpcplugin.ProtocolVersion: getV2PluginSet(), + sdkgrpcplugin.ProtocolVersion: getV2PluginSet(), }, startFns: startFns, - } + }) } -// NewRendererPluginDescriptor creates a new renderer plugin descriptor -// used for registering a backend renderer plugin. -func NewRendererPluginDescriptor(pluginID, executablePath string, startFns PluginStartFuncs) PluginDescriptor { - return PluginDescriptor{ +// NewRendererPlugin creates a new renderer plugin factory used for registering a backend renderer plugin. +func NewRendererPlugin(pluginID, executablePath string, startFns PluginStartFuncs) backendplugin.PluginFactoryFunc { + return New(PluginDescriptor{ pluginID: pluginID, executablePath: executablePath, managed: false, @@ -108,38 +108,21 @@ func NewRendererPluginDescriptor(pluginID, executablePath string, startFns Plugi DefaultProtocolVersion: { pluginID: &rendererV1.RendererPluginImpl{}, }, - grpcplugin.ProtocolVersion: getV2PluginSet(), + sdkgrpcplugin.ProtocolVersion: getV2PluginSet(), }, startFns: startFns, - } + }) } -type DiagnosticsPlugin interface { - grpcplugin.DiagnosticsClient -} - -type ResourcePlugin interface { - grpcplugin.ResourceClient -} - -type DataPlugin interface { - grpcplugin.DataClient -} - -type TransformPlugin interface { - grpcplugin.TransformClient -} - -// LegacyClient client for communicating with a plugin using the old plugin protocol. +// LegacyClient client for communicating with a plugin using the v1 plugin protocol. type LegacyClient struct { DatasourcePlugin datasourceV1.DatasourcePlugin RendererPlugin rendererV1.RendererPlugin } -// Client client for communicating with a plugin using the current plugin protocol. +// Client client for communicating with a plugin using the current (v2) plugin protocol. type Client struct { - ResourcePlugin ResourcePlugin - DataPlugin DataPlugin - TransformPlugin TransformPlugin + DataPlugin grpcplugin.DataClient + TransformPlugin grpcplugin.TransformClient RendererPlugin pluginextensionv2.RendererPlugin } diff --git a/pkg/plugins/backendplugin/grpcplugin/client_v1.go b/pkg/plugins/backendplugin/grpcplugin/client_v1.go new file mode 100644 index 00000000000..57ad97f1350 --- /dev/null +++ b/pkg/plugins/backendplugin/grpcplugin/client_v1.go @@ -0,0 +1,84 @@ +package grpcplugin + +import ( + "context" + + datasourceV1 "github.com/grafana/grafana-plugin-model/go/datasource" + rendererV1 "github.com/grafana/grafana-plugin-model/go/renderer" + "github.com/grafana/grafana-plugin-sdk-go/backend" + "github.com/grafana/grafana/pkg/infra/log" + "github.com/grafana/grafana/pkg/plugins/backendplugin" + "github.com/hashicorp/go-plugin" +) + +type clientV1 struct { + logger log.Logger + datasourceV1.DatasourcePlugin + rendererV1.RendererPlugin +} + +func newClientV1(descriptor PluginDescriptor, logger log.Logger, rpcClient plugin.ClientProtocol) (pluginClient, error) { + logger.Warn("Plugin uses a deprecated version of Grafana's backend plugin system which will be removed in a future release. " + + "Consider upgrading to a newer plugin version or reach out to the plugin repository/developer and request an upgrade.") + + raw, err := rpcClient.Dispense(descriptor.pluginID) + if err != nil { + return nil, err + } + + c := clientV1{ + logger: logger, + } + if plugin, ok := raw.(datasourceV1.DatasourcePlugin); ok { + c.DatasourcePlugin = instrumentDatasourcePluginV1(plugin) + } + + if plugin, ok := raw.(rendererV1.RendererPlugin); ok { + c.RendererPlugin = plugin + } + + if descriptor.startFns.OnLegacyStart != nil { + legacyClient := &LegacyClient{ + DatasourcePlugin: c.DatasourcePlugin, + RendererPlugin: c.RendererPlugin, + } + if err := descriptor.startFns.OnLegacyStart(descriptor.pluginID, legacyClient, logger); err != nil { + return nil, err + } + } + + return &c, nil +} + +func (c *clientV1) CollectMetrics(ctx context.Context) (*backend.CollectMetricsResult, error) { + return nil, backendplugin.ErrMethodNotImplemented +} + +func (c *clientV1) CheckHealth(ctx context.Context, req *backend.CheckHealthRequest) (*backend.CheckHealthResult, error) { + return nil, backendplugin.ErrMethodNotImplemented +} + +func (c *clientV1) CallResource(ctx context.Context, req *backend.CallResourceRequest, sender backend.CallResourceResponseSender) error { + return backendplugin.ErrMethodNotImplemented +} + +type datasourceV1QueryFunc func(ctx context.Context, req *datasourceV1.DatasourceRequest) (*datasourceV1.DatasourceResponse, error) + +func (fn datasourceV1QueryFunc) Query(ctx context.Context, req *datasourceV1.DatasourceRequest) (*datasourceV1.DatasourceResponse, error) { + return fn(ctx, req) +} + +func instrumentDatasourcePluginV1(plugin datasourceV1.DatasourcePlugin) datasourceV1.DatasourcePlugin { + if plugin == nil { + return nil + } + + return datasourceV1QueryFunc(func(ctx context.Context, req *datasourceV1.DatasourceRequest) (*datasourceV1.DatasourceResponse, error) { + var resp *datasourceV1.DatasourceResponse + err := backendplugin.InstrumentQueryDataRequest(req.Datasource.Type, func() (innerErr error) { + resp, innerErr = plugin.Query(ctx, req) + return + }) + return resp, err + }) +} diff --git a/pkg/plugins/backendplugin/grpcplugin/client_v2.go b/pkg/plugins/backendplugin/grpcplugin/client_v2.go new file mode 100644 index 00000000000..3a2c75665a5 --- /dev/null +++ b/pkg/plugins/backendplugin/grpcplugin/client_v2.go @@ -0,0 +1,212 @@ +package grpcplugin + +import ( + "context" + "io" + + "github.com/grafana/grafana-plugin-sdk-go/backend" + "github.com/grafana/grafana-plugin-sdk-go/backend/grpcplugin" + "github.com/grafana/grafana-plugin-sdk-go/genproto/pluginv2" + "github.com/grafana/grafana/pkg/infra/log" + "github.com/grafana/grafana/pkg/plugins/backendplugin" + "github.com/grafana/grafana/pkg/plugins/backendplugin/pluginextensionv2" + "github.com/grafana/grafana/pkg/util/errutil" + "github.com/hashicorp/go-plugin" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) + +type clientV2 struct { + grpcplugin.DiagnosticsClient + grpcplugin.ResourceClient + grpcplugin.DataClient + grpcplugin.TransformClient + pluginextensionv2.RendererPlugin +} + +func newClientV2(descriptor PluginDescriptor, logger log.Logger, rpcClient plugin.ClientProtocol) (pluginClient, error) { + rawDiagnostics, err := rpcClient.Dispense("diagnostics") + if err != nil { + return nil, err + } + + rawResource, err := rpcClient.Dispense("resource") + if err != nil { + return nil, err + } + + rawData, err := rpcClient.Dispense("data") + if err != nil { + return nil, err + } + + rawTransform, err := rpcClient.Dispense("transform") + if err != nil { + return nil, err + } + + rawRenderer, err := rpcClient.Dispense("renderer") + if err != nil { + return nil, err + } + + c := clientV2{} + if rawDiagnostics != nil { + if plugin, ok := rawDiagnostics.(grpcplugin.DiagnosticsClient); ok { + c.DiagnosticsClient = plugin + } + } + + if rawResource != nil { + if plugin, ok := rawResource.(grpcplugin.ResourceClient); ok { + c.ResourceClient = plugin + } + } + + if rawData != nil { + if plugin, ok := rawData.(grpcplugin.DataClient); ok { + c.DataClient = instrumentDataClient(plugin) + } + } + + if rawTransform != nil { + if plugin, ok := rawTransform.(grpcplugin.TransformClient); ok { + c.TransformClient = instrumentTransformPlugin(plugin) + } + } + + if rawRenderer != nil { + if plugin, ok := rawRenderer.(pluginextensionv2.RendererPlugin); ok { + c.RendererPlugin = plugin + } + } + + if descriptor.startFns.OnStart != nil { + client := &Client{ + DataPlugin: c.DataClient, + TransformPlugin: c.TransformClient, + RendererPlugin: c.RendererPlugin, + } + if err := descriptor.startFns.OnStart(descriptor.pluginID, client, logger); err != nil { + return nil, err + } + } + + return &c, nil +} + +func (c *clientV2) CollectMetrics(ctx context.Context) (*backend.CollectMetricsResult, error) { + if c.DiagnosticsClient == nil { + return &backend.CollectMetricsResult{}, nil + } + + protoResp, err := c.DiagnosticsClient.CollectMetrics(ctx, &pluginv2.CollectMetricsRequest{}) + if err != nil { + if status.Code(err) == codes.Unimplemented { + return &backend.CollectMetricsResult{}, nil + } + + return nil, err + } + + return backend.FromProto().CollectMetricsResponse(protoResp), nil +} + +func (c *clientV2) CheckHealth(ctx context.Context, req *backend.CheckHealthRequest) (*backend.CheckHealthResult, error) { + if c.DiagnosticsClient == nil { + return nil, backendplugin.ErrMethodNotImplemented + } + + protoContext := backend.ToProto().PluginContext(req.PluginContext) + protoResp, err := c.DiagnosticsClient.CheckHealth(ctx, &pluginv2.CheckHealthRequest{PluginContext: protoContext}) + + if err != nil { + if status.Code(err) == codes.Unimplemented { + return &backend.CheckHealthResult{ + Status: backend.HealthStatusUnknown, + Message: "Health check not implemented", + }, nil + } + return nil, err + } + + return backend.FromProto().CheckHealthResponse(protoResp), nil +} + +func (c *clientV2) CallResource(ctx context.Context, req *backend.CallResourceRequest, sender backend.CallResourceResponseSender) error { + if c.ResourceClient == nil { + return backendplugin.ErrMethodNotImplemented + } + + protoReq := backend.ToProto().CallResourceRequest(req) + protoStream, err := c.ResourceClient.CallResource(ctx, protoReq) + if err != nil { + if status.Code(err) == codes.Unimplemented { + return backendplugin.ErrMethodNotImplemented + } + + return errutil.Wrap("Failed to call resource", err) + } + + for { + protoResp, err := protoStream.Recv() + if err != nil { + if status.Code(err) == codes.Unimplemented { + return backendplugin.ErrMethodNotImplemented + } + + if err == io.EOF { + return nil + } + + return errutil.Wrap("Failed to receive call resource response", err) + } + + if err := sender.Send(backend.FromProto().CallResourceResponse(protoResp)); err != nil { + return err + } + } +} + +type dataClientQueryDataFunc func(ctx context.Context, req *pluginv2.QueryDataRequest, opts ...grpc.CallOption) (*pluginv2.QueryDataResponse, error) + +func (fn dataClientQueryDataFunc) QueryData(ctx context.Context, req *pluginv2.QueryDataRequest, opts ...grpc.CallOption) (*pluginv2.QueryDataResponse, error) { + return fn(ctx, req, opts...) +} + +func instrumentDataClient(plugin grpcplugin.DataClient) grpcplugin.DataClient { + if plugin == nil { + return nil + } + + return dataClientQueryDataFunc(func(ctx context.Context, req *pluginv2.QueryDataRequest, opts ...grpc.CallOption) (*pluginv2.QueryDataResponse, error) { + var resp *pluginv2.QueryDataResponse + err := backendplugin.InstrumentQueryDataRequest(req.PluginContext.PluginId, func() (innerErr error) { + resp, innerErr = plugin.QueryData(ctx, req) + return + }) + return resp, err + }) +} + +type transformPluginTransformDataFunc func(ctx context.Context, req *pluginv2.QueryDataRequest, callback grpcplugin.TransformDataCallBack) (*pluginv2.QueryDataResponse, error) + +func (fn transformPluginTransformDataFunc) TransformData(ctx context.Context, req *pluginv2.QueryDataRequest, callback grpcplugin.TransformDataCallBack) (*pluginv2.QueryDataResponse, error) { + return fn(ctx, req, callback) +} + +func instrumentTransformPlugin(plugin grpcplugin.TransformClient) grpcplugin.TransformClient { + if plugin == nil { + return nil + } + + return transformPluginTransformDataFunc(func(ctx context.Context, req *pluginv2.QueryDataRequest, callback grpcplugin.TransformDataCallBack) (*pluginv2.QueryDataResponse, error) { + var resp *pluginv2.QueryDataResponse + err := backendplugin.InstrumentTransformDataRequest(req.PluginContext.PluginId, func() (innerErr error) { + resp, innerErr = plugin.TransformData(ctx, req, callback) + return + }) + return resp, err + }) +} diff --git a/pkg/plugins/backendplugin/grpcplugin/grpc_plugin.go b/pkg/plugins/backendplugin/grpcplugin/grpc_plugin.go new file mode 100644 index 00000000000..54277e8adf4 --- /dev/null +++ b/pkg/plugins/backendplugin/grpcplugin/grpc_plugin.go @@ -0,0 +1,136 @@ +package grpcplugin + +import ( + "context" + "errors" + "sync" + + "github.com/grafana/grafana-plugin-sdk-go/backend" + "github.com/grafana/grafana/pkg/infra/log" + "github.com/grafana/grafana/pkg/plugins/backendplugin" + "github.com/hashicorp/go-plugin" +) + +type pluginClient interface { + backend.CollectMetricsHandler + backend.CheckHealthHandler + backend.CallResourceHandler +} + +type grpcPlugin struct { + descriptor PluginDescriptor + clientFactory func() *plugin.Client + client *plugin.Client + pluginClient pluginClient + logger log.Logger + mutex sync.RWMutex +} + +// New allocates and returns a new gRPC (external) backendplugin.Plugin. +func New(descriptor PluginDescriptor) backendplugin.PluginFactoryFunc { + return backendplugin.PluginFactoryFunc(func(pluginID string, logger log.Logger, env []string) (backendplugin.Plugin, error) { + return &grpcPlugin{ + descriptor: descriptor, + logger: logger, + clientFactory: func() *plugin.Client { + return plugin.NewClient(newClientConfig(descriptor.executablePath, env, logger, descriptor.versionedPlugins)) + }, + }, nil + }) +} + +func (p *grpcPlugin) PluginID() string { + return p.descriptor.pluginID +} + +func (p *grpcPlugin) Logger() log.Logger { + return p.logger +} + +func (p *grpcPlugin) Start(ctx context.Context) error { + p.mutex.Lock() + defer p.mutex.Unlock() + + p.client = p.clientFactory() + rpcClient, err := p.client.Client() + if err != nil { + return err + } + + if p.client.NegotiatedVersion() > 1 { + p.pluginClient, err = newClientV2(p.descriptor, p.logger, rpcClient) + if err != nil { + return err + } + } else { + p.pluginClient, err = newClientV1(p.descriptor, p.logger, rpcClient) + if err != nil { + return err + } + } + + if p.pluginClient == nil { + return errors.New("no compatible plugin implementation found") + } + + return nil +} + +func (p *grpcPlugin) Stop(ctx context.Context) error { + p.mutex.Lock() + defer p.mutex.Unlock() + + if p.client != nil { + p.client.Kill() + } + return nil +} + +func (p *grpcPlugin) IsManaged() bool { + return p.descriptor.managed +} + +func (p *grpcPlugin) Exited() bool { + p.mutex.RLock() + defer p.mutex.RUnlock() + if p.client != nil { + return p.client.Exited() + } + return true +} + +func (p *grpcPlugin) CollectMetrics(ctx context.Context) (*backend.CollectMetricsResult, error) { + p.mutex.RLock() + if p.client == nil || p.client.Exited() || p.pluginClient == nil { + p.mutex.RUnlock() + return nil, backendplugin.ErrPluginUnavailable + } + pluginClient := p.pluginClient + p.mutex.RUnlock() + + return pluginClient.CollectMetrics(ctx) +} + +func (p *grpcPlugin) CheckHealth(ctx context.Context, req *backend.CheckHealthRequest) (*backend.CheckHealthResult, error) { + p.mutex.RLock() + if p.client == nil || p.client.Exited() || p.pluginClient == nil { + p.mutex.RUnlock() + return nil, backendplugin.ErrPluginUnavailable + } + pluginClient := p.pluginClient + p.mutex.RUnlock() + + return pluginClient.CheckHealth(ctx, req) +} + +func (p *grpcPlugin) CallResource(ctx context.Context, req *backend.CallResourceRequest, sender backend.CallResourceResponseSender) error { + p.mutex.RLock() + if p.client == nil || p.client.Exited() || p.pluginClient == nil { + p.mutex.RUnlock() + return backendplugin.ErrPluginUnavailable + } + pluginClient := p.pluginClient + p.mutex.RUnlock() + + return pluginClient.CallResource(ctx, req, sender) +} diff --git a/pkg/plugins/backendplugin/log_wrapper.go b/pkg/plugins/backendplugin/grpcplugin/log_wrapper.go similarity index 99% rename from pkg/plugins/backendplugin/log_wrapper.go rename to pkg/plugins/backendplugin/grpcplugin/log_wrapper.go index 28087286325..6b9fd6ff34d 100644 --- a/pkg/plugins/backendplugin/log_wrapper.go +++ b/pkg/plugins/backendplugin/grpcplugin/log_wrapper.go @@ -1,4 +1,4 @@ -package backendplugin +package grpcplugin import ( "io" diff --git a/pkg/plugins/backendplugin/log_wrapper_test.go b/pkg/plugins/backendplugin/grpcplugin/log_wrapper_test.go similarity index 97% rename from pkg/plugins/backendplugin/log_wrapper_test.go rename to pkg/plugins/backendplugin/grpcplugin/log_wrapper_test.go index 751da1d2e8a..e1ffc650faf 100644 --- a/pkg/plugins/backendplugin/log_wrapper_test.go +++ b/pkg/plugins/backendplugin/grpcplugin/log_wrapper_test.go @@ -1,4 +1,4 @@ -package backendplugin +package grpcplugin import ( "fmt" diff --git a/pkg/plugins/backendplugin/instrumentation.go b/pkg/plugins/backendplugin/instrumentation.go index ac5ce9ce4c5..7aba01f883c 100644 --- a/pkg/plugins/backendplugin/instrumentation.go +++ b/pkg/plugins/backendplugin/instrumentation.go @@ -1,8 +1,10 @@ package backendplugin import ( + "context" "time" + "github.com/grafana/grafana-plugin-sdk-go/backend" "github.com/prometheus/client_golang/prometheus" ) @@ -28,8 +30,8 @@ func init() { prometheus.MustRegister(pluginRequestCounter, pluginRequestDuration) } -// InstrumentPluginRequest instruments success rate and latency of `fn` -func InstrumentPluginRequest(pluginID string, endpoint string, fn func() error) error { +// instrumentPluginRequest instruments success rate and latency of `fn` +func instrumentPluginRequest(pluginID string, endpoint string, fn func() error) error { status := "ok" start := time.Now() @@ -45,3 +47,41 @@ func InstrumentPluginRequest(pluginID string, endpoint string, fn func() error) return err } + +func instrumentCollectMetrics(pluginID string, fn func() error) error { + return instrumentPluginRequest(pluginID, "collectMetrics", fn) +} + +func instrumentCheckHealthRequest(pluginID string, fn func() error) error { + return instrumentPluginRequest(pluginID, "checkHealth", fn) +} + +func instrumentCallResourceRequest(pluginID string, fn func() error) error { + return instrumentPluginRequest(pluginID, "callResource", fn) +} + +// InstrumentQueryDataRequest instruments success rate and latency of query data request. +func InstrumentQueryDataRequest(pluginID string, fn func() error) error { + return instrumentPluginRequest(pluginID, "queryData", fn) +} + +// InstrumentTransformDataRequest instruments success rate and latency of transform data request. +func InstrumentTransformDataRequest(pluginID string, fn func() error) error { + return instrumentPluginRequest(pluginID, "transformData", fn) +} + +// InstrumentQueryDataHandler wraps a backend.QueryDataHandler with instrumentation of success rate and latency. +func InstrumentQueryDataHandler(handler backend.QueryDataHandler) backend.QueryDataHandler { + if handler == nil { + return nil + } + + return backend.QueryDataHandlerFunc(func(ctx context.Context, req *backend.QueryDataRequest) (*backend.QueryDataResponse, error) { + var resp *backend.QueryDataResponse + err := InstrumentQueryDataRequest(req.PluginContext.PluginID, func() (innerErr error) { + resp, innerErr = handler.QueryData(ctx, req) + return + }) + return resp, err + }) +} diff --git a/pkg/plugins/backendplugin/manager.go b/pkg/plugins/backendplugin/manager.go index 677d95ced62..b58ae5bf2b3 100644 --- a/pkg/plugins/backendplugin/manager.go +++ b/pkg/plugins/backendplugin/manager.go @@ -6,28 +6,31 @@ import ( "errors" "fmt" "io" + "io/ioutil" + "net/http" + "net/url" "sync" "time" "github.com/grafana/grafana-plugin-sdk-go/backend" + "github.com/grafana/grafana/pkg/infra/log" "github.com/grafana/grafana/pkg/models" + "github.com/grafana/grafana/pkg/registry" "github.com/grafana/grafana/pkg/setting" "github.com/grafana/grafana/pkg/util/errutil" "github.com/grafana/grafana/pkg/util/proxyutil" - - "github.com/grafana/grafana/pkg/infra/log" - "github.com/grafana/grafana/pkg/registry" - plugin "github.com/hashicorp/go-plugin" "golang.org/x/xerrors" ) var ( // ErrPluginNotRegistered error returned when plugin not registered. ErrPluginNotRegistered = errors.New("Plugin not registered") - // ErrDiagnosticsNotSupported error returned when plugin doesn't support diagnostics. - ErrDiagnosticsNotSupported = errors.New("Plugin diagnostics not supported") // ErrHealthCheckFailed error returned when health check failed. ErrHealthCheckFailed = errors.New("Health check failed") + // ErrPluginUnavailable error returned when plugin is unavailable. + ErrPluginUnavailable = errors.New("Plugin unavailable") + // ErrMethodNotImplemented error returned when plugin method not implemented. + ErrMethodNotImplemented = errors.New("method not implemented") ) func init() { @@ -41,13 +44,13 @@ func init() { // Manager manages backend plugins. type Manager interface { // Register registers a backend plugin - Register(descriptor PluginDescriptor) error + Register(pluginID string, factory PluginFactoryFunc) error // StartPlugin starts a non-managed backend plugin StartPlugin(ctx context.Context, pluginID string) error // CollectMetrics collects metrics from a registered backend plugin. - CollectMetrics(ctx context.Context, pluginID string) (*CollectMetricsResult, error) + CollectMetrics(ctx context.Context, pluginID string) (*backend.CollectMetricsResult, error) // CheckHealth checks the health of a registered backend plugin. - CheckHealth(ctx context.Context, pCtx backend.PluginContext) (*CheckHealthResult, error) + CheckHealth(ctx context.Context, pCtx backend.PluginContext) (*backend.CheckHealthResult, error) // CallResource calls a plugin resource. CallResource(pluginConfig backend.PluginContext, ctx *models.ReqContext, path string) } @@ -56,13 +59,13 @@ type manager struct { Cfg *setting.Cfg `inject:""` License models.Licensing `inject:""` pluginsMu sync.RWMutex - plugins map[string]*BackendPlugin + plugins map[string]Plugin logger log.Logger pluginSettings map[string]pluginSettings } func (m *manager) Init() error { - m.plugins = make(map[string]*BackendPlugin) + m.plugins = make(map[string]Plugin) m.logger = log.New("plugins.backend") m.pluginSettings = extractPluginSettings(m.Cfg) @@ -72,27 +75,27 @@ func (m *manager) Init() error { func (m *manager) Run(ctx context.Context) error { m.start(ctx) <-ctx.Done() - m.stop() + m.stop(ctx) return ctx.Err() } // Register registers a backend plugin -func (m *manager) Register(descriptor PluginDescriptor) error { - m.logger.Debug("Registering backend plugin", "pluginId", descriptor.pluginID, "executablePath", descriptor.executablePath) +func (m *manager) Register(pluginID string, factory PluginFactoryFunc) error { + m.logger.Debug("Registering backend plugin", "pluginId", pluginID) m.pluginsMu.Lock() defer m.pluginsMu.Unlock() - if _, exists := m.plugins[descriptor.pluginID]; exists { + if _, exists := m.plugins[pluginID]; exists { return errors.New("Backend plugin already registered") } pluginSettings := pluginSettings{} - if ps, exists := m.pluginSettings[descriptor.pluginID]; exists { + if ps, exists := m.pluginSettings[pluginID]; exists { pluginSettings = ps } hostEnv := []string{ - fmt.Sprintf("GF_VERSION=%s", setting.BuildVersion), + fmt.Sprintf("GF_VERSION=%s", m.Cfg.BuildVersion), fmt.Sprintf("GF_EDITION=%s", m.License.Edition()), } @@ -102,20 +105,14 @@ func (m *manager) Register(descriptor PluginDescriptor) error { env := pluginSettings.ToEnv("GF_PLUGIN", hostEnv) - pluginLogger := m.logger.New("pluginId", descriptor.pluginID) - plugin := &BackendPlugin{ - id: descriptor.pluginID, - executablePath: descriptor.executablePath, - managed: descriptor.managed, - clientFactory: func() *plugin.Client { - return plugin.NewClient(newClientConfig(descriptor.executablePath, env, pluginLogger, descriptor.versionedPlugins)) - }, - startFns: descriptor.startFns, - logger: pluginLogger, + pluginLogger := m.logger.New("pluginId", pluginID) + plugin, err := factory(pluginID, pluginLogger, env) + if err != nil { + return err } - m.plugins[descriptor.pluginID] = plugin - m.logger.Debug("Backend plugin registered", "pluginId", descriptor.pluginID, "executablePath", descriptor.executablePath) + m.plugins[pluginID] = plugin + m.logger.Debug("Backend plugin registered", "pluginId", pluginID) return nil } @@ -124,12 +121,12 @@ func (m *manager) start(ctx context.Context) { m.pluginsMu.RLock() defer m.pluginsMu.RUnlock() for _, p := range m.plugins { - if !p.managed { + if !p.IsManaged() { continue } if err := startPluginAndRestartKilledProcesses(ctx, p); err != nil { - p.logger.Error("Failed to start plugin", "error", err) + p.Logger().Error("Failed to start plugin", "error", err) continue } } @@ -141,10 +138,10 @@ func (m *manager) StartPlugin(ctx context.Context, pluginID string) error { p, registered := m.plugins[pluginID] m.pluginsMu.RUnlock() if !registered { - return errors.New("Backend plugin not registered") + return ErrPluginNotRegistered } - if p.managed { + if p.IsManaged() { return errors.New("Backend plugin is managed and cannot be manually started") } @@ -152,22 +149,26 @@ func (m *manager) StartPlugin(ctx context.Context, pluginID string) error { } // stop stops all managed backend plugins -func (m *manager) stop() { +func (m *manager) stop(ctx context.Context) { m.pluginsMu.RLock() defer m.pluginsMu.RUnlock() + var wg sync.WaitGroup for _, p := range m.plugins { - go func(p *BackendPlugin) { - p.logger.Debug("Stopping plugin") - if err := p.stop(); err != nil { - p.logger.Error("Failed to stop plugin", "error", err) + wg.Add(1) + go func(p Plugin, ctx context.Context) { + defer wg.Done() + p.Logger().Debug("Stopping plugin") + if err := p.Stop(ctx); err != nil { + p.Logger().Error("Failed to stop plugin", "error", err) } - p.logger.Debug("Plugin stopped") - }(p) + p.Logger().Debug("Plugin stopped") + }(p, ctx) } + wg.Wait() } // CollectMetrics collects metrics from a registered backend plugin. -func (m *manager) CollectMetrics(ctx context.Context, pluginID string) (*CollectMetricsResult, error) { +func (m *manager) CollectMetrics(ctx context.Context, pluginID string) (*backend.CollectMetricsResult, error) { m.pluginsMu.RLock() p, registered := m.plugins[pluginID] m.pluginsMu.RUnlock() @@ -176,98 +177,139 @@ func (m *manager) CollectMetrics(ctx context.Context, pluginID string) (*Collect return nil, ErrPluginNotRegistered } - if !p.supportsDiagnostics() { - return nil, ErrDiagnosticsNotSupported - } - - res, err := p.CollectMetrics(ctx) + var resp *backend.CollectMetricsResult + err := instrumentCollectMetrics(p.PluginID(), func() (innerErr error) { + resp, innerErr = p.CollectMetrics(ctx) + return + }) if err != nil { return nil, err } - return collectMetricsResultFromProto(res), nil + return resp, nil } // CheckHealth checks the health of a registered backend plugin. -func (m *manager) CheckHealth(ctx context.Context, pluginConfig backend.PluginContext) (*CheckHealthResult, error) { +func (m *manager) CheckHealth(ctx context.Context, pluginContext backend.PluginContext) (*backend.CheckHealthResult, error) { m.pluginsMu.RLock() - p, registered := m.plugins[pluginConfig.PluginID] + p, registered := m.plugins[pluginContext.PluginID] m.pluginsMu.RUnlock() if !registered { return nil, ErrPluginNotRegistered } - if !p.supportsDiagnostics() { - return nil, ErrDiagnosticsNotSupported - } + var resp *backend.CheckHealthResult + err := instrumentCheckHealthRequest(p.PluginID(), func() (innerErr error) { + resp, innerErr = p.CheckHealth(ctx, &backend.CheckHealthRequest{PluginContext: pluginContext}) + return + }) - res, err := p.checkHealth(ctx, pluginConfig) if err != nil { - p.logger.Error("Failed to check plugin health", "error", err) - return nil, ErrHealthCheckFailed + if errors.Is(err, ErrMethodNotImplemented) { + return nil, err + } + + return nil, errutil.Wrap("Failed to check plugin health", ErrHealthCheckFailed) } - return checkHealthResultFromProto(res), nil + return resp, nil } type keepCookiesJSONModel struct { KeepCookies []string `json:"keepCookies"` } -// CallResource calls a plugin resource. -func (m *manager) CallResource(pCtx backend.PluginContext, reqCtx *models.ReqContext, path string) { +func (m *manager) callResourceInternal(w http.ResponseWriter, req *http.Request, pCtx backend.PluginContext) error { m.pluginsMu.RLock() p, registered := m.plugins[pCtx.PluginID] m.pluginsMu.RUnlock() if !registered { - reqCtx.JsonApiErr(404, "Plugin not registered", nil) - return + return ErrPluginNotRegistered } - clonedReq := reqCtx.Req.Clone(reqCtx.Req.Context()) keepCookieModel := keepCookiesJSONModel{} if dis := pCtx.DataSourceInstanceSettings; dis != nil { err := json.Unmarshal(dis.JSONData, &keepCookieModel) if err != nil { - p.logger.Error("Failed to to unpack JSONData in datasource instance settings", "error", err) + p.Logger().Error("Failed to to unpack JSONData in datasource instance settings", "error", err) } } - proxyutil.ClearCookieHeader(clonedReq, keepCookieModel.KeepCookies) - proxyutil.PrepareProxyRequest(clonedReq) + proxyutil.ClearCookieHeader(req, keepCookieModel.KeepCookies) + proxyutil.PrepareProxyRequest(req) - body, err := reqCtx.Req.Body().Bytes() + body, err := ioutil.ReadAll(req.Body) if err != nil { - reqCtx.JsonApiErr(500, "Failed to read request body", err) - return + return errors.New("Failed to read request body") } - req := &backend.CallResourceRequest{ + crReq := &backend.CallResourceRequest{ PluginContext: pCtx, - Path: path, - Method: clonedReq.Method, - URL: clonedReq.URL.String(), - Headers: clonedReq.Header, + Path: req.URL.Path, + Method: req.Method, + URL: req.URL.String(), + Headers: req.Header, Body: body, } - err = InstrumentPluginRequest(p.id, "resource", func() error { - stream, err := p.callResource(clonedReq.Context(), req) - if err != nil { - return errutil.Wrap("Failed to call resource", err) + return instrumentCallResourceRequest(p.PluginID(), func() error { + childCtx, cancel := context.WithCancel(req.Context()) + defer cancel() + stream := newCallResourceResponseStream(childCtx) + var wg sync.WaitGroup + wg.Add(1) + var flushStreamErr error + go func() { + flushStreamErr = flushStream(p, stream, w) + wg.Done() + }() + + innerErr := p.CallResource(req.Context(), crReq, stream) + stream.Close() + if innerErr != nil { + return innerErr } - - return flushStream(p, stream, reqCtx) + wg.Wait() + return flushStreamErr }) +} +// CallResource calls a plugin resource. +func (m *manager) CallResource(pCtx backend.PluginContext, reqCtx *models.ReqContext, path string) { + clonedReq := reqCtx.Req.Clone(reqCtx.Req.Context()) + rawURL := path + if clonedReq.URL.RawQuery != "" { + rawURL += "?" + clonedReq.URL.RawQuery + } + urlPath, err := url.Parse(rawURL) if err != nil { - reqCtx.JsonApiErr(500, "Failed to ", err) + handleCallResourceError(err, reqCtx) + return + } + clonedReq.URL = urlPath + err = m.callResourceInternal(reqCtx.Resp, clonedReq, pCtx) + if err != nil { + handleCallResourceError(err, reqCtx) } } -func flushStream(plugin *BackendPlugin, stream callResourceResultStream, reqCtx *models.ReqContext) error { +func handleCallResourceError(err error, reqCtx *models.ReqContext) { + if errors.Is(err, ErrPluginUnavailable) { + reqCtx.JsonApiErr(503, "Plugin unavailable", err) + return + } + + if errors.Is(err, ErrMethodNotImplemented) { + reqCtx.JsonApiErr(404, "Not found", err) + return + } + + reqCtx.JsonApiErr(500, "Failed to call resource", err) +} + +func flushStream(plugin Plugin, stream CallResourceClientResponseStream, w http.ResponseWriter) error { processedStreams := 0 for { @@ -283,12 +325,12 @@ func flushStream(plugin *BackendPlugin, stream callResourceResultStream, reqCtx return errutil.Wrap("Failed to receive response from resource call", err) } - plugin.logger.Error("Failed to receive response from resource call", "error", err) - return nil + plugin.Logger().Error("Failed to receive response from resource call", "error", err) + return stream.Close() } // Expected that headers and status are only part of first stream - if processedStreams == 0 { + if processedStreams == 0 && resp.Headers != nil { // Make sure a content type always is returned in response if _, exists := resp.Headers["Content-Type"]; !exists { resp.Headers["Content-Type"] = []string{"application/json"} @@ -302,37 +344,39 @@ func flushStream(plugin *BackendPlugin, stream callResourceResultStream, reqCtx } for _, v := range values { - reqCtx.Resp.Header().Add(k, v) + w.Header().Add(k, v) } } - reqCtx.WriteHeader(resp.Status) + w.WriteHeader(resp.Status) } - if _, err := reqCtx.Write(resp.Body); err != nil { - plugin.logger.Error("Failed to write resource response", "error", err) + if _, err := w.Write(resp.Body); err != nil { + plugin.Logger().Error("Failed to write resource response", "error", err) } - reqCtx.Resp.Flush() + if flusher, ok := w.(http.Flusher); ok { + flusher.Flush() + } processedStreams++ } } -func startPluginAndRestartKilledProcesses(ctx context.Context, p *BackendPlugin) error { - if err := p.start(ctx); err != nil { +func startPluginAndRestartKilledProcesses(ctx context.Context, p Plugin) error { + if err := p.Start(ctx); err != nil { return err } - go func(ctx context.Context, p *BackendPlugin) { + go func(ctx context.Context, p Plugin) { if err := restartKilledProcess(ctx, p); err != nil { - p.logger.Error("Attempt to restart killed plugin process failed", "error", err) + p.Logger().Error("Attempt to restart killed plugin process failed", "error", err) } }(ctx, p) return nil } -func restartKilledProcess(ctx context.Context, p *BackendPlugin) error { +func restartKilledProcess(ctx context.Context, p Plugin) error { ticker := time.NewTicker(time.Second * 1) for { @@ -343,16 +387,16 @@ func restartKilledProcess(ctx context.Context, p *BackendPlugin) error { } return nil case <-ticker.C: - if !p.client.Exited() { + if !p.Exited() { continue } - p.logger.Debug("Restarting plugin") - if err := p.start(ctx); err != nil { - p.logger.Error("Failed to restart plugin", "error", err) + p.Logger().Debug("Restarting plugin") + if err := p.Start(ctx); err != nil { + p.Logger().Error("Failed to restart plugin", "error", err) continue } - p.logger.Debug("Plugin restarted") + p.Logger().Debug("Plugin restarted") } } } diff --git a/pkg/plugins/backendplugin/manager_test.go b/pkg/plugins/backendplugin/manager_test.go new file mode 100644 index 00000000000..ec62a1778ff --- /dev/null +++ b/pkg/plugins/backendplugin/manager_test.go @@ -0,0 +1,410 @@ +package backendplugin + +import ( + "bytes" + "context" + "net/http" + "net/http/httptest" + "sync" + "testing" + "time" + + "github.com/grafana/grafana-plugin-sdk-go/backend" + "github.com/grafana/grafana/pkg/infra/log" + "github.com/grafana/grafana/pkg/models" + "github.com/grafana/grafana/pkg/setting" + "github.com/stretchr/testify/require" +) + +const testPluginID = "test-plugin" + +func TestManager(t *testing.T) { + newManagerScenario(t, false, func(t *testing.T, ctx *managerScenarioCtx) { + t.Run("Unregistered plugin scenario", func(t *testing.T) { + err := ctx.manager.StartPlugin(context.Background(), testPluginID) + require.Equal(t, ErrPluginNotRegistered, err) + + _, err = ctx.manager.CollectMetrics(context.Background(), testPluginID) + require.Equal(t, ErrPluginNotRegistered, err) + + _, err = ctx.manager.CheckHealth(context.Background(), backend.PluginContext{PluginID: testPluginID}) + require.Equal(t, ErrPluginNotRegistered, err) + + req, err := http.NewRequest(http.MethodGet, "/test", nil) + require.NoError(t, err) + w := httptest.NewRecorder() + err = ctx.manager.callResourceInternal(w, req, backend.PluginContext{PluginID: testPluginID}) + require.Equal(t, ErrPluginNotRegistered, err) + }) + }) + + newManagerScenario(t, true, func(t *testing.T, ctx *managerScenarioCtx) { + t.Run("Managed plugin scenario", func(t *testing.T) { + ctx.license.edition = "Open Source" + ctx.license.hasLicense = false + ctx.cfg.BuildVersion = "7.0.0" + + t.Run("Should be able to register plugin", func(t *testing.T) { + err := ctx.manager.Register(testPluginID, ctx.factory) + require.NoError(t, err) + require.NotNil(t, ctx.plugin) + require.Equal(t, testPluginID, ctx.plugin.pluginID) + require.NotNil(t, ctx.plugin.logger) + + t.Run("Should not be able to register an already registered plugin", func(t *testing.T) { + err := ctx.manager.Register(testPluginID, ctx.factory) + require.Error(t, err) + }) + + t.Run("Should provide expected host environment variables", func(t *testing.T) { + require.Len(t, ctx.env, 2) + require.EqualValues(t, []string{"GF_VERSION=7.0.0", "GF_EDITION=Open Source"}, ctx.env) + }) + + t.Run("When manager runs should start and stop plugin", func(t *testing.T) { + pCtx := context.Background() + cCtx, cancel := context.WithCancel(pCtx) + var wg sync.WaitGroup + wg.Add(1) + var runErr error + go func() { + runErr = ctx.manager.Run(cCtx) + wg.Done() + }() + time.Sleep(time.Millisecond) + cancel() + wg.Wait() + require.Equal(t, context.Canceled, runErr) + require.Equal(t, 1, ctx.plugin.startCount) + require.Equal(t, 1, ctx.plugin.stopCount) + }) + + t.Run("When manager runs should restart plugin process when killed", func(t *testing.T) { + ctx.plugin.stopCount = 0 + ctx.plugin.startCount = 0 + pCtx := context.Background() + cCtx, cancel := context.WithCancel(pCtx) + var wgRun sync.WaitGroup + wgRun.Add(1) + var runErr error + go func() { + runErr = ctx.manager.Run(cCtx) + wgRun.Done() + }() + + time.Sleep(time.Millisecond) + + var wgKill sync.WaitGroup + wgKill.Add(1) + go func() { + ctx.plugin.kill() + for { + if !ctx.plugin.Exited() { + break + } + } + cancel() + wgKill.Done() + }() + wgKill.Wait() + wgRun.Wait() + require.Equal(t, context.Canceled, runErr) + require.Equal(t, 1, ctx.plugin.stopCount) + require.Equal(t, 2, ctx.plugin.startCount) + }) + + t.Run("Shouldn't be able to start managed plugin", func(t *testing.T) { + err := ctx.manager.StartPlugin(context.Background(), testPluginID) + require.NotNil(t, err) + }) + + t.Run("Unimplemented handlers", func(t *testing.T) { + t.Run("Collect metrics should return method not implemented error", func(t *testing.T) { + _, err = ctx.manager.CollectMetrics(context.Background(), testPluginID) + require.Equal(t, ErrMethodNotImplemented, err) + }) + + t.Run("Check health should return method not implemented error", func(t *testing.T) { + _, err = ctx.manager.CheckHealth(context.Background(), backend.PluginContext{PluginID: testPluginID}) + require.Equal(t, ErrMethodNotImplemented, err) + }) + + t.Run("Call resource should return method not implemented error", func(t *testing.T) { + req, err := http.NewRequest(http.MethodGet, "/test", bytes.NewReader([]byte{})) + require.NoError(t, err) + w := httptest.NewRecorder() + err = ctx.manager.callResourceInternal(w, req, backend.PluginContext{PluginID: testPluginID}) + require.Equal(t, ErrMethodNotImplemented, err) + }) + }) + + t.Run("Implemented handlers", func(t *testing.T) { + t.Run("Collect metrics should return expected result", func(t *testing.T) { + ctx.plugin.CollectMetricsHandlerFunc = backend.CollectMetricsHandlerFunc(func(ctx context.Context) (*backend.CollectMetricsResult, error) { + return &backend.CollectMetricsResult{ + PrometheusMetrics: []byte("hello"), + }, nil + }) + + res, err := ctx.manager.CollectMetrics(context.Background(), testPluginID) + require.NoError(t, err) + require.NotNil(t, res) + require.Equal(t, "hello", string(res.PrometheusMetrics)) + }) + + t.Run("Check health should return expected result", func(t *testing.T) { + json := []byte(`{ + "key": "value" + }`) + ctx.plugin.CheckHealthHandlerFunc = backend.CheckHealthHandlerFunc(func(ctx context.Context, req *backend.CheckHealthRequest) (*backend.CheckHealthResult, error) { + return &backend.CheckHealthResult{ + Status: backend.HealthStatusOk, + Message: "All good", + JSONDetails: json, + }, nil + }) + + res, err := ctx.manager.CheckHealth(context.Background(), backend.PluginContext{PluginID: testPluginID}) + require.NoError(t, err) + require.NotNil(t, res) + require.Equal(t, backend.HealthStatusOk, res.Status) + require.Equal(t, "All good", res.Message) + require.Equal(t, json, res.JSONDetails) + }) + + t.Run("Call resource should return expected response", func(t *testing.T) { + ctx.plugin.CallResourceHandlerFunc = backend.CallResourceHandlerFunc(func(ctx context.Context, req *backend.CallResourceRequest, sender backend.CallResourceResponseSender) error { + return sender.Send(&backend.CallResourceResponse{ + Status: http.StatusOK, + }) + }) + + req, err := http.NewRequest(http.MethodGet, "/test", bytes.NewReader([]byte{})) + require.NoError(t, err) + w := httptest.NewRecorder() + err = ctx.manager.callResourceInternal(w, req, backend.PluginContext{PluginID: testPluginID}) + require.NoError(t, err) + require.Equal(t, http.StatusOK, w.Code) + }) + }) + }) + }) + }) + + newManagerScenario(t, false, func(t *testing.T, ctx *managerScenarioCtx) { + t.Run("Unmanaged plugin scenario", func(t *testing.T) { + ctx.license.edition = "Open Source" + ctx.license.hasLicense = false + ctx.cfg.BuildVersion = "7.0.0" + + t.Run("Should be able to register plugin", func(t *testing.T) { + err := ctx.manager.Register(testPluginID, ctx.factory) + require.NoError(t, err) + require.False(t, ctx.plugin.managed) + + t.Run("When manager runs should not start plugin", func(t *testing.T) { + pCtx := context.Background() + cCtx, cancel := context.WithCancel(pCtx) + var wg sync.WaitGroup + wg.Add(1) + var runErr error + go func() { + runErr = ctx.manager.Run(cCtx) + wg.Done() + }() + go func() { + cancel() + }() + wg.Wait() + require.Equal(t, context.Canceled, runErr) + require.Equal(t, 0, ctx.plugin.startCount) + require.Equal(t, 1, ctx.plugin.stopCount) + }) + + t.Run("Should be able to start unmanaged plugin and be restarted when process is killed", func(t *testing.T) { + pCtx := context.Background() + cCtx, cancel := context.WithCancel(pCtx) + defer cancel() + err := ctx.manager.StartPlugin(cCtx, testPluginID) + require.Nil(t, err) + require.Equal(t, 1, ctx.plugin.startCount) + + var wg sync.WaitGroup + wg.Add(1) + go func() { + ctx.plugin.kill() + for { + if !ctx.plugin.Exited() { + break + } + } + wg.Done() + }() + wg.Wait() + require.Equal(t, 2, ctx.plugin.startCount) + }) + }) + }) + }) + + newManagerScenario(t, true, func(t *testing.T, ctx *managerScenarioCtx) { + t.Run("Plugin registration scenario when Grafana is licensed", func(t *testing.T) { + ctx.license.edition = "Enterprise" + ctx.license.hasLicense = true + ctx.cfg.BuildVersion = "7.0.0" + ctx.cfg.EnterpriseLicensePath = "/license.txt" + + err := ctx.manager.Register(testPluginID, ctx.factory) + require.NoError(t, err) + + t.Run("Should provide expected host environment variables", func(t *testing.T) { + require.Len(t, ctx.env, 3) + require.EqualValues(t, []string{"GF_VERSION=7.0.0", "GF_EDITION=Enterprise", "GF_ENTERPRISE_LICENSE_PATH=/license.txt"}, ctx.env) + }) + }) + }) +} + +type managerScenarioCtx struct { + cfg *setting.Cfg + license *testLicensingService + manager *manager + factory PluginFactoryFunc + plugin *testPlugin + env []string +} + +func newManagerScenario(t *testing.T, managed bool, fn func(t *testing.T, ctx *managerScenarioCtx)) { + t.Helper() + cfg := setting.NewCfg() + license := &testLicensingService{} + ctx := &managerScenarioCtx{ + cfg: cfg, + license: license, + manager: &manager{ + Cfg: cfg, + License: license, + }, + } + + err := ctx.manager.Init() + require.NoError(t, err) + + ctx.factory = PluginFactoryFunc(func(pluginID string, logger log.Logger, env []string) (Plugin, error) { + ctx.plugin = &testPlugin{ + pluginID: pluginID, + logger: logger, + managed: managed, + } + ctx.env = env + + return ctx.plugin, nil + }) + + fn(t, ctx) +} + +type testPlugin struct { + pluginID string + logger log.Logger + startCount int + stopCount int + managed bool + exited bool + backend.CollectMetricsHandlerFunc + backend.CheckHealthHandlerFunc + backend.CallResourceHandlerFunc + mutex sync.RWMutex +} + +func (tp *testPlugin) PluginID() string { + return tp.pluginID +} + +func (tp *testPlugin) Logger() log.Logger { + return tp.logger +} + +func (tp *testPlugin) Start(ctx context.Context) error { + tp.mutex.Lock() + defer tp.mutex.Unlock() + tp.exited = false + tp.startCount++ + return nil +} + +func (tp *testPlugin) Stop(ctx context.Context) error { + tp.mutex.Lock() + defer tp.mutex.Unlock() + tp.stopCount++ + return nil +} + +func (tp *testPlugin) IsManaged() bool { + return tp.managed +} + +func (tp *testPlugin) Exited() bool { + tp.mutex.RLock() + defer tp.mutex.RUnlock() + return tp.exited +} + +func (tp *testPlugin) kill() { + tp.mutex.Lock() + defer tp.mutex.Unlock() + tp.exited = true +} + +func (tp *testPlugin) CollectMetrics(ctx context.Context) (*backend.CollectMetricsResult, error) { + if tp.CollectMetricsHandlerFunc != nil { + return tp.CollectMetricsHandlerFunc(ctx) + } + + return nil, ErrMethodNotImplemented +} + +func (tp *testPlugin) CheckHealth(ctx context.Context, req *backend.CheckHealthRequest) (*backend.CheckHealthResult, error) { + if tp.CheckHealthHandlerFunc != nil { + return tp.CheckHealthHandlerFunc(ctx, req) + } + + return nil, ErrMethodNotImplemented +} + +func (tp *testPlugin) CallResource(ctx context.Context, req *backend.CallResourceRequest, sender backend.CallResourceResponseSender) error { + if tp.CallResourceHandlerFunc != nil { + return tp.CallResourceHandlerFunc(ctx, req, sender) + } + + return ErrMethodNotImplemented +} + +type testLicensingService struct { + edition string + hasLicense bool +} + +func (t *testLicensingService) HasLicense() bool { + return t.hasLicense +} + +func (t *testLicensingService) Expiry() int64 { + return 0 +} + +func (t *testLicensingService) Edition() string { + return t.edition +} + +func (t *testLicensingService) StateInfo() string { + return "" +} + +func (t *testLicensingService) LicenseURL(user *models.SignedInUser) string { + return "" +} + +func (t *testLicensingService) HasValidLicense() bool { + return false +} diff --git a/pkg/plugins/backendplugin/plugin.go b/pkg/plugins/backendplugin/plugin.go new file mode 100644 index 00000000000..db2ae1c8c58 --- /dev/null +++ b/pkg/plugins/backendplugin/plugin.go @@ -0,0 +1,30 @@ +package backendplugin + +import ( + "context" + + "github.com/grafana/grafana-plugin-sdk-go/backend" + "github.com/grafana/grafana/pkg/infra/log" +) + +// Plugin backend plugin interface. +type Plugin interface { + PluginID() string + Logger() log.Logger + Start(ctx context.Context) error + Stop(ctx context.Context) error + IsManaged() bool + Exited() bool + backend.CollectMetricsHandler + backend.CheckHealthHandler + backend.CallResourceHandler +} + +// PluginFactoryFunc factory for creating a Plugin. +type PluginFactoryFunc func(pluginID string, logger log.Logger, env []string) (Plugin, error) + +// CallResourceClientResponseStream is used for receiving resource call responses. +type CallResourceClientResponseStream interface { + Recv() (*backend.CallResourceResponse, error) + Close() error +} diff --git a/pkg/plugins/backendplugin/resource_response_stream.go b/pkg/plugins/backendplugin/resource_response_stream.go new file mode 100644 index 00000000000..3b6617c9d1a --- /dev/null +++ b/pkg/plugins/backendplugin/resource_response_stream.go @@ -0,0 +1,57 @@ +package backendplugin + +import ( + "context" + "errors" + "io" + + "github.com/grafana/grafana-plugin-sdk-go/backend" +) + +func newCallResourceResponseStream(ctx context.Context) *callResourceResponseStream { + return &callResourceResponseStream{ + ctx: ctx, + stream: make(chan *backend.CallResourceResponse), + } +} + +type callResourceResponseStream struct { + ctx context.Context + stream chan *backend.CallResourceResponse + closed bool +} + +func (s *callResourceResponseStream) Send(res *backend.CallResourceResponse) error { + if s.closed { + return errors.New("Cannot send to a closed stream") + } + + select { + case <-s.ctx.Done(): + return errors.New("cancelled") + case s.stream <- res: + return nil + } +} + +func (s *callResourceResponseStream) Recv() (*backend.CallResourceResponse, error) { + select { + case <-s.ctx.Done(): + return nil, s.ctx.Err() + case res, ok := <-s.stream: + if !ok { + return nil, io.EOF + } + return res, nil + } +} + +func (s *callResourceResponseStream) Close() error { + if s.closed { + return errors.New("Cannot close a closed stream") + } + + close(s.stream) + s.closed = true + return nil +} diff --git a/pkg/plugins/datasource/wrapper/datasource_plugin_wrapper_v2.go b/pkg/plugins/datasource/wrapper/datasource_plugin_wrapper_v2.go index 761397bad56..5f41bffd391 100644 --- a/pkg/plugins/datasource/wrapper/datasource_plugin_wrapper_v2.go +++ b/pkg/plugins/datasource/wrapper/datasource_plugin_wrapper_v2.go @@ -4,22 +4,22 @@ import ( "context" "fmt" - "github.com/grafana/grafana/pkg/components/simplejson" - "github.com/grafana/grafana/pkg/plugins/backendplugin" + "github.com/grafana/grafana-plugin-sdk-go/backend/grpcplugin" "github.com/grafana/grafana-plugin-sdk-go/backend" "github.com/grafana/grafana-plugin-sdk-go/genproto/pluginv2" + "github.com/grafana/grafana/pkg/components/simplejson" "github.com/grafana/grafana/pkg/infra/log" "github.com/grafana/grafana/pkg/models" "github.com/grafana/grafana/pkg/tsdb" ) -func NewDatasourcePluginWrapperV2(log log.Logger, pluginId, pluginType string, plugin backendplugin.DataPlugin) *DatasourcePluginWrapperV2 { - return &DatasourcePluginWrapperV2{DataPlugin: plugin, logger: log, pluginId: pluginId, pluginType: pluginType} +func NewDatasourcePluginWrapperV2(log log.Logger, pluginId, pluginType string, client grpcplugin.DataClient) *DatasourcePluginWrapperV2 { + return &DatasourcePluginWrapperV2{DataClient: client, logger: log, pluginId: pluginId, pluginType: pluginType} } type DatasourcePluginWrapperV2 struct { - backendplugin.DataPlugin + grpcplugin.DataClient logger log.Logger pluginId string pluginType string @@ -79,14 +79,7 @@ func (tw *DatasourcePluginWrapperV2) Query(ctx context.Context, ds *models.DataS }) } - var pbRes *pluginv2.QueryDataResponse - err = backendplugin.InstrumentPluginRequest(ds.Type, "dataquery", func() error { - var err error - pbRes, err = tw.DataPlugin.QueryData(ctx, pbQuery) - - return err - }) - + pbRes, err := tw.DataClient.QueryData(ctx, pbQuery) if err != nil { return nil, err } diff --git a/pkg/plugins/datasource_plugin.go b/pkg/plugins/datasource_plugin.go index 33efa1aa521..39a30c9cbf6 100644 --- a/pkg/plugins/datasource_plugin.go +++ b/pkg/plugins/datasource_plugin.go @@ -4,14 +4,13 @@ import ( "encoding/json" "path" - "github.com/grafana/grafana/pkg/plugins/backendplugin" - - "github.com/grafana/grafana/pkg/util/errutil" - "github.com/grafana/grafana/pkg/infra/log" "github.com/grafana/grafana/pkg/models" + "github.com/grafana/grafana/pkg/plugins/backendplugin" + "github.com/grafana/grafana/pkg/plugins/backendplugin/grpcplugin" "github.com/grafana/grafana/pkg/plugins/datasource/wrapper" "github.com/grafana/grafana/pkg/tsdb" + "github.com/grafana/grafana/pkg/util/errutil" ) // DataSourcePlugin contains all metadata about a datasource plugin @@ -47,11 +46,11 @@ func (p *DataSourcePlugin) Load(decoder *json.Decoder, pluginDir string, backend if p.Backend { cmd := ComposePluginStartCommand(p.Executable) fullpath := path.Join(p.PluginDir, cmd) - descriptor := backendplugin.NewBackendPluginDescriptor(p.Id, fullpath, backendplugin.PluginStartFuncs{ + factory := grpcplugin.NewBackendPlugin(p.Id, fullpath, grpcplugin.PluginStartFuncs{ OnLegacyStart: p.onLegacyPluginStart, OnStart: p.onPluginStart, }) - if err := backendPluginManager.Register(descriptor); err != nil { + if err := backendPluginManager.Register(p.Id, factory); err != nil { return errutil.Wrapf(err, "Failed to register backend plugin") } } @@ -60,7 +59,7 @@ func (p *DataSourcePlugin) Load(decoder *json.Decoder, pluginDir string, backend return nil } -func (p *DataSourcePlugin) onLegacyPluginStart(pluginID string, client *backendplugin.LegacyClient, logger log.Logger) error { +func (p *DataSourcePlugin) onLegacyPluginStart(pluginID string, client *grpcplugin.LegacyClient, logger log.Logger) error { tsdb.RegisterTsdbQueryEndpoint(pluginID, func(dsInfo *models.DataSource) (tsdb.TsdbQueryEndpoint, error) { return wrapper.NewDatasourcePluginWrapper(logger, client.DatasourcePlugin), nil }) @@ -68,7 +67,7 @@ func (p *DataSourcePlugin) onLegacyPluginStart(pluginID string, client *backendp return nil } -func (p *DataSourcePlugin) onPluginStart(pluginID string, client *backendplugin.Client, logger log.Logger) error { +func (p *DataSourcePlugin) onPluginStart(pluginID string, client *grpcplugin.Client, logger log.Logger) error { if client.DataPlugin != nil { tsdb.RegisterTsdbQueryEndpoint(pluginID, func(dsInfo *models.DataSource) (tsdb.TsdbQueryEndpoint, error) { return wrapper.NewDatasourcePluginWrapperV2(logger, p.Id, p.Type, client.DataPlugin), nil diff --git a/pkg/plugins/plugins_test.go b/pkg/plugins/plugins_test.go index 7b6b828f752..94a4d1fd4ea 100644 --- a/pkg/plugins/plugins_test.go +++ b/pkg/plugins/plugins_test.go @@ -189,8 +189,8 @@ type fakeBackendPluginManager struct { registeredPlugins []string } -func (f *fakeBackendPluginManager) Register(descriptor backendplugin.PluginDescriptor) error { - f.registeredPlugins = append(f.registeredPlugins, descriptor.PluginID()) +func (f *fakeBackendPluginManager) Register(pluginID string, factory backendplugin.PluginFactoryFunc) error { + f.registeredPlugins = append(f.registeredPlugins, pluginID) return nil } @@ -198,11 +198,11 @@ func (f *fakeBackendPluginManager) StartPlugin(ctx context.Context, pluginID str return nil } -func (f *fakeBackendPluginManager) CollectMetrics(ctx context.Context, pluginID string) (*backendplugin.CollectMetricsResult, error) { +func (f *fakeBackendPluginManager) CollectMetrics(ctx context.Context, pluginID string) (*backend.CollectMetricsResult, error) { return nil, nil } -func (f *fakeBackendPluginManager) CheckHealth(ctx context.Context, pCtx backend.PluginContext) (*backendplugin.CheckHealthResult, error) { +func (f *fakeBackendPluginManager) CheckHealth(ctx context.Context, pCtx backend.PluginContext) (*backend.CheckHealthResult, error) { return nil, nil } diff --git a/pkg/plugins/renderer_plugin.go b/pkg/plugins/renderer_plugin.go index fb55e6feb43..fcf7829ddde 100644 --- a/pkg/plugins/renderer_plugin.go +++ b/pkg/plugins/renderer_plugin.go @@ -8,6 +8,7 @@ import ( pluginModel "github.com/grafana/grafana-plugin-model/go/renderer" "github.com/grafana/grafana/pkg/infra/log" "github.com/grafana/grafana/pkg/plugins/backendplugin" + "github.com/grafana/grafana/pkg/plugins/backendplugin/grpcplugin" "github.com/grafana/grafana/pkg/plugins/backendplugin/pluginextensionv2" "github.com/grafana/grafana/pkg/util/errutil" ) @@ -34,11 +35,11 @@ func (r *RendererPlugin) Load(decoder *json.Decoder, pluginDir string, backendPl cmd := ComposePluginStartCommand("plugin_start") fullpath := path.Join(r.PluginDir, cmd) - descriptor := backendplugin.NewRendererPluginDescriptor(r.Id, fullpath, backendplugin.PluginStartFuncs{ + factory := grpcplugin.NewRendererPlugin(r.Id, fullpath, grpcplugin.PluginStartFuncs{ OnLegacyStart: r.onLegacyPluginStart, OnStart: r.onPluginStart, }) - if err := backendPluginManager.Register(descriptor); err != nil { + if err := backendPluginManager.Register(r.Id, factory); err != nil { return errutil.Wrapf(err, "Failed to register backend plugin") } @@ -54,12 +55,12 @@ func (r *RendererPlugin) Start(ctx context.Context) error { return nil } -func (r *RendererPlugin) onLegacyPluginStart(pluginID string, client *backendplugin.LegacyClient, logger log.Logger) error { +func (r *RendererPlugin) onLegacyPluginStart(pluginID string, client *grpcplugin.LegacyClient, logger log.Logger) error { r.GrpcPluginV1 = client.RendererPlugin return nil } -func (r *RendererPlugin) onPluginStart(pluginID string, client *backendplugin.Client, logger log.Logger) error { +func (r *RendererPlugin) onPluginStart(pluginID string, client *grpcplugin.Client, logger log.Logger) error { r.GrpcPluginV2 = client.RendererPlugin return nil } diff --git a/pkg/plugins/transform_plugin.go b/pkg/plugins/transform_plugin.go index 2f864b82c43..b62cc3e5258 100644 --- a/pkg/plugins/transform_plugin.go +++ b/pkg/plugins/transform_plugin.go @@ -7,12 +7,14 @@ import ( "path" "strconv" + sdkgrpcplugin "github.com/grafana/grafana-plugin-sdk-go/backend/grpcplugin" "github.com/grafana/grafana-plugin-sdk-go/genproto/pluginv2" "github.com/grafana/grafana/pkg/bus" "github.com/grafana/grafana/pkg/components/simplejson" "github.com/grafana/grafana/pkg/infra/log" "github.com/grafana/grafana/pkg/models" "github.com/grafana/grafana/pkg/plugins/backendplugin" + "github.com/grafana/grafana/pkg/plugins/backendplugin/grpcplugin" "github.com/grafana/grafana/pkg/plugins/datasource/wrapper" "github.com/grafana/grafana/pkg/tsdb" "github.com/grafana/grafana/pkg/util/errutil" @@ -37,10 +39,10 @@ func (p *TransformPlugin) Load(decoder *json.Decoder, pluginDir string, backendP cmd := ComposePluginStartCommand(p.Executable) fullpath := path.Join(p.PluginDir, cmd) - descriptor := backendplugin.NewBackendPluginDescriptor(p.Id, fullpath, backendplugin.PluginStartFuncs{ + factory := grpcplugin.NewBackendPlugin(p.Id, fullpath, grpcplugin.PluginStartFuncs{ OnStart: p.onPluginStart, }) - if err := backendPluginManager.Register(descriptor); err != nil { + if err := backendPluginManager.Register(p.Id, factory); err != nil { return errutil.Wrapf(err, "Failed to register backend plugin") } @@ -49,7 +51,7 @@ func (p *TransformPlugin) Load(decoder *json.Decoder, pluginDir string, backendP return nil } -func (p *TransformPlugin) onPluginStart(pluginID string, client *backendplugin.Client, logger log.Logger) error { +func (p *TransformPlugin) onPluginStart(pluginID string, client *grpcplugin.Client, logger log.Logger) error { p.TransformWrapper = NewTransformWrapper(logger, client.TransformPlugin) if client.DataPlugin != nil { @@ -65,12 +67,12 @@ func (p *TransformPlugin) onPluginStart(pluginID string, client *backendplugin.C // Wrapper Code // ... -func NewTransformWrapper(log log.Logger, plugin backendplugin.TransformPlugin) *TransformWrapper { +func NewTransformWrapper(log log.Logger, plugin sdkgrpcplugin.TransformClient) *TransformWrapper { return &TransformWrapper{plugin, log, &transformCallback{log}} } type TransformWrapper struct { - backendplugin.TransformPlugin + sdkgrpcplugin.TransformClient logger log.Logger callback *transformCallback } @@ -100,7 +102,7 @@ func (tw *TransformWrapper) Transform(ctx context.Context, query *tsdb.TsdbQuery }, }) } - pbRes, err := tw.TransformPlugin.TransformData(ctx, pbQuery, tw.callback) + pbRes, err := tw.TransformClient.TransformData(ctx, pbQuery, tw.callback) if err != nil { return nil, err } diff --git a/pkg/setting/setting.go b/pkg/setting/setting.go index b8d568894a7..7845f46b051 100644 --- a/pkg/setting/setting.go +++ b/pkg/setting/setting.go @@ -228,6 +228,16 @@ type Cfg struct { AppSubUrl string ServeFromSubPath bool + // build + BuildVersion string + BuildCommit string + BuildBranch string + BuildStamp int64 + IsEnterprise bool + + // packaging + Packaging string + // Paths ProvisioningPath string DataPath string @@ -607,6 +617,13 @@ func (cfg *Cfg) Load(args *CommandLineArgs) error { // Temporary keep global, to make refactor in steps Raw = cfg.Raw + cfg.BuildVersion = BuildVersion + cfg.BuildCommit = BuildCommit + cfg.BuildStamp = BuildStamp + cfg.BuildBranch = BuildBranch + cfg.IsEnterprise = IsEnterprise + cfg.Packaging = Packaging + ApplicationName = APP_NAME Env, err = valueAsString(iniFile.Section(""), "app_mode", "development")