From afef0c926cd3c31e8497c7a581d995a4d3dd7d80 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joan=20L=C3=B3pez=20de=20la=20Franca=20Beltran?= <5459617+joanlopez@users.noreply.github.com> Date: Mon, 23 Jan 2023 16:24:22 +0100 Subject: [PATCH] Loki: Push support for multi-tenancy mode (#60866) * Loki: Add multitenancy support for the HTTP client * Loki: Support to push with multitenancy mode * Apply feedback suggestions --- pkg/components/loki/lokigrpc/client.go | 96 ++++++++++++++++++++++++++ pkg/components/loki/lokigrpc/config.go | 15 ++++ pkg/components/loki/lokigrpc/tenant.go | 30 ++++++++ pkg/components/loki/lokihttp/client.go | 19 ++++- pkg/components/loki/lokihttp/config.go | 2 + 5 files changed, 161 insertions(+), 1 deletion(-) create mode 100644 pkg/components/loki/lokigrpc/client.go create mode 100644 pkg/components/loki/lokigrpc/config.go create mode 100644 pkg/components/loki/lokigrpc/tenant.go diff --git a/pkg/components/loki/lokigrpc/client.go b/pkg/components/loki/lokigrpc/client.go new file mode 100644 index 00000000000..e8a0c11808c --- /dev/null +++ b/pkg/components/loki/lokigrpc/client.go @@ -0,0 +1,96 @@ +package lokigrpc + +import ( + "context" + "crypto/tls" + "errors" + + grpcretry "github.com/grpc-ecosystem/go-grpc-middleware/retry" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials" + "google.golang.org/grpc/credentials/insecure" + + "github.com/grafana/grafana/pkg/components/loki/logproto" +) + +// Client is a gRPC-based Loki client implementation. +type Client struct { + client logproto.PusherClient + conn *grpc.ClientConn + opts []grpc.DialOption + cfg Config +} + +// NewClient instantiates a new Client. +func NewClient(cfg Config, opts ...grpc.DialOption) (*Client, error) { + w := &Client{ + opts: opts, + cfg: cfg, + } + + return w, w.init() +} + +// Write pushes a new request with the given streams through +// the attached gRPC connection. +func (c *Client) Write(streams []logproto.Stream) (err error) { + pushRequest := &logproto.PushRequest{ + Streams: streams, + } + + ctx, cancel := c.timeoutCtx() + defer cancel() + + if len(c.cfg.TenantID) > 0 { + ctx = injectOrgID(ctx, c.cfg.TenantID) + } + + _, err = c.client.Push(ctx, pushRequest) + return err +} + +// Close closes the attached gRPC connection. +func (c *Client) Close() error { + return c.conn.Close() +} + +func (c *Client) init() error { + if len(c.cfg.URL) == 0 { + return errors.New("cfg must have Loki url") + } + + opts := append(c.opts, c.grpcTLSOption(), c.grpcRetryOption()) + conn, err := grpc.Dial(c.cfg.URL, opts...) + if err != nil { + return err + } + + c.conn = conn + c.client = logproto.NewPusherClient(conn) + return nil +} + +func (c *Client) grpcTLSOption() grpc.DialOption { + if c.cfg.TLSDisabled { + return grpc.WithTransportCredentials(insecure.NewCredentials()) + } + + config := &tls.Config{InsecureSkipVerify: false} + return grpc.WithTransportCredentials(credentials.NewTLS(config)) +} + +func (c *Client) grpcRetryOption() grpc.DialOption { + return grpc.WithUnaryInterceptor( + grpcretry.UnaryClientInterceptor( + grpcretry.WithMax(c.cfg.Retries), + ), + ) +} + +func (c *Client) timeoutCtx() (context.Context, context.CancelFunc) { + if c.cfg.Timeout > 0 { + return context.WithTimeout(context.Background(), c.cfg.Timeout) + } + + return context.WithCancel(context.Background()) +} diff --git a/pkg/components/loki/lokigrpc/config.go b/pkg/components/loki/lokigrpc/config.go new file mode 100644 index 00000000000..3e1b4b08619 --- /dev/null +++ b/pkg/components/loki/lokigrpc/config.go @@ -0,0 +1,15 @@ +package lokigrpc + +import "time" + +// Config describes configuration for a gRPC pusher client. +type Config struct { + URL string + + Retries uint + Timeout time.Duration + + TLSDisabled bool + + TenantID string +} diff --git a/pkg/components/loki/lokigrpc/tenant.go b/pkg/components/loki/lokigrpc/tenant.go new file mode 100644 index 00000000000..1079c6489b9 --- /dev/null +++ b/pkg/components/loki/lokigrpc/tenant.go @@ -0,0 +1,30 @@ +package lokigrpc + +import ( + "context" + "errors" + + "google.golang.org/grpc/metadata" +) + +const ( + lowerOrgIDHeaderName = "x-scope-orgid" +) + +var ( + ErrDifferentOrgIDPresent = errors.New("different org ID already present") + ErrTooManyOrgIDs = errors.New("multiple org IDs present") +) + +func injectOrgID(ctx context.Context, tenantID string) context.Context { + md, ok := metadata.FromOutgoingContext(ctx) + if ok { + md = md.Copy() + } else { + md = metadata.New(map[string]string{}) + } + + md[lowerOrgIDHeaderName] = []string{tenantID} + newCtx := metadata.NewOutgoingContext(ctx, md) + return newCtx +} diff --git a/pkg/components/loki/lokihttp/client.go b/pkg/components/loki/lokihttp/client.go index 924c0d98c75..7c876b9c613 100644 --- a/pkg/components/loki/lokihttp/client.go +++ b/pkg/components/loki/lokihttp/client.go @@ -15,6 +15,7 @@ import ( "github.com/grafana/dskit/backoff" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/config" + "github.com/prometheus/common/model" "github.com/grafana/grafana/pkg/infra/log" "github.com/grafana/grafana/pkg/setting" @@ -231,7 +232,7 @@ func (c *client) run() { if !ok { return } - tenantID := "" + tenantID := c.getTenantID(e.Labels) batch, ok := batches[tenantID] // If the batch doesn't exist yet, we create a new one with the entry @@ -266,6 +267,22 @@ func (c *client) run() { } } +func (c *client) getTenantID(labels model.LabelSet) string { + // Check if it has been overridden while processing the pipeline stages + if value, ok := labels[ReservedLabelTenantID]; ok { + return string(value) + } + + // Check if has been specified in the config + if c.cfg.TenantID != "" { + return c.cfg.TenantID + } + + // Defaults to an empty string, which means the X-Scope-OrgID header + // will not be sent + return "" +} + func (c *client) Chan() chan<- Entry { return c.entries } diff --git a/pkg/components/loki/lokihttp/config.go b/pkg/components/loki/lokihttp/config.go index 10cba6e4761..7989118188a 100644 --- a/pkg/components/loki/lokihttp/config.go +++ b/pkg/components/loki/lokihttp/config.go @@ -18,4 +18,6 @@ type Config struct { BackoffConfig backoff.Config Timeout time.Duration + + TenantID string }