mirror of
https://github.com/grafana/grafana.git
synced 2025-02-25 18:55:37 -06:00
remove batch abstraction
This commit is contained in:
@@ -1,93 +0,0 @@
|
|||||||
package tsdb
|
|
||||||
|
|
||||||
import "context"
|
|
||||||
|
|
||||||
type Batch struct {
|
|
||||||
DataSourceId int64
|
|
||||||
Queries []*Query
|
|
||||||
Depends map[string]bool
|
|
||||||
Done bool
|
|
||||||
Started bool
|
|
||||||
}
|
|
||||||
|
|
||||||
type BatchSlice []*Batch
|
|
||||||
|
|
||||||
func newBatch(dsId int64, queries []*Query) *Batch {
|
|
||||||
return &Batch{
|
|
||||||
DataSourceId: dsId,
|
|
||||||
Queries: queries,
|
|
||||||
Depends: make(map[string]bool),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (bg *Batch) process(ctx context.Context, resultChan chan *BatchResult, tsdbQuery *TsdbQuery) {
|
|
||||||
executor, err := getTsdbQueryEndpointFor(bg.Queries[0].DataSource)
|
|
||||||
|
|
||||||
if err != nil {
|
|
||||||
bg.Done = true
|
|
||||||
result := &BatchResult{
|
|
||||||
Error: err,
|
|
||||||
QueryResults: make(map[string]*QueryResult),
|
|
||||||
}
|
|
||||||
for _, query := range bg.Queries {
|
|
||||||
result.QueryResults[query.RefId] = &QueryResult{Error: result.Error}
|
|
||||||
}
|
|
||||||
resultChan <- result
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
res := executor.Query(ctx, &TsdbQuery{
|
|
||||||
Queries: bg.Queries,
|
|
||||||
TimeRange: tsdbQuery.TimeRange,
|
|
||||||
})
|
|
||||||
bg.Done = true
|
|
||||||
resultChan <- res
|
|
||||||
}
|
|
||||||
|
|
||||||
func (bg *Batch) addQuery(query *Query) {
|
|
||||||
bg.Queries = append(bg.Queries, query)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (bg *Batch) allDependenciesAreIn(res *Response) bool {
|
|
||||||
for key := range bg.Depends {
|
|
||||||
if _, exists := res.Results[key]; !exists {
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
|
|
||||||
func getBatches(req *TsdbQuery) (BatchSlice, error) {
|
|
||||||
batches := make(BatchSlice, 0)
|
|
||||||
|
|
||||||
for _, query := range req.Queries {
|
|
||||||
if foundBatch := findMatchingBatchGroup(query, batches); foundBatch != nil {
|
|
||||||
foundBatch.addQuery(query)
|
|
||||||
} else {
|
|
||||||
newBatch := newBatch(query.DataSource.Id, []*Query{query})
|
|
||||||
batches = append(batches, newBatch)
|
|
||||||
|
|
||||||
for _, refId := range query.Depends {
|
|
||||||
for _, batch := range batches {
|
|
||||||
for _, batchQuery := range batch.Queries {
|
|
||||||
if batchQuery.RefId == refId {
|
|
||||||
newBatch.Depends[refId] = true
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return batches, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func findMatchingBatchGroup(query *Query, batches BatchSlice) *Batch {
|
|
||||||
for _, batch := range batches {
|
|
||||||
if batch.DataSourceId == query.DataSource.Id {
|
|
||||||
return batch
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
@@ -7,63 +7,20 @@ import (
|
|||||||
type HandleRequestFunc func(ctx context.Context, req *TsdbQuery) (*Response, error)
|
type HandleRequestFunc func(ctx context.Context, req *TsdbQuery) (*Response, error)
|
||||||
|
|
||||||
func HandleRequest(ctx context.Context, req *TsdbQuery) (*Response, error) {
|
func HandleRequest(ctx context.Context, req *TsdbQuery) (*Response, error) {
|
||||||
tsdbQuery := &TsdbQuery{
|
//TODO niceify
|
||||||
Queries: req.Queries,
|
endpoint, err := getTsdbQueryEndpointFor(req.Queries[0].DataSource)
|
||||||
TimeRange: req.TimeRange,
|
|
||||||
}
|
|
||||||
|
|
||||||
batches, err := getBatches(req)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
currentlyExecuting := 0
|
res := endpoint.Query(ctx, req)
|
||||||
resultsChan := make(chan *BatchResult)
|
|
||||||
|
|
||||||
for _, batch := range batches {
|
if res.Error != nil {
|
||||||
if len(batch.Depends) == 0 {
|
return nil, res.Error
|
||||||
currentlyExecuting += 1
|
|
||||||
batch.Started = true
|
|
||||||
go batch.process(ctx, resultsChan, tsdbQuery)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
response := &Response{
|
return &Response{
|
||||||
Results: make(map[string]*QueryResult),
|
Results: res.QueryResults,
|
||||||
}
|
BatchTimings: []*BatchTiming{res.Timings},
|
||||||
|
}, nil
|
||||||
for currentlyExecuting != 0 {
|
|
||||||
select {
|
|
||||||
case batchResult := <-resultsChan:
|
|
||||||
currentlyExecuting -= 1
|
|
||||||
|
|
||||||
response.BatchTimings = append(response.BatchTimings, batchResult.Timings)
|
|
||||||
|
|
||||||
if batchResult.Error != nil {
|
|
||||||
return nil, batchResult.Error
|
|
||||||
}
|
|
||||||
|
|
||||||
for refId, result := range batchResult.QueryResults {
|
|
||||||
response.Results[refId] = result
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, batch := range batches {
|
|
||||||
// not interested in started batches
|
|
||||||
if batch.Started {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
if batch.allDependenciesAreIn(response) {
|
|
||||||
currentlyExecuting += 1
|
|
||||||
batch.Started = true
|
|
||||||
go batch.process(ctx, resultsChan, tsdbQuery)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
case <-ctx.Done():
|
|
||||||
return nil, ctx.Err()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
//response.Results = tsdbQuery.Results
|
|
||||||
return response, nil
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -3,57 +3,12 @@ package tsdb
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/grafana/grafana/pkg/models"
|
"github.com/grafana/grafana/pkg/models"
|
||||||
. "github.com/smartystreets/goconvey/convey"
|
. "github.com/smartystreets/goconvey/convey"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestMetricQuery(t *testing.T) {
|
func TestMetricQuery(t *testing.T) {
|
||||||
|
|
||||||
Convey("When batches groups for query", t, func() {
|
|
||||||
|
|
||||||
Convey("Given 3 queries for 2 data sources", func() {
|
|
||||||
request := &TsdbQuery{
|
|
||||||
Queries: []*Query{
|
|
||||||
{RefId: "A", DataSource: &models.DataSource{Id: 1}},
|
|
||||||
{RefId: "B", DataSource: &models.DataSource{Id: 1}},
|
|
||||||
{RefId: "C", DataSource: &models.DataSource{Id: 2}},
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
batches, err := getBatches(request)
|
|
||||||
So(err, ShouldBeNil)
|
|
||||||
|
|
||||||
Convey("Should group into two batches", func() {
|
|
||||||
So(len(batches), ShouldEqual, 2)
|
|
||||||
})
|
|
||||||
})
|
|
||||||
|
|
||||||
Convey("Given query 2 depends on query 1", func() {
|
|
||||||
request := &TsdbQuery{
|
|
||||||
Queries: []*Query{
|
|
||||||
{RefId: "A", DataSource: &models.DataSource{Id: 1}},
|
|
||||||
{RefId: "B", DataSource: &models.DataSource{Id: 2}},
|
|
||||||
{RefId: "C", DataSource: &models.DataSource{Id: 3}, Depends: []string{"A", "B"}},
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
batches, err := getBatches(request)
|
|
||||||
So(err, ShouldBeNil)
|
|
||||||
|
|
||||||
Convey("Should return three batch groups", func() {
|
|
||||||
So(len(batches), ShouldEqual, 3)
|
|
||||||
})
|
|
||||||
|
|
||||||
Convey("Group 3 should have group 1 and 2 as dependencies", func() {
|
|
||||||
So(batches[2].Depends["A"], ShouldEqual, true)
|
|
||||||
So(batches[2].Depends["B"], ShouldEqual, true)
|
|
||||||
})
|
|
||||||
|
|
||||||
})
|
|
||||||
})
|
|
||||||
|
|
||||||
Convey("When executing request with one query", t, func() {
|
Convey("When executing request with one query", t, func() {
|
||||||
req := &TsdbQuery{
|
req := &TsdbQuery{
|
||||||
Queries: []*Query{
|
Queries: []*Query{
|
||||||
@@ -99,23 +54,6 @@ func TestMetricQuery(t *testing.T) {
|
|||||||
|
|
||||||
})
|
})
|
||||||
|
|
||||||
Convey("When executing one request with three queries from different datasources", t, func() {
|
|
||||||
req := &TsdbQuery{
|
|
||||||
Queries: []*Query{
|
|
||||||
{RefId: "A", DataSource: &models.DataSource{Id: 1, Type: "test"}},
|
|
||||||
{RefId: "B", DataSource: &models.DataSource{Id: 1, Type: "test"}},
|
|
||||||
{RefId: "C", DataSource: &models.DataSource{Id: 2, Type: "test"}},
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
res, err := HandleRequest(context.TODO(), req)
|
|
||||||
So(err, ShouldBeNil)
|
|
||||||
|
|
||||||
Convey("Should have been batched in two requests", func() {
|
|
||||||
So(len(res.BatchTimings), ShouldEqual, 2)
|
|
||||||
})
|
|
||||||
})
|
|
||||||
|
|
||||||
Convey("When query uses data source of unknown type", t, func() {
|
Convey("When query uses data source of unknown type", t, func() {
|
||||||
req := &TsdbQuery{
|
req := &TsdbQuery{
|
||||||
Queries: []*Query{
|
Queries: []*Query{
|
||||||
@@ -126,45 +64,6 @@ func TestMetricQuery(t *testing.T) {
|
|||||||
_, err := HandleRequest(context.TODO(), req)
|
_, err := HandleRequest(context.TODO(), req)
|
||||||
So(err, ShouldNotBeNil)
|
So(err, ShouldNotBeNil)
|
||||||
})
|
})
|
||||||
|
|
||||||
Convey("When executing request that depend on other query", t, func() {
|
|
||||||
req := &TsdbQuery{
|
|
||||||
Queries: []*Query{
|
|
||||||
{
|
|
||||||
RefId: "A", DataSource: &models.DataSource{Id: 1, Type: "test"},
|
|
||||||
},
|
|
||||||
{
|
|
||||||
RefId: "B", DataSource: &models.DataSource{Id: 2, Type: "test"}, Depends: []string{"A"},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
fakeExecutor := registerFakeExecutor()
|
|
||||||
fakeExecutor.HandleQuery("A", func(c *TsdbQuery) *QueryResult {
|
|
||||||
time.Sleep(10 * time.Millisecond)
|
|
||||||
return &QueryResult{
|
|
||||||
Series: TimeSeriesSlice{
|
|
||||||
&TimeSeries{Name: "Ares"},
|
|
||||||
}}
|
|
||||||
})
|
|
||||||
fakeExecutor.HandleQuery("B", func(c *TsdbQuery) *QueryResult {
|
|
||||||
return &QueryResult{
|
|
||||||
Series: TimeSeriesSlice{
|
|
||||||
&TimeSeries{Name: "Bres+Ares"},
|
|
||||||
}}
|
|
||||||
})
|
|
||||||
|
|
||||||
res, err := HandleRequest(context.TODO(), req)
|
|
||||||
So(err, ShouldBeNil)
|
|
||||||
|
|
||||||
Convey("Should have been batched in two requests", func() {
|
|
||||||
So(len(res.BatchTimings), ShouldEqual, 2)
|
|
||||||
})
|
|
||||||
|
|
||||||
Convey("Query B should have access to Query A results", func() {
|
|
||||||
So(res.Results["B"].Series[0].Name, ShouldEqual, "Bres+Ares")
|
|
||||||
})
|
|
||||||
})
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func registerFakeExecutor() *FakeExecutor {
|
func registerFakeExecutor() *FakeExecutor {
|
||||||
|
|||||||
Reference in New Issue
Block a user