Plugins: Refactor call resource API handling (#67234)

Moving call resource API stream handling within plugin management as a utility/wrapper. 

Closes #66889

Co-authored-by: Will Browne <wbrowne@users.noreply.github.com>
This commit is contained in:
Marcus Efraimsson 2023-04-28 14:02:27 +02:00 committed by GitHub
parent b5fbce50b3
commit 4cbda914bd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 252 additions and 169 deletions

View File

@ -1,18 +1,16 @@
package api
import (
"context"
"errors"
"fmt"
"io"
"net/http"
"net/textproto"
"net/url"
"sync"
"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana/pkg/plugins/backendplugin"
"github.com/grafana/grafana/pkg/plugins/httpresponsesender"
contextmodel "github.com/grafana/grafana/pkg/services/contexthandler/model"
"github.com/grafana/grafana/pkg/services/datasources"
"github.com/grafana/grafana/pkg/util/proxyutil"
@ -113,80 +111,8 @@ func (hs *HTTPServer) makePluginResourceRequest(w http.ResponseWriter, req *http
Body: body,
}
childCtx, cancel := context.WithCancel(req.Context())
defer cancel()
stream := newCallResourceResponseStream(childCtx)
var wg sync.WaitGroup
wg.Add(1)
defer func() {
if err := stream.Close(); err != nil {
hs.log.Warn("Failed to close plugin resource stream", "err", err)
}
wg.Wait()
}()
var flushStreamErr error
go func() {
flushStreamErr = hs.flushStream(stream, w)
wg.Done()
}()
if err := hs.pluginClient.CallResource(req.Context(), crReq, stream); err != nil {
return err
}
return flushStreamErr
}
func (hs *HTTPServer) flushStream(stream callResourceClientResponseStream, w http.ResponseWriter) error {
processedStreams := 0
for {
resp, err := stream.Recv()
if errors.Is(err, io.EOF) {
if processedStreams == 0 {
return errors.New("received empty resource response")
}
return nil
}
if err != nil {
if processedStreams == 0 {
return fmt.Errorf("%v: %w", "failed to receive response from resource call", err)
}
hs.log.Error("Failed to receive response from resource call", "err", err)
return stream.Close()
}
// Expected that headers and status are only part of first stream
if processedStreams == 0 {
for k, values := range resp.Headers {
// Convert the keys to the canonical format of MIME headers.
// This ensures that we can safely add/overwrite headers
// even if the plugin returns them in non-canonical format
// and be sure they won't be present multiple times in the response.
k = textproto.CanonicalMIMEHeaderKey(k)
for _, v := range values {
// TODO: Figure out if we should use Set here instead
// nolint:gocritic
w.Header().Add(k, v)
}
}
w.WriteHeader(resp.Status)
}
if _, err := w.Write(resp.Body); err != nil {
hs.log.Error("Failed to write resource response", "err", err)
}
if flusher, ok := w.(http.Flusher); ok {
flusher.Flush()
}
processedStreams++
}
httpSender := httpresponsesender.New(w)
return hs.pluginClient.CallResource(req.Context(), crReq, httpSender)
}
func handleCallResourceError(err error, reqCtx *contextmodel.ReqContext) {
@ -202,57 +128,3 @@ func handleCallResourceError(err error, reqCtx *contextmodel.ReqContext) {
reqCtx.JsonApiErr(500, "Failed to call resource", err)
}
// callResourceClientResponseStream is used for receiving resource call responses.
type callResourceClientResponseStream interface {
Recv() (*backend.CallResourceResponse, error)
Close() error
}
type callResourceResponseStream struct {
ctx context.Context
stream chan *backend.CallResourceResponse
closed bool
}
func newCallResourceResponseStream(ctx context.Context) *callResourceResponseStream {
return &callResourceResponseStream{
ctx: ctx,
stream: make(chan *backend.CallResourceResponse),
}
}
func (s *callResourceResponseStream) Send(res *backend.CallResourceResponse) error {
if s.closed {
return errors.New("cannot send to a closed stream")
}
select {
case <-s.ctx.Done():
return errors.New("cancelled")
case s.stream <- res:
return nil
}
}
func (s *callResourceResponseStream) Recv() (*backend.CallResourceResponse, error) {
select {
case <-s.ctx.Done():
return nil, s.ctx.Err()
case res, ok := <-s.stream:
if !ok {
return nil, io.EOF
}
return res, nil
}
}
func (s *callResourceResponseStream) Close() error {
if s.closed {
return errors.New("cannot close a closed stream")
}
close(s.stream)
s.closed = true
return nil
}

View File

@ -15,6 +15,7 @@ import (
"github.com/grafana/grafana/pkg/infra/db"
"github.com/grafana/grafana/pkg/infra/localcache"
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/infra/tracing"
"github.com/grafana/grafana/pkg/plugins/backendplugin/coreplugin"
"github.com/grafana/grafana/pkg/plugins/backendplugin/provider"
@ -79,6 +80,7 @@ func TestCallResource(t *testing.T) {
hs.QuotaService = quotatest.New(false, nil)
hs.pluginStore = ps
hs.pluginClient = pluginClient.ProvideService(reg, pCfg)
hs.log = log.New("test")
})
t.Run("Test successful response is received for valid request", func(t *testing.T) {
@ -117,6 +119,7 @@ func TestCallResource(t *testing.T) {
hs.QuotaService = quotatest.New(false, nil)
hs.pluginStore = ps
hs.pluginClient = pc
hs.log = log.New("test")
})
t.Run("Test error is properly propagated to API response", func(t *testing.T) {

View File

@ -0,0 +1,65 @@
package httpresponsesender
import (
"errors"
"fmt"
"net/http"
"net/textproto"
"github.com/grafana/grafana-plugin-sdk-go/backend"
)
// HTTPResponseSender implements backend.CallResourceResponseSender and
// writes an HTTP response using an http.ResponseWriter given received
// backend.CallResourceResponse(s).
type HTTPResponseSender struct {
processedStreams int
w http.ResponseWriter
}
// New creates a new HTTPResponseSender.
func New(w http.ResponseWriter) *HTTPResponseSender {
if w == nil {
panic("response writer cannot be nil")
}
return &HTTPResponseSender{
w: w,
}
}
func (s *HTTPResponseSender) Send(resp *backend.CallResourceResponse) error {
if resp == nil {
return errors.New("resp cannot be nil")
}
// Expected that headers and status are only part of first stream
if s.processedStreams == 0 {
for k, values := range resp.Headers {
// Convert the keys to the canonical format of MIME headers.
// This ensures that we can safely add/overwrite headers
// even if the plugin returns them in non-canonical format
// and be sure they won't be present multiple times in the response.
k = textproto.CanonicalMIMEHeaderKey(k)
for _, v := range values {
s.w.Header().Add(k, v)
}
}
s.w.WriteHeader(resp.Status)
}
if _, err := s.w.Write(resp.Body); err != nil {
return fmt.Errorf("failed to write resource response: %v", err)
}
if flusher, ok := s.w.(http.Flusher); ok {
flusher.Flush()
}
s.processedStreams++
return nil
}
var _ backend.CallResourceResponseSender = &HTTPResponseSender{}

View File

@ -0,0 +1,45 @@
package httpresponsesender
import (
"io"
"net/http"
"net/http/httptest"
"testing"
"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/stretchr/testify/require"
)
func TestHTTPResponseSender(t *testing.T) {
w := httptest.NewRecorder()
sender := New(w)
require.NotNil(t, sender)
headers := http.Header{}
headers.Add("X-Custom", "custom")
err := sender.Send(&backend.CallResourceResponse{
Status: http.StatusOK,
Headers: headers,
Body: []byte("Hello world"),
})
require.NoError(t, err)
headers2 := http.Header{}
headers2.Add("X-Custom-Two", "custom two")
err = sender.Send(&backend.CallResourceResponse{
Status: http.StatusNotFound,
Headers: headers2,
Body: []byte("Hello world again"),
})
require.NoError(t, err)
resp := w.Result()
require.NotNil(t, resp)
require.Equal(t, http.StatusOK, resp.StatusCode)
require.Equal(t, "custom", resp.Header.Get("X-Custom"))
require.Empty(t, resp.Header.Get("X-Custom-Two"))
bytes, err := io.ReadAll(resp.Body)
require.NoError(t, resp.Body.Close())
require.NoError(t, err)
require.Equal(t, "Hello worldHello world again", string(bytes))
}

View File

@ -34,6 +34,10 @@ func (m *ResourceResponseMiddleware) CallResource(ctx context.Context, req *back
processedStreams := 0
wrappedSender := callResourceResponseSenderFunc(func(res *backend.CallResourceResponse) error {
if processedStreams == 0 {
if res.Headers == nil {
res.Headers = map[string][]string{}
}
proxyutil.SetProxyResponseHeaders(res.Headers)
}

View File

@ -66,10 +66,15 @@ func TestIntegrationBackendPlugins(t *testing.T) {
verify(pReq)
})
tsCtx.runCallResourceTest(t, func(pReq *backend.CallResourceRequest) {
tsCtx.runCallResourceTest(t, func(pReq *backend.CallResourceRequest, resp *http.Response) {
verify(pReq)
require.Equal(t, "custom", pReq.GetHTTPHeader("X-Custom"))
require.Equal(t, "custom", tsCtx.outgoingRequest.Header.Get("X-Custom"))
require.Equal(t, "should not be deleted", resp.Header.Get("X-Custom"))
require.Equal(t, http.StatusOK, resp.StatusCode)
// default content type set if not provided
require.Equal(t, "application/json", resp.Header.Get("Content-Type"))
})
verifyQueryData := func(pReq *backend.QueryDataRequest) {
@ -119,8 +124,14 @@ func TestIntegrationBackendPlugins(t *testing.T) {
verify(pReq)
})
tsCtx.runCallResourceTest(t, func(pReq *backend.CallResourceRequest) {
tsCtx.runCallResourceTest(t, func(pReq *backend.CallResourceRequest, resp *http.Response) {
verify(pReq)
require.Equal(t, "should not be deleted", resp.Header.Get("X-Custom"))
require.Equal(t, http.StatusOK, resp.StatusCode)
// default content type set if not provided
require.Equal(t, "application/json", resp.Header.Get("Content-Type"))
})
verifyQueryData := func(pReq *backend.QueryDataRequest) {
@ -161,8 +172,14 @@ func TestIntegrationBackendPlugins(t *testing.T) {
verify(pReq)
})
tsCtx.runCallResourceTest(t, func(pReq *backend.CallResourceRequest) {
tsCtx.runCallResourceTest(t, func(pReq *backend.CallResourceRequest, resp *http.Response) {
verify(pReq)
require.Equal(t, "should not be deleted", resp.Header.Get("X-Custom"))
require.Equal(t, http.StatusOK, resp.StatusCode)
// default content type set if not provided
require.Equal(t, "application/json", resp.Header.Get("Content-Type"))
})
verifyQueryData := func(pReq *backend.QueryDataRequest) {
@ -177,24 +194,94 @@ func TestIntegrationBackendPlugins(t *testing.T) {
})
})
})
newTestScenario(t, "Datasource with resource returning non-default content-type should not be kept",
options(
withCallResourceResponse(func(sender backend.CallResourceResponseSender) error {
return sender.Send(&backend.CallResourceResponse{
Status: http.StatusOK,
Headers: map[string][]string{
"Content-Type": {"text/plain"},
"Content-Length": {"5"},
},
Body: []byte("hello"),
})
}),
),
func(t *testing.T, tsCtx *testScenarioContext) {
tsCtx.runCallResourceTest(t, func(pReq *backend.CallResourceRequest, resp *http.Response) {
require.Equal(t, "text/plain", resp.Header.Get("Content-Type"))
require.Equal(t, http.StatusOK, resp.StatusCode)
require.Equal(t, int64(5), resp.ContentLength)
require.Empty(t, resp.TransferEncoding)
})
})
newTestScenario(t, "Datasource with resource returning 204 (no content) status should not set content-type header",
options(
withCallResourceResponse(func(sender backend.CallResourceResponseSender) error {
return sender.Send(&backend.CallResourceResponse{
Status: http.StatusNoContent,
})
}),
),
func(t *testing.T, tsCtx *testScenarioContext) {
tsCtx.runCallResourceTest(t, func(pReq *backend.CallResourceRequest, resp *http.Response) {
require.Empty(t, resp.Header.Get("Content-Type"))
require.Equal(t, http.StatusNoContent, resp.StatusCode)
})
})
newTestScenario(t, "Datasource with resource returning streaming content should return chunked transfer encoding",
options(
withCallResourceResponse(func(sender backend.CallResourceResponseSender) error {
err := sender.Send(&backend.CallResourceResponse{
Status: http.StatusOK,
Headers: map[string][]string{
"Content-Type": {"text/plain"},
},
Body: []byte("msg 1\r\n"),
})
if err != nil {
return err
}
return sender.Send(&backend.CallResourceResponse{
Body: []byte("msg 2\r\n"),
})
}),
),
func(t *testing.T, tsCtx *testScenarioContext) {
tsCtx.runCallResourceTest(t, func(pReq *backend.CallResourceRequest, resp *http.Response) {
require.Equal(t, "text/plain", resp.Header.Get("Content-Type"))
require.Equal(t, http.StatusOK, resp.StatusCode)
require.Equal(t, []string{"chunked"}, resp.TransferEncoding)
bytes, err := io.ReadAll(resp.Body)
require.NoError(t, err)
require.Equal(t, "msg 1\r\nmsg 2\r\n", string(bytes))
})
})
}
type testScenarioContext struct {
testPluginID string
uid string
grafanaListeningAddr string
testEnv *server.TestEnv
outgoingServer *httptest.Server
outgoingRequest *http.Request
backendTestPlugin *testPlugin
rt http.RoundTripper
modifyIncomingRequest func(req *http.Request)
testPluginID string
uid string
grafanaListeningAddr string
testEnv *server.TestEnv
outgoingServer *httptest.Server
outgoingRequest *http.Request
backendTestPlugin *testPlugin
rt http.RoundTripper
modifyIncomingRequest func(req *http.Request)
modifyCallResourceResponse func(sender backend.CallResourceResponseSender) error
}
type testScenarioInput struct {
ds *datasources.AddDataSourceCommand
token *oauth2.Token
modifyIncomingRequest func(req *http.Request)
ds *datasources.AddDataSourceCommand
token *oauth2.Token
modifyIncomingRequest func(req *http.Request)
modifyCallResourceResponse func(sender backend.CallResourceResponseSender) error
}
type testScenarioOption func(*testScenarioInput)
@ -246,6 +333,12 @@ func withDsCookieForwarding(names []string) testScenarioOption {
}
}
func withCallResourceResponse(cb func(sender backend.CallResourceResponseSender) error) testScenarioOption {
return func(in *testScenarioInput) {
in.modifyCallResourceResponse = cb
}
}
func newTestScenario(t *testing.T, name string, opts []testScenarioOption, callback func(t *testing.T, ctx *testScenarioContext)) {
tsCtx := testScenarioContext{
testPluginID: "test-plugin",
@ -300,6 +393,26 @@ func newTestScenario(t *testing.T, name string, opts []testScenarioOption, callb
}
tsCtx.modifyIncomingRequest = in.modifyIncomingRequest
if in.modifyCallResourceResponse == nil {
in.modifyCallResourceResponse = func(sender backend.CallResourceResponseSender) error {
responseHeaders := map[string][]string{
"Connection": {"close, TE"},
"Te": {"foo", "bar, trailers"},
"Proxy-Connection": {"should be deleted"},
"Upgrade": {"foo"},
"Set-Cookie": {"should be deleted"},
"X-Custom": {"should not be deleted"},
}
return sender.Send(&backend.CallResourceResponse{
Status: http.StatusOK,
Headers: responseHeaders,
})
}
}
tsCtx.modifyCallResourceResponse = in.modifyCallResourceResponse
tsCtx.testEnv.OAuthTokenService.Token = in.token
_, err = testEnv.Server.HTTPServer.DataSourcesService.AddDataSource(ctx, cmd)
@ -476,7 +589,7 @@ func (tsCtx *testScenarioContext) runCheckHealthTest(t *testing.T, callback func
})
}
func (tsCtx *testScenarioContext) runCallResourceTest(t *testing.T, callback func(req *backend.CallResourceRequest)) {
func (tsCtx *testScenarioContext) runCallResourceTest(t *testing.T, callback func(req *backend.CallResourceRequest, resp *http.Response)) {
t.Run("When calling /api/datasources/uid/:uid/resources should set expected headers on outgoing CallResource and HTTP request", func(t *testing.T) {
var received *backend.CallResourceRequest
tsCtx.backendTestPlugin.CallResourceHandler = backend.CallResourceHandlerFunc(func(ctx context.Context, req *backend.CallResourceRequest, sender backend.CallResourceResponseSender) error {
@ -508,21 +621,7 @@ func (tsCtx *testScenarioContext) runCallResourceTest(t *testing.T, callback fun
tsCtx.testEnv.Server.HTTPServer.Cfg.Logger.Error("Failed to discard body", "error", err)
}
responseHeaders := map[string][]string{
"Connection": {"close, TE"},
"Te": {"foo", "bar, trailers"},
"Proxy-Connection": {"should be deleted"},
"Upgrade": {"foo"},
"Set-Cookie": {"should be deleted"},
"X-Custom": {"should not be deleted"},
}
err = sender.Send(&backend.CallResourceResponse{
Status: http.StatusOK,
Headers: responseHeaders,
})
return err
return tsCtx.modifyCallResourceResponse(sender)
})
u := fmt.Sprintf("http://admin:admin@%s/api/datasources/uid/%s/resources", tsCtx.grafanaListeningAddr, tsCtx.uid)
@ -541,22 +640,17 @@ func (tsCtx *testScenarioContext) runCallResourceTest(t *testing.T, callback fun
require.NoError(t, err)
resp, err := http.DefaultClient.Do(req)
require.NoError(t, err)
b, err := io.ReadAll(resp.Body)
require.NoError(t, err)
require.Equal(t, http.StatusOK, resp.StatusCode, string(b))
t.Cleanup(func() {
err := resp.Body.Close()
require.NoError(t, err)
})
_, err = io.ReadAll(resp.Body)
require.NoError(t, err)
require.Empty(t, resp.Header.Get("Connection"))
require.Empty(t, resp.Header.Get("Te"))
require.Empty(t, resp.Header.Get("Proxy-Connection"))
require.Empty(t, resp.Header.Get("Upgrade"))
require.Empty(t, resp.Header.Get("Set-Cookie"))
require.Equal(t, "should not be deleted", resp.Header.Get("X-Custom"))
require.Equal(t, "sandbox", resp.Header.Get("Content-Security-Policy"))
require.NotNil(t, received)
require.Empty(t, received.Headers["Connection"])
@ -569,7 +663,7 @@ func (tsCtx *testScenarioContext) runCallResourceTest(t *testing.T, callback fun
require.NotEmpty(t, tsCtx.outgoingRequest.Header.Get("Accept-Encoding"))
require.Equal(t, fmt.Sprintf("Grafana/%s", tsCtx.testEnv.SQLStore.Cfg.BuildVersion), tsCtx.outgoingRequest.Header.Get("User-Agent"))
callback(received)
callback(received, resp)
})
}