From f135a5c8a43029e7711c6a4f83b1da52feb029f8 Mon Sep 17 00:00:00 2001 From: Will Browne Date: Fri, 6 May 2022 10:58:02 +0200 Subject: [PATCH] Plugins: Refactor plugin resource call with and without data source (#48754) * refactor plugin resource call with/without ds * check err * fix imports * only validate req on ds path * Update warn log Co-authored-by: Marcus Efraimsson Co-authored-by: Marcus Efraimsson --- pkg/api/datasources.go | 2 +- pkg/api/plugin_resource.go | 287 +++++++++++++++++++++ pkg/api/plugins.go | 257 +----------------- pkg/plugins/plugincontext/plugincontext.go | 54 ++-- pkg/services/live/live.go | 2 +- pkg/services/live/liveplugin/plugin.go | 20 +- 6 files changed, 336 insertions(+), 286 deletions(-) create mode 100644 pkg/api/plugin_resource.go diff --git a/pkg/api/datasources.go b/pkg/api/datasources.go index a35471d31fa..500d113cc15 100644 --- a/pkg/api/datasources.go +++ b/pkg/api/datasources.go @@ -422,7 +422,7 @@ func (hs *HTTPServer) CallDatasourceResource(c *models.ReqContext) { return } - hs.callPluginResource(c, plugin.ID, ds.Uid) + hs.callPluginResourceWithDataSource(c, plugin.ID, ds) } func (hs *HTTPServer) convertModelToDtos(ctx context.Context, ds *models.DataSource) dtos.DataSource { diff --git a/pkg/api/plugin_resource.go b/pkg/api/plugin_resource.go new file mode 100644 index 00000000000..e2e299a1fa4 --- /dev/null +++ b/pkg/api/plugin_resource.go @@ -0,0 +1,287 @@ +package api + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "io" + "io/ioutil" + "net/http" + "net/url" + "sync" + + "github.com/grafana/grafana-plugin-sdk-go/backend" + "github.com/grafana/grafana/pkg/models" + "github.com/grafana/grafana/pkg/plugins/backendplugin" + "github.com/grafana/grafana/pkg/util/errutil" + "github.com/grafana/grafana/pkg/util/proxyutil" + "github.com/grafana/grafana/pkg/web" +) + +// CallResource passes a resource call from a plugin to the backend plugin. +// +// /api/plugins/:pluginId/resources/* +func (hs *HTTPServer) CallResource(c *models.ReqContext) { + hs.callPluginResource(c, web.Params(c.Req)[":pluginId"]) +} + +func (hs *HTTPServer) callPluginResource(c *models.ReqContext, pluginID string) { + pCtx, found, err := hs.PluginContextProvider.Get(c.Req.Context(), pluginID, c.SignedInUser) + if err != nil { + c.JsonApiErr(500, "Failed to get plugin settings", err) + return + } + if !found { + c.JsonApiErr(404, "Plugin not found", nil) + return + } + + req, err := hs.pluginResourceRequest(c) + if err != nil { + c.JsonApiErr(http.StatusBadRequest, "Failed for create plugin resource request", err) + return + } + + if err = hs.makePluginResourceRequest(c.Resp, req, pCtx); err != nil { + handleCallResourceError(err, c) + } +} + +func (hs *HTTPServer) callPluginResourceWithDataSource(c *models.ReqContext, pluginID string, ds *models.DataSource) { + pCtx, found, err := hs.PluginContextProvider.GetWithDataSource(c.Req.Context(), pluginID, c.SignedInUser, ds) + if err != nil { + c.JsonApiErr(500, "Failed to get plugin settings", err) + return + } + if !found { + c.JsonApiErr(404, "Plugin not found", nil) + return + } + + var dsURL string + if pCtx.DataSourceInstanceSettings != nil { + dsURL = pCtx.DataSourceInstanceSettings.URL + } + + err = hs.PluginRequestValidator.Validate(dsURL, c.Req) + if err != nil { + c.JsonApiErr(http.StatusForbidden, "Access denied", err) + return + } + + req, err := hs.pluginResourceRequest(c) + if err != nil { + c.JsonApiErr(http.StatusBadRequest, "Failed for create plugin resource request", err) + return + } + + if hs.DataProxy.OAuthTokenService.IsOAuthPassThruEnabled(ds) { + if token := hs.DataProxy.OAuthTokenService.GetCurrentOAuthToken(c.Req.Context(), c.SignedInUser); token != nil { + req.Header.Add("Authorization", fmt.Sprintf("%s %s", token.Type(), token.AccessToken)) + + idToken, ok := token.Extra("id_token").(string) + if ok && idToken != "" { + req.Header.Add("X-ID-Token", idToken) + } + } + } + + if err = hs.makePluginResourceRequest(c.Resp, req, pCtx); err != nil { + handleCallResourceError(err, c) + } +} + +func (hs *HTTPServer) pluginResourceRequest(c *models.ReqContext) (*http.Request, error) { + clonedReq := c.Req.Clone(c.Req.Context()) + rawURL := web.Params(c.Req)["*"] + if clonedReq.URL.RawQuery != "" { + rawURL += "?" + clonedReq.URL.RawQuery + } + urlPath, err := url.Parse(rawURL) + if err != nil { + return nil, err + } + clonedReq.URL = urlPath + + return clonedReq, nil +} + +func (hs *HTTPServer) makePluginResourceRequest(w http.ResponseWriter, req *http.Request, pCtx backend.PluginContext) error { + keepCookieModel := struct { + KeepCookies []string `json:"keepCookies"` + }{} + if dis := pCtx.DataSourceInstanceSettings; dis != nil { + err := json.Unmarshal(dis.JSONData, &keepCookieModel) + if err != nil { + hs.log.Warn("failed to unpack JSONData in datasource instance settings", "err", err) + } + } + proxyutil.ClearCookieHeader(req, keepCookieModel.KeepCookies) + proxyutil.PrepareProxyRequest(req) + + body, err := ioutil.ReadAll(req.Body) + if err != nil { + return fmt.Errorf("failed to read request body: %w", err) + } + + crReq := &backend.CallResourceRequest{ + PluginContext: pCtx, + Path: req.URL.Path, + Method: req.Method, + URL: req.URL.String(), + Headers: req.Header, + Body: body, + } + + childCtx, cancel := context.WithCancel(req.Context()) + defer cancel() + stream := newCallResourceResponseStream(childCtx) + + var wg sync.WaitGroup + wg.Add(1) + + defer func() { + if err := stream.Close(); err != nil { + hs.log.Warn("Failed to close plugin resource stream", "err", err) + } + wg.Wait() + }() + + var flushStreamErr error + go func() { + flushStreamErr = hs.flushStream(stream, w) + wg.Done() + }() + + if err := hs.pluginClient.CallResource(req.Context(), crReq, stream); err != nil { + return err + } + + return flushStreamErr +} + +func (hs *HTTPServer) flushStream(stream callResourceClientResponseStream, w http.ResponseWriter) error { + processedStreams := 0 + + for { + resp, err := stream.Recv() + if errors.Is(err, io.EOF) { + if processedStreams == 0 { + return errors.New("received empty resource response") + } + return nil + } + if err != nil { + if processedStreams == 0 { + return errutil.Wrap("failed to receive response from resource call", err) + } + + hs.log.Error("Failed to receive response from resource call", "err", err) + return stream.Close() + } + + // Expected that headers and status are only part of first stream + if processedStreams == 0 && resp.Headers != nil { + // Make sure a content type always is returned in response + if _, exists := resp.Headers["Content-Type"]; !exists { + resp.Headers["Content-Type"] = []string{"application/json"} + } + + for k, values := range resp.Headers { + // Due to security reasons we don't want to forward + // cookies from a backend plugin to clients/browsers. + if k == "Set-Cookie" { + continue + } + + for _, v := range values { + // TODO: Figure out if we should use Set here instead + // nolint:gocritic + w.Header().Add(k, v) + } + } + + proxyutil.SetProxyResponseHeaders(w.Header()) + + w.WriteHeader(resp.Status) + } + + if _, err := w.Write(resp.Body); err != nil { + hs.log.Error("Failed to write resource response", "err", err) + } + + if flusher, ok := w.(http.Flusher); ok { + flusher.Flush() + } + processedStreams++ + } +} + +func handleCallResourceError(err error, reqCtx *models.ReqContext) { + if errors.Is(err, backendplugin.ErrPluginUnavailable) { + reqCtx.JsonApiErr(503, "Plugin unavailable", err) + return + } + + if errors.Is(err, backendplugin.ErrMethodNotImplemented) { + reqCtx.JsonApiErr(404, "Not found", err) + return + } + + reqCtx.JsonApiErr(500, "Failed to call resource", err) +} + +// callResourceClientResponseStream is used for receiving resource call responses. +type callResourceClientResponseStream interface { + Recv() (*backend.CallResourceResponse, error) + Close() error +} + +type callResourceResponseStream struct { + ctx context.Context + stream chan *backend.CallResourceResponse + closed bool +} + +func newCallResourceResponseStream(ctx context.Context) *callResourceResponseStream { + return &callResourceResponseStream{ + ctx: ctx, + stream: make(chan *backend.CallResourceResponse), + } +} + +func (s *callResourceResponseStream) Send(res *backend.CallResourceResponse) error { + if s.closed { + return errors.New("cannot send to a closed stream") + } + + select { + case <-s.ctx.Done(): + return errors.New("cancelled") + case s.stream <- res: + return nil + } +} + +func (s *callResourceResponseStream) Recv() (*backend.CallResourceResponse, error) { + select { + case <-s.ctx.Done(): + return nil, s.ctx.Err() + case res, ok := <-s.stream: + if !ok { + return nil, io.EOF + } + return res, nil + } +} + +func (s *callResourceResponseStream) Close() error { + if s.closed { + return errors.New("cannot close a closed stream") + } + + close(s.stream) + s.closed = true + return nil +} diff --git a/pkg/api/plugins.go b/pkg/api/plugins.go index 227a6d6fba1..48f938b83ea 100644 --- a/pkg/api/plugins.go +++ b/pkg/api/plugins.go @@ -5,16 +5,13 @@ import ( "encoding/json" "errors" "fmt" - "io" "io/ioutil" "net/http" - "net/url" "os" "path" "path/filepath" "sort" "strings" - "sync" "github.com/grafana/grafana-plugin-sdk-go/backend" "github.com/grafana/grafana/pkg/api/dtos" @@ -26,8 +23,6 @@ import ( "github.com/grafana/grafana/pkg/plugins/manager/installer" "github.com/grafana/grafana/pkg/services/pluginsettings" "github.com/grafana/grafana/pkg/setting" - "github.com/grafana/grafana/pkg/util/errutil" - "github.com/grafana/grafana/pkg/util/proxyutil" "github.com/grafana/grafana/pkg/web" ) @@ -308,7 +303,7 @@ func (hs *HTTPServer) getPluginAssets(c *models.ReqContext) { func (hs *HTTPServer) CheckHealth(c *models.ReqContext) response.Response { pluginID := web.Params(c.Req)[":pluginId"] - pCtx, found, err := hs.PluginContextProvider.Get(c.Req.Context(), pluginID, "", c.SignedInUser, false) + pCtx, found, err := hs.PluginContextProvider.Get(c.Req.Context(), pluginID, c.SignedInUser) if err != nil { return response.Error(500, "Failed to get plugin settings", err) } @@ -346,13 +341,6 @@ func (hs *HTTPServer) CheckHealth(c *models.ReqContext) response.Response { return response.JSON(http.StatusOK, payload) } -// CallResource passes a resource call from a plugin to the backend plugin. -// -// /api/plugins/:pluginId/resources/* -func (hs *HTTPServer) CallResource(c *models.ReqContext) { - hs.callPluginResource(c, web.Params(c.Req)[":pluginId"], "") -} - func (hs *HTTPServer) GetPluginErrorsList(_ *models.ReqContext) response.Response { return response.JSON(http.StatusOK, hs.pluginErrorResolver.PluginErrors()) } @@ -471,246 +459,3 @@ func (hs *HTTPServer) pluginMarkdown(ctx context.Context, pluginId string, name func mdFilepath(mdFilename string) string { return filepath.Clean(filepath.Join("/", fmt.Sprintf("%s.md", mdFilename))) } - -func (hs *HTTPServer) callPluginResource(c *models.ReqContext, pluginID, dsUID string) { - pCtx, found, err := hs.PluginContextProvider.Get(c.Req.Context(), pluginID, dsUID, c.SignedInUser, false) - if err != nil { - c.JsonApiErr(500, "Failed to get plugin settings", err) - return - } - if !found { - c.JsonApiErr(404, "Plugin not found", nil) - return - } - - var dsURL string - if pCtx.DataSourceInstanceSettings != nil { - dsURL = pCtx.DataSourceInstanceSettings.URL - } - - err = hs.PluginRequestValidator.Validate(dsURL, c.Req) - if err != nil { - c.JsonApiErr(http.StatusForbidden, "Access denied", err) - return - } - - clonedReq := c.Req.Clone(c.Req.Context()) - rawURL := web.Params(c.Req)["*"] - if clonedReq.URL.RawQuery != "" { - rawURL += "?" + clonedReq.URL.RawQuery - } - urlPath, err := url.Parse(rawURL) - if err != nil { - handleCallResourceError(err, c) - return - } - clonedReq.URL = urlPath - - if dsUID != "" { - ds, err := hs.DataSourceCache.GetDatasourceByUID(c.Req.Context(), dsUID, c.SignedInUser, c.SkipCache) - - if err != nil { - if errors.Is(err, models.ErrDataSourceNotFound) { - c.JsonApiErr(404, "Datasource not found", err) - return - } - - c.JsonApiErr(500, "Failed to get datasource", err) - return - } - - if hs.DataProxy.OAuthTokenService.IsOAuthPassThruEnabled(ds) { - if token := hs.DataProxy.OAuthTokenService.GetCurrentOAuthToken(c.Req.Context(), c.SignedInUser); token != nil { - clonedReq.Header.Add("Authorization", fmt.Sprintf("%s %s", token.Type(), token.AccessToken)) - - idToken, ok := token.Extra("id_token").(string) - if ok && idToken != "" { - clonedReq.Header.Add("X-ID-Token", idToken) - } - } - } - } - - if err = hs.makePluginResourceRequest(c.Resp, clonedReq, pCtx); err != nil { - handleCallResourceError(err, c) - } -} - -func (hs *HTTPServer) makePluginResourceRequest(w http.ResponseWriter, req *http.Request, pCtx backend.PluginContext) error { - keepCookieModel := struct { - KeepCookies []string `json:"keepCookies"` - }{} - if dis := pCtx.DataSourceInstanceSettings; dis != nil { - err := json.Unmarshal(dis.JSONData, &keepCookieModel) - if err != nil { - hs.log.Warn("failed to to unpack JSONData in datasource instance settings", "err", err) - } - } - proxyutil.ClearCookieHeader(req, keepCookieModel.KeepCookies) - proxyutil.PrepareProxyRequest(req) - - body, err := ioutil.ReadAll(req.Body) - if err != nil { - return fmt.Errorf("failed to read request body: %w", err) - } - - crReq := &backend.CallResourceRequest{ - PluginContext: pCtx, - Path: req.URL.Path, - Method: req.Method, - URL: req.URL.String(), - Headers: req.Header, - Body: body, - } - - childCtx, cancel := context.WithCancel(req.Context()) - defer cancel() - stream := newCallResourceResponseStream(childCtx) - - var wg sync.WaitGroup - wg.Add(1) - - defer func() { - if err := stream.Close(); err != nil { - hs.log.Warn("Failed to close plugin resource stream", "err", err) - } - wg.Wait() - }() - - var flushStreamErr error - go func() { - flushStreamErr = hs.flushStream(stream, w) - wg.Done() - }() - - if err := hs.pluginClient.CallResource(req.Context(), crReq, stream); err != nil { - return err - } - - return flushStreamErr -} - -func (hs *HTTPServer) flushStream(stream callResourceClientResponseStream, w http.ResponseWriter) error { - processedStreams := 0 - - for { - resp, err := stream.Recv() - if errors.Is(err, io.EOF) { - if processedStreams == 0 { - return errors.New("received empty resource response") - } - return nil - } - if err != nil { - if processedStreams == 0 { - return errutil.Wrap("failed to receive response from resource call", err) - } - - hs.log.Error("Failed to receive response from resource call", "err", err) - return stream.Close() - } - - // Expected that headers and status are only part of first stream - if processedStreams == 0 && resp.Headers != nil { - // Make sure a content type always is returned in response - if _, exists := resp.Headers["Content-Type"]; !exists { - resp.Headers["Content-Type"] = []string{"application/json"} - } - - for k, values := range resp.Headers { - // Due to security reasons we don't want to forward - // cookies from a backend plugin to clients/browsers. - if k == "Set-Cookie" { - continue - } - - for _, v := range values { - // TODO: Figure out if we should use Set here instead - // nolint:gocritic - w.Header().Add(k, v) - } - } - - proxyutil.SetProxyResponseHeaders(w.Header()) - - w.WriteHeader(resp.Status) - } - - if _, err := w.Write(resp.Body); err != nil { - hs.log.Error("Failed to write resource response", "err", err) - } - - if flusher, ok := w.(http.Flusher); ok { - flusher.Flush() - } - processedStreams++ - } -} - -func handleCallResourceError(err error, reqCtx *models.ReqContext) { - if errors.Is(err, backendplugin.ErrPluginUnavailable) { - reqCtx.JsonApiErr(503, "Plugin unavailable", err) - return - } - - if errors.Is(err, backendplugin.ErrMethodNotImplemented) { - reqCtx.JsonApiErr(404, "Not found", err) - return - } - - reqCtx.JsonApiErr(500, "Failed to call resource", err) -} - -// callResourceClientResponseStream is used for receiving resource call responses. -type callResourceClientResponseStream interface { - Recv() (*backend.CallResourceResponse, error) - Close() error -} - -type callResourceResponseStream struct { - ctx context.Context - stream chan *backend.CallResourceResponse - closed bool -} - -func newCallResourceResponseStream(ctx context.Context) *callResourceResponseStream { - return &callResourceResponseStream{ - ctx: ctx, - stream: make(chan *backend.CallResourceResponse), - } -} - -func (s *callResourceResponseStream) Send(res *backend.CallResourceResponse) error { - if s.closed { - return errors.New("cannot send to a closed stream") - } - - select { - case <-s.ctx.Done(): - return errors.New("cancelled") - case s.stream <- res: - return nil - } -} - -func (s *callResourceResponseStream) Recv() (*backend.CallResourceResponse, error) { - select { - case <-s.ctx.Done(): - return nil, s.ctx.Err() - case res, ok := <-s.stream: - if !ok { - return nil, io.EOF - } - return res, nil - } -} - -func (s *callResourceResponseStream) Close() error { - if s.closed { - return errors.New("cannot close a closed stream") - } - - close(s.stream) - s.closed = true - return nil -} diff --git a/pkg/plugins/plugincontext/plugincontext.go b/pkg/plugins/plugincontext/plugincontext.go index e06a7821d4a..f8a3ece3ac2 100644 --- a/pkg/plugins/plugincontext/plugincontext.go +++ b/pkg/plugins/plugincontext/plugincontext.go @@ -43,11 +43,34 @@ type Provider struct { // Get allows getting plugin context by its ID. If datasourceUID is not empty string // then PluginContext.DataSourceInstanceSettings will be resolved and appended to // returned context. -func (p *Provider) Get(ctx context.Context, pluginID string, datasourceUID string, user *models.SignedInUser, skipCache bool) (backend.PluginContext, bool, error) { - pc := backend.PluginContext{} +func (p *Provider) Get(ctx context.Context, pluginID string, user *models.SignedInUser) (backend.PluginContext, bool, error) { + return p.pluginContext(ctx, pluginID, user) +} + +// GetWithDataSource allows getting plugin context by its ID and PluginContext.DataSourceInstanceSettings will be +// resolved and appended to the returned context. +func (p *Provider) GetWithDataSource(ctx context.Context, pluginID string, user *models.SignedInUser, ds *models.DataSource) (backend.PluginContext, bool, error) { + pCtx, exists, err := p.pluginContext(ctx, pluginID, user) + if err != nil { + return pCtx, exists, err + } + + datasourceSettings, err := adapters.ModelToInstanceSettings(ds, p.decryptSecureJsonDataFn(ctx)) + if err != nil { + return pCtx, exists, errutil.Wrap("Failed to convert datasource", err) + } + pCtx.DataSourceInstanceSettings = datasourceSettings + + return pCtx, true, nil +} + +const pluginSettingsCacheTTL = 5 * time.Second +const pluginSettingsCachePrefix = "plugin-setting-" + +func (p *Provider) pluginContext(ctx context.Context, pluginID string, user *models.SignedInUser) (backend.PluginContext, bool, error) { plugin, exists := p.pluginStore.Plugin(ctx, pluginID) if !exists { - return pc, false, nil + return backend.PluginContext{}, false, nil } jsonData := json.RawMessage{} @@ -59,18 +82,18 @@ func (p *Provider) Get(ctx context.Context, pluginID string, datasourceUID strin // models.ErrPluginSettingNotFound is expected if there's no row found for plugin setting in database (if non-app plugin). // If it's not this expected error something is wrong with cache or database and we return the error to the client. if !errors.Is(err, models.ErrPluginSettingNotFound) { - return pc, false, errutil.Wrap("Failed to get plugin settings", err) + return backend.PluginContext{}, false, errutil.Wrap("Failed to get plugin settings", err) } } else { jsonData, err = json.Marshal(ps.JSONData) if err != nil { - return pc, false, errutil.Wrap("Failed to unmarshal plugin json data", err) + return backend.PluginContext{}, false, errutil.Wrap("Failed to unmarshal plugin json data", err) } decryptedSecureJSONData = p.pluginSettingsService.DecryptedValues(ps) updated = ps.Updated } - pCtx := backend.PluginContext{ + return backend.PluginContext{ OrgID: user.OrgId, PluginID: plugin.ID, User: adapters.BackendUserFromSignedInUser(user), @@ -79,26 +102,9 @@ func (p *Provider) Get(ctx context.Context, pluginID string, datasourceUID strin DecryptedSecureJSONData: decryptedSecureJSONData, Updated: updated, }, - } - - if datasourceUID != "" { - ds, err := p.dataSourceCache.GetDatasourceByUID(ctx, datasourceUID, user, skipCache) - if err != nil { - return pc, false, errutil.Wrap("Failed to get datasource", err) - } - datasourceSettings, err := adapters.ModelToInstanceSettings(ds, p.decryptSecureJsonDataFn(ctx)) - if err != nil { - return pc, false, errutil.Wrap("Failed to convert datasource", err) - } - pCtx.DataSourceInstanceSettings = datasourceSettings - } - - return pCtx, true, nil + }, true, nil } -const pluginSettingsCacheTTL = 5 * time.Second -const pluginSettingsCachePrefix = "plugin-setting-" - func (p *Provider) getCachedPluginSettings(ctx context.Context, pluginID string, user *models.SignedInUser) (*pluginsettings.DTO, error) { cacheKey := pluginSettingsCachePrefix + pluginID diff --git a/pkg/services/live/live.go b/pkg/services/live/live.go index 1dba33d32b3..21bce2f69ee 100644 --- a/pkg/services/live/live.go +++ b/pkg/services/live/live.go @@ -227,7 +227,7 @@ func ProvideService(plugCtxProvider *plugincontext.Provider, cfg *setting.Cfg, r } } - g.contextGetter = liveplugin.NewContextGetter(g.PluginContextProvider) + g.contextGetter = liveplugin.NewContextGetter(g.PluginContextProvider, g.DataSourceCache) pipelinedChannelLocalPublisher := liveplugin.NewChannelLocalPublisher(node, g.Pipeline) numLocalSubscribersGetter := liveplugin.NewNumLocalSubscribersGetter(node) g.runStreamManager = runstream.NewManager(pipelinedChannelLocalPublisher, numLocalSubscribersGetter, g.contextGetter) diff --git a/pkg/services/live/liveplugin/plugin.go b/pkg/services/live/liveplugin/plugin.go index cf06a67cfd6..31376e7f293 100644 --- a/pkg/services/live/liveplugin/plugin.go +++ b/pkg/services/live/liveplugin/plugin.go @@ -6,8 +6,10 @@ import ( "github.com/grafana/grafana/pkg/models" "github.com/grafana/grafana/pkg/plugins/plugincontext" + "github.com/grafana/grafana/pkg/services/datasources" "github.com/grafana/grafana/pkg/services/live/orgchannel" "github.com/grafana/grafana/pkg/services/live/pipeline" + "github.com/grafana/grafana/pkg/util/errutil" "github.com/centrifugal/centrifuge" "github.com/grafana/grafana-plugin-sdk-go/backend" @@ -60,15 +62,25 @@ func (p *NumLocalSubscribersGetter) GetNumLocalSubscribers(channelID string) (in } type ContextGetter struct { - PluginContextProvider *plugincontext.Provider + pluginContextProvider *plugincontext.Provider + dataSourceCache datasources.CacheService } -func NewContextGetter(pluginContextProvider *plugincontext.Provider) *ContextGetter { +func NewContextGetter(pluginContextProvider *plugincontext.Provider, dataSourceCache datasources.CacheService) *ContextGetter { return &ContextGetter{ - PluginContextProvider: pluginContextProvider, + pluginContextProvider: pluginContextProvider, + dataSourceCache: dataSourceCache, } } func (g *ContextGetter) GetPluginContext(ctx context.Context, user *models.SignedInUser, pluginID string, datasourceUID string, skipCache bool) (backend.PluginContext, bool, error) { - return g.PluginContextProvider.Get(ctx, pluginID, datasourceUID, user, skipCache) + if datasourceUID == "" { + return g.pluginContextProvider.Get(ctx, pluginID, user) + } + + ds, err := g.dataSourceCache.GetDatasourceByUID(ctx, datasourceUID, user, skipCache) + if err != nil { + return backend.PluginContext{}, false, errutil.Wrap("Failed to get datasource", err) + } + return g.pluginContextProvider.GetWithDataSource(ctx, pluginID, user, ds) }