InfluxDB: Run queries in parallel behind influxdbRunQueriesInParallel feature toggle (#81209)

* create the feature flag

* bring the concurrency in to the play

* Update feature flag

* Use concurrency number from settings

* update influxdb dependency

* use ConcurrentQueryCount from plugin-sdk-go

* use helper method for concurrent query count

* log the error

* add value guard

* add unit tests

* handle concurrency error
This commit is contained in:
ismail simsek 2024-02-01 11:58:24 +01:00 committed by GitHub
parent c43a170009
commit 536153c336
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 136 additions and 33 deletions

View File

@ -63,6 +63,7 @@ export interface FeatureToggles {
prometheusMetricEncyclopedia?: boolean;
influxdbBackendMigration?: boolean;
influxqlStreamingParser?: boolean;
influxdbRunQueriesInParallel?: boolean;
clientTokenRotation?: boolean;
prometheusDataplane?: boolean;
lokiMetricDataplane?: boolean;

View File

@ -48,12 +48,14 @@ type Cfg struct {
AngularSupportEnabled bool
HideAngularDeprecation []string
ConcurrentQueryCount int
}
func NewCfg(devMode bool, pluginsPath string, pluginSettings setting.PluginSettings, pluginsAllowUnsigned []string,
awsAllowedAuthProviders []string, awsAssumeRoleEnabled bool, awsExternalId string, azure *azsettings.AzureSettings, secureSocksDSProxy setting.SecureSocksDSProxySettings,
grafanaVersion string, logDatasourceRequests bool, pluginsCDNURLTemplate string, appURL string, appSubURL string, tracing Tracing, features featuremgmt.FeatureToggles, angularSupportEnabled bool,
grafanaComURL string, disablePlugins []string, hideAngularDeprecation []string, forwardHostEnvVars []string) *Cfg {
grafanaComURL string, disablePlugins []string, hideAngularDeprecation []string, forwardHostEnvVars []string, concurrentQueryCount int) *Cfg {
return &Cfg{
log: log.New("plugin.cfg"),
PluginsPath: pluginsPath,
@ -77,5 +79,6 @@ func NewCfg(devMode bool, pluginsPath string, pluginSettings setting.PluginSetti
AngularSupportEnabled: angularSupportEnabled,
HideAngularDeprecation: hideAngularDeprecation,
ForwardHostEnvVars: forwardHostEnvVars,
ConcurrentQueryCount: concurrentQueryCount,
}
}

View File

@ -14,6 +14,7 @@ import (
"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana-plugin-sdk-go/backend/proxy"
"github.com/grafana/grafana-plugin-sdk-go/experimental/featuretoggles"
"github.com/grafana/grafana/pkg/plugins"
"github.com/grafana/grafana/pkg/plugins/auth"
"github.com/grafana/grafana/pkg/plugins/config"
@ -105,14 +106,17 @@ func (s *Service) GetConfigMap(ctx context.Context, pluginID string, _ *auth.Ext
if s.cfg.GrafanaAppURL != "" {
m[backend.AppURL] = s.cfg.GrafanaAppURL
}
if s.cfg.ConcurrentQueryCount != 0 {
m[backend.ConcurrentQueryCount] = strconv.Itoa(s.cfg.ConcurrentQueryCount)
}
// TODO add support via plugin SDK
//if externalService != nil {
// if externalService != nil {
// m[oauthtokenretriever.AppURL] = s.cfg.GrafanaAppURL
// m[oauthtokenretriever.AppClientID] = externalService.ClientID
// m[oauthtokenretriever.AppClientSecret] = externalService.ClientSecret
// m[oauthtokenretriever.AppPrivateKey] = externalService.PrivateKey
//}
// }
if s.cfg.Features != nil {
enabledFeatures := s.cfg.Features.GetEnabled(ctx)
@ -126,15 +130,15 @@ func (s *Service) GetConfigMap(ctx context.Context, pluginID string, _ *auth.Ext
}
}
// TODO add support via plugin SDK
//if s.cfg.AWSAssumeRoleEnabled {
// if s.cfg.AWSAssumeRoleEnabled {
// m[awsds.AssumeRoleEnabledEnvVarKeyName] = "true"
//}
//if len(s.cfg.AWSAllowedAuthProviders) > 0 {
// }
// if len(s.cfg.AWSAllowedAuthProviders) > 0 {
// m[awsds.AllowedAuthProvidersEnvVarKeyName] = strings.Join(s.cfg.AWSAllowedAuthProviders, ",")
//}
//if s.cfg.AWSExternalId != "" {
// }
// if s.cfg.AWSExternalId != "" {
// m[awsds.GrafanaAssumeRoleExternalIdKeyName] = s.cfg.AWSExternalId
//}
// }
if s.cfg.ProxySettings.Enabled {
m[proxy.PluginSecureSocksProxyEnabled] = "true"
@ -198,10 +202,10 @@ func (s *Service) GetConfigMap(ctx context.Context, pluginID string, _ *auth.Ext
}
// TODO add support via plugin SDK
//ps := getPluginSettings(pluginID, s.cfg)
//for k, v := range ps {
// ps := getPluginSettings(pluginID, s.cfg)
// for k, v := range ps {
// m[fmt.Sprintf("%s_%s", customConfigPrefix, strings.ToUpper(k))] = v
//}
// }
return m
}

View File

@ -798,6 +798,33 @@ func TestService_GetConfigMap_appURL(t *testing.T) {
})
}
func TestService_GetConfigMap_concurrentQueryCount(t *testing.T) {
t.Run("Uses the configured concurrent query count", func(t *testing.T) {
s := &Service{
cfg: &config.Cfg{
ConcurrentQueryCount: 42,
},
}
require.Equal(t, map[string]string{"GF_CONCURRENT_QUERY_COUNT": "42"}, s.GetConfigMap(context.Background(), "", nil))
})
t.Run("Doesn't set the concurrent query count if it is not in the config", func(t *testing.T) {
s := &Service{
cfg: &config.Cfg{},
}
require.Equal(t, map[string]string{}, s.GetConfigMap(context.Background(), "", nil))
})
t.Run("Doesn't set the concurrent query count if it is zero", func(t *testing.T) {
s := &Service{
cfg: &config.Cfg{
ConcurrentQueryCount: 0,
},
}
require.Equal(t, map[string]string{}, s.GetConfigMap(context.Background(), "", nil))
})
}
func TestService_GetConfigMap_azure(t *testing.T) {
azSettings := &azsettings.AzureSettings{
Cloud: azsettings.AzurePublic,

View File

@ -389,6 +389,14 @@ var (
Owner: grafanaObservabilityMetricsSquad,
Created: time.Date(2023, time.November, 29, 12, 0, 0, 0, time.UTC),
},
{
Name: "influxdbRunQueriesInParallel",
Description: "Enables running InfluxDB Influxql queries in parallel",
Stage: FeatureStagePrivatePreview,
FrontendOnly: false,
Owner: grafanaObservabilityMetricsSquad,
Created: time.Date(2024, time.January, 29, 12, 0, 0, 0, time.UTC),
},
{
Name: "clientTokenRotation",
Description: "Replaces the current in-request token rotation so that the client initiates the rotation",

View File

@ -44,6 +44,7 @@ individualCookiePreferences,experimental,@grafana/backend-platform,2023-02-23,fa
prometheusMetricEncyclopedia,GA,@grafana/observability-metrics,2023-03-07,false,false,true
influxdbBackendMigration,GA,@grafana/observability-metrics,2023-03-15,false,false,true
influxqlStreamingParser,experimental,@grafana/observability-metrics,2023-11-29,false,false,false
influxdbRunQueriesInParallel,privatePreview,@grafana/observability-metrics,2024-01-29,false,false,false
clientTokenRotation,GA,@grafana/identity-access-team,2023-03-23,false,false,false
prometheusDataplane,GA,@grafana/observability-metrics,2023-03-29,false,false,false
lokiMetricDataplane,GA,@grafana/observability-logs,2023-04-13,false,false,false

1 Name Stage Owner Created requiresDevMode RequiresRestart FrontendOnly
44 prometheusMetricEncyclopedia GA @grafana/observability-metrics 2023-03-07 false false true
45 influxdbBackendMigration GA @grafana/observability-metrics 2023-03-15 false false true
46 influxqlStreamingParser experimental @grafana/observability-metrics 2023-11-29 false false false
47 influxdbRunQueriesInParallel privatePreview @grafana/observability-metrics 2024-01-29 false false false
48 clientTokenRotation GA @grafana/identity-access-team 2023-03-23 false false false
49 prometheusDataplane GA @grafana/observability-metrics 2023-03-29 false false false
50 lokiMetricDataplane GA @grafana/observability-logs 2023-04-13 false false false

View File

@ -187,6 +187,10 @@ const (
// Enable streaming JSON parser for InfluxDB datasource InfluxQL query language
FlagInfluxqlStreamingParser = "influxqlStreamingParser"
// FlagInfluxdbRunQueriesInParallel
// Enables running InfluxDB Influxql queries in parallel
FlagInfluxdbRunQueriesInParallel = "influxdbRunQueriesInParallel"
// FlagClientTokenRotation
// Replaces the current in-request token rotation so that the client initiates the rotation
FlagClientTokenRotation = "clientTokenRotation"

View File

@ -49,6 +49,7 @@ func ProvideConfig(settingProvider setting.Provider, grafanaCfg *setting.Cfg, fe
grafanaCfg.DisablePlugins,
grafanaCfg.HideAngularDeprecation,
grafanaCfg.ForwardHostEnvVars,
grafanaCfg.ConcurrentQueryCount,
), nil
}

View File

@ -3,11 +3,14 @@ package influxql
import (
"context"
"errors"
"fmt"
"net/http"
"net/url"
"path"
"strings"
"sync"
"github.com/grafana/dskit/concurrency"
"github.com/grafana/grafana-plugin-sdk-go/backend"
"go.opentelemetry.io/otel/trace"
@ -30,40 +33,91 @@ var (
func Query(ctx context.Context, tracer trace.Tracer, dsInfo *models.DatasourceInfo, req *backend.QueryDataRequest, features featuremgmt.FeatureToggles) (*backend.QueryDataResponse, error) {
logger := glog.FromContext(ctx)
response := backend.NewQueryDataResponse()
var err error
for _, reqQuery := range req.Queries {
query, err := models.QueryParse(reqQuery)
// We are testing running of queries in parallel behind feature flag
if features.IsEnabled(ctx, featuremgmt.FlagInfluxdbRunQueriesInParallel) {
concurrentQueryCount, err := req.PluginContext.GrafanaConfig.ConcurrentQueryCount()
if err != nil {
return &backend.QueryDataResponse{}, err
logger.Debug(fmt.Sprintf("Concurrent Query Count read/parse error: %v", err), featuremgmt.FlagInfluxdbRunQueriesInParallel)
concurrentQueryCount = 10
}
rawQuery, err := query.Build(req)
if err != nil {
return &backend.QueryDataResponse{}, err
}
responseLock := sync.Mutex{}
err = concurrency.ForEachJob(ctx, len(req.Queries), concurrentQueryCount, func(ctx context.Context, idx int) error {
reqQuery := req.Queries[idx]
query, err := models.QueryParse(reqQuery)
if err != nil {
return err
}
query.RefID = reqQuery.RefID
query.RawQuery = rawQuery
rawQuery, err := query.Build(req)
if err != nil {
return err
}
if setting.Env == setting.Dev {
logger.Debug("Influxdb query", "raw query", rawQuery)
}
query.RefID = reqQuery.RefID
query.RawQuery = rawQuery
request, err := createRequest(ctx, logger, dsInfo, rawQuery, query.Policy)
if err != nil {
return &backend.QueryDataResponse{}, err
}
if setting.Env == setting.Dev {
logger.Debug("Influxdb query", "raw query", rawQuery)
}
resp, err := execute(ctx, tracer, dsInfo, logger, query, request, features.IsEnabled(ctx, featuremgmt.FlagInfluxqlStreamingParser))
request, err := createRequest(ctx, logger, dsInfo, rawQuery, query.Policy)
if err != nil {
return err
}
resp, err := execute(ctx, tracer, dsInfo, logger, query, request, features.IsEnabled(ctx, featuremgmt.FlagInfluxqlStreamingParser))
responseLock.Lock()
defer responseLock.Unlock()
if err != nil {
response.Responses[query.RefID] = backend.DataResponse{Error: err}
} else {
response.Responses[query.RefID] = resp
}
return nil // errors are saved per-query,always return nil
})
if err != nil {
response.Responses[query.RefID] = backend.DataResponse{Error: err}
} else {
response.Responses[query.RefID] = resp
logger.Debug("Influxdb concurrent query error", "concurrent query", err)
}
} else {
for _, reqQuery := range req.Queries {
query, err := models.QueryParse(reqQuery)
if err != nil {
return &backend.QueryDataResponse{}, err
}
rawQuery, err := query.Build(req)
if err != nil {
return &backend.QueryDataResponse{}, err
}
query.RefID = reqQuery.RefID
query.RawQuery = rawQuery
if setting.Env == setting.Dev {
logger.Debug("Influxdb query", "raw query", rawQuery)
}
request, err := createRequest(ctx, logger, dsInfo, rawQuery, query.Policy)
if err != nil {
return &backend.QueryDataResponse{}, err
}
resp, err := execute(ctx, tracer, dsInfo, logger, query, request, features.IsEnabled(ctx, featuremgmt.FlagInfluxqlStreamingParser))
if err != nil {
response.Responses[query.RefID] = backend.DataResponse{Error: err}
} else {
response.Responses[query.RefID] = resp
}
}
}
return response, nil
return response, err
}
func createRequest(ctx context.Context, logger log.Logger, dsInfo *models.DatasourceInfo, queryStr string, retentionPolicy string) (*http.Request, error) {