Elasticsearch: Fix setting of default maxConcurrentShardRequests (#87703)

* Elasticsearch: Fix setting of default maxConcurrentShardRequests

* Add float test scenario

* Add comment
This commit is contained in:
Ivana Huckova 2024-05-13 15:39:37 +02:00 committed by GitHub
parent 17f6f75bd2
commit 336b6dafb6
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 136 additions and 14 deletions

View File

@ -254,12 +254,7 @@ func (c *baseClientImpl) createMultiSearchRequests(searchRequests []*SearchReque
func (c *baseClientImpl) getMultiSearchQueryParameters() string {
var qs []string
maxConcurrentShardRequests := c.ds.MaxConcurrentShardRequests
if maxConcurrentShardRequests == 0 {
maxConcurrentShardRequests = 5
}
qs = append(qs, fmt.Sprintf("max_concurrent_shard_requests=%d", maxConcurrentShardRequests))
qs = append(qs, fmt.Sprintf("max_concurrent_shard_requests=%d", c.ds.MaxConcurrentShardRequests))
if c.ds.IncludeFrozen {
qs = append(qs, "ignore_throttled=false")

View File

@ -32,8 +32,10 @@ var eslog = log.New("tsdb.elasticsearch")
const (
// headerFromExpression is used by data sources to identify expression queries
headerFromExpression = "X-Grafana-From-Expr"
// headerFromAlert is used by datasources to identify alert queries
// headerFromAlert is used by data sources to identify alert queries
headerFromAlert = "FromAlert"
// this is the default value for the maxConcurrentShardRequests setting - it should be in sync with the default value in the datasource config settings
defaultMaxConcurrentShardRequests = int64(5)
)
type Service struct {
@ -138,18 +140,23 @@ func newInstanceSettings(httpClientProvider httpclient.Provider) datasource.Inst
index = settings.Database
}
var maxConcurrentShardRequests float64
var maxConcurrentShardRequests int64
switch v := jsonData["maxConcurrentShardRequests"].(type) {
// unmarshalling from JSON will return float64 for numbers, so we need to handle that and convert to int64
case float64:
maxConcurrentShardRequests = v
maxConcurrentShardRequests = int64(v)
case string:
maxConcurrentShardRequests, err = strconv.ParseFloat(v, 64)
maxConcurrentShardRequests, err = strconv.ParseInt(v, 10, 64)
if err != nil {
maxConcurrentShardRequests = 256
maxConcurrentShardRequests = defaultMaxConcurrentShardRequests
}
default:
maxConcurrentShardRequests = 256
maxConcurrentShardRequests = defaultMaxConcurrentShardRequests
}
if maxConcurrentShardRequests <= 0 {
maxConcurrentShardRequests = defaultMaxConcurrentShardRequests
}
includeFrozen, ok := jsonData["includeFrozen"].(bool)
@ -168,7 +175,7 @@ func newInstanceSettings(httpClientProvider httpclient.Provider) datasource.Inst
URL: settings.URL,
HTTPClient: httpCli,
Database: index,
MaxConcurrentShardRequests: int64(maxConcurrentShardRequests),
MaxConcurrentShardRequests: maxConcurrentShardRequests,
ConfiguredFields: configuredFields,
Interval: interval,
IncludeFrozen: includeFrozen,

View File

@ -14,7 +14,7 @@ import (
type datasourceInfo struct {
TimeField any `json:"timeField"`
MaxConcurrentShardRequests int64 `json:"maxConcurrentShardRequests"`
MaxConcurrentShardRequests any `json:"maxConcurrentShardRequests,omitempty"`
Interval string `json:"interval"`
}
@ -71,6 +71,126 @@ func TestNewInstanceSettings(t *testing.T) {
require.EqualError(t, err, "elasticsearch time field name is required")
})
})
t.Run("maxConcurrentShardRequests", func(t *testing.T) {
t.Run("no maxConcurrentShardRequests", func(t *testing.T) {
dsInfo := datasourceInfo{
TimeField: "@timestamp",
}
settingsJSON, err := json.Marshal(dsInfo)
require.NoError(t, err)
dsSettings := backend.DataSourceInstanceSettings{
JSONData: json.RawMessage(settingsJSON),
}
instance, err := newInstanceSettings(httpclient.NewProvider())(context.Background(), dsSettings)
require.Equal(t, defaultMaxConcurrentShardRequests, instance.(es.DatasourceInfo).MaxConcurrentShardRequests)
require.NoError(t, err)
})
t.Run("string maxConcurrentShardRequests", func(t *testing.T) {
dsInfo := datasourceInfo{
TimeField: "@timestamp",
MaxConcurrentShardRequests: "10",
}
settingsJSON, err := json.Marshal(dsInfo)
require.NoError(t, err)
dsSettings := backend.DataSourceInstanceSettings{
JSONData: json.RawMessage(settingsJSON),
}
instance, err := newInstanceSettings(httpclient.NewProvider())(context.Background(), dsSettings)
require.Equal(t, int64(10), instance.(es.DatasourceInfo).MaxConcurrentShardRequests)
require.NoError(t, err)
})
t.Run("number maxConcurrentShardRequests", func(t *testing.T) {
dsInfo := datasourceInfo{
TimeField: "@timestamp",
MaxConcurrentShardRequests: 10,
}
settingsJSON, err := json.Marshal(dsInfo)
require.NoError(t, err)
dsSettings := backend.DataSourceInstanceSettings{
JSONData: json.RawMessage(settingsJSON),
}
instance, err := newInstanceSettings(httpclient.NewProvider())(context.Background(), dsSettings)
require.Equal(t, int64(10), instance.(es.DatasourceInfo).MaxConcurrentShardRequests)
require.NoError(t, err)
})
t.Run("zero maxConcurrentShardRequests", func(t *testing.T) {
dsInfo := datasourceInfo{
TimeField: "@timestamp",
MaxConcurrentShardRequests: 0,
}
settingsJSON, err := json.Marshal(dsInfo)
require.NoError(t, err)
dsSettings := backend.DataSourceInstanceSettings{
JSONData: json.RawMessage(settingsJSON),
}
instance, err := newInstanceSettings(httpclient.NewProvider())(context.Background(), dsSettings)
require.Equal(t, defaultMaxConcurrentShardRequests, instance.(es.DatasourceInfo).MaxConcurrentShardRequests)
require.NoError(t, err)
})
t.Run("negative maxConcurrentShardRequests", func(t *testing.T) {
dsInfo := datasourceInfo{
TimeField: "@timestamp",
MaxConcurrentShardRequests: -10,
}
settingsJSON, err := json.Marshal(dsInfo)
require.NoError(t, err)
dsSettings := backend.DataSourceInstanceSettings{
JSONData: json.RawMessage(settingsJSON),
}
instance, err := newInstanceSettings(httpclient.NewProvider())(context.Background(), dsSettings)
require.Equal(t, defaultMaxConcurrentShardRequests, instance.(es.DatasourceInfo).MaxConcurrentShardRequests)
require.NoError(t, err)
})
t.Run("float maxConcurrentShardRequests", func(t *testing.T) {
dsInfo := datasourceInfo{
TimeField: "@timestamp",
MaxConcurrentShardRequests: 10.5,
}
settingsJSON, err := json.Marshal(dsInfo)
require.NoError(t, err)
dsSettings := backend.DataSourceInstanceSettings{
JSONData: json.RawMessage(settingsJSON),
}
instance, err := newInstanceSettings(httpclient.NewProvider())(context.Background(), dsSettings)
require.Equal(t, int64(10), instance.(es.DatasourceInfo).MaxConcurrentShardRequests)
require.NoError(t, err)
})
t.Run("invalid maxConcurrentShardRequests", func(t *testing.T) {
dsInfo := datasourceInfo{
TimeField: "@timestamp",
MaxConcurrentShardRequests: "invalid",
}
settingsJSON, err := json.Marshal(dsInfo)
require.NoError(t, err)
dsSettings := backend.DataSourceInstanceSettings{
JSONData: json.RawMessage(settingsJSON),
}
instance, err := newInstanceSettings(httpclient.NewProvider())(context.Background(), dsSettings)
require.Equal(t, defaultMaxConcurrentShardRequests, instance.(es.DatasourceInfo).MaxConcurrentShardRequests)
require.NoError(t, err)
})
})
}
func TestCreateElasticsearchURL(t *testing.T) {