diff --git a/pkg/services/ngalert/metrics/historian.go b/pkg/services/ngalert/metrics/historian.go index fa5d55965fd..9d2febf9890 100644 --- a/pkg/services/ngalert/metrics/historian.go +++ b/pkg/services/ngalert/metrics/historian.go @@ -16,48 +16,48 @@ type Historian struct { BytesWritten prometheus.Counter } -func NewHistorianMetrics(r prometheus.Registerer) *Historian { +func NewHistorianMetrics(r prometheus.Registerer, subsystem string) *Historian { return &Historian{ Info: promauto.With(r).NewGaugeVec(prometheus.GaugeOpts{ Namespace: Namespace, - Subsystem: Subsystem, + Subsystem: subsystem, Name: "state_history_info", Help: "Information about the state history store.", }, []string{"backend"}), TransitionsTotal: promauto.With(r).NewCounterVec(prometheus.CounterOpts{ Namespace: Namespace, - Subsystem: Subsystem, + Subsystem: subsystem, Name: "state_history_transitions_total", Help: "The total number of state transitions processed.", }, []string{"org"}), TransitionsFailed: promauto.With(r).NewCounterVec(prometheus.CounterOpts{ Namespace: Namespace, - Subsystem: Subsystem, + Subsystem: subsystem, Name: "state_history_transitions_failed_total", Help: "The total number of state transitions that failed to be written - they are not retried.", }, []string{"org"}), WritesTotal: promauto.With(r).NewCounterVec(prometheus.CounterOpts{ Namespace: Namespace, - Subsystem: Subsystem, + Subsystem: subsystem, Name: "state_history_writes_total", Help: "The total number of state history batches that were attempted to be written.", }, []string{"org", "backend"}), WritesFailed: promauto.With(r).NewCounterVec(prometheus.CounterOpts{ Namespace: Namespace, - Subsystem: Subsystem, + Subsystem: subsystem, Name: "state_history_writes_failed_total", Help: "The total number of failed writes of state history batches.", }, []string{"org", "backend"}), WriteDuration: instrument.NewHistogramCollector(promauto.With(r).NewHistogramVec(prometheus.HistogramOpts{ Namespace: Namespace, - Subsystem: Subsystem, + Subsystem: subsystem, Name: "state_history_request_duration_seconds", Help: "Histogram of request durations to the state history store. Only valid when using external stores.", Buckets: instrument.DefBuckets, }, instrument.HistogramCollectorBuckets)), BytesWritten: promauto.With(r).NewCounter(prometheus.CounterOpts{ Namespace: Namespace, - Subsystem: Subsystem, + Subsystem: subsystem, Name: "state_history_writes_bytes_total", Help: "The total number of bytes sent within a batch to the state history store. Only valid when using the Loki store.", }), diff --git a/pkg/services/ngalert/metrics/ngalert.go b/pkg/services/ngalert/metrics/ngalert.go index ddffd9408fa..e5f7df291e2 100644 --- a/pkg/services/ngalert/metrics/ngalert.go +++ b/pkg/services/ngalert/metrics/ngalert.go @@ -40,7 +40,7 @@ func NewNGAlert(r prometheus.Registerer) *NGAlert { stateMetrics: NewStateMetrics(r), multiOrgAlertmanagerMetrics: NewMultiOrgAlertmanagerMetrics(r), apiMetrics: NewAPIMetrics(r), - historianMetrics: NewHistorianMetrics(r), + historianMetrics: NewHistorianMetrics(r, Subsystem), } } diff --git a/pkg/services/ngalert/ngalert_test.go b/pkg/services/ngalert/ngalert_test.go index 04e1997ea4f..b7844f1fe18 100644 --- a/pkg/services/ngalert/ngalert_test.go +++ b/pkg/services/ngalert/ngalert_test.go @@ -61,7 +61,7 @@ func Test_subscribeToFolderChanges(t *testing.T) { func TestConfigureHistorianBackend(t *testing.T) { t.Run("fail initialization if invalid backend", func(t *testing.T) { - met := metrics.NewHistorianMetrics(prometheus.NewRegistry()) + met := metrics.NewHistorianMetrics(prometheus.NewRegistry(), metrics.Subsystem) logger := log.NewNopLogger() cfg := setting.UnifiedAlertingStateHistorySettings{ Enabled: true, @@ -74,7 +74,7 @@ func TestConfigureHistorianBackend(t *testing.T) { }) t.Run("fail initialization if invalid multi-backend primary", func(t *testing.T) { - met := metrics.NewHistorianMetrics(prometheus.NewRegistry()) + met := metrics.NewHistorianMetrics(prometheus.NewRegistry(), metrics.Subsystem) logger := log.NewNopLogger() cfg := setting.UnifiedAlertingStateHistorySettings{ Enabled: true, @@ -89,7 +89,7 @@ func TestConfigureHistorianBackend(t *testing.T) { }) t.Run("fail initialization if invalid multi-backend secondary", func(t *testing.T) { - met := metrics.NewHistorianMetrics(prometheus.NewRegistry()) + met := metrics.NewHistorianMetrics(prometheus.NewRegistry(), metrics.Subsystem) logger := log.NewNopLogger() cfg := setting.UnifiedAlertingStateHistorySettings{ Enabled: true, @@ -105,7 +105,7 @@ func TestConfigureHistorianBackend(t *testing.T) { }) t.Run("do not fail initialization if pinging Loki fails", func(t *testing.T) { - met := metrics.NewHistorianMetrics(prometheus.NewRegistry()) + met := metrics.NewHistorianMetrics(prometheus.NewRegistry(), metrics.Subsystem) logger := log.NewNopLogger() cfg := setting.UnifiedAlertingStateHistorySettings{ Enabled: true, @@ -123,7 +123,7 @@ func TestConfigureHistorianBackend(t *testing.T) { t.Run("emit metric describing chosen backend", func(t *testing.T) { reg := prometheus.NewRegistry() - met := metrics.NewHistorianMetrics(reg) + met := metrics.NewHistorianMetrics(reg, metrics.Subsystem) logger := log.NewNopLogger() cfg := setting.UnifiedAlertingStateHistorySettings{ Enabled: true, @@ -145,7 +145,7 @@ grafana_alerting_state_history_info{backend="annotations"} 1 t.Run("emit special zero metric if state history disabled", func(t *testing.T) { reg := prometheus.NewRegistry() - met := metrics.NewHistorianMetrics(reg) + met := metrics.NewHistorianMetrics(reg, metrics.Subsystem) logger := log.NewNopLogger() cfg := setting.UnifiedAlertingStateHistorySettings{ Enabled: false, diff --git a/pkg/services/ngalert/state/historian/annotation_test.go b/pkg/services/ngalert/state/historian/annotation_test.go index bf2e59074aa..4108323cfed 100644 --- a/pkg/services/ngalert/state/historian/annotation_test.go +++ b/pkg/services/ngalert/state/historian/annotation_test.go @@ -62,7 +62,7 @@ func TestAnnotationHistorian(t *testing.T) { t.Run("emits expected write metrics", func(t *testing.T) { reg := prometheus.NewRegistry() - met := metrics.NewHistorianMetrics(reg) + met := metrics.NewHistorianMetrics(reg, metrics.Subsystem) anns := createTestAnnotationBackendSutWithMetrics(t, met) errAnns := createFailingAnnotationSut(t, met) rule := createTestRule() @@ -101,7 +101,7 @@ grafana_alerting_state_history_writes_total{backend="annotations",org="1"} 2 } func createTestAnnotationBackendSut(t *testing.T) *AnnotationBackend { - return createTestAnnotationBackendSutWithMetrics(t, metrics.NewHistorianMetrics(prometheus.NewRegistry())) + return createTestAnnotationBackendSutWithMetrics(t, metrics.NewHistorianMetrics(prometheus.NewRegistry(), metrics.Subsystem)) } func createTestAnnotationBackendSutWithMetrics(t *testing.T, met *metrics.Historian) *AnnotationBackend { diff --git a/pkg/services/ngalert/state/historian/encode.go b/pkg/services/ngalert/state/historian/encode.go index eed07c8f534..18a551c31b8 100644 --- a/pkg/services/ngalert/state/historian/encode.go +++ b/pkg/services/ngalert/state/historian/encode.go @@ -16,9 +16,9 @@ import ( type JsonEncoder struct{} -func (e JsonEncoder) encode(s []stream) ([]byte, error) { +func (e JsonEncoder) encode(s []Stream) ([]byte, error) { body := struct { - Streams []stream `json:"streams"` + Streams []Stream `json:"streams"` }{Streams: s} enc, err := json.Marshal(body) if err != nil { @@ -35,7 +35,7 @@ func (e JsonEncoder) headers() map[string]string { type SnappyProtoEncoder struct{} -func (e SnappyProtoEncoder) encode(s []stream) ([]byte, error) { +func (e SnappyProtoEncoder) encode(s []Stream) ([]byte, error) { body := logproto.PushRequest{ Streams: make([]logproto.Stream, 0, len(s)), } diff --git a/pkg/services/ngalert/state/historian/loki.go b/pkg/services/ngalert/state/historian/loki.go index 117a9d151c5..f5392fb709a 100644 --- a/pkg/services/ngalert/state/historian/loki.go +++ b/pkg/services/ngalert/state/historian/loki.go @@ -41,9 +41,9 @@ const ( const defaultQueryRange = 6 * time.Hour type remoteLokiClient interface { - ping(context.Context) error - push(context.Context, []stream) error - rangeQuery(ctx context.Context, logQL string, start, end, limit int64) (queryRes, error) + Ping(context.Context) error + Push(context.Context, []Stream) error + RangeQuery(ctx context.Context, logQL string, start, end, limit int64) (QueryRes, error) } // RemoteLokibackend is a state.Historian that records state history to an external Loki instance. @@ -58,7 +58,7 @@ type RemoteLokiBackend struct { func NewRemoteLokiBackend(cfg LokiConfig, req client.Requester, metrics *metrics.Historian) *RemoteLokiBackend { logger := log.New("ngalert.state.historian", "backend", "loki") return &RemoteLokiBackend{ - client: newLokiClient(cfg, req, metrics, logger), + client: NewLokiClient(cfg, req, metrics, logger), externalLabels: cfg.ExternalLabels, clock: clock.New(), metrics: metrics, @@ -67,7 +67,7 @@ func NewRemoteLokiBackend(cfg LokiConfig, req client.Requester, metrics *metrics } func (h *RemoteLokiBackend) TestConnection(ctx context.Context) error { - return h.client.ping(ctx) + return h.client.Ping(ctx) } // Record writes a number of state transitions for a given rule to an external Loki instance. @@ -100,7 +100,7 @@ func (h *RemoteLokiBackend) Record(ctx context.Context, rule history_model.RuleM h.metrics.WritesTotal.WithLabelValues(org, "loki").Inc() h.metrics.TransitionsTotal.WithLabelValues(org).Add(float64(len(logStream.Values))) - if err := h.recordStreams(ctx, []stream{logStream}, logger); err != nil { + if err := h.recordStreams(ctx, []Stream{logStream}, logger); err != nil { logger.Error("Failed to save alert state history batch", "error", err) h.metrics.WritesFailed.WithLabelValues(org, "loki").Inc() h.metrics.TransitionsFailed.WithLabelValues(org).Add(float64(len(logStream.Values))) @@ -126,7 +126,7 @@ func (h *RemoteLokiBackend) Query(ctx context.Context, query models.HistoryQuery } // Timestamps are expected in RFC3339Nano. - res, err := h.client.rangeQuery(ctx, logQL, query.From.UnixNano(), query.To.UnixNano(), int64(query.Limit)) + res, err := h.client.RangeQuery(ctx, logQL, query.From.UnixNano(), query.To.UnixNano(), int64(query.Limit)) if err != nil { return nil, err } @@ -155,7 +155,7 @@ func buildSelectors(query models.HistoryQuery) ([]Selector, error) { } // merge will put all the results in one array sorted by timestamp. -func merge(res queryRes, ruleUID string) (*data.Frame, error) { +func merge(res QueryRes, ruleUID string) (*data.Frame, error) { // Find the total number of elements in all arrays. totalLen := 0 for _, arr := range res.Data.Result { @@ -181,7 +181,7 @@ func merge(res queryRes, ruleUID string) (*data.Frame, error) { pointers := make([]int, len(res.Data.Result)) for { minTime := int64(math.MaxInt64) - minEl := sample{} + minEl := Sample{} minElStreamIdx := -1 // Find the element with the earliest time among all arrays. for i, stream := range res.Data.Result { @@ -231,7 +231,7 @@ func merge(res queryRes, ruleUID string) (*data.Frame, error) { return frame, nil } -func statesToStream(rule history_model.RuleMeta, states []state.StateTransition, externalLabels map[string]string, logger log.Logger) stream { +func statesToStream(rule history_model.RuleMeta, states []state.StateTransition, externalLabels map[string]string, logger log.Logger) Stream { labels := mergeLabels(make(map[string]string), externalLabels) // System-defined labels take precedence over user-defined external labels. labels[StateHistoryLabelKey] = StateHistoryLabelValue @@ -239,7 +239,7 @@ func statesToStream(rule history_model.RuleMeta, states []state.StateTransition, labels[GroupLabel] = fmt.Sprint(rule.Group) labels[FolderUIDLabel] = fmt.Sprint(rule.NamespaceUID) - samples := make([]sample, 0, len(states)) + samples := make([]Sample, 0, len(states)) for _, state := range states { if !shouldRecord(state) { continue @@ -269,20 +269,20 @@ func statesToStream(rule history_model.RuleMeta, states []state.StateTransition, } line := string(jsn) - samples = append(samples, sample{ + samples = append(samples, Sample{ T: state.State.LastEvaluationTime, V: line, }) } - return stream{ + return Stream{ Stream: labels, Values: samples, } } -func (h *RemoteLokiBackend) recordStreams(ctx context.Context, streams []stream, logger log.Logger) error { - if err := h.client.push(ctx, streams); err != nil { +func (h *RemoteLokiBackend) recordStreams(ctx context.Context, streams []Stream, logger log.Logger) error { + if err := h.client.Push(ctx, streams); err != nil { return err } diff --git a/pkg/services/ngalert/state/historian/loki_http.go b/pkg/services/ngalert/state/historian/loki_http.go index 399707d541f..003904e9871 100644 --- a/pkg/services/ngalert/state/historian/loki_http.go +++ b/pkg/services/ngalert/state/historian/loki_http.go @@ -27,7 +27,7 @@ func NewRequester() client.Requester { // encoder serializes log streams to some byte format. type encoder interface { // encode serializes a set of log streams to bytes. - encode(s []stream) ([]byte, error) + encode(s []Stream) ([]byte, error) // headers returns a set of HTTP-style headers that describes the encoding scheme used. headers() map[string]string } @@ -79,7 +79,7 @@ func NewLokiConfig(cfg setting.UnifiedAlertingStateHistorySettings) (LokiConfig, }, nil } -type httpLokiClient struct { +type HttpLokiClient struct { client client.Requester encoder encoder cfg LokiConfig @@ -101,9 +101,9 @@ const ( NeqRegEx Operator = "!~" ) -func newLokiClient(cfg LokiConfig, req client.Requester, metrics *metrics.Historian, logger log.Logger) *httpLokiClient { +func NewLokiClient(cfg LokiConfig, req client.Requester, metrics *metrics.Historian, logger log.Logger) *HttpLokiClient { tc := client.NewTimedClient(req, metrics.WriteDuration) - return &httpLokiClient{ + return &HttpLokiClient{ client: tc, encoder: cfg.Encoder, cfg: cfg, @@ -112,7 +112,7 @@ func newLokiClient(cfg LokiConfig, req client.Requester, metrics *metrics.Histor } } -func (c *httpLokiClient) ping(ctx context.Context) error { +func (c *HttpLokiClient) Ping(ctx context.Context) error { uri := c.cfg.ReadPathURL.JoinPath("/loki/api/v1/labels") req, err := http.NewRequest(http.MethodGet, uri.String(), nil) if err != nil { @@ -140,23 +140,23 @@ func (c *httpLokiClient) ping(ctx context.Context) error { return nil } -type stream struct { +type Stream struct { Stream map[string]string `json:"stream"` - Values []sample `json:"values"` + Values []Sample `json:"values"` } -type sample struct { +type Sample struct { T time.Time V string } -func (r *sample) MarshalJSON() ([]byte, error) { +func (r *Sample) MarshalJSON() ([]byte, error) { return json.Marshal([2]string{ fmt.Sprintf("%d", r.T.UnixNano()), r.V, }) } -func (r *sample) UnmarshalJSON(b []byte) error { +func (r *Sample) UnmarshalJSON(b []byte) error { // A Loki stream sample is formatted like a list with two elements, [At, Val] // At is a string wrapping a timestamp, in nanosecond unix epoch. // Val is a string containing the log line. @@ -173,7 +173,7 @@ func (r *sample) UnmarshalJSON(b []byte) error { return nil } -func (c *httpLokiClient) push(ctx context.Context, s []stream) error { +func (c *HttpLokiClient) Push(ctx context.Context, s []Stream) error { enc, err := c.encoder.encode(s) if err != nil { return err @@ -216,7 +216,7 @@ func (c *httpLokiClient) push(ctx context.Context, s []stream) error { return nil } -func (c *httpLokiClient) setAuthAndTenantHeaders(req *http.Request) { +func (c *HttpLokiClient) setAuthAndTenantHeaders(req *http.Request) { if c.cfg.BasicAuthUser != "" || c.cfg.BasicAuthPassword != "" { req.SetBasicAuth(c.cfg.BasicAuthUser, c.cfg.BasicAuthPassword) } @@ -226,10 +226,10 @@ func (c *httpLokiClient) setAuthAndTenantHeaders(req *http.Request) { } } -func (c *httpLokiClient) rangeQuery(ctx context.Context, logQL string, start, end, limit int64) (queryRes, error) { +func (c *HttpLokiClient) RangeQuery(ctx context.Context, logQL string, start, end, limit int64) (QueryRes, error) { // Run the pre-flight checks for the query. if start > end { - return queryRes{}, fmt.Errorf("start time cannot be after end time") + return QueryRes{}, fmt.Errorf("start time cannot be after end time") } if limit < 1 { limit = defaultPageSize @@ -251,7 +251,7 @@ func (c *httpLokiClient) rangeQuery(ctx context.Context, logQL string, start, en req, err := http.NewRequest(http.MethodGet, queryURL.String(), nil) if err != nil { - return queryRes{}, fmt.Errorf("error creating request: %w", err) + return QueryRes{}, fmt.Errorf("error creating request: %w", err) } req = req.WithContext(ctx) @@ -259,7 +259,7 @@ func (c *httpLokiClient) rangeQuery(ctx context.Context, logQL string, start, en res, err := c.client.Do(req) if err != nil { - return queryRes{}, fmt.Errorf("error executing request: %w", err) + return QueryRes{}, fmt.Errorf("error executing request: %w", err) } defer func() { @@ -268,7 +268,7 @@ func (c *httpLokiClient) rangeQuery(ctx context.Context, logQL string, start, en data, err := io.ReadAll(res.Body) if err != nil { - return queryRes{}, fmt.Errorf("error reading request response: %w", err) + return QueryRes{}, fmt.Errorf("error reading request response: %w", err) } if res.StatusCode < 200 || res.StatusCode >= 300 { @@ -277,23 +277,23 @@ func (c *httpLokiClient) rangeQuery(ctx context.Context, logQL string, start, en } else { c.log.Error("Error response from Loki with an empty body", "status", res.StatusCode) } - return queryRes{}, fmt.Errorf("received a non-200 response from loki, status: %d", res.StatusCode) + return QueryRes{}, fmt.Errorf("received a non-200 response from loki, status: %d", res.StatusCode) } - result := queryRes{} + result := QueryRes{} err = json.Unmarshal(data, &result) if err != nil { fmt.Println(string(data)) - return queryRes{}, fmt.Errorf("error parsing request response: %w", err) + return QueryRes{}, fmt.Errorf("error parsing request response: %w", err) } return result, nil } -type queryRes struct { - Data queryData `json:"data"` +type QueryRes struct { + Data QueryData `json:"data"` } -type queryData struct { - Result []stream `json:"result"` +type QueryData struct { + Result []Stream `json:"result"` } diff --git a/pkg/services/ngalert/state/historian/loki_http_test.go b/pkg/services/ngalert/state/historian/loki_http_test.go index d47313e5c46..764f9e61f82 100644 --- a/pkg/services/ngalert/state/historian/loki_http_test.go +++ b/pkg/services/ngalert/state/historian/loki_http_test.go @@ -111,10 +111,10 @@ func TestLokiHTTPClient(t *testing.T) { req := NewFakeRequester() client := createTestLokiClient(req) now := time.Now().UTC() - data := []stream{ + data := []Stream{ { Stream: map[string]string{}, - Values: []sample{ + Values: []Sample{ { T: now, V: "some line", @@ -123,7 +123,7 @@ func TestLokiHTTPClient(t *testing.T) { }, } - err := client.push(context.Background(), data) + err := client.Push(context.Background(), data) require.NoError(t, err) require.Contains(t, "/loki/api/v1/push", req.lastRequest.URL.Path) @@ -145,7 +145,7 @@ func TestLokiHTTPClient(t *testing.T) { now := time.Now().UTC().UnixNano() q := `{from="state-history"}` - _, err := client.rangeQuery(context.Background(), q, now-100, now, 1100) + _, err := client.RangeQuery(context.Background(), q, now-100, now, 1100) require.NoError(t, err) params := req.lastRequest.URL.Query() @@ -165,7 +165,7 @@ func TestLokiHTTPClient(t *testing.T) { now := time.Now().UTC().UnixNano() q := `{from="state-history"}` - _, err := client.rangeQuery(context.Background(), q, now-100, now, 0) + _, err := client.RangeQuery(context.Background(), q, now-100, now, 0) require.NoError(t, err) params := req.lastRequest.URL.Query() @@ -185,7 +185,7 @@ func TestLokiHTTPClient(t *testing.T) { now := time.Now().UTC().UnixNano() q := `{from="state-history"}` - _, err := client.rangeQuery(context.Background(), q, now-100, now, -100) + _, err := client.RangeQuery(context.Background(), q, now-100, now, -100) require.NoError(t, err) params := req.lastRequest.URL.Query() @@ -205,7 +205,7 @@ func TestLokiHTTPClient(t *testing.T) { now := time.Now().UTC().UnixNano() q := `{from="state-history"}` - _, err := client.rangeQuery(context.Background(), q, now-100, now, maximumPageSize+1000) + _, err := client.RangeQuery(context.Background(), q, now-100, now, maximumPageSize+1000) require.NoError(t, err) params := req.lastRequest.URL.Query() @@ -223,14 +223,14 @@ func TestLokiHTTPClient_Manual(t *testing.T) { url, err := url.Parse("https://logs-prod-eu-west-0.grafana.net") require.NoError(t, err) - client := newLokiClient(LokiConfig{ + client := NewLokiClient(LokiConfig{ ReadPathURL: url, WritePathURL: url, Encoder: JsonEncoder{}, - }, NewRequester(), metrics.NewHistorianMetrics(prometheus.NewRegistry()), log.NewNopLogger()) + }, NewRequester(), metrics.NewHistorianMetrics(prometheus.NewRegistry(), metrics.Subsystem), log.NewNopLogger()) // Unauthorized request should fail against Grafana Cloud. - err = client.ping(context.Background()) + err = client.Ping(context.Background()) require.Error(t, err) client.cfg.BasicAuthUser = "" @@ -241,7 +241,7 @@ func TestLokiHTTPClient_Manual(t *testing.T) { // client.cfg.TenantID = "" // Authorized request should not fail against Grafana Cloud. - err = client.ping(context.Background()) + err = client.Ping(context.Background()) require.NoError(t, err) }) @@ -249,13 +249,13 @@ func TestLokiHTTPClient_Manual(t *testing.T) { url, err := url.Parse("https://logs-prod-eu-west-0.grafana.net") require.NoError(t, err) - client := newLokiClient(LokiConfig{ + client := NewLokiClient(LokiConfig{ ReadPathURL: url, WritePathURL: url, BasicAuthUser: "", BasicAuthPassword: "", Encoder: JsonEncoder{}, - }, NewRequester(), metrics.NewHistorianMetrics(prometheus.NewRegistry()), log.NewNopLogger()) + }, NewRequester(), metrics.NewHistorianMetrics(prometheus.NewRegistry(), metrics.Subsystem), log.NewNopLogger()) // When running on prem, you might need to set the tenant id, // so the x-scope-orgid header is set. @@ -268,7 +268,7 @@ func TestLokiHTTPClient_Manual(t *testing.T) { end := time.Now().UnixNano() // Authorized request should not fail against Grafana Cloud. - res, err := client.rangeQuery(context.Background(), logQL, start, end, defaultPageSize) + res, err := client.RangeQuery(context.Background(), logQL, start, end, defaultPageSize) require.NoError(t, err) require.NotNil(t, res) }) @@ -276,7 +276,7 @@ func TestLokiHTTPClient_Manual(t *testing.T) { func TestRow(t *testing.T) { t.Run("marshal", func(t *testing.T) { - row := sample{ + row := Sample{ T: time.Unix(0, 1234), V: "some sample", } @@ -290,7 +290,7 @@ func TestRow(t *testing.T) { t.Run("unmarshal", func(t *testing.T) { jsn := []byte(`["1234", "some sample"]`) - row := sample{} + row := Sample{} err := json.Unmarshal(jsn, &row) require.NoError(t, err) @@ -301,7 +301,7 @@ func TestRow(t *testing.T) { t.Run("unmarshal invalid", func(t *testing.T) { jsn := []byte(`{"key": "wrong shape"}`) - row := sample{} + row := Sample{} err := json.Unmarshal(jsn, &row) require.ErrorContains(t, err, "failed to deserialize sample") @@ -310,7 +310,7 @@ func TestRow(t *testing.T) { t.Run("unmarshal bad timestamp", func(t *testing.T) { jsn := []byte(`["not-unix-nano", "some sample"]`) - row := sample{} + row := Sample{} err := json.Unmarshal(jsn, &row) require.ErrorContains(t, err, "timestamp in Loki sample") @@ -319,9 +319,9 @@ func TestRow(t *testing.T) { func TestStream(t *testing.T) { t.Run("marshal", func(t *testing.T) { - stream := stream{ + stream := Stream{ Stream: map[string]string{"a": "b"}, - Values: []sample{ + Values: []Sample{ {T: time.Unix(0, 1), V: "one"}, {T: time.Unix(0, 2), V: "two"}, }, @@ -338,15 +338,15 @@ func TestStream(t *testing.T) { }) } -func createTestLokiClient(req client.Requester) *httpLokiClient { +func createTestLokiClient(req client.Requester) *HttpLokiClient { url, _ := url.Parse("http://some.url") cfg := LokiConfig{ WritePathURL: url, ReadPathURL: url, Encoder: JsonEncoder{}, } - met := metrics.NewHistorianMetrics(prometheus.NewRegistry()) - return newLokiClient(cfg, req, met, log.NewNopLogger()) + met := metrics.NewHistorianMetrics(prometheus.NewRegistry(), metrics.Subsystem) + return NewLokiClient(cfg, req, met, log.NewNopLogger()) } func reqBody(t *testing.T, req *http.Request) string { diff --git a/pkg/services/ngalert/state/historian/loki_test.go b/pkg/services/ngalert/state/historian/loki_test.go index a6abc4176c8..586ecaa5cf8 100644 --- a/pkg/services/ngalert/state/historian/loki_test.go +++ b/pkg/services/ngalert/state/historian/loki_test.go @@ -290,20 +290,20 @@ func TestRemoteLokiBackend(t *testing.T) { func TestMerge(t *testing.T) { testCases := []struct { name string - res queryRes + res QueryRes ruleID string expectedTime []time.Time }{ { name: "Should return values from multiple streams in right order", - res: queryRes{ - Data: queryData{ - Result: []stream{ + res: QueryRes{ + Data: QueryData{ + Result: []Stream{ { Stream: map[string]string{ "current": "pending", }, - Values: []sample{ + Values: []Sample{ {time.Unix(0, 1), `{"schemaVersion": 1, "previous": "normal", "current": "pending", "values":{"a": "b"}}`}, }, }, @@ -311,7 +311,7 @@ func TestMerge(t *testing.T) { Stream: map[string]string{ "current": "firing", }, - Values: []sample{ + Values: []Sample{ {time.Unix(0, 2), `{"schemaVersion": 1, "previous": "pending", "current": "firing", "values":{"a": "b"}}`}, }, }, @@ -326,14 +326,14 @@ func TestMerge(t *testing.T) { }, { name: "Should handle empty values", - res: queryRes{ - Data: queryData{ - Result: []stream{ + res: QueryRes{ + Data: QueryData{ + Result: []Stream{ { Stream: map[string]string{ "current": "normal", }, - Values: []sample{}, + Values: []Sample{}, }, }, }, @@ -343,14 +343,14 @@ func TestMerge(t *testing.T) { }, { name: "Should handle multiple values in one stream", - res: queryRes{ - Data: queryData{ - Result: []stream{ + res: QueryRes{ + Data: QueryData{ + Result: []Stream{ { Stream: map[string]string{ "current": "normal", }, - Values: []sample{ + Values: []Sample{ {time.Unix(0, 1), `{"schemaVersion": 1, "previous": "firing", "current": "normal", "values":{"a": "b"}}`}, {time.Unix(0, 2), `{"schemaVersion": 1, "previous": "firing", "current": "normal", "values":{"a": "b"}}`}, }, @@ -359,7 +359,7 @@ func TestMerge(t *testing.T) { Stream: map[string]string{ "current": "firing", }, - Values: []sample{ + Values: []Sample{ {time.Unix(0, 3), `{"schemaVersion": 1, "previous": "pending", "current": "firing", "values":{"a": "b"}}`}, }, }, @@ -399,7 +399,7 @@ func TestMerge(t *testing.T) { func TestRecordStates(t *testing.T) { t.Run("writes state transitions to loki", func(t *testing.T) { req := NewFakeRequester() - loki := createTestLokiBackend(req, metrics.NewHistorianMetrics(prometheus.NewRegistry())) + loki := createTestLokiBackend(req, metrics.NewHistorianMetrics(prometheus.NewRegistry(), metrics.Subsystem)) rule := createTestRule() states := singleFromNormal(&state.State{ State: eval.Alerting, @@ -414,7 +414,7 @@ func TestRecordStates(t *testing.T) { t.Run("emits expected write metrics", func(t *testing.T) { reg := prometheus.NewRegistry() - met := metrics.NewHistorianMetrics(reg) + met := metrics.NewHistorianMetrics(reg, metrics.Subsystem) loki := createTestLokiBackend(NewFakeRequester(), met) errLoki := createTestLokiBackend(NewFakeRequester().WithResponse(badResponse()), met) //nolint:bodyclose rule := createTestRule() @@ -451,7 +451,7 @@ grafana_alerting_state_history_writes_total{backend="loki",org="1"} 2 t.Run("elides request if nothing to send", func(t *testing.T) { req := NewFakeRequester() - loki := createTestLokiBackend(req, metrics.NewHistorianMetrics(prometheus.NewRegistry())) + loki := createTestLokiBackend(req, metrics.NewHistorianMetrics(prometheus.NewRegistry(), metrics.Subsystem)) rule := createTestRule() states := []state.StateTransition{} @@ -463,7 +463,7 @@ grafana_alerting_state_history_writes_total{backend="loki",org="1"} 2 t.Run("succeeds with special chars in labels", func(t *testing.T) { req := NewFakeRequester() - loki := createTestLokiBackend(req, metrics.NewHistorianMetrics(prometheus.NewRegistry())) + loki := createTestLokiBackend(req, metrics.NewHistorianMetrics(prometheus.NewRegistry(), metrics.Subsystem)) rule := createTestRule() states := singleFromNormal(&state.State{ State: eval.Alerting, @@ -486,7 +486,7 @@ grafana_alerting_state_history_writes_total{backend="loki",org="1"} 2 t.Run("adds external labels to log lines", func(t *testing.T) { req := NewFakeRequester() - loki := createTestLokiBackend(req, metrics.NewHistorianMetrics(prometheus.NewRegistry())) + loki := createTestLokiBackend(req, metrics.NewHistorianMetrics(prometheus.NewRegistry(), metrics.Subsystem)) rule := createTestRule() states := singleFromNormal(&state.State{ State: eval.Alerting, @@ -533,12 +533,12 @@ func createTestRule() history_model.RuleMeta { } } -func requireSingleEntry(t *testing.T, res stream) lokiEntry { +func requireSingleEntry(t *testing.T, res Stream) lokiEntry { require.Len(t, res.Values, 1) return requireEntry(t, res.Values[0]) } -func requireEntry(t *testing.T, row sample) lokiEntry { +func requireEntry(t *testing.T, row Sample) lokiEntry { t.Helper() var entry lokiEntry diff --git a/pkg/services/ngalert/state/manager_bench_test.go b/pkg/services/ngalert/state/manager_bench_test.go index c47d0477032..3e6ce0e2ec0 100644 --- a/pkg/services/ngalert/state/manager_bench_test.go +++ b/pkg/services/ngalert/state/manager_bench_test.go @@ -22,7 +22,7 @@ import ( func BenchmarkProcessEvalResults(b *testing.B) { as := annotations.FakeAnnotationsRepo{} as.On("SaveMany", mock.Anything, mock.Anything).Return(nil) - metrics := metrics.NewHistorianMetrics(prometheus.NewRegistry()) + metrics := metrics.NewHistorianMetrics(prometheus.NewRegistry(), metrics.Subsystem) store := historian.NewAnnotationStore(&as, nil, metrics) hist := historian.NewAnnotationBackend(store, nil, metrics) cfg := state.ManagerCfg{ diff --git a/pkg/services/ngalert/state/manager_test.go b/pkg/services/ngalert/state/manager_test.go index e24934a6376..32858008667 100644 --- a/pkg/services/ngalert/state/manager_test.go +++ b/pkg/services/ngalert/state/manager_test.go @@ -227,7 +227,7 @@ func TestDashboardAnnotations(t *testing.T) { _, dbstore := tests.SetupTestEnv(t, 1) fakeAnnoRepo := annotationstest.NewFakeAnnotationsRepo() - historianMetrics := metrics.NewHistorianMetrics(prometheus.NewRegistry()) + historianMetrics := metrics.NewHistorianMetrics(prometheus.NewRegistry(), metrics.Subsystem) store := historian.NewAnnotationStore(fakeAnnoRepo, &dashboards.FakeDashboardService{}, historianMetrics) hist := historian.NewAnnotationBackend(store, nil, historianMetrics) cfg := state.ManagerCfg{ @@ -1206,7 +1206,7 @@ func TestProcessEvalResults(t *testing.T) { fakeAnnoRepo := annotationstest.NewFakeAnnotationsRepo() reg := prometheus.NewPedanticRegistry() stateMetrics := metrics.NewStateMetrics(reg) - m := metrics.NewHistorianMetrics(prometheus.NewRegistry()) + m := metrics.NewHistorianMetrics(prometheus.NewRegistry(), metrics.Subsystem) store := historian.NewAnnotationStore(fakeAnnoRepo, &dashboards.FakeDashboardService{}, m) hist := historian.NewAnnotationBackend(store, nil, m) clk := clock.NewMock()