Loki: Run queries in parallel behind lokiRunQueriesInParallel feature toggle (#74064)

* Issue Loki queries in parallel so that the total query time is that of the slowest query rather than the sum of all query times.

* Fix lint

* Add running of queries in parallel behind feature toggle to test the functionality before release

* Add span end

* Move shared logic to separate function

* Add logging and tracing around running of all queries

---------

Co-authored-by: Ivana Huckova <ivana.huckova@gmail.com>
This commit is contained in:
Travis Patterson 2023-09-19 03:34:01 -06:00 committed by GitHub
parent d7ed1e9379
commit 98aa7db64a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 70 additions and 27 deletions

View File

@ -124,6 +124,7 @@ export interface FeatureToggles {
newBrowseDashboards?: boolean; newBrowseDashboards?: boolean;
sseGroupByDatasource?: boolean; sseGroupByDatasource?: boolean;
requestInstrumentationStatusSource?: boolean; requestInstrumentationStatusSource?: boolean;
lokiRunQueriesInParallel?: boolean;
wargamesTesting?: boolean; wargamesTesting?: boolean;
alertingInsights?: boolean; alertingInsights?: boolean;
} }

View File

@ -737,6 +737,13 @@ var (
FrontendOnly: false, FrontendOnly: false,
Owner: grafanaPluginsPlatformSquad, Owner: grafanaPluginsPlatformSquad,
}, },
{
Name: "lokiRunQueriesInParallel",
Description: "Enables running Loki queries in parallel",
Stage: FeatureStagePrivatePreview,
FrontendOnly: false,
Owner: grafanaObservabilityLogsSquad,
},
{ {
Name: "wargamesTesting", Name: "wargamesTesting",
Description: "Placeholder feature flag for internal testing", Description: "Placeholder feature flag for internal testing",

View File

@ -105,5 +105,6 @@ reportingRetries,preview,@grafana/sharing-squad,false,false,true,false
newBrowseDashboards,preview,@grafana/grafana-frontend-platform,false,false,false,true newBrowseDashboards,preview,@grafana/grafana-frontend-platform,false,false,false,true
sseGroupByDatasource,experimental,@grafana/observability-metrics,false,false,false,false sseGroupByDatasource,experimental,@grafana/observability-metrics,false,false,false,false
requestInstrumentationStatusSource,experimental,@grafana/plugins-platform-backend,false,false,false,false requestInstrumentationStatusSource,experimental,@grafana/plugins-platform-backend,false,false,false,false
lokiRunQueriesInParallel,privatePreview,@grafana/observability-logs,false,false,false,false
wargamesTesting,experimental,@grafana/hosted-grafana-team,false,false,false,false wargamesTesting,experimental,@grafana/hosted-grafana-team,false,false,false,false
alertingInsights,experimental,@grafana/alerting-squad,false,false,false,true alertingInsights,experimental,@grafana/alerting-squad,false,false,false,true

1 Name Stage Owner requiresDevMode RequiresLicense RequiresRestart FrontendOnly
105 newBrowseDashboards preview @grafana/grafana-frontend-platform false false false true
106 sseGroupByDatasource experimental @grafana/observability-metrics false false false false
107 requestInstrumentationStatusSource experimental @grafana/plugins-platform-backend false false false false
108 lokiRunQueriesInParallel privatePreview @grafana/observability-logs false false false false
109 wargamesTesting experimental @grafana/hosted-grafana-team false false false false
110 alertingInsights experimental @grafana/alerting-squad false false false true

View File

@ -431,6 +431,10 @@ const (
// Include a status source label for request metrics and logs // Include a status source label for request metrics and logs
FlagRequestInstrumentationStatusSource = "requestInstrumentationStatusSource" FlagRequestInstrumentationStatusSource = "requestInstrumentationStatusSource"
// FlagLokiRunQueriesInParallel
// Enables running Loki queries in parallel
FlagLokiRunQueriesInParallel = "lokiRunQueriesInParallel"
// FlagWargamesTesting // FlagWargamesTesting
// Placeholder feature flag for internal testing // Placeholder feature flag for internal testing
FlagWargamesTesting = "wargamesTesting" FlagWargamesTesting = "wargamesTesting"

View File

@ -10,6 +10,7 @@ import (
"sync" "sync"
"time" "time"
"github.com/grafana/dskit/concurrency"
"github.com/grafana/grafana-plugin-sdk-go/backend" "github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana-plugin-sdk-go/backend/datasource" "github.com/grafana/grafana-plugin-sdk-go/backend/datasource"
"github.com/grafana/grafana-plugin-sdk-go/backend/instancemgmt" "github.com/grafana/grafana-plugin-sdk-go/backend/instancemgmt"
@ -174,10 +175,10 @@ func (s *Service) QueryData(ctx context.Context, req *backend.QueryDataRequest)
logsDataplane: s.features.IsEnabled(featuremgmt.FlagLokiLogsDataplane), logsDataplane: s.features.IsEnabled(featuremgmt.FlagLokiLogsDataplane),
} }
return queryData(ctx, req, dsInfo, responseOpts, s.tracer, logger) return queryData(ctx, req, dsInfo, responseOpts, s.tracer, logger, s.features.IsEnabled(featuremgmt.FlagLokiRunQueriesInParallel))
} }
func queryData(ctx context.Context, req *backend.QueryDataRequest, dsInfo *datasourceInfo, responseOpts ResponseOpts, tracer tracing.Tracer, plog log.Logger) (*backend.QueryDataResponse, error) { func queryData(ctx context.Context, req *backend.QueryDataRequest, dsInfo *datasourceInfo, responseOpts ResponseOpts, tracer tracing.Tracer, plog log.Logger, runInParallel bool) (*backend.QueryDataResponse, error) {
result := backend.NewQueryDataResponse() result := backend.NewQueryDataResponse()
api := newLokiAPI(dsInfo.HTTPClient, dsInfo.URL, plog, tracer) api := newLokiAPI(dsInfo.HTTPClient, dsInfo.URL, plog, tracer)
@ -188,34 +189,63 @@ func queryData(ctx context.Context, req *backend.QueryDataRequest, dsInfo *datas
plog.Error("Failed to prepare request to Loki", "error", err, "duration", time.Since(start), "queriesLength", len(queries), "stage", stagePrepareRequest) plog.Error("Failed to prepare request to Loki", "error", err, "duration", time.Since(start), "queriesLength", len(queries), "stage", stagePrepareRequest)
return result, err return result, err
} }
plog.Info("Prepared request to Loki", "duration", time.Since(start), "queriesLength", len(queries), "stage", stagePrepareRequest)
for _, query := range queries { plog.Info("Prepared request to Loki", "duration", time.Since(start), "queriesLength", len(queries), "stage", stagePrepareRequest, "runInParallel", runInParallel)
ctx, span := tracer.Start(ctx, "datasource.loki.queryData.runQuery")
span.SetAttributes("expr", query.Expr, attribute.Key("expr").String(query.Expr))
span.SetAttributes("start_unixnano", query.Start, attribute.Key("start_unixnano").Int64(query.Start.UnixNano()))
span.SetAttributes("stop_unixnano", query.End, attribute.Key("stop_unixnano").Int64(query.End.UnixNano()))
if req.GetHTTPHeader("X-Query-Group-Id") != "" { ctx, span := tracer.Start(ctx, "datasource.loki.queryData.runQueries")
span.SetAttributes("query_group_id", req.GetHTTPHeader("X-Query-Group-Id"), attribute.Key("query_group_id").String(req.GetHTTPHeader("X-Query-Group-Id"))) span.SetAttributes("runInParallel", runInParallel, attribute.Key("runInParallel").Bool(runInParallel))
} span.SetAttributes("queriesLength", len(queries), attribute.Key("queriesLength").Int((len(queries))))
if req.GetHTTPHeader("X-Query-Group-Id") != "" {
frames, err := runQuery(ctx, api, query, responseOpts, plog) span.SetAttributes("query_group_id", req.GetHTTPHeader("X-Query-Group-Id"), attribute.Key("query_group_id").String(req.GetHTTPHeader("X-Query-Group-Id")))
queryRes := backend.DataResponse{}
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
queryRes.Error = err
} else {
queryRes.Frames = frames
}
result.Responses[query.RefID] = queryRes
span.End()
} }
return result, nil defer span.End()
start = time.Now()
// We are testing running of queries in parallel behind feature flag
if runInParallel {
resultLock := sync.Mutex{}
err = concurrency.ForEachJob(ctx, len(queries), 10, func(ctx context.Context, idx int) error {
query := queries[idx]
queryRes := executeQuery(ctx, query, req, runInParallel, api, responseOpts, tracer, plog)
resultLock.Lock()
defer resultLock.Unlock()
result.Responses[query.RefID] = queryRes
return nil // errors are saved per-query,always return nil
})
} else {
for _, query := range queries {
queryRes := executeQuery(ctx, query, req, runInParallel, api, responseOpts, tracer, plog)
result.Responses[query.RefID] = queryRes
}
}
plog.Debug("Executed queries", "duration", time.Since(start), "queriesLength", len(queries), "runInParallel", runInParallel)
return result, err
}
func executeQuery(ctx context.Context, query *lokiQuery, req *backend.QueryDataRequest, runInParallel bool, api *LokiAPI, responseOpts ResponseOpts, tracer tracing.Tracer, plog log.Logger) backend.DataResponse {
ctx, span := tracer.Start(ctx, "datasource.loki.queryData.runQueries.runQuery")
span.SetAttributes("runInParallel", runInParallel, attribute.Key("runInParallel").Bool(runInParallel))
span.SetAttributes("expr", query.Expr, attribute.Key("expr").String(query.Expr))
span.SetAttributes("start_unixnano", query.Start, attribute.Key("start_unixnano").Int64(query.Start.UnixNano()))
span.SetAttributes("stop_unixnano", query.End, attribute.Key("stop_unixnano").Int64(query.End.UnixNano()))
if req.GetHTTPHeader("X-Query-Group-Id") != "" {
span.SetAttributes("query_group_id", req.GetHTTPHeader("X-Query-Group-Id"), attribute.Key("query_group_id").String(req.GetHTTPHeader("X-Query-Group-Id")))
}
defer span.End()
frames, err := runQuery(ctx, api, query, responseOpts, plog)
queryRes := backend.DataResponse{}
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
queryRes.Error = err
} else {
queryRes.Frames = frames
}
return queryRes
} }
// we extracted this part of the functionality to make it easy to unit-test it // we extracted this part of the functionality to make it easy to unit-test it