[dataframe] convert opentsdb response to dataframes (#35307)

* draft pr convert opentsdb response to dataframes

* Add test for parse response and fix go lint

* Add test case for create request

* Use go-cmp in test

* Remove comment

Co-authored-by: Ida Furjesova <ida.furjesova@grafana.com>
This commit is contained in:
ying-jeanne 2021-06-10 12:14:25 +02:00 committed by GitHub
parent 8b2ee06f3c
commit a07c53b671
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 83 additions and 14 deletions

View File

@ -6,6 +6,7 @@ import (
"path" "path"
"strconv" "strconv"
"strings" "strings"
"time"
"golang.org/x/net/context/ctxhttp" "golang.org/x/net/context/ctxhttp"
@ -14,7 +15,7 @@ import (
"net/http" "net/http"
"net/url" "net/url"
"github.com/grafana/grafana/pkg/components/null" "github.com/grafana/grafana-plugin-sdk-go/data"
"github.com/grafana/grafana/pkg/infra/httpclient" "github.com/grafana/grafana/pkg/infra/httpclient"
"github.com/grafana/grafana/pkg/infra/log" "github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/models" "github.com/grafana/grafana/pkg/models"
@ -130,32 +131,33 @@ func (e *OpenTsdbExecutor) parseResponse(query OpenTsdbQuery, res *http.Response
return nil, fmt.Errorf("request failed, status: %s", res.Status) return nil, fmt.Errorf("request failed, status: %s", res.Status)
} }
var data []OpenTsdbResponse var responseData []OpenTsdbResponse
err = json.Unmarshal(body, &data) err = json.Unmarshal(body, &responseData)
if err != nil { if err != nil {
plog.Info("Failed to unmarshal opentsdb response", "error", err, "status", res.Status, "body", string(body)) plog.Info("Failed to unmarshal opentsdb response", "error", err, "status", res.Status, "body", string(body))
return nil, err return nil, err
} }
for _, val := range data { frames := data.Frames{}
series := plugins.DataTimeSeries{ for _, val := range responseData {
Name: val.Metric, timeVector := make([]time.Time, 0, len(val.DataPoints))
} values := make([]float64, 0, len(val.DataPoints))
name := val.Metric
for timeString, value := range val.DataPoints { for timeString, value := range val.DataPoints {
timestamp, err := strconv.ParseFloat(timeString, 64) timestamp, err := strconv.ParseInt(timeString, 10, 64)
if err != nil { if err != nil {
plog.Info("Failed to unmarshal opentsdb timestamp", "timestamp", timeString) plog.Info("Failed to unmarshal opentsdb timestamp", "timestamp", timeString)
return nil, err return nil, err
} }
series.Points = append(series.Points, plugins.DataTimePoint{ timeVector = append(timeVector, time.Unix(timestamp, 0).UTC())
null.FloatFrom(value), null.FloatFrom(timestamp), values = append(values, value)
})
} }
frames = append(frames, data.NewFrame(name,
queryRes.Series = append(queryRes.Series, series) data.NewField("time", nil, timeVector),
data.NewField("value", nil, values)))
} }
queryRes.Dataframes = plugins.NewDecodedDataFrames(frames)
queryResults["A"] = queryRes queryResults["A"] = queryRes
return queryResults, nil return queryResults, nil
} }

View File

@ -1,16 +1,83 @@
package opentsdb package opentsdb
import ( import (
"io/ioutil"
"net/http"
"strings"
"testing" "testing"
"time"
"github.com/google/go-cmp/cmp"
"github.com/grafana/grafana-plugin-sdk-go/data"
"github.com/grafana/grafana/pkg/components/simplejson" "github.com/grafana/grafana/pkg/components/simplejson"
"github.com/grafana/grafana/pkg/models"
"github.com/grafana/grafana/pkg/plugins" "github.com/grafana/grafana/pkg/plugins"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
) )
func TestOpenTsdbExecutor(t *testing.T) { func TestOpenTsdbExecutor(t *testing.T) {
exec := &OpenTsdbExecutor{} exec := &OpenTsdbExecutor{}
t.Run("create request", func(t *testing.T) {
req, err := exec.createRequest(&models.DataSource{}, OpenTsdbQuery{})
require.NoError(t, err)
assert.Equal(t, "POST", req.Method)
body, err := ioutil.ReadAll(req.Body)
require.NoError(t, err)
testBody := "{\"start\":0,\"end\":0,\"queries\":null}"
assert.Equal(t, testBody, string(body))
})
t.Run("Parse response should handle invalid JSON", func(t *testing.T) {
response := `{ invalid }`
query := OpenTsdbQuery{}
result, err := exec.parseResponse(query, &http.Response{Body: ioutil.NopCloser(strings.NewReader(response))})
require.Nil(t, result["A"].Dataframes)
require.Error(t, err)
})
t.Run("Parse response should handle JSON", func(t *testing.T) {
response := `
[
{
"metric": "test",
"dps": {
"1405544146": 50.0
}
}
]`
testFrame := data.NewFrame("test",
data.NewField("time", nil, []time.Time{
time.Date(2014, 7, 16, 20, 55, 46, 0, time.UTC),
}),
data.NewField("value", nil, []float64{
50}),
)
query := OpenTsdbQuery{}
resp := http.Response{Body: ioutil.NopCloser(strings.NewReader(response))}
resp.StatusCode = 200
result, err := exec.parseResponse(query, &resp)
require.NoError(t, err)
decoded, err := result["A"].Dataframes.Decoded()
require.NoError(t, err)
require.Len(t, decoded, 1)
frame := decoded[0]
if diff := cmp.Diff(testFrame, frame, data.FrameTestCompareOptions()...); diff != "" {
t.Errorf("Result mismatch (-want +got):\n%s", diff)
}
})
t.Run("Build metric with downsampling enabled", func(t *testing.T) { t.Run("Build metric with downsampling enabled", func(t *testing.T) {
query := plugins.DataSubQuery{ query := plugins.DataSubQuery{
Model: simplejson.New(), Model: simplejson.New(),