grafana/pkg/services/live/pipeline/data_output_local_subscribers.go

39 lines
1.0 KiB
Go

package pipeline
import (
"context"
"fmt"
"github.com/grafana/grafana/pkg/services/live/orgchannel"
"github.com/centrifugal/centrifuge"
)
type LocalSubscribersDataOutput struct {
// TODO: refactor to depend on interface (avoid Centrifuge dependency here).
node *centrifuge.Node
}
func NewLocalSubscribersDataOutput(node *centrifuge.Node) *LocalSubscribersDataOutput {
return &LocalSubscribersDataOutput{node: node}
}
const DataOutputTypeLocalSubscribers = "localSubscribers"
func (out *LocalSubscribersDataOutput) Type() string {
return DataOutputTypeLocalSubscribers
}
func (out *LocalSubscribersDataOutput) OutputData(_ context.Context, vars Vars, data []byte) ([]*ChannelData, error) {
channelID := vars.Channel
channel := orgchannel.PrependOrgID(vars.OrgID, channelID)
pub := &centrifuge.Publication{
Data: data,
}
err := out.node.Hub().BroadcastPublication(channel, pub, centrifuge.StreamPosition{})
if err != nil {
return nil, fmt.Errorf("error publishing %s: %w", string(data), err)
}
return nil, nil
}