Live: Telegraf input modifiers (#32982)

This commit is contained in:
Alexander Emelin 2021-04-19 18:48:43 +03:00 committed by GitHub
parent f92b6518c5
commit d807fbc9e9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 187 additions and 57 deletions

2
go.mod
View File

@ -43,7 +43,7 @@ require (
github.com/gosimple/slug v1.9.0
github.com/grafana/alerting-api v0.0.0-20210414165752-6625e7a4f9a9
github.com/grafana/grafana-aws-sdk v0.4.0
github.com/grafana/grafana-live-sdk v0.0.4
github.com/grafana/grafana-live-sdk v0.0.5
github.com/grafana/grafana-plugin-model v0.0.0-20190930120109-1fc953a61fb4
github.com/grafana/grafana-plugin-sdk-go v0.92.0
github.com/grafana/loki v1.6.2-0.20201026154740-6978ee5d7387

4
go.sum
View File

@ -826,8 +826,8 @@ github.com/grafana/grafana v1.9.2-0.20210308201921-4ce0a49eac03/go.mod h1:AHRRvd
github.com/grafana/grafana-aws-sdk v0.1.0/go.mod h1:+pPo5U+pX0zWimR7YBc7ASeSQfbRkcTyQYqMiAj7G5U=
github.com/grafana/grafana-aws-sdk v0.4.0 h1:JmTaXfOJ/ydHSWH9kEt8Yhfb9kAhIW4LUOO3SWCviYg=
github.com/grafana/grafana-aws-sdk v0.4.0/go.mod h1:+pPo5U+pX0zWimR7YBc7ASeSQfbRkcTyQYqMiAj7G5U=
github.com/grafana/grafana-live-sdk v0.0.4 h1:mATki7fEkKtX4jD+HfOKst9CgFcVyF/pr3Co+gy+Ato=
github.com/grafana/grafana-live-sdk v0.0.4/go.mod h1:f15hHmWyLdFjmuWLsjeKeZnq/HnNQ3QkoPcaEww45AY=
github.com/grafana/grafana-live-sdk v0.0.5 h1:Y7qdOLbl+N4pXsfAR5+e4YWcOk/WFB7WpuT3Liv6/as=
github.com/grafana/grafana-live-sdk v0.0.5/go.mod h1:f15hHmWyLdFjmuWLsjeKeZnq/HnNQ3QkoPcaEww45AY=
github.com/grafana/grafana-plugin-model v0.0.0-20190930120109-1fc953a61fb4 h1:SPdxCL9BChFTlyi0Khv64vdCW4TMna8+sxL7+Chx+Ag=
github.com/grafana/grafana-plugin-model v0.0.0-20190930120109-1fc953a61fb4/go.mod h1:nc0XxBzjeGcrMltCDw269LoWF9S8ibhgxolCdA1R8To=
github.com/grafana/grafana-plugin-sdk-go v0.79.0/go.mod h1:NvxLzGkVhnoBKwzkst6CFfpMFKwAdIUZ1q8ssuLeF60=

View File

@ -2,6 +2,7 @@ package live
import (
"context"
"net/url"
"github.com/grafana/grafana/pkg/models"
)
@ -22,3 +23,18 @@ func getContextSignedUser(ctx context.Context) (*models.SignedInUser, bool) {
}
return nil, false
}
type valuesContextKey struct{}
func setContextValues(ctx context.Context, values url.Values) context.Context {
ctx = context.WithValue(ctx, valuesContextKey{}, values)
return ctx
}
func getContextValues(ctx context.Context) (url.Values, bool) {
if val := ctx.Value(valuesContextKey{}); val != nil {
values, ok := val.(url.Values)
return values, ok
}
return nil, false
}

View File

@ -0,0 +1,46 @@
package convert
import (
"errors"
"fmt"
"github.com/grafana/grafana-live-sdk/telemetry"
"github.com/grafana/grafana-live-sdk/telemetry/telegraf"
)
type Converter struct {
telegrafConverterWide *telegraf.Converter
telegrafConverterLabelsColumn *telegraf.Converter
}
func NewConverter() *Converter {
return &Converter{
telegrafConverterWide: telegraf.NewConverter(
telegraf.WithFloat64Numbers(true),
),
telegrafConverterLabelsColumn: telegraf.NewConverter(
telegraf.WithUseLabelsColumn(true),
telegraf.WithFloat64Numbers(true),
),
}
}
var ErrUnsupportedFrameFormat = errors.New("unsupported frame format")
func (c *Converter) Convert(data []byte, frameFormat string) ([]telemetry.FrameWrapper, error) {
var converter telemetry.Converter
switch frameFormat {
case "wide":
converter = c.telegrafConverterWide
case "labels_column":
converter = c.telegrafConverterLabelsColumn
default:
return nil, ErrUnsupportedFrameFormat
}
metricFrames, err := converter.Convert(data)
if err != nil {
return nil, fmt.Errorf("error converting metrics: %w", err)
}
return metricFrames, nil
}

View File

@ -2,24 +2,26 @@ package live
import (
"context"
"errors"
"github.com/grafana/grafana-live-sdk/telemetry/telegraf"
"github.com/grafana/grafana/pkg/models"
"github.com/grafana/grafana/pkg/services/live/convert"
"github.com/grafana/grafana/pkg/services/live/pushurl"
"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana/pkg/models"
)
type Demultiplexer struct {
streamID string
managedStreamRunner *ManagedStreamRunner
telegrafConverterWide *telegraf.Converter
streamID string
managedStreamRunner *ManagedStreamRunner
converter *convert.Converter
}
func NewDemultiplexer(streamID string, managedStreamRunner *ManagedStreamRunner) *Demultiplexer {
return &Demultiplexer{
streamID: streamID,
managedStreamRunner: managedStreamRunner,
telegrafConverterWide: telegraf.NewConverter(),
streamID: streamID,
managedStreamRunner: managedStreamRunner,
converter: convert.NewConverter(),
}
}
@ -31,19 +33,36 @@ func (s *Demultiplexer) OnSubscribe(_ context.Context, _ *models.SignedInUser, _
return models.SubscribeReply{}, backend.SubscribeStreamStatusPermissionDenied, nil
}
func (s *Demultiplexer) OnPublish(_ context.Context, _ *models.SignedInUser, evt models.PublishEvent) (models.PublishReply, backend.PublishStreamStatus, error) {
func (s *Demultiplexer) OnPublish(ctx context.Context, _ *models.SignedInUser, evt models.PublishEvent) (models.PublishReply, backend.PublishStreamStatus, error) {
urlValues, ok := getContextValues(ctx)
if !ok {
return models.PublishReply{}, 0, errors.New("error extracting context url values")
}
stream, err := s.managedStreamRunner.GetOrCreateStream(s.streamID)
if err != nil {
logger.Error("Error getting stream", "error", err, "streamId", s.streamID)
return models.PublishReply{}, 0, err
}
metricFrames, err := s.telegrafConverterWide.Convert(evt.Data)
frameFormat := pushurl.FrameFormatFromValues(urlValues)
stableSchema := pushurl.StableSchemaFromValues(urlValues)
logger.Debug("Live Push request",
"protocol", "ws",
"streamId", s.streamID,
"bodyLength", len(evt.Data),
"stableSchema", stableSchema,
"frameFormat", frameFormat,
)
metricFrames, err := s.converter.Convert(evt.Data, frameFormat)
if err != nil {
logger.Error("Error converting metrics", "error", err, "data", string(evt.Data))
logger.Error("Error converting metrics", "error", err, "data", string(evt.Data), "frameFormat", frameFormat)
return models.PublishReply{}, 0, err
}
for _, mf := range metricFrames {
err := stream.Push(mf.Key(), mf.Frame())
err := stream.Push(mf.Key(), mf.Frame(), stableSchema)
if err != nil {
return models.PublishReply{}, 0, err
}

View File

@ -8,13 +8,6 @@ import (
"sync"
"time"
"github.com/grafana/grafana/pkg/services/live/runstream"
"github.com/centrifugal/centrifuge"
"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana-plugin-sdk-go/data"
"github.com/grafana/grafana-plugin-sdk-go/live"
"github.com/grafana/grafana/pkg/api/dtos"
"github.com/grafana/grafana/pkg/api/response"
"github.com/grafana/grafana/pkg/api/routing"
@ -25,9 +18,15 @@ import (
"github.com/grafana/grafana/pkg/registry"
"github.com/grafana/grafana/pkg/services/datasources"
"github.com/grafana/grafana/pkg/services/live/features"
"github.com/grafana/grafana/pkg/services/live/runstream"
"github.com/grafana/grafana/pkg/setting"
"github.com/grafana/grafana/pkg/tsdb/cloudwatch"
"github.com/grafana/grafana/pkg/util"
"github.com/centrifugal/centrifuge"
"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana-plugin-sdk-go/data"
"github.com/grafana/grafana-plugin-sdk-go/live"
)
var (
@ -273,6 +272,7 @@ func (g *GrafanaLive) Init() error {
}
newCtx := centrifuge.SetCredentials(ctx.Req.Context(), cred)
newCtx = setContextSignedUser(newCtx, user)
newCtx = setContextValues(newCtx, ctx.Req.URL.Query())
r := ctx.Req.Request
r = r.WithContext(newCtx) // Set a user ID.

View File

@ -87,30 +87,41 @@ func (s *ManagedStream) ListChannels(prefix string) []util.DynMap {
}
// Push sends frame to the stream and saves it for later retrieval by subscribers.
func (s *ManagedStream) Push(path string, frame *data.Frame) error {
func (s *ManagedStream) Push(path string, frame *data.Frame, stableSchema bool) error {
// Keep schema + data for last packet.
frameJSON, err := data.FrameToJSON(frame, true, true)
if err != nil {
logger.Error("Error marshaling Frame to Schema", "error", err)
logger.Error("Error marshaling frame with Schema", "error", err)
return err
}
// Locks until we totally finish?
s.mu.Lock()
defer s.mu.Unlock()
if stableSchema {
// If schema is stable we can safely cache it, and only send values if
// stream already has schema cached.
s.mu.Lock()
_, exists := s.last[path]
s.last[path] = frameJSON
s.mu.Unlock()
_, exists := s.last[path]
s.last[path] = frameJSON
// When the packet already exits, only send the data.
if exists {
frameJSON, err = data.FrameToJSON(frame, false, true)
if err != nil {
logger.Error("Error marshaling Frame to JSON", "error", err)
return err
// When the packet already exits, only send the data.
// TODO: maybe a good idea would be MarshalJSON function of
// frame to keep Schema JSON and Values JSON in frame object
// to avoid encoding twice.
if exists {
frameJSON, err = data.FrameToJSON(frame, false, true)
if err != nil {
logger.Error("Error marshaling Frame to JSON", "error", err)
return err
}
}
} else {
// For unstable schema we always need to send everything to a connection.
// And we don't want to cache schema for unstable case. But we still need to
// set path to a map to make stream visible in UI stream select widget.
s.mu.Lock()
s.last[path] = nil
s.mu.Unlock()
}
// The channel this will be posted into.
channel := live.Channel{Scope: live.ScopeStream, Namespace: s.id, Path: path}.String()
logger.Debug("Publish data to channel", "channel", channel, "dataLength", len(frameJSON))
@ -122,7 +133,7 @@ func (s *ManagedStream) getLastPacket(path string) (json.RawMessage, bool) {
s.mu.RLock()
defer s.mu.RUnlock()
schema, ok := s.last[path]
return schema, ok
return schema, ok && schema != nil
}
func (s *ManagedStream) GetHandlerForPath(_ string) (models.ChannelHandler, error) {
@ -145,7 +156,7 @@ func (s *ManagedStream) OnPublish(_ context.Context, _ *models.SignedInUser, evt
// Stream scope only deals with data frames.
return models.PublishReply{}, 0, err
}
err = s.Push(evt.Path, &frame)
err = s.Push(evt.Path, &frame, true)
if err != nil {
// Stream scope only deals with data frames.
return models.PublishReply{}, 0, err

View File

@ -20,7 +20,19 @@ func TestManagedStream_GetLastPacket(t *testing.T) {
c := NewManagedStream("a", noopPublisher)
_, ok := c.getLastPacket("test")
require.False(t, ok)
err := c.Push("test", data.NewFrame("hello"))
err := c.Push("test", data.NewFrame("hello"), false)
require.NoError(t, err)
_, ok = c.getLastPacket("test")
require.NoError(t, err)
require.False(t, ok)
}
func TestManagedStream_GetLastPacket_StableSchema(t *testing.T) {
c := NewManagedStream("a", noopPublisher)
_, ok := c.getLastPacket("test")
require.False(t, ok)
err := c.Push("test", data.NewFrame("hello"), true)
require.NoError(t, err)
s, ok := c.getLastPacket("test")

View File

@ -2,14 +2,15 @@ package push
import (
"context"
"errors"
"net/http"
"github.com/grafana/grafana-live-sdk/telemetry/telegraf"
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/models"
"github.com/grafana/grafana/pkg/registry"
"github.com/grafana/grafana/pkg/services/live"
"github.com/grafana/grafana/pkg/services/live/convert"
"github.com/grafana/grafana/pkg/services/live/pushurl"
"github.com/grafana/grafana/pkg/setting"
)
@ -26,8 +27,7 @@ type Gateway struct {
Cfg *setting.Cfg `inject:""`
GrafanaLive *live.GrafanaLive `inject:""`
telegrafConverterWide *telegraf.Converter
telegrafConverterLabelsColumn *telegraf.Converter
converter *convert.Converter
}
// Init Gateway.
@ -39,9 +39,7 @@ func (g *Gateway) Init() error {
return nil
}
// For now only Telegraf converter (influx format) is supported.
g.telegrafConverterWide = telegraf.NewConverter()
g.telegrafConverterLabelsColumn = telegraf.NewConverter(telegraf.WithUseLabelsColumn(true))
g.converter = convert.NewConverter()
return nil
}
@ -70,11 +68,10 @@ func (g *Gateway) Handle(ctx *models.ReqContext) {
return
}
// TODO Grafana 8: decide which format to use or keep both.
converter := g.telegrafConverterWide
if ctx.Req.URL.Query().Get("format") == "labels_column" {
converter = g.telegrafConverterLabelsColumn
}
// TODO Grafana 8: decide which formats to use or keep all.
urlValues := ctx.Req.URL.Query()
frameFormat := pushurl.FrameFormatFromValues(urlValues)
stableSchema := pushurl.StableSchemaFromValues(urlValues)
body, err := ctx.Req.Body().Bytes()
if err != nil {
@ -82,12 +79,22 @@ func (g *Gateway) Handle(ctx *models.ReqContext) {
ctx.Resp.WriteHeader(http.StatusInternalServerError)
return
}
logger.Debug("Live Push request body", "streamId", streamID, "bodyLength", len(body))
logger.Debug("Live Push request",
"protocol", "http",
"streamId", streamID,
"bodyLength", len(body),
"stableSchema", stableSchema,
"frameFormat", frameFormat,
)
metricFrames, err := converter.Convert(body)
metricFrames, err := g.converter.Convert(body, frameFormat)
if err != nil {
logger.Error("Error converting metrics", "error", err)
ctx.Resp.WriteHeader(http.StatusInternalServerError)
logger.Error("Error converting metrics", "error", err, "frameFormat", frameFormat)
if errors.Is(err, convert.ErrUnsupportedFrameFormat) {
ctx.Resp.WriteHeader(http.StatusBadRequest)
} else {
ctx.Resp.WriteHeader(http.StatusInternalServerError)
}
return
}
@ -95,7 +102,7 @@ func (g *Gateway) Handle(ctx *models.ReqContext) {
// interval = "1s" vs flush_interval = "5s"
for _, mf := range metricFrames {
err := stream.Push(mf.Key(), mf.Frame())
err := stream.Push(mf.Key(), mf.Frame(), stableSchema)
if err != nil {
ctx.Resp.WriteHeader(http.StatusInternalServerError)
return

View File

@ -0,0 +1,19 @@
package pushurl
import (
"net/url"
"strings"
)
func StableSchemaFromValues(values url.Values) bool {
key := "gf_live_stable_schema"
return strings.ToLower(values.Get(key)) == "true" || values.Get(key) == "1"
}
func FrameFormatFromValues(values url.Values) string {
frameFormat := strings.ToLower(values.Get("gf_live_frame_format"))
if frameFormat == "" {
frameFormat = "wide"
}
return frameFormat
}