Alerting: Unify structs in Loki client and make them more consistent with Prometheus (#63055)

* Use existing row struct instead of [2]string, add deserialization helper

* Replace the exported Stream struct with the existing stream struct, which is identical

* Drop unused status field

* Don't export queryRes and queryData

* Tests for custom marshalling

* Rename row fields to T and V for consistency with prometheus samples

* Rename row to sample
This commit is contained in:
Alexander Weaver 2023-02-11 05:17:44 -06:00 committed by GitHub
parent 229f8b6e6d
commit 958fb2c50a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 137 additions and 67 deletions

View File

@ -5,7 +5,6 @@ import (
"encoding/json"
"fmt"
"math"
"strconv"
"time"
"github.com/grafana/grafana-plugin-sdk-go/data"
@ -39,7 +38,7 @@ const defaultQueryRange = 6 * time.Hour
type remoteLokiClient interface {
ping(context.Context) error
push(context.Context, []stream) error
rangeQuery(ctx context.Context, selectors []Selector, start, end int64) (QueryRes, error)
rangeQuery(ctx context.Context, selectors []Selector, start, end int64) (queryRes, error)
}
type RemoteLokiBackend struct {
@ -139,7 +138,7 @@ func buildSelectors(query models.HistoryQuery) ([]Selector, error) {
}
// merge will put all the results in one array sorted by timestamp.
func merge(res QueryRes, ruleUID string) (*data.Frame, error) {
func merge(res queryRes, ruleUID string) (*data.Frame, error) {
// Find the total number of elements in all arrays.
totalLen := 0
for _, arr := range res.Data.Result {
@ -165,7 +164,7 @@ func merge(res QueryRes, ruleUID string) (*data.Frame, error) {
pointers := make([]int, len(res.Data.Result))
for {
minTime := int64(math.MaxInt64)
minEl := [2]string{}
minEl := sample{}
minElStreamIdx := -1
// Find the element with the earliest time among all arrays.
for i, stream := range res.Data.Result {
@ -173,10 +172,7 @@ func merge(res QueryRes, ruleUID string) (*data.Frame, error) {
if len(stream.Values) == pointers[i] {
continue
}
curTime, err := strconv.ParseInt(stream.Values[pointers[i]][0], 10, 64)
if err != nil {
return nil, fmt.Errorf("failed to parse timestamp from loki response: %w", err)
}
curTime := stream.Values[pointers[i]].T.UnixNano()
if pointers[i] < len(stream.Values) && curTime < minTime {
minTime = curTime
minEl = stream.Values[pointers[i]]
@ -188,22 +184,19 @@ func merge(res QueryRes, ruleUID string) (*data.Frame, error) {
break
}
var entry lokiEntry
err := json.Unmarshal([]byte(minEl[1]), &entry)
err := json.Unmarshal([]byte(minEl.V), &entry)
if err != nil {
return nil, fmt.Errorf("failed to unmarshal entry: %w", err)
}
// Append the minimum element to the merged slice and move the pointer.
tsNano, err := strconv.ParseInt(minEl[0], 10, 64)
if err != nil {
return nil, fmt.Errorf("failed to parse timestamp in response: %w", err)
}
tsNano := minEl.T.UnixNano()
// TODO: In general, perhaps we should omit the offending line and log, rather than failing the request entirely.
streamLbls := res.Data.Result[minElStreamIdx].Stream
lblsJson, err := json.Marshal(streamLbls)
if err != nil {
return nil, fmt.Errorf("failed to serialize stream labels: %w", err)
}
line, err := jsonifyRow(minEl[1])
line, err := jsonifyRow(minEl.V)
if err != nil {
return nil, fmt.Errorf("a line was in an invalid format: %w", err)
}
@ -222,7 +215,7 @@ func merge(res QueryRes, ruleUID string) (*data.Frame, error) {
}
func statesToStreams(rule history_model.RuleMeta, states []state.StateTransition, externalLabels map[string]string, logger log.Logger) []stream {
buckets := make(map[string][]row) // label repr -> entries
buckets := make(map[string][]sample) // label repr -> entries
for _, state := range states {
if !shouldRecord(state) {
continue
@ -255,9 +248,9 @@ func statesToStreams(rule history_model.RuleMeta, states []state.StateTransition
}
line := string(jsn)
buckets[repr] = append(buckets[repr], row{
At: state.State.LastEvaluationTime,
Val: line,
buckets[repr] = append(buckets[repr], sample{
T: state.State.LastEvaluationTime,
V: line,
})
}

View File

@ -8,6 +8,7 @@ import (
"io"
"net/http"
"net/url"
"strconv"
"time"
"github.com/grafana/grafana/pkg/infra/log"
@ -120,20 +121,37 @@ func (c *httpLokiClient) ping(ctx context.Context) error {
type stream struct {
Stream map[string]string `json:"stream"`
Values []row `json:"values"`
Values []sample `json:"values"`
}
type row struct {
At time.Time
Val string
type sample struct {
T time.Time
V string
}
func (r *row) MarshalJSON() ([]byte, error) {
func (r *sample) MarshalJSON() ([]byte, error) {
return json.Marshal([2]string{
fmt.Sprintf("%d", r.At.UnixNano()), r.Val,
fmt.Sprintf("%d", r.T.UnixNano()), r.V,
})
}
// UnmarshalJSON decodes a Loki stream sample from its wire format into r.
//
// A Loki stream sample is formatted like a list with two elements, [T, V]:
// T is a string wrapping a timestamp, in nanosecond unix epoch, and
// V is a string containing the log line.
//
// An error is returned when b cannot be decoded into a pair of strings, or
// when the timestamp string is not a base-10 integer that fits in an int64.
// In both cases the underlying error is wrapped, so callers can inspect it
// with errors.Is or errors.As. r is left unmodified on error.
func (r *sample) UnmarshalJSON(b []byte) error {
	var tuple [2]string
	if err := json.Unmarshal(b, &tuple); err != nil {
		return fmt.Errorf("failed to deserialize sample in Loki response: %w", err)
	}
	nano, err := strconv.ParseInt(tuple[0], 10, 64)
	if err != nil {
		// Wrap the strconv error instead of printing only the raw input: it
		// already quotes the offending value and additionally reports whether
		// the failure was a syntax or a range problem.
		return fmt.Errorf("timestamp in Loki sample not convertible to nanosecond epoch: %w", err)
	}
	r.T = time.Unix(0, nano)
	r.V = tuple[1]
	return nil
}
func (c *httpLokiClient) push(ctx context.Context, s []stream) error {
body := struct {
Streams []stream `json:"streams"`
@ -186,13 +204,13 @@ func (c *httpLokiClient) setAuthAndTenantHeaders(req *http.Request) {
req.Header.Add("X-Scope-OrgID", c.cfg.TenantID)
}
}
func (c *httpLokiClient) rangeQuery(ctx context.Context, selectors []Selector, start, end int64) (QueryRes, error) {
func (c *httpLokiClient) rangeQuery(ctx context.Context, selectors []Selector, start, end int64) (queryRes, error) {
// Run the pre-flight checks for the query.
if len(selectors) == 0 {
return QueryRes{}, fmt.Errorf("at least one selector required to query")
return queryRes{}, fmt.Errorf("at least one selector required to query")
}
if start > end {
return QueryRes{}, fmt.Errorf("start time cannot be after end time")
return queryRes{}, fmt.Errorf("start time cannot be after end time")
}
queryURL := c.cfg.ReadPathURL.JoinPath("/loki/api/v1/query_range")
@ -207,7 +225,7 @@ func (c *httpLokiClient) rangeQuery(ctx context.Context, selectors []Selector, s
req, err := http.NewRequest(http.MethodGet,
queryURL.String(), nil)
if err != nil {
return QueryRes{}, fmt.Errorf("error creating request: %w", err)
return queryRes{}, fmt.Errorf("error creating request: %w", err)
}
req = req.WithContext(ctx)
@ -215,7 +233,7 @@ func (c *httpLokiClient) rangeQuery(ctx context.Context, selectors []Selector, s
res, err := c.client.Do(req)
if err != nil {
return QueryRes{}, fmt.Errorf("error executing request: %w", err)
return queryRes{}, fmt.Errorf("error executing request: %w", err)
}
defer func() {
@ -224,7 +242,7 @@ func (c *httpLokiClient) rangeQuery(ctx context.Context, selectors []Selector, s
data, err := io.ReadAll(res.Body)
if err != nil {
return QueryRes{}, fmt.Errorf("error reading request response: %w", err)
return queryRes{}, fmt.Errorf("error reading request response: %w", err)
}
if res.StatusCode < 200 || res.StatusCode >= 300 {
@ -233,17 +251,17 @@ func (c *httpLokiClient) rangeQuery(ctx context.Context, selectors []Selector, s
} else {
c.log.Error("Error response from Loki with an empty body", "status", res.StatusCode)
}
return QueryRes{}, fmt.Errorf("received a non-200 response from loki, status: %d", res.StatusCode)
return queryRes{}, fmt.Errorf("received a non-200 response from loki, status: %d", res.StatusCode)
}
queryRes := QueryRes{}
err = json.Unmarshal(data, &queryRes)
result := queryRes{}
err = json.Unmarshal(data, &result)
if err != nil {
fmt.Println(string(data))
return QueryRes{}, fmt.Errorf("error parsing request response: %w", err)
return queryRes{}, fmt.Errorf("error parsing request response: %w", err)
}
return queryRes, nil
return result, nil
}
func selectorString(selectors []Selector) string {
@ -275,16 +293,10 @@ func isValidOperator(op string) bool {
return false
}
type Stream struct {
Stream map[string]string `json:"stream"`
Values [][2]string `json:"values"`
type queryRes struct {
Data queryData `json:"data"`
}
type QueryRes struct {
Status string `json:"status"`
Data QueryData `json:"data"`
}
type QueryData struct {
Result []Stream `json:"result"`
type queryData struct {
Result []stream `json:"result"`
}

View File

@ -2,6 +2,7 @@ package historian
import (
"context"
"encoding/json"
"net/url"
"testing"
"time"
@ -155,3 +156,67 @@ func TestNewSelector(t *testing.T) {
selector, err = NewSelector("label", "invalid", "value")
require.Error(t, err)
}
// TestRow exercises the custom JSON encoding of sample: the [timestamp, line]
// pair used on the wire, in both directions, plus the two failure modes of
// decoding (wrong JSON shape and a non-numeric timestamp).
func TestRow(t *testing.T) {
	t.Run("marshal", func(t *testing.T) {
		in := &sample{T: time.Unix(0, 1234), V: "some sample"}

		encoded, err := json.Marshal(in)

		require.NoError(t, err)
		require.JSONEq(t, `["1234", "some sample"]`, string(encoded))
	})

	t.Run("unmarshal", func(t *testing.T) {
		var got sample

		err := json.Unmarshal([]byte(`["1234", "some sample"]`), &got)

		require.NoError(t, err)
		require.Equal(t, int64(1234), got.T.UnixNano())
		require.Equal(t, "some sample", got.V)
	})

	t.Run("unmarshal invalid", func(t *testing.T) {
		// An object instead of a two-element list must be rejected.
		var got sample

		err := json.Unmarshal([]byte(`{"key": "wrong shape"}`), &got)

		require.ErrorContains(t, err, "failed to deserialize sample")
	})

	t.Run("unmarshal bad timestamp", func(t *testing.T) {
		// The first element must be an integer encoded as a string.
		var got sample

		err := json.Unmarshal([]byte(`["not-unix-nano", "some sample"]`), &got)

		require.ErrorContains(t, err, "timestamp in Loki sample")
	})
}
func TestStream(t *testing.T) {
t.Run("marshal", func(t *testing.T) {
stream := stream{
Stream: map[string]string{"a": "b"},
Values: []sample{
{T: time.Unix(0, 1), V: "one"},
{T: time.Unix(0, 2), V: "two"},
},
}
jsn, err := json.Marshal(stream)
require.NoError(t, err)
require.JSONEq(
t,
`{"stream": {"a": "b"}, "values": [["1", "one"], ["2", "two"]]}`,
string(jsn),
)
})
}

View File

@ -144,29 +144,29 @@ func TestRemoteLokiBackend(t *testing.T) {
func TestMerge(t *testing.T) {
testCases := []struct {
name string
res QueryRes
res queryRes
ruleID string
expectedTime []time.Time
}{
{
name: "Should return values from multiple streams in right order",
res: QueryRes{
Data: QueryData{
Result: []Stream{
res: queryRes{
Data: queryData{
Result: []stream{
{
Stream: map[string]string{
"current": "pending",
},
Values: [][2]string{
{"1", `{"schemaVersion": 1, "previous": "normal", "current": "pending", "values":{"a": "b"}}`},
Values: []sample{
{time.Unix(0, 1), `{"schemaVersion": 1, "previous": "normal", "current": "pending", "values":{"a": "b"}}`},
},
},
{
Stream: map[string]string{
"current": "firing",
},
Values: [][2]string{
{"2", `{"schemaVersion": 1, "previous": "pending", "current": "firing", "values":{"a": "b"}}`},
Values: []sample{
{time.Unix(0, 2), `{"schemaVersion": 1, "previous": "pending", "current": "firing", "values":{"a": "b"}}`},
},
},
},
@ -180,14 +180,14 @@ func TestMerge(t *testing.T) {
},
{
name: "Should handle empty values",
res: QueryRes{
Data: QueryData{
Result: []Stream{
res: queryRes{
Data: queryData{
Result: []stream{
{
Stream: map[string]string{
"current": "normal",
},
Values: [][2]string{},
Values: []sample{},
},
},
},
@ -197,24 +197,24 @@ func TestMerge(t *testing.T) {
},
{
name: "Should handle multiple values in one stream",
res: QueryRes{
Data: QueryData{
Result: []Stream{
res: queryRes{
Data: queryData{
Result: []stream{
{
Stream: map[string]string{
"current": "normal",
},
Values: [][2]string{
{"1", `{"schemaVersion": 1, "previous": "firing", "current": "normal", "values":{"a": "b"}}`},
{"2", `{"schemaVersion": 1, "previous": "firing", "current": "normal", "values":{"a": "b"}}`},
Values: []sample{
{time.Unix(0, 1), `{"schemaVersion": 1, "previous": "firing", "current": "normal", "values":{"a": "b"}}`},
{time.Unix(0, 2), `{"schemaVersion": 1, "previous": "firing", "current": "normal", "values":{"a": "b"}}`},
},
},
{
Stream: map[string]string{
"current": "firing",
},
Values: [][2]string{
{"3", `{"schemaVersion": 1, "previous": "pending", "current": "firing", "values":{"a": "b"}}`},
Values: []sample{
{time.Unix(0, 3), `{"schemaVersion": 1, "previous": "pending", "current": "firing", "values":{"a": "b"}}`},
},
},
},
@ -276,11 +276,11 @@ func requireSingleEntry(t *testing.T, res []stream) lokiEntry {
return requireEntry(t, res[0].Values[0])
}
func requireEntry(t *testing.T, row row) lokiEntry {
func requireEntry(t *testing.T, row sample) lokiEntry {
t.Helper()
var entry lokiEntry
err := json.Unmarshal([]byte(row.Val), &entry)
err := json.Unmarshal([]byte(row.V), &entry)
require.NoError(t, err)
return entry
}