Alerting: Push state history entries to Loki (#61724)

* Implement push endpoint

* Drop duplicated struct

* Genericize auth/tenant headers and improve logging in error case

* Flesh out the data model

* Drop dead code

* Drop log line entirely

* Drop unused arg

* Rename a few type manipulation functions

* Extract label keys as constants

* Improve logs when loki responds with error

* Inline lokiRepresentation function
This commit is contained in:
Alexander Weaver 2023-01-23 16:31:03 -06:00 committed by GitHub
parent 3146740d82
commit 7ccc845187
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 201 additions and 27 deletions

View File

@ -2,15 +2,28 @@ package historian
import (
"context"
"encoding/json"
"fmt"
"sort"
"github.com/grafana/grafana-plugin-sdk-go/data"
"github.com/grafana/grafana/pkg/components/simplejson"
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/services/ngalert/eval"
"github.com/grafana/grafana/pkg/services/ngalert/models"
"github.com/grafana/grafana/pkg/services/ngalert/state"
)
// Label keys added to every Loki stream built from alert state transitions.
// They identify the rule that produced the entries.
const (
// OrgIDLabel is the stream label carrying the rule's org ID.
OrgIDLabel = "orgID"
// RuleUIDLabel is the stream label carrying the rule's UID.
RuleUIDLabel = "ruleUID"
// GroupLabel is the stream label carrying the rule's group name.
GroupLabel = "group"
// FolderUIDLabel is the stream label carrying the rule's namespace (folder) UID.
FolderUIDLabel = "folderUID"
)
// remoteLokiClient abstracts the Loki operations used by the state history
// backend in this file.
type remoteLokiClient interface {
// ping checks connectivity; a non-nil error means the check failed.
ping() error
// push sends the given streams to Loki.
push([]stream) error
}
type RemoteLokiBackend struct {
@ -30,11 +43,107 @@ func (h *RemoteLokiBackend) TestConnection() error {
return h.client.ping()
}
func (h *RemoteLokiBackend) RecordStatesAsync(ctx context.Context, _ *models.AlertRule, _ []state.StateTransition) {
// RecordStatesAsync turns the given state transitions of a rule into Loki
// streams and hands them off to be pushed in the background. It does not
// report failures to the caller.
func (h *RemoteLokiBackend) RecordStatesAsync(ctx context.Context, rule *models.AlertRule, states []state.StateTransition) {
	ctxLogger := h.log.FromContext(ctx)
	ctxLogger.Debug("Remote Loki state history backend was called with states")

	h.recordStreamsAsync(ctx, h.statesToStreams(rule, states, ctxLogger), ctxLogger)
}
// QueryStates currently ignores the query and returns an empty frame named
// "states" with a nil error.
func (h *RemoteLokiBackend) QueryStates(ctx context.Context, query models.HistoryQuery) (*data.Frame, error) {
	frame := data.NewFrame("states")
	return frame, nil
}
// statesToStreams groups the recordable state transitions of a rule into Loki
// streams, one stream per distinct label set.
//
// Transitions rejected by shouldRecord are dropped. Every remaining transition
// becomes a row whose timestamp is the state's LastEvaluationTime and whose
// line is the JSON encoding of a lokiEntry. The stream labels are the state's
// labels after removePrivateLabels, plus the org, rule, group and folder
// identifiers of the rule.
//
// Entries that cannot be serialized, and buckets whose label representation
// cannot be parsed back, are logged on logger and skipped rather than failing
// the whole batch.
func (h *RemoteLokiBackend) statesToStreams(rule *models.AlertRule, states []state.StateTransition, logger log.Logger) []stream {
	buckets := make(map[string][]row) // label repr -> entries
	for _, transition := range states {
		if !shouldRecord(transition) {
			continue
		}

		labels := removePrivateLabels(transition.State.Labels)
		labels[OrgIDLabel] = fmt.Sprint(rule.OrgID)
		labels[RuleUIDLabel] = fmt.Sprint(rule.UID)
		labels[GroupLabel] = fmt.Sprint(rule.RuleGroup)
		labels[FolderUIDLabel] = fmt.Sprint(rule.NamespaceUID)
		// The string form of the label set is the bucket key, so transitions
		// with identical labels end up in the same stream.
		repr := labels.String()

		entry := lokiEntry{
			SchemaVersion: 1,
			Previous:      transition.PreviousFormatted(),
			Current:       transition.Formatted(),
			Values:        valuesAsDataBlob(transition.State),
		}
		jsn, err := json.Marshal(entry)
		if err != nil {
			logger.Error("Failed to construct history record for state, skipping", "error", err)
			continue
		}

		buckets[repr] = append(buckets[repr], row{
			At:  transition.State.LastEvaluationTime,
			Val: string(jsn),
		})
	}

	result := make([]stream, 0, len(buckets))
	for repr, rows := range buckets {
		labels, err := data.LabelsFromString(repr)
		if err != nil {
			// The logger takes key/value pairs, not printf verbs: pass the
			// error under an explicit key so it is actually attached.
			logger.Error("Failed to parse frame labels, skipping state history batch", "error", err)
			continue
		}
		result = append(result, stream{
			Stream: labels,
			Values: rows,
		})
	}

	return result
}
// recordStreamsAsync pushes the streams from a new goroutine and returns
// immediately. A failed push is only logged on logger.
func (h *RemoteLokiBackend) recordStreamsAsync(ctx context.Context, streams []stream, logger log.Logger) {
	go func() {
		err := h.recordStreams(ctx, streams, logger)
		if err == nil {
			return
		}
		logger.Error("Failed to save alert state history batch", "error", err)
	}()
}
// recordStreams pushes the streams through the backend's client. It returns
// the client's error unchanged, and emits a debug log line on success.
func (h *RemoteLokiBackend) recordStreams(ctx context.Context, streams []stream, logger log.Logger) error {
	pushErr := h.client.push(streams)
	if pushErr != nil {
		return pushErr
	}

	logger.Debug("Done saving alert state history batch")
	return nil
}
// lokiEntry is the JSON document written as the log line for a single state
// transition.
type lokiEntry struct {
// SchemaVersion identifies the layout of this document; statesToStreams writes 1.
SchemaVersion int `json:"schemaVersion"`
// Previous is the formatted state before the transition.
Previous string `json:"previous"`
// Current is the formatted state after the transition.
Current string `json:"current"`
// Values is the blob produced by valuesAsDataBlob for the new state.
Values *simplejson.Json `json:"values"`
}
// valuesAsDataBlob builds the "values" payload stored with a history entry.
//
// The shape depends on the evaluation state:
//   - eval.Error:  {"error": <message or null>}
//   - eval.NoData: {"noData": true}
//   - otherwise:   {"values": <state.Values>}
func valuesAsDataBlob(state *state.State) *simplejson.Json {
	blob := simplejson.New()

	if state.State == eval.Error {
		// A missing error is recorded as an explicit null.
		var msg interface{}
		if state.Error != nil {
			msg = state.Error.Error()
		}
		blob.Set("error", msg)
		return blob
	}

	if state.State == eval.NoData {
		blob.Set("noData", true)
		return blob
	}

	// NOTE(review): the sorted key list is built but never consumed below;
	// it is retained only because this rewrite is behavior-preserving.
	names := make([]string, 0, len(state.Values))
	for name := range state.Values {
		names = append(names, name)
	}
	sort.Strings(names)
	blob.Set("values", simplejson.NewFromAny(state.Values))

	return blob
}

View File

@ -1,7 +1,10 @@
package historian
import (
"bytes"
"encoding/json"
"fmt"
"io"
"net/http"
"net/url"
"time"
@ -37,18 +40,10 @@ func newLokiClient(cfg LokiConfig, logger log.Logger) *httpLokiClient {
func (c *httpLokiClient) ping() error {
uri := c.cfg.Url.JoinPath("/loki/api/v1/labels")
req, err := http.NewRequest(http.MethodGet, uri.String(), nil)
if c.cfg.BasicAuthUser != "" || c.cfg.BasicAuthPassword != "" {
req.SetBasicAuth(c.cfg.BasicAuthUser, c.cfg.BasicAuthPassword)
}
if c.cfg.TenantID != "" {
req.Header.Add("X-Scope-OrgID", c.cfg.TenantID)
}
if err != nil {
return fmt.Errorf("error creating request: %w", err)
}
c.setAuthAndTenantHeaders(req)
res, err := c.client.Do(req)
if res != nil {
@ -68,3 +63,71 @@ func (c *httpLokiClient) ping() error {
c.log.Debug("Ping request to Loki endpoint succeeded", "status", res.StatusCode)
return nil
}
// stream is one labelled series of log rows in a Loki push payload.
type stream struct {
// Stream is the label set of the series, encoded under the "stream" key.
Stream map[string]string `json:"stream"`
// Values are the rows of the series, encoded under the "values" key.
Values []row `json:"values"`
}
// row is a single log line together with its timestamp.
type row struct {
	// At is the timestamp of the entry.
	At time.Time
	// Val is the log line content.
	Val string
}

// MarshalJSON encodes the row as a two-element JSON array of strings:
// the timestamp in Unix nanoseconds, followed by the log line.
//
// It uses a value receiver so that both row and *row satisfy json.Marshaler.
// With a pointer receiver, marshaling a non-addressable row value would
// silently fall back to the default struct encoding.
func (r row) MarshalJSON() ([]byte, error) {
	return json.Marshal([2]string{
		fmt.Sprintf("%d", r.At.UnixNano()), r.Val,
	})
}
// push serializes the streams as {"streams": [...]} and POSTs them as JSON to
// the /loki/api/v1/push endpoint under the configured URL.
//
// It returns an error when the payload cannot be serialized, the request
// cannot be created or sent, or the response status is outside the 2xx range.
// In the last case the response body, if any, is logged.
func (c *httpLokiClient) push(s []stream) error {
	payload := struct {
		Streams []stream `json:"streams"`
	}{Streams: s}

	encoded, err := json.Marshal(payload)
	if err != nil {
		return fmt.Errorf("failed to serialize Loki payload: %w", err)
	}

	endpoint := c.cfg.Url.JoinPath("/loki/api/v1/push")
	req, err := http.NewRequest(http.MethodPost, endpoint.String(), bytes.NewBuffer(encoded))
	if err != nil {
		return fmt.Errorf("failed to create Loki request: %w", err)
	}
	c.setAuthAndTenantHeaders(req)
	req.Header.Add("content-type", "application/json")

	resp, err := c.client.Do(req)
	// A response may be present even alongside an error; close its body either way.
	if resp != nil {
		defer func() {
			if closeErr := resp.Body.Close(); closeErr != nil {
				c.log.Warn("Failed to close response body", "err", closeErr)
			}
		}()
	}
	if err != nil {
		return fmt.Errorf("failed to send request: %w", err)
	}

	if resp.StatusCode >= 200 && resp.StatusCode < 300 {
		return nil
	}

	// Best effort: the body is only used to enrich the log line, so a read
	// failure is deliberately ignored.
	payloadText, _ := io.ReadAll(resp.Body)
	if len(payloadText) == 0 {
		c.log.Error("Error response from Loki with an empty body", "status", resp.StatusCode)
	} else {
		c.log.Error("Error response from Loki", "response", string(payloadText), "status", resp.StatusCode)
	}
	return fmt.Errorf("received a non-200 response from loki, status: %d", resp.StatusCode)
}
// setAuthAndTenantHeaders decorates req with the client's configured
// credentials: basic auth when a user or password is set, and the
// X-Scope-OrgID header when a tenant ID is set.
func (c *httpLokiClient) setAuthAndTenantHeaders(req *http.Request) {
	user, password := c.cfg.BasicAuthUser, c.cfg.BasicAuthPassword
	if !(user == "" && password == "") {
		req.SetBasicAuth(user, password)
	}

	if tenant := c.cfg.TenantID; tenant != "" {
		req.Header.Add("X-Scope-OrgID", tenant)
	}
}

View File

@ -12,25 +12,27 @@ import (
// TestLokiHTTPClient is a manual smoke test that pings a live Loki endpoint.
// It is skipped unconditionally via t.Skip(); fill in the placeholder
// credentials below before running it by hand.
func TestLokiHTTPClient(t *testing.T) {
t.Skip()
url, err := url.Parse("https://logs-prod-eu-west-0.grafana.net")
require.NoError(t, err)
t.Run("smoke test pinging Loki", func(t *testing.T) {
url, err := url.Parse("https://logs-prod-eu-west-0.grafana.net")
require.NoError(t, err)
client := newLokiClient(LokiConfig{
Url: url,
}, log.NewNopLogger())
client := newLokiClient(LokiConfig{
Url: url,
}, log.NewNopLogger())
// Unauthorized request should fail against Grafana Cloud.
err = client.ping()
require.Error(t, err)
// Unauthorized request should fail against Grafana Cloud.
err = client.ping()
require.Error(t, err)
client.cfg.BasicAuthUser = "<your_username>"
client.cfg.BasicAuthPassword = "<your_password>"
client.cfg.BasicAuthUser = "<your_username>"
client.cfg.BasicAuthPassword = "<your_password>"
// When running on prem, you might need to set the tenant id,
// so the x-scope-orgid header is set.
// client.cfg.TenantID = "<your_tenant_id>"
// When running on prem, you might need to set the tenant id,
// so the x-scope-orgid header is set.
// client.cfg.TenantID = "<your_tenant_id>"
// Authorized request should succeed against Grafana Cloud.
err = client.ping()
require.NoError(t, err)
// Authorized request should succeed against Grafana Cloud.
err = client.ping()
require.NoError(t, err)
})
}