mirror of
https://github.com/mattermost/mattermost.git
synced 2025-02-25 18:55:24 -06:00
Adds Advanced Logging to server. Advanced Logging is an optional logging capability that allows customers to send log records to any number of destinations. Supported destinations: - file - syslog (with or without TLS) - raw TCP socket (with or without TLS). Allows developers to specify discrete log levels as well as the standard trace, debug, info, ... panic. Existing code and logging API usage is unchanged. Log records are emitted asynchronously to reduce latency to the caller. Supports hot-reloading of logger config, including adding and removing targets. Advanced Logging is configured within config.json via "LogSettings.AdvancedLoggingConfig", which can contain a filespec to another config file, a database DSN, or JSON.
275 lines
6.9 KiB
Go
275 lines
6.9 KiB
Go
// Copyright (c) 2015-present Mattermost, Inc. All Rights Reserved.
|
|
// See LICENSE.txt for license information.
|
|
|
|
package mlog
|
|
|
|
import (
|
|
"context"
|
|
"crypto/tls"
|
|
"errors"
|
|
"fmt"
|
|
"net"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/hashicorp/go-multierror"
|
|
"github.com/mattermost/logr"
|
|
|
|
_ "net/http/pprof"
|
|
)
|
|
|
|
// Timeouts and retry backoff bounds used by the TCP log target.
const (
	// DialTimeoutSecs is the number of seconds allowed for dialing the
	// remote socket before the attempt is abandoned.
	DialTimeoutSecs = 30

	// WriteTimeoutSecs is the number of seconds used for the write deadline
	// of each log record written to the socket.
	WriteTimeoutSecs = 30

	// RetryBackoffMillis is the initial delay, in milliseconds, between
	// retries after a failed connection or write attempt.
	RetryBackoffMillis int64 = 100

	// MaxRetryBackoffMillis is the upper bound, in milliseconds, that the
	// retry delay is allowed to grow to (30 seconds).
	MaxRetryBackoffMillis int64 = 30000
)
|
|
|
|
// Tcp outputs log records to raw socket server.
|
|
type Tcp struct {
|
|
logr.Basic
|
|
|
|
params *TcpParams
|
|
addy string
|
|
|
|
mutex sync.Mutex
|
|
conn net.Conn
|
|
monitor chan struct{}
|
|
shutdown chan struct{}
|
|
}
|
|
|
|
// TcpParams provides parameters for dialing a socket server.
type TcpParams struct {
	// IP is the host to dial; it is also used as the TLS ServerName.
	IP string `json:"IP"`
	// Port is the TCP port to dial.
	Port int `json:"Port"`
	// TLS, when true, performs a TLS handshake after connecting.
	TLS bool `json:"TLS"`
	// Cert, when non-empty, is passed to getCertPool to build the root CA
	// pool used for the TLS handshake.
	Cert string `json:"Cert"`
	// Insecure is copied to tls.Config.InsecureSkipVerify.
	Insecure bool `json:"Insecure"`
}
|
|
|
|
// NewTcpTarget creates a target capable of outputting log records to a raw socket, with or without TLS.
|
|
func NewTcpTarget(filter logr.Filter, formatter logr.Formatter, params *TcpParams, maxQueue int) (*Tcp, error) {
|
|
tcp := &Tcp{
|
|
params: params,
|
|
addy: fmt.Sprintf("%s:%d", params.IP, params.Port),
|
|
monitor: make(chan struct{}),
|
|
shutdown: make(chan struct{}),
|
|
}
|
|
tcp.Basic.Start(tcp, tcp, filter, formatter, maxQueue)
|
|
|
|
return tcp, nil
|
|
}
|
|
|
|
// getConn provides a net.Conn. If a connection already exists, it is returned immediately,
|
|
// otherwise this method blocks until a new connection is created, timeout or shutdown.
|
|
func (tcp *Tcp) getConn() (net.Conn, error) {
|
|
tcp.mutex.Lock()
|
|
defer tcp.mutex.Unlock()
|
|
|
|
Log(LvlTcpLogTarget, "getConn enter", String("addy", tcp.addy))
|
|
defer Log(LvlTcpLogTarget, "getConn exit", String("addy", tcp.addy))
|
|
|
|
if tcp.conn != nil {
|
|
Log(LvlTcpLogTarget, "reusing existing conn", String("addy", tcp.addy)) // use "With" once Zap is removed
|
|
return tcp.conn, nil
|
|
}
|
|
|
|
type result struct {
|
|
conn net.Conn
|
|
err error
|
|
}
|
|
|
|
connChan := make(chan result)
|
|
ctx, cancel := context.WithTimeout(context.Background(), time.Second*DialTimeoutSecs)
|
|
defer cancel()
|
|
|
|
go func(ctx context.Context, ch chan result) {
|
|
Log(LvlTcpLogTarget, "dailing", String("addy", tcp.addy))
|
|
conn, err := tcp.dial(ctx)
|
|
if err == nil {
|
|
tcp.conn = conn
|
|
tcp.monitor = make(chan struct{})
|
|
go monitor(tcp.conn, tcp.monitor)
|
|
}
|
|
connChan <- result{conn: conn, err: err}
|
|
}(ctx, connChan)
|
|
|
|
select {
|
|
case <-tcp.shutdown:
|
|
return nil, errors.New("shutdown")
|
|
case res := <-connChan:
|
|
return res.conn, res.err
|
|
}
|
|
}
|
|
|
|
// dial connects to a TCP socket, and optionally performs a TLS handshake.
|
|
// A non-nil context must be provided which can cancel the dial.
|
|
func (tcp *Tcp) dial(ctx context.Context) (net.Conn, error) {
|
|
var dialer net.Dialer
|
|
dialer.Timeout = time.Second * DialTimeoutSecs
|
|
conn, err := dialer.DialContext(ctx, "tcp", fmt.Sprintf("%s:%d", tcp.params.IP, tcp.params.Port))
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if !tcp.params.TLS {
|
|
return conn, nil
|
|
}
|
|
|
|
Log(LvlTcpLogTarget, "TLS handshake", String("addy", tcp.addy))
|
|
|
|
tlsconfig := &tls.Config{
|
|
ServerName: tcp.params.IP,
|
|
InsecureSkipVerify: tcp.params.Insecure,
|
|
}
|
|
if tcp.params.Cert != "" {
|
|
pool, err := getCertPool(tcp.params.Cert)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
tlsconfig.RootCAs = pool
|
|
}
|
|
|
|
tlsConn := tls.Client(conn, tlsconfig)
|
|
if err := tlsConn.Handshake(); err != nil {
|
|
return nil, err
|
|
}
|
|
return tlsConn, nil
|
|
}
|
|
|
|
func (tcp *Tcp) close() error {
|
|
tcp.mutex.Lock()
|
|
defer tcp.mutex.Unlock()
|
|
|
|
var err error
|
|
if tcp.conn != nil {
|
|
Log(LvlTcpLogTarget, "closing connection", String("addy", tcp.addy))
|
|
close(tcp.monitor)
|
|
err = tcp.conn.Close()
|
|
tcp.conn = nil
|
|
}
|
|
return err
|
|
}
|
|
|
|
// Shutdown stops processing log records after making best effort to flush queue.
|
|
func (tcp *Tcp) Shutdown(ctx context.Context) error {
|
|
errs := &multierror.Error{}
|
|
|
|
Log(LvlTcpLogTarget, "shutting down", String("addy", tcp.addy))
|
|
|
|
if err := tcp.Basic.Shutdown(ctx); err != nil {
|
|
errs = multierror.Append(errs, err)
|
|
}
|
|
|
|
if err := tcp.close(); err != nil {
|
|
errs = multierror.Append(errs, err)
|
|
}
|
|
|
|
close(tcp.shutdown)
|
|
return errs.ErrorOrNil()
|
|
}
|
|
|
|
// Write converts the log record to bytes, via the Formatter, and outputs to the socket.
|
|
// Called by dedicated target goroutine and will block until success or shutdown.
|
|
func (tcp *Tcp) Write(rec *logr.LogRec) error {
|
|
_, stacktrace := tcp.IsLevelEnabled(rec.Level())
|
|
|
|
buf := rec.Logger().Logr().BorrowBuffer()
|
|
defer rec.Logger().Logr().ReleaseBuffer(buf)
|
|
|
|
buf, err := tcp.Formatter().Format(rec, stacktrace, buf)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
try := 1
|
|
backoff := RetryBackoffMillis
|
|
for {
|
|
select {
|
|
case <-tcp.shutdown:
|
|
return err
|
|
default:
|
|
}
|
|
|
|
conn, err := tcp.getConn()
|
|
if err != nil {
|
|
Log(LvlTcpLogTarget, "failed getting connection", String("addy", tcp.addy), Err(err))
|
|
reporter := rec.Logger().Logr().ReportError
|
|
reporter(fmt.Errorf("log target %s connection error: %w", tcp.String(), err))
|
|
backoff = tcp.sleep(backoff)
|
|
continue
|
|
}
|
|
|
|
conn.SetWriteDeadline(time.Now().Add(time.Second * WriteTimeoutSecs))
|
|
_, err = buf.WriteTo(conn)
|
|
if err == nil {
|
|
return nil
|
|
}
|
|
|
|
Log(LvlTcpLogTarget, "write error", String("addy", tcp.addy), Err(err))
|
|
reporter := rec.Logger().Logr().ReportError
|
|
reporter(fmt.Errorf("log target %s write error: %w", tcp.String(), err))
|
|
|
|
_ = tcp.close()
|
|
|
|
backoff = tcp.sleep(backoff)
|
|
try++
|
|
Log(LvlTcpLogTarget, "retrying write", String("addy", tcp.addy), Int("try", try))
|
|
}
|
|
}
|
|
|
|
// monitor continuously tries to read from the connection to detect socket close.
|
|
// This is needed because TCP target uses a write only socket and Linux systems
|
|
// take a long time to detect a loss of connectivity on a socket when only writing;
|
|
// the writes simply fail without an error returned.
|
|
func monitor(conn net.Conn, done <-chan struct{}) {
|
|
addy := conn.RemoteAddr().String()
|
|
defer Log(LvlTcpLogTarget, "monitor exiting", String("addy", addy))
|
|
|
|
buf := make([]byte, 1)
|
|
for {
|
|
Log(LvlTcpLogTarget, "monitor loop", String("addy", addy))
|
|
|
|
select {
|
|
case <-done:
|
|
return
|
|
case <-time.After(1 * time.Second):
|
|
}
|
|
|
|
err := conn.SetReadDeadline(time.Now().Add(time.Second * 30))
|
|
if err != nil {
|
|
continue
|
|
}
|
|
|
|
_, err = conn.Read(buf)
|
|
|
|
if errt, ok := err.(net.Error); ok && errt.Timeout() {
|
|
// read timeout is expected, keep looping.
|
|
continue
|
|
}
|
|
|
|
// Any other error closes the connection, forcing a reconnect.
|
|
Log(LvlTcpLogTarget, "monitor closing connection", Err(err))
|
|
conn.Close()
|
|
return
|
|
}
|
|
}
|
|
|
|
// String returns a string representation of this target.
|
|
func (tcp *Tcp) String() string {
|
|
return fmt.Sprintf("TcpTarget[%s:%d]", tcp.params.IP, tcp.params.Port)
|
|
}
|
|
|
|
func (tcp *Tcp) sleep(backoff int64) int64 {
|
|
select {
|
|
case <-tcp.shutdown:
|
|
case <-time.After(time.Millisecond * time.Duration(backoff)):
|
|
}
|
|
|
|
nextBackoff := backoff + (backoff >> 1)
|
|
if nextBackoff > MaxRetryBackoffMillis {
|
|
nextBackoff = MaxRetryBackoffMillis
|
|
}
|
|
return nextBackoff
|
|
}
|