diff --git a/pkg/tsdb/mqe/mqe.go b/pkg/tsdb/mqe/mqe.go index c3f10530416..aeee8c6f5b5 100644 --- a/pkg/tsdb/mqe/mqe.go +++ b/pkg/tsdb/mqe/mqe.go @@ -10,10 +10,14 @@ import ( type MQEExecutor struct { *tsdb.DataSourceInfo + QueryParser *MQEQueryParser } func NewMQEExecutor(dsInfo *tsdb.DataSourceInfo) tsdb.Executor { - return &MQEExecutor{dsInfo} + return &MQEExecutor{ + DataSourceInfo: dsInfo, + QueryParser: &MQEQueryParser{}, + } } var ( @@ -29,16 +33,40 @@ func init() { } func (e *MQEExecutor) Execute(ctx context.Context, queries tsdb.QuerySlice, context *tsdb.QueryContext) *tsdb.BatchResult { + result := &tsdb.BatchResult{} + + availableSeries, err := NewTokenClient().GetTokenData(ctx, e.DataSourceInfo) + if err != nil { + return result.WithError(err) + } - availableSeries, _ := NewTokenClient().GetTokenData(ctx, e.DataSourceInfo) glog.Info("available series", availableSeries) - //query, _ := &MQEQueryParser{}.Parse() + var mqeQueries []*MQEQuery + for _, v := range queries { + q, err := e.QueryParser.Parse(v.Model, e.DataSourceInfo) + if err != nil { + return result.WithError(err) + } + mqeQueries = append(mqeQueries, q) + } + var rawQueries []string + for _, v := range mqeQueries { + queries, err := v.Build(availableSeries.Metrics) + if err != nil { + return result.WithError(err) + } + rawQueries = append(rawQueries, queries...) + } - //fetch all available serienames - //expaned parsed model into multiple queries + for _, v := range rawQueries { + glog.Info("Mqe executor", "query", v) + //create request from v + //send request + //parse request + } - return &tsdb.BatchResult{} + return result } diff --git a/pkg/tsdb/mqe/response_parser.go b/pkg/tsdb/mqe/response_parser.go index 69b620a5790..06e9266c714 100644 --- a/pkg/tsdb/mqe/response_parser.go +++ b/pkg/tsdb/mqe/response_parser.go @@ -1,8 +1,16 @@ package mqe import ( + "encoding/json" + "io/ioutil" "net/http" + "time" + null "gopkg.in/guregu/null.v3" + + "fmt" + + "github.com/grafana/grafana/pkg/log" "github.com/grafana/grafana/pkg/tsdb" ) @@ -11,8 +19,81 @@ import ( // add app to alias // regular alias -type MQEResponseParser struct{} +func NewResponseParser() *MQEResponseParser { + return &MQEResponseParser{ + log: log.New("tsdb.mqe"), + } +} + +type MQEResponse struct { + Success bool `json:"success"` + Name string `json:"name"` + Body []MQEResponseSerie `json:"body"` +} + +type ResponseTimeRange struct { + Start int64 `json:"start"` + End int64 `json:"end"` + Resolution time.Duration `json:"Resolution"` +} + +type MQEResponseSerie struct { + Query string `json:"query"` + Name string `json:"name"` + Type string `json:"type"` + Series []MQESerie `json:"series"` + TimeRange ResponseTimeRange `json:"timerange"` +} + +type MQESerie struct { + Values []null.Float `json:"values"` + Tagset map[string]string `json:"tagset"` +} + +type MQEResponseParser struct { + log log.Logger +} func (parser *MQEResponseParser) Parse(res *http.Response) (*tsdb.QueryResult, error) { - return nil, nil + body, err := ioutil.ReadAll(res.Body) + defer res.Body.Close() + if err != nil { + return nil, err + } + + if res.StatusCode/100 != 2 { + parser.log.Error("Request failed", "status code", res.StatusCode, "body", string(body)) + return nil, fmt.Errorf("Returned invalid statuscode") + } + + var data *MQEResponse = &MQEResponse{} + err = json.Unmarshal(body, data) + if err != nil { + parser.log.Info("Failed to unmarshal graphite response", "error", err, "status", res.Status, "body", string(body)) + return nil, err + } + + if !data.Success { + return nil, fmt.Errorf("MQE request failed.") + } + + var series tsdb.TimeSeriesSlice + for _, v := range data.Body { + for _, k := range v.Series { + serie := &tsdb.TimeSeries{ + Name: v.Name, + } + + startTime := time.Unix(v.TimeRange.Start*1000, 0) + for i, l := range k.Values { + timestamp := startTime.Add(time.Duration(int64(v.TimeRange.Resolution) * int64(i))) + serie.Points = append(serie.Points, tsdb.NewTimePoint(l, float64(timestamp.UnixNano()/int64(time.Millisecond)))) + } + + series = append(series, serie) + } + + } + + return &tsdb.QueryResult{Series: series}, nil } diff --git a/pkg/tsdb/mqe/response_parser_test.go b/pkg/tsdb/mqe/response_parser_test.go index eabc437c8a1..85626a07df5 100644 --- a/pkg/tsdb/mqe/response_parser_test.go +++ b/pkg/tsdb/mqe/response_parser_test.go @@ -3,14 +3,11 @@ package mqe import ( "testing" - "time" - "net/http" "strings" "io/ioutil" - "github.com/grafana/grafana/pkg/components/simplejson" . "github.com/smartystreets/goconvey/convey" ) @@ -20,38 +17,21 @@ var ( func TestMQEResponseParser(t *testing.T) { Convey("MQE response parser", t, func() { - parser := &MQEResponseParser{} + parser := NewResponseParser() Convey("Can parse response", func() { response := &http.Response{ StatusCode: 200, Body: ioutil.NopCloser(strings.NewReader(dummieJson)), } - _, err := parser.Parse(response) + res, err := parser.Parse(response) So(err, ShouldBeNil) + So(len(res.Series), ShouldEqual, 2) + So(len(res.Series[0].Points), ShouldEqual, 11) }) }) } -type MQEResponse struct { - Success bool `json:"success"` - Name string `json:"name"` - Body []MQEResponseSerie `json:"body"` -} - -type ResponseTimeRange struct { - Start time.Time `json:"start"` - End time.Time `json:"end"` - Resolution time.Duration `json:"Resolution"` -} - -type MQEResponseSerie struct { - Query string `json:"query"` - Name string `json:"name"` - Type string `json:"type"` - Series []simplejson.Json `json:"series"` -} - func init() { dummieJson = `{ "success": true,