Alerting: Clamp Loki ASH range query to configured max_query_length (#83986)

* Clamp range in loki http client to configured max_query_length

Defaults to 721h to match Loki default
This commit is contained in:
William Wernert
2024-03-15 12:59:45 -04:00
committed by GitHub
parent f2628bfad4
commit 97f37b2e6f
6 changed files with 159 additions and 45 deletions

View File

@@ -88,7 +88,7 @@ func TestIntegrationAlertStateHistoryStore(t *testing.T) {
t.Run("can query history by alert id", func(t *testing.T) {
rule := dashboardRules[dashboard1.UID][0]
fakeLokiClient.Response = []historian.Stream{
fakeLokiClient.rangeQueryRes = []historian.Stream{
historian.StatesToStream(ruleMetaFromRule(t, rule), transitions, map[string]string{}, log.NewNopLogger()),
}
@@ -113,7 +113,7 @@ func TestIntegrationAlertStateHistoryStore(t *testing.T) {
})
t.Run("can query history by dashboard id", func(t *testing.T) {
fakeLokiClient.Response = []historian.Stream{
fakeLokiClient.rangeQueryRes = []historian.Stream{
historian.StatesToStream(ruleMetaFromRule(t, dashboardRules[dashboard1.UID][0]), transitions, map[string]string{}, log.NewNopLogger()),
historian.StatesToStream(ruleMetaFromRule(t, dashboardRules[dashboard1.UID][1]), transitions, map[string]string{}, log.NewNopLogger()),
}
@@ -139,7 +139,7 @@ func TestIntegrationAlertStateHistoryStore(t *testing.T) {
})
t.Run("should return empty results when type is annotation", func(t *testing.T) {
fakeLokiClient.Response = []historian.Stream{
fakeLokiClient.rangeQueryRes = []historian.Stream{
historian.StatesToStream(ruleMetaFromRule(t, dashboardRules[dashboard1.UID][0]), transitions, map[string]string{}, log.NewNopLogger()),
historian.StatesToStream(ruleMetaFromRule(t, dashboardRules[dashboard1.UID][1]), transitions, map[string]string{}, log.NewNopLogger()),
}
@@ -163,7 +163,7 @@ func TestIntegrationAlertStateHistoryStore(t *testing.T) {
})
t.Run("should return empty results when history is outside time range", func(t *testing.T) {
fakeLokiClient.Response = []historian.Stream{
fakeLokiClient.rangeQueryRes = []historian.Stream{
historian.StatesToStream(ruleMetaFromRule(t, dashboardRules[dashboard1.UID][0]), transitions, map[string]string{}, log.NewNopLogger()),
historian.StatesToStream(ruleMetaFromRule(t, dashboardRules[dashboard1.UID][1]), transitions, map[string]string{}, log.NewNopLogger()),
}
@@ -188,8 +188,41 @@ func TestIntegrationAlertStateHistoryStore(t *testing.T) {
require.Len(t, res, 0)
})
t.Run("should return partial results when history is partly outside clamped time range", func(t *testing.T) {
fakeLokiClient.rangeQueryRes = []historian.Stream{
historian.StatesToStream(ruleMetaFromRule(t, dashboardRules[dashboard1.UID][0]), transitions, map[string]string{}, log.NewNopLogger()),
historian.StatesToStream(ruleMetaFromRule(t, dashboardRules[dashboard1.UID][1]), transitions, map[string]string{}, log.NewNopLogger()),
}
// clamp time range to 1 second
oldMax := fakeLokiClient.cfg.MaxQueryLength
fakeLokiClient.cfg.MaxQueryLength = 1 * time.Second
query := annotations.ItemQuery{
OrgID: 1,
DashboardID: dashboard1.ID,
From: start.Add(-1 * time.Second).UnixMilli(), // should clamp to start
To: start.Add(1 * time.Second).UnixMilli(),
}
res, err := store.Get(
context.Background(),
&query,
&annotation_ac.AccessResources{
Dashboards: map[string]int64{
dashboard1.UID: dashboard1.ID,
},
CanAccessDashAnnotations: true,
},
)
require.NoError(t, err)
require.Len(t, res, 2)
// restore original max query length
fakeLokiClient.cfg.MaxQueryLength = oldMax
})
t.Run("should sort history by time", func(t *testing.T) {
fakeLokiClient.Response = []historian.Stream{
fakeLokiClient.rangeQueryRes = []historian.Stream{
historian.StatesToStream(ruleMetaFromRule(t, dashboardRules[dashboard1.UID][0]), transitions, map[string]string{}, log.NewNopLogger()),
historian.StatesToStream(ruleMetaFromRule(t, dashboardRules[dashboard1.UID][1]), transitions, map[string]string{}, log.NewNopLogger()),
}
@@ -716,11 +749,11 @@ func compareAnnotationItem(t *testing.T, expected, actual *annotations.ItemDTO)
}
type FakeLokiClient struct {
client client.Requester
cfg historian.LokiConfig
metrics *metrics.Historian
log log.Logger
Response []historian.Stream
client client.Requester
cfg historian.LokiConfig
metrics *metrics.Historian
log log.Logger
rangeQueryRes []historian.Stream
}
func NewFakeLokiClient() *FakeLokiClient {
@@ -731,19 +764,23 @@ func NewFakeLokiClient() *FakeLokiClient {
return &FakeLokiClient{
client: client.NewTimedClient(req, metrics.WriteDuration),
cfg: historian.LokiConfig{
WritePathURL: url,
ReadPathURL: url,
Encoder: historian.JsonEncoder{},
WritePathURL: url,
ReadPathURL: url,
Encoder: historian.JsonEncoder{},
MaxQueryLength: 721 * time.Hour,
},
metrics: metrics,
log: log.New("ngalert.state.historian", "backend", "loki"),
}
}
func (c *FakeLokiClient) RangeQuery(_ context.Context, _ string, from, to, _ int64) (historian.QueryRes, error) {
streams := make([]historian.Stream, len(c.Response))
func (c *FakeLokiClient) RangeQuery(ctx context.Context, query string, from, to, limit int64) (historian.QueryRes, error) {
streams := make([]historian.Stream, len(c.rangeQueryRes))
for n, stream := range c.Response {
// clamp time range using logic from historian
from, to = historian.ClampRange(from, to, c.cfg.MaxQueryLength.Nanoseconds())
for n, stream := range c.rangeQueryRes {
streams[n].Stream = stream.Stream
streams[n].Values = []historian.Sample{}
for _, sample := range stream.Values {
@@ -759,8 +796,9 @@ func (c *FakeLokiClient) RangeQuery(_ context.Context, _ string, from, to, _ int
Result: streams,
},
}
// reset expected streams on read
c.Response = []historian.Stream{}
c.rangeQueryRes = []historian.Stream{}
return res, nil
}