sql: improve sqleng-api, leave sql.DB creation to the plugins (#79672)
This commit is contained in:
parent 8923cc27ce
commit bfb85f27b1
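The commit moves sql.DB construction out of sqleng and into each SQL datasource plugin: the plugin opens the connection pool, applies the MaxOpenConns/MaxIdleConns/ConnMaxLifetime settings from its JsonData, and passes the resulting *sql.DB to sqleng.NewQueryDataHandler, whose signature gains a db *sql.DB parameter. Below is a minimal, self-contained sketch of that plugin-side pattern; the helper name openConfiguredDB, the lib/pq driver import, and the connectivity ping are illustrative assumptions, not code from this change.

package plugindb

import (
    "context"
    "database/sql"
    "time"

    // Driver registration is now the plugin's responsibility; lib/pq is only
    // an example here — any database/sql driver works the same way.
    _ "github.com/lib/pq"
)

// openConfiguredDB is a hypothetical helper (not part of the commit) that
// condenses the pattern each datasource now follows: open the pool itself,
// apply the per-datasource limits from JsonData, and hand the ready *sql.DB
// to sqleng.NewQueryDataHandler instead of letting sqleng call sql.Open.
func openConfiguredDB(driverName, connStr string, maxOpen, maxIdle, lifetimeSeconds int) (*sql.DB, error) {
    db, err := sql.Open(driverName, connStr)
    if err != nil {
        return nil, err
    }

    db.SetMaxOpenConns(maxOpen)
    db.SetMaxIdleConns(maxIdle)
    db.SetConnMaxLifetime(time.Duration(lifetimeSeconds) * time.Second)

    // sql.Open does not dial; an explicit ping (not part of this commit)
    // surfaces bad connection strings early.
    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    defer cancel()
    if err := db.PingContext(ctx); err != nil {
        _ = db.Close()
        return nil, err
    }

    return db, nil
}

Keeping pool construction in the plugin lets each datasource pick its own driver and pool settings, and lets tests inject any *sql.DB directly instead of overriding the package-level sqleng.NewDB hook that this commit removes.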
@@ -2,11 +2,13 @@ package postgres

import (
    "context"
    "database/sql"
    "encoding/json"
    "fmt"
    "reflect"
    "strconv"
    "strings"
    "time"

    "github.com/grafana/grafana-plugin-sdk-go/backend"
    "github.com/grafana/grafana-plugin-sdk-go/backend/datasource"

@@ -108,8 +110,6 @@ func (s *Service) newInstanceSettings(cfg *setting.Cfg) datasource.InstanceFacto
    }

    config := sqleng.DataPluginConfiguration{
        DriverName:        driverName,
        ConnectionString:  cnnstr,
        DSInfo:            dsInfo,
        MetricColumnTypes: []string{"UNKNOWN", "TEXT", "VARCHAR", "CHAR"},
        RowLimit:          cfg.DataProxyRowLimit,

@@ -117,7 +117,16 @@ func (s *Service) newInstanceSettings(cfg *setting.Cfg) datasource.InstanceFacto

    queryResultTransformer := postgresQueryResultTransformer{}

    handler, err := sqleng.NewQueryDataHandler(cfg, config, &queryResultTransformer, newPostgresMacroEngine(dsInfo.JsonData.Timescaledb),
    db, err := sql.Open(driverName, cnnstr)
    if err != nil {
        return nil, err
    }

    db.SetMaxOpenConns(config.DSInfo.JsonData.MaxOpenConns)
    db.SetMaxIdleConns(config.DSInfo.JsonData.MaxIdleConns)
    db.SetConnMaxLifetime(time.Duration(config.DSInfo.JsonData.ConnMaxLifetime) * time.Second)

    handler, err := sqleng.NewQueryDataHandler(cfg, db, config, &queryResultTransformer, newPostgresMacroEngine(dsInfo.JsonData.Timescaledb),
        logger)
    if err != nil {
        logger.Error("Failed connecting to Postgres", "err", err)
@@ -184,17 +184,10 @@ func TestIntegrationPostgres(t *testing.T) {
        t.Skip()
    }

    x := InitPostgresTestDB(t)

    origDB := sqleng.NewDB
    origInterpolate := sqleng.Interpolate
    t.Cleanup(func() {
        sqleng.NewDB = origDB
        sqleng.Interpolate = origInterpolate
    })
    sqleng.NewDB = func(d, c string) (*sql.DB, error) {
        return x, nil
    }
    sqleng.Interpolate = func(query backend.DataQuery, timeRange backend.TimeRange, timeInterval string, sql string) (string, error) {
        return sql, nil
    }

@@ -216,8 +209,6 @@ func TestIntegrationPostgres(t *testing.T) {
    }

    config := sqleng.DataPluginConfiguration{
        DriverName:        "postgres",
        ConnectionString:  "",
        DSInfo:            dsInfo,
        MetricColumnTypes: []string{"UNKNOWN", "TEXT", "VARCHAR", "CHAR"},
        RowLimit:          1000000,

@@ -226,12 +217,14 @@ func TestIntegrationPostgres(t *testing.T) {
    queryResultTransformer := postgresQueryResultTransformer{}

    logger := backend.NewLoggerWith("logger", "postgres.test")
    exe, err := sqleng.NewQueryDataHandler(cfg, config, &queryResultTransformer, newPostgresMacroEngine(dsInfo.JsonData.Timescaledb),

    db := InitPostgresTestDB(t, jsonData)

    exe, err := sqleng.NewQueryDataHandler(cfg, db, config, &queryResultTransformer, newPostgresMacroEngine(dsInfo.JsonData.Timescaledb),
        logger)

    require.NoError(t, err)

    db := x
    fromStart := time.Date(2018, 3, 15, 13, 0, 0, 0, time.UTC).In(time.Local)

    t.Run("Given a table with different native data types", func(t *testing.T) {

@@ -1282,8 +1275,6 @@ func TestIntegrationPostgres(t *testing.T) {
    t.Run("When row limit set to 1", func(t *testing.T) {
        dsInfo := sqleng.DataSourceInfo{}
        config := sqleng.DataPluginConfiguration{
            DriverName:        "postgres",
            ConnectionString:  "",
            DSInfo:            dsInfo,
            MetricColumnTypes: []string{"UNKNOWN", "TEXT", "VARCHAR", "CHAR"},
            RowLimit:          1,

@@ -1291,7 +1282,7 @@ func TestIntegrationPostgres(t *testing.T) {

        queryResultTransformer := postgresQueryResultTransformer{}

        handler, err := sqleng.NewQueryDataHandler(setting.NewCfg(), config, &queryResultTransformer, newPostgresMacroEngine(false), logger)
        handler, err := sqleng.NewQueryDataHandler(setting.NewCfg(), db, config, &queryResultTransformer, newPostgresMacroEngine(false), logger)
        require.NoError(t, err)

        t.Run("When doing a table query that returns 2 rows should limit the result to 1 row", func(t *testing.T) {

@@ -1392,11 +1383,15 @@ func TestIntegrationPostgres(t *testing.T) {
    })
}

func InitPostgresTestDB(t *testing.T) *sql.DB {
func InitPostgresTestDB(t *testing.T, jsonData sqleng.JsonData) *sql.DB {
    connStr := postgresTestDBConnString()
    x, err := sql.Open("postgres", connStr)
    db, err := sql.Open("postgres", connStr)
    require.NoError(t, err, "Failed to init postgres DB")
    return x

    db.SetMaxOpenConns(jsonData.MaxOpenConns)
    db.SetMaxIdleConns(jsonData.MaxIdleConns)
    db.SetConnMaxLifetime(time.Duration(jsonData.ConnMaxLifetime) * time.Second)

    return db
}

func genTimeRangeByInterval(from time.Time, duration time.Duration, interval time.Duration) []time.Time {
@@ -2,6 +2,7 @@ package mssql

import (
    "context"
    "database/sql"
    "encoding/json"
    "fmt"
    "net/url"

@@ -9,6 +10,7 @@ import (
    "regexp"
    "strconv"
    "strings"
    "time"

    "github.com/grafana/grafana-azure-sdk-go/azcredentials"
    "github.com/grafana/grafana-plugin-sdk-go/backend"

@@ -125,8 +127,6 @@ func newInstanceSettings(cfg *setting.Cfg, logger log.Logger) datasource.Instanc
    }

    config := sqleng.DataPluginConfiguration{
        DriverName:        driverName,
        ConnectionString:  cnnstr,
        DSInfo:            dsInfo,
        MetricColumnTypes: []string{"VARCHAR", "CHAR", "NVARCHAR", "NCHAR"},
        RowLimit:          cfg.DataProxyRowLimit,

@@ -136,7 +136,16 @@ func newInstanceSettings(cfg *setting.Cfg, logger log.Logger) datasource.Instanc
        userError: cfg.UserFacingDefaultError,
    }

    return sqleng.NewQueryDataHandler(cfg, config, &queryResultTransformer, newMssqlMacroEngine(), logger)
    db, err := sql.Open(driverName, cnnstr)
    if err != nil {
        return nil, err
    }

    db.SetMaxOpenConns(config.DSInfo.JsonData.MaxOpenConns)
    db.SetMaxIdleConns(config.DSInfo.JsonData.MaxIdleConns)
    db.SetConnMaxLifetime(time.Duration(config.DSInfo.JsonData.ConnMaxLifetime) * time.Second)

    return sqleng.NewQueryDataHandler(cfg, db, config, &queryResultTransformer, newMssqlMacroEngine(), logger)
    }
}
@@ -39,21 +39,9 @@ func TestMSSQL(t *testing.T) {
        t.Skip()
    }

    x := initMSSQLTestDB(t)
    origDB := sqleng.NewDB
    t.Cleanup(func() {
        sqleng.NewDB = origDB
    })

    sqleng.NewDB = func(d, c string) (*sql.DB, error) {
        return x, nil
    }

    queryResultTransformer := mssqlQueryResultTransformer{}
    dsInfo := sqleng.DataSourceInfo{}
    config := sqleng.DataPluginConfiguration{
        DriverName:        "mssql",
        ConnectionString:  "",
        DSInfo:            dsInfo,
        MetricColumnTypes: []string{"VARCHAR", "CHAR", "NVARCHAR", "NCHAR"},
        RowLimit:          1000000,

@@ -61,10 +49,10 @@ func TestMSSQL(t *testing.T) {

    logger := backend.NewLoggerWith("logger", "mssql.test")

    endpoint, err := sqleng.NewQueryDataHandler(setting.NewCfg(), config, &queryResultTransformer, newMssqlMacroEngine(), logger)
    require.NoError(t, err)
    db := initMSSQLTestDB(t, config.DSInfo.JsonData)

    db := x
    endpoint, err := sqleng.NewQueryDataHandler(setting.NewCfg(), db, config, &queryResultTransformer, newMssqlMacroEngine(), logger)
    require.NoError(t, err)

    fromStart := time.Date(2018, 3, 15, 13, 0, 0, 0, time.UTC).In(time.Local)

@@ -811,13 +799,11 @@ func TestMSSQL(t *testing.T) {
        queryResultTransformer := mssqlQueryResultTransformer{}
        dsInfo := sqleng.DataSourceInfo{}
        config := sqleng.DataPluginConfiguration{
            DriverName:        "mssql",
            ConnectionString:  "",
            DSInfo:            dsInfo,
            MetricColumnTypes: []string{"VARCHAR", "CHAR", "NVARCHAR", "NCHAR"},
            RowLimit:          1000000,
        }
        endpoint, err := sqleng.NewQueryDataHandler(setting.NewCfg(), config, &queryResultTransformer, newMssqlMacroEngine(), logger)
        endpoint, err := sqleng.NewQueryDataHandler(setting.NewCfg(), db, config, &queryResultTransformer, newMssqlMacroEngine(), logger)
        require.NoError(t, err)
        query := &backend.QueryDataRequest{
            Queries: []backend.DataQuery{

@@ -1216,14 +1202,12 @@ func TestMSSQL(t *testing.T) {
        queryResultTransformer := mssqlQueryResultTransformer{}
        dsInfo := sqleng.DataSourceInfo{}
        config := sqleng.DataPluginConfiguration{
            DriverName:        "mssql",
            ConnectionString:  "",
            DSInfo:            dsInfo,
            MetricColumnTypes: []string{"VARCHAR", "CHAR", "NVARCHAR", "NCHAR"},
            RowLimit:          1,
        }

        handler, err := sqleng.NewQueryDataHandler(setting.NewCfg(), config, &queryResultTransformer, newMssqlMacroEngine(), logger)
        handler, err := sqleng.NewQueryDataHandler(setting.NewCfg(), db, config, &queryResultTransformer, newMssqlMacroEngine(), logger)
        require.NoError(t, err)

        t.Run("When doing a table query that returns 2 rows should limit the result to 1 row", func(t *testing.T) {

@@ -1487,15 +1471,19 @@ func TestGenerateConnectionString(t *testing.T) {
    }
}

func initMSSQLTestDB(t *testing.T) *sql.DB {
func initMSSQLTestDB(t *testing.T, jsonData sqleng.JsonData) *sql.DB {
    t.Helper()

    testDB := sqlutil.MSSQLTestDB()
    x, err := sql.Open(testDB.DriverName, strings.Replace(testDB.ConnStr, "localhost",
    db, err := sql.Open(testDB.DriverName, strings.Replace(testDB.ConnStr, "localhost",
        serverIP, 1))
    require.NoError(t, err)

    return x
    db.SetMaxOpenConns(jsonData.MaxOpenConns)
    db.SetMaxIdleConns(jsonData.MaxIdleConns)
    db.SetConnMaxLifetime(time.Duration(jsonData.ConnMaxLifetime) * time.Second)

    return db
}

func genTimeRangeByInterval(from time.Time, duration time.Duration, interval time.Duration) []time.Time {
@@ -2,6 +2,7 @@ package mysql

import (
    "context"
    "database/sql"
    "encoding/json"
    "errors"
    "fmt"

@@ -138,8 +139,6 @@ func newInstanceSettings(cfg *setting.Cfg, logger log.Logger) datasource.Instanc
    }

    config := sqleng.DataPluginConfiguration{
        DriverName:        "mysql",
        ConnectionString:  cnnstr,
        DSInfo:            dsInfo,
        TimeColumnNames:   []string{"time", "time_sec"},
        MetricColumnTypes: []string{"CHAR", "VARCHAR", "TINYTEXT", "TEXT", "MEDIUMTEXT", "LONGTEXT"},

@@ -150,7 +149,16 @@ func newInstanceSettings(cfg *setting.Cfg, logger log.Logger) datasource.Instanc
        userError: cfg.UserFacingDefaultError,
    }

    return sqleng.NewQueryDataHandler(cfg, config, &rowTransformer, newMysqlMacroEngine(logger, cfg), logger)
    db, err := sql.Open("mysql", cnnstr)
    if err != nil {
        return nil, err
    }

    db.SetMaxOpenConns(config.DSInfo.JsonData.MaxOpenConns)
    db.SetMaxIdleConns(config.DSInfo.JsonData.MaxIdleConns)
    db.SetConnMaxLifetime(time.Duration(config.DSInfo.JsonData.ConnMaxLifetime) * time.Second)

    return sqleng.NewQueryDataHandler(cfg, db, config, &rowTransformer, newMysqlMacroEngine(logger, cfg), logger)
    }
}
@@ -37,19 +37,11 @@ func TestIntegrationMySQL(t *testing.T) {
        t.Skip()
    }

    x := InitMySQLTestDB(t)

    origDB := sqleng.NewDB
    origInterpolate := sqleng.Interpolate
    t.Cleanup(func() {
        sqleng.NewDB = origDB
        sqleng.Interpolate = origInterpolate
    })

    sqleng.NewDB = func(d, c string) (*sql.DB, error) {
        return x, nil
    }

    sqleng.Interpolate = func(query backend.DataQuery, timeRange backend.TimeRange, timeInterval string, sql string) (string, error) {
        return sql, nil
    }

@@ -63,8 +55,6 @@ func TestIntegrationMySQL(t *testing.T) {
    }

    config := sqleng.DataPluginConfiguration{
        DriverName:        "mysql",
        ConnectionString:  "",
        DSInfo:            dsInfo,
        TimeColumnNames:   []string{"time", "time_sec"},
        MetricColumnTypes: []string{"CHAR", "VARCHAR", "TINYTEXT", "TEXT", "MEDIUMTEXT", "LONGTEXT"},

@@ -74,11 +64,13 @@ func TestIntegrationMySQL(t *testing.T) {
    rowTransformer := mysqlQueryResultTransformer{}

    logger := backend.NewLoggerWith("logger", "mysql.test")
    exe, err := sqleng.NewQueryDataHandler(setting.NewCfg(), config, &rowTransformer, newMysqlMacroEngine(logger, setting.NewCfg()), logger)

    db := InitMySQLTestDB(t, config.DSInfo.JsonData)

    exe, err := sqleng.NewQueryDataHandler(setting.NewCfg(), db, config, &rowTransformer, newMysqlMacroEngine(logger, setting.NewCfg()), logger)

    require.NoError(t, err)

    db := x
    fromStart := time.Date(2018, 3, 15, 13, 0, 0, 0, time.UTC)

    t.Run("Given a table with different native data types", func(t *testing.T) {

@@ -1178,8 +1170,6 @@ func TestIntegrationMySQL(t *testing.T) {
    t.Run("When row limit set to 1", func(t *testing.T) {
        dsInfo := sqleng.DataSourceInfo{}
        config := sqleng.DataPluginConfiguration{
            DriverName:        "mysql",
            ConnectionString:  "",
            DSInfo:            dsInfo,
            TimeColumnNames:   []string{"time", "time_sec"},
            MetricColumnTypes: []string{"CHAR", "VARCHAR", "TINYTEXT", "TEXT", "MEDIUMTEXT", "LONGTEXT"},

@@ -1188,7 +1178,7 @@ func TestIntegrationMySQL(t *testing.T) {

        queryResultTransformer := mysqlQueryResultTransformer{}

        handler, err := sqleng.NewQueryDataHandler(setting.NewCfg(), config, &queryResultTransformer, newMysqlMacroEngine(logger, setting.NewCfg()), logger)
        handler, err := sqleng.NewQueryDataHandler(setting.NewCfg(), db, config, &queryResultTransformer, newMysqlMacroEngine(logger, setting.NewCfg()), logger)
        require.NoError(t, err)

        t.Run("When doing a table query that returns 2 rows should limit the result to 1 row", func(t *testing.T) {

@@ -1290,14 +1280,18 @@ func TestIntegrationMySQL(t *testing.T) {
    })
}

func InitMySQLTestDB(t *testing.T) *sql.DB {
func InitMySQLTestDB(t *testing.T, jsonData sqleng.JsonData) *sql.DB {
    connStr := mySQLTestDBConnStr()
    x, err := sql.Open("mysql", connStr)
    db, err := sql.Open("mysql", connStr)
    if err != nil {
        t.Fatalf("Failed to init mysql db %v", err)
    }

    return x
    db.SetMaxOpenConns(jsonData.MaxOpenConns)
    db.SetMaxIdleConns(jsonData.MaxIdleConns)
    db.SetConnMaxLifetime(time.Duration(jsonData.ConnMaxLifetime) * time.Second)

    return db
}

func genTimeRangeByInterval(from time.Time, duration time.Duration, interval time.Duration) []time.Time {
@@ -44,13 +44,6 @@ type SqlQueryResultTransformer interface {

var sqlIntervalCalculator = intervalv2.NewCalculator()

// NewDB is a sql.DB factory, that can be stubbed by tests.
//
//nolint:gocritic
var NewDB = func(driverName string, connectionString string) (*sql.DB, error) {
    return sql.Open(driverName, connectionString)
}

type JsonData struct {
    MaxOpenConns int `json:"maxOpenConns"`
    MaxIdleConns int `json:"maxIdleConns"`

@@ -86,9 +79,7 @@ type DataSourceInfo struct {
}

type DataPluginConfiguration struct {
    DriverName        string
    DSInfo            DataSourceInfo
    ConnectionString  string
    TimeColumnNames   []string
    MetricColumnTypes []string
    RowLimit          int64

@@ -129,13 +120,8 @@ func (e *DataSourceHandler) TransformQueryError(logger log.Logger, err error) er
    return e.queryResultTransformer.TransformQueryError(logger, err)
}

func NewQueryDataHandler(cfg *setting.Cfg, config DataPluginConfiguration, queryResultTransformer SqlQueryResultTransformer,
func NewQueryDataHandler(cfg *setting.Cfg, db *sql.DB, config DataPluginConfiguration, queryResultTransformer SqlQueryResultTransformer,
    macroEngine SQLMacroEngine, log log.Logger) (*DataSourceHandler, error) {
    log.Debug("Creating engine...")
    defer func() {
        log.Debug("Engine created")
    }()

    queryDataHandler := DataSourceHandler{
        queryResultTransformer: queryResultTransformer,
        macroEngine:            macroEngine,

@@ -154,15 +140,6 @@ func NewQueryDataHandler(cfg *setting.Cfg, config DataPluginConfiguration, query
        queryDataHandler.metricColumnTypes = config.MetricColumnTypes
    }

    db, err := NewDB(config.DriverName, config.ConnectionString)
    if err != nil {
        return nil, err
    }

    db.SetMaxOpenConns(config.DSInfo.JsonData.MaxOpenConns)
    db.SetMaxIdleConns(config.DSInfo.JsonData.MaxIdleConns)
    db.SetConnMaxLifetime(time.Duration(config.DSInfo.JsonData.ConnMaxLifetime) * time.Second)

    queryDataHandler.db = db
    return &queryDataHandler, nil
}