From e171ed89102ff136f48488f38c98e71e03a77813 Mon Sep 17 00:00:00 2001 From: Marcus Efraimsson Date: Wed, 23 May 2018 14:59:12 +0200 Subject: [PATCH] elasticsearch: new simple client for communicating with elasticsearch Handles minor differences of es 2, 5 and 5.6. Implements index pattern logic. Exposes builders for building search requests. --- pkg/tsdb/elasticsearch/client/client.go | 286 +++++++++++ pkg/tsdb/elasticsearch/client/client_test.go | 215 ++++++++ .../elasticsearch/client/index_pattern.go | 312 ++++++++++++ .../client/index_pattern_test.go | 244 +++++++++ pkg/tsdb/elasticsearch/client/models.go | 304 +++++++++++ .../elasticsearch/client/search_request.go | 446 +++++++++++++++++ .../client/search_request_test.go | 471 ++++++++++++++++++ 7 files changed, 2278 insertions(+) create mode 100644 pkg/tsdb/elasticsearch/client/client.go create mode 100644 pkg/tsdb/elasticsearch/client/client_test.go create mode 100644 pkg/tsdb/elasticsearch/client/index_pattern.go create mode 100644 pkg/tsdb/elasticsearch/client/index_pattern_test.go create mode 100644 pkg/tsdb/elasticsearch/client/models.go create mode 100644 pkg/tsdb/elasticsearch/client/search_request.go create mode 100644 pkg/tsdb/elasticsearch/client/search_request_test.go diff --git a/pkg/tsdb/elasticsearch/client/client.go b/pkg/tsdb/elasticsearch/client/client.go new file mode 100644 index 00000000000..3dae343bd37 --- /dev/null +++ b/pkg/tsdb/elasticsearch/client/client.go @@ -0,0 +1,286 @@ +package es + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "net/http" + "net/url" + "path" + "strings" + "time" + + "github.com/grafana/grafana/pkg/components/simplejson" + "github.com/grafana/grafana/pkg/log" + "github.com/grafana/grafana/pkg/tsdb" + + "github.com/grafana/grafana/pkg/models" + "golang.org/x/net/context/ctxhttp" +) + +const loggerName = "tsdb.elasticsearch.client" + +var ( + clientLog = log.New(loggerName) + intervalCalculator = tsdb.NewIntervalCalculator(&tsdb.IntervalOptions{MinInterval: 15 * time.Second}) +) + +// Client represents a client which can interact with elasticsearch api +type Client interface { + GetVersion() int + GetTimeField() string + GetMinInterval(queryInterval string) (time.Duration, error) + ExecuteMultisearch(r *MultiSearchRequest) (*MultiSearchResponse, error) + MultiSearch() *MultiSearchRequestBuilder +} + +// NewClient creates a new elasticsearch client +var NewClient = func(ctx context.Context, ds *models.DataSource, timeRange *tsdb.TimeRange) (Client, error) { + version, err := ds.JsonData.Get("esVersion").Int() + if err != nil { + return nil, fmt.Errorf("eleasticsearch version is required, err=%v", err) + } + + timeField, err := ds.JsonData.Get("timeField").String() + if err != nil { + return nil, fmt.Errorf("eleasticsearch time field name is required, err=%v", err) + } + + indexInterval := ds.JsonData.Get("interval").MustString() + ip, err := newIndexPattern(indexInterval, ds.Database) + if err != nil { + return nil, err + } + + indices, err := ip.GetIndices(timeRange) + if err != nil { + return nil, err + } + + bc := &baseClientImpl{ + ctx: ctx, + ds: ds, + version: version, + timeField: timeField, + indices: indices, + } + + clientLog.Debug("Creating new client", "version", version, "timeField", timeField, "indices", strings.Join(indices, ", ")) + + switch version { + case 2: + return newV2Client(bc) + case 5: + return newV5Client(bc) + case 56: + return newV56Client(bc) + } + + return nil, fmt.Errorf("elasticsearch version=%d is not supported", version) +} + +type baseClient interface { + Client + getSettings() *simplejson.Json + executeBatchRequest(uriPath string, requests []*multiRequest) (*http.Response, error) + executeRequest(method, uriPath string, body []byte) (*http.Response, error) + createMultiSearchRequests(searchRequests []*SearchRequest) []*multiRequest +} + +type baseClientImpl struct { + ctx context.Context + ds *models.DataSource + version int + timeField string + indices []string +} + +func (c *baseClientImpl) GetVersion() int { + return c.version +} + +func (c *baseClientImpl) GetTimeField() string { + return c.timeField +} + +func (c *baseClientImpl) GetMinInterval(queryInterval string) (time.Duration, error) { + return tsdb.GetIntervalFrom(c.ds, simplejson.NewFromAny(map[string]string{ + "interval": queryInterval, + }), 15*time.Second) +} + +func (c *baseClientImpl) getSettings() *simplejson.Json { + return c.ds.JsonData +} + +type multiRequest struct { + header map[string]interface{} + body interface{} +} + +func (c *baseClientImpl) executeBatchRequest(uriPath string, requests []*multiRequest) (*http.Response, error) { + payload := bytes.Buffer{} + for _, r := range requests { + reqHeader, err := json.Marshal(r.header) + if err != nil { + return nil, err + } + payload.WriteString(string(reqHeader) + "\n") + + reqBody, err := json.Marshal(r.body) + if err != nil { + return nil, err + } + payload.WriteString(string(reqBody) + "\n") + } + + return c.executeRequest(http.MethodPost, uriPath, payload.Bytes()) +} + +func (c *baseClientImpl) executeRequest(method, uriPath string, body []byte) (*http.Response, error) { + u, _ := url.Parse(c.ds.Url) + u.Path = path.Join(u.Path, uriPath) + + var req *http.Request + var err error + if method == http.MethodPost { + req, err = http.NewRequest(http.MethodPost, u.String(), bytes.NewBuffer(body)) + } else { + req, err = http.NewRequest(http.MethodGet, u.String(), nil) + } + if err != nil { + return nil, err + } + req.Header.Set("User-Agent", "Grafana") + req.Header.Set("Content-Type", "application/json") + + if c.ds.BasicAuth { + clientLog.Debug("Request configured to use basic authentication") + req.SetBasicAuth(c.ds.BasicAuthUser, c.ds.BasicAuthPassword) + } + + if !c.ds.BasicAuth && c.ds.User != "" { + clientLog.Debug("Request configured to use basic authentication") + req.SetBasicAuth(c.ds.User, c.ds.Password) + } + + httpClient, err := c.ds.GetHttpClient() + if err != nil { + return nil, err + } + + if method == http.MethodPost { + clientLog.Debug("Executing request", "url", req.URL.String(), "method", method) + } else { + clientLog.Debug("Executing request", "url", req.URL.String(), "method", method) + } + + return ctxhttp.Do(c.ctx, httpClient, req) +} + +func (c *baseClientImpl) ExecuteMultisearch(r *MultiSearchRequest) (*MultiSearchResponse, error) { + multiRequests := c.createMultiSearchRequests(r.Requests) + res, err := c.executeBatchRequest("_msearch", multiRequests) + if err != nil { + return nil, err + } + + var msr MultiSearchResponse + defer res.Body.Close() + dec := json.NewDecoder(res.Body) + err = dec.Decode(&msr) + if err != nil { + return nil, err + } + + clientLog.Debug("Received multisearch response", "code", res.StatusCode, "status", res.Status, "content-length", res.ContentLength) + + msr.status = res.StatusCode + + return &msr, nil +} + +func (c *baseClientImpl) createMultiSearchRequests(searchRequests []*SearchRequest) []*multiRequest { + multiRequests := []*multiRequest{} + + for _, searchReq := range searchRequests { + multiRequests = append(multiRequests, &multiRequest{ + header: map[string]interface{}{ + "search_type": "query_then_fetch", + "ignore_unavailable": true, + "index": strings.Join(c.indices, ","), + }, + body: searchReq, + }) + } + + return multiRequests +} + +type v2Client struct { + baseClient +} + +func newV2Client(bc baseClient) (*v2Client, error) { + c := v2Client{ + baseClient: bc, + } + + return &c, nil +} + +func (c *v2Client) createMultiSearchRequests(searchRequests []*SearchRequest) []*multiRequest { + multiRequests := c.baseClient.createMultiSearchRequests(searchRequests) + + for _, mr := range multiRequests { + mr.header["search_type"] = "count" + } + + return multiRequests +} + +type v5Client struct { + baseClient +} + +func newV5Client(bc baseClient) (*v5Client, error) { + c := v5Client{ + baseClient: bc, + } + + return &c, nil +} + +type v56Client struct { + *v5Client + maxConcurrentShardRequests int +} + +func newV56Client(bc baseClient) (*v56Client, error) { + v5Client := v5Client{ + baseClient: bc, + } + maxConcurrentShardRequests := bc.getSettings().Get("maxConcurrentShardRequests").MustInt(256) + + c := v56Client{ + v5Client: &v5Client, + maxConcurrentShardRequests: maxConcurrentShardRequests, + } + + return &c, nil +} + +func (c *v56Client) createMultiSearchRequests(searchRequests []*SearchRequest) []*multiRequest { + multiRequests := c.v5Client.createMultiSearchRequests(searchRequests) + + for _, mr := range multiRequests { + mr.header["max_concurrent_shard_requests"] = c.maxConcurrentShardRequests + } + + return multiRequests +} + +func (c *baseClientImpl) MultiSearch() *MultiSearchRequestBuilder { + return NewMultiSearchRequestBuilder(c.GetVersion()) +} diff --git a/pkg/tsdb/elasticsearch/client/client_test.go b/pkg/tsdb/elasticsearch/client/client_test.go new file mode 100644 index 00000000000..d557ceb28b1 --- /dev/null +++ b/pkg/tsdb/elasticsearch/client/client_test.go @@ -0,0 +1,215 @@ +package es + +import ( + "net/http" + "testing" + + "github.com/grafana/grafana/pkg/components/simplejson" + + "github.com/grafana/grafana/pkg/models" + . "github.com/smartystreets/goconvey/convey" +) + +func TestClient(t *testing.T) { + Convey("Test elasticsearch client", t, func() { + Convey("NewClient", func() { + Convey("When no version set should return error", func() { + ds := &models.DataSource{ + JsonData: simplejson.NewFromAny(make(map[string]interface{})), + } + + _, err := NewClient(nil, ds, nil) + So(err, ShouldNotBeNil) + }) + + Convey("When no time field name set should return error", func() { + ds := &models.DataSource{ + JsonData: simplejson.NewFromAny(map[string]interface{}{ + "esVersion": 5, + }), + } + + _, err := NewClient(nil, ds, nil) + So(err, ShouldNotBeNil) + }) + + Convey("When unspported version set should return error", func() { + ds := &models.DataSource{ + JsonData: simplejson.NewFromAny(map[string]interface{}{ + "esVersion": 6, + "timeField": "@timestamp", + }), + } + + _, err := NewClient(nil, ds, nil) + So(err, ShouldNotBeNil) + }) + + Convey("When version 2 should return v2 client", func() { + ds := &models.DataSource{ + JsonData: simplejson.NewFromAny(map[string]interface{}{ + "esVersion": 2, + "timeField": "@timestamp", + }), + } + + c, err := NewClient(nil, ds, nil) + So(err, ShouldBeNil) + So(c.GetVersion(), ShouldEqual, 2) + }) + + Convey("When version 5 should return v5 client", func() { + ds := &models.DataSource{ + JsonData: simplejson.NewFromAny(map[string]interface{}{ + "esVersion": 5, + "timeField": "@timestamp", + }), + } + + c, err := NewClient(nil, ds, nil) + So(err, ShouldBeNil) + So(c.GetVersion(), ShouldEqual, 5) + }) + + Convey("When version 56 should return v5.6 client", func() { + ds := &models.DataSource{ + JsonData: simplejson.NewFromAny(map[string]interface{}{ + "esVersion": 56, + "timeField": "@timestamp", + }), + } + + c, err := NewClient(nil, ds, nil) + So(err, ShouldBeNil) + So(c.GetVersion(), ShouldEqual, 56) + }) + }) + + Convey("v2", func() { + ds := &models.DataSource{ + JsonData: simplejson.NewFromAny(map[string]interface{}{ + "esVersion": 2, + }), + } + + c, err := newV2Client(newFakeBaseClient(ds, []string{"test-*"})) + So(err, ShouldBeNil) + So(c, ShouldNotBeNil) + + Convey("When creating multisearch requests should have correct headers", func() { + multiRequests := c.createMultiSearchRequests([]*SearchRequest{ + {Index: "test-*"}, + }) + So(multiRequests, ShouldHaveLength, 1) + header := multiRequests[0].header + So(header, ShouldHaveLength, 3) + So(header["index"], ShouldEqual, "test-*") + So(header["ignore_unavailable"], ShouldEqual, true) + So(header["search_type"], ShouldEqual, "count") + }) + }) + + Convey("v5", func() { + ds := &models.DataSource{ + JsonData: simplejson.NewFromAny(map[string]interface{}{ + "esVersion": 5, + }), + } + + c, err := newV5Client(newFakeBaseClient(ds, []string{"test-*"})) + So(err, ShouldBeNil) + So(c, ShouldNotBeNil) + + Convey("When creating multisearch requests should have correct headers", func() { + multiRequests := c.createMultiSearchRequests([]*SearchRequest{ + {Index: "test-*"}, + }) + So(multiRequests, ShouldHaveLength, 1) + header := multiRequests[0].header + So(header, ShouldHaveLength, 3) + So(header["index"], ShouldEqual, "test-*") + So(header["ignore_unavailable"], ShouldEqual, true) + So(header["search_type"], ShouldEqual, "query_then_fetch") + }) + }) + + Convey("v5.6", func() { + Convey("With default settings", func() { + ds := models.DataSource{ + JsonData: simplejson.NewFromAny(map[string]interface{}{ + "esVersion": 56, + }), + } + + c, err := newV56Client(newFakeBaseClient(&ds, []string{"test-*"})) + So(err, ShouldBeNil) + So(c, ShouldNotBeNil) + + Convey("When creating multisearch requests should have correct headers", func() { + multiRequests := c.createMultiSearchRequests([]*SearchRequest{ + {Index: "test-*"}, + }) + So(multiRequests, ShouldHaveLength, 1) + header := multiRequests[0].header + So(header, ShouldHaveLength, 4) + So(header["index"], ShouldEqual, "test-*") + So(header["ignore_unavailable"], ShouldEqual, true) + So(header["search_type"], ShouldEqual, "query_then_fetch") + So(header["max_concurrent_shard_requests"], ShouldEqual, 256) + }) + }) + + Convey("With custom settings", func() { + ds := models.DataSource{ + JsonData: simplejson.NewFromAny(map[string]interface{}{ + "esVersion": 56, + "maxConcurrentShardRequests": 100, + }), + } + + c, err := newV56Client(newFakeBaseClient(&ds, []string{"test-*"})) + So(err, ShouldBeNil) + So(c, ShouldNotBeNil) + Convey("When creating multisearch requests should have correct headers", func() { + multiRequests := c.createMultiSearchRequests([]*SearchRequest{ + {Index: "test-*"}, + }) + So(multiRequests, ShouldHaveLength, 1) + header := multiRequests[0].header + So(header, ShouldHaveLength, 4) + So(header["index"], ShouldEqual, "test-*") + So(header["ignore_unavailable"], ShouldEqual, true) + So(header["search_type"], ShouldEqual, "query_then_fetch") + So(header["max_concurrent_shard_requests"], ShouldEqual, 100) + }) + }) + }) + }) +} + +type fakeBaseClient struct { + *baseClientImpl + ds *models.DataSource +} + +func newFakeBaseClient(ds *models.DataSource, indices []string) baseClient { + return &fakeBaseClient{ + baseClientImpl: &baseClientImpl{ + ds: ds, + indices: indices, + }, + ds: ds, + } +} + +func (c *fakeBaseClient) executeBatchRequest(uriPath string, requests []*multiRequest) (*http.Response, error) { + return nil, nil +} + +func (c *fakeBaseClient) executeRequest(method, uriPath string, body []byte) (*http.Response, error) { + return nil, nil +} + +func (c *fakeBaseClient) executeMultisearch(searchRequests []*SearchRequest) ([]*SearchResponse, error) { + return nil, nil +} diff --git a/pkg/tsdb/elasticsearch/client/index_pattern.go b/pkg/tsdb/elasticsearch/client/index_pattern.go new file mode 100644 index 00000000000..8391e902ea4 --- /dev/null +++ b/pkg/tsdb/elasticsearch/client/index_pattern.go @@ -0,0 +1,312 @@ +package es + +import ( + "fmt" + "regexp" + "strings" + "time" + + "github.com/grafana/grafana/pkg/tsdb" +) + +const ( + noInterval = "" + intervalHourly = "hourly" + intervalDaily = "daily" + intervalWeekly = "weekly" + intervalMonthly = "monthly" + intervalYearly = "yearly" +) + +type indexPattern interface { + GetIndices(timeRange *tsdb.TimeRange) ([]string, error) +} + +var newIndexPattern = func(interval string, pattern string) (indexPattern, error) { + if interval == noInterval { + return &staticIndexPattern{indexName: pattern}, nil + } + + return newDynamicIndexPattern(interval, pattern) +} + +type staticIndexPattern struct { + indexName string +} + +func (ip *staticIndexPattern) GetIndices(timeRange *tsdb.TimeRange) ([]string, error) { + return []string{ip.indexName}, nil +} + +type intervalGenerator interface { + Generate(from, to time.Time) []time.Time +} + +type dynamicIndexPattern struct { + interval string + pattern string + intervalGenerator intervalGenerator +} + +func newDynamicIndexPattern(interval, pattern string) (*dynamicIndexPattern, error) { + var generator intervalGenerator + + switch strings.ToLower(interval) { + case intervalHourly: + generator = &hourlyInterval{} + case intervalDaily: + generator = &dailyInterval{} + case intervalWeekly: + generator = &weeklyInterval{} + case intervalMonthly: + generator = &monthlyInterval{} + case intervalYearly: + generator = &yearlyInterval{} + default: + return nil, fmt.Errorf("unsupported interval '%s'", interval) + } + + return &dynamicIndexPattern{ + interval: interval, + pattern: pattern, + intervalGenerator: generator, + }, nil +} + +func (ip *dynamicIndexPattern) GetIndices(timeRange *tsdb.TimeRange) ([]string, error) { + from := timeRange.GetFromAsTimeUTC() + to := timeRange.GetToAsTimeUTC() + intervals := ip.intervalGenerator.Generate(from, to) + indices := make([]string, 0) + + for _, t := range intervals { + indices = append(indices, formatDate(t, ip.pattern)) + } + + return indices, nil +} + +type hourlyInterval struct{} + +func (i *hourlyInterval) Generate(from, to time.Time) []time.Time { + intervals := []time.Time{} + start := time.Date(from.Year(), from.Month(), from.Day(), from.Hour(), 0, 0, 0, time.UTC) + end := time.Date(to.Year(), to.Month(), to.Day(), to.Hour(), 0, 0, 0, time.UTC) + + intervals = append(intervals, start) + + for start.Before(end) { + start = start.Add(time.Hour) + intervals = append(intervals, start) + } + + return intervals +} + +type dailyInterval struct{} + +func (i *dailyInterval) Generate(from, to time.Time) []time.Time { + intervals := []time.Time{} + start := time.Date(from.Year(), from.Month(), from.Day(), 0, 0, 0, 0, time.UTC) + end := time.Date(to.Year(), to.Month(), to.Day(), 0, 0, 0, 0, time.UTC) + + intervals = append(intervals, start) + + for start.Before(end) { + start = start.Add(24 * time.Hour) + intervals = append(intervals, start) + } + + return intervals +} + +type weeklyInterval struct{} + +func (i *weeklyInterval) Generate(from, to time.Time) []time.Time { + intervals := []time.Time{} + start := time.Date(from.Year(), from.Month(), from.Day(), 0, 0, 0, 0, time.UTC) + end := time.Date(to.Year(), to.Month(), to.Day(), 0, 0, 0, 0, time.UTC) + + for start.Weekday() != time.Monday { + start = start.Add(-24 * time.Hour) + } + + for end.Weekday() != time.Monday { + end = end.Add(-24 * time.Hour) + } + + year, week := start.ISOWeek() + intervals = append(intervals, start) + + for start.Before(end) { + start = start.Add(24 * time.Hour) + nextYear, nextWeek := start.ISOWeek() + if nextYear != year || nextWeek != week { + intervals = append(intervals, start) + } + year = nextYear + week = nextWeek + } + + return intervals +} + +type monthlyInterval struct{} + +func (i *monthlyInterval) Generate(from, to time.Time) []time.Time { + intervals := []time.Time{} + start := time.Date(from.Year(), from.Month(), 1, 0, 0, 0, 0, time.UTC) + end := time.Date(to.Year(), to.Month(), 1, 0, 0, 0, 0, time.UTC) + + month := start.Month() + intervals = append(intervals, start) + + for start.Before(end) { + start = start.Add(24 * time.Hour) + nextMonth := start.Month() + if nextMonth != month { + intervals = append(intervals, start) + } + month = nextMonth + } + + return intervals +} + +type yearlyInterval struct{} + +func (i *yearlyInterval) Generate(from, to time.Time) []time.Time { + intervals := []time.Time{} + start := time.Date(from.Year(), 1, 1, 0, 0, 0, 0, time.UTC) + end := time.Date(to.Year(), 1, 1, 0, 0, 0, 0, time.UTC) + + year := start.Year() + intervals = append(intervals, start) + + for start.Before(end) { + start = start.Add(24 * time.Hour) + nextYear := start.Year() + if nextYear != year { + intervals = append(intervals, start) + } + year = nextYear + } + + return intervals +} + +var datePatternRegex = regexp.MustCompile("(LT|LL?L?L?|l{1,4}|Mo|MM?M?M?|Do|DDDo|DD?D?D?|ddd?d?|do?|w[o|w]?|W[o|W]?|YYYYY|YYYY|YY|gg(ggg?)?|GG(GGG?)?|e|E|a|A|hh?|HH?|mm?|ss?|SS?S?|X|zz?|ZZ?|Q)") + +var datePatternReplacements = map[string]string{ + "M": "1", // stdNumMonth 1 2 ... 11 12 + "MM": "01", // stdZeroMonth 01 02 ... 11 12 + "MMM": "Jan", // stdMonth Jan Feb ... Nov Dec + "MMMM": "January", // stdLongMonth January February ... November December + "D": "2", // stdDay 1 2 ... 30 30 + "DD": "02", // stdZeroDay 01 02 ... 30 31 + "DDD": "", // Day of the year 1 2 ... 364 365 + "DDDD": "", // Day of the year 001 002 ... 364 365 @todo**** + "d": "", // Numeric representation of day of the week 0 1 ... 5 6 + "dd": "Mon", // ***Su Mo ... Fr Sa @todo + "ddd": "Mon", // Sun Mon ... Fri Sat + "dddd": "Monday", // stdLongWeekDay Sunday Monday ... Friday Saturday + "e": "", // Numeric representation of day of the week 0 1 ... 5 6 @todo + "E": "", // ISO-8601 numeric representation of the day of the week (added in PHP 5.1.0) 1 2 ... 6 7 @todo + "w": "", // 1 2 ... 52 53 + "ww": "", // ***01 02 ... 52 53 @todo + "W": "", // 1 2 ... 52 53 + "WW": "", // ***01 02 ... 52 53 @todo + "YY": "06", // stdYear 70 71 ... 29 30 + "YYYY": "2006", // stdLongYear 1970 1971 ... 2029 2030 + "gg": "", // ISO-8601 year number 70 71 ... 29 30 + "gggg": "", // ***1970 1971 ... 2029 2030 + "GG": "", //70 71 ... 29 30 + "GGGG": "", // ***1970 1971 ... 2029 2030 + "Q": "", // 1, 2, 3, 4 + "A": "PM", // stdPM AM PM + "a": "pm", // stdpm am pm + "H": "", // stdHour 0 1 ... 22 23 + "HH": "15", // 00 01 ... 22 23 + "h": "3", // stdHour12 1 2 ... 11 12 + "hh": "03", // stdZeroHour12 01 02 ... 11 12 + "m": "4", // stdZeroMinute 0 1 ... 58 59 + "mm": "04", // stdZeroMinute 00 01 ... 58 59 + "s": "5", // stdSecond 0 1 ... 58 59 + "ss": "05", // stdZeroSecond ***00 01 ... 58 59 + "z": "MST", //EST CST ... MST PST + "zz": "MST", //EST CST ... MST PST + "Z": "Z07:00", // stdNumColonTZ -07:00 -06:00 ... +06:00 +07:00 + "ZZ": "-0700", // stdNumTZ -0700 -0600 ... +0600 +0700 + "X": "", // Seconds since unix epoch 1360013296 + "LT": "3:04 PM", // 8:30 PM + "L": "01/02/2006", //09/04/1986 + "l": "1/2/2006", //9/4/1986 + "ll": "Jan 2 2006", //Sep 4 1986 + "lll": "Jan 2 2006 3:04 PM", //Sep 4 1986 8:30 PM + "llll": "Mon, Jan 2 2006 3:04 PM", //Thu, Sep 4 1986 8:30 PM +} + +func formatDate(t time.Time, pattern string) string { + var datePattern string + parts := strings.Split(strings.TrimLeft(pattern, "["), "]") + base := parts[0] + if len(parts) == 2 { + datePattern = parts[1] + } else { + datePattern = base + base = "" + } + + formatted := t.Format(patternToLayout(datePattern)) + + if strings.Contains(formatted, "", fmt.Sprintf("%d", isoYear), -1) + formatted = strings.Replace(formatted, "", isoYearShort, -1) + formatted = strings.Replace(formatted, "", fmt.Sprintf("%d", isoWeek), -1) + + formatted = strings.Replace(formatted, "", fmt.Sprintf("%d", t.Unix()), -1) + + day := t.Weekday() + dayOfWeekIso := int(day) + if day == time.Sunday { + dayOfWeekIso = 7 + } + + formatted = strings.Replace(formatted, "", fmt.Sprintf("%d", day), -1) + formatted = strings.Replace(formatted, "", fmt.Sprintf("%d", dayOfWeekIso), -1) + formatted = strings.Replace(formatted, "", fmt.Sprintf("%d", t.YearDay()), -1) + + quarter := 4 + + switch t.Month() { + case time.January, time.February, time.March: + quarter = 1 + case time.April, time.May, time.June: + quarter = 2 + case time.July, time.August, time.September: + quarter = 3 + } + + formatted = strings.Replace(formatted, "", fmt.Sprintf("%d", quarter), -1) + formatted = strings.Replace(formatted, "", fmt.Sprintf("%d", t.Hour()), -1) + } + + return base + formatted +} + +func patternToLayout(pattern string) string { + var match [][]string + if match = datePatternRegex.FindAllStringSubmatch(pattern, -1); match == nil { + return pattern + } + + for i := range match { + if replace, ok := datePatternReplacements[match[i][0]]; ok { + pattern = strings.Replace(pattern, match[i][0], replace, 1) + } + } + + return pattern +} diff --git a/pkg/tsdb/elasticsearch/client/index_pattern_test.go b/pkg/tsdb/elasticsearch/client/index_pattern_test.go new file mode 100644 index 00000000000..3bd823d8c87 --- /dev/null +++ b/pkg/tsdb/elasticsearch/client/index_pattern_test.go @@ -0,0 +1,244 @@ +package es + +import ( + "fmt" + "testing" + "time" + + "github.com/grafana/grafana/pkg/tsdb" + + . "github.com/smartystreets/goconvey/convey" +) + +func TestIndexPattern(t *testing.T) { + Convey("Static index patterns", t, func() { + indexPatternScenario(noInterval, "data-*", nil, func(indices []string) { + So(indices, ShouldHaveLength, 1) + So(indices[0], ShouldEqual, "data-*") + }) + + indexPatternScenario(noInterval, "es-index-name", nil, func(indices []string) { + So(indices, ShouldHaveLength, 1) + So(indices[0], ShouldEqual, "es-index-name") + }) + }) + + Convey("Dynamic index patterns", t, func() { + from := fmt.Sprintf("%d", time.Date(2018, 5, 15, 17, 50, 0, 0, time.UTC).UnixNano()/int64(time.Millisecond)) + to := fmt.Sprintf("%d", time.Date(2018, 5, 15, 17, 55, 0, 0, time.UTC).UnixNano()/int64(time.Millisecond)) + + indexPatternScenario(intervalHourly, "[data-]YYYY.MM.DD.HH", tsdb.NewTimeRange(from, to), func(indices []string) { + //So(indices, ShouldHaveLength, 1) + So(indices[0], ShouldEqual, "data-2018.05.15.17") + }) + + indexPatternScenario(intervalDaily, "[data-]YYYY.MM.DD", tsdb.NewTimeRange(from, to), func(indices []string) { + So(indices, ShouldHaveLength, 1) + So(indices[0], ShouldEqual, "data-2018.05.15") + }) + + indexPatternScenario(intervalWeekly, "[data-]GGGG.WW", tsdb.NewTimeRange(from, to), func(indices []string) { + So(indices, ShouldHaveLength, 1) + So(indices[0], ShouldEqual, "data-2018.20") + }) + + indexPatternScenario(intervalMonthly, "[data-]YYYY.MM", tsdb.NewTimeRange(from, to), func(indices []string) { + So(indices, ShouldHaveLength, 1) + So(indices[0], ShouldEqual, "data-2018.05") + }) + + indexPatternScenario(intervalYearly, "[data-]YYYY", tsdb.NewTimeRange(from, to), func(indices []string) { + So(indices, ShouldHaveLength, 1) + So(indices[0], ShouldEqual, "data-2018") + }) + }) + + Convey("Hourly interval", t, func() { + Convey("Should return 1 interval", func() { + from := time.Date(2018, 1, 1, 23, 1, 1, 0, time.UTC) + to := time.Date(2018, 1, 1, 23, 6, 0, 0, time.UTC) + intervals := (&hourlyInterval{}).Generate(from, to) + So(intervals, ShouldHaveLength, 1) + So(intervals[0], ShouldEqual, time.Date(2018, 1, 1, 23, 0, 0, 0, time.UTC)) + }) + + Convey("Should return 2 intervals", func() { + from := time.Date(2018, 1, 1, 23, 1, 1, 0, time.UTC) + to := time.Date(2018, 1, 2, 0, 6, 0, 0, time.UTC) + intervals := (&hourlyInterval{}).Generate(from, to) + So(intervals, ShouldHaveLength, 2) + So(intervals[0], ShouldEqual, time.Date(2018, 1, 1, 23, 0, 0, 0, time.UTC)) + So(intervals[1], ShouldEqual, time.Date(2018, 1, 2, 0, 0, 0, 0, time.UTC)) + }) + + Convey("Should return 10 intervals", func() { + from := time.Date(2018, 1, 1, 23, 1, 1, 0, time.UTC) + to := time.Date(2018, 1, 2, 8, 6, 0, 0, time.UTC) + intervals := (&hourlyInterval{}).Generate(from, to) + So(intervals, ShouldHaveLength, 10) + So(intervals[0], ShouldEqual, time.Date(2018, 1, 1, 23, 0, 0, 0, time.UTC)) + So(intervals[4], ShouldEqual, time.Date(2018, 1, 2, 3, 0, 0, 0, time.UTC)) + So(intervals[9], ShouldEqual, time.Date(2018, 1, 2, 8, 0, 0, 0, time.UTC)) + }) + }) + + Convey("Daily interval", t, func() { + Convey("Should return 1 day", func() { + from := time.Date(2018, 1, 1, 23, 1, 1, 0, time.UTC) + to := time.Date(2018, 1, 1, 23, 6, 0, 0, time.UTC) + intervals := (&dailyInterval{}).Generate(from, to) + So(intervals, ShouldHaveLength, 1) + So(intervals[0], ShouldEqual, time.Date(2018, 1, 1, 0, 0, 0, 0, time.UTC)) + }) + + Convey("Should return 2 days", func() { + from := time.Date(2018, 1, 1, 23, 1, 1, 0, time.UTC) + to := time.Date(2018, 1, 2, 0, 6, 0, 0, time.UTC) + intervals := (&dailyInterval{}).Generate(from, to) + So(intervals, ShouldHaveLength, 2) + So(intervals[0], ShouldEqual, time.Date(2018, 1, 1, 0, 0, 0, 0, time.UTC)) + So(intervals[1], ShouldEqual, time.Date(2018, 1, 2, 0, 0, 0, 0, time.UTC)) + }) + + Convey("Should return 32 days", func() { + from := time.Date(2018, 1, 1, 23, 1, 1, 0, time.UTC) + to := time.Date(2018, 2, 1, 8, 6, 0, 0, time.UTC) + intervals := (&dailyInterval{}).Generate(from, to) + So(intervals, ShouldHaveLength, 32) + So(intervals[0], ShouldEqual, time.Date(2018, 1, 1, 0, 0, 0, 0, time.UTC)) + So(intervals[30], ShouldEqual, time.Date(2018, 1, 31, 0, 0, 0, 0, time.UTC)) + So(intervals[31], ShouldEqual, time.Date(2018, 2, 1, 0, 0, 0, 0, time.UTC)) + }) + }) + + Convey("Weekly interval", t, func() { + Convey("Should return 1 week (1)", func() { + from := time.Date(2018, 1, 1, 23, 1, 1, 0, time.UTC) + to := time.Date(2018, 1, 1, 23, 6, 0, 0, time.UTC) + intervals := (&weeklyInterval{}).Generate(from, to) + So(intervals, ShouldHaveLength, 1) + So(intervals[0], ShouldEqual, time.Date(2018, 1, 1, 0, 0, 0, 0, time.UTC)) + }) + + Convey("Should return 1 week (2)", func() { + from := time.Date(2017, 1, 1, 23, 1, 1, 0, time.UTC) + to := time.Date(2017, 1, 1, 23, 6, 0, 0, time.UTC) + intervals := (&weeklyInterval{}).Generate(from, to) + So(intervals, ShouldHaveLength, 1) + So(intervals[0], ShouldEqual, time.Date(2016, 12, 26, 0, 0, 0, 0, time.UTC)) + }) + + Convey("Should return 2 weeks (1)", func() { + from := time.Date(2018, 1, 1, 23, 1, 1, 0, time.UTC) + to := time.Date(2018, 1, 10, 23, 6, 0, 0, time.UTC) + intervals := (&weeklyInterval{}).Generate(from, to) + So(intervals, ShouldHaveLength, 2) + So(intervals[0], ShouldEqual, time.Date(2018, 1, 1, 0, 0, 0, 0, time.UTC)) + So(intervals[1], ShouldEqual, time.Date(2018, 1, 8, 0, 0, 0, 0, time.UTC)) + }) + + Convey("Should return 2 weeks (2)", func() { + from := time.Date(2017, 1, 1, 23, 1, 1, 0, time.UTC) + to := time.Date(2017, 1, 8, 23, 6, 0, 0, time.UTC) + intervals := (&weeklyInterval{}).Generate(from, to) + So(intervals, ShouldHaveLength, 2) + So(intervals[0], ShouldEqual, time.Date(2016, 12, 26, 0, 0, 0, 0, time.UTC)) + So(intervals[1], ShouldEqual, time.Date(2017, 1, 2, 0, 0, 0, 0, time.UTC)) + }) + + Convey("Should return 3 weeks (1)", func() { + from := time.Date(2018, 1, 1, 23, 1, 1, 0, time.UTC) + to := time.Date(2018, 1, 21, 23, 6, 0, 0, time.UTC) + intervals := (&weeklyInterval{}).Generate(from, to) + So(intervals, ShouldHaveLength, 3) + So(intervals[0], ShouldEqual, time.Date(2018, 1, 1, 0, 0, 0, 0, time.UTC)) + So(intervals[1], ShouldEqual, time.Date(2018, 1, 8, 0, 0, 0, 0, time.UTC)) + So(intervals[2], ShouldEqual, time.Date(2018, 1, 15, 0, 0, 0, 0, time.UTC)) + }) + + Convey("Should return 3 weeks (2)", func() { + from := time.Date(2017, 1, 1, 23, 1, 1, 0, time.UTC) + to := time.Date(2017, 1, 9, 23, 6, 0, 0, time.UTC) + intervals := (&weeklyInterval{}).Generate(from, to) + So(intervals, ShouldHaveLength, 3) + So(intervals[0], ShouldEqual, time.Date(2016, 12, 26, 0, 0, 0, 0, time.UTC)) + So(intervals[1], ShouldEqual, time.Date(2017, 1, 2, 0, 0, 0, 0, time.UTC)) + So(intervals[2], ShouldEqual, time.Date(2017, 1, 9, 0, 0, 0, 0, time.UTC)) + }) + }) + + Convey("Monthly interval", t, func() { + Convey("Should return 1 month", func() { + from := time.Date(2018, 1, 1, 23, 1, 1, 0, time.UTC) + to := time.Date(2018, 1, 1, 23, 6, 0, 0, time.UTC) + intervals := (&monthlyInterval{}).Generate(from, to) + So(intervals, ShouldHaveLength, 1) + So(intervals[0], ShouldEqual, time.Date(2018, 1, 1, 0, 0, 0, 0, time.UTC)) + }) + + Convey("Should return 2 months", func() { + from := time.Date(2018, 1, 1, 23, 1, 1, 0, time.UTC) + to := time.Date(2018, 2, 2, 0, 6, 0, 0, time.UTC) + intervals := (&monthlyInterval{}).Generate(from, to) + So(intervals, ShouldHaveLength, 2) + So(intervals[0], ShouldEqual, time.Date(2018, 1, 1, 0, 0, 0, 0, time.UTC)) + So(intervals[1], ShouldEqual, time.Date(2018, 2, 1, 0, 0, 0, 0, time.UTC)) + }) + + Convey("Should return 14 months", func() { + from := time.Date(2017, 1, 1, 23, 1, 1, 0, time.UTC) + to := time.Date(2018, 2, 1, 8, 6, 0, 0, time.UTC) + intervals := (&monthlyInterval{}).Generate(from, to) + So(intervals, ShouldHaveLength, 14) + So(intervals[0], ShouldEqual, time.Date(2017, 1, 1, 0, 0, 0, 0, time.UTC)) + So(intervals[13], ShouldEqual, time.Date(2018, 2, 1, 0, 0, 0, 0, time.UTC)) + }) + }) + + Convey("Yearly interval", t, func() { + Convey("Should return 1 year (hour diff)", func() { + from := time.Date(2018, 2, 1, 23, 1, 1, 0, time.UTC) + to := time.Date(2018, 2, 1, 23, 6, 0, 0, time.UTC) + intervals := (&yearlyInterval{}).Generate(from, to) + So(intervals, ShouldHaveLength, 1) + So(intervals[0], ShouldEqual, time.Date(2018, 1, 1, 0, 0, 0, 0, time.UTC)) + }) + + Convey("Should return 1 year (month diff)", func() { + from := time.Date(2018, 2, 1, 23, 1, 1, 0, time.UTC) + to := time.Date(2018, 12, 31, 23, 59, 59, 0, time.UTC) + intervals := (&yearlyInterval{}).Generate(from, to) + So(intervals, ShouldHaveLength, 1) + So(intervals[0], ShouldEqual, time.Date(2018, 1, 1, 0, 0, 0, 0, time.UTC)) + }) + + Convey("Should return 2 years", func() { + from := time.Date(2018, 2, 1, 23, 1, 1, 0, time.UTC) + to := time.Date(2019, 1, 1, 23, 59, 59, 0, time.UTC) + intervals := (&yearlyInterval{}).Generate(from, to) + So(intervals, ShouldHaveLength, 2) + So(intervals[0], ShouldEqual, time.Date(2018, 1, 1, 0, 0, 0, 0, time.UTC)) + So(intervals[1], ShouldEqual, time.Date(2019, 1, 1, 0, 0, 0, 0, time.UTC)) + }) + + Convey("Should return 5 years", func() { + from := time.Date(2014, 1, 1, 23, 1, 1, 0, time.UTC) + to := time.Date(2018, 11, 1, 23, 59, 59, 0, time.UTC) + intervals := (&yearlyInterval{}).Generate(from, to) + So(intervals, ShouldHaveLength, 5) + So(intervals[0], ShouldEqual, time.Date(2014, 1, 1, 0, 0, 0, 0, time.UTC)) + So(intervals[4], ShouldEqual, time.Date(2018, 1, 1, 0, 0, 0, 0, time.UTC)) + }) + }) +} + +func indexPatternScenario(interval string, pattern string, timeRange *tsdb.TimeRange, fn func(indices []string)) { + Convey(fmt.Sprintf("Index pattern (interval=%s, index=%s", interval, pattern), func() { + ip, err := newIndexPattern(interval, pattern) + So(err, ShouldBeNil) + So(ip, ShouldNotBeNil) + indices, err := ip.GetIndices(timeRange) + So(err, ShouldBeNil) + fn(indices) + }) +} diff --git a/pkg/tsdb/elasticsearch/client/models.go b/pkg/tsdb/elasticsearch/client/models.go new file mode 100644 index 00000000000..3c86dcce825 --- /dev/null +++ b/pkg/tsdb/elasticsearch/client/models.go @@ -0,0 +1,304 @@ +package es + +import ( + "encoding/json" +) + +// SearchRequest represents a search request +type SearchRequest struct { + Index string + Size int + Sort map[string]interface{} + Query *Query + Aggs AggArray + CustomProps map[string]interface{} +} + +// MarshalJSON returns the JSON encoding of the request. +func (r *SearchRequest) MarshalJSON() ([]byte, error) { + root := make(map[string]interface{}) + + root["size"] = r.Size + if len(r.Sort) > 0 { + root["sort"] = r.Sort + } + + for key, value := range r.CustomProps { + root[key] = value + } + + root["query"] = r.Query + + if len(r.Aggs) > 0 { + root["aggs"] = r.Aggs + } + + return json.Marshal(root) +} + +// SearchResponseHits represents search response hits +type SearchResponseHits struct { + Hits []map[string]interface{} + Total int64 +} + +// SearchResponse represents a search response +type SearchResponse struct { + Error map[string]interface{} `json:"error"` + Aggregations map[string]interface{} `json:"aggregations"` + Hits *SearchResponseHits `json:"hits"` +} + +// func (r *Response) getErrMsg() string { +// var msg bytes.Buffer +// errJson := simplejson.NewFromAny(r.Err) +// errType, err := errJson.Get("type").String() +// if err == nil { +// msg.WriteString(fmt.Sprintf("type:%s", errType)) +// } + +// reason, err := errJson.Get("type").String() +// if err == nil { +// msg.WriteString(fmt.Sprintf("reason:%s", reason)) +// } +// return msg.String() +// } + +// MultiSearchRequest represents a multi search request +type MultiSearchRequest struct { + Requests []*SearchRequest +} + +// MultiSearchResponse represents a multi search response +type MultiSearchResponse struct { + status int `json:"status,omitempty"` + Responses []*SearchResponse `json:"responses"` +} + +// Query represents a query +type Query struct { + Bool *BoolQuery `json:"bool"` +} + +// BoolQuery represents a bool query +type BoolQuery struct { + Filters []Filter +} + +// NewBoolQuery create a new bool query +func NewBoolQuery() *BoolQuery { + return &BoolQuery{Filters: make([]Filter, 0)} +} + +// MarshalJSON returns the JSON encoding of the boolean query. +func (q *BoolQuery) MarshalJSON() ([]byte, error) { + root := make(map[string]interface{}) + + if len(q.Filters) > 0 { + if len(q.Filters) == 1 { + root["filter"] = q.Filters[0] + } else { + root["filter"] = q.Filters + } + } + return json.Marshal(root) +} + +// Filter represents a search filter +type Filter interface{} + +// QueryStringFilter represents a query string search filter +type QueryStringFilter struct { + Filter + Query string + AnalyzeWildcard bool +} + +// MarshalJSON returns the JSON encoding of the query string filter. +func (f *QueryStringFilter) MarshalJSON() ([]byte, error) { + root := map[string]interface{}{ + "query_string": map[string]interface{}{ + "query": f.Query, + "analyze_wildcard": f.AnalyzeWildcard, + }, + } + + return json.Marshal(root) +} + +// RangeFilter represents a range search filter +type RangeFilter struct { + Filter + Key string + Gte string + Lte string + Format string +} + +// DateFormatEpochMS represents a date format of epoch milliseconds (epoch_millis) +const DateFormatEpochMS = "epoch_millis" + +// MarshalJSON returns the JSON encoding of the query string filter. +func (f *RangeFilter) MarshalJSON() ([]byte, error) { + root := map[string]map[string]map[string]interface{}{ + "range": { + f.Key: { + "lte": f.Lte, + "gte": f.Gte, + }, + }, + } + + if f.Format != "" { + root["range"][f.Key]["format"] = f.Format + } + + return json.Marshal(root) +} + +// Aggregation represents an aggregation +type Aggregation interface{} + +// Agg represents a key and aggregation +type Agg struct { + Key string + Aggregation *aggContainer +} + +// MarshalJSON returns the JSON encoding of the agg +func (a *Agg) MarshalJSON() ([]byte, error) { + root := map[string]interface{}{ + a.Key: a.Aggregation, + } + + return json.Marshal(root) +} + +// AggArray represents a collection of key/aggregation pairs +type AggArray []*Agg + +// MarshalJSON returns the JSON encoding of the agg +func (a AggArray) MarshalJSON() ([]byte, error) { + aggsMap := make(map[string]Aggregation) + + for _, subAgg := range a { + aggsMap[subAgg.Key] = subAgg.Aggregation + } + + return json.Marshal(aggsMap) +} + +type aggContainer struct { + Type string + Aggregation Aggregation + Aggs AggArray +} + +// MarshalJSON returns the JSON encoding of the aggregation container +func (a *aggContainer) MarshalJSON() ([]byte, error) { + root := map[string]interface{}{ + a.Type: a.Aggregation, + } + + if len(a.Aggs) > 0 { + root["aggs"] = a.Aggs + } + + return json.Marshal(root) +} + +type aggDef struct { + key string + aggregation *aggContainer + builders []AggBuilder +} + +func newAggDef(key string, aggregation *aggContainer) *aggDef { + return &aggDef{ + key: key, + aggregation: aggregation, + builders: make([]AggBuilder, 0), + } +} + +// HistogramAgg represents a histogram aggregation +type HistogramAgg struct { + Interval int `json:"interval,omitempty"` + Field string `json:"field"` + MinDocCount int `json:"min_doc_count"` + Missing *int `json:"missing,omitempty"` +} + +// DateHistogramAgg represents a date histogram aggregation +type DateHistogramAgg struct { + Field string `json:"field"` + Interval string `json:"interval,omitempty"` + MinDocCount int `json:"min_doc_count"` + Missing *string `json:"missing,omitempty"` + ExtendedBounds *ExtendedBounds `json:"extended_bounds"` + Format string `json:"format"` +} + +// FiltersAggregation represents a filters aggregation +type FiltersAggregation struct { + Filters map[string]interface{} `json:"filters"` +} + +// TermsAggregation represents a terms aggregation +type TermsAggregation struct { + Field string `json:"field"` + Size int `json:"size"` + Order map[string]interface{} `json:"order"` + MinDocCount *int `json:"min_doc_count,omitempty"` + Missing *string `json:"missing,omitempty"` +} + +// ExtendedBounds represents extended bounds +type ExtendedBounds struct { + Min string `json:"min"` + Max string `json:"max"` +} + +// GeoHashGridAggregation represents a geo hash grid aggregation +type GeoHashGridAggregation struct { + Field string `json:"field"` + Precision int `json:"precision"` +} + +// MetricAggregation represents a metric aggregation +type MetricAggregation struct { + Field string + Settings map[string]interface{} +} + +// MarshalJSON returns the JSON encoding of the metric aggregation +func (a *MetricAggregation) MarshalJSON() ([]byte, error) { + root := map[string]interface{}{ + "field": a.Field, + } + + for k, v := range a.Settings { + root[k] = v + } + + return json.Marshal(root) +} + +// PipelineAggregation represents a metric aggregation +type PipelineAggregation struct { + BucketPath string + Settings map[string]interface{} +} + +// MarshalJSON returns the JSON encoding of the pipeline aggregation +func (a *PipelineAggregation) MarshalJSON() ([]byte, error) { + root := map[string]interface{}{ + "bucket_path": a.BucketPath, + } + + for k, v := range a.Settings { + root[k] = v + } + + return json.Marshal(root) +} diff --git a/pkg/tsdb/elasticsearch/client/search_request.go b/pkg/tsdb/elasticsearch/client/search_request.go new file mode 100644 index 00000000000..a582d8ec247 --- /dev/null +++ b/pkg/tsdb/elasticsearch/client/search_request.go @@ -0,0 +1,446 @@ +package es + +import ( + "strings" +) + +// SearchRequestBuilder represents a builder which can build a search request +type SearchRequestBuilder struct { + version int + index string + size int + sort map[string]interface{} + queryBuilder *QueryBuilder + aggBuilders []AggBuilder + customProps map[string]interface{} +} + +// NewSearchRequestBuilder create a new search request builder +func NewSearchRequestBuilder(version int) *SearchRequestBuilder { + builder := &SearchRequestBuilder{ + version: version, + sort: make(map[string]interface{}), + customProps: make(map[string]interface{}), + aggBuilders: make([]AggBuilder, 0), + } + return builder +} + +// Build builds and return a search request +func (b *SearchRequestBuilder) Build() (*SearchRequest, error) { + sr := SearchRequest{ + Index: b.index, + Size: b.size, + Sort: b.sort, + CustomProps: b.customProps, + } + + if b.queryBuilder != nil { + q, err := b.queryBuilder.Build() + if err != nil { + return nil, err + } + sr.Query = q + } + + if len(b.aggBuilders) > 0 { + sr.Aggs = make(AggArray, 0) + + for _, ab := range b.aggBuilders { + aggArray, err := ab.Build() + if err != nil { + return nil, err + } + for _, agg := range aggArray { + sr.Aggs = append(sr.Aggs, agg) + } + } + } + + return &sr, nil +} + +// Size sets the size of the search request +func (b *SearchRequestBuilder) Size(size int) *SearchRequestBuilder { + b.size = size + return b +} + +// SortDesc adds a sort to the search request +func (b *SearchRequestBuilder) SortDesc(field, unmappedType string) *SearchRequestBuilder { + props := map[string]string{ + "order": "desc", + } + + if unmappedType != "" { + props["unmapped_type"] = unmappedType + } + + b.sort[field] = props + + return b +} + +// AddDocValueField adds a doc value field to the search request +func (b *SearchRequestBuilder) AddDocValueField(field string) *SearchRequestBuilder { + // fields field not supported on version >= 5 + if b.version < 5 { + b.customProps["fields"] = []string{"*", "_source"} + } + + b.customProps["script_fields"] = make(map[string]interface{}) + + if b.version < 5 { + b.customProps["fielddata_fields"] = []string{field} + } else { + b.customProps["docvalue_fields"] = []string{field} + } + + return b +} + +// Query creates and return a query builder +func (b *SearchRequestBuilder) Query() *QueryBuilder { + if b.queryBuilder == nil { + b.queryBuilder = NewQueryBuilder() + } + return b.queryBuilder +} + +// Agg initaite and returns a new aggregation builder +func (b *SearchRequestBuilder) Agg() AggBuilder { + aggBuilder := newAggBuilder() + b.aggBuilders = append(b.aggBuilders, aggBuilder) + return aggBuilder +} + +// MultiSearchRequestBuilder represents a builder which can build a multi search request +type MultiSearchRequestBuilder struct { + version int + requestBuilders []*SearchRequestBuilder +} + +// NewMultiSearchRequestBuilder creates a new multi search request builder +func NewMultiSearchRequestBuilder(version int) *MultiSearchRequestBuilder { + return &MultiSearchRequestBuilder{ + version: version, + } +} + +// Search initiates and returns a new search request builder +func (m *MultiSearchRequestBuilder) Search() *SearchRequestBuilder { + b := NewSearchRequestBuilder(m.version) + m.requestBuilders = append(m.requestBuilders, b) + return b +} + +// Build builds and return a multi search request +func (m *MultiSearchRequestBuilder) Build() (*MultiSearchRequest, error) { + requests := []*SearchRequest{} + for _, sb := range m.requestBuilders { + searchRequest, err := sb.Build() + if err != nil { + return nil, err + } + requests = append(requests, searchRequest) + } + + return &MultiSearchRequest{ + Requests: requests, + }, nil +} + +// QueryBuilder represents a query builder +type QueryBuilder struct { + boolQueryBuilder *BoolQueryBuilder +} + +// NewQueryBuilder create a new query builder +func NewQueryBuilder() *QueryBuilder { + return &QueryBuilder{} +} + +// Build builds and return a query builder +func (b *QueryBuilder) Build() (*Query, error) { + q := Query{} + + if b.boolQueryBuilder != nil { + b, err := b.boolQueryBuilder.Build() + if err != nil { + return nil, err + } + q.Bool = b + } + + return &q, nil +} + +// Bool creates and return a query builder +func (b *QueryBuilder) Bool() *BoolQueryBuilder { + if b.boolQueryBuilder == nil { + b.boolQueryBuilder = NewBoolQueryBuilder() + } + return b.boolQueryBuilder +} + +// BoolQueryBuilder represents a bool query builder +type BoolQueryBuilder struct { + filterQueryBuilder *FilterQueryBuilder +} + +// NewBoolQueryBuilder create a new bool query builder +func NewBoolQueryBuilder() *BoolQueryBuilder { + return &BoolQueryBuilder{} +} + +// Filter creates and return a filter query builder +func (b *BoolQueryBuilder) Filter() *FilterQueryBuilder { + if b.filterQueryBuilder == nil { + b.filterQueryBuilder = NewFilterQueryBuilder() + } + return b.filterQueryBuilder +} + +// Build builds and return a bool query builder +func (b *BoolQueryBuilder) Build() (*BoolQuery, error) { + boolQuery := BoolQuery{} + + if b.filterQueryBuilder != nil { + filters, err := b.filterQueryBuilder.Build() + if err != nil { + return nil, err + } + boolQuery.Filters = filters + } + + return &boolQuery, nil +} + +// FilterQueryBuilder represents a filter query builder +type FilterQueryBuilder struct { + filters []Filter +} + +// NewFilterQueryBuilder creates a new filter query builder +func NewFilterQueryBuilder() *FilterQueryBuilder { + return &FilterQueryBuilder{ + filters: make([]Filter, 0), + } +} + +// Build builds and return a filter query builder +func (b *FilterQueryBuilder) Build() ([]Filter, error) { + return b.filters, nil +} + +// AddDateRangeFilter adds a new time range filter +func (b *FilterQueryBuilder) AddDateRangeFilter(timeField, lte, gte, format string) *FilterQueryBuilder { + b.filters = append(b.filters, &RangeFilter{ + Key: timeField, + Lte: lte, + Gte: gte, + Format: format, + }) + return b +} + +// AddQueryStringFilter adds a new query string filter +func (b *FilterQueryBuilder) AddQueryStringFilter(querystring string, analyseWildcard bool) *FilterQueryBuilder { + if len(strings.TrimSpace(querystring)) == 0 { + return b + } + + b.filters = append(b.filters, &QueryStringFilter{ + Query: querystring, + AnalyzeWildcard: analyseWildcard, + }) + return b +} + +// AggBuilder represents an aggregation builder +type AggBuilder interface { + Histogram(key, field string, fn func(a *HistogramAgg, b AggBuilder)) AggBuilder + DateHistogram(key, field string, fn func(a *DateHistogramAgg, b AggBuilder)) AggBuilder + Terms(key, field string, fn func(a *TermsAggregation, b AggBuilder)) AggBuilder + Filters(key string, fn func(a *FiltersAggregation, b AggBuilder)) AggBuilder + GeoHashGrid(key, field string, fn func(a *GeoHashGridAggregation, b AggBuilder)) AggBuilder + Metric(key, metricType, field string, fn func(a *MetricAggregation)) AggBuilder + Pipeline(key, pipelineType, bucketPath string, fn func(a *PipelineAggregation)) AggBuilder + Build() (AggArray, error) +} + +type aggBuilderImpl struct { + AggBuilder + aggDefs []*aggDef +} + +func newAggBuilder() *aggBuilderImpl { + return &aggBuilderImpl{ + aggDefs: make([]*aggDef, 0), + } +} + +func (b *aggBuilderImpl) Build() (AggArray, error) { + aggs := make(AggArray, 0) + + for _, aggDef := range b.aggDefs { + agg := &Agg{ + Key: aggDef.key, + Aggregation: aggDef.aggregation, + } + + for _, cb := range aggDef.builders { + childAggs, err := cb.Build() + if err != nil { + return nil, err + } + + for _, childAgg := range childAggs { + agg.Aggregation.Aggs = append(agg.Aggregation.Aggs, childAgg) + } + } + + aggs = append(aggs, agg) + } + + return aggs, nil +} + +func (b *aggBuilderImpl) Histogram(key, field string, fn func(a *HistogramAgg, b AggBuilder)) AggBuilder { + innerAgg := &HistogramAgg{ + Field: field, + } + aggDef := newAggDef(key, &aggContainer{ + Type: "histogram", + Aggregation: innerAgg, + }) + + if fn != nil { + builder := newAggBuilder() + aggDef.builders = append(aggDef.builders, builder) + fn(innerAgg, builder) + } + + b.aggDefs = append(b.aggDefs, aggDef) + + return b +} + +func (b *aggBuilderImpl) DateHistogram(key, field string, fn func(a *DateHistogramAgg, b AggBuilder)) AggBuilder { + innerAgg := &DateHistogramAgg{ + Field: field, + } + aggDef := newAggDef(key, &aggContainer{ + Type: "date_histogram", + Aggregation: innerAgg, + }) + + if fn != nil { + builder := newAggBuilder() + aggDef.builders = append(aggDef.builders, builder) + fn(innerAgg, builder) + } + + b.aggDefs = append(b.aggDefs, aggDef) + + return b +} + +func (b *aggBuilderImpl) Terms(key, field string, fn func(a *TermsAggregation, b AggBuilder)) AggBuilder { + innerAgg := &TermsAggregation{ + Field: field, + Order: make(map[string]interface{}), + } + aggDef := newAggDef(key, &aggContainer{ + Type: "terms", + Aggregation: innerAgg, + }) + + if fn != nil { + builder := newAggBuilder() + aggDef.builders = append(aggDef.builders, builder) + fn(innerAgg, builder) + } + + b.aggDefs = append(b.aggDefs, aggDef) + + return b +} + +func (b *aggBuilderImpl) Filters(key string, fn func(a *FiltersAggregation, b AggBuilder)) AggBuilder { + innerAgg := &FiltersAggregation{ + Filters: make(map[string]interface{}), + } + aggDef := newAggDef(key, &aggContainer{ + Type: "filters", + Aggregation: innerAgg, + }) + if fn != nil { + builder := newAggBuilder() + aggDef.builders = append(aggDef.builders, builder) + fn(innerAgg, builder) + } + + b.aggDefs = append(b.aggDefs, aggDef) + + return b +} + +func (b *aggBuilderImpl) GeoHashGrid(key, field string, fn func(a *GeoHashGridAggregation, b AggBuilder)) AggBuilder { + innerAgg := &GeoHashGridAggregation{ + Field: field, + Precision: 5, + } + aggDef := newAggDef(key, &aggContainer{ + Type: "geohash_grid", + Aggregation: innerAgg, + }) + + if fn != nil { + builder := newAggBuilder() + aggDef.builders = append(aggDef.builders, builder) + fn(innerAgg, builder) + } + + b.aggDefs = append(b.aggDefs, aggDef) + + return b +} + +func (b *aggBuilderImpl) Metric(key, metricType, field string, fn func(a *MetricAggregation)) AggBuilder { + innerAgg := &MetricAggregation{ + Field: field, + Settings: make(map[string]interface{}), + } + aggDef := newAggDef(key, &aggContainer{ + Type: metricType, + Aggregation: innerAgg, + }) + + if fn != nil { + fn(innerAgg) + } + + b.aggDefs = append(b.aggDefs, aggDef) + + return b +} + +func (b *aggBuilderImpl) Pipeline(key, pipelineType, bucketPath string, fn func(a *PipelineAggregation)) AggBuilder { + innerAgg := &PipelineAggregation{ + BucketPath: bucketPath, + Settings: make(map[string]interface{}), + } + aggDef := newAggDef(key, &aggContainer{ + Type: pipelineType, + Aggregation: innerAgg, + }) + + if fn != nil { + fn(innerAgg) + } + + b.aggDefs = append(b.aggDefs, aggDef) + + return b +} diff --git a/pkg/tsdb/elasticsearch/client/search_request_test.go b/pkg/tsdb/elasticsearch/client/search_request_test.go new file mode 100644 index 00000000000..d93f8826442 --- /dev/null +++ b/pkg/tsdb/elasticsearch/client/search_request_test.go @@ -0,0 +1,471 @@ +package es + +import ( + "encoding/json" + "testing" + + "github.com/grafana/grafana/pkg/components/simplejson" + + . "github.com/smartystreets/goconvey/convey" +) + +func TestSearchRequest(t *testing.T) { + Convey("Test elasticsearch search request", t, func() { + timeField := "@timestamp" + Convey("Given new search request builder for es version 5", func() { + b := NewSearchRequestBuilder(5) + + Convey("When building search request", func() { + sr, err := b.Build() + So(err, ShouldBeNil) + + Convey("Should have size of zero", func() { + So(sr.Size, ShouldEqual, 0) + }) + + Convey("Should have no sorting", func() { + So(sr.Sort, ShouldHaveLength, 0) + }) + + Convey("When marshal to JSON should generate correct json", func() { + body, err := json.Marshal(sr) + So(err, ShouldBeNil) + json, err := simplejson.NewJson([]byte(body)) + So(err, ShouldBeNil) + So(json.Get("size").MustInt(500), ShouldEqual, 0) + So(json.Get("sort").Interface(), ShouldBeNil) + So(json.Get("aggs").Interface(), ShouldBeNil) + So(json.Get("query").Interface(), ShouldBeNil) + }) + }) + + Convey("When adding size, sort, filters", func() { + b.Size(200) + b.SortDesc(timeField, "boolean") + filters := b.Query().Bool().Filter() + filters.AddDateRangeFilter(timeField, "$timeTo", "$timeFrom", DateFormatEpochMS) + filters.AddQueryStringFilter("test", true) + + Convey("When building search request", func() { + sr, err := b.Build() + So(err, ShouldBeNil) + + Convey("Should have correct size", func() { + So(sr.Size, ShouldEqual, 200) + }) + + Convey("Should have correct sorting", func() { + sort, ok := sr.Sort[timeField].(map[string]string) + So(ok, ShouldBeTrue) + So(sort["order"], ShouldEqual, "desc") + So(sort["unmapped_type"], ShouldEqual, "boolean") + }) + + Convey("Should have range filter", func() { + f, ok := sr.Query.Bool.Filters[0].(*RangeFilter) + So(ok, ShouldBeTrue) + So(f.Gte, ShouldEqual, "$timeFrom") + So(f.Lte, ShouldEqual, "$timeTo") + So(f.Format, ShouldEqual, "epoch_millis") + }) + + Convey("Should have query string filter", func() { + f, ok := sr.Query.Bool.Filters[1].(*QueryStringFilter) + So(ok, ShouldBeTrue) + So(f.Query, ShouldEqual, "test") + So(f.AnalyzeWildcard, ShouldBeTrue) + }) + + Convey("When marshal to JSON should generate correct json", func() { + body, err := json.Marshal(sr) + So(err, ShouldBeNil) + json, err := simplejson.NewJson([]byte(body)) + So(err, ShouldBeNil) + So(json.Get("size").MustInt(0), ShouldEqual, 200) + + sort := json.GetPath("sort", timeField) + So(sort.Get("order").MustString(), ShouldEqual, "desc") + So(sort.Get("unmapped_type").MustString(), ShouldEqual, "boolean") + + timeRangeFilter := json.GetPath("query", "bool", "filter").GetIndex(0).Get("range").Get(timeField) + So(timeRangeFilter.Get("gte").MustString(""), ShouldEqual, "$timeFrom") + So(timeRangeFilter.Get("lte").MustString(""), ShouldEqual, "$timeTo") + So(timeRangeFilter.Get("format").MustString(""), ShouldEqual, DateFormatEpochMS) + + queryStringFilter := json.GetPath("query", "bool", "filter").GetIndex(1).Get("query_string") + So(queryStringFilter.Get("analyze_wildcard").MustBool(false), ShouldEqual, true) + So(queryStringFilter.Get("query").MustString(""), ShouldEqual, "test") + }) + }) + }) + + Convey("When adding doc value field", func() { + b.AddDocValueField(timeField) + + Convey("should set correct props", func() { + So(b.customProps["fields"], ShouldBeNil) + + scriptFields, ok := b.customProps["script_fields"].(map[string]interface{}) + So(ok, ShouldBeTrue) + So(scriptFields, ShouldHaveLength, 0) + + docValueFields, ok := b.customProps["docvalue_fields"].([]string) + So(ok, ShouldBeTrue) + So(docValueFields, ShouldHaveLength, 1) + So(docValueFields[0], ShouldEqual, timeField) + }) + + Convey("When building search request", func() { + sr, err := b.Build() + So(err, ShouldBeNil) + + Convey("When marshal to JSON should generate correct json", func() { + body, err := json.Marshal(sr) + So(err, ShouldBeNil) + json, err := simplejson.NewJson([]byte(body)) + So(err, ShouldBeNil) + + scriptFields, err := json.Get("script_fields").Map() + So(err, ShouldBeNil) + So(scriptFields, ShouldHaveLength, 0) + + _, err = json.Get("fields").StringArray() + So(err, ShouldNotBeNil) + + docValueFields, err := json.Get("docvalue_fields").StringArray() + So(err, ShouldBeNil) + So(docValueFields, ShouldHaveLength, 1) + So(docValueFields[0], ShouldEqual, timeField) + }) + }) + }) + + Convey("and adding multiple top level aggs", func() { + aggBuilder := b.Agg() + aggBuilder.Terms("1", "@hostname", nil) + aggBuilder.DateHistogram("2", "@timestamp", nil) + + Convey("When building search request", func() { + sr, err := b.Build() + So(err, ShouldBeNil) + + Convey("Should have 2 top level aggs", func() { + aggs := sr.Aggs + So(aggs, ShouldHaveLength, 2) + So(aggs[0].Key, ShouldEqual, "1") + So(aggs[0].Aggregation.Type, ShouldEqual, "terms") + So(aggs[1].Key, ShouldEqual, "2") + So(aggs[1].Aggregation.Type, ShouldEqual, "date_histogram") + }) + + Convey("When marshal to JSON should generate correct json", func() { + body, err := json.Marshal(sr) + So(err, ShouldBeNil) + json, err := simplejson.NewJson([]byte(body)) + So(err, ShouldBeNil) + + So(json.Get("aggs").MustMap(), ShouldHaveLength, 2) + So(json.GetPath("aggs", "1", "terms", "field").MustString(), ShouldEqual, "@hostname") + So(json.GetPath("aggs", "2", "date_histogram", "field").MustString(), ShouldEqual, "@timestamp") + }) + }) + }) + + Convey("and adding top level agg with child agg", func() { + aggBuilder := b.Agg() + aggBuilder.Terms("1", "@hostname", func(a *TermsAggregation, ib AggBuilder) { + ib.DateHistogram("2", "@timestamp", nil) + }) + + Convey("When building search request", func() { + sr, err := b.Build() + So(err, ShouldBeNil) + + Convey("Should have 1 top level agg and one child agg", func() { + aggs := sr.Aggs + So(aggs, ShouldHaveLength, 1) + + topAgg := aggs[0] + So(topAgg.Key, ShouldEqual, "1") + So(topAgg.Aggregation.Type, ShouldEqual, "terms") + So(topAgg.Aggregation.Aggs, ShouldHaveLength, 1) + + childAgg := aggs[0].Aggregation.Aggs[0] + So(childAgg.Key, ShouldEqual, "2") + So(childAgg.Aggregation.Type, ShouldEqual, "date_histogram") + }) + + Convey("When marshal to JSON should generate correct json", func() { + body, err := json.Marshal(sr) + So(err, ShouldBeNil) + json, err := simplejson.NewJson([]byte(body)) + So(err, ShouldBeNil) + + So(json.Get("aggs").MustMap(), ShouldHaveLength, 1) + firstLevelAgg := json.GetPath("aggs", "1") + secondLevelAgg := firstLevelAgg.GetPath("aggs", "2") + So(firstLevelAgg.GetPath("terms", "field").MustString(), ShouldEqual, "@hostname") + So(secondLevelAgg.GetPath("date_histogram", "field").MustString(), ShouldEqual, "@timestamp") + }) + }) + }) + + Convey("and adding two top level aggs with child agg", func() { + aggBuilder := b.Agg() + aggBuilder.Histogram("1", "@hostname", func(a *HistogramAgg, ib AggBuilder) { + ib.DateHistogram("2", "@timestamp", nil) + }) + aggBuilder.Filters("3", func(a *FiltersAggregation, ib AggBuilder) { + ib.Terms("4", "@test", nil) + }) + + Convey("When building search request", func() { + sr, err := b.Build() + So(err, ShouldBeNil) + + Convey("Should have 2 top level aggs with one child agg each", func() { + aggs := sr.Aggs + So(aggs, ShouldHaveLength, 2) + + topAggOne := aggs[0] + So(topAggOne.Key, ShouldEqual, "1") + So(topAggOne.Aggregation.Type, ShouldEqual, "histogram") + So(topAggOne.Aggregation.Aggs, ShouldHaveLength, 1) + + topAggOnechildAgg := topAggOne.Aggregation.Aggs[0] + So(topAggOnechildAgg.Key, ShouldEqual, "2") + So(topAggOnechildAgg.Aggregation.Type, ShouldEqual, "date_histogram") + + topAggTwo := aggs[1] + So(topAggTwo.Key, ShouldEqual, "3") + So(topAggTwo.Aggregation.Type, ShouldEqual, "filters") + So(topAggTwo.Aggregation.Aggs, ShouldHaveLength, 1) + + topAggTwochildAgg := topAggTwo.Aggregation.Aggs[0] + So(topAggTwochildAgg.Key, ShouldEqual, "4") + So(topAggTwochildAgg.Aggregation.Type, ShouldEqual, "terms") + }) + + Convey("When marshal to JSON should generate correct json", func() { + body, err := json.Marshal(sr) + So(err, ShouldBeNil) + json, err := simplejson.NewJson([]byte(body)) + So(err, ShouldBeNil) + + topAggOne := json.GetPath("aggs", "1") + So(topAggOne.GetPath("histogram", "field").MustString(), ShouldEqual, "@hostname") + topAggOnechildAgg := topAggOne.GetPath("aggs", "2") + So(topAggOnechildAgg.GetPath("date_histogram", "field").MustString(), ShouldEqual, "@timestamp") + + topAggTwo := json.GetPath("aggs", "3") + topAggTwochildAgg := topAggTwo.GetPath("aggs", "4") + So(topAggTwo.GetPath("filters").MustArray(), ShouldHaveLength, 0) + So(topAggTwochildAgg.GetPath("terms", "field").MustString(), ShouldEqual, "@test") + }) + }) + }) + + Convey("and adding top level agg with child agg with child agg", func() { + aggBuilder := b.Agg() + aggBuilder.Terms("1", "@hostname", func(a *TermsAggregation, ib AggBuilder) { + ib.Terms("2", "@app", func(a *TermsAggregation, ib AggBuilder) { + ib.DateHistogram("3", "@timestamp", nil) + }) + }) + + Convey("When building search request", func() { + sr, err := b.Build() + So(err, ShouldBeNil) + + Convey("Should have 1 top level agg with one child having a child", func() { + aggs := sr.Aggs + So(aggs, ShouldHaveLength, 1) + + topAgg := aggs[0] + So(topAgg.Key, ShouldEqual, "1") + So(topAgg.Aggregation.Type, ShouldEqual, "terms") + So(topAgg.Aggregation.Aggs, ShouldHaveLength, 1) + + childAgg := topAgg.Aggregation.Aggs[0] + So(childAgg.Key, ShouldEqual, "2") + So(childAgg.Aggregation.Type, ShouldEqual, "terms") + + childChildAgg := childAgg.Aggregation.Aggs[0] + So(childChildAgg.Key, ShouldEqual, "3") + So(childChildAgg.Aggregation.Type, ShouldEqual, "date_histogram") + }) + + Convey("When marshal to JSON should generate correct json", func() { + body, err := json.Marshal(sr) + So(err, ShouldBeNil) + json, err := simplejson.NewJson([]byte(body)) + So(err, ShouldBeNil) + + topAgg := json.GetPath("aggs", "1") + So(topAgg.GetPath("terms", "field").MustString(), ShouldEqual, "@hostname") + + childAgg := topAgg.GetPath("aggs", "2") + So(childAgg.GetPath("terms", "field").MustString(), ShouldEqual, "@app") + + childChildAgg := childAgg.GetPath("aggs", "3") + So(childChildAgg.GetPath("date_histogram", "field").MustString(), ShouldEqual, "@timestamp") + }) + }) + }) + + Convey("and adding bucket and metric aggs", func() { + aggBuilder := b.Agg() + aggBuilder.Terms("1", "@hostname", func(a *TermsAggregation, ib AggBuilder) { + ib.Terms("2", "@app", func(a *TermsAggregation, ib AggBuilder) { + ib.Metric("4", "avg", "@value", nil) + ib.DateHistogram("3", "@timestamp", func(a *DateHistogramAgg, ib AggBuilder) { + ib.Metric("4", "avg", "@value", nil) + ib.Metric("5", "max", "@value", nil) + }) + }) + }) + + Convey("When building search request", func() { + sr, err := b.Build() + So(err, ShouldBeNil) + + Convey("Should have 1 top level agg with one child having a child", func() { + aggs := sr.Aggs + So(aggs, ShouldHaveLength, 1) + + topAgg := aggs[0] + So(topAgg.Key, ShouldEqual, "1") + So(topAgg.Aggregation.Type, ShouldEqual, "terms") + So(topAgg.Aggregation.Aggs, ShouldHaveLength, 1) + + childAgg := topAgg.Aggregation.Aggs[0] + So(childAgg.Key, ShouldEqual, "2") + So(childAgg.Aggregation.Type, ShouldEqual, "terms") + + childChildOneAgg := childAgg.Aggregation.Aggs[0] + So(childChildOneAgg.Key, ShouldEqual, "4") + So(childChildOneAgg.Aggregation.Type, ShouldEqual, "avg") + + childChildTwoAgg := childAgg.Aggregation.Aggs[1] + So(childChildTwoAgg.Key, ShouldEqual, "3") + So(childChildTwoAgg.Aggregation.Type, ShouldEqual, "date_histogram") + + childChildTwoChildOneAgg := childChildTwoAgg.Aggregation.Aggs[0] + So(childChildTwoChildOneAgg.Key, ShouldEqual, "4") + So(childChildTwoChildOneAgg.Aggregation.Type, ShouldEqual, "avg") + + childChildTwoChildTwoAgg := childChildTwoAgg.Aggregation.Aggs[1] + So(childChildTwoChildTwoAgg.Key, ShouldEqual, "5") + So(childChildTwoChildTwoAgg.Aggregation.Type, ShouldEqual, "max") + }) + + Convey("When marshal to JSON should generate correct json", func() { + body, err := json.Marshal(sr) + So(err, ShouldBeNil) + json, err := simplejson.NewJson([]byte(body)) + So(err, ShouldBeNil) + + termsAgg := json.GetPath("aggs", "1") + So(termsAgg.GetPath("terms", "field").MustString(), ShouldEqual, "@hostname") + + termsAggTwo := termsAgg.GetPath("aggs", "2") + So(termsAggTwo.GetPath("terms", "field").MustString(), ShouldEqual, "@app") + + termsAggTwoAvg := termsAggTwo.GetPath("aggs", "4") + So(termsAggTwoAvg.GetPath("avg", "field").MustString(), ShouldEqual, "@value") + + dateHistAgg := termsAggTwo.GetPath("aggs", "3") + So(dateHistAgg.GetPath("date_histogram", "field").MustString(), ShouldEqual, "@timestamp") + + avgAgg := dateHistAgg.GetPath("aggs", "4") + So(avgAgg.GetPath("avg", "field").MustString(), ShouldEqual, "@value") + + maxAgg := dateHistAgg.GetPath("aggs", "5") + So(maxAgg.GetPath("max", "field").MustString(), ShouldEqual, "@value") + }) + }) + }) + }) + + Convey("Given new search request builder for es version 2", func() { + b := NewSearchRequestBuilder(2) + + Convey("When adding doc value field", func() { + b.AddDocValueField(timeField) + + Convey("should set correct props", func() { + fields, ok := b.customProps["fields"].([]string) + So(ok, ShouldBeTrue) + So(fields, ShouldHaveLength, 2) + So(fields[0], ShouldEqual, "*") + So(fields[1], ShouldEqual, "_source") + + scriptFields, ok := b.customProps["script_fields"].(map[string]interface{}) + So(ok, ShouldBeTrue) + So(scriptFields, ShouldHaveLength, 0) + + fieldDataFields, ok := b.customProps["fielddata_fields"].([]string) + So(ok, ShouldBeTrue) + So(fieldDataFields, ShouldHaveLength, 1) + So(fieldDataFields[0], ShouldEqual, timeField) + }) + + Convey("When building search request", func() { + sr, err := b.Build() + So(err, ShouldBeNil) + + Convey("When marshal to JSON should generate correct json", func() { + body, err := json.Marshal(sr) + So(err, ShouldBeNil) + json, err := simplejson.NewJson([]byte(body)) + So(err, ShouldBeNil) + + scriptFields, err := json.Get("script_fields").Map() + So(err, ShouldBeNil) + So(scriptFields, ShouldHaveLength, 0) + + fields, err := json.Get("fields").StringArray() + So(err, ShouldBeNil) + So(fields, ShouldHaveLength, 2) + So(fields[0], ShouldEqual, "*") + So(fields[1], ShouldEqual, "_source") + + fieldDataFields, err := json.Get("fielddata_fields").StringArray() + So(err, ShouldBeNil) + So(fieldDataFields, ShouldHaveLength, 1) + So(fieldDataFields[0], ShouldEqual, timeField) + }) + }) + }) + }) + }) +} + +func TestMultiSearchRequest(t *testing.T) { + Convey("Test elasticsearch multi search request", t, func() { + Convey("Given new multi search request builder", func() { + b := NewMultiSearchRequestBuilder(0) + + Convey("When adding one search request", func() { + b.Search() + + Convey("When building search request should contain one search request", func() { + mr, err := b.Build() + So(err, ShouldBeNil) + So(mr.Requests, ShouldHaveLength, 1) + }) + }) + + Convey("When adding two search requests", func() { + b.Search() + b.Search() + + Convey("When building search request should contain two search requests", func() { + mr, err := b.Build() + So(err, ShouldBeNil) + So(mr.Requests, ShouldHaveLength, 2) + }) + }) + }) + }) +}