From c0260fd913bd8ed4bcba8651ea6ee8c8fd28144f Mon Sep 17 00:00:00 2001 From: bergquist Date: Thu, 21 Sep 2017 13:39:25 +0200 Subject: [PATCH] remove batch abstraction --- pkg/tsdb/batch.go | 93 -------------------------------------- pkg/tsdb/request.go | 61 ++++--------------------- pkg/tsdb/tsdb_test.go | 101 ------------------------------------------ 3 files changed, 9 insertions(+), 246 deletions(-) delete mode 100644 pkg/tsdb/batch.go diff --git a/pkg/tsdb/batch.go b/pkg/tsdb/batch.go deleted file mode 100644 index f743911ba64..00000000000 --- a/pkg/tsdb/batch.go +++ /dev/null @@ -1,93 +0,0 @@ -package tsdb - -import "context" - -type Batch struct { - DataSourceId int64 - Queries []*Query - Depends map[string]bool - Done bool - Started bool -} - -type BatchSlice []*Batch - -func newBatch(dsId int64, queries []*Query) *Batch { - return &Batch{ - DataSourceId: dsId, - Queries: queries, - Depends: make(map[string]bool), - } -} - -func (bg *Batch) process(ctx context.Context, resultChan chan *BatchResult, tsdbQuery *TsdbQuery) { - executor, err := getTsdbQueryEndpointFor(bg.Queries[0].DataSource) - - if err != nil { - bg.Done = true - result := &BatchResult{ - Error: err, - QueryResults: make(map[string]*QueryResult), - } - for _, query := range bg.Queries { - result.QueryResults[query.RefId] = &QueryResult{Error: result.Error} - } - resultChan <- result - return - } - - res := executor.Query(ctx, &TsdbQuery{ - Queries: bg.Queries, - TimeRange: tsdbQuery.TimeRange, - }) - bg.Done = true - resultChan <- res -} - -func (bg *Batch) addQuery(query *Query) { - bg.Queries = append(bg.Queries, query) -} - -func (bg *Batch) allDependenciesAreIn(res *Response) bool { - for key := range bg.Depends { - if _, exists := res.Results[key]; !exists { - return false - } - } - - return true -} - -func getBatches(req *TsdbQuery) (BatchSlice, error) { - batches := make(BatchSlice, 0) - - for _, query := range req.Queries { - if foundBatch := findMatchingBatchGroup(query, batches); foundBatch != nil { - foundBatch.addQuery(query) - } else { - newBatch := newBatch(query.DataSource.Id, []*Query{query}) - batches = append(batches, newBatch) - - for _, refId := range query.Depends { - for _, batch := range batches { - for _, batchQuery := range batch.Queries { - if batchQuery.RefId == refId { - newBatch.Depends[refId] = true - } - } - } - } - } - } - - return batches, nil -} - -func findMatchingBatchGroup(query *Query, batches BatchSlice) *Batch { - for _, batch := range batches { - if batch.DataSourceId == query.DataSource.Id { - return batch - } - } - return nil -} diff --git a/pkg/tsdb/request.go b/pkg/tsdb/request.go index a06236b0fe3..d29bffa88f6 100644 --- a/pkg/tsdb/request.go +++ b/pkg/tsdb/request.go @@ -7,63 +7,20 @@ import ( type HandleRequestFunc func(ctx context.Context, req *TsdbQuery) (*Response, error) func HandleRequest(ctx context.Context, req *TsdbQuery) (*Response, error) { - tsdbQuery := &TsdbQuery{ - Queries: req.Queries, - TimeRange: req.TimeRange, - } - - batches, err := getBatches(req) + //TODO niceify + endpoint, err := getTsdbQueryEndpointFor(req.Queries[0].DataSource) if err != nil { return nil, err } - currentlyExecuting := 0 - resultsChan := make(chan *BatchResult) + res := endpoint.Query(ctx, req) - for _, batch := range batches { - if len(batch.Depends) == 0 { - currentlyExecuting += 1 - batch.Started = true - go batch.process(ctx, resultsChan, tsdbQuery) - } + if res.Error != nil { + return nil, res.Error } - response := &Response{ - Results: make(map[string]*QueryResult), - } - - for currentlyExecuting != 0 { - select { - case batchResult := <-resultsChan: - currentlyExecuting -= 1 - - response.BatchTimings = append(response.BatchTimings, batchResult.Timings) - - if batchResult.Error != nil { - return nil, batchResult.Error - } - - for refId, result := range batchResult.QueryResults { - response.Results[refId] = result - } - - for _, batch := range batches { - // not interested in started batches - if batch.Started { - continue - } - - if batch.allDependenciesAreIn(response) { - currentlyExecuting += 1 - batch.Started = true - go batch.process(ctx, resultsChan, tsdbQuery) - } - } - case <-ctx.Done(): - return nil, ctx.Err() - } - } - - //response.Results = tsdbQuery.Results - return response, nil + return &Response{ + Results: res.QueryResults, + BatchTimings: []*BatchTiming{res.Timings}, + }, nil } diff --git a/pkg/tsdb/tsdb_test.go b/pkg/tsdb/tsdb_test.go index bb86caa4b2f..5e84a646a31 100644 --- a/pkg/tsdb/tsdb_test.go +++ b/pkg/tsdb/tsdb_test.go @@ -3,57 +3,12 @@ package tsdb import ( "context" "testing" - "time" "github.com/grafana/grafana/pkg/models" . "github.com/smartystreets/goconvey/convey" ) func TestMetricQuery(t *testing.T) { - - Convey("When batches groups for query", t, func() { - - Convey("Given 3 queries for 2 data sources", func() { - request := &TsdbQuery{ - Queries: []*Query{ - {RefId: "A", DataSource: &models.DataSource{Id: 1}}, - {RefId: "B", DataSource: &models.DataSource{Id: 1}}, - {RefId: "C", DataSource: &models.DataSource{Id: 2}}, - }, - } - - batches, err := getBatches(request) - So(err, ShouldBeNil) - - Convey("Should group into two batches", func() { - So(len(batches), ShouldEqual, 2) - }) - }) - - Convey("Given query 2 depends on query 1", func() { - request := &TsdbQuery{ - Queries: []*Query{ - {RefId: "A", DataSource: &models.DataSource{Id: 1}}, - {RefId: "B", DataSource: &models.DataSource{Id: 2}}, - {RefId: "C", DataSource: &models.DataSource{Id: 3}, Depends: []string{"A", "B"}}, - }, - } - - batches, err := getBatches(request) - So(err, ShouldBeNil) - - Convey("Should return three batch groups", func() { - So(len(batches), ShouldEqual, 3) - }) - - Convey("Group 3 should have group 1 and 2 as dependencies", func() { - So(batches[2].Depends["A"], ShouldEqual, true) - So(batches[2].Depends["B"], ShouldEqual, true) - }) - - }) - }) - Convey("When executing request with one query", t, func() { req := &TsdbQuery{ Queries: []*Query{ @@ -99,23 +54,6 @@ func TestMetricQuery(t *testing.T) { }) - Convey("When executing one request with three queries from different datasources", t, func() { - req := &TsdbQuery{ - Queries: []*Query{ - {RefId: "A", DataSource: &models.DataSource{Id: 1, Type: "test"}}, - {RefId: "B", DataSource: &models.DataSource{Id: 1, Type: "test"}}, - {RefId: "C", DataSource: &models.DataSource{Id: 2, Type: "test"}}, - }, - } - - res, err := HandleRequest(context.TODO(), req) - So(err, ShouldBeNil) - - Convey("Should have been batched in two requests", func() { - So(len(res.BatchTimings), ShouldEqual, 2) - }) - }) - Convey("When query uses data source of unknown type", t, func() { req := &TsdbQuery{ Queries: []*Query{ @@ -126,45 +64,6 @@ func TestMetricQuery(t *testing.T) { _, err := HandleRequest(context.TODO(), req) So(err, ShouldNotBeNil) }) - - Convey("When executing request that depend on other query", t, func() { - req := &TsdbQuery{ - Queries: []*Query{ - { - RefId: "A", DataSource: &models.DataSource{Id: 1, Type: "test"}, - }, - { - RefId: "B", DataSource: &models.DataSource{Id: 2, Type: "test"}, Depends: []string{"A"}, - }, - }, - } - - fakeExecutor := registerFakeExecutor() - fakeExecutor.HandleQuery("A", func(c *TsdbQuery) *QueryResult { - time.Sleep(10 * time.Millisecond) - return &QueryResult{ - Series: TimeSeriesSlice{ - &TimeSeries{Name: "Ares"}, - }} - }) - fakeExecutor.HandleQuery("B", func(c *TsdbQuery) *QueryResult { - return &QueryResult{ - Series: TimeSeriesSlice{ - &TimeSeries{Name: "Bres+Ares"}, - }} - }) - - res, err := HandleRequest(context.TODO(), req) - So(err, ShouldBeNil) - - Convey("Should have been batched in two requests", func() { - So(len(res.BatchTimings), ShouldEqual, 2) - }) - - Convey("Query B should have access to Query A results", func() { - So(res.Results["B"].Series[0].Name, ShouldEqual, "Bres+Ares") - }) - }) } func registerFakeExecutor() *FakeExecutor {