mirror of
https://github.com/grafana/grafana.git
synced 2025-02-25 18:55:37 -06:00
Postgres/MySQL/MSSQL: Add setting to limit maximum amount of rows processed (#38986)
Adds a new setting dataproxy.row_limit that allows an operator to limit the amount of rows being processed/accepted in response to database queries originating from SQL data sources. Closes #38975 Ref #39095 Co-authored-by: achatterjee-grafana <70489351+achatterjee-grafana@users.noreply.github.com>
This commit is contained in:
parent
5b7dc16c06
commit
1c892a2fc4
@ -175,6 +175,9 @@ send_user_header = false
|
|||||||
# Limit the amount of bytes that will be read/accepted from responses of outgoing HTTP requests.
|
# Limit the amount of bytes that will be read/accepted from responses of outgoing HTTP requests.
|
||||||
response_limit = 0
|
response_limit = 0
|
||||||
|
|
||||||
|
# Limits the number of rows that Grafana will process from SQL data sources.
|
||||||
|
row_limit = 1000000
|
||||||
|
|
||||||
#################################### Analytics ###########################
|
#################################### Analytics ###########################
|
||||||
[analytics]
|
[analytics]
|
||||||
# Server reporting, sends usage counters to stats.grafana.org every 24 hours.
|
# Server reporting, sends usage counters to stats.grafana.org every 24 hours.
|
||||||
|
@ -181,6 +181,9 @@
|
|||||||
# Limit the amount of bytes that will be read/accepted from responses of outgoing HTTP requests.
|
# Limit the amount of bytes that will be read/accepted from responses of outgoing HTTP requests.
|
||||||
;response_limit = 0
|
;response_limit = 0
|
||||||
|
|
||||||
|
# Limits the number of rows that Grafana will process from SQL data sources.
|
||||||
|
;row_limit = 1000000
|
||||||
|
|
||||||
#################################### Analytics ####################################
|
#################################### Analytics ####################################
|
||||||
[analytics]
|
[analytics]
|
||||||
# Server reporting, sends usage counters to stats.grafana.org every 24 hours.
|
# Server reporting, sends usage counters to stats.grafana.org every 24 hours.
|
||||||
|
@ -439,6 +439,10 @@ If enabled and user is not anonymous, data proxy will add X-Grafana-User header
|
|||||||
|
|
||||||
Limits the amount of bytes that will be read/accepted from responses of outgoing HTTP requests. Default is `0` which means disabled.
|
Limits the amount of bytes that will be read/accepted from responses of outgoing HTTP requests. Default is `0` which means disabled.
|
||||||
|
|
||||||
|
### row_limit
|
||||||
|
|
||||||
|
Limits the number of rows that Grafana will process from SQL (relational) data sources. Default is `1000000`.
|
||||||
|
|
||||||
<hr />
|
<hr />
|
||||||
|
|
||||||
## [analytics]
|
## [analytics]
|
||||||
|
@ -327,6 +327,7 @@ type Cfg struct {
|
|||||||
DataProxyKeepAlive int
|
DataProxyKeepAlive int
|
||||||
DataProxyIdleConnTimeout int
|
DataProxyIdleConnTimeout int
|
||||||
ResponseLimit int64
|
ResponseLimit int64
|
||||||
|
DataProxyRowLimit int64
|
||||||
|
|
||||||
// DistributedCache
|
// DistributedCache
|
||||||
RemoteCacheOptions *RemoteCacheOptions
|
RemoteCacheOptions *RemoteCacheOptions
|
||||||
|
@ -2,6 +2,8 @@ package setting
|
|||||||
|
|
||||||
import "gopkg.in/ini.v1"
|
import "gopkg.in/ini.v1"
|
||||||
|
|
||||||
|
const defaultDataProxyRowLimit = int64(1000000)
|
||||||
|
|
||||||
func readDataProxySettings(iniFile *ini.File, cfg *Cfg) error {
|
func readDataProxySettings(iniFile *ini.File, cfg *Cfg) error {
|
||||||
dataproxy := iniFile.Section("dataproxy")
|
dataproxy := iniFile.Section("dataproxy")
|
||||||
cfg.SendUserHeader = dataproxy.Key("send_user_header").MustBool(false)
|
cfg.SendUserHeader = dataproxy.Key("send_user_header").MustBool(false)
|
||||||
@ -15,6 +17,11 @@ func readDataProxySettings(iniFile *ini.File, cfg *Cfg) error {
|
|||||||
cfg.DataProxyMaxIdleConns = dataproxy.Key("max_idle_connections").MustInt()
|
cfg.DataProxyMaxIdleConns = dataproxy.Key("max_idle_connections").MustInt()
|
||||||
cfg.DataProxyIdleConnTimeout = dataproxy.Key("idle_conn_timeout_seconds").MustInt(90)
|
cfg.DataProxyIdleConnTimeout = dataproxy.Key("idle_conn_timeout_seconds").MustInt(90)
|
||||||
cfg.ResponseLimit = dataproxy.Key("response_limit").MustInt64(0)
|
cfg.ResponseLimit = dataproxy.Key("response_limit").MustInt64(0)
|
||||||
|
cfg.DataProxyRowLimit = dataproxy.Key("row_limit").MustInt64(defaultDataProxyRowLimit)
|
||||||
|
|
||||||
|
if cfg.DataProxyRowLimit <= 0 {
|
||||||
|
cfg.DataProxyRowLimit = defaultDataProxyRowLimit
|
||||||
|
}
|
||||||
|
|
||||||
if val, err := dataproxy.Key("max_idle_connections_per_host").Int(); err == nil {
|
if val, err := dataproxy.Key("max_idle_connections_per_host").Int(); err == nil {
|
||||||
cfg.Logger.Warn("[Deprecated] the configuration setting 'max_idle_connections_per_host' is deprecated, please use 'max_idle_connections' instead")
|
cfg.Logger.Warn("[Deprecated] the configuration setting 'max_idle_connections_per_host' is deprecated, please use 'max_idle_connections' instead")
|
||||||
|
@ -31,9 +31,9 @@ type Service struct {
|
|||||||
im instancemgmt.InstanceManager
|
im instancemgmt.InstanceManager
|
||||||
}
|
}
|
||||||
|
|
||||||
func ProvideService(manager backendplugin.Manager) (*Service, error) {
|
func ProvideService(cfg *setting.Cfg, manager backendplugin.Manager) (*Service, error) {
|
||||||
s := &Service{
|
s := &Service{
|
||||||
im: datasource.NewInstanceManager(newInstanceSettings()),
|
im: datasource.NewInstanceManager(newInstanceSettings(cfg)),
|
||||||
}
|
}
|
||||||
factory := coreplugin.New(backend.ServeOpts{
|
factory := coreplugin.New(backend.ServeOpts{
|
||||||
QueryDataHandler: s,
|
QueryDataHandler: s,
|
||||||
@ -62,7 +62,7 @@ func (s *Service) QueryData(ctx context.Context, req *backend.QueryDataRequest)
|
|||||||
return dsHandler.QueryData(ctx, req)
|
return dsHandler.QueryData(ctx, req)
|
||||||
}
|
}
|
||||||
|
|
||||||
func newInstanceSettings() datasource.InstanceFactoryFunc {
|
func newInstanceSettings(cfg *setting.Cfg) datasource.InstanceFactoryFunc {
|
||||||
return func(settings backend.DataSourceInstanceSettings) (instancemgmt.Instance, error) {
|
return func(settings backend.DataSourceInstanceSettings) (instancemgmt.Instance, error) {
|
||||||
jsonData := sqleng.JsonData{
|
jsonData := sqleng.JsonData{
|
||||||
MaxOpenConns: 0,
|
MaxOpenConns: 0,
|
||||||
@ -89,8 +89,8 @@ func newInstanceSettings() datasource.InstanceFactoryFunc {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
// TODO: Don't use global
|
|
||||||
if setting.Env == setting.Dev {
|
if cfg.Env == setting.Dev {
|
||||||
logger.Debug("getEngine", "connection", cnnstr)
|
logger.Debug("getEngine", "connection", cnnstr)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -99,6 +99,7 @@ func newInstanceSettings() datasource.InstanceFactoryFunc {
|
|||||||
ConnectionString: cnnstr,
|
ConnectionString: cnnstr,
|
||||||
DSInfo: dsInfo,
|
DSInfo: dsInfo,
|
||||||
MetricColumnTypes: []string{"VARCHAR", "CHAR", "NVARCHAR", "NCHAR"},
|
MetricColumnTypes: []string{"VARCHAR", "CHAR", "NVARCHAR", "NCHAR"},
|
||||||
|
RowLimit: cfg.DataProxyRowLimit,
|
||||||
}
|
}
|
||||||
|
|
||||||
queryResultTransformer := mssqlQueryResultTransformer{
|
queryResultTransformer := mssqlQueryResultTransformer{
|
||||||
|
@ -57,6 +57,7 @@ func TestMSSQL(t *testing.T) {
|
|||||||
ConnectionString: "",
|
ConnectionString: "",
|
||||||
DSInfo: dsInfo,
|
DSInfo: dsInfo,
|
||||||
MetricColumnTypes: []string{"VARCHAR", "CHAR", "NVARCHAR", "NCHAR"},
|
MetricColumnTypes: []string{"VARCHAR", "CHAR", "NVARCHAR", "NCHAR"},
|
||||||
|
RowLimit: 1000000,
|
||||||
}
|
}
|
||||||
endpoint, err := sqleng.NewQueryDataHandler(config, &queryResultTransformer, newMssqlMacroEngine(), logger)
|
endpoint, err := sqleng.NewQueryDataHandler(config, &queryResultTransformer, newMssqlMacroEngine(), logger)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
@ -794,6 +795,7 @@ func TestMSSQL(t *testing.T) {
|
|||||||
ConnectionString: "",
|
ConnectionString: "",
|
||||||
DSInfo: dsInfo,
|
DSInfo: dsInfo,
|
||||||
MetricColumnTypes: []string{"VARCHAR", "CHAR", "NVARCHAR", "NCHAR"},
|
MetricColumnTypes: []string{"VARCHAR", "CHAR", "NVARCHAR", "NCHAR"},
|
||||||
|
RowLimit: 1000000,
|
||||||
}
|
}
|
||||||
endpoint, err := sqleng.NewQueryDataHandler(config, &queryResultTransformer, newMssqlMacroEngine(), logger)
|
endpoint, err := sqleng.NewQueryDataHandler(config, &queryResultTransformer, newMssqlMacroEngine(), logger)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
@ -1189,6 +1191,83 @@ func TestMSSQL(t *testing.T) {
|
|||||||
require.Equal(t, data.FieldTypeNullableTime, frames[0].Fields[0].Type())
|
require.Equal(t, data.FieldTypeNullableTime, frames[0].Fields[0].Type())
|
||||||
require.Equal(t, data.FieldTypeNullableTime, frames[0].Fields[1].Type())
|
require.Equal(t, data.FieldTypeNullableTime, frames[0].Fields[1].Type())
|
||||||
})
|
})
|
||||||
|
|
||||||
|
t.Run("When row limit set to 1", func(t *testing.T) {
|
||||||
|
queryResultTransformer := mssqlQueryResultTransformer{
|
||||||
|
log: logger,
|
||||||
|
}
|
||||||
|
dsInfo := sqleng.DataSourceInfo{}
|
||||||
|
config := sqleng.DataPluginConfiguration{
|
||||||
|
DriverName: "mssql",
|
||||||
|
ConnectionString: "",
|
||||||
|
DSInfo: dsInfo,
|
||||||
|
MetricColumnTypes: []string{"VARCHAR", "CHAR", "NVARCHAR", "NCHAR"},
|
||||||
|
RowLimit: 1,
|
||||||
|
}
|
||||||
|
|
||||||
|
handler, err := sqleng.NewQueryDataHandler(config, &queryResultTransformer, newMssqlMacroEngine(), logger)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
t.Run("When doing a table query that returns 2 rows should limit the result to 1 row", func(t *testing.T) {
|
||||||
|
query := &backend.QueryDataRequest{
|
||||||
|
Queries: []backend.DataQuery{
|
||||||
|
{
|
||||||
|
JSON: []byte(`{
|
||||||
|
"rawSql": "SELECT 1 as value UNION ALL select 2 as value",
|
||||||
|
"format": "table"
|
||||||
|
}`),
|
||||||
|
RefID: "A",
|
||||||
|
TimeRange: backend.TimeRange{
|
||||||
|
From: time.Now(),
|
||||||
|
To: time.Now(),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
resp, err := handler.QueryData(context.Background(), query)
|
||||||
|
require.NoError(t, err)
|
||||||
|
queryResult := resp.Responses["A"]
|
||||||
|
require.NoError(t, queryResult.Error)
|
||||||
|
frames := queryResult.Frames
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, 1, len(frames))
|
||||||
|
require.Equal(t, 1, len(frames[0].Fields))
|
||||||
|
require.Equal(t, 1, frames[0].Rows())
|
||||||
|
require.Len(t, frames[0].Meta.Notices, 1)
|
||||||
|
require.Equal(t, data.NoticeSeverityWarning, frames[0].Meta.Notices[0].Severity)
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("When doing a time series that returns 2 rows should limit the result to 1 row", func(t *testing.T) {
|
||||||
|
query := &backend.QueryDataRequest{
|
||||||
|
Queries: []backend.DataQuery{
|
||||||
|
{
|
||||||
|
JSON: []byte(`{
|
||||||
|
"rawSql": "SELECT 1 as time, 1 as value UNION ALL select 2 as time, 2 as value",
|
||||||
|
"format": "time_series"
|
||||||
|
}`),
|
||||||
|
RefID: "A",
|
||||||
|
TimeRange: backend.TimeRange{
|
||||||
|
From: time.Now(),
|
||||||
|
To: time.Now(),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
resp, err := handler.QueryData(context.Background(), query)
|
||||||
|
require.NoError(t, err)
|
||||||
|
queryResult := resp.Responses["A"]
|
||||||
|
require.NoError(t, queryResult.Error)
|
||||||
|
frames := queryResult.Frames
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, 1, len(frames))
|
||||||
|
require.Equal(t, 2, len(frames[0].Fields))
|
||||||
|
require.Equal(t, 1, frames[0].Rows())
|
||||||
|
require.Len(t, frames[0].Meta.Notices, 1)
|
||||||
|
require.Equal(t, data.NoticeSeverityWarning, frames[0].Meta.Notices[0].Severity)
|
||||||
|
})
|
||||||
|
})
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -46,8 +46,7 @@ func characterEscape(s string, escapeChar string) string {
|
|||||||
|
|
||||||
func ProvideService(cfg *setting.Cfg, manager backendplugin.Manager, httpClientProvider httpclient.Provider) (*Service, error) {
|
func ProvideService(cfg *setting.Cfg, manager backendplugin.Manager, httpClientProvider httpclient.Provider) (*Service, error) {
|
||||||
s := &Service{
|
s := &Service{
|
||||||
Cfg: cfg,
|
im: datasource.NewInstanceManager(newInstanceSettings(cfg, httpClientProvider)),
|
||||||
im: datasource.NewInstanceManager(newInstanceSettings(httpClientProvider)),
|
|
||||||
}
|
}
|
||||||
factory := coreplugin.New(backend.ServeOpts{
|
factory := coreplugin.New(backend.ServeOpts{
|
||||||
QueryDataHandler: s,
|
QueryDataHandler: s,
|
||||||
@ -59,7 +58,7 @@ func ProvideService(cfg *setting.Cfg, manager backendplugin.Manager, httpClientP
|
|||||||
return s, nil
|
return s, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func newInstanceSettings(httpClientProvider httpclient.Provider) datasource.InstanceFactoryFunc {
|
func newInstanceSettings(cfg *setting.Cfg, httpClientProvider httpclient.Provider) datasource.InstanceFactoryFunc {
|
||||||
return func(settings backend.DataSourceInstanceSettings) (instancemgmt.Instance, error) {
|
return func(settings backend.DataSourceInstanceSettings) (instancemgmt.Instance, error) {
|
||||||
jsonData := sqleng.JsonData{
|
jsonData := sqleng.JsonData{
|
||||||
MaxOpenConns: 0,
|
MaxOpenConns: 0,
|
||||||
@ -117,7 +116,7 @@ func newInstanceSettings(httpClientProvider httpclient.Provider) datasource.Inst
|
|||||||
cnnstr += fmt.Sprintf("&time_zone='%s'", url.QueryEscape(dsInfo.JsonData.Timezone))
|
cnnstr += fmt.Sprintf("&time_zone='%s'", url.QueryEscape(dsInfo.JsonData.Timezone))
|
||||||
}
|
}
|
||||||
|
|
||||||
if setting.Env == setting.Dev {
|
if cfg.Env == setting.Dev {
|
||||||
logger.Debug("getEngine", "connection", cnnstr)
|
logger.Debug("getEngine", "connection", cnnstr)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -127,6 +126,7 @@ func newInstanceSettings(httpClientProvider httpclient.Provider) datasource.Inst
|
|||||||
DSInfo: dsInfo,
|
DSInfo: dsInfo,
|
||||||
TimeColumnNames: []string{"time", "time_sec"},
|
TimeColumnNames: []string{"time", "time_sec"},
|
||||||
MetricColumnTypes: []string{"CHAR", "VARCHAR", "TINYTEXT", "TEXT", "MEDIUMTEXT", "LONGTEXT"},
|
MetricColumnTypes: []string{"CHAR", "VARCHAR", "TINYTEXT", "TEXT", "MEDIUMTEXT", "LONGTEXT"},
|
||||||
|
RowLimit: cfg.DataProxyRowLimit,
|
||||||
}
|
}
|
||||||
|
|
||||||
rowTransformer := mysqlQueryResultTransformer{
|
rowTransformer := mysqlQueryResultTransformer{
|
||||||
|
@ -68,6 +68,7 @@ func TestMySQL(t *testing.T) {
|
|||||||
DSInfo: dsInfo,
|
DSInfo: dsInfo,
|
||||||
TimeColumnNames: []string{"time", "time_sec"},
|
TimeColumnNames: []string{"time", "time_sec"},
|
||||||
MetricColumnTypes: []string{"CHAR", "VARCHAR", "TINYTEXT", "TEXT", "MEDIUMTEXT", "LONGTEXT"},
|
MetricColumnTypes: []string{"CHAR", "VARCHAR", "TINYTEXT", "TEXT", "MEDIUMTEXT", "LONGTEXT"},
|
||||||
|
RowLimit: 1000000,
|
||||||
}
|
}
|
||||||
|
|
||||||
rowTransformer := mysqlQueryResultTransformer{
|
rowTransformer := mysqlQueryResultTransformer{
|
||||||
@ -1151,6 +1152,85 @@ func TestMySQL(t *testing.T) {
|
|||||||
require.Equal(t, data.FieldTypeNullableTime, frames[0].Fields[0].Type())
|
require.Equal(t, data.FieldTypeNullableTime, frames[0].Fields[0].Type())
|
||||||
require.Equal(t, data.FieldTypeNullableTime, frames[0].Fields[1].Type())
|
require.Equal(t, data.FieldTypeNullableTime, frames[0].Fields[1].Type())
|
||||||
})
|
})
|
||||||
|
|
||||||
|
t.Run("When row limit set to 1", func(t *testing.T) {
|
||||||
|
dsInfo := sqleng.DataSourceInfo{}
|
||||||
|
config := sqleng.DataPluginConfiguration{
|
||||||
|
DriverName: "mysql",
|
||||||
|
ConnectionString: "",
|
||||||
|
DSInfo: dsInfo,
|
||||||
|
TimeColumnNames: []string{"time", "time_sec"},
|
||||||
|
MetricColumnTypes: []string{"CHAR", "VARCHAR", "TINYTEXT", "TEXT", "MEDIUMTEXT", "LONGTEXT"},
|
||||||
|
RowLimit: 1,
|
||||||
|
}
|
||||||
|
|
||||||
|
queryResultTransformer := mysqlQueryResultTransformer{
|
||||||
|
log: logger,
|
||||||
|
}
|
||||||
|
|
||||||
|
handler, err := sqleng.NewQueryDataHandler(config, &queryResultTransformer, newMysqlMacroEngine(logger), logger)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
t.Run("When doing a table query that returns 2 rows should limit the result to 1 row", func(t *testing.T) {
|
||||||
|
query := &backend.QueryDataRequest{
|
||||||
|
Queries: []backend.DataQuery{
|
||||||
|
{
|
||||||
|
JSON: []byte(`{
|
||||||
|
"rawSql": "SELECT 1 as value UNION ALL select 2 as value",
|
||||||
|
"format": "table"
|
||||||
|
}`),
|
||||||
|
RefID: "A",
|
||||||
|
TimeRange: backend.TimeRange{
|
||||||
|
From: time.Now(),
|
||||||
|
To: time.Now(),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
resp, err := handler.QueryData(context.Background(), query)
|
||||||
|
require.NoError(t, err)
|
||||||
|
queryResult := resp.Responses["A"]
|
||||||
|
require.NoError(t, queryResult.Error)
|
||||||
|
frames := queryResult.Frames
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, 1, len(frames))
|
||||||
|
require.Equal(t, 1, len(frames[0].Fields))
|
||||||
|
require.Equal(t, 1, frames[0].Rows())
|
||||||
|
require.Len(t, frames[0].Meta.Notices, 1)
|
||||||
|
require.Equal(t, data.NoticeSeverityWarning, frames[0].Meta.Notices[0].Severity)
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("When doing a time series that returns 2 rows should limit the result to 1 row", func(t *testing.T) {
|
||||||
|
query := &backend.QueryDataRequest{
|
||||||
|
Queries: []backend.DataQuery{
|
||||||
|
{
|
||||||
|
JSON: []byte(`{
|
||||||
|
"rawSql": "SELECT 1 as time, 1 as value UNION ALL select 2 as time, 2 as value",
|
||||||
|
"format": "time_series"
|
||||||
|
}`),
|
||||||
|
RefID: "A",
|
||||||
|
TimeRange: backend.TimeRange{
|
||||||
|
From: time.Now(),
|
||||||
|
To: time.Now(),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
resp, err := handler.QueryData(context.Background(), query)
|
||||||
|
require.NoError(t, err)
|
||||||
|
queryResult := resp.Responses["A"]
|
||||||
|
require.NoError(t, queryResult.Error)
|
||||||
|
frames := queryResult.Frames
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, 1, len(frames))
|
||||||
|
require.Equal(t, 2, len(frames[0].Fields))
|
||||||
|
require.Equal(t, 1, frames[0].Rows())
|
||||||
|
require.Len(t, frames[0].Meta.Notices, 1)
|
||||||
|
require.Equal(t, data.NoticeSeverityWarning, frames[0].Meta.Notices[0].Severity)
|
||||||
|
})
|
||||||
|
})
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -26,10 +26,9 @@ var logger = log.New("tsdb.postgres")
|
|||||||
|
|
||||||
func ProvideService(cfg *setting.Cfg, manager backendplugin.Manager) (*Service, error) {
|
func ProvideService(cfg *setting.Cfg, manager backendplugin.Manager) (*Service, error) {
|
||||||
s := &Service{
|
s := &Service{
|
||||||
Cfg: cfg,
|
|
||||||
tlsManager: newTLSManager(logger, cfg.DataPath),
|
tlsManager: newTLSManager(logger, cfg.DataPath),
|
||||||
}
|
}
|
||||||
s.im = datasource.NewInstanceManager(s.newInstanceSettings())
|
s.im = datasource.NewInstanceManager(s.newInstanceSettings(cfg))
|
||||||
factory := coreplugin.New(backend.ServeOpts{
|
factory := coreplugin.New(backend.ServeOpts{
|
||||||
QueryDataHandler: s,
|
QueryDataHandler: s,
|
||||||
})
|
})
|
||||||
@ -41,7 +40,6 @@ func ProvideService(cfg *setting.Cfg, manager backendplugin.Manager) (*Service,
|
|||||||
}
|
}
|
||||||
|
|
||||||
type Service struct {
|
type Service struct {
|
||||||
Cfg *setting.Cfg
|
|
||||||
tlsManager tlsSettingsProvider
|
tlsManager tlsSettingsProvider
|
||||||
im instancemgmt.InstanceManager
|
im instancemgmt.InstanceManager
|
||||||
}
|
}
|
||||||
@ -63,7 +61,7 @@ func (s *Service) QueryData(ctx context.Context, req *backend.QueryDataRequest)
|
|||||||
return dsInfo.QueryData(ctx, req)
|
return dsInfo.QueryData(ctx, req)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Service) newInstanceSettings() datasource.InstanceFactoryFunc {
|
func (s *Service) newInstanceSettings(cfg *setting.Cfg) datasource.InstanceFactoryFunc {
|
||||||
return func(settings backend.DataSourceInstanceSettings) (instancemgmt.Instance, error) {
|
return func(settings backend.DataSourceInstanceSettings) (instancemgmt.Instance, error) {
|
||||||
logger.Debug("Creating Postgres query endpoint")
|
logger.Debug("Creating Postgres query endpoint")
|
||||||
jsonData := sqleng.JsonData{
|
jsonData := sqleng.JsonData{
|
||||||
@ -94,7 +92,7 @@ func (s *Service) newInstanceSettings() datasource.InstanceFactoryFunc {
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
if s.Cfg.Env == setting.Dev {
|
if cfg.Env == setting.Dev {
|
||||||
logger.Debug("getEngine", "connection", cnnstr)
|
logger.Debug("getEngine", "connection", cnnstr)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -103,6 +101,7 @@ func (s *Service) newInstanceSettings() datasource.InstanceFactoryFunc {
|
|||||||
ConnectionString: cnnstr,
|
ConnectionString: cnnstr,
|
||||||
DSInfo: dsInfo,
|
DSInfo: dsInfo,
|
||||||
MetricColumnTypes: []string{"UNKNOWN", "TEXT", "VARCHAR", "CHAR"},
|
MetricColumnTypes: []string{"UNKNOWN", "TEXT", "VARCHAR", "CHAR"},
|
||||||
|
RowLimit: cfg.DataProxyRowLimit,
|
||||||
}
|
}
|
||||||
|
|
||||||
queryResultTransformer := postgresQueryResultTransformer{
|
queryResultTransformer := postgresQueryResultTransformer{
|
||||||
|
@ -112,7 +112,6 @@ func TestGenerateConnectionString(t *testing.T) {
|
|||||||
for _, tt := range testCases {
|
for _, tt := range testCases {
|
||||||
t.Run(tt.desc, func(t *testing.T) {
|
t.Run(tt.desc, func(t *testing.T) {
|
||||||
svc := Service{
|
svc := Service{
|
||||||
Cfg: cfg,
|
|
||||||
tlsManager: &tlsTestManager{settings: tt.tlsSettings},
|
tlsManager: &tlsTestManager{settings: tt.tlsSettings},
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -190,6 +189,7 @@ func TestPostgres(t *testing.T) {
|
|||||||
ConnectionString: "",
|
ConnectionString: "",
|
||||||
DSInfo: dsInfo,
|
DSInfo: dsInfo,
|
||||||
MetricColumnTypes: []string{"UNKNOWN", "TEXT", "VARCHAR", "CHAR"},
|
MetricColumnTypes: []string{"UNKNOWN", "TEXT", "VARCHAR", "CHAR"},
|
||||||
|
RowLimit: 1000000,
|
||||||
}
|
}
|
||||||
|
|
||||||
queryResultTransformer := postgresQueryResultTransformer{
|
queryResultTransformer := postgresQueryResultTransformer{
|
||||||
@ -1225,6 +1225,84 @@ func TestPostgres(t *testing.T) {
|
|||||||
require.Equal(t, data.FieldTypeNullableTime, frames[0].Fields[0].Type())
|
require.Equal(t, data.FieldTypeNullableTime, frames[0].Fields[0].Type())
|
||||||
require.Equal(t, data.FieldTypeNullableTime, frames[0].Fields[1].Type())
|
require.Equal(t, data.FieldTypeNullableTime, frames[0].Fields[1].Type())
|
||||||
})
|
})
|
||||||
|
|
||||||
|
t.Run("When row limit set to 1", func(t *testing.T) {
|
||||||
|
dsInfo := sqleng.DataSourceInfo{}
|
||||||
|
config := sqleng.DataPluginConfiguration{
|
||||||
|
DriverName: "postgres",
|
||||||
|
ConnectionString: "",
|
||||||
|
DSInfo: dsInfo,
|
||||||
|
MetricColumnTypes: []string{"UNKNOWN", "TEXT", "VARCHAR", "CHAR"},
|
||||||
|
RowLimit: 1,
|
||||||
|
}
|
||||||
|
|
||||||
|
queryResultTransformer := postgresQueryResultTransformer{
|
||||||
|
log: logger,
|
||||||
|
}
|
||||||
|
|
||||||
|
handler, err := sqleng.NewQueryDataHandler(config, &queryResultTransformer, newPostgresMacroEngine(false), logger)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
t.Run("When doing a table query that returns 2 rows should limit the result to 1 row", func(t *testing.T) {
|
||||||
|
query := &backend.QueryDataRequest{
|
||||||
|
Queries: []backend.DataQuery{
|
||||||
|
{
|
||||||
|
JSON: []byte(`{
|
||||||
|
"rawSql": "SELECT 1 as value UNION ALL select 2 as value",
|
||||||
|
"format": "table"
|
||||||
|
}`),
|
||||||
|
RefID: "A",
|
||||||
|
TimeRange: backend.TimeRange{
|
||||||
|
From: time.Now(),
|
||||||
|
To: time.Now(),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
resp, err := handler.QueryData(context.Background(), query)
|
||||||
|
require.NoError(t, err)
|
||||||
|
queryResult := resp.Responses["A"]
|
||||||
|
require.NoError(t, queryResult.Error)
|
||||||
|
frames := queryResult.Frames
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, 1, len(frames))
|
||||||
|
require.Equal(t, 1, len(frames[0].Fields))
|
||||||
|
require.Equal(t, 1, frames[0].Rows())
|
||||||
|
require.Len(t, frames[0].Meta.Notices, 1)
|
||||||
|
require.Equal(t, data.NoticeSeverityWarning, frames[0].Meta.Notices[0].Severity)
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("When doing a time series query that returns 2 rows should limit the result to 1 row", func(t *testing.T) {
|
||||||
|
query := &backend.QueryDataRequest{
|
||||||
|
Queries: []backend.DataQuery{
|
||||||
|
{
|
||||||
|
JSON: []byte(`{
|
||||||
|
"rawSql": "SELECT 1 as time, 1 as value UNION ALL select 2 as time, 2 as value",
|
||||||
|
"format": "time_series"
|
||||||
|
}`),
|
||||||
|
RefID: "A",
|
||||||
|
TimeRange: backend.TimeRange{
|
||||||
|
From: time.Now(),
|
||||||
|
To: time.Now(),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
resp, err := handler.QueryData(context.Background(), query)
|
||||||
|
require.NoError(t, err)
|
||||||
|
queryResult := resp.Responses["A"]
|
||||||
|
require.NoError(t, queryResult.Error)
|
||||||
|
frames := queryResult.Frames
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, 1, len(frames))
|
||||||
|
require.Equal(t, 2, len(frames[0].Fields))
|
||||||
|
require.Equal(t, 1, frames[0].Rows())
|
||||||
|
require.Len(t, frames[0].Meta.Notices, 1)
|
||||||
|
require.Equal(t, data.NoticeSeverityWarning, frames[0].Meta.Notices[0].Severity)
|
||||||
|
})
|
||||||
|
})
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -92,6 +92,7 @@ type DataPluginConfiguration struct {
|
|||||||
ConnectionString string
|
ConnectionString string
|
||||||
TimeColumnNames []string
|
TimeColumnNames []string
|
||||||
MetricColumnTypes []string
|
MetricColumnTypes []string
|
||||||
|
RowLimit int64
|
||||||
}
|
}
|
||||||
type DataSourceHandler struct {
|
type DataSourceHandler struct {
|
||||||
macroEngine SQLMacroEngine
|
macroEngine SQLMacroEngine
|
||||||
@ -101,6 +102,7 @@ type DataSourceHandler struct {
|
|||||||
metricColumnTypes []string
|
metricColumnTypes []string
|
||||||
log log.Logger
|
log log.Logger
|
||||||
dsInfo DataSourceInfo
|
dsInfo DataSourceInfo
|
||||||
|
rowLimit int64
|
||||||
}
|
}
|
||||||
type QueryJson struct {
|
type QueryJson struct {
|
||||||
RawSql string `json:"rawSql"`
|
RawSql string `json:"rawSql"`
|
||||||
@ -133,6 +135,7 @@ func NewQueryDataHandler(config DataPluginConfiguration, queryResultTransformer
|
|||||||
timeColumnNames: []string{"time"},
|
timeColumnNames: []string{"time"},
|
||||||
log: log,
|
log: log,
|
||||||
dsInfo: config.DSInfo,
|
dsInfo: config.DSInfo,
|
||||||
|
rowLimit: config.RowLimit,
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(config.TimeColumnNames) > 0 {
|
if len(config.TimeColumnNames) > 0 {
|
||||||
@ -168,8 +171,6 @@ func NewQueryDataHandler(config DataPluginConfiguration, queryResultTransformer
|
|||||||
return &queryDataHandler, nil
|
return &queryDataHandler, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
const rowLimit = 1000000
|
|
||||||
|
|
||||||
type DBDataResponse struct {
|
type DBDataResponse struct {
|
||||||
dataResponse backend.DataResponse
|
dataResponse backend.DataResponse
|
||||||
refID string
|
refID string
|
||||||
@ -284,15 +285,17 @@ func (e *DataSourceHandler) executeQuery(query backend.DataQuery, wg *sync.WaitG
|
|||||||
|
|
||||||
// Convert row.Rows to dataframe
|
// Convert row.Rows to dataframe
|
||||||
stringConverters := e.queryResultTransformer.GetConverterList()
|
stringConverters := e.queryResultTransformer.GetConverterList()
|
||||||
frame, err := sqlutil.FrameFromRows(rows.Rows, rowLimit, sqlutil.ToConverters(stringConverters...)...)
|
frame, err := sqlutil.FrameFromRows(rows.Rows, e.rowLimit, sqlutil.ToConverters(stringConverters...)...)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
errAppendDebug("convert frame from rows error", err, interpolatedQuery)
|
errAppendDebug("convert frame from rows error", err, interpolatedQuery)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
frame.SetMeta(&data.FrameMeta{
|
if frame.Meta == nil {
|
||||||
ExecutedQueryString: interpolatedQuery,
|
frame.Meta = &data.FrameMeta{}
|
||||||
})
|
}
|
||||||
|
|
||||||
|
frame.Meta.ExecutedQueryString = interpolatedQuery
|
||||||
|
|
||||||
// If no rows were returned, no point checking anything else.
|
// If no rows were returned, no point checking anything else.
|
||||||
if frame.Rows() == 0 {
|
if frame.Rows() == 0 {
|
||||||
|
Loading…
Reference in New Issue
Block a user