From 1694a35e0c416f17e6b6c7c4ab59df6093f1c5bc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jean-Philippe=20Qu=C3=A9m=C3=A9ner?= Date: Thu, 2 Feb 2023 23:31:51 +0100 Subject: [PATCH] Alerting: implement loki query for alert state history (#61992) * Alerting: implement loki query for alert state history * extract selector building * add unit tests for selector creation * backup * give selectors their own type * build dataframe * add some tests * small changes after manual testing * use struct client * golint * more golint * Make RuleUID optional for Loki implementation * Drop initial assumption that we only have one series * Pare down to three columns, fix timestamp overflows, improve failure cases in loki responses * Embed structured log lines in the dataframe as objects rather than json strings * Include state history label filter * Remove dead code --------- Co-authored-by: Alex Weaver --- pkg/services/ngalert/state/historian/loki.go | 154 +++++++++++++++++- .../ngalert/state/historian/loki_http.go | 124 ++++++++++++++ .../ngalert/state/historian/loki_http_test.go | 57 ++++++- .../ngalert/state/historian/loki_test.go | 110 +++++++++++++ 4 files changed, 443 insertions(+), 2 deletions(-) diff --git a/pkg/services/ngalert/state/historian/loki.go b/pkg/services/ngalert/state/historian/loki.go index f46db89370c..4158887aa1d 100644 --- a/pkg/services/ngalert/state/historian/loki.go +++ b/pkg/services/ngalert/state/historian/loki.go @@ -4,6 +4,9 @@ import ( "context" "encoding/json" "fmt" + "math" + "strconv" + "time" "github.com/grafana/grafana-plugin-sdk-go/data" @@ -20,6 +23,10 @@ const ( RuleUIDLabel = "ruleUID" GroupLabel = "group" FolderUIDLabel = "folderUID" + // Name of the columns used in the dataframe.
+ dfTime = "time" + dfLine = "line" + dfLabels = "labels" ) const ( @@ -30,6 +37,7 @@ const ( type remoteLokiClient interface { ping(context.Context) error push(context.Context, []stream) error + query(ctx context.Context, selectors []Selector, start, end int64) (QueryRes, error) } type RemoteLokiBackend struct { @@ -66,7 +74,140 @@ func (h *RemoteLokiBackend) RecordStatesAsync(ctx context.Context, rule history_ } func (h *RemoteLokiBackend) QueryStates(ctx context.Context, query models.HistoryQuery) (*data.Frame, error) { - return data.NewFrame("states"), nil + selectors, err := buildSelectors(query) + if err != nil { + return nil, fmt.Errorf("failed to build the provided selectors: %w", err) + } + // Timestamps are passed to the Loki client as Unix epoch nanoseconds. + res, err := h.client.query(ctx, selectors, query.From.UnixNano(), query.To.UnixNano()) + if err != nil { + return nil, err + } + return merge(res, query.RuleUID) +} + +func buildSelectors(query models.HistoryQuery) ([]Selector, error) { + // +2 as OrgID and the state history label will always be selectors at the API level. + selectors := make([]Selector, len(query.Labels)+2) + + // Set the predefined selector orgID. + selector, err := NewSelector(OrgIDLabel, "=", fmt.Sprintf("%d", query.OrgID)) + if err != nil { + return nil, err + } + selectors[0] = selector + + // Set the predefined selector for the state history label.
+ selector, err = NewSelector(StateHistoryLabelKey, "=", StateHistoryLabelValue) + if err != nil { + return nil, err + } + selectors[1] = selector + + // Set the label selectors + i := 2 + for label, val := range query.Labels { + selector, err = NewSelector(label, "=", val) + if err != nil { + return nil, err + } + selectors[i] = selector + i++ + } + + // Set the optional special selector rule_id + if query.RuleUID != "" { + rsel, err := NewSelector(RuleUIDLabel, "=", query.RuleUID) + if err != nil { + return nil, err + } + selectors = append(selectors, rsel) + } + + return selectors, nil +} + +// merge will put all the results in one array sorted by timestamp. +func merge(res QueryRes, ruleUID string) (*data.Frame, error) { + // Find the total number of elements in all arrays. + totalLen := 0 + for _, arr := range res.Data.Result { + totalLen += len(arr.Values) + } + + // Create a new slice to store the merged elements. + frame := data.NewFrame("states") + + // We merge all series into a single linear history. + lbls := data.Labels(map[string]string{}) + + // We represent state history as a single merged history, that roughly corresponds to what you get in the Grafana Explore tab when querying Loki directly. + // The format is composed of the following vectors: + // 1. `time` - timestamp - when the transition happened + // 2. `line` - JSON - the full data of the transition + // 3. `labels` - JSON - the labels associated with that state transition + times := make([]time.Time, 0, totalLen) + lines := make([]json.RawMessage, 0, totalLen) + labels := make([]json.RawMessage, 0, totalLen) + + // Initialize a slice of pointers to the current position in each array. + pointers := make([]int, len(res.Data.Result)) + for { + minTime := int64(math.MaxInt64) + minEl := [2]string{} + minElStreamIdx := -1 + // Find the element with the earliest time among all arrays. + for i, stream := range res.Data.Result { + // Skip if we already reached the end of the current array. 
+ if len(stream.Values) == pointers[i] { + continue + } + curTime, err := strconv.ParseInt(stream.Values[pointers[i]][0], 10, 64) + if err != nil { + return nil, fmt.Errorf("failed to parse timestamp from loki response: %w", err) + } + if pointers[i] < len(stream.Values) && curTime < minTime { + minTime = curTime + minEl = stream.Values[pointers[i]] + minElStreamIdx = i + } + } + // If all pointers have reached the end of their arrays, we're done. + if minElStreamIdx == -1 { + break + } + var entry lokiEntry + err := json.Unmarshal([]byte(minEl[1]), &entry) + if err != nil { + return nil, fmt.Errorf("failed to unmarshal entry: %w", err) + } + // Append the minimum element to the merged slice and move the pointer. + tsNano, err := strconv.ParseInt(minEl[0], 10, 64) + if err != nil { + return nil, fmt.Errorf("failed to parse timestamp in response: %w", err) + } + // TODO: In general, perhaps we should omit the offending line and log, rather than failing the request entirely. + streamLbls := res.Data.Result[minElStreamIdx].Stream + lblsJson, err := json.Marshal(streamLbls) + if err != nil { + return nil, fmt.Errorf("failed to serialize stream labels: %w", err) + } + line, err := jsonifyRow(minEl[1]) + if err != nil { + return nil, fmt.Errorf("a line was in an invalid format: %w", err) + } + + times = append(times, time.Unix(0, tsNano)) + labels = append(labels, lblsJson) + lines = append(lines, line) + pointers[minElStreamIdx]++ + } + + frame.Fields = append(frame.Fields, data.NewField(dfTime, lbls, times)) + frame.Fields = append(frame.Fields, data.NewField(dfLine, lbls, lines)) + frame.Fields = append(frame.Fields, data.NewField(dfLabels, lbls, labels)) + + return frame, nil } func statesToStreams(rule history_model.RuleMeta, states []state.StateTransition, externalLabels map[string]string, logger log.Logger) []stream { @@ -150,3 +291,14 @@ func valuesAsDataBlob(state *state.State) *simplejson.Json { return jsonifyValues(state.Values) } + +func jsonifyRow(line 
string) (json.RawMessage, error) { + // Ser/deser to validate the contents of the log line before shipping it forward. + // TODO: We may want to remove this in the future, as we already have the value in the form of a []byte, and json.RawMessage is also a []byte. + // TODO: Though, if the log line does not contain valid JSON, this can cause problems later on when rendering the dataframe. + var entry lokiEntry + if err := json.Unmarshal([]byte(line), &entry); err != nil { + return nil, err + } + return json.Marshal(entry) +} diff --git a/pkg/services/ngalert/state/historian/loki_http.go b/pkg/services/ngalert/state/historian/loki_http.go index 2e1d9ba5397..37a87e73add 100644 --- a/pkg/services/ngalert/state/historian/loki_http.go +++ b/pkg/services/ngalert/state/historian/loki_http.go @@ -58,6 +58,28 @@ type httpLokiClient struct { log log.Logger } +// Kind of Operation (=, !=, =~, !~) +type Operator string + +const ( + // Equal operator (=) + Eq Operator = "=" + // Not Equal operator (!=) + Neq Operator = "!=" + // Equal operator supporting RegEx (=~) + EqRegEx Operator = "=~" + // Not Equal operator supporting RegEx (!~) + NeqRegEx Operator = "!~" +) + +type Selector struct { + // Label to Select + Label string + Op Operator + // Value that is expected + Value string +} + func newLokiClient(cfg LokiConfig, logger log.Logger) *httpLokiClient { return &httpLokiClient{ client: http.Client{ @@ -164,3 +186,105 @@ func (c *httpLokiClient) setAuthAndTenantHeaders(req *http.Request) { req.Header.Add("X-Scope-OrgID", c.cfg.TenantID) } } +func (c *httpLokiClient) query(ctx context.Context, selectors []Selector, start, end int64) (QueryRes, error) { + // Run the pre-flight checks for the query. 
+ if len(selectors) == 0 { + return QueryRes{}, fmt.Errorf("at least one selector required to query") + } + if start > end { + return QueryRes{}, fmt.Errorf("start time cannot be after end time") + } + + queryURL := c.cfg.ReadPathURL.JoinPath("/loki/api/v1/query_range") + + values := url.Values{} + values.Set("query", selectorString(selectors)) + values.Set("start", fmt.Sprintf("%d", start)) + values.Set("end", fmt.Sprintf("%d", end)) + + queryURL.RawQuery = values.Encode() + + req, err := http.NewRequest(http.MethodGet, + queryURL.String(), nil) + if err != nil { + return QueryRes{}, fmt.Errorf("error creating request: %w", err) + } + + req = req.WithContext(ctx) + c.setAuthAndTenantHeaders(req) + + res, err := c.client.Do(req) + if err != nil { + return QueryRes{}, fmt.Errorf("error executing request: %w", err) + } + + defer func() { + _ = res.Body.Close() + }() + + data, err := io.ReadAll(res.Body) + if err != nil { + return QueryRes{}, fmt.Errorf("error reading request response: %w", err) + } + + if res.StatusCode < 200 || res.StatusCode >= 300 { + if len(data) > 0 { + c.log.Error("Error response from Loki", "response", string(data), "status", res.StatusCode) + } else { + c.log.Error("Error response from Loki with an empty body", "status", res.StatusCode) + } + return QueryRes{}, fmt.Errorf("received a non-200 response from loki, status: %d", res.StatusCode) + } + + queryRes := QueryRes{} + err = json.Unmarshal(data, &queryRes) + if err != nil { + fmt.Println(string(data)) + return QueryRes{}, fmt.Errorf("error parsing request response: %w", err) + } + + return queryRes, nil +} + +func selectorString(selectors []Selector) string { + if len(selectors) == 0 { + return "{}" + } + // Build the query selector. + query := "" + for _, s := range selectors { + query += fmt.Sprintf("%s%s%q,", s.Label, s.Op, s.Value) + } + // Remove the last comma, as we append one to every selector. 
+ query = query[:len(query)-1] + return "{" + query + "}" +} + +func NewSelector(label, op, value string) (Selector, error) { + if !isValidOperator(op) { + return Selector{}, fmt.Errorf("'%s' is not a valid query operator", op) + } + return Selector{Label: label, Op: Operator(op), Value: value}, nil +} + +func isValidOperator(op string) bool { + switch op { + case "=", "!=", "=~", "!~": + return true + } + return false +} + +type Stream struct { + Stream map[string]string `json:"stream"` + Values [][2]string `json:"values"` +} + +type QueryRes struct { + Status string `json:"status"` + Data QueryData `json:"data"` +} + +type QueryData struct { + Result []Stream `json:"result"` +} diff --git a/pkg/services/ngalert/state/historian/loki_http_test.go b/pkg/services/ngalert/state/historian/loki_http_test.go index 629174edb11..86b856f3a4e 100644 --- a/pkg/services/ngalert/state/historian/loki_http_test.go +++ b/pkg/services/ngalert/state/historian/loki_http_test.go @@ -4,6 +4,7 @@ import ( "context" "net/url" "testing" + "time" "github.com/grafana/grafana/pkg/setting" "github.com/stretchr/testify/require" @@ -95,8 +96,62 @@ func TestLokiHTTPClient(t *testing.T) { // so the x-scope-orgid header is set. // client.cfg.TenantID = "" - // Authorized request should fail against Grafana Cloud. + // Authorized request should not fail against Grafana Cloud. err = client.ping(context.Background()) require.NoError(t, err) }) + + t.Run("smoke test querying Loki", func(t *testing.T) { + url, err := url.Parse("https://logs-prod-eu-west-0.grafana.net") + require.NoError(t, err) + + client := newLokiClient(LokiConfig{ + ReadPathURL: url, + WritePathURL: url, + BasicAuthUser: "", + BasicAuthPassword: "", + }, log.NewNopLogger()) + + // When running on prem, you might need to set the tenant id, + // so the x-scope-orgid header is set. + // client.cfg.TenantID = "" + + // Create an array of selectors that should be used for the + // query. 
+ selectors := []Selector{ + {Label: "probe", Op: Eq, Value: "Paris"}, + } + + // Define the query time range + start := time.Now().Add(-30 * time.Minute).UnixNano() + end := time.Now().UnixNano() + + // Authorized request should not fail against Grafana Cloud. + res, err := client.query(context.Background(), selectors, start, end) + require.NoError(t, err) + require.NotNil(t, res) + }) +} + +func TestSelectorString(t *testing.T) { + selectors := []Selector{{"name", "=", "Bob"}, {"age", "=~", "30"}} + expected := "{name=\"Bob\",age=~\"30\"}" + result := selectorString(selectors) + require.Equal(t, expected, result) + + selectors = []Selector{} + expected = "{}" + result = selectorString(selectors) + require.Equal(t, expected, result) +} + +func TestNewSelector(t *testing.T) { + selector, err := NewSelector("label", "=", "value") + require.NoError(t, err) + require.Equal(t, "label", selector.Label) + require.Equal(t, Eq, selector.Op) + require.Equal(t, "value", selector.Value) + + selector, err = NewSelector("label", "invalid", "value") + require.Error(t, err) } diff --git a/pkg/services/ngalert/state/historian/loki_test.go b/pkg/services/ngalert/state/historian/loki_test.go index 01d04cdcecd..9142251e492 100644 --- a/pkg/services/ngalert/state/historian/loki_test.go +++ b/pkg/services/ngalert/state/historian/loki_test.go @@ -5,6 +5,7 @@ import ( "fmt" "sort" "testing" + "time" "github.com/grafana/grafana-plugin-sdk-go/data" "github.com/grafana/grafana/pkg/infra/log" @@ -140,6 +141,115 @@ func TestRemoteLokiBackend(t *testing.T) { }) } +func TestMerge(t *testing.T) { + testCases := []struct { + name string + res QueryRes + ruleID string + expectedTime []time.Time + }{ + { + name: "Should return values from multiple streams in right order", + res: QueryRes{ + Data: QueryData{ + Result: []Stream{ + { + Stream: map[string]string{ + "current": "pending", + }, + Values: [][2]string{ + {"1", `{"schemaVersion": 1, "previous": "normal", "current": "pending", "values":{"a": 
"b"}}`}, + }, + }, + { + Stream: map[string]string{ + "current": "firing", + }, + Values: [][2]string{ + {"2", `{"schemaVersion": 1, "previous": "pending", "current": "firing", "values":{"a": "b"}}`}, + }, + }, + }, + }, + }, + ruleID: "123456", + expectedTime: []time.Time{ + time.Unix(0, 1), + time.Unix(0, 2), + }, + }, + { + name: "Should handle empty values", + res: QueryRes{ + Data: QueryData{ + Result: []Stream{ + { + Stream: map[string]string{ + "current": "normal", + }, + Values: [][2]string{}, + }, + }, + }, + }, + ruleID: "123456", + expectedTime: []time.Time{}, + }, + { + name: "Should handle multiple values in one stream", + res: QueryRes{ + Data: QueryData{ + Result: []Stream{ + { + Stream: map[string]string{ + "current": "normal", + }, + Values: [][2]string{ + {"1", `{"schemaVersion": 1, "previous": "firing", "current": "normal", "values":{"a": "b"}}`}, + {"2", `{"schemaVersion": 1, "previous": "firing", "current": "normal", "values":{"a": "b"}}`}, + }, + }, + { + Stream: map[string]string{ + "current": "firing", + }, + Values: [][2]string{ + {"3", `{"schemaVersion": 1, "previous": "pending", "current": "firing", "values":{"a": "b"}}`}, + }, + }, + }, + }, + }, + ruleID: "123456", + expectedTime: []time.Time{ + time.Unix(0, 1), + time.Unix(0, 2), + time.Unix(0, 3), + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + m, err := merge(tc.res, tc.ruleID) + require.NoError(t, err) + + var dfTimeColumn *data.Field + for _, f := range m.Fields { + if f.Name == dfTime { + dfTimeColumn = f + } + } + + require.NotNil(t, dfTimeColumn) + + for i := 0; i < len(tc.expectedTime); i++ { + require.Equal(t, tc.expectedTime[i], dfTimeColumn.At(i)) + } + }) + } +} + func singleFromNormal(st *state.State) []state.StateTransition { return []state.StateTransition{ {