Chore: Refactor loki to use SDK contracts (#37400)

* Refactor loki to use SDK contracts

* Register with service name

* Initialize interval calculator

* Return always created result

* Use go library instead of simplejson

* Update pkg/tsdb/loki/loki.go

Co-authored-by: Ivana Huckova <30407135+ivanahuckova@users.noreply.github.com>

* Remove newline

* Merge with main

Co-authored-by: Ivana Huckova <30407135+ivanahuckova@users.noreply.github.com>
This commit is contained in:
idafurjes
2021-08-05 10:25:31 +02:00
committed by GitHub
parent 1083bef030
commit 0563bc68fc
3 changed files with 223 additions and 129 deletions

View File

@@ -2,40 +2,39 @@ package loki
import (
"context"
"crypto/tls"
"encoding/json"
"fmt"
"net/http"
"regexp"
"strings"
"time"
"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana-plugin-sdk-go/backend/datasource"
"github.com/grafana/grafana-plugin-sdk-go/backend/instancemgmt"
"github.com/grafana/grafana-plugin-sdk-go/data"
"github.com/grafana/grafana/pkg/infra/httpclient"
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/models"
"github.com/grafana/grafana/pkg/plugins"
"github.com/grafana/grafana/pkg/tsdb/interval"
"github.com/grafana/grafana/pkg/plugins/backendplugin"
"github.com/grafana/grafana/pkg/plugins/backendplugin/coreplugin"
"github.com/grafana/grafana/pkg/registry"
"github.com/grafana/grafana/pkg/tsdb"
"github.com/grafana/loki/pkg/logcli/client"
"github.com/grafana/loki/pkg/loghttp"
"github.com/grafana/loki/pkg/logproto"
"github.com/opentracing/opentracing-go"
"github.com/prometheus/common/config"
"github.com/prometheus/common/model"
)
type LokiExecutor struct {
intervalCalculator interval.Calculator
httpClientProvider httpclient.Provider
}
type Service struct {
intervalCalculator tsdb.Calculator
im instancemgmt.InstanceManager
// nolint:staticcheck // plugins.DataPlugin deprecated
func New(httpClientProvider httpclient.Provider) func(dsInfo *models.DataSource) (plugins.DataPlugin, error) {
// nolint:staticcheck // plugins.DataPlugin deprecated
return func(dsInfo *models.DataSource) (plugins.DataPlugin, error) {
return &LokiExecutor{
intervalCalculator: interval.NewCalculator(interval.CalculatorOptions{MinInterval: time.Second * 1}),
httpClientProvider: httpClientProvider,
}, nil
}
HTTPClientProvider httpclient.Provider `inject:""`
BackendPluginManager backendplugin.Manager `inject:""`
}
var (
@@ -43,39 +42,103 @@ var (
legendFormat = regexp.MustCompile(`\{\{\s*(.+?)\s*\}\}`)
)
// DataQuery executes a Loki query.
//nolint: staticcheck // plugins.DataPlugin deprecated
func (e *LokiExecutor) DataQuery(ctx context.Context, dsInfo *models.DataSource,
queryContext plugins.DataQuery) (plugins.DataResponse, error) {
result := plugins.DataResponse{
Results: map[string]plugins.DataQueryResult{},
type datasourceInfo struct {
HTTPClient *http.Client
URL string
TLSClientConfig *tls.Config
BasicAuthUser string
BasicAuthPassword string
TimeInterval string `json:"timeInterval"`
}
type ResponseModel struct {
Expr string `json:"expr"`
LegendFormat string `json:"legendFormat"`
Interval string `json:"interval"`
IntervalMS int `json:"intervalMS"`
}
func init() {
registry.Register(&registry.Descriptor{
Name: "LokiService",
InitPriority: registry.Low,
Instance: &Service{},
})
}
func (s *Service) Init() error {
s.im = datasource.NewInstanceManager(newInstanceSettings(s.HTTPClientProvider))
s.intervalCalculator = tsdb.NewCalculator()
factory := coreplugin.New(backend.ServeOpts{
QueryDataHandler: s,
})
if err := s.BackendPluginManager.RegisterAndStart(context.Background(), "loki", factory); err != nil {
plog.Error("Failed to register plugin", "error", err)
}
tlsConfig, err := dsInfo.GetTLSConfig(e.httpClientProvider)
if err != nil {
return plugins.DataResponse{}, err
}
return nil
}
transport, err := dsInfo.GetHTTPTransport(e.httpClientProvider)
func newInstanceSettings(httpClientProvider httpclient.Provider) datasource.InstanceFactoryFunc {
return func(settings backend.DataSourceInstanceSettings) (instancemgmt.Instance, error) {
opts, err := settings.HTTPClientOptions()
if err != nil {
return nil, err
}
client, err := httpClientProvider.New(opts)
if err != nil {
return nil, err
}
tlsClientConfig, err := httpClientProvider.GetTLSConfig(opts)
if err != nil {
return nil, err
}
jsonData := datasourceInfo{}
err = json.Unmarshal(settings.JSONData, &jsonData)
if err != nil {
return nil, fmt.Errorf("error reading settings: %w", err)
}
model := &datasourceInfo{
HTTPClient: client,
URL: settings.URL,
TLSClientConfig: tlsClientConfig,
TimeInterval: jsonData.TimeInterval,
BasicAuthUser: settings.BasicAuthUser,
BasicAuthPassword: settings.DecryptedSecureJSONData["basicAuthPassword"],
}
return model, nil
}
}
func (s *Service) QueryData(ctx context.Context, req *backend.QueryDataRequest) (*backend.QueryDataResponse, error) {
result := backend.NewQueryDataResponse()
queryRes := backend.DataResponse{}
dsInfo, err := s.getDSInfo(req.PluginContext)
if err != nil {
return plugins.DataResponse{}, err
return result, err
}
client := &client.DefaultClient{
Address: dsInfo.Url,
Address: dsInfo.URL,
Username: dsInfo.BasicAuthUser,
Password: dsInfo.DecryptedBasicAuthPassword(),
Password: dsInfo.BasicAuthPassword,
TLSConfig: config.TLSConfig{
InsecureSkipVerify: tlsConfig.InsecureSkipVerify,
InsecureSkipVerify: dsInfo.TLSClientConfig.InsecureSkipVerify,
},
Tripperware: func(t http.RoundTripper) http.RoundTripper {
return transport
return dsInfo.HTTPClient.Transport
},
}
queries, err := e.parseQuery(dsInfo, queryContext)
queries, err := s.parseQuery(dsInfo, req)
if err != nil {
return plugins.DataResponse{}, err
return result, err
}
for _, query := range queries {
@@ -93,16 +156,16 @@ func (e *LokiExecutor) DataQuery(ctx context.Context, dsInfo *models.DataSource,
value, err := client.QueryRange(query.Expr, limit, query.Start, query.End, logproto.BACKWARD, query.Step, interval, false)
if err != nil {
return plugins.DataResponse{}, err
return result, err
}
queryResult, err := parseResponse(value, query)
frames, err := parseResponse(value, query)
if err != nil {
return plugins.DataResponse{}, err
return result, err
}
result.Results[query.RefID] = queryResult
queryRes.Frames = frames
result.Responses[query.RefID] = queryRes
}
return result, nil
}
@@ -125,59 +188,50 @@ func formatLegend(metric model.Metric, query *lokiQuery) string {
return string(result)
}
func (e *LokiExecutor) parseQuery(dsInfo *models.DataSource, queryContext plugins.DataQuery) ([]*lokiQuery, error) {
func (s *Service) parseQuery(dsInfo *datasourceInfo, queryContext *backend.QueryDataRequest) ([]*lokiQuery, error) {
qs := []*lokiQuery{}
for _, queryModel := range queryContext.Queries {
expr, err := queryModel.Model.Get("expr").String()
for _, query := range queryContext.Queries {
model := &ResponseModel{}
err := json.Unmarshal(query.JSON, model)
if err != nil {
return nil, fmt.Errorf("failed to parse Expr: %v", err)
return nil, err
}
format := queryModel.Model.Get("legendFormat").MustString("")
start := query.TimeRange.From
end := query.TimeRange.To
start, err := queryContext.TimeRange.ParseFrom()
if err != nil {
return nil, fmt.Errorf("failed to parse From: %v", err)
}
end, err := queryContext.TimeRange.ParseTo()
if err != nil {
return nil, fmt.Errorf("failed to parse To: %v", err)
}
dsInterval, err := interval.GetIntervalFrom(dsInfo, queryModel.Model, time.Second)
dsInterval, err := tsdb.GetIntervalFrom(dsInfo.TimeInterval, model.Interval, int64(model.IntervalMS), time.Second)
if err != nil {
return nil, fmt.Errorf("failed to parse Interval: %v", err)
}
interval, err := e.intervalCalculator.Calculate(*queryContext.TimeRange, dsInterval, "min")
interval, err := s.intervalCalculator.Calculate(query.TimeRange, dsInterval, tsdb.Min)
if err != nil {
return nil, err
}
step := time.Duration(int64(interval.Value))
qs = append(qs, &lokiQuery{
Expr: expr,
Expr: model.Expr,
Step: step,
LegendFormat: format,
LegendFormat: model.LegendFormat,
Start: start,
End: end,
RefID: queryModel.RefID,
RefID: query.RefID,
})
}
return qs, nil
}
//nolint: staticcheck // plugins.DataPlugin deprecated
func parseResponse(value *loghttp.QueryResponse, query *lokiQuery) (plugins.DataQueryResult, error) {
var queryRes plugins.DataQueryResult
func parseResponse(value *loghttp.QueryResponse, query *lokiQuery) (data.Frames, error) {
frames := data.Frames{}
//We are currently processing only matrix results (for alerting)
matrix, ok := value.Data.Result.(loghttp.Matrix)
if !ok {
return queryRes, fmt.Errorf("unsupported result format: %q", value.Data.ResultType)
return frames, fmt.Errorf("unsupported result format: %q", value.Data.ResultType)
}
for _, v := range matrix {
@@ -199,7 +253,20 @@ func parseResponse(value *loghttp.QueryResponse, query *lokiQuery) (plugins.Data
data.NewField("time", nil, timeVector),
data.NewField("value", tags, values).SetConfig(&data.FieldConfig{DisplayNameFromDS: name})))
}
queryRes.Dataframes = plugins.NewDecodedDataFrames(frames)
return queryRes, nil
return frames, nil
}
func (s *Service) getDSInfo(pluginCtx backend.PluginContext) (*datasourceInfo, error) {
i, err := s.im.Get(pluginCtx)
if err != nil {
return nil, err
}
instance, ok := i.(*datasourceInfo)
if !ok {
return nil, fmt.Errorf("failed to cast datsource info")
}
return instance, nil
}

View File

@@ -1,23 +1,20 @@
package loki
import (
"fmt"
"testing"
"time"
"github.com/grafana/grafana/pkg/components/simplejson"
"github.com/grafana/grafana/pkg/infra/httpclient"
"github.com/grafana/grafana/pkg/models"
"github.com/grafana/grafana/pkg/plugins"
"github.com/google/go-cmp/cmp"
"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana-plugin-sdk-go/data"
"github.com/grafana/grafana/pkg/tsdb"
"github.com/grafana/loki/pkg/loghttp"
p "github.com/prometheus/common/model"
"github.com/stretchr/testify/require"
)
func TestLoki(t *testing.T) {
dsInfo := &models.DataSource{
JsonData: simplejson.New(),
}
t.Run("converting metric name", func(t *testing.T) {
metric := map[p.LabelName]p.LabelValue{
p.LabelName("app"): p.LabelValue("backend"),
@@ -46,71 +43,89 @@ func TestLoki(t *testing.T) {
})
t.Run("parsing query model with step", func(t *testing.T) {
json := `{
"expr": "go_goroutines",
"format": "time_series",
"refId": "A"
}`
jsonModel, err := simplejson.NewJson([]byte(json))
require.NoError(t, err)
timeRange := plugins.NewDataTimeRange("12h", "now")
queryContext := plugins.DataQuery{
Queries: []plugins.DataSubQuery{
{Model: jsonModel},
queryContext := &backend.QueryDataRequest{
Queries: []backend.DataQuery{
{
JSON: []byte(`
{
"expr": "go_goroutines",
"format": "time_series",
"refId": "A"
}`,
),
TimeRange: backend.TimeRange{
From: time.Now().Add(-30 * time.Second),
To: time.Now(),
},
},
},
TimeRange: &timeRange,
}
exe, err := New(httpclient.NewProvider())(dsInfo)
require.NoError(t, err)
lokiExecutor := exe.(*LokiExecutor)
models, err := lokiExecutor.parseQuery(dsInfo, queryContext)
service := &Service{
intervalCalculator: mockCalculator{
interval: tsdb.Interval{
Value: time.Second * 30,
},
},
}
dsInfo := &datasourceInfo{}
models, err := service.parseQuery(dsInfo, queryContext)
require.NoError(t, err)
require.Equal(t, time.Second*30, models[0].Step)
})
t.Run("parsing query model without step parameter", func(t *testing.T) {
json := `{
"expr": "go_goroutines",
"format": "time_series",
"refId": "A"
}`
jsonModel, err := simplejson.NewJson([]byte(json))
require.NoError(t, err)
timeRange := plugins.NewDataTimeRange("48h", "now")
queryContext := plugins.DataQuery{
TimeRange: &timeRange,
Queries: []plugins.DataSubQuery{
{Model: jsonModel},
queryContext := &backend.QueryDataRequest{
Queries: []backend.DataQuery{
{
JSON: []byte(`
{
"expr": "go_goroutines",
"format": "time_series",
"refId": "A"
}`,
),
TimeRange: backend.TimeRange{
From: time.Now().Add(-48 * time.Hour),
To: time.Now(),
},
},
},
}
exe, err := New(httpclient.NewProvider())(dsInfo)
require.NoError(t, err)
lokiExecutor := exe.(*LokiExecutor)
models, err := lokiExecutor.parseQuery(dsInfo, queryContext)
service := &Service{
intervalCalculator: mockCalculator{
interval: tsdb.Interval{
Value: time.Minute * 2,
},
},
}
dsInfo := &datasourceInfo{}
models, err := service.parseQuery(dsInfo, queryContext)
require.NoError(t, err)
require.Equal(t, time.Minute*2, models[0].Step)
timeRange = plugins.NewDataTimeRange("1h", "now")
queryContext.TimeRange = &timeRange
models, err = lokiExecutor.parseQuery(dsInfo, queryContext)
service = &Service{
intervalCalculator: mockCalculator{
interval: tsdb.Interval{
Value: time.Second * 2,
},
},
}
models, err = service.parseQuery(dsInfo, queryContext)
require.NoError(t, err)
fmt.Println(models)
require.Equal(t, time.Second*2, models[0].Step)
})
}
func TestParseResponse(t *testing.T) {
t.Run("value is not of type matrix", func(t *testing.T) {
//nolint: staticcheck // plugins.DataPlugin deprecated
queryRes := plugins.DataQueryResult{}
queryRes := data.Frames{}
value := loghttp.QueryResponse{
Data: loghttp.QueryResponseData{
Result: loghttp.Vector{},
},
}
res, err := parseResponse(&value, nil)
require.Equal(t, queryRes, res)
require.Error(t, err)
})
@@ -137,22 +152,36 @@ func TestParseResponse(t *testing.T) {
query := &lokiQuery{
LegendFormat: "legend {{app}}",
}
res, err := parseResponse(&value, query)
frame, err := parseResponse(&value, query)
require.NoError(t, err)
decoded, _ := res.Dataframes.Decoded()
require.Len(t, decoded, 1)
require.Equal(t, decoded[0].Name, "legend Application")
require.Len(t, decoded[0].Fields, 2)
require.Len(t, decoded[0].Fields[0].Labels, 0)
require.Equal(t, decoded[0].Fields[0].Name, "time")
require.Len(t, decoded[0].Fields[1].Labels, 2)
require.Equal(t, decoded[0].Fields[1].Labels.String(), "app=Application, tag2=tag2")
require.Equal(t, decoded[0].Fields[1].Name, "value")
require.Equal(t, decoded[0].Fields[1].Config.DisplayNameFromDS, "legend Application")
labels, err := data.LabelsFromString("app=Application, tag2=tag2")
require.NoError(t, err)
field1 := data.NewField("time", nil, []time.Time{
time.Date(1970, 1, 1, 0, 0, 1, 0, time.UTC),
time.Date(1970, 1, 1, 0, 0, 2, 0, time.UTC),
time.Date(1970, 1, 1, 0, 0, 3, 0, time.UTC),
time.Date(1970, 1, 1, 0, 0, 4, 0, time.UTC),
time.Date(1970, 1, 1, 0, 0, 5, 0, time.UTC),
})
field2 := data.NewField("value", labels, []float64{1, 2, 3, 4, 5})
field2.SetConfig(&data.FieldConfig{DisplayNameFromDS: "legend Application"})
testFrame := data.NewFrame("legend Application", field1, field2)
// Ensure the timestamps are UTC zoned
testValue := decoded[0].Fields[0].At(0)
require.Equal(t, "UTC", testValue.(time.Time).Location().String())
if diff := cmp.Diff(testFrame, frame[0], data.FrameTestCompareOptions()...); diff != "" {
t.Errorf("Result mismatch (-want +got):\n%s", diff)
}
})
}
type mockCalculator struct {
interval tsdb.Interval
}
func (m mockCalculator) Calculate(timerange backend.TimeRange, minInterval time.Duration, intervalMode tsdb.IntervalMode) (tsdb.Interval, error) {
return m.interval, nil
}
func (m mockCalculator) CalculateSafeInterval(timerange backend.TimeRange, resolution int64) tsdb.Interval {
return m.interval
}

View File

@@ -13,7 +13,6 @@ import (
"github.com/grafana/grafana/pkg/setting"
"github.com/grafana/grafana/pkg/tsdb/azuremonitor"
"github.com/grafana/grafana/pkg/tsdb/cloudmonitoring"
"github.com/grafana/grafana/pkg/tsdb/loki"
"github.com/grafana/grafana/pkg/tsdb/mssql"
"github.com/grafana/grafana/pkg/tsdb/mysql"
"github.com/grafana/grafana/pkg/tsdb/postgres"
@@ -59,7 +58,6 @@ func (s *Service) Init() error {
s.registry["postgres"] = s.PostgresService.NewExecutor
s.registry["mysql"] = mysql.New(s.HTTPClientProvider)
s.registry["stackdriver"] = s.CloudMonitoringService.NewExecutor
s.registry["loki"] = loki.New(s.HTTPClientProvider)
s.registry["tempo"] = tempo.New(s.HTTPClientProvider)
return nil
}