Live: pipeline Loki output (#41272)

This commit is contained in:
Alexander Emelin
2021-11-09 13:10:43 +03:00
committed by GitHub
parent 833ed5418e
commit 78e9fe520b
6 changed files with 273 additions and 40 deletions

View File

@@ -4,14 +4,12 @@ import (
"context"
"fmt"
"github.com/grafana/grafana/pkg/setting"
"github.com/grafana/grafana/pkg/services/encryption"
"github.com/grafana/grafana/pkg/models"
"github.com/grafana/grafana/pkg/services/encryption"
"github.com/grafana/grafana/pkg/services/live/managedstream"
"github.com/grafana/grafana/pkg/services/live/pipeline/pattern"
"github.com/grafana/grafana/pkg/services/live/pipeline/tree"
"github.com/grafana/grafana/pkg/setting"
"github.com/centrifugal/centrifuge"
)
@@ -53,6 +51,10 @@ type RemoteWriteOutputConfig struct {
SampleMilliseconds int64 `json:"sampleMilliseconds"`
}
type LokiOutputConfig struct {
UID string `json:"uid"`
}
type FrameOutputterConfig struct {
Type string `json:"type"`
ManagedStreamConfig *ManagedStreamOutputConfig `json:"managedStream,omitempty"`
@@ -61,12 +63,14 @@ type FrameOutputterConfig struct {
ConditionalOutputConfig *ConditionalOutputConfig `json:"conditional,omitempty"`
ThresholdOutputConfig *ThresholdOutputConfig `json:"threshold,omitempty"`
RemoteWriteOutputConfig *RemoteWriteOutputConfig `json:"remoteWrite,omitempty"`
LokiOutputConfig *LokiOutputConfig `json:"loki,omitempty"`
ChangeLogOutputConfig *ChangeLogOutputConfig `json:"changeLog,omitempty"`
}
type DataOutputterConfig struct {
Type string `json:"type"`
RedirectDataOutputConfig *RedirectDataOutputConfig `json:"redirect,omitempty"`
LokiOutputConfig *LokiOutputConfig `json:"loki,omitempty"`
}
type MultipleSubscriberConfig struct {
@@ -202,12 +206,6 @@ func (r RemoteWriteBackend) Valid() (bool, string) {
if r.Settings.Endpoint == "" {
return false, "endpoint required"
}
if r.Settings.User == "" {
return false, "user required"
}
if string(r.SecureSettings["password"]) == "" && r.Settings.Password == "" {
return false, "password required"
}
return true, ""
}
@@ -215,7 +213,7 @@ type RemoteWriteSettings struct {
// Endpoint to send streaming frames to.
Endpoint string `json:"endpoint"`
// User is a user for remote write request.
User string `json:"user"`
User string `json:"user,omitempty"`
// Password is a plain text non-encrypted password.
// TODO: remove after integrating with the database.
Password string `json:"password,omitempty"`
@@ -424,6 +422,22 @@ func (f *StorageRuleBuilder) extractFrameConditionChecker(config *FrameCondition
}
}
func (f *StorageRuleBuilder) getWritePassword(remoteWriteBackend RemoteWriteBackend) (string, error) {
var password string
hasSecurePassword := len(remoteWriteBackend.SecureSettings["password"]) > 0
if hasSecurePassword {
passwordBytes, err := f.EncryptionService.Decrypt(context.Background(), remoteWriteBackend.SecureSettings["password"], setting.SecretKey)
if err != nil {
return "", fmt.Errorf("password can't be decrypted: %w", err)
}
password = string(passwordBytes)
} else {
// Use plain text password (should be removed upon database integration).
password = remoteWriteBackend.Settings.Password
}
return password, nil
}
func (f *StorageRuleBuilder) extractFrameOutputter(config *FrameOutputterConfig, remoteWriteBackends []RemoteWriteBackend) (FrameOutputter, error) {
if config == nil {
return nil, nil
@@ -479,26 +493,33 @@ func (f *StorageRuleBuilder) extractFrameOutputter(config *FrameOutputterConfig,
if !ok {
return nil, fmt.Errorf("unknown remote write backend uid: %s", config.RemoteWriteOutputConfig.UID)
}
var password string
hasSecurePassword := len(remoteWriteBackend.SecureSettings["password"]) > 0
if hasSecurePassword {
passwordBytes, err := f.EncryptionService.Decrypt(context.Background(), remoteWriteBackend.SecureSettings["password"], setting.SecretKey)
if err != nil {
return nil, fmt.Errorf("password can't be decrypted: %w", err)
}
password = string(passwordBytes)
} else {
// Use plain text password (should be removed upon database integration).
password = remoteWriteBackend.Settings.Password
password, err := f.getWritePassword(remoteWriteBackend)
if err != nil {
return nil, fmt.Errorf("error getting password: %w", err)
}
return NewRemoteWriteFrameOutput(
remoteWriteBackend.Settings.Endpoint,
remoteWriteBackend.Settings.User,
password,
config.RemoteWriteOutputConfig.SampleMilliseconds,
), nil
case FrameOutputTypeLoki:
if config.LokiOutputConfig == nil {
return nil, missingConfiguration
}
remoteWriteBackend, ok := f.getRemoteWriteBackend(config.LokiOutputConfig.UID, remoteWriteBackends)
if !ok {
return nil, fmt.Errorf("unknown loki backend uid: %s", config.LokiOutputConfig.UID)
}
password, err := f.getWritePassword(remoteWriteBackend)
if err != nil {
return nil, fmt.Errorf("error getting password: %w", err)
}
return NewLokiFrameOutput(
remoteWriteBackend.Settings.Endpoint,
remoteWriteBackend.Settings.User,
password,
), nil
case FrameOutputTypeChangeLog:
if config.ChangeLogOutputConfig == nil {
return nil, missingConfiguration
@@ -509,7 +530,7 @@ func (f *StorageRuleBuilder) extractFrameOutputter(config *FrameOutputterConfig,
}
}
func (f *StorageRuleBuilder) extractDataOutputter(config *DataOutputterConfig) (DataOutputter, error) {
func (f *StorageRuleBuilder) extractDataOutputter(config *DataOutputterConfig, remoteWriteBackends []RemoteWriteBackend) (DataOutputter, error) {
if config == nil {
return nil, nil
}
@@ -520,6 +541,23 @@ func (f *StorageRuleBuilder) extractDataOutputter(config *DataOutputterConfig) (
return nil, missingConfiguration
}
return NewRedirectDataOutput(*config.RedirectDataOutputConfig), nil
case DataOutputTypeLoki:
if config.LokiOutputConfig == nil {
return nil, missingConfiguration
}
remoteWriteBackend, ok := f.getRemoteWriteBackend(config.LokiOutputConfig.UID, remoteWriteBackends)
if !ok {
return nil, fmt.Errorf("unknown loki backend uid: %s", config.LokiOutputConfig.UID)
}
password, err := f.getWritePassword(remoteWriteBackend)
if err != nil {
return nil, fmt.Errorf("error getting password: %w", err)
}
return NewLokiDataOutput(
remoteWriteBackend.Settings.Endpoint,
remoteWriteBackend.Settings.User,
password,
), nil
case DataOutputTypeBuiltin:
return NewBuiltinDataOutput(f.ChannelHandlerGetter), nil
case DataOutputTypeLocalSubscribers:
@@ -584,7 +622,7 @@ func (f *StorageRuleBuilder) BuildRules(ctx context.Context, orgID int64) ([]*Li
var dataOutputters []DataOutputter
for _, outConfig := range ruleConfig.Settings.DataOutputters {
out, err := f.extractDataOutputter(outConfig)
out, err := f.extractDataOutputter(outConfig, remoteWriteBackends)
if err != nil {
return nil, fmt.Errorf("error building data outputter for %s: %w", rule.Pattern, err)
}

View File

@@ -0,0 +1,37 @@
package pipeline
import (
"context"
"time"
)
// LokiDataOutput can output raw data to Loki (as string value).
type LokiDataOutput struct {
lokiWriter *lokiWriter
}
func NewLokiDataOutput(endpoint, user, password string) *LokiDataOutput {
return &LokiDataOutput{
lokiWriter: newLokiWriter(endpoint, user, password),
}
}
const DataOutputTypeLoki = "loki"
func (out *LokiDataOutput) Type() string {
return DataOutputTypeLoki
}
func (out *LokiDataOutput) OutputData(_ context.Context, vars Vars, data []byte) ([]*ChannelData, error) {
if out.lokiWriter.endpoint == "" {
logger.Debug("Skip sending to Loki: no url")
return nil, nil
}
err := out.lokiWriter.write(LokiStream{
Stream: map[string]string{"channel": vars.Channel},
Values: []interface{}{
[]interface{}{time.Now().UnixNano(), string(data)},
},
})
return nil, err
}

View File

@@ -99,7 +99,14 @@ type DevRuleBuilder struct {
func (f *DevRuleBuilder) BuildRules(_ context.Context, _ int64) ([]*LiveChannelRule, error) {
return []*LiveChannelRule{
{
Pattern: "plugin/testdata/random-20Hz-stream",
Pattern: "plugin/testdata/random-20Hz-stream",
DataOutputters: []DataOutputter{
NewLokiDataOutput(
os.Getenv("GF_LIVE_LOKI_ENDPOINT"),
os.Getenv("GF_LIVE_LOKI_USER"),
os.Getenv("GF_LIVE_LOKI_PASSWORD"),
),
},
Converter: NewJsonFrameConverter(JsonFrameConverterConfig{}),
FrameOutputters: []FrameOutputter{
NewManagedStreamFrameOutput(f.ManagedStream),

View File

@@ -0,0 +1,148 @@
package pipeline
import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"net/http"
"sync"
"time"
"github.com/grafana/grafana-plugin-sdk-go/data"
)
const lokiFlushInterval = 15 * time.Second
// LokiFrameOutput can output frame encoded to JSON to Loki.
type LokiFrameOutput struct {
lokiWriter *lokiWriter
}
func NewLokiFrameOutput(endpoint, user, password string) *LokiFrameOutput {
return &LokiFrameOutput{
lokiWriter: newLokiWriter(endpoint, user, password),
}
}
const FrameOutputTypeLoki = "loki"
func (out *LokiFrameOutput) Type() string {
return FrameOutputTypeLoki
}
type LokiStreamsEntry struct {
Streams []LokiStream `json:"streams"`
}
type LokiStream struct {
Stream map[string]string `json:"stream"`
Values []interface{} `json:"values"`
}
func (out *LokiFrameOutput) OutputFrame(_ context.Context, vars Vars, frame *data.Frame) ([]*ChannelFrame, error) {
if out.lokiWriter.endpoint == "" {
logger.Debug("Skip sending to Loki: no url")
return nil, nil
}
frameJSON, err := data.FrameToJSON(frame, data.IncludeAll)
if err != nil {
return nil, err
}
err = out.lokiWriter.write(LokiStream{
Stream: map[string]string{"frame": frame.Name, "channel": vars.Channel},
Values: []interface{}{
[]interface{}{time.Now().UnixNano(), string(frameJSON)},
},
})
return nil, err
}
type lokiWriter struct {
mu sync.RWMutex
httpClient *http.Client
buffer []LokiStream
// Endpoint to send streaming frames to.
endpoint string
// User is a user for remote write request.
user string
// Password for remote write endpoint.
password string
}
func newLokiWriter(endpoint, user, password string) *lokiWriter {
w := &lokiWriter{
endpoint: endpoint,
user: user,
password: password,
httpClient: &http.Client{
Timeout: 2 * time.Second,
},
}
go w.flushPeriodically()
return w
}
func (w *lokiWriter) flushPeriodically() {
for range time.NewTicker(lokiFlushInterval).C {
w.mu.Lock()
if len(w.buffer) == 0 {
w.mu.Unlock()
continue
}
tmpBuffer := make([]LokiStream, len(w.buffer))
copy(tmpBuffer, w.buffer)
w.buffer = nil
w.mu.Unlock()
err := w.flush(tmpBuffer)
if err != nil {
logger.Error("Error flush to Loki", "error", err)
w.mu.Lock()
// TODO: drop in case of large buffer size? Make several attempts only?
w.buffer = append(tmpBuffer, w.buffer...)
w.mu.Unlock()
}
}
}
func (w *lokiWriter) write(s LokiStream) error {
w.mu.Lock()
w.buffer = append(w.buffer, s)
w.mu.Unlock()
return nil
}
func (w *lokiWriter) flush(streams []LokiStream) error {
logger.Debug("Loki flush", "numStreams", len(streams))
writeData, err := json.Marshal(LokiStreamsEntry{
Streams: streams,
})
if err != nil {
return fmt.Errorf("error converting Loki stream entry to bytes: %v", err)
}
logger.Debug("Sending to Loki endpoint", "url", w.endpoint, "bodyLength", len(writeData))
req, err := http.NewRequest(http.MethodPost, w.endpoint, bytes.NewReader(writeData))
if err != nil {
return fmt.Errorf("error constructing loki push request: %w", err)
}
req.Header.Set("Content-Type", "application/json")
if w.user != "" {
req.SetBasicAuth(w.user, w.password)
}
started := time.Now()
resp, err := w.httpClient.Do(req)
if err != nil {
return fmt.Errorf("error sending to Loki: %w", err)
}
_ = resp.Body.Close()
if resp.StatusCode != http.StatusNoContent {
logger.Error("Unexpected response code from Loki endpoint", "code", resp.StatusCode)
return errors.New("unexpected response code Loki endpoint")
}
logger.Debug("Successfully sent to Loki", "elapsed", time.Since(started))
return nil
}

View File

@@ -44,6 +44,10 @@ var FrameOutputsRegistry = []EntityInfo{
Type: FrameOutputTypeRemoteWrite,
Description: "output to remote write endpoint",
},
{
Type: FrameOutputTypeLoki,
Description: "output frame as JSON to Loki",
},
}
var ConvertersRegistry = []EntityInfo{
@@ -90,4 +94,8 @@ var DataOutputsRegistry = []EntityInfo{
Type: DataOutputTypeRedirect,
Description: "redirect data processing to another channel rule",
},
{
Type: DataOutputTypeLoki,
Description: "output data to Loki as logs",
},
}

View File

@@ -12,9 +12,8 @@ type testBuilder struct{}
func (t *testBuilder) BuildRules(_ context.Context, _ int64) ([]*LiveChannelRule, error) {
return []*LiveChannelRule{
{
OrgId: 1,
Pattern: "stream/telegraf/cpu",
Converter: NewAutoJsonConverter(AutoJsonConverterConfig{}),
OrgId: 1,
Pattern: "stream/telegraf/cpu",
},
{
OrgId: 1,
@@ -23,14 +22,10 @@ func (t *testBuilder) BuildRules(_ context.Context, _ int64) ([]*LiveChannelRule
{
OrgId: 1,
Pattern: "stream/telegraf/:metric/:extra",
FrameOutputters: []FrameOutputter{
NewRedirectFrameOutput(RedirectOutputConfig{}),
},
},
{
OrgId: 1,
Pattern: "stream/boom:er",
Converter: NewExactJsonConverter(ExactJsonConverterConfig{}),
OrgId: 1,
Pattern: "stream/boom:er",
},
}, nil
}
@@ -40,22 +35,22 @@ func TestStorage_Get(t *testing.T) {
rule, ok, err := s.Get(1, "stream/telegraf/cpu")
require.NoError(t, err)
require.True(t, ok)
require.NotNil(t, rule.Converter)
require.Equal(t, "stream/telegraf/cpu", rule.Pattern)
rule, ok, err = s.Get(1, "stream/telegraf/mem")
require.NoError(t, err)
require.True(t, ok)
require.Nil(t, rule.Converter)
require.Equal(t, "stream/telegraf/:metric", rule.Pattern)
rule, ok, err = s.Get(1, "stream/telegraf/mem/rss")
require.NoError(t, err)
require.True(t, ok)
require.Equal(t, FrameOutputTypeRedirect, rule.FrameOutputters[0].Type())
require.Equal(t, "stream/telegraf/:metric/:extra", rule.Pattern)
rule, ok, err = s.Get(1, "stream/booms")
require.NoError(t, err)
require.True(t, ok)
require.Equal(t, ConverterTypeJsonExact, rule.Converter.Type())
require.Equal(t, "stream/boom:er", rule.Pattern)
}
func BenchmarkRuleGet(b *testing.B) {