diff --git a/packages/grafana-data/src/types/featureToggles.gen.ts b/packages/grafana-data/src/types/featureToggles.gen.ts
index 5282959cfb6..1f4bb6c141f 100644
--- a/packages/grafana-data/src/types/featureToggles.gen.ts
+++ b/packages/grafana-data/src/types/featureToggles.gen.ts
@@ -124,6 +124,7 @@ export interface FeatureToggles {
   newBrowseDashboards?: boolean;
   sseGroupByDatasource?: boolean;
   requestInstrumentationStatusSource?: boolean;
+  lokiRunQueriesInParallel?: boolean;
   wargamesTesting?: boolean;
   alertingInsights?: boolean;
 }
diff --git a/pkg/services/featuremgmt/registry.go b/pkg/services/featuremgmt/registry.go
index d4fb605e641..bf615c9b1ef 100644
--- a/pkg/services/featuremgmt/registry.go
+++ b/pkg/services/featuremgmt/registry.go
@@ -737,6 +737,13 @@ var (
 			FrontendOnly: false,
 			Owner:        grafanaPluginsPlatformSquad,
 		},
+		{
+			Name:         "lokiRunQueriesInParallel",
+			Description:  "Enables running Loki queries in parallel",
+			Stage:        FeatureStagePrivatePreview,
+			FrontendOnly: false,
+			Owner:        grafanaObservabilityLogsSquad,
+		},
 		{
 			Name:        "wargamesTesting",
 			Description: "Placeholder feature flag for internal testing",
diff --git a/pkg/services/featuremgmt/toggles_gen.csv b/pkg/services/featuremgmt/toggles_gen.csv
index 965a79f4495..c86e59631bb 100644
--- a/pkg/services/featuremgmt/toggles_gen.csv
+++ b/pkg/services/featuremgmt/toggles_gen.csv
@@ -105,5 +105,6 @@ reportingRetries,preview,@grafana/sharing-squad,false,false,true,false
 newBrowseDashboards,preview,@grafana/grafana-frontend-platform,false,false,false,true
 sseGroupByDatasource,experimental,@grafana/observability-metrics,false,false,false,false
 requestInstrumentationStatusSource,experimental,@grafana/plugins-platform-backend,false,false,false,false
+lokiRunQueriesInParallel,privatePreview,@grafana/observability-logs,false,false,false,false
 wargamesTesting,experimental,@grafana/hosted-grafana-team,false,false,false,false
 alertingInsights,experimental,@grafana/alerting-squad,false,false,false,true
diff --git a/pkg/services/featuremgmt/toggles_gen.go b/pkg/services/featuremgmt/toggles_gen.go
index df546f241ff..ff9ae1b5993 100644
--- a/pkg/services/featuremgmt/toggles_gen.go
+++ b/pkg/services/featuremgmt/toggles_gen.go
@@ -431,6 +431,10 @@ const (
 	// Include a status source label for request metrics and logs
 	FlagRequestInstrumentationStatusSource = "requestInstrumentationStatusSource"
 
+	// FlagLokiRunQueriesInParallel
+	// Enables running Loki queries in parallel
+	FlagLokiRunQueriesInParallel = "lokiRunQueriesInParallel"
+
 	// FlagWargamesTesting
 	// Placeholder feature flag for internal testing
 	FlagWargamesTesting = "wargamesTesting"
diff --git a/pkg/tsdb/loki/loki.go b/pkg/tsdb/loki/loki.go
index 33e2fd9dd3d..1883b2ef19f 100644
--- a/pkg/tsdb/loki/loki.go
+++ b/pkg/tsdb/loki/loki.go
@@ -10,6 +10,7 @@ import (
 	"sync"
 	"time"
 
+	"github.com/grafana/dskit/concurrency"
 	"github.com/grafana/grafana-plugin-sdk-go/backend"
 	"github.com/grafana/grafana-plugin-sdk-go/backend/datasource"
 	"github.com/grafana/grafana-plugin-sdk-go/backend/instancemgmt"
@@ -174,10 +175,10 @@ func (s *Service) QueryData(ctx context.Context, req *backend.QueryDataRequest)
 		logsDataplane: s.features.IsEnabled(featuremgmt.FlagLokiLogsDataplane),
 	}
 
-	return queryData(ctx, req, dsInfo, responseOpts, s.tracer, logger)
+	return queryData(ctx, req, dsInfo, responseOpts, s.tracer, logger, s.features.IsEnabled(featuremgmt.FlagLokiRunQueriesInParallel))
 }
 
-func queryData(ctx context.Context, req *backend.QueryDataRequest, dsInfo *datasourceInfo, responseOpts ResponseOpts, tracer tracing.Tracer, plog log.Logger) (*backend.QueryDataResponse, error) {
+func queryData(ctx context.Context, req *backend.QueryDataRequest, dsInfo *datasourceInfo, responseOpts ResponseOpts, tracer tracing.Tracer, plog log.Logger, runInParallel bool) (*backend.QueryDataResponse, error) {
 	result := backend.NewQueryDataResponse()
 
 	api := newLokiAPI(dsInfo.HTTPClient, dsInfo.URL, plog, tracer)
@@ -188,34 +189,63 @@ func queryData(ctx context.Context, req *backend.QueryDataRequest, dsInfo *datas
 		plog.Error("Failed to prepare request to Loki", "error", err, "duration", time.Since(start), "queriesLength", len(queries), "stage", stagePrepareRequest)
 		return result, err
 	}
-	plog.Info("Prepared request to Loki", "duration", time.Since(start), "queriesLength", len(queries), "stage", stagePrepareRequest)
-
-	for _, query := range queries {
-		ctx, span := tracer.Start(ctx, "datasource.loki.queryData.runQuery")
-		span.SetAttributes("expr", query.Expr, attribute.Key("expr").String(query.Expr))
-		span.SetAttributes("start_unixnano", query.Start, attribute.Key("start_unixnano").Int64(query.Start.UnixNano()))
-		span.SetAttributes("stop_unixnano", query.End, attribute.Key("stop_unixnano").Int64(query.End.UnixNano()))
+	plog.Info("Prepared request to Loki", "duration", time.Since(start), "queriesLength", len(queries), "stage", stagePrepareRequest, "runInParallel", runInParallel)
 
-		if req.GetHTTPHeader("X-Query-Group-Id") != "" {
-			span.SetAttributes("query_group_id", req.GetHTTPHeader("X-Query-Group-Id"), attribute.Key("query_group_id").String(req.GetHTTPHeader("X-Query-Group-Id")))
-		}
-
-		frames, err := runQuery(ctx, api, query, responseOpts, plog)
-
-		queryRes := backend.DataResponse{}
-
-		if err != nil {
-			span.RecordError(err)
-			span.SetStatus(codes.Error, err.Error())
-			queryRes.Error = err
-		} else {
-			queryRes.Frames = frames
-		}
-
-		result.Responses[query.RefID] = queryRes
-		span.End()
+	ctx, span := tracer.Start(ctx, "datasource.loki.queryData.runQueries")
+	span.SetAttributes("runInParallel", runInParallel, attribute.Key("runInParallel").Bool(runInParallel))
+	span.SetAttributes("queriesLength", len(queries), attribute.Key("queriesLength").Int(len(queries)))
+	if req.GetHTTPHeader("X-Query-Group-Id") != "" {
+		span.SetAttributes("query_group_id", req.GetHTTPHeader("X-Query-Group-Id"), attribute.Key("query_group_id").String(req.GetHTTPHeader("X-Query-Group-Id")))
 	}
-
-	return result, nil
+	defer span.End()
+	start = time.Now()
+
+	// We are testing running queries in parallel behind a feature flag
+	if runInParallel {
+		resultLock := sync.Mutex{}
+		err = concurrency.ForEachJob(ctx, len(queries), 10, func(ctx context.Context, idx int) error {
+			query := queries[idx]
+			queryRes := executeQuery(ctx, query, req, runInParallel, api, responseOpts, tracer, plog)
+
+			resultLock.Lock()
+			defer resultLock.Unlock()
+			result.Responses[query.RefID] = queryRes
+			return nil // errors are saved per-query; always return nil
+		})
+	} else {
+		for _, query := range queries {
+			queryRes := executeQuery(ctx, query, req, runInParallel, api, responseOpts, tracer, plog)
+			result.Responses[query.RefID] = queryRes
+		}
+	}
+	plog.Debug("Executed queries", "duration", time.Since(start), "queriesLength", len(queries), "runInParallel", runInParallel)
+	return result, err
+}
+
+func executeQuery(ctx context.Context, query *lokiQuery, req *backend.QueryDataRequest, runInParallel bool, api *LokiAPI, responseOpts ResponseOpts, tracer tracing.Tracer, plog log.Logger) backend.DataResponse {
+	ctx, span := tracer.Start(ctx, "datasource.loki.queryData.runQueries.runQuery")
"datasource.loki.queryData.runQueries.runQuery") + span.SetAttributes("runInParallel", runInParallel, attribute.Key("runInParallel").Bool(runInParallel)) + span.SetAttributes("expr", query.Expr, attribute.Key("expr").String(query.Expr)) + span.SetAttributes("start_unixnano", query.Start, attribute.Key("start_unixnano").Int64(query.Start.UnixNano())) + span.SetAttributes("stop_unixnano", query.End, attribute.Key("stop_unixnano").Int64(query.End.UnixNano())) + if req.GetHTTPHeader("X-Query-Group-Id") != "" { + span.SetAttributes("query_group_id", req.GetHTTPHeader("X-Query-Group-Id"), attribute.Key("query_group_id").String(req.GetHTTPHeader("X-Query-Group-Id"))) + } + + defer span.End() + + frames, err := runQuery(ctx, api, query, responseOpts, plog) + queryRes := backend.DataResponse{} + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + queryRes.Error = err + } else { + queryRes.Frames = frames + } + + return queryRes } // we extracted this part of the functionality to make it easy to unit-test it