Alerting: Add more tracing for receivers service (#94572)

This commit is contained in:
Yuri Tseretyan 2024-10-11 11:41:13 -04:00 committed by GitHub
parent 0ed94dc71e
commit 18e66d22b1
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 145 additions and 13 deletions

View File

@ -14,6 +14,7 @@ import (
"github.com/grafana/grafana/pkg/apimachinery/identity"
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/infra/log/logtest"
"github.com/grafana/grafana/pkg/infra/tracing"
"github.com/grafana/grafana/pkg/services/accesscontrol"
contextmodel "github.com/grafana/grafana/pkg/services/contexthandler/model"
ac "github.com/grafana/grafana/pkg/services/ngalert/accesscontrol"
@ -375,6 +376,7 @@ func createNotificationSrvSutFromEnv(t *testing.T, env *testEnvironment) Notific
env.xact,
env.log,
fakes.NewFakeReceiverPermissionsService(),
tracing.InitializeTracerForTest(),
)
return NotificationSrv{
logger: env.log,

View File

@ -1892,7 +1892,7 @@ func createProvisioningSrvSut(t *testing.T) ProvisioningSrv {
func createProvisioningSrvSutFromEnv(t *testing.T, env *testEnvironment) ProvisioningSrv {
t.Helper()
tracer := tracing.InitializeTracerForTest()
configStore := legacy_storage.NewAlertmanagerConfigStore(env.configs)
receiverSvc := notifier.NewReceiverService(
ac.NewReceiverAccess[*models.Receiver](env.ac, true),
@ -1903,6 +1903,7 @@ func createProvisioningSrvSutFromEnv(t *testing.T, env *testEnvironment) Provisi
env.xact,
env.log,
ngalertfakes.NewFakeReceiverPermissionsService(),
tracer,
)
return ProvisioningSrv{
log: env.log,

View File

@ -133,6 +133,14 @@ func (r *Receiver) Validate(decryptFn DecryptFn) error {
return errors.Join(errs...)
}
func (r *Receiver) GetIntegrationTypes() []string {
result := make([]string, 0, len(r.Integrations))
for _, i := range r.Integrations {
result = append(result, i.Config.Type)
}
return result
}
// Integration is the domain model representation of an integration.
type Integration struct {
UID string

View File

@ -441,6 +441,7 @@ func (ng *AlertNG) init() error {
ng.store,
ng.Log,
ng.ResourcePermissions,
ng.tracer,
)
provisioningReceiverService := notifier.NewReceiverService(
ac.NewReceiverAccess[*models.Receiver](ng.accesscontrol, true),
@ -451,6 +452,7 @@ func (ng *AlertNG) init() error {
ng.store,
ng.Log,
ng.ResourcePermissions,
ng.tracer,
)
// Provisioning

View File

@ -7,11 +7,14 @@ import (
"fmt"
"strings"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
"golang.org/x/exp/maps"
"github.com/grafana/grafana/pkg/apimachinery/errutil"
"github.com/grafana/grafana/pkg/apimachinery/identity"
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/infra/tracing"
ac "github.com/grafana/grafana/pkg/services/accesscontrol"
"github.com/grafana/grafana/pkg/services/ngalert/api/tooling/definitions"
"github.com/grafana/grafana/pkg/services/ngalert/models"
@ -47,6 +50,7 @@ type ReceiverService struct {
log log.Logger
provenanceValidator validation.ProvenanceStatusTransitionValidator
resourcePermissions ac.ReceiverPermissionsService
tracer tracing.Tracer
}
type alertRuleNotificationSettingsStore interface {
@ -99,6 +103,7 @@ func NewReceiverService(
xact transactionManager,
log log.Logger,
resourcePermissions ac.ReceiverPermissionsService,
tracer tracing.Tracer,
) *ReceiverService {
return &ReceiverService{
authz: authz,
@ -110,12 +115,20 @@ func NewReceiverService(
log: log,
provenanceValidator: validation.ValidateProvenanceRelaxed,
resourcePermissions: resourcePermissions,
tracer: tracer,
}
}
// GetReceiver returns a receiver by name.
// The receiver's secure settings are decrypted if requested and the user has access to do so.
func (rs *ReceiverService) GetReceiver(ctx context.Context, q models.GetReceiverQuery, user identity.Requester) (*models.Receiver, error) {
ctx, span := rs.tracer.Start(ctx, "alerting.receivers.get", trace.WithAttributes(
attribute.Int64("query_org_id", q.OrgID),
attribute.String("query_name", q.Name),
attribute.Bool("query_decrypt", q.Decrypt),
))
defer span.End()
revision, err := rs.cfgStore.Get(ctx, q.OrgID)
if err != nil {
return nil, err
@ -125,6 +138,10 @@ func (rs *ReceiverService) GetReceiver(ctx context.Context, q models.GetReceiver
return nil, err
}
span.AddEvent("Loaded receiver", trace.WithAttributes(
attribute.String("concurrency_token", revision.ConcurrencyToken),
))
storedProvenances, err := rs.provisioningStore.GetProvenances(ctx, q.OrgID, (&definitions.EmbeddedContactPoint{}).ResourceType())
if err != nil {
return nil, err
@ -145,12 +162,12 @@ func (rs *ReceiverService) GetReceiver(ctx context.Context, q models.GetReceiver
if q.Decrypt {
err := rcv.Decrypt(rs.decryptor(ctx))
if err != nil {
rs.log.Warn("Failed to decrypt secure settings", "name", rcv.Name, "error", err)
rs.log.FromContext(ctx).Warn("Failed to decrypt secure settings", "name", rcv.Name, "error", err)
}
} else {
err := rcv.Encrypt(rs.encryptor(ctx))
if err != nil {
rs.log.Warn("Failed to encrypt secure settings", "name", rcv.Name, "error", err)
rs.log.FromContext(ctx).Warn("Failed to encrypt secure settings", "name", rcv.Name, "error", err)
}
}
@ -160,6 +177,15 @@ func (rs *ReceiverService) GetReceiver(ctx context.Context, q models.GetReceiver
// GetReceivers returns a list of receivers a user has access to.
// Receivers can be filtered by name, and secure settings are decrypted if requested and the user has access to do so.
func (rs *ReceiverService) GetReceivers(ctx context.Context, q models.GetReceiversQuery, user identity.Requester) ([]*models.Receiver, error) {
ctx, span := rs.tracer.Start(ctx, "alerting.receivers.getMany", trace.WithAttributes(
attribute.Int64("query_org_id", q.OrgID),
attribute.StringSlice("query_names", q.Names),
attribute.Int("query_limit", q.Limit),
attribute.Int("query_offset", q.Offset),
attribute.Bool("query_decrypt", q.Decrypt),
))
defer span.End()
uids := make([]string, 0, len(q.Names))
for _, name := range q.Names {
uids = append(uids, legacy_storage.NameToUid(name))
@ -171,6 +197,11 @@ func (rs *ReceiverService) GetReceivers(ctx context.Context, q models.GetReceive
}
postables := revision.GetReceivers(uids)
span.AddEvent("Loaded receivers", trace.WithAttributes(
attribute.String("concurrency_token", revision.ConcurrencyToken),
attribute.Int("count", len(postables)),
))
storedProvenances, err := rs.provisioningStore.GetProvenances(ctx, q.OrgID, (&definitions.EmbeddedContactPoint{}).ResourceType())
if err != nil {
return nil, err
@ -189,16 +220,20 @@ func (rs *ReceiverService) GetReceivers(ctx context.Context, q models.GetReceive
return nil, err
}
span.AddEvent("Applied access control filter", trace.WithAttributes(
attribute.Int("count", len(receivers)),
))
for _, rcv := range filtered {
if q.Decrypt {
err := rcv.Decrypt(rs.decryptor(ctx))
if err != nil {
rs.log.Warn("Failed to decrypt secure settings", "name", rcv.Name, "error", err)
rs.log.FromContext(ctx).Warn("Failed to decrypt secure settings", "name", rcv.Name, "error", err)
}
} else {
err := rcv.Encrypt(rs.encryptor(ctx))
if err != nil {
rs.log.Warn("Failed to encrypt secure settings", "name", rcv.Name, "error", err)
rs.log.FromContext(ctx).Warn("Failed to encrypt secure settings", "name", rcv.Name, "error", err)
}
}
}
@ -212,6 +247,14 @@ func (rs *ReceiverService) GetReceivers(ctx context.Context, q models.GetReceive
// If the users has list access, all receiver settings will be removed from the response. This option is for backwards compatibility with the v1/receivers endpoint
// and should be removed when FGAC is fully implemented.
func (rs *ReceiverService) ListReceivers(ctx context.Context, q models.ListReceiversQuery, user identity.Requester) ([]*models.Receiver, error) { // TODO: Remove this method with FGAC.
ctx, span := rs.tracer.Start(ctx, "alerting.receivers.list", trace.WithAttributes(
attribute.Int64("query_org_id", q.OrgID),
attribute.StringSlice("query_names", q.Names),
attribute.Int("query_limit", q.Limit),
attribute.Int("query_offset", q.Offset),
))
defer span.End()
listAccess, err := rs.authz.HasList(ctx, user)
if err != nil {
return nil, err
@ -228,6 +271,11 @@ func (rs *ReceiverService) ListReceivers(ctx context.Context, q models.ListRecei
}
postables := revision.GetReceivers(uids)
span.AddEvent("Loaded receivers", trace.WithAttributes(
attribute.String("concurrency_token", revision.ConcurrencyToken),
attribute.Int("count", len(postables)),
))
storedProvenances, err := rs.provisioningStore.GetProvenances(ctx, q.OrgID, (&definitions.EmbeddedContactPoint{}).ResourceType())
if err != nil {
return nil, err
@ -243,6 +291,10 @@ func (rs *ReceiverService) ListReceivers(ctx context.Context, q models.ListRecei
if err != nil {
return nil, err
}
span.AddEvent("Applied access control filter", trace.WithAttributes(
attribute.Int("count", len(receivers)),
))
}
// Remove settings.
@ -260,6 +312,12 @@ func (rs *ReceiverService) ListReceivers(ctx context.Context, q models.ListRecei
// DeleteReceiver deletes a receiver by uid.
// UID field currently does not exist, we assume the uid is a particular hashed value of the receiver name.
func (rs *ReceiverService) DeleteReceiver(ctx context.Context, uid string, callerProvenance definitions.Provenance, version string, orgID int64, user identity.Requester) error {
ctx, span := rs.tracer.Start(ctx, "alerting.receivers.delete", trace.WithAttributes(
attribute.String("receiver_uid", uid),
attribute.String("receiver_version", version),
))
defer span.End()
if err := rs.authz.AuthorizeDeleteByUID(ctx, user, uid); err != nil {
return err
}
@ -284,6 +342,8 @@ func (rs *ReceiverService) DeleteReceiver(ctx context.Context, uid string, calle
return err
}
logger := rs.log.FromContext(ctx).New("receiver", existing.Name, "uid", uid, "version", version, "integrations", existing.GetIntegrationTypes())
// Check optimistic concurrency.
// Optimistic concurrency is optional for delete operations, but we still check it if a version is provided.
if version != "" {
@ -292,7 +352,7 @@ func (rs *ReceiverService) DeleteReceiver(ctx context.Context, uid string, calle
return err
}
} else {
rs.log.Debug("Ignoring optimistic concurrency check because version was not provided", "receiver", existing.Name, "operation", "delete")
logger.Debug("Ignoring optimistic concurrency check because version was not provided", "operation", "delete")
}
if err := rs.provenanceValidator(existing.Provenance, models.Provenance(callerProvenance)); err != nil {
@ -306,25 +366,37 @@ func (rs *ReceiverService) DeleteReceiver(ctx context.Context, uid string, calle
}
if usedByRoutes || len(usedByRules) > 0 {
logger.Warn("Cannot delete receiver because it is used", "used_by_routes", usedByRoutes, "used_by_rules", len(usedByRules))
return makeReceiverInUseErr(usedByRoutes, usedByRules)
}
revision.DeleteReceiver(uid)
return rs.xact.InTransaction(ctx, func(ctx context.Context) error {
err = rs.xact.InTransaction(ctx, func(ctx context.Context) error {
err = rs.cfgStore.Save(ctx, revision, orgID)
if err != nil {
return err
}
err = rs.resourcePermissions.DeleteResourcePermissions(ctx, orgID, uid)
if err != nil {
rs.log.Error("Could not delete receiver permissions", "receiver", existing.Name, "error", err)
logger.Error("Could not delete receiver permissions", "error", err)
}
return rs.deleteProvenances(ctx, orgID, existing.Integrations)
})
if err != nil {
return err
}
logger.Info("Deleted receiver")
return nil
}
func (rs *ReceiverService) CreateReceiver(ctx context.Context, r *models.Receiver, orgID int64, user identity.Requester) (*models.Receiver, error) {
func (rs *ReceiverService) CreateReceiver(ctx context.Context, r *models.Receiver, orgID int64, user identity.Requester) (result *models.Receiver, err error) {
ctx, span := rs.tracer.Start(ctx, "alerting.receivers.create", trace.WithAttributes(
attribute.String("receiver", r.Name),
attribute.StringSlice("integrations", r.GetIntegrationTypes()),
))
defer span.End()
if err := rs.authz.AuthorizeCreate(ctx, user); err != nil {
return nil, err
}
@ -334,6 +406,8 @@ func (rs *ReceiverService) CreateReceiver(ctx context.Context, r *models.Receive
return nil, err
}
span.AddEvent("Loaded Alertmanager configuration", trace.WithAttributes(attribute.String("concurrency_token", revision.ConcurrencyToken)))
createdReceiver := r.Clone()
err = createdReceiver.Encrypt(rs.encryptor(ctx))
if err != nil {
@ -341,6 +415,7 @@ func (rs *ReceiverService) CreateReceiver(ctx context.Context, r *models.Receive
}
if err := createdReceiver.Validate(rs.decryptor(ctx)); err != nil {
span.RecordError(err)
return nil, legacy_storage.MakeErrReceiverInvalid(err)
}
@ -364,19 +439,34 @@ func (rs *ReceiverService) CreateReceiver(ctx context.Context, r *models.Receive
return nil, err
}
result, err := PostableApiReceiverToReceiver(created, createdReceiver.Provenance)
result, err = PostableApiReceiverToReceiver(created, createdReceiver.Provenance)
if err != nil {
return nil, err
}
span.AddEvent("Created a new receiver", trace.WithAttributes(
attribute.String("uid", result.UID),
attribute.String("version", result.Version),
))
rs.log.FromContext(ctx).Info("Created a new receiver", "receiver", result.Name, "uid", result.UID, "fingerprint", result.Version, "integrations", result.GetIntegrationTypes())
return result, nil
}
func (rs *ReceiverService) UpdateReceiver(ctx context.Context, r *models.Receiver, storedSecureFields map[string][]string, orgID int64, user identity.Requester) (*models.Receiver, error) {
// TODO: To support receiver renaming, we need to consider permissions on old and new UID since UIDs are tied to names.
ctx, span := rs.tracer.Start(ctx, "alerting.receivers.update", trace.WithAttributes(
attribute.String("receiver", r.Name),
attribute.String("uid", r.UID),
attribute.String("version", r.Version),
attribute.StringSlice("integrations", r.GetIntegrationTypes()),
))
defer span.End()
if err := rs.authz.AuthorizeUpdate(ctx, user, r); err != nil {
return nil, err
}
logger := rs.log.FromContext(ctx).New("receiver", r.Name, "uid", r.UID, "version", r.Version, "integrations", r.GetIntegrationTypes())
logger.Debug("Updating receiver")
revision, err := rs.cfgStore.Get(ctx, orgID)
if err != nil {
return nil, err
@ -395,6 +485,14 @@ func (rs *ReceiverService) UpdateReceiver(ctx context.Context, r *models.Receive
return nil, err
}
span.AddEvent("Loaded current receiver", trace.WithAttributes(
attribute.String("concurrency_token", revision.ConcurrencyToken),
attribute.String("receiver", existing.Name),
attribute.String("uid", existing.UID),
attribute.String("version", existing.Version),
attribute.StringSlice("integrations", existing.GetIntegrationTypes()),
))
// Check optimistic concurrency.
err = rs.checkOptimisticConcurrency(existing, r.Version)
if err != nil {
@ -440,7 +538,7 @@ func (rs *ReceiverService) UpdateReceiver(ctx context.Context, r *models.Receive
return err
}
if permissionsUpdated > 0 {
rs.log.FromContext(ctx).Info("Moved custom receiver permissions", "oldName", existing.Name, "newName", r.Name, "count", permissionsUpdated)
logger.Info("Moved custom receiver permissions", "oldName", existing.Name, "count", permissionsUpdated)
}
if err := rs.resourcePermissions.DeleteResourcePermissions(ctx, orgID, legacy_storage.NameToUid(existing.Name)); err != nil {
return err
@ -465,6 +563,7 @@ func (rs *ReceiverService) UpdateReceiver(ctx context.Context, r *models.Receive
if err != nil {
return nil, err
}
logger.Info("Updated receiver", "new_version", result.Version)
return result, nil
}
@ -662,6 +761,13 @@ func makeErrReceiverDependentResourcesProvenance(usedByRoutes bool, rules []mode
}
func (rs *ReceiverService) RenameReceiverInDependentResources(ctx context.Context, orgID int64, route *definitions.Route, oldName, newName string, receiverProvenance models.Provenance) error {
ctx, span := rs.tracer.Start(ctx, "alerting.receivers.rename-dependent-resources", trace.WithAttributes(
attribute.String("oldName", oldName),
attribute.String("newName", newName),
attribute.String("receiver_provenance", string(receiverProvenance)),
))
defer span.End()
validate := validation.ValidateProvenanceOfDependentResources(receiverProvenance)
// if there are no references to the old time interval, exit
updatedRoutes := legacy_storage.RenameReceiverInRoute(oldName, newName, route)
@ -679,7 +785,12 @@ func (rs *ReceiverService) RenameReceiverInDependentResources(ctx context.Contex
return err
}
if !canUpdate || len(invalidProvenance) > 0 {
return makeErrReceiverDependentResourcesProvenance(updatedRoutes > 0, invalidProvenance)
err := makeErrReceiverDependentResourcesProvenance(updatedRoutes > 0, invalidProvenance)
span.RecordError(err, trace.WithAttributes(
attribute.Bool("invalid_route_provenance", canUpdate),
attribute.Int("invalid_rule_provenances", len(invalidProvenance)),
))
return err
}
if len(affected) > 0 || updatedRoutes > 0 {
rs.log.FromContext(ctx).Info("Updated rules and routes that use renamed receiver", "oldName", oldName, "newName", newName, "rules", len(affected), "routes", updatedRoutes)

View File

@ -14,6 +14,7 @@ import (
"github.com/grafana/grafana/pkg/apimachinery/identity"
"github.com/grafana/grafana/pkg/infra/db"
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/infra/tracing"
"github.com/grafana/grafana/pkg/services/accesscontrol"
"github.com/grafana/grafana/pkg/services/accesscontrol/acimpl"
"github.com/grafana/grafana/pkg/services/authz/zanzana"
@ -1484,6 +1485,7 @@ func createReceiverServiceSut(t *testing.T, encryptSvc secretService) *ReceiverS
xact,
log.NewNopLogger(),
fakes.NewFakeReceiverPermissionsService(),
tracing.InitializeTracerForTest(),
)
}

View File

@ -17,6 +17,7 @@ import (
"github.com/grafana/grafana/pkg/components/simplejson"
"github.com/grafana/grafana/pkg/infra/db"
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/infra/tracing"
"github.com/grafana/grafana/pkg/services/accesscontrol"
"github.com/grafana/grafana/pkg/services/accesscontrol/acimpl"
"github.com/grafana/grafana/pkg/services/authz/zanzana"
@ -494,6 +495,7 @@ func createContactPointServiceSutWithConfigStore(t *testing.T, secretService sec
xact,
log.NewNopLogger(),
fakes.NewFakeReceiverPermissionsService(),
tracing.InitializeTracerForTest(),
)
return NewContactPointService(

View File

@ -9,6 +9,7 @@ import (
"github.com/grafana/grafana/pkg/infra/db"
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/infra/tracing"
"github.com/grafana/grafana/pkg/registry"
"github.com/grafana/grafana/pkg/services/accesscontrol"
"github.com/grafana/grafana/pkg/services/correlations"
@ -54,6 +55,7 @@ func ProvideService(
secrectService secrets.Service,
orgService org.Service,
resourcePermissions accesscontrol.ReceiverPermissionsService,
tracer tracing.Tracer,
) (*ProvisioningServiceImpl, error) {
s := &ProvisioningServiceImpl{
Cfg: cfg,
@ -157,6 +159,7 @@ type ProvisioningServiceImpl struct {
secretService secrets.Service
folderService folder.Service
resourcePermissions accesscontrol.ReceiverPermissionsService
tracer tracing.Tracer
}
func (ps *ProvisioningServiceImpl) RunInitProvisioners(ctx context.Context) error {
@ -291,6 +294,7 @@ func (ps *ProvisioningServiceImpl) ProvisionAlerting(ctx context.Context) error
ps.SQLStore,
ps.log,
ps.resourcePermissions,
ps.tracer,
)
contactPointService := provisioning.NewContactPointService(configStore, ps.secretService,
st, ps.SQLStore, receiverSvc, ps.log, &st, ps.resourcePermissions)