Alerting: Make alert state history Loki http client public (#78291)

* Make state history Loki client public

* Make historian metrics subsystem configurable
William Wernert 2023-11-27 09:20:50 -05:00 committed by GitHub
parent 17f3bbe4a1
commit f7bf818527
11 changed files with 107 additions and 107 deletions

View File

@@ -16,48 +16,48 @@ type Historian struct {
 	BytesWritten prometheus.Counter
 }
-func NewHistorianMetrics(r prometheus.Registerer) *Historian {
+func NewHistorianMetrics(r prometheus.Registerer, subsystem string) *Historian {
 	return &Historian{
 		Info: promauto.With(r).NewGaugeVec(prometheus.GaugeOpts{
 			Namespace: Namespace,
-			Subsystem: Subsystem,
+			Subsystem: subsystem,
 			Name: "state_history_info",
 			Help: "Information about the state history store.",
 		}, []string{"backend"}),
 		TransitionsTotal: promauto.With(r).NewCounterVec(prometheus.CounterOpts{
 			Namespace: Namespace,
-			Subsystem: Subsystem,
+			Subsystem: subsystem,
 			Name: "state_history_transitions_total",
 			Help: "The total number of state transitions processed.",
 		}, []string{"org"}),
 		TransitionsFailed: promauto.With(r).NewCounterVec(prometheus.CounterOpts{
 			Namespace: Namespace,
-			Subsystem: Subsystem,
+			Subsystem: subsystem,
 			Name: "state_history_transitions_failed_total",
 			Help: "The total number of state transitions that failed to be written - they are not retried.",
 		}, []string{"org"}),
 		WritesTotal: promauto.With(r).NewCounterVec(prometheus.CounterOpts{
 			Namespace: Namespace,
-			Subsystem: Subsystem,
+			Subsystem: subsystem,
 			Name: "state_history_writes_total",
 			Help: "The total number of state history batches that were attempted to be written.",
 		}, []string{"org", "backend"}),
 		WritesFailed: promauto.With(r).NewCounterVec(prometheus.CounterOpts{
 			Namespace: Namespace,
-			Subsystem: Subsystem,
+			Subsystem: subsystem,
 			Name: "state_history_writes_failed_total",
 			Help: "The total number of failed writes of state history batches.",
 		}, []string{"org", "backend"}),
 		WriteDuration: instrument.NewHistogramCollector(promauto.With(r).NewHistogramVec(prometheus.HistogramOpts{
 			Namespace: Namespace,
-			Subsystem: Subsystem,
+			Subsystem: subsystem,
 			Name: "state_history_request_duration_seconds",
 			Help: "Histogram of request durations to the state history store. Only valid when using external stores.",
 			Buckets: instrument.DefBuckets,
 		}, instrument.HistogramCollectorBuckets)),
 		BytesWritten: promauto.With(r).NewCounter(prometheus.CounterOpts{
 			Namespace: Namespace,
-			Subsystem: Subsystem,
+			Subsystem: subsystem,
 			Name: "state_history_writes_bytes_total",
 			Help: "The total number of bytes sent within a batch to the state history store. Only valid when using the Loki store.",
 		}),

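The constructor above now takes the Prometheus subsystem as an argument instead of hard-coding the package-level Subsystem constant. A minimal sketch of registering the historian metrics under a caller-chosen subsystem; the import paths follow the Grafana tree, and the "my_subsystem" name is purely illustrative:

package example

import (
	"github.com/prometheus/client_golang/prometheus"

	"github.com/grafana/grafana/pkg/services/ngalert/metrics"
)

// newCustomHistorianMetrics registers the state history metrics under a
// caller-chosen subsystem rather than the default metrics.Subsystem.
func newCustomHistorianMetrics(reg prometheus.Registerer) *metrics.Historian {
	return metrics.NewHistorianMetrics(reg, "my_subsystem")
}

Grafana itself keeps the previous metric names by passing the existing Subsystem constant, as the NewNGAlert change below shows.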
View File

@@ -40,7 +40,7 @@ func NewNGAlert(r prometheus.Registerer) *NGAlert {
 		stateMetrics: NewStateMetrics(r),
 		multiOrgAlertmanagerMetrics: NewMultiOrgAlertmanagerMetrics(r),
 		apiMetrics: NewAPIMetrics(r),
-		historianMetrics: NewHistorianMetrics(r),
+		historianMetrics: NewHistorianMetrics(r, Subsystem),
 	}
 }

View File

@@ -61,7 +61,7 @@ func Test_subscribeToFolderChanges(t *testing.T) {
 func TestConfigureHistorianBackend(t *testing.T) {
 	t.Run("fail initialization if invalid backend", func(t *testing.T) {
-		met := metrics.NewHistorianMetrics(prometheus.NewRegistry())
+		met := metrics.NewHistorianMetrics(prometheus.NewRegistry(), metrics.Subsystem)
 		logger := log.NewNopLogger()
 		cfg := setting.UnifiedAlertingStateHistorySettings{
 			Enabled: true,
@@ -74,7 +74,7 @@ func TestConfigureHistorianBackend(t *testing.T) {
 	})
 	t.Run("fail initialization if invalid multi-backend primary", func(t *testing.T) {
-		met := metrics.NewHistorianMetrics(prometheus.NewRegistry())
+		met := metrics.NewHistorianMetrics(prometheus.NewRegistry(), metrics.Subsystem)
 		logger := log.NewNopLogger()
 		cfg := setting.UnifiedAlertingStateHistorySettings{
 			Enabled: true,
@@ -89,7 +89,7 @@ func TestConfigureHistorianBackend(t *testing.T) {
 	})
 	t.Run("fail initialization if invalid multi-backend secondary", func(t *testing.T) {
-		met := metrics.NewHistorianMetrics(prometheus.NewRegistry())
+		met := metrics.NewHistorianMetrics(prometheus.NewRegistry(), metrics.Subsystem)
 		logger := log.NewNopLogger()
 		cfg := setting.UnifiedAlertingStateHistorySettings{
 			Enabled: true,
@@ -105,7 +105,7 @@ func TestConfigureHistorianBackend(t *testing.T) {
 	})
 	t.Run("do not fail initialization if pinging Loki fails", func(t *testing.T) {
-		met := metrics.NewHistorianMetrics(prometheus.NewRegistry())
+		met := metrics.NewHistorianMetrics(prometheus.NewRegistry(), metrics.Subsystem)
 		logger := log.NewNopLogger()
 		cfg := setting.UnifiedAlertingStateHistorySettings{
 			Enabled: true,
@@ -123,7 +123,7 @@ func TestConfigureHistorianBackend(t *testing.T) {
 	t.Run("emit metric describing chosen backend", func(t *testing.T) {
 		reg := prometheus.NewRegistry()
-		met := metrics.NewHistorianMetrics(reg)
+		met := metrics.NewHistorianMetrics(reg, metrics.Subsystem)
 		logger := log.NewNopLogger()
 		cfg := setting.UnifiedAlertingStateHistorySettings{
 			Enabled: true,
@@ -145,7 +145,7 @@ grafana_alerting_state_history_info{backend="annotations"} 1
 	t.Run("emit special zero metric if state history disabled", func(t *testing.T) {
 		reg := prometheus.NewRegistry()
-		met := metrics.NewHistorianMetrics(reg)
+		met := metrics.NewHistorianMetrics(reg, metrics.Subsystem)
 		logger := log.NewNopLogger()
 		cfg := setting.UnifiedAlertingStateHistorySettings{
 			Enabled: false,

View File

@@ -62,7 +62,7 @@ func TestAnnotationHistorian(t *testing.T) {
 	t.Run("emits expected write metrics", func(t *testing.T) {
 		reg := prometheus.NewRegistry()
-		met := metrics.NewHistorianMetrics(reg)
+		met := metrics.NewHistorianMetrics(reg, metrics.Subsystem)
 		anns := createTestAnnotationBackendSutWithMetrics(t, met)
 		errAnns := createFailingAnnotationSut(t, met)
 		rule := createTestRule()
@@ -101,7 +101,7 @@ grafana_alerting_state_history_writes_total{backend="annotations",org="1"} 2
 }
 func createTestAnnotationBackendSut(t *testing.T) *AnnotationBackend {
-	return createTestAnnotationBackendSutWithMetrics(t, metrics.NewHistorianMetrics(prometheus.NewRegistry()))
+	return createTestAnnotationBackendSutWithMetrics(t, metrics.NewHistorianMetrics(prometheus.NewRegistry(), metrics.Subsystem))
 }
 func createTestAnnotationBackendSutWithMetrics(t *testing.T, met *metrics.Historian) *AnnotationBackend {

View File

@@ -16,9 +16,9 @@ import (
 type JsonEncoder struct{}
-func (e JsonEncoder) encode(s []stream) ([]byte, error) {
+func (e JsonEncoder) encode(s []Stream) ([]byte, error) {
 	body := struct {
-		Streams []stream `json:"streams"`
+		Streams []Stream `json:"streams"`
 	}{Streams: s}
 	enc, err := json.Marshal(body)
 	if err != nil {
@@ -35,7 +35,7 @@ func (e JsonEncoder) headers() map[string]string {
 type SnappyProtoEncoder struct{}
-func (e SnappyProtoEncoder) encode(s []stream) ([]byte, error) {
+func (e SnappyProtoEncoder) encode(s []Stream) ([]byte, error) {
 	body := logproto.PushRequest{
 		Streams: make([]logproto.Stream, 0, len(s)),
 	}

View File

@@ -41,9 +41,9 @@ const (
 const defaultQueryRange = 6 * time.Hour
 type remoteLokiClient interface {
-	ping(context.Context) error
-	push(context.Context, []stream) error
-	rangeQuery(ctx context.Context, logQL string, start, end, limit int64) (queryRes, error)
+	Ping(context.Context) error
+	Push(context.Context, []Stream) error
+	RangeQuery(ctx context.Context, logQL string, start, end, limit int64) (QueryRes, error)
 }
 // RemoteLokibackend is a state.Historian that records state history to an external Loki instance.
@@ -58,7 +58,7 @@ type RemoteLokiBackend struct {
 func NewRemoteLokiBackend(cfg LokiConfig, req client.Requester, metrics *metrics.Historian) *RemoteLokiBackend {
 	logger := log.New("ngalert.state.historian", "backend", "loki")
 	return &RemoteLokiBackend{
-		client: newLokiClient(cfg, req, metrics, logger),
+		client: NewLokiClient(cfg, req, metrics, logger),
 		externalLabels: cfg.ExternalLabels,
 		clock: clock.New(),
 		metrics: metrics,
@@ -67,7 +67,7 @@ func NewRemoteLokiBackend(cfg LokiConfig, req client.Requester, metrics *metrics
 }
 func (h *RemoteLokiBackend) TestConnection(ctx context.Context) error {
-	return h.client.ping(ctx)
+	return h.client.Ping(ctx)
 }
 // Record writes a number of state transitions for a given rule to an external Loki instance.
@@ -100,7 +100,7 @@ func (h *RemoteLokiBackend) Record(ctx context.Context, rule history_model.RuleM
 	h.metrics.WritesTotal.WithLabelValues(org, "loki").Inc()
 	h.metrics.TransitionsTotal.WithLabelValues(org).Add(float64(len(logStream.Values)))
-	if err := h.recordStreams(ctx, []stream{logStream}, logger); err != nil {
+	if err := h.recordStreams(ctx, []Stream{logStream}, logger); err != nil {
 		logger.Error("Failed to save alert state history batch", "error", err)
 		h.metrics.WritesFailed.WithLabelValues(org, "loki").Inc()
 		h.metrics.TransitionsFailed.WithLabelValues(org).Add(float64(len(logStream.Values)))
@@ -126,7 +126,7 @@ func (h *RemoteLokiBackend) Query(ctx context.Context, query models.HistoryQuery
 	}
 	// Timestamps are expected in RFC3339Nano.
-	res, err := h.client.rangeQuery(ctx, logQL, query.From.UnixNano(), query.To.UnixNano(), int64(query.Limit))
+	res, err := h.client.RangeQuery(ctx, logQL, query.From.UnixNano(), query.To.UnixNano(), int64(query.Limit))
 	if err != nil {
 		return nil, err
 	}
@@ -155,7 +155,7 @@ func buildSelectors(query models.HistoryQuery) ([]Selector, error) {
 }
 // merge will put all the results in one array sorted by timestamp.
-func merge(res queryRes, ruleUID string) (*data.Frame, error) {
+func merge(res QueryRes, ruleUID string) (*data.Frame, error) {
 	// Find the total number of elements in all arrays.
 	totalLen := 0
 	for _, arr := range res.Data.Result {
@@ -181,7 +181,7 @@ func merge(res queryRes, ruleUID string) (*data.Frame, error) {
 	pointers := make([]int, len(res.Data.Result))
 	for {
 		minTime := int64(math.MaxInt64)
-		minEl := sample{}
+		minEl := Sample{}
 		minElStreamIdx := -1
 		// Find the element with the earliest time among all arrays.
 		for i, stream := range res.Data.Result {
@@ -231,7 +231,7 @@ func merge(res queryRes, ruleUID string) (*data.Frame, error) {
 	return frame, nil
 }
-func statesToStream(rule history_model.RuleMeta, states []state.StateTransition, externalLabels map[string]string, logger log.Logger) stream {
+func statesToStream(rule history_model.RuleMeta, states []state.StateTransition, externalLabels map[string]string, logger log.Logger) Stream {
 	labels := mergeLabels(make(map[string]string), externalLabels)
 	// System-defined labels take precedence over user-defined external labels.
 	labels[StateHistoryLabelKey] = StateHistoryLabelValue
@@ -239,7 +239,7 @@ func statesToStream(rule history_model.RuleMeta, states []state.StateTransition,
 	labels[GroupLabel] = fmt.Sprint(rule.Group)
 	labels[FolderUIDLabel] = fmt.Sprint(rule.NamespaceUID)
-	samples := make([]sample, 0, len(states))
+	samples := make([]Sample, 0, len(states))
 	for _, state := range states {
 		if !shouldRecord(state) {
 			continue
@@ -269,20 +269,20 @@ func statesToStream(rule history_model.RuleMeta, states []state.StateTransition,
 		}
 		line := string(jsn)
-		samples = append(samples, sample{
+		samples = append(samples, Sample{
 			T: state.State.LastEvaluationTime,
 			V: line,
 		})
 	}
-	return stream{
+	return Stream{
 		Stream: labels,
 		Values: samples,
 	}
 }
-func (h *RemoteLokiBackend) recordStreams(ctx context.Context, streams []stream, logger log.Logger) error {
-	if err := h.client.push(ctx, streams); err != nil {
+func (h *RemoteLokiBackend) recordStreams(ctx context.Context, streams []Stream, logger log.Logger) error {
+	if err := h.client.Push(ctx, streams); err != nil {
 		return err
 	}

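The public surface of the Loki backend itself is unchanged; it now simply wires in the exported client constructor. A minimal sketch of building the backend from another package and checking connectivity, assuming the import paths below match the Grafana tree (pingLokiBackend is an illustrative helper, not part of this change):

package example

import (
	"context"

	"github.com/prometheus/client_golang/prometheus"

	"github.com/grafana/grafana/pkg/services/ngalert/metrics"
	"github.com/grafana/grafana/pkg/services/ngalert/state/historian"
)

// pingLokiBackend constructs the Loki-backed state historian and verifies
// that the configured Loki instance is reachable.
func pingLokiBackend(ctx context.Context, cfg historian.LokiConfig) error {
	met := metrics.NewHistorianMetrics(prometheus.NewRegistry(), metrics.Subsystem)
	backend := historian.NewRemoteLokiBackend(cfg, historian.NewRequester(), met)
	return backend.TestConnection(ctx)
}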
View File

@@ -27,7 +27,7 @@ func NewRequester() client.Requester {
 // encoder serializes log streams to some byte format.
 type encoder interface {
 	// encode serializes a set of log streams to bytes.
-	encode(s []stream) ([]byte, error)
+	encode(s []Stream) ([]byte, error)
 	// headers returns a set of HTTP-style headers that describes the encoding scheme used.
 	headers() map[string]string
 }
@@ -79,7 +79,7 @@ func NewLokiConfig(cfg setting.UnifiedAlertingStateHistorySettings) (LokiConfig,
 	}, nil
 }
-type httpLokiClient struct {
+type HttpLokiClient struct {
 	client client.Requester
 	encoder encoder
 	cfg LokiConfig
@@ -101,9 +101,9 @@ const (
 	NeqRegEx Operator = "!~"
 )
-func newLokiClient(cfg LokiConfig, req client.Requester, metrics *metrics.Historian, logger log.Logger) *httpLokiClient {
+func NewLokiClient(cfg LokiConfig, req client.Requester, metrics *metrics.Historian, logger log.Logger) *HttpLokiClient {
 	tc := client.NewTimedClient(req, metrics.WriteDuration)
-	return &httpLokiClient{
+	return &HttpLokiClient{
 		client: tc,
 		encoder: cfg.Encoder,
 		cfg: cfg,
@@ -112,7 +112,7 @@ func newLokiClient(cfg LokiConfig, req client.Requester, metrics *metrics.Histor
 	}
 }
-func (c *httpLokiClient) ping(ctx context.Context) error {
+func (c *HttpLokiClient) Ping(ctx context.Context) error {
 	uri := c.cfg.ReadPathURL.JoinPath("/loki/api/v1/labels")
 	req, err := http.NewRequest(http.MethodGet, uri.String(), nil)
 	if err != nil {
@@ -140,23 +140,23 @@ func (c *httpLokiClient) ping(ctx context.Context) error {
 	return nil
 }
-type stream struct {
+type Stream struct {
 	Stream map[string]string `json:"stream"`
-	Values []sample `json:"values"`
+	Values []Sample `json:"values"`
 }
-type sample struct {
+type Sample struct {
 	T time.Time
 	V string
 }
-func (r *sample) MarshalJSON() ([]byte, error) {
+func (r *Sample) MarshalJSON() ([]byte, error) {
 	return json.Marshal([2]string{
 		fmt.Sprintf("%d", r.T.UnixNano()), r.V,
 	})
 }
-func (r *sample) UnmarshalJSON(b []byte) error {
+func (r *Sample) UnmarshalJSON(b []byte) error {
 	// A Loki stream sample is formatted like a list with two elements, [At, Val]
 	// At is a string wrapping a timestamp, in nanosecond unix epoch.
 	// Val is a string containing the log line.
@@ -173,7 +173,7 @@ func (r *sample) UnmarshalJSON(b []byte) error {
 	return nil
 }
-func (c *httpLokiClient) push(ctx context.Context, s []stream) error {
+func (c *HttpLokiClient) Push(ctx context.Context, s []Stream) error {
 	enc, err := c.encoder.encode(s)
 	if err != nil {
 		return err
@@ -216,7 +216,7 @@ func (c *httpLokiClient) push(ctx context.Context, s []stream) error {
 	return nil
 }
-func (c *httpLokiClient) setAuthAndTenantHeaders(req *http.Request) {
+func (c *HttpLokiClient) setAuthAndTenantHeaders(req *http.Request) {
 	if c.cfg.BasicAuthUser != "" || c.cfg.BasicAuthPassword != "" {
 		req.SetBasicAuth(c.cfg.BasicAuthUser, c.cfg.BasicAuthPassword)
 	}
@@ -226,10 +226,10 @@ func (c *httpLokiClient) setAuthAndTenantHeaders(req *http.Request) {
 	}
 }
-func (c *httpLokiClient) rangeQuery(ctx context.Context, logQL string, start, end, limit int64) (queryRes, error) {
+func (c *HttpLokiClient) RangeQuery(ctx context.Context, logQL string, start, end, limit int64) (QueryRes, error) {
 	// Run the pre-flight checks for the query.
 	if start > end {
-		return queryRes{}, fmt.Errorf("start time cannot be after end time")
+		return QueryRes{}, fmt.Errorf("start time cannot be after end time")
 	}
 	if limit < 1 {
 		limit = defaultPageSize
@@ -251,7 +251,7 @@ func (c *httpLokiClient) rangeQuery(ctx context.Context, logQL string, start, en
 	req, err := http.NewRequest(http.MethodGet,
 		queryURL.String(), nil)
 	if err != nil {
-		return queryRes{}, fmt.Errorf("error creating request: %w", err)
+		return QueryRes{}, fmt.Errorf("error creating request: %w", err)
 	}
 	req = req.WithContext(ctx)
@@ -259,7 +259,7 @@ func (c *httpLokiClient) rangeQuery(ctx context.Context, logQL string, start, en
 	res, err := c.client.Do(req)
 	if err != nil {
-		return queryRes{}, fmt.Errorf("error executing request: %w", err)
+		return QueryRes{}, fmt.Errorf("error executing request: %w", err)
 	}
 	defer func() {
@@ -268,7 +268,7 @@ func (c *httpLokiClient) rangeQuery(ctx context.Context, logQL string, start, en
 	data, err := io.ReadAll(res.Body)
 	if err != nil {
-		return queryRes{}, fmt.Errorf("error reading request response: %w", err)
+		return QueryRes{}, fmt.Errorf("error reading request response: %w", err)
 	}
 	if res.StatusCode < 200 || res.StatusCode >= 300 {
@@ -277,23 +277,23 @@ func (c *httpLokiClient) rangeQuery(ctx context.Context, logQL string, start, en
 		} else {
 			c.log.Error("Error response from Loki with an empty body", "status", res.StatusCode)
 		}
-		return queryRes{}, fmt.Errorf("received a non-200 response from loki, status: %d", res.StatusCode)
+		return QueryRes{}, fmt.Errorf("received a non-200 response from loki, status: %d", res.StatusCode)
 	}
-	result := queryRes{}
+	result := QueryRes{}
 	err = json.Unmarshal(data, &result)
 	if err != nil {
 		fmt.Println(string(data))
-		return queryRes{}, fmt.Errorf("error parsing request response: %w", err)
+		return QueryRes{}, fmt.Errorf("error parsing request response: %w", err)
 	}
 	return result, nil
 }
-type queryRes struct {
-	Data queryData `json:"data"`
+type QueryRes struct {
+	Data QueryData `json:"data"`
 }
-type queryData struct {
-	Result []stream `json:"result"`
+type QueryData struct {
+	Result []Stream `json:"result"`
 }

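With the client type, its constructor, and the Stream, Sample, and QueryRes types exported, code outside the historian package can talk to the state history Loki store directly. A minimal sketch, assuming the import paths below match the Grafana tree; the localhost URL and the pushSampleLine helper are illustrative only:

package example

import (
	"context"
	"net/url"
	"time"

	"github.com/prometheus/client_golang/prometheus"

	"github.com/grafana/grafana/pkg/infra/log"
	"github.com/grafana/grafana/pkg/services/ngalert/metrics"
	"github.com/grafana/grafana/pkg/services/ngalert/state/historian"
)

// pushSampleLine builds the exported Loki client, pings the read path, and
// pushes a single stream containing one log line.
func pushSampleLine(ctx context.Context) error {
	u, err := url.Parse("http://localhost:3100") // illustrative Loki address
	if err != nil {
		return err
	}
	met := metrics.NewHistorianMetrics(prometheus.NewRegistry(), metrics.Subsystem)
	c := historian.NewLokiClient(historian.LokiConfig{
		ReadPathURL:  u,
		WritePathURL: u,
		Encoder:      historian.JsonEncoder{},
	}, historian.NewRequester(), met, log.NewNopLogger())

	if err := c.Ping(ctx); err != nil {
		return err
	}
	return c.Push(ctx, []historian.Stream{{
		Stream: map[string]string{"from": "state-history"},
		Values: []historian.Sample{{T: time.Now(), V: "example log line"}},
	}})
}

RangeQuery follows the same pattern: it takes a LogQL selector plus nanosecond start, end, and limit values and returns a QueryRes, as the tests in the next file exercise.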
View File

@@ -111,10 +111,10 @@ func TestLokiHTTPClient(t *testing.T) {
 		req := NewFakeRequester()
 		client := createTestLokiClient(req)
 		now := time.Now().UTC()
-		data := []stream{
+		data := []Stream{
 			{
 				Stream: map[string]string{},
-				Values: []sample{
+				Values: []Sample{
 					{
 						T: now,
 						V: "some line",
@@ -123,7 +123,7 @@ func TestLokiHTTPClient(t *testing.T) {
 			},
 		}
-		err := client.push(context.Background(), data)
+		err := client.Push(context.Background(), data)
 		require.NoError(t, err)
 		require.Contains(t, "/loki/api/v1/push", req.lastRequest.URL.Path)
@@ -145,7 +145,7 @@ func TestLokiHTTPClient(t *testing.T) {
 		now := time.Now().UTC().UnixNano()
 		q := `{from="state-history"}`
-		_, err := client.rangeQuery(context.Background(), q, now-100, now, 1100)
+		_, err := client.RangeQuery(context.Background(), q, now-100, now, 1100)
 		require.NoError(t, err)
 		params := req.lastRequest.URL.Query()
@@ -165,7 +165,7 @@ func TestLokiHTTPClient(t *testing.T) {
 		now := time.Now().UTC().UnixNano()
 		q := `{from="state-history"}`
-		_, err := client.rangeQuery(context.Background(), q, now-100, now, 0)
+		_, err := client.RangeQuery(context.Background(), q, now-100, now, 0)
 		require.NoError(t, err)
 		params := req.lastRequest.URL.Query()
@@ -185,7 +185,7 @@ func TestLokiHTTPClient(t *testing.T) {
 		now := time.Now().UTC().UnixNano()
 		q := `{from="state-history"}`
-		_, err := client.rangeQuery(context.Background(), q, now-100, now, -100)
+		_, err := client.RangeQuery(context.Background(), q, now-100, now, -100)
 		require.NoError(t, err)
 		params := req.lastRequest.URL.Query()
@@ -205,7 +205,7 @@ func TestLokiHTTPClient(t *testing.T) {
 		now := time.Now().UTC().UnixNano()
 		q := `{from="state-history"}`
-		_, err := client.rangeQuery(context.Background(), q, now-100, now, maximumPageSize+1000)
+		_, err := client.RangeQuery(context.Background(), q, now-100, now, maximumPageSize+1000)
 		require.NoError(t, err)
 		params := req.lastRequest.URL.Query()
@@ -223,14 +223,14 @@ func TestLokiHTTPClient_Manual(t *testing.T) {
 		url, err := url.Parse("https://logs-prod-eu-west-0.grafana.net")
 		require.NoError(t, err)
-		client := newLokiClient(LokiConfig{
+		client := NewLokiClient(LokiConfig{
 			ReadPathURL: url,
 			WritePathURL: url,
 			Encoder: JsonEncoder{},
-		}, NewRequester(), metrics.NewHistorianMetrics(prometheus.NewRegistry()), log.NewNopLogger())
+		}, NewRequester(), metrics.NewHistorianMetrics(prometheus.NewRegistry(), metrics.Subsystem), log.NewNopLogger())
 		// Unauthorized request should fail against Grafana Cloud.
-		err = client.ping(context.Background())
+		err = client.Ping(context.Background())
 		require.Error(t, err)
 		client.cfg.BasicAuthUser = "<your_username>"
@@ -241,7 +241,7 @@ func TestLokiHTTPClient_Manual(t *testing.T) {
 		// client.cfg.TenantID = "<your_tenant_id>"
 		// Authorized request should not fail against Grafana Cloud.
-		err = client.ping(context.Background())
+		err = client.Ping(context.Background())
 		require.NoError(t, err)
 	})
@@ -249,13 +249,13 @@ func TestLokiHTTPClient_Manual(t *testing.T) {
 		url, err := url.Parse("https://logs-prod-eu-west-0.grafana.net")
 		require.NoError(t, err)
-		client := newLokiClient(LokiConfig{
+		client := NewLokiClient(LokiConfig{
 			ReadPathURL: url,
 			WritePathURL: url,
 			BasicAuthUser: "<your_username>",
 			BasicAuthPassword: "<your_password>",
 			Encoder: JsonEncoder{},
-		}, NewRequester(), metrics.NewHistorianMetrics(prometheus.NewRegistry()), log.NewNopLogger())
+		}, NewRequester(), metrics.NewHistorianMetrics(prometheus.NewRegistry(), metrics.Subsystem), log.NewNopLogger())
 		// When running on prem, you might need to set the tenant id,
 		// so the x-scope-orgid header is set.
@@ -268,7 +268,7 @@ func TestLokiHTTPClient_Manual(t *testing.T) {
 		end := time.Now().UnixNano()
 		// Authorized request should not fail against Grafana Cloud.
-		res, err := client.rangeQuery(context.Background(), logQL, start, end, defaultPageSize)
+		res, err := client.RangeQuery(context.Background(), logQL, start, end, defaultPageSize)
 		require.NoError(t, err)
 		require.NotNil(t, res)
 	})
@@ -276,7 +276,7 @@ func TestLokiHTTPClient_Manual(t *testing.T) {
 func TestRow(t *testing.T) {
 	t.Run("marshal", func(t *testing.T) {
-		row := sample{
+		row := Sample{
 			T: time.Unix(0, 1234),
 			V: "some sample",
 		}
@@ -290,7 +290,7 @@ func TestRow(t *testing.T) {
 	t.Run("unmarshal", func(t *testing.T) {
 		jsn := []byte(`["1234", "some sample"]`)
-		row := sample{}
+		row := Sample{}
 		err := json.Unmarshal(jsn, &row)
 		require.NoError(t, err)
@@ -301,7 +301,7 @@ func TestRow(t *testing.T) {
 	t.Run("unmarshal invalid", func(t *testing.T) {
 		jsn := []byte(`{"key": "wrong shape"}`)
-		row := sample{}
+		row := Sample{}
 		err := json.Unmarshal(jsn, &row)
 		require.ErrorContains(t, err, "failed to deserialize sample")
@@ -310,7 +310,7 @@ func TestRow(t *testing.T) {
 	t.Run("unmarshal bad timestamp", func(t *testing.T) {
 		jsn := []byte(`["not-unix-nano", "some sample"]`)
-		row := sample{}
+		row := Sample{}
 		err := json.Unmarshal(jsn, &row)
 		require.ErrorContains(t, err, "timestamp in Loki sample")
@@ -319,9 +319,9 @@ func TestRow(t *testing.T) {
 func TestStream(t *testing.T) {
 	t.Run("marshal", func(t *testing.T) {
-		stream := stream{
+		stream := Stream{
 			Stream: map[string]string{"a": "b"},
-			Values: []sample{
+			Values: []Sample{
 				{T: time.Unix(0, 1), V: "one"},
 				{T: time.Unix(0, 2), V: "two"},
 			},
@@ -338,15 +338,15 @@ func TestStream(t *testing.T) {
 	})
 }
-func createTestLokiClient(req client.Requester) *httpLokiClient {
+func createTestLokiClient(req client.Requester) *HttpLokiClient {
 	url, _ := url.Parse("http://some.url")
 	cfg := LokiConfig{
 		WritePathURL: url,
 		ReadPathURL: url,
 		Encoder: JsonEncoder{},
 	}
-	met := metrics.NewHistorianMetrics(prometheus.NewRegistry())
-	return newLokiClient(cfg, req, met, log.NewNopLogger())
+	met := metrics.NewHistorianMetrics(prometheus.NewRegistry(), metrics.Subsystem)
+	return NewLokiClient(cfg, req, met, log.NewNopLogger())
 }
 func reqBody(t *testing.T, req *http.Request) string {

View File

@@ -290,20 +290,20 @@ func TestRemoteLokiBackend(t *testing.T) {
 func TestMerge(t *testing.T) {
 	testCases := []struct {
 		name string
-		res queryRes
+		res QueryRes
 		ruleID string
 		expectedTime []time.Time
 	}{
 		{
 			name: "Should return values from multiple streams in right order",
-			res: queryRes{
-				Data: queryData{
-					Result: []stream{
+			res: QueryRes{
+				Data: QueryData{
+					Result: []Stream{
 						{
 							Stream: map[string]string{
 								"current": "pending",
 							},
-							Values: []sample{
+							Values: []Sample{
 								{time.Unix(0, 1), `{"schemaVersion": 1, "previous": "normal", "current": "pending", "values":{"a": "b"}}`},
 							},
 						},
@@ -311,7 +311,7 @@ func TestMerge(t *testing.T) {
 							Stream: map[string]string{
 								"current": "firing",
 							},
-							Values: []sample{
+							Values: []Sample{
 								{time.Unix(0, 2), `{"schemaVersion": 1, "previous": "pending", "current": "firing", "values":{"a": "b"}}`},
 							},
 						},
@@ -326,14 +326,14 @@ func TestMerge(t *testing.T) {
 		},
 		{
 			name: "Should handle empty values",
-			res: queryRes{
-				Data: queryData{
-					Result: []stream{
+			res: QueryRes{
+				Data: QueryData{
+					Result: []Stream{
 						{
 							Stream: map[string]string{
 								"current": "normal",
 							},
-							Values: []sample{},
+							Values: []Sample{},
 						},
 					},
 				},
@@ -343,14 +343,14 @@ func TestMerge(t *testing.T) {
 		},
 		{
 			name: "Should handle multiple values in one stream",
-			res: queryRes{
-				Data: queryData{
-					Result: []stream{
+			res: QueryRes{
+				Data: QueryData{
+					Result: []Stream{
 						{
 							Stream: map[string]string{
 								"current": "normal",
 							},
-							Values: []sample{
+							Values: []Sample{
 								{time.Unix(0, 1), `{"schemaVersion": 1, "previous": "firing", "current": "normal", "values":{"a": "b"}}`},
 								{time.Unix(0, 2), `{"schemaVersion": 1, "previous": "firing", "current": "normal", "values":{"a": "b"}}`},
 							},
@@ -359,7 +359,7 @@ func TestMerge(t *testing.T) {
 							Stream: map[string]string{
 								"current": "firing",
 							},
-							Values: []sample{
+							Values: []Sample{
 								{time.Unix(0, 3), `{"schemaVersion": 1, "previous": "pending", "current": "firing", "values":{"a": "b"}}`},
 							},
 						},
@@ -399,7 +399,7 @@ func TestMerge(t *testing.T) {
 func TestRecordStates(t *testing.T) {
 	t.Run("writes state transitions to loki", func(t *testing.T) {
 		req := NewFakeRequester()
-		loki := createTestLokiBackend(req, metrics.NewHistorianMetrics(prometheus.NewRegistry()))
+		loki := createTestLokiBackend(req, metrics.NewHistorianMetrics(prometheus.NewRegistry(), metrics.Subsystem))
 		rule := createTestRule()
 		states := singleFromNormal(&state.State{
 			State: eval.Alerting,
@@ -414,7 +414,7 @@ func TestRecordStates(t *testing.T) {
 	t.Run("emits expected write metrics", func(t *testing.T) {
 		reg := prometheus.NewRegistry()
-		met := metrics.NewHistorianMetrics(reg)
+		met := metrics.NewHistorianMetrics(reg, metrics.Subsystem)
 		loki := createTestLokiBackend(NewFakeRequester(), met)
 		errLoki := createTestLokiBackend(NewFakeRequester().WithResponse(badResponse()), met) //nolint:bodyclose
 		rule := createTestRule()
@@ -451,7 +451,7 @@ grafana_alerting_state_history_writes_total{backend="loki",org="1"} 2
 	t.Run("elides request if nothing to send", func(t *testing.T) {
 		req := NewFakeRequester()
-		loki := createTestLokiBackend(req, metrics.NewHistorianMetrics(prometheus.NewRegistry()))
+		loki := createTestLokiBackend(req, metrics.NewHistorianMetrics(prometheus.NewRegistry(), metrics.Subsystem))
 		rule := createTestRule()
 		states := []state.StateTransition{}
@@ -463,7 +463,7 @@ grafana_alerting_state_history_writes_total{backend="loki",org="1"} 2
 	t.Run("succeeds with special chars in labels", func(t *testing.T) {
 		req := NewFakeRequester()
-		loki := createTestLokiBackend(req, metrics.NewHistorianMetrics(prometheus.NewRegistry()))
+		loki := createTestLokiBackend(req, metrics.NewHistorianMetrics(prometheus.NewRegistry(), metrics.Subsystem))
 		rule := createTestRule()
 		states := singleFromNormal(&state.State{
 			State: eval.Alerting,
@@ -486,7 +486,7 @@ grafana_alerting_state_history_writes_total{backend="loki",org="1"} 2
 	t.Run("adds external labels to log lines", func(t *testing.T) {
 		req := NewFakeRequester()
-		loki := createTestLokiBackend(req, metrics.NewHistorianMetrics(prometheus.NewRegistry()))
+		loki := createTestLokiBackend(req, metrics.NewHistorianMetrics(prometheus.NewRegistry(), metrics.Subsystem))
 		rule := createTestRule()
 		states := singleFromNormal(&state.State{
 			State: eval.Alerting,
@@ -533,12 +533,12 @@ func createTestRule() history_model.RuleMeta {
 	}
 }
-func requireSingleEntry(t *testing.T, res stream) lokiEntry {
+func requireSingleEntry(t *testing.T, res Stream) lokiEntry {
 	require.Len(t, res.Values, 1)
 	return requireEntry(t, res.Values[0])
 }
-func requireEntry(t *testing.T, row sample) lokiEntry {
+func requireEntry(t *testing.T, row Sample) lokiEntry {
 	t.Helper()
 	var entry lokiEntry

View File

@@ -22,7 +22,7 @@ import (
 func BenchmarkProcessEvalResults(b *testing.B) {
 	as := annotations.FakeAnnotationsRepo{}
 	as.On("SaveMany", mock.Anything, mock.Anything).Return(nil)
-	metrics := metrics.NewHistorianMetrics(prometheus.NewRegistry())
+	metrics := metrics.NewHistorianMetrics(prometheus.NewRegistry(), metrics.Subsystem)
 	store := historian.NewAnnotationStore(&as, nil, metrics)
 	hist := historian.NewAnnotationBackend(store, nil, metrics)
 	cfg := state.ManagerCfg{

View File

@@ -227,7 +227,7 @@ func TestDashboardAnnotations(t *testing.T) {
 	_, dbstore := tests.SetupTestEnv(t, 1)
 	fakeAnnoRepo := annotationstest.NewFakeAnnotationsRepo()
-	historianMetrics := metrics.NewHistorianMetrics(prometheus.NewRegistry())
+	historianMetrics := metrics.NewHistorianMetrics(prometheus.NewRegistry(), metrics.Subsystem)
 	store := historian.NewAnnotationStore(fakeAnnoRepo, &dashboards.FakeDashboardService{}, historianMetrics)
 	hist := historian.NewAnnotationBackend(store, nil, historianMetrics)
 	cfg := state.ManagerCfg{
@@ -1206,7 +1206,7 @@ func TestProcessEvalResults(t *testing.T) {
 	fakeAnnoRepo := annotationstest.NewFakeAnnotationsRepo()
 	reg := prometheus.NewPedanticRegistry()
 	stateMetrics := metrics.NewStateMetrics(reg)
-	m := metrics.NewHistorianMetrics(prometheus.NewRegistry())
+	m := metrics.NewHistorianMetrics(prometheus.NewRegistry(), metrics.Subsystem)
 	store := historian.NewAnnotationStore(fakeAnnoRepo, &dashboards.FakeDashboardService{}, m)
 	hist := historian.NewAnnotationBackend(store, nil, m)
 	clk := clock.NewMock()