mirror of
https://github.com/mattermost/mattermost.git
synced 2025-02-25 18:55:24 -06:00
[MM-55268] Implement ServeMetrics
plugins hook (#24249)
* Implement ServeMetrics plugins hook
* Update error id
* Simplify
* Revert "Simplify"
This reverts commit c9dc5d5eac
.
* Add comment and error handler
* Wrap error
* Update translation file
---------
Co-authored-by: Mattermost Build <build@mattermost.com>
This commit is contained in:
parent
926142ca22
commit
aa3a12f183
@ -9,7 +9,9 @@ import (
|
||||
"net"
|
||||
"net/http"
|
||||
"net/http/pprof"
|
||||
"path"
|
||||
"runtime"
|
||||
"strings"
|
||||
"sync"
|
||||
"text/template"
|
||||
"time"
|
||||
@ -19,7 +21,9 @@ import (
|
||||
"github.com/pkg/errors"
|
||||
|
||||
"github.com/mattermost/mattermost/server/public/model"
|
||||
"github.com/mattermost/mattermost/server/public/plugin"
|
||||
"github.com/mattermost/mattermost/server/public/shared/mlog"
|
||||
"github.com/mattermost/mattermost/server/v8/channels/utils"
|
||||
"github.com/mattermost/mattermost/server/v8/einterfaces"
|
||||
)
|
||||
|
||||
@ -35,6 +39,8 @@ type platformMetrics struct {
|
||||
|
||||
cfgFn func() *model.Config
|
||||
listenAddr string
|
||||
|
||||
getPluginsEnv func() *plugin.Environment
|
||||
}
|
||||
|
||||
// resetMetrics resets the metrics server. Clears the metrics if the metrics are disabled by the config.
|
||||
@ -56,6 +62,12 @@ func (ps *PlatformService) resetMetrics() error {
|
||||
cfgFn: ps.Config,
|
||||
metricsImpl: ps.metricsIFace,
|
||||
logger: ps.logger,
|
||||
getPluginsEnv: func() *plugin.Environment {
|
||||
if ps.pluginEnv == nil {
|
||||
return nil
|
||||
}
|
||||
return ps.pluginEnv.GetPluginsEnvironment()
|
||||
},
|
||||
}
|
||||
|
||||
if err := ps.metrics.initMetricsRouter(); err != nil {
|
||||
@ -166,9 +178,56 @@ func (pm *platformMetrics) initMetricsRouter() error {
|
||||
pm.router.Handle("/debug/pprof/threadcreate", pprof.Handler("threadcreate"))
|
||||
pm.router.Handle("/debug/pprof/block", pprof.Handler("block"))
|
||||
|
||||
// Plugins metrics route
|
||||
pluginsMetricsRoute := pm.router.PathPrefix("/plugins/{plugin_id:[A-Za-z0-9\\_\\-\\.]+}/metrics").Subrouter()
|
||||
pluginsMetricsRoute.HandleFunc("", pm.servePluginMetricsRequest)
|
||||
pluginsMetricsRoute.HandleFunc("/{anything:.*}", pm.servePluginMetricsRequest)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (pm *platformMetrics) servePluginMetricsRequest(w http.ResponseWriter, r *http.Request) {
|
||||
pluginID := mux.Vars(r)["plugin_id"]
|
||||
|
||||
pluginsEnvironment := pm.getPluginsEnv()
|
||||
if pluginsEnvironment == nil {
|
||||
appErr := model.NewAppError("ServePluginMetricsRequest", "app.plugin.disabled.app_error",
|
||||
nil, "Enable plugins to serve plugin metric requests", http.StatusNotImplemented)
|
||||
mlog.Error(appErr.Error())
|
||||
w.WriteHeader(appErr.StatusCode)
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
w.Write([]byte(appErr.ToJSON()))
|
||||
return
|
||||
}
|
||||
|
||||
hooks, err := pluginsEnvironment.HooksForPlugin(pluginID)
|
||||
if err != nil {
|
||||
mlog.Debug("Access to route for non-existent plugin",
|
||||
mlog.String("missing_plugin_id", pluginID),
|
||||
mlog.String("url", r.URL.String()),
|
||||
mlog.Err(err))
|
||||
http.NotFound(w, r)
|
||||
return
|
||||
}
|
||||
|
||||
subpath, err := utils.GetSubpathFromConfig(pm.cfgFn())
|
||||
if err != nil {
|
||||
appErr := model.NewAppError("ServePluginMetricsRequest", "app.plugin.subpath_parse.app_error",
|
||||
nil, "Failed to parse SiteURL subpath", http.StatusInternalServerError).Wrap(err)
|
||||
mlog.Error(appErr.Error())
|
||||
w.WriteHeader(appErr.StatusCode)
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
w.Write([]byte(appErr.ToJSON()))
|
||||
return
|
||||
}
|
||||
|
||||
r.URL.Path = strings.TrimPrefix(r.URL.Path, path.Join(subpath, "plugins", pluginID, "metrics"))
|
||||
|
||||
// Passing an empty plugin context for the time being. To be decided whether we
|
||||
// should support forms of authentication in the future.
|
||||
hooks.ServeMetrics(&plugin.Context{}, w, r)
|
||||
}
|
||||
|
||||
func (ps *PlatformService) HandleMetrics(route string, h http.Handler) {
|
||||
if ps.metrics != nil {
|
||||
ps.metrics.router.Handle(route, h)
|
||||
|
@ -2305,3 +2305,63 @@ func TestSendPushNotification(t *testing.T) {
|
||||
}
|
||||
assert.Equal(t, 6, numMessages)
|
||||
}
|
||||
|
||||
func TestPluginServeMetrics(t *testing.T) {
|
||||
th := Setup(t, StartMetrics)
|
||||
defer th.TearDown()
|
||||
|
||||
var prevEnable *bool
|
||||
var prevAddress *string
|
||||
th.App.UpdateConfig(func(cfg *model.Config) {
|
||||
prevEnable = cfg.MetricsSettings.Enable
|
||||
prevAddress = cfg.MetricsSettings.ListenAddress
|
||||
cfg.MetricsSettings.Enable = model.NewBool(true)
|
||||
cfg.MetricsSettings.ListenAddress = model.NewString(":30067")
|
||||
})
|
||||
defer th.App.UpdateConfig(func(cfg *model.Config) {
|
||||
cfg.MetricsSettings.Enable = prevEnable
|
||||
cfg.MetricsSettings.ListenAddress = prevAddress
|
||||
})
|
||||
|
||||
testFolder, found := fileutils.FindDir("channels/app/plugin_api_tests")
|
||||
require.True(t, found, "Cannot find tests folder")
|
||||
fullPath := path.Join(testFolder, "manual.test_serve_metrics_plugin", "main.go")
|
||||
|
||||
pluginCode, err := os.ReadFile(fullPath)
|
||||
require.NoError(t, err)
|
||||
require.NotEmpty(t, pluginCode)
|
||||
|
||||
tearDown, ids, errors := SetAppEnvironmentWithPlugins(t, []string{string(pluginCode)}, th.App, th.NewPluginAPI)
|
||||
defer tearDown()
|
||||
require.NoError(t, errors[0])
|
||||
require.Len(t, ids, 1)
|
||||
|
||||
pluginID := ids[0]
|
||||
require.NotEmpty(t, pluginID)
|
||||
|
||||
reqURL := fmt.Sprintf("http://localhost%s/plugins/%s/metrics", *th.App.Config().MetricsSettings.ListenAddress, pluginID)
|
||||
req, err := http.NewRequest("GET", reqURL, nil)
|
||||
require.NoError(t, err)
|
||||
|
||||
client := &http.Client{}
|
||||
resp, err := client.Do(req)
|
||||
require.NoError(t, err)
|
||||
|
||||
defer resp.Body.Close()
|
||||
|
||||
body, err := io.ReadAll(resp.Body)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, "METRICS", string(body))
|
||||
|
||||
reqURL = fmt.Sprintf("http://localhost%s/plugins/%s/metrics/subpath", *th.App.Config().MetricsSettings.ListenAddress, pluginID)
|
||||
req, err = http.NewRequest("GET", reqURL, nil)
|
||||
require.NoError(t, err)
|
||||
|
||||
resp, err = client.Do(req)
|
||||
require.NoError(t, err)
|
||||
defer resp.Body.Close()
|
||||
|
||||
body, err = io.ReadAll(resp.Body)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, "METRICS SUBPATH", string(body))
|
||||
}
|
||||
|
@ -0,0 +1,27 @@
|
||||
// Copyright (c) 2015-present Mattermost, Inc. All Rights Reserved.
|
||||
// See LICENSE.txt for license information.
|
||||
|
||||
package main
|
||||
|
||||
import (
|
||||
"net/http"
|
||||
|
||||
"github.com/mattermost/mattermost/server/public/plugin"
|
||||
)
|
||||
|
||||
type Plugin struct {
|
||||
plugin.MattermostPlugin
|
||||
}
|
||||
|
||||
func (p *Plugin) ServeMetrics(_ *plugin.Context, w http.ResponseWriter, r *http.Request) {
|
||||
if r.URL.Path == "/subpath" {
|
||||
w.Write([]byte("METRICS SUBPATH"))
|
||||
return
|
||||
}
|
||||
|
||||
w.Write([]byte("METRICS"))
|
||||
}
|
||||
|
||||
func main() {
|
||||
plugin.ClientMain(&Plugin{})
|
||||
}
|
@ -6306,6 +6306,10 @@
|
||||
"id": "app.plugin.store_signature.app_error",
|
||||
"translation": "Unable to store the plugin signature to the configured file store."
|
||||
},
|
||||
{
|
||||
"id": "app.plugin.subpath_parse.app_error",
|
||||
"translation": "Failed to parse SiteURL subpath"
|
||||
},
|
||||
{
|
||||
"id": "app.plugin.sync.list_filestore.app_error",
|
||||
"translation": "Error reading files from the plugins folder in the file store."
|
||||
|
@ -953,3 +953,107 @@ func (s *apiRPCServer) UploadData(args *Z_UploadDataArgs, returns *Z_UploadDataR
|
||||
returns.A, returns.B = hook.UploadData(args.A, pluginReader)
|
||||
return nil
|
||||
}
|
||||
|
||||
func init() {
|
||||
hookNameToId["ServeMetrics"] = ServeMetricsID
|
||||
}
|
||||
|
||||
type Z_ServeMetricsArgs struct {
|
||||
ResponseWriterStream uint32
|
||||
Request *http.Request
|
||||
Context *Context
|
||||
RequestBodyStream uint32
|
||||
}
|
||||
|
||||
func (g *hooksRPCClient) ServeMetrics(c *Context, w http.ResponseWriter, r *http.Request) {
|
||||
if !g.implemented[ServeMetricsID] {
|
||||
http.NotFound(w, r)
|
||||
return
|
||||
}
|
||||
|
||||
serveMetricsStreamId := g.muxBroker.NextId()
|
||||
go func() {
|
||||
connection, err := g.muxBroker.Accept(serveMetricsStreamId)
|
||||
if err != nil {
|
||||
g.log.Error("Plugin failed to ServeMetrics, muxBroker couldn't accept connection", mlog.Uint32("serve_http_stream_id", serveMetricsStreamId), mlog.Err(err))
|
||||
return
|
||||
}
|
||||
defer connection.Close()
|
||||
|
||||
rpcServer := rpc.NewServer()
|
||||
if err := rpcServer.RegisterName("Plugin", &httpResponseWriterRPCServer{w: w, log: g.log}); err != nil {
|
||||
g.log.Error("Plugin failed to ServeMetrics, couldn't register RPC name", mlog.Err(err))
|
||||
return
|
||||
}
|
||||
rpcServer.ServeConn(connection)
|
||||
}()
|
||||
|
||||
requestBodyStreamId := uint32(0)
|
||||
if r.Body != nil {
|
||||
requestBodyStreamId = g.muxBroker.NextId()
|
||||
go func() {
|
||||
bodyConnection, err := g.muxBroker.Accept(requestBodyStreamId)
|
||||
if err != nil {
|
||||
g.log.Error("Plugin failed to ServeMetrics, muxBroker couldn't Accept request body connection", mlog.Err(err))
|
||||
return
|
||||
}
|
||||
defer bodyConnection.Close()
|
||||
serveIOReader(r.Body, bodyConnection)
|
||||
}()
|
||||
}
|
||||
|
||||
forwardedRequest := &http.Request{
|
||||
Method: r.Method,
|
||||
URL: r.URL,
|
||||
Proto: r.Proto,
|
||||
ProtoMajor: r.ProtoMajor,
|
||||
ProtoMinor: r.ProtoMinor,
|
||||
Header: r.Header,
|
||||
Host: r.Host,
|
||||
RemoteAddr: r.RemoteAddr,
|
||||
RequestURI: r.RequestURI,
|
||||
}
|
||||
|
||||
if err := g.client.Call("Plugin.ServeMetrics", Z_ServeMetricsArgs{
|
||||
Context: c,
|
||||
ResponseWriterStream: serveMetricsStreamId,
|
||||
Request: forwardedRequest,
|
||||
RequestBodyStream: requestBodyStreamId,
|
||||
}, nil); err != nil {
|
||||
g.log.Error("Plugin failed to ServeMetrics, RPC call failed", mlog.Err(err))
|
||||
http.Error(w, "500 internal server error", http.StatusInternalServerError)
|
||||
}
|
||||
}
|
||||
|
||||
func (s *hooksRPCServer) ServeMetrics(args *Z_ServeMetricsArgs, returns *struct{}) error {
|
||||
connection, err := s.muxBroker.Dial(args.ResponseWriterStream)
|
||||
if err != nil {
|
||||
fmt.Fprintf(os.Stderr, "[ERROR] Can't connect to remote response writer stream, error: %v", err.Error())
|
||||
return err
|
||||
}
|
||||
w := connectHTTPResponseWriter(connection)
|
||||
defer w.Close()
|
||||
|
||||
r := args.Request
|
||||
if args.RequestBodyStream != 0 {
|
||||
connection, err := s.muxBroker.Dial(args.RequestBodyStream)
|
||||
if err != nil {
|
||||
fmt.Fprintf(os.Stderr, "[ERROR] Can't connect to remote request body stream, error: %v", err.Error())
|
||||
return err
|
||||
}
|
||||
r.Body = connectIOReader(connection)
|
||||
} else {
|
||||
r.Body = io.NopCloser(&bytes.Buffer{})
|
||||
}
|
||||
defer r.Body.Close()
|
||||
|
||||
if hook, ok := s.impl.(interface {
|
||||
ServeMetrics(c *Context, w http.ResponseWriter, r *http.Request)
|
||||
}); ok {
|
||||
hook.ServeMetrics(args.Context, w, r)
|
||||
} else {
|
||||
http.NotFound(w, r)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
@ -54,6 +54,7 @@ const (
|
||||
UserHasBeenDeactivatedID = 36
|
||||
MessageHasBeenDeletedID = 37
|
||||
MessagesWillBeConsumedID = 38
|
||||
ServeMetricsID = 39
|
||||
TotalHooksID = iota
|
||||
)
|
||||
|
||||
@ -322,4 +323,11 @@ type Hooks interface {
|
||||
//
|
||||
// Minimum server version: 9.1
|
||||
UserHasBeenDeactivated(c *Context, user *model.User)
|
||||
|
||||
// ServeMetrics allows plugins to expose their own metrics endpoint through
|
||||
// the server's metrics HTTP listener (e.g. "localhost:8067").
|
||||
// Requests destined to the /plugins/{id}/metrics path will be routed to the plugin.
|
||||
//
|
||||
// Minimum server version: 9.2
|
||||
ServeMetrics(c *Context, w http.ResponseWriter, r *http.Request)
|
||||
}
|
||||
|
@ -244,3 +244,9 @@ func (hooks *hooksTimerLayer) UserHasBeenDeactivated(c *Context, user *model.Use
|
||||
hooks.hooksImpl.UserHasBeenDeactivated(c, user)
|
||||
hooks.recordTime(startTime, "UserHasBeenDeactivated", true)
|
||||
}
|
||||
|
||||
func (hooks *hooksTimerLayer) ServeMetrics(c *Context, w http.ResponseWriter, r *http.Request) {
|
||||
startTime := timePkg.Now()
|
||||
hooks.hooksImpl.ServeMetrics(c, w, r)
|
||||
hooks.recordTime(startTime, "ServeMetrics", true)
|
||||
}
|
||||
|
@ -37,6 +37,7 @@ var excludedPluginHooks = []string{
|
||||
"PluginHTTP",
|
||||
"ServeHTTP",
|
||||
"UploadData",
|
||||
"ServeMetrics",
|
||||
}
|
||||
|
||||
var excludedProductHooks = []string{
|
||||
|
@ -360,6 +360,11 @@ func (_m *Hooks) ServeHTTP(c *plugin.Context, w http.ResponseWriter, r *http.Req
|
||||
_m.Called(c, w, r)
|
||||
}
|
||||
|
||||
// ServeMetrics provides a mock function with given fields: c, w, r
|
||||
func (_m *Hooks) ServeMetrics(c *plugin.Context, w http.ResponseWriter, r *http.Request) {
|
||||
_m.Called(c, w, r)
|
||||
}
|
||||
|
||||
// UserHasBeenCreated provides a mock function with given fields: c, user
|
||||
func (_m *Hooks) UserHasBeenCreated(c *plugin.Context, user *model.User) {
|
||||
_m.Called(c, user)
|
||||
|
@ -9,6 +9,7 @@ package plugin
|
||||
import (
|
||||
"errors"
|
||||
"io"
|
||||
"net/http"
|
||||
"reflect"
|
||||
|
||||
"github.com/mattermost/mattermost/server/public/model"
|
||||
@ -134,6 +135,10 @@ type UserHasBeenDeactivatedIFace interface {
|
||||
UserHasBeenDeactivated(c *Context, user *model.User)
|
||||
}
|
||||
|
||||
type ServeMetricsIFace interface {
|
||||
ServeMetrics(c *Context, w http.ResponseWriter, r *http.Request)
|
||||
}
|
||||
|
||||
type HooksAdapter struct {
|
||||
implemented map[int]struct{}
|
||||
productHooks any
|
||||
@ -417,6 +422,15 @@ func NewAdapter(productHooks any) (*HooksAdapter, error) {
|
||||
return nil, errors.New("hook has UserHasBeenDeactivated method but does not implement plugin.UserHasBeenDeactivated interface")
|
||||
}
|
||||
|
||||
// Assessing the type of the productHooks if it individually implements ServeMetrics interface.
|
||||
tt = reflect.TypeOf((*ServeMetricsIFace)(nil)).Elem()
|
||||
|
||||
if ft.Implements(tt) {
|
||||
a.implemented[ServeMetricsID] = struct{}{}
|
||||
} else if _, ok := ft.MethodByName("ServeMetrics"); ok {
|
||||
return nil, errors.New("hook has ServeMetrics method but does not implement plugin.ServeMetrics interface")
|
||||
}
|
||||
|
||||
return a, nil
|
||||
}
|
||||
|
||||
@ -689,3 +703,12 @@ func (a *HooksAdapter) UserHasBeenDeactivated(c *Context, user *model.User) {
|
||||
a.productHooks.(UserHasBeenDeactivatedIFace).UserHasBeenDeactivated(c, user)
|
||||
|
||||
}
|
||||
|
||||
func (a *HooksAdapter) ServeMetrics(c *Context, w http.ResponseWriter, r *http.Request) {
|
||||
if _, ok := a.implemented[ServeMetricsID]; !ok {
|
||||
panic("product hooks must implement ServeMetrics")
|
||||
}
|
||||
|
||||
a.productHooks.(ServeMetricsIFace).ServeMetrics(c, w, r)
|
||||
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user