From 958fb2c50ab50b28dcc1707e124965760b8fa26a Mon Sep 17 00:00:00 2001 From: Alexander Weaver Date: Sat, 11 Feb 2023 05:17:44 -0600 Subject: [PATCH] Alerting: Unify structs in Loki client and make them more consistent with Prometheus (#63055) * Use existing row struct instead of [2]string, add deserialization helper * Replace Stream struct with stream struct which is exactly the same * Drop unused status field * Don't export queryRes and queryData * Tests for custom marshalling * Rename row fields to T and V for consistency with prometheus samples * Rename row to sample --- pkg/services/ngalert/state/historian/loki.go | 29 ++++---- .../ngalert/state/historian/loki_http.go | 66 +++++++++++-------- .../ngalert/state/historian/loki_http_test.go | 65 ++++++++++++++++++ .../ngalert/state/historian/loki_test.go | 44 ++++++------- 4 files changed, 137 insertions(+), 67 deletions(-) diff --git a/pkg/services/ngalert/state/historian/loki.go b/pkg/services/ngalert/state/historian/loki.go index a4a63d063a9..8de8dffbf90 100644 --- a/pkg/services/ngalert/state/historian/loki.go +++ b/pkg/services/ngalert/state/historian/loki.go @@ -5,7 +5,6 @@ import ( "encoding/json" "fmt" "math" - "strconv" "time" "github.com/grafana/grafana-plugin-sdk-go/data" @@ -39,7 +38,7 @@ const defaultQueryRange = 6 * time.Hour type remoteLokiClient interface { ping(context.Context) error push(context.Context, []stream) error - rangeQuery(ctx context.Context, selectors []Selector, start, end int64) (QueryRes, error) + rangeQuery(ctx context.Context, selectors []Selector, start, end int64) (queryRes, error) } type RemoteLokiBackend struct { @@ -139,7 +138,7 @@ func buildSelectors(query models.HistoryQuery) ([]Selector, error) { } // merge will put all the results in one array sorted by timestamp. -func merge(res QueryRes, ruleUID string) (*data.Frame, error) { +func merge(res queryRes, ruleUID string) (*data.Frame, error) { // Find the total number of elements in all arrays. totalLen := 0 for _, arr := range res.Data.Result { @@ -165,7 +164,7 @@ func merge(res QueryRes, ruleUID string) (*data.Frame, error) { pointers := make([]int, len(res.Data.Result)) for { minTime := int64(math.MaxInt64) - minEl := [2]string{} + minEl := sample{} minElStreamIdx := -1 // Find the element with the earliest time among all arrays. for i, stream := range res.Data.Result { @@ -173,10 +172,7 @@ func merge(res QueryRes, ruleUID string) (*data.Frame, error) { if len(stream.Values) == pointers[i] { continue } - curTime, err := strconv.ParseInt(stream.Values[pointers[i]][0], 10, 64) - if err != nil { - return nil, fmt.Errorf("failed to parse timestamp from loki response: %w", err) - } + curTime := stream.Values[pointers[i]].T.UnixNano() if pointers[i] < len(stream.Values) && curTime < minTime { minTime = curTime minEl = stream.Values[pointers[i]] @@ -188,22 +184,19 @@ func merge(res QueryRes, ruleUID string) (*data.Frame, error) { break } var entry lokiEntry - err := json.Unmarshal([]byte(minEl[1]), &entry) + err := json.Unmarshal([]byte(minEl.V), &entry) if err != nil { return nil, fmt.Errorf("failed to unmarshal entry: %w", err) } // Append the minimum element to the merged slice and move the pointer. - tsNano, err := strconv.ParseInt(minEl[0], 10, 64) - if err != nil { - return nil, fmt.Errorf("failed to parse timestamp in response: %w", err) - } + tsNano := minEl.T.UnixNano() // TODO: In general, perhaps we should omit the offending line and log, rather than failing the request entirely. streamLbls := res.Data.Result[minElStreamIdx].Stream lblsJson, err := json.Marshal(streamLbls) if err != nil { return nil, fmt.Errorf("failed to serialize stream labels: %w", err) } - line, err := jsonifyRow(minEl[1]) + line, err := jsonifyRow(minEl.V) if err != nil { return nil, fmt.Errorf("a line was in an invalid format: %w", err) } @@ -222,7 +215,7 @@ func merge(res QueryRes, ruleUID string) (*data.Frame, error) { } func statesToStreams(rule history_model.RuleMeta, states []state.StateTransition, externalLabels map[string]string, logger log.Logger) []stream { - buckets := make(map[string][]row) // label repr -> entries + buckets := make(map[string][]sample) // label repr -> entries for _, state := range states { if !shouldRecord(state) { continue @@ -255,9 +248,9 @@ func statesToStreams(rule history_model.RuleMeta, states []state.StateTransition } line := string(jsn) - buckets[repr] = append(buckets[repr], row{ - At: state.State.LastEvaluationTime, - Val: line, + buckets[repr] = append(buckets[repr], sample{ + T: state.State.LastEvaluationTime, + V: line, }) } diff --git a/pkg/services/ngalert/state/historian/loki_http.go b/pkg/services/ngalert/state/historian/loki_http.go index 459a08d64c1..d6d25071f55 100644 --- a/pkg/services/ngalert/state/historian/loki_http.go +++ b/pkg/services/ngalert/state/historian/loki_http.go @@ -8,6 +8,7 @@ import ( "io" "net/http" "net/url" + "strconv" "time" "github.com/grafana/grafana/pkg/infra/log" @@ -120,20 +121,37 @@ func (c *httpLokiClient) ping(ctx context.Context) error { type stream struct { Stream map[string]string `json:"stream"` - Values []row `json:"values"` + Values []sample `json:"values"` } -type row struct { - At time.Time - Val string +type sample struct { + T time.Time + V string } -func (r *row) MarshalJSON() ([]byte, error) { +func (r *sample) MarshalJSON() ([]byte, error) { return json.Marshal([2]string{ - fmt.Sprintf("%d", r.At.UnixNano()), r.Val, + fmt.Sprintf("%d", r.T.UnixNano()), r.V, }) } +func (r *sample) UnmarshalJSON(b []byte) error { + // A Loki stream sample is formatted like a list with two elements, [At, Val] + // At is a string wrapping a timestamp, in nanosecond unix epoch. + // Val is a string containing the log line. + var tuple [2]string + if err := json.Unmarshal(b, &tuple); err != nil { + return fmt.Errorf("failed to deserialize sample in Loki response: %w", err) + } + nano, err := strconv.ParseInt(tuple[0], 10, 64) + if err != nil { + return fmt.Errorf("timestamp in Loki sample not convertible to nanosecond epoch: %v", tuple[0]) + } + r.T = time.Unix(0, nano) + r.V = tuple[1] + return nil +} + func (c *httpLokiClient) push(ctx context.Context, s []stream) error { body := struct { Streams []stream `json:"streams"` @@ -186,13 +204,13 @@ func (c *httpLokiClient) setAuthAndTenantHeaders(req *http.Request) { req.Header.Add("X-Scope-OrgID", c.cfg.TenantID) } } -func (c *httpLokiClient) rangeQuery(ctx context.Context, selectors []Selector, start, end int64) (QueryRes, error) { +func (c *httpLokiClient) rangeQuery(ctx context.Context, selectors []Selector, start, end int64) (queryRes, error) { // Run the pre-flight checks for the query. if len(selectors) == 0 { - return QueryRes{}, fmt.Errorf("at least one selector required to query") + return queryRes{}, fmt.Errorf("at least one selector required to query") } if start > end { - return QueryRes{}, fmt.Errorf("start time cannot be after end time") + return queryRes{}, fmt.Errorf("start time cannot be after end time") } queryURL := c.cfg.ReadPathURL.JoinPath("/loki/api/v1/query_range") @@ -207,7 +225,7 @@ func (c *httpLokiClient) rangeQuery(ctx context.Context, selectors []Selector, s req, err := http.NewRequest(http.MethodGet, queryURL.String(), nil) if err != nil { - return QueryRes{}, fmt.Errorf("error creating request: %w", err) + return queryRes{}, fmt.Errorf("error creating request: %w", err) } req = req.WithContext(ctx) @@ -215,7 +233,7 @@ func (c *httpLokiClient) rangeQuery(ctx context.Context, selectors []Selector, s res, err := c.client.Do(req) if err != nil { - return QueryRes{}, fmt.Errorf("error executing request: %w", err) + return queryRes{}, fmt.Errorf("error executing request: %w", err) } defer func() { @@ -224,7 +242,7 @@ func (c *httpLokiClient) rangeQuery(ctx context.Context, selectors []Selector, s data, err := io.ReadAll(res.Body) if err != nil { - return QueryRes{}, fmt.Errorf("error reading request response: %w", err) + return queryRes{}, fmt.Errorf("error reading request response: %w", err) } if res.StatusCode < 200 || res.StatusCode >= 300 { @@ -233,17 +251,17 @@ func (c *httpLokiClient) rangeQuery(ctx context.Context, selectors []Selector, s } else { c.log.Error("Error response from Loki with an empty body", "status", res.StatusCode) } - return QueryRes{}, fmt.Errorf("received a non-200 response from loki, status: %d", res.StatusCode) + return queryRes{}, fmt.Errorf("received a non-200 response from loki, status: %d", res.StatusCode) } - queryRes := QueryRes{} - err = json.Unmarshal(data, &queryRes) + result := queryRes{} + err = json.Unmarshal(data, &result) if err != nil { fmt.Println(string(data)) - return QueryRes{}, fmt.Errorf("error parsing request response: %w", err) + return queryRes{}, fmt.Errorf("error parsing request response: %w", err) } - return queryRes, nil + return result, nil } func selectorString(selectors []Selector) string { @@ -275,16 +293,10 @@ func isValidOperator(op string) bool { return false } -type Stream struct { - Stream map[string]string `json:"stream"` - Values [][2]string `json:"values"` +type queryRes struct { + Data queryData `json:"data"` } -type QueryRes struct { - Status string `json:"status"` - Data QueryData `json:"data"` -} - -type QueryData struct { - Result []Stream `json:"result"` +type queryData struct { + Result []stream `json:"result"` } diff --git a/pkg/services/ngalert/state/historian/loki_http_test.go b/pkg/services/ngalert/state/historian/loki_http_test.go index 1c862abc633..9d7f4264d79 100644 --- a/pkg/services/ngalert/state/historian/loki_http_test.go +++ b/pkg/services/ngalert/state/historian/loki_http_test.go @@ -2,6 +2,7 @@ package historian import ( "context" + "encoding/json" "net/url" "testing" "time" @@ -155,3 +156,67 @@ func TestNewSelector(t *testing.T) { selector, err = NewSelector("label", "invalid", "value") require.Error(t, err) } + +func TestRow(t *testing.T) { + t.Run("marshal", func(t *testing.T) { + row := sample{ + T: time.Unix(0, 1234), + V: "some sample", + } + + jsn, err := json.Marshal(&row) + + require.NoError(t, err) + require.JSONEq(t, `["1234", "some sample"]`, string(jsn)) + }) + + t.Run("unmarshal", func(t *testing.T) { + jsn := []byte(`["1234", "some sample"]`) + + row := sample{} + err := json.Unmarshal(jsn, &row) + + require.NoError(t, err) + require.Equal(t, int64(1234), row.T.UnixNano()) + require.Equal(t, "some sample", row.V) + }) + + t.Run("unmarshal invalid", func(t *testing.T) { + jsn := []byte(`{"key": "wrong shape"}`) + + row := sample{} + err := json.Unmarshal(jsn, &row) + + require.ErrorContains(t, err, "failed to deserialize sample") + }) + + t.Run("unmarshal bad timestamp", func(t *testing.T) { + jsn := []byte(`["not-unix-nano", "some sample"]`) + + row := sample{} + err := json.Unmarshal(jsn, &row) + + require.ErrorContains(t, err, "timestamp in Loki sample") + }) +} + +func TestStream(t *testing.T) { + t.Run("marshal", func(t *testing.T) { + stream := stream{ + Stream: map[string]string{"a": "b"}, + Values: []sample{ + {T: time.Unix(0, 1), V: "one"}, + {T: time.Unix(0, 2), V: "two"}, + }, + } + + jsn, err := json.Marshal(stream) + + require.NoError(t, err) + require.JSONEq( + t, + `{"stream": {"a": "b"}, "values": [["1", "one"], ["2", "two"]]}`, + string(jsn), + ) + }) +} diff --git a/pkg/services/ngalert/state/historian/loki_test.go b/pkg/services/ngalert/state/historian/loki_test.go index 9142251e492..a35a9ecf53f 100644 --- a/pkg/services/ngalert/state/historian/loki_test.go +++ b/pkg/services/ngalert/state/historian/loki_test.go @@ -144,29 +144,29 @@ func TestRemoteLokiBackend(t *testing.T) { func TestMerge(t *testing.T) { testCases := []struct { name string - res QueryRes + res queryRes ruleID string expectedTime []time.Time }{ { name: "Should return values from multiple streams in right order", - res: QueryRes{ - Data: QueryData{ - Result: []Stream{ + res: queryRes{ + Data: queryData{ + Result: []stream{ { Stream: map[string]string{ "current": "pending", }, - Values: [][2]string{ - {"1", `{"schemaVersion": 1, "previous": "normal", "current": "pending", "values":{"a": "b"}}`}, + Values: []sample{ + {time.Unix(0, 1), `{"schemaVersion": 1, "previous": "normal", "current": "pending", "values":{"a": "b"}}`}, }, }, { Stream: map[string]string{ "current": "firing", }, - Values: [][2]string{ - {"2", `{"schemaVersion": 1, "previous": "pending", "current": "firing", "values":{"a": "b"}}`}, + Values: []sample{ + {time.Unix(0, 2), `{"schemaVersion": 1, "previous": "pending", "current": "firing", "values":{"a": "b"}}`}, }, }, }, @@ -180,14 +180,14 @@ func TestMerge(t *testing.T) { }, { name: "Should handle empty values", - res: QueryRes{ - Data: QueryData{ - Result: []Stream{ + res: queryRes{ + Data: queryData{ + Result: []stream{ { Stream: map[string]string{ "current": "normal", }, - Values: [][2]string{}, + Values: []sample{}, }, }, }, @@ -197,24 +197,24 @@ func TestMerge(t *testing.T) { }, { name: "Should handle multiple values in one stream", - res: QueryRes{ - Data: QueryData{ - Result: []Stream{ + res: queryRes{ + Data: queryData{ + Result: []stream{ { Stream: map[string]string{ "current": "normal", }, - Values: [][2]string{ - {"1", `{"schemaVersion": 1, "previous": "firing", "current": "normal", "values":{"a": "b"}}`}, - {"2", `{"schemaVersion": 1, "previous": "firing", "current": "normal", "values":{"a": "b"}}`}, + Values: []sample{ + {time.Unix(0, 1), `{"schemaVersion": 1, "previous": "firing", "current": "normal", "values":{"a": "b"}}`}, + {time.Unix(0, 2), `{"schemaVersion": 1, "previous": "firing", "current": "normal", "values":{"a": "b"}}`}, }, }, { Stream: map[string]string{ "current": "firing", }, - Values: [][2]string{ - {"3", `{"schemaVersion": 1, "previous": "pending", "current": "firing", "values":{"a": "b"}}`}, + Values: []sample{ + {time.Unix(0, 3), `{"schemaVersion": 1, "previous": "pending", "current": "firing", "values":{"a": "b"}}`}, }, }, }, @@ -276,11 +276,11 @@ func requireSingleEntry(t *testing.T, res []stream) lokiEntry { return requireEntry(t, res[0].Values[0]) } -func requireEntry(t *testing.T, row row) lokiEntry { +func requireEntry(t *testing.T, row sample) lokiEntry { t.Helper() var entry lokiEntry - err := json.Unmarshal([]byte(row.Val), &entry) + err := json.Unmarshal([]byte(row.V), &entry) require.NoError(t, err) return entry }