elasticsearch: minor refactor

Handle all replacements of interval template variables in the client.
Fix issue with client and different versions.
Adds better tests for the client.
This commit is contained in:
Marcus Efraimsson 2018-05-31 19:02:20 +02:00
parent 28f0acd854
commit dcac63936b
No known key found for this signature in database
GPG Key ID: EBFE0FB04612DD4A
7 changed files with 252 additions and 195 deletions

View File

@ -8,6 +8,7 @@ import (
"net/http"
"net/url"
"path"
"strconv"
"strings"
"time"
@ -25,6 +26,10 @@ var (
clientLog = log.New(loggerName)
)
// newDatasourceHttpClient resolves the *http.Client used to talk to the
// datasource. It is declared as a package-level var (rather than calling
// ds.GetHttpClient directly) so tests can swap in a stub transport.
var newDatasourceHttpClient = func(ds *models.DataSource) (*http.Client, error) {
return ds.GetHttpClient()
}
// Client represents a client which can interact with elasticsearch api
type Client interface {
GetVersion() int
@ -57,23 +62,18 @@ var NewClient = func(ctx context.Context, ds *models.DataSource, timeRange *tsdb
return nil, err
}
bc := &baseClientImpl{
ctx: ctx,
ds: ds,
version: version,
timeField: timeField,
indices: indices,
}
clientLog.Debug("Creating new client", "version", version, "timeField", timeField, "indices", strings.Join(indices, ", "))
switch version {
case 2:
return newV2Client(bc)
case 5:
return newV5Client(bc)
case 56:
return newV56Client(bc)
case 2, 5, 56:
return &baseClientImpl{
ctx: ctx,
ds: ds,
version: version,
timeField: timeField,
indices: indices,
timeRange: timeRange,
}, nil
}
return nil, fmt.Errorf("elasticsearch version=%d is not supported", version)
@ -93,6 +93,7 @@ type baseClientImpl struct {
version int
timeField string
indices []string
timeRange *tsdb.TimeRange
}
func (c *baseClientImpl) GetVersion() int {
@ -114,11 +115,20 @@ func (c *baseClientImpl) getSettings() *simplejson.Json {
}
type multiRequest struct {
header map[string]interface{}
body interface{}
header map[string]interface{}
body interface{}
interval tsdb.Interval
}
// executeBatchRequest encodes the given multi requests into a single
// newline-delimited payload and POSTs it to the provided URI path.
func (c *baseClientImpl) executeBatchRequest(uriPath string, requests []*multiRequest) (*http.Response, error) {
	// Named payload rather than bytes: the previous local shadowed the
	// standard-library "bytes" package identifier.
	payload, err := c.encodeBatchRequests(requests)
	if err != nil {
		return nil, err
	}
	return c.executeRequest(http.MethodPost, uriPath, payload)
}
func (c *baseClientImpl) encodeBatchRequests(requests []*multiRequest) ([]byte, error) {
clientLog.Debug("Encoding batch requests to json", "batch requests", len(requests))
start := time.Now()
@ -134,13 +144,18 @@ func (c *baseClientImpl) executeBatchRequest(uriPath string, requests []*multiRe
if err != nil {
return nil, err
}
payload.WriteString(string(reqBody) + "\n")
body := string(reqBody)
body = strings.Replace(body, "$__interval_ms", strconv.FormatInt(r.interval.Value.Nanoseconds()/int64(time.Millisecond), 10), -1)
body = strings.Replace(body, "$__interval", r.interval.Text, -1)
payload.WriteString(body + "\n")
}
elapsed := time.Now().Sub(start)
clientLog.Debug("Encoded batch requests to json", "took", elapsed)
return c.executeRequest(http.MethodPost, uriPath, payload.Bytes())
return payload.Bytes(), nil
}
func (c *baseClientImpl) executeRequest(method, uriPath string, body []byte) (*http.Response, error) {
@ -173,7 +188,7 @@ func (c *baseClientImpl) executeRequest(method, uriPath string, body []byte) (*h
req.SetBasicAuth(c.ds.User, c.ds.Password)
}
httpClient, err := c.ds.GetHttpClient()
httpClient, err := newDatasourceHttpClient(c.ds)
if err != nil {
return nil, err
}
@ -220,77 +235,26 @@ func (c *baseClientImpl) createMultiSearchRequests(searchRequests []*SearchReque
multiRequests := []*multiRequest{}
for _, searchReq := range searchRequests {
multiRequests = append(multiRequests, &multiRequest{
mr := multiRequest{
header: map[string]interface{}{
"search_type": "query_then_fetch",
"ignore_unavailable": true,
"index": strings.Join(c.indices, ","),
},
body: searchReq,
})
}
body: searchReq,
interval: searchReq.Interval,
}
return multiRequests
}
if c.version == 2 {
mr.header["search_type"] = "count"
}
// v2Client is the elasticsearch v2.x client; it embeds baseClient and
// overrides only the behavior that differs in v2 (see its methods).
type v2Client struct {
baseClient
}
if c.version >= 56 {
maxConcurrentShardRequests := c.getSettings().Get("maxConcurrentShardRequests").MustInt(256)
mr.header["max_concurrent_shard_requests"] = maxConcurrentShardRequests
}
// newV2Client wraps the given base client in an elasticsearch v2.x client.
func newV2Client(bc baseClient) (*v2Client, error) {
	return &v2Client{baseClient: bc}, nil
}
// createMultiSearchRequests builds the multi-search requests via the base
// client and rewrites each header's search_type to "count", as required by
// elasticsearch v2.x.
func (c *v2Client) createMultiSearchRequests(searchRequests []*SearchRequest) []*multiRequest {
	reqs := c.baseClient.createMultiSearchRequests(searchRequests)
	for i := range reqs {
		reqs[i].header["search_type"] = "count"
	}
	return reqs
}
// v5Client is the elasticsearch v5.x client; it embeds baseClient and
// inherits its default behavior unchanged.
type v5Client struct {
baseClient
}
// newV5Client wraps the given base client in an elasticsearch v5.x client.
func newV5Client(bc baseClient) (*v5Client, error) {
	return &v5Client{baseClient: bc}, nil
}
// v56Client is the elasticsearch v5.6+ client. It builds on v5Client and
// additionally carries the max_concurrent_shard_requests setting read from
// the datasource configuration.
type v56Client struct {
*v5Client
maxConcurrentShardRequests int
}
// newV56Client wraps the given base client in an elasticsearch v5.6+ client,
// reading maxConcurrentShardRequests from the datasource settings
// (defaulting to 256 when unset).
func newV56Client(bc baseClient) (*v56Client, error) {
	shardReqs := bc.getSettings().Get("maxConcurrentShardRequests").MustInt(256)
	return &v56Client{
		v5Client:                   &v5Client{baseClient: bc},
		maxConcurrentShardRequests: shardReqs,
	}, nil
}
func (c *v56Client) createMultiSearchRequests(searchRequests []*SearchRequest) []*multiRequest {
multiRequests := c.v5Client.createMultiSearchRequests(searchRequests)
for _, mr := range multiRequests {
mr.header["max_concurrent_shard_requests"] = c.maxConcurrentShardRequests
multiRequests = append(multiRequests, &mr)
}
return multiRequests

View File

@ -1,10 +1,17 @@
package es
import (
"bytes"
"context"
"fmt"
"io/ioutil"
"net/http"
"net/http/httptest"
"testing"
"time"
"github.com/grafana/grafana/pkg/components/simplejson"
"github.com/grafana/grafana/pkg/tsdb"
"github.com/grafana/grafana/pkg/models"
. "github.com/smartystreets/goconvey/convey"
@ -85,131 +92,213 @@ func TestClient(t *testing.T) {
})
})
Convey("v2", func() {
ds := &models.DataSource{
JsonData: simplejson.NewFromAny(map[string]interface{}{
"esVersion": 2,
}),
Convey("Given a fake http client", func() {
var responseBuffer *bytes.Buffer
var req *http.Request
ts := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) {
req = r
buf, err := ioutil.ReadAll(r.Body)
if err != nil {
t.Fatalf("Failed to read response body, err=%v", err)
}
responseBuffer = bytes.NewBuffer(buf)
}))
currentNewDatasourceHttpClient := newDatasourceHttpClient
newDatasourceHttpClient = func(ds *models.DataSource) (*http.Client, error) {
return ts.Client(), nil
}
c, err := newV2Client(newFakeBaseClient(ds, []string{"test-*"}))
So(err, ShouldBeNil)
So(c, ShouldNotBeNil)
from := time.Date(2018, 5, 15, 17, 50, 0, 0, time.UTC)
to := time.Date(2018, 5, 15, 17, 55, 0, 0, time.UTC)
fromStr := fmt.Sprintf("%d", from.UnixNano()/int64(time.Millisecond))
toStr := fmt.Sprintf("%d", to.UnixNano()/int64(time.Millisecond))
timeRange := tsdb.NewTimeRange(fromStr, toStr)
Convey("When creating multisearch requests should have correct headers", func() {
multiRequests := c.createMultiSearchRequests([]*SearchRequest{
{Index: "test-*"},
})
So(multiRequests, ShouldHaveLength, 1)
header := multiRequests[0].header
So(header, ShouldHaveLength, 3)
So(header["index"], ShouldEqual, "test-*")
So(header["ignore_unavailable"], ShouldEqual, true)
So(header["search_type"], ShouldEqual, "count")
})
})
Convey("v5", func() {
ds := &models.DataSource{
JsonData: simplejson.NewFromAny(map[string]interface{}{
"esVersion": 5,
}),
}
c, err := newV5Client(newFakeBaseClient(ds, []string{"test-*"}))
So(err, ShouldBeNil)
So(c, ShouldNotBeNil)
Convey("When creating multisearch requests should have correct headers", func() {
multiRequests := c.createMultiSearchRequests([]*SearchRequest{
{Index: "test-*"},
})
So(multiRequests, ShouldHaveLength, 1)
header := multiRequests[0].header
So(header, ShouldHaveLength, 3)
So(header["index"], ShouldEqual, "test-*")
So(header["ignore_unavailable"], ShouldEqual, true)
So(header["search_type"], ShouldEqual, "query_then_fetch")
})
})
Convey("v5.6", func() {
Convey("With default settings", func() {
Convey("and a v2.x client", func() {
ds := models.DataSource{
Database: "[metrics-]YYYY.MM.DD",
Url: ts.URL,
JsonData: simplejson.NewFromAny(map[string]interface{}{
"esVersion": 56,
"esVersion": 2,
"timeField": "@timestamp",
"interval": "Daily",
}),
}
c, err := newV56Client(newFakeBaseClient(&ds, []string{"test-*"}))
c, err := NewClient(context.Background(), &ds, timeRange)
So(err, ShouldBeNil)
So(c, ShouldNotBeNil)
Convey("When creating multisearch requests should have correct headers", func() {
multiRequests := c.createMultiSearchRequests([]*SearchRequest{
{Index: "test-*"},
Convey("When executing multi search", func() {
ms, err := createMultisearchForTest(c)
So(err, ShouldBeNil)
c.ExecuteMultisearch(ms)
Convey("Should send correct request and payload", func() {
So(req, ShouldNotBeNil)
So(req.Method, ShouldEqual, http.MethodPost)
So(req.URL.Path, ShouldEqual, "/_msearch")
So(responseBuffer, ShouldNotBeNil)
headerBytes, err := responseBuffer.ReadBytes('\n')
So(err, ShouldBeNil)
bodyBytes := responseBuffer.Bytes()
jHeader, err := simplejson.NewJson(headerBytes)
So(err, ShouldBeNil)
jBody, err := simplejson.NewJson(bodyBytes)
So(err, ShouldBeNil)
fmt.Println("body", string(headerBytes))
So(jHeader.Get("index").MustString(), ShouldEqual, "metrics-2018.05.15")
So(jHeader.Get("ignore_unavailable").MustBool(false), ShouldEqual, true)
So(jHeader.Get("search_type").MustString(), ShouldEqual, "count")
So(jHeader.Get("max_concurrent_shard_requests").MustInt(10), ShouldEqual, 10)
Convey("and replace $__interval variable", func() {
So(jBody.GetPath("aggs", "2", "aggs", "1", "avg", "script").MustString(), ShouldEqual, "15000*@hostname")
})
Convey("and replace $__interval_ms variable", func() {
So(jBody.GetPath("aggs", "2", "date_histogram", "interval").MustString(), ShouldEqual, "15s")
})
})
So(multiRequests, ShouldHaveLength, 1)
header := multiRequests[0].header
So(header, ShouldHaveLength, 4)
So(header["index"], ShouldEqual, "test-*")
So(header["ignore_unavailable"], ShouldEqual, true)
So(header["search_type"], ShouldEqual, "query_then_fetch")
So(header["max_concurrent_shard_requests"], ShouldEqual, 256)
})
})
Convey("With custom settings", func() {
Convey("and a v5.x client", func() {
ds := models.DataSource{
Database: "[metrics-]YYYY.MM.DD",
Url: ts.URL,
JsonData: simplejson.NewFromAny(map[string]interface{}{
"esVersion": 5,
"maxConcurrentShardRequests": 100,
"timeField": "@timestamp",
"interval": "Daily",
}),
}
c, err := NewClient(context.Background(), &ds, timeRange)
So(err, ShouldBeNil)
So(c, ShouldNotBeNil)
Convey("When executing multi search", func() {
ms, err := createMultisearchForTest(c)
So(err, ShouldBeNil)
c.ExecuteMultisearch(ms)
Convey("Should send correct request and payload", func() {
So(req, ShouldNotBeNil)
So(req.Method, ShouldEqual, http.MethodPost)
So(req.URL.Path, ShouldEqual, "/_msearch")
So(responseBuffer, ShouldNotBeNil)
headerBytes, err := responseBuffer.ReadBytes('\n')
So(err, ShouldBeNil)
bodyBytes := responseBuffer.Bytes()
jHeader, err := simplejson.NewJson(headerBytes)
So(err, ShouldBeNil)
jBody, err := simplejson.NewJson(bodyBytes)
So(err, ShouldBeNil)
fmt.Println("body", string(headerBytes))
So(jHeader.Get("index").MustString(), ShouldEqual, "metrics-2018.05.15")
So(jHeader.Get("ignore_unavailable").MustBool(false), ShouldEqual, true)
So(jHeader.Get("search_type").MustString(), ShouldEqual, "query_then_fetch")
So(jHeader.Get("max_concurrent_shard_requests").MustInt(10), ShouldEqual, 10)
Convey("and replace $__interval variable", func() {
So(jBody.GetPath("aggs", "2", "aggs", "1", "avg", "script").MustString(), ShouldEqual, "15000*@hostname")
})
Convey("and replace $__interval_ms variable", func() {
So(jBody.GetPath("aggs", "2", "date_histogram", "interval").MustString(), ShouldEqual, "15s")
})
})
})
})
Convey("and a v5.6 client", func() {
ds := models.DataSource{
Database: "[metrics-]YYYY.MM.DD",
Url: ts.URL,
JsonData: simplejson.NewFromAny(map[string]interface{}{
"esVersion": 56,
"maxConcurrentShardRequests": 100,
"timeField": "@timestamp",
"interval": "Daily",
}),
}
c, err := newV56Client(newFakeBaseClient(&ds, []string{"test-*"}))
c, err := NewClient(context.Background(), &ds, timeRange)
So(err, ShouldBeNil)
So(c, ShouldNotBeNil)
Convey("When creating multisearch requests should have correct headers", func() {
multiRequests := c.createMultiSearchRequests([]*SearchRequest{
{Index: "test-*"},
Convey("When executing multi search", func() {
ms, err := createMultisearchForTest(c)
So(err, ShouldBeNil)
c.ExecuteMultisearch(ms)
Convey("Should send correct request and payload", func() {
So(req, ShouldNotBeNil)
So(req.Method, ShouldEqual, http.MethodPost)
So(req.URL.Path, ShouldEqual, "/_msearch")
So(responseBuffer, ShouldNotBeNil)
headerBytes, err := responseBuffer.ReadBytes('\n')
So(err, ShouldBeNil)
bodyBytes := responseBuffer.Bytes()
jHeader, err := simplejson.NewJson(headerBytes)
So(err, ShouldBeNil)
jBody, err := simplejson.NewJson(bodyBytes)
So(err, ShouldBeNil)
fmt.Println("body", string(headerBytes))
So(jHeader.Get("index").MustString(), ShouldEqual, "metrics-2018.05.15")
So(jHeader.Get("ignore_unavailable").MustBool(false), ShouldEqual, true)
So(jHeader.Get("search_type").MustString(), ShouldEqual, "query_then_fetch")
So(jHeader.Get("max_concurrent_shard_requests").MustInt(), ShouldEqual, 100)
Convey("and replace $__interval variable", func() {
So(jBody.GetPath("aggs", "2", "aggs", "1", "avg", "script").MustString(), ShouldEqual, "15000*@hostname")
})
Convey("and replace $__interval_ms variable", func() {
So(jBody.GetPath("aggs", "2", "date_histogram", "interval").MustString(), ShouldEqual, "15s")
})
})
So(multiRequests, ShouldHaveLength, 1)
header := multiRequests[0].header
So(header, ShouldHaveLength, 4)
So(header["index"], ShouldEqual, "test-*")
So(header["ignore_unavailable"], ShouldEqual, true)
So(header["search_type"], ShouldEqual, "query_then_fetch")
So(header["max_concurrent_shard_requests"], ShouldEqual, 100)
})
})
Reset(func() {
newDatasourceHttpClient = currentNewDatasourceHttpClient
})
})
})
}
type fakeBaseClient struct {
*baseClientImpl
ds *models.DataSource
}
func createMultisearchForTest(c Client) (*MultiSearchRequest, error) {
msb := c.MultiSearch()
s := msb.Search(tsdb.Interval{Value: 15 * time.Second, Text: "15s"})
s.Agg().DateHistogram("2", "@timestamp", func(a *DateHistogramAgg, ab AggBuilder) {
a.Interval = "$__interval"
func newFakeBaseClient(ds *models.DataSource, indices []string) baseClient {
return &fakeBaseClient{
baseClientImpl: &baseClientImpl{
ds: ds,
indices: indices,
},
ds: ds,
}
}
func (c *fakeBaseClient) executeBatchRequest(uriPath string, requests []*multiRequest) (*http.Response, error) {
return nil, nil
}
func (c *fakeBaseClient) executeRequest(method, uriPath string, body []byte) (*http.Response, error) {
return nil, nil
}
func (c *fakeBaseClient) executeMultisearch(searchRequests []*SearchRequest) ([]*SearchResponse, error) {
return nil, nil
ab.Metric("1", "avg", "@hostname", func(a *MetricAggregation) {
a.Settings["script"] = "$__interval_ms*@hostname"
})
})
return msb.Build()
}

View File

@ -2,11 +2,14 @@ package es
import (
"encoding/json"
"github.com/grafana/grafana/pkg/tsdb"
)
// SearchRequest represents a search request
type SearchRequest struct {
Index string
Interval tsdb.Interval
Size int
Sort map[string]interface{}
Query *Query

View File

@ -2,11 +2,14 @@ package es
import (
"strings"
"github.com/grafana/grafana/pkg/tsdb"
)
// SearchRequestBuilder represents a builder which can build a search request
type SearchRequestBuilder struct {
version int
interval tsdb.Interval
index string
size int
sort map[string]interface{}
@ -16,9 +19,10 @@ type SearchRequestBuilder struct {
}
// NewSearchRequestBuilder create a new search request builder
func NewSearchRequestBuilder(version int) *SearchRequestBuilder {
func NewSearchRequestBuilder(version int, interval tsdb.Interval) *SearchRequestBuilder {
builder := &SearchRequestBuilder{
version: version,
interval: interval,
sort: make(map[string]interface{}),
customProps: make(map[string]interface{}),
aggBuilders: make([]AggBuilder, 0),
@ -30,6 +34,7 @@ func NewSearchRequestBuilder(version int) *SearchRequestBuilder {
func (b *SearchRequestBuilder) Build() (*SearchRequest, error) {
sr := SearchRequest{
Index: b.index,
Interval: b.interval,
Size: b.size,
Sort: b.sort,
CustomProps: b.customProps,
@ -128,8 +133,8 @@ func NewMultiSearchRequestBuilder(version int) *MultiSearchRequestBuilder {
}
// Search initiates and returns a new search request builder
func (m *MultiSearchRequestBuilder) Search() *SearchRequestBuilder {
b := NewSearchRequestBuilder(m.version)
func (m *MultiSearchRequestBuilder) Search(interval tsdb.Interval) *SearchRequestBuilder {
b := NewSearchRequestBuilder(m.version, interval)
m.requestBuilders = append(m.requestBuilders, b)
return b
}

View File

@ -3,8 +3,10 @@ package es
import (
"encoding/json"
"testing"
"time"
"github.com/grafana/grafana/pkg/components/simplejson"
"github.com/grafana/grafana/pkg/tsdb"
. "github.com/smartystreets/goconvey/convey"
)
@ -13,7 +15,7 @@ func TestSearchRequest(t *testing.T) {
Convey("Test elasticsearch search request", t, func() {
timeField := "@timestamp"
Convey("Given new search request builder for es version 5", func() {
b := NewSearchRequestBuilder(5)
b := NewSearchRequestBuilder(5, tsdb.Interval{Value: 15 * time.Second, Text: "15s"})
Convey("When building search request", func() {
sr, err := b.Build()
@ -388,7 +390,7 @@ func TestSearchRequest(t *testing.T) {
})
Convey("Given new search request builder for es version 2", func() {
b := NewSearchRequestBuilder(2)
b := NewSearchRequestBuilder(2, tsdb.Interval{Value: 15 * time.Second, Text: "15s"})
Convey("When adding doc value field", func() {
b.AddDocValueField(timeField)
@ -447,7 +449,7 @@ func TestMultiSearchRequest(t *testing.T) {
b := NewMultiSearchRequestBuilder(0)
Convey("When adding one search request", func() {
b.Search()
b.Search(tsdb.Interval{Value: 15 * time.Second, Text: "15s"})
Convey("When building search request should contain one search request", func() {
mr, err := b.Build()
@ -457,8 +459,8 @@ func TestMultiSearchRequest(t *testing.T) {
})
Convey("When adding two search requests", func() {
b.Search()
b.Search()
b.Search(tsdb.Interval{Value: 15 * time.Second, Text: "15s"})
b.Search(tsdb.Interval{Value: 15 * time.Second, Text: "15s"})
Convey("When building search request should contain two search requests", func() {
mr, err := b.Build()

View File

@ -3,8 +3,6 @@ package elasticsearch
import (
"fmt"
"strconv"
"strings"
"time"
"github.com/grafana/grafana/pkg/components/simplejson"
"github.com/grafana/grafana/pkg/tsdb"
@ -47,7 +45,7 @@ func (e *timeSeriesQuery) execute() (*tsdb.Response, error) {
}
interval := e.intervalCalculator.Calculate(e.tsdbQuery.TimeRange, minInterval)
b := ms.Search()
b := ms.Search(interval)
b.Size(0)
filters := b.Query().Bool().Filter()
filters.AddDateRangeFilter(e.client.GetTimeField(), to, from, es.DateFormatEpochMS)
@ -78,7 +76,7 @@ func (e *timeSeriesQuery) execute() (*tsdb.Response, error) {
for _, bucketAgg := range q.BucketAggs {
switch bucketAgg.Type {
case "date_histogram":
aggBuilder = addDateHistogramAgg(aggBuilder, bucketAgg, from, to, interval)
aggBuilder = addDateHistogramAgg(aggBuilder, bucketAgg, from, to)
case "histogram":
aggBuilder = addHistogramAgg(aggBuilder, bucketAgg)
case "filters":
@ -125,7 +123,7 @@ func (e *timeSeriesQuery) execute() (*tsdb.Response, error) {
return rp.getTimeSeries()
}
func addDateHistogramAgg(aggBuilder es.AggBuilder, bucketAgg *BucketAgg, timeFrom, timeTo string, interval tsdb.Interval) es.AggBuilder {
func addDateHistogramAgg(aggBuilder es.AggBuilder, bucketAgg *BucketAgg, timeFrom, timeTo string) es.AggBuilder {
aggBuilder.DateHistogram(bucketAgg.ID, bucketAgg.Field, func(a *es.DateHistogramAgg, b es.AggBuilder) {
a.Interval = bucketAgg.Settings.Get("interval").MustString("auto")
a.MinDocCount = bucketAgg.Settings.Get("min_doc_count").MustInt(0)
@ -136,10 +134,6 @@ func addDateHistogramAgg(aggBuilder es.AggBuilder, bucketAgg *BucketAgg, timeFro
a.Interval = "$__interval"
}
a.Interval = strings.Replace(a.Interval, "$interval", interval.Text, -1)
a.Interval = strings.Replace(a.Interval, "$__interval_ms", strconv.FormatInt(interval.Value.Nanoseconds()/int64(time.Millisecond), 10), -1)
a.Interval = strings.Replace(a.Interval, "$__interval", interval.Text, -1)
if missing, err := bucketAgg.Settings.Get("missing").String(); err == nil {
a.Missing = &missing
}

View File

@ -268,7 +268,7 @@ func TestExecuteTimeSeriesQuery(t *testing.T) {
So(firstLevel.Aggregation.Type, ShouldEqual, "date_histogram")
hAgg := firstLevel.Aggregation.Aggregation.(*es.DateHistogramAgg)
So(hAgg.Field, ShouldEqual, "@timestamp")
So(hAgg.Interval, ShouldEqual, "15s")
So(hAgg.Interval, ShouldEqual, "$__interval")
So(hAgg.MinDocCount, ShouldEqual, 2)
})