2021-06-24 03:07:09 -05:00
|
|
|
|
package liveplugin
|
|
|
|
|
|
|
|
|
|
import (
|
2021-09-09 11:19:29 -05:00
|
|
|
|
"context"
|
2021-06-24 03:07:09 -05:00
|
|
|
|
"fmt"
|
|
|
|
|
|
|
|
|
|
"github.com/grafana/grafana/pkg/models"
|
|
|
|
|
"github.com/grafana/grafana/pkg/plugins/plugincontext"
|
2022-05-06 03:58:02 -05:00
|
|
|
|
"github.com/grafana/grafana/pkg/services/datasources"
|
2021-09-09 11:19:29 -05:00
|
|
|
|
"github.com/grafana/grafana/pkg/services/live/orgchannel"
|
|
|
|
|
"github.com/grafana/grafana/pkg/services/live/pipeline"
|
2021-06-24 03:07:09 -05:00
|
|
|
|
|
|
|
|
|
"github.com/centrifugal/centrifuge"
|
|
|
|
|
"github.com/grafana/grafana-plugin-sdk-go/backend"
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
type ChannelLocalPublisher struct {
|
2021-09-09 11:19:29 -05:00
|
|
|
|
node *centrifuge.Node
|
|
|
|
|
pipeline *pipeline.Pipeline
|
2021-06-24 03:07:09 -05:00
|
|
|
|
}
|
|
|
|
|
|
2021-09-09 11:19:29 -05:00
|
|
|
|
func NewChannelLocalPublisher(node *centrifuge.Node, pipeline *pipeline.Pipeline) *ChannelLocalPublisher {
|
|
|
|
|
return &ChannelLocalPublisher{node: node, pipeline: pipeline}
|
2021-06-24 03:07:09 -05:00
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (p *ChannelLocalPublisher) PublishLocal(channel string, data []byte) error {
|
2021-09-09 11:19:29 -05:00
|
|
|
|
if p.pipeline != nil {
|
|
|
|
|
orgID, channelID, err := orgchannel.StripOrgID(channel)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
ok, err := p.pipeline.ProcessInput(context.Background(), orgID, channelID, data)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
if ok {
|
|
|
|
|
// if rule found – we are done here. If not - fall through and process as usual.
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
}
|
2021-06-24 03:07:09 -05:00
|
|
|
|
pub := ¢rifuge.Publication{
|
|
|
|
|
Data: data,
|
|
|
|
|
}
|
|
|
|
|
err := p.node.Hub().BroadcastPublication(channel, pub, centrifuge.StreamPosition{})
|
|
|
|
|
if err != nil {
|
|
|
|
|
return fmt.Errorf("error publishing %s: %w", string(data), err)
|
|
|
|
|
}
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
type NumLocalSubscribersGetter struct {
|
|
|
|
|
node *centrifuge.Node
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func NewNumLocalSubscribersGetter(node *centrifuge.Node) *NumLocalSubscribersGetter {
|
|
|
|
|
return &NumLocalSubscribersGetter{node: node}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (p *NumLocalSubscribersGetter) GetNumLocalSubscribers(channelID string) (int, error) {
|
|
|
|
|
return p.node.Hub().NumSubscribers(channelID), nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
type ContextGetter struct {
|
2022-05-06 03:58:02 -05:00
|
|
|
|
pluginContextProvider *plugincontext.Provider
|
|
|
|
|
dataSourceCache datasources.CacheService
|
2021-06-24 03:07:09 -05:00
|
|
|
|
}
|
|
|
|
|
|
2022-05-06 03:58:02 -05:00
|
|
|
|
func NewContextGetter(pluginContextProvider *plugincontext.Provider, dataSourceCache datasources.CacheService) *ContextGetter {
|
2021-06-24 03:07:09 -05:00
|
|
|
|
return &ContextGetter{
|
2022-05-06 03:58:02 -05:00
|
|
|
|
pluginContextProvider: pluginContextProvider,
|
|
|
|
|
dataSourceCache: dataSourceCache,
|
2021-06-24 03:07:09 -05:00
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
2021-12-20 10:05:33 -06:00
|
|
|
|
func (g *ContextGetter) GetPluginContext(ctx context.Context, user *models.SignedInUser, pluginID string, datasourceUID string, skipCache bool) (backend.PluginContext, bool, error) {
|
2022-05-06 03:58:02 -05:00
|
|
|
|
if datasourceUID == "" {
|
|
|
|
|
return g.pluginContextProvider.Get(ctx, pluginID, user)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
ds, err := g.dataSourceCache.GetDatasourceByUID(ctx, datasourceUID, user, skipCache)
|
|
|
|
|
if err != nil {
|
2022-06-03 02:24:24 -05:00
|
|
|
|
return backend.PluginContext{}, false, fmt.Errorf("%v: %w", "Failed to get datasource", err)
|
2022-05-06 03:58:02 -05:00
|
|
|
|
}
|
|
|
|
|
return g.pluginContextProvider.GetWithDataSource(ctx, pluginID, user, ds)
|
2021-06-24 03:07:09 -05:00
|
|
|
|
}
|