MM-25731 Advanced Logging (#14888)

Adds Advanced Logging to server. Advanced Logging is an optional logging capability that allows customers to send log records to any number of destinations.

Supported destinations:
- file
- syslog (with out without TLS)
- raw TCP socket (with out without TLS)

Allows developers to specify discrete log levels as well as the standard trace, debug, info, ... panic. Existing code and logging API usage is unchanged.

Log records are emitted asynchronously to reduce latency to the caller. Supports hot-reloading of logger config, including adding removing targets.

Advanced Logging is configured within config.json via "LogSettings.AdvancedLoggingConfig" which can contain a filespec to another config file, a database DSN, or JSON.
This commit is contained in:
Doug Lauder
2020-07-15 14:40:36 -04:00
committed by GitHub
parent 4ba6c35813
commit 90ff87a77f
53 changed files with 1442 additions and 82 deletions

View File

@@ -117,14 +117,14 @@ func (s *Server) configureAudit(adt *audit.Audit) {
if port <= 0 {
port = 6514
}
raddr := fmt.Sprintf("%s:%d", IP, port)
maxQSize := *s.Config().ExperimentalAuditSettings.SysLogMaxQueueSize
if maxQSize <= 0 {
maxQSize = audit.DefMaxQueueSize
}
params := &audit.SyslogParams{
Raddr: raddr,
params := &mlog.SyslogParams{
IP: IP,
Port: port,
Cert: *s.Config().ExperimentalAuditSettings.SysLogCert,
Tag: *s.Config().ExperimentalAuditSettings.SysLogTag,
Insecure: *s.Config().ExperimentalAuditSettings.SysLogInsecure,
@@ -132,11 +132,11 @@ func (s *Server) configureAudit(adt *audit.Audit) {
filter := adt.MakeFilter(RestLevel, RestContentLevel, RestPermsLevel, CLILevel)
formatter := adt.MakeJSONFormatter()
target, err := audit.NewSyslogTLSTarget(filter, formatter, params, maxQSize)
target, err := mlog.NewSyslogTarget(filter, formatter, params, maxQSize)
if err != nil {
mlog.Error("cannot configure SysLogTLS audit target", mlog.Err(err))
} else {
mlog.Debug("SysLogTLS audit target connected successfully", mlog.String("raddr", raddr))
mlog.Debug("SysLogTLS audit target connected successfully", mlog.String("IP", IP), mlog.Int("Port", port))
adt.AddTarget(target)
}
}

View File

@@ -401,6 +401,7 @@ func (s *Server) trackConfig() {
"file_json": cfg.LogSettings.FileJson,
"enable_webhook_debugging": cfg.LogSettings.EnableWebhookDebugging,
"isdefault_file_location": isDefault(cfg.LogSettings.FileLocation, ""),
"advanced_logging_config": *cfg.LogSettings.AdvancedLoggingConfig != "",
})
s.SendDiagnostic(TRACK_CONFIG_AUDIT, map[string]interface{}{

View File

@@ -123,6 +123,8 @@ type Server struct {
asymmetricSigningKey *ecdsa.PrivateKey
postActionCookieSecret []byte
advancedLogListenerCleanup func()
pluginCommands []*PluginCommand
pluginCommandsLock sync.RWMutex
@@ -195,22 +197,10 @@ func NewServer(options ...Option) (*Server, error) {
s.configStore = configStore
}
if s.Log == nil {
s.Log = mlog.NewLogger(utils.MloggerConfigFromLoggerConfig(&s.Config().LogSettings, utils.GetLogFileLocation))
if err := s.initLogging(); err != nil {
mlog.Error(err.Error())
}
if s.NotificationsLog == nil {
notificationLogSettings := utils.GetLogSettingsFromNotificationsLogSettings(&s.Config().NotificationLogSettings)
s.NotificationsLog = mlog.NewLogger(utils.MloggerConfigFromLoggerConfig(notificationLogSettings, utils.GetNotificationsLogFileLocation)).
WithCallerSkip(1).With(mlog.String("logSource", "notifications"))
}
// Redirect default golang logger to this logger
mlog.RedirectStdLog(s.Log)
// Use this app logger as the global logger (eventually remove all instances of global logging)
mlog.InitGlobalLogger(s.Log)
// It is important to initialize the hub only after the global logger is set
// to avoid race conditions while logging from inside the hub.
fakeApp := New(ServerConnector(s))
@@ -238,13 +228,6 @@ func NewServer(options ...Option) (*Server, error) {
s.tracer = tracer
}
s.logListenerId = s.AddConfigListener(func(_, after *model.Config) {
s.Log.ChangeLevels(utils.MloggerConfigFromLoggerConfig(&after.LogSettings, utils.GetLogFileLocation))
notificationLogSettings := utils.GetLogSettingsFromNotificationsLogSettings(&after.NotificationLogSettings)
s.NotificationsLog.ChangeLevels(utils.MloggerConfigFromLoggerConfig(notificationLogSettings, utils.GetNotificationsLogFileLocation))
})
s.HTTPService = httpservice.MakeHTTPService(s)
s.pushNotificationClient = s.HTTPService.MakeClient(true)
@@ -490,6 +473,17 @@ func NewServer(options ...Option) (*Server, error) {
s.configureAudit(s.Audit)
}
if license == nil || !*license.Features.AdvancedLogging {
timeoutCtx, cancelCtx := context.WithTimeout(context.Background(), time.Second*5)
defer cancelCtx()
mlog.Info("Shutting down advanced logging")
mlog.ShutdownAdvancedLogging(timeoutCtx)
if s.advancedLogListenerCleanup != nil {
s.advancedLogListenerCleanup()
s.advancedLogListenerCleanup = nil
}
}
// Enable developer settings if this is a "dev" build
if model.BuildNumber == "dev" {
s.UpdateConfig(func(cfg *model.Config) { *cfg.ServiceSettings.EnableDeveloper = true })
@@ -549,6 +543,78 @@ func (s *Server) AppOptions() []AppOption {
}
}
func (s *Server) initLogging() error {
if s.Log == nil {
s.Log = mlog.NewLogger(utils.MloggerConfigFromLoggerConfig(&s.Config().LogSettings, utils.GetLogFileLocation))
}
if s.NotificationsLog == nil {
notificationLogSettings := utils.GetLogSettingsFromNotificationsLogSettings(&s.Config().NotificationLogSettings)
s.NotificationsLog = mlog.NewLogger(utils.MloggerConfigFromLoggerConfig(notificationLogSettings, utils.GetNotificationsLogFileLocation)).
WithCallerSkip(1).With(mlog.String("logSource", "notifications"))
}
// Redirect default golang logger to this logger
mlog.RedirectStdLog(s.Log)
// Use this app logger as the global logger (eventually remove all instances of global logging)
mlog.InitGlobalLogger(s.Log)
s.logListenerId = s.AddConfigListener(func(_, after *model.Config) {
s.Log.ChangeLevels(utils.MloggerConfigFromLoggerConfig(&after.LogSettings, utils.GetLogFileLocation))
notificationLogSettings := utils.GetLogSettingsFromNotificationsLogSettings(&after.NotificationLogSettings)
s.NotificationsLog.ChangeLevels(utils.MloggerConfigFromLoggerConfig(notificationLogSettings, utils.GetNotificationsLogFileLocation))
})
// Configure advanced logging.
// Advanced logging is E20 only, however logging must be initialized before the license
// file is loaded. If no valid E20 license exists then advanced logging will be
// shutdown once license is loaded/checked.
if *s.Config().LogSettings.AdvancedLoggingConfig != "" {
dsn := *s.Config().LogSettings.AdvancedLoggingConfig
isJson := config.IsJsonMap(dsn)
// If this is a file based config we need the full path so it can be watched.
if !isJson {
if fs, ok := s.configStore.(*config.FileStore); ok {
dsn = fs.GetFilePath(dsn)
}
}
cfg, err := config.NewLogConfigSrc(dsn, isJson, s.configStore)
if err != nil {
return fmt.Errorf("invalid advanced logging config, %w", err)
}
if err := mlog.ConfigAdvancedLogging(cfg.Get()); err != nil {
return fmt.Errorf("error configuring advanced logging, %w", err)
}
if !isJson {
mlog.Info("Loaded advanced logging config", mlog.String("source", dsn))
}
listenerId := cfg.AddListener(func(_, newCfg mlog.LogTargetCfg) {
if err := mlog.ConfigAdvancedLogging(newCfg); err != nil {
mlog.Error("Error re-configuring advanced logging", mlog.Err(err))
} else {
mlog.Info("Re-configured advanced logging")
}
})
// In case initLogging is called more than once.
if s.advancedLogListenerCleanup != nil {
s.advancedLogListenerCleanup()
}
s.advancedLogListenerCleanup = func() {
cfg.RemoveListener(listenerId)
}
}
return nil
}
const TIME_TO_WAIT_FOR_CONNECTIONS_TO_CLOSE_ON_SERVER_SHUTDOWN = time.Second
func (s *Server) StopHTTPServer() {
@@ -604,6 +670,11 @@ func (s *Server) Shutdown() error {
s.htmlTemplateWatcher.Close()
}
if s.advancedLogListenerCleanup != nil {
s.advancedLogListenerCleanup()
s.advancedLogListenerCleanup = nil
}
s.RemoveConfigListener(s.configListenerId)
s.RemoveConfigListener(s.logListenerId)
s.stopSearchEngine()
@@ -640,7 +711,19 @@ func (s *Server) Shutdown() error {
}
}
timeoutCtx, timeoutCancel := context.WithTimeout(context.Background(), time.Second*15)
defer timeoutCancel()
if err := mlog.Flush(timeoutCtx); err != nil {
mlog.Error("Error flushing logs", mlog.Err(err))
}
mlog.Info("Server stopped")
// this should just write the "server stopped" record, the rest are already flushed.
timeoutCtx2, timeoutCancel2 := context.WithTimeout(context.Background(), time.Second*5)
defer timeoutCancel2()
_ = mlog.ShutdownAdvancedLogging(timeoutCtx2)
return nil
}

View File

@@ -7,8 +7,8 @@ import (
"fmt"
"sort"
"github.com/wiggin77/logr"
"github.com/wiggin77/logr/format"
"github.com/mattermost/logr"
"github.com/mattermost/logr/format"
)
type Level logr.Level

View File

@@ -6,9 +6,9 @@ package audit
import (
"testing"
"github.com/mattermost/logr"
"github.com/mattermost/logr/format"
"github.com/stretchr/testify/require"
"github.com/wiggin77/logr"
"github.com/wiggin77/logr/format"
)
func Test_sortAuditFields(t *testing.T) {

View File

@@ -6,8 +6,8 @@ package audit
import (
"os"
"github.com/wiggin77/logr"
"github.com/wiggin77/logr/target"
"github.com/mattermost/logr"
"github.com/mattermost/logr/target"
)
type FileOptions target.FileOptions

View File

@@ -6,6 +6,7 @@ package config
import (
"sync"
"github.com/mattermost/mattermost-server/v5/mlog"
"github.com/mattermost/mattermost-server/v5/model"
)
@@ -37,3 +38,29 @@ func (e *emitter) invokeConfigListeners(oldCfg, newCfg *model.Config) {
return true
})
}
// srcEmitter enables threadsafe registration and broadcasting to configuration listeners
type logSrcEmitter struct {
listeners sync.Map
}
// AddListener adds a callback function to invoke when the configuration is modified.
func (e *logSrcEmitter) AddListener(listener LogSrcListener) string {
id := model.NewId()
e.listeners.Store(id, listener)
return id
}
// RemoveListener removes a callback function using an id returned from AddListener.
func (e *logSrcEmitter) RemoveListener(id string) {
e.listeners.Delete(id)
}
// invokeConfigListeners synchronously notifies all listeners about the configuration change.
func (e *logSrcEmitter) invokeConfigListeners(oldCfg, newCfg mlog.LogTargetCfg) {
e.listeners.Range(func(key, value interface{}) bool {
listener := value.(LogSrcListener)
listener(oldCfg, newCfg)
return true
})
}

View File

@@ -8,6 +8,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/mattermost/mattermost-server/v5/mlog"
"github.com/mattermost/mattermost-server/v5/model"
)
@@ -51,3 +52,44 @@ func TestEmitter(t *testing.T) {
assert.False(t, listener1, "listener 1 should not have been called")
assert.False(t, listener2, "listener 2 should not have been called")
}
func TestLogSrcEmitter(t *testing.T) {
var e logSrcEmitter
expectedOldCfg := make(mlog.LogTargetCfg)
expectedNewCfg := make(mlog.LogTargetCfg)
listener1 := false
id1 := e.AddListener(func(oldCfg, newCfg mlog.LogTargetCfg) {
assert.Equal(t, expectedOldCfg, oldCfg)
assert.Equal(t, expectedNewCfg, newCfg)
listener1 = true
})
listener2 := false
id2 := e.AddListener(func(oldCfg, newCfg mlog.LogTargetCfg) {
assert.Equal(t, expectedOldCfg, oldCfg)
assert.Equal(t, expectedNewCfg, newCfg)
listener2 = true
})
e.invokeConfigListeners(expectedOldCfg, expectedNewCfg)
assert.True(t, listener1, "listener 1 not called")
assert.True(t, listener2, "listener 2 not called")
e.RemoveListener(id2)
listener1 = false
listener2 = false
e.invokeConfigListeners(expectedOldCfg, expectedNewCfg)
assert.True(t, listener1, "listener 1 not called")
assert.False(t, listener2, "listener 2 should not have been called")
e.RemoveListener(id1)
listener1 = false
listener2 = false
e.invokeConfigListeners(expectedOldCfg, expectedNewCfg)
assert.False(t, listener1, "listener 1 should not have been called")
assert.False(t, listener2, "listener 2 should not have been called")
}

View File

@@ -181,6 +181,12 @@ func (fs *FileStore) GetFile(name string) ([]byte, error) {
return data, nil
}
// GetFilePath returns the resolved path of a configuration file.
// The file may not necessarily exist.
func (fs *FileStore) GetFilePath(name string) string {
return fs.resolveFilePath(name)
}
// SetFile sets or replaces the contents of a configuration file.
func (fs *FileStore) SetFile(name string, data []byte) error {
resolvedPath := fs.resolveFilePath(name)

188
config/logging.go Normal file
View File

@@ -0,0 +1,188 @@
// Copyright (c) 2015-present Mattermost, Inc. All Rights Reserved.
// See LICENSE.txt for license information.
package config
import (
"os"
"strings"
"sync"
"github.com/mattermost/mattermost-server/v5/mlog"
)
type LogSrcListener func(old, new mlog.LogTargetCfg)
type FileGetter interface {
GetFile(name string) ([]byte, error)
}
// LogConfigSrc abstracts the Advanced Logging configuration so that implementations can
// fetch from file, database, etc.
type LogConfigSrc interface {
// Get fetches the current, cached configuration.
Get() mlog.LogTargetCfg
// Set updates the dsn specifying the source and reloads
Set(dsn string, fget FileGetter) (err error)
// AddListener adds a callback function to invoke when the configuration is modified.
AddListener(listener LogSrcListener) string
// RemoveListener removes a callback function using an id returned from AddListener.
RemoveListener(id string)
// Close cleans up resources.
Close() error
}
// NewLogConfigSrc creates an advanced logging configuration source, backed by a
// file, JSON string, or database.
func NewLogConfigSrc(dsn string, isJSON bool, fget FileGetter) (LogConfigSrc, error) {
dsn = strings.TrimSpace(dsn)
if isJSON {
return newJSONSrc(dsn)
}
return newFileSrc(dsn, fget)
}
// jsonSrc
type jsonSrc struct {
logSrcEmitter
mutex sync.RWMutex
cfg mlog.LogTargetCfg
}
func newJSONSrc(data string) (*jsonSrc, error) {
src := &jsonSrc{}
return src, src.Set(data, nil)
}
// Get fetches the current, cached configuration
func (src *jsonSrc) Get() mlog.LogTargetCfg {
src.mutex.RLock()
defer src.mutex.RUnlock()
return src.cfg
}
// Set updates the JSON specifying the source and reloads
func (src *jsonSrc) Set(data string, _ FileGetter) error {
cfg, err := JSONToLogTargetCfg([]byte(data))
if err != nil {
return err
}
src.set(cfg)
return nil
}
func (src *jsonSrc) set(cfg mlog.LogTargetCfg) {
src.mutex.Lock()
defer src.mutex.Unlock()
old := src.cfg
src.cfg = cfg
src.invokeConfigListeners(old, cfg)
}
// Close cleans up resources.
func (src *jsonSrc) Close() error {
return nil
}
// fileSrc
type fileSrc struct {
logSrcEmitter
mutex sync.RWMutex
cfg mlog.LogTargetCfg
path string
watcher *watcher
}
func newFileSrc(path string, fget FileGetter) (*fileSrc, error) {
src := &fileSrc{
path: path,
}
if err := src.Set(path, fget); err != nil {
return nil, err
}
return src, nil
}
// Get fetches the current, cached configuration
func (src *fileSrc) Get() mlog.LogTargetCfg {
src.mutex.RLock()
defer src.mutex.RUnlock()
return src.cfg
}
// Set updates the dsn specifying the file source and reloads.
// The file will be watched for changes and reloaded as needed,
// and all listeners notified.
func (src *fileSrc) Set(path string, fget FileGetter) error {
data, err := fget.GetFile(path)
if err != nil {
return err
}
cfg, err := JSONToLogTargetCfg(data)
if err != nil {
return err
}
src.set(cfg)
// If path is a real file and not just the name of a database resource then watch it for changes.
// Absolute paths are explicit and require no resolution.
if _, err = os.Stat(path); os.IsNotExist(err) {
return nil
}
src.mutex.Lock()
defer src.mutex.Unlock()
if src.watcher != nil {
if err = src.watcher.Close(); err != nil {
mlog.Error("Failed to close watcher", mlog.Err(err))
}
src.watcher = nil
}
watcher, err := newWatcher(path, func() {
if serr := src.Set(path, fget); serr != nil {
mlog.Error("Failed to reload file on change", mlog.String("path", path), mlog.Err(serr))
}
})
if err != nil {
return err
}
src.watcher = watcher
return nil
}
func (src *fileSrc) set(cfg mlog.LogTargetCfg) {
src.mutex.Lock()
defer src.mutex.Unlock()
old := src.cfg
src.cfg = cfg
src.invokeConfigListeners(old, cfg)
}
// Close cleans up resources.
func (src *fileSrc) Close() error {
var err error
src.mutex.Lock()
defer src.mutex.Unlock()
if src.watcher != nil {
err = src.watcher.Close()
src.watcher = nil
}
return err
}

58
config/logging_test.go Normal file
View File

@@ -0,0 +1,58 @@
// Copyright (c) 2015-present Mattermost, Inc. All Rights Reserved.
// See LICENSE.txt for license information.
package config
import (
"os"
"testing"
"github.com/stretchr/testify/assert"
)
const (
validJSON = `{"file":{ "Type":"file"}}`
badJSON = `{"file":{ Type="file"}}`
)
type fgetFunc func(string) ([]byte, error)
func (f fgetFunc) GetFile(path string) ([]byte, error) {
return f(path)
}
func getValidFile(path string) ([]byte, error) {
return []byte(validJSON), nil
}
func getInvalidFile(path string) ([]byte, error) {
return nil, os.ErrNotExist
}
func TestNewLogConfigSrc(t *testing.T) {
tests := []struct {
name string
dsn string
fget FileGetter
wantErr bool
wantType LogConfigSrc
}{
{name: "empty dsn", dsn: "", fget: fgetFunc(getInvalidFile), wantErr: true, wantType: nil},
{name: "garbage dsn", dsn: "!@wfejwcevioj", fget: fgetFunc(getInvalidFile), wantErr: true, wantType: nil},
{name: "valid json dsn", dsn: validJSON, fget: fgetFunc(getInvalidFile), wantErr: false, wantType: &jsonSrc{}},
{name: "invalid json dsn", dsn: badJSON, fget: fgetFunc(getInvalidFile), wantErr: true, wantType: nil},
{name: "valid filespec dsn", dsn: "advancedlogging.conf", fget: fgetFunc(getValidFile), wantErr: false, wantType: &fileSrc{}},
{name: "invalid filespec dsn", dsn: "/nobody/here.conf", fget: fgetFunc(getInvalidFile), wantErr: true, wantType: nil},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := NewLogConfigSrc(tt.dsn, IsJsonMap(tt.dsn), tt.fget)
if tt.wantErr {
assert.Error(t, err)
} else {
assert.NoError(t, err)
assert.IsType(t, tt.wantType, got)
}
})
}
}

View File

@@ -5,7 +5,7 @@ package config
import "github.com/pkg/errors"
// Migrate migrates SAML keys and certificates from one store to another given their data source names.
// Migrate migrates SAML keys, certificates, and other config files from one store to another given their data source names.
func Migrate(from, to string) error {
source, err := NewStore(from, false)
if err != nil {
@@ -30,6 +30,11 @@ func Migrate(from, to string) error {
*sourceConfig.SamlSettings.PrivateKeyFile,
}
// Only migrate advanced logging config if it is not embedded JSON.
if !IsJsonMap(*sourceConfig.LogSettings.AdvancedLoggingConfig) {
files = append(files, *sourceConfig.LogSettings.AdvancedLoggingConfig)
}
files = append(files, sourceConfig.PluginSettings.SignaturePublicKeyFiles...)
for _, file := range files {

View File

@@ -4,6 +4,7 @@
package config
import (
"encoding/json"
"strings"
"github.com/mattermost/mattermost-server/v5/mlog"
@@ -161,3 +162,17 @@ func stripPassword(dsn, schema string) string {
return prefix + dsn[:i+1] + dsn[j:]
}
func IsJsonMap(data string) bool {
var m map[string]interface{}
return json.Unmarshal([]byte(data), &m) == nil
}
func JSONToLogTargetCfg(data []byte) (mlog.LogTargetCfg, error) {
cfg := make(mlog.LogTargetCfg)
err := json.Unmarshal(data, &cfg)
if err != nil {
return nil, err
}
return cfg, nil
}

View File

@@ -6,10 +6,9 @@ package config
import (
"testing"
"github.com/stretchr/testify/assert"
"github.com/mattermost/mattermost-server/v5/model"
"github.com/mattermost/mattermost-server/v5/utils"
"github.com/stretchr/testify/assert"
)
func TestDesanitize(t *testing.T) {
@@ -204,3 +203,34 @@ func sToP(s string) *string {
func bToP(b bool) *bool {
return &b
}
func TestIsJsonMap(t *testing.T) {
tests := []struct {
name string
data string
want bool
}{
{name: "good json", data: `{"local_tcp": {
"Type": "tcp","Format": "json","Levels": [
{"ID": 5,"Name": "debug","Stacktrace": false}
],
"Options": {"ip": "localhost","port": 18065},
"MaxQueueSize": 1000}}
`, want: true,
},
{name: "empty json", data: "{}", want: true},
{name: "string json", data: `"test"`, want: false},
{name: "array json", data: `["test1", "test2"]`, want: false},
{name: "bad json", data: `{huh?}`, want: false},
{name: "filename", data: "/tmp/logger.conf", want: false},
{name: "mysql dsn", data: "mysql://mmuser:@tcp(localhost:3306)/mattermost?charset=utf8mb4,utf8&readTimeout=30s", want: false},
{name: "postgres dsn", data: "postgres://mmuser:passwordlocalhost:5432/mattermost?sslmode=disable&connect_timeout=10", want: false},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := IsJsonMap(tt.data); got != tt.want {
t.Errorf("IsJsonMap() = %v, want %v", got, tt.want)
}
})
}
}

4
go.mod
View File

@@ -43,7 +43,7 @@ require (
github.com/hashicorp/go-hclog v0.14.1
github.com/hashicorp/go-immutable-radix v1.2.0 // indirect
github.com/hashicorp/go-msgpack v1.1.5 // indirect
github.com/hashicorp/go-multierror v1.1.0 // indirect
github.com/hashicorp/go-multierror v1.1.0
github.com/hashicorp/go-plugin v1.3.0
github.com/hashicorp/go-sockaddr v1.0.2 // indirect
github.com/hashicorp/go-uuid v1.0.1 // indirect
@@ -63,6 +63,7 @@ require (
github.com/mattermost/gorp v2.0.1-0.20200527092429-d62b7b9cadfc+incompatible
github.com/mattermost/gosaml2 v0.3.2
github.com/mattermost/ldap v0.0.0-20191128190019-9f62ba4b8d4d
github.com/mattermost/logr v1.0.5
github.com/mattermost/rsc v0.0.0-20160330161541-bbaefb05eaa0
github.com/mattermost/viper v1.0.4
github.com/mattn/go-colorable v0.1.7 // indirect
@@ -108,7 +109,6 @@ require (
github.com/uber/jaeger-client-go v2.24.0+incompatible
github.com/uber/jaeger-lib v2.2.0+incompatible
github.com/vmihailenco/msgpack/v5 v5.0.0-beta.1
github.com/wiggin77/logr v1.0.4
github.com/wiggin77/merror v1.0.2
github.com/wiggin77/srslog v1.0.1
github.com/xtgo/uuid v0.0.0-20140804021211-a0b114877d4c // indirect

4
go.sum
View File

@@ -427,6 +427,8 @@ github.com/mattermost/gosaml2 v0.3.2 h1:kq2dY5qUe6fPPHra171GVlgo+ycBsEog0gZMetxL
github.com/mattermost/gosaml2 v0.3.2/go.mod h1:Z429EIOiEi9kbq6yHoApfzlcXpa6dzRDc6pO+Vy2Ksk=
github.com/mattermost/ldap v0.0.0-20191128190019-9f62ba4b8d4d h1:2DV7VIlEv6J5R5o6tUcb3ZMKJYeeZuWZL7Rv1m23TgQ=
github.com/mattermost/ldap v0.0.0-20191128190019-9f62ba4b8d4d/go.mod h1:HLbgMEI5K131jpxGazJ97AxfPDt31osq36YS1oxFQPQ=
github.com/mattermost/logr v1.0.5 h1:TST38xROPguNh8o90BfDHpp1bz6HfTdFYX5Btw/oLwM=
github.com/mattermost/logr v1.0.5/go.mod h1:YzldchiJXgF789YNDFGXVoCHTQOTrCKwWft9Fwev1iI=
github.com/mattermost/rsc v0.0.0-20160330161541-bbaefb05eaa0 h1:G9tL6JXRBMzjuD1kkBtcnd42kUiT6QDwxfFYu7adM6o=
github.com/mattermost/rsc v0.0.0-20160330161541-bbaefb05eaa0/go.mod h1:nV5bfVpT//+B1RPD2JvRnxbkLmJEYXmRaaVl15fsXjs=
github.com/mattermost/viper v1.0.4 h1:cMYOz4PhguscGSPxrSokUtib5HrG4gCpiUh27wyA3d0=
@@ -742,8 +744,6 @@ github.com/vmihailenco/tagparser v0.1.1 h1:quXMXlA39OCbd2wAdTsGDlK9RkOk6Wuw+x37w
github.com/vmihailenco/tagparser v0.1.1/go.mod h1:OeAg3pn3UbLjkWt+rN9oFYB6u/cQgqMEUPoW2WPyhdI=
github.com/wiggin77/cfg v1.0.2 h1:NBUX+iJRr+RTncTqTNvajHwzduqbhCQjEqxLHr6Fk7A=
github.com/wiggin77/cfg v1.0.2/go.mod h1:b3gotba2e5bXTqTW48DwIFoLc+4lWKP7WPi/CdvZ4aE=
github.com/wiggin77/logr v1.0.4 h1:g8YO5AU9hhKvLQnXceP8/y3JJtw3wPKL4kTzMTUdN5k=
github.com/wiggin77/logr v1.0.4/go.mod h1:h98FF6GPfThhDrHCg063hZA1sIyOEzQ/P85wgqI0IqE=
github.com/wiggin77/merror v1.0.2 h1:V0nH9eFp64ASyaXC+pB5WpvBoCg7NUwvaCSKdzlcHqw=
github.com/wiggin77/merror v1.0.2/go.mod h1:uQTcIU0Z6jRK4OwqganPYerzQxSFJ4GSHM3aurxxQpg=
github.com/wiggin77/srslog v1.0.1 h1:gA2XjSMy3DrRdX9UqLuDtuVAAshb8bE1NhX1YK0Qe+8=

View File

@@ -4,9 +4,13 @@
package mlog
import (
"context"
"encoding/json"
"errors"
"fmt"
"os"
"github.com/mattermost/logr"
)
// defaultLog manually encodes the log to STDERR, providing a basic, default logging implementation
@@ -49,3 +53,31 @@ func defaultCriticalLog(msg string, fields ...Field) {
// We map critical to error in zap, so be consistent.
defaultLog("error", msg, fields...)
}
func defaultCustomLog(lvl LogLevel, msg string, fields ...Field) {
// custom log levels are only output once log targets are configured.
}
func defaultCustomMultiLog(lvl []LogLevel, msg string, fields ...Field) {
// custom log levels are only output once log targets are configured.
}
func defaultFlush(ctx context.Context) error {
return nil
}
func defaultAdvancedConfig(cfg LogTargetCfg) error {
// mlog.ConfigAdvancedConfig should not be called until default
// logger is replaced with mlog.Logger instance.
return errors.New("cannot config advanced logging on default logger")
}
func defaultAdvancedShutdown(ctx context.Context) error {
return nil
}
func defaultAddTarget(target logr.Target) error {
// mlog.AddTarget should not be called until default
// logger is replaced with mlog.Logger instance.
return errors.New("cannot AddTarget on default logger")
}

30
mlog/errors.go Normal file
View File

@@ -0,0 +1,30 @@
// Copyright (c) 2015-present Mattermost, Inc. All Rights Reserved.
// See LICENSE.txt for license information.
package mlog
import "github.com/mattermost/logr"
// onLoggerError is called when the logging system encounters an error,
// such as a target not able to write records. The targets will keep trying
// however the error will be logged with a dedicated level that can be output
// to a safe/always available target for monitoring or alerting.
func onLoggerError(err error) {
Log(LvlLogError, "advanced logging error", Err(err))
}
// onQueueFull is called when the main logger queue is full, indicating the
// volume and frequency of log record creation is too high for the queue size
// and/or the target latencies.
func onQueueFull(rec *logr.LogRec, maxQueueSize int) bool {
Log(LvlLogError, "main queue full, dropping record", Any("rec", rec))
return true // drop record
}
// onTargetQueueFull is called when the main logger queue is full, indicating the
// volume and frequency of log record creation is too high for the target's queue size
// and/or the target latency.
func onTargetQueueFull(target logr.Target, rec *logr.LogRec, maxQueueSize int) bool {
Log(LvlLogError, "target queue full, dropping record", String("target", ""), Any("rec", rec))
return true // drop record
}

View File

@@ -4,6 +4,9 @@
package mlog
import (
"context"
"github.com/mattermost/logr"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
)
@@ -11,6 +14,10 @@ import (
var globalLogger *Logger
func InitGlobalLogger(logger *Logger) {
// Clean up previous instance.
if globalLogger != nil && globalLogger.logrLogger != nil {
globalLogger.logrLogger.Logr().Shutdown()
}
glob := *logger
glob.zap = glob.zap.WithOptions(zap.AddCallerSkip(1))
globalLogger = &glob
@@ -19,6 +26,12 @@ func InitGlobalLogger(logger *Logger) {
Warn = globalLogger.Warn
Error = globalLogger.Error
Critical = globalLogger.Critical
Log = globalLogger.Log
LogM = globalLogger.LogM
Flush = globalLogger.Flush
ConfigAdvancedLogging = globalLogger.ConfigAdvancedLogging
ShutdownAdvancedLogging = globalLogger.ShutdownAdvancedLogging
AddTarget = globalLogger.AddTarget
}
func RedirectStdLog(logger *Logger) {
@@ -26,6 +39,12 @@ func RedirectStdLog(logger *Logger) {
}
type LogFunc func(string, ...Field)
type LogFuncCustom func(LogLevel, string, ...Field)
type LogFuncCustomMulti func([]LogLevel, string, ...Field)
type FlushFunc func(context.Context) error
type ConfigFunc func(cfg LogTargetCfg) error
type ShutdownFunc func(context.Context) error
type AddTargetFunc func(logr.Target) error
// DON'T USE THIS Modify the level on the app logger
func GloballyDisableDebugLogForTest() {
@@ -42,3 +61,10 @@ var Info LogFunc = defaultInfoLog
var Warn LogFunc = defaultWarnLog
var Error LogFunc = defaultErrorLog
var Critical LogFunc = defaultCriticalLog
var Log LogFuncCustom = defaultCustomLog
var LogM LogFuncCustomMulti = defaultCustomMultiLog
var Flush FlushFunc = defaultFlush
var ConfigAdvancedLogging ConfigFunc = defaultAdvancedConfig
var ShutdownAdvancedLogging ShutdownFunc = defaultAdvancedShutdown
var AddTarget AddTargetFunc = defaultAddTarget

32
mlog/levels.go Normal file
View File

@@ -0,0 +1,32 @@
// Copyright (c) 2015-present Mattermost, Inc. All Rights Reserved.
// See LICENSE.txt for license information.
package mlog
// Standard levels
var (
LvlPanic = LogLevel{ID: 0, Name: "panic"}
LvlFatal = LogLevel{ID: 1, Name: "fatal"}
LvlError = LogLevel{ID: 2, Name: "error"}
LvlWarn = LogLevel{ID: 3, Name: "warn"}
LvlInfo = LogLevel{ID: 4, Name: "info"}
LvlDebug = LogLevel{ID: 5, Name: "debug"}
LvlTrace = LogLevel{ID: 6, Name: "trace"}
// used only by the logger
LvlLogError = LogLevel{ID: 11, Name: "logerror"}
)
// Register custom (discrete) levels here...
// ! ID's must not exceed 32,768 !
var (
// used by the audit system
LvlAuditDebug = LogLevel{ID: 100, Name: "AuditDebug"}
LvlAuditError = LogLevel{ID: 101, Name: "AuditError"}
// used by the TCP log target
LvlTcpLogTarget = LogLevel{ID: 105, Name: "TcpLogTarget"}
)
// Combinations for LogM (log multi)
var (
MLvlExample = []LogLevel{LvlAuditDebug, LvlDebug}
)

View File

@@ -4,10 +4,12 @@
package mlog
import (
"context"
"io"
"log"
"os"
"github.com/mattermost/logr"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"gopkg.in/natefinch/lumberjack.v2"
@@ -52,6 +54,7 @@ type Logger struct {
zap *zap.Logger
consoleLevel zap.AtomicLevel
fileLevel zap.AtomicLevel
logrLogger *logr.Logger
}
func getZapLevel(level string) zapcore.Level {
@@ -107,7 +110,6 @@ func NewLogger(config *LoggerConfiguration) *Logger {
logger.zap = zap.New(combinedCore,
zap.AddCaller(),
)
return logger
}
@@ -123,6 +125,10 @@ func (l *Logger) SetConsoleLevel(level string) {
func (l *Logger) With(fields ...Field) *Logger {
newlogger := *l
newlogger.zap = newlogger.zap.With(fields...)
if newlogger.logrLogger != nil {
ll := newlogger.logrLogger.WithFields(zapToLogr(fields))
newlogger.logrLogger = &ll
}
return &newlogger
}
@@ -161,20 +167,98 @@ func (l *Logger) Sugar() *SugarLogger {
func (l *Logger) Debug(message string, fields ...Field) {
l.zap.Debug(message, fields...)
if l.logrLogger != nil && isLevelEnabled(l.logrLogger, logr.Debug) {
l.logrLogger.WithFields(zapToLogr(fields)).Debug(message)
}
}
func (l *Logger) Info(message string, fields ...Field) {
l.zap.Info(message, fields...)
if l.logrLogger != nil && isLevelEnabled(l.logrLogger, logr.Info) {
l.logrLogger.WithFields(zapToLogr(fields)).Info(message)
}
}
func (l *Logger) Warn(message string, fields ...Field) {
l.zap.Warn(message, fields...)
if l.logrLogger != nil && isLevelEnabled(l.logrLogger, logr.Warn) {
l.logrLogger.WithFields(zapToLogr(fields)).Warn(message)
}
}
func (l *Logger) Error(message string, fields ...Field) {
l.zap.Error(message, fields...)
if l.logrLogger != nil && isLevelEnabled(l.logrLogger, logr.Error) {
l.logrLogger.WithFields(zapToLogr(fields)).Error(message)
}
}
func (l *Logger) Critical(message string, fields ...Field) {
l.zap.Error(message, fields...)
if l.logrLogger != nil && isLevelEnabled(l.logrLogger, logr.Error) {
l.logrLogger.WithFields(zapToLogr(fields)).Error(message)
}
}
func (l *Logger) Log(level LogLevel, message string, fields ...Field) {
if l.logrLogger != nil && isLevelEnabled(l.logrLogger, logr.Level(level)) {
l.logrLogger.WithFields(zapToLogr(fields)).Log(logr.Level(level), message)
}
}
func (l *Logger) LogM(levels []LogLevel, message string, fields ...Field) {
if l.logrLogger != nil {
var logger *logr.Logger
for _, lvl := range levels {
if isLevelEnabled(l.logrLogger, logr.Level(lvl)) {
// don't create logger with fields unless at least one level is active.
if logger == nil {
l := l.logrLogger.WithFields(zapToLogr(fields))
logger = &l
}
logger.Log(logr.Level(lvl), message)
}
}
}
}
func (l *Logger) Flush(cxt context.Context) error {
if l.logrLogger != nil {
return l.logrLogger.Logr().Flush() // TODO: use context when Logr lib supports it.
}
return nil
}
// ShutdownAdvancedLogging stops the logger from accepting new log records and tries to
// flush queues within the context timeout. Once complete all targets are shutdown
// and any resources released.
func (l *Logger) ShutdownAdvancedLogging(cxt context.Context) error {
var err error
if l.logrLogger != nil {
err = l.logrLogger.Logr().Shutdown() // TODO: use context when Logr lib supports it.
l.logrLogger = nil
}
return err
}
// ConfigAdvancedLoggingConfig (re)configures advanced logging based on the
// specified log targets. This is the easiest way to get the advanced logger
// configured via a config source such as file.
func (l *Logger) ConfigAdvancedLogging(targets LogTargetCfg) error {
if l.logrLogger != nil {
if err := l.ShutdownAdvancedLogging(context.Background()); err != nil {
Error("error shutting down previous logger", Err(err))
}
}
logr, err := newLogr(targets)
l.logrLogger = logr
return err
}
// AddTarget adds a logr.Target to the advanced logger. This is the preferred method
// to add custom targets or provide configuration that cannot be expressed via a
//config source.
func (l *Logger) AddTarget(target logr.Target) error {
return l.logrLogger.Logr().AddTarget(target)
}

211
mlog/logr.go Normal file
View File

@@ -0,0 +1,211 @@
// Copyright (c) 2015-present Mattermost, Inc. All Rights Reserved.
// See LICENSE.txt for license information.
package mlog
import (
"encoding/json"
"fmt"
"io"
"os"
"github.com/hashicorp/go-multierror"
"github.com/mattermost/logr"
logrFmt "github.com/mattermost/logr/format"
"github.com/mattermost/logr/target"
"go.uber.org/zap/zapcore"
)
const (
DefaultMaxTargetQueue = 1000
DefaultSysLogPort = 514
)
type LogLevel struct {
ID logr.LevelID
Name string
Stacktrace bool
}
type LogTarget struct {
Type string // one of "console", "file", "tcp", "syslog".
Format string // one of "json", "plain"
Levels []LogLevel
Options json.RawMessage
MaxQueueSize int
}
type LogTargetCfg map[string]*LogTarget
type LogrCleanup func() error
func newLogr(targets LogTargetCfg) (*logr.Logger, error) {
var errs error
lgr := logr.Logr{}
lgr.OnExit = func(int) {}
lgr.OnPanic = func(interface{}) {}
lgr.OnLoggerError = onLoggerError
lgr.OnQueueFull = onQueueFull
lgr.OnTargetQueueFull = onTargetQueueFull
for name, t := range targets {
target, err := newLogrTarget(name, t)
if err != nil {
errs = multierror.Append(err)
continue
}
lgr.AddTarget(target)
}
logger := lgr.NewLogger()
return &logger, errs
}
func newLogrTarget(name string, t *LogTarget) (logr.Target, error) {
formatter, err := newFormatter(name, t.Format)
if err != nil {
return nil, err
}
filter, err := newFilter(name, t.Levels)
if err != nil {
return nil, err
}
if t.MaxQueueSize == 0 {
t.MaxQueueSize = DefaultMaxTargetQueue
}
switch t.Type {
case "console":
return newConsoleTarget(name, t, filter, formatter)
case "file":
return newFileTarget(name, t, filter, formatter)
case "syslog":
return newSyslogTarget(name, t, filter, formatter)
case "tcp":
return newTCPTarget(name, t, filter, formatter)
}
return nil, fmt.Errorf("invalid type '%s' for target %s", t.Type, name)
}
func newFilter(name string, levels []LogLevel) (logr.Filter, error) {
filter := &logr.CustomFilter{}
for _, lvl := range levels {
filter.Add(logr.Level(lvl))
}
return filter, nil
}
func newFormatter(name string, format string) (logr.Formatter, error) {
switch format {
case "json", "":
return &logrFmt.JSON{}, nil
case "plain":
return &logrFmt.Plain{Delim: " | "}, nil
default:
return nil, fmt.Errorf("invalid format '%s' for target %s", format, name)
}
}
func newConsoleTarget(name string, t *LogTarget, filter logr.Filter, formatter logr.Formatter) (logr.Target, error) {
type consoleOptions struct {
Out string `json:"Out"`
}
options := &consoleOptions{}
if err := json.Unmarshal(t.Options, options); err != nil {
return nil, err
}
var w io.Writer
switch options.Out {
case "stdout", "":
w = os.Stdout
case "stderr":
w = os.Stderr
default:
return nil, fmt.Errorf("invalid out '%s' for target %s", options.Out, name)
}
newTarget := target.NewWriterTarget(filter, formatter, w, t.MaxQueueSize)
return newTarget, nil
}
func newFileTarget(name string, t *LogTarget, filter logr.Filter, formatter logr.Formatter) (logr.Target, error) {
type fileOptions struct {
Filename string `json:"Filename"`
MaxSize int `json:"MaxSizeMB"`
MaxAge int `json:"MaxAgeDays"`
MaxBackups int `json:"MaxBackups"`
Compress bool `json:"Compress"`
}
options := &fileOptions{}
if err := json.Unmarshal(t.Options, options); err != nil {
return nil, err
}
if options.Filename == "" {
return nil, fmt.Errorf("missing 'Filename' option for target %s", name)
}
if err := checkFileWritable(options.Filename); err != nil {
return nil, fmt.Errorf("error writing to 'Filename' for target %s: %w", name, err)
}
newTarget := target.NewFileTarget(filter, formatter, target.FileOptions(*options), t.MaxQueueSize)
return newTarget, nil
}
func newSyslogTarget(name string, t *LogTarget, filter logr.Filter, formatter logr.Formatter) (logr.Target, error) {
options := &SyslogParams{}
if err := json.Unmarshal(t.Options, options); err != nil {
return nil, err
}
if options.IP == "" {
return nil, fmt.Errorf("missing 'IP' option for target %s", name)
}
if options.Port == 0 {
options.Port = DefaultSysLogPort
}
return NewSyslogTarget(filter, formatter, options, t.MaxQueueSize)
}
func newTCPTarget(name string, t *LogTarget, filter logr.Filter, formatter logr.Formatter) (logr.Target, error) {
options := &TcpParams{}
if err := json.Unmarshal(t.Options, options); err != nil {
return nil, err
}
if options.IP == "" {
return nil, fmt.Errorf("missing 'IP' option for target %s", name)
}
if options.Port == 0 {
return nil, fmt.Errorf("missing 'Port' option for target %s", name)
}
return NewTcpTarget(filter, formatter, options, t.MaxQueueSize)
}
func checkFileWritable(filename string) error {
// try opening/creating the file for writing
file, err := os.OpenFile(filename, os.O_RDWR|os.O_APPEND|os.O_CREATE, 0600)
if err != nil {
return err
}
file.Close()
return nil
}
func isLevelEnabled(logger *logr.Logger, level logr.Level) bool {
status := logger.Logr().IsLevelEnabled(level)
return status.Enabled
}
// zapToLogr converts Zap fields to Logr fields.
// This will not be needed once Logr is used for all logging.
func zapToLogr(zapFields []Field) logr.Fields {
encoder := zapcore.NewMapObjectEncoder()
for _, zapField := range zapFields {
zapField.AddTo(encoder)
}
return logr.Fields(encoder.Fields)
}

View File

@@ -1,7 +1,7 @@
// Copyright (c) 2015-present Mattermost, Inc. All Rights Reserved.
// See LICENSE.txt for license information.
package audit
package mlog
import (
"context"
@@ -12,49 +12,58 @@ import (
"fmt"
"io/ioutil"
"github.com/wiggin77/logr"
"github.com/mattermost/logr"
"github.com/wiggin77/merror"
syslog "github.com/wiggin77/srslog"
)
// Syslog outputs log records to local or remote syslog.
type SyslogTLS struct {
type Syslog struct {
logr.Basic
w *syslog.Writer
}
// SyslogParams provides parameters for dialing a syslogTLS daemon.
// SyslogParams provides parameters for dialing a syslog daemon.
type SyslogParams struct {
Raddr string
Cert string
Tag string
Insecure bool
IP string `json:"IP"`
Port int `json:"Port"`
Tag string `json:"Tag"`
TLS bool `json:"TLS"`
Cert string `json:"Cert"`
Insecure bool `json:"Insecure"`
}
// NewSyslogTLSTarget creates a target capable of outputting log records to remote or local syslog via TLS.
func NewSyslogTLSTarget(filter logr.Filter, formatter logr.Formatter, params *SyslogParams, maxQueue int) (*SyslogTLS, error) {
config := tls.Config{InsecureSkipVerify: params.Insecure}
if params.Cert != "" {
pool, err := getCertPool(params.Cert)
if err != nil {
return nil, err
}
config.RootCAs = pool
}
// NewSyslogTarget creates a target capable of outputting log records to remote or local syslog, with or without TLS.
func NewSyslogTarget(filter logr.Filter, formatter logr.Formatter, params *SyslogParams, maxQueue int) (*Syslog, error) {
network := "tcp"
var config *tls.Config
writer, err := syslog.DialWithTLSConfig("tcp+tls", params.Raddr, syslog.LOG_INFO, params.Tag, &config)
if params.TLS {
network = "tcp+tls"
config = &tls.Config{InsecureSkipVerify: params.Insecure}
if params.Cert != "" {
pool, err := getCertPool(params.Cert)
if err != nil {
return nil, err
}
config.RootCAs = pool
}
}
raddr := fmt.Sprintf("%s:%d", params.IP, params.Port)
writer, err := syslog.DialWithTLSConfig(network, raddr, syslog.LOG_INFO, params.Tag, config)
if err != nil {
return nil, err
}
s := &SyslogTLS{w: writer}
s := &Syslog{w: writer}
s.Basic.Start(s, s, filter, formatter, maxQueue)
return s, nil
}
// Shutdown stops processing log records after making best effort to flush queue.
func (s *SyslogTLS) Shutdown(ctx context.Context) error {
func (s *Syslog) Shutdown(ctx context.Context) error {
errs := merror.New()
err := s.Basic.Shutdown(ctx)
@@ -92,8 +101,8 @@ func getCertPool(cert string) (*x509.CertPool, error) {
}
// Write converts the log record to bytes, via the Formatter,
// and outputs to syslog via TLS.
func (s *SyslogTLS) Write(rec *logr.LogRec) error {
// and outputs to syslog.
func (s *Syslog) Write(rec *logr.LogRec) error {
_, stacktrace := s.IsLevelEnabled(rec.Level())
buf := rec.Logger().Logr().BorrowBuffer()
@@ -122,12 +131,12 @@ func (s *SyslogTLS) Write(rec *logr.LogRec) error {
if err != nil {
reporter := rec.Logger().Logr().ReportError
reporter(fmt.Errorf("syslog write fail: %w", err))
// syslogTLS writer will try to reconnect.
// syslog writer will try to reconnect.
}
return err
}
// String returns a string representation of this target.
func (s *SyslogTLS) String() string {
return "SyslogTLSTarget"
func (s *Syslog) String() string {
return "SyslogTarget"
}

View File

@@ -1,7 +1,7 @@
// Copyright (c) 2015-present Mattermost, Inc. All Rights Reserved.
// See LICENSE.txt for license information.
package audit
package mlog
import (
"testing"

274
mlog/tcp.go Normal file
View File

@@ -0,0 +1,274 @@
// Copyright (c) 2015-present Mattermost, Inc. All Rights Reserved.
// See LICENSE.txt for license information.
package mlog
import (
"context"
"crypto/tls"
"errors"
"fmt"
"net"
"sync"
"time"
"github.com/hashicorp/go-multierror"
"github.com/mattermost/logr"
_ "net/http/pprof"
)
const (
DialTimeoutSecs = 30
WriteTimeoutSecs = 30
RetryBackoffMillis int64 = 100
MaxRetryBackoffMillis int64 = 30 * 1000 // 30 seconds
)
// Tcp outputs log records to raw socket server.
type Tcp struct {
logr.Basic
params *TcpParams
addy string
mutex sync.Mutex
conn net.Conn
monitor chan struct{}
shutdown chan struct{}
}
// TcpParams provides parameters for dialing a socket server.
type TcpParams struct {
IP string `json:"IP"`
Port int `json:"Port"`
TLS bool `json:"TLS"`
Cert string `json:"Cert"`
Insecure bool `json:"Insecure"`
}
// NewTcpTarget creates a target capable of outputting log records to a raw socket, with or without TLS.
func NewTcpTarget(filter logr.Filter, formatter logr.Formatter, params *TcpParams, maxQueue int) (*Tcp, error) {
tcp := &Tcp{
params: params,
addy: fmt.Sprintf("%s:%d", params.IP, params.Port),
monitor: make(chan struct{}),
shutdown: make(chan struct{}),
}
tcp.Basic.Start(tcp, tcp, filter, formatter, maxQueue)
return tcp, nil
}
// getConn provides a net.Conn. If a connection already exists, it is returned immediately,
// otherwise this method blocks until a new connection is created, timeout or shutdown.
func (tcp *Tcp) getConn() (net.Conn, error) {
tcp.mutex.Lock()
defer tcp.mutex.Unlock()
Log(LvlTcpLogTarget, "getConn enter", String("addy", tcp.addy))
defer Log(LvlTcpLogTarget, "getConn exit", String("addy", tcp.addy))
if tcp.conn != nil {
Log(LvlTcpLogTarget, "reusing existing conn", String("addy", tcp.addy)) // use "With" once Zap is removed
return tcp.conn, nil
}
type result struct {
conn net.Conn
err error
}
connChan := make(chan result)
ctx, cancel := context.WithTimeout(context.Background(), time.Second*DialTimeoutSecs)
defer cancel()
go func(ctx context.Context, ch chan result) {
Log(LvlTcpLogTarget, "dailing", String("addy", tcp.addy))
conn, err := tcp.dial(ctx)
if err == nil {
tcp.conn = conn
tcp.monitor = make(chan struct{})
go monitor(tcp.conn, tcp.monitor)
}
connChan <- result{conn: conn, err: err}
}(ctx, connChan)
select {
case <-tcp.shutdown:
return nil, errors.New("shutdown")
case res := <-connChan:
return res.conn, res.err
}
}
// dial connects to a TCP socket, and optionally performs a TLS handshake.
// A non-nil context must be provided which can cancel the dial.
func (tcp *Tcp) dial(ctx context.Context) (net.Conn, error) {
var dialer net.Dialer
dialer.Timeout = time.Second * DialTimeoutSecs
conn, err := dialer.DialContext(ctx, "tcp", fmt.Sprintf("%s:%d", tcp.params.IP, tcp.params.Port))
if err != nil {
return nil, err
}
if !tcp.params.TLS {
return conn, nil
}
Log(LvlTcpLogTarget, "TLS handshake", String("addy", tcp.addy))
tlsconfig := &tls.Config{
ServerName: tcp.params.IP,
InsecureSkipVerify: tcp.params.Insecure,
}
if tcp.params.Cert != "" {
pool, err := getCertPool(tcp.params.Cert)
if err != nil {
return nil, err
}
tlsconfig.RootCAs = pool
}
tlsConn := tls.Client(conn, tlsconfig)
if err := tlsConn.Handshake(); err != nil {
return nil, err
}
return tlsConn, nil
}
func (tcp *Tcp) close() error {
tcp.mutex.Lock()
defer tcp.mutex.Unlock()
var err error
if tcp.conn != nil {
Log(LvlTcpLogTarget, "closing connection", String("addy", tcp.addy))
close(tcp.monitor)
err = tcp.conn.Close()
tcp.conn = nil
}
return err
}
// Shutdown stops processing log records after making best effort to flush queue.
func (tcp *Tcp) Shutdown(ctx context.Context) error {
errs := &multierror.Error{}
Log(LvlTcpLogTarget, "shutting down", String("addy", tcp.addy))
if err := tcp.Basic.Shutdown(ctx); err != nil {
errs = multierror.Append(errs, err)
}
if err := tcp.close(); err != nil {
errs = multierror.Append(errs, err)
}
close(tcp.shutdown)
return errs.ErrorOrNil()
}
// Write converts the log record to bytes, via the Formatter, and outputs to the socket.
// Called by dedicated target goroutine and will block until success or shutdown.
func (tcp *Tcp) Write(rec *logr.LogRec) error {
_, stacktrace := tcp.IsLevelEnabled(rec.Level())
buf := rec.Logger().Logr().BorrowBuffer()
defer rec.Logger().Logr().ReleaseBuffer(buf)
buf, err := tcp.Formatter().Format(rec, stacktrace, buf)
if err != nil {
return err
}
try := 1
backoff := RetryBackoffMillis
for {
select {
case <-tcp.shutdown:
return err
default:
}
conn, err := tcp.getConn()
if err != nil {
Log(LvlTcpLogTarget, "failed getting connection", String("addy", tcp.addy), Err(err))
reporter := rec.Logger().Logr().ReportError
reporter(fmt.Errorf("log target %s connection error: %w", tcp.String(), err))
backoff = tcp.sleep(backoff)
continue
}
conn.SetWriteDeadline(time.Now().Add(time.Second * WriteTimeoutSecs))
_, err = buf.WriteTo(conn)
if err == nil {
return nil
}
Log(LvlTcpLogTarget, "write error", String("addy", tcp.addy), Err(err))
reporter := rec.Logger().Logr().ReportError
reporter(fmt.Errorf("log target %s write error: %w", tcp.String(), err))
_ = tcp.close()
backoff = tcp.sleep(backoff)
try++
Log(LvlTcpLogTarget, "retrying write", String("addy", tcp.addy), Int("try", try))
}
}
// monitor continuously tries to read from the connection to detect socket close.
// This is needed because TCP target uses a write only socket and Linux systems
// take a long time to detect a loss of connectivity on a socket when only writing;
// the writes simply fail without an error returned.
func monitor(conn net.Conn, done <-chan struct{}) {
addy := conn.RemoteAddr().String()
defer Log(LvlTcpLogTarget, "monitor exiting", String("addy", addy))
buf := make([]byte, 1)
for {
Log(LvlTcpLogTarget, "monitor loop", String("addy", addy))
select {
case <-done:
return
case <-time.After(1 * time.Second):
}
err := conn.SetReadDeadline(time.Now().Add(time.Second * 30))
if err != nil {
continue
}
_, err = conn.Read(buf)
if errt, ok := err.(net.Error); ok && errt.Timeout() {
// read timeout is expected, keep looping.
continue
}
// Any other error closes the connection, forcing a reconnect.
Log(LvlTcpLogTarget, "monitor closing connection", Err(err))
conn.Close()
return
}
}
// String returns a string representation of this target.
func (tcp *Tcp) String() string {
return fmt.Sprintf("TcpTarget[%s:%d]", tcp.params.IP, tcp.params.Port)
}
func (tcp *Tcp) sleep(backoff int64) int64 {
select {
case <-tcp.shutdown:
case <-time.After(time.Millisecond * time.Duration(backoff)):
}
nextBackoff := backoff + (backoff >> 1)
if nextBackoff > MaxRetryBackoffMillis {
nextBackoff = MaxRetryBackoffMillis
}
return nextBackoff
}

197
mlog/tcp_test.go Normal file
View File

@@ -0,0 +1,197 @@
// Copyright (c) 2015-present Mattermost, Inc. All Rights Reserved.
// See LICENSE.txt for license information.
package mlog
import (
"bytes"
"errors"
"fmt"
"io"
"net"
"sync"
"testing"
"time"
"github.com/stretchr/testify/require"
"github.com/wiggin77/merror"
)
const (
testPort = 18066
)
func TestNewTcpTarget(t *testing.T) {
target := LogTarget{
Type: "tcp",
Format: "json",
Levels: []LogLevel{LvlInfo},
Options: []byte(`{"IP": "localhost", "Port": 18066}`),
MaxQueueSize: 1000,
}
targets := map[string]*LogTarget{"tcp_test": &target}
t.Run("logging", func(t *testing.T) {
buf := &buffer{}
server, err := newSocketServer(testPort, buf)
require.NoError(t, err)
data := []string{"I drink your milkshake!", "We don't need no badges!", "You can't fight in here! This is the war room!"}
logr, err := newLogr(targets)
require.NoError(t, err)
for _, s := range data {
logr.Info(s)
}
err = logr.Logr().Flush()
require.NoError(t, err)
err = logr.Logr().Shutdown()
require.NoError(t, err)
err = server.waitForAnyConnection()
require.NoError(t, err)
err = server.stopServer(true)
require.NoError(t, err)
sdata := buf.String()
for _, s := range data {
require.Contains(t, sdata, s)
}
})
}
// socketServer is a simple socket server used for testing TCP log targets.
// Note: There is more synchronization here than normally needed to avoid flaky tests.
// For example, it's possible for a unit test to create a socketServer, attempt
// writing to it, and stop the socket server before "go ss.listen()" gets scheduled.
type socketServer struct {
listener net.Listener
anyConn chan struct{}
buf *buffer
conns map[string]*socketServerConn
mux sync.Mutex
}
type socketServerConn struct {
raddy string
conn net.Conn
done chan struct{}
}
func newSocketServer(port int, buf *buffer) (*socketServer, error) {
ss := &socketServer{
buf: buf,
conns: make(map[string]*socketServerConn),
anyConn: make(chan struct{}),
}
addy := fmt.Sprintf(":%d", port)
l, err := net.Listen("tcp4", addy)
if err != nil {
return nil, err
}
ss.listener = l
go ss.listen()
return ss, nil
}
func (ss *socketServer) listen() {
for {
conn, err := ss.listener.Accept()
if err != nil {
return
}
sconn := &socketServerConn{raddy: conn.RemoteAddr().String(), conn: conn, done: make(chan struct{})}
ss.registerConnection(sconn)
go ss.handleConnection(sconn)
}
}
func (ss *socketServer) waitForAnyConnection() error {
var err error
select {
case <-ss.anyConn:
case <-time.After(5 * time.Second):
err = errors.New("wait for any connection timed out")
}
return err
}
func (ss *socketServer) handleConnection(sconn *socketServerConn) {
close(ss.anyConn)
defer ss.unregisterConnection(sconn)
buf := make([]byte, 1024)
for {
n, err := sconn.conn.Read(buf)
if n > 0 {
ss.buf.Write(buf[:n])
}
if err == io.EOF {
ss.signalDone(sconn)
return
}
}
}
func (ss *socketServer) registerConnection(sconn *socketServerConn) {
ss.mux.Lock()
defer ss.mux.Unlock()
ss.conns[sconn.raddy] = sconn
}
func (ss *socketServer) unregisterConnection(sconn *socketServerConn) {
ss.mux.Lock()
defer ss.mux.Unlock()
delete(ss.conns, sconn.raddy)
}
func (ss *socketServer) signalDone(sconn *socketServerConn) {
ss.mux.Lock()
defer ss.mux.Unlock()
close(sconn.done)
}
func (ss *socketServer) stopServer(wait bool) error {
errs := merror.New()
ss.listener.Close()
ss.mux.Lock()
// defensive copy; no more connections can be accepted so copy will stay current.
conns := make(map[string]*socketServerConn, len(ss.conns))
for k, v := range ss.conns {
conns[k] = v
}
ss.mux.Unlock()
for _, sconn := range conns {
if wait {
select {
case <-sconn.done:
case <-time.After(time.Second * 5):
errs.Append(errors.New("timed out"))
}
}
}
return errs.ErrorOrNil()
}
type buffer struct {
buf bytes.Buffer
mux sync.Mutex
}
func (b *buffer) Write(p []byte) (n int, err error) {
b.mux.Lock()
defer b.mux.Unlock()
return b.buf.Write(p)
}
func (b *buffer) String() string {
b.mux.Lock()
defer b.mux.Unlock()
return b.buf.String()
}

View File

@@ -1062,6 +1062,7 @@ type LogSettings struct {
EnableWebhookDebugging *bool `restricted:"true"`
EnableDiagnostics *bool `restricted:"true"`
EnableSentry *bool `restricted:"true"`
AdvancedLoggingConfig *string `restricted:"true"`
}
func (s *LogSettings) SetDefaults() {
@@ -1104,6 +1105,10 @@ func (s *LogSettings) SetDefaults() {
if s.FileJson == nil {
s.FileJson = NewBool(true)
}
if s.AdvancedLoggingConfig == nil {
s.AdvancedLoggingConfig = NewString("")
}
}
type ExperimentalAuditSettings struct {

View File

@@ -81,6 +81,7 @@ type Features struct {
IDLoadedPushNotifications *bool `json:"id_loaded"`
LockTeammateNameDisplay *bool `json:"lock_teammate_name_display"`
EnterprisePlugins *bool `json:"enterprise_plugins"`
AdvancedLogging *bool `json:"advanced_logging"`
// after we enabled more features we'll need to control them with this
FutureFeatures *bool `json:"future_features"`
@@ -108,6 +109,7 @@ func (f *Features) ToMap() map[string]interface{} {
"id_loaded": *f.IDLoadedPushNotifications,
"lock_teammate_name_display": *f.LockTeammateNameDisplay,
"enterprise_plugins": *f.EnterprisePlugins,
"advanced_logging": *f.AdvancedLogging,
"future": *f.FutureFeatures,
}
}
@@ -212,6 +214,10 @@ func (f *Features) SetDefaults() {
if f.EnterprisePlugins == nil {
f.EnterprisePlugins = NewBool(*f.FutureFeatures)
}
if f.AdvancedLogging == nil {
f.AdvancedLogging = NewBool(*f.FutureFeatures)
}
}
func (l *License) IsExpired() bool {

View File

@@ -1,8 +1,7 @@
# logr
[![GoDoc](https://godoc.org/github.com/wiggin77/logr?status.svg)](http://godoc.org/github.com/wiggin77/logr)
[![Build Status](https://api.travis-ci.com/wiggin77/logr.svg?branch=master)](https://travis-ci.com/wiggin77/logr)
[![Report Card](https://goreportcard.com/badge/github.com/wiggin77/logr)](https://goreportcard.com/report/github.com/wiggin77/logr)
[![GoDoc](https://godoc.org/github.com/mattermost/logr?status.svg)](http://godoc.org/github.com/mattermost/logr)
[![Report Card](https://goreportcard.com/badge/github.com/mattermost/logr)](https://goreportcard.com/report/github.com/mattermost/logr)
Logr is a fully asynchronous, contextual logger for Go.

View File

@@ -9,7 +9,7 @@ import (
"time"
"github.com/francoispqt/gojay"
"github.com/wiggin77/logr"
"github.com/mattermost/logr"
)
// ContextField is a name/value pair within the context fields.

View File

@@ -4,7 +4,7 @@ import (
"bytes"
"fmt"
"github.com/wiggin77/logr"
"github.com/mattermost/logr"
)
// Plain is the simplest formatter, outputting only text with

View File

@@ -1,4 +1,4 @@
module github.com/wiggin77/logr
module github.com/mattermost/logr
go 1.12

View File

@@ -195,7 +195,7 @@ func (logr *Logr) IsLevelEnabled(lvl Level) LevelStatus {
// Cache and return the result.
if err := logr.lvlCache.put(lvl.ID, status); err != nil {
logr.OnLoggerError(err)
logr.ReportError(err)
return LevelStatus{}
}
return status

View File

@@ -4,7 +4,7 @@ import (
"context"
"io"
"github.com/wiggin77/logr"
"github.com/mattermost/logr"
"github.com/wiggin77/merror"
"gopkg.in/natefinch/lumberjack.v2"
)

View File

@@ -7,7 +7,7 @@ import (
"fmt"
"log/syslog"
"github.com/wiggin77/logr"
"github.com/mattermost/logr"
"github.com/wiggin77/merror"
)

View File

@@ -4,7 +4,7 @@ import (
"io"
"io/ioutil"
"github.com/wiggin77/logr"
"github.com/mattermost/logr"
)
// Writer outputs log records to any `io.Writer`.

10
vendor/modules.txt vendored
View File

@@ -283,6 +283,11 @@ github.com/mattermost/gosaml2/uuid
# github.com/mattermost/ldap v0.0.0-20191128190019-9f62ba4b8d4d
## explicit
github.com/mattermost/ldap
# github.com/mattermost/logr v1.0.5
## explicit
github.com/mattermost/logr
github.com/mattermost/logr/format
github.com/mattermost/logr/target
# github.com/mattermost/rsc v0.0.0-20160330161541-bbaefb05eaa0
## explicit
github.com/mattermost/rsc/gf256
@@ -496,11 +501,6 @@ github.com/vmihailenco/tagparser/internal/parser
github.com/wiggin77/cfg
github.com/wiggin77/cfg/ini
github.com/wiggin77/cfg/timeconv
# github.com/wiggin77/logr v1.0.4
## explicit
github.com/wiggin77/logr
github.com/wiggin77/logr/format
github.com/wiggin77/logr/target
# github.com/wiggin77/merror v1.0.2
## explicit
github.com/wiggin77/merror