mirror of
https://github.com/grafana/grafana.git
synced 2025-02-14 09:33:34 -06:00
130 lines
2.8 KiB
Go
130 lines
2.8 KiB
Go
package mqe
|
|
|
|
import (
|
|
"context"
|
|
"net/http"
|
|
"net/url"
|
|
"path"
|
|
"strings"
|
|
|
|
"github.com/grafana/grafana/pkg/components/simplejson"
|
|
"github.com/grafana/grafana/pkg/log"
|
|
"github.com/grafana/grafana/pkg/models"
|
|
"github.com/grafana/grafana/pkg/setting"
|
|
"github.com/grafana/grafana/pkg/tsdb"
|
|
|
|
"golang.org/x/net/context/ctxhttp"
|
|
)
|
|
|
|
var (
|
|
MaxWorker int = 4
|
|
)
|
|
|
|
type apiClient struct {
|
|
*models.DataSource
|
|
log log.Logger
|
|
httpClient *http.Client
|
|
responseParser *ResponseParser
|
|
}
|
|
|
|
func NewApiClient(httpClient *http.Client, datasource *models.DataSource) *apiClient {
|
|
return &apiClient{
|
|
DataSource: datasource,
|
|
log: log.New("tsdb.mqe"),
|
|
httpClient: httpClient,
|
|
responseParser: NewResponseParser(),
|
|
}
|
|
}
|
|
|
|
func (e *apiClient) PerformRequests(ctx context.Context, queries []QueryToSend) (*tsdb.QueryResult, error) {
|
|
queryResult := &tsdb.QueryResult{}
|
|
|
|
queryCount := len(queries)
|
|
jobsChan := make(chan QueryToSend, queryCount)
|
|
resultChan := make(chan []*tsdb.TimeSeries, queryCount)
|
|
errorsChan := make(chan error, 1)
|
|
for w := 1; w <= MaxWorker; w++ {
|
|
go e.spawnWorker(ctx, w, jobsChan, resultChan, errorsChan)
|
|
}
|
|
|
|
for _, v := range queries {
|
|
jobsChan <- v
|
|
}
|
|
close(jobsChan)
|
|
|
|
resultCounter := 0
|
|
for {
|
|
select {
|
|
case timeseries := <-resultChan:
|
|
queryResult.Series = append(queryResult.Series, timeseries...)
|
|
resultCounter++
|
|
|
|
if resultCounter == queryCount {
|
|
close(resultChan)
|
|
return queryResult, nil
|
|
}
|
|
case err := <-errorsChan:
|
|
return nil, err
|
|
case <-ctx.Done():
|
|
return nil, ctx.Err()
|
|
}
|
|
}
|
|
}
|
|
|
|
func (e *apiClient) spawnWorker(ctx context.Context, id int, jobs chan QueryToSend, results chan []*tsdb.TimeSeries, errors chan error) {
|
|
e.log.Debug("Spawning worker", "id", id)
|
|
for query := range jobs {
|
|
if setting.Env == setting.DEV {
|
|
e.log.Debug("Sending request", "query", query.RawQuery)
|
|
}
|
|
|
|
req, err := e.createRequest(query.RawQuery)
|
|
|
|
resp, err := ctxhttp.Do(ctx, e.httpClient, req)
|
|
if err != nil {
|
|
errors <- err
|
|
return
|
|
}
|
|
|
|
series, err := e.responseParser.Parse(resp, query)
|
|
if err != nil {
|
|
errors <- err
|
|
return
|
|
}
|
|
|
|
results <- series
|
|
}
|
|
e.log.Debug("Worker is complete", "id", id)
|
|
}
|
|
|
|
func (e *apiClient) createRequest(query string) (*http.Request, error) {
|
|
u, err := url.Parse(e.Url)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
u.Path = path.Join(u.Path, "query")
|
|
|
|
payload := simplejson.New()
|
|
payload.Set("query", query)
|
|
|
|
jsonPayload, err := payload.MarshalJSON()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
req, err := http.NewRequest(http.MethodPost, u.String(), strings.NewReader(string(jsonPayload)))
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
req.Header.Set("User-Agent", "Grafana")
|
|
req.Header.Set("Content-Type", "application/json")
|
|
|
|
if e.BasicAuth {
|
|
req.SetBasicAuth(e.BasicAuthUser, e.BasicAuthPassword)
|
|
}
|
|
|
|
return req, nil
|
|
}
|