Live: Add ha_prefix (#93759)

This commit is contained in:
Todd Treece 2024-09-25 16:20:35 -04:00 committed by GitHub
parent 57ab354139
commit 277d82db9a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 40 additions and 9 deletions

View File

@ -1764,6 +1764,9 @@ ha_engine_address = "127.0.0.1:6379"
# ha_engine_password allows setting an optional password to authenticate with the engine
ha_engine_password = ""
# ha_prefix is a prefix for keys in the HA engine. It's used to separate keys for different Grafana instances.
ha_prefix =
#################################### Grafana Image Renderer Plugin ##########################
[plugin.grafana-image-renderer]
# Instruct headless browser instance to use a default timezone when not provided by Grafana, e.g. when rendering panel image of alert.

View File

@ -1718,6 +1718,9 @@ timeout = 30s
# ha_engine_password allows setting an optional password to authenticate with the engine
;ha_engine_password = ""
# ha_prefix is a prefix for keys in the HA engine. It's used to separate keys for different Grafana instances.
;ha_prefix =
#################################### Grafana Image Renderer Plugin ##########################
[plugin.grafana-image-renderer]
# Instruct headless browser instance to use a default timezone when not provided by Grafana, e.g. when rendering panel image of alert.

View File

@ -97,6 +97,11 @@ func ProvideService(plugCtxProvider *plugincontext.Provider, cfg *setting.Cfg, r
},
usageStatsService: usageStatsService,
orgService: orgService,
keyPrefix: "gf_live",
}
if cfg.LiveHAPrefix != "" {
g.keyPrefix = cfg.LiveHAPrefix + ".gf_live"
}
logger.Debug("GrafanaLive initialization", "ha", g.IsHA())
@ -153,7 +158,7 @@ func ProvideService(plugCtxProvider *plugincontext.Provider, cfg *setting.Cfg, r
managedStreamRunner = managedstream.NewRunner(
g.Publish,
channelLocalPublisher,
managedstream.NewRedisFrameCache(redisClient),
managedstream.NewRedisFrameCache(redisClient, g.keyPrefix),
)
} else {
managedStreamRunner = managedstream.NewRunner(
@ -344,7 +349,7 @@ func setupRedisLiveEngine(g *GrafanaLive, node *centrifuge.Node) error {
}
broker, err := centrifuge.NewRedisBroker(node, centrifuge.RedisBrokerConfig{
Prefix: "gf_live",
Prefix: g.keyPrefix,
Shards: redisShards,
})
if err != nil {
@ -354,7 +359,7 @@ func setupRedisLiveEngine(g *GrafanaLive, node *centrifuge.Node) error {
node.SetBroker(broker)
presenceManager, err := centrifuge.NewRedisPresenceManager(node, centrifuge.RedisPresenceManagerConfig{
Prefix: "gf_live",
Prefix: g.keyPrefix,
Shards: redisShards,
})
if err != nil {
@ -385,6 +390,8 @@ type GrafanaLive struct {
queryDataService query.Service
orgService org.Service
keyPrefix string
node *centrifuge.Node
surveyCaller *survey.Caller

View File

@ -18,11 +18,13 @@ type RedisFrameCache struct {
mu sync.RWMutex
redisClient *redis.Client
frames map[int64]map[string]data.FrameJSONCache
keyPrefix string
}
// NewRedisFrameCache ...
func NewRedisFrameCache(redisClient *redis.Client) *RedisFrameCache {
func NewRedisFrameCache(redisClient *redis.Client, keyPrefix string) *RedisFrameCache {
return &RedisFrameCache{
keyPrefix: keyPrefix,
frames: map[int64]map[string]data.FrameJSONCache{},
redisClient: redisClient,
}
@ -43,7 +45,7 @@ func (c *RedisFrameCache) GetActiveChannels(orgID int64) (map[string]json.RawMes
}
func (c *RedisFrameCache) GetFrame(ctx context.Context, orgID int64, channel string) (json.RawMessage, bool, error) {
key := getCacheKey(orgchannel.PrependOrgID(orgID, channel))
key := c.getCacheKey(orgchannel.PrependOrgID(orgID, channel))
cmd := c.redisClient.HGetAll(ctx, key)
result, err := cmd.Result()
if err != nil {
@ -69,7 +71,7 @@ func (c *RedisFrameCache) Update(ctx context.Context, orgID int64, channel strin
stringSchema := string(jsonFrame.Bytes(data.IncludeSchemaOnly))
key := getCacheKey(orgchannel.PrependOrgID(orgID, channel))
key := c.getCacheKey(orgchannel.PrependOrgID(orgID, channel))
pipe := c.redisClient.TxPipeline()
defer func() { _ = pipe.Close() }()
@ -107,6 +109,6 @@ func (c *RedisFrameCache) Update(ctx context.Context, orgID int64, channel strin
return true, nil
}
func getCacheKey(channelID string) string {
return "gf_live.managed_stream." + channelID
func (c *RedisFrameCache) getCacheKey(channelID string) string {
return c.keyPrefix + ".managed_stream." + channelID
}

View File

@ -2,6 +2,7 @@ package managedstream
import (
"os"
"strings"
"testing"
"github.com/go-redis/redis/v8"
@ -30,7 +31,19 @@ func TestIntegrationRedisCacheStorage(t *testing.T) {
Addr: addr,
DB: db,
})
c := NewRedisFrameCache(redisClient)
c := NewRedisFrameCache(redisClient, "A")
require.NotNil(t, c)
testFrameCache(t, c)
keys, err := redisClient.Keys(redisClient.Context(), "*").Result()
if err != nil {
require.NoError(t, err)
}
require.NotZero(t, len(keys))
for _, key := range keys {
require.Equal(t, "A", key[:1])
require.True(t, strings.HasPrefix(key, "A"), "key", key)
}
}

View File

@ -423,6 +423,8 @@ type Cfg struct {
// LiveHAEngine is a type of engine to use to achieve HA with Grafana Live.
// Zero value means in-memory single node setup.
LiveHAEngine string
// LiveHAPRefix is a prefix for HA engine keys.
LiveHAPrefix string
// LiveHAEngineAddress is a connection address for Live HA engine.
LiveHAEngineAddress string
LiveHAEnginePassword string
@ -2024,6 +2026,7 @@ func (cfg *Cfg) readLiveSettings(iniFile *ini.File) error {
default:
return fmt.Errorf("unsupported live HA engine type: %s", cfg.LiveHAEngine)
}
cfg.LiveHAPrefix = section.Key("ha_prefix").MustString("")
cfg.LiveHAEngineAddress = section.Key("ha_engine_address").MustString("127.0.0.1:6379")
cfg.LiveHAEnginePassword = section.Key("ha_engine_password").MustString("")