Live: expose HTTP push endpoint that will read influx line protocol and publish to websocket (#32311)

Co-authored-by: Ryan McKinley <ryantxu@gmail.com>
Alexander Emelin 2021-04-05 19:04:46 +03:00 committed by GitHub
parent 7896c6a7b1
commit 54ad791c7e
25 changed files with 576 additions and 130 deletions
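
For orientation, here is a minimal sketch of how a client could exercise the push endpoint added in this commit. The host, the stream ID "telegraf", and the API-key auth header are assumptions; the POST /api/live/push/:streamId route, the influx line protocol body, and the optional ?format=labels_column switch come from the diffs below.

	package main

	import (
		"bytes"
		"fmt"
		"net/http"
	)

	func main() {
		// Influx line protocol, e.g. as produced by a Telegraf HTTP output.
		body := []byte("cpu,host=server01 usage_idle=87.5,usage_user=10.1 1617628800000000000\n")

		// "telegraf" is an arbitrary stream ID; pushed measurements become
		// "stream/telegraf/<measurement>" Live channels.
		// Append "?format=labels_column" to use the labels-column converter.
		req, err := http.NewRequest(http.MethodPost,
			"http://localhost:3000/api/live/push/telegraf", bytes.NewReader(body))
		if err != nil {
			panic(err)
		}
		req.Header.Set("Authorization", "Bearer <api-key>") // assumed: a Grafana API key

		resp, err := http.DefaultClient.Do(req)
		if err != nil {
			panic(err)
		}
		defer resp.Body.Close()
		fmt.Println("status:", resp.Status)
	}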

go.mod

@ -17,7 +17,7 @@ require (
github.com/VividCortex/mysqlerr v0.0.0-20170204212430-6c6b55f8796f
github.com/aws/aws-sdk-go v1.38.12
github.com/beevik/etree v1.1.0
github.com/benbjohnson/clock v0.0.0-20161215174838-7dc76406b6d3
github.com/benbjohnson/clock v1.0.3
github.com/bradfitz/gomemcache v0.0.0-20190913173617-a41fca850d0b
github.com/centrifugal/centrifuge v0.16.0
github.com/cortexproject/cortex v1.4.1-0.20201022071705-85942c5703cf
@ -42,6 +42,7 @@ require (
github.com/gosimple/slug v1.9.0
github.com/grafana/alerting-api v0.0.0-20210331135037-3294563b51bb
github.com/grafana/grafana-aws-sdk v0.4.0
github.com/grafana/grafana-live-sdk v0.0.4
github.com/grafana/grafana-plugin-model v0.0.0-20190930120109-1fc953a61fb4
github.com/grafana/grafana-plugin-sdk-go v0.91.0
github.com/grafana/loki v1.6.2-0.20201026154740-6978ee5d7387

go.sum

@ -211,8 +211,9 @@ github.com/baiyubin/aliyun-sts-go-sdk v0.0.0-20180326062324-cfa1a18b161f/go.mod
github.com/beevik/etree v1.1.0 h1:T0xke/WvNtMoCqgzPhkX2r4rjY3GDZFi+FjpRZY2Jbs=
github.com/beevik/etree v1.1.0/go.mod h1:r8Aw8JqVegEf0w2fDnATrX9VpkMcyFeM0FhwO62wh+A=
github.com/beevik/ntp v0.2.0/go.mod h1:hIHWr+l3+/clUnF44zdK+CWW7fO8dR5cIylAQ76NRpg=
github.com/benbjohnson/clock v0.0.0-20161215174838-7dc76406b6d3 h1:wOysYcIdqv3WnvwqFFzrYCFALPED7qkUGaLXu359GSc=
github.com/benbjohnson/clock v0.0.0-20161215174838-7dc76406b6d3/go.mod h1:UMqtWQTnOe4byzwe7Zhwh8f8s+36uszN51sJrSIZlTE=
github.com/benbjohnson/clock v1.0.3 h1:vkLuvpK4fmtSCuo60+yC63p7y0BmQ8gm5ZXGuBCJyXg=
github.com/benbjohnson/clock v1.0.3/go.mod h1:bGMdMPoPVvcYyt1gHDf4J2KE153Yf9BuiUKYMaxlTDM=
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8=
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
@ -808,6 +809,8 @@ github.com/grafana/grafana v1.9.2-0.20210308201921-4ce0a49eac03/go.mod h1:AHRRvd
github.com/grafana/grafana-aws-sdk v0.1.0/go.mod h1:+pPo5U+pX0zWimR7YBc7ASeSQfbRkcTyQYqMiAj7G5U=
github.com/grafana/grafana-aws-sdk v0.4.0 h1:JmTaXfOJ/ydHSWH9kEt8Yhfb9kAhIW4LUOO3SWCviYg=
github.com/grafana/grafana-aws-sdk v0.4.0/go.mod h1:+pPo5U+pX0zWimR7YBc7ASeSQfbRkcTyQYqMiAj7G5U=
github.com/grafana/grafana-live-sdk v0.0.4 h1:mATki7fEkKtX4jD+HfOKst9CgFcVyF/pr3Co+gy+Ato=
github.com/grafana/grafana-live-sdk v0.0.4/go.mod h1:f15hHmWyLdFjmuWLsjeKeZnq/HnNQ3QkoPcaEww45AY=
github.com/grafana/grafana-plugin-model v0.0.0-20190930120109-1fc953a61fb4 h1:SPdxCL9BChFTlyi0Khv64vdCW4TMna8+sxL7+Chx+Ag=
github.com/grafana/grafana-plugin-model v0.0.0-20190930120109-1fc953a61fb4/go.mod h1:nc0XxBzjeGcrMltCDw269LoWF9S8ibhgxolCdA1R8To=
github.com/grafana/grafana-plugin-sdk-go v0.79.0/go.mod h1:NvxLzGkVhnoBKwzkst6CFfpMFKwAdIUZ1q8ssuLeF60=
@ -949,8 +952,9 @@ github.com/influxdata/influxdb1-client v0.0.0-20191209144304-8bf82d3c094d/go.mod
github.com/influxdata/influxql v1.1.0/go.mod h1:KpVI7okXjK6PRi3Z5B+mtKZli+R1DnZgb3N+tzevNgo=
github.com/influxdata/influxql v1.1.1-0.20200828144457-65d3ef77d385/go.mod h1:gHp9y86a/pxhjJ+zMjNXiQAA197Xk9wLxaz+fGG+kWk=
github.com/influxdata/line-protocol v0.0.0-20180522152040-32c6aa80de5e/go.mod h1:4kt73NQhadE3daL3WhR5EJ/J2ocX0PZzwxQ0gXJ7oFE=
github.com/influxdata/line-protocol v0.0.0-20200327222509-2487e7298839 h1:W9WBk7wlPfJLvMCdtV4zPulc4uCPrlywQOmbFOhgQNU=
github.com/influxdata/line-protocol v0.0.0-20200327222509-2487e7298839/go.mod h1:xaLFMmpvUxqXtVkUJfg9QmT88cDaCJ3ZKgdZ78oO8Qo=
github.com/influxdata/line-protocol v0.0.0-20210311194329-9aa0e372d097 h1:vilfsDSy7TDxedi9gyBkMvAirat/oRcL0lFdJBf6tdM=
github.com/influxdata/line-protocol v0.0.0-20210311194329-9aa0e372d097/go.mod h1:xaLFMmpvUxqXtVkUJfg9QmT88cDaCJ3ZKgdZ78oO8Qo=
github.com/influxdata/promql/v2 v2.12.0/go.mod h1:fxOPu+DY0bqCTCECchSRtWfc+0X19ybifQhZoQNF5D8=
github.com/influxdata/roaring v0.4.13-0.20180809181101-fc520f41fab6/go.mod h1:bSgUQ7q5ZLSO+bKBGqJiCBGAl+9DxyW63zLTujjUlOE=
github.com/influxdata/tdigest v0.0.0-20181121200506-bf2b5ad3c0a9/go.mod h1:Js0mqiSBE6Ffsg94weZZ2c+v/ciT8QRHFOap7EKDrR0=


@ -8,12 +8,6 @@ import { guessFieldTypeFromNameAndValue } from './processDataFrame';
* @alpha
*/
export interface DataFrameJSON {
/**HACK: this will get removed, but will help transition telegraf streaming
*
* In telegraf, this will be: ${name}${labels}
*/
key?: string;
/**
* The schema defines the field type and configuration.
*/


@ -13,6 +13,7 @@ export enum LiveChannelScope {
DataSource = 'ds', // namespace = data source ID
Plugin = 'plugin', // namespace = plugin name (singleton works for apps too)
Grafana = 'grafana', // namespace = feature
Stream = 'stream', // namespace = id for the managed data stream
}
/**
@ -155,7 +156,7 @@ export interface LiveChannelAddress {
*
* @alpha -- experimental
*/
export function parseLiveChannelAddress(id: string): LiveChannelAddress | undefined {
export function parseLiveChannelAddress(id?: string): LiveChannelAddress | undefined {
if (id?.length) {
let parts = id.trim().split('/');
if (parts.length >= 3) {


@ -48,6 +48,7 @@ export function getLiveDataStream(options: LiveDataStreamOptions): Observable<Da
return new Observable<DataQueryResponse>((subscriber) => {
let data: StreamingDataFrame | undefined = undefined;
let filtered: DataFrame | undefined = undefined;
let state = LoadingState.Loading;
const { key, filter } = options;
let last = perf.last;
@ -60,17 +61,20 @@ export function getLiveDataStream(options: LiveDataStreamOptions): Observable<Da
}
state = LoadingState.Streaming;
// TODO? this *could* happen only when the schema changes
let filtered = data as DataFrame;
if (filter?.fields && filter.fields.length) {
filtered = {
...data,
fields: data.fields.filter((f) => filter.fields!.includes(f.name)),
};
// Select the fields we are actually looking at
if (!filtered || msg.schema) {
filtered = data;
if (filter?.fields?.length) {
filtered = {
...data,
fields: data.fields.filter((f) => filter.fields!.includes(f.name)),
};
}
}
const elapsed = perf.last - last;
if (elapsed > 1000 || perf.ok) {
filtered.length = data.length; // make sure they stay up-to-date
subscriber.next({ state, data: [filtered], key });
last = perf.last;
}


@ -90,6 +90,10 @@ export function preparePlotConfigBuilder(
// X is the first field in the aligned frame
const xField = frame.fields[0];
if (!xField) {
return builder; // empty frame with no options
}
let seriesIndex = 0;
if (xField.type === FieldType.time) {


@ -68,7 +68,6 @@ const getInfoBoxStyles = stylesFactory((theme: GrafanaTheme, severity: AlertVari
color: ${theme.colors.textSemiWeak};
code {
@include font-family-monospace();
font-size: ${theme.typography.size.sm};
background-color: ${theme.colors.bg1};
color: ${theme.colors.text};


@ -402,6 +402,12 @@ func (hs *HTTPServer) registerRoutes() {
if hs.Live.IsEnabled() {
apiRoute.Post("/live/publish", bind(dtos.LivePublishCmd{}), routing.Wrap(hs.Live.HandleHTTPPublish))
// POST influx line protocol
apiRoute.Post("/live/push/:streamId", hs.LivePushGateway.Handle)
// List available streams and fields
apiRoute.Get("/live/list", routing.Wrap(hs.Live.HandleListHTTP))
}
// short urls


@ -35,6 +35,7 @@ import (
"github.com/grafana/grafana/pkg/services/hooks"
"github.com/grafana/grafana/pkg/services/librarypanels"
"github.com/grafana/grafana/pkg/services/live"
"github.com/grafana/grafana/pkg/services/live/push"
"github.com/grafana/grafana/pkg/services/login"
"github.com/grafana/grafana/pkg/services/provisioning"
"github.com/grafana/grafana/pkg/services/quota"
@ -87,6 +88,7 @@ type HTTPServer struct {
SearchService *search.SearchService `inject:""`
ShortURLService *shorturls.ShortURLService `inject:""`
Live *live.GrafanaLive `inject:""`
LivePushGateway *push.Gateway `inject:""`
ContextHandler *contexthandler.ContextHandler `inject:""`
SQLStore *sqlstore.SQLStore `inject:""`
LibraryPanelService *librarypanels.LibraryPanelService `inject:""`


@ -7,13 +7,14 @@ import (
// ChannelAddress is the channel ID split by parts.
type ChannelAddress struct {
// Scope is one of available channel scopes:
// like ScopeGrafana, ScopePlugin, ScopeDatasource.
// like ScopeGrafana, ScopePlugin, ScopeDatasource, ScopeStream.
Scope string `json:"scope,omitempty"`
// Namespace meaning depends on the scope.
// * when ScopeGrafana, namespace is a "feature"
// * when ScopePlugin, namespace is the plugin name
// * when ScopeDatasource, namespace is the datasource uid
// * when ScopeStream, namespace is the stream ID.
Namespace string `json:"namespace,omitempty"`
// Within each namespace, the handler can process the path as needed.


@ -4,11 +4,17 @@ import (
"context"
"time"
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana/pkg/models"
)
var (
logger = log.New("live.features") // scoped to all features?
)
// BroadcastRunner will simply broadcast all events to `grafana/broadcast/*` channels
// This assumes that data is a JSON object
type BroadcastRunner struct{}


@ -1,36 +0,0 @@
package features
import (
"context"
"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/models"
)
var (
logger = log.New("live.features") // scoped to all features?
)
// MeasurementsRunner will simply broadcast all events to `grafana/broadcast/*` channels.
// This makes no assumptions about the shape of the data and will broadcast it to anyone listening
type MeasurementsRunner struct {
}
// GetHandlerForPath gets the handler for a path.
// It's called on init.
func (m *MeasurementsRunner) GetHandlerForPath(path string) (models.ChannelHandler, error) {
return m, nil // for now all channels share config
}
// OnSubscribe will let anyone connect to the path
func (m *MeasurementsRunner) OnSubscribe(ctx context.Context, _ *models.SignedInUser, e models.SubscribeEvent) (models.SubscribeReply, backend.SubscribeStreamStatus, error) {
return models.SubscribeReply{}, backend.SubscribeStreamStatusOK, nil
}
// OnPublish is called when a client wants to broadcast on the websocket
// Currently this sends measurements over websocket -- should be replaced with the HTTP interface
func (m *MeasurementsRunner) OnPublish(ctx context.Context, _ *models.SignedInUser, e models.PublishEvent) (models.PublishReply, backend.PublishStreamStatus, error) {
return models.PublishReply{}, backend.PublishStreamStatusOK, nil
}


@ -9,6 +9,7 @@ import (
"github.com/centrifugal/centrifuge"
"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana-plugin-sdk-go/data"
"github.com/grafana/grafana/pkg/api/dtos"
"github.com/grafana/grafana/pkg/api/response"
@ -22,6 +23,7 @@ import (
"github.com/grafana/grafana/pkg/services/live/features"
"github.com/grafana/grafana/pkg/setting"
"github.com/grafana/grafana/pkg/tsdb/cloudwatch"
"github.com/grafana/grafana/pkg/util"
)
var (
@ -68,6 +70,8 @@ type GrafanaLive struct {
// The core internal features
GrafanaScope CoreGrafanaScope
ManagedStreamRunner *ManagedStreamRunner
contextGetter *pluginContextGetter
streamManager *features.StreamManager
}
@ -131,7 +135,8 @@ func (g *GrafanaLive) Init() error {
g.GrafanaScope.Dashboards = dash
g.GrafanaScope.Features["dashboard"] = dash
g.GrafanaScope.Features["broadcast"] = &features.BroadcastRunner{}
g.GrafanaScope.Features["measurements"] = &features.MeasurementsRunner{}
g.ManagedStreamRunner = NewManagedStreamRunner(g.Publish)
// Set ConnectHandler called when client successfully connected to Node. Your code
// inside handler must be synchronized since it will be called concurrently from
@ -349,6 +354,8 @@ func (g *GrafanaLive) GetChannelHandlerFactory(user *models.SignedInUser, scope
return g.handlePluginScope(user, namespace)
case ScopeDatasource:
return g.handleDatasourceScope(user, namespace)
case ScopeStream:
return g.handleStreamScope(user, namespace)
default:
return nil, fmt.Errorf("invalid scope: %q", scope)
}
@ -382,6 +389,10 @@ func (g *GrafanaLive) handlePluginScope(_ *models.SignedInUser, namespace string
), nil
}
func (g *GrafanaLive) handleStreamScope(_ *models.SignedInUser, namespace string) (models.ChannelHandlerFactory, error) {
return g.ManagedStreamRunner.GetOrCreateStream(namespace)
}
func (g *GrafanaLive) handleDatasourceScope(user *models.SignedInUser, namespace string) (models.ChannelHandlerFactory, error) {
ds, err := g.DatasourceCache.GetDatasourceByUID(namespace, user, false)
if err != nil {
@ -445,26 +456,32 @@ func (g *GrafanaLive) HandleHTTPPublish(ctx *models.ReqContext, cmd dtos.LivePub
return response.JSON(http.StatusOK, dtos.LivePublishResponse{})
}
// Write to the standard log15 logger
func handleLog(msg centrifuge.LogEntry) {
arr := make([]interface{}, 0)
for k, v := range msg.Fields {
if v == nil {
v = "<nil>"
} else if v == "" {
v = "<empty>"
}
arr = append(arr, k, v)
// HandleListHTTP returns metadata so the UI can build a nice form
func (g *GrafanaLive) HandleListHTTP(_ *models.ReqContext) response.Response {
info := util.DynMap{}
channels := make([]util.DynMap, 0)
for k, v := range g.ManagedStreamRunner.Streams() {
channels = append(channels, v.ListChannels("stream/"+k+"/")...)
}
switch msg.Level {
case centrifuge.LogLevelDebug:
loggerCF.Debug(msg.Message, arr...)
case centrifuge.LogLevelError:
loggerCF.Error(msg.Message, arr...)
case centrifuge.LogLevelInfo:
loggerCF.Info(msg.Message, arr...)
default:
loggerCF.Debug(msg.Message, arr...)
}
// Hardcode sample streams
frame := data.NewFrame("testdata",
data.NewField("Time", nil, make([]time.Time, 0)),
data.NewField("Value", nil, make([]float64, 0)),
data.NewField("Min", nil, make([]float64, 0)),
data.NewField("Max", nil, make([]float64, 0)),
)
channels = append(channels, util.DynMap{
"channel": "plugin/testdata/random-2s-stream",
"data": frame,
}, util.DynMap{
"channel": "plugin/testdata/random-flakey-stream",
"data": frame,
}, util.DynMap{
"channel": "plugin/testdata/random-20Hz-stream",
"data": frame,
})
info["channels"] = channels
return response.JSONStreaming(200, info)
}
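
A minimal sketch of consuming the new listing endpoint from Go. The URL and auth header are assumptions; the field names mirror the util.DynMap keys built in HandleListHTTP above ("channels", "channel", "data"), with "data" carrying the last frame as DataFrameJSON.

	package main

	import (
		"encoding/json"
		"fmt"
		"net/http"
	)

	// liveListResponse mirrors the util.DynMap payload assembled by HandleListHTTP.
	type liveListResponse struct {
		Channels []struct {
			Channel string          `json:"channel"`
			Data    json.RawMessage `json:"data"` // DataFrameJSON: schema + data
		} `json:"channels"`
	}

	func main() {
		req, err := http.NewRequest(http.MethodGet, "http://localhost:3000/api/live/list", nil)
		if err != nil {
			panic(err)
		}
		req.Header.Set("Authorization", "Bearer <api-key>") // assumed: a Grafana API key

		resp, err := http.DefaultClient.Do(req)
		if err != nil {
			panic(err)
		}
		defer resp.Body.Close()

		var list liveListResponse
		if err := json.NewDecoder(resp.Body).Decode(&list); err != nil {
			panic(err)
		}
		for _, ch := range list.Channels {
			fmt.Printf("%s (%d bytes of frame JSON)\n", ch.Channel, len(ch.Data))
		}
	}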

pkg/services/live/log.go (new file)

@ -0,0 +1,27 @@
package live
import "github.com/centrifugal/centrifuge"
// Write to the standard log15 logger
func handleLog(msg centrifuge.LogEntry) {
arr := make([]interface{}, 0)
for k, v := range msg.Fields {
if v == nil {
v = "<nil>"
} else if v == "" {
v = "<empty>"
}
arr = append(arr, k, v)
}
switch msg.Level {
case centrifuge.LogLevelDebug:
loggerCF.Debug(msg.Message, arr...)
case centrifuge.LogLevelError:
loggerCF.Error(msg.Message, arr...)
case centrifuge.LogLevelInfo:
loggerCF.Info(msg.Message, arr...)
default:
loggerCF.Debug(msg.Message, arr...)
}
}


@ -0,0 +1,142 @@
package live
import (
"context"
"encoding/json"
"fmt"
"sync"
"time"
"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana-plugin-sdk-go/data"
"github.com/grafana/grafana/pkg/models"
"github.com/grafana/grafana/pkg/util"
)
type ManagedStreamRunner struct {
mu sync.RWMutex
streams map[string]*ManagedStream
publisher models.ChannelPublisher
}
// NewManagedStreamRunner creates a new ManagedStreamRunner.
func NewManagedStreamRunner(publisher models.ChannelPublisher) *ManagedStreamRunner {
return &ManagedStreamRunner{
publisher: publisher,
streams: map[string]*ManagedStream{},
}
}
// Streams returns map of active managed streams.
func (r *ManagedStreamRunner) Streams() map[string]*ManagedStream {
r.mu.RLock()
defer r.mu.RUnlock()
streams := make(map[string]*ManagedStream, len(r.streams))
for k, v := range r.streams {
streams[k] = v
}
return streams
}
// GetOrCreateStream -- for now this will create a new manager for each key.
// Eventually, the stream behavior will need to be configured explicitly
func (r *ManagedStreamRunner) GetOrCreateStream(streamID string) (*ManagedStream, error) {
r.mu.Lock()
defer r.mu.Unlock()
s, ok := r.streams[streamID]
if !ok {
s = NewManagedStream(streamID, r.publisher)
r.streams[streamID] = s
}
return s, nil
}
// ManagedStream holds the state of a managed stream
type ManagedStream struct {
mu sync.RWMutex
id string
start time.Time
last map[string]json.RawMessage
publisher models.ChannelPublisher
}
// NewManagedStream creates a new ManagedStream.
func NewManagedStream(id string, publisher models.ChannelPublisher) *ManagedStream {
return &ManagedStream{
id: id,
start: time.Now(),
last: map[string]json.RawMessage{},
publisher: publisher,
}
}
// ListChannels returns info for the UI about this stream.
func (s *ManagedStream) ListChannels(prefix string) []util.DynMap {
s.mu.RLock()
defer s.mu.RUnlock()
info := make([]util.DynMap, 0, len(s.last))
for k, v := range s.last {
ch := util.DynMap{}
ch["channel"] = prefix + k
ch["data"] = v
info = append(info, ch)
}
return info
}
// Push sends data to the stream and optionally processes it.
func (s *ManagedStream) Push(path string, frame *data.Frame) error {
// Keep schema + data for last packet.
frameJSON, err := data.FrameToJSON(frame, true, true)
if err != nil {
logger.Error("Error marshaling Frame to Schema", "error", err)
return err
}
// Locks until we totally finish?
s.mu.Lock()
defer s.mu.Unlock()
_, exists := s.last[path]
s.last[path] = frameJSON
// When the packet already exists, only send the data.
if exists {
frameJSON, err = data.FrameToJSON(frame, false, true)
if err != nil {
logger.Error("Error marshaling Frame to JSON", "error", err)
return err
}
}
// The channel this will be posted into.
channel := fmt.Sprintf("stream/%s/%s", s.id, path)
logger.Debug("Publish data to channel", "channel", channel, "dataLength", len(frameJSON))
return s.publisher(channel, frameJSON)
}
// getLastPacket retrieves the last packet for a path.
func (s *ManagedStream) getLastPacket(path string) (json.RawMessage, bool) {
s.mu.RLock()
defer s.mu.RUnlock()
schema, ok := s.last[path]
return schema, ok
}
func (s *ManagedStream) GetHandlerForPath(_ string) (models.ChannelHandler, error) {
return s, nil
}
func (s *ManagedStream) OnSubscribe(_ context.Context, _ *models.SignedInUser, e models.SubscribeEvent) (models.SubscribeReply, backend.SubscribeStreamStatus, error) {
reply := models.SubscribeReply{}
packet, ok := s.getLastPacket(e.Path)
if ok {
reply.Data = packet
}
return reply, backend.SubscribeStreamStatusOK, nil
}
func (s *ManagedStream) OnPublish(_ context.Context, _ *models.SignedInUser, _ models.PublishEvent) (models.PublishReply, backend.PublishStreamStatus, error) {
return models.PublishReply{}, backend.PublishStreamStatusPermissionDenied, nil
}


@ -0,0 +1,30 @@
package live
import (
"testing"
"github.com/grafana/grafana-plugin-sdk-go/data"
"github.com/stretchr/testify/require"
)
var noopPublisher = func(p string, b []byte) error {
return nil
}
func TestNewManagedStream(t *testing.T) {
c := NewManagedStream("a", noopPublisher)
require.NotNil(t, c)
}
func TestManagedStream_GetLastPacket(t *testing.T) {
c := NewManagedStream("a", noopPublisher)
_, ok := c.getLastPacket("test")
require.False(t, ok)
err := c.Push("test", data.NewFrame("hello"))
require.NoError(t, err)
s, ok := c.getLastPacket("test")
require.NoError(t, err)
require.True(t, ok)
require.Equal(t, `{"schema":{"name":"hello","fields":[]},"data":{"values":[]}}`, string(s))
}


@ -1,6 +1,8 @@
package live
import (
"fmt"
"github.com/grafana/grafana/pkg/models"
"github.com/centrifugal/centrifuge"
@ -18,7 +20,10 @@ func newPluginPacketSender(node *centrifuge.Node) *pluginPacketSender {
func (p *pluginPacketSender) Send(channel string, packet *backend.StreamPacket) error {
_, err := p.node.Publish(channel, packet.Data)
return err
if err != nil {
return fmt.Errorf("error publishing %s: %w", string(packet.Data), err)
}
return nil
}
type pluginPresenceGetter struct {


@ -0,0 +1,104 @@
package push
import (
"context"
"net/http"
"github.com/grafana/grafana-live-sdk/telemetry/telegraf"
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/models"
"github.com/grafana/grafana/pkg/registry"
"github.com/grafana/grafana/pkg/services/live"
"github.com/grafana/grafana/pkg/setting"
)
var (
logger = log.New("live_push")
)
func init() {
registry.RegisterServiceWithPriority(&Gateway{}, registry.Low)
}
// Gateway receives data and translates it to Grafana Live publications.
type Gateway struct {
Cfg *setting.Cfg `inject:""`
GrafanaLive *live.GrafanaLive `inject:""`
telegrafConverterWide *telegraf.Converter
telegrafConverterLabelsColumn *telegraf.Converter
}
// Init Gateway.
func (g *Gateway) Init() error {
logger.Info("Telemetry Gateway initialization")
if !g.IsEnabled() {
logger.Debug("Telemetry Gateway not enabled, skipping initialization")
return nil
}
// For now only Telegraf converter (influx format) is supported.
g.telegrafConverterWide = telegraf.NewConverter()
g.telegrafConverterLabelsColumn = telegraf.NewConverter(telegraf.WithUseLabelsColumn(true))
return nil
}
// Run Gateway.
func (g *Gateway) Run(ctx context.Context) error {
if !g.IsEnabled() {
logger.Debug("GrafanaLive feature not enabled, skipping initialization of Telemetry Gateway")
return nil
}
<-ctx.Done()
return ctx.Err()
}
// IsEnabled returns true if the Grafana Live feature is enabled.
func (g *Gateway) IsEnabled() bool {
return g.Cfg.IsLiveEnabled() // turn on when Live on for now.
}
func (g *Gateway) Handle(ctx *models.ReqContext) {
streamID := ctx.Params(":streamId")
stream, err := g.GrafanaLive.ManagedStreamRunner.GetOrCreateStream(streamID)
if err != nil {
logger.Error("Error getting stream", "error", err)
ctx.Resp.WriteHeader(http.StatusInternalServerError)
return
}
// TODO Grafana 8: decide which format to use or keep both.
converter := g.telegrafConverterWide
if ctx.Req.URL.Query().Get("format") == "labels_column" {
converter = g.telegrafConverterLabelsColumn
}
body, err := ctx.Req.Body().Bytes()
if err != nil {
logger.Error("Error reading body", "error", err)
ctx.Resp.WriteHeader(http.StatusInternalServerError)
return
}
logger.Debug("Live Push request body", "streamId", streamID, "bodyLength", len(body))
metricFrames, err := converter.Convert(body)
if err != nil {
logger.Error("Error converting metrics", "error", err)
ctx.Resp.WriteHeader(http.StatusInternalServerError)
return
}
// TODO -- make sure all packets are combined together!
// interval = "1s" vs flush_interval = "5s"
for _, mf := range metricFrames {
err := stream.Push(mf.Key(), mf.Frame())
if err != nil {
ctx.Resp.WriteHeader(http.StatusInternalServerError)
return
}
}
}


@ -7,4 +7,6 @@ const (
ScopePlugin = "plugin"
// ScopeDatasource passes control to a datasource plugin.
ScopeDatasource = "ds"
// ScopeStream is a managed data frame stream
ScopeStream = "stream"
)
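
To tie the new scope constant to the rest of the change: channels published by the push gateway use IDs of the form stream/<streamId>/<measurement>, which the address parsing on both backend and frontend splits into scope, namespace, and path. A small illustrative sketch (the stream ID and measurement name are placeholders):

	package main

	import (
		"fmt"
		"strings"
	)

	func main() {
		// A channel published by the push gateway, e.g. "stream/telegraf/cpu",
		// splits into the three parts of a channel address:
		parts := strings.SplitN("stream/telegraf/cpu", "/", 3)
		fmt.Println("scope:    ", parts[0]) // ScopeStream ("stream")
		fmt.Println("namespace:", parts[1]) // the :streamId from the push URL
		fmt.Println("path:     ", parts[2]) // one measurement within the managed stream
	}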


@ -17,7 +17,7 @@ import Centrifuge, {
UnsubscribeContext,
} from 'centrifuge/dist/centrifuge';
import { Subject, of, merge } from 'rxjs';
import { Subject, of, Observable } from 'rxjs';
/**
* Internal class that maps Centrifuge support to GrafanaLive
@ -127,7 +127,20 @@ export class CentrifugeLiveChannel<TMessage = any, TPublish = any> implements Li
* Get the stream of events and
*/
getStream() {
return merge(of({ ...this.currentStatus }), this.stream.asObservable());
return new Observable((subscriber) => {
subscriber.next({ ...this.currentStatus });
const sub = this.stream.subscribe(subscriber);
return () => {
sub.unsubscribe();
const count = this.stream.observers.length;
console.log('unsubscribe stream', this.addr, count);
// Fully disconnect when no more listeners
if (count === 0) {
this.disconnect();
}
};
}) as Observable<LiveChannelEvent<TMessage>>;
}
/**


@ -8,6 +8,7 @@ import {
grafanaLiveCoreFeatures,
GrafanaLiveDataSourceScope,
GrafanaLivePluginScope,
GrafanaLiveStreamScope,
} from './scopes';
import { registerLiveFeatures } from './features';
@ -52,6 +53,7 @@ export class CentrifugeSrv implements GrafanaLiveSrv {
[LiveChannelScope.Grafana]: grafanaLiveCoreFeatures,
[LiveChannelScope.DataSource]: new GrafanaLiveDataSourceScope(),
[LiveChannelScope.Plugin]: new GrafanaLivePluginScope(),
[LiveChannelScope.Stream]: new GrafanaLiveStreamScope(),
};
// Register global listeners


@ -2,6 +2,7 @@ import { LiveChannelScope, LiveChannelSupport, SelectableValue } from '@grafana/
import { getDataSourceSrv } from '@grafana/runtime';
import { config } from 'app/core/config';
import { loadPlugin } from '../plugins/PluginPage';
import { LiveMeasurementsSupport } from './measurements/measurementsSupport';
export abstract class GrafanaLiveScope {
constructor(protected scope: LiveChannelScope) {}
@ -152,3 +153,29 @@ export class GrafanaLivePluginScope extends GrafanaLiveScope {
return (this.names = names);
}
}
export class GrafanaLiveStreamScope extends GrafanaLiveScope {
names?: Array<SelectableValue<string>>;
constructor() {
super(LiveChannelScope.Stream);
}
async getChannelSupport(namespace: string) {
return new LiveMeasurementsSupport();
}
/**
* List the possible values within this scope
*/
async listNamespaces() {
if (this.names) {
return Promise.resolve(this.names);
}
const names: Array<SelectableValue<string>> = [];
// TODO!!!
return (this.names = names);
}
}


@ -1,16 +1,24 @@
import defaults from 'lodash/defaults';
import React, { PureComponent } from 'react';
import { InlineField, Select, FeatureInfoBox } from '@grafana/ui';
import { QueryEditorProps, SelectableValue, FeatureState, getFrameDisplayName } from '@grafana/data';
import { InlineField, Select, FeatureInfoBox, Input } from '@grafana/ui';
import { QueryEditorProps, SelectableValue, FeatureState, dataFrameFromJSON, rangeUtil } from '@grafana/data';
import { GrafanaDatasource } from '../datasource';
import { defaultQuery, GrafanaQuery, GrafanaQueryType } from '../types';
import { getBackendSrv } from '@grafana/runtime';
type Props = QueryEditorProps<GrafanaDatasource, GrafanaQuery>;
const labelWidth = 12;
export class QueryEditor extends PureComponent<Props> {
interface State {
channels: Array<SelectableValue<string>>;
channelFields: Record<string, Array<SelectableValue<string>>>;
}
export class QueryEditor extends PureComponent<Props, State> {
state: State = { channels: [], channelFields: {} };
queryTypes: Array<SelectableValue<GrafanaQueryType>> = [
{
label: 'Random Walk',
@ -24,6 +32,43 @@ export class QueryEditor extends PureComponent<Props> {
},
];
loadChannelInfo() {
getBackendSrv()
.fetch({ url: 'api/live/list' })
.subscribe({
next: (v: any) => {
console.log('GOT', v);
const channelInfo = v.data?.channels as any[];
if (channelInfo?.length) {
const channelFields: Record<string, Array<SelectableValue<string>>> = {};
const channels: Array<SelectableValue<string>> = channelInfo.map((c) => {
if (c.data) {
const distinctFields = new Set<string>();
const frame = dataFrameFromJSON(c.data);
for (const f of frame.fields) {
distinctFields.add(f.name);
}
channelFields[c.channel] = Array.from(distinctFields).map((n) => ({
value: n,
label: n,
}));
}
return {
value: c.channel,
label: c.channel,
};
});
this.setState({ channelFields, channels });
}
},
});
}
componentDidMount() {
this.loadChannelInfo();
}
onQueryTypeChange = (sel: SelectableValue<GrafanaQueryType>) => {
const { onChange, query, onRunQuery } = this.props;
onChange({ ...query, queryType: sel.value! });
@ -45,6 +90,15 @@ export class QueryEditor extends PureComponent<Props> {
fields = [item.value];
}
// When adding the first field, also add time (if it exists)
if (fields.length === 1 && !query.filter?.fields?.length && query.channel) {
const names = this.state.channelFields[query.channel] ?? [];
const tf = names.find((f) => f.value === 'time' || f.value === 'Time');
if (tf && tf.value && tf.value !== fields[0]) {
fields = [tf.value, ...fields];
}
}
onChange({
...query,
filter: {
@ -55,19 +109,37 @@ export class QueryEditor extends PureComponent<Props> {
onRunQuery();
};
checkAndUpdateBuffer = (txt: string) => {
const { onChange, query, onRunQuery } = this.props;
let buffer: number | undefined;
if (txt) {
try {
buffer = rangeUtil.intervalToSeconds(txt) * 1000;
} catch (err) {
console.warn('ERROR', err);
}
}
onChange({
...query,
buffer,
});
onRunQuery();
};
handleEnterKey = (e: React.KeyboardEvent<HTMLInputElement>) => {
if (e.key !== 'Enter') {
return;
}
this.checkAndUpdateBuffer((e.target as any).value);
};
handleBlur = (e: React.FocusEvent<HTMLInputElement>) => {
this.checkAndUpdateBuffer(e.target.value);
};
renderMeasurementsQuery() {
const { data } = this.props;
let { channel, filter } = this.props.query;
const channels: Array<SelectableValue<string>> = [
{
value: 'plugin/testdata/random-2s-stream',
label: 'plugin/testdata/random-2s-stream',
},
{
value: 'plugin/testdata/random-flakey-stream',
label: 'plugin/testdata/random-flakey-stream',
},
];
let { channel, filter, buffer } = this.props.query;
let { channels, channelFields } = this.state;
let currentChannel = channels.find((c) => c.value === channel);
if (channel && !currentChannel) {
currentChannel = {
@ -75,26 +147,26 @@ export class QueryEditor extends PureComponent<Props> {
label: channel,
description: `Connected to ${channel}`,
};
channels.push(currentChannel);
channels = [currentChannel, ...channels];
}
const distinctFields = new Set<string>();
const fields: Array<SelectableValue<string>> = [];
if (data && data.series?.length) {
for (const frame of data.series) {
for (const field of frame.fields) {
if (distinctFields.has(field.name) || !field.name) {
continue;
}
fields.push({
value: field.name,
label: field.name,
description: `(${getFrameDisplayName(frame)} / ${field.type})`,
});
distinctFields.add(field.name);
}
}
}
const fields: Array<SelectableValue<string>> = channel ? channelFields[channel] ?? [] : [];
// if (data && data.series?.length) {
// for (const frame of data.series) {
// for (const field of frame.fields) {
// if (distinctFields.has(field.name) || !field.name) {
// continue;
// }
// fields.push({
// value: field.name,
// label: field.name,
// description: `(${getFrameDisplayName(frame)} / ${field.type})`,
// });
// distinctFields.add(field.name);
// }
// }
// }
if (filter?.fields) {
for (const f of filter.fields) {
if (!distinctFields.has(f)) {
@ -108,6 +180,11 @@ export class QueryEditor extends PureComponent<Props> {
}
}
let formattedTime = '';
if (buffer) {
formattedTime = rangeUtil.secondsToHms(buffer / 1000);
}
return (
<>
<div className="gf-form">
@ -142,6 +219,16 @@ export class QueryEditor extends PureComponent<Props> {
isMulti={true}
/>
</InlineField>
<InlineField label="Buffer">
<Input
placeholder="Auto"
width={12}
defaultValue={formattedTime}
onKeyDown={this.handleEnterKey}
onBlur={this.handleBlur}
spellCheck={false}
/>
</InlineField>
</div>
)}


@ -6,6 +6,7 @@ import {
DataQueryResponse,
DataSourceApi,
DataSourceInstanceSettings,
isValidLiveChannelAddress,
parseLiveChannelAddress,
StreamingFrameOptions,
} from '@grafana/data';
@ -23,15 +24,6 @@ export class GrafanaDatasource extends DataSourceApi<GrafanaQuery> {
}
query(request: DataQueryRequest<GrafanaQuery>): Observable<DataQueryResponse> {
const buffer: StreamingFrameOptions = {
maxLength: request.maxDataPoints ?? 500,
};
if (request.rangeRaw?.to === 'now') {
const elapsed = request.range.to.valueOf() - request.range.from.valueOf();
buffer.maxDelta = elapsed;
}
const queries: Array<Observable<DataQueryResponse>> = [];
for (const target of request.targets) {
if (target.hide) {
@ -39,17 +31,28 @@ export class GrafanaDatasource extends DataSourceApi<GrafanaQuery> {
}
if (target.queryType === GrafanaQueryType.LiveMeasurements) {
const { channel, filter } = target;
if (channel) {
const addr = parseLiveChannelAddress(channel);
queries.push(
getLiveDataStream({
key: `${request.requestId}.${counter++}`,
addr: addr!,
filter,
buffer,
})
);
const addr = parseLiveChannelAddress(channel);
if (!isValidLiveChannelAddress(addr)) {
continue;
}
const buffer: StreamingFrameOptions = {
maxLength: request.maxDataPoints ?? 500,
};
if (target.buffer) {
buffer.maxDelta = target.buffer;
buffer.maxLength = buffer.maxLength! * 2; //??
} else if (request.rangeRaw?.to === 'now') {
buffer.maxDelta = request.range.to.valueOf() - request.range.from.valueOf();
}
queries.push(
getLiveDataStream({
key: `${request.requestId}.${counter++}`,
addr: addr!,
filter,
buffer,
})
);
} else {
queries.push(getRandomWalk(request));
}


@ -14,6 +14,7 @@ export interface GrafanaQuery extends DataQuery {
queryType: GrafanaQueryType; // RandomWalk by default
channel?: string;
filter?: LiveDataFilter;
buffer?: number;
}
export const defaultQuery: GrafanaQuery = {