grafana/pkg/services/sqlstore/sqlstore.go
Marcus Efraimsson d433ca7d40
fix util for splitting host and port
Now you can provide both a default host and a default port
2019-02-04 13:10:32 +01:00

432 lines
11 KiB
Go

package sqlstore
import (
"context"
"fmt"
"net/url"
"os"
"path"
"path/filepath"
"strings"
"testing"
"time"
"github.com/grafana/grafana/pkg/bus"
"github.com/grafana/grafana/pkg/log"
m "github.com/grafana/grafana/pkg/models"
"github.com/grafana/grafana/pkg/registry"
"github.com/grafana/grafana/pkg/services/annotations"
"github.com/grafana/grafana/pkg/services/cache"
"github.com/grafana/grafana/pkg/services/sqlstore/migrations"
"github.com/grafana/grafana/pkg/services/sqlstore/migrator"
"github.com/grafana/grafana/pkg/services/sqlstore/sqlutil"
"github.com/grafana/grafana/pkg/setting"
"github.com/grafana/grafana/pkg/util"
"github.com/go-sql-driver/mysql"
"github.com/go-xorm/xorm"
_ "github.com/grafana/grafana/pkg/tsdb/mssql"
_ "github.com/lib/pq"
sqlite3 "github.com/mattn/go-sqlite3"
)
var (
x *xorm.Engine
dialect migrator.Dialect
sqlog log.Logger = log.New("sqlstore")
)
const ContextSessionName = "db-session"
func init() {
registry.Register(&registry.Descriptor{
Name: "SqlStore",
Instance: &SqlStore{},
InitPriority: registry.High,
})
}
type SqlStore struct {
Cfg *setting.Cfg `inject:""`
Bus bus.Bus `inject:""`
CacheService *cache.CacheService `inject:""`
dbCfg DatabaseConfig
engine *xorm.Engine
log log.Logger
Dialect migrator.Dialect
skipEnsureAdmin bool
}
// NewSession returns a new DBSession
func (ss *SqlStore) NewSession() *DBSession {
return &DBSession{Session: ss.engine.NewSession()}
}
// WithDbSession calls the callback with an session attached to the context.
func (ss *SqlStore) WithDbSession(ctx context.Context, callback dbTransactionFunc) error {
sess, err := startSession(ctx, ss.engine, false)
if err != nil {
return err
}
return callback(sess)
}
// WithTransactionalDbSession calls the callback with an session within a transaction
func (ss *SqlStore) WithTransactionalDbSession(ctx context.Context, callback dbTransactionFunc) error {
return ss.inTransactionWithRetryCtx(ctx, callback, 0)
}
func (ss *SqlStore) inTransactionWithRetryCtx(ctx context.Context, callback dbTransactionFunc, retry int) error {
sess, err := startSession(ctx, ss.engine, true)
if err != nil {
return err
}
defer sess.Close()
err = callback(sess)
// special handling of database locked errors for sqlite, then we can retry 3 times
if sqlError, ok := err.(sqlite3.Error); ok && retry < 5 {
if sqlError.Code == sqlite3.ErrLocked {
sess.Rollback()
time.Sleep(time.Millisecond * time.Duration(10))
sqlog.Info("Database table locked, sleeping then retrying", "retry", retry)
return ss.inTransactionWithRetryCtx(ctx, callback, retry+1)
}
}
if err != nil {
sess.Rollback()
return err
} else if err = sess.Commit(); err != nil {
return err
}
if len(sess.events) > 0 {
for _, e := range sess.events {
if err = bus.Publish(e); err != nil {
log.Error(3, "Failed to publish event after commit. error: %v", err)
}
}
}
return nil
}
func (ss *SqlStore) Init() error {
ss.log = log.New("sqlstore")
ss.readConfig()
engine, err := ss.getEngine()
if err != nil {
return fmt.Errorf("Fail to connect to database: %v", err)
}
ss.engine = engine
ss.Dialect = migrator.NewDialect(ss.engine)
// temporarily still set global var
x = engine
dialect = ss.Dialect
migrator := migrator.NewMigrator(x)
migrations.AddMigrations(migrator)
for _, descriptor := range registry.GetServices() {
sc, ok := descriptor.Instance.(registry.DatabaseMigrator)
if ok {
sc.AddMigration(migrator)
}
}
if err := migrator.Start(); err != nil {
return fmt.Errorf("Migration failed err: %v", err)
}
// Init repo instances
annotations.SetRepository(&SqlAnnotationRepo{})
ss.Bus.SetTransactionManager(ss)
// Register handlers
ss.addUserQueryAndCommandHandlers()
// ensure admin user
if ss.skipEnsureAdmin {
return nil
}
return ss.ensureAdminUser()
}
func (ss *SqlStore) ensureAdminUser() error {
systemUserCountQuery := m.GetSystemUserCountStatsQuery{}
err := ss.InTransaction(context.Background(), func(ctx context.Context) error {
err := bus.DispatchCtx(ctx, &systemUserCountQuery)
if err != nil {
return fmt.Errorf("Could not determine if admin user exists: %v", err)
}
if systemUserCountQuery.Result.Count > 0 {
return nil
}
cmd := m.CreateUserCommand{}
cmd.Login = setting.AdminUser
cmd.Email = setting.AdminUser + "@localhost"
cmd.Password = setting.AdminPassword
cmd.IsAdmin = true
if err := bus.DispatchCtx(ctx, &cmd); err != nil {
return fmt.Errorf("Failed to create admin user: %v", err)
}
ss.log.Info("Created default admin", "user", setting.AdminUser)
return nil
})
return err
}
func (ss *SqlStore) buildExtraConnectionString(sep rune) string {
if ss.dbCfg.UrlQueryParams == nil {
return ""
}
var sb strings.Builder
for key, values := range ss.dbCfg.UrlQueryParams {
for _, value := range values {
sb.WriteRune(sep)
sb.WriteString(key)
sb.WriteRune('=')
sb.WriteString(value)
}
}
return sb.String()
}
func (ss *SqlStore) buildConnectionString() (string, error) {
cnnstr := ss.dbCfg.ConnectionString
// special case used by integration tests
if cnnstr != "" {
return cnnstr, nil
}
switch ss.dbCfg.Type {
case migrator.MYSQL:
protocol := "tcp"
if strings.HasPrefix(ss.dbCfg.Host, "/") {
protocol = "unix"
}
cnnstr = fmt.Sprintf("%s:%s@%s(%s)/%s?collation=utf8mb4_unicode_ci&allowNativePasswords=true",
ss.dbCfg.User, ss.dbCfg.Pwd, protocol, ss.dbCfg.Host, ss.dbCfg.Name)
if ss.dbCfg.SslMode == "true" || ss.dbCfg.SslMode == "skip-verify" {
tlsCert, err := makeCert("custom", ss.dbCfg)
if err != nil {
return "", err
}
mysql.RegisterTLSConfig("custom", tlsCert)
cnnstr += "&tls=custom"
}
cnnstr += ss.buildExtraConnectionString('&')
case migrator.POSTGRES:
host, port := util.SplitHostPortDefault(ss.dbCfg.Host, "127.0.0.1", "5432")
if ss.dbCfg.Pwd == "" {
ss.dbCfg.Pwd = "''"
}
if ss.dbCfg.User == "" {
ss.dbCfg.User = "''"
}
cnnstr = fmt.Sprintf("user=%s password=%s host=%s port=%s dbname=%s sslmode=%s sslcert=%s sslkey=%s sslrootcert=%s", ss.dbCfg.User, ss.dbCfg.Pwd, host, port, ss.dbCfg.Name, ss.dbCfg.SslMode, ss.dbCfg.ClientCertPath, ss.dbCfg.ClientKeyPath, ss.dbCfg.CaCertPath)
cnnstr += ss.buildExtraConnectionString(' ')
case migrator.SQLITE:
// special case for tests
if !filepath.IsAbs(ss.dbCfg.Path) {
ss.dbCfg.Path = filepath.Join(ss.Cfg.DataPath, ss.dbCfg.Path)
}
os.MkdirAll(path.Dir(ss.dbCfg.Path), os.ModePerm)
cnnstr = fmt.Sprintf("file:%s?cache=%s&mode=rwc", ss.dbCfg.Path, ss.dbCfg.CacheMode)
cnnstr += ss.buildExtraConnectionString('&')
default:
return "", fmt.Errorf("Unknown database type: %s", ss.dbCfg.Type)
}
return cnnstr, nil
}
func (ss *SqlStore) getEngine() (*xorm.Engine, error) {
connectionString, err := ss.buildConnectionString()
if err != nil {
return nil, err
}
sqlog.Info("Connecting to DB", "dbtype", ss.dbCfg.Type)
engine, err := xorm.NewEngine(ss.dbCfg.Type, connectionString)
if err != nil {
return nil, err
}
engine.SetMaxOpenConns(ss.dbCfg.MaxOpenConn)
engine.SetMaxIdleConns(ss.dbCfg.MaxIdleConn)
engine.SetConnMaxLifetime(time.Second * time.Duration(ss.dbCfg.ConnMaxLifetime))
// configure sql logging
debugSql := ss.Cfg.Raw.Section("database").Key("log_queries").MustBool(false)
if !debugSql {
engine.SetLogger(&xorm.DiscardLogger{})
} else {
engine.SetLogger(NewXormLogger(log.LvlInfo, log.New("sqlstore.xorm")))
engine.ShowSQL(true)
engine.ShowExecTime(true)
}
return engine, nil
}
func (ss *SqlStore) readConfig() {
sec := ss.Cfg.Raw.Section("database")
cfgURL := sec.Key("url").String()
if len(cfgURL) != 0 {
dbURL, _ := url.Parse(cfgURL)
ss.dbCfg.Type = dbURL.Scheme
ss.dbCfg.Host = dbURL.Host
pathSplit := strings.Split(dbURL.Path, "/")
if len(pathSplit) > 1 {
ss.dbCfg.Name = pathSplit[1]
}
userInfo := dbURL.User
if userInfo != nil {
ss.dbCfg.User = userInfo.Username()
ss.dbCfg.Pwd, _ = userInfo.Password()
}
ss.dbCfg.UrlQueryParams = dbURL.Query()
} else {
ss.dbCfg.Type = sec.Key("type").String()
ss.dbCfg.Host = sec.Key("host").String()
ss.dbCfg.Name = sec.Key("name").String()
ss.dbCfg.User = sec.Key("user").String()
ss.dbCfg.ConnectionString = sec.Key("connection_string").String()
ss.dbCfg.Pwd = sec.Key("password").String()
}
ss.dbCfg.MaxOpenConn = sec.Key("max_open_conn").MustInt(0)
ss.dbCfg.MaxIdleConn = sec.Key("max_idle_conn").MustInt(2)
ss.dbCfg.ConnMaxLifetime = sec.Key("conn_max_lifetime").MustInt(14400)
ss.dbCfg.SslMode = sec.Key("ssl_mode").String()
ss.dbCfg.CaCertPath = sec.Key("ca_cert_path").String()
ss.dbCfg.ClientKeyPath = sec.Key("client_key_path").String()
ss.dbCfg.ClientCertPath = sec.Key("client_cert_path").String()
ss.dbCfg.ServerCertName = sec.Key("server_cert_name").String()
ss.dbCfg.Path = sec.Key("path").MustString("data/grafana.db")
ss.dbCfg.CacheMode = sec.Key("cache_mode").MustString("private")
}
func InitTestDB(t *testing.T) *SqlStore {
t.Helper()
sqlstore := &SqlStore{}
sqlstore.skipEnsureAdmin = true
sqlstore.Bus = bus.New()
sqlstore.CacheService = cache.New(5*time.Minute, 10*time.Minute)
dbType := migrator.SQLITE
// environment variable present for test db?
if db, present := os.LookupEnv("GRAFANA_TEST_DB"); present {
dbType = db
}
// set test db config
sqlstore.Cfg = setting.NewCfg()
sec, _ := sqlstore.Cfg.Raw.NewSection("database")
sec.NewKey("type", dbType)
switch dbType {
case "mysql":
sec.NewKey("connection_string", sqlutil.TestDB_Mysql.ConnStr)
case "postgres":
sec.NewKey("connection_string", sqlutil.TestDB_Postgres.ConnStr)
default:
sec.NewKey("connection_string", sqlutil.TestDB_Sqlite3.ConnStr)
}
// need to get engine to clean db before we init
engine, err := xorm.NewEngine(dbType, sec.Key("connection_string").String())
if err != nil {
t.Fatalf("Failed to init test database: %v", err)
}
sqlstore.Dialect = migrator.NewDialect(engine)
// temp global var until we get rid of global vars
dialect = sqlstore.Dialect
if err := dialect.CleanDB(); err != nil {
t.Fatalf("Failed to clean test db %v", err)
}
if err := sqlstore.Init(); err != nil {
t.Fatalf("Failed to init test database: %v", err)
}
sqlstore.engine.DatabaseTZ = time.UTC
sqlstore.engine.TZLocation = time.UTC
return sqlstore
}
func IsTestDbMySql() bool {
if db, present := os.LookupEnv("GRAFANA_TEST_DB"); present {
return db == migrator.MYSQL
}
return false
}
func IsTestDbPostgres() bool {
if db, present := os.LookupEnv("GRAFANA_TEST_DB"); present {
return db == migrator.POSTGRES
}
return false
}
type DatabaseConfig struct {
Type string
Host string
Name string
User string
Pwd string
Path string
SslMode string
CaCertPath string
ClientKeyPath string
ClientCertPath string
ServerCertName string
ConnectionString string
MaxOpenConn int
MaxIdleConn int
ConnMaxLifetime int
CacheMode string
UrlQueryParams map[string][]string
}