Live: Default to local engine if redis is unavailable (#77467)

* Live: Default to local engine if redis is unavailable

* fix error formatting
This commit is contained in:
João Calisto 2023-11-08 12:22:26 +00:00 committed by GitHub
parent c371dbb658
commit cbb85b77a5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 72 additions and 31 deletions

View File

@ -120,55 +120,36 @@ func ProvideService(plugCtxProvider *plugincontext.Provider, cfg *setting.Cfg, r
}
g.node = node
redisHealthy := false
if g.IsHA() {
// Configure HA with Redis. In this case Centrifuge nodes
// will be connected over Redis PUB/SUB. Presence will work
// globally since kept inside Redis.
redisAddress := g.Cfg.LiveHAEngineAddress
redisPassword := g.Cfg.LiveHAEnginePassword
redisShardConfigs := []centrifuge.RedisShardConfig{
{Address: redisAddress, Password: redisPassword},
}
var redisShards []*centrifuge.RedisShard
for _, redisConf := range redisShardConfigs {
redisShard, err := centrifuge.NewRedisShard(node, redisConf)
if err != nil {
return nil, fmt.Errorf("error connecting to Live Redis: %v", err)
}
redisShards = append(redisShards, redisShard)
}
broker, err := centrifuge.NewRedisBroker(node, centrifuge.RedisBrokerConfig{
Prefix: "gf_live",
Shards: redisShards,
})
err := setupRedisLiveEngine(g, node)
if err != nil {
return nil, fmt.Errorf("error creating Live Redis broker: %v", err)
logger.Error("failed to setup redis live engine: %v", err)
} else {
redisHealthy = true
}
node.SetBroker(broker)
presenceManager, err := centrifuge.NewRedisPresenceManager(node, centrifuge.RedisPresenceManagerConfig{
Prefix: "gf_live",
Shards: redisShards,
})
if err != nil {
return nil, fmt.Errorf("error creating Live Redis presence manager: %v", err)
}
node.SetPresenceManager(presenceManager)
}
channelLocalPublisher := liveplugin.NewChannelLocalPublisher(node, nil)
var managedStreamRunner *managedstream.Runner
if g.IsHA() {
var redisClient *redis.Client
if g.IsHA() && redisHealthy {
redisClient := redis.NewClient(&redis.Options{
Addr: g.Cfg.LiveHAEngineAddress,
Password: g.Cfg.LiveHAEnginePassword,
})
cmd := redisClient.Ping(context.Background())
if _, err := cmd.Result(); err != nil {
return nil, fmt.Errorf("error pinging Redis: %v", err)
logger.Error("live engine failed to ping redis, proceeding without live ha, error: %v", err)
redisClient = nil
}
}
if redisClient != nil {
managedStreamRunner = managedstream.NewRunner(
g.Publish,
channelLocalPublisher,
@ -346,6 +327,41 @@ func ProvideService(plugCtxProvider *plugincontext.Provider, cfg *setting.Cfg, r
return g, nil
}
func setupRedisLiveEngine(g *GrafanaLive, node *centrifuge.Node) error {
redisAddress := g.Cfg.LiveHAEngineAddress
redisPassword := g.Cfg.LiveHAEnginePassword
redisShardConfigs := []centrifuge.RedisShardConfig{
{Address: redisAddress, Password: redisPassword},
}
var redisShards []*centrifuge.RedisShard
for _, redisConf := range redisShardConfigs {
redisShard, err := centrifuge.NewRedisShard(node, redisConf)
if err != nil {
return fmt.Errorf("error connecting to Live Redis: %v", err)
}
redisShards = append(redisShards, redisShard)
}
broker, err := centrifuge.NewRedisBroker(node, centrifuge.RedisBrokerConfig{
Prefix: "gf_live",
Shards: redisShards,
})
if err != nil {
return fmt.Errorf("error creating Live Redis broker: %v", err)
}
node.SetBroker(broker)
presenceManager, err := centrifuge.NewRedisPresenceManager(node, centrifuge.RedisPresenceManagerConfig{
Prefix: "gf_live",
Shards: redisShards,
})
if err != nil {
return fmt.Errorf("error creating Live Redis presence manager: %v", err)
}
node.SetPresenceManager(presenceManager)
return nil
}
// GrafanaLive manages live real-time connections to Grafana (over WebSocket at this moment).
// The main concept here is Channel. Connections can subscribe to many channels. Each channel
// can have different permissions and properties but once a connection subscribed to a channel

View File

@ -9,9 +9,34 @@ import (
"github.com/stretchr/testify/require"
"github.com/grafana/grafana/pkg/api/routing"
"github.com/grafana/grafana/pkg/infra/db"
"github.com/grafana/grafana/pkg/infra/usagestats"
"github.com/grafana/grafana/pkg/services/accesscontrol/acimpl"
"github.com/grafana/grafana/pkg/services/annotations/annotationstest"
"github.com/grafana/grafana/pkg/services/dashboards"
"github.com/grafana/grafana/pkg/services/featuremgmt"
"github.com/grafana/grafana/pkg/setting"
)
func Test_provideLiveService_RedisUnavailable(t *testing.T) {
cfg := setting.NewCfg()
cfg.LiveHAEngine = "testredisunavailable"
_, err := ProvideService(nil, cfg,
routing.NewRouteRegister(),
nil, nil, nil, nil,
db.InitTestDB(t),
nil,
&usagestats.UsageStatsMock{T: t},
nil,
featuremgmt.WithFeatures(), acimpl.ProvideAccessControl(cfg), &dashboards.FakeDashboardService{}, annotationstest.NewFakeAnnotationsRepo(), nil)
// Proceeds without live HA if redis is unavaialble
require.NoError(t, err)
}
func Test_runConcurrentlyIfNeeded_Concurrent(t *testing.T) {
doneCh := make(chan struct{})
f := func() {