package live

import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"io/ioutil"
	"net/http"
	"net/url"
	"os"
	"strconv"
	"strings"
	"sync"
	"time"

	"github.com/grafana/grafana/pkg/api/dtos"
	"github.com/grafana/grafana/pkg/api/response"
	"github.com/grafana/grafana/pkg/api/routing"
	"github.com/grafana/grafana/pkg/infra/localcache"
	"github.com/grafana/grafana/pkg/infra/log"
	"github.com/grafana/grafana/pkg/infra/usagestats"
	"github.com/grafana/grafana/pkg/middleware"
	"github.com/grafana/grafana/pkg/models"
	"github.com/grafana/grafana/pkg/plugins"
	"github.com/grafana/grafana/pkg/plugins/plugincontext"
	"github.com/grafana/grafana/pkg/services/accesscontrol"
	"github.com/grafana/grafana/pkg/services/comments/commentmodel"
	"github.com/grafana/grafana/pkg/services/dashboards"
	"github.com/grafana/grafana/pkg/services/datasources"
	"github.com/grafana/grafana/pkg/services/featuremgmt"
	"github.com/grafana/grafana/pkg/services/live/database"
	"github.com/grafana/grafana/pkg/services/live/features"
	"github.com/grafana/grafana/pkg/services/live/livecontext"
	"github.com/grafana/grafana/pkg/services/live/liveplugin"
	"github.com/grafana/grafana/pkg/services/live/managedstream"
	"github.com/grafana/grafana/pkg/services/live/orgchannel"
	"github.com/grafana/grafana/pkg/services/live/pipeline"
	"github.com/grafana/grafana/pkg/services/live/pushws"
	"github.com/grafana/grafana/pkg/services/live/runstream"
	"github.com/grafana/grafana/pkg/services/live/survey"
	"github.com/grafana/grafana/pkg/services/org"
	"github.com/grafana/grafana/pkg/services/query"
	"github.com/grafana/grafana/pkg/services/secrets"
	"github.com/grafana/grafana/pkg/services/sqlstore"
	"github.com/grafana/grafana/pkg/services/user"
	"github.com/grafana/grafana/pkg/setting"
	"github.com/grafana/grafana/pkg/util"
	"github.com/grafana/grafana/pkg/web"

	"github.com/centrifugal/centrifuge"
	"github.com/go-redis/redis/v8"
	"github.com/gobwas/glob"
	"github.com/grafana/grafana-plugin-sdk-go/backend"
	"github.com/grafana/grafana-plugin-sdk-go/live"
	jsoniter "github.com/json-iterator/go"
	"golang.org/x/sync/errgroup"
)

var (
	logger   = log.New("live")
	loggerCF = log.New("live.centrifuge")
)

// CoreGrafanaScope list of core features
type CoreGrafanaScope struct {
	Features map[string]models.ChannelHandlerFactory

	// The generic service to advertise dashboard changes
	Dashboards models.DashboardActivityChannel
}

func ProvideService(plugCtxProvider *plugincontext.Provider, cfg *setting.Cfg, routeRegister routing.RouteRegister,
	pluginStore plugins.Store, cacheService *localcache.CacheService,
	dataSourceCache datasources.CacheService, sqlStore *sqlstore.SQLStore, secretsService secrets.Service,
	usageStatsService usagestats.Service, queryDataService *query.Service, toggles featuremgmt.FeatureToggles,
	accessControl accesscontrol.AccessControl, dashboardService dashboards.DashboardService) (*GrafanaLive, error) {
	g := &GrafanaLive{
		Cfg:                   cfg,
		Features:              toggles,
		PluginContextProvider: plugCtxProvider,
		RouteRegister:         routeRegister,
		pluginStore:           pluginStore,
		CacheService:          cacheService,
		DataSourceCache:       dataSourceCache,
		SQLStore:              sqlStore,
		SecretsService:        secretsService,
		queryDataService:      queryDataService,
		channels:              make(map[string]models.ChannelHandler),
		GrafanaScope: CoreGrafanaScope{
			Features: make(map[string]models.ChannelHandlerFactory),
		},
		usageStatsService: usageStatsService,
	}

	logger.Debug("GrafanaLive initialization", "ha", g.IsHA())

	// We use default config here as a starting point. Default config contains
	// reasonable values for available options.
	scfg := centrifuge.DefaultConfig

	// scfg.LogLevel = centrifuge.LogLevelDebug
	scfg.LogHandler = handleLog
	scfg.LogLevel = centrifuge.LogLevelError
	scfg.MetricsNamespace = "grafana_live"

	// Node is the core object in the Centrifuge library responsible for many useful
	// things. For example, Node allows publishing messages to channels from the server
	// side with its Publish method.
	node, err := centrifuge.New(scfg)
	if err != nil {
		return nil, err
	}
	g.node = node

	if g.IsHA() {
		// Configure HA with Redis. In this case Centrifuge nodes
		// will be connected over Redis PUB/SUB. Presence will work
		// globally since it is kept inside Redis.
		redisAddress := g.Cfg.LiveHAEngineAddress
		redisShardConfigs := []centrifuge.RedisShardConfig{
			{Address: redisAddress},
		}
		var redisShards []*centrifuge.RedisShard
		for _, redisConf := range redisShardConfigs {
			redisShard, err := centrifuge.NewRedisShard(node, redisConf)
			if err != nil {
				return nil, fmt.Errorf("error connecting to Live Redis: %v", err)
			}
			redisShards = append(redisShards, redisShard)
		}
		broker, err := centrifuge.NewRedisBroker(node, centrifuge.RedisBrokerConfig{
			Prefix: "gf_live",
			// Use a reasonably large expiration interval for the stream meta key,
			// much bigger than the maximum HistoryLifetime value in Node config.
			// This way stream meta data will expire; in some cases you may want
			// to prevent its expiration by setting this to a zero value.
			HistoryMetaTTL: 7 * 24 * time.Hour,
			// And configure a couple of shards to use.
			Shards: redisShards,
		})
		if err != nil {
			return nil, fmt.Errorf("error creating Live Redis broker: %v", err)
		}
		node.SetBroker(broker)
		presenceManager, err := centrifuge.NewRedisPresenceManager(node, centrifuge.RedisPresenceManagerConfig{
			Prefix: "gf_live",
			Shards: redisShards,
		})
		if err != nil {
			return nil, fmt.Errorf("error creating Live Redis presence manager: %v", err)
		}
		node.SetPresenceManager(presenceManager)
	}

	channelLocalPublisher := liveplugin.NewChannelLocalPublisher(node, nil)

	var managedStreamRunner *managedstream.Runner
	if g.IsHA() {
		redisClient := redis.NewClient(&redis.Options{
			Addr: g.Cfg.LiveHAEngineAddress,
		})
		cmd := redisClient.Ping(context.Background())
		if _, err := cmd.Result(); err != nil {
			return nil, fmt.Errorf("error pinging Redis: %v", err)
		}
		managedStreamRunner = managedstream.NewRunner(
			g.Publish,
			channelLocalPublisher,
			managedstream.NewRedisFrameCache(redisClient),
		)
	} else {
		managedStreamRunner = managedstream.NewRunner(
			g.Publish,
			channelLocalPublisher,
			managedstream.NewMemoryFrameCache(),
		)
	}
	g.ManagedStreamRunner = managedStreamRunner

	if g.Features.IsEnabled(featuremgmt.FlagLivePipeline) {
		var builder pipeline.RuleBuilder
		if os.Getenv("GF_LIVE_DEV_BUILDER") != "" {
			builder = &pipeline.DevRuleBuilder{
				Node:                 node,
				ManagedStream:        g.ManagedStreamRunner,
				FrameStorage:         pipeline.NewFrameStorage(),
				ChannelHandlerGetter: g,
			}
		} else {
			storage := &pipeline.FileStorage{
				DataPath:       cfg.DataPath,
				SecretsService: g.SecretsService,
			}
			g.pipelineStorage = storage
			builder = &pipeline.StorageRuleBuilder{
				Node:                 node,
				ManagedStream:        g.ManagedStreamRunner,
				FrameStorage:         pipeline.NewFrameStorage(),
				Storage:              storage,
				ChannelHandlerGetter: g,
				SecretsService:       g.SecretsService,
			}
		}
		channelRuleGetter := pipeline.NewCacheSegmentedTree(builder)

		// Pre-build/validate channel rules for all organizations on start.
		// This can be unreasonable in a production scenario with many
		// organizations.
		orgQuery := &models.SearchOrgsQuery{}
		err := sqlStore.SearchOrgs(context.Background(), orgQuery)
		if err != nil {
			return nil, fmt.Errorf("can't get org list: %w", err)
		}
		for _, org := range orgQuery.Result {
			_, _, err := channelRuleGetter.Get(org.Id, "")
			if err != nil {
				return nil, fmt.Errorf("error building channel rules for org %d: %w", org.Id, err)
			}
		}

		g.Pipeline, err = pipeline.New(channelRuleGetter)
		if err != nil {
			return nil, err
		}
	}

	g.contextGetter = liveplugin.NewContextGetter(g.PluginContextProvider, g.DataSourceCache)
	pipelinedChannelLocalPublisher := liveplugin.NewChannelLocalPublisher(node, g.Pipeline)
	numLocalSubscribersGetter := liveplugin.NewNumLocalSubscribersGetter(node)
	g.runStreamManager = runstream.NewManager(pipelinedChannelLocalPublisher, numLocalSubscribersGetter, g.contextGetter)

	// Initialize the main features
	dash := &features.DashboardHandler{
		Publisher:        g.Publish,
		ClientCount:      g.ClientCount,
		Store:            sqlStore,
		DashboardService: dashboardService,
	}
	g.storage = database.NewStorage(g.SQLStore, g.CacheService)
	g.GrafanaScope.Dashboards = dash
	g.GrafanaScope.Features["dashboard"] = dash
	g.GrafanaScope.Features["broadcast"] = features.NewBroadcastRunner(g.storage)
	g.GrafanaScope.Features["comment"] = features.NewCommentHandler(commentmodel.NewPermissionChecker(g.SQLStore, g.Features, accessControl, dashboardService))

	g.surveyCaller = survey.NewCaller(managedStreamRunner, node)
	err = g.surveyCaller.SetupHandlers()
	if err != nil {
		return nil, err
	}

	// Set ConnectHandler, called when a client successfully connects to the Node. Code
	// inside the handler must be synchronized since it will be called concurrently from
	// different goroutines (belonging to different client connections). This is also
	// true for other event handlers.
	node.OnConnect(func(client *centrifuge.Client) {
		numConnections := g.node.Hub().NumClients()
		if g.Cfg.LiveMaxConnections >= 0 && numConnections > g.Cfg.LiveMaxConnections {
			logger.Warn(
				"Max number of Live connections reached, increase max_connections in [live] configuration section",
				"user", client.UserID(), "client", client.ID(), "limit", g.Cfg.LiveMaxConnections,
			)
			client.Disconnect(centrifuge.DisconnectConnectionLimit)
			return
		}
		var semaphore chan struct{}
		if clientConcurrency > 1 {
			semaphore = make(chan struct{}, clientConcurrency)
		}
		logger.Debug("Client connected", "user", client.UserID(), "client", client.ID())
		connectedAt := time.Now()

		// Called when a client issues an RPC (async request over the Live connection).
		client.OnRPC(func(e centrifuge.RPCEvent, cb centrifuge.RPCCallback) {
			err := runConcurrentlyIfNeeded(client.Context(), semaphore, func() {
				cb(g.handleOnRPC(client, e))
			})
			if err != nil {
				cb(centrifuge.RPCReply{}, err)
			}
		})

		// Called when a client subscribes to a channel.
		client.OnSubscribe(func(e centrifuge.SubscribeEvent, cb centrifuge.SubscribeCallback) {
			err := runConcurrentlyIfNeeded(client.Context(), semaphore, func() {
				cb(g.handleOnSubscribe(context.Background(), client, e))
			})
			if err != nil {
				cb(centrifuge.SubscribeReply{}, err)
			}
		})

		// Called when a client publishes to a channel.
		// In general, we should prefer writing to the HTTP API, but this
		// allows some simple prototypes to work quickly.
		client.OnPublish(func(e centrifuge.PublishEvent, cb centrifuge.PublishCallback) {
			err := runConcurrentlyIfNeeded(client.Context(), semaphore, func() {
				cb(g.handleOnPublish(context.Background(), client, e))
			})
			if err != nil {
				cb(centrifuge.PublishReply{}, err)
			}
		})

		client.OnDisconnect(func(e centrifuge.DisconnectEvent) {
			reason := "normal"
			if e.Disconnect != nil {
				reason = e.Disconnect.Reason
				if e.Disconnect.Code == 3001 { // Shutdown
					return
				}
			}
			logger.Debug("Client disconnected", "user", client.UserID(), "client", client.ID(), "reason", reason, "elapsed", time.Since(connectedAt))
		})
	})

	// Run node. This method does not block.
	if err := node.Run(); err != nil {
		return nil, err
	}

	appURL, err := url.Parse(g.Cfg.AppURL)
	if err != nil {
		return nil, fmt.Errorf("error parsing AppURL %s: %w", g.Cfg.AppURL, err)
	}

	originPatterns := g.Cfg.LiveAllowedOrigins
	originGlobs, _ := setting.GetAllowedOriginGlobs(originPatterns) // error already checked on config load.
	checkOrigin := getCheckOriginFunc(appURL, originPatterns, originGlobs)

	// Use a pure websocket transport.
	wsHandler := centrifuge.NewWebsocketHandler(node, centrifuge.WebsocketConfig{
		ReadBufferSize:  1024,
		WriteBufferSize: 1024,
		CheckOrigin:     checkOrigin,
	})

	pushWSHandler := pushws.NewHandler(g.ManagedStreamRunner, pushws.Config{
		ReadBufferSize:  1024,
		WriteBufferSize: 1024,
		CheckOrigin:     checkOrigin,
	})

	pushPipelineWSHandler := pushws.NewPipelinePushHandler(g.Pipeline, pushws.Config{
		ReadBufferSize:  1024,
		WriteBufferSize: 1024,
		CheckOrigin:     checkOrigin,
	})

	g.websocketHandler = func(ctx *models.ReqContext) {
		user := ctx.SignedInUser

		// Centrifuge expects Credentials in context with a current user ID.
		cred := &centrifuge.Credentials{
			UserID: fmt.Sprintf("%d", user.UserId),
		}
		newCtx := centrifuge.SetCredentials(ctx.Req.Context(), cred)
		newCtx = livecontext.SetContextSignedUser(newCtx, user)
		r := ctx.Req.WithContext(newCtx)
		wsHandler.ServeHTTP(ctx.Resp, r)
	}

	g.pushWebsocketHandler = func(ctx *models.ReqContext) {
		user := ctx.SignedInUser
		newCtx := livecontext.SetContextSignedUser(ctx.Req.Context(), user)
		newCtx = livecontext.SetContextStreamID(newCtx, web.Params(ctx.Req)[":streamId"])
		r := ctx.Req.WithContext(newCtx)
		pushWSHandler.ServeHTTP(ctx.Resp, r)
	}

	g.pushPipelineWebsocketHandler = func(ctx *models.ReqContext) {
		user := ctx.SignedInUser
		newCtx := livecontext.SetContextSignedUser(ctx.Req.Context(), user)
		newCtx = livecontext.SetContextChannelID(newCtx, web.Params(ctx.Req)["*"])
		r := ctx.Req.WithContext(newCtx)
		pushPipelineWSHandler.ServeHTTP(ctx.Resp, r)
	}

	g.RouteRegister.Group("/api/live", func(group routing.RouteRegister) {
		group.Get("/ws", g.websocketHandler)
	}, middleware.ReqSignedIn)

	g.RouteRegister.Group("/api/live", func(group routing.RouteRegister) {
		group.Get("/push/:streamId", g.pushWebsocketHandler)
		group.Get("/pipeline/push/*", g.pushPipelineWebsocketHandler)
	}, middleware.ReqOrgAdmin)

	g.registerUsageMetrics()

	return g, nil
}

// GrafanaLive manages live real-time connections to Grafana (over WebSocket at this moment).
// The main concept here is Channel. Connections can subscribe to many channels. Each channel
// can have different permissions and properties, but once a connection has subscribed to a
// channel it starts receiving all messages published to that channel. Thus GrafanaLive is a
// PUB/SUB server.
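//
// A channel address has the form "${scope}/${namespace}/${path}" (a dashboard channel, for
// example, looks roughly like "grafana/dashboard/uid/<dashboard-uid>"). Internally channels
// are additionally prefixed with the org ID; see orgchannel.StripOrgID and
// orgchannel.PrependOrgID below.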
type GrafanaLive struct {
	PluginContextProvider *plugincontext.Provider
	Cfg                   *setting.Cfg
	Features              featuremgmt.FeatureToggles
	RouteRegister         routing.RouteRegister
	CacheService          *localcache.CacheService
	DataSourceCache       datasources.CacheService
	SQLStore              *sqlstore.SQLStore
	SecretsService        secrets.Service
	pluginStore           plugins.Store
	queryDataService      *query.Service

	node         *centrifuge.Node
	surveyCaller *survey.Caller

	// Websocket handlers
	websocketHandler             interface{}
	pushWebsocketHandler         interface{}
	pushPipelineWebsocketHandler interface{}

	// Full channel handler
	channels   map[string]models.ChannelHandler
	channelsMu sync.RWMutex

	// The core internal features
	GrafanaScope CoreGrafanaScope

	ManagedStreamRunner *managedstream.Runner
	Pipeline            *pipeline.Pipeline
	pipelineStorage     pipeline.Storage

	contextGetter    *liveplugin.ContextGetter
	runStreamManager *runstream.Manager
	storage          *database.Storage

	usageStatsService usagestats.Service
	usageStats        usageStats
}

func (g *GrafanaLive) getStreamPlugin(ctx context.Context, pluginID string) (backend.StreamHandler, error) {
	plugin, exists := g.pluginStore.Plugin(ctx, pluginID)
	if !exists {
		return nil, fmt.Errorf("plugin not found: %s", pluginID)
	}
	if plugin.SupportsStreaming() {
		return plugin, nil
	}
	return nil, fmt.Errorf("%s plugin does not implement StreamHandler: %#v", pluginID, plugin)
}

func (g *GrafanaLive) Run(ctx context.Context) error {
	eGroup, eCtx := errgroup.WithContext(ctx)

	eGroup.Go(func() error {
		updateStatsTicker := time.NewTicker(time.Minute * 30)
		defer updateStatsTicker.Stop()
		for {
			select {
			case <-updateStatsTicker.C:
				g.sampleLiveStats()
			case <-ctx.Done():
				return ctx.Err()
			}
		}
	})

	if g.runStreamManager != nil {
		// Only run stream manager if GrafanaLive properly initialized.
		eGroup.Go(func() error {
			return g.runStreamManager.Run(eCtx)
		})
	}

	return eGroup.Wait()
}

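// getCheckOriginFunc builds the WebSocket Origin check used by the Live handlers: requests
// without an Origin header are allowed, a single "*" pattern allows any origin, and otherwise
// the origin must match the request Host, the configured [server] root_url, or one of the
// [live] allowed_origins glob patterns.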
func getCheckOriginFunc(appURL *url.URL, originPatterns []string, originGlobs []glob.Glob) func(r *http.Request) bool {
	return func(r *http.Request) bool {
		origin := r.Header.Get("Origin")
		if origin == "" {
			return true
		}
		if len(originPatterns) == 1 && originPatterns[0] == "*" {
			// fast path for *.
			return true
		}
		originURL, err := url.Parse(strings.ToLower(origin))
		if err != nil {
			logger.Warn("Failed to parse request origin", "error", err, "origin", origin)
			return false
		}
		if strings.EqualFold(originURL.Host, r.Host) {
			return true
		}
		ok, err := checkAllowedOrigin(origin, originURL, appURL, originGlobs)
		if err != nil {
			logger.Warn("Error parsing request origin", "error", err, "origin", origin)
			return false
		}
		if !ok {
			logger.Warn("Request Origin is not authorized", "origin", origin, "host", r.Host, "appUrl", appURL.String(), "allowedOrigins", strings.Join(originPatterns, ","))
			return false
		}
		return true
	}
}

func checkAllowedOrigin(origin string, originURL *url.URL, appURL *url.URL, originGlobs []glob.Glob) (bool, error) {
	// Try to match over configured [server] root_url first.
	if originURL.Port() == "" {
		if strings.EqualFold(originURL.Scheme, appURL.Scheme) && strings.EqualFold(originURL.Host, appURL.Hostname()) {
			return true, nil
		}
	} else {
		if strings.EqualFold(originURL.Scheme, appURL.Scheme) && strings.EqualFold(originURL.Host, appURL.Host) {
			return true, nil
		}
	}
	// If there is still no match try [live] allowed_origins patterns.
	for _, pattern := range originGlobs {
		if pattern.Match(origin) {
			return true, nil
		}
	}
	return false, nil
}

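// clientConcurrency limits how many subscribe/publish/RPC callbacks may be processed
// concurrently for a single client connection (see runConcurrentlyIfNeeded).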
var clientConcurrency = 12
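
// IsHA reports whether Grafana Live is configured for high-availability mode, i.e. an HA
// engine (Redis) is set so that Centrifuge nodes on different Grafana instances are
// connected over Redis PUB/SUB.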
func (g *GrafanaLive) IsHA() bool {
	return g.Cfg != nil && g.Cfg.LiveHAEngine != ""
}

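// runConcurrentlyIfNeeded runs fn in a separate goroutine when a semaphore is configured
// (clientConcurrency > 1), blocking until a slot becomes free or ctx is done; otherwise it
// runs fn synchronously on the caller's goroutine.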
func runConcurrentlyIfNeeded(ctx context.Context, semaphore chan struct{}, fn func()) error {
	if cap(semaphore) > 1 {
		select {
		case semaphore <- struct{}{}:
		case <-ctx.Done():
			return ctx.Err()
		}
		go func() {
			defer func() { <-semaphore }()
			fn()
		}()
	} else {
		// No need for separate goroutines.
		fn()
	}
	return nil
}

func (g *GrafanaLive) HandleDatasourceDelete(orgID int64, dsUID string) {
	if g.runStreamManager == nil {
		return
	}
	err := g.runStreamManager.HandleDatasourceDelete(orgID, dsUID)
	if err != nil {
		logger.Error("Error handling datasource delete", "error", err)
	}
}

func (g *GrafanaLive) HandleDatasourceUpdate(orgID int64, dsUID string) {
	if g.runStreamManager == nil {
		return
	}
	err := g.runStreamManager.HandleDatasourceUpdate(orgID, dsUID)
	if err != nil {
		logger.Error("Error handling datasource update", "error", err)
	}
}

// Use a configuration that's compatible with the standard library
// to minimize the risk of introducing bugs. This will make sure
// that map keys are ordered.
var jsonStd = jsoniter.ConfigCompatibleWithStandardLibrary

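// handleOnRPC serves RPC calls issued over the Live connection. Only the "grafana.query"
// method is supported: the request is unmarshaled into a MetricRequest, executed through the
// query data service, and the result is returned marshaled with jsonStd.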
func (g *GrafanaLive) handleOnRPC(client *centrifuge.Client, e centrifuge.RPCEvent) (centrifuge.RPCReply, error) {
	logger.Debug("Client calls RPC", "user", client.UserID(), "client", client.ID(), "method", e.Method)
	if e.Method != "grafana.query" {
		return centrifuge.RPCReply{}, centrifuge.ErrorMethodNotFound
	}
	user, ok := livecontext.GetContextSignedUser(client.Context())
	if !ok {
		logger.Error("No user found in context", "user", client.UserID(), "client", client.ID(), "method", e.Method)
		return centrifuge.RPCReply{}, centrifuge.ErrorInternal
	}
	var req dtos.MetricRequest
	err := json.Unmarshal(e.Data, &req)
	if err != nil {
		return centrifuge.RPCReply{}, centrifuge.ErrorBadRequest
	}
	resp, err := g.queryDataService.QueryData(client.Context(), user, false, req, true)
	if err != nil {
		logger.Error("Error query data", "user", client.UserID(), "client", client.ID(), "method", e.Method, "error", err)
		if errors.Is(err, datasources.ErrDataSourceAccessDenied) {
			return centrifuge.RPCReply{}, &centrifuge.Error{Code: uint32(http.StatusForbidden), Message: http.StatusText(http.StatusForbidden)}
		}
		var badQuery *query.ErrBadQuery
		if errors.As(err, &badQuery) {
			return centrifuge.RPCReply{}, &centrifuge.Error{Code: uint32(http.StatusBadRequest), Message: http.StatusText(http.StatusBadRequest)}
		}
		return centrifuge.RPCReply{}, centrifuge.ErrorInternal
	}
	data, err := jsonStd.Marshal(resp)
	if err != nil {
		logger.Error("Error marshaling query response", "user", client.UserID(), "client", client.ID(), "method", e.Method, "error", err)
		return centrifuge.RPCReply{}, centrifuge.ErrorInternal
	}
	return centrifuge.RPCReply{
		Data: data,
	}, nil
}

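// handleOnSubscribe authorizes a subscription: it verifies the channel's org ID against the
// user, consults the pipeline channel rule (SubscribeAuth and Subscribers) when the pipeline
// is enabled, and otherwise delegates to the resolved channel handler's OnSubscribe.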
func (g *GrafanaLive) handleOnSubscribe(ctx context.Context, client *centrifuge.Client, e centrifuge.SubscribeEvent) (centrifuge.SubscribeReply, error) {
	logger.Debug("Client wants to subscribe", "user", client.UserID(), "client", client.ID(), "channel", e.Channel)

	user, ok := livecontext.GetContextSignedUser(client.Context())
	if !ok {
		logger.Error("No user found in context", "user", client.UserID(), "client", client.ID(), "channel", e.Channel)
		return centrifuge.SubscribeReply{}, centrifuge.ErrorInternal
	}

	// See a detailed comment for StripOrgID about orgID management in Live.
	orgID, channel, err := orgchannel.StripOrgID(e.Channel)
	if err != nil {
		logger.Error("Error parsing channel", "user", client.UserID(), "client", client.ID(), "channel", e.Channel, "error", err)
		return centrifuge.SubscribeReply{}, centrifuge.ErrorInternal
	}
	if user.OrgId != orgID {
		logger.Info("Error subscribing: wrong orgId", "user", client.UserID(), "client", client.ID(), "channel", e.Channel)
		return centrifuge.SubscribeReply{}, centrifuge.ErrorPermissionDenied
	}

	var reply models.SubscribeReply
	var status backend.SubscribeStreamStatus
	var ruleFound bool

	if g.Pipeline != nil {
		rule, ok, err := g.Pipeline.Get(user.OrgId, channel)
		if err != nil {
			logger.Error("Error getting channel rule", "user", client.UserID(), "client", client.ID(), "channel", e.Channel, "error", err)
			return centrifuge.SubscribeReply{}, centrifuge.ErrorInternal
		}
		ruleFound = ok
		if ok {
			if rule.SubscribeAuth != nil {
				ok, err := rule.SubscribeAuth.CanSubscribe(client.Context(), user)
				if err != nil {
					logger.Error("Error checking subscribe permissions", "user", client.UserID(), "client", client.ID(), "channel", e.Channel, "error", err)
					return centrifuge.SubscribeReply{}, centrifuge.ErrorInternal
				}
				if !ok {
					// using HTTP error codes for WS errors too.
					code, text := subscribeStatusToHTTPError(backend.SubscribeStreamStatusPermissionDenied)
					return centrifuge.SubscribeReply{}, &centrifuge.Error{Code: uint32(code), Message: text}
				}
			}
			if len(rule.Subscribers) > 0 {
				var err error
				for _, sub := range rule.Subscribers {
					reply, status, err = sub.Subscribe(client.Context(), pipeline.Vars{
						OrgID:   orgID,
						Channel: channel,
					}, e.Data)
					if err != nil {
						logger.Error("Error channel rule subscribe", "user", client.UserID(), "client", client.ID(), "channel", e.Channel, "error", err)
						return centrifuge.SubscribeReply{}, centrifuge.ErrorInternal
					}
					if status != backend.SubscribeStreamStatusOK {
						break
					}
				}
			}
		}
	}
	if !ruleFound {
		handler, addr, err := g.GetChannelHandler(ctx, user, channel)
		if err != nil {
			if errors.Is(err, live.ErrInvalidChannelID) {
				logger.Info("Invalid channel ID", "user", client.UserID(), "client", client.ID(), "channel", e.Channel)
				return centrifuge.SubscribeReply{}, &centrifuge.Error{Code: uint32(http.StatusBadRequest), Message: "invalid channel ID"}
			}
			logger.Error("Error getting channel handler", "user", client.UserID(), "client", client.ID(), "channel", e.Channel, "error", err)
			return centrifuge.SubscribeReply{}, centrifuge.ErrorInternal
		}
		reply, status, err = handler.OnSubscribe(client.Context(), user, models.SubscribeEvent{
			Channel: channel,
			Path:    addr.Path,
			Data:    e.Data,
		})
		if err != nil {
			logger.Error("Error calling channel handler subscribe", "user", client.UserID(), "client", client.ID(), "channel", e.Channel, "error", err)
			return centrifuge.SubscribeReply{}, centrifuge.ErrorInternal
		}
	}
	if status != backend.SubscribeStreamStatusOK {
		// using HTTP error codes for WS errors too.
		code, text := subscribeStatusToHTTPError(status)
		logger.Debug("Return custom subscribe error", "user", client.UserID(), "client", client.ID(), "channel", e.Channel, "code", code)
		return centrifuge.SubscribeReply{}, &centrifuge.Error{Code: uint32(code), Message: text}
	}
	logger.Debug("Client subscribed", "user", client.UserID(), "client", client.ID(), "channel", e.Channel)
	return centrifuge.SubscribeReply{
		Options: centrifuge.SubscribeOptions{
			Presence:  reply.Presence,
			JoinLeave: reply.JoinLeave,
			Recover:   reply.Recover,
			Data:      reply.Data,
		},
	}, nil
}

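// handleOnPublish authorizes a client publication: it verifies the channel's org ID, and when
// the pipeline is enabled either applies the channel rule's PublishAuth (or requires an org
// admin role) and feeds the data into the pipeline; otherwise the resolved channel handler's
// OnPublish decides how the publication is handled.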
func (g *GrafanaLive) handleOnPublish(ctx context.Context, client *centrifuge.Client, e centrifuge.PublishEvent) (centrifuge.PublishReply, error) {
	logger.Debug("Client wants to publish", "user", client.UserID(), "client", client.ID(), "channel", e.Channel)

	user, ok := livecontext.GetContextSignedUser(client.Context())
	if !ok {
		logger.Error("No user found in context", "user", client.UserID(), "client", client.ID(), "channel", e.Channel)
		return centrifuge.PublishReply{}, centrifuge.ErrorInternal
	}

	// See a detailed comment for StripOrgID about orgID management in Live.
	orgID, channel, err := orgchannel.StripOrgID(e.Channel)
	if err != nil {
		logger.Error("Error parsing channel", "user", client.UserID(), "client", client.ID(), "channel", e.Channel, "error", err)
		return centrifuge.PublishReply{}, centrifuge.ErrorInternal
	}
	if user.OrgId != orgID {
		logger.Info("Error publishing: wrong orgId", "user", client.UserID(), "client", client.ID(), "channel", e.Channel)
		return centrifuge.PublishReply{}, centrifuge.ErrorPermissionDenied
	}

	if g.Pipeline != nil {
		rule, ok, err := g.Pipeline.Get(user.OrgId, channel)
		if err != nil {
			logger.Error("Error getting channel rule", "user", client.UserID(), "client", client.ID(), "channel", e.Channel, "error", err)
			return centrifuge.PublishReply{}, centrifuge.ErrorInternal
		}
		if ok {
			if rule.PublishAuth != nil {
				ok, err := rule.PublishAuth.CanPublish(client.Context(), user)
				if err != nil {
					logger.Error("Error checking publish permissions", "user", client.UserID(), "client", client.ID(), "channel", e.Channel, "error", err)
					return centrifuge.PublishReply{}, centrifuge.ErrorInternal
				}
				if !ok {
					// using HTTP error codes for WS errors too.
					code, text := publishStatusToHTTPError(backend.PublishStreamStatusPermissionDenied)
					return centrifuge.PublishReply{}, &centrifuge.Error{Code: uint32(code), Message: text}
				}
			} else {
				if !user.HasRole(org.RoleAdmin) {
					// using HTTP error codes for WS errors too.
					code, text := publishStatusToHTTPError(backend.PublishStreamStatusPermissionDenied)
					return centrifuge.PublishReply{}, &centrifuge.Error{Code: uint32(code), Message: text}
				}
			}
			_, err := g.Pipeline.ProcessInput(client.Context(), user.OrgId, channel, e.Data)
			if err != nil {
				logger.Error("Error processing input", "user", client.UserID(), "client", client.ID(), "channel", e.Channel, "error", err)
				return centrifuge.PublishReply{}, centrifuge.ErrorInternal
			}
			return centrifuge.PublishReply{
				Result: &centrifuge.PublishResult{},
			}, nil
		}
	}

	handler, addr, err := g.GetChannelHandler(ctx, user, channel)
	if err != nil {
		if errors.Is(err, live.ErrInvalidChannelID) {
			logger.Info("Invalid channel ID", "user", client.UserID(), "client", client.ID(), "channel", e.Channel)
			return centrifuge.PublishReply{}, &centrifuge.Error{Code: uint32(http.StatusBadRequest), Message: "invalid channel ID"}
		}
		logger.Error("Error getting channel handler", "user", client.UserID(), "client", client.ID(), "channel", e.Channel, "error", err)
		return centrifuge.PublishReply{}, centrifuge.ErrorInternal
	}
	reply, status, err := handler.OnPublish(client.Context(), user, models.PublishEvent{
		Channel: channel,
		Path:    addr.Path,
		Data:    e.Data,
	})
	if err != nil {
		logger.Error("Error calling channel handler publish", "user", client.UserID(), "client", client.ID(), "channel", e.Channel, "error", err)
		return centrifuge.PublishReply{}, centrifuge.ErrorInternal
	}

	if status != backend.PublishStreamStatusOK {
		// using HTTP error codes for WS errors too.
		code, text := publishStatusToHTTPError(status)
		logger.Debug("Return custom publish error", "user", client.UserID(), "client", client.ID(), "channel", e.Channel, "code", code)
		return centrifuge.PublishReply{}, &centrifuge.Error{Code: uint32(code), Message: text}
	}
	centrifugeReply := centrifuge.PublishReply{
		Options: centrifuge.PublishOptions{
			HistorySize: reply.HistorySize,
			HistoryTTL:  reply.HistoryTTL,
		},
	}
	if reply.Data != nil {
		// If data is not nil then we published it manually and tell Centrifuge
		// the publication result so Centrifuge won't publish itself.
		result, err := g.node.Publish(e.Channel, reply.Data)
		if err != nil {
			logger.Error("Error publishing", "user", client.UserID(), "client", client.ID(), "channel", e.Channel, "error", err, "data", string(reply.Data))
			return centrifuge.PublishReply{}, centrifuge.ErrorInternal
		}
		centrifugeReply.Result = &result
	}
	logger.Debug("Publication successful", "user", client.UserID(), "client", client.ID(), "channel", e.Channel)
	return centrifugeReply, nil
}

func subscribeStatusToHTTPError(status backend.SubscribeStreamStatus) (int, string) {
	switch status {
	case backend.SubscribeStreamStatusNotFound:
		return http.StatusNotFound, http.StatusText(http.StatusNotFound)
	case backend.SubscribeStreamStatusPermissionDenied:
		return http.StatusForbidden, http.StatusText(http.StatusForbidden)
	default:
		logger.Warn("unknown subscribe status", "status", status)
		return http.StatusInternalServerError, http.StatusText(http.StatusInternalServerError)
	}
}

func publishStatusToHTTPError(status backend.PublishStreamStatus) (int, string) {
	switch status {
	case backend.PublishStreamStatusNotFound:
		return http.StatusNotFound, http.StatusText(http.StatusNotFound)
	case backend.PublishStreamStatusPermissionDenied:
		return http.StatusForbidden, http.StatusText(http.StatusForbidden)
	default:
		logger.Warn("unknown publish status", "status", status)
		return http.StatusInternalServerError, http.StatusText(http.StatusInternalServerError)
	}
}

// GetChannelHandler gives thread-safe access to the channel.
func (g *GrafanaLive) GetChannelHandler(ctx context.Context, user *user.SignedInUser, channel string) (models.ChannelHandler, live.Channel, error) {
	// Parse the identifier ${scope}/${namespace}/${path}
	addr, err := live.ParseChannel(channel)
	if err != nil {
		return nil, live.Channel{}, err
	}

	g.channelsMu.RLock()
	c, ok := g.channels[channel]
	g.channelsMu.RUnlock() // defer? but then you can't lock further down
	if ok {
		logger.Debug("Found cached channel handler", "channel", channel)
		return c, addr, nil
	}

	g.channelsMu.Lock()
	defer g.channelsMu.Unlock()
	c, ok = g.channels[channel] // may have filled in while locked
	if ok {
		logger.Debug("Found cached channel handler", "channel", channel)
		return c, addr, nil
	}

	getter, err := g.GetChannelHandlerFactory(ctx, user, addr.Scope, addr.Namespace)
	if err != nil {
		return nil, addr, fmt.Errorf("error getting channel handler factory: %w", err)
	}

	// First access will initialize.
	c, err = getter.GetHandlerForPath(addr.Path)
	if err != nil {
		return nil, addr, fmt.Errorf("error getting handler for path: %w", err)
	}

	logger.Info("Initialized channel handler", "channel", channel, "address", addr)
	g.channels[channel] = c
	return c, addr, nil
}

// GetChannelHandlerFactory gets a ChannelHandlerFactory for a namespace.
// It gives thread-safe access to the channel.
func (g *GrafanaLive) GetChannelHandlerFactory(ctx context.Context, user *user.SignedInUser, scope string, namespace string) (models.ChannelHandlerFactory, error) {
	switch scope {
	case live.ScopeGrafana:
		return g.handleGrafanaScope(user, namespace)
	case live.ScopePlugin:
		return g.handlePluginScope(ctx, user, namespace)
	case live.ScopeDatasource:
		return g.handleDatasourceScope(ctx, user, namespace)
	case live.ScopeStream:
		return g.handleStreamScope(user, namespace)
	default:
		return nil, fmt.Errorf("invalid scope: %q", scope)
	}
}

func (g *GrafanaLive) handleGrafanaScope(_ *user.SignedInUser, namespace string) (models.ChannelHandlerFactory, error) {
	if p, ok := g.GrafanaScope.Features[namespace]; ok {
		return p, nil
	}
	return nil, fmt.Errorf("unknown feature: %q", namespace)
}

func (g *GrafanaLive) handlePluginScope(ctx context.Context, _ *user.SignedInUser, namespace string) (models.ChannelHandlerFactory, error) {
	streamHandler, err := g.getStreamPlugin(ctx, namespace)
	if err != nil {
		return nil, fmt.Errorf("can't find stream plugin: %s", namespace)
	}
	return features.NewPluginRunner(
		namespace,
		"", // No instance uid for non-datasource plugins.
		g.runStreamManager,
		g.contextGetter,
		streamHandler,
	), nil
}

func (g *GrafanaLive) handleStreamScope(u *user.SignedInUser, namespace string) (models.ChannelHandlerFactory, error) {
	return g.ManagedStreamRunner.GetOrCreateStream(u.OrgId, live.ScopeStream, namespace)
}

func (g *GrafanaLive) handleDatasourceScope(ctx context.Context, user *user.SignedInUser, namespace string) (models.ChannelHandlerFactory, error) {
	ds, err := g.DataSourceCache.GetDatasourceByUID(ctx, namespace, user, false)
	if err != nil {
		return nil, fmt.Errorf("error getting datasource: %w", err)
	}
	streamHandler, err := g.getStreamPlugin(ctx, ds.Type)
	if err != nil {
		return nil, fmt.Errorf("can't find stream plugin: %s", ds.Type)
	}
	return features.NewPluginRunner(
		ds.Type,
		ds.Uid,
		g.runStreamManager,
		g.contextGetter,
		streamHandler,
	), nil
}

// Publish sends the data to the channel without checking permissions etc.
func (g *GrafanaLive) Publish(orgID int64, channel string, data []byte) error {
	_, err := g.node.Publish(orgchannel.PrependOrgID(orgID, channel), data)
	return err
}

// ClientCount returns the number of clients.
func (g *GrafanaLive) ClientCount(orgID int64, channel string) (int, error) {
	p, err := g.node.Presence(orgchannel.PrependOrgID(orgID, channel))
	if err != nil {
		return 0, err
	}
	return len(p.Presence), nil
}

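// HandleHTTPPublish handles publishing to a Live channel over the Grafana HTTP API, applying
// the same authorization logic as WebSocket publications: pipeline channel rules when the
// pipeline is enabled, otherwise the channel handler's OnPublish.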
func (g *GrafanaLive) HandleHTTPPublish(ctx *models.ReqContext) response.Response {
	cmd := dtos.LivePublishCmd{}
	if err := web.Bind(ctx.Req, &cmd); err != nil {
		return response.Error(http.StatusBadRequest, "bad request data", err)
	}
	addr, err := live.ParseChannel(cmd.Channel)
	if err != nil {
		return response.Error(http.StatusBadRequest, "invalid channel ID", nil)
	}

	logger.Debug("Publish API cmd", "user", ctx.SignedInUser.UserId, "channel", cmd.Channel)
	user := ctx.SignedInUser
	channel := cmd.Channel

	if g.Pipeline != nil {
		rule, ok, err := g.Pipeline.Get(user.OrgId, channel)
		if err != nil {
			logger.Error("Error getting channel rule", "user", user, "channel", channel, "error", err)
			return response.Error(http.StatusInternalServerError, http.StatusText(http.StatusInternalServerError), nil)
		}
		if ok {
			if rule.PublishAuth != nil {
				ok, err := rule.PublishAuth.CanPublish(ctx.Req.Context(), user)
				if err != nil {
					logger.Error("Error checking publish permissions", "user", user, "channel", channel, "error", err)
					return response.Error(http.StatusInternalServerError, http.StatusText(http.StatusInternalServerError), nil)
				}
				if !ok {
					return response.Error(http.StatusForbidden, http.StatusText(http.StatusForbidden), nil)
				}
			} else {
				if !user.HasRole(org.RoleAdmin) {
					return response.Error(http.StatusForbidden, http.StatusText(http.StatusForbidden), nil)
				}
			}
			_, err := g.Pipeline.ProcessInput(ctx.Req.Context(), user.OrgId, channel, cmd.Data)
			if err != nil {
				logger.Error("Error processing input", "user", user, "channel", channel, "error", err)
				return response.Error(http.StatusInternalServerError, http.StatusText(http.StatusInternalServerError), nil)
			}
			return response.JSON(http.StatusOK, dtos.LivePublishResponse{})
		}
	}

	channelHandler, addr, err := g.GetChannelHandler(ctx.Req.Context(), ctx.SignedInUser, cmd.Channel)
	if err != nil {
		logger.Error("Error getting channels handler", "error", err, "channel", cmd.Channel)
		return response.Error(http.StatusInternalServerError, http.StatusText(http.StatusInternalServerError), nil)
	}

	reply, status, err := channelHandler.OnPublish(ctx.Req.Context(), ctx.SignedInUser, models.PublishEvent{Channel: cmd.Channel, Path: addr.Path, Data: cmd.Data})
	if err != nil {
		logger.Error("Error calling OnPublish", "error", err, "channel", cmd.Channel)
		return response.Error(http.StatusInternalServerError, http.StatusText(http.StatusInternalServerError), nil)
	}

	if status != backend.PublishStreamStatusOK {
		code, text := publishStatusToHTTPError(status)
		return response.Error(code, text, nil)
	}
	if reply.Data != nil {
		err = g.Publish(ctx.OrgId, cmd.Channel, cmd.Data)
		if err != nil {
			logger.Error("Error publish to channel", "error", err, "channel", cmd.Channel)
			return response.Error(http.StatusInternalServerError, http.StatusText(http.StatusInternalServerError), nil)
		}
	}
	logger.Debug("Publication successful", "user", ctx.SignedInUser.UserId, "channel", cmd.Channel)
	return response.JSON(http.StatusOK, dtos.LivePublishResponse{})
}

type streamChannelListResponse struct {
	Channels []*managedstream.ManagedChannel `json:"channels"`
}

// HandleListHTTP returns metadata so the UI can build a nice form
func (g *GrafanaLive) HandleListHTTP(c *models.ReqContext) response.Response {
	var channels []*managedstream.ManagedChannel
	var err error
	if g.IsHA() {
		channels, err = g.surveyCaller.CallManagedStreams(c.SignedInUser.OrgId)
	} else {
		channels, err = g.ManagedStreamRunner.GetManagedChannels(c.SignedInUser.OrgId)
	}
	if err != nil {
		return response.Error(http.StatusInternalServerError, http.StatusText(http.StatusInternalServerError), err)
	}
	info := streamChannelListResponse{
		Channels: channels,
	}
	return response.JSONStreaming(http.StatusOK, info)
}

// HandleInfoHTTP returns a special HTTP response with info for the requested channel.
func (g *GrafanaLive) HandleInfoHTTP(ctx *models.ReqContext) response.Response {
	path := web.Params(ctx.Req)["*"]
	if path == "grafana/dashboards/gitops" {
		return response.JSON(http.StatusOK, util.DynMap{
			"active": g.GrafanaScope.Dashboards.HasGitOpsObserver(ctx.SignedInUser.OrgId),
		})
	}
	return response.JSONStreaming(404, util.DynMap{
		"message": "Info is not supported for this channel",
	})
}

// HandleChannelRulesListHTTP ...
func (g *GrafanaLive) HandleChannelRulesListHTTP(c *models.ReqContext) response.Response {
	result, err := g.pipelineStorage.ListChannelRules(c.Req.Context(), c.OrgId)
	if err != nil {
		return response.Error(http.StatusInternalServerError, "Failed to get channel rules", err)
	}
	return response.JSON(http.StatusOK, util.DynMap{
		"rules": result,
	})
}

type ConvertDryRunRequest struct {
	ChannelRules []pipeline.ChannelRule `json:"channelRules"`
	Channel      string                 `json:"channel"`
	Data         string                 `json:"data"`
}

type ConvertDryRunResponse struct {
	ChannelFrames []*pipeline.ChannelFrame `json:"channelFrames"`
}

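// DryRunRuleStorage is a rule storage stub used for dry-run conversion tests: it serves only
// the channel rules supplied in the request, and all other operations are either no-ops or
// return an error.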
type DryRunRuleStorage struct {
	ChannelRules []pipeline.ChannelRule
}

func (s *DryRunRuleStorage) GetWriteConfig(_ context.Context, _ int64, _ pipeline.WriteConfigGetCmd) (pipeline.WriteConfig, bool, error) {
	return pipeline.WriteConfig{}, false, errors.New("not implemented by dry run rule storage")
}

func (s *DryRunRuleStorage) CreateWriteConfig(_ context.Context, _ int64, _ pipeline.WriteConfigCreateCmd) (pipeline.WriteConfig, error) {
	return pipeline.WriteConfig{}, errors.New("not implemented by dry run rule storage")
}

func (s *DryRunRuleStorage) UpdateWriteConfig(_ context.Context, _ int64, _ pipeline.WriteConfigUpdateCmd) (pipeline.WriteConfig, error) {
	return pipeline.WriteConfig{}, errors.New("not implemented by dry run rule storage")
}

func (s *DryRunRuleStorage) DeleteWriteConfig(_ context.Context, _ int64, _ pipeline.WriteConfigDeleteCmd) error {
	return errors.New("not implemented by dry run rule storage")
}

func (s *DryRunRuleStorage) CreateChannelRule(_ context.Context, _ int64, _ pipeline.ChannelRuleCreateCmd) (pipeline.ChannelRule, error) {
	return pipeline.ChannelRule{}, errors.New("not implemented by dry run rule storage")
}

func (s *DryRunRuleStorage) UpdateChannelRule(_ context.Context, _ int64, _ pipeline.ChannelRuleUpdateCmd) (pipeline.ChannelRule, error) {
	return pipeline.ChannelRule{}, errors.New("not implemented by dry run rule storage")
}

func (s *DryRunRuleStorage) DeleteChannelRule(_ context.Context, _ int64, _ pipeline.ChannelRuleDeleteCmd) error {
	return errors.New("not implemented by dry run rule storage")
}

func (s *DryRunRuleStorage) ListWriteConfigs(_ context.Context, _ int64) ([]pipeline.WriteConfig, error) {
	return nil, nil
}

func (s *DryRunRuleStorage) ListChannelRules(_ context.Context, _ int64) ([]pipeline.ChannelRule, error) {
	return s.ChannelRules, nil
}

// HandlePipelineConvertTestHTTP ...
func (g *GrafanaLive) HandlePipelineConvertTestHTTP(c *models.ReqContext) response.Response {
	body, err := ioutil.ReadAll(c.Req.Body)
	if err != nil {
		return response.Error(http.StatusInternalServerError, "Error reading body", err)
	}
	var req ConvertDryRunRequest
	err = json.Unmarshal(body, &req)
	if err != nil {
		return response.Error(http.StatusBadRequest, "Error decoding request", err)
	}
	storage := &DryRunRuleStorage{
		ChannelRules: req.ChannelRules,
	}
	builder := &pipeline.StorageRuleBuilder{
		Node:                 g.node,
		ManagedStream:        g.ManagedStreamRunner,
		FrameStorage:         pipeline.NewFrameStorage(),
		Storage:              storage,
		ChannelHandlerGetter: g,
	}
	channelRuleGetter := pipeline.NewCacheSegmentedTree(builder)
	pipe, err := pipeline.New(channelRuleGetter)
	if err != nil {
		return response.Error(http.StatusInternalServerError, "Error creating pipeline", err)
	}
	rule, ok, err := channelRuleGetter.Get(c.OrgId, req.Channel)
	if err != nil {
		return response.Error(http.StatusInternalServerError, "Error getting channel rule", err)
	}
	if !ok {
		return response.Error(http.StatusNotFound, "No rule found", nil)
	}
	if rule.Converter == nil {
		return response.Error(http.StatusNotFound, "No converter found", nil)
	}
	channelFrames, err := pipe.DataToChannelFrames(c.Req.Context(), *rule, c.OrgId, req.Channel, []byte(req.Data))
	if err != nil {
		return response.Error(http.StatusInternalServerError, "Error converting data", err)
	}
	return response.JSON(http.StatusOK, ConvertDryRunResponse{
		ChannelFrames: channelFrames,
	})
}


// HandleChannelRulesPostHTTP creates a new channel rule for the org from the JSON body.
func (g *GrafanaLive) HandleChannelRulesPostHTTP(c *models.ReqContext) response.Response {
	body, err := ioutil.ReadAll(c.Req.Body)
	if err != nil {
		return response.Error(http.StatusInternalServerError, "Error reading body", err)
	}
	var cmd pipeline.ChannelRuleCreateCmd
	err = json.Unmarshal(body, &cmd)
	if err != nil {
		return response.Error(http.StatusBadRequest, "Error decoding channel rule", err)
	}
	rule, err := g.pipelineStorage.CreateChannelRule(c.Req.Context(), c.OrgId, cmd)
	if err != nil {
		return response.Error(http.StatusInternalServerError, "Failed to create channel rule", err)
	}
	return response.JSON(http.StatusOK, util.DynMap{
		"rule": rule,
	})
}

// HandleChannelRulesPutHTTP updates an existing channel rule identified by its pattern.
func (g *GrafanaLive) HandleChannelRulesPutHTTP(c *models.ReqContext) response.Response {
	body, err := ioutil.ReadAll(c.Req.Body)
	if err != nil {
		return response.Error(http.StatusInternalServerError, "Error reading body", err)
	}
	var cmd pipeline.ChannelRuleUpdateCmd
	err = json.Unmarshal(body, &cmd)
	if err != nil {
		return response.Error(http.StatusBadRequest, "Error decoding channel rule", err)
	}
	if cmd.Pattern == "" {
		return response.Error(http.StatusBadRequest, "Rule pattern required", nil)
	}
	rule, err := g.pipelineStorage.UpdateChannelRule(c.Req.Context(), c.OrgId, cmd)
	if err != nil {
		return response.Error(http.StatusInternalServerError, "Failed to update channel rule", err)
	}
	return response.JSON(http.StatusOK, util.DynMap{
		"rule": rule,
	})
}

// HandleChannelRulesDeleteHTTP deletes the channel rule identified by its pattern.
func (g *GrafanaLive) HandleChannelRulesDeleteHTTP(c *models.ReqContext) response.Response {
	body, err := ioutil.ReadAll(c.Req.Body)
	if err != nil {
		return response.Error(http.StatusInternalServerError, "Error reading body", err)
	}
	var cmd pipeline.ChannelRuleDeleteCmd
	err = json.Unmarshal(body, &cmd)
	if err != nil {
		return response.Error(http.StatusBadRequest, "Error decoding channel rule", err)
	}
	if cmd.Pattern == "" {
		return response.Error(http.StatusBadRequest, "Rule pattern required", nil)
	}
	err = g.pipelineStorage.DeleteChannelRule(c.Req.Context(), c.OrgId, cmd)
	if err != nil {
		return response.Error(http.StatusInternalServerError, "Failed to delete channel rule", err)
	}
	return response.JSON(http.StatusOK, util.DynMap{})
}

// HandlePipelineEntitiesListHTTP lists the registered pipeline entity types:
// subscribers, data outputs, converters, frame processors and frame outputs.
func (g *GrafanaLive) HandlePipelineEntitiesListHTTP(_ *models.ReqContext) response.Response {
	return response.JSON(http.StatusOK, util.DynMap{
		"subscribers":     pipeline.SubscribersRegistry,
		"dataOutputs":     pipeline.DataOutputsRegistry,
		"converters":      pipeline.ConvertersRegistry,
		"frameProcessors": pipeline.FrameProcessorsRegistry,
		"frameOutputs":    pipeline.FrameOutputsRegistry,
	})
}

// HandleWriteConfigsListHTTP returns the write configs of the org as DTOs.
func (g *GrafanaLive) HandleWriteConfigsListHTTP(c *models.ReqContext) response.Response {
	backends, err := g.pipelineStorage.ListWriteConfigs(c.Req.Context(), c.OrgId)
	if err != nil {
		return response.Error(http.StatusInternalServerError, "Failed to get write configs", err)
	}
	result := make([]pipeline.WriteConfigDto, 0, len(backends))
	for _, b := range backends {
		result = append(result, pipeline.WriteConfigToDto(b))
	}
	return response.JSON(http.StatusOK, util.DynMap{
		"writeConfigs": result,
	})
}

// HandleWriteConfigsPostHTTP creates a new write config for the org.
func (g *GrafanaLive) HandleWriteConfigsPostHTTP(c *models.ReqContext) response.Response {
	body, err := ioutil.ReadAll(c.Req.Body)
	if err != nil {
		return response.Error(http.StatusInternalServerError, "Error reading body", err)
	}
	var cmd pipeline.WriteConfigCreateCmd
	err = json.Unmarshal(body, &cmd)
	if err != nil {
		return response.Error(http.StatusBadRequest, "Error decoding write config create command", err)
	}
	result, err := g.pipelineStorage.CreateWriteConfig(c.Req.Context(), c.OrgId, cmd)
	if err != nil {
		return response.Error(http.StatusInternalServerError, "Failed to create write config", err)
	}
	return response.JSON(http.StatusOK, util.DynMap{
		"writeConfig": pipeline.WriteConfigToDto(result),
	})
}

// HandleWriteConfigsPutHTTP updates the write config identified by UID, keeping
// previously stored secure settings that are not re-supplied in the request.
func (g *GrafanaLive) HandleWriteConfigsPutHTTP(c *models.ReqContext) response.Response {
	body, err := ioutil.ReadAll(c.Req.Body)
	if err != nil {
		return response.Error(http.StatusInternalServerError, "Error reading body", err)
	}
	var cmd pipeline.WriteConfigUpdateCmd
	err = json.Unmarshal(body, &cmd)
	if err != nil {
		return response.Error(http.StatusBadRequest, "Error decoding write config update command", err)
	}
	if cmd.UID == "" {
		return response.Error(http.StatusBadRequest, "UID required", nil)
	}
	existingBackend, ok, err := g.pipelineStorage.GetWriteConfig(c.Req.Context(), c.OrgId, pipeline.WriteConfigGetCmd{
		UID: cmd.UID,
	})
	if err != nil {
		return response.Error(http.StatusInternalServerError, "Failed to get write config", err)
	}
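	// When the write config already exists, merge its previously stored (decrypted)
	// secure settings into the update command so secrets omitted from the request
	// are preserved rather than dropped.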
	if ok {
		if cmd.SecureSettings == nil {
			cmd.SecureSettings = map[string]string{}
		}
		secureJSONData, err := g.SecretsService.DecryptJsonData(c.Req.Context(), existingBackend.SecureSettings)
		if err != nil {
			logger.Error("Error decrypting secure settings", "error", err)
			return response.Error(http.StatusInternalServerError, "Error decrypting secure settings", err)
		}
		for k, v := range secureJSONData {
			if _, ok := cmd.SecureSettings[k]; !ok {
				cmd.SecureSettings[k] = v
			}
		}
	}
	result, err := g.pipelineStorage.UpdateWriteConfig(c.Req.Context(), c.OrgId, cmd)
	if err != nil {
		return response.Error(http.StatusInternalServerError, "Failed to update write config", err)
	}
	return response.JSON(http.StatusOK, util.DynMap{
		"writeConfig": pipeline.WriteConfigToDto(result),
	})
}

// HandleWriteConfigsDeleteHTTP deletes the write config identified by UID.
func (g *GrafanaLive) HandleWriteConfigsDeleteHTTP(c *models.ReqContext) response.Response {
	body, err := ioutil.ReadAll(c.Req.Body)
	if err != nil {
		return response.Error(http.StatusInternalServerError, "Error reading body", err)
	}
	var cmd pipeline.WriteConfigDeleteCmd
	err = json.Unmarshal(body, &cmd)
	if err != nil {
		return response.Error(http.StatusBadRequest, "Error decoding write config delete command", err)
	}
	if cmd.UID == "" {
		return response.Error(http.StatusBadRequest, "UID required", nil)
	}
	err = g.pipelineStorage.DeleteWriteConfig(c.Req.Context(), c.OrgId, cmd)
	if err != nil {
		return response.Error(http.StatusInternalServerError, "Failed to delete write config", err)
	}
	return response.JSON(http.StatusOK, util.DynMap{})
}

// handleLog writes Centrifuge log entries to the standard log15 logger.
func handleLog(msg centrifuge.LogEntry) {
	arr := make([]interface{}, 0)
	for k, v := range msg.Fields {
		if v == nil {
			v = "<nil>"
		} else if v == "" {
			v = "<empty>"
		}
		arr = append(arr, k, v)
	}
	switch msg.Level {
	case centrifuge.LogLevelDebug:
		loggerCF.Debug(msg.Message, arr...)
	case centrifuge.LogLevelError:
		loggerCF.Error(msg.Message, arr...)
	case centrifuge.LogLevelInfo:
		loggerCF.Info(msg.Message, arr...)
	default:
		loggerCF.Debug(msg.Message, arr...)
	}
}
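
// sampleLiveStats takes a point-in-time snapshot of Centrifuge hub counters
// (clients, users, channels, cluster nodes) and folds it into the running
// usage aggregates: sums for averages plus observed maxima.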
func (g *GrafanaLive) sampleLiveStats() {
	numClients := g.node.Hub().NumClients()
	numUsers := g.node.Hub().NumUsers()
	numChannels := g.node.Hub().NumChannels()
	var numNodes int
	if info, err := g.node.Info(); err == nil {
		numNodes = len(info.Nodes)
	}

	g.usageStats.sampleCount++
	g.usageStats.numClientsSum += numClients
	g.usageStats.numUsersSum += numUsers
	if numClients > g.usageStats.numClientsMax {
		g.usageStats.numClientsMax = numClients
	}
	if numUsers > g.usageStats.numUsersMax {
		g.usageStats.numUsersMax = numUsers
	}
	if numNodes > g.usageStats.numNodesMax {
		g.usageStats.numNodesMax = numNodes
	}
	if numChannels > g.usageStats.numChannelsMax {
		g.usageStats.numChannelsMax = numChannels
	}
}

func (g *GrafanaLive) resetLiveStats() {
	g.usageStats = usageStats{}
}
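
// getHistogramMetric returns the metric name for the first bucket bound the value
// does not exceed; for example, getHistogramMetric(42, []int{0, 10, 100}, "stats.live_clients_")
// yields "stats.live_clients_le_100", while values above every bound fall into the "le_inf" bucket.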
func getHistogramMetric(val int, bounds []int, metricPrefix string) string {
	for _, bound := range bounds {
		if val <= bound {
			return metricPrefix + "le_" + strconv.Itoa(bound)
		}
	}
	return metricPrefix + "le_inf"
}
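
// collectLiveStats assembles the usage-stats payload for Grafana Live: whether Live
// and its HA engine are enabled, the aggregated client/user/channel/node counters,
// and one le_* histogram bucket flag per observed maximum.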
func (g *GrafanaLive) collectLiveStats(_ context.Context) (map[string]interface{}, error) {
	liveUsersAvg := 0
	liveClientsAvg := 0

	if g.usageStats.sampleCount > 0 {
		liveUsersAvg = g.usageStats.numUsersSum / g.usageStats.sampleCount
		liveClientsAvg = g.usageStats.numClientsSum / g.usageStats.sampleCount
	}

	var liveEnabled int
	if g.Cfg.LiveMaxConnections != 0 {
		liveEnabled = 1
	}

	var liveHAEnabled int
	if g.Cfg.LiveHAEngine != "" {
		liveHAEnabled = 1
	}

	metrics := map[string]interface{}{
		"stats.live_enabled.count":      liveEnabled,
		"stats.live_ha_enabled.count":   liveHAEnabled,
		"stats.live_samples.count":      g.usageStats.sampleCount,
		"stats.live_users_max.count":    g.usageStats.numUsersMax,
		"stats.live_users_avg.count":    liveUsersAvg,
		"stats.live_clients_max.count":  g.usageStats.numClientsMax,
		"stats.live_clients_avg.count":  liveClientsAvg,
		"stats.live_channels_max.count": g.usageStats.numChannelsMax,
		"stats.live_nodes_max.count":    g.usageStats.numNodesMax,
	}
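	// Flag the le_* bucket that each observed maximum falls into: the selected
	// bucket metric is set to 1 for this instance.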
	metrics[getHistogramMetric(g.usageStats.numClientsMax, []int{0, 10, 100, 1000, 10000, 100000}, "stats.live_clients_")] = 1
	metrics[getHistogramMetric(g.usageStats.numUsersMax, []int{0, 10, 100, 1000, 10000, 100000}, "stats.live_users_")] = 1
	metrics[getHistogramMetric(g.usageStats.numChannelsMax, []int{0, 10, 100, 1000, 10000, 100000}, "stats.live_channels_")] = 1
	metrics[getHistogramMetric(g.usageStats.numNodesMax, []int{1, 3, 9}, "stats.live_nodes_")] = 1
	return metrics, nil
}

func (g *GrafanaLive) registerUsageMetrics() {
	g.usageStatsService.RegisterSendReportCallback(g.resetLiveStats)
	g.usageStatsService.RegisterMetricsFunc(g.collectLiveStats)
}
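
// usageStats accumulates Grafana Live usage counters between usage reports; it is
// cleared by resetLiveStats, which registerUsageMetrics wires up as the
// send-report callback.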
type usageStats struct {
	numClientsMax  int
	numClientsSum  int
	numUsersMax    int
	numUsersSum    int
	sampleCount    int
	numNodesMax    int
	numChannelsMax int
}