feat(mqe): add token cache

This commit is contained in:
bergquist
2016-12-13 13:47:37 +01:00
parent 69d6316512
commit 362162d6fa
14 changed files with 1537 additions and 53 deletions

View File

@@ -6,14 +6,14 @@ import (
"github.com/grafana/grafana/pkg/tsdb"
)
func NewQueryParser() *MQEQueryParser {
return &MQEQueryParser{}
func NewQueryParser() *QueryParser {
return &QueryParser{}
}
type MQEQueryParser struct{}
type QueryParser struct{}
func (qp *MQEQueryParser) Parse(model *simplejson.Json, dsInfo *models.DataSource, queryContext *tsdb.QueryContext) (*MQEQuery, error) {
query := &MQEQuery{TimeRange: queryContext.TimeRange}
func (qp *QueryParser) Parse(model *simplejson.Json, dsInfo *models.DataSource, queryContext *tsdb.QueryContext) (*Query, error) {
query := &Query{TimeRange: queryContext.TimeRange}
query.AddAppToAlias = model.Get("addAppToAlias").MustBool(false)
query.AddHostToAlias = model.Get("addHostToAlias").MustBool(false)
query.UseRawQuery = model.Get("rawQuery").MustBool(false)
@@ -22,11 +22,11 @@ func (qp *MQEQueryParser) Parse(model *simplejson.Json, dsInfo *models.DataSourc
query.Apps = model.Get("apps").MustStringArray([]string{})
query.Hosts = model.Get("hosts").MustStringArray([]string{})
var metrics []MQEMetric
var metrics []Metric
var err error
for _, metricsObj := range model.Get("metrics").MustArray() {
metricJson := simplejson.NewFromAny(metricsObj)
var m MQEMetric
var m Metric
m.Alias = metricJson.Get("alias").MustString("")
m.Metric, err = metricJson.Get("metric").String()

View File

@@ -11,7 +11,7 @@ import (
func TestMQEQueryParser(t *testing.T) {
Convey("MQE query parser", t, func() {
parser := &MQEQueryParser{}
parser := &QueryParser{}
dsInfo := &models.DataSource{JsonData: simplejson.New()}
queryContext := &tsdb.QueryContext{}

View File

@@ -12,19 +12,20 @@ import (
"github.com/grafana/grafana/pkg/components/simplejson"
"github.com/grafana/grafana/pkg/log"
"github.com/grafana/grafana/pkg/models"
"github.com/grafana/grafana/pkg/setting"
"github.com/grafana/grafana/pkg/tsdb"
)
/*
TODO:
* response serie names with wildcards
* real caching
* performance. outgoing requests in pararell.
* frontend plugin. targetContainsTemplates
*/
type MQEExecutor struct {
*models.DataSource
queryParser *MQEQueryParser
responseParser *MQEResponseParser
queryParser *QueryParser
responseParser *ResponseParser
httpClient *http.Client
log log.Logger
tokenClient *TokenClient
@@ -52,7 +53,7 @@ func init() {
type QueryToSend struct {
RawQuery string
QueryRef *MQEQuery
QueryRef *Query
}
func (e *MQEExecutor) Execute(ctx context.Context, queries tsdb.QuerySlice, queryContext *tsdb.QueryContext) *tsdb.BatchResult {
@@ -63,7 +64,7 @@ func (e *MQEExecutor) Execute(ctx context.Context, queries tsdb.QuerySlice, quer
return result.WithError(err)
}
var mqeQueries []*MQEQuery
var mqeQueries []*Query
for _, v := range queries {
q, err := e.queryParser.Parse(v.Model, e.DataSource, queryContext)
if err != nil {
@@ -82,9 +83,13 @@ func (e *MQEExecutor) Execute(ctx context.Context, queries tsdb.QuerySlice, quer
rawQueries = append(rawQueries, queries...)
}
e.log.Debug("Sending request", "url", e.DataSource.Url)
queryResult := &tsdb.QueryResult{}
for _, v := range rawQueries {
e.log.Info("Mqe executor", "query", v)
if setting.Env == setting.DEV {
e.log.Debug("Executing", "query", v)
}
req, err := e.createRequest(v.RawQuery)
@@ -108,7 +113,11 @@ func (e *MQEExecutor) Execute(ctx context.Context, queries tsdb.QuerySlice, quer
}
func (e *MQEExecutor) createRequest(query string) (*http.Request, error) {
u, _ := url.Parse(e.Url)
u, err := url.Parse(e.Url)
if err != nil {
return nil, err
}
u.Path = path.Join(u.Path, "query")
payload := simplejson.New()
@@ -131,6 +140,5 @@ func (e *MQEExecutor) createRequest(query string) (*http.Request, error) {
req.SetBasicAuth(e.BasicAuthUser, e.BasicAuthPassword)
}
e.log.Debug("Mqe request", "url", req.URL.String())
return req, nil
}

View File

@@ -13,8 +13,8 @@ import (
"github.com/grafana/grafana/pkg/tsdb"
)
func NewResponseParser() *MQEResponseParser {
return &MQEResponseParser{
func NewResponseParser() *ResponseParser {
return &ResponseParser{
log: log.New("tsdb.mqe"),
}
}
@@ -44,11 +44,11 @@ type MQESerie struct {
Tagset map[string]string `json:"tagset"`
}
type MQEResponseParser struct {
type ResponseParser struct {
log log.Logger
}
func (parser *MQEResponseParser) Parse(res *http.Response, queryRef *MQEQuery) (*tsdb.QueryResult, error) {
func (parser *ResponseParser) Parse(res *http.Response, queryRef *Query) (*tsdb.QueryResult, error) {
body, err := ioutil.ReadAll(res.Body)
defer res.Body.Close()
if err != nil {
@@ -63,12 +63,12 @@ func (parser *MQEResponseParser) Parse(res *http.Response, queryRef *MQEQuery) (
var data *MQEResponse = &MQEResponse{}
err = json.Unmarshal(body, data)
if err != nil {
parser.log.Info("Failed to unmarshal mqe response", "error", err, "status", res.Status, "body", string(body))
parser.log.Info("Failed to unmarshal response", "error", err, "status", res.Status, "body", string(body))
return nil, err
}
if !data.Success {
return nil, fmt.Errorf("MQE request failed.")
return nil, fmt.Errorf("Request failed.")
}
var series tsdb.TimeSeriesSlice

View File

@@ -20,7 +20,7 @@ func TestMQEResponseParser(t *testing.T) {
parser := NewResponseParser()
Convey("Can parse response", func() {
queryRef := &MQEQuery{
queryRef := &Query{
AddAppToAlias: true,
AddHostToAlias: true,
}

View File

@@ -8,15 +8,25 @@ import (
"net/http"
"net/url"
"path"
"time"
"golang.org/x/net/context/ctxhttp"
"strconv"
"github.com/grafana/grafana/pkg/log"
"github.com/grafana/grafana/pkg/models"
"github.com/patrickmn/go-cache"
)
var tokenCache *cache.Cache
func init() {
tokenCache = cache.New(5*time.Minute, 30*time.Second)
}
type TokenClient struct {
tlog log.Logger
log log.Logger
Datasource *models.DataSource
HttpClient *http.Client
}
@@ -25,27 +35,30 @@ func NewTokenClient(datasource *models.DataSource) *TokenClient {
httpClient, _ := datasource.GetHttpClient()
return &TokenClient{
tlog: log.New("tsdb.mqe.tokenclient"),
log: log.New("tsdb.mqe.tokenclient"),
Datasource: datasource,
HttpClient: httpClient,
}
}
var cache map[int64]*TokenBody = map[int64]*TokenBody{}
//Replace this stupid cache with internal cache from grafana master before merging
func (client *TokenClient) GetTokenData(ctx context.Context) (*TokenBody, error) {
_, excist := cache[client.Datasource.Id]
if !excist {
b, err := client.RequestTokenData(ctx)
if err != nil {
return nil, err
}
key := strconv.FormatInt(client.Datasource.Id, 10)
cache[client.Datasource.Id] = b
item, found := tokenCache.Get(key)
if found {
if result, ok := item.(*TokenBody); ok {
return result, nil
}
}
return cache[client.Datasource.Id], nil
b, err := client.RequestTokenData(ctx)
if err != nil {
return nil, err
}
tokenCache.Set(key, b, cache.DefaultExpiration)
return b, nil
}
func (client *TokenClient) RequestTokenData(ctx context.Context) (*TokenBody, error) {
@@ -54,7 +67,7 @@ func (client *TokenClient) RequestTokenData(ctx context.Context) (*TokenBody, er
req, err := http.NewRequest(http.MethodGet, u.String(), nil)
if err != nil {
client.tlog.Info("Failed to create request", "error", err)
client.log.Info("Failed to create request", "error", err)
}
res, err := ctxhttp.Do(ctx, client.HttpClient, req)
@@ -69,14 +82,14 @@ func (client *TokenClient) RequestTokenData(ctx context.Context) (*TokenBody, er
}
if res.StatusCode/100 != 2 {
client.tlog.Info("Request failed", "status", res.Status, "body", string(body))
client.log.Info("Request failed", "status", res.Status, "body", string(body))
return nil, fmt.Errorf("Request failed status: %v", res.Status)
}
var result *TokenResponse
err = json.Unmarshal(body, &result)
if err != nil {
client.tlog.Info("Failed to unmarshal graphite response", "error", err, "status", res.Status, "body", string(body))
client.log.Info("Failed to unmarshal response", "error", err, "status", res.Status, "body", string(body))
return nil, err
}

View File

@@ -11,13 +11,13 @@ import (
"github.com/grafana/grafana/pkg/tsdb"
)
type MQEMetric struct {
type Metric struct {
Metric string
Alias string
}
type MQEQuery struct {
Metrics []MQEMetric
type Query struct {
Metrics []Metric
Hosts []string
Apps []string
AddAppToAlias bool
@@ -32,7 +32,7 @@ var (
containsWildcardPattern *regexp.Regexp = regexp.MustCompile(`\*`)
)
func (q *MQEQuery) Build(availableSeries []string) ([]QueryToSend, error) {
func (q *Query) Build(availableSeries []string) ([]QueryToSend, error) {
var queriesToSend []QueryToSend
where := q.buildWhereClause()
@@ -90,7 +90,7 @@ func (q *MQEQuery) Build(availableSeries []string) ([]QueryToSend, error) {
return queriesToSend, nil
}
func (q *MQEQuery) buildWhereClause() string {
func (q *Query) buildWhereClause() string {
hasApps := len(q.Apps) > 0
hasHosts := len(q.Hosts) > 0

View File

@@ -25,11 +25,11 @@ func TestWildcardExpansion(t *testing.T) {
Convey("Can expanding query", t, func() {
Convey("Without wildcard series", func() {
query := &MQEQuery{
Metrics: []MQEMetric{
MQEMetric{Metric: "os.cpu.3.idle", Alias: ""},
MQEMetric{Metric: "os.cpu.2.idle", Alias: ""},
MQEMetric{Metric: "os.cpu.1.idle", Alias: "cpu"},
query := &Query{
Metrics: []Metric{
Metric{Metric: "os.cpu.3.idle", Alias: ""},
Metric{Metric: "os.cpu.2.idle", Alias: ""},
Metric{Metric: "os.cpu.1.idle", Alias: "cpu"},
},
Hosts: []string{"staples-lab-1", "staples-lab-2"},
Apps: []string{"demoapp-1", "demoapp-2"},
@@ -47,9 +47,9 @@ func TestWildcardExpansion(t *testing.T) {
})
Convey("Containg wildcard series", func() {
query := &MQEQuery{
Metrics: []MQEMetric{
MQEMetric{Metric: "os.cpu*", Alias: ""},
query := &Query{
Metrics: []Metric{
Metric{Metric: "os.cpu*", Alias: ""},
},
Hosts: []string{"staples-lab-1"},
AddAppToAlias: false,