package historian

import (
	"context"
	"encoding/json"
	"fmt"
	"math"
	"sort"
	"time"

	"github.com/benbjohnson/clock"
	"github.com/grafana/grafana-plugin-sdk-go/data"
	"github.com/weaveworks/common/http/client"

	"github.com/grafana/grafana/pkg/components/simplejson"
	"github.com/grafana/grafana/pkg/infra/log"
	"github.com/grafana/grafana/pkg/infra/tracing"
	"github.com/grafana/grafana/pkg/services/ngalert/eval"
	"github.com/grafana/grafana/pkg/services/ngalert/metrics"
	"github.com/grafana/grafana/pkg/services/ngalert/models"
	"github.com/grafana/grafana/pkg/services/ngalert/state"
	history_model "github.com/grafana/grafana/pkg/services/ngalert/state/historian/model"
)

const (
	OrgIDLabel     = "orgID"
	RuleUIDLabel   = "ruleUID"
	GroupLabel     = "group"
	FolderUIDLabel = "folderUID"
	// Names of the columns used in the dataframe.
	dfTime   = "time"
	dfLine   = "line"
	dfLabels = "labels"
)

const (
	StateHistoryLabelKey   = "from"
	StateHistoryLabelValue = "state-history"
)

const defaultQueryRange = 6 * time.Hour

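// remoteLokiClient is the subset of the Loki client's API that the historian needs.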
type remoteLokiClient interface {
	ping(context.Context) error
	push(context.Context, []stream) error
	rangeQuery(ctx context.Context, logQL string, start, end, limit int64) (queryRes, error)
}

// RemoteLokiBackend is a state.Historian that records state history to an external Loki instance.
type RemoteLokiBackend struct {
	client         remoteLokiClient
	externalLabels map[string]string
	clock          clock.Clock
	metrics        *metrics.Historian
	log            log.Logger
}

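// NewRemoteLokiBackend creates a RemoteLokiBackend that writes state history to the Loki instance described by cfg.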
func NewRemoteLokiBackend(cfg LokiConfig, req client.Requester, metrics *metrics.Historian) *RemoteLokiBackend {
	logger := log.New("ngalert.state.historian", "backend", "loki")
	return &RemoteLokiBackend{
		client:         newLokiClient(cfg, req, metrics, logger),
		externalLabels: cfg.ExternalLabels,
		clock:          clock.New(),
		metrics:        metrics,
		log:            logger,
	}
}

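// TestConnection checks whether the configured Loki instance is reachable.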
func (h *RemoteLokiBackend) TestConnection(ctx context.Context) error {
	return h.client.ping(ctx)
}

// Record writes a number of state transitions for a given rule to an external Loki instance.
func (h *RemoteLokiBackend) Record(ctx context.Context, rule history_model.RuleMeta, states []state.StateTransition) <-chan error {
	logger := h.log.FromContext(ctx)
	logStream := statesToStream(rule, states, h.externalLabels, logger)

	errCh := make(chan error, 1)
	if len(logStream.Values) == 0 {
		close(errCh)
		return errCh
	}

	// This is a new background job, so let's create a brand new context for it.
	// We want it to be isolated, i.e. we don't want Grafana shutdowns to interrupt this work
	// immediately but rather try to flush writes.
	// This also prevents timeouts or other lingering objects (like transactions) from being
	// incorrectly propagated here from other areas.
	writeCtx := context.Background()
	writeCtx, cancel := context.WithTimeout(writeCtx, StateHistoryWriteTimeout)
	writeCtx = history_model.WithRuleData(writeCtx, rule)
	writeCtx = tracing.ContextWithSpan(writeCtx, tracing.SpanFromContext(ctx))

	go func(ctx context.Context) {
		defer cancel()
		defer close(errCh)
		logger := h.log.FromContext(ctx)

		org := fmt.Sprint(rule.OrgID)
		h.metrics.WritesTotal.WithLabelValues(org, "loki").Inc()
		h.metrics.TransitionsTotal.WithLabelValues(org).Add(float64(len(logStream.Values)))

		if err := h.recordStreams(ctx, []stream{logStream}, logger); err != nil {
			logger.Error("Failed to save alert state history batch", "error", err)
			h.metrics.WritesFailed.WithLabelValues(org, "loki").Inc()
			h.metrics.TransitionsFailed.WithLabelValues(org).Add(float64(len(logStream.Values)))
			errCh <- fmt.Errorf("failed to save alert state history batch: %w", err)
		}
	}(writeCtx)
	return errCh
}

// Query retrieves state history entries from an external Loki instance and formats the results into a dataframe.
func (h *RemoteLokiBackend) Query(ctx context.Context, query models.HistoryQuery) (*data.Frame, error) {
	logQL, err := buildLogQuery(query)
	if err != nil {
		return nil, err
	}

	now := time.Now().UTC()
	if query.To.IsZero() {
		query.To = now
	}
	if query.From.IsZero() {
		query.From = now.Add(-defaultQueryRange)
	}

	// Timestamps are passed to Loki as Unix nanoseconds.
	res, err := h.client.rangeQuery(ctx, logQL, query.From.UnixNano(), query.To.UnixNano(), int64(query.Limit))
	if err != nil {
		return nil, err
	}
	return merge(res, query.RuleUID)
}

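// buildSelectors returns the stream selectors that are common to all state history queries.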
func buildSelectors(query models.HistoryQuery) ([]Selector, error) {
	// OrgID and the state history label are static and will be included in all queries.
	selectors := make([]Selector, 2)

	// Set the predefined selector orgID.
	selector, err := NewSelector(OrgIDLabel, "=", fmt.Sprintf("%d", query.OrgID))
	if err != nil {
		return nil, err
	}
	selectors[0] = selector

	// Set the predefined selector for the state history label.
	selector, err = NewSelector(StateHistoryLabelKey, "=", StateHistoryLabelValue)
	if err != nil {
		return nil, err
	}
	selectors[1] = selector

	return selectors, nil
}

// merge merges the values of all streams in the query result into a single
// dataframe, sorted by timestamp.
func merge(res queryRes, ruleUID string) (*data.Frame, error) {
	// Find the total number of elements in all arrays.
	totalLen := 0
	for _, arr := range res.Data.Result {
		totalLen += len(arr.Values)
	}

	// Create a new frame to store the merged results.
	frame := data.NewFrame("states")

	// We merge all series into a single linear history.
	lbls := data.Labels(map[string]string{})

	// We represent state history as a single merged history, which roughly corresponds to what you get in the Grafana Explore tab when querying Loki directly.
	// The format is composed of the following vectors:
	//   1. `time` - timestamp - when the transition happened
	//   2. `line` - JSON - the full data of the transition
	//   3. `labels` - JSON - the labels associated with that state transition
	times := make([]time.Time, 0, totalLen)
	lines := make([]json.RawMessage, 0, totalLen)
	labels := make([]json.RawMessage, 0, totalLen)

	// Initialize a slice of pointers to the current position in each array.
	pointers := make([]int, len(res.Data.Result))
	for {
		minTime := int64(math.MaxInt64)
		minEl := sample{}
		minElStreamIdx := -1
		// Find the element with the earliest time among all arrays.
		for i, stream := range res.Data.Result {
			// Skip if we already reached the end of the current array.
			if len(stream.Values) == pointers[i] {
				continue
			}
			curTime := stream.Values[pointers[i]].T.UnixNano()
			if curTime < minTime {
				minTime = curTime
				minEl = stream.Values[pointers[i]]
				minElStreamIdx = i
			}
		}
		// If all pointers have reached the end of their arrays, we're done.
		if minElStreamIdx == -1 {
			break
		}

		// Append the minimum element to the merged slice and move the pointer.
		tsNano := minEl.T.UnixNano()
		// TODO: In general, perhaps we should omit the offending line and log, rather than failing the request entirely.
		streamLbls := res.Data.Result[minElStreamIdx].Stream
		lblsJson, err := json.Marshal(streamLbls)
		if err != nil {
			return nil, fmt.Errorf("failed to serialize stream labels: %w", err)
		}
		line, err := jsonifyRow(minEl.V)
		if err != nil {
			return nil, fmt.Errorf("a line was in an invalid format: %w", err)
		}

		times = append(times, time.Unix(0, tsNano))
		labels = append(labels, lblsJson)
		lines = append(lines, line)
		pointers[minElStreamIdx]++
	}

	frame.Fields = append(frame.Fields, data.NewField(dfTime, lbls, times))
	frame.Fields = append(frame.Fields, data.NewField(dfLine, lbls, lines))
	frame.Fields = append(frame.Fields, data.NewField(dfLabels, lbls, labels))

	return frame, nil
}

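// statesToStream converts a rule's state transitions into a single Loki stream.
// Stream labels are derived from the configured external labels plus a set of
// system-defined labels, and each recordable transition becomes one log line
// timestamped with its evaluation time.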
func statesToStream(rule history_model.RuleMeta, states []state.StateTransition, externalLabels map[string]string, logger log.Logger) stream {
	labels := mergeLabels(make(map[string]string), externalLabels)
	// System-defined labels take precedence over user-defined external labels.
	labels[StateHistoryLabelKey] = StateHistoryLabelValue
	labels[OrgIDLabel] = fmt.Sprint(rule.OrgID)
	labels[GroupLabel] = fmt.Sprint(rule.Group)
	labels[FolderUIDLabel] = fmt.Sprint(rule.NamespaceUID)

	samples := make([]sample, 0, len(states))
	for _, state := range states {
		if !shouldRecord(state) {
			continue
		}

		sanitizedLabels := removePrivateLabels(state.Labels)
		entry := lokiEntry{
			SchemaVersion:  1,
			Previous:       state.PreviousFormatted(),
			Current:        state.Formatted(),
			Values:         valuesAsDataBlob(state.State),
			Condition:      rule.Condition,
			DashboardUID:   rule.DashboardUID,
			PanelID:        rule.PanelID,
			Fingerprint:    labelFingerprint(sanitizedLabels),
			RuleUID:        rule.UID,
			InstanceLabels: sanitizedLabels,
		}
		if state.State.State == eval.Error {
			entry.Error = state.Error.Error()
		}

		jsn, err := json.Marshal(entry)
		if err != nil {
			logger.Error("Failed to construct history record for state, skipping", "error", err)
			continue
		}
		line := string(jsn)

		samples = append(samples, sample{
			T: state.State.LastEvaluationTime,
			V: line,
		})
	}

	return stream{
		Stream: labels,
		Values: samples,
	}
}

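// recordStreams pushes the given streams to Loki.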
func (h *RemoteLokiBackend) recordStreams(ctx context.Context, streams []stream, logger log.Logger) error {
	if err := h.client.push(ctx, streams); err != nil {
		return err
	}

	logger.Debug("Done saving alert state history batch")
	return nil
}

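// lokiEntry is the JSON document serialized into each state history log line.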
type lokiEntry struct {
	SchemaVersion int              `json:"schemaVersion"`
	Previous      string           `json:"previous"`
	Current       string           `json:"current"`
	Error         string           `json:"error,omitempty"`
	Values        *simplejson.Json `json:"values"`
	Condition     string           `json:"condition"`
	DashboardUID  string           `json:"dashboardUID"`
	PanelID       int64            `json:"panelID"`
	Fingerprint   string           `json:"fingerprint"`
	RuleUID       string           `json:"ruleUID"`
	// InstanceLabels is exactly the set of labels associated with the alert instance in Alertmanager.
	// These should not be conflated with labels associated with log streams.
	InstanceLabels map[string]string `json:"labels"`
}

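// valuesAsDataBlob serializes the state's values as JSON. Error and NoData
// states carry no meaningful values, so an empty blob is returned for them.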
func valuesAsDataBlob(state *state.State) *simplejson.Json {
	if state.State == eval.Error || state.State == eval.NoData {
		return simplejson.New()
	}

	return jsonifyValues(state.Values)
}

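// jsonifyRow validates that a log line contains well-formed JSON by
// round-tripping it through lokiEntry before it is embedded in the dataframe.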
func jsonifyRow(line string) (json.RawMessage, error) {
	// Ser/deser to validate the contents of the log line before shipping it forward.
	// TODO: We may want to remove this in the future, as we already have the value in the form of a []byte, and json.RawMessage is also a []byte.
	// TODO: Though, if the log line does not contain valid JSON, this can cause problems later on when rendering the dataframe.
	var entry lokiEntry
	if err := json.Unmarshal([]byte(line), &entry); err != nil {
		return nil, err
	}
	return json.Marshal(entry)
}

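// Selector is a single label matcher in a LogQL stream selector.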
type Selector struct {
	// Label is the name of the label to match on.
	Label string
	Op    Operator
	// Value is the expected value of the label.
	Value string
}

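// NewSelector builds a Selector from the given label, operator, and value,
// rejecting operators that are not valid LogQL matchers.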
func NewSelector(label, op, value string) (Selector, error) {
	if !isValidOperator(op) {
		return Selector{}, fmt.Errorf("'%s' is not a valid query operator", op)
	}
	return Selector{Label: label, Op: Operator(op), Value: value}, nil
}

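// selectorString renders the given selectors as a LogQL stream selector,
// for example `{orgID="1",from="state-history"}`.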
func selectorString(selectors []Selector) string {
	if len(selectors) == 0 {
		return "{}"
	}
	// Build the query selector.
	query := ""
	for _, s := range selectors {
		query += fmt.Sprintf("%s%s%q,", s.Label, s.Op, s.Value)
	}
	// Remove the last comma, as we append one to every selector.
	query = query[:len(query)-1]
	return "{" + query + "}"
}

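// isValidOperator reports whether op is one of the four label matching
// operators supported by LogQL.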
func isValidOperator(op string) bool {
	switch op {
	case "=", "!=", "=~", "!~":
		return true
	}
	return false
}

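// buildLogQuery assembles the complete LogQL expression for a history query:
// a stream selector, then a `json` parser stage and label filters whenever the
// query narrows the results. For example, a query scoped to a single rule in
// org 1 produces a query like:
//
//	{orgID="1",from="state-history"} | json | ruleUID="some-rule-uid"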
func buildLogQuery(query models.HistoryQuery) (string, error) {
	selectors, err := buildSelectors(query)
	if err != nil {
		return "", fmt.Errorf("failed to build the provided selectors: %w", err)
	}

	logQL := selectorString(selectors)

	if queryHasLogFilters(query) {
		logQL = fmt.Sprintf("%s | json", logQL)
	}

	if query.RuleUID != "" {
		logQL = fmt.Sprintf("%s | ruleUID=%q", logQL, query.RuleUID)
	}
	if query.DashboardUID != "" {
		logQL = fmt.Sprintf("%s | dashboardUID=%q", logQL, query.DashboardUID)
	}
	if query.PanelID != 0 {
		logQL = fmt.Sprintf("%s | panelID=%d", logQL, query.PanelID)
	}

	labelFilters := ""
	labelKeys := make([]string, 0, len(query.Labels))
	for k := range query.Labels {
		labelKeys = append(labelKeys, k)
	}
	// Ensure that all queries we build are deterministic.
	sort.Strings(labelKeys)
	for _, k := range labelKeys {
		labelFilters += fmt.Sprintf(" | labels_%s=%q", k, query.Labels[k])
	}
	logQL += labelFilters

	return logQL, nil
}

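// queryHasLogFilters reports whether the query filters on log line contents,
// which requires the `json` parser stage in the LogQL pipeline.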
func queryHasLogFilters(query models.HistoryQuery) bool {
	return query.RuleUID != "" ||
		query.DashboardUID != "" ||
		query.PanelID != 0 ||
		len(query.Labels) > 0
}