package notifier

import (
	"context"
	"fmt"
	"os"
	"path/filepath"
	"strconv"
	"sync"
	"time"

	"github.com/prometheus/alertmanager/cluster"
	"github.com/prometheus/client_golang/prometheus"

	alertingNotify "github.com/grafana/alerting/notify"

	"github.com/grafana/grafana/pkg/infra/kvstore"
	"github.com/grafana/grafana/pkg/infra/log"
	"github.com/grafana/grafana/pkg/services/ngalert/metrics"
	"github.com/grafana/grafana/pkg/services/ngalert/models"
	"github.com/grafana/grafana/pkg/services/ngalert/provisioning"
	"github.com/grafana/grafana/pkg/services/ngalert/store"
	"github.com/grafana/grafana/pkg/services/notifications"
	"github.com/grafana/grafana/pkg/services/secrets"
	"github.com/grafana/grafana/pkg/setting"
)
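
// Sentinel errors returned by AlertmanagerFor when an organization has no
// Alertmanager or its Alertmanager has not finished initializing yet.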
var (
	ErrNoAlertmanagerForOrg = fmt.Errorf("Alertmanager does not exist for this organization")
	ErrAlertmanagerNotReady = fmt.Errorf("Alertmanager is not ready yet")
)
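
// MultiOrgAlertmanager manages one embedded Alertmanager per organization and
// keeps each of them in sync with the configuration stored for that organization.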
type MultiOrgAlertmanager struct {
	Crypto    Crypto
	ProvStore provisioning.ProvisioningStore

	alertmanagersMtx sync.RWMutex
	alertmanagers    map[int64]*Alertmanager

	settings *setting.Cfg
	logger   log.Logger

	// peer represents the clustering peers of Alertmanagers between Grafana instances.
	peer         alertingNotify.ClusterPeer
	settleCancel context.CancelFunc

	configStore AlertingStore
	orgStore    store.OrgStore
	kvStore     kvstore.KVStore

	decryptFn alertingNotify.GetDecryptedValueFn

	metrics *metrics.MultiOrgAlertmanager
	ns      notifications.Service
}
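
// NewMultiOrgAlertmanager builds a MultiOrgAlertmanager from its dependencies and
// sets up clustering (Redis or memberlist) according to the unified alerting settings.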
func NewMultiOrgAlertmanager(cfg *setting.Cfg, configStore AlertingStore, orgStore store.OrgStore,
	kvStore kvstore.KVStore, provStore provisioning.ProvisioningStore, decryptFn alertingNotify.GetDecryptedValueFn,
	m *metrics.MultiOrgAlertmanager, ns notifications.Service, l log.Logger, s secrets.Service,
) (*MultiOrgAlertmanager, error) {
	moa := &MultiOrgAlertmanager{
		Crypto:    NewCrypto(s, configStore, l),
		ProvStore: provStore,

		logger:        l,
		settings:      cfg,
		alertmanagers: map[int64]*Alertmanager{},
		configStore:   configStore,
		orgStore:      orgStore,
		kvStore:       kvStore,
		decryptFn:     decryptFn,
		metrics:       m,
		ns:            ns,
		peer:          &NilPeer{},
	}

	if err := moa.setupClustering(cfg); err != nil {
		return nil, err
	}

	return moa, nil
}
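
// setupClustering configures the cluster peer for high availability. It prefers
// Redis when HARedisAddr is set, falls back to a memberlist gossip mesh when
// HAPeers are configured, and otherwise leaves the NilPeer in place.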
func (moa *MultiOrgAlertmanager) setupClustering(cfg *setting.Cfg) error {
	clusterLogger := moa.logger.New("component", "clustering")
	// We set the settlement timeout to be a multiple of the gossip interval,
	// ensuring that a sufficient number of broadcasts have occurred, thereby
	// increasing the probability of success when waiting for the cluster to settle.
	const settleTimeout = cluster.DefaultGossipInterval * 10
	// Redis setup.
	if cfg.UnifiedAlerting.HARedisAddr != "" {
		redisPeer, err := newRedisPeer(redisConfig{
			addr:     cfg.UnifiedAlerting.HARedisAddr,
			name:     cfg.UnifiedAlerting.HARedisPeerName,
			prefix:   cfg.UnifiedAlerting.HARedisPrefix,
			password: cfg.UnifiedAlerting.HARedisPassword,
			username: cfg.UnifiedAlerting.HARedisUsername,
			db:       cfg.UnifiedAlerting.HARedisDB,
		}, clusterLogger, moa.metrics.Registerer, cfg.UnifiedAlerting.HAPushPullInterval)
		if err != nil {
			return fmt.Errorf("unable to initialize redis: %w", err)
		}
		var ctx context.Context
		ctx, moa.settleCancel = context.WithTimeout(context.Background(), 30*time.Second)
		go redisPeer.Settle(ctx, settleTimeout)
		moa.peer = redisPeer
		return nil
	}
	// Memberlist setup.
	if len(cfg.UnifiedAlerting.HAPeers) > 0 {
		peer, err := cluster.Create(
			clusterLogger,
			moa.metrics.Registerer,
			cfg.UnifiedAlerting.HAListenAddr,
			cfg.UnifiedAlerting.HAAdvertiseAddr,
			cfg.UnifiedAlerting.HAPeers, // peers
			true,
			cfg.UnifiedAlerting.HAPushPullInterval,
			cfg.UnifiedAlerting.HAGossipInterval,
			cluster.DefaultTCPTimeout,
			cluster.DefaultProbeTimeout,
			cluster.DefaultProbeInterval,
			nil,
			true,
			cfg.UnifiedAlerting.HALabel,
		)
		if err != nil {
			return fmt.Errorf("unable to initialize gossip mesh: %w", err)
		}

		err = peer.Join(cluster.DefaultReconnectInterval, cluster.DefaultReconnectTimeout)
		if err != nil {
			moa.logger.Error("unable to join gossip mesh while initializing cluster for high availability mode", "error", err)
		}

		// Attempt to verify the number of peers for 30s every 2s. The risk here is that we send a notification "too soon",
		// which should _never_ happen given we share the notification log via the database, so the risk of double notification is very low.
		var ctx context.Context
		ctx, moa.settleCancel = context.WithTimeout(context.Background(), 30*time.Second)
		go peer.Settle(ctx, settleTimeout)
		moa.peer = peer
		return nil
	}

	return nil
}
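
// Run periodically synchronizes the Alertmanagers with the organizations in the
// database until the given context is canceled, then stops them all.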
func (moa *MultiOrgAlertmanager) Run(ctx context.Context) error {
	moa.logger.Info("starting MultiOrg Alertmanager")

	for {
		select {
		case <-ctx.Done():
			moa.StopAndWait()
			return nil
		case <-time.After(moa.settings.UnifiedAlerting.AlertmanagerConfigPollInterval):
			if err := moa.LoadAndSyncAlertmanagersForOrgs(ctx); err != nil {
				moa.logger.Error("error while synchronizing Alertmanager orgs", "error", err)
			}
		}
	}
}
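
// LoadAndSyncAlertmanagersForOrgs loads all organizations from the database and
// syncs an Alertmanager for each of them, creating or stopping instances as needed.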
func (moa *MultiOrgAlertmanager) LoadAndSyncAlertmanagersForOrgs(ctx context.Context) error {
	moa.logger.Debug("synchronizing Alertmanagers for orgs")
	// First, load all the organizations from the database.
	orgIDs, err := moa.orgStore.GetOrgs(ctx)
	if err != nil {
		return err
	}

	// Then, sync them by creating or deleting Alertmanagers as necessary.
	moa.metrics.DiscoveredConfigurations.Set(float64(len(orgIDs)))
	moa.SyncAlertmanagersForOrgs(ctx, orgIDs)

	moa.logger.Debug("done synchronizing Alertmanagers for orgs")

	return nil
}

// getLatestConfigs retrieves the latest Alertmanager configuration for every organization.
// It returns a map where the key is the ID of each organization and the value is the configuration.
func (moa *MultiOrgAlertmanager) getLatestConfigs(ctx context.Context) (map[int64]*models.AlertConfiguration, error) {
	configs, err := moa.configStore.GetAllLatestAlertmanagerConfiguration(ctx)
	if err != nil {
		return nil, err
	}

	result := make(map[int64]*models.AlertConfiguration, len(configs))
	for _, config := range configs {
		result[config.OrgID] = config
	}

	return result, nil
}

// SyncAlertmanagersForOrgs syncs configuration of the Alertmanager required by each organization.
func (moa *MultiOrgAlertmanager) SyncAlertmanagersForOrgs(ctx context.Context, orgIDs []int64) {
	orgsFound := make(map[int64]struct{}, len(orgIDs))
	dbConfigs, err := moa.getLatestConfigs(ctx)
	if err != nil {
		moa.logger.Error("failed to load Alertmanager configurations", "error", err)
		return
	}

	moa.alertmanagersMtx.Lock()
	for _, orgID := range orgIDs {
		if _, isDisabledOrg := moa.settings.UnifiedAlerting.DisabledOrgs[orgID]; isDisabledOrg {
			moa.logger.Debug("skipping syncing Alertmanager for disabled org", "org", orgID)
			continue
		}
		orgsFound[orgID] = struct{}{}

		alertmanager, found := moa.alertmanagers[orgID]

		if !found {
			// These metrics are not exported by Grafana and are mostly a placeholder.
			// To export them, we need to translate the metrics from each individual registry
			// and then aggregate them on the main registry.
			m := metrics.NewAlertmanagerMetrics(moa.metrics.GetOrCreateOrgRegistry(orgID))
			am, err := newAlertmanager(ctx, orgID, moa.settings, moa.configStore, moa.kvStore, moa.peer, moa.decryptFn, moa.ns, m)
			if err != nil {
				moa.logger.Error("unable to create Alertmanager for org", "org", orgID, "error", err)
			}
			moa.alertmanagers[orgID] = am
			alertmanager = am
		}

		dbConfig, cfgFound := dbConfigs[orgID]
		if !cfgFound {
			if found {
				// This means that the configuration is gone but the organization, as well as the Alertmanager, exists.
				moa.logger.Warn("Alertmanager exists for org but the configuration is gone. Applying the default configuration", "org", orgID)
			}
			err := alertmanager.SaveAndApplyDefaultConfig(ctx)
			if err != nil {
				moa.logger.Error("failed to apply the default Alertmanager configuration", "org", orgID, "error", err)
				continue
			}
			moa.alertmanagers[orgID] = alertmanager
			continue
		}

		err := alertmanager.ApplyConfig(ctx, dbConfig)
		if err != nil {
			moa.logger.Error("failed to apply Alertmanager config for org", "org", orgID, "id", dbConfig.ID, "error", err)
			continue
		}
		moa.alertmanagers[orgID] = alertmanager
	}

	amsToStop := map[int64]*Alertmanager{}
	for orgId, am := range moa.alertmanagers {
		if _, exists := orgsFound[orgId]; !exists {
			amsToStop[orgId] = am
			delete(moa.alertmanagers, orgId)
			moa.metrics.RemoveOrgRegistry(orgId)
		}
	}
	moa.metrics.ActiveConfigurations.Set(float64(len(moa.alertmanagers)))
	moa.alertmanagersMtx.Unlock()

	// Now, we can stop the Alertmanagers without having to hold a lock.
	for orgID, am := range amsToStop {
		moa.logger.Info("stopping Alertmanager", "org", orgID)
		am.StopAndWait()
		moa.logger.Info("stopped Alertmanager", "org", orgID)
		// Cleanup all the remaining resources from this alertmanager.
		am.fileStore.CleanUp()
	}

	// We look for orphan directories and remove them. Orphan directories can
	// occur when an organization is deleted and the node running Grafana is
	// shut down before the next sync is executed.
	moa.cleanupOrphanLocalOrgState(ctx, orgsFound)
}

// cleanupOrphanLocalOrgState will check if there is any organization on
// disk that is not part of the active organizations. If this is the case
// it will delete the local state from disk.
func (moa *MultiOrgAlertmanager) cleanupOrphanLocalOrgState(ctx context.Context,
	activeOrganizations map[int64]struct{}) {
	dataDir := filepath.Join(moa.settings.DataPath, workingDir)
	files, err := os.ReadDir(dataDir)
	if err != nil {
		moa.logger.Error("failed to list local working directory", "dir", dataDir, "error", err)
		return
	}
	for _, file := range files {
		if !file.IsDir() {
			moa.logger.Warn("ignoring unexpected file while scanning local working directory", "filename", filepath.Join(dataDir, file.Name()))
			continue
		}
		orgID, err := strconv.ParseInt(file.Name(), 10, 64)
		if err != nil {
			moa.logger.Error("unable to parse orgID from directory name", "name", file.Name(), "error", err)
			continue
		}
		_, exists := activeOrganizations[orgID]
		if !exists {
			moa.logger.Info("found orphan organization directory", "orgID", orgID)
			workingDirPath := filepath.Join(dataDir, strconv.FormatInt(orgID, 10))
			fileStore := NewFileStore(orgID, moa.kvStore, workingDirPath)
			// Cleanup all the remaining resources from this alertmanager.
			fileStore.CleanUp()
		}
	}

	// Remove all orphaned items from kvstore by listing all existing items
	// in our used namespace and comparing them to the currently active
	// organizations.
	storedFiles := []string{notificationLogFilename, silencesFilename}
	for _, fileName := range storedFiles {
		keys, err := moa.kvStore.Keys(ctx, kvstore.AllOrganizations, KVNamespace, fileName)
		if err != nil {
			moa.logger.Error("failed to fetch items from kvstore", "error", err,
				"namespace", KVNamespace, "key", fileName)
		}
		for _, key := range keys {
			if _, exists := activeOrganizations[key.OrgId]; exists {
				continue
			}
			err = moa.kvStore.Del(ctx, key.OrgId, key.Namespace, key.Key)
			if err != nil {
				moa.logger.Error("failed to delete item from kvstore", "error", err,
					"orgID", key.OrgId, "namespace", KVNamespace, "key", key.Key)
			}
		}
	}
}
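
// StopAndWait stops all Alertmanagers and waits for them to finish. When clustering
// is enabled it also leaves the gossip mesh or shuts down the Redis peer.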
func (moa *MultiOrgAlertmanager) StopAndWait() {
	moa.alertmanagersMtx.Lock()
	defer moa.alertmanagersMtx.Unlock()

	for _, am := range moa.alertmanagers {
		am.StopAndWait()
	}

	p, ok := moa.peer.(*cluster.Peer)
	if ok {
		moa.settleCancel()
		if err := p.Leave(10 * time.Second); err != nil {
			moa.logger.Warn("unable to leave the gossip mesh", "error", err)
		}
	}
	r, ok := moa.peer.(*redisPeer)
	if ok {
		moa.settleCancel()
		r.Shutdown()
	}
}

// AlertmanagerFor returns the Alertmanager instance for the organization provided.
// When the organization does not have an active Alertmanager, it returns an ErrNoAlertmanagerForOrg.
// When the Alertmanager of the organization is not ready, it returns an ErrAlertmanagerNotReady.
func (moa *MultiOrgAlertmanager) AlertmanagerFor(orgID int64) (*Alertmanager, error) {
	moa.alertmanagersMtx.RLock()
	defer moa.alertmanagersMtx.RUnlock()

	orgAM, existing := moa.alertmanagers[orgID]
	if !existing {
		return nil, ErrNoAlertmanagerForOrg
	}

	if !orgAM.Ready() {
		return orgAM, ErrAlertmanagerNotReady
	}

	return orgAM, nil
}

// NilPeer and NilChannel implement the Alertmanager clustering interface.
type NilPeer struct{}

func (p *NilPeer) Position() int { return 0 }

func (p *NilPeer) WaitReady(context.Context) error { return nil }

func (p *NilPeer) AddState(string, cluster.State, prometheus.Registerer) cluster.ClusterChannel {
	return &NilChannel{}
}

type NilChannel struct{}

func (c *NilChannel) Broadcast([]byte) {}