Loki: Add support for alerting (#31424)

* Create Loki backend template

* Update endpoint

* Adjust step

* Add test

* Change to use Loki client

* Address feedback, improve errors and comments

* Linting

* Update pkg/tsdb/loki/loki.go

* Update pkg/tsdb/loki/loki.go

* Update pkg/tsdb/loki/loki.go
This commit is contained in:
Ivana Huckova 2021-03-02 14:55:16 +01:00 committed by GitHub
parent c113d3ce72
commit d5fee5a3f5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 274 additions and 1 deletions

View File

@ -27,6 +27,7 @@ import (
_ "github.com/grafana/grafana/pkg/tsdb/elasticsearch"
_ "github.com/grafana/grafana/pkg/tsdb/graphite"
_ "github.com/grafana/grafana/pkg/tsdb/influxdb"
_ "github.com/grafana/grafana/pkg/tsdb/loki"
_ "github.com/grafana/grafana/pkg/tsdb/mysql"
_ "github.com/grafana/grafana/pkg/tsdb/opentsdb"
_ "github.com/grafana/grafana/pkg/tsdb/postgres"

173
pkg/tsdb/loki/loki.go Normal file
View File

@ -0,0 +1,173 @@
package loki
import (
"context"
"fmt"
"regexp"
"strings"
"time"
"github.com/grafana/grafana/pkg/components/null"
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/models"
"github.com/grafana/grafana/pkg/tsdb"
"github.com/grafana/loki/pkg/logcli/client"
"github.com/grafana/loki/pkg/loghttp"
"github.com/grafana/loki/pkg/logproto"
"github.com/opentracing/opentracing-go"
"github.com/prometheus/common/model"
)
type LokiExecutor struct{}
func NewLokiExecutor(dsInfo *models.DataSource) (tsdb.TsdbQueryEndpoint, error) {
return &LokiExecutor{}, nil
}
var (
plog log.Logger
legendFormat *regexp.Regexp
intervalCalculator tsdb.IntervalCalculator
)
func init() {
plog = log.New("tsdb.loki")
tsdb.RegisterTsdbQueryEndpoint("loki", NewLokiExecutor)
legendFormat = regexp.MustCompile(`\{\{\s*(.+?)\s*\}\}`)
intervalCalculator = tsdb.NewIntervalCalculator(&tsdb.IntervalOptions{MinInterval: time.Second * 1})
}
func (e *LokiExecutor) Query(ctx context.Context, dsInfo *models.DataSource, tsdbQuery *tsdb.TsdbQuery) (*tsdb.Response, error) {
result := &tsdb.Response{
Results: map[string]*tsdb.QueryResult{},
}
client := &client.DefaultClient{
Address: dsInfo.Url,
Username: dsInfo.BasicAuthUser,
Password: dsInfo.DecryptedBasicAuthPassword(),
}
queries, err := parseQuery(dsInfo, tsdbQuery.Queries, tsdbQuery)
if err != nil {
return nil, err
}
for _, query := range queries {
plog.Debug("Sending query", "start", query.Start, "end", query.End, "step", query.Step, "query", query.Expr)
span, _ := opentracing.StartSpanFromContext(ctx, "alerting.loki")
span.SetTag("expr", query.Expr)
span.SetTag("start_unixnano", query.Start.UnixNano())
span.SetTag("stop_unixnano", query.End.UnixNano())
defer span.Finish()
//Currently hard coded as not used - applies to log queries
limit := 1000
//Currently hard coded as not used - applies to queries which produce a stream response
interval := time.Second * 1
value, err := client.QueryRange(query.Expr, limit, query.Start, query.End, logproto.BACKWARD, query.Step, interval, false)
if err != nil {
return nil, err
}
queryResult, err := parseResponse(value, query)
if err != nil {
return nil, err
}
result.Results[query.RefId] = queryResult
}
return result, nil
}
//If legend (using of name or pattern instead of time series name) is used, use that name/pattern for formatting
func formatLegend(metric model.Metric, query *LokiQuery) string {
if query.LegendFormat == "" {
return metric.String()
}
result := legendFormat.ReplaceAllFunc([]byte(query.LegendFormat), func(in []byte) []byte {
labelName := strings.Replace(string(in), "{{", "", 1)
labelName = strings.Replace(labelName, "}}", "", 1)
labelName = strings.TrimSpace(labelName)
if val, exists := metric[model.LabelName(labelName)]; exists {
return []byte(val)
}
return []byte{}
})
return string(result)
}
func parseQuery(dsInfo *models.DataSource, queries []*tsdb.Query, queryContext *tsdb.TsdbQuery) ([]*LokiQuery, error) {
qs := []*LokiQuery{}
for _, queryModel := range queries {
expr, err := queryModel.Model.Get("expr").String()
if err != nil {
return nil, fmt.Errorf("failed to parse Expr: %v", err)
}
format := queryModel.Model.Get("legendFormat").MustString("")
start, err := queryContext.TimeRange.ParseFrom()
if err != nil {
return nil, fmt.Errorf("failed to parse From: %v", err)
}
end, err := queryContext.TimeRange.ParseTo()
if err != nil {
return nil, fmt.Errorf("failed to parse To: %v", err)
}
dsInterval, err := tsdb.GetIntervalFrom(dsInfo, queryModel.Model, time.Second)
if err != nil {
return nil, fmt.Errorf("failed to parse Interval: %v", err)
}
interval := intervalCalculator.Calculate(queryContext.TimeRange, dsInterval)
step := time.Duration(int64(interval.Value))
qs = append(qs, &LokiQuery{
Expr: expr,
Step: step,
LegendFormat: format,
Start: start,
End: end,
RefId: queryModel.RefId,
})
}
return qs, nil
}
func parseResponse(value *loghttp.QueryResponse, query *LokiQuery) (*tsdb.QueryResult, error) {
queryRes := tsdb.NewQueryResult()
//We are currently processing only matrix results (for alerting)
data, ok := value.Data.Result.(loghttp.Matrix)
if !ok {
return queryRes, fmt.Errorf("unsupported result format: %q", value.Data.ResultType)
}
for _, v := range data {
series := tsdb.TimeSeries{
Name: formatLegend(v.Metric, query),
Tags: make(map[string]string, len(v.Metric)),
Points: make([]tsdb.TimePoint, 0, len(v.Values)),
}
for k, v := range v.Metric {
series.Tags[string(k)] = string(v)
}
for _, k := range v.Values {
series.Points = append(series.Points, tsdb.NewTimePoint(null.FloatFrom(float64(k.Value)), float64(k.Timestamp.Unix()*1000)))
}
queryRes.Series = append(queryRes.Series, &series)
}
return queryRes, nil
}

View File

@ -0,0 +1,87 @@
package loki
import (
"testing"
"time"
"github.com/grafana/grafana/pkg/components/simplejson"
"github.com/grafana/grafana/pkg/models"
"github.com/grafana/grafana/pkg/tsdb"
p "github.com/prometheus/common/model"
"github.com/stretchr/testify/require"
)
func TestLoki(t *testing.T) {
dsInfo := &models.DataSource{
JsonData: simplejson.New(),
}
t.Run("converting metric name", func(t *testing.T) {
metric := map[p.LabelName]p.LabelValue{
p.LabelName("app"): p.LabelValue("backend"),
p.LabelName("device"): p.LabelValue("mobile"),
}
query := &LokiQuery{
LegendFormat: "legend {{app}} {{ device }} {{broken}}",
}
require.Equal(t, "legend backend mobile ", formatLegend(metric, query))
})
t.Run("build full series name", func(t *testing.T) {
metric := map[p.LabelName]p.LabelValue{
p.LabelName(p.MetricNameLabel): p.LabelValue("http_request_total"),
p.LabelName("app"): p.LabelValue("backend"),
p.LabelName("device"): p.LabelValue("mobile"),
}
query := &LokiQuery{
LegendFormat: "",
}
require.Equal(t, `http_request_total{app="backend", device="mobile"}`, formatLegend(metric, query))
})
t.Run("parsing query model with step", func(t *testing.T) {
json := `{
"expr": "go_goroutines",
"format": "time_series",
"refId": "A"
}`
jsonModel, _ := simplejson.NewJson([]byte(json))
queryContext := &tsdb.TsdbQuery{}
queryModels := []*tsdb.Query{
{Model: jsonModel},
}
queryContext.TimeRange = tsdb.NewTimeRange("12h", "now")
models, err := parseQuery(dsInfo, queryModels, queryContext)
require.NoError(t, err)
require.Equal(t, time.Second*30, models[0].Step)
})
t.Run("parsing query model without step parameter", func(t *testing.T) {
json := `{
"expr": "go_goroutines",
"format": "time_series",
"refId": "A"
}`
jsonModel, _ := simplejson.NewJson([]byte(json))
queryContext := &tsdb.TsdbQuery{}
queryModels := []*tsdb.Query{
{Model: jsonModel},
}
queryContext.TimeRange = tsdb.NewTimeRange("48h", "now")
models, err := parseQuery(dsInfo, queryModels, queryContext)
require.NoError(t, err)
require.Equal(t, time.Minute*2, models[0].Step)
queryContext.TimeRange = tsdb.NewTimeRange("1h", "now")
models, err = parseQuery(dsInfo, queryModels, queryContext)
require.NoError(t, err)
require.Equal(t, time.Second*2, models[0].Step)
})
}

12
pkg/tsdb/loki/types.go Normal file
View File

@ -0,0 +1,12 @@
package loki
import "time"
type LokiQuery struct {
Expr string
Step time.Duration
LegendFormat string
Start time.Time
End time.Time
RefId string
}

View File

@ -6,7 +6,7 @@
"logs": true,
"metrics": true,
"alerting": false,
"alerting": true,
"annotations": true,
"streaming": true,