diff --git a/apps/advisor/pkg/app/checks/datasourcecheck/check.go b/apps/advisor/pkg/app/checks/datasourcecheck/check.go index 88e10a72719..e946f2e2819 100644 --- a/apps/advisor/pkg/app/checks/datasourcecheck/check.go +++ b/apps/advisor/pkg/app/checks/datasourcecheck/check.go @@ -12,7 +12,6 @@ import ( "github.com/grafana/grafana/pkg/services/datasources" "github.com/grafana/grafana/pkg/services/pluginsintegration/pluginstore" "github.com/grafana/grafana/pkg/util" - "k8s.io/klog/v2" ) type check struct { @@ -76,26 +75,23 @@ func (s *uidValidationStep) Description() string { return "Check if the UID of each data source is valid." } -func (s *uidValidationStep) Run(ctx context.Context, obj *advisor.CheckSpec, items []any) ([]advisor.CheckReportError, error) { - dsErrs := []advisor.CheckReportError{} - for _, i := range items { - ds, ok := i.(*datasources.DataSource) - if !ok { - return nil, fmt.Errorf("invalid item type %T", i) - } - // Data source UID validation - err := util.ValidateUID(ds.UID) - if err != nil { - dsErrs = append(dsErrs, checks.NewCheckReportError( - advisor.CheckReportErrorSeverityLow, - fmt.Sprintf("Invalid UID '%s' for data source %s", ds.UID, ds.Name), - "Check the documentation for more information.", - s.ID(), - ds.UID, - )) - } +func (s *uidValidationStep) Run(ctx context.Context, obj *advisor.CheckSpec, i any) (*advisor.CheckReportError, error) { + ds, ok := i.(*datasources.DataSource) + if !ok { + return nil, fmt.Errorf("invalid item type %T", i) } - return dsErrs, nil + // Data source UID validation + err := util.ValidateUID(ds.UID) + if err != nil { + return checks.NewCheckReportError( + advisor.CheckReportErrorSeverityLow, + fmt.Sprintf("Invalid UID '%s' for data source %s", ds.UID, ds.Name), + "Check the documentation for more information.", + s.ID(), + ds.UID, + ), nil + } + return nil, nil } type healthCheckStep struct { @@ -115,46 +111,38 @@ func (s *healthCheckStep) ID() string { return "health-check" } -func (s *healthCheckStep) Run(ctx context.Context, obj *advisor.CheckSpec, items []any) ([]advisor.CheckReportError, error) { - dsErrs := []advisor.CheckReportError{} - for _, i := range items { - ds, ok := i.(*datasources.DataSource) - if !ok { - return nil, fmt.Errorf("invalid item type %T", i) - } - - // Health check execution - requester, err := identity.GetRequester(ctx) - if err != nil { - return nil, err - } - pCtx, err := s.PluginContextProvider.GetWithDataSource(ctx, ds.Type, requester, ds) - if err != nil { - klog.ErrorS(err, "Error creating plugin context", "datasource", ds.Name) - continue - } - req := &backend.CheckHealthRequest{ - PluginContext: pCtx, - Headers: map[string]string{}, - } - resp, err := s.PluginClient.CheckHealth(ctx, req) - if err != nil { - fmt.Println("Error checking health", err) - continue - } - if resp.Status != backend.HealthStatusOk { - dsErrs = append(dsErrs, checks.NewCheckReportError( - advisor.CheckReportErrorSeverityHigh, - fmt.Sprintf("Health check failed for %s", ds.Name), - fmt.Sprintf( - "Go to the data source configuration"+ - " and address the issues reported.", ds.UID), - s.ID(), - ds.UID, - )) - } +func (s *healthCheckStep) Run(ctx context.Context, obj *advisor.CheckSpec, i any) (*advisor.CheckReportError, error) { + ds, ok := i.(*datasources.DataSource) + if !ok { + return nil, fmt.Errorf("invalid item type %T", i) } - return dsErrs, nil + + // Health check execution + requester, err := identity.GetRequester(ctx) + if err != nil { + return nil, err + } + pCtx, err := s.PluginContextProvider.GetWithDataSource(ctx, ds.Type, requester, ds) + if err != nil { + return nil, fmt.Errorf("failed to get plugin context: %w", err) + } + req := &backend.CheckHealthRequest{ + PluginContext: pCtx, + Headers: map[string]string{}, + } + resp, err := s.PluginClient.CheckHealth(ctx, req) + if err != nil || resp.Status != backend.HealthStatusOk { + return checks.NewCheckReportError( + advisor.CheckReportErrorSeverityHigh, + fmt.Sprintf("Health check failed for %s", ds.Name), + fmt.Sprintf( + "Go to the data source configuration"+ + " and address the issues reported.", ds.UID), + s.ID(), + ds.UID, + ), nil + } + return nil, nil } type pluginContextProvider interface { diff --git a/apps/advisor/pkg/app/checks/datasourcecheck/check_test.go b/apps/advisor/pkg/app/checks/datasourcecheck/check_test.go index c360f96ad14..5cfe595de67 100644 --- a/apps/advisor/pkg/app/checks/datasourcecheck/check_test.go +++ b/apps/advisor/pkg/app/checks/datasourcecheck/check_test.go @@ -35,9 +35,13 @@ func TestCheck_Run(t *testing.T) { assert.NoError(t, err) errs := []advisor.CheckReportError{} for _, step := range check.Steps() { - stepErrs, err := step.Run(ctx, &advisor.CheckSpec{}, items) - assert.NoError(t, err) - errs = append(errs, stepErrs...) + for _, item := range items { + stepErr, err := step.Run(ctx, &advisor.CheckSpec{}, item) + assert.NoError(t, err) + if stepErr != nil { + errs = append(errs, *stepErr) + } + } } assert.NoError(t, err) @@ -65,9 +69,13 @@ func TestCheck_Run(t *testing.T) { assert.NoError(t, err) errs := []advisor.CheckReportError{} for _, step := range check.Steps() { - stepErrs, err := step.Run(ctx, &advisor.CheckSpec{}, items) - assert.NoError(t, err) - errs = append(errs, stepErrs...) + for _, item := range items { + stepErr, err := step.Run(ctx, &advisor.CheckSpec{}, item) + assert.NoError(t, err) + if stepErr != nil { + errs = append(errs, *stepErr) + } + } } assert.NoError(t, err) @@ -96,9 +104,13 @@ func TestCheck_Run(t *testing.T) { assert.NoError(t, err) errs := []advisor.CheckReportError{} for _, step := range check.Steps() { - stepErrs, err := step.Run(ctx, &advisor.CheckSpec{}, items) - assert.NoError(t, err) - errs = append(errs, stepErrs...) + for _, item := range items { + stepErr, err := step.Run(ctx, &advisor.CheckSpec{}, item) + assert.NoError(t, err) + if stepErr != nil { + errs = append(errs, *stepErr) + } + } } assert.NoError(t, err) diff --git a/apps/advisor/pkg/app/checks/ifaces.go b/apps/advisor/pkg/app/checks/ifaces.go index b630a84871c..915d986a4a2 100644 --- a/apps/advisor/pkg/app/checks/ifaces.go +++ b/apps/advisor/pkg/app/checks/ifaces.go @@ -24,6 +24,6 @@ type Step interface { Title() string // Description returns the description of the step Description() string - // Run executes the step and returns a list of errors - Run(ctx context.Context, obj *advisorv0alpha1.CheckSpec, items []any) ([]advisorv0alpha1.CheckReportError, error) + // Run executes the step for an item and returns a report + Run(ctx context.Context, obj *advisorv0alpha1.CheckSpec, item any) (*advisorv0alpha1.CheckReportError, error) } diff --git a/apps/advisor/pkg/app/checks/plugincheck/check.go b/apps/advisor/pkg/app/checks/plugincheck/check.go index c024b2608b0..f9dc853d0b8 100644 --- a/apps/advisor/pkg/app/checks/plugincheck/check.go +++ b/apps/advisor/pkg/app/checks/plugincheck/check.go @@ -78,35 +78,33 @@ func (s *deprecationStep) ID() string { return "deprecation" } -func (s *deprecationStep) Run(ctx context.Context, _ *advisor.CheckSpec, items []any) ([]advisor.CheckReportError, error) { - errs := []advisor.CheckReportError{} - for _, i := range items { - p, ok := i.(pluginstore.Plugin) - if !ok { - return nil, fmt.Errorf("invalid item type %T", i) - } - - // Skip if it's a core plugin - if p.IsCorePlugin() { - continue - } - - // Check if plugin is deprecated - i, err := s.PluginRepo.PluginInfo(ctx, p.ID) - if err != nil { - continue - } - if i.Status == "deprecated" { - errs = append(errs, checks.NewCheckReportError( - advisor.CheckReportErrorSeverityHigh, - fmt.Sprintf("Plugin deprecated: %s", p.ID), - "Check the documentation for recommended steps.", - s.ID(), - p.ID, - )) - } +func (s *deprecationStep) Run(ctx context.Context, _ *advisor.CheckSpec, it any) (*advisor.CheckReportError, error) { + p, ok := it.(pluginstore.Plugin) + if !ok { + return nil, fmt.Errorf("invalid item type %T", it) } - return errs, nil + + // Skip if it's a core plugin + if p.IsCorePlugin() { + return nil, nil + } + + // Check if plugin is deprecated + i, err := s.PluginRepo.PluginInfo(ctx, p.ID) + if err != nil { + // Unable to check deprecation status + return nil, nil + } + if i.Status == "deprecated" { + return checks.NewCheckReportError( + advisor.CheckReportErrorSeverityHigh, + fmt.Sprintf("Plugin deprecated: %s", p.ID), + "Check the documentation for recommended steps.", + s.ID(), + p.ID, + ), nil + } + return nil, nil } type updateStep struct { @@ -127,44 +125,42 @@ func (s *updateStep) ID() string { return "update" } -func (s *updateStep) Run(ctx context.Context, _ *advisor.CheckSpec, items []any) ([]advisor.CheckReportError, error) { - errs := []advisor.CheckReportError{} - for _, i := range items { - p, ok := i.(pluginstore.Plugin) - if !ok { - return nil, fmt.Errorf("invalid item type %T", i) - } - - // Skip if it's a core plugin - if p.IsCorePlugin() { - continue - } - - // Skip if it's managed or pinned - if s.isManaged(ctx, p.ID) || s.PluginPreinstall.IsPinned(p.ID) { - continue - } - - // Check if plugin has a newer version available - compatOpts := repo.NewCompatOpts(services.GrafanaVersion, sysruntime.GOOS, sysruntime.GOARCH) - info, err := s.PluginRepo.GetPluginArchiveInfo(ctx, p.ID, "", compatOpts) - if err != nil { - continue - } - if hasUpdate(p, info) { - errs = append(errs, checks.NewCheckReportError( - advisor.CheckReportErrorSeverityLow, - fmt.Sprintf("New version available for %s", p.ID), - fmt.Sprintf( - "Go to the plugin admin page"+ - " and upgrade to the latest version.", p.ID), - s.ID(), - p.ID, - )) - } +func (s *updateStep) Run(ctx context.Context, _ *advisor.CheckSpec, i any) (*advisor.CheckReportError, error) { + p, ok := i.(pluginstore.Plugin) + if !ok { + return nil, fmt.Errorf("invalid item type %T", i) } - return errs, nil + // Skip if it's a core plugin + if p.IsCorePlugin() { + return nil, nil + } + + // Skip if it's managed or pinned + if s.isManaged(ctx, p.ID) || s.PluginPreinstall.IsPinned(p.ID) { + return nil, nil + } + + // Check if plugin has a newer version available + compatOpts := repo.NewCompatOpts(services.GrafanaVersion, sysruntime.GOOS, sysruntime.GOARCH) + info, err := s.PluginRepo.GetPluginArchiveInfo(ctx, p.ID, "", compatOpts) + if err != nil { + // Unable to check updates + return nil, nil + } + if hasUpdate(p, info) { + return checks.NewCheckReportError( + advisor.CheckReportErrorSeverityLow, + fmt.Sprintf("New version available for %s", p.ID), + fmt.Sprintf( + "Go to the plugin admin page"+ + " and upgrade to the latest version.", p.ID), + s.ID(), + p.ID, + ), nil + } + + return nil, nil } func hasUpdate(current pluginstore.Plugin, latest *repo.PluginArchiveInfo) bool { diff --git a/apps/advisor/pkg/app/checks/plugincheck/check_test.go b/apps/advisor/pkg/app/checks/plugincheck/check_test.go index 29dc7ccaabd..ff5190278e2 100644 --- a/apps/advisor/pkg/app/checks/plugincheck/check_test.go +++ b/apps/advisor/pkg/app/checks/plugincheck/check_test.go @@ -136,9 +136,13 @@ func TestRun(t *testing.T) { assert.NoError(t, err) errs := []advisor.CheckReportError{} for _, step := range check.Steps() { - stepErrs, err := step.Run(context.Background(), &advisor.CheckSpec{}, items) - assert.NoError(t, err) - errs = append(errs, stepErrs...) + for _, item := range items { + stepErr, err := step.Run(context.Background(), &advisor.CheckSpec{}, item) + assert.NoError(t, err) + if stepErr != nil { + errs = append(errs, *stepErr) + } + } } assert.NoError(t, err) assert.Equal(t, len(tt.plugins), len(items)) diff --git a/apps/advisor/pkg/app/checks/utils.go b/apps/advisor/pkg/app/checks/utils.go index a6c4089da44..053dc591389 100644 --- a/apps/advisor/pkg/app/checks/utils.go +++ b/apps/advisor/pkg/app/checks/utils.go @@ -10,8 +10,8 @@ func NewCheckReportError( action string, stepID string, itemID string, -) advisor.CheckReportError { - return advisor.CheckReportError{ +) *advisor.CheckReportError { + return &advisor.CheckReportError{ Severity: severity, Reason: reason, Action: action, diff --git a/apps/advisor/pkg/app/utils.go b/apps/advisor/pkg/app/utils.go index c59dbaca21e..b1809dd2ee3 100644 --- a/apps/advisor/pkg/app/utils.go +++ b/apps/advisor/pkg/app/utils.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "sync" claims "github.com/grafana/authlib/types" "github.com/grafana/grafana-app-sdk/resource" @@ -83,20 +84,17 @@ func processCheck(ctx context.Context, client resource.Client, obj resource.Obje } // Run the steps steps := check.Steps() - errs := []advisorv0alpha1.CheckReportError{} - for _, step := range steps { - stepErrs, err := step.Run(ctx, &c.Spec, items) - if err != nil { - setErr := setStatusAnnotation(ctx, client, obj, "error") - if setErr != nil { - return setErr - } - return fmt.Errorf("error running step %s: %w", step.Title(), err) + reportErrors, err := runStepsInParallel(ctx, &c.Spec, steps, items) + if err != nil { + setErr := setStatusAnnotation(ctx, client, obj, "error") + if setErr != nil { + return setErr } - errs = append(errs, stepErrs...) + return fmt.Errorf("error running steps: %w", err) } + report := &advisorv0alpha1.CheckV0alpha1StatusReport{ - Errors: errs, + Errors: reportErrors, Count: int64(len(items)), } err = setStatusAnnotation(ctx, client, obj, "processed") @@ -111,3 +109,35 @@ func processCheck(ctx context.Context, client resource.Client, obj resource.Obje }}, }, resource.PatchOptions{}, obj) } + +func runStepsInParallel(ctx context.Context, spec *advisorv0alpha1.CheckSpec, steps []checks.Step, items []any) ([]advisorv0alpha1.CheckReportError, error) { + reportErrs := []advisorv0alpha1.CheckReportError{} + var internalErr error + var wg sync.WaitGroup + var mu sync.Mutex + // Avoid too many concurrent requests + limit := make(chan struct{}, 10) + + for _, step := range steps { + for _, item := range items { + wg.Add(1) + limit <- struct{}{} + go func(step checks.Step, item any) { + defer wg.Done() + defer func() { <-limit }() + stepErr, err := step.Run(ctx, spec, item) + mu.Lock() + defer mu.Unlock() + if err != nil { + internalErr = fmt.Errorf("error running step %s: %w", step.ID(), err) + return + } + if stepErr != nil { + reportErrs = append(reportErrs, *stepErr) + } + }(step, item) + } + } + wg.Wait() + return reportErrs, internalErr +} diff --git a/apps/advisor/pkg/app/utils_test.go b/apps/advisor/pkg/app/utils_test.go index a13880f8209..5905e2a9c44 100644 --- a/apps/advisor/pkg/app/utils_test.go +++ b/apps/advisor/pkg/app/utils_test.go @@ -3,6 +3,7 @@ package app import ( "context" "errors" + "fmt" "testing" "github.com/grafana/grafana-app-sdk/resource" @@ -68,13 +69,45 @@ func TestProcessCheck(t *testing.T) { meta.SetCreatedBy("user:1") client := &mockClient{} ctx := context.TODO() - check := &mockCheck{} + check := &mockCheck{ + items: []any{"item"}, + } err = processCheck(ctx, client, obj, check) assert.NoError(t, err) assert.Equal(t, "processed", obj.GetAnnotations()[statusAnnotation]) } +func TestProcessMultipleCheckItems(t *testing.T) { + obj := &advisorv0alpha1.Check{} + obj.SetAnnotations(map[string]string{}) + meta, err := utils.MetaAccessor(obj) + if err != nil { + t.Fatal(err) + } + meta.SetCreatedBy("user:1") + client := &mockClient{} + ctx := context.TODO() + items := make([]any, 100) + for i := range items { + if i%2 == 0 { + items[i] = fmt.Sprintf("item-%d", i) + } else { + items[i] = errors.New("error") + } + } + check := &mockCheck{ + items: items, + } + + err = processCheck(ctx, client, obj, check) + assert.NoError(t, err) + assert.Equal(t, "processed", obj.GetAnnotations()[statusAnnotation]) + r := client.lastValue.(advisorv0alpha1.CheckV0alpha1StatusReport) + assert.Equal(t, r.Count, int64(100)) + assert.Len(t, r.Errors, 50) +} + func TestProcessCheck_AlreadyProcessed(t *testing.T) { obj := &advisorv0alpha1.Check{} obj.SetAnnotations(map[string]string{statusAnnotation: "processed"}) @@ -98,7 +131,8 @@ func TestProcessCheck_RunError(t *testing.T) { ctx := context.TODO() check := &mockCheck{ - err: errors.New("run error"), + items: []any{"item"}, + err: errors.New("run error"), } err = processCheck(ctx, client, obj, check) @@ -108,14 +142,17 @@ func TestProcessCheck_RunError(t *testing.T) { type mockClient struct { resource.Client + lastValue any } func (m *mockClient) PatchInto(ctx context.Context, id resource.Identifier, req resource.PatchRequest, opts resource.PatchOptions, obj resource.Object) error { + m.lastValue = req.Operations[0].Value return nil } type mockCheck struct { - err error + err error + items []any } func (m *mockCheck) ID() string { @@ -123,7 +160,7 @@ func (m *mockCheck) ID() string { } func (m *mockCheck) Items(ctx context.Context) ([]any, error) { - return []any{}, nil + return m.items, nil } func (m *mockCheck) Steps() []checks.Step { @@ -136,10 +173,13 @@ type mockStep struct { err error } -func (m *mockStep) Run(ctx context.Context, obj *advisorv0alpha1.CheckSpec, items []any) ([]advisorv0alpha1.CheckReportError, error) { +func (m *mockStep) Run(ctx context.Context, obj *advisorv0alpha1.CheckSpec, items any) (*advisorv0alpha1.CheckReportError, error) { if m.err != nil { return nil, m.err } + if _, ok := items.(error); ok { + return &advisorv0alpha1.CheckReportError{}, nil + } return nil, nil }