From 54ad791c7e7ff9671e3d83bae4a4b9f8754bbd5d Mon Sep 17 00:00:00 2001 From: Alexander Emelin Date: Mon, 5 Apr 2021 19:04:46 +0300 Subject: [PATCH] Live: expose HTTP push endpoint that will read influx line protocol and publish to websocket (#32311) Co-authored-by: Ryan McKinley --- go.mod | 3 +- go.sum | 8 +- .../src/dataframe/DataFrameJSON.ts | 6 - packages/grafana-data/src/types/live.ts | 3 +- .../grafana-runtime/src/measurement/query.ts | 18 ++- .../src/components/GraphNG/utils.ts | 4 + .../src/components/InfoBox/InfoBox.tsx | 1 - pkg/api/api.go | 6 + pkg/api/http_server.go | 2 + pkg/services/live/channel.go | 3 +- pkg/services/live/features/broadcast.go | 6 + pkg/services/live/features/measurements.go | 36 ----- pkg/services/live/live.go | 59 ++++--- pkg/services/live/log.go | 27 ++++ pkg/services/live/managed.go | 142 ++++++++++++++++ pkg/services/live/managed_test.go | 30 ++++ pkg/services/live/plugin_helpers.go | 7 +- pkg/services/live/push/push.go | 104 ++++++++++++ pkg/services/live/scope.go | 2 + public/app/features/live/channel.ts | 17 +- public/app/features/live/live.ts | 2 + public/app/features/live/scopes.ts | 27 ++++ .../grafana/components/QueryEditor.tsx | 151 ++++++++++++++---- .../plugins/datasource/grafana/datasource.ts | 41 ++--- .../app/plugins/datasource/grafana/types.ts | 1 + 25 files changed, 576 insertions(+), 130 deletions(-) delete mode 100644 pkg/services/live/features/measurements.go create mode 100644 pkg/services/live/log.go create mode 100644 pkg/services/live/managed.go create mode 100644 pkg/services/live/managed_test.go create mode 100644 pkg/services/live/push/push.go diff --git a/go.mod b/go.mod index a693a1458ae..7a24ca7dffe 100644 --- a/go.mod +++ b/go.mod @@ -17,7 +17,7 @@ require ( github.com/VividCortex/mysqlerr v0.0.0-20170204212430-6c6b55f8796f github.com/aws/aws-sdk-go v1.38.12 github.com/beevik/etree v1.1.0 - github.com/benbjohnson/clock v0.0.0-20161215174838-7dc76406b6d3 + github.com/benbjohnson/clock v1.0.3 github.com/bradfitz/gomemcache v0.0.0-20190913173617-a41fca850d0b github.com/centrifugal/centrifuge v0.16.0 github.com/cortexproject/cortex v1.4.1-0.20201022071705-85942c5703cf @@ -42,6 +42,7 @@ require ( github.com/gosimple/slug v1.9.0 github.com/grafana/alerting-api v0.0.0-20210331135037-3294563b51bb github.com/grafana/grafana-aws-sdk v0.4.0 + github.com/grafana/grafana-live-sdk v0.0.4 github.com/grafana/grafana-plugin-model v0.0.0-20190930120109-1fc953a61fb4 github.com/grafana/grafana-plugin-sdk-go v0.91.0 github.com/grafana/loki v1.6.2-0.20201026154740-6978ee5d7387 diff --git a/go.sum b/go.sum index 22a5dd99058..330f4044de9 100644 --- a/go.sum +++ b/go.sum @@ -211,8 +211,9 @@ github.com/baiyubin/aliyun-sts-go-sdk v0.0.0-20180326062324-cfa1a18b161f/go.mod github.com/beevik/etree v1.1.0 h1:T0xke/WvNtMoCqgzPhkX2r4rjY3GDZFi+FjpRZY2Jbs= github.com/beevik/etree v1.1.0/go.mod h1:r8Aw8JqVegEf0w2fDnATrX9VpkMcyFeM0FhwO62wh+A= github.com/beevik/ntp v0.2.0/go.mod h1:hIHWr+l3+/clUnF44zdK+CWW7fO8dR5cIylAQ76NRpg= -github.com/benbjohnson/clock v0.0.0-20161215174838-7dc76406b6d3 h1:wOysYcIdqv3WnvwqFFzrYCFALPED7qkUGaLXu359GSc= github.com/benbjohnson/clock v0.0.0-20161215174838-7dc76406b6d3/go.mod h1:UMqtWQTnOe4byzwe7Zhwh8f8s+36uszN51sJrSIZlTE= +github.com/benbjohnson/clock v1.0.3 h1:vkLuvpK4fmtSCuo60+yC63p7y0BmQ8gm5ZXGuBCJyXg= +github.com/benbjohnson/clock v1.0.3/go.mod h1:bGMdMPoPVvcYyt1gHDf4J2KE153Yf9BuiUKYMaxlTDM= github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q= github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= @@ -808,6 +809,8 @@ github.com/grafana/grafana v1.9.2-0.20210308201921-4ce0a49eac03/go.mod h1:AHRRvd github.com/grafana/grafana-aws-sdk v0.1.0/go.mod h1:+pPo5U+pX0zWimR7YBc7ASeSQfbRkcTyQYqMiAj7G5U= github.com/grafana/grafana-aws-sdk v0.4.0 h1:JmTaXfOJ/ydHSWH9kEt8Yhfb9kAhIW4LUOO3SWCviYg= github.com/grafana/grafana-aws-sdk v0.4.0/go.mod h1:+pPo5U+pX0zWimR7YBc7ASeSQfbRkcTyQYqMiAj7G5U= +github.com/grafana/grafana-live-sdk v0.0.4 h1:mATki7fEkKtX4jD+HfOKst9CgFcVyF/pr3Co+gy+Ato= +github.com/grafana/grafana-live-sdk v0.0.4/go.mod h1:f15hHmWyLdFjmuWLsjeKeZnq/HnNQ3QkoPcaEww45AY= github.com/grafana/grafana-plugin-model v0.0.0-20190930120109-1fc953a61fb4 h1:SPdxCL9BChFTlyi0Khv64vdCW4TMna8+sxL7+Chx+Ag= github.com/grafana/grafana-plugin-model v0.0.0-20190930120109-1fc953a61fb4/go.mod h1:nc0XxBzjeGcrMltCDw269LoWF9S8ibhgxolCdA1R8To= github.com/grafana/grafana-plugin-sdk-go v0.79.0/go.mod h1:NvxLzGkVhnoBKwzkst6CFfpMFKwAdIUZ1q8ssuLeF60= @@ -949,8 +952,9 @@ github.com/influxdata/influxdb1-client v0.0.0-20191209144304-8bf82d3c094d/go.mod github.com/influxdata/influxql v1.1.0/go.mod h1:KpVI7okXjK6PRi3Z5B+mtKZli+R1DnZgb3N+tzevNgo= github.com/influxdata/influxql v1.1.1-0.20200828144457-65d3ef77d385/go.mod h1:gHp9y86a/pxhjJ+zMjNXiQAA197Xk9wLxaz+fGG+kWk= github.com/influxdata/line-protocol v0.0.0-20180522152040-32c6aa80de5e/go.mod h1:4kt73NQhadE3daL3WhR5EJ/J2ocX0PZzwxQ0gXJ7oFE= -github.com/influxdata/line-protocol v0.0.0-20200327222509-2487e7298839 h1:W9WBk7wlPfJLvMCdtV4zPulc4uCPrlywQOmbFOhgQNU= github.com/influxdata/line-protocol v0.0.0-20200327222509-2487e7298839/go.mod h1:xaLFMmpvUxqXtVkUJfg9QmT88cDaCJ3ZKgdZ78oO8Qo= +github.com/influxdata/line-protocol v0.0.0-20210311194329-9aa0e372d097 h1:vilfsDSy7TDxedi9gyBkMvAirat/oRcL0lFdJBf6tdM= +github.com/influxdata/line-protocol v0.0.0-20210311194329-9aa0e372d097/go.mod h1:xaLFMmpvUxqXtVkUJfg9QmT88cDaCJ3ZKgdZ78oO8Qo= github.com/influxdata/promql/v2 v2.12.0/go.mod h1:fxOPu+DY0bqCTCECchSRtWfc+0X19ybifQhZoQNF5D8= github.com/influxdata/roaring v0.4.13-0.20180809181101-fc520f41fab6/go.mod h1:bSgUQ7q5ZLSO+bKBGqJiCBGAl+9DxyW63zLTujjUlOE= github.com/influxdata/tdigest v0.0.0-20181121200506-bf2b5ad3c0a9/go.mod h1:Js0mqiSBE6Ffsg94weZZ2c+v/ciT8QRHFOap7EKDrR0= diff --git a/packages/grafana-data/src/dataframe/DataFrameJSON.ts b/packages/grafana-data/src/dataframe/DataFrameJSON.ts index 56289f7e95e..c9a530e22ae 100644 --- a/packages/grafana-data/src/dataframe/DataFrameJSON.ts +++ b/packages/grafana-data/src/dataframe/DataFrameJSON.ts @@ -8,12 +8,6 @@ import { guessFieldTypeFromNameAndValue } from './processDataFrame'; * @alpha */ export interface DataFrameJSON { - /**HACK: this will get removed, but will help transition telegraf streaming - * - * In telegraf, this will be: ${name}${labels} - */ - key?: string; - /** * The schema defines the field type and configuration. */ diff --git a/packages/grafana-data/src/types/live.ts b/packages/grafana-data/src/types/live.ts index 0bdc3edb7dd..f5cd393327f 100644 --- a/packages/grafana-data/src/types/live.ts +++ b/packages/grafana-data/src/types/live.ts @@ -13,6 +13,7 @@ export enum LiveChannelScope { DataSource = 'ds', // namespace = data source ID Plugin = 'plugin', // namespace = plugin name (singleton works for apps too) Grafana = 'grafana', // namespace = feature + Stream = 'stream', // namespace = id for the managed data stream } /** @@ -155,7 +156,7 @@ export interface LiveChannelAddress { * * @alpha -- experimental */ -export function parseLiveChannelAddress(id: string): LiveChannelAddress | undefined { +export function parseLiveChannelAddress(id?: string): LiveChannelAddress | undefined { if (id?.length) { let parts = id.trim().split('/'); if (parts.length >= 3) { diff --git a/packages/grafana-runtime/src/measurement/query.ts b/packages/grafana-runtime/src/measurement/query.ts index d999c64308f..30847540ec5 100644 --- a/packages/grafana-runtime/src/measurement/query.ts +++ b/packages/grafana-runtime/src/measurement/query.ts @@ -48,6 +48,7 @@ export function getLiveDataStream(options: LiveDataStreamOptions): Observable((subscriber) => { let data: StreamingDataFrame | undefined = undefined; + let filtered: DataFrame | undefined = undefined; let state = LoadingState.Loading; const { key, filter } = options; let last = perf.last; @@ -60,17 +61,20 @@ export function getLiveDataStream(options: LiveDataStreamOptions): Observable filter.fields!.includes(f.name)), - }; + // Select the fields we are actually looking at + if (!filtered || msg.schema) { + filtered = data; + if (filter?.fields?.length) { + filtered = { + ...data, + fields: data.fields.filter((f) => filter.fields!.includes(f.name)), + }; + } } const elapsed = perf.last - last; if (elapsed > 1000 || perf.ok) { + filtered.length = data.length; // make sure they stay up-to-date subscriber.next({ state, data: [filtered], key }); last = perf.last; } diff --git a/packages/grafana-ui/src/components/GraphNG/utils.ts b/packages/grafana-ui/src/components/GraphNG/utils.ts index 3f60f87cfe7..6076d95a309 100644 --- a/packages/grafana-ui/src/components/GraphNG/utils.ts +++ b/packages/grafana-ui/src/components/GraphNG/utils.ts @@ -90,6 +90,10 @@ export function preparePlotConfigBuilder( // X is the first field in the aligned frame const xField = frame.fields[0]; + if (!xField) { + return builder; // empty frame with no options + } + let seriesIndex = 0; if (xField.type === FieldType.time) { diff --git a/packages/grafana-ui/src/components/InfoBox/InfoBox.tsx b/packages/grafana-ui/src/components/InfoBox/InfoBox.tsx index a165b6956de..118bff60d00 100644 --- a/packages/grafana-ui/src/components/InfoBox/InfoBox.tsx +++ b/packages/grafana-ui/src/components/InfoBox/InfoBox.tsx @@ -68,7 +68,6 @@ const getInfoBoxStyles = stylesFactory((theme: GrafanaTheme, severity: AlertVari color: ${theme.colors.textSemiWeak}; code { - @include font-family-monospace(); font-size: ${theme.typography.size.sm}; background-color: ${theme.colors.bg1}; color: ${theme.colors.text}; diff --git a/pkg/api/api.go b/pkg/api/api.go index 48a20b9ee69..b3dbc73b702 100644 --- a/pkg/api/api.go +++ b/pkg/api/api.go @@ -402,6 +402,12 @@ func (hs *HTTPServer) registerRoutes() { if hs.Live.IsEnabled() { apiRoute.Post("/live/publish", bind(dtos.LivePublishCmd{}), routing.Wrap(hs.Live.HandleHTTPPublish)) + + // POST influx line protocol + apiRoute.Post("/live/push/:streamId", hs.LivePushGateway.Handle) + + // List available streams and fields + apiRoute.Get("/live/list", routing.Wrap(hs.Live.HandleListHTTP)) } // short urls diff --git a/pkg/api/http_server.go b/pkg/api/http_server.go index 0af211d19f9..f28b7d0f8a6 100644 --- a/pkg/api/http_server.go +++ b/pkg/api/http_server.go @@ -35,6 +35,7 @@ import ( "github.com/grafana/grafana/pkg/services/hooks" "github.com/grafana/grafana/pkg/services/librarypanels" "github.com/grafana/grafana/pkg/services/live" + "github.com/grafana/grafana/pkg/services/live/push" "github.com/grafana/grafana/pkg/services/login" "github.com/grafana/grafana/pkg/services/provisioning" "github.com/grafana/grafana/pkg/services/quota" @@ -87,6 +88,7 @@ type HTTPServer struct { SearchService *search.SearchService `inject:""` ShortURLService *shorturls.ShortURLService `inject:""` Live *live.GrafanaLive `inject:""` + LivePushGateway *push.Gateway `inject:""` ContextHandler *contexthandler.ContextHandler `inject:""` SQLStore *sqlstore.SQLStore `inject:""` LibraryPanelService *librarypanels.LibraryPanelService `inject:""` diff --git a/pkg/services/live/channel.go b/pkg/services/live/channel.go index 962411104f5..91e99104037 100644 --- a/pkg/services/live/channel.go +++ b/pkg/services/live/channel.go @@ -7,13 +7,14 @@ import ( // ChannelAddress is the channel ID split by parts. type ChannelAddress struct { // Scope is one of available channel scopes: - // like ScopeGrafana, ScopePlugin, ScopeDatasource. + // like ScopeGrafana, ScopePlugin, ScopeDatasource, ScopeStream. Scope string `json:"scope,omitempty"` // Namespace meaning depends on the scope. // * when ScopeGrafana, namespace is a "feature" // * when ScopePlugin, namespace is the plugin name // * when ScopeDatasource, namespace is the datasource uid + // * when ScopeStream, namespace is the stream ID. Namespace string `json:"namespace,omitempty"` // Within each namespace, the handler can process the path as needed. diff --git a/pkg/services/live/features/broadcast.go b/pkg/services/live/features/broadcast.go index a767cc7d35e..5430f189ac6 100644 --- a/pkg/services/live/features/broadcast.go +++ b/pkg/services/live/features/broadcast.go @@ -4,11 +4,17 @@ import ( "context" "time" + "github.com/grafana/grafana/pkg/infra/log" + "github.com/grafana/grafana-plugin-sdk-go/backend" "github.com/grafana/grafana/pkg/models" ) +var ( + logger = log.New("live.features") // scoped to all features? +) + // BroadcastRunner will simply broadcast all events to `grafana/broadcast/*` channels // This assumes that data is a JSON object type BroadcastRunner struct{} diff --git a/pkg/services/live/features/measurements.go b/pkg/services/live/features/measurements.go deleted file mode 100644 index 6c7960f9bda..00000000000 --- a/pkg/services/live/features/measurements.go +++ /dev/null @@ -1,36 +0,0 @@ -package features - -import ( - "context" - - "github.com/grafana/grafana-plugin-sdk-go/backend" - - "github.com/grafana/grafana/pkg/infra/log" - "github.com/grafana/grafana/pkg/models" -) - -var ( - logger = log.New("live.features") // scoped to all features? -) - -// MeasurementsRunner will simply broadcast all events to `grafana/broadcast/*` channels. -// This makes no assumptions about the shape of the data and will broadcast it to anyone listening -type MeasurementsRunner struct { -} - -// GetHandlerForPath gets the handler for a path. -// It's called on init. -func (m *MeasurementsRunner) GetHandlerForPath(path string) (models.ChannelHandler, error) { - return m, nil // for now all channels share config -} - -// OnSubscribe will let anyone connect to the path -func (m *MeasurementsRunner) OnSubscribe(ctx context.Context, _ *models.SignedInUser, e models.SubscribeEvent) (models.SubscribeReply, backend.SubscribeStreamStatus, error) { - return models.SubscribeReply{}, backend.SubscribeStreamStatusOK, nil -} - -// OnPublish is called when a client wants to broadcast on the websocket -// Currently this sends measurements over websocket -- should be replaced with the HTTP interface -func (m *MeasurementsRunner) OnPublish(ctx context.Context, _ *models.SignedInUser, e models.PublishEvent) (models.PublishReply, backend.PublishStreamStatus, error) { - return models.PublishReply{}, backend.PublishStreamStatusOK, nil -} diff --git a/pkg/services/live/live.go b/pkg/services/live/live.go index 320e1f393b3..84f8d6f64ab 100644 --- a/pkg/services/live/live.go +++ b/pkg/services/live/live.go @@ -9,6 +9,7 @@ import ( "github.com/centrifugal/centrifuge" "github.com/grafana/grafana-plugin-sdk-go/backend" + "github.com/grafana/grafana-plugin-sdk-go/data" "github.com/grafana/grafana/pkg/api/dtos" "github.com/grafana/grafana/pkg/api/response" @@ -22,6 +23,7 @@ import ( "github.com/grafana/grafana/pkg/services/live/features" "github.com/grafana/grafana/pkg/setting" "github.com/grafana/grafana/pkg/tsdb/cloudwatch" + "github.com/grafana/grafana/pkg/util" ) var ( @@ -68,6 +70,8 @@ type GrafanaLive struct { // The core internal features GrafanaScope CoreGrafanaScope + ManagedStreamRunner *ManagedStreamRunner + contextGetter *pluginContextGetter streamManager *features.StreamManager } @@ -131,7 +135,8 @@ func (g *GrafanaLive) Init() error { g.GrafanaScope.Dashboards = dash g.GrafanaScope.Features["dashboard"] = dash g.GrafanaScope.Features["broadcast"] = &features.BroadcastRunner{} - g.GrafanaScope.Features["measurements"] = &features.MeasurementsRunner{} + + g.ManagedStreamRunner = NewManagedStreamRunner(g.Publish) // Set ConnectHandler called when client successfully connected to Node. Your code // inside handler must be synchronized since it will be called concurrently from @@ -349,6 +354,8 @@ func (g *GrafanaLive) GetChannelHandlerFactory(user *models.SignedInUser, scope return g.handlePluginScope(user, namespace) case ScopeDatasource: return g.handleDatasourceScope(user, namespace) + case ScopeStream: + return g.handleStreamScope(user, namespace) default: return nil, fmt.Errorf("invalid scope: %q", scope) } @@ -382,6 +389,10 @@ func (g *GrafanaLive) handlePluginScope(_ *models.SignedInUser, namespace string ), nil } +func (g *GrafanaLive) handleStreamScope(_ *models.SignedInUser, namespace string) (models.ChannelHandlerFactory, error) { + return g.ManagedStreamRunner.GetOrCreateStream(namespace) +} + func (g *GrafanaLive) handleDatasourceScope(user *models.SignedInUser, namespace string) (models.ChannelHandlerFactory, error) { ds, err := g.DatasourceCache.GetDatasourceByUID(namespace, user, false) if err != nil { @@ -445,26 +456,32 @@ func (g *GrafanaLive) HandleHTTPPublish(ctx *models.ReqContext, cmd dtos.LivePub return response.JSON(http.StatusOK, dtos.LivePublishResponse{}) } -// Write to the standard log15 logger -func handleLog(msg centrifuge.LogEntry) { - arr := make([]interface{}, 0) - for k, v := range msg.Fields { - if v == nil { - v = "" - } else if v == "" { - v = "" - } - arr = append(arr, k, v) +// HandleListHTTP returns metadata so the UI can build a nice form +func (g *GrafanaLive) HandleListHTTP(_ *models.ReqContext) response.Response { + info := util.DynMap{} + channels := make([]util.DynMap, 0) + for k, v := range g.ManagedStreamRunner.Streams() { + channels = append(channels, v.ListChannels("stream/"+k+"/")...) } - switch msg.Level { - case centrifuge.LogLevelDebug: - loggerCF.Debug(msg.Message, arr...) - case centrifuge.LogLevelError: - loggerCF.Error(msg.Message, arr...) - case centrifuge.LogLevelInfo: - loggerCF.Info(msg.Message, arr...) - default: - loggerCF.Debug(msg.Message, arr...) - } + // Hardcode sample streams + frame := data.NewFrame("testdata", + data.NewField("Time", nil, make([]time.Time, 0)), + data.NewField("Value", nil, make([]float64, 0)), + data.NewField("Min", nil, make([]float64, 0)), + data.NewField("Max", nil, make([]float64, 0)), + ) + channels = append(channels, util.DynMap{ + "channel": "plugin/testdata/random-2s-stream", + "data": frame, + }, util.DynMap{ + "channel": "plugin/testdata/random-flakey-stream", + "data": frame, + }, util.DynMap{ + "channel": "plugin/testdata/random-20Hz-stream", + "data": frame, + }) + + info["channels"] = channels + return response.JSONStreaming(200, info) } diff --git a/pkg/services/live/log.go b/pkg/services/live/log.go new file mode 100644 index 00000000000..8c97094a4b4 --- /dev/null +++ b/pkg/services/live/log.go @@ -0,0 +1,27 @@ +package live + +import "github.com/centrifugal/centrifuge" + +// Write to the standard log15 logger +func handleLog(msg centrifuge.LogEntry) { + arr := make([]interface{}, 0) + for k, v := range msg.Fields { + if v == nil { + v = "" + } else if v == "" { + v = "" + } + arr = append(arr, k, v) + } + + switch msg.Level { + case centrifuge.LogLevelDebug: + loggerCF.Debug(msg.Message, arr...) + case centrifuge.LogLevelError: + loggerCF.Error(msg.Message, arr...) + case centrifuge.LogLevelInfo: + loggerCF.Info(msg.Message, arr...) + default: + loggerCF.Debug(msg.Message, arr...) + } +} diff --git a/pkg/services/live/managed.go b/pkg/services/live/managed.go new file mode 100644 index 00000000000..41a8a21eeee --- /dev/null +++ b/pkg/services/live/managed.go @@ -0,0 +1,142 @@ +package live + +import ( + "context" + "encoding/json" + "fmt" + "sync" + "time" + + "github.com/grafana/grafana-plugin-sdk-go/backend" + "github.com/grafana/grafana-plugin-sdk-go/data" + "github.com/grafana/grafana/pkg/models" + "github.com/grafana/grafana/pkg/util" +) + +type ManagedStreamRunner struct { + mu sync.RWMutex + streams map[string]*ManagedStream + publisher models.ChannelPublisher +} + +// NewPluginRunner creates new PluginRunner. +func NewManagedStreamRunner(publisher models.ChannelPublisher) *ManagedStreamRunner { + return &ManagedStreamRunner{ + publisher: publisher, + streams: map[string]*ManagedStream{}, + } +} + +// Streams returns map of active managed streams. +func (r *ManagedStreamRunner) Streams() map[string]*ManagedStream { + r.mu.RLock() + defer r.mu.RUnlock() + streams := make(map[string]*ManagedStream, len(r.streams)) + for k, v := range r.streams { + streams[k] = v + } + return streams +} + +// GetOrCreateStream -- for now this will create new manager for each key. +// Eventually, the stream behavior will need to be configured explicitly +func (r *ManagedStreamRunner) GetOrCreateStream(streamID string) (*ManagedStream, error) { + r.mu.Lock() + defer r.mu.Unlock() + s, ok := r.streams[streamID] + if !ok { + s = NewManagedStream(streamID, r.publisher) + r.streams[streamID] = s + } + return s, nil +} + +// ManagedStream holds the state of a managed stream +type ManagedStream struct { + mu sync.RWMutex + id string + start time.Time + last map[string]json.RawMessage + publisher models.ChannelPublisher +} + +// NewCache creates new Cache. +func NewManagedStream(id string, publisher models.ChannelPublisher) *ManagedStream { + return &ManagedStream{ + id: id, + start: time.Now(), + last: map[string]json.RawMessage{}, + publisher: publisher, + } +} + +// ListChannels returns info for the UI about this stream. +func (s *ManagedStream) ListChannels(prefix string) []util.DynMap { + s.mu.RLock() + defer s.mu.RUnlock() + + info := make([]util.DynMap, 0, len(s.last)) + for k, v := range s.last { + ch := util.DynMap{} + ch["channel"] = prefix + k + ch["data"] = v + info = append(info, ch) + } + return info +} + +// Push sends data to the stream and optionally processes it. +func (s *ManagedStream) Push(path string, frame *data.Frame) error { + // Keep schema + data for last packet. + frameJSON, err := data.FrameToJSON(frame, true, true) + if err != nil { + logger.Error("Error marshaling Frame to Schema", "error", err) + return err + } + + // Locks until we totally finish? + s.mu.Lock() + defer s.mu.Unlock() + + _, exists := s.last[path] + s.last[path] = frameJSON + + // When the packet already exits, only send the data. + if exists { + frameJSON, err = data.FrameToJSON(frame, false, true) + if err != nil { + logger.Error("Error marshaling Frame to JSON", "error", err) + return err + } + } + + // The channel this will be posted into. + channel := fmt.Sprintf("stream/%s/%s", s.id, path) + logger.Debug("Publish data to channel", "channel", channel, "dataLength", len(frameJSON)) + return s.publisher(channel, frameJSON) +} + +// getLastPacket retrieves schema for a channel. +func (s *ManagedStream) getLastPacket(path string) (json.RawMessage, bool) { + s.mu.RLock() + defer s.mu.RUnlock() + schema, ok := s.last[path] + return schema, ok +} + +func (s *ManagedStream) GetHandlerForPath(_ string) (models.ChannelHandler, error) { + return s, nil +} + +func (s *ManagedStream) OnSubscribe(_ context.Context, _ *models.SignedInUser, e models.SubscribeEvent) (models.SubscribeReply, backend.SubscribeStreamStatus, error) { + reply := models.SubscribeReply{} + packet, ok := s.getLastPacket(e.Path) + if ok { + reply.Data = packet + } + return reply, backend.SubscribeStreamStatusOK, nil +} + +func (s *ManagedStream) OnPublish(_ context.Context, _ *models.SignedInUser, _ models.PublishEvent) (models.PublishReply, backend.PublishStreamStatus, error) { + return models.PublishReply{}, backend.PublishStreamStatusPermissionDenied, nil +} diff --git a/pkg/services/live/managed_test.go b/pkg/services/live/managed_test.go new file mode 100644 index 00000000000..ada62cadd9c --- /dev/null +++ b/pkg/services/live/managed_test.go @@ -0,0 +1,30 @@ +package live + +import ( + "testing" + + "github.com/grafana/grafana-plugin-sdk-go/data" + "github.com/stretchr/testify/require" +) + +var noopPublisher = func(p string, b []byte) error { + return nil +} + +func TestNewManagedStream(t *testing.T) { + c := NewManagedStream("a", noopPublisher) + require.NotNil(t, c) +} + +func TestManagedStream_GetLastPacket(t *testing.T) { + c := NewManagedStream("a", noopPublisher) + _, ok := c.getLastPacket("test") + require.False(t, ok) + err := c.Push("test", data.NewFrame("hello")) + require.NoError(t, err) + + s, ok := c.getLastPacket("test") + require.NoError(t, err) + require.True(t, ok) + require.Equal(t, `{"schema":{"name":"hello","fields":[]},"data":{"values":[]}}`, string(s)) +} diff --git a/pkg/services/live/plugin_helpers.go b/pkg/services/live/plugin_helpers.go index d1577e84735..69a23b79bb3 100644 --- a/pkg/services/live/plugin_helpers.go +++ b/pkg/services/live/plugin_helpers.go @@ -1,6 +1,8 @@ package live import ( + "fmt" + "github.com/grafana/grafana/pkg/models" "github.com/centrifugal/centrifuge" @@ -18,7 +20,10 @@ func newPluginPacketSender(node *centrifuge.Node) *pluginPacketSender { func (p *pluginPacketSender) Send(channel string, packet *backend.StreamPacket) error { _, err := p.node.Publish(channel, packet.Data) - return err + if err != nil { + return fmt.Errorf("error publishing %s: %w", string(packet.Data), err) + } + return nil } type pluginPresenceGetter struct { diff --git a/pkg/services/live/push/push.go b/pkg/services/live/push/push.go new file mode 100644 index 00000000000..e40758a3640 --- /dev/null +++ b/pkg/services/live/push/push.go @@ -0,0 +1,104 @@ +package push + +import ( + "context" + "net/http" + + "github.com/grafana/grafana-live-sdk/telemetry/telegraf" + + "github.com/grafana/grafana/pkg/infra/log" + "github.com/grafana/grafana/pkg/models" + "github.com/grafana/grafana/pkg/registry" + "github.com/grafana/grafana/pkg/services/live" + "github.com/grafana/grafana/pkg/setting" +) + +var ( + logger = log.New("live_push") +) + +func init() { + registry.RegisterServiceWithPriority(&Gateway{}, registry.Low) +} + +// Gateway receives data and translates it to Grafana Live publications. +type Gateway struct { + Cfg *setting.Cfg `inject:""` + GrafanaLive *live.GrafanaLive `inject:""` + + telegrafConverterWide *telegraf.Converter + telegrafConverterLabelsColumn *telegraf.Converter +} + +// Init Gateway. +func (g *Gateway) Init() error { + logger.Info("Telemetry Gateway initialization") + + if !g.IsEnabled() { + logger.Debug("Telemetry Gateway not enabled, skipping initialization") + return nil + } + + // For now only Telegraf converter (influx format) is supported. + g.telegrafConverterWide = telegraf.NewConverter() + g.telegrafConverterLabelsColumn = telegraf.NewConverter(telegraf.WithUseLabelsColumn(true)) + return nil +} + +// Run Gateway. +func (g *Gateway) Run(ctx context.Context) error { + if !g.IsEnabled() { + logger.Debug("GrafanaLive feature not enabled, skipping initialization of Telemetry Gateway") + return nil + } + <-ctx.Done() + return ctx.Err() +} + +// IsEnabled returns true if the Grafana Live feature is enabled. +func (g *Gateway) IsEnabled() bool { + return g.Cfg.IsLiveEnabled() // turn on when Live on for now. +} + +func (g *Gateway) Handle(ctx *models.ReqContext) { + streamID := ctx.Params(":streamId") + + stream, err := g.GrafanaLive.ManagedStreamRunner.GetOrCreateStream(streamID) + if err != nil { + logger.Error("Error getting stream", "error", err) + ctx.Resp.WriteHeader(http.StatusInternalServerError) + return + } + + // TODO Grafana 8: decide which format to use or keep both. + converter := g.telegrafConverterWide + if ctx.Req.URL.Query().Get("format") == "labels_column" { + converter = g.telegrafConverterLabelsColumn + } + + body, err := ctx.Req.Body().Bytes() + if err != nil { + logger.Error("Error reading body", "error", err) + ctx.Resp.WriteHeader(http.StatusInternalServerError) + return + } + logger.Debug("Live Push request body", "streamId", streamID, "bodyLength", len(body)) + + metricFrames, err := converter.Convert(body) + if err != nil { + logger.Error("Error converting metrics", "error", err) + ctx.Resp.WriteHeader(http.StatusInternalServerError) + return + } + + // TODO -- make sure all packets are combined together! + // interval = "1s" vs flush_interval = "5s" + + for _, mf := range metricFrames { + err := stream.Push(mf.Key(), mf.Frame()) + if err != nil { + ctx.Resp.WriteHeader(http.StatusInternalServerError) + return + } + } +} diff --git a/pkg/services/live/scope.go b/pkg/services/live/scope.go index e56577fa58c..d167e16470a 100644 --- a/pkg/services/live/scope.go +++ b/pkg/services/live/scope.go @@ -7,4 +7,6 @@ const ( ScopePlugin = "plugin" // ScopeDatasource passes control to a datasource plugin. ScopeDatasource = "ds" + // ScopeStream is a managed data frame stream + ScopeStream = "stream" ) diff --git a/public/app/features/live/channel.ts b/public/app/features/live/channel.ts index d3df7ad2306..83171adba69 100644 --- a/public/app/features/live/channel.ts +++ b/public/app/features/live/channel.ts @@ -17,7 +17,7 @@ import Centrifuge, { UnsubscribeContext, } from 'centrifuge/dist/centrifuge'; -import { Subject, of, merge } from 'rxjs'; +import { Subject, of, Observable } from 'rxjs'; /** * Internal class that maps Centrifuge support to GrafanaLive @@ -127,7 +127,20 @@ export class CentrifugeLiveChannel implements Li * Get the stream of events and */ getStream() { - return merge(of({ ...this.currentStatus }), this.stream.asObservable()); + return new Observable((subscriber) => { + subscriber.next({ ...this.currentStatus }); + const sub = this.stream.subscribe(subscriber); + return () => { + sub.unsubscribe(); + const count = this.stream.observers.length; + console.log('unsubscribe stream', this.addr, count); + + // Fully disconnect when no more listeners + if (count === 0) { + this.disconnect(); + } + }; + }) as Observable>; } /** diff --git a/public/app/features/live/live.ts b/public/app/features/live/live.ts index 85e5ec0d986..e7897d85464 100644 --- a/public/app/features/live/live.ts +++ b/public/app/features/live/live.ts @@ -8,6 +8,7 @@ import { grafanaLiveCoreFeatures, GrafanaLiveDataSourceScope, GrafanaLivePluginScope, + GrafanaLiveStreamScope, } from './scopes'; import { registerLiveFeatures } from './features'; @@ -52,6 +53,7 @@ export class CentrifugeSrv implements GrafanaLiveSrv { [LiveChannelScope.Grafana]: grafanaLiveCoreFeatures, [LiveChannelScope.DataSource]: new GrafanaLiveDataSourceScope(), [LiveChannelScope.Plugin]: new GrafanaLivePluginScope(), + [LiveChannelScope.Stream]: new GrafanaLiveStreamScope(), }; // Register global listeners diff --git a/public/app/features/live/scopes.ts b/public/app/features/live/scopes.ts index 658ef3973a1..ebda62d7305 100644 --- a/public/app/features/live/scopes.ts +++ b/public/app/features/live/scopes.ts @@ -2,6 +2,7 @@ import { LiveChannelScope, LiveChannelSupport, SelectableValue } from '@grafana/ import { getDataSourceSrv } from '@grafana/runtime'; import { config } from 'app/core/config'; import { loadPlugin } from '../plugins/PluginPage'; +import { LiveMeasurementsSupport } from './measurements/measurementsSupport'; export abstract class GrafanaLiveScope { constructor(protected scope: LiveChannelScope) {} @@ -152,3 +153,29 @@ export class GrafanaLivePluginScope extends GrafanaLiveScope { return (this.names = names); } } + +export class GrafanaLiveStreamScope extends GrafanaLiveScope { + names?: Array>; + + constructor() { + super(LiveChannelScope.Stream); + } + + async getChannelSupport(namespace: string) { + return new LiveMeasurementsSupport(); + } + + /** + * List the possible values within this scope + */ + async listNamespaces() { + if (this.names) { + return Promise.resolve(this.names); + } + const names: Array> = []; + + // TODO!!! + + return (this.names = names); + } +} diff --git a/public/app/plugins/datasource/grafana/components/QueryEditor.tsx b/public/app/plugins/datasource/grafana/components/QueryEditor.tsx index 41efbff9291..eb46efce0ac 100644 --- a/public/app/plugins/datasource/grafana/components/QueryEditor.tsx +++ b/public/app/plugins/datasource/grafana/components/QueryEditor.tsx @@ -1,16 +1,24 @@ import defaults from 'lodash/defaults'; import React, { PureComponent } from 'react'; -import { InlineField, Select, FeatureInfoBox } from '@grafana/ui'; -import { QueryEditorProps, SelectableValue, FeatureState, getFrameDisplayName } from '@grafana/data'; +import { InlineField, Select, FeatureInfoBox, Input } from '@grafana/ui'; +import { QueryEditorProps, SelectableValue, FeatureState, dataFrameFromJSON, rangeUtil } from '@grafana/data'; import { GrafanaDatasource } from '../datasource'; import { defaultQuery, GrafanaQuery, GrafanaQueryType } from '../types'; +import { getBackendSrv } from '@grafana/runtime'; type Props = QueryEditorProps; const labelWidth = 12; -export class QueryEditor extends PureComponent { +interface State { + channels: Array>; + channelFields: Record>>; +} + +export class QueryEditor extends PureComponent { + state: State = { channels: [], channelFields: {} }; + queryTypes: Array> = [ { label: 'Random Walk', @@ -24,6 +32,43 @@ export class QueryEditor extends PureComponent { }, ]; + loadChannelInfo() { + getBackendSrv() + .fetch({ url: 'api/live/list' }) + .subscribe({ + next: (v: any) => { + console.log('GOT', v); + const channelInfo = v.data?.channels as any[]; + if (channelInfo?.length) { + const channelFields: Record>> = {}; + const channels: Array> = channelInfo.map((c) => { + if (c.data) { + const distinctFields = new Set(); + const frame = dataFrameFromJSON(c.data); + for (const f of frame.fields) { + distinctFields.add(f.name); + } + channelFields[c.channel] = Array.from(distinctFields).map((n) => ({ + value: n, + label: n, + })); + } + return { + value: c.channel, + label: c.channel, + }; + }); + + this.setState({ channelFields, channels }); + } + }, + }); + } + + componentDidMount() { + this.loadChannelInfo(); + } + onQueryTypeChange = (sel: SelectableValue) => { const { onChange, query, onRunQuery } = this.props; onChange({ ...query, queryType: sel.value! }); @@ -45,6 +90,15 @@ export class QueryEditor extends PureComponent { fields = [item.value]; } + // When adding the first field, also add time (if it exists) + if (fields.length === 1 && !query.filter?.fields?.length && query.channel) { + const names = this.state.channelFields[query.channel] ?? []; + const tf = names.find((f) => f.value === 'time' || f.value === 'Time'); + if (tf && tf.value && tf.value !== fields[0]) { + fields = [tf.value, ...fields]; + } + } + onChange({ ...query, filter: { @@ -55,19 +109,37 @@ export class QueryEditor extends PureComponent { onRunQuery(); }; + checkAndUpdateBuffer = (txt: string) => { + const { onChange, query, onRunQuery } = this.props; + let buffer: number | undefined; + if (txt) { + try { + buffer = rangeUtil.intervalToSeconds(txt) * 1000; + } catch (err) { + console.warn('ERROR', err); + } + } + onChange({ + ...query, + buffer, + }); + onRunQuery(); + }; + + handleEnterKey = (e: React.KeyboardEvent) => { + if (e.key !== 'Enter') { + return; + } + this.checkAndUpdateBuffer((e.target as any).value); + }; + + handleBlur = (e: React.FocusEvent) => { + this.checkAndUpdateBuffer(e.target.value); + }; + renderMeasurementsQuery() { - const { data } = this.props; - let { channel, filter } = this.props.query; - const channels: Array> = [ - { - value: 'plugin/testdata/random-2s-stream', - label: 'plugin/testdata/random-2s-stream', - }, - { - value: 'plugin/testdata/random-flakey-stream', - label: 'plugin/testdata/random-flakey-stream', - }, - ]; + let { channel, filter, buffer } = this.props.query; + let { channels, channelFields } = this.state; let currentChannel = channels.find((c) => c.value === channel); if (channel && !currentChannel) { currentChannel = { @@ -75,26 +147,26 @@ export class QueryEditor extends PureComponent { label: channel, description: `Connected to ${channel}`, }; - channels.push(currentChannel); + channels = [currentChannel, ...channels]; } const distinctFields = new Set(); - const fields: Array> = []; - if (data && data.series?.length) { - for (const frame of data.series) { - for (const field of frame.fields) { - if (distinctFields.has(field.name) || !field.name) { - continue; - } - fields.push({ - value: field.name, - label: field.name, - description: `(${getFrameDisplayName(frame)} / ${field.type})`, - }); - distinctFields.add(field.name); - } - } - } + const fields: Array> = channel ? channelFields[channel] ?? [] : []; + // if (data && data.series?.length) { + // for (const frame of data.series) { + // for (const field of frame.fields) { + // if (distinctFields.has(field.name) || !field.name) { + // continue; + // } + // fields.push({ + // value: field.name, + // label: field.name, + // description: `(${getFrameDisplayName(frame)} / ${field.type})`, + // }); + // distinctFields.add(field.name); + // } + // } + // } if (filter?.fields) { for (const f of filter.fields) { if (!distinctFields.has(f)) { @@ -108,6 +180,11 @@ export class QueryEditor extends PureComponent { } } + let formattedTime = ''; + if (buffer) { + formattedTime = rangeUtil.secondsToHms(buffer / 1000); + } + return ( <>
@@ -142,6 +219,16 @@ export class QueryEditor extends PureComponent { isMulti={true} /> + + +
)} diff --git a/public/app/plugins/datasource/grafana/datasource.ts b/public/app/plugins/datasource/grafana/datasource.ts index a9ecd044bc2..4cb75598f8b 100644 --- a/public/app/plugins/datasource/grafana/datasource.ts +++ b/public/app/plugins/datasource/grafana/datasource.ts @@ -6,6 +6,7 @@ import { DataQueryResponse, DataSourceApi, DataSourceInstanceSettings, + isValidLiveChannelAddress, parseLiveChannelAddress, StreamingFrameOptions, } from '@grafana/data'; @@ -23,15 +24,6 @@ export class GrafanaDatasource extends DataSourceApi { } query(request: DataQueryRequest): Observable { - const buffer: StreamingFrameOptions = { - maxLength: request.maxDataPoints ?? 500, - }; - - if (request.rangeRaw?.to === 'now') { - const elapsed = request.range.to.valueOf() - request.range.from.valueOf(); - buffer.maxDelta = elapsed; - } - const queries: Array> = []; for (const target of request.targets) { if (target.hide) { @@ -39,17 +31,28 @@ export class GrafanaDatasource extends DataSourceApi { } if (target.queryType === GrafanaQueryType.LiveMeasurements) { const { channel, filter } = target; - if (channel) { - const addr = parseLiveChannelAddress(channel); - queries.push( - getLiveDataStream({ - key: `${request.requestId}.${counter++}`, - addr: addr!, - filter, - buffer, - }) - ); + const addr = parseLiveChannelAddress(channel); + if (!isValidLiveChannelAddress(addr)) { + continue; } + const buffer: StreamingFrameOptions = { + maxLength: request.maxDataPoints ?? 500, + }; + if (target.buffer) { + buffer.maxDelta = target.buffer; + buffer.maxLength = buffer.maxLength! * 2; //?? + } else if (request.rangeRaw?.to === 'now') { + buffer.maxDelta = request.range.to.valueOf() - request.range.from.valueOf(); + } + + queries.push( + getLiveDataStream({ + key: `${request.requestId}.${counter++}`, + addr: addr!, + filter, + buffer, + }) + ); } else { queries.push(getRandomWalk(request)); } diff --git a/public/app/plugins/datasource/grafana/types.ts b/public/app/plugins/datasource/grafana/types.ts index 0f34f610c71..ccace7bf98c 100644 --- a/public/app/plugins/datasource/grafana/types.ts +++ b/public/app/plugins/datasource/grafana/types.ts @@ -14,6 +14,7 @@ export interface GrafanaQuery extends DataQuery { queryType: GrafanaQueryType; // RandomWalk by default channel?: string; filter?: LiveDataFilter; + buffer?: number; } export const defaultQuery: GrafanaQuery = {