From 48aef0c50e07b791196caa4ac40cef2c935ef288 Mon Sep 17 00:00:00 2001 From: Mitsuhiro Tanda Date: Tue, 16 Oct 2018 11:39:10 +0900 Subject: [PATCH] fix concurrent map writes --- pkg/tsdb/cloudwatch/cloudwatch.go | 26 ++++++++++++++++---------- 1 file changed, 16 insertions(+), 10 deletions(-) diff --git a/pkg/tsdb/cloudwatch/cloudwatch.go b/pkg/tsdb/cloudwatch/cloudwatch.go index 61bbc04394a..437457df52a 100644 --- a/pkg/tsdb/cloudwatch/cloudwatch.go +++ b/pkg/tsdb/cloudwatch/cloudwatch.go @@ -86,9 +86,10 @@ func (e *CloudWatchExecutor) Query(ctx context.Context, dsInfo *models.DataSourc } func (e *CloudWatchExecutor) executeTimeSeriesQuery(ctx context.Context, queryContext *tsdb.TsdbQuery) (*tsdb.Response, error) { - result := &tsdb.Response{ + results := &tsdb.Response{ Results: make(map[string]*tsdb.QueryResult), } + resultChan := make(chan *tsdb.QueryResult, len(queryContext.Queries)) eg, ectx := errgroup.WithContext(ctx) @@ -102,10 +103,10 @@ func (e *CloudWatchExecutor) executeTimeSeriesQuery(ctx context.Context, queryCo RefId := queryContext.Queries[i].RefId query, err := parseQuery(queryContext.Queries[i].Model) if err != nil { - result.Results[RefId] = &tsdb.QueryResult{ + results.Results[RefId] = &tsdb.QueryResult{ Error: err, } - return result, nil + return results, nil } query.RefId = RefId @@ -118,10 +119,10 @@ func (e *CloudWatchExecutor) executeTimeSeriesQuery(ctx context.Context, queryCo } if query.Id == "" && query.Expression != "" { - result.Results[query.RefId] = &tsdb.QueryResult{ + results.Results[query.RefId] = &tsdb.QueryResult{ Error: fmt.Errorf("Invalid query: id should be set if using expression"), } - return result, nil + return results, nil } eg.Go(func() error { @@ -130,12 +131,13 @@ func (e *CloudWatchExecutor) executeTimeSeriesQuery(ctx context.Context, queryCo return err } if err != nil { - result.Results[query.RefId] = &tsdb.QueryResult{ + resultChan <- &tsdb.QueryResult{ + RefId: query.RefId, Error: err, } return nil } - result.Results[queryRes.RefId] = queryRes + resultChan <- queryRes return nil }) } @@ -149,10 +151,10 @@ func (e *CloudWatchExecutor) executeTimeSeriesQuery(ctx context.Context, queryCo return err } for _, queryRes := range queryResponses { - result.Results[queryRes.RefId] = queryRes if err != nil { - result.Results[queryRes.RefId].Error = err + queryRes.Error = err } + resultChan <- queryRes } return nil }) @@ -162,8 +164,12 @@ func (e *CloudWatchExecutor) executeTimeSeriesQuery(ctx context.Context, queryCo if err := eg.Wait(); err != nil { return nil, err } + close(resultChan) + for result := range resultChan { + results.Results[result.RefId] = result + } - return result, nil + return results, nil } func (e *CloudWatchExecutor) executeQuery(ctx context.Context, query *CloudWatchQuery, queryContext *tsdb.TsdbQuery) (*tsdb.QueryResult, error) {