diff --git a/server/channels/app/platform/metrics.go b/server/channels/app/platform/metrics.go index 499dde29a4..a131adc992 100644 --- a/server/channels/app/platform/metrics.go +++ b/server/channels/app/platform/metrics.go @@ -9,7 +9,9 @@ import ( "net" "net/http" "net/http/pprof" + "path" "runtime" + "strings" "sync" "text/template" "time" @@ -19,7 +21,9 @@ import ( "github.com/pkg/errors" "github.com/mattermost/mattermost/server/public/model" + "github.com/mattermost/mattermost/server/public/plugin" "github.com/mattermost/mattermost/server/public/shared/mlog" + "github.com/mattermost/mattermost/server/v8/channels/utils" "github.com/mattermost/mattermost/server/v8/einterfaces" ) @@ -35,6 +39,8 @@ type platformMetrics struct { cfgFn func() *model.Config listenAddr string + + getPluginsEnv func() *plugin.Environment } // resetMetrics resets the metrics server. Clears the metrics if the metrics are disabled by the config. @@ -56,6 +62,12 @@ func (ps *PlatformService) resetMetrics() error { cfgFn: ps.Config, metricsImpl: ps.metricsIFace, logger: ps.logger, + getPluginsEnv: func() *plugin.Environment { + if ps.pluginEnv == nil { + return nil + } + return ps.pluginEnv.GetPluginsEnvironment() + }, } if err := ps.metrics.initMetricsRouter(); err != nil { @@ -166,9 +178,56 @@ func (pm *platformMetrics) initMetricsRouter() error { pm.router.Handle("/debug/pprof/threadcreate", pprof.Handler("threadcreate")) pm.router.Handle("/debug/pprof/block", pprof.Handler("block")) + // Plugins metrics route + pluginsMetricsRoute := pm.router.PathPrefix("/plugins/{plugin_id:[A-Za-z0-9\\_\\-\\.]+}/metrics").Subrouter() + pluginsMetricsRoute.HandleFunc("", pm.servePluginMetricsRequest) + pluginsMetricsRoute.HandleFunc("/{anything:.*}", pm.servePluginMetricsRequest) + return nil } +func (pm *platformMetrics) servePluginMetricsRequest(w http.ResponseWriter, r *http.Request) { + pluginID := mux.Vars(r)["plugin_id"] + + pluginsEnvironment := pm.getPluginsEnv() + if pluginsEnvironment == nil { + appErr := model.NewAppError("ServePluginMetricsRequest", "app.plugin.disabled.app_error", + nil, "Enable plugins to serve plugin metric requests", http.StatusNotImplemented) + mlog.Error(appErr.Error()) + w.WriteHeader(appErr.StatusCode) + w.Header().Set("Content-Type", "application/json") + w.Write([]byte(appErr.ToJSON())) + return + } + + hooks, err := pluginsEnvironment.HooksForPlugin(pluginID) + if err != nil { + mlog.Debug("Access to route for non-existent plugin", + mlog.String("missing_plugin_id", pluginID), + mlog.String("url", r.URL.String()), + mlog.Err(err)) + http.NotFound(w, r) + return + } + + subpath, err := utils.GetSubpathFromConfig(pm.cfgFn()) + if err != nil { + appErr := model.NewAppError("ServePluginMetricsRequest", "app.plugin.subpath_parse.app_error", + nil, "Failed to parse SiteURL subpath", http.StatusInternalServerError).Wrap(err) + mlog.Error(appErr.Error()) + w.WriteHeader(appErr.StatusCode) + w.Header().Set("Content-Type", "application/json") + w.Write([]byte(appErr.ToJSON())) + return + } + + r.URL.Path = strings.TrimPrefix(r.URL.Path, path.Join(subpath, "plugins", pluginID, "metrics")) + + // Passing an empty plugin context for the time being. To be decided whether we + // should support forms of authentication in the future. + hooks.ServeMetrics(&plugin.Context{}, w, r) +} + func (ps *PlatformService) HandleMetrics(route string, h http.Handler) { if ps.metrics != nil { ps.metrics.router.Handle(route, h) diff --git a/server/channels/app/plugin_api_test.go b/server/channels/app/plugin_api_test.go index 372d49336b..5b5684208f 100644 --- a/server/channels/app/plugin_api_test.go +++ b/server/channels/app/plugin_api_test.go @@ -2305,3 +2305,63 @@ func TestSendPushNotification(t *testing.T) { } assert.Equal(t, 6, numMessages) } + +func TestPluginServeMetrics(t *testing.T) { + th := Setup(t, StartMetrics) + defer th.TearDown() + + var prevEnable *bool + var prevAddress *string + th.App.UpdateConfig(func(cfg *model.Config) { + prevEnable = cfg.MetricsSettings.Enable + prevAddress = cfg.MetricsSettings.ListenAddress + cfg.MetricsSettings.Enable = model.NewBool(true) + cfg.MetricsSettings.ListenAddress = model.NewString(":30067") + }) + defer th.App.UpdateConfig(func(cfg *model.Config) { + cfg.MetricsSettings.Enable = prevEnable + cfg.MetricsSettings.ListenAddress = prevAddress + }) + + testFolder, found := fileutils.FindDir("channels/app/plugin_api_tests") + require.True(t, found, "Cannot find tests folder") + fullPath := path.Join(testFolder, "manual.test_serve_metrics_plugin", "main.go") + + pluginCode, err := os.ReadFile(fullPath) + require.NoError(t, err) + require.NotEmpty(t, pluginCode) + + tearDown, ids, errors := SetAppEnvironmentWithPlugins(t, []string{string(pluginCode)}, th.App, th.NewPluginAPI) + defer tearDown() + require.NoError(t, errors[0]) + require.Len(t, ids, 1) + + pluginID := ids[0] + require.NotEmpty(t, pluginID) + + reqURL := fmt.Sprintf("http://localhost%s/plugins/%s/metrics", *th.App.Config().MetricsSettings.ListenAddress, pluginID) + req, err := http.NewRequest("GET", reqURL, nil) + require.NoError(t, err) + + client := &http.Client{} + resp, err := client.Do(req) + require.NoError(t, err) + + defer resp.Body.Close() + + body, err := io.ReadAll(resp.Body) + require.NoError(t, err) + require.Equal(t, "METRICS", string(body)) + + reqURL = fmt.Sprintf("http://localhost%s/plugins/%s/metrics/subpath", *th.App.Config().MetricsSettings.ListenAddress, pluginID) + req, err = http.NewRequest("GET", reqURL, nil) + require.NoError(t, err) + + resp, err = client.Do(req) + require.NoError(t, err) + defer resp.Body.Close() + + body, err = io.ReadAll(resp.Body) + require.NoError(t, err) + require.Equal(t, "METRICS SUBPATH", string(body)) +} diff --git a/server/channels/app/plugin_api_tests/manual.test_serve_metrics_plugin/main.go b/server/channels/app/plugin_api_tests/manual.test_serve_metrics_plugin/main.go new file mode 100644 index 0000000000..f2afa8f8cf --- /dev/null +++ b/server/channels/app/plugin_api_tests/manual.test_serve_metrics_plugin/main.go @@ -0,0 +1,27 @@ +// Copyright (c) 2015-present Mattermost, Inc. All Rights Reserved. +// See LICENSE.txt for license information. + +package main + +import ( + "net/http" + + "github.com/mattermost/mattermost/server/public/plugin" +) + +type Plugin struct { + plugin.MattermostPlugin +} + +func (p *Plugin) ServeMetrics(_ *plugin.Context, w http.ResponseWriter, r *http.Request) { + if r.URL.Path == "/subpath" { + w.Write([]byte("METRICS SUBPATH")) + return + } + + w.Write([]byte("METRICS")) +} + +func main() { + plugin.ClientMain(&Plugin{}) +} diff --git a/server/i18n/en.json b/server/i18n/en.json index d9cab16802..7b1905c15a 100644 --- a/server/i18n/en.json +++ b/server/i18n/en.json @@ -6306,6 +6306,10 @@ "id": "app.plugin.store_signature.app_error", "translation": "Unable to store the plugin signature to the configured file store." }, + { + "id": "app.plugin.subpath_parse.app_error", + "translation": "Failed to parse SiteURL subpath" + }, { "id": "app.plugin.sync.list_filestore.app_error", "translation": "Error reading files from the plugins folder in the file store." diff --git a/server/public/plugin/client_rpc.go b/server/public/plugin/client_rpc.go index 0aef4c253a..9ed932c601 100644 --- a/server/public/plugin/client_rpc.go +++ b/server/public/plugin/client_rpc.go @@ -953,3 +953,107 @@ func (s *apiRPCServer) UploadData(args *Z_UploadDataArgs, returns *Z_UploadDataR returns.A, returns.B = hook.UploadData(args.A, pluginReader) return nil } + +func init() { + hookNameToId["ServeMetrics"] = ServeMetricsID +} + +type Z_ServeMetricsArgs struct { + ResponseWriterStream uint32 + Request *http.Request + Context *Context + RequestBodyStream uint32 +} + +func (g *hooksRPCClient) ServeMetrics(c *Context, w http.ResponseWriter, r *http.Request) { + if !g.implemented[ServeMetricsID] { + http.NotFound(w, r) + return + } + + serveMetricsStreamId := g.muxBroker.NextId() + go func() { + connection, err := g.muxBroker.Accept(serveMetricsStreamId) + if err != nil { + g.log.Error("Plugin failed to ServeMetrics, muxBroker couldn't accept connection", mlog.Uint32("serve_http_stream_id", serveMetricsStreamId), mlog.Err(err)) + return + } + defer connection.Close() + + rpcServer := rpc.NewServer() + if err := rpcServer.RegisterName("Plugin", &httpResponseWriterRPCServer{w: w, log: g.log}); err != nil { + g.log.Error("Plugin failed to ServeMetrics, couldn't register RPC name", mlog.Err(err)) + return + } + rpcServer.ServeConn(connection) + }() + + requestBodyStreamId := uint32(0) + if r.Body != nil { + requestBodyStreamId = g.muxBroker.NextId() + go func() { + bodyConnection, err := g.muxBroker.Accept(requestBodyStreamId) + if err != nil { + g.log.Error("Plugin failed to ServeMetrics, muxBroker couldn't Accept request body connection", mlog.Err(err)) + return + } + defer bodyConnection.Close() + serveIOReader(r.Body, bodyConnection) + }() + } + + forwardedRequest := &http.Request{ + Method: r.Method, + URL: r.URL, + Proto: r.Proto, + ProtoMajor: r.ProtoMajor, + ProtoMinor: r.ProtoMinor, + Header: r.Header, + Host: r.Host, + RemoteAddr: r.RemoteAddr, + RequestURI: r.RequestURI, + } + + if err := g.client.Call("Plugin.ServeMetrics", Z_ServeMetricsArgs{ + Context: c, + ResponseWriterStream: serveMetricsStreamId, + Request: forwardedRequest, + RequestBodyStream: requestBodyStreamId, + }, nil); err != nil { + g.log.Error("Plugin failed to ServeMetrics, RPC call failed", mlog.Err(err)) + http.Error(w, "500 internal server error", http.StatusInternalServerError) + } +} + +func (s *hooksRPCServer) ServeMetrics(args *Z_ServeMetricsArgs, returns *struct{}) error { + connection, err := s.muxBroker.Dial(args.ResponseWriterStream) + if err != nil { + fmt.Fprintf(os.Stderr, "[ERROR] Can't connect to remote response writer stream, error: %v", err.Error()) + return err + } + w := connectHTTPResponseWriter(connection) + defer w.Close() + + r := args.Request + if args.RequestBodyStream != 0 { + connection, err := s.muxBroker.Dial(args.RequestBodyStream) + if err != nil { + fmt.Fprintf(os.Stderr, "[ERROR] Can't connect to remote request body stream, error: %v", err.Error()) + return err + } + r.Body = connectIOReader(connection) + } else { + r.Body = io.NopCloser(&bytes.Buffer{}) + } + defer r.Body.Close() + + if hook, ok := s.impl.(interface { + ServeMetrics(c *Context, w http.ResponseWriter, r *http.Request) + }); ok { + hook.ServeMetrics(args.Context, w, r) + } else { + http.NotFound(w, r) + } + + return nil +} diff --git a/server/public/plugin/hooks.go b/server/public/plugin/hooks.go index cb210a5762..fd93e90e7f 100644 --- a/server/public/plugin/hooks.go +++ b/server/public/plugin/hooks.go @@ -54,6 +54,7 @@ const ( UserHasBeenDeactivatedID = 36 MessageHasBeenDeletedID = 37 MessagesWillBeConsumedID = 38 + ServeMetricsID = 39 TotalHooksID = iota ) @@ -322,4 +323,11 @@ type Hooks interface { // // Minimum server version: 9.1 UserHasBeenDeactivated(c *Context, user *model.User) + + // ServeMetrics allows plugins to expose their own metrics endpoint through + // the server's metrics HTTP listener (e.g. "localhost:8067"). + // Requests destined to the /plugins/{id}/metrics path will be routed to the plugin. + // + // Minimum server version: 9.2 + ServeMetrics(c *Context, w http.ResponseWriter, r *http.Request) } diff --git a/server/public/plugin/hooks_timer_layer_generated.go b/server/public/plugin/hooks_timer_layer_generated.go index c9782089bc..d059b1ac11 100644 --- a/server/public/plugin/hooks_timer_layer_generated.go +++ b/server/public/plugin/hooks_timer_layer_generated.go @@ -244,3 +244,9 @@ func (hooks *hooksTimerLayer) UserHasBeenDeactivated(c *Context, user *model.Use hooks.hooksImpl.UserHasBeenDeactivated(c, user) hooks.recordTime(startTime, "UserHasBeenDeactivated", true) } + +func (hooks *hooksTimerLayer) ServeMetrics(c *Context, w http.ResponseWriter, r *http.Request) { + startTime := timePkg.Now() + hooks.hooksImpl.ServeMetrics(c, w, r) + hooks.recordTime(startTime, "ServeMetrics", true) +} diff --git a/server/public/plugin/interface_generator/main.go b/server/public/plugin/interface_generator/main.go index 15361cd960..777677c660 100644 --- a/server/public/plugin/interface_generator/main.go +++ b/server/public/plugin/interface_generator/main.go @@ -37,6 +37,7 @@ var excludedPluginHooks = []string{ "PluginHTTP", "ServeHTTP", "UploadData", + "ServeMetrics", } var excludedProductHooks = []string{ diff --git a/server/public/plugin/plugintest/hooks.go b/server/public/plugin/plugintest/hooks.go index c8bb0230b7..b9b84d22b7 100644 --- a/server/public/plugin/plugintest/hooks.go +++ b/server/public/plugin/plugintest/hooks.go @@ -360,6 +360,11 @@ func (_m *Hooks) ServeHTTP(c *plugin.Context, w http.ResponseWriter, r *http.Req _m.Called(c, w, r) } +// ServeMetrics provides a mock function with given fields: c, w, r +func (_m *Hooks) ServeMetrics(c *plugin.Context, w http.ResponseWriter, r *http.Request) { + _m.Called(c, w, r) +} + // UserHasBeenCreated provides a mock function with given fields: c, user func (_m *Hooks) UserHasBeenCreated(c *plugin.Context, user *model.User) { _m.Called(c, user) diff --git a/server/public/plugin/product_hooks_generated.go b/server/public/plugin/product_hooks_generated.go index 9bbd85f9be..6d7dc11a3a 100644 --- a/server/public/plugin/product_hooks_generated.go +++ b/server/public/plugin/product_hooks_generated.go @@ -9,6 +9,7 @@ package plugin import ( "errors" "io" + "net/http" "reflect" "github.com/mattermost/mattermost/server/public/model" @@ -134,6 +135,10 @@ type UserHasBeenDeactivatedIFace interface { UserHasBeenDeactivated(c *Context, user *model.User) } +type ServeMetricsIFace interface { + ServeMetrics(c *Context, w http.ResponseWriter, r *http.Request) +} + type HooksAdapter struct { implemented map[int]struct{} productHooks any @@ -417,6 +422,15 @@ func NewAdapter(productHooks any) (*HooksAdapter, error) { return nil, errors.New("hook has UserHasBeenDeactivated method but does not implement plugin.UserHasBeenDeactivated interface") } + // Assessing the type of the productHooks if it individually implements ServeMetrics interface. + tt = reflect.TypeOf((*ServeMetricsIFace)(nil)).Elem() + + if ft.Implements(tt) { + a.implemented[ServeMetricsID] = struct{}{} + } else if _, ok := ft.MethodByName("ServeMetrics"); ok { + return nil, errors.New("hook has ServeMetrics method but does not implement plugin.ServeMetrics interface") + } + return a, nil } @@ -689,3 +703,12 @@ func (a *HooksAdapter) UserHasBeenDeactivated(c *Context, user *model.User) { a.productHooks.(UserHasBeenDeactivatedIFace).UserHasBeenDeactivated(c, user) } + +func (a *HooksAdapter) ServeMetrics(c *Context, w http.ResponseWriter, r *http.Request) { + if _, ok := a.implemented[ServeMetricsID]; !ok { + panic("product hooks must implement ServeMetrics") + } + + a.productHooks.(ServeMetricsIFace).ServeMetrics(c, w, r) + +}