Advisor: Run check steps in parallel (#100200)

This commit is contained in:
Andres Martinez Gotor 2025-02-07 10:57:26 +01:00 committed by GitHub
parent 6dc98dbbcc
commit e291140be3
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 225 additions and 155 deletions

View File

@ -12,7 +12,6 @@ import (
"github.com/grafana/grafana/pkg/services/datasources" "github.com/grafana/grafana/pkg/services/datasources"
"github.com/grafana/grafana/pkg/services/pluginsintegration/pluginstore" "github.com/grafana/grafana/pkg/services/pluginsintegration/pluginstore"
"github.com/grafana/grafana/pkg/util" "github.com/grafana/grafana/pkg/util"
"k8s.io/klog/v2"
) )
type check struct { type check struct {
@ -76,26 +75,23 @@ func (s *uidValidationStep) Description() string {
return "Check if the UID of each data source is valid." return "Check if the UID of each data source is valid."
} }
func (s *uidValidationStep) Run(ctx context.Context, obj *advisor.CheckSpec, items []any) ([]advisor.CheckReportError, error) { func (s *uidValidationStep) Run(ctx context.Context, obj *advisor.CheckSpec, i any) (*advisor.CheckReportError, error) {
dsErrs := []advisor.CheckReportError{} ds, ok := i.(*datasources.DataSource)
for _, i := range items { if !ok {
ds, ok := i.(*datasources.DataSource) return nil, fmt.Errorf("invalid item type %T", i)
if !ok {
return nil, fmt.Errorf("invalid item type %T", i)
}
// Data source UID validation
err := util.ValidateUID(ds.UID)
if err != nil {
dsErrs = append(dsErrs, checks.NewCheckReportError(
advisor.CheckReportErrorSeverityLow,
fmt.Sprintf("Invalid UID '%s' for data source %s", ds.UID, ds.Name),
"Check the <a href='https://grafana.com/docs/grafana/latest/upgrade-guide/upgrade-v11.2/#grafana-data-source-uid-format-enforcement' target=_blank>documentation</a> for more information.",
s.ID(),
ds.UID,
))
}
} }
return dsErrs, nil // Data source UID validation
err := util.ValidateUID(ds.UID)
if err != nil {
return checks.NewCheckReportError(
advisor.CheckReportErrorSeverityLow,
fmt.Sprintf("Invalid UID '%s' for data source %s", ds.UID, ds.Name),
"Check the <a href='https://grafana.com/docs/grafana/latest/upgrade-guide/upgrade-v11.2/#grafana-data-source-uid-format-enforcement' target=_blank>documentation</a> for more information.",
s.ID(),
ds.UID,
), nil
}
return nil, nil
} }
type healthCheckStep struct { type healthCheckStep struct {
@ -115,46 +111,38 @@ func (s *healthCheckStep) ID() string {
return "health-check" return "health-check"
} }
func (s *healthCheckStep) Run(ctx context.Context, obj *advisor.CheckSpec, items []any) ([]advisor.CheckReportError, error) { func (s *healthCheckStep) Run(ctx context.Context, obj *advisor.CheckSpec, i any) (*advisor.CheckReportError, error) {
dsErrs := []advisor.CheckReportError{} ds, ok := i.(*datasources.DataSource)
for _, i := range items { if !ok {
ds, ok := i.(*datasources.DataSource) return nil, fmt.Errorf("invalid item type %T", i)
if !ok {
return nil, fmt.Errorf("invalid item type %T", i)
}
// Health check execution
requester, err := identity.GetRequester(ctx)
if err != nil {
return nil, err
}
pCtx, err := s.PluginContextProvider.GetWithDataSource(ctx, ds.Type, requester, ds)
if err != nil {
klog.ErrorS(err, "Error creating plugin context", "datasource", ds.Name)
continue
}
req := &backend.CheckHealthRequest{
PluginContext: pCtx,
Headers: map[string]string{},
}
resp, err := s.PluginClient.CheckHealth(ctx, req)
if err != nil {
fmt.Println("Error checking health", err)
continue
}
if resp.Status != backend.HealthStatusOk {
dsErrs = append(dsErrs, checks.NewCheckReportError(
advisor.CheckReportErrorSeverityHigh,
fmt.Sprintf("Health check failed for %s", ds.Name),
fmt.Sprintf(
"Go to the <a href='/connections/datasources/edit/%s'>data source configuration</a>"+
" and address the issues reported.", ds.UID),
s.ID(),
ds.UID,
))
}
} }
return dsErrs, nil
// Health check execution
requester, err := identity.GetRequester(ctx)
if err != nil {
return nil, err
}
pCtx, err := s.PluginContextProvider.GetWithDataSource(ctx, ds.Type, requester, ds)
if err != nil {
return nil, fmt.Errorf("failed to get plugin context: %w", err)
}
req := &backend.CheckHealthRequest{
PluginContext: pCtx,
Headers: map[string]string{},
}
resp, err := s.PluginClient.CheckHealth(ctx, req)
if err != nil || resp.Status != backend.HealthStatusOk {
return checks.NewCheckReportError(
advisor.CheckReportErrorSeverityHigh,
fmt.Sprintf("Health check failed for %s", ds.Name),
fmt.Sprintf(
"Go to the <a href='/connections/datasources/edit/%s'>data source configuration</a>"+
" and address the issues reported.", ds.UID),
s.ID(),
ds.UID,
), nil
}
return nil, nil
} }
type pluginContextProvider interface { type pluginContextProvider interface {

View File

@ -35,9 +35,13 @@ func TestCheck_Run(t *testing.T) {
assert.NoError(t, err) assert.NoError(t, err)
errs := []advisor.CheckReportError{} errs := []advisor.CheckReportError{}
for _, step := range check.Steps() { for _, step := range check.Steps() {
stepErrs, err := step.Run(ctx, &advisor.CheckSpec{}, items) for _, item := range items {
assert.NoError(t, err) stepErr, err := step.Run(ctx, &advisor.CheckSpec{}, item)
errs = append(errs, stepErrs...) assert.NoError(t, err)
if stepErr != nil {
errs = append(errs, *stepErr)
}
}
} }
assert.NoError(t, err) assert.NoError(t, err)
@ -65,9 +69,13 @@ func TestCheck_Run(t *testing.T) {
assert.NoError(t, err) assert.NoError(t, err)
errs := []advisor.CheckReportError{} errs := []advisor.CheckReportError{}
for _, step := range check.Steps() { for _, step := range check.Steps() {
stepErrs, err := step.Run(ctx, &advisor.CheckSpec{}, items) for _, item := range items {
assert.NoError(t, err) stepErr, err := step.Run(ctx, &advisor.CheckSpec{}, item)
errs = append(errs, stepErrs...) assert.NoError(t, err)
if stepErr != nil {
errs = append(errs, *stepErr)
}
}
} }
assert.NoError(t, err) assert.NoError(t, err)
@ -96,9 +104,13 @@ func TestCheck_Run(t *testing.T) {
assert.NoError(t, err) assert.NoError(t, err)
errs := []advisor.CheckReportError{} errs := []advisor.CheckReportError{}
for _, step := range check.Steps() { for _, step := range check.Steps() {
stepErrs, err := step.Run(ctx, &advisor.CheckSpec{}, items) for _, item := range items {
assert.NoError(t, err) stepErr, err := step.Run(ctx, &advisor.CheckSpec{}, item)
errs = append(errs, stepErrs...) assert.NoError(t, err)
if stepErr != nil {
errs = append(errs, *stepErr)
}
}
} }
assert.NoError(t, err) assert.NoError(t, err)

View File

@ -24,6 +24,6 @@ type Step interface {
Title() string Title() string
// Description returns the description of the step // Description returns the description of the step
Description() string Description() string
// Run executes the step and returns a list of errors // Run executes the step for an item and returns a report
Run(ctx context.Context, obj *advisorv0alpha1.CheckSpec, items []any) ([]advisorv0alpha1.CheckReportError, error) Run(ctx context.Context, obj *advisorv0alpha1.CheckSpec, item any) (*advisorv0alpha1.CheckReportError, error)
} }

View File

@ -78,35 +78,33 @@ func (s *deprecationStep) ID() string {
return "deprecation" return "deprecation"
} }
func (s *deprecationStep) Run(ctx context.Context, _ *advisor.CheckSpec, items []any) ([]advisor.CheckReportError, error) { func (s *deprecationStep) Run(ctx context.Context, _ *advisor.CheckSpec, it any) (*advisor.CheckReportError, error) {
errs := []advisor.CheckReportError{} p, ok := it.(pluginstore.Plugin)
for _, i := range items { if !ok {
p, ok := i.(pluginstore.Plugin) return nil, fmt.Errorf("invalid item type %T", it)
if !ok {
return nil, fmt.Errorf("invalid item type %T", i)
}
// Skip if it's a core plugin
if p.IsCorePlugin() {
continue
}
// Check if plugin is deprecated
i, err := s.PluginRepo.PluginInfo(ctx, p.ID)
if err != nil {
continue
}
if i.Status == "deprecated" {
errs = append(errs, checks.NewCheckReportError(
advisor.CheckReportErrorSeverityHigh,
fmt.Sprintf("Plugin deprecated: %s", p.ID),
"Check the <a href='https://grafana.com/legal/plugin-deprecation/#a-plugin-i-use-is-deprecated-what-should-i-do' target=_blank>documentation</a> for recommended steps.",
s.ID(),
p.ID,
))
}
} }
return errs, nil
// Skip if it's a core plugin
if p.IsCorePlugin() {
return nil, nil
}
// Check if plugin is deprecated
i, err := s.PluginRepo.PluginInfo(ctx, p.ID)
if err != nil {
// Unable to check deprecation status
return nil, nil
}
if i.Status == "deprecated" {
return checks.NewCheckReportError(
advisor.CheckReportErrorSeverityHigh,
fmt.Sprintf("Plugin deprecated: %s", p.ID),
"Check the <a href='https://grafana.com/legal/plugin-deprecation/#a-plugin-i-use-is-deprecated-what-should-i-do' target=_blank>documentation</a> for recommended steps.",
s.ID(),
p.ID,
), nil
}
return nil, nil
} }
type updateStep struct { type updateStep struct {
@ -127,44 +125,42 @@ func (s *updateStep) ID() string {
return "update" return "update"
} }
func (s *updateStep) Run(ctx context.Context, _ *advisor.CheckSpec, items []any) ([]advisor.CheckReportError, error) { func (s *updateStep) Run(ctx context.Context, _ *advisor.CheckSpec, i any) (*advisor.CheckReportError, error) {
errs := []advisor.CheckReportError{} p, ok := i.(pluginstore.Plugin)
for _, i := range items { if !ok {
p, ok := i.(pluginstore.Plugin) return nil, fmt.Errorf("invalid item type %T", i)
if !ok {
return nil, fmt.Errorf("invalid item type %T", i)
}
// Skip if it's a core plugin
if p.IsCorePlugin() {
continue
}
// Skip if it's managed or pinned
if s.isManaged(ctx, p.ID) || s.PluginPreinstall.IsPinned(p.ID) {
continue
}
// Check if plugin has a newer version available
compatOpts := repo.NewCompatOpts(services.GrafanaVersion, sysruntime.GOOS, sysruntime.GOARCH)
info, err := s.PluginRepo.GetPluginArchiveInfo(ctx, p.ID, "", compatOpts)
if err != nil {
continue
}
if hasUpdate(p, info) {
errs = append(errs, checks.NewCheckReportError(
advisor.CheckReportErrorSeverityLow,
fmt.Sprintf("New version available for %s", p.ID),
fmt.Sprintf(
"Go to the <a href='/plugins/%s?page=version-history'>plugin admin page</a>"+
" and upgrade to the latest version.", p.ID),
s.ID(),
p.ID,
))
}
} }
return errs, nil // Skip if it's a core plugin
if p.IsCorePlugin() {
return nil, nil
}
// Skip if it's managed or pinned
if s.isManaged(ctx, p.ID) || s.PluginPreinstall.IsPinned(p.ID) {
return nil, nil
}
// Check if plugin has a newer version available
compatOpts := repo.NewCompatOpts(services.GrafanaVersion, sysruntime.GOOS, sysruntime.GOARCH)
info, err := s.PluginRepo.GetPluginArchiveInfo(ctx, p.ID, "", compatOpts)
if err != nil {
// Unable to check updates
return nil, nil
}
if hasUpdate(p, info) {
return checks.NewCheckReportError(
advisor.CheckReportErrorSeverityLow,
fmt.Sprintf("New version available for %s", p.ID),
fmt.Sprintf(
"Go to the <a href='/plugins/%s?page=version-history'>plugin admin page</a>"+
" and upgrade to the latest version.", p.ID),
s.ID(),
p.ID,
), nil
}
return nil, nil
} }
func hasUpdate(current pluginstore.Plugin, latest *repo.PluginArchiveInfo) bool { func hasUpdate(current pluginstore.Plugin, latest *repo.PluginArchiveInfo) bool {

View File

@ -136,9 +136,13 @@ func TestRun(t *testing.T) {
assert.NoError(t, err) assert.NoError(t, err)
errs := []advisor.CheckReportError{} errs := []advisor.CheckReportError{}
for _, step := range check.Steps() { for _, step := range check.Steps() {
stepErrs, err := step.Run(context.Background(), &advisor.CheckSpec{}, items) for _, item := range items {
assert.NoError(t, err) stepErr, err := step.Run(context.Background(), &advisor.CheckSpec{}, item)
errs = append(errs, stepErrs...) assert.NoError(t, err)
if stepErr != nil {
errs = append(errs, *stepErr)
}
}
} }
assert.NoError(t, err) assert.NoError(t, err)
assert.Equal(t, len(tt.plugins), len(items)) assert.Equal(t, len(tt.plugins), len(items))

View File

@ -10,8 +10,8 @@ func NewCheckReportError(
action string, action string,
stepID string, stepID string,
itemID string, itemID string,
) advisor.CheckReportError { ) *advisor.CheckReportError {
return advisor.CheckReportError{ return &advisor.CheckReportError{
Severity: severity, Severity: severity,
Reason: reason, Reason: reason,
Action: action, Action: action,

View File

@ -4,6 +4,7 @@ import (
"context" "context"
"errors" "errors"
"fmt" "fmt"
"sync"
claims "github.com/grafana/authlib/types" claims "github.com/grafana/authlib/types"
"github.com/grafana/grafana-app-sdk/resource" "github.com/grafana/grafana-app-sdk/resource"
@ -83,20 +84,17 @@ func processCheck(ctx context.Context, client resource.Client, obj resource.Obje
} }
// Run the steps // Run the steps
steps := check.Steps() steps := check.Steps()
errs := []advisorv0alpha1.CheckReportError{} reportErrors, err := runStepsInParallel(ctx, &c.Spec, steps, items)
for _, step := range steps { if err != nil {
stepErrs, err := step.Run(ctx, &c.Spec, items) setErr := setStatusAnnotation(ctx, client, obj, "error")
if err != nil { if setErr != nil {
setErr := setStatusAnnotation(ctx, client, obj, "error") return setErr
if setErr != nil {
return setErr
}
return fmt.Errorf("error running step %s: %w", step.Title(), err)
} }
errs = append(errs, stepErrs...) return fmt.Errorf("error running steps: %w", err)
} }
report := &advisorv0alpha1.CheckV0alpha1StatusReport{ report := &advisorv0alpha1.CheckV0alpha1StatusReport{
Errors: errs, Errors: reportErrors,
Count: int64(len(items)), Count: int64(len(items)),
} }
err = setStatusAnnotation(ctx, client, obj, "processed") err = setStatusAnnotation(ctx, client, obj, "processed")
@ -111,3 +109,35 @@ func processCheck(ctx context.Context, client resource.Client, obj resource.Obje
}}, }},
}, resource.PatchOptions{}, obj) }, resource.PatchOptions{}, obj)
} }
func runStepsInParallel(ctx context.Context, spec *advisorv0alpha1.CheckSpec, steps []checks.Step, items []any) ([]advisorv0alpha1.CheckReportError, error) {
reportErrs := []advisorv0alpha1.CheckReportError{}
var internalErr error
var wg sync.WaitGroup
var mu sync.Mutex
// Avoid too many concurrent requests
limit := make(chan struct{}, 10)
for _, step := range steps {
for _, item := range items {
wg.Add(1)
limit <- struct{}{}
go func(step checks.Step, item any) {
defer wg.Done()
defer func() { <-limit }()
stepErr, err := step.Run(ctx, spec, item)
mu.Lock()
defer mu.Unlock()
if err != nil {
internalErr = fmt.Errorf("error running step %s: %w", step.ID(), err)
return
}
if stepErr != nil {
reportErrs = append(reportErrs, *stepErr)
}
}(step, item)
}
}
wg.Wait()
return reportErrs, internalErr
}

View File

@ -3,6 +3,7 @@ package app
import ( import (
"context" "context"
"errors" "errors"
"fmt"
"testing" "testing"
"github.com/grafana/grafana-app-sdk/resource" "github.com/grafana/grafana-app-sdk/resource"
@ -68,13 +69,45 @@ func TestProcessCheck(t *testing.T) {
meta.SetCreatedBy("user:1") meta.SetCreatedBy("user:1")
client := &mockClient{} client := &mockClient{}
ctx := context.TODO() ctx := context.TODO()
check := &mockCheck{} check := &mockCheck{
items: []any{"item"},
}
err = processCheck(ctx, client, obj, check) err = processCheck(ctx, client, obj, check)
assert.NoError(t, err) assert.NoError(t, err)
assert.Equal(t, "processed", obj.GetAnnotations()[statusAnnotation]) assert.Equal(t, "processed", obj.GetAnnotations()[statusAnnotation])
} }
func TestProcessMultipleCheckItems(t *testing.T) {
obj := &advisorv0alpha1.Check{}
obj.SetAnnotations(map[string]string{})
meta, err := utils.MetaAccessor(obj)
if err != nil {
t.Fatal(err)
}
meta.SetCreatedBy("user:1")
client := &mockClient{}
ctx := context.TODO()
items := make([]any, 100)
for i := range items {
if i%2 == 0 {
items[i] = fmt.Sprintf("item-%d", i)
} else {
items[i] = errors.New("error")
}
}
check := &mockCheck{
items: items,
}
err = processCheck(ctx, client, obj, check)
assert.NoError(t, err)
assert.Equal(t, "processed", obj.GetAnnotations()[statusAnnotation])
r := client.lastValue.(advisorv0alpha1.CheckV0alpha1StatusReport)
assert.Equal(t, r.Count, int64(100))
assert.Len(t, r.Errors, 50)
}
func TestProcessCheck_AlreadyProcessed(t *testing.T) { func TestProcessCheck_AlreadyProcessed(t *testing.T) {
obj := &advisorv0alpha1.Check{} obj := &advisorv0alpha1.Check{}
obj.SetAnnotations(map[string]string{statusAnnotation: "processed"}) obj.SetAnnotations(map[string]string{statusAnnotation: "processed"})
@ -98,7 +131,8 @@ func TestProcessCheck_RunError(t *testing.T) {
ctx := context.TODO() ctx := context.TODO()
check := &mockCheck{ check := &mockCheck{
err: errors.New("run error"), items: []any{"item"},
err: errors.New("run error"),
} }
err = processCheck(ctx, client, obj, check) err = processCheck(ctx, client, obj, check)
@ -108,14 +142,17 @@ func TestProcessCheck_RunError(t *testing.T) {
type mockClient struct { type mockClient struct {
resource.Client resource.Client
lastValue any
} }
func (m *mockClient) PatchInto(ctx context.Context, id resource.Identifier, req resource.PatchRequest, opts resource.PatchOptions, obj resource.Object) error { func (m *mockClient) PatchInto(ctx context.Context, id resource.Identifier, req resource.PatchRequest, opts resource.PatchOptions, obj resource.Object) error {
m.lastValue = req.Operations[0].Value
return nil return nil
} }
type mockCheck struct { type mockCheck struct {
err error err error
items []any
} }
func (m *mockCheck) ID() string { func (m *mockCheck) ID() string {
@ -123,7 +160,7 @@ func (m *mockCheck) ID() string {
} }
func (m *mockCheck) Items(ctx context.Context) ([]any, error) { func (m *mockCheck) Items(ctx context.Context) ([]any, error) {
return []any{}, nil return m.items, nil
} }
func (m *mockCheck) Steps() []checks.Step { func (m *mockCheck) Steps() []checks.Step {
@ -136,10 +173,13 @@ type mockStep struct {
err error err error
} }
func (m *mockStep) Run(ctx context.Context, obj *advisorv0alpha1.CheckSpec, items []any) ([]advisorv0alpha1.CheckReportError, error) { func (m *mockStep) Run(ctx context.Context, obj *advisorv0alpha1.CheckSpec, items any) (*advisorv0alpha1.CheckReportError, error) {
if m.err != nil { if m.err != nil {
return nil, m.err return nil, m.err
} }
if _, ok := items.(error); ok {
return &advisorv0alpha1.CheckReportError{}, nil
}
return nil, nil return nil, nil
} }