From 06f2047ced1a301027a26258bbe53ba095ac4c25 Mon Sep 17 00:00:00 2001 From: utkarshcmu Date: Fri, 7 Oct 2016 23:58:27 -0700 Subject: [PATCH] OpenTsdb alerting works with metric query --- pkg/cmd/grafana-server/main.go | 1 + pkg/tsdb/opentsdb/opentsdb.go | 134 +++++++++++++++++++++++++++++++-- pkg/tsdb/opentsdb/types.go | 17 +++++ 3 files changed, 146 insertions(+), 6 deletions(-) create mode 100644 pkg/tsdb/opentsdb/types.go diff --git a/pkg/cmd/grafana-server/main.go b/pkg/cmd/grafana-server/main.go index caf9d2cb56f..263c7ae569c 100644 --- a/pkg/cmd/grafana-server/main.go +++ b/pkg/cmd/grafana-server/main.go @@ -22,6 +22,7 @@ import ( _ "github.com/grafana/grafana/pkg/tsdb/graphite" _ "github.com/grafana/grafana/pkg/tsdb/influxdb" _ "github.com/grafana/grafana/pkg/tsdb/prometheus" + _ "github.com/grafana/grafana/pkg/tsdb/opentsdb" _ "github.com/grafana/grafana/pkg/tsdb/testdata" ) diff --git a/pkg/tsdb/opentsdb/opentsdb.go b/pkg/tsdb/opentsdb/opentsdb.go index 57134657603..f8b51204ade 100644 --- a/pkg/tsdb/opentsdb/opentsdb.go +++ b/pkg/tsdb/opentsdb/opentsdb.go @@ -1,17 +1,30 @@ package opentsdb import ( - "net/http" + "fmt" + "path" + "strings" + "context" + "strconv" + + "net/url" + "net/http" + "io/ioutil" + //"net/http/httputil" + "encoding/json" + + "gopkg.in/guregu/null.v3" "github.com/grafana/grafana/pkg/log" - "github.com/grafana/grafana/pkg/tsdb" + "github.com/grafana/grafana/pkg/tsdb" + "github.com/grafana/grafana/pkg/setting" ) type OpenTsdbExecutor struct { *tsdb.DataSourceInfo } -func NewOpenTsdbExecutorExecutor(dsInfo *tsdb.DataSourceInfo) tsdb.Executor { +func NewOpenTsdbExecutor(dsInfo *tsdb.DataSourceInfo) tsdb.Executor { return &OpenTsdbExecutor{dsInfo} } @@ -22,9 +35,118 @@ var ( func init() { plog = log.New("tsdb.opentsdb") - tsdb.RegisterExecutor("opentsdb", NewOpenTsdbExecutorExecutor) + tsdb.RegisterExecutor("opentsdb", NewOpenTsdbExecutor) } -func (e *OpenTsdbExecutor) Execute(queries tsdb.QuerySlice, queryContext *tsdb.QueryContext) *tsdb.BatchResult { - panic("Missing implementation") +func (e *OpenTsdbExecutor) Execute(ctx context.Context, queries tsdb.QuerySlice, queryContext *tsdb.QueryContext) *tsdb.BatchResult { + result := &tsdb.BatchResult{} + + var tsdbQuery OpenTsdbQuery + + tsdbQuery.Start = queryContext.TimeRange.GetFromAsMsEpoch() + tsdbQuery.End = queryContext.TimeRange.GetToAsMsEpoch() + + for _, query := range queries { + tsdbQuery.Queries = []OpenTsdbMetric { + OpenTsdbMetric{ + Metric: query.Model.Get("metric").MustString(), + Aggregator: query.Model.Get("aggregator").MustString(), + }, + } + } + + if setting.Env == setting.DEV { + plog.Debug("OpenTsdb request", "params", tsdbQuery) + } + + req, err := e.createRequest(tsdbQuery) + if err != nil { + result.Error = err + return result + } + + res, err := HttpClient.Do(req) + if err != nil { + result.Error = err + return result + } + + queryResult, err := e.parseResponse(tsdbQuery, res) + if err != nil { + return result.WithError(err) + } + + result.QueryResults = queryResult + return result } + +func (e *OpenTsdbExecutor) createRequest(data OpenTsdbQuery) (*http.Request, error) { + u, _ := url.Parse(e.Url) + u.Path = path.Join(u.Path, "api/query") + + postData, err := json.Marshal(data) + + req, err := http.NewRequest(http.MethodPost, u.String(), strings.NewReader(string(postData))) + if err != nil { + plog.Info("Failed to create request", "error", err) + return nil, fmt.Errorf("Failed to create request. error: %v", err) + } + + req.Header.Set("Content-Type", "application/json") + if e.BasicAuth { + req.SetBasicAuth(e.BasicAuthUser, e.BasicAuthPassword) + } + + /* + requestDump, err := httputil.DumpRequest(req, true) + if err != nil { + fmt.Println(err) + } + fmt.Println(string(requestDump)) + */ + return req, err +} + +func (e *OpenTsdbExecutor) parseResponse(query OpenTsdbQuery, res *http.Response) (map[string]*tsdb.QueryResult, error) { + + queryResults := make(map[string]*tsdb.QueryResult) + queryRes := tsdb.NewQueryResult() + + body, err := ioutil.ReadAll(res.Body) + defer res.Body.Close() + if err != nil { + return nil, err + } + + if res.StatusCode/100 != 2 { + plog.Info("Request failed", "status", res.Status, "body", string(body)) + return nil, fmt.Errorf("Request failed status: %v", res.Status) + } + + var data []OpenTsdbResponse + err = json.Unmarshal(body, &data) + if err != nil { + plog.Info("Failed to unmarshal opentsdb response", "error", err, "status", res.Status, "body", string(body)) + return nil, err + } + + for _, val := range data { + series := tsdb.TimeSeries{ + Name: val.Metric, + } + + for timeString, value := range val.DataPoints { + timestamp, err := strconv.ParseFloat(timeString, 64) + if err != nil { + plog.Info("Failed to unmarshal opentsdb timestamp", "timestamp", timeString) + return nil, err + } + series.Points = append(series.Points, tsdb.NewTimePoint(null.FloatFrom(value), timestamp)) + } + + queryRes.Series = append(queryRes.Series, &series) + } + + queryResults["A"] = queryRes + return queryResults, nil +} \ No newline at end of file diff --git a/pkg/tsdb/opentsdb/types.go b/pkg/tsdb/opentsdb/types.go new file mode 100644 index 00000000000..473c797b2d4 --- /dev/null +++ b/pkg/tsdb/opentsdb/types.go @@ -0,0 +1,17 @@ +package opentsdb + +type OpenTsdbQuery struct { + Start int64 `json:"start"` + End int64 `json:"end"` + Queries []OpenTsdbMetric `json:"queries"` +} + +type OpenTsdbMetric struct { + Metric string `json:"metric"` + Aggregator string `json:"aggregator"` +} + +type OpenTsdbResponse struct { + Metric string `json:"metric"` + DataPoints map[string]float64 `json:"dps"` +} \ No newline at end of file