mirror of
https://github.com/grafana/grafana.git
synced 2025-01-10 08:03:58 -06:00
953c8f838f
Co-authored-by: Sven Grossmann <sven.grossmann@grafana.com>
381 lines
12 KiB
Go
381 lines
12 KiB
Go
package loki
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
"net/http"
|
|
"net/url"
|
|
"path"
|
|
"strconv"
|
|
"syscall"
|
|
"time"
|
|
|
|
jsoniter "github.com/json-iterator/go"
|
|
"go.opentelemetry.io/otel/attribute"
|
|
"go.opentelemetry.io/otel/codes"
|
|
"go.opentelemetry.io/otel/trace"
|
|
"google.golang.org/grpc/metadata"
|
|
|
|
"github.com/grafana/grafana-plugin-sdk-go/backend"
|
|
"github.com/grafana/grafana-plugin-sdk-go/backend/log"
|
|
"github.com/grafana/grafana/pkg/infra/tracing"
|
|
"github.com/grafana/grafana/pkg/promlib/converter"
|
|
"github.com/grafana/grafana/pkg/tsdb/loki/instrumentation"
|
|
)
|
|
|
|
type LokiAPI struct {
|
|
client *http.Client
|
|
url string
|
|
log log.Logger
|
|
tracer tracing.Tracer
|
|
requestStructuredMetadata bool
|
|
}
|
|
|
|
type RawLokiResponse struct {
|
|
Body []byte
|
|
Status int
|
|
Encoding string
|
|
}
|
|
|
|
func newLokiAPI(client *http.Client, url string, log log.Logger, tracer tracing.Tracer, requestStructuredMetadata bool) *LokiAPI {
|
|
return &LokiAPI{client: client, url: url, log: log, tracer: tracer, requestStructuredMetadata: requestStructuredMetadata}
|
|
}
|
|
|
|
func makeDataRequest(ctx context.Context, lokiDsUrl string, query lokiQuery, categorizeLabels bool) (*http.Request, error) {
|
|
qs := url.Values{}
|
|
qs.Set("query", query.Expr)
|
|
|
|
qs.Set("direction", string(query.Direction))
|
|
|
|
// MaxLines defaults to zero when not received,
|
|
// and Loki does not like limit=0, even when it is not needed
|
|
// (for example for metric queries), so we
|
|
// only send it when it's set
|
|
if query.MaxLines > 0 {
|
|
qs.Set("limit", fmt.Sprintf("%d", query.MaxLines))
|
|
}
|
|
|
|
lokiUrl, err := url.Parse(lokiDsUrl)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
switch query.QueryType {
|
|
case QueryTypeRange:
|
|
{
|
|
qs.Set("start", strconv.FormatInt(query.Start.UnixNano(), 10))
|
|
qs.Set("end", strconv.FormatInt(query.End.UnixNano(), 10))
|
|
// NOTE: technically for streams-producing queries `step`
|
|
// is ignored, so it would be nicer to not send it in such cases,
|
|
// but we cannot detect that situation, so we always send it.
|
|
// it should not break anything.
|
|
// NOTE2: we do this at millisecond precision for two reasons:
|
|
// a. Loki cannot do steps with better precision anyway,
|
|
// so the microsecond & nanosecond part can be ignored.
|
|
// b. having it always be number+'ms' makes it more robust and
|
|
// precise, as Loki does not support step with float number
|
|
// and time-specifier, like "1.5s"
|
|
qs.Set("step", fmt.Sprintf("%dms", query.Step.Milliseconds()))
|
|
lokiUrl.Path = path.Join(lokiUrl.Path, "/loki/api/v1/query_range")
|
|
}
|
|
case QueryTypeInstant:
|
|
{
|
|
qs.Set("time", strconv.FormatInt(query.End.UnixNano(), 10))
|
|
lokiUrl.Path = path.Join(lokiUrl.Path, "/loki/api/v1/query")
|
|
}
|
|
default:
|
|
return nil, fmt.Errorf("invalid QueryType: %v", query.QueryType)
|
|
}
|
|
|
|
lokiUrl.RawQuery = qs.Encode()
|
|
|
|
req, err := http.NewRequestWithContext(ctx, "GET", lokiUrl.String(), nil)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if query.SupportingQueryType != SupportingQueryNone {
|
|
value := getSupportingQueryHeaderValue(query.SupportingQueryType)
|
|
if value != "" {
|
|
req.Header.Set("X-Query-Tags", "Source="+value)
|
|
}
|
|
}
|
|
|
|
if categorizeLabels {
|
|
req.Header.Set("X-Loki-Response-Encoding-Flags", "categorize-labels")
|
|
}
|
|
|
|
setXScopeOrgIDHeader(req, ctx)
|
|
|
|
return req, nil
|
|
}
|
|
|
|
type lokiResponseError struct {
|
|
Message string `json:"message"`
|
|
TraceID string `json:"traceID,omitempty"`
|
|
}
|
|
|
|
type lokiError struct {
|
|
Message string
|
|
}
|
|
|
|
func makeLokiError(bytes []byte) error {
|
|
var data lokiError
|
|
err := json.Unmarshal(bytes, &data)
|
|
if err != nil {
|
|
// we were unable to convert the bytes to JSON, we return the whole text
|
|
return fmt.Errorf("%v", string(bytes))
|
|
}
|
|
|
|
if data.Message == "" {
|
|
// we got no usable error message, we return the whole text
|
|
return fmt.Errorf("%v", string(bytes))
|
|
}
|
|
|
|
return fmt.Errorf("%v", data.Message)
|
|
}
|
|
|
|
// we know there is an error,
|
|
// based on the http-response-body
|
|
// we have to make an informative error-object
|
|
func readLokiError(body io.ReadCloser) error {
|
|
var buf bytes.Buffer
|
|
_, err := buf.ReadFrom(body)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
bytes := buf.Bytes()
|
|
|
|
// the error-message is probably a JSON structure,
|
|
// with a string-field named "message". we want the
|
|
// value of that field.
|
|
// but, the response might be just a simple string,
|
|
// this was used in older Loki versions.
|
|
// so our approach is this:
|
|
// - we try to convert the bytes to JSON
|
|
// - we take the value of the field "message"
|
|
// - if any of these steps fail, or if "message" is empty, we return the whole text
|
|
|
|
return makeLokiError(bytes)
|
|
}
|
|
|
|
func (api *LokiAPI) DataQuery(ctx context.Context, query lokiQuery, responseOpts ResponseOpts) (*backend.DataResponse, error) {
|
|
req, err := makeDataRequest(ctx, api.url, query, api.requestStructuredMetadata)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
queryAttrs := []any{"start", query.Start, "end", query.End, "step", query.Step, "query", query.Expr, "queryType", query.QueryType, "direction", query.Direction, "maxLines", query.MaxLines, "supportingQueryType", query.SupportingQueryType, "lokiHost", req.URL.Host, "lokiPath", req.URL.Path}
|
|
api.log.Debug("Sending query to loki", queryAttrs...)
|
|
start := time.Now()
|
|
resp, err := api.client.Do(req)
|
|
if err != nil {
|
|
status := "error"
|
|
if errors.Is(err, context.Canceled) {
|
|
status = "cancelled"
|
|
}
|
|
lp := []any{"error", err, "status", status, "duration", time.Since(start), "stage", stageDatabaseRequest}
|
|
lp = append(lp, queryAttrs...)
|
|
if resp != nil {
|
|
lp = append(lp, "statusCode", resp.StatusCode)
|
|
}
|
|
api.log.Error("Error received from Loki", lp...)
|
|
res := backend.DataResponse{
|
|
Error: err,
|
|
}
|
|
if errors.Is(err, syscall.ECONNREFUSED) {
|
|
res.ErrorSource = backend.ErrorSourceDownstream
|
|
}
|
|
return &res, nil
|
|
}
|
|
|
|
defer func() {
|
|
if err := resp.Body.Close(); err != nil {
|
|
api.log.Warn("Failed to close response body", "error", err)
|
|
}
|
|
}()
|
|
|
|
lp := []any{"duration", time.Since(start), "stage", stageDatabaseRequest, "statusCode", resp.StatusCode, "contentLength", resp.Header.Get("Content-Length")}
|
|
lp = append(lp, queryAttrs...)
|
|
if resp.StatusCode/100 != 2 {
|
|
err := readLokiError(resp.Body)
|
|
res := backend.DataResponse{
|
|
Error: err,
|
|
ErrorSource: backend.ErrorSourceFromHTTPStatus(resp.StatusCode),
|
|
}
|
|
lp = append(lp, "status", "error", "error", err, "statusSource", res.ErrorSource)
|
|
api.log.Error("Error received from Loki", lp...)
|
|
return &res, nil
|
|
} else {
|
|
lp = append(lp, "status", "ok")
|
|
api.log.Info("Response received from loki", lp...)
|
|
}
|
|
|
|
start = time.Now()
|
|
_, span := api.tracer.Start(ctx, "datasource.loki.parseResponse", trace.WithAttributes(
|
|
attribute.Bool("metricDataplane", responseOpts.metricDataplane),
|
|
))
|
|
defer span.End()
|
|
|
|
iter := jsoniter.Parse(jsoniter.ConfigDefault, resp.Body, 1024)
|
|
res := converter.ReadPrometheusStyleResult(iter, converter.Options{Dataplane: responseOpts.metricDataplane})
|
|
|
|
if res.Error != nil {
|
|
span.RecordError(res.Error)
|
|
span.SetStatus(codes.Error, res.Error.Error())
|
|
instrumentation.UpdatePluginParsingResponseDurationSeconds(ctx, time.Since(start), "error")
|
|
api.log.Error("Error parsing response from loki", "error", res.Error, "metricDataplane", responseOpts.metricDataplane, "duration", time.Since(start), "stage", stageParseResponse)
|
|
return nil, res.Error
|
|
}
|
|
instrumentation.UpdatePluginParsingResponseDurationSeconds(ctx, time.Since(start), "ok")
|
|
api.log.Info("Response parsed from loki", "duration", time.Since(start), "metricDataplane", responseOpts.metricDataplane, "framesLength", len(res.Frames), "stage", stageParseResponse)
|
|
|
|
return &res, nil
|
|
}
|
|
|
|
func makeRawRequest(ctx context.Context, lokiDsUrl string, resourcePath string) (*http.Request, error) {
|
|
lokiUrl, err := url.Parse(lokiDsUrl)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
resourceUrl, err := url.Parse(resourcePath)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// we take the path and the query-string only
|
|
lokiUrl.RawQuery = resourceUrl.RawQuery
|
|
lokiUrl.Path = path.Join(lokiUrl.Path, resourceUrl.Path)
|
|
|
|
req, err := http.NewRequestWithContext(ctx, "GET", lokiUrl.String(), nil)
|
|
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
setXScopeOrgIDHeader(req, ctx)
|
|
|
|
return req, nil
|
|
}
|
|
|
|
func (api *LokiAPI) RawQuery(ctx context.Context, resourcePath string) (RawLokiResponse, error) {
|
|
api.log.Debug("Sending raw query to loki", "resourcePath", resourcePath)
|
|
req, err := makeRawRequest(ctx, api.url, resourcePath)
|
|
if err != nil {
|
|
api.log.Error("Failed to prepare request to loki", "error", err, "resourcePath", resourcePath)
|
|
return RawLokiResponse{}, err
|
|
}
|
|
start := time.Now()
|
|
resp, err := api.client.Do(req)
|
|
if err != nil {
|
|
status := "error"
|
|
if errors.Is(err, context.Canceled) {
|
|
status = "cancelled"
|
|
}
|
|
lp := []any{"error", err, "resourcePath", resourcePath, "status", status, "duration", time.Since(start), "stage", stageDatabaseRequest}
|
|
if resp != nil {
|
|
lp = append(lp, "statusCode", resp.StatusCode)
|
|
}
|
|
api.log.Error("Error received from Loki", lp...)
|
|
return RawLokiResponse{}, err
|
|
}
|
|
|
|
defer func() {
|
|
if err := resp.Body.Close(); err != nil {
|
|
api.log.Warn("Failed to close response body", "error", err)
|
|
}
|
|
}()
|
|
|
|
api.log.Info("Response received from loki", "status", "ok", "statusCode", resp.StatusCode, "contentLength", resp.Header.Get("Content-Length"), "duration", time.Since(start), "contentEncoding", resp.Header.Get("Content-Encoding"), "stage", stageDatabaseRequest)
|
|
|
|
// server errors are handled by the plugin-proxy to hide the error message
|
|
if resp.StatusCode/100 == 5 {
|
|
return RawLokiResponse{}, readLokiError(resp.Body)
|
|
}
|
|
|
|
body, err := io.ReadAll(resp.Body)
|
|
if err != nil {
|
|
api.log.Error("Error reading response body bytes", "error", err)
|
|
return RawLokiResponse{}, err
|
|
}
|
|
|
|
// client errors are passed as a json struct to the client
|
|
if resp.StatusCode/100 != 2 {
|
|
lokiResponseErr := lokiResponseError{Message: makeLokiError(body).Error()}
|
|
api.log.Warn("Non 200 HTTP status received from loki", "error", lokiResponseErr.Message, "statusCode", resp.StatusCode, "resourcePath", resourcePath)
|
|
traceID := tracing.TraceIDFromContext(ctx, false)
|
|
if traceID != "" {
|
|
lokiResponseErr.TraceID = traceID
|
|
}
|
|
body, err = json.Marshal(lokiResponseErr)
|
|
if err != nil {
|
|
return RawLokiResponse{}, err
|
|
}
|
|
}
|
|
|
|
rawLokiResponse := RawLokiResponse{
|
|
Body: body,
|
|
Status: resp.StatusCode,
|
|
Encoding: resp.Header.Get("Content-Encoding"),
|
|
}
|
|
|
|
return rawLokiResponse, nil
|
|
}
|
|
|
|
func getSupportingQueryHeaderValue(supportingQueryType SupportingQueryType) string {
|
|
value := ""
|
|
|
|
// we need to map the SupportingQueryType to the actual header value. For
|
|
// legacy reasons we defined each value, such as "logsVolume" maps to the
|
|
// "logvolhist" header value to Loki. With #85123, even the value set in the
|
|
// frontend query can be passed as is to Loki.
|
|
switch supportingQueryType {
|
|
case SupportingQueryLogsVolume:
|
|
value = "logvolhist"
|
|
case SupportingQueryLogsSample:
|
|
value = "logsample"
|
|
case SupportingQueryDataSample:
|
|
value = "datasample"
|
|
case SupportingQueryInfiniteScroll:
|
|
value = "infinitescroll"
|
|
default:
|
|
value = string(supportingQueryType)
|
|
}
|
|
|
|
return value
|
|
}
|
|
|
|
// setXScopeOrgIDHeader sets the `X-Scope-OrgID` header in the provided HTTP request based on the tenant ID retrieved from the context.
|
|
// `X-Scope-OrgID` is needed by the Loki system to work in multi-tenant mode.
|
|
// See https://github.com/grafana/loki/blob/main/docs/sources/operations/multi-tenancy.md
|
|
func setXScopeOrgIDHeader(req *http.Request, ctx context.Context) *http.Request {
|
|
logger := backend.NewLoggerWith("logger", "tsdb.loki")
|
|
|
|
md, ok := metadata.FromIncomingContext(ctx)
|
|
if !ok {
|
|
// Metadata are currently set and needed only locally for multi-tenancy, while on cloud
|
|
// this is set by our stack
|
|
logger.Debug("Metadata not present in context. Header not set")
|
|
return req
|
|
}
|
|
|
|
tenantids := md.Get("tenantid")
|
|
if len(tenantids) == 0 {
|
|
// We assume we are not using multi-tenant mode, which is fine
|
|
logger.Debug("Tenant ID not present. Header not set")
|
|
} else if len(tenantids) > 1 {
|
|
// Loki supports multiple tenant IDs, but we should receive them from different contexts
|
|
logger.Error(strconv.Itoa(len(tenantids)) + " tenant IDs found. Header not set")
|
|
} else {
|
|
req.Header.Add("X-Scope-OrgID", tenantids[0])
|
|
logger.Debug("Tenant ID " + tenantids[0] + " added to Loki request")
|
|
}
|
|
return req
|
|
}
|