mirror of
https://github.com/grafana/grafana.git
synced 2025-02-25 18:55:37 -06:00
Alerting: update Loki backend of state history to batch requests by folder (#89865)
* refactor `selectorString` and remove Selector struct * move code from selector string to BuildLogQuery * batch requests by folder UID * update historian annotation store to handle multiple queries * sort folder uids to make consistent queries * add logs to loki http * log batch size but not content. content is logged by the client
This commit is contained in:
parent
05ab4cdd1f
commit
2023821100
@ -99,7 +99,8 @@ func (r *LokiHistorianStore) Get(ctx context.Context, query *annotations.ItemQue
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
logQL, _, err := historian.BuildLogQuery(buildHistoryQuery(query, accessResources.Dashboards, rule.UID), nil, r.client.MaxQuerySize())
|
// No folders in the filter because it filter by Dashboard UID, and the request is already authorized.
|
||||||
|
logQL, err := historian.BuildLogQuery(buildHistoryQuery(query, accessResources.Dashboards, rule.UID), nil, r.client.MaxQuerySize())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
grafanaErr := errutil.Error{}
|
grafanaErr := errutil.Error{}
|
||||||
if errors.As(err, &grafanaErr) {
|
if errors.As(err, &grafanaErr) {
|
||||||
@ -107,6 +108,9 @@ func (r *LokiHistorianStore) Get(ctx context.Context, query *annotations.ItemQue
|
|||||||
}
|
}
|
||||||
return make([]*annotations.ItemDTO, 0), ErrLokiStoreInternal.Errorf("failed to build loki query: %w", err)
|
return make([]*annotations.ItemDTO, 0), ErrLokiStoreInternal.Errorf("failed to build loki query: %w", err)
|
||||||
}
|
}
|
||||||
|
if len(logQL) > 1 {
|
||||||
|
r.log.FromContext(ctx).Info("Execute query in multiple batches", "batches", logQL, "maxQueryLimit", r.client.MaxQuerySize())
|
||||||
|
}
|
||||||
|
|
||||||
now := time.Now().UTC()
|
now := time.Now().UTC()
|
||||||
if query.To == 0 {
|
if query.To == 0 {
|
||||||
@ -119,18 +123,17 @@ func (r *LokiHistorianStore) Get(ctx context.Context, query *annotations.ItemQue
|
|||||||
// query.From and query.To are always in milliseconds, convert them to nanoseconds for loki
|
// query.From and query.To are always in milliseconds, convert them to nanoseconds for loki
|
||||||
from := query.From * 1e6
|
from := query.From * 1e6
|
||||||
to := query.To * 1e6
|
to := query.To * 1e6
|
||||||
|
|
||||||
res, err := r.client.RangeQuery(ctx, logQL, from, to, query.Limit)
|
|
||||||
if err != nil {
|
|
||||||
return make([]*annotations.ItemDTO, 0), ErrLokiStoreInternal.Errorf("failed to query loki: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
items := make([]*annotations.ItemDTO, 0)
|
items := make([]*annotations.ItemDTO, 0)
|
||||||
for _, stream := range res.Data.Result {
|
for _, q := range logQL {
|
||||||
items = append(items, r.annotationsFromStream(stream, *accessResources)...)
|
res, err := r.client.RangeQuery(ctx, q, from, to, query.Limit)
|
||||||
|
if err != nil {
|
||||||
|
return make([]*annotations.ItemDTO, 0), ErrLokiStoreInternal.Errorf("failed to query loki: %w", err)
|
||||||
|
}
|
||||||
|
for _, stream := range res.Data.Result {
|
||||||
|
items = append(items, r.annotationsFromStream(stream, *accessResources)...)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
sort.Sort(annotations.SortedItems(items))
|
sort.Sort(annotations.SortedItems(items))
|
||||||
|
|
||||||
return items, err
|
return items, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -7,6 +7,7 @@ import (
|
|||||||
"math"
|
"math"
|
||||||
"regexp"
|
"regexp"
|
||||||
"sort"
|
"sort"
|
||||||
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
@ -47,7 +48,7 @@ const defaultQueryRange = 6 * time.Hour
|
|||||||
|
|
||||||
var (
|
var (
|
||||||
ErrLokiQueryTooLong = errutil.BadRequest("loki.requestTooLong").MustTemplate(
|
ErrLokiQueryTooLong = errutil.BadRequest("loki.requestTooLong").MustTemplate(
|
||||||
"Request to Loki exceeded ({{.Public.QuerySize}} bytes) configured maximum size of {{.Public.MaxLimit}} bytes",
|
"Request to Loki exceeded ({{.Public.QuerySize}} bytes) configured maximum size of {{.Public.MaxLimit}} bytes. Query: {{.Private.Query}}",
|
||||||
errutil.WithPublic("Query for Loki exceeded the configured limit of {{.Public.MaxLimit}} bytes. Remove some filters and try again."),
|
errutil.WithPublic("Query for Loki exceeded the configured limit of {{.Public.MaxLimit}} bytes. Remove some filters and try again."),
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
@ -55,7 +56,7 @@ var (
|
|||||||
func NewErrLokiQueryTooLong(query string, maxLimit int) error {
|
func NewErrLokiQueryTooLong(query string, maxLimit int) error {
|
||||||
return ErrLokiQueryTooLong.Build(errutil.TemplateData{
|
return ErrLokiQueryTooLong.Build(errutil.TemplateData{
|
||||||
Private: map[string]any{
|
Private: map[string]any{
|
||||||
"query": query,
|
"Query": query,
|
||||||
},
|
},
|
||||||
Public: map[string]any{
|
Public: map[string]any{
|
||||||
"MaxLimit": maxLimit,
|
"MaxLimit": maxLimit,
|
||||||
@ -145,12 +146,12 @@ func (h *RemoteLokiBackend) Query(ctx context.Context, query models.HistoryQuery
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
logQL, filterByFolderSkipped, err := BuildLogQuery(query, uids, h.client.MaxQuerySize())
|
queries, err := BuildLogQuery(query, uids, h.client.MaxQuerySize())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
if filterByFolderSkipped {
|
if len(queries) > 1 {
|
||||||
h.log.FromContext(ctx).Warn("Filter by folder skipped because it's too long. Use in-memory filtering", "folders", len(uids))
|
h.log.FromContext(ctx).Info("Execute query in multiple batches", "batchSize", len(queries), "folders", len(uids), "maxQueryLimit", h.client.MaxQuerySize())
|
||||||
}
|
}
|
||||||
|
|
||||||
now := time.Now().UTC()
|
now := time.Now().UTC()
|
||||||
@ -160,38 +161,22 @@ func (h *RemoteLokiBackend) Query(ctx context.Context, query models.HistoryQuery
|
|||||||
if query.From.IsZero() {
|
if query.From.IsZero() {
|
||||||
query.From = now.Add(-defaultQueryRange)
|
query.From = now.Add(-defaultQueryRange)
|
||||||
}
|
}
|
||||||
|
var res []Stream
|
||||||
// Timestamps are expected in RFC3339Nano.
|
for _, logQL := range queries {
|
||||||
res, err := h.client.RangeQuery(ctx, logQL, query.From.UnixNano(), query.To.UnixNano(), int64(query.Limit))
|
// Timestamps are expected in RFC3339Nano.
|
||||||
if err != nil {
|
// Apply user-defined limit to every request. Multiple batches is a very rare case, and therefore we can tolerate getting more data than needed.
|
||||||
return nil, err
|
// The limit can be applied after all results are merged
|
||||||
|
r, err := h.client.RangeQuery(ctx, logQL, query.From.UnixNano(), query.To.UnixNano(), int64(query.Limit))
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
res = append(res, r.Data.Result...)
|
||||||
}
|
}
|
||||||
return merge(res, uids)
|
return merge(res, uids)
|
||||||
}
|
}
|
||||||
|
|
||||||
func buildSelectors(query models.HistoryQuery) ([]Selector, error) {
|
|
||||||
// OrgID and the state history label are static and will be included in all queries.
|
|
||||||
selectors := make([]Selector, 2)
|
|
||||||
|
|
||||||
// Set the predefined selector orgID.
|
|
||||||
selector, err := NewSelector(OrgIDLabel, "=", fmt.Sprintf("%d", query.OrgID))
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
selectors[0] = selector
|
|
||||||
|
|
||||||
// Set the predefined selector for the state history label.
|
|
||||||
selector, err = NewSelector(StateHistoryLabelKey, "=", StateHistoryLabelValue)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
selectors[1] = selector
|
|
||||||
|
|
||||||
return selectors, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// merge will put all the results in one array sorted by timestamp.
|
// merge will put all the results in one array sorted by timestamp.
|
||||||
func merge(res QueryRes, folderUIDToFilter []string) (*data.Frame, error) {
|
func merge(res []Stream, folderUIDToFilter []string) (*data.Frame, error) {
|
||||||
filterByFolderUIDMap := make(map[string]struct{}, len(folderUIDToFilter))
|
filterByFolderUIDMap := make(map[string]struct{}, len(folderUIDToFilter))
|
||||||
for _, uid := range folderUIDToFilter {
|
for _, uid := range folderUIDToFilter {
|
||||||
filterByFolderUIDMap[uid] = struct{}{}
|
filterByFolderUIDMap[uid] = struct{}{}
|
||||||
@ -199,7 +184,7 @@ func merge(res QueryRes, folderUIDToFilter []string) (*data.Frame, error) {
|
|||||||
|
|
||||||
// Find the total number of elements in all arrays.
|
// Find the total number of elements in all arrays.
|
||||||
totalLen := 0
|
totalLen := 0
|
||||||
for _, arr := range res.Data.Result {
|
for _, arr := range res {
|
||||||
totalLen += len(arr.Values)
|
totalLen += len(arr.Values)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -219,13 +204,13 @@ func merge(res QueryRes, folderUIDToFilter []string) (*data.Frame, error) {
|
|||||||
labels := make([]json.RawMessage, 0, totalLen)
|
labels := make([]json.RawMessage, 0, totalLen)
|
||||||
|
|
||||||
// Initialize a slice of pointers to the current position in each array.
|
// Initialize a slice of pointers to the current position in each array.
|
||||||
pointers := make([]int, len(res.Data.Result))
|
pointers := make([]int, len(res))
|
||||||
for {
|
for {
|
||||||
minTime := int64(math.MaxInt64)
|
minTime := int64(math.MaxInt64)
|
||||||
minEl := Sample{}
|
minEl := Sample{}
|
||||||
minElStreamIdx := -1
|
minElStreamIdx := -1
|
||||||
// Find the element with the earliest time among all arrays.
|
// Find the element with the earliest time among all arrays.
|
||||||
for i, stream := range res.Data.Result {
|
for i, stream := range res {
|
||||||
// Skip if we already reached the end of the current array.
|
// Skip if we already reached the end of the current array.
|
||||||
if len(stream.Values) == pointers[i] {
|
if len(stream.Values) == pointers[i] {
|
||||||
continue
|
continue
|
||||||
@ -261,7 +246,7 @@ func merge(res QueryRes, folderUIDToFilter []string) (*data.Frame, error) {
|
|||||||
// Append the minimum element to the merged slice and move the pointer.
|
// Append the minimum element to the merged slice and move the pointer.
|
||||||
tsNano := minEl.T.UnixNano()
|
tsNano := minEl.T.UnixNano()
|
||||||
// TODO: In general, perhaps we should omit the offending line and log, rather than failing the request entirely.
|
// TODO: In general, perhaps we should omit the offending line and log, rather than failing the request entirely.
|
||||||
streamLbls := res.Data.Result[minElStreamIdx].Stream
|
streamLbls := res[minElStreamIdx].Stream
|
||||||
lblsJson, err := json.Marshal(streamLbls)
|
lblsJson, err := json.Marshal(streamLbls)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("failed to serialize stream labels: %w", err)
|
return nil, fmt.Errorf("failed to serialize stream labels: %w", err)
|
||||||
@ -382,115 +367,122 @@ func jsonifyRow(line string) (json.RawMessage, error) {
|
|||||||
return json.Marshal(entry)
|
return json.Marshal(entry)
|
||||||
}
|
}
|
||||||
|
|
||||||
type Selector struct {
|
// BuildLogQuery converts models.HistoryQuery and a list of folder UIDs to Loki queries.
|
||||||
// Label to Select
|
// It can return multiple queries if the list of folder UIDs is too big to fit into single query.
|
||||||
Label string
|
// If there is a folder UID long enough to exceed a query size it returns ErrQueryTooLong.
|
||||||
Op Operator
|
func BuildLogQuery(query models.HistoryQuery, folderUIDs []string, maxQuerySize int) ([]string, error) {
|
||||||
// Value that is expected
|
// first build tail of the query (if exists) to know what remaining capacity we have for folders
|
||||||
Value string
|
tail, err := buildQueryTail(query)
|
||||||
}
|
|
||||||
|
|
||||||
func NewSelector(label, op, value string) (Selector, error) {
|
|
||||||
if !isValidOperator(op) {
|
|
||||||
return Selector{}, fmt.Errorf("'%s' is not a valid query operator", op)
|
|
||||||
}
|
|
||||||
return Selector{Label: label, Op: Operator(op), Value: value}, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func selectorString(selectors []Selector, folderUIDs []string) string {
|
|
||||||
if len(selectors) == 0 {
|
|
||||||
return "{}"
|
|
||||||
}
|
|
||||||
// Build the query selector.
|
|
||||||
query := ""
|
|
||||||
for _, s := range selectors {
|
|
||||||
query += fmt.Sprintf("%s%s%q,", s.Label, s.Op, s.Value)
|
|
||||||
}
|
|
||||||
|
|
||||||
if len(folderUIDs) > 0 {
|
|
||||||
b := strings.Builder{}
|
|
||||||
b.Grow(len(folderUIDs)*40 + len(FolderUIDLabel)) // rough estimate of the length
|
|
||||||
b.WriteString(FolderUIDLabel)
|
|
||||||
b.WriteString("=~`")
|
|
||||||
b.WriteString(regexp.QuoteMeta(folderUIDs[0]))
|
|
||||||
for _, uid := range folderUIDs[1:] {
|
|
||||||
b.WriteString("|")
|
|
||||||
b.WriteString(regexp.QuoteMeta(uid))
|
|
||||||
}
|
|
||||||
b.WriteString("`")
|
|
||||||
query += b.String()
|
|
||||||
} else {
|
|
||||||
// Remove the last comma, as we append one to every selector.
|
|
||||||
query = query[:len(query)-1]
|
|
||||||
}
|
|
||||||
return "{" + query + "}"
|
|
||||||
}
|
|
||||||
|
|
||||||
func isValidOperator(op string) bool {
|
|
||||||
switch op {
|
|
||||||
case "=", "!=", "=~", "!~":
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
|
|
||||||
// BuildLogQuery converts models.HistoryQuery and a list of folder UIDs to a Loki query.
|
|
||||||
// If query size exceeds the `maxQuerySize` then it re-builds query ignoring the folderUIDs. If it's still bigger - returns ErrQueryTooLong.
|
|
||||||
// Returns a tuple:
|
|
||||||
// - loki query
|
|
||||||
// - true if filter by folder UID was not added to the query ignored
|
|
||||||
// - error if log query cannot be constructed, and ErrQueryTooLong if user-defined query exceeds maximum allowed size
|
|
||||||
func BuildLogQuery(query models.HistoryQuery, folderUIDs []string, maxQuerySize int) (string, bool, error) {
|
|
||||||
selectors, err := buildSelectors(query)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return "", false, fmt.Errorf("failed to build the provided selectors: %w", err)
|
return nil, err
|
||||||
|
}
|
||||||
|
// build the base selectors. skip the closing bracket because we will append folders below. Closing bracket will be added at the end
|
||||||
|
head := fmt.Sprintf(`{%s="%d",%s=%q`, OrgIDLabel, query.OrgID, StateHistoryLabelKey, StateHistoryLabelValue)
|
||||||
|
|
||||||
|
// check if system-defined + user-defined query parameters do not exceed maximum size
|
||||||
|
baseQuerySize := len(head) + 1 + len(tail) // 1 stands for closing bracket
|
||||||
|
if len(head)+1+len(tail) > maxQuerySize {
|
||||||
|
return nil, NewErrLokiQueryTooLong(head+"}"+tail, maxQuerySize)
|
||||||
}
|
}
|
||||||
|
|
||||||
logQL := selectorString(selectors, folderUIDs)
|
// nothing to append to the head. Construct the query and return what we have
|
||||||
|
if len(folderUIDs) == 0 {
|
||||||
if queryHasLogFilters(query) {
|
return []string{head + "}" + tail}, nil
|
||||||
logQL = fmt.Sprintf("%s | json", logQL)
|
|
||||||
}
|
}
|
||||||
|
remainingFolders := folderUIDs
|
||||||
|
var result []string
|
||||||
|
for len(remainingFolders) > 0 { // iterating until all folders are processed and at least one result
|
||||||
|
// pre-calculate the size of the string to avoid excessive re-allocations
|
||||||
|
requiredLengthEstimate := baseQuerySize + 1 + len(FolderUIDLabel) + 3 // 1 - comma, 3 - operator + opening backtick
|
||||||
|
for _, folder := range remainingFolders { // this does not account to escaping characters because it's not expected generally
|
||||||
|
requiredLengthEstimate += len(folder) + 1 // 1 - accounts to pipe-separator and backtick at the end
|
||||||
|
if requiredLengthEstimate > maxQuerySize {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
s := strings.Builder{}
|
||||||
|
s.Grow(requiredLengthEstimate)
|
||||||
|
s.WriteString(head)
|
||||||
|
s.WriteString(",")
|
||||||
|
s.WriteString(FolderUIDLabel)
|
||||||
|
s.WriteString("=~`")
|
||||||
|
added := false
|
||||||
|
for len(remainingFolders) > 0 {
|
||||||
|
uid := regexp.QuoteMeta(remainingFolders[0])
|
||||||
|
requiredLength := s.Len() + len(tail) + len(uid) + 2 // 2 - backtick + closing bracket
|
||||||
|
if added {
|
||||||
|
requiredLength++ // account to pipe symbol
|
||||||
|
}
|
||||||
|
if requiredLength > maxQuerySize {
|
||||||
|
if !added {
|
||||||
|
// finish the query for logging
|
||||||
|
s.WriteString(uid)
|
||||||
|
s.WriteString("`}")
|
||||||
|
s.WriteString(tail)
|
||||||
|
return nil, NewErrLokiQueryTooLong(s.String(), maxQuerySize)
|
||||||
|
}
|
||||||
|
break
|
||||||
|
}
|
||||||
|
if added {
|
||||||
|
s.WriteString("|")
|
||||||
|
}
|
||||||
|
s.WriteString(uid)
|
||||||
|
added = true
|
||||||
|
remainingFolders = remainingFolders[1:]
|
||||||
|
}
|
||||||
|
s.WriteString("`")
|
||||||
|
s.WriteString("}")
|
||||||
|
s.WriteString(tail)
|
||||||
|
result = append(result, s.String())
|
||||||
|
}
|
||||||
|
return result, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func buildQueryTail(query models.HistoryQuery) (string, error) {
|
||||||
|
if !queryHasLogFilters(query) {
|
||||||
|
return "", nil
|
||||||
|
}
|
||||||
|
b := strings.Builder{}
|
||||||
|
b.WriteString(" | json")
|
||||||
|
|
||||||
if query.RuleUID != "" {
|
if query.RuleUID != "" {
|
||||||
logQL = fmt.Sprintf("%s | ruleUID=%q", logQL, query.RuleUID)
|
b.WriteString(" | ruleUID=")
|
||||||
|
_, err := fmt.Fprintf(&b, "%q", query.RuleUID)
|
||||||
|
if err != nil {
|
||||||
|
return "", err
|
||||||
|
}
|
||||||
}
|
}
|
||||||
if query.DashboardUID != "" {
|
if query.DashboardUID != "" {
|
||||||
logQL = fmt.Sprintf("%s | dashboardUID=%q", logQL, query.DashboardUID)
|
b.WriteString(" | dashboardUID=")
|
||||||
|
_, err := fmt.Fprintf(&b, "%q", query.DashboardUID)
|
||||||
|
if err != nil {
|
||||||
|
return "", err
|
||||||
|
}
|
||||||
}
|
}
|
||||||
if query.PanelID != 0 {
|
if query.PanelID != 0 {
|
||||||
logQL = fmt.Sprintf("%s | panelID=%d", logQL, query.PanelID)
|
b.WriteString(" | panelID=")
|
||||||
|
b.WriteString(strconv.FormatInt(query.PanelID, 10))
|
||||||
}
|
}
|
||||||
|
|
||||||
labelFilters := ""
|
requiredSize := 0
|
||||||
labelKeys := make([]string, 0, len(query.Labels))
|
labelKeys := make([]string, 0, len(query.Labels))
|
||||||
for k := range query.Labels {
|
for k, v := range query.Labels {
|
||||||
|
requiredSize += len(k) + len(v) + 13 // 13 all literals below
|
||||||
labelKeys = append(labelKeys, k)
|
labelKeys = append(labelKeys, k)
|
||||||
}
|
}
|
||||||
// Ensure that all queries we build are deterministic.
|
// Ensure that all queries we build are deterministic.
|
||||||
sort.Strings(labelKeys)
|
sort.Strings(labelKeys)
|
||||||
|
b.Grow(requiredSize)
|
||||||
for _, k := range labelKeys {
|
for _, k := range labelKeys {
|
||||||
labelFilters += fmt.Sprintf(" | labels_%s=%q", k, query.Labels[k])
|
b.WriteString(" | labels_")
|
||||||
}
|
b.WriteString(k)
|
||||||
logQL += labelFilters
|
b.WriteString("=")
|
||||||
|
_, err := fmt.Fprintf(&b, "%q", query.Labels[k])
|
||||||
if len(logQL) > maxQuerySize {
|
if err != nil {
|
||||||
// if request is too long, try to drop filter by folder UIDs.
|
return "", err
|
||||||
if len(folderUIDs) > 0 {
|
|
||||||
logQL, tooLong, err := BuildLogQuery(query, nil, maxQuerySize)
|
|
||||||
if err != nil {
|
|
||||||
return "", false, err
|
|
||||||
}
|
|
||||||
if tooLong {
|
|
||||||
return "", false, NewErrLokiQueryTooLong(logQL, maxQuerySize)
|
|
||||||
}
|
|
||||||
return logQL, true, nil
|
|
||||||
}
|
}
|
||||||
// if the query is too long even without filter by folders, then fail
|
|
||||||
return "", false, NewErrLokiQueryTooLong(logQL, maxQuerySize)
|
|
||||||
}
|
}
|
||||||
|
return b.String(), nil
|
||||||
return logQL, false, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func queryHasLogFilters(query models.HistoryQuery) bool {
|
func queryHasLogFilters(query models.HistoryQuery) bool {
|
||||||
@ -542,5 +534,6 @@ func (h *RemoteLokiBackend) getFolderUIDsForFilter(ctx context.Context, query mo
|
|||||||
if len(uids) == 0 {
|
if len(uids) == 0 {
|
||||||
return nil, accesscontrol.NewAuthorizationErrorGeneric("read rules in any folder")
|
return nil, accesscontrol.NewAuthorizationErrorGeneric("read rules in any folder")
|
||||||
}
|
}
|
||||||
|
sort.Strings(uids)
|
||||||
return uids, nil
|
return uids, nil
|
||||||
}
|
}
|
||||||
|
@ -119,6 +119,7 @@ func NewLokiClient(cfg LokiConfig, req client.Requester, metrics *metrics.Histor
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (c *HttpLokiClient) Ping(ctx context.Context) error {
|
func (c *HttpLokiClient) Ping(ctx context.Context) error {
|
||||||
|
log := c.log.FromContext(ctx)
|
||||||
uri := c.cfg.ReadPathURL.JoinPath("/loki/api/v1/labels")
|
uri := c.cfg.ReadPathURL.JoinPath("/loki/api/v1/labels")
|
||||||
req, err := http.NewRequest(http.MethodGet, uri.String(), nil)
|
req, err := http.NewRequest(http.MethodGet, uri.String(), nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -131,7 +132,7 @@ func (c *HttpLokiClient) Ping(ctx context.Context) error {
|
|||||||
if res != nil {
|
if res != nil {
|
||||||
defer func() {
|
defer func() {
|
||||||
if err := res.Body.Close(); err != nil {
|
if err := res.Body.Close(); err != nil {
|
||||||
c.log.Warn("Failed to close response body", "err", err)
|
log.Warn("Failed to close response body", "err", err)
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
}
|
}
|
||||||
@ -142,7 +143,7 @@ func (c *HttpLokiClient) Ping(ctx context.Context) error {
|
|||||||
if res.StatusCode < 200 || res.StatusCode >= 300 {
|
if res.StatusCode < 200 || res.StatusCode >= 300 {
|
||||||
return fmt.Errorf("ping request to loki endpoint returned a non-200 status code: %d", res.StatusCode)
|
return fmt.Errorf("ping request to loki endpoint returned a non-200 status code: %d", res.StatusCode)
|
||||||
}
|
}
|
||||||
c.log.Debug("Ping request to Loki endpoint succeeded", "status", res.StatusCode)
|
log.Debug("Ping request to Loki endpoint succeeded", "status", res.StatusCode)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -180,6 +181,7 @@ func (r *Sample) UnmarshalJSON(b []byte) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (c *HttpLokiClient) Push(ctx context.Context, s []Stream) error {
|
func (c *HttpLokiClient) Push(ctx context.Context, s []Stream) error {
|
||||||
|
log := c.log.FromContext(ctx)
|
||||||
enc, err := c.encoder.encode(s)
|
enc, err := c.encoder.encode(s)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
@ -204,11 +206,11 @@ func (c *HttpLokiClient) Push(ctx context.Context, s []Stream) error {
|
|||||||
}
|
}
|
||||||
defer func() {
|
defer func() {
|
||||||
if err := resp.Body.Close(); err != nil {
|
if err := resp.Body.Close(); err != nil {
|
||||||
c.log.Warn("Failed to close response body", "err", err)
|
log.Warn("Failed to close response body", "err", err)
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
_, err = c.handleLokiResponse(resp)
|
_, err = c.handleLokiResponse(log, resp)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -227,6 +229,7 @@ func (c *HttpLokiClient) setAuthAndTenantHeaders(req *http.Request) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (c *HttpLokiClient) RangeQuery(ctx context.Context, logQL string, start, end, limit int64) (QueryRes, error) {
|
func (c *HttpLokiClient) RangeQuery(ctx context.Context, logQL string, start, end, limit int64) (QueryRes, error) {
|
||||||
|
log := c.log.FromContext(ctx)
|
||||||
// Run the pre-flight checks for the query.
|
// Run the pre-flight checks for the query.
|
||||||
if start > end {
|
if start > end {
|
||||||
return QueryRes{}, fmt.Errorf("start time cannot be after end time")
|
return QueryRes{}, fmt.Errorf("start time cannot be after end time")
|
||||||
@ -248,7 +251,7 @@ func (c *HttpLokiClient) RangeQuery(ctx context.Context, logQL string, start, en
|
|||||||
values.Set("limit", fmt.Sprintf("%d", limit))
|
values.Set("limit", fmt.Sprintf("%d", limit))
|
||||||
|
|
||||||
queryURL.RawQuery = values.Encode()
|
queryURL.RawQuery = values.Encode()
|
||||||
|
log.Debug("Sending query request", "query", logQL, "start", start, "end", end, "limit", limit)
|
||||||
req, err := http.NewRequest(http.MethodGet,
|
req, err := http.NewRequest(http.MethodGet,
|
||||||
queryURL.String(), nil)
|
queryURL.String(), nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -264,11 +267,11 @@ func (c *HttpLokiClient) RangeQuery(ctx context.Context, logQL string, start, en
|
|||||||
}
|
}
|
||||||
defer func() {
|
defer func() {
|
||||||
if err := res.Body.Close(); err != nil {
|
if err := res.Body.Close(); err != nil {
|
||||||
c.log.Warn("Failed to close response body", "err", err)
|
log.Warn("Failed to close response body", "err", err)
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
data, err := c.handleLokiResponse(res)
|
data, err := c.handleLokiResponse(log, res)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return QueryRes{}, err
|
return QueryRes{}, err
|
||||||
}
|
}
|
||||||
@ -295,7 +298,7 @@ type QueryData struct {
|
|||||||
Result []Stream `json:"result"`
|
Result []Stream `json:"result"`
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *HttpLokiClient) handleLokiResponse(res *http.Response) ([]byte, error) {
|
func (c *HttpLokiClient) handleLokiResponse(log log.Logger, res *http.Response) ([]byte, error) {
|
||||||
if res == nil {
|
if res == nil {
|
||||||
return nil, fmt.Errorf("response is nil")
|
return nil, fmt.Errorf("response is nil")
|
||||||
}
|
}
|
||||||
@ -307,9 +310,9 @@ func (c *HttpLokiClient) handleLokiResponse(res *http.Response) ([]byte, error)
|
|||||||
|
|
||||||
if res.StatusCode < 200 || res.StatusCode >= 300 {
|
if res.StatusCode < 200 || res.StatusCode >= 300 {
|
||||||
if len(data) > 0 {
|
if len(data) > 0 {
|
||||||
c.log.Error("Error response from Loki", "response", string(data), "status", res.StatusCode)
|
log.Error("Error response from Loki", "response", string(data), "status", res.StatusCode)
|
||||||
} else {
|
} else {
|
||||||
c.log.Error("Error response from Loki with an empty body", "status", res.StatusCode)
|
log.Error("Error response from Loki with an empty body", "status", res.StatusCode)
|
||||||
}
|
}
|
||||||
return nil, fmt.Errorf("received a non-200 response from loki, status: %d", res.StatusCode)
|
return nil, fmt.Errorf("received a non-200 response from loki, status: %d", res.StatusCode)
|
||||||
}
|
}
|
||||||
|
@ -202,34 +202,6 @@ func TestRemoteLokiBackend(t *testing.T) {
|
|||||||
require.Equal(t, exp, entry.Fingerprint)
|
require.Equal(t, exp, entry.Fingerprint)
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
|
|
||||||
t.Run("selector string", func(t *testing.T) {
|
|
||||||
selectors := []Selector{{"name", "=", "Bob"}, {"age", "=~", "30"}}
|
|
||||||
expected := "{name=\"Bob\",age=~\"30\"}"
|
|
||||||
result := selectorString(selectors, nil)
|
|
||||||
require.Equal(t, expected, result)
|
|
||||||
|
|
||||||
selectors = []Selector{{"name", "=", "quoted\"string"}, {"age", "=~", "30"}}
|
|
||||||
expected = "{name=\"quoted\\\"string\",age=~\"30\",folderUID=~`some\\\\d\\.r\\$|normal_string`}"
|
|
||||||
result = selectorString(selectors, []string{`some\d.r$`, "normal_string"})
|
|
||||||
require.Equal(t, expected, result)
|
|
||||||
|
|
||||||
selectors = []Selector{}
|
|
||||||
expected = "{}"
|
|
||||||
result = selectorString(selectors, nil)
|
|
||||||
require.Equal(t, expected, result)
|
|
||||||
})
|
|
||||||
|
|
||||||
t.Run("new selector", func(t *testing.T) {
|
|
||||||
selector, err := NewSelector("label", "=", "value")
|
|
||||||
require.NoError(t, err)
|
|
||||||
require.Equal(t, "label", selector.Label)
|
|
||||||
require.Equal(t, Eq, selector.Op)
|
|
||||||
require.Equal(t, "value", selector.Value)
|
|
||||||
|
|
||||||
selector, err = NewSelector("label", "invalid", "value")
|
|
||||||
require.Error(t, err)
|
|
||||||
})
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestBuildLogQuery(t *testing.T) {
|
func TestBuildLogQuery(t *testing.T) {
|
||||||
@ -238,21 +210,20 @@ func TestBuildLogQuery(t *testing.T) {
|
|||||||
name string
|
name string
|
||||||
query models.HistoryQuery
|
query models.HistoryQuery
|
||||||
folderUIDs []string
|
folderUIDs []string
|
||||||
exp string
|
exp []string
|
||||||
expErr error
|
expErr error
|
||||||
expDropped bool
|
|
||||||
}{
|
}{
|
||||||
{
|
{
|
||||||
name: "default includes state history label and orgID label",
|
name: "default includes state history label and orgID label",
|
||||||
query: models.HistoryQuery{},
|
query: models.HistoryQuery{},
|
||||||
exp: `{orgID="0",from="state-history"}`,
|
exp: []string{`{orgID="0",from="state-history"}`},
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
name: "adds stream label filter for orgID",
|
name: "adds stream label filter for orgID",
|
||||||
query: models.HistoryQuery{
|
query: models.HistoryQuery{
|
||||||
OrgID: 123,
|
OrgID: 123,
|
||||||
},
|
},
|
||||||
exp: `{orgID="123",from="state-history"}`,
|
exp: []string{`{orgID="123",from="state-history"}`},
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
name: "filters ruleUID in log line",
|
name: "filters ruleUID in log line",
|
||||||
@ -260,7 +231,7 @@ func TestBuildLogQuery(t *testing.T) {
|
|||||||
OrgID: 123,
|
OrgID: 123,
|
||||||
RuleUID: "rule-uid",
|
RuleUID: "rule-uid",
|
||||||
},
|
},
|
||||||
exp: `{orgID="123",from="state-history"} | json | ruleUID="rule-uid"`,
|
exp: []string{`{orgID="123",from="state-history"} | json | ruleUID="rule-uid"`},
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
name: "filters dashboardUID in log line",
|
name: "filters dashboardUID in log line",
|
||||||
@ -268,7 +239,7 @@ func TestBuildLogQuery(t *testing.T) {
|
|||||||
OrgID: 123,
|
OrgID: 123,
|
||||||
DashboardUID: "dash-uid",
|
DashboardUID: "dash-uid",
|
||||||
},
|
},
|
||||||
exp: `{orgID="123",from="state-history"} | json | dashboardUID="dash-uid"`,
|
exp: []string{`{orgID="123",from="state-history"} | json | dashboardUID="dash-uid"`},
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
name: "filters panelID in log line",
|
name: "filters panelID in log line",
|
||||||
@ -276,7 +247,7 @@ func TestBuildLogQuery(t *testing.T) {
|
|||||||
OrgID: 123,
|
OrgID: 123,
|
||||||
PanelID: 456,
|
PanelID: 456,
|
||||||
},
|
},
|
||||||
exp: `{orgID="123",from="state-history"} | json | panelID=456`,
|
exp: []string{`{orgID="123",from="state-history"} | json | panelID=456`},
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
name: "filters instance labels in log line",
|
name: "filters instance labels in log line",
|
||||||
@ -287,7 +258,7 @@ func TestBuildLogQuery(t *testing.T) {
|
|||||||
"labeltwo": "labelvaluetwo",
|
"labeltwo": "labelvaluetwo",
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
exp: `{orgID="123",from="state-history"} | json | labels_customlabel="customvalue" | labels_labeltwo="labelvaluetwo"`,
|
exp: []string{`{orgID="123",from="state-history"} | json | labels_customlabel="customvalue" | labels_labeltwo="labelvaluetwo"`},
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
name: "filters both instance labels + ruleUID",
|
name: "filters both instance labels + ruleUID",
|
||||||
@ -298,7 +269,8 @@ func TestBuildLogQuery(t *testing.T) {
|
|||||||
"customlabel": "customvalue",
|
"customlabel": "customvalue",
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
exp: `{orgID="123",from="state-history"} | json | ruleUID="rule-uid" | labels_customlabel="customvalue"`},
|
exp: []string{`{orgID="123",from="state-history"} | json | ruleUID="rule-uid" | labels_customlabel="customvalue"`},
|
||||||
|
},
|
||||||
{
|
{
|
||||||
name: "should return if query does not exceed max limit",
|
name: "should return if query does not exceed max limit",
|
||||||
query: models.HistoryQuery{
|
query: models.HistoryQuery{
|
||||||
@ -308,7 +280,7 @@ func TestBuildLogQuery(t *testing.T) {
|
|||||||
"customlabel": strings.Repeat("!", 24),
|
"customlabel": strings.Repeat("!", 24),
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
exp: `{orgID="123",from="state-history"} | json | ruleUID="rule-uid" | labels_customlabel="!!!!!!!!!!!!!!!!!!!!!!!!"`,
|
exp: []string{`{orgID="123",from="state-history"} | json | ruleUID="rule-uid" | labels_customlabel="!!!!!!!!!!!!!!!!!!!!!!!!"`},
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
name: "should return error if query is too long",
|
name: "should return error if query is too long",
|
||||||
@ -327,34 +299,48 @@ func TestBuildLogQuery(t *testing.T) {
|
|||||||
OrgID: 123,
|
OrgID: 123,
|
||||||
},
|
},
|
||||||
folderUIDs: []string{"folder-1", "folder\\d"},
|
folderUIDs: []string{"folder-1", "folder\\d"},
|
||||||
exp: `{orgID="123",from="state-history",folderUID=~` + "`folder-1|folder\\\\d`" + `}`,
|
exp: []string{`{orgID="123",from="state-history",folderUID=~` + "`folder-1|folder\\\\d`" + `}`},
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
name: "should drop folders if it's too long",
|
name: "should batch queries to fit all folders",
|
||||||
query: models.HistoryQuery{
|
query: models.HistoryQuery{
|
||||||
OrgID: 123,
|
OrgID: 123,
|
||||||
RuleUID: "rule-uid",
|
|
||||||
Labels: map[string]string{
|
Labels: map[string]string{
|
||||||
"customlabel": "customvalue",
|
"customlabel": "customvalue",
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
folderUIDs: []string{"folder-1", "folder-2", "folder\\d"},
|
folderUIDs: []string{"folder-1", "folder-2", "folder\\d", "folder-" + strings.Repeat("!", 13)},
|
||||||
exp: `{orgID="123",from="state-history"} | json | ruleUID="rule-uid" | labels_customlabel="customvalue"`,
|
exp: []string{
|
||||||
expDropped: true,
|
`{orgID="123",from="state-history",folderUID=~` + "`folder-1|folder-2`" + `} | json | labels_customlabel="customvalue"`,
|
||||||
|
`{orgID="123",from="state-history",folderUID=~` + "`folder\\\\d`" + `} | json | labels_customlabel="customvalue"`,
|
||||||
|
`{orgID="123",from="state-history",folderUID=~` + "`folder-!!!!!!!!!!!!!`" + `} | json | labels_customlabel="customvalue"`,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "should fail if a single folder UID is too long",
|
||||||
|
query: models.HistoryQuery{
|
||||||
|
OrgID: 123,
|
||||||
|
Labels: map[string]string{
|
||||||
|
"customlabel": "customvalue",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
folderUIDs: []string{"folder-1", "folder-2", "folder-" + strings.Repeat("!", 14)},
|
||||||
|
expErr: ErrLokiQueryTooLong,
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, tc := range cases {
|
for _, tc := range cases {
|
||||||
t.Run(tc.name, func(t *testing.T) {
|
t.Run(tc.name, func(t *testing.T) {
|
||||||
res, dropped, err := BuildLogQuery(tc.query, tc.folderUIDs, maxQuerySize)
|
res, err := BuildLogQuery(tc.query, tc.folderUIDs, maxQuerySize)
|
||||||
if tc.expErr != nil {
|
if tc.expErr != nil {
|
||||||
require.ErrorIs(t, err, tc.expErr)
|
require.ErrorIs(t, err, tc.expErr)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
require.LessOrEqual(t, len(res), maxQuerySize)
|
|
||||||
require.Equal(t, tc.expDropped, dropped)
|
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.Equal(t, tc.exp, res)
|
assert.EqualValues(t, tc.exp, res)
|
||||||
|
for i, q := range res {
|
||||||
|
assert.LessOrEqualf(t, len(q), maxQuerySize, "query at index %d exceeded max query size. Query: %s", i, q)
|
||||||
|
}
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -625,7 +611,7 @@ func TestMerge(t *testing.T) {
|
|||||||
t.Run(tc.name, func(t *testing.T) {
|
t.Run(tc.name, func(t *testing.T) {
|
||||||
expectedJson, err := tc.expected.MarshalJSON()
|
expectedJson, err := tc.expected.MarshalJSON()
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
m, err := merge(tc.res, tc.folderUIDs)
|
m, err := merge(tc.res.Data.Result, tc.folderUIDs)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
actualJson, err := m.MarshalJSON()
|
actualJson, err := m.MarshalJSON()
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
|
Loading…
Reference in New Issue
Block a user