Alerting: No longer index state history log streams by instance labels (#65474)

* Remove private labels

* No longer index by instance labels

* Labels are now invariant, only build them once

* Remove bucketing since everything is in a single stream

* Refactor statesToStreams to only return a single unified log stream

* Don't query on labels that no longer exist

* Move selector logic to loki layer, genericize client to work in terms of straight logQL

* Add support for line-level label filters in query

* Combine existing selector tests for better parallelism

* Tests for logQL construction

* Underscore instead of dot for unwrapping labels in logql
This commit is contained in:
Alexander Weaver 2023-03-29 11:52:11 -05:00 committed by GitHub
parent 4839f1543a
commit a416100abc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 167 additions and 184 deletions

View File

@ -41,7 +41,7 @@ const defaultQueryRange = 6 * time.Hour
type remoteLokiClient interface {
ping(context.Context) error
push(context.Context, []stream) error
rangeQuery(ctx context.Context, selectors []Selector, start, end int64) (queryRes, error)
rangeQuery(ctx context.Context, logQL string, start, end int64) (queryRes, error)
}
// RemoteLokibackend is a state.Historian that records state history to an external Loki instance.
@ -71,10 +71,10 @@ func (h *RemoteLokiBackend) TestConnection(ctx context.Context) error {
// Record writes a number of state transitions for a given rule to an external Loki instance.
func (h *RemoteLokiBackend) Record(ctx context.Context, rule history_model.RuleMeta, states []state.StateTransition) <-chan error {
logger := h.log.FromContext(ctx)
streams := statesToStreams(rule, states, h.externalLabels, logger)
logStream := statesToStream(rule, states, h.externalLabels, logger)
errCh := make(chan error, 1)
if len(streams) == 0 {
if len(logStream.Values) == 0 {
close(errCh)
return errCh
}
@ -84,16 +84,12 @@ func (h *RemoteLokiBackend) Record(ctx context.Context, rule history_model.RuleM
org := fmt.Sprint(rule.OrgID)
h.metrics.WritesTotal.WithLabelValues(org, "loki").Inc()
samples := 0
for _, s := range streams {
samples += len(s.Values)
}
h.metrics.TransitionsTotal.WithLabelValues(org).Add(float64(samples))
h.metrics.TransitionsTotal.WithLabelValues(org).Add(float64(len(logStream.Values)))
if err := h.recordStreams(ctx, streams, logger); err != nil {
if err := h.recordStreams(ctx, []stream{logStream}, logger); err != nil {
logger.Error("Failed to save alert state history batch", "error", err)
h.metrics.WritesFailed.WithLabelValues(org, "loki").Inc()
h.metrics.TransitionsFailed.WithLabelValues(org).Add(float64(samples))
h.metrics.TransitionsFailed.WithLabelValues(org).Add(float64(len(logStream.Values)))
errCh <- fmt.Errorf("failed to save alert state history batch: %w", err)
}
}()
@ -102,9 +98,9 @@ func (h *RemoteLokiBackend) Record(ctx context.Context, rule history_model.RuleM
// Query retrieves state history entries from an external Loki instance and formats the results into a dataframe.
func (h *RemoteLokiBackend) Query(ctx context.Context, query models.HistoryQuery) (*data.Frame, error) {
selectors, err := buildSelectors(query)
logQL, err := buildLogQuery(query)
if err != nil {
return nil, fmt.Errorf("failed to build the provided selectors: %w", err)
return nil, err
}
now := time.Now().UTC()
@ -116,7 +112,7 @@ func (h *RemoteLokiBackend) Query(ctx context.Context, query models.HistoryQuery
}
// Timestamps are expected in RFC3339Nano.
res, err := h.client.rangeQuery(ctx, selectors, query.From.UnixNano(), query.To.UnixNano())
res, err := h.client.rangeQuery(ctx, logQL, query.From.UnixNano(), query.To.UnixNano())
if err != nil {
return nil, err
}
@ -124,8 +120,8 @@ func (h *RemoteLokiBackend) Query(ctx context.Context, query models.HistoryQuery
}
func buildSelectors(query models.HistoryQuery) ([]Selector, error) {
// +2 as OrgID and the state history label will always be selectors at the API level.
selectors := make([]Selector, len(query.Labels)+2)
// OrgID and the state history label are static and will be included in all queries.
selectors := make([]Selector, 2)
// Set the predefined selector orgID.
selector, err := NewSelector(OrgIDLabel, "=", fmt.Sprintf("%d", query.OrgID))
@ -141,17 +137,6 @@ func buildSelectors(query models.HistoryQuery) ([]Selector, error) {
}
selectors[1] = selector
// Set the label selectors
i := 2
for label, val := range query.Labels {
selector, err = NewSelector(label, "=", val)
if err != nil {
return nil, err
}
selectors[i] = selector
i++
}
// Set the optional special selector rule_id
if query.RuleUID != "" {
rsel, err := NewSelector(RuleUIDLabel, "=", query.RuleUID)
@ -241,26 +226,21 @@ func merge(res queryRes, ruleUID string) (*data.Frame, error) {
return frame, nil
}
func statesToStreams(rule history_model.RuleMeta, states []state.StateTransition, externalLabels map[string]string, logger log.Logger) []stream {
buckets := make(map[string][]sample) // label repr (JSON) -> entries
func statesToStream(rule history_model.RuleMeta, states []state.StateTransition, externalLabels map[string]string, logger log.Logger) stream {
labels := mergeLabels(make(map[string]string), externalLabels)
// System-defined labels take precedence over user-defined external labels.
labels[StateHistoryLabelKey] = StateHistoryLabelValue
labels[OrgIDLabel] = fmt.Sprint(rule.OrgID)
labels[RuleUIDLabel] = fmt.Sprint(rule.UID)
labels[GroupLabel] = fmt.Sprint(rule.Group)
labels[FolderUIDLabel] = fmt.Sprint(rule.NamespaceUID)
samples := make([]sample, 0, len(states))
for _, state := range states {
if !shouldRecord(state) {
continue
}
labels := mergeLabels(removePrivateLabels(state.State.Labels), externalLabels)
labels[StateHistoryLabelKey] = StateHistoryLabelValue
labels[OrgIDLabel] = fmt.Sprint(rule.OrgID)
labels[RuleUIDLabel] = fmt.Sprint(rule.UID)
labels[GroupLabel] = fmt.Sprint(rule.Group)
labels[FolderUIDLabel] = fmt.Sprint(rule.NamespaceUID)
lblJsn, err := json.Marshal(labels)
if err != nil {
logger.Error("Failed to marshal labels to JSON", "error", err)
continue
}
repr := string(lblJsn)
entry := lokiEntry{
SchemaVersion: 1,
Previous: state.PreviousFormatted(),
@ -268,7 +248,7 @@ func statesToStreams(rule history_model.RuleMeta, states []state.StateTransition
Values: valuesAsDataBlob(state.State),
DashboardUID: rule.DashboardUID,
PanelID: rule.PanelID,
InstanceLabels: state.Labels,
InstanceLabels: removePrivateLabels(state.Labels),
}
if state.State.State == eval.Error {
entry.Error = state.Error.Error()
@ -281,26 +261,16 @@ func statesToStreams(rule history_model.RuleMeta, states []state.StateTransition
}
line := string(jsn)
buckets[repr] = append(buckets[repr], sample{
samples = append(samples, sample{
T: state.State.LastEvaluationTime,
V: line,
})
}
result := make([]stream, 0, len(buckets))
for repr, rows := range buckets {
labels, err := data.LabelsFromString(repr)
if err != nil {
logger.Error("Failed to parse frame labels, skipping state history batch: %w", err)
continue
}
result = append(result, stream{
Stream: labels,
Values: rows,
})
return stream{
Stream: labels,
Values: samples,
}
return result
}
func (h *RemoteLokiBackend) recordStreams(ctx context.Context, streams []stream, logger log.Logger) error {
@ -343,3 +313,60 @@ func jsonifyRow(line string) (json.RawMessage, error) {
}
return json.Marshal(entry)
}
type Selector struct {
// Label to Select
Label string
Op Operator
// Value that is expected
Value string
}
func NewSelector(label, op, value string) (Selector, error) {
if !isValidOperator(op) {
return Selector{}, fmt.Errorf("'%s' is not a valid query operator", op)
}
return Selector{Label: label, Op: Operator(op), Value: value}, nil
}
func selectorString(selectors []Selector) string {
if len(selectors) == 0 {
return "{}"
}
// Build the query selector.
query := ""
for _, s := range selectors {
query += fmt.Sprintf("%s%s%q,", s.Label, s.Op, s.Value)
}
// Remove the last comma, as we append one to every selector.
query = query[:len(query)-1]
return "{" + query + "}"
}
func isValidOperator(op string) bool {
switch op {
case "=", "!=", "=~", "!~":
return true
}
return false
}
func buildLogQuery(query models.HistoryQuery) (string, error) {
selectors, err := buildSelectors(query)
if err != nil {
return "", fmt.Errorf("failed to build the provided selectors: %w", err)
}
logQL := selectorString(selectors)
labelFilters := ""
for k, v := range query.Labels {
labelFilters += fmt.Sprintf(" | labels_%s=%q", k, v)
}
if labelFilters != "" {
logQL = fmt.Sprintf("%s | json%s", logQL, labelFilters)
}
return logQL, nil
}

View File

@ -102,14 +102,6 @@ const (
NeqRegEx Operator = "!~"
)
type Selector struct {
// Label to Select
Label string
Op Operator
// Value that is expected
Value string
}
func newLokiClient(cfg LokiConfig, req client.Requester, metrics *metrics.Historian, logger log.Logger) *httpLokiClient {
tc := client.NewTimedClient(req, metrics.WriteDuration)
return &httpLokiClient{
@ -234,11 +226,8 @@ func (c *httpLokiClient) setAuthAndTenantHeaders(req *http.Request) {
req.Header.Add("X-Scope-OrgID", c.cfg.TenantID)
}
}
func (c *httpLokiClient) rangeQuery(ctx context.Context, selectors []Selector, start, end int64) (queryRes, error) {
func (c *httpLokiClient) rangeQuery(ctx context.Context, logQL string, start, end int64) (queryRes, error) {
// Run the pre-flight checks for the query.
if len(selectors) == 0 {
return queryRes{}, fmt.Errorf("at least one selector required to query")
}
if start > end {
return queryRes{}, fmt.Errorf("start time cannot be after end time")
}
@ -246,7 +235,7 @@ func (c *httpLokiClient) rangeQuery(ctx context.Context, selectors []Selector, s
queryURL := c.cfg.ReadPathURL.JoinPath("/loki/api/v1/query_range")
values := url.Values{}
values.Set("query", selectorString(selectors))
values.Set("query", logQL)
values.Set("start", fmt.Sprintf("%d", start))
values.Set("end", fmt.Sprintf("%d", end))
@ -294,35 +283,6 @@ func (c *httpLokiClient) rangeQuery(ctx context.Context, selectors []Selector, s
return result, nil
}
func selectorString(selectors []Selector) string {
if len(selectors) == 0 {
return "{}"
}
// Build the query selector.
query := ""
for _, s := range selectors {
query += fmt.Sprintf("%s%s%q,", s.Label, s.Op, s.Value)
}
// Remove the last comma, as we append one to every selector.
query = query[:len(query)-1]
return "{" + query + "}"
}
func NewSelector(label, op, value string) (Selector, error) {
if !isValidOperator(op) {
return Selector{}, fmt.Errorf("'%s' is not a valid query operator", op)
}
return Selector{Label: label, Op: Operator(op), Value: value}, nil
}
func isValidOperator(op string) bool {
switch op {
case "=", "!=", "=~", "!~":
return true
}
return false
}
type queryRes struct {
Data queryData `json:"data"`
}

View File

@ -164,46 +164,19 @@ func TestLokiHTTPClient_Manual(t *testing.T) {
// so the x-scope-orgid header is set.
// client.cfg.TenantID = "<your_tenant_id>"
// Create an array of selectors that should be used for the
// query.
selectors := []Selector{
{Label: "probe", Op: Eq, Value: "Paris"},
}
logQL := `{probe="Paris"}`
// Define the query time range
start := time.Now().Add(-30 * time.Minute).UnixNano()
end := time.Now().UnixNano()
// Authorized request should not fail against Grafana Cloud.
res, err := client.rangeQuery(context.Background(), selectors, start, end)
res, err := client.rangeQuery(context.Background(), logQL, start, end)
require.NoError(t, err)
require.NotNil(t, res)
})
}
func TestSelectorString(t *testing.T) {
selectors := []Selector{{"name", "=", "Bob"}, {"age", "=~", "30"}}
expected := "{name=\"Bob\",age=~\"30\"}"
result := selectorString(selectors)
require.Equal(t, expected, result)
selectors = []Selector{}
expected = "{}"
result = selectorString(selectors)
require.Equal(t, expected, result)
}
func TestNewSelector(t *testing.T) {
selector, err := NewSelector("label", "=", "value")
require.NoError(t, err)
require.Equal(t, "label", selector.Label)
require.Equal(t, Eq, selector.Op)
require.Equal(t, "value", selector.Value)
selector, err = NewSelector("label", "invalid", "value")
require.Error(t, err)
}
func TestRow(t *testing.T) {
t.Run("marshal", func(t *testing.T) {
row := sample{

View File

@ -8,7 +8,6 @@ import (
"io"
"net/http"
"net/url"
"sort"
"testing"
"time"
@ -16,6 +15,7 @@ import (
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/services/ngalert/eval"
"github.com/grafana/grafana/pkg/services/ngalert/metrics"
"github.com/grafana/grafana/pkg/services/ngalert/models"
"github.com/grafana/grafana/pkg/services/ngalert/state"
history_model "github.com/grafana/grafana/pkg/services/ngalert/state/historian/model"
"github.com/prometheus/client_golang/prometheus"
@ -25,15 +25,15 @@ import (
)
func TestRemoteLokiBackend(t *testing.T) {
t.Run("statesToStreams", func(t *testing.T) {
t.Run("statesToStream", func(t *testing.T) {
t.Run("skips non-transitory states", func(t *testing.T) {
rule := createTestRule()
l := log.NewNopLogger()
states := singleFromNormal(&state.State{State: eval.Normal})
res := statesToStreams(rule, states, nil, l)
res := statesToStream(rule, states, nil, l)
require.Empty(t, res)
require.Empty(t, res.Values)
})
t.Run("maps evaluation errors", func(t *testing.T) {
@ -41,7 +41,7 @@ func TestRemoteLokiBackend(t *testing.T) {
l := log.NewNopLogger()
states := singleFromNormal(&state.State{State: eval.Error, Error: fmt.Errorf("oh no")})
res := statesToStreams(rule, states, nil, l)
res := statesToStream(rule, states, nil, l)
entry := requireSingleEntry(t, res)
require.Contains(t, entry.Error, "oh no")
@ -52,7 +52,7 @@ func TestRemoteLokiBackend(t *testing.T) {
l := log.NewNopLogger()
states := singleFromNormal(&state.State{State: eval.NoData})
res := statesToStreams(rule, states, nil, l)
res := statesToStream(rule, states, nil, l)
_ = requireSingleEntry(t, res)
})
@ -65,55 +65,16 @@ func TestRemoteLokiBackend(t *testing.T) {
Labels: data.Labels{"a": "b"},
})
res := statesToStreams(rule, states, nil, l)
res := statesToStream(rule, states, nil, l)
require.Len(t, res, 1)
exp := map[string]string{
StateHistoryLabelKey: StateHistoryLabelValue,
"folderUID": rule.NamespaceUID,
"group": rule.Group,
"orgID": fmt.Sprint(rule.OrgID),
"ruleUID": rule.UID,
"a": "b",
}
require.Equal(t, exp, res[0].Stream)
})
t.Run("groups streams based on combined labels", func(t *testing.T) {
rule := createTestRule()
l := log.NewNopLogger()
states := []state.StateTransition{
{
PreviousState: eval.Normal,
State: &state.State{
State: eval.Alerting,
Labels: data.Labels{"a": "b"},
},
},
{
PreviousState: eval.Normal,
State: &state.State{
State: eval.Alerting,
Labels: data.Labels{"a": "b"},
},
},
{
PreviousState: eval.Normal,
State: &state.State{
State: eval.Alerting,
Labels: data.Labels{"c": "d"},
},
},
}
res := statesToStreams(rule, states, nil, l)
require.Len(t, res, 2)
sort.Slice(res, func(i, j int) bool { return len(res[i].Values) > len(res[j].Values) })
require.Contains(t, res[0].Stream, "a")
require.Len(t, res[0].Values, 2)
require.Contains(t, res[1].Stream, "c")
require.Len(t, res[1].Values, 1)
require.Equal(t, exp, res.Stream)
})
t.Run("excludes private labels", func(t *testing.T) {
@ -124,10 +85,9 @@ func TestRemoteLokiBackend(t *testing.T) {
Labels: data.Labels{"__private__": "b"},
})
res := statesToStreams(rule, states, nil, l)
res := statesToStream(rule, states, nil, l)
require.Len(t, res, 1)
require.NotContains(t, res[0].Stream, "__private__")
require.NotContains(t, res.Stream, "__private__")
})
t.Run("includes instance labels in log line", func(t *testing.T) {
@ -138,7 +98,7 @@ func TestRemoteLokiBackend(t *testing.T) {
Labels: data.Labels{"statelabel": "labelvalue"},
})
res := statesToStreams(rule, states, nil, l)
res := statesToStream(rule, states, nil, l)
entry := requireSingleEntry(t, res)
require.Contains(t, entry.InstanceLabels, "statelabel")
@ -156,7 +116,7 @@ func TestRemoteLokiBackend(t *testing.T) {
},
})
res := statesToStreams(rule, states, nil, l)
res := statesToStream(rule, states, nil, l)
entry := requireSingleEntry(t, res)
require.Len(t, entry.InstanceLabels, 3)
@ -170,7 +130,7 @@ func TestRemoteLokiBackend(t *testing.T) {
Values: map[string]float64{"A": 2.0, "B": 5.5},
})
res := statesToStreams(rule, states, nil, l)
res := statesToStream(rule, states, nil, l)
entry := requireSingleEntry(t, res)
require.NotNil(t, entry.Values)
@ -180,6 +140,70 @@ func TestRemoteLokiBackend(t *testing.T) {
require.InDelta(t, 5.5, entry.Values.Get("B").MustFloat64(), 1e-4)
})
})
t.Run("selector string", func(t *testing.T) {
selectors := []Selector{{"name", "=", "Bob"}, {"age", "=~", "30"}}
expected := "{name=\"Bob\",age=~\"30\"}"
result := selectorString(selectors)
require.Equal(t, expected, result)
selectors = []Selector{}
expected = "{}"
result = selectorString(selectors)
require.Equal(t, expected, result)
})
t.Run("new selector", func(t *testing.T) {
selector, err := NewSelector("label", "=", "value")
require.NoError(t, err)
require.Equal(t, "label", selector.Label)
require.Equal(t, Eq, selector.Op)
require.Equal(t, "value", selector.Value)
selector, err = NewSelector("label", "invalid", "value")
require.Error(t, err)
})
t.Run("buildLogQuery", func(t *testing.T) {
cases := []struct {
name string
query models.HistoryQuery
exp string
}{
{
name: "default includes state history label and orgID label",
query: models.HistoryQuery{},
exp: `{orgID="0",from="state-history"}`,
},
{
name: "adds stream label filter for ruleUID and orgID",
query: models.HistoryQuery{
RuleUID: "rule-uid",
OrgID: 123,
},
exp: `{orgID="123",from="state-history",ruleUID="rule-uid"}`,
},
{
name: "filters instance labels in log line",
query: models.HistoryQuery{
OrgID: 123,
Labels: map[string]string{
"customlabel": "customvalue",
"labeltwo": "labelvaluetwo",
},
},
exp: `{orgID="123",from="state-history"} | json | labels_customlabel="customvalue" | labels_labeltwo="labelvaluetwo"`,
},
}
for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
res, err := buildLogQuery(tc.query)
require.NoError(t, err)
require.Equal(t, tc.exp, res)
})
}
})
}
func TestMerge(t *testing.T) {
@ -428,10 +452,9 @@ func createTestRule() history_model.RuleMeta {
}
}
func requireSingleEntry(t *testing.T, res []stream) lokiEntry {
require.Len(t, res, 1)
require.Len(t, res[0].Values, 1)
return requireEntry(t, res[0].Values[0])
func requireSingleEntry(t *testing.T, res stream) lokiEntry {
require.Len(t, res.Values, 1)
return requireEntry(t, res.Values[0])
}
func requireEntry(t *testing.T, row sample) lokiEntry {