Alerting: implement loki query for alert state history (#61992)

* Alerting: implement loki query for alert state history

* extract selector building

* add unit tests for selector creation

* backup

* give selectors their own type

* build dataframe

* add some tests

* small changes after manual testing

* use struct client

* golint

* more golint

* Make RuleUID optional for Loki implementation

* Drop initial assumption that we only have one series

* Pare down to three columns, fix timestamp overflows, improve failure cases in loki responses

* Embed structured log lines in the dataframe as objects rather than JSON strings

* Include state history label filter

* Remove dead code

---------

Co-authored-by: Alex Weaver <weaver.alex.d@gmail.com>
Jean-Philippe Quéméner 2023-02-02 23:31:51 +01:00 committed by GitHub
parent f49efa6e27
commit 1694a35e0c
4 changed files with 443 additions and 2 deletions


@@ -4,6 +4,9 @@ import (
"context"
"encoding/json"
"fmt"
"math"
"strconv"
"time"
"github.com/grafana/grafana-plugin-sdk-go/data"
@@ -20,6 +23,10 @@ const (
RuleUIDLabel = "ruleUID"
GroupLabel = "group"
FolderUIDLabel = "folderUID"
// Names of the columns used in the dataframe.
dfTime = "time"
dfLine = "line"
dfLabels = "labels"
)
const (
@@ -30,6 +37,7 @@ const (
type remoteLokiClient interface {
ping(context.Context) error
push(context.Context, []stream) error
query(ctx context.Context, selectors []Selector, start, end int64) (QueryRes, error)
}
type RemoteLokiBackend struct {
@@ -66,7 +74,140 @@ func (h *RemoteLokiBackend) RecordStatesAsync(ctx context.Context, rule history_
}
func (h *RemoteLokiBackend) QueryStates(ctx context.Context, query models.HistoryQuery) (*data.Frame, error) {
return data.NewFrame("states"), nil
selectors, err := buildSelectors(query)
if err != nil {
return nil, fmt.Errorf("failed to build the provided selectors: %w", err)
}
// Timestamps are passed to Loki as Unix-epoch nanoseconds.
res, err := h.client.query(ctx, selectors, query.From.UnixNano(), query.To.UnixNano())
if err != nil {
return nil, err
}
return merge(res, query.RuleUID)
}
func buildSelectors(query models.HistoryQuery) ([]Selector, error) {
// +2 as OrgID and the state history label will always be selectors at the API level.
selectors := make([]Selector, len(query.Labels)+2)
// Set the predefined selector orgID.
selector, err := NewSelector(OrgIDLabel, "=", fmt.Sprintf("%d", query.OrgID))
if err != nil {
return nil, err
}
selectors[0] = selector
// Set the predefined selector for the state history label.
selector, err = NewSelector(StateHistoryLabelKey, "=", StateHistoryLabelValue)
if err != nil {
return nil, err
}
selectors[1] = selector
// Set the label selectors
i := 2
for label, val := range query.Labels {
selector, err = NewSelector(label, "=", val)
if err != nil {
return nil, err
}
selectors[i] = selector
i++
}
// Set the optional special selector for the rule UID.
if query.RuleUID != "" {
rsel, err := NewSelector(RuleUIDLabel, "=", query.RuleUID)
if err != nil {
return nil, err
}
selectors = append(selectors, rsel)
}
return selectors, nil
}
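For illustration, a minimal sketch (not part of the diff) of what buildSelectors produces for a typical history query. The HistoryQuery fields are the ones referenced above; the concrete values of OrgIDLabel and the state-history marker label are assumptions here.

func exampleBuildSelectors() error {
	// Hypothetical query; assumes OrgIDLabel resolves to "orgID" and the
	// state-history marker label to from="state-history".
	q := models.HistoryQuery{
		OrgID:   1,
		RuleUID: "my-rule-uid",
		Labels:  map[string]string{"team": "sre"},
	}
	selectors, err := buildSelectors(q)
	if err != nil {
		return err
	}
	// selectorString(selectors) renders the LogQL stream selector, e.g.:
	//   {orgID="1",from="state-history",team="sre",ruleUID="my-rule-uid"}
	fmt.Println(selectorString(selectors))
	return nil
}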
// merge collates the values from all returned streams into a single frame, sorted by timestamp.
func merge(res QueryRes, ruleUID string) (*data.Frame, error) {
// Find the total number of elements in all arrays.
totalLen := 0
for _, arr := range res.Data.Result {
totalLen += len(arr.Values)
}
// Create a new frame to store the merged result.
frame := data.NewFrame("states")
// We merge all series into a single linear history.
lbls := data.Labels(map[string]string{})
// We represent state history as a single merged history, which roughly corresponds to what you get in the Grafana Explore tab when querying Loki directly.
// The format is composed of the following vectors:
// 1. `time` - timestamp - when the transition happened
// 2. `line` - JSON - the full data of the transition
// 3. `labels` - JSON - the labels associated with that state transition
times := make([]time.Time, 0, totalLen)
lines := make([]json.RawMessage, 0, totalLen)
labels := make([]json.RawMessage, 0, totalLen)
// Initialize a slice of pointers to the current position in each array.
pointers := make([]int, len(res.Data.Result))
for {
minTime := int64(math.MaxInt64)
minEl := [2]string{}
minElStreamIdx := -1
// Find the element with the earliest time among all arrays.
for i, stream := range res.Data.Result {
// Skip if we already reached the end of the current array.
if len(stream.Values) == pointers[i] {
continue
}
curTime, err := strconv.ParseInt(stream.Values[pointers[i]][0], 10, 64)
if err != nil {
return nil, fmt.Errorf("failed to parse timestamp from loki response: %w", err)
}
if pointers[i] < len(stream.Values) && curTime < minTime {
minTime = curTime
minEl = stream.Values[pointers[i]]
minElStreamIdx = i
}
}
// If all pointers have reached the end of their arrays, we're done.
if minElStreamIdx == -1 {
break
}
var entry lokiEntry
err := json.Unmarshal([]byte(minEl[1]), &entry)
if err != nil {
return nil, fmt.Errorf("failed to unmarshal entry: %w", err)
}
// Append the minimum element to the merged slice and move the pointer.
tsNano, err := strconv.ParseInt(minEl[0], 10, 64)
if err != nil {
return nil, fmt.Errorf("failed to parse timestamp in response: %w", err)
}
// TODO: In general, perhaps we should omit the offending line and log, rather than failing the request entirely.
streamLbls := res.Data.Result[minElStreamIdx].Stream
lblsJson, err := json.Marshal(streamLbls)
if err != nil {
return nil, fmt.Errorf("failed to serialize stream labels: %w", err)
}
line, err := jsonifyRow(minEl[1])
if err != nil {
return nil, fmt.Errorf("a line was in an invalid format: %w", err)
}
times = append(times, time.Unix(0, tsNano))
labels = append(labels, lblsJson)
lines = append(lines, line)
pointers[minElStreamIdx]++
}
frame.Fields = append(frame.Fields, data.NewField(dfTime, lbls, times))
frame.Fields = append(frame.Fields, data.NewField(dfLine, lbls, lines))
frame.Fields = append(frame.Fields, data.NewField(dfLabels, lbls, labels))
return frame, nil
}
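As a rough sketch of how a consumer could walk the merged frame (assuming the data, encoding/json, time and fmt imports already present in this file), iterating the time and line fields row by row:

func walkMergedFrame(frame *data.Frame) {
	// Index the fields by name; merge names them via dfTime, dfLine and dfLabels.
	fields := map[string]*data.Field{}
	for _, f := range frame.Fields {
		fields[f.Name] = f
	}
	timeField, lineField := fields[dfTime], fields[dfLine]
	for i := 0; i < timeField.Len(); i++ {
		// Each row is one state transition: when it happened and the full JSON line.
		ts := timeField.At(i).(time.Time)
		line := lineField.At(i).(json.RawMessage)
		fmt.Printf("%s %s\n", ts.Format(time.RFC3339Nano), string(line))
	}
}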
func statesToStreams(rule history_model.RuleMeta, states []state.StateTransition, externalLabels map[string]string, logger log.Logger) []stream {
@@ -150,3 +291,14 @@ func valuesAsDataBlob(state *state.State) *simplejson.Json {
return jsonifyValues(state.Values)
}
func jsonifyRow(line string) (json.RawMessage, error) {
// Ser/deser to validate the contents of the log line before shipping it forward.
// TODO: We may want to remove this in the future, as we already have the value in the form of a []byte, and json.RawMessage is also a []byte.
// TODO: Though, if the log line does not contain valid JSON, this can cause problems later on when rendering the dataframe.
var entry lokiEntry
if err := json.Unmarshal([]byte(line), &entry); err != nil {
return nil, err
}
return json.Marshal(entry)
}
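A hypothetical transition line in the shape jsonifyRow expects, mirroring the fixtures in the tests further down; it is validated and handed back as a json.RawMessage for the frame's line column.

const exampleLine = `{"schemaVersion": 1, "previous": "normal", "current": "firing", "values": {"a": "b"}}`

func exampleJsonifyRow() error {
	line, err := jsonifyRow(exampleLine)
	if err != nil {
		return fmt.Errorf("line is not valid JSON for a lokiEntry: %w", err)
	}
	_ = line // ready to append to the frame's line field
	return nil
}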


@@ -58,6 +58,28 @@ type httpLokiClient struct {
log log.Logger
}
// Operator is the kind of label matching operation (=, !=, =~, !~).
type Operator string
const (
// Equal operator (=)
Eq Operator = "="
// Not Equal operator (!=)
Neq Operator = "!="
// Regular expression match operator (=~)
EqRegEx Operator = "=~"
// Negative regular expression match operator (!~)
NeqRegEx Operator = "!~"
)
type Selector struct {
// Label to Select
Label string
Op Operator
// Value that is expected
Value string
}
func newLokiClient(cfg LokiConfig, logger log.Logger) *httpLokiClient {
return &httpLokiClient{
client: http.Client{
@@ -164,3 +186,105 @@ func (c *httpLokiClient) setAuthAndTenantHeaders(req *http.Request) {
req.Header.Add("X-Scope-OrgID", c.cfg.TenantID)
}
}
func (c *httpLokiClient) query(ctx context.Context, selectors []Selector, start, end int64) (QueryRes, error) {
// Run the pre-flight checks for the query.
if len(selectors) == 0 {
return QueryRes{}, fmt.Errorf("at least one selector required to query")
}
if start > end {
return QueryRes{}, fmt.Errorf("start time cannot be after end time")
}
queryURL := c.cfg.ReadPathURL.JoinPath("/loki/api/v1/query_range")
values := url.Values{}
values.Set("query", selectorString(selectors))
values.Set("start", fmt.Sprintf("%d", start))
values.Set("end", fmt.Sprintf("%d", end))
queryURL.RawQuery = values.Encode()
req, err := http.NewRequest(http.MethodGet, queryURL.String(), nil)
if err != nil {
return QueryRes{}, fmt.Errorf("error creating request: %w", err)
}
req = req.WithContext(ctx)
c.setAuthAndTenantHeaders(req)
res, err := c.client.Do(req)
if err != nil {
return QueryRes{}, fmt.Errorf("error executing request: %w", err)
}
defer func() {
_ = res.Body.Close()
}()
data, err := io.ReadAll(res.Body)
if err != nil {
return QueryRes{}, fmt.Errorf("error reading request response: %w", err)
}
if res.StatusCode < 200 || res.StatusCode >= 300 {
if len(data) > 0 {
c.log.Error("Error response from Loki", "response", string(data), "status", res.StatusCode)
} else {
c.log.Error("Error response from Loki with an empty body", "status", res.StatusCode)
}
return QueryRes{}, fmt.Errorf("received a non-200 response from loki, status: %d", res.StatusCode)
}
queryRes := QueryRes{}
err = json.Unmarshal(data, &queryRes)
if err != nil {
fmt.Println(string(data))
return QueryRes{}, fmt.Errorf("error parsing request response: %w", err)
}
return queryRes, nil
}
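A sketch of the request this ends up issuing, under an assumed read path of https://loki.example.com; start and end are Unix-epoch nanoseconds, and the query value is an example selectorString output.

func exampleQueryURL(start, end int64) string {
	base, _ := url.Parse("https://loki.example.com") // hypothetical ReadPathURL
	q := url.Values{}
	q.Set("query", `{orgID="1",from="state-history"}`) // example selector string
	q.Set("start", fmt.Sprintf("%d", start))
	q.Set("end", fmt.Sprintf("%d", end))
	u := base.JoinPath("/loki/api/v1/query_range")
	u.RawQuery = q.Encode()
	// e.g. https://loki.example.com/loki/api/v1/query_range?end=...&query=%7BorgID%3D%221%22%2C...%7D&start=...
	return u.String()
}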
func selectorString(selectors []Selector) string {
if len(selectors) == 0 {
return "{}"
}
// Build the query selector.
query := ""
for _, s := range selectors {
query += fmt.Sprintf("%s%s%q,", s.Label, s.Op, s.Value)
}
// Remove the last comma, as we append one to every selector.
query = query[:len(query)-1]
return "{" + query + "}"
}
func NewSelector(label, op, value string) (Selector, error) {
if !isValidOperator(op) {
return Selector{}, fmt.Errorf("'%s' is not a valid query operator", op)
}
return Selector{Label: label, Op: Operator(op), Value: value}, nil
}
func isValidOperator(op string) bool {
switch op {
case "=", "!=", "=~", "!~":
return true
}
return false
}
type Stream struct {
Stream map[string]string `json:"stream"`
Values [][2]string `json:"values"`
}
type QueryRes struct {
Status string `json:"status"`
Data QueryData `json:"data"`
}
type QueryData struct {
Result []Stream `json:"result"`
}
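For reference, a hedged example of a Loki query_range response body that unmarshals into these types; fields not modeled above (resultType, stats, and so on) are simply ignored by encoding/json.

const examplePayload = `{
  "status": "success",
  "data": {
    "resultType": "streams",
    "result": [
      {
        "stream": {"folderUID": "abc", "group": "my-group", "orgID": "1"},
        "values": [
          ["1675380600000000000", "{\"schemaVersion\": 1, \"previous\": \"normal\", \"current\": \"pending\", \"values\": {\"a\": \"b\"}}"]
        ]
      }
    ]
  }
}`

func exampleDecode() (QueryRes, error) {
	var res QueryRes
	if err := json.Unmarshal([]byte(examplePayload), &res); err != nil {
		return QueryRes{}, err
	}
	// res.Data.Result[0].Values[0][0] == "1675380600000000000"
	return res, nil
}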


@@ -4,6 +4,7 @@ import (
"context"
"net/url"
"testing"
"time"
"github.com/grafana/grafana/pkg/setting"
"github.com/stretchr/testify/require"
@@ -95,8 +96,62 @@ func TestLokiHTTPClient(t *testing.T) {
// so the x-scope-orgid header is set.
// client.cfg.TenantID = "<your_tenant_id>"
// Authorized request should fail against Grafana Cloud.
// Authorized request should not fail against Grafana Cloud.
err = client.ping(context.Background())
require.NoError(t, err)
})
t.Run("smoke test querying Loki", func(t *testing.T) {
url, err := url.Parse("https://logs-prod-eu-west-0.grafana.net")
require.NoError(t, err)
client := newLokiClient(LokiConfig{
ReadPathURL: url,
WritePathURL: url,
BasicAuthUser: "<your_username>",
BasicAuthPassword: "<your_password>",
}, log.NewNopLogger())
// When running on prem, you might need to set the tenant id,
// so the x-scope-orgid header is set.
// client.cfg.TenantID = "<your_tenant_id>"
// Create an array of selectors that should be used for the
// query.
selectors := []Selector{
{Label: "probe", Op: Eq, Value: "Paris"},
}
// Define the query time range
start := time.Now().Add(-30 * time.Minute).UnixNano()
end := time.Now().UnixNano()
// Authorized request should not fail against Grafana Cloud.
res, err := client.query(context.Background(), selectors, start, end)
require.NoError(t, err)
require.NotNil(t, res)
})
}
func TestSelectorString(t *testing.T) {
selectors := []Selector{{"name", "=", "Bob"}, {"age", "=~", "30"}}
expected := "{name=\"Bob\",age=~\"30\"}"
result := selectorString(selectors)
require.Equal(t, expected, result)
selectors = []Selector{}
expected = "{}"
result = selectorString(selectors)
require.Equal(t, expected, result)
}
func TestNewSelector(t *testing.T) {
selector, err := NewSelector("label", "=", "value")
require.NoError(t, err)
require.Equal(t, "label", selector.Label)
require.Equal(t, Eq, selector.Op)
require.Equal(t, "value", selector.Value)
selector, err = NewSelector("label", "invalid", "value")
require.Error(t, err)
}


@@ -5,6 +5,7 @@ import (
"fmt"
"sort"
"testing"
"time"
"github.com/grafana/grafana-plugin-sdk-go/data"
"github.com/grafana/grafana/pkg/infra/log"
@@ -140,6 +141,115 @@ func TestRemoteLokiBackend(t *testing.T) {
})
}
func TestMerge(t *testing.T) {
testCases := []struct {
name string
res QueryRes
ruleID string
expectedTime []time.Time
}{
{
name: "Should return values from multiple streams in right order",
res: QueryRes{
Data: QueryData{
Result: []Stream{
{
Stream: map[string]string{
"current": "pending",
},
Values: [][2]string{
{"1", `{"schemaVersion": 1, "previous": "normal", "current": "pending", "values":{"a": "b"}}`},
},
},
{
Stream: map[string]string{
"current": "firing",
},
Values: [][2]string{
{"2", `{"schemaVersion": 1, "previous": "pending", "current": "firing", "values":{"a": "b"}}`},
},
},
},
},
},
ruleID: "123456",
expectedTime: []time.Time{
time.Unix(0, 1),
time.Unix(0, 2),
},
},
{
name: "Should handle empty values",
res: QueryRes{
Data: QueryData{
Result: []Stream{
{
Stream: map[string]string{
"current": "normal",
},
Values: [][2]string{},
},
},
},
},
ruleID: "123456",
expectedTime: []time.Time{},
},
{
name: "Should handle multiple values in one stream",
res: QueryRes{
Data: QueryData{
Result: []Stream{
{
Stream: map[string]string{
"current": "normal",
},
Values: [][2]string{
{"1", `{"schemaVersion": 1, "previous": "firing", "current": "normal", "values":{"a": "b"}}`},
{"2", `{"schemaVersion": 1, "previous": "firing", "current": "normal", "values":{"a": "b"}}`},
},
},
{
Stream: map[string]string{
"current": "firing",
},
Values: [][2]string{
{"3", `{"schemaVersion": 1, "previous": "pending", "current": "firing", "values":{"a": "b"}}`},
},
},
},
},
},
ruleID: "123456",
expectedTime: []time.Time{
time.Unix(0, 1),
time.Unix(0, 2),
time.Unix(0, 3),
},
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
m, err := merge(tc.res, tc.ruleID)
require.NoError(t, err)
var dfTimeColumn *data.Field
for _, f := range m.Fields {
if f.Name == dfTime {
dfTimeColumn = f
}
}
require.NotNil(t, dfTimeColumn)
for i := 0; i < len(tc.expectedTime); i++ {
require.Equal(t, tc.expectedTime[i], dfTimeColumn.At(i))
}
})
}
}
func singleFromNormal(st *state.State) []state.StateTransition {
return []state.StateTransition{
{