diff --git a/pkg/tsdb/elasticsearch/client/client.go b/pkg/tsdb/elasticsearch/client/client.go index b51b824e45b..90ec24c2791 100644 --- a/pkg/tsdb/elasticsearch/client/client.go +++ b/pkg/tsdb/elasticsearch/client/client.go @@ -254,12 +254,7 @@ func (c *baseClientImpl) createMultiSearchRequests(searchRequests []*SearchReque func (c *baseClientImpl) getMultiSearchQueryParameters() string { var qs []string - - maxConcurrentShardRequests := c.ds.MaxConcurrentShardRequests - if maxConcurrentShardRequests == 0 { - maxConcurrentShardRequests = 5 - } - qs = append(qs, fmt.Sprintf("max_concurrent_shard_requests=%d", maxConcurrentShardRequests)) + qs = append(qs, fmt.Sprintf("max_concurrent_shard_requests=%d", c.ds.MaxConcurrentShardRequests)) if c.ds.IncludeFrozen { qs = append(qs, "ignore_throttled=false") diff --git a/pkg/tsdb/elasticsearch/elasticsearch.go b/pkg/tsdb/elasticsearch/elasticsearch.go index 28fe2bb824e..69c7fc30a62 100644 --- a/pkg/tsdb/elasticsearch/elasticsearch.go +++ b/pkg/tsdb/elasticsearch/elasticsearch.go @@ -32,8 +32,10 @@ var eslog = log.New("tsdb.elasticsearch") const ( // headerFromExpression is used by data sources to identify expression queries headerFromExpression = "X-Grafana-From-Expr" - // headerFromAlert is used by datasources to identify alert queries + // headerFromAlert is used by data sources to identify alert queries headerFromAlert = "FromAlert" + // this is the default value for the maxConcurrentShardRequests setting - it should be in sync with the default value in the datasource config settings + defaultMaxConcurrentShardRequests = int64(5) ) type Service struct { @@ -138,18 +140,23 @@ func newInstanceSettings(httpClientProvider httpclient.Provider) datasource.Inst index = settings.Database } - var maxConcurrentShardRequests float64 + var maxConcurrentShardRequests int64 switch v := jsonData["maxConcurrentShardRequests"].(type) { + // unmarshalling from JSON will return float64 for numbers, so we need to handle that and convert to int64 case float64: - maxConcurrentShardRequests = v + maxConcurrentShardRequests = int64(v) case string: - maxConcurrentShardRequests, err = strconv.ParseFloat(v, 64) + maxConcurrentShardRequests, err = strconv.ParseInt(v, 10, 64) if err != nil { - maxConcurrentShardRequests = 256 + maxConcurrentShardRequests = defaultMaxConcurrentShardRequests } default: - maxConcurrentShardRequests = 256 + maxConcurrentShardRequests = defaultMaxConcurrentShardRequests + } + + if maxConcurrentShardRequests <= 0 { + maxConcurrentShardRequests = defaultMaxConcurrentShardRequests } includeFrozen, ok := jsonData["includeFrozen"].(bool) @@ -168,7 +175,7 @@ func newInstanceSettings(httpClientProvider httpclient.Provider) datasource.Inst URL: settings.URL, HTTPClient: httpCli, Database: index, - MaxConcurrentShardRequests: int64(maxConcurrentShardRequests), + MaxConcurrentShardRequests: maxConcurrentShardRequests, ConfiguredFields: configuredFields, Interval: interval, IncludeFrozen: includeFrozen, diff --git a/pkg/tsdb/elasticsearch/elasticsearch_test.go b/pkg/tsdb/elasticsearch/elasticsearch_test.go index 13a6aa79c71..798e54bf6d0 100644 --- a/pkg/tsdb/elasticsearch/elasticsearch_test.go +++ b/pkg/tsdb/elasticsearch/elasticsearch_test.go @@ -14,7 +14,7 @@ import ( type datasourceInfo struct { TimeField any `json:"timeField"` - MaxConcurrentShardRequests int64 `json:"maxConcurrentShardRequests"` + MaxConcurrentShardRequests any `json:"maxConcurrentShardRequests,omitempty"` Interval string `json:"interval"` } @@ -71,6 +71,126 @@ func TestNewInstanceSettings(t *testing.T) { require.EqualError(t, err, "elasticsearch time field name is required") }) }) + + t.Run("maxConcurrentShardRequests", func(t *testing.T) { + t.Run("no maxConcurrentShardRequests", func(t *testing.T) { + dsInfo := datasourceInfo{ + TimeField: "@timestamp", + } + settingsJSON, err := json.Marshal(dsInfo) + require.NoError(t, err) + + dsSettings := backend.DataSourceInstanceSettings{ + JSONData: json.RawMessage(settingsJSON), + } + + instance, err := newInstanceSettings(httpclient.NewProvider())(context.Background(), dsSettings) + require.Equal(t, defaultMaxConcurrentShardRequests, instance.(es.DatasourceInfo).MaxConcurrentShardRequests) + require.NoError(t, err) + }) + + t.Run("string maxConcurrentShardRequests", func(t *testing.T) { + dsInfo := datasourceInfo{ + TimeField: "@timestamp", + MaxConcurrentShardRequests: "10", + } + settingsJSON, err := json.Marshal(dsInfo) + require.NoError(t, err) + + dsSettings := backend.DataSourceInstanceSettings{ + JSONData: json.RawMessage(settingsJSON), + } + + instance, err := newInstanceSettings(httpclient.NewProvider())(context.Background(), dsSettings) + require.Equal(t, int64(10), instance.(es.DatasourceInfo).MaxConcurrentShardRequests) + require.NoError(t, err) + }) + + t.Run("number maxConcurrentShardRequests", func(t *testing.T) { + dsInfo := datasourceInfo{ + TimeField: "@timestamp", + MaxConcurrentShardRequests: 10, + } + settingsJSON, err := json.Marshal(dsInfo) + require.NoError(t, err) + + dsSettings := backend.DataSourceInstanceSettings{ + JSONData: json.RawMessage(settingsJSON), + } + + instance, err := newInstanceSettings(httpclient.NewProvider())(context.Background(), dsSettings) + require.Equal(t, int64(10), instance.(es.DatasourceInfo).MaxConcurrentShardRequests) + require.NoError(t, err) + }) + + t.Run("zero maxConcurrentShardRequests", func(t *testing.T) { + dsInfo := datasourceInfo{ + TimeField: "@timestamp", + MaxConcurrentShardRequests: 0, + } + settingsJSON, err := json.Marshal(dsInfo) + require.NoError(t, err) + + dsSettings := backend.DataSourceInstanceSettings{ + JSONData: json.RawMessage(settingsJSON), + } + + instance, err := newInstanceSettings(httpclient.NewProvider())(context.Background(), dsSettings) + require.Equal(t, defaultMaxConcurrentShardRequests, instance.(es.DatasourceInfo).MaxConcurrentShardRequests) + require.NoError(t, err) + }) + + t.Run("negative maxConcurrentShardRequests", func(t *testing.T) { + dsInfo := datasourceInfo{ + TimeField: "@timestamp", + MaxConcurrentShardRequests: -10, + } + settingsJSON, err := json.Marshal(dsInfo) + require.NoError(t, err) + + dsSettings := backend.DataSourceInstanceSettings{ + JSONData: json.RawMessage(settingsJSON), + } + + instance, err := newInstanceSettings(httpclient.NewProvider())(context.Background(), dsSettings) + require.Equal(t, defaultMaxConcurrentShardRequests, instance.(es.DatasourceInfo).MaxConcurrentShardRequests) + require.NoError(t, err) + }) + + t.Run("float maxConcurrentShardRequests", func(t *testing.T) { + dsInfo := datasourceInfo{ + TimeField: "@timestamp", + MaxConcurrentShardRequests: 10.5, + } + settingsJSON, err := json.Marshal(dsInfo) + require.NoError(t, err) + + dsSettings := backend.DataSourceInstanceSettings{ + JSONData: json.RawMessage(settingsJSON), + } + + instance, err := newInstanceSettings(httpclient.NewProvider())(context.Background(), dsSettings) + require.Equal(t, int64(10), instance.(es.DatasourceInfo).MaxConcurrentShardRequests) + require.NoError(t, err) + }) + + t.Run("invalid maxConcurrentShardRequests", func(t *testing.T) { + dsInfo := datasourceInfo{ + TimeField: "@timestamp", + MaxConcurrentShardRequests: "invalid", + } + settingsJSON, err := json.Marshal(dsInfo) + require.NoError(t, err) + + dsSettings := backend.DataSourceInstanceSettings{ + JSONData: json.RawMessage(settingsJSON), + } + + instance, err := newInstanceSettings(httpclient.NewProvider())(context.Background(), dsSettings) + require.Equal(t, defaultMaxConcurrentShardRequests, instance.(es.DatasourceInfo).MaxConcurrentShardRequests) + require.NoError(t, err) + }) + }) } func TestCreateElasticsearchURL(t *testing.T) {