diff --git a/pkg/services/ngalert/state/historian/loki.go b/pkg/services/ngalert/state/historian/loki.go index e18b58d3305..4b1c64c41bd 100644 --- a/pkg/services/ngalert/state/historian/loki.go +++ b/pkg/services/ngalert/state/historian/loki.go @@ -2,15 +2,28 @@ package historian import ( "context" + "encoding/json" + "fmt" + "sort" "github.com/grafana/grafana-plugin-sdk-go/data" + "github.com/grafana/grafana/pkg/components/simplejson" "github.com/grafana/grafana/pkg/infra/log" + "github.com/grafana/grafana/pkg/services/ngalert/eval" "github.com/grafana/grafana/pkg/services/ngalert/models" "github.com/grafana/grafana/pkg/services/ngalert/state" ) +const ( + OrgIDLabel = "orgID" + RuleUIDLabel = "ruleUID" + GroupLabel = "group" + FolderUIDLabel = "folderUID" +) + type remoteLokiClient interface { ping() error + push([]stream) error } type RemoteLokiBackend struct { @@ -30,11 +43,107 @@ func (h *RemoteLokiBackend) TestConnection() error { return h.client.ping() } -func (h *RemoteLokiBackend) RecordStatesAsync(ctx context.Context, _ *models.AlertRule, _ []state.StateTransition) { +func (h *RemoteLokiBackend) RecordStatesAsync(ctx context.Context, rule *models.AlertRule, states []state.StateTransition) { logger := h.log.FromContext(ctx) - logger.Debug("Remote Loki state history backend was called with states") + streams := h.statesToStreams(rule, states, logger) + h.recordStreamsAsync(ctx, streams, logger) } func (h *RemoteLokiBackend) QueryStates(ctx context.Context, query models.HistoryQuery) (*data.Frame, error) { return data.NewFrame("states"), nil } + +func (h *RemoteLokiBackend) statesToStreams(rule *models.AlertRule, states []state.StateTransition, logger log.Logger) []stream { + buckets := make(map[string][]row) // label repr -> entries + for _, state := range states { + if !shouldRecord(state) { + continue + } + + labels := removePrivateLabels(state.State.Labels) + labels[OrgIDLabel] = fmt.Sprint(rule.OrgID) + labels[RuleUIDLabel] = fmt.Sprint(rule.UID) + 
labels[GroupLabel] = fmt.Sprint(rule.RuleGroup) + labels[FolderUIDLabel] = fmt.Sprint(rule.NamespaceUID) + repr := labels.String() + + entry := lokiEntry{ + SchemaVersion: 1, + Previous: state.PreviousFormatted(), + Current: state.Formatted(), + Values: valuesAsDataBlob(state.State), + } + jsn, err := json.Marshal(entry) + if err != nil { + logger.Error("Failed to construct history record for state, skipping", "error", err) + continue + } + line := string(jsn) + + buckets[repr] = append(buckets[repr], row{ + At: state.State.LastEvaluationTime, + Val: line, + }) + } + + result := make([]stream, 0, len(buckets)) + for repr, rows := range buckets { + labels, err := data.LabelsFromString(repr) + if err != nil { + logger.Error("Failed to parse frame labels, skipping state history batch: %w", err) + continue + } + result = append(result, stream{ + Stream: labels, + Values: rows, + }) + } + + return result +} + +func (h *RemoteLokiBackend) recordStreamsAsync(ctx context.Context, streams []stream, logger log.Logger) { + go func() { + if err := h.recordStreams(ctx, streams, logger); err != nil { + logger.Error("Failed to save alert state history batch", "error", err) + } + }() +} + +func (h *RemoteLokiBackend) recordStreams(ctx context.Context, streams []stream, logger log.Logger) error { + if err := h.client.push(streams); err != nil { + return err + } + logger.Debug("Done saving alert state history batch") + return nil +} + +type lokiEntry struct { + SchemaVersion int `json:"schemaVersion"` + Previous string `json:"previous"` + Current string `json:"current"` + Values *simplejson.Json `json:"values"` +} + +func valuesAsDataBlob(state *state.State) *simplejson.Json { + jsonData := simplejson.New() + + switch state.State { + case eval.Error: + if state.Error == nil { + jsonData.Set("error", nil) + } else { + jsonData.Set("error", state.Error.Error()) + } + case eval.NoData: + jsonData.Set("noData", true) + default: + keys := make([]string, 0, len(state.Values)) + for k := 
// stream is one entry of the "streams" array in a Loki push payload: a label
// set together with the log entries recorded under it.
type stream struct {
	Stream map[string]string `json:"stream"`
	Values []row             `json:"values"`
}

// row is a single log entry: the line Val stamped with the time At.
type row struct {
	At  time.Time
	Val string
}

// MarshalJSON encodes the row as a two-element JSON array of strings: the
// timestamp in nanoseconds since the Unix epoch, followed by the log line.
//
// The receiver is a value on purpose. encoding/json only invokes a
// pointer-receiver marshaler on addressable values, so with a pointer receiver
// a row marshaled directly, or held as a map value, would silently fall back
// to the default {"At":...,"Val":...} object encoding.
func (r row) MarshalJSON() ([]byte, error) {
	return json.Marshal([2]string{
		fmt.Sprintf("%d", r.At.UnixNano()), r.Val,
	})
}
fmt.Errorf("failed to create Loki request: %w", err) + } + + c.setAuthAndTenantHeaders(req) + req.Header.Add("content-type", "application/json") + + resp, err := c.client.Do(req) + if resp != nil { + defer func() { + if err := resp.Body.Close(); err != nil { + c.log.Warn("Failed to close response body", "err", err) + } + }() + } + if err != nil { + return fmt.Errorf("failed to send request: %w", err) + } + + if resp.StatusCode < 200 || resp.StatusCode >= 300 { + byt, _ := io.ReadAll(resp.Body) + if len(byt) > 0 { + c.log.Error("Error response from Loki", "response", string(byt), "status", resp.StatusCode) + } else { + c.log.Error("Error response from Loki with an empty body", "status", resp.StatusCode) + } + return fmt.Errorf("received a non-200 response from loki, status: %d", resp.StatusCode) + } + return nil +} + +func (c *httpLokiClient) setAuthAndTenantHeaders(req *http.Request) { + if c.cfg.BasicAuthUser != "" || c.cfg.BasicAuthPassword != "" { + req.SetBasicAuth(c.cfg.BasicAuthUser, c.cfg.BasicAuthPassword) + } + + if c.cfg.TenantID != "" { + req.Header.Add("X-Scope-OrgID", c.cfg.TenantID) + } +} diff --git a/pkg/services/ngalert/state/historian/loki_http_test.go b/pkg/services/ngalert/state/historian/loki_http_test.go index 4a76b2e65f4..89947ccd8fc 100644 --- a/pkg/services/ngalert/state/historian/loki_http_test.go +++ b/pkg/services/ngalert/state/historian/loki_http_test.go @@ -12,25 +12,27 @@ import ( func TestLokiHTTPClient(t *testing.T) { t.Skip() - url, err := url.Parse("https://logs-prod-eu-west-0.grafana.net") - require.NoError(t, err) + t.Run("smoke test pinging Loki", func(t *testing.T) { + url, err := url.Parse("https://logs-prod-eu-west-0.grafana.net") + require.NoError(t, err) - client := newLokiClient(LokiConfig{ - Url: url, - }, log.NewNopLogger()) + client := newLokiClient(LokiConfig{ + Url: url, + }, log.NewNopLogger()) - // Unauthorized request should fail against Grafana Cloud. 
- err = client.ping() - require.Error(t, err) + // Unauthorized request should fail against Grafana Cloud. + err = client.ping() + require.Error(t, err) - client.cfg.BasicAuthUser = "" - client.cfg.BasicAuthPassword = "" + client.cfg.BasicAuthUser = "" + client.cfg.BasicAuthPassword = "" - // When running on prem, you might need to set the tenant id, - // so the x-scope-orgid header is set. - // client.cfg.TenantID = "" + // When running on prem, you might need to set the tenant id, + // so the x-scope-orgid header is set. + // client.cfg.TenantID = "" - // Authorized request should fail against Grafana Cloud. - err = client.ping() - require.NoError(t, err) + // Authorized request should succeed against Grafana Cloud. + err = client.ping() + require.NoError(t, err) + }) }