diff --git a/pkg/services/live/live.go b/pkg/services/live/live.go index 4017af85092..d81f7a99967 100644 --- a/pkg/services/live/live.go +++ b/pkg/services/live/live.go @@ -120,55 +120,36 @@ func ProvideService(plugCtxProvider *plugincontext.Provider, cfg *setting.Cfg, r } g.node = node + redisHealthy := false if g.IsHA() { // Configure HA with Redis. In this case Centrifuge nodes // will be connected over Redis PUB/SUB. Presence will work // globally since kept inside Redis. - redisAddress := g.Cfg.LiveHAEngineAddress - redisPassword := g.Cfg.LiveHAEnginePassword - redisShardConfigs := []centrifuge.RedisShardConfig{ - {Address: redisAddress, Password: redisPassword}, - } - var redisShards []*centrifuge.RedisShard - for _, redisConf := range redisShardConfigs { - redisShard, err := centrifuge.NewRedisShard(node, redisConf) - if err != nil { - return nil, fmt.Errorf("error connecting to Live Redis: %v", err) - } - redisShards = append(redisShards, redisShard) - } - - broker, err := centrifuge.NewRedisBroker(node, centrifuge.RedisBrokerConfig{ - Prefix: "gf_live", - Shards: redisShards, - }) + err := setupRedisLiveEngine(g, node) if err != nil { - return nil, fmt.Errorf("error creating Live Redis broker: %v", err) + logger.Error("failed to setup redis live engine: %v", err) + } else { + redisHealthy = true } - node.SetBroker(broker) - - presenceManager, err := centrifuge.NewRedisPresenceManager(node, centrifuge.RedisPresenceManagerConfig{ - Prefix: "gf_live", - Shards: redisShards, - }) - if err != nil { - return nil, fmt.Errorf("error creating Live Redis presence manager: %v", err) - } - node.SetPresenceManager(presenceManager) } channelLocalPublisher := liveplugin.NewChannelLocalPublisher(node, nil) var managedStreamRunner *managedstream.Runner - if g.IsHA() { + var redisClient *redis.Client + if g.IsHA() && redisHealthy { redisClient := redis.NewClient(&redis.Options{ Addr: g.Cfg.LiveHAEngineAddress, Password: g.Cfg.LiveHAEnginePassword, }) cmd := redisClient.Ping(context.Background()) if _, err := cmd.Result(); err != nil { - return nil, fmt.Errorf("error pinging Redis: %v", err) + logger.Error("live engine failed to ping redis, proceeding without live ha, error: %v", err) + redisClient = nil } + } + + if redisClient != nil { managedStreamRunner = managedstream.NewRunner( g.Publish, channelLocalPublisher, @@ -346,6 +327,41 @@ func ProvideService(plugCtxProvider *plugincontext.Provider, cfg *setting.Cfg, r return g, nil } +func setupRedisLiveEngine(g *GrafanaLive, node *centrifuge.Node) error { + redisAddress := g.Cfg.LiveHAEngineAddress + redisPassword := g.Cfg.LiveHAEnginePassword + redisShardConfigs := []centrifuge.RedisShardConfig{ + {Address: redisAddress, Password: redisPassword}, + } + var redisShards []*centrifuge.RedisShard + for _, redisConf := range redisShardConfigs { + redisShard, err := centrifuge.NewRedisShard(node, redisConf) + if err != nil { + return fmt.Errorf("error connecting to Live Redis: %v", err) + } + redisShards = append(redisShards, redisShard) + } + + broker, err := centrifuge.NewRedisBroker(node, centrifuge.RedisBrokerConfig{ + Prefix: "gf_live", + Shards: redisShards, + }) + if err != nil { + return fmt.Errorf("error creating Live Redis broker: %v", err) + } + node.SetBroker(broker) + + presenceManager, err := centrifuge.NewRedisPresenceManager(node, centrifuge.RedisPresenceManagerConfig{ + Prefix: "gf_live", + Shards: redisShards, + }) + if err != nil { + return fmt.Errorf("error creating Live Redis presence manager: %v", err) + } + node.SetPresenceManager(presenceManager) + return nil +} + // GrafanaLive manages live real-time connections to Grafana (over WebSocket at this moment). // The main concept here is Channel. Connections can subscribe to many channels. Each channel // can have different permissions and properties but once a connection subscribed to a channel diff --git a/pkg/services/live/live_test.go b/pkg/services/live/live_test.go index a448e55d7c0..f19f1a8e30b 100644 --- a/pkg/services/live/live_test.go +++ b/pkg/services/live/live_test.go @@ -9,9 +9,34 @@ import ( "github.com/stretchr/testify/require" + "github.com/grafana/grafana/pkg/api/routing" + "github.com/grafana/grafana/pkg/infra/db" + "github.com/grafana/grafana/pkg/infra/usagestats" + "github.com/grafana/grafana/pkg/services/accesscontrol/acimpl" + "github.com/grafana/grafana/pkg/services/annotations/annotationstest" + "github.com/grafana/grafana/pkg/services/dashboards" + "github.com/grafana/grafana/pkg/services/featuremgmt" "github.com/grafana/grafana/pkg/setting" ) +func Test_provideLiveService_RedisUnavailable(t *testing.T) { + cfg := setting.NewCfg() + + cfg.LiveHAEngine = "testredisunavailable" + + _, err := ProvideService(nil, cfg, + routing.NewRouteRegister(), + nil, nil, nil, nil, + db.InitTestDB(t), + nil, + &usagestats.UsageStatsMock{T: t}, + nil, + featuremgmt.WithFeatures(), acimpl.ProvideAccessControl(cfg), &dashboards.FakeDashboardService{}, annotationstest.NewFakeAnnotationsRepo(), nil) + + // Proceeds without live HA if redis is unavaialble + require.NoError(t, err) +} + func Test_runConcurrentlyIfNeeded_Concurrent(t *testing.T) { doneCh := make(chan struct{}) f := func() {