mirror of
https://github.com/grafana/grafana.git
synced 2025-02-25 18:55:37 -06:00
* convert SQLs to use sdk contracts * make draft * postgres * intermediate * get datasourceinfo filled at the beginning of the service * move the interval into package because of cyclic import and fix all postgres tests * fix mysql test * fix mssql * fix the test for pr https://github.com/grafana/grafana/issues/35839 * fix some issue about intervalv2 package * update sql test * wire migration for SQLs * add sqls to the background process * make it register instead of register and start * revert formatting * fix tests * fix linter * remove integration test * Postgres test fix Co-authored-by: Marcus Efraimsson <marcus.efraimsson@gmail.com>
333 lines
8.8 KiB
Go
333 lines
8.8 KiB
Go
package es
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"io/ioutil"
|
|
"net/http"
|
|
"net/url"
|
|
"path"
|
|
"strconv"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/Masterminds/semver"
|
|
"github.com/grafana/grafana-plugin-sdk-go/backend"
|
|
sdkhttpclient "github.com/grafana/grafana-plugin-sdk-go/backend/httpclient"
|
|
"github.com/grafana/grafana/pkg/components/simplejson"
|
|
"github.com/grafana/grafana/pkg/infra/httpclient"
|
|
"github.com/grafana/grafana/pkg/infra/log"
|
|
"github.com/grafana/grafana/pkg/tsdb/intervalv2"
|
|
|
|
"golang.org/x/net/context/ctxhttp"
|
|
)
|
|
|
|
type DatasourceInfo struct {
|
|
ID int64
|
|
HTTPClientOpts sdkhttpclient.Options
|
|
URL string
|
|
Database string
|
|
ESVersion *semver.Version
|
|
TimeField string
|
|
Interval string
|
|
TimeInterval string
|
|
MaxConcurrentShardRequests int64
|
|
IncludeFrozen bool
|
|
XPack bool
|
|
}
|
|
|
|
const loggerName = "tsdb.elasticsearch.client"
|
|
|
|
var (
|
|
clientLog = log.New(loggerName)
|
|
)
|
|
|
|
var newDatasourceHttpClient = func(httpClientProvider httpclient.Provider, ds *DatasourceInfo) (*http.Client, error) {
|
|
return httpClientProvider.New(ds.HTTPClientOpts)
|
|
}
|
|
|
|
// Client represents a client which can interact with elasticsearch api
|
|
type Client interface {
|
|
GetVersion() *semver.Version
|
|
GetTimeField() string
|
|
GetMinInterval(queryInterval string) (time.Duration, error)
|
|
ExecuteMultisearch(r *MultiSearchRequest) (*MultiSearchResponse, error)
|
|
MultiSearch() *MultiSearchRequestBuilder
|
|
EnableDebug()
|
|
}
|
|
|
|
// NewClient creates a new elasticsearch client
|
|
var NewClient = func(ctx context.Context, httpClientProvider httpclient.Provider, ds *DatasourceInfo, timeRange backend.TimeRange) (Client, error) {
|
|
ip, err := newIndexPattern(ds.Interval, ds.Database)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
indices, err := ip.GetIndices(timeRange)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
clientLog.Info("Creating new client", "version", ds.ESVersion, "timeField", ds.TimeField, "indices", strings.Join(indices, ", "))
|
|
|
|
return &baseClientImpl{
|
|
ctx: ctx,
|
|
httpClientProvider: httpClientProvider,
|
|
ds: ds,
|
|
version: ds.ESVersion,
|
|
timeField: ds.TimeField,
|
|
indices: indices,
|
|
timeRange: timeRange,
|
|
}, nil
|
|
}
|
|
|
|
type baseClientImpl struct {
|
|
ctx context.Context
|
|
httpClientProvider httpclient.Provider
|
|
ds *DatasourceInfo
|
|
version *semver.Version
|
|
timeField string
|
|
indices []string
|
|
timeRange backend.TimeRange
|
|
debugEnabled bool
|
|
}
|
|
|
|
func (c *baseClientImpl) GetVersion() *semver.Version {
|
|
return c.version
|
|
}
|
|
|
|
func (c *baseClientImpl) GetTimeField() string {
|
|
return c.timeField
|
|
}
|
|
|
|
func (c *baseClientImpl) GetMinInterval(queryInterval string) (time.Duration, error) {
|
|
timeInterval := c.ds.TimeInterval
|
|
return intervalv2.GetIntervalFrom(queryInterval, timeInterval, 0, 5*time.Second)
|
|
}
|
|
|
|
type multiRequest struct {
|
|
header map[string]interface{}
|
|
body interface{}
|
|
interval intervalv2.Interval
|
|
}
|
|
|
|
func (c *baseClientImpl) executeBatchRequest(uriPath, uriQuery string, requests []*multiRequest) (*response, error) {
|
|
bytes, err := c.encodeBatchRequests(requests)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return c.executeRequest(http.MethodPost, uriPath, uriQuery, bytes)
|
|
}
|
|
|
|
func (c *baseClientImpl) encodeBatchRequests(requests []*multiRequest) ([]byte, error) {
|
|
clientLog.Debug("Encoding batch requests to json", "batch requests", len(requests))
|
|
start := time.Now()
|
|
|
|
payload := bytes.Buffer{}
|
|
for _, r := range requests {
|
|
reqHeader, err := json.Marshal(r.header)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
payload.WriteString(string(reqHeader) + "\n")
|
|
|
|
reqBody, err := json.Marshal(r.body)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
body := string(reqBody)
|
|
body = strings.ReplaceAll(body, "$__interval_ms", strconv.FormatInt(r.interval.Milliseconds(), 10))
|
|
body = strings.ReplaceAll(body, "$__interval", r.interval.Text)
|
|
|
|
payload.WriteString(body + "\n")
|
|
}
|
|
|
|
elapsed := time.Since(start)
|
|
clientLog.Debug("Encoded batch requests to json", "took", elapsed)
|
|
|
|
return payload.Bytes(), nil
|
|
}
|
|
|
|
func (c *baseClientImpl) executeRequest(method, uriPath, uriQuery string, body []byte) (*response, error) {
|
|
u, err := url.Parse(c.ds.URL)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
u.Path = path.Join(u.Path, uriPath)
|
|
u.RawQuery = uriQuery
|
|
|
|
var req *http.Request
|
|
if method == http.MethodPost {
|
|
req, err = http.NewRequest(http.MethodPost, u.String(), bytes.NewBuffer(body))
|
|
} else {
|
|
req, err = http.NewRequest(http.MethodGet, u.String(), nil)
|
|
}
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
clientLog.Debug("Executing request", "url", req.URL.String(), "method", method)
|
|
|
|
var reqInfo *SearchRequestInfo
|
|
if c.debugEnabled {
|
|
reqInfo = &SearchRequestInfo{
|
|
Method: req.Method,
|
|
Url: req.URL.String(),
|
|
Data: string(body),
|
|
}
|
|
}
|
|
|
|
req.Header.Set("Content-Type", "application/x-ndjson")
|
|
|
|
httpClient, err := newDatasourceHttpClient(c.httpClientProvider, c.ds)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
start := time.Now()
|
|
defer func() {
|
|
elapsed := time.Since(start)
|
|
clientLog.Debug("Executed request", "took", elapsed)
|
|
}()
|
|
//nolint:bodyclose
|
|
resp, err := ctxhttp.Do(c.ctx, httpClient, req)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return &response{
|
|
httpResponse: resp,
|
|
reqInfo: reqInfo,
|
|
}, nil
|
|
}
|
|
|
|
func (c *baseClientImpl) ExecuteMultisearch(r *MultiSearchRequest) (*MultiSearchResponse, error) {
|
|
clientLog.Debug("Executing multisearch", "search requests", len(r.Requests))
|
|
|
|
multiRequests := c.createMultiSearchRequests(r.Requests)
|
|
queryParams := c.getMultiSearchQueryParameters()
|
|
clientRes, err := c.executeBatchRequest("_msearch", queryParams, multiRequests)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
res := clientRes.httpResponse
|
|
defer func() {
|
|
if err := res.Body.Close(); err != nil {
|
|
clientLog.Warn("Failed to close response body", "err", err)
|
|
}
|
|
}()
|
|
|
|
clientLog.Debug("Received multisearch response", "code", res.StatusCode, "status", res.Status, "content-length", res.ContentLength)
|
|
|
|
start := time.Now()
|
|
clientLog.Debug("Decoding multisearch json response")
|
|
|
|
var bodyBytes []byte
|
|
if c.debugEnabled {
|
|
tmpBytes, err := ioutil.ReadAll(res.Body)
|
|
if err != nil {
|
|
clientLog.Error("failed to read http response bytes", "error", err)
|
|
} else {
|
|
bodyBytes = make([]byte, len(tmpBytes))
|
|
copy(bodyBytes, tmpBytes)
|
|
res.Body = ioutil.NopCloser(bytes.NewBuffer(tmpBytes))
|
|
}
|
|
}
|
|
|
|
var msr MultiSearchResponse
|
|
dec := json.NewDecoder(res.Body)
|
|
err = dec.Decode(&msr)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
elapsed := time.Since(start)
|
|
clientLog.Debug("Decoded multisearch json response", "took", elapsed)
|
|
|
|
msr.Status = res.StatusCode
|
|
|
|
if c.debugEnabled {
|
|
bodyJSON, err := simplejson.NewFromReader(bytes.NewBuffer(bodyBytes))
|
|
var data *simplejson.Json
|
|
if err != nil {
|
|
clientLog.Error("failed to decode http response into json", "error", err)
|
|
} else {
|
|
data = bodyJSON
|
|
}
|
|
|
|
msr.DebugInfo = &SearchDebugInfo{
|
|
Request: clientRes.reqInfo,
|
|
Response: &SearchResponseInfo{
|
|
Status: res.StatusCode,
|
|
Data: data,
|
|
},
|
|
}
|
|
}
|
|
|
|
return &msr, nil
|
|
}
|
|
|
|
func (c *baseClientImpl) createMultiSearchRequests(searchRequests []*SearchRequest) []*multiRequest {
|
|
multiRequests := []*multiRequest{}
|
|
|
|
for _, searchReq := range searchRequests {
|
|
mr := multiRequest{
|
|
header: map[string]interface{}{
|
|
"search_type": "query_then_fetch",
|
|
"ignore_unavailable": true,
|
|
"index": strings.Join(c.indices, ","),
|
|
},
|
|
body: searchReq,
|
|
interval: searchReq.Interval,
|
|
}
|
|
|
|
if c.version.Major() < 5 {
|
|
mr.header["search_type"] = "count"
|
|
} else {
|
|
allowedVersionRange, _ := semver.NewConstraint(">=5.6.0, <7.0.0")
|
|
|
|
if allowedVersionRange.Check(c.version) {
|
|
maxConcurrentShardRequests := c.ds.MaxConcurrentShardRequests
|
|
if maxConcurrentShardRequests == 0 {
|
|
maxConcurrentShardRequests = 256
|
|
}
|
|
mr.header["max_concurrent_shard_requests"] = maxConcurrentShardRequests
|
|
}
|
|
}
|
|
|
|
multiRequests = append(multiRequests, &mr)
|
|
}
|
|
|
|
return multiRequests
|
|
}
|
|
|
|
func (c *baseClientImpl) getMultiSearchQueryParameters() string {
|
|
var qs []string
|
|
|
|
if c.version.Major() >= 7 {
|
|
maxConcurrentShardRequests := c.ds.MaxConcurrentShardRequests
|
|
if maxConcurrentShardRequests == 0 {
|
|
maxConcurrentShardRequests = 5
|
|
}
|
|
qs = append(qs, fmt.Sprintf("max_concurrent_shard_requests=%d", maxConcurrentShardRequests))
|
|
}
|
|
|
|
allowedFrozenIndicesVersionRange, _ := semver.NewConstraint(">=6.6.0")
|
|
|
|
if (allowedFrozenIndicesVersionRange.Check(c.version)) && c.ds.IncludeFrozen && c.ds.XPack {
|
|
qs = append(qs, "ignore_throttled=false")
|
|
}
|
|
|
|
return strings.Join(qs, "&")
|
|
}
|
|
|
|
func (c *baseClientImpl) MultiSearch() *MultiSearchRequestBuilder {
|
|
return NewMultiSearchRequestBuilder(c.GetVersion())
|
|
}
|
|
|
|
func (c *baseClientImpl) EnableDebug() {
|
|
c.debugEnabled = true
|
|
}
|