From 93292f6eefd666405af65576c40b88845d5e15f4 Mon Sep 17 00:00:00 2001 From: Alexander Emelin Date: Fri, 2 Apr 2021 19:41:45 +0300 Subject: [PATCH] Live: update Streaming plugin definitions, put frame schema in subscribe result data (#32561) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Torkel Ödegaard Co-authored-by: Ryan McKinley --- go.mod | 2 +- go.sum | 5 +- .../src/dataframe/StreamingDataFrame.ts | 22 +++- pkg/models/live.go | 22 +++- .../backendplugin/coreplugin/core_plugin.go | 11 +- .../backendplugin/grpcplugin/client_v1.go | 6 +- .../backendplugin/grpcplugin/client_v2.go | 37 +++--- .../backendplugin/grpcplugin/grpc_plugin.go | 53 +++++---- .../backendplugin/manager/manager_test.go | 6 +- pkg/services/live/features/broadcast.go | 10 +- pkg/services/live/features/dashboard.go | 12 +- pkg/services/live/features/measurements.go | 10 +- pkg/services/live/features/mock.go | 39 ++++--- pkg/services/live/features/plugin.go | 85 +++++++++----- pkg/services/live/features/stream.go | 108 ++++++++++++------ pkg/services/live/features/stream_test.go | 24 ++-- pkg/services/live/live.go | 92 ++++++++++++--- pkg/services/live/plugin_helpers.go | 10 +- pkg/tsdb/cloudwatch/live.go | 10 +- pkg/tsdb/testdatasource/stream_handler.go | 47 +++++--- public/app/features/live/channel.ts | 2 +- 21 files changed, 408 insertions(+), 205 deletions(-) diff --git a/go.mod b/go.mod index 7645434bcdf..99ee19d8f59 100644 --- a/go.mod +++ b/go.mod @@ -43,7 +43,7 @@ require ( github.com/grafana/alerting-api v0.0.0-20210331135037-3294563b51bb github.com/grafana/grafana-aws-sdk v0.4.0 github.com/grafana/grafana-plugin-model v0.0.0-20190930120109-1fc953a61fb4 - github.com/grafana/grafana-plugin-sdk-go v0.90.0 + github.com/grafana/grafana-plugin-sdk-go v0.91.0 github.com/grafana/loki v1.6.2-0.20201026154740-6978ee5d7387 github.com/grpc-ecosystem/go-grpc-middleware v1.2.2 github.com/hashicorp/go-hclog v0.15.0 diff --git a/go.sum b/go.sum index 69dde95edfe..faa9a73716a 100644 --- a/go.sum +++ b/go.sum @@ -811,8 +811,8 @@ github.com/grafana/grafana-plugin-model v0.0.0-20190930120109-1fc953a61fb4 h1:SP github.com/grafana/grafana-plugin-model v0.0.0-20190930120109-1fc953a61fb4/go.mod h1:nc0XxBzjeGcrMltCDw269LoWF9S8ibhgxolCdA1R8To= github.com/grafana/grafana-plugin-sdk-go v0.79.0/go.mod h1:NvxLzGkVhnoBKwzkst6CFfpMFKwAdIUZ1q8ssuLeF60= github.com/grafana/grafana-plugin-sdk-go v0.88.0/go.mod h1:PTALh0lz+Y7k0+OMczAABTpeocL63aw6FVOBptp5GVo= -github.com/grafana/grafana-plugin-sdk-go v0.90.0 h1:ea+mTQSr/Sk00WPyRn4guFjSJMRezaOEtfA+jVwFljk= -github.com/grafana/grafana-plugin-sdk-go v0.90.0/go.mod h1:Ot3k7nY7P6DXmUsDgKvNB7oG1v7PRyTdmnYVoS554bU= +github.com/grafana/grafana-plugin-sdk-go v0.91.0 h1:kiPS3NqR+KOvHrc32EkX7D40JON3+GYZ6Nm2WOtCElQ= +github.com/grafana/grafana-plugin-sdk-go v0.91.0/go.mod h1:Ot3k7nY7P6DXmUsDgKvNB7oG1v7PRyTdmnYVoS554bU= github.com/grafana/loki v1.6.2-0.20201026154740-6978ee5d7387 h1:iwcM8lkYJ3EhytGLJ2BvRSwutb0QWoI7EWbYv3yJRsY= github.com/grafana/loki v1.6.2-0.20201026154740-6978ee5d7387/go.mod h1:jHA1OHnPsuj3LLgMXmFopsKDt4ARHHUhrmT3JrGf71g= github.com/gregjones/httpcache v0.0.0-20180305231024-9cad4c3443a7/go.mod h1:FecbI9+v66THATjSRHfNgh1IVFe/9kFxbXtjV0ctIMA= @@ -1422,7 +1422,6 @@ github.com/prometheus/prometheus v1.8.2-0.20210217141258-a6be548dbc17 h1:VN3p3Nb github.com/prometheus/prometheus v1.8.2-0.20210217141258-a6be548dbc17/go.mod h1:dv3B1syqmkrkmo665MPCU6L8PbTXIiUeg/OEQULLNxA= github.com/prometheus/statsd_exporter v0.15.0/go.mod h1:Dv8HnkoLQkeEjkIE4/2ndAA7WL1zHKK7WMqFQqu72rw= github.com/prometheus/tsdb v0.7.1/go.mod h1:qhTCs0VvXwvX/y3TZrWD7rabWM+ijKTux40TwIPHuXU= -github.com/quasilyte/go-ruleguard/dsl/fluent v0.0.0-20201222093424-5d7e62a465d3 h1:eL7x4/zMnlquMxYe7V078BD7MGskZ0daGln+SJCVzuY= github.com/quasilyte/go-ruleguard/dsl/fluent v0.0.0-20201222093424-5d7e62a465d3/go.mod h1:P7JlQWFT7jDcFZMtUPQbtGzzzxva3rBn6oIF+LPwFcM= github.com/rafaeljusto/redigomock v0.0.0-20190202135759-257e089e14a1/go.mod h1:JaY6n2sDr+z2WTsXkOmNRUfDy6FN0L6Nk7x06ndm4tY= github.com/rainycape/unidecode v0.0.0-20150907023854-cb7f23ec59be h1:ta7tUOvsPHVHGom5hKW5VXNc2xZIkfCKP8iaqOyYtUQ= diff --git a/packages/grafana-data/src/dataframe/StreamingDataFrame.ts b/packages/grafana-data/src/dataframe/StreamingDataFrame.ts index 5bd510334c8..8ff38302c86 100644 --- a/packages/grafana-data/src/dataframe/StreamingDataFrame.ts +++ b/packages/grafana-data/src/dataframe/StreamingDataFrame.ts @@ -2,6 +2,7 @@ import { Field, DataFrame, FieldType } from '../types/dataFrame'; import { QueryResultMeta } from '../types'; import { ArrayVector } from '../vector'; import { DataFrameJSON, decodeFieldValueEntities } from './DataFrameJSON'; +import { guessFieldTypeFromValue } from './processDataFrame'; // binary search for index of closest value function closestIdx(num: number, arr: number[], lo?: number, hi?: number) { @@ -141,7 +142,26 @@ export class StreamingDataFrame implements DataFrame { if (data && data.values.length && data.values[0].length) { const { values, entities } = data; if (values.length !== this.fields.length) { - throw new Error('push message mismatch'); + if (this.fields.length) { + throw new Error(`push message mismatch. Expected: ${this.fields.length}, recieved: ${values.length}`); + } + + this.fields = values.map((vals, idx) => { + let name = `Field ${idx}`; + let type = guessFieldTypeFromValue(vals[0]); + const isTime = idx === 0 && type === FieldType.number && vals[0] > 1600016688632; + if (isTime) { + type = FieldType.time; + name = 'Time'; + } + + return { + name, + type, + config: {}, + values: new ArrayVector([]), + }; + }); } if (entities) { diff --git a/pkg/models/live.go b/pkg/models/live.go index fc4012575bb..548a3a1c915 100644 --- a/pkg/models/live.go +++ b/pkg/models/live.go @@ -4,40 +4,54 @@ import ( "context" "encoding/json" "time" + + "github.com/grafana/grafana-plugin-sdk-go/backend" ) -// ChannelPublisher writes data into a channel. Note that pemissions are not checked. +// ChannelPublisher writes data into a channel. Note that permissions are not checked. type ChannelPublisher func(channel string, data []byte) error +// SubscribeEvent contains subscription data. type SubscribeEvent struct { Channel string Path string } +// SubscribeReply is a reaction to SubscribeEvent. type SubscribeReply struct { Presence bool JoinLeave bool Recover bool + Data json.RawMessage } +// PublishEvent contains publication data. type PublishEvent struct { Channel string Path string Data json.RawMessage } +// PublishReply is a reaction to PublishEvent. type PublishReply struct { + // By default, it's a handler responsibility to publish data + // into a stream upon OnPublish but setting Fallthrough to true + // will make Grafana Live publish data itself (i.e. stream handler + // just works as permission proxy in this case). + Data json.RawMessage + // HistorySize sets a stream history size. HistorySize int - HistoryTTL time.Duration + // HistoryTTL is a time that messages will live in stream history. + HistoryTTL time.Duration } // ChannelHandler defines the core channel behavior type ChannelHandler interface { // OnSubscribe is called when a client wants to subscribe to a channel - OnSubscribe(ctx context.Context, user *SignedInUser, e SubscribeEvent) (SubscribeReply, bool, error) + OnSubscribe(ctx context.Context, user *SignedInUser, e SubscribeEvent) (SubscribeReply, backend.SubscribeStreamStatus, error) // OnPublish is called when a client writes a message to the channel websocket. - OnPublish(ctx context.Context, user *SignedInUser, e PublishEvent) (PublishReply, bool, error) + OnPublish(ctx context.Context, user *SignedInUser, e PublishEvent) (PublishReply, backend.PublishStreamStatus, error) } // ChannelHandlerFactory should be implemented by all core features. diff --git a/pkg/plugins/backendplugin/coreplugin/core_plugin.go b/pkg/plugins/backendplugin/coreplugin/core_plugin.go index aab5ab9952c..bbb38b06f8b 100644 --- a/pkg/plugins/backendplugin/coreplugin/core_plugin.go +++ b/pkg/plugins/backendplugin/coreplugin/core_plugin.go @@ -87,9 +87,16 @@ func (cp *corePlugin) CallResource(ctx context.Context, req *backend.CallResourc return backendplugin.ErrMethodNotImplemented } -func (cp *corePlugin) CanSubscribeToStream(ctx context.Context, req *backend.SubscribeToStreamRequest) (*backend.SubscribeToStreamResponse, error) { +func (cp *corePlugin) SubscribeStream(ctx context.Context, req *backend.SubscribeStreamRequest) (*backend.SubscribeStreamResponse, error) { if cp.StreamHandler != nil { - return cp.StreamHandler.CanSubscribeToStream(ctx, req) + return cp.StreamHandler.SubscribeStream(ctx, req) + } + return nil, backendplugin.ErrMethodNotImplemented +} + +func (cp *corePlugin) PublishStream(ctx context.Context, req *backend.PublishStreamRequest) (*backend.PublishStreamResponse, error) { + if cp.StreamHandler != nil { + return cp.StreamHandler.PublishStream(ctx, req) } return nil, backendplugin.ErrMethodNotImplemented } diff --git a/pkg/plugins/backendplugin/grpcplugin/client_v1.go b/pkg/plugins/backendplugin/grpcplugin/client_v1.go index bedf3cd30d6..b7fea010211 100644 --- a/pkg/plugins/backendplugin/grpcplugin/client_v1.go +++ b/pkg/plugins/backendplugin/grpcplugin/client_v1.go @@ -63,7 +63,11 @@ func (c *clientV1) CallResource(ctx context.Context, req *backend.CallResourceRe return backendplugin.ErrMethodNotImplemented } -func (c *clientV1) CanSubscribeToStream(ctx context.Context, request *backend.SubscribeToStreamRequest) (*backend.SubscribeToStreamResponse, error) { +func (c *clientV1) SubscribeStream(ctx context.Context, request *backend.SubscribeStreamRequest) (*backend.SubscribeStreamResponse, error) { + return nil, backendplugin.ErrMethodNotImplemented +} + +func (c *clientV1) PublishStream(ctx context.Context, request *backend.PublishStreamRequest) (*backend.PublishStreamResponse, error) { return nil, backendplugin.ErrMethodNotImplemented } diff --git a/pkg/plugins/backendplugin/grpcplugin/client_v2.go b/pkg/plugins/backendplugin/grpcplugin/client_v2.go index 17501184cd1..172d6a2a01b 100644 --- a/pkg/plugins/backendplugin/grpcplugin/client_v2.go +++ b/pkg/plugins/backendplugin/grpcplugin/client_v2.go @@ -55,32 +55,32 @@ func newClientV2(descriptor PluginDescriptor, logger log.Logger, rpcClient plugi c := clientV2{} if rawDiagnostics != nil { - if plugin, ok := rawDiagnostics.(grpcplugin.DiagnosticsClient); ok { - c.DiagnosticsClient = plugin + if diagnosticsClient, ok := rawDiagnostics.(grpcplugin.DiagnosticsClient); ok { + c.DiagnosticsClient = diagnosticsClient } } if rawResource != nil { - if plugin, ok := rawResource.(grpcplugin.ResourceClient); ok { - c.ResourceClient = plugin + if resourceClient, ok := rawResource.(grpcplugin.ResourceClient); ok { + c.ResourceClient = resourceClient } } if rawData != nil { - if plugin, ok := rawData.(grpcplugin.DataClient); ok { - c.DataClient = instrumentDataClient(plugin) + if dataClient, ok := rawData.(grpcplugin.DataClient); ok { + c.DataClient = instrumentDataClient(dataClient) } } if rawStream != nil { - if plugin, ok := rawStream.(grpcplugin.StreamClient); ok { - c.StreamClient = plugin + if streamClient, ok := rawStream.(grpcplugin.StreamClient); ok { + c.StreamClient = streamClient } } if rawRenderer != nil { - if plugin, ok := rawRenderer.(pluginextensionv2.RendererPlugin); ok { - c.RendererPlugin = plugin + if rendererPlugin, ok := rawRenderer.(pluginextensionv2.RendererPlugin); ok { + c.RendererPlugin = rendererPlugin } } @@ -171,15 +171,26 @@ func (c *clientV2) CallResource(ctx context.Context, req *backend.CallResourceRe } } -func (c *clientV2) CanSubscribeToStream(ctx context.Context, req *backend.SubscribeToStreamRequest) (*backend.SubscribeToStreamResponse, error) { +func (c *clientV2) SubscribeStream(ctx context.Context, req *backend.SubscribeStreamRequest) (*backend.SubscribeStreamResponse, error) { if c.StreamClient == nil { return nil, backendplugin.ErrMethodNotImplemented } - protoResp, err := c.StreamClient.CanSubscribeToStream(ctx, backend.ToProto().SubscribeToStreamRequest(req)) + protoResp, err := c.StreamClient.SubscribeStream(ctx, backend.ToProto().SubscribeStreamRequest(req)) if err != nil { return nil, err } - return backend.FromProto().SubscribeToStreamResponse(protoResp), nil + return backend.FromProto().SubscribeStreamResponse(protoResp), nil +} + +func (c *clientV2) PublishStream(ctx context.Context, req *backend.PublishStreamRequest) (*backend.PublishStreamResponse, error) { + if c.StreamClient == nil { + return nil, backendplugin.ErrMethodNotImplemented + } + protoResp, err := c.StreamClient.PublishStream(ctx, backend.ToProto().PublishStreamRequest(req)) + if err != nil { + return nil, err + } + return backend.FromProto().PublishStreamResponse(protoResp), nil } func (c *clientV2) RunStream(ctx context.Context, req *backend.RunStreamRequest, sender backend.StreamPacketSender) error { diff --git a/pkg/plugins/backendplugin/grpcplugin/grpc_plugin.go b/pkg/plugins/backendplugin/grpcplugin/grpc_plugin.go index 1ac979d4ac5..77b1e8ec2cf 100644 --- a/pkg/plugins/backendplugin/grpcplugin/grpc_plugin.go +++ b/pkg/plugins/backendplugin/grpcplugin/grpc_plugin.go @@ -100,62 +100,61 @@ func (p *grpcPlugin) Exited() bool { return true } -func (p *grpcPlugin) CollectMetrics(ctx context.Context) (*backend.CollectMetricsResult, error) { +func (p *grpcPlugin) getPluginClient() (pluginClient, bool) { p.mutex.RLock() if p.client == nil || p.client.Exited() || p.pluginClient == nil { p.mutex.RUnlock() - return nil, backendplugin.ErrPluginUnavailable + return nil, false } pluginClient := p.pluginClient p.mutex.RUnlock() + return pluginClient, true +} +func (p *grpcPlugin) CollectMetrics(ctx context.Context) (*backend.CollectMetricsResult, error) { + pluginClient, ok := p.getPluginClient() + if !ok { + return nil, backendplugin.ErrPluginUnavailable + } return pluginClient.CollectMetrics(ctx) } func (p *grpcPlugin) CheckHealth(ctx context.Context, req *backend.CheckHealthRequest) (*backend.CheckHealthResult, error) { - p.mutex.RLock() - if p.client == nil || p.client.Exited() || p.pluginClient == nil { - p.mutex.RUnlock() + pluginClient, ok := p.getPluginClient() + if !ok { return nil, backendplugin.ErrPluginUnavailable } - pluginClient := p.pluginClient - p.mutex.RUnlock() - return pluginClient.CheckHealth(ctx, req) } func (p *grpcPlugin) CallResource(ctx context.Context, req *backend.CallResourceRequest, sender backend.CallResourceResponseSender) error { - p.mutex.RLock() - if p.client == nil || p.client.Exited() || p.pluginClient == nil { - p.mutex.RUnlock() + pluginClient, ok := p.getPluginClient() + if !ok { return backendplugin.ErrPluginUnavailable } - pluginClient := p.pluginClient - p.mutex.RUnlock() - return pluginClient.CallResource(ctx, req, sender) } -func (p *grpcPlugin) CanSubscribeToStream(ctx context.Context, request *backend.SubscribeToStreamRequest) (*backend.SubscribeToStreamResponse, error) { - p.mutex.RLock() - if p.client == nil || p.client.Exited() || p.pluginClient == nil { - p.mutex.RUnlock() +func (p *grpcPlugin) SubscribeStream(ctx context.Context, request *backend.SubscribeStreamRequest) (*backend.SubscribeStreamResponse, error) { + pluginClient, ok := p.getPluginClient() + if !ok { return nil, backendplugin.ErrPluginUnavailable } - pluginClient := p.pluginClient - p.mutex.RUnlock() + return pluginClient.SubscribeStream(ctx, request) +} - return pluginClient.CanSubscribeToStream(ctx, request) +func (p *grpcPlugin) PublishStream(ctx context.Context, request *backend.PublishStreamRequest) (*backend.PublishStreamResponse, error) { + pluginClient, ok := p.getPluginClient() + if !ok { + return nil, backendplugin.ErrPluginUnavailable + } + return pluginClient.PublishStream(ctx, request) } func (p *grpcPlugin) RunStream(ctx context.Context, req *backend.RunStreamRequest, sender backend.StreamPacketSender) error { - p.mutex.RLock() - if p.client == nil || p.client.Exited() || p.pluginClient == nil { - p.mutex.RUnlock() + pluginClient, ok := p.getPluginClient() + if !ok { return backendplugin.ErrPluginUnavailable } - pluginClient := p.pluginClient - p.mutex.RUnlock() - return pluginClient.RunStream(ctx, req, sender) } diff --git a/pkg/plugins/backendplugin/manager/manager_test.go b/pkg/plugins/backendplugin/manager/manager_test.go index 960337b0774..d23631a3d71 100644 --- a/pkg/plugins/backendplugin/manager/manager_test.go +++ b/pkg/plugins/backendplugin/manager/manager_test.go @@ -392,7 +392,11 @@ func (tp *testPlugin) CallResource(ctx context.Context, req *backend.CallResourc return backendplugin.ErrMethodNotImplemented } -func (tp *testPlugin) CanSubscribeToStream(ctx context.Context, request *backend.SubscribeToStreamRequest) (*backend.SubscribeToStreamResponse, error) { +func (tp *testPlugin) SubscribeStream(ctx context.Context, request *backend.SubscribeStreamRequest) (*backend.SubscribeStreamResponse, error) { + return nil, backendplugin.ErrMethodNotImplemented +} + +func (tp *testPlugin) PublishStream(ctx context.Context, request *backend.PublishStreamRequest) (*backend.PublishStreamResponse, error) { return nil, backendplugin.ErrMethodNotImplemented } diff --git a/pkg/services/live/features/broadcast.go b/pkg/services/live/features/broadcast.go index 9dd1c879450..a767cc7d35e 100644 --- a/pkg/services/live/features/broadcast.go +++ b/pkg/services/live/features/broadcast.go @@ -4,6 +4,8 @@ import ( "context" "time" + "github.com/grafana/grafana-plugin-sdk-go/backend" + "github.com/grafana/grafana/pkg/models" ) @@ -17,18 +19,18 @@ func (b *BroadcastRunner) GetHandlerForPath(path string) (models.ChannelHandler, } // OnSubscribe will let anyone connect to the path -func (b *BroadcastRunner) OnSubscribe(ctx context.Context, _ *models.SignedInUser, e models.SubscribeEvent) (models.SubscribeReply, bool, error) { +func (b *BroadcastRunner) OnSubscribe(ctx context.Context, _ *models.SignedInUser, e models.SubscribeEvent) (models.SubscribeReply, backend.SubscribeStreamStatus, error) { return models.SubscribeReply{ Presence: true, JoinLeave: true, Recover: true, // loads the saved value from history - }, true, nil + }, backend.SubscribeStreamStatusOK, nil } // OnPublish is called when a client wants to broadcast on the websocket -func (b *BroadcastRunner) OnPublish(ctx context.Context, _ *models.SignedInUser, e models.PublishEvent) (models.PublishReply, bool, error) { +func (b *BroadcastRunner) OnPublish(ctx context.Context, _ *models.SignedInUser, e models.PublishEvent) (models.PublishReply, backend.PublishStreamStatus, error) { return models.PublishReply{ HistorySize: 1, // The last message is saved for 10 min. HistoryTTL: 10 * time.Minute, - }, true, nil + }, backend.PublishStreamStatusOK, nil } diff --git a/pkg/services/live/features/dashboard.go b/pkg/services/live/features/dashboard.go index 5fc07736914..cca303ca97b 100644 --- a/pkg/services/live/features/dashboard.go +++ b/pkg/services/live/features/dashboard.go @@ -4,6 +4,8 @@ import ( "context" "encoding/json" + "github.com/grafana/grafana-plugin-sdk-go/backend" + "github.com/grafana/grafana/pkg/models" ) @@ -26,16 +28,16 @@ func (h *DashboardHandler) GetHandlerForPath(path string) (models.ChannelHandler } // OnSubscribe for now allows anyone to subscribe to any dashboard -func (h *DashboardHandler) OnSubscribe(ctx context.Context, _ *models.SignedInUser, e models.SubscribeEvent) (models.SubscribeReply, bool, error) { +func (h *DashboardHandler) OnSubscribe(ctx context.Context, _ *models.SignedInUser, e models.SubscribeEvent) (models.SubscribeReply, backend.SubscribeStreamStatus, error) { return models.SubscribeReply{ Presence: true, JoinLeave: true, - }, true, nil + }, backend.SubscribeStreamStatusOK, nil } -// OnPublish is called when someone begins to edit a dashoard -func (h *DashboardHandler) OnPublish(ctx context.Context, _ *models.SignedInUser, e models.PublishEvent) (models.PublishReply, bool, error) { - return models.PublishReply{}, true, nil +// OnPublish is called when someone begins to edit a dashboard +func (h *DashboardHandler) OnPublish(ctx context.Context, _ *models.SignedInUser, e models.PublishEvent) (models.PublishReply, backend.PublishStreamStatus, error) { + return models.PublishReply{}, backend.PublishStreamStatusOK, nil } // DashboardSaved should broadcast to the appropriate stream diff --git a/pkg/services/live/features/measurements.go b/pkg/services/live/features/measurements.go index ca8ed346107..6c7960f9bda 100644 --- a/pkg/services/live/features/measurements.go +++ b/pkg/services/live/features/measurements.go @@ -3,6 +3,8 @@ package features import ( "context" + "github.com/grafana/grafana-plugin-sdk-go/backend" + "github.com/grafana/grafana/pkg/infra/log" "github.com/grafana/grafana/pkg/models" ) @@ -23,12 +25,12 @@ func (m *MeasurementsRunner) GetHandlerForPath(path string) (models.ChannelHandl } // OnSubscribe will let anyone connect to the path -func (m *MeasurementsRunner) OnSubscribe(ctx context.Context, _ *models.SignedInUser, e models.SubscribeEvent) (models.SubscribeReply, bool, error) { - return models.SubscribeReply{}, true, nil +func (m *MeasurementsRunner) OnSubscribe(ctx context.Context, _ *models.SignedInUser, e models.SubscribeEvent) (models.SubscribeReply, backend.SubscribeStreamStatus, error) { + return models.SubscribeReply{}, backend.SubscribeStreamStatusOK, nil } // OnPublish is called when a client wants to broadcast on the websocket // Currently this sends measurements over websocket -- should be replaced with the HTTP interface -func (m *MeasurementsRunner) OnPublish(ctx context.Context, _ *models.SignedInUser, e models.PublishEvent) (models.PublishReply, bool, error) { - return models.PublishReply{}, true, nil +func (m *MeasurementsRunner) OnPublish(ctx context.Context, _ *models.SignedInUser, e models.PublishEvent) (models.PublishReply, backend.PublishStreamStatus, error) { + return models.PublishReply{}, backend.PublishStreamStatusOK, nil } diff --git a/pkg/services/live/features/mock.go b/pkg/services/live/features/mock.go index d8d4244673f..9f21ef02fbd 100644 --- a/pkg/services/live/features/mock.go +++ b/pkg/services/live/features/mock.go @@ -1,5 +1,5 @@ // Code generated by MockGen. DO NOT EDIT. -// Source: github.com/grafana/grafana/pkg/services/live/features (interfaces: ChannelPublisher,PresenceGetter,PluginContextGetter,StreamRunner) +// Source: github.com/grafana/grafana/pkg/services/live/features (interfaces: StreamPacketSender,PresenceGetter,PluginContextGetter,StreamRunner) // Package features is a generated GoMock package. package features @@ -10,43 +10,44 @@ import ( gomock "github.com/golang/mock/gomock" backend "github.com/grafana/grafana-plugin-sdk-go/backend" + models "github.com/grafana/grafana/pkg/models" ) -// MockChannelPublisher is a mock of ChannelPublisher interface. -type MockChannelPublisher struct { +// MockStreamPacketSender is a mock of StreamPacketSender interface. +type MockStreamPacketSender struct { ctrl *gomock.Controller - recorder *MockChannelPublisherMockRecorder + recorder *MockStreamPacketSenderMockRecorder } -// MockChannelPublisherMockRecorder is the mock recorder for MockChannelPublisher. -type MockChannelPublisherMockRecorder struct { - mock *MockChannelPublisher +// MockStreamPacketSenderMockRecorder is the mock recorder for MockStreamPacketSender. +type MockStreamPacketSenderMockRecorder struct { + mock *MockStreamPacketSender } -// NewMockChannelPublisher creates a new mock instance. -func NewMockChannelPublisher(ctrl *gomock.Controller) *MockChannelPublisher { - mock := &MockChannelPublisher{ctrl: ctrl} - mock.recorder = &MockChannelPublisherMockRecorder{mock} +// NewMockStreamPacketSender creates a new mock instance. +func NewMockStreamPacketSender(ctrl *gomock.Controller) *MockStreamPacketSender { + mock := &MockStreamPacketSender{ctrl: ctrl} + mock.recorder = &MockStreamPacketSenderMockRecorder{mock} return mock } // EXPECT returns an object that allows the caller to indicate expected use. -func (m *MockChannelPublisher) EXPECT() *MockChannelPublisherMockRecorder { +func (m *MockStreamPacketSender) EXPECT() *MockStreamPacketSenderMockRecorder { return m.recorder } -// Publish mocks base method. -func (m *MockChannelPublisher) Publish(arg0 string, arg1 []byte) error { +// Send mocks base method. +func (m *MockStreamPacketSender) Send(arg0 string, arg1 *backend.StreamPacket) error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Publish", arg0, arg1) + ret := m.ctrl.Call(m, "Send", arg0, arg1) ret0, _ := ret[0].(error) return ret0 } -// Publish indicates an expected call of Publish. -func (mr *MockChannelPublisherMockRecorder) Publish(arg0, arg1 interface{}) *gomock.Call { +// Send indicates an expected call of Send. +func (mr *MockStreamPacketSenderMockRecorder) Send(arg0, arg1 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Publish", reflect.TypeOf((*MockChannelPublisher)(nil).Publish), arg0, arg1) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Send", reflect.TypeOf((*MockStreamPacketSender)(nil).Send), arg0, arg1) } // MockPresenceGetter is a mock of PresenceGetter interface. @@ -111,7 +112,7 @@ func (m *MockPluginContextGetter) EXPECT() *MockPluginContextGetterMockRecorder } // GetPluginContext mocks base method. -func (m *MockPluginContextGetter) GetPluginContext(arg0 context.Context, arg1, arg2 string) (backend.PluginContext, bool, error) { +func (m *MockPluginContextGetter) GetPluginContext(arg0 *models.SignedInUser, arg1, arg2 string) (backend.PluginContext, bool, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "GetPluginContext", arg0, arg1, arg2) ret0, _ := ret[0].(backend.PluginContext) diff --git a/pkg/services/live/features/plugin.go b/pkg/services/live/features/plugin.go index 422db56cc34..cd16d6e0108 100644 --- a/pkg/services/live/features/plugin.go +++ b/pkg/services/live/features/plugin.go @@ -2,17 +2,16 @@ package features import ( "context" - "fmt" "github.com/centrifugal/centrifuge" "github.com/grafana/grafana-plugin-sdk-go/backend" "github.com/grafana/grafana/pkg/models" ) -//go:generate mockgen -destination=mock.go -package=features github.com/grafana/grafana/pkg/services/live/features ChannelPublisher,PresenceGetter,PluginContextGetter,StreamRunner +//go:generate mockgen -destination=mock.go -package=features github.com/grafana/grafana/pkg/services/live/features StreamPacketSender,PresenceGetter,PluginContextGetter,StreamRunner -type ChannelPublisher interface { - Publish(channel string, data []byte) error +type StreamPacketSender interface { + Send(channel string, packet *backend.StreamPacket) error } type PresenceGetter interface { @@ -28,16 +27,19 @@ type StreamRunner interface { } type streamSender struct { - channel string - channelPublisher ChannelPublisher + channel string + packetSender StreamPacketSender } -func newStreamSender(channel string, publisher ChannelPublisher) *streamSender { - return &streamSender{channel: channel, channelPublisher: publisher} +func newStreamSender(channel string, packetSender StreamPacketSender) *streamSender { + return &streamSender{ + channel: channel, + packetSender: packetSender, + } } func (p *streamSender) Send(packet *backend.StreamPacket) error { - return p.channelPublisher.Publish(p.channel, packet.Payload) + return p.packetSender.Send(p.channel, packet) } // PluginRunner can handle streaming operations for channels belonging to plugins. @@ -83,39 +85,70 @@ type PluginPathRunner struct { } // OnSubscribe passes control to a plugin. -func (r *PluginPathRunner) OnSubscribe(ctx context.Context, user *models.SignedInUser, e models.SubscribeEvent) (models.SubscribeReply, bool, error) { +func (r *PluginPathRunner) OnSubscribe(ctx context.Context, user *models.SignedInUser, e models.SubscribeEvent) (models.SubscribeReply, backend.SubscribeStreamStatus, error) { pCtx, found, err := r.pluginContextGetter.GetPluginContext(user, r.pluginID, r.datasourceUID) if err != nil { logger.Error("Get plugin context error", "error", err, "path", r.path) - return models.SubscribeReply{}, false, err + return models.SubscribeReply{}, 0, err } if !found { logger.Error("Plugin context not found", "path", r.path) - return models.SubscribeReply{}, false, centrifuge.ErrorInternal + return models.SubscribeReply{}, 0, centrifuge.ErrorInternal } - resp, err := r.handler.CanSubscribeToStream(ctx, &backend.SubscribeToStreamRequest{ + resp, err := r.handler.SubscribeStream(ctx, &backend.SubscribeStreamRequest{ PluginContext: pCtx, Path: r.path, }) if err != nil { logger.Error("Plugin CanSubscribeToStream call error", "error", err, "path", r.path) - return models.SubscribeReply{}, false, err + return models.SubscribeReply{}, 0, err } - if !resp.OK { - return models.SubscribeReply{}, false, nil + if resp.Status != backend.SubscribeStreamStatusOK { + return models.SubscribeReply{}, resp.Status, nil } - err = r.streamManager.SubmitStream(e.Channel, r.path, pCtx, r.handler) - if err != nil { - logger.Error("Error submitting stream to manager", "error", err, "path", r.path) - return models.SubscribeReply{}, false, centrifuge.ErrorInternal + + if resp.UseRunStream { + submitResult, err := r.streamManager.SubmitStream(ctx, e.Channel, r.path, pCtx, r.handler) + if err != nil { + logger.Error("Error submitting stream to manager", "error", err, "path", r.path) + return models.SubscribeReply{}, 0, centrifuge.ErrorInternal + } + if submitResult.StreamExists { + logger.Debug("Skip running new stream (already exists)", "path", r.path) + } else { + logger.Debug("Running a new keepalive stream", "path", r.path) + } } - return models.SubscribeReply{ - Presence: true, - }, true, nil + + reply := models.SubscribeReply{ + Presence: resp.UseRunStream, // only enable presence for streams with UseRunStream on at the moment. + Data: resp.Data, + } + return reply, backend.SubscribeStreamStatusOK, nil } // OnPublish passes control to a plugin. -func (r *PluginPathRunner) OnPublish(_ context.Context, _ *models.SignedInUser, _ models.PublishEvent) (models.PublishReply, bool, error) { - // TODO: pass control to a plugin. - return models.PublishReply{}, false, fmt.Errorf("not implemented yet") +func (r *PluginPathRunner) OnPublish(ctx context.Context, user *models.SignedInUser, e models.PublishEvent) (models.PublishReply, backend.PublishStreamStatus, error) { + pCtx, found, err := r.pluginContextGetter.GetPluginContext(user, r.pluginID, r.datasourceUID) + if err != nil { + logger.Error("Get plugin context error", "error", err, "path", r.path) + return models.PublishReply{}, 0, err + } + if !found { + logger.Error("Plugin context not found", "path", r.path) + return models.PublishReply{}, 0, centrifuge.ErrorInternal + } + resp, err := r.handler.PublishStream(ctx, &backend.PublishStreamRequest{ + PluginContext: pCtx, + Path: r.path, + Data: e.Data, + }) + if err != nil { + logger.Error("Plugin CanSubscribeToStream call error", "error", err, "path", r.path) + return models.PublishReply{}, 0, err + } + if resp.Status != backend.PublishStreamStatusOK { + return models.PublishReply{}, resp.Status, nil + } + return models.PublishReply{Data: resp.Data}, backend.PublishStreamStatusOK, nil } diff --git a/pkg/services/live/features/stream.go b/pkg/services/live/features/stream.go index 8ef23f08e28..95d84629e32 100644 --- a/pkg/services/live/features/stream.go +++ b/pkg/services/live/features/stream.go @@ -11,14 +11,14 @@ import ( // StreamManager manages streams from Grafana to plugins. type StreamManager struct { - mu sync.RWMutex - streams map[string]struct{} - presenceGetter PresenceGetter - channelPublisher ChannelPublisher - registerCh chan streamRequest - closedCh chan struct{} - checkInterval time.Duration - maxChecks int + mu sync.RWMutex + streams map[string]struct{} + presenceGetter PresenceGetter + packetSender StreamPacketSender + registerCh chan submitRequest + closedCh chan struct{} + checkInterval time.Duration + maxChecks int } // StreamManagerOption modifies StreamManager behavior (used for tests for example). @@ -38,15 +38,15 @@ const ( ) // NewStreamManager creates new StreamManager. -func NewStreamManager(chPublisher ChannelPublisher, presenceGetter PresenceGetter, opts ...StreamManagerOption) *StreamManager { +func NewStreamManager(packetSender StreamPacketSender, presenceGetter PresenceGetter, opts ...StreamManagerOption) *StreamManager { sm := &StreamManager{ - streams: make(map[string]struct{}), - channelPublisher: chPublisher, - presenceGetter: presenceGetter, - registerCh: make(chan streamRequest), - closedCh: make(chan struct{}), - checkInterval: defaultCheckInterval, - maxChecks: defaultMaxChecks, + streams: make(map[string]struct{}), + packetSender: packetSender, + presenceGetter: presenceGetter, + registerCh: make(chan submitRequest), + closedCh: make(chan struct{}), + checkInterval: defaultCheckInterval, + maxChecks: defaultMaxChecks, } for _, opt := range opts { opt(sm) @@ -80,7 +80,7 @@ func (s *StreamManager) watchStream(ctx context.Context, cancelFn func(), sr str } numNoSubscribersChecks++ if numNoSubscribersChecks >= s.maxChecks { - logger.Info("Stop stream since no active subscribers", "channel", sr.Channel, "path", sr.Path) + logger.Debug("Stop stream since no active subscribers", "channel", sr.Channel, "path", sr.Path) s.stopStream(sr, cancelFn) return } @@ -102,11 +102,11 @@ func (s *StreamManager) runStream(ctx context.Context, sr streamRequest) { PluginContext: sr.PluginContext, Path: sr.Path, }, - newStreamSender(sr.Channel, s.channelPublisher), + newStreamSender(sr.Channel, s.packetSender), ) if err != nil { if errors.Is(ctx.Err(), context.Canceled) { - logger.Info("Stream cleanly finished", "path", sr.Path) + logger.Debug("Stream cleanly finished", "path", sr.Path) return } logger.Error("Error running stream, retrying", "path", sr.Path, "error", err) @@ -117,20 +117,22 @@ func (s *StreamManager) runStream(ctx context.Context, sr streamRequest) { } } -func (s *StreamManager) registerStream(ctx context.Context, sr streamRequest) { +var errClosed = errors.New("stream manager closed") + +func (s *StreamManager) registerStream(ctx context.Context, sr submitRequest) { s.mu.Lock() - if _, ok := s.streams[sr.Channel]; ok { - logger.Debug("Skip running new stream (already exists)", "path", sr.Path) + if _, ok := s.streams[sr.streamRequest.Channel]; ok { s.mu.Unlock() + sr.responseCh <- submitResponse{Result: submitResult{StreamExists: true}} return } ctx, cancel := context.WithCancel(ctx) defer cancel() - s.streams[sr.Channel] = struct{}{} + s.streams[sr.streamRequest.Channel] = struct{}{} s.mu.Unlock() - - go s.watchStream(ctx, cancel, sr) - s.runStream(ctx, sr) + sr.responseCh <- submitResponse{Result: submitResult{StreamExists: false}} + go s.watchStream(ctx, cancel, sr.streamRequest) + s.runStream(ctx, sr.streamRequest) } // Run StreamManager till context canceled. @@ -153,21 +155,53 @@ type streamRequest struct { StreamRunner StreamRunner } +type submitRequest struct { + responseCh chan submitResponse + streamRequest streamRequest +} + +type submitResult struct { + StreamExists bool +} + +type submitResponse struct { + Error error + Result submitResult +} + // SubmitStream submits stream handler in StreamManager to manage. // The stream will be opened and kept till channel has active subscribers. -func (s *StreamManager) SubmitStream(channel string, path string, pCtx backend.PluginContext, streamRunner StreamRunner) error { +func (s *StreamManager) SubmitStream(ctx context.Context, channel string, path string, pCtx backend.PluginContext, streamRunner StreamRunner) (*submitResult, error) { + req := submitRequest{ + responseCh: make(chan submitResponse, 1), + streamRequest: streamRequest{ + Channel: channel, + Path: path, + PluginContext: pCtx, + StreamRunner: streamRunner, + }, + } + + // Send submit request. select { + case s.registerCh <- req: case <-s.closedCh: close(s.registerCh) - return nil - case s.registerCh <- streamRequest{ - Channel: channel, - Path: path, - PluginContext: pCtx, - StreamRunner: streamRunner, - }: - case <-time.After(time.Second): - return errors.New("timeout") + return nil, errClosed + case <-ctx.Done(): + return nil, ctx.Err() + } + + // Wait for submit response. + select { + case resp := <-req.responseCh: + if resp.Error != nil { + return nil, resp.Error + } + return &resp.Result, nil + case <-s.closedCh: + return nil, errClosed + case <-ctx.Done(): + return nil, ctx.Err() } - return nil } diff --git a/pkg/services/live/features/stream_test.go b/pkg/services/live/features/stream_test.go index 5d42ff8ed05..53d7a40b668 100644 --- a/pkg/services/live/features/stream_test.go +++ b/pkg/services/live/features/stream_test.go @@ -24,10 +24,10 @@ func TestStreamManager_Run(t *testing.T) { mockCtrl := gomock.NewController(t) defer mockCtrl.Finish() - mockChannelPublisher := NewMockChannelPublisher(mockCtrl) + mockPacketSender := NewMockStreamPacketSender(mockCtrl) mockPresenceGetter := NewMockPresenceGetter(mockCtrl) - manager := NewStreamManager(mockChannelPublisher, mockPresenceGetter) + manager := NewStreamManager(mockPacketSender, mockPresenceGetter) ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -44,10 +44,10 @@ func TestStreamManager_SubmitStream_Send(t *testing.T) { mockCtrl := gomock.NewController(t) defer mockCtrl.Finish() - mockChannelPublisher := NewMockChannelPublisher(mockCtrl) + mockPacketSender := NewMockStreamPacketSender(mockCtrl) mockPresenceGetter := NewMockPresenceGetter(mockCtrl) - manager := NewStreamManager(mockChannelPublisher, mockPresenceGetter) + manager := NewStreamManager(mockPacketSender, mockPresenceGetter) ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -58,7 +58,7 @@ func TestStreamManager_SubmitStream_Send(t *testing.T) { startedCh := make(chan struct{}) doneCh := make(chan struct{}) - mockChannelPublisher.EXPECT().Publish("test", []byte("test")).Times(1) + mockPacketSender.EXPECT().Send("test", gomock.Any()).Times(1) mockStreamRunner := NewMockStreamRunner(mockCtrl) mockStreamRunner.EXPECT().RunStream( @@ -67,7 +67,7 @@ func TestStreamManager_SubmitStream_Send(t *testing.T) { require.Equal(t, "test", req.Path) close(startedCh) err := sender.Send(&backend.StreamPacket{ - Payload: []byte("test"), + Data: []byte("test"), }) require.NoError(t, err) <-ctx.Done() @@ -75,12 +75,14 @@ func TestStreamManager_SubmitStream_Send(t *testing.T) { return ctx.Err() }).Times(1) - err := manager.SubmitStream("test", "test", backend.PluginContext{}, mockStreamRunner) + result, err := manager.SubmitStream(context.Background(), "test", "test", backend.PluginContext{}, mockStreamRunner) require.NoError(t, err) + require.False(t, result.StreamExists) // try submit the same. - err = manager.SubmitStream("test", "test", backend.PluginContext{}, mockStreamRunner) + result, err = manager.SubmitStream(context.Background(), "test", "test", backend.PluginContext{}, mockStreamRunner) require.NoError(t, err) + require.True(t, result.StreamExists) waitWithTimeout(t, startedCh, time.Second) require.Len(t, manager.streams, 1) @@ -92,11 +94,11 @@ func TestStreamManager_SubmitStream_CloseNoSubscribers(t *testing.T) { mockCtrl := gomock.NewController(t) defer mockCtrl.Finish() - mockChannelPublisher := NewMockChannelPublisher(mockCtrl) + mockPacketSender := NewMockStreamPacketSender(mockCtrl) mockPresenceGetter := NewMockPresenceGetter(mockCtrl) manager := NewStreamManager( - mockChannelPublisher, + mockPacketSender, mockPresenceGetter, WithCheckConfig(10*time.Millisecond, 3), ) @@ -120,7 +122,7 @@ func TestStreamManager_SubmitStream_CloseNoSubscribers(t *testing.T) { return ctx.Err() }).Times(1) - err := manager.SubmitStream("test", "test", backend.PluginContext{}, mockStreamRunner) + _, err := manager.SubmitStream(context.Background(), "test", "test", backend.PluginContext{}, mockStreamRunner) require.NoError(t, err) waitWithTimeout(t, startedCh, time.Second) diff --git a/pkg/services/live/live.go b/pkg/services/live/live.go index 840e841d16f..320e1f393b3 100644 --- a/pkg/services/live/live.go +++ b/pkg/services/live/live.go @@ -108,6 +108,7 @@ func (g *GrafanaLive) Init() error { // cfg.LogLevel = centrifuge.LogLevelDebug cfg.LogHandler = handleLog + cfg.LogLevel = centrifuge.LogLevelError // Node is the core object in Centrifuge library responsible for many useful // things. For example Node allows to publish messages to channels from server @@ -119,10 +120,9 @@ func (g *GrafanaLive) Init() error { g.node = node g.contextGetter = newPluginContextGetter(g.PluginContextProvider) - - channelPublisher := newPluginChannelPublisher(node) + packetSender := newPluginPacketSender(node) presenceGetter := newPluginPresenceGetter(node) - g.streamManager = features.NewStreamManager(channelPublisher, presenceGetter) + g.streamManager = features.NewStreamManager(packetSender, presenceGetter) // Initialize the main features dash := &features.DashboardHandler{ @@ -154,23 +154,29 @@ func (g *GrafanaLive) Init() error { logger.Error("Error getting channel handler", "user", client.UserID(), "client", client.ID(), "channel", e.Channel, "error", err) cb(centrifuge.SubscribeReply{}, err) } else { - reply, allowed, err := handler.OnSubscribe(client.Context(), user, models.SubscribeEvent{ + reply, status, err := handler.OnSubscribe(client.Context(), user, models.SubscribeEvent{ Channel: e.Channel, Path: addr.Path, }) if err != nil { + logger.Error("Error calling channel handler subscribe", "user", client.UserID(), "client", client.ID(), "channel", e.Channel, "error", err) cb(centrifuge.SubscribeReply{}, centrifuge.ErrorInternal) return } - if !allowed { - cb(centrifuge.SubscribeReply{}, centrifuge.ErrorPermissionDenied) + if status != backend.SubscribeStreamStatusOK { + // using HTTP error codes for WS errors too. + code, text := subscribeStatusToHTTPError(status) + logger.Debug("Return custom subscribe error", "user", client.UserID(), "client", client.ID(), "channel", e.Channel, "code", code) + cb(centrifuge.SubscribeReply{}, ¢rifuge.Error{Code: uint32(code), Message: text}) return } + logger.Debug("Client subscribed", "user", client.UserID(), "client", client.ID(), "channel", e.Channel) cb(centrifuge.SubscribeReply{ Options: centrifuge.SubscribeOptions{ Presence: reply.Presence, JoinLeave: reply.JoinLeave, Recover: reply.Recover, + Data: reply.Data, }, }, nil) } @@ -192,24 +198,39 @@ func (g *GrafanaLive) Init() error { logger.Error("Error getting channel handler", "user", client.UserID(), "client", client.ID(), "channel", e.Channel, "error", err) cb(centrifuge.PublishReply{}, err) } else { - reply, allowed, err := handler.OnPublish(client.Context(), user, models.PublishEvent{ + reply, status, err := handler.OnPublish(client.Context(), user, models.PublishEvent{ Channel: e.Channel, Path: addr.Path, }) if err != nil { + logger.Error("Error calling channel handler publish", "user", client.UserID(), "client", client.ID(), "channel", e.Channel, "error", err) cb(centrifuge.PublishReply{}, centrifuge.ErrorInternal) return } - if !allowed { - cb(centrifuge.PublishReply{}, centrifuge.ErrorPermissionDenied) + if status != backend.PublishStreamStatusOK { + // using HTTP error codes for WS errors too. + code, text := publishStatusToHTTPError(status) + logger.Debug("Return custom publish error", "user", client.UserID(), "client", client.ID(), "channel", e.Channel, "code", code) + cb(centrifuge.PublishReply{}, ¢rifuge.Error{Code: uint32(code), Message: text}) return } - cb(centrifuge.PublishReply{ + centrifugeReply := centrifuge.PublishReply{ Options: centrifuge.PublishOptions{ HistorySize: reply.HistorySize, HistoryTTL: reply.HistoryTTL, }, - }, nil) + } + if reply.Data != nil { + result, err := g.node.Publish(e.Channel, reply.Data) + if err != nil { + logger.Error("Error publishing", "user", client.UserID(), "client", client.ID(), "channel", e.Channel, "error", err) + cb(centrifuge.PublishReply{}, centrifuge.ErrorInternal) + return + } + centrifugeReply.Result = &result + } + logger.Debug("Publication successful", "user", client.UserID(), "client", client.ID(), "channel", e.Channel) + cb(centrifugeReply, nil) } }) @@ -254,6 +275,30 @@ func (g *GrafanaLive) Init() error { return nil } +func subscribeStatusToHTTPError(status backend.SubscribeStreamStatus) (int, string) { + switch status { + case backend.SubscribeStreamStatusNotFound: + return http.StatusNotFound, http.StatusText(http.StatusNotFound) + case backend.SubscribeStreamStatusPermissionDenied: + return http.StatusForbidden, http.StatusText(http.StatusForbidden) + default: + log.Warn("unknown subscribe status", "status", status) + return http.StatusInternalServerError, http.StatusText(http.StatusInternalServerError) + } +} + +func publishStatusToHTTPError(status backend.PublishStreamStatus) (int, string) { + switch status { + case backend.PublishStreamStatusNotFound: + return http.StatusNotFound, http.StatusText(http.StatusNotFound) + case backend.PublishStreamStatusPermissionDenied: + return http.StatusForbidden, http.StatusText(http.StatusForbidden) + default: + log.Warn("unknown publish status", "status", status) + return http.StatusInternalServerError, http.StatusText(http.StatusInternalServerError) + } +} + // GetChannelHandler gives thread-safe access to the channel. func (g *GrafanaLive) GetChannelHandler(user *models.SignedInUser, channel string) (models.ChannelHandler, ChannelAddress, error) { // Parse the identifier ${scope}/${namespace}/${path} @@ -330,7 +375,7 @@ func (g *GrafanaLive) handlePluginScope(_ *models.SignedInUser, namespace string } return features.NewPluginRunner( namespace, - "", + "", // No instance uid for non-datasource plugins. g.streamManager, g.contextGetter, streamHandler, @@ -342,9 +387,9 @@ func (g *GrafanaLive) handleDatasourceScope(user *models.SignedInUser, namespace if err != nil { return nil, fmt.Errorf("error getting datasource: %w", err) } - streamHandler, err := g.getStreamPlugin(ds.Name) + streamHandler, err := g.getStreamPlugin(ds.Type) if err != nil { - return nil, fmt.Errorf("can't find stream plugin: %s", namespace) + return nil, fmt.Errorf("can't find stream plugin: %s", ds.Type) } return features.NewPluginRunner( ds.Type, @@ -372,7 +417,7 @@ func (g *GrafanaLive) HandleHTTPPublish(ctx *models.ReqContext, cmd dtos.LivePub return response.Error(http.StatusBadRequest, "Bad channel address", nil) } - logger.Debug("Publish API cmd", "cmd", cmd) + logger.Debug("Publish API cmd", "user", ctx.SignedInUser.UserId, "cmd", cmd) channelHandler, addr, err := g.GetChannelHandler(ctx.SignedInUser, cmd.Channel) if err != nil { @@ -380,15 +425,24 @@ func (g *GrafanaLive) HandleHTTPPublish(ctx *models.ReqContext, cmd dtos.LivePub return response.Error(http.StatusInternalServerError, http.StatusText(http.StatusInternalServerError), nil) } - _, allowed, err := channelHandler.OnPublish(ctx.Req.Context(), ctx.SignedInUser, models.PublishEvent{Channel: cmd.Channel, Path: addr.Path, Data: cmd.Data}) + reply, status, err := channelHandler.OnPublish(ctx.Req.Context(), ctx.SignedInUser, models.PublishEvent{Channel: cmd.Channel, Path: addr.Path, Data: cmd.Data}) if err != nil { logger.Error("Error calling OnPublish", "error", err, "channel", cmd.Channel) return response.Error(http.StatusInternalServerError, http.StatusText(http.StatusInternalServerError), nil) } - if !allowed { - return response.Error(http.StatusForbidden, http.StatusText(http.StatusForbidden), nil) + if status != backend.PublishStreamStatusOK { + code, text := publishStatusToHTTPError(status) + return response.Error(code, text, nil) } - return response.JSON(200, dtos.LivePublishResponse{}) + if reply.Data != nil { + _, err = g.node.Publish(cmd.Channel, cmd.Data) + if err != nil { + logger.Error("Error publish to channel", "error", err, "channel", cmd.Channel) + return response.Error(http.StatusInternalServerError, http.StatusText(http.StatusInternalServerError), nil) + } + } + logger.Debug("Publication successful", "user", ctx.SignedInUser.UserId, "channel", cmd.Channel) + return response.JSON(http.StatusOK, dtos.LivePublishResponse{}) } // Write to the standard log15 logger diff --git a/pkg/services/live/plugin_helpers.go b/pkg/services/live/plugin_helpers.go index 81f4d649d0c..d1577e84735 100644 --- a/pkg/services/live/plugin_helpers.go +++ b/pkg/services/live/plugin_helpers.go @@ -8,16 +8,16 @@ import ( "github.com/grafana/grafana/pkg/plugins/plugincontext" ) -type pluginChannelPublisher struct { +type pluginPacketSender struct { node *centrifuge.Node } -func newPluginChannelPublisher(node *centrifuge.Node) *pluginChannelPublisher { - return &pluginChannelPublisher{node: node} +func newPluginPacketSender(node *centrifuge.Node) *pluginPacketSender { + return &pluginPacketSender{node: node} } -func (p *pluginChannelPublisher) Publish(channel string, data []byte) error { - _, err := p.node.Publish(channel, data) +func (p *pluginPacketSender) Send(channel string, packet *backend.StreamPacket) error { + _, err := p.node.Publish(channel, packet.Data) return err } diff --git a/pkg/tsdb/cloudwatch/live.go b/pkg/tsdb/cloudwatch/live.go index 03f3d709980..45554a07d8f 100644 --- a/pkg/tsdb/cloudwatch/live.go +++ b/pkg/tsdb/cloudwatch/live.go @@ -56,12 +56,12 @@ func (s *LogQueryRunnerSupplier) GetHandlerForPath(path string) (models.ChannelH } // OnSubscribe publishes results from the corresponding CloudWatch Logs query to the provided channel -func (r *logQueryRunner) OnSubscribe(ctx context.Context, user *models.SignedInUser, e models.SubscribeEvent) (models.SubscribeReply, bool, error) { +func (r *logQueryRunner) OnSubscribe(ctx context.Context, user *models.SignedInUser, e models.SubscribeEvent) (models.SubscribeReply, backend.SubscribeStreamStatus, error) { r.runningMu.Lock() defer r.runningMu.Unlock() if _, ok := r.running[e.Channel]; ok { - return models.SubscribeReply{}, true, nil + return models.SubscribeReply{}, backend.SubscribeStreamStatusOK, nil } r.running[e.Channel] = true @@ -71,12 +71,12 @@ func (r *logQueryRunner) OnSubscribe(ctx context.Context, user *models.SignedInU } }() - return models.SubscribeReply{}, true, nil + return models.SubscribeReply{}, backend.SubscribeStreamStatusOK, nil } // OnPublish checks if a message from the websocket can be broadcast on this channel -func (r *logQueryRunner) OnPublish(ctx context.Context, user *models.SignedInUser, e models.PublishEvent) (models.PublishReply, bool, error) { - return models.PublishReply{}, false, nil +func (r *logQueryRunner) OnPublish(ctx context.Context, user *models.SignedInUser, e models.PublishEvent) (models.PublishReply, backend.PublishStreamStatus, error) { + return models.PublishReply{}, backend.PublishStreamStatusPermissionDenied, nil } func (r *logQueryRunner) publishResults(channelName string) error { diff --git a/pkg/tsdb/testdatasource/stream_handler.go b/pkg/tsdb/testdatasource/stream_handler.go index 87ef99beeb8..e208b56c9d4 100644 --- a/pkg/tsdb/testdatasource/stream_handler.go +++ b/pkg/tsdb/testdatasource/stream_handler.go @@ -8,7 +8,6 @@ import ( "github.com/grafana/grafana-plugin-sdk-go/backend" "github.com/grafana/grafana-plugin-sdk-go/data" - jsoniter "github.com/json-iterator/go" "github.com/grafana/grafana/pkg/cmd/grafana-cli/logger" "github.com/grafana/grafana/pkg/infra/log" @@ -16,17 +15,40 @@ import ( type testStreamHandler struct { logger log.Logger + frame *data.Frame } func newTestStreamHandler(logger log.Logger) *testStreamHandler { + frame := data.NewFrame("testdata", + data.NewField("Time", nil, make([]time.Time, 1)), + data.NewField("Value", nil, make([]float64, 1)), + data.NewField("Min", nil, make([]float64, 1)), + data.NewField("Max", nil, make([]float64, 1)), + ) return &testStreamHandler{ + frame: frame, logger: logger, } } -func (p *testStreamHandler) CanSubscribeToStream(_ context.Context, req *backend.SubscribeToStreamRequest) (*backend.SubscribeToStreamResponse, error) { +func (p *testStreamHandler) SubscribeStream(_ context.Context, req *backend.SubscribeStreamRequest) (*backend.SubscribeStreamResponse, error) { + schema, err := data.FrameToJSON(p.frame, true, false) + if err != nil { + return nil, err + } p.logger.Debug("Allowing access to stream", "path", req.Path, "user", req.PluginContext.User) - return &backend.SubscribeToStreamResponse{OK: true}, nil + return &backend.SubscribeStreamResponse{ + Status: backend.SubscribeStreamStatusOK, + Data: schema, + UseRunStream: true, + }, nil +} + +func (p *testStreamHandler) PublishStream(_ context.Context, req *backend.PublishStreamRequest) (*backend.PublishStreamResponse, error) { + p.logger.Debug("Attempt to publish into stream", "path", req.Path, "user", req.PluginContext.User) + return &backend.PublishStreamResponse{ + Status: backend.PublishStreamStatusPermissionDenied, + }, nil } func (p *testStreamHandler) RunStream(ctx context.Context, request *backend.RunStreamRequest, sender backend.StreamPacketSender) error { @@ -64,13 +86,6 @@ func (p *testStreamHandler) runTestStream(ctx context.Context, path string, conf ticker := time.NewTicker(conf.Interval) defer ticker.Stop() - frame := data.NewFrame("testdata", - data.NewField("Time", nil, make([]time.Time, 1)), - data.NewField("Value", nil, make([]float64, 1)), - data.NewField("Min", nil, make([]float64, 1)), - data.NewField("Max", nil, make([]float64, 1)), - ) - for { select { case <-ctx.Done(): @@ -83,19 +98,19 @@ func (p *testStreamHandler) runTestStream(ctx context.Context, path string, conf delta := rand.Float64() - 0.5 walker += delta - frame.Fields[0].Set(0, t) - frame.Fields[1].Set(0, walker) // Value - frame.Fields[2].Set(0, walker-((rand.Float64()*spread)+0.01)) // Min - frame.Fields[3].Set(0, walker+((rand.Float64()*spread)+0.01)) // Max + p.frame.Fields[0].Set(0, t) + p.frame.Fields[1].Set(0, walker) // Value + p.frame.Fields[2].Set(0, walker-((rand.Float64()*spread)+0.01)) // Min + p.frame.Fields[3].Set(0, walker+((rand.Float64()*spread)+0.01)) // Max - bytes, err := jsoniter.Marshal(frame) // schema + points + bytes, err := data.FrameToJSON(p.frame, false, true) if err != nil { logger.Warn("unable to marshal line", "error", err) continue } packet := &backend.StreamPacket{ - Payload: bytes, + Data: bytes, } if err := sender.Send(packet); err != nil { return err diff --git a/public/app/features/live/channel.ts b/public/app/features/live/channel.ts index 1b9d199249d..d3df7ad2306 100644 --- a/public/app/features/live/channel.ts +++ b/public/app/features/live/channel.ts @@ -87,7 +87,7 @@ export class CentrifugeLiveChannel implements Li this.currentStatus.timestamp = Date.now(); this.currentStatus.state = LiveChannelConnectionState.Connected; delete this.currentStatus.error; - this.sendStatus(); + this.sendStatus(ctx.data); }, unsubscribe: (ctx: UnsubscribeContext) => { this.currentStatus.timestamp = Date.now();