mirror of
https://github.com/grafana/grafana.git
synced 2025-02-25 18:55:37 -06:00
* convert SQLs to use sdk contracts * make draft * postgres * intermedia * get datasourceinfo filled at the beginning of the service * move the interval into package because of cyclict import and fix all postgres tests * fix mysql test * fix mssql * fix the test for pr https://github.com/grafana/grafana/issues/35839 * fix some issue about intervalv2 package * update sql test * wire migration for SQLs * add sqls to the background process * make it register instead of register and start * revert formatting * fix tests * fix linter * remove integration test * Postgres test fix Co-authored-by: Marcus Efraimsson <marcus.efraimsson@gmail.com>
391 lines
12 KiB
Go
391 lines
12 KiB
Go
package es
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"io/ioutil"
|
|
"net/http"
|
|
"net/http/httptest"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/Masterminds/semver"
|
|
"github.com/grafana/grafana-plugin-sdk-go/backend"
|
|
"github.com/grafana/grafana/pkg/components/simplejson"
|
|
"github.com/grafana/grafana/pkg/infra/httpclient"
|
|
"github.com/grafana/grafana/pkg/tsdb/intervalv2"
|
|
"github.com/stretchr/testify/assert"
|
|
"github.com/stretchr/testify/require"
|
|
)
|
|
|
|
func TestNewClient(t *testing.T) {
|
|
t.Run("When using legacy version numbers", func(t *testing.T) {
|
|
t.Run("When version 2 should return v2 client", func(t *testing.T) {
|
|
version, err := semver.NewVersion("2.0.0")
|
|
require.NoError(t, err)
|
|
ds := &DatasourceInfo{
|
|
ESVersion: version,
|
|
TimeField: "@timestamp",
|
|
}
|
|
|
|
c, err := NewClient(context.Background(), httpclient.NewProvider(), ds, backend.TimeRange{})
|
|
require.NoError(t, err)
|
|
assert.Equal(t, "2.0.0", c.GetVersion().String())
|
|
})
|
|
|
|
t.Run("When version 5 should return v5 client", func(t *testing.T) {
|
|
version, err := semver.NewVersion("5.0.0")
|
|
require.NoError(t, err)
|
|
ds := &DatasourceInfo{
|
|
ESVersion: version,
|
|
TimeField: "@timestamp",
|
|
}
|
|
|
|
c, err := NewClient(context.Background(), httpclient.NewProvider(), ds, backend.TimeRange{})
|
|
require.NoError(t, err)
|
|
assert.Equal(t, "5.0.0", c.GetVersion().String())
|
|
})
|
|
|
|
t.Run("When version 56 should return v5.6 client", func(t *testing.T) {
|
|
version, err := semver.NewVersion("5.6.0")
|
|
require.NoError(t, err)
|
|
ds := &DatasourceInfo{
|
|
ESVersion: version,
|
|
TimeField: "@timestamp",
|
|
}
|
|
|
|
c, err := NewClient(context.Background(), httpclient.NewProvider(), ds, backend.TimeRange{})
|
|
require.NoError(t, err)
|
|
assert.Equal(t, "5.6.0", c.GetVersion().String())
|
|
})
|
|
|
|
t.Run("When version 60 should return v6.0 client", func(t *testing.T) {
|
|
version, err := semver.NewVersion("6.0.0")
|
|
require.NoError(t, err)
|
|
ds := &DatasourceInfo{
|
|
ESVersion: version,
|
|
TimeField: "@timestamp",
|
|
}
|
|
|
|
c, err := NewClient(context.Background(), httpclient.NewProvider(), ds, backend.TimeRange{})
|
|
require.NoError(t, err)
|
|
assert.Equal(t, "6.0.0", c.GetVersion().String())
|
|
})
|
|
|
|
t.Run("When version 70 should return v7.0 client", func(t *testing.T) {
|
|
version, err := semver.NewVersion("7.0.0")
|
|
require.NoError(t, err)
|
|
ds := &DatasourceInfo{
|
|
ESVersion: version,
|
|
TimeField: "@timestamp",
|
|
}
|
|
|
|
c, err := NewClient(context.Background(), httpclient.NewProvider(), ds, backend.TimeRange{})
|
|
require.NoError(t, err)
|
|
assert.Equal(t, "7.0.0", c.GetVersion().String())
|
|
})
|
|
})
|
|
|
|
t.Run("When version is a valid semver string should create a client", func(t *testing.T) {
|
|
version, err := semver.NewVersion("7.2.4")
|
|
require.NoError(t, err)
|
|
ds := &DatasourceInfo{
|
|
ESVersion: version,
|
|
TimeField: "@timestamp",
|
|
}
|
|
|
|
c, err := NewClient(context.Background(), httpclient.NewProvider(), ds, backend.TimeRange{})
|
|
require.NoError(t, err)
|
|
assert.Equal(t, version.String(), c.GetVersion().String())
|
|
})
|
|
}
|
|
|
|
func TestClient_ExecuteMultisearch(t *testing.T) {
|
|
version, err := semver.NewVersion("2.0.0")
|
|
require.NoError(t, err)
|
|
httpClientScenario(t, "Given a fake http client and a v2.x client with response", &DatasourceInfo{
|
|
Database: "[metrics-]YYYY.MM.DD",
|
|
ESVersion: version,
|
|
TimeField: "@timestamp",
|
|
Interval: "Daily",
|
|
}, func(sc *scenarioContext) {
|
|
sc.responseBody = `{
|
|
"responses": [
|
|
{
|
|
"hits": { "hits": [], "max_score": 0, "total": 4656 },
|
|
"status": 200
|
|
}
|
|
]
|
|
}`
|
|
|
|
ms, err := createMultisearchForTest(t, sc.client)
|
|
require.NoError(t, err)
|
|
res, err := sc.client.ExecuteMultisearch(ms)
|
|
require.NoError(t, err)
|
|
|
|
require.NotNil(t, sc.request)
|
|
assert.Equal(t, http.MethodPost, sc.request.Method)
|
|
assert.Equal(t, "/_msearch", sc.request.URL.Path)
|
|
|
|
require.NotNil(t, sc.requestBody)
|
|
headerBytes, err := sc.requestBody.ReadBytes('\n')
|
|
require.NoError(t, err)
|
|
bodyBytes := sc.requestBody.Bytes()
|
|
|
|
jHeader, err := simplejson.NewJson(headerBytes)
|
|
require.NoError(t, err)
|
|
|
|
jBody, err := simplejson.NewJson(bodyBytes)
|
|
require.NoError(t, err)
|
|
|
|
assert.Equal(t, "metrics-2018.05.15", jHeader.Get("index").MustString())
|
|
assert.True(t, jHeader.Get("ignore_unavailable").MustBool(false))
|
|
assert.Equal(t, "count", jHeader.Get("search_type").MustString())
|
|
assert.Empty(t, jHeader.Get("max_concurrent_shard_requests"))
|
|
|
|
assert.Equal(t, "15000*@hostname", jBody.GetPath("aggs", "2", "aggs", "1", "avg", "script").MustString())
|
|
|
|
assert.Equal(t, "15s", jBody.GetPath("aggs", "2", "date_histogram", "interval").MustString())
|
|
|
|
assert.Equal(t, 200, res.Status)
|
|
require.Len(t, res.Responses, 1)
|
|
})
|
|
|
|
version, err = semver.NewVersion("5.0.0")
|
|
require.NoError(t, err)
|
|
httpClientScenario(t, "Given a fake http client and a v5.x client with response", &DatasourceInfo{
|
|
Database: "[metrics-]YYYY.MM.DD",
|
|
ESVersion: version,
|
|
TimeField: "@timestamp",
|
|
Interval: "Daily",
|
|
MaxConcurrentShardRequests: 100,
|
|
}, func(sc *scenarioContext) {
|
|
sc.responseBody = `{
|
|
"responses": [
|
|
{
|
|
"hits": { "hits": [], "max_score": 0, "total": 4656 },
|
|
"status": 200
|
|
}
|
|
]
|
|
}`
|
|
|
|
ms, err := createMultisearchForTest(t, sc.client)
|
|
require.NoError(t, err)
|
|
res, err := sc.client.ExecuteMultisearch(ms)
|
|
require.NoError(t, err)
|
|
|
|
require.NotNil(t, sc.request)
|
|
assert.Equal(t, http.MethodPost, sc.request.Method)
|
|
assert.Equal(t, "/_msearch", sc.request.URL.Path)
|
|
|
|
require.NotNil(t, sc.requestBody)
|
|
|
|
headerBytes, err := sc.requestBody.ReadBytes('\n')
|
|
require.NoError(t, err)
|
|
bodyBytes := sc.requestBody.Bytes()
|
|
|
|
jHeader, err := simplejson.NewJson(headerBytes)
|
|
require.NoError(t, err)
|
|
|
|
jBody, err := simplejson.NewJson(bodyBytes)
|
|
require.NoError(t, err)
|
|
|
|
assert.Equal(t, "metrics-2018.05.15", jHeader.Get("index").MustString())
|
|
assert.True(t, jHeader.Get("ignore_unavailable").MustBool(false))
|
|
assert.Equal(t, "query_then_fetch", jHeader.Get("search_type").MustString())
|
|
assert.Empty(t, jHeader.Get("max_concurrent_shard_requests"))
|
|
|
|
assert.Equal(t, "15000*@hostname", jBody.GetPath("aggs", "2", "aggs", "1", "avg", "script").MustString())
|
|
|
|
assert.Equal(t, "15s", jBody.GetPath("aggs", "2", "date_histogram", "interval").MustString())
|
|
|
|
assert.Equal(t, 200, res.Status)
|
|
require.Len(t, res.Responses, 1)
|
|
})
|
|
|
|
version, err = semver.NewVersion("5.6.0")
|
|
require.NoError(t, err)
|
|
httpClientScenario(t, "Given a fake http client and a v5.6 client with response", &DatasourceInfo{
|
|
Database: "[metrics-]YYYY.MM.DD",
|
|
ESVersion: version,
|
|
TimeField: "@timestamp",
|
|
Interval: "Daily",
|
|
MaxConcurrentShardRequests: 100,
|
|
IncludeFrozen: true,
|
|
XPack: true,
|
|
}, func(sc *scenarioContext) {
|
|
sc.responseBody = `{
|
|
"responses": [
|
|
{
|
|
"hits": { "hits": [], "max_score": 0, "total": 4656 },
|
|
"status": 200
|
|
}
|
|
]
|
|
}`
|
|
|
|
ms, err := createMultisearchForTest(t, sc.client)
|
|
require.NoError(t, err)
|
|
res, err := sc.client.ExecuteMultisearch(ms)
|
|
require.NoError(t, err)
|
|
|
|
require.NotNil(t, sc.request)
|
|
assert.Equal(t, http.MethodPost, sc.request.Method)
|
|
assert.Equal(t, "/_msearch", sc.request.URL.Path)
|
|
assert.NotContains(t, sc.request.URL.RawQuery, "ignore_throttled=")
|
|
|
|
require.NotNil(t, sc.requestBody)
|
|
|
|
headerBytes, err := sc.requestBody.ReadBytes('\n')
|
|
require.NoError(t, err)
|
|
bodyBytes := sc.requestBody.Bytes()
|
|
|
|
jHeader, err := simplejson.NewJson(headerBytes)
|
|
require.NoError(t, err)
|
|
|
|
jBody, err := simplejson.NewJson(bodyBytes)
|
|
require.NoError(t, err)
|
|
|
|
assert.Equal(t, "metrics-2018.05.15", jHeader.Get("index").MustString())
|
|
assert.True(t, jHeader.Get("ignore_unavailable").MustBool(false))
|
|
assert.Equal(t, "query_then_fetch", jHeader.Get("search_type").MustString())
|
|
assert.Equal(t, 100, jHeader.Get("max_concurrent_shard_requests").MustInt())
|
|
|
|
assert.Equal(t, "15000*@hostname", jBody.GetPath("aggs", "2", "aggs", "1", "avg", "script").MustString())
|
|
|
|
assert.Equal(t, "15s", jBody.GetPath("aggs", "2", "date_histogram", "interval").MustString())
|
|
|
|
assert.Equal(t, 200, res.Status)
|
|
require.Len(t, res.Responses, 1)
|
|
})
|
|
|
|
version, err = semver.NewVersion("7.0.0")
|
|
require.NoError(t, err)
|
|
httpClientScenario(t, "Given a fake http client and a v7.0 client with response", &DatasourceInfo{
|
|
Database: "[metrics-]YYYY.MM.DD",
|
|
ESVersion: version,
|
|
TimeField: "@timestamp",
|
|
Interval: "Daily",
|
|
MaxConcurrentShardRequests: 6,
|
|
IncludeFrozen: true,
|
|
XPack: true,
|
|
}, func(sc *scenarioContext) {
|
|
sc.responseBody = `{
|
|
"responses": [
|
|
{
|
|
"hits": { "hits": [], "max_score": 0, "total": { "value": 4656, "relation": "eq"} },
|
|
"status": 200
|
|
}
|
|
]
|
|
}`
|
|
|
|
ms, err := createMultisearchForTest(t, sc.client)
|
|
require.NoError(t, err)
|
|
res, err := sc.client.ExecuteMultisearch(ms)
|
|
require.NoError(t, err)
|
|
|
|
require.NotNil(t, sc.request)
|
|
assert.Equal(t, http.MethodPost, sc.request.Method)
|
|
assert.Equal(t, "/_msearch", sc.request.URL.Path)
|
|
assert.Equal(t, "max_concurrent_shard_requests=6&ignore_throttled=false", sc.request.URL.RawQuery)
|
|
|
|
require.NotNil(t, sc.requestBody)
|
|
|
|
headerBytes, err := sc.requestBody.ReadBytes('\n')
|
|
require.NoError(t, err)
|
|
bodyBytes := sc.requestBody.Bytes()
|
|
|
|
jHeader, err := simplejson.NewJson(headerBytes)
|
|
require.NoError(t, err)
|
|
|
|
jBody, err := simplejson.NewJson(bodyBytes)
|
|
require.NoError(t, err)
|
|
|
|
assert.Equal(t, "metrics-2018.05.15", jHeader.Get("index").MustString())
|
|
assert.True(t, jHeader.Get("ignore_unavailable").MustBool(false))
|
|
assert.Equal(t, "query_then_fetch", jHeader.Get("search_type").MustString())
|
|
assert.Empty(t, jHeader.Get("max_concurrent_shard_requests"))
|
|
assert.False(t, jHeader.Get("ignore_throttled").MustBool())
|
|
|
|
assert.Equal(t, "15000*@hostname", jBody.GetPath("aggs", "2", "aggs", "1", "avg", "script").MustString())
|
|
|
|
assert.Equal(t, "15s", jBody.GetPath("aggs", "2", "date_histogram", "interval").MustString())
|
|
|
|
assert.Equal(t, 200, res.Status)
|
|
require.Len(t, res.Responses, 1)
|
|
})
|
|
}
|
|
|
|
func createMultisearchForTest(t *testing.T, c Client) (*MultiSearchRequest, error) {
|
|
t.Helper()
|
|
|
|
msb := c.MultiSearch()
|
|
s := msb.Search(intervalv2.Interval{Value: 15 * time.Second, Text: "15s"})
|
|
s.Agg().DateHistogram("2", "@timestamp", func(a *DateHistogramAgg, ab AggBuilder) {
|
|
a.Interval = "$__interval"
|
|
|
|
ab.Metric("1", "avg", "@hostname", func(a *MetricAggregation) {
|
|
a.Settings["script"] = "$__interval_ms*@hostname"
|
|
})
|
|
})
|
|
return msb.Build()
|
|
}
|
|
|
|
type scenarioContext struct {
|
|
client Client
|
|
request *http.Request
|
|
requestBody *bytes.Buffer
|
|
responseStatus int
|
|
responseBody string
|
|
}
|
|
|
|
type scenarioFunc func(*scenarioContext)
|
|
|
|
func httpClientScenario(t *testing.T, desc string, ds *DatasourceInfo, fn scenarioFunc) {
|
|
t.Helper()
|
|
|
|
t.Run(desc, func(t *testing.T) {
|
|
sc := &scenarioContext{
|
|
responseStatus: 200,
|
|
responseBody: `{ "responses": [] }`,
|
|
}
|
|
ts := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) {
|
|
sc.request = r
|
|
buf, err := ioutil.ReadAll(r.Body)
|
|
require.NoError(t, err)
|
|
|
|
sc.requestBody = bytes.NewBuffer(buf)
|
|
|
|
rw.Header().Set("Content-Type", "application/x-ndjson")
|
|
_, err = rw.Write([]byte(sc.responseBody))
|
|
require.NoError(t, err)
|
|
rw.WriteHeader(sc.responseStatus)
|
|
}))
|
|
ds.URL = ts.URL
|
|
|
|
from := time.Date(2018, 5, 15, 17, 50, 0, 0, time.UTC)
|
|
to := time.Date(2018, 5, 15, 17, 55, 0, 0, time.UTC)
|
|
timeRange := backend.TimeRange{
|
|
From: from,
|
|
To: to,
|
|
}
|
|
|
|
c, err := NewClient(context.Background(), httpclient.NewProvider(), ds, timeRange)
|
|
require.NoError(t, err)
|
|
require.NotNil(t, c)
|
|
sc.client = c
|
|
|
|
currentNewDatasourceHTTPClient := newDatasourceHttpClient
|
|
|
|
newDatasourceHttpClient = func(httpClientProvider httpclient.Provider, ds *DatasourceInfo) (*http.Client, error) {
|
|
return ts.Client(), nil
|
|
}
|
|
|
|
t.Cleanup(func() {
|
|
ts.Close()
|
|
newDatasourceHttpClient = currentNewDatasourceHTTPClient
|
|
})
|
|
|
|
fn(sc)
|
|
})
|
|
}
|