diff --git a/pkg/api/datasources.go b/pkg/api/datasources.go index 610d9eba14b..cd9f8c5f871 100644 --- a/pkg/api/datasources.go +++ b/pkg/api/datasources.go @@ -395,18 +395,7 @@ func (hs *HTTPServer) CallDatasourceResource(c *models.ReqContext) { return } - dsInstanceSettings, err := adapters.ModelToInstanceSettings(ds, hs.decryptSecureJsonDataFn()) - if err != nil { - c.JsonApiErr(500, "Unable to process datasource instance model", err) - } - - pCtx := backend.PluginContext{ - User: adapters.BackendUserFromSignedInUser(c.SignedInUser), - OrgID: c.OrgId, - PluginID: plugin.ID, - DataSourceInstanceSettings: dsInstanceSettings, - } - hs.pluginClient.CallResource(pCtx, c, web.Params(c.Req)["*"]) + hs.callPluginResource(c, plugin.ID, ds.Uid) } func convertModelToDtos(ds *models.DataSource) dtos.DataSource { diff --git a/pkg/api/plugins.go b/pkg/api/plugins.go index 7cf31dedd40..b5161f80a13 100644 --- a/pkg/api/plugins.go +++ b/pkg/api/plugins.go @@ -5,12 +5,15 @@ import ( "encoding/json" "errors" "fmt" + "io" "io/ioutil" "net/http" + "net/url" "os" "path/filepath" "sort" "strings" + "sync" "github.com/grafana/grafana-plugin-sdk-go/backend" "github.com/grafana/grafana/pkg/api/dtos" @@ -22,6 +25,8 @@ import ( "github.com/grafana/grafana/pkg/plugins/backendplugin" "github.com/grafana/grafana/pkg/plugins/manager/installer" "github.com/grafana/grafana/pkg/setting" + "github.com/grafana/grafana/pkg/util/errutil" + "github.com/grafana/grafana/pkg/util/proxyutil" "github.com/grafana/grafana/pkg/web" ) @@ -390,18 +395,7 @@ func (hs *HTTPServer) CheckHealth(c *models.ReqContext) response.Response { // // /api/plugins/:pluginId/resources/* func (hs *HTTPServer) CallResource(c *models.ReqContext) { - pluginID := web.Params(c.Req)[":pluginId"] - - pCtx, found, err := hs.PluginContextProvider.Get(c.Req.Context(), pluginID, "", c.SignedInUser, false) - if err != nil { - c.JsonApiErr(500, "Failed to get plugin settings", err) - return - } - if !found { - c.JsonApiErr(404, "Plugin not found", nil) - return - } - hs.pluginClient.CallResource(pCtx, c, web.Params(c.Req)["*"]) + hs.callPluginResource(c, web.Params(c.Req)[":pluginId"], "") } func (hs *HTTPServer) GetPluginErrorsList(_ *models.ReqContext) response.Response { @@ -522,3 +516,218 @@ func (hs *HTTPServer) pluginMarkdown(ctx context.Context, pluginId string, name func mdFilepath(mdFilename string) string { return filepath.Clean(filepath.Join("/", fmt.Sprintf("%s.md", mdFilename))) } + +func (hs *HTTPServer) callPluginResource(c *models.ReqContext, pluginID, dsUID string) { + pCtx, found, err := hs.PluginContextProvider.Get(c.Req.Context(), pluginID, dsUID, c.SignedInUser, false) + if err != nil { + c.JsonApiErr(500, "Failed to get plugin settings", err) + return + } + if !found { + c.JsonApiErr(404, "Plugin not found", nil) + return + } + + var dsURL string + if pCtx.DataSourceInstanceSettings != nil { + dsURL = pCtx.DataSourceInstanceSettings.URL + } + + err = hs.PluginRequestValidator.Validate(dsURL, c.Req) + if err != nil { + c.JsonApiErr(http.StatusForbidden, "Access denied", err) + return + } + + clonedReq := c.Req.Clone(c.Req.Context()) + rawURL := web.Params(c.Req)["*"] + if clonedReq.URL.RawQuery != "" { + rawURL += "?" + clonedReq.URL.RawQuery + } + urlPath, err := url.Parse(rawURL) + if err != nil { + handleCallResourceError(err, c) + return + } + clonedReq.URL = urlPath + + if err = hs.makePluginResourceRequest(c.Resp, clonedReq, pCtx); err != nil { + handleCallResourceError(err, c) + } +} + +func (hs *HTTPServer) makePluginResourceRequest(w http.ResponseWriter, req *http.Request, pCtx backend.PluginContext) error { + keepCookieModel := struct { + KeepCookies []string `json:"keepCookies"` + }{} + if dis := pCtx.DataSourceInstanceSettings; dis != nil { + err := json.Unmarshal(dis.JSONData, &keepCookieModel) + if err != nil { + hs.log.Warn("failed to to unpack JSONData in datasource instance settings", "err", err) + } + } + proxyutil.ClearCookieHeader(req, keepCookieModel.KeepCookies) + proxyutil.PrepareProxyRequest(req) + + body, err := ioutil.ReadAll(req.Body) + if err != nil { + return fmt.Errorf("failed to read request body: %w", err) + } + + crReq := &backend.CallResourceRequest{ + PluginContext: pCtx, + Path: req.URL.Path, + Method: req.Method, + URL: req.URL.String(), + Headers: req.Header, + Body: body, + } + + childCtx, cancel := context.WithCancel(req.Context()) + defer cancel() + stream := newCallResourceResponseStream(childCtx) + + var wg sync.WaitGroup + wg.Add(1) + + defer func() { + if err := stream.Close(); err != nil { + hs.log.Warn("Failed to close plugin resource stream", "err", err) + } + wg.Wait() + }() + + var flushStreamErr error + go func() { + flushStreamErr = hs.flushStream(stream, w) + wg.Done() + }() + + if err := hs.pluginClient.CallResource(req.Context(), crReq, stream); err != nil { + return err + } + + return flushStreamErr +} + +func (hs *HTTPServer) flushStream(stream callResourceClientResponseStream, w http.ResponseWriter) error { + processedStreams := 0 + + for { + resp, err := stream.Recv() + if errors.Is(err, io.EOF) { + if processedStreams == 0 { + return errors.New("received empty resource response") + } + return nil + } + if err != nil { + if processedStreams == 0 { + return errutil.Wrap("failed to receive response from resource call", err) + } + + hs.log.Error("Failed to receive response from resource call", "err", err) + return stream.Close() + } + + // Expected that headers and status are only part of first stream + if processedStreams == 0 && resp.Headers != nil { + // Make sure a content type always is returned in response + if _, exists := resp.Headers["Content-Type"]; !exists { + resp.Headers["Content-Type"] = []string{"application/json"} + } + + for k, values := range resp.Headers { + // Due to security reasons we don't want to forward + // cookies from a backend plugin to clients/browsers. + if k == "Set-Cookie" { + continue + } + + for _, v := range values { + // TODO: Figure out if we should use Set here instead + // nolint:gocritic + w.Header().Add(k, v) + } + } + w.WriteHeader(resp.Status) + } + + if _, err := w.Write(resp.Body); err != nil { + hs.log.Error("Failed to write resource response", "err", err) + } + + if flusher, ok := w.(http.Flusher); ok { + flusher.Flush() + } + processedStreams++ + } +} + +func handleCallResourceError(err error, reqCtx *models.ReqContext) { + if errors.Is(err, backendplugin.ErrPluginUnavailable) { + reqCtx.JsonApiErr(503, "Plugin unavailable", err) + return + } + + if errors.Is(err, backendplugin.ErrMethodNotImplemented) { + reqCtx.JsonApiErr(404, "Not found", err) + return + } + + reqCtx.JsonApiErr(500, "Failed to call resource", err) +} + +// callResourceClientResponseStream is used for receiving resource call responses. +type callResourceClientResponseStream interface { + Recv() (*backend.CallResourceResponse, error) + Close() error +} + +type callResourceResponseStream struct { + ctx context.Context + stream chan *backend.CallResourceResponse + closed bool +} + +func newCallResourceResponseStream(ctx context.Context) *callResourceResponseStream { + return &callResourceResponseStream{ + ctx: ctx, + stream: make(chan *backend.CallResourceResponse), + } +} + +func (s *callResourceResponseStream) Send(res *backend.CallResourceResponse) error { + if s.closed { + return errors.New("cannot send to a closed stream") + } + + select { + case <-s.ctx.Done(): + return errors.New("cancelled") + case s.stream <- res: + return nil + } +} + +func (s *callResourceResponseStream) Recv() (*backend.CallResourceResponse, error) { + select { + case <-s.ctx.Done(): + return nil, s.ctx.Err() + case res, ok := <-s.stream: + if !ok { + return nil, io.EOF + } + return res, nil + } +} + +func (s *callResourceResponseStream) Close() error { + if s.closed { + return errors.New("cannot close a closed stream") + } + + close(s.stream) + s.closed = true + return nil +} diff --git a/pkg/plugins/ifaces.go b/pkg/plugins/ifaces.go index 04ecd0757d3..c267d1a5666 100644 --- a/pkg/plugins/ifaces.go +++ b/pkg/plugins/ifaces.go @@ -55,9 +55,8 @@ type Client interface { backend.QueryDataHandler backend.CheckHealthHandler backend.StreamHandler + backend.CallResourceHandler - // CallResource calls a plugin resource. - CallResource(pCtx backend.PluginContext, ctx *models.ReqContext, path string) // CollectMetrics collects metrics from a plugin. CollectMetrics(ctx context.Context, pluginID string) (*backend.CollectMetricsResult, error) } diff --git a/pkg/plugins/manager/manager.go b/pkg/plugins/manager/manager.go index 991b7474362..22687c0323d 100644 --- a/pkg/plugins/manager/manager.go +++ b/pkg/plugins/manager/manager.go @@ -2,13 +2,9 @@ package manager import ( "context" - "encoding/json" "errors" "fmt" - "io" - "io/ioutil" "net/http" - "net/url" "os" "path/filepath" "sync" @@ -26,7 +22,6 @@ import ( "github.com/grafana/grafana/pkg/services/sqlstore" "github.com/grafana/grafana/pkg/setting" "github.com/grafana/grafana/pkg/util/errutil" - "github.com/grafana/grafana/pkg/util/proxyutil" ) const ( @@ -249,161 +244,24 @@ func (m *PluginManager) QueryData(ctx context.Context, req *backend.QueryDataReq return resp, err } -func (m *PluginManager) CallResource(pCtx backend.PluginContext, reqCtx *models.ReqContext, path string) { - var dsURL string - if pCtx.DataSourceInstanceSettings != nil { - dsURL = pCtx.DataSourceInstanceSettings.URL - } - - err := m.requestValidator.Validate(dsURL, reqCtx.Req) - if err != nil { - reqCtx.JsonApiErr(http.StatusForbidden, "Access denied", err) - return - } - - clonedReq := reqCtx.Req.Clone(reqCtx.Req.Context()) - rawURL := path - if clonedReq.URL.RawQuery != "" { - rawURL += "?" + clonedReq.URL.RawQuery - } - urlPath, err := url.Parse(rawURL) - if err != nil { - handleCallResourceError(err, reqCtx) - return - } - clonedReq.URL = urlPath - err = m.callResourceInternal(reqCtx.Resp, clonedReq, pCtx) - if err != nil { - handleCallResourceError(err, reqCtx) - } -} - -func (m *PluginManager) callResourceInternal(w http.ResponseWriter, req *http.Request, pCtx backend.PluginContext) error { - p, exists := m.plugin(pCtx.PluginID) +func (m *PluginManager) CallResource(ctx context.Context, req *backend.CallResourceRequest, sender backend.CallResourceResponseSender) error { + p, exists := m.plugin(req.PluginContext.PluginID) if !exists { return backendplugin.ErrPluginNotRegistered } - keepCookieModel := keepCookiesJSONModel{} - if dis := pCtx.DataSourceInstanceSettings; dis != nil { - err := json.Unmarshal(dis.JSONData, &keepCookieModel) - if err != nil { - p.Logger().Error("Failed to to unpack JSONData in datasource instance settings", "err", err) - } - } - - proxyutil.ClearCookieHeader(req, keepCookieModel.KeepCookies) - proxyutil.PrepareProxyRequest(req) - - body, err := ioutil.ReadAll(req.Body) - if err != nil { - return fmt.Errorf("failed to read request body: %w", err) - } - - crReq := &backend.CallResourceRequest{ - PluginContext: pCtx, - Path: req.URL.Path, - Method: req.Method, - URL: req.URL.String(), - Headers: req.Header, - Body: body, - } - - return instrumentation.InstrumentCallResourceRequest(p.PluginID(), func() error { - childCtx, cancel := context.WithCancel(req.Context()) - defer cancel() - stream := newCallResourceResponseStream(childCtx) - - var wg sync.WaitGroup - wg.Add(1) - - defer func() { - if err := stream.Close(); err != nil { - m.log.Warn("Failed to close stream", "err", err) - } - wg.Wait() - }() - - var flushStreamErr error - go func() { - flushStreamErr = flushStream(p, stream, w) - wg.Done() - }() - - if err := p.CallResource(req.Context(), crReq, stream); err != nil { + err := instrumentation.InstrumentCallResourceRequest(p.PluginID(), func() error { + if err := p.CallResource(ctx, req, sender); err != nil { return err } - - return flushStreamErr + return nil }) -} -func handleCallResourceError(err error, reqCtx *models.ReqContext) { - if errors.Is(err, backendplugin.ErrPluginUnavailable) { - reqCtx.JsonApiErr(503, "Plugin unavailable", err) - return + if err != nil { + return err } - if errors.Is(err, backendplugin.ErrMethodNotImplemented) { - reqCtx.JsonApiErr(404, "Not found", err) - return - } - - reqCtx.JsonApiErr(500, "Failed to call resource", err) -} - -func flushStream(plugin backendplugin.Plugin, stream callResourceClientResponseStream, w http.ResponseWriter) error { - processedStreams := 0 - - for { - resp, err := stream.Recv() - if errors.Is(err, io.EOF) { - if processedStreams == 0 { - return errors.New("received empty resource response") - } - return nil - } - if err != nil { - if processedStreams == 0 { - return errutil.Wrap("failed to receive response from resource call", err) - } - - plugin.Logger().Error("Failed to receive response from resource call", "err", err) - return stream.Close() - } - - // Expected that headers and status are only part of first stream - if processedStreams == 0 && resp.Headers != nil { - // Make sure a content type always is returned in response - if _, exists := resp.Headers["Content-Type"]; !exists { - resp.Headers["Content-Type"] = []string{"application/json"} - } - - for k, values := range resp.Headers { - // Due to security reasons we don't want to forward - // cookies from a backend plugin to clients/browsers. - if k == "Set-Cookie" { - continue - } - - for _, v := range values { - // TODO: Figure out if we should use Set here instead - // nolint:gocritic - w.Header().Add(k, v) - } - } - w.WriteHeader(resp.Status) - } - - if _, err := w.Write(resp.Body); err != nil { - plugin.Logger().Error("Failed to write resource response", "err", err) - } - - if flusher, ok := w.(http.Flusher); ok { - flusher.Flush() - } - processedStreams++ - } + return nil } func (m *PluginManager) CollectMetrics(ctx context.Context, pluginID string) (*backend.CollectMetricsResult, error) { @@ -694,61 +552,3 @@ func (m *PluginManager) pluginSettingPaths() []string { return pluginSettingDirs } - -// callResourceClientResponseStream is used for receiving resource call responses. -type callResourceClientResponseStream interface { - Recv() (*backend.CallResourceResponse, error) - Close() error -} - -type keepCookiesJSONModel struct { - KeepCookies []string `json:"keepCookies"` -} - -type callResourceResponseStream struct { - ctx context.Context - stream chan *backend.CallResourceResponse - closed bool -} - -func newCallResourceResponseStream(ctx context.Context) *callResourceResponseStream { - return &callResourceResponseStream{ - ctx: ctx, - stream: make(chan *backend.CallResourceResponse), - } -} - -func (s *callResourceResponseStream) Send(res *backend.CallResourceResponse) error { - if s.closed { - return errors.New("cannot send to a closed stream") - } - - select { - case <-s.ctx.Done(): - return errors.New("cancelled") - case s.stream <- res: - return nil - } -} - -func (s *callResourceResponseStream) Recv() (*backend.CallResourceResponse, error) { - select { - case <-s.ctx.Done(): - return nil, s.ctx.Err() - case res, ok := <-s.stream: - if !ok { - return nil, io.EOF - } - return res, nil - } -} - -func (s *callResourceResponseStream) Close() error { - if s.closed { - return errors.New("cannot close a closed stream") - } - - close(s.stream) - s.closed = true - return nil -} diff --git a/pkg/plugins/manager/manager_test.go b/pkg/plugins/manager/manager_test.go index f996bc45a9f..51a9c9d35ee 100644 --- a/pkg/plugins/manager/manager_test.go +++ b/pkg/plugins/manager/manager_test.go @@ -1,10 +1,8 @@ package manager import ( - "bytes" "context" "net/http" - "net/http/httptest" "os" "path/filepath" "sync" @@ -399,14 +397,6 @@ func TestPluginManager_lifecycle_managed(t *testing.T) { _, err = ctx.manager.CheckHealth(context.Background(), &backend.CheckHealthRequest{PluginContext: backend.PluginContext{PluginID: testPluginID}}) require.Equal(t, backendplugin.ErrMethodNotImplemented, err) }) - - t.Run("Call resource should return method not implemented error", func(t *testing.T) { - req, err := http.NewRequest(http.MethodGet, "/test", bytes.NewReader([]byte{})) - require.NoError(t, err) - w := httptest.NewRecorder() - err = ctx.manager.callResourceInternal(w, req, backend.PluginContext{PluginID: testPluginID}) - require.Equal(t, backendplugin.ErrMethodNotImplemented, err) - }) }) t.Run("Implemented handlers", func(t *testing.T) { @@ -451,12 +441,11 @@ func TestPluginManager_lifecycle_managed(t *testing.T) { }) } - req, err := http.NewRequest(http.MethodGet, "/test", bytes.NewReader([]byte{})) + sender := &fakeSender{} + err = ctx.manager.CallResource(context.Background(), &backend.CallResourceRequest{PluginContext: backend.PluginContext{PluginID: testPluginID}}, sender) require.NoError(t, err) - w := httptest.NewRecorder() - err = ctx.manager.callResourceInternal(w, req, backend.PluginContext{PluginID: testPluginID}) - require.NoError(t, err) - require.Equal(t, http.StatusOK, w.Code) + require.NotNil(t, sender.resp) + require.Equal(t, http.StatusOK, sender.resp.Status) }) }) }) @@ -604,17 +593,17 @@ type fakePluginInstaller struct { uninstallCount int } -func (f *fakePluginInstaller) Install(ctx context.Context, pluginID, version, pluginsDir, pluginZipURL, pluginRepoURL string) error { +func (f *fakePluginInstaller) Install(_ context.Context, _, _, _, _, _ string) error { f.installCount++ return nil } -func (f *fakePluginInstaller) Uninstall(ctx context.Context, pluginPath string) error { +func (f *fakePluginInstaller) Uninstall(_ context.Context, _ string) error { f.uninstallCount++ return nil } -func (f *fakePluginInstaller) GetUpdateInfo(ctx context.Context, pluginID, version, pluginRepoURL string) (plugins.UpdateInfo, error) { +func (f *fakePluginInstaller) GetUpdateInfo(_ context.Context, _, _, _ string) (plugins.UpdateInfo, error) { return plugins.UpdateInfo{}, nil } @@ -627,13 +616,13 @@ type fakeLoader struct { plugins.Loader } -func (l *fakeLoader) Load(paths []string, ignore map[string]struct{}) ([]*plugins.Plugin, error) { +func (l *fakeLoader) Load(paths []string, _ map[string]struct{}) ([]*plugins.Plugin, error) { l.loadedPaths = append(l.loadedPaths, paths...) return l.mockedLoadedPlugins, nil } -func (l *fakeLoader) LoadWithFactory(path string, factory backendplugin.PluginFactoryFunc) (*plugins.Plugin, error) { +func (l *fakeLoader) LoadWithFactory(path string, _ backendplugin.PluginFactoryFunc) (*plugins.Plugin, error) { l.loadedPaths = append(l.loadedPaths, path) return l.mockedFactoryLoadedPlugin, nil @@ -656,102 +645,102 @@ type fakePluginClient struct { backendplugin.Plugin } -func (tp *fakePluginClient) PluginID() string { - return tp.pluginID +func (pc *fakePluginClient) PluginID() string { + return pc.pluginID } -func (tp *fakePluginClient) Logger() log.Logger { - return tp.logger +func (pc *fakePluginClient) Logger() log.Logger { + return pc.logger } -func (tp *fakePluginClient) Start(ctx context.Context) error { - tp.mutex.Lock() - defer tp.mutex.Unlock() - tp.exited = false - tp.startCount++ +func (pc *fakePluginClient) Start(_ context.Context) error { + pc.mutex.Lock() + defer pc.mutex.Unlock() + pc.exited = false + pc.startCount++ return nil } -func (tp *fakePluginClient) Stop(ctx context.Context) error { - tp.mutex.Lock() - defer tp.mutex.Unlock() - tp.stopCount++ - tp.exited = true +func (pc *fakePluginClient) Stop(_ context.Context) error { + pc.mutex.Lock() + defer pc.mutex.Unlock() + pc.stopCount++ + pc.exited = true return nil } -func (tp *fakePluginClient) IsManaged() bool { - return tp.managed +func (pc *fakePluginClient) IsManaged() bool { + return pc.managed } -func (tp *fakePluginClient) Exited() bool { - tp.mutex.RLock() - defer tp.mutex.RUnlock() - return tp.exited +func (pc *fakePluginClient) Exited() bool { + pc.mutex.RLock() + defer pc.mutex.RUnlock() + return pc.exited } -func (tp *fakePluginClient) Decommission() error { - tp.mutex.Lock() - defer tp.mutex.Unlock() +func (pc *fakePluginClient) Decommission() error { + pc.mutex.Lock() + defer pc.mutex.Unlock() - tp.decommissioned = true + pc.decommissioned = true return nil } -func (tp *fakePluginClient) IsDecommissioned() bool { - tp.mutex.RLock() - defer tp.mutex.RUnlock() - return tp.decommissioned +func (pc *fakePluginClient) IsDecommissioned() bool { + pc.mutex.RLock() + defer pc.mutex.RUnlock() + return pc.decommissioned } -func (tp *fakePluginClient) kill() { - tp.mutex.Lock() - defer tp.mutex.Unlock() - tp.exited = true +func (pc *fakePluginClient) kill() { + pc.mutex.Lock() + defer pc.mutex.Unlock() + pc.exited = true } -func (tp *fakePluginClient) CollectMetrics(ctx context.Context) (*backend.CollectMetricsResult, error) { - if tp.CollectMetricsHandlerFunc != nil { - return tp.CollectMetricsHandlerFunc(ctx) +func (pc *fakePluginClient) CollectMetrics(ctx context.Context) (*backend.CollectMetricsResult, error) { + if pc.CollectMetricsHandlerFunc != nil { + return pc.CollectMetricsHandlerFunc(ctx) } return nil, backendplugin.ErrMethodNotImplemented } -func (tp *fakePluginClient) CheckHealth(ctx context.Context, req *backend.CheckHealthRequest) (*backend.CheckHealthResult, error) { - if tp.CheckHealthHandlerFunc != nil { - return tp.CheckHealthHandlerFunc(ctx, req) +func (pc *fakePluginClient) CheckHealth(ctx context.Context, req *backend.CheckHealthRequest) (*backend.CheckHealthResult, error) { + if pc.CheckHealthHandlerFunc != nil { + return pc.CheckHealthHandlerFunc(ctx, req) } return nil, backendplugin.ErrMethodNotImplemented } -func (tp *fakePluginClient) QueryData(ctx context.Context, req *backend.QueryDataRequest) (*backend.QueryDataResponse, error) { - if tp.QueryDataHandlerFunc != nil { - return tp.QueryDataHandlerFunc(ctx, req) +func (pc *fakePluginClient) QueryData(ctx context.Context, req *backend.QueryDataRequest) (*backend.QueryDataResponse, error) { + if pc.QueryDataHandlerFunc != nil { + return pc.QueryDataHandlerFunc(ctx, req) } return nil, backendplugin.ErrMethodNotImplemented } -func (tp *fakePluginClient) CallResource(ctx context.Context, req *backend.CallResourceRequest, sender backend.CallResourceResponseSender) error { - if tp.CallResourceHandlerFunc != nil { - return tp.CallResourceHandlerFunc(ctx, req, sender) +func (pc *fakePluginClient) CallResource(ctx context.Context, req *backend.CallResourceRequest, sender backend.CallResourceResponseSender) error { + if pc.CallResourceHandlerFunc != nil { + return pc.CallResourceHandlerFunc(ctx, req, sender) } return backendplugin.ErrMethodNotImplemented } -func (tp *fakePluginClient) SubscribeStream(ctx context.Context, request *backend.SubscribeStreamRequest) (*backend.SubscribeStreamResponse, error) { +func (pc *fakePluginClient) SubscribeStream(_ context.Context, _ *backend.SubscribeStreamRequest) (*backend.SubscribeStreamResponse, error) { return nil, backendplugin.ErrMethodNotImplemented } -func (tp *fakePluginClient) PublishStream(ctx context.Context, request *backend.PublishStreamRequest) (*backend.PublishStreamResponse, error) { +func (pc *fakePluginClient) PublishStream(_ context.Context, _ *backend.PublishStreamRequest) (*backend.PublishStreamResponse, error) { return nil, backendplugin.ErrMethodNotImplemented } -func (tp *fakePluginClient) RunStream(ctx context.Context, request *backend.RunStreamRequest, sender *backend.StreamSender) error { +func (pc *fakePluginClient) RunStream(_ context.Context, _ *backend.RunStreamRequest, _ *backend.StreamSender) error { return backendplugin.ErrMethodNotImplemented } @@ -765,10 +754,20 @@ type fakeLogger struct { log.Logger } -func (tl fakeLogger) Info(msg string, ctx ...interface{}) { +func (l fakeLogger) Info(_ string, _ ...interface{}) { } -func (tl fakeLogger) Debug(msg string, ctx ...interface{}) { +func (l fakeLogger) Debug(_ string, _ ...interface{}) { } + +type fakeSender struct { + resp *backend.CallResourceResponse +} + +func (s *fakeSender) Send(crr *backend.CallResourceResponse) error { + s.resp = crr + + return nil +}